diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..19dc571 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,50 @@ +name: ci + +on: + push: + branches: [master] + pull_request: + branches: [master] + +concurrency: + group: ci-${{ github.ref }} + cancel-in-progress: true + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: -D warnings + +jobs: + check: + name: fmt + clippy + test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install stable toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Cache cargo + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo- + + - name: Format + run: cargo fmt --all -- --check + + - name: Clippy + run: cargo clippy --all-targets --all-features -- -D warnings + + - name: Test + run: cargo test --all-targets --all-features + + - name: Benches compile + run: cargo bench --no-run --all-features diff --git a/Cargo.lock b/Cargo.lock index cf45f7a..21a5001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloca" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7d05ea6aea7e9e64d25b9156ba2fee3fdd659e34e41063cd2fc7cd020d7f4" +dependencies = [ + "cc", +] + +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "1.0.0" @@ -61,12 +76,170 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bumpalo" 
+version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + +[[package]] +name = "cc" +version = "1.2.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstyle", + "clap_lex", +] + 
+[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + [[package]] name = "colorchoice" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "criterion" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" +dependencies = [ + "alloca", + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools", + "num-traits", + "oorandom", + "page_size", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "env_filter" version = "1.0.1" @@ -104,17 +277,74 @@ dependencies = [ name = "ferrum-kv" version = "0.1.0" dependencies = [ + "criterion", "env_logger", "log", "signal-hook", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-core", + "futures-task", + "pin-project-lite", + "slab", +] + +[[package]] +name = "half" +version = "2.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" +dependencies = [ + "cfg-if", + "crunchy", + "zerocopy", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "jiff" version = "0.2.24" @@ -139,6 +369,18 @@ dependencies = [ "syn", ] +[[package]] +name = "js-sys" +version = "0.3.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1840c94c045fbcf8ba2812c95db44499f7c64910a912551aaaa541decebcacf" +dependencies = [ + "cfg-if", + "futures-util", + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.186" @@ -157,12 +399,77 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + [[package]] name = "once_cell_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = 
"pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -196,6 +503,26 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rayon" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "regex" version = "1.12.3" @@ -225,6 +552,31 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + 
+[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -245,11 +597,30 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" -version = "0.3.18" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +checksum = "b2a0c28ca5908dbdbcd52e6fdaa00358ab88637f8ab33e1f188dd510eb44b53d" dependencies = [ "libc", "signal-hook-registry", @@ -265,6 +636,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "syn" version = "2.0.117" @@ -276,6 +653,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", 
+] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -288,6 +675,102 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df52b6d9b87e0c74c9edfa1eb2d9bf85e5d63515474513aa50fa181b3c4f5db1" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b1041f495fb322e64aca85f5756b2172e35cd459376e67f2a6c9dffcedb103" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dcd0ff20416988a18ac686d4d4d0f6aae9ebf08a389ff5d29012b05af2a1b41" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49757b3c82ebf16c57d69365a142940b384176c24df52a087fb748e2085359ea" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "web-sys" +version = "0.3.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eadbac71025cd7b0834f20d1fe8472e8495821b4e9801eb0a60bd1f19827602" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" @@ -302,3 +785,29 @@ checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ "windows-link", ] + +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index cd7e293..89af80d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,17 @@ version = "0.1.0" edition = "2024" [dependencies] -signal-hook = "0.3" +signal-hook = "0.4.4" log 
= "0.4" env_logger = "0.11" + +[dev-dependencies] +criterion = "0.8.2" + +[[bench]] +name = "engine_bench" +harness = false + +[[bench]] +name = "resp2_bench" +harness = false diff --git a/benches/engine_bench.rs b/benches/engine_bench.rs new file mode 100644 index 0000000..cbf0ccd --- /dev/null +++ b/benches/engine_bench.rs @@ -0,0 +1,92 @@ +//! Microbenchmarks for the in-memory KV engine. +//! +//! These benchmarks call the engine's public API directly, bypassing the +//! RESP2 wire layer, so any change to the storage fast path shows up as a +//! clear regression. They are intentionally narrow: no network, no AOF, +//! no contention. End-to-end numbers should be gathered with +//! `scripts/bench-smoke.sh` against a running server instead. +//! +//! Run with `cargo bench --bench engine_bench`. + +use std::hint::black_box; + +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; + +use ferrum_kv::storage::engine::KvEngine; + +fn bench_set(c: &mut Criterion) { + let mut group = c.benchmark_group("engine/set"); + for &value_len in &[8usize, 64, 512, 4096] { + let value = vec![b'x'; value_len]; + group.throughput(Throughput::Bytes(value_len as u64)); + group.bench_with_input( + BenchmarkId::from_parameter(value_len), + &value, + |b, value| { + let engine = KvEngine::new(); + let mut n: u64 = 0; + b.iter(|| { + n = n.wrapping_add(1); + let key = format!("k{n}").into_bytes(); + engine + .set(black_box(key), black_box(value.clone())) + .unwrap(); + }); + }, + ); + } + group.finish(); +} + +fn bench_get_hit(c: &mut Criterion) { + let engine = KvEngine::new(); + for i in 0..1_000 { + engine + .set(format!("k{i}").into_bytes(), b"value".to_vec()) + .unwrap(); + } + + c.bench_function("engine/get/hit", |b| { + let mut n: u64 = 0; + b.iter(|| { + n = n.wrapping_add(1); + let key = format!("k{}", n % 1_000); + let v = engine.get(black_box(key.as_bytes())).unwrap(); + black_box(v); + }); + }); +} + +fn bench_get_miss(c: &mut Criterion) { + let 
engine = KvEngine::new(); + c.bench_function("engine/get/miss", |b| { + let mut n: u64 = 0; + b.iter(|| { + n = n.wrapping_add(1); + let key = format!("absent{n}"); + let v = engine.get(black_box(key.as_bytes())).unwrap(); + black_box(v); + }); + }); +} + +fn bench_incr(c: &mut Criterion) { + c.bench_function("engine/incr", |b| { + let engine = KvEngine::new(); + b.iter(|| { + let v = engine + .incr_by(black_box(b"counter".to_vec()), black_box(1)) + .unwrap(); + black_box(v); + }); + }); +} + +criterion_group!( + benches, + bench_set, + bench_get_hit, + bench_get_miss, + bench_incr +); +criterion_main!(benches); diff --git a/benches/resp2_bench.rs b/benches/resp2_bench.rs new file mode 100644 index 0000000..f54f536 --- /dev/null +++ b/benches/resp2_bench.rs @@ -0,0 +1,101 @@ +//! Microbenchmarks for the RESP2 parser and encoder. +//! +//! These benchmarks isolate the wire-format fast path from the rest of the +//! server: every iteration parses (or encodes) a pre-built byte buffer, so +//! any regression in parsing or encoding shows up immediately without TCP +//! or engine noise. +//! +//! Run with `cargo bench --bench resp2_bench`. 
+ +use std::hint::black_box; + +use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; + +use ferrum_kv::protocol::encoder; +use ferrum_kv::protocol::parser::{FrameParse, parse_frame}; + +fn build_set_frame(key: &[u8], value: &[u8]) -> Vec<u8> { + let mut out = Vec::new(); + out.extend_from_slice(b"*3\r\n"); + encoder::encode_bulk_string(&mut out, b"SET"); + encoder::encode_bulk_string(&mut out, key); + encoder::encode_bulk_string(&mut out, value); + out +} + +fn build_get_frame(key: &[u8]) -> Vec<u8> { + let mut out = Vec::new(); + out.extend_from_slice(b"*2\r\n"); + encoder::encode_bulk_string(&mut out, b"GET"); + encoder::encode_bulk_string(&mut out, key); + out +} + +fn bench_parse_set(c: &mut Criterion) { + let mut group = c.benchmark_group("resp2/parse/set"); + for &value_len in &[8usize, 64, 512, 4096] { + let value = vec![b'x'; value_len]; + let frame = build_set_frame(b"some-key", &value); + group.throughput(Throughput::Bytes(frame.len() as u64)); + group.bench_with_input( + BenchmarkId::from_parameter(value_len), + &frame, + |b, frame| { + b.iter(|| { + let parsed = parse_frame(black_box(frame)).unwrap(); + assert!(matches!(parsed, FrameParse::Complete { .. })); + }); + }, + ); + } + group.finish(); +} + +fn bench_parse_get(c: &mut Criterion) { + let frame = build_get_frame(b"some-key"); + c.bench_function("resp2/parse/get", |b| { + b.iter(|| { + let parsed = parse_frame(black_box(&frame)).unwrap(); + assert!(matches!(parsed, FrameParse::Complete { .. 
})); + }); + }); +} + +fn bench_encode_bulk(c: &mut Criterion) { + let mut group = c.benchmark_group("resp2/encode/bulk"); + for &value_len in &[8usize, 64, 512, 4096] { + let payload = vec![b'x'; value_len]; + group.throughput(Throughput::Bytes(value_len as u64)); + group.bench_with_input( + BenchmarkId::from_parameter(value_len), + &payload, + |b, payload| { + b.iter(|| { + let mut out = Vec::with_capacity(payload.len() + 16); + encoder::encode_bulk_string(&mut out, black_box(payload)); + black_box(out); + }); + }, + ); + } + group.finish(); +} + +fn bench_encode_integer(c: &mut Criterion) { + c.bench_function("resp2/encode/integer", |b| { + b.iter(|| { + let mut out = Vec::with_capacity(16); + encoder::encode_integer(&mut out, black_box(1_234_567)); + black_box(out); + }); + }); +} + +criterion_group!( + benches, + bench_parse_set, + bench_parse_get, + bench_encode_bulk, + bench_encode_integer +); +criterion_main!(benches); diff --git a/examples/load_gen.rs b/examples/load_gen.rs new file mode 100644 index 0000000..b99477e --- /dev/null +++ b/examples/load_gen.rs @@ -0,0 +1,294 @@ +//! Minimal load generator for ferrum-kv. +//! +//! Spawns N worker threads, each opening a single TCP connection to the +//! server and issuing a fixed number of `SET`, `GET`, and `INCR` commands +//! in sequence. Commands are written as raw RESP2 arrays — no compatibility +//! handshake is performed — and replies are parsed just enough to confirm +//! correctness before moving on. +//! +//! The point is not to be a fully-featured benchmark (that is what +//! `redis-benchmark` is for, once we implement enough commands for its +//! handshake), but to give us a reproducible baseline for `SET / GET / INCR` +//! throughput that lives inside this repo and keeps working as the server +//! evolves. +//! +//! Build with `cargo build --release --example load_gen` and drive it via +//! `scripts/bench-smoke.sh`, or invoke directly: +//! +//! ```bash +//! 
cargo run --release --example load_gen -- \ +//! --addr 127.0.0.1:6399 --clients 50 --requests 100000 +//! ``` + +use std::env; +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::process::ExitCode; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::thread; +use std::time::{Duration, Instant}; + +struct Config { + addr: String, + clients: usize, + requests: usize, + value_size: usize, +} + +fn parse_args() -> Result<Config, String> { + let mut cfg = Config { + addr: "127.0.0.1:6399".to_string(), + clients: 50, + requests: 100_000, + value_size: 64, + }; + let mut iter = env::args().skip(1); + while let Some(flag) = iter.next() { + match flag.as_str() { + "--addr" => { + cfg.addr = iter + .next() + .ok_or_else(|| "--addr requires a value".to_string())?; + } + "--clients" => { + cfg.clients = iter + .next() + .ok_or_else(|| "--clients requires a value".to_string())? + .parse() + .map_err(|e| format!("--clients: {e}"))?; + } + "--requests" => { + cfg.requests = iter + .next() + .ok_or_else(|| "--requests requires a value".to_string())? + .parse() + .map_err(|e| format!("--requests: {e}"))?; + } + "--value-size" => { + cfg.value_size = iter + .next() + .ok_or_else(|| "--value-size requires a value".to_string())? 
+ .parse() + .map_err(|e| format!("--value-size: {e}"))?; + } + "-h" | "--help" => { + print_usage(); + std::process::exit(0); + } + other => return Err(format!("unknown flag: {other}")), + } + } + if cfg.clients == 0 { + return Err("--clients must be > 0".into()); + } + if cfg.requests == 0 { + return Err("--requests must be > 0".into()); + } + Ok(cfg) +} + +fn print_usage() { + eprintln!( + "usage: load_gen [--addr HOST:PORT] [--clients N] [--requests N] [--value-size BYTES]" + ); +} + +fn connect(addr: &str) -> std::io::Result<TcpStream> { + let s = TcpStream::connect(addr)?; + s.set_nodelay(true)?; + s.set_read_timeout(Some(Duration::from_secs(10)))?; + s.set_write_timeout(Some(Duration::from_secs(10)))?; + Ok(s) +} + +/// Reads one RESP2 reply from the stream, returning just the first byte so +/// the caller can tell apart `+OK`, `$N`, `:N`, and `-ERR`. +fn read_reply_kind(stream: &mut TcpStream) -> std::io::Result<u8> { + let mut header = Vec::with_capacity(16); + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte)?; + header.push(byte[0]); + if header.len() >= 2 && header.ends_with(b"\r\n") { + break; + } + } + match header[0] { + b'+' | b'-' | b':' => Ok(header[0]), + b'$' => { + let len: i64 = std::str::from_utf8(&header[1..header.len() - 2]) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? 
+ .parse() + .map_err(|e: std::num::ParseIntError| { + std::io::Error::new(std::io::ErrorKind::InvalidData, e) + })?; + if len >= 0 { + let mut body = vec![0u8; len as usize + 2]; + stream.read_exact(&mut body)?; + } + Ok(b'$') + } + other => Err(std::io::Error::other(format!( + "unexpected reply prefix: {other:#x}" + ))), + } +} + +fn append_bulk(out: &mut Vec<u8>, payload: &[u8]) { + out.extend_from_slice(format!("${}\r\n", payload.len()).as_bytes()); + out.extend_from_slice(payload); + out.extend_from_slice(b"\r\n"); +} + +fn build_set(key: &[u8], value: &[u8]) -> Vec<u8> { + let mut out = Vec::with_capacity(32 + key.len() + value.len()); + out.extend_from_slice(b"*3\r\n"); + append_bulk(&mut out, b"SET"); + append_bulk(&mut out, key); + append_bulk(&mut out, value); + out +} + +fn build_get(key: &[u8]) -> Vec<u8> { + let mut out = Vec::with_capacity(16 + key.len()); + out.extend_from_slice(b"*2\r\n"); + append_bulk(&mut out, b"GET"); + append_bulk(&mut out, key); + out +} + +fn build_incr(key: &[u8]) -> Vec<u8> { + let mut out = Vec::with_capacity(16 + key.len()); + out.extend_from_slice(b"*2\r\n"); + append_bulk(&mut out, b"INCR"); + append_bulk(&mut out, key); + out +} + +struct Totals { + set: AtomicU64, + get: AtomicU64, + incr: AtomicU64, + errors: AtomicU64, +} + +fn run_phase<F>(label: &str, cfg: &Config, totals: &Arc<Totals>, build: F) +where + F: Fn(usize, usize) -> Vec<u8> + Send + Sync + 'static + Clone, +{ + let per_client = cfg.requests / cfg.clients; + let remainder = cfg.requests - per_client * cfg.clients; + + let start = Instant::now(); + let mut handles = Vec::with_capacity(cfg.clients); + for cid in 0..cfg.clients { + let count = per_client + if cid < remainder { 1 } else { 0 }; + let addr = cfg.addr.clone(); + let totals = Arc::clone(totals); + let label = label.to_string(); + let build = build.clone(); + handles.push(thread::spawn(move || { + let mut stream = match connect(&addr) { + Ok(s) => s, + Err(e) => { + eprintln!("client {cid}: connect failed: {e}"); + 
totals.errors.fetch_add(count as u64, Ordering::Relaxed); + return; + } + }; + for op in 0..count { + let frame = build(cid, op); + if stream.write_all(&frame).is_err() { + totals.errors.fetch_add(1, Ordering::Relaxed); + break; + } + match read_reply_kind(&mut stream) { + Ok(b'-') => { + totals.errors.fetch_add(1, Ordering::Relaxed); + } + Ok(_) => match label.as_str() { + "SET" => { + totals.set.fetch_add(1, Ordering::Relaxed); + } + "GET" => { + totals.get.fetch_add(1, Ordering::Relaxed); + } + "INCR" => { + totals.incr.fetch_add(1, Ordering::Relaxed); + } + _ => {} + }, + Err(_) => { + totals.errors.fetch_add(1, Ordering::Relaxed); + break; + } + } + } + })); + } + for h in handles { + let _ = h.join(); + } + let elapsed = start.elapsed(); + + let done = match label { + "SET" => totals.set.load(Ordering::Relaxed), + "GET" => totals.get.load(Ordering::Relaxed), + "INCR" => totals.incr.load(Ordering::Relaxed), + _ => 0, + }; + let secs = elapsed.as_secs_f64().max(1e-9); + let qps = done as f64 / secs; + println!( + "{label:<5} {done:>10} ops {secs:>7.3} s {qps:>10.0} ops/s", + label = label, + done = done, + secs = secs, + qps = qps + ); +} + +fn main() -> ExitCode { + let cfg = match parse_args() { + Ok(c) => c, + Err(e) => { + eprintln!("error: {e}"); + print_usage(); + return ExitCode::from(2); + } + }; + + println!( + "addr={} clients={} requests={} value_size={}", + cfg.addr, cfg.clients, cfg.requests, cfg.value_size + ); + + let totals = Arc::new(Totals { + set: AtomicU64::new(0), + get: AtomicU64::new(0), + incr: AtomicU64::new(0), + errors: AtomicU64::new(0), + }); + + let value_size = cfg.value_size; + run_phase("SET", &cfg, &totals, move |cid, op| { + let key = format!("c{cid}:k{op}").into_bytes(); + let value = vec![b'x'; value_size]; + build_set(&key, &value) + }); + run_phase("GET", &cfg, &totals, move |cid, op| { + let key = format!("c{cid}:k{op}").into_bytes(); + build_get(&key) + }); + run_phase("INCR", &cfg, &totals, move |_cid, _op| { + 
build_incr(b"counter") + }); + + let errors = totals.errors.load(Ordering::Relaxed); + if errors > 0 { + eprintln!("warning: {errors} command errors during run"); + return ExitCode::from(1); + } + ExitCode::SUCCESS +} diff --git a/scripts/bench-smoke.sh b/scripts/bench-smoke.sh new file mode 100755 index 0000000..68d8ea5 --- /dev/null +++ b/scripts/bench-smoke.sh @@ -0,0 +1,68 @@ +#!/usr/bin/env bash +# Smoke benchmark for ferrum-kv. Starts a release build of the server, +# hammers it with the Rust-native load generator in `examples/load_gen.rs`, +# then tears the server down on exit. +# +# A Rust driver is used instead of `redis-benchmark` or `redis-cli --pipe` +# because both of those probe the server with commands (inline `PING`, +# `ECHO `) that ferrum-kv does not yet implement; speaking RESP2 +# directly from the driver keeps the benchmark honest without adding +# compatibility commands just for tooling. +# +# Prerequisites: +# - a recent Rust toolchain (the driver is built via `cargo run --release`) +# +# Usage: +# scripts/bench-redis.sh # defaults +# PORT=7000 REQUESTS=500000 scripts/bench-redis.sh +# CLIENTS=64 VALUE_SIZE=256 scripts/bench-redis.sh + +set -euo pipefail + +PORT="${PORT:-6399}" +REQUESTS="${REQUESTS:-100000}" +CLIENTS="${CLIENTS:-50}" +VALUE_SIZE="${VALUE_SIZE:-64}" + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +BIN="${ROOT_DIR}/target/release/ferrum-kv" + +echo "==> building release binary and load generator" +cargo build --release --manifest-path "${ROOT_DIR}/Cargo.toml" \ + --example load_gen --bin ferrum-kv >/dev/null + +SERVER_PID="" +cleanup() { + if [[ -n "${SERVER_PID}" ]] && kill -0 "${SERVER_PID}" 2>/dev/null; then + kill "${SERVER_PID}" 2>/dev/null || true + wait "${SERVER_PID}" 2>/dev/null || true + fi +} +trap cleanup EXIT INT TERM + +echo "==> starting ferrum-kv on 127.0.0.1:${PORT}" +"${BIN}" --addr "127.0.0.1:${PORT}" >/tmp/ferrum-kv-bench.log 2>&1 & +SERVER_PID=$! 
+ +# Poll until the server accepts TCP connections. +for _ in {1..50}; do + if (exec 3<>/dev/tcp/127.0.0.1/"${PORT}") 2>/dev/null; then + exec 3>&- 3<&- + break + fi + sleep 0.1 +done + +if ! kill -0 "${SERVER_PID}" 2>/dev/null; then + echo "error: ferrum-kv exited during startup; log:" >&2 + cat /tmp/ferrum-kv-bench.log >&2 || true + exit 1 +fi + +"${ROOT_DIR}/target/release/examples/load_gen" \ + --addr "127.0.0.1:${PORT}" \ + --requests "${REQUESTS}" \ + --clients "${CLIENTS}" \ + --value-size "${VALUE_SIZE}" + +echo "==> done" diff --git a/tests/concurrency_stress_test.rs b/tests/concurrency_stress_test.rs new file mode 100644 index 0000000..2b16c86 --- /dev/null +++ b/tests/concurrency_stress_test.rs @@ -0,0 +1,264 @@ +//! Concurrency and stress integration tests. +//! +//! These tests push many clients at a real TCP listener in parallel to flush +//! out races that single-threaded tests cannot reach: +//! +//! * disjoint-key workloads must not lose writes or corrupt the index; +//! * shared-key `INCR` must be exactly atomic across all clients; +//! * when AOF is enabled, a replay into a fresh engine must reproduce the +//! running engine's state byte-for-byte, proving the log is serialised +//! correctly under contention. +//! +//! Volumes are deliberately modest (tens of clients × hundreds of ops) so the +//! suite stays fast enough for `cargo test`. The `bench-smoke.sh` script +//! under `scripts/` is the right tool for pushing real load. 
+ +use std::fs; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::thread; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use ferrum_kv::network::server::{self, ServerConfig}; +use ferrum_kv::network::shutdown::Shutdown; +use ferrum_kv::persistence::AofWriter; +use ferrum_kv::persistence::config::{AofConfig, FsyncPolicy}; +use ferrum_kv::persistence::replay; +use ferrum_kv::protocol::encoder; +use ferrum_kv::storage::engine::KvEngine; + +static COUNTER: AtomicU64 = AtomicU64::new(0); + +fn tmp_aof_path(label: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0); + let n = COUNTER.fetch_add(1, Ordering::SeqCst); + std::env::temp_dir().join(format!("ferrum-stress-{label}-{nanos}-{n}.aof")) +} + +struct ServerGuard { + addr: String, + _thread: thread::JoinHandle<()>, +} + +fn spawn_server(engine: KvEngine) -> ServerGuard { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local_addr").to_string(); + let handle = thread::spawn(move || { + let _ = server::run_listener(listener, engine, Shutdown::new(), ServerConfig::default()); + }); + ServerGuard { + addr, + _thread: handle, + } +} + +fn connect(addr: &str) -> TcpStream { + let stream = TcpStream::connect(addr).expect("connect"); + stream + .set_read_timeout(Some(Duration::from_secs(5))) + .expect("set_read_timeout"); + stream + .set_write_timeout(Some(Duration::from_secs(5))) + .expect("set_write_timeout"); + stream +} + +fn build_request(args: &[&[u8]]) -> Vec { + let mut out = Vec::new(); + out.extend_from_slice(format!("*{}\r\n", args.len()).as_bytes()); + for arg in args { + encoder::encode_bulk_string(&mut out, arg); + } + out +} + +/// Reads a single RESP2 reply off the wire. 
Only the reply kinds produced by +/// the server in these tests are recognised; anything else is an error. +fn read_reply(stream: &mut TcpStream) -> Vec { + let mut header = Vec::new(); + let mut byte = [0u8; 1]; + loop { + stream.read_exact(&mut byte).expect("read reply header"); + header.push(byte[0]); + if header.len() >= 2 && header.ends_with(b"\r\n") { + break; + } + } + match header[0] { + b'+' | b'-' | b':' => header, + b'$' => { + let len: i64 = std::str::from_utf8(&header[1..header.len() - 2]) + .expect("bulk len utf8") + .parse() + .expect("bulk len parse"); + if len < 0 { + return header; // null bulk + } + let mut body = vec![0u8; len as usize + 2]; + stream.read_exact(&mut body).expect("read bulk body"); + header.extend_from_slice(&body); + header + } + other => panic!("unexpected reply prefix: {other:#x}"), + } +} + +fn expect_simple_ok(stream: &mut TcpStream) { + assert_eq!(read_reply(stream), b"+OK\r\n"); +} + +fn expect_integer(stream: &mut TcpStream) -> i64 { + let reply = read_reply(stream); + assert_eq!(reply[0], b':', "expected integer reply, got {reply:?}"); + std::str::from_utf8(&reply[1..reply.len() - 2]) + .expect("int utf8") + .parse() + .expect("int parse") +} + +#[test] +fn many_clients_with_disjoint_keys_do_not_lose_writes() { + const CLIENTS: usize = 16; + const OPS_PER_CLIENT: usize = 200; + + let engine = KvEngine::new(); + let server = spawn_server(engine.clone()); + + let mut handles = Vec::with_capacity(CLIENTS); + for cid in 0..CLIENTS { + let addr = server.addr.clone(); + handles.push(thread::spawn(move || { + let mut s = connect(&addr); + for op in 0..OPS_PER_CLIENT { + let key = format!("c{cid}:k{op}"); + let value = format!("c{cid}:v{op}"); + s.write_all(&build_request(&[b"SET", key.as_bytes(), value.as_bytes()])) + .expect("write SET"); + expect_simple_ok(&mut s); + } + })); + } + for h in handles { + h.join().expect("client thread"); + } + + assert_eq!(engine.dbsize().unwrap(), CLIENTS * OPS_PER_CLIENT); + for cid in 
0..CLIENTS { + for op in 0..OPS_PER_CLIENT { + let key = format!("c{cid}:k{op}"); + let expected = format!("c{cid}:v{op}"); + assert_eq!( + engine.get(key.as_bytes()).unwrap(), + Some(expected.into_bytes()), + "client {cid} op {op} lost" + ); + } + } +} + +#[test] +fn concurrent_incr_on_shared_counter_is_atomic() { + const CLIENTS: usize = 16; + const INCRS_PER_CLIENT: usize = 500; + + let engine = KvEngine::new(); + let server = spawn_server(engine.clone()); + + let mut handles = Vec::with_capacity(CLIENTS); + for _ in 0..CLIENTS { + let addr = server.addr.clone(); + handles.push(thread::spawn(move || { + let mut s = connect(&addr); + for _ in 0..INCRS_PER_CLIENT { + s.write_all(&build_request(&[b"INCR", b"counter"])) + .expect("write INCR"); + let _ = expect_integer(&mut s); + } + })); + } + for h in handles { + h.join().expect("client thread"); + } + + let expected = (CLIENTS * INCRS_PER_CLIENT) as i64; + let stored = engine.get(b"counter").unwrap().expect("counter must exist"); + let as_int: i64 = std::str::from_utf8(&stored) + .expect("int utf8") + .parse() + .expect("int parse"); + assert_eq!(as_int, expected, "lost INCR under contention"); +} + +#[test] +fn concurrent_writes_replay_into_identical_state() { + const CLIENTS: usize = 8; + const OPS_PER_CLIENT: usize = 100; + + let path = tmp_aof_path("replay"); + let cfg = AofConfig::new(&path, FsyncPolicy::Always); + let writer = Arc::new(AofWriter::open(&cfg).expect("open aof")); + let engine = KvEngine::new().with_aof(Arc::clone(&writer)); + let server = spawn_server(engine.clone()); + + let mut handles = Vec::with_capacity(CLIENTS); + for cid in 0..CLIENTS { + let addr = server.addr.clone(); + handles.push(thread::spawn(move || { + let mut s = connect(&addr); + for op in 0..OPS_PER_CLIENT { + let key = format!("c{cid}:k{op}"); + let value = format!("v{op}"); + s.write_all(&build_request(&[b"SET", key.as_bytes(), value.as_bytes()])) + .expect("write SET"); + expect_simple_ok(&mut s); + + 
s.write_all(&build_request(&[b"INCR", b"shared:hits"])) + .expect("write INCR"); + let _ = expect_integer(&mut s); + } + })); + } + for h in handles { + h.join().expect("client thread"); + } + + // Drop the writer so buffers flush before we replay. + let running_dbsize = engine.dbsize().unwrap(); + let running_counter = engine.get(b"shared:hits").unwrap(); + drop(engine); + drop(writer); + + let restored = KvEngine::new(); + let stats = replay(&path, &restored).expect("replay"); + assert_eq!( + stats.skipped, 0, + "replay must not drop records written under contention" + ); + assert!( + !stats.truncated_tail, + "clean shutdown must not leave a truncated tail" + ); + + assert_eq!(restored.dbsize().unwrap(), running_dbsize); + assert_eq!(restored.get(b"shared:hits").unwrap(), running_counter); + + for cid in 0..CLIENTS { + for op in 0..OPS_PER_CLIENT { + let key = format!("c{cid}:k{op}"); + let expected = format!("v{op}"); + assert_eq!( + restored.get(key.as_bytes()).unwrap(), + Some(expected.into_bytes()), + "replay lost c{cid} op {op}" + ); + } + } + + let _ = fs::remove_file(&path); +}