Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ferrum_kv::network::server::ServerConfig;
use ferrum_kv::network::shutdown::Shutdown;
use ferrum_kv::persistence::AofWriter;
use ferrum_kv::storage::engine::KvEngine;
use ferrum_kv::storage::expire;

use crate::cli::{CliArgs, Invocation, USAGE};

Expand Down Expand Up @@ -83,9 +84,13 @@ fn main() -> ExitCode {
info!("maxclients: {}", server_config.max_clients);
}

if let Err(e) =
ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config)
{
let expire_handle = expire::spawn(engine.clone(), shutdown.clone());

let server_result =
ferrum_kv::network::server::run_listener(listener, engine, shutdown, server_config);
expire_handle.shutdown();

if let Err(e) = server_result {
error!("server error: {e}");
return ExitCode::FAILURE;
}
Expand Down
67 changes: 66 additions & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::error::FerrumError;
use crate::network::shutdown::Shutdown;
use crate::protocol::encoder;
use crate::protocol::parser::{self, Command, FrameParse};
use crate::storage::engine::KvEngine;
use crate::storage::engine::{KvEngine, TtlStatus, current_epoch_ms};

/// Initial capacity of each per-connection read buffer.
///
Expand Down Expand Up @@ -306,6 +306,71 @@ pub fn execute_command(cmd: Command, engine: &KvEngine, out: &mut Vec<u8>) {
Ok(()) => encoder::encode_simple_string(out, "OK"),
Err(e) => write_ferrum_error(out, &e),
},
Command::Expire { key, seconds } => {
let reply = expire_reply(engine, &key, checked_seconds_to_ms(seconds));
write_bool_integer(out, reply);
}
Command::PExpire { key, millis } => {
let reply = expire_reply(engine, &key, Some(millis));
write_bool_integer(out, reply);
}
Command::PExpireAt { key, abs_epoch_ms } => match engine.expire_at_ms(&key, abs_epoch_ms) {
Ok(true) => encoder::encode_integer(out, 1),
Ok(false) => encoder::encode_integer(out, 0),
Err(e) => write_ferrum_error(out, &e),
},
Command::Persist { key } => match engine.persist(&key) {
Ok(true) => encoder::encode_integer(out, 1),
Ok(false) => encoder::encode_integer(out, 0),
Err(e) => write_ferrum_error(out, &e),
},
Command::Ttl { key } => match engine.ttl_ms(&key) {
Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2),
Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1),
Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, (ms + 999) / 1000),
Err(e) => write_ferrum_error(out, &e),
},
Command::PTtl { key } => match engine.ttl_ms(&key) {
Ok(TtlStatus::Missing) => encoder::encode_integer(out, -2),
Ok(TtlStatus::NoExpire) => encoder::encode_integer(out, -1),
Ok(TtlStatus::Millis(ms)) => encoder::encode_integer(out, ms),
Err(e) => write_ferrum_error(out, &e),
},
}
}

/// Converts an `EXPIRE` second delta to milliseconds, saturating on overflow.
///
/// A `None` return means the caller should treat the request as "delete this
/// key right now" — which is how [`KvEngine::expire_at_ms`] interprets an
/// already-past absolute timestamp.
fn checked_seconds_to_ms(seconds: i64) -> Option<i64> {
seconds.checked_mul(1_000)
}

/// Computes the absolute epoch-millisecond deadline for `EXPIRE`/`PEXPIRE`
/// and forwards it to the engine.
///
/// A delta that overflows `i64` when expressed in milliseconds (only possible
/// with `EXPIRE` and astronomically large second counts) is treated the same
/// way as an in-the-past deadline: the key is dropped on the spot. That keeps
/// the wire contract simple — either the key existed and was updated
/// (`:1`), or it did not (`:0`).
fn expire_reply(engine: &KvEngine, key: &[u8], delta_ms: Option<i64>) -> Result<bool, FerrumError> {
let now_ms = current_epoch_ms();
let abs_ms = match delta_ms {
Some(d) => now_ms.saturating_add(d),
None => now_ms, // treat overflow as "expire immediately"
};
engine.expire_at_ms(key, abs_ms)
}

/// Writes a `Result<bool, _>` as a RESP integer (`0`/`1`) or as an error.
fn write_bool_integer(out: &mut Vec<u8>, reply: Result<bool, FerrumError>) {
match reply {
Ok(true) => encoder::encode_integer(out, 1),
Ok(false) => encoder::encode_integer(out, 0),
Err(e) => write_ferrum_error(out, &e),
}
}

