From 4e685f2359ebc460795063f9b6db373bdd6567d6 Mon Sep 17 00:00:00 2001 From: Pierre Barre Date: Thu, 5 Feb 2026 12:57:56 +0100 Subject: [PATCH] fix(http1): send error when dispatcher is dropped mid-body (#3995) Wrap body_tx in a SenderDropGuard that sends an IncompleteMessage error if the Dispatcher future is dropped while still owning a body Sender, preventing silent data truncation. --- src/proto/h1/dispatch.rs | 47 +++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 5daeb5ebf6..41ad8c3439 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -22,11 +22,41 @@ use crate::upgrade::OnUpgrade; pub(crate) struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option, + body_tx: SenderDropGuard, body_rx: Pin>>, is_closing: bool, } +/// A drop guard for the body `Sender`. +/// +/// If the `Dispatcher` future is dropped (e.g. the runtime driving the +/// connection is shut down) while it still owns a body `Sender`, the guard +/// sends an incomplete-message error so the receiver sees an error instead +/// of a silent, clean end-of-stream. +struct SenderDropGuard(Option); + +impl SenderDropGuard { + fn none() -> Self { + SenderDropGuard(None) + } + + fn set(&mut self, sender: crate::body::Sender) { + self.0 = Some(sender); + } + + fn take(&mut self) -> Option { + self.0.take() + } +} + +impl Drop for SenderDropGuard { + fn drop(&mut self) { + if let Some(mut sender) = self.0.take() { + sender.send_error(crate::Error::new_incomplete()); + } + } +} + pub(crate) trait Dispatch { type PollItem; type PollBody; @@ -81,7 +111,7 @@ where Dispatcher { conn, dispatch, - body_tx: None, + body_tx: SenderDropGuard::none(), body_rx: Box::pin(None), is_closing: false, } @@ -126,7 +156,8 @@ where should_shutdown: bool, ) -> Poll> { Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { - // Be sure to alert a streaming body of the failure. + // Be sure to alert a streaming body of the failure with a + // more specific error than the drop guard would provide. if let Some(mut body) = self.body_tx.take() { body.send_error(crate::Error::new_body("connection error")); } @@ -203,7 +234,7 @@ where match body.poll_ready(cx) { Poll::Ready(Ok(())) => (), Poll::Pending => { - self.body_tx = Some(body); + self.body_tx.set(body); return Poll::Pending; } Poll::Ready(Err(_canceled)) => { @@ -220,7 +251,7 @@ where let chunk = frame.into_data().unwrap_or_else(|_| unreachable!()); match body.try_send_data(chunk) { Ok(()) => { - self.body_tx = Some(body); + self.body_tx.set(body); } Err(_canceled) => { if self.conn.can_read_body() { @@ -234,7 +265,7 @@ where frame.into_trailers().unwrap_or_else(|_| unreachable!()); match body.try_send_trailers(trailers) { Ok(()) => { - self.body_tx = Some(body); + self.body_tx.set(body); } Err(_canceled) => { if self.conn.can_read_body() { @@ -252,7 +283,7 @@ where // just drop, the body will close automatically } Poll::Pending => { - self.body_tx = Some(body); + self.body_tx.set(body); return Poll::Pending; } Poll::Ready(Some(Err(e))) => { @@ -287,7 +318,7 @@ where other => { let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); - self.body_tx = Some(tx); + self.body_tx.set(tx); rx } };