Skip to content

Commit eaa9687

Browse files
committed
add support for transforms
1 parent e77fe17 commit eaa9687

6 files changed

Lines changed: 187 additions & 151 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.time.temporal.ChronoUnit;
3131
import java.util.List;
3232
import java.util.stream.Collectors;
33-
import org.apache.commons.lang3.ObjectUtils;
3433
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
3534
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
3635
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -111,9 +110,7 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
111110
*/
112111
private static Expression translateLeaf(PredicateLeaf leaf) {
113112
TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
114-
String columnName = transformSpec.getColumnName();
115-
UnboundTerm<Object> column =
116-
ObjectUtils.defaultIfNull(toTerm(columnName, transformSpec), Expressions.ref(columnName));
113+
UnboundTerm<Object> column = SchemaUtils.toTerm(transformSpec);
117114

118115
switch (leaf.getOperator()) {
119116
case EQUALS:
@@ -144,30 +141,6 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
144141
}
145142
}
146143

147-
public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transformSpec) {
148-
if (transformSpec == null) {
149-
return null;
150-
}
151-
switch (transformSpec.getTransformType()) {
152-
case YEAR:
153-
return Expressions.year(columnName);
154-
case MONTH:
155-
return Expressions.month(columnName);
156-
case DAY:
157-
return Expressions.day(columnName);
158-
case HOUR:
159-
return Expressions.hour(columnName);
160-
case TRUNCATE:
161-
return Expressions.truncate(columnName, transformSpec.getTransformParam());
162-
case BUCKET:
163-
return Expressions.bucket(columnName, transformSpec.getTransformParam());
164-
case IDENTITY:
165-
return null;
166-
default:
167-
throw new UnsupportedOperationException("Unknown transformSpec: " + transformSpec);
168-
}
169-
}
170-
171144
// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
172145
// Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it.
173146
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.stream.Collectors;
3434
import java.util.stream.Stream;
3535
import org.apache.commons.collections4.CollectionUtils;
36-
import org.apache.commons.lang3.ObjectUtils;
3736
import org.apache.hadoop.conf.Configuration;
3837
import org.apache.hadoop.fs.FileSystem;
3938
import org.apache.hadoop.fs.Path;
@@ -972,10 +971,7 @@ private static UnboundPredicate<Object> getPartitionPredicate(PartitionData part
972971
String columName = schema.findField(field.sourceId()).name();
973972
TransformSpec transformSpec = TransformSpec.fromString(field.transform().toString(), columName);
974973

975-
UnboundTerm<Object> partitionColumn =
976-
ObjectUtils.defaultIfNull(HiveIcebergFilterFactory.toTerm(columName, transformSpec),
977-
Expressions.ref(field.name()));
978-
974+
UnboundTerm<Object> partitionColumn = SchemaUtils.toTerm(transformSpec);
979975
return Expressions.equal(partitionColumn, partitionData.get(index, Object.class));
980976
}
981977

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -866,8 +866,7 @@ public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.m
866866
return table.spec().fields().stream()
867867
.filter(f -> !f.transform().isVoid())
868868
.map(f -> {
869-
TransformSpec spec = IcebergTableUtil.getTransformSpec(
870-
table, f.transform().toString(), f.sourceId());
869+
TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
871870
spec.setFieldName(f.name());
872871
return spec;
873872
})
@@ -882,8 +881,7 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
882881
e.getValue().fields().stream()
883882
.filter(f -> !f.transform().isVoid())
884883
.map(f -> {
885-
TransformSpec spec = IcebergTableUtil.getTransformSpec(
886-
table, f.transform().toString(), f.sourceId());
884+
TransformSpec spec = IcebergTableUtil.getTransformSpec(table, f.transform().toString(), f.sourceId());
887885
spec.setFieldName(f.name());
888886
return Pair.of(e.getKey(), spec);
889887
}))
@@ -893,9 +891,8 @@ public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
893891

894892
private List<TransformSpec> getSortTransformSpec(Table table) {
895893
return table.sortOrder().fields().stream().map(s ->
896-
IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId())
897-
)
898-
.collect(Collectors.toList());
894+
IcebergTableUtil.getTransformSpec(table, s.transform().toString(), s.sourceId()))
895+
.toList();
899896
}
900897

