Skip to content

Commit 2c0c63a

Browse files
committed
test(http1): setup test fixture for dispatch loop
1 parent 1c70fab commit 2c0c63a

File tree

5 files changed

+510
-0
lines changed

5 files changed

+510
-0
lines changed

Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ pin-project-lite = "0.2.4"
5252
spmc = "0.3"
5353
serde = { version = "1.0", features = ["derive"] }
5454
serde_json = "1.0"
55+
tracing = "0.1"
56+
tracing-subscriber = "0.3"
5557
tokio = { version = "1", features = [
5658
"fs",
5759
"macros",
@@ -239,6 +241,16 @@ name = "integration"
239241
path = "tests/integration.rs"
240242
required-features = ["full"]
241243

244+
[[test]]
245+
name = "ready_on_poll_stream"
246+
path = "tests/ready_on_poll_stream.rs"
247+
required-features = ["full"]
248+
249+
[[test]]
250+
name = "unbuffered_stream"
251+
path = "tests/unbuffered_stream.rs"
252+
required-features = ["full"]
253+
242254
[[test]]
243255
name = "server"
244256
path = "tests/server.rs"

tests/h1_server/fixture.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
use http_body_util::StreamBody;
2+
use hyper::body::Bytes;
3+
use hyper::body::Frame;
4+
use hyper::server::conn::http1;
5+
use hyper::service::service_fn;
6+
use hyper::{Response, StatusCode};
7+
use std::convert::Infallible;
8+
use std::time::Duration;
9+
use tokio::sync::mpsc;
10+
use tokio::time::timeout;
11+
use tracing::{error, info};
12+
13+
pub struct TestConfig {
14+
pub total_chunks: usize,
15+
pub chunk_size: usize,
16+
pub chunk_timeout: Duration,
17+
}
18+
19+
impl TestConfig {
20+
pub fn with_timeout(chunk_timeout: Duration) -> Self {
21+
Self {
22+
total_chunks: 16,
23+
chunk_size: 64 * 1024,
24+
chunk_timeout,
25+
}
26+
}
27+
}
28+
29+
pub struct Client {
30+
pub rx: mpsc::UnboundedReceiver<Vec<u8>>,
31+
pub tx: mpsc::UnboundedSender<Vec<u8>>,
32+
}
33+
34+
pub async fn run<S>(server: S, mut client: Client, config: TestConfig)
35+
where
36+
S: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
37+
{
38+
let mut http_builder = http1::Builder::new();
39+
http_builder.max_buf_size(config.chunk_size);
40+
41+
let total_chunks = config.total_chunks;
42+
let chunk_size = config.chunk_size;
43+
44+
let service = service_fn(move |_| {
45+
let total_chunks = total_chunks;
46+
let chunk_size = chunk_size;
47+
async move {
48+
info!(
49+
"Creating payload of {} chunks of {} KiB each ({} MiB total)...",
50+
total_chunks,
51+
chunk_size / 1024,
52+
total_chunks * chunk_size / (1024 * 1024)
53+
);
54+
let bytes = Bytes::from(vec![0; chunk_size]);
55+
let data = vec![bytes.clone(); total_chunks];
56+
let stream = futures_util::stream::iter(
57+
data.into_iter()
58+
.map(|b| Ok::<_, Infallible>(Frame::data(b))),
59+
);
60+
let body = StreamBody::new(stream);
61+
info!("Server: Sending data response...");
62+
Ok::<_, hyper::Error>(
63+
Response::builder()
64+
.status(StatusCode::OK)
65+
.header("content-type", "application/octet-stream")
66+
.header("content-length", (total_chunks * chunk_size).to_string())
67+
.body(body)
68+
.unwrap(),
69+
)
70+
}
71+
});
72+
73+
let server_task = tokio::spawn(async move {
74+
let conn = http_builder.serve_connection(Box::pin(server), service);
75+
let conn_result = conn.await;
76+
if let Err(e) = &conn_result {
77+
error!("Server connection error: {}", e);
78+
}
79+
conn_result
80+
});
81+
82+
let get_request = "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
83+
client
84+
.tx
85+
.send(get_request.as_bytes().to_vec())
86+
.map_err(|e| {
87+
Box::new(std::io::Error::new(
88+
std::io::ErrorKind::Other,
89+
format!("Failed to send request: {}", e),
90+
))
91+
})
92+
.unwrap();
93+
94+
info!("Client is reading response...");
95+
let mut bytes_received = 0;
96+
let mut all_data = Vec::new();
97+
loop {
98+
match timeout(config.chunk_timeout, client.rx.recv()).await {
99+
Ok(Some(chunk)) => {
100+
bytes_received += chunk.len();
101+
all_data.extend_from_slice(&chunk);
102+
}
103+
Ok(None) => break,
104+
Err(_) => {
105+
panic!(
106+
"Chunk timeout: chunk took longer than {:?}",
107+
config.chunk_timeout
108+
);
109+
}
110+
}
111+
}
112+
113+
// Clean up
114+
let result = server_task.await.unwrap();
115+
result.unwrap();
116+
117+
// Parse HTTP response to find body start
118+
// HTTP response format: "HTTP/1.1 200 OK\r\n...headers...\r\n\r\n<body>"
119+
let body_start = all_data
120+
.windows(4)
121+
.position(|w| w == b"\r\n\r\n")
122+
.map(|pos| pos + 4)
123+
.unwrap_or(0);
124+
125+
let body_bytes = bytes_received - body_start;
126+
assert_eq!(
127+
body_bytes,
128+
config.total_chunks * config.chunk_size,
129+
"Expected {} body bytes, got {} (total received: {}, headers: {})",
130+
config.total_chunks * config.chunk_size,
131+
body_bytes,
132+
bytes_received,
133+
body_start
134+
);
135+
info!(bytes_received, body_bytes, "Client done receiving bytes");
136+
}

tests/h1_server/mod.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
pub mod fixture;
2+
3+
use hyper::rt::{Read, ReadBufCursor};
4+
use pin_project_lite::pin_project;
5+
use std::io;
6+
use std::pin::Pin;
7+
use std::task::{ready, Context, Poll};
8+
use tokio::sync::mpsc;
9+
10+
// Common read half shared by both stream types
11+
pin_project! {
12+
#[derive(Debug)]
13+
pub struct StreamReadHalf {
14+
#[pin]
15+
read_rx: mpsc::UnboundedReceiver<Vec<u8>>,
16+
read_buffer: Vec<u8>,
17+
}
18+
}
19+
20+
impl StreamReadHalf {
21+
pub fn new(read_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Self {
22+
Self {
23+
read_rx,
24+
read_buffer: Vec::new(),
25+
}
26+
}
27+
}
28+
29+
impl Read for StreamReadHalf {
30+
fn poll_read(
31+
mut self: Pin<&mut Self>,
32+
cx: &mut Context<'_>,
33+
mut buf: ReadBufCursor<'_>,
34+
) -> Poll<io::Result<()>> {
35+
let mut this = self.as_mut().project();
36+
37+
// First, try to satisfy the read request from the internal buffer
38+
if !this.read_buffer.is_empty() {
39+
let to_read = std::cmp::min(this.read_buffer.len(), buf.remaining());
40+
// Copy data from internal buffer to the read buffer
41+
buf.put_slice(&this.read_buffer[..to_read]);
42+
// Remove the consumed data from the internal buffer
43+
this.read_buffer.drain(..to_read);
44+
return Poll::Ready(Ok(()));
45+
}
46+
47+
// If internal buffer is empty, try to get data from the channel
48+
match this.read_rx.as_mut().get_mut().try_recv() {
49+
Ok(data) => {
50+
// Copy as much data as we can fit in the buffer
51+
let to_read = std::cmp::min(data.len(), buf.remaining());
52+
buf.put_slice(&data[..to_read]);
53+
54+
// Store any remaining data in the internal buffer for next time
55+
if to_read < data.len() {
56+
let remaining = &data[to_read..];
57+
this.read_buffer.extend_from_slice(remaining);
58+
}
59+
Poll::Ready(Ok(()))
60+
}
61+
Err(mpsc::error::TryRecvError::Empty) => {
62+
match ready!(this.read_rx.poll_recv(cx)) {
63+
Some(data) => {
64+
// Copy as much data as we can fit in the buffer
65+
let to_read = std::cmp::min(data.len(), buf.remaining());
66+
buf.put_slice(&data[..to_read]);
67+
68+
// Store any remaining data in the internal buffer for next time
69+
if to_read < data.len() {
70+
let remaining = &data[to_read..];
71+
this.read_buffer.extend_from_slice(remaining);
72+
}
73+
Poll::Ready(Ok(()))
74+
}
75+
None => Poll::Ready(Ok(())),
76+
}
77+
}
78+
Err(mpsc::error::TryRecvError::Disconnected) => {
79+
// Channel closed, return EOF
80+
Poll::Ready(Ok(()))
81+
}
82+
}
83+
}
84+
}
85+
86+
pub fn init_tracing() {
87+
use std::sync::Once;
88+
static INIT: Once = Once::new();
89+
INIT.call_once(|| {
90+
tracing_subscriber::fmt()
91+
.with_max_level(tracing::Level::INFO)
92+
.with_target(true)
93+
.with_thread_ids(true)
94+
.with_thread_names(true)
95+
.init();
96+
});
97+
}

0 commit comments

Comments
 (0)