From 502abd44bfab2f7f47fe90f47c791021cb8de6d4 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Tue, 5 May 2026 16:17:13 +0530 Subject: [PATCH 1/3] HDDS-14943. Implement rewrite logic for Iceberg's manifest-list files for path migration --- .../iceberg/RewriteTablePathOzoneAction.java | 91 ++++++++++++++++++- .../TestRewriteTablePathOzoneAction.java | 85 ++++++++++++----- 2 files changed, 152 insertions(+), 24 deletions(-) diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index a3e9190469e6..2e8b930bf265 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -211,7 +211,7 @@ private boolean versionInFilePath(String path, String version) { } private String rebuildMetadata() { - //TODO need to implement rewrite of manifest list , manifest files and position delete files. + //TODO need to implement rewrite of manifest files and position delete files. TableMetadata startMetadata = startVersionName != null ? new StaticTableOperations(startVersionName, table.io()).current() : null; @@ -227,12 +227,14 @@ private String rebuildMetadata() { Set deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); Set validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata)); validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata)); - //TODO: manifestsToRewrite will be used while re-write of manifest-list files. Set manifestsToRewrite = manifestsToRewrite(validSnapshots, startMetadata != null ? deltaSnapshotIds : null); + RewriteResult rewriteManifestListResult = + rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); + copyPlan.addAll(rewriteManifestListResult.copyPlan()); return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir, table.io()); } @@ -362,6 +364,91 @@ private Set manifestsToRewrite(Set validSnapshots, Set d return manifestPaths; } + private RewriteResult rewriteManifestList( + Snapshot snapshot, TableMetadata tableMetadata, Set manifestsToRewrite) { + RewriteResult result = new RewriteResult<>(); + + String path = snapshot.manifestListLocation(); + String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir); + RewriteResult rewriteResult = + RewriteTablePathUtil.rewriteManifestList( + snapshot, + table.io(), + tableMetadata, + manifestsToRewrite, + sourcePrefix, + targetPrefix, + stagingDir, + outputPath); + + result.append(rewriteResult); + result + .copyPlan() + .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix))); + return result; + } + + private RewriteResult rewriteManifestLists(Set validSnapshots, TableMetadata endMetadata, + Set manifestsToRewrite) { + + if (validSnapshots.isEmpty()) { + return new RewriteResult<>(); + } + + int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; + Semaphore semaphore = new Semaphore(maxInFlight); + ExecutorCompletionService> completionService = + new ExecutorCompletionService<>(executorService); + + RewriteResult combined = new RewriteResult<>(); + int submittedTasks = 0; + int completedTasks = 0; + + try { + for (Snapshot snapshot : validSnapshots) { + semaphore.acquire(); + + boolean taskSubmitted = false; + try { + completionService.submit(() -> { + try { + return rewriteManifestList(snapshot, endMetadata, manifestsToRewrite); + } finally { + semaphore.release(); + } + }); + taskSubmitted = true; + submittedTasks++; + } finally { + if (!taskSubmitted) { + semaphore.release(); + } + } + + Future> done; + while ((done = completionService.poll()) != null) { + combined.append(done.get()); + completedTasks++; + } + } + + while (completedTasks < submittedTasks) { + combined.append(completionService.take().get()); + completedTasks++; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + throw new RuntimeException("Interrupted while rewriting manifest lists", e); + } catch (ExecutionException e) { + executorService.shutdownNow(); + throw new RuntimeException("Failed to rewrite manifest list", e.getCause()); + } + + return combined; + } + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { if (startMetadata == null) { return allSnapshots; diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java index 8678fe52301e..0dc616b3fc81 100644 --- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.iceberg; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.BufferedReader; @@ -33,7 +32,11 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.GenericPartitionFieldSummary; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.Schema; @@ -44,6 +47,7 @@ import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.actions.RewriteTablePath; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.BeforeEach; @@ -96,7 +100,9 @@ void fullTablePathRewrite() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target. assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -124,7 +130,9 @@ void tablePathRewriteForStartAndNoEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -152,7 +160,9 @@ void tablePathRewriteForOnlyEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -182,7 +192,9 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -196,33 +208,62 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception { * - Every statistics file path starts with target * - None of the above contain the source prefix. */ - private void assertAllInternalPathsRewritten(Set> csvPairs, String target) { + private void assertAllInternalPathsRewritten(Set> csvPairs, String target) throws Exception { + for (Pair pair : csvPairs) { String stagingPath = pair.first(); String targetPath = pair.second(); - // Only inspect .metadata.json files, manifest/data files and snapshots are not yet rewritten - if (!stagingPath.endsWith(".metadata.json")) { - continue; + if (stagingPath.endsWith(".metadata.json")) { + assertMetadataFileRewritten(stagingPath, targetPath, target); + } else if (RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) { + assertManifestListRewritten(stagingPath, targetPath, target); + } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) { + assertTrue(targetPath.startsWith(target), + "Manifest file target path should start with target prefix: " + targetPath); } + } + } - assertTrue(targetPath.startsWith(target), - "Target path in CSV should start with target prefix: " + targetPath); + private void assertMetadataFileRewritten(String stagingPath, String targetPath, String target) { - TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current(); + assertTrue(targetPath.startsWith(target), + "Target path in CSV should start with target prefix: " + targetPath); - assertTrue(rewritten.location().startsWith(target), - "Metadata location should start with target: " + rewritten.location()); + TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current(); - for (MetadataLogEntry entry : rewritten.previousFiles()) { - assertTrue(entry.file().startsWith(target), - "Metadata log entry should start with target: " + entry.file()); - } + assertTrue(rewritten.location().startsWith(target), + "Metadata location should start with target: " + rewritten.location()); + + for (MetadataLogEntry entry : rewritten.previousFiles()) { + assertTrue(entry.file().startsWith(target), + "Metadata log entry should start with target: " + entry.file()); + } + + for (Snapshot snapshot : rewritten.snapshots()) { + String manifestList = snapshot.manifestListLocation(); + assertTrue(manifestList.startsWith(target), + "Snapshot manifest-list should start with target: " + manifestList); + } + } - for (Snapshot snapshot : rewritten.snapshots()) { - String manifestList = snapshot.manifestListLocation(); - assertTrue(manifestList.startsWith(target), - "Snapshot manifest-list should start with target: " + manifestList); + private void assertManifestListRewritten(String stagingPath, String targetPath, String target) throws Exception { + + assertTrue(targetPath.startsWith(target), + "Manifest list target path should start with target prefix: " + targetPath); + + try (CloseableIterable manifests = + InternalData.read(FileFormat.AVRO, table.io().newInputFile(stagingPath)) + .setRootType(GenericManifestFile.class) + .setCustomType( + ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, + GenericPartitionFieldSummary.class) + .project(ManifestFile.schema()) + .build()) { + for (ManifestFile manifest : manifests) { + assertTrue(manifest.path().startsWith(target), + "Manifest path inside staged manifest list should start with target prefix: " + + manifest.path()); } } } From 8912621e5f2dd8e879318b619654702b9a0dd6fe Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Tue, 5 May 2026 17:39:25 +0530 Subject: [PATCH 2/3] Updated javadoc of test --- .../iceberg/TestRewriteTablePathOzoneAction.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java index 0dc616b3fc81..1cee163b45a9 100644 --- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java @@ -201,12 +201,16 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception { } /** - * For every staged metadata JSON file in the CSV, parses the file and asserts that: - * - The table location starts with target - * - Every metadata-log entry path starts with target - * - Every snapshot's manifest-list path starts with target - * - Every statistics file path starts with target - * - None of the above contain the source prefix. + * For every staged file in the CSV copy plan, asserts that internal paths are rewritten + * to the target prefix: + *
    + *
  • .metadata.json: table location, metadata-log entries, and snapshot + * manifest-list references all start with target.
  • + *
  • snap-*.avro (manifest-list): target path starts with target, and every + * manifest entry path inside the staged file starts with target.
  • + *
  • *.avro (manifest): target path starts with target (content rewrite + * is not yet implemented).
  • + *
