Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,56 @@ make validate-examples # validate all examples
7. Routes evaluated via `Router` using Jinja2 or simpleeval expressions
8. Final output built from templates in `output:` section

### Notifications (domain events)

A `type: notification` step lets workflow authors publish typed, declared **domain events** to a dedicated JSONL stream for external tooling to hook off. Distinct from execution events (`agent_started`, `route_evaluated`, …): execution events describe what the runtime is doing; notifications describe what the workflow is asserting to the outside world.

**Notifications are a visibility primitive, not a control-flow primitive.** They do not block, do not wait for an ack, and do not gate routing. If a decision needs to flow back into the workflow, use `human_gate`. Routes on a notification step fire independently of emission.

YAML surface (full example in `examples/notifications.yaml`):

```yaml
workflow:
notifications:
namespace: polyphony.feature_pr # optional; defaults to slugified workflow name
correlation: [apex_id] # workflow input keys auto-surfaced + inherited by sub-workflows
types:
pr_ready:
version: 1
payload:
pr_url: { type: string }
pr_id: { type: number }

agents:
- name: announce_ready
type: notification
notification: pr_ready
payload:
pr_url: "{{ open_pr.output.url }}"
pr_id: "{{ open_pr.output.id }}"
routes:
- to: poll_for_review # continues; emission is independent of routing
```

**Envelope** (the `data` of each `notification` event):

- `emission_id` — `<run_id>:<dotted_step_path>:<iteration>`; stable across resume/replay so consumers can dedupe at the first hop.
- `schema_id` — `<namespace>.<type>@<version>`; consumers MUST pin on this, not on `notification_type` alone.
- `namespace`, `notification_type`, `version`, `run_id`, `workflow`, `source_agent`, `subworkflow_path`.
- `correlation` — first-class field, auto-merged from declared correlation keys and inherited from parent workflows (parent wins on key collision so the upstream trail survives).
- `workflow_metadata` — passthrough from `workflow.metadata`.
- `payload` — declared per-type fields, Jinja2-rendered then type-validated against the `OutputField` schema.

**Delivery (v1):** every emission flows through `WorkflowEventEmitter` and is written to `$TMPDIR/conductor/conductor-<workflow>-<ts>-<run_id>.notifications.jsonl` by `NotificationLogSubscriber`. The path is printed at end-of-run alongside the event log path. No webhooks, shell hooks, or MCP-driven emission in v1.

**Schema-version-bump policy** (documented; not enforced):
- Adding an optional field is non-breaking; do **not** bump `version`.
- Removing/renaming a field, changing a field's type, or making an optional field required is breaking; bump `version` and emit under the new `schema_id`.
- During a transition a workflow MAY emit both `pr_ready@1` and `pr_ready@2` from adjacent steps.
- Consumers MUST pin on `schema_id`.

**Validation failures** (undeclared type, payload field mismatch, unknown correlation key, type mismatch on a rendered value) fail the workflow loudly with `ValidationError`. The contract is the whole point.

### Key Patterns

- **Context modes**: `accumulate` (all prior outputs), `last_only` (previous only), `explicit` (only declared inputs)
Expand Down
125 changes: 125 additions & 0 deletions examples/notifications.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Notifications Example
#
# Demonstrates user-defined domain notifications: fire-and-forget messages
# the workflow publishes to the outside world for external tooling to hook
# off. Notifications are a VISIBILITY primitive — they do not block, do
# not wait for an ack, and do not gate control flow. If you need a
# decision to flow back into the workflow, use a human_gate instead.
#
# Each emission is written to a dedicated notifications.jsonl file
# alongside the standard event log. The file path is printed at the end
# of the run.
#
# Usage:
# conductor run examples/notifications.yaml \
# --input pr_id=42 \
# --input apex_id=apex-2026-05-18

workflow:
name: notifications-demo
description: Demonstrates user-defined fire-and-forget notifications
version: "1.0.0"
entry_point: open_pr

runtime:
provider: copilot

limits:
max_iterations: 10

input:
pr_id:
type: number
description: ADO pull-request id
apex_id:
type: string
description: Apex run id (correlation key)
ado_org:
type: string
default: contoso
ado_project:
type: string
default: feature-pr

# Workflow-level notification configuration.
notifications:
# Optional. Defaults to the slugified workflow name.
namespace: polyphony.feature_pr

# Workflow input keys auto-surfaced on every notification. Inherited
# by sub-workflows (parent wins on key collision).
correlation:
- apex_id

# Type registry. Each entry declares a fixed payload schema and a
# version. Consumers MUST pin on schema_id (<namespace>.<type>@<version>),
# not on type name alone.
types:
pr_ready:
version: 1
description: A pull request has reached ready-for-review state.
payload:
pr_url: { type: string }
pr_id: { type: number }
ado_org: { type: string }
ado_project: { type: string }

pr_needs_input:
version: 1
description: A PR has changed and requires reviewer input.
payload:
pr_url: { type: string }
change_summary: { type: string }

