Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dropshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ categories = ["network-programming", "web-programming::http-server"]
async-stream = "0.3.3"
async-trait = "0.1.60"
base64 = "0.20.0"
buf-list = "1.0.0"
bytes = "1"
futures = "0.3.25"
hostname = "0.3.0"
http = "0.2.8"
indexmap = "1.9.2"
paste = "1.0.11"
percent-encoding = "2.2.0"
pin-project-lite = "0.2.9"
proc-macro2 = "1.0.49"
rustls = "0.20.7"
rustls-pemfile = "1.0.1"
Expand Down
256 changes: 219 additions & 37 deletions dropshot/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

use super::error::HttpError;
use super::http_util::http_extract_path_params;
use super::http_util::http_read_body;
use super::http_util::CONTENT_TYPE_JSON;
use super::http_util::CONTENT_TYPE_OCTET_STREAM;
use super::server::DropshotState;
Expand All @@ -52,14 +51,25 @@ use crate::router::VariableSet;
use crate::to_map::to_map;
use crate::websocket::WEBSOCKET_PARAM_SENTINEL;

use async_stream::stream;
use async_trait::async_trait;
use buf_list::BufList;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::lock::Mutex;
use futures::ready;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use http::HeaderMap;
use http::StatusCode;
use hyper::body::HttpBody;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use pin_project_lite::pin_project;
use schemars::schema::InstanceType;
use schemars::schema::SchemaObject;
use schemars::JsonSchema;
Expand All @@ -74,7 +84,9 @@ use std::fmt::Result as FmtResult;
use std::future::Future;
use std::marker::PhantomData;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

/**
* Type alias for the result returned by HTTP handler functions.
Expand Down Expand Up @@ -971,13 +983,15 @@ where
BodyType: JsonSchema + DeserializeOwned + Send + Sync,
{
let server = &rqctx.server;
let mut request = rqctx.request.lock().await;
let body = http_read_body(
request.body_mut(),
server.config.request_body_max_bytes,
)
let body = UntypedBody {
request: rqctx.request.clone(),
max_bytes: server.config.request_body_max_bytes,
}
.into_bytes()
.await?;

let request = rqctx.request.lock().await;

// RFC 7231 §3.1.1.1: media types are case insensitive and may
// be followed by whitespace and/or a parameter (e.g., charset),
// which we currently ignore.
Expand Down Expand Up @@ -1065,57 +1079,137 @@ where
}
}

/*
* UntypedBody: body extractor for a plain array of bytes of a body.
*/

/**
* `UntypedBody` is an extractor for reading in the contents of the HTTP request
* body and making the raw bytes directly available to the consumer.
*/
/// An extractor for raw bytes.
///
/// `UntypedBody` is meant to read in the contents of an HTTP request body as a
/// series of raw bytes. Unlike [`TypedBody`], an `UntypedBody` represents a
/// read that hasn't happened yet; a method like
/// [`into_bytes`](Self::into_bytes) or [`into_stream`](Self::into_stream) must
/// be called to read the full body.
#[derive(Debug)]
pub struct UntypedBody {
content: Bytes,
request: Arc<Mutex<Request<Body>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to take this lock at the point where we take it? Previously, we read this whole thing earlier. I get why we don't do this now, but that means we're taking a lock later in the request processing. I thought we'd be holding the lock for longer, too, but I don't think that's true.

max_bytes: usize,
}

