Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ManifestFileUtil;
Expand All @@ -55,6 +56,22 @@
abstract class ManifestFilterManager<F extends ContentFile<F>> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
private static final Joiner COMMA = Joiner.on(",");
// avoid projecting large fields when they are not used in manifestHasDeletedFiles
// field ids of the large per-file stats/metadata columns (column sizes, value/null/NaN
// counts, lower/upper bounds, key metadata, split offsets) excluded from the projection
private static final List<Integer> MANIFEST_ENTRY_IGNORED_FIELDS =
ImmutableList.of(
DataFile.COLUMN_SIZES.fieldId(),
DataFile.VALUE_COUNTS.fieldId(),
DataFile.NULL_VALUE_COUNTS.fieldId(),
DataFile.NAN_VALUE_COUNTS.fieldId(),
DataFile.LOWER_BOUNDS.fieldId(),
DataFile.UPPER_BOUNDS.fieldId(),
DataFile.KEY_METADATA.fieldId(),
DataFile.SPLIT_OFFSETS.fieldId());
// names of every manifest entry field except the ignored ones above; passed to
// ManifestReader.select(...) when a manifest is only being scanned for deleted files,
// so entries are read with a reduced projection to cut per-entry allocations
private static final List<String> MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES =
DataFile.getType(BaseFile.EMPTY_STRUCT_TYPE).fields().stream()
.filter(field -> !MANIFEST_ENTRY_IGNORED_FIELDS.contains(field.fieldId()))
.map(Types.NestedField::name)
.collect(ImmutableList.toImmutableList());

protected static class DeleteException extends ValidationException {
private final String partition;
Expand Down Expand Up @@ -378,23 +395,33 @@ private ManifestFile filterManifest(
}

try (ManifestReader<F> reader = newManifestReader(manifest)) {
// can skip reading large stats columns to reduce allocations
if (deleteExpression == Expressions.alwaysFalse()) {
reader.select(MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES);
}
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
if (manifestHasDeletedFiles(evaluator, manifest, reader)) {
ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader);
replacedManifestsCount.incrementAndGet();
return filtered;
} else {
// if the manifest doesn't have files to remove, stream through manifest without copying data
if (!manifestHasDeletedFiles(evaluator, manifest, reader)) {
filteredManifests.put(manifest, manifest);
return manifest;
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}

// the manifest does have files to remove; reopen and rewrite the manifest
try (ManifestReader<F> reader = newManifestReader(manifest)) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader);
replacedManifestsCount.incrementAndGet();
return filtered;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
}

private boolean canContainDeletedFiles(ManifestFile manifest, boolean trustManifestReferences) {
Expand Down Expand Up @@ -456,6 +483,7 @@ private boolean manifestHasDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();

for (ManifestEntry<F> entry : reader.liveEntries()) {
// F can be partially projected, see: MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.location())
Expand Down
225 changes: 225 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.internal.util.collections.Sets;
Expand Down Expand Up @@ -778,6 +783,226 @@ public void testNewDeleteFile() {
branch);
}

// Verifies that a rewrite touching a single data/delete file pair passes every other
// delete manifest through unchanged (same manifest path) instead of rewriting it, and
// that exactly one delete manifest is rewritten. Uses many small manifests with large
// per-file metrics to mirror the incremental-compaction case the projection optimization
// in ManifestFilterManager targets.
@TestTemplate
public void testRewriteWithManyUntouchedDeleteManifests() {
// delete files require format v2+
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);

int numManifests = 100;
int entriesPerManifest = 10;
int statsColumns = 100;

// disable manifest merging to produce large numbers of manifests synthetically
table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();

// metrics are the dominant source of allocations when reading manifest entries
// build wide per-column stats (statsColumns entries per map) attached to every delete file
ImmutableMap.Builder<Integer, Long> valueCounts = ImmutableMap.builder();
ImmutableMap.Builder<Integer, Long> nullCounts = ImmutableMap.builder();
ImmutableMap.Builder<Integer, ByteBuffer> lowerBounds = ImmutableMap.builder();
ImmutableMap.Builder<Integer, ByteBuffer> upperBounds = ImmutableMap.builder();
for (int col = 0; col < statsColumns; col++) {
// synthetic field ids starting at 1000; values are arbitrary but well-formed
int fieldId = 1000 + col;
valueCounts.put(fieldId, 1000L);
nullCounts.put(fieldId, 0L);
lowerBounds.put(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), 0));
upperBounds.put(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), 1000));
}
Metrics metrics =
new Metrics(
1L,
null,
valueCounts.build(),
nullCounts.build(),
null,
lowerBounds.build(),
upperBounds.build());

List<DataFile> dataFiles = Lists.newArrayList();
List<DeleteFile> deleteFiles = Lists.newArrayList();
for (int m = 0; m < numManifests; m++) {
// batch entriesPerManifest pairs into a single commit so each manifest has multiple entries
RowDelta rowDelta = table.newRowDelta();
for (int e = 0; e < entriesPerManifest; e++) {
int idx = m * entriesPerManifest + e;
DataFile dataFile =
DataFiles.builder(SPEC)
.withPath("/path/to/data-many-" + idx + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();
DeleteFile deleteFile;
if (formatVersion >= 3) {
// v3+: position deletes are deletion vectors in puffin files with a
// referenced data file and a content offset/size
deleteFile =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/delete-many-" + idx + ".puffin")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.withReferencedDataFile(dataFile.location())
.withContentOffset(4)
.withContentSizeInBytes(6)
.withMetrics(metrics)
.build();
} else {
// v2: plain position-delete parquet file
deleteFile =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/delete-many-" + idx + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.withMetrics(metrics)
.build();
}
dataFiles.add(dataFile);
deleteFiles.add(deleteFile);
rowDelta.addRows(dataFile).addDeletes(deleteFile);
}
commit(table, rowDelta, branch);
}

// each commit produced exactly one delete manifest (merging is disabled)
Snapshot baseSnapshot = latestSnapshot(table, branch);
List<ManifestFile> baseDeleteManifests = baseSnapshot.deleteManifests(table.io());
assertThat(baseDeleteManifests).hasSize(numManifests);

Set<String> baseDeleteManifestPaths =
baseDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());

