diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 7554de2c588a..6e7b33607df9 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -76,6 +76,8 @@ import org.apache.hadoop.hive.ql.Context.RewritePolicy; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult; import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc; import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc; @@ -185,6 +187,7 @@ import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles; +import org.apache.iceberg.mr.hive.actions.HiveIcebergRepairTable; import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction; import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder; import org.apache.iceberg.parquet.VariantUtil; @@ -1138,7 +1141,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, (AlterTableExecuteSpec.ExpireSnapshotsSpec) executeSpec.getOperationParams(); int numThreads = conf.getInt(ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname, ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal); - expireSnapshot(icebergTable, expireSnapshotsSpec, numThreads); + expireSnapshots(icebergTable, expireSnapshotsSpec, numThreads); break; case SET_CURRENT_SNAPSHOT: AlterTableExecuteSpec.SetCurrentSnapshotSpec setSnapshotVersionSpec = @@ -1182,8 +1185,8 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num try { if (numThreads > 0) { LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), - numThreads); + deleteExecutorService = IcebergTableUtil.newFixedThreadPool( + "delete-orphan-files" + icebergTable.name(), numThreads); } HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable); @@ -1200,13 +1203,14 @@ private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int num } } - private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec, + private void expireSnapshots(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec, int numThreads) { ExecutorService deleteExecutorService = null; try { if (numThreads > 0) { LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); - deleteExecutorService = IcebergTableUtil.newDeleteThreadPool(icebergTable.name(), numThreads); + deleteExecutorService = IcebergTableUtil.newFixedThreadPool( + "expire-snapshots-" + icebergTable.name(), numThreads); } if (expireSnapshotsSpec == null) { expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService); @@ -2306,6 +2310,23 @@ public boolean supportsDefaultColumnValues(Map tblProps) { return IcebergTableUtil.formatVersion(tblProps) >= 3; } + @Override + public MsckResult 
repair(org.apache.hadoop.hive.ql.metadata.Table hmsTable, HiveConf conf, MsckDesc desc) + throws HiveException { + LOG.info("Starting Iceberg table repair{} for {}", desc.isRepair() ? "" : " (dry-run)", + hmsTable.getFullyQualifiedName()); + Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); + + HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, desc); + try { + return repair.execute(); + } catch (Exception e) { + String errorMsg = String.format("Failed to repair Iceberg table %s: %s", + hmsTable.getFullyQualifiedName(), e.getMessage()); + throw new HiveException(errorMsg, e); + } + } + private static List schema(List exprs) { return exprs.stream().map(v -> diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index b85df34405d0..3be7d36b370d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -32,7 +32,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ThreadFactory; import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Predicate; @@ -103,6 +103,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.transforms.Transform; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; @@ -749,13 +750,12 @@ public static List readColStats(Table table, Long snapshotId, Predicate { - Thread thread = new Thread(runnable); - thread.setName("remove-snapshot-" + completeName + "-" + deleteThreadsIndex.getAndIncrement()); - return thread; - }); + public static ExecutorService newFixedThreadPool(String threadName, int numThreads) { + ThreadFactory threadFactory = + new ThreadFactoryBuilder() + .setNameFormat(threadName + "-%d") + .build(); + return Executors.newFixedThreadPool(numThreads, threadFactory); } public static boolean hasUndergonePartitionEvolution(Table table) { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergRepairTable.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergRepairTable.java new file mode 100644 index 000000000000..7deff2b67caa --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergRepairTable.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.actions; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.stream.StreamSupport; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mr.hive.IcebergTableUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repairs an Iceberg table by removing dangling file references. + *

+ * Detects and removes references to data files that are missing from the filesystem + * but still referenced in metadata. Supports dry-run mode and parallel execution. + */ +public class HiveIcebergRepairTable { + + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRepairTable.class); + private static final int DEFAULT_NUM_THREADS = 4; + + private final Table table; + private final MsckDesc desc; + private final int numThreads; + + public HiveIcebergRepairTable(Table table, MsckDesc desc) { + this(table, desc, DEFAULT_NUM_THREADS); + } + + public HiveIcebergRepairTable(Table table, MsckDesc desc, int numThreads) { + this.table = table; + this.desc = desc; + this.numThreads = numThreads; + } + + /** + * Executes the repair operation within a provided transaction. + * + * @param transaction the Iceberg transaction to use + * @return repair result containing number of issues fixed and log message + * @throws IOException if metadata validation or file check fails + */ + public MsckResult execute(Transaction transaction) throws IOException { + List<String> missingFiles = getMissingFiles(); + + if (missingFiles.isEmpty()) { + String msg = "No missing files detected"; + LOG.info(msg); + return new MsckResult(0, msg); + } else if (desc.isRepair()) { + // Only commit changes if not in dry-run mode + DeleteFiles deleteFiles = transaction.newDelete(); + for (String path : missingFiles) { + deleteFiles.deleteFile(path); + } + deleteFiles.commit(); + } + + String summaryMsg = desc.isRepair() ? + "Removed %d dangling file reference(s)".formatted(missingFiles.size()) : + "Would remove %d dangling file reference(s)".formatted(missingFiles.size()); + LOG.info(summaryMsg); + + String detailedMsg = desc.isRepair() ? + "Iceberg table repair completed: %s".formatted(summaryMsg) : + "Iceberg table repair (dry-run): %s".formatted(summaryMsg); + + return new MsckResult(missingFiles.size(), detailedMsg, missingFiles); + } + + /** + * Executes the repair operation, automatically creating and committing a transaction. + * + * @return repair result containing removed files and statistics + * @throws IOException if metadata validation or file check fails + */ + public MsckResult execute() throws IOException { + Transaction transaction = table.newTransaction(); + MsckResult result = execute(transaction); + if (desc.isRepair() && result.numIssues() > 0) { + transaction.commitTransaction(); + } + return result; + } + + /** + * Finds all missing data files by checking their physical existence in parallel.
+ * + * @return list of file paths for missing data files + * @throws IOException if the file check operation fails or is interrupted + */ + private List<String> getMissingFiles() throws IOException { + try (ExecutorService executorService = IcebergTableUtil.newFixedThreadPool( + "repair-table-" + table.name(), numThreads); + CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { + return executorService.submit(() -> + StreamSupport.stream(fileScanTasks.spliterator(), true) + .map(task -> task.file().location()) + .filter(path -> !table.io().newInputFile(path).exists()) + .toList() + ).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while checking for missing files", e); + + } catch (ExecutionException e) { + throw new IOException("Failed to check for missing files: " + e.getMessage(), e); + } + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java index 13ab0ca1ab2b..984359b65e88 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java @@ -114,7 +114,7 @@ public void setConf(Configuration configuration) { HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal); if (numThreads > 0) { LOG.info("Will expire Iceberg snapshots using an executor service with {} threads", numThreads); - deleteExecutorService = IcebergTableUtil.newDeleteThreadPool("iceberg-housekeeper-service", numThreads); + deleteExecutorService = IcebergTableUtil.newFixedThreadPool("iceberg-housekeeper-service", numThreads); } } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/actions/TestHiveIcebergRepairTable.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/actions/TestHiveIcebergRepairTable.java new file mode 100644 index 000000000000..174464efe787 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/actions/TestHiveIcebergRepairTable.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.iceberg.mr.hive.actions; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for HiveIcebergRepairTable functionality. + */ +public class TestHiveIcebergRepairTable { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private HadoopCatalog catalog; + private File warehouseDir; + + private TableIdentifier tableId; + private Table table; + private MsckDesc msckDesc; + + @Before + public void before() throws IOException { + Configuration conf = new Configuration(); + Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()) + ); + // Create a table + warehouseDir = temp.newFolder("warehouse"); + catalog = new HadoopCatalog(conf, warehouseDir.getAbsolutePath()); + + tableId = TableIdentifier.of("db", "test_table"); + table = catalog.createTable(tableId, schema, PartitionSpec.unpartitioned()); + + msckDesc = new MsckDesc(tableId.name(), null, + new Path(warehouseDir.getAbsolutePath()), true, false, false); + } + + @After + public void after() throws Exception { + if (catalog != null) { + catalog.dropTable(tableId); + catalog.close(); + } + } + + @Test + public void testRepairWithNoMissingFiles() throws Exception { + // Add a data file that exists + File dataFile = temp.newFile("data1.parquet"); + Files.write(dataFile.toPath(), "test data".getBytes()); + + DataFile file = newDataFile(dataFile); + + table.newAppend() + .appendFile(file) + .commit(); + countFiles(table); + // Run repair - should find no missing files + HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, msckDesc); + MsckResult result = repair.execute(); + + assertEquals("No files should be removed when all files exist", 0, result.numIssues()); + + // Verify the file is still there + long fileCountAfter = countFiles(table); + assertEquals("File should still be in metadata", 1, fileCountAfter); + } + + @Test + public void testRepairWithMissingFiles() throws Exception { + // Add two data files - one that exists and one that doesn't + File existingFile = temp.newFile("existing.parquet"); + Files.write(existingFile.toPath(), "test data".getBytes()); + + File missingFile = new File(temp.getRoot(), "missing.parquet"); + // Don't create this file - it will be missing + + DataFile existingDataFile = newDataFile(existingFile); + DataFile missingDataFile = newDataFile(missingFile); + + table.newAppend() + .appendFile(existingDataFile) + .appendFile(missingDataFile) + .commit(); + + // Verify we have 2 files before repair + long fileCountBefore = countFiles(table); + assertEquals("Should have 2 files before repair", 2, fileCountBefore); + + // Run repair - 
should remove the missing file reference + HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, msckDesc); + MsckResult result = repair.execute(); + + assertEquals("One file reference should be removed", 1, result.numIssues()); + + // Verify only one file remains after repair + long fileCountAfter = countFiles(table); + assertEquals("Should have 1 file after repair", 1, fileCountAfter); + } + + @Test + public void testRepairWithMultipleMissingFiles() throws Exception { + // Add multiple missing files + for (int i = 0; i < 3; i++) { + File missingFile = new File(temp.getRoot(), "missing_" + i + ".parquet"); + + DataFile missingDataFile = newDataFile(missingFile); + + table.newAppend() + .appendFile(missingDataFile) + .commit(); + } + + // Run repair + HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, msckDesc); + MsckResult result = repair.execute(); + + assertEquals("Three file references should be removed", 3, result.numIssues()); + } + + @Test + public void testRepairDryRunMode() throws Exception { + // Add a missing file + File missingFile = new File(temp.getRoot(), "missing.parquet"); + DataFile missingDataFile = newDataFile(missingFile); + + table.newAppend() + .appendFile(missingDataFile) + .commit(); + + long fileCountBefore = countFiles(table); + assertEquals("Should have 1 file before repair", 1, fileCountBefore); + + MsckDesc dryRun = new MsckDesc(tableId.name(), null, + new Path(warehouseDir.getAbsolutePath()), false, false, false); + // Run repair in dry-run mode + HiveIcebergRepairTable repair = new HiveIcebergRepairTable(table, dryRun); + MsckResult result = repair.execute(); + + // Should identify the missing file + assertEquals("Should identify 1 missing file", 1, result.numIssues()); + + // But file should still be in metadata (dry-run doesn't commit) + table.refresh(); + long fileCountAfter = countFiles(table); + assertEquals("File should still be in metadata after dry-run", 1, fileCountAfter); + } + + private static DataFile newDataFile(File file) { + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(file.getAbsolutePath()) + .withFileSizeInBytes(10) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .build(); + } + + /** + * Count the number of files in the table, properly closing resources. 
+ */ + private static long countFiles(Table table) { + return Long.parseLong(table.currentSnapshot().summary() + .get(SnapshotSummary.TOTAL_DATA_FILES_PROP)); + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_msck_repair.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_msck_repair.q new file mode 100644 index 000000000000..f3023aafc7da --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_msck_repair.q @@ -0,0 +1,27 @@ +-- SORT_QUERY_RESULTS + +create external table ice_msck_repair_test (id int, name string) stored by iceberg stored as orc; + +-- First insert - creates some data files +insert into ice_msck_repair_test values (1, 'one'), (2, 'two'); + +-- List files in the data directory +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/ice_msck_repair_test/data/; + +-- Manually delete ALL data files from the first insert to simulate accidental deletion +dfs -rm ${hiveconf:hive.metastore.warehouse.dir}/ice_msck_repair_test/data/*.orc; + +-- Now insert more data - this creates new files, but metadata still references the deleted ones +insert into ice_msck_repair_test values (3, 'three'), (4, 'four'); +insert into ice_msck_repair_test values (5, 'five'), (6, 'six'); + +-- List files in the data directory +dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/ice_msck_repair_test/data/; + +-- Run MSCK REPAIR to remove dangling file references for the deleted files +MSCK REPAIR TABLE ice_msck_repair_test; + +-- Verify the table works correctly after repair +select * from ice_msck_repair_test order by id; + +drop table ice_msck_repair_test; diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_msck_repair.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_msck_repair.q.out new file mode 100644 index 000000000000..b46b1096d477 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_msck_repair.q.out @@ -0,0 +1,66 @@ +PREHOOK: query: create external table ice_msck_repair_test (id int, name string) stored by iceberg stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_msck_repair_test +POSTHOOK: query: create external table ice_msck_repair_test (id int, name string) stored by iceberg stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_msck_repair_test +PREHOOK: query: insert into ice_msck_repair_test values (1, 'one'), (2, 'two') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_msck_repair_test +POSTHOOK: query: insert into ice_msck_repair_test values (1, 'one'), (2, 'two') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_msck_repair_test +Found 1 items +-rw-rw-rw- 3 ### USER ### ### GROUP ### ### SIZE ### ### HDFS DATE ### hdfs://### HDFS PATH ### +#### A masked pattern was here #### +PREHOOK: query: insert into ice_msck_repair_test values (3, 'three'), (4, 'four') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_msck_repair_test +POSTHOOK: query: insert into ice_msck_repair_test values (3, 'three'), (4, 'four') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_msck_repair_test +PREHOOK: query: insert into ice_msck_repair_test values (5, 'five'), (6, 'six') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_msck_repair_test 
+POSTHOOK: query: insert into ice_msck_repair_test values (5, 'five'), (6, 'six') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_msck_repair_test +Found 2 items +-rw-rw-rw- 3 ### USER ### ### GROUP ### ### SIZE ### ### HDFS DATE ### hdfs://### HDFS PATH ### +-rw-rw-rw- 3 ### USER ### ### GROUP ### ### SIZE ### ### HDFS DATE ### hdfs://### HDFS PATH ### +PREHOOK: query: MSCK REPAIR TABLE ice_msck_repair_test +PREHOOK: type: MSCK +PREHOOK: Output: default@ice_msck_repair_test +POSTHOOK: query: MSCK REPAIR TABLE ice_msck_repair_test +POSTHOOK: type: MSCK +POSTHOOK: Output: default@ice_msck_repair_test +PREHOOK: query: select * from ice_msck_repair_test order by id +PREHOOK: type: QUERY +PREHOOK: Input: default@ice_msck_repair_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select * from ice_msck_repair_test order by id +POSTHOOK: type: QUERY +POSTHOOK: Input: default@ice_msck_repair_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +3 three +4 four +5 five +6 six +PREHOOK: query: drop table ice_msck_repair_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@ice_msck_repair_test +PREHOOK: Output: database:default +PREHOOK: Output: default@ice_msck_repair_test +POSTHOOK: query: drop table ice_msck_repair_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@ice_msck_repair_test +POSTHOOK: Output: database:default +POSTHOOK: Output: default@ice_msck_repair_test diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java index 4366e380ec42..b4ed61bdfd49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java @@ -60,7 +60,7 @@ public void analyzeInternal(ASTNode root) throws SemanticException { boolean repair = root.getChild(0).getType() == HiveParser.KW_REPAIR; int offset = repair ? 1 : 0; - String tableName = getUnescapedName((ASTNode) root.getChild(0 + offset)); + String tableName = getUnescapedName((ASTNode) root.getChild(offset)); boolean addPartitions = true; boolean dropPartitions = false; @@ -73,7 +73,7 @@ public void analyzeInternal(ASTNode root) throws SemanticException { Map> partitionSpecs = ParseUtils.getFullPartitionSpecs(root, table, conf, false); byte[] filterExp = null; - if (partitionSpecs != null & !partitionSpecs.isEmpty()) { + if (!partitionSpecs.isEmpty()) { // expression proxy class needs to be PartitionExpressionForMetastore since we intend to use the // filterPartitionsByExpr of PartitionExpressionForMetastore for partition pruning down the line. // Bail out early if expressionProxyClass is not configured properly. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckDesc.java index 7d443663c964..855ad2d5c1f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckDesc.java @@ -34,16 +34,16 @@ public class MsckDesc implements DDLDesc, Serializable { private final String tableName; private final byte[] filterExp; private final String resFile; - private final boolean repairPartitions; + private final boolean repair; private final boolean addPartitions; private final boolean dropPartitions; public MsckDesc(String tableName, byte[] filterExp, Path resFile, - boolean repairPartitions, boolean addPartitions, boolean dropPartitions) { + boolean repair, boolean addPartitions, boolean dropPartitions) { this.tableName = tableName; this.filterExp = filterExp; this.resFile = resFile.toString(); - this.repairPartitions = repairPartitions; + this.repair = repair; this.addPartitions = addPartitions; this.dropPartitions = dropPartitions; } @@ -62,10 +62,10 @@ public String getResFile() { return resFile; } - @Explain(displayName = "repair partition", displayOnlyOnTrue = true, + @Explain(displayName = "repair", displayOnlyOnTrue = true, explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public boolean isRepairPartitions() { - return repairPartitions; + public boolean isRepair() { + return repair; } @Explain(displayName = "add partition", displayOnlyOnTrue = true, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java index b8ed7bb36c3a..8aabbb94b7ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckOperation.java @@ -23,25 +23,28 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import org.apache.hadoop.hive.common.TableName; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.Msck; import org.apache.hadoop.hive.metastore.MsckInfo; import org.apache.hadoop.hive.metastore.PartitionManagementTask; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.MetastoreException; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; +import org.apache.hadoop.hive.ql.ddl.DDLUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Operation process of metastore check. @@ -50,12 +53,33 @@ * and partitions that are either missing on disk on in the metastore. 
*/ public class MsckOperation extends DDLOperation<MsckDesc> { + + private static final Logger LOG = LoggerFactory.getLogger(MsckOperation.class); + private static final SessionState.LogHelper CONSOLE = SessionState.getConsole(); + public MsckOperation(DDLOperationContext context, MsckDesc desc) { super(context, desc); } @Override public int execute() throws HiveException, IOException, TException, MetastoreException { + Table table = context.getDb().getTable(desc.getTableName()); + + if (DDLUtils.isIcebergTable(table)) { + MsckResult result = + table.getStorageHandler().repair(table, context.getConf(), desc); + + CONSOLE.printInfo(result.message()); + + // Print details (file paths) if available + if (!result.details().isEmpty()) { + String filesLog = formatFilesForConsole(result.details()); + CONSOLE.printInfo("[MSCK] Details: " + filesLog); + } + + return 0; + } + try { Msck msck = new Msck(false, false); msck.init(Msck.getMsckConf(context.getDb().getConf())); @@ -64,39 +88,30 @@ public int execute() throws HiveException, IOException, TException, MetastoreExc TableName tableName = HiveTableName.of(desc.getTableName()); long partitionExpirySeconds = -1L; - try (HiveMetaStoreClient msc = new HiveMetaStoreClient(context.getConf())) { - boolean msckEnablePartitionRetention = MetastoreConf.getBoolVar(context.getConf(), - MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION); - if (msckEnablePartitionRetention) { - Table table = msc.getTable(SessionState.get().getCurrentCatalog(), tableName.getDb(), tableName.getTable()); - String qualifiedTableName = Warehouse.getCatalogQualifiedTableName(table); - partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(table); - LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", qualifiedTableName, - partitionExpirySeconds); - } + boolean msckEnablePartitionRetention = MetastoreConf.getBoolVar(context.getConf(), + MetastoreConf.ConfVars.MSCK_REPAIR_ENABLE_PARTITION_RETENTION); + if (msckEnablePartitionRetention) { + org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable(); + String qualifiedTableName = Warehouse.getCatalogQualifiedTableName(tTable); + partitionExpirySeconds = PartitionManagementTask.getRetentionPeriodInSeconds(tTable); + LOG.info("{} - Retention period ({}s) for partition is enabled for MSCK REPAIR..", qualifiedTableName, + partitionExpirySeconds); } MsckInfo msckInfo = new MsckInfo(SessionState.get().getCurrentCatalog(), tableName.getDb(), tableName.getTable(), - desc.getFilterExp(), desc.getResFile(), desc.isRepairPartitions(), + desc.getFilterExp(), desc.getResFile(), desc.isRepair(), desc.isAddPartitions(), desc.isDropPartitions(), partitionExpirySeconds); int result = msck.repair(msckInfo); Map<String, String> smallFilesStats = msckInfo.getSmallFilesStats(); if (smallFilesStats != null && !smallFilesStats.isEmpty()) { // keep the small files information in logInfo List<String> logInfo = smallFilesStats.entrySet().stream() - .map(entry -> String.format( - "Average file size is too small, small files exist. %n Partition name: %s. %s", - entry.getKey(), entry.getValue())) - .collect(Collectors.toList()); + .map(entry -> String.format( + "Average file size is too small, small files exist. %n Partition name: %s. %s", + entry.getKey(), entry.getValue())) + .toList(); // print out the small files information on console to end users - SessionState ss = SessionState.get(); - if (ss != null && ss.getConsole() != null) { - ss.getConsole().printInfo("[MSCK] Small files detected."); - ss.getConsole().printInfo(""); // add a blank line for separation - logInfo.forEach(line -> ss.getConsole().printInfo("[MSCK] " + line)); - } else { - // if there is no console to print out, keep the small files info in logs - LOG.info("There are small files exist.\n{}", String.join("\n", logInfo)); - } + CONSOLE.printInfo("[MSCK] Small files detected.\n"); + logInfo.forEach(line -> CONSOLE.printInfo("[MSCK] " + line)); } return result; } catch (MetaException | MetastoreException e) { @@ -108,4 +123,20 @@ public int execute() throws HiveException, IOException, TException, MetastoreExc } } + /** + * Format list of files for console output. + */ + private static String formatFilesForConsole(List<String> files) { + int numPathsToLog = LOG.isTraceEnabled() ? 100 : 3; + int total = files.size(); + String fileNames = Joiner.on(", ").join(Iterables.limit(files, numPathsToLog)); + + if (total > numPathsToLog) { + int remaining = total - numPathsToLog; + fileNames += String.format(" (and %d more)", remaining); + } + return fileNames; + } + } + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckResult.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckResult.java new file mode 100644 index 000000000000..2f84dd67d3bb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckResult.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.ddl.misc.msck; + +import java.util.ArrayList; +import java.util.List; + +/** + * Generic result of a table repair operation. + * Contains the number of issues found/fixed and a detailed log message. + */ +public record MsckResult(int numIssues, String message, List<String> details) { + + /** + * Creates a result with issue count and log message. + * + * @param numIssues number of issues found or fixed + * @param message log message describing the operation result + */ + public MsckResult(int numIssues, String message) { + this(numIssues, message, new ArrayList<>()); + } + + /** + * Creates a result with issue count, log message, and detail items. + * + * @param numIssues number of issues found or fixed + * @param message log message describing the operation result + * @param details list of detail strings (e.g., file paths) + */ + public MsckResult(int numIssues, String message, List<String> details) { + this.numIssues = numIssues; + this.message = message; + this.details = details != null ?
details : new ArrayList<>(); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 39358ca3d594..ffea1435a84f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hive.ql.Context.Operation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckDesc; +import org.apache.hadoop.hive.ql.ddl.misc.msck.MsckResult; import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableDesc; import org.apache.hadoop.hive.ql.ddl.table.AlterTableType; import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc; @@ -1023,4 +1025,20 @@ default void setMergeTaskDeleteProperties(TableDesc tableDesc) { default boolean supportsDefaultColumnValues(Map tblProps) { return false; } + + /** + * Repair table metadata by removing dangling references or fixing inconsistencies. + * This is called by MSCK REPAIR TABLE command for non-native tables. + * + * @param table the table to repair + * @param conf the Hive configuration + * @param desc the msckDesc + * @return MsckResult containing repair statistics and details + * @throws HiveException if the repair operation fails + */ + default MsckResult repair(org.apache.hadoop.hive.ql.metadata.Table table, HiveConf conf, MsckDesc desc) + throws HiveException { + return new MsckResult(0, "Repair not supported for this table type"); + } + } diff --git a/ql/src/test/results/clientpositive/llap/msck_repair_drop.q.out b/ql/src/test/results/clientpositive/llap/msck_repair_drop.q.out index e17b6d8ec863..ce7b54b77f4a 100644 --- a/ql/src/test/results/clientpositive/llap/msck_repair_drop.q.out +++ b/ql/src/test/results/clientpositive/llap/msck_repair_drop.q.out @@ -48,7 +48,7 @@ STAGE PLANS: Metastore Check table name: default.repairtable_n1 add partition: true - repair partition: true + repair: true PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 PREHOOK: type: MSCK @@ -113,7 +113,7 @@ STAGE PLANS: Metastore Check table name: default.repairtable_n1 drop partition: true - repair partition: true + repair: true PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 DROP PARTITIONS PREHOOK: type: MSCK @@ -335,7 +335,7 @@ STAGE PLANS: Metastore Check table name: default.repairtable_n1 add partition: true - repair partition: true + repair: true PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 ADD PARTITIONS PREHOOK: type: MSCK @@ -426,7 +426,7 @@ STAGE PLANS: table name: default.repairtable_n1 add partition: true drop partition: true - repair partition: true + repair: true PREHOOK: query: MSCK REPAIR TABLE default.repairtable_n1 SYNC PARTITIONS PREHOOK: type: MSCK
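Reviewer note (not part of the patch): a minimal sketch of how the new pieces fit together when driven directly, e.g. from a test or a standalone maintenance job, rather than through MSCK REPAIR TABLE. Only HiveIcebergRepairTable, MsckDesc and MsckResult come from this change; the surrounding conf, hmsTable, resFile and LOG variables are hypothetical, and exception handling is omitted.

    // Resolve the Iceberg table behind the HMS handle (same helper the storage handler uses above).
    Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());

    // Dry run: the repair flag is false, so execute() only plans the scan and
    // reports dangling references without committing anything.
    MsckDesc dryRun = new MsckDesc(hmsTable.getTableName(), null, resFile, false, false, false);
    MsckResult preview = new HiveIcebergRepairTable(icebergTable, dryRun).execute();
    LOG.info("{} ({} issue(s)): {}", preview.message(), preview.numIssues(), preview.details());

    // Repair: the repair flag is true, so references to files that fail the
    // parallel existence check are dropped in a single transaction commit.
    MsckDesc repair = new MsckDesc(hmsTable.getTableName(), null, resFile, true, false, false);
    MsckResult fixed = new HiveIcebergRepairTable(icebergTable, repair).execute();
    LOG.info(fixed.message());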