Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,41 @@ use crate::upgrade::OnUpgrade;
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
conn: Conn<I, Bs::Data, T>,
dispatch: D,
body_tx: Option<crate::body::Sender>,
body_tx: SenderDropGuard,
body_rx: Pin<Box<Option<Bs>>>,
is_closing: bool,
}

/// A drop guard for the body `Sender`.
///
/// If the `Dispatcher` future is dropped (e.g. the runtime driving the
/// connection is shut down) while it still owns a body `Sender`, the guard
/// sends an incomplete-message error so the receiver sees an error instead
/// of a silent, clean end-of-stream.
struct SenderDropGuard(Option<crate::body::Sender>);

impl SenderDropGuard {
fn none() -> Self {
SenderDropGuard(None)
}

fn set(&mut self, sender: crate::body::Sender) {
self.0 = Some(sender);
}

fn take(&mut self) -> Option<crate::body::Sender> {
self.0.take()
}
}

impl Drop for SenderDropGuard {
fn drop(&mut self) {
if let Some(mut sender) = self.0.take() {
sender.send_error(crate::Error::new_incomplete());
}
}
}

pub(crate) trait Dispatch {
type PollItem;
type PollBody;
Expand Down Expand Up @@ -81,7 +111,7 @@ where
Dispatcher {
conn,
dispatch,
body_tx: None,
body_tx: SenderDropGuard::none(),
body_rx: Box::pin(None),
is_closing: false,
}
Expand Down Expand Up @@ -126,7 +156,8 @@ where
should_shutdown: bool,
) -> Poll<crate::Result<Dispatched>> {
Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
// Be sure to alert a streaming body of the failure.
// Be sure to alert a streaming body of the failure with a
// more specific error than the drop guard would provide.
if let Some(mut body) = self.body_tx.take() {
body.send_error(crate::Error::new_body("connection error"));
}
Expand Down Expand Up @@ -203,7 +234,7 @@ where
match body.poll_ready(cx) {
Poll::Ready(Ok(())) => (),
Poll::Pending => {
self.body_tx = Some(body);
self.body_tx.set(body);
return Poll::Pending;
}
Poll::Ready(Err(_canceled)) => {
Expand All @@ -220,7 +251,7 @@ where
let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
match body.try_send_data(chunk) {
Ok(()) => {
self.body_tx = Some(body);
self.body_tx.set(body);
}
Err(_canceled) => {
if self.conn.can_read_body() {
Expand All @@ -234,7 +265,7 @@ where
frame.into_trailers().unwrap_or_else(|_| unreachable!());
match body.try_send_trailers(trailers) {
Ok(()) => {
self.body_tx = Some(body);
self.body_tx.set(body);
}
Err(_canceled) => {
if self.conn.can_read_body() {
Expand All @@ -252,7 +283,7 @@ where
// just drop, the body will close automatically
}
Poll::Pending => {
self.body_tx = Some(body);
self.body_tx.set(body);
return Poll::Pending;
}
Poll::Ready(Some(Err(e))) => {
Expand Down Expand Up @@ -287,7 +318,7 @@ where
other => {
let (tx, rx) =
IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
self.body_tx = Some(tx);
self.body_tx.set(tx);
rx
}
};
Expand Down