Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 355 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ members = [
"crypto/kzg",
"database_manager",
"dummy_el",
"execution-witness-sentry",
"lcli",
"lighthouse",
"lighthouse/environment",
Expand Down
18 changes: 18 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4149,6 +4149,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);

// Persist execution proofs to the database if zkvm is enabled and proofs are cached.
// This is done after the block is successfully stored so we don't lose proofs on cache eviction.
if let Some(proofs) = self
.data_availability_checker
.get_execution_proofs(&block_root)
&& !proofs.is_empty()
{
let proofs_owned: Vec<_> = proofs.iter().map(|p| (**p).clone()).collect();
if let Err(e) = self.store.put_execution_proofs(&block_root, &proofs_owned) {
// Log but don't fail block import - proofs can still be served from cache
warn!(
%block_root,
error = ?e,
"Failed to persist execution proofs to database"
);
}
}

// We're declaring the block "imported" at this point, since fork choice and the DB know
// about it.
let block_time_imported = timestamp_now();
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,13 @@ where
.process_prune_blobs(data_availability_boundary);
}

// Prune execution proofs older than the execution proof boundary in the background.
if let Some(execution_proof_boundary) = beacon_chain.execution_proof_boundary() {
beacon_chain
.store_migrator
.process_prune_execution_proofs(execution_proof_boundary);
}

Ok(beacon_chain)
}
}
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.process_prune_blobs(data_availability_boundary);
}

// Prune execution proofs in the background.
if let Some(execution_proof_boundary) = self.execution_proof_boundary() {
self.store_migrator
.process_prune_execution_proofs(execution_proof_boundary);
}

// Take a write-lock on the canonical head and signal for it to prune.
self.canonical_head.fork_choice_write_lock().prune()?;

Expand Down
34 changes: 34 additions & 0 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ pub enum Notification {
Finalization(FinalizationNotification),
Reconstruction,
PruneBlobs(Epoch),
PruneExecutionProofs(Epoch),
ManualFinalization(ManualFinalizationNotification),
ManualCompaction,
}
Expand Down Expand Up @@ -251,6 +252,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}

pub fn process_prune_execution_proofs(&self, execution_proof_boundary: Epoch) {
if let Some(Notification::PruneExecutionProofs(execution_proof_boundary)) = self
.send_background_notification(Notification::PruneExecutionProofs(
execution_proof_boundary,
))
{
Self::run_prune_execution_proofs(self.db.clone(), execution_proof_boundary);
}
}

pub fn run_prune_execution_proofs(
db: Arc<HotColdDB<E, Hot, Cold>>,
execution_proof_boundary: Epoch,
) {
if let Err(e) = db.try_prune_execution_proofs(false, execution_proof_boundary) {
error!(
error = ?e,
"Execution proof pruning failed"
);
}
}

