16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -109,6 +109,7 @@ tokio = { version = "1.36.0", features = [
     "rt-multi-thread",
     "parking_lot",
 ] }
+tokio-metrics = "0.3"
 tokio-stream = "0.1.17"
 tokio-util = { version = "0.7.15", features = ["rt"] }
 toml = "0.8.10"
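For context on the new dependency: tokio-metrics exposes per-task runtime metrics through a `TaskMonitor` that wraps a future and samples poll counts, poll durations, and scheduling delay. A minimal, self-contained sketch of that API (illustration only, not code from this PR):

```rust
use std::time::Duration;
use tokio_metrics::TaskMonitor;

#[tokio::main]
async fn main() {
    // Each TaskMonitor aggregates metrics for every future it instruments.
    let monitor = TaskMonitor::new();

    tokio::spawn(monitor.instrument(async {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }));

    tokio::time::sleep(Duration::from_millis(200)).await;

    // `intervals()` yields one TaskMetrics sample per call to `next()`,
    // covering the window since the previous sample.
    if let Some(metrics) = monitor.intervals().next() {
        println!(
            "polls: {}, mean poll time: {:?}",
            metrics.total_poll_count,
            metrics.mean_poll_duration()
        );
    }
}
```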
8 changes: 8 additions & 0 deletions crates/core/common/src/planning_context.rs
@@ -167,6 +167,14 @@ impl DetachedLogicalPlan {
         self.0.schema().clone()
     }
 
+    /// Get a reference to the inner LogicalPlan for analysis purposes.
+    ///
+    /// Note: This plan still contains `PlanningTable` providers and cannot be executed
+    /// until it is attached to a `QueryContext` via `attach_to()`.
+    pub fn as_inner(&self) -> &LogicalPlan {
+        &self.0
+    }
+
     pub fn propagate_block_num(self) -> Result<Self, DataFusionError> {
         Ok(Self(propagate_block_num(self.0)?))
     }
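The new accessor enables read-only inspection of a plan before it is attached. As a hypothetical example (this analysis pass is not part of the PR), a caller could walk the plan with DataFusion's `TreeNode` API:

```rust
use common::planning_context::DetachedLogicalPlan; // assumed import path
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;

/// Hypothetical helper: count TableScan nodes in a detached plan
/// without attaching it to a QueryContext.
fn count_table_scans(plan: &DetachedLogicalPlan) -> Result<usize> {
    let mut scans = 0;
    plan.as_inner().apply(|node| {
        if matches!(node, LogicalPlan::TableScan(_)) {
            scans += 1;
        }
        Ok(TreeNodeRecursion::Continue)
    })?;
    Ok(scans)
}
```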
126 changes: 79 additions & 47 deletions crates/core/dump/src/core/raw_dataset.rs
@@ -405,65 +405,97 @@ impl<S: BlockStreamer> DumpPartition<S> {
     }
 
     async fn run_range(&self, range: RangeInclusive<BlockNum>) -> Result<(), BoxError> {
-        let stream = {
-            let block_streamer = self.block_streamer.clone();
-            block_streamer
-                .block_stream(*range.start(), *range.end())
-                .await
-        };
-
-        // limit the missing table ranges to the partition range
-        let mut missing_ranges_by_table: BTreeMap<TableName, Vec<RangeInclusive<BlockNum>>> =
-            Default::default();
-        for (table, ranges) in &self.missing_ranges_by_table {
-            let entry = missing_ranges_by_table.entry(table.clone()).or_default();
-            for missing in ranges {
-                let start = BlockNum::max(*missing.start(), *range.start());
-                let end = BlockNum::min(*missing.end(), *range.end());
-                if start <= end {
-                    entry.push(start..=end);
-                }
-            }
-        }
-
-        let mut writer = RawDatasetWriter::new(
-            self.catalog.clone(),
-            self.metadata_db.clone(),
-            self.parquet_opts.clone(),
-            missing_ranges_by_table,
-            self.metrics.clone(),
-        )?;
-
-        let mut stream = std::pin::pin!(stream);
-        while let Some(dataset_rows) = stream.try_next().await? {
-            for table_rows in dataset_rows {
-                if let Some(ref metrics) = self.metrics {
-                    let num_rows: u64 = table_rows.rows.num_rows().try_into().unwrap();
-                    let table_name = table_rows.table.name();
-                    let block_num = table_rows.block_num();
-                    let physical_table = self
-                        .catalog
-                        .tables()
-                        .iter()
-                        .find(|t| t.table_name() == table_name)
-                        .expect("table should exist");
-                    let location_id = *physical_table.location_id();
-                    // Record rows only (bytes tracked separately in writer)
-                    metrics.record_ingestion_rows(num_rows, table_name.to_string(), location_id);
-                    // Update latest block gauge
-                    metrics.set_latest_block(block_num, table_name.to_string(), location_id);
-                }
-
-                writer.write(table_rows).await?;
-            }
-
-            self.progress_reporter.block_covered();
-        }
-
-        // Close the last part file for each table, checking for any errors.
-        writer.close().await?;
-
-        Ok(())
+        // Get dataset name for instrumentation
+        let dataset = self.catalog.tables()[0]
+            .job_labels()
+            .dataset_name
+            .to_string();
+
+        // Track records extracted
+        let records_extracted = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+        // Create task type for instrumentation
+        let task_type = monitoring::TaskType::Dump {
+            dataset: dataset.clone(),
+            block_range: (*range.start(), *range.end()),
+            records_extracted: None, // Will be set later
+        };
+
+        let instrumentation = monitoring::InstrumentedTaskExecution::new(task_type);
+        let records_counter = records_extracted.clone();
+
+        let dump_fut = async move {
+            let stream = {
+                let block_streamer = self.block_streamer.clone();
+                block_streamer
+                    .block_stream(*range.start(), *range.end())
+                    .await
+            };
+
+            // limit the missing table ranges to the partition range
+            let mut missing_ranges_by_table: BTreeMap<TableName, Vec<RangeInclusive<BlockNum>>> =
+                Default::default();
+            for (table, ranges) in &self.missing_ranges_by_table {
+                let entry = missing_ranges_by_table.entry(table.clone()).or_default();
+                for missing in ranges {
+                    let start = BlockNum::max(*missing.start(), *range.start());
+                    let end = BlockNum::min(*missing.end(), *range.end());
+                    if start <= end {
+                        entry.push(start..=end);
+                    }
+                }
+            }
+
+            let mut writer = RawDatasetWriter::new(
+                self.catalog.clone(),
+                self.metadata_db.clone(),
+                self.parquet_opts.clone(),
+                missing_ranges_by_table,
+                self.metrics.clone(),
+            )?;
+
+            let mut stream = std::pin::pin!(stream);
+            while let Some(dataset_rows) = stream.try_next().await? {
+                for table_rows in dataset_rows {
+                    let num_rows: u64 = table_rows.rows.num_rows().try_into().unwrap();
+                    records_counter
+                        .fetch_add(num_rows as usize, std::sync::atomic::Ordering::Relaxed);
+
+                    if let Some(ref metrics) = self.metrics {
+                        let table_name = table_rows.table.name();
+                        let block_num = table_rows.block_num();
+                        let physical_table = self
+                            .catalog
+                            .tables()
+                            .iter()
+                            .find(|t| t.table_name() == table_name)
+                            .expect("table should exist");
+                        let location_id = *physical_table.location_id();
+                        // Record rows only (bytes tracked separately in writer)
+                        metrics.record_ingestion_rows(
+                            num_rows,
+                            table_name.to_string(),
+                            location_id,
+                        );
+                        // Update latest block gauge
+                        metrics.set_latest_block(block_num, table_name.to_string(), location_id);
+                    }
+
+                    writer.write(table_rows).await?;
+                }
+
+                self.progress_reporter.block_covered();
+            }
+
+            // Close the last part file for each table, checking for any errors.
+            writer.close().await?;
+
+            Ok(())
+        };
+
+        // Execute with instrumentation
+        let (result, _metrics) = instrumentation.execute(dump_fut).await;
+        result
     }
 }

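`monitoring::TaskType` and `monitoring::InstrumentedTaskExecution` are introduced elsewhere in this PR, and their definitions are not visible in this diff. Based on how `run_range` uses them together with the new tokio-metrics dependency, a plausible shape is sketched below; the field types and the use of tokio-metrics' `TaskMonitor` are assumptions, not the PR's actual implementation:

```rust
use tokio_metrics::{TaskMetrics, TaskMonitor};

/// Assumed stand-in for the PR's monitoring::TaskType.
pub enum TaskType {
    Dump {
        dataset: String,
        block_range: (u64, u64),
        records_extracted: Option<usize>,
    },
}

/// Hypothetical sketch of the wrapper used by `run_range` above.
pub struct InstrumentedTaskExecution {
    #[allow(dead_code)] // would be attached to emitted metrics/spans
    task_type: TaskType,
    monitor: TaskMonitor,
}

impl InstrumentedTaskExecution {
    pub fn new(task_type: TaskType) -> Self {
        Self {
            task_type,
            monitor: TaskMonitor::new(),
        }
    }

    /// Drive the future under the monitor, returning its output together
    /// with the cumulative task metrics observed while it ran.
    pub async fn execute<F, T>(self, fut: F) -> (T, TaskMetrics)
    where
        F: std::future::Future<Output = T>,
    {
        let result = self.monitor.instrument(fut).await;
        (result, self.monitor.cumulative())
    }
}
```

The tuple return matches the call site above, where the caller keeps `result` and discards `_metrics`.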
4 changes: 4 additions & 0 deletions crates/core/monitoring/Cargo.toml
@@ -6,13 +6,17 @@ license-file.workspace = true

 [dependencies]
 common = { path = "../common" }
+datafusion.workspace = true
 opentelemetry.workspace = true
 opentelemetry-otlp.workspace = true
 opentelemetry_sdk.workspace = true
 tokio.workspace = true
+tokio-metrics.workspace = true
+serde_json.workspace = true
 tracing.workspace = true
 tracing-opentelemetry.workspace = true
 tracing-subscriber.workspace = true
+uuid.workspace = true
 
 
 [dev-dependencies]