From d415331bd77b32be196176d3b88c929f6f49d545 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 8 Aug 2025 09:57:05 -0700 Subject: [PATCH 1/3] add failing test for cooperative concurrency as provided by @SilverMira in #70, and tracked in #73 Co-authored-by: SilverMira <66930495+SilverMira@users.noreply.github.com> --- Cargo.toml | 2 ++ src/runtime/reactor.rs | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 23416d4..b8e7c52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true } anyhow.workspace = true clap.workspace = true futures-lite.workspace = true +futures-concurrency.workspace = true humantime.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true @@ -63,6 +64,7 @@ cargo_metadata = "0.18.1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" +futures-concurrency = "7.6" humantime = "2.1.0" heck = "0.5" http = "1.1" diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 8462bd9..1bead5c 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -321,4 +321,31 @@ mod test { ); }) } + + #[test] + fn cooperative_concurrency() { + crate::runtime::block_on(async { + let cpu_heavy = async move { + // Simulating a CPU-heavy task that runs for 1 second and yields occasionally + for _ in 0..10 { + std::thread::sleep(std::time::Duration::from_millis(100)); + futures_lite::future::yield_now().await; + } + true + }; + let timeout = async move { + crate::time::Timer::after(crate::time::Duration::from_millis(200)) + .wait() + .await; + false + }; + let mut future_group = futures_concurrency::future::FutureGroup::< + Pin>>, + >::new(); + future_group.insert(Box::pin(cpu_heavy)); + future_group.insert(Box::pin(timeout)); + let result = futures_lite::StreamExt::next(&mut future_group).await; + assert_eq!(result, Some(false), "cpu_heavy task should have timed out"); + }); + } } From 83667bed0e7e279995a3ef885f531acce01ea6b3 Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Fri, 8 Aug 2025 10:28:14 -0700 Subject: [PATCH 2/3] reactor: add a nonblock_check_pollables variant to use when busy sharing most of the implementation with block_on_pollables --- src/runtime/block_on.rs | 1 + src/runtime/reactor.rs | 62 +++++++++++++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 36c6fe1..34b5d09 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -37,6 +37,7 @@ where // as awake, reset and poll again. otherwise, block until a // pollable wakes a future. if root.is_awake() { + reactor.nonblock_check_pollables(); root.reset() } else { reactor.block_on_pollables() diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index 1bead5c..d214f3f 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -128,8 +128,58 @@ impl Reactor { /// Block until at least one pending pollable is ready, waking a pending future. pub(crate) fn block_on_pollables(&self) { + self.check_pollables(|targets| { + debug_assert_ne!( + targets.len(), + 0, + "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" + ); + wasi::io::poll::poll(targets) + + }) + } + + /// Without blocking, check for any ready pollables and wake the + /// associated futures. + pub(crate) fn nonblock_check_pollables(&self) { + // Lazily create a pollable which always resolves to ready. + use std::sync::LazyLock; + static READY_POLLABLE: LazyLock = + LazyLock::new(|| wasi::clocks::monotonic_clock::subscribe_duration(0)); + + self.check_pollables(|targets| { + // Create a new set of targets, with the addition of the ready + // pollable: + let ready_index = targets.len(); + let mut new_targets = Vec::with_capacity(ready_index + 1); + new_targets.extend_from_slice(targets); + new_targets.push(&*READY_POLLABLE); + + // Poll is now guaranteed to return immediately, because at least + // one member is ready: + let mut ready_list = wasi::io::poll::poll(&new_targets); + + // Erase our extra ready pollable from the ready list: + ready_list.retain(|e| *e != ready_index as u32); + ready_list + }) + } + + /// Common core of blocking and nonblocking pollable checks. Wakes any + /// futures which are pending on the pollables, according to the result of + /// the check_ready function. + fn check_pollables(&self, check_ready: F) + where + F: FnOnce(&[&Pollable]) -> Vec, + { let reactor = self.inner.borrow(); + // If no wakers are pending on pollables, there is no work to be done + // here: + if reactor.wakers.is_empty() { + return; + } + // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so // we need to be able to associate the index with the right waker. @@ -144,15 +194,9 @@ impl Reactor { targets.push(&reactor.pollables[pollable_index.0]); } - debug_assert_ne!( - targets.len(), - 0, - "Attempting to block on an empty list of pollables - without any pending work, no progress can be made and wasi::io::poll::poll will trap" - ); - - // Now that we have that association, we're ready to poll our targets. - // This will block until an event has completed. - let ready_indexes = wasi::io::poll::poll(&targets); + // Now that we have that association, we're ready to check our targets for readiness. + // (This is either a wasi poll, or the nonblocking variant.) + let ready_indexes = check_ready(&targets); // Once we have the indexes for which pollables are available, we need // to convert it back to the right keys for the wakers. Earlier we From 18d97a2235a905fc675065e62e755752e78c2bfc Mon Sep 17 00:00:00 2001 From: Pat Hickey Date: Mon, 11 Aug 2025 12:15:47 -0700 Subject: [PATCH 3/3] factor out check for nonempty pending pollables in the nonblocking case, we can skip work. in the blocking case, we need to panic. In the absence of a panic, either the debug assert in block_on_pollables will go off, or the wasi poll() will trap. --- src/runtime/block_on.rs | 8 ++++++++ src/runtime/reactor.rs | 20 ++++++++++++++------ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/runtime/block_on.rs b/src/runtime/block_on.rs index 34b5d09..d38a9de 100644 --- a/src/runtime/block_on.rs +++ b/src/runtime/block_on.rs @@ -40,6 +40,14 @@ where reactor.nonblock_check_pollables(); root.reset() } else { + // If there are no futures awake or waiting on a WASI + // pollable, its impossible for the reactor to make + // progress, and the only valid behaviors are to sleep + // forever or panic. This should only be reachable if the + // user's Futures are implemented incorrectly. + if !reactor.nonempty_pending_pollables() { + panic!("reactor has no futures which are awake, or are waiting on a WASI pollable to be ready") + } reactor.block_on_pollables() } } diff --git a/src/runtime/reactor.rs b/src/runtime/reactor.rs index d214f3f..22b34a8 100644 --- a/src/runtime/reactor.rs +++ b/src/runtime/reactor.rs @@ -126,7 +126,15 @@ impl Reactor { } } + /// The reactor tracks the set of WASI pollables which have an associated + /// Future pending on their readiness. This function returns indicating + /// that set of pollables is not empty. + pub(crate) fn nonempty_pending_pollables(&self) -> bool { + !self.inner.borrow().wakers.is_empty() + } + /// Block until at least one pending pollable is ready, waking a pending future. + /// Precondition: self.nonempty_pending_pollables() is true. pub(crate) fn block_on_pollables(&self) { self.check_pollables(|targets| { debug_assert_ne!( @@ -142,6 +150,11 @@ impl Reactor { /// Without blocking, check for any ready pollables and wake the /// associated futures. pub(crate) fn nonblock_check_pollables(&self) { + // If there are no pollables with associated pending futures, there is + // no work to do here, so return immediately. + if !self.nonempty_pending_pollables() { + return; + } // Lazily create a pollable which always resolves to ready. use std::sync::LazyLock; static READY_POLLABLE: LazyLock = @@ -168,18 +181,13 @@ impl Reactor { /// Common core of blocking and nonblocking pollable checks. Wakes any /// futures which are pending on the pollables, according to the result of /// the check_ready function. + /// Precondition: self.nonempty_pending_pollables() is true. fn check_pollables(&self, check_ready: F) where F: FnOnce(&[&Pollable]) -> Vec, { let reactor = self.inner.borrow(); - // If no wakers are pending on pollables, there is no work to be done - // here: - if reactor.wakers.is_empty() { - return; - } - // We're about to wait for a number of pollables. When they wake we get // the *indexes* back for the pollables whose events were available - so // we need to be able to associate the index with the right waker.