Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private boolean versionInFilePath(String path, String version) {
}

private String rebuildMetadata() {
//TODO need to implement rewrite of manifest list , manifest files and position delete files.
//TODO need to implement rewrite of manifest files and position delete files.
TableMetadata startMetadata = startVersionName != null
? new StaticTableOperations(startVersionName, table.io()).current()
: null;
Expand All @@ -227,12 +227,14 @@ private String rebuildMetadata() {
Set<Long> deltaSnapshotIds = deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Set<Snapshot> validSnapshots = new HashSet<>(RewriteTablePathOzoneUtils.snapshotSet(endMetadata));
validSnapshots.removeAll(RewriteTablePathOzoneUtils.snapshotSet(startMetadata));
//TODO: manifestsToRewrite will be used while re-write of manifest-list files.
Set<String> manifestsToRewrite = manifestsToRewrite(validSnapshots,
startMetadata != null ? deltaSnapshotIds : null);
RewriteResult<ManifestFile> rewriteManifestListResult =
rewriteManifestLists(validSnapshots, endMetadata, manifestsToRewrite);

Set<Pair<String, String>> copyPlan = new HashSet<>();
copyPlan.addAll(rewriteVersionResult.copyPlan());
copyPlan.addAll(rewriteManifestListResult.copyPlan());

return RewriteTablePathOzoneUtils.saveFileList(copyPlan, stagingDir, table.io());
}
Expand Down Expand Up @@ -362,6 +364,91 @@ private Set<String> manifestsToRewrite(Set<Snapshot> validSnapshots, Set<Long> d
return manifestPaths;
}

private RewriteResult<ManifestFile> rewriteManifestList(
Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
RewriteResult<ManifestFile> result = new RewriteResult<>();

String path = snapshot.manifestListLocation();
String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, stagingDir);
RewriteResult<ManifestFile> rewriteResult =
RewriteTablePathUtil.rewriteManifestList(
snapshot,
table.io(),
tableMetadata,
manifestsToRewrite,
sourcePrefix,
targetPrefix,
stagingDir,
outputPath);

result.append(rewriteResult);
result
.copyPlan()
.add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix)));
return result;
}

