Skip to content

feat(go): added SessionRunner.RunBatched()#5018

Draft
apascal07 wants to merge 1 commit intoap/go-session-flowfrom
ap/go-session-flow-interruptible
Draft

feat(go): added SessionRunner.RunBatched()#5018
apascal07 wants to merge 1 commit intoap/go-session-flowfrom
ap/go-session-flow-interruptible

Conversation

@apascal07
Copy link
Copy Markdown
Collaborator

@apascal07 apascal07 commented Mar 27, 2026

Adds SessionRunner.RunBatched() for combining queued inputs into a single turn when a client sends multiple messages in rapid succession, saving model round-trips. Also replaces EndTurn bool + SnapshotID string on SessionFlowStreamChunk with a structured TurnEnd type that consolidates both signals into one chunk and reports how many inputs were combined.

Stacked on #4462.

Examples

Custom session flows

Replace sess.Run with sess.RunBatched. Same callback signature; combining is transparent:

chatFlow := genkit.DefineSessionFlow(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], sess *aix.SessionRunner[MyState]) (*aix.SessionFlowResult, error) {
        return nil, sess.RunBatched(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error {
            // input.Messages contains messages from ALL combined inputs.
            return nil
        })
    },
)

Prompt-backed session flows

Opt in via WithBatchedInputs():

chatFlow := genkit.DefineSessionFlowFromPrompt[MyState](
    g, "chat", defaultInput,
    aix.WithBatchedInputs[MyState](),
)

Client-side usage

No client changes required. TurnEnd.InputCount tells the client how many sends were consumed:

conn, _ := chatFlow.StreamBidi(ctx)
conn.SendText("hello")
conn.SendText("and also, what about X?")

for chunk, err := range conn.Receive() {
    if chunk.TurnEnd != nil {
        // chunk.TurnEnd.InputCount == 2 if both were combined
        // chunk.TurnEnd.SnapshotID has the snapshot ID (if any)
        break
    }
    fmt.Print(chunk.ModelChunk.Text())
}

API Reference

New

// RunBatched is like Run but combines queued inputs into a single turn.
// A draining goroutine forwards InputCh (buffer 1) into a bounded
// intermediary buffer (128) so clients sending multiple messages do not block.
func (*SessionRunner[State]) RunBatched(
    ctx context.Context,
    fn func(ctx context.Context, input *SessionFlowInput) error,
) error

// TurnEnd signals the completion of a turn and carries per-turn metadata.
type TurnEnd struct {
    SnapshotID string `json:"snapshotId,omitempty"` // snapshot persisted at turn end
    InputCount int    `json:"inputCount"`           // number of combined inputs (>= 1)
}

// WithBatchedInputs enables input batching for prompt-backed session flows.
func WithBatchedInputs[State any]() SessionFlowOption[State]

Changed

SessionFlowStreamChunk fields EndTurn bool and SnapshotID string replaced by TurnEnd *TurnEnd:

// Before
if chunk.EndTurn { break }
if chunk.SnapshotID != "" { /* ... */ }

// After
if chunk.TurnEnd != nil {
    if chunk.TurnEnd.SnapshotID != "" { /* ... */ }
    break
}

Why ToolRestart inputs are not special-cased

RunBatched combines all queued inputs unconditionally because mixed-type queuing cannot occur in correct client code:

  1. Message + Message -- User types fast. This is what batching is for.
  2. ToolRestart, then Message -- Restart is already mid-execution; the message queues for the next turn. Sequential, not mixed.
  3. Message, then ToolRestart -- Impossible. ToolRestarts are reactive: the client must receive an interrupted response first.
  4. ToolRestart, then ToolRestart -- Impossible. The second interrupt is only observable after the first restart completes.

If a buggy client queues mixed types, handleResumeOption fails with a clear precondition error.

@apascal07 apascal07 requested a review from huangjeff5 as a code owner March 27, 2026 17:46
@apascal07 apascal07 changed the base branch from main to ap/go-session-flow March 27, 2026 17:46
@apascal07 apascal07 removed the request for review from huangjeff5 March 27, 2026 17:47
@apascal07 apascal07 marked this pull request as draft March 27, 2026 17:52
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the SessionFlow API, an experimental feature for managing stateful, multi-turn conversational flows with automatic snapshot persistence and streaming support. It includes new Go types for session management, a SessionRunner for turn-based interaction, and prompt-backed flow definitions. My feedback highlights potential performance concerns regarding the use of JSON marshaling for deep-copying snapshots and session states, as well as a recommendation to replace the magic number in the RunBatched buffer size with a named constant.

I am having trouble creating individual review comments. Click here to see my feedback.

go/ai/exp/session.go (115-128)

medium

The copySnapshot function uses JSON marshaling and unmarshaling for deep copying. While this is a convenient way to achieve deep copies, it can be inefficient for very large SessionSnapshot objects or if this operation is performed frequently. Consider if a more performant deep copy mechanism is necessary, depending on the expected size and frequency of state snapshots.

go/ai/exp/session.go (249-258)

medium

Similar to copySnapshot, copyStateLocked uses JSON marshaling and unmarshaling for deep copying the session state. This could introduce performance overhead if the session state (SessionState[State]) becomes very large or if state copying is a frequent operation. Evaluate the potential performance impact based on typical session state sizes.

go/ai/exp/session_flow.go (117)

medium

The buffer size of 128 for the buf channel in RunBatched is a magic number. It would be clearer and more maintainable to define this as a named constant, possibly configurable, to explain its purpose and allow for easier adjustments in the future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant