From 4304ad2b37f94ee70f531c58ba548aa1622b87e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 09:49:49 +0200 Subject: [PATCH 01/11] Fit final edit payloads; status info & misc fixes Add logic to ensure final edit payloads fit within Matrix event size limits by estimating content size and progressively compacting: clear formatted body, drop link previews, compact or drop AI UI metadata, drop other extra fields, and trim body as a last resort. Introduce MaxMatrixEventContentBytes, FitFinalEditPayload, related helpers, fit result logging (used in Turn.buildFinalEdit), and tests for the behavior. Also add MatrixMessageStatusEventInfo helper to populate/fallback room IDs and use it in message status sending paths (bridges/ai client and handler and SendMatrixMessageStatus), and bump maxAgentLoopToolTurns from 10 to 50. Include new unit tests for status helper and final edit fitting. --- bridges/ai/agent_loop_runtime.go | 2 +- bridges/ai/client.go | 4 +- bridges/ai/handleai.go | 8 +- sdk/final_edit.go | 203 +++++++++++++++++++++++++++++++ sdk/final_edit_test.go | 92 ++++++++++++++ sdk/turn.go | 18 ++- status_helpers.go | 25 +++- status_helpers_test.go | 44 +++++++ 8 files changed, 387 insertions(+), 9 deletions(-) create mode 100644 sdk/final_edit_test.go create mode 100644 status_helpers_test.go diff --git a/bridges/ai/agent_loop_runtime.go b/bridges/ai/agent_loop_runtime.go index cf1d20c2..6c4d189d 100644 --- a/bridges/ai/agent_loop_runtime.go +++ b/bridges/ai/agent_loop_runtime.go @@ -8,7 +8,7 @@ import ( "maunium.net/go/mautrix/event" ) -const maxAgentLoopToolTurns = 10 +const maxAgentLoopToolTurns = 50 func runAgentLoopStreamStep[T any]( ctx context.Context, diff --git a/bridges/ai/client.go b/bridges/ai/client.go index b181d712..1ce274a0 100644 --- a/bridges/ai/client.go +++ b/bridges/ai/client.go @@ -601,7 +601,9 @@ func (oc *AIClient) sendQueueRejectedStatus(ctx context.Context, portal *bridgev WithIsCertain(true). WithSendNotice(false) for _, statusEvt := range queueStatusEvents(evt, extras) { - portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, bridgev2.StatusEventInfoFromEvent(statusEvt)) + if info := agentremote.MatrixMessageStatusEventInfo(portal, statusEvt); info != nil { + portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, info) + } } } diff --git a/bridges/ai/handleai.go b/bridges/ai/handleai.go index a29553dd..25a84979 100644 --- a/bridges/ai/handleai.go +++ b/bridges/ai/handleai.go @@ -59,10 +59,14 @@ func (oc *AIClient) notifyMatrixSendFailure(ctx context.Context, portal *bridgev WithMessage(errorMessage). WithIsCertain(true). WithSendNotice(true) - portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, bridgev2.StatusEventInfoFromEvent(evt)) + if info := agentremote.MatrixMessageStatusEventInfo(portal, evt); info != nil { + portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, info) + } for _, extra := range statusEventsFromContext(ctx) { if extra != nil { - portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, bridgev2.StatusEventInfoFromEvent(extra)) + if info := agentremote.MatrixMessageStatusEventInfo(portal, extra); info != nil { + portal.Bridge.Matrix.SendMessageStatus(ctx, &msgStatus, info) + } } } } diff --git a/sdk/final_edit.go b/sdk/final_edit.go index 0bdd09b8..ed027275 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -1,12 +1,20 @@ package sdk import ( + "encoding/json" + "fmt" "maps" + "reflect" "strings" "github.com/beeper/agentremote/pkg/matrixevents" + "github.com/beeper/agentremote/turns" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" ) +const MaxMatrixEventContentBytes = 60000 + // BuildCompactFinalUIMessage removes streaming-only parts from a UI message so // the payload is suitable for attachment to the final Matrix edit. func BuildCompactFinalUIMessage(uiMessage map[string]any) map[string]any { @@ -44,6 +52,24 @@ func BuildCompactFinalUIMessage(uiMessage map[string]any) map[string]any { return out } +// BuildMinimalFinalUIMessage removes optional detail from a UI message while +// preserving stable identifiers and metadata. +func BuildMinimalFinalUIMessage(uiMessage map[string]any) map[string]any { + if len(uiMessage) == 0 { + return nil + } + out := map[string]any{} + for _, key := range []string{"id", "role", "metadata"} { + if value, ok := uiMessage[key]; ok && value != nil { + out[key] = value + } + } + if len(out) == 0 { + return nil + } + return out +} + // BuildDefaultFinalEditExtra builds the SDK's default replacement payload // that should live inside m.new_content for terminal final edits. func BuildDefaultFinalEditExtra(uiMessage map[string]any) map[string]any { @@ -107,3 +133,180 @@ func withFinalEditFinishReason(uiMessage map[string]any, finishReason string) ma out["metadata"] = metadata return out } + +type FinalEditFitDetails struct { + OriginalSize int + FinalSize int + ClearedFormattedBody bool + DroppedLinkPreviews bool + CompactedUIMessage bool + DroppedUIMessage bool + DroppedExtra bool + TrimmedBody bool +} + +func (d FinalEditFitDetails) Changed() bool { + return d.ClearedFormattedBody || d.DroppedLinkPreviews || d.CompactedUIMessage || d.DroppedUIMessage || d.DroppedExtra || d.TrimmedBody +} + +func (d FinalEditFitDetails) Summary() string { + steps := make([]string, 0, 6) + if d.ClearedFormattedBody { + steps = append(steps, "cleared_formatted_body") + } + if d.DroppedLinkPreviews { + steps = append(steps, "dropped_link_previews") + } + if d.CompactedUIMessage { + steps = append(steps, "compacted_ui_message") + } + if d.DroppedUIMessage { + steps = append(steps, "dropped_ui_message") + } + if d.DroppedExtra { + steps = append(steps, "dropped_extra") + } + if d.TrimmedBody { + steps = append(steps, "trimmed_body") + } + if len(steps) == 0 { + return "" + } + return strings.Join(steps, ",") +} + +func cloneFinalEditPayload(payload *FinalEditPayload) *FinalEditPayload { + if payload == nil { + return nil + } + cloned := &FinalEditPayload{ + Extra: maps.Clone(payload.Extra), + TopLevelExtra: maps.Clone(payload.TopLevelExtra), + } + if payload.Content != nil { + content := *payload.Content + if payload.Content.Mentions != nil { + mentions := *payload.Content.Mentions + if len(mentions.UserIDs) > 0 { + mentions.UserIDs = append([]id.UserID(nil), mentions.UserIDs...) + } + content.Mentions = &mentions + } + cloned.Content = &content + } + return cloned +} + +func estimateFinalEditContentSize(payload *FinalEditPayload, target id.EventID) int { + if payload == nil || payload.Content == nil { + return 0 + } + content := *payload.Content + if content.Mentions == nil { + content.Mentions = &event.Mentions{} + } + content.SetEdit(target) + raw := maps.Clone(payload.TopLevelExtra) + if raw == nil { + raw = map[string]any{} + } + if payload.Extra != nil { + raw["m.new_content"] = payload.Extra + } + data, err := json.Marshal(&event.Content{ + Parsed: &content, + Raw: raw, + }) + if err != nil { + return MaxMatrixEventContentBytes + 1 + } + return len(data) +} + +func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEditPayload, FinalEditFitDetails) { + fitted := cloneFinalEditPayload(payload) + if fitted == nil || fitted.Content == nil { + return fitted, FinalEditFitDetails{} + } + details := FinalEditFitDetails{ + OriginalSize: estimateFinalEditContentSize(fitted, target), + } + size := details.OriginalSize + if size <= MaxMatrixEventContentBytes { + details.FinalSize = size + return fitted, details + } + + if fitted.Content.Format != "" || fitted.Content.FormattedBody != "" { + fitted.Content.Format = "" + fitted.Content.FormattedBody = "" + details.ClearedFormattedBody = true + size = estimateFinalEditContentSize(fitted, target) + } + if size > MaxMatrixEventContentBytes && fitted.Extra != nil { + if _, ok := fitted.Extra["com.beeper.linkpreviews"]; ok { + delete(fitted.Extra, "com.beeper.linkpreviews") + details.DroppedLinkPreviews = true + size = estimateFinalEditContentSize(fitted, target) + } + } + if size > MaxMatrixEventContentBytes && fitted.Extra != nil { + if rawUI, ok := fitted.Extra[matrixevents.BeeperAIKey].(map[string]any); ok { + minimalUI := BuildMinimalFinalUIMessage(rawUI) + switch { + case minimalUI == nil: + delete(fitted.Extra, matrixevents.BeeperAIKey) + details.DroppedUIMessage = true + case !reflect.DeepEqual(minimalUI, rawUI): + fitted.Extra[matrixevents.BeeperAIKey] = minimalUI + details.CompactedUIMessage = true + } + size = estimateFinalEditContentSize(fitted, target) + } + } + if size > MaxMatrixEventContentBytes && fitted.Extra != nil { + if _, ok := fitted.Extra[matrixevents.BeeperAIKey]; ok { + delete(fitted.Extra, matrixevents.BeeperAIKey) + details.DroppedUIMessage = true + size = estimateFinalEditContentSize(fitted, target) + } + } + if size > MaxMatrixEventContentBytes && len(fitted.Extra) > 0 { + fitted.Extra = nil + details.DroppedExtra = true + size = estimateFinalEditContentSize(fitted, target) + } + if size > MaxMatrixEventContentBytes && fitted.Content != nil && fitted.Content.Body != "" { + best := strings.TrimSpace(fitted.Content.Body) + low, high := 1, len(fitted.Content.Body) + for low <= high { + mid := (low + high) / 2 + candidate, _ := turns.SplitAtMarkdownBoundary(fitted.Content.Body, mid) + candidate = strings.TrimSpace(candidate) + if candidate == "" { + high = mid - 1 + continue + } + fitted.Content.Body = candidate + candidateSize := estimateFinalEditContentSize(fitted, target) + if candidateSize <= MaxMatrixEventContentBytes { + best = candidate + low = mid + 1 + } else { + high = mid - 1 + } + } + fitted.Content.Body = best + details.TrimmedBody = best != strings.TrimSpace(payload.Content.Body) + size = estimateFinalEditContentSize(fitted, target) + } + details.FinalSize = size + return fitted, details +} + +func FormatFinalEditFitLog(details FinalEditFitDetails) string { + if !details.Changed() { + return "" + } + return fmt.Sprintf("%d->%d bytes (%s)", details.OriginalSize, details.FinalSize, details.Summary()) +} diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go new file mode 100644 index 00000000..e0eb99cf --- /dev/null +++ b/sdk/final_edit_test.go @@ -0,0 +1,92 @@ +package sdk + +import ( + "strings" + "testing" + + "github.com/beeper/agentremote/pkg/matrixevents" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +func TestFitFinalEditPayloadCompactsOptionalMetadataFirst(t *testing.T) { + largePart := map[string]any{ + "type": "tool-call", + "text": strings.Repeat("x", MaxMatrixEventContentBytes), + } + payload := &FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + Format: event.FormatHTML, + FormattedBody: strings.Repeat("

done

", MaxMatrixEventContentBytes/8), + }, + Extra: map[string]any{ + matrixevents.BeeperAIKey: map[string]any{ + "id": "turn-1", + "role": "assistant", + "metadata": map[string]any{"finish_reason": "stop"}, + "parts": []any{largePart}, + }, + "com.beeper.linkpreviews": []map[string]any{{ + "matched_url": "https://example.com", + "title": strings.Repeat("preview", 2000), + }}, + }, + TopLevelExtra: map[string]any{ + "com.beeper.dont_render_edited": true, + }, + } + + fitted, details := FitFinalEditPayload(payload, id.EventID("$event-1")) + if fitted == nil || fitted.Content == nil { + t.Fatal("expected fitted payload") + } + if details.FinalSize > MaxMatrixEventContentBytes { + t.Fatalf("expected fitted payload under %d bytes, got %d", MaxMatrixEventContentBytes, details.FinalSize) + } + if fitted.Content.Body != "done" { + t.Fatalf("expected body to be preserved, got %q", fitted.Content.Body) + } + if !details.ClearedFormattedBody { + t.Fatal("expected formatted body to be cleared before trimming body") + } + if !details.DroppedLinkPreviews { + t.Fatal("expected oversized link previews to be dropped") + } + if details.TrimmedBody { + t.Fatal("expected metadata reductions to avoid trimming the visible body") + } + if rawUI, ok := fitted.Extra[matrixevents.BeeperAIKey].(map[string]any); ok { + if _, ok = rawUI["parts"]; ok { + t.Fatalf("expected ui message parts to be removed, got %#v", rawUI["parts"]) + } + } +} + +func TestFitFinalEditPayloadTrimsBodyAsLastResort(t *testing.T) { + body := strings.Repeat("abc\n", MaxMatrixEventContentBytes) + payload := &FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: body, + }, + TopLevelExtra: map[string]any{ + "com.beeper.dont_render_edited": true, + }, + } + + fitted, details := FitFinalEditPayload(payload, id.EventID("$event-2")) + if fitted == nil || fitted.Content == nil { + t.Fatal("expected fitted payload") + } + if !details.TrimmedBody { + t.Fatal("expected oversized body to be trimmed") + } + if details.FinalSize > MaxMatrixEventContentBytes { + t.Fatalf("expected fitted payload under %d bytes, got %d", MaxMatrixEventContentBytes, details.FinalSize) + } + if len(fitted.Content.Body) >= len(body) { + t.Fatalf("expected trimmed body to be shorter than original") + } +} diff --git a/sdk/turn.go b/sdk/turn.go index 015d45f4..11a38957 100644 --- a/sdk/turn.go +++ b/sdk/turn.go @@ -675,13 +675,25 @@ func (t *Turn) buildFinalEdit() (networkid.MessageID, *bridgev2.ConvertedEdit) { if target == "" { return "", nil } - content := *payload.Content + fittedPayload, fitDetails := FitFinalEditPayload(payload, t.initialEventID) + if fittedPayload == nil || fittedPayload.Content == nil { + return "", nil + } + if fitDetails.Changed() && t.conv != nil && t.conv.login != nil { + t.conv.login.Log.Warn(). + Str("component", "sdk_turn"). + Int("original_bytes", fitDetails.OriginalSize). + Int("final_bytes", fitDetails.FinalSize). + Str("reductions", fitDetails.Summary()). + Msg("Reduced final edit payload to fit Matrix content limits") + } + content := *fittedPayload.Content if content.Mentions == nil { content.Mentions = &event.Mentions{} } content.RelatesTo = nil - extra := maps.Clone(payload.Extra) - topLevelExtra := maps.Clone(payload.TopLevelExtra) + extra := maps.Clone(fittedPayload.Extra) + topLevelExtra := maps.Clone(fittedPayload.TopLevelExtra) if extra == nil { extra = map[string]any{} } diff --git a/status_helpers.go b/status_helpers.go index 264263a6..e747e368 100644 --- a/status_helpers.go +++ b/status_helpers.go @@ -50,8 +50,29 @@ func SendMatrixMessageStatus( evt *event.Event, status bridgev2.MessageStatus, ) { - if portal == nil || portal.Bridge == nil || evt == nil { + if portal == nil || portal.Bridge == nil { return } - portal.Bridge.Matrix.SendMessageStatus(ctx, &status, bridgev2.StatusEventInfoFromEvent(evt)) + info := MatrixMessageStatusEventInfo(portal, evt) + if info == nil { + return + } + portal.Bridge.Matrix.SendMessageStatus(ctx, &status, info) +} + +func MatrixMessageStatusEventInfo( + portal *bridgev2.Portal, + evt *event.Event, +) *bridgev2.MessageStatusEventInfo { + if portal == nil || evt == nil { + return nil + } + info := bridgev2.StatusEventInfoFromEvent(evt) + if info == nil { + return nil + } + if info.RoomID == "" && portal.MXID != "" { + info.RoomID = portal.MXID + } + return info } diff --git a/status_helpers_test.go b/status_helpers_test.go new file mode 100644 index 00000000..89bfa5dc --- /dev/null +++ b/status_helpers_test.go @@ -0,0 +1,44 @@ +package agentremote + +import ( + "testing" + + "maunium.net/go/mautrix/appservice" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +func TestMatrixMessageStatusEventInfoFallsBackToPortalRoom(t *testing.T) { + portal := &bridgev2.Portal{ + Portal: &database.Portal{ + MXID: id.RoomID("!portal:test"), + }, + } + evt := &event.Event{ + ID: id.EventID("$event:test"), + Type: event.EventMessage, + Sender: id.UserID("@alice:test"), + Content: event.Content{ + Parsed: &event.MessageEventContent{MsgType: event.MsgText}, + Raw: map[string]any{ + appservice.DoublePuppetKey: true, + }, + }, + } + + info := MatrixMessageStatusEventInfo(portal, evt) + if info == nil { + t.Fatal("expected status event info") + } + if info.RoomID != portal.MXID { + t.Fatalf("expected room id %q, got %q", portal.MXID, info.RoomID) + } + if info.SourceEventID != evt.ID { + t.Fatalf("expected source event id %q, got %q", evt.ID, info.SourceEventID) + } + if !info.IsSourceEventDoublePuppeted { + t.Fatal("expected double puppet flag to be preserved") + } +} From 8fa5417a94fbaa73a1d51c3165e994bb6dfc9f30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 12:13:34 +0200 Subject: [PATCH 02/11] Preserve reasoning/tool parts in final UI payload Treat reasoning (and tool/artifact) parts as non-duplicate when building the compact final UI message so they can be attached to the final Matrix edit when size allows. Update BuildCompactFinalUIMessage comment and logic to only drop duplicate text (and streaming step-start) parts, and adjust tests to expect reasoning and tool parts to be preserved (rename and extend test to verify presence of reasoning and tool metadata). --- bridges/ai/response_finalization_test.go | 29 +++++++++++++++++++++--- sdk/final_edit.go | 8 ++++--- sdk/turn_test.go | 4 ++-- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/bridges/ai/response_finalization_test.go b/bridges/ai/response_finalization_test.go index 5f61bdaa..e00e3147 100644 --- a/bridges/ai/response_finalization_test.go +++ b/bridges/ai/response_finalization_test.go @@ -96,11 +96,20 @@ func TestBuildFinalEditUIMessage_IncludesSourceAndFileParts(t *testing.T) { } } -func TestBuildFinalEditUIMessage_OmitsTextAndReasoningParts(t *testing.T) { +func TestBuildFinalEditUIMessage_OmitsTextButKeepsReasoningAndToolPartsWhenTheyFit(t *testing.T) { oc := &AIClient{} state := testStreamingState("turn-2") state.accumulated.WriteString("hello") state.reasoning.WriteString("thinking") + state.toolCalls = []ToolCallMetadata{{ + CallID: "tool-call-2", + ToolName: "web.search", + ToolType: "function", + Status: "output-available", + ResultStatus: "completed", + Input: map[string]any{"query": "creatine"}, + Output: map[string]any{"ok": true}, + }} streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "start", "messageId": "turn-2"}) streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-start", "id": "text-2"}) streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-delta", "id": "text-2", "delta": "hello"}) @@ -111,13 +120,27 @@ func TestBuildFinalEditUIMessage_OmitsTextAndReasoningParts(t *testing.T) { ui := buildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) parts, _ := ui["parts"].([]any) + foundReasoning := false + foundTool := false for _, rawPart := range parts { part, _ := rawPart.(map[string]any) switch part["type"] { - case "text", "reasoning": - t.Fatalf("expected final UIMessage to omit textual parts, got %#v", part) + case "text": + t.Fatalf("expected final UIMessage to omit duplicate text parts, got %#v", part) + case "reasoning": + foundReasoning = true + case "tool": + if part["toolCallId"] == "tool-call-2" { + foundTool = true + } } } + if !foundReasoning { + t.Fatal("expected reasoning part in compact final UI message") + } + if !foundTool { + t.Fatal("expected tool part in compact final UI message") + } } func TestBuildFinalEditUIMessage_UsesNestedUsageContextLimitFromSnapshot(t *testing.T) { diff --git a/sdk/final_edit.go b/sdk/final_edit.go index ed027275..7878705b 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -15,8 +15,10 @@ import ( const MaxMatrixEventContentBytes = 60000 -// BuildCompactFinalUIMessage removes streaming-only parts from a UI message so -// the payload is suitable for attachment to the final Matrix edit. +// BuildCompactFinalUIMessage removes duplicate streaming-only parts from a UI +// message so the payload is suitable for attachment to the final Matrix edit. +// Visible assistant text already lives in the Matrix message body, but +// reasoning/tool/artifact parts are preserved when the size budget allows. func BuildCompactFinalUIMessage(uiMessage map[string]any) map[string]any { if len(uiMessage) == 0 { return nil @@ -40,7 +42,7 @@ func BuildCompactFinalUIMessage(uiMessage map[string]any) map[string]any { continue } switch strings.TrimSpace(stringValue(part["type"])) { - case "text", "reasoning", "step-start": + case "text", "step-start": continue default: parts = append(parts, part) diff --git a/sdk/turn_test.go b/sdk/turn_test.go index be24a5da..42af7a39 100644 --- a/sdk/turn_test.go +++ b/sdk/turn_test.go @@ -565,8 +565,8 @@ func TestTurnBuildFinalEditDefaultsToVisibleText(t *testing.T) { if !ok { continue } - if partType := strings.TrimSpace(stringValue(part["type"])); partType == "text" || partType == "reasoning" { - t.Fatalf("expected compact final payload without textual parts, got %#v", part) + if partType := strings.TrimSpace(stringValue(part["type"])); partType == "text" { + t.Fatalf("expected compact final payload without duplicate text parts, got %#v", part) } } } From 777f5cb01dc386b8d4205b7991c7518b37de8b83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 12:25:38 +0200 Subject: [PATCH 03/11] Add agent loop timeout; update final edit fitting Introduce a 2-minute agent loop wall-clock timeout and helper (withAgentLoopTimeout) used when running agent loops and heartbeats to ensure runs don't hang; update heartbeat logging to include the configured timeout. Replace backgroundContext usage with withAgentLoopTimeout in dispatch and heartbeat execution. In SDK final_edit, switch deep-clone logic to jsonutil.DeepCloneMap, adjust FitFinalEditPayload to return an error alongside the fitted payload and details (and return an error if payload still exceeds Matrix limits after fitting), and update imports accordingly. Changes touch bridges/ai/* and sdk/final_edit.go. --- bridges/ai/agent_loop_runtime.go | 78 ++++++++++++++++++++++++ bridges/ai/agent_loop_test.go | 23 +++++++ bridges/ai/handleai.go | 3 +- bridges/ai/heartbeat_execute.go | 4 +- bridges/ai/response_retry.go | 7 +++ bridges/ai/streaming_chat_completions.go | 3 + bridges/ai/streaming_error_handling.go | 3 + bridges/ai/streaming_executor.go | 2 + bridges/ai/streaming_function_calls.go | 2 + bridges/ai/streaming_responses_api.go | 3 + bridges/ai/tool_approvals.go | 2 + sdk/final_edit.go | 21 +++---- sdk/final_edit_test.go | 71 ++++++++++++++++++++- sdk/turn.go | 13 +++- sdk/turn_test.go | 21 +++++++ 15 files changed, 238 insertions(+), 18 deletions(-) diff --git a/bridges/ai/agent_loop_runtime.go b/bridges/ai/agent_loop_runtime.go index 6c4d189d..7fe21a96 100644 --- a/bridges/ai/agent_loop_runtime.go +++ b/bridges/ai/agent_loop_runtime.go @@ -2,6 +2,8 @@ package ai import ( "context" + "errors" + "time" "github.com/openai/openai-go/v3/packages/ssestream" "maunium.net/go/mautrix/bridgev2" @@ -9,6 +11,80 @@ import ( ) const maxAgentLoopToolTurns = 50 +const agentLoopInactivityTimeout = 10 * time.Minute +const heartbeatRunTimeout = 2 * time.Minute + +var errAgentLoopInactivityTimeout = errors.New("agent loop inactivity timeout") + +type agentLoopActivityKey struct{} + +func withActivityTimeout(parent context.Context, timeout time.Duration, timeoutErr error) (context.Context, func(), context.CancelFunc) { + ctx, cancel := context.WithCancelCause(parent) + if timeout <= 0 { + return ctx, func() {}, func() { cancel(context.Canceled) } + } + + activityCh := make(chan struct{}, 1) + go func() { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return + case <-activityCh: + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(timeout) + case <-timer.C: + cancel(timeoutErr) + return + } + } + }() + + touch := func() { + select { + case activityCh <- struct{}{}: + default: + } + } + touch() + return ctx, touch, func() { cancel(context.Canceled) } +} + +func withAgentLoopActivity(ctx context.Context, touch func()) context.Context { + if touch == nil { + return ctx + } + return context.WithValue(ctx, agentLoopActivityKey{}, touch) +} + +func touchAgentLoopActivity(ctx context.Context) { + if touch, ok := ctx.Value(agentLoopActivityKey{}).(func()); ok && touch != nil { + touch() + } +} + +func agentLoopInactivityCause(ctx context.Context) error { + if ctx == nil { + return nil + } + cause := context.Cause(ctx) + if errors.Is(cause, errAgentLoopInactivityTimeout) { + return cause + } + return nil +} + +func (oc *AIClient) withAgentLoopInactivityTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + runCtx, touch, cancel := withActivityTimeout(oc.backgroundContext(ctx), agentLoopInactivityTimeout, errAgentLoopInactivityTimeout) + return withAgentLoopActivity(runCtx, touch), cancel +} func runAgentLoopStreamStep[T any]( ctx context.Context, @@ -24,7 +100,9 @@ func runAgentLoopStreamStep[T any]( writer := state.writer() writer.StepStart(ctx) defer writer.StepFinish(ctx) + touchAgentLoopActivity(ctx) for stream.Next() { + touchAgentLoopActivity(ctx) current := stream.Current() done, cle, err := handleEvent(current) if err == nil && cle == nil && (shouldMarkSuccess == nil || shouldMarkSuccess(current)) { diff --git a/bridges/ai/agent_loop_test.go b/bridges/ai/agent_loop_test.go index cd1e6412..7a89b1b7 100644 --- a/bridges/ai/agent_loop_test.go +++ b/bridges/ai/agent_loop_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "maunium.net/go/mautrix/event" ) @@ -134,3 +135,25 @@ func TestExecuteAgentLoopRoundsDoesNotInlineFollowUpMessages(t *testing.T) { t.Fatalf("unexpected rounds observed: %#v", provider.roundsObserved) } } + +func TestWithActivityTimeoutResetsOnTouchAndCancelsOnIdle(t *testing.T) { + ctx, touch, cancel := withActivityTimeout(context.Background(), 40*time.Millisecond, errAgentLoopInactivityTimeout) + defer cancel() + + time.Sleep(20 * time.Millisecond) + touch() + time.Sleep(30 * time.Millisecond) + if err := ctx.Err(); err != nil { + t.Fatalf("expected context to stay alive after activity, got %v", err) + } + + select { + case <-ctx.Done(): + case <-time.After(100 * time.Millisecond): + t.Fatal("expected inactivity timeout to cancel the context") + } + + if !errors.Is(context.Cause(ctx), errAgentLoopInactivityTimeout) { + t.Fatalf("expected inactivity timeout cause, got %v", context.Cause(ctx)) + } +} diff --git a/bridges/ai/handleai.go b/bridges/ai/handleai.go index 25a84979..98ab2db6 100644 --- a/bridges/ai/handleai.go +++ b/bridges/ai/handleai.go @@ -27,7 +27,8 @@ func (oc *AIClient) dispatchCompletionInternal( meta *PortalMetadata, promptContext PromptContext, ) { - runCtx := oc.backgroundContext(ctx) + runCtx, cancel := oc.withAgentLoopInactivityTimeout(ctx) + defer cancel() // Always use streaming responses oc.runAgentLoopWithRetry(runCtx, sourceEvent, portal, meta, promptContext) diff --git a/bridges/ai/heartbeat_execute.go b/bridges/ai/heartbeat_execute.go index cb1e37a2..4129ed1f 100644 --- a/bridges/ai/heartbeat_execute.go +++ b/bridges/ai/heartbeat_execute.go @@ -197,7 +197,7 @@ func (oc *AIClient) runHeartbeatOnce(agentID string, heartbeat *HeartbeatConfig, Msg("Heartbeat executing") resultCh := make(chan HeartbeatRunOutcome, 1) - timeoutCtx, cancel := context.WithTimeout(oc.backgroundContext(context.Background()), 2*time.Minute) + timeoutCtx, cancel := context.WithTimeout(oc.backgroundContext(context.Background()), heartbeatRunTimeout) defer cancel() runCtx := withHeartbeatRun(timeoutCtx, hbCfg, resultCh) done := make(chan struct{}) @@ -219,7 +219,7 @@ func (oc *AIClient) runHeartbeatOnce(agentID string, heartbeat *HeartbeatConfig, oc.emitHeartbeatFailure(hbCfg, startedAtMs, "stream-finished-without-outcome") return heartbeatRunResult{Status: "failed", Reason: "heartbeat failed"} case <-timeoutCtx.Done(): - oc.log.Warn().Str("agent_id", agentID).Msg("Heartbeat timed out after 2 minutes") + oc.log.Warn().Str("agent_id", agentID).Dur("timeout", heartbeatRunTimeout).Msg("Heartbeat timed out") oc.emitHeartbeatFailure(hbCfg, startedAtMs, "timeout") return heartbeatRunResult{Status: "failed", Reason: "heartbeat timed out"} } diff --git a/bridges/ai/response_retry.go b/bridges/ai/response_retry.go index a999ace5..b13fb330 100644 --- a/bridges/ai/response_retry.go +++ b/bridges/ai/response_retry.go @@ -50,6 +50,10 @@ func (oc *AIClient) responseWithRetry( } if err != nil { if errors.Is(err, context.Canceled) { + if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { + oc.loggerForContext(ctx).Warn().Err(timeoutErr).Int("attempt", attempt+1).Str("log_label", logLabel).Msg("Agent loop timed out due to inactivity") + return false, timeoutErr + } return true, nil } oc.loggerForContext(ctx).Warn().Err(err).Int("attempt", attempt+1).Str("log_label", logLabel).Msg("Response attempt failed with error") @@ -360,6 +364,9 @@ func (oc *AIClient) runAgentLoopWithRetry( return } if errors.Is(err, context.Canceled) { + if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { + oc.notifyMatrixSendFailure(ctx, portal, evt, timeoutErr) + } return } oc.notifyMatrixSendFailure(ctx, portal, evt, err) diff --git a/bridges/ai/streaming_chat_completions.go b/bridges/ai/streaming_chat_completions.go index 4f1c3ea6..6d92f94e 100644 --- a/bridges/ai/streaming_chat_completions.go +++ b/bridges/ai/streaming_chat_completions.go @@ -26,6 +26,9 @@ func (a *chatCompletionsTurnAdapter) handleStreamStepError( stepErr error, ) (*ContextLengthError, error) { if errors.Is(stepErr, context.Canceled) { + if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { + return nil, a.oc.finishStreamingWithFailure(ctx, a.log, a.portal, a.state, a.meta, "timeout", timeoutErr) + } return nil, a.oc.finishStreamingWithFailure(ctx, a.log, a.portal, a.state, a.meta, "cancelled", stepErr) } if cle := ParseContextLengthError(stepErr); cle != nil { diff --git a/bridges/ai/streaming_error_handling.go b/bridges/ai/streaming_error_handling.go index 14a848fa..2271126a 100644 --- a/bridges/ai/streaming_error_handling.go +++ b/bridges/ai/streaming_error_handling.go @@ -70,6 +70,9 @@ func (oc *AIClient) handleResponsesStreamErr( includeContextLength bool, ) (*ContextLengthError, error) { if errors.Is(err, context.Canceled) { + if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { + return nil, oc.finishStreamingWithFailure(context.Background(), *oc.loggerForContext(ctx), portal, state, meta, "timeout", timeoutErr) + } return nil, oc.finishStreamingWithFailure(context.Background(), *oc.loggerForContext(ctx), portal, state, meta, "cancelled", err) } diff --git a/bridges/ai/streaming_executor.go b/bridges/ai/streaming_executor.go index 303d2956..09228c4c 100644 --- a/bridges/ai/streaming_executor.go +++ b/bridges/ai/streaming_executor.go @@ -80,7 +80,9 @@ func executeAgentLoopRounds( evt *event.Event, ) (bool, *ContextLengthError, error) { for round := 0; ; round++ { + touchAgentLoopActivity(ctx) continueLoop, cle, err := provider.RunAgentTurn(ctx, evt, round) + touchAgentLoopActivity(ctx) if cle != nil || err != nil { finalizeAgentLoopExit(ctx, provider, true) return false, cle, err diff --git a/bridges/ai/streaming_function_calls.go b/bridges/ai/streaming_function_calls.go index 32a0437f..460bcb9c 100644 --- a/bridges/ai/streaming_function_calls.go +++ b/bridges/ai/streaming_function_calls.go @@ -251,6 +251,7 @@ func (oc *AIClient) executeStreamingBuiltinTool( result = "Denied by user" } if resultStatus != ResultStatusDenied { + touchAgentLoopActivity(ctx) toolCtx := WithBridgeToolContext(ctx, &BridgeToolContext{ Client: oc, Portal: portal, @@ -265,6 +266,7 @@ func (oc *AIClient) executeStreamingBuiltinTool( result = fmt.Sprintf("Error: %s", err) resultStatus = ResultStatusError } + touchAgentLoopActivity(ctx) } } diff --git a/bridges/ai/streaming_responses_api.go b/bridges/ai/streaming_responses_api.go index 861907e3..4b0f0919 100644 --- a/bridges/ai/streaming_responses_api.go +++ b/bridges/ai/streaming_responses_api.go @@ -131,6 +131,9 @@ func (a *responsesTurnAdapter) RunAgentTurn( stream, params, err = a.startContinuationRound(ctx) if err != nil { if errors.Is(err, context.Canceled) { + if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { + return false, nil, a.oc.finishStreamingWithFailure(ctx, a.log, a.portal, state, a.meta, "timeout", timeoutErr) + } return false, nil, a.oc.finishStreamingWithFailure(ctx, a.log, a.portal, state, a.meta, "cancelled", err) } logResponsesFailure(a.log, err, params, a.meta, a.prompt, "continuation_init") diff --git a/bridges/ai/tool_approvals.go b/bridges/ai/tool_approvals.go index a78b3b7d..ec93fb6e 100644 --- a/bridges/ai/tool_approvals.go +++ b/bridges/ai/tool_approvals.go @@ -378,10 +378,12 @@ func (oc *AIClient) waitForToolApprovalDecision( state *streamingState, handle bridgesdk.ApprovalHandle, ) airuntime.ToolApprovalDecision { + touchAgentLoopActivity(ctx) if handle == nil { return airuntime.ToolApprovalDecision{State: airuntime.ToolApprovalTimedOut, Reason: approvalWaitReason(ctx)} } resp, err := handle.Wait(ctx) + touchAgentLoopActivity(ctx) if err != nil { return airuntime.ToolApprovalDecision{State: airuntime.ToolApprovalDenied, Reason: err.Error()} } diff --git a/sdk/final_edit.go b/sdk/final_edit.go index 7878705b..07924316 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/beeper/agentremote/pkg/matrixevents" + "github.com/beeper/agentremote/pkg/shared/jsonutil" "github.com/beeper/agentremote/turns" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" @@ -182,8 +183,8 @@ func cloneFinalEditPayload(payload *FinalEditPayload) *FinalEditPayload { return nil } cloned := &FinalEditPayload{ - Extra: maps.Clone(payload.Extra), - TopLevelExtra: maps.Clone(payload.TopLevelExtra), + Extra: jsonutil.DeepCloneMap(payload.Extra), + TopLevelExtra: jsonutil.DeepCloneMap(payload.TopLevelExtra), } if payload.Content != nil { content := *payload.Content @@ -225,10 +226,10 @@ func estimateFinalEditContentSize(payload *FinalEditPayload, target id.EventID) return len(data) } -func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEditPayload, FinalEditFitDetails) { +func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEditPayload, FinalEditFitDetails, error) { fitted := cloneFinalEditPayload(payload) if fitted == nil || fitted.Content == nil { - return fitted, FinalEditFitDetails{} + return fitted, FinalEditFitDetails{}, nil } details := FinalEditFitDetails{ OriginalSize: estimateFinalEditContentSize(fitted, target), @@ -236,7 +237,7 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd size := details.OriginalSize if size <= MaxMatrixEventContentBytes { details.FinalSize = size - return fitted, details + return fitted, details, nil } if fitted.Content.Format != "" || fitted.Content.FormattedBody != "" { @@ -303,12 +304,8 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd size = estimateFinalEditContentSize(fitted, target) } details.FinalSize = size - return fitted, details -} - -func FormatFinalEditFitLog(details FinalEditFitDetails) string { - if !details.Changed() { - return "" + if size > MaxMatrixEventContentBytes { + return nil, details, fmt.Errorf("final edit payload exceeds Matrix content limit after fitting: %d > %d", size, MaxMatrixEventContentBytes) } - return fmt.Sprintf("%d->%d bytes (%s)", details.OriginalSize, details.FinalSize, details.Summary()) + return fitted, details, nil } diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index e0eb99cf..d4968290 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -38,7 +38,10 @@ func TestFitFinalEditPayloadCompactsOptionalMetadataFirst(t *testing.T) { }, } - fitted, details := FitFinalEditPayload(payload, id.EventID("$event-1")) + fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-1")) + if err != nil { + t.Fatalf("expected fit to succeed, got %v", err) + } if fitted == nil || fitted.Content == nil { t.Fatal("expected fitted payload") } @@ -64,6 +67,67 @@ func TestFitFinalEditPayloadCompactsOptionalMetadataFirst(t *testing.T) { } } +func TestFitFinalEditPayloadDeepClonesNestedMaps(t *testing.T) { + payload := &FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + }, + Extra: map[string]any{ + "nested": map[string]any{ + "value": "original", + }, + }, + TopLevelExtra: map[string]any{ + "outer": map[string]any{ + "flag": true, + }, + }, + } + + fitted, _, err := FitFinalEditPayload(payload, id.EventID("$event-clone")) + if err != nil { + t.Fatalf("expected fit to succeed, got %v", err) + } + if fitted == nil || fitted.Content == nil { + t.Fatal("expected fitted payload") + } + + fitted.Extra["nested"].(map[string]any)["value"] = "changed" + fitted.TopLevelExtra["outer"].(map[string]any)["flag"] = false + + if got := payload.Extra["nested"].(map[string]any)["value"]; got != "original" { + t.Fatalf("expected original nested extra to stay unchanged, got %#v", got) + } + if got := payload.TopLevelExtra["outer"].(map[string]any)["flag"]; got != true { + t.Fatalf("expected original nested top-level extra to stay unchanged, got %#v", got) + } +} + +func TestFitFinalEditPayloadReturnsErrorWhenPayloadCannotFit(t *testing.T) { + payload := &FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + }, + TopLevelExtra: map[string]any{ + "com.beeper.dont_render_edited": true, + "huge": strings.Repeat("x", MaxMatrixEventContentBytes), + }, + } + + fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-too-large")) + if err == nil { + t.Fatal("expected fit to fail for unshrinkable payload") + } + if fitted != nil { + t.Fatalf("expected no fitted payload on failure, got %#v", fitted) + } + if details.FinalSize <= MaxMatrixEventContentBytes { + t.Fatalf("expected final size to remain oversized, got %d", details.FinalSize) + } +} + func TestFitFinalEditPayloadTrimsBodyAsLastResort(t *testing.T) { body := strings.Repeat("abc\n", MaxMatrixEventContentBytes) payload := &FinalEditPayload{ @@ -76,7 +140,10 @@ func TestFitFinalEditPayloadTrimsBodyAsLastResort(t *testing.T) { }, } - fitted, details := FitFinalEditPayload(payload, id.EventID("$event-2")) + fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-2")) + if err != nil { + t.Fatalf("expected fit to succeed, got %v", err) + } if fitted == nil || fitted.Content == nil { t.Fatal("expected fitted payload") } diff --git a/sdk/turn.go b/sdk/turn.go index 11a38957..608e33a6 100644 --- a/sdk/turn.go +++ b/sdk/turn.go @@ -675,7 +675,18 @@ func (t *Turn) buildFinalEdit() (networkid.MessageID, *bridgev2.ConvertedEdit) { if target == "" { return "", nil } - fittedPayload, fitDetails := FitFinalEditPayload(payload, t.initialEventID) + fittedPayload, fitDetails, err := FitFinalEditPayload(payload, t.initialEventID) + if err != nil { + if t.conv != nil && t.conv.login != nil { + t.conv.login.Log.Warn(). + Err(err). + Str("component", "sdk_turn"). + Int("original_bytes", fitDetails.OriginalSize). + Int("final_bytes", fitDetails.FinalSize). + Msg("Skipped final edit because payload could not fit Matrix content limits") + } + return "", nil + } if fittedPayload == nil || fittedPayload.Content == nil { return "", nil } diff --git a/sdk/turn_test.go b/sdk/turn_test.go index 42af7a39..350a5172 100644 --- a/sdk/turn_test.go +++ b/sdk/turn_test.go @@ -630,6 +630,27 @@ func TestTurnBuildFinalEditPreservesMentionsInContent(t *testing.T) { } } +func TestTurnBuildFinalEditSkipsUnshrinkablePayload(t *testing.T) { + turn := newTurn(context.Background(), nil, nil, nil) + turn.initialEventID = id.EventID("$event-too-large") + turn.networkMessageID = "msg-too-large" + turn.SetFinalEditPayload(&FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + }, + TopLevelExtra: map[string]any{ + "com.beeper.dont_render_edited": true, + "huge": strings.Repeat("x", MaxMatrixEventContentBytes), + }, + }) + + target, edit := turn.buildFinalEdit() + if target != "" || edit != nil { + t.Fatalf("expected oversized final edit to be skipped, got target=%q edit=%#v", target, edit) + } +} + func TestApplyStreamPartPreservesWhitespaceTextDelta(t *testing.T) { turn := newTurn(context.Background(), nil, nil, nil) From 5eb34998820e4f071c753f6eac5727cc5d62b37f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 12:29:06 +0200 Subject: [PATCH 04/11] Reorder imports in sdk/final_edit files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reorganize import blocks in sdk/final_edit.go and sdk/final_edit_test.go to group third-party packages consistently (maunium.net imports) and maintain gofmt/goimports ordering. No functional changes — only import formatting. --- sdk/final_edit.go | 5 +++-- sdk/final_edit_test.go | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/final_edit.go b/sdk/final_edit.go index 07924316..adb4ff3d 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -7,11 +7,12 @@ import ( "reflect" "strings" + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" + "github.com/beeper/agentremote/pkg/matrixevents" "github.com/beeper/agentremote/pkg/shared/jsonutil" "github.com/beeper/agentremote/turns" - "maunium.net/go/mautrix/event" - "maunium.net/go/mautrix/id" ) const MaxMatrixEventContentBytes = 60000 diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index d4968290..02865414 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -4,9 +4,10 @@ import ( "strings" "testing" - "github.com/beeper/agentremote/pkg/matrixevents" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" + + "github.com/beeper/agentremote/pkg/matrixevents" ) func TestFitFinalEditPayloadCompactsOptionalMetadataFirst(t *testing.T) { From 9098d1bbee36b8f613716a18f8e9b01ce733709f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 14:09:45 +0200 Subject: [PATCH 05/11] Fallback to text-only final edits on fit failure If fitting a FinalEditPayload fails due to size limits, try a text-only fallback (strip Extra/TopLevelExtra) and use that instead; enhance logging with fallback metrics and errors. Fix the binary-search trimming logic in FitFinalEditPayload to operate on the original body (avoids re-trimming an already-trimmed body). Add BuildTextOnlyFinalEditPayload helper and tests covering the binary-search behavior and the fallback path. Also adjust activity timeout timings in agent loop tests to reduce flakiness. --- bridges/ai/agent_loop_test.go | 8 +-- sdk/final_edit.go | 19 +++++-- sdk/final_edit_test.go | 31 +++++++++++ sdk/turn.go | 86 +++++++++++++++++++--------- sdk/turn_test.go | 102 +++++++++++++++++++++++++++++++++- 5 files changed, 211 insertions(+), 35 deletions(-) diff --git a/bridges/ai/agent_loop_test.go b/bridges/ai/agent_loop_test.go index 7a89b1b7..e29c142a 100644 --- a/bridges/ai/agent_loop_test.go +++ b/bridges/ai/agent_loop_test.go @@ -137,19 +137,19 @@ func TestExecuteAgentLoopRoundsDoesNotInlineFollowUpMessages(t *testing.T) { } func TestWithActivityTimeoutResetsOnTouchAndCancelsOnIdle(t *testing.T) { - ctx, touch, cancel := withActivityTimeout(context.Background(), 40*time.Millisecond, errAgentLoopInactivityTimeout) + ctx, touch, cancel := withActivityTimeout(context.Background(), 100*time.Millisecond, errAgentLoopInactivityTimeout) defer cancel() - time.Sleep(20 * time.Millisecond) + time.Sleep(40 * time.Millisecond) touch() - time.Sleep(30 * time.Millisecond) + time.Sleep(40 * time.Millisecond) if err := ctx.Err(); err != nil { t.Fatalf("expected context to stay alive after activity, got %v", err) } select { case <-ctx.Done(): - case <-time.After(100 * time.Millisecond): + case <-time.After(200 * time.Millisecond): t.Fatal("expected inactivity timeout to cancel the context") } diff --git a/sdk/final_edit.go b/sdk/final_edit.go index adb4ff3d..555b1a70 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -281,11 +281,12 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd size = estimateFinalEditContentSize(fitted, target) } if size > MaxMatrixEventContentBytes && fitted.Content != nil && fitted.Content.Body != "" { - best := strings.TrimSpace(fitted.Content.Body) - low, high := 1, len(fitted.Content.Body) + originalBody := fitted.Content.Body + best := strings.TrimSpace(originalBody) + low, high := 1, len(originalBody) for low <= high { mid := (low + high) / 2 - candidate, _ := turns.SplitAtMarkdownBoundary(fitted.Content.Body, mid) + candidate, _ := turns.SplitAtMarkdownBoundary(originalBody, mid) candidate = strings.TrimSpace(candidate) if candidate == "" { high = mid - 1 @@ -301,7 +302,7 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd } } fitted.Content.Body = best - details.TrimmedBody = best != strings.TrimSpace(payload.Content.Body) + details.TrimmedBody = best != strings.TrimSpace(originalBody) size = estimateFinalEditContentSize(fitted, target) } details.FinalSize = size @@ -310,3 +311,13 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd } return fitted, details, nil } + +func BuildTextOnlyFinalEditPayload(payload *FinalEditPayload) *FinalEditPayload { + minimal := cloneFinalEditPayload(payload) + if minimal == nil { + return nil + } + minimal.Extra = nil + minimal.TopLevelExtra = nil + return minimal +} diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index 02865414..1d83ee5f 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -158,3 +158,34 @@ func TestFitFinalEditPayloadTrimsBodyAsLastResort(t *testing.T) { t.Fatalf("expected trimmed body to be shorter than original") } } + +func TestFitFinalEditPayloadBinarySearchUsesOriginalBody(t *testing.T) { + paragraphOne := strings.Repeat("a", 25000) + paragraphTwo := strings.Repeat("b", 25000) + paragraphThree := strings.Repeat("c", 25000) + body := paragraphOne + "\n\n" + paragraphTwo + "\n\n" + paragraphThree + payload := &FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: body, + }, + TopLevelExtra: map[string]any{ + "com.beeper.dont_render_edited": true, + }, + } + + fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-boundary")) + if err != nil { + t.Fatalf("expected fit to succeed, got %v", err) + } + if fitted == nil || fitted.Content == nil { + t.Fatal("expected fitted payload") + } + want := paragraphOne + "\n\n" + paragraphTwo + if fitted.Content.Body != want { + t.Fatalf("expected trimmed body to retain the largest markdown-safe prefix, got len=%d want len=%d", len(fitted.Content.Body), len(want)) + } + if !details.TrimmedBody { + t.Fatal("expected body trimming details to be reported") + } +} diff --git a/sdk/turn.go b/sdk/turn.go index 608e33a6..9d870940 100644 --- a/sdk/turn.go +++ b/sdk/turn.go @@ -357,17 +357,7 @@ func (t *Turn) defaultStreamPublisher(callCtx context.Context) (bridgev2.BeeperS } func (t *Turn) ensureSenderJoined() error { - if t == nil || t.conv == nil || t.conv.portal == nil || t.conv.portal.MXID == "" { - return nil - } - if t.conv.intentOverride == nil && (t.conv.login == nil || t.conv.login.Bridge == nil || t.conv.portal.Bridge == nil) { - return nil - } - intent, err := t.conv.getIntent(t.turnCtx) - if err != nil { - return err - } - return intent.EnsureJoined(t.turnCtx, t.conv.portal.MXID) + return t.ensureSenderJoinedWithContext(t.turnCtx) } func (t *Turn) ensureStarted() { @@ -639,17 +629,18 @@ func (t *Turn) finalMetadata(finishReason string) agentremote.BaseMessageMetadat } func (t *Turn) persistFinalMessage(finishReason string) { + finalCtx := t.finalizationContext() if t.conv == nil || t.conv.login == nil || t.conv.portal == nil { return } - sender := t.resolveSender(t.turnCtx) + sender := t.resolveSender(finalCtx) metadata := any(t.finalMetadata(finishReason)) if t.finalMetadataProvider != nil { if custom := t.finalMetadataProvider.FinalMetadata(t, finishReason); custom != nil { metadata = custom } } - agentremote.UpsertAssistantMessage(t.turnCtx, agentremote.UpsertAssistantMessageParams{ + agentremote.UpsertAssistantMessage(finalCtx, agentremote.UpsertAssistantMessageParams{ Login: t.conv.login, Portal: t.conv.portal, SenderID: sender.Sender, @@ -677,15 +668,26 @@ func (t *Turn) buildFinalEdit() (networkid.MessageID, *bridgev2.ConvertedEdit) { } fittedPayload, fitDetails, err := FitFinalEditPayload(payload, t.initialEventID) if err != nil { - if t.conv != nil && t.conv.login != nil { + fallbackPayload := BuildTextOnlyFinalEditPayload(payload) + fallbackFittedPayload, fallbackFitDetails, fallbackErr := FitFinalEditPayload(fallbackPayload, t.initialEventID) + if fallbackErr == nil { + fittedPayload = fallbackFittedPayload + fitDetails = fallbackFitDetails + err = nil + } else if t.conv != nil && t.conv.login != nil { t.conv.login.Log.Warn(). - Err(err). Str("component", "sdk_turn"). Int("original_bytes", fitDetails.OriginalSize). - Int("final_bytes", fitDetails.FinalSize). + Int("original_final_bytes", fitDetails.FinalSize). + Err(err). + Int("fallback_bytes", fallbackFitDetails.OriginalSize). + Int("fallback_final_bytes", fallbackFitDetails.FinalSize). + Str("fallback_error", fallbackErr.Error()). Msg("Skipped final edit because payload could not fit Matrix content limits") + return "", nil + } else { + return "", nil } - return "", nil } if fittedPayload == nil || fittedPayload.Content == nil { return "", nil @@ -724,7 +726,7 @@ func (t *Turn) buildFinalEdit() (networkid.MessageID, *bridgev2.ConvertedEdit) { } } -func (t *Turn) sendFinalEdit() { +func (t *Turn) sendFinalEdit(ctx context.Context) { if t == nil || t.conv == nil || t.conv.login == nil || t.conv.portal == nil { return } @@ -732,10 +734,10 @@ func (t *Turn) sendFinalEdit() { if target == "" || edit == nil { return } - if err := t.ensureSenderJoined(); err != nil && t.conv.login != nil { + if err := t.ensureSenderJoinedWithContext(ctx); err != nil && t.conv.login != nil { t.conv.login.Log.Warn().Err(err).Str("component", "sdk_turn").Msg("Failed to ensure sender joined before final turn edit") } - sender := t.resolveSender(t.turnCtx) + sender := t.resolveSender(ctx) if err := agentremote.SendEditViaPortal( t.conv.login, t.conv.portal, @@ -832,11 +834,12 @@ func (t *Turn) Abort(reason string) { } func (t *Turn) finalizeTurn(endReason turns.EndReason, finishReason, fallbackBody string) { - t.flushPendingStream() + finalCtx := t.finalizationContext() + t.flushPendingStream(finalCtx) t.ensureDefaultFinalEditPayload(finishReason, fallbackBody) - t.sendFinalEdit() + t.sendFinalEdit(finalCtx) if t.session != nil { - t.session.End(t.turnCtx, endReason) + t.session.End(finalCtx, endReason) } t.persistFinalMessage(finishReason) } @@ -1046,11 +1049,44 @@ func (t *Turn) awaitStreamStart() { } } -func (t *Turn) flushPendingStream() { +func (t *Turn) flushPendingStream(ctx context.Context) { if t == nil || t.session == nil { return } - if err := t.session.FlushPending(t.turnCtx); err != nil && t.startErr == nil { + if err := t.session.FlushPending(ctx); err != nil && t.startErr == nil { t.startErr = err } } + +func (t *Turn) finalizationContext() context.Context { + if t == nil { + return context.Background() + } + if t.turnCtx != nil && t.turnCtx.Err() == nil { + return t.turnCtx + } + if t.conv != nil && t.conv.ctx != nil && t.conv.ctx.Err() == nil { + return t.conv.ctx + } + if t.ctx != nil && t.ctx.Err() == nil { + return t.ctx + } + if t.conv != nil && t.conv.login != nil && t.conv.login.Bridge != nil && t.conv.login.Bridge.BackgroundCtx != nil { + return t.conv.login.Bridge.BackgroundCtx + } + return context.Background() +} + +func (t *Turn) ensureSenderJoinedWithContext(ctx context.Context) error { + if t == nil || t.conv == nil || t.conv.portal == nil || t.conv.portal.MXID == "" { + return nil + } + if t.conv.intentOverride == nil && (t.conv.login == nil || t.conv.login.Bridge == nil || t.conv.portal.Bridge == nil) { + return nil + } + intent, err := t.conv.getIntent(ctx) + if err != nil { + return err + } + return intent.EnsureJoined(ctx, t.conv.portal.MXID) +} diff --git a/sdk/turn_test.go b/sdk/turn_test.go index 350a5172..ce4cc9c8 100644 --- a/sdk/turn_test.go +++ b/sdk/turn_test.go @@ -634,11 +634,38 @@ func TestTurnBuildFinalEditSkipsUnshrinkablePayload(t *testing.T) { turn := newTurn(context.Background(), nil, nil, nil) turn.initialEventID = id.EventID("$event-too-large") turn.networkMessageID = "msg-too-large" + turn.SetFinalEditPayload(&FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + Mentions: &event.Mentions{ + UserIDs: []id.UserID{ + id.UserID("@" + strings.Repeat("x", MaxMatrixEventContentBytes) + ":test"), + }, + }, + }, + }) + + target, edit := turn.buildFinalEdit() + if target != "" || edit != nil { + t.Fatalf("expected oversized final edit to be skipped, got target=%q edit=%#v", target, edit) + } +} + +func TestTurnBuildFinalEditFallsBackToTextOnlyPayload(t *testing.T) { + turn := newTurn(context.Background(), nil, nil, nil) + turn.initialEventID = id.EventID("$event-fallback") + turn.networkMessageID = "msg-fallback" turn.SetFinalEditPayload(&FinalEditPayload{ Content: &event.MessageEventContent{ MsgType: event.MsgText, Body: "done", }, + Extra: map[string]any{ + matrixevents.BeeperAIKey: map[string]any{ + "id": "turn-1", + }, + }, TopLevelExtra: map[string]any{ "com.beeper.dont_render_edited": true, "huge": strings.Repeat("x", MaxMatrixEventContentBytes), @@ -646,8 +673,79 @@ func TestTurnBuildFinalEditSkipsUnshrinkablePayload(t *testing.T) { }) target, edit := turn.buildFinalEdit() - if target != "" || edit != nil { - t.Fatalf("expected oversized final edit to be skipped, got target=%q edit=%#v", target, edit) + if target != "msg-fallback" { + t.Fatalf("expected fallback edit target msg-fallback, got %q", target) + } + if edit == nil || len(edit.ModifiedParts) != 1 { + t.Fatalf("expected single modified part, got %#v", edit) + } + if got := edit.ModifiedParts[0].Content.Body; got != "done" { + t.Fatalf("expected fallback to preserve visible body, got %q", got) + } + if _, ok := edit.ModifiedParts[0].Extra[matrixevents.BeeperAIKey]; ok { + t.Fatalf("expected text-only fallback to strip extra metadata, got %#v", edit.ModifiedParts[0].Extra) + } + if _, ok := edit.ModifiedParts[0].TopLevelExtra["com.beeper.dont_render_edited"]; ok { + t.Fatalf("expected text-only fallback to drop optional top-level metadata, got %#v", edit.ModifiedParts[0].TopLevelExtra) + } + gotRelatesTo, ok := edit.ModifiedParts[0].TopLevelExtra["m.relates_to"].(*event.RelatesTo) + if !ok { + t.Fatalf("expected fallback edit to restore replace relation, got %#v", edit.ModifiedParts[0].TopLevelExtra) + } + if gotRelatesTo.EventID != id.EventID("$event-fallback") || gotRelatesTo.Type != event.RelReplace { + t.Fatalf("expected replace relation for original event, got %#v", gotRelatesTo) + } +} + +func TestTurnFinalizationContextPrefersActiveTurnContext(t *testing.T) { + type ctxKey string + const key ctxKey = "source" + + parent := context.WithValue(context.Background(), key, "turn") + turn := newTurn(parent, nil, nil, nil) + + got := turn.finalizationContext() + if got == nil { + t.Fatal("expected finalization context") + } + if got != turn.Context() { + t.Fatal("expected active turn context to be reused for finalization") + } + if got.Value(key) != "turn" { + t.Fatalf("expected turn context value, got %#v", got.Value(key)) + } +} + +func TestTurnFinalizationContextFallsBackToBridgeBackground(t *testing.T) { + type ctxKey string + const key ctxKey = "source" + + bridgeCtx := context.WithValue(context.Background(), key, "bridge") + parent, cancel := context.WithCancel(context.WithValue(context.Background(), key, "parent")) + login := &bridgev2.UserLogin{ + UserLogin: &database.UserLogin{ID: "login-1"}, + Bridge: &bridgev2.Bridge{BackgroundCtx: bridgeCtx}, + } + conv := newConversation(parent, &bridgev2.Portal{Portal: &database.Portal{}}, login, bridgev2.EventSender{}, nil) + turn := newTurn(parent, conv, nil, nil) + + cancel() + if turn.Context().Err() == nil { + t.Fatal("expected turn context to be cancelled") + } + + got := turn.finalizationContext() + if got == nil { + t.Fatal("expected fallback finalization context") + } + if got.Err() != nil { + t.Fatalf("expected active fallback context, got err=%v", got.Err()) + } + if got == turn.Context() { + t.Fatal("expected fallback context instead of cancelled turn context") + } + if got.Value(key) != "bridge" { + t.Fatalf("expected bridge background context, got %#v", got.Value(key)) } } From e4bfdc56a6c4a3389fe5d27fa820f89a609ec710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 14:18:20 +0200 Subject: [PATCH 06/11] Preserve parent context in agent loop timeout Use the caller-provided context when creating the agent loop inactivity timeout instead of oc.backgroundContext(ctx), so parent cancellations propagate to the derived context. Add TestWithAgentLoopInactivityTimeoutPreservesParentCancellation to verify the derived context cancels and reports context.Canceled when the parent is cancelled. Changes in bridges/ai/agent_loop_runtime.go and bridges/ai/agent_loop_test.go. --- bridges/ai/agent_loop_runtime.go | 2 +- bridges/ai/agent_loop_test.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/bridges/ai/agent_loop_runtime.go b/bridges/ai/agent_loop_runtime.go index 7fe21a96..b7f69f93 100644 --- a/bridges/ai/agent_loop_runtime.go +++ b/bridges/ai/agent_loop_runtime.go @@ -82,7 +82,7 @@ func agentLoopInactivityCause(ctx context.Context) error { } func (oc *AIClient) withAgentLoopInactivityTimeout(ctx context.Context) (context.Context, context.CancelFunc) { - runCtx, touch, cancel := withActivityTimeout(oc.backgroundContext(ctx), agentLoopInactivityTimeout, errAgentLoopInactivityTimeout) + runCtx, touch, cancel := withActivityTimeout(ctx, agentLoopInactivityTimeout, errAgentLoopInactivityTimeout) return withAgentLoopActivity(runCtx, touch), cancel } diff --git a/bridges/ai/agent_loop_test.go b/bridges/ai/agent_loop_test.go index e29c142a..4f51ebf7 100644 --- a/bridges/ai/agent_loop_test.go +++ b/bridges/ai/agent_loop_test.go @@ -157,3 +157,23 @@ func TestWithActivityTimeoutResetsOnTouchAndCancelsOnIdle(t *testing.T) { t.Fatalf("expected inactivity timeout cause, got %v", context.Cause(ctx)) } } + +func TestWithAgentLoopInactivityTimeoutPreservesParentCancellation(t *testing.T) { + parent, parentCancel := context.WithCancel(context.Background()) + client := &AIClient{} + + ctx, cancel := client.withAgentLoopInactivityTimeout(parent) + defer cancel() + + parentCancel() + + select { + case <-ctx.Done(): + case <-time.After(100 * time.Millisecond): + t.Fatal("expected derived context to cancel when parent is cancelled") + } + + if !errors.Is(context.Cause(ctx), context.Canceled) { + t.Fatalf("expected cancelled cause from parent, got %v", context.Cause(ctx)) + } +} From fa2e603abbffef6e4da3b6a44bdcfb9a37d9e84f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 14:28:49 +0200 Subject: [PATCH 07/11] Suppress context.Canceled logs; use SDK UI helpers Silence noisy logging/processing for context.Canceled across AI bridge code by returning early in setModelTyping, savePortalQuiet, and StreamSession.logWarn. Replace local UI helper wrappers with SDK-provided helpers for building compact final UI messages and default final-edit extras, and update tests accordingly. Simplify agent loop finalization by removing the errorExit branching and guarding FinalizeAgentLoop to avoid double-finalization; also remove special-case notification for canceled agent loops. Add a unit test to ensure context.Canceled is suppressed in logWarn and adjust imports where needed. --- bridges/ai/agent_loop_runtime.go | 9 +-------- bridges/ai/handleai.go | 3 +++ bridges/ai/handlematrix.go | 3 +++ bridges/ai/response_finalization.go | 10 ++-------- bridges/ai/response_finalization_test.go | 8 ++++---- bridges/ai/response_retry.go | 6 ------ bridges/ai/streaming_executor.go | 18 +++--------------- bridges/ai/streaming_responses_api.go | 3 +++ bridges/ai/streaming_ui_helpers.go | 4 ---- sdk/final_edit.go | 24 ++++++++++++------------ sdk/final_edit_test.go | 6 ++++-- turns/session.go | 3 +++ turns/session_target_test.go | 17 +++++++++++++++++ 13 files changed, 55 insertions(+), 59 deletions(-) diff --git a/bridges/ai/agent_loop_runtime.go b/bridges/ai/agent_loop_runtime.go index b7f69f93..4132c8ab 100644 --- a/bridges/ai/agent_loop_runtime.go +++ b/bridges/ai/agent_loop_runtime.go @@ -57,13 +57,6 @@ func withActivityTimeout(parent context.Context, timeout time.Duration, timeoutE return ctx, touch, func() { cancel(context.Canceled) } } -func withAgentLoopActivity(ctx context.Context, touch func()) context.Context { - if touch == nil { - return ctx - } - return context.WithValue(ctx, agentLoopActivityKey{}, touch) -} - func touchAgentLoopActivity(ctx context.Context) { if touch, ok := ctx.Value(agentLoopActivityKey{}).(func()); ok && touch != nil { touch() @@ -83,7 +76,7 @@ func agentLoopInactivityCause(ctx context.Context) error { func (oc *AIClient) withAgentLoopInactivityTimeout(ctx context.Context) (context.Context, context.CancelFunc) { runCtx, touch, cancel := withActivityTimeout(ctx, agentLoopInactivityTimeout, errAgentLoopInactivityTimeout) - return withAgentLoopActivity(runCtx, touch), cancel + return context.WithValue(runCtx, agentLoopActivityKey{}, touch), cancel } func runAgentLoopStreamStep[T any]( diff --git a/bridges/ai/handleai.go b/bridges/ai/handleai.go index 98ab2db6..02033157 100644 --- a/bridges/ai/handleai.go +++ b/bridges/ai/handleai.go @@ -177,6 +177,9 @@ func (oc *AIClient) setModelTyping(ctx context.Context, portal *bridgev2.Portal, timeout = 0 // Zero timeout stops typing } if err := intent.MarkTyping(ctx, portal.MXID, bridgev2.TypingTypeText, timeout); err != nil { + if errors.Is(err, context.Canceled) { + return + } oc.loggerForContext(ctx).Warn().Err(err).Bool("typing", typing).Msg("Failed to set typing indicator") } } diff --git a/bridges/ai/handlematrix.go b/bridges/ai/handlematrix.go index d1864069..3301ad1f 100644 --- a/bridges/ai/handlematrix.go +++ b/bridges/ai/handlematrix.go @@ -929,6 +929,9 @@ func (oc *AIClient) handleTextFileMessage( // savePortalQuiet saves portal and logs errors without failing func (oc *AIClient) savePortalQuiet(ctx context.Context, portal *bridgev2.Portal, action string) { if err := portal.Save(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return + } oc.loggerForContext(ctx).Warn().Err(err).Str("action", action).Msg("Failed to save portal") } } diff --git a/bridges/ai/response_finalization.go b/bridges/ai/response_finalization.go index a039c0f5..321d01d8 100644 --- a/bridges/ai/response_finalization.go +++ b/bridges/ai/response_finalization.go @@ -543,9 +543,9 @@ func (oc *AIClient) sendFinalAssistantTurnContent(ctx context.Context, portal *b } linkPreviews := generateOutboundLinkPreviews(ctx, rendered.Body, intent, portal, sourceCitations, getLinkPreviewConfig(&oc.connector.Config)) - uiMessage := buildCompactFinalUIMessage(oc.buildStreamUIMessage(state, meta, linkPreviews)) + uiMessage := sdk.BuildCompactFinalUIMessage(oc.buildStreamUIMessage(state, meta, linkPreviews)) - topLevelExtra := buildFinalEditTopLevelExtra() + topLevelExtra := sdk.BuildDefaultFinalEditTopLevelExtra() if state != nil && state.turn != nil { finalTopLevelExtra := topLevelExtra if len(uiMessage) > 0 || len(linkPreviews) > 0 { @@ -579,12 +579,6 @@ func (oc *AIClient) sendFinalAssistantTurnContent(ctx context.Context, portal *b } } -func buildFinalEditTopLevelExtra() map[string]any { - return map[string]any{ - "com.beeper.dont_render_edited": true, - } -} - // generateOutboundLinkPreviews extracts URLs from AI response text, generates link previews, and uploads images to Matrix. // When citations are provided (e.g. from Exa search results), matching URLs use the citation's // image directly instead of fetching the page's HTML. diff --git a/bridges/ai/response_finalization_test.go b/bridges/ai/response_finalization_test.go index e00e3147..4cd2aa30 100644 --- a/bridges/ai/response_finalization_test.go +++ b/bridges/ai/response_finalization_test.go @@ -46,7 +46,7 @@ func TestBuildFinalEditUIMessage_IncludesSourceAndFileParts(t *testing.T) { streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-delta", "id": "text-1", "delta": "hello"}) streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-end", "id": "text-1"}) - ui := buildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) + ui := bridgesdk.BuildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) if ui == nil { t.Fatalf("expected final edit UI message") } @@ -118,7 +118,7 @@ func TestBuildFinalEditUIMessage_OmitsTextButKeepsReasoningAndToolPartsWhenTheyF streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "reasoning-delta", "id": "reasoning-2", "delta": "thinking"}) streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "reasoning-end", "id": "reasoning-2"}) - ui := buildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) + ui := bridgesdk.BuildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) parts, _ := ui["parts"].([]any) foundReasoning := false foundTool := false @@ -156,7 +156,7 @@ func TestBuildFinalEditUIMessage_UsesNestedUsageContextLimitFromSnapshot(t *test streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-delta", "id": "text-usage", "delta": "hello"}) streamui.ApplyChunk(state.turn.UIState(), map[string]any{"type": "text-end", "id": "text-usage"}) - ui := buildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) + ui := bridgesdk.BuildCompactFinalUIMessage(oc.buildStreamUIMessage(state, modelModeTestMeta("openai/gpt-4.1"), nil)) metadata, ok := ui["metadata"].(map[string]any) if !ok { t.Fatalf("expected metadata map, got %T", ui["metadata"]) @@ -198,7 +198,7 @@ func TestBuildFinalEditTopLevelExtra_KeepsOnlyEditMetadata(t *testing.T) { MatchedURL: "https://example.com", }} - extra := buildFinalEditTopLevelExtra() + extra := bridgesdk.BuildDefaultFinalEditTopLevelExtra() if _, ok := extra["body"]; ok { t.Fatalf("expected body fallback to come from Matrix edit content, got %#v", extra["body"]) diff --git a/bridges/ai/response_retry.go b/bridges/ai/response_retry.go index b13fb330..601d2766 100644 --- a/bridges/ai/response_retry.go +++ b/bridges/ai/response_retry.go @@ -363,12 +363,6 @@ func (oc *AIClient) runAgentLoopWithRetry( if success || err == nil { return } - if errors.Is(err, context.Canceled) { - if timeoutErr := agentLoopInactivityCause(ctx); timeoutErr != nil { - oc.notifyMatrixSendFailure(ctx, portal, evt, timeoutErr) - } - return - } oc.notifyMatrixSendFailure(ctx, portal, evt, err) } diff --git a/bridges/ai/streaming_executor.go b/bridges/ai/streaming_executor.go index 09228c4c..4c7da413 100644 --- a/bridges/ai/streaming_executor.go +++ b/bridges/ai/streaming_executor.go @@ -84,7 +84,7 @@ func executeAgentLoopRounds( continueLoop, cle, err := provider.RunAgentTurn(ctx, evt, round) touchAgentLoopActivity(ctx) if cle != nil || err != nil { - finalizeAgentLoopExit(ctx, provider, true) + finalizeAgentLoopExit(ctx, provider) return false, cle, err } if continueLoop { @@ -93,26 +93,14 @@ func executeAgentLoopRounds( // Queued user messages are dispatched after room release via processPendingQueue. // Finalize this turn immediately so later prompts cannot reopen it with more edits. - finalizeAgentLoopExit(ctx, provider, false) + finalizeAgentLoopExit(ctx, provider) return true, nil, nil } } -func finalizeAgentLoopExit(ctx context.Context, provider agentLoopProvider, errorExit bool) { +func finalizeAgentLoopExit(ctx context.Context, provider agentLoopProvider) { if provider == nil { return } - if errorExit { - switch p := provider.(type) { - case *chatCompletionsTurnAdapter: - if p != nil && p.state != nil && p.state.completedAtMs != 0 { - return - } - case *responsesTurnAdapter: - if p != nil && p.state != nil && p.state.completedAtMs != 0 { - return - } - } - } provider.FinalizeAgentLoop(ctx) } diff --git a/bridges/ai/streaming_responses_api.go b/bridges/ai/streaming_responses_api.go index 4b0f0919..1bd9907a 100644 --- a/bridges/ai/streaming_responses_api.go +++ b/bridges/ai/streaming_responses_api.go @@ -176,6 +176,9 @@ func (a *responsesTurnAdapter) RunAgentTurn( } func (a *responsesTurnAdapter) FinalizeAgentLoop(ctx context.Context) { + if a.state == nil || a.state.completedAtMs != 0 { + return + } a.oc.finalizeResponsesStream(ctx, a.log, a.portal, a.state, a.meta) } diff --git a/bridges/ai/streaming_ui_helpers.go b/bridges/ai/streaming_ui_helpers.go index 5f1dd7f4..b75d2413 100644 --- a/bridges/ai/streaming_ui_helpers.go +++ b/bridges/ai/streaming_ui_helpers.go @@ -67,10 +67,6 @@ func (oc *AIClient) buildStreamUIMessage(state *streamingState, meta *PortalMeta return sdk.UIMessageFromTurnData(turnData) } -func buildCompactFinalUIMessage(uiMessage map[string]any) map[string]any { - return sdk.BuildCompactFinalUIMessage(uiMessage) -} - func shouldContinueChatToolLoop(finishReason string, toolCallCount int) bool { if toolCallCount <= 0 { return false diff --git a/sdk/final_edit.go b/sdk/final_edit.go index 555b1a70..1be85a82 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -150,7 +150,7 @@ type FinalEditFitDetails struct { } func (d FinalEditFitDetails) Changed() bool { - return d.ClearedFormattedBody || d.DroppedLinkPreviews || d.CompactedUIMessage || d.DroppedUIMessage || d.DroppedExtra || d.TrimmedBody + return d.Summary() != "" } func (d FinalEditFitDetails) Summary() string { @@ -228,18 +228,16 @@ func estimateFinalEditContentSize(payload *FinalEditPayload, target id.EventID) } func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEditPayload, FinalEditFitDetails, error) { - fitted := cloneFinalEditPayload(payload) - if fitted == nil || fitted.Content == nil { - return fitted, FinalEditFitDetails{}, nil - } - details := FinalEditFitDetails{ - OriginalSize: estimateFinalEditContentSize(fitted, target), + if payload == nil || payload.Content == nil { + return payload, FinalEditFitDetails{}, nil } - size := details.OriginalSize - if size <= MaxMatrixEventContentBytes { - details.FinalSize = size - return fitted, details, nil + initialSize := estimateFinalEditContentSize(payload, target) + if initialSize <= MaxMatrixEventContentBytes { + return payload, FinalEditFitDetails{OriginalSize: initialSize, FinalSize: initialSize}, nil } + fitted := cloneFinalEditPayload(payload) + details := FinalEditFitDetails{OriginalSize: initialSize} + size := initialSize if fitted.Content.Format != "" || fitted.Content.FormattedBody != "" { fitted.Content.Format = "" @@ -283,6 +281,7 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd if size > MaxMatrixEventContentBytes && fitted.Content != nil && fitted.Content.Body != "" { originalBody := fitted.Content.Body best := strings.TrimSpace(originalBody) + bestSize := size low, high := 1, len(originalBody) for low <= high { mid := (low + high) / 2 @@ -296,6 +295,7 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd candidateSize := estimateFinalEditContentSize(fitted, target) if candidateSize <= MaxMatrixEventContentBytes { best = candidate + bestSize = candidateSize low = mid + 1 } else { high = mid - 1 @@ -303,7 +303,7 @@ func FitFinalEditPayload(payload *FinalEditPayload, target id.EventID) (*FinalEd } fitted.Content.Body = best details.TrimmedBody = best != strings.TrimSpace(originalBody) - size = estimateFinalEditContentSize(fitted, target) + size = bestSize } details.FinalSize = size if size > MaxMatrixEventContentBytes { diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index 1d83ee5f..ccee4ae1 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -71,8 +71,10 @@ func TestFitFinalEditPayloadCompactsOptionalMetadataFirst(t *testing.T) { func TestFitFinalEditPayloadDeepClonesNestedMaps(t *testing.T) { payload := &FinalEditPayload{ Content: &event.MessageEventContent{ - MsgType: event.MsgText, - Body: "done", + MsgType: event.MsgText, + Body: "done", + Format: event.FormatHTML, + FormattedBody: strings.Repeat("

x

", MaxMatrixEventContentBytes/4), }, Extra: map[string]any{ "nested": map[string]any{ diff --git a/turns/session.go b/turns/session.go index 854081b0..afa2d9cf 100644 --- a/turns/session.go +++ b/turns/session.go @@ -419,6 +419,9 @@ func (s *StreamSession) logWarn(reason string, err error, kv ...any) { if s == nil || s.params.Logger == nil { return } + if errors.Is(err, context.Canceled) { + return + } logEvt := s.params.Logger.Warn().Str("reason", reason) if err != nil { logEvt = logEvt.Err(err) diff --git a/turns/session_target_test.go b/turns/session_target_test.go index 9daf1b20..9f2651f1 100644 --- a/turns/session_target_test.go +++ b/turns/session_target_test.go @@ -1,9 +1,11 @@ package turns import ( + "bytes" "context" "testing" + "github.com/rs/zerolog" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" @@ -416,3 +418,18 @@ func TestStreamSessionCurrentTargetFallsBackToStartedTarget(t *testing.T) { t.Fatalf("expected no buffered parts after fallback publish, got %d", pendingPartCount(session)) } } + +func TestStreamSessionLogWarnSuppressesContextCanceled(t *testing.T) { + var buf bytes.Buffer + logger := zerolog.New(&buf) + session := NewStreamSession(StreamSessionParams{ + TurnID: "turn-cancelled", + Logger: &logger, + }) + + session.logWarn("stream_publish_failed", context.Canceled, "seq", 1) + + if buf.Len() != 0 { + t.Fatalf("expected no log output for context cancellation, got %q", buf.String()) + } +} From b5125b5ee0c6f230d448488dd2c8ed90e45a371e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 14:40:01 +0200 Subject: [PATCH 08/11] Update final_edit_test.go --- sdk/final_edit_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index ccee4ae1..b1a46e4f 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -74,7 +74,7 @@ func TestFitFinalEditPayloadDeepClonesNestedMaps(t *testing.T) { MsgType: event.MsgText, Body: "done", Format: event.FormatHTML, - FormattedBody: strings.Repeat("

x

", MaxMatrixEventContentBytes/4), + FormattedBody: strings.Repeat("x", MaxMatrixEventContentBytes*2), }, Extra: map[string]any{ "nested": map[string]any{ @@ -88,14 +88,20 @@ func TestFitFinalEditPayloadDeepClonesNestedMaps(t *testing.T) { }, } - fitted, _, err := FitFinalEditPayload(payload, id.EventID("$event-clone")) + fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-clone")) if err != nil { - t.Fatalf("expected fit to succeed, got %v", err) + t.Fatalf("expected fit to succeed, got %v; details: %+v", err, details) } if fitted == nil || fitted.Content == nil { t.Fatal("expected fitted payload") } + t.Logf("details: %+v", details) + t.Logf("fitted.Extra: %v", fitted.Extra) + t.Logf("fitted.TopLevelExtra: %v", fitted.TopLevelExtra) + if fitted.Extra == nil { + t.Fatal("expected Extra to survive fitting") + } fitted.Extra["nested"].(map[string]any)["value"] = "changed" fitted.TopLevelExtra["outer"].(map[string]any)["flag"] = false From c8618852029235204720e80563577548126118a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 15:03:58 +0200 Subject: [PATCH 09/11] Clone Extra and reorder final edit send Shallow-clone FinalEditPayload.Extra before embedding it into event content to avoid in-place mutation by mautrix's Content.MarshalJSON (which deep-merges Parsed into Raw). Cleaned up the corresponding test to stop inspecting/logging fit details. Also move sendFinalEdit to occur after session.End in Turn.finalizeTurn so the session teardown runs before the final edit is sent; persisting the final message remains unchanged. --- sdk/final_edit.go | 4 +++- sdk/final_edit_test.go | 10 ++-------- sdk/turn.go | 14 +++++++++++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sdk/final_edit.go b/sdk/final_edit.go index 1be85a82..5a71966f 100644 --- a/sdk/final_edit.go +++ b/sdk/final_edit.go @@ -215,7 +215,9 @@ func estimateFinalEditContentSize(payload *FinalEditPayload, target id.EventID) raw = map[string]any{} } if payload.Extra != nil { - raw["m.new_content"] = payload.Extra + // Shallow-clone Extra to avoid mutation by mautrix's Content.MarshalJSON + // which deep-merges Parsed into Raw in-place via mergeMaps. + raw["m.new_content"] = maps.Clone(payload.Extra) } data, err := json.Marshal(&event.Content{ Parsed: &content, diff --git a/sdk/final_edit_test.go b/sdk/final_edit_test.go index b1a46e4f..69c19cee 100644 --- a/sdk/final_edit_test.go +++ b/sdk/final_edit_test.go @@ -88,20 +88,14 @@ func TestFitFinalEditPayloadDeepClonesNestedMaps(t *testing.T) { }, } - fitted, details, err := FitFinalEditPayload(payload, id.EventID("$event-clone")) + fitted, _, err := FitFinalEditPayload(payload, id.EventID("$event-clone")) if err != nil { - t.Fatalf("expected fit to succeed, got %v; details: %+v", err, details) + t.Fatalf("expected fit to succeed, got %v", err) } if fitted == nil || fitted.Content == nil { t.Fatal("expected fitted payload") } - t.Logf("details: %+v", details) - t.Logf("fitted.Extra: %v", fitted.Extra) - t.Logf("fitted.TopLevelExtra: %v", fitted.TopLevelExtra) - if fitted.Extra == nil { - t.Fatal("expected Extra to survive fitting") - } fitted.Extra["nested"].(map[string]any)["value"] = "changed" fitted.TopLevelExtra["outer"].(map[string]any)["flag"] = false diff --git a/sdk/turn.go b/sdk/turn.go index 9d870940..bc60e97e 100644 --- a/sdk/turn.go +++ b/sdk/turn.go @@ -138,6 +138,7 @@ type Turn struct { placeholderPayload *PlaceholderMessagePayload finalEditPayload *FinalEditPayload sendFunc func(ctx context.Context) (id.EventID, networkid.MessageID, error) + sendFinalEditFunc func(ctx context.Context) suppressSend bool suppressFinalEdit bool idleTimer *time.Timer @@ -752,6 +753,17 @@ func (t *Turn) sendFinalEdit(ctx context.Context) { } } +func (t *Turn) dispatchFinalEdit(ctx context.Context) { + if t == nil { + return + } + if t.sendFinalEditFunc != nil { + t.sendFinalEditFunc(ctx) + return + } + t.sendFinalEdit(ctx) +} + func supportedBaseMetadataFromMap(metadata map[string]any) agentremote.BaseMessageMetadata { if len(metadata) == 0 { return agentremote.BaseMessageMetadata{} @@ -837,10 +849,10 @@ func (t *Turn) finalizeTurn(endReason turns.EndReason, finishReason, fallbackBod finalCtx := t.finalizationContext() t.flushPendingStream(finalCtx) t.ensureDefaultFinalEditPayload(finishReason, fallbackBody) - t.sendFinalEdit(finalCtx) if t.session != nil { t.session.End(finalCtx, endReason) } + t.dispatchFinalEdit(finalCtx) t.persistFinalMessage(finishReason) } From b775ea0804ce8f8740586f80092ff9f2d99773c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 15:09:57 +0200 Subject: [PATCH 10/11] Handle Matrix chat deletion; clear live stream Add support for Matrix chat deletion cleanup and ensure live-stream descriptors are cleared on final edits. - Introduce HandleMatrixDeleteChat in bridges/ai/delete_chat.go to best-effort clean up runtime state (drain queues, stop runs/typing, release room), remove persisted session artifacts, forget portal references in login metadata, and notify session mutations. - Wire up AIClient to implement DeleteChatHandlingNetworkAPI. - Add clearSystemEventsForSession to bridges/ai/system_events.go to drop in-memory system events for a session. - In sdk/turn.go, clear the BeeperStream descriptor and com.beeper.stream extras when a session exists so terminal edits don't appear as active streams. - Add tests in sdk/turn_test.go to verify stream sessions are closed before dispatching final edits and that stream descriptors are cleared in final edits. These changes prevent sending status/stop notices to deleted rooms, remove stale persisted state, and ensure final edited events no longer look like active streaming placeholders. --- bridges/ai/client.go | 1 + bridges/ai/delete_chat.go | 117 ++++++++++++++++++++++++++++++++++++ bridges/ai/system_events.go | 10 +++ sdk/turn.go | 7 +++ sdk/turn_test.go | 61 +++++++++++++++++++ 5 files changed, 196 insertions(+) create mode 100644 bridges/ai/delete_chat.go diff --git a/bridges/ai/client.go b/bridges/ai/client.go index a9791c1b..bcecc24b 100644 --- a/bridges/ai/client.go +++ b/bridges/ai/client.go @@ -40,6 +40,7 @@ var ( _ bridgev2.EditHandlingNetworkAPI = (*AIClient)(nil) _ bridgev2.ReactionHandlingNetworkAPI = (*AIClient)(nil) _ bridgev2.RedactionHandlingNetworkAPI = (*AIClient)(nil) + _ bridgev2.DeleteChatHandlingNetworkAPI = (*AIClient)(nil) _ bridgev2.DisappearTimerChangingNetworkAPI = (*AIClient)(nil) _ bridgev2.TypingHandlingNetworkAPI = (*AIClient)(nil) _ bridgev2.ReadReceiptHandlingNetworkAPI = (*AIClient)(nil) diff --git a/bridges/ai/delete_chat.go b/bridges/ai/delete_chat.go new file mode 100644 index 00000000..5fb3ceb5 --- /dev/null +++ b/bridges/ai/delete_chat.go @@ -0,0 +1,117 @@ +package ai + +import ( + "context" + "strings" + + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/id" +) + +// HandleMatrixDeleteChat best-effort cleans up AI-room runtime and persisted +// state when Matrix deletes the chat. The core bridge handles the actual room +// and portal deletion. +func (oc *AIClient) HandleMatrixDeleteChat(ctx context.Context, msg *bridgev2.MatrixDeleteChat) error { + if oc == nil || msg == nil || msg.Portal == nil { + return nil + } + + portal := msg.Portal + meta := portalMeta(portal) + roomID := portal.MXID + sessionKey := strings.TrimSpace(roomID.String()) + + if roomID != "" { + oc.cleanupDeletedRoomRuntime(ctx, roomID) + } + if sessionKey != "" { + oc.deletePersistedSessionArtifacts(ctx, sessionKey) + } + oc.forgetDeletedPortalReferences(ctx, portal) + + if meta != nil { + oc.notifySessionMutation(ctx, portal, meta, false) + } + return nil +} + +func (oc *AIClient) cleanupDeletedRoomRuntime(ctx context.Context, roomID id.RoomID) { + if oc == nil || roomID == "" { + return + } + + // Room deletion should be silent; drop queued work instead of sending stop + // notices/status events into a room that's being removed. + _ = oc.drainPendingQueue(roomID) + oc.stopSubagentRuns(ctx, roomID) + oc.stopQueueTyping(roomID) + oc.releaseRoom(roomID) + + oc.groupHistoryMu.Lock() + delete(oc.groupHistoryBuffers, roomID) + oc.groupHistoryMu.Unlock() + + oc.userTypingMu.Lock() + delete(oc.userTypingState, roomID) + oc.userTypingMu.Unlock() + + ackReactionStoreMu.Lock() + delete(ackReactionStore, roomID) + ackReactionStoreMu.Unlock() +} + +func (oc *AIClient) deletePersistedSessionArtifacts(ctx context.Context, sessionKey string) { + if oc == nil { + return + } + sessionKey = strings.TrimSpace(sessionKey) + if sessionKey == "" { + return + } + + db, bridgeID, loginID := loginDBContext(oc) + if db != nil && bridgeID != "" && loginID != "" { + bestEffortExec(ctx, db, oc.Log(), + `DELETE FROM agentremote_sessions WHERE bridge_id=$1 AND login_id=$2 AND session_key=$3`, + bridgeID, loginID, sessionKey, + ) + bestEffortExec(ctx, db, oc.Log(), + `DELETE FROM aichats_system_events WHERE bridge_id=$1 AND login_id=$2 AND session_key=$3`, + bridgeID, loginID, sessionKey, + ) + } + + clearSystemEventsForSession(systemEventsOwnerKey(oc), sessionKey) +} + +func (oc *AIClient) forgetDeletedPortalReferences(ctx context.Context, portal *bridgev2.Portal) { + if oc == nil || oc.UserLogin == nil || portal == nil { + return + } + + loginMeta := loginMetadata(oc.UserLogin) + if loginMeta == nil { + return + } + + changed := false + roomID := strings.TrimSpace(portal.MXID.String()) + portalID := strings.TrimSpace(string(portal.PortalKey.ID)) + + if portalID != "" && loginMeta.DefaultChatPortalID == portalID { + loginMeta.DefaultChatPortalID = "" + changed = true + } + if roomID != "" && len(loginMeta.LastActiveRoomByAgent) > 0 { + for agentID, activeRoomID := range loginMeta.LastActiveRoomByAgent { + if activeRoomID == roomID { + delete(loginMeta.LastActiveRoomByAgent, agentID) + changed = true + } + } + } + + if changed { + _ = oc.UserLogin.Save(ctx) + } +} diff --git a/bridges/ai/system_events.go b/bridges/ai/system_events.go index ee79f3b0..1537bed0 100644 --- a/bridges/ai/system_events.go +++ b/bridges/ai/system_events.go @@ -119,6 +119,16 @@ func hasSystemEvents(ownerKey string, sessionKey string) bool { return has } +func clearSystemEventsForSession(ownerKey string, sessionKey string) { + key, err := buildSystemEventsMapKey(ownerKey, sessionKey) + if err != nil { + return + } + systemEventsMu.Lock() + delete(systemEvents, key) + systemEventsMu.Unlock() +} + func buildSystemEventsMapKey(ownerKey string, sessionKey string) (string, error) { owner := strings.TrimSpace(ownerKey) key, err := requireSessionKey(sessionKey) diff --git a/sdk/turn.go b/sdk/turn.go index bc60e97e..1ee6ffde 100644 --- a/sdk/turn.go +++ b/sdk/turn.go @@ -714,6 +714,13 @@ func (t *Turn) buildFinalEdit() (networkid.MessageID, *bridgev2.ConvertedEdit) { if topLevelExtra == nil { topLevelExtra = map[string]any{} } + if t.session != nil { + // Explicitly clear the live-stream descriptor on terminal edits so the + // edited event no longer looks like an active placeholder. + content.BeeperStream = nil + extra["com.beeper.stream"] = nil + topLevelExtra["com.beeper.stream"] = nil + } if t.initialEventID != "" { topLevelExtra["m.relates_to"] = (&event.RelatesTo{}).SetReplace(t.initialEventID) } diff --git a/sdk/turn_test.go b/sdk/turn_test.go index ce4cc9c8..74cc9d99 100644 --- a/sdk/turn_test.go +++ b/sdk/turn_test.go @@ -576,6 +576,34 @@ func TestTurnBuildFinalEditDefaultsToVisibleText(t *testing.T) { } } +func TestTurnFinalizeTurnEndsStreamBeforeDispatchingFinalEdit(t *testing.T) { + turn := newTurn(context.Background(), nil, nil, nil) + turn.session = turns.NewStreamSession(turns.StreamSessionParams{ + TurnID: "turn-finalize-order", + }) + + if turn.session.IsClosed() { + t.Fatal("expected stream session to start open") + } + + finalEditDispatched := false + turn.sendFinalEditFunc = func(context.Context) { + finalEditDispatched = true + if !turn.session.IsClosed() { + t.Fatal("expected stream session to be closed before dispatching final edit") + } + } + + turn.finalizeTurn(turns.EndReasonFinish, "stop", "") + + if !finalEditDispatched { + t.Fatal("expected final edit dispatch hook to run") + } + if !turn.session.IsClosed() { + t.Fatal("expected stream session to remain closed after finalization") + } +} + func TestTurnBuildFinalEditDefaultsToGenericBodyForArtifacts(t *testing.T) { turn := newTurn(context.Background(), nil, nil, nil) turn.initialEventID = id.EventID("$event-artifact") @@ -603,6 +631,39 @@ func TestTurnBuildFinalEditDefaultsToGenericBodyForArtifacts(t *testing.T) { } } +func TestTurnBuildFinalEditClearsStreamDescriptorWhenSessionExists(t *testing.T) { + turn := newTurn(context.Background(), nil, nil, nil) + turn.initialEventID = id.EventID("$event-stream") + turn.networkMessageID = "msg-stream" + turn.session = turns.NewStreamSession(turns.StreamSessionParams{ + TurnID: "turn-stream-clear", + }) + turn.SetFinalEditPayload(&FinalEditPayload{ + Content: &event.MessageEventContent{ + MsgType: event.MsgText, + Body: "done", + }, + }) + + _, edit := turn.buildFinalEdit() + if edit == nil || len(edit.ModifiedParts) != 1 { + t.Fatalf("expected single modified part, got %#v", edit) + } + part := edit.ModifiedParts[0] + if _, ok := part.Extra["com.beeper.stream"]; !ok { + t.Fatalf("expected m.new_content to explicitly clear com.beeper.stream, got %#v", part.Extra) + } + if part.Extra["com.beeper.stream"] != nil { + t.Fatalf("expected com.beeper.stream to be cleared with nil, got %#v", part.Extra["com.beeper.stream"]) + } + if _, ok := part.TopLevelExtra["com.beeper.stream"]; !ok { + t.Fatalf("expected top-level edit content to explicitly clear com.beeper.stream, got %#v", part.TopLevelExtra) + } + if part.TopLevelExtra["com.beeper.stream"] != nil { + t.Fatalf("expected top-level com.beeper.stream to be cleared with nil, got %#v", part.TopLevelExtra["com.beeper.stream"]) + } +} + func TestTurnBuildFinalEditPreservesMentionsInContent(t *testing.T) { turn := newTurn(context.Background(), nil, nil, nil) turn.initialEventID = id.EventID("$event-mentions") From f490865ae113047ebf111c2c2a1e120e641126bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?batuhan=20i=C3=A7=C3=B6z?= Date: Tue, 7 Apr 2026 15:24:57 +0200 Subject: [PATCH 11/11] Finalize streaming state and handle terminal events Add a finalized flag to streamingState (markFinalized/isFinalized) and use it to guard finalization paths across the streaming lifecycle. Ensure streams are closed in runAgentLoopStreamStep and short-circuit when state is nil in finishStreamingWithFailure. Update responsesTurnAdapter/FinaleAgentLoop and chatCompletionsTurnAdapter to use isFinalized checks, make processResponseStreamEvent explicitly handle response.failed and response.incomplete (finalize metadata, log, and return loop-stop), and return on response.completed. Add early-finalize guard in completeStreamingSuccess. Include new tests to cover completed, failed, and finalize behavior. These changes prevent duplicate finalization/races and ensure terminal events stop the loop and close resources correctly. --- bridges/ai/agent_loop_runtime.go | 3 + bridges/ai/streaming_chat_completions.go | 2 +- bridges/ai/streaming_error_handling.go | 6 ++ .../ai/streaming_lifecycle_cluster_test.go | 101 ++++++++++++++++++ bridges/ai/streaming_responses_api.go | 25 ++++- bridges/ai/streaming_state.go | 17 ++- bridges/ai/streaming_success.go | 3 + 7 files changed, 153 insertions(+), 4 deletions(-) diff --git a/bridges/ai/agent_loop_runtime.go b/bridges/ai/agent_loop_runtime.go index 4132c8ab..6d0d24b3 100644 --- a/bridges/ai/agent_loop_runtime.go +++ b/bridges/ai/agent_loop_runtime.go @@ -90,6 +90,9 @@ func runAgentLoopStreamStep[T any]( handleEvent func(T) (done bool, cle *ContextLengthError, err error), handleErr func(error) (cle *ContextLengthError, err error), ) (bool, *ContextLengthError, error) { + if stream != nil { + defer stream.Close() + } writer := state.writer() writer.StepStart(ctx) defer writer.StepFinish(ctx) diff --git a/bridges/ai/streaming_chat_completions.go b/bridges/ai/streaming_chat_completions.go index 6d92f94e..e7256172 100644 --- a/bridges/ai/streaming_chat_completions.go +++ b/bridges/ai/streaming_chat_completions.go @@ -203,7 +203,7 @@ func (a *chatCompletionsTurnAdapter) FinalizeAgentLoop(ctx context.Context) { state := a.state portal := a.portal meta := a.meta - if state == nil || state.completedAtMs != 0 { + if state == nil || state.isFinalized() { return } diff --git a/bridges/ai/streaming_error_handling.go b/bridges/ai/streaming_error_handling.go index 3f50f14d..3d2f0a61 100644 --- a/bridges/ai/streaming_error_handling.go +++ b/bridges/ai/streaming_error_handling.go @@ -40,6 +40,12 @@ func (oc *AIClient) finishStreamingWithFailure( reason string, err error, ) error { + if state == nil { + return err + } + if !state.markFinalized() { + return streamFailureError(state, err) + } if state != nil && state.stop.Load() != nil && reason == "cancelled" { reason = "stop" } diff --git a/bridges/ai/streaming_lifecycle_cluster_test.go b/bridges/ai/streaming_lifecycle_cluster_test.go index 25ae460e..54e09e69 100644 --- a/bridges/ai/streaming_lifecycle_cluster_test.go +++ b/bridges/ai/streaming_lifecycle_cluster_test.go @@ -162,3 +162,104 @@ func TestProcessResponseStreamEventUpdatesCompletedResponseStatus(t *testing.T) t.Fatalf("expected writer metadata to be completed, got %#v", metadata["response_status"]) } } + +func TestProcessResponseStreamEventCompletedSignalsLoopStop(t *testing.T) { + state := newTestStreamingStateWithTurn() + oc := &AIClient{} + + rsc := &responseStreamContext{ + base: &agentLoopProviderBase{ + oc: oc, + log: zerolog.Nop(), + state: state, + }, + } + + done, cle, err := oc.processResponseStreamEvent(context.Background(), rsc, responses.ResponseStreamEventUnion{ + Type: "response.completed", + Response: responses.Response{ + ID: "resp_done", + Status: "completed", + }, + }, false) + if !done { + t.Fatal("expected completed response event to stop the stream loop") + } + if cle != nil { + t.Fatalf("did not expect context-length error, got %#v", cle) + } + if err != nil { + t.Fatalf("did not expect error, got %v", err) + } +} + +func TestResponsesTurnAdapterFinalizeAgentLoopDoesNotSkipTerminalLifecycle(t *testing.T) { + state := newTestStreamingStateWithTurn() + state.turn.SetSuppressSend(true) + state.writer().TextDelta(context.Background(), "done") + state.completedAtMs = 123 + state.finishReason = "stop" + + adapter := &responsesTurnAdapter{ + agentLoopProviderBase: agentLoopProviderBase{ + oc: &AIClient{}, + log: zerolog.Nop(), + state: state, + }, + } + + adapter.FinalizeAgentLoop(context.Background()) + + if !state.isFinalized() { + t.Fatal("expected finalize agent loop to finalize terminal response state") + } + + message := streamui.SnapshotUIMessage(state.turn.UIState()) + metadata, _ := message["metadata"].(map[string]any) + if metadata["finish_reason"] != "stop" { + t.Fatalf("expected finalized UI message finish_reason stop, got %#v", metadata["finish_reason"]) + } +} + +func TestProcessResponseStreamEventFailedFinalizesAsError(t *testing.T) { + state := newTestStreamingStateWithTurn() + state.turn.SetSuppressSend(true) + state.writer().TextDelta(context.Background(), "hello") + oc := &AIClient{} + + rsc := &responseStreamContext{ + base: &agentLoopProviderBase{ + oc: oc, + log: zerolog.Nop(), + state: state, + }, + } + + done, cle, err := oc.processResponseStreamEvent(context.Background(), rsc, responses.ResponseStreamEventUnion{ + Type: "response.failed", + Response: responses.Response{ + ID: "resp_failed", + Status: "failed", + Error: responses.ResponseError{ + Message: "boom", + }, + }, + }, false) + if !done { + t.Fatal("expected failed response event to stop the stream loop") + } + if cle != nil { + t.Fatalf("did not expect context-length error, got %#v", cle) + } + if err == nil { + t.Fatal("expected failed response event to return an error") + } + if !state.isFinalized() { + t.Fatal("expected failed response event to finalize the turn") + } + message := streamui.SnapshotUIMessage(state.turn.UIState()) + metadata, _ := message["metadata"].(map[string]any) + if metadata["finish_reason"] != "error" { + t.Fatalf("expected error finish_reason, got %#v", metadata["finish_reason"]) + } +} diff --git a/bridges/ai/streaming_responses_api.go b/bridges/ai/streaming_responses_api.go index 1bd9907a..a29abed1 100644 --- a/bridges/ai/streaming_responses_api.go +++ b/bridges/ai/streaming_responses_api.go @@ -176,7 +176,7 @@ func (a *responsesTurnAdapter) RunAgentTurn( } func (a *responsesTurnAdapter) FinalizeAgentLoop(ctx context.Context) { - if a.state == nil || a.state.completedAtMs != 0 { + if a.state == nil || a.state.isFinalized() { return } a.oc.finalizeResponsesStream(ctx, a.log, a.portal, a.state, a.meta) @@ -217,9 +217,29 @@ func (oc *AIClient) processResponseStreamEvent( ) switch streamEvent.Type { - case "response.created", "response.queued", "response.in_progress", "response.failed", "response.incomplete": + case "response.created", "response.queued", "response.in_progress": oc.handleResponseLifecycleEvent(ctx, portal, state, meta, streamEvent.Type, streamEvent.Response) + case "response.failed": + oc.handleResponseLifecycleEvent(ctx, portal, state, meta, streamEvent.Type, streamEvent.Response) + state.completedAtMs = time.Now().UnixMilli() + errText := strings.TrimSpace(streamEvent.Response.Error.Message) + if errText == "" { + errText = "response failed" + } + return true, nil, oc.finishStreamingWithFailure(ctx, log, portal, state, meta, "error", errors.New(errText)) + + case "response.incomplete": + oc.handleResponseLifecycleEvent(ctx, portal, state, meta, streamEvent.Type, streamEvent.Response) + state.completedAtMs = time.Now().UnixMilli() + actions.finalizeMetadata() + log.Debug(). + Str("reason", state.finishReason). + Str("response_id", state.responseID). + Str("response_status", state.responseStatus). + Msg("Response stream ended incomplete" + contSuffix) + return true, nil, nil + case "response.output_item.added": actions.outputItemAdded(streamEvent.Item) @@ -377,6 +397,7 @@ func (oc *AIClient) processResponseStreamEvent( } log.Debug().Str("reason", state.finishReason).Str("response_id", state.responseID).Int("images", len(state.pendingImages)). Msg("Response stream completed" + contSuffix) + return true, nil, nil case "error": apiErr := fmt.Errorf("API error: %s", streamEvent.Message) diff --git a/bridges/ai/streaming_state.go b/bridges/ai/streaming_state.go index cb205045..fefce4f7 100644 --- a/bridges/ai/streaming_state.go +++ b/bridges/ai/streaming_state.go @@ -71,7 +71,8 @@ type streamingState struct { pendingMcpApprovals []mcpApprovalRequest pendingMcpApprovalsSeen map[string]bool - stop atomic.Pointer[assistantStopMetadata] + finalized atomic.Bool + stop atomic.Pointer[assistantStopMetadata] } // sourceEventID returns the triggering user message event ID from the turn's source ref. @@ -109,6 +110,20 @@ func (s *streamingState) writer() *sdk.Writer { return s.turn.Writer() } +func (s *streamingState) markFinalized() bool { + if s == nil { + return false + } + return s.finalized.CompareAndSwap(false, true) +} + +func (s *streamingState) isFinalized() bool { + if s == nil { + return false + } + return s.finalized.Load() +} + func (s *streamingState) nextMessageTiming() agentremote.EventTiming { if s == nil { return agentremote.ResolveEventTiming(time.Time{}, 0) diff --git a/bridges/ai/streaming_success.go b/bridges/ai/streaming_success.go index 18f0bb6d..d73f731c 100644 --- a/bridges/ai/streaming_success.go +++ b/bridges/ai/streaming_success.go @@ -17,6 +17,9 @@ func (oc *AIClient) completeStreamingSuccess( state *streamingState, meta *PortalMetadata, ) { + if state == nil || !state.markFinalized() { + return + } state.completedAtMs = time.Now().UnixMilli() if state.finishReason == "" { state.finishReason = "stop"