Skip to content

Commit 5fccac1

Browse files
authored
Add proto support for ArrowScanExecNode (apache#20280) (apache#20284)
## Which issue does this PR close? - Closes apache#20280. ## Rationale for this change Physical plans that read Arrow files (.arrow / IPC) could not be serialized or deserialized via the proto layer. PhysicalPlanNode already had scan nodes for Parquet, CSV, JSON, Avro, and in-memory sources, but not for Arrow, so a DataSourceExec using ArrowSource was not round-trippable. That blocked use cases like distributing plans that scan Arrow files (e.g. Ballista). This change adds Arrow scan to the proto layer so those plans can be serialized and deserialized like the other file formats. ## What changes are included in this PR? Proto: Added ArrowScanExecNode (with FileScanExecConf base_conf) and arrow_scan = 38 to the PhysicalPlanNode oneof in datafusion.proto. Generated code: Updated prost.rs and pbjson.rs to include ArrowScanExecNode and the ArrowScan variant (manual edits; protoc was not run). To-proto: In try_from_data_source_exec, when the data source is a FileScanConfig whose file source is ArrowSource, it is now serialized as ArrowScanExecNode. From-proto: Implemented try_into_arrow_scan_physical_plan to deserialize ArrowScanExecNode into DataSourceExec with ArrowSource; missing base_conf returns an explicit error (no .unwrap()). Test: Added roundtrip_arrow_scan in roundtrip_physical_plan.rs to assert Arrow scan plans round-trip correctly. ## Are these changes tested? Yes. A new test roundtrip_arrow_scan builds a physical plan that scans Arrow files, serializes it to bytes and deserializes it back, and asserts the round-tripped plan matches the original. The full cargo test -p datafusion-proto suite (150 tests: unit, integration, and doc tests) passes, including all existing roundtrip and serialization tests. ## Are there any user-facing changes? No. This only extends the existing physical-plan proto support to Arrow scan. Callers that already serialize/deserialize physical plans (e.g. 
for distributed execution) can now round-trip plans that read Arrow files in addition to Parquet, CSV, JSON, and Avro, with no API or behavioral changes for existing usage.
1 parent aa9520e commit 5fccac1

5 files changed

Lines changed: 187 additions & 3 deletions

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ message PhysicalPlanNode {
751751
MemoryScanExecNode memory_scan = 35;
752752
AsyncFuncExecNode async_func = 36;
753753
BufferExecNode buffer = 37;
754+
ArrowScanExecNode arrow_scan = 38;
754755
}
755756
}
756757

@@ -1106,6 +1107,10 @@ message AvroScanExecNode {
11061107
FileScanExecConf base_conf = 1;
11071108
}
11081109

1110+
message ArrowScanExecNode {
1111+
FileScanExecConf base_conf = 1;
1112+
}
1113+
11091114
message MemoryScanExecNode {
11101115
repeated bytes partitions = 1;
11111116
datafusion_common.Schema schema = 2;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 106 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
3434
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3535
use datafusion_datasource::sink::DataSinkExec;
3636
use datafusion_datasource::source::{DataSource, DataSourceExec};
37+
use datafusion_datasource_arrow::source::ArrowSource;
3738
#[cfg(feature = "avro")]
3839
use datafusion_datasource_avro::source::AvroSource;
3940
use datafusion_datasource_csv::file_format::CsvSink;
@@ -199,6 +200,9 @@ impl protobuf::PhysicalPlanNode {
199200
PhysicalPlanType::MemoryScan(scan) => {
200201
self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter)
201202
}
203+
PhysicalPlanType::ArrowScan(scan) => {
204+
self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter)
205+
}
202206
PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
203207
.try_into_coalesce_batches_physical_plan(
204208
coalesce_batches,
@@ -774,6 +778,27 @@ impl protobuf::PhysicalPlanNode {
774778
Ok(DataSourceExec::from_data_source(scan_conf))
775779
}
776780

781+
fn try_into_arrow_scan_physical_plan(
782+
&self,
783+
scan: &protobuf::ArrowScanExecNode,
784+
ctx: &TaskContext,
785+
codec: &dyn PhysicalExtensionCodec,
786+
proto_converter: &dyn PhysicalProtoConverterExtension,
787+
) -> Result<Arc<dyn ExecutionPlan>> {
788+
let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
789+
internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
790+
})?;
791+
let table_schema = parse_table_schema_from_proto(base_conf)?;
792+
let scan_conf = parse_protobuf_file_scan_config(
793+
base_conf,
794+
ctx,
795+
codec,
796+
proto_converter,
797+
Arc::new(ArrowSource::new_file_source(table_schema)),
798+
)?;
799+
Ok(DataSourceExec::from_data_source(scan_conf))
800+
}
801+
777802
#[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
778803
fn try_into_parquet_scan_physical_plan(
779804
&self,
@@ -2867,6 +2892,23 @@ impl protobuf::PhysicalPlanNode {
28672892
}
28682893
}
28692894

2895+
if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() {
2896+
let source = scan_conf.file_source();
2897+
if let Some(_arrow_source) = source.as_any().downcast_ref::<ArrowSource>() {
2898+
return Ok(Some(protobuf::PhysicalPlanNode {
2899+
physical_plan_type: Some(PhysicalPlanType::ArrowScan(
2900+
protobuf::ArrowScanExecNode {
2901+
base_conf: Some(serialize_file_scan_config(
2902+
scan_conf,
2903+
codec,
2904+
proto_converter,
2905+
)?),
2906+
},
2907+
)),
2908+
}));
2909+
}
2910+
}
2911+
28702912
#[cfg(feature = "parquet")]
28712913
if let Some((maybe_parquet, conf)) =
28722914
data_source_exec.downcast_to_file_source::<ParquetSource>()

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::object_store::ObjectStoreUrl;
3838
use datafusion::datasource::physical_plan::{
39-
FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig, ParquetSource,
40-
wrap_partition_type_in_dict, wrap_partition_value_in_dict,
39+
ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
40+
ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
4141
};
4242
use datafusion::datasource::sink::DataSinkExec;
4343
use datafusion::datasource::source::DataSourceExec;
@@ -929,6 +929,30 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
929929
roundtrip_test(DataSourceExec::from_data_source(scan_config))
930930
}
931931

932+
#[test]
933+
fn roundtrip_arrow_scan() -> Result<()> {
934+
let file_schema =
935+
Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
936+
937+
let table_schema = TableSchema::new(file_schema.clone(), vec![]);
938+
let file_source = Arc::new(ArrowSource::new_file_source(table_schema));
939+
940+
let scan_config =
941+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
942+
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
943+
"/path/to/file.arrow".to_string(),
944+
1024,
945+
)])])
946+
.with_statistics(Statistics {
947+
num_rows: Precision::Inexact(100),
948+
total_byte_size: Precision::Inexact(1024),
949+
column_statistics: Statistics::unknown_column(&file_schema),
950+
})
951+
.build();
952+
953+
roundtrip_test(DataSourceExec::from_data_source(scan_config))
954+
}
955+
932956
#[tokio::test]
933957
async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
934958
let mut file_group =

0 commit comments

Comments (0)