743 changes: 316 additions & 427 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions Cargo.toml
@@ -102,7 +102,7 @@ semver = { version = "1.0.18", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9.34"
-sqlparser = "0.58"
+sqlparser = "0.59"
tempfile = "3.13.0"
thiserror = "2.0"
tokio = { version = "1.36.0", features = [
@@ -124,18 +124,19 @@ url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.11.0", features = ["v7"] }

# Datafusion and Arrow crates
-arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
-datafusion = { version = "50", features = ["serde"] }
-datafusion-tracing = { version = "50" }
-datafusion-datasource = { version = "50" }
+arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
+datafusion = { version = "51", features = ["serde"] }
+datafusion-tracing = { version = "51" }
+datafusion-datasource = { version = "51" }
object_store = { version = "0.12", features = ["aws", "gcp", "azure"] }

# Crates that should follow the version used by DataFusion and Arrow
-prost = "0.13.3"
-prost-build = "0.13.3"
-tonic = { version = "0.13", features = [
+prost = "0.14.1"
+prost-build = "0.14.1"
+tonic = { version = "0.14", features = [
"transport",
"gzip",
"tls-native-roots",
] }
-tonic-build = "0.13"
+tonic-prost = "0.14"
+tonic-prost-build = "0.14"
4 changes: 2 additions & 2 deletions crates/arrow-to-postgres/Cargo.toml
@@ -6,8 +6,8 @@ version.workspace = true
license-file.workspace = true

[dependencies]
-arrow-array = { version = "56", default-features = false }
-arrow-schema = { version = "56" }
+arrow-array = { version = "57", default-features = false }
+arrow-schema = { version = "57" }
bytes = "1.10.1"
enum_dispatch = "0.3.13"
serde_json = "1.0"
2 changes: 1 addition & 1 deletion crates/bin/ampd/Cargo.toml
@@ -16,7 +16,7 @@ snmalloc = ["dep:snmalloc-rs"]
[dependencies]
clap.workspace = true
common = { path = "../../core/common" }
-console-subscriber = { version = "0.4.1", default-features = false, optional = true }
+console-subscriber = { version = "0.5.0", default-features = false, optional = true }
controller = { path = "../../services/controller" }
metadata-db = { path = "../../core/metadata-db" }
monitoring = { path = "../../core/monitoring" }
4 changes: 2 additions & 2 deletions crates/bin/ampsync/Cargo.toml
@@ -21,8 +21,8 @@ default = []
amp-client = { path = "../../client", features = ["postgres"] }
ampctl = { path = "../ampctl" }
anyhow.workspace = true
-arrow-array = { version = "56", default-features = false }
-arrow-schema = { version = "56" }
+arrow-array = { version = "57", default-features = false }
+arrow-schema = { version = "57" }
arrow-to-postgres = { path = "../../arrow-to-postgres" }
backon.workspace = true
bytes = "1.10.1"
2 changes: 1 addition & 1 deletion crates/core/common/src/catalog/physical.rs
@@ -786,7 +786,7 @@ impl TableProvider for TableSnapshot {
.with_file_groups(file_groups)
.with_limit(limit)
.with_output_ordering(output_ordering)
-.with_projection(projection.cloned())
+.with_projection_indices(projection.cloned())
.with_statistics(statistics)
.build(),
);
13 changes: 7 additions & 6 deletions crates/core/common/src/catalog/reader.rs
@@ -2,7 +2,7 @@ use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use datafusion::{
-datasource::physical_plan::{FileMeta, ParquetFileMetrics, ParquetFileReaderFactory},
+datasource::physical_plan::{ParquetFileMetrics, ParquetFileReaderFactory},
error::{DataFusionError, Result as DataFusionResult},
parquet::{
arrow::{
@@ -14,6 +14,7 @@ use datafusion::{
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
+use datafusion_datasource::PartitionedFile;
use foyer::Cache;
use futures::{TryFutureExt as _, future::BoxFuture};
use metadata_db::{FileId, LocationId, MetadataDb};
@@ -50,18 +51,18 @@ impl ParquetFileReaderFactory for AmpReaderFactory {
fn create_reader(
&self,
partition_index: usize,
-file_meta: FileMeta,
+partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
-let path = file_meta.location();
+let file_meta = &partitioned_file.object_meta;
+let path = &file_meta.location;
let file_metrics = ParquetFileMetrics::new(partition_index, path.as_ref(), metrics);
let metadata_db = self.metadata_db.clone();
let store = Arc::clone(&self.object_store);
-let inner = ParquetObjectReader::new(store, path.clone())
-.with_file_size(file_meta.object_meta.size);
+let inner = ParquetObjectReader::new(store, path.clone()).with_file_size(file_meta.size);
let location_id = self.location_id;
-let file_id = file_meta
+let file_id = partitioned_file
.extensions
.ok_or(DataFusionError::Execution(format!(
"FileMeta missing extensions for location_id: {}",
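For readers tracking the DataFusion 50 → 51 migration in this file: `ParquetFileReaderFactory::create_reader` now receives a `PartitionedFile` instead of the removed `FileMeta`, with the `ObjectMeta` at `partitioned_file.object_meta` and the `extensions` slot moving along with it. A minimal sketch of the new shape, using only what the hunks above show (the factory name and its store field are illustrative):

```rust
use std::sync::Arc;

use datafusion::{
    datasource::physical_plan::{ParquetFileMetrics, ParquetFileReaderFactory},
    error::Result as DataFusionResult,
    parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader},
    physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion_datasource::PartitionedFile;
use object_store::ObjectStore;

#[derive(Debug)]
struct SketchReaderFactory {
    object_store: Arc<dyn ObjectStore>, // illustrative field
}

impl ParquetFileReaderFactory for SketchReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile, // was `file_meta: FileMeta` in DataFusion 50
        _metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
        // The ObjectMeta (location, size) now hangs off the PartitionedFile;
        // `FileMeta::location()` and `file_meta.object_meta.size` are gone.
        let meta = &partitioned_file.object_meta;
        let _file_metrics =
            ParquetFileMetrics::new(partition_index, meta.location.as_ref(), metrics);
        let reader = ParquetObjectReader::new(Arc::clone(&self.object_store), meta.location.clone())
            .with_file_size(meta.size);
        Ok(Box::new(reader))
    }
}
```

`PartitionedFile` keeps the same `extensions` slot, which is what lets the compaction-plan hunk further down swap `FileMeta::from(segment.object.clone())` for `PartitionedFile::from(...)` one-for-one.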
6 changes: 6 additions & 0 deletions crates/core/common/src/evm/udfs/eth_call.rs
@@ -70,6 +70,12 @@ impl EthCall {
DataType::Utf8,
]),
volatility: Volatility::Volatile,
+parameter_names: Some(vec![
+"from".to_string(),
+"to".to_string(),
+"input_data".to_string(),
+"block".to_string(),
+]),
},
fields: Fields::from_iter([
Field::new("data", DataType::Binary, true),
8 changes: 4 additions & 4 deletions crates/core/common/src/evm/udfs/mod.rs
@@ -611,7 +611,7 @@ fn append_sol_value_to_builder(
.append_value(i64::try_from(s)?),
n if n <= DEC_128_MAX_BINARY_PREC => {
let val = i128::try_from(s)?;
-validate_decimal_precision(val, DEC128_PREC)?;
+validate_decimal_precision(val, DEC128_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
@@ -622,7 +622,7 @@ fn append_sol_value_to_builder(
}
n if n <= DEC_256_MAX_BINARY_PREC => {
let val = i256::from_le_bytes(s.to_le_bytes());
-validate_decimal256_precision(val, DEC256_PREC)?;
+validate_decimal256_precision(val, DEC256_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal256Builder>()
@@ -671,7 +671,7 @@ fn append_sol_value_to_builder(
.append_value(u64::try_from(u)?),
n if n <= DEC_128_MAX_BINARY_PREC => {
let val = i128::try_from(u)?;
-validate_decimal_precision(val, DEC128_PREC)?;
+validate_decimal_precision(val, DEC128_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
@@ -682,7 +682,7 @@ fn append_sol_value_to_builder(
}
n if n <= DEC_256_MAX_BINARY_PREC => {
let val = i256::from_le_bytes(u.to_le_bytes());
-validate_decimal256_precision(val, DEC256_PREC)?;
+validate_decimal256_precision(val, DEC256_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal256Builder>()
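The extra `0` in these four hunks is the scale argument that Arrow 57's decimal validators now require; the Solidity integer values being appended carry no fractional digits, hence scale 0. A minimal sketch of the new call shape, assuming arrow-data's exported paths and the standard Decimal128/Decimal256 precision caps of 38 and 76 (`DEC128_PREC`/`DEC256_PREC` above are this repo's own constants):

```rust
use arrow_buffer::i256;
use arrow_data::decimal::{validate_decimal_precision, validate_decimal256_precision};
use arrow_schema::ArrowError;

fn validate(v128: i128, v256: i256) -> Result<(), ArrowError> {
    // Arrow 56 took (value, precision); 57 adds scale as a third argument.
    validate_decimal_precision(v128, 38, 0)?;
    validate_decimal256_precision(v256, 76, 0)?;
    Ok(())
}
```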
1 change: 1 addition & 0 deletions crates/core/common/src/js_udf.rs
@@ -61,6 +61,7 @@ impl JsUdf {
let signature = Signature {
type_signature: TypeSignature::Exact(input_types),
volatility: Volatility::Immutable,
+parameter_names: None,
};

// Create UDF name based on whether schema is provided
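Both UDF hunks exist because DataFusion 51 adds a `parameter_names` field to `Signature`, so every struct-literal construction site must now populate it: `Some(...)` with one name per argument (as `eth_call` does above), or `None` for positional-only functions (as here). A sketch under that assumption; the names and types below are illustrative:

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};

fn named_signature() -> Signature {
    Signature {
        type_signature: TypeSignature::Exact(vec![DataType::Utf8, DataType::UInt64]),
        volatility: Volatility::Immutable,
        // New in DataFusion 51; `None` keeps the arguments positional-only.
        parameter_names: Some(vec!["address".to_string(), "block".to_string()]),
    }
}
```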
4 changes: 2 additions & 2 deletions crates/core/dump/src/compaction/plan.rs
@@ -22,7 +22,7 @@ use common::{
},
};
use datafusion::{
-datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
+datasource::{listing::PartitionedFile, physical_plan::ParquetFileReaderFactory},
error::DataFusionError,
execution::SendableRecordBatchStream,
physical_plan::{metrics::ExecutionPlanMetricsSet, stream::RecordBatchStreamAdapter},
@@ -57,7 +57,7 @@ impl CompactionFile {
let file_id = segment.id;
let range = segment.range.clone();

-let mut file_meta = FileMeta::from(segment.object.clone());
+let mut file_meta = PartitionedFile::from(segment.object.clone());

file_meta.extensions = Some(Arc::new(file_id));

4 changes: 2 additions & 2 deletions crates/core/dump/src/parquet_writer.rs
@@ -11,7 +11,7 @@ use common::{
},
segments::BlockRange,
},
-parquet::{arrow::AsyncArrowWriter, errors::ParquetError, format::KeyValue},
+parquet::{arrow::AsyncArrowWriter, errors::ParquetError, file::metadata::KeyValue},
};
use metadata_db::{FileId, FooterBytes, LocationId, MetadataDb};
use object_store::{ObjectMeta, buffered::BufWriter, path::Path};
@@ -146,7 +146,7 @@ impl ParquetFileWriter {
self.filename,
range.start(),
range.end(),
-meta.num_rows,
+meta.file_metadata().num_rows(),
);

let location = Path::from_url_path(self.file_url.path())?;
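Both changes in this file stem from parquet 57 retiring its public thrift `format` module: `KeyValue` is now re-exported from `parquet::file::metadata`, and `AsyncArrowWriter::close()` appears to return parquet's parsed metadata type, where the row count sits behind `file_metadata().num_rows()` rather than being a public `num_rows` field. A sketch of the close-and-count step under those assumptions (this crate actually writes through object_store's `BufWriter`, not a local file):

```rust
use datafusion::parquet::{arrow::AsyncArrowWriter, errors::ParquetError};
use tokio::fs::File;

async fn finish(writer: AsyncArrowWriter<File>) -> Result<i64, ParquetError> {
    // close() flushes the footer and hands back the file's metadata.
    let meta = writer.close().await?;
    Ok(meta.file_metadata().num_rows())
}
```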
2 changes: 2 additions & 0 deletions crates/core/js-runtime/src/convert/mod.rs
@@ -327,6 +327,8 @@ impl ToV8 for ScalarValue {
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_)
| ScalarValue::Union(_, _, _)
+| ScalarValue::Decimal32(_, _, _)
+| ScalarValue::Decimal64(_, _, _)
| ScalarValue::Dictionary(_, _) => Err(BoxError::from(format!(
"{} not yet supported in functions",
self.data_type()
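These new match arms exist because Arrow 57 introduces 32- and 64-bit decimal types and DataFusion 51 surfaces them as `ScalarValue::Decimal32`/`Decimal64` variants, so exhaustive matches over `ScalarValue` gain two arms. A small illustrative check, assuming the usual `datafusion::scalar` re-export:

```rust
use datafusion::scalar::ScalarValue;

// True for the scalar variants added in DataFusion 51 (value, precision, scale).
fn is_narrow_decimal(v: &ScalarValue) -> bool {
    matches!(
        v,
        ScalarValue::Decimal32(_, _, _) | ScalarValue::Decimal64(_, _, _)
    )
}
```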
3 changes: 2 additions & 1 deletion crates/extractors/firehose/Cargo.toml
@@ -21,6 +21,7 @@ serde_json = { workspace = true, optional = true }
thiserror.workspace = true
tokio.workspace = true
tonic.workspace = true
+tonic-prost.workspace = true
tracing.workspace = true

[dev-dependencies]
@@ -30,7 +31,7 @@ serde_json.workspace = true
# These dependencies are only included when the gen_proto cfg flag is enabled
[target.'cfg(gen_proto)'.build-dependencies]
prost-build = { workspace = true }
-tonic-build = { workspace = true }
+tonic-prost-build = { workspace = true }

[lints.rust]
# Allow the gen_proto cfg flag used for conditional protobuf code generation
4 changes: 2 additions & 2 deletions crates/extractors/firehose/build.rs
@@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// These comments break doc tests, so we disable them.
prost_config.disable_comments(&["google.protobuf.Timestamp", "google.protobuf.Any"]);

-let config = tonic_build::configure()
+let config = tonic_prost_build::configure()
.build_server(false)
.out_dir("src/proto");

@@ -22,7 +22,7 @@
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.emit_rerun_if_changed(false); // See https://github.com/hyperium/tonic/issues/1070

-config.compile_protos_with_config(
+config.compile_with_config(
prost_config,
&["proto/firehose.proto", "proto/ethereum.proto"],
&[""],
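tonic 0.14 splits prost support out of the core crates: codegen moves to `tonic-prost-build` (hence the manifest change above) and the runtime codec to `tonic-prost`, with `compile_protos_with_config` renamed to `compile_with_config`. A minimal build-script sketch mirroring these hunks (proto paths are illustrative):

```rust
// build.rs — assumes tonic-prost-build 0.14 and prost-build 0.14.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut prost_config = prost_build::Config::new();
    // Doc comments generated for the well-known types can break doctests.
    prost_config.disable_comments(&["google.protobuf.Timestamp", "google.protobuf.Any"]);

    tonic_prost_build::configure()
        .build_server(false)
        .compile_with_config(prost_config, &["proto/example.proto"], &["proto"])?;

    Ok(())
}
```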
1 change: 0 additions & 1 deletion crates/extractors/firehose/src/client.rs
@@ -134,7 +134,6 @@ pub struct AuthInterceptor {
}

impl AuthInterceptor {
-#[expect(clippy::result_large_err)]
pub fn new(token: Option<String>) -> Result<Self, Error> {
Ok(AuthInterceptor {
token: token
1 change: 0 additions & 1 deletion crates/extractors/firehose/src/lib.rs
@@ -10,7 +10,6 @@ pub mod dataset;
mod dataset_kind;
pub mod evm;
pub mod metrics;
-#[expect(clippy::doc_overindented_list_items)]
#[expect(clippy::enum_variant_names)]
mod proto;

17 changes: 8 additions & 9 deletions crates/extractors/firehose/src/proto/google.protobuf.rs
@@ -1,6 +1,6 @@
// This file is @generated by prost-build.
#[derive(serde::Serialize, serde::Deserialize)]
-#[derive(Clone, PartialEq, ::prost::Message)]
+#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Any {
/// A URL/resource name that uniquely identifies the type of the serialized
/// protocol buffer message. This string must contain at least
@@ -15,13 +15,13 @@ pub struct Any {
/// server that maps type URLs to message definitions as follows:
///
/// * If no scheme is provided, `https` is assumed.
-/// * An HTTP GET on the URL must yield a [google.protobuf.Type][]
-/// value in binary format, or produce an error.
+/// * An HTTP GET on the URL must yield a \[google.protobuf.Type\]\[\]
+/// value in binary format, or produce an error.
/// * Applications are allowed to cache lookup results based on the
-/// URL, or have them precompiled into a binary to avoid any
-/// lookup. Therefore, binary compatibility needs to be preserved
-/// on changes to types. (Use versioned type names to manage
-/// breaking changes.)
+/// URL, or have them precompiled into a binary to avoid any
+/// lookup. Therefore, binary compatibility needs to be preserved
+/// on changes to types. (Use versioned type names to manage
+/// breaking changes.)
///
/// Note: this functionality is not currently available in the official
/// protobuf release, and it is not used for type URLs beginning with
@@ -30,15 +30,14 @@
///
/// Schemes other than `http`, `https` (or the empty scheme) might be
/// used with implementation specific semantics.
-///
#[prost(string, tag = "1")]
pub type_url: ::prost::alloc::string::String,
/// Must be a valid serialized protocol buffer of the above specified type.
#[prost(bytes = "vec", tag = "2")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[derive(serde::Serialize, serde::Deserialize)]
-#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Timestamp {
/// Represents seconds of UTC time since Unix epoch
/// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to