Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: CI

on:
push:
branches: [master]
branches: [main]
pull_request:
branches: [master]
branches: [main]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ Thumbs.db

# Local
/audits
*.dump
45 changes: 45 additions & 0 deletions backend/alembic/versions/96db4b158ec4_add_sub_workflows_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""add sub_workflows table

Revision ID: 96db4b158ec4
Revises: a8908de4ea3e
Create Date: 2026-03-08 15:01:40.215422

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = '96db4b158ec4'
down_revision: Union[str, Sequence[str], None] = 'a8908de4ea3e'  # parent migration in the chain
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema: create the ``sub_workflows`` table and its lookup indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('sub_workflows',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=150), nullable=False),
        sa.Column('description', sa.String(length=500), nullable=False),
        # Plain JSON on most backends, JSONB when running on PostgreSQL.
        sa.Column('graph', sa.JSON().with_variant(postgresql.JSONB(astext_type=sa.Text()), 'postgresql'), nullable=True),
        sa.Column('user_id', sa.Integer(), nullable=False),
        # Timestamps default to the database clock, timezone-aware.
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
        sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
        sa.PrimaryKeyConstraint('id', name=op.f('pk_sub_workflows'))
    )
    # Non-unique indexes to speed up list/filter queries by name and owner.
    op.create_index(op.f('ix_sub_workflows_name'), 'sub_workflows', ['name'], unique=False)
    op.create_index(op.f('ix_sub_workflows_user_id'), 'sub_workflows', ['user_id'], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Downgrade schema: drop the ``sub_workflows`` table and its indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Drop the indexes before the table they belong to.
    op.drop_index(op.f('ix_sub_workflows_user_id'), table_name='sub_workflows')
    op.drop_index(op.f('ix_sub_workflows_name'), table_name='sub_workflows')
    op.drop_table('sub_workflows')
    # ### end Alembic commands ###
2 changes: 2 additions & 0 deletions backend/app/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from app.api.data_units import router as data_units_router
from app.api.handlers import router as handlers_router
from app.api.logs import router as logs_router
from app.api.sub_workflows import router as sub_workflows_router
from app.api.system import router as system_router
from app.api.widgets import router as widgets_router
from app.api.workflow import router as workflow_router
Expand All @@ -20,5 +21,6 @@
api_router.include_router(widgets_router)
api_router.include_router(workflow_router)
api_router.include_router(actions_router)
api_router.include_router(sub_workflows_router)
api_router.include_router(logs_router)
api_router.include_router(system_router)
167 changes: 167 additions & 0 deletions backend/app/api/sub_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import logging

from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select

from app.dependencies import CurrentUser, DbSession, HandlerManagerDep
from app.models.sub_workflow import SubWorkflow
from app.schemas.sub_workflow import (
SubWorkflowCreate,
SubWorkflowRead,
SubWorkflowSummary,
SubWorkflowUpdate,
)
from app.schemas.workflow import WorkflowData
from app.socketio_app import sio

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/sub-workflows", tags=["sub-workflows"])


@router.get("/", response_model=list[SubWorkflowSummary])
async def list_sub_workflows(db: DbSession, _current_user: CurrentUser):
    """Return all sub-workflows, ordered by ascending id."""
    query = select(SubWorkflow).order_by(SubWorkflow.id.asc())
    rows = await db.execute(query)
    return rows.scalars().all()


@router.get("/{sub_workflow_id}", response_model=SubWorkflowRead)
async def get_sub_workflow(sub_workflow_id: int, db: DbSession, _current_user: CurrentUser):
    """Fetch a single sub-workflow by id; 404 when it does not exist."""
    query = select(SubWorkflow).where(SubWorkflow.id == sub_workflow_id)
    record = (await db.execute(query)).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Sub-workflow not found")
    return record


@router.post("/", response_model=SubWorkflowRead, status_code=status.HTTP_201_CREATED)
async def create_sub_workflow(body: SubWorkflowCreate, db: DbSession, current_user: CurrentUser):
    """Create a new (graph-less) sub-workflow owned by the current user."""
    record = SubWorkflow(
        name=body.name,
        description=body.description,
        user_id=current_user.id,
    )
    db.add(record)
    await db.commit()
    await db.refresh(record)
    # Tell connected clients the sub-workflow list changed.
    await sio.emit("mutate", {"entity": "sub_workflows"})
    return record


@router.patch("/{sub_workflow_id}", response_model=SubWorkflowRead)
async def update_sub_workflow(sub_workflow_id: int, body: SubWorkflowUpdate, db: DbSession, _current_user: CurrentUser):
    """Partially update a sub-workflow; only fields the client sent are applied."""
    query = select(SubWorkflow).where(SubWorkflow.id == sub_workflow_id)
    record = (await db.execute(query)).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Sub-workflow not found")
    # exclude_unset: fields absent from the request body are left untouched.
    changes = body.model_dump(exclude_unset=True)
    for attr_name, attr_value in changes.items():
        setattr(record, attr_name, attr_value)
    await db.commit()
    await db.refresh(record)
    await sio.emit("mutate", {"entity": "sub_workflows"})
    return record


@router.put("/{sub_workflow_id}/graph", response_model=WorkflowData)
async def save_sub_workflow_graph(
    sub_workflow_id: int,
    body: WorkflowData,
    db: DbSession,
    _current_user: CurrentUser,
    manager: HandlerManagerDep,
):
    """Validate and persist a sub-workflow's graph, then rebuild the main workflow.

    Raises:
        HTTPException 404: the sub-workflow does not exist.
        HTTPException 422: validation found a cycle or a recursive
            sub-workflow reference; detail carries the message (and node_id
            for cycles).
    """
    result = await db.execute(select(SubWorkflow).where(SubWorkflow.id == sub_workflow_id))
    sw = result.scalar_one_or_none()
    if not sw:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Sub-workflow not found")

    graph_dict = body.model_dump(by_alias=True)

    # Load sub-workflow registry for cross-graph validation.
    # The graph being saved is excluded: its candidate version is passed
    # to the validator separately below.
    all_sws = await db.execute(select(SubWorkflow))
    registry: dict[int, dict] = {}
    for other_sw in all_sws.scalars().all():
        if other_sw.id != sub_workflow_id:
            registry[other_sw.id] = {"name": other_sw.name, "graph": other_sw.graph}

    # Validate the sub-workflow graph (cycle detection, unknown types, recursion).
    # NOTE(review): imported locally, presumably to avoid a circular import — confirm.
    from app.nodes.graph import CycleDetectedError, SubWorkflowRecursionError

    try:
        manager.validate_sub_workflow_graph(sub_workflow_id, graph_dict, registry=registry)
    except CycleDetectedError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail={"message": str(e), "node_id": e.node_id},
        ) from e
    except SubWorkflowRecursionError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail={"message": str(e)},
        ) from e

    # Only persist after validation succeeded.
    sw.graph = graph_dict
    await db.commit()

    # Rebuild the main workflow so it picks up updated sub-workflow graphs.
    await manager.rebuild_workflow(db)

    await sio.emit("mutate", {"entity": "sub_workflows"})
    await sio.emit("mutate", {"entity": "workflow"})
    logger.info("Sub-workflow %d graph saved", sub_workflow_id)
    return graph_dict


