Skip to content

feat(agent-runtime): support custom SSE event types in streamRun#427

Merged
jerryliang64 merged 2 commits intomasterfrom
feat/stream-custom-events
Apr 1, 2026
Merged

feat(agent-runtime): support custom SSE event types in streamRun#427
jerryliang64 merged 2 commits intomasterfrom
feat/stream-custom-events

Conversation

@jerryliang64
Copy link
Copy Markdown
Contributor

@jerryliang64 jerryliang64 commented Apr 1, 2026

Summary

  • When AgentStreamMessage.type is set, consumeStreamMessages uses it as the SSE event name instead of the default thread.message.delta
  • Content blocks from custom events are still accumulated for storage (thread history)
  • Messages without type continue to use thread.message.delta (fully backward compatible)

This enables upstream executors to emit fine-grained events like:

  • run.initialized — session ready
  • assistant.turn.started — new assistant turn begins
  • assistant.text.delta — text streaming chunk
  • assistant.thinking.delta — thinking streaming chunk
  • tool.started — tool invocation begins
  • tool.delta — tool input streaming
  • tool.completed — tool result returned
  • assistant.message.completed — turn finished

Test plan

  • Custom event types are forwarded as SSE event names
  • Content blocks are carried in custom events when present
  • No-content events (e.g. turn.started) emit with content: undefined
  • Text content from custom events is accumulated for storage
  • Framework lifecycle events (thread.run., thread.message.) still emitted
  • Backward compatible: no-type messages still use thread.message.delta
  • 110 passing, 8 failing (same 8 as master)

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Streaming now forwards custom SSE event types with structured payloads; content is included when present and omitted when empty, while standard thread events and token accumulation remain intact.
  • Tests

    • Added tests covering custom event forwarding, content presence/absence, accumulation of message content, and aggregated token usage; also verifies fallback to standard message deltas.

When AgentStreamMessage.type is set, consumeStreamMessages forwards it
as the SSE event name instead of the default thread.message.delta.
Content blocks are still accumulated for storage.

This enables upstream executors to emit fine-grained events like
assistant.text.delta, tool.started, tool.completed etc., giving
consumers a richer streaming protocol while maintaining backward
compatibility — messages without type continue to use thread.message.delta.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 1, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9cd3ce15-b40a-4b12-9b06-ebb061db18eb

📥 Commits

Reviewing files that changed from the base of the PR and between fce3166 and 147d6f6.

📒 Files selected for processing (2)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/test/AgentRuntime.test.ts
✅ Files skipped from review due to trivial changes (1)
  • core/agent-runtime/test/AgentRuntime.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • core/agent-runtime/src/AgentRuntime.ts

📝 Walkthrough

Walkthrough

The stream consumer in AgentRuntime.ts now branches on msg.type: when present it converts msg.message to content blocks and emits an SSE event with that type (including content when available), skipping the standard per-delta ThreadMessageDelta emission path; token usage and completion aggregation remain unchanged.

Changes

Cohort / File(s) Summary
Stream runtime logic
core/agent-runtime/src/AgentRuntime.ts
Added branching on AgentStreamMessage.type in the stream consumption loop; converts msg.message → content blocks and emits a custom SSE event (writer.writeEvent) with { id, content }, skipping the usual delta emission for that message.
Stream behavior tests
core/agent-runtime/test/AgentRuntime.test.ts
Added tests that emit multiple AgentStreamMessage chunks with custom type values, asserting forwarded custom SSE events carry correct type and data.content (when present), that framework completion events still aggregate accumulated text, and that usage aggregation emits correct totals. Also added fallback test for legacy ThreadMessageDelta when type is absent.

Sequence Diagram(s)

sequenceDiagram
  participant Executor as Executor
  participant Runtime as AgentRuntime
  participant Converter as MessageConverter
  participant Writer as SSEWriter
  Executor->>Runtime: yield AgentStreamMessage (with type & message/usage)
  alt msg.type present
    Runtime->>Converter: toContentBlocks(msg.message)
    Converter-->>Runtime: contentBlocks
    Runtime->>Writer: writeEvent(msg.type, { id, content: contentBlocks or undefined })
  else msg.type absent
    Runtime->>Writer: writeEvent(ThreadMessageDelta, { id, delta: msg.message.delta })
  end
  Runtime->>Runtime: accumulate token usage / run-completion totals
  Runtime->>Writer: writeEvent(ThreadRunCompleted, { id, usage: aggregatedUsage })  
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Poem

