From 62943cbdce057160a83b436499b52c203744589b Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Fri, 29 May 2026 13:53:06 +0200 Subject: [PATCH 1/6] refactor rr --- lading/src/blackhole/tcp_rr.rs | 397 +------------------ lading/src/generator/tcp_rr.rs | 256 +----------- lading/src/neper.rs | 1 + lading/src/neper/metrics.rs | 2 + lading/src/neper/rr.rs | 691 +++++++++++++++++++++++++++++++++ 5 files changed, 728 insertions(+), 619 deletions(-) create mode 100644 lading/src/neper/rr.rs diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index b81fc513c..a8c284981 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -5,6 +5,9 @@ //! request then writes a fixed-size response, repeating until the flow closes //! or lading shuts down. //! +//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration. +//! //! ## Metrics //! //! `connections_accepted`: Incoming connections accepted @@ -13,25 +16,13 @@ //! `bytes_received`: Request bytes read //! `bytes_written`: Response bytes sent -use std::io::{ErrorKind, Read, Write}; -use std::net::{self, IpAddr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::num::{NonZeroU16, NonZeroUsize}; -use std::os::fd::AsRawFd; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; -use std::time::Duration; -use mio::net::{TcpListener, TcpStream}; -use mio::{Events, Interest, Poll, Token}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; -use tracing::{info, trace, warn}; use super::General; -use crate::neper::bpf; -use crate::neper::flow::{self, Action, Flow, FlowMap}; -use crate::neper::metrics::{self, ThreadMetrics}; -use crate::neper::thread; +use crate::neper::rr::{self, ServerParams}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -91,21 +82,9 @@ pub struct Config { #[derive(thiserror::Error, Debug)] /// Errors produced by [`TcpRr`]. pub enum Error { - /// IO error + /// Shared neper-style request/response error. #[error(transparent)] - Io(#[from] std::io::Error), - /// Error binding TCP listener - #[error("Failed to bind TCP listener to {addr}: {source}")] - Bind { - /// Binding address - addr: SocketAddr, - /// Underlying IO error - #[source] - source: Box, - }, - /// Worker thread panicked - #[error("Worker thread panicked")] - ThreadPanicked, + Rr(#[from] rr::Error), } #[derive(Debug)] @@ -116,13 +95,6 @@ pub struct TcpRr { shutdown: lading_signal::Watcher, } -enum ServerState { - RecvRequest, - SendResponse, -} - -const LISTENER_TOKEN: Token = Token(0); - impl TcpRr { /// Create a new [`TcpRr`] blackhole instance. #[must_use] @@ -146,354 +118,17 @@ impl TcpRr { /// # Errors /// /// Returns an error if binding fails or a worker thread panics. - /// - /// # Panics - /// - /// Panics if the ready-barrier tokio task is cancelled. - #[allow(clippy::too_many_lines)] pub async fn run(self) -> Result<(), Error> { - let shutdown_flag = thread::new_shutdown_flag(); - let num_threads = self.config.threads.get(); - - let thread_metrics = Arc::new( - (0..num_threads) - .map(|_| ThreadMetrics::new()) - .collect::>(), - ); - - let metrics_handle = { - let tm = Arc::clone(&thread_metrics); - let labels = self.metric_labels.clone(); - let flag = Arc::clone(&shutdown_flag); - thread::spawn_named("tcp_rr-bh-metrics", move || { - metrics::run_metrics_thread(&tm, &labels, &flag); - }) - }; - - // Pre-build thread 0's listener here so the BPF program is attached - // to the reuseport group before any other thread calls bind(). This - // removes the need for a cross-thread BPF barrier — if the bind fails - // or panics, it propagates as an error directly from this task. - let binding_addr = SocketAddr::new(self.config.addr, self.config.data_port); - let thread0_listener = if num_threads > 1 { - Some(create_listener( - 0, - num_threads, - binding_addr, - self.config.backlog, - )) - } else { - None + let params = ServerParams { + data_addr: SocketAddr::new(self.config.addr, self.config.data_port), + control_addr: SocketAddr::new(self.config.addr, self.config.control_port), + threads: self.config.threads.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, + backlog: self.config.backlog, }; - - // Each thread sends a ready signal via this channel after binding. - // If a thread panics before signaling, its sender drops; once all - // senders are gone, recv() returns None and we detect the failure - // instead of hanging forever. - let (ready_tx, mut ready_rx) = mpsc::unbounded_channel::<()>(); - - let mut handles = Vec::with_capacity(num_threads as usize); - let mut thread0_listener = thread0_listener; - for i in 0..num_threads { - let request_size = self.config.request_size.get(); - let response_size = self.config.response_size.get(); - let no_delay = self.config.no_delay; - let backlog = self.config.backlog; - let flag = Arc::clone(&shutdown_flag); - let tm = Arc::clone(&thread_metrics); - let prebuilt = if i == 0 { - thread0_listener.take() - } else { - None - }; - let tx = ready_tx.clone(); - let handle = thread::spawn_named(&format!("tcp_rr-server-{i}"), move || { - server_thread_main( - i, - num_threads, - binding_addr, - prebuilt, - backlog, - request_size, - response_size, - no_delay, - &flag, - &tm[i as usize], - tx, - ); - }); - handles.push(handle); - } - // Drop our own copy so the channel closes when all worker threads exit. - drop(ready_tx); - - // Wait for each thread to signal ready. If a sender drops without - // signaling (thread panicked), recv() eventually returns None. - for _ in 0..num_threads { - if ready_rx.recv().await.is_none() { - shutdown_flag.store(true, Relaxed); - thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; - return Err(Error::ThreadPanicked); - } - } - - // All data listeners are up. Open control port so the generator - // can connect and know we're ready. - let control_addr = SocketAddr::new(self.config.addr, self.config.control_port); - let control_listener = - net::TcpListener::bind(control_addr).map_err(|source| Error::Bind { - addr: control_addr, - source: Box::new(source), - })?; - control_listener - .set_nonblocking(true) - .expect("failed to set control listener nonblocking"); - info!("control port listening on {control_addr}, waiting for generator"); - - handles.push(metrics_handle); - - // Accept with shutdown awareness: poll accept in a loop. - let flag = Arc::clone(&shutdown_flag); - let shutdown_clone = self.shutdown.clone(); - tokio::spawn(async move { - shutdown_clone.recv().await; - flag.store(true, Relaxed); - }); - let mut generator_connected = false; - loop { - if shutdown_flag.load(Relaxed) { - info!("shutdown before generator connected"); - break; - } - match control_listener.accept() { - Ok((_conn, peer)) => { - info!("generator connected from {peer}, data threads running"); - generator_connected = true; - break; - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => { - tokio::time::sleep(Duration::from_millis(100)).await; - } - Err(e) => { - return Err(Error::Bind { - addr: control_addr, - source: Box::new(e), - }); - } - } - } - drop(control_listener); - - if generator_connected { - self.shutdown.recv().await; - info!("shutdown signal received"); - } - shutdown_flag.store(true, Relaxed); - - thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; - + rr::run_server(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) } } - -/// Create a listener socket. When `num_threads` > 1, sets `SO_REUSEPORT` -/// and (for thread 0) attaches the reuseport eBPF program. -fn create_listener( - thread_index: u16, - num_threads: u16, - binding_addr: SocketAddr, - backlog: i32, -) -> net::TcpListener { - let domain = if binding_addr.is_ipv4() { - socket2::Domain::IPV4 - } else { - socket2::Domain::IPV6 - }; - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) - .expect("failed to create socket"); - socket - .set_nonblocking(true) - .expect("failed to set nonblocking"); - socket - .set_cloexec(true) - .expect("failed to set close-on-exec"); - socket - .set_reuse_address(true) - .expect("failed to set SO_REUSEADDR"); - - if num_threads > 1 { - socket - .set_reuse_port(true) - .expect("failed to set SO_REUSEPORT"); - - if thread_index == 0 { - match bpf::load_reuseport_ebpf(u32::from(num_threads)) { - Ok(prog) => { - if let Err(e) = bpf::attach_reuseport_ebpf(socket.as_raw_fd(), &prog) { - warn!("failed to attach reuseport eBPF: {e}, falling back to kernel hash"); - } - } - Err(e) => { - warn!("failed to load reuseport eBPF: {e}, falling back to kernel hash"); - } - } - } - } - - socket - .bind(&binding_addr.into()) - .unwrap_or_else(|e| panic!("failed to bind to {binding_addr}: {e}")); - socket.listen(backlog).expect("failed to listen"); - - socket.into() -} - -#[allow(clippy::too_many_arguments)] -fn server_thread_main( - thread_index: u16, - num_threads: u16, - binding_addr: SocketAddr, - prebuilt_listener: Option, - backlog: i32, - request_size: usize, - response_size: usize, - no_delay: bool, - shutdown_flag: &AtomicBool, - metrics: &ThreadMetrics, - ready_tx: mpsc::UnboundedSender<()>, -) { - // Thread 0 uses the pre-built listener (with BPF already attached); - // others bind their own sockets that join the existing reuseport group. - let std_listener = prebuilt_listener - .unwrap_or_else(|| create_listener(thread_index, num_threads, binding_addr, backlog)); - - // Signal that this thread's listener is bound and ready. If this send - // fails the receiver has gone away (blackhole is shutting down). - let _ = ready_tx.send(()); - drop(ready_tx); - - let mut listener = TcpListener::from_std(std_listener); - let mut poll = Poll::new().expect("failed to create mio::Poll"); - let mut events = Events::with_capacity(256); - - poll.registry() - .register(&mut listener, LISTENER_TOKEN, Interest::READABLE) - .expect("failed to register listener"); - - let mut request_buf = vec![0u8; request_size]; - let response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); - let mut next_token: usize = 1; - - loop { - let _ = poll.poll(&mut events, Some(Duration::from_millis(100))); - if shutdown_flag.load(Relaxed) { - break; - } - - let mut attempts = 0; - for event in &events { - if event.token() == LISTENER_TOKEN { - loop { - match listener.accept() { - Ok((stream, _addr)) => { - set_nodelay_mio(&stream, no_delay); - let token = Token(next_token); - next_token += 1; - let mut mio_stream = stream; - poll.registry() - .register(&mut mio_stream, token, Interest::READABLE) - .expect("failed to register flow"); - flows.insert(Flow { - stream: mio_stream, - token, - state: ServerState::RecvRequest, - xfer: request_size, - }); - metrics.connections_accepted.add(1); - } - Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, - Err(e) => { - if attempts > 2 { - break; - } - warn!("accept error: {e}"); - attempts += 1; - std::thread::sleep(Duration::from_millis(1000)); - } - } - } - } else { - let token = event.token(); - let Some(fl) = flows.get_mut(token) else { - continue; - }; - let action = handle_server_event(fl, &mut request_buf, &response_buf, metrics); - flow::apply_action(action, token, &mut flows, poll.registry()); - } - } - } -} - -/// Set `TCP_NODELAY` on a mio [`TcpStream`] via a borrowed `socket2::SockRef`. -fn set_nodelay_mio(stream: &TcpStream, no_delay: bool) { - let sock = socket2::SockRef::from(stream); - if let Err(e) = sock.set_tcp_nodelay(no_delay) { - trace!("failed to set TCP_NODELAY: {e}"); - } -} - -fn handle_server_event( - flow: &mut Flow, - request_buf: &mut [u8], - response_buf: &[u8], - metrics: &ThreadMetrics, -) -> Action { - match flow.state { - ServerState::RecvRequest => { - let offset = request_buf.len() - flow.xfer; - match flow.stream.read(&mut request_buf[offset..]) { - Ok(0) => Action::Remove, - Ok(n) => { - flow.xfer -= n; - if flow.xfer == 0 { - flow.xfer = response_buf.len(); - flow.state = ServerState::SendResponse; - metrics.requests_received.add(1); - metrics.bytes_received.add(request_buf.len() as u64); - Action::Reregister(Interest::WRITABLE) - } else { - Action::Continue - } - } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, - Err(e) => { - trace!("read error: {e}"); - Action::Remove - } - } - } - ServerState::SendResponse => { - let offset = response_buf.len() - flow.xfer; - match flow.stream.write(&response_buf[offset..]) { - Ok(n) => { - flow.xfer -= n; - if flow.xfer == 0 { - flow.xfer = request_buf.len(); - flow.state = ServerState::RecvRequest; - metrics.responses_sent.add(1); - metrics.bytes_written.add(response_buf.len() as u64); - Action::Reregister(Interest::READABLE) - } else { - Action::Continue - } - } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, - Err(e) => { - trace!("write error: {e}"); - Action::Remove - } - } - } - } -} diff --git a/lading/src/generator/tcp_rr.rs b/lading/src/generator/tcp_rr.rs index 5a7a3394d..9d9d66b82 100644 --- a/lading/src/generator/tcp_rr.rs +++ b/lading/src/generator/tcp_rr.rs @@ -5,6 +5,9 @@ //! waits for a fixed-size response, and repeats. Flows are distributed across //! OS threads and multiplexed via mio. //! +//! The event-loop machinery lives in [`crate::neper::rr`]; this module is a +//! thin wrapper that supplies configuration. +//! //! ## Metrics //! //! `requests_sent`: Completed request writes @@ -13,23 +16,14 @@ //! `bytes_read`: Response bytes received //! `connections_failed`: Failed connection attempts -use std::io::{self, ErrorKind, Read, Write}; -use std::net::{self, IpAddr, SocketAddr}; +use std::net::{IpAddr, SocketAddr}; use std::num::{NonZeroU16, NonZeroUsize}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; -use std::time::{Duration, Instant}; -use mio::net::TcpStream; -use mio::{Events, Interest, Poll, Token}; use serde::{Deserialize, Serialize}; -use tracing::{info, trace}; use super::General; use crate::generator::common::MetricsBuilder; -use crate::neper::flow::{self, Action, Flow, FlowMap}; -use crate::neper::metrics::{self, ThreadMetrics}; -use crate::neper::thread; +use crate::neper::rr::{self, ClientParams}; fn default_nonzero_u16() -> NonZeroU16 { NonZeroU16::new(1).expect("1 is nonzero") @@ -83,15 +77,9 @@ pub struct Config { #[derive(thiserror::Error, Debug)] /// Errors produced by [`TcpRr`]. pub enum Error { - /// IO error + /// Shared neper-style request/response error. #[error(transparent)] - Io(#[from] std::io::Error), - /// Worker thread panicked - #[error("Worker thread panicked")] - ThreadPanicked, - /// Invalid configuration. - #[error("invalid config: {0}")] - Config(String), + Rr(#[from] rr::Error), } #[derive(Debug)] @@ -102,11 +90,6 @@ pub struct TcpRr { shutdown: lading_signal::Watcher, } -enum ClientState { - SendRequest, - RecvResponse, -} - impl TcpRr { /// Create a new [`TcpRr`] generator instance. #[must_use] @@ -123,226 +106,23 @@ impl TcpRr { /// /// # Errors /// - /// Returns an error if a worker thread panics. + /// Returns an error if a worker thread panics or configuration is invalid. /// /// # Panics /// - /// Panics if `addr` cannot be resolved to a socket address. + /// Panics if `addr` cannot be parsed as an IP address. pub async fn spin(self) -> Result<(), Error> { - if self.config.threads > self.config.flows { - return Err(Error::Config(format!( - "threads ({}) must be <= flows ({})", - self.config.threads, self.config.flows - ))); - } - let ip: IpAddr = self.config.addr.parse().expect("invalid addr"); - let data_addr = SocketAddr::new(ip, self.config.data_port); - let control_addr = SocketAddr::new(ip, self.config.control_port); - - let shutdown_flag = thread::new_shutdown_flag(); - - // Wait for the blackhole to be ready by connecting to its control port. - info!("waiting for blackhole control port at {control_addr}"); - let deadline = Instant::now() + Duration::from_secs(300); - { - let flag = Arc::clone(&shutdown_flag); - let shutdown = self.shutdown.clone(); - tokio::spawn(async move { - shutdown.recv().await; - flag.store(true, Relaxed); - }); - } - loop { - if shutdown_flag.load(Relaxed) { - return Err(Error::Io(io::Error::new( - ErrorKind::ConnectionRefused, - format!( - "shutdown before blackhole control port {control_addr} became reachable" - ), - ))); - } - match net::TcpStream::connect(control_addr) { - Ok(_conn) => { - info!("blackhole ready, starting flows"); - break; - } - Err(e) => { - if Instant::now() >= deadline { - return Err(Error::Io(io::Error::new( - ErrorKind::TimedOut, - format!( - "blackhole control port {control_addr} not reachable after 5 minutes: {e}" - ), - ))); - } - std::thread::sleep(Duration::from_millis(100)); - } - } - } - let num_threads = self.config.threads.get(); - let num_flows = self.config.flows.get(); - let request_size = self.config.request_size.get(); - let response_size = self.config.response_size.get(); - - let flow_dist = thread::distribute_flows(num_flows, num_threads); - - let thread_metrics = Arc::new( - (0..num_threads) - .map(|_| ThreadMetrics::new()) - .collect::>(), - ); - - let metrics_handle = { - let tm = Arc::clone(&thread_metrics); - let labels = self.metric_labels.clone(); - let flag = Arc::clone(&shutdown_flag); - thread::spawn_named("tcp_rr-metrics", move || { - metrics::run_metrics_thread(&tm, &labels, &flag); - }) + let params = ClientParams { + data_addr: SocketAddr::new(ip, self.config.data_port), + control_addr: SocketAddr::new(ip, self.config.control_port), + threads: self.config.threads.get(), + flows: self.config.flows.get(), + request_size: self.config.request_size.get(), + response_size: self.config.response_size.get(), + no_delay: self.config.no_delay, }; - - let mut worker_handles = Vec::with_capacity(num_threads as usize); - for i in 0..num_threads { - let thread_flows = flow_dist[i as usize]; - let flag = Arc::clone(&shutdown_flag); - let tm = Arc::clone(&thread_metrics); - let no_delay = self.config.no_delay; - let handle = thread::spawn_named(&format!("tcp_rr-client-{i}"), move || { - client_thread_main( - data_addr, - thread_flows, - request_size, - response_size, - no_delay, - &flag, - &tm[i as usize], - ); - }); - worker_handles.push(handle); - } - - self.shutdown.recv().await; - info!("shutdown signal received"); - shutdown_flag.store(true, Relaxed); - - worker_handles.push(metrics_handle); - thread::join_all(worker_handles).map_err(|()| Error::ThreadPanicked)?; - + rr::run_client(params, self.metric_labels, self.shutdown, "tcp_rr").await?; Ok(()) } } - -fn client_thread_main( - addr: SocketAddr, - num_flows: u16, - request_size: usize, - response_size: usize, - no_delay: bool, - shutdown_flag: &AtomicBool, - metrics: &ThreadMetrics, -) { - let mut poll = Poll::new().expect("failed to create mio::Poll"); - let mut events = Events::with_capacity(num_flows as usize); - let request_buf = vec![0u8; request_size]; - let mut response_buf = vec![0u8; response_size]; - let mut flows: FlowMap = FlowMap::new(); - let mut next_token: usize = 0; - - for _ in 0..num_flows { - match net::TcpStream::connect(addr) { - Ok(std_stream) => { - let _ = std_stream.set_nodelay(no_delay); - std_stream - .set_nonblocking(true) - .expect("failed to set nonblocking"); - let mut stream = TcpStream::from_std(std_stream); - let token = Token(next_token); - next_token += 1; - poll.registry() - .register(&mut stream, token, Interest::WRITABLE) - .expect("failed to register flow"); - flows.insert(Flow { - stream, - token, - state: ClientState::SendRequest, - xfer: request_size, - }); - } - Err(e) => { - trace!("connection to {addr} failed: {e}"); - metrics.connections_failed.add(1); - } - } - } - - loop { - let _ = poll.poll(&mut events, Some(Duration::from_millis(100))); - if shutdown_flag.load(Relaxed) { - break; - } - for event in &events { - let token = event.token(); - let Some(fl) = flows.get_mut(token) else { - continue; - }; - let action = handle_client_event(fl, &request_buf, &mut response_buf, metrics); - flow::apply_action(action, token, &mut flows, poll.registry()); - } - } -} - -fn handle_client_event( - flow: &mut Flow, - request_buf: &[u8], - response_buf: &mut [u8], - metrics: &ThreadMetrics, -) -> Action { - match flow.state { - ClientState::SendRequest => { - let offset = request_buf.len() - flow.xfer; - match flow.stream.write(&request_buf[offset..]) { - Ok(n) => { - flow.xfer -= n; - if flow.xfer == 0 { - flow.xfer = response_buf.len(); - flow.state = ClientState::RecvResponse; - metrics.requests_sent.add(1); - metrics.bytes_written.add(request_buf.len() as u64); - Action::Reregister(Interest::READABLE) - } else { - Action::Continue - } - } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, - Err(e) => { - trace!("write error: {e}"); - Action::Remove - } - } - } - ClientState::RecvResponse => { - let offset = response_buf.len() - flow.xfer; - match flow.stream.read(&mut response_buf[offset..]) { - Ok(0) => Action::Remove, - Ok(n) => { - flow.xfer -= n; - if flow.xfer == 0 { - flow.xfer = request_buf.len(); - flow.state = ClientState::SendRequest; - metrics.responses_received.add(1); - metrics.bytes_read.add(response_buf.len() as u64); - Action::Reregister(Interest::WRITABLE) - } else { - Action::Continue - } - } - Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, - Err(e) => { - trace!("read error: {e}"); - Action::Remove - } - } - } - } -} diff --git a/lading/src/neper.rs b/lading/src/neper.rs index 1a07396ea..befe81fa0 100644 --- a/lading/src/neper.rs +++ b/lading/src/neper.rs @@ -11,4 +11,5 @@ pub(crate) mod bpf; pub(crate) mod bpf; pub(crate) mod flow; pub(crate) mod metrics; +pub(crate) mod rr; pub(crate) mod thread; diff --git a/lading/src/neper/metrics.rs b/lading/src/neper/metrics.rs index f449c4535..8c3fdab06 100644 --- a/lading/src/neper/metrics.rs +++ b/lading/src/neper/metrics.rs @@ -69,8 +69,10 @@ define_thread_metrics! { responses_received, bytes_written, bytes_read, + connections_initiated, connections_failed, connections_accepted, + connections_closed, requests_received, responses_sent, bytes_received, diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs new file mode 100644 index 000000000..cff09a0c9 --- /dev/null +++ b/lading/src/neper/rr.rs @@ -0,0 +1,691 @@ +//! Shared client/server machinery for neper-style request/response workloads. +//! +//! Provides [`run_client`] and [`run_server`] entry points used by `tcp_rr` +//! (and forthcoming `tcp_crr`). The shared code owns the mio event loops, flow +//! lifecycle, control-port synchronization, and per-thread metrics plumbing; +//! per-variant modules build the [`ClientParams`] / [`ServerParams`] and +//! call in. + +use std::io::{self, ErrorKind, Read, Write}; +use std::net::{self, SocketAddr}; +use std::os::fd::AsRawFd; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; +use std::time::{Duration, Instant}; + +use mio::net::{TcpListener, TcpStream}; +use mio::{Events, Interest, Poll, Token}; +use tokio::sync::mpsc; +use tracing::{info, trace, warn}; + +use crate::neper::bpf; +use crate::neper::flow::{self, Action, Flow, FlowMap}; +use crate::neper::metrics::{self, ThreadMetrics}; +use crate::neper::thread; + +/// Errors produced by [`run_client`] and [`run_server`]. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// IO error. + #[error(transparent)] + Io(#[from] std::io::Error), + /// Failed to bind a listener. + #[error("Failed to bind TCP listener to {addr}: {source}")] + Bind { + /// Binding address. + addr: SocketAddr, + /// Underlying IO error. + #[source] + source: Box, + }, + /// A worker thread panicked. + #[error("Worker thread panicked")] + ThreadPanicked, + /// Invalid configuration. + #[error("invalid config: {0}")] + Config(String), +} + +/// Parameters for [`run_client`]. +pub(crate) struct ClientParams { + /// Address of the server's data port. + pub(crate) data_addr: SocketAddr, + /// Address of the server's control port. + pub(crate) control_addr: SocketAddr, + /// Number of OS threads. + pub(crate) threads: u16, + /// Total number of TCP flows. + pub(crate) flows: u16, + /// Bytes per request. + pub(crate) request_size: usize, + /// Bytes per response. + pub(crate) response_size: usize, + /// Whether to set `TCP_NODELAY`. + pub(crate) no_delay: bool, +} + +/// Parameters for [`run_server`]. +pub(crate) struct ServerParams { + /// Address to bind the data listener on. + pub(crate) data_addr: SocketAddr, + /// Address to bind the control listener on. + pub(crate) control_addr: SocketAddr, + /// Number of OS server threads. + pub(crate) threads: u16, + /// Bytes to read per request. + pub(crate) request_size: usize, + /// Bytes to send per response. + pub(crate) response_size: usize, + /// Whether to set `TCP_NODELAY` on accepted connections. + pub(crate) no_delay: bool, + /// Listener backlog. + pub(crate) backlog: i32, +} + +enum ClientState { + SendRequest, + RecvResponse, +} + +enum ServerState { + RecvRequest, + SendResponse, +} + +const LISTENER_TOKEN: Token = Token(0); + +/// Run the neper-style client (generator side). +/// +/// Connects `flows` TCP flows distributed across `threads` OS threads, then +/// runs a request/response loop on each until `shutdown` fires. +/// +/// `thread_prefix` is used to name OS threads (`{prefix}-metrics`, +/// `{prefix}-client-{i}`) so multiple variants can coexist in `top -H`. +/// +/// # Errors +/// +/// Returns an error if configuration is invalid, the blackhole control port +/// is never reachable, or a worker thread panics. +pub(crate) async fn run_client( + params: ClientParams, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, + thread_prefix: &'static str, +) -> Result<(), Error> { + if params.threads > params.flows { + return Err(Error::Config(format!( + "threads ({}) must be <= flows ({})", + params.threads, params.flows + ))); + } + + let shutdown_flag = thread::new_shutdown_flag(); + + // Wait for the blackhole to be ready by connecting to its control port. + info!( + "waiting for blackhole control port at {}", + params.control_addr + ); + let deadline = Instant::now() + Duration::from_secs(300); + { + let flag = Arc::clone(&shutdown_flag); + let shutdown = shutdown.clone(); + tokio::spawn(async move { + shutdown.recv().await; + flag.store(true, Relaxed); + }); + } + loop { + if shutdown_flag.load(Relaxed) { + return Err(Error::Io(io::Error::new( + ErrorKind::ConnectionRefused, + format!( + "shutdown before blackhole control port {} became reachable", + params.control_addr + ), + ))); + } + match net::TcpStream::connect(params.control_addr) { + Ok(_conn) => { + info!("blackhole ready, starting flows"); + break; + } + Err(e) => { + if Instant::now() >= deadline { + return Err(Error::Io(io::Error::new( + ErrorKind::TimedOut, + format!( + "blackhole control port {} not reachable after 5 minutes: {e}", + params.control_addr + ), + ))); + } + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + let flow_dist = thread::distribute_flows(params.flows, params.threads); + + let thread_metrics = Arc::new( + (0..params.threads) + .map(|_| ThreadMetrics::new()) + .collect::>(), + ); + + let metrics_handle = { + let tm = Arc::clone(&thread_metrics); + let labels = metric_labels.clone(); + let flag = Arc::clone(&shutdown_flag); + thread::spawn_named(&format!("{thread_prefix}-metrics"), move || { + metrics::run_metrics_thread(&tm, &labels, &flag); + }) + }; + + let data_addr = params.data_addr; + let request_size = params.request_size; + let response_size = params.response_size; + let no_delay = params.no_delay; + let mut worker_handles = Vec::with_capacity(params.threads as usize); + for i in 0..params.threads { + let thread_flows = flow_dist[i as usize]; + let flag = Arc::clone(&shutdown_flag); + let tm = Arc::clone(&thread_metrics); + let handle = thread::spawn_named(&format!("{thread_prefix}-client-{i}"), move || { + client_thread_main( + data_addr, + thread_flows, + request_size, + response_size, + no_delay, + &flag, + &tm[i as usize], + ); + }); + worker_handles.push(handle); + } + + shutdown.recv().await; + info!("shutdown signal received"); + shutdown_flag.store(true, Relaxed); + + worker_handles.push(metrics_handle); + thread::join_all(worker_handles).map_err(|()| Error::ThreadPanicked)?; + + Ok(()) +} + +fn client_thread_main( + addr: SocketAddr, + num_flows: u16, + request_size: usize, + response_size: usize, + no_delay: bool, + shutdown_flag: &AtomicBool, + metrics: &ThreadMetrics, +) { + let mut poll = Poll::new().expect("failed to create mio::Poll"); + let mut events = Events::with_capacity(num_flows as usize); + let request_buf = vec![0u8; request_size]; + let mut response_buf = vec![0u8; response_size]; + let mut flows: FlowMap = FlowMap::new(); + let mut next_token: usize = 0; + + for _ in 0..num_flows { + match net::TcpStream::connect(addr) { + Ok(std_stream) => { + let _ = std_stream.set_nodelay(no_delay); + std_stream + .set_nonblocking(true) + .expect("failed to set nonblocking"); + let mut stream = TcpStream::from_std(std_stream); + let token = Token(next_token); + next_token += 1; + poll.registry() + .register(&mut stream, token, Interest::WRITABLE) + .expect("failed to register flow"); + flows.insert(Flow { + stream, + token, + state: ClientState::SendRequest, + xfer: request_size, + }); + } + Err(e) => { + trace!("connection to {addr} failed: {e}"); + metrics.connections_failed.add(1); + } + } + } + + loop { + let _ = poll.poll(&mut events, Some(Duration::from_millis(100))); + if shutdown_flag.load(Relaxed) { + break; + } + for event in &events { + let token = event.token(); + let Some(fl) = flows.get_mut(token) else { + continue; + }; + let action = handle_client_event(fl, &request_buf, &mut response_buf, metrics); + flow::apply_action(action, token, &mut flows, poll.registry()); + } + } +} + +fn handle_client_event( + flow: &mut Flow, + request_buf: &[u8], + response_buf: &mut [u8], + metrics: &ThreadMetrics, +) -> Action { + match flow.state { + ClientState::SendRequest => { + let offset = request_buf.len() - flow.xfer; + match flow.stream.write(&request_buf[offset..]) { + Ok(n) => { + flow.xfer -= n; + if flow.xfer == 0 { + flow.xfer = response_buf.len(); + flow.state = ClientState::RecvResponse; + metrics.requests_sent.add(1); + metrics.bytes_written.add(request_buf.len() as u64); + Action::Reregister(Interest::READABLE) + } else { + Action::Continue + } + } + Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) => { + trace!("write error: {e}"); + Action::Remove + } + } + } + ClientState::RecvResponse => { + let offset = response_buf.len() - flow.xfer; + match flow.stream.read(&mut response_buf[offset..]) { + Ok(0) => Action::Remove, + Ok(n) => { + flow.xfer -= n; + if flow.xfer == 0 { + flow.xfer = request_buf.len(); + flow.state = ClientState::SendRequest; + metrics.responses_received.add(1); + metrics.bytes_read.add(response_buf.len() as u64); + Action::Reregister(Interest::WRITABLE) + } else { + Action::Continue + } + } + Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) => { + trace!("read error: {e}"); + Action::Remove + } + } + } + } +} + +/// Run the neper-style server (blackhole side). +/// +/// Binds a data listener (with `SO_REUSEPORT` + reuseport eBPF when +/// `threads > 1`), then accepts and services request/response flows until +/// `shutdown` fires. +/// +/// `thread_prefix` is used to name OS threads (`{prefix}-bh-metrics`, +/// `{prefix}-server-{i}`). +/// +/// # Errors +/// +/// Returns an error if binding fails or a worker thread panics. +/// +/// # Panics +/// +/// Panics if the ready-barrier tokio task is cancelled. +#[allow(clippy::too_many_lines)] +pub(crate) async fn run_server( + params: ServerParams, + metric_labels: Vec<(String, String)>, + shutdown: lading_signal::Watcher, + thread_prefix: &'static str, +) -> Result<(), Error> { + let shutdown_flag = thread::new_shutdown_flag(); + let num_threads = params.threads; + + let thread_metrics = Arc::new( + (0..num_threads) + .map(|_| ThreadMetrics::new()) + .collect::>(), + ); + + let metrics_handle = { + let tm = Arc::clone(&thread_metrics); + let labels = metric_labels.clone(); + let flag = Arc::clone(&shutdown_flag); + thread::spawn_named(&format!("{thread_prefix}-bh-metrics"), move || { + metrics::run_metrics_thread(&tm, &labels, &flag); + }) + }; + + // Pre-build thread 0's listener here so the BPF program is attached to the + // reuseport group before any other thread calls bind(). This removes the + // need for a cross-thread BPF barrier — if bind fails or panics, it + // propagates as an error directly from this task. + let binding_addr = params.data_addr; + let thread0_listener = if num_threads > 1 { + Some(create_listener(0, num_threads, binding_addr, params.backlog)) + } else { + None + }; + + // Each thread sends a ready signal via this channel after binding. If a + // thread panics before signaling, its sender drops; once all senders are + // gone, recv() returns None and we detect the failure instead of hanging + // forever. + let (ready_tx, mut ready_rx) = mpsc::unbounded_channel::<()>(); + + let mut handles = Vec::with_capacity(num_threads as usize); + let mut thread0_listener = thread0_listener; + for i in 0..num_threads { + let request_size = params.request_size; + let response_size = params.response_size; + let no_delay = params.no_delay; + let backlog = params.backlog; + let flag = Arc::clone(&shutdown_flag); + let tm = Arc::clone(&thread_metrics); + let prebuilt = if i == 0 { + thread0_listener.take() + } else { + None + }; + let tx = ready_tx.clone(); + let handle = thread::spawn_named(&format!("{thread_prefix}-server-{i}"), move || { + server_thread_main( + i, + num_threads, + binding_addr, + prebuilt, + backlog, + request_size, + response_size, + no_delay, + &flag, + &tm[i as usize], + tx, + ); + }); + handles.push(handle); + } + // Drop our own copy so the channel closes when all worker threads exit. + drop(ready_tx); + + // Wait for each thread to signal ready. If a sender drops without + // signaling (thread panicked), recv() eventually returns None. + for _ in 0..num_threads { + if ready_rx.recv().await.is_none() { + shutdown_flag.store(true, Relaxed); + thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; + return Err(Error::ThreadPanicked); + } + } + + // All data listeners are up. Open control port so the generator can + // connect and know we're ready. + let control_addr = params.control_addr; + let control_listener = net::TcpListener::bind(control_addr).map_err(|source| Error::Bind { + addr: control_addr, + source: Box::new(source), + })?; + control_listener + .set_nonblocking(true) + .expect("failed to set control listener nonblocking"); + info!("control port listening on {control_addr}, waiting for generator"); + + handles.push(metrics_handle); + + let flag = Arc::clone(&shutdown_flag); + let shutdown_clone = shutdown.clone(); + tokio::spawn(async move { + shutdown_clone.recv().await; + flag.store(true, Relaxed); + }); + let mut generator_connected = false; + loop { + if shutdown_flag.load(Relaxed) { + info!("shutdown before generator connected"); + break; + } + match control_listener.accept() { + Ok((_conn, peer)) => { + info!("generator connected from {peer}, data threads running"); + generator_connected = true; + break; + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + tokio::time::sleep(Duration::from_millis(100)).await; + } + Err(e) => { + return Err(Error::Bind { + addr: control_addr, + source: Box::new(e), + }); + } + } + } + drop(control_listener); + + if generator_connected { + shutdown.recv().await; + info!("shutdown signal received"); + } + shutdown_flag.store(true, Relaxed); + + thread::join_all(handles).map_err(|()| Error::ThreadPanicked)?; + + Ok(()) +} + +/// Create a listener socket. When `num_threads` > 1, sets `SO_REUSEPORT` +/// and (for thread 0) attaches the reuseport eBPF program. +fn create_listener( + thread_index: u16, + num_threads: u16, + binding_addr: SocketAddr, + backlog: i32, +) -> net::TcpListener { + let domain = if binding_addr.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .expect("failed to create socket"); + socket + .set_nonblocking(true) + .expect("failed to set nonblocking"); + socket + .set_cloexec(true) + .expect("failed to set close-on-exec"); + socket + .set_reuse_address(true) + .expect("failed to set SO_REUSEADDR"); + + if num_threads > 1 { + socket + .set_reuse_port(true) + .expect("failed to set SO_REUSEPORT"); + + if thread_index == 0 { + match bpf::load_reuseport_ebpf(u32::from(num_threads)) { + Ok(prog) => { + if let Err(e) = bpf::attach_reuseport_ebpf(socket.as_raw_fd(), &prog) { + warn!("failed to attach reuseport eBPF: {e}, falling back to kernel hash"); + } + } + Err(e) => { + warn!("failed to load reuseport eBPF: {e}, falling back to kernel hash"); + } + } + } + } + + socket + .bind(&binding_addr.into()) + .unwrap_or_else(|e| panic!("failed to bind to {binding_addr}: {e}")); + socket.listen(backlog).expect("failed to listen"); + + socket.into() +} + +#[allow(clippy::too_many_arguments)] +fn server_thread_main( + thread_index: u16, + num_threads: u16, + binding_addr: SocketAddr, + prebuilt_listener: Option, + backlog: i32, + request_size: usize, + response_size: usize, + no_delay: bool, + shutdown_flag: &AtomicBool, + metrics: &ThreadMetrics, + ready_tx: mpsc::UnboundedSender<()>, +) { + // Thread 0 uses the pre-built listener (with BPF already attached); others + // bind their own sockets that join the existing reuseport group. + let std_listener = prebuilt_listener + .unwrap_or_else(|| create_listener(thread_index, num_threads, binding_addr, backlog)); + + // Signal that this thread's listener is bound and ready. If this send + // fails the receiver has gone away (blackhole is shutting down). + let _ = ready_tx.send(()); + drop(ready_tx); + + let mut listener = TcpListener::from_std(std_listener); + let mut poll = Poll::new().expect("failed to create mio::Poll"); + let mut events = Events::with_capacity(256); + + poll.registry() + .register(&mut listener, LISTENER_TOKEN, Interest::READABLE) + .expect("failed to register listener"); + + let mut request_buf = vec![0u8; request_size]; + let response_buf = vec![0u8; response_size]; + let mut flows: FlowMap = FlowMap::new(); + let mut next_token: usize = 1; + + loop { + let _ = poll.poll(&mut events, Some(Duration::from_millis(100))); + if shutdown_flag.load(Relaxed) { + break; + } + + let mut attempts = 0; + for event in &events { + if event.token() == LISTENER_TOKEN { + loop { + match listener.accept() { + Ok((stream, _addr)) => { + set_nodelay_mio(&stream, no_delay); + let token = Token(next_token); + next_token += 1; + let mut mio_stream = stream; + poll.registry() + .register(&mut mio_stream, token, Interest::READABLE) + .expect("failed to register flow"); + flows.insert(Flow { + stream: mio_stream, + token, + state: ServerState::RecvRequest, + xfer: request_size, + }); + metrics.connections_accepted.add(1); + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => break, + Err(e) => { + if attempts > 2 { + break; + } + warn!("accept error: {e}"); + attempts += 1; + std::thread::sleep(Duration::from_millis(1000)); + } + } + } + } else { + let token = event.token(); + let Some(fl) = flows.get_mut(token) else { + continue; + }; + let action = handle_server_event(fl, &mut request_buf, &response_buf, metrics); + flow::apply_action(action, token, &mut flows, poll.registry()); + } + } + } +} + +/// Set `TCP_NODELAY` on a mio [`TcpStream`] via a borrowed `socket2::SockRef`. +fn set_nodelay_mio(stream: &TcpStream, no_delay: bool) { + let sock = socket2::SockRef::from(stream); + if let Err(e) = sock.set_tcp_nodelay(no_delay) { + trace!("failed to set TCP_NODELAY: {e}"); + } +} + +fn handle_server_event( + flow: &mut Flow, + request_buf: &mut [u8], + response_buf: &[u8], + metrics: &ThreadMetrics, +) -> Action { + match flow.state { + ServerState::RecvRequest => { + let offset = request_buf.len() - flow.xfer; + match flow.stream.read(&mut request_buf[offset..]) { + Ok(0) => Action::Remove, + Ok(n) => { + flow.xfer -= n; + if flow.xfer == 0 { + flow.xfer = response_buf.len(); + flow.state = ServerState::SendResponse; + metrics.requests_received.add(1); + metrics.bytes_received.add(request_buf.len() as u64); + Action::Reregister(Interest::WRITABLE) + } else { + Action::Continue + } + } + Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) => { + trace!("read error: {e}"); + Action::Remove + } + } + } + ServerState::SendResponse => { + let offset = response_buf.len() - flow.xfer; + match flow.stream.write(&response_buf[offset..]) { + Ok(n) => { + flow.xfer -= n; + if flow.xfer == 0 { + flow.xfer = request_buf.len(); + flow.state = ServerState::RecvRequest; + metrics.responses_sent.add(1); + metrics.bytes_written.add(response_buf.len() as u64); + Action::Reregister(Interest::READABLE) + } else { + Action::Continue + } + } + Err(e) if e.kind() == ErrorKind::WouldBlock => Action::Continue, + Err(e) => { + trace!("write error: {e}"); + Action::Remove + } + } + } + } +} From 1046f0d5ac3a60d579106d67b049cb6f9f323655 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Mon, 1 Jun 2026 15:28:01 +0200 Subject: [PATCH 2/6] do not swallow poll registry errors --- lading/src/neper/flow.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index 102a53121..e4b95dc46 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -73,12 +73,16 @@ pub(crate) fn apply_action( Action::Continue => {} Action::Reregister(interest) => { if let Some(flow) = flows.get_mut(token) { - let _ = registry.reregister(&mut flow.stream, flow.token, interest); + registry + .reregister(&mut flow.stream, flow.token, interest) + .expect("reregister of a live, owned flow must succeed"); } } Action::Remove => { if let Some(mut flow) = flows.remove(token) { - let _ = registry.deregister(&mut flow.stream); + registry + .deregister(&mut flow.stream) + .expect("deregister of a registered, owned flow must succeed"); } } } From 1785a95bb929c0329b707a6d5ff746c575966153 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Mon, 1 Jun 2026 16:31:57 +0200 Subject: [PATCH 3/6] coordinate number of flows between generator and blackhole --- lading/src/blackhole/tcp_rr.rs | 8 +++- lading/src/generator/tcp_rr.rs | 8 ++-- lading/src/neper/rr.rs | 67 +++++++++++++++++++++++++--------- 3 files changed, 60 insertions(+), 23 deletions(-) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index a8c284981..f6aefb1f0 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -61,9 +61,14 @@ pub struct Config { #[serde(default = "default_control_port")] pub control_port: u16, /// Number of OS server threads. Default 1. When > 1, uses `SO_REUSEPORT` - /// with an eBPF program for load balancing + /// with an eBPF program for load balancing. #[serde(default = "default_nonzero_u16")] pub threads: NonZeroU16, + /// Total number of TCP flows the generator should open. + /// Default 1. Sent to the generator over the control connection at + /// startup; the generator does not configure this independently. + #[serde(default = "default_nonzero_u16")] + pub flows: NonZeroU16, /// Bytes to read per request. Default 1. #[serde(default = "default_nonzero_usize")] pub request_size: NonZeroUsize, @@ -123,6 +128,7 @@ impl TcpRr { data_addr: SocketAddr::new(self.config.addr, self.config.data_port), control_addr: SocketAddr::new(self.config.addr, self.config.control_port), threads: self.config.threads.get(), + flows: self.config.flows.get(), request_size: self.config.request_size.get(), response_size: self.config.response_size.get(), no_delay: self.config.no_delay, diff --git a/lading/src/generator/tcp_rr.rs b/lading/src/generator/tcp_rr.rs index 9d9d66b82..d3f2e75b8 100644 --- a/lading/src/generator/tcp_rr.rs +++ b/lading/src/generator/tcp_rr.rs @@ -48,6 +48,10 @@ fn default_data_port() -> u16 { #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)] #[serde(deny_unknown_fields)] /// Configuration for the `tcp_rr` generator. +/// +/// Flow count is *not* configured here — it is owned by the +/// `tcp_rr` blackhole and communicated to the generator over the control port +/// during startup. pub struct Config { /// The IP address of the `tcp_rr` server. pub addr: String, @@ -60,9 +64,6 @@ pub struct Config { /// Number of OS threads (neper -T). Default 1. #[serde(default = "default_nonzero_u16")] pub threads: NonZeroU16, - /// Total number of TCP flows/connections (neper -F). Default 1. - #[serde(default = "default_nonzero_u16")] - pub flows: NonZeroU16, /// Bytes per request. Default 1. #[serde(default = "default_nonzero_usize")] pub request_size: NonZeroUsize, @@ -117,7 +118,6 @@ impl TcpRr { data_addr: SocketAddr::new(ip, self.config.data_port), control_addr: SocketAddr::new(ip, self.config.control_port), threads: self.config.threads.get(), - flows: self.config.flows.get(), request_size: self.config.request_size.get(), response_size: self.config.response_size.get(), no_delay: self.config.no_delay, diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index cff09a0c9..0f3d269dd 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -47,6 +47,9 @@ pub enum Error { } /// Parameters for [`run_client`]. +/// +/// Flow count is *not* a client parameter — it is owned by the server and +/// communicated to the client over the control connection during startup. pub(crate) struct ClientParams { /// Address of the server's data port. pub(crate) data_addr: SocketAddr, @@ -54,8 +57,6 @@ pub(crate) struct ClientParams { pub(crate) control_addr: SocketAddr, /// Number of OS threads. pub(crate) threads: u16, - /// Total number of TCP flows. - pub(crate) flows: u16, /// Bytes per request. pub(crate) request_size: usize, /// Bytes per response. @@ -72,6 +73,9 @@ pub(crate) struct ServerParams { pub(crate) control_addr: SocketAddr, /// Number of OS server threads. pub(crate) threads: u16, + /// Total number of TCP flows the client should open. Sent to the client + /// over the control connection during startup. + pub(crate) flows: u16, /// Bytes to read per request. pub(crate) request_size: usize, /// Bytes to send per response. @@ -94,6 +98,12 @@ enum ServerState { const LISTENER_TOKEN: Token = Token(0); +/// Control-channel handshake: server writes `flows` to the accepted control +/// connection as a 2-byte big-endian `u16` and closes; client reads the same +/// 2 bytes after connecting. Internal protocol — no magic / version byte. +const HANDSHAKE_LEN: usize = 2; +const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); + /// Run the neper-style client (generator side). /// /// Connects `flows` TCP flows distributed across `threads` OS threads, then @@ -112,16 +122,10 @@ pub(crate) async fn run_client( shutdown: lading_signal::Watcher, thread_prefix: &'static str, ) -> Result<(), Error> { - if params.threads > params.flows { - return Err(Error::Config(format!( - "threads ({}) must be <= flows ({})", - params.threads, params.flows - ))); - } - let shutdown_flag = thread::new_shutdown_flag(); - // Wait for the blackhole to be ready by connecting to its control port. + // Wait for the blackhole to be ready by connecting to its control port, + // then read the flow count over that connection. info!( "waiting for blackhole control port at {}", params.control_addr @@ -135,7 +139,7 @@ pub(crate) async fn run_client( flag.store(true, Relaxed); }); } - loop { + let flows: u16 = loop { if shutdown_flag.load(Relaxed) { return Err(Error::Io(io::Error::new( ErrorKind::ConnectionRefused, @@ -146,9 +150,14 @@ pub(crate) async fn run_client( ))); } match net::TcpStream::connect(params.control_addr) { - Ok(_conn) => { - info!("blackhole ready, starting flows"); - break; + Ok(mut conn) => { + conn.set_read_timeout(Some(HANDSHAKE_TIMEOUT)) + .expect("set_read_timeout on connected TcpStream must succeed"); + let mut buf = [0u8; HANDSHAKE_LEN]; + conn.read_exact(&mut buf)?; + let received = u16::from_be_bytes(buf); + info!("blackhole ready, {received} flows to open"); + break received; } Err(e) => { if Instant::now() >= deadline { @@ -163,9 +172,16 @@ pub(crate) async fn run_client( std::thread::sleep(Duration::from_millis(100)); } } + }; + + if params.threads > flows { + return Err(Error::Config(format!( + "threads ({}) must be <= flows received from blackhole ({flows})", + params.threads + ))); } - let flow_dist = thread::distribute_flows(params.flows, params.threads); + let flow_dist = thread::distribute_flows(flows, params.threads); let thread_metrics = Arc::new( (0..params.threads) @@ -389,6 +405,7 @@ pub(crate) async fn run_server( let mut handles = Vec::with_capacity(num_threads as usize); let mut thread0_listener = thread0_listener; + let flows = params.flows; for i in 0..num_threads { let request_size = params.request_size; let response_size = params.response_size; @@ -409,6 +426,7 @@ pub(crate) async fn run_server( binding_addr, prebuilt, backlog, + flows, request_size, response_size, no_delay, @@ -453,14 +471,24 @@ pub(crate) async fn run_server( flag.store(true, Relaxed); }); let mut generator_connected = false; + let flows_bytes = params.flows.to_be_bytes(); loop { if shutdown_flag.load(Relaxed) { info!("shutdown before generator connected"); break; } match control_listener.accept() { - Ok((_conn, peer)) => { - info!("generator connected from {peer}, data threads running"); + Ok((mut conn, peer)) => { + // accept(2) on Linux returns a blocking socket regardless of + // the listener's O_NONBLOCK; a small write_timeout guards + // against a generator that connects but never reads. + conn.set_write_timeout(Some(HANDSHAKE_TIMEOUT)) + .expect("set_write_timeout on accepted TcpStream must succeed"); + conn.write_all(&flows_bytes)?; + info!( + "generator connected from {peer}, sent flows={}, data threads running", + params.flows + ); generator_connected = true; break; } @@ -547,6 +575,7 @@ fn server_thread_main( binding_addr: SocketAddr, prebuilt_listener: Option, backlog: i32, + num_flows: u16, request_size: usize, response_size: usize, no_delay: bool, @@ -566,7 +595,9 @@ fn server_thread_main( let mut listener = TcpListener::from_std(std_listener); let mut poll = Poll::new().expect("failed to create mio::Poll"); - let mut events = Events::with_capacity(256); + // Worst case under SO_REUSEPORT: every flow lands on this thread, so size + // for the total flow count plus the listener token. + let mut events = Events::with_capacity(num_flows as usize + 1); poll.registry() .register(&mut listener, LISTENER_TOKEN, Interest::READABLE) From db754bad5532150cdb247133c086c474a0fd5264 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 11:04:30 +0200 Subject: [PATCH 4/6] remove non ascii chars from comments --- lading/src/blackhole/tcp_rr.rs | 2 +- lading/src/generator/tcp_rr.rs | 4 ++-- lading/src/neper/bpf.rs | 4 ++-- lading/src/neper/flow.rs | 2 +- lading/src/neper/metrics.rs | 2 +- lading/src/neper/rr.rs | 6 +++--- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lading/src/blackhole/tcp_rr.rs b/lading/src/blackhole/tcp_rr.rs index f6aefb1f0..0309447a1 100644 --- a/lading/src/blackhole/tcp_rr.rs +++ b/lading/src/blackhole/tcp_rr.rs @@ -1,4 +1,4 @@ -//! TCP request/response (`tcp_rr`) blackhole — the server side. +//! TCP request/response (`tcp_rr`) blackhole - the server side. //! Based on //! //! Listens for incoming connections and, for each flow, reads a fixed-size diff --git a/lading/src/generator/tcp_rr.rs b/lading/src/generator/tcp_rr.rs index d3f2e75b8..9137265e1 100644 --- a/lading/src/generator/tcp_rr.rs +++ b/lading/src/generator/tcp_rr.rs @@ -1,4 +1,4 @@ -//! TCP request/response (`tcp_rr`) generator — the client side. +//! TCP request/response (`tcp_rr`) generator - the client side. //! Based on //! //! Implements neper's `tcp_rr` protocol: each flow sends a fixed-size request, @@ -49,7 +49,7 @@ fn default_data_port() -> u16 { #[serde(deny_unknown_fields)] /// Configuration for the `tcp_rr` generator. /// -/// Flow count is *not* configured here — it is owned by the +/// Flow count is *not* configured here - it is owned by the /// `tcp_rr` blackhole and communicated to the generator over the control port /// during startup. pub struct Config { diff --git a/lading/src/neper/bpf.rs b/lading/src/neper/bpf.rs index a7eea42b6..30051b262 100644 --- a/lading/src/neper/bpf.rs +++ b/lading/src/neper/bpf.rs @@ -5,7 +5,7 @@ //! //! The program is loaded via the `bpf()` syscall and attached to a listener //! socket with `SO_ATTACH_REUSEPORT_EBPF`. Only the first socket in the -//! reuseport group needs the program attached — the kernel applies it to the +//! reuseport group needs the program attached - the kernel applies it to the //! entire group. #![allow( @@ -64,7 +64,7 @@ const BPF_FUNC_GET_PRANDOM_U32: i32 = 7; const BPF_PROG_LOAD: libc::c_int = 5; const BPF_PROG_TYPE_SOCKET_FILTER: u32 = 1; -// bpf_attr union for BPF_PROG_LOAD — only the fields we need. +// bpf_attr union for BPF_PROG_LOAD - only the fields we need. #[repr(C)] struct BpfAttrProgLoad { prog_type: u32, diff --git a/lading/src/neper/flow.rs b/lading/src/neper/flow.rs index e4b95dc46..5037825f2 100644 --- a/lading/src/neper/flow.rs +++ b/lading/src/neper/flow.rs @@ -31,7 +31,7 @@ pub(crate) enum Action { /// Token-indexed flow storage. /// /// Flows are stored in a `Vec` indexed by token value. Removed slots become -/// `None` and are not reused — tokens are monotonically increasing, matching +/// `None` and are not reused - tokens are monotonically increasing, matching /// neper's behavior. pub(crate) struct FlowMap { inner: Vec>>, diff --git a/lading/src/neper/metrics.rs b/lading/src/neper/metrics.rs index 8c3fdab06..3c739268a 100644 --- a/lading/src/neper/metrics.rs +++ b/lading/src/neper/metrics.rs @@ -42,7 +42,7 @@ impl ThreadCounter { macro_rules! define_thread_metrics { ($($name:ident),* $(,)?) => { /// Per-thread counters. - /// Fields are a superset used across all modes — unused fields stay at 0. + /// Fields are a superset used across all modes - unused fields stay at 0. #[repr(C, align(128))] pub(crate) struct ThreadMetrics { $(pub(crate) $name: ThreadCounter,)* diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 0f3d269dd..adccf0323 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -48,7 +48,7 @@ pub enum Error { /// Parameters for [`run_client`]. /// -/// Flow count is *not* a client parameter — it is owned by the server and +/// Flow count is *not* a client parameter - it is owned by the server and /// communicated to the client over the control connection during startup. pub(crate) struct ClientParams { /// Address of the server's data port. @@ -100,7 +100,7 @@ const LISTENER_TOKEN: Token = Token(0); /// Control-channel handshake: server writes `flows` to the accepted control /// connection as a 2-byte big-endian `u16` and closes; client reads the same -/// 2 bytes after connecting. Internal protocol — no magic / version byte. +/// 2 bytes after connecting. Internal protocol - no magic / version byte. const HANDSHAKE_LEN: usize = 2; const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5); @@ -388,7 +388,7 @@ pub(crate) async fn run_server( // Pre-build thread 0's listener here so the BPF program is attached to the // reuseport group before any other thread calls bind(). This removes the - // need for a cross-thread BPF barrier — if bind fails or panics, it + // need for a cross-thread BPF barrier - if bind fails or panics, it // propagates as an error directly from this task. let binding_addr = params.data_addr; let thread0_listener = if num_threads > 1 { From f74a091536e54244bcf2329130cd5af121bc2646 Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 11:04:56 +0200 Subject: [PATCH 5/6] fmt --- lading/src/neper/rr.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index adccf0323..71a3ec01f 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -392,7 +392,12 @@ pub(crate) async fn run_server( // propagates as an error directly from this task. let binding_addr = params.data_addr; let thread0_listener = if num_threads > 1 { - Some(create_listener(0, num_threads, binding_addr, params.backlog)) + Some(create_listener( + 0, + num_threads, + binding_addr, + params.backlog, + )) } else { None }; From 86bb9725e0a46aa7b88023afc14a9f19fa6120cf Mon Sep 17 00:00:00 2001 From: Usama Saqib Date: Tue, 2 Jun 2026 12:07:43 +0200 Subject: [PATCH 6/6] ensure shutdown flag is set on error paths --- lading/src/neper/rr.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lading/src/neper/rr.rs b/lading/src/neper/rr.rs index 71a3ec01f..aa62f8d19 100644 --- a/lading/src/neper/rr.rs +++ b/lading/src/neper/rr.rs @@ -458,10 +458,14 @@ pub(crate) async fn run_server( // All data listeners are up. Open control port so the generator can // connect and know we're ready. let control_addr = params.control_addr; - let control_listener = net::TcpListener::bind(control_addr).map_err(|source| Error::Bind { + let ctrl_res = net::TcpListener::bind(control_addr).map_err(|source| Error::Bind { addr: control_addr, source: Box::new(source), - })?; + }); + if ctrl_res.is_err() { + shutdown_flag.store(true, Relaxed); + } + let control_listener = ctrl_res?; control_listener .set_nonblocking(true) .expect("failed to set control listener nonblocking"); @@ -489,7 +493,10 @@ pub(crate) async fn run_server( // against a generator that connects but never reads. conn.set_write_timeout(Some(HANDSHAKE_TIMEOUT)) .expect("set_write_timeout on accepted TcpStream must succeed"); - conn.write_all(&flows_bytes)?; + if let Err(err) = conn.write_all(&flows_bytes) { + shutdown_flag.store(true, Relaxed); + return Err(err.into()); + } info!( "generator connected from {peer}, sent flows={}, data threads running", params.flows @@ -501,6 +508,7 @@ pub(crate) async fn run_server( tokio::time::sleep(Duration::from_millis(100)).await; } Err(e) => { + shutdown_flag.store(true, Relaxed); return Err(Error::Bind { addr: control_addr, source: Box::new(e),