Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0ee4d36
feat: additional tests for lax counters
dev-davexoyinbo Apr 11, 2026
8871534
test: test for different scenario
dev-davexoyinbo Apr 11, 2026
cbca476
test: add test to to test staleness
dev-davexoyinbo Apr 12, 2026
edf68ee
fix: fix fetching stale
dev-davexoyinbo Apr 12, 2026
0bcabcf
fix: race condition happening because of reset of the flush all_keys
dev-davexoyinbo Apr 12, 2026
f3463b4
feat: execute_pipeleine_with_script_retry
dev-davexoyinbo Apr 12, 2026
b431b73
fix: fix race condition for flushes
dev-davexoyinbo Apr 12, 2026
8a0fb0c
feat: interface for set_all and get_alls
dev-davexoyinbo Apr 12, 2026
de82903
feat: get_all and set_all implementations for lax_counter
dev-davexoyinbo Apr 12, 2026
9cc050c
feat: get_all and set_all implementation for strict_counter
dev-davexoyinbo Apr 12, 2026
9f40768
feat: use as_str instead of to_string
dev-davexoyinbo Apr 12, 2026
a9415b3
feat: use max batch size const
dev-davexoyinbo Apr 12, 2026
eabebab
feat: get_batch on strict instance aware
dev-davexoyinbo Apr 12, 2026
445533c
feat: include key in the get
dev-davexoyinbo Apr 12, 2026
bf713c3
feat: get and get_all
dev-davexoyinbo Apr 12, 2026
e3b7b47
feat: set all and set all on instance
dev-davexoyinbo Apr 12, 2026
2ce7f2c
feat: batch refresh stale
dev-davexoyinbo Apr 12, 2026
8cff052
feat: get_all lax_instance aware
dev-davexoyinbo Apr 12, 2026
43403fd
fix get_all_on instance , add locks and set_all on instance
dev-davexoyinbo Apr 12, 2026
37a0883
feat: set all
dev-davexoyinbo Apr 12, 2026
07f780c
feat: add instance wide mutex to prevent deadlock
dev-davexoyinbo Apr 12, 2026
b29ab5a
feat: allow lag for the instance_wide lock
dev-davexoyinbo Apr 12, 2026
0a7080a
test: update tests
dev-davexoyinbo Apr 12, 2026
000ea63
feat: update counter trait
dev-davexoyinbo Apr 12, 2026
ac38936
feat: comparator
dev-davexoyinbo Apr 12, 2026
377d877
feat: update traits
dev-davexoyinbo Apr 12, 2026
8688cfb
test: lax counter for confitional inc and set
dev-davexoyinbo Apr 12, 2026
fa4f58e
test: strict counter test
dev-davexoyinbo Apr 12, 2026
a606a2f
feat: strict counter update for conditionals
dev-davexoyinbo Apr 12, 2026
2d9cbcd
feat: lax_counter
dev-davexoyinbo Apr 12, 2026
82209f5
test: lax instance aware for conditionals
dev-davexoyinbo Apr 12, 2026
1ea2369
test: strict instance aware
dev-davexoyinbo Apr 12, 2026
079262c
feat: comparator lax instance aware
dev-davexoyinbo Apr 12, 2026
20230d0
feat: comparison strict instance aware
dev-davexoyinbo Apr 12, 2026
721b8e3
test: inc_if and inc_all_if
dev-davexoyinbo Apr 12, 2026
c19e63a
feat: add inc all if to traits
dev-davexoyinbo Apr 12, 2026
2d7ac1b
feat: inc and inc all if
dev-davexoyinbo Apr 13, 2026
e15cedb
feat: inc_all_if
dev-davexoyinbo Apr 13, 2026
0546165
feat: rename RedisKit and update docs
dev-davexoyinbo Apr 13, 2026
f2a2db8
feat: update bench
dev-davexoyinbo Apr 13, 2026
c94a595
bump version v0.3.0
dev-davexoyinbo Apr 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Commands

```bash
make test # Start Redis, run full test suite, tear down
make bench # Start Redis, run criterion benchmarks, tear down
make redis-up # Start Redis only (docker-compose, port REDIS_PORT=16379)
make redis-down # Stop Redis and remove volumes
```

To run tests manually (Redis must be running):
```bash
REDIS_URL="redis://127.0.0.1:16379/" cargo test --all-features
REDIS_URL="redis://127.0.0.1:16379/" cargo test --all-features <test_name> # single test
cargo test --doc --all-features # doctests only
```
Comment on lines +14 to +19
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. cargo test --doc missing redis_url 📘 Rule violation ☼ Reliability

