Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
const SchemaField& projected_field,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder) {
if (projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
return {};
}

if (avro_node->type() == ::avro::AVRO_UNION) {
size_t branch = avro_datum.unionBranch();
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
Expand Down Expand Up @@ -494,6 +499,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
}

if (array.IsNull(index)) {
if (datum->type() == ::avro::AVRO_NULL) {
return {};
}
if (!datum->isUnion()) [[unlikely]] {
return InvalidSchema("Cannot extract null to non-union type: {}",
::avro::toString(datum->type()));
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/avro/avro_direct_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
const SchemaField& projected_field,
const arrow::MetadataColumnContext& metadata_context,
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
if (projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder));
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
return {};
}

if (avro_node->type() == ::avro::AVRO_UNION) {
const size_t branch_index = decoder.decodeUnionIndex();

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/avro/avro_direct_encoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco
return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx);
}

if (is_null) {
if (is_null && avro_node->type() != ::avro::AVRO_NULL) {
return InvalidArgument("Null value in non-nullable field");
}

Expand Down
92 changes: 52 additions & 40 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
return {};
}

Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL);
return {};
}

Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodeRecord>();

Expand Down Expand Up @@ -320,7 +325,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node)
field_ids_.push(field.field_id());
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node));

if (field.optional()) {
if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) {
::avro::MultiLeaves union_types;
union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
union_types.add(std::move(*node));
Expand Down Expand Up @@ -365,8 +370,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
case ::avro::AVRO_STRING:
case ::avro::AVRO_BYTES:
case ::avro::AVRO_FIXED:
return {};
case ::avro::AVRO_NULL:
return {};
case ::avro::AVRO_ENUM:
default:
return InvalidSchema("Unsupported Avro type: {}", static_cast<int>(node->type()));
Expand Down Expand Up @@ -494,6 +499,10 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {

Status ValidateAvroSchemaEvolution(const Type& expected_type,
const ::avro::NodePtr& avro_node) {
if (avro_node->type() == ::avro::AVRO_NULL) {
return {};
}

switch (expected_type.type_id()) {
case TypeId::kBoolean:
if (avro_node->type() == ::avro::AVRO_BOOL) {
Expand Down Expand Up @@ -583,6 +592,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
return {};
}
break;
case TypeId::kUnknown:
return {};
default:
break;
}
Expand Down Expand Up @@ -618,6 +629,35 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
const ::avro::NodePtr& avro_node,
bool prune_source);

Result<FieldProjection> ProjectField(const SchemaField& expected_field,
const ::avro::NodePtr& avro_node,
size_t source_index, bool prune_source) {
const Type& expected_type = *expected_field.type();
::avro::NodePtr field_node;
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node));

FieldProjection projection;
if (expected_type.type_id() == TypeId::kUnknown ||
field_node->type() == ::avro::AVRO_NULL) {
if (!expected_field.optional()) {
return InvalidSchema("Cannot project required field with ID: {} as null",
expected_field.field_id());
}
projection.kind = FieldProjection::Kind::kNull;
return projection;
Comment thread
manuzhang marked this conversation as resolved.
}

if (expected_type.is_nested()) {
ICEBERG_ASSIGN_OR_RAISE(projection,
ProjectNested(expected_type, field_node, prune_source));
} else {
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node));
}
projection.from = source_index;
projection.kind = FieldProjection::Kind::kProjected;
return projection;
}

Result<FieldProjection> ProjectStruct(const StructType& struct_type,
const ::avro::NodePtr& avro_node,
bool prune_source) {
Expand Down Expand Up @@ -653,18 +693,9 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
FieldProjection child_projection;

if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) {
::avro::NodePtr field_node;
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node));
if (expected_field.type()->is_nested()) {
ICEBERG_ASSIGN_OR_RAISE(
child_projection,
ProjectNested(*expected_field.type(), field_node, prune_source));
} else {
ICEBERG_RETURN_UNEXPECTED(
ValidateAvroSchemaEvolution(*expected_field.type(), field_node));
}
child_projection.from = iter->second.local_index;
child_projection.kind = FieldProjection::Kind::kProjected;
ICEBERG_ASSIGN_OR_RAISE(child_projection,
ProjectField(expected_field, iter->second.field_node,
iter->second.local_index, prune_source));
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
child_projection.kind = FieldProjection::Kind::kMetadata;
} else if (expected_field.optional()) {
Expand Down Expand Up @@ -701,20 +732,9 @@ Result<FieldProjection> ProjectList(const ListType& list_type,
}

FieldProjection element_projection;
::avro::NodePtr element_node;
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
if (expected_element_field.type()->is_nested()) {
ICEBERG_ASSIGN_OR_RAISE(
element_projection,
ProjectNested(*expected_element_field.type(), element_node, prune_source));
} else {
ICEBERG_RETURN_UNEXPECTED(
ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
}

// Set the element projection metadata but preserve its children
element_projection.kind = FieldProjection::Kind::kProjected;
element_projection.from = size_t{0};
ICEBERG_ASSIGN_OR_RAISE(element_projection,
ProjectField(expected_element_field, avro_node->leafAt(0),
size_t{0}, prune_source));

FieldProjection result;
result.children.emplace_back(std::move(element_projection));
Expand Down Expand Up @@ -770,18 +790,10 @@ Result<FieldProjection> ProjectMap(const MapType& map_type,

for (size_t i = 0; i < map_node->leaves(); ++i) {
FieldProjection sub_projection;
::avro::NodePtr sub_node;
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
const auto& expected_sub_field = map_type.fields()[i];
if (expected_sub_field.type()->is_nested()) {
ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(),
sub_node, prune_source));
} else {
ICEBERG_RETURN_UNEXPECTED(
ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
}
sub_projection.kind = FieldProjection::Kind::kProjected;
sub_projection.from = i;
ICEBERG_ASSIGN_OR_RAISE(
sub_projection,
ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source));
result.children.emplace_back(std::move(sub_projection));
}

Expand Down Expand Up @@ -1017,9 +1029,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
case ::avro::AVRO_STRING:
case ::avro::AVRO_BYTES:
case ::avro::AVRO_FIXED:
case ::avro::AVRO_NULL:
// For primitive types, just return a copy
return original_node;
case ::avro::AVRO_NULL:
case ::avro::AVRO_ENUM:
default:
return InvalidSchema("Unsupported Avro type for field ID application: {}",
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ToAvroNodeVisitor {
Status Visit(const UuidType& type, ::avro::NodePtr* node);
Status Visit(const FixedType& type, ::avro::NodePtr* node);
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
Status Visit(const UnknownType&, ::avro::NodePtr*);
Status Visit(const StructType& type, ::avro::NodePtr* node);
Status Visit(const ListType& type, ::avro::NodePtr* node);
Status Visit(const MapType& type, ::avro::NodePtr* node);
Expand Down
19 changes: 19 additions & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ nlohmann::json ToJson(const Type& type) {
}
case TypeId::kUuid:
return "uuid";
case TypeId::kUnknown:
return "unknown";
}
std::unreachable();
}
Expand Down Expand Up @@ -437,12 +439,22 @@ Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
return std::make_unique<StructType>(std::move(fields));
}

