Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 25 additions & 32 deletions src/production/connection_optimized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub struct OptimizedConnectionHandler<S> {
transaction_errors: bool,
/// Watched keys with their values at WATCH time (for optimistic locking)
watched_keys: Vec<(String, RespValue)>,
/// Timestamp for the current read batch — amortizes clock_gettime syscalls
/// across all commands in a pipeline batch instead of 2 syscalls per command.
batch_start: Instant,
}

impl<S> OptimizedConnectionHandler<S>
Expand Down Expand Up @@ -168,6 +171,7 @@ where
transaction_queue: Vec::new(),
transaction_errors: false,
watched_keys: Vec::new(),
batch_start: Instant::now(),
}
}

Expand All @@ -191,6 +195,8 @@ where
break;
}
Ok(n) => {
self.batch_start = Instant::now();

if self.buffer.len() + n > self.config.max_buffer_size {
error!(
"Buffer overflow from {}, closing connection",
Expand Down Expand Up @@ -220,9 +226,8 @@ where

if get_count >= batch_threshold {
// Batch execute multiple GETs concurrently
let start = Instant::now();
let results = self.state.fast_batch_get_pipeline(get_keys).await;
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0;

for response in &results {
let success = !matches!(response, RespValue::Error(_));
Expand All @@ -242,10 +247,9 @@ where

if set_count >= batch_threshold {
// Batch execute multiple SETs concurrently
let start = Instant::now();
let results =
self.state.fast_batch_set_pipeline(set_pairs).await;
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0;

for response in &results {
let success = !matches!(response, RespValue::Error(_));
Expand Down Expand Up @@ -337,7 +341,6 @@ where
Ok(Some(resp_value)) => match Command::from_resp_zero_copy(&resp_value) {
Ok(cmd) => {
let cmd_name = cmd.name();
let start = Instant::now();

// Handle connection-level transaction state
let response = if self.in_transaction {
Expand Down Expand Up @@ -535,7 +538,7 @@ where
}
};

let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0;
let success = !matches!(&response, RespValue::Error(_));
self.metrics.record_command(cmd_name, duration_ms, success);

Expand Down Expand Up @@ -1156,12 +1159,10 @@ where
break; // Need more data
}

// Extract key
let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_start + key_len]);
// Zero-copy: split consumed bytes from buffer, freeze to Bytes, then slice the key
let consumed = self.buffer.split_to(total_needed).freeze();
let key = consumed.slice(key_start..key_start + key_len);
keys.push(key);

// Consume this GET from buffer
let _ = self.buffer.split_to(total_needed);
}

let count = keys.len();
Expand Down Expand Up @@ -1242,13 +1243,11 @@ where
break; // Need more data
}

// Extract key and value
let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_end]);
let value = bytes::Bytes::copy_from_slice(&buf[val_start..val_start + val_len]);
// Zero-copy: split consumed bytes, freeze, then slice key and value
let consumed = self.buffer.split_to(total_needed).freeze();
let key = consumed.slice(key_start..key_end);
let value = consumed.slice(val_start..val_start + val_len);
pairs.push((key, value));

// Consume this SET from buffer
let _ = self.buffer.split_to(total_needed);
}

let count = pairs.len();
Expand Down Expand Up @@ -1320,16 +1319,13 @@ where
return FastPathResult::NeedMoreData;
}

// Extract key as bytes::Bytes (zero-copy from buffer)
let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_start + key_len]);

// Consume the parsed bytes from buffer
let _ = self.buffer.split_to(total_needed);
// Zero-copy: split consumed bytes, freeze, then slice key
let consumed = self.buffer.split_to(total_needed).freeze();
let key = consumed.slice(key_start..key_start + key_len);

// Execute fast GET using pooled response slot (avoids oneshot allocation)
let start = Instant::now();
let response = self.state.pooled_fast_get(key).await;
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0;
let success = !matches!(&response, RespValue::Error(_));
self.metrics.record_command("GET", duration_ms, success);

Expand Down Expand Up @@ -1398,17 +1394,14 @@ where
return FastPathResult::NeedMoreData;
}

// Extract key and value as bytes::Bytes
let key = bytes::Bytes::copy_from_slice(&buf[key_start..key_end]);
let value = bytes::Bytes::copy_from_slice(&buf[val_start..val_start + val_len]);

