Skip to content

feat(go): added DefineSessionFlow#4462

Draft
apascal07 wants to merge 6 commits intoap/go-bidifrom
ap/go-session-flow
Draft

feat(go): added DefineSessionFlow#4462
apascal07 wants to merge 6 commits intoap/go-bidifrom
ap/go-session-flow

Conversation

@apascal07
Copy link
Collaborator

@apascal07 apascal07 commented Feb 6, 2026

Adds bidirectional streaming primitives to core and a high-level SessionFlow API for multi-turn conversations with automatic snapshot management.

Examples

Bidirectional Streaming Flows

DefineBidiFlow creates a flow that accepts multiple inputs and streams multiple outputs over a persistent connection:

echoFlow := genkit.DefineBidiFlow(g, "echo",
    func(ctx context.Context, _ struct{}, inCh <-chan string, outCh chan<- string) (int, error) {
        var count int
        for input := range inCh {
            count++
            outCh <- fmt.Sprintf("echo: %s", input)
        }
        return count, nil
    },
)

conn, _ := echoFlow.StreamBidi(ctx, struct{}{})

go func() {
    conn.Send("hello")
    conn.Send("world")
    conn.Close()
}()

for chunk, err := range conn.Receive() {
    fmt.Println(chunk) // "echo: hello", "echo: world"
}

output, _ := conn.Output()
fmt.Println(output) // 2

Session Flows

DefineSessionFlow builds on bidi streaming to provide multi-turn conversations with managed state, token-level streaming, and automatic snapshots. The Session.Run loop handles turn boundaries while allowing flexibility before and after to set up expensive clients or clean up in-progress state before returning the final outcome.

chatFlow := genkit.DefineSessionFlow(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], params *aix.SessionFlowParams[struct{}]) error {
        return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error {
            sess := params.Session

            for chunk, err := range genkit.GenerateStream(ctx, g,
                ai.WithModelName("googleai/gemini-3-flash-preview"),
                ai.WithMessages(sess.Messages()...),
            ) {
                if err != nil {
                    return err
                }
                if chunk.Done {
                    sess.AddMessages(chunk.Response.Message)
                    break
                }
                resp.SendChunk(chunk.Chunk) // stream tokens to client
            }

            return nil
        })
    },
)

The client drives the conversation by sending messages and iterating chunks until EndTurn:

conn, _ := chatFlow.StreamBidi(ctx)

conn.SendText("What is Go?")

for chunk, err := range conn.Receive() {
    if chunk.Chunk != nil {
        fmt.Print(chunk.Chunk.Text())
    }
    if chunk.EndTurn {
        break // turn complete, ready for next input
    }
}

conn.SendText("Tell me more about its concurrency model")
// ... iterate conn.Receive() again ...

conn.Close()

Prompt-Backed Session Flows

DefineSessionFlowFromPrompt eliminates the manual generate loop entirely. Give it a prompt and it handles rendering, streaming, and history management automatically:

# prompts/chat.prompt
---
model: googleai/gemini-3-flash-preview
input:
  schema:
    personality: string
  default:
    personality: a helpful assistant
---
You are {{personality}}. Keep responses concise.
type ChatInput struct {
    Personality string `json:"personality"`
}

chatPrompt := genkit.LookupDataPrompt[ChatInput, string](g, "chat")

chatFlow := genkit.DefineSessionFlowFromPrompt[struct{}](
    g, "chat", chatPrompt, ChatInput{Personality: "a sarcastic pirate"},
)

conn, _ := chatFlow.StreamBidi(ctx)
conn.SendText("What is Go?")
for chunk, _ := range conn.Receive() {
    // tokens stream automatically
    if chunk.EndTurn { break }
}
conn.Close()

Snapshots & Resumption

Configure automatic snapshot persistence with a store and optional callback:

store := aix.NewInMemorySnapshotStore[MyState]()

chatFlow := genkit.DefineSessionFlow(g, "chat", myFunc,
    aix.WithSnapshotStore(store),
    aix.WithSnapshotCallback(aix.SnapshotOn[MyState](aix.SnapshotEventTurnEnd)),
)

Resume a conversation from a server-stored snapshot:

