A toolkit of distributed systems primitives for Rust, backed by Redis.
distkit provides building blocks for distributed applications. It ships distributed counters (strict and lax), instance-aware counters, and rate limiting, all backed by Redis.
- StrictCounter -- every operation executes a Redis Lua script atomically. Reads always reflect the latest write. Best for billing, inventory, or anything where accuracy is critical.
- LaxCounter -- buffers increments in memory and flushes to Redis every ~20 ms. Sub-microsecond latency on the hot path. Best for analytics and high-throughput metrics.
- Instance-aware counters -- each running instance owns a named slice of the total, with automatic cleanup of contributions from instances that stop heartbeating.
- Rate limiting (opt-in
trypemafeature) -- sliding-window rate limiting with local, Redis-backed, and hybrid providers. Supports absolute and probabilistic suppression strategies. - Safe by default --
#![forbid(unsafe_code)], no panics in library code.
| Feature | Default | Description |
|---|---|---|
counter |
yes | Distributed counters (StrictCounter, LaxCounter) |
instance-aware-counter |
no | Per-instance counters (StrictInstanceAwareCounter, LaxInstanceAwareCounter) |
trypema |
no | Rate limiting via the trypema crate |
cargo add distkitOr add to Cargo.toml:
[dependencies]
distkit = "0.4"To enable instance-aware counters or rate limiting:
[dependencies]
distkit = { version = "0.4", features = ["instance-aware-counter", "trypema"] }distkit requires a running Redis instance (5.0+ for Lua script support).
use distkit::{DistkitRedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let options = CounterOptions::new(prefix, conn);
let key = DistkitRedisKey::try_from("page_views".to_string())?;
// Strict: immediate consistency
let strict = StrictCounter::new(options.clone());
strict.inc(&key, 1).await?;
let total = strict.get(&key).await?;
println!("strict: {total}");
// Lax: eventual consistency, much faster
let lax = LaxCounter::new(options);
lax.inc(&key, 1).await?;
let approx = lax.get(&key).await?;
println!("lax: {approx}");
Ok(())
}Every call is a single Redis round-trip executing an atomic Lua script. The counter value is always authoritative.
let key = DistkitRedisKey::try_from("orders".to_string())?;
strict.inc(&key, 1).await?; // HINCRBY via Lua
strict.set(&key, 100).await?; // HSET via Lua
strict.del(&key).await?; // HDEL, returns old value
strict.clear().await?; // DEL on the hashConditional writes use CounterComparator and return (new, old). When the
comparison fails, new == old.
use distkit::CounterComparator;
strict.set(&key, 10).await?;
assert_eq!(
strict.inc_if(&key, CounterComparator::Eq(10), 5).await?,
(15, 10)
);
assert_eq!(
strict.set_if(&key, CounterComparator::Gt(20), 99).await?,
(15, 15)
);Batch increments follow the same rules and preserve input order.
let results = strict
.inc_all_if(&[
(&key, CounterComparator::Eq(15), 2),
(&key, CounterComparator::Nil, 3),
])
.await?;
assert_eq!(results, vec![(&key, 17, 15), (&key, 20, 17)]);Writes are buffered in a local DashMap and flushed to Redis in batched
pipelines every allowed_lag (default 20 ms). Reads return the local view
(remote_total + pending_delta), which is always consistent within the same
process.
let key = DistkitRedisKey::try_from("impressions".to_string())?;
lax.inc(&key, 1).await?; // local atomic add, sub-microsecond
let val = lax.get(&key).await?; // reads local state, no Redis hitA background Tokio task handles flushing. It holds a Weak reference to the
counter, so it stops automatically when the counter is dropped.
StrictCounter |
LaxCounter |
StrictInstanceAwareCounter |
LaxInstanceAwareCounter |
|
|---|---|---|---|---|
| Consistency | Immediate | Eventual (default: ~20 ms lag) | Immediate | Eventual (flush_interval lag) |
inc latency |
Redis round-trip | Sub-microsecond (warm path) | Redis round-trip | Sub-microsecond (warm path) |
| Redis I/O | Every operation | Batched on interval | Every inc |
Batched on interval |
set / del |
Immediate | Immediate | Immediate (bumps epoch) | Flushes pending delta, then immediate |
| Per-instance tracking | No | No | Yes | Yes |
| Dead-instance cleanup | No | No | Yes | Yes |
| Feature flag | counter (default) |
counter (default) |
instance-aware-counter |
instance-aware-counter |
| Use case | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |
Enable the instance-aware-counter feature:
[dependencies]
distkit = { version = "0.4", features = ["instance-aware-counter"] }Instance-aware counters track each running instance's contribution separately.
The cumulative total is the sum of all live instances. When an instance stops
heartbeating for longer than dead_instance_threshold_ms (default 30 s), its
contribution is automatically subtracted from the cumulative on the next
operation by any surviving instance.
This makes them well-suited for:
- Connection pool sizing -- each server reports its active connection count; the cumulative is the cluster-wide total.
- Live session counting -- contributions disappear naturally when a node restarts or crashes.
- Per-node metrics -- see both the global total and each instance's slice.
Conditional instance-aware writes follow the same rule set:
inc_ifandset_ifcompare against the cumulative total.set_on_instance_ifcompares against the calling instance's slice.- Failed comparisons return the current
(cumulative, instance_count)unchanged.
Every call is immediately consistent with Redis. set and del bump a
per-key epoch that causes stale instances to reset their stored count on
their next operation, preventing double-counting.
use distkit::icounter::{
InstanceAwareCounterTrait,
StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::DistkitRedisKey;
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
StrictInstanceAwareCounterOptions::new(prefix, conn),
);
let key = DistkitRedisKey::try_from("connections".to_string())?;
// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;
// Decrement this instance's contribution.
let (total, mine) = counter.dec(&key, 2).await?;
// Read without modifying.
let (total, mine) = counter.get(&key).await?;
// Set this instance's slice to an exact value without bumping the epoch.
let (total, mine) = counter.set_on_instance(&key, 10).await?;
// Set the global total to an exact value and bump the epoch.
let (total, mine) = counter.set(&key, 100).await?;
// Remove only this instance's contribution.
let (total, removed) = counter.del_on_instance(&key).await?;
// Delete the key globally and bump the epoch.
let (old_total, _) = counter.del(&key).await?;Each instance sends a heartbeat on every operation. If a process silently dies, surviving instances automatically remove its contribution the next time any of them touches the same key.
use distkit::icounter::{
InstanceAwareCounterTrait,
StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::DistkitRedisKey;
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn1 = client.get_connection_manager().await?;
let conn2 = client.get_connection_manager().await?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let key = DistkitRedisKey::try_from("connections".to_string())?;
let opts = |conn| StrictInstanceAwareCounterOptions {
prefix: prefix.clone(),
connection_manager: conn,
dead_instance_threshold_ms: 30_000, // 30 s
};
let server_a = StrictInstanceAwareCounter::new(opts(conn1));
let server_b = StrictInstanceAwareCounter::new(opts(conn2));
server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key, 5).await?; // cumulative = 15
// server_a goes offline. After 30 s, server_b's next call removes its
// contribution automatically.
let (total, _) = server_b.get(&key).await?; // total = 5 once cleaned upA buffered wrapper around StrictInstanceAwareCounter. inc calls accumulate
locally and are flushed to the strict counter in bulk every flush_interval
(default 20 ms). Global operations (set, del, clear) flush any pending
delta first, then delegate immediately.
Use this when you have many inc/dec calls per second and can tolerate a
small consistency lag.
use distkit::icounter::{
InstanceAwareCounterTrait,
LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
};
use distkit::DistkitRedisKey;
use std::time::Duration;
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
prefix,
connection_manager: conn,
dead_instance_threshold_ms: 30_000,
flush_interval: Duration::from_millis(20),
allowed_lag: Duration::from_millis(20),
});
let key = DistkitRedisKey::try_from("connections".to_string())?;
// Returns the local estimate immediately — no Redis round-trip on warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;
// Decrement also stays local until flushed.
let (local_total, mine) = counter.dec(&key, 1).await?;
// get() also returns the local estimate (cumulative + pending delta).
let (total, mine) = counter.get(&key).await?;Enable the trypema feature to access sliding-window rate limiting.
Trypema documentation website: https://trypema.davidoyinbo.com
[dependencies]
distkit = { version = "0.4", features = ["trypema"] }All public types from the trypema crate are
re-exported under distkit::trypema. The module provides:
- Sliding-window rate limiting with configurable window size and rate.
- Three providers -- local (in-process), Redis-backed (distributed), and hybrid (local fast-path with periodic Redis sync).
- Two strategies -- absolute (binary allow/reject) and suppressed (probabilistic degradation that smoothly ramps rejection probability).
use std::sync::Arc;
use distkit::trypema::{
HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
local::LocalRateLimiterOptions,
};
let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
local: LocalRateLimiterOptions {
window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
hard_limit_factor: HardLimitFactor::default(),
suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
},
}));
rl.run_cleanup_loop();
let rate = RateLimit::try_from(10.0).unwrap(); // 10 requests per second
match rl.local().absolute().inc("user_123", &rate, 1) {
RateLimitDecision::Allowed => { /* process request */ }
RateLimitDecision::Rejected { retry_after_ms, .. } => {
eprintln!("Rate limited, retry in {retry_after_ms} ms");
}
_ => {}
}For distributed enforcement across multiple processes or servers:
use std::sync::Arc;
use distkit::trypema::{
HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
local::LocalRateLimiterOptions,
redis::{RedisKey, RedisRateLimiterOptions},
hybrid::SyncIntervalMs,
};
let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let window = WindowSizeSeconds::try_from(60)?;
let bucket = RateGroupSizeMs::try_from(100)?;
let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
local: LocalRateLimiterOptions {
window_size_seconds: window,
rate_group_size_ms: bucket,
hard_limit_factor: HardLimitFactor::default(),
suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
},
redis: RedisRateLimiterOptions {
connection_manager: conn,
prefix: None,
window_size_seconds: window,
rate_group_size_ms: bucket,
hard_limit_factor: HardLimitFactor::default(),
suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
sync_interval_ms: SyncIntervalMs::default(),
},
}));
rl.run_cleanup_loop();
let key = RedisKey::try_from("user_123".to_string())?;
let rate = RateLimit::try_from(50.0)?;
// Distributed absolute enforcement
let decision = rl.redis().absolute().inc(&key, &rate, 1).await?;
// Or use the hybrid provider for local fast-path with Redis sync
let decision = rl.hybrid().absolute().inc(&key, &rate, 1).await?;See the trypema documentation for full API details and advanced configuration.
- Rust (latest stable)
- Docker (for the test Redis instance)
make test # Start Redis, run tests, tear down
make bench # Start Redis, run criterion benchmarks, tear down
make redis-up # Start Redis on port 16379
make redis-down # Stop Redis and remove volumesTests and benchmarks require the REDIS_URL environment variable.
The make targets set this automatically.
MIT