diff --git a/Cargo.toml b/Cargo.toml index 4e248eb..4501978 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,16 @@ rust-version.workspace = true default = ["json"] json = ["dep:serde", "dep:serde_json"] +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(feature, values("wasip3"))', +] } + [dependencies] anyhow.workspace = true async-task.workspace = true bytes.workspace = true +cfg-if.workspace = true futures-lite.workspace = true http-body-util.workspace = true http-body.workspace = true @@ -44,12 +50,7 @@ serde = { workspace = true, features = ["derive"] } serde_json.workspace = true [workspace] -members = [ - "axum", - "axum/macro", - "macro", - "test-programs", -] +members = ["axum", "axum/macro", "macro", "test-programs"] resolver = "2" [workspace.package] @@ -72,6 +73,7 @@ async-task = "4.7" axum = { version = "0.8.6", default-features = false } bytes = "1.10.1" cargo_metadata = "0.22" +cfg-if = "1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" @@ -84,7 +86,7 @@ http-body-util = "0.1.3" itoa = "1" pin-project-lite = "0.2.8" quote = "1.0" -serde= "1" +serde = "1" serde_json = "1" serde_qs = "0.15" sync_wrapper = "1" @@ -102,6 +104,4 @@ wstd-macro = { path = "./macro", version = "=0.6.6" } [package.metadata.docs.rs] all-features = true -targets = [ - "wasm32-wasip2" -] +targets = ["wasm32-wasip2"] diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..b4762e0 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,233 @@ +//! Asynchronous values. +//! +//! # Cancellation +//! +//! Futures can be cancelled by dropping them before they finish executing. This +//! is useful when we're no longer interested in the result of an operation, as +//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just +//! like with `?` we have to be careful to roll back local state if our future +//! halts there. +//! +//! +//! ```no_run +//! use futures_lite::prelude::*; +//! use wstd::prelude::*; +//! use wstd::time::Duration; +//! +//! #[wstd::main] +//! async fn main() { +//! let mut counter = 0; +//! let value = async { "meow" } +//! .delay(Duration::from_millis(100)) +//! .timeout(Duration::from_millis(200)) +//! .await; +//! +//! assert_eq!(value.unwrap(), "meow"); +//! } +//! ``` + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; + +use pin_project_lite::pin_project; + +use crate::time::utils::timeout_err; + +pub use self::future_ext::FutureExt; + +// ---- Delay ---- + +pin_project! { + /// Suspends a future until the specified deadline. + /// + /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`delay`]: crate::future::FutureExt::delay + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Delay { + #[pin] + future: F, + #[pin] + deadline: D, + state: State, + } +} + +/// The internal state +#[derive(Debug)] +enum State { + Started, + PollFuture, + Completed, +} + +impl Delay { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + state: State::Started, + } + } +} + +impl Future for Delay { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + match this.state { + State::Started => { + ready!(this.deadline.as_mut().poll(cx)); + *this.state = State::PollFuture; + } + State::PollFuture => { + let value = ready!(this.future.as_mut().poll(cx)); + *this.state = State::Completed; + return Poll::Ready(value); + } + State::Completed => panic!("future polled after completing"), + } + } + } +} + +// ---- Timeout ---- + +pin_project! { + /// A future that times out after a duration of time. + /// + /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`timeout`]: crate::future::FutureExt::timeout + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Timeout { + #[pin] + future: F, + #[pin] + deadline: D, + completed: bool, + } +} + +impl Timeout { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + completed: false, + } + } +} + +impl Future for Timeout { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + assert!(!*this.completed, "future polled after completing"); + + match this.future.poll(cx) { + Poll::Ready(v) => { + *this.completed = true; + Poll::Ready(Ok(v)) + } + Poll::Pending => match this.deadline.poll(cx) { + Poll::Ready(_) => { + *this.completed = true; + Poll::Ready(Err(timeout_err("future timed out"))) + } + Poll::Pending => Poll::Pending, + }, + } + } +} + +// ---- FutureExt ---- + +mod future_ext { + use super::{Delay, Timeout}; + use std::future::{Future, IntoFuture}; + + /// Extend `Future` with time-based operations. + pub trait FutureExt: Future { + /// Return an error if a future does not complete within a given time span. + /// + /// Typically timeouts are, as the name implies, based on _time_. However + /// this method can time out based on any future. This can be useful in + /// combination with channels, as it allows (long-lived) futures to be + /// cancelled based on some external event. + /// + /// When a timeout is returned, the future will be dropped and destructors + /// will be run. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// use std::io; + /// + /// #[wstd::main] + /// async fn main() { + /// let res = async { "meow" } + /// .delay(Duration::from_millis(100)) // longer delay + /// .timeout(Duration::from_millis(50)) // shorter timeout + /// .await; + /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error + /// + /// let res = async { "meow" } + /// .delay(Duration::from_millis(50)) // shorter delay + /// .timeout(Duration::from_millis(100)) // longer timeout + /// .await; + /// assert_eq!(res.unwrap(), "meow"); // success + /// } + /// ``` + fn timeout(self, deadline: D) -> Timeout + where + Self: Sized, + D: IntoFuture, + { + Timeout::new(self, deadline.into_future()) + } + + /// Delay resolving the future until the given deadline. + /// + /// The underlying future will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple futures and streams. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// + /// #[wstd::main] + /// async fn main() { + /// let now = Instant::now(); + /// let delay = Duration::from_millis(100); + /// let _ = async { "meow" }.delay(delay).await; + /// assert!(now.elapsed() >= delay); + /// } + /// ``` + fn delay(self, deadline: D) -> Delay + where + Self: Sized, + D: IntoFuture, + { + Delay::new(self, deadline.into_future()) + } + } + + impl FutureExt for T where T: Future {} +} diff --git a/src/future/delay.rs b/src/future/delay.rs deleted file mode 100644 index 20d6753..0000000 --- a/src/future/delay.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; - -use pin_project_lite::pin_project; - -pin_project! { - /// Suspends a future until the specified deadline. - /// - /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`delay`]: crate::future::FutureExt::delay - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Delay { - #[pin] - future: F, - #[pin] - deadline: D, - state: State, - } -} - -/// The internal state -#[derive(Debug)] -enum State { - Started, - PollFuture, - Completed, -} - -impl Delay { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - state: State::Started, - } - } -} - -impl Future for Delay { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.state { - State::Started => { - ready!(this.deadline.as_mut().poll(cx)); - *this.state = State::PollFuture; - } - State::PollFuture => { - let value = ready!(this.future.as_mut().poll(cx)); - *this.state = State::Completed; - return Poll::Ready(value); - } - State::Completed => panic!("future polled after completing"), - } - } - } -} diff --git a/src/future/future_ext.rs b/src/future/future_ext.rs deleted file mode 100644 index 2835f4b..0000000 --- a/src/future/future_ext.rs +++ /dev/null @@ -1,76 +0,0 @@ -use super::{Delay, Timeout}; -use std::future::{Future, IntoFuture}; - -/// Extend `Future` with time-based operations. -pub trait FutureExt: Future { - /// Return an error if a future does not complete within a given time span. - /// - /// Typically timeouts are, as the name implies, based on _time_. However - /// this method can time out based on any future. This can be useful in - /// combination with channels, as it allows (long-lived) futures to be - /// cancelled based on some external event. - /// - /// When a timeout is returned, the future will be dropped and destructors - /// will be run. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// use std::io; - /// - /// #[wstd::main] - /// async fn main() { - /// let res = async { "meow" } - /// .delay(Duration::from_millis(100)) // longer delay - /// .timeout(Duration::from_millis(50)) // shorter timeout - /// .await; - /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error - /// - /// let res = async { "meow" } - /// .delay(Duration::from_millis(50)) // shorter delay - /// .timeout(Duration::from_millis(100)) // longer timeout - /// .await; - /// assert_eq!(res.unwrap(), "meow"); // success - /// } - /// ``` - fn timeout(self, deadline: D) -> Timeout - where - Self: Sized, - D: IntoFuture, - { - Timeout::new(self, deadline.into_future()) - } - - /// Delay resolving the future until the given deadline. - /// - /// The underlying future will not be polled until the deadline has expired. In addition - /// to using a time source as a deadline, any future can be used as a - /// deadline too. When used in combination with a multi-consumer channel, - /// this method can be used to synchronize the start of multiple futures and streams. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// - /// #[wstd::main] - /// async fn main() { - /// let now = Instant::now(); - /// let delay = Duration::from_millis(100); - /// let _ = async { "meow" }.delay(delay).await; - /// assert!(now.elapsed() >= delay); - /// } - /// ``` - fn delay(self, deadline: D) -> Delay - where - Self: Sized, - D: IntoFuture, - { - Delay::new(self, deadline.into_future()) - } -} - -impl FutureExt for T where T: Future {} diff --git a/src/future/mod.rs b/src/future/mod.rs deleted file mode 100644 index a359afd..0000000 --- a/src/future/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! Asynchronous values. -//! -//! # Cancellation -//! -//! Futures can be cancelled by dropping them before they finish executing. This -//! is useful when we're no longer interested in the result of an operation, as -//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just -//! like with `?` we have to be careful to roll back local state if our future -//! halts there. -//! -//! -//! ```no_run -//! use futures_lite::prelude::*; -//! use wstd::prelude::*; -//! use wstd::time::Duration; -//! -//! #[wstd::main] -//! async fn main() { -//! let mut counter = 0; -//! let value = async { "meow" } -//! .delay(Duration::from_millis(100)) -//! .timeout(Duration::from_millis(200)) -//! .await; -//! -//! assert_eq!(value.unwrap(), "meow"); -//! } -//! ``` - -mod delay; -mod future_ext; -mod timeout; - -pub use delay::Delay; -pub use future_ext::FutureExt; -pub use timeout::Timeout; diff --git a/src/future/timeout.rs b/src/future/timeout.rs deleted file mode 100644 index 9b00e1b..0000000 --- a/src/future/timeout.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::time::utils::timeout_err; - -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project_lite::pin_project; - -pin_project! { - /// A future that times out after a duration of time. - /// - /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`timeout`]: crate::future::FutureExt::timeout - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Timeout { - #[pin] - future: F, - #[pin] - deadline: D, - completed: bool, - } -} - -impl Timeout { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - completed: false, - } - } -} - -impl Future for Timeout { - type Output = io::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - assert!(!*this.completed, "future polled after completing"); - - match this.future.poll(cx) { - Poll::Ready(v) => { - *this.completed = true; - Poll::Ready(Ok(v)) - } - Poll::Pending => match this.deadline.poll(cx) { - Poll::Ready(_) => { - *this.completed = true; - Poll::Ready(Err(timeout_err("future timed out"))) - } - Poll::Pending => Poll::Pending, - }, - } - } -} diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..12309be --- /dev/null +++ b/src/http.rs @@ -0,0 +1,69 @@ +//! HTTP networking support + +pub use http::status::StatusCode; +pub use http::uri::{Authority, PathAndQuery, Uri}; + +pub use crate::sys::http::client::Client; +pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; +pub use crate::sys::http::method::Method; +pub use crate::sys::http::scheme::{InvalidUri, Scheme}; +#[doc(inline)] +pub use body::{Body, util::BodyExt}; +pub use error::{Error, ErrorCode, Result}; +pub use request::Request; +pub use response::Response; + +pub mod body { + //! HTTP body types. + pub use crate::sys::http::body::*; +} + +pub mod error { + //! The http portion of wstd uses `anyhow::Error` as its `Error` type. + //! + //! There are various concrete error types + + pub use crate::http::body::InvalidContentLength; + pub use crate::sys::http::{ErrorCode, HeaderError}; + pub use anyhow::Context; + pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; + pub use http::method::InvalidMethod; + + pub type Error = anyhow::Error; + /// The `http` result type. + pub type Result = std::result::Result; +} + +pub mod request { + //! HTTP request types. + pub use crate::sys::http::request::*; +} + +pub mod response { + //! HTTP response types. + pub use crate::sys::http::response::*; +} + +pub mod server { + //! HTTP servers + //! + //! The WASI HTTP server uses the [typed main] idiom, with a `main` function + //! that takes a [`Request`] and succeeds with a [`Response`], using the + //! [`http_server`] macro: + //! + //! ```no_run + //! use wstd::http::{Request, Response, Body, Error}; + //! #[wstd::http_server] + //! async fn main(_request: Request) -> Result, Error> { + //! Ok(Response::new("Hello!\n".into())) + //! } + //! ``` + //! + //! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html + //! [`Request`]: crate::http::Request + //! [`Responder`]: crate::http::server::Responder + //! [`Response`]: crate::http::Response + //! [`http_server`]: crate::http_server + + pub use crate::sys::http::server::*; +} diff --git a/src/http/error.rs b/src/http/error.rs deleted file mode 100644 index a4f22b0..0000000 --- a/src/http/error.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! The http portion of wstd uses `anyhow::Error` as its `Error` type. -//! -//! There are various concrete error types - -pub use crate::http::body::InvalidContentLength; -pub use anyhow::Context; -pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; -pub use http::method::InvalidMethod; -pub use wasip2::http::types::{ErrorCode, HeaderError}; - -pub type Error = anyhow::Error; -/// The `http` result type. -pub type Result = std::result::Result; diff --git a/src/http/mod.rs b/src/http/mod.rs deleted file mode 100644 index 39f0a40..0000000 --- a/src/http/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! HTTP networking support -//! -pub use http::status::StatusCode; -pub use http::uri::{Authority, PathAndQuery, Uri}; - -#[doc(inline)] -pub use body::{Body, util::BodyExt}; -pub use client::Client; -pub use error::{Error, ErrorCode, Result}; -pub use fields::{HeaderMap, HeaderName, HeaderValue}; -pub use method::Method; -pub use request::Request; -pub use response::Response; -pub use scheme::{InvalidUri, Scheme}; - -pub mod body; - -mod client; -pub mod error; -mod fields; -mod method; -pub mod request; -pub mod response; -mod scheme; -pub mod server; diff --git a/src/io/mod.rs b/src/io/mod.rs index 0f34b1b..898d4d7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -5,18 +5,16 @@ mod cursor; mod empty; mod read; mod seek; -mod stdio; -mod streams; mod write; pub use crate::runtime::AsyncPollable; +pub use crate::sys::io::*; +pub use crate::sys::stdio::*; pub use copy::*; pub use cursor::*; pub use empty::*; pub use read::*; pub use seek::*; -pub use stdio::*; -pub use streams::*; pub use write::*; /// The error type for I/O operations. diff --git a/src/iter/mod.rs b/src/iter.rs similarity index 100% rename from src/iter/mod.rs rename to src/iter.rs diff --git a/src/lib.rs b/src/lib.rs index ebc673d..e4edc55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,9 @@ //! These are unique capabilities provided by WASI 0.2, and because this library //! is specific to that are exposed from here. +#[allow(unreachable_pub)] +mod sys; + pub mod future; #[macro_use] pub mod http; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..6ddff7d --- /dev/null +++ b/src/net.rs @@ -0,0 +1,3 @@ +//! Async network abstractions. + +pub use crate::sys::net::*; diff --git a/src/rand.rs b/src/rand.rs new file mode 100644 index 0000000..e79a70b --- /dev/null +++ b/src/rand.rs @@ -0,0 +1,3 @@ +//! Random number generation. + +pub use crate::sys::random::{get_insecure_random_bytes, get_random_bytes}; diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..19dd379 --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,13 @@ +//! Async event loop support. +//! +//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`] +//! will give an instance of the [`Reactor`] running the event loop, which can be +//! to [`AsyncPollable::wait_for`] instances of +//! [`wasip2::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html). +//! This will automatically wait for the futures to resolve, and call the +//! necessary wakers to work. + +#![deny(missing_debug_implementations, nonstandard_style)] +#![warn(missing_docs, unreachable_pub)] + +pub use crate::sys::runtime::*; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs deleted file mode 100644 index 24b9fc2..0000000 --- a/src/runtime/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Async event loop support. -//! -//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`] -//! will give an instance of the [`Reactor`] running the event loop, which can be -//! to [`AsyncPollable::wait_for`] instances of -//! [`wasip2::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html). -//! This will automatically wait for the futures to resolve, and call the -//! necessary wakers to work. - -#![deny(missing_debug_implementations, nonstandard_style)] -#![warn(missing_docs, unreachable_pub)] - -mod block_on; -mod reactor; - -pub use ::async_task::Task; -pub use block_on::block_on; -pub use reactor::{AsyncPollable, Reactor, WaitFor}; -use std::cell::RefCell; - -// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all -// use sites in the background. -std::thread_local! { -pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; -} - -/// Spawn a `Future` as a `Task` on the current `Reactor`. -/// -/// Panics if called from outside `block_on`. -pub fn spawn(fut: F) -> Task -where - F: std::future::Future + 'static, - T: 'static, -{ - Reactor::current().spawn(fut) -} diff --git a/src/sys/mod.rs b/src/sys/mod.rs new file mode 100644 index 0000000..aaa8e41 --- /dev/null +++ b/src/sys/mod.rs @@ -0,0 +1,10 @@ +cfg_if::cfg_if! { + if #[cfg(all(target_os = "wasi", target_env = "p2"))] { + mod p2; + use p2 as backend; + } else { + compile_error!("unsupported target: wstd only compiles on `wasm32-wasip2`"); + } +} + +pub use backend::*; diff --git a/src/http/body.rs b/src/sys/p2/http/body.rs similarity index 99% rename from src/http/body.rs rename to src/sys/p2/http/body.rs index c23a3c0..b070a59 100644 --- a/src/http/body.rs +++ b/src/sys/p2/http/body.rs @@ -1,14 +1,11 @@ -use crate::http::{ - Error, HeaderMap, - error::Context as _, - fields::{header_map_from_wasi, header_map_to_wasi}, -}; +use super::fields::{header_map_from_wasi, header_map_to_wasi}; use crate::io::{AsyncInputStream, AsyncOutputStream}; use crate::runtime::{AsyncPollable, Reactor, WaitFor}; pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; pub use bytes::Bytes; +use anyhow::Context as _; use http::header::CONTENT_LENGTH; use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; use std::fmt; @@ -20,6 +17,9 @@ use wasip2::http::types::{ }; use wasip2::io::streams::{InputStream as WasiInputStream, StreamError}; +type Error = anyhow::Error; +type HeaderMap = http::header::HeaderMap; + pub mod util { pub use http_body_util::*; } diff --git a/src/http/client.rs b/src/sys/p2/http/client.rs similarity index 96% rename from src/http/client.rs rename to src/sys/p2/http/client.rs index 3676fa8..4957f72 100644 --- a/src/http/client.rs +++ b/src/sys/p2/http/client.rs @@ -1,6 +1,6 @@ -use super::{Body, Error, Request, Response}; -use crate::http::request::try_into_outgoing; -use crate::http::response::try_from_incoming; +use super::request::try_into_outgoing; +use super::response::try_from_incoming; +use crate::http::{Body, Error, Request, Response}; use crate::io::AsyncPollable; use crate::time::Duration; use wasip2::http::types::RequestOptions as WasiRequestOptions; diff --git a/src/http/fields.rs b/src/sys/p2/http/fields.rs similarity index 94% rename from src/http/fields.rs rename to src/sys/p2/http/fields.rs index de6df16..5c40d50 100644 --- a/src/http/fields.rs +++ b/src/sys/p2/http/fields.rs @@ -1,6 +1,7 @@ pub use http::header::{HeaderMap, HeaderName, HeaderValue}; -use super::{Error, error::Context}; +use crate::http::Error; +use crate::http::error::Context; use wasip2::http::types::Fields; pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { diff --git a/src/http/method.rs b/src/sys/p2/http/method.rs similarity index 100% rename from src/http/method.rs rename to src/sys/p2/http/method.rs diff --git a/src/sys/p2/http/mod.rs b/src/sys/p2/http/mod.rs new file mode 100644 index 0000000..e62627a --- /dev/null +++ b/src/sys/p2/http/mod.rs @@ -0,0 +1,10 @@ +pub mod body; +pub(crate) mod client; +pub(crate) mod fields; +pub(crate) mod method; +pub mod request; +pub mod response; +pub(crate) mod scheme; +pub mod server; + +pub use wasip2::http::types::{ErrorCode, HeaderError}; diff --git a/src/http/request.rs b/src/sys/p2/http/request.rs similarity index 95% rename from src/http/request.rs rename to src/sys/p2/http/request.rs index 6694d03..a2aa9f8 100644 --- a/src/http/request.rs +++ b/src/sys/p2/http/request.rs @@ -1,11 +1,12 @@ -use super::{ +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use super::method::{from_wasi_method, to_wasi_method}; +use super::scheme::{from_wasi_scheme, to_wasi_scheme}; +use crate::http::{ Authority, HeaderMap, PathAndQuery, Uri, body::{Body, BodyHint}, error::{Context, Error, ErrorCode}, - fields::{header_map_from_wasi, header_map_to_wasi}, - method::{from_wasi_method, to_wasi_method}, - scheme::{from_wasi_scheme, to_wasi_scheme}, }; + use wasip2::http::outgoing_handler::OutgoingRequest; use wasip2::http::types::IncomingRequest; diff --git a/src/http/response.rs b/src/sys/p2/http/response.rs similarity index 96% rename from src/http/response.rs rename to src/sys/p2/http/response.rs index 2ab8d87..c3cae53 100644 --- a/src/http/response.rs +++ b/src/sys/p2/http/response.rs @@ -1,9 +1,9 @@ use http::StatusCode; use wasip2::http::types::IncomingResponse; +use super::fields::{HeaderMap, header_map_from_wasi}; use crate::http::body::{Body, BodyHint}; use crate::http::error::Error; -use crate::http::fields::{HeaderMap, header_map_from_wasi}; pub use http::response::{Builder, Response}; @@ -21,7 +21,6 @@ pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result` or @@ -31,6 +30,7 @@ pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result> = const { RefCell::new(None) }; +} use std::future::Future; use std::pin::pin; @@ -62,3 +72,14 @@ where } } } + +/// Spawn a `Future` as a `Task` on the current `Reactor`. +/// +/// Panics if called from outside `block_on`. +pub fn spawn(fut: F) -> Task +where + F: std::future::Future + 'static, + T: 'static, +{ + Reactor::current().spawn(fut) +} diff --git a/src/runtime/reactor.rs b/src/sys/p2/runtime/reactor.rs similarity index 100% rename from src/runtime/reactor.rs rename to src/sys/p2/runtime/reactor.rs diff --git a/src/io/stdio.rs b/src/sys/p2/stdio.rs similarity index 98% rename from src/io/stdio.rs rename to src/sys/p2/stdio.rs index b2ac153..fa183a3 100644 --- a/src/io/stdio.rs +++ b/src/sys/p2/stdio.rs @@ -1,4 +1,4 @@ -use super::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; use std::cell::LazyCell; use wasip2::cli::terminal_input::TerminalInput; use wasip2::cli::terminal_output::TerminalOutput; diff --git a/src/sys/p2/time.rs b/src/sys/p2/time.rs new file mode 100644 index 0000000..238a218 --- /dev/null +++ b/src/sys/p2/time.rs @@ -0,0 +1,49 @@ +use wasip2::clocks::{ + monotonic_clock::{self, subscribe_duration, subscribe_instant}, + wall_clock, +}; + +use crate::runtime::{AsyncPollable, Reactor}; + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +pub type MonotonicInstant = monotonic_clock::Instant; + +/// A duration from the monotonic clock, in nanoseconds. +pub type MonotonicDuration = monotonic_clock::Duration; + +/// Return the current monotonic clock instant. +pub fn now() -> MonotonicInstant { + monotonic_clock::now() +} + +/// A measurement of the system clock, useful for talking to external entities +/// like the file system or other processes. May be converted losslessly to a +/// more useful `std::time::SystemTime` to provide more methods. +#[derive(Debug, Clone, Copy)] +#[allow(dead_code)] +pub struct SystemTime(wall_clock::Datetime); + +impl SystemTime { + pub fn now() -> Self { + Self(wall_clock::now()) + } +} + +impl From for std::time::SystemTime { + fn from(st: SystemTime) -> Self { + std::time::SystemTime::UNIX_EPOCH + + std::time::Duration::from_secs(st.0.seconds) + + std::time::Duration::from_nanos(st.0.nanoseconds.into()) + } +} + +/// Create a timer that fires at a specific monotonic clock instant. +pub fn subscribe_at(instant: MonotonicInstant) -> AsyncPollable { + Reactor::current().schedule(subscribe_instant(instant)) +} + +/// Create a timer that fires after a monotonic clock duration. +pub fn subscribe_after(duration: MonotonicDuration) -> AsyncPollable { + Reactor::current().schedule(subscribe_duration(duration)) +} diff --git a/src/time/duration.rs b/src/time.rs similarity index 55% rename from src/time/duration.rs rename to src/time.rs index 7f67ceb..c9a9495 100644 --- a/src/time/duration.rs +++ b/src/time.rs @@ -1,7 +1,24 @@ -use super::{Instant, Wait}; -use std::future::IntoFuture; +//! Async time interfaces. + +pub(crate) mod utils { + use std::io; + + pub(crate) fn timeout_err(msg: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, msg) + } +} + +use pin_project_lite::pin_project; +use std::future::{Future, IntoFuture}; use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::{iter::AsyncIterator, runtime::AsyncPollable}; + +pub use crate::sys::time::SystemTime; + +// ---- Duration ---- /// A Duration type to represent a span of time, typically used for system /// timeouts. @@ -10,7 +27,7 @@ use wasip2::clocks::monotonic_clock; /// without coherence issues, just like if we were implementing this in the /// stdlib. #[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Duration(pub(crate) monotonic_clock::Duration); +pub struct Duration(pub(crate) crate::sys::time::MonotonicDuration); impl Duration { /// Creates a new `Duration` from the specified number of whole seconds and /// additional nanoseconds. @@ -162,10 +179,186 @@ impl IntoFuture for Duration { } } +// ---- Instant ---- + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +/// +/// This type wraps `std::time::Duration` so we can implement traits on it +/// without coherence issues, just like if we were implementing this in the +/// stdlib. +#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] +pub struct Instant(pub(crate) crate::sys::time::MonotonicInstant); + +impl Instant { + /// Returns an instant corresponding to "now". + /// + /// # Examples + /// + /// ```no_run + /// use wstd::time::Instant; + /// + /// let now = Instant::now(); + /// ``` + #[must_use] + pub fn now() -> Self { + Instant(crate::sys::time::now()) + } + + /// Returns the amount of time elapsed from another instant to this one, or zero duration if + /// that instant is later than this one. + pub fn duration_since(&self, earlier: Instant) -> Duration { + Duration::from_nanos(self.0.saturating_sub(earlier.0)) + } + + /// Returns the amount of time elapsed since this instant. + pub fn elapsed(&self) -> Duration { + Instant::now().duration_since(*self) + } +} + +impl Add for Instant { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, rhs: Duration) { + *self = Self(self.0 + rhs.0) + } +} + +impl Sub for Instant { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, rhs: Duration) { + *self = Self(self.0 - rhs.0) + } +} + +impl IntoFuture for Instant { + type Output = Instant; + + type IntoFuture = Wait; + + fn into_future(self) -> Self::IntoFuture { + crate::task::sleep_until(self) + } +} + +// ---- Timer / Interval ---- + +/// An async iterator representing notifications at fixed interval. +pub fn interval(duration: Duration) -> Interval { + Interval { duration } +} + +/// An async iterator representing notifications at fixed interval. +/// +/// See the [`interval`] function for more. +#[derive(Debug)] +pub struct Interval { + duration: Duration, +} +impl AsyncIterator for Interval { + type Item = Instant; + + async fn next(&mut self) -> Option { + Some(Timer::after(self.duration).wait().await) + } +} + +#[derive(Debug)] +pub struct Timer(Option); + +impl Timer { + pub fn never() -> Timer { + Timer(None) + } + pub fn at(deadline: Instant) -> Timer { + let pollable = crate::sys::time::subscribe_at(deadline.0); + Timer(Some(pollable)) + } + pub fn after(duration: Duration) -> Timer { + let pollable = crate::sys::time::subscribe_after(duration.0); + Timer(Some(pollable)) + } + pub fn set_after(&mut self, duration: Duration) { + *self = Self::after(duration); + } + pub fn wait(&self) -> Wait { + let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); + Wait { wait_for } + } +} + +pin_project! { + /// Future created by [`Timer::wait`] + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Wait { + #[pin] + wait_for: Option + } +} + +impl Future for Wait { + type Output = Instant; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.wait_for.as_pin_mut() { + None => Poll::Pending, + Some(f) => match f.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + }, + } + } +} + #[cfg(test)] -mod tests { +mod test { use super::*; + async fn debug_duration(what: &str, f: impl Future) { + let start = Instant::now(); + let now = f.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("{what} awaited for {} s", d.as_secs_f32()); + } + + #[test] + fn timer_now() { + crate::runtime::block_on(debug_duration("timer_now", async { + Timer::at(Instant::now()).wait().await + })); + } + + #[test] + fn timer_after_100_milliseconds() { + crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { + Timer::after(Duration::from_millis(100)).wait().await + })); + } + + #[test] + fn test_duration_since() { + let x = Instant::now(); + let d = Duration::new(456, 789); + let y = x + d; + assert_eq!(y.duration_since(x), d); + } + #[test] fn test_new_from_as() { assert_eq!(Duration::new(456, 864209753).as_secs(), 456); diff --git a/src/time/instant.rs b/src/time/instant.rs deleted file mode 100644 index 6e9cf97..0000000 --- a/src/time/instant.rs +++ /dev/null @@ -1,91 +0,0 @@ -use super::{Duration, Wait}; -use std::future::IntoFuture; -use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; - -/// A measurement of a monotonically nondecreasing clock. Opaque and useful only -/// with Duration. -/// -/// This type wraps `std::time::Duration` so we can implement traits on it -/// without coherence issues, just like if we were implementing this in the -/// stdlib. -#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Instant(pub(crate) monotonic_clock::Instant); - -impl Instant { - /// Returns an instant corresponding to "now". - /// - /// # Examples - /// - /// ```no_run - /// use wstd::time::Instant; - /// - /// let now = Instant::now(); - /// ``` - #[must_use] - pub fn now() -> Self { - Instant(wasip2::clocks::monotonic_clock::now()) - } - - /// Returns the amount of time elapsed from another instant to this one, or zero duration if - /// that instant is later than this one. - pub fn duration_since(&self, earlier: Instant) -> Duration { - Duration::from_nanos(self.0.saturating_sub(earlier.0)) - } - - /// Returns the amount of time elapsed since this instant. - pub fn elapsed(&self) -> Duration { - Instant::now().duration_since(*self) - } -} - -impl Add for Instant { - type Output = Self; - - fn add(self, rhs: Duration) -> Self::Output { - Self(self.0 + rhs.0) - } -} - -impl AddAssign for Instant { - fn add_assign(&mut self, rhs: Duration) { - *self = Self(self.0 + rhs.0) - } -} - -impl Sub for Instant { - type Output = Self; - - fn sub(self, rhs: Duration) -> Self::Output { - Self(self.0 - rhs.0) - } -} - -impl SubAssign for Instant { - fn sub_assign(&mut self, rhs: Duration) { - *self = Self(self.0 - rhs.0) - } -} - -impl IntoFuture for Instant { - type Output = Instant; - - type IntoFuture = Wait; - - fn into_future(self) -> Self::IntoFuture { - crate::task::sleep_until(self) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_duration_since() { - let x = Instant::now(); - let d = Duration::new(456, 789); - let y = x + d; - assert_eq!(y.duration_since(x), d); - } -} diff --git a/src/time/mod.rs b/src/time/mod.rs deleted file mode 100644 index db0e1b3..0000000 --- a/src/time/mod.rs +++ /dev/null @@ -1,138 +0,0 @@ -//! Async time interfaces. - -pub(crate) mod utils; - -mod duration; -mod instant; -pub use duration::Duration; -pub use instant::Instant; - -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use wasip2::clocks::{ - monotonic_clock::{subscribe_duration, subscribe_instant}, - wall_clock, -}; - -use crate::{ - iter::AsyncIterator, - runtime::{AsyncPollable, Reactor}, -}; - -/// A measurement of the system clock, useful for talking to external entities -/// like the file system or other processes. May be converted losslessly to a -/// more useful `std::time::SystemTime` to provide more methods. -#[derive(Debug, Clone, Copy)] -#[allow(dead_code)] -pub struct SystemTime(wall_clock::Datetime); - -impl SystemTime { - pub fn now() -> Self { - Self(wall_clock::now()) - } -} - -impl From for std::time::SystemTime { - fn from(st: SystemTime) -> Self { - std::time::SystemTime::UNIX_EPOCH - + std::time::Duration::from_secs(st.0.seconds) - + std::time::Duration::from_nanos(st.0.nanoseconds.into()) - } -} - -/// An async iterator representing notifications at fixed interval. -pub fn interval(duration: Duration) -> Interval { - Interval { duration } -} - -/// An async iterator representing notifications at fixed interval. -/// -/// See the [`interval`] function for more. -#[derive(Debug)] -pub struct Interval { - duration: Duration, -} -impl AsyncIterator for Interval { - type Item = Instant; - - async fn next(&mut self) -> Option { - Some(Timer::after(self.duration).wait().await) - } -} - -#[derive(Debug)] -pub struct Timer(Option); - -impl Timer { - pub fn never() -> Timer { - Timer(None) - } - pub fn at(deadline: Instant) -> Timer { - let pollable = Reactor::current().schedule(subscribe_instant(deadline.0)); - Timer(Some(pollable)) - } - pub fn after(duration: Duration) -> Timer { - let pollable = Reactor::current().schedule(subscribe_duration(duration.0)); - Timer(Some(pollable)) - } - pub fn set_after(&mut self, duration: Duration) { - *self = Self::after(duration); - } - pub fn wait(&self) -> Wait { - let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); - Wait { wait_for } - } -} - -pin_project! { - /// Future created by [`Timer::wait`] - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Wait { - #[pin] - wait_for: Option - } -} - -impl Future for Wait { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.wait_for.as_pin_mut() { - None => Poll::Pending, - Some(f) => match f.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Instant::now()), - }, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - async fn debug_duration(what: &str, f: impl Future) { - let start = Instant::now(); - let now = f.await; - let d = now.duration_since(start); - let d: std::time::Duration = d.into(); - println!("{what} awaited for {} s", d.as_secs_f32()); - } - - #[test] - fn timer_now() { - crate::runtime::block_on(debug_duration("timer_now", async { - Timer::at(Instant::now()).wait().await - })); - } - - #[test] - fn timer_after_100_milliseconds() { - crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { - Timer::after(Duration::from_millis(100)).wait().await - })); - } -} diff --git a/src/time/utils.rs b/src/time/utils.rs deleted file mode 100644 index e6e3993..0000000 --- a/src/time/utils.rs +++ /dev/null @@ -1,5 +0,0 @@ -use std::io; - -pub(crate) fn timeout_err(msg: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::TimedOut, msg) -}