diff --git a/NanoCtrl/Cargo.lock b/NanoCtrl/Cargo.lock index 4902975..5d864ac 100644 --- a/NanoCtrl/Cargo.lock +++ b/NanoCtrl/Cargo.lock @@ -232,6 +232,40 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "comfy-table" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958c5d6ecf1f214b4c2bbbbf6ab9523a864bd136dcf71a7e8904799acfe1ad47" +dependencies = [ + "crossterm", + "unicode-segmentation", + "unicode-width", +] + +[[package]] +name = "crossterm" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" +dependencies = [ + "bitflags", + "crossterm_winapi", + "document-features", + "parking_lot", + "rustix", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "deadpool" version = "0.12.3" @@ -274,6 +308,15 @@ dependencies = [ "syn", ] +[[package]] +name = "document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "either" version = "1.15.0" @@ -564,9 +607,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.180" +version = "0.2.186" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" + +[[package]] +name = "linux-raw-sys" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +checksum = 
"32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" @@ -574,6 +623,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "lock_api" version = "0.4.14" @@ -629,11 +684,12 @@ dependencies = [ [[package]] name = "nanoctrl" -version = "0.0.7" +version = "0.0.8" dependencies = [ "anyhow", "axum", "clap", + "comfy-table", "deadpool-redis", "redis", "serde", @@ -822,6 +878,19 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1181,6 +1250,18 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "unicode-segmentation" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" + +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "url" version = "2.5.8" @@ -1217,6 +1298,28 @@ version = "0.11.1+wasi-snapshot-preview1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" diff --git a/NanoCtrl/Cargo.toml b/NanoCtrl/Cargo.toml index 3ac69b9..2b1c97b 100644 --- a/NanoCtrl/Cargo.toml +++ b/NanoCtrl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nanoctrl" -version = "0.0.7" +version = "0.0.8" edition = "2021" description = "NanoInfra control plane server" license = "MIT" @@ -23,3 +23,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } anyhow = "1.0" tower = "0.4" tower-http = { version = "0.5", features = ["trace"] } +comfy-table = "7" diff --git a/NanoCtrl/src/main.rs b/NanoCtrl/src/main.rs index b3943f3..8a2f293 100644 --- a/NanoCtrl/src/main.rs +++ b/NanoCtrl/src/main.rs @@ -8,6 +8,7 @@ mod error; mod handlers; mod models; mod net; +mod obs; mod redis_repo; mod state; @@ -51,6 +52,8 @@ enum Commands { Status(StatusArgs), /// Stop NanoCtrl background server. Stop(StopArgs), + /// Query observability snapshots from Redis. 
+ Obs(obs::ObsArgs), } #[derive(ClapArgs, Clone, Debug)] @@ -125,6 +128,7 @@ async fn main() -> anyhow::Result<()> { Some(Commands::Start(args)) => start_server(args).await, Some(Commands::Status(args)) => status_server(args), Some(Commands::Stop(args)) => stop_server(args), + Some(Commands::Obs(args)) => obs::run_obs(args), None => run_server(cli.server).await, } } diff --git a/NanoCtrl/src/obs.rs b/NanoCtrl/src/obs.rs new file mode 100644 index 0000000..b64cd08 --- /dev/null +++ b/NanoCtrl/src/obs.rs @@ -0,0 +1,752 @@ +//! `nanoctrl obs` subcommands: query observability snapshots from Redis. +//! +//! Each PeerAgent (when `DLSLIME_OBS=1`) periodically writes a JSON snapshot +//! to `{scope}:obs:peer:{peer_id}`. This module scans those keys and +//! presents them as human-readable tables or `--json` output. + +use anyhow::Result; +use clap::{Args as ClapArgs, Subcommand}; +use comfy_table::{presets::UTF8_FULL, Cell, CellAlignment, Table}; +use serde_json::Value; +use std::collections::BTreeMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +// ────────────────────────── CLI args ────────────────────────── + +#[derive(ClapArgs, Clone, Debug)] +pub struct ObsArgs { + #[command(subcommand)] + pub command: ObsCommands, +} + +#[derive(Subcommand, Clone, Debug)] +pub enum ObsCommands { + /// Cluster-wide summary (peers alive, total traffic, errors). + Status(ObsQueryArgs), + /// Per-peer breakdown table. + Peers(ObsQueryArgs), + /// Per-NIC breakdown table. + Nics(ObsQueryArgs), + /// Per-connection catalog (directed links between peers). + /// + /// v0 lists SRC/DST peer+NIC and STATE derived from each peer's + /// reported `connections` list. BW / BYTES / PENDING / ERRORS are + /// not yet accounted per connection — they render as `-` and will + /// land in a follow-up PR that adds peer-pair dimensions to the + /// C++ counters. + Links(ObsQueryArgs), +} + +#[derive(ClapArgs, Clone, Debug)] +pub struct ObsQueryArgs { + /// Redis connection URL. 
+ #[arg(long, default_value = "redis://127.0.0.1:6379")] + pub redis_url: String, + /// Scope prefix (same as NANOCTRL_SCOPE on agents). + #[arg(long)] + pub scope: Option, + /// Snapshots older than this are marked stale (ms). + #[arg(long, default_value_t = 45000)] + pub stale_ms: u64, + /// Output raw JSON instead of table. + #[arg(long)] + pub json: bool, +} + +// ────────────────────────── Data types ────────────────────────── + +#[derive(Debug)] +#[allow(dead_code)] +struct ObsSnapshot { + key: String, + peer_id: String, + host: String, + pid: u64, + reported_at_ms: u64, + age_ms: u64, + stale: bool, + // Explicit lifecycle marker written by PeerAgent on graceful shutdown + // (status = "stopped"). Absent for alive snapshots. Takes precedence + // over the age-derived alive/stale state in the STATE column. + status: Option, + summary: Value, + nics: Vec, + ewma_bandwidth_bps: f64, + connections: Vec, + raw: Value, +} + +// ────────────────────────── Entry point ────────────────────────── + +pub fn run_obs(args: ObsArgs) -> Result<()> { + match args.command { + ObsCommands::Status(q) => cmd_status(q), + ObsCommands::Peers(q) => cmd_peers(q), + ObsCommands::Nics(q) => cmd_nics(q), + ObsCommands::Links(q) => cmd_links(q), + } +} + +// ────────────────────────── Scan helper ────────────────────────── + +fn scan_snapshots(args: &ObsQueryArgs) -> Result> { + let redis_url = std::env::var("NANOCTRL_REDIS_URL").unwrap_or_else(|_| args.redis_url.clone()); + let client = redis::Client::open(redis_url.as_str())?; + let mut conn = client.get_connection()?; + + let pattern = match &args.scope { + Some(s) if !s.is_empty() => format!("{s}:obs:peer:*"), + _ => "obs:peer:*".to_string(), + }; + + // SCAN, not KEYS: KEYS is O(N) and blocks Redis in real deployments. + // SCAN iterates the keyspace in chunks and is safe under load. 
+ let mut keys: Vec = Vec::new(); + let mut cursor: u64 = 0; + loop { + let (next_cursor, chunk): (u64, Vec) = redis::cmd("SCAN") + .arg(cursor) + .arg("MATCH") + .arg(&pattern) + .arg("COUNT") + .arg(200) + .query(&mut conn)?; + keys.extend(chunk); + if next_cursor == 0 { + break; + } + cursor = next_cursor; + } + + let now_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + // Bulk fetch with MGET (empty vec → skip the round-trip entirely). + let values: Vec> = if keys.is_empty() { + Vec::new() + } else { + redis::cmd("MGET").arg(&keys).query(&mut conn)? + }; + + let mut snapshots = Vec::new(); + for (key, val) in keys.iter().zip(values.into_iter()) { + let Some(val_str) = val else { continue }; + if let Some(snap) = parse_snapshot(key, &val_str, now_ms, args.stale_ms) { + snapshots.push(snap); + } + } + + // Sort by peer_id for stable output + snapshots.sort_by(|a, b| a.peer_id.cmp(&b.peer_id)); + Ok(snapshots) +} + +/// Parse one Redis value into an ObsSnapshot. Extracted as a pure +/// function so it can be unit-tested without a live Redis instance. 
+fn parse_snapshot(key: &str, val_str: &str, now_ms: u64, stale_ms: u64) -> Option { + let raw: Value = serde_json::from_str(val_str).ok()?; + + let reported_at_ms = raw + .get("reported_at_ms") + .and_then(Value::as_u64) + .unwrap_or(0); + let age_ms = now_ms.saturating_sub(reported_at_ms); + + Some(ObsSnapshot { + key: key.to_string(), + peer_id: raw + .get("peer_id") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(), + host: raw + .get("host") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(), + pid: raw.get("pid").and_then(Value::as_u64).unwrap_or(0), + reported_at_ms, + age_ms, + stale: age_ms > stale_ms, + status: raw + .get("status") + .and_then(Value::as_str) + .map(str::to_string), + summary: raw.get("summary").cloned().unwrap_or(Value::Null), + nics: raw + .get("nics") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(), + ewma_bandwidth_bps: raw + .get("ewma_bandwidth_bps") + .and_then(Value::as_f64) + .unwrap_or(0.0), + connections: raw + .get("connections") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(), + raw, + }) +} + +// ────────────────────────── Formatters ────────────────────────── + +fn human_bytes(b: u64) -> String { + const KB: u64 = 1024; + const MB: u64 = 1024 * KB; + const GB: u64 = 1024 * MB; + const TB: u64 = 1024 * GB; + if b >= TB { + format!("{:.1}TB", b as f64 / TB as f64) + } else if b >= GB { + format!("{:.1}GB", b as f64 / GB as f64) + } else if b >= MB { + format!("{:.1}MB", b as f64 / MB as f64) + } else if b >= KB { + format!("{:.1}KB", b as f64 / KB as f64) + } else { + format!("{b}B") + } +} + +fn human_bps(bps: f64) -> String { + if bps >= 1e12 { + format!("{:.1}Tbps", bps / 1e12) + } else if bps >= 1e9 { + format!("{:.1}Gbps", bps / 1e9) + } else if bps >= 1e6 { + format!("{:.1}Mbps", bps / 1e6) + } else if bps >= 1e3 { + format!("{:.1}Kbps", bps / 1e3) + } else { + format!("{:.0}bps", bps) + } +} + +fn human_age(ms: u64) -> String { + if ms >= 60_000 { + format!("{}m", ms / 
60_000) + } else if ms >= 1_000 { + format!("{}s", ms / 1_000) + } else { + format!("{ms}ms") + } +} + +fn get_u64(v: &Value, key: &str) -> u64 { + v.get(key).and_then(Value::as_u64).unwrap_or(0) +} + +fn get_i64(v: &Value, key: &str) -> i64 { + v.get(key).and_then(Value::as_i64).unwrap_or(0) +} + +/// Map an ObsSnapshot to the STATE column string. +/// +/// Priority: explicit `status` marker (e.g. "stopped" from graceful +/// shutdown) overrides the age-derived state. Otherwise fall back to +/// the stale/alive flag. +fn snapshot_state(s: &ObsSnapshot) -> &'static str { + if let Some(status) = s.status.as_deref() { + match status { + "stopped" => return "stopped", + "starting" => return "starting", + _ => {} + } + } + if s.stale { + "stale" + } else { + "alive" + } +} + +/// Fresh comfy-table with the preset the obs subcommands use. +/// Headers passed in `headers` are laid out left-to-right; columns +/// listed in `right_aligned` are right-aligned (typical for numerics). +fn obs_table(headers: &[&str], right_aligned: &[usize]) -> Table { + let mut table = Table::new(); + table.load_preset(UTF8_FULL); + table.set_header(headers.iter().map(Cell::new)); + for idx in right_aligned { + if let Some(col) = table.column_mut(*idx) { + col.set_cell_alignment(CellAlignment::Right); + } + } + table +} + +/// Placeholder (ASCII hyphen) rendered in columns that aren't accounted for yet. +const UNKNOWN: &str = "-"; + +// ────────────────────────── Commands ────────────────────────── + +fn cmd_status(args: ObsQueryArgs) -> Result<()> { + let snapshots = scan_snapshots(&args)?; + + // Partition snapshots by the same rule the peers table uses. 
+ let alive = snapshots + .iter() + .filter(|s| snapshot_state(s) == "alive") + .count(); + let stale = snapshots + .iter() + .filter(|s| snapshot_state(s) == "stale") + .count(); + let stopped = snapshots + .iter() + .filter(|s| snapshot_state(s) == "stopped") + .count(); + + if args.json { + let mut out = BTreeMap::new(); + let total_assign: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "assign_total")) + .sum(); + let total_bytes: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "completed_bytes_total")) + .sum(); + let total_pending: i64 = snapshots + .iter() + .map(|s| get_i64(&s.summary, "pending_ops")) + .sum(); + let total_errors: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "error_total")) + .sum(); + let total_bw: f64 = snapshots.iter().map(|s| s.ewma_bandwidth_bps).sum(); + + out.insert("alive", serde_json::json!(alive)); + out.insert("stale", serde_json::json!(stale)); + out.insert("stopped", serde_json::json!(stopped)); + out.insert("assign_total", serde_json::json!(total_assign)); + out.insert("completed_bytes_total", serde_json::json!(total_bytes)); + out.insert("pending_ops", serde_json::json!(total_pending)); + out.insert("error_total", serde_json::json!(total_errors)); + out.insert("ewma_bandwidth_bps", serde_json::json!(total_bw)); + + println!("{}", serde_json::to_string_pretty(&out)?); + return Ok(()); + } + + let scope_str = args.scope.as_deref().unwrap_or("(none)"); + println!("Redis: {} Scope: {}", args.redis_url, scope_str); + println!( + "Peers: {} alive, {} stale, {} stopped", + alive, stale, stopped + ); + + let total_assign: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "assign_total")) + .sum(); + let total_batch: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "batch_total")) + .sum(); + let total_bytes: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "completed_bytes_total")) + .sum(); + let total_pending: i64 = snapshots + .iter() + .map(|s| get_i64(&s.summary, 
"pending_ops")) + .sum(); + let total_errors: u64 = snapshots + .iter() + .map(|s| get_u64(&s.summary, "error_total")) + .sum(); + let total_bw: f64 = snapshots.iter().map(|s| s.ewma_bandwidth_bps).sum(); + + println!( + "Total: assign={} batch={} bytes={} pending={} errors={}", + total_assign, + total_batch, + human_bytes(total_bytes), + total_pending, + total_errors, + ); + println!("Estimated BW: {} (EWMA)", human_bps(total_bw)); + + Ok(()) +} + +fn cmd_peers(args: ObsQueryArgs) -> Result<()> { + let snapshots = scan_snapshots(&args)?; + + if args.json { + let arr: Vec<&Value> = snapshots.iter().map(|s| &s.raw).collect(); + println!("{}", serde_json::to_string_pretty(&arr)?); + return Ok(()); + } + + if snapshots.is_empty() { + println!("(no obs snapshots found)"); + return Ok(()); + } + + let mut table = obs_table( + &[ + "PEER", "HOST", "PID", "AGE", "STATE", "ASSIGN", "BATCH", "BW", "BYTES", "PENDING", + "ERRORS", + ], + // Numeric columns: ASSIGN..ERRORS + &[5, 6, 7, 8, 9, 10], + ); + + for s in &snapshots { + table.add_row(vec![ + Cell::new(&s.peer_id), + Cell::new(&s.host), + Cell::new(s.pid), + Cell::new(human_age(s.age_ms)), + Cell::new(snapshot_state(s)), + Cell::new(get_u64(&s.summary, "assign_total")), + Cell::new(get_u64(&s.summary, "batch_total")), + Cell::new(human_bps(s.ewma_bandwidth_bps)), + Cell::new(human_bytes(get_u64(&s.summary, "completed_bytes_total"))), + Cell::new(get_i64(&s.summary, "pending_ops")), + Cell::new(get_u64(&s.summary, "error_total")), + ]); + } + + println!("{table}"); + Ok(()) +} + +fn cmd_nics(args: ObsQueryArgs) -> Result<()> { + let snapshots = scan_snapshots(&args)?; + + if args.json { + let mut arr = Vec::new(); + for s in &snapshots { + for nic in &s.nics { + let mut obj = nic.clone(); + if let Some(map) = obj.as_object_mut() { + map.insert("peer_id".to_string(), Value::String(s.peer_id.clone())); + map.insert("age_ms".to_string(), serde_json::json!(s.age_ms)); + } + arr.push(obj); + } + } + println!("{}", 
serde_json::to_string_pretty(&arr)?); + return Ok(()); + } + + if snapshots.is_empty() { + println!("(no obs snapshots found)"); + return Ok(()); + } + + // Columns: PEER NIC AGE ASSIGN BATCH BW BYTES PENDING ERRORS POST_BYTES POST_FAIL CQ_ERR + // BW = ewma_bandwidth_bps (real bandwidth, computed by PeerAgent reporter) + // BYTES = completed_bytes_total (semantic) + // POST_BYTES = post_bytes_total (transport-level, cumulative) + let mut table = obs_table( + &[ + "PEER", + "NIC", + "AGE", + "ASSIGN", + "BATCH", + "BW", + "BYTES", + "PENDING", + "ERRORS", + "POST_BYTES", + "POST_FAIL", + "CQ_ERR", + ], + // Numeric columns: ASSIGN..CQ_ERR + &[3, 4, 5, 6, 7, 8, 9, 10, 11], + ); + + let mut any_row = false; + for s in &snapshots { + for nic in &s.nics { + any_row = true; + let nic_name = nic.get("nic").and_then(Value::as_str).unwrap_or("?"); + let nic_bw_bps = nic + .get("ewma_bandwidth_bps") + .and_then(Value::as_f64) + .unwrap_or(0.0); + + table.add_row(vec![ + Cell::new(&s.peer_id), + Cell::new(nic_name), + Cell::new(human_age(s.age_ms)), + Cell::new(get_u64(nic, "assign_total")), + Cell::new(get_u64(nic, "batch_total")), + Cell::new(human_bps(nic_bw_bps)), + Cell::new(human_bytes(get_u64(nic, "completed_bytes_total"))), + Cell::new(get_i64(nic, "pending_ops")), + Cell::new(get_u64(nic, "error_total")), + Cell::new(human_bytes(get_u64(nic, "post_bytes_total"))), + Cell::new(get_u64(nic, "post_failures_total")), + Cell::new(get_u64(nic, "cq_errors_total")), + ]); + } + } + + if !any_row { + println!("(no obs snapshots found)"); + return Ok(()); + } + + println!("{table}"); + Ok(()) +} + +/// Per-connection catalog. Fields SRC_PEER / SRC_NIC / DST_PEER / DST_NIC +/// / STATE are harvested from each peer's reported `connections` list. +/// +/// v0 does NOT index traffic counters by (src_peer, src_nic, dst_peer, +/// dst_nic) — those columns render as `-`. Peer-pair accounting is a +/// deliberate follow-up. 
+fn cmd_links(args: ObsQueryArgs) -> Result<()> { + let snapshots = scan_snapshots(&args)?; + + if args.json { + let mut arr = Vec::new(); + for s in &snapshots { + for conn in &s.connections { + let mut obj = conn.clone(); + if let Some(map) = obj.as_object_mut() { + map.insert("src_peer".to_string(), Value::String(s.peer_id.clone())); + map.insert("age_ms".to_string(), serde_json::json!(s.age_ms)); + } + arr.push(obj); + } + } + println!("{}", serde_json::to_string_pretty(&arr)?); + return Ok(()); + } + + if snapshots.is_empty() { + println!("(no obs snapshots found)"); + return Ok(()); + } + + let mut table = obs_table( + &[ + "SRC_PEER", "SRC_NIC", "DST_PEER", "DST_NIC", "STATE", "BW", "BYTES", "PENDING", + "ERRORS", + ], + &[5, 6, 7, 8], + ); + + let mut any_row = false; + for s in &snapshots { + for conn in &s.connections { + any_row = true; + let src_nic = conn.get("local_nic").and_then(Value::as_str).unwrap_or("?"); + let dst_peer = conn.get("peer").and_then(Value::as_str).unwrap_or("?"); + let dst_nic = conn + .get("remote_nic") + .and_then(Value::as_str) + .unwrap_or("?"); + + // Prefer the explicit DirectedConnection.state field; fall + // back to the connected bool when state is absent. 
+ let state_owned = conn + .get("state") + .and_then(Value::as_str) + .map(str::to_string) + .unwrap_or_else(|| match conn.get("connected").and_then(Value::as_bool) { + Some(true) => "connected".to_string(), + Some(false) => "disconnected".to_string(), + None => "unknown".to_string(), + }); + + table.add_row(vec![ + Cell::new(&s.peer_id), + Cell::new(src_nic), + Cell::new(dst_peer), + Cell::new(dst_nic), + Cell::new(state_owned), + Cell::new(UNKNOWN), + Cell::new(UNKNOWN), + Cell::new(UNKNOWN), + Cell::new(UNKNOWN), + ]); + } + } + + if !any_row { + println!("(no connections reported)"); + return Ok(()); + } + + println!("{table}"); + println!(); + println!( + "Note: per-link traffic counters (BW, BYTES, PENDING, ERRORS) are \ + not yet accounted in v0 and render as '{UNKNOWN}'. \ + See `nanoctrl obs nics` for per-NIC traffic." + ); + Ok(()) +} + +// ────────────────────────── Tests ────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_snapshot_json(reported_at_ms: u64) -> String { + serde_json::json!({ + "schema_version": 1, + "session_id": "alice:123:1700000000000", + "peer_id": "alice", + "host": "10.0.0.1", + "pid": 123, + "reported_at_ms": reported_at_ms, + "summary": { + "assign_total": 42, + "batch_total": 7, + "completed_bytes_total": 1048576, + "pending_ops": 0, + "error_total": 0, + "pending_by_op": {"read": 0, "write": 0, "write_with_imm": 0} + }, + "nics": [{ + "nic": "mlx5_0", + "assign_total": 42, + "batch_total": 7, + "completed_bytes_total": 1048576, + "post_bytes_total": 2097152, + "pending_ops": 0, + "error_total": 0, + "post_failures_total": 0, + "cq_errors_total": 0, + "ewma_bandwidth_bps": 1.25e9 + }], + "connections": [{ + "conn_id": "alice:mlx5_0->bob:mlx5_1", + "peer": "bob", + "local_nic": "mlx5_0", + "remote_nic": "mlx5_1", + "state": "connected", + "connected": true + }], + "ewma_bandwidth_bps": 1.25e9 + }) + .to_string() + } + + #[test] + fn parse_snapshot_extracts_all_fields() { + let now_ms = 
1_700_000_010_000u64; + let reported_at_ms = 1_700_000_005_000u64; + let json = sample_snapshot_json(reported_at_ms); + + let snap = parse_snapshot("scope:obs:peer:alice", &json, now_ms, 45_000) + .expect("parse should succeed"); + + assert_eq!(snap.peer_id, "alice"); + assert_eq!(snap.host, "10.0.0.1"); + assert_eq!(snap.pid, 123); + assert_eq!(snap.reported_at_ms, reported_at_ms); + assert_eq!(snap.age_ms, 5_000); + assert!(!snap.stale); + assert_eq!(snap.nics.len(), 1); + assert_eq!(snap.connections.len(), 1); + assert!((snap.ewma_bandwidth_bps - 1.25e9).abs() < 1e-6); + + // Per-NIC ewma_bandwidth_bps should be preserved in the nic JSON. + let nic_ewma = snap.nics[0] + .get("ewma_bandwidth_bps") + .and_then(Value::as_f64) + .unwrap(); + assert!((nic_ewma - 1.25e9).abs() < 1e-6); + } + + #[test] + fn parse_snapshot_marks_stale_beyond_threshold() { + let now_ms = 1_700_000_100_000u64; + let reported_at_ms = 1_700_000_000_000u64; // 100s ago + let json = sample_snapshot_json(reported_at_ms); + + let snap = parse_snapshot("scope:obs:peer:alice", &json, now_ms, 45_000).unwrap(); + assert!(snap.stale); + assert!(snap.age_ms >= 100_000); + } + + #[test] + fn parse_snapshot_returns_none_on_invalid_json() { + let snap = parse_snapshot("bad", "not json", 0, 45_000); + assert!(snap.is_none()); + } + + #[test] + fn parse_snapshot_defaults_when_fields_missing() { + // Minimal payload — just enough to be a valid JSON object. 
+ let snap = parse_snapshot("scope:obs:peer:mystery", "{}", 1_000_000, 45_000).unwrap(); + assert_eq!(snap.peer_id, ""); + assert_eq!(snap.reported_at_ms, 0); + assert!(snap.stale); // age_ms = now_ms > stale_ms + assert!(snap.nics.is_empty()); + assert!(snap.connections.is_empty()); + assert_eq!(snap.ewma_bandwidth_bps, 0.0); + } + + #[test] + fn human_bps_buckets() { + assert_eq!(human_bps(0.0), "0bps"); + assert_eq!(human_bps(1_500.0), "1.5Kbps"); + assert_eq!(human_bps(1.25e9), "1.2Gbps"); + } + + #[test] + fn snapshot_state_uses_status_marker_over_age() { + // Fresh snapshot but explicit status="stopped" takes precedence. + let now_ms = 1_700_000_010_000u64; + let reported_at_ms = 1_700_000_009_000u64; // 1s ago, not stale + let mut raw = serde_json::from_str::(&sample_snapshot_json(reported_at_ms)).unwrap(); + raw["status"] = Value::String("stopped".to_string()); + let json = raw.to_string(); + let snap = parse_snapshot("k", &json, now_ms, 45_000).unwrap(); + assert!(!snap.stale); + assert_eq!(snapshot_state(&snap), "stopped"); + } + + #[test] + fn snapshot_state_falls_back_to_stale_when_no_status() { + let now_ms = 1_700_000_100_000u64; + let reported_at_ms = 1_700_000_000_000u64; + let json = sample_snapshot_json(reported_at_ms); + let snap = parse_snapshot("k", &json, now_ms, 45_000).unwrap(); + assert_eq!(snapshot_state(&snap), "stale"); + } + + #[test] + fn snapshot_state_alive_when_fresh_and_no_status() { + let now_ms = 1_700_000_010_000u64; + let reported_at_ms = 1_700_000_009_000u64; + let json = sample_snapshot_json(reported_at_ms); + let snap = parse_snapshot("k", &json, now_ms, 45_000).unwrap(); + assert_eq!(snapshot_state(&snap), "alive"); + } + + #[test] + fn parse_snapshot_extracts_connection_fields_for_links() { + // The links subcommand walks snapshot.connections — verify the + // parser preserves the per-entry fields we surface in the table. 
+ let now_ms = 1_700_000_010_000u64; + let reported_at_ms = 1_700_000_009_000u64; + let json = sample_snapshot_json(reported_at_ms); + let snap = parse_snapshot("k", &json, now_ms, 45_000).unwrap(); + assert_eq!(snap.connections.len(), 1); + let c = &snap.connections[0]; + assert_eq!(c.get("peer").and_then(Value::as_str), Some("bob")); + assert_eq!(c.get("local_nic").and_then(Value::as_str), Some("mlx5_0")); + assert_eq!(c.get("remote_nic").and_then(Value::as_str), Some("mlx5_1")); + assert_eq!(c.get("state").and_then(Value::as_str), Some("connected")); + } +} diff --git a/dlslime/csrc/CMakeLists.txt b/dlslime/csrc/CMakeLists.txt index 18f1d00..0947f3c 100644 --- a/dlslime/csrc/CMakeLists.txt +++ b/dlslime/csrc/CMakeLists.txt @@ -1,3 +1,4 @@ +add_subdirectory(observability) add_subdirectory(device) add_subdirectory(engine) @@ -20,7 +21,7 @@ target_include_directories(_slime_topology PUBLIC ) add_library(dlslime INTERFACE) -target_link_libraries(dlslime INTERFACE _slime_engine _slime_device _slime_topology) +target_link_libraries(dlslime INTERFACE _slime_engine _slime_device _slime_topology _slime_obs) if(BUILD_RDMA) target_link_libraries(dlslime INTERFACE _slime_rdma) diff --git a/dlslime/csrc/engine/rdma/CMakeLists.txt b/dlslime/csrc/engine/rdma/CMakeLists.txt index 092cf9e..1f81190 100644 --- a/dlslime/csrc/engine/rdma/CMakeLists.txt +++ b/dlslime/csrc/engine/rdma/CMakeLists.txt @@ -10,7 +10,7 @@ set(RDMA_SOURCES ) add_library(_slime_rdma SHARED ${RDMA_SOURCES}) -target_link_libraries(_slime_rdma PUBLIC _slime_device _slime_engine numa ibverbs) +target_link_libraries(_slime_rdma PUBLIC _slime_device _slime_engine _slime_obs numa ibverbs) set_target_properties( _slime_rdma diff --git a/dlslime/csrc/engine/rdma/memory_pool.cpp b/dlslime/csrc/engine/rdma/memory_pool.cpp index 932fb54..ba3e6dc 100644 --- a/dlslime/csrc/engine/rdma/memory_pool.cpp +++ b/dlslime/csrc/engine/rdma/memory_pool.cpp @@ -11,6 +11,7 @@ #include #include "dlslime/csrc/logging.h" +#include 
"dlslime/csrc/observability/obs.h" namespace dlslime { @@ -18,6 +19,10 @@ int32_t RDMAMemoryPool::registerMemoryRegion(uintptr_t data_ptr, uint64_t length { std::unique_lock lock(name_mutex_); + // Observability: classify this MR as a system (internal) or + // user-visible registration based on the "sys." name prefix. + const bool is_system = name.has_value() && name.value().rfind("sys.", 0) == 0; + // Check if pointer is already registered if (ptr_to_handle_.count(data_ptr)) { int32_t handle = ptr_to_handle_[data_ptr]; @@ -35,9 +40,11 @@ int32_t RDMAMemoryPool::registerMemoryRegion(uintptr_t data_ptr, uint64_t length return handle; } - // Existing MR is too small (address reused for larger buffer) — re-register - SLIME_LOG_INFO( - "Re-registering MR at ", (void*)data_ptr, ": old length=", existing->length, ", new length=", length); + // Existing MR is too small (address reused for larger buffer) — re-register. + // Only the delta (length - existing->length) should be added to the + // MR-bytes counter; the old length was already counted at initial register. + uint64_t old_length = existing->length; + SLIME_LOG_INFO("Re-registering MR at ", (void*)data_ptr, ": old length=", old_length, ", new length=", length); ibv_dereg_mr(existing); int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; @@ -48,6 +55,17 @@ int32_t RDMAMemoryPool::registerMemoryRegion(uintptr_t data_ptr, uint64_t length if (name.has_value()) { name_to_handle_[name.value()] = handle; } + + // Carry forward the user/system classification of the original + // registration — the handle's identity does not change. 
+ bool handle_is_system = false; + auto sys_it = handle_to_is_system_.find(handle); + if (sys_it != handle_to_is_system_.end()) { + handle_is_system = sys_it->second; + } + if (length > old_length) { + obs::obs_record_mr_grow(length - old_length, handle_is_system); + } return handle; } @@ -58,7 +76,8 @@ int32_t RDMAMemoryPool::registerMemoryRegion(uintptr_t data_ptr, uint64_t length int32_t handle = handle_to_mr_.size(); handle_to_mr_.push_back(mr); - ptr_to_handle_[data_ptr] = handle; + ptr_to_handle_[data_ptr] = handle; + handle_to_is_system_[handle] = is_system; if (name.has_value()) { if (name_to_handle_.count(name.value())) { @@ -68,14 +87,15 @@ int32_t RDMAMemoryPool::registerMemoryRegion(uintptr_t data_ptr, uint64_t length name_to_handle_[name.value()] = handle; } - // SLIME_LOG_DEBUG("Registered MR: Handle=", handle, ", Ptr=", (void*)data_ptr, ", Name=", (name.has_value() ? - // name.value() : "None")); if (name.has_value()) { SLIME_LOG_INFO("Registered Local MR: Name=", name.value(), ", Handle=", handle, ", Ptr=", (void*)data_ptr); } else { SLIME_LOG_INFO("Registered Local MR: Handle=", handle, ", Ptr=", (void*)data_ptr); } + + obs::obs_record_mr_register(length, is_system); + return handle; } @@ -126,12 +146,25 @@ int RDMAMemoryPool::unregisterMemoryRegion(int32_t handle) return -1; } + // Capture length and user/system classification before deregistration + // frees the MR struct / removes the map entry. + uint64_t mr_length = handle_to_mr_[handle] ? 
handle_to_mr_[handle]->length : 0; + bool is_system = false; + auto sys_it = handle_to_is_system_.find(handle); + if (sys_it != handle_to_is_system_.end()) { + is_system = sys_it->second; + } + int rc = ibv_dereg_mr(handle_to_mr_[handle]); if (rc != 0) { SLIME_LOG_ERROR("Failed to unregister Local MR handle=", handle, ", rc=", rc, ", errno=", errno); return rc; } + + obs::obs_record_mr_unregister(mr_length, is_system); + handle_to_mr_[handle] = nullptr; + handle_to_is_system_.erase(handle); for (auto it = name_to_handle_.begin(); it != name_to_handle_.end();) { if (it->second == handle) { diff --git a/dlslime/csrc/engine/rdma/memory_pool.h b/dlslime/csrc/engine/rdma/memory_pool.h index 5c83910..c7981dd 100644 --- a/dlslime/csrc/engine/rdma/memory_pool.h +++ b/dlslime/csrc/engine/rdma/memory_pool.h @@ -136,5 +136,10 @@ class RDMAMemoryPool { std::unordered_map ptr_to_handle_; std::vector handle_to_mr_; + + // Observability: remember whether each registered MR is a system + // (internal "sys.*") MR or a user-visible MR. Looked up on unregister + // so user/sys counters don't drift. 
+ std::unordered_map handle_to_is_system_; }; } // namespace dlslime diff --git a/dlslime/csrc/engine/rdma/rdma_channel.cpp b/dlslime/csrc/engine/rdma/rdma_channel.cpp old mode 100755 new mode 100644 index 75ab0a0..246bb3f --- a/dlslime/csrc/engine/rdma/rdma_channel.cpp +++ b/dlslime/csrc/engine/rdma/rdma_channel.cpp @@ -1,5 +1,6 @@ #include "rdma_channel.h" +#include "dlslime/csrc/observability/obs.h" #include "memory_pool.h" namespace dlslime { @@ -88,6 +89,11 @@ int32_t RDMAChannel::init(std::shared_ptr ctx, size_t num_qp, int32 SLIME_LOG_INFO("RDMA context initialized") SLIME_LOG_DEBUG("RDMA context local configuration: ", channelInfo()); + // Observability: register NIC for transport-level post counters + if (obs::obs_enabled()) { + obs_nic_id_ = obs::obs_register_nic(ctx_->device_name_.c_str()); + } + state = RDMAChannelState::Initialized; return 0; @@ -240,8 +246,21 @@ int64_t RDMAChannel::post_send_batch(int qpi, RDMAAssign* assign, std::shared_pt } ret = ibv_post_send(qp_[qpi], wr, &bad_wr); if (ret) { + obs::obs_record_post_failure(obs_nic_id_, obs::OPCODE_TO_OBS[static_cast(assign->opcode_)]); return -1; } + + // Observability: record transport-level post (bytes already computed in sge loop) + if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (size_t i = 0; i < batch_size; ++i) + total_bytes += sge[i].length; + obs::obs_record_post_batch(obs_nic_id_, + obs::OPCODE_TO_OBS[static_cast(assign->opcode_)], + static_cast(batch_size), + total_bytes); + } + return 0; } @@ -273,9 +292,18 @@ int64_t RDMAChannel::post_recv_batch(int qpi, RDMAAssign* assign, std::shared_pt ret = ibv_post_recv(qp_[qpi], wr, &bad_wr); if (ret) { SLIME_LOG_ERROR("Failed to post RDMA recv : " << strerror(ret)); + obs::obs_record_post_failure(obs_nic_id_, obs::OBS_OP_RECV); return -1; } + // Observability: record transport-level post + if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (size_t i = 0; i < batch_size; ++i) + total_bytes += sge[i].length; + 
obs::obs_record_post_batch(obs_nic_id_, obs::OBS_OP_RECV, static_cast(batch_size), total_bytes); + } + return 0; } @@ -321,8 +349,21 @@ int64_t RDMAChannel::post_rc_oneside_batch(int qpi, RDMAAssign* assign, std::sha if (ret) { SLIME_LOG_ERROR("Failed to post RDMA send : " << strerror(ret), ". Error Assignment: ", assign->dump(), "."); + obs::obs_record_post_failure(obs_nic_id_, obs::OPCODE_TO_OBS[static_cast(assign->opcode_)]); return -1; } + + // Observability: record transport-level post + if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (size_t i = 0; i < batch_size; ++i) + total_bytes += sge[i].length; + obs::obs_record_post_batch(obs_nic_id_, + obs::OPCODE_TO_OBS[static_cast(assign->opcode_)], + static_cast(batch_size), + total_bytes); + } + return 0; } diff --git a/dlslime/csrc/engine/rdma/rdma_channel.h b/dlslime/csrc/engine/rdma/rdma_channel.h index 8218177..480f8d7 100755 --- a/dlslime/csrc/engine/rdma/rdma_channel.h +++ b/dlslime/csrc/engine/rdma/rdma_channel.h @@ -76,5 +76,7 @@ class RDMAChannel { std::shared_ptr remote_pool_{}; RDMAChannelState state{RDMAChannelState::Destroyed}; + + int obs_nic_id_{-1}; // Observability: registered NIC slot index }; } // namespace dlslime diff --git a/dlslime/csrc/engine/rdma/rdma_context.cpp b/dlslime/csrc/engine/rdma/rdma_context.cpp index 369835b..938b310 100644 --- a/dlslime/csrc/engine/rdma/rdma_context.cpp +++ b/dlslime/csrc/engine/rdma/rdma_context.cpp @@ -31,6 +31,7 @@ #include "dlslime/csrc/engine/rdma/rdma_env.h" #include "dlslime/csrc/engine/rdma/rdma_utils.h" #include "dlslime/csrc/logging.h" +#include "dlslime/csrc/observability/obs.h" namespace dlslime { @@ -165,6 +166,11 @@ int64_t RDMAContext::init(const std::string& dev_name, uint8_t ib_port, const st cq_ = ibv_create_cq(ib_ctx_, actual_cq_depth, NULL, comp_channel_, 0); SLIME_ASSERT(cq_, "create CQ failed"); + // Observability: register NIC for CQ-level error/completion counters + if (obs::obs_enabled()) { + obs_nic_id_ = 
obs::obs_register_nic(dev_name.c_str()); + } + launch_future(); SLIME_LOG_INFO("RDMA Context Initialized"); return 0; @@ -217,11 +223,19 @@ int64_t RDMAContext::cq_poll_handle() RDMAAssign* assign = reinterpret_cast(wc[i].wr_id); SLIME_LOG_ERROR("Failed WR ID: " << (void*)assign); } + + // Observability: record CQ error (real failure, not flush) + obs::obs_record_cq_error(obs_nic_id_); } } if (wc[i].wr_id != 0) { RDMAAssign* assign = reinterpret_cast(wc[i].wr_id); + + // Semantic completion accounting is owned by the + // EndpointOpState-level callback below (exchange-guarded for + // once-only semantics). CQ polling only records CQ-level + // error signals. if (assign->callback_) { assign->callback_(status_code, wc[i].imm_data); } diff --git a/dlslime/csrc/engine/rdma/rdma_context.h b/dlslime/csrc/engine/rdma/rdma_context.h index f138eab..35959ae 100755 --- a/dlslime/csrc/engine/rdma/rdma_context.h +++ b/dlslime/csrc/engine/rdma/rdma_context.h @@ -96,6 +96,8 @@ class RDMAContext: public std::enable_shared_from_this { /* Completion Queue Polling */ int64_t cq_poll_handle(); + int obs_nic_id_{-1}; // Observability: registered NIC slot index + int64_t service_level_{0}; }; diff --git a/dlslime/csrc/engine/rdma/rdma_endpoint.cpp b/dlslime/csrc/engine/rdma/rdma_endpoint.cpp index 157a883..ab9966a 100644 --- a/dlslime/csrc/engine/rdma/rdma_endpoint.cpp +++ b/dlslime/csrc/engine/rdma/rdma_endpoint.cpp @@ -20,6 +20,7 @@ #include "dlslime/csrc/device/device_api.h" #include "dlslime/csrc/engine/assignment.h" #include "dlslime/csrc/logging.h" +#include "dlslime/csrc/observability/obs.h" #include "dlslime/csrc/utils.h" #include "engine/rdma/memory_pool.h" #include "engine/rdma/rdma_channel.h" @@ -93,6 +94,11 @@ void RDMAEndpoint::init(std::shared_ptr worker) if (not ctx_) SLIME_ABORT("No NIC Resources"); + // Observability: register this NIC for counter tracking + if (obs::obs_enabled()) { + obs_nic_id_ = obs::obs_register_nic(ctx_->device_name_.c_str()); + } + // Always 
create a new Remote Pool (Independent) remote_pool_ = std::make_shared(); @@ -768,6 +774,9 @@ int32_t RDMAEndpoint::dispatchTask(OpCode op_code, } bool is_final_slot = (req_idx >= total_reqs); + // Transport-level bookkeeping for this slot. Semantic obs fields + // live on op_state (set once at submit time); callbacks do not + // look at RDMAAssign for obs data. for (size_t qpi = 0; qpi < num_qp_; ++qpi) { auto& assign = ctx->assigns_[qpi]; assign.opcode_ = slot_op_code; @@ -787,6 +796,14 @@ int32_t RDMAEndpoint::dispatchTask(OpCode op_code, int32_t expected = RDMAAssign::SUCCESS; op_state->completion_status.compare_exchange_strong( expected, status, std::memory_order_release, std::memory_order_relaxed); + + // Observability: record failure once per user op. + // Any qpi on any slot can win this exchange; the + // remaining callbacks short-circuit. + if (op_state->obs_enabled && !op_state->obs_completed.exchange(true, std::memory_order_acq_rel)) { + obs::obs_record_fail( + op_state->obs_nic_id, static_cast(op_state->obs_op), op_state->obs_bytes); + } } token_bucket_[qpi].fetch_add(batch_size, std::memory_order_release); @@ -804,6 +821,17 @@ int32_t RDMAEndpoint::dispatchTask(OpCode op_code, uint32_t mask_before = ctx->slot_qp_mask.fetch_or(1u << qpi, std::memory_order_acq_rel); uint32_t mask_after = mask_before | (1u << qpi); if (mask_after == ctx->slot_qp_expected) { + // Observability: record success once per user op, + // only when the FINAL slot has fully completed. A + // prior failure on any slot already won the exchange + // (op_state->completion_status != SUCCESS), so a + // double record is prevented by obs_completed. 
+ if (is_final_slot && op_state->obs_enabled + && op_state->completion_status.load(std::memory_order_acquire) == RDMAAssign::SUCCESS + && !op_state->obs_completed.exchange(true, std::memory_order_acq_rel)) { + obs::obs_record_complete( + op_state->obs_nic_id, static_cast(op_state->obs_op), op_state->obs_bytes); + } releaseReadWriteSlot(ctx); } }; @@ -824,6 +852,14 @@ int32_t RDMAEndpoint::dispatchTask(OpCode op_code, // ============================================================ // Two-Sided Primitives (Message Passing) // ============================================================ +// +// Observability v0 reports semantic submit/completion only for +// one-sided read/write/writeWithImm. Two-sided send/recv and immRecv +// semantic pending are intentionally not reported in v0 — their +// completion paths are not yet integrated with the op_state-level +// once-only accounting. Transport-level post counters +// (post_send_batch / post_recv_batch / post_rc_oneside_batch) in +// rdma_channel.cpp are unaffected. std::shared_ptr RDMAEndpoint::send(const chunk_tuple_t& chunk, void* stream_handle) { @@ -904,6 +940,22 @@ std::shared_ptr RDMAEndpoint::recv(const chunk_tuple_t& chunk, void* std::shared_ptr RDMAEndpoint::read(const std::vector& assign, void* stream) { auto op_state = makeOpState((1u << num_qp_) - 1, false, stream, /*trace_start=*/false); + + // Observability: anchor semantic accounting on op_state. The CQ + // callback uses obs_completed.exchange(true) to record completion + // exactly once per user op, regardless of num_qp or slot count. 
+ if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (const auto& a : assign) + total_bytes += std::get<4>(a); + op_state->obs_enabled = true; + op_state->obs_bytes = total_bytes; + op_state->obs_assign_count = static_cast(assign.size()); + op_state->obs_op = static_cast(obs::OBS_OP_READ); + op_state->obs_nic_id = static_cast(obs_nic_id_); + obs::obs_record_submit(obs_nic_id_, obs::OBS_OP_READ, op_state->obs_assign_count, total_bytes); + } + registerInFlight(op_state); dispatchTask(OpCode::READ, assign, op_state, 0, stream); return std::make_shared(op_state); @@ -912,6 +964,19 @@ std::shared_ptr RDMAEndpoint::read(const std::vector RDMAEndpoint::write(const std::vector& assign, void* stream) { auto op_state = makeOpState((1u << num_qp_) - 1, false, stream, /*trace_start=*/false); + + if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (const auto& a : assign) + total_bytes += std::get<4>(a); + op_state->obs_enabled = true; + op_state->obs_bytes = total_bytes; + op_state->obs_assign_count = static_cast(assign.size()); + op_state->obs_op = static_cast(obs::OBS_OP_WRITE); + op_state->obs_nic_id = static_cast(obs_nic_id_); + obs::obs_record_submit(obs_nic_id_, obs::OBS_OP_WRITE, op_state->obs_assign_count, total_bytes); + } + registerInFlight(op_state); dispatchTask(OpCode::WRITE, assign, op_state, 0, stream); return std::make_shared(op_state); @@ -921,6 +986,19 @@ std::shared_ptr RDMAEndpoint::writeWithImm(const std::vector& assign, int32_t imm_data, void* stream) { auto op_state = makeOpState((1u << num_qp_) - 1, false, stream, /*trace_start=*/false); + + if (obs::obs_enabled()) { + uint64_t total_bytes = 0; + for (const auto& a : assign) + total_bytes += std::get<4>(a); + op_state->obs_enabled = true; + op_state->obs_bytes = total_bytes; + op_state->obs_assign_count = static_cast(assign.size()); + op_state->obs_op = static_cast(obs::OBS_OP_WRITE_WITH_IMM); + op_state->obs_nic_id = static_cast(obs_nic_id_); + obs::obs_record_submit(obs_nic_id_, 
obs::OBS_OP_WRITE_WITH_IMM, op_state->obs_assign_count, total_bytes); + } + registerInFlight(op_state); dispatchTask(OpCode::WRITE_WITH_IMM, assign, op_state, imm_data, stream); return std::make_shared(op_state); @@ -928,6 +1006,8 @@ RDMAEndpoint::writeWithImm(const std::vector& assign, int32_t im std::shared_ptr RDMAEndpoint::immRecv(void* stream) { + // v0: no semantic submit for immRecv — see note on send/recv above. + io_recv_slot_id_.fetch_add(1, std::memory_order_relaxed); // One user-level receive operation. It may complete immediately from a @@ -1313,6 +1393,13 @@ void RDMAEndpoint::cancelAll() if (op_state->signal) { op_state->signal->force_complete(); } + // Observability: record cancellation as failure, exactly once. + // If the real completion already raced ahead and recorded, this + // short-circuits via obs_completed. + if (op_state->obs_enabled && !op_state->obs_completed.exchange(true, std::memory_order_acq_rel)) { + obs::obs_record_fail( + op_state->obs_nic_id, static_cast(op_state->obs_op), op_state->obs_bytes); + } } // Also force-complete transport-owned imm-recv slot signals so any diff --git a/dlslime/csrc/engine/rdma/rdma_endpoint.h b/dlslime/csrc/engine/rdma/rdma_endpoint.h index 8efbbb2..f928730 100644 --- a/dlslime/csrc/engine/rdma/rdma_endpoint.h +++ b/dlslime/csrc/engine/rdma/rdma_endpoint.h @@ -316,6 +316,8 @@ class RDMAEndpoint: public std::enable_shared_from_this { std::atomic id_{-1}; std::atomic connected_{false}; + int obs_nic_id_{-1}; // Observability: registered NIC slot index + std::shared_ptr ctx_; std::shared_ptr local_pool_; // user_pool (shared when from PeerAgent) std::shared_ptr meta_pool_; // per-endpoint, sys buffers (borrows PD from local_pool_) diff --git a/dlslime/csrc/engine/rdma/rdma_op_state.h b/dlslime/csrc/engine/rdma/rdma_op_state.h index f7b5d52..fb9d475 100644 --- a/dlslime/csrc/engine/rdma/rdma_op_state.h +++ b/dlslime/csrc/engine/rdma/rdma_op_state.h @@ -40,6 +40,26 @@ struct EndpointOpState { // Optional 
tracing for the imm-recv fast path. Zero if unused. std::atomic trace_start_ns{0}; std::atomic trace_end_ns{0}; + + // ============================================================ + // Observability metadata (v0: one-sided read/write/writeWithImm). + // + // Semantic accounting is anchored here — not on RDMAAssign or CQ + // callbacks — so that one user-visible op decrements pending_ops + // exactly once regardless of num_qp or slot count. + // + // obs_enabled is the per-op gate: set to obs::obs_enabled() at + // submit time so callbacks can short-circuit cheaply. + // obs_completed is the once-only guard: callbacks do + // !obs_completed.exchange(true, acq_rel) + // and the winner records completion (success or failure). + // ============================================================ + uint64_t obs_bytes{0}; + uint32_t obs_assign_count{0}; + uint8_t obs_op{0}; // ObsOpIndex + uint16_t obs_nic_id{0}; + bool obs_enabled{false}; + std::atomic obs_completed{false}; }; } // namespace dlslime diff --git a/dlslime/csrc/observability/CMakeLists.txt b/dlslime/csrc/observability/CMakeLists.txt new file mode 100644 index 0000000..5392a65 --- /dev/null +++ b/dlslime/csrc/observability/CMakeLists.txt @@ -0,0 +1,20 @@ +add_library(_slime_obs SHARED obs.cpp) + +set_target_properties( + _slime_obs + PROPERTIES + BUILD_WITH_INSTALL_RPATH TRUE + INSTALL_RPATH "\${ORIGIN}" +) + +target_include_directories(_slime_obs PUBLIC + $ + $ +) + +install( + TARGETS + _slime_obs + EXPORT dlslimeTargets + LIBRARY DESTINATION ${DLSLIME_INSTALL_PATH} +) diff --git a/dlslime/csrc/observability/obs.cpp b/dlslime/csrc/observability/obs.cpp new file mode 100644 index 0000000..8cab982 --- /dev/null +++ b/dlslime/csrc/observability/obs.cpp @@ -0,0 +1,242 @@ +#include "dlslime/csrc/observability/obs.h" + +#include +#include +#include + +namespace dlslime { +namespace obs { + +// ============================================================ +// Global counter storage — defined here, extern-declared in 
obs.h +// ============================================================ + +PeerCounters g_peer; +NicCounters g_nics[OBS_MAX_NICS]; +static std::atomic g_nic_count{0}; +static std::mutex g_nic_register_mu; + +// ============================================================ +// obs_enabled(): one-shot init from DLSLIME_OBS env var +// ============================================================ + +bool obs_enabled() +{ + static const bool enabled = [] { + const char* val = std::getenv("DLSLIME_OBS"); + return val != nullptr && val[0] == '1' && val[1] == '\0'; + }(); + return enabled; +} + +// ============================================================ +// obs_register_nic(): allocate a fixed slot for a NIC device +// +// Slow path — concurrent PeerAgent endpoint creation may race on the +// same device_name. A std::mutex makes the check-then-insert atomic +// so each device_name maps to exactly one nic_id. The hot path still +// reads directly from the fixed g_nics[] array via nic_id. +// ============================================================ + +int obs_register_nic(const char* device_name) +{ + if (!device_name) + return -1; + + std::lock_guard lk(g_nic_register_mu); + + int count = g_nic_count.load(std::memory_order_acquire); + for (int i = 0; i < count; ++i) { + if (g_nics[i].active && std::strncmp(g_nics[i].device_name, device_name, 63) == 0) { + return i; + } + } + + if (count >= OBS_MAX_NICS) { + return -1; // out of slots + } + + int slot = count; + std::strncpy(g_nics[slot].device_name, device_name, 63); + g_nics[slot].device_name[63] = '\0'; + g_nics[slot].active = true; + + // Publish the new count last so the lock-free readers in + // obs_snapshot_json() observe a fully-initialized slot. 
+ g_nic_count.store(slot + 1, std::memory_order_release); + + return slot; +} + +// ============================================================ +// obs_snapshot_json(): slow path — reads all atomics, builds JSON +// ============================================================ + +nlohmann::json obs_snapshot_json() +{ + using json = nlohmann::json; + + if (!obs_enabled()) { + return json{{"enabled", false}}; + } + + // Peer-level summary + json summary; + summary["assign_total"] = g_peer.assign_total.load(std::memory_order_relaxed); + summary["batch_total"] = g_peer.batch_total.load(std::memory_order_relaxed); + summary["submitted_bytes_total"] = g_peer.submitted_bytes_total.load(std::memory_order_relaxed); + summary["completed_bytes_total"] = g_peer.completed_bytes_total.load(std::memory_order_relaxed); + summary["failed_bytes_total"] = g_peer.failed_bytes_total.load(std::memory_order_relaxed); + summary["pending_ops"] = g_peer.pending_ops.load(std::memory_order_relaxed); + summary["error_total"] = g_peer.error_total.load(std::memory_order_relaxed); + + uint64_t user_mr_count = g_peer.user_mr_count.load(std::memory_order_relaxed); + uint64_t user_mr_bytes = g_peer.user_mr_bytes.load(std::memory_order_relaxed); + summary["user_mr_count"] = user_mr_count; + summary["user_mr_bytes"] = user_mr_bytes; + summary["sys_mr_count"] = g_peer.sys_mr_count.load(std::memory_order_relaxed); + summary["sys_mr_bytes"] = g_peer.sys_mr_bytes.load(std::memory_order_relaxed); + // Back-compat aliases: mr_count / mr_bytes now mean user-MR counts. + summary["mr_count"] = user_mr_count; + summary["mr_bytes"] = user_mr_bytes; + + // Peer-level per-op pending breakdown (authoritative, unaffected by + // NIC registration state). 
+ json pending_by_op = json::object(); + for (int op = 0; op < OBS_OP_COUNT; ++op) { + pending_by_op[obs_op_name(static_cast(op))] = + g_peer.pending_by_op[op].load(std::memory_order_relaxed); + } + summary["pending_by_op"] = pending_by_op; + + // Per-NIC + json nics_arr = json::array(); + int count = g_nic_count.load(std::memory_order_acquire); + for (int i = 0; i < count && i < OBS_MAX_NICS; ++i) { + if (!g_nics[i].active) + continue; + + auto& nic = g_nics[i]; + + // Aggregate across ops for the summary fields + uint64_t n_assign = 0, n_batch = 0, n_sub_bytes = 0; + uint64_t n_comp_bytes = 0, n_fail_bytes = 0; + int64_t n_pending = 0; + uint64_t n_errors = 0; + uint64_t n_post_batch = 0, n_post_wr = 0, n_post_bytes = 0; + uint64_t n_post_fail = 0; + + json by_op = json::object(); + for (int op = 0; op < OBS_OP_COUNT; ++op) { + uint64_t a = nic.assign_total[op].load(std::memory_order_relaxed); + uint64_t b = nic.batch_total[op].load(std::memory_order_relaxed); + uint64_t sb = nic.submitted_bytes_total[op].load(std::memory_order_relaxed); + uint64_t cb = nic.completed_bytes_total[op].load(std::memory_order_relaxed); + uint64_t fb = nic.failed_bytes_total[op].load(std::memory_order_relaxed); + int64_t p = nic.pending_ops[op].load(std::memory_order_relaxed); + uint64_t e = nic.error_total[op].load(std::memory_order_relaxed); + uint64_t pb = nic.post_batch_total[op].load(std::memory_order_relaxed); + uint64_t pw = nic.post_wr_total[op].load(std::memory_order_relaxed); + uint64_t pby = nic.post_bytes_total[op].load(std::memory_order_relaxed); + uint64_t pf = nic.post_failures_total[op].load(std::memory_order_relaxed); + + n_assign += a; + n_batch += b; + n_sub_bytes += sb; + n_comp_bytes += cb; + n_fail_bytes += fb; + n_pending += p; + n_errors += e; + n_post_batch += pb; + n_post_wr += pw; + n_post_bytes += pby; + n_post_fail += pf; + + // Include per-op detail if non-zero + if (a > 0 || b > 0 || sb > 0 || cb > 0) { + by_op[obs_op_name(static_cast(op))] = 
json{{"assign", a}, + {"batch", b}, + {"submitted_bytes", sb}, + {"completed_bytes", cb}, + {"failed_bytes", fb}, + {"pending", p}, + {"errors", e}, + {"post_batch", pb}, + {"post_wr", pw}, + {"post_bytes", pby}, + {"post_failures", pf}}; + } + } + + json nic_obj; + nic_obj["nic"] = std::string(nic.device_name); + nic_obj["nic_bdf"] = ""; // v0: not populated + nic_obj["assign_total"] = n_assign; + nic_obj["batch_total"] = n_batch; + nic_obj["submitted_bytes_total"] = n_sub_bytes; + nic_obj["completed_bytes_total"] = n_comp_bytes; + nic_obj["failed_bytes_total"] = n_fail_bytes; + nic_obj["pending_ops"] = n_pending; + nic_obj["error_total"] = n_errors; + nic_obj["post_batch_total"] = n_post_batch; + nic_obj["post_wr_total"] = n_post_wr; + nic_obj["post_bytes_total"] = n_post_bytes; + nic_obj["post_failures_total"] = n_post_fail; + nic_obj["cq_errors_total"] = nic.cq_errors_total.load(std::memory_order_relaxed); + if (!by_op.empty()) { + nic_obj["by_op"] = by_op; + } + + nics_arr.push_back(nic_obj); + } + + json result; + result["enabled"] = true; + result["summary"] = summary; + result["nics"] = nics_arr; + return result; +} + +// ============================================================ +// obs_reset_for_test(): zero all counters +// ============================================================ + +void obs_reset_for_test() +{ + g_peer.assign_total.store(0, std::memory_order_relaxed); + g_peer.batch_total.store(0, std::memory_order_relaxed); + g_peer.submitted_bytes_total.store(0, std::memory_order_relaxed); + g_peer.completed_bytes_total.store(0, std::memory_order_relaxed); + g_peer.failed_bytes_total.store(0, std::memory_order_relaxed); + g_peer.pending_ops.store(0, std::memory_order_relaxed); + g_peer.error_total.store(0, std::memory_order_relaxed); + g_peer.user_mr_count.store(0, std::memory_order_relaxed); + g_peer.user_mr_bytes.store(0, std::memory_order_relaxed); + g_peer.sys_mr_count.store(0, std::memory_order_relaxed); + g_peer.sys_mr_bytes.store(0, 
std::memory_order_relaxed); + for (int op = 0; op < OBS_OP_COUNT; ++op) { + g_peer.pending_by_op[op].store(0, std::memory_order_relaxed); + } + + int count = g_nic_count.load(std::memory_order_acquire); + for (int i = 0; i < count && i < OBS_MAX_NICS; ++i) { + auto& nic = g_nics[i]; + for (int op = 0; op < OBS_OP_COUNT; ++op) { + nic.assign_total[op].store(0, std::memory_order_relaxed); + nic.batch_total[op].store(0, std::memory_order_relaxed); + nic.submitted_bytes_total[op].store(0, std::memory_order_relaxed); + nic.completed_bytes_total[op].store(0, std::memory_order_relaxed); + nic.failed_bytes_total[op].store(0, std::memory_order_relaxed); + nic.pending_ops[op].store(0, std::memory_order_relaxed); + nic.error_total[op].store(0, std::memory_order_relaxed); + nic.post_batch_total[op].store(0, std::memory_order_relaxed); + nic.post_wr_total[op].store(0, std::memory_order_relaxed); + nic.post_bytes_total[op].store(0, std::memory_order_relaxed); + nic.post_failures_total[op].store(0, std::memory_order_relaxed); + } + nic.cq_errors_total.store(0, std::memory_order_relaxed); + } +} + +} // namespace obs +} // namespace dlslime diff --git a/dlslime/csrc/observability/obs.h b/dlslime/csrc/observability/obs.h new file mode 100644 index 0000000..097751e --- /dev/null +++ b/dlslime/csrc/observability/obs.h @@ -0,0 +1,312 @@ +#pragma once + +/// @file obs.h +/// @brief Low-overhead observability counters for DLSlime RDMA data plane. +/// +/// Design constraints (hot-path safe): +/// - No Redis, JSON, malloc, mutex, or string operations on hot path. +/// - All record functions are gated by `obs_enabled()` (single branch). +/// - Counters use std::atomic + memory_order_relaxed. +/// - Counter structs are alignas(64) to avoid false sharing. +/// - NIC slots are pre-allocated (fixed array, no map lookup). 
+/// +/// Enable via: export DLSLIME_OBS=1 +/// Snapshot (slow path only): obs_snapshot_json() + +#include +#include +#include + +#include "dlslime/csrc/common/json.hpp" + +namespace dlslime { +namespace obs { + +// ============================================================ +// Op index enum — maps to fixed array slots, not string labels +// ============================================================ + +enum ObsOpIndex : uint8_t { + OBS_OP_READ = 0, + OBS_OP_WRITE, + OBS_OP_WRITE_WITH_IMM, + OBS_OP_SEND, + OBS_OP_RECV, + OBS_OP_IMM_RECV, + OBS_OP_COUNT // sentinel +}; + +/// Map from engine OpCode to ObsOpIndex. Defined as constexpr array +/// so the hot-path lookup is a single indexed load. +/// OpCode enum: READ=0, WRITE=1, SEND=2, RECV=3, SEND_WITH_IMM=4, WRITE_WITH_IMM=5 +inline constexpr ObsOpIndex OPCODE_TO_OBS[] = { + OBS_OP_READ, // OpCode::READ = 0 + OBS_OP_WRITE, // OpCode::WRITE = 1 + OBS_OP_SEND, // OpCode::SEND = 2 + OBS_OP_RECV, // OpCode::RECV = 3 + OBS_OP_SEND, // OpCode::SEND_WITH_IMM = 4 (same counter as SEND) + OBS_OP_WRITE_WITH_IMM // OpCode::WRITE_WITH_IMM= 5 +}; + +inline const char* obs_op_name(ObsOpIndex op) +{ + static const char* names[] = {"read", "write", "write_with_imm", "send", "recv", "imm_recv"}; + return (op < OBS_OP_COUNT) ? names[op] : "unknown"; +} + +// ============================================================ +// Counter structs +// ============================================================ + +constexpr int OBS_MAX_NICS = 16; + +/// Peer-level aggregate counters. +struct alignas(64) PeerCounters { + std::atomic assign_total{0}; + std::atomic batch_total{0}; + std::atomic submitted_bytes_total{0}; + std::atomic completed_bytes_total{0}; + std::atomic failed_bytes_total{0}; + std::atomic pending_ops{0}; + std::atomic error_total{0}; + + // Per-op pending breakdown (peer-level authoritative; stays correct + // even if nic_id == -1 at the call site). + std::atomic pending_by_op[OBS_OP_COUNT]{}; + + // User-registered MRs. 
Back-compat aliases mr_count / mr_bytes map + // to these. + std::atomic user_mr_count{0}; + std::atomic user_mr_bytes{0}; + // System MRs (internal bookkeeping: sys.io_dummy, sys.msg_dummy, + // sys.send_ctx). Tracked separately so user-facing MR counts are + // not inflated by internal allocations. + std::atomic sys_mr_count{0}; + std::atomic sys_mr_bytes{0}; +}; + +/// Per-NIC counters with per-op breakdown. +struct alignas(64) NicCounters { + char device_name[64]{}; + bool active{false}; + + // Semantic-level (from RDMAEndpoint) + std::atomic assign_total[OBS_OP_COUNT]{}; + std::atomic batch_total[OBS_OP_COUNT]{}; + std::atomic submitted_bytes_total[OBS_OP_COUNT]{}; + std::atomic completed_bytes_total[OBS_OP_COUNT]{}; + std::atomic failed_bytes_total[OBS_OP_COUNT]{}; + std::atomic pending_ops[OBS_OP_COUNT]{}; + std::atomic error_total[OBS_OP_COUNT]{}; + + // Transport-level (from RDMAChannel) + std::atomic post_batch_total[OBS_OP_COUNT]{}; + std::atomic post_wr_total[OBS_OP_COUNT]{}; + std::atomic post_bytes_total[OBS_OP_COUNT]{}; + std::atomic post_failures_total[OBS_OP_COUNT]{}; + + // CQ-level + std::atomic cq_errors_total{0}; +}; + +// ============================================================ +// Global state +// ============================================================ + +/// Returns true if observability is enabled (DLSLIME_OBS=1). +/// Evaluated once at first call via static local. +bool obs_enabled(); + +// ============================================================ +// Slow-path API (init / snapshot) +// ============================================================ + +/// Register a NIC device name, returns a nic_id (0-based slot index). +/// Thread-safe (uses atomic CAS). Called during RDMAContext::init. +int obs_register_nic(const char* device_name); + +/// Produce a JSON snapshot of all counters (slow path, allocates). +nlohmann::json obs_snapshot_json(); + +/// Reset all counters (for testing only). 
+void obs_reset_for_test();
+
+// ============================================================
+// Hot-path recording API — all inline, gated by obs_enabled()
+// ============================================================
+
+/// Record a semantic submit (called from RDMAEndpoint for one-sided ops only).
+///
+/// Peer-level totals are always updated. Per-op slots (peer-level
+/// pending_by_op and every per-NIC array) are only touched when `op` is a
+/// valid ObsOpIndex; per-NIC slots additionally require `nic_id` to lie in
+/// [0, OBS_MAX_NICS) (obs_register_nic() returns -1 when no slot was assigned).
+///
+/// @param nic_id        NIC slot index from obs_register_nic(), or -1.
+/// @param op            Operation kind; indexes the fixed per-op arrays.
+/// @param assign_count  Added to assign_total for this op.
+/// @param bytes         Added to submitted_bytes_total for this op.
+inline void obs_record_submit(int nic_id, ObsOpIndex op, uint32_t assign_count, uint64_t bytes)
+{
+    if (!obs_enabled())
+        return;
+
+    // Access global state defined in obs.cpp via extern declarations.
+    extern PeerCounters g_peer;
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    // `op` indexes fixed-size arrays of OBS_OP_COUNT entries. Validate it once
+    // so the peer-level and NIC-level per-op slots are guarded identically and
+    // an out-of-range value can never write past the end of an array.
+    const bool op_valid = op < OBS_OP_COUNT;
+
+    g_peer.assign_total.fetch_add(assign_count, std::memory_order_relaxed);
+    g_peer.batch_total.fetch_add(1, std::memory_order_relaxed);
+    g_peer.submitted_bytes_total.fetch_add(bytes, std::memory_order_relaxed);
+    g_peer.pending_ops.fetch_add(1, std::memory_order_relaxed);
+    if (op_valid) {
+        g_peer.pending_by_op[op].fetch_add(1, std::memory_order_relaxed);
+    }
+
+    if (op_valid && nic_id >= 0 && nic_id < OBS_MAX_NICS) {
+        auto& nic = g_nics[nic_id];
+        nic.assign_total[op].fetch_add(assign_count, std::memory_order_relaxed);
+        nic.batch_total[op].fetch_add(1, std::memory_order_relaxed);
+        nic.submitted_bytes_total[op].fetch_add(bytes, std::memory_order_relaxed);
+        nic.pending_ops[op].fetch_add(1, std::memory_order_relaxed);
+    }
+}
+
+/// Record a successful completion. Must be called at most once per semantic
+/// user op — idempotency is enforced at the call site via
+/// EndpointOpState::obs_completed.exchange(), not inside this primitive.
+///
+/// Adds `bytes` to the completed-bytes totals and decrements the pending
+/// counters that obs_record_submit incremented. Per-op slots are skipped when
+/// `op` is not a valid ObsOpIndex; per-NIC slots are also skipped when
+/// `nic_id` is outside [0, OBS_MAX_NICS).
+inline void obs_record_complete(int nic_id, ObsOpIndex op, uint64_t bytes)
+{
+    if (!obs_enabled())
+        return;
+
+    extern PeerCounters g_peer;
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    // Validate `op` once: it indexes fixed-size per-op arrays at both the
+    // peer level and the NIC level, and both must be guarded the same way.
+    const bool op_valid = op < OBS_OP_COUNT;
+
+    g_peer.completed_bytes_total.fetch_add(bytes, std::memory_order_relaxed);
+    g_peer.pending_ops.fetch_sub(1, std::memory_order_relaxed);
+    if (op_valid) {
+        g_peer.pending_by_op[op].fetch_sub(1, std::memory_order_relaxed);
+    }
+
+    if (op_valid && nic_id >= 0 && nic_id < OBS_MAX_NICS) {
+        auto& nic = g_nics[nic_id];
+        nic.completed_bytes_total[op].fetch_add(bytes, std::memory_order_relaxed);
+        nic.pending_ops[op].fetch_sub(1, std::memory_order_relaxed);
+    }
+}
+
+/// Record a failure. Same once-per-op contract as obs_record_complete.
+///
+/// Adds `bytes` to the failed-bytes totals, decrements the pending counters
+/// and bumps the error counters. The peer-level error_total is always
+/// incremented; per-op and per-NIC slots follow the same range checks as
+/// obs_record_complete.
+inline void obs_record_fail(int nic_id, ObsOpIndex op, uint64_t bytes)
+{
+    if (!obs_enabled())
+        return;
+
+    extern PeerCounters g_peer;
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    // Same single validation of `op` as in obs_record_complete, so the
+    // per-NIC arrays are never indexed with an out-of-range value.
+    const bool op_valid = op < OBS_OP_COUNT;
+
+    g_peer.failed_bytes_total.fetch_add(bytes, std::memory_order_relaxed);
+    g_peer.pending_ops.fetch_sub(1, std::memory_order_relaxed);
+    g_peer.error_total.fetch_add(1, std::memory_order_relaxed);
+    if (op_valid) {
+        g_peer.pending_by_op[op].fetch_sub(1, std::memory_order_relaxed);
+    }
+
+    if (op_valid && nic_id >= 0 && nic_id < OBS_MAX_NICS) {
+        auto& nic = g_nics[nic_id];
+        nic.failed_bytes_total[op].fetch_add(bytes, std::memory_order_relaxed);
+        nic.pending_ops[op].fetch_sub(1, std::memory_order_relaxed);
+        nic.error_total[op].fetch_add(1, std::memory_order_relaxed);
+    }
+}
+
+/// Record a transport-level post batch (called from RDMAChannel).
+///
+/// Counts one posted batch, `wr_count` work requests and `bytes` bytes in the
+/// per-NIC, per-op transport counters. Nothing is recorded when `nic_id` is
+/// outside [0, OBS_MAX_NICS) or `op` is not a valid ObsOpIndex.
+inline void obs_record_post_batch(int nic_id, ObsOpIndex op, uint32_t wr_count, uint64_t bytes)
+{
+    if (!obs_enabled())
+        return;
+
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    // `op` indexes fixed-size arrays of OBS_OP_COUNT entries; reject
+    // out-of-range values together with out-of-range NIC slots.
+    if (nic_id >= 0 && nic_id < OBS_MAX_NICS && op < OBS_OP_COUNT) {
+        auto& nic = g_nics[nic_id];
+        nic.post_batch_total[op].fetch_add(1, std::memory_order_relaxed);
+        nic.post_wr_total[op].fetch_add(wr_count, std::memory_order_relaxed);
+        nic.post_bytes_total[op].fetch_add(bytes, std::memory_order_relaxed);
+    }
+}
+
+/// Record a transport-level post failure (ibv_post_send/recv returned error).
+///
+/// Nothing is recorded when `nic_id` is outside [0, OBS_MAX_NICS) or `op` is
+/// not a valid ObsOpIndex.
+inline void obs_record_post_failure(int nic_id, ObsOpIndex op)
+{
+    if (!obs_enabled())
+        return;
+
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    // Same range checks as obs_record_post_batch.
+    if (nic_id >= 0 && nic_id < OBS_MAX_NICS && op < OBS_OP_COUNT) {
+        g_nics[nic_id].post_failures_total[op].fetch_add(1, std::memory_order_relaxed);
+    }
+}
+
+/// Record a CQ error (wc.status != IBV_WC_SUCCESS and not flush).
+///
+/// The peer-level error_total is always incremented; the per-NIC
+/// cq_errors_total only when `nic_id` is a valid slot index.
+inline void obs_record_cq_error(int nic_id)
+{
+    if (!obs_enabled())
+        return;
+
+    extern PeerCounters g_peer;
+    extern NicCounters g_nics[OBS_MAX_NICS];
+
+    g_peer.error_total.fetch_add(1, std::memory_order_relaxed);
+
+    if (nic_id >= 0 && nic_id < OBS_MAX_NICS) {
+        g_nics[nic_id].cq_errors_total.fetch_add(1, std::memory_order_relaxed);
+    }
+}
+
+/// Record an MR registration.
+///
+/// Increments the MR count by one and the MR byte total by `bytes`, in the
+/// system counters when `is_system` is true and in the user counters
+/// otherwise.
+inline void obs_record_mr_register(uint64_t bytes, bool is_system)
+{
+    if (!obs_enabled())
+        return;
+
+    extern PeerCounters g_peer;
+
+    if (is_system) {
+        g_peer.sys_mr_count.fetch_add(1, std::memory_order_relaxed);
+        g_peer.sys_mr_bytes.fetch_add(bytes, std::memory_order_relaxed);
+    }
+    else {
+        g_peer.user_mr_count.fetch_add(1, std::memory_order_relaxed);
+        g_peer.user_mr_bytes.fetch_add(bytes, std::memory_order_relaxed);
+    }
+}
+
+/// Record an MR unregistration.
+inline void obs_record_mr_unregister(uint64_t bytes, bool is_system) +{ + if (!obs_enabled()) + return; + + extern PeerCounters g_peer; + + if (is_system) { + g_peer.sys_mr_count.fetch_sub(1, std::memory_order_relaxed); + g_peer.sys_mr_bytes.fetch_sub(bytes, std::memory_order_relaxed); + } + else { + g_peer.user_mr_count.fetch_sub(1, std::memory_order_relaxed); + g_peer.user_mr_bytes.fetch_sub(bytes, std::memory_order_relaxed); + } +} + +/// Record a delta increase to an existing MR (used when re-registering an +/// MR with a larger length). bytes is the positive delta. +inline void obs_record_mr_grow(uint64_t delta_bytes, bool is_system) +{ + if (!obs_enabled() || delta_bytes == 0) + return; + + extern PeerCounters g_peer; + + if (is_system) { + g_peer.sys_mr_bytes.fetch_add(delta_bytes, std::memory_order_relaxed); + } + else { + g_peer.user_mr_bytes.fetch_add(delta_bytes, std::memory_order_relaxed); + } +} + +} // namespace obs +} // namespace dlslime diff --git a/dlslime/csrc/python/CMakeLists.txt b/dlslime/csrc/python/CMakeLists.txt index f2c6ead..389be03 100755 --- a/dlslime/csrc/python/CMakeLists.txt +++ b/dlslime/csrc/python/CMakeLists.txt @@ -32,7 +32,7 @@ pybind11_add_module( set(_slime_c_link_libraries "") set(BIND_INSTALL_RPATH "\${ORIGIN}") -list(APPEND _slime_c_link_libraries _slime_topology) +list(APPEND _slime_c_link_libraries _slime_topology _slime_obs) if (BUILD_NVLINK) target_compile_definitions(_slime_c PRIVATE BUILD_NVLINK) diff --git a/dlslime/csrc/python/bind.cpp b/dlslime/csrc/python/bind.cpp index fec0f68..b5a5659 100644 --- a/dlslime/csrc/python/bind.cpp +++ b/dlslime/csrc/python/bind.cpp @@ -64,6 +64,7 @@ void bind_cache(pybind11::module_& m); #include "dlslime/csrc/common/json.hpp" #include "dlslime/csrc/common/pybind_json/pybind_json.hpp" #include "dlslime/csrc/logging.h" +#include "dlslime/csrc/observability/obs.h" #include "dlslime/csrc/topology.h" using json = nlohmann::json; @@ -538,4 +539,19 @@ PYBIND11_MODULE(_slime_c, m) // 
.def("all_gather_ll", &dlslime::AllGatherInterLLBuffer::allGatherLL) // .def("all_gather_ll_hook", &dlslime::AllGatherInterLLBuffer::allGatherLLHook); // #endif + + // ========================================================================= + // Observability (always available, independent of backend) + // ========================================================================= + m.def("obs_enabled", &dlslime::obs::obs_enabled, "Check if observability is enabled (DLSLIME_OBS=1)"); + m.def( + "obs_snapshot", + []() { return dlslime::obs::obs_snapshot_json(); }, + "Return a dict snapshot of all obs counters"); + m.def("obs_reset_for_test", &dlslime::obs::obs_reset_for_test, "Reset all obs counters (testing only)"); + m.def( + "obs_register_nic_for_test", + [](const std::string& name) { return dlslime::obs::obs_register_nic(name.c_str()); }, + py::arg("name"), + "Register a NIC name and return its slot id (test-only wrapper around obs_register_nic)"); } diff --git a/dlslime/peer_agent/_accounting.py b/dlslime/peer_agent/_accounting.py new file mode 100644 index 0000000..5e35504 --- /dev/null +++ b/dlslime/peer_agent/_accounting.py @@ -0,0 +1,303 @@ +""" +ObsReporter: Daemon thread that periodically snapshots C++ obs counters +and publishes them to Redis for cluster-wide visibility. + +Architecture: + C++ atomic counters → obs_snapshot() (pybind) → enrich with PeerAgent state + → SET {scope}:obs:peer:{peer_id} + PEXPIRE + +Enable: DLSLIME_OBS=1 +Config: + DLSLIME_OBS_TIME_STEP_MS (default 1000) + DLSLIME_OBS_REDIS (default 1, set 0 to disable Redis writes) + +The snapshot JSON schema (version 1): + { + "schema_version": 1, + "session_id": "{alias}:{pid}:{start_ms}", + "peer_id": "agent-1", + "host": "10.0.0.1", + "pid": 1234, + "reported_at_ms": 1715000000000, + "summary": { ... }, // from C++ obs_snapshot() + "nics": [ ... ], // from C++ obs_snapshot() + "ewma_bandwidth_bps": 0, // computed here + "connections": [ ... 
], // from PeerAgent state + } +""" + +from __future__ import annotations + +import json +import logging +import os +import socket +import threading +import time +from typing import Any, Dict, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from ._agent import PeerAgent + +logger = logging.getLogger("dlslime.obs") + +# EWMA smoothing factor +_EWMA_ALPHA = 0.3 + + +class ObsReporter: + """Background thread that publishes obs snapshots to Redis.""" + + def __init__(self, agent: "PeerAgent") -> None: + self._agent = agent + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + + self._time_step_ms = int(os.environ.get("DLSLIME_OBS_TIME_STEP_MS", "1000")) + self._redis_enabled = os.environ.get("DLSLIME_OBS_REDIS", "1") != "0" + self._host = socket.gethostname() + self._pid = os.getpid() + self._start_ms = int(time.time() * 1000) + self._session_id = f"{agent.alias}:{self._pid}:{self._start_ms}" + + # Peer-level EWMA state + self._prev_completed_bytes: int = 0 + self._prev_time_ms: int = self._start_ms + self._ewma_bw_bps: float = 0.0 + + # Per-NIC EWMA state. Keys are NIC device names (e.g. "mlx5_0"). + self._prev_nic_completed_bytes: Dict[str, int] = {} + self._prev_nic_time_ms: Dict[str, int] = {} + self._nic_ewma_bw_bps: Dict[str, float] = {} + + def start(self) -> None: + if self._thread is not None: + return + self._thread = threading.Thread( + target=self._run, + name=f"obs-reporter-{self._agent.alias}", + daemon=True, + ) + self._thread.start() + logger.info( + "ObsReporter started for %s (step=%dms, redis=%s)", + self._agent.alias, + self._time_step_ms, + self._redis_enabled, + ) + + def stop(self) -> None: + """Stop the reporter thread and publish one final "stopped" snapshot + so operators see the transition immediately in `nanoctrl obs peers` + instead of waiting for the TTL to evict the last `alive` snapshot. 
+ """ + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=5.0) + self._thread = None + try: + self._emit_final_snapshot() + except Exception: + logger.debug("ObsReporter final snapshot failed", exc_info=True) + logger.info("ObsReporter stopped for %s", self._agent.alias) + + def _emit_final_snapshot(self) -> None: + """Write one last snapshot with status="stopped" so the CLI can + distinguish clean shutdown from a crashed/stale agent. + + Uses a short TTL (max(3*step, 10 s)) — the key should disappear + quickly after graceful shutdown, unlike alive snapshots which + linger for the full _ttl_ms() window. + """ + if not self._redis_enabled: + return + redis_client = self._get_redis_client() + if redis_client is None: + return + + try: + import dlslime._slime_c as _c # type: ignore + except ImportError: + return + + now_ms = int(time.time() * 1000) + snap = _c.obs_snapshot() if _c.obs_enabled() else {} + summary = snap.get("summary", {}) if isinstance(snap, dict) else {} + nics = snap.get("nics", []) if isinstance(snap, dict) else [] + + connections = self._gather_connections() + + full_snap: Dict[str, Any] = { + "schema_version": 1, + "session_id": self._session_id, + "peer_id": self._agent.alias, + "host": self._host, + "pid": self._pid, + "reported_at_ms": now_ms, + # Explicit lifecycle marker. CLI renders this as the STATE + # column when present, overriding the alive/stale derivation + # that depends on reported_at_ms age. + "status": "stopped", + "summary": summary, + "nics": nics, + "ewma_bandwidth_bps": 0.0, + "connections": connections, + } + + scope = self._agent._scope or "" + prefix = f"{scope}:" if scope else "" + key = f"{prefix}obs:peer:{self._agent.alias}" + # Short TTL — graceful shutdown should clear quickly, not linger + # for the full alive-snapshot window. 
+ ttl_ms = max(3 * self._time_step_ms, 10_000) + + try: + pipe = redis_client.pipeline(transaction=False) + pipe.set(key, json.dumps(full_snap)) + pipe.pexpire(key, ttl_ms) + pipe.execute() + except Exception: + logger.debug("ObsReporter Redis final-write failed", exc_info=True) + + def _run(self) -> None: + try: + import dlslime._slime_c as _c # type: ignore + except ImportError: + logger.warning("Cannot import _slime_c, obs reporter disabled") + return + + redis_client = self._get_redis_client() + + while not self._stop_event.is_set(): + try: + self._tick(_c, redis_client) + except Exception: + logger.debug("ObsReporter tick error", exc_info=True) + + self._stop_event.wait(timeout=self._time_step_ms / 1000.0) + + def _tick(self, _c: Any, redis_client: Any) -> None: + # 1. Get C++ snapshot + snap = _c.obs_snapshot() + if not snap.get("enabled", False): + return + + now_ms = int(time.time() * 1000) + + # 2. Compute EWMA bandwidth at the peer level + summary = snap.get("summary", {}) + completed_bytes = summary.get("completed_bytes_total", 0) + delta_bytes = completed_bytes - self._prev_completed_bytes + delta_time_s = max((now_ms - self._prev_time_ms) / 1000.0, 0.001) + + instant_bps = 8.0 * delta_bytes / delta_time_s + self._ewma_bw_bps = ( + _EWMA_ALPHA * instant_bps + (1 - _EWMA_ALPHA) * self._ewma_bw_bps + ) + + self._prev_completed_bytes = completed_bytes + self._prev_time_ms = now_ms + + # 2b. Compute per-NIC EWMA bandwidth. `completed_bytes_total` is + # monotonic, so delta is always >= 0. Mutates each NIC dict in + # place so the stamped field ends up in the Redis snapshot. 
+ nics_out = snap.get("nics", []) or [] + for nic in nics_out: + nic_name = nic.get("nic") or "" + if not nic_name: + continue + nic_completed = int(nic.get("completed_bytes_total", 0)) + prev_bytes = self._prev_nic_completed_bytes.get(nic_name, nic_completed) + prev_ms = self._prev_nic_time_ms.get(nic_name, now_ms) + prev_ewma = self._nic_ewma_bw_bps.get(nic_name, 0.0) + + nic_delta_bytes = nic_completed - prev_bytes + nic_delta_s = max((now_ms - prev_ms) / 1000.0, 0.001) + nic_instant_bps = 8.0 * nic_delta_bytes / nic_delta_s + nic_ewma = _EWMA_ALPHA * nic_instant_bps + (1 - _EWMA_ALPHA) * prev_ewma + + self._prev_nic_completed_bytes[nic_name] = nic_completed + self._prev_nic_time_ms[nic_name] = now_ms + self._nic_ewma_bw_bps[nic_name] = nic_ewma + nic["ewma_bandwidth_bps"] = nic_ewma + + # 3. Gather connection info + connections = self._gather_connections() + + # 4. Build full snapshot + full_snap: Dict[str, Any] = { + "schema_version": 1, + "session_id": self._session_id, + "peer_id": self._agent.alias, + "host": self._host, + "pid": self._pid, + "reported_at_ms": now_ms, + "summary": summary, + "nics": nics_out, + "ewma_bandwidth_bps": self._ewma_bw_bps, + "connections": connections, + } + + # 5. Write to Redis + if self._redis_enabled and redis_client is not None: + scope = self._agent._scope or "" + prefix = f"{scope}:" if scope else "" + key = f"{prefix}obs:peer:{self._agent.alias}" + # TTL is decoupled from the CLI's --stale-ms (default 45s). + # The key must outlive the stale threshold so that after a + # crash/hard-kill the CLI has a window to show `stale` before + # Redis evicts the key; otherwise users only ever see + # `alive` -> gone. 
+ ttl_ms = max(3 * self._time_step_ms, 180_000) + + try: + pipe = redis_client.pipeline(transaction=False) + pipe.set(key, json.dumps(full_snap)) + pipe.pexpire(key, ttl_ms) + pipe.execute() + except Exception: + logger.debug("ObsReporter Redis write failed", exc_info=True) + + def _gather_connections(self) -> list: + """Extract lightweight connection info from PeerAgent. + + ``conn`` is a DirectedConnection (not a PeerConnection), so it + exposes ``state`` but no ``is_connected()`` method. Connection + readiness is determined via ``agent._is_connection_connected``, + which consults the separate ``_connected_peers`` set. + """ + result = [] + try: + agent = self._agent + with agent._connections_lock: + items = list(agent._connections.items()) + for conn_id, conn in items: + try: + result.append( + { + "conn_id": conn_id, + "peer": conn.peer_alias, + "local_nic": conn.local_key.device, + "remote_nic": conn.peer_key.device, + "state": conn.state, + "connected": agent._is_connection_connected(conn_id), + } + ) + except Exception: + logger.debug( + "obs _gather_connections: failed to snapshot conn_id=%s", + conn_id, + exc_info=True, + ) + except Exception: + logger.debug("obs _gather_connections failed", exc_info=True) + return result + + def _get_redis_client(self) -> Any: + """Get a Redis client from the PeerAgent if available.""" + if not self._redis_enabled: + return None + try: + return self._agent._redis_client + except AttributeError: + return None diff --git a/dlslime/peer_agent/_agent.py b/dlslime/peer_agent/_agent.py index ef44a7f..7b7087f 100644 --- a/dlslime/peer_agent/_agent.py +++ b/dlslime/peer_agent/_agent.py @@ -9,6 +9,7 @@ import inspect import json +import os import threading import time from concurrent.futures import ThreadPoolExecutor @@ -299,6 +300,15 @@ def __init__( # Start heartbeat thread (keeps agent alive in NanoCtrl) self._start_heartbeat() + # Start observability reporter (if enabled) + self._obs_reporter = None + self._scope = scope # 
kept for ObsReporter to read + if os.environ.get("DLSLIME_OBS", "") not in ("", "0", "false"): + from ._accounting import ObsReporter + + self._obs_reporter = ObsReporter(self) + self._obs_reporter.start() + # ------------------------------------------------------------------ # Registration / lifecycle # ------------------------------------------------------------------ @@ -1627,6 +1637,11 @@ def shutdown(self) -> None: logger.info("PeerAgent %s: Shutting down...", self.alias) self._stop_event.set() + + # Stop obs reporter first (lightweight, no dependency on endpoints) + if getattr(self, "_obs_reporter", None) is not None: + self._obs_reporter.stop() + if self._mailbox: self._mailbox.stop() diff --git a/examples/python/p2p_rdma_rc_read_ctrl_plane.py b/examples/python/p2p_rdma_rc_read_ctrl_plane.py index 1d00c6c..f1838e2 100755 --- a/examples/python/p2p_rdma_rc_read_ctrl_plane.py +++ b/examples/python/p2p_rdma_rc_read_ctrl_plane.py @@ -36,14 +36,12 @@ def print_topology_discovery(agent, label, peer_aliases): initiator_agent = start_peer_agent( # alias=None (default) - NanoCtrl will auto-generate unique name nanoctrl_url="http://127.0.0.1:3000", - alias="dlslime0", scope=EXAMPLE_SCOPE, ) target_agent = start_peer_agent( # alias=None (default) - NanoCtrl will auto-generate unique name nanoctrl_url="http://127.0.0.1:3000", - alias="dlslime1", scope=EXAMPLE_SCOPE, ) diff --git a/tests/python/test_obs_counters.py b/tests/python/test_obs_counters.py new file mode 100644 index 0000000..268c104 --- /dev/null +++ b/tests/python/test_obs_counters.py @@ -0,0 +1,188 @@ +"""Tests for C++ observability counters via pybind11.""" + +import os +import threading + +import pytest + + +@pytest.fixture(autouse=True) +def _set_obs_env(monkeypatch): + """Enable obs for the test process.""" + monkeypatch.setenv("DLSLIME_OBS", "1") + + +def _import_slime_c(): + try: + import dlslime._slime_c as _c + + return _c + except ImportError: + pytest.skip("dlslime._slime_c not available (build 
without RDMA?)") + + +class TestObsEnabled: + """obs_enabled() reflects DLSLIME_OBS env var.""" + + def test_enabled(self): + _c = _import_slime_c() + # Note: obs_enabled() reads env once at first call (static bool). + # If another test already called it with DLSLIME_OBS=0, this + # would be sticky. In practice the env is set via autouse fixture + # before any import. + assert _c.obs_enabled() in (True, False) # at least callable + + def test_snapshot_returns_dict(self): + _c = _import_slime_c() + snap = _c.obs_snapshot() + assert isinstance(snap, dict) + # Should have 'enabled' key + assert "enabled" in snap + + +class TestObsSnapshot: + """obs_snapshot() returns well-structured data.""" + + def test_snapshot_structure_when_enabled(self): + _c = _import_slime_c() + snap = _c.obs_snapshot() + + if not snap.get("enabled"): + pytest.skip("obs not enabled (env var read at import time)") + + assert "summary" in snap + assert "nics" in snap + + summary = snap["summary"] + for key in [ + "assign_total", + "batch_total", + "submitted_bytes_total", + "completed_bytes_total", + "failed_bytes_total", + "pending_ops", + "error_total", + # User/sys MR split (new) — with back-compat aliases + "user_mr_count", + "user_mr_bytes", + "sys_mr_count", + "sys_mr_bytes", + "mr_count", + "mr_bytes", + # Aggregated per-op pending (new) + "pending_by_op", + ]: + assert key in summary, f"Missing summary key: {key}" + + # Back-compat: mr_count/mr_bytes must equal user_mr_count/user_mr_bytes + assert summary["mr_count"] == summary["user_mr_count"] + assert summary["mr_bytes"] == summary["user_mr_bytes"] + + # pending_by_op shape: at least the one-sided ops must appear + pending_by_op = summary["pending_by_op"] + assert isinstance(pending_by_op, dict) + for op_name in ("read", "write", "write_with_imm"): + assert op_name in pending_by_op + + def test_reset_for_test(self): + _c = _import_slime_c() + _c.obs_reset_for_test() + + snap = _c.obs_snapshot() + if not snap.get("enabled"): + 
pytest.skip("obs not enabled") + + summary = snap["summary"] + assert summary["assign_total"] == 0 + assert summary["batch_total"] == 0 + assert summary["submitted_bytes_total"] == 0 + assert summary["completed_bytes_total"] == 0 + assert summary["pending_ops"] == 0 + for v in summary["pending_by_op"].values(): + assert v == 0 + + def test_nics_array(self): + _c = _import_slime_c() + snap = _c.obs_snapshot() + if not snap.get("enabled"): + pytest.skip("obs not enabled") + + nics = snap["nics"] + assert isinstance(nics, list) + + # If NICs are registered, each should have required fields + for nic in nics: + assert "nic" in nic + assert "assign_total" in nic + assert "batch_total" in nic + assert "cq_errors_total" in nic + + +class TestObsRegisterNicRace: + """Concurrent obs_register_nic("mlx5_0") must collapse to a single slot.""" + + def test_concurrent_same_name_returns_single_slot(self): + _c = _import_slime_c() + if not hasattr(_c, "obs_register_nic_for_test"): + pytest.skip("obs_register_nic_for_test not exposed") + if not _c.obs_enabled(): + pytest.skip("obs not enabled") + + name = "test_nic_race_mlx5_0" + results: list[int] = [] + lock = threading.Lock() + barrier = threading.Barrier(8) + + def worker(): + barrier.wait() + slot = _c.obs_register_nic_for_test(name) + with lock: + results.append(slot) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + # All threads must receive the same slot id. + assert len(results) == 8 + assert all(r == results[0] for r in results), results + assert results[0] >= 0 + + # The snapshot must contain exactly one entry for this name. 
+ snap = _c.obs_snapshot() + matching = [n for n in snap.get("nics", []) if n.get("nic") == name] + assert len(matching) == 1, matching + + +class TestObsNoTwoSideSubmit: + """v0: production code must not call obs_record_submit for send/recv/immRecv.""" + + def test_no_twoside_submit_in_rdma_endpoint(self): + import pathlib + + src = pathlib.Path(__file__).resolve().parents[2] / ( + "dlslime/csrc/engine/rdma/rdma_endpoint.cpp" + ) + if not src.exists(): + pytest.skip(f"source not found at {src}") + + text = src.read_text() + + # obs_record_submit may legitimately appear only inside the three + # one-sided functions: read(), write(), writeWithImm(). Detect + # by scanning forward from each helper's definition boundary. + twoside_defs = [ + "RDMAEndpoint::send(", + "RDMAEndpoint::recv(", + "RDMAEndpoint::immRecv(", + ] + for sig in twoside_defs: + start = text.find(sig) + assert start != -1, f"expected to find {sig} in rdma_endpoint.cpp" + # take a ~2000-char window which comfortably covers one function + window = text[start : start + 2000] + assert ( + "obs::obs_record_submit" not in window + ), f"{sig} must not call obs::obs_record_submit in v0" diff --git a/tests/python/test_obs_multi_qp.py b/tests/python/test_obs_multi_qp.py new file mode 100644 index 0000000..68014c4 --- /dev/null +++ b/tests/python/test_obs_multi_qp.py @@ -0,0 +1,324 @@ +"""Multi-QP observability correctness test. + +Drives a one-sided RDMA write across `num_qp > 1` through two real +PeerAgents and asserts the obs counters behave per the v0 contract: + + * `completed_bytes_total` == bytes written (NOT num_qp * bytes) + * `pending_ops` returns to 0 + * `pending_by_op["write"]` returns to 0 + * `pending_ops` never observed negative at any sample point + +This is the guard for the P0 fix in this PR that moved semantic +completion accounting from per-CQE (over-counted by num_qp) to +per-EndpointOpState with an exchange-guarded once-only record. 
+ +Skips cleanly when NanoCtrl / Redis / RDMA NICs are not available. +""" + +from __future__ import annotations + +import os +import time +import uuid + +import pytest + + +NUM_QP = 4 +BUFFER_BYTES = 64 * 1024 # 64 KiB is enough to exercise all 4 QPs + + +def _need_env(): + # Obs must be on for counters to update at all. + if os.environ.get("DLSLIME_OBS", "") in ("", "0", "false"): + pytest.skip("DLSLIME_OBS not enabled; re-run with DLSLIME_OBS=1") + + +def _need_torch(): + try: + import torch # noqa: F401 + + return torch + except ImportError: + pytest.skip("torch not available") + + +def _need_slime_c(): + try: + import dlslime._slime_c as _c # type: ignore + + return _c + except ImportError: + pytest.skip("dlslime._slime_c not available") + + +def _need_nanoctrl(url: str) -> None: + """Skip if NanoCtrl isn't reachable — the two-PeerAgent dance needs it.""" + try: + import httpx + + httpx.get(url, timeout=1.0) + except Exception as exc: + pytest.skip(f"NanoCtrl not reachable at {url}: {exc}") + + +@pytest.fixture(autouse=True) +def _enable_obs(monkeypatch): + monkeypatch.setenv("DLSLIME_OBS", "1") + + +def test_multi_qp_completion_is_counted_once(): + """One user-visible write() with num_qp=4 must decrement pending_ops + exactly once and credit `completed_bytes_total` exactly once — not + num_qp times. + """ + _need_env() + torch = _need_torch() + _c = _need_slime_c() + + if not _c.obs_enabled(): + pytest.skip("obs_enabled() is False; env var was not set at import") + if not getattr(_c, "_BUILD_RDMA", False): + pytest.skip("build does not include RDMA") + + nanoctrl_url = os.environ.get("NANOCTRL_URL", "http://127.0.0.1:3000") + _need_nanoctrl(nanoctrl_url) + + try: + from dlslime import start_peer_agent + except ImportError: + pytest.skip("dlslime.start_peer_agent not importable") + + # Unique scope per test run so we don't collide with other live agents. + scope = f"test-obs-multi-qp-{uuid.uuid4().hex[:8]}" + + # Baseline the counters. 
obs_reset_for_test() zeros peer + per-NIC atomics. + _c.obs_reset_for_test() + baseline = _c.obs_snapshot()["summary"] + assert baseline["pending_ops"] == 0 + assert baseline["completed_bytes_total"] == 0 + + initiator = None + target = None + try: + try: + initiator = start_peer_agent( + nanoctrl_url=nanoctrl_url, + alias=f"obs-init-{uuid.uuid4().hex[:6]}", + scope=scope, + ) + target = start_peer_agent( + nanoctrl_url=nanoctrl_url, + alias=f"obs-tgt-{uuid.uuid4().hex[:6]}", + scope=scope, + ) + except Exception as exc: + pytest.skip(f"PeerAgent start failed (likely no RDMA NIC): {exc}") + + # num_qp=NUM_QP on BOTH sides — symmetric, otherwise connect + # reconciliation will refuse. + try: + c1 = initiator.connect_to(target.alias, ib_port=1, qp_num=NUM_QP) + c2 = target.connect_to(initiator.alias, ib_port=1, qp_num=NUM_QP) + c1.wait() + c2.wait() + except Exception as exc: + pytest.skip(f"peer connect failed (no RDMA or wrong port): {exc}") + + # Register a local MR on each side with the **same name** — + # agent.write(peer, [(name, ...)]) looks up `name` locally (as + # the source MR) and on the peer (as the destination MR). + src = torch.zeros([BUFFER_BYTES], dtype=torch.uint8, device="cpu") + dst = torch.zeros([BUFFER_BYTES], dtype=torch.uint8, device="cpu") + src.fill_(0xAB) + + initiator.register_memory_region( + "kv", + src.data_ptr(), + int(src.storage_offset()), + src.numel() * src.itemsize, + ) + target.register_memory_region( + "kv", + dst.data_ptr(), + int(dst.storage_offset()), + dst.numel() * dst.itemsize, + ) + + # Issue a single user-visible write. The transfer is + # striped across NUM_QP QPs under the hood — that's the + # condition under which the original PR over-counted. 
+ fut = initiator.write( + target.alias, + [("kv", 0, 0, BUFFER_BYTES)], + ) + fut.wait() + + # Give the (asynchronous) free-slot release a chance to fire the + # final qpi callback, in case wait() returned before the last + # slot_qp_mask or after all four but before the exchange record. + # The accounting is synchronous inside the callback, but scheduling + # can still race the snapshot by microseconds. + for _ in range(50): + snap = _c.obs_snapshot()["summary"] + if snap["pending_ops"] == 0 and snap["completed_bytes_total"] > 0: + break + time.sleep(0.01) + + # Sample the final state. + summary = _c.obs_snapshot()["summary"] + + # The smoking gun: bytes must equal the single user write, not + # num_qp * write. The pre-fix code did NUM_QP * BUFFER_BYTES here. + assert summary["completed_bytes_total"] == BUFFER_BYTES, ( + f"completed_bytes_total should be exactly {BUFFER_BYTES}, " + f"got {summary['completed_bytes_total']} " + f"(num_qp={NUM_QP}, num_qp*bytes={NUM_QP * BUFFER_BYTES})" + ) + + # Semantic pending must return to zero. + assert ( + summary["pending_ops"] == 0 + ), f"pending_ops leaked; got {summary['pending_ops']} after one write" + + # pending_by_op breakdown must agree: no write is in-flight. + assert summary["pending_by_op"]["write"] == 0, ( + f"pending_by_op['write'] leaked; got " + f"{summary['pending_by_op']['write']}" + ) + + # submit counter should have incremented by exactly one batch. + # (submit increments batch_total by 1 per user op) + assert summary["batch_total"] >= 1 + assert summary["submitted_bytes_total"] == BUFFER_BYTES, ( + f"submitted_bytes_total = {summary['submitted_bytes_total']}, " + f"expected exactly {BUFFER_BYTES}" + ) + + # No failures. + assert summary["failed_bytes_total"] == 0 + assert summary["error_total"] == 0 + + finally: + # Tear down explicitly. shutdown() also emits the final + # "stopped" snapshot via the ObsReporter. 
+ for agent in (initiator, target): + if agent is not None: + try: + agent.shutdown() + except Exception: + pass + + +def test_multi_qp_pending_never_negative_across_writes(): + """Over a small burst of writes with num_qp=4, pending_ops sampled + between submit and wait must always be non-negative. This is the + other failure mode of the original bug: multi-QP callbacks fired + fetch_sub(1) per CQE, driving pending_ops below zero. + """ + _need_env() + torch = _need_torch() + _c = _need_slime_c() + + if not _c.obs_enabled(): + pytest.skip("obs_enabled() is False") + if not getattr(_c, "_BUILD_RDMA", False): + pytest.skip("build does not include RDMA") + + nanoctrl_url = os.environ.get("NANOCTRL_URL", "http://127.0.0.1:3000") + _need_nanoctrl(nanoctrl_url) + + try: + from dlslime import start_peer_agent + except ImportError: + pytest.skip("dlslime.start_peer_agent not importable") + + scope = f"test-obs-multi-qp-burst-{uuid.uuid4().hex[:8]}" + _c.obs_reset_for_test() + + initiator = None + target = None + try: + try: + initiator = start_peer_agent( + nanoctrl_url=nanoctrl_url, + alias=f"obs-burst-init-{uuid.uuid4().hex[:6]}", + scope=scope, + ) + target = start_peer_agent( + nanoctrl_url=nanoctrl_url, + alias=f"obs-burst-tgt-{uuid.uuid4().hex[:6]}", + scope=scope, + ) + c1 = initiator.connect_to(target.alias, ib_port=1, qp_num=NUM_QP) + c2 = target.connect_to(initiator.alias, ib_port=1, qp_num=NUM_QP) + c1.wait() + c2.wait() + except Exception as exc: + pytest.skip(f"RDMA setup failed: {exc}") + + src = torch.zeros([BUFFER_BYTES], dtype=torch.uint8, device="cpu") + dst = torch.zeros([BUFFER_BYTES], dtype=torch.uint8, device="cpu") + src.fill_(0xCD) + + initiator.register_memory_region( + "kv", + src.data_ptr(), + int(src.storage_offset()), + src.numel() * src.itemsize, + ) + target.register_memory_region( + "kv", + dst.data_ptr(), + int(dst.storage_offset()), + dst.numel() * dst.itemsize, + ) + + NUM_WRITES = 8 + futures = [] + pending_samples = [] + + for _ in 
range(NUM_WRITES): + fut = initiator.write(target.alias, [("kv", 0, 0, BUFFER_BYTES)]) + futures.append(fut) + # Sample mid-burst so we catch pending_ops while in-flight. + pending_samples.append(_c.obs_snapshot()["summary"]["pending_ops"]) + + for fut in futures: + fut.wait() + + for _ in range(100): + snap = _c.obs_snapshot()["summary"] + if snap["pending_ops"] == 0: + break + time.sleep(0.01) + + summary = _c.obs_snapshot()["summary"] + + # The floor: no QP callback may drive pending below zero. + assert ( + min(pending_samples) >= 0 + ), f"pending_ops went negative during burst: samples={pending_samples}" + + # After the whole burst settles, everything returns to zero. + assert summary["pending_ops"] == 0, ( + f"pending_ops leaked after {NUM_WRITES} writes: " + f"got {summary['pending_ops']}" + ) + assert summary["pending_by_op"]["write"] == 0 + + # Bytes: exactly NUM_WRITES * BUFFER_BYTES, not + # NUM_QP * NUM_WRITES * BUFFER_BYTES. + assert summary["completed_bytes_total"] == NUM_WRITES * BUFFER_BYTES, ( + f"completed_bytes_total = {summary['completed_bytes_total']}, " + f"expected {NUM_WRITES * BUFFER_BYTES} " + f"(NOT {NUM_QP * NUM_WRITES * BUFFER_BYTES})" + ) + + finally: + for agent in (initiator, target): + if agent is not None: + try: + agent.shutdown() + except Exception: + pass
diff --git a/tests/python/test_obs_reporter.py b/tests/python/test_obs_reporter.py
new file mode 100644
index 0000000..7ef9e97
--- /dev/null
+++ b/tests/python/test_obs_reporter.py
@@ -0,0 +1,259 @@
+"""Tests for the ObsReporter daemon thread."""
+
+import json
+import os
+import threading
+import time
+import types
+
+import pytest
+
+
+def _try_import_accounting():
+    """Return ObsReporter, or skip the calling test if the import fails."""
+    try:
+        from dlslime.peer_agent._accounting import ObsReporter
+
+        return ObsReporter
+    except ImportError:
+        pytest.skip("dlslime.peer_agent._accounting not available")
+
+
+class FakeAgent:
+    """Minimal mock of PeerAgent for ObsReporter tests."""
+
+    def __init__(self, alias="test-agent", scope="test-scope"):
+        self.alias = alias
+        self._scope = scope
+        self._connections_lock = threading.Lock()
+        # conn_id -> connection object; the tests populate this directly.
+        self._connections = {}
+        # conn_ids that _is_connection_connected() reports as connected.
+        self._connected_set: set = set()
+        # The Redis integration tests assign a real client here.
+        self._redis_client = None
+
+    def _is_connection_connected(self, conn_id: str) -> bool:
+        """Return True when ``conn_id`` is in ``_connected_set``."""
+        return conn_id in self._connected_set
+
+
+def _make_fake_directed_conn(
+    peer_alias: str = "peer-B",
+    local_device: str = "mlx5_0",
+    remote_device: str = "mlx5_1",
+    state: str = "connected",
+) -> types.SimpleNamespace:
+    """Build a duck-typed DirectedConnection fit for _gather_connections()."""
+    return types.SimpleNamespace(
+        peer_alias=peer_alias,
+        local_key=types.SimpleNamespace(device=local_device),
+        peer_key=types.SimpleNamespace(device=remote_device),
+        state=state,
+    )
+
+
+class TestObsReporterUnit:
+    """Unit tests for ObsReporter (no Redis needed)."""
+
+    def test_reporter_starts_and_stops(self, monkeypatch):
+        """start() spawns a live thread and stop() clears it again."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_TIME_STEP_MS", "100")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "0")  # disable Redis
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent()
+        reporter = ObsReporter(agent)
+
+        reporter.start()
+        try:
+            assert reporter._thread is not None
+            assert reporter._thread.is_alive()
+
+            time.sleep(0.3)  # let a few ticks happen
+        finally:
+            # Stop even when an assertion above fails, so the reporter
+            # thread does not outlive this test.
+            reporter.stop()
+        assert reporter._thread is None
+
+    def test_reporter_session_id(self, monkeypatch):
+        """The session id is ``<alias>:<pid>:<third part>``."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "0")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent(alias="my-agent")
+        reporter = ObsReporter(agent)
+
+        assert reporter._session_id.startswith("my-agent:")
+        parts = reporter._session_id.split(":")
+        assert len(parts) == 3
+        assert int(parts[1]) == os.getpid()
+
+    def test_gather_connections_empty(self, monkeypatch):
+        """An agent with no connections yields an empty list."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "0")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent()
+        reporter = ObsReporter(agent)
+
+        conns = reporter._gather_connections()
+        assert conns == []
+
+    def test_gather_connections_with_directed_connection(self, monkeypatch):
+        """_gather_connections must use conn.state + agent._is_connection_connected,
+        not a nonexistent DirectedConnection.is_connected() method."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "0")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent()
+        reporter = ObsReporter(agent)
+
+        conn_id = "peer-B:mlx5_0->peer-B:mlx5_1"
+        agent._connections[conn_id] = _make_fake_directed_conn()
+        agent._connected_set.add(conn_id)
+
+        conns = reporter._gather_connections()
+        assert len(conns) == 1
+        c = conns[0]
+        assert c["conn_id"] == conn_id
+        assert c["peer"] == "peer-B"
+        assert c["local_nic"] == "mlx5_0"
+        assert c["remote_nic"] == "mlx5_1"
+        assert c["state"] == "connected"
+        assert c["connected"] is True
+
+    def test_gather_connections_disconnected(self, monkeypatch):
+        """A connection absent from _connected_set is reported as not connected."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "0")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent()
+        reporter = ObsReporter(agent)
+
+        conn_id = "not-yet-ready"
+        agent._connections[conn_id] = _make_fake_directed_conn(state="connecting")
+        # _connected_set intentionally empty
+
+        conns = reporter._gather_connections()
+        assert len(conns) == 1
+        assert conns[0]["state"] == "connecting"
+        assert conns[0]["connected"] is False
+
+
+class TestObsReporterRedis:
+    """Integration tests requiring a local Redis instance."""
+
+    @pytest.fixture
+    def redis_client(self):
+        """Yield a Redis client; skip the test if Redis cannot be reached."""
+        # Only the import and connection probe may turn into a skip. The
+        # yield stays outside the try block so that a failure during
+        # cleanup is reported as an error instead of a bogus skip.
+        try:
+            import redis as redis_mod
+
+            client = redis_mod.Redis(host="127.0.0.1", port=6379, decode_responses=True)
+            client.ping()
+        except Exception:
+            pytest.skip("Redis not available at localhost:6379")
+
+        yield client
+        # Cleanup
+        for key in client.scan_iter("test-obs:obs:peer:*"):
+            client.delete(key)
+
+    def test_reporter_writes_to_redis(self, monkeypatch, redis_client):
+        """A running reporter publishes an alive snapshot with every schema field."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_TIME_STEP_MS", "200")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "1")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent(alias="redis-test-agent", scope="test-obs")
+        agent._redis_client = redis_client
+
+        reporter = ObsReporter(agent)
+        reporter.start()
+
+        key = "test-obs:obs:peer:redis-test-agent"
+        try:
+            # Wait for at least one alive tick to write — then read before
+            # stop() emits the final stopped snapshot (which overwrites the
+            # same key with a shorter TTL and status="stopped").
+            time.sleep(1.0)
+            val = redis_client.get(key)
+        finally:
+            # Stop even if the read raises, so the reporter thread does
+            # not outlive this test.
+            reporter.stop()
+
+        if val is None:
+            pytest.skip(
+                "Reporter did not write (obs might not be enabled at C++ level)"
+            )
+
+        snap = json.loads(val)
+
+        # Full schema v0: every documented field must be present.
+        # NOTE(review): "v0" here vs. schema_version == 1 below — confirm naming.
+        for k in (
+            "schema_version",
+            "session_id",
+            "peer_id",
+            "host",
+            "pid",
+            "reported_at_ms",
+            "summary",
+            "nics",
+            "connections",
+            "ewma_bandwidth_bps",
+        ):
+            assert k in snap, f"missing schema field: {k}"
+
+        assert snap["schema_version"] == 1
+        assert snap["peer_id"] == "redis-test-agent"
+        assert snap["pid"] == os.getpid()
+        assert isinstance(snap["summary"], dict)
+        assert isinstance(snap["nics"], list)
+        assert isinstance(snap["connections"], list)
+        # Alive snapshot must NOT carry the stopped marker.
+        assert snap.get("status") != "stopped"
+
+    def test_reporter_writes_stopped_snapshot_on_stop(self, monkeypatch, redis_client):
+        """On graceful stop() the reporter must publish a final snapshot with
+        status="stopped" so operators see the transition immediately instead
+        of waiting for Redis TTL to evict the last alive snapshot."""
+        monkeypatch.setenv("DLSLIME_OBS", "1")
+        monkeypatch.setenv("DLSLIME_OBS_TIME_STEP_MS", "200")
+        monkeypatch.setenv("DLSLIME_OBS_REDIS", "1")
+
+        ObsReporter = _try_import_accounting()
+        agent = FakeAgent(alias="stopped-test-agent", scope="test-obs")
+        agent._redis_client = redis_client
+
+        reporter = ObsReporter(agent)
+        reporter.start()
+        time.sleep(0.6)  # at least one alive tick
+        reporter.stop()  # must emit final stopped snapshot synchronously
+
+        key = "test-obs:obs:peer:stopped-test-agent"
+        val = redis_client.get(key)
+        if val is None:
+            pytest.skip(
+                "Reporter did not write (obs might not be enabled at C++ level)"
+            )
+
+        snap = json.loads(val)
+        assert snap.get("status") == "stopped"
+        assert snap["peer_id"] == "stopped-test-agent"
+
+        # Stopped snapshots use a short TTL so they clear quickly.
+        ttl = redis_client.pttl(key)
+        assert 0 < ttl <= 15_000, f"stopped snapshot TTL should be short; got {ttl}"