private RewriteResult<ManifestFile> rewriteManifestLists(Set<Snapshot> validSnapshots, TableMetadata endMetadata,
Set<String> manifestsToRewrite) {

if (validSnapshots.isEmpty()) {
return new RewriteResult<>();
}

int maxInFlight = parallelism * MAX_INFLIGHT_MULTIPLIER;
Semaphore semaphore = new Semaphore(maxInFlight);
ExecutorCompletionService<RewriteResult<ManifestFile>> completionService =
new ExecutorCompletionService<>(executorService);

RewriteResult<ManifestFile> combined = new RewriteResult<>();
int submittedTasks = 0;
int completedTasks = 0;

try {
for (Snapshot snapshot : validSnapshots) {
semaphore.acquire();

boolean taskSubmitted = false;
try {
completionService.submit(() -> {
try {
return rewriteManifestList(snapshot, endMetadata, manifestsToRewrite);
} finally {
semaphore.release();
}
});
taskSubmitted = true;
submittedTasks++;
} finally {
if (!taskSubmitted) {
semaphore.release();
}
}

Future<RewriteResult<ManifestFile>> done;
while ((done = completionService.poll()) != null) {
combined.append(done.get());
completedTasks++;
}
}

while (completedTasks < submittedTasks) {
combined.append(completionService.take().get());
completedTasks++;
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executorService.shutdownNow();
throw new RuntimeException("Interrupted while rewriting manifest lists", e);
} catch (ExecutionException e) {
executorService.shutdownNow();
throw new RuntimeException("Failed to rewrite manifest list", e.getCause());
}

return combined;
}

private Set<Snapshot> deltaSnapshots(TableMetadata startMetadata, Set<Snapshot> allSnapshots) {
if (startMetadata == null) {
return allSnapshots;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,7 +34,11 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.GenericPartitionFieldSummary;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.Schema;
Expand All @@ -44,6 +49,7 @@
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -68,11 +74,11 @@ class TestRewriteTablePathOzoneAction {
private Table table = null;

@TempDir
private java.nio.file.Path tableDir;
private Path tableDir;
@TempDir
private java.nio.file.Path targetDir;
private Path targetDir;
@TempDir
private java.nio.file.Path stagingDir;
private Path stagingDir;

@BeforeEach
public void setupTableLocation() {
Expand All @@ -96,7 +102,9 @@ void fullTablePathRewrite() throws Exception {
}

Set<Pair<String, String>> csvPairs = readCsvPairs(table, result.fileListLocation());
assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
Set<String> actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
assertTrue(actualTargets.containsAll(expectedTargets),
"Copy plan should contain all expected version file targets");

// Verify all internal paths inside each staged metadata file are rewritten to target.
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
Expand Down Expand Up @@ -124,7 +132,9 @@ void tablePathRewriteForStartAndNoEndVersionProvided() throws Exception {
}

Set<Pair<String, String>> csvPairs = readCsvPairs(table, result.fileListLocation());
assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
Set<String> actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
assertTrue(actualTargets.containsAll(expectedTargets),
"Copy plan should contain all expected version file targets");

// Verify all internal paths inside each staged metadata file are rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
Expand Down Expand Up @@ -152,7 +162,9 @@ void tablePathRewriteForOnlyEndVersionProvided() throws Exception {
}

Set<Pair<String, String>> csvPairs = readCsvPairs(table, result.fileListLocation());
assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
Set<String> actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
assertTrue(actualTargets.containsAll(expectedTargets),
"Copy plan should contain all expected version file targets");

// Verify all internal paths inside each staged metadata file are rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
Expand Down Expand Up @@ -182,49 +194,119 @@ void tablePathRewriteForStartAndEndVersionProvided() throws Exception {
}

Set<Pair<String, String>> csvPairs = readCsvPairs(table, result.fileListLocation());
assertEquals(expectedTargets, csvPairs.stream().map(Pair::second).collect(Collectors.toSet()));
Set<String> actualTargets = csvPairs.stream().map(Pair::second).collect(Collectors.toSet());
assertTrue(actualTargets.containsAll(expectedTargets),
"Copy plan should contain all expected version file targets");

// Verify all internal paths inside each staged metadata file are rewritten to target
assertAllInternalPathsRewritten(csvPairs, targetPrefix);
}

/**
* For every staged metadata JSON file in the CSV, parses the file and asserts that:
* - The table location starts with target
* - Every metadata-log entry path starts with target
* - Every snapshot's manifest-list path starts with target
* - Every statistics file path starts with target
* - None of the above contain the source prefix.
* For every staged file in the CSV copy plan, asserts that internal paths are rewritten
* to the target prefix:
* <ul>
* <li><b>.metadata.json</b>: table location, metadata-log entries, and snapshot
* manifest-list references all start with target.</li>
* <li><b>snap-*.avro (manifest-list)</b>: target path starts with target, and every
* manifest entry path inside the staged file starts with target.</li>
* <li><b>*.avro (manifest)</b>: target path starts with target (content rewrite
* is not yet implemented).</li>
* </ul>
*/
private void assertAllInternalPathsRewritten(Set<Pair<String, String>> csvPairs, String target) {
private void assertAllInternalPathsRewritten(Set<Pair<String, String>> csvPairs, String target) throws Exception {

for (Pair<String, String> pair : csvPairs) {
String stagingPath = pair.first();
String targetPath = pair.second();

// Only inspect .metadata.json files, manifest/data files and snapshots are not yet rewritten
if (!stagingPath.endsWith(".metadata.json")) {
continue;
if (stagingPath.endsWith(".metadata.json")) {
assertMetadataFileRewritten(stagingPath, targetPath, target);
} else if (RewriteTablePathUtil.fileName(stagingPath).startsWith("snap-")) {
assertManifestListRewritten(stagingPath, targetPath, target);
} else if (RewriteTablePathUtil.fileName(stagingPath).endsWith(".avro")) {
assertTrue(targetPath.startsWith(target),
"Manifest file target path should start with target prefix: " + targetPath);
}
}
}

assertTrue(targetPath.startsWith(target),
"Target path in CSV should start with target prefix: " + targetPath);
private void assertMetadataFileRewritten(String stagingPath, String targetPath, String target) {

assertTrue(targetPath.startsWith(target),
"Target path in CSV should start with target prefix: " + targetPath);
assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath),
"original and target metadata file should have the same filename");

TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current();
TableMetadata original = new StaticTableOperations(
targetPath.replace(targetPrefix, sourcePrefix), table.io()).current();
Set<String> expectedMetadata = original.previousFiles().stream()
.map(e -> RewriteTablePathUtil.fileName(e.file()))
.collect(Collectors.toSet());
Set<String> expectedManifestLists = original.snapshots().stream()
.map(s -> RewriteTablePathUtil.fileName(s.manifestListLocation()))
.collect(Collectors.toSet());
Set<String> actualMetadata = new HashSet<>();
Set<String> actualManifestLists = new HashSet<>();

assertTrue(rewritten.location().startsWith(target),
"Metadata location should start with target: " + rewritten.location());

for (MetadataLogEntry entry : rewritten.previousFiles()) {
assertTrue(entry.file().startsWith(target),
"Metadata log entry should start with target: " + entry.file());
actualMetadata.add(RewriteTablePathUtil.fileName(entry.file()));
}

TableMetadata rewritten = new StaticTableOperations(stagingPath, table.io()).current();
assertEquals(expectedMetadata, actualMetadata,
"Rewritten metadata file should reference the same metadata files as the original");

assertTrue(rewritten.location().startsWith(target),
"Metadata location should start with target: " + rewritten.location());
for (Snapshot snapshot : rewritten.snapshots()) {
String manifestList = snapshot.manifestListLocation();
assertTrue(manifestList.startsWith(target),
"Snapshot's manifest-list should start with target: " + manifestList);
actualManifestLists.add(RewriteTablePathUtil.fileName(manifestList));
}
assertEquals(expectedManifestLists, actualManifestLists,
"Rewritten metadata file should reference the same manifest-lists as the original");
}

for (MetadataLogEntry entry : rewritten.previousFiles()) {
assertTrue(entry.file().startsWith(target),
"Metadata log entry should start with target: " + entry.file());
private void assertManifestListRewritten(String stagingPath, String targetPath, String target) throws Exception {

assertTrue(targetPath.startsWith(target),
"Manifest list target path should start with target prefix: " + targetPath);
assertEquals(RewriteTablePathUtil.fileName(stagingPath), RewriteTablePathUtil.fileName(targetPath),
"original and target manifest list should have the same filename");

Set<String> expectedManifests = new HashSet<>();
Set<String> actualManifests = new HashSet<>();
for (Snapshot s : table.snapshots()) {
if (RewriteTablePathUtil.fileName(s.manifestListLocation()).equals(RewriteTablePathUtil.fileName(stagingPath))) {
expectedManifests = s.allManifests(table.io())
.stream()
.map(m -> RewriteTablePathUtil.fileName(m.path()))
.collect(Collectors.toSet());
break;
}
}

for (Snapshot snapshot : rewritten.snapshots()) {
String manifestList = snapshot.manifestListLocation();
assertTrue(manifestList.startsWith(target),
"Snapshot manifest-list should start with target: " + manifestList);
try (CloseableIterable<ManifestFile> manifests =
InternalData.read(FileFormat.AVRO, table.io().newInputFile(stagingPath))
.setRootType(GenericManifestFile.class)
.setCustomType(
ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID,
GenericPartitionFieldSummary.class)
.project(ManifestFile.schema())
.build()) {
for (ManifestFile manifest : manifests) {
assertTrue(manifest.path().startsWith(target),
"Manifest path inside staged manifest list should start with target prefix: " + manifest.path());
actualManifests.add(RewriteTablePathUtil.fileName(manifest.path()));
}
}
assertEquals(expectedManifests, actualManifests,
"Rewritten manifest list should reference the same manifest files as the original");
}

private static List<String> metadataLogEntryPaths(Table tbl) {
Expand Down