Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 123 additions & 1 deletion crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,9 @@ mod test {
use crate::arrow::record_batch_transformer::{
RecordBatchTransformer, RecordBatchTransformerBuilder,
};
use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type};
use crate::spec::{
ListType, Literal, MapType, NestedField, PrimitiveType, Schema, Struct, Type,
};

/// Helper to extract string values from either StringArray or RunEndEncoded<StringArray>
/// Returns empty string for null values
Expand Down Expand Up @@ -921,6 +923,126 @@ mod test {
assert!(struct_column.is_null(2));
}

#[test]
fn schema_evolution_adds_list_map_and_nested_struct_columns_with_nulls() {
    // Regression test for https://github.com/apache/iceberg-rust/issues/2618.
    //
    // Schema evolution may add columns of any nested type: a list, a map, or a
    // struct whose children are themselves nested. Data files written before the
    // evolution do not contain those columns, so the transformer is expected to
    // materialise them as typed, all-NULL arrays rather than fail with
    // "unexpected target column type".

    // `xs` (field 2): a list of ints, element field id 3.
    let xs_type = Type::List(ListType {
        element_field: NestedField::list_element(3, Type::Primitive(PrimitiveType::Int), false)
            .into(),
    });

    // `props` (field 4): a map from string (key id 5) to int (value id 6).
    let props_type = Type::Map(MapType {
        key_field: NestedField::map_key_element(5, Type::Primitive(PrimitiveType::String)).into(),
        value_field: NestedField::map_value_element(
            6,
            Type::Primitive(PrimitiveType::Int),
            false,
        )
        .into(),
    });

    // `s` (field 7): a struct holding a string `a` (id 8) and a list of longs
    // `ys` (id 9, element id 10), i.e. a nested type inside a nested type.
    let ys_type = Type::List(ListType {
        element_field: NestedField::list_element(10, Type::Primitive(PrimitiveType::Long), false)
            .into(),
    });
    let s_type = Type::Struct(crate::spec::StructType::new(vec![
        NestedField::optional(8, "a", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::optional(9, "ys", ys_type).into(),
    ]));

    // The evolved table schema: the original `id` plus the three added columns.
    let evolved_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(2, "xs", xs_type).into(),
            NestedField::optional(4, "props", props_type).into(),
            NestedField::optional(7, "s", s_type).into(),
        ])
        .build()
        .unwrap();

    // Project every top-level column of the evolved schema.
    let projection = [1, 2, 4, 7];
    let mut transformer =
        RecordBatchTransformerBuilder::new(Arc::new(evolved_schema), &projection).build();

    // A batch as read from an old data file: three rows, `id` column only.
    let id_only_field = simple_field("id", DataType::Int32, false, "1");
    let old_file_schema = Arc::new(ArrowSchema::new(vec![id_only_field]));
    let old_file_batch = RecordBatch::try_new(old_file_schema, vec![Arc::new(Int32Array::from(
        vec![1, 2, 3],
    ))])
    .unwrap();

    let result = transformer.process_record_batch(old_file_batch).unwrap();

    // Shape: all four projected columns, same row count as the input.
    assert_eq!(result.num_columns(), 4);
    assert_eq!(result.num_rows(), 3);

    // The pre-existing column passes through with its values intact.
    let ids = result.column(0).as_any().downcast_ref::<Int32Array>();
    assert_eq!(ids.unwrap().values(), &[1, 2, 3]);

    // Each added column must carry the Arrow type of the evolved schema...
    let out_schema = result.schema();
    assert!(matches!(out_schema.field(1).data_type(), DataType::List(_)));
    assert!(matches!(
        out_schema.field(2).data_type(),
        DataType::Map(_, _)
    ));
    assert!(matches!(
        out_schema.field(3).data_type(),
        DataType::Struct(_)
    ));

    // ...and be NULL in every one of the three rows.
    for (idx, name) in (1..).zip(["xs", "props", "s"]) {
        let added_column = result.column(idx);
        assert_eq!(
            added_column.null_count(),
            3,
            "added nested column `{name}` should be all-NULL"
        );
    }
}

pub fn source_record_batch() -> RecordBatch {
RecordBatch::try_new(
arrow_schema_promotion_addition_and_renaming_required(),
Expand Down
Loading
Loading