diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 0e90d2a9b..5596b6915 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -486,6 +486,12 @@ fn check_generator(config: &generator::Config, args: &Args) -> Result { + if args.fingerprint { + return Ok(None); + } + unimplemented!("TcpCrr not supported") + } generator::Inner::TcpRr(_) => { if args.fingerprint { return Ok(None); diff --git a/lading/src/blackhole.rs b/lading/src/blackhole.rs index e23db4c69..1d130598f 100644 --- a/lading/src/blackhole.rs +++ b/lading/src/blackhole.rs @@ -15,6 +15,7 @@ pub mod otlp; pub mod splunk_hec; pub mod sqs; pub mod tcp; +pub mod tcp_crr; pub mod tcp_rr; pub mod udp; pub mod unix_datagram; @@ -26,6 +27,9 @@ pub enum Error { /// See [`crate::blackhole::tcp::Error`] for details. #[error(transparent)] Tcp(tcp::Error), + /// See [`crate::blackhole::tcp_crr::Error`] for details. + #[error(transparent)] + TcpCrr(tcp_crr::Error), /// See [`crate::blackhole::tcp_rr::Error`] for details. #[error(transparent)] TcpRr(tcp_rr::Error), @@ -87,6 +91,8 @@ pub struct General { pub enum Inner { /// See [`crate::blackhole::tcp::Config`] for details. Tcp(tcp::Config), + /// See [`crate::blackhole::tcp_crr::Config`] for details. + TcpCrr(tcp_crr::Config), /// See [`crate::blackhole::tcp_rr::Config`] for details. TcpRr(tcp_rr::Config), /// See [`crate::blackhole::datadog::Config`] for details. @@ -117,6 +123,8 @@ pub enum Inner { pub enum Server { /// See [`crate::blackhole::tcp::Tcp`] for details. Tcp(tcp::Tcp), + /// See [`crate::blackhole::tcp_crr::TcpCrr`] for details. + TcpCrr(tcp_crr::TcpCrr), /// See [`crate::blackhole::tcp_rr::TcpRr`] for details. TcpRr(tcp_rr::TcpRr), /// See [`crate::blackhole::datadog::Datadog`] for details. @@ -152,6 +160,9 @@ impl Server { pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result { let server = match config.inner { Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)), + Inner::TcpCrr(conf) => { + Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown)) + } Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)), Inner::Datadog(conf) => { Self::Datadog(datadog::Datadog::new(config.general, conf, shutdown)) @@ -194,6 +205,7 @@ impl Server { pub async fn run(self) -> Result<(), Error> { match self { Server::Tcp(inner) => inner.run().await.map_err(Error::Tcp), + Server::TcpCrr(inner) => inner.run().await.map_err(Error::TcpCrr), Server::TcpRr(inner) => inner.run().await.map_err(Error::TcpRr), Server::Datadog(inner) => inner.run().await.map_err(Error::Datadog), Server::DatadogStatefulLogs(inner) => { diff --git a/lading/src/blackhole/tcp_crr.rs b/lading/src/blackhole/tcp_crr.rs new file mode 100644 index 000000000..fb437df7c --- /dev/null +++ b/lading/src/blackhole/tcp_crr.rs @@ -0,0 +1,144 @@ +//! TCP connect/request/response (`tcp_crr`) blackhole - the server side. +//! Based on +//! +//! Listens for incoming connections and, for each flow, reads a fixed-size +//! request then writes a fixed-size response. The CRR client closes the +//! connection after each response; the server side is identical to `tcp_rr` +//! and delegates to the same shared machinery. +//! +//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration. +//! +//! ## Metrics +//! +//! `connections_accepted`: Incoming connections accepted +//! `requests_received`: Completed request reads +//! `responses_sent`: Completed response writes +//! `bytes_received`: Request bytes read +//! `bytes_written`: Response bytes sent +//! `connections_closed`: Flow removals (client close + I/O errors) + +use std::net::{IpAddr, SocketAddr}; +use std::num::{NonZeroU16, NonZeroUsize}; + +use serde::{Deserialize, Serialize}; + +use super::General; +use crate::neper::rr::{self, Mode, ServerParams}; + +fn default_nonzero_u16() -> NonZeroU16 { + NonZeroU16::new(1).expect("1 is nonzero") +} + +fn default_nonzero_usize() -> NonZeroUsize { + NonZeroUsize::new(1).expect("1 is nonzero") +} + +fn default_control_port() -> u16 { + 12866 +} + +fn default_data_port() -> u16 { + 12867 +} + +fn default_backlog() -> i32 { + 1024 +} + +const fn default_true() -> bool { + true +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +/// Configuration for the `tcp_crr` blackhole. +pub struct Config { + /// IP address to bind on. + pub addr: IpAddr, + /// Data port for flow connections. Default 12867. + #[serde(default = "default_data_port")] + pub data_port: u16, + /// Control port for startup synchronization with the generator. Default 12866. + #[serde(default = "default_control_port")] + pub control_port: u16, + /// Number of OS server threads. Default 1. When > 1, uses `SO_REUSEPORT` + /// with an eBPF program for load balancing. + #[serde(default = "default_nonzero_u16")] + pub threads: NonZeroU16, + /// Total number of TCP flows the generator should open (neper `-F`). + /// Default 1. Sent to the generator over the control connection at + /// startup; the generator does not configure this independently. + #[serde(default = "default_nonzero_u16")] + pub flows: NonZeroU16, + /// Bytes to read per request. Default 1. + #[serde(default = "default_nonzero_usize")] + pub request_size: NonZeroUsize, + /// Bytes to send per response. Default 1. + #[serde(default = "default_nonzero_usize")] + pub response_size: NonZeroUsize, + /// Whether to set `TCP_NODELAY` on accepted connections. Default true. + #[serde(default = "default_true")] + pub no_delay: bool, + /// Listener backlog (pending-connection queue length) passed to `listen(2)`. + /// Default 1024. CRR workloads benefit from a larger backlog to absorb + /// connect bursts. + #[serde(default = "default_backlog")] + pub backlog: i32, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`TcpCrr`]. +pub enum Error { + /// Shared neper-style request/response error. + #[error(transparent)] + Rr(#[from] rr::Error), +} + +#[derive(Debug)] +/// The `tcp_crr` blackhole (server side). +pub struct TcpCrr { + config: Config, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, +} + +impl TcpCrr { + /// Create a new [`TcpCrr`] blackhole instance. + #[must_use] + pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self { + let mut metric_labels = vec![ + ("component".to_string(), "blackhole".to_string()), + ("component_name".to_string(), "tcp_crr".to_string()), + ]; + if let Some(id) = general.id { + metric_labels.push(("id".to_string(), id)); + } + Self { + config: *config, + metric_labels, + shutdown, + } + } + + /// Run the blackhole to completion or until a shutdown signal is received. + /// + /// # Errors + /// + /// Returns an error if binding fails or a worker thread panics. + pub async fn run(self) -> Result<(), Error> { + let params = ServerParams { + data_addr: SocketAddr::new(self.config.addr, self.config.data_port), + control_addr: SocketAddr::new(self.config.addr, self.config.control_port), + threads: self.config.threads.get(), + flows: self.config.flows.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, + backlog: self.config.backlog, + mode: Mode::Crr, + }; + rr::run_server(params, self.metric_labels, self.shutdown, "tcp_crr").await?; + Ok(()) + } +} diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index f6aefb1f0..9da929ace 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -1,4 +1,4 @@ -//! TCP request/response (`tcp_rr`) blackhole — the server side. +//! TCP request/response (`tcp_rr`) blackhole - the server side. //! Based on //! //! Listens for incoming connections and, for each flow, reads a fixed-size @@ -22,7 +22,7 @@ use std::num::{NonZeroU16, NonZeroUsize}; use serde::{Deserialize, Serialize}; use super::General; -use crate::neper::rr::{self, ServerParams}; +use crate::neper::rr::{self, Mode, ServerParams}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -133,6 +133,7 @@ impl TcpRr { response_size: self.config.response_size.get(), no_delay: self.config.no_delay, backlog: self.config.backlog, + mode: Mode::Rr, }; rr::run_server(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 0d2744680..f2ea9490f 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -27,6 +27,7 @@ pub mod process_tree; pub mod procfs; pub mod splunk_hec; pub mod tcp; +pub mod tcp_crr; pub mod tcp_rr; pub mod trace_agent; pub mod udp; @@ -39,6 +40,9 @@ pub enum Error { /// See [`crate::generator::tcp::Error`] for details. #[error(transparent)] Tcp(#[from] tcp::Error), + /// See [`crate::generator::tcp_crr::Error`] for details. + #[error(transparent)] + TcpCrr(#[from] tcp_crr::Error), /// See [`crate::generator::tcp_rr::Error`] for details. #[error(transparent)] TcpRr(#[from] tcp_rr::Error), @@ -115,6 +119,8 @@ pub struct General { pub enum Inner { /// See [`crate::generator::tcp::Config`] for details. Tcp(tcp::Config), + /// See [`crate::generator::tcp_crr::Config`] for details. + TcpCrr(tcp_crr::Config), /// See [`crate::generator::tcp_rr::Config`] for details. TcpRr(tcp_rr::Config), /// See [`crate::generator::udp::Config`] for details. @@ -156,6 +162,8 @@ pub enum Inner { pub enum Server { /// See [`crate::generator::tcp::Tcp`] for details. Tcp(tcp::Tcp), + /// See [`crate::generator::tcp_crr::TcpCrr`] for details. + TcpCrr(tcp_crr::TcpCrr), /// See [`crate::generator::tcp_rr::TcpRr`] for details. TcpRr(tcp_rr::TcpRr), /// See [`crate::generator::udp::Udp`] for details. @@ -201,6 +209,9 @@ impl Server { pub fn new(config: Config, shutdown: lading_signal::Watcher) -> Result { let srv = match config.inner { Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)?), + Inner::TcpCrr(conf) => { + Self::TcpCrr(tcp_crr::TcpCrr::new(config.general, &conf, shutdown)) + } Inner::TcpRr(conf) => Self::TcpRr(tcp_rr::TcpRr::new(config.general, &conf, shutdown)), Inner::Udp(conf) => Self::Udp(udp::Udp::new(config.general, &conf, shutdown)?), Inner::Http(conf) => Self::Http(http::Http::new(config.general, conf, shutdown)?), @@ -276,6 +287,7 @@ impl Server { match self { Server::Tcp(inner) => inner.spin().await?, + Server::TcpCrr(inner) => inner.spin().await?, Server::TcpRr(inner) => inner.spin().await?, Server::Udp(inner) => inner.spin().await?, Server::Http(inner) => inner.spin().await?, diff --git a/lading/src/generator/tcp_crr.rs b/lading/src/generator/tcp_crr.rs new file mode 100644 index 000000000..ceb5e2888 --- /dev/null +++ b/lading/src/generator/tcp_crr.rs @@ -0,0 +1,131 @@ +//! TCP connect/request/response (`tcp_crr`) generator — the client side. +//! Based on +//! +//! Implements neper's `tcp_crr` protocol: each flow connects, sends a +//! fixed-size request, reads a fixed-size response, closes the connection, +//! then reconnects and repeats. This measures connection-establishment rate +//! end-to-end, including kernel and TCP-handshake overhead. +//! +//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration and selects [`Mode::Crr`]. +//! +//! ## Metrics +//! +//! `connections_initiated`: Successful client-side connect completions +//! `requests_sent`: Completed request writes +//! `responses_received`: Completed response reads +//! `bytes_written`: Request bytes sent +//! `bytes_read`: Response bytes received +//! `connections_failed`: Failed connection attempts + +use std::net::{IpAddr, SocketAddr}; +use std::num::{NonZeroU16, NonZeroUsize}; + +use serde::{Deserialize, Serialize}; + +use super::General; +use crate::generator::common::MetricsBuilder; +use crate::neper::rr::{self, ClientParams, Mode}; + +fn default_nonzero_u16() -> NonZeroU16 { + NonZeroU16::new(1).expect("1 is nonzero") +} + +fn default_nonzero_usize() -> NonZeroUsize { + NonZeroUsize::new(1).expect("1 is nonzero") +} + +const fn default_true() -> bool { + true +} + +fn default_control_port() -> u16 { + 12866 +} + +fn default_data_port() -> u16 { + 12867 +} + +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] +#[serde(deny_unknown_fields)] +/// Configuration for the `tcp_crr` generator. +/// +/// Flow count (neper `-F`) is *not* configured here — it is owned by the +/// `tcp_crr` blackhole and communicated to the generator over the control +/// port during startup. +pub struct Config { + /// The IP address of the `tcp_crr` server. + pub addr: String, + /// Data port for flow connections. Default 12867. + #[serde(default = "default_data_port")] + pub data_port: u16, + /// Control port for startup synchronization with the blackhole. Default 12866. + #[serde(default = "default_control_port")] + pub control_port: u16, + /// Number of OS threads (neper -T). Default 1. + #[serde(default = "default_nonzero_u16")] + pub threads: NonZeroU16, + /// Bytes per request. Default 1. + #[serde(default = "default_nonzero_usize")] + pub request_size: NonZeroUsize, + /// Bytes per response to read back. Default 1. + #[serde(default = "default_nonzero_usize")] + pub response_size: NonZeroUsize, + /// Whether to set `TCP_NODELAY` on connections. Default true. + #[serde(default = "default_true")] + pub no_delay: bool, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`TcpCrr`]. +pub enum Error { + /// Shared neper-style request/response error. + #[error(transparent)] + Rr(#[from] rr::Error), +} + +#[derive(Debug)] +/// The `tcp_crr` generator (client side). +pub struct TcpCrr { + config: Config, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, +} + +impl TcpCrr { + /// Create a new [`TcpCrr`] generator instance. + #[must_use] + pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self { + let metric_labels = MetricsBuilder::new("tcp_crr").with_id(general.id).build(); + Self { + config: config.clone(), + metric_labels, + shutdown, + } + } + + /// Run the generator to completion or until a shutdown signal is received. + /// + /// # Errors + /// + /// Returns an error if a worker thread panics or configuration is invalid. + /// + /// # Panics + /// + /// Panics if `addr` cannot be parsed as an IP address. + pub async fn spin(self) -> Result<(), Error> { + let ip: IpAddr = self.config.addr.parse().expect("invalid addr"); + let params = ClientParams { + data_addr: SocketAddr::new(ip, self.config.data_port), + control_addr: SocketAddr::new(ip, self.config.control_port), + threads: self.config.threads.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, + mode: Mode::Crr, + }; + rr::run_client(params, self.metric_labels, self.shutdown, "tcp_crr").await?; + Ok(()) + } +} diff --git a/lading/src/generator/tcp_rr.rs b/lading/src/generator/tcp_rr.rs index d3f2e75b8..2185e751d 100644 --- a/lading/src/generator/tcp_rr.rs +++ b/lading/src/generator/tcp_rr.rs @@ -23,7 +23,7 @@ use serde::{Deserialize, Serialize}; use super::General; use crate::generator::common::MetricsBuilder; -use crate::neper::rr::{self, ClientParams}; +use crate::neper::rr::{self, ClientParams, Mode}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -121,6 +121,7 @@ impl TcpRr { request_size: self.config.request_size.get(), response_size: self.config.response_size.get(), no_delay: self.config.no_delay, + mode: Mode::Rr, }; rr::run_client(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index e4b95dc46..ed8af24df 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -31,34 +31,67 @@ pub(crate) enum Action { /// Token-indexed flow storage. /// /// Flows are stored in a `Vec` indexed by token value. Removed slots become -/// `None` and are not reused — tokens are monotonically increasing, matching +/// `None` and are not reused - tokens are monotonically increasing, matching /// neper's behavior. pub(crate) struct FlowMap { inner: Vec>>, } +/// Errors produced by `FlowMap`. +#[derive(thiserror::Error, Debug)] +pub(crate) enum FlowMapError { + /// No capacity + #[error("Server flow map is at capacity: {0}")] + NoCapacity(usize), +} + impl FlowMap { - pub(crate) fn new() -> Self { - Self { inner: Vec::new() } + pub(crate) fn new(flows: usize) -> Self { + Self { + inner: Vec::with_capacity(flows * 2), + } } - /// Insert a flow. Grows the backing vec if needed. - pub(crate) fn insert(&mut self, flow: Flow) { - let idx = flow.token.0; - if idx >= self.inner.len() { + /// Insert a flow. + pub(crate) fn insert(&mut self, flow: Flow) -> Result<(), FlowMapError> { + let idx = flow.token.0 % self.inner.capacity(); + if self.inner.len() <= idx { self.inner.resize_with(idx + 1, || None); } - self.inner[idx] = Some(flow); + if self.inner[idx].is_none() { + self.inner[idx] = Some(flow); + return Ok(()); + } + + Err(FlowMapError::NoCapacity(self.inner.capacity())) } /// Get a mutable reference to the flow at the given token. + /// + /// Returns `None` for an empty slot. A slot occupied by a flow with a + /// different token indicates a token-collision bug (tokens congruent + /// modulo capacity share a slot), so it is asserted against. pub(crate) fn get_mut(&mut self, token: Token) -> Option<&mut Flow> { - self.inner.get_mut(token.0).and_then(|slot| slot.as_mut()) + let idx = token.0 % self.inner.capacity(); + let flow = self.inner.get_mut(idx).and_then(|slot| slot.as_mut())?; + assert_eq!( + flow.token, token, + "FlowMap slot {idx} holds a flow with a mismatched token" + ); + Some(flow) } /// Remove and return the flow at the given token. + /// + /// Returns `None` for an empty slot; asserts the occupant's token matches. pub(crate) fn remove(&mut self, token: Token) -> Option> { - self.inner.get_mut(token.0).and_then(Option::take) + let idx = token.0 % self.inner.capacity(); + let flow = self.inner.get_mut(idx).and_then(Option::take)?; + assert_eq!( + flow.token, token, + "FlowMap slot {idx} holds a flow with a mismatched token" + ); + Some(flow) } } diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 0f3d269dd..99c328042 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -16,7 +16,7 @@ use std::time::{Duration, Instant}; use mio::net::{TcpListener, TcpStream}; use mio::{Events, Interest, Poll, Token}; use tokio::sync::mpsc; -use tracing::{info, trace, warn}; +use tracing::{info, warn}; use crate::neper::bpf; use crate::neper::flow::{self, Action, Flow, FlowMap}; @@ -46,9 +46,18 @@ pub enum Error { Config(String), } +/// Which neper-style protocol the client is driving. +#[derive(Clone, Copy, Debug)] +pub(crate) enum Mode { + /// `tcp_rr`: persistent connection, request/response loop forever. + Rr, + /// `tcp_crr`: connect, request/response, close, reconnect, repeat. + Crr, +} + /// Parameters for [`run_client`]. /// -/// Flow count is *not* a client parameter — it is owned by the server and +/// Flow count is *not* a client parameter - it is owned by the server and /// communicated to the client over the control connection during startup. pub(crate) struct ClientParams { /// Address of the server's data port. @@ -63,6 +72,8 @@ pub(crate) struct ClientParams { pub(crate) response_size: usize, /// Whether to set `TCP_NODELAY`. pub(crate) no_delay: bool, + /// RR or CRR. + pub(crate) mode: Mode, } /// Parameters for [`run_server`]. @@ -84,23 +95,42 @@ pub(crate) struct ServerParams { pub(crate) no_delay: bool, /// Listener backlog. pub(crate) backlog: i32, + /// RR or CRR. + pub(crate) mode: Mode, } enum ClientState { + /// CRR only: waiting for a non-blocking `connect(2)` to complete. The + /// `WRITABLE` readiness event signals connect completion; `take_error()` + /// then tells us if it succeeded. + Connecting, SendRequest, RecvResponse, } +/// Actions returned by [`handle_client_event`]. Supersets [`flow::Action`] +/// with [`ClientAction::Reconnect`] for CRR's per-transaction reconnect. +#[derive(Clone, Copy)] +enum ClientAction { + Continue, + Reregister(Interest), + /// CRR: response complete - close this socket and open a new one with the + /// same `Token`. Handled by [`apply_client_action`]. + Reconnect, + Remove, +} + enum ServerState { RecvRequest, SendResponse, + CloseStream, } const LISTENER_TOKEN: Token = Token(0); /// Control-channel handshake: server writes `flows` to the accepted control /// connection as a 2-byte big-endian `u16` and closes; client reads the same -/// 2 bytes after connecting. Internal protocol — no magic / version byte. +/// 2 bytes after connecting. Internal protocol - no magic / version byte. const HANDSHAKE_LEN: usize = 2; const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); @@ -116,6 +146,7 @@ const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); /// /// Returns an error if configuration is invalid, the blackhole control port /// is never reachable, or a worker thread panics. +#[allow(clippy::too_many_lines)] pub(crate) async fn run_client( params: ClientParams, metric_labels: Vec<(String, String)>, @@ -149,7 +180,7 @@ pub(crate) async fn run_client( ), ))); } - match net::TcpStream::connect(params.control_addr) { + match generator_connect_blocking(params.control_addr) { Ok(mut conn) => { conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT)) .expect("set_read_timeout on connected TcpStream must succeed"); @@ -202,6 +233,7 @@ pub(crate) async fn run_client( let request_size = params.request_size; let response_size = params.response_size; let no_delay = params.no_delay; + let mode = params.mode; let mut worker_handles = Vec::with_capacity(params.threads as usize); for i in 0..params.threads { let thread_flows = flow_dist[i as usize]; @@ -214,6 +246,7 @@ pub(crate) async fn run_client( request_size, response_size, no_delay, + mode, &flag, &tm[i as usize], ); @@ -231,12 +264,117 @@ pub(crate) async fn run_client( Ok(()) } +/// `IP_LOCAL_PORT_RANGE` socket option (Linux >= 6.3). Not yet exposed by the +/// `libc` crate, so it is defined here from ``. +const IP_LOCAL_PORT_RANGE: libc::c_int = 51; +/// Lowest ephemeral local port the generator may use for its sockets. +const LOCAL_PORT_LOW: u16 = 1024; +/// Highest ephemeral local port the generator may use for its sockets. +const LOCAL_PORT_HIGH: u16 = 60999; + +/// Set once the kernel is found not to support `IP_LOCAL_PORT_RANGE`, so the +/// "unsupported, falling back" warning is logged a single time rather than on +/// every socket the generator opens. +static PORT_RANGE_UNSUPPORTED: AtomicBool = AtomicBool::new(false); + +/// Increase `socket`'s automatic source-port selection to +/// `[LOCAL_PORT_LOW, LOCAL_PORT_HIGH]` via `IP_LOCAL_PORT_RANGE`. The option +/// value packs the high port in the upper 16 bits and the low port in the +/// lower 16 bits. +/// +/// This is done to reduce `EADDRNOTAVAIL` errors when a large number of flows are +/// created especially for `tcp_crr` workload. +/// Since port ranges are specific to network namespaces, this should not cause issues +/// for other daemons coming online on lower port ranges when lading is launched in its own +/// namespace. +fn set_local_port_range(socket: &socket2::Socket) -> io::Result<()> { + let value: u32 = (u32::from(LOCAL_PORT_HIGH) << 16) | u32::from(LOCAL_PORT_LOW); + // SAFETY: `socket` owns a valid fd for the duration of the borrow, and we + // pass a pointer to a correctly sized `u32` as the option value, exactly as + // `IP_LOCAL_PORT_RANGE` expects. + let ret = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::IPPROTO_IP, + IP_LOCAL_PORT_RANGE, + std::ptr::addr_of!(value).cast::(), + std::mem::size_of::() + .try_into() + .expect("u32 size fits in socklen_t"), + ) + }; + if ret != 0 { + let err = io::Error::last_os_error(); + // ENOPROTOOPT / EOPNOTSUPP means the running kernel predates + // IP_LOCAL_PORT_RANGE (< 6.3). Degrade gracefully: fall back to the + // system-wide ephemeral range rather than failing the connection. + if matches!( + err.raw_os_error(), + Some(libc::ENOPROTOOPT | libc::EOPNOTSUPP) + ) { + if !PORT_RANGE_UNSUPPORTED.swap(true, Relaxed) { + warn!( + "IP_LOCAL_PORT_RANGE not supported by this kernel; \ + falling back to the system ephemeral port range" + ); + } + return Ok(()); + } + return Err(err); + } + Ok(()) +} + +/// Create a TCP socket for the generator with its local port range constrained +/// to `[LOCAL_PORT_LOW, LOCAL_PORT_HIGH]`. +fn new_generator_socket(addr: SocketAddr) -> io::Result { + let socket = socket2::Socket::new( + socket2::Domain::for_address(addr), + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + )?; + set_local_port_range(&socket)?; + Ok(socket) +} + +/// Blocking connect to `addr` using a port-range-constrained generator socket. +/// +/// The kernel picks the source port from the constrained ephemeral range (see +/// [`set_local_port_range`]); the generator does not manage source ports itself. +fn generator_connect_blocking(addr: SocketAddr) -> io::Result { + let socket = new_generator_socket(addr)?; + socket.connect(&addr.into())?; + Ok(net::TcpStream::from(socket)) +} + +/// Non-blocking connect to `addr` using a port-range-constrained generator +/// socket, returning a mio stream whose connect is in progress (completion is +/// signalled by a `WRITABLE` readiness event). +/// +/// The kernel picks the source port from the constrained ephemeral range (see +/// [`set_local_port_range`]); the generator does not manage source ports itself. +fn generator_connect_nonblocking(addr: SocketAddr) -> io::Result { + let socket = new_generator_socket(addr)?; + socket.set_nonblocking(true)?; + // A non-blocking connect reports in-progress as EINPROGRESS / WouldBlock; + // that is expected and not an error. + match socket.connect(&addr.into()) { + Ok(()) => {} + Err(e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(e) if e.kind() == ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + } + Ok(TcpStream::from_std(net::TcpStream::from(socket))) +} + +#[allow(clippy::too_many_arguments)] fn client_thread_main( addr: SocketAddr, num_flows: u16, request_size: usize, response_size: usize, no_delay: bool, + mode: Mode, shutdown_flag: &AtomicBool, metrics: &ThreadMetrics, ) { @@ -244,31 +382,34 @@ fn client_thread_main( let mut events = Events::with_capacity(num_flows as usize); let request_buf = vec![0u8; request_size]; let mut response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); + let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 0; for _ in 0..num_flows { - match net::TcpStream::connect(addr) { + let token = Token(next_token); + match generator_connect_blocking(addr) { Ok(std_stream) => { let _ = std_stream.set_nodelay(no_delay); std_stream .set_nonblocking(true) .expect("failed to set nonblocking"); let mut stream = TcpStream::from_std(std_stream); - let token = Token(next_token); next_token += 1; poll.registry() .register(&mut stream, token, Interest::WRITABLE) .expect("failed to register flow"); - flows.insert(Flow { - stream, - token, - state: ClientState::SendRequest, - xfer: request_size, - }); + flows + .insert(Flow { + stream, + token, + state: ClientState::SendRequest, + xfer: request_size, + }) + .expect("client should never be able to exceed FlowMap capacity"); + metrics.connections_initiated.add(1); } Err(e) => { - trace!("connection to {addr} failed: {e}"); + warn!("connection to {addr} failed: {e}"); metrics.connections_failed.add(1); } } @@ -284,19 +425,128 @@ fn client_thread_main( let Some(fl) = flows.get_mut(token) else { continue; }; - let action = handle_client_event(fl, &request_buf, &mut response_buf, metrics); - flow::apply_action(action, token, &mut flows, poll.registry()); + let action = handle_client_event(fl, mode, &request_buf, &mut response_buf, metrics); + apply_client_action(action, token, &mut flows, &poll, addr, no_delay, metrics); + } + } +} + +/// Apply a [`ClientAction`] to the flow map. Handles the CRR +/// reconnect transition (deregister old stream, open a new +/// non-blocking connect, reregister with the same token). +fn apply_client_action( + action: ClientAction, + token: Token, + flows: &mut FlowMap, + poll: &Poll, + addr: SocketAddr, + no_delay: bool, + metrics: &ThreadMetrics, +) { + let registry = poll.registry(); + match action { + ClientAction::Continue => {} + ClientAction::Reregister(interest) => { + if let Some(flow) = flows.get_mut(token) { + let _ = registry.reregister(&mut flow.stream, flow.token, interest); + } + } + ClientAction::Reconnect => { + // Take the flow out of the map so the old socket is fully closed + // before the new connection is opened. + let Some(mut flow) = flows.remove(token) else { + return; + }; + let _ = registry.deregister(&mut flow.stream); + { + // Abortive close: SO_LINGER with a zero timeout makes the drop + // below emit a RST instead of a FIN, so the socket skips + // TIME_WAIT and its source port returns to the constrained + // ephemeral range immediately rather than lingering 2*MSL. + let sock = socket2::SockRef::from(&flow.stream); + if let Err(e) = sock.set_linger(Some(Duration::from_secs(0))) { + warn!("failed to set SO_LINGER for abortive close on reconnect: {e}"); + } + } + // Drop the old stream now (sends RST) before the new connect. + drop(flow); + match generator_connect_nonblocking(addr) { + Ok(mut new_stream) => { + { + let sock = socket2::SockRef::from(&new_stream); + if let Err(e) = sock.set_tcp_nodelay(no_delay) { + warn!("failed to set TCP_NODELAY on reconnect: {e}"); + } + } + if let Err(e) = registry.register(&mut new_stream, token, Interest::WRITABLE) { + warn!("reconnect register failed: {e}"); + metrics.connections_failed.add(1); + } else if let Err(err) = flows.insert(Flow { + stream: new_stream, + token, + state: ClientState::Connecting, + xfer: 0, + }) { + warn!("failed to reinsert reconnected flow: {err}"); + metrics.connections_failed.add(1); + } + } + Err(e) => { + warn!("reconnect to {addr} failed: {e}"); + metrics.connections_failed.add(1); + } + } + } + ClientAction::Remove => { + metrics.connections_closed.add(1); + if let Some(mut flow) = flows.remove(token) { + let _ = registry.deregister(&mut flow.stream); + // Abortive close (RST) so the source port skips TIME_WAIT and + // is reclaimed immediately rather than lingering 2*MSL. + let sock = socket2::SockRef::from(&flow.stream); + if let Err(e) = sock.set_linger(Some(Duration::from_secs(0))) { + warn!("failed to set SO_LINGER for abortive close: {e}"); + } + // `flow` (and its stream fd) dropped here -> RST sent. + } } } } fn handle_client_event( flow: &mut Flow, + mode: Mode, request_buf: &[u8], response_buf: &mut [u8], metrics: &ThreadMetrics, -) -> Action { +) -> ClientAction { + // Connecting -> SendRequest transition: mio is edge-triggered, so the + // single WRITABLE event that signaled connect completion is also the + // event that must drive the first write. Transition state and fall + // through to SendRequest in the same call. + if matches!(flow.state, ClientState::Connecting) { + match flow.stream.take_error() { + Ok(None) => { + flow.state = ClientState::SendRequest; + flow.xfer = request_buf.len(); + metrics.connections_initiated.add(1); + // fall through + } + Ok(Some(e)) => { + warn!("connect failed: {e}"); + metrics.connections_failed.add(1); + return ClientAction::Reconnect; + } + Err(e) => { + warn!("take_error failed: {e}"); + metrics.connections_failed.add(1); + return ClientAction::Reconnect; + } + } + } + match flow.state { + ClientState::Connecting => unreachable!("transitioned out of Connecting above"), ClientState::SendRequest => { let offset = request_buf.len() - flow.xfer; match flow.stream.write(&request_buf[offset..]) { @@ -307,22 +557,22 @@ fn handle_client_event( flow.state = ClientState::RecvResponse; metrics.requests_sent.add(1); metrics.bytes_written.add(request_buf.len() as u64); - Action::Reregister(Interest::READABLE) + ClientAction::Reregister(Interest::READABLE) } else { - Action::Continue + ClientAction::Continue } } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { - trace!("write error: {e}"); - Action::Remove + warn!("write error: {e}"); + ClientAction::Remove } } } ClientState::RecvResponse => { let offset = response_buf.len() - flow.xfer; match flow.stream.read(&mut response_buf[offset..]) { - Ok(0) => Action::Remove, + Ok(0) => ClientAction::Remove, Ok(n) => { flow.xfer -= n; if flow.xfer == 0 { @@ -330,15 +580,22 @@ fn handle_client_event( flow.state = ClientState::SendRequest; metrics.responses_received.add(1); metrics.bytes_read.add(response_buf.len() as u64); - Action::Reregister(Interest::WRITABLE) + match mode { + Mode::Rr => { + flow.xfer = request_buf.len(); + flow.state = ClientState::SendRequest; + ClientAction::Reregister(Interest::WRITABLE) + } + Mode::Crr => ClientAction::Reconnect, + } } else { - Action::Continue + ClientAction::Continue } } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) if e.kind() == ErrorKind::WouldBlock => ClientAction::Continue, Err(e) => { - trace!("read error: {e}"); - Action::Remove + warn!("read error: {e}"); + ClientAction::Remove } } } @@ -388,11 +645,16 @@ pub(crate) async fn run_server( // Pre-build thread 0's listener here so the BPF program is attached to the // reuseport group before any other thread calls bind(). This removes the - // need for a cross-thread BPF barrier — if bind fails or panics, it + // need for a cross-thread BPF barrier - if bind fails or panics, it // propagates as an error directly from this task. let binding_addr = params.data_addr; let thread0_listener = if num_threads > 1 { - Some(create_listener(0, num_threads, binding_addr, params.backlog)) + Some(create_listener( + 0, + num_threads, + binding_addr, + params.backlog, + )) } else { None }; @@ -433,6 +695,7 @@ pub(crate) async fn run_server( &flag, &tm[i as usize], tx, + params.mode, ); }); handles.push(handle); @@ -582,6 +845,7 @@ fn server_thread_main( shutdown_flag: &AtomicBool, metrics: &ThreadMetrics, ready_tx: mpsc::UnboundedSender<()>, + mode: Mode, ) { // Thread 0 uses the pre-built listener (with BPF already attached); others // bind their own sockets that join the existing reuseport group. @@ -605,7 +869,7 @@ fn server_thread_main( let mut request_buf = vec![0u8; request_size]; let response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); + let mut flows: FlowMap = FlowMap::new(num_flows as usize); let mut next_token: usize = 1; loop { @@ -623,16 +887,25 @@ fn server_thread_main( set_nodelay_mio(&stream, no_delay); let token = Token(next_token); next_token += 1; - let mut mio_stream = stream; - poll.registry() - .register(&mut mio_stream, token, Interest::READABLE) - .expect("failed to register flow"); - flows.insert(Flow { - stream: mio_stream, + // Insert first: `insert` takes the flow by value and + // drops it (closing the fd) on failure, so there is + // nothing registered to clean up on the error path. + if let Err(err) = flows.insert(Flow { + stream, token, state: ServerState::RecvRequest, xfer: request_size, - }); + }) { + warn!( + "failed to insert flow in server FlowMap: {err} {0}", + token.0 + ); + break; + } + let flow = flows.get_mut(token).expect("flow was just inserted"); + poll.registry() + .register(&mut flow.stream, token, Interest::READABLE) + .expect("failed to register flow"); metrics.connections_accepted.add(1); } Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, @@ -651,7 +924,8 @@ fn server_thread_main( let Some(fl) = flows.get_mut(token) else { continue; }; - let action = handle_server_event(fl, &mut request_buf, &response_buf, metrics); + let action = + handle_server_event(fl, &mut request_buf, &response_buf, metrics, mode); flow::apply_action(action, token, &mut flows, poll.registry()); } } @@ -662,7 +936,7 @@ fn server_thread_main( fn set_nodelay_mio(stream: &TcpStream, no_delay: bool) { let sock = socket2::SockRef::from(stream); if let Err(e) = sock.set_tcp_nodelay(no_delay) { - trace!("failed to set TCP_NODELAY: {e}"); + warn!("failed to set TCP_NODELAY: {e}"); } } @@ -671,12 +945,16 @@ fn handle_server_event( request_buf: &mut [u8], response_buf: &[u8], metrics: &ThreadMetrics, + mode: Mode, ) -> Action { match flow.state { ServerState::RecvRequest => { let offset = request_buf.len() - flow.xfer; match flow.stream.read(&mut request_buf[offset..]) { - Ok(0) => Action::Remove, + Ok(0) => { + metrics.connections_closed.add(1); + Action::Remove + } Ok(n) => { flow.xfer -= n; if flow.xfer == 0 { @@ -691,7 +969,8 @@ fn handle_server_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { - trace!("read error: {e}"); + warn!("read error: {e}"); + metrics.connections_closed.add(1); Action::Remove } } @@ -703,7 +982,10 @@ fn handle_server_event( flow.xfer -= n; if flow.xfer == 0 { flow.xfer = request_buf.len(); - flow.state = ServerState::RecvRequest; + match mode { + Mode::Rr => flow.state = ServerState::RecvRequest, + Mode::Crr => flow.state = ServerState::CloseStream, + } metrics.responses_sent.add(1); metrics.bytes_written.add(response_buf.len() as u64); Action::Reregister(Interest::READABLE) @@ -713,10 +995,12 @@ fn handle_server_event( } Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, Err(e) => { - trace!("write error: {e}"); + warn!("write error: {e}"); + metrics.connections_closed.add(1); Action::Remove } } } + ServerState::CloseStream => Action::Remove, } }