Skip to content

Commit 471abd3

Browse files
committed
replace coerce_batch_to_schema with BatchAdapter
1 parent eadd337 commit 471abd3

3 files changed

Lines changed: 13 additions & 36 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-avro/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ async-trait = { workspace = true }
3737
bytes = { workspace = true }
3838
datafusion-common = { workspace = true, features = ["object_store"] }
3939
datafusion-datasource = { workspace = true }
40+
datafusion-physical-expr-adapter = { workspace = true }
4041
datafusion-physical-plan = { workspace = true }
4142
datafusion-session = { workspace = true }
4243
futures = { workspace = true }

datafusion/datasource-avro/src/source.rs

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
use std::any::Any;
2121
use std::sync::Arc;
2222

23-
use arrow::array::{ArrayRef, RecordBatch, new_null_array};
24-
use arrow::compute::cast;
2523
use arrow::datatypes::{Schema, SchemaRef};
2624
use arrow_avro::reader::{Reader, ReaderBuilder};
2725
use datafusion_common::error::Result;
@@ -31,6 +29,7 @@ use datafusion_datasource::file::FileSource;
3129
use datafusion_datasource::file_scan_config::FileScanConfig;
3230
use datafusion_datasource::file_stream::FileOpener;
3331
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
32+
use datafusion_physical_expr_adapter::BatchAdapterFactory;
3433
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3534
use datafusion_physical_plan::projection::ProjectionExprs;
3635

@@ -114,28 +113,6 @@ impl AvroSource {
114113
}
115114
}
116115

117-
fn coerce_batch_to_schema(
118-
batch: &RecordBatch,
119-
target_schema: SchemaRef,
120-
) -> Result<RecordBatch> {
121-
let mut columns = Vec::with_capacity(target_schema.fields().len());
122-
for field in target_schema.fields() {
123-
let array: ArrayRef = match batch.schema().column_with_name(field.name()) {
124-
Some((idx, _)) => {
125-
let source_array = Arc::clone(batch.column(idx));
126-
if source_array.data_type() == field.data_type() {
127-
source_array
128-
} else {
129-
cast(&source_array, field.data_type())?
130-
}
131-
}
132-
None => new_null_array(field.data_type(), batch.num_rows()),
133-
};
134-
columns.push(array);
135-
}
136-
Ok(RecordBatch::try_new(target_schema, columns)?)
137-
}
138-
139116
impl FileSource for AvroSource {
140117
fn create_file_opener(
141118
&self,
@@ -251,14 +228,13 @@ mod private {
251228
file.rewind()?;
252229
let reader =
253230
config.open(BufReader::new(file), writer_projection)?;
231+
let batch_adapter =
232+
BatchAdapterFactory::new(Arc::clone(&projected_file_schema))
233+
.make_adapter(&reader.schema())?;
254234
Ok(futures::stream::iter(reader)
255235
.map(move |r| {
256-
r.map_err(Into::into).and_then(|batch| {
257-
coerce_batch_to_schema(
258-
&batch,
259-
Arc::clone(&projected_file_schema),
260-
)
261-
})
236+
r.map_err(Into::into)
237+
.and_then(|batch| batch_adapter.adapt_batch(&batch))
262238
})
263239
.boxed())
264240
}
@@ -274,14 +250,13 @@ mod private {
274250
);
275251
let reader = config
276252
.open(BufReader::new(bytes.reader()), writer_projection)?;
253+
let batch_adapter =
254+
BatchAdapterFactory::new(Arc::clone(&projected_file_schema))
255+
.make_adapter(&reader.schema())?;
277256
Ok(futures::stream::iter(reader)
278257
.map(move |r| {
279-
r.map_err(Into::into).and_then(|batch| {
280-
coerce_batch_to_schema(
281-
&batch,
282-
Arc::clone(&projected_file_schema),
283-
)
284-
})
258+
r.map_err(Into::into)
259+
.and_then(|batch| batch_adapter.adapt_batch(&batch))
285260
})
286261
.boxed())
287262
}

0 commit comments

Comments
 (0)