diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 423bc759e..10f79209c 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -15,6 +15,18 @@ https://github.com/oxidecomputer/dropshot/compare/v0.11.0\...HEAD[Full list of commits] +=== Breaking Changes + +* https://github.com/oxidecomputer/dropshot/pull/1028[#1028] Updates Dropshot for `hyper` 1.0 and `http` 1.0. Since consumers provide Dropshot with values from `hyper` and `http`, you'll need to update to `hyper` 1.0 and `http` 1.0 (or newer compatible versions), too. + +==== Upgrading to hyper 1.0 + +1. Update your crate's dependencies on `hyper` and `http` to 1.0 (or a newer compatible version) in Cargo.toml. +2. Replace any references to `hyper::Body` with `dropshot::Body` instead. +3. You may need to update your use of `dropshot::Body`; the `http-body-util` can be helpful. + +There are no other known breaking changes in these crates that affect Dropshot. If you have any trouble with this upgrade, please let us know by filing an issue. + == 0.11.0 (released 2024-08-21) https://github.com/oxidecomputer/dropshot/compare/v0.10.1\...v0.11.0[Full list of commits] diff --git a/Cargo.lock b/Cargo.lock index 31fed8029..9bba05e15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,10 +362,12 @@ dependencies = [ "form_urlencoded", "futures", "hostname 0.4.0", - "http 0.2.9", + "http", + "http-body-util", "hyper", "hyper-rustls", "hyper-staticfile", + "hyper-util", "indexmap", "lazy_static", "libc", @@ -656,16 +658,16 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.26" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ + "atomic-waker", "bytes", "fnv", "futures-core", "futures-sink", - "futures-util", - "http 0.2.9", + "http", "indexmap", "slab", "tokio", @@ -724,9 +726,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.9" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -734,24 +736,25 @@ dependencies = [ ] [[package]] -name = "http" -version = "1.0.0" +name = "http-body" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "fnv", - "itoa", + "http", ] [[package]] -name = "http-body" -version = "0.4.3" +name = "http-body-util" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "http 0.2.9", + "futures-util", + "http", + "http-body", "pin-project-lite", ] @@ -775,53 +778,52 @@ checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" [[package]] name = "hyper" -version = "0.14.27" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", "h2", - "http 0.2.9", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "smallvec", "tokio", - "tower-service", - "tracing", "want", ] [[package]] name = "hyper-rustls" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" +checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", - "http 0.2.9", + "http", "hyper", + "hyper-util", "log", "rustls", "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", + "tower-service", ] [[package]] name = "hyper-staticfile" -version = "0.9.5" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "318ca89e4827e7fe4ddd2824f52337239796ae8ecc761a663324407dc3d8d7e7" +checksum = "bc4bce64c32578957926e75f832032f81ebb30bcee74f86c5848b13a69e547eb" dependencies = [ "futures-util", - "http 0.2.9", + "http", "http-range", "httpdate", "hyper", @@ -833,6 +835,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "hyper-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.47" @@ -1004,7 +1025,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http 1.0.0", + "http", "httparse", "memchr", "mime", @@ -1719,19 +1740,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.7.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" - -[[package]] -name = "socket2" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" -dependencies = [ - "libc", - "winapi", -] +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -1927,7 +1938,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.5", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -2068,7 +2079,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.0.0", + "http", "httparse", "log", "rand", diff --git a/dropshot/Cargo.toml b/dropshot/Cargo.toml index b42360277..46b712566 100644 --- a/dropshot/Cargo.toml +++ b/dropshot/Cargo.toml @@ -23,7 +23,8 @@ debug-ignore = "1.0.5" form_urlencoded = "1.2.1" futures = "0.3.30" hostname = "0.4.0" -http = "0.2.9" +http = "1.1.0" +http-body-util = "0.1.2" indexmap = "2.5.0" multer = "3.1.0" paste = "1.0.15" @@ -55,7 +56,11 @@ version = "^0.11.1-dev" path = "../dropshot_endpoint" [dependencies.hyper] -version = "0.14" +version = "1.4.1" +features = [ "full" ] + +[dependencies.hyper-util] +version = "0.1.9" features = [ "full" ] [dependencies.openapiv3] @@ -88,8 +93,8 @@ anyhow = "1.0.89" async-channel = "2.3.1" buf-list = "1.0.3" expectorate = "1.1.0" -hyper-rustls = "0.25.0" -hyper-staticfile = "0.9" +hyper-rustls = "0.26.0" +hyper-staticfile = "0.10" lazy_static = "1.5.0" libc = "0.2.158" mime_guess = "2.0.5" diff --git a/dropshot/examples/file_server.rs b/dropshot/examples/file_server.rs index df9392496..c9d6fdd9a 100644 --- a/dropshot/examples/file_server.rs +++ b/dropshot/examples/file_server.rs @@ -3,6 +3,7 @@ //! Example using Dropshot to serve files use dropshot::ApiDescription; +use dropshot::Body; use dropshot::ConfigLogging; use dropshot::ConfigLoggingLevel; use dropshot::HttpError; @@ -10,7 +11,6 @@ use dropshot::HttpServerStarter; use dropshot::RequestContext; use dropshot::{endpoint, Path}; use http::{Response, StatusCode}; -use hyper::Body; use schemars::JsonSchema; use serde::Deserialize; use std::path::PathBuf; @@ -133,7 +133,11 @@ async fn static_content( format!("failed to read file {:?}: {:#}", entry, e), ) })?; - let file_stream = hyper_staticfile::FileBytesStream::new(file); + + let file_access = hyper_staticfile::vfs::TokioFileAccess::new(file); + let file_stream = + hyper_staticfile::util::FileBytesStream::new(file_access); + let body = Body::wrap(hyper_staticfile::Body::Full(file_stream)); // Derive the MIME type from the file name let content_type = mime_guess::from_path(&entry) @@ -143,7 +147,7 @@ async fn static_content( Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, content_type) - .body(file_stream.into_body())?) + .body(body)?) } } diff --git a/dropshot/examples/index.rs b/dropshot/examples/index.rs index 48d3b74e8..1fceee5e5 100644 --- a/dropshot/examples/index.rs +++ b/dropshot/examples/index.rs @@ -2,6 +2,7 @@ //! Example use of Dropshot for matching wildcard paths to serve static content. use dropshot::ApiDescription; +use dropshot::Body; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use dropshot::ConfigLoggingLevel; @@ -10,7 +11,6 @@ use dropshot::HttpServerStarter; use dropshot::RequestContext; use dropshot::{endpoint, Path}; use http::{Response, StatusCode}; -use hyper::Body; use schemars::JsonSchema; use serde::Deserialize; diff --git a/dropshot/examples/multipart.rs b/dropshot/examples/multipart.rs index 8080d7aa1..b61649db4 100644 --- a/dropshot/examples/multipart.rs +++ b/dropshot/examples/multipart.rs @@ -3,6 +3,7 @@ use dropshot::endpoint; use dropshot::ApiDescription; +use dropshot::Body; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use dropshot::ConfigLoggingLevel; @@ -11,7 +12,6 @@ use dropshot::HttpServerStarter; use dropshot::MultipartBody; use dropshot::RequestContext; use http::{Response, StatusCode}; -use hyper::Body; #[tokio::main] async fn main() -> Result<(), String> { diff --git a/dropshot/src/api_description.rs b/dropshot/src/api_description.rs index 9758fc8d5..3f832982b 100644 --- a/dropshot/src/api_description.rs +++ b/dropshot/src/api_description.rs @@ -1312,6 +1312,7 @@ mod test { use crate::handler::RequestContext; use crate::ApiDescription; use crate::ApiEndpoint; + use crate::Body; use crate::EndpointTagPolicy; use crate::Path; use crate::Query; @@ -1319,7 +1320,6 @@ mod test { use crate::TagDetails; use crate::CONTENT_TYPE_JSON; use http::Method; - use hyper::Body; use hyper::Response; use openapiv3::OpenAPI; use schemars::JsonSchema; diff --git a/dropshot/src/body.rs b/dropshot/src/body.rs new file mode 100644 index 000000000..bf3585e85 --- /dev/null +++ b/dropshot/src/body.rs @@ -0,0 +1,124 @@ +// Copyright 2024 Oxide Computer Company + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use http_body_util::combinators::BoxBody; +use http_body_util::BodyExt; +use hyper::body::{Body as HttpBody, Bytes, Frame}; + +type BoxError = Box; + +/// A body type for both requests and responses in Dropshot. +#[derive(Debug)] +pub struct Body { + inner: BoxBody, +} + +#[derive(Debug)] +pub(crate) struct DataStream(Body); + +impl Body { + /// Create an empty body. + pub fn empty() -> Self { + let inner = http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(); + Body { inner } + } + + /// Create a body with content from a specific buffer. + pub fn with_content(buf: impl Into) -> Self { + let inner = http_body_util::Full::new(buf.into()) + .map_err(|never| match never {}) + .boxed(); + Body { inner } + } + + /// Wrap any body as a dropshot Body. + pub fn wrap(under: B) -> Self + where + B: HttpBody + Send + Sync + 'static, + B::Error: Into, + { + let inner = under.map_err(Into::into).boxed(); + Body { inner } + } + + /// Converts this body into an `impl Stream` of only the data frames. + pub(crate) fn into_data_stream(self) -> DataStream { + DataStream(self) + } +} + +impl Default for Body { + fn default() -> Body { + Body::empty() + } +} + +impl From for Body { + fn from(b: Bytes) -> Body { + Body::with_content(b) + } +} + +impl From> for Body { + fn from(s: Vec) -> Body { + Body::with_content(s) + } +} + +impl From for Body { + fn from(s: String) -> Body { + Body::with_content(s) + } +} + +impl From<&'static str> for Body { + fn from(s: &'static str) -> Body { + Body::with_content(s) + } +} + +impl HttpBody for Body { + type Data = Bytes; + type Error = BoxError; + + #[inline] + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + Pin::new(&mut self.inner).poll_frame(cx) + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.inner.size_hint() + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +impl futures::Stream for DataStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match futures::ready!(Pin::new(&mut self.0).poll_frame(cx)?) { + Some(frame) => match frame.into_data() { + Ok(data) => return Poll::Ready(Some(Ok(data))), + Err(_frame) => {} + }, + None => return Poll::Ready(None), + } + } + } +} diff --git a/dropshot/src/error.rs b/dropshot/src/error.rs index 7ee2d762b..f79da003e 100644 --- a/dropshot/src/error.rs +++ b/dropshot/src/error.rs @@ -275,7 +275,7 @@ impl HttpError { pub fn into_response( self, request_id: &str, - ) -> hyper::Response { + ) -> hyper::Response { // TODO-hardening: consider handling the operational errors that the // Serde serialization fails or the response construction fails. In // those cases, we should probably try to report this as a serious diff --git a/dropshot/src/extractor/body.rs b/dropshot/src/extractor/body.rs index 52757a28a..db474f097 100644 --- a/dropshot/src/extractor/body.rs +++ b/dropshot/src/extractor/body.rs @@ -19,12 +19,11 @@ use bytes::Bytes; use bytes::BytesMut; use futures::Stream; use futures::TryStreamExt; -use hyper::body::HttpBody; +use http_body_util::BodyExt; use schemars::schema::InstanceType; use schemars::schema::SchemaObject; use schemars::JsonSchema; use serde::de::DeserializeOwned; -use std::convert::Infallible; use std::fmt::Debug; // TypedBody: body extractor for formats that can be deserialized to a specific @@ -57,7 +56,7 @@ pub struct MultipartBody { impl ExclusiveExtractor for MultipartBody { async fn from_request( _rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { let (parts, body) = request.into_parts(); // Get the content-type header. @@ -87,7 +86,10 @@ impl ExclusiveExtractor for MultipartBody { ) })?; Ok(MultipartBody { - content: multer::Multipart::new(body, boundary.to_string()), + content: multer::Multipart::new( + body.into_data_stream(), + boundary.to_string(), + ), }) } @@ -121,7 +123,7 @@ impl ExclusiveExtractor for MultipartBody { /// to the content type, and deserialize it to an instance of `BodyType`. async fn http_request_load_body( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result, HttpError> where BodyType: JsonSchema + DeserializeOwned + Send + Sync, @@ -204,7 +206,7 @@ where { async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result, HttpError> { http_request_load_body(rqctx, request).await } @@ -258,7 +260,7 @@ impl UntypedBody { impl ExclusiveExtractor for UntypedBody { async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { let server = &rqctx.server; let body = request.into_body(); @@ -282,12 +284,12 @@ impl ExclusiveExtractor for UntypedBody { /// raw bytes available to the consumer. #[derive(Debug)] pub struct StreamingBody { - body: hyper::Body, + body: crate::Body, cap: usize, } impl StreamingBody { - fn new(body: hyper::Body, cap: usize) -> Self { + fn new(body: crate::Body, cap: usize) -> Self { Self { body, cap } } @@ -295,8 +297,7 @@ impl StreamingBody { #[doc(hidden)] pub fn __from_bytes(data: Bytes) -> Self { let cap = data.len(); - let stream = futures::stream::iter([Ok::<_, Infallible>(data)]); - let body = hyper::Body::wrap_stream(stream); + let body = crate::Body::from(data); Self { body, cap } } @@ -377,12 +378,21 @@ impl StreamingBody { ) -> impl Stream> + Send { async_stream::try_stream! { let mut bytes_read: usize = 0; - while let Some(buf_res) = self.body.data().await { - let buf = buf_res?; + while let Some(frame_res) = self.body.frame().await { + let frame = frame_res.map_err(|e| HttpError::for_bad_request( + None, + format!("error streaming request body: {}", e), + ))?; + let Ok(buf) = frame.into_data() else { continue }; // skip trailers let len = buf.len(); if bytes_read + len > self.cap { - http_dump_body(&mut self.body).await?; + http_dump_body(&mut self.body).await.map_err(|e| { + HttpError::for_bad_request( + None, + format!("error streaming request body: {}", e), + ) + })?; // TODO-correctness check status code Err(HttpError::for_bad_request( None, @@ -393,10 +403,6 @@ impl StreamingBody { bytes_read += len; yield buf; } - - // Read the trailers as well, even though we're not going to do anything - // with them. - self.body.trailers().await?; } } @@ -417,7 +423,7 @@ impl StreamingBody { impl ExclusiveExtractor for StreamingBody { async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { let server = &rqctx.server; diff --git a/dropshot/src/extractor/common.rs b/dropshot/src/extractor/common.rs index 2eec965af..31f964506 100644 --- a/dropshot/src/extractor/common.rs +++ b/dropshot/src/extractor/common.rs @@ -25,7 +25,7 @@ pub trait ExclusiveExtractor: Send + Sync + Sized { /// Construct an instance of this type from a `RequestContext`. async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result; fn metadata( @@ -56,7 +56,7 @@ pub trait SharedExtractor: Send + Sync + Sized { impl ExclusiveExtractor for S { async fn from_request( rqctx: &RequestContext, - _request: hyper::Request, + _request: hyper::Request, ) -> Result { ::from_request(rqctx).await } @@ -96,7 +96,7 @@ pub trait RequestExtractor: Send + Sync + Sized { /// Construct an instance of this type from a `RequestContext`. async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result; fn metadata( @@ -109,7 +109,7 @@ pub trait RequestExtractor: Send + Sync + Sized { impl RequestExtractor for () { async fn from_request( _rqctx: &RequestContext, - _request: hyper::Request, + _request: hyper::Request, ) -> Result { Ok(()) } @@ -129,7 +129,7 @@ impl RequestExtractor for () { impl RequestExtractor for (X,) { async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { Ok((X::from_request(rqctx, request).await?,)) } @@ -162,7 +162,7 @@ macro_rules! impl_rqextractor_for_tuple { { async fn from_request( rqctx: &RequestContext, - request: hyper::Request + request: hyper::Request ) -> Result<( $($S,)+ X ), HttpError> { futures::try_join!( diff --git a/dropshot/src/extractor/raw_request.rs b/dropshot/src/extractor/raw_request.rs index be6d31411..4fb52eb49 100644 --- a/dropshot/src/extractor/raw_request.rs +++ b/dropshot/src/extractor/raw_request.rs @@ -13,11 +13,11 @@ use std::fmt::Debug; /// [`hyper::Request`]. #[derive(Debug)] pub struct RawRequest { - request: hyper::Request, + request: hyper::Request, } impl RawRequest { - pub fn into_inner(self) -> hyper::Request { + pub fn into_inner(self) -> hyper::Request { self.request } } @@ -26,7 +26,7 @@ impl RawRequest { impl ExclusiveExtractor for RawRequest { async fn from_request( _rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { Ok(RawRequest { request }) } diff --git a/dropshot/src/handler.rs b/dropshot/src/handler.rs index 8e20122cc..b2faba7e8 100644 --- a/dropshot/src/handler.rs +++ b/dropshot/src/handler.rs @@ -42,6 +42,7 @@ use crate::api_description::ApiEndpointHeader; use crate::api_description::ApiEndpointResponse; use crate::api_description::ApiSchemaGenerator; use crate::api_description::StubContext; +use crate::body::Body; use crate::pagination::PaginationParams; use crate::router::VariableSet; use crate::schema_util::make_subschema_for; @@ -52,7 +53,6 @@ use crate::to_map::to_map; use async_trait::async_trait; use http::HeaderMap; use http::StatusCode; -use hyper::Body; use hyper::Response; use schemars::JsonSchema; use serde::de::DeserializeOwned; @@ -376,7 +376,7 @@ pub trait RouteHandler: Debug + Send + Sync { async fn handle_request( &self, rqctx: RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> HttpHandlerResult; } @@ -439,7 +439,7 @@ where async fn handle_request( &self, rqctx: RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> HttpHandlerResult { // This is where the magic happens: in the code below, `funcparams` has // type `FuncParams`, which is a tuple type describing the extractor @@ -514,7 +514,7 @@ impl RouteHandler for StubRouteHandler { async fn handle_request( &self, _: RequestContext, - _: hyper::Request, + _: hyper::Request, ) -> HttpHandlerResult { unimplemented!("stub handler called, not implemented: {}", self.label) } @@ -567,7 +567,7 @@ impl HttpResponse for Response { } } -/// Wraps a [hyper::Body] so that it can be used with coded response types such +/// Wraps a [dropshot::Body] so that it can be used with coded response types such /// as [HttpResponseOk]. pub struct FreeformBody(pub Body); diff --git a/dropshot/src/http_util.rs b/dropshot/src/http_util.rs index c2f87f7af..8c454ed47 100644 --- a/dropshot/src/http_util.rs +++ b/dropshot/src/http_util.rs @@ -2,7 +2,8 @@ //! General-purpose HTTP-related facilities use bytes::Bytes; -use hyper::body::HttpBody; +use http_body_util::BodyExt; +use hyper::body::Body as HttpBody; use serde::de::DeserializeOwned; use super::error::HttpError; @@ -35,9 +36,11 @@ where // TODO better understand pin_mut!() // TODO do we need to use saturating_add() here? let mut nbytesread: usize = 0; - while let Some(maybebuf) = body.data().await { - let buf = maybebuf?; - nbytesread += buf.len(); + while let Some(maybefr) = body.frame().await { + let fr = maybefr?; + if let Ok(buf) = fr.into_data() { + nbytesread += buf.len(); + } } // TODO-correctness why does the is_end_stream() assertion fail? diff --git a/dropshot/src/lib.rs b/dropshot/src/lib.rs index 620f3f1c0..8716777d9 100644 --- a/dropshot/src/lib.rs +++ b/dropshot/src/lib.rs @@ -387,7 +387,7 @@ //! use dropshot::TypedBody; //! use dropshot::Query; //! use dropshot::RequestContext; -//! use hyper::Body; +//! use dropshot::Body; //! use hyper::Response; //! use schemars::JsonSchema; //! use serde::Deserialize; @@ -750,6 +750,7 @@ mod dtrace; mod api_description; +mod body; mod config; mod error; mod extractor; @@ -785,6 +786,7 @@ pub use api_description::StubContext; pub use api_description::TagConfig; pub use api_description::TagDetails; pub use api_description::TagExternalDocs; +pub use body::Body; pub use config::ConfigDropshot; pub use config::ConfigTls; pub use config::HandlerTaskMode; diff --git a/dropshot/src/router.rs b/dropshot/src/router.rs index 93934f02e..ed3af536d 100644 --- a/dropshot/src/router.rs +++ b/dropshot/src/router.rs @@ -735,9 +735,9 @@ mod test { use crate::router::VariableValue; use crate::ApiEndpoint; use crate::ApiEndpointResponse; + use crate::Body; use http::Method; use http::StatusCode; - use hyper::Body; use hyper::Response; use serde::Deserialize; use std::collections::BTreeMap; diff --git a/dropshot/src/server.rs b/dropshot/src/server.rs index 7f7935005..8816c9f77 100644 --- a/dropshot/src/server.rs +++ b/dropshot/src/server.rs @@ -2,6 +2,7 @@ //! Generic server-wide state and facilities use super::api_description::ApiDescription; +use super::body::Body; use super::config::{ConfigDropshot, ConfigTls}; #[cfg(feature = "usdt-probes")] use super::dtrace::probes; @@ -18,12 +19,7 @@ use futures::future::{ }; use futures::lock::Mutex; use futures::stream::{Stream, StreamExt}; -use hyper::server::{ - conn::{AddrIncoming, AddrStream}, - Server, -}; use hyper::service::Service; -use hyper::Body; use hyper::Request; use hyper::Response; use rustls; @@ -197,18 +193,16 @@ impl HttpServerStarter { WrappedHttpServerStarter::Https(https) => { https.start(rx, log_close) } - } - .map(|r| { - r.map_err(|e| format!("waiting for server: {e}"))? - .map_err(|e| format!("server stopped: {e}")) - }); + }; info!(self.app_state.log, "listening"); let handler_waitgroup = self.handler_waitgroup; let join_handle = async move { // After the server shuts down, we also want to wait for any // detached handler futures to complete. - () = join_handle.await?; + () = join_handle + .await + .map_err(|e| format!("server stopped: {e}"))?; () = handler_waitgroup.wait().await; Ok(()) }; @@ -256,7 +250,8 @@ enum WrappedHttpServerStarter { } struct InnerHttpServerStarter( - Server>, + HttpAcceptor, + ServerConnectionHandler, ); type InnerHttpServerStarterNewReturn = @@ -266,17 +261,46 @@ impl InnerHttpServerStarter { /// Begins execution of the underlying Http server. fn start( self, - close_signal: tokio::sync::oneshot::Receiver<()>, + mut close_signal: tokio::sync::oneshot::Receiver<()>, log_close: Logger, - ) -> tokio::task::JoinHandle> { - let graceful = self.0.with_graceful_shutdown(async move { - close_signal.await.expect( - "dropshot server shutting down without invoking close()", - ); - info!(log_close, "received request to begin graceful shutdown"); - }); + ) -> tokio::task::JoinHandle<()> { + use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; + use hyper_util::server::conn::auto; + + tokio::spawn(async move { + let mut builder = auto::Builder::new(TokioExecutor::new()); + // http/1 settings + builder.http1().timer(TokioTimer::new()); + // http/2 settings + builder.http2().timer(TokioTimer::new()); + + // Use a graceful watcher to keep track of all existing connections, + // and when the close_signal is trigger, force all known conns + // to start a graceful shutdown. + let graceful = + hyper_util::server::graceful::GracefulShutdown::new(); - tokio::spawn(graceful) + loop { + tokio::select! { + (sock, remote_addr) = self.0.accept() => { + let fut = builder.serve_connection_with_upgrades( + TokioIo::new(sock), + self.1.make_http_request_handler(remote_addr), + ); + let fut = graceful.watch(fut.into_owned()); + tokio::spawn(fut); + }, + + _ = &mut close_signal => { + info!(log_close, "received request to begin graceful shutdown"); + break; + } + } + } + + // optional: could use another select on a timeout + graceful.shutdown().await + }) } /// Set up an HTTP server bound on the specified address that runs @@ -290,9 +314,15 @@ impl InnerHttpServerStarter { private: C, log: &Logger, handler_waitgroup_worker: waitgroup::Worker, - ) -> Result, hyper::Error> { - let incoming = AddrIncoming::bind(&config.bind_address)?; - let local_addr = incoming.local_addr(); + ) -> Result, std::io::Error> { + // We use `from_std` instead of just calling `bind` here directly + // to avoid invoking an async function. + let std_listener = std::net::TcpListener::bind(&config.bind_address)?; + std_listener.set_nonblocking(true)?; + let tcp = TcpListener::from_std(std_listener)?; + let local_addr = tcp.local_addr()?; + let incoming = + HttpAcceptor { tcp, log: log.new(o!("local_addr" => local_addr)) }; let app_state = Arc::new(DropshotState { private, @@ -305,9 +335,44 @@ impl InnerHttpServerStarter { }); let make_service = ServerConnectionHandler::new(app_state.clone()); - let builder = hyper::Server::builder(incoming); - let server = builder.serve(make_service); - Ok((InnerHttpServerStarter(server), app_state, local_addr)) + Ok(( + InnerHttpServerStarter(incoming, make_service), + app_state, + local_addr, + )) + } +} + +/// Accepts TCP connections like a `TcpListener`, but ignores transient errors rather than propagating them to the caller +struct HttpAcceptor { + tcp: TcpListener, + log: slog::Logger, +} + +impl HttpAcceptor { + async fn accept(&self) -> (TcpStream, SocketAddr) { + loop { + match self.tcp.accept().await { + Ok((socket, addr)) => return (socket, addr), + Err(e) => match e.kind() { + // These are errors on the individual socket that we + // tried to accept, and so can be ignored. + std::io::ErrorKind::ConnectionRefused + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::ConnectionReset => (), + + // This could EMFILE implying resource exhaustion. + // Sleep a little bit and try again. + _ => { + warn!(self.log, "accept error"; "error" => e); + tokio::time::sleep(std::time::Duration::from_millis( + 100, + )) + .await; + } + }, + } + } } } @@ -384,21 +449,25 @@ impl HttpsAcceptor { pub fn new( log: slog::Logger, tls_acceptor: Arc>, - tcp_listener: TcpListener, + http_acceptor: HttpAcceptor, ) -> HttpsAcceptor { HttpsAcceptor { stream: Box::new(Box::pin(Self::new_stream( log, tls_acceptor, - tcp_listener, + http_acceptor, ))), } } + async fn accept(&mut self) -> Option> { + self.stream.next().await + } + fn new_stream( log: slog::Logger, tls_acceptor: Arc>, - tcp_listener: TcpListener, + http_acceptor: HttpAcceptor, ) -> impl Stream> { stream! { let mut tls_negotiations = futures::stream::FuturesUnordered::new(); @@ -424,29 +493,7 @@ impl HttpsAcceptor { }, } }, - accept_result = tcp_listener.accept() => { - let (socket, addr) = match accept_result { - Ok(v) => v, - Err(e) => { - match e.kind() { - std::io::ErrorKind::ConnectionAborted => { - continue; - }, - // The other errors that can be returned - // under POSIX are all programming errors or - // resource exhaustion. For now, handle - // these by no longer accepting new - // connections. - // TODO-robustness: Consider handling these - // more gracefully. - _ => { - yield Err(e); - break; - } - } - } - }; - + (socket, addr) = http_acceptor.accept() => { let tls_negotiation = tls_acceptor .lock() .await @@ -461,21 +508,9 @@ impl HttpsAcceptor { } } -impl hyper::server::accept::Accept for HttpsAcceptor { - type Conn = TlsConn; - type Error = std::io::Error; - - fn poll_accept( - mut self: Pin<&mut Self>, - ctx: &mut core::task::Context, - ) -> core::task::Poll>> { - let pinned = Pin::new(&mut self.stream); - pinned.poll_next(ctx) - } -} - struct InnerHttpsServerStarter( - Server>, + HttpsAcceptor, + ServerConnectionHandler, ); /// Create a TLS configuration from the Dropshot config structure. @@ -554,18 +589,46 @@ type InnerHttpsServerStarterNewReturn = impl InnerHttpsServerStarter { /// Begins execution of the underlying Http server. fn start( - self, - close_signal: tokio::sync::oneshot::Receiver<()>, + mut self, + mut close_signal: tokio::sync::oneshot::Receiver<()>, log_close: Logger, - ) -> tokio::task::JoinHandle> { - let graceful = self.0.with_graceful_shutdown(async move { - close_signal.await.expect( - "dropshot server shutting down without invoking close()", - ); - info!(log_close, "received request to begin graceful shutdown"); - }); + ) -> tokio::task::JoinHandle<()> { + use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; + use hyper_util::server::conn::auto; + + tokio::spawn(async move { + let mut builder = auto::Builder::new(TokioExecutor::new()); + // http/1 settings + builder.http1().timer(TokioTimer::new()); - tokio::spawn(graceful) + // Use a graceful watcher to keep track of all existing connections, + // and when the close_signal is trigger, force all known conns + // to start a graceful shutdown. + let graceful = + hyper_util::server::graceful::GracefulShutdown::new(); + + loop { + tokio::select! { + Some(Ok(sock)) = self.0.accept() => { + let remote_addr = sock.remote_addr(); + let fut = builder.serve_connection_with_upgrades( + TokioIo::new(sock), + self.1.make_http_request_handler(remote_addr), + ); + let fut = graceful.watch(fut.into_owned()); + tokio::spawn(fut); + }, + + _ = &mut close_signal => { + info!(log_close, "received request to begin graceful shutdown"); + break; + } + } + } + + // optional: could use another select on a timeout + graceful.shutdown().await + }) } fn new( @@ -592,6 +655,7 @@ impl InnerHttpsServerStarter { let local_addr = tcp.local_addr()?; let logger = log.new(o!("local_addr" => local_addr)); + let tcp = HttpAcceptor { tcp, log: logger.clone() }; let https_acceptor = HttpsAcceptor::new(logger.clone(), acceptor.clone(), tcp); @@ -606,28 +670,12 @@ impl InnerHttpsServerStarter { }); let make_service = ServerConnectionHandler::new(Arc::clone(&app_state)); - let server = Server::builder(https_acceptor).serve(make_service); - Ok((InnerHttpsServerStarter(server), app_state, local_addr)) - } -} - -impl Service<&TlsConn> for ServerConnectionHandler { - type Response = ServerRequestHandler; - type Error = GenericError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _ctx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, conn: &TlsConn) -> Self::Future { - let server = Arc::clone(&self.server); - let remote_addr = conn.remote_addr(); - Box::pin(http_connection_handle(server, remote_addr)) + Ok(( + InnerHttpsServerStarter(https_acceptor, make_service), + app_state, + local_addr, + )) } } @@ -764,18 +812,6 @@ impl FusedFuture for HttpServer { } } -/// Initial entry point for handling a new connection to the HTTP server. -/// This is invoked by Hyper when a new connection is accepted. This function -/// must return a Hyper Service object that will handle requests for this -/// connection. -async fn http_connection_handle( - server: Arc>, - remote_addr: SocketAddr, -) -> Result, GenericError> { - info!(server.log, "accepted connection"; "remote_addr" => %remote_addr); - Ok(ServerRequestHandler::new(server, remote_addr)) -} - /// Initial entry point for handling a new request to the HTTP server. This is /// invoked by Hyper when a new request is received. This function returns a /// Result that either represents a valid HTTP response or an error (which will @@ -783,7 +819,7 @@ async fn http_connection_handle( async fn http_request_handle_wrap( server: Arc>, remote_addr: SocketAddr, - request: Request, + request: Request, ) -> Result, GenericError> { // This extra level of indirection makes error handling much more // straightforward, since the request handling code can simply return early @@ -936,7 +972,7 @@ async fn http_request_handle_wrap( async fn http_request_handle( server: Arc>, - request: Request, + request: Request, request_id: &str, request_log: Logger, remote_addr: std::net::SocketAddr, @@ -947,6 +983,7 @@ async fn http_request_handle( // TODO-hardening: add a request read timeout as well so that we don't allow // this to take forever. // TODO-correctness: Do we need to dump the body on errors? + let request = request.map(crate::Body::wrap); let method = request.method(); let uri = request.uri(); let lookup_result = @@ -1056,38 +1093,17 @@ impl ServerConnectionHandler { fn new(server: Arc>) -> Self { ServerConnectionHandler { server } } -} - -impl Service<&AddrStream> for ServerConnectionHandler { - // Recall that a Service in this context is just something that takes a - // request (which could be anything) and produces a response (which could be - // anything). This being a connection handler, the request type is an - // AddrStream (which wraps a TCP connection) and the response type is - // another Service: one that accepts HTTP requests and produces HTTP - // responses. - type Response = ServerRequestHandler; - type Error = GenericError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - // TODO is this right? - Poll::Ready(Ok(())) - } - fn call(&mut self, conn: &AddrStream) -> Self::Future { - // We're given a borrowed reference to the AddrStream, but our interface - // is async (which is good, so that we can support time-consuming - // operations as part of receiving requests). To avoid having to ensure - // that conn's lifetime exceeds that of this async operation, we simply - // copy the only useful information out of the conn: the SocketAddr. We - // may want to create our own connection type to encapsulate the socket - // address and any other per-connection state that we want to keep. - let server = Arc::clone(&self.server); - let remote_addr = conn.remote_addr(); - Box::pin(http_connection_handle(server, remote_addr)) + /// Initial entry point for handling a new connection to the HTTP server. + /// This is invoked by Hyper when a new connection is accepted. This function + /// must return a Hyper Service object that will handle requests for this + /// connection. + fn make_http_request_handler( + &self, + remote_addr: SocketAddr, + ) -> ServerRequestHandler { + info!(self.server.log, "accepted connection"; "remote_addr" => %remote_addr); + ServerRequestHandler::new(self.server.clone(), remote_addr) } } @@ -1110,20 +1126,14 @@ impl ServerRequestHandler { } } -impl Service> for ServerRequestHandler { +impl Service> + for ServerRequestHandler +{ type Response = Response; type Error = GenericError; type Future = BoxFuture<'static, Result>; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { - // TODO is this right? - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { Box::pin(http_request_handle_wrap( Arc::clone(&self.server), self.remote_addr, @@ -1231,4 +1241,29 @@ mod test { let (server, _) = create_test_server(); std::mem::drop(server); } + + #[tokio::test] + async fn test_http_acceptor_happy_path() { + const TOTAL: usize = 100; + let tcp = + tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("bind"); + let addr = tcp.local_addr().expect("local_addr"); + let acceptor = + HttpAcceptor { log: slog::Logger::root(slog::Discard, o!()), tcp }; + + let t1 = tokio::spawn(async move { + for _ in 0..TOTAL { + let _ = acceptor.accept().await; + } + }); + + let t2 = tokio::spawn(async move { + for _ in 0..TOTAL { + tokio::net::TcpStream::connect(&addr).await.expect("connect"); + } + }); + + t1.await.expect("task 1"); + t2.await.expect("task 2"); + } } diff --git a/dropshot/src/test_util.rs b/dropshot/src/test_util.rs index f94eeb35a..caff1cbc4 100644 --- a/dropshot/src/test_util.rs +++ b/dropshot/src/test_util.rs @@ -6,14 +6,12 @@ use camino::Utf8PathBuf; use chrono::DateTime; use chrono::Utc; use http::method::Method; -use hyper::body::to_bytes; -use hyper::client::HttpConnector; -use hyper::Body; -use hyper::Client; +use http_body_util::BodyExt as _; use hyper::Request; use hyper::Response; use hyper::StatusCode; use hyper::Uri; +use hyper_util::client::legacy::{connect::HttpConnector, Client}; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; @@ -28,6 +26,7 @@ use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; use crate::api_description::ApiDescription; +use crate::body::Body; use crate::config::ConfigDropshot; use crate::error::HttpErrorResponseBody; use crate::http_util::CONTENT_TYPE_URL_ENCODED; @@ -77,7 +76,7 @@ pub struct ClientTestContext { /// actual bind address of the HTTP server under test pub bind_address: SocketAddr, /// HTTP client, used for making requests against the test server - pub client: Client, + pub client: Client, /// logger for the test suite HTTP client pub client_log: Logger, } @@ -87,7 +86,8 @@ impl ClientTestContext { pub fn new(server_addr: SocketAddr, log: Logger) -> ClientTestContext { ClientTestContext { bind_address: server_addr, - client: Client::new(), + client: Client::builder(hyper_util::rt::TokioExecutor::new()) + .build(HttpConnector::new()), client_log: log, } } @@ -122,7 +122,7 @@ impl ClientTestContext { request_body: Option, expected_status: StatusCode, ) -> Result, HttpErrorResponseBody> { - let body: Body = match request_body { + let body = match request_body { None => Body::empty(), Some(input) => serde_json::to_string(&input).unwrap().into(), }; @@ -320,12 +320,11 @@ impl ClientTestContext { // For "204 No Content" responses, validate that we got no content in // the body. if status == StatusCode::NO_CONTENT { - let body_bytes = to_bytes(response.body_mut()) - .await - .expect("error reading body"); + let body_bytes = read_bytes(&mut response).await; assert_eq!(0, body_bytes.len()); } + let mut response = response.map(Body::wrap); // If this was a successful response, there's nothing else to check // here. Return the response so the caller can validate the content if // they want. @@ -526,10 +525,7 @@ pub async fn read_ndjson( crate::CONTENT_TYPE_NDJSON, headers.get(http::header::CONTENT_TYPE).expect("missing content-type") ); - let body_bytes = - to_bytes(response.body_mut()).await.expect("error reading body"); - let body_string = String::from_utf8(body_bytes.as_ref().into()) - .expect("response contained non-UTF-8 bytes"); + let body_string = read_string(response).await; // TODO-cleanup: Consider using serde_json::StreamDeserializer or maybe // implementing an NDJSON-based Serde type? @@ -556,8 +552,7 @@ pub async fn read_json( crate::CONTENT_TYPE_JSON, headers.get(http::header::CONTENT_TYPE).expect("missing content-type") ); - let body_bytes = - to_bytes(response.body_mut()).await.expect("error reading body"); + let body_bytes = read_bytes(response).await; serde_json::from_slice(body_bytes.as_ref()) .expect("failed to parse server body as expected type") } @@ -565,12 +560,19 @@ pub async fn read_json( /// Given a Hyper Response whose body is expected to be a UTF-8-encoded string, /// asynchronously read the body. pub async fn read_string(response: &mut Response) -> String { - let body_bytes = - to_bytes(response.body_mut()).await.expect("error reading body"); + let body_bytes = read_bytes(response).await; String::from_utf8(body_bytes.as_ref().into()) .expect("response contained non-UTF-8 bytes") } +async fn read_bytes(response: &mut Response) -> hyper::body::Bytes +where + B: hyper::body::Body + Unpin, + B::Error: std::fmt::Debug, +{ + response.body_mut().collect().await.expect("error reading body").to_bytes() +} + /// Given a Hyper Response, extract and parse the Content-Length header. pub fn read_content_length(response: &Response) -> usize { response diff --git a/dropshot/src/websocket.rs b/dropshot/src/websocket.rs index 5311e130c..d184e4120 100644 --- a/dropshot/src/websocket.rs +++ b/dropshot/src/websocket.rs @@ -7,6 +7,7 @@ //! which will be spawned to handle the incoming connection. use crate::api_description::ExtensionMode; +use crate::body::Body; use crate::{ ApiEndpointBodyContentType, ExclusiveExtractor, ExtractorMetadata, HttpError, RequestContext, ServerContext, @@ -17,12 +18,14 @@ use http::header; use http::Response; use http::StatusCode; use hyper::upgrade::OnUpgrade; -use hyper::Body; use schemars::JsonSchema; use serde_json::json; use sha1::{Digest, Sha1}; use slog::Logger; use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; /// WebsocketUpgrade is an ExclusiveExtractor used to upgrade and handle an HTTP /// request as a websocket when present in a Dropshot endpoint's function @@ -51,7 +54,10 @@ pub type WebsocketEndpointResult = Result, HttpError>; pub struct WebsocketConnection(WebsocketConnectionRaw); /// A type that implements [tokio::io::AsyncRead] + [tokio::io::AsyncWrite]. -pub type WebsocketConnectionRaw = hyper::upgrade::Upgraded; +// A newtype so as to not expose the less-stable hyper-util type. +pub struct WebsocketConnectionRaw( + hyper_util::rt::TokioIo, +); impl WebsocketConnection { /// Consumes `self` and returns the held raw connection. @@ -87,7 +93,7 @@ fn derive_accept_key(request_key: &[u8]) -> String { impl ExclusiveExtractor for WebsocketUpgrade { async fn from_request( rqctx: &RequestContext, - request: hyper::Request, + request: hyper::Request, ) -> Result { if !request .headers() @@ -229,7 +235,9 @@ impl WebsocketUpgrade { tokio::spawn(async move { match upgrade_fut.await { Ok(upgrade) => { - match handler(WebsocketConnection(upgrade)).await { + let io = hyper_util::rt::TokioIo::new(upgrade); + let raw = WebsocketConnectionRaw(io); + match handler(WebsocketConnection(raw)).await { Ok(x) => Ok(x), Err(e) => { error!( @@ -292,8 +300,55 @@ impl JsonSchema for WebsocketUpgrade { } } +impl tokio::io::AsyncRead for WebsocketConnectionRaw { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for WebsocketConnectionRaw { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.0).poll_write_vectored(cx, bufs) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + #[cfg(test)] mod tests { + use crate::body::Body; use crate::config::HandlerTaskMode; use crate::router::HttpRouter; use crate::server::{DropshotState, ServerConfig}; @@ -303,7 +358,6 @@ mod tests { }; use debug_ignore::DebugIgnore; use http::Request; - use hyper::Body; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::num::NonZeroU32; use std::sync::Arc; diff --git a/dropshot/tests/fail/bad_channel17.stderr b/dropshot/tests/fail/bad_channel17.stderr index 6009d497b..346f21ac8 100644 --- a/dropshot/tests/fail/bad_channel17.stderr +++ b/dropshot/tests/fail/bad_channel17.stderr @@ -20,7 +20,7 @@ note: required by a bound in `need_shared_extractor` | ------------------- required by a bound in this function = note: this error originates in the attribute macro `channel` (in Nightly builds, run with -Z macro-backtrace for more info) -error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel17.rs:16:1 | 12 | / #[channel { @@ -29,7 +29,7 @@ error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, Webso 15 | | }] | |__- required by a bound introduced by this call 16 | async fn two_websocket_channels( - | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_channel18.stderr b/dropshot/tests/fail/bad_channel18.stderr index cc803ca95..d47ce000d 100644 --- a/dropshot/tests/fail/bad_channel18.stderr +++ b/dropshot/tests/fail/bad_channel18.stderr @@ -59,7 +59,7 @@ note: required by a bound in `WebsocketUpgrade::handle` | C: FnOnce(WebsocketConnection) -> F + Send + 'static, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `WebsocketUpgrade::handle` -error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel18.rs:23:1 | 19 | / #[channel { @@ -68,7 +68,7 @@ error[E0277]: the trait bound `fn(RequestContext<()>, WebsocketConnection, Webso 22 | | }] | |__- required by a bound introduced by this call 23 | async fn websocket_channel_not_last( - | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_channel19.stderr b/dropshot/tests/fail/bad_channel19.stderr index 48416cfa2..26519e2c1 100644 --- a/dropshot/tests/fail/bad_channel19.stderr +++ b/dropshot/tests/fail/bad_channel19.stderr @@ -20,7 +20,7 @@ note: required by a bound in `need_shared_extractor` | ------ required by a bound in this function = note: this error originates in the attribute macro `channel` (in Nightly builds, run with -Z macro-backtrace for more info) -error[E0277]: the trait bound `fn(RequestContext<()>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {non_extractor_as_last_argument_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(RequestContext<()>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {non_extractor_as_last_argument_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel19.rs:23:1 | 19 | / #[channel { @@ -29,7 +29,7 @@ error[E0277]: the trait bound `fn(RequestContext<()>, std::string::String, Webso 22 | | }] | |__- required by a bound introduced by this call 23 | async fn non_extractor_as_last_argument( - | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {non_extractor_as_last_argument_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {non_extractor_as_last_argument_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_channel4.stderr b/dropshot/tests/fail/bad_channel4.stderr index 516f22768..72f095c92 100644 --- a/dropshot/tests/fail/bad_channel4.stderr +++ b/dropshot/tests/fail/bad_channel4.stderr @@ -155,7 +155,7 @@ note: required by a bound in `dropshot::Query` | ^^^^^^^^^^^^^^^^ required by this bound in `Query` = note: this error originates in the attribute macro `channel` (in Nightly builds, run with -Z macro-backtrace for more info) -error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel4.rs:20:1 | 16 | / #[channel { @@ -164,7 +164,7 @@ error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query` is not implemented for fn item `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_channel5.stderr b/dropshot/tests/fail/bad_channel5.stderr index fc4a3753d..afaf5fbab 100644 --- a/dropshot/tests/fail/bad_channel5.stderr +++ b/dropshot/tests/fail/bad_channel5.stderr @@ -81,7 +81,7 @@ note: required by a bound in `dropshot::Query` | ^^^^^^^^^^^^^^^^ required by this bound in `Query` = note: this error originates in the attribute macro `channel` (in Nightly builds, run with -Z macro-backtrace for more info) -error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel5.rs:22:1 | 18 | / #[channel { @@ -90,7 +90,7 @@ error[E0277]: the trait bound `fn(RequestContext<()>, dropshot::Query` is not implemented for fn item `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(RequestContext<()>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_channel9.stderr b/dropshot/tests/fail/bad_channel9.stderr index ea0ddcd58..43b74cca0 100644 --- a/dropshot/tests/fail/bad_channel9.stderr +++ b/dropshot/tests/fail/bad_channel9.stderr @@ -18,7 +18,7 @@ error[E0277]: the trait bound `dropshot::Query: RequestContextArgum = help: the trait `RequestContextArgument` is implemented for `RequestContext` = note: this error originates in the attribute macro `channel` (in Nightly builds, run with -Z macro-backtrace for more info) -error[E0277]: the trait bound `fn(dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_channel9.rs:23:1 | 19 | / #[channel { @@ -27,7 +27,7 @@ error[E0277]: the trait bound `fn(dropshot::Query, WebsocketUpgrade 22 | | }] | |__- required by a bound introduced by this call 23 | async fn bad_channel( - | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` + | ^^^^^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_endpoint12.stderr b/dropshot/tests/fail/bad_endpoint12.stderr index aa8cd2285..c1b441d84 100644 --- a/dropshot/tests/fail/bad_endpoint12.stderr +++ b/dropshot/tests/fail/bad_endpoint12.stderr @@ -6,7 +6,7 @@ error[E0277]: the trait bound `String: HttpResponse` is not satisfied | = help: the following other types implement trait `HttpResponse`: HttpResponseHeaders - http::response::Response + http::response::Response = note: required for `String` to implement `HttpResponse` note: required for `Result` to implement `ResultTrait` --> tests/fail/bad_endpoint12.rs:15:6 diff --git a/dropshot/tests/fail/bad_trait_channel17.stderr b/dropshot/tests/fail/bad_trait_channel17.stderr index 4e5c7e76c..f9968d896 100644 --- a/dropshot/tests/fail/bad_trait_channel17.stderr +++ b/dropshot/tests/fail/bad_trait_channel17.stderr @@ -17,7 +17,7 @@ note: required by a bound in `ApiEndpoint::::new_for_types` | FuncParams: RequestExtractor + 'static, | ^^^^^^^^^^^^^^^^ required by this bound in `ApiEndpoint::::new_for_types` -error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_trait_channel17.rs:15:5 | 10 | #[dropshot::api_description] @@ -28,7 +28,7 @@ error[E0277]: the trait bound `fn(dropshot::RequestContext<` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter::}` + | |_________^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {two_websocket_channels_adapter::}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_trait_channel18.stderr b/dropshot/tests/fail/bad_trait_channel18.stderr index 79e1e7678..62ff02712 100644 --- a/dropshot/tests/fail/bad_trait_channel18.stderr +++ b/dropshot/tests/fail/bad_trait_channel18.stderr @@ -56,7 +56,7 @@ note: required by a bound in `ApiEndpoint::::new_for_types` | FuncParams: RequestExtractor + 'static, | ^^^^^^^^^^^^^^^^ required by this bound in `ApiEndpoint::::new_for_types` -error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_trait_channel18.rs:22:5 | 17 | #[dropshot::api_description] @@ -67,7 +67,7 @@ error[E0277]: the trait bound `fn(dropshot::RequestContext<` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter::}` + | |_________^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::WebsocketConnection, WebsocketUpgrade) -> impl Future, HttpError>> {websocket_channel_not_last_adapter::}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_trait_channel19.stderr b/dropshot/tests/fail/bad_trait_channel19.stderr index 0de4e222f..4ad609601 100644 --- a/dropshot/tests/fail/bad_trait_channel19.stderr +++ b/dropshot/tests/fail/bad_trait_channel19.stderr @@ -17,7 +17,7 @@ note: required by a bound in `ApiEndpoint::::new_for_types` | FuncParams: RequestExtractor + 'static, | ^^^^^^^^^^^^^^^^ required by this bound in `ApiEndpoint::::new_for_types` -error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {middle_not_shared_extractor_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {middle_not_shared_extractor_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_trait_channel19.rs:23:5 | 19 | #[dropshot::api_description] @@ -28,7 +28,7 @@ error[E0277]: the trait bound `fn(dropshot::RequestContext<` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {middle_not_shared_extractor_adapter::}` + | |_________^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, std::string::String, WebsocketUpgrade) -> impl Future, HttpError>> {middle_not_shared_extractor_adapter::}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_trait_channel4.stderr b/dropshot/tests/fail/bad_trait_channel4.stderr index dfccfd4c5..10f20116b 100644 --- a/dropshot/tests/fail/bad_trait_channel4.stderr +++ b/dropshot/tests/fail/bad_trait_channel4.stderr @@ -243,7 +243,7 @@ note: required by a bound in `dropshot::Query` | pub struct Query { | ^^^^^^^^^^^^^^^^ required by this bound in `Query` -error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_trait_channel4.rs:20:5 | 16 | #[dropshot::api_description] @@ -254,7 +254,7 @@ error[E0277]: the trait bound `fn(dropshot::RequestContext<` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}` + | |_________^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/fail/bad_trait_channel5.stderr b/dropshot/tests/fail/bad_trait_channel5.stderr index 0b0bf270d..fe6cb8f0f 100644 --- a/dropshot/tests/fail/bad_trait_channel5.stderr +++ b/dropshot/tests/fail/bad_trait_channel5.stderr @@ -128,7 +128,7 @@ note: required by a bound in `dropshot::Query` | pub struct Query { | ^^^^^^^^^^^^^^^^ required by this bound in `Query` -error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied +error[E0277]: the trait bound `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}: dropshot::handler::HttpHandlerFunc<_, _, _>` is not satisfied --> tests/fail/bad_trait_channel5.rs:21:5 | 17 | #[dropshot::api_description] @@ -139,7 +139,7 @@ error[E0277]: the trait bound `fn(dropshot::RequestContext<` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}` + | |_________^ the trait `dropshot::handler::HttpHandlerFunc<_, _, _>` is not implemented for fn item `fn(dropshot::RequestContext<::Context>, dropshot::Query, WebsocketUpgrade) -> impl Future, HttpError>> {bad_channel_adapter::}` | note: required by a bound in `ApiEndpoint::::new` --> src/api_description.rs diff --git a/dropshot/tests/test_config.rs b/dropshot/tests/test_config.rs index 17c3864e1..bff8c6631 100644 --- a/dropshot/tests/test_config.rs +++ b/dropshot/tests/test_config.rs @@ -3,6 +3,7 @@ //! Tests for configuration file. use dropshot::test_util::read_config; +use dropshot::Body; use dropshot::{ ConfigDropshot, ConfigTls, HandlerTaskMode, HttpError, HttpResponseOk, RequestContext, @@ -119,11 +120,15 @@ fn make_config( // test logic trait TestConfigBindServer where - C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: hyper_util::client::legacy::connect::Connect + + Clone + + Send + + Sync + + 'static, { type Context: Send + Sync + 'static; - fn make_client(&self) -> hyper::Client; + fn make_client(&self) -> hyper_util::client::legacy::Client; fn make_server(&self, bind_port: u16) -> HttpServer; fn make_uri(&self, bind_port: u16) -> hyper::Uri; } @@ -132,7 +137,11 @@ where // it binds to ports as expected. async fn test_config_bind_server(test_config: T, bind_port: u16) where - C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: hyper_util::client::legacy::connect::Connect + + Clone + + Send + + Sync + + 'static, T: TestConfigBindServer, { let client = test_config.make_client(); @@ -185,15 +194,22 @@ async fn test_config_bind_address_http() { struct ConfigBindServerHttp { log: slog::Logger, } - impl TestConfigBindServer + impl + TestConfigBindServer for ConfigBindServerHttp { type Context = i32; fn make_client( &self, - ) -> hyper::Client { - hyper::Client::new() + ) -> hyper_util::client::legacy::Client< + hyper_util::client::legacy::connect::HttpConnector, + Body, + > { + hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(hyper_util::client::legacy::connect::HttpConnector::new()) } fn make_uri(&self, bind_port: u16) -> hyper::Uri { @@ -228,15 +244,20 @@ async fn test_config_bind_address_https() { impl<'a> TestConfigBindServer< - hyper_rustls::HttpsConnector, + hyper_rustls::HttpsConnector< + hyper_util::client::legacy::connect::HttpConnector, + >, > for ConfigBindServerHttps<'a> { type Context = i32; fn make_client( &self, - ) -> hyper::Client< - hyper_rustls::HttpsConnector, + ) -> hyper_util::client::legacy::Client< + hyper_rustls::HttpsConnector< + hyper_util::client::legacy::connect::HttpConnector, + >, + Body, > { // Configure TLS to trust the self-signed cert let mut root_store = rustls::RootCertStore { roots: vec![] }; @@ -252,7 +273,10 @@ async fn test_config_bind_address_https() { .https_only() .enable_http1() .build(); - hyper::Client::builder().build(https_connector) + hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(https_connector) } fn make_uri(&self, bind_port: u16) -> hyper::Uri { @@ -300,15 +324,20 @@ async fn test_config_bind_address_https_buffer() { impl<'a> TestConfigBindServer< - hyper_rustls::HttpsConnector, + hyper_rustls::HttpsConnector< + hyper_util::client::legacy::connect::HttpConnector, + >, > for ConfigBindServerHttps<'a> { type Context = i32; fn make_client( &self, - ) -> hyper::Client< - hyper_rustls::HttpsConnector, + ) -> hyper_util::client::legacy::Client< + hyper_rustls::HttpsConnector< + hyper_util::client::legacy::connect::HttpConnector, + >, + Body, > { // Configure TLS to trust the self-signed cert let mut root_store = rustls::RootCertStore { roots: vec![] }; @@ -324,7 +353,11 @@ async fn test_config_bind_address_https_buffer() { .https_only() .enable_http1() .build(); - hyper::Client::builder().build(https_connector) + + hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(https_connector) } fn make_uri(&self, bind_port: u16) -> hyper::Uri { @@ -396,15 +429,21 @@ struct ConfigHandlerTaskModeHttp { log: slog::Logger, } -impl TestConfigBindServer +impl TestConfigBindServer for ConfigHandlerTaskModeHttp { type Context = HandlerTaskModeContext; fn make_client( &self, - ) -> hyper::Client { - hyper::Client::new() + ) -> hyper_util::client::legacy::Client< + hyper_util::client::legacy::connect::HttpConnector, + Body, + > { + hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(hyper_util::client::legacy::connect::HttpConnector::new()) } fn make_server(&self, bind_port: u16) -> HttpServer { diff --git a/dropshot/tests/test_demo.rs b/dropshot/tests/test_demo.rs index 8ab1f1865..6d26d047e 100644 --- a/dropshot/tests/test_demo.rs +++ b/dropshot/tests/test_demo.rs @@ -25,6 +25,7 @@ use dropshot::test_util::read_string; use dropshot::test_util::TEST_HEADER_1; use dropshot::test_util::TEST_HEADER_2; use dropshot::ApiDescription; +use dropshot::Body; use dropshot::HttpError; use dropshot::HttpResponseDeleted; use dropshot::HttpResponseFound; @@ -47,7 +48,6 @@ use futures::stream::StreamExt; use futures::SinkExt; use futures::TryStreamExt; use http::StatusCode; -use hyper::Body; use hyper::Method; use hyper::Response; use schemars::JsonSchema; @@ -1255,12 +1255,14 @@ async fn demo_handler_raw_request( _rqctx: RequestContext, raw_request: RawRequest, ) -> Result, HttpError> { + use http_body_util::BodyExt; + let request = raw_request.into_inner(); let (parts, body) = request.into_parts(); // This is not generally a good pattern because it allows untrusted // consumers to use up all memory. This is just a narrow test. - let whole_body = hyper::body::to_bytes(body).await.unwrap(); + let whole_body = body.collect().await.unwrap().to_bytes(); Ok(HttpResponseOk(DemoRaw { nbytes: whole_body.len(), method: parts.method.to_string(), diff --git a/dropshot/tests/test_detached_shutdown.rs b/dropshot/tests/test_detached_shutdown.rs index b5e755ff8..2c11f7fbe 100644 --- a/dropshot/tests/test_detached_shutdown.rs +++ b/dropshot/tests/test_detached_shutdown.rs @@ -3,11 +3,11 @@ //! Test cases for graceful shutdown of a server running tasks in //! `HandlerTaskMode::Detached`. +use dropshot::Body; use dropshot::{ endpoint, ApiDescription, HandlerTaskMode, HttpError, RequestContext, }; use http::{Method, Response, StatusCode}; -use hyper::Body; use std::time::Duration; use tokio::sync::mpsc; diff --git a/dropshot/tests/test_multipart.rs b/dropshot/tests/test_multipart.rs index c9ec51491..3728ce501 100644 --- a/dropshot/tests/test_multipart.rs +++ b/dropshot/tests/test_multipart.rs @@ -4,10 +4,9 @@ use dropshot::test_util::read_string; use dropshot::{ - endpoint, ApiDescription, HttpError, MultipartBody, RequestContext, + endpoint, ApiDescription, Body, HttpError, MultipartBody, RequestContext, }; use http::{Method, Response, StatusCode}; -use hyper::Body; extern crate slog; diff --git a/dropshot/tests/test_openapi.rs b/dropshot/tests/test_openapi.rs index d4402a8b5..039fd7e31 100644 --- a/dropshot/tests/test_openapi.rs +++ b/dropshot/tests/test_openapi.rs @@ -1,5 +1,6 @@ // Copyright 2023 Oxide Computer Company +use dropshot::Body; use dropshot::{ endpoint, http_response_found, http_response_see_other, http_response_temporary_redirect, ApiDescription, @@ -10,7 +11,6 @@ use dropshot::{ PaginationParams, Path, Query, RequestContext, ResultsPage, TagConfig, TagDetails, TypedBody, UntypedBody, }; -use hyper::Body; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, io::Cursor, str::from_utf8}; @@ -362,7 +362,7 @@ async fn handler17( async fn handler18( _rqctx: RequestContext<()>, ) -> Result, HttpError> { - let (_, body) = Body::channel(); + let body = Body::empty(); Ok(HttpResponseOk(body.into())) } diff --git a/dropshot/tests/test_pagination.rs b/dropshot/tests/test_pagination.rs index 0c1fdf64e..51d3d3ab0 100644 --- a/dropshot/tests/test_pagination.rs +++ b/dropshot/tests/test_pagination.rs @@ -11,6 +11,7 @@ use dropshot::test_util::objects_list_page; use dropshot::test_util::ClientTestContext; use dropshot::test_util::LogContext; use dropshot::ApiDescription; +use dropshot::Body; use dropshot::ConfigLogging; use dropshot::ConfigLoggingIfExists; use dropshot::ConfigLoggingLevel; @@ -25,8 +26,6 @@ use dropshot::ResultsPage; use dropshot::WhichPage; use http::Method; use http::StatusCode; -use hyper::Body; -use hyper::Client; use hyper::Request; use schemars::JsonSchema; use serde::de::DeserializeOwned; @@ -832,7 +831,10 @@ async fn start_example(path: &str, port: u16) -> ExampleContext { let server_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); let client = ClientTestContext::new(server_addr, logctx.log.new(o!())); let url = client.url("/"); - let raw_client = Client::new(); + let raw_client = hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(hyper_util::client::legacy::connect::HttpConnector::new()); let rv = ExampleContext { child, client, logctx: Some(logctx) }; while start.elapsed().as_secs() < 10 { diff --git a/dropshot/tests/test_streaming.rs b/dropshot/tests/test_streaming.rs index d931cc46d..fffa51bef 100644 --- a/dropshot/tests/test_streaming.rs +++ b/dropshot/tests/test_streaming.rs @@ -2,10 +2,10 @@ //! Test cases for streaming requests. +use dropshot::Body; use dropshot::{endpoint, ApiDescription, HttpError, RequestContext}; use http::{Method, Response, StatusCode}; -use hyper::{body::HttpBody, Body}; -use hyper_staticfile::FileBytesStream; +use http_body_util::BodyExt; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; extern crate slog; @@ -46,10 +46,10 @@ async fn api_streaming( } file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let file_stream = FileBytesStream::new(file); - Ok(Response::builder() - .status(StatusCode::OK) - .body(file_stream.into_body())?) + let file_access = hyper_staticfile::vfs::TokioFileAccess::new(file); + let file_stream = hyper_staticfile::util::FileBytesStream::new(file_access); + let body = Body::wrap(hyper_staticfile::Body::Full(file_stream)); + Ok(Response::builder().status(StatusCode::OK).body(body)?) } #[endpoint { @@ -64,8 +64,8 @@ async fn api_not_streaming( .body(serde_json::to_string("not-streaming").unwrap().into())?) } -fn check_has_transfer_encoding( - response: &Response, +fn check_has_transfer_encoding( + response: &Response, expected_value: Option<&str>, ) { let transfer_encoding_header = response.headers().get("transfer-encoding"); @@ -96,10 +96,12 @@ async fn test_streaming_server_streaming_client() { let mut chunk_count = 0; let mut byte_count = 0; - while let Some(chunk) = response.body_mut().data().await { + while let Some(chunk) = response.body_mut().frame().await { let chunk = chunk.expect("Should have received chunk without error"); - byte_count += chunk.len(); - chunk_count += 1; + if let Ok(chunk) = chunk.into_data() { + byte_count += chunk.len(); + chunk_count += 1; + } } assert!( @@ -128,9 +130,12 @@ async fn test_streaming_server_buffered_client() { .expect("Expected GET request to succeed"); check_has_transfer_encoding(&response, Some("chunked")); - let body_bytes = hyper::body::to_bytes(response.body_mut()) + let body_bytes = response + .body_mut() + .collect() .await - .expect("Error reading body"); + .expect("Error reading body") + .to_bytes(); assert_eq!( BUF_SIZE * BUF_COUNT, body_bytes.len(), diff --git a/dropshot/tests/test_tls.rs b/dropshot/tests/test_tls.rs index 233cfce2c..3b6f14d7e 100644 --- a/dropshot/tests/test_tls.rs +++ b/dropshot/tests/test_tls.rs @@ -90,8 +90,11 @@ fn make_https_client< T: rustls::client::danger::ServerCertVerifier + Send + Sync + 'static, >( verifier: Arc, -) -> hyper::Client< - hyper_rustls::HttpsConnector, +) -> hyper_util::client::legacy::Client< + hyper_rustls::HttpsConnector< + hyper_util::client::legacy::connect::HttpConnector, + >, + dropshot::Body, > { let tls_config = rustls::ClientConfig::builder() .dangerous() @@ -102,7 +105,10 @@ fn make_https_client< .https_only() .enable_http1() .build(); - hyper::Client::builder().build(https_connector) + hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(https_connector) } fn make_server( @@ -157,7 +163,7 @@ async fn test_tls_certificate_loading() { let request = hyper::Request::builder() .method(http::method::Method::GET) .uri(&uri) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); let verifier_called = Arc::new(AtomicUsize::new(0)); @@ -212,14 +218,14 @@ async fn test_tls_only() { let https_request = hyper::Request::builder() .method(http::method::Method::GET) .uri(&https_uri) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); let http_uri: hyper::Uri = format!("http://localhost:{}/", port).parse().unwrap(); let http_request = hyper::Request::builder() .method(http::method::Method::GET) .uri(&http_uri) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); let https_client = make_https_client(make_pki_verifier(&certs)); @@ -227,16 +233,20 @@ async fn test_tls_only() { // Send an HTTP request, it should fail due to parse error, since // the server and client are speaking different protocols - let http_client = hyper::Client::builder().build_http(); - let error = http_client.request(http_request).await.unwrap_err(); - assert!(error.is_parse()); + let http_client = hyper_util::client::legacy::Client::builder( + hyper_util::rt::TokioExecutor::new(), + ) + .build(hyper_util::client::legacy::connect::HttpConnector::new()); + let _error = http_client.request(http_request).await.unwrap_err(); + // cannot check if it is a "hyper parse error", but would like to if + // hyper::Error gains the ability in the future // Make an HTTPS request again, to make sure the HTTP client didn't // interfere with HTTPS request processing let https_request = hyper::Request::builder() .method(http::method::Method::GET) .uri(&https_uri) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); https_client.request(https_request).await.unwrap(); @@ -264,7 +274,7 @@ async fn test_tls_refresh_certificates() { hyper::Request::builder() .method(http::method::Method::GET) .uri(&https_uri) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap() }; @@ -447,7 +457,7 @@ async fn test_server_is_https() { let https_request = hyper::Request::builder() .method(http::method::Method::GET) .uri(format!("https://localhost:{}/?tls=true", port)) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); let res = https_client.request(https_request).await.unwrap(); assert_eq!(res.status(), hyper::StatusCode::OK); @@ -456,7 +466,7 @@ async fn test_server_is_https() { let https_request = hyper::Request::builder() .method(http::method::Method::GET) .uri(format!("https://localhost:{}/?tls=false", port)) - .body(hyper::Body::empty()) + .body(dropshot::Body::empty()) .unwrap(); let res = https_client.request(https_request).await.unwrap(); assert_eq!(res.status(), hyper::StatusCode::BAD_REQUEST);