🐰 I hopped the stream and found a type,
A tiny event with payload ripe.
I turned the blocks and sent them free,
SSE bells chimed, hooray for me! 🌿✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely describes the main feature addition: supporting custom SSE event types in the streamRun method of agent-runtime.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/stream-custom-events

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for custom event types in the AgentRuntime, allowing messages with a defined type to be forwarded directly to the event writer while still accumulating content for storage. It also includes new test cases to verify custom event forwarding and fallback behavior. A critical issue was identified in the implementation where the use of a continue statement skips the token usage accumulation logic, which would result in the loss of usage statistics for events that contain both a custom type and usage data.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
core/agent-runtime/test/AgentRuntime.test.ts (1)

451-504: Add coverage for usage on custom typed events

Great coverage overall. Please add one typed chunk that also includes usage and assert final run usage totals, so this path is regression-proof.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/test/AgentRuntime.test.ts` around lines 451 - 504, Add a
test case step that includes usage on a custom typed AgentStreamMessage and
assert the aggregated run usage totals after streamRun completes: inside the
existing test "should forward custom event types from AgentStreamMessage.type"
(where executor.execRun yields AgentStreamMessage objects), add one yield with a
custom type whose message includes a usage object (e.g., message.usage = {
prompt_tokens: X, completion_tokens: Y, total_tokens: Z }), then after await
runtime.streamRun(...) locate the final run-completed/framework event (e.g.,
AgentSSEEvent.ThreadRunCompleted or the stored completed event found via
writer.events) and assert that its data contains the expected aggregated usage
totals; ensure you update references to writer.events, customEvents, and the
completedEvent lookup so the new usage-bearing event is included in accumulation
and the final assertion checks totals.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 359-373: When msg.type is present the code returns early (the
continue at the end of the custom-event branch) and thereby skips accounting of
any token usage that may be attached to that chunk; update the custom-event
handling (the branch that calls MessageConverter.toContentBlocks and
writer.writeEvent) to also process msg.usage before continuing — e.g.,
merge/apply msg.usage into the run-level usage accumulator (or call the same
usage-adding helper used in the normal flow) so that usage is not dropped for
messages that include both a custom type and a usage field; ensure the same
change is mirrored for the other branch noted (the code around the existing
usage handling at lines corresponding to 387-391).

---

Nitpick comments:
In `@core/agent-runtime/test/AgentRuntime.test.ts`:
- Around line 451-504: Add a test case step that includes usage on a custom
typed AgentStreamMessage and assert the aggregated run usage totals after
streamRun completes: inside the existing test "should forward custom event types
from AgentStreamMessage.type" (where executor.execRun yields AgentStreamMessage
objects), add one yield with a custom type whose message includes a usage object
(e.g., message.usage = { prompt_tokens: X, completion_tokens: Y, total_tokens: Z
}), then after await runtime.streamRun(...) locate the final
run-completed/framework event (e.g., AgentSSEEvent.ThreadRunCompleted or the
stored completed event found via writer.events) and assert that its data
contains the expected aggregated usage totals; ensure you update references to
writer.events, customEvents, and the completedEvent lookup so the new
usage-bearing event is included in accumulation and the final assertion checks
totals.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f8fd13e3-049a-4611-a0fd-f9772443de3f

📥 Commits

Reviewing files that changed from the base of the PR and between d87ed90 and fce3166.

📒 Files selected for processing (2)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/test/AgentRuntime.test.ts

…stom type

The `continue` statement in the custom event branch caused `msg.usage`
processing to be skipped entirely. Replace with `else if` so usage
accumulation always runs regardless of event type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jerryliang64 jerryliang64 merged commit 2efe539 into master Apr 1, 2026
12 checks passed
@jerryliang64 jerryliang64 deleted the feat/stream-custom-events branch April 1, 2026 09:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant