
Body dropped too late when an h1 handshake hits an error #4122

Description

@alexcrichton

Version

hyper 1.10.1

Platform

Linux x86_64 7.0.0-27-generic

Summary

Wasmtime recently hit a spurious CI failure in a test, and I think I've traced it to some code in hyper. The test in question is a wasm guest that issues an HTTP/1.1 request to a server that only speaks HTTP/2, and the expectation is that the guest receives an HttpProtocolError. Instead, the test timed out in CI.

After some investigation, it looks like this is due to hyper's use of tokio's channels and the precise synchronization/sequencing of sending a request from one end of a channel to the other. The code sample below is a small program that demonstrates this race. Its intent is:

  • No actual I/O is performed, it's just in-memory data structures.
  • An http1 handshake is performed over a connection that reads corrupt data (roughly mimicking what an HTTP/2 server might send first).
  • The program sends a request, drives the connection I/O to completion, and then checks that the request was dropped.
  • Various parts of the program exist only to stress scheduling so that the bug shows up more often.

Code Sample

use bytes::Bytes;
use http_body::Body;
use hyper::rt::{Read, ReadBufCursor, Write};
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::task::{Context, Poll, Waker};

fn main() {
    // start up some background work to stress scheduling a bit more.
    for _ in 0..16 {
        std::thread::spawn(|| loop {
            std::hint::black_box(std::time::Instant::now());
        });
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let _guard = rt.enter();

    for _ in 0..2_000_000 {
        // Handshake against the mock IO. handshake() itself does no I/O.
        let (mut sender, mut conn) = match rt
            .block_on(hyper::client::conn::http1::handshake::<_, TrackedBody>(
                MockIo::default(),
            )) {
            Ok(p) => p,
            Err(_) => continue,
        };

        let dropped = Arc::new(AtomicBool::new(false));
        let req = hyper::Request::connect("http://x/")
            .body(TrackedBody {
                dropped: dropped.clone(),
            })
            .unwrap();

        let barrier = Arc::new(Barrier::new(2));

        // Thread B: dispatch the request the instant the barrier releases.
        let b2 = barrier.clone();
        let sender_thread = std::thread::spawn(move || {
            b2.wait();
            let fut = sender.send_request(req);
            (fut, sender)
        });

        barrier.wait();
        let mut cx = Context::from_waker(Waker::noop());
        for _ in 0..4 {
            if Pin::new(&mut conn).poll(&mut cx).is_ready() {
                break;
            }
        }
        drop(conn);

        let (fut, sender2) = sender_thread.join().unwrap();
        drop(fut);

        // At this point the future from `send_request` is gone, the receiver
        // through `conn` is gone, so it should be the case that our body in the
        // request that was sent was dropped. However this isn't always true so
        // sometimes this will print out.
        if !dropped.load(Ordering::SeqCst) {
            println!("body wasn't dropped");
        }
        drop(sender2);
        assert!(
            dropped.load(Ordering::SeqCst),
            "request must be reclaimed once the SendRequest drops"
        );
    }
}

#[derive(Default)]
struct MockIo {
    gave: bool,
}

impl Read for MockIo {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        mut buf: ReadBufCursor<'_>,
    ) -> Poll<io::Result<()>> {
        // read enough bytes that the connection should terminate with an error.
        if !self.gave {
            self.gave = true;
            buf.put_slice(&[0u8; 9]);
        }
        Poll::Ready(Ok(()))
    }
}

impl Write for MockIo {
    fn poll_write(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        _buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // shouldn't be called for h1
        unreachable!()
    }
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// An empty body that sets `dropped` to true when dropped.
struct TrackedBody {
    dropped: Arc<AtomicBool>,
}

impl Body for TrackedBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;
    fn poll_frame(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<http_body::Frame<Bytes>, Self::Error>>> {
        Poll::Ready(None)
    }
}

impl Drop for TrackedBody {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

Expected Behavior

I expect the request body to always be dropped by the time the sender is the only live value. In other words, this program should never print "body wasn't dropped".

Actual Behavior

Locally, ~2 million iterations produce ~20 instances of the body not being dropped before the sender itself is dropped. In Wasmtime's embedding this surfaced as a timeout, because we rely on the body being dropped to detect when the request has been sent (among other things) or when an error has occurred.
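To illustrate why a late drop shows up as a timeout rather than as an error, here is a minimal std-only sketch of the general pattern: a value that signals its own drop through a channel, and a waiter that gives up after a deadline. DropSignal and wait_for_drop are hypothetical names for illustration; this is not Wasmtime's actual code.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for a request body that reports when it is dropped.
struct DropSignal {
    tx: mpsc::Sender<()>,
}

impl Drop for DropSignal {
    fn drop(&mut self) {
        // Ignore send errors: the waiter may already have given up.
        let _ = self.tx.send(());
    }
}

/// Waits up to `timeout` for the body's drop signal. `owner_releases_body`
/// simulates whether whoever owns the body (e.g. an HTTP client) drops it
/// before the waiter checks, or keeps it alive past the deadline.
fn wait_for_drop(owner_releases_body: bool, timeout: Duration) -> Result<(), RecvTimeoutError> {
    let (tx, rx) = mpsc::channel();
    let body = DropSignal { tx };
    let still_held = if owner_releases_body {
        drop(body); // Drop runs here and sends the signal.
        None
    } else {
        Some(body) // Body stays alive, so no signal is sent in time.
    };
    let result = rx.recv_timeout(timeout);
    drop(still_held);
    result
}

fn main() {
    let wait = Duration::from_millis(50);
    println!("released: {:?}", wait_for_drop(true, wait));
    println!("held:     {:?}", wait_for_drop(false, wait));
}
```

If the owner holds on to the body past the deadline, the waiter sees Timeout even though nothing "failed", which matches the symptom described above.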

Additional Context

Some poking and prodding shows that one possible culprit is this line, which would be a bit more robust if it used Tokio's try_recv (my guess is that try_recv was added after this code was written). Even with that change, however, the program above still doesn't drop the body 100% of the time. This seems to be because, even after close has been called on a channel, try_recv can return Empty (rather than Disconnected) if a permit for a message being sent is still outstanding. I think this is still fundamentally a race between the sender and receiver in which a single try_recv isn't enough; hyper would have to loop over try_recv.

At that point I figured it'd be best to file an issue. I'd like to confirm that this program is expected to reliably drop the body, and then hear what others think.

Metadata


Assignees

No one assigned

Labels

C-bug (Category: bug. Something is wrong. This is bad!)
