feat(notifications): user-defined domain notifications (type: notification)#213
feat(notifications): user-defined domain notifications (type: notification)#213PolyphonyRequiem wants to merge 1 commit into
Conversation
…ification) Introduces a new `type: notification` step that lets workflow authors publish typed, declared *domain events* to a dedicated JSONL stream for external tooling to hook off. Distinct from execution events (`agent_started`, `route_evaluated`, ...): execution events describe what the runtime is doing; notifications describe what the workflow is asserting to the outside world. Notifications are a visibility primitive, not a control-flow primitive. They do not block, do not wait for an ack, and do not gate routing. If a decision needs to flow back into the workflow, use `human_gate`. Workflow-level config (`workflow.notifications`): - `namespace` -- dotted-identifier (defaults to slugified workflow name) - `correlation` -- workflow input keys auto-surfaced on every emission; inherited by sub-workflows (parent wins on collision so the upstream trail survives deep nesting) - `types` -- per-type `version` + `payload` schema (reusing `OutputField`) Each emission ships a fixed envelope: - `emission_id` = `<run_id>:<dotted_step_path>:<iteration>` -- stable across resume/replay so the first downstream consumer can dedupe - `schema_id` = `<namespace>.<type>@<version>` -- consumers MUST pin on this, not on `notification_type` alone - `correlation` -- first-class, not buried in metadata - `payload` -- Jinja2-rendered then type-validated Delivery (v1): rides `WorkflowEventEmitter`; `NotificationLogSubscriber` writes a dedicated `...notifications.jsonl` next to the existing event log. No webhooks, shell hooks, or model-driven emission in v1. Schema-version-bump policy documented in AGENTS.md (additive = no bump; remove/rename/retype/required = bump; consumers MUST pin on schema_id). Validation failures (undeclared type, payload field mismatch, unknown correlation key, rendered-value type mismatch) fail the workflow loudly with `ValidationError` -- the contract is the point. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #213 +/- ##
=======================================
Coverage ? 87.67%
=======================================
Files ? 61
Lines ? 9943
Branches ? 0
=======================================
Hits ? 8718
Misses ? 1225
Partials ? 0 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
jrob5756
left a comment
There was a problem hiding this comment.
Nice work. I have a few questions/comments below
| continue | ||
|
|
||
| # Handle notification steps (fire-and-forget visibility) | ||
| if agent.type == "notification": |
There was a problem hiding this comment.
Parallel / for-each containment is undefined. This dispatch lives only in the sequential path; behavior inside parallel: or for_each: is unspecified and get_agent_execution_count won't give per-slot iteration counts (breaking emission_id uniqueness).
Either disallow at validation time, or thread emission through the group dispatchers + add tests.
| # notifications block exists when a notification step | ||
| # is present, but assert defensively at runtime so | ||
| # the type narrowing is explicit for callers. | ||
| assert notif_config is not None # noqa: S101 |
There was a problem hiding this comment.
assert is stripped under python -O — this becomes a no-op and the next line dereferences None. Make it an explicit raise.
| # The domain notification itself — this is what | ||
| # external tooling tails the notifications.jsonl | ||
| # subscriber for. | ||
| self._emit("notification", envelope) |
There was a problem hiding this comment.
Dashboard rendering for the new event types is missing. Per your discussion with Daniel about a bottom-pane tab at the workflow level — worth landing alongside this PR (or as an immediate follow-up) so --web users actually see emissions. Currently the four new event types (notification_started, notification, notification_completed, notification_failed) and the new step type aren't handled in workflow-store.ts or the Cytoscape view.
| # Store an empty dict so routes can fire without | ||
| # exposing notification internals as agent output | ||
| # (consistent with the "no response flow" contract). | ||
| self.context.store(agent.name, {}) |
There was a problem hiding this comment.
Storing {} means {{ announce_ready.output.x }} downstream silently renders empty under accumulate mode — quiet footgun for authors who confuse a notification with a regular agent. Either document the behavior in AGENTS.md, or store a sentinel that lets templates fail loudly.
| # run_id stem with the event log so the two files line up. | ||
| from conductor.engine.event_log import NotificationLogSubscriber | ||
|
|
||
| notification_log_subscriber = NotificationLogSubscriber( |
There was a problem hiding this comment.
Quick confirmation: on resume the dashboard already replays the main event log (dashboard.replay_events_from_jsonl at line 1823), and notification events flow through the same emitter — so old emissions will render in the dashboard on resume. The separate .notifications.jsonl is fresh per resume, which is fine for external tailers given the stable emission_id dedup. No action needed; flagging only to confirm the behavior is intentional.
| return value | ||
| try: | ||
| return json.loads(value) | ||
| except (json.JSONDecodeError, ValueError): |
There was a problem hiding this comment.
Silent fallback to the raw string means a bad template (e.g. "3.14abc" for a number field) fails one frame later with "got str" rather than "failed to coerce 'X' as number". Better author UX to raise here with the offending value.
| "source_agent": agent.name, | ||
| "subworkflow_path": list(subworkflow_path), | ||
| "correlation": dict(correlation), | ||
| "workflow_metadata": dict(workflow_metadata), |
There was a problem hiding this comment.
Nit: workflow_metadata feels redundant in the envelope. Consumers already get run_id + workflow + subworkflow_path and can join against the event log if they need more. Splatting the whole metadata dict into every emission bloats the wire format and couples the notification schema to whatever ad-hoc keys workflows happen to use. Consider dropping it (or opt-in allowlist).
| effort: high | ||
| """ | ||
|
|
||
| notification: str | None = None |
There was a problem hiding this comment.
Nit: type: notification + notification: pr_ready reads redundant. Siblings use script→command, human_gate→dialog. Something like emit: or event: would parse more naturally — easier to rename now than after it ships.
Summary
Adds a new
type: notificationstep that lets workflow authors publish typed, declared domain events to a dedicated JSONL stream for external tooling to hook off.Notifications are distinct from execution events (
agent_started,route_evaluated, ...): execution events describe what the runtime is doing; notifications describe what the workflow is asserting to the outside world.YAML surface
Envelope shape
emission_id=<run_id>:<dotted_step_path>:<iteration>— stable across resume/replay so the first downstream consumer can dedupe.schema_id=<namespace>.<type>@<version>— consumers MUST pin on this, not onnotification_typealone.correlation— first-class field; auto-merged from declared keys and inherited from the parent workflow (parent wins on collision so the upstream trail survives deep nesting).payload— Jinja2-rendered then type-validated against the declaredOutputFieldschema.Delivery (v1)
Every emission flows through
WorkflowEventEmitterand is written byNotificationLogSubscriberto a dedicated…notifications.jsonlfile next to the existing event log (path printed at end-of-run).No webhooks, shell hooks, or model-driven emission in v1.
What's in this PR
src/conductor/config/schema.py—NotificationsConfig+NotificationTypeDef;notificationandpayloadfields onAgentDef; strict positive/negative validators.src/conductor/config/validator.py— cross-references undeclared types, payload field mismatches, unknown correlation keys.src/conductor/executor/notification.py— pure envelope builder (rendering, coercion, type validation, emission-id construction).src/conductor/engine/workflow.py— dispatch branch + correlation snapshot inherited into sub-workflows.src/conductor/engine/event_log.py—NotificationLogSubscriberfilters tonotificationevents only.src/conductor/cli/run.py— wires the subscriber in bothrunandresumepaths.examples/notifications.yaml— runnable example with namespace, correlation, mid-workflow emission.AGENTS.md— full design + schema-version-bump policy + "visibility not control flow" callout.tests/test_config/test_notifications.py+tests/test_executor/test_notification.py(18 focused tests).Schema-version-bump policy (documented, not enforced)
version.versionand emit under the newschema_id.pr_ready@1andpr_ready@2from adjacent steps.schema_id.Validation
Undeclared type, payload field mismatch, unknown correlation key, or a type mismatch on a rendered value all fail the workflow loudly with
ValidationError. The contract is the whole point.Testing
uv run ruff check— cleanuv run ruff format --check— cleanuv run pytest tests/test_config tests/test_executor tests/test_engine tests/test_cli -m "not performance"— 1692 passed, 5 skippeduv run conductor validate examples/notifications.yaml— cleanexamples/*.yamlstill validate clean.Deliberately deferred
notify:onRouteDef) — possible v1.5 sugar reusing the same envelope.human_gateinstead).