diff --git a/crates/rbuilder-operator/src/clickhouse.rs b/crates/rbuilder-operator/src/clickhouse.rs index 15176807e..598ee6c13 100644 --- a/crates/rbuilder-operator/src/clickhouse.rs +++ b/crates/rbuilder-operator/src/clickhouse.rs @@ -15,7 +15,10 @@ use rbuilder::{ }; use rbuilder_primitives::{Order, OrderId}; use rbuilder_utils::clickhouse::{ - backup::primitives::{ClickhouseIndexableData, ClickhouseRowExt}, + backup::{ + primitives::{ClickhouseIndexableData, ClickhouseRowExt}, + DiskBackup, DiskBackupConfig, + }, serde::{option_u256, vec_u256}, spawn_clickhouse_inserter_and_backup, }; @@ -146,14 +149,22 @@ impl BuiltBlocksWriter { let task_executor = task_manager.executor(); let (block_tx, block_rx) = mpsc::channel::(BUILT_BLOCKS_CHANNEL_SIZE); + let disk_backup = DiskBackup::new( + DiskBackupConfig::new() + .with_path(Some(config.disk_database_path)) + .with_max_size_bytes(Some( + config.disk_max_size_mb.unwrap_or(DEFAULT_MAX_DISK_SIZE_MB) * MEGA, + )), + &task_executor, + ) + .expect("could not create disk backup"); spawn_clickhouse_inserter_and_backup::( &client, block_rx, &task_executor, BLOCKS_TABLE_NAME.to_string(), "".to_string(), // No buildername used in blocks table. - Some(config.disk_database_path), - Some(config.disk_max_size_mb.unwrap_or(DEFAULT_MAX_DISK_SIZE_MB) * MEGA), + disk_backup, config .memory_max_size_mb .unwrap_or(DEFAULT_MAX_MEMORY_SIZE_MB) diff --git a/crates/rbuilder-utils/src/clickhouse/mod.rs b/crates/rbuilder-utils/src/clickhouse/mod.rs index c4580d982..ba853b0e3 100644 --- a/crates/rbuilder-utils/src/clickhouse/mod.rs +++ b/crates/rbuilder-utils/src/clickhouse/mod.rs @@ -1,7 +1,7 @@ pub mod backup; pub mod indexer; pub mod serde; -use std::{path::PathBuf, time::Duration}; +use std::time::Duration; use ::serde::{Deserialize, Serialize}; use clickhouse::Client; @@ -12,7 +12,7 @@ use crate::clickhouse::{ backup::{ metrics::Metrics, primitives::{ClickhouseIndexableData, ClickhouseRowExt}, - Backup, DiskBackup, DiskBackupConfig, MemoryBackupConfig, + Backup, DiskBackup, MemoryBackupConfig, }, indexer::{default_inserter, ClickhouseInserter, InserterRunner}, }; @@ -72,21 +72,13 @@ pub fn spawn_clickhouse_inserter_and_backup< task_executor: &TaskExecutor, clickhouse_table_name: String, builder_name: String, - disk_database_path: Option>, - disk_max_size_bytes: Option, + disk_backup_db: DiskBackup, memory_max_size_bytes: u64, tracing_target: &'static str, ) where for<'a> ::Value<'a>: Sync, { let backup_table_name = RowType::TABLE_NAME.to_string(); - let disk_backup = DiskBackup::new( - DiskBackupConfig::new() - .with_path(disk_database_path) - .with_max_size_bytes(disk_max_size_bytes), // 1 GiB - task_executor, - ) - .expect("could not create disk backup"); let (failed_commit_tx, failed_commit_rx) = mpsc::channel(BACKUP_INPUT_CHANNEL_BUFFER_SIZE); let inserter = default_inserter(client, &clickhouse_table_name); let inserter = ClickhouseInserter::<_, MetricsType>::new(inserter, failed_commit_tx); @@ -99,7 +91,7 @@ pub fn spawn_clickhouse_inserter_and_backup< Some(CLICKHOUSE_INSERT_TIMEOUT), Some(CLICKHOUSE_END_TIMEOUT), ), - disk_backup.clone(), + disk_backup_db, ) .with_memory_backup_config(MemoryBackupConfig::new(memory_max_size_bytes)); inserter_runner.spawn(task_executor, backup_table_name.clone(), tracing_target);