From 9443fca43113a96da4e8fe959d94f0153224a8b5 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 10:58:22 +0200 Subject: [PATCH 01/13] introduce tcp crr --- lading/src/bin/payloadtool.rs | 6 ++ lading/src/blackhole.rs | 12 +++ lading/src/blackhole/tcp_crr.rs | 143 ++++++++++++++++++++++++++++ lading/src/generator.rs | 12 +++ lading/src/generator/tcp_crr.rs | 139 +++++++++++++++++++++++++++ lading/src/generator/tcp_rr.rs | 3 +- lading/src/neper/rr.rs | 164 +++++++++++++++++++++++++++++--- 7 files changed, 464 insertions(+), 15 deletions(-) create mode 100644 lading/src/blackhole/tcp_crr.rs create mode 100644 lading/src/generator/tcp_crr.rs diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 0e90d2a9b..5596b6915 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -486,6 +486,12 @@ fn check_generator(config: &generator::Config, args: &Args) -> Result { + if args.fingerprint { + return Ok(None); + } + unimplemented!("TcpCrr not supported") + } generator::Inner::TcpRr(_) => { if args.fingerprint { return Ok(None); diff --git a/lading/src/blackhole.rs b/lading/src/blackhole.rs index e23db4c69..1d130598f 100644 --- a/lading/src/blackhole.rs +++ b/lading/src/blackhole.rs @@ -15,6 +15,7 @@ pub mod otlp; pub mod splunk_hec; pub mod sqs; pub mod tcp; +pub mod tcp_crr; pub mod tcp_rr; pub mod udp; pub mod unix_datagram; @@ -26,6 +27,9 @@ pub enum Error { /// See [`crate::blackhole::tcp::Error`] for details. #[error(transparent)] Tcp(tcp::Error), + /// See [`crate::blackhole::tcp_crr::Error`] for details. + #[error(transparent)] + TcpCrr(tcp_crr::Error), /// See [`crate::blackhole::tcp_rr::Error`] for details. #[error(transparent)] TcpRr(tcp_rr::Error), @@ -87,6 +91,8 @@ pub struct General { pub enum Inner { /// See [`crate::blackhole::tcp::Config`] for details. Tcp(tcp::Config), + /// See [`crate::blackhole::tcp_crr::Config`] for details. 
+ TcpCrr(tcp_crr::Config), /// See [`crate::blackhole::tcp_rr::Config`] for details. TcpRr(tcp_rr::Config), /// See [`crate::blackhole::datadog::Config`] for details. @@ -117,6 +123,8 @@ pub enum Inner { pub enum Server { /// See [`crate::blackhole::tcp::Tcp`] for details. Tcp(tcp::Tcp), + /// See [`crate::blackhole::tcp_crr::TcpCrr`] for details. + TcpCrr(tcp_crr::TcpCrr), /// See [`crate::blackhole::tcp_rr::TcpRr`] for details. TcpRr(tcp_rr::TcpRr), /// See [`crate::blackhole::datadog::Datadog`] for details. @@ -152,6 +160,9 @@ impl Server { pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result { let server = match config.inner { Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)), + Inner::TcpCrr(conf) => { + Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown)) + } Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)), Inner::Datadog(conf) => { Self::Datadog(datadog::Datadog::new(config.general, conf, shutdown)) @@ -194,6 +205,7 @@ impl Server { pub async fn run(self) -> Result<(), Error> { match self { Server::Tcp(inner) => inner.run().await.map_err(Error::Tcp), + Server::TcpCrr(inner) => inner.run().await.map_err(Error::TcpCrr), Server::TcpRr(inner) => inner.run().await.map_err(Error::TcpRr), Server::Datadog(inner) => inner.run().await.map_err(Error::Datadog), Server::DatadogStatefulLogs(inner) => { diff --git a/lading/src/blackhole/tcp_crr.rs b/lading/src/blackhole/tcp_crr.rs new file mode 100644 index 000000000..3cca19458 --- /dev/null +++ b/lading/src/blackhole/tcp_crr.rs @@ -0,0 +1,143 @@ +//! TCP connect/request/response (`tcp_crr`) blackhole — the server side. +//! Based on +//! +//! Listens for incoming connections and, for each flow, reads a fixed-size +//! request then writes a fixed-size response. The CRR client closes the +//! connection after each response; the server side is identical to `tcp_rr` +//! and delegates to the same shared machinery. +//! +//! 
The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration. +//! +//! ## Metrics +//! +//! `connections_accepted`: Incoming connections accepted +//! `requests_received`: Completed request reads +//! `responses_sent`: Completed response writes +//! `bytes_received`: Request bytes read +//! `bytes_written`: Response bytes sent +//! `connections_closed`: Flow removals (client close + I/O errors) + +use std::net::{IpAddr, SocketAddr}; +use std::num::{NonZeroU16, NonZeroUsize}; + +use serde::{Deserialize, Serialize}; + +use super::General; +use crate::neper::rr::{self, ServerParams}; + +fn default_nonzero_u16() -> NonZeroU16 { + NonZeroU16::new(1).expect("1 is nonzero") +} + +fn default_nonzero_usize() -> NonZeroUsize { + NonZeroUsize::new(1).expect("1 is nonzero") +} + +fn default_control_port() -> u16 { + 12866 +} + +fn default_data_port() -> u16 { + 12867 +} + +fn default_backlog() -> i32 { + 1024 +} + +const fn default_true() -> bool { + true +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +/// Configuration for the `tcp_crr` blackhole. +pub struct Config { + /// IP address to bind on. + pub addr: IpAddr, + /// Data port for flow connections. Default 12867. + #[serde(default = "default_data_port")] + pub data_port: u16, + /// Control port for startup synchronization with the generator. Default 12866. + #[serde(default = "default_control_port")] + pub control_port: u16, + /// Number of OS server threads. Default 1. When > 1, uses `SO_REUSEPORT` + /// with an eBPF program for load balancing. + #[serde(default = "default_nonzero_u16")] + pub threads: NonZeroU16, + /// Total number of TCP flows the generator should open (neper `-F`). + /// Default 1. Sent to the generator over the control connection at + /// startup; the generator does not configure this independently. 
+ #[serde(default = "default_nonzero_u16")] + pub flows: NonZeroU16, + /// Bytes to read per request. Default 1. + #[serde(default = "default_nonzero_usize")] + pub request_size: NonZeroUsize, + /// Bytes to send per response. Default 1. + #[serde(default = "default_nonzero_usize")] + pub response_size: NonZeroUsize, + /// Whether to set `TCP_NODELAY` on accepted connections. Default true. + #[serde(default = "default_true")] + pub no_delay: bool, + /// Listener backlog (pending-connection queue length) passed to `listen(2)`. + /// Default 1024. CRR workloads benefit from a larger backlog to absorb + /// connect bursts. + #[serde(default = "default_backlog")] + pub backlog: i32, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`TcpCrr`]. +pub enum Error { + /// Shared neper-style request/response error. + #[error(transparent)] + Rr(#[from] rr::Error), +} + +#[derive(Debug)] +/// The `tcp_crr` blackhole (server side). +pub struct TcpCrr { + config: Config, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, +} + +impl TcpCrr { + /// Create a new [`TcpCrr`] blackhole instance. + #[must_use] + pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self { + let mut metric_labels = vec![ + ("component".to_string(), "blackhole".to_string()), + ("component_name".to_string(), "tcp_crr".to_string()), + ]; + if let Some(id) = general.id { + metric_labels.push(("id".to_string(), id)); + } + Self { + config: *config, + metric_labels, + shutdown, + } + } + + /// Run the blackhole to completion or until a shutdown signal is received. + /// + /// # Errors + /// + /// Returns an error if binding fails or a worker thread panics. 
+ pub async fn run(self) -> Result<(), Error> { + let params = ServerParams { + data_addr: SocketAddr::new(self.config.addr, self.config.data_port), + control_addr: SocketAddr::new(self.config.addr, self.config.control_port), + threads: self.config.threads.get(), + flows: self.config.flows.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, + backlog: self.config.backlog, + }; + rr::run_server(params, self.metric_labels, self.shutdown, "tcp_crr").await?; + Ok(()) + } +} diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 0d2744680..f2ea9490f 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -27,6 +27,7 @@ pub mod process_tree; pub mod procfs; pub mod splunk_hec; pub mod tcp; +pub mod tcp_crr; pub mod tcp_rr; pub mod trace_agent; pub mod udp; @@ -39,6 +40,9 @@ pub enum Error { /// See [`crate::generator::tcp::Error`] for details. #[error(transparent)] Tcp(#[from] tcp::Error), + /// See [`crate::generator::tcp_crr::Error`] for details. + #[error(transparent)] + TcpCrr(#[from] tcp_crr::Error), /// See [`crate::generator::tcp_rr::Error`] for details. #[error(transparent)] TcpRr(#[from] tcp_rr::Error), @@ -115,6 +119,8 @@ pub struct General { pub enum Inner { /// See [`crate::generator::tcp::Config`] for details. Tcp(tcp::Config), + /// See [`crate::generator::tcp_crr::Config`] for details. + TcpCrr(tcp_crr::Config), /// See [`crate::generator::tcp_rr::Config`] for details. TcpRr(tcp_rr::Config), /// See [`crate::generator::udp::Config`] for details. @@ -156,6 +162,8 @@ pub enum Inner { pub enum Server { /// See [`crate::generator::tcp::Tcp`] for details. Tcp(tcp::Tcp), + /// See [`crate::generator::tcp_crr::TcpCrr`] for details. + TcpCrr(tcp_crr::TcpCrr), /// See [`crate::generator::tcp_rr::TcpRr`] for details. TcpRr(tcp_rr::TcpRr), /// See [`crate::generator::udp::Udp`] for details. 
@@ -201,6 +209,9 @@ impl Server { pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result { let srv = match config.inner { Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)?), + Inner::TcpCrr(conf) => { + Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown)) + } Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)), Inner::Udp(conf) => Self::Udp(udp::Udp::new(config.general, &conf, shutdown)?), Inner::Http(conf) => Self::Http(http::Http::new(config.general, conf, shutdown)?), @@ -276,6 +287,7 @@ impl Server { match self { Server::Tcp(inner) => inner.spin().await?, + Server::TcpCrr(inner) => inner.spin().await?, Server::TcpRr(inner) => inner.spin().await?, Server::Udp(inner) => inner.spin().await?, Server::Http(inner) => inner.spin().await?, diff --git a/lading/src/generator/tcp_crr.rs b/lading/src/generator/tcp_crr.rs new file mode 100644 index 000000000..5ee5c4d90 --- /dev/null +++ b/lading/src/generator/tcp_crr.rs @@ -0,0 +1,139 @@ +//! TCP connect/request/response (`tcp_crr`) generator — the client side. +//! Based on +//! +//! Implements neper's `tcp_crr` protocol: each flow connects, sends a +//! fixed-size request, reads a fixed-size response, closes the connection, +//! then reconnects and repeats. This measures connection-establishment rate +//! end-to-end, including kernel and TCP-handshake overhead. +//! +//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration and selects [`Mode::Crr`]. +//! +//! ## Ephemeral port / `TIME_WAIT` +//! +//! Each transaction consumes one client-side ephemeral port for the duration +//! of `TIME_WAIT` (~60s by default on Linux). At even a few thousand +//! transactions/sec the ~28k usable port range exhausts in seconds. Widen +//! `net.ipv4.ip_local_port_range` and consider lowering `tcp_fin_timeout` +//! before running sustained CRR workloads. +//! +//! ## Metrics +//! 
+//! `connections_initiated`: Successful client-side connect completions +//! `requests_sent`: Completed request writes +//! `responses_received`: Completed response reads +//! `bytes_written`: Request bytes sent +//! `bytes_read`: Response bytes received +//! `connections_failed`: Failed connection attempts + +use std::net::{IpAddr, SocketAddr}; +use std::num::{NonZeroU16, NonZeroUsize}; + +use serde::{Deserialize, Serialize}; + +use super::General; +use crate::generator::common::MetricsBuilder; +use crate::neper::rr::{self, ClientParams, Mode}; + +fn default_nonzero_u16() -> NonZeroU16 { + NonZeroU16::new(1).expect("1 is nonzero") +} + +fn default_nonzero_usize() -> NonZeroUsize { + NonZeroUsize::new(1).expect("1 is nonzero") +} + +const fn default_true() -> bool { + true +} + +fn default_control_port() -> u16 { + 12866 +} + +fn default_data_port() -> u16 { + 12867 +} + +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] +#[serde(deny_unknown_fields)] +/// Configuration for the `tcp_crr` generator. +/// +/// Flow count (neper `-F`) is *not* configured here — it is owned by the +/// `tcp_crr` blackhole and communicated to the generator over the control +/// port during startup. +pub struct Config { + /// The IP address of the `tcp_crr` server. + pub addr: String, + /// Data port for flow connections. Default 12867. + #[serde(default = "default_data_port")] + pub data_port: u16, + /// Control port for startup synchronization with the blackhole. Default 12866. + #[serde(default = "default_control_port")] + pub control_port: u16, + /// Number of OS threads (neper -T). Default 1. + #[serde(default = "default_nonzero_u16")] + pub threads: NonZeroU16, + /// Bytes per request. Default 1. + #[serde(default = "default_nonzero_usize")] + pub request_size: NonZeroUsize, + /// Bytes per response to read back. Default 1. + #[serde(default = "default_nonzero_usize")] + pub response_size: NonZeroUsize, + /// Whether to set `TCP_NODELAY` on connections. Default true. 
+ #[serde(default = "default_true")] + pub no_delay: bool, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`TcpCrr`]. +pub enum Error { + /// Shared neper-style request/response error. + #[error(transparent)] + Rr(#[from] rr::Error), +} + +#[derive(Debug)] +/// The `tcp_crr` generator (client side). +pub struct TcpCrr { + config: Config, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, +} + +impl TcpCrr { + /// Create a new [`TcpCrr`] generator instance. + #[must_use] + pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self { + let metric_labels = MetricsBuilder::new("tcp_crr").with_id(general.id).build(); + Self { + config: config.clone(), + metric_labels, + shutdown, + } + } + + /// Run the generator to completion or until a shutdown signal is received. + /// + /// # Errors + /// + /// Returns an error if a worker thread panics or configuration is invalid. + /// + /// # Panics + /// + /// Panics if `addr` cannot be parsed as an IP address. 
+ pub async fn spin(self) -> Result<(), Error> { + let ip: IpAddr = self.config.addr.parse().expect("invalid addr"); + let params = ClientParams { + data_addr: SocketAddr::new(ip, self.config.data_port), + control_addr: SocketAddr::new(ip, self.config.control_port), + threads: self.config.threads.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, + mode: Mode::Crr, + }; + rr::run_client(params, self.metric_labels, self.shutdown, "tcp_crr").await?; + Ok(()) + } +} diff --git a/lading/src/generator/tcp_rr.rs b/lading/src/generator/tcp_rr.rs index d3f2e75b8..2185e751d 100644 --- a/lading/src/generator/tcp_rr.rs +++ b/lading/src/generator/tcp_rr.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use super::General; use crate::generator::common::MetricsBuilder; -use crate::neper::rr::{self, ClientParams}; +use crate::neper::rr::{self, ClientParams, Mode}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -121,6 +121,7 @@ impl TcpRr { request_size: self.config.request_size.get(), response_size: self.config.response_size.get(), no_delay: self.config.no_delay, + mode: Mode::Rr, }; rr::run_client(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 0f3d269dd..6c124244b 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -46,6 +46,15 @@ pub enum Error { Config(String), } +/// Which neper-style protocol the client is driving. +#[derive(Clone, Copy, Debug)] +pub(crate) enum Mode { + /// `tcp_rr`: persistent connection, request/response loop forever. + Rr, + /// `tcp_crr`: connect, request/response, close, reconnect, repeat. + Crr, +} + /// Parameters for [`run_client`]. 
/// /// Flow count is *not* a client parameter — it is owned by the server and @@ -63,6 +72,8 @@ pub(crate) struct ClientParams { pub(crate) response_size: usize, /// Whether to set `TCP_NODELAY`. pub(crate) no_delay: bool, + /// RR or CRR. + pub(crate) mode: Mode, } /// Parameters for [`run_server`]. @@ -87,10 +98,26 @@ pub(crate) struct ServerParams { } enum ClientState { + /// CRR only: waiting for a non-blocking `connect(2)` to complete. The + /// `WRITABLE` readiness event signals connect completion; `take_error()` + /// then tells us if it succeeded. + Connecting, SendRequest, RecvResponse, } +/// Actions returned by [`handle_client_event`]. Supersets [`flow::Action`] +/// with [`ClientAction::Reconnect`] for CRR's per-transaction reconnect. +#[derive(Clone, Copy)] +enum ClientAction { + Continue, + Reregister(Interest), + /// CRR: response complete — close this socket and open a new one with the + /// same `Token`. Handled by [`apply_client_action`]. + Reconnect, + Remove, +} + enum ServerState { RecvRequest, SendResponse, @@ -202,6 +229,7 @@ pub(crate) async fn run_client( let request_size = params.request_size; let response_size = params.response_size; let no_delay = params.no_delay; + let mode = params.mode; let mut worker_handles = Vec::with_capacity(params.threads as usize); for i in 0..params.threads { let thread_flows = flow_dist[i as usize]; @@ -214,6 +242,7 @@ pub(crate) async fn run_client( request_size, response_size, no_delay, + mode, &flag, &tm[i as usize], ); @@ -231,12 +260,14 @@ pub(crate) async fn run_client( Ok(()) } +#[allow(clippy::too_many_arguments)] fn client_thread_main( addr: SocketAddr, num_flows: u16, request_size: usize, response_size: usize, no_delay: bool, + mode: Mode, shutdown_flag: &AtomicBool, metrics: &ThreadMetrics, ) { @@ -266,6 +297,7 @@ fn client_thread_main( state: ClientState::SendRequest, xfer: request_size, }); + metrics.connections_initiated.add(1); } Err(e) => { trace!("connection to {addr} failed: {e}"); @@ 
-284,19 +316,106 @@ fn client_thread_main( let Some(fl) = flows.get_mut(token) else { continue; }; - let action = handle_client_event(fl, &request_buf, &mut response_buf, metrics); - flow::apply_action(action, token, &mut flows, poll.registry()); + let action = handle_client_event(fl, mode, &request_buf, &mut response_buf, metrics); + apply_client_action(action, token, &mut flows, &poll, addr, no_delay, metrics); + } + } +} + +/// Apply a [`ClientAction`] to the flow map. Handles the CRR +/// reconnect transition (deregister old stream, open a new +/// non-blocking connect, reregister with the same token). +fn apply_client_action( + action: ClientAction, + token: Token, + flows: &mut FlowMap, + poll: &Poll, + addr: SocketAddr, + no_delay: bool, + metrics: &ThreadMetrics, +) { + let registry = poll.registry(); + match action { + ClientAction::Continue => {} + ClientAction::Reregister(interest) => { + if let Some(flow) = flows.get_mut(token) { + let _ = registry.reregister(&mut flow.stream, flow.token, interest); + } + } + ClientAction::Reconnect => { + let Some(flow) = flows.get_mut(token) else { + return; + }; + let _ = registry.deregister(&mut flow.stream); + match TcpStream::connect(addr) { + Ok(mut new_stream) => { + { + let sock = socket2::SockRef::from(&new_stream); + if let Err(e) = sock.set_tcp_nodelay(no_delay) { + trace!("failed to set TCP_NODELAY on reconnect: {e}"); + } + } + if let Err(e) = + registry.register(&mut new_stream, flow.token, Interest::WRITABLE) + { + trace!("reconnect register failed: {e}"); + metrics.connections_failed.add(1); + let _ = flows.remove(token); + } else { + flow.stream = new_stream; + flow.state = ClientState::Connecting; + flow.xfer = 0; + } + } + Err(e) => { + trace!("reconnect to {addr} failed: {e}"); + metrics.connections_failed.add(1); + let _ = flows.remove(token); + } + } + } + ClientAction::Remove => { + if let Some(mut flow) = flows.remove(token) { + let _ = registry.deregister(&mut flow.stream); + } } } } fn 
handle_client_event( flow: &mut Flow, + mode: Mode, request_buf: &[u8], response_buf: &mut [u8], metrics: &ThreadMetrics, -) -> Action { +) -> ClientAction { + // Connecting → SendRequest transition: mio is edge-triggered, so the + // single WRITABLE event that signaled connect completion is also the + // event that must drive the first write. Transition state and fall + // through to SendRequest in the same call. + if matches!(flow.state, ClientState::Connecting) { + match flow.stream.take_error() { + Ok(None) => { + flow.state = ClientState::SendRequest; + flow.xfer = request_buf.len(); + metrics.connections_initiated.add(1); + // fall through + } + Ok(Some(e)) => { + trace!("connect failed: {e}"); + metrics.connections_failed.add(1); + return ClientAction::Reconnect; + } + Err(e) => { + trace!("take_error failed: {e}"); + metrics.connections_failed.add(1); + return ClientAction::Reconnect; + } + } + } + match flow.state { + ClientState::Connecting => unreachable!("transitioned out of Connecting above"), ClientState::SendRequest => { let offset = request_buf.len() - flow.xfer; match flow.stream.write(&request_buf[offset..]) { @@ -307,22 +426,22 @@ fn handle_client_event( flow.state = ClientState::RecvResponse; metrics.requests_sent.add(1); metrics.bytes_written.add(request_buf.len() as u64); - Action::Reregister(Interest::READABLE) + ClientAction::Reregister(Interest::READABLE) } else { - Action::Continue + ClientAction::Continue } } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { trace!("write error: {e}"); - Action::Remove + ClientAction::Remove } } } ClientState::RecvResponse => { let offset = response_buf.len() - flow.xfer; match flow.stream.read(&mut response_buf[offset..]) { - Ok(0) => Action::Remove, + Ok(0) => ClientAction::Remove, Ok(n) => { flow.xfer -= n; if flow.xfer == 0 { @@ -330,15 +449,22 @@ fn handle_client_event( flow.state = 
ClientState::SendRequest; metrics.responses_received.add(1); metrics.bytes_read.add(response_buf.len() as u64); - Action::Reregister(Interest::WRITABLE) + match mode { + Mode::Rr => { + flow.xfer = request_buf.len(); + flow.state = ClientState::SendRequest; + ClientAction::Reregister(Interest::WRITABLE) + } + Mode::Crr => ClientAction::Reconnect, + } } else { - Action::Continue + ClientAction::Continue } } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { trace!("read error: {e}"); - Action::Remove + ClientAction::Remove } } } @@ -392,7 +518,12 @@ pub(crate) async fn run_server( // propagates as an error directly from this task. let binding_addr = params.data_addr; let thread0_listener = if num_threads > 1 { - Some(create_listener(0, num_threads, binding_addr, params.backlog)) + Some(create_listener( + 0, + num_threads, + binding_addr, + params.backlog, + )) } else { None }; @@ -676,7 +807,10 @@ fn handle_server_event( ServerState::RecvRequest => { let offset = request_buf.len() - flow.xfer; match flow.stream.read(&mut request_buf[offset..]) { - Ok(0) => Action::Remove, + Ok(0) => { + metrics.connections_closed.add(1); + Action::Remove + } Ok(n) => { flow.xfer -= n; if flow.xfer == 0 { @@ -692,6 +826,7 @@ fn handle_server_event( Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { trace!("read error: {e}"); + metrics.connections_closed.add(1); Action::Remove } } @@ -714,6 +849,7 @@ fn handle_server_event( Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { trace!("write error: {e}"); + metrics.connections_closed.add(1); Action::Remove } } From a954228de58e77ce92ed6d55dfdba6444e08f2f3 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 13:34:06 +0200 Subject: [PATCH 02/13] fix flow management so that FlowMap is a ring buffer and reuses slots --- lading/src/blackhole/tcp_crr.rs | 3 +- 
lading/src/blackhole/tcp_rr.rs | 3 +- lading/src/neper/flow.rs | 35 ++++++++++++++++------ lading/src/neper/rr.rs | 53 ++++++++++++++++++++++----------- 4 files changed, 66 insertions(+), 28 deletions(-) diff --git a/lading/src/blackhole/tcp_crr.rs b/lading/src/blackhole/tcp_crr.rs index 3cca19458..ba8f90ddc 100644 --- a/lading/src/blackhole/tcp_crr.rs +++ b/lading/src/blackhole/tcp_crr.rs @@ -24,7 +24,7 @@ use std::num::{NonZeroU16, NonZeroUsize}; use serde::{Deserialize, Serialize}; use super::General; -use crate::neper::rr::{self, ServerParams}; +use crate::neper::rr::{self, Mode, ServerParams}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -136,6 +136,7 @@ impl TcpCrr { response_size: self.config.response_size.get(), no_delay: self.config.no_delay, backlog: self.config.backlog, + mode: Mode::Crr, }; rr::run_server(params, self.metric_labels, self.shutdown, "tcp_crr").await?; Ok(()) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index f6aefb1f0..d2c919da2 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -22,7 +22,7 @@ use std::num::{NonZeroU16, NonZeroUsize}; use serde::{Deserialize, Serialize}; use super::General; -use crate::neper::rr::{self, ServerParams}; +use crate::neper::rr::{self, Mode, ServerParams}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -133,6 +133,7 @@ impl TcpRr { response_size: self.config.response_size.get(), no_delay: self.config.no_delay, backlog: self.config.backlog, + mode: Mode::Rr, }; rr::run_server(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index e4b95dc46..3c349ab6b 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -37,28 +37,45 @@ pub(crate) struct FlowMap { inner: Vec>>, } +/// Errors produced by `FlowMap`. 
+#[derive(thiserror::Error, Debug)] +pub(crate) enum FlowMapError { + /// No capacity + #[error("Server flow map is at capacity: {0}")] + NoCapacity(usize), +} + impl FlowMap { - pub(crate) fn new() -> Self { - Self { inner: Vec::new() } + pub(crate) fn new(flows: usize) -> Self { + Self { + inner: Vec::with_capacity(flows * 2), + } } - /// Insert a flow. Grows the backing vec if needed. - pub(crate) fn insert(&mut self, flow: Flow) { - let idx = flow.token.0; - if idx >= self.inner.len() { + /// Insert a flow. + pub(crate) fn insert(&mut self, flow: Flow) -> Result<(), FlowMapError> { + let idx = flow.token.0 % self.inner.capacity(); + if self.inner.len() < idx { self.inner.resize_with(idx + 1, || None); } - self.inner[idx] = Some(flow); + if self.inner[idx].is_none() { + self.inner[idx] = Some(flow); + return Ok(()); + } + + Err(FlowMapError::NoCapacity(self.inner.capacity())) } /// Get a mutable reference to the flow at the given token. pub(crate) fn get_mut(&mut self, token: Token) -> Option<&mut Flow> { - self.inner.get_mut(token.0).and_then(|slot| slot.as_mut()) + let idx = token.0 % self.inner.capacity(); + self.inner.get_mut(idx).and_then(|slot| slot.as_mut()) } /// Remove and return the flow at the given token. pub(crate) fn remove(&mut self, token: Token) -> Option> { - self.inner.get_mut(token.0).and_then(Option::take) + let idx = token.0 % self.inner.capacity(); + self.inner.get_mut(idx).and_then(Option::take) } } diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 6c124244b..afb2995ac 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -95,6 +95,8 @@ pub(crate) struct ServerParams { pub(crate) no_delay: bool, /// Listener backlog. pub(crate) backlog: i32, + /// RR or CRR. 
+ pub(crate) mode: Mode, } enum ClientState { @@ -121,6 +123,7 @@ enum ClientAction { enum ServerState { RecvRequest, SendResponse, + CloseStream, } const LISTENER_TOKEN: Token = Token(0); @@ -275,7 +278,7 @@ fn client_thread_main( let mut events = Events::with_capacity(num_flows as usize); let request_buf = vec![0u8; request_size]; let mut response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); + let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 0; for _ in 0..num_flows { @@ -291,12 +294,14 @@ fn client_thread_main( poll.registry() .register(&mut stream, token, Interest::WRITABLE) .expect("failed to register flow"); - flows.insert(Flow { - stream, - token, - state: ClientState::SendRequest, - xfer: request_size, - }); + flows + .insert(Flow { + stream, + token, + state: ClientState::SendRequest, + xfer: request_size, + }) + .expect("client should never be able to exceed FlowMap capacity"); metrics.connections_initiated.add(1); } Err(e) => { @@ -564,6 +569,7 @@ pub(crate) async fn run_server( &flag, &tm[i as usize], tx, + params.mode, ); }); handles.push(handle); @@ -713,6 +719,7 @@ fn server_thread_main( shutdown_flag: &AtomicBool, metrics: &ThreadMetrics, ready_tx: mpsc::UnboundedSender<()>, + mode: Mode, ) { // Thread 0 uses the pre-built listener (with BPF already attached); others // bind their own sockets that join the existing reuseport group. 
@@ -736,7 +743,7 @@ fn server_thread_main( let mut request_buf = vec![0u8; request_size]; let response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); + let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 1; loop { @@ -754,16 +761,22 @@ fn server_thread_main( set_nodelay_mio(&stream, no_delay); let token = Token(next_token); next_token += 1; - let mut mio_stream = stream; - poll.registry() - .register(&mut mio_stream, token, Interest::READABLE) - .expect("failed to register flow"); - flows.insert(Flow { - stream: mio_stream, + // Insert first: `insert` takes the flow by value and + // drops it (closing the fd) on failure, so there is + // nothing registered to clean up on the error path. + if let Err(err) = flows.insert(Flow { + stream, token, state: ServerState::RecvRequest, xfer: request_size, - }); + }) { + warn!("failed to insert flow in server FlowMap: {err}"); + break; + } + let flow = flows.get_mut(token).expect("flow was just inserted"); + poll.registry() + .register(&mut flow.stream, token, Interest::READABLE) + .expect("failed to register flow"); metrics.connections_accepted.add(1); } Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, @@ -782,7 +795,8 @@ fn server_thread_main( let Some(fl) = flows.get_mut(token) else { continue; }; - let action = handle_server_event(fl, &mut request_buf, &response_buf, metrics); + let action = + handle_server_event(fl, &mut request_buf, &response_buf, metrics, mode); flow::apply_action(action, token, &mut flows, poll.registry()); } } @@ -802,6 +816,7 @@ fn handle_server_event( request_buf: &mut [u8], response_buf: &[u8], metrics: &ThreadMetrics, + mode: Mode, ) -> Action { match flow.state { ServerState::RecvRequest => { @@ -838,7 +853,10 @@ fn handle_server_event( flow.xfer -= n; if flow.xfer == 0 { flow.xfer = request_buf.len(); - flow.state = ServerState::RecvRequest; + match mode { + Mode::Rr => flow.state = ServerState::RecvRequest, + Mode::Crr => 
flow.state = ServerState::CloseStream, + } metrics.responses_sent.add(1); metrics.bytes_written.add(response_buf.len() as u64); Action::Reregister(Interest::READABLE) @@ -854,5 +872,6 @@ fn handle_server_event( } } } + ServerState::CloseStream => Action::Remove, } } From 4ca89fa859f75ab296beeffb54e156ac3e2733ee Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 13:47:51 +0200 Subject: [PATCH 03/13] fix off-by-one --- lading/src/neper/flow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index 3c349ab6b..06a2dc972 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -55,7 +55,7 @@ impl FlowMap { /// Insert a flow. pub(crate) fn insert(&mut self, flow: Flow) -> Result<(), FlowMapError> { let idx = flow.token.0 % self.inner.capacity(); - if self.inner.len() < idx { + if self.inner.len() <= idx { self.inner.resize_with(idx + 1, || None); } if self.inner[idx].is_none() { From ac7aa5c2842bbfb8cfe27bae1717529e1662bd8a Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 14:17:49 +0200 Subject: [PATCH 04/13] use warn to report errors --- lading/src/neper/rr.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index afb2995ac..3772ee5a6 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -16,7 +16,7 @@ use std::time::{Duration, Instant}; use mio::net::{TcpListener, TcpStream}; use mio::{Events, Interest, Poll, Token}; use tokio::sync::mpsc; -use tracing::{info, trace, warn}; +use tracing::{info, warn}; use crate::neper::bpf; use crate::neper::flow::{self, Action, Flow, FlowMap}; @@ -305,7 +305,7 @@ fn client_thread_main( metrics.connections_initiated.add(1); } Err(e) => { - trace!("connection to {addr} failed: {e}"); + warn!("connection to {addr} failed: {e}"); metrics.connections_failed.add(1); } } @@ -357,13 +357,13 @@ fn apply_client_action( { let 
sock = socket2::SockRef::from(&new_stream); if let Err(e) = sock.set_tcp_nodelay(no_delay) { - trace!("failed to set TCP_NODELAY on reconnect: {e}"); + warn!("failed to set TCP_NODELAY on reconnect: {e}"); } } if let Err(e) = registry.register(&mut new_stream, flow.token, Interest::WRITABLE) { - trace!("reconnect register failed: {e}"); + warn!("reconnect register failed: {e}"); metrics.connections_failed.add(1); let _ = flows.remove(token); } else { @@ -373,7 +373,7 @@ fn apply_client_action( } } Err(e) => { - trace!("reconnect to {addr} failed: {e}"); + warn!("reconnect to {addr} failed: {e}"); metrics.connections_failed.add(1); let _ = flows.remove(token); } @@ -407,12 +407,12 @@ fn handle_client_event( // fall through } Ok(Some(e)) => { - trace!("connect failed: {e}"); + warn!("connect failed: {e}"); metrics.connections_failed.add(1); return ClientAction::Reconnect; } Err(e) => { - trace!("take_error failed: {e}"); + warn!("take_error failed: {e}"); metrics.connections_failed.add(1); return ClientAction::Reconnect; } @@ -438,7 +438,7 @@ fn handle_client_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { - trace!("write error: {e}"); + warn!("write error: {e}"); ClientAction::Remove } } @@ -468,7 +468,7 @@ fn handle_client_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { - trace!("read error: {e}"); + warn!("read error: {e}"); ClientAction::Remove } } @@ -807,7 +807,7 @@ fn server_thread_main( fn set_nodelay_mio(stream: &TcpStream, no_delay: bool) { let sock = socket2::SockRef::from(stream); if let Err(e) = sock.set_tcp_nodelay(no_delay) { - trace!("failed to set TCP_NODELAY: {e}"); + warn!("failed to set TCP_NODELAY: {e}"); } } @@ -840,7 +840,7 @@ fn handle_server_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { - trace!("read error: {e}"); + warn!("read error: {e}"); metrics.connections_closed.add(1); Action::Remove } @@ -866,7 +866,7 @@ fn 
handle_server_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { - trace!("write error: {e}"); + warn!("write error: {e}"); metrics.connections_closed.add(1); Action::Remove } From cacb587ca182ba6a3774e247bb28c19032d56432 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 17:33:07 +0200 Subject: [PATCH 05/13] increment connections closed when connection closes cleanly --- lading/src/neper/rr.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 3772ee5a6..bfb2faf61 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -380,6 +380,7 @@ fn apply_client_action( } } ClientAction::Remove => { + metrics.connections_closed.add(1); if let Some(mut flow) = flows.remove(token) { let _ = registry.deregister(&mut flow.stream); } From e8839970164ec00849097dd4d4ae088458da3787 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 18:05:06 +0200 Subject: [PATCH 06/13] guard against token mismatch in FlowMap --- lading/src/neper/flow.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index 06a2dc972..4036d1323 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -67,15 +67,31 @@ impl FlowMap { } /// Get a mutable reference to the flow at the given token. + /// + /// Returns `None` for an empty slot. A slot occupied by a flow with a + /// different token indicates a token-collision bug (tokens congruent + /// modulo capacity share a slot), so it is asserted against. 
pub(crate) fn get_mut(&mut self, token: Token) -> Option<&mut Flow> { let idx = token.0 % self.inner.capacity(); - self.inner.get_mut(idx).and_then(|slot| slot.as_mut()) + let flow = self.inner.get_mut(idx).and_then(|slot| slot.as_mut())?; + assert_eq!( + flow.token, token, + "FlowMap slot {idx} holds a flow with a mismatched token" + ); + Some(flow) } /// Remove and return the flow at the given token. + /// + /// Returns `None` for an empty slot; asserts the occupant's token matches. pub(crate) fn remove(&mut self, token: Token) -> Option> { let idx = token.0 % self.inner.capacity(); - self.inner.get_mut(idx).and_then(Option::take) + let flow = self.inner.get_mut(idx).and_then(Option::take)?; + assert_eq!( + flow.token, token, + "FlowMap slot {idx} holds a flow with a mismatched token" + ); + Some(flow) } } From 69e43c064f4de06b65aaab6d0400fa05f6823747 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 18:20:09 +0200 Subject: [PATCH 07/13] for tcp_crr, when a very large number of flows are created connect calls may fail with EADDRNOTAVAIL due to port exhaustion. We reduce chances of this by increasing the local port range for sockets created by the generator. --- lading/src/neper/rr.rs | 103 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 3 deletions(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index bfb2faf61..3f744ba08 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -179,7 +179,7 @@ pub(crate) async fn run_client( ), ))); } - match net::TcpStream::connect(params.control_addr) { + match generator_connect_blocking(params.control_addr) { Ok(mut conn) => { conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT)) .expect("set_read_timeout on connected TcpStream must succeed"); @@ -263,6 +263,103 @@ pub(crate) async fn run_client( Ok(()) } +/// `IP_LOCAL_PORT_RANGE` socket option (Linux >= 6.3). Not yet exposed by the +/// `libc` crate, so it is defined here from ``. 
+const IP_LOCAL_PORT_RANGE: libc::c_int = 51; +/// Lowest ephemeral local port the generator may use for its sockets. +const LOCAL_PORT_LOW: u16 = 1024; +/// Highest ephemeral local port the generator may use for its sockets. +const LOCAL_PORT_HIGH: u16 = 60999; + +/// Set once the kernel is found not to support `IP_LOCAL_PORT_RANGE`, so the +/// "unsupported, falling back" warning is logged a single time rather than on +/// every socket the generator opens. +static PORT_RANGE_UNSUPPORTED: AtomicBool = AtomicBool::new(false); + +/// Increase `socket`'s automatic source-port selection to +/// `[LOCAL_PORT_LOW, LOCAL_PORT_HIGH]` via `IP_LOCAL_PORT_RANGE`. The option +/// value packs the high port in the upper 16 bits and the low port in the +/// lower 16 bits. +/// +/// This is done to reduce `EADDRNOTAVAIL` errors when a large number of flows are +/// created especially for tcp_crr workload. +/// Since port ranges are specific to network namespaces, this should not cause issues +/// for other daemons coming online on lower port ranges when lading is launched in its own +/// namespace. +fn set_local_port_range(socket: &socket2::Socket) -> io::Result<()> { + let value: u32 = (u32::from(LOCAL_PORT_HIGH) << 16) | u32::from(LOCAL_PORT_LOW); + // SAFETY: `socket` owns a valid fd for the duration of the borrow, and we + // pass a pointer to a correctly sized `u32` as the option value, exactly as + // `IP_LOCAL_PORT_RANGE` expects. + let ret = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_IP, + IP_LOCAL_PORT_RANGE, + std::ptr::addr_of!(value).cast::(), + std::mem::size_of::() + .try_into() + .expect("u32 size fits in socklen_t"), + ) + }; + if ret != 0 { + let err = io::Error::last_os_error(); + // ENOPROTOOPT / EOPNOTSUPP means the running kernel predates + // IP_LOCAL_PORT_RANGE (< 6.3). Degrade gracefully: fall back to the + // system-wide ephemeral range rather than failing the connection. 
+ if matches!( + err.raw_os_error(), + Some(libc::ENOPROTOOPT | libc::EOPNOTSUPP) + ) { + if !PORT_RANGE_UNSUPPORTED.swap(true, Relaxed) { + warn!( + "IP_LOCAL_PORT_RANGE not supported by this kernel; \ + falling back to the system ephemeral port range" + ); + } + return Ok(()); + } + return Err(err); + } + Ok(()) +} + +/// Create a TCP socket for the generator with its local port range constrained +/// to `[LOCAL_PORT_LOW, LOCAL_PORT_HIGH]`. +fn new_generator_socket(addr: SocketAddr) -> io::Result { + let socket = socket2::Socket::new( + socket2::Domain::for_address(addr), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + set_local_port_range(&socket)?; + Ok(socket) +} + +/// Blocking connect to `addr` using a port-range-constrained generator socket. +fn generator_connect_blocking(addr: SocketAddr) -> io::Result { + let socket = new_generator_socket(addr)?; + socket.connect(&addr.into())?; + Ok(net::TcpStream::from(socket)) +} + +/// Non-blocking connect to `addr` using a port-range-constrained generator +/// socket, returning a mio stream whose connect is in progress (completion is +/// signalled by a `WRITABLE` readiness event). +fn generator_connect_nonblocking(addr: SocketAddr) -> io::Result { + let socket = new_generator_socket(addr)?; + socket.set_nonblocking(true)?; + // A non-blocking connect reports in-progress as EINPROGRESS / WouldBlock; + // that is expected and not an error. 
+ match socket.connect(&addr.into()) { + Ok(()) => {} + Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + } + Ok(TcpStream::from_std(net::TcpStream::from(socket))) +} + #[allow(clippy::too_many_arguments)] fn client_thread_main( addr: SocketAddr, @@ -282,7 +379,7 @@ fn client_thread_main( let mut next_token: usize = 0; for _ in 0..num_flows { - match net::TcpStream::connect(addr) { + match generator_connect_blocking(addr) { Ok(std_stream) => { let _ = std_stream.set_nodelay(no_delay); std_stream @@ -352,7 +449,7 @@ fn apply_client_action( return; }; let _ = registry.deregister(&mut flow.stream); - match TcpStream::connect(addr) { + match generator_connect_nonblocking(addr) { Ok(mut new_stream) => { { let sock = socket2::SockRef::from(&new_stream); From 301c265c8022b47dc7dcd2c15e51dcfad4d9bbe4 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 18:20:55 +0200 Subject: [PATCH 08/13] remove comment about increasing local port range with sysctl --- lading/src/generator/tcp_crr.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lading/src/generator/tcp_crr.rs b/lading/src/generator/tcp_crr.rs index 5ee5c4d90..ceb5e2888 100644 --- a/lading/src/generator/tcp_crr.rs +++ b/lading/src/generator/tcp_crr.rs @@ -9,14 +9,6 @@ //! The event-loop machinery lives in [`crate::neper::rr`]; this module is a //! thin wrapper that supplies configuration and selects [`Mode::Crr`]. //! -//! ## Ephemeral port / `TIME_WAIT` -//! -//! Each transaction consumes one client-side ephemeral port for the duration -//! of `TIME_WAIT` (~60s by default on Linux). At even a few thousand -//! transactions/sec the ~28k usable port range exhausts in seconds. Widen -//! `net.ipv4.ip_local_port_range` and consider lowering `tcp_fin_timeout` -//! before running sustained CRR workloads. -//! //! ## Metrics //! //! 
`connections_initiated`: Successful client-side connect completions From 21fc4270047828788fc1948d03428c942ab494ff Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 18:27:42 +0200 Subject: [PATCH 09/13] address clippy issues --- lading/src/neper/rr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 3f744ba08..b1a0676e1 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -282,7 +282,7 @@ static PORT_RANGE_UNSUPPORTED: AtomicBool = AtomicBool::new(false); /// lower 16 bits. /// /// This is done to reduce `EADDRNOTAVAIL` errors when a large number of flows are -/// created especially for tcp_crr workload. +/// created especially for `tcp_crr` workload. /// Since port ranges are specific to network namespaces, this should not cause issues /// for other daemons coming online on lower port ranges when lading is launched in its own /// namespace. From 935256d6412c8653f7cf039cb59b2b1bfda35554 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 18:34:38 +0200 Subject: [PATCH 10/13] remove non ascii chars --- lading/src/blackhole/tcp_crr.rs | 2 +- lading/src/blackhole/tcp_rr.rs | 2 +- lading/src/neper/flow.rs | 2 +- lading/src/neper/rr.rs | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lading/src/blackhole/tcp_crr.rs b/lading/src/blackhole/tcp_crr.rs index ba8f90ddc..fb437df7c 100644 --- a/lading/src/blackhole/tcp_crr.rs +++ b/lading/src/blackhole/tcp_crr.rs @@ -1,4 +1,4 @@ -//! TCP connect/request/response (`tcp_crr`) blackhole — the server side. +//! TCP connect/request/response (`tcp_crr`) blackhole - the server side. //! Based on //! //! Listens for incoming connections and, for each flow, reads a fixed-size diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index d2c919da2..9da929ace 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -1,4 +1,4 @@ -//! 
TCP request/response (`tcp_rr`) blackhole — the server side. +//! TCP request/response (`tcp_rr`) blackhole - the server side. //! Based on //! //! Listens for incoming connections and, for each flow, reads a fixed-size diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index 4036d1323..ed8af24df 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -31,7 +31,7 @@ pub(crate) enum Action { /// Token-indexed flow storage. /// /// Flows are stored in a `Vec` indexed by token value. Removed slots become -/// `None` and are not reused — tokens are monotonically increasing, matching +/// `None` and are not reused - tokens are monotonically increasing, matching /// neper's behavior. pub(crate) struct FlowMap { inner: Vec>>, diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index b1a0676e1..2f03b071b 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -57,7 +57,7 @@ pub(crate) enum Mode { /// Parameters for [`run_client`]. /// -/// Flow count is *not* a client parameter — it is owned by the server and +/// Flow count is *not* a client parameter - it is owned by the server and /// communicated to the client over the control connection during startup. pub(crate) struct ClientParams { /// Address of the server's data port. @@ -114,7 +114,7 @@ enum ClientState { enum ClientAction { Continue, Reregister(Interest), - /// CRR: response complete — close this socket and open a new one with the + /// CRR: response complete - close this socket and open a new one with the /// same `Token`. Handled by [`apply_client_action`]. Reconnect, Remove, @@ -130,7 +130,7 @@ const LISTENER_TOKEN: Token = Token(0); /// Control-channel handshake: server writes `flows` to the accepted control /// connection as a 2-byte big-endian `u16` and closes; client reads the same -/// 2 bytes after connecting. Internal protocol — no magic / version byte. +/// 2 bytes after connecting. Internal protocol - no magic / version byte. 
const HANDSHAKE_LEN: usize = 2; const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); @@ -492,7 +492,7 @@ fn handle_client_event( response_buf: &mut [u8], metrics: &ThreadMetrics, ) -> ClientAction { - // Connecting → SendRequest transition: mio is edge-triggered, so the + // Connecting -> SendRequest transition: mio is edge-triggered, so the // single WRITABLE event that signaled connect completion is also the // event that must drive the first write. Transition state and fall // through to SendRequest in the same call. @@ -617,7 +617,7 @@ pub(crate) async fn run_server( // Pre-build thread 0's listener here so the BPF program is attached to the // reuseport group before any other thread calls bind(). This removes the - // need for a cross-thread BPF barrier — if bind fails or panics, it + // need for a cross-thread BPF barrier - if bind fails or panics, it // propagates as an error directly from this task. let binding_addr = params.data_addr; let thread0_listener = if num_threads > 1 { From 1bc4a4a7a5ac70862854345eb729f747cd4b6301 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Wed, 3 Jun 2026 14:47:39 +0200 Subject: [PATCH 11/13] Use dedicated ports on the generator side for each flow. In order to avoid EADDRNOTAVAIL failures due to race conditions when creating new connections, we assign a dedicated port to each flow. This prevents port collisions when multiple threads are attempting to create a connection at the same time. To ensure that the port has been freed before it is reused in the ClientAction::Reconnect pathway, we forcefully trigger a RST packet to be sent so that the generator's kernel frees the port when the tcp connection is dropped. This way the upcoming new connection for the same flow will not fail due to EADDRINUSE. To accomplish this we use a trick where we set the SO_LINGER socket option to 0 to force an abortive close on close(fd). 
At the application protocol level, when the client initiates an abortive close, the server has already transitioned into the CloseStream state, where it will drop the connection from its end anyway. --- lading/src/neper/flow.rs | 1 + lading/src/neper/rr.rs | 128 ++++++++++++++++++++++++++++++++------- 2 files changed, 108 insertions(+), 21 deletions(-) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index ed8af24df..c7e289bfe 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -15,6 +15,7 @@ pub(crate) struct Flow { pub(crate) state: S, /// Remaining bytes for the current I/O operation. pub(crate) xfer: usize, + pub(crate) port: u16, } /// What the event loop should do after processing a flow event. diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 2f03b071b..b2cbc07fd 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -44,6 +44,9 @@ pub enum Error { /// Invalid configuration. #[error("invalid config: {0}")] Config(String), + // Too many flows + #[error("too many flows will cause port exhaustion: {0}")] + TooManyFlows(u16), } /// Which neper-style protocol the client is driving. @@ -146,6 +149,7 @@ const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); /// /// Returns an error if configuration is invalid, the blackhole control port /// is never reachable, or a worker thread panics. 
+#[allow(clippy::too_many_lines)] pub(crate) async fn run_client( params: ClientParams, metric_labels: Vec<(String, String)>, @@ -179,7 +183,7 @@ pub(crate) async fn run_client( ), ))); } - match generator_connect_blocking(params.control_addr) { + match generator_connect_blocking(params.control_addr, None) { Ok(mut conn) => { conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT)) .expect("set_read_timeout on connected TcpStream must succeed"); @@ -211,7 +215,17 @@ pub(crate) async fn run_client( ))); } + if flows > (LOCAL_PORT_HIGH - LOCAL_PORT_LOW) { + return Err(Error::TooManyFlows(flows)); + } + let flow_dist = thread::distribute_flows(flows, params.threads); + let mut port_dist = Vec::new(); + for i in 0..params.threads { + port_dist.insert(i as usize, flow_dist[..i as usize].iter().sum()); + } + + let port_dist = port_dist; let thread_metrics = Arc::new( (0..params.threads) @@ -236,12 +250,14 @@ pub(crate) async fn run_client( let mut worker_handles = Vec::with_capacity(params.threads as usize); for i in 0..params.threads { let thread_flows = flow_dist[i as usize]; + let ports_base = port_dist[i as usize]; let flag = Arc::clone(&shutdown_flag); let tm = Arc::clone(&thread_metrics); let handle = thread::spawn_named(&format!("{thread_prefix}-client-{i}"), move || { client_thread_main( data_addr, thread_flows, + ports_base, request_size, response_size, no_delay, @@ -337,8 +353,19 @@ fn new_generator_socket(addr: SocketAddr) -> io::Result { } /// Blocking connect to `addr` using a port-range-constrained generator socket. -fn generator_connect_blocking(addr: SocketAddr) -> io::Result { +/// +/// When `port` is `Some`, the socket is bound to that local source port (on the +/// wildcard address for `addr`'s family) before connecting; otherwise the +/// kernel picks a source port from the constrained range. 
+fn generator_connect_blocking(addr: SocketAddr, port: Option) -> io::Result { let socket = new_generator_socket(addr)?; + if let Some(port) = port { + let bind_addr = match addr { + SocketAddr::V4(_) => SocketAddr::from((net::Ipv4Addr::UNSPECIFIED, port)), + SocketAddr::V6(_) => SocketAddr::from((net::Ipv6Addr::UNSPECIFIED, port)), + }; + socket.bind(&bind_addr.into())?; + } socket.connect(&addr.into())?; Ok(net::TcpStream::from(socket)) } @@ -346,8 +373,19 @@ fn generator_connect_blocking(addr: SocketAddr) -> io::Result { /// Non-blocking connect to `addr` using a port-range-constrained generator /// socket, returning a mio stream whose connect is in progress (completion is /// signalled by a `WRITABLE` readiness event). -fn generator_connect_nonblocking(addr: SocketAddr) -> io::Result { +/// +/// When `port` is `Some`, the socket is bound to that local source port (on the +/// wildcard address for `addr`'s family) before connecting; otherwise the +/// kernel picks a source port from the constrained range. +fn generator_connect_nonblocking(addr: SocketAddr, port: Option) -> io::Result { let socket = new_generator_socket(addr)?; + if let Some(port) = port { + let bind_addr = match addr { + SocketAddr::V4(_) => SocketAddr::from((net::Ipv4Addr::UNSPECIFIED, port)), + SocketAddr::V6(_) => SocketAddr::from((net::Ipv6Addr::UNSPECIFIED, port)), + }; + socket.bind(&bind_addr.into())?; + } socket.set_nonblocking(true)?; // A non-blocking connect reports in-progress as EINPROGRESS / WouldBlock; // that is expected and not an error. 
@@ -364,6 +402,7 @@ fn generator_connect_nonblocking(addr: SocketAddr) -> io::Result { fn client_thread_main( addr: SocketAddr, num_flows: u16, + ports_base: u16, request_size: usize, response_size: usize, no_delay: bool, @@ -378,15 +417,28 @@ fn client_thread_main( let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 0; - for _ in 0..num_flows { - match generator_connect_blocking(addr) { + let mut f = 0; + let mut backwards = 0; + loop { + if f >= num_flows { + break; + } + + f += 1; + + let token = Token(next_token); + let port = if backwards > 0 { + LOCAL_PORT_HIGH - ports_base - f - (backwards - 1) + } else { + LOCAL_PORT_LOW + ports_base + f + }; + match generator_connect_blocking(addr, Some(port)) { Ok(std_stream) => { let _ = std_stream.set_nodelay(no_delay); std_stream .set_nonblocking(true) .expect("failed to set nonblocking"); let mut stream = TcpStream::from_std(std_stream); - let token = Token(next_token); next_token += 1; poll.registry() .register(&mut stream, token, Interest::WRITABLE) @@ -395,15 +447,22 @@ fn client_thread_main( .insert(Flow { stream, token, + port, state: ClientState::SendRequest, xfer: request_size, }) .expect("client should never be able to exceed FlowMap capacity"); metrics.connections_initiated.add(1); + backwards = 0; } Err(e) => { - warn!("connection to {addr} failed: {e}"); - metrics.connections_failed.add(1); + if e.kind() == ErrorKind::AddrInUse && backwards < num_flows { + f -= 1; + backwards += 1; + } else { + warn!("connection to {addr} failed from {port}: {e}"); + metrics.connections_failed.add(1); + } } } } @@ -445,11 +504,29 @@ fn apply_client_action( } } ClientAction::Reconnect => { - let Some(flow) = flows.get_mut(token) else { + // Take the flow out of the map so the old socket can be fully + // closed before the new connection binds to the same source port. 
+ // The old socket holds `port` until its fd is dropped, so a bind + // to `port` while it is still open would fail with EADDRINUSE. + let Some(flow) = flows.remove(token) else { return; }; - let _ = registry.deregister(&mut flow.stream); - match generator_connect_nonblocking(addr) { + let Flow { + mut stream, port, .. + } = flow; + let _ = registry.deregister(&mut stream); + { + // Abortive close: SO_LINGER with a zero timeout makes the drop + // below emit a RST instead of a FIN, so the socket skips + // TIME_WAIT and `port` is released immediately for the bind. + let sock = socket2::SockRef::from(&stream); + if let Err(e) = sock.set_linger(Some(Duration::from_secs(0))) { + warn!("failed to set SO_LINGER for abortive close on reconnect: {e}"); + } + } + // Drop the old stream now (sends RST, frees `port`) before binding. + drop(stream); + match generator_connect_nonblocking(addr, Some(port)) { Ok(mut new_stream) => { { let sock = socket2::SockRef::from(&new_stream); @@ -457,22 +534,23 @@ fn apply_client_action( warn!("failed to set TCP_NODELAY on reconnect: {e}"); } } - if let Err(e) = - registry.register(&mut new_stream, flow.token, Interest::WRITABLE) - { + if let Err(e) = registry.register(&mut new_stream, token, Interest::WRITABLE) { warn!("reconnect register failed: {e}"); metrics.connections_failed.add(1); - let _ = flows.remove(token); - } else { - flow.stream = new_stream; - flow.state = ClientState::Connecting; - flow.xfer = 0; + } else if let Err(err) = flows.insert(Flow { + stream: new_stream, + token, + port, + state: ClientState::Connecting, + xfer: 0, + }) { + warn!("failed to reinsert reconnected flow: {err}"); + metrics.connections_failed.add(1); } } Err(e) => { warn!("reconnect to {addr} failed: {e}"); metrics.connections_failed.add(1); - let _ = flows.remove(token); } } } @@ -480,6 +558,13 @@ fn apply_client_action( metrics.connections_closed.add(1); if let Some(mut flow) = flows.remove(token) { let _ = registry.deregister(&mut flow.stream); + 
// Abortive close (RST) so the source port skips TIME_WAIT and + // is reclaimed immediately rather than lingering 2*MSL. + let sock = socket2::SockRef::from(&flow.stream); + if let Err(e) = sock.set_linger(Some(Duration::from_secs(0))) { + warn!("failed to set SO_LINGER for abortive close: {e}"); + } + // `flow` (and its stream fd) dropped here -> RST sent. } } } @@ -867,8 +952,9 @@ fn server_thread_main( token, state: ServerState::RecvRequest, xfer: request_size, + port: 0, }) { - warn!("failed to insert flow in server FlowMap: {err}"); + warn!("failed to insert flow in server FlowMap: {err} {0}", token.0); break; } let flow = flows.get_mut(token).expect("flow was just inserted"); From 5d0b2fdfa9a055e0aa68b6b1ad4bf5d94ad811e5 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Wed, 3 Jun 2026 14:48:06 +0200 Subject: [PATCH 12/13] format --- lading/src/neper/rr.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index b2cbc07fd..091bb2bd0 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -221,7 +221,7 @@ pub(crate) async fn run_client( let flow_dist = thread::distribute_flows(flows, params.threads); let mut port_dist = Vec::new(); - for i in 0..params.threads { + for i in 0..params.threads { port_dist.insert(i as usize, flow_dist[..i as usize].iter().sum()); } @@ -954,7 +954,10 @@ fn server_thread_main( xfer: request_size, port: 0, }) { - warn!("failed to insert flow in server FlowMap: {err} {0}", token.0); + warn!( + "failed to insert flow in server FlowMap: {err} {0}", + token.0 + ); break; } let flow = flows.get_mut(token).expect("flow was just inserted"); From d4e7bf424ea2a0e3c4f8c4e21f1b125fcffa2d27 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Thu, 4 Jun 2026 13:35:53 +0200 Subject: [PATCH 13/13] remove manual port management for flows pinning ports to flows was changing the behavior on the bpf side since the 5-tuple was becoming less variable. 
This meant that the effect of flow tracking in bpf was different due to less overhead from managing more new flows. --- lading/src/neper/flow.rs | 1 - lading/src/neper/rr.rs | 100 ++++++++------------------------------- 2 files changed, 21 insertions(+), 80 deletions(-) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index c7e289bfe..ed8af24df 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -15,7 +15,6 @@ pub(crate) struct Flow { pub(crate) state: S, /// Remaining bytes for the current I/O operation. pub(crate) xfer: usize, - pub(crate) port: u16, } /// What the event loop should do after processing a flow event. diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 091bb2bd0..99c328042 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -44,9 +44,6 @@ pub enum Error { /// Invalid configuration. #[error("invalid config: {0}")] Config(String), - // Too many flows - #[error("too many flows will cause port exhaustion: {0}")] - TooManyFlows(u16), } /// Which neper-style protocol the client is driving. 
@@ -183,7 +180,7 @@ pub(crate) async fn run_client( ), ))); } - match generator_connect_blocking(params.control_addr, None) { + match generator_connect_blocking(params.control_addr) { Ok(mut conn) => { conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT)) .expect("set_read_timeout on connected TcpStream must succeed"); @@ -215,17 +212,7 @@ pub(crate) async fn run_client( ))); } - if flows > (LOCAL_PORT_HIGH - LOCAL_PORT_LOW) { - return Err(Error::TooManyFlows(flows)); - } - let flow_dist = thread::distribute_flows(flows, params.threads); - let mut port_dist = Vec::new(); - for i in 0..params.threads { - port_dist.insert(i as usize, flow_dist[..i as usize].iter().sum()); - } - - let port_dist = port_dist; let thread_metrics = Arc::new( (0..params.threads) @@ -250,14 +237,12 @@ pub(crate) async fn run_client( let mut worker_handles = Vec::with_capacity(params.threads as usize); for i in 0..params.threads { let thread_flows = flow_dist[i as usize]; - let ports_base = port_dist[i as usize]; let flag = Arc::clone(&shutdown_flag); let tm = Arc::clone(&thread_metrics); let handle = thread::spawn_named(&format!("{thread_prefix}-client-{i}"), move || { client_thread_main( data_addr, thread_flows, - ports_base, request_size, response_size, no_delay, @@ -354,18 +339,10 @@ fn new_generator_socket(addr: SocketAddr) -> io::Result { /// Blocking connect to `addr` using a port-range-constrained generator socket. /// -/// When `port` is `Some`, the socket is bound to that local source port (on the -/// wildcard address for `addr`'s family) before connecting; otherwise the -/// kernel picks a source port from the constrained range. -fn generator_connect_blocking(addr: SocketAddr, port: Option) -> io::Result { +/// The kernel picks the source port from the constrained ephemeral range (see +/// [`set_local_port_range`]); the generator does not manage source ports itself. 
+fn generator_connect_blocking(addr: SocketAddr) -> io::Result { let socket = new_generator_socket(addr)?; - if let Some(port) = port { - let bind_addr = match addr { - SocketAddr::V4(_) => SocketAddr::from((net::Ipv4Addr::UNSPECIFIED, port)), - SocketAddr::V6(_) => SocketAddr::from((net::Ipv6Addr::UNSPECIFIED, port)), - }; - socket.bind(&bind_addr.into())?; - } socket.connect(&addr.into())?; Ok(net::TcpStream::from(socket)) } @@ -374,18 +351,10 @@ fn generator_connect_blocking(addr: SocketAddr, port: Option) -> io::Result /// socket, returning a mio stream whose connect is in progress (completion is /// signalled by a `WRITABLE` readiness event). /// -/// When `port` is `Some`, the socket is bound to that local source port (on the -/// wildcard address for `addr`'s family) before connecting; otherwise the -/// kernel picks a source port from the constrained range. -fn generator_connect_nonblocking(addr: SocketAddr, port: Option) -> io::Result { +/// The kernel picks the source port from the constrained ephemeral range (see +/// [`set_local_port_range`]); the generator does not manage source ports itself. +fn generator_connect_nonblocking(addr: SocketAddr) -> io::Result { let socket = new_generator_socket(addr)?; - if let Some(port) = port { - let bind_addr = match addr { - SocketAddr::V4(_) => SocketAddr::from((net::Ipv4Addr::UNSPECIFIED, port)), - SocketAddr::V6(_) => SocketAddr::from((net::Ipv6Addr::UNSPECIFIED, port)), - }; - socket.bind(&bind_addr.into())?; - } socket.set_nonblocking(true)?; // A non-blocking connect reports in-progress as EINPROGRESS / WouldBlock; // that is expected and not an error. 
@@ -402,7 +371,6 @@ fn generator_connect_nonblocking(addr: SocketAddr, port: Option) -> io::Res fn client_thread_main( addr: SocketAddr, num_flows: u16, - ports_base: u16, request_size: usize, response_size: usize, no_delay: bool, @@ -417,22 +385,9 @@ fn client_thread_main( let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 0; - let mut f = 0; - let mut backwards = 0; - loop { - if f >= num_flows { - break; - } - - f += 1; - + for _ in 0..num_flows { let token = Token(next_token); - let port = if backwards > 0 { - LOCAL_PORT_HIGH - ports_base - f - (backwards - 1) - } else { - LOCAL_PORT_LOW + ports_base + f - }; - match generator_connect_blocking(addr, Some(port)) { + match generator_connect_blocking(addr) { Ok(std_stream) => { let _ = std_stream.set_nodelay(no_delay); std_stream @@ -447,22 +402,15 @@ fn client_thread_main( .insert(Flow { stream, token, - port, state: ClientState::SendRequest, xfer: request_size, }) .expect("client should never be able to exceed FlowMap capacity"); metrics.connections_initiated.add(1); - backwards = 0; } Err(e) => { - if e.kind() == ErrorKind::AddrInUse && backwards < num_flows { - f -= 1; - backwards += 1; - } else { - warn!("connection to {addr} failed from {port}: {e}"); - metrics.connections_failed.add(1); - } + warn!("connection to {addr} failed: {e}"); + metrics.connections_failed.add(1); } } } @@ -504,29 +452,25 @@ fn apply_client_action( } } ClientAction::Reconnect => { - // Take the flow out of the map so the old socket can be fully - // closed before the new connection binds to the same source port. - // The old socket holds `port` until its fd is dropped, so a bind - // to `port` while it is still open would fail with EADDRINUSE. - let Some(flow) = flows.remove(token) else { + // Take the flow out of the map so the old socket is fully closed + // before the new connection is opened. + let Some(mut flow) = flows.remove(token) else { return; }; - let Flow { - mut stream, port, .. 
- } = flow; - let _ = registry.deregister(&mut stream); + let _ = registry.deregister(&mut flow.stream); { // Abortive close: SO_LINGER with a zero timeout makes the drop // below emit a RST instead of a FIN, so the socket skips - // TIME_WAIT and `port` is released immediately for the bind. - let sock = socket2::SockRef::from(&stream); + // TIME_WAIT and its source port returns to the constrained + // ephemeral range immediately rather than lingering 2*MSL. + let sock = socket2::SockRef::from(&flow.stream); if let Err(e) = sock.set_linger(Some(Duration::from_secs(0))) { warn!("failed to set SO_LINGER for abortive close on reconnect: {e}"); } } - // Drop the old stream now (sends RST, frees `port`) before binding. - drop(stream); - match generator_connect_nonblocking(addr, Some(port)) { + // Drop the old stream now (sends RST) before the new connect. + drop(flow); + match generator_connect_nonblocking(addr) { Ok(mut new_stream) => { { let sock = socket2::SockRef::from(&new_stream); @@ -540,7 +484,6 @@ fn apply_client_action( } else if let Err(err) = flows.insert(Flow { stream: new_stream, token, - port, state: ClientState::Connecting, xfer: 0, }) { @@ -952,7 +895,6 @@ fn server_thread_main( token, state: ServerState::RecvRequest, xfer: request_size, - port: 0, }) { warn!( "failed to insert flow in server FlowMap: {err} {0}",