Skip to content

Commit e2f63c2

Browse files
committed
avro: remove reader schema coercion and align tests
1 parent 7020131 commit e2f63c2

4 files changed

Lines changed: 67 additions & 38 deletions

File tree

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -172,35 +172,18 @@ mod tests {
172172
.execute(0, state.task_ctx())
173173
.expect("plan execution failed");
174174

175-
let batch = results
175+
// Avro follows the file's writer schema for projection. Referencing a
176+
// non-existent file column should fail instead of being NULL-padded.
177+
let err = results
176178
.next()
177179
.await
178180
.expect("plan iterator empty")
179-
.expect("plan iterator returned an error");
180-
181-
insta::allow_duplicates! {assert_snapshot!(batches_to_string(&[batch]), @r"
182-
+----+----------+-------------+-------------+
183-
| id | bool_col | tinyint_col | missing_col |
184-
+----+----------+-------------+-------------+
185-
| 4 | true | 0 | |
186-
| 5 | false | 1 | |
187-
| 6 | true | 0 | |
188-
| 7 | false | 1 | |
189-
| 2 | true | 0 | |
190-
| 3 | false | 1 | |
191-
| 0 | true | 0 | |
192-
| 1 | false | 1 | |
193-
+----+----------+-------------+-------------+
194-
");}
195-
196-
let batch = results.next().await;
197-
assert!(batch.is_none());
198-
199-
let batch = results.next().await;
200-
assert!(batch.is_none());
201-
202-
let batch = results.next().await;
203-
assert!(batch.is_none());
181+
.expect_err("missing projected column should error");
182+
let err_msg = err.to_string();
183+
assert!(
184+
err_msg.contains("Projection index") && err_msg.contains("out of bounds"),
185+
"unexpected error: {err_msg}"
186+
);
204187

205188
Ok(())
206189
}

datafusion/datasource-avro/src/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,68 @@
3030
pub mod file_format;
3131
pub mod source;
3232

33-
use arrow::datatypes::Schema;
33+
use arrow::datatypes::{DataType, Field, Fields, Schema};
3434
pub use arrow_avro;
3535
use arrow_avro::reader::ReaderBuilder;
3636
pub use file_format::*;
3737
use std::io::{BufReader, Read};
38+
use std::sync::Arc;
3839

3940
/// Read Avro schema given a reader
4041
pub fn read_avro_schema_from_reader<R: Read>(
4142
reader: &mut R,
4243
) -> datafusion_common::Result<Schema> {
4344
let avro_reader = ReaderBuilder::new().build(BufReader::new(reader))?;
44-
Ok(avro_reader.schema().as_ref().clone())
45+
// Avro readers perform strict schema resolution rules (e.g. record identity checks)
46+
// that are stricter than DataFusion's table schema handling needs for inferred schemas.
47+
// Drop metadata from inferred schemas so runtime batches and inferred table schemas
48+
// compare consistently without requiring strict Avro metadata identity.
49+
Ok(strip_metadata_from_schema(avro_reader.schema().as_ref().clone()))
50+
}
51+
52+
fn strip_metadata_from_schema(schema: Schema) -> Schema {
53+
let fields = schema
54+
.fields
55+
.into_iter()
56+
.map(|f| Arc::new(strip_metadata_from_field(f.as_ref())))
57+
.collect::<Fields>();
58+
// Intentionally drop schema-level metadata
59+
Schema::new(fields)
60+
}
61+
62+
fn strip_metadata_from_field(field: &Field) -> Field {
63+
// Intentionally drop field-level metadata
64+
Field::new(
65+
field.name(),
66+
strip_metadata_from_data_type(field.data_type()),
67+
field.is_nullable(),
68+
)
69+
}
70+
71+
fn strip_metadata_from_data_type(data_type: &DataType) -> DataType {
72+
match data_type {
73+
DataType::Struct(fields) => DataType::Struct(
74+
fields
75+
.iter()
76+
.map(|f| Arc::new(strip_metadata_from_field(f.as_ref())))
77+
.collect(),
78+
),
79+
DataType::List(field) => {
80+
DataType::List(Arc::new(strip_metadata_from_field(field.as_ref())))
81+
}
82+
DataType::LargeList(field) => {
83+
DataType::LargeList(Arc::new(strip_metadata_from_field(field.as_ref())))
84+
}
85+
DataType::FixedSizeList(field, size) => DataType::FixedSizeList(
86+
Arc::new(strip_metadata_from_field(field.as_ref())),
87+
*size,
88+
),
89+
DataType::Map(field, sorted) => DataType::Map(
90+
Arc::new(strip_metadata_from_field(field.as_ref())),
91+
*sorted,
92+
),
93+
_ => data_type.clone(),
94+
}
4595
}
4696

4797
#[cfg(test)]

datafusion/datasource-avro/src/source.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::any::Any;
2121
use std::sync::Arc;
2222

2323
use arrow_avro::reader::{Reader, ReaderBuilder};
24-
use arrow_avro::schema::AvroSchema;
2524
use datafusion_common::error::Result;
2625
use datafusion_datasource::TableSchema;
2726
use datafusion_datasource::file::FileSource;
@@ -56,9 +55,6 @@ impl AvroSource {
5655

5756
fn open<R: std::io::BufRead>(&self, reader: R) -> Result<Reader<R>> {
5857
ReaderBuilder::new()
59-
.with_reader_schema(
60-
AvroSchema::try_from(self.table_schema.file_schema().as_ref()).unwrap(),
61-
)
6258
.with_batch_size(self.batch_size.expect("Batch size must set before open"))
6359
.with_projection(self.projection.file_indices.clone())
6460
.build(reader)

datafusion/sqllogictest/test_files/avro.slt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ CREATE EXTERNAL TABLE alltypes_plain (
3131
float_col FLOAT NOT NULL,
3232
double_col DOUBLE NOT NULL,
3333
date_string_col BYTEA NOT NULL,
34-
string_col VARCHAR NOT NULL,
34+
string_col BYTEA NOT NULL,
3535
timestamp_col TIMESTAMP NOT NULL,
3636
)
3737
STORED AS AVRO
@@ -48,7 +48,7 @@ CREATE EXTERNAL TABLE alltypes_plain_snappy (
4848
float_col FLOAT NOT NULL,
4949
double_col DOUBLE NOT NULL,
5050
date_string_col BYTEA NOT NULL,
51-
string_col VARCHAR NOT NULL,
51+
string_col BYTEA NOT NULL,
5252
timestamp_col TIMESTAMP NOT NULL,
5353
)
5454
STORED AS AVRO
@@ -65,7 +65,7 @@ CREATE EXTERNAL TABLE alltypes_plain_bzip2 (
6565
float_col FLOAT NOT NULL,
6666
double_col DOUBLE NOT NULL,
6767
date_string_col BYTEA NOT NULL,
68-
string_col VARCHAR NOT NULL,
68+
string_col BYTEA NOT NULL,
6969
timestamp_col TIMESTAMP NOT NULL,
7070
)
7171
STORED AS AVRO
@@ -82,7 +82,7 @@ CREATE EXTERNAL TABLE alltypes_plain_xz (
8282
float_col FLOAT NOT NULL,
8383
double_col DOUBLE NOT NULL,
8484
date_string_col BYTEA NOT NULL,
85-
string_col VARCHAR NOT NULL,
85+
string_col BYTEA NOT NULL,
8686
timestamp_col TIMESTAMP NOT NULL,
8787
)
8888
STORED AS AVRO
@@ -99,7 +99,7 @@ CREATE EXTERNAL TABLE alltypes_plain_zstandard (
9999
float_col FLOAT NOT NULL,
100100
double_col DOUBLE NOT NULL,
101101
date_string_col BYTEA NOT NULL,
102-
string_col VARCHAR NOT NULL,
102+
string_col BYTEA NOT NULL,
103103
timestamp_col TIMESTAMP NOT NULL,
104104
)
105105
STORED AS AVRO
@@ -260,7 +260,7 @@ physical_plan
260260

261261
# test column projection order from avro file
262262
query ITII
263-
SELECT id, string_col, int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5
263+
SELECT id, CAST(string_col AS varchar), int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5
264264
----
265265
0 0 0 0
266266
1 1 1 10

0 commit comments

Comments
 (0)