Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ indoc.workspace = true
itertools.workspace = true
js-runtime = { path = "../js-runtime" }
metadata-db = { path = "../metadata-db" }
monitoring = { path = "../monitoring" }
object_store.workspace = true
rand.workspace = true
regex.workspace = true
Expand Down
52 changes: 41 additions & 11 deletions crates/core/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,48 @@ impl<T: BlockStreamer + Send + Sync> BlockStreamer for BlockStreamerWithRetry<T>
yield block;
}
Err(e) => {
let error_source = monitoring::logging::error_source(e.as_ref());
// Progressively more severe logging and longer retry interval.
if num_retries < DEBUG_RETRY_LIMIT {
num_retries += 1;
tracing::debug!(block = %current_block, error = %e, "Block streaming failed, retrying");
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
} else if num_retries < WARN_RETRY_LIMIT {
num_retries += 1;
tracing::warn!(block = %current_block, error = %e, "Block streaming failed, retrying");
tokio::time::sleep(WARN_RETRY_DELAY).await;
} else {
tracing::error!(block = %current_block, error = %e, "Block streaming failed, retrying");
tokio::time::sleep(ERROR_RETRY_DELAY).await;
match num_retries {
0 => {
// First error, make sure it is visible in info (default) logs.
num_retries += 1;
tracing::info!(
block = %current_block,
error = %e,
error_source,
"Block streaming failed, retrying"
);
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
}
1..DEBUG_RETRY_LIMIT => {
num_retries += 1;
tracing::debug!(
block = %current_block,
error = %e,
error_source,
"Block streaming failed, retrying");
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
}
DEBUG_RETRY_LIMIT..WARN_RETRY_LIMIT => {
num_retries += 1;
tracing::warn!(
block = %current_block,
error = %e,
error_source,
"Block streaming failed, retrying"
);
tokio::time::sleep(WARN_RETRY_DELAY).await;
}
_ => {
tracing::error!(
block = %current_block,
error = %e,
error_source,
"Block streaming failed, retrying"
);
tokio::time::sleep(ERROR_RETRY_DELAY).await;
}
}
current_block = start + blocks_sent;
continue 'retry;
Expand Down
1 change: 0 additions & 1 deletion crates/core/monitoring/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version.workspace = true
license-file.workspace = true

[dependencies]
common = { path = "../common" }
opentelemetry.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry_sdk.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/extractors/solana/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bs58 = "0.5.1"
common = { path = "../../core/common" }
datasets-common = { path = "../../core/datasets-common" }
futures.workspace = true
fs-err.workspace = true
governor.workspace = true
monitoring = { path = "../../core/monitoring" }
reqwest.workspace = true
Expand Down
219 changes: 153 additions & 66 deletions crates/extractors/solana/src/of1_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ use crate::{metrics, rpc_client};

const OLD_FAITHFUL_ARCHIVE_URL: &str = "https://files.old-faithful.net";

#[derive(Debug, Default)]
pub(crate) struct DecodedBlock {
pub(crate) slot: Slot,
pub(crate) parent_slot: Slot,

pub(crate) blockhash: [u8; 32],
pub(crate) prev_blockhash: [u8; 32],

pub(crate) block_height: Option<u64>,
pub(crate) blocktime: u64,

pub(crate) transactions: Vec<solana_sdk::transaction::VersionedTransaction>,
pub(crate) transaction_metas: Vec<solana_storage_proto::confirmed_block::TransactionStatusMeta>,

#[allow(dead_code)]
pub(crate) block_rewards: Vec<solana_storage_proto::confirmed_block::Rewards>,
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn stream(
start: solana_clock::Slot,
Expand Down Expand Up @@ -64,24 +82,31 @@ pub(crate) fn stream(
}
};

let reqwest_client = reqwest::Client::new();

// Download historical data via Old Faithful archive CAR files.
'epochs: loop {
tracing::debug!(epoch, "processing Old Faithful CAR file");
let epoch_car_file_path = of1_car_directory.join(of1_car_filename(epoch));

if !std::fs::exists(&epoch_car_file_path)?
&& let Err(e) = download_of1_car_file(
epoch,
&of1_car_directory,
metrics.clone(),
&provider,
&network,
).await
{
let local_filename = format!("epoch-{}.car", epoch);
let epoch_car_file_path = of1_car_directory.join(local_filename);
if let Err(e) = download_of1_car_file(
epoch,
&reqwest_client,
&epoch_car_file_path,
metrics.clone(),
&provider,
&network,
).await {
if let FileDownloadError::Http(404) = e {
// No more epoch CAR files available.
break 'epochs;
} else {
tracing::debug!("failed to download Old Faithful CAR file");

if let Some(metrics) = &metrics {
metrics.record_of1_car_download_error(epoch, &provider, &network);
}

yield Err(e.into());
return;
}
Expand Down Expand Up @@ -119,65 +144,139 @@ pub(crate) fn stream(
}

/// Downloads the Old Faithful CAR file for the given epoch into the specified output directory.
///
/// If the file was partially downloaded before, the download will resume from where it left off.
async fn download_of1_car_file(
epoch: solana_clock::Epoch,
output_dir: &Path,
reqwest_client: &reqwest::Client,
dest: &Path,
metrics: Option<Arc<metrics::MetricsRegistry>>,
provider: &str,
network: &str,
) -> Result<(), FileDownloadError> {
let filename = of1_car_filename(epoch);
let car_file_url = format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/{filename}");
tracing::info!(%car_file_url, "downloading Old Faithful CAR file");
enum DownloadAction {
Download,
Resume(u64),
Restart,
Skip,
}

let car_file_path = output_dir.join(filename);
let mut file = tokio::fs::File::create(&car_file_path).await?;
let download_url = of1_car_download_url(epoch);

let start = std::time::Instant::now();
let response = reqwest::get(&car_file_url).await?;
let status = response.status();
let action = match fs_err::metadata(dest).map(|meta| meta.len()) {
Ok(0) => DownloadAction::Download,
Ok(local_file_size) => {
// Get the actual file size from the server to determine if we need to resume.
let head_response = reqwest_client.head(&download_url).send().await?;

if !status.is_success() {
tracing::debug!(
%status,
"failed to download Old Faithful CAR file"
);
if head_response.status() != reqwest::StatusCode::OK {
return Err(FileDownloadError::Http(head_response.status().as_u16()));
}

if let Some(metrics) = metrics {
metrics.record_of1_car_download_error(epoch, provider, network);
let Some(content_length) = head_response.headers().get(reqwest::header::CONTENT_LENGTH)
else {
return Err(FileDownloadError::MissingContentLengthHeader);
};
let remote_file_size = content_length
.to_str()
.map_err(|_| FileDownloadError::ContentLengthParsing)?
.parse()
.map_err(|_| FileDownloadError::ContentLengthParsing)?;

match local_file_size.cmp(&remote_file_size) {
// Local file is partially downloaded, need to resume.
std::cmp::Ordering::Less => DownloadAction::Resume(local_file_size),
// Local file is larger than remote file, need to restart download.
std::cmp::Ordering::Greater => DownloadAction::Restart,
// File already fully downloaded.
std::cmp::Ordering::Equal => DownloadAction::Skip,
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => DownloadAction::Download,
Err(e) => return Err(FileDownloadError::Io(e)),
};

// Set up HTTP headers for range requests if the file already exists.
let mut headers = reqwest::header::HeaderMap::new();

match action {
DownloadAction::Download => {
tracing::debug!(%download_url, "downloading Old Faithful CAR file");
}
DownloadAction::Resume(download_offset) => {
tracing::debug!(
%download_url,
%download_offset,
"resuming Old Faithful CAR file download"
);
let range_header = format!("bytes={download_offset}-");
let range_header_value =
reqwest::header::HeaderValue::from_str(&range_header).expect("valid range header");
headers.insert(reqwest::header::RANGE, range_header_value);
}
DownloadAction::Restart => {
tracing::debug!(
%download_url,
"local Old Faithful CAR file is larger than remote file, restarting download"
);
tokio::fs::remove_file(&dest).await?;
}
DownloadAction::Skip => {
tracing::debug!(
%download_url,
"local Old Faithful CAR file already fully downloaded, skipping"
);
return Ok(());
}
}

let start = std::time::Instant::now();

let response = reqwest_client
.get(download_url)
.headers(headers)
.send()
.await?;

let status = response.status();
if !status.is_success() {
return Err(FileDownloadError::Http(status.as_u16()));
}

if let DownloadAction::Resume(_) = action {
// Expecting a 206 Partial Content response when resuming.
if status != reqwest::StatusCode::PARTIAL_CONTENT {
return Err(FileDownloadError::PartialDownloadNotSupported);
}
}

let mut file = tokio::fs::File::options()
.create(true) // Create the file if it doesn't exist.
.append(true) // Append to the file to support resuming.
.open(&dest)
.await?;

// Stream the file content since these files can be extremely large.
let mut total_bytes = 0u64;
let mut stream = response.bytes_stream();
let mut bytes_downloaded = 0u64;
while let Some(chunk) = stream.next().await {
let chunk = match chunk {
Ok(c) => c,
Err(err) => {
if let Some(metrics) = metrics {
metrics.record_of1_car_download_error(epoch, provider, network);
}
return Err(FileDownloadError::Reqwest(err));
}
};
let chunk = chunk?;

file.write_all(&chunk).await?;

bytes_downloaded += chunk.len() as u64;

if let Some(ref metrics) = metrics {
metrics.record_of1_car_download_bytes(total_bytes, epoch, provider, network);
metrics.record_of1_car_download_bytes(chunk.len() as u64, epoch, provider, network);
}

total_bytes += chunk.len() as u64;
file.write_all(&chunk).await?;
}

let duration = start.elapsed().as_secs_f64();

tracing::info!(
epoch,
bytes = total_bytes,
duration_secs = duration,
tracing::debug!(
%epoch,
%bytes_downloaded,
duration_secs = %duration,
"downloaded Old Faithful CAR file"
);

Expand All @@ -196,6 +295,12 @@ enum FileDownloadError {
Http(u16),
#[error("Reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),
#[error("missing Content-Length header in HTTP response")]
MissingContentLengthHeader,
#[error("error parsing Content-Length header")]
ContentLengthParsing,
#[error("partial downloads are not supported by the server")]
PartialDownloadNotSupported,
}

/// Read an entire block worth of nodes from the given node reader and decode them into
Expand Down Expand Up @@ -282,27 +387,9 @@ async fn read_entire_block<R: tokio::io::AsyncRead + Unpin>(
Ok(Some(block))
}

#[derive(Debug, Default)]
pub(crate) struct DecodedBlock {
pub(crate) slot: Slot,
pub(crate) parent_slot: Slot,

pub(crate) blockhash: [u8; 32],
pub(crate) prev_blockhash: [u8; 32],

pub(crate) block_height: Option<u64>,
pub(crate) blocktime: u64,

pub(crate) transactions: Vec<solana_sdk::transaction::VersionedTransaction>,
pub(crate) transaction_metas: Vec<solana_storage_proto::confirmed_block::TransactionStatusMeta>,

#[allow(dead_code)]
pub(crate) block_rewards: Vec<solana_storage_proto::confirmed_block::Rewards>,
}

/// Generates the Old Faithful epoch CAR filename for the given epoch.
/// Generates the Old Faithful CAR download URL for the given epoch.
///
/// Reference: <https://docs.old-faithful.net/references/of1-files>.
fn of1_car_filename(epoch: solana_clock::Epoch) -> String {
format!("epoch-{}.car", epoch)
fn of1_car_download_url(epoch: solana_clock::Epoch) -> String {
format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/epoch-{epoch}.car")
}