Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions crates/sprout-acp/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,27 +220,6 @@ pub struct PromptContext {
// ── AgentPool impl ────────────────────────────────────────────────────────────

impl AgentPool {
/// Create a new pool from a list of initialized agents.
///
/// Agents are placed into indexed slots. The unbounded channel is created
/// here; tasks send results back through `result_tx`.
///
/// Prefer [`AgentPool::from_slots`] for startup paths where some agents may
/// have failed — `new()` packs agents densely and will break the
/// `agent.index` invariant if any slot was skipped.
#[allow(dead_code)]
pub fn new(agents: Vec<OwnedAgent>) -> Self {
let (result_tx, result_rx) = mpsc::unbounded_channel();
let slots = agents.into_iter().map(Some).collect();
Self {
agents: slots,
result_tx,
result_rx,
join_set: JoinSet::new(),
task_map: HashMap::new(),
}
}

/// Create a pool from pre-indexed slots (may contain None for failed startups).
///
/// Slot positions are preserved so that `agent.index` always matches the
Expand Down
124 changes: 42 additions & 82 deletions crates/sprout-acp/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl EventQueue {
})
.collect();

// Remove the queue entry if now empty (keeps pending_channels() accurate).
// Remove the queue entry if now empty.
if self.queues.get(&channel_id).is_some_and(|q| q.is_empty()) {
self.queues.remove(&channel_id);
}
Expand Down Expand Up @@ -490,20 +490,7 @@ impl EventQueue {
.any(|id| !self.in_flight_channels.contains(id))
}

/// Whether any prompt is currently in flight.
#[allow(dead_code)]
pub fn is_in_flight(&self) -> bool {
!self.in_flight_channels.is_empty()
}

/// Total number of pending events across all channels.
#[allow(dead_code)]
pub fn pending_count(&self) -> usize {
self.queues.values().map(|q| q.len()).sum()
}

/// Number of channels with pending events.
#[allow(dead_code)]
pub fn pending_channels(&self) -> usize {
self.queues.len()
}
Expand All @@ -517,9 +504,6 @@ impl EventQueue {
///
/// Also clears any `retry_after` throttle for the channel.
///
/// Returns the number of events dropped.
/// Drop all queued (non-in-flight) events for a channel.
///
/// Returns the event IDs of dropped events so the caller can clean up
/// any reactions (👀) that were added at queue-push time.
pub fn drain_channel(&mut self, channel_id: Uuid) -> Vec<String> {
Expand All @@ -540,7 +524,6 @@ impl EventQueue {
}

/// Whether a prompt is currently in-flight for the given channel.
#[allow(dead_code)]
pub fn is_channel_in_flight(&self, channel_id: Uuid) -> bool {
self.in_flight_channels.contains(&channel_id)
}
Expand Down Expand Up @@ -1108,6 +1091,14 @@ mod tests {
}
}

fn pending_count(q: &EventQueue) -> usize {
q.queues.values().map(|q| q.len()).sum()
}

fn any_in_flight(q: &EventQueue) -> bool {
!q.in_flight_channels.is_empty()
}

// ── Test 1: push + flush_next basic ──────────────────────────────────────

#[test]
Expand All @@ -1123,8 +1114,8 @@ mod tests {
assert_eq!(batch.events[0].event.content, "hello");

// Queue should be empty now.
assert_eq!(q.pending_count(), 0);
assert_eq!(q.pending_channels(), 0);
assert_eq!(pending_count(&q), 0);
assert_eq!(q.queues.len(), 0);
}

// ── Test 2: same channel cannot be flushed twice ─────────────────────────
Expand All @@ -1136,7 +1127,7 @@ mod tests {

q.push(make_queued(ch, "first"));
let _batch = q.flush_next().expect("first flush should succeed");
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// Push another event while in-flight.
q.push(make_queued(ch, "second"));
Expand All @@ -1162,7 +1153,7 @@ mod tests {

// Complete the in-flight prompt.
q.mark_complete(ch);
assert!(!q.is_in_flight());
assert!(!any_in_flight(&q));

// Now flush should succeed.
let batch = q.flush_next().expect("should flush after mark_complete");
Expand All @@ -1182,7 +1173,7 @@ mod tests {
q.push(make_queued(ch, "msg2"));
q.push(make_queued(ch, "msg3"));

assert_eq!(q.pending_count(), 3);
assert_eq!(pending_count(&q), 3);

let batch = q.flush_next().expect("should return batch");
assert_eq!(batch.channel_id, ch);
Expand All @@ -1192,8 +1183,8 @@ mod tests {
assert_eq!(batch.events[2].event.content, "msg3");

// All drained.
assert_eq!(q.pending_count(), 0);
assert_eq!(q.pending_channels(), 0);
assert_eq!(pending_count(&q), 0);
assert_eq!(q.queues.len(), 0);
}

// ── Test 5: FIFO fairness ─────────────────────────────────────────────────
Expand Down Expand Up @@ -1229,11 +1220,11 @@ mod tests {
// First flush picks A.
let batch_a = q.flush_next().expect("first flush");
assert_eq!(batch_a.channel_id, ch_a);
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// B still pending.
assert_eq!(q.pending_count(), 1);
assert_eq!(q.pending_channels(), 1);
assert_eq!(pending_count(&q), 1);
assert_eq!(q.queues.len(), 1);

q.mark_complete(ch_a);

Expand All @@ -1242,7 +1233,7 @@ mod tests {
assert_eq!(batch_b.channel_id, ch_b);
assert_eq!(batch_b.events[0].event.content, "B-event");

assert_eq!(q.pending_count(), 0);
assert_eq!(pending_count(&q), 0);
}

// ── Test 7: empty queue returns None ─────────────────────────────────────
Expand All @@ -1253,37 +1244,6 @@ mod tests {
assert!(q.flush_next().is_none());
}

// ── Test 8: pending_count ─────────────────────────────────────────────────

#[test]
fn test_pending_count() {
let mut q = EventQueue::new(DedupMode::Queue);
let ch_a = Uuid::new_v4();
let ch_b = Uuid::new_v4();

assert_eq!(q.pending_count(), 0);
assert_eq!(q.pending_channels(), 0);

q.push(make_queued(ch_a, "a1"));
q.push(make_queued(ch_a, "a2"));
q.push(make_queued(ch_b, "b1"));

assert_eq!(q.pending_count(), 3);
assert_eq!(q.pending_channels(), 2);

// Flush A (2 events drained).
let _ = q.flush_next();
assert_eq!(q.pending_count(), 1);
assert_eq!(q.pending_channels(), 1);

q.mark_complete(ch_a);

// Flush B (1 event drained).
let _ = q.flush_next();
assert_eq!(q.pending_count(), 0);
assert_eq!(q.pending_channels(), 0);
}

// ── Test 9: format_prompt single event ───────────────────────────────────

#[test]
Expand Down Expand Up @@ -1331,7 +1291,7 @@ mod tests {

let batch = queue.flush_next().unwrap();
assert_eq!(batch.events.len(), 2);
assert!(queue.is_in_flight());
assert!(any_in_flight(&queue));

// Simulate failure — requeue the batch.
queue.requeue(batch);
Expand Down Expand Up @@ -1491,11 +1451,11 @@ mod tests {

q.push(make_queued(ch, "first"));
let _batch = q.flush_next().expect("first flush");
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// In drop mode, pushing to the in-flight channel should be discarded.
q.push(make_queued(ch, "dropped"));
assert_eq!(q.pending_count(), 0, "event should be dropped");
assert_eq!(pending_count(&q), 0, "event should be dropped");

q.mark_complete(ch);
// Nothing to flush.
Expand All @@ -1512,11 +1472,11 @@ mod tests {

q.push(make_queued(ch_a, "A-first"));
let _batch = q.flush_next().expect("flush A");
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// Events for ch_b should still queue.
q.push(make_queued(ch_b, "B-event"));
assert_eq!(q.pending_count(), 1);
assert_eq!(pending_count(&q), 1);

q.mark_complete(ch_a);
let batch_b = q.flush_next().expect("flush B");
Expand All @@ -1537,7 +1497,7 @@ mod tests {
// Flush A — now A is in-flight.
let batch_a = q.flush_next().expect("flush A");
assert_eq!(batch_a.channel_id, ch_a);
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// Flush B — B should also be flushable (different channel).
let batch_b = q.flush_next().expect("flush B while A in-flight");
Expand All @@ -1548,11 +1508,11 @@ mod tests {

// Complete A only.
q.mark_complete(ch_a);
assert!(q.is_in_flight()); // B still in-flight.
assert!(any_in_flight(&q)); // B still in-flight.

// Complete B.
q.mark_complete(ch_b);
assert!(!q.is_in_flight());
assert!(!any_in_flight(&q));
}

// ── Test 15: same channel cannot be flushed twice ─────────────────────────
Expand Down Expand Up @@ -1597,7 +1557,7 @@ mod tests {
// Drop mode: pushing to either in-flight channel is dropped.
q.push(make_queued(ch_a, "A-dropped"));
q.push(make_queued(ch_b, "B-dropped"));
assert_eq!(q.pending_count(), 0);
assert_eq!(pending_count(&q), 0);

q.mark_complete(ch_a);
q.mark_complete(ch_b);
Expand Down Expand Up @@ -1660,10 +1620,10 @@ mod tests {
assert!(!q.in_flight_channels.contains(&ch_a));

// B still in-flight.
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

q.mark_complete(ch_b);
assert!(!q.is_in_flight());
assert!(!any_in_flight(&q));
}

// ── Test 19: requeue_preserve_timestamps preserves received_at ───────────
Expand Down Expand Up @@ -1722,29 +1682,29 @@ mod tests {
for i in 0..MAX_PENDING_PER_CHANNEL {
q.push(make_queued(ch, &format!("fill-{i}")));
}
assert_eq!(q.pending_count(), MAX_PENDING_PER_CHANNEL);
assert_eq!(pending_count(&q), MAX_PENDING_PER_CHANNEL);

// Flush a batch (removes some events from the queue).
let batch = q.flush_next().expect("should flush");
let batch_size = batch.events.len();
let remaining = MAX_PENDING_PER_CHANNEL - batch_size;
assert_eq!(q.pending_count(), remaining);
assert_eq!(pending_count(&q), remaining);

// Push more events while the batch is "in-flight" — fill back to cap.
for i in 0..batch_size {
q.push(make_queued(ch, &format!("new-{i}")));
}
assert_eq!(q.pending_count(), MAX_PENDING_PER_CHANNEL);
assert_eq!(pending_count(&q), MAX_PENDING_PER_CHANNEL);

// Requeue the original batch — without cap enforcement this would
// push the queue to MAX_PENDING_PER_CHANNEL + batch_size.
q.requeue_preserve_timestamps(batch);

// Cap must be enforced: queue should not exceed MAX_PENDING_PER_CHANNEL.
assert!(
q.pending_count() <= MAX_PENDING_PER_CHANNEL,
pending_count(&q) <= MAX_PENDING_PER_CHANNEL,
"queue exceeded cap: {} > {}",
q.pending_count(),
pending_count(&q),
MAX_PENDING_PER_CHANNEL,
);
}
Expand Down Expand Up @@ -2358,11 +2318,11 @@ mod tests {

q.push(make_queued(ch, "msg1"));
q.push(make_queued(ch, "msg2"));
assert_eq!(q.pending_count(), 2);
assert_eq!(pending_count(&q), 2);

let drained = q.drain_channel(ch);
assert_eq!(drained.len(), 2);
assert_eq!(q.pending_count(), 0);
assert_eq!(pending_count(&q), 0);
}

#[test]
Expand All @@ -2376,7 +2336,7 @@ mod tests {

let drained = q.drain_channel(ch_a);
assert_eq!(drained.len(), 1);
assert_eq!(q.pending_count(), 1); // ch_b still has 1
assert_eq!(pending_count(&q), 1); // ch_b still has 1
}

#[test]
Expand All @@ -2393,7 +2353,7 @@ mod tests {
assert!(!q.has_flushable_work());
let drained = q.drain_channel(ch);
assert_eq!(drained.len(), 1);
assert_eq!(q.pending_count(), 0);
assert_eq!(pending_count(&q), 0);
}

#[test]
Expand All @@ -2410,15 +2370,15 @@ mod tests {

q.push(make_queued(ch, "msg1"));
let _batch = q.flush_next().unwrap(); // now in-flight
assert!(q.is_in_flight());
assert!(any_in_flight(&q));

// Push another event while in-flight.
q.push(make_queued(ch, "msg2"));

// drain_channel should only remove the queued event, not the in-flight one.
let drained = q.drain_channel(ch);
assert_eq!(drained.len(), 1);
assert!(q.is_in_flight()); // in-flight unaffected
assert!(any_in_flight(&q)); // in-flight unaffected
}

// ── compact_expired_state ─────────────────────────────────────────────
Expand Down
Loading