@@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -46,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;

public class MetastoreUtil {
@@ -112,16 +112,17 @@ public static void alterTable(
}
}

public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table, int specId) {
Schema schema = table.specs().get(specId).schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return table.specs().get(specId).fields().stream()
.map(partField -> new FieldSchema(
schema.findColumnName(partField.sourceId()),
colNameToColType.get(schema.findColumnName(partField.sourceId())),
String.format("Transform: %s", partField.transform().toString()))
public static List<FieldSchema> getPartitionKeys(org.apache.iceberg.Table table) {
Schema schema = table.spec().schema();

return table.spec().fields().stream()
.map(partField -> {
Types.NestedField col = schema.findField(partField.sourceId());
return new FieldSchema(
col.name(),
HiveSchemaUtil.convertToTypeString(col.type()),
col.doc());
}
)
.toList();
}
@@ -134,7 +135,7 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co
result.setDbName(tableName.getDb());
result.setTableName(tableName.getTable());
result.setTableType(TableType.EXTERNAL_TABLE.toString());
result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
result.setPartitionKeys(getPartitionKeys(table));
TableMetadata metadata = ((BaseTable) table).operations().current();
long maxHiveTablePropertySize = conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
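A note on the reworked getPartitionKeys above: the removed version built a name-to-type map from the fully converted schema and stored a "Transform: ..." string in the comment field, while the new version resolves each partition field's source column directly and reports its name, its Hive type string, and the column doc. In both versions the key is reported under the source column's name, not the transform-derived field name. The following sketch illustrates that naming distinction with a hypothetical schema and spec; it uses only Iceberg core APIs and is not code from this PR.

// Sketch only: shows how a transformed partition field's name differs from its
// source column name. Schema, spec, and column names here are made up.
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionKeyNamingSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "event_ts", Types.TimestampType.withZone()));
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("event_ts") // the partition field is named "event_ts_day" by default
        .build();

    PartitionField partField = spec.fields().get(0);
    // getPartitionKeys reports the source column ("event_ts"), not the
    // transform-derived partition field name ("event_ts_day").
    System.out.println(partField.name());                              // event_ts_day
    System.out.println(schema.findField(partField.sourceId()).name()); // event_ts
  }
}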
@@ -2056,7 +2056,7 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
List<String> partNames = IcebergTableUtil.getPartitionNames(conf, hmsTable, partitionSpec, latestSpecOnly);
return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
return IcebergTableUtil.convertNameToHivePartition(hmsTable, partNames);
}

public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
@@ -2209,7 +2209,7 @@ public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Tab
return Collections.emptyList();
}
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return MetastoreUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
return MetastoreUtil.getPartitionKeys(icebergTable);
}

@Override
@@ -29,7 +29,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,7 +39,6 @@
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -54,7 +52,6 @@
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
@@ -64,8 +61,6 @@
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.util.Sets;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -696,26 +691,6 @@ private static Map.Entry<PartitionData, PartitionSpec> extractPartitionDataAndSpec
spec);
}

public static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
throws MetaException, HiveException {
if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
throw new HiveException("Table and partitionPath must not be null or empty.");
}

// Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());

return icebergTable.specs().values().stream()
.filter(spec -> {
List<String> specFieldNames = spec.fields().stream()
.map(PartitionField::name)
.toList();
return specFieldNames.equals(fieldNames);
})
.findFirst() // Supposed to be only one matching spec
.orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
}

public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
TransformSpec spec = TransformSpec.fromString(transformName.toUpperCase(),
table.schema().findColumnName(sourceId));
@@ -767,26 +742,15 @@ public static boolean hasUndergonePartitionEvolution(Table table) {
.anyMatch(id -> id != table.spec().specId());
}

public static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
Boolean latestSpecOnly) {
Set<String> partitions = Sets.newHashSet();
int tableSpecId = icebergTable.spec().specId();
for (T file : files) {
if (latestSpecOnly == null || latestSpecOnly.equals(file.specId() == tableSpecId)) {
String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
partitions.add(partName);
}
}
return partitions;
}

public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
public static List<Partition> convertNameToHivePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Collection<String> partNames) {
List<Partition> partitions = Lists.newArrayList();
for (String partName : partNames) {
Map<String, String> partSpecMap = Maps.newLinkedHashMap();
Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
partitions.add(new DummyPartition(hmsTable, partName, partSpecMap));
try {
partitions.add(new DummyPartition(hmsTable, partName, Warehouse.makeSpecFromName(partName)));
} catch (MetaException e) {
LOG.error(e.getMessage(), e);
}
}
return partitions;
}
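On the renamed convertNameToHivePartition: the old code filled a LinkedHashMap through the Path-based Warehouse.makeSpecFromName overload, while the new code uses the one-argument String overload, which throws MetaException; a name that fails to parse is now logged and skipped rather than added. As a reference point, the sketch below shows the shape of the map that helper returns for a typical partition path; the path value is hypothetical, not taken from the PR.

// Sketch only: what Warehouse.makeSpecFromName(String) yields for a partition path.
import java.util.Map;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class PartNameToSpecSketch {
  public static void main(String[] args) throws MetaException {
    Map<String, String> spec = Warehouse.makeSpecFromName("dept=sales/event_ts_day=2024-05-01");
    // Keys keep the order they appear in the path: dept, then event_ts_day.
    spec.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}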
@@ -19,12 +19,19 @@
package org.apache.iceberg.mr.hive.compaction;

import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.util.Sets;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
@@ -100,4 +107,37 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}

static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
throws MetaException, HiveException {
if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
throw new HiveException("Table and partitionPath must not be null or empty.");
}

// Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());

return icebergTable.specs().values().stream()
.filter(spec -> {
List<String> specFieldNames = spec.fields().stream()
.map(PartitionField::name)
.toList();
return specFieldNames.equals(fieldNames);
})
.findFirst() // Supposed to be only one matching spec
.orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
}

static <T extends ContentFile<?>> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
boolean latestSpecOnly) {
Set<String> partitions = Sets.newHashSet();
int tableSpecId = icebergTable.spec().specId();
for (T file : files) {
if (latestSpecOnly == (file.specId() == tableSpecId)) {
String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
partitions.add(partName);
}
}
return partitions;
}
}
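Two helpers move into IcebergCompactionUtil from IcebergTableUtil and are tightened in the process: getPartitionNames drops the tri-state Boolean (there is no longer a "null means any spec" mode), and getPartitionSpec matches a partition path against the table's specs by comparing partition field names. The matching idea can be exercised standalone with builder-built specs; the schema, specs, and path below are made up for illustration and do not come from the PR.

// Sketch only: the field-name matching used by getPartitionSpec, shown with
// two locally built specs standing in for an evolved table's spec history.
import java.util.List;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class SpecMatchSketch {
  public static void main(String[] args) throws MetaException {
    Schema schema = new Schema(
        Types.NestedField.required(1, "dept", Types.StringType.get()),
        Types.NestedField.required(2, "region", Types.StringType.get()));
    PartitionSpec oldSpec = PartitionSpec.builderFor(schema)
        .identity("dept").build();
    PartitionSpec newSpec = PartitionSpec.builderFor(schema)
        .identity("dept").identity("region").build();

    // Field names extracted from the partition path, as in getPartitionSpec.
    List<String> fieldNames =
        List.copyOf(Warehouse.makeSpecFromName("dept=sales/region=emea").keySet());

    for (PartitionSpec spec : List.of(oldSpec, newSpec)) {
      List<String> specFieldNames = spec.fields().stream().map(PartitionField::name).toList();
      System.out.println(specFieldNames.equals(fieldNames)); // false for oldSpec, true for newSpec
    }
  }
}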
@@ -135,7 +135,7 @@ private String buildCompactionQuery(CompactorContext context, String compactTabl
PartitionSpec spec;
String partitionPredicate;
try {
spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
spec = IcebergCompactionUtil.getPartitionSpec(icebergTable, ci.partName);
partitionPredicate = buildPartitionPredicate(ci, spec);
} catch (MetaException e) {
throw new HiveException(e);
@@ -167,14 +167,13 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
* @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
* @param latestSpecOnly when True, returns partitions with the current spec only;
* False - older specs only;
* Null - any spec
* @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
* partition names.
* @throws IllegalArgumentException if snapshot IDs are invalid or out of order, or if the table has no current
* snapshot.
*/
private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, boolean latestSpecOnly) {

List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
if (relevantSnapshots.isEmpty()) {
@@ -192,7 +191,7 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
snapshot.addedDeleteFiles(io),
snapshot.removedDeleteFiles(io))
.toList();
return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
return IcebergCompactionUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
}))
.toList();

@@ -202,7 +201,7 @@ private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadat
modifiedPartitions.addAll(future.get());
}

return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
return IcebergTableUtil.convertNameToHivePartition(hiveTable, modifiedPartitions);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeMetaException(e, "Interrupted while finding modified partitions");
8 changes: 3 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -612,7 +612,8 @@ public boolean equals(Object obj) {
public List<FieldSchema> getPartCols() {
List<FieldSchema> partKeys = tTable.getPartitionKeys();
if (partKeys == null) {
partKeys = new ArrayList<>();
partKeys = hasNonNativePartitionSupport() ?
getStorageHandler().getPartitionKeys(this) : new ArrayList<>();
tTable.setPartitionKeys(partKeys);
}
return partKeys;
@@ -625,10 +626,7 @@ public FieldSchema getPartColByName(String colName) {
}

public List<String> getPartColNames() {
List<FieldSchema> partCols = hasNonNativePartitionSupport() ?
getStorageHandler().getPartitionKeys(this) : getPartCols();
return partCols.stream().map(FieldSchema::getName)
.collect(Collectors.toList());
return getPartCols().stream().map(FieldSchema::getName).toList();
}

public boolean hasNonNativePartitionSupport() {
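The Table.java change moves the non-native lookup out of getPartColNames() and into getPartCols(): when tTable has no partition keys and the table has non-native partition support, the keys are fetched from the storage handler and cached on tTable, so every caller of getPartCols() sees the same list and getPartColNames() simply derives names from it. The pattern is plain lazy initialization with a fallback source; the sketch below illustrates it with stand-in types, not Hive's actual classes.

// Sketch only: lazy-init-with-fallback, with illustrative names standing in
// for tTable.getPartitionKeys(), hasNonNativePartitionSupport(), and the
// storage handler lookup.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class LazyPartColsSketch {
  private List<String> cachedKeys;                  // stands in for tTable.getPartitionKeys()
  private final boolean nonNativePartitionSupport;  // stands in for hasNonNativePartitionSupport()
  private final Supplier<List<String>> handlerKeys; // stands in for getStorageHandler().getPartitionKeys(this)

  public LazyPartColsSketch(boolean nonNativePartitionSupport, Supplier<List<String>> handlerKeys) {
    this.nonNativePartitionSupport = nonNativePartitionSupport;
    this.handlerKeys = handlerKeys;
  }

  public List<String> getPartCols() {
    if (cachedKeys == null) {
      // Compute once: ask the handler for non-native tables, otherwise start empty.
      cachedKeys = nonNativePartitionSupport ? handlerKeys.get() : new ArrayList<>();
    }
    return cachedKeys;
  }

  public List<String> getPartColNames() {
    return List.copyOf(getPartCols());
  }
}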