Skip to content

Commit ed6caa1

Browse files
committed
add support for partition evolution
1 parent 3514be0 commit ed6caa1

4 files changed

Lines changed: 64 additions & 37 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -447,45 +447,56 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
447447
public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec,
448448
boolean latestSpecOnly) throws SemanticException {
449449

450-
Map<String, PartitionField> partitionFieldsByName = getPartitionFields(table, latestSpecOnly).stream()
451-
.collect(Collectors.toMap(
452-
partitionField -> table.schema().findColumnName(partitionField.sourceId()),
453-
Function.identity())
450+
// Group partition fields by source column name to handle partition evolution
451+
// where the same source column may have multiple transforms across different specs
452+
Map<String, List<PartitionField>> partitionFieldsBySourceName = getPartitionFields(table, latestSpecOnly).stream()
453+
.collect(Collectors.groupingBy(
454+
partitionField -> table.schema().findColumnName(partitionField.sourceId()))
454455
);
455456

456457
Expression predicate = Expressions.alwaysTrue();
457458

458459
for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
459460
String partitionColumn = entry.getKey();
460-
PartitionField partitionField = partitionFieldsByName.get(partitionColumn);
461+
List<PartitionField> partitionFields = partitionFieldsBySourceName.get(partitionColumn);
461462

462-
if (partitionField == null) {
463+
if (partitionFields == null || partitionFields.isEmpty()) {
463464
throw new SemanticException(String.format("No partition column by the name: %s", partitionColumn));
464465
}
465-
Types.NestedField sourceField = table.schema().findField(partitionField.sourceId());
466-
Object sourceValue = Conversions.fromPartitionString(sourceField.type(), entry.getValue());
467-
// Apply the transform to the source value
468-
@SuppressWarnings("unchecked")
469-
Transform<Object, Object> transform = (Transform<Object, Object>) partitionField.transform();
470-
Object transformedValue = transform.bind(sourceField.type()).apply(sourceValue);
471-
472-
TransformSpec transformSpec = TransformSpec.fromString(transform.toString().toUpperCase(), sourceField.name());
473-
UnboundTerm<Object> term = SchemaUtils.toTerm(transformSpec);
474-
475-
predicate = Expressions.and(
476-
predicate, Expressions.equal(term, transformedValue));
466+
467+
// When there are multiple partition fields for the same source column (due to partition evolution),
468+
// create an OR expression that matches any of the transforms
469+
Expression columnPredicate = Expressions.alwaysFalse();
470+
471+
for (PartitionField partitionField : partitionFields) {
472+
Types.NestedField sourceField = table.schema().findField(partitionField.sourceId());
473+
Object sourceValue = Conversions.fromPartitionString(sourceField.type(), entry.getValue());
474+
// Apply the transform to the source value
475+
@SuppressWarnings("unchecked")
476+
Transform<Object, Object> transform = (Transform<Object, Object>) partitionField.transform();
477+
Object transformedValue = transform.bind(sourceField.type()).apply(sourceValue);
478+
479+
TransformSpec transformSpec = TransformSpec.fromString(transform.toString().toUpperCase(), sourceField.name());
480+
UnboundTerm<Object> term = SchemaUtils.toTerm(transformSpec);
481+
482+
columnPredicate = Expressions.or(
483+
columnPredicate, Expressions.equal(term, transformedValue));
484+
}
485+
486+
predicate = Expressions.and(predicate, columnPredicate);
477487
}
478488

479489
return predicate;
480490
}
481491

482492
public static List<PartitionField> getPartitionFields(Table table, boolean latestSpecOnly) {
483-
return latestSpecOnly ? table.spec().fields() :
484-
table.specs().values().stream()
485-
.flatMap(spec -> spec.fields().stream()
486-
.filter(f -> !f.transform().isVoid()))
487-
.distinct()
488-
.collect(Collectors.toList());
493+
if (latestSpecOnly) {
494+
return table.spec().fields();
495+
}
496+
return table.specs().values().stream()
497+
.flatMap(spec -> spec.fields().stream())
498+
.filter(f -> !f.transform().isVoid())
499+
.toList();
489500
}
490501

491502
public static Partition getPartition(Configuration conf,
@@ -499,6 +510,7 @@ public static Partition getPartition(Configuration conf,
499510
}
500511

501512
try {
513+
// Use the actual partition path (with transforms) as the partition name
502514
String partitionName = partitionNames.getFirst();
503515
if (partitionSpec.size() != Warehouse.makeSpecFromName(partitionName).size()) {
504516
return null;

ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ private void addPartitionData(DataOutputStream out, HiveConf conf, String column
174174
List<FieldSchema> partitionColumns = null;
175175
if (table.isPartitioned()) {
176176
partitionColumns = table.hasNonNativePartitionSupport() ?
177-
table.getStorageHandler().getPartitionKeys(table) :
178-
table.getPartCols();
177+
table.getStorageHandler().getPartitionKeys(table) :
178+
table.getPartCols();
179179
}
180180
if (CollectionUtils.isNotEmpty(partitionColumns) &&
181181
conf.getBoolVar(ConfVars.HIVE_DISPLAY_PARTITION_COLUMNS_SEPARATELY)) {

ql/src/java/org/apache/hadoop/hive/ql/metadata/DummyPartition.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ public List<String> getValues() {
8484
Table table = this.getTable();
8585
List<String> values = new ArrayList<String>();
8686
for (FieldSchema fs :
87-
table.hasNonNativePartitionSupport() ?
88-
table.getStorageHandler().getPartitionKeys(table) :
89-
table.getPartCols()) {
87+
table.hasNonNativePartitionSupport() ?
88+
table.getStorageHandler().getPartitionKeys(table) :
89+
table.getPartCols()) {
9090
values.add(partSpec.get(fs.getName()));
9191
}
9292
return values;

ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,14 +1348,7 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit
13481348
if (partHandle == null) {
13491349
// if partSpec doesn't exists in DB, return a delegate one
13501350
// and the actual partition is created in MoveTask
1351-
try {
1352-
partHandle = tableHandle.hasNonNativePartitionSupport() ?
1353-
new DummyPartition(tableHandle, Warehouse.makePartName(partSpec, false), partSpec) :
1354-
new Partition(tableHandle, partSpec, null);
1355-
} catch (MetaException e) {
1356-
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
1357-
}
1358-
1351+
partHandle = createDummyPartitionHandle(tableHandle, partSpec);
13591352
} else {
13601353
partitions.add(partHandle);
13611354
}
@@ -1382,6 +1375,28 @@ public TableSpec(Hive db, HiveConf conf, ASTNode ast, boolean allowDynamicPartit
13821375
}
13831376
}
13841377

1378+
/**
1379+
* Creates a dummy partition handle for the given table and partition spec.
1380+
* The partition doesn't exist in DB yet - it will be created in MoveTask.
1381+
*
1382+
* @param table the table
1383+
* @param partSpec partition specification
1384+
* @return dummy partition handle
1385+
* @throws SemanticException if unable to construct partition name
1386+
*/
1387+
private Partition createDummyPartitionHandle(Table table, Map<String, String> partSpec)
1388+
throws HiveException {
1389+
try {
1390+
if (table.hasNonNativePartitionSupport()) {
1391+
return new DummyPartition(table, Warehouse.makePartName(partSpec, false), partSpec);
1392+
} else {
1393+
return new Partition(table, partSpec, null);
1394+
}
1395+
} catch (MetaException e) {
1396+
throw new SemanticException("Unable to construct name for dummy partition due to: ", e);
1397+
}
1398+
}
1399+
13851400
private boolean checkUseNativeApi(HiveConf conf, ASTNode ast) {
13861401
boolean isLoad = ast.getParent() != null && ast.getParent().getType() == HiveParser.TOK_LOAD;
13871402
return isLoad && tableHandle.isNonNative() && conf.getBoolVar(HIVE_LOAD_DATA_USE_NATIVE_API);

0 commit comments

Comments (0)