Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
password = "example-frontend-secret"
# 后端 ACL:此处演示用户名 + 密码;如果后端是简单密码,可直接使用 backend_password
backend_auth = { username = "proxy", password = "example-backend-secret" }
# 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录
slowlog_log_slower_than = 10000
# 慢查询日志最大保留条数;默认 128
slowlog_max_len = 128

[[clusters]]
name = "test-cluster"
Expand Down Expand Up @@ -50,3 +54,7 @@
] }
# 旧版写法依然兼容:backend_password = "raw-secret"
backend_password = "cluster-backend-secret"
# 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录
slowlog_log_slower_than = 10000
# 慢查询日志最大保留条数;默认 128
slowlog_max_len = 128
4 changes: 4 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ cargo build --release
- `hash_tag`:一致性 hash 标签,例如 `{}`。
- `read_timeout` / `write_timeout`:后端超时(毫秒)。
- `read_from_slave`:Cluster 模式下允许从 replica 读取。
- `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。
- `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。
- `auth` / `password`:前端 ACL,详见下文。
- `backend_auth` / `backend_password`:后端 ACL 认证,详见下文。

> 提示:代理原生支持 `SLOWLOG GET/LEN/RESET`,并按集群维度汇总慢查询;配置上述阈值和长度即可控制记录行为。

示例参见仓库根目录的 `default.toml`。

### ACL 配置
Expand Down
40 changes: 38 additions & 2 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::protocol::redis::{
BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand,
SubResponse, SubscriptionKind, SLOT_COUNT,
};
use crate::slowlog::Slowlog;
use crate::utils::{crc16, trim_hash_tag};

