From c0dfa987426d3efdf99622ffc67f07207fb47f0f Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Fri, 23 Jan 2026 13:36:51 +0200
Subject: [PATCH 1/2] partition retrieval optimization

---
 ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 5be6735c1e3f..263bbf36eac7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -612,7 +612,8 @@ public boolean equals(Object obj) {
   public List<FieldSchema> getPartCols() {
     List<FieldSchema> partKeys = tTable.getPartitionKeys();
     if (partKeys == null) {
-      partKeys = new ArrayList<>();
+      partKeys = hasNonNativePartitionSupport() ?
+          getStorageHandler().getPartitionKeys(this) : new ArrayList<>();
       tTable.setPartitionKeys(partKeys);
     }
     return partKeys;
@@ -625,10 +626,7 @@ public FieldSchema getPartColByName(String colName) {
   }
 
   public List<String> getPartColNames() {
-    List<FieldSchema> partCols = hasNonNativePartitionSupport() ?
-        getStorageHandler().getPartitionKeys(this) : getPartCols();
-    return partCols.stream().map(FieldSchema::getName)
-        .collect(Collectors.toList());
+    return getPartCols().stream().map(FieldSchema::getName).toList();
   }
 
   public boolean hasNonNativePartitionSupport() {
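
[Note on PATCH 1/2] getPartCols() now resolves partition keys through the storage
handler for non-native tables and caches the result on the Thrift table, and
getPartColNames() derives names from that same cached list instead of re-resolving
the keys on every call. A minimal standalone sketch of the resulting flow, using
simplified stand-ins rather than the real Hive classes (the nonNative flag and
storageHandlerKeys() helper are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    class FieldSchema {
      private final String name;
      FieldSchema(String name) { this.name = name; }
      String getName() { return name; }
    }

    class TableSketch {
      private List<FieldSchema> partitionKeys;    // stands in for tTable.getPartitionKeys()
      private final boolean nonNative = true;     // stands in for hasNonNativePartitionSupport()

      // stands in for getStorageHandler().getPartitionKeys(this)
      private List<FieldSchema> storageHandlerKeys() {
        return List.of(new FieldSchema("dt"));
      }

      List<FieldSchema> getPartCols() {
        if (partitionKeys == null) {
          // resolved at most once, then cached: the point of the patch
          partitionKeys = nonNative ? new ArrayList<>(storageHandlerKeys()) : new ArrayList<>();
        }
        return partitionKeys;
      }

      List<String> getPartColNames() {
        return getPartCols().stream().map(FieldSchema::getName).toList();
      }

      public static void main(String[] args) {
        TableSketch t = new TableSketch();
        System.out.println(t.getPartColNames()); // [dt]
        System.out.println(t.getPartColNames()); // [dt], reuses the cached keys
      }
    }
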
From a0c17f3473446d31d6e45e189a183c76c90304f9 Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Mon, 26 Jan 2026 08:57:36 +0200
Subject: [PATCH 2/2] refactor

---
 .../apache/iceberg/hive/MetastoreUtil.java    | 25 +++++-----
 .../mr/hive/HiveIcebergStorageHandler.java    |  4 +-
 .../iceberg/mr/hive/IcebergTableUtil.java     | 48 +++----------
 .../compaction/IcebergCompactionUtil.java     | 40 ++++++++++++++++
 .../compaction/IcebergQueryCompactor.java     |  2 +-
 .../compaction/IcebergTableOptimizer.java     |  7 ++-
 6 files changed, 65 insertions(+), 61 deletions(-)

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
index 95e1e5b36623..0e6a8b80bafd 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
@@ -22,7 +22,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -46,6 +45,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
 import org.apache.thrift.TException;
 
 public class MetastoreUtil {
@@ -112,16 +112,17 @@ public static void alterTable(
     }
   }
 
-  public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table, int specId) {
-    Schema schema = table.specs().get(specId).schema();
-    List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
-    Map<String, String> colNameToColType = hiveSchema.stream()
-        .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
-    return table.specs().get(specId).fields().stream()
-        .map(partField -> new FieldSchema(
-            schema.findColumnName(partField.sourceId()),
-            colNameToColType.get(schema.findColumnName(partField.sourceId())),
-            String.format("Transform: %s", partField.transform().toString()))
+  public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table) {
+    Schema schema = table.spec().schema();
+
+    return table.spec().fields().stream()
+        .map(partField -> {
+          Types.NestedField col = schema.findField(partField.sourceId());
+          return new FieldSchema(
+              col.name(),
+              HiveSchemaUtil.convertToTypeString(col.type()),
+              col.doc());
+        }
         )
         .toList();
   }
@@ -134,7 +135,7 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co
     result.setDbName(tableName.getDb());
     result.setTableName(tableName.getTable());
     result.setTableType(TableType.EXTERNAL_TABLE.toString());
-    result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
+    result.setPartitionKeys(getPartitionKeys(table));
     TableMetadata metadata = ((BaseTable) table).operations().current();
     long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
         HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
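
[Note on the MetastoreUtil hunk above] getPartitionKeys() now reads the name,
type, and column doc straight off the current spec's source fields instead of
converting the whole schema and synthesizing a "Transform: ..." comment. A
standalone sketch of the same derivation against a locally built schema and
spec; Type#toString() stands in here for the patch's
HiveSchemaUtil.convertToTypeString(), and the column names are illustrative:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class PartitionKeysSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "dt", Types.DateType.get(), "event date"));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dt").build();

        List<FieldSchema> keys = spec.fields().stream()
            .map(partField -> {
              // same lookup as the patch: partition field -> source column
              Types.NestedField col = schema.findField(partField.sourceId());
              return new FieldSchema(col.name(), col.type().toString(), col.doc());
            })
            .toList();

        // prints: dt : date : event date
        keys.forEach(k ->
            System.out.println(k.getName() + " : " + k.getType() + " : " + k.getComment()));
      }
    }
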
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7554de2c588a..cacea39a572a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -2056,7 +2056,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
-    return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
+    return IcebergTableUtil.convertNameToHivePartition(hmsTable, partNames);
   }
 
   public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
@@ -2209,7 +2209,7 @@ public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Tab
       return Collections.emptyList();
     }
     Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return MetastoreUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
+    return MetastoreUtil.getPartitionKeys(icebergTable);
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index b85df34405d0..ad1bda67117f 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -29,7 +29,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -40,7 +39,6 @@
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -54,7 +52,6 @@
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.DummyPartition;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
@@ -64,8 +61,6 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
-import org.apache.hadoop.util.Sets;
-import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -696,26 +691,6 @@ private static Map.Entry extractPartitionDataAndSp
         spec);
   }
 
-  public static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
-      throws MetaException, HiveException {
-    if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
-      throw new HiveException("Table and partitionPath must not be null or empty.");
-    }
-
-    // Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
-    List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());
-
-    return icebergTable.specs().values().stream()
-        .filter(spec -> {
-          List<String> specFieldNames = spec.fields().stream()
-              .map(PartitionField::name)
-              .toList();
-          return specFieldNames.equals(fieldNames);
-        })
-        .findFirst() // Supposed to be only one matching spec
-        .orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
-  }
-
   public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
     TransformSpec spec = TransformSpec.fromString(transformName.toUpperCase(),
         table.schema().findColumnName(sourceId));
@@ -767,26 +742,15 @@ public static boolean hasUndergonePartitionEvolution(Table table) {
         .anyMatch(id -> id != table.spec().specId());
   }
 
-  public static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
-      Boolean latestSpecOnly) {
-    Set<String> partitions = Sets.newHashSet();
-    int tableSpecId = icebergTable.spec().specId();
-    for (T file : files) {
-      if (latestSpecOnly == null || latestSpecOnly.equals(file.specId() == tableSpecId)) {
-        String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
-        partitions.add(partName);
-      }
-    }
-    return partitions;
-  }
-
-  public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+  public static List<Partition> convertNameToHivePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Collection<String> partNames) {
     List<Partition> partitions = Lists.newArrayList();
     for (String partName : partNames) {
-      Map<String, String> partSpecMap = Maps.newLinkedHashMap();
-      Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
-      partitions.add(new DummyPartition(hmsTable, partName, partSpecMap));
+      try {
+        partitions.add(new DummyPartition(hmsTable, partName, Warehouse.makeSpecFromName(partName)));
+      } catch (MetaException e) {
+        LOG.error(e.getMessage(), e);
+      }
     }
     return partitions;
   }
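
[Note on convertNameToHivePartition above] Along with the rename, the parse is
simpler: the one-argument Warehouse.makeSpecFromName(String) returns the ordered
partition-spec map directly, but it throws MetaException on malformed names,
which is why the loop gains a try/catch that logs and skips bad entries. A small
sketch of the call (the partition path is illustrative):

    import java.util.Map;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class MakeSpecSketch {
      public static void main(String[] args) throws MetaException {
        // "field1=val1/field2=val2" parses into an ordered map, as DummyPartition expects
        Map<String, String> spec = Warehouse.makeSpecFromName("dt=2026-01-23/hr=05");
        System.out.println(spec); // {dt=2026-01-23, hr=05}
      }
    }
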
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
index 63f1c3edf217..04046e461965 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
@@ -19,12 +19,19 @@
 package org.apache.iceberg.mr.hive.compaction;
 
 import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.util.Sets;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PositionDeletesScanTask;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Table;
@@ -100,4 +107,37 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
     return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
         t -> ((PositionDeletesScanTask) t).file()));
   }
+
+  static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
+      throws MetaException, HiveException {
+    if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
+      throw new HiveException("Table and partitionPath must not be null or empty.");
+    }
+
+    // Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
+    List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());
+
+    return icebergTable.specs().values().stream()
+        .filter(spec -> {
+          List<String> specFieldNames = spec.fields().stream()
+              .map(PartitionField::name)
+              .toList();
+          return specFieldNames.equals(fieldNames);
+        })
+        .findFirst() // Supposed to be only one matching spec
+        .orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
+  }
+
+  static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
+      boolean latestSpecOnly) {
+    Set<String> partitions = Sets.newHashSet();
+    int tableSpecId = icebergTable.spec().specId();
+    for (T file : files) {
+      if (latestSpecOnly == (file.specId() == tableSpecId)) {
+        String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
+        partitions.add(partName);
+      }
+    }
+    return partitions;
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
index 9c4a73cc7d0b..9ca788b89e40 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
@@ -135,7 +135,7 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
     PartitionSpec spec;
     String partitionPredicate;
     try {
-      spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
+      spec = IcebergCompactionUtil.getPartitionSpec(icebergTable, ci.partName);
       partitionPredicate = buildPartitionPredicate(ci, spec);
     } catch (MetaException e) {
       throw new HiveException(e);
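
[Note on getPartitionSpec above] The helper moved from IcebergTableUtil matches a
partition path against the table's historical specs purely by partition field
names. A standalone sketch of that matching against a single locally built spec
instead of a table's specs() map (schema and path are illustrative):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class SpecMatchSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "dt", Types.DateType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("dt").build();

        // "dt=2026-01-26" -> [dt]: field names extracted from the path, in order
        List<String> fieldNames = List.copyOf(Warehouse.makeSpecFromName("dt=2026-01-26").keySet());
        List<String> specFieldNames = spec.fields().stream().map(PartitionField::name).toList();

        System.out.println(specFieldNames.equals(fieldNames)); // true -> this spec matches
      }
    }
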
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index dfc352ee0dad..e9d3b0bd5137 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -167,14 +167,13 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
    * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
    * @param latestSpecOnly when True, returns partitions with the current spec only;
    *                       False - older specs only;
-   *                       Null - any spec
    * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
    * partition names.
    * @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current
    * snapshot.
    */
   private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
-      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, boolean latestSpecOnly) {
     List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
 
     if (relevantSnapshots.isEmpty()) {
@@ -192,7 +191,7 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
               snapshot.addedDeleteFiles(io), snapshot.removedDeleteFiles(io))
           .toList();
 
-          return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
+          return IcebergCompactionUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
         }))
         .toList();
 
@@ -202,7 +201,7 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
         modifiedPartitions.addAll(future.get());
       }
 
-      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+      return IcebergTableUtil.convertNameToHivePartition(hiveTable, modifiedPartitions);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeMetaException(e, "Interrupted while finding modified partitions");
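
[Note on the Boolean -> boolean change] findModifiedPartitions() and
getPartitionNames() no longer accept a null latestSpecOnly meaning "any spec"
(hence the dropped "Null - any spec" javadoc line), so the spec filter collapses
to a single equality test. A sketch of the resulting truth table:

    public class SpecFilterSketch {
      // same predicate as getPartitionNames(): keep current-spec files when
      // latestSpecOnly is true, older-spec files when it is false
      static boolean keep(boolean latestSpecOnly, int fileSpecId, int tableSpecId) {
        return latestSpecOnly == (fileSpecId == tableSpecId);
      }

      public static void main(String[] args) {
        System.out.println(keep(true, 1, 1));   // true:  current spec wanted, file on current spec
        System.out.println(keep(true, 0, 1));   // false: current spec wanted, file on old spec
        System.out.println(keep(false, 0, 1));  // true:  older specs wanted, file on old spec
        System.out.println(keep(false, 1, 1));  // false: older specs wanted, file on current spec
      }
    }
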