Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 122 additions & 8 deletions crates/misc/component-async-tests/tests/scenario/backpressure.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,127 @@
use wasmtime::Result;
use component_async_tests::Ctx;
use std::{
env, future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use wasmtime::{
Engine, Result, Store,
component::{Linker, ResourceTable},
};
use wasmtime_wasi::WasiCtxBuilder;

use super::util::test_run;
use super::util::{config, make_component, test_run};

// No-op function; we only test this by composing it in `async_backpressure_caller`
#[allow(
dead_code,
reason = "here only to make the `assert_test_exists` macro happy"
)]
pub fn async_backpressure_callee() {}
mod callee {
wasmtime::component::bindgen!({
path: "wit",
world: "backpressure-callee",
exports: { default: async | store },
});
}

#[tokio::test]
pub async fn async_backpressure_callee() -> Result<()> {
let mut config = config();
// As of this writing, miri/pulley/epochs is a problematic combination, so
// we don't test it.
if env::var_os("MIRI_TEST_CWASM_DIR").is_none() {
config.epoch_interruption(true);
}

let engine = Engine::new(&config)?;
let component = make_component(
&engine,
&[test_programs_artifacts::ASYNC_BACKPRESSURE_CALLEE_COMPONENT],
)
.await?;
let mut linker = Linker::new(&engine);
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;

let mut store = Store::new(
&engine,
Ctx {
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
table: ResourceTable::default(),
continue_: false,
},
);

if env::var_os("MIRI_TEST_CWASM_DIR").is_none() {
store.set_epoch_deadline(1);

std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(10));
engine.increment_epoch();
});
}

let guest =
callee::BackpressureCallee::instantiate_async(&mut store, &component, &linker).await?;

store
.run_concurrent(async |accessor| {
guest
.local_local_backpressure()
.call_inc_then_later_dec_backpressure(accessor)
.await?;

let mut instance = Some(Box::pin(
guest
.local_local_run()
.func_run()
.func()
.ready_for_concurrent_call(accessor),
));

let mut a = Some(Box::pin(guest.local_local_run().call_run(accessor)));
let mut b = Some(Box::pin(guest.local_local_run().call_run(accessor)));
let mut c = Some(Box::pin(guest.local_local_run().call_run(accessor)));

let mut backpressure_is_set = true;
future::poll_fn(move |cx| {
let instance_ready = is_ready(cx, &mut instance);
let a_ready = is_ready(cx, &mut a);
let b_ready = is_ready(cx, &mut b);
let c_ready = is_ready(cx, &mut c);

if backpressure_is_set {
assert!(!instance_ready);
assert!(!a_ready);
assert!(!b_ready);
assert!(!c_ready);

backpressure_is_set = false;

Poll::Pending
} else if instance_ready && a_ready && b_ready && c_ready {
Poll::Ready(())
} else {
Poll::Pending
}
})
.await;

wasmtime::error::Ok(())
})
.await??;

Ok(())
}

fn is_ready(cx: &mut Context, fut: &mut Option<Pin<Box<impl Future>>>) -> bool {
if let Some(v) = fut.as_mut() {
if v.as_mut().poll(cx).is_ready() {
*fut = None;
true
} else {
false
}
} else {
true
}
}

#[tokio::test]
pub async fn async_backpressure_caller() -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/misc/component-async-tests/wit/test.wit
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ interface backpressure {
set-backpressure: func(enabled: bool);
inc-backpressure: func();
dec-backpressure: func();
inc-then-later-dec-backpressure: async func();
}

