diff --git a/Cargo.lock b/Cargo.lock index 21a5001..52bb3ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,6 +263,12 @@ dependencies = [ "log", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.14" @@ -279,6 +285,7 @@ version = "0.1.0" dependencies = [ "criterion", "env_logger", + "indexmap", "log", "signal-hook", ] @@ -324,6 +331,22 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" diff --git a/Cargo.toml b/Cargo.toml index 89af80d..552e11e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" signal-hook = "0.4.4" log = "0.4" env_logger = "0.11" +indexmap = "2" [dev-dependencies] criterion = "0.8.2" diff --git a/src/cli.rs b/src/cli.rs index 1c300e2..10974e1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -15,6 +15,7 @@ use std::time::Duration; use ferrum_kv::config::{FileConfig, FileConfigError}; use ferrum_kv::persistence::config::{AofConfig, FsyncPolicy}; +use ferrum_kv::storage::eviction::{EvictionConfig, EvictionPolicy}; pub const DEFAULT_ADDR: &str = "127.0.0.1:6380"; @@ -22,6 +23,8 @@ pub const USAGE: &str = concat!( "usage: ferrum-kv [--config PATH] [--addr HOST:PORT] [--aof-path PATH]\n", " [--appendfsync always|everysec|no]\n", " [--client-timeout SECONDS] [--maxclients N]\n", + " [--maxmemory BYTES] [--maxmemory-policy POLICY]\n", + " [--maxmemory-samples N]\n", " [--loglevel off|error|warn|info|debug|trace]" ); @@ -50,6 +53,13 @@ pub struct CliArgs { /// Explicit `loglevel` requested via `--loglevel` or the config file. /// The caller may still override this with `FERRUM_LOG`/`RUST_LOG`. loglevel: Option, + /// Memory ceiling in bytes. `None` keeps the built-in default of + /// zero (disabled); `Some(0)` explicitly disables enforcement. + max_memory: Option, + /// Eviction policy when `maxmemory` is reached. + max_memory_policy: Option, + /// How many random candidates each eviction round considers. + max_memory_samples: Option, } /// Raw, un-merged values taken verbatim from the command line. @@ -68,6 +78,9 @@ struct RawFlags { client_timeout: Option>, // outer Some = flag was passed, inner None = disabled max_clients: Option, loglevel: Option, + max_memory: Option, + max_memory_policy: Option, + max_memory_samples: Option, /// Whether AOF was explicitly enabled via the config file's `appendonly yes`. /// CLI `--aof-path` implies enabled; this field only carries the file's /// intent so that a later merge step can decide. @@ -113,6 +126,18 @@ impl CliArgs { pub fn loglevel(&self) -> Option<&str> { self.loglevel.as_deref() } + + /// Returns the resolved eviction configuration. Defaults (unlimited + /// memory, `noeviction`, 5 samples) apply when neither CLI nor config + /// file specifies a value. + pub fn eviction_config(&self) -> EvictionConfig { + let default = EvictionConfig::default(); + EvictionConfig { + max_memory: self.max_memory.unwrap_or(default.max_memory), + policy: self.max_memory_policy.unwrap_or(default.policy), + samples: self.max_memory_samples.unwrap_or(default.samples), + } + } } enum ScanOutcome { @@ -161,6 +186,29 @@ fn scan_argv>(args: I) -> Result { + let value = take_value(&mut iter, "--maxmemory")?; + raw.max_memory = + Some(parse_bytes(&value).map_err(|e| format!("invalid --maxmemory: {e}"))?); + } + "--maxmemory-policy" => { + let value = take_value(&mut iter, "--maxmemory-policy")?; + let name = value.to_ascii_lowercase(); + raw.max_memory_policy = + Some(EvictionPolicy::from_name(&name).ok_or_else(|| { + format!( + "invalid --maxmemory-policy '{value}' (expected noeviction, \ + allkeys-lru, volatile-lru, allkeys-random, volatile-random, \ + volatile-ttl)" + ) + })?); + } + "--maxmemory-samples" => { + let value = take_value(&mut iter, "--maxmemory-samples")?; + raw.max_memory_samples = Some(value.parse().map_err(|_| { + format!("invalid --maxmemory-samples: '{value}' is not a non-negative integer") + })?); + } "-h" | "--help" => return Ok(ScanOutcome::Help), other => return Err(format!("unrecognised argument: '{other}'")), } @@ -221,6 +269,15 @@ fn merge(raw: RawFlags, file: Option<&FileConfig>) -> Result { .loglevel .or_else(|| file.and_then(|f| f.loglevel.clone())); + // --- maxmemory family --------------------------------------------------- + let max_memory = raw.max_memory.or_else(|| file.and_then(|f| f.max_memory)); + let max_memory_policy = raw + .max_memory_policy + .or_else(|| file.and_then(|f| f.max_memory_policy)); + let max_memory_samples = raw + .max_memory_samples + .or_else(|| file.and_then(|f| f.max_memory_samples)); + Ok(CliArgs { addr, aof_path, @@ -228,6 +285,9 @@ fn merge(raw: RawFlags, file: Option<&FileConfig>) -> Result { client_timeout, max_clients, loglevel, + max_memory, + max_memory_policy, + max_memory_samples, }) } @@ -285,6 +345,34 @@ fn parse_timeout_seconds(raw: &str) -> Result, String> { } } +/// Parses a Redis-style byte size (`100mb`, `1gb`, plain integer, …). +fn parse_bytes(raw: &str) -> Result { + let lower = raw.trim().to_ascii_lowercase(); + let (num, factor): (&str, u64) = if let Some(s) = lower.strip_suffix("gb") { + (s, 1024 * 1024 * 1024) + } else if let Some(s) = lower.strip_suffix("mb") { + (s, 1024 * 1024) + } else if let Some(s) = lower.strip_suffix("kb") { + (s, 1024) + } else if let Some(s) = lower.strip_suffix('g') { + (s, 1024 * 1024 * 1024) + } else if let Some(s) = lower.strip_suffix('m') { + (s, 1024 * 1024) + } else if let Some(s) = lower.strip_suffix('k') { + (s, 1024) + } else if let Some(s) = lower.strip_suffix('b') { + (s, 1) + } else { + (lower.as_str(), 1) + }; + let num: u64 = num + .trim() + .parse() + .map_err(|_| format!("'{raw}' is not a byte size"))?; + num.checked_mul(factor) + .ok_or_else(|| format!("'{raw}' overflows u64")) +} + // Marker to silence the dead_code lint on the currently-reserved fields. // The struct itself is module-private; this impl is cheap. impl RawFlags { @@ -518,4 +606,46 @@ mod tests { let err = parse(&["--config", "/definitely/does/not/exist/xyz.conf"]).unwrap_err(); assert!(err.contains("failed to read config")); } + + #[test] + fn maxmemory_flag_accepts_byte_suffixes() { + let args = parse_run(&["--maxmemory", "10mb"]); + assert_eq!(args.eviction_config().max_memory, 10 * 1024 * 1024); + } + + #[test] + fn maxmemory_policy_flag_is_parsed() { + let args = parse_run(&["--maxmemory-policy", "allkeys-lru"]); + assert_eq!(args.eviction_config().policy, EvictionPolicy::AllKeysLru); + } + + #[test] + fn maxmemory_samples_flag_is_parsed() { + let args = parse_run(&["--maxmemory-samples", "20"]); + assert_eq!(args.eviction_config().samples, 20); + } + + #[test] + fn maxmemory_rejects_unknown_policy() { + let err = parse(&["--maxmemory-policy", "wishful"]).unwrap_err(); + assert!(err.contains("--maxmemory-policy")); + } + + #[test] + fn cli_maxmemory_overrides_config_file() { + let conf = TempConf::new( + "maxmem", + "maxmemory 1kb\nmaxmemory-policy allkeys-lru\nmaxmemory-samples 3\n", + ); + let args = parse_run(&[ + "--config", + conf.path.to_str().unwrap(), + "--maxmemory", + "2mb", + ]); + let cfg = args.eviction_config(); + assert_eq!(cfg.max_memory, 2 * 1024 * 1024); + assert_eq!(cfg.policy, EvictionPolicy::AllKeysLru); + assert_eq!(cfg.samples, 3); + } } diff --git a/src/config/file.rs b/src/config/file.rs index 84b7c7d..75829b7 100644 --- a/src/config/file.rs +++ b/src/config/file.rs @@ -17,6 +17,7 @@ use std::fs; use std::path::{Path, PathBuf}; use crate::persistence::config::FsyncPolicy; +use crate::storage::eviction::EvictionPolicy; /// Parsed configuration file contents. /// @@ -35,6 +36,12 @@ pub struct FileConfig { pub appendfsync: Option, /// Log level string accepted by `env_logger` (e.g. `info`, `debug`). pub loglevel: Option, + /// Memory ceiling in bytes. `0` disables enforcement. + pub max_memory: Option, + /// Eviction policy selected when `maxmemory` is reached. + pub max_memory_policy: Option, + /// Number of keys inspected per eviction round. + pub max_memory_samples: Option, } /// Error type for configuration file parsing. @@ -156,6 +163,22 @@ fn apply_directive(cfg: &mut FileConfig, key: &str, value: &str) -> Result<(), S } cfg.loglevel = Some(normalised); } + "maxmemory" => { + cfg.max_memory = Some(parse_bytes(value, "maxmemory")?); + } + "maxmemory-policy" => { + let name = value.to_ascii_lowercase(); + cfg.max_memory_policy = Some(EvictionPolicy::from_name(&name).ok_or_else(|| { + format!( + "invalid maxmemory-policy '{value}' (expected one of noeviction, \ + allkeys-lru, volatile-lru, allkeys-random, volatile-random, \ + volatile-ttl)" + ) + })?); + } + "maxmemory-samples" => { + cfg.max_memory_samples = Some(parse_usize(value, "maxmemory-samples")?); + } other => return Err(format!("unknown directive '{other}'")), } Ok(()) @@ -196,6 +219,36 @@ fn strip_quotes(raw: &str) -> &str { } } +/// Parses byte sizes in Redis-friendly form: a bare integer, or an integer +/// followed by one of the suffixes `b`, `k`, `kb`, `m`, `mb`, `g`, `gb` +/// (case-insensitive, no space between number and suffix). +fn parse_bytes(raw: &str, name: &str) -> Result { + let lower = raw.trim().to_ascii_lowercase(); + let (num, factor): (&str, u64) = if let Some(stripped) = lower.strip_suffix("gb") { + (stripped, 1024 * 1024 * 1024) + } else if let Some(stripped) = lower.strip_suffix("mb") { + (stripped, 1024 * 1024) + } else if let Some(stripped) = lower.strip_suffix("kb") { + (stripped, 1024) + } else if let Some(stripped) = lower.strip_suffix('g') { + (stripped, 1024 * 1024 * 1024) + } else if let Some(stripped) = lower.strip_suffix('m') { + (stripped, 1024 * 1024) + } else if let Some(stripped) = lower.strip_suffix('k') { + (stripped, 1024) + } else if let Some(stripped) = lower.strip_suffix('b') { + (stripped, 1) + } else { + (lower.as_str(), 1) + }; + let num: u64 = num + .trim() + .parse() + .map_err(|_| format!("invalid {name}: '{raw}' is not a byte size"))?; + num.checked_mul(factor) + .ok_or_else(|| format!("invalid {name}: '{raw}' overflows u64")) +} + #[cfg(test)] mod tests { use super::*; @@ -295,4 +348,44 @@ mod tests { let cfg = parse("PORT 12345\n").unwrap(); assert_eq!(cfg.port, Some(12345)); } + + #[test] + fn maxmemory_accepts_byte_and_suffix_forms() { + assert_eq!(parse("maxmemory 0\n").unwrap().max_memory, Some(0)); + assert_eq!(parse("maxmemory 1024\n").unwrap().max_memory, Some(1024)); + assert_eq!(parse("maxmemory 2k\n").unwrap().max_memory, Some(2 * 1024)); + assert_eq!( + parse("maxmemory 100mb\n").unwrap().max_memory, + Some(100 * 1024 * 1024) + ); + assert_eq!( + parse("maxmemory 1GB\n").unwrap().max_memory, + Some(1024 * 1024 * 1024) + ); + } + + #[test] + fn maxmemory_rejects_nonsense() { + assert!(parse("maxmemory abc\n").is_err()); + assert!(parse("maxmemory 10tb\n").is_err()); + } + + #[test] + fn maxmemory_policy_is_parsed_and_validated() { + assert_eq!( + parse("maxmemory-policy allkeys-lru\n") + .unwrap() + .max_memory_policy, + Some(EvictionPolicy::AllKeysLru), + ); + assert!(parse("maxmemory-policy bogus\n").is_err()); + } + + #[test] + fn maxmemory_samples_is_parsed() { + assert_eq!( + parse("maxmemory-samples 12\n").unwrap().max_memory_samples, + Some(12), + ); + } } diff --git a/src/main.rs b/src/main.rs index 4a602b9..97818b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,6 +44,22 @@ fn main() -> ExitCode { Err(code) => return code, }; + let eviction_cfg = args.eviction_config(); + if let Err(e) = engine.set_eviction_config(eviction_cfg) { + error!("failed to apply eviction config: {e}"); + return ExitCode::FAILURE; + } + if eviction_cfg.max_memory == 0 { + info!("maxmemory: unlimited"); + } else { + info!( + "maxmemory: {} bytes, policy={}, samples={}", + eviction_cfg.max_memory, + eviction_cfg.policy.name(), + eviction_cfg.samples, + ); + } + // Bind the listener up front so that `--addr :0` is resolved before we // install the signal handler that needs the concrete address. let listener = match TcpListener::bind(&args.addr) { diff --git a/src/network/server.rs b/src/network/server.rs index 4e4cc7a..0c309ba 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -336,7 +336,58 @@ pub fn execute_command(cmd: Command, engine: &KvEngine, out: &mut Vec) { Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, ms), Err(e) => write_ferrum_error(out, &e), }, + Command::MemoryUsage { key } => match engine.memory_usage(&key) { + Ok(Some(bytes)) => encoder::encode_integer(out, bytes as i64), + Ok(None) => encoder::encode_null_bulk(out), + Err(e) => write_ferrum_error(out, &e), + }, + Command::Info { section } => { + let body = render_info(engine, section.as_deref()); + encoder::encode_bulk_string(out, body.as_bytes()); + } + } +} + +/// Produces the payload for `INFO [section]`. +/// +/// Only the `server` and `memory` sections are currently populated, plus a +/// small `keyspace` summary that mirrors Redis' `db0` line. Unknown sections +/// return an empty body, matching Redis' behaviour. +fn render_info(engine: &KvEngine, section: Option<&[u8]>) -> String { + let wants = |name: &str| match section { + None => true, + Some(s) => s.eq_ignore_ascii_case(name.as_bytes()), + }; + + let mut out = String::new(); + if wants("server") { + out.push_str("# Server\r\n"); + out.push_str(concat!( + "ferrum_version:", + env!("CARGO_PKG_VERSION"), + "\r\n" + )); + out.push_str("\r\n"); + } + if wants("memory") { + let used = engine.used_memory(); + let cfg = engine.eviction_config().unwrap_or_default(); + out.push_str("# Memory\r\n"); + out.push_str(&format!("used_memory:{used}\r\n")); + out.push_str(&format!("maxmemory:{}\r\n", cfg.max_memory)); + out.push_str(&format!("maxmemory_policy:{}\r\n", cfg.policy.name())); + out.push_str(&format!("maxmemory_samples:{}\r\n", cfg.samples)); + out.push_str("\r\n"); + } + if wants("keyspace") { + let keys = engine.dbsize().unwrap_or(0); + out.push_str("# Keyspace\r\n"); + if keys > 0 { + out.push_str(&format!("db0:keys={keys},expires=0,avg_ttl=0\r\n")); + } + out.push_str("\r\n"); } + out } /// Converts an `EXPIRE` second delta to milliseconds, saturating on overflow. diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index 2388bb4..823e79a 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -46,6 +46,12 @@ pub enum Command { Ttl { key: Vec }, /// `PTTL key` — remaining TTL in milliseconds. PTtl { key: Vec }, + /// `MEMORY USAGE key` — approximate byte cost of a single key. + MemoryUsage { key: Vec }, + /// `INFO [section]` — server/memory/clients metrics. `None` means + /// "everything"; only known sections are recognised and unknown ones + /// produce an empty reply, as in Redis. + Info { section: Option> }, } /// Outcome of attempting to parse a single RESP2 frame from a byte buffer. @@ -425,6 +431,35 @@ fn build_command(parts: Vec>) -> Result { key: args.into_iter().next().unwrap(), }) } + b"MEMORY" => { + // `MEMORY` is a container command in Redis; we currently only + // recognise the `USAGE key` subcommand. + if args.len() < 2 { + return Err(FerrumError::WrongArity { cmd: "MEMORY" }); + } + let mut it = args.into_iter(); + let sub = it.next().unwrap(); + if ascii_uppercase(&sub) != b"USAGE" { + return Err(FerrumError::UnknownCommand(format!( + "MEMORY {}", + String::from_utf8_lossy(&sub) + ))); + } + // Accept and ignore the optional `SAMPLES count` tail for + // compatibility; we don't sample anything, we compute exactly. + let key = it.next().unwrap(); + Ok(Command::MemoryUsage { key }) + } + b"INFO" => { + // Zero or one section argument; further sections are ignored + // with a WrongArity error so surprises surface early. + if args.len() > 1 { + return Err(FerrumError::WrongArity { cmd: "INFO" }); + } + Ok(Command::Info { + section: args.into_iter().next(), + }) + } _ => Err(FerrumError::UnknownCommand( String::from_utf8_lossy(&name).into_owned(), )), @@ -899,4 +934,44 @@ mod frame_tests { }; assert!(matches!(err, FerrumError::WrongArity { cmd: "TTL" })); } + + #[test] + fn parses_memory_usage() { + assert_eq!( + parse_exact(b"*3\r\n$6\r\nMEMORY\r\n$5\r\nUSAGE\r\n$1\r\nk\r\n"), + Command::MemoryUsage { key: b"k".to_vec() } + ); + } + + #[test] + fn memory_without_subcommand_is_wrong_arity() { + let err = match parse_frame(b"*1\r\n$6\r\nMEMORY\r\n").unwrap() { + FrameParse::Invalid { error, .. } => error, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::WrongArity { cmd: "MEMORY" })); + } + + #[test] + fn memory_unknown_subcommand_is_rejected() { + let err = match parse_frame(b"*3\r\n$6\r\nMEMORY\r\n$5\r\nSTATS\r\n$1\r\nk\r\n").unwrap() { + FrameParse::Invalid { error, .. } => error, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::UnknownCommand(_))); + } + + #[test] + fn parses_info_with_and_without_section() { + assert_eq!( + parse_exact(b"*1\r\n$4\r\nINFO\r\n"), + Command::Info { section: None } + ); + assert_eq!( + parse_exact(b"*2\r\n$4\r\nINFO\r\n$6\r\nmemory\r\n"), + Command::Info { + section: Some(b"memory".to_vec()) + } + ); + } } diff --git a/src/storage/engine.rs b/src/storage/engine.rs index 42f9cee..67b355b 100644 --- a/src/storage/engine.rs +++ b/src/storage/engine.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -6,6 +7,7 @@ use log::warn; use crate::error::FerrumError; use crate::persistence::AofWriter; +use crate::storage::eviction::{self, Candidate, EvictionConfig, EvictionPolicy, EvictionScope}; /// Maximum allowed key size in bytes (64 KiB). pub const KEY_MAX_BYTES: usize = 64 * 1024; @@ -13,15 +15,28 @@ pub const KEY_MAX_BYTES: usize = 64 * 1024; /// Maximum allowed value size in bytes (16 MiB). pub const VALUE_MAX_BYTES: usize = 16 * 1024 * 1024; +/// Approximate per-entry overhead charged on top of `key.len() + value.len()`. +/// +/// Covers the `HashMap` bucket, the `ValueEntry` metadata, and the per-key +/// allocations that `Vec` carries. Chosen as a conservative constant so +/// the reported `used_memory` stays a useful safety bound without pretending +/// to match the allocator to the byte. +pub const PER_ENTRY_OVERHEAD: u64 = 48; + /// A value stored in the engine, together with its optional expiration. /// /// `expire_at` is a monotonic deadline derived from [`Instant::now`] at the /// time the TTL was installed. A value whose deadline is `<= Instant::now()` /// is considered expired and must be treated as absent by every read path. +/// +/// `last_access` tracks the most recent successful read or write and is +/// maintained inside the store's write lock. Only the approximate LRU +/// policies consult it, so keeping it current is best-effort. #[derive(Clone)] struct ValueEntry { data: Vec, expire_at: Option, + last_access: Instant, } impl ValueEntry { @@ -29,12 +44,17 @@ impl ValueEntry { Self { data, expire_at: None, + last_access: Instant::now(), } } fn is_expired(&self, now: Instant) -> bool { matches!(self.expire_at, Some(deadline) if deadline <= now) } + + fn touch(&mut self) { + self.last_access = Instant::now(); + } } /// A thread-safe key-value storage engine backed by a [`HashMap`]. @@ -61,6 +81,17 @@ impl ValueEntry { pub struct KvEngine { store: Arc, ValueEntry>>>, aof: Option>, + /// Approximate memory footprint of the live dataset, in bytes. + /// + /// The counter is updated inside the store's write lock so it stays + /// consistent with the `HashMap` it describes. Readers can load it + /// without holding any engine lock, which keeps `INFO memory` cheap. + used_memory: Arc, + /// Live eviction configuration. + /// + /// Wrapped in a lock so the server can mutate it at runtime (e.g. via + /// a future `CONFIG SET` command) without cloning the whole engine. + eviction: Arc>, } impl Default for KvEngine { @@ -75,6 +106,8 @@ impl KvEngine { Self { store: Arc::default(), aof: None, + used_memory: Arc::new(AtomicU64::new(0)), + eviction: Arc::new(RwLock::new(EvictionConfig::default())), } } @@ -87,6 +120,21 @@ impl KvEngine { self } + /// Replaces the live eviction configuration. + /// + /// Takes effect for all subsequent writes; already-stored keys are + /// eligible under the new policy immediately. + pub fn set_eviction_config(&self, cfg: EvictionConfig) -> Result<(), FerrumError> { + let mut guard = self.eviction.write()?; + *guard = cfg; + Ok(()) + } + + /// Returns a copy of the current eviction configuration. + pub fn eviction_config(&self) -> Result { + Ok(*self.eviction.read()?) + } + /// Sets a key-value pair and returns the previous value, if any. /// /// Matches Redis' default semantics: any existing TTL on the key is @@ -100,10 +148,11 @@ impl KvEngine { validate_value(&value)?; let mut store = self.store.write()?; + self.enforce_for_write(&mut store, &key, value.len())?; if let Some(aof) = &self.aof { log_aof_result("SET", aof.append_set(&key, &value)); } - let previous = store.insert(key, ValueEntry::new(value)); + let previous = self.track_insert(&mut store, key, ValueEntry::new(value)); Ok(previous.and_then(live_payload)) } @@ -126,12 +175,13 @@ impl KvEngine { // the key were absent. We intentionally do not log a DEL because // the subsequent SET, once replayed, overwrites the stale entry // anyway and skipping the DEL keeps the log shorter. - store.remove(key.as_slice()); + self.track_remove(&mut store, key.as_slice()); } + self.enforce_for_write(&mut store, &key, value.len())?; if let Some(aof) = &self.aof { log_aof_result("SETNX", aof.append_set(&key, &value)); } - store.insert(key, ValueEntry::new(value)); + self.track_insert(&mut store, key, ValueEntry::new(value)); Ok(true) } @@ -148,11 +198,14 @@ impl KvEngine { } let mut store = self.store.write()?; + for (k, v) in &pairs { + self.enforce_for_write(&mut store, k, v.len())?; + } if let Some(aof) = &self.aof { log_aof_result("MSET", aof.append_set_many(&pairs)); } for (k, v) in pairs { - store.insert(k, ValueEntry::new(v)); + self.track_insert(&mut store, k, ValueEntry::new(v)); } Ok(()) } @@ -169,11 +222,15 @@ impl KvEngine { for key in keys { match store.get(key.as_slice()) { Some(entry) if entry.is_expired(now) => { - store.remove(key.as_slice()); + self.track_remove(&mut store, key.as_slice()); self.log_expire_drop(key); out.push(None); } - Some(entry) => out.push(Some(entry.data.clone())), + Some(entry) => { + let data = entry.data.clone(); + self.touch_access(&mut store, key.as_slice()); + out.push(Some(data)); + } None => out.push(None), } } @@ -198,7 +255,7 @@ impl KvEngine { let now = Instant::now(); let (current, existing_deadline) = match store.get(key.as_slice()) { Some(entry) if entry.is_expired(now) => { - store.remove(key.as_slice()); + self.track_remove(&mut store, key.as_slice()); self.log_expire_drop(&key); (0i64, None) } @@ -219,6 +276,7 @@ impl KvEngine { })?; let serialised = new_value.to_string().into_bytes(); + self.enforce_for_write(&mut store, &key, serialised.len())?; if let Some(aof) = &self.aof { log_aof_result("INCRBY", aof.append_set(&key, &serialised)); // INCR preserves TTL: re-emit the existing deadline so replay @@ -229,11 +287,13 @@ impl KvEngine { log_aof_result("PEXPIREAT", aof.append_pexpireat(&key, abs_ms)); } } - store.insert( + self.track_insert( + &mut store, key, ValueEntry { data: serialised, expire_at: existing_deadline, + last_access: Instant::now(), }, ); Ok(new_value) @@ -245,11 +305,15 @@ impl KvEngine { let now = Instant::now(); match store.get(key) { Some(entry) if entry.is_expired(now) => { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); Ok(None) } - Some(entry) => Ok(Some(entry.data.clone())), + Some(entry) => { + let data = entry.data.clone(); + self.touch_access(&mut store, key); + Ok(Some(data)) + } None => Ok(None), } } @@ -258,7 +322,7 @@ impl KvEngine { pub fn del(&self, key: &[u8]) -> Result { let mut store = self.store.write()?; let now = Instant::now(); - let existed = match store.remove(key) { + let existed = match self.track_remove(&mut store, key) { Some(entry) => !entry.is_expired(now), None => false, }; @@ -284,7 +348,7 @@ impl KvEngine { let now = Instant::now(); let mut removed: Vec<&[u8]> = Vec::with_capacity(keys.len()); for key in keys { - if let Some(entry) = store.remove(key.as_slice()) + if let Some(entry) = self.track_remove(&mut store, key.as_slice()) && !entry.is_expired(now) { removed.push(key.as_slice()); @@ -304,7 +368,7 @@ impl KvEngine { let now = Instant::now(); match store.get(key) { Some(entry) if entry.is_expired(now) => { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); Ok(false) } @@ -337,7 +401,7 @@ impl KvEngine { let now = Instant::now(); let (new_value, existing_deadline) = match store.get(key.as_slice()) { Some(entry) if entry.is_expired(now) => { - store.remove(key.as_slice()); + self.track_remove(&mut store, key.as_slice()); self.log_expire_drop(&key); (suffix, None) } @@ -351,6 +415,7 @@ impl KvEngine { }; validate_value(&new_value)?; + self.enforce_for_write(&mut store, &key, new_value.len())?; if let Some(aof) = &self.aof { log_aof_result("APPEND", aof.append_set(&key, &new_value)); if let Some(deadline) = existing_deadline @@ -360,11 +425,13 @@ impl KvEngine { } } let new_len = new_value.len(); - store.insert( + self.track_insert( + &mut store, key, ValueEntry { data: new_value, expire_at: existing_deadline, + last_access: Instant::now(), }, ); Ok(new_len) @@ -376,7 +443,7 @@ impl KvEngine { let now = Instant::now(); match store.get(key) { Some(entry) if entry.is_expired(now) => { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); Ok(0) } @@ -388,7 +455,7 @@ impl KvEngine { /// Removes all keys from the store. pub fn flushdb(&self) -> Result<(), FerrumError> { let mut store = self.store.write()?; - store.clear(); + self.track_clear(&mut store); if let Some(aof) = &self.aof { log_aof_result("FLUSHDB", aof.append_flushdb()); } @@ -414,7 +481,7 @@ impl KvEngine { if let Some(entry) = store.get(key) && entry.is_expired(now_instant) { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); } @@ -423,7 +490,7 @@ impl KvEngine { } if abs_epoch_ms <= now_ms { - store.remove(key); + self.track_remove(&mut store, key); if let Some(aof) = &self.aof { log_aof_result("DEL", aof.append_del(key)); } @@ -452,7 +519,7 @@ impl KvEngine { if let Some(entry) = store.get(key) && entry.is_expired(now) { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); return Ok(false); } @@ -481,7 +548,7 @@ impl KvEngine { match store.get(key) { None => Ok(TtlStatus::Missing), Some(entry) if entry.is_expired(now) => { - store.remove(key); + self.track_remove(&mut store, key); self.log_expire_drop(key); Ok(TtlStatus::Missing) } @@ -529,7 +596,7 @@ impl KvEngine { let evicted = victims.len(); for key in &victims { - store.remove(key.as_slice()); + self.track_remove(&mut store, key.as_slice()); if let Some(aof) = &self.aof { log_aof_result("DEL", aof.append_del(key)); } @@ -547,6 +614,222 @@ impl KvEngine { log_aof_result("DEL", aof.append_del(key)); } } + + /// Returns the current approximate memory footprint, in bytes. + /// + /// Includes key bytes, value bytes, and a fixed per-entry overhead + /// (see [`PER_ENTRY_OVERHEAD`]). The value is eventually consistent + /// with the store: it is updated inside the write lock that guards + /// every mutation, so callers observing it after a successful command + /// always see the post-mutation total. + pub fn used_memory(&self) -> u64 { + self.used_memory.load(Ordering::Relaxed) + } + + /// Returns the approximate per-entry cost of `key`, in bytes, or + /// `None` if the key is absent or already expired. + /// + /// Matches the `MEMORY USAGE` command's contract of reporting the + /// single-entry contribution that `used_memory()` would shed if the + /// key were removed. + pub fn memory_usage(&self, key: &[u8]) -> Result, FerrumError> { + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + // Surface the expiration through the normal lazy path so + // both MEMORY USAGE and GET agree on the key being gone. + let removed = store.remove(key); + if let Some(entry) = removed { + self.untrack(key, &entry); + } + self.log_expire_drop(key); + Ok(None) + } + Some(entry) => Ok(Some(entry_bytes(key, entry))), + None => Ok(None), + } + } + + /// Inserts `entry` into `store` and updates the memory counter, + /// returning the old entry (if any) so callers can reason about + /// overwrite semantics. + fn track_insert( + &self, + store: &mut HashMap, ValueEntry>, + key: Vec, + entry: ValueEntry, + ) -> Option { + let incoming = entry_bytes(&key, &entry); + let previous = store.insert(key.clone(), entry); + let outgoing = previous.as_ref().map(|p| entry_bytes(&key, p)).unwrap_or(0); + self.apply_delta(incoming as i64 - outgoing as i64); + previous + } + + /// Evicts keys until `used_memory + incoming <= max_memory`, honouring + /// the active [`EvictionPolicy`]. + /// + /// `incoming` is the approximate byte cost of the entry that is about + /// to be written. Passing it up front lets the sweep leave enough + /// headroom for a single write so the caller never overshoots the + /// ceiling by a full entry. Callers that only want the current + /// counter to fit (for example, retroactive enforcement after a + /// `maxmemory` shrink) can pass `0`. + /// + /// Returns `Err(FerrumError::OutOfMemory)` when the policy is + /// `noeviction`, when no policy-eligible key exists, or when the best + /// effort eviction still cannot free enough space. Successful evictions + /// are logged to the AOF so replay converges on the post-eviction state. + /// + /// The caller must already hold the write lock on `store`. + fn enforce_memory_limit( + &self, + store: &mut HashMap, ValueEntry>, + incoming: u64, + ) -> Result<(), FerrumError> { + let cfg = *self.eviction.read()?; + if cfg.max_memory == 0 { + return Ok(()); + } + let fits = |used: u64| used.saturating_add(incoming) <= cfg.max_memory; + // Bounded loop so a pathological policy cannot spin forever. + for _ in 0..store.len().max(1) + 16 { + if fits(self.used_memory.load(Ordering::Relaxed)) { + return Ok(()); + } + if cfg.policy == EvictionPolicy::NoEviction { + return Err(FerrumError::OutOfMemory); + } + + let candidates = sample_candidates(store, cfg.policy.scope(), cfg.samples); + let Some(victim) = eviction::pick_victim(cfg.policy, candidates) else { + // For volatile policies on a dataset without TTL, Redis + // reports OOM; we do the same so the caller sees a clear + // failure instead of a silent accept. + return Err(FerrumError::OutOfMemory); + }; + self.track_remove(store, &victim.key); + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(&victim.key)); + } + } + if fits(self.used_memory.load(Ordering::Relaxed)) { + Ok(()) + } else { + Err(FerrumError::OutOfMemory) + } + } + + /// Updates the `last_access` stamp on `key` if it is still live. + /// + /// Used by read paths to give the LRU policy some accuracy without + /// forcing the caller to touch `ValueEntry` internals. + fn touch_access(&self, store: &mut HashMap, ValueEntry>, key: &[u8]) { + if let Some(entry) = store.get_mut(key) { + entry.touch(); + } + } + + /// Convenience wrapper: computes the net byte increase that writing + /// `(key, value_len)` would introduce and forwards it to + /// [`Self::enforce_memory_limit`]. + fn enforce_for_write( + &self, + store: &mut HashMap, ValueEntry>, + key: &[u8], + value_len: usize, + ) -> Result<(), FerrumError> { + let incoming = key.len() as u64 + value_len as u64 + PER_ENTRY_OVERHEAD; + let net = incoming.saturating_sub(store.get(key).map(|e| entry_bytes(key, e)).unwrap_or(0)); + self.enforce_memory_limit(store, net) + } + + /// Removes `key` from `store`, updates the memory counter, and + /// returns the evicted entry. A no-op when the key is absent. + fn track_remove( + &self, + store: &mut HashMap, ValueEntry>, + key: &[u8], + ) -> Option { + let removed = store.remove(key); + if let Some(entry) = &removed { + self.apply_delta(-(entry_bytes(key, entry) as i64)); + } + removed + } + + /// Clears every entry, resetting the memory counter to zero. + fn track_clear(&self, store: &mut HashMap, ValueEntry>) { + store.clear(); + self.used_memory.store(0, Ordering::Relaxed); + } + + /// Untracks an entry that was already removed by some other path + /// (e.g. a caller that held onto the returned `Option`). + fn untrack(&self, key: &[u8], entry: &ValueEntry) { + self.apply_delta(-(entry_bytes(key, entry) as i64)); + } + + fn apply_delta(&self, delta: i64) { + if delta == 0 { + return; + } + if delta > 0 { + self.used_memory.fetch_add(delta as u64, Ordering::Relaxed); + } else { + let mag = (-delta) as u64; + // Saturating sub: the counter is an approximation, and any + // underflow would only happen if the invariants slipped, which + // we'd rather clamp than panic on in release builds. + let mut cur = self.used_memory.load(Ordering::Relaxed); + loop { + let next = cur.saturating_sub(mag); + match self.used_memory.compare_exchange_weak( + cur, + next, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(obs) => cur = obs, + } + } + } + } +} + +/// Approximate byte cost of a single live entry. +fn entry_bytes(key: &[u8], entry: &ValueEntry) -> u64 { + key.len() as u64 + entry.data.len() as u64 + PER_ENTRY_OVERHEAD +} + +/// Samples up to `sample` candidates from `store` under the given scope. +/// +/// `HashMap`'s iteration order is already randomised by its hasher, so we +/// rely on that for "pick at random" without introducing an extra RNG +/// dependency. For `EvictionScope::Volatile` we skip keys without a TTL and +/// keep walking until the cap is reached or the store is exhausted. +fn sample_candidates( + store: &HashMap, ValueEntry>, + scope: EvictionScope, + sample: usize, +) -> Vec { + let mut out = Vec::with_capacity(sample); + for (key, entry) in store.iter() { + if out.len() >= sample { + break; + } + if matches!(scope, EvictionScope::Volatile) && entry.expire_at.is_none() { + continue; + } + out.push(Candidate { + key: key.clone(), + last_access: Some(entry.last_access), + expire_at: entry.expire_at, + }); + } + out } /// Outcome of a single call to [`KvEngine::sweep_expired`]. @@ -1321,4 +1604,204 @@ mod tests { assert_eq!(stats.examined, 0); assert_eq!(stats.evicted, 0); } + + #[test] + fn used_memory_starts_at_zero_and_grows_on_set() { + let engine = KvEngine::new(); + assert_eq!(engine.used_memory(), 0); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let after_set = engine.used_memory(); + assert!(after_set >= 2 + PER_ENTRY_OVERHEAD); + } + + #[test] + fn used_memory_adjusts_on_overwrite_and_delete() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"short".to_vec()).unwrap(); + let a = engine.used_memory(); + engine.set(b"k".to_vec(), vec![b'x'; 100]).unwrap(); + let b = engine.used_memory(); + assert!(b > a, "grow overwrite should increase used_memory"); + + engine.del(b"k").unwrap(); + assert_eq!(engine.used_memory(), 0); + } + + #[test] + fn flushdb_resets_used_memory() { + let engine = KvEngine::new(); + engine.set(b"a".to_vec(), b"1".to_vec()).unwrap(); + engine.set(b"b".to_vec(), b"2".to_vec()).unwrap(); + assert!(engine.used_memory() > 0); + engine.flushdb().unwrap(); + assert_eq!(engine.used_memory(), 0); + } + + #[test] + fn memory_usage_returns_per_entry_bytes_or_none() { + let engine = KvEngine::new(); + assert_eq!(engine.memory_usage(b"absent").unwrap(), None); + engine.set(b"k".to_vec(), b"value".to_vec()).unwrap(); + let reported = engine.memory_usage(b"k").unwrap().unwrap(); + assert_eq!(reported, 1 + 5 + PER_ENTRY_OVERHEAD); + } + + #[test] + fn memory_usage_expires_key_lazily() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + { + let mut store = engine.store.write().unwrap(); + if let Some(entry) = store.get_mut(b"k".as_slice()) { + entry.expire_at = Some(Instant::now() - Duration::from_millis(1)); + } + } + assert_eq!(engine.memory_usage(b"k").unwrap(), None); + assert_eq!(engine.used_memory(), 0); + } + + #[test] + fn used_memory_tracks_mset_and_del_many() { + let engine = KvEngine::new(); + engine + .mset(vec![ + (b"a".to_vec(), b"1".to_vec()), + (b"bb".to_vec(), b"22".to_vec()), + ]) + .unwrap(); + assert_eq!( + engine.used_memory(), + (1 + 1 + PER_ENTRY_OVERHEAD) + (2 + 2 + PER_ENTRY_OVERHEAD), + ); + let removed = engine + .del_many(&[b"a".to_vec(), b"bb".to_vec(), b"missing".to_vec()]) + .unwrap(); + assert_eq!(removed, 2); + assert_eq!(engine.used_memory(), 0); + } + + #[test] + fn noeviction_refuses_writes_past_max_memory() { + let engine = KvEngine::new(); + engine + .set_eviction_config(EvictionConfig { + max_memory: PER_ENTRY_OVERHEAD + 8, + policy: EvictionPolicy::NoEviction, + samples: 5, + }) + .unwrap(); + engine.set(b"k".to_vec(), b"12345".to_vec()).unwrap(); + // Writing anything now overshoots: expect OOM without touching the + // existing key. + let err = engine.set(b"x".to_vec(), b"y".to_vec()).unwrap_err(); + assert!(matches!(err, FerrumError::OutOfMemory)); + assert_eq!(engine.get(b"k").unwrap(), Some(b"12345".to_vec())); + } + + #[test] + fn allkeys_lru_evicts_least_recently_used_key() { + let engine = KvEngine::new(); + // Room for exactly two entries. + let max = 2 * (1 + 3 + PER_ENTRY_OVERHEAD); + engine + .set_eviction_config(EvictionConfig { + max_memory: max, + policy: EvictionPolicy::AllKeysLru, + samples: 10, + }) + .unwrap(); + + engine.set(b"a".to_vec(), b"AAA".to_vec()).unwrap(); + std::thread::sleep(Duration::from_millis(5)); + engine.set(b"b".to_vec(), b"BBB".to_vec()).unwrap(); + std::thread::sleep(Duration::from_millis(5)); + + // Touch `a` so it becomes the newest. + engine.get(b"a").unwrap(); + std::thread::sleep(Duration::from_millis(5)); + + // Third insert must evict: `b` is now the least recently used. + engine.set(b"c".to_vec(), b"CCC".to_vec()).unwrap(); + assert_eq!( + engine.get(b"b").unwrap(), + None, + "b should have been evicted" + ); + assert!(engine.exists(b"a").unwrap()); + assert!(engine.exists(b"c").unwrap()); + } + + #[test] + fn volatile_lru_falls_back_to_oom_without_volatile_keys() { + let engine = KvEngine::new(); + engine + .set_eviction_config(EvictionConfig { + max_memory: PER_ENTRY_OVERHEAD + 2, + policy: EvictionPolicy::VolatileLru, + samples: 5, + }) + .unwrap(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let err = engine.set(b"x".to_vec(), b"y".to_vec()).unwrap_err(); + assert!(matches!(err, FerrumError::OutOfMemory)); + } + + #[test] + fn volatile_ttl_evicts_soonest_expiring() { + let engine = KvEngine::new(); + let max = 2 * (1 + 1 + PER_ENTRY_OVERHEAD); + engine + .set_eviction_config(EvictionConfig { + max_memory: max, + policy: EvictionPolicy::VolatileTtl, + samples: 10, + }) + .unwrap(); + engine.set(b"a".to_vec(), b"1".to_vec()).unwrap(); + engine.set(b"b".to_vec(), b"2".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"a", now + 10_000).unwrap(); + engine.expire_at_ms(b"b", now + 60_000).unwrap(); + + engine.set(b"c".to_vec(), b"3".to_vec()).unwrap(); + assert_eq!( + engine.get(b"a").unwrap(), + None, + "shortest-TTL key should evict" + ); + assert!(engine.exists(b"b").unwrap()); + assert!(engine.exists(b"c").unwrap()); + } + + #[test] + fn allkeys_random_picks_some_victim_under_pressure() { + let engine = KvEngine::new(); + let max = 2 * (1 + 1 + PER_ENTRY_OVERHEAD); + engine + .set_eviction_config(EvictionConfig { + max_memory: max, + policy: EvictionPolicy::AllKeysRandom, + samples: 5, + }) + .unwrap(); + engine.set(b"a".to_vec(), b"1".to_vec()).unwrap(); + engine.set(b"b".to_vec(), b"2".to_vec()).unwrap(); + engine.set(b"c".to_vec(), b"3".to_vec()).unwrap(); + // One of the three must have been evicted. + let survivors = [b"a", b"b", b"c"] + .iter() + .filter(|k| engine.exists(k.as_slice()).unwrap()) + .count(); + assert_eq!(survivors, 2); + } + + #[test] + fn zero_max_memory_disables_enforcement() { + let engine = KvEngine::new(); + for i in 0..100 { + let k = format!("k{i}"); + engine.set(k.into_bytes(), vec![b'x'; 128]).unwrap(); + } + assert_eq!(engine.dbsize().unwrap(), 100); + } } diff --git a/src/storage/eviction.rs b/src/storage/eviction.rs new file mode 100644 index 0000000..c17f2be --- /dev/null +++ b/src/storage/eviction.rs @@ -0,0 +1,203 @@ +//! Eviction configuration and candidate selection. +//! +//! Separated from [`crate::storage::engine`] so the policy logic can be +//! unit-tested in isolation. Candidate selection is approximate, matching +//! Redis' design: instead of maintaining a global priority structure, we +//! sample a small number of keys and pick the least desirable one. + +use std::time::Instant; + +/// How the engine should decide which key to drop when it runs out of memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EvictionPolicy { + /// Refuse writes that would grow the dataset past `maxmemory`. + NoEviction, + /// Approximate LRU over every key. + AllKeysLru, + /// Approximate LRU over keys that have a TTL; falls back to + /// [`EvictionPolicy::NoEviction`] if no volatile keys exist. + VolatileLru, + /// Pick a random key. + AllKeysRandom, + /// Pick a random volatile key. + VolatileRandom, + /// Pick the volatile key with the shortest remaining TTL. + VolatileTtl, +} + +impl EvictionPolicy { + /// Parses the wire name used by the config file and by `CONFIG SET`. + pub fn from_name(name: &str) -> Option { + match name { + "noeviction" => Some(Self::NoEviction), + "allkeys-lru" => Some(Self::AllKeysLru), + "volatile-lru" => Some(Self::VolatileLru), + "allkeys-random" => Some(Self::AllKeysRandom), + "volatile-random" => Some(Self::VolatileRandom), + "volatile-ttl" => Some(Self::VolatileTtl), + _ => None, + } + } + + pub fn name(self) -> &'static str { + match self { + Self::NoEviction => "noeviction", + Self::AllKeysLru => "allkeys-lru", + Self::VolatileLru => "volatile-lru", + Self::AllKeysRandom => "allkeys-random", + Self::VolatileRandom => "volatile-random", + Self::VolatileTtl => "volatile-ttl", + } + } + + pub fn scope(self) -> EvictionScope { + match self { + Self::NoEviction => EvictionScope::AllKeys, + Self::AllKeysLru | Self::AllKeysRandom => EvictionScope::AllKeys, + Self::VolatileLru | Self::VolatileRandom | Self::VolatileTtl => EvictionScope::Volatile, + } + } +} + +/// Which subset of the keyspace is eligible for eviction under a policy. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EvictionScope { + /// Every live key is a candidate. + AllKeys, + /// Only keys with an active TTL are candidates. + Volatile, +} + +/// User-facing eviction configuration. +#[derive(Debug, Clone, Copy)] +pub struct EvictionConfig { + /// Soft memory ceiling, in bytes. Zero means unlimited. + pub max_memory: u64, + /// Strategy used when the ceiling is exceeded. + pub policy: EvictionPolicy, + /// How many random candidates are inspected per eviction round. + /// + /// Redis' default is 5; higher values give better approximations at + /// the cost of doing more work per write that overflows memory. + pub samples: usize, +} + +impl Default for EvictionConfig { + fn default() -> Self { + Self { + max_memory: 0, + policy: EvictionPolicy::NoEviction, + samples: 5, + } + } +} + +/// Metadata forwarded by the engine for each sampling candidate. +#[derive(Debug, Clone)] +pub struct Candidate { + /// Owned copy of the key, returned so the caller can drop it from the + /// store without having to re-borrow the map. + pub key: Vec, + /// `Instant` of the most recent access; `None` if the key has never + /// been touched (should not happen in practice). + pub last_access: Option, + /// TTL deadline, if any. + pub expire_at: Option, +} + +/// Picks the "worst" candidate according to `policy`. +/// +/// Returns `None` when the iterator was empty — typically because no key +/// matched the policy's scope (for example `volatile-lru` on a dataset +/// without any TTL). Callers should treat that the same way they'd treat +/// [`EvictionPolicy::NoEviction`]. +pub fn pick_victim(policy: EvictionPolicy, candidates: Vec) -> Option { + if candidates.is_empty() { + return None; + } + match policy { + EvictionPolicy::NoEviction => None, + EvictionPolicy::AllKeysRandom | EvictionPolicy::VolatileRandom => { + // Pick the last candidate: the engine supplies them in random + // iteration order (HashMap + indexmap are already unordered + // from the client's point of view). + candidates.into_iter().next() + } + EvictionPolicy::AllKeysLru | EvictionPolicy::VolatileLru => candidates + .into_iter() + .min_by_key(|c| c.last_access.unwrap_or(Instant::now())), + EvictionPolicy::VolatileTtl => candidates + .into_iter() + .filter(|c| c.expire_at.is_some()) + .min_by_key(|c| c.expire_at.unwrap()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn candidate(name: &str, age_ms: u64, ttl_ms: Option) -> Candidate { + let now = Instant::now(); + Candidate { + key: name.as_bytes().to_vec(), + last_access: Some(now - Duration::from_millis(age_ms)), + expire_at: ttl_ms.map(|t| now + Duration::from_millis(t)), + } + } + + #[test] + fn lru_picks_oldest_access() { + let pick = pick_victim( + EvictionPolicy::AllKeysLru, + vec![ + candidate("young", 10, None), + candidate("old", 1_000, None), + candidate("mid", 100, None), + ], + ) + .unwrap(); + assert_eq!(pick.key, b"old"); + } + + #[test] + fn volatile_ttl_picks_shortest_remaining() { + let pick = pick_victim( + EvictionPolicy::VolatileTtl, + vec![ + candidate("long", 0, Some(60_000)), + candidate("short", 0, Some(500)), + candidate("mid", 0, Some(5_000)), + ], + ) + .unwrap(); + assert_eq!(pick.key, b"short"); + } + + #[test] + fn no_eviction_never_picks() { + assert!(pick_victim(EvictionPolicy::NoEviction, vec![candidate("a", 0, None)]).is_none()); + } + + #[test] + fn empty_candidates_return_none() { + assert!(pick_victim(EvictionPolicy::AllKeysLru, vec![]).is_none()); + } + + #[test] + fn policy_name_roundtrip() { + for name in [ + "noeviction", + "allkeys-lru", + "volatile-lru", + "allkeys-random", + "volatile-random", + "volatile-ttl", + ] { + let p = EvictionPolicy::from_name(name).unwrap(); + assert_eq!(p.name(), name); + } + assert!(EvictionPolicy::from_name("bogus").is_none()); + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index da81298..88aedcb 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,2 +1,3 @@ pub mod engine; +pub mod eviction; pub mod expire; diff --git a/tests/eviction_test.rs b/tests/eviction_test.rs new file mode 100644 index 0000000..2a20935 --- /dev/null +++ b/tests/eviction_test.rs @@ -0,0 +1,206 @@ +//! End-to-end tests for memory accounting, MEMORY USAGE, INFO, and +//! maxmemory eviction. + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::thread; +use std::time::Duration; + +use ferrum_kv::network::server::{self, ServerConfig}; +use ferrum_kv::network::shutdown::Shutdown; +use ferrum_kv::protocol::encoder; +use ferrum_kv::storage::engine::KvEngine; +use ferrum_kv::storage::eviction::{EvictionConfig, EvictionPolicy}; + +struct ServerGuard { + addr: String, + shutdown: Shutdown, + _server: thread::JoinHandle<()>, +} + +impl Drop for ServerGuard { + fn drop(&mut self) { + self.shutdown.trigger(); + let _ = TcpStream::connect_timeout(&self.addr.parse().unwrap(), Duration::from_millis(200)); + } +} + +fn spawn_server_with_engine(engine: KvEngine) -> ServerGuard { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr").to_string(); + let shutdown = Shutdown::new(); + let shutdown_clone = shutdown.clone(); + let handle = thread::spawn(move || { + let _ = server::run_listener(listener, engine, shutdown_clone, ServerConfig::default()); + }); + ServerGuard { + addr, + shutdown, + _server: handle, + } +} + +fn connect(addr: &str) -> TcpStream { + let stream = TcpStream::connect(addr).expect("connect"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(2))) + .unwrap(); + stream +} + +fn build_request(args: &[&[u8]]) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes()); + for arg in args { + encoder::encode_bulk_string(&mut out, arg); + } + out +} + +fn send(stream: &mut TcpStream, request: &[u8]) -> Vec { + stream.write_all(request).expect("write request"); + read_one_reply(stream) +} + +fn read_one_reply(stream: &mut TcpStream) -> Vec { + let mut out = Vec::new(); + let mut byte = [0u8; 1]; + stream.read_exact(&mut byte).expect("read type byte"); + out.push(byte[0]); + match byte[0] { + b'+' | b'-' | b':' => read_until_crlf(stream, &mut out), + b'$' => { + let mut header = Vec::new(); + read_until_crlf(stream, &mut header); + let header_str = + std::str::from_utf8(&header[..header.len() - 2]).expect("ascii length"); + let len: i64 = header_str.parse().expect("integer length"); + out.extend_from_slice(&header); + if len >= 0 { + let mut body = vec![0u8; len as usize + 2]; + stream.read_exact(&mut body).expect("read bulk body"); + out.extend_from_slice(&body); + } + } + other => panic!("unexpected RESP type byte {other:#x}"), + } + out +} + +fn read_until_crlf(stream: &mut TcpStream, out: &mut Vec) { + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte).expect("read byte"); + out.push(byte[0]); + if out.len() >= 2 && out[out.len() - 2] == b'\r' && out[out.len() - 1] == b'\n' { + return; + } + } +} + +#[test] +fn memory_usage_reports_integer_for_known_key() { + let engine = KvEngine::new(); + let guard = spawn_server_with_engine(engine); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"hello"])); + let reply = send(&mut stream, &build_request(&[b"MEMORY", b"USAGE", b"k"])); + // Payload is `:\r\n`. + let text = std::str::from_utf8(&reply).unwrap(); + assert!(text.starts_with(':'), "reply: {text:?}"); + let n: u64 = text[1..text.len() - 2].parse().unwrap(); + assert!(n >= (1 + 5), "memory usage should cover key+value: {n}"); +} + +#[test] +fn memory_usage_returns_null_bulk_for_missing_key() { + let engine = KvEngine::new(); + let guard = spawn_server_with_engine(engine); + let mut stream = connect(&guard.addr); + + let reply = send( + &mut stream, + &build_request(&[b"MEMORY", b"USAGE", b"absent"]), + ); + assert_eq!(reply, b"$-1\r\n"); +} + +#[test] +fn info_memory_contains_used_memory_and_policy() { + let engine = KvEngine::new(); + engine + .set_eviction_config(EvictionConfig { + max_memory: 1024, + policy: EvictionPolicy::AllKeysLru, + samples: 7, + }) + .unwrap(); + let guard = spawn_server_with_engine(engine); + let mut stream = connect(&guard.addr); + + let reply = send(&mut stream, &build_request(&[b"INFO", b"memory"])); + let text = String::from_utf8_lossy(&reply); + assert!(text.starts_with('$'), "reply: {text}"); + assert!(text.contains("used_memory:")); + assert!(text.contains("maxmemory:1024")); + assert!(text.contains("maxmemory_policy:allkeys-lru")); + assert!(text.contains("maxmemory_samples:7")); +} + +#[test] +fn noeviction_returns_oom_error_on_overflow() { + let engine = KvEngine::new(); + engine + .set_eviction_config(EvictionConfig { + max_memory: 64, // tight + policy: EvictionPolicy::NoEviction, + samples: 5, + }) + .unwrap(); + let guard = spawn_server_with_engine(engine); + let mut stream = connect(&guard.addr); + + // First SET fits. + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + // Second SET with a large value should bust the cap. + let payload = vec![b'x'; 256]; + let reply = send(&mut stream, &build_request(&[b"SET", b"big", &payload])); + let text = String::from_utf8_lossy(&reply); + assert!(text.starts_with('-'), "expected error reply, got {text:?}"); +} + +#[test] +fn allkeys_lru_evicts_over_the_wire() { + let engine = KvEngine::new(); + // Room for two small entries. + engine + .set_eviction_config(EvictionConfig { + max_memory: 2 * (1 + 1 + 48), + policy: EvictionPolicy::AllKeysLru, + samples: 10, + }) + .unwrap(); + let guard = spawn_server_with_engine(engine); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"a", b"1"])); + thread::sleep(Duration::from_millis(5)); + send(&mut stream, &build_request(&[b"SET", b"b", b"2"])); + thread::sleep(Duration::from_millis(5)); + // Touch `a` so `b` becomes the LRU victim. + send(&mut stream, &build_request(&[b"GET", b"a"])); + thread::sleep(Duration::from_millis(5)); + send(&mut stream, &build_request(&[b"SET", b"c", b"3"])); + + // `b` should have been evicted; both `a` and `c` should be alive. + let reply_b = send(&mut stream, &build_request(&[b"EXISTS", b"b"])); + assert_eq!(reply_b, b":0\r\n"); + let reply_a = send(&mut stream, &build_request(&[b"EXISTS", b"a"])); + assert_eq!(reply_a, b":1\r\n"); + let reply_c = send(&mut stream, &build_request(&[b"EXISTS", b"c"])); + assert_eq!(reply_c, b":1\r\n"); +}