impl UntypedBody {
/**
* Returns a byte slice of the underlying body content.
*/
/*
* TODO drop this in favor of Deref? + Display and Debug for convenience?
*/
pub fn as_bytes(&self) -> &[u8] {
&self.content
/// Reads the body into a single, contiguous [`Bytes`] chunk, up to the
/// server's configured maximum body size.
///
/// Recommended for smaller request bodies. Constructing a single `Bytes`
/// chunk from larger request bodies might cause excessive copying and
/// reallocations.
///
/// # Errors
///
/// Errors if there's an underlying HTTP error. Returns a "400 Bad Request"
/// error if the request body is too large.
pub async fn into_bytes(self) -> Result<Bytes, HttpError> {
let mut stream = self.into_stream();
let mut bytes = BytesMut::new();

while let Some(data) = stream.next().await {
bytes.put(data?);
}

Ok(bytes.freeze())
}

/**
* Convenience wrapper to convert the body to a UTF-8 string slice,
* returning a 400-level error if the body is not valid UTF-8.
*/
pub fn as_str(&self) -> Result<&str, HttpError> {
std::str::from_utf8(self.as_bytes()).map_err(|e| {
/// Reads the body into a `String`, up to the server's configured maximum
/// body size.
///
/// # Errors
///
/// In addition to the errors returned by [`into_bytes`](Self::into_bytes),
/// returns a "400 Bad Request" error if the body is not valid UTF-8.
pub async fn into_string(self) -> Result<String, HttpError> {
let v = Vec::from(self.into_bytes().await?);
String::from_utf8(v).map_err(|e| {
HttpError::for_bad_request(
None,
format!("failed to parse body as UTF-8 string: {}", e),
)
})
}

/// Reads the body into a [`BufList`], up to the server's configured maximum
/// body size.
///
/// A `BufList` is a list of [`Bytes`] chunks that implements the
/// [`Buf`](bytes::Buf) trait. A `BufList`'s chunks can be operated on
/// as a unit.
///
/// Recommended over [`into_bytes`](Self::into_bytes) or
/// [`into_string`](Self::into_string) for larger request bodies. Like those
/// functions, this function fails if the body exceeds the server's
/// configured maximum body size.
pub async fn into_buf_list(self) -> Result<BufList, HttpError> {
self.into_stream().try_collect().await
}

/// Reads the body into a [`BufList`], with a custom limit for the maximum
/// body size.
///
/// This method is similar to [`into_buf_list`](Self::into_buf_list), except
/// it specifies a custom limit. If this method is called, the default
/// [`request_body_max_bytes`](ServerConfig::request_body_max_bytes) limit
/// is ignored.
pub async fn into_buf_list_with_limit(
self,
max_bytes: usize,
) -> Result<BufList, HttpError> {
self.into_stream().with_limit(max_bytes).try_collect().await
}

/// Converts `self` into a [`Stream`] of `Result<Bytes, HttpError>` chunks.
///
/// By default, the stream is limited to the server's configured maximum
/// body size. To set a different maximum body size, call
/// [`with_limit`](UntypedBodyStream::with_limit) on the returned stream.
///
/// # Errors
///
/// The stream errors if there's an underlying HTTP error, or if the request
/// body exceeds the limit.
pub fn into_stream(self) -> UntypedBodyStream {
let max_bytes = self.max_bytes;

let stream = stream! {
let mut request = self.request.lock().await;
let body = request.body_mut();

'outer: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I said I'd reserve my feedback until you go through @davepacheco's comments, but free advice on this: it might be less work to defer the use of relatively-new structures rather than potentially impacting some dropshot consumer at Oxide that's on an older rust version. My personal threshold is about 4-6 months in terms of my willingness to just hope that folks are up-to-date.

while let Some(data) = body.data().await {
match data {
Ok(data) => yield Ok(data),
Err(e) => {
yield Err(HttpError::from(e));
break 'outer;
}
}
}

// Read the trailers even though we aren't going to do anything
// with them.
if let Err(e) = body.trailers().await {
yield Err(HttpError::from(e));
}
}
};

UntypedBodyStream::new(stream, max_bytes)
}
}

#[async_trait]
impl Extractor for UntypedBody {
async fn from_request<Context: ServerContext>(
rqctx: Arc<RequestContext<Context>>,
) -> Result<UntypedBody, HttpError> {
let server = &rqctx.server;
let mut request = rqctx.request.lock().await;
let body_bytes = http_read_body(
request.body_mut(),
server.config.request_body_max_bytes,
)
.await?;
Ok(UntypedBody { content: body_bytes })
Ok(Self {
request: rqctx.request.clone(),
max_bytes: rqctx.server.config.request_body_max_bytes,
})
}

fn metadata(
Expand Down Expand Up @@ -1143,6 +1237,94 @@ impl Extractor for UntypedBody {
}
}

pin_project! {
    /// A stream over an HTTP request body.
    ///
    /// The stream enforces a limit (`max_bytes`) on the total number of
    /// bytes that may be read from it; the `Stream` implementation yields a
    /// "400 Bad Request" error once the limit is exceeded.
    pub struct UntypedBodyStream {
        // TODO: replace with concrete type once TAIT is stabilized.
        #[pin]
        stream: BoxStream<'static, Result<Bytes, HttpError>>,
        // Maximum total number of bytes the stream is allowed to yield.
        max_bytes: usize,
        // Running count of bytes yielded so far.
        current_bytes: usize,
    }
}

impl UntypedBodyStream {
    /// Wraps `stream` with byte accounting against `max_bytes`.
    pub(crate) fn new(
        stream: impl Stream<Item = Result<Bytes, HttpError>> + Send + 'static,
        max_bytes: usize,
    ) -> Self {
        let stream = stream.boxed();
        Self { stream, max_bytes, current_bytes: 0 }
    }

    /// Returns the maximum number of bytes that can be read from this stream.
    pub fn max_bytes(&self) -> usize {
        self.max_bytes
    }

    /// Returns the number of bytes read from the stream so far.
    pub fn current_bytes(&self) -> usize {
        self.current_bytes
    }

    /// Replaces the stream's byte limit with `new_limit`.
    ///
    /// The running count reported by [`Self::current_bytes`] is unaffected.
    pub fn set_limit(&mut self, new_limit: usize) -> &mut Self {
        self.max_bytes = new_limit;
        self
    }

    /// Consuming variant of [`set_limit`](Self::set_limit).
    pub fn with_limit(mut self, new_limit: usize) -> Self {
        self.max_bytes = new_limit;
        self
    }
}

impl Stream for UntypedBodyStream {
    type Item = Result<Bytes, HttpError>;

    /// Yields the next body chunk, tracking the cumulative number of bytes
    /// read and failing with a "400 Bad Request" error once the total would
    /// exceed `max_bytes`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.as_mut().project();
        let result = ready!(this.stream.poll_next(cx));
        let bytes = match result {
            Some(Ok(bytes)) => bytes,
            // End-of-stream and underlying errors pass through unchanged.
            x @ None | x @ Some(Err(_)) => return Poll::Ready(x),
        };

        let max_bytes = *this.max_bytes;

        // `checked_add` guards against overflow of the running total; on
        // overflow the body necessarily exceeds any `usize` limit, so treat
        // it as too large.
        let is_too_large = this
            .current_bytes
            .checked_add(bytes.len())
            .map_or(true, |total| total > max_bytes);
        if is_too_large {
            // The request was too large. Drain the rest of the stream
            // (surfacing any underlying error) before reporting the
            // size-limit error.
            while let Some(data) =
                ready!(self.as_mut().project().stream.poll_next(cx))
            {
                if let Err(e) = data {
                    return Poll::Ready(Some(Err(e)));
                }
            }
            return Poll::Ready(Some(Err(HttpError::for_bad_request(
                None,
                format!(
                    "request body exceeded maximum size of {} bytes",
                    max_bytes
                ),
            ))));
        }

        *this.current_bytes += bytes.len();

        Poll::Ready(Some(Ok(bytes)))
    }
}

/*
* Response Type Conversion
*
Expand Down
Loading