Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ signal-hook = "0.4.4"
log = "0.4"
env_logger = "0.11"
indexmap = "2"
tokio = { version = "1", features = ["rt-multi-thread", "net", "io-util", "macros", "signal", "sync", "time"] }

[dev-dependencies]
criterion = "0.8.2"
Expand Down
58 changes: 58 additions & 0 deletions benches/redis-benchmark.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,61 @@ redis-benchmark -h 127.0.0.1 -p 6399 -n 100000 -c 50 -q -t set,get,incr
redis-benchmark -h 127.0.0.1 -p 6399 -n 100000 -c 50 -P 16 -q -t set,get,incr
kill "$SERVER"
```

---

## v0.4.0 — tokio async runtime (Week 8)

Re-ran on the same Apple M5 / macOS 26.4.1 box after the `feat/async-runtime`
branch replaced the thread-per-connection model with a multi-threaded tokio
runtime. The server binary, the benchmark client and the command mix are
identical to the v0.3.0 runs above so the deltas are apples-to-apples.

### Scenario 1 — no memory cap, `c=50`

| Command | v0.3.0 QPS | v0.4.0 QPS | Δ |
| ------- | ---------- | ---------- | ------ |
| SET | 58 207 | 62 189 | +6.8% |
| GET | 61 349 | 65 231 | +6.3% |
| INCR | 61 728 | 63 492 | +2.9% |

### Scenario 2 — pipelined (`-P 16`, `c=50`)

| Command | v0.3.0 QPS | v0.4.0 QPS | Δ |
| ------- | ---------- | ---------- | ------ |
| SET | 366 300 | 350 877 | -4.2% |
| GET | 373 134 | 378 787 | +1.5% |
| INCR | 375 939 | 381 679 | +1.5% |

> Pipelined throughput is dominated by in-memory command execution rather
> than socket I/O, so the delta is inside run-to-run noise. What matters
> is that the async rewrite did **not** regress the hot path.

### Scenario 5 — high client fan-out (`c=500`)

```
./target/release/ferrum-kv --addr 127.0.0.1:6399
redis-benchmark -h 127.0.0.1 -p 6399 -n 100000 -c 500 -q -t set,get
```

| Command | v0.4.0 QPS | p50 |
| ------- | ---------- | -------- |
| SET | 50 301 | 4.567 ms |
| GET | 43 440 | 5.871 ms |

> The pre-tokio build would have tried to spawn 500 OS threads here — one
> per accepted connection — and spent most of its time context switching.
> v0.4.0 handles the whole fan-out on a handful of worker threads with
> single-digit-millisecond p50, the clearest win of Phase 8.

### Observations

- Single-client-per-thread throughput nudged up ~5% because tokio replaces
two blocking `read`/`write` syscalls per round-trip with `epoll` /
`kqueue`-backed readiness checks that batch more efficiently under load.
- The thread-per-connection floor is gone: scaling `-c` past the CPU count
no longer multiplies kernel threads; the runtime just multiplexes more
sockets onto the same workers.
- The integration suite's 500-client PING fan-out finishes in under 300 ms
on the same box, confirming the smoke result is not a microbenchmark
artefact.
7 changes: 7 additions & 0 deletions ferrum.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,10 @@ loglevel info

# How many random candidates to inspect per eviction round (Redis default: 5).
# maxmemory-samples 5

# ---- Runtime -------------------------------------------------------
# Number of tokio worker threads driving accept/read/write loops.
# 0 (or unset) means "one per logical CPU", which is the right default
# for most deployments. Bump it higher only if you are pinning other
# workloads to specific cores and want to cap FerrumKV's share.
# io-threads 0
57 changes: 53 additions & 4 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub const USAGE: &str = concat!(
" [--appendfsync always|everysec|no]\n",
" [--client-timeout SECONDS] [--maxclients N]\n",
" [--maxmemory BYTES] [--maxmemory-policy POLICY]\n",
" [--maxmemory-samples N]\n",
" [--maxmemory-samples N] [--io-threads N]\n",
" [--loglevel off|error|warn|info|debug|trace]"
);

Expand Down Expand Up @@ -60,6 +60,9 @@ pub struct CliArgs {
max_memory_policy: Option<EvictionPolicy>,
/// How many random candidates each eviction round considers.
max_memory_samples: Option<usize>,
/// Number of tokio worker threads; `0` (unset) asks tokio for the
/// default (usually one per logical CPU).
io_threads: Option<usize>,
}

/// Raw, un-merged values taken verbatim from the command line.
Expand All @@ -81,6 +84,7 @@ struct RawFlags {
max_memory: Option<u64>,
max_memory_policy: Option<EvictionPolicy>,
max_memory_samples: Option<usize>,
io_threads: Option<usize>,
/// Whether AOF was explicitly enabled via the config file's `appendonly yes`.
/// CLI `--aof-path` implies enabled; this field only carries the file's
/// intent so that a later merge step can decide.
Expand All @@ -92,7 +96,7 @@ impl CliArgs {
pub fn parse<I: IntoIterator<Item = String>>(args: I) -> Result<Invocation, String> {
let raw = match scan_argv(args)? {
ScanOutcome::Help => return Ok(Invocation::Help),
ScanOutcome::Flags(f) => f,
ScanOutcome::Flags(f) => *f,
};

let file_cfg = if let Some(path) = raw.config_path.as_deref() {
Expand Down Expand Up @@ -127,6 +131,12 @@ impl CliArgs {
self.loglevel.as_deref()
}

/// Returns the explicit tokio worker thread count. `None` means
/// "let tokio pick", which is what most deployments want.
pub fn io_threads(&self) -> Option<usize> {
self.io_threads
}

/// Returns the resolved eviction configuration. Defaults (unlimited
/// memory, `noeviction`, 5 samples) apply when neither CLI nor config
/// file specifies a value.
Expand All @@ -142,7 +152,7 @@ impl CliArgs {

enum ScanOutcome {
Help,
Flags(RawFlags),
Flags(Box<RawFlags>),
}

/// First pass: walk `argv` and record every flag we recognise without
Expand Down Expand Up @@ -210,12 +220,18 @@ fn scan_argv<I: IntoIterator<Item = String>>(args: I) -> Result<ScanOutcome, Str
format!("invalid --maxmemory-samples: '{value}' is not a non-negative integer")
})?);
}
"--io-threads" => {
let value = take_value(&mut iter, "--io-threads")?;
raw.io_threads = Some(value.parse().map_err(|_| {
format!("invalid --io-threads: '{value}' is not a non-negative integer")
})?);
}
"-h" | "--help" => return Ok(ScanOutcome::Help),
other => return Err(format!("unrecognised argument: '{other}'")),
}
}

Ok(ScanOutcome::Flags(raw))
Ok(ScanOutcome::Flags(Box::new(raw)))
}

/// Merges `raw` flags with an optional [`FileConfig`], applying defaults for
Expand Down Expand Up @@ -278,6 +294,7 @@ fn merge(raw: RawFlags, file: Option<&FileConfig>) -> Result<CliArgs, String> {
let max_memory_samples = raw
.max_memory_samples
.or_else(|| file.and_then(|f| f.max_memory_samples));
let io_threads = raw.io_threads.or_else(|| file.and_then(|f| f.io_threads));

Ok(CliArgs {
addr,
Expand All @@ -289,6 +306,7 @@ fn merge(raw: RawFlags, file: Option<&FileConfig>) -> Result<CliArgs, String> {
max_memory,
max_memory_policy,
max_memory_samples,
io_threads,
})
}

Expand Down Expand Up @@ -649,4 +667,35 @@ mod tests {
assert_eq!(cfg.policy, EvictionPolicy::AllKeysLru);
assert_eq!(cfg.samples, 3);
}

#[test]
fn io_threads_defaults_to_none() {
assert!(parse_run(&[]).io_threads().is_none());
}

#[test]
fn io_threads_flag_is_parsed() {
assert_eq!(parse_run(&["--io-threads", "4"]).io_threads(), Some(4));
assert_eq!(parse_run(&["--io-threads", "0"]).io_threads(), Some(0));
}

#[test]
fn io_threads_rejects_non_integer() {
let err = parse(&["--io-threads", "lots"]).unwrap_err();
assert!(err.contains("--io-threads"));
}

#[test]
fn io_threads_from_config_file() {
let conf = TempConf::new("io-threads", "io-threads 8\n");
let args = parse_run(&["--config", conf.path.to_str().unwrap()]);
assert_eq!(args.io_threads(), Some(8));
}

#[test]
fn cli_io_threads_overrides_config_file() {
let conf = TempConf::new("io-threads-override", "io-threads 8\n");
let args = parse_run(&["--config", conf.path.to_str().unwrap(), "--io-threads", "2"]);
assert_eq!(args.io_threads(), Some(2));
}
}
13 changes: 13 additions & 0 deletions src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub struct FileConfig {
pub max_memory_policy: Option<EvictionPolicy>,
/// Number of keys inspected per eviction round.
pub max_memory_samples: Option<usize>,
/// Number of tokio worker threads. `0` (or unset) lets the runtime
/// pick a sensible default (usually the logical CPU count).
pub io_threads: Option<usize>,
}

/// Error type for configuration file parsing.
Expand Down Expand Up @@ -180,6 +183,9 @@ fn apply_directive(cfg: &mut FileConfig, key: &str, value: &str) -> Result<(), S
"maxmemory-samples" => {
cfg.max_memory_samples = Some(parse_usize(value, "maxmemory-samples")?);
}
"io-threads" => {
cfg.io_threads = Some(parse_usize(value, "io-threads")?);
}
other => return Err(format!("unknown directive '{other}'")),
}
Ok(())
Expand Down Expand Up @@ -389,4 +395,11 @@ mod tests {
Some(12),
);
}

#[test]
fn io_threads_is_parsed() {
assert_eq!(parse("io-threads 4\n").unwrap().io_threads, Some(4));
assert_eq!(parse("io-threads 0\n").unwrap().io_threads, Some(0));
assert!(parse("io-threads nope\n").is_err());
}
}
15 changes: 11 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ fn main() -> ExitCode {
max_clients: args
.max_clients()
.unwrap_or_else(|| ServerConfig::default().max_clients),
worker_threads: args.io_threads().unwrap_or(0),
};
match server_config.client_timeout {
Some(d) => info!("client idle timeout: {}s", d.as_secs()),
Expand All @@ -99,6 +100,11 @@ fn main() -> ExitCode {
} else {
info!("maxclients: {}", server_config.max_clients);
}
if server_config.worker_threads == 0 {
info!("io-threads: auto (one per logical CPU)");
} else {
info!("io-threads: {}", server_config.worker_threads);
}

let expire_handle = expire::spawn(engine.clone(), shutdown.clone());

Expand Down Expand Up @@ -176,9 +182,11 @@ fn build_engine(args: &CliArgs) -> Result<KvEngine, ExitCode> {
}
}

/// Installs SIGINT/SIGTERM handlers that flip the shared shutdown flag and
/// self-connect to the listener so the blocked `accept` returns immediately.
fn install_signal_handlers(shutdown: Shutdown, wake_addr: SocketAddr) -> std::io::Result<()> {
/// Installs SIGINT/SIGTERM handlers that flip the shared shutdown flag. The
/// async accept loop observes the flag via `Shutdown::notified` and exits on
/// the next scheduler poll, so we no longer need the self-connect wake-up
/// trick that the blocking listener required.
fn install_signal_handlers(shutdown: Shutdown, _wake_addr: SocketAddr) -> std::io::Result<()> {
use signal_hook::consts::{SIGINT, SIGTERM};
use signal_hook::iterator::Signals;

Expand All @@ -197,7 +205,6 @@ fn install_signal_handlers(shutdown: Shutdown, wake_addr: SocketAddr) -> std::io
};
warn!("received {name}, initiating graceful shutdown");
shutdown.trigger();
Shutdown::wake_listener(wake_addr);
}
})?;
Ok(())
Expand Down
Loading
Loading