conn, _ := chatFlow.StreamBidi(ctx, aix.WithSnapshotID[MyState]("snapshot-abc-123"))

Or resume from client-kept state (no server store needed):

conn, _ := chatFlow.StreamBidi(ctx, aix.WithState(&aix.SessionState[MyState]{
    Messages: previousMessages,
    Custom:   MyState{Topic: "concurrency"},
}))

Custom Session State

The State type parameter lets you maintain typed state across turns:

type ChatState struct {
    TopicsDiscussed []string `json:"topicsDiscussed"`
}

chatFlow := genkit.DefineSessionFlow(g, "chat",
    func(ctx context.Context, resp aix.Responder[any], params *aix.SessionFlowParams[ChatState]) error {
        return params.Session.Run(ctx, func(ctx context.Context, input *aix.SessionFlowInput) error {
            // ... generate response ...

            params.Session.UpdateCustom(func(s ChatState) ChatState {
                s.TopicsDiscussed = append(s.TopicsDiscussed, extractTopic(input))
                return s
            })
            return nil
        })
    },
    aix.WithSnapshotStore(aix.NewInMemorySnapshotStore[ChatState]()),
)

Custom state is included in snapshots and available when resuming.


API Reference

Core Bidi Primitives (core)

// Function signature for bidirectional streaming actions.
type BidiFunc[In, Out, Stream, Init any] =
    func(ctx context.Context, init Init, inCh <-chan In, outCh chan<- Stream) (Out, error)

// Flow now has a 4th type parameter (Init). Non-bidi flows use struct{}.
type Flow[In, Out, Stream, Init any] struct { *Action[In, Out, Stream, Init] }

func DefineBidiFlow[In, Out, Stream, Init any](r Registry, name string, fn BidiFunc[...]) *Flow[...]
func NewBidiFlow[In, Out, Stream, Init any](name string, fn BidiFunc[...]) *Flow[...]

// Start a bidi connection from any bidi flow/action.
func (*Flow[...]).StreamBidi(ctx context.Context, init Init) (*BidiConnection[...], error)

BidiConnection[In, Out, Stream]

func (*BidiConnection) Send(input In) error
func (*BidiConnection) Close() error                              // signals no more inputs
func (*BidiConnection) Receive() iter.Seq2[Stream, error]         // break cancels the connection
func (*BidiConnection) Output() (Out, error)                      // blocks until done
func (*BidiConnection) Done() <-chan struct{}

Session Flow API (ai/x — experimental)

Define

func DefineSessionFlow[Stream, State any](
    r Registry, name string,
    fn SessionFlowFunc[Stream, State],
    opts ...SessionFlowOption[State],
) *SessionFlow[Stream, State]

func DefineSessionFlowFromPrompt[State, PromptIn any](
    r Registry, name string,
    p PromptRenderer[PromptIn],
    defaultInput PromptIn,
    opts ...SessionFlowOption[State],
) *SessionFlow[struct{}, State]

// SessionFlowOption[State]
WithSnapshotStore[State](store SnapshotStore[State])
WithSnapshotCallback[State](cb SnapshotCallback[State])

SessionFlowFunc

type SessionFlowFunc[Stream, State any] = func(
    ctx context.Context,
    resp Responder[Stream],
    params *SessionFlowParams[State],
) error

SessionFlow[Stream, State]

func (*SessionFlow) StreamBidi(ctx context.Context, opts ...StreamBidiOption[State]) (*SessionFlowConnection[...], error)

// StreamBidiOption[State]
WithSnapshotID[State](id string)              // resume from server-stored snapshot
WithState[State](state *SessionState[State])  // resume from client-kept state
WithPromptInput[State](input any)             // override prompt input (prompt-backed flows)

SessionFlowConnection[Stream, State]

Unlike BidiConnection, breaking from Receive() does not cancel the connection — enabling multi-turn patterns.

func (*SessionFlowConnection) Send(input *SessionFlowInput) error
func (*SessionFlowConnection) SendMessages(messages ...*ai.Message) error
func (*SessionFlowConnection) SendText(text string) error
func (*SessionFlowConnection) Close() error
func (*SessionFlowConnection) Receive() iter.Seq2[*SessionFlowStreamChunk[Stream], error]
func (*SessionFlowConnection) Output() (*SessionFlowOutput[State], error)
func (*SessionFlowConnection) Done() <-chan struct{}

