Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/lance-namespace-datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

#![recursion_limit = "256"]

use std::sync::Arc;

use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray};
Expand Down
73 changes: 73 additions & 0 deletions rust/lance-namespace-impls/BENCHMARK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# `__manifest` commit benchmark

Measures how fast the copy-on-write directory catalog commits `__manifest` mutations as
the manifest scales, with the inline scalar indices on or off.

The catalog commits every mutation by rewriting the whole `__manifest` (copy-on-write)
and atomically writing a new manifest version. This benchmark characterises:

- **Continuous commit** — a single process commits `N` times into a manifest already
holding `rows` entries (per-commit latency + throughput).
- **Concurrent commit** — `C` processes commit continuously for a fixed duration against
a manifest of `rows` entries (steady, contended TPS).

## Binary: `examples/manifest_bench.rs`

```
manifest_bench seed-large --root <uri> --count <rows> --inline-optimization <true|false> \
[--storage-option aws_region=us-east-1]
manifest_bench run --root <uri> --operation write-create-namespace \
--concurrency 1 --operations 100 --initial-entries <rows> --inline-optimization <bool> # continuous
manifest_bench run --root <uri> --operation write-create-namespace \
--concurrency 50 --duration-secs 30 --initial-entries <rows> --inline-optimization <bool> # concurrent
```

- `seed-large` bootstraps a manifest to `count` rows by writing the Lance dataset
directly (O(rows) once) and then triggering one CoW rewrite so the on-disk state
matches the steady catalog form (single fragment; inline indices when enabled).
- `run` spawns `--concurrency` worker subprocesses. With `--operations` it runs a fixed
commit budget (continuous); with `--duration-secs` each worker commits until the
deadline (steady TPS). It prints one JSON `BenchResult` per concurrency level with
throughput and p50/p90/p99 latency.
- The committed operation (`--operation`) defaults to `write-create-namespace`, the
cheapest pure-`__manifest` mutation (no table data). `write-create-table` /
`write-declare-table` are also available.

S3 requires the default `dir-aws` feature (on by default) and AWS credentials in the
environment; pass `--storage-option aws_region=<region>`.

## Sweep panel: `benches/manifest_commit_sweep.sh`

Runs the full panel — sizes × {inline index, no index} × {continuous, concurrent×C} —
with per-run S3-copy isolation (each run starts at exactly the bootstrapped size),
JSONL results, a `summary.csv`, and resume support.

```bash
cargo build --release --example manifest_bench -p lance-namespace-impls
S3_BASE=s3://<bucket>/manifest-cow-bench/$(date -u +%Y%m%dT%H%M%SZ) \
rust/lance-namespace-impls/benches/manifest_commit_sweep.sh
```

Default panel (override via env): `SIZES="1000 2000 5000 10000 20000 50000 100000 200000
500000 1000000"`, `CONCURRENCY="10 20 50 100 120 150 200"`, `INLINE_VARIANTS="true false"`,
`CONT_OPS=100`, `CONC_DURATION_SECS=30`. Results land in `$OUT_DIR` (default
`~/manifest_cow_bench_<RUN_ID>`).

## Representative results

EC2 `c7i.48xlarge`, S3 `us-east-1`, op `write-create-namespace`. The catalog is a
single-writer-throughput system: per-commit cost scales ~O(rows) and throughput does **not**
scale with concurrency (every commit is a serialized `__manifest` version bump).

Continuous (1 process, 100 commits), ops/s — inline index vs no index:

| rows | inline | no index |
|---:|---:|---:|
| 1,000 | 2.0 | 3.5 |
| 100,000 | 1.1 | 2.1 |
| 1,000,000 | 0.34 | 0.53 |

Concurrent steady TPS is flat across C=10..200 (e.g. inline @100k ≈ 1.4–1.5 ops/s at every C;
@1M ≈ 0.3 ops/s). Conflicts that exceed the retry budget surface as errors and grow with C
(≈0 at C≤20, climbing at C≥100) — the contention ceiling, not data loss. No-index commits run
~1.5–2× faster (no per-commit index build) at the cost of unindexed reads.
9 changes: 9 additions & 0 deletions rust/lance-namespace-impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ object_store = { workspace = true }
arrow = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-physical-plan = { workspace = true }

