diff --git a/crates/misc/component-async-tests/tests/scenario/backpressure.rs b/crates/misc/component-async-tests/tests/scenario/backpressure.rs index 21f18cdf2723..92bd66b4a97b 100644 --- a/crates/misc/component-async-tests/tests/scenario/backpressure.rs +++ b/crates/misc/component-async-tests/tests/scenario/backpressure.rs @@ -1,13 +1,127 @@ -use wasmtime::Result; +use component_async_tests::Ctx; +use std::{ + env, future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use wasmtime::{ + Engine, Result, Store, + component::{Linker, ResourceTable}, +}; +use wasmtime_wasi::WasiCtxBuilder; -use super::util::test_run; +use super::util::{config, make_component, test_run}; -// No-op function; we only test this by composing it in `async_backpressure_caller` -#[allow( - dead_code, - reason = "here only to make the `assert_test_exists` macro happy" -)] -pub fn async_backpressure_callee() {} +mod callee { + wasmtime::component::bindgen!({ + path: "wit", + world: "backpressure-callee", + exports: { default: async | store }, + }); +} + +#[tokio::test] +pub async fn async_backpressure_callee() -> Result<()> { + let mut config = config(); + // As of this writing, miri/pulley/epochs is a problematic combination, so + // we don't test it. + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + config.epoch_interruption(true); + } + + let engine = Engine::new(&config)?; + let component = make_component( + &engine, + &[test_programs_artifacts::ASYNC_BACKPRESSURE_CALLEE_COMPONENT], + ) + .await?; + let mut linker = Linker::new(&engine); + wasmtime_wasi::p2::add_to_linker_async(&mut linker)?; + + let mut store = Store::new( + &engine, + Ctx { + wasi: WasiCtxBuilder::new().inherit_stdio().build(), + table: ResourceTable::default(), + continue_: false, + }, + ); + + if env::var_os("MIRI_TEST_CWASM_DIR").is_none() { + store.set_epoch_deadline(1); + + std::thread::spawn(move || { + std::thread::sleep(Duration::from_secs(10)); + engine.increment_epoch(); + }); + } + + let guest = + callee::BackpressureCallee::instantiate_async(&mut store, &component, &linker).await?; + + store + .run_concurrent(async |accessor| { + guest + .local_local_backpressure() + .call_inc_then_later_dec_backpressure(accessor) + .await?; + + let mut instance = Some(Box::pin( + guest + .local_local_run() + .func_run() + .func() + .ready_for_concurrent_call(accessor), + )); + + let mut a = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut b = Some(Box::pin(guest.local_local_run().call_run(accessor))); + let mut c = Some(Box::pin(guest.local_local_run().call_run(accessor))); + + let mut backpressure_is_set = true; + future::poll_fn(move |cx| { + let instance_ready = is_ready(cx, &mut instance); + let a_ready = is_ready(cx, &mut a); + let b_ready = is_ready(cx, &mut b); + let c_ready = is_ready(cx, &mut c); + + if backpressure_is_set { + assert!(!instance_ready); + assert!(!a_ready); + assert!(!b_ready); + assert!(!c_ready); + + backpressure_is_set = false; + + Poll::Pending + } else if instance_ready && a_ready && b_ready && c_ready { + Poll::Ready(()) + } else { + Poll::Pending + } + }) + .await; + + wasmtime::error::Ok(()) + }) + .await??; + + Ok(()) +} + +fn is_ready(cx: &mut Context, fut: &mut Option>>) -> bool { + if let Some(v) = fut.as_mut() { + if v.as_mut().poll(cx).is_ready() { + *fut = None; + true + } else { + false + } + } else { + true + } +} #[tokio::test] pub async fn async_backpressure_caller() -> Result<()> { diff --git a/crates/misc/component-async-tests/wit/test.wit b/crates/misc/component-async-tests/wit/test.wit index 84efb8ec40a0..59eae0d05deb 100644 --- a/crates/misc/component-async-tests/wit/test.wit +++ b/crates/misc/component-async-tests/wit/test.wit @@ -63,6 +63,7 @@ interface backpressure { set-backpressure: func(enabled: bool); inc-backpressure: func(); dec-backpressure: func(); + inc-then-later-dec-backpressure: async func(); } interface transmit { diff --git a/crates/test-programs/src/bin/async_backpressure_callee.rs b/crates/test-programs/src/bin/async_backpressure_callee.rs index 7f89a67d77fa..ec1fef7f0529 100644 --- a/crates/test-programs/src/bin/async_backpressure_callee.rs +++ b/crates/test-programs/src/bin/async_backpressure_callee.rs @@ -32,6 +32,16 @@ impl Backpressure for Component { fn dec_backpressure() { wit_bindgen::backpressure_dec(); } + async fn inc_then_later_dec_backpressure() { + wit_bindgen::backpressure_inc(); + + wit_bindgen::spawn_local(async { + for _ in 0..10 { + wit_bindgen::yield_async().await; + } + wit_bindgen::backpressure_dec(); + }); + } } // Unused function; required since this file is built as a `bin`: diff --git a/crates/test-programs/src/bin/async_cancel_callee.rs b/crates/test-programs/src/bin/async_cancel_callee.rs index 2ee686dec974..b92410843cf5 100644 --- a/crates/test-programs/src/bin/async_cancel_callee.rs +++ b/crates/test-programs/src/bin/async_cancel_callee.rs @@ -97,6 +97,16 @@ unsafe extern "C" fn export_dec_backpressure() { wit_bindgen::backpressure_dec(); } +#[unsafe(export_name = "[async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn export_inc_then_later_dec_backpressure() -> u32 { + todo!() +} + +#[unsafe(export_name = "[callback][async-lift]local:local/backpressure#inc-then-later-dec-backpressure")] +unsafe extern "C" fn callback_inc_then_later_dec_backpressure(_: u32, _: u32, _: u32) -> u32 { + todo!() +} + #[unsafe(export_name = "local:local/yield#yield-times")] unsafe extern "C" fn export_yield_yield_times(times: u64) { unsafe { diff --git a/crates/wasi-http/src/handler.rs b/crates/wasi-http/src/handler.rs index 2dc49ab90766..69f8fa7bf61f 100644 --- a/crates/wasi-http/src/handler.rs +++ b/crates/wasi-http/src/handler.rs @@ -31,7 +31,7 @@ use std::sync::{ use std::task::{Context, Poll}; use std::time::Instant; use tokio::sync::Notify; -use wasmtime::component::{Accessor, GuestTaskId, Resource, TypedFuncCallConcurrent}; +use wasmtime::component::{Accessor, GuestTaskId, HasData, Resource, TypedFuncCallConcurrent}; #[cfg(feature = "p2")] use wasmtime::error::Context as _; use wasmtime::{AsContextMut, Result, Store, StoreContextMut, format_err}; @@ -793,6 +793,7 @@ where Some((pair, queue)) } )); + let mut ready = pin!(when_ready(accessor, proxy)); future::poll_fn(|cx| { loop { // First, and crucially first, poll `futures`. This way @@ -839,6 +840,13 @@ where Poll::Ready(None) | Poll::Pending => {} } + ready.set(when_ready(accessor, proxy)); + let is_ready = match ready.as_mut().poll(cx) { + Poll::Ready(Ok(())) => true, + Poll::Ready(Err(error)) => break Poll::Ready(Err(error)), + Poll::Pending => false, + }; + // At this point `futures` is either empty or it's `Pending` // meaning nothing is ready. Note that `Pending` here // doesn't necessarily mean all tasks are blocked on I/O. @@ -850,6 +858,7 @@ where // at all or all our tasks really are blocked on I/O. self.set_available( may_accept + && is_ready && match dropper .state .should_accept_request(futures.len(), reuse_count) @@ -905,7 +914,7 @@ where // if we're not actually capable of accepting any more work, // then we're completely done and it's time to exit this // worker. - if !may_accept { + if !(may_accept && is_ready) { break Poll::Ready(Ok(())); } @@ -1401,3 +1410,26 @@ impl<'a, T: Send> Prepared<'a, T> { } } } + +async fn when_ready(accessor: &Accessor, proxy: &Proxy) -> Result<()> { + match proxy { + #[cfg(feature = "p3")] + Proxy::P3(guest) => { + guest + .wasi_http_handler() + .func_handle() + .func() + .ready_for_concurrent_call(accessor) + .await + } + #[cfg(feature = "p2")] + Proxy::P2(guest) => { + guest + .wasi_http_incoming_handler() + .func_handle() + .func() + .ready_for_concurrent_call(accessor) + .await + } + } +} diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index 5ec4de15bb15..d858f61ccf30 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -1843,6 +1843,9 @@ impl StoreOpaque { /// Iterate over `InstanceState::pending`, moving any ready items into the /// "high priority" work item queue. /// + /// Also, wake any wakers interested in the transition from not ready to + /// ready. + /// /// See `GuestCall::is_ready` for details. fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { for (thread, kind) in @@ -1860,6 +1863,20 @@ impl StoreOpaque { } } + for waker in self + .instance_state(instance) + .concurrent_state() + .wakers + .get_mut() + .iter_mut() + { + if let Some(waker) = waker.downcast_ref::() { + waker.wake_by_ref(); + } else { + bail_bug!("`ConcurrentInstanceState::wakers` should contain only `Waker`s"); + } + } + Ok(()) } @@ -4989,6 +5006,7 @@ pub struct ConcurrentInstanceState { /// Pending calls for this instance which require `Self::backpressure` to be /// `true` and/or `Self::do_not_enter` to be false before they can proceed. pending: BTreeMap, + wakers: AlwaysMut, } impl ConcurrentInstanceState { @@ -5702,3 +5720,69 @@ fn queue_call0( ) } } + +pub(crate) struct ReadyToCall<'a, T: 'static, D: HasData + ?Sized> { + accessor: &'a Accessor, + func: Func, + waker_key: Option>, +} + +impl Future for ReadyToCall<'_, T, D> { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.accessor.with(|mut store| { + let store = store.as_context_mut(); + let (_, _, _, raw_options) = self.func.abi_info(store.0); + let instance = self.func.instance().runtime_instance(raw_options.instance); + let state = store.0.instance_state(instance).concurrent_state(); + if state.backpressure == 0 { + Poll::Ready(Ok(())) + } else { + let waker = cx.waker().clone(); + if let Some(key) = &self.waker_key { + *state.wakers.get_mut().get_mut(key).unwrap() = waker; + } else { + self.waker_key = Some( + state + .wakers + .get_mut() + .push(waker) + .map_err(crate::Error::from)?, + ); + } + Poll::Pending + } + }) + } +} + +impl Drop for ReadyToCall<'_, T, D> { + fn drop(&mut self) { + if let Some(key) = self.waker_key.take() { + self.accessor.with(|mut store| { + let store = store.as_context_mut(); + let (_, _, _, raw_options) = self.func.abi_info(store.0); + let instance = self.func.instance().runtime_instance(raw_options.instance); + _ = store + .0 + .instance_state(instance) + .concurrent_state() + .wakers + .get_mut() + .delete(key); + }); + } + } +} + +pub(crate) fn ready_to_call<'a, T, D: HasData + ?Sized>( + accessor: &'a Accessor, + func: Func, +) -> ReadyToCall<'a, T, D> { + ReadyToCall { + accessor, + func, + waker_key: None, + } +} diff --git a/crates/wasmtime/src/runtime/component/concurrent/func.rs b/crates/wasmtime/src/runtime/component/concurrent/func.rs index 371bb8e621e9..365c0c7a5e93 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/func.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/func.rs @@ -163,6 +163,17 @@ impl Func { Ok(()) } + /// Returns a future which will resolve once the component instance + /// corresponding to this function is ready to run a concurrent call without + /// queuing it (i.e. does not have backpressure enabled and does not have a + /// sync call in progress). + pub async fn ready_for_concurrent_call( + self, + accessor: impl AsAccessor, + ) -> Result<()> { + concurrent::ready_to_call(accessor.as_accessor(), self).await + } + /// Calls `concurrent::prepare_call` with monomorphized functions for /// lowering the parameters and lifting the result. fn prepare_call_dynamic<'a, T: Send + 'static>(