diff --git a/Cargo.lock b/Cargo.lock index 098a7440a..379757df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2649,6 +2649,7 @@ dependencies = [ "itertools 0.14.0", "js-runtime", "metadata-db", + "monitoring", "object_store", "rand 0.9.2", "regex", @@ -6651,7 +6652,6 @@ name = "monitoring" version = "0.1.0" dependencies = [ "cargo_metadata", - "common", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", diff --git a/crates/core/common/Cargo.toml b/crates/core/common/Cargo.toml index 0f1dcd65e..436568f4d 100644 --- a/crates/core/common/Cargo.toml +++ b/crates/core/common/Cargo.toml @@ -26,6 +26,7 @@ indoc.workspace = true itertools.workspace = true js-runtime = { path = "../js-runtime" } metadata-db = { path = "../metadata-db" } +monitoring = { path = "../monitoring" } object_store.workspace = true rand.workspace = true regex.workspace = true diff --git a/crates/core/common/src/lib.rs b/crates/core/common/src/lib.rs index e12f439ed..f68471850 100644 --- a/crates/core/common/src/lib.rs +++ b/crates/core/common/src/lib.rs @@ -216,18 +216,48 @@ impl BlockStreamer for BlockStreamerWithRetry yield block; } Err(e) => { + let error_source = monitoring::logging::error_source(e.as_ref()); // Progressively more severe logging and longer retry interval. - if num_retries < DEBUG_RETRY_LIMIT { - num_retries += 1; - tracing::debug!(block = %current_block, error = %e, "Block streaming failed, retrying"); - tokio::time::sleep(DEBUG_RETRY_DELAY).await; - } else if num_retries < WARN_RETRY_LIMIT { - num_retries += 1; - tracing::warn!(block = %current_block, error = %e, "Block streaming failed, retrying"); - tokio::time::sleep(WARN_RETRY_DELAY).await; - } else { - tracing::error!(block = %current_block, error = %e, "Block streaming failed, retrying"); - tokio::time::sleep(ERROR_RETRY_DELAY).await; + match num_retries { + 0 => { + // First error, make sure it is visible in info (default) logs. 
+ num_retries += 1; + tracing::info!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(DEBUG_RETRY_DELAY).await; + } + 1..DEBUG_RETRY_LIMIT => { + num_retries += 1; + tracing::debug!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying"); + tokio::time::sleep(DEBUG_RETRY_DELAY).await; + } + DEBUG_RETRY_LIMIT..WARN_RETRY_LIMIT => { + num_retries += 1; + tracing::warn!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(WARN_RETRY_DELAY).await; + } + _ => { + tracing::error!( + block = %current_block, + error = %e, + error_source, + "Block streaming failed, retrying" + ); + tokio::time::sleep(ERROR_RETRY_DELAY).await; + } } current_block = start + blocks_sent; continue 'retry; diff --git a/crates/core/monitoring/Cargo.toml b/crates/core/monitoring/Cargo.toml index 81cc4d691..8839493b4 100644 --- a/crates/core/monitoring/Cargo.toml +++ b/crates/core/monitoring/Cargo.toml @@ -5,7 +5,6 @@ version.workspace = true license-file.workspace = true [dependencies] -common = { path = "../common" } opentelemetry.workspace = true opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true diff --git a/crates/extractors/solana/Cargo.toml b/crates/extractors/solana/Cargo.toml index 3075ea1b5..2ace9b501 100644 --- a/crates/extractors/solana/Cargo.toml +++ b/crates/extractors/solana/Cargo.toml @@ -15,6 +15,7 @@ bs58 = "0.5.1" common = { path = "../../core/common" } datasets-common = { path = "../../core/datasets-common" } futures.workspace = true +fs-err.workspace = true governor.workspace = true monitoring = { path = "../../core/monitoring" } reqwest.workspace = true diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index 8fae7bf5f..248c8b612 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs 
@@ -13,6 +13,24 @@ use crate::{metrics, rpc_client};
 
 const OLD_FAITHFUL_ARCHIVE_URL: &str = "https://files.old-faithful.net";
 
+#[derive(Debug, Default)]
+pub(crate) struct DecodedBlock {
+    pub(crate) slot: Slot,
+    pub(crate) parent_slot: Slot,
+
+    pub(crate) blockhash: [u8; 32],
+    pub(crate) prev_blockhash: [u8; 32],
+
+    pub(crate) block_height: Option,
+    pub(crate) blocktime: u64,
+
+    pub(crate) transactions: Vec,
+    pub(crate) transaction_metas: Vec,
+
+    #[allow(dead_code)]
+    pub(crate) block_rewards: Vec,
+}
+
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn stream(
     start: solana_clock::Slot,
@@ -64,24 +82,31 @@ pub(crate) fn stream(
             }
         };
 
+        let reqwest_client = reqwest::Client::new();
+
         // Download historical data via Old Faithful archive CAR files.
         'epochs: loop {
             tracing::debug!(epoch, "processing Old Faithful CAR file");
-            let epoch_car_file_path = of1_car_directory.join(of1_car_filename(epoch));
-
-            if !std::fs::exists(&epoch_car_file_path)?
-                && let Err(e) = download_of1_car_file(
-                    epoch,
-                    &of1_car_directory,
-                    metrics.clone(),
-                    &provider,
-                    &network,
-                ).await
-            {
+            let local_filename = format!("epoch-{}.car", epoch);
+            let epoch_car_file_path = of1_car_directory.join(local_filename);
+            if let Err(e) = download_of1_car_file(
+                epoch,
+                &reqwest_client,
+                &epoch_car_file_path,
+                metrics.clone(),
+                &provider,
+                &network,
+            ).await {
                 if let FileDownloadError::Http(404) = e {
                     // No more epoch CAR files available.
                     break 'epochs;
                 } else {
+                    tracing::debug!(epoch, error = %e, "failed to download Old Faithful CAR file");
+
+                    if let Some(metrics) = &metrics {
+                        metrics.record_of1_car_download_error(epoch, &provider, &network);
+                    }
+
                     yield Err(e.into());
                     return;
                 }
@@ -119,65 +144,151 @@ pub(crate) fn stream(
 }
 
-/// Downloads the Old Faithful CAR file for the given epoch into the specified output directory.
+/// Downloads the Old Faithful CAR file for the given epoch to the local path `dest`.
+///
+/// If the file was partially downloaded before, the download will resume from where it left off.
+/// A local file whose size already matches the remote size is left untouched, and a local file
+/// larger than the remote one is removed and downloaded again.
+///
+/// # Errors
+///
+/// Returns a [`FileDownloadError`] when the server replies with a non-success status, does not
+/// answer a resume request with `206 Partial Content`, sends no usable `Content-Length` for
+/// the size check, or when a request or local file operation fails.
 async fn download_of1_car_file(
     epoch: solana_clock::Epoch,
-    output_dir: &Path,
+    reqwest_client: &reqwest::Client,
+    dest: &Path,
     metrics: Option>,
     provider: &str,
     network: &str,
 ) -> Result<(), FileDownloadError> {
-    let filename = of1_car_filename(epoch);
-    let car_file_url = format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/{filename}");
-    tracing::info!(%car_file_url, "downloading Old Faithful CAR file");
+    enum DownloadAction {
+        Download,
+        Resume(u64),
+        Restart,
+        Skip,
+    }
 
-    let car_file_path = output_dir.join(filename);
-    let mut file = tokio::fs::File::create(&car_file_path).await?;
+    let download_url = of1_car_download_url(epoch);
 
-    let start = std::time::Instant::now();
-    let response = reqwest::get(&car_file_url).await?;
-    let status = response.status();
+    let action = match fs_err::metadata(dest).map(|meta| meta.len()) {
+        Ok(0) => DownloadAction::Download,
+        Ok(local_file_size) => {
+            // Get the actual file size from the server to determine if we need to resume.
+            let head_response = reqwest_client.head(&download_url).send().await?;
 
-    if !status.is_success() {
-        tracing::debug!(
-            %status,
-            "failed to download Old Faithful CAR file"
-        );
+            if head_response.status() != reqwest::StatusCode::OK {
+                return Err(FileDownloadError::Http(head_response.status().as_u16()));
+            }
 
-        if let Some(metrics) = metrics {
-            metrics.record_of1_car_download_error(epoch, provider, network);
+            let Some(content_length) = head_response.headers().get(reqwest::header::CONTENT_LENGTH)
+            else {
+                return Err(FileDownloadError::MissingContentLengthHeader);
+            };
+            let remote_file_size = content_length
+                .to_str()
+                .map_err(|_| FileDownloadError::ContentLengthParsing)?
+                .parse()
+                .map_err(|_| FileDownloadError::ContentLengthParsing)?;
+
+            match local_file_size.cmp(&remote_file_size) {
+                // Local file is partially downloaded, need to resume.
+                std::cmp::Ordering::Less => DownloadAction::Resume(local_file_size),
+                // Local file is larger than remote file, need to restart download.
+                std::cmp::Ordering::Greater => DownloadAction::Restart,
+                // File already fully downloaded.
+                std::cmp::Ordering::Equal => DownloadAction::Skip,
+            }
+        }
+        Err(e) if e.kind() == std::io::ErrorKind::NotFound => DownloadAction::Download,
+        Err(e) => return Err(FileDownloadError::Io(e)),
+    };
+
+    // Set up HTTP headers for range requests if the file already exists.
+    let mut headers = reqwest::header::HeaderMap::new();
+
+    match action {
+        DownloadAction::Download => {
+            tracing::debug!(%download_url, "downloading Old Faithful CAR file");
+        }
+        DownloadAction::Resume(download_offset) => {
+            tracing::debug!(
+                %download_url,
+                %download_offset,
+                "resuming Old Faithful CAR file download"
+            );
+            let range_header = format!("bytes={download_offset}-");
+            let range_header_value =
+                reqwest::header::HeaderValue::from_str(&range_header).expect("valid range header");
+            headers.insert(reqwest::header::RANGE, range_header_value);
+        }
+        DownloadAction::Restart => {
+            tracing::debug!(
+                %download_url,
+                "local Old Faithful CAR file is larger than remote file, restarting download"
+            );
+            tokio::fs::remove_file(&dest).await?;
         }
 
+        DownloadAction::Skip => {
+            tracing::debug!(
+                %download_url,
+                "local Old Faithful CAR file already fully downloaded, skipping"
+            );
+            return Ok(());
+        }
+    }
+
+    let start = std::time::Instant::now();
+    let response = reqwest_client
+        .get(download_url)
+        .headers(headers)
+        .send()
+        .await?;
+
+    let status = response.status();
+    if !status.is_success() {
         return Err(FileDownloadError::Http(status.as_u16()));
     }
 
+    if let DownloadAction::Resume(_) = action {
+        // Expecting a 206 Partial Content response when resuming.
+        if status != reqwest::StatusCode::PARTIAL_CONTENT {
+            return Err(FileDownloadError::PartialDownloadNotSupported);
+        }
+    }
+
+    let mut file = tokio::fs::File::options()
+        .create(true) // Create the file if it doesn't exist.
+        .append(true) // Append to the file to support resuming.
+        .open(&dest)
+        .await?;
+
     // Stream the file content since these files can be extremely large.
-    let mut total_bytes = 0u64;
     let mut stream = response.bytes_stream();
+    let mut bytes_downloaded = 0u64;
 
     while let Some(chunk) = stream.next().await {
-        let chunk = match chunk {
-            Ok(c) => c,
-            Err(err) => {
-                if let Some(metrics) = metrics {
-                    metrics.record_of1_car_download_error(epoch, provider, network);
-                }
-                return Err(FileDownloadError::Reqwest(err));
-            }
-        };
+        let chunk = chunk?;
+
+        file.write_all(&chunk).await?;
+
+        bytes_downloaded += chunk.len() as u64;
 
         if let Some(ref metrics) = metrics {
-            metrics.record_of1_car_download_bytes(total_bytes, epoch, provider, network);
+            metrics.record_of1_car_download_bytes(chunk.len() as u64, epoch, provider, network);
         }
-
-        total_bytes += chunk.len() as u64;
-        file.write_all(&chunk).await?;
     }
+
+    // tokio's `File` may finish a write in the background after `write_all` returns.
+    // Flush so the data is in the file, and any write error surfaces, before reporting success.
+    file.flush().await?;
 
     let duration = start.elapsed().as_secs_f64();
-    tracing::info!(
-        epoch,
-        bytes = total_bytes,
-        duration_secs = duration,
+    tracing::debug!(
+        %epoch,
+        %bytes_downloaded,
+        duration_secs = %duration,
         "downloaded Old Faithful CAR file"
     );
 
@@ -196,6 +307,12 @@ enum FileDownloadError {
     Http(u16),
     #[error("Reqwest error: {0}")]
     Reqwest(#[from] reqwest::Error),
+    #[error("missing Content-Length header in HTTP response")]
+    MissingContentLengthHeader,
+    #[error("error parsing Content-Length header")]
+    ContentLengthParsing,
+    #[error("partial downloads are not supported by the server")]
+    PartialDownloadNotSupported,
 }
 
 /// Read an entire block worth of nodes from the given node reader and decode them into
@@ -282,27 +399,9 @@ async fn read_entire_block(
     Ok(Some(block))
 }
 
-#[derive(Debug, Default)]
-pub(crate) struct DecodedBlock {
-    pub(crate) slot: Slot,
-    pub(crate) parent_slot: Slot,
-
-    pub(crate) blockhash: [u8; 32],
-    pub(crate) prev_blockhash: [u8; 32],
-
-    pub(crate) block_height: Option,
-    pub(crate) blocktime: u64,
-
-    pub(crate) transactions: Vec,
-    pub(crate) transaction_metas: Vec,
-
-    #[allow(dead_code)]
-    pub(crate) block_rewards: Vec,
-}
-
-/// Generates the Old Faithful epoch CAR filename for the given epoch.
+/// Generates the Old Faithful CAR download URL for the given epoch.
 ///
 /// Reference: .
-fn of1_car_filename(epoch: solana_clock::Epoch) -> String {
-    format!("epoch-{}.car", epoch)
+fn of1_car_download_url(epoch: solana_clock::Epoch) -> String {
+    format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/epoch-{epoch}.car")
 }