Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion neqo-bin/benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn transfer(c: &mut Criterion) {
},
|(server_handle, client)| {
black_box(async move {
client.await.unwrap();
Box::pin(client).await.unwrap();
// Tell server to shut down.
server_handle.send(()).unwrap();
})
Expand Down
3 changes: 1 addition & 2 deletions neqo-bin/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
reason = "FIXME: False positive?"
)]
async fn main() -> Result<(), neqo_bin::client::Error> {
let args = neqo_bin::client::Args::parse();

neqo_bin::client::client(args).await
Box::pin(neqo_bin::client::client(args)).await

Check warning on line 17 in neqo-bin/src/bin/client.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace main -> Result<(), neqo_bin::client::Error> with Ok(())
}
2 changes: 0 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![expect(clippy::unwrap_used, reason = "This is example code.")]

//! An [HTTP 0.9](https://www.w3.org/Protocols/HTTP/AsImplemented.html) client implementation.

use std::{
Expand Down
4 changes: 1 addition & 3 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![expect(clippy::unwrap_used, reason = "This is example code.")]

//! An HTTP 3 client implementation.

use std::{
Expand Down Expand Up @@ -83,7 +81,7 @@ pub fn create_client(
cid_generator,
local_addr,
remote_addr,
args.shared.quic_parameters.get(args.shared.alpn.as_str()),
args.shared.quic_parameters.get(args.shared.alpn.as_str()).pmtud(false),
Instant::now(),
)?;
let ciphers = args.get_ciphers();
Expand Down
173 changes: 115 additions & 58 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![expect(clippy::unwrap_used, reason = "This is example code.")]

use std::{
collections::VecDeque,
fmt::Display,
Expand Down Expand Up @@ -37,10 +35,11 @@
use tokio::time::Sleep;
use url::{Host, Origin, Url};

use crate::SharedArgs;
use crate::{udp, SharedArgs};

mod http09;
mod http3;
mod proxied_http3;

const BUFWRITER_BUFFER_SIZE: usize = 64 * 1024;

Expand Down Expand Up @@ -179,6 +178,10 @@
#[arg(name = "cid-length", short = 'l', long, default_value = "0",
value_parser = clap::value_parser!(u8).range(..=20))]
cid_len: u8,

/// Use a MASQUE connect-udp proxy server at the given URL.
#[arg(name = "proxy", long)]
proxy: Option<Url>,
}

impl Args {
Expand Down Expand Up @@ -220,6 +223,7 @@
upload_size,
stats: false,
cid_len: 0,
proxy: None,
}
}

Expand Down Expand Up @@ -368,7 +372,7 @@

// Wait for the socket to be readable or the timeout to fire.
async fn ready(
socket: &crate::udp::Socket,
socket: &udp::Socket,
mut timeout: Option<&mut Pin<Box<Sleep>>>,
) -> Result<Ready, io::Error> {
let socket_ready = Box::pin(socket.readable()).map_ok(|()| Ready::Socket);
Expand All @@ -387,6 +391,7 @@
fn take_token(&mut self) -> Option<ResumptionToken>;
}

