Skip to content

Commit 604422b

Browse files
authored
Core: Refactor internal Avro reader to resolve schemas directly (#9366)
1 parent a8f468d commit 604422b

9 files changed

Lines changed: 835 additions & 88 deletions

File tree

core/src/main/java/org/apache/iceberg/avro/Avro.java

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.iceberg.io.FileAppender;
6262
import org.apache.iceberg.io.InputFile;
6363
import org.apache.iceberg.io.OutputFile;
64+
import org.apache.iceberg.mapping.MappingUtil;
6465
import org.apache.iceberg.mapping.NameMapping;
6566
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
6667
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -610,11 +611,12 @@ public static class ReadBuilder {
610611
private org.apache.iceberg.Schema schema = null;
611612
private Function<Schema, DatumReader<?>> createReaderFunc = null;
612613
private BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> createReaderBiFunc = null;
614+
private Function<org.apache.iceberg.Schema, DatumReader<?>> createResolvingReaderFunc = null;
613615

614616
@SuppressWarnings("UnnecessaryLambda")
615-
private final Function<Schema, DatumReader<?>> defaultCreateReaderFunc =
617+
private final Function<org.apache.iceberg.Schema, DatumReader<?>> defaultCreateReaderFunc =
616618
readSchema -> {
617-
GenericAvroReader<?> reader = new GenericAvroReader<>(readSchema);
619+
GenericAvroReader<?> reader = GenericAvroReader.create(readSchema);
618620
reader.setClassLoader(loader);
619621
return reader;
620622
};
@@ -627,15 +629,28 @@ private ReadBuilder(InputFile file) {
627629
this.file = file;
628630
}
629631

632+
public ReadBuilder createResolvingReader(
633+
Function<org.apache.iceberg.Schema, DatumReader<?>> readerFunction) {
634+
Preconditions.checkState(
635+
createReaderBiFunc == null && createReaderFunc == null,
636+
"Cannot set multiple read builder functions");
637+
this.createResolvingReaderFunc = readerFunction;
638+
return this;
639+
}
640+
630641
public ReadBuilder createReaderFunc(Function<Schema, DatumReader<?>> readerFunction) {
631-
Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc");
642+
Preconditions.checkState(
643+
createReaderBiFunc == null && createResolvingReaderFunc == null,
644+
"Cannot set multiple read builder functions");
632645
this.createReaderFunc = readerFunction;
633646
return this;
634647
}
635648

636649
public ReadBuilder createReaderFunc(
637650
BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> readerFunction) {
638-
Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc");
651+
Preconditions.checkState(
652+
createReaderFunc == null && createResolvingReaderFunc == null,
653+
"Cannot set multiple read builder functions");
639654
this.createReaderBiFunc = readerFunction;
640655
return this;
641656
}
@@ -683,23 +698,34 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
683698
return this;
684699
}
685700

701+
@SuppressWarnings("unchecked")
686702
public <D> AvroIterable<D> build() {
687703
Preconditions.checkNotNull(schema, "Schema is required");
688-
Function<Schema, DatumReader<?>> readerFunc;
704+
705+
if (null == nameMapping) {
706+
this.nameMapping = MappingUtil.create(schema);
707+
}
708+
709+
DatumReader<D> reader;
689710
if (createReaderBiFunc != null) {
690-
readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema);
711+
reader =
712+
new ProjectionDatumReader<>(
713+
avroSchema -> createReaderBiFunc.apply(schema, avroSchema), schema, renames, null);
691714
} else if (createReaderFunc != null) {
692-
readerFunc = createReaderFunc;
715+
reader = new ProjectionDatumReader<>(createReaderFunc, schema, renames, null);
716+
} else if (createResolvingReaderFunc != null) {
717+
reader = (DatumReader<D>) createResolvingReaderFunc.apply(schema);
693718
} else {
694-
readerFunc = defaultCreateReaderFunc;
719+
reader = (DatumReader<D>) defaultCreateReaderFunc.apply(schema);
720+
}
721+
722+
if (reader instanceof SupportsCustomRecords) {
723+
((SupportsCustomRecords) reader).setClassLoader(loader);
724+
((SupportsCustomRecords) reader).setRenames(renames);
695725
}
696726

697727
return new AvroIterable<>(
698-
file,
699-
new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping),
700-
start,
701-
length,
702-
reuseContainers);
728+
file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers);
703729
}
704730
}
705731