interface transmit {
Expand Down
10 changes: 10 additions & 0 deletions crates/test-programs/src/bin/async_backpressure_callee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ impl Backpressure for Component {
fn dec_backpressure() {
wit_bindgen::backpressure_dec();
}
async fn inc_then_later_dec_backpressure() {
wit_bindgen::backpressure_inc();

wit_bindgen::spawn_local(async {
for _ in 0..10 {
wit_bindgen::yield_async().await;
}
wit_bindgen::backpressure_dec();
});
}
}

// Unused function; required since this file is built as a `bin`:
Expand Down
10 changes: 10 additions & 0 deletions crates/test-programs/src/bin/async_cancel_callee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ unsafe extern "C" fn export_dec_backpressure() {
wit_bindgen::backpressure_dec();
}

#[unsafe(export_name = "[async-lift]local:local/backpressure#inc-then-later-dec-backpressure")]
unsafe extern "C" fn export_inc_then_later_dec_backpressure() -> u32 {
todo!()
}

#[unsafe(export_name = "[callback][async-lift]local:local/backpressure#inc-then-later-dec-backpressure")]
unsafe extern "C" fn callback_inc_then_later_dec_backpressure(_: u32, _: u32, _: u32) -> u32 {
todo!()
}

#[unsafe(export_name = "local:local/yield#yield-times")]
unsafe extern "C" fn export_yield_yield_times(times: u64) {
unsafe {
Expand Down
36 changes: 34 additions & 2 deletions crates/wasi-http/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::{
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::sync::Notify;
use wasmtime::component::{Accessor, GuestTaskId, Resource, TypedFuncCallConcurrent};
use wasmtime::component::{Accessor, GuestTaskId, HasData, Resource, TypedFuncCallConcurrent};
#[cfg(feature = "p2")]
use wasmtime::error::Context as _;
use wasmtime::{AsContextMut, Result, Store, StoreContextMut, format_err};
Expand Down Expand Up @@ -793,6 +793,7 @@ where
Some((pair, queue))
}
));
let mut ready = pin!(when_ready(accessor, proxy));
future::poll_fn(|cx| {
loop {
// First, and crucially first, poll `futures`. This way
Expand Down Expand Up @@ -839,6 +840,13 @@ where
Poll::Ready(None) | Poll::Pending => {}
}

ready.set(when_ready(accessor, proxy));
let is_ready = match ready.as_mut().poll(cx) {
Poll::Ready(Ok(())) => true,
Poll::Ready(Err(error)) => break Poll::Ready(Err(error)),
Poll::Pending => false,
};

// At this point `futures` is either empty or it's `Pending`
// meaning nothing is ready. Note that `Pending` here
// doesn't necessarily mean all tasks are blocked on I/O.
Expand All @@ -850,6 +858,7 @@ where
// at all or all our tasks really are blocked on I/O.
self.set_available(
may_accept
&& is_ready
&& match dropper
.state
.should_accept_request(futures.len(), reuse_count)
Expand Down Expand Up @@ -905,7 +914,7 @@ where
// if we're not actually capable of accepting any more work,
// then we're completely done and it's time to exit this
// worker.
if !may_accept {
if !(may_accept && is_ready) {
break Poll::Ready(Ok(()));
}

Expand Down Expand Up @@ -1401,3 +1410,26 @@ impl<'a, T: Send> Prepared<'a, T> {
}
}
}

async fn when_ready<T, D: HasData>(accessor: &Accessor<T, D>, proxy: &Proxy) -> Result<()> {
match proxy {
#[cfg(feature = "p3")]
Proxy::P3(guest) => {
guest
.wasi_http_handler()
.func_handle()
.func()
.ready_for_concurrent_call(accessor)
.await
}
#[cfg(feature = "p2")]
Proxy::P2(guest) => {
guest
.wasi_http_incoming_handler()
.func_handle()
.func()
.ready_for_concurrent_call(accessor)
.await
}
}
}
84 changes: 84 additions & 0 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,9 @@ impl StoreOpaque {
/// Iterate over `InstanceState::pending`, moving any ready items into the
/// "high priority" work item queue.
///
/// Also, wake any wakers interested in the transition from not ready to
/// ready.
///
/// See `GuestCall::is_ready` for details.
fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
for (thread, kind) in
Expand All @@ -1860,6 +1863,20 @@ impl StoreOpaque {
}
}

for waker in self
.instance_state(instance)
.concurrent_state()
.wakers
.get_mut()
.iter_mut()
{
if let Some(waker) = waker.downcast_ref::<Waker>() {
waker.wake_by_ref();
} else {
bail_bug!("`ConcurrentInstanceState::wakers` should contain only `Waker`s");
}
}

Ok(())
}

Expand Down Expand Up @@ -4989,6 +5006,7 @@ pub struct ConcurrentInstanceState {
/// Pending calls for this instance which require `Self::backpressure` to be
/// `true` and/or `Self::do_not_enter` to be false before they can proceed.
pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
wakers: AlwaysMut<ResourceTable>,
}

impl ConcurrentInstanceState {
Expand Down Expand Up @@ -5702,3 +5720,69 @@ fn queue_call0<T: 'static>(
)
}
}

pub(crate) struct ReadyToCall<'a, T: 'static, D: HasData + ?Sized> {
accessor: &'a Accessor<T, D>,
func: Func,
waker_key: Option<Resource<Waker>>,
}
Comment on lines +5724 to +5728

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this perhaps be modeled as a poll_* function instead of an async function to avoid the need to have a list of wakers to wake up? Similar to Accesor::poll_no_interesting_tasks which makes the API contract clear. I think one-warker-per instance is all we need for wasi:http, right?


impl<T, D: HasData + ?Sized> Future for ReadyToCall<'_, T, D> {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.accessor.with(|mut store| {
let store = store.as_context_mut();
let (_, _, _, raw_options) = self.func.abi_info(store.0);
let instance = self.func.instance().runtime_instance(raw_options.instance);
let state = store.0.instance_state(instance).concurrent_state();
if state.backpressure == 0 {
Poll::Ready(Ok(()))
} else {
let waker = cx.waker().clone();
if let Some(key) = &self.waker_key {
*state.wakers.get_mut().get_mut(key).unwrap() = waker;
} else {
self.waker_key = Some(
state
.wakers
.get_mut()
.push(waker)
.map_err(crate::Error::from)?,
);
}
Poll::Pending
}
})
}
}

impl<T, D: HasData + ?Sized> Drop for ReadyToCall<'_, T, D> {
fn drop(&mut self) {
if let Some(key) = self.waker_key.take() {
self.accessor.with(|mut store| {
let store = store.as_context_mut();
let (_, _, _, raw_options) = self.func.abi_info(store.0);
let instance = self.func.instance().runtime_instance(raw_options.instance);
_ = store
.0
.instance_state(instance)
.concurrent_state()
.wakers
.get_mut()
.delete(key);
});
}
}
}

pub(crate) fn ready_to_call<'a, T, D: HasData + ?Sized>(
accessor: &'a Accessor<T, D>,
func: Func,
) -> ReadyToCall<'a, T, D> {
ReadyToCall {
accessor,
func,
waker_key: None,
}
}
11 changes: 11 additions & 0 deletions crates/wasmtime/src/runtime/component/concurrent/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@ impl Func {
Ok(())
}

/// Returns a future which will resolve once the component instance
/// corresponding to this function is ready to run a concurrent call without
/// queuing it (i.e. does not have backpressure enabled and does not have a
/// sync call in progress).
pub async fn ready_for_concurrent_call<T: 'static>(
self,
accessor: impl AsAccessor<Data = T>,
) -> Result<()> {
concurrent::ready_to_call(accessor.as_accessor(), self).await
}

/// Calls `concurrent::prepare_call` with monomorphized functions for
/// lowering the parameters and lifting the result.
fn prepare_call_dynamic<'a, T: Send + 'static>(
Expand Down
Loading