DataFile replacementFile =
DataFiles.builder(SPEC)
.withPath("/path/to/data-many-replacement.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();

// Compact only the first data+delete file pair, all other manifests should remain unchanged
// This simulates incremental compaction of a small portion of a large table
commit(
table,
table
.newRewrite()
.validateFromSnapshot(baseSnapshot.snapshotId())
.rewriteFiles(
ImmutableSet.of(dataFiles.get(0)),
ImmutableSet.of(deleteFiles.get(0)),
ImmutableSet.of(replacementFile),
ImmutableSet.of()),
branch);

Snapshot resultSnapshot = latestSnapshot(table, branch);
List<ManifestFile> resultDeleteManifests = resultSnapshot.deleteManifests(table.io());

assertThat(resultDeleteManifests).hasSize(numManifests);

Set<String> resultDeleteManifestPaths =
resultDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());

// pass-through manifests keep their original path; a rewritten manifest gets a new one
long passedThrough =
resultDeleteManifestPaths.stream().filter(baseDeleteManifestPaths::contains).count();
long rewritten =
resultDeleteManifestPaths.stream()
.filter(p -> !baseDeleteManifestPaths.contains(p))
.count();

assertThat(passedThrough)
.as("exactly %d delete manifests should pass through unchanged", numManifests - 1)
.isEqualTo(numManifests - 1);
assertThat(rewritten).as("exactly one delete manifest should be rewritten").isEqualTo(1);
}

// Verifies that two deletion vectors sharing the same puffin file path (distinguished
// only by content offset/size) are handled independently by a rewrite: removing dv1
// must rewrite only dv1's manifest while dv2's manifest passes through unchanged.
// Guards against projecting away the offset/size fields that DeleteFileSet identity needs.
@TestTemplate
public void testRewriteFilesWithDeletionVectors() {
// DeleteFileSet equality uses: location, contentOffset, contentSizeInBytes.
// Ensure MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES contains these fields
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

// keep each commit's delete manifest separate so pass-through can be observed per manifest
table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit();

DataFile dataFile1 =
DataFiles.builder(SPEC)
.withPath("/path/to/data-dv-1.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();
DataFile dataFile2 =
DataFiles.builder(SPEC)
.withPath("/path/to/data-dv-2.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();

// Two DVs packed into the same puffin file at different byte ranges
// location alone is not sufficient for identity, contentOffset and contentSizeInBytes are
// required
DeleteFile dv1 =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/shared.puffin")
.withFileSizeInBytes(100)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.withReferencedDataFile(dataFile1.location())
.withContentOffset(0)
.withContentSizeInBytes(40)
.build();
DeleteFile dv2 =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/shared.puffin")
.withFileSizeInBytes(100)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.withReferencedDataFile(dataFile2.location())
.withContentOffset(40)
.withContentSizeInBytes(60)
.build();

// two separate commits => two delete manifests, one per DV
commit(table, table.newRowDelta().addRows(dataFile1).addDeletes(dv1), branch);
commit(table, table.newRowDelta().addRows(dataFile2).addDeletes(dv2), branch);

Snapshot baseSnapshot = latestSnapshot(table, branch);
List<ManifestFile> baseDeleteManifests = baseSnapshot.deleteManifests(table.io());
assertThat(baseDeleteManifests).hasSize(2);
Set<String> baseDeleteManifestPaths =
baseDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());

DataFile replacement =
DataFiles.builder(SPEC)
.withPath("/path/to/data-dv-replacement.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0")
.withRecordCount(1)
.build();

// remove only dataFile1 and dv1; dv2 (same puffin path, different byte range) stays live
commit(
table,
table
.newRewrite()
.validateFromSnapshot(baseSnapshot.snapshotId())
.rewriteFiles(
ImmutableSet.of(dataFile1),
ImmutableSet.of(dv1),
ImmutableSet.of(replacement),
ImmutableSet.of()),
branch);

List<ManifestFile> resultDeleteManifests =
latestSnapshot(table, branch).deleteManifests(table.io());
assertThat(resultDeleteManifests).hasSize(2);
Set<String> resultPaths =
resultDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());

// a pass-through manifest keeps its original path; a rewritten one gets a new path
long passedThrough = resultPaths.stream().filter(baseDeleteManifestPaths::contains).count();
long rewritten = resultPaths.stream().filter(p -> !baseDeleteManifestPaths.contains(p)).count();

assertThat(passedThrough).as("dv2 manifest should pass through unchanged").isEqualTo(1);
assertThat(rewritten).as("dv1 manifest should be rewritten").isEqualTo(1);
}

@TestTemplate
public void removingDataFileAlsoRemovesDV() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Expand Down
Loading