diff --git a/src/main.rs b/src/main.rs index 9958e04..4a602b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use ferrum_kv::network::server::ServerConfig; use ferrum_kv::network::shutdown::Shutdown; use ferrum_kv::persistence::AofWriter; use ferrum_kv::storage::engine::KvEngine; +use ferrum_kv::storage::expire; use crate::cli::{CliArgs, Invocation, USAGE}; @@ -83,9 +84,13 @@ fn main() -> ExitCode { info!("maxclients: {}", server_config.max_clients); } - if let Err(e) = - ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config) - { + let expire_handle = expire::spawn(engine.clone(), shutdown.clone()); + + let server_result = + ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config); + expire_handle.shutdown(); + + if let Err(e) = server_result { error!("server error: {e}"); return ExitCode::FAILURE; } diff --git a/src/network/server.rs b/src/network/server.rs index c13ad93..4e4cc7a 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -11,7 +11,7 @@ use crate::error::FerrumError; use crate::network::shutdown::Shutdown; use crate::protocol::encoder; use crate::protocol::parser::{self, Command, FrameParse}; -use crate::storage::engine::KvEngine; +use crate::storage::engine::{KvEngine, TtlStatus, current_epoch_ms}; /// Initial capacity of each per-connection read buffer. /// @@ -306,6 +306,71 @@ pub fn execute_command(cmd: Command, engine: &KvEngine, out: &mut Vec) { Ok(()) => encoder::encode_simple_string(out, "OK"), Err(e) => write_ferrum_error(out, &e), }, + Command::Expire { key, seconds } => { + let reply = expire_reply(engine, &key, checked_seconds_to_ms(seconds)); + write_bool_integer(out, reply); + } + Command::PExpire { key, millis } => { + let reply = expire_reply(engine, &key, Some(millis)); + write_bool_integer(out, reply); + } + Command::PExpireAt { key, abs_epoch_ms } => match engine.expire_at_ms(&key, abs_epoch_ms) { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), + }, + Command::Persist { key } => match engine.persist(&key) { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), + }, + Command::Ttl { key } => match engine.ttl_ms(&key) { + Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2), + Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1), + Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, (ms + 999) / 1000), + Err(e) => write_ferrum_error(out, &e), + }, + Command::PTtl { key } => match engine.ttl_ms(&key) { + Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2), + Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1), + Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, ms), + Err(e) => write_ferrum_error(out, &e), + }, + } +} + +/// Converts an `EXPIRE` second delta to milliseconds, saturating on overflow. +/// +/// A `None` return means the caller should treat the request as "delete this +/// key right now" — which is how [`KvEngine::expire_at_ms`] interprets an +/// already-past absolute timestamp. +fn checked_seconds_to_ms(seconds: i64) -> Option { + seconds.checked_mul(1_000) +} + +/// Computes the absolute epoch-millisecond deadline for `EXPIRE`/`PEXPIRE` +/// and forwards it to the engine. +/// +/// A delta that overflows `i64` when expressed in milliseconds (only possible +/// with `EXPIRE` and astronomically large second counts) is treated the same +/// way as an in-the-past deadline: the key is dropped on the spot. That keeps +/// the wire contract simple — either the key existed and was updated +/// (`:1`), or it did not (`:0`). +fn expire_reply(engine: &KvEngine, key: &[u8], delta_ms: Option) -> Result { + let now_ms = current_epoch_ms(); + let abs_ms = match delta_ms { + Some(d) => now_ms.saturating_add(d), + None => now_ms, // treat overflow as "expire immediately" + }; + engine.expire_at_ms(key, abs_ms) +} + +/// Writes a `Result` as a RESP integer (`0`/`1`) or as an error. +fn write_bool_integer(out: &mut Vec, reply: Result) { + match reply { + Ok(true) => encoder::encode_integer(out, 1), + Ok(false) => encoder::encode_integer(out, 0), + Err(e) => write_ferrum_error(out, &e), } } diff --git a/src/persistence/replay.rs b/src/persistence/replay.rs index 37cc936..2362959 100644 --- a/src/persistence/replay.rs +++ b/src/persistence/replay.rs @@ -363,6 +363,29 @@ fn apply_record(engine: &KvEngine, parts: &[Vec]) -> Result<(), ApplyError> } engine.flushdb().map_err(ApplyError::Engine) } + b"PEXPIREAT" => { + if parts.len() != 3 { + return Err(ApplyError::Arity(cmd_name(cmd))); + } + let ts_text = + std::str::from_utf8(&parts[2]).map_err(|_| ApplyError::Arity(cmd_name(cmd)))?; + let abs_ms: i64 = ts_text + .parse() + .map_err(|_| ApplyError::Arity(cmd_name(cmd)))?; + engine + .expire_at_ms(&parts[1], abs_ms) + .map(|_| ()) + .map_err(ApplyError::Engine) + } + b"PERSIST" => { + if parts.len() != 2 { + return Err(ApplyError::Arity(cmd_name(cmd))); + } + engine + .persist(&parts[1]) + .map(|_| ()) + .map_err(ApplyError::Engine) + } _ => Err(ApplyError::Unknown(cmd_name(cmd))), } } @@ -499,4 +522,73 @@ mod tests { assert_eq!(stats, ReplayStats::default()); let _ = fs::remove_file(&path); } + + #[test] + fn replays_pexpireat_and_expires_key_after_deadline() { + use crate::storage::engine::TtlStatus; + + let path = tmp_path("pexpireat"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms + 60_000; + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + let stats = replay(&path, &engine).unwrap(); + assert_eq!(stats.applied, 2); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::Millis(_))); + let _ = fs::remove_file(&path); + } + + #[test] + fn replay_drops_keys_whose_pexpireat_is_already_past() { + let path = tmp_path("pexpireat-past"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms - 1_000; // already expired + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + replay(&path, &engine).unwrap(); + assert_eq!(engine.get(b"k").unwrap(), None); + let _ = fs::remove_file(&path); + } + + #[test] + fn replays_persist_and_clears_ttl() { + use crate::storage::engine::TtlStatus; + + let path = tmp_path("persist"); + let now_ms = crate::storage::engine::current_epoch_ms(); + let abs_ms = now_ms + 60_000; + let abs_text = abs_ms.to_string(); + let mut content = String::new(); + content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n"); + content.push_str(&format!( + "*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n", + abs_text.len(), + abs_text, + )); + content.push_str("*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n"); + fs::write(&path, &content).unwrap(); + + let engine = KvEngine::new(); + replay(&path, &engine).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + let _ = fs::remove_file(&path); + } } diff --git a/src/persistence/writer.rs b/src/persistence/writer.rs index 8f7f2d6..d83733a 100644 --- a/src/persistence/writer.rs +++ b/src/persistence/writer.rs @@ -92,6 +92,22 @@ impl AofWriter { self.append(&[b"FLUSHDB"]) } + /// Appends a `PEXPIREAT key abs_epoch_ms` entry to the log. + /// + /// The absolute millisecond timestamp is recorded rather than a relative + /// offset so replay stays correct regardless of how long the log has been + /// sitting on disk. Any already-past timestamp encountered during replay + /// makes the key be dropped immediately. + pub fn append_pexpireat(&self, key: &[u8], abs_epoch_ms: i64) -> Result<(), FerrumError> { + let ts = abs_epoch_ms.to_string(); + self.append(&[b"PEXPIREAT", key, ts.as_bytes()]) + } + + /// Appends a `PERSIST key` entry to the log. + pub fn append_persist(&self, key: &[u8]) -> Result<(), FerrumError> { + self.append(&[b"PERSIST", key]) + } + fn append(&self, parts: &[&[u8]]) -> Result<(), FerrumError> { let bytes = encode_command(parts); self.write_bytes(&bytes) diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index 6d5a637..2388bb4 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -33,6 +33,19 @@ pub enum Command { DbSize, /// `FLUSHDB`, which removes all keys. FlushDb, + /// `EXPIRE key seconds` — set TTL in seconds. + Expire { key: Vec, seconds: i64 }, + /// `PEXPIRE key milliseconds` — set TTL in milliseconds. + PExpire { key: Vec, millis: i64 }, + /// `PEXPIREAT key ms-timestamp` — set TTL as an absolute + /// Unix epoch millisecond timestamp. + PExpireAt { key: Vec, abs_epoch_ms: i64 }, + /// `PERSIST key` — remove any TTL. + Persist { key: Vec }, + /// `TTL key` — remaining TTL in whole seconds. + Ttl { key: Vec }, + /// `PTTL key` — remaining TTL in milliseconds. + PTtl { key: Vec }, } /// Outcome of attempting to parse a single RESP2 frame from a byte buffer. @@ -361,6 +374,57 @@ fn build_command(parts: Vec>) -> Result { } Ok(Command::FlushDb) } + b"EXPIRE" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "EXPIRE" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let seconds = parse_integer_argument(&it.next().unwrap(), "EXPIRE")?; + Ok(Command::Expire { key, seconds }) + } + b"PEXPIRE" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "PEXPIRE" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let millis = parse_integer_argument(&it.next().unwrap(), "PEXPIRE")?; + Ok(Command::PExpire { key, millis }) + } + b"PEXPIREAT" => { + if args.len() != 2 { + return Err(FerrumError::WrongArity { cmd: "PEXPIREAT" }); + } + let mut it = args.into_iter(); + let key = it.next().unwrap(); + let abs_epoch_ms = parse_integer_argument(&it.next().unwrap(), "PEXPIREAT")?; + Ok(Command::PExpireAt { key, abs_epoch_ms }) + } + b"PERSIST" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "PERSIST" }); + } + Ok(Command::Persist { + key: args.into_iter().next().unwrap(), + }) + } + b"TTL" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "TTL" }); + } + Ok(Command::Ttl { + key: args.into_iter().next().unwrap(), + }) + } + b"PTTL" => { + if args.len() != 1 { + return Err(FerrumError::WrongArity { cmd: "PTTL" }); + } + Ok(Command::PTtl { + key: args.into_iter().next().unwrap(), + }) + } _ => Err(FerrumError::UnknownCommand( String::from_utf8_lossy(&name).into_owned(), )), @@ -779,4 +843,60 @@ mod frame_tests { let err = parse_frame(b"*1000000\r\n").unwrap_err(); assert!(matches!(err, FerrumError::ParseError(_))); } + + #[test] + fn parses_expire_and_ttl_family() { + assert_eq!( + parse_exact(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$2\r\n60\r\n"), + Command::Expire { + key: b"k".to_vec(), + seconds: 60, + } + ); + assert_eq!( + parse_exact(b"*3\r\n$7\r\nPEXPIRE\r\n$1\r\nk\r\n$4\r\n1500\r\n"), + Command::PExpire { + key: b"k".to_vec(), + millis: 1500, + } + ); + assert_eq!( + parse_exact(b"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n$13\r\n1700000000000\r\n"), + Command::PExpireAt { + key: b"k".to_vec(), + abs_epoch_ms: 1_700_000_000_000, + } + ); + assert_eq!( + parse_exact(b"*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n"), + Command::Persist { key: b"k".to_vec() } + ); + assert_eq!( + parse_exact(b"*2\r\n$3\r\nTTL\r\n$1\r\nk\r\n"), + Command::Ttl { key: b"k".to_vec() } + ); + assert_eq!( + parse_exact(b"*2\r\n$4\r\nPTTL\r\n$1\r\nk\r\n"), + Command::PTtl { key: b"k".to_vec() } + ); + } + + #[test] + fn expire_with_non_integer_is_invalid() { + let (err, _) = match parse_frame(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$3\r\nabc\r\n").unwrap() + { + FrameParse::Invalid { error, consumed } => (error, consumed), + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::ParseError(_))); + } + + #[test] + fn ttl_without_key_is_wrong_arity() { + let err = match parse_frame(b"*1\r\n$3\r\nTTL\r\n").unwrap() { + FrameParse::Invalid { error, .. } => error, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(matches!(err, FerrumError::WrongArity { cmd: "TTL" })); + } } diff --git a/src/storage/engine.rs b/src/storage/engine.rs index 7692bc4..42f9cee 100644 --- a/src/storage/engine.rs +++ b/src/storage/engine.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use log::warn; @@ -12,6 +13,30 @@ pub const KEY_MAX_BYTES: usize = 64 * 1024; /// Maximum allowed value size in bytes (16 MiB). pub const VALUE_MAX_BYTES: usize = 16 * 1024 * 1024; +/// A value stored in the engine, together with its optional expiration. +/// +/// `expire_at` is a monotonic deadline derived from [`Instant::now`] at the +/// time the TTL was installed. A value whose deadline is `<= Instant::now()` +/// is considered expired and must be treated as absent by every read path. +#[derive(Clone)] +struct ValueEntry { + data: Vec, + expire_at: Option, +} + +impl ValueEntry { + fn new(data: Vec) -> Self { + Self { + data, + expire_at: None, + } + } + + fn is_expired(&self, now: Instant) -> bool { + matches!(self.expire_at, Some(deadline) if deadline <= now) + } +} + /// A thread-safe key-value storage engine backed by a [`HashMap`]. /// /// Keys and values are stored as `Vec`, making the engine fully @@ -19,17 +44,22 @@ pub const VALUE_MAX_BYTES: usize = 16 * 1024 * 1024; /// preserved verbatim. This matches the contract of the RESP2 bulk string, /// which is already byte-oriented on the wire. /// -/// Mutating commands (`SET`, `DEL`, `FLUSHDB`) are optionally forwarded to an -/// [`AofWriter`] so changes survive a restart. The log is appended while the -/// write lock is still held, which preserves the ordering invariant described -/// in the whitepaper (§8.7): the in-memory state and the on-disk log always -/// agree on the relative order of successful writes. +/// Each value carries an optional expiration deadline. Expired entries are +/// removed lazily on access (Redis-style) and proactively by a background +/// sweeper that calls [`KvEngine::sweep_expired`] periodically. +/// +/// Mutating commands (`SET`, `DEL`, `FLUSHDB`, `EXPIRE`, `PERSIST`) are +/// optionally forwarded to an [`AofWriter`] so changes survive a restart. +/// The log is appended while the write lock is still held, which preserves +/// the ordering invariant described in the whitepaper (§8.7): the in-memory +/// state and the on-disk log always agree on the relative order of +/// successful writes. /// /// Public methods return [`Result`] so lock poisoning can be reported instead /// of causing a panic. #[derive(Clone)] pub struct KvEngine { - store: Arc, Vec>>>, + store: Arc, ValueEntry>>>, aof: Option>, } @@ -59,6 +89,10 @@ impl KvEngine { /// Sets a key-value pair and returns the previous value, if any. /// + /// Matches Redis' default semantics: any existing TTL on the key is + /// cleared by the write. Callers that need the Redis `KEEPTTL` option + /// will have to go through a dedicated future API. + /// /// Returns [`FerrumError::KeyTooLong`] or [`FerrumError::ValueTooLarge`] if /// the configured size limits are exceeded. pub fn set(&self, key: Vec, value: Vec) -> Result>, FerrumError> { @@ -69,8 +103,8 @@ impl KvEngine { if let Some(aof) = &self.aof { log_aof_result("SET", aof.append_set(&key, &value)); } - let previous = store.insert(key, value); - Ok(previous) + let previous = store.insert(key, ValueEntry::new(value)); + Ok(previous.and_then(live_payload)) } /// Sets `key` to `value` only if the key is not already present. @@ -83,13 +117,21 @@ impl KvEngine { validate_value(&value)?; let mut store = self.store.write()?; - if store.contains_key(key.as_slice()) { - return Ok(false); + let now = Instant::now(); + if let Some(entry) = store.get(key.as_slice()) { + if !entry.is_expired(now) { + return Ok(false); + } + // The old value has already expired: remove it and proceed as if + // the key were absent. We intentionally do not log a DEL because + // the subsequent SET, once replayed, overwrites the stale entry + // anyway and skipping the DEL keeps the log shorter. + store.remove(key.as_slice()); } if let Some(aof) = &self.aof { log_aof_result("SETNX", aof.append_set(&key, &value)); } - store.insert(key, value); + store.insert(key, ValueEntry::new(value)); Ok(true) } @@ -110,7 +152,7 @@ impl KvEngine { log_aof_result("MSET", aof.append_set_many(&pairs)); } for (k, v) in pairs { - store.insert(k, v); + store.insert(k, ValueEntry::new(v)); } Ok(()) } @@ -118,13 +160,24 @@ impl KvEngine { /// Returns the stored value for every key in `keys`, preserving order. /// /// Missing keys map to `None` so the caller can serialise them as - /// null bulk strings without ambiguity. + /// null bulk strings without ambiguity. Expired entries are dropped in + /// the same pass so the result reflects live state only. pub fn mget(&self, keys: &[Vec]) -> Result>>, FerrumError> { - let store = self.store.read()?; - Ok(keys - .iter() - .map(|k| store.get(k.as_slice()).cloned()) - .collect()) + let mut store = self.store.write()?; + let now = Instant::now(); + let mut out = Vec::with_capacity(keys.len()); + for key in keys { + match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(key); + out.push(None); + } + Some(entry) => out.push(Some(entry.data.clone())), + None => out.push(None), + } + } + Ok(out) } /// Atomically adds `delta` to the integer value at `key` and returns @@ -133,23 +186,32 @@ impl KvEngine { /// A missing key is treated as starting from zero, matching Redis' /// `INCR` semantics. The existing value, if any, must be a decimal /// ASCII integer that fits into an [`i64`]; values outside that range - /// or that fail to parse produce the Redis-standard - /// `-ERR value is not an integer or out of range` reply. Overflow of - /// the addition itself is treated the same way. + /// or that fail to parse produce the Redis-standard reply + /// `-ERR value is not an integer or out of range`. Overflow of the + /// addition itself is treated the same way. + /// + /// The key's existing TTL, if any, is preserved. pub fn incr_by(&self, key: Vec, delta: i64) -> Result { validate_key(&key)?; let mut store = self.store.write()?; - let current = match store.get(key.as_slice()) { - Some(bytes) => { - let text = std::str::from_utf8(bytes).map_err(|_| { + let now = Instant::now(); + let (current, existing_deadline) = match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(&key); + (0i64, None) + } + Some(entry) => { + let text = std::str::from_utf8(&entry.data).map_err(|_| { FerrumError::ParseError("value is not an integer or out of range".into()) })?; - text.parse::().map_err(|_| { + let n = text.parse::().map_err(|_| { FerrumError::ParseError("value is not an integer or out of range".into()) - })? + })?; + (n, entry.expire_at) } - None => 0, + None => (0i64, None), }; let new_value = current.checked_add(delta).ok_or_else(|| { @@ -159,21 +221,47 @@ impl KvEngine { if let Some(aof) = &self.aof { log_aof_result("INCRBY", aof.append_set(&key, &serialised)); + // INCR preserves TTL: re-emit the existing deadline so replay + // converges to the same state regardless of record ordering. + if let Some(deadline) = existing_deadline + && let Some(abs_ms) = deadline_to_epoch_ms(deadline, now) + { + log_aof_result("PEXPIREAT", aof.append_pexpireat(&key, abs_ms)); + } } - store.insert(key, serialised); + store.insert( + key, + ValueEntry { + data: serialised, + expire_at: existing_deadline, + }, + ); Ok(new_value) } /// Returns the value for `key`, or `None` if the key does not exist. pub fn get(&self, key: &[u8]) -> Result>, FerrumError> { - let store = self.store.read()?; - Ok(store.get(key).cloned()) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(None) + } + Some(entry) => Ok(Some(entry.data.clone())), + None => Ok(None), + } } /// Deletes `key` and returns `true` if it existed. pub fn del(&self, key: &[u8]) -> Result { let mut store = self.store.write()?; - let existed = store.remove(key).is_some(); + let now = Instant::now(); + let existed = match store.remove(key) { + Some(entry) => !entry.is_expired(now), + None => false, + }; if existed && let Some(aof) = &self.aof { log_aof_result("DEL", aof.append_del(key)); } @@ -187,15 +275,18 @@ impl KvEngine { /// atomic from an observer's point of view: concurrent readers see /// either all deletions or none of them. Persisted log records are /// appended only for keys that were actually removed, mirroring - /// Redis' behaviour. + /// Redis' behaviour. Already-expired keys do not count as removed. pub fn del_many(&self, keys: &[Vec]) -> Result { if keys.is_empty() { return Ok(0); } let mut store = self.store.write()?; + let now = Instant::now(); let mut removed: Vec<&[u8]> = Vec::with_capacity(keys.len()); for key in keys { - if store.remove(key.as_slice()).is_some() { + if let Some(entry) = store.remove(key.as_slice()) + && !entry.is_expired(now) + { removed.push(key.as_slice()); } } @@ -207,52 +298,91 @@ impl KvEngine { Ok(removed.len()) } - /// Returns `true` if `key` exists. + /// Returns `true` if `key` exists and has not expired. pub fn exists(&self, key: &[u8]) -> Result { - let store = self.store.read()?; - Ok(store.contains_key(key)) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(false) + } + Some(_) => Ok(true), + None => Ok(false), + } } /// Returns the number of keys currently stored. + /// + /// Already-expired keys still pending lazy cleanup are excluded so + /// callers see a value consistent with `EXISTS`. pub fn dbsize(&self) -> Result { let store = self.store.read()?; - Ok(store.len()) + let now = Instant::now(); + Ok(store.values().filter(|v| !v.is_expired(now)).count()) } /// Appends `suffix` to the value at `key` and returns the new length. /// /// If `key` is absent, the command behaves like `SET` with an empty /// initial value (the same contract as Redis). The resulting value is - /// subject to the usual size validation, and the AOF records the new - /// full value with a `SET` entry so that replay is guaranteed to - /// converge to the same state regardless of history. + /// subject to the usual size validation. The existing TTL, if any, is + /// preserved and re-emitted to the AOF so replay converges regardless + /// of record ordering. pub fn append(&self, key: Vec, suffix: Vec) -> Result { validate_key(&key)?; let mut store = self.store.write()?; - let new_value = match store.get(key.as_slice()) { - Some(existing) => { - let mut buf = Vec::with_capacity(existing.len() + suffix.len()); - buf.extend_from_slice(existing); + let now = Instant::now(); + let (new_value, existing_deadline) = match store.get(key.as_slice()) { + Some(entry) if entry.is_expired(now) => { + store.remove(key.as_slice()); + self.log_expire_drop(&key); + (suffix, None) + } + Some(entry) => { + let mut buf = Vec::with_capacity(entry.data.len() + suffix.len()); + buf.extend_from_slice(&entry.data); buf.extend_from_slice(&suffix); - buf + (buf, entry.expire_at) } - None => suffix, + None => (suffix, None), }; validate_value(&new_value)?; if let Some(aof) = &self.aof { log_aof_result("APPEND", aof.append_set(&key, &new_value)); + if let Some(deadline) = existing_deadline + && let Some(abs_ms) = deadline_to_epoch_ms(deadline, now) + { + log_aof_result("PEXPIREAT", aof.append_pexpireat(&key, abs_ms)); + } } let new_len = new_value.len(); - store.insert(key, new_value); + store.insert( + key, + ValueEntry { + data: new_value, + expire_at: existing_deadline, + }, + ); Ok(new_len) } /// Returns the byte length of the value at `key`, or `0` if absent. pub fn strlen(&self, key: &[u8]) -> Result { - let store = self.store.read()?; - Ok(store.get(key).map(|v| v.len()).unwrap_or(0)) + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(0) + } + Some(entry) => Ok(entry.data.len()), + None => Ok(0), + } } /// Removes all keys from the store. @@ -264,6 +394,224 @@ impl KvEngine { } Ok(()) } + + /// Installs an absolute expiration time on `key`, measured in Unix + /// epoch milliseconds. + /// + /// Returns `true` if the key exists and the deadline was recorded. + /// Returns `false` when the key is absent or has already expired, + /// matching the Redis semantics of `EXPIRE`/`PEXPIREAT` returning `0`. + /// + /// A deadline in the past (`abs_epoch_ms <= now`) causes the key to + /// be removed immediately and an accompanying `DEL` to be logged, + /// keeping the AOF idempotent across replays. + pub fn expire_at_ms(&self, key: &[u8], abs_epoch_ms: i64) -> Result { + let mut store = self.store.write()?; + let now_instant = Instant::now(); + let now_ms = current_epoch_ms(); + + // Drop the entry if it is already expired under its current TTL. + if let Some(entry) = store.get(key) + && entry.is_expired(now_instant) + { + store.remove(key); + self.log_expire_drop(key); + } + + if !store.contains_key(key) { + return Ok(false); + } + + if abs_epoch_ms <= now_ms { + store.remove(key); + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + return Ok(true); + } + + let delta_ms = (abs_epoch_ms - now_ms) as u64; + let deadline = now_instant + Duration::from_millis(delta_ms); + if let Some(entry) = store.get_mut(key) { + entry.expire_at = Some(deadline); + } + if let Some(aof) = &self.aof { + log_aof_result("PEXPIREAT", aof.append_pexpireat(key, abs_epoch_ms)); + } + Ok(true) + } + + /// Removes any TTL from `key`. + /// + /// Returns `true` only when the key existed **and** had a TTL before + /// the call — matching Redis' `PERSIST` return semantics. + pub fn persist(&self, key: &[u8]) -> Result { + let mut store = self.store.write()?; + let now = Instant::now(); + + if let Some(entry) = store.get(key) + && entry.is_expired(now) + { + store.remove(key); + self.log_expire_drop(key); + return Ok(false); + } + + let Some(entry) = store.get_mut(key) else { + return Ok(false); + }; + if entry.expire_at.is_none() { + return Ok(false); + } + entry.expire_at = None; + if let Some(aof) = &self.aof { + log_aof_result("PERSIST", aof.append_persist(key)); + } + Ok(true) + } + + /// Returns the remaining TTL for `key` in milliseconds. + /// + /// * `Ok(TtlStatus::Missing)` — key does not exist (Redis reports `-2`). + /// * `Ok(TtlStatus::NoExpire)` — key exists without a TTL (Redis `-1`). + /// * `Ok(TtlStatus::Millis(n))` — `n` milliseconds remaining. + pub fn ttl_ms(&self, key: &[u8]) -> Result { + let mut store = self.store.write()?; + let now = Instant::now(); + match store.get(key) { + None => Ok(TtlStatus::Missing), + Some(entry) if entry.is_expired(now) => { + store.remove(key); + self.log_expire_drop(key); + Ok(TtlStatus::Missing) + } + Some(entry) => match entry.expire_at { + None => Ok(TtlStatus::NoExpire), + Some(deadline) => { + let remaining = deadline.saturating_duration_since(now); + Ok(TtlStatus::Millis(remaining.as_millis() as i64)) + } + }, + } + } + + /// Proactively removes up to `sample` expired entries. + /// + /// Sampling is random — the first `sample` keys yielded by the map's + /// iteration order are checked. This mirrors Redis' active expiration + /// loop and is the primary caller of the `ferrum-expire` background + /// thread. Returns the number of entries actually evicted. + /// + /// When more than 25% of the sample was expired the caller is expected + /// to invoke this method again immediately; the signal is surfaced via + /// the returned fraction so the scheduler can make that decision. + pub fn sweep_expired(&self, sample: usize) -> Result { + if sample == 0 { + return Ok(SweepStats { + examined: 0, + evicted: 0, + }); + } + let mut store = self.store.write()?; + let now = Instant::now(); + + let mut victims: Vec> = Vec::new(); + let mut examined = 0usize; + for (key, entry) in store.iter() { + if examined >= sample { + break; + } + examined += 1; + if entry.is_expired(now) { + victims.push(key.clone()); + } + } + + let evicted = victims.len(); + for key in &victims { + store.remove(key.as_slice()); + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + } + + Ok(SweepStats { examined, evicted }) + } + + /// Logs a DEL record caused by lazy expiration. + /// + /// Kept separate from `del()` so call sites stay terse; callers must + /// already hold the write lock. + fn log_expire_drop(&self, key: &[u8]) { + if let Some(aof) = &self.aof { + log_aof_result("DEL", aof.append_del(key)); + } + } +} + +/// Outcome of a single call to [`KvEngine::sweep_expired`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SweepStats { + /// Number of entries inspected during the sweep. + pub examined: usize, + /// Number of entries that were actually expired and removed. + pub evicted: usize, +} + +impl SweepStats { + /// Returns `true` when the expired ratio warrants a follow-up sweep. + /// + /// Matches Redis' active expiration heuristic: keep running while at + /// least 25% of the sample is expired. + pub fn should_continue(&self) -> bool { + self.examined > 0 && self.evicted * 4 > self.examined + } +} + +/// Tri-state return for [`KvEngine::ttl_ms`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TtlStatus { + /// The key does not exist; RESP reply is `:-2`. + Missing, + /// The key exists with no TTL; RESP reply is `:-1`. + NoExpire, + /// Remaining TTL in milliseconds (`>= 0`). + Millis(i64), +} + +/// Returns the payload of `entry` if it has not already expired. +fn live_payload(entry: ValueEntry) -> Option> { + let now = Instant::now(); + if entry.is_expired(now) { + None + } else { + Some(entry.data) + } +} + +/// Current wall-clock time expressed as Unix epoch milliseconds. +/// +/// A system clock earlier than the Unix epoch (extremely unusual in +/// practice) falls back to zero so the engine never panics. +pub(crate) fn current_epoch_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(0) +} + +/// Converts a monotonic `deadline` to a Unix epoch millisecond timestamp. +/// +/// Returns `None` when the deadline is already in the past (the caller will +/// drop the key instead of writing an expiration record). +fn deadline_to_epoch_ms(deadline: Instant, now: Instant) -> Option { + if deadline <= now { + return None; + } + let remaining = deadline - now; + let now_ms = current_epoch_ms(); + let abs_ms = now_ms.saturating_add(remaining.as_millis() as i64); + Some(abs_ms) } fn validate_key(key: &[u8]) -> Result<(), FerrumError> { @@ -790,4 +1138,187 @@ mod tests { assert!(bytes.is_empty()); let _ = fs::remove_file(&path); } + + #[test] + fn expire_at_ms_sets_and_ttl_reports_remaining() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + + let now = current_epoch_ms(); + assert!(engine.expire_at_ms(b"k", now + 60_000).unwrap()); + + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0 && ms <= 60_000), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn expire_at_ms_returns_false_for_missing_key() { + let engine = KvEngine::new(); + let now = current_epoch_ms(); + assert!(!engine.expire_at_ms(b"missing", now + 1000).unwrap()); + } + + #[test] + fn expire_in_the_past_deletes_key_immediately() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + + let now = current_epoch_ms(); + assert!(engine.expire_at_ms(b"k", now - 1).unwrap()); + assert_eq!(engine.get(b"k").unwrap(), None); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::Missing)); + } + + #[test] + fn persist_strips_ttl_only_when_present() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + assert!(!engine.persist(b"k").unwrap()); + + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 10_000).unwrap(); + assert!(engine.persist(b"k").unwrap()); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + assert!(!engine.persist(b"k").unwrap()); + } + + #[test] + fn ttl_status_for_missing_and_persistent_keys() { + let engine = KvEngine::new(); + assert!(matches!( + engine.ttl_ms(b"missing").unwrap(), + TtlStatus::Missing + )); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + } + + #[test] + fn lazy_expiration_drops_key_on_read() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let deadline = Instant::now() + Duration::from_millis(20); + { + let mut store = engine.store.write().unwrap(); + if let Some(entry) = store.get_mut(b"k".as_slice()) { + entry.expire_at = Some(deadline); + } + } + std::thread::sleep(Duration::from_millis(40)); + assert_eq!(engine.get(b"k").unwrap(), None); + assert!(!engine.exists(b"k").unwrap()); + assert_eq!(engine.dbsize().unwrap(), 0); + } + + #[test] + fn set_overwrite_clears_existing_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.set(b"k".to_vec(), b"v2".to_vec()).unwrap(); + assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire)); + } + + #[test] + fn incr_preserves_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"1".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.incr_by(b"k".to_vec(), 5).unwrap(); + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn append_preserves_ttl() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"hi".to_vec()).unwrap(); + let now = current_epoch_ms(); + engine.expire_at_ms(b"k", now + 60_000).unwrap(); + engine.append(b"k".to_vec(), b"!".to_vec()).unwrap(); + match engine.ttl_ms(b"k").unwrap() { + TtlStatus::Millis(ms) => assert!(ms > 0), + other => panic!("expected Millis(..), got {other:?}"), + } + } + + #[test] + fn expire_logs_pexpireat_record_to_aof() { + let path = tmp_aof_path("expire"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let abs_ms = current_epoch_ms() + 60_000; + engine.expire_at_ms(b"k", abs_ms).unwrap(); + drop(engine); + drop(writer); + + let bytes = fs::read(&path).unwrap(); + let text = String::from_utf8_lossy(&bytes); + assert!( + text.contains("PEXPIREAT"), + "missing PEXPIREAT record in {text:?}" + ); + assert!(text.contains(&abs_ms.to_string())); + let _ = fs::remove_file(&path); + } + + #[test] + fn persist_logs_persist_record_to_aof() { + let path = tmp_aof_path("persist"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + let abs_ms = current_epoch_ms() + 60_000; + engine.expire_at_ms(b"k", abs_ms).unwrap(); + assert!(engine.persist(b"k").unwrap()); + drop(engine); + drop(writer); + + let bytes = fs::read(&path).unwrap(); + let text = String::from_utf8_lossy(&bytes); + assert!( + text.contains("PERSIST"), + "missing PERSIST record in {text:?}" + ); + let _ = fs::remove_file(&path); + } + + #[test] + fn sweep_removes_expired_entries_and_logs_del() { + let path = tmp_aof_path("sweep"); + let (engine, writer) = engine_with_aof(&path); + + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + { + let mut store = engine.store.write().unwrap(); + if let Some(entry) = store.get_mut(b"k".as_slice()) { + entry.expire_at = Some(Instant::now() - Duration::from_millis(1)); + } + } + + let stats = engine.sweep_expired(16).unwrap(); + assert_eq!(stats.evicted, 1); + assert_eq!(engine.dbsize().unwrap(), 0); + + drop(engine); + drop(writer); + let bytes = fs::read(&path).unwrap(); + assert!(String::from_utf8_lossy(&bytes).contains("DEL")); + let _ = fs::remove_file(&path); + } + + #[test] + fn sweep_respects_zero_sample() { + let engine = KvEngine::new(); + let stats = engine.sweep_expired(0).unwrap(); + assert_eq!(stats.examined, 0); + assert_eq!(stats.evicted, 0); + } } diff --git a/src/storage/expire.rs b/src/storage/expire.rs new file mode 100644 index 0000000..74748e6 --- /dev/null +++ b/src/storage/expire.rs @@ -0,0 +1,194 @@ +//! Active expiration sweeper. +//! +//! Runs on its own thread, periodically sampling keys from the engine and +//! removing those whose TTL has elapsed. The sampler mirrors Redis' adaptive +//! strategy: sample 20 keys every 100ms; when more than 25% of the sample +//! was already expired, run another round immediately instead of sleeping. +//! +//! The thread is cooperatively shut down via [`Shutdown`], matching the rest +//! of the server. + +use std::sync::Arc; +use std::sync::{Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use log::{debug, warn}; + +use crate::network::shutdown::Shutdown; +use crate::storage::engine::KvEngine; + +/// Default sample size per sweep tick. +pub const DEFAULT_SAMPLE: usize = 20; + +/// Default interval between sweep ticks. +pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(100); + +/// Spawns the background expiration sweeper. +/// +/// Returns a [`ExpireSweeperHandle`] whose [`ExpireSweeperHandle::shutdown`] +/// waits for the worker thread to exit. The worker also self-terminates when +/// the shared [`Shutdown`] flag flips, so callers that forward the server's +/// shutdown signal do not need to call the handle explicitly — dropping it +/// will still join the thread. +pub fn spawn(engine: KvEngine, shutdown: Shutdown) -> ExpireSweeperHandle { + spawn_with(engine, shutdown, DEFAULT_SAMPLE, DEFAULT_INTERVAL) +} + +/// Testing-oriented variant that accepts custom sampling parameters. +pub fn spawn_with( + engine: KvEngine, + shutdown: Shutdown, + sample: usize, + interval: Duration, +) -> ExpireSweeperHandle { + let wait = Arc::new(SleepWaker::default()); + let wait_clone = Arc::clone(&wait); + let shutdown_clone = shutdown.clone(); + + let handle = thread::Builder::new() + .name("ferrum-expire".into()) + .spawn(move || run(engine, shutdown_clone, sample, interval, wait_clone)) + .expect("failed to spawn ferrum-expire thread"); + + ExpireSweeperHandle { + handle: Some(handle), + wait, + } +} + +/// Joins the sweeper thread on drop, surfacing any thread panic as a warning. +pub struct ExpireSweeperHandle { + handle: Option>, + wait: Arc, +} + +impl ExpireSweeperHandle { + /// Wakes the sweeper and blocks until it exits. + pub fn shutdown(mut self) { + self.wait.wake(); + if let Some(handle) = self.handle.take() + && let Err(e) = handle.join() + { + warn!("ferrum-expire thread panicked: {e:?}"); + } + } +} + +impl Drop for ExpireSweeperHandle { + fn drop(&mut self) { + self.wait.wake(); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +/// Interruptible sleep primitive shared between the sweeper and its owner. +#[derive(Default)] +struct SleepWaker { + lock: Mutex, + cvar: Condvar, +} + +impl SleepWaker { + fn wake(&self) { + if let Ok(mut guard) = self.lock.lock() { + *guard = true; + self.cvar.notify_all(); + } + } + + fn sleep(&self, timeout: Duration) { + let guard = match self.lock.lock() { + Ok(g) => g, + Err(poisoned) => poisoned.into_inner(), + }; + let (mut guard, _) = match self.cvar.wait_timeout(guard, timeout) { + Ok(pair) => pair, + Err(poisoned) => { + let pair = poisoned.into_inner(); + (pair.0, pair.1) + } + }; + // Reset so the next tick sleeps again unless explicitly woken. + *guard = false; + } +} + +fn run( + engine: KvEngine, + shutdown: Shutdown, + sample: usize, + interval: Duration, + wait: Arc, +) { + debug!("ferrum-expire: started (sample={sample}, interval={interval:?})"); + while !shutdown.is_triggered() { + // Iterate the adaptive loop a small bounded number of times so one + // busy sweep cannot starve other readers holding the write lock. + for _ in 0..16 { + match engine.sweep_expired(sample) { + Ok(stats) => { + if !stats.should_continue() { + break; + } + } + Err(e) => { + warn!("ferrum-expire: sweep failed: {e}"); + break; + } + } + } + wait.sleep(interval); + } + debug!("ferrum-expire: stopped"); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Instant; + + #[test] + fn sweeper_removes_expired_keys_in_the_background() { + let engine = KvEngine::new(); + engine.set(b"k".to_vec(), b"v".to_vec()).unwrap(); + // Install a 10 ms TTL. + let now = crate::storage::engine::current_epoch_ms(); + engine.expire_at_ms(b"k", now + 10).unwrap(); + + let shutdown = Shutdown::new(); + let handle = spawn_with( + engine.clone(), + shutdown.clone(), + 8, + Duration::from_millis(5), + ); + + let deadline = Instant::now() + Duration::from_millis(500); + while Instant::now() < deadline { + if !engine.exists(b"k").unwrap() { + break; + } + thread::sleep(Duration::from_millis(5)); + } + + shutdown.trigger(); + handle.shutdown(); + assert!( + !engine.exists(b"k").unwrap(), + "sweeper failed to remove expired key" + ); + } + + #[test] + fn sweeper_honours_shutdown_flag() { + let engine = KvEngine::new(); + let shutdown = Shutdown::new(); + let handle = spawn_with(engine, shutdown.clone(), 8, Duration::from_millis(50)); + shutdown.trigger(); + handle.shutdown(); + // Absence of a hang here is the assertion. + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 702e611..da81298 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1 +1,2 @@ pub mod engine; +pub mod expire; diff --git a/tests/expire_test.rs b/tests/expire_test.rs new file mode 100644 index 0000000..f1b703a --- /dev/null +++ b/tests/expire_test.rs @@ -0,0 +1,222 @@ +//! End-to-end tests for the TTL / expiration commands. +//! +//! Drives the full server path (RESP2 parser, engine, active sweeper) through +//! a real TCP connection so regressions in wire format, command dispatch, or +//! background expiration all surface here. + +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::thread; +use std::time::{Duration, Instant}; + +use ferrum_kv::network::server::{self, ServerConfig}; +use ferrum_kv::network::shutdown::Shutdown; +use ferrum_kv::protocol::encoder; +use ferrum_kv::storage::engine::KvEngine; +use ferrum_kv::storage::expire; + +struct ServerGuard { + addr: String, + shutdown: Shutdown, + _server: thread::JoinHandle<()>, +} + +impl Drop for ServerGuard { + fn drop(&mut self) { + self.shutdown.trigger(); + // Self-connect to unblock accept() so the thread can wind down. + let _ = TcpStream::connect_timeout(&self.addr.parse().unwrap(), Duration::from_millis(200)); + } +} + +/// Spawns a server with an active expiration sweeper that ticks frequently +/// so tests do not need to wait a full 100 ms per iteration. +fn spawn_server_with_sweeper() -> ServerGuard { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr").to_string(); + let engine = KvEngine::new(); + let shutdown = Shutdown::new(); + + // Keep the sweeper handle alive for the server thread's lifetime. + let sweeper_engine = engine.clone(); + let sweeper_shutdown = shutdown.clone(); + let handle = thread::spawn(move || { + let sweeper = expire::spawn_with( + sweeper_engine, + sweeper_shutdown.clone(), + 16, + Duration::from_millis(10), + ); + let _ = server::run_listener(listener, engine, sweeper_shutdown, ServerConfig::default()); + sweeper.shutdown(); + }); + + ServerGuard { + addr, + shutdown, + _server: handle, + } +} + +fn connect(addr: &str) -> TcpStream { + let stream = TcpStream::connect(addr).expect("connect"); + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set_read_timeout"); + stream + .set_write_timeout(Some(Duration::from_secs(2))) + .expect("set_write_timeout"); + stream +} + +fn build_request(args: &[&[u8]]) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes()); + for arg in args { + encoder::encode_bulk_string(&mut out, arg); + } + out +} + +fn send(stream: &mut TcpStream, request: &[u8]) -> Vec { + stream.write_all(request).expect("write request"); + read_one_reply(stream) +} + +/// Reads exactly one RESP2 reply from `stream`. +/// +/// Supports simple strings, errors, integers, and bulk strings, which is +/// enough for the TTL command family. +fn read_one_reply(stream: &mut TcpStream) -> Vec { + let mut out = Vec::new(); + let mut byte = [0u8; 1]; + stream.read_exact(&mut byte).expect("read type byte"); + out.push(byte[0]); + match byte[0] { + b'+' | b'-' | b':' => { + read_until_crlf(stream, &mut out); + } + b'$' => { + let mut header = Vec::new(); + read_until_crlf(stream, &mut header); + // header is "\r\n"; parse it to know how many body bytes follow. + let header_str = + std::str::from_utf8(&header[..header.len() - 2]).expect("ascii length prefix"); + let len: i64 = header_str.parse().expect("integer length"); + out.extend_from_slice(&header); + if len >= 0 { + let mut body = vec![0u8; len as usize + 2]; + stream.read_exact(&mut body).expect("read bulk body"); + out.extend_from_slice(&body); + } + } + other => panic!("unexpected RESP type byte {other:#x}"), + } + out +} + +fn read_until_crlf(stream: &mut TcpStream, out: &mut Vec) { + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte).expect("read byte"); + out.push(byte[0]); + if out.len() >= 2 && out[out.len() - 2] == b'\r' && out[out.len() - 1] == b'\n' { + return; + } + } +} + +#[test] +fn ttl_returns_minus_two_for_missing_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + let reply = send(&mut stream, &build_request(&[b"TTL", b"missing"])); + assert_eq!(reply, b":-2\r\n"); +} + +#[test] +fn ttl_returns_minus_one_for_persistent_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let reply = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(reply, b":-1\r\n"); +} + +#[test] +fn expire_and_ttl_roundtrip() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let r = send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"100"])); + assert_eq!(r, b":1\r\n"); + + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + let text = std::str::from_utf8(&ttl).unwrap(); + assert!(text.starts_with(':') && text.ends_with("\r\n")); + let n: i64 = text[1..text.len() - 2].parse().unwrap(); + assert!((1..=100).contains(&n), "TTL out of range: {n}"); +} + +#[test] +fn persist_removes_ttl() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"60"])); + let r = send(&mut stream, &build_request(&[b"PERSIST", b"k"])); + assert_eq!(r, b":1\r\n"); + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(ttl, b":-1\r\n"); +} + +#[test] +fn pexpire_triggers_background_eviction() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"PEXPIRE", b"k", b"50"])); + + // Poll EXISTS until the sweeper removes the key, with a generous ceiling + // so CI jitter does not flake the test. + let deadline = Instant::now() + Duration::from_secs(2); + loop { + let reply = send(&mut stream, &build_request(&[b"EXISTS", b"k"])); + if reply == b":0\r\n" { + break; + } + if Instant::now() >= deadline { + panic!("key was not expired within 2s"); + } + thread::sleep(Duration::from_millis(20)); + } +} + +#[test] +fn expire_with_negative_seconds_deletes_key() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + let r = send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"-1"])); + assert_eq!(r, b":1\r\n"); + let reply = send(&mut stream, &build_request(&[b"GET", b"k"])); + assert_eq!(reply, b"$-1\r\n"); +} + +#[test] +fn set_overwrite_clears_ttl_on_wire() { + let guard = spawn_server_with_sweeper(); + let mut stream = connect(&guard.addr); + + send(&mut stream, &build_request(&[b"SET", b"k", b"v"])); + send(&mut stream, &build_request(&[b"EXPIRE", b"k", b"60"])); + send(&mut stream, &build_request(&[b"SET", b"k", b"v2"])); + let ttl = send(&mut stream, &build_request(&[b"TTL", b"k"])); + assert_eq!(ttl, b":-1\r\n"); +}