diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index 5ac565f61..2d819387d 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -444,6 +444,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { @@ -494,6 +499,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index, } if (array.IsNull(index)) { + if (datum->type() == ::avro::AVRO_NULL) { + return {}; + } if (!datum->isUnion()) [[unlikely]] { return InvalidSchema("Cannot extract null to non-union type: {}", ::avro::toString(datum->type())); diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc index 335b6064e..5bd239488 100644 --- a/src/iceberg/avro/avro_direct_decoder.cc +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -574,6 +574,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder)); + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { const size_t branch_index = decoder.decodeUnionIndex(); diff --git a/src/iceberg/avro/avro_direct_encoder.cc b/src/iceberg/avro/avro_direct_encoder.cc index caab7f699..045535077 100644 --- a/src/iceberg/avro/avro_direct_encoder.cc +++ b/src/iceberg/avro/avro_direct_encoder.cc @@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx); } - if (is_null) { + if (is_null && avro_node->type() != ::avro::AVRO_NULL) { return InvalidArgument("Null value in non-nullable field"); } diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 75db6d8d9..98a5774fe 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -219,6 +219,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) { return {}; } +Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) { + *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL); + return {}; +} + Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) { *node = std::make_shared<::avro::NodeRecord>(); @@ -320,7 +325,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node) field_ids_.push(field.field_id()); ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node)); - if (field.optional()) { + if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) { ::avro::MultiLeaves union_types; union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL)); union_types.add(std::move(*node)); @@ -365,8 +370,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) { case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: - return {}; case ::avro::AVRO_NULL: + return {}; case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type: {}", static_cast(node->type())); @@ -494,6 +499,10 @@ Result GetFieldId(const ::avro::NodePtr& node, size_t field_idx) { Status ValidateAvroSchemaEvolution(const Type& expected_type, const ::avro::NodePtr& avro_node) { + if (avro_node->type() == ::avro::AVRO_NULL) { + return {}; + } + switch (expected_type.type_id()) { case TypeId::kBoolean: if (avro_node->type() == ::avro::AVRO_BOOL) { @@ -583,6 +592,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type, return {}; } break; + case TypeId::kUnknown: + return {}; default: break; } @@ -618,6 +629,35 @@ Result ProjectNested(const Type& expected_type, const ::avro::NodePtr& avro_node, bool prune_source); +Result ProjectField(const SchemaField& expected_field, + const ::avro::NodePtr& avro_node, + size_t source_index, bool prune_source) { + const Type& expected_type = *expected_field.type(); + ::avro::NodePtr field_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node)); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || + field_node->type() == ::avro::AVRO_NULL) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with ID: {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, field_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node)); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct(const StructType& struct_type, const ::avro::NodePtr& avro_node, bool prune_source) { @@ -653,18 +693,9 @@ Result ProjectStruct(const StructType& struct_type, FieldProjection child_projection; if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) { - ::avro::NodePtr field_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node)); - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - child_projection, - ProjectNested(*expected_field.type(), field_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_field.type(), field_node)); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, iter->second.field_node, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { @@ -701,20 +732,9 @@ Result ProjectList(const ListType& list_type, } FieldProjection element_projection; - ::avro::NodePtr element_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node)); - if (expected_element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - element_projection, - ProjectNested(*expected_element_field.type(), element_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node)); - } - - // Set the element projection metadata but preserve its children - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(element_projection, + ProjectField(expected_element_field, avro_node->leafAt(0), + size_t{0}, prune_source)); FieldProjection result; result.children.emplace_back(std::move(element_projection)); @@ -770,18 +790,10 @@ Result ProjectMap(const MapType& map_type, for (size_t i = 0; i < map_node->leaves(); ++i) { FieldProjection sub_projection; - ::avro::NodePtr sub_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node)); const auto& expected_sub_field = map_type.fields()[i]; - if (expected_sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(), - sub_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node)); - } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; + ICEBERG_ASSIGN_OR_RAISE( + sub_projection, + ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source)); result.children.emplace_back(std::move(sub_projection)); } @@ -1017,9 +1029,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: + case ::avro::AVRO_NULL: // For primitive types, just return a copy return original_node; - case ::avro::AVRO_NULL: case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type for field ID application: {}", diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index bdfbf135a..292f5665a 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -56,6 +56,7 @@ class ToAvroNodeVisitor { Status Visit(const UuidType& type, ::avro::NodePtr* node); Status Visit(const FixedType& type, ::avro::NodePtr* node); Status Visit(const BinaryType& type, ::avro::NodePtr* node); + Status Visit(const UnknownType&, ::avro::NodePtr*); Status Visit(const StructType& type, ::avro::NodePtr* node); Status Visit(const ListType& type, ::avro::NodePtr* node); Status Visit(const MapType& type, ::avro::NodePtr* node); diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 2d8c22255..7f8a442b3 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -373,6 +373,8 @@ nlohmann::json ToJson(const Type& type) { } case TypeId::kUuid: return "uuid"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); } @@ -437,12 +439,22 @@ Result> StructTypeFromJson(const nlohmann::json& json) { return std::make_unique(std::move(fields)); } +Status ValidateUnknownFieldOptional(const Type& type, bool optional, + std::string_view field_name) { + if (type.type_id() == TypeId::kUnknown && !optional) { + return JsonParseError("Unknown type field '{}' must be optional", field_name); + } + return {}; +} + Result> ListTypeFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement])); ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue(json, kElementId)); ICEBERG_ASSIGN_OR_RAISE(auto element_required, GetJsonValue(json, kElementRequired)); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required, + ListType::kElementName)); return std::make_unique( SchemaField(element_id, std::string(ListType::kElementName), std::move(element_type), !element_required)); @@ -458,6 +470,10 @@ Result> MapTypeFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue(json, kValueId)); ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue(json, kValueRequired)); + ICEBERG_RETURN_UNEXPECTED( + ValidateUnknownFieldOptional(*key_type, /*optional=*/false, MapType::kKeyName)); + ICEBERG_RETURN_UNEXPECTED( + ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName)); SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type), /*optional=*/false); SchemaField value_field(value_id, std::string(MapType::kValueName), @@ -494,6 +510,8 @@ Result> TypeFromJson(const nlohmann::json& json) { return std::make_unique(); } else if (type_str == "uuid") { return std::make_unique(); + } else if (type_str == "unknown") { + return std::make_unique(); } else if (type_str.starts_with("fixed")) { std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])"); std::smatch match; @@ -540,6 +558,7 @@ Result> FieldFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue(json, kRequired)); ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault(json, kDoc)); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name)); return std::make_unique(field_id, std::move(name), std::move(type), !required, doc); } diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 43efd1cbd..0265f3327 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -166,7 +166,14 @@ Result> ProjectListArrayImpl( const auto& output_element_type = output_list_type->value_type(); std::shared_ptr<::arrow::Array> projected_values; - if (element_field.type()->is_nested()) { + if (element_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_values, + MakeNullArray(output_element_type, list_array->values()->length(), pool)); + } else if (element_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported list element projection kind: {}", + ToString(element_projection.kind)); + } else if (element_field.type()->is_nested()) { const auto& nested_type = internal::checked_cast(*element_field.type()); ICEBERG_ASSIGN_OR_RAISE( @@ -219,7 +226,14 @@ Result> ProjectMapArray( // Project keys std::shared_ptr<::arrow::Array> projected_keys; - if (key_type->is_nested()) { + if (key_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_keys, + MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool)); + } else if (key_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map key projection kind: {}", + ToString(key_projection.kind)); + } else if (key_type->is_nested()) { const auto& nested_type = internal::checked_cast(*key_type); ICEBERG_ASSIGN_OR_RAISE( projected_keys, @@ -233,7 +247,14 @@ Result> ProjectMapArray( // Project values std::shared_ptr<::arrow::Array> projected_items; - if (value_type->is_nested()) { + if (value_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_items, + MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool)); + } else if (value_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map value projection kind: {}", + ToString(value_projection.kind)); + } else if (value_type->is_nested()) { const auto& nested_type = internal::checked_cast(*value_type); ICEBERG_ASSIGN_OR_RAISE( projected_items, diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index e9574a48c..55e772dd5 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -59,10 +59,59 @@ std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_f return FieldIdFromMetadata(parquet_field.field->metadata()); } -// TODO(gangwu): support v3 unknown type +bool IsNullPhysicalField(const ::parquet::arrow::SchemaField& parquet_field) { + return parquet_field.field->type()->id() == ::arrow::Type::NA; +} + +bool HasSelectedColumn(const FieldProjection& projection) { + if (projection.attributes) { + const auto& attributes = + internal::checked_cast(*projection.attributes); + if (attributes.column_id) { + return true; + } + } + return std::ranges::any_of(projection.children, HasSelectedColumn); +} + +std::optional FirstColumnIndex( + const ::parquet::arrow::SchemaField& parquet_field) { + if (parquet_field.column_index >= 0) { + return parquet_field.column_index; + } + for (const auto& child : parquet_field.children) { + if (auto column_index = FirstColumnIndex(child)) { + return column_index; + } + } + return std::nullopt; +} + +void SelectShapeColumnIfNeeded( + FieldProjection* projection, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (HasSelectedColumn(*projection)) { + return; + } + for (const auto& parquet_field : parquet_fields) { + if (auto column_index = FirstColumnIndex(parquet_field)) { + projection->attributes = + std::make_shared(column_index.value()); + return; + } + } +} + +} // namespace + Status ValidateParquetSchemaEvolution( const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { const auto& arrow_type = parquet_field.field->type(); + // Iceberg v3 `unknown` values are represented as nulls in physical files. + // Allow reading them as any projected field type. + if (arrow_type->id() == ::arrow::Type::NA) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kBoolean: if (arrow_type->id() == ::arrow::Type::BOOL) { @@ -166,6 +215,8 @@ Status ValidateParquetSchemaEvolution( } } break; + case TypeId::kUnknown: + return {}; case TypeId::kStruct: if (arrow_type->id() == ::arrow::Type::STRUCT) { return {}; @@ -189,11 +240,41 @@ Status ValidateParquetSchemaEvolution( expected_type, arrow_type->ToString()); } +namespace { + // Forward declaration Result ProjectNested( const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields); +Result ProjectField(const SchemaField& expected_field, + const ::parquet::arrow::SchemaField& parquet_field, + size_t source_index) { + const Type& expected_type = *expected_field.type(); + ICEBERG_RETURN_UNEXPECTED(ValidateParquetSchemaEvolution(expected_type, parquet_field)); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || IsNullPhysicalField(parquet_field)) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, parquet_field.children)); + } else { + projection.attributes = + std::make_shared(parquet_field.column_index); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct( const StructType& struct_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { @@ -228,17 +309,8 @@ Result ProjectStruct( if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { const auto& parquet_field = iter->second.parquet_field; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*field.type(), parquet_field)); - if (field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*field.type(), parquet_field.children)); - } else { - child_projection.attributes = - std::make_shared(parquet_field.column_index); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE( + child_projection, ProjectField(field, parquet_field, iter->second.local_index)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (field.optional()) { @@ -250,6 +322,7 @@ Result ProjectStruct( result.children.emplace_back(std::move(child_projection)); } + SelectShapeColumnIfNeeded(&result, parquet_fields); PruneFieldProjection(result); return result; } @@ -274,23 +347,12 @@ Result ProjectList( element_field.field_id(), element_field_id.value()); } - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); - - FieldProjection element_projection; - if (element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(element_projection, - ProjectNested(*element_field.type(), parquet_field.children)); - } else { - element_projection.attributes = - std::make_shared(parquet_field.column_index); - } - - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(auto element_projection, + ProjectField(element_field, parquet_field, size_t{0})); FieldProjection result; result.children.emplace_back(std::move(element_projection)); + SelectShapeColumnIfNeeded(&result, parquet_fields); return result; } @@ -326,23 +388,20 @@ Result ProjectMap( result.children.reserve(2); for (size_t i = 0; i < parquet_fields.size(); ++i) { - FieldProjection sub_projection; const auto& sub_node = parquet_fields[i]; const auto& sub_field = map_type.fields()[i]; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); - if (sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, - ProjectNested(*sub_field.type(), sub_node.children)); - } else { - sub_projection.attributes = - std::make_shared(sub_node.column_index); + ICEBERG_ASSIGN_OR_RAISE(auto sub_projection, ProjectField(sub_field, sub_node, i)); + if (sub_projection.kind == FieldProjection::Kind::kNull && + !HasSelectedColumn(sub_projection)) { + if (auto column_index = FirstColumnIndex(sub_node)) { + sub_projection.attributes = + std::make_shared(column_index.value()); + } } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; result.children.emplace_back(std::move(sub_projection)); } + SelectShapeColumnIfNeeded(&result, parquet_fields); return result; } diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 8e06b0bcf..567069291 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -62,4 +62,8 @@ std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \return True if the Parquet schema has field IDs, false otherwise. bool HasFieldIds(const ::parquet::schema::NodePtr& root_node); +/// \brief Validate whether a projected Iceberg type is compatible with a Parquet field. +Status ValidateParquetSchemaEvolution(const Type& expected_type, + const ::parquet::arrow::SchemaField& parquet_field); + } // namespace iceberg::parquet diff --git a/src/iceberg/row/arrow_array_wrapper.cc b/src/iceberg/row/arrow_array_wrapper.cc index e97293bcd..0d6d48c15 100644 --- a/src/iceberg/row/arrow_array_wrapper.cc +++ b/src/iceberg/row/arrow_array_wrapper.cc @@ -44,6 +44,8 @@ Result ExtractValue(const ArrowSchema* schema, const ArrowArray* array, } switch (array_view->storage_type) { + case NANOARROW_TYPE_NA: + return std::monostate{}; case NANOARROW_TYPE_BOOL: return static_cast(ArrowArrayViewGetIntUnsafe(array_view, index)); case NANOARROW_TYPE_INT32: diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index dedb603e2..91ee334d6 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -141,6 +141,9 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), ArrowCharView(kArrowUuidExtensionName))); } break; + case TypeId::kUnknown: + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_NA)); + break; } if (!name.empty()) { @@ -208,6 +211,9 @@ Result> FromArrowSchema(const ArrowSchema& schema) { auto field_id = GetFieldId(schema); bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0; + if (field_type->type_id() == TypeId::kUnknown && !is_optional) { + return InvalidSchema("Arrow null field '{}' must be nullable", schema.name); + } return std::make_unique(field_id, schema.name, std::move(field_type), is_optional); }; @@ -295,6 +301,8 @@ Result> FromArrowSchema(const ArrowSchema& schema) { } return iceberg::fixed(schema_view.fixed_size); } + case NANOARROW_TYPE_NA: + return iceberg::unknown(); default: return InvalidSchema("Unsupported Arrow type: {}", ArrowTypeString(schema_view.type)); diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 4acdab631..7cf3677b3 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -49,6 +49,9 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ if (expected_type == source_type) { return {}; } + if (source_type.type_id() == TypeId::kUnknown && expected_type.is_primitive()) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kLong: { @@ -79,6 +82,55 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ return NotSupported("Cannot read {} from {}", expected_type, source_type); } +Result ProjectNested(const Type& expected_type, const Type& source_type, + bool prune_source); + +Result ProjectField(const SchemaField& expected_field, + const SchemaField& source_field, size_t source_index, + bool prune_source) { + FieldProjection projection; + + if (expected_field.type()->type_id() == TypeId::kUnknown) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (source_field.type()->type_id() == TypeId::kUnknown && + expected_field.type()->is_nested()) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (source_field.type()->type_id() == TypeId::kUnknown && !expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + projection, + ProjectNested(*expected_field.type(), *source_field.type(), prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateSchemaEvolution(*expected_field.type(), *source_field.type())); + } + + // If `prune_source` is false, all fields will be read so the local index is exactly + // the position to read data. Otherwise, the local index is computed by pruning all + // non-projected fields. + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectNested(const Type& expected_type, const Type& source_type, bool prune_source) { if (!expected_type.is_nested()) { @@ -120,19 +172,9 @@ Result ProjectNested(const Type& expected_type, const Type& sou FieldProjection child_projection; if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) { - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*expected_field.type(), - *iter->second.field->type(), prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type())); - } - // If `prune_source` is false, all fields will be read so the local index - // is exactly the position to read data. Otherwise, the local index is computed - // by pruning all non-projected fields - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, *iter->second.field, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index 12039280e..7eb9299fd 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -101,7 +101,9 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::uuid(), .arrow_type = ::arrow::extension::uuid()}, ToArrowSchemaParam{.iceberg_type = iceberg::fixed(20), - .arrow_type = ::arrow::fixed_size_binary(20)})); + .arrow_type = ::arrow::fixed_size_binary(20)}, + ToArrowSchemaParam{.iceberg_type = iceberg::unknown(), + .arrow_type = ::arrow::null()})); namespace { @@ -227,6 +229,81 @@ TEST(ToArrowSchemaTest, MapType) { /*nullable=*/true, kValueFieldId)); } +TEST(ToArrowSchemaTest, NestedUnknownFieldsRoundTrip) { + Schema schema( + { + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", + iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", + iceberg::unknown()))), + }, + /*schema_id=*/0); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(schema, &arrow_c_schema), IsOk()); + + auto imported_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + ASSERT_EQ(imported_schema->num_fields(), 3); + + auto profile_type = + std::static_pointer_cast<::arrow::StructType>(imported_schema->field(0)->type()); + ASSERT_EQ(profile_type->num_fields(), 1); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*profile_type->field(0), ::arrow::Type::NA, + "mystery", /*nullable=*/true, + /*field_id=*/2)); + + auto mysteries_type = + std::static_pointer_cast<::arrow::ListType>(imported_schema->field(1)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*mysteries_type->value_field(), + ::arrow::Type::NA, "element", + /*nullable=*/true, /*field_id=*/4)); + + auto properties_type = + std::static_pointer_cast<::arrow::MapType>(imported_schema->field(2)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->key_field(), + ::arrow::Type::STRING, "key", + /*nullable=*/false, /*field_id=*/6)); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->item_field(), + ::arrow::Type::NA, "value", + /*nullable=*/true, /*field_id=*/7)); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*imported_schema, &exported_schema).ok()); + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + ArrowSchemaRelease(&exported_schema); + + const auto& round_tripped_schema = *schema_result.value(); + ASSERT_EQ(round_tripped_schema.fields().size(), 3); + + const auto* profile = + dynamic_cast(round_tripped_schema.fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* mysteries = + dynamic_cast(round_tripped_schema.fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* properties = + dynamic_cast(round_tripped_schema.fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); +} + struct FromArrowSchemaParam { std::shared_ptr arrow_type; bool optional = true; @@ -296,7 +373,51 @@ INSTANTIATE_TEST_SUITE_P( FromArrowSchemaParam{.arrow_type = ::arrow::extension::uuid(), .iceberg_type = iceberg::uuid()}, FromArrowSchemaParam{.arrow_type = ::arrow::fixed_size_binary(20), - .iceberg_type = iceberg::fixed(20)})); + .iceberg_type = iceberg::fixed(20)}, + FromArrowSchemaParam{.arrow_type = ::arrow::null(), + .iceberg_type = iceberg::unknown()})); + +TEST(FromArrowSchemaTest, RejectRequiredNullFieldAsUnknown) { + auto metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto arrow_schema = ::arrow::schema({::arrow::field( + "mystery", ::arrow::null(), /*nullable=*/false, std::move(metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'mystery' must be nullable")); +} + +TEST(FromArrowSchemaTest, RejectRequiredNullListElementAsUnknown) { + auto list_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto element_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "2"}}); + auto element_field = ::arrow::field("element", ::arrow::null(), /*nullable=*/false, + std::move(element_metadata)); + auto arrow_schema = + ::arrow::schema({::arrow::field("mysteries", ::arrow::list(element_field), + /*nullable=*/true, std::move(list_metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'element' must be nullable")); +} TEST(FromArrowSchemaTest, StructType) { constexpr int32_t kStructFieldId = 1; diff --git a/src/iceberg/test/avro_data_test.cc b/src/iceberg/test/avro_data_test.cc index c0e42f67b..63f232d8e 100644 --- a/src/iceberg/test/avro_data_test.cc +++ b/src/iceberg/test/avro_data_test.cc @@ -1193,6 +1193,27 @@ TEST(ExtractDatumFromArrayTest, NullHandling) { EXPECT_EQ(record2.fieldAt(0).type(), ::avro::AVRO_NULL); } +TEST(ExtractDatumFromArrayTest, UnknownType) { + Schema iceberg_schema({SchemaField::MakeOptional(1, "a", unknown())}); + ::avro::NodePtr avro_node; + ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); + + auto arrow_array = + ::arrow::json::ArrayFromJSONString(arrow_struct_type, R"([{"a": null}])") + .ValueOrDie(); + + ::avro::GenericDatum datum(avro_node); + ASSERT_THAT(ExtractDatumFromArray(*arrow_array, 0, &datum), IsOk()); + + const auto& record = datum.value<::avro::GenericRecord>(); + EXPECT_EQ(record.fieldAt(0).type(), ::avro::AVRO_NULL); +} + struct RoundTripParam { std::string name; std::shared_ptr iceberg_schema; diff --git a/src/iceberg/test/avro_schema_test.cc b/src/iceberg/test/avro_schema_test.cc index 2c1ee8a96..41290e70d 100644 --- a/src/iceberg/test/avro_schema_test.cc +++ b/src/iceberg/test/avro_schema_test.cc @@ -230,6 +230,12 @@ TEST(ToAvroNodeVisitorTest, BinaryType) { EXPECT_EQ(node->type(), ::avro::AVRO_BYTES); } +TEST(ToAvroNodeVisitorTest, UnknownType) { + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(UnknownType{}, &node), IsOk()); + EXPECT_EQ(node->type(), ::avro::AVRO_NULL); +} + TEST(ToAvroNodeVisitorTest, StructType) { StructType struct_type{{SchemaField{/*field_id=*/1, "bool_field", iceberg::boolean(), /*optional=*/false}, @@ -256,6 +262,70 @@ TEST(ToAvroNodeVisitorTest, StructType) { EXPECT_EQ(node->leafAt(1)->leafAt(1)->type(), ::avro::AVRO_INT); } +TEST(ToAvroNodeVisitorTest, OptionalUnknownField) { + StructType struct_type{{SchemaField{/*field_id=*/1, "mystery", iceberg::unknown(), + /*optional=*/true}}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 1); + EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_EQ(node->customAttributes(), 1); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/1)); +} + +TEST(ToAvroNodeVisitorTest, NestedUnknownFields) { + StructType struct_type{ + {SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", iceberg::unknown())))}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 3); + auto profile_union = node->leafAt(0); + ASSERT_EQ(profile_union->type(), ::avro::AVRO_UNION); + auto profile_node = profile_union->leafAt(1); + ASSERT_EQ(profile_node->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(profile_node->leaves(), 1); + EXPECT_EQ(profile_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(profile_node, /*index=*/0, /*field_id=*/2)); + + auto list_union = node->leafAt(1); + ASSERT_EQ(list_union->type(), ::avro::AVRO_UNION); + auto list_node = list_union->leafAt(1); + ASSERT_EQ(list_node->type(), ::avro::AVRO_ARRAY); + ASSERT_EQ(list_node->leaves(), 1); + EXPECT_EQ(list_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(list_node, /*index=*/0, /*field_id=*/4, + /*key=*/"element-id")); + + auto map_union = node->leafAt(2); + ASSERT_EQ(map_union->type(), ::avro::AVRO_UNION); + auto map_node = map_union->leafAt(1); + ASSERT_EQ(map_node->type(), ::avro::AVRO_MAP); + ASSERT_EQ(map_node->leaves(), 2); + EXPECT_EQ(map_node->leafAt(0)->type(), ::avro::AVRO_STRING); + EXPECT_EQ(map_node->leafAt(1)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/6, + /*key=*/"key-id")); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/7, + /*key=*/"value-id")); +} + TEST(ToAvroNodeVisitorTest, StructTypeWithFieldNames) { StructType struct_type{ {SchemaField{/*field_id=*/1, "user-name", iceberg::string(), @@ -460,6 +530,13 @@ TEST(HasIdVisitorTest, HasNoIds) { EXPECT_FALSE(visitor.AllHaveIds()); } +TEST(HasIdVisitorTest, NullType) { + HasIdVisitor visitor; + EXPECT_THAT(visitor.Visit(::avro::compileJsonSchemaFromString("\"null\"")), IsOk()); + EXPECT_TRUE(visitor.HasNoIds()); + EXPECT_FALSE(visitor.AllHaveIds()); +} + TEST(HasIdVisitorTest, RecordWithFieldIds) { const std::string schema_json = R"({ "type": "record", @@ -879,6 +956,146 @@ TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); } +TEST(AvroSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "mystery", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "profile", "type": ["null", { + "type": "record", + "name": "profile_record", + "fields": [ + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "mystery", "type": ["null", "int"], "field-id": 3} + ] + }], "field-id": 1}, + {"name": "mysteries", "type": ["null", { + "type": "array", + "items": ["null", "int"], + "element-id": 5 + }], "field-id": 4}, + {"name": "properties", "type": ["null", { + "type": "map", + "values": ["null", "int"], + "key-id": 7, + "value-id": 8 + }], "field-id": 6} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children[1].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_EQ(projection.fields[2].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, RejectNullLeafForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "value", "type": "null", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 1 as null")); +} + +TEST(AvroSchemaProjectionTest, RejectNullListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "numbers", "type": ["null", { + "type": "array", + "items": "null", + "element-id": 101 + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 101 as null")); +} + TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { // Create iceberg schema expecting an int Schema expected_schema({ diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index b74fe829b..90fb86151 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -890,6 +890,31 @@ TEST_P(AvroWriterTest, WriteOptionalFields) { VerifyWrittenData(test_data); } +TEST_P(AvroWriterTest, WriteNestedUnknownFields) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown())))}); + + std::string test_data = R"([ + [1, [null], [null, null], [["a", null], ["b", null]]], + [2, null, [], []], + [3, [null], null, null] + ])"; + + WriteAvroFile(schema, test_data); + VerifyWrittenData(test_data); +} + TEST_P(AvroWriterTest, WriteLargeDataset) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "id", std::make_shared()), diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index 9ed28114e..606ad8ca5 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -316,6 +316,50 @@ TEST(ProjectRecordBatchTest, MapStringToInt) { VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json)); } +TEST(ProjectRecordBatchTest, NestedUnknownFields) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + }); + + const std::string input_json = R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + const std::string expected_json = R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + TEST(ProjectRecordBatchTest, MapStringToStruct) { Schema iceberg_schema({ SchemaField::MakeRequired( diff --git a/src/iceberg/test/parquet_schema_test.cc b/src/iceberg/test/parquet_schema_test.cc index a9da3f9f7..40b06adbd 100644 --- a/src/iceberg/test/parquet_schema_test.cc +++ b/src/iceberg/test/parquet_schema_test.cc @@ -17,7 +17,12 @@ * under the License. */ +#include +#include +#include + #include +#include #include #include #include @@ -123,6 +128,30 @@ ::parquet::arrow::SchemaManifest MakeSchemaManifest( return manifest; } +::parquet::arrow::SchemaField MakeNullSchemaField(const std::string& name, int field_id) { + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::null()) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + return schema_field; +} + +::parquet::arrow::SchemaField MakeListSchemaFieldWithNullElement(const std::string& name, + int field_id, + int element_field_id) { + ::parquet::arrow::SchemaField element_field = + MakeNullSchemaField("element", element_field_id); + + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::list(element_field.field)) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + schema_field.children = {std::move(element_field)}; + return schema_field; +} + #define ASSERT_PROJECTED_FIELD(field_projection, index) \ ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kProjected); \ ASSERT_EQ(std::get<1>(field_projection.from), index); @@ -303,6 +332,177 @@ TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_PROJECTED_FIELD(projection.fields[0], 0); } +TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsNullPhysicalType) { + ::parquet::arrow::SchemaField parquet_field; + parquet_field.field = ::arrow::field("value", ::arrow::null()); + + auto status = ValidateParquetSchemaEvolution(*iceberg::int32(), parquet_field); + ASSERT_THAT(status, IsOk()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "age", iceberg::int32()), + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/201, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + MakeNullSchemaField("profile", /*field_id=*/2), + MakeNullSchemaField("numbers", /*field_id=*/3), + MakeNullSchemaField("counts", /*field_id=*/4), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (const auto& field_projection : projection.fields) { + ASSERT_PROJECTED_NULL_FIELD(field_projection); + ASSERT_TRUE(field_projection.children.empty()); + } + + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalFieldForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "age", iceberg::int32()), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 1 as null")); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeListSchemaFieldWithNullElement("numbers", /*field_id=*/1, + /*element_field_id=*/101), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 101 as null")); +} + +TEST(ParquetSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt32Node("mystery", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0]); + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/10, "mystery", iceberg::unknown()), + })), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeGroupNode("profile", + {MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("mystery", /*field_id=*/3)}, + /*field_id=*/1), + MakeListNode("mysteries", MakeInt32Node("element", /*field_id=*/5), + /*field_id=*/4), + MakeMapNode("properties", + MakeStringNode("key", /*field_id=*/7, /*optional=*/false), + MakeInt32Node("value", /*field_id=*/8), + /*field_id=*/6), + MakeGroupNode("wrapper", {MakeInt32Node("mystery", /*field_id=*/10)}, + /*field_id=*/9), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[1].children[0]); + + ASSERT_PROJECTED_FIELD(projection.fields[2], 2); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[2].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[3], 3); + ASSERT_EQ(projection.fields[3].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[3].children[0]); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2, 3, 4, 5})); +} + TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { Schema expected_schema({ SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 65a4602d8..ff24c19fe 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -436,6 +436,78 @@ TEST_F(ParquetReaderTest, ReadMetadataOnlyProjection) { ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, kExpectedJson)); } +TEST_F(ParquetReaderTest, ReadNestedUnknownProjection) { + temp_parquet_file_ = "nested_unknown.parquet"; + auto write_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", int32()), + })), + }); + auto read_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", unknown()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk()); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + auto array = ::arrow::json::ArrayFromJSONString(arrow_type, + R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]], "wrapper": {"mystery": 300}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])") + .ValueOrDie(); + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = temp_parquet_file_, + .schema = write_schema, + .io = file_io_, + .properties = std::move(writer_properties)}), + IsOk()); + + ICEBERG_UNWRAP_OR_FAIL( + auto reader, + ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = read_schema})); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]], "wrapper": {"mystery": null}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + class ParquetReadWrite : public ::testing::Test { protected: static void SetUpTestSuite() { parquet::RegisterAll(); } diff --git a/src/iceberg/test/schema_json_test.cc b/src/iceberg/test/schema_json_test.cc index 87388cbb4..6d9f2b864 100644 --- a/src/iceberg/test/schema_json_test.cc +++ b/src/iceberg/test/schema_json_test.cc @@ -64,6 +64,7 @@ INSTANTIATE_TEST_SUITE_P( SchemaJsonParam{.json = "\"string\"", .type = iceberg::string()}, SchemaJsonParam{.json = "\"binary\"", .type = iceberg::binary()}, SchemaJsonParam{.json = "\"uuid\"", .type = iceberg::uuid()}, + SchemaJsonParam{.json = "\"unknown\"", .type = iceberg::unknown()}, SchemaJsonParam{.json = "\"fixed[8]\"", .type = iceberg::fixed(8)}, SchemaJsonParam{.json = "\"decimal(10,2)\"", .type = iceberg::decimal(10, 2)}, SchemaJsonParam{.json = "\"date\"", .type = iceberg::date()}, @@ -134,6 +135,87 @@ TEST(SchemaJsonTest, RoundTrip) { ASSERT_EQ(dumped_json, json); } +TEST(SchemaJsonTest, UnknownFieldRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mystery","required":false,"type":"unknown"}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(nlohmann::json::parse(json))); + ASSERT_EQ(schema->fields().size(), 1); + + const auto& field = schema->fields()[0]; + ASSERT_EQ(field.field_id(), 1); + ASSERT_EQ(field.name(), "mystery"); + ASSERT_EQ(field.type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(field.optional()); + ASSERT_EQ(ToJson(*schema).dump(), json); +} + +TEST(SchemaJsonTest, NestedUnknownFieldsRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"profile","required":false,"type":{"fields":[{"id":2,"name":"mystery","required":false,"type":"unknown"}],"type":"struct"}},{"id":3,"name":"mysteries","required":false,"type":{"element":"unknown","element-id":4,"element-required":false,"type":"list"}},{"id":5,"name":"properties","required":false,"type":{"key":"string","key-id":6,"type":"map","value":"unknown","value-id":7,"value-required":false}}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(nlohmann::json::parse(json))); + ASSERT_EQ(schema->fields().size(), 3); + + const auto* profile = dynamic_cast(schema->fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields().size(), 1); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(profile->fields()[0].optional()); + + const auto* mysteries = dynamic_cast(schema->fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(mysteries->fields()[0].optional()); + + const auto* properties = dynamic_cast(schema->fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(properties->value().optional()); + + ASSERT_EQ(ToJson(*schema).dump(), json); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownField) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mystery","required":true,"type":"unknown"}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownListElement) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"element":"unknown","element-id":2,"element-required":true,"type":"list"}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'element' must be optional")); +} + +TEST(SchemaJsonTest, RejectUnknownMapKey) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"key":"unknown","key-id":2,"type":"map","value":"string","value-id":3,"value-required":false}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'key' must be optional")); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownMapValue) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"key":"string","key-id":2,"type":"map","value":"unknown","value-id":3,"value-required":true}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'value' must be optional")); +} + TEST(SchemaJsonTest, IdentifierFieldIds) { // Test schema with identifier-field-ids constexpr std::string_view json_with_identifier_str = diff --git a/src/iceberg/test/schema_util_test.cc b/src/iceberg/test/schema_util_test.cc index fe6579ab3..4f15ed806 100644 --- a/src/iceberg/test/schema_util_test.cc +++ b/src/iceberg/test/schema_util_test.cc @@ -226,6 +226,133 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { AssertProjectedField(projection.fields[0], 0); } +TEST(SchemaUtilTest, ProjectSchemaEvolutionUnknownToPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeRequired(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionNestedFieldsToUnknown) { + Schema source_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::int32()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::unknown()))), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + AssertProjectedField(projection.fields[2], 2); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kNull); + AssertProjectedField(projection.fields[0].children[1], prune_source ? 0 : 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].children.size(), 2); + AssertProjectedField(projection.fields[2].children[0], 0); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); + } +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionUnknownToOptionalNested) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/3, "items", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", CreateNestedStruct()), + SchemaField::MakeOptional(/*field_id=*/3, "items", CreateListOfStruct()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", CreateMapWithStructValue()), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + for (const auto& field_projection : projection.fields) { + ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kNull); + ASSERT_TRUE(field_projection.children.empty()); + } + } +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredNested) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/2, "profile", CreateNestedStruct()), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { Schema source_schema( {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::decimal(9, 2))}); diff --git a/src/iceberg/test/type_test.cc b/src/iceberg/test/type_test.cc index 266ff6103..675bbf39b 100644 --- a/src/iceberg/test/type_test.cc +++ b/src/iceberg/test/type_test.cc @@ -90,7 +90,7 @@ TEST_P(TypeTest, StdFormat) { ASSERT_EQ(test_case.repr, std::format("{}", *test_case.type)); } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -203,6 +203,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/test/update_schema_test.cc b/src/iceberg/test/update_schema_test.cc index 8550c8b56..1fbb6d257 100644 --- a/src/iceberg/test/update_schema_test.cc +++ b/src/iceberg/test/update_schema_test.cc @@ -1054,6 +1054,20 @@ TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) { EXPECT_EQ(*field_opt->get().type(), *float64()); } +TEST_F(UpdateSchemaTest, UpdateColumnUnknownToPrimitive) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("mystery", unknown(), "A null-only placeholder"); + update->UpdateColumn("mystery", string()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("mystery")); + ASSERT_TRUE(field_opt.has_value()); + EXPECT_EQ(*field_opt->get().type(), *string()); + EXPECT_TRUE(field_opt->get().optional()); + EXPECT_EQ(field_opt->get().doc(), "A null-only placeholder"); +} + TEST_F(UpdateSchemaTest, UpdateColumnSameType) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); update->AddColumn("id", int32()); diff --git a/src/iceberg/test/visit_type_test.cc b/src/iceberg/test/visit_type_test.cc index 786e1fd2c..64ad7d9c8 100644 --- a/src/iceberg/test/visit_type_test.cc +++ b/src/iceberg/test/visit_type_test.cc @@ -53,7 +53,7 @@ std::string TypeTestCaseToString(const ::testing::TestParamInfo& i return info.param.name; } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -166,6 +166,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index f008ad908..2d254bd4d 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -334,6 +334,10 @@ TypeId UuidType::type_id() const { return kTypeId; } std::string UuidType::ToString() const { return "uuid"; } bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; } +TypeId UnknownType::type_id() const { return kTypeId; } +std::string UnknownType::ToString() const { return "unknown"; } +bool UnknownType::Equals(const Type& other) const { return other.type_id() == kTypeId; } + FixedType::FixedType(int32_t length) : length_(length) { ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length); } @@ -374,6 +378,7 @@ TYPE_FACTORY(timestamp_tz, TimestampTzType) TYPE_FACTORY(binary, BinaryType) TYPE_FACTORY(string, StringType) TYPE_FACTORY(uuid, UuidType) +TYPE_FACTORY(unknown, UnknownType) #undef TYPE_FACTORY @@ -433,6 +438,8 @@ std::string_view ToString(TypeId id) { return "fixed"; case TypeId::kBinary: return "binary"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); diff --git a/src/iceberg/type.h b/src/iceberg/type.h index 1c50135dc..e63549c71 100644 --- a/src/iceberg/type.h +++ b/src/iceberg/type.h @@ -465,6 +465,21 @@ class ICEBERG_EXPORT UuidType : public PrimitiveType { bool Equals(const Type& other) const override; }; +/// \brief A null-only placeholder type used when a more specific type is not known. +class ICEBERG_EXPORT UnknownType : public PrimitiveType { + public: + constexpr static const TypeId kTypeId = TypeId::kUnknown; + + UnknownType() = default; + ~UnknownType() override = default; + + TypeId type_id() const override; + std::string ToString() const override; + + protected: + bool Equals(const Type& other) const override; +}; + /// @} /// \defgroup type-factories Factory functions for creating primitive data types @@ -496,6 +511,8 @@ ICEBERG_EXPORT const std::shared_ptr& binary(); ICEBERG_EXPORT const std::shared_ptr& string(); /// \brief Return a UuidType instance. ICEBERG_EXPORT const std::shared_ptr& uuid(); +/// \brief Return an UnknownType instance. +ICEBERG_EXPORT const std::shared_ptr& unknown(); /// \brief Create a DecimalType with the given precision and scale. /// \param precision The number of decimal digits (max 38). diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 3fe199d8a..4704a7e9d 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -31,7 +31,7 @@ namespace iceberg { /// This is not a complete data type by itself because some types are nested /// and/or parameterized. /// -/// Iceberg V3 types are not currently supported. +/// Iceberg V3's `unknown` type is supported as a null-only placeholder type. enum class TypeId { kStruct, kList, @@ -50,6 +50,7 @@ enum class TypeId { kUuid, kFixed, kBinary, + kUnknown, }; /// \brief The time unit. In Iceberg V3 nanoseconds are also supported. @@ -78,6 +79,7 @@ class TimestampBase; class TimestampType; class TimestampTzType; class Type; +class UnknownType; class UuidType; /// \brief Data values. diff --git a/src/iceberg/util/struct_like_set.cc b/src/iceberg/util/struct_like_set.cc index cc3de5293..e1dda8a7e 100644 --- a/src/iceberg/util/struct_like_set.cc +++ b/src/iceberg/util/struct_like_set.cc @@ -263,6 +263,8 @@ Status ValidateScalarAgainstType(const Scalar& scalar, const Type& type) { } switch (type.type_id()) { + case TypeId::kUnknown: + return InvalidArgument("Expected unknown but got {}", ScalarTypeName(scalar)); case TypeId::kBoolean: ICEBERG_PRECHECK(std::holds_alternative(scalar), "Expected boolean but got {}", ScalarTypeName(scalar)); diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index c6b9bb3ed..e22b9b148 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -426,6 +426,10 @@ bool IsPromotionAllowed(const std::shared_ptr& from_type, TypeId from_id = from_type->type_id(); TypeId to_id = to_type->type_id(); + if (from_id == TypeId::kUnknown) { + return true; + } + // int -> long if (from_id == TypeId::kInt && to_id == TypeId::kLong) { return true; diff --git a/src/iceberg/util/visitor_generate.h b/src/iceberg/util/visitor_generate.h index 053371d41..c2bcaf2eb 100644 --- a/src/iceberg/util/visitor_generate.h +++ b/src/iceberg/util/visitor_generate.h @@ -36,6 +36,7 @@ namespace iceberg { ACTION(Uuid); \ ACTION(Fixed); \ ACTION(Binary); \ + ACTION(Unknown); \ ACTION(Struct); \ ACTION(List); \ ACTION(Map);