Skip to content
1 change: 1 addition & 0 deletions cli/src/cmd/tools/dump_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ impl Dumper {
&consensus_config,
mc_state_extra.consensus_info.genesis_info,
)
.await
.context("Failed to load dumped anchors")?;

let dst_dir = self.output_dir.path().join("mempool");
Expand Down
1 change: 1 addition & 0 deletions collator/src/mempool/impls/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod cache;
pub mod deduplicator;
pub mod parser;
pub mod shuttle;
pub mod v_set_adapter;
101 changes: 101 additions & 0 deletions collator/src/mempool/impls/common/shuttle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::time::Duration;

use anyhow::Result;
use bumpalo::Bump;
use tycho_consensus::prelude::{AnchorData, MempoolAdapterStore};
use tycho_util::time::now_millis;

use crate::mempool::impls::common::parser::{Parser, ParserOutput};
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
use crate::tracing_targets;

pub struct Shuttle {
pub store: MempoolAdapterStore,
pub parser: Parser,
pub first_after_gap: Option<MempoolAnchorId>,
pub set_committed_in_db: bool,
}

impl Shuttle {
pub async fn handle<F>(mut self, committed: AnchorData, push: F) -> Result<Self>
where
F: FnOnce(MempoolAnchor) + Send + 'static,
{
let anchor_id: MempoolAnchorId = committed.anchor.round().0;
metrics::gauge!("tycho_mempool_last_anchor_round").set(anchor_id);

let chain_time = committed.anchor.time().millis();
let is_executable =
(self.first_after_gap.as_ref()).is_none_or(|first_id| anchor_id >= *first_id);

let task = tokio::task::spawn_blocking(move || {
let bump = Bump::with_capacity(
(self.store).expand_anchor_history_arena_size(&committed.history),
);

let payloads = (self.store).expand_anchor_history(&committed, &bump)?;

let total_messages = payloads.len();
let total_bytes: usize = payloads.iter().fold(0, |acc, bytes| acc + bytes.len());

let ParserOutput {
unique_messages,
unique_payload_bytes,
} = self.parser.parse_unique(anchor_id, payloads);

let unique_messages_len = unique_messages.len();

if is_executable {
push(MempoolAnchor {
id: anchor_id,
prev_id: committed.prev_anchor.map(|round| round.0),
chain_time,
author: *committed.anchor.author(),
externals: unique_messages,
});
}

metrics::counter!("tycho_mempool_msgs_unique_count")
.increment(unique_messages_len as _);
metrics::counter!("tycho_mempool_msgs_unique_bytes")
.increment(unique_payload_bytes as _);

metrics::counter!("tycho_mempool_msgs_duplicates_count")
.increment((total_messages - unique_messages_len) as _);
metrics::counter!("tycho_mempool_msgs_duplicates_bytes")
.increment((total_bytes - unique_payload_bytes) as _);

metrics::histogram!("tycho_mempool_commit_anchor_latency_time").record(
Duration::from_millis(now_millis().max(chain_time) - chain_time),
);

tracing::info!(
target: tracing_targets::MEMPOOL_ADAPTER,
id = anchor_id,
%is_executable,
time = chain_time,
externals_unique = unique_messages_len,
externals_skipped = total_messages - unique_messages_len,
"new anchor"
);

// Note: removed from hot path at the price of anchor dump may not recover last anchor
// in case of ungraceful shutdown; node restart is independent from stored anchor flags
if self.set_committed_in_db {
self.store.set_committed(&committed)?;
}

tycho_util::mem::Reclaimer::instance().drop((committed, bump));

self.parser.clean(anchor_id);

anyhow::Ok(self)
});

match task.await {
Ok(result) => result,
Err(e) if e.is_panic() => std::panic::resume_unwind(e.into_panic()),
Err(e) => Err(e.into()),
}
}
}
135 changes: 57 additions & 78 deletions collator/src/mempool/impls/dump_anchors.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use bumpalo::Bump;
use tycho_consensus::prelude::{
AnchorStageRole, ConsensusConfigExt, MempoolAdapterStore, MempoolConfigBuilder, MempoolDb,
MempoolNodeConfig,
MempoolAdapterStore, MempoolConfigBuilder, MempoolDb, MempoolNodeConfig, MempoolOutput,
};
use tycho_network::PeerId;
use tycho_storage::StorageContext;
use tycho_types::boc::Boc;
use tycho_types::models::{ConsensusConfig, GenesisInfo, Message, MsgInfo};

