Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9d15919
feature(collator): transactional `AnchorCache`
drmick Dec 24, 2025
9bf5dea
feature(collator): transactional `CumulativeStatistics`
drmick Jan 7, 2026
50935c3
refactor(collator): refactor `AnchorCache` and tests
drmick Jan 8, 2026
4c8f85b
Revert "refactor(collator): use `Box` wrapper for `ParsedMessage`"
drmick Jan 8, 2026
27aab7d
feature(collator): transactional `MessageBuffer`
drmick Jan 8, 2026
70bab1c
feature(collator): transactional `ExternalReader`
drmick Jan 8, 2026
931ef36
feature(collator): add transactional macro
drmick Jan 15, 2026
ede13b3
fix(collator): message buffer
drmick Jan 18, 2026
7f80da3
refactor(collator): improve queue statistics
drmick Jan 20, 2026
155a314
feature(collator): rework `CumulativeStatistics`
drmick Jan 24, 2026
005cbf2
test(collator): transaction test
drmick Jan 24, 2026
0fe1254
refactor(collator): split internal states
drmick Jan 24, 2026
9ab5040
feature(collator): transactional `mq_adapter` at collation task
drmick Jan 25, 2026
776380e
refactor(collator): clippy and refactor
drmick Jan 26, 2026
ffe70a9
feature(collator): transactional `do_collate`
drmick Jan 26, 2026
28b3275
refactor(collator): clippy fix
drmick Jan 26, 2026
f5e2f5c
refactor(collator): separate transactional types
drmick Feb 4, 2026
929174a
refactor(collator): reworked `QueueStatistics` transaction
drmick Feb 26, 2026
20245cd
fix(collator): cumulative stats duplicate diff fix
drmick Mar 9, 2026
18ee6b6
fix(collator): fix refill messages buffer in transaction
drmick Mar 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ dbg_macro = "warn"
debug_assert_with_mut_call = "warn"
disallowed_methods = "warn"
doc_markdown = "warn"
empty_enums = "warn"
empty_enum = "warn"
enum_glob_use = "warn"
exit = "warn"
expl_impl_clone_on_copy = "warn"
Expand Down
11 changes: 9 additions & 2 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ harness = false
#path = "benches/messages_buffer.rs"
required-features = ["bench-helpers"]

[[bench]]
name = "queue_statistics"
harness = false
required-features = ["bench-helpers"]

[[test]]
name = "adapter_tests"
required-features = ["test"]
Expand All @@ -34,12 +39,13 @@ blake3 = { workspace = true }
bumpalo = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
dashmap = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
num-bigint = { workspace = true }
parking_lot = { workspace = true }
parking_lot = { workspace = true, features = ["send_guard"] }
rand = { workspace = true }
rayon = { workspace = true }
scc = { workspace = true }
Expand All @@ -55,6 +61,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tycho-crypto = { workspace = true }
tycho-types = { workspace = true, features = ["rand9", "blake3", "rayon"] }
tycho-util-proc = { workspace = true}
weedb = { workspace = true }

# local deps
Expand All @@ -68,13 +75,13 @@ tycho-util = { workspace = true }
tycho-vm = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tycho-block-util = { workspace = true, features = ["test"] }
tycho-core = { workspace = true, features = ["test"] }
tycho-network = { workspace = true, features = ["test"] }
tycho-storage = { workspace = true, features = ["test"] }
tycho-util = { workspace = true, features = ["test"] }
criterion = { workspace = true }

[features]
default = []
Expand Down
151 changes: 151 additions & 0 deletions collator/benches/queue_statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::hint::black_box;
use std::time::Duration;

use criterion::{BatchSize, Criterion, Throughput, criterion_group, criterion_main};
use tycho_collator::internal_queue::types::stats::{AccountStatistics, QueueStatistics};
use tycho_types::cell::HashBytes;
use tycho_types::models::{IntAddr, StdAddr};
use tycho_util::transactional::Transactional;
use tycho_util::{FastHashMap, FastHashSet};

const ACCOUNTS_COUNT: usize = 1_000_000;
const SPARSE_CHANGED_COUNT: usize = 50_000;
const HOTSET_CHANGED_COUNT: usize = 5_000;
const TX_OPS_COUNT: usize = 50_000;