*/ private void assertAllInternalPathsRewritten(Set> csvPairs, String target) throws Exception { From 535a45d7ce5d3d03ac182abfc060711c7029c2f5 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Tue, 5 May 2026 19:44:58 +0530 Subject: [PATCH 3/3] Assert metadata and manifest list references in path rewrite tests --- .../TestRewriteTablePathOzoneAction.java | 49 ++++++++++++++++--- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java index 1cee163b45a9..56442a750635 100644 --- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java @@ -17,11 +17,13 @@ package org.apache.hadoop.ozone.iceberg; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -72,11 +74,11 @@ class TestRewriteTablePathOzoneAction { private Table table = null; @TempDir - private java.nio.file.Path tableDir; + private Path tableDir; @TempDir - private java.nio.file.Path targetDir; + private Path targetDir; @TempDir - private java.nio.file.Path stagingDir; + private Path stagingDir; @BeforeEach public void setupTableLocation() { @@ -233,8 +235,20 @@ private void assertMetadataFileRewritten(String stagingPath, String targetPath, assertTrue(targetPath.startsWith(target), "Target path in CSV should start with target prefix: " + targetPath); + assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath), + "original and target metadata file should have the same filename"); TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current(); + TableMetadata original = new StaticTableOperations( + targetPath.replace(targetPrefix, sourcePrefix), table.io()).current(); + Set expectedMetadata = original.previousFiles().stream() + .map(e -> RewriteTablePathUtil.fileName(e.file())) + .collect(Collectors.toSet()); + Set expectedManifestLists = original.snapshots().stream() + .map(s -> RewriteTablePathUtil.fileName(s.manifestListLocation())) + .collect(Collectors.toSet()); + Set actualMetadata = new HashSet<>(); + Set actualManifestLists = new HashSet<>(); assertTrue(rewritten.location().startsWith(target), "Metadata location should start with target: " + rewritten.location()); @@ -242,19 +256,40 @@ private void assertMetadataFileRewritten(String stagingPath, String targetPath, for (MetadataLogEntry entry : rewritten.previousFiles()) { assertTrue(entry.file().startsWith(target), "Metadata log entry should start with target: " + entry.file()); + actualMetadata.add(RewriteTablePathUtil.fileName(entry.file())); } + assertEquals(expectedMetadata, actualMetadata, + "Rewritten metadata file should reference the same metadata files as the original"); + for (Snapshot snapshot : rewritten.snapshots()) { String manifestList = snapshot.manifestListLocation(); assertTrue(manifestList.startsWith(target), - "Snapshot manifest-list should start with target: " + manifestList); + "Snapshot's manifest-list should start with target: " + manifestList); + actualManifestLists.add(RewriteTablePathUtil.fileName(manifestList)); } + assertEquals(expectedManifestLists, actualManifestLists, + "Rewritten metadata file should reference the same manifest-lists as the original"); } private void assertManifestListRewritten(String stagingPath, String targetPath, String target) throws Exception { assertTrue(targetPath.startsWith(target), "Manifest list target path should start with target prefix: " + targetPath); + assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath), + "original and target manifest list should have the same filename"); + + Set expectedManifests = new HashSet<>(); + Set actualManifests = new HashSet<>(); + for (Snapshot s : table.snapshots()) { + if (RewriteTablePathUtil.fileName(s.manifestListLocation()).equals(RewriteTablePathUtil.fileName(stagingPath))) { + expectedManifests = s.allManifests(table.io()) + .stream() + .map(m -> RewriteTablePathUtil.fileName(m.path())) + .collect(Collectors.toSet()); + break; + } + } try (CloseableIterable manifests = InternalData.read(FileFormat.AVRO, table.io().newInputFile(stagingPath)) @@ -266,10 +301,12 @@ private void assertManifestListRewritten(String stagingPath, String targetPath, .build()) { for (ManifestFile manifest : manifests) { assertTrue(manifest.path().startsWith(target), - "Manifest path inside staged manifest list should start with target prefix: " - + manifest.path()); + "Manifest path inside staged manifest list should start with target prefix: " + manifest.path()); + actualManifests.add(RewriteTablePathUtil.fileName(manifest.path())); } } + assertEquals(expectedManifests, actualManifests, + "Rewritten manifest list should reference the same manifest files as the original"); } private static List metadataLogEntryPaths(Table tbl) {