Session[State]

Propagated via context. Provides the turn loop and state access.

// Turn loop — iterates inCh, wraps each turn in a trace span, sends EndTurn on success.
func (*Session) Run(ctx context.Context, fn func(ctx context.Context, input *SessionFlowInput) error) error

// Conversation history
func (*Session) Messages() []*ai.Message
func (*Session) AddMessages(messages ...*ai.Message)
func (*Session) SetMessages(messages []*ai.Message)

// Custom state
func (*Session) Custom() State
func (*Session) SetCustom(custom State)
func (*Session) UpdateCustom(fn func(State) State)

// Artifacts
func (*Session) Artifacts() []*SessionFlowArtifact
func (*Session) AddArtifacts(artifacts ...*SessionFlowArtifact)
func (*Session) SetArtifacts(artifacts []*SessionFlowArtifact)

// Context helpers
func NewSessionContext[State](ctx, *Session[State]) context.Context
func SessionFromContext[State](ctx) *Session[State]

Responder[Stream]

Output channel with convenience methods. Artifacts sent here are auto-added to the session.

type Responder[Stream any] chan<- *SessionFlowStreamChunk[Stream]

func (Responder) SendChunk(chunk *ai.ModelResponseChunk)
func (Responder) SendStatus(status Stream)
func (Responder) SendArtifact(artifact *SessionFlowArtifact)

Wire Types

type SessionFlowInput struct {
    Messages []*ai.Message
}

type SessionFlowStreamChunk[Stream] struct {
    Chunk           *ai.ModelResponseChunk  // token-level streaming
    Status          Stream                  // user-defined status
    Artifact        *SessionFlowArtifact    // newly produced artifact
    SnapshotCreated string                  // ID of just-persisted snapshot
    EndTurn         bool                    // signals turn complete
}

type SessionFlowOutput[State] struct {
    SnapshotID string
    State      *SessionState[State]
}

type SessionState[State] struct {
    Messages    []*ai.Message
    Custom      State
    Artifacts   []*SessionFlowArtifact
    PromptInput any
}

type SessionFlowArtifact struct {
    Name     string
    Parts    []*ai.Part
    Metadata map[string]any
}

Snapshot System

// Store interface
type SnapshotStore[State] interface {
    GetSnapshot(ctx context.Context, snapshotID string) (*SessionSnapshot[State], error)
    SaveSnapshot(ctx context.Context, snapshot *SessionSnapshot[State]) error
}

func NewInMemorySnapshotStore[State]() *InMemorySnapshotStore[State]

// Callback — return true to create a snapshot, false to skip.
type SnapshotCallback[State] = func(ctx context.Context, sc *SnapshotContext[State]) bool

// Helper — only snapshot on specific events.
func SnapshotOn[State](events ...SnapshotEvent) SnapshotCallback[State]

// Events
const SnapshotEventTurnEnd       SnapshotEvent = "turnEnd"
const SnapshotEventInvocationEnd SnapshotEvent = "invocationEnd"

type SessionSnapshot[State] struct {
    SnapshotID string
    ParentID   string
    CreatedAt  time.Time
    TurnIndex  int
    Event      SnapshotEvent
    State      SessionState[State]
}

PromptRenderer interface

Satisfied by ai.Prompt and *ai.DataPrompt[In, Out]. Used by DefineSessionFlowFromPrompt.

type PromptRenderer[In any] interface {
    Render(ctx context.Context, input In) (*ai.GenerateActionOptions, error)
}

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Genkit Go SDK by introducing a new SessionFlow API, specifically designed for building and managing complex, stateful, multi-turn conversational AI applications. This new API is built upon a fundamental refactoring of the core Action type to support bidirectional streaming, enabling more dynamic and interactive AI experiences. The changes provide a structured approach to handling conversational state, including message history, custom data, and generated artifacts, with built-in mechanisms for persistence and lifecycle management.

