diff --git a/CHANGELOG.md b/CHANGELOG.md index c53bfbb..22c164f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # change log +## Unreleased + +- add backup request config and dispatcher to duplicate slow master reads to replicas in Redis Cluster mode + # 1.3.3 - avoid infinite call when cluster endpoint was down diff --git a/README.md b/README.md index 3346756..6128cfb 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,11 @@ fetch = 600 # read_from_slave is the feature make slave balanced readed by client and ignore side effects. read_from_slave = true +# backup_request duplicates slow reads to replica nodes when enabled. +# trigger_slow_ms sets the fixed delay in milliseconds (set "default" or remove the field to rely on the moving average only). +# multiplier scales the rolling average master latency into a second trigger threshold; the lower threshold applies. +backup_request = { enabled = false, trigger_slow_ms = 5, multiplier = 2.0 } + ############################# Proxy Mode Special ####################################################### # ping_fail_limit means when ping fail reach the limit number, the node will be ejected from the cluster # until the ping is ok in future. diff --git a/default.toml b/default.toml index f8a42e1..73f3f61 100644 --- a/default.toml +++ b/default.toml @@ -44,6 +44,7 @@ fetch_interval = 1800000 # 1800s , 30 minutes fetch_since_latest_cmd = 1000 # 3600s , 1 hour read_from_slave = false + backup_request = { enabled = false, trigger_slow_ms = 5, multiplier = 2.0 } ping_fail_limit = 10 ping_interval = 300 diff --git a/docs/functionality.md b/docs/functionality.md index a45307f..600da90 100644 --- a/docs/functionality.md +++ b/docs/functionality.md @@ -72,6 +72,7 @@ - 订阅与阻塞命令: - SUBSCRIBE / PSUBSCRIBE 会进入独占会话,按频道哈希槽选择节点,并在 MOVED / ASK 时自动重连与重放订阅; - BLPOP 等阻塞类命令复用独占连接,避免被 pipeline 请求阻塞。 +- 备份读(backup request):仅在 Redis Cluster 模式下可选启用;当 master 读命令在配置阈值上仍未返回时,会复制该请求至对应 replica,优先向客户端返回更快的副本响应,同时继续跟踪 master 延迟以动态更新阈值。 - 依赖大量 `Rc>`、`futures::unsync::mpsc`,并使用 `tokio::runtime::current_thread`. 
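How the two thresholds interact: the proxy keeps a running average of each master's read latency, and the backup read is scheduled after the smaller of the fixed `trigger_slow_ms` delay and `average × multiplier`. A minimal standalone sketch of that selection with simplified types (the helper name `backup_delay` is illustrative; the corresponding logic in this diff lives in `BackupRequestController::delay_for`):

```rust
use std::time::Duration;

/// Pick the delay after which a backup read should be fired, or `None`
/// when the feature cannot trigger (disabled, or no threshold available).
fn backup_delay(
    enabled: bool,
    trigger_slow_ms: Option<u64>,   // fixed threshold; `None` == "default"
    multiplier: f64,                // scales the rolling average
    avg_master_micros: Option<f64>, // observed average master latency, in µs
) -> Option<Duration> {
    if !enabled {
        return None;
    }
    let fixed = trigger_slow_ms.map(Duration::from_millis);
    let relative = avg_master_micros
        .filter(|_| multiplier > 0.0)
        .map(|avg| Duration::from_micros((avg * multiplier) as u64));
    // Whichever threshold is lower fires first.
    match (fixed, relative) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (a, b) => a.or(b),
    }
}

fn main() {
    // With the defaults above (5 ms fixed, 2.0 multiplier) and a 1.2 ms average,
    // the relative threshold (2.4 ms) wins over the fixed 5 ms one.
    let delay = backup_delay(true, Some(5), 2.0, Some(1_200.0));
    assert_eq!(delay, Some(Duration::from_micros(2_400)));
    println!("backup read would be sent after {:?}", delay.unwrap());
}
```

Disabling the fixed threshold (writing `"default"` or omitting the field) leaves only the relative trigger, which cannot fire until at least one master latency sample has been recorded for that node.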
## 协议与命令抽象 diff --git a/docs/usage.md b/docs/usage.md index 651d747..36e8705 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -26,6 +26,11 @@ cargo build --release - `hash_tag`:一致性 hash 标签,例如 `{}`。 - `read_timeout` / `write_timeout`:后端超时(毫秒)。 - `read_from_slave`:Cluster 模式下允许从 replica 读取。 +- `backup_request`:Cluster 模式下用于配置“副本兜底读”策略的表,包含: + - `enabled`:是否开启该策略(默认 `false`)。 + - `trigger_slow_ms`:固定延迟阈值(毫秒,可写 `"default"` 关闭固定阈值),超过该延迟仍未返回则发送副本备份请求。 + - `multiplier`:相对阈值,等于“master 累计平均耗时 × multiplier”;当满足固定阈值或相对阈值任意条件即派发备份请求。 +- `backup_request` 的三个字段均可通过 `CONFIG SET cluster..backup-request-*` 在线调整。 - `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。 - `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。 - `hotkey_sample_every`:热点 Key 采样间隔(默认 `32`,越大代表对请求采样越稀疏)。 diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index a3ba4bd..6da80d1 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,4 +1,5 @@ -use std::collections::{HashSet, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -8,11 +9,12 @@ use bytes::Bytes; use futures::future::BoxFuture; use futures::stream::FuturesOrdered; use futures::{SinkExt, StreamExt}; +use parking_lot::RwLock; use rand::{seq::SliceRandom, thread_rng}; #[cfg(any(unix, windows))] use socket2::{SockRef, TcpKeepalive}; use tokio::net::TcpStream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time::{interval, sleep, timeout, MissedTickBehavior}; use tokio_util::codec::{Framed, FramedParts}; use tracing::{debug, info, warn}; @@ -21,7 +23,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator}; use crate::cache::{tracker::CacheTrackerSet, ClientCache}; use crate::backend::client::{ClientId, FrontConnectionGuard}; use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand}; -use crate::config::{ClusterConfig, ClusterRuntime, ConfigManager}; +use crate::config::{BackupRequestRuntime, ClusterConfig, ClusterRuntime, ConfigManager}; use crate::hotkey::Hotkey; use crate::info::{InfoContext, ProxyMode}; use crate::metrics; @@ -57,6 +59,7 @@ pub struct ClusterProxy { config_manager: Arc, slowlog: Arc, hotkey: Arc, + backup: Arc, listen_port: u16, seed_nodes: usize, client_cache: Arc, @@ -97,6 +100,10 @@ impl ClusterProxy { let hotkey = config_manager .hotkey_for(&config.name) .ok_or_else(|| anyhow!("missing hotkey state for cluster {}", config.name))?; + let backup_runtime = config_manager + .backup_request_for(&config.name) + .ok_or_else(|| anyhow!("missing backup request state for cluster {}", config.name))?; + let backup = Arc::new(BackupRequestController::new(backup_runtime)); let client_cache = config_manager .client_cache_for(&config.name) .ok_or_else(|| anyhow!("missing client cache state for cluster {}", config.name))?; @@ -120,6 +127,7 @@ impl ClusterProxy { config_manager, slowlog, hotkey, + backup, listen_port, seed_nodes: config.servers.len(), client_cache, @@ -755,6 +763,7 @@ impl ClusterProxy { let cluster = self.cluster.clone(); let slowlog = self.slowlog.clone(); let hotkey = self.hotkey.clone(); + let backup = self.backup.clone(); let kind_label = command.kind_label(); let cache = self.client_cache.clone(); Box::pin(async move { @@ -770,6 +779,8 @@ impl ClusterProxy { client_id, slowlog, hotkey, + backup, + cluster.clone(), command, ) .await @@ -1471,6 +1482,8 @@ async fn dispatch_with_context( client_id: ClientId, slowlog: Arc, hotkey: Arc, + backup: Arc, + cluster: Arc, command: 
RedisCommand, ) -> Result { let command_snapshot = command.clone(); @@ -1484,6 +1497,8 @@ async fn dispatch_with_context( pool, fetch_trigger, client_id, + backup, + cluster, multi, ) .await @@ -1495,6 +1510,8 @@ async fn dispatch_with_context( pool, fetch_trigger, client_id, + backup, + cluster, command, ) .await @@ -1511,6 +1528,8 @@ async fn dispatch_multi( pool: Arc>, fetch_trigger: mpsc::UnboundedSender<()>, client_id: ClientId, + backup: Arc, + cluster: Arc, multi: MultiDispatch, ) -> Result { let mut tasks: FuturesOrdered>> = FuturesOrdered::new(); @@ -1519,6 +1538,8 @@ async fn dispatch_multi( let slots = slots.clone(); let pool = pool.clone(); let fetch_trigger = fetch_trigger.clone(); + let backup = backup.clone(); + let cluster = cluster.clone(); let SubCommand { positions, command } = sub; tasks.push_back(Box::pin(async move { let response = dispatch_single( @@ -1528,6 +1549,8 @@ async fn dispatch_multi( pool, fetch_trigger, client_id, + backup, + cluster, command, ) .await?; @@ -1552,9 +1575,12 @@ async fn dispatch_single( pool: Arc>, fetch_trigger: mpsc::UnboundedSender<()>, client_id: ClientId, + backup: Arc, + cluster: Arc, command: RedisCommand, ) -> Result { let blocking = command.as_blocking(); + let is_read_only = command.is_read_only(); let mut slot = command .hash_slot(hash_tag.as_deref()) .ok_or_else(|| anyhow!("command missing key"))?; @@ -1596,29 +1622,46 @@ async fn dispatch_single( Err(_) => return Err(anyhow!("backend session closed")), } } else { - let response_rx = pool - .dispatch(target.clone(), client_id, command.clone()) - .await?; + let backup_plan = if target_override.is_none() + && !read_from_slave + && is_read_only + && matches!(blocking, BlockingKind::None) + { + replica_node_for_slot(&slots, slot) + .and_then(|replica| backup.plan(&target, Some(replica))) + } else { + None + }; + if backup_plan.is_some() { + metrics::backup_event(cluster.as_ref(), "planned"); + } - match response_rx.await { - Ok(Ok(resp)) => match parse_redirect(resp.clone())? { - Some(Redirect::Moved { - slot: new_slot, - address, - }) => { - let _ = fetch_trigger.send(()); - slot = new_slot; - target_override = Some(BackendNode::new(address)); - continue; - } - Some(Redirect::Ask { address }) => { - target_override = Some(BackendNode::new(address)); - continue; - } - None => return Ok(resp), - }, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(anyhow!("backend session closed")), + let resp = execute_with_backup( + pool.clone(), + client_id, + &command, + target.clone(), + cluster.clone(), + backup_plan, + backup.clone(), + ) + .await?; + + match parse_redirect(resp.clone())? 
{ + Some(Redirect::Moved { + slot: new_slot, + address, + }) => { + let _ = fetch_trigger.send(()); + slot = new_slot; + target_override = Some(BackendNode::new(address)); + continue; + } + Some(Redirect::Ask { address }) => { + target_override = Some(BackendNode::new(address)); + continue; + } + None => return Ok(resp), } } } @@ -1626,6 +1669,132 @@ async fn dispatch_single( Err(anyhow!("too many cluster redirects")) } +#[derive(Clone)] +struct BackupPlan { + replica: BackendNode, + delay: Duration, +} + +async fn execute_with_backup( + pool: Arc>, + client_id: ClientId, + command: &RedisCommand, + target: BackendNode, + cluster: Arc, + plan: Option, + controller: Arc, +) -> Result { + let primary_rx = pool + .dispatch(target.clone(), client_id, command.clone()) + .await?; + if let Some(plan) = plan { + race_with_backup( + pool, + client_id, + command, + target, + primary_rx, + cluster, + plan, + controller, + ) + .await + } else { + let started = Instant::now(); + let result = primary_rx.await; + if result.is_ok() { + controller.record_primary(&target, started.elapsed()); + } + result? + } +} + +async fn race_with_backup( + pool: Arc>, + client_id: ClientId, + command: &RedisCommand, + master: BackendNode, + primary_rx: oneshot::Receiver>, + cluster: Arc, + plan: BackupPlan, + controller: Arc, +) -> Result { + let master_start = Instant::now(); + let mut primary_future = Box::pin(primary_rx); + let mut delay_future = Box::pin(tokio::time::sleep(plan.delay)); + + if let Some(result) = tokio::select! { + res = primary_future.as_mut() => Some(res), + _ = delay_future.as_mut() => None, + } { + if result.is_ok() { + controller.record_primary(&master, master_start.elapsed()); + } + metrics::backup_event(cluster.as_ref(), "primary-before"); + return result?; + } + + let backup_rx = match pool + .dispatch(plan.replica.clone(), client_id, command.clone()) + .await + { + Ok(rx) => { + metrics::backup_event(cluster.as_ref(), "dispatched"); + rx + } + Err(err) => { + metrics::backup_event(cluster.as_ref(), "dispatch-fail"); + warn!( + master = %master.as_str(), + replica = %plan.replica.as_str(), + error = %err, + "failed to dispatch backup request; falling back to primary" + ); + return await_primary_only(primary_future, controller, master, master_start).await; + } + }; + let mut backup_future = Box::pin(backup_rx); + + tokio::select! { + res = primary_future.as_mut() => { + if res.is_ok() { + controller.record_primary(&master, master_start.elapsed()); + } + metrics::backup_event(cluster.as_ref(), "primary-after"); + res? + } + res = backup_future.as_mut() => { + metrics::backup_event(cluster.as_ref(), "replica-win"); + let remaining = primary_future; + let controller_clone = controller.clone(); + let master_clone = master.clone(); + tokio::spawn(async move { + let mut future = remaining; + if let Ok(Ok(_)) = future.as_mut().await { + controller_clone.record_primary(&master_clone, master_start.elapsed()); + } + }); + res? 
+ } + } +} + +async fn await_primary_only( + mut primary_future: Pin>>>, + controller: Arc, + master: BackendNode, + master_start: Instant, +) -> Result { + match primary_future.as_mut().await { + Ok(Ok(resp)) => { + controller.record_primary(&master, master_start.elapsed()); + Ok(resp) + } + Ok(Err(err)) => Err(err), + Err(_) => Err(anyhow!("backend session closed")), + } +} + fn select_node_for_slot( slots: &watch::Sender, read_from_slave: bool, @@ -1643,6 +1812,85 @@ fn select_node_for_slot( bail!("slot {} not covered", slot) } +fn replica_node_for_slot(slots: &watch::Sender, slot: u16) -> Option { + slots + .borrow() + .replica_for_slot(slot) + .map(|addr| BackendNode::new(addr.to_string())) +} + +#[derive(Clone)] +struct BackupRequestController { + runtime: Arc, + averages: Arc>>, +} + +impl BackupRequestController { + fn new(runtime: Arc) -> Self { + Self { + runtime, + averages: Arc::new(RwLock::new(HashMap::new())), + } + } + + fn plan(&self, master: &BackendNode, replica: Option) -> Option { + let replica = replica?; + let delay = self.delay_for(master)?; + Some(BackupPlan { replica, delay }) + } + + fn record_primary(&self, master: &BackendNode, elapsed: Duration) { + let micros = elapsed.as_secs_f64() * 1_000_000.0; + let mut guard = self.averages.write(); + let entry = guard + .entry(master.as_str().to_string()) + .or_insert_with(LatencyAverage::default); + entry.update(micros); + } + + fn delay_for(&self, master: &BackendNode) -> Option { + if !self.runtime.enabled() { + return None; + } + let mut candidates = Vec::new(); + if let Some(ms) = self.runtime.threshold_ms() { + candidates.push(Duration::from_millis(ms)); + } + if let Some(avg) = self.average_for(master) { + let multiplier = self.runtime.multiplier(); + if multiplier > 0.0 { + let scaled = (avg * multiplier).clamp(0.0, u64::MAX as f64); + let micros = scaled as u64; + candidates.push(Duration::from_micros(micros)); + } + } + candidates.into_iter().min() + } + + fn average_for(&self, master: &BackendNode) -> Option { + let guard = self.averages.read(); + guard.get(master.as_str()).map(|stats| stats.avg_micros) + } +} + +#[derive(Default, Clone)] +struct LatencyAverage { + avg_micros: f64, + samples: u64, +} + +impl LatencyAverage { + fn update(&mut self, sample: f64) { + self.samples = self.samples.saturating_add(1); + if self.samples == 1 { + self.avg_micros = sample; + } else { + let count = self.samples as f64; + self.avg_micros += (sample - self.avg_micros) / count; + } + } +} + fn subscription_count(resp: &RespValue) -> Option<(SubscriptionKind, i64)> { if let RespValue::Array(items) = resp { if items.len() >= 3 { diff --git a/src/config/mod.rs b/src/config/mod.rs index 7991d01..e652fae 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::env; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; use anyhow::{anyhow, bail, Context, Result}; @@ -76,6 +76,10 @@ fn default_client_cache_drain_interval_ms() -> u64 { 50 } +fn default_backup_multiplier() -> f64 { + 2.0 +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Config { #[serde(default)] @@ -146,6 +150,24 @@ impl Default for CacheType { } } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(default)] +pub struct BackupRequestConfig { + pub enabled: bool, + pub trigger_slow_ms: Option, + pub multiplier: f64, +} + +impl Default for BackupRequestConfig { + fn 
default() -> Self { + Self { + enabled: false, + trigger_slow_ms: None, + multiplier: default_backup_multiplier(), + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ClientCacheConfig { #[serde(default)] @@ -256,6 +278,8 @@ pub struct ClusterConfig { pub backend_resp_version: RespVersion, #[serde(default)] pub client_cache: ClientCacheConfig, + #[serde(default)] + pub backup_request: BackupRequestConfig, } impl ClusterConfig { @@ -310,6 +334,12 @@ impl ClusterConfig { if !(self.hotkey_decay > 0.0 && self.hotkey_decay <= 1.0) { bail!("cluster {} hotkey_decay must be in (0, 1]", self.name); } + if self.backup_request.multiplier <= 0.0 { + bail!( + "cluster {} backup_request.multiplier must be greater than 0", + self.name + ); + } Ok(()) } @@ -420,6 +450,49 @@ impl ClusterRuntime { } } +#[derive(Debug)] +pub struct BackupRequestRuntime { + enabled: AtomicBool, + threshold_ms: AtomicI64, + multiplier_bits: AtomicU64, +} + +impl BackupRequestRuntime { + fn new(config: &BackupRequestConfig) -> Self { + Self { + enabled: AtomicBool::new(config.enabled), + threshold_ms: AtomicI64::new(option_to_atomic(config.trigger_slow_ms)), + multiplier_bits: AtomicU64::new(config.multiplier.to_bits()), + } + } + + pub fn enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + pub fn set_enabled(&self, value: bool) { + self.enabled.store(value, Ordering::Relaxed); + } + + pub fn threshold_ms(&self) -> Option { + atomic_to_option(self.threshold_ms.load(Ordering::Relaxed)) + } + + pub fn set_threshold_ms(&self, value: Option) { + self.threshold_ms + .store(option_to_atomic(value), Ordering::Relaxed); + } + + pub fn multiplier(&self) -> f64 { + f64::from_bits(self.multiplier_bits.load(Ordering::Relaxed)) + } + + pub fn set_multiplier(&self, value: f64) { + self.multiplier_bits + .store(value.to_bits(), Ordering::Relaxed); + } +} + fn option_to_atomic(value: Option) -> i64 { match value { Some(v) => v as i64, @@ -442,6 +515,7 @@ struct ClusterEntry { slowlog: Arc, hotkey: Arc, client_cache: Arc, + backup: Arc, } #[derive(Debug)] @@ -478,6 +552,7 @@ impl ConfigManager { cluster.client_cache.clone(), cluster.backend_resp_version == RespVersion::Resp3, )); + let backup = Arc::new(BackupRequestRuntime::new(&cluster.backup_request)); clusters.insert( key, ClusterEntry { @@ -486,6 +561,7 @@ impl ConfigManager { slowlog, hotkey, client_cache, + backup, }, ); } @@ -521,6 +597,12 @@ impl ConfigManager { .map(|entry| entry.client_cache.clone()) } + pub fn backup_request_for(&self, name: &str) -> Option> { + self.clusters + .get(&name.to_ascii_lowercase()) + .map(|entry| entry.backup.clone()) + } + pub async fn handle_command(&self, command: &RedisCommand) -> Option { if !command.command_name().eq_ignore_ascii_case(b"CONFIG") { return None; @@ -771,6 +853,41 @@ impl ConfigManager { "cluster client_cache.drain_interval_ms updated via CONFIG SET" ); } + ClusterField::BackupRequestEnabled => { + let enabled = parse_bool_flag(value, "backup-request-enabled")?; + entry.backup.set_enabled(enabled); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].backup_request.enabled = enabled; + info!( + cluster = cluster_name, + value = value, + "cluster backup_request.enabled updated via CONFIG SET" + ); + } + ClusterField::BackupRequestThreshold => { + let parsed = parse_timeout_value(value)?; + entry.backup.set_threshold_ms(parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index] + .backup_request + .trigger_slow_ms = parsed; + info!( + cluster = 
cluster_name, + value = value, + "cluster backup_request.trigger_slow_ms updated via CONFIG SET" + ); + } + ClusterField::BackupRequestMultiplier => { + let parsed = parse_backup_multiplier(value)?; + entry.backup.set_multiplier(parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].backup_request.multiplier = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster backup_request.multiplier updated via CONFIG SET" + ); + } } Ok(()) } @@ -853,6 +970,18 @@ impl ConfigManager { format!("cluster.{}.client-cache-drain-interval-ms", name), cache_cfg.drain_interval_ms.to_string(), )); + entries.push(( + format!("cluster.{}.backup-request-enabled", name), + bool_to_string(entry.backup.enabled()), + )); + entries.push(( + format!("cluster.{}.backup-request-threshold-ms", name), + option_to_string(entry.backup.threshold_ms()), + )); + entries.push(( + format!("cluster.{}.backup-request-multiplier", name), + entry.backup.multiplier().to_string(), + )); } } entries.sort_by(|a, b| a.0.cmp(&b.0)); @@ -900,6 +1029,9 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> { "client-cache-shard-count" => ClusterField::ClientCacheShardCount, "client-cache-drain-batch" => ClusterField::ClientCacheDrainBatch, "client-cache-drain-interval-ms" => ClusterField::ClientCacheDrainIntervalMs, + "backup-request-enabled" => ClusterField::BackupRequestEnabled, + "backup-request-threshold-ms" => ClusterField::BackupRequestThreshold, + "backup-request-multiplier" => ClusterField::BackupRequestMultiplier, unknown => bail!("unknown cluster field '{}'", unknown), }; Ok((cluster.to_string(), field)) @@ -1023,12 +1155,31 @@ fn parse_positive_u64(value: &str, field: &str) -> Result { Ok(parsed as u64) } +fn parse_backup_multiplier(value: &str) -> Result { + let parsed: f64 = value + .trim() + .parse() + .with_context(|| format!("invalid backup-request-multiplier value '{}'", value))?; + if parsed <= 0.0 { + bail!("backup-request-multiplier must be > 0"); + } + Ok(parsed) +} + fn option_to_string(value: Option) -> String { value .map(|v| v.to_string()) .unwrap_or_else(|| DUMP_VALUE_DEFAULT.to_string()) } +fn bool_to_string(value: bool) -> String { + if value { + "yes".to_string() + } else { + "no".to_string() + } +} + fn flatten_pairs(entries: Vec<(String, String)>) -> Vec { let mut values = Vec::with_capacity(entries.len() * 2); for (key, value) in entries { @@ -1059,6 +1210,9 @@ enum ClusterField { ClientCacheShardCount, ClientCacheDrainBatch, ClientCacheDrainIntervalMs, + BackupRequestEnabled, + BackupRequestThreshold, + BackupRequestMultiplier, } fn wildcard_match(pattern: &str, target: &str) -> bool { diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index c7b852c..aeff374 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -254,6 +254,17 @@ static REMOTE_TIMER: Lazy = Lazy::new(|| { .expect("remote timer histogram registration must succeed") }); +static BACKUP_REQUEST_EVENTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + opts!( + "aster_backup_request_total", + "backup request events grouped by cluster and event" + ), + &["cluster", "event"] + ) + .expect("backup request counter registration must succeed") +}); + /// Register the running version with metrics. pub fn register_version(version: &str) { VERSION_GAUGE.with_label_values(&[version]).set(1.0); @@ -403,6 +414,13 @@ pub fn backend_request_result(cluster: &str, backend: &str, result: &str) { .inc(); } +/// Record a backup request related event for observability. 
+pub fn backup_event(cluster: &str, event: &str) { + BACKUP_REQUEST_EVENTS + .with_label_values(&[cluster, event]) + .inc(); +} + /// Record the outcome of a backend heartbeat check. pub fn backend_heartbeat(cluster: &str, backend: &str, ok: bool) { let status = if ok { "ok" } else { "fail" };
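A note on the average that drives the `multiplier` threshold: it is a cumulative mean over every recorded master read for a node (`avg += (sample - avg) / n`), not a windowed average, so all history is weighted equally. A self-contained sketch of that update rule with illustrative values (the struct name `RunningMean` is not from the patch; `LatencyAverage::update` above applies the same arithmetic, with samples recorded in microseconds whenever a primary response completes):

```rust
/// Running mean of observed latencies, in microseconds.
#[derive(Default)]
struct RunningMean {
    avg: f64,
    samples: u64,
}

impl RunningMean {
    /// Incremental mean: after n samples, `avg` equals the arithmetic mean
    /// of everything observed so far, without storing the history.
    fn update(&mut self, sample: f64) {
        self.samples += 1;
        self.avg += (sample - self.avg) / self.samples as f64;
    }
}

fn main() {
    let mut mean = RunningMean::default();
    for sample in [1_000.0, 2_000.0, 6_000.0] {
        mean.update(sample);
    }
    // (1000 + 2000 + 6000) / 3 == 3000 microseconds.
    assert!((mean.avg - 3_000.0).abs() < f64::EPSILON);
    println!("avg latency: {:.0}µs over {} samples", mean.avg, mean.samples);
}
```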