From 31b5186ba1b9ad22b44ee92b3ae56c95b709f6e4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 12 Jun 2026 11:05:57 -0700 Subject: [PATCH] fix: fill schema-added nested columns with typed NULL arrays on read When a column of a nested type (list, map, or a struct with nested children) is added to the table schema after data files were written, reading those older files failed with "unexpected target column type": the helpers that materialize missing columns only handled primitive types plus structs with primitive-only children, via hand-written per-type NULL branches. Build the all-NULL column with arrow's new_null_array instead, which supports every Arrow type (including arbitrarily nested ones), and drop the per-type NULL branches from create_primitive_array_repeated and create_primitive_array_single_element. Closes #2618 --- .../src/arrow/record_batch_transformer.rs | 124 ++++++++++- crates/iceberg/src/arrow/value.rs | 192 ++---------------- 2 files changed, 135 insertions(+), 181 deletions(-) diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 439358435c..8a83259b7d 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -673,7 +673,9 @@ mod test { use crate::arrow::record_batch_transformer::{ RecordBatchTransformer, RecordBatchTransformerBuilder, }; - use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; + use crate::spec::{ + ListType, Literal, MapType, NestedField, PrimitiveType, Schema, Struct, Type, + }; /// Helper to extract string values from either StringArray or RunEndEncoded /// Returns empty string for null values @@ -921,6 +923,126 @@ mod test { assert!(struct_column.is_null(2)); } + #[test] + fn schema_evolution_adds_list_map_and_nested_struct_columns_with_nulls() { + // Reproduces https://github.com/apache/iceberg-rust/issues/2618. + // Columns added by schema evolution can be of any nested type (list, map, + // or a struct that itself contains nested children). Old data files lack + // these columns, so the transformer must fill them with typed all-NULL + // arrays instead of erroring with "unexpected target column type". + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional( + 2, + "xs", + Type::List(ListType { + element_field: NestedField::list_element( + 3, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 4, + "props", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 5, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 6, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 7, + "s", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::optional(8, "a", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional( + 9, + "ys", + Type::List(ListType { + element_field: NestedField::list_element( + 10, + Type::Primitive(PrimitiveType::Long), + false, + ) + .into(), + }), + ) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + let projected_iceberg_field_ids = [1, 2, 4, 7]; + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); + + // The old data file only contains `id`. + let file_schema = Arc::new(ArrowSchema::new(vec![simple_field( + "id", + DataType::Int32, + false, + "1", + )])); + let file_batch = + RecordBatch::try_new(file_schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]) + .unwrap(); + + let result = transformer.process_record_batch(file_batch).unwrap(); + + assert_eq!(result.num_columns(), 4); + assert_eq!(result.num_rows(), 3); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.values(), &[1, 2, 3]); + + // The added columns must use the evolved schema's Arrow types and be all-NULL. + assert!(matches!( + result.schema().field(1).data_type(), + DataType::List(_) + )); + assert!(matches!( + result.schema().field(2).data_type(), + DataType::Map(_, _) + )); + assert!(matches!( + result.schema().field(3).data_type(), + DataType::Struct(_) + )); + for (idx, name) in [(1, "xs"), (2, "props"), (3, "s")] { + assert_eq!( + result.column(idx).null_count(), + 3, + "added nested column `{name}` should be all-NULL" + ); + } + } + pub fn source_record_batch() -> RecordBatch { RecordBatch::try_new( arrow_schema_promotion_addition_and_renaming_required(), diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..5844d23eb8 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -23,7 +23,6 @@ use arrow_array::{ LargeListArray, LargeStringArray, ListArray, MapArray, StringArray, StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; -use arrow_buffer::NullBuffer; use arrow_schema::{DataType, FieldRef, TimeUnit}; use uuid::Uuid; @@ -628,23 +627,25 @@ pub(crate) fn create_primitive_array_single_element( data_type: &DataType, prim_lit: &Option, ) -> Result { + // With no value, the single element is NULL. `new_null_array` supports every + // Arrow type, including nested ones (list/map/struct), which matters for + // columns added by schema evolution after a data file was written (#2618). + if prim_lit.is_none() { + return Ok(arrow_array::new_null_array(data_type, 1)); + } match (data_type, prim_lit) { (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { Ok(Arc::new(BooleanArray::from(vec![*v]))) } - (DataType::Boolean, None) => Ok(Arc::new(BooleanArray::from(vec![Option::::None]))), (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { Ok(Arc::new(Int32Array::from(vec![*v]))) } - (DataType::Int32, None) => Ok(Arc::new(Int32Array::from(vec![Option::::None]))), (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { Ok(Arc::new(Date32Array::from(vec![*v]))) } - (DataType::Date32, None) => Ok(Arc::new(Date32Array::from(vec![Option::::None]))), (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { Ok(Arc::new(Int64Array::from(vec![*v]))) } - (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::::None]))), (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => { let array = TimestampMicrosecondArray::from(vec![*v]); if let Some(timezone) = timezone { @@ -653,14 +654,6 @@ pub(crate) fn create_primitive_array_single_element( Ok(Arc::new(array)) } } - (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => { let array = TimestampNanosecondArray::from(vec![*v]); if let Some(timezone) = timezone { @@ -669,32 +662,18 @@ pub(crate) fn create_primitive_array_single_element( Ok(Arc::new(array)) } } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone()))) - } else { - Ok(Arc::new(array)) - } - } (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { Ok(Arc::new(Float32Array::from(vec![v.0]))) } - (DataType::Float32, None) => Ok(Arc::new(Float32Array::from(vec![Option::::None]))), (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { Ok(Arc::new(Float64Array::from(vec![v.0]))) } - (DataType::Float64, None) => Ok(Arc::new(Float64Array::from(vec![Option::::None]))), (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { Ok(Arc::new(StringArray::from(vec![v.as_str()]))) } - (DataType::Utf8, None) => Ok(Arc::new(StringArray::from(vec![Option::<&str>::None]))), (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { Ok(Arc::new(BinaryArray::from_vec(vec![v.as_slice()]))) } - (DataType::Binary, None) => Ok(Arc::new(BinaryArray::from_opt_vec(vec![ - Option::<&[u8]>::None, - ]))), (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(v))) => { let array = Decimal128Array::from(vec![{ *v }]) .with_precision_and_scale(*precision, *scale) @@ -721,81 +700,6 @@ pub(crate) fn create_primitive_array_single_element( })?; Ok(Arc::new(array)) } - (DataType::Decimal128(precision, scale), None) => { - let array = Decimal128Array::from(vec![Option::::None]) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?; - Ok(Arc::new(array)) - } - (DataType::Struct(fields), None) => { - // Create a single-element StructArray with nulls - let null_arrays: Vec = fields - .iter() - .map(|f| { - // Recursively create null arrays for struct fields - // For primitive fields in structs, use simple null arrays (not REE within struct) - match f.data_type() { - DataType::Boolean => { - Ok(Arc::new(BooleanArray::from(vec![Option::::None])) - as ArrayRef) - } - DataType::Int32 | DataType::Date32 => { - Ok(Arc::new(Int32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Int64 => { - Ok(Arc::new(Int64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let array = TimestampMicrosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Timestamp(TimeUnit::Nanosecond, timezone) => { - let array = TimestampNanosecondArray::from(vec![Option::::None]); - if let Some(timezone) = timezone { - Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) - } else { - Ok(Arc::new(array) as ArrayRef) - } - } - DataType::Float32 => { - Ok(Arc::new(Float32Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Float64 => { - Ok(Arc::new(Float64Array::from(vec![Option::::None])) as ArrayRef) - } - DataType::Utf8 => { - Ok(Arc::new(StringArray::from(vec![Option::<&str>::None])) as ArrayRef) - } - DataType::Binary => { - Ok( - Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) - as ArrayRef, - ) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - format!("Unsupported struct field type: {:?}", f.data_type()), - )), - } - }) - .collect::>>()?; - Ok(Arc::new(arrow_array::StructArray::new( - fields.clone(), - null_arrays, - Some(arrow_buffer::NullBuffer::new_null(1)), - ))) - } _ => Err(Error::new( ErrorKind::Unexpected, format!("Unsupported constant type combination: {data_type:?} with {prim_lit:?}"), @@ -812,35 +716,25 @@ pub(crate) fn create_primitive_array_repeated( prim_lit: &Option, num_rows: usize, ) -> Result { + // With no value to repeat, the column is all-NULL. `new_null_array` supports + // every Arrow type, including nested ones (list/map/struct), which matters for + // columns added by schema evolution after a data file was written (#2618). + if prim_lit.is_none() { + return Ok(arrow_array::new_null_array(data_type, num_rows)); + } Ok(match (data_type, prim_lit) { (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { Arc::new(BooleanArray::from(vec![*value; num_rows])) } - (DataType::Boolean, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BooleanArray::from(vals)) - } (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { Arc::new(Int32Array::from(vec![*value; num_rows])) } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) - } (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { Arc::new(Date32Array::from(vec![*value; num_rows])) } - (DataType::Date32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Date32Array::from(vals)) - } (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { Arc::new(Int64Array::from(vec![*value; num_rows])) } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) - } ( DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(value)), @@ -852,15 +746,6 @@ pub(crate) fn create_primitive_array_repeated( Arc::new(array) } } - (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { - let vals: Vec> = vec![None; num_rows]; - let array = TimestampMicrosecondArray::from(vals); - if let Some(timezone) = timezone { - Arc::new(array.with_timezone(timezone.clone())) - } else { - Arc::new(array) - } - } ( DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(value)), @@ -872,43 +757,18 @@ pub(crate) fn create_primitive_array_repeated( Arc::new(array) } } - (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { - let vals: Vec> = vec![None; num_rows]; - let array = TimestampNanosecondArray::from(vals); - if let Some(timezone) = timezone { - Arc::new(array.with_timezone(timezone.clone())) - } else { - Arc::new(array) - } - } (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { Arc::new(Float32Array::from(vec![value.0; num_rows])) } - (DataType::Float32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float32Array::from(vals)) - } (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { Arc::new(Float64Array::from(vec![value.0; num_rows])) } - (DataType::Float64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float64Array::from(vals)) - } (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { Arc::new(StringArray::from(vec![value.clone(); num_rows])) } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) - } (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { Arc::new(BinaryArray::from_vec(vec![value; num_rows])) } - (DataType::Binary, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BinaryArray::from_opt_vec(vals)) - } (DataType::Decimal128(precision, scale), Some(PrimitiveLiteral::Int128(value))) => { Arc::new( Decimal128Array::from(vec![*value; num_rows]) @@ -937,34 +797,6 @@ pub(crate) fn create_primitive_array_repeated( })?, ) } - (DataType::Decimal128(precision, scale), None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new( - Decimal128Array::from(vals) - .with_precision_and_scale(*precision, *scale) - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Failed to create Decimal128Array with precision {precision} and scale {scale}: {e}" - ), - ) - })?, - ) - } - (DataType::Struct(fields), None) => { - // Create a StructArray filled with nulls - let null_arrays: Vec = fields - .iter() - .map(|field| create_primitive_array_repeated(field.data_type(), &None, num_rows)) - .collect::>>()?; - - Arc::new(StructArray::new( - fields.clone(), - null_arrays, - Some(NullBuffer::new_null(num_rows)), - )) - } (DataType::Null, _) => Arc::new(arrow_array::NullArray::new(num_rows)), (dt, _) => { return Err(Error::new(