diff --git a/default.toml b/default.toml index 6b1c707..63c2ad8 100644 --- a/default.toml +++ b/default.toml @@ -22,6 +22,10 @@ password = "example-frontend-secret" # 后端 ACL:此处演示用户名 + 密码;如果后端是简单密码,可直接使用 backend_password backend_auth = { username = "proxy", password = "example-backend-secret" } + # 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录 + slowlog_log_slower_than = 10000 + # 慢查询日志最大保留条数;默认 128 + slowlog_max_len = 128 [[clusters]] name = "test-cluster" @@ -50,3 +54,7 @@ ] } # 旧版写法依然兼容:backend_password = "raw-secret" backend_password = "cluster-backend-secret" + # 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录 + slowlog_log_slower_than = 10000 + # 慢查询日志最大保留条数;默认 128 + slowlog_max_len = 128 diff --git a/docs/usage.md b/docs/usage.md index 57a6f9c..b63119c 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -26,9 +26,13 @@ cargo build --release - `hash_tag`:一致性 hash 标签,例如 `{}`。 - `read_timeout` / `write_timeout`:后端超时(毫秒)。 - `read_from_slave`:Cluster 模式下允许从 replica 读取。 +- `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。 +- `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。 - `auth` / `password`:前端 ACL,详见下文。 - `backend_auth` / `backend_password`:后端 ACL 认证,详见下文。 +> 提示:代理原生支持 `SLOWLOG GET/LEN/RESET`,并按集群维度汇总慢查询;配置上述阈值和长度即可控制记录行为。 + 示例参见仓库根目录的 `default.toml`。 ### ACL 配置 diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index 438e98a..f4aa6c0 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -27,6 +27,7 @@ use crate::protocol::redis::{ BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand, SubResponse, SubscriptionKind, SLOT_COUNT, }; +use crate::slowlog::Slowlog; use crate::utils::{crc16, trim_hash_tag}; const FETCH_INTERVAL: Duration = Duration::from_secs(10); @@ -52,6 +53,7 @@ pub struct ClusterProxy { fetch_trigger: mpsc::UnboundedSender<()>, runtime: Arc, config_manager: Arc, + slowlog: Arc, listen_port: u16, seed_nodes: usize, } @@ -83,6 +85,9 @@ impl ClusterProxy { .map(Arc::new); let listen_port = config.listen_port()?; + let slowlog = config_manager + .slowlog_for(&config.name) + .ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?; let proxy = Self { cluster: cluster.clone(), hash_tag, @@ -94,6 +99,7 @@ impl ClusterProxy { fetch_trigger: trigger_tx.clone(), runtime, config_manager, + slowlog, listen_port, seed_nodes: config.servers.len(), }; @@ -295,6 +301,18 @@ impl ClusterProxy { inflight += 1; continue; } + if let Some(response) = self.try_handle_slowlog(&cmd) { + let success = !response.is_error(); + metrics::front_command( + self.cluster.as_ref(), + cmd.kind_label(), + success, + ); + let fut = async move { response }; + pending.push_back(Box::pin(fut)); + inflight += 1; + continue; + } let guard = self.prepare_dispatch(client_id, cmd); pending.push_back(Box::pin(guard)); inflight += 1; @@ -339,6 +357,16 @@ impl ClusterProxy { self.config_manager.handle_command(command).await } + fn try_handle_slowlog(&self, command: &RedisCommand) -> Option { + if !command.command_name().eq_ignore_ascii_case(b"SLOWLOG") { + return None; + } + Some(crate::slowlog::handle_command( + &self.slowlog, + command.args(), + )) + } + fn try_handle_info(&self, command: &RedisCommand) -> Option { if !command.command_name().eq_ignore_ascii_case(b"INFO") { return None; @@ -646,6 +674,7 @@ impl ClusterProxy { let pool = self.pool.clone(); let fetch_trigger = self.fetch_trigger.clone(); let cluster = self.cluster.clone(); + let slowlog = self.slowlog.clone(); let kind_label = command.kind_label(); Box::pin(async move { match dispatch_with_context( @@ -655,6 +684,7 @@ impl ClusterProxy { pool, fetch_trigger, client_id, + slowlog, command, ) .await @@ -1232,9 +1262,13 @@ async fn dispatch_with_context( pool: Arc>, fetch_trigger: mpsc::UnboundedSender<()>, client_id: ClientId, + slowlog: Arc, command: RedisCommand, ) -> Result { - if let Some(multi) = command.expand_for_multi(hash_tag.as_deref()) { + let command_snapshot = command.clone(); + let multi = command.expand_for_multi(hash_tag.as_deref()); + let started = Instant::now(); + let result = if let Some(multi) = multi { dispatch_multi( hash_tag, read_from_slave, @@ -1256,7 +1290,9 @@ async fn dispatch_with_context( command, ) .await - } + }; + slowlog.maybe_record(&command_snapshot, started.elapsed()); + result } async fn dispatch_multi( diff --git a/src/config/mod.rs b/src/config/mod.rs index c198dd0..908f8f8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -12,12 +12,21 @@ use tracing::{info, warn}; use crate::auth::{AuthUserConfig, BackendAuthConfig, FrontendAuthConfig}; use crate::protocol::redis::{RedisCommand, RespValue}; +use crate::slowlog::Slowlog; /// Environment variable controlling the default worker thread count when a /// cluster omits the `thread` field. pub const ENV_DEFAULT_THREADS: &str = "ASTER_DEFAULT_THREAD"; const DUMP_VALUE_DEFAULT: &str = "default"; +fn default_slowlog_log_slower_than() -> i64 { + 10_000 +} + +fn default_slowlog_max_len() -> usize { + 128 +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Config { #[serde(default)] @@ -130,6 +139,10 @@ pub struct ClusterConfig { pub backend_auth: Option, #[serde(default)] pub backend_password: Option, + #[serde(default = "default_slowlog_log_slower_than")] + pub slowlog_log_slower_than: i64, + #[serde(default = "default_slowlog_max_len")] + pub slowlog_max_len: usize, } impl ClusterConfig { @@ -161,6 +174,12 @@ impl ClusterConfig { self.name ); } + if self.slowlog_log_slower_than < -1 { + bail!( + "cluster {} slowlog-log-slower-than must be >= -1", + self.name + ); + } Ok(()) } @@ -290,6 +309,7 @@ fn atomic_to_option(value: i64) -> Option { struct ClusterEntry { index: usize, runtime: Arc, + slowlog: Arc, } #[derive(Debug)] @@ -304,14 +324,20 @@ impl ConfigManager { let mut clusters = HashMap::new(); for (index, cluster) in config.clusters().iter().enumerate() { let key = cluster.name.to_ascii_lowercase(); + let runtime = Arc::new(ClusterRuntime::new( + cluster.read_timeout, + cluster.write_timeout, + )); + let slowlog = Arc::new(Slowlog::new( + cluster.slowlog_log_slower_than, + cluster.slowlog_max_len, + )); clusters.insert( key, ClusterEntry { index, - runtime: Arc::new(ClusterRuntime::new( - cluster.read_timeout, - cluster.write_timeout, - )), + runtime, + slowlog, }, ); } @@ -329,6 +355,12 @@ impl ConfigManager { .map(|entry| entry.runtime.clone()) } + pub fn slowlog_for(&self, name: &str) -> Option> { + self.clusters + .get(&name.to_ascii_lowercase()) + .map(|entry| entry.slowlog.clone()) + } + pub async fn handle_command(&self, command: &RedisCommand) -> Option { if !command.command_name().eq_ignore_ascii_case(b"CONFIG") { return None; @@ -429,6 +461,28 @@ impl ConfigManager { "cluster write_timeout updated via CONFIG SET" ); } + ClusterField::SlowlogLogSlowerThan => { + let parsed = parse_slowlog_threshold(value)?; + entry.slowlog.set_threshold(parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].slowlog_log_slower_than = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster slowlog_log_slower_than updated via CONFIG SET" + ); + } + ClusterField::SlowlogMaxLen => { + let parsed = parse_slowlog_len(value)?; + entry.slowlog.set_max_len(parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].slowlog_max_len = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster slowlog_max_len updated via CONFIG SET" + ); + } } Ok(()) } @@ -457,6 +511,14 @@ impl ConfigManager { format!("cluster.{}.write-timeout", name), option_to_string(runtime.write_timeout()), )); + entries.push(( + format!("cluster.{}.slowlog-log-slower-than", name), + entry.slowlog.threshold().to_string(), + )); + entries.push(( + format!("cluster.{}.slowlog-max-len", name), + entry.slowlog.max_len().to_string(), + )); } } entries.sort_by(|a, b| a.0.cmp(&b.0)); @@ -491,6 +553,8 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> { let field = match field.to_ascii_lowercase().as_str() { "read-timeout" => ClusterField::ReadTimeout, "write-timeout" => ClusterField::WriteTimeout, + "slowlog-log-slower-than" => ClusterField::SlowlogLogSlowerThan, + "slowlog-max-len" => ClusterField::SlowlogMaxLen, unknown => bail!("unknown cluster field '{}'", unknown), }; Ok((cluster.to_string(), field)) @@ -507,6 +571,28 @@ fn parse_timeout_value(value: &str) -> Result> { Ok(Some(parsed)) } +fn parse_slowlog_threshold(value: &str) -> Result { + let parsed: i64 = value + .trim() + .parse() + .with_context(|| format!("invalid slowlog-log-slower-than value '{}'", value))?; + if parsed < -1 { + bail!("slowlog-log-slower-than must be >= -1"); + } + Ok(parsed) +} + +fn parse_slowlog_len(value: &str) -> Result { + let parsed: i64 = value + .trim() + .parse() + .with_context(|| format!("invalid slowlog-max-len value '{}'", value))?; + if parsed < 0 { + bail!("slowlog-max-len must be >= 0"); + } + usize::try_from(parsed).map_err(|_| anyhow!("slowlog-max-len is too large")) +} + fn option_to_string(value: Option) -> String { value .map(|v| v.to_string()) @@ -530,6 +616,8 @@ fn err_response(message: T) -> RespValue { enum ClusterField { ReadTimeout, WriteTimeout, + SlowlogLogSlowerThan, + SlowlogMaxLen, } fn wildcard_match(pattern: &str, target: &str) -> bool { diff --git a/src/lib.rs b/src/lib.rs index f870cb5..5f72a1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub mod info; pub mod meta; pub mod metrics; pub mod protocol; +pub mod slowlog; pub mod standalone; pub mod utils; diff --git a/src/slowlog.rs b/src/slowlog.rs new file mode 100644 index 0000000..beb3132 --- /dev/null +++ b/src/slowlog.rs @@ -0,0 +1,321 @@ +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use parking_lot::Mutex; + +use crate::protocol::redis::{RedisCommand, RespValue}; + +#[derive(Clone, Debug)] +pub struct SlowlogEntry { + pub id: i64, + pub timestamp: i64, + pub duration_us: u64, + pub command: Vec, +} + +struct SlowlogState { + next_id: i64, + entries: RingBuffer, +} + +impl SlowlogState { + fn new(capacity: usize) -> Self { + Self { + next_id: 1, + entries: RingBuffer::with_capacity(capacity), + } + } +} + +pub struct Slowlog { + threshold_us: AtomicI64, + max_len: AtomicUsize, + state: Mutex, +} + +impl std::fmt::Debug for Slowlog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Slowlog") + .field("threshold_us", &self.threshold()) + .field("max_len", &self.max_len()) + .field("len", &self.len()) + .finish() + } +} + +impl Slowlog { + pub fn new(threshold_us: i64, max_len: usize) -> Self { + Self { + threshold_us: AtomicI64::new(threshold_us), + max_len: AtomicUsize::new(max_len), + state: Mutex::new(SlowlogState::new(max_len)), + } + } + + pub fn threshold(&self) -> i64 { + self.threshold_us.load(Ordering::Relaxed) + } + + pub fn max_len(&self) -> usize { + self.max_len.load(Ordering::Relaxed) + } + + pub fn set_threshold(&self, value: i64) { + self.threshold_us.store(value, Ordering::Relaxed); + } + + pub fn set_max_len(&self, value: usize) { + self.max_len.store(value, Ordering::Relaxed); + let mut state = self.state.lock(); + state.entries.resize(value); + } + + pub fn reset(&self) { + let mut state = self.state.lock(); + state.entries.clear(); + } + + pub fn len(&self) -> usize { + self.state.lock().entries.len() + } + + pub fn snapshot(&self, count: Option) -> Vec { + let state = self.state.lock(); + state.entries.newest(count) + } + + pub fn maybe_record(&self, command: &RedisCommand, duration: Duration) { + let threshold = self.threshold(); + if threshold < 0 { + return; + } + + let duration_us = duration.as_micros(); + if duration_us < threshold as u128 { + return; + } + + let clamped_duration = duration_us.min(u64::MAX as u128) as u64; + let command_parts = command.args().iter().cloned().collect::>(); + let timestamp = SystemTime::now() + .checked_sub(duration) + .unwrap_or(UNIX_EPOCH) + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + + let mut state = self.state.lock(); + let id = state.next_id; + state.next_id = state.next_id.saturating_add(1); + state.entries.push(SlowlogEntry { + id, + timestamp, + duration_us: clamped_duration, + command: command_parts, + }); + } +} + +pub fn handle_command(slowlog: &Slowlog, args: &[Bytes]) -> RespValue { + if args.len() < 2 { + return slowlog_error("wrong number of arguments for 'slowlog' command"); + } + + let sub = args[1].to_vec().to_ascii_uppercase(); + match sub.as_slice() { + b"GET" => handle_get(slowlog, args), + b"LEN" => handle_len(slowlog, args), + b"RESET" => handle_reset(slowlog, args), + _ => slowlog_error("unknown slowlog subcommand"), + } +} + +fn handle_get(slowlog: &Slowlog, args: &[Bytes]) -> RespValue { + if args.len() > 3 { + return slowlog_error("wrong number of arguments for 'slowlog get' command"); + } + let count = if args.len() == 3 { + match parse_non_negative(&args[2]) { + Ok(value) => Some(value), + Err(err) => return err, + } + } else { + None + }; + let entries = slowlog.snapshot(count); + let payload = entries + .into_iter() + .map(|entry| { + let mut fields = Vec::with_capacity(4); + fields.push(RespValue::Integer(entry.id)); + fields.push(RespValue::Integer(entry.timestamp)); + let duration = entry.duration_us.min(i64::MAX as u64) as i64; + fields.push(RespValue::Integer(duration)); + let command = entry + .command + .into_iter() + .map(RespValue::BulkString) + .collect(); + fields.push(RespValue::Array(command)); + RespValue::Array(fields) + }) + .collect(); + RespValue::Array(payload) +} + +fn handle_len(slowlog: &Slowlog, args: &[Bytes]) -> RespValue { + if args.len() != 2 { + return slowlog_error("wrong number of arguments for 'slowlog len' command"); + } + let len = slowlog.len().min(i64::MAX as usize) as i64; + RespValue::Integer(len) +} + +fn handle_reset(slowlog: &Slowlog, args: &[Bytes]) -> RespValue { + if args.len() != 2 { + return slowlog_error("wrong number of arguments for 'slowlog reset' command"); + } + slowlog.reset(); + RespValue::simple("OK") +} + +fn parse_non_negative(arg: &Bytes) -> Result { + let text = std::str::from_utf8(arg).map_err(|_| slowlog_value_error())?; + let value: i64 = text.parse().map_err(|_| slowlog_value_error())?; + if value < 0 { + return Err(slowlog_value_error()); + } + usize::try_from(value).map_err(|_| slowlog_value_error()) +} + +fn slowlog_error(message: &str) -> RespValue { + RespValue::Error(Bytes::from(format!("ERR {message}"))) +} + +fn slowlog_value_error() -> RespValue { + RespValue::Error(Bytes::from_static( + b"ERR value is not an integer or out of range", + )) +} + +struct RingBuffer { + buf: Vec, + capacity: usize, + next: usize, + len: usize, +} + +impl RingBuffer { + fn with_capacity(capacity: usize) -> Self { + Self { + buf: Vec::with_capacity(capacity), + capacity, + next: 0, + len: 0, + } + } + + fn push(&mut self, value: T) { + if self.capacity == 0 { + return; + } + if self.buf.len() < self.capacity { + if self.next == self.buf.len() { + self.buf.push(value); + } else { + self.buf[self.next] = value; + } + } else { + self.buf[self.next] = value; + } + self.next = (self.next + 1) % self.capacity; + if self.len < self.capacity { + self.len += 1; + } + } + + fn clear(&mut self) { + self.buf.clear(); + self.next = 0; + self.len = 0; + } + + fn len(&self) -> usize { + self.len + } +} + +impl RingBuffer { + fn newest(&self, count: Option) -> Vec { + if self.len == 0 || self.capacity == 0 { + return Vec::new(); + } + let limit = count.unwrap_or(self.len).min(self.len); + let mut result = Vec::with_capacity(limit); + let mut idx = (self.next + self.capacity - 1) % self.capacity; + for _ in 0..limit { + if idx >= self.buf.len() { + break; + } + result.push(self.buf[idx].clone()); + if result.len() == limit { + break; + } + if self.len == 1 { + break; + } + if idx == 0 { + idx = (idx + self.capacity - 1) % self.capacity; + } else { + idx -= 1; + } + } + result + } + + fn resize(&mut self, new_capacity: usize) { + if new_capacity == self.capacity { + return; + } + + if new_capacity == 0 { + self.clear(); + self.capacity = 0; + return; + } + + let entries = { + if self.len == 0 { + Vec::new() + } else { + let mut collected = Vec::new(); + let limit = self.len.min(new_capacity); + let mut idx = (self.next + self.capacity - 1) % self.capacity; + for _ in 0..limit { + if idx >= self.buf.len() { + break; + } + collected.push(self.buf[idx].clone()); + if collected.len() == limit { + break; + } + if idx == 0 { + idx = (idx + self.capacity - 1) % self.capacity; + } else { + idx -= 1; + } + } + collected + } + }; + + self.buf = Vec::with_capacity(new_capacity); + self.capacity = new_capacity; + self.next = 0; + self.len = 0; + for entry in entries.into_iter().rev() { + self.push(entry); + } + } +} diff --git a/src/standalone/mod.rs b/src/standalone/mod.rs index 2bbae64..3bf4b75 100644 --- a/src/standalone/mod.rs +++ b/src/standalone/mod.rs @@ -27,6 +27,7 @@ use crate::protocol::redis::{ BlockingKind, MultiDispatch, RedisCommand, RedisResponse, RespCodec, RespValue, SubCommand, SubResponse, SubscriptionKind, }; +use crate::slowlog::Slowlog; use crate::utils::trim_hash_tag; const DEFAULT_TIMEOUT_MS: u64 = 1_000; @@ -49,6 +50,7 @@ pub struct StandaloneProxy { pool: Arc>, runtime: Arc, config_manager: Arc, + slowlog: Arc, listen_port: u16, backend_nodes: usize, } @@ -85,6 +87,9 @@ impl StandaloneProxy { let backend_nodes = nodes.len(); let listen_port = config.listen_port()?; + let slowlog = config_manager + .slowlog_for(&config.name) + .ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?; Ok(Self { cluster, @@ -95,6 +100,7 @@ impl StandaloneProxy { pool, runtime, config_manager, + slowlog, listen_port, backend_nodes, }) @@ -107,7 +113,8 @@ impl StandaloneProxy { ) -> Result { let hash_tag = self.hash_tag.as_deref(); let ring = &self.ring; - if let Some(multi) = command.expand_for_multi_with(|key| { + let command_snapshot = command.clone(); + let multi = command.expand_for_multi_with(|key| { if ring.is_empty() { return 0; } @@ -119,11 +126,16 @@ impl StandaloneProxy { Err(idx) => idx, }; idx as u64 - }) { + }); + let started = Instant::now(); + let result = if let Some(multi) = multi { self.dispatch_multi(client_id, multi).await } else { self.dispatch_single(client_id, command).await - } + }; + self.slowlog + .maybe_record(&command_snapshot, started.elapsed()); + result } async fn dispatch_single( @@ -434,6 +446,13 @@ impl StandaloneProxy { continue; } + if let Some(response) = self.try_handle_slowlog(&command) { + let success = !response.is_error(); + metrics::front_command(self.cluster.as_ref(), kind_label, success); + framed.send(response).await?; + continue; + } + let response = match self.dispatch(client_id, command).await { Ok(resp) => resp, Err(err) => { @@ -452,6 +471,17 @@ impl StandaloneProxy { Ok(()) } + fn try_handle_slowlog(&self, command: &RedisCommand) -> Option { + if !command.command_name().eq_ignore_ascii_case(b"SLOWLOG") { + return None; + } + + Some(crate::slowlog::handle_command( + &self.slowlog, + command.args(), + )) + } + async fn try_handle_config(&self, command: &RedisCommand) -> Option { self.config_manager.handle_command(command).await }