feat(realtime): support multi-message generation per response #5763

Open: longcw wants to merge 2 commits into main from longc/multi-message-realtime-v2

@longcw longcw commented May 18, 2026

Summary

  • Process each MessageGeneration from generation_ev.message_stream serially via perform_audio_forwarding + perform_text_forwarding + wait_for_playout. Only one flush is in flight at a time.
  • Per-msg state is derived directly from the playback_finished event:
    • full → emit ChatMessage(interrupted=False) with the msg's message_id
    • partial → emit ChatMessage(interrupted=True) and call _rt_session.truncate(...) with this msg's local playback_position (not a cumulative offset)
    • skipped → drop locally and call update_chat_ctx(...) so the realtime server removes never-played items from its history
  • _on_first_frame now early-returns once started_speaking_at is set, so per-msg first-frame callbacks don't re-fire _update_agent_state("speaking") for each message.
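
The per-message state mapping above can be sketched as follows. This is a minimal illustration, not the PR's code: MsgOutcome, Actions, and handle_outcomes are hypothetical names, and the real calls (ChatMessage creation, _rt_session.truncate, update_chat_ctx) are replaced by plain records so the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Literal

Played = Literal["full", "partial", "skipped"]


@dataclass
class MsgOutcome:
    """Hypothetical per-message result derived from a playback_finished event."""
    message_id: str
    played: Played
    playback_position: float  # seconds, local to this message (not cumulative)
    forwarded_text: str = ""


@dataclass
class Actions:
    """Records what a handler would do instead of calling a real session."""
    chat_messages: list = field(default_factory=list)  # (message_id, text, interrupted)
    truncations: list = field(default_factory=list)    # (message_id, audio_end_ms)
    needs_ctx_sync: bool = False                       # stands in for update_chat_ctx(...)


def handle_outcomes(outcomes: list[MsgOutcome]) -> Actions:
    actions = Actions()
    for o in outcomes:
        if o.played == "skipped":
            # Never played: drop locally and ask the server to forget it too.
            actions.needs_ctx_sync = True
            continue
        interrupted = o.played == "partial"
        actions.chat_messages.append((o.message_id, o.forwarded_text, interrupted))
        if interrupted:
            # Truncate at this message's own playback position, in milliseconds.
            actions.truncations.append((o.message_id, int(o.playback_position * 1000)))
    return actions
```

Note that this sketch mirrors only the summary bullets; it does not model the empty-text case raised later in the review.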

Alternative considered

#5690 makes multi-message work by flushing per message, which requires the synchronizer to keep pending/finalizing impls alive and to serialize concurrent flushes in room_io/_output.py. Our AudioOutput assumes there is only one speech at a time, so serializing per message at the wait_for_playout boundary (this PR) avoids both changes.

close #5690, #5684

Some realtime providers (e.g. GPT-Realtime-2.0) emit multiple message
items in a single response. Process each one serially: push frames,
flush, wait_for_playout. Only one flush is ever in flight at a time, so
room_io and the transcript synchronizer keep their single-segment
invariants without modification.

Per-msg state is derived from the playback_finished event:
- 'full'    -> emit ChatMessage(interrupted=False) with the msg's id
- 'partial' -> emit ChatMessage(interrupted=True); call truncate() with
               the msg's local playback position
- 'skipped' -> drop from local chat ctx; call update_chat_ctx() so the
               realtime server removes never-played items from history

This is a cleaner alternative to flushing per-message, which would
require keeping multiple in-flight flush_tasks / synchronizer segments
alive simultaneously.
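
The serial flow described in this commit message can be illustrated with a small asyncio sketch. Everything here is an assumption made for illustration: FakeAudioOutput, play, and process_serially are hypothetical names, and push frames, flush, and wait_for_playout are collapsed into a single awaitable. The point is only that awaiting each message before starting the next keeps at most one segment in flight.

```python
import asyncio


class FakeAudioOutput:
    """Hypothetical stand-in for an output that expects one segment at a time."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.max_in_flight = 0
        self.played: list[str] = []

    async def play(self, message_id: str, frames: list[bytes]) -> None:
        # Stands in for: push frames, flush, wait_for_playout.
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        for _ in frames:
            await asyncio.sleep(0)  # yield control, as real audio I/O would
        self.played.append(message_id)
        self.in_flight -= 1


async def process_serially(messages, output: FakeAudioOutput) -> None:
    # Each message is fully played out before the next one starts.
    for message_id, frames in messages:
        await output.play(message_id, frames)


def run_demo() -> FakeAudioOutput:
    output = FakeAudioOutput()
    messages = [("msg_1", [b"a", b"b"]), ("msg_2", [b"c"])]
    asyncio.run(process_serially(messages, output))
    return output
```

Launching the same play calls with asyncio.gather would let two segments overlap, which is the situation the per-message flushing alternative has to handle explicitly.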
@chenghao-mou chenghao-mou requested a review from a team May 18, 2026 06:46

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 1 potential issue.


Comment on lines +3341 to +3342
if not forwarded_text:
    continue

🟡 Interrupted messages with empty forwarded text no longer trigger server-side truncation

The refactored post-processing loop at line 3341 gates truncation behind if not forwarded_text: continue, which skips truncation for interrupted messages where no text was produced. In the old code, truncation was always called for interrupted messages regardless of text content: it ran with forwarded_text="" and audio_end_ms=0 for "skipped" messages (audio never played) and with the actual playback_position for partially played messages without text.

Affected scenarios and old-code comparison

Old code (removed at lines ~3274–3300 of the base):

# truncation was unconditional inside the interrupted branch
if self.llm.capabilities.message_truncation:
    self._rt_session.truncate(
        message_id=msg_gen.message_id,
        modalities=msg_modalities,
        audio_end_ms=int(playback_position * 1000),
        audio_transcript=forwarded_text,  # could be ""
    )

New code: for "skipped" entries (entry.played == "skipped"), the loop continues at line 3334 before reaching truncation. The fallback at line 3370 uses update_chat_ctx, which only works when mutable_chat_context is True. For "partial" entries with empty forwarded_text, the continue at line 3342 also skips truncation.

For models that support message_truncation but not mutable_chat_context (e.g., future models; Ultravox has a no-op truncate so is unaffected today), "skipped" messages will leave stale server-side context. For OpenAI Realtime (which supports both), a very early interruption where no text has been produced yet would skip truncation with no fallback (the update_chat_ctx at line 3370 only triggers when any_skipped is True, not for "partial" entries with empty text).

Prompt for agents
In _realtime_generation_task_impl, the post-processing loop (around lines 3331-3353) skips truncation for entries where forwarded_text is empty. The old code always called truncate() for interrupted messages, even with empty text and playback_position=0.

To fix: decouple the truncation logic from the forwarded_text guard. Truncation should be called for ALL interrupted entries (both 'partial' and 'skipped') when message_truncation is supported, using entry.playback_position (which is 0.0 for skipped) and whatever forwarded_text is available (which may be empty). The assistant message creation can still be gated on non-empty forwarded_text.

Specifically:
1. For 'skipped' entries: instead of just setting any_skipped=True and continuing, also call truncate() if message_truncation is supported (with audio_end_ms=0, audio_transcript='').
2. For 'partial' entries with empty forwarded_text: still call truncate() at the actual playback_position before the continue.
3. Keep the update_chat_ctx fallback at line 3370 as an additional safety net for skipped messages.

Relevant code: agent_activity.py lines 3331-3377, the _MsgOutput dataclass at line 3069, and the _process_one_message function at line 3079.
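
One way to read the suggested fix is sketched below. It is a simplified model, not the code in agent_activity.py: Entry and post_process are hypothetical names (Entry is not the PR's _MsgOutput), and truncate calls are recorded in a list rather than sent to a realtime session. It shows truncation running for every interrupted entry while only local message creation stays gated on non-empty text.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical per-message result; not the PR's _MsgOutput."""
    message_id: str
    played: str               # "full", "partial", or "skipped"
    playback_position: float  # seconds; 0.0 when skipped
    forwarded_text: str


def post_process(entries, supports_truncation: bool):
    truncate_calls = []  # (message_id, audio_end_ms, audio_transcript)
    emitted = []         # (message_id, text, interrupted)
    any_skipped = False
    for e in entries:
        interrupted = e.played != "full"
        # Truncate first, independent of whether any text was forwarded.
        if interrupted and supports_truncation:
            truncate_calls.append(
                (e.message_id, int(e.playback_position * 1000), e.forwarded_text)
            )
        if e.played == "skipped":
            # Still flagged so an update_chat_ctx-style fallback can run later.
            any_skipped = True
            continue
        # Only the local assistant message is gated on non-empty text.
        if not e.forwarded_text:
            continue
        emitted.append((e.message_id, e.forwarded_text, interrupted))
    return truncate_calls, emitted, any_skipped
```

With this ordering, a partial entry with empty text still produces a truncate call at its playback position, and a skipped entry produces one with audio_end_ms=0.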

Server-side truncation must run independently of local ChatMessage
emission. The previous order skipped truncate() when forwarded_text
was empty (transcription disabled, or interrupt before the text
stream caught up to audio), leaving the realtime server with the
full un-truncated audio.