Highlights

  • Introduction of SessionFlow API: A new SessionFlow API has been introduced in go/ai/x to manage stateful, multi-turn conversational AI interactions, including automatic snapshot management and artifact handling.
  • Core Action Refactoring for Bidirectional Streaming: The core ActionDef type has been refactored to Action to natively support bidirectional streaming, which is a foundational change enabling the SessionFlow API. This includes new BidiFunc and BidiConnection types.
  • Update of Existing AI Components: All existing AI components (embedder, evaluator, model, prompt, resource, retriever) have been updated to utilize the new core.Action type and its extended capabilities.
  • Snapshot Management Features: The SessionFlow now includes robust snapshot management, allowing state to be persisted, loaded, and controlled via callbacks, with an in-memory store implementation provided.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • go/ai/embedder.go
    • Updated embedder struct to use core.Action instead of core.ActionDef.
    • Modified NewEmbedder and LookupEmbedder to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/evaluator.go
    • Updated evaluator struct to use core.Action instead of core.ActionDef.
    • Modified NewEvaluator, NewBatchEvaluator, and LookupEvaluator to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/generate.go
    • Updated model and generateAction structs to use core.Action instead of core.ActionDef.
    • Modified LookupModel, model.Generate, and model.supportsConstrained to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/prompt.go
    • Updated prompt struct to use core.Action instead of core.ActionDef.
    • Modified DefinePrompt, LookupPrompt, and prompt.Desc to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/resource.go
    • Updated resource struct to use core.Action instead of core.ActionDef.
    • Modified DefineResource, NewResource, FindMatchingResource, and LookupResource to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/retriever.go
    • Updated retriever struct to use core.Action instead of core.ActionDef.
    • Modified NewRetriever and LookupRetriever to align with the new core.Action type and updated ResolveActionFor signature.
  • go/ai/x/option.go
    • Added SessionFlowOption and StreamBidiOption interfaces for configuring session flows and bidirectional streams.
    • Introduced WithSnapshotStore, WithSnapshotCallback, WithState, and WithSnapshotID functions for flexible session flow initialization.
  • go/ai/x/session_flow.go
    • Introduced SessionFlowArtifact, SessionFlowInput, SessionFlowInit, SessionFlowOutput, and SessionFlowStreamChunk data structures.
    • Defined the Session type for managing conversational state (messages, custom state, artifacts) with methods for manipulation and snapshot handling.
    • Implemented Responder for sending various types of stream chunks (generation, status, artifacts).
    • Introduced the SessionFlow type, DefineSessionFlow for registration, and StreamBidi for initiating sessions.
    • Added logic for snapshot creation, loading, and integration with tracing, including SessionFlowConnection for buffered chunk reception.
  • go/ai/x/session_flow_test.go
    • Added comprehensive unit tests for SessionFlow covering multi-turn interactions, snapshot persistence, resuming from snapshots, client-managed state, artifact handling, snapshot callbacks, and error handling.
  • go/ai/x/snapshot.go
    • Defined SessionState for portable conversation state, SnapshotEvent for trigger types, and SessionSnapshot for persisted state.
    • Introduced SnapshotContext and SnapshotCallback for custom snapshot logic.
    • Defined SnapshotStore interface and provided an InMemorySnapshotStore implementation.
    • Added SnapshotOn utility function for selective snapshotting.
  • go/core/action.go
    • Refactored ActionDef to Action and added a new Init type parameter for bidirectional actions.
    • Introduced BidiFunc for bidirectional streaming function signatures and ActionOptions for configuration.
    • Implemented NewBidiAction and DefineBidiAction for creating and registering bidirectional actions.
    • Added StreamBidi method to Action for initiating bidirectional connections.
    • Introduced BidiConnection type for managing bidirectional streaming, including Send, Close, Receive, Output, and Done methods.
    • Updated ResolveActionFor and LookupActionFor to use the new Action type and Init parameter.
    • Added wrapBidiAsStreaming to adapt BidiFunc to StreamingFunc.
  • go/core/action_test.go
    • Updated existing tests to use DefineStreamingAction and the new Init type parameter in ResolveActionFor and LookupActionFor.
    • Added new tests for BidiAction functionality, covering echo, initialization, send after close, context cancellation, and Done channel.
  • go/core/api/action.go
    • Added new ActionType constants: ActionTypeSessionFlow and ActionTypeSnapshotStore.
    • Extended ActionDesc with StreamSchema and InitSchema fields for describing bidirectional actions.
  • go/core/background_action.go
    • Updated BackgroundActionDef to use core.Action instead of core.ActionDef.
    • Modified Register, NewBackgroundAction, and LookupBackgroundAction to align with the new core.Action type and updated ResolveActionFor signature.
  • go/core/flow.go
    • Refactored Flow to be a struct embedding *Action with the new Init type parameter.
    • Updated DefineFlow and DefineStreamingFlow to use the new Flow struct and Action type.
    • Introduced NewBidiFlow and DefineBidiFlow for creating and registering bidirectional flows.
    • Updated Run and Stream methods to use the embedded Action's Run method.
  • go/core/flow_test.go
    • Updated existing tests to reflect changes in Flow type and method calls.
    • Added new tests for BidiFlow functionality, including registration, echo, and integration with core.Run.
  • go/genkit/genkit.go
    • Updated DefineFlow and DefineStreamingFlow signatures to include the new Init type parameter.
    • Added DefineBidiFlow function to expose the new bidirectional flow definition.
  • go/genkit/session_flow.go
    • Introduced DefineSessionFlow as a top-level Genkit function to define and register session flows, wrapping aix.DefineSessionFlow.
  • go/samples/basic-session-flow/main.go
    • Added a sample CLI REPL application demonstrating the usage of SessionFlow for multi-turn conversations with token-level streaming and snapshot management.