# REST adapter implementation dependencies (optional, enabled by "rest-adapter" feature)
axum = { workspace = true, optional = true }
Expand All @@ -66,6 +68,8 @@ serde_json = { workspace = true }
futures.workspace = true
log.workspace = true
rand.workspace = true
roaring.workspace = true
uuid.workspace = true

# Shared credential vending dependencies
sha2 = { version = "0.10", optional = true }
Expand Down Expand Up @@ -96,6 +100,11 @@ rstest.workspace = true
lance-table.workspace = true
lance-arrow = { workspace = true }
lance = { workspace = true }
serde = { workspace = true, features = ["derive"] }

[[example]]
name = "manifest_bench"
path = "examples/manifest_bench.rs"

[lints]
workspace = true
146 changes: 146 additions & 0 deletions rust/lance-namespace-impls/benches/manifest_commit_sweep.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#!/usr/bin/env bash
# Copy-on-write __manifest commit benchmark sweep panel.
#
# Drives `cargo run --release --example manifest_bench` across a panel of:
# - bootstrap manifest sizes (rows already in __manifest)
# - inline scalar indices on vs off
# - continuous commit (single process, N commits) and
# concurrent commit (C processes, steady TPS over a fixed duration)
#
# Each run is isolated: a "golden" manifest is bootstrapped once per (size, index)
# and server-side-copied to a fresh S3 prefix per run, so every run starts at exactly
# the bootstrapped size. Results are written as JSONL (one BenchResult per line) and
# summarised to CSV. The sweep is resumable: completed runs are skipped.
#
# Usage:
# S3_BASE=s3://jack-devland-build/manifest-cow-bench/$(date -u +%Y%m%dT%H%M%SZ) \
# ./manifest_commit_sweep.sh
#
# Env knobs (defaults match the requested panel):
# SIZES, CONCURRENCY, INLINE_VARIANTS, CONT_OPS, CONC_DURATION_SECS,
# AWS_REGION, OUT_DIR, BIN
#
# Resilient by design: a single failed run is logged and skipped rather than aborting
# the sweep, and re-running fills the gaps (completed runs are detected and skipped).
set -uo pipefail

RUN_ID="${RUN_ID:-$(date -u +%Y%m%dT%H%M%SZ)}"
S3_BASE="${S3_BASE:?set S3_BASE, e.g. s3://jack-devland-build/manifest-cow-bench/$RUN_ID}"
AWS_REGION="${AWS_REGION:-us-east-1}"
export AWS_REGION AWS_DEFAULT_REGION="$AWS_REGION"

REPO_ROOT="${REPO_ROOT:-$HOME/oss/lance}"
BIN="${BIN:-$REPO_ROOT/target/release/examples/manifest_bench}"
OUT_DIR="${OUT_DIR:-$HOME/manifest_cow_bench_${RUN_ID}}"
RESULTS="$OUT_DIR/results.jsonl"
PROGRESS="$OUT_DIR/progress.log"
mkdir -p "$OUT_DIR"

SIZES=(${SIZES:-1000 2000 5000 10000 20000 50000 100000 200000 500000 1000000})
CONCURRENCY=(${CONCURRENCY:-10 20 50 100 120 150 200})
INLINE_VARIANTS=(${INLINE_VARIANTS:-true false})
CONT_OPS="${CONT_OPS:-100}"
CONC_DURATION_SECS="${CONC_DURATION_SECS:-30}"
STORAGE_OPT=(--storage-option "aws_region=${AWS_REGION}")

log() { printf '%s %s\n' "$(date -u +%H:%M:%S)" "$*" | tee -a "$PROGRESS"; }

# Skip a run if its tag already appears in results.jsonl (resume support).
done_already() { grep -q "\"bench_tag\":\"$1\"" "$RESULTS" 2>/dev/null; }

# Append a result line, tagging it so reruns can resume and we can pivot later.
record() {
local tag="$1"; shift
# shellcheck disable=SC2016
python3 -c 'import json,sys; d=json.load(sys.stdin); d["bench_tag"]=sys.argv[1]; print(json.dumps(d))' \
"$tag" >> "$RESULTS"
}

s3_copy() { aws s3 cp --recursive --quiet "$1" "$2" --region "$AWS_REGION"; }
s3_rm() { aws s3 rm --recursive --quiet "$1" --region "$AWS_REGION" || true; }