Status ValidateUnknownFieldOptional(const Type& type, bool optional,
std::string_view field_name) {
if (type.type_id() == TypeId::kUnknown && !optional) {
return JsonParseError("Unknown type field '{}' must be optional", field_name);
}
return {};
}

Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue<int32_t>(json, kElementId));
ICEBERG_ASSIGN_OR_RAISE(auto element_required,
GetJsonValue<bool>(json, kElementRequired));

ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required,
ListType::kElementName));
return std::make_unique<ListType>(
SchemaField(element_id, std::string(ListType::kElementName),
std::move(element_type), !element_required));
Expand All @@ -458,6 +470,10 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue<int32_t>(json, kValueId));
ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue<bool>(json, kValueRequired));

ICEBERG_RETURN_UNEXPECTED(
ValidateUnknownFieldOptional(*key_type, /*optional=*/false, MapType::kKeyName));
ICEBERG_RETURN_UNEXPECTED(
ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName));
SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
/*optional=*/false);
SchemaField value_field(value_id, std::string(MapType::kValueName),
Expand Down Expand Up @@ -494,6 +510,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
return std::make_unique<BinaryType>();
} else if (type_str == "uuid") {
return std::make_unique<UuidType>();
} else if (type_str == "unknown") {
return std::make_unique<UnknownType>();
Comment thread
manuzhang marked this conversation as resolved.
} else if (type_str.starts_with("fixed")) {
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
std::smatch match;
Expand Down Expand Up @@ -540,6 +558,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));

ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name));
return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
!required, doc);
}
Expand Down
27 changes: 24 additions & 3 deletions src/iceberg/parquet/parquet_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
const auto& output_element_type = output_list_type->value_type();

std::shared_ptr<::arrow::Array> projected_values;
if (element_field.type()->is_nested()) {
if (element_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ASSIGN_OR_RAISE(
projected_values,
MakeNullArray(output_element_type, list_array->values()->length(), pool));
} else if (element_projection.kind != FieldProjection::Kind::kProjected) {
return NotImplemented("Unsupported list element projection kind: {}",
ToString(element_projection.kind));
} else if (element_field.type()->is_nested()) {
const auto& nested_type =
internal::checked_cast<const NestedType&>(*element_field.type());
ICEBERG_ASSIGN_OR_RAISE(
Expand Down Expand Up @@ -219,7 +226,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(

// Project keys
std::shared_ptr<::arrow::Array> projected_keys;
if (key_type->is_nested()) {
if (key_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ASSIGN_OR_RAISE(
projected_keys,
MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool));
} else if (key_projection.kind != FieldProjection::Kind::kProjected) {
return NotImplemented("Unsupported map key projection kind: {}",
ToString(key_projection.kind));
} else if (key_type->is_nested()) {
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
ICEBERG_ASSIGN_OR_RAISE(
projected_keys,
Expand All @@ -233,7 +247,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(

// Project values
std::shared_ptr<::arrow::Array> projected_items;
if (value_type->is_nested()) {
if (value_projection.kind == FieldProjection::Kind::kNull) {
ICEBERG_ASSIGN_OR_RAISE(
projected_items,
MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool));
} else if (value_projection.kind != FieldProjection::Kind::kProjected) {
return NotImplemented("Unsupported map value projection kind: {}",
ToString(value_projection.kind));
} else if (value_type->is_nested()) {
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
ICEBERG_ASSIGN_OR_RAISE(
projected_items,
Expand Down
Loading
Loading