@router.delete("/{sub_workflow_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_sub_workflow(
    sub_workflow_id: int, db: DbSession, _current_user: CurrentUser, manager: HandlerManagerDep
):
    """Delete a sub-workflow; 404 if missing, 409 if still referenced anywhere."""
    query = select(SubWorkflow).where(SubWorkflow.id == sub_workflow_id)
    record = (await db.execute(query)).scalar_one_or_none()
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Sub-workflow not found")

    # Refuse deletion while the main workflow or another sub-workflow uses it.
    referencing = await _find_references(db, sub_workflow_id)
    if referencing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Sub-workflow is referenced by: {', '.join(referencing)}",
        )

    await db.delete(record)
    await db.commit()

    # The deleted node type must disappear from the built execution graph.
    await manager.rebuild_workflow(db)

    await sio.emit("mutate", {"entity": "sub_workflows"})
    await sio.emit("mutate", {"entity": "workflow"})


async def _find_references(db: DbSession, sub_workflow_id: int) -> list[str]:
    """Find all workflows/sub-workflows that reference the given sub-workflow ID."""
    from app.models.settings import Settings

    referencing: list[str] = []

    # Check main workflow (stored on the singleton Settings row).
    settings_row = (await db.execute(select(Settings).limit(1))).scalar_one_or_none()
    if settings_row and settings_row.actions_node_map:
        main_nodes = settings_row.actions_node_map.get("nodes", [])
        if any(_node_references_sub_workflow(n, sub_workflow_id) for n in main_nodes):
            referencing.append("Main Workflow")

    # Check every other sub-workflow's graph.
    others = await db.execute(select(SubWorkflow).where(SubWorkflow.id != sub_workflow_id))
    for candidate in others.scalars().all():
        nodes = (candidate.graph or {}).get("nodes", [])
        if any(_node_references_sub_workflow(n, sub_workflow_id) for n in nodes):
            referencing.append(candidate.name)

    return referencing


def _node_references_sub_workflow(node: dict, sub_workflow_id: int) -> bool:
"""Check if a node is a SubWorkflowNode referencing the given sub-workflow."""
node_type = node.get("type", "")
return node_type == f"sub_workflow_{sub_workflow_id}"
101 changes: 95 additions & 6 deletions backend/app/api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from app.models.attribute import Attribute
from app.models.handler import Handler
from app.models.settings import Settings
from app.nodes import NODES
from app.nodes.graph import CycleDetectedError
from app.models.sub_workflow import SubWorkflow
from app.nodes import NODES, SUB_WORKFLOW_NODES
from app.nodes.graph import CycleDetectedError, derive_sub_workflow_ports
from app.schemas.workflow import NodeDefinition, WorkflowData