# Backstops for unattended runs: cap any single run and clear leaked worker processes
# (a killed coordinator can orphan its worker children) before the next run.
RUN_TIMEOUT="${RUN_TIMEOUT:-1200}"
clear_stragglers() { pkill -f 'examples/manifest_bench worker' 2>/dev/null || true; sleep 1; }

for inline in "${INLINE_VARIANTS[@]}"; do
for rows in "${SIZES[@]}"; do
golden="${S3_BASE}/golden/inline_${inline}_rows_${rows}"
boot_tag="boot_inline_${inline}_rows_${rows}"

if ! done_already "$boot_tag"; then
log "BOOTSTRAP inline=$inline rows=$rows -> $golden"
s3_rm "$golden"
if "$BIN" seed-large --root "$golden" --count "$rows" \
--inline-optimization "$inline" "${STORAGE_OPT[@]}"; then
echo "{\"bench_tag\":\"$boot_tag\"}" >> "$RESULTS"
else
log "BOOTSTRAP FAILED inline=$inline rows=$rows (skipping this size)"
continue
fi
else
log "skip bootstrap $boot_tag (done)"
fi

# ---- Continuous: single process, CONT_OPS commits ----
cont_tag="cont_inline_${inline}_rows_${rows}"
if ! done_already "$cont_tag"; then
run_prefix="${S3_BASE}/run/${cont_tag}"
log "CONTINUOUS inline=$inline rows=$rows ops=$CONT_OPS"
clear_stragglers
s3_copy "$golden" "$run_prefix"
timeout "$RUN_TIMEOUT" "$BIN" run --root "$run_prefix" --operation write-create-namespace \
--concurrency 1 --operations "$CONT_OPS" --initial-entries "$rows" \
--inline-optimization "$inline" "${STORAGE_OPT[@]}" \
2>>"$PROGRESS" | while read -r line; do record "$cont_tag" <<<"$line"; done
s3_rm "$run_prefix"
else
log "skip continuous $cont_tag (done)"
fi

# ---- Concurrent: C processes, steady TPS over CONC_DURATION_SECS ----
for c in "${CONCURRENCY[@]}"; do
conc_tag="conc_inline_${inline}_rows_${rows}_c_${c}"
if done_already "$conc_tag"; then log "skip concurrent $conc_tag (done)"; continue; fi
run_prefix="${S3_BASE}/run/${conc_tag}"
log "CONCURRENT inline=$inline rows=$rows c=$c dur=${CONC_DURATION_SECS}s"
clear_stragglers
s3_copy "$golden" "$run_prefix"
timeout "$RUN_TIMEOUT" "$BIN" run --root "$run_prefix" --operation write-create-namespace \
--concurrency "$c" --duration-secs "$CONC_DURATION_SECS" --initial-entries "$rows" \
--inline-optimization "$inline" "${STORAGE_OPT[@]}" \
2>>"$PROGRESS" | while read -r line; do record "$conc_tag" <<<"$line"; done
s3_rm "$run_prefix"
done
done
done

# ---- Summarise to CSV ----
CSV="$OUT_DIR/summary.csv"
python3 - "$RESULTS" "$CSV" <<'PY'
import json, sys, csv
rows = []
with open(sys.argv[1]) as f:
for line in f:
d = json.loads(line)
if "throughput_ops_per_sec" not in d:
continue # bootstrap marker
mode = "continuous" if d["duration_secs"] == 0 else "concurrent"
rows.append({
"mode": mode, "variant": d["variant"], "initial_entries": d["initial_entries"],
"concurrency": d["concurrency"], "duration_secs": d["duration_secs"],
"ops": d["total_operations"], "errors": d["errors"],
"tps": round(d["throughput_ops_per_sec"], 3),
"avg_ms": round(d["avg_latency_ms"], 2), "p50_ms": round(d["p50_latency_ms"], 2),
"p90_ms": round(d["p90_latency_ms"], 2), "p99_ms": round(d["p99_latency_ms"], 2),
})
rows.sort(key=lambda r: (r["mode"], r["variant"], r["initial_entries"], r["concurrency"]))
with open(sys.argv[2], "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=list(rows[0].keys()) if rows else [])
w.writeheader(); w.writerows(rows)
print(f"wrote {len(rows)} rows to {sys.argv[2]}")
PY

log "SWEEP COMPLETE. Results: $RESULTS Summary: $CSV"
s3_rm "${S3_BASE}/golden" "${S3_BASE}/run" 2>/dev/null || true
Loading
Loading