diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs
index 2c81038..6989526 100644
--- a/src/cluster/mod.rs
+++ b/src/cluster/mod.rs
@@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
 use crate::backend::client::{ClientId, FrontConnectionGuard};
 use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand};
 use crate::config::ClusterConfig;
+use crate::info::{InfoContext, ProxyMode};
 use crate::metrics;
 use crate::protocol::redis::{
     BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand,
@@ -50,6 +51,8 @@ pub struct ClusterProxy {
     pool: Arc<ConnectionPool<Connector>>,
     fetch_trigger: mpsc::UnboundedSender<()>,
     backend_timeout: Duration,
+    listen_port: u16,
+    seed_nodes: usize,
 }
 
 impl ClusterProxy {
@@ -77,6 +80,7 @@ impl ClusterProxy {
             .transpose()?
             .map(Arc::new);
 
+        let listen_port = config.listen_port()?;
         let proxy = Self {
             cluster: cluster.clone(),
             hash_tag,
@@ -87,6 +91,8 @@ impl ClusterProxy {
             pool: pool.clone(),
             fetch_trigger: trigger_tx.clone(),
             backend_timeout: Duration::from_millis(timeout_ms),
+            listen_port,
+            seed_nodes: config.servers.len(),
         };
 
         // trigger an immediate topology fetch
@@ -262,6 +268,17 @@ impl ClusterProxy {
                     }
                 }
             }
+            if let Some(response) = self.try_handle_info(&cmd) {
+                metrics::front_command(
+                    self.cluster.as_ref(),
+                    cmd.kind_label(),
+                    true,
+                );
+                let fut = async move { response };
+                pending.push_back(Box::pin(fut));
+                inflight += 1;
+                continue;
+            }
             let guard = self.prepare_dispatch(client_id, cmd);
             pending.push_back(Box::pin(guard));
             inflight += 1;
@@ -302,6 +319,24 @@ impl ClusterProxy {
         Ok(())
     }
 
+    fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
+        if !command.command_name().eq_ignore_ascii_case(b"INFO") {
+            return None;
+        }
+        let section = command
+            .args()
+            .get(1)
+            .map(|arg| String::from_utf8_lossy(arg).to_string());
+        let context = InfoContext {
+            cluster: self.cluster.as_ref(),
+            mode: ProxyMode::Cluster,
+            listen_port: self.listen_port,
+            backend_nodes: self.seed_nodes,
+        };
+        let payload = crate::info::render_info(context, section.as_deref());
+        Some(RespValue::BulkString(payload))
+    }
+
     async fn run_subscription(
         &self,
         parts: FramedParts<TcpStream, RespCodec>,
diff --git a/src/info.rs b/src/info.rs
new file mode 100644
index 0000000..1bcf264
--- /dev/null
+++ b/src/info.rs
@@ -0,0 +1,236 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+
+use crate::metrics::{self, FrontCommandStats};
+
+static START_TIME: Lazy<SystemTime> = Lazy::new(SystemTime::now);
+
+#[derive(Debug, Clone, Copy)]
+pub enum ProxyMode {
+    Standalone,
+    Cluster,
+}
+
+impl ProxyMode {
+    pub fn as_str(self) -> &'static str {
+        match self {
+            ProxyMode::Standalone => "standalone",
+            ProxyMode::Cluster => "cluster",
+        }
+    }
+}
+
+pub struct InfoContext<'a> {
+    pub cluster: &'a str,
+    pub mode: ProxyMode,
+    pub listen_port: u16,
+    pub backend_nodes: usize,
+}
+
+pub fn render_info(context: InfoContext<'_>, section: Option<&str>) -> Bytes {
+    let uptime = SystemTime::now()
+        .duration_since(*START_TIME)
+        .unwrap_or_default();
+    let uptime_seconds = uptime.as_secs();
+    let uptime_days = uptime_seconds / 86_400;
+    let startup_time_unix = START_TIME
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_secs();
+
+    let process_id = std::process::id();
+    let version = env!("CARGO_PKG_VERSION");
+    let arch_bits = (std::mem::size_of::<usize>() * 8) as u64;
+    let build_target = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
+
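+    // Pull the current values from the metrics module's counters and gauges.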
+    let connected_clients = metrics::front_connections_current(context.cluster);
+    let total_connections = metrics::front_connections_total(context.cluster);
+    let command_stats = metrics::front_command_stats(context.cluster);
+    let memory_bytes = metrics::memory_usage_bytes();
+    let cpu_percent = metrics::cpu_usage_percent();
+    let global_errors = metrics::global_error_count();
+
+    let sections = collect_sections(
+        &context,
+        uptime_seconds,
+        uptime_days,
+        startup_time_unix,
+        process_id,
+        version,
+        arch_bits,
+        &build_target,
+        connected_clients,
+        total_connections,
+        command_stats,
+        memory_bytes,
+        cpu_percent,
+        global_errors,
+    );
+
+    let filter = section
+        .map(|s| s.trim().to_ascii_lowercase())
+        .filter(|s| !s.is_empty());
+
+    let mut output = String::new();
+    for (name, entries) in sections {
+        if !should_include(&filter, name) {
+            continue;
+        }
+        if !output.is_empty() {
+            output.push_str("\r\n");
+        }
+        output.push_str("# ");
+        output.push_str(name);
+        output.push_str("\r\n");
+        for (key, value) in entries {
+            output.push_str(&key);
+            output.push(':');
+            output.push_str(&value);
+            output.push_str("\r\n");
+        }
+    }
+
+    if output.is_empty() {
+        output.push_str("\r\n");
+    }
+
+    Bytes::from(output)
+}
+
+fn collect_sections(
+    context: &InfoContext<'_>,
+    uptime_seconds: u64,
+    uptime_days: u64,
+    startup_time_unix: u64,
+    process_id: u32,
+    version: &str,
+    arch_bits: u64,
+    build_target: &str,
+    connected_clients: u64,
+    total_connections: u64,
+    command_stats: FrontCommandStats,
+    memory_bytes: u64,
+    cpu_percent: f64,
+    global_errors: u64,
+) -> Vec<(&'static str, Vec<(String, String)>)> {
+    let mut sections = Vec::new();
+
+    sections.push((
+        "Server",
+        vec![
+            ("aster_version".to_string(), version.to_string()),
+            ("aster_mode".to_string(), context.mode.as_str().to_string()),
+            ("cluster_name".to_string(), context.cluster.to_string()),
+            ("process_id".to_string(), process_id.to_string()),
+            ("tcp_port".to_string(), context.listen_port.to_string()),
+            ("arch_bits".to_string(), arch_bits.to_string()),
+            ("os".to_string(), std::env::consts::OS.to_string()),
+            ("build_target".to_string(), build_target.to_string()),
+            (
+                "startup_time_unix".to_string(),
+                startup_time_unix.to_string(),
+            ),
+            ("uptime_in_seconds".to_string(), uptime_seconds.to_string()),
+            ("uptime_in_days".to_string(), uptime_days.to_string()),
+        ],
+    ));
+
+    sections.push((
+        "Clients",
+        vec![
+            (
+                "connected_clients".to_string(),
+                connected_clients.to_string(),
+            ),
+            (
+                "total_connections_received".to_string(),
+                total_connections.to_string(),
+            ),
+        ],
+    ));
+
+    sections.push((
+        "Stats",
+        vec![
+            (
+                "total_commands_processed".to_string(),
+                command_stats.total().to_string(),
+            ),
+            (
+                "total_commands_succeeded".to_string(),
+                command_stats.total_ok().to_string(),
+            ),
+            (
+                "total_commands_failed".to_string(),
+                command_stats.total_fail().to_string(),
+            ),
+            ("global_error_count".to_string(), global_errors.to_string()),
+        ],
+    ));
+
+    sections.push((
+        "Memory",
+        vec![
+            ("used_memory".to_string(), memory_bytes.to_string()),
+            ("used_memory_human".to_string(), format_bytes(memory_bytes)),
+        ],
+    ));
+
+    sections.push((
+        "CPU",
+        vec![(
+            "used_cpu_percent".to_string(),
+            format!("{:.2}", cpu_percent),
+        )],
+    ));
+
+    sections.push((
+        "Proxy",
+        vec![
+            ("proxy_mode".to_string(), context.mode.as_str().to_string()),
+            (
+                "backend_nodes".to_string(),
+                context.backend_nodes.to_string(),
+            ),
+            ("cluster".to_string(), context.cluster.to_string()),
+        ],
+    ));
+
+    sections
+}
+
+fn should_include(filter: &Option<String>, section: &str) -> bool {
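+    // "all", "default", and "everything" act as wildcards and match every section.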
+    match filter.as_deref() {
+        None => true,
+        Some("all") | Some("default") | Some("everything") => true,
+        Some(candidate) => candidate == section.to_ascii_lowercase(),
+    }
+}
+
+fn format_bytes(bytes: u64) -> String {
+    const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
+    if bytes == 0 {
+        return "0B".to_string();
+    }
+
+    let mut value = bytes as f64;
+    let mut unit = "B";
+    for candidate in UNITS.iter() {
+        unit = candidate;
+        if value < 1024.0 {
+            break;
+        }
+        if *candidate != "TB" {
+            value /= 1024.0;
+        }
+    }
+    if unit == "B" {
+        format!("{bytes}{unit}")
+    } else {
+        format!("{value:.2}{unit}")
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 4a99ae5..685a3e5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,6 +19,7 @@ pub mod auth;
 pub mod backend;
 pub mod cluster;
 pub mod config;
+pub mod info;
 pub mod meta;
 pub mod metrics;
 pub mod protocol;
diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs
index 6e1d645..bbd4f71 100644
--- a/src/metrics/mod.rs
+++ b/src/metrics/mod.rs
@@ -215,6 +215,31 @@ pub fn register_version(version: &str) {
     VERSION_GAUGE.with_label_values(&[version]).set(1.0);
 }
 
+#[derive(Debug, Default, Clone, Copy)]
+pub struct FrontCommandStats {
+    pub read_ok: u64,
+    pub read_fail: u64,
+    pub write_ok: u64,
+    pub write_fail: u64,
+    pub other_ok: u64,
+    pub other_fail: u64,
+    pub invalid_fail: u64,
+}
+
+impl FrontCommandStats {
+    pub fn total_ok(&self) -> u64 {
+        self.read_ok + self.write_ok + self.other_ok
+    }
+
+    pub fn total_fail(&self) -> u64 {
+        self.read_fail + self.write_fail + self.other_fail + self.invalid_fail
+    }
+
+    pub fn total(&self) -> u64 {
+        self.total_ok() + self.total_fail()
+    }
+}
+
 /// Record a new front connection.
 pub fn front_conn_open(cluster: &str) {
     FRONT_CONNECTION_INCR.with_label_values(&[cluster]).inc();
@@ -329,6 +354,63 @@ pub fn remote_tracker(cluster: &str) -> Tracker {
     Tracker::new(REMOTE_TIMER.with_label_values(&[cluster]))
 }
 
+/// Current frontend connections for a cluster.
+pub fn front_connections_current(cluster: &str) -> u64 {
+    let gauge = FRONT_CONNECTIONS.with_label_values(&[cluster]);
+    gauge.get().max(0.0).round() as u64
+}
+
+/// Total frontend connections since proxy start for a cluster.
+pub fn front_connections_total(cluster: &str) -> u64 {
+    let counter = FRONT_CONNECTION_INCR.with_label_values(&[cluster]);
+    counter.get().max(0) as u64
+}
+
+/// Aggregate frontend command counters for a cluster.
+pub fn front_command_stats(cluster: &str) -> FrontCommandStats {
+    let mut stats = FrontCommandStats::default();
+    let mappings = [
+        ("read", "ok"),
+        ("read", "fail"),
+        ("write", "ok"),
+        ("write", "fail"),
+        ("other", "ok"),
+        ("other", "fail"),
+        ("invalid", "fail"),
+    ];
+    for (kind, result) in mappings {
+        let counter = FRONT_COMMAND_TOTAL.with_label_values(&[cluster, kind, result]);
+        let value = counter.get().max(0) as u64;
+        match (kind, result) {
+            ("read", "ok") => stats.read_ok = value,
+            ("read", "fail") => stats.read_fail = value,
+            ("write", "ok") => stats.write_ok = value,
+            ("write", "fail") => stats.write_fail = value,
+            ("other", "ok") => stats.other_ok = value,
+            ("other", "fail") => stats.other_fail = value,
+            ("invalid", "fail") => stats.invalid_fail = value,
+            _ => {}
+        }
+    }
+    stats
+}
+
+/// Global error count across all clusters.
+pub fn global_error_count() -> u64 {
+    GLOBAL_ERROR.get().max(0) as u64
+}
+
+/// Memory usage in bytes, derived from the system monitor gauge (kB).
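+/// The gauge stores kilobytes, so the value is converted to bytes here.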
+pub fn memory_usage_bytes() -> u64 {
+    (MEMORY_USAGE.get().max(0.0) * 1024.0) as u64
+}
+
+/// CPU usage percentage reported by the system monitor gauge.
+pub fn cpu_usage_percent() -> f64 {
+    CPU_USAGE.get().max(0.0)
+}
+
 /// Spawn the metrics HTTP server and system monitor tasks.
 pub fn spawn_background_tasks(port: u16) -> MetricsHandles {
     MetricsHandles {
diff --git a/src/standalone/mod.rs b/src/standalone/mod.rs
index 0604746..63779a9 100644
--- a/src/standalone/mod.rs
+++ b/src/standalone/mod.rs
@@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
 use crate::backend::client::{ClientId, FrontConnectionGuard};
 use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand};
 use crate::config::ClusterConfig;
+use crate::info::{InfoContext, ProxyMode};
 use crate::metrics;
 use crate::protocol::redis::{
     BlockingKind, MultiDispatch, RedisCommand, RedisResponse, RespCodec, RespValue, SubCommand,
@@ -47,6 +48,8 @@ pub struct StandaloneProxy {
     backend_auth: Option<BackendAuth>,
     pool: Arc<ConnectionPool<Connector>>,
     backend_timeout: Duration,
+    listen_port: u16,
+    backend_nodes: usize,
 }
 
 impl StandaloneProxy {
@@ -78,6 +81,9 @@ impl StandaloneProxy {
             .map(Arc::new);
         let pool = Arc::new(ConnectionPool::new(cluster.clone(), connector));
 
+        let backend_nodes = nodes.len();
+        let listen_port = config.listen_port()?;
+
         Ok(Self {
             cluster,
             hash_tag,
@@ -86,6 +92,8 @@ impl StandaloneProxy {
             backend_auth,
             pool,
             backend_timeout: Duration::from_millis(timeout_ms),
+            listen_port,
+            backend_nodes,
         })
     }
 
@@ -409,6 +417,12 @@ impl StandaloneProxy {
             }
         }
 
+        if let Some(response) = self.try_handle_info(&command) {
+            metrics::front_command(self.cluster.as_ref(), kind_label, true);
+            framed.send(response).await?;
+            continue;
+        }
+
         let response = match self.dispatch(client_id, command).await {
             Ok(resp) => resp,
             Err(err) => {
@@ -427,6 +441,24 @@ impl StandaloneProxy {
         Ok(())
     }
 
+    fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
+        if !command.command_name().eq_ignore_ascii_case(b"INFO") {
+            return None;
+        }
+        let section = command
+            .args()
+            .get(1)
+            .map(|arg| String::from_utf8_lossy(arg).to_string());
+        let context = InfoContext {
+            cluster: self.cluster.as_ref(),
+            mode: ProxyMode::Standalone,
+            listen_port: self.listen_port,
+            backend_nodes: self.backend_nodes,
+        };
+        let payload = crate::info::render_info(context, section.as_deref());
+        Some(RespValue::BulkString(payload))
+    }
+
     fn select_node(&self, client_id: ClientId, command: &RedisCommand) -> Result<BackendNode> {
         if self.ring.is_empty() {
             bail!("no backend nodes configured");