901898
@Override
@@ -2056,8 +2053,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
20562053
@Override
20572054
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
20582055
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
2059-
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
2060-
List<String> partNames = IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly);
2056+
List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
20612057
return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
20622058
}
20632059

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

Lines changed: 65 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.iceberg.expressions.Expression;
9292
import org.apache.iceberg.expressions.Expressions;
9393
import org.apache.iceberg.expressions.ResidualEvaluator;
94+
import org.apache.iceberg.expressions.UnboundTerm;
9495
import org.apache.iceberg.hive.IcebergCatalogProperties;
9596
import org.apache.iceberg.io.CloseableIterable;
9697
import org.apache.iceberg.mr.Catalogs;
@@ -102,8 +103,8 @@
102103
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
103104
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
104105
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
106+
import org.apache.iceberg.transforms.Transform;
105107
import org.apache.iceberg.types.Conversions;
106-
import org.apache.iceberg.types.Type;
107108
import org.apache.iceberg.types.Types;
108109
import org.apache.iceberg.util.ByteBuffers;
109110
import org.apache.iceberg.util.Pair;
@@ -260,41 +261,15 @@ static PartitionStatisticsFile getPartitionStatsFile(Table table, long snapshotI
260261
* @return iceberg partition spec, always non-null
261262
*/
262263
public static PartitionSpec spec(Configuration configuration, Schema schema) {
263-
List<TransformSpec> partitionTransformSpecList = SessionStateUtil
264+
List<TransformSpec> partitionBy = SessionStateUtil
264265
.getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
265266
.map(o -> (List<TransformSpec>) o).orElse(null);
266267

267-
if (partitionTransformSpecList == null) {
268+
if (partitionBy == null) {
268269
LOG.warn(PARTITION_TRANSFORM_SPEC_NOT_FOUND);
269270
return null;
270271
}
271-
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
272-
partitionTransformSpecList.forEach(spec -> {
273-
switch (spec.getTransformType()) {
274-
case IDENTITY:
275-
builder.identity(spec.getColumnName().toLowerCase());
276-
break;
277-
case YEAR:
278-
builder.year(spec.getColumnName());
279-
break;
280-
case MONTH:
281-
builder.month(spec.getColumnName());
282-
break;
283-
case DAY:
284-
builder.day(spec.getColumnName());
285-
break;
286-
case HOUR:
287-
builder.hour(spec.getColumnName());
288-
break;
289-
case TRUNCATE:
290-
builder.truncate(spec.getColumnName(), spec.getTransformParam());
291-
break;
292-
case BUCKET:
293-
builder.bucket(spec.getColumnName(), spec.getTransformParam());
294-
break;
295-
}
296-
});
297-
return builder.build();
272+
return SchemaUtils.createPartitionSpec(schema, partitionBy);
298273
}
299274

300275
public static void updateSpec(Configuration configuration, Table table) {
@@ -317,32 +292,8 @@ public static void updateSpec(Configuration configuration, Table table) {
317292
LOG.warn(PARTITION_TRANSFORM_SPEC_NOT_FOUND);
318293
return;
319294
}
320-
partitionTransformSpecList.forEach(spec -> {
321-
switch (spec.getTransformType()) {
322-
case IDENTITY:
323-
updatePartitionSpec.addField(spec.getColumnName());
324-
break;
325-
case YEAR:
326-
updatePartitionSpec.addField(Expressions.year(spec.getColumnName()));
327-
break;
328-
case MONTH:
329-
updatePartitionSpec.addField(Expressions.month(spec.getColumnName()));
330-
break;
331-
case DAY:
332-
updatePartitionSpec.addField(Expressions.day(spec.getColumnName()));
333-
break;
334-
case HOUR:
335-
updatePartitionSpec.addField(Expressions.hour(spec.getColumnName()));
336-
break;
337-
case TRUNCATE:
338-
updatePartitionSpec.addField(Expressions.truncate(spec.getColumnName(), spec.getTransformParam()));
339-
break;
340-
case BUCKET:
341-
updatePartitionSpec.addField(Expressions.bucket(spec.getColumnName(), spec.getTransformParam()));
342-
break;
343-
}
344-
});
345-
295+
partitionTransformSpecList.forEach(spec ->
296+
updatePartitionSpec.addField(SchemaUtils.toTerm(spec)));
346297
updatePartitionSpec.commit();
347298
}
348299

@@ -495,27 +446,37 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
495446

496447
public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec,
497448
boolean latestSpecOnly) throws SemanticException {
498-
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table, latestSpecOnly).stream()
499-
.collect(Collectors.toMap(PartitionField::name, Function.identity()));
500-
Expression finalExp = Expressions.alwaysTrue();
449+
450+
Map<String, PartitionField> partitionFieldsByName = getPartitionFields(table, latestSpecOnly).stream()
451+
.collect(Collectors.toMap(
452+
partitionField -> table.schema().findColumnName(partitionField.sourceId()),
453+
Function.identity())
454+
);
455+
456+
Expression predicate = Expressions.alwaysTrue();
457+
501458
for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
502-
String partColName = entry.getKey();
503-
if (partitionFieldMap.containsKey(partColName)) {
504-
PartitionField partitionField = partitionFieldMap.get(partColName);
505-
if (partitionField.transform().isIdentity()) {
506-
final Type partKeyType = table.schema().findField(partitionField.sourceId()).type();
507-
final Object partKeyVal = Conversions.fromPartitionString(partKeyType, entry.getValue());
508-
Expression boundPredicate = Expressions.equal(partColName, partKeyVal);
509-
finalExp = Expressions.and(finalExp, boundPredicate);
510-
} else {
511-
throw new SemanticException(
512-
String.format("Partition transforms are not supported here: %s", partColName));
513-
}
514-
} else {
515-
throw new SemanticException(String.format("No partition column by the name: %s", partColName));
459+
String partitionColumn = entry.getKey();
460+
PartitionField partitionField = partitionFieldsByName.get(partitionColumn);
461+
462+
if (partitionField == null) {
463+
throw new SemanticException(String.format("No partition column by the name: %s", partitionColumn));
516464
}
465+
Types.NestedField sourceField = table.schema().findField(partitionField.sourceId());
466+
Object sourceValue = Conversions.fromPartitionString(sourceField.type(), entry.getValue());
467+
// Apply the transform to the source value
468+
@SuppressWarnings("unchecked")
469+
Transform<Object, Object> transform = (Transform<Object, Object>) partitionField.transform();
470+
Object transformedValue = transform.bind(sourceField.type()).apply(sourceValue);
471+
472+
TransformSpec transformSpec = TransformSpec.fromString(transform.toString().toUpperCase(), sourceField.name());
473+
UnboundTerm<Object> term = SchemaUtils.toTerm(transformSpec);
474+
475+
predicate = Expressions.and(
476+
predicate, Expressions.equal(term, transformedValue));
517477
}
518-
return finalExp;
478+
479+
return predicate;
519480
}
520481