logger = logging.getLogger(__name__)
Expand All @@ -19,8 +20,17 @@


@router.get("/nodes", response_model=list[NodeDefinition])
async def get_node_definitions(db: DbSession, _current_user: CurrentUser, manager: HandlerManagerDep):
"""Return all node type definitions with dynamic select options populated."""
async def get_node_definitions(
db: DbSession,
_current_user: CurrentUser,
manager: HandlerManagerDep,
context: str = "main",
):
"""Return all node type definitions with dynamic select options populated.

Args:
context: "main" for the main workflow editor, "sub_workflow" for sub-workflow editing.
"""
# Load dynamic options from DB
handlers_result = await db.execute(
select(Handler.id, Handler.type, Handler.label).order_by(Handler.order.asc().nullslast(), Handler.id.asc())
Expand Down Expand Up @@ -61,18 +71,91 @@ async def get_node_definitions(db: DbSession, _current_user: CurrentUser, manage
"handler_data_key": manager.get_all_data_keys(attr_labels),
}

# Determine which node classes to include based on context
if context == "sub_workflow":
# Inside sub-workflow: include standard nodes (except listeners) + Input/Output
from app.nodes.attribute_reader import AttributeReader
from app.nodes.handler_listener import HandlerListener

node_classes = [n for n in NODES if n not in (HandlerListener, AttributeReader)]
node_classes.extend(SUB_WORKFLOW_NODES)
else:
# Main workflow: all standard nodes
node_classes = list(NODES)

definitions = []
for node_cls in NODES:
for node_cls in node_classes:
defn = node_cls.get_definition()
# Inject dynamic options for select ports with empty options
for port in defn["input_ports"]:
if port["control"] in ("select", "tree-select") and not port["options"]:
port["options"] = dynamic_options.get(port["type"], [])
definitions.append(defn)

# Generate one NodeDefinition per sub-workflow (available in both contexts)
sub_workflows_result = await db.execute(select(SubWorkflow).order_by(SubWorkflow.id.asc()))
for sw in sub_workflows_result.scalars().all():
defn = _build_sub_workflow_node_definition(sw)
if defn:
definitions.append(defn)

return definitions


def _build_sub_workflow_node_definition(sw: SubWorkflow) -> dict | None:
"""Build a NodeDefinition for a sub-workflow based on its graph's Input/Output nodes."""
if not sw.graph:
# Sub-workflow with no graph yet — still show it with no ports
return {
"type": f"sub_workflow_{sw.id}",
"label": sw.name,
"description": sw.description or "Sub-workflow",
"input_ports": [],
"output_ports": [],
}

try:
input_ports, output_ports = derive_sub_workflow_ports(sw.graph)
except Exception:
logger.warning("Failed to derive ports for sub-workflow %d", sw.id)
input_ports, output_ports = [], []

# Convert derived ports to NodeDefinition format
port_colors = {"event": "yellow", "value": "blue"}
input_port_defs = [
{
"name": p["name"],
"type": p["type"],
"label": p["name"],
"color": port_colors.get(p["type"], ""),
"control": "",
"options": [],
"data_key": p["name"],
}
for p in input_ports
]
output_port_defs = [
{
"name": p["name"],
"type": p["type"],
"label": p["name"],
"color": port_colors.get(p["type"], ""),
"control": "",
"options": [],
"data_key": p["name"],
}
for p in output_ports
]

return {
"type": f"sub_workflow_{sw.id}",
"label": sw.name,
"description": sw.description or "Sub-workflow",
"input_ports": input_port_defs,
"output_ports": output_port_defs,
}


@router.get("/", response_model=WorkflowData)
async def get_workflow(db: DbSession, _current_user: CurrentUser):
"""Load saved workflow from Settings."""
Expand All @@ -88,9 +171,15 @@ async def save_workflow(body: WorkflowData, db: DbSession, _current_user: Curren
"""Validate and save workflow, then rebuild the execution graph."""
workflow_dict = body.model_dump(by_alias=True)

# Load sub-workflow registry for validation
sw_result = await db.execute(select(SubWorkflow))
registry: dict[int, dict] = {}
for sw in sw_result.scalars().all():
registry[sw.id] = {"name": sw.name, "graph": sw.graph}

# Validate by building the graph (detects cycles, unknown types) without installing it
try:
graph = manager.build_workflow(workflow_dict)
graph = manager.build_workflow(workflow_dict, sub_workflow_registry=registry)
except CycleDetectedError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
Expand Down
Loading