// Consume the parsed bytes
let _ = self.buffer.split_to(total_needed);
// Zero-copy: split consumed bytes, freeze, then slice key and value
let consumed = self.buffer.split_to(total_needed).freeze();
let key = consumed.slice(key_start..key_end);
let value = consumed.slice(val_start..val_start + val_len);

// Execute fast SET using pooled response slot (avoids oneshot allocation)
let start = Instant::now();
let response = self.state.pooled_fast_set(key, value).await;
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
let duration_ms = self.batch_start.elapsed().as_secs_f64() * 1000.0;
let success = !matches!(&response, RespValue::Error(_));
self.metrics.record_command("SET", duration_ms, success);

Expand Down
32 changes: 32 additions & 0 deletions src/production/perf_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub struct PerformanceConfig {
/// Batching configuration
#[serde(default)]
pub batching: BatchingConfig,

/// Connection pool configuration
#[serde(default)]
pub connection_pool: ConnectionPoolConfig,
}

/// Response pool parameters for reducing channel allocation overhead
Expand Down Expand Up @@ -63,6 +67,18 @@ pub struct BatchingConfig {
pub batch_threshold: usize,
}

/// Connection pool parameters.
///
/// Deserialized from the performance config file; each field falls back to its
/// `default_*` function (via `#[serde(default = "...")]`) when the key is absent,
/// so a partial config section is valid. Consumed by `ConnectionPool::new` in
/// the optimized server setup.
#[derive(Debug, Clone, Deserialize)]
pub struct ConnectionPoolConfig {
    /// Maximum concurrent connections (default: 10000)
    #[serde(default = "default_max_connections")]
    pub max_connections: usize,

    /// Number of pre-allocated I/O buffers in the pool (default: 64)
    #[serde(default = "default_buffer_pool_size")]
    pub buffer_pool_size: usize,
}

