Skip to content

With rustls, POST bodies are still being read after the request ended and was dropped #2884

@konstin

Description

@konstin

In uv, we observed a panic because a reader that was supposedly dropped still polls the file, trying to update a progress bar that doesn't exist anymore (astral-sh/uv#17090). This reproduces with a fairly minimal POST example. It seems that with rustls, reqwest ignores that a request failed and continues sending the body.

The minimal example that has this behavior:

let request = client
    .post("https://example.com/")
    .body(Body::wrap_stream(ReaderStream::new(reader)));

I tried to look for an explicit cancellation function but couldn't find one. Neither explicit drop nor explicit had an effect.

In uv, the actual production logic involves a loop for retries of failed network requests, formdata POST uploads and reproduced by cutting and reestablishing the network connection (https://github.com/astral-sh/uv/blob/38ae41468206bf59b8b96b0a2d791b4211fa7c14/crates/uv-publish/src/lib.rs#L474-L511). The example below simplifies this to a POST body, while using the progress bar updater to print which readers are being advanced instead.

With default-tls/openssl, each reader gets dropped after the end of the request:

$ cargo run large-file.bin
Finished 1
Dropping reader 1
Switched reader from 1 to 2
Finished 2
Dropping reader 2
Switched reader from 2 to 3
Finished 3
Dropping reader 3
Switched reader from 3 to 4
Finished 4
Dropping reader 4
Switched reader from 4 to 5

With rustl, readers of past requests get polled continuously, adding more and more readers as requests fail:

$ cargo run large-file.bin
Finished 1
Switched reader from 1 to 2
Switched reader from 2 to 1
Switched reader from 1 to 2
Switched reader from 2 to 1
Switched reader from 1 to 2
Switched reader from 2 to 1
Switched reader from 1 to 2
Switched reader from 2 to 1
Switched reader from 1 to 2

MRE:

[package]
name = "reqwest-continues-reading"
version = "0.1.0"
edition = "2024"

[dependencies]
reqwest = { version = "0.12.25", default-features = false, features = [
    "http2",
    "rustls-tls", # Replace this with "default-tls" makes the failure go away
    "stream",
] }
tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7.17" }
use reqwest::{Body, Client, RequestBuilder};
use std::env;
use std::error::Error;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::fs::File;
use tokio_util::io::ReaderStream;

/// An asynchronous reader that reports progress as bytes are read.
pub struct ProgressReader<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn() + Unpin> {
    idx: usize,
    reader: Reader,
    callback: Callback,
}

impl<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn() + Unpin>
    ProgressReader<Reader, Callback>
{
    /// Create a new [`ProgressReader`] that wraps another reader.
    pub fn new(idx: usize, reader: Reader, callback: Callback) -> Self {
        Self {
            idx,
            reader,
            callback,
        }
    }
}

impl<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn() + Unpin> tokio::io::AsyncRead
    for ProgressReader<Reader, Callback>
{
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        std::pin::Pin::new(&mut self.as_mut().reader)
            .poll_read(cx, buf)
            .map_ok(|()| {
                (self.callback)();
            })
    }
}

impl<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn() + Unpin> Drop
    for ProgressReader<Reader, Callback>
{
    fn drop(&mut self) {
        eprintln!("Dropping reader {}", self.idx);
    }
}

// Track which streams are reading.
static LAST_WRITER: AtomicUsize = AtomicUsize::new(1);

async fn build_request(client: &Client, idx: usize) -> Result<RequestBuilder, Box<dyn Error>> {
    let file = File::open(&env::args().nth(1).unwrap()).await?;
    // Inspect the stream while it's read, originally to update the progress bar.
    let reader = tokio::io::BufReader::new(ProgressReader::new(idx, file, move || {
        let last_writer = LAST_WRITER.load(Ordering::SeqCst);
        if last_writer != idx {
            eprintln!("Switched reader from {last_writer} to {idx}");
        }
        LAST_WRITER.store(idx, Ordering::SeqCst);
    }));

    let request = client
        .post("https://upload.pypi.org/legacy/")
        .body(Body::wrap_stream(ReaderStream::new(reader)));
    Ok(request)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let client = reqwest::ClientBuilder::new()
        .read_timeout(Duration::from_secs(1))
        .build()?;

    // Simulate client that retries
    for i in 1..10 {
        let request = build_request(&client, i).await?;
        let response = request.send().await;
        eprintln!("Finished {i}");
        drop(response);
        // tokio::task::yield_now().await; // Tried this, doesn't help.
    }

    Ok(())
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    B-upstreamBlocked: upstream. Depends on a dependency to make a change first.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions