Skip to content

Commit bf332af

Browse files
committed
Fix pending queue locking and lastItem updates
Rework pending queue locking and item housekeeping to avoid races and stale pointers. getPendingQueue now locks the queue.mu before releasing pendingQueuesMu and consistently applies settings for both new and existing queues. drainPendingQueue clears queue.items and lastItem when removing a queue. removePendingQueueBySourceEvent reassigns lastItem to the new tail if the removed item was the last. enqueuePendingItem removed a now-redundant explicit lock (the returned queue is already locked). Added a unit test to verify lastItem is cleared/reassigned, and tightened an error assertion in the streaming test to use errors.Is for wrapped cancellations.
1 parent a242d26 commit bf332af

3 files changed

Lines changed: 54 additions & 18 deletions

File tree

bridges/ai/pending_queue.go

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ type pendingQueueDispatchCandidate struct {
5757

5858
func (oc *AIClient) getPendingQueue(roomID id.RoomID, settings airuntime.QueueSettings) *pendingQueue {
5959
oc.pendingQueuesMu.Lock()
60-
defer oc.pendingQueuesMu.Unlock()
6160
queue := oc.pendingQueues[roomID]
6261
if queue == nil {
6362
queue = &pendingQueue{
@@ -68,20 +67,19 @@ func (oc *AIClient) getPendingQueue(roomID id.RoomID, settings airuntime.QueueSe
6867
dropPolicy: settings.DropPolicy,
6968
}
7069
oc.pendingQueues[roomID] = queue
71-
} else {
72-
queue.mu.Lock()
73-
queue.mode = settings.Mode
74-
if settings.DebounceMs >= 0 {
75-
queue.debounceMs = settings.DebounceMs
76-
}
77-
if settings.Cap > 0 {
78-
queue.cap = settings.Cap
79-
}
80-
if settings.DropPolicy != "" {
81-
queue.dropPolicy = settings.DropPolicy
82-
}
83-
queue.mu.Unlock()
8470
}
71+
queue.mu.Lock()
72+
queue.mode = settings.Mode
73+
if settings.DebounceMs >= 0 {
74+
queue.debounceMs = settings.DebounceMs
75+
}
76+
if settings.Cap > 0 {
77+
queue.cap = settings.Cap
78+
}
79+
if settings.DropPolicy != "" {
80+
queue.dropPolicy = settings.DropPolicy
81+
}
82+
oc.pendingQueuesMu.Unlock()
8583
return queue
8684
}
8785

@@ -99,9 +97,11 @@ func (oc *AIClient) drainPendingQueue(roomID id.RoomID) []pendingQueueItem {
9997
oc.pendingQueuesMu.Unlock()
10098
return nil
10199
}
102-
delete(oc.pendingQueues, roomID)
103100
queue.mu.Lock()
101+
delete(oc.pendingQueues, roomID)
104102
items := queue.items
103+
queue.items = nil
104+
queue.lastItem = nil
105105
queue.mu.Unlock()
106106
oc.pendingQueuesMu.Unlock()
107107

@@ -131,6 +131,13 @@ func (oc *AIClient) removePendingQueueBySourceEvent(roomID id.RoomID, sourceEven
131131
}
132132
clear(queue.items[len(kept):])
133133
queue.items = kept
134+
if queue.lastItem != nil && queue.lastItem.pending.sourceEventID() == sourceEventID {
135+
queue.lastItem = nil
136+
if len(kept) > 0 {
137+
lastItem := kept[len(kept)-1]
138+
queue.lastItem = &lastItem
139+
}
140+
}
134141
empty := len(queue.items) == 0 && queue.droppedCount == 0
135142
if empty {
136143
delete(oc.pendingQueues, roomID)
@@ -149,7 +156,6 @@ func (oc *AIClient) enqueuePendingItem(roomID id.RoomID, item pendingQueueItem,
149156
if queue == nil {
150157
return false
151158
}
152-
queue.mu.Lock()
153159
defer queue.mu.Unlock()
154160

155161
for _, existing := range queue.items {

bridges/ai/queue_status_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,33 @@ func TestDispatchOrQueueQueuesBehindExistingPendingWork(t *testing.T) {
190190
t.Fatalf("expected room to remain unacquired while backlog exists")
191191
}
192192
}
193+
194+
func TestRemovePendingQueueBySourceEventClearsRemovedLastItem(t *testing.T) {
195+
roomID := id.RoomID("!room:example.com")
196+
first := pendingQueueItem{pending: pendingMessage{SourceEventID: id.EventID("$one")}}
197+
last := pendingQueueItem{pending: pendingMessage{SourceEventID: id.EventID("$two")}}
198+
oc := &AIClient{
199+
pendingQueues: map[id.RoomID]*pendingQueue{
200+
roomID: {
201+
items: []pendingQueueItem{first, last},
202+
lastItem: &last,
203+
},
204+
},
205+
}
206+
207+
removed := oc.removePendingQueueBySourceEvent(roomID, id.EventID("$two"))
208+
if len(removed) != 1 {
209+
t.Fatalf("expected one removed item, got %d", len(removed))
210+
}
211+
212+
snapshot := oc.getQueueSnapshot(roomID)
213+
if snapshot == nil {
214+
t.Fatal("expected queue snapshot to remain")
215+
}
216+
if snapshot.lastItem == nil {
217+
t.Fatal("expected lastItem to be reassigned to the new tail")
218+
}
219+
if got := snapshot.lastItem.pending.sourceEventID(); got != id.EventID("$one") {
220+
t.Fatalf("expected lastItem to point at remaining item, got %q", got)
221+
}
222+
}

bridges/ai/streaming_error_handling_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ func TestFinishStreamingWithFailureCancelledEndsTurnAsCancelled(t *testing.T) {
101101
"cancelled",
102102
context.Canceled,
103103
)
104-
if err == nil {
105-
t.Fatal("expected wrapped cancellation error")
104+
if !errors.Is(err, context.Canceled) {
105+
t.Fatalf("expected wrapped cancellation error, got %#v", err)
106106
}
107107

108108
message := streamui.SnapshotUIMessage(state.turn.UIState())

0 commit comments

Comments
 (0)