fn config() -> Criterion {
Criterion::default()
.sample_size(10)
.warm_up_time(Duration::from_secs(2))
.measurement_time(Duration::from_secs(20))
}

fn make_addr(i: u32) -> IntAddr {
let mut bytes = [0u8; 32];
bytes[..4].copy_from_slice(&i.to_le_bytes());
IntAddr::Std(StdAddr::new(0, HashBytes(bytes)))
}

fn build_base_stats(accounts_count: usize) -> QueueStatistics {
let mut accounts: AccountStatistics = FastHashMap::default();
accounts.reserve(accounts_count);

for i in 0..accounts_count as u32 {
accounts.insert(make_addr(i), 1);
}

QueueStatistics::with_statistics(accounts)
}

fn build_prepared_stats_for_commit(accounts_count: usize, changed_count: usize) -> QueueStatistics {
assert!(changed_count <= accounts_count);

let mut stats = build_base_stats(accounts_count);
stats.begin();

for i in 0..changed_count as u32 {
stats.increment_for_account(make_addr(i), 1);
}

stats
}

fn build_increment_ops(unique_accounts: usize, ops_count: usize) -> Vec<IntAddr> {
assert!(unique_accounts > 0);
let mut ops = Vec::with_capacity(ops_count);
for i in 0..ops_count {
ops.push(make_addr((i % unique_accounts) as u32));
}
ops
}

fn queue_statistics_commit_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("queue_statistics_commit");
group.throughput(Throughput::Elements(ACCOUNTS_COUNT as u64));

for (name, changed_count) in [
("1m_accounts_all_staged", ACCOUNTS_COUNT),
("1m_accounts_50k_staged", SPARSE_CHANGED_COUNT),
] {
let prepared = build_prepared_stats_for_commit(ACCOUNTS_COUNT, changed_count);

group.bench_function(name, |b| {
b.iter_batched(
|| prepared.clone(),
|mut stats| {
stats.commit();
black_box(stats);
},
BatchSize::LargeInput,
);
});
}

group.finish();
}

fn dirty_accounts_allocation_benchmark(c: &mut Criterion) {
let addresses: Vec<IntAddr> = (0..SPARSE_CHANGED_COUNT as u32).map(make_addr).collect();

let mut group = c.benchmark_group("dirty_accounts_allocation");
group.throughput(Throughput::Elements(SPARSE_CHANGED_COUNT as u64));

group.bench_function("vec_50k_clone", |b| {
b.iter(|| {
let mut dirty = Vec::with_capacity(SPARSE_CHANGED_COUNT);
dirty.extend(addresses.iter().cloned());
black_box(dirty);
});
});

group.bench_function("fast_hash_set_50k_insert", |b| {
b.iter(|| {
let mut dirty: FastHashSet<IntAddr> = FastHashSet::default();
dirty.reserve(SPARSE_CHANGED_COUNT);
for addr in &addresses {
dirty.insert(addr.clone());
}
black_box(dirty);
});
});

group.finish();
}

fn queue_statistics_tx_total_benchmark(c: &mut Criterion) {
let base = build_base_stats(ACCOUNTS_COUNT);
let ops_50k_unique = build_increment_ops(SPARSE_CHANGED_COUNT, TX_OPS_COUNT);
let ops_5k_unique = build_increment_ops(HOTSET_CHANGED_COUNT, TX_OPS_COUNT);

let mut group = c.benchmark_group("queue_statistics_tx_total");
group.throughput(Throughput::Elements(TX_OPS_COUNT as u64));

for (name, ops) in [
("1m_base_50k_ops_50k_unique", &ops_50k_unique),
("1m_base_50k_ops_5k_unique", &ops_5k_unique),
] {
group.bench_function(name, |b| {
b.iter_batched(
|| base.clone(),
|mut stats| {
stats.begin();
for addr in ops {
stats.increment_for_account(addr.clone(), 1);
}
stats.commit();
black_box(stats);
},
BatchSize::LargeInput,
);
});
}

group.finish();
}

criterion_group! {
name = benches;
config = config();
targets = queue_statistics_commit_benchmark, dirty_accounts_allocation_benchmark, queue_statistics_tx_total_benchmark
}
criterion_main!(benches);
Loading
Loading