521482
public static List<PartitionField> getPartitionFields(Table table, boolean latestSpecOnly) {
@@ -527,36 +488,59 @@ public static List<PartitionField> getPartitionFields(Table table, boolean lates
527488
.collect(Collectors.toList());
528489
}
529490

491+
public static Partition getPartition(Configuration conf,
492+
org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partitionSpec)
493+
throws SemanticException {
494+
List<String> partitionNames =
495+
getPartitionNames(conf, table, partitionSpec, false);
496+
497+
if (partitionNames.isEmpty()) {
498+
return null;
499+
}
500+
501+
try {
502+
String partitionName = partitionNames.getFirst();
503+
if (partitionSpec.size() != Warehouse.makeSpecFromName(partitionName).size()) {
504+
return null;
505+
}
506+
return new DummyPartition(table, partitionName, partitionSpec);
507+
} catch (MetaException e) {
508+
throw new SemanticException("Unable to create partition spec from name", e);
509+
}
510+
}
511+
530512
/**
531513
* Returns a list of partition names satisfying the provided partition spec.
532514
* @param table Hive table; the underlying Iceberg table is loaded from it using the given configuration
533515
* @param partSpecMap Partition Spec used as the criteria for filtering
534516
* @param latestSpecOnly when True, returns partitions with the current spec only, else - any specs
535517
* @return List of partition names
536518
*/
537-
public static List<String> getPartitionNames(Table table, Map<String, String> partSpecMap,
519+
public static List<String> getPartitionNames(Configuration conf,
520+
org.apache.hadoop.hive.ql.metadata.Table table, Map<String, String> partSpecMap,
538521
boolean latestSpecOnly) throws SemanticException {
522+
Table icebergTable = getTable(conf, table.getTTable());
539523
Expression expression = IcebergTableUtil.generateExpressionFromPartitionSpec(
540-
table, partSpecMap, latestSpecOnly);
524+
icebergTable, partSpecMap, latestSpecOnly);
541525
PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils.createMetadataTableInstance(
542-
table, MetadataTableType.PARTITIONS);
526+
icebergTable, MetadataTableType.PARTITIONS);
543527

544528
try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
545529
return FluentIterable.from(fileScanTasks)
546530
.transformAndConcat(task -> task.asDataTask().rows())
547531
.transform(row -> {
548532
StructLike data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
549-
PartitionSpec spec = table.specs().get(row.get(IcebergTableUtil.SPEC_IDX, Integer.class));
533+
PartitionSpec spec = icebergTable.specs().get(row.get(IcebergTableUtil.SPEC_IDX, Integer.class));
550534
return Maps.immutableEntry(
551535
IcebergTableUtil.toPartitionData(
552-
data, Partitioning.partitionType(table), spec.partitionType()),
536+
data, Partitioning.partitionType(icebergTable), spec.partitionType()),
553537
spec);
554538
}).filter(e -> {
555539
ResidualEvaluator resEval = ResidualEvaluator.of(e.getValue(),
556540
expression, false);
557541
return e.getValue().isPartitioned() &&
558542
resEval.residualFor(e.getKey()).isEquivalentTo(Expressions.alwaysTrue()) &&
559-
(e.getValue().specId() == table.spec().specId() || !latestSpecOnly);
543+
(e.getValue().specId() == icebergTable.spec().specId() || !latestSpecOnly);
560544

561545
}).transform(e -> e.getValue().partitionToPath(e.getKey())).toSortedList(
562546
Comparator.naturalOrder());
@@ -596,7 +580,7 @@ public static TransformSpec getTransformSpec(Table table, String transformName,
596580
public static <T> List<T> readColStats(Table table, Long snapshotId, Predicate<BlobMetadata> filter) {
597581
List<T> colStats = Lists.newArrayList();
598582

599-
String statsPath = IcebergTableUtil.getColStatsPath(table, snapshotId);
583+
String statsPath = IcebergTableUtil.getColStatsPath(table, snapshotId);
600584
if (statsPath == null) {
601585
return colStats;
602586
}
@@ -638,34 +622,6 @@ public static boolean hasUndergonePartitionEvolution(Table table) {
638622
.anyMatch(id -> id != table.spec().specId());
639623
}
640624

641-
public static Partition getPartition(Configuration conf,
642-
org.apache.hadoop.hive.ql.metadata.Table table,
643-
Map<String, String> partitionSpec) throws SemanticException {
644-
try {
645-
List<String> partNames = getPartitionNames(conf, table, partitionSpec, false);
646-
if (partNames.isEmpty()) {
647-
return null;
648-
}
649-
String expectedName = Warehouse.makePartName(partitionSpec, false);
650-
if (!expectedName.equals(partNames.getFirst())) {
651-
return null;
652-
}
653-
return new DummyPartition(table, expectedName, partitionSpec);
654-
} catch (SemanticException e) {
655-
return null;
656-
} catch (MetaException e) {
657-
throw new SemanticException("Unable to construct dummy partition", e);
658-
}
659-
}
660-
661-
public static List<String> getPartitionNames(Configuration conf,
662-
org.apache.hadoop.hive.ql.metadata.Table table,
663-
Map<String, String> partitionSpec,
664-
boolean latestSpecOnly) throws SemanticException {
665-
Table icebergTable = getTable(conf, table.getTTable());
666-
return getPartitionNames(icebergTable, partitionSpec, latestSpecOnly);
667-
}
668-
669625
public static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
670626
Boolean latestSpecOnly) {
671627
Set<String> partitions = Sets.newHashSet();

0 commit comments

Comments
 (0)