/// If configured to run in the background, send `notif` to the background thread.
///
/// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
Expand Down Expand Up @@ -440,11 +463,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let mut manual_finalization_notif = None;
let mut manual_compaction_notif = None;
let mut prune_blobs_notif = None;
let mut prune_execution_proofs_notif = None;
match notif {
Notification::Reconstruction => reconstruction_notif = Some(notif),
Notification::Finalization(fin) => finalization_notif = Some(fin),
Notification::ManualFinalization(fin) => manual_finalization_notif = Some(fin),
Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
Notification::PruneExecutionProofs(epb) => {
prune_execution_proofs_notif = Some(epb)
}
Notification::ManualCompaction => manual_compaction_notif = Some(notif),
}
// Read the rest of the messages in the channel, taking the best of each type.
Expand Down Expand Up @@ -475,6 +502,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
Notification::PruneBlobs(dab) => {
prune_blobs_notif = std::cmp::max(prune_blobs_notif, Some(dab));
}
Notification::PruneExecutionProofs(epb) => {
prune_execution_proofs_notif =
std::cmp::max(prune_execution_proofs_notif, Some(epb));
}
}
}
// Run finalization and blob pruning migrations first, then a reconstruction batch.
Expand All @@ -489,6 +520,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab);
}
if let Some(epb) = prune_execution_proofs_notif {
Self::run_prune_execution_proofs(db.clone(), epb);
}
if reconstruction_notif.is_some() {
Self::run_reconstruction(db.clone(), Some(inner_tx.clone()));
}
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/tests/schema_stability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ async fn schema_stability() {
fn check_db_columns() {
let current_columns: Vec<&'static str> = DBColumn::iter().map(|c| c.as_str()).collect();
let expected_columns = vec![
"bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3", "bcs",
"bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", "bhr",
"brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
"bma", "blk", "blb", "bdc", "bdi", "bep", "ste", "hsd", "hsn", "bsn", "bsd", "bss", "bs3",
"bcs", "bst", "exp", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr",
"bhr", "brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy",
];
assert_eq!(expected_columns, current_columns);
}
Expand Down
16 changes: 16 additions & 0 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub struct BeaconProcessorQueueLengths {
block_broots_queue: usize,
blob_broots_queue: usize,
execution_proof_broots_queue: usize,
execution_proof_brange_queue: usize,
blob_brange_queue: usize,
dcbroots_queue: usize,
dcbrange_queue: usize,
Expand Down Expand Up @@ -198,6 +199,7 @@ impl BeaconProcessorQueueLengths {
block_broots_queue: 1024,
blob_broots_queue: 1024,
execution_proof_broots_queue: 1024,
execution_proof_brange_queue: 1024,
blob_brange_queue: 1024,
dcbroots_queue: 1024,
dcbrange_queue: 1024,
Expand Down Expand Up @@ -620,6 +622,7 @@ pub enum Work<E: EthSpec> {
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
ExecutionProofsByRootsRequest(BlockingFn),
ExecutionProofsByRangeRequest(BlockingFn),
DataColumnsByRootsRequest(BlockingFn),
DataColumnsByRangeRequest(BlockingFn),
GossipBlsToExecutionChange(BlockingFn),
Expand Down Expand Up @@ -675,6 +678,7 @@ pub enum WorkType {
BlobsByRangeRequest,
BlobsByRootsRequest,
ExecutionProofsByRootsRequest,
ExecutionProofsByRangeRequest,
DataColumnsByRootsRequest,
DataColumnsByRangeRequest,
GossipBlsToExecutionChange,
Expand Down Expand Up @@ -728,6 +732,7 @@ impl<E: EthSpec> Work<E> {
Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
Work::ExecutionProofsByRootsRequest(_) => WorkType::ExecutionProofsByRootsRequest,
Work::ExecutionProofsByRangeRequest(_) => WorkType::ExecutionProofsByRangeRequest,
Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
Expand Down Expand Up @@ -901,6 +906,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut blob_broots_queue = FifoQueue::new(queue_lengths.blob_broots_queue);
let mut execution_proof_broots_queue =
FifoQueue::new(queue_lengths.execution_proof_broots_queue);
let mut execution_proof_brange_queue =
FifoQueue::new(queue_lengths.execution_proof_brange_queue);
let mut blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue);
let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
Expand Down Expand Up @@ -1226,6 +1233,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = execution_proof_broots_queue.pop() {
Some(item)
} else if let Some(item) = execution_proof_brange_queue.pop() {
Some(item)
} else if let Some(item) = dcbroots_queue.pop() {
Some(item)
} else if let Some(item) = dcbrange_queue.pop() {
Expand Down Expand Up @@ -1430,6 +1439,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::ExecutionProofsByRootsRequest { .. } => {
execution_proof_broots_queue.push(work, work_id)
}
Work::ExecutionProofsByRangeRequest { .. } => {
execution_proof_brange_queue.push(work, work_id)
}
Work::DataColumnsByRootsRequest { .. } => {
dcbroots_queue.push(work, work_id)
}
Expand Down Expand Up @@ -1489,6 +1501,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::ExecutionProofsByRootsRequest => {
execution_proof_broots_queue.len()
}
WorkType::ExecutionProofsByRangeRequest => {
execution_proof_brange_queue.len()
}
WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
WorkType::GossipBlsToExecutionChange => {
Expand Down Expand Up @@ -1649,6 +1664,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlobsByRangeRequest(process_fn)
| Work::BlobsByRootsRequest(process_fn)
| Work::ExecutionProofsByRootsRequest(process_fn)
| Work::ExecutionProofsByRangeRequest(process_fn)
| Work::DataColumnsByRootsRequest(process_fn)
| Work::DataColumnsByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
Expand Down
Loading
Loading