Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ test-threads = 15
# other, while still allowing unrelated tests to use the remaining default
# concurrency budget.
stateful-heavy = { max-threads = 1 }
proxied-reconnect = { max-threads = 2 }

[[profile.default.overrides]]
filter = 'test(test_scenario_08_subscription_reconnect)'
Expand Down Expand Up @@ -85,7 +86,7 @@ test-group = "stateful-heavy"

[[profile.default.overrides]]
filter = 'test(test_large_initial_snapshot_survives_repeated_outages)'
test-group = "stateful-heavy"
test-group = "proxied-reconnect"

[[profile.default.overrides]]
filter = 'test(test_latency_spike_during_initial_snapshot_recovers)'
Expand All @@ -97,7 +98,7 @@ test-group = "stateful-heavy"

[[profile.default.overrides]]
filter = 'test(test_loading_snapshot_with_live_writes_resumes_without_duplicate_rows)'
test-group = "stateful-heavy"
test-group = "proxied-reconnect"

[[profile.default.overrides]]
filter = 'test(test_proxy_three_subscriptions_resume_after_server_bounce)'
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/orm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,7 @@ jobs:
- name: Run integration tests
working-directory: link/sdks/typescript/orm
env:
KALAMDB_TEST_URL: http://localhost:8088
KALAMDB_TEST_USER: admin
KALAMDB_TEST_PASSWORD: testpass123
run: node --test tests/driver.test.mjs tests/generate.test.mjs tests/cli.test.mjs tests/live.test.mjs
15 changes: 14 additions & 1 deletion .github/workflows/sdks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ jobs:
chmod +x ./kalamdb-server
echo "SDK test source: release-binary"

- name: Run TypeScript SDK tests
- name: Run TypeScript client, consumer, and ORM npm package tests
id: run_typescript_tests
continue-on-error: true
shell: bash
Expand All @@ -207,6 +207,19 @@ jobs:
set -euo pipefail
./scripts/test-typescript-sdk-release.sh

- name: Verify TypeScript npm package test coverage
if: always()
shell: bash
run: |
set -euo pipefail
output="ts-sdk-test-output.txt"
for package in client consumer orm; do
if ! grep -q "Running @kalamdb/${package} tests" "$output"; then
echo "Missing @kalamdb/${package} npm package test run in ${output}" >&2
exit 1
fi
done

- name: Parse TypeScript SDK test counts
if: always()
id: parse_typescript_badge
Expand Down
13 changes: 11 additions & 2 deletions backend/crates/kalamdb-api/src/ui/embedded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
//! 1. Run `npm run build` in the `ui/` directory
//! 2. Rebuild the server with `cargo build`

use std::borrow::Cow;

use actix_web::{web, HttpRequest, HttpResponse};
use log::debug;
use rust_embed::Embed;
Expand All @@ -22,6 +24,13 @@ use super::UiRuntimeConfig;
#[exclude = "kalam_client_bg.wasm"]
struct UiAssets;

fn embedded_asset_body(data: Cow<'static, [u8]>) -> web::Bytes {
match data {
Cow::Borrowed(bytes) => web::Bytes::from_static(bytes),
Cow::Owned(bytes) => web::Bytes::from(bytes),
}
}

/// Serve embedded UI assets
///
/// Handles requests to /ui/* and serves the appropriate static file.
Expand All @@ -43,7 +52,7 @@ pub async fn serve_embedded_ui(req: HttpRequest) -> HttpResponse {

debug!("[embedded_ui] Found file: {} (mime: {})", path, mime_type);

return HttpResponse::Ok().content_type(mime_type).body(content.data.into_owned());
return HttpResponse::Ok().content_type(mime_type).body(embedded_asset_body(content.data));
}