core/src/main/java/org/apache/iceberg/avro/AvroIterable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iceberg.io.CloseableIterable;
3232
import org.apache.iceberg.io.CloseableIterator;
3333
import org.apache.iceberg.io.InputFile;
34+
import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
3435
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
3536

3637
public class AvroIterable<D> extends CloseableGroup implements CloseableIterable<D> {
@@ -78,7 +79,8 @@ public CloseableIterator<D> iterator() {
7879
if (start != null) {
7980
if (reader instanceof SupportsRowPosition) {
8081
((SupportsRowPosition) reader)
81-
.setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
82+
.setRowPositionSupplier(
83+
Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));
8284
}
8385
fileReader = new AvroRangeIterator<>(fileReader, start, end);
8486
} else if (reader instanceof SupportsRowPosition) {
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.avro;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * A visitor that traverses an Avro {@link Schema} in parallel with a "partner" structure of type
 * {@code P} (for example, an Iceberg {@link Type}).
 *
 * <p>Subclasses override the callback methods ({@link #record}, {@link #union}, {@link #array},
 * {@link #arrayMap}, {@link #map}, {@link #primitive}) to produce a result of type {@code R} for
 * each schema node; the default implementations return {@code null}. The matching partner node is
 * located through the supplied {@link PartnerAccessors}; a {@code null} partner is passed down when
 * no corresponding partner node exists (for example, an unprojected field).
 *
 * @param <P> the partner node type traversed alongside the Avro schema
 * @param <R> the result type produced by the visitor callbacks
 */
public class AvroWithPartnerVisitor<P, R> {
  /** Callbacks used to find the partner node that corresponds to each Avro schema node. */
  public interface PartnerAccessors<P> {
    /**
     * Returns the partner for a record field, or {@code null} if the partner struct has no
     * corresponding field.
     */
    P fieldPartner(P partnerStruct, Integer fieldId, String name);

    P mapKeyPartner(P partnerMap);

    P mapValuePartner(P partnerMap);

    P listElementPartner(P partnerList);
  }

  /** {@link PartnerAccessors} for Iceberg {@link Type} partners, matching fields by field ID. */
  static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors<Type> {
    private static final FieldIDAccessors INSTANCE = new FieldIDAccessors();

    public static FieldIDAccessors get() {
      return INSTANCE;
    }

    @Override
    public Type fieldPartner(Type partner, Integer fieldId, String name) {
      // returns null when the field ID is not present in the partner struct (unprojected field)
      Types.NestedField field = partner.asStructType().field(fieldId);
      return field != null ? field.type() : null;
    }

    @Override
    public Type mapKeyPartner(Type partner) {
      return partner.asMapType().keyType();
    }

    @Override
    public Type mapValuePartner(Type partner) {
      return partner.asMapType().valueType();
    }

    @Override
    public Type listElementPartner(Type partner) {
      return partner.asListType().elementType();
    }
  }

  /**
   * Stack of record full names on the current traversal path, used to fail on recursive types.
   * Final and ArrayDeque-backed: the field is never reassigned and ArrayDeque is the preferred
   * Deque implementation.
   */
  private final Deque<String> recordLevels = new ArrayDeque<>();

  /** Callback for a record schema; {@code fieldResults} holds one result per field, in order. */
  public R record(P partner, Schema record, List<R> fieldResults) {
    return null;
  }

  /** Callback for an option union schema; {@code optionResults} holds one result per branch. */
  public R union(P partner, Schema union, List<R> optionResults) {
    return null;
  }

  /** Callback for an array schema. */
  public R array(P partner, Schema array, R elementResult) {
    return null;
  }

  /** Callback for an array schema carrying the {@code LogicalMap} annotation (map-as-array). */
  public R arrayMap(P partner, Schema map, R keyResult, R valueResult) {
    return null;
  }

  /** Callback for a native Avro map schema (keys are strings, so only values are visited). */
  public R map(P partner, Schema map, R valueResult) {
    return null;
  }

  /** Callback for any primitive (non-record, non-union, non-array, non-map) schema. */
  public R primitive(P partner, Schema primitive) {
    return null;
  }

  /**
   * Visits {@code schema} and its matching {@code partner} node recursively, dispatching to the
   * visitor's callbacks.
   *
   * @param partner the partner node for {@code schema}, or {@code null} if there is none
   * @param schema the Avro schema to visit
   * @param visitor the visitor receiving callbacks
   * @param accessors used to locate child partner nodes
   * @return the result of the visitor callback for {@code schema}
   */
  public static <P, R> R visit(
      P partner,
      Schema schema,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    switch (schema.getType()) {
      case RECORD:
        return visitRecord(partner, schema, visitor, accessors);

      case UNION:
        return visitUnion(partner, schema, visitor, accessors);

      case ARRAY:
        return visitArray(partner, schema, visitor, accessors);

      case MAP:
        return visitor.map(
            partner,
            schema,
            visit(
                partner != null ? accessors.mapValuePartner(partner) : null,
                schema.getValueType(),
                visitor,
                accessors));

      default:
        return visitor.primitive(partner, schema);
    }
  }

  private static <P, R> R visitRecord(
      P partnerStruct,
      Schema record,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    // check to make sure this hasn't been visited before; recursive records are not supported
    String recordName = record.getFullName();
    Preconditions.checkState(
        !visitor.recordLevels.contains(recordName),
        "Cannot process recursive Avro record %s",
        recordName);
    visitor.recordLevels.push(recordName);

    List<Schema.Field> fields = record.getFields();
    List<R> results = Lists.newArrayListWithExpectedSize(fields.size());
    for (int pos = 0; pos < fields.size(); pos += 1) {
      Schema.Field field = fields.get(pos);
      Integer fieldId = AvroSchemaUtil.fieldId(field);

      // fields without an ID or without a partner struct have no partner to pass down
      P fieldPartner =
          partnerStruct != null && fieldId != null
              ? accessors.fieldPartner(partnerStruct, fieldId, field.name())
              : null;
      results.add(visit(fieldPartner, field.schema(), visitor, accessors));
    }

    visitor.recordLevels.pop();

    return visitor.record(partnerStruct, record, results);
  }

  private static <P, R> R visitUnion(
      P partner,
      Schema union,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    // only 2-branch null/type option unions are supported
    Preconditions.checkArgument(
        AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union);

    List<Schema> types = union.getTypes();
    List<R> options = Lists.newArrayListWithExpectedSize(types.size());
    for (Schema branch : types) {
      // the same partner is passed to both branches; the null branch carries no partner info
      options.add(visit(partner, branch, visitor, accessors));
    }

    return visitor.union(partner, union, options);
  }

  private static <P, R> R visitArray(
      P partnerArray,
      Schema array,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    if (array.getLogicalType() instanceof LogicalMap) {
      // a map stored as an array of key/value records
      Preconditions.checkState(
          AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
          "Cannot visit invalid logical map type: %s",
          array);

      List<Schema.Field> keyValueFields = array.getElementType().getFields();
      return visitor.arrayMap(
          partnerArray,
          array,
          visit(
              partnerArray != null ? accessors.mapKeyPartner(partnerArray) : null,
              keyValueFields.get(0).schema(),
              visitor,
              accessors),
          visit(
              partnerArray != null ? accessors.mapValuePartner(partnerArray) : null,
              keyValueFields.get(1).schema(),
              visitor,
              accessors));

    } else {
      return visitor.array(
          partnerArray,
          array,
          visit(
              partnerArray != null ? accessors.listElementPartner(partnerArray) : null,
              array.getElementType(),
              visitor,
              accessors));
    }
  }
}

0 commit comments

Comments (0)