@@ -76,6 +76,8 @@
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc;
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
@@ -185,6 +187,7 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
import org.apache.iceberg.mr.hive.actions.HiveIcebergRepairTable;
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
import org.apache.iceberg.parquet.VariantUtil;
@@ -1138,7 +1141,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
(AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams();
int numThreads = conf.getInt(ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
expireSnapshot(icebergTable, expireSnapshotsSpec, numThreads);
expireSnapshots(icebergTable, expireSnapshotsSpec, numThreads);
break;
case SET_CURRENT_SNAPSHOT:
AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec =
@@ -1182,8 +1185,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
try {
if (numThreads > 0) {
LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(),
numThreads);
deleteExecutorService = IcebergTableUtil.newFixedThreadPool(
"delete-orphan-files" + icebergTable.name(), numThreads);
}

HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
@@ -1200,13 +1203,14 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num
}
}

private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
private void expireSnapshots(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
int numThreads) {
ExecutorService deleteExecutorService = null;
try {
if (numThreads > 0) {
LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads);
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads);
deleteExecutorService = IcebergTableUtil.newFixedThreadPool(
"expire-snapshots-" + icebergTable.name(), numThreads);
}
if (expireSnapshotsSpec == null) {
expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService);
@@ -2306,6 +2310,23 @@ public boolean supportsDefaultColumnValues(Map<String, String> tblProps) {
return IcebergTableUtil.formatVersion(tblProps) >= 3;
}

@Override
public MsckResult repair(org.apache.hadoop.hive.ql.metadata.Table hmsTable, HiveConf conf, MsckDesc desc)
throws HiveException {
LOG.info("Starting Iceberg table repair{} for {}", desc.isRepair() ? "" : " (dry-run)",
hmsTable.getFullyQualifiedName());
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, desc);
try {
return repair.execute();
} catch (Exception e) {
String errorMsg = String.format("Failed to repair Iceberg table %s: %s",
hmsTable.getFullyQualifiedName(), e.getMessage());
throw new HiveException(errorMsg, e);
}
}


private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
return exprs.stream().map(v ->
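For context, a minimal sketch of how the new repair hook can be driven. Only repair(hmsTable, conf, desc) and MsckResult#numIssues() come from this change; the surrounding wiring, the table name, and the 'db' session handle are illustrative assumptions, not this PR's actual call site:

// Hypothetical caller-side sketch, e.g. from the MSCK DDL path:
org.apache.hadoop.hive.ql.metadata.Table hmsTable = db.getTable("default.ice_tbl"); // 'db' is an assumed Hive session handle
HiveStorageHandler handler = hmsTable.getStorageHandler();
MsckResult result = handler.repair(hmsTable, conf, desc); // desc carries the repair vs. dry-run flag
LOG.info("MSCK on {} addressed {} issue(s)", hmsTable.getFullyQualifiedName(), result.numIssues());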
@@ -32,7 +32,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadFactory;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -103,6 +103,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
@@ -749,13 +750,12 @@ public static <T> List<T> readColStats(Table table, Long snapshotId, Predicate<B
return colStats;
}

public static ExecutorService newDeleteThreadPool(String completeName, int numThreads) {
AtomicInteger deleteThreadsIndex = new AtomicInteger(0);
return Executors.newFixedThreadPool(numThreads, runnable -> {
Thread thread = new Thread(runnable);
thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement());
return thread;
});
public static ExecutorService newFixedThreadPool(String threadName, int numThreads) {
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat(threadName + "-%d")
.build();
return Executors.newFixedThreadPool(numThreads, threadFactory);
}

public static boolean hasUndergonePartitionEvolution(Table table) {
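A quick illustration of the consolidated helper (hypothetical caller; ThreadFactoryBuilder's name format yields threads named "<prefix>-0", "<prefix>-1", ...):

// Callers now supply the full name prefix themselves, replacing the
// hardcoded "remove-snapshot-" prefix of the old newDeleteThreadPool.
ExecutorService pool = IcebergTableUtil.newFixedThreadPool("expire-snapshots-db.tbl", 4);
try {
  pool.submit(() -> System.out.println(Thread.currentThread().getName())); // prints expire-snapshots-db.tbl-0
} finally {
  pool.shutdown();
}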
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.actions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc;
import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Repairs an Iceberg table by removing dangling file references.
* <p>
* Detects and removes references to data files that are missing from the filesystem
* but still referenced in metadata. Supports dry-run mode and parallel execution.
*/
public class HiveIcebergRepairTable {

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRepairTable.class);
private static final int DEFAULT_NUM_THREADS = 4;

private final Table table;
private final MsckDesc desc;
private final int numThreads;

public HiveIcebergRepairTable(Table table, MsckDesc desc) {
this(table, desc, DEFAULT_NUM_THREADS);
}

public HiveIcebergRepairTable(Table table, MsckDesc desc, int numThreads) {
this.table = table;
this.desc = desc;
this.numThreads = numThreads;
}

/**
* Executes the repair operation within a provided transaction.
*
* @param transaction the Iceberg transaction to use
* @return repair result containing number of issues fixed and log message
* @throws IOException if metadata validation or file check fails
*/
public MsckResult execute(Transaction transaction) throws IOException {
List<String> missingFiles = getMissingFiles();

if (missingFiles.isEmpty()) {
String msg = "No missing files detected";
LOG.info(msg);
return new MsckResult(0, msg, new ArrayList<>());
} else if (desc.isRepair()) {
// Only commit changes if not in dry-run mode
DeleteFiles deleteFiles = transaction.newDelete();
for (String path : missingFiles) {
deleteFiles.deleteFile(path);
}
deleteFiles.commit();
}

String summaryMsg = String.format(desc.isRepair() ?
"Removed %d dangling file reference(s)" : "Would remove %d dangling file reference(s)",
missingFiles.size());
LOG.info(summaryMsg);

String detailedMsg = String.format(desc.isRepair() ?
"Iceberg table repair completed: %s" : "Iceberg table repair (dry-run): %s",
summaryMsg);

return new MsckResult(missingFiles.size(), detailedMsg, missingFiles);
}

/**
* Executes the repair operation, creating a transaction and committing it only when
* repairs were actually made (i.e. not in dry-run mode and at least one issue was found).
*
* @return repair result containing removed files and statistics
* @throws IOException if metadata validation or file check fails
*/
public MsckResult execute() throws IOException {
Transaction transaction = table.newTransaction();
MsckResult result = execute(transaction);
if (desc.isRepair() && result.numIssues() > 0) {
transaction.commitTransaction();
}
return result;
}

/**
* Finds all missing data files by checking their physical existence in parallel.
*
* @return list of file paths for missing data files
* @throws IOException if the file check operation fails or is interrupted
*/
private List<String> getMissingFiles() throws IOException {
// Use a dedicated pool so the existence checks honor numThreads; a parallel
// stream would run on the common ForkJoinPool and ignore the pool size.
ExecutorService executorService = IcebergTableUtil.newFixedThreadPool(
"repair-table-" + table.name(), numThreads);
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
// Submit one existence check per data file referenced by the current snapshot
List<Future<String>> checks = new ArrayList<>();
for (FileScanTask task : fileScanTasks) {
String path = task.file().location();
checks.add(executorService.submit(() ->
table.io().newInputFile(path).exists() ? null : path));
}
List<String> missingFiles = new ArrayList<>();
for (Future<String> check : checks) {
String missingPath = check.get();
if (missingPath != null) {
missingFiles.add(missingPath);
}
}
return missingFiles;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while checking for missing files", e);
} catch (ExecutionException e) {
throw new IOException("Failed to check for missing files: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
}
}
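And a minimal end-to-end sketch of the new action (loading of the HMS table, conf, and the MsckDesc are assumed context; the constructor and execute() are exactly as defined above):

// Dry-run (desc.isRepair() == false): only reports dangling references.
// Repair run (desc.isRepair() == true): drops them and commits the transaction.
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
HiveIcebergRepairTable repair = new HiveIcebergRepairTable(icebergTable, desc, 8);
MsckResult result = repair.execute();
LOG.info("Repair of {} found {} dangling file reference(s)", icebergTable.name(), result.numIssues());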
@@ -114,7 +114,7 @@ public void setConf(Configuration configuration) {
HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
if (numThreads > 0) {
LOG.info("Will expire Iceberg snapshots using an executor service with {} threads", numThreads);
deleteExecutorService = IcebergTableUtil.newDeleteThreadPool("iceberg-housekeeper-service", numThreads);
deleteExecutorService = IcebergTableUtil.newFixedThreadPool("iceberg-housekeeper-service", numThreads);
}
}
}