const FETCH_INTERVAL: Duration = Duration::from_secs(10);
Expand All @@ -52,6 +53,7 @@ pub struct ClusterProxy {
fetch_trigger: mpsc::UnboundedSender<()>,
runtime: Arc<ClusterRuntime>,
config_manager: Arc<ConfigManager>,
slowlog: Arc<Slowlog>,
listen_port: u16,
seed_nodes: usize,
}
Expand Down Expand Up @@ -83,6 +85,9 @@ impl ClusterProxy {
.map(Arc::new);

let listen_port = config.listen_port()?;
let slowlog = config_manager
.slowlog_for(&config.name)
.ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?;
let proxy = Self {
cluster: cluster.clone(),
hash_tag,
Expand All @@ -94,6 +99,7 @@ impl ClusterProxy {
fetch_trigger: trigger_tx.clone(),
runtime,
config_manager,
slowlog,
listen_port,
seed_nodes: config.servers.len(),
};
Expand Down Expand Up @@ -295,6 +301,18 @@ impl ClusterProxy {
inflight += 1;
continue;
}
if let Some(response) = self.try_handle_slowlog(&cmd) {
let success = !response.is_error();
metrics::front_command(
self.cluster.as_ref(),
cmd.kind_label(),
success,
);
let fut = async move { response };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
}
let guard = self.prepare_dispatch(client_id, cmd);
pending.push_back(Box::pin(guard));
inflight += 1;
Expand Down Expand Up @@ -339,6 +357,16 @@ impl ClusterProxy {
self.config_manager.handle_command(command).await
}

fn try_handle_slowlog(&self, command: &RedisCommand) -> Option<RespValue> {
if !command.command_name().eq_ignore_ascii_case(b"SLOWLOG") {
return None;
}
Some(crate::slowlog::handle_command(
&self.slowlog,
command.args(),
))
}

fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
if !command.command_name().eq_ignore_ascii_case(b"INFO") {
return None;
Expand Down Expand Up @@ -646,6 +674,7 @@ impl ClusterProxy {
let pool = self.pool.clone();
let fetch_trigger = self.fetch_trigger.clone();
let cluster = self.cluster.clone();
let slowlog = self.slowlog.clone();
let kind_label = command.kind_label();
Box::pin(async move {
match dispatch_with_context(
Expand All @@ -655,6 +684,7 @@ impl ClusterProxy {
pool,
fetch_trigger,
client_id,
slowlog,
command,
)
.await
Expand Down Expand Up @@ -1232,9 +1262,13 @@ async fn dispatch_with_context(
pool: Arc<ConnectionPool<RedisCommand>>,
fetch_trigger: mpsc::UnboundedSender<()>,
client_id: ClientId,
slowlog: Arc<Slowlog>,
command: RedisCommand,
) -> Result<RespValue> {
if let Some(multi) = command.expand_for_multi(hash_tag.as_deref()) {
let command_snapshot = command.clone();
let multi = command.expand_for_multi(hash_tag.as_deref());
let started = Instant::now();
let result = if let Some(multi) = multi {
dispatch_multi(
hash_tag,
read_from_slave,
Expand All @@ -1256,7 +1290,9 @@ async fn dispatch_with_context(
command,
)
.await
}
};
slowlog.maybe_record(&command_snapshot, started.elapsed());
result
}

async fn dispatch_multi(
Expand Down
96 changes: 92 additions & 4 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ use tracing::{info, warn};

use crate::auth::{AuthUserConfig, BackendAuthConfig, FrontendAuthConfig};
use crate::protocol::redis::{RedisCommand, RespValue};
use crate::slowlog::Slowlog;

/// Environment variable controlling the default worker thread count when a
/// cluster omits the `thread` field.
pub const ENV_DEFAULT_THREADS: &str = "ASTER_DEFAULT_THREAD";
const DUMP_VALUE_DEFAULT: &str = "default";

fn default_slowlog_log_slower_than() -> i64 {
10_000
}

fn default_slowlog_max_len() -> usize {
128
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
#[serde(default)]
Expand Down Expand Up @@ -130,6 +139,10 @@ pub struct ClusterConfig {
pub backend_auth: Option<BackendAuthConfig>,
#[serde(default)]
pub backend_password: Option<String>,
#[serde(default = "default_slowlog_log_slower_than")]
pub slowlog_log_slower_than: i64,
#[serde(default = "default_slowlog_max_len")]
pub slowlog_max_len: usize,
}

impl ClusterConfig {
Expand Down Expand Up @@ -161,6 +174,12 @@ impl ClusterConfig {
self.name
);
}
if self.slowlog_log_slower_than < -1 {
bail!(
"cluster {} slowlog-log-slower-than must be >= -1",
self.name
);
}
Ok(())
}

Expand Down Expand Up @@ -290,6 +309,7 @@ fn atomic_to_option(value: i64) -> Option<u64> {
struct ClusterEntry {
index: usize,
runtime: Arc<ClusterRuntime>,
slowlog: Arc<Slowlog>,
}

#[derive(Debug)]
Expand All @@ -304,14 +324,20 @@ impl ConfigManager {
let mut clusters = HashMap::new();
for (index, cluster) in config.clusters().iter().enumerate() {
let key = cluster.name.to_ascii_lowercase();
let runtime = Arc::new(ClusterRuntime::new(
cluster.read_timeout,
cluster.write_timeout,
));
let slowlog = Arc::new(Slowlog::new(
cluster.slowlog_log_slower_than,
cluster.slowlog_max_len,
));
clusters.insert(
key,
ClusterEntry {
index,
runtime: Arc::new(ClusterRuntime::new(
cluster.read_timeout,
cluster.write_timeout,
)),
runtime,
slowlog,
},
);
}
Expand All @@ -329,6 +355,12 @@ impl ConfigManager {
.map(|entry| entry.runtime.clone())
}

pub fn slowlog_for(&self, name: &str) -> Option<Arc<Slowlog>> {
self.clusters
.get(&name.to_ascii_lowercase())
.map(|entry| entry.slowlog.clone())
}

pub async fn handle_command(&self, command: &RedisCommand) -> Option<RespValue> {
if !command.command_name().eq_ignore_ascii_case(b"CONFIG") {
return None;
Expand Down Expand Up @@ -429,6 +461,28 @@ impl ConfigManager {
"cluster write_timeout updated via CONFIG SET"
);
}
ClusterField::SlowlogLogSlowerThan => {
let parsed = parse_slowlog_threshold(value)?;
entry.slowlog.set_threshold(parsed);
let mut guard = self.config.write();
guard.clusters_mut()[entry.index].slowlog_log_slower_than = parsed;
info!(
cluster = cluster_name,
value = value,
"cluster slowlog_log_slower_than updated via CONFIG SET"
);
}
ClusterField::SlowlogMaxLen => {
let parsed = parse_slowlog_len(value)?;
entry.slowlog.set_max_len(parsed);
let mut guard = self.config.write();
guard.clusters_mut()[entry.index].slowlog_max_len = parsed;
info!(
cluster = cluster_name,
value = value,
"cluster slowlog_max_len updated via CONFIG SET"
);
}
}
Ok(())
}
Expand Down Expand Up @@ -457,6 +511,14 @@ impl ConfigManager {
format!("cluster.{}.write-timeout", name),
option_to_string(runtime.write_timeout()),
));
entries.push((
format!("cluster.{}.slowlog-log-slower-than", name),
entry.slowlog.threshold().to_string(),
));
entries.push((
format!("cluster.{}.slowlog-max-len", name),
entry.slowlog.max_len().to_string(),
));
}
}
entries.sort_by(|a, b| a.0.cmp(&b.0));
Expand Down Expand Up @@ -491,6 +553,8 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> {
let field = match field.to_ascii_lowercase().as_str() {
"read-timeout" => ClusterField::ReadTimeout,
"write-timeout" => ClusterField::WriteTimeout,
"slowlog-log-slower-than" => ClusterField::SlowlogLogSlowerThan,
"slowlog-max-len" => ClusterField::SlowlogMaxLen,
unknown => bail!("unknown cluster field '{}'", unknown),
};
Ok((cluster.to_string(), field))
Expand All @@ -507,6 +571,28 @@ fn parse_timeout_value(value: &str) -> Result<Option<u64>> {
Ok(Some(parsed))
}

fn parse_slowlog_threshold(value: &str) -> Result<i64> {
let parsed: i64 = value
.trim()
.parse()
.with_context(|| format!("invalid slowlog-log-slower-than value '{}'", value))?;
if parsed < -1 {
bail!("slowlog-log-slower-than must be >= -1");
}
Ok(parsed)
}

fn parse_slowlog_len(value: &str) -> Result<usize> {
let parsed: i64 = value
.trim()
.parse()
.with_context(|| format!("invalid slowlog-max-len value '{}'", value))?;
if parsed < 0 {
bail!("slowlog-max-len must be >= 0");
}
usize::try_from(parsed).map_err(|_| anyhow!("slowlog-max-len is too large"))
}

fn option_to_string(value: Option<u64>) -> String {
value
.map(|v| v.to_string())
Expand All @@ -530,6 +616,8 @@ fn err_response<T: ToString>(message: T) -> RespValue {
enum ClusterField {
ReadTimeout,
WriteTimeout,
SlowlogLogSlowerThan,
SlowlogMaxLen,
}

fn wildcard_match(pattern: &str, target: &str) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod info;
pub mod meta;
pub mod metrics;
pub mod protocol;
pub mod slowlog;
pub mod standalone;
pub mod utils;

Expand Down
Loading