Expand Down
92 changes: 92 additions & 0 deletions src/persistence/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,29 @@ fn apply_record(engine: &KvEngine, parts: &[Vec<u8>]) -> Result<(), ApplyError>
}
engine.flushdb().map_err(ApplyError::Engine)
}
b"PEXPIREAT" => {
if parts.len() != 3 {
return Err(ApplyError::Arity(cmd_name(cmd)));
}
let ts_text =
std::str::from_utf8(&parts[2]).map_err(|_| ApplyError::Arity(cmd_name(cmd)))?;
let abs_ms: i64 = ts_text
.parse()
.map_err(|_| ApplyError::Arity(cmd_name(cmd)))?;
engine
.expire_at_ms(&parts[1], abs_ms)
.map(|_| ())
.map_err(ApplyError::Engine)
}
b"PERSIST" => {
if parts.len() != 2 {
return Err(ApplyError::Arity(cmd_name(cmd)));
}
engine
.persist(&parts[1])
.map(|_| ())
.map_err(ApplyError::Engine)
}
_ => Err(ApplyError::Unknown(cmd_name(cmd))),
}
}
Expand Down Expand Up @@ -499,4 +522,73 @@ mod tests {
assert_eq!(stats, ReplayStats::default());
let _ = fs::remove_file(&path);
}

#[test]
fn replays_pexpireat_and_expires_key_after_deadline() {
use crate::storage::engine::TtlStatus;

let path = tmp_path("pexpireat");
let now_ms = crate::storage::engine::current_epoch_ms();
let abs_ms = now_ms + 60_000;
let abs_text = abs_ms.to_string();
let mut content = String::new();
content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n");
content.push_str(&format!(
"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n",
abs_text.len(),
abs_text,
));
fs::write(&path, &content).unwrap();

let engine = KvEngine::new();
let stats = replay(&path, &engine).unwrap();
assert_eq!(stats.applied, 2);
assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::Millis(_)));
let _ = fs::remove_file(&path);
}

#[test]
fn replay_drops_keys_whose_pexpireat_is_already_past() {
let path = tmp_path("pexpireat-past");
let now_ms = crate::storage::engine::current_epoch_ms();
let abs_ms = now_ms - 1_000; // already expired
let abs_text = abs_ms.to_string();
let mut content = String::new();
content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n");
content.push_str(&format!(
"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n",
abs_text.len(),
abs_text,
));
fs::write(&path, &content).unwrap();

let engine = KvEngine::new();
replay(&path, &engine).unwrap();
assert_eq!(engine.get(b"k").unwrap(), None);
let _ = fs::remove_file(&path);
}

#[test]
fn replays_persist_and_clears_ttl() {
use crate::storage::engine::TtlStatus;

let path = tmp_path("persist");
let now_ms = crate::storage::engine::current_epoch_ms();
let abs_ms = now_ms + 60_000;
let abs_text = abs_ms.to_string();
let mut content = String::new();
content.push_str("*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n");
content.push_str(&format!(
"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n${}\r\n{}\r\n",
abs_text.len(),
abs_text,
));
content.push_str("*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n");
fs::write(&path, &content).unwrap();

let engine = KvEngine::new();
replay(&path, &engine).unwrap();
assert!(matches!(engine.ttl_ms(b"k").unwrap(), TtlStatus::NoExpire));
let _ = fs::remove_file(&path);
}
}
16 changes: 16 additions & 0 deletions src/persistence/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,22 @@ impl AofWriter {
self.append(&[b"FLUSHDB"])
}

/// Appends a `PEXPIREAT key abs_epoch_ms` entry to the log.
///
/// The absolute millisecond timestamp is recorded rather than a relative
/// offset so replay stays correct regardless of how long the log has been
/// sitting on disk. Any already-past timestamp encountered during replay
/// makes the key be dropped immediately.
pub fn append_pexpireat(&self, key: &[u8], abs_epoch_ms: i64) -> Result<(), FerrumError> {
let ts = abs_epoch_ms.to_string();
self.append(&[b"PEXPIREAT", key, ts.as_bytes()])
}

/// Appends a `PERSIST key` entry to the log.
pub fn append_persist(&self, key: &[u8]) -> Result<(), FerrumError> {
self.append(&[b"PERSIST", key])
}

