Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ enum Action {
#[debug("reply")]
reply: Option<oneshot::Sender<Store>>,
},
#[cfg(test)]
#[display("DebugTasksLen")]
DebugTasksLen {
#[debug("reply")]
reply: oneshot::Sender<usize>,
},
}

#[derive(derive_more::Debug, strum::Display)]
Expand Down Expand Up @@ -514,6 +520,13 @@ impl SyncHandle {
Ok(store)
}

#[cfg(test)]
async fn debug_tasks_len(&self) -> Result<usize> {
let (reply, rx) = oneshot::channel();
self.send(Action::DebugTasksLen { reply }).await?;
Ok(rx.await?)
}

pub async fn list_authors(
&self,
reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
Expand Down Expand Up @@ -661,6 +674,14 @@ impl Actor {
}
continue;
}
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
if let Err(err) = res {
if !err.is_cancelled() {
warn!(?err, "actor reply-streamer task panicked");
}
}
continue;
}
action = self.action_rx.recv() => {
match action {
Ok(action) => action,
Expand Down Expand Up @@ -702,6 +723,8 @@ impl Actor {
Action::Shutdown { .. } => {
unreachable!("Shutdown is handled in run()")
}
#[cfg(test)]
Action::DebugTasksLen { reply } => send_reply(reply, self.tasks.len()),
Action::ImportAuthor { author, reply } => {
let id = author.id();
send_reply(reply, self.store.import_author(author).map(|_| id))
Expand Down Expand Up @@ -1112,4 +1135,60 @@ mod tests {
assert!(rx.recv().await.is_err());
Ok(())
}

/// Tests that streamer tasks spawned into `Actor.tasks` are reaped
/// once they complete.
///
/// The three streaming actions (`ListAuthors`, `ListReplicas`, and
/// `ReplicaAction::GetMany`) each `spawn_local` a task into
/// `Actor.tasks` to drive their reply channel. The actor must
/// `join_next` those tasks once they finish, otherwise the
/// `JoinSet` grows without bound for the lifetime of the actor.
#[tokio::test]
async fn actor_tasks_joinset_drain() -> anyhow::Result<()> {
let store = store::Store::memory();
let sync = SyncHandle::spawn(store, None, "drain".into());

let namespace = NamespaceSecret::new(&mut rand::rng());
let id = namespace.id();
sync.import_namespace(namespace.into()).await?;
sync.open(id, Default::default()).await?;

const ITERATIONS: usize = 1000;

for _ in 0..ITERATIONS {
let (tx, mut rx) = mpsc::channel(64);
sync.list_authors(tx).await?;
while rx.recv().await?.is_some() {}
}

for _ in 0..ITERATIONS {
let (tx, mut rx) = mpsc::channel(64);
sync.list_replicas(tx).await?;
while rx.recv().await?.is_some() {}
}

for _ in 0..ITERATIONS {
let (tx, mut rx) = mpsc::channel(64);
sync.get_many(id, store::Query::all().into(), tx).await?;
while rx.recv().await?.is_some() {}
}

let mut last = sync.debug_tasks_len().await?;
let deadline = std::time::Instant::now() + Duration::from_secs(10);
while last > 16 && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(50)).await;
last = sync.debug_tasks_len().await?;
}

assert!(
last <= 16,
"residual Actor.tasks JoinSet len = {last}, expected <= 16 \
(was the join_next arm in run_async lost? streamer tasks \
for ListAuthors / ListReplicas / GetMany are not being reaped)"
);

sync.close(id).await?;
Ok(())
}
}
Loading