debug!("[embedded_ui] File not found: {}, falling back to index.html", path);
Expand All @@ -53,7 +62,7 @@ pub async fn serve_embedded_ui(req: HttpRequest) -> HttpResponse {
if let Some(index) = UiAssets::get("index.html") {
return HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body(index.data.into_owned());
.body(embedded_asset_body(index.data));
}

// No UI built - show helpful message
Expand Down
2 changes: 0 additions & 2 deletions backend/crates/kalamdb-api/src/ws/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,12 @@ pub(super) async fn run_websocket(
msg = msg_stream.next() => {
match msg {
Some(Ok(Message::Ping(bytes))) => {
record_activity_now();
connection_state.update_heartbeat();
if session.pong(&bytes).await.is_err() {
break;
}
}
Some(Ok(Message::Pong(_))) => {
record_activity_now();
connection_state.update_heartbeat();
}
Some(Ok(Message::Text(text))) => {
Expand Down
185 changes: 120 additions & 65 deletions backend/crates/kalamdb-commons/benches/snowflake_generator.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
hint::black_box,
sync::{atomic::{AtomicU64, Ordering}, Arc},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use criterion::{
criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use kalamdb_commons::ids::SnowflakeGenerator;
use parking_lot::Mutex;

Expand Down Expand Up @@ -160,13 +161,17 @@ fn bench_batch_generation(c: &mut Criterion) {
for batch_size in [32_usize, 256, 1024] {
group.throughput(Throughput::Elements(batch_size as u64));

group.bench_with_input(BenchmarkId::new("optimized", batch_size), &batch_size, |b, &size| {
b.iter_batched(
|| SnowflakeGenerator::new(1),
|generator| black_box(generator.next_ids(size).expect("optimized next_ids")),
BatchSize::SmallInput,
);
});
group.bench_with_input(
BenchmarkId::new("optimized", batch_size),
&batch_size,
|b, &size| {
b.iter_batched(
|| SnowflakeGenerator::new(1),
|generator| black_box(generator.next_ids(size).expect("optimized next_ids")),
BatchSize::SmallInput,
);
},
);

group.bench_with_input(BenchmarkId::new("legacy", batch_size), &batch_size, |b, &size| {
b.iter_batched(
Expand All @@ -180,78 +185,128 @@ fn bench_batch_generation(c: &mut Criterion) {
group.finish();
}

fn bench_mapped_batch_generation(c: &mut Criterion) {
let mut group = c.benchmark_group("snowflake_mapped_batch_generation");

for batch_size in [256_usize, 1024] {
group.throughput(Throughput::Elements(batch_size as u64));

group.bench_with_input(
BenchmarkId::new("optimized_mapped", batch_size),
&batch_size,
|b, &size| {
b.iter_batched(
|| SnowflakeGenerator::new(1),
|generator| {
black_box(
generator
.next_ids_mapped(size, |id| id.wrapping_mul(31))
.expect("optimized next_ids_mapped"),
)
},
BatchSize::SmallInput,
);
},
);

group.bench_with_input(
BenchmarkId::new("legacy_then_map", batch_size),
&batch_size,
|b, &size| {
b.iter_batched(
|| LegacySnowflakeGenerator::new(1),
|generator| {
let ids = generator.next_ids(size).expect("legacy next_ids");
black_box(ids.into_iter().map(|id| id.wrapping_mul(31)).collect::<Vec<_>>())
},
BatchSize::SmallInput,
);
},
);
}

group.finish();
}

fn bench_concurrent_single_generation(c: &mut Criterion) {
let mut group = c.benchmark_group("snowflake_concurrent_single_generation");
let thread_count = 8;
let ids_per_thread = 1_000;
group.throughput(Throughput::Elements((thread_count * ids_per_thread) as u64));

group.bench_function(BenchmarkId::new("optimized", format!("{}x{}", thread_count, ids_per_thread)), |b| {
b.iter(|| {
let generator = Arc::new(SnowflakeGenerator::new(1));
let duplicates = Arc::new(AtomicU64::new(0));
thread::scope(|scope| {
let mut handles = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
let generator = Arc::clone(&generator);
let duplicates = Arc::clone(&duplicates);
handles.push(scope.spawn(move || {
let mut prev = None;
for _ in 0..ids_per_thread {
let next = generator.next_id().expect("optimized concurrent next_id");
if let Some(prev) = prev {
if next <= prev {
duplicates.fetch_add(1, Ordering::Relaxed);
group.bench_function(
BenchmarkId::new("optimized", format!("{}x{}", thread_count, ids_per_thread)),
|b| {
b.iter(|| {
let generator = Arc::new(SnowflakeGenerator::new(1));
let duplicates = Arc::new(AtomicU64::new(0));
thread::scope(|scope| {
let mut handles = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
let generator = Arc::clone(&generator);
let duplicates = Arc::clone(&duplicates);
handles.push(scope.spawn(move || {
let mut prev = None;
for _ in 0..ids_per_thread {
let next =
generator.next_id().expect("optimized concurrent next_id");
if let Some(prev) = prev {
if next <= prev {
duplicates.fetch_add(1, Ordering::Relaxed);
}
}
prev = Some(next);
}
prev = Some(next);
}
}));
}
for handle in handles {
handle.join().expect("optimized thread join");
}
}));
}
for handle in handles {
handle.join().expect("optimized thread join");
}
});
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
});
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
});
});

group.bench_function(BenchmarkId::new("legacy", format!("{}x{}", thread_count, ids_per_thread)), |b| {
b.iter(|| {
let generator = Arc::new(LegacySnowflakeGenerator::new(1));
let duplicates = Arc::new(AtomicU64::new(0));
thread::scope(|scope| {
let mut handles = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
let generator = Arc::clone(&generator);
let duplicates = Arc::clone(&duplicates);
handles.push(scope.spawn(move || {
let mut prev = None;
for _ in 0..ids_per_thread {
let next = generator.next_id().expect("legacy concurrent next_id");
if let Some(prev) = prev {
if next <= prev {
duplicates.fetch_add(1, Ordering::Relaxed);
},
);

group.bench_function(
BenchmarkId::new("legacy", format!("{}x{}", thread_count, ids_per_thread)),
|b| {
b.iter(|| {
let generator = Arc::new(LegacySnowflakeGenerator::new(1));
let duplicates = Arc::new(AtomicU64::new(0));
thread::scope(|scope| {
let mut handles = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
let generator = Arc::clone(&generator);
let duplicates = Arc::clone(&duplicates);
handles.push(scope.spawn(move || {
let mut prev = None;
for _ in 0..ids_per_thread {
let next = generator.next_id().expect("legacy concurrent next_id");
if let Some(prev) = prev {
if next <= prev {
duplicates.fetch_add(1, Ordering::Relaxed);
}
}
prev = Some(next);
}
prev = Some(next);
}
}));
}
for handle in handles {
handle.join().expect("legacy thread join");
}
}));
}
for handle in handles {
handle.join().expect("legacy thread join");
}
});
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
});
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
});
});
},
);

group.finish();
}

criterion_group!(
name = benches;
config = Criterion::default().sample_size(20).warm_up_time(Duration::from_millis(500));
targets = bench_single_generation, bench_batch_generation, bench_concurrent_single_generation
targets = bench_single_generation, bench_batch_generation, bench_mapped_batch_generation, bench_concurrent_single_generation
);
criterion_main!(benches);
Loading
Loading