fn append(&self, parts: &[&[u8]]) -> Result<(), FerrumError> {
let bytes = encode_command(parts);
self.write_bytes(&bytes)
Expand Down
120 changes: 120 additions & 0 deletions src/protocol/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ pub enum Command {
DbSize,
/// `FLUSHDB`, which removes all keys.
FlushDb,
/// `EXPIRE key seconds` — set TTL in seconds.
Expire { key: Vec<u8>, seconds: i64 },
/// `PEXPIRE key milliseconds` — set TTL in milliseconds.
PExpire { key: Vec<u8>, millis: i64 },
/// `PEXPIREAT key ms-timestamp` — set TTL as an absolute
/// Unix epoch millisecond timestamp.
PExpireAt { key: Vec<u8>, abs_epoch_ms: i64 },
/// `PERSIST key` — remove any TTL.
Persist { key: Vec<u8> },
/// `TTL key` — remaining TTL in whole seconds.
Ttl { key: Vec<u8> },
/// `PTTL key` — remaining TTL in milliseconds.
PTtl { key: Vec<u8> },
}

/// Outcome of attempting to parse a single RESP2 frame from a byte buffer.
Expand Down Expand Up @@ -361,6 +374,57 @@ fn build_command(parts: Vec<Vec<u8>>) -> Result<Command, FerrumError> {
}
Ok(Command::FlushDb)
}
b"EXPIRE" => {
if args.len() != 2 {
return Err(FerrumError::WrongArity { cmd: "EXPIRE" });
}
let mut it = args.into_iter();
let key = it.next().unwrap();
let seconds = parse_integer_argument(&it.next().unwrap(), "EXPIRE")?;
Ok(Command::Expire { key, seconds })
}
b"PEXPIRE" => {
if args.len() != 2 {
return Err(FerrumError::WrongArity { cmd: "PEXPIRE" });
}
let mut it = args.into_iter();
let key = it.next().unwrap();
let millis = parse_integer_argument(&it.next().unwrap(), "PEXPIRE")?;
Ok(Command::PExpire { key, millis })
}
b"PEXPIREAT" => {
if args.len() != 2 {
return Err(FerrumError::WrongArity { cmd: "PEXPIREAT" });
}
let mut it = args.into_iter();
let key = it.next().unwrap();
let abs_epoch_ms = parse_integer_argument(&it.next().unwrap(), "PEXPIREAT")?;
Ok(Command::PExpireAt { key, abs_epoch_ms })
}
b"PERSIST" => {
if args.len() != 1 {
return Err(FerrumError::WrongArity { cmd: "PERSIST" });
}
Ok(Command::Persist {
key: args.into_iter().next().unwrap(),
})
}
b"TTL" => {
if args.len() != 1 {
return Err(FerrumError::WrongArity { cmd: "TTL" });
}
Ok(Command::Ttl {
key: args.into_iter().next().unwrap(),
})
}
b"PTTL" => {
if args.len() != 1 {
return Err(FerrumError::WrongArity { cmd: "PTTL" });
}
Ok(Command::PTtl {
key: args.into_iter().next().unwrap(),
})
}
_ => Err(FerrumError::UnknownCommand(
String::from_utf8_lossy(&name).into_owned(),
)),
Expand Down Expand Up @@ -779,4 +843,60 @@ mod frame_tests {
let err = parse_frame(b"*1000000\r\n").unwrap_err();
assert!(matches!(err, FerrumError::ParseError(_)));
}

#[test]
fn parses_expire_and_ttl_family() {
assert_eq!(
parse_exact(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$2\r\n60\r\n"),
Command::Expire {
key: b"k".to_vec(),
seconds: 60,
}
);
assert_eq!(
parse_exact(b"*3\r\n$7\r\nPEXPIRE\r\n$1\r\nk\r\n$4\r\n1500\r\n"),
Command::PExpire {
key: b"k".to_vec(),
millis: 1500,
}
);
assert_eq!(
parse_exact(b"*3\r\n$9\r\nPEXPIREAT\r\n$1\r\nk\r\n$13\r\n1700000000000\r\n"),
Command::PExpireAt {
key: b"k".to_vec(),
abs_epoch_ms: 1_700_000_000_000,
}
);
assert_eq!(
parse_exact(b"*2\r\n$7\r\nPERSIST\r\n$1\r\nk\r\n"),
Command::Persist { key: b"k".to_vec() }
);
assert_eq!(
parse_exact(b"*2\r\n$3\r\nTTL\r\n$1\r\nk\r\n"),
Command::Ttl { key: b"k".to_vec() }
);
assert_eq!(
parse_exact(b"*2\r\n$4\r\nPTTL\r\n$1\r\nk\r\n"),
Command::PTtl { key: b"k".to_vec() }
);
}

#[test]
fn expire_with_non_integer_is_invalid() {
let (err, _) = match parse_frame(b"*3\r\n$6\r\nEXPIRE\r\n$1\r\nk\r\n$3\r\nabc\r\n").unwrap()
{
FrameParse::Invalid { error, consumed } => (error, consumed),
other => panic!("expected Invalid, got {other:?}"),
};
assert!(matches!(err, FerrumError::ParseError(_)));
}

#[test]
fn ttl_without_key_is_wrong_arity() {
let err = match parse_frame(b"*1\r\n$3\r\nTTL\r\n").unwrap() {
FrameParse::Invalid { error, .. } => error,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(matches!(err, FerrumError::WrongArity { cmd: "TTL" }));
}
}
Loading
Loading