// Default value functions for serde
fn default_num_shards() -> usize {
16
Expand All @@ -85,6 +101,12 @@ fn default_min_pipeline_buffer() -> usize {
/// Serde fallback: minimum same-type command count before pipeline batching engages.
fn default_batch_threshold() -> usize {
    2
}
/// Serde fallback: upper bound on simultaneously open client connections.
fn default_max_connections() -> usize {
    10_000
}
/// Serde fallback: number of I/O buffers pre-allocated in the connection pool.
fn default_buffer_pool_size() -> usize {
    64
}

impl Default for PerformanceConfig {
fn default() -> Self {
Expand All @@ -93,6 +115,16 @@ impl Default for PerformanceConfig {
response_pool: ResponsePoolConfig::default(),
buffers: BufferConfig::default(),
batching: BatchingConfig::default(),
connection_pool: ConnectionPoolConfig::default(),
}
}
}

impl Default for ConnectionPoolConfig {
    /// Programmatic default, kept in lockstep with the serde field defaults so a
    /// missing config section and an absent individual key yield the same values.
    fn default() -> Self {
        let max_connections = default_max_connections();
        let buffer_pool_size = default_buffer_pool_size();
        Self {
            max_connections,
            buffer_pool_size,
        }
    }
}
Expand Down
9 changes: 7 additions & 2 deletions src/production/server_optimized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ impl OptimizedRedisServer {
}

info!(
"Performance config: shards={}, pool_capacity={}, pool_prewarm={}, read_buffer={}, min_pipeline={}",
"Performance config: shards={}, pool_capacity={}, pool_prewarm={}, read_buffer={}, min_pipeline={}, max_conns={}, buffer_pool={}",
perf_config.num_shards,
perf_config.response_pool.capacity,
perf_config.response_pool.prewarm,
perf_config.buffers.read_size,
perf_config.batching.min_pipeline_buffer,
perf_config.connection_pool.max_connections,
perf_config.connection_pool.buffer_pool_size,
);

// Load security configuration
Expand Down Expand Up @@ -78,7 +80,10 @@ impl OptimizedRedisServer {
let acl_manager = Arc::new(RwLock::new(acl_manager));

let state = ShardedActorState::with_perf_config(&perf_config);
let connection_pool = Arc::new(ConnectionPool::new(10000, 512));
let connection_pool = Arc::new(ConnectionPool::new(
perf_config.connection_pool.max_connections,
perf_config.connection_pool.buffer_pool_size,
));

// Create connection config from performance config
let conn_config =
Expand Down
148 changes: 80 additions & 68 deletions src/production/sharded_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,78 +183,90 @@ impl ShardActor {

async fn run(mut self) {
while let Some(msg) = self.rx.recv().await {
match msg {
ShardMessage::Command {
cmd,
virtual_time,
response_tx,
} => {
self.executor.set_time(virtual_time);
let response = self.executor.execute(&cmd);
let _ = response_tx.send(response);
}
ShardMessage::BatchCommand { cmd, virtual_time } => {
// Fire-and-forget: execute without sending response
self.executor.set_time(virtual_time);
let _ = self.executor.execute(&cmd);
}
ShardMessage::EvictExpired {
virtual_time,
response_tx,
} => {
let evicted = self.executor.evict_expired_direct(virtual_time);
let _ = response_tx.send(evicted);
}
ShardMessage::FastGet { key, response_tx } => {
// Fast path: direct GET without Command enum overhead
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.get_direct(key_str);
let _ = response_tx.send(response);
}
ShardMessage::FastSet {
key,
value,
response_tx,
} => {
// Fast path: direct SET without Command enum overhead
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.set_direct(key_str, &value);
let _ = response_tx.send(response);
}
ShardMessage::FastBatchGet { keys, response_tx } => {
// Batch GET: process multiple keys in single message
let mut results = Vec::with_capacity(keys.len());
for key in keys {
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
results.push(self.executor.get_direct(key_str));
}
let _ = response_tx.send(results);
}
ShardMessage::FastBatchSet { pairs, response_tx } => {
// Batch SET: process multiple key-value pairs in single message
let mut results = Vec::with_capacity(pairs.len());
for (key, value) in pairs {
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
results.push(self.executor.set_direct(key_str, &value));
}
let _ = response_tx.send(results);
}
ShardMessage::PooledFastGet { key, response_slot } => {
// Pooled fast GET: uses response slot instead of oneshot
self.process_message(msg);
// FoundationDB actor loop pattern: drain all pending messages without
// yielding back to the tokio scheduler. This reduces context switches
// (24% of CPU in profiling) by batching work within a single scheduler turn.
while let Ok(msg) = self.rx.try_recv() {
self.process_message(msg);
}
}
}

/// Process a single shard message. Extracted to support the try_recv drain pattern.
#[inline]
fn process_message(&mut self, msg: ShardMessage) {
match msg {
ShardMessage::Command {
cmd,
virtual_time,
response_tx,
} => {
self.executor.set_time(virtual_time);
let response = self.executor.execute(&cmd);
let _ = response_tx.send(response);
}
ShardMessage::BatchCommand { cmd, virtual_time } => {
// Fire-and-forget: execute without sending response
self.executor.set_time(virtual_time);
let _ = self.executor.execute(&cmd);
}
ShardMessage::EvictExpired {
virtual_time,
response_tx,
} => {
let evicted = self.executor.evict_expired_direct(virtual_time);
let _ = response_tx.send(evicted);
}
ShardMessage::FastGet { key, response_tx } => {
// Fast path: direct GET without Command enum overhead
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.get_direct(key_str);
let _ = response_tx.send(response);
}
ShardMessage::FastSet {
key,
value,
response_tx,
} => {
// Fast path: direct SET without Command enum overhead
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.set_direct(key_str, &value);
let _ = response_tx.send(response);
}
ShardMessage::FastBatchGet { keys, response_tx } => {
// Batch GET: process multiple keys in single message
let mut results = Vec::with_capacity(keys.len());
for key in keys {
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.get_direct(key_str);
response_slot.send(response);
results.push(self.executor.get_direct(key_str));
}
ShardMessage::PooledFastSet {
key,
value,
response_slot,
} => {
// Pooled fast SET: uses response slot instead of oneshot
let _ = response_tx.send(results);
}
ShardMessage::FastBatchSet { pairs, response_tx } => {
// Batch SET: process multiple key-value pairs in single message
let mut results = Vec::with_capacity(pairs.len());
for (key, value) in pairs {
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.set_direct(key_str, &value);
response_slot.send(response);
results.push(self.executor.set_direct(key_str, &value));
}
let _ = response_tx.send(results);
}
ShardMessage::PooledFastGet { key, response_slot } => {
// Pooled fast GET: uses response slot instead of oneshot
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.get_direct(key_str);
response_slot.send(response);
}
ShardMessage::PooledFastSet {
key,
value,
response_slot,
} => {
// Pooled fast SET: uses response slot instead of oneshot
let key_str = unsafe { std::str::from_utf8_unchecked(&key) };
let response = self.executor.set_direct(key_str, &value);
response_slot.send(response);
}
}
}
Expand Down
Loading
Loading