agents:
- name: open_pr
description: Stand-in for whatever opens / discovers the PR.
model: claude-haiku-4.5
prompt: |
Pretend you are creating pull request {{ workflow.input.pr_id }} in
{{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}.
Reply with a one-sentence summary of what was done.
output:
summary:
type: string
routes:
- to: announce_ready

# Notifications fire mid-flow. Note we route to the next step, NOT to
# $end — emission is independent of routing.
- name: announce_ready
type: notification
notification: pr_ready
payload:
pr_url: "https://dev.azure.com/{{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}/_git/repo/pullrequest/{{ workflow.input.pr_id }}"
pr_id: "{{ workflow.input.pr_id }}"
ado_org: "{{ workflow.input.ado_org }}"
ado_project: "{{ workflow.input.ado_project }}"
routes:
- to: detect_change

- name: detect_change
description: Stand-in for whatever notices the PR changed.
model: claude-haiku-4.5
prompt: |
Pretend a reviewer pushed a small change to PR
{{ workflow.input.pr_id }}. Reply with a one-line description of
the change.
output:
change:
type: string
routes:
- to: announce_needs_input

- name: announce_needs_input
type: notification
notification: pr_needs_input
payload:
pr_url: "https://dev.azure.com/{{ workflow.input.ado_org }}/{{ workflow.input.ado_project }}/_git/repo/pullrequest/{{ workflow.input.pr_id }}"
change_summary: "{{ detect_change.output.change }}"
routes:
- to: $end

output:
pr_summary: "{{ open_pr.output.summary }}"
change: "{{ detect_change.output.change }}"
33 changes: 32 additions & 1 deletion src/conductor/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ async def run_workflow_async(
# Always create event emitter and JSONL log subscriber
emitter = WorkflowEventEmitter()
event_log_subscriber: Any = None
notification_log_subscriber: Any = None
dashboard: Any = None

if web:
Expand Down Expand Up @@ -1198,11 +1199,20 @@ async def run_workflow_async(
verbose_log(f"Agents: {len(config.agents)}")

# Start JSONL event log subscriber (always-on structured diagnostics)
from conductor.engine.event_log import EventLogSubscriber
from conductor.engine.event_log import EventLogSubscriber, NotificationLogSubscriber

event_log_subscriber = EventLogSubscriber(config.workflow.name)
emitter.subscribe(event_log_subscriber.on_event)

# Dedicated notifications.jsonl for external tooling that only cares
# about domain notifications, not execution telemetry. Share the same
# run_id so the two files have aligned filename stems.
notification_log_subscriber = NotificationLogSubscriber(
config.workflow.name,
run_id=event_log_subscriber.run_id,
)
emitter.subscribe(notification_log_subscriber.on_event)

# Subscribe console output to the event emitter
console_subscriber = ConsoleEventSubscriber()
emitter.subscribe(console_subscriber.on_event)
Expand Down Expand Up @@ -1341,6 +1351,11 @@ async def run_workflow_async(
if event_log_subscriber is not None:
event_log_subscriber.close()
_verbose_console.print(f"[dim]Event log written to: {event_log_subscriber.path}[/dim]")
if notification_log_subscriber is not None:
notification_log_subscriber.close()
_verbose_console.print(
f"[dim]Notifications written to: {notification_log_subscriber.path}[/dim]"
)

# Report log file path to stderr and close file logging
if log_file is not None and _file_console is not None:
Expand Down Expand Up @@ -1623,6 +1638,7 @@ async def resume_workflow_async(
# Always create event emitter and JSONL log subscriber (parity with run)
emitter = WorkflowEventEmitter()
event_log_subscriber: Any = None
notification_log_subscriber: Any = None
dashboard: Any = None

try:
Expand Down Expand Up @@ -1775,6 +1791,16 @@ async def resume_workflow_async(
)
emitter.subscribe(event_log_subscriber.on_event)

# Dedicated notifications.jsonl (parity with run). Shares the
# run_id stem with the event log so the two files line up.
from conductor.engine.event_log import NotificationLogSubscriber

notification_log_subscriber = NotificationLogSubscriber(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick confirmation: on resume the dashboard already replays the main event log (dashboard.replay_events_from_jsonl at line 1823), and notification events flow through the same emitter — so old emissions will render in the dashboard on resume. The separate .notifications.jsonl is fresh per resume, which is fine for external tailers given the stable emission_id dedup. No action needed; flagging only to confirm the behavior is intentional.

config.workflow.name,
run_id=event_log_subscriber.run_id,
)
emitter.subscribe(notification_log_subscriber.on_event)

# Subscribe console output to the event emitter (parity with run)
console_subscriber = ConsoleEventSubscriber()
emitter.subscribe(console_subscriber.on_event)
Expand Down Expand Up @@ -1904,6 +1930,11 @@ async def resume_workflow_async(
if event_log_subscriber is not None:
event_log_subscriber.close()
_verbose_console.print(f"[dim]Event log written to: {event_log_subscriber.path}[/dim]")
if notification_log_subscriber is not None:
notification_log_subscriber.close()
_verbose_console.print(
f"[dim]Notifications written to: {notification_log_subscriber.path}[/dim]"
)

# Report log file path to stderr and close file logging
if log_file is not None and _file_console is not None:
Expand Down
Loading
Loading