diff --git a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java index a3e9190469e6..2e8b930bf265 100644 --- a/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/main/java/org/apache/hadoop/ozone/iceberg/RewriteTablePathOzoneAction.java @@ -211,7 +211,7 @@ private boolean versionInFilePath(String path, String version) { } private String rebuildMetadata() { - //TODO need to implement rewrite of manifest list , manifest files and position delete files. + //TODO need to implement rewrite of manifest files and position delete files. TableMetadata startMetadata = startVersionName != null ? new StaticTableOperations(startVersionName, table.io()).current() : null; @@ -227,12 +227,14 @@ private String rebuildMetadata() { Set deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); Set validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata)); validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata)); - //TODO: manifestsToRewrite will be used while re-write of manifest-list files. Set manifestsToRewrite = manifestsToRewrite(validSnapshots, startMetadata != null ? deltaSnapshotIds : null); + RewriteResult rewriteManifestListResult = + rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite); Set> copyPlan = new HashSet<>(); copyPlan.addAll(rewriteVersionResult.copyPlan()); + copyPlan.addAll(rewriteManifestListResult.copyPlan()); return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir, table.io()); } @@ -362,6 +364,91 @@ private Set manifestsToRewrite(Set validSnapshots, Set d return manifestPaths; } + private RewriteResult rewriteManifestList( + Snapshot snapshot, TableMetadata tableMetadata, Set manifestsToRewrite) { + RewriteResult result = new RewriteResult<>(); + + String path = snapshot.manifestListLocation(); + String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir); + RewriteResult rewriteResult = + RewriteTablePathUtil.rewriteManifestList( + snapshot, + table.io(), + tableMetadata, + manifestsToRewrite, + sourcePrefix, + targetPrefix, + stagingDir, + outputPath); + + result.append(rewriteResult); + result + .copyPlan() + .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix))); + return result; + } + + private RewriteResult rewriteManifestLists(Set validSnapshots, TableMetadata endMetadata, + Set manifestsToRewrite) { + + if (validSnapshots.isEmpty()) { + return new RewriteResult<>(); + } + + int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER; + Semaphore semaphore = new Semaphore(maxInFlight); + ExecutorCompletionService> completionService = + new ExecutorCompletionService<>(executorService); + + RewriteResult combined = new RewriteResult<>(); + int submittedTasks = 0; + int completedTasks = 0; + + try { + for (Snapshot snapshot : validSnapshots) { + semaphore.acquire(); + + boolean taskSubmitted = false; + try { + completionService.submit(() -> { + try { + return rewriteManifestList(snapshot, endMetadata, manifestsToRewrite); + } finally { + semaphore.release(); + } + }); + taskSubmitted = true; + submittedTasks++; + } finally { + if (!taskSubmitted) { + semaphore.release(); + } + } + + Future> done; + while ((done = completionService.poll()) != null) { + combined.append(done.get()); + completedTasks++; + } + } + + while (completedTasks < submittedTasks) { + combined.append(completionService.take().get()); + completedTasks++; + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + throw new RuntimeException("Interrupted while rewriting manifest lists", e); + } catch (ExecutionException e) { + executorService.shutdownNow(); + throw new RuntimeException("Failed to rewrite manifest list", e.getCause()); + } + + return combined; + } + private Set deltaSnapshots(TableMetadata startMetadata, Set allSnapshots) { if (startMetadata == null) { return allSnapshots; diff --git a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java index 8678fe52301e..56442a750635 100644 --- a/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java +++ b/hadoop-ozone/iceberg/src/test/java/org/apache/hadoop/ozone/iceberg/TestRewriteTablePathOzoneAction.java @@ -23,6 +23,7 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -33,7 +34,11 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.GenericManifestFile; +import org.apache.iceberg.GenericPartitionFieldSummary; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RewriteTablePathUtil; import org.apache.iceberg.Schema; @@ -44,6 +49,7 @@ import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.actions.RewriteTablePath; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; import org.junit.jupiter.api.BeforeEach; @@ -68,11 +74,11 @@ class TestRewriteTablePathOzoneAction { private Table table = null; @TempDir - private java.nio.file.Path tableDir; + private Path tableDir; @TempDir - private java.nio.file.Path targetDir; + private Path targetDir; @TempDir - private java.nio.file.Path stagingDir; + private Path stagingDir; @BeforeEach public void setupTableLocation() { @@ -96,7 +102,9 @@ void fullTablePathRewrite() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target. assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -124,7 +132,9 @@ void tablePathRewriteForStartAndNoEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -152,7 +162,9 @@ void tablePathRewriteForOnlyEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); @@ -182,49 +194,119 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception { } Set> csvPairs = readCsvPairs(table, result.fileListLocation()); - assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet())); + Set actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet()); + assertTrue(actualTargets.containsAll(expectedTargets), + "Copy plan should contain all expected version file targets"); // Verify all internal paths inside each staged metadata file are rewritten to target assertAllInternalPathsRewritten(csvPairs, targetPrefix); } /** - * For every staged metadata JSON file in the CSV, parses the file and asserts that: - * - The table location starts with target - * - Every metadata-log entry path starts with target - * - Every snapshot's manifest-list path starts with target - * - Every statistics file path starts with target - * - None of the above contain the source prefix. + * For every staged file in the CSV copy plan, asserts that internal paths are rewritten + * to the target prefix: + *
    + *
  • .metadata.json: table location, metadata-log entries, and snapshot + * manifest-list references all start with target.
  • + *
  • snap-*.avro (manifest-list): target path starts with target, and every + * manifest entry path inside the staged file starts with target.
  • + *
  • *.avro (manifest): target path starts with target (content rewrite + * is not yet implemented).
  • + *
*/ - private void assertAllInternalPathsRewritten(Set> csvPairs, String target) { + private void assertAllInternalPathsRewritten(Set> csvPairs, String target) throws Exception { + for (Pair pair : csvPairs) { String stagingPath = pair.first(); String targetPath = pair.second(); - // Only inspect .metadata.json files, manifest/data files and snapshots are not yet rewritten - if (!stagingPath.endsWith(".metadata.json")) { - continue; + if (stagingPath.endsWith(".metadata.json")) { + assertMetadataFileRewritten(stagingPath, targetPath, target); + } else if (RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) { + assertManifestListRewritten(stagingPath, targetPath, target); + } else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) { + assertTrue(targetPath.startsWith(target), + "Manifest file target path should start with target prefix: " + targetPath); } + } + } - assertTrue(targetPath.startsWith(target), - "Target path in CSV should start with target prefix: " + targetPath); + private void assertMetadataFileRewritten(String stagingPath, String targetPath, String target) { + + assertTrue(targetPath.startsWith(target), + "Target path in CSV should start with target prefix: " + targetPath); + assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath), + "original and target metadata file should have the same filename"); + + TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current(); + TableMetadata original = new StaticTableOperations( + targetPath.replace(targetPrefix, sourcePrefix), table.io()).current(); + Set expectedMetadata = original.previousFiles().stream() + .map(e -> RewriteTablePathUtil.fileName(e.file())) + .collect(Collectors.toSet()); + Set expectedManifestLists = original.snapshots().stream() + .map(s -> RewriteTablePathUtil.fileName(s.manifestListLocation())) + .collect(Collectors.toSet()); + Set actualMetadata = new HashSet<>(); + Set actualManifestLists = new HashSet<>(); + + assertTrue(rewritten.location().startsWith(target), + "Metadata location should start with target: " + rewritten.location()); + + for (MetadataLogEntry entry : rewritten.previousFiles()) { + assertTrue(entry.file().startsWith(target), + "Metadata log entry should start with target: " + entry.file()); + actualMetadata.add(RewriteTablePathUtil.fileName(entry.file())); + } - TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current(); + assertEquals(expectedMetadata, actualMetadata, + "Rewritten metadata file should reference the same metadata files as the original"); - assertTrue(rewritten.location().startsWith(target), - "Metadata location should start with target: " + rewritten.location()); + for (Snapshot snapshot : rewritten.snapshots()) { + String manifestList = snapshot.manifestListLocation(); + assertTrue(manifestList.startsWith(target), + "Snapshot's manifest-list should start with target: " + manifestList); + actualManifestLists.add(RewriteTablePathUtil.fileName(manifestList)); + } + assertEquals(expectedManifestLists, actualManifestLists, + "Rewritten metadata file should reference the same manifest-lists as the original"); + } - for (MetadataLogEntry entry : rewritten.previousFiles()) { - assertTrue(entry.file().startsWith(target), - "Metadata log entry should start with target: " + entry.file()); + private void assertManifestListRewritten(String stagingPath, String targetPath, String target) throws Exception { + + assertTrue(targetPath.startsWith(target), + "Manifest list target path should start with target prefix: " + targetPath); + assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath), + "original and target manifest list should have the same filename"); + + Set expectedManifests = new HashSet<>(); + Set actualManifests = new HashSet<>(); + for (Snapshot s : table.snapshots()) { + if (RewriteTablePathUtil.fileName(s.manifestListLocation()).equals(RewriteTablePathUtil.fileName(stagingPath))) { + expectedManifests = s.allManifests(table.io()) + .stream() + .map(m -> RewriteTablePathUtil.fileName(m.path())) + .collect(Collectors.toSet()); + break; } + } - for (Snapshot snapshot : rewritten.snapshots()) { - String manifestList = snapshot.manifestListLocation(); - assertTrue(manifestList.startsWith(target), - "Snapshot manifest-list should start with target: " + manifestList); + try (CloseableIterable manifests = + InternalData.read(FileFormat.AVRO, table.io().newInputFile(stagingPath)) + .setRootType(GenericManifestFile.class) + .setCustomType( + ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, + GenericPartitionFieldSummary.class) + .project(ManifestFile.schema()) + .build()) { + for (ManifestFile manifest : manifests) { + assertTrue(manifest.path().startsWith(target), + "Manifest path inside staged manifest list should start with target prefix: " + manifest.path()); + actualManifests.add(RewriteTablePathUtil.fileName(manifest.path())); } } + assertEquals(expectedManifests, actualManifests, + "Rewritten manifest list should reference the same manifest files as the original"); } private static List metadataLogEntryPaths(Table tbl) {