Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
# beans-x0zh
title: Fix subscription resolver dropping notifications under backpressure
status: completed
type: bug
priority: normal
created_at: 2026-03-21T08:16:46Z
updated_at: 2026-03-21T08:21:04Z
---

The AgentSessionChanged and ActiveAgentStatuses subscription resolvers block on sending to the out channel. While blocked, new notifications on the buffer-1 ch channel are silently dropped. This can cause the frontend to miss entire RUNNING status transitions, resulting in the agent working indicator disappearing while the agent is still active. Fix by using a latest-value pattern that refreshes pending state when new notifications arrive during a blocked send.

## Summary of Changes

Changed the AgentSessionChanged and ActiveAgentStatuses subscription resolvers from a simple block-on-send pattern to a latest-value pattern. When the resolver is blocked sending a payload to the WebSocket transport and a new notification arrives, it now refreshes the pending state with the latest session data instead of dropping the notification. This ensures the frontend always receives the most recent session status, preventing the 'Agent is working' spinner from disappearing while the agent is actively working.

- Modified AgentSessionChanged resolver in schema.resolvers.go
- Modified ActiveAgentStatuses resolver in schema.resolvers.go
- Added test TestAgentSessionSubscription_DeliversLatestState
82 changes: 56 additions & 26 deletions internal/graph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions internal/graph/schema.resolvers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package graph
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/hmans/beans/internal/agent"
"github.com/hmans/beans/internal/graph/model"
Expand Down Expand Up @@ -1145,6 +1147,50 @@ func TestSubscriptionBeanChanged(t *testing.T) {
})
}

func TestAgentSessionSubscription_DeliversLatestState(t *testing.T) {
// Verify that when multiple rapid notifications occur, the subscriber
// eventually receives the latest state (not a stale intermediate).
mgr := agent.NewManager("", nil)

beanID := "test-latest-value"
resolver := &Resolver{AgentMgr: mgr}
sr := resolver.Subscription()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch, err := sr.AgentSessionChanged(ctx, beanID)
if err != nil {
t.Fatalf("AgentSessionChanged() error = %v", err)
}

// No session exists yet, so no initial emission. Fire rapid
// notifications by adding multiple info messages without reading.
for i := range 10 {
mgr.AddInfoMessage(beanID, fmt.Sprintf("msg-%d", i))
}

// Drain the channel. Use a short idle timeout: once we stop receiving
// updates for 100ms, assume all coalesced notifications have been delivered.
var lastSession *model.AgentSession
for {
select {
case s := <-ch:
lastSession = s
case <-time.After(100 * time.Millisecond):
goto done
}
}
done:
if lastSession == nil {
t.Fatal("received no updates")
}
// The final state must include all info messages
if len(lastSession.Messages) != 10 {
t.Errorf("last update had %d messages, want 10", len(lastSession.Messages))
}
}

func TestRelationshipFieldsWithFilter(t *testing.T) {
resolver, core := setupTestResolver(t)
ctx := context.Background()
Expand Down
Loading