From 8266f0ac594684b75cfc309835ac21efe0941d5c Mon Sep 17 00:00:00 2001
From: Ben Hagen
Date: Wed, 29 Apr 2026 16:17:49 +0200
Subject: [PATCH] fix(actor): drain Actor::tasks JoinSet in run_async

The three streaming-reply actions push tasks into `Actor::tasks` via
`spawn_local`, but `run_async` had no `join_next` arm, so completed
task headers accumulated for the actor's lifetime and only got released
by `abort_all()` at shutdown. Add the missing arm, matching the drain
pattern already used by `LiveActor` and `GossipActor`.
---
 src/actor.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/src/actor.rs b/src/actor.rs
index 9969832c..40829225 100644
--- a/src/actor.rs
+++ b/src/actor.rs
@@ -89,6 +89,12 @@ enum Action {
         #[debug("reply")]
         reply: Option<oneshot::Sender<Store>>,
     },
+    #[cfg(test)]
+    #[display("DebugTasksLen")]
+    DebugTasksLen {
+        #[debug("reply")]
+        reply: oneshot::Sender<usize>,
+    },
 }
 
 #[derive(derive_more::Debug, strum::Display)]
@@ -514,6 +520,13 @@ impl SyncHandle {
         Ok(store)
     }
 
+    #[cfg(test)]
+    async fn debug_tasks_len(&self) -> Result<usize> {
+        let (reply, rx) = oneshot::channel();
+        self.send(Action::DebugTasksLen { reply }).await?;
+        Ok(rx.await?)
+    }
+
     pub async fn list_authors(
         &self,
         reply: mpsc::Sender<Result<AuthorId>>,
@@ -661,6 +674,14 @@ impl Actor {
                     }
                     continue;
                 }
+                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
+                    if let Err(err) = res {
+                        if !err.is_cancelled() {
+                            warn!(?err, "actor reply-streamer task panicked");
+                        }
+                    }
+                    continue;
+                }
                 action = self.action_rx.recv() => {
                     match action {
                         Ok(action) => action,
@@ -702,6 +723,8 @@ impl Actor {
             Action::Shutdown { .. } => {
                 unreachable!("Shutdown is handled in run()")
             }
+            #[cfg(test)]
+            Action::DebugTasksLen { reply } => send_reply(reply, self.tasks.len()),
             Action::ImportAuthor { author, reply } => {
                 let id = author.id();
                 send_reply(reply, self.store.import_author(author).map(|_| id))
@@ -1112,4 +1135,60 @@ mod tests {
         assert!(rx.recv().await.is_err());
         Ok(())
     }
+
+    /// Tests that streamer tasks spawned into `Actor.tasks` are reaped
+    /// once they complete.
+    ///
+    /// The three streaming actions (`ListAuthors`, `ListReplicas`, and
+    /// `ReplicaAction::GetMany`) each `spawn_local` a task into
+    /// `Actor.tasks` to drive their reply channel. The actor must
+    /// `join_next` those tasks once they finish, otherwise the
+    /// `JoinSet` grows without bound for the lifetime of the actor.
+    #[tokio::test]
+    async fn actor_tasks_joinset_drain() -> anyhow::Result<()> {
+        let store = store::Store::memory();
+        let sync = SyncHandle::spawn(store, None, "drain".into());
+
+        let namespace = NamespaceSecret::new(&mut rand::rng());
+        let id = namespace.id();
+        sync.import_namespace(namespace.into()).await?;
+        sync.open(id, Default::default()).await?;
+
+        const ITERATIONS: usize = 1000;
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.list_authors(tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.list_replicas(tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        for _ in 0..ITERATIONS {
+            let (tx, mut rx) = mpsc::channel(64);
+            sync.get_many(id, store::Query::all().into(), tx).await?;
+            while rx.recv().await?.is_some() {}
+        }
+
+        let mut last = sync.debug_tasks_len().await?;
+        let deadline = std::time::Instant::now() + Duration::from_secs(10);
+        while last > 16 && std::time::Instant::now() < deadline {
+            tokio::time::sleep(Duration::from_millis(50)).await;
+            last = sync.debug_tasks_len().await?;
+        }
+
+        assert!(
+            last <= 16,
+            "residual Actor.tasks JoinSet len = {last}, expected <= 16 \
+             (was the join_next arm in run_async lost? streamer tasks \
+             for ListAuthors / ListReplicas / GetMany are not being reaped)"
+        );
+
+        sync.close(id).await?;
+        Ok(())
+    }
 }