From 9c0a662091dc65fcf7756e51c418679d1f5b161b Mon Sep 17 00:00:00 2001 From: Grant Nicholas Date: Sun, 10 May 2026 11:40:54 -0500 Subject: [PATCH] Skip reading manifest entry stats when not used in ManifestFilterManager manifestHasDeletedFiles doesn't read manifest stats when deleteExpression not set --- .../apache/iceberg/ManifestFilterManager.java | 44 +++- .../org/apache/iceberg/TestRewriteFiles.java | 225 ++++++++++++++++++ 2 files changed, 261 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 7d146d924667..13db15f8dc35 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -42,6 +42,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.ManifestFileUtil; @@ -55,6 +56,22 @@ abstract class ManifestFilterManager> { private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class); private static final Joiner COMMA = Joiner.on(","); + // avoid projecting large fields when they are not used in manifestHasDeletedFiles + private static final List MANIFEST_ENTRY_IGNORED_FIELDS = + ImmutableList.of( + DataFile.COLUMN_SIZES.fieldId(), + DataFile.VALUE_COUNTS.fieldId(), + DataFile.NULL_VALUE_COUNTS.fieldId(), + DataFile.NAN_VALUE_COUNTS.fieldId(), + DataFile.LOWER_BOUNDS.fieldId(), + DataFile.UPPER_BOUNDS.fieldId(), + DataFile.KEY_METADATA.fieldId(), + DataFile.SPLIT_OFFSETS.fieldId()); + private static final List MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES = + DataFile.getType(BaseFile.EMPTY_STRUCT_TYPE).fields().stream() + .filter(field -> !MANIFEST_ENTRY_IGNORED_FIELDS.contains(field.fieldId())) + .map(Types.NestedField::name) + .collect(ImmutableList.toImmutableList()); protected static class DeleteException extends ValidationException { private final String partition; @@ -378,23 +395,33 @@ private ManifestFile filterManifest( } try (ManifestReader reader = newManifestReader(manifest)) { + // can skip reading large stats columns to reduce allocations + if (deleteExpression == Expressions.alwaysFalse()) { + reader.select(MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES); + } PartitionSpec spec = reader.spec(); PartitionAndMetricsEvaluator evaluator = new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression); - // this assumes that the manifest doesn't have files to remove and streams through the - // manifest without copying data. if a manifest does have a file to remove, this will break - // out of the loop and move on to filtering the manifest. - if (manifestHasDeletedFiles(evaluator, manifest, reader)) { - ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader); - replacedManifestsCount.incrementAndGet(); - return filtered; - } else { + // if the manifest doesn't have files to remove, stream through manifest without copying data + if (!manifestHasDeletedFiles(evaluator, manifest, reader)) { filteredManifests.put(manifest, manifest); return manifest; } } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } + + // the manifest does have files to remove; reopen and rewrite the manifest + try (ManifestReader reader = newManifestReader(manifest)) { + PartitionSpec spec = reader.spec(); + PartitionAndMetricsEvaluator evaluator = + new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression); + ManifestFile filtered = filterManifestWithDeletedFiles(evaluator, manifest, reader); + replacedManifestsCount.incrementAndGet(); + return filtered; + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); + } } private boolean canContainDeletedFiles(ManifestFile manifest, boolean trustManifestReferences) { @@ -456,6 +483,7 @@ private boolean manifestHasDeletedFiles( boolean isDelete = reader.isDeleteManifestReader(); for (ManifestEntry entry : reader.liveEntries()) { + // F can be partially projected, see: MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES F file = entry.file(); boolean markedForDelete = deletePaths.contains(file.location()) diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 72a3c89b74d5..2c1c8f16a73a 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -27,14 +27,19 @@ import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.internal.util.collections.Sets; @@ -778,6 +783,226 @@ public void testNewDeleteFile() { branch); } + @TestTemplate + public void testRewriteWithManyUntouchedDeleteManifests() { + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); + + int numManifests = 100; + int entriesPerManifest = 10; + int statsColumns = 100; + + // disable manifest merging to produce large numbers of manifests synthetically + table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit(); + + // metrics are the dominant source of allocations when reading manifest entries + ImmutableMap.Builder valueCounts = ImmutableMap.builder(); + ImmutableMap.Builder nullCounts = ImmutableMap.builder(); + ImmutableMap.Builder lowerBounds = ImmutableMap.builder(); + ImmutableMap.Builder upperBounds = ImmutableMap.builder(); + for (int col = 0; col < statsColumns; col++) { + int fieldId = 1000 + col; + valueCounts.put(fieldId, 1000L); + nullCounts.put(fieldId, 0L); + lowerBounds.put(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), 0)); + upperBounds.put(fieldId, Conversions.toByteBuffer(Types.IntegerType.get(), 1000)); + } + Metrics metrics = + new Metrics( + 1L, + null, + valueCounts.build(), + nullCounts.build(), + null, + lowerBounds.build(), + upperBounds.build()); + + List dataFiles = Lists.newArrayList(); + List deleteFiles = Lists.newArrayList(); + for (int m = 0; m < numManifests; m++) { + // batch entriesPerManifest pairs into a single commit so each manifest has multiple entries + RowDelta rowDelta = table.newRowDelta(); + for (int e = 0; e < entriesPerManifest; e++) { + int idx = m * entriesPerManifest + e; + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-many-" + idx + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + DeleteFile deleteFile; + if (formatVersion >= 3) { + deleteFile = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/delete-many-" + idx + ".puffin") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .withReferencedDataFile(dataFile.location()) + .withContentOffset(4) + .withContentSizeInBytes(6) + .withMetrics(metrics) + .build(); + } else { + deleteFile = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/delete-many-" + idx + ".parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .withMetrics(metrics) + .build(); + } + dataFiles.add(dataFile); + deleteFiles.add(deleteFile); + rowDelta.addRows(dataFile).addDeletes(deleteFile); + } + commit(table, rowDelta, branch); + } + + Snapshot baseSnapshot = latestSnapshot(table, branch); + List baseDeleteManifests = baseSnapshot.deleteManifests(table.io()); + assertThat(baseDeleteManifests).hasSize(numManifests); + + Set baseDeleteManifestPaths = + baseDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet()); + + DataFile replacementFile = + DataFiles.builder(SPEC) + .withPath("/path/to/data-many-replacement.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + + // Compact only the first data+delete file pair, all other manifests should remain unchanged + // This simulates incremental compaction of a small portion of a large table + commit( + table, + table + .newRewrite() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .rewriteFiles( + ImmutableSet.of(dataFiles.get(0)), + ImmutableSet.of(deleteFiles.get(0)), + ImmutableSet.of(replacementFile), + ImmutableSet.of()), + branch); + + Snapshot resultSnapshot = latestSnapshot(table, branch); + List resultDeleteManifests = resultSnapshot.deleteManifests(table.io()); + + assertThat(resultDeleteManifests).hasSize(numManifests); + + Set resultDeleteManifestPaths = + resultDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet()); + + long passedThrough = + resultDeleteManifestPaths.stream().filter(baseDeleteManifestPaths::contains).count(); + long rewritten = + resultDeleteManifestPaths.stream() + .filter(p -> !baseDeleteManifestPaths.contains(p)) + .count(); + + assertThat(passedThrough) + .as("exactly %d delete manifests should pass through unchanged", numManifests - 1) + .isEqualTo(numManifests - 1); + assertThat(rewritten).as("exactly one delete manifest should be rewritten").isEqualTo(1); + } + + @TestTemplate + public void testRewriteFilesWithDeletionVectors() { + // DeleteFileSet equality uses: location, contentOffset, contentSizeInBytes. + // Ensure MANIFEST_ENTRY_FIELDS_FOR_HAS_DELETED_FILES contains these fields + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + table.updateProperties().set(TableProperties.MANIFEST_MERGE_ENABLED, "false").commit(); + + DataFile dataFile1 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-dv-1.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-dv-2.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + + // Two DVs packed into the same puffin file at different byte ranges + // location alone is not sufficient for identity, contentOffset and contentSizeInBytes are + // required + DeleteFile dv1 = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/shared.puffin") + .withFileSizeInBytes(100) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .withReferencedDataFile(dataFile1.location()) + .withContentOffset(0) + .withContentSizeInBytes(40) + .build(); + DeleteFile dv2 = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath("/path/to/shared.puffin") + .withFileSizeInBytes(100) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .withReferencedDataFile(dataFile2.location()) + .withContentOffset(40) + .withContentSizeInBytes(60) + .build(); + + commit(table, table.newRowDelta().addRows(dataFile1).addDeletes(dv1), branch); + commit(table, table.newRowDelta().addRows(dataFile2).addDeletes(dv2), branch); + + Snapshot baseSnapshot = latestSnapshot(table, branch); + List baseDeleteManifests = baseSnapshot.deleteManifests(table.io()); + assertThat(baseDeleteManifests).hasSize(2); + Set baseDeleteManifestPaths = + baseDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet()); + + DataFile replacement = + DataFiles.builder(SPEC) + .withPath("/path/to/data-dv-replacement.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("data_bucket=0") + .withRecordCount(1) + .build(); + + commit( + table, + table + .newRewrite() + .validateFromSnapshot(baseSnapshot.snapshotId()) + .rewriteFiles( + ImmutableSet.of(dataFile1), + ImmutableSet.of(dv1), + ImmutableSet.of(replacement), + ImmutableSet.of()), + branch); + + List resultDeleteManifests = + latestSnapshot(table, branch).deleteManifests(table.io()); + assertThat(resultDeleteManifests).hasSize(2); + Set resultPaths = + resultDeleteManifests.stream().map(ManifestFile::path).collect(Collectors.toSet()); + + long passedThrough = resultPaths.stream().filter(baseDeleteManifestPaths::contains).count(); + long rewritten = resultPaths.stream().filter(p -> !baseDeleteManifestPaths.contains(p)).count(); + + assertThat(passedThrough).as("dv2 manifest should pass through unchanged").isEqualTo(1); + assertThat(rewritten).as("dv1 manifest should be rewritten").isEqualTo(1); + } + @TestTemplate public void removingDataFileAlsoRemovesDV() { assumeThat(formatVersion).isGreaterThanOrEqualTo(3);