CLAUDE.md instructs running cargo test --doc --all-features without setting REDIS_URL, despite
stating tests require a live Redis and REDIS_URL. This can cause contributors/automation to run
tests in an invalid environment, violating the required test invocation guidance.
Agent Prompt
## Issue description
`CLAUDE.md` includes a bare `cargo test --doc --all-features` command without `REDIS_URL`, which conflicts with the requirement that tests run with a live Redis and `REDIS_URL` set.

## Issue Context
The compliance checklist requires that documentation/instructions use `make test` for the full test suite, and that any direct `cargo test` invocation sets `REDIS_URL` and assumes Redis is running.

## Fix Focus Areas
- CLAUDE.md[14-21]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


**Always use `make test`, not bare `cargo test`** — tests require `REDIS_URL` to be set and a live Redis instance.

## Architecture

**distkit** is an async Rust library (Tokio + Redis) providing distributed counting primitives with two consistency modes and two counter families.

### Counter families

| Family | Trait | Strict impl | Lax impl |
|--------|-------|-------------|----------|
| Simple | `CounterTrait` | `StrictCounter` | `LaxCounter` |
| Instance-aware | `InstanceAwareCounterTrait` | `StrictInstanceAwareCounter` | `LaxInstanceAwareCounter` |

**Strict** counters: every operation is an atomic Lua script round-trip — fully consistent.

**Lax** counters: `inc`/`dec`/`get`/`set_on_instance` are served from an in-memory `DashMap`; a background Tokio task flushes accumulated deltas to Redis every `flush_interval` (default 20 ms). Epoch-bumping operations (`set`, `del`, `clear`) flush first, then delegate to the strict backend. The background task holds a `Weak` reference and stops when the counter is dropped.

### Instance-aware counters

Each counter instance gets a UUID. Operations return `(cumulative, instance_count)`. Redis stores:
- `cumulative_key` hash: key → global total
- `instance_count_key` hash: per-instance contributions
- `instances_key` sorted set: instance_id → last-heartbeat timestamp (ms)
- `epoch_key` hash: per-key epoch counter (bumped by `set`/`del` to invalidate stale slices)

Dead instances (no heartbeat for `dead_instance_threshold_ms`, default 30 s) are cleaned up by the next live instance that touches an affected key.

### Lua scripts

All Redis logic is embedded as inline Lua strings inside each `*_counter.rs` file — no external `.lua` files. `HELPERS_LUA` (in `strict_instance_aware_counter.rs`) defines shared helpers (`now_ms`, `delete_dead_instances`, `check_and_zadd`) that are prepended to every icounter script via string concatenation.

Scripts echo keys back in their return values so callers can build `HashMap<String, T>` results instead of relying on positional ordering. **Never use `.zip()` to align pipeline results with input keys** — use the HashMap keyed on the returned key string.

### `execute_pipeline_with_script_retry`

`src/common/mod.rs` exports this generic helper used by every batch pipeline operation:

```rust
execute_pipeline_with_script_retry(conn, script, items, |item| {
let mut inv = script.key(...);
inv.key(...).arg(...);
inv // return owned ScriptInvocation
})
```

On `NOSCRIPT` error it prepends `load_script` and retries the entire pipeline. Callers pass a closure returning one `ScriptInvocation<'s>` per item; the function owns all pipeline mechanics.

### `RedisKey`

Newtype wrapping `String`, validated on construction (`TryFrom<String>`): non-empty, ≤255 chars, no colons. Used as the public API key type throughout. `RedisKeyGenerator` prepends the counter-type prefix when building actual Redis keys.

### `ActivityTracker`

Drives the lax flush task's sleep/wake cycle. `signal()` sets `is_active = true` atomically and sends on a `watch` channel. The flush task parks at `is_active_watch.changed()` when idle; `run_is_active_task` sets `is_active = false` after `epoch_interval / 2` (7.5 s) of inactivity. The epoch advances every `EPOCH_CHANGE_INTERVAL` (15 s); `signal()` is a no-op within the same epoch to avoid redundant sends.

### Feature flags

- `counter` (default): `StrictCounter`, `LaxCounter`
- `instance-aware-counter` (default): `StrictInstanceAwareCounter`, `LaxInstanceAwareCounter`
- `trypema`: re-exports the `trypema` rate-limiting crate

### Module layout