#[derive(Debug)]
enum CloseState {
NotClosing,
Closing,
Expand All @@ -412,7 +417,7 @@

struct Runner<'a, H: Handler> {
local_addr: SocketAddr,
socket: &'a mut crate::udp::Socket,
socket: &'a mut udp::Socket,
client: H::Client,
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
Expand All @@ -423,7 +428,7 @@
impl<'a, H: Handler> Runner<'a, H> {
fn new(
local_addr: SocketAddr,
socket: &'a mut crate::udp::Socket,
socket: &'a mut udp::Socket,
client: H::Client,
handler: H,
args: &'a Args,
Expand Down Expand Up @@ -587,7 +592,6 @@

#[expect(
clippy::future_not_send,
clippy::missing_panics_doc,
clippy::missing_errors_doc,
reason = "This is example code."
)]
Expand All @@ -604,67 +608,120 @@

init()?;

for ((host, port), mut urls) in urls_by_origin(&args.urls) {
if args.resume && urls.len() < 2 {
qerror!("Resumption to {host} cannot work without at least 2 URLs");
exit(127);
}
for ((host, port), urls) in urls_by_origin(&args.urls) {
client_for_origin(host, port, urls, &args).await?;
}

Ok(())
}

#[expect(clippy::future_not_send, reason = "This is example code.")]
async fn client_for_origin(host: Host, port: u16, mut urls: VecDeque<Url>, args: &Args) -> Res<()> {
if args.resume && urls.len() < 2 {

Check warning on line 620 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace < with <= in client_for_origin

Check warning on line 620 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace < with > in client_for_origin

Check warning on line 620 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace < with == in client_for_origin
qerror!("Resumption to {host} cannot work without at least 2 URLs");
exit(127);
}

let Some(origin_addr) = format!("{host}:{port}").to_socket_addrs()?.find(|addr| {
!matches!(
(addr, args.ipv4_only, args.ipv6_only),
(SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false)
)
}) else {
qerror!("No compatible address found for: {host}");
exit(1);
};

let remote_addr = format!("{host}:{port}").to_socket_addrs()?.find(|addr| {
!matches!(
(addr, args.ipv4_only, args.ipv6_only),
(SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false)
)
});
let Some(remote_addr) = remote_addr else {
qerror!("No compatible address found for: {host}");
let proxy_addr = args.proxy.as_ref().map(|proxy_url| {
let Origin::Tuple(_scheme, proxy_host, proxy_port) = proxy_url.origin() else {
panic!();
};
let Some(proxy_addr) = format!("{proxy_host}:{proxy_port}")
.to_socket_addrs()
.unwrap()
.find(|addr| {
!matches!(

Check warning on line 643 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

delete ! in client_for_origin
(addr, args.ipv4_only, args.ipv6_only),
(SocketAddr::V4(..), false, true) | (SocketAddr::V6(..), true, false)
)
})
else {
qerror!("No compatible address found for: {proxy_host}");
exit(1);
};
let mut socket = crate::udp::Socket::bind(local_addr_for(&remote_addr, 0))?;
if socket.may_fragment() {
qinfo!("Datagrams may be fragmented by the IP layer. Disabling PMTUD.");
args.shared.quic_parameters.no_pmtud = true;
}
let real_local = socket.local_addr().unwrap();
qinfo!(
"{} Client connecting: {real_local:?} -> {remote_addr:?}",
args.shared.alpn
);

let hostname = format!("{host}");
let mut token: Option<ResumptionToken> = None;
let mut first = true;
while !urls.is_empty() {
let to_request = if (args.resume && first) || args.download_in_series {
urls.pop_front().into_iter().collect()
} else {
std::mem::take(&mut urls)
};
(proxy_host, proxy_addr)
});

let mut socket = udp::Socket::bind(local_addr_for(
&proxy_addr.as_ref().map_or(origin_addr, |p| p.1),
0,
))?;
let real_local = socket.local_addr().unwrap();
qinfo!(
"{} Client connecting: {real_local:?} -> {origin_addr:?}",
args.shared.alpn
);

first = false;
let hostname = format!("{host}");
let mut token: Option<ResumptionToken> = None;
let mut first = true;
while !urls.is_empty() {
let to_request = if (args.resume && first) || args.download_in_series {

Check warning on line 669 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace && with || in client_for_origin

Check warning on line 669 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace || with && in client_for_origin
urls.pop_front().into_iter().collect()
} else {
std::mem::take(&mut urls)
};

token = if args.shared.alpn == "h3" {
let client = http3::create_client(&args, real_local, remote_addr, &hostname, token)
first = false;

token = if args.shared.alpn == "h3" {

Check warning on line 677 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace == with != in client_for_origin
let mut client_args = args.clone();
// Don't forward proxy authorization to origin.
client_args.headers.retain(|h| {
let name_lower = h.name().to_ascii_lowercase();
name_lower != "authorization" && name_lower != "proxy-authorization"

Check warning on line 682 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace != with == in client_for_origin

Check warning on line 682 in neqo-bin/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace && with || in client_for_origin
});
let client =
http3::create_client(&client_args, real_local, origin_addr, &hostname, token)
.expect("failed to create client");

let handler = http3::Handler::new(to_request, args.clone());

Runner::new(real_local, &mut socket, client, handler, &args)
let client_handler = http3::Handler::new(to_request, client_args.clone());

if let Some((proxy_host, proxy_addr)) = &proxy_addr {
let proxy = proxied_http3::ProxiedHttp3::new(
client,
client_handler,
args.proxy.clone().unwrap(),
args.headers.clone(),
&format!("{proxy_host}"),
real_local,
*proxy_addr,
)?;
Box::pin(
Runner::new(
real_local,
&mut socket,
proxy,
proxied_http3::Handler::default(),
args,
)
.run(),
)
.await?
} else {
Runner::new(real_local, &mut socket, client, client_handler, args)
.run()
.await?
} else {
let client =
http09::create_client(&args, real_local, remote_addr, &hostname, token)
.expect("failed to create client");
}
} else {
let client = http09::create_client(args, real_local, origin_addr, &hostname, token)
.expect("failed to create client");

let handler = http09::Handler::new(to_request, &args);
let handler = http09::Handler::new(to_request, args);

Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
};
}
Runner::new(real_local, &mut socket, client, handler, args)
.run()
.await?
};
}

Ok(())
}
Loading
Loading