Skip to content

Commit a0755b7

Browse files
committed
avro: remap projection pushdown to writer schema ordinals
1 parent 6831da1 commit a0755b7

1 file changed

Lines changed: 62 additions & 12 deletions

File tree

datafusion/datasource-avro/src/source.rs

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,17 @@ impl AvroSource {
5757
}
5858
}
5959

60-
fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
61-
// Do not push projection ordinals into `arrow-avro` here. Those ordinals are
62-
// based on DataFusion's logical file schema, while `arrow-avro` without a reader
63-
// schema interprets them against the writer schema of each file.
64-
// We apply projection by column name after decode in `coerce_batch_to_schema`.
65-
ReaderBuilder::new()
66-
.with_batch_size(self.batch_size.expect("Batch size must set before open"))
67-
.build(reader)
68-
.map_err(Into::into)
60+
fn open<R: std::io::BufRead>(
61+
&self,
62+
reader: R,
63+
projection: Option<Vec<usize>>,
64+
) -> Result<Reader<R>> {
65+
let mut builder = ReaderBuilder::new()
66+
.with_batch_size(self.batch_size.expect("Batch size must set before open"));
67+
if let Some(projection) = projection {
68+
builder = builder.with_projection(projection);
69+
}
70+
builder.build(reader).map_err(Into::into)
6971
}
7072

7173
fn projected_file_schema(&self) -> SchemaRef {
@@ -82,6 +84,34 @@ impl AvroSource {
8284
.collect::<Vec<_>>(),
8385
))
8486
}
87+
88+
fn writer_projection_for_schema(
89+
&self,
90+
writer_schema: &Schema,
91+
target_schema: &Schema,
92+
) -> Option<Vec<usize>> {
93+
// `arrow-avro` accepts projection ordinals against the file's writer schema,
94+
// while DataFusion plans projection against the logical table schema. Remap
95+
// projected column names to writer ordinals so reader-level pushdown still
96+
// preserves DataFusion's existing name-based projection semantics.
97+
let projection = target_schema
98+
.fields()
99+
.iter()
100+
.filter_map(|field| {
101+
writer_schema
102+
.column_with_name(field.name())
103+
.map(|(idx, _)| idx)
104+
})
105+
.collect::<Vec<_>>();
106+
107+
let identity_projection = projection.len() == writer_schema.fields().len()
108+
&& projection
109+
.iter()
110+
.enumerate()
111+
.all(|(idx, value)| idx == *value);
112+
113+
(!identity_projection).then_some(projection)
114+
}
85115
}
86116

87117
fn coerce_batch_to_schema(
@@ -186,6 +216,7 @@ impl FileSource for AvroSource {
186216
mod private {
187217
use super::*;
188218
use std::io::BufReader;
219+
use std::io::Seek;
189220

190221
use bytes::Buf;
191222
use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture};
@@ -208,8 +239,18 @@ mod private {
208239
.get(&partitioned_file.object_meta.location)
209240
.await?;
210241
match r.payload {
211-
GetResultPayload::File(file, _) => {
212-
let reader = config.open(BufReader::new(file))?;
242+
GetResultPayload::File(mut file, _) => {
243+
// Probe the writer schema first so logical projected columns can be
244+
// translated to the writer-schema ordinals expected by `arrow-avro`.
245+
let probe_reader =
246+
config.open(BufReader::new(file.try_clone()?), None)?;
247+
let writer_projection = config.writer_projection_for_schema(
248+
probe_reader.schema().as_ref(),
249+
projected_file_schema.as_ref(),
250+
);
251+
file.rewind()?;
252+
let reader =
253+
config.open(BufReader::new(file), writer_projection)?;
213254
Ok(futures::stream::iter(reader)
214255
.map(move |r| {
215256
r.map_err(Into::into).and_then(|batch| {
@@ -223,7 +264,16 @@ mod private {
223264
}
224265
GetResultPayload::Stream(_) => {
225266
let bytes = r.bytes().await?;
226-
let reader = config.open(BufReader::new(bytes.reader()))?;
267+
// As above, inspect the writer schema before constructing the real
268+
// reader so `with_projection` can use writer-schema ordinals.
269+
let probe_reader =
270+
config.open(BufReader::new(bytes.clone().reader()), None)?;
271+
let writer_projection = config.writer_projection_for_schema(
272+
probe_reader.schema().as_ref(),
273+
projected_file_schema.as_ref(),
274+
);
275+
let reader = config
276+
.open(BufReader::new(bytes.reader()), writer_projection)?;
227277
Ok(futures::stream::iter(reader)
228278
.map(move |r| {
229279
r.map_err(Into::into).and_then(|batch| {

0 commit comments

Comments
 (0)