```
src/
lib.rs # feature-gated re-exports
error.rs # DistkitError
common/
mod.rs # RedisKey, RedisKeyGenerator, execute_pipeline_with_script_retry,
# ActivityTracker, EPOCH_CHANGE_INTERVAL
counter/
counter_trait.rs # CounterTrait (async_trait)
strict_counter.rs # StrictCounter + embedded Lua
lax_counter.rs # LaxCounter + embedded Lua
tests/ # unit tests per impl
icounter/
mod.rs # InstanceAwareCounterTrait (async_trait)
strict_instance_aware_counter.rs # StrictInstanceAwareCounter + all Lua scripts
lax_instance_aware_counter.rs # LaxInstanceAwareCounter (wraps strict)
tests/
__doctest_helpers.rs # Counter factory helpers for inline doc examples
```
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "distkit"
version = "0.2.3"
version = "0.3.0"
edition = "2024"
description = "A toolkit of distributed systems primitives for Rust, backed by Redis"
authors = ["Oyinbo David Bayode <dev.davexoyinbo@gmail.com>"]
Expand Down Expand Up @@ -40,6 +40,7 @@ trypema = ["trypema-crate", "trypema-crate/redis-tokio"]
[dependencies]
async-trait = "0.1.89"
dashmap = "6.1.0"
regex = "1.12.3"
redis = { version = "1.2.0", features = [
"aio",
"connection-manager",
Expand Down
57 changes: 43 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ distkit requires a running Redis instance (5.0+ for Lua script support).
## Quick start

```rust
use distkit::{RedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};
use distkit::{DistkitRedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;

let prefix = RedisKey::try_from("my_app".to_string())?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let options = CounterOptions::new(prefix, conn);

let key = RedisKey::try_from("page_views".to_string())?;
let key = DistkitRedisKey::try_from("page_views".to_string())?;

// Strict: immediate consistency
let strict = StrictCounter::new(options.clone());
Expand All @@ -97,13 +97,36 @@ Every call is a single Redis round-trip executing an atomic Lua script. The
counter value is always authoritative.

```rust
let key = RedisKey::try_from("orders".to_string())?;
let key = DistkitRedisKey::try_from("orders".to_string())?;
strict.inc(&key, 1).await?; // HINCRBY via Lua
strict.set(&key, 100).await?; // HSET via Lua
strict.del(&key).await?; // HDEL, returns old value
strict.clear().await?; // DEL on the hash
```

Conditional writes use `CounterComparator` and return the current value
unchanged when the comparison fails.

```rust
use distkit::CounterComparator;

strict.set(&key, 10).await?;
assert_eq!(strict.inc_if(&key, CounterComparator::Eq(10), 5).await?, 15);
assert_eq!(strict.set_if(&key, CounterComparator::Gt(20), 99).await?, 15);
```

Batch increments follow the same rules and preserve input order.

```rust
let results = strict
.inc_all_if(&[
(&key, CounterComparator::Eq(15), 2),
(&key, CounterComparator::Nil, 3),
])
.await?;
assert_eq!(results, vec![(&key, 17), (&key, 20)]);
```

### LaxCounter

Writes are buffered in a local `DashMap` and flushed to Redis in batched
Expand All @@ -112,7 +135,7 @@ pipelines every `allowed_lag` (default 20 ms). Reads return the local view
process.

```rust
let key = RedisKey::try_from("impressions".to_string())?;
let key = DistkitRedisKey::try_from("impressions".to_string())?;
lax.inc(&key, 1).await?; // local atomic add, sub-microsecond
let val = lax.get(&key).await?; // reads local state, no Redis hit
```
Expand Down Expand Up @@ -156,6 +179,12 @@ This makes them well-suited for:
restarts or crashes.
- **Per-node metrics** -- see both the global total and each instance's slice.

Conditional instance-aware writes follow the same rule set:

- `inc_if` and `set_if` compare against the cumulative total.
- `set_on_instance_if` compares against the calling instance's slice.
- Failed comparisons return the current `(cumulative, instance_count)` unchanged.

### StrictInstanceAwareCounter

Every call is immediately consistent with Redis. `set` and `del` bump a
Expand All @@ -167,16 +196,16 @@ use distkit::icounter::{
InstanceAwareCounterTrait,
StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::RedisKey;
use distkit::DistkitRedisKey;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
StrictInstanceAwareCounterOptions::new(prefix, conn),
);

let key = RedisKey::try_from("connections".to_string())?;
let key = DistkitRedisKey::try_from("connections".to_string())?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;
Expand Down Expand Up @@ -211,13 +240,13 @@ use distkit::icounter::{
InstanceAwareCounterTrait,
StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::RedisKey;
use distkit::DistkitRedisKey;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn1 = client.get_connection_manager().await?;
let conn2 = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let key = RedisKey::try_from("connections".to_string())?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let key = DistkitRedisKey::try_from("connections".to_string())?;

let opts = |conn| StrictInstanceAwareCounterOptions {
prefix: prefix.clone(),
Expand Down Expand Up @@ -250,12 +279,12 @@ use distkit::icounter::{
InstanceAwareCounterTrait,
LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
};
use distkit::RedisKey;
use distkit::DistkitRedisKey;
use std::time::Duration;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
prefix,
connection_manager: conn,
Expand All @@ -264,7 +293,7 @@ let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
allowed_lag: Duration::from_millis(20),
});

let key = RedisKey::try_from("connections".to_string())?;
let key = DistkitRedisKey::try_from("connections".to_string())?;

// Returns the local estimate immediately — no Redis round-trip on warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;
Expand Down
15 changes: 7 additions & 8 deletions benches/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use distkit::DistkitRedisKey;
use distkit::counter::{CounterOptions, LaxCounter, StrictCounter};
use distkit::icounter::{
LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions, StrictInstanceAwareCounter,
StrictInstanceAwareCounterOptions,
};
use distkit::RedisKey;
use redis::aio::ConnectionManager;

pub async fn make_connection() -> ConnectionManager {
let url = std::env::var("REDIS_URL")
.expect("REDIS_URL must be set — run via `make bench`");
let url = std::env::var("REDIS_URL").expect("REDIS_URL must be set — run via `make bench`");
let client = redis::Client::open(url).expect("REDIS_URL is not a valid Redis URL");
client
.get_connection_manager()
Expand Down Expand Up @@ -48,21 +47,21 @@ pub async fn make_lax_counter(bench_name: &str) -> Arc<LaxCounter> {
LaxCounter::new(CounterOptions::new(bench_prefix(bench_name), conn))
}

/// Builds a `RedisKey` from a plain name string.
pub fn key(name: &str) -> RedisKey {
RedisKey::try_from(name.to_string())
/// Builds a `DistkitRedisKey` from a plain name string.
pub fn key(name: &str) -> DistkitRedisKey {
DistkitRedisKey::try_from(name.to_string())
.expect("bench key must be non-empty, ≤255 chars, and colon-free")
}

// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------

fn bench_prefix(bench_name: &str) -> RedisKey {
fn bench_prefix(bench_name: &str) -> DistkitRedisKey {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
RedisKey::try_from(format!("bench_{}_{}", ts, bench_name))
DistkitRedisKey::try_from(format!("bench_{}_{}", ts, bench_name))
.expect("constructed bench prefix is always valid")
}
22 changes: 11 additions & 11 deletions benches/strict_instance_aware_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
// function uses a distinct member key so there is no cross-bench state
// interference. Destructive operations (del, del_on_instance, clear,
// clear_on_instance) use `iter_batched` to re-seed the key before every
// measured call. `inc_batch_10` rebuilds its input Vec via `iter_batched`
// because the vec is drained after each call.
// measured call. `inc_all_10` rebuilds its borrowed input Vec each iteration.

mod common;

use std::time::Duration;

use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use distkit::RedisKey;
use distkit::DistkitRedisKey;
use distkit::icounter::InstanceAwareCounterTrait;
use tokio::runtime::Runtime;

fn bench_strict_instance_aware_counter(c: &mut Criterion) {
Expand Down Expand Up @@ -91,17 +91,17 @@ fn bench_strict_instance_aware_counter(c: &mut Criterion) {
);
});

// inc_batch_10 — pipeline 10 distinct keys in a single batch.
// The Vec is rebuilt each iteration (inc_batch drains it). The allocation
// is negligible compared to the Redis round-trip being measured.
let batch_keys: Vec<RedisKey> = (0..10)
// inc_all_10 — pipeline 10 distinct keys in a single batch.
// The borrowed input Vec is rebuilt each iteration. The allocation is
// negligible compared to the Redis round-trip being measured.
let batch_keys: Vec<DistkitRedisKey> = (0..10)
.map(|i| common::key(&format!("batch_{i}")))
.collect();
group.bench_function("inc_batch_10", |b| {
group.bench_function("inc_all_10", |b| {
b.to_async(&rt).iter(|| async {
let mut increments: Vec<(RedisKey, i64)> =
batch_keys.iter().map(|k| (k.clone(), 1i64)).collect();
counter.inc_batch(&mut increments, 50).await.unwrap();
let increments: Vec<(&DistkitRedisKey, i64)> =
batch_keys.iter().map(|k| (k, 1i64)).collect();
counter.inc_all(&increments).await.unwrap();
});
});

Expand Down
Loading
Loading