diff --git a/AGENTS.md b/AGENTS.md index ae89322..f046fdf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -120,6 +120,56 @@ make validate-examples # validate all examples 7. Routes evaluated via `Router` using Jinja2 or simpleeval expressions 8. Final output built from templates in `output:` section +### Notifications (domain events) + +A `type: notification` step lets workflow authors publish typed, declared **domain events** to a dedicated JSONL stream for external tooling to hook off. Distinct from execution events (`agent_started`, `route_evaluated`, …): execution events describe what the runtime is doing; notifications describe what the workflow is asserting to the outside world. + +**Notifications are a visibility primitive, not a control-flow primitive.** They do not block, do not wait for an ack, and do not gate routing. If a decision needs to flow back into the workflow, use `human_gate`. Routes on a notification step fire independently of emission. + +YAML surface (full example in `examples/notifications.yaml`): + +```yaml +workflow: + notifications: + namespace: polyphony.feature_pr # optional; defaults to slugified workflow name + correlation: [apex_id] # workflow input keys auto-surfaced + inherited by sub-workflows + types: + pr_ready: + version: 1 + payload: + pr_url: { type: string } + pr_id: { type: number } + +agents: + - name: announce_ready + type: notification + notification: pr_ready + payload: + pr_url: "{{ open_pr.output.url }}" + pr_id: "{{ open_pr.output.id }}" + routes: + - to: poll_for_review # continues; emission is independent of routing +``` + +**Envelope** (the `data` of each `notification` event): + +- `emission_id` — `::`; stable across resume/replay so consumers can dedupe at the first hop. +- `schema_id` — `.@`; consumers MUST pin on this, not on `notification_type` alone. +- `namespace`, `notification_type`, `version`, `run_id`, `workflow`, `source_agent`, `subworkflow_path`. +- `correlation` — first-class field, auto-merged from declared correlation keys and inherited from parent workflows (parent wins on key collision so the upstream trail survives). +- `workflow_metadata` — passthrough from `workflow.metadata`. +- `payload` — declared per-type fields, Jinja2-rendered then type-validated against the `OutputField` schema. + +**Delivery (v1):** every emission flows through `WorkflowEventEmitter` and is written to `$TMPDIR/conductor/conductor---.notifications.jsonl` by `NotificationLogSubscriber`. The path is printed at end-of-run alongside the event log path. No webhooks, shell hooks, or MCP-driven emission in v1. + +**Schema-version-bump policy** (documented; not enforced): +- Adding an optional field is non-breaking; do **not** bump `version`. +- Removing/renaming a field, changing a field's type, or making an optional field required is breaking; bump `version` and emit under the new `schema_id`. +- During a transition a workflow MAY emit both `pr_ready@1` and `pr_ready@2` from adjacent steps. +- Consumers MUST pin on `schema_id`. + +**Validation failures** (undeclared type, payload field mismatch, unknown correlation key, type mismatch on a rendered value) fail the workflow loudly with `ValidationError`. The contract is the whole point. + ### Key Patterns - **Context modes**: `accumulate` (all prior outputs), `last_only` (previous only), `explicit` (only declared inputs) diff --git a/examples/notifications.yaml b/examples/notifications.yaml new file mode 100644 index 0000000..5f1796f --- /dev/null +++ b/examples/notifications.yaml @@ -0,0 +1,125 @@ +# Notifications Example +# +# Demonstrates user-defined domain notifications: fire-and-forget messages +# the workflow publishes to the outside world for external tooling to hook +# off. Notifications are a VISIBILITY primitive — they do not block, do +# not wait for an ack, and do not gate control flow. If you need a +# decision to flow back into the workflow, use a human_gate instead. +# +# Each emission is written to a dedicated notifications.jsonl file +# alongside the standard event log. The file path is printed at the end +# of the run. +# +# Usage: +# conductor run examples/notifications.yaml \ +# --input pr_id=42 \ +# --input apex_id=apex-2026-05-18 + +workflow: + name: notifications-demo + description: Demonstrates user-defined fire-and-forget notifications + version: "1.0.0" + entry_point: open_pr + + runtime: + provider: copilot + + limits: + max_iterations: 10 + + input: + pr_id: + type: number + description: ADO pull-request id + apex_id: + type: string + description: Apex run id (correlation key) + ado_org: + type: string + default: contoso + ado_project: + type: string + default: feature-pr + + # Workflow-level notification configuration. + notifications: + # Optional. Defaults to the slugified workflow name. + namespace: polyphony.feature_pr + + # Workflow input keys auto-surfaced on every notification. Inherited + # by sub-workflows (parent wins on key collision). + correlation: + - apex_id + + # Type registry. Each entry declares a fixed payload schema and a + # version. Consumers MUST pin on schema_id (.@), + # not on type name alone. + types: + pr_ready: + version: 1 + description: A pull request has reached ready-for-review state. + payload: + pr_url: { type: string } + pr_id: { type: number } + ado_org: { type: string } + ado_project: { type: string } + + pr_needs_input: + version: 1 + description: A PR has changed and requires reviewer input. + payload: + pr_url: { type: string } + change_summary: { type: string } + +agents: + - name: open_pr + description: Stand-in for whatever opens / discovers the PR. + model: claude-haiku-4.5 + prompt: | + Pretend you are creating pull request {{ workflow.input.pr_id }} in + {{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}. + Reply with a one-sentence summary of what was done. + output: + summary: + type: string + routes: + - to: announce_ready + + # Notifications fire mid-flow. Note we route to the next step, NOT to + # $end — emission is independent of routing. + - name: announce_ready + type: notification + notification: pr_ready + payload: + pr_url: "https://dev.azure.com/{{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}/_git/repo/pullrequest/{{ workflow.input.pr_id }}" + pr_id: "{{ workflow.input.pr_id }}" + ado_org: "{{ workflow.input.ado_org }}" + ado_project: "{{ workflow.input.ado_project }}" + routes: + - to: detect_change + + - name: detect_change + description: Stand-in for whatever notices the PR changed. + model: claude-haiku-4.5 + prompt: | + Pretend a reviewer pushed a small change to PR + {{ workflow.input.pr_id }}. Reply with a one-line description of + the change. + output: + change: + type: string + routes: + - to: announce_needs_input + + - name: announce_needs_input + type: notification + notification: pr_needs_input + payload: + pr_url: "https://dev.azure.com/{{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}/_git/repo/pullrequest/{{ workflow.input.pr_id }}" + change_summary: "{{ detect_change.output.change }}" + routes: + - to: $end + +output: + pr_summary: "{{ open_pr.output.summary }}" + change: "{{ detect_change.output.change }}" diff --git a/src/conductor/cli/run.py b/src/conductor/cli/run.py index 722aec1..78500f5 100644 --- a/src/conductor/cli/run.py +++ b/src/conductor/cli/run.py @@ -1154,6 +1154,7 @@ async def run_workflow_async( # Always create event emitter and JSONL log subscriber emitter = WorkflowEventEmitter() event_log_subscriber: Any = None + notification_log_subscriber: Any = None dashboard: Any = None if web: @@ -1198,11 +1199,20 @@ async def run_workflow_async( verbose_log(f"Agents: {len(config.agents)}") # Start JSONL event log subscriber (always-on structured diagnostics) - from conductor.engine.event_log import EventLogSubscriber + from conductor.engine.event_log import EventLogSubscriber, NotificationLogSubscriber event_log_subscriber = EventLogSubscriber(config.workflow.name) emitter.subscribe(event_log_subscriber.on_event) + # Dedicated notifications.jsonl for external tooling that only cares + # about domain notifications, not execution telemetry. Share the same + # run_id so the two files have aligned filename stems. + notification_log_subscriber = NotificationLogSubscriber( + config.workflow.name, + run_id=event_log_subscriber.run_id, + ) + emitter.subscribe(notification_log_subscriber.on_event) + # Subscribe console output to the event emitter console_subscriber = ConsoleEventSubscriber() emitter.subscribe(console_subscriber.on_event) @@ -1341,6 +1351,11 @@ async def run_workflow_async( if event_log_subscriber is not None: event_log_subscriber.close() _verbose_console.print(f"[dim]Event log written to: {event_log_subscriber.path}[/dim]") + if notification_log_subscriber is not None: + notification_log_subscriber.close() + _verbose_console.print( + f"[dim]Notifications written to: {notification_log_subscriber.path}[/dim]" + ) # Report log file path to stderr and close file logging if log_file is not None and _file_console is not None: @@ -1623,6 +1638,7 @@ async def resume_workflow_async( # Always create event emitter and JSONL log subscriber (parity with run) emitter = WorkflowEventEmitter() event_log_subscriber: Any = None + notification_log_subscriber: Any = None dashboard: Any = None try: @@ -1775,6 +1791,16 @@ async def resume_workflow_async( ) emitter.subscribe(event_log_subscriber.on_event) + # Dedicated notifications.jsonl (parity with run). Shares the + # run_id stem with the event log so the two files line up. + from conductor.engine.event_log import NotificationLogSubscriber + + notification_log_subscriber = NotificationLogSubscriber( + config.workflow.name, + run_id=event_log_subscriber.run_id, + ) + emitter.subscribe(notification_log_subscriber.on_event) + # Subscribe console output to the event emitter (parity with run) console_subscriber = ConsoleEventSubscriber() emitter.subscribe(console_subscriber.on_event) @@ -1904,6 +1930,11 @@ async def resume_workflow_async( if event_log_subscriber is not None: event_log_subscriber.close() _verbose_console.print(f"[dim]Event log written to: {event_log_subscriber.path}[/dim]") + if notification_log_subscriber is not None: + notification_log_subscriber.close() + _verbose_console.print( + f"[dim]Notifications written to: {notification_log_subscriber.path}[/dim]" + ) # Report log file path to stderr and close file logging if log_file is not None and _file_console is not None: diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 43e9d65..3f2ad85 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -6,6 +6,7 @@ from __future__ import annotations +import re from typing import Any, Literal from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator @@ -447,6 +448,129 @@ class ReasoningConfig(BaseModel): """Reasoning effort level applied to the agent's model calls.""" +# Dotted-identifier regex used for notification namespaces. +# e.g. ``polyphony``, ``polyphony.feature_pr``, ``my_pkg.sub_pkg.workflow``. +_NAMESPACE_PATTERN = re.compile(r"^[a-z_][a-z0-9_]*(\.[a-z_][a-z0-9_]*)*$") + + +class NotificationTypeDef(BaseModel): + """Declared schema for a single user-defined notification type. + + Notifications are a *visibility* primitive — fire-and-forget messages + the workflow author publishes to the outside world for external + tooling to hook off. They are **not** a control-flow primitive; if + you need a decision to flow back into the workflow, use a + ``human_gate`` instead. + + Each declared type has a fixed payload schema (reusing + :class:`OutputField` for per-field type declarations) and a version + that consumers MUST pin against. Bumping the version produces a + distinct ``schema_id`` so consumers can transition deliberately. + """ + + model_config = ConfigDict(extra="forbid") + + version: int = Field(default=1, ge=1) + """Schema version. Bump when making a breaking change to ``payload``. + + Bump policy: + - Adding an *optional* field is non-breaking; do not bump. + - Removing a field, renaming a field, changing a field's type, or + making an optional field required is breaking; bump and emit + under the new ``schema_id``. + - Consumers MUST pin on ``schema_id`` (``.@``), + not on ``notification_type`` alone. + """ + + description: str | None = None + """Human-readable description of when this notification fires.""" + + payload: dict[str, OutputField] = Field(default_factory=dict) + """Declared payload field schema. + + Each value is an :class:`OutputField` describing the field's type + (and, for arrays/objects, its item/property structure). At emission + time the engine renders the step's ``payload:`` block via Jinja2, + then validates the rendered values against this schema. + """ + + +class NotificationsConfig(BaseModel): + """Workflow-level notification configuration. + + Declares the namespace, correlation keys, and type registry for all + notifications the workflow may emit. + + Example YAML:: + + notifications: + namespace: polyphony.feature_pr + correlation: + - apex_id + - work_item_id + types: + pr_ready: + version: 1 + description: A pull request has reached ready-for-review state. + payload: + pr_url: { type: string } + pr_id: { type: number } + ado_org: { type: string } + ado_project: { type: string } + """ + + model_config = ConfigDict(extra="forbid") + + namespace: str | None = None + """Optional dotted-identifier namespace for ``schema_id`` values. + + Defaults to a slugified form of the workflow ``name``. Override when + multiple workflows share a package boundary so consumers can + disambiguate same-named local types. + + Format: ``[a-z_][a-z0-9_]*(\\.[a-z_][a-z0-9_]*)*``. + """ + + correlation: list[str] = Field(default_factory=list) + """Names of workflow inputs to surface on every notification. + + Values are snapshotted from ``workflow.input.*`` at run start and + auto-merged into every notification's envelope under the + ``correlation`` field. Sub-workflows inherit the parent's correlation + snapshot (parent wins on key collision) so deeply-nested emissions + carry upstream identifiers without the leaf author wiring them. + """ + + types: dict[str, NotificationTypeDef] = Field(default_factory=dict) + """Registry of declared notification types, keyed by local type name. + + Type names must be valid identifiers (``[a-z_][a-z0-9_]*``). + """ + + @field_validator("namespace") + @classmethod + def _validate_namespace(cls, v: str | None) -> str | None: + if v is None: + return v + if not _NAMESPACE_PATTERN.match(v): + raise ValueError( + f"namespace '{v}' must be a dotted lowercase identifier " + "(e.g. 'polyphony.feature_pr')" + ) + return v + + @field_validator("types") + @classmethod + def _validate_type_names( + cls, v: dict[str, NotificationTypeDef] + ) -> dict[str, NotificationTypeDef]: + ident = re.compile(r"^[a-z_][a-z0-9_]*$") + for name in v: + if not ident.match(name): + raise ValueError(f"notification type name '{name}' must be a lowercase identifier") + return v + + class AgentDef(BaseModel): """Definition for a single agent in the workflow.""" @@ -458,7 +582,7 @@ class AgentDef(BaseModel): description: str | None = None """Human-readable description of agent's purpose.""" - type: Literal["agent", "human_gate", "script", "workflow"] | None = None + type: Literal["agent", "human_gate", "script", "workflow", "notification"] | None = None """Agent type. Defaults to 'agent' if not specified.""" provider: Literal["copilot", "claude"] | None = None @@ -671,6 +795,18 @@ class AgentDef(BaseModel): effort: high """ + notification: str | None = None + """For ``type=notification`` steps: the declared notification type name + to emit. Must reference a key under ``workflow.notifications.types``. + """ + + payload: dict[str, Any] | None = None + """For ``type=notification`` steps: Jinja2-templated values for each + declared payload field. Keys must exactly match the declared payload + schema (no missing fields, no extras). Each value is rendered against + the workflow context at emission time. + """ + @field_validator("timeout") @classmethod def validate_timeout(cls, v: int | None) -> int | None: @@ -758,6 +894,54 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("workflow agents cannot have 'dialog'") if self.timeout_seconds is not None: raise ValueError("workflow agents cannot have 'timeout_seconds'") + elif self.type == "notification": + if not self.notification: + raise ValueError( + "notification agents require 'notification' " + "(the declared notification type name to emit)" + ) + if self.payload is None: + raise ValueError( + "notification agents require 'payload' (values for the declared payload fields)" + ) + if self.prompt: + raise ValueError("notification agents cannot have 'prompt'") + if self.provider: + raise ValueError("notification agents cannot have 'provider'") + if self.model: + raise ValueError("notification agents cannot have 'model'") + if self.tools is not None: + raise ValueError("notification agents cannot have 'tools'") + if self.system_prompt: + raise ValueError("notification agents cannot have 'system_prompt'") + if self.options: + raise ValueError("notification agents cannot have 'options'") + if self.command: + raise ValueError("notification agents cannot have 'command'") + if self.workflow: + raise ValueError("notification agents cannot have 'workflow'") + if self.output: + raise ValueError( + "notification agents cannot have 'output' " + "(notifications are fire-and-forget; if you need a value to " + "flow back into the workflow, use a different step type)" + ) + if self.max_session_seconds: + raise ValueError("notification agents cannot have 'max_session_seconds'") + if self.max_agent_iterations is not None: + raise ValueError("notification agents cannot have 'max_agent_iterations'") + if self.retry is not None: + raise ValueError("notification agents cannot have 'retry'") + if self.dialog is not None: + raise ValueError("notification agents cannot have 'dialog'") + if self.reasoning is not None: + raise ValueError("notification agents cannot have 'reasoning'") + if self.timeout_seconds is not None: + raise ValueError("notification agents cannot have 'timeout_seconds'") + if self.input_mapping is not None: + raise ValueError("notification agents cannot have 'input_mapping'") + if self.max_depth is not None: + raise ValueError("notification agents cannot have 'max_depth'") else: # Regular agent or human_gate — input_mapping is not valid if self.input_mapping is not None: @@ -770,6 +954,18 @@ def validate_agent_type(self) -> AgentDef: f"'{self.type or 'agent'}' agents cannot have 'max_depth' " "(only workflow agents support max_depth)" ) + # Cross-type guard: notification/payload fields only valid on notification steps + if self.type != "notification": + if self.notification is not None: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'notification' " + "(only type=notification steps emit notifications)" + ) + if self.payload is not None: + raise ValueError( + f"'{self.type or 'agent'}' agents cannot have 'payload' " + "(only type=notification steps emit notifications)" + ) if self.type == "workflow" and self.reasoning is not None: raise ValueError("workflow agents cannot have 'reasoning'") return self @@ -949,6 +1145,15 @@ class WorkflowDef(BaseModel): hooks: HooksConfig | None = None """Lifecycle event hooks.""" + notifications: NotificationsConfig | None = None + """Optional notification configuration. + + Declares the namespace, correlation keys, and type registry for + domain notifications this workflow may emit via ``type=notification`` + steps. Notifications are fire-and-forget messages published to the + outside world for external tooling to hook off. + """ + metadata: dict[str, Any] = Field(default_factory=dict) """Arbitrary key-value metadata for external tooling (dashboards, trackers, etc.). diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 1b1f753..68b7eca 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -212,6 +212,9 @@ def validate_workflow_config( "inline agent. Script steps cannot be used in for_each groups." ) + # Validate notification configuration and notification steps + errors.extend(_validate_notifications(config)) + # Validate sub-workflow references (local paths and registry refs). # Skipped when workflow_path is not provided — relative paths cannot be # resolved without knowing the file's location. @@ -250,6 +253,78 @@ def validate_workflow_config( return warnings +def _validate_notifications(config: WorkflowConfig) -> list[str]: + """Validate workflow notification configuration and notification steps. + + Checks: + - Every step with ``type=notification`` references a declared + ``notifications.types`` key (or fails if no notifications block exists). + - Provided payload keys exactly match the declared payload field set. + - All declared ``correlation`` keys reference actual workflow inputs. + + Returns a list of error message strings (collected, not raised, so the + caller can aggregate them with other validator errors). + """ + errors: list[str] = [] + notif_config = config.workflow.notifications + + # All notification steps need somewhere to look up their type + notification_steps = [a for a in config.agents if a.type == "notification"] + + if notification_steps and notif_config is None: + for agent in notification_steps: + errors.append( + f"Agent '{agent.name}' is type=notification but no " + "'workflow.notifications' block is declared in the workflow." + ) + return errors + + if notif_config is None: + return errors + + # Correlation keys must resolve to declared workflow inputs + workflow_input_keys = set(config.workflow.input.keys()) + for key in notif_config.correlation: + if key not in workflow_input_keys: + errors.append( + f"workflow.notifications.correlation key '{key}' does not match " + f"any declared workflow input. Available inputs: " + f"{', '.join(sorted(workflow_input_keys)) or '(none)'}" + ) + + # Each notification step must reference a declared type, and its payload + # must match the declared field set exactly + for agent in notification_steps: + type_name = agent.notification + if type_name not in notif_config.types: + available = ", ".join(sorted(notif_config.types.keys())) or "(none)" + errors.append( + f"Agent '{agent.name}' references undeclared notification type " + f"'{type_name}'. Declared types: {available}" + ) + continue + + type_def = notif_config.types[type_name] + declared = set(type_def.payload.keys()) + provided = set((agent.payload or {}).keys()) + missing = declared - provided + extra = provided - declared + if missing: + errors.append( + f"Agent '{agent.name}' notification payload for type " + f"'{type_name}' is missing field(s): {', '.join(sorted(missing))}" + ) + if extra: + errors.append( + f"Agent '{agent.name}' notification payload for type " + f"'{type_name}' has unexpected field(s): " + f"{', '.join(sorted(extra))}. " + f"Declared fields: {', '.join(sorted(declared)) or '(none)'}" + ) + + return errors + + def _validate_agent_routes( agent_name: str, routes: list, diff --git a/src/conductor/engine/event_log.py b/src/conductor/engine/event_log.py index 8e0584f..8b2836d 100644 --- a/src/conductor/engine/event_log.py +++ b/src/conductor/engine/event_log.py @@ -177,3 +177,61 @@ def close(self) -> None: """Close the log file handle.""" if self._handle is not None and not self._handle.closed: self._handle.close() + + +class NotificationLogSubscriber: + """Writes only domain notification events to a dedicated JSONL file. + + Mirrors :class:`EventLogSubscriber` but filters to ``event.type == + "notification"`` so external tooling can tail a clean stream without + parsing through execution telemetry. + + Args: + workflow_name: Used in the filename for easy identification. + run_id: Optional pre-existing run identifier to embed in the + filename. When ``None``, generates a fresh 8-char hex token. + Pass the matching :class:`EventLogSubscriber.run_id` so the + notifications file and events file share a filename stem. + """ + + def __init__(self, workflow_name: str, run_id: str | None = None) -> None: + import secrets + + ts = time.strftime("%Y%m%d-%H%M%S") + self._run_id = run_id or secrets.token_hex(4) + ts = f"{ts}-{self._run_id}" + self._path = ( + Path(tempfile.gettempdir()) + / "conductor" + / f"conductor-{workflow_name}-{ts}.notifications.jsonl" + ) + self._path.parent.mkdir(parents=True, exist_ok=True) + self._handle = open(self._path, "w", encoding="utf-8") # noqa: SIM115 + + @property + def run_id(self) -> str: + """Unique run identifier (8-char hex).""" + return self._run_id + + @property + def path(self) -> Path: + """Path to the notifications JSONL file.""" + return self._path + + def on_event(self, event: WorkflowEvent) -> None: + """Write a notification event as a JSON line; ignore other event types.""" + if event.type != "notification": + return + if self._handle is None or self._handle.closed: + return + try: + line = json.dumps(_make_json_safe(event.to_dict()), separators=(",", ":")) + self._handle.write(line + "\n") + self._handle.flush() + except Exception: + logger.debug("Failed to write notification event to log", exc_info=True) + + def close(self) -> None: + """Close the log file handle.""" + if self._handle is not None and not self._handle.closed: + self._handle.close() diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index eccc740..24027d2 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -294,6 +294,7 @@ def __init__( run_context: RunContext | None = None, _dashboard_context_path: list[str] | None = None, instructions_preamble: str | None = None, + _parent_correlation: dict[str, Any] | None = None, ) -> None: """Initialize the WorkflowEngine. @@ -431,6 +432,20 @@ def __init__( # without inferring parentage from activeContextPath. self._dashboard_context_path: list[str] = list(_dashboard_context_path or []) + # Notification correlation. The parent engine's snapshotted correlation + # map is passed in via _parent_correlation; this engine layers its own + # declared correlation keys (resolved from workflow.input at run start) + # underneath, with parent values winning on collision so deeply-nested + # emissions preserve the upstream trail. Populated in ``run``. + self._parent_correlation: dict[str, Any] = dict(_parent_correlation or {}) + self._correlation_snapshot: dict[str, Any] = dict(self._parent_correlation) + + # Pre-instantiated notification envelope builder. Stateless, so a + # single instance per engine is fine. + from conductor.executor.notification import NotificationExecutor + + self._notification_executor = NotificationExecutor() + @property def _workflow_dir(self) -> Path | None: """Resolved parent directory of the workflow file, or None if unset.""" @@ -855,6 +870,29 @@ async def _execute_with_agent_timeout( timeout_seconds=agent.timeout_seconds, ) from e + def _snapshot_correlation(self, inputs: dict[str, Any]) -> None: + """Snapshot correlation values from workflow inputs. + + Resolves every key declared in ``workflow.notifications.correlation`` + against *inputs*, then merges the result under the parent engine's + correlation map. Parent values win on collision so deeply-nested + emissions preserve the upstream trail. + + Stores the resolved map on ``self._correlation_snapshot`` for the + :class:`NotificationExecutor` to pick up at emission time. + + Args: + inputs: Merged workflow input values (defaults applied). + """ + own: dict[str, Any] = {} + notif = self.config.workflow.notifications + if notif is not None: + for key in notif.correlation: + if key in inputs: + own[key] = inputs[key] + # Parent wins on key collision + self._correlation_snapshot = {**own, **self._parent_correlation} + def _build_subworkflow_inputs( self, agent: AgentDef, @@ -1141,6 +1179,7 @@ async def _execute_subworkflow( slot_key or agent.name, ], instructions_preamble=child_preamble, + _parent_correlation=self._correlation_snapshot, ) return await child_engine.run(sub_inputs) @@ -1252,6 +1291,7 @@ async def _execute_subworkflow_with_inputs( else: child_preamble = _wrap_preamble(sub_inner) child_engine_kwargs["instructions_preamble"] = child_preamble + child_engine_kwargs["_parent_correlation"] = self._correlation_snapshot child_engine = WorkflowEngine(**child_engine_kwargs) @@ -1345,6 +1385,7 @@ async def run(self, inputs: dict[str, Any]) -> dict[str, Any]: # Apply defaults from input schema for optional inputs not provided merged_inputs = self._apply_input_defaults(inputs) self.context.set_workflow_inputs(merged_inputs) + self._snapshot_correlation(merged_inputs) self.limits.start() current_agent_name = self.config.workflow.entry_point @@ -1375,6 +1416,12 @@ async def resume(self, current_agent_name: str) -> dict[str, Any]: # Fresh timeout window for resumed execution self.limits.start_time = _time.monotonic() + # Rebuild correlation snapshot from restored context inputs so resumed + # runs emit notifications with the same correlation keys as the original. + restored_inputs = self.context.get_for_template().get("workflow", {}).get("input", {}) + if isinstance(restored_inputs, dict): + self._snapshot_correlation(restored_inputs) + # Execute on_start hook (signals resume) self._execute_hook("on_start") @@ -2479,6 +2526,111 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: ) continue + # Handle notification steps (fire-and-forget visibility) + if agent.type == "notification": + agent_context = self.context.build_for_agent( + agent.name, + agent.input, + mode=self.config.workflow.context.mode, + agent_type=agent.type, + ) + notif_execution_count = ( + self.limits.get_agent_execution_count(agent.name) + 1 + ) + + self._emit( + "notification_started", + { + "agent_name": agent.name, + "iteration": notif_execution_count, + "notification_type": agent.notification, + }, + ) + + notif_config = self.config.workflow.notifications + # The schema/validator both guarantee a non-None + # notifications block exists when a notification step + # is present, but assert defensively at runtime so + # the type narrowing is explicit for callers. + assert notif_config is not None # noqa: S101 + + try: + envelope = self._notification_executor.build_envelope( + agent, + notif_config, + agent_context, + workflow_name=self.config.workflow.name, + run_id=self._run_id, + subworkflow_path=self._dashboard_context_path, + iteration=notif_execution_count, + correlation=self._correlation_snapshot, + workflow_metadata=self.config.workflow.metadata, + ) + except Exception as exc: + self._emit( + "notification_failed", + { + "agent_name": agent.name, + "notification_type": agent.notification, + "error_type": type(exc).__name__, + "message": str(exc), + }, + ) + raise + + # The domain notification itself — this is what + # external tooling tails the notifications.jsonl + # subscriber for. + self._emit("notification", envelope) + + self._emit( + "notification_completed", + { + "agent_name": agent.name, + "notification_type": agent.notification, + "emission_id": envelope["emission_id"], + "schema_id": envelope["schema_id"], + }, + ) + + # Store an empty dict so routes can fire without + # exposing notification internals as agent output + # (consistent with the "no response flow" contract). + self.context.store(agent.name, {}) + self.limits.record_execution(agent.name) + self.limits.check_timeout() + + route_result = self._evaluate_routes(agent, {}) + + self._emit( + "route_taken", + { + "from_agent": agent.name, + "to_agent": route_result.target, + }, + ) + + if route_result.target == "$end": + result = self._build_final_output(route_result.output_transform) + self._emit( + "workflow_completed", + { + "elapsed": _time.time() - _workflow_start, + "output": result, + }, + ) + self._execute_hook("on_complete", result=result) + return result + + current_agent_name = route_result.target + + interrupt_result = await self._check_interrupt(current_agent_name) + if interrupt_result is not None: + current_agent_name = await self._handle_interrupt_result( + interrupt_result, current_agent_name + ) + continue + # Build context for this agent agent_context = self.context.build_for_agent( agent.name, diff --git a/src/conductor/executor/notification.py b/src/conductor/executor/notification.py new file mode 100644 index 0000000..9dd065b --- /dev/null +++ b/src/conductor/executor/notification.py @@ -0,0 +1,235 @@ +"""Notification step executor for Conductor workflows. + +Renders a notification step's payload via Jinja2, validates each rendered +value against the declared :class:`OutputField` schema, and returns a +fully-built envelope ready to emit as a ``notification`` event. + +Notifications are a fire-and-forget visibility primitive — there is no +provider call and no side effect beyond constructing the envelope. The +engine is responsible for actually emitting the event. +""" + +from __future__ import annotations + +import json +import re +from typing import TYPE_CHECKING, Any + +from conductor.exceptions import ValidationError +from conductor.executor.template import TemplateRenderer + +if TYPE_CHECKING: + from conductor.config.schema import AgentDef, NotificationsConfig, OutputField + + +# Same regex used in schema.py for namespace validation. Kept local so the +# executor can slug a workflow name into a valid namespace at envelope-build +# time without importing the regex constant cross-module. +_NAMESPACE_PATTERN = re.compile(r"^[a-z_][a-z0-9_]*(\.[a-z_][a-z0-9_]*)*$") + + +def slug_namespace(workflow_name: str) -> str: + """Slugify *workflow_name* into a valid dotted-identifier namespace. + + Lowercases, replaces any character outside ``[a-z0-9_.]`` with ``_``, + and prepends ``_`` if the result would otherwise start with a digit + or a dot. Used to derive a default namespace when the workflow author + did not set ``notifications.namespace`` explicitly. + """ + s = re.sub(r"[^a-z0-9_.]", "_", workflow_name.lower()) + if not s or not re.match(r"^[a-z_]", s): + s = "_" + s + return s + + +def build_step_path(subworkflow_path: list[str], step_name: str) -> str: + """Build the dotted step-path component of an ``emission_id``. + + Joins the engine's ``subworkflow_path`` slot keys with the step name + using ``/`` (matches the dashboard's existing path convention). + """ + if subworkflow_path: + return "/".join([*subworkflow_path, step_name]) + return step_name + + +def build_emission_id(run_id: str, step_path: str, iteration: int) -> str: + """Build a stable ``emission_id`` for a single notification emission. + + Format: ``::``. Deterministic across + resume and replay so the first downstream consumer can dedupe. + """ + return f"{run_id}:{step_path}:{iteration}" + + +def _validate_value(value: Any, field: OutputField, path: str) -> None: + """Validate *value* against an :class:`OutputField` schema. + + Recurses into array items and object properties when the field + declares them. Raises :class:`ValidationError` on mismatch with a + dotted ``path`` identifying the offending location in the payload. + """ + expected = field.type + if expected == "string": + if not isinstance(value, str): + raise ValidationError( + f"Notification payload field '{path}' must be a string, got {type(value).__name__}" + ) + elif expected == "number": + if isinstance(value, bool) or not isinstance(value, (int, float)): + raise ValidationError( + f"Notification payload field '{path}' must be a number, got {type(value).__name__}" + ) + elif expected == "boolean": + if not isinstance(value, bool): + raise ValidationError( + f"Notification payload field '{path}' must be a boolean, got {type(value).__name__}" + ) + elif expected == "array": + if not isinstance(value, list): + raise ValidationError( + f"Notification payload field '{path}' must be an array, got {type(value).__name__}" + ) + if field.items is not None: + for i, item in enumerate(value): + _validate_value(item, field.items, f"{path}[{i}]") + elif expected == "object": + if not isinstance(value, dict): + raise ValidationError( + f"Notification payload field '{path}' must be an object, got {type(value).__name__}" + ) + if field.properties is not None: + for prop_name, prop_schema in field.properties.items(): + if prop_name in value: + _validate_value(value[prop_name], prop_schema, f"{path}.{prop_name}") + + +def _coerce_rendered(value: Any, field_type: str) -> Any: + """Best-effort coerce a Jinja2-rendered string to the declared type. + + Jinja2 rendering produces a string. For non-string declared types we + try ``json.loads`` so ``"42"`` becomes ``int(42)`` and ``"[1,2]"`` + becomes a list. Falls back to the raw value if parsing fails or the + value is already non-string (e.g. a dict literal passed through). + """ + if field_type == "string": + return value + if not isinstance(value, str): + return value + try: + return json.loads(value) + except (json.JSONDecodeError, ValueError): + return value + + +class NotificationExecutor: + """Builds the envelope for a ``type=notification`` step. + + The executor is pure — it renders templates, validates types, and + returns a dict ready to ship as the ``data`` of a ``notification`` + event. It does not call into the event emitter; the engine does that. + """ + + def __init__(self) -> None: + self._renderer = TemplateRenderer() + + def build_envelope( + self, + agent: AgentDef, + notifications_config: NotificationsConfig, + context: dict[str, Any], + *, + workflow_name: str, + run_id: str, + subworkflow_path: list[str], + iteration: int, + correlation: dict[str, Any], + workflow_metadata: dict[str, Any], + ) -> dict[str, Any]: + """Build the full notification envelope for emission. + + Raises: + ValidationError: If the referenced type is undeclared, if + payload keys don't match the declared schema, or if any + rendered value fails type validation. + """ + if agent.notification is None or agent.payload is None: + raise ValidationError( + f"Notification step '{agent.name}' is missing 'notification' or 'payload' " + "(this should have been caught by schema validation)" + ) + + type_name = agent.notification + type_def = notifications_config.types.get(type_name) + if type_def is None: + available = ", ".join(sorted(notifications_config.types.keys())) or "(none)" + raise ValidationError( + f"Notification step '{agent.name}' references undeclared notification " + f"type '{type_name}'. Declared types: {available}", + suggestion=( + f"Add a '{type_name}' entry under workflow.notifications.types, " + "or change the step to reference an existing type." + ), + ) + + declared_fields = set(type_def.payload.keys()) + provided_fields = set(agent.payload.keys()) + missing = declared_fields - provided_fields + extra = provided_fields - declared_fields + if missing or extra: + parts = [] + if missing: + parts.append(f"missing field(s): {', '.join(sorted(missing))}") + if extra: + parts.append(f"unexpected field(s): {', '.join(sorted(extra))}") + raise ValidationError( + f"Notification step '{agent.name}' payload for type '{type_name}' " + f"does not match declared schema: {'; '.join(parts)}", + suggestion=(f"Declared fields: {', '.join(sorted(declared_fields)) or '(none)'}"), + ) + + rendered_payload: dict[str, Any] = {} + for field_name, template_value in agent.payload.items(): + field_schema = type_def.payload[field_name] + if isinstance(template_value, str): + try: + rendered = self._renderer.render(template_value, context) + except Exception as e: + raise ValidationError( + f"Failed to render payload field '{field_name}' for " + f"notification step '{agent.name}': {e}" + ) from e + rendered = _coerce_rendered(rendered, field_schema.type) + else: + rendered = template_value + _validate_value(rendered, field_schema, field_name) + rendered_payload[field_name] = rendered + + namespace = notifications_config.namespace or slug_namespace(workflow_name) + if not _NAMESPACE_PATTERN.match(namespace): + raise ValidationError( + f"Resolved namespace '{namespace}' is not a valid dotted identifier", + suggestion=( + "Set workflow.notifications.namespace explicitly to a valid value " + "(e.g. 'my_pkg.my_workflow')." + ), + ) + + step_path = build_step_path(subworkflow_path, agent.name) + emission_id = build_emission_id(run_id, step_path, iteration) + schema_id = f"{namespace}.{type_name}@{type_def.version}" + + return { + "emission_id": emission_id, + "schema_id": schema_id, + "notification_type": type_name, + "namespace": namespace, + "version": type_def.version, + "run_id": run_id, + "workflow": workflow_name, + "source_agent": agent.name, + "subworkflow_path": list(subworkflow_path), + "correlation": dict(correlation), + "workflow_metadata": dict(workflow_metadata), + "payload": rendered_payload, + } diff --git a/tests/test_config/test_notifications.py b/tests/test_config/test_notifications.py new file mode 100644 index 0000000..87f0362 --- /dev/null +++ b/tests/test_config/test_notifications.py @@ -0,0 +1,182 @@ +"""Tests for notification-related schema and validator behavior. + +Lightweight surface coverage: positive case + the negative cases that +are most likely to bite authors (undeclared type, payload mismatch, +unknown correlation key, bad namespace). +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError as PydanticValidationError + +from conductor.config.schema import ( + AgentDef, + InputDef, + LimitsConfig, + NotificationsConfig, + NotificationTypeDef, + OutputField, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.config.validator import validate_workflow_config +from conductor.exceptions import ConfigurationError + + +def _make_config( + *, + notifications: NotificationsConfig | None, + agents: list[AgentDef], + inputs: dict[str, InputDef] | None = None, +) -> WorkflowConfig: + return WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point=agents[0].name, + runtime=RuntimeConfig(provider="copilot"), + limits=LimitsConfig(max_iterations=10), + input=inputs or {}, + notifications=notifications, + ), + agents=agents, + ) + + +class TestNotificationSchema: + def test_valid_notification_step(self) -> None: + agent = AgentDef( + name="announce", + type="notification", + notification="pr_ready", + payload={"pr_url": "https://x/1"}, + ) + assert agent.notification == "pr_ready" + assert agent.payload == {"pr_url": "https://x/1"} + + def test_notification_step_without_notification_field_raises(self) -> None: + with pytest.raises(PydanticValidationError, match="notification"): + AgentDef(name="bad", type="notification", payload={"x": "y"}) + + def test_notification_step_without_payload_raises(self) -> None: + with pytest.raises(PydanticValidationError, match="payload"): + AgentDef(name="bad", type="notification", notification="pr_ready") + + def test_notification_step_with_prompt_raises(self) -> None: + with pytest.raises(PydanticValidationError, match="cannot have 'prompt'"): + AgentDef( + name="bad", + type="notification", + notification="pr_ready", + payload={"x": "y"}, + prompt="no", + ) + + def test_notification_fields_on_non_notification_step_raises(self) -> None: + with pytest.raises(PydanticValidationError, match="notification"): + AgentDef(name="bad", prompt="hi", notification="pr_ready") + + def test_invalid_namespace_rejected(self) -> None: + with pytest.raises(PydanticValidationError, match="dotted lowercase"): + NotificationsConfig(namespace="Bad-Name", types={}) + + +class TestNotificationValidator: + def _types(self) -> dict[str, NotificationTypeDef]: + return { + "pr_ready": NotificationTypeDef( + payload={ + "pr_url": OutputField(type="string"), + "pr_id": OutputField(type="number"), + } + ) + } + + def test_valid_workflow_passes(self) -> None: + config = _make_config( + inputs={"apex_id": InputDef(type="string")}, + notifications=NotificationsConfig( + namespace="ns", + correlation=["apex_id"], + types=self._types(), + ), + agents=[ + AgentDef( + name="announce", + type="notification", + notification="pr_ready", + payload={"pr_url": "{{ workflow.input.apex_id }}", "pr_id": "1"}, + routes=[RouteDef(to="$end")], + ) + ], + ) + validate_workflow_config(config) + + def test_undeclared_type_rejected(self) -> None: + config = _make_config( + notifications=NotificationsConfig(types=self._types()), + agents=[ + AgentDef( + name="bad", + type="notification", + notification="not_declared", + payload={}, + routes=[RouteDef(to="$end")], + ) + ], + ) + with pytest.raises(ConfigurationError, match="undeclared notification type"): + validate_workflow_config(config) + + def test_payload_field_mismatch_rejected(self) -> None: + config = _make_config( + notifications=NotificationsConfig(types=self._types()), + agents=[ + AgentDef( + name="bad", + type="notification", + notification="pr_ready", + payload={"pr_url": "x"}, # missing pr_id + routes=[RouteDef(to="$end")], + ) + ], + ) + with pytest.raises(ConfigurationError, match="missing field"): + validate_workflow_config(config) + + def test_unknown_correlation_key_rejected(self) -> None: + config = _make_config( + notifications=NotificationsConfig( + correlation=["not_an_input"], + types=self._types(), + ), + agents=[ + AgentDef( + name="announce", + type="notification", + notification="pr_ready", + payload={"pr_url": "x", "pr_id": "1"}, + routes=[RouteDef(to="$end")], + ) + ], + ) + with pytest.raises(ConfigurationError, match="correlation key"): + validate_workflow_config(config) + + def test_notification_step_without_block_rejected(self) -> None: + config = _make_config( + notifications=None, + agents=[ + AgentDef( + name="bad", + type="notification", + notification="pr_ready", + payload={}, + routes=[RouteDef(to="$end")], + ) + ], + ) + with pytest.raises(ConfigurationError, match="no 'workflow.notifications' block"): + validate_workflow_config(config) diff --git a/tests/test_executor/test_notification.py b/tests/test_executor/test_notification.py new file mode 100644 index 0000000..86a342c --- /dev/null +++ b/tests/test_executor/test_notification.py @@ -0,0 +1,186 @@ +"""Tests for the NotificationExecutor envelope builder. + +Covers the parts of the envelope that downstream consumers (Polyphony, +JSONL tail subscribers) will depend on: the deterministic ``emission_id``, +the namespaced ``schema_id``, payload rendering + type validation, and +correlation propagation. +""" + +from __future__ import annotations + +import pytest + +from conductor.config.schema import ( + AgentDef, + NotificationsConfig, + NotificationTypeDef, + OutputField, +) +from conductor.exceptions import ValidationError +from conductor.executor.notification import ( + NotificationExecutor, + build_emission_id, + slug_namespace, +) + + +@pytest.fixture +def executor() -> NotificationExecutor: + return NotificationExecutor() + + +@pytest.fixture +def pr_ready_config() -> NotificationsConfig: + return NotificationsConfig( + namespace="polyphony.feature_pr", + correlation=["apex_id"], + types={ + "pr_ready": NotificationTypeDef( + version=1, + payload={ + "pr_url": OutputField(type="string"), + "pr_id": OutputField(type="number"), + }, + ), + }, + ) + + +def _step(name: str = "announce_ready") -> AgentDef: + return AgentDef( + name=name, + type="notification", + notification="pr_ready", + payload={ + "pr_url": "{{ workflow.input.pr_url }}", + "pr_id": "{{ workflow.input.pr_id }}", + }, + ) + + +class TestEnvelope: + def test_envelope_shape( + self, executor: NotificationExecutor, pr_ready_config: NotificationsConfig + ) -> None: + env = executor.build_envelope( + _step(), + pr_ready_config, + context={"workflow": {"input": {"pr_url": "https://x/42", "pr_id": "42"}}}, + workflow_name="feature-pr", + run_id="run123", + subworkflow_path=[], + iteration=1, + correlation={"apex_id": "apex-1"}, + workflow_metadata={}, + ) + + assert env["schema_id"] == "polyphony.feature_pr.pr_ready@1" + assert env["namespace"] == "polyphony.feature_pr" + assert env["notification_type"] == "pr_ready" + assert env["version"] == 1 + assert env["emission_id"] == "run123:announce_ready:1" + assert env["source_agent"] == "announce_ready" + assert env["correlation"] == {"apex_id": "apex-1"} + # number field rendered from a string template gets json-coerced + assert env["payload"] == {"pr_url": "https://x/42", "pr_id": 42} + + def test_emission_id_includes_subworkflow_path( + self, executor: NotificationExecutor, pr_ready_config: NotificationsConfig + ) -> None: + env = executor.build_envelope( + _step(), + pr_ready_config, + context={"workflow": {"input": {"pr_url": "x", "pr_id": "1"}}}, + workflow_name="feature-pr", + run_id="r1", + subworkflow_path=["wave_dispatch", "dispatch_items.3"], + iteration=2, + correlation={}, + workflow_metadata={}, + ) + assert env["emission_id"] == "r1:wave_dispatch/dispatch_items.3/announce_ready:2" + assert env["subworkflow_path"] == ["wave_dispatch", "dispatch_items.3"] + + def test_namespace_defaults_to_slugified_workflow_name( + self, executor: NotificationExecutor + ) -> None: + config = NotificationsConfig( + types={ + "pr_ready": NotificationTypeDef( + payload={"pr_url": OutputField(type="string")}, + ) + } + ) + env = executor.build_envelope( + AgentDef( + name="n", + type="notification", + notification="pr_ready", + payload={"pr_url": "x"}, + ), + config, + context={}, + workflow_name="My Feature PR!", + run_id="r", + subworkflow_path=[], + iteration=1, + correlation={}, + workflow_metadata={}, + ) + assert env["namespace"] == slug_namespace("My Feature PR!") + assert env["schema_id"].startswith(env["namespace"] + ".pr_ready@") + + def test_wrong_payload_type_raises( + self, executor: NotificationExecutor, pr_ready_config: NotificationsConfig + ) -> None: + bad = AgentDef( + name="bad", + type="notification", + notification="pr_ready", + payload={"pr_url": "ok", "pr_id": "not-a-number"}, + ) + with pytest.raises(ValidationError, match="pr_id"): + executor.build_envelope( + bad, + pr_ready_config, + context={}, + workflow_name="w", + run_id="r", + subworkflow_path=[], + iteration=1, + correlation={}, + workflow_metadata={}, + ) + + def test_undeclared_type_raises( + self, executor: NotificationExecutor, pr_ready_config: NotificationsConfig + ) -> None: + bad = AgentDef( + name="bad", + type="notification", + notification="not_a_type", + payload={}, + ) + with pytest.raises(ValidationError, match="undeclared notification type"): + executor.build_envelope( + bad, + pr_ready_config, + context={}, + workflow_name="w", + run_id="r", + subworkflow_path=[], + iteration=1, + correlation={}, + workflow_metadata={}, + ) + + +class TestHelpers: + def test_slug_namespace(self) -> None: + assert slug_namespace("feature-pr") == "feature_pr" + assert slug_namespace("Polyphony.Feature_PR") == "polyphony.feature_pr" + # leading digit gets prefixed + assert slug_namespace("2nd").startswith("_") + + def test_emission_id_format(self) -> None: + assert build_emission_id("r1", "a/b", 3) == "r1:a/b:3"