Activity
  • The pull request author, apascal07, has indicated that the PR title adheres to conventional commits.
  • The author has confirmed that the changes have been manually and unit tested.
  • Documentation updates are noted as pending in the PR description.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@apascal07 apascal07 changed the base branch from main to ap/go-bidi February 6, 2026 05:48
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is a great pull request that introduces the SessionFlow feature and refactors the core Action type to support bidirectional streaming. The new functionality is well-structured, comes with comprehensive tests, and includes a helpful sample application. I've identified a critical race condition in the new BidiConnection implementation and a minor issue in the sample code that would prevent it from running. My comments provide suggestions to address these points.

I am having trouble creating individual review comments. Click here to see my feedback.

go/core/action.go (534-550)

high

This Send implementation has a race condition that can cause a panic. The mutex is unlocked on line 540 before the channel send on line 543. If another goroutine calls Close() in between, c.inputCh will be closed, and the send will panic.

A robust way to fix this is to use recover to handle the "send on closed channel" panic, which is a common pattern in Go for this scenario. This avoids holding a lock over a potentially blocking operation.

Here's a suggested safer implementation for Send that removes the racy mutex usage. The Close method's use of the mutex remains important to make it safe for concurrent calls.

func (c *BidiConnection[In, Out, Stream]) Send(input In) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This recovers from a panic that occurs when sending on a closed channel.
			err = NewError(FAILED_PRECONDITION, "connection is closed")
		}
	}()

	select {
	case c.inputCh <- input:
		return nil
	case <-c.ctx.Done():
		return c.ctx.Err()
	case <-c.doneCh:
		// The recover will handle a panic if doneCh and inputCh close concurrently.
		return NewError(FAILED_PRECONDITION, "action has completed")
	}
}

go/samples/basic-session-flow/main.go (49-53)

medium

The model name googleai/gemini-3-flash-preview appears to be incorrect and will likely cause the sample to fail at runtime. Please use a valid model name, for example googleai/gemini-1.5-flash-latest.

					ai.WithModel(googlegenai.ModelRef("googleai/gemini-1.5-flash-latest", &genai.GenerateContentConfig{
						ThinkingConfig: &genai.ThinkingConfig{
							ThinkingBudget: genai.Ptr[int32](0),
						},
					})),

@apascal07 apascal07 changed the title feat(go): added SessionFlow and related feat(go): added DefineSessionFlow Feb 6, 2026
@apascal07 apascal07 mentioned this pull request Feb 6, 2026
@apascal07 apascal07 linked an issue Feb 6, 2026 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

RFC: Session flows

1 participant