use crate::mempool::impls::common::parser::{Parser, ParserOutput};
use crate::mempool::impls::common::parser::Parser;
use crate::mempool::impls::common::shuttle::Shuttle;
use crate::mempool::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::tracing_targets;

#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)]
pub struct DumpedAnchor {
Expand Down Expand Up @@ -61,16 +61,17 @@ impl DumpAnchors {
store: MempoolAdapterStore::new(mempool_db),
})
}
pub fn load(
&self,

pub async fn load(
self,
top_processed_to_anchor: MempoolAnchorId,
mempool_node_config: &MempoolNodeConfig,
consensus_config: &ConsensusConfig,
genesis_info: GenesisInfo,
) -> Result<Vec<MempoolAnchor>> {
anyhow::ensure!(
top_processed_to_anchor >= genesis_info.start_round,
"Cannot load history earlier than genesis round: \
top_processed_to_anchor > genesis_info.start_round,
"Cannot load history of previous genesis: \
got top_processed_to_anchor={top_processed_to_anchor} and {genesis_info:?}",
);

Expand All @@ -80,68 +81,53 @@ impl DumpAnchors {

let conf = config_builder.build()?.conf;

let bottom_round = top_processed_to_anchor
.saturating_sub(conf.consensus.replay_anchor_rounds())
.max(conf.genesis_round.0);

// no overlay id check: do not rewrite db state, just try to load data
let outputs = (self.store)
.restore_committed(top_processed_to_anchor, &conf)
.await?;

let anchors = (self.store).load_history_since(bottom_round);
let mut shuttle = Shuttle {
store: self.store,
parser: Parser::new(conf.consensus.deduplicate_rounds),
first_after_gap: Some(top_processed_to_anchor),
set_committed_in_db: false,
};

let mut total_payload_bytes: usize = 0;
for (_, history) in anchors.values() {
for info in history {
total_payload_bytes += info.payload_bytes() as usize;
let results = Arc::new(Mutex::new(Vec::new()));

for output in outputs {
match output {
MempoolOutput::NewStartAfterGap(anchors_full_bottom) => {
let first_to_execute =
(anchors_full_bottom + conf.consensus.deduplicate_rounds).0;
shuttle.parser = Parser::new(conf.consensus.deduplicate_rounds);
shuttle.first_after_gap = Some(first_to_execute);

tracing::info!(
target: tracing_targets::MEMPOOL_ADAPTER,
new_bottom = anchors_full_bottom.0,
first_after_gap = first_to_execute,
"unrecoverable gap in anchor chain",
);
}
MempoolOutput::NextAnchor(adata) => {
let results = results.clone();
let f = move |anchor| results.lock().unwrap().push(anchor);
shuttle = shuttle.handle(adata, f).await?;
}
MempoolOutput::CommitFinished(_)
| MempoolOutput::Running
| MempoolOutput::Paused => {}
}
}
let bump = Bump::with_capacity(total_payload_bytes);

let mut parser = Parser::new(conf.consensus.deduplicate_rounds);

let mut output = Vec::new();

let mut prev_visited_anchor = None;

for (anchor_round, (anchor, history)) in anchors {
let payloads = (self.store).expand_anchor_history(&anchor, &history, &bump, false);

let ParserOutput {
unique_messages, ..
} = parser.parse_unique(anchor_round, payloads);

let prev_linked_anchor = anchor.anchor_round(AnchorStageRole::Proof).prev().0;

if let Some(prev_visited_anchor) = prev_visited_anchor {
anyhow::ensure!(
prev_visited_anchor == prev_linked_anchor,
"cannot reproduce anchor history because mempool state is not synced; \
there is a gap after anchor {prev_visited_anchor}: \
expected at most {prev_linked_anchor} got {anchor_round}",
);
}
let results = Arc::into_inner(results).expect("it's the only ref");

if anchor_round >= top_processed_to_anchor {
output.push(MempoolAnchor {
id: anchor_round,
prev_id: (prev_linked_anchor > conf.genesis_round.0)
.then_some(prev_linked_anchor),
chain_time: anchor.time().millis(),
author: *anchor.author(),
externals: unique_messages,
});
}

prev_visited_anchor = Some(anchor_round);
}

Ok(output)
results.into_inner().context("Shuttle handle anchors")
}
}

#[cfg(all(test, feature = "test"))]
mod test {
use std::num::NonZeroU16;

use tycho_consensus::test_utils::default_test_config;
use tycho_storage::StorageConfig;
use tycho_util::test::init_logger;
Expand All @@ -163,26 +149,19 @@ mod test {

let top_processed_to_anchor: MempoolAnchorId = 10;

let mempool_node_conf = MempoolNodeConfig {
clean_db_period_rounds: NonZeroU16::new(10).unwrap(),
..Default::default()
};

let test_conf = default_test_config();

let dump = dump_anchors.load(
top_processed_to_anchor,
&mempool_node_conf,
&test_conf.conf.consensus,
GenesisInfo {
start_round: 2,
genesis_millis: 0,
},
)?;

for i in dump {
tracing::info!("{i:?}");
}
let _ = dump_anchors
.load(
top_processed_to_anchor,
test_conf.node_config(),
&test_conf.conf.consensus,
GenesisInfo {
start_round: 2,
genesis_millis: 0,
},
)
.await?;

Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions collator/src/mempool/impls/single_node_impl/anchor_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use tycho_consensus::prelude::WAVE_ROUNDS;
use tycho_network::PeerId;
use tycho_types::models::ConsensusConfig;
use tycho_util::time::{MonotonicClock, now_millis};
Expand All @@ -11,8 +12,6 @@ use crate::mempool::impls::common::parser::{Parser, ParserOutput};
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
use crate::tracing_targets;

pub const ANCHOR_ID_STEP: u32 = 4;

pub struct SingleNodeAnchorHandler {
cache: Arc<Cache>,
parser: Parser,
Expand All @@ -27,7 +26,7 @@ impl SingleNodeAnchorHandler {
top_processed_to_anchor_id: MempoolAnchorId,
config: &ConsensusConfig,
) -> Self {
let prev_anchor_id = top_processed_to_anchor_id.saturating_sub(ANCHOR_ID_STEP);
let prev_anchor_id = top_processed_to_anchor_id.saturating_sub(WAVE_ROUNDS);
Self {
cache,
parser: Parser::new(config.deduplicate_rounds),
Expand All @@ -38,7 +37,7 @@ impl SingleNodeAnchorHandler {

pub async fn handle(mut self, payloads: Vec<Bytes>) -> Self {
let prev_anchor_id = self.prev_anchor_id.take();
let anchor_id = prev_anchor_id.unwrap_or(1) + ANCHOR_ID_STEP;
let anchor_id = prev_anchor_id.unwrap_or(1) + WAVE_ROUNDS;
metrics::gauge!("tycho_mempool_last_anchor_round").set(anchor_id);

let chain_time = MonotonicClock::now_millis();
Expand Down
6 changes: 2 additions & 4 deletions collator/src/mempool/impls/single_node_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use tycho_util::futures::JoinTask;
use crate::mempool::StateUpdateContext;
use crate::mempool::impls::common::cache::Cache;
use crate::mempool::impls::common::v_set_adapter::VSetAdapter;
use crate::mempool::impls::single_node_impl::anchor_handler::{
ANCHOR_ID_STEP, SingleNodeAnchorHandler,
};
use crate::mempool::impls::single_node_impl::anchor_handler::SingleNodeAnchorHandler;
use crate::tracing_targets;

pub struct MempoolAdapterSingleNodeImpl {
Expand Down Expand Up @@ -59,7 +57,7 @@ impl MempoolAdapterSingleNodeImpl {

let timeout = merged_conf.conf.consensus.broadcast_retry_millis.get() as u64
* merged_conf.conf.consensus.min_sign_attempts.get() as u64
* ANCHOR_ID_STEP as u64;
* WAVE_ROUNDS as u64;

let mut interval = tokio::time::interval(std::time::Duration::from_millis(timeout));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
Expand Down
Loading