diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index f8dce8f2e853..cc13cfdf135c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -22,11 +22,14 @@
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.mergetree.lookup.FilePosition;
 import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
 import org.apache.paimon.mergetree.lookup.PersistProcessor;
+import org.apache.paimon.mergetree.lookup.PositionedKeyValue;
 import org.apache.paimon.mergetree.lookup.RemoteFileDownloader;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
@@ -69,6 +72,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
     private final LookupStoreFactory lookupStoreFactory;
     private final Function bfGenerator;
     private final Cache lookupFileCache;
+    private final DeletionVector.Factory dvFactory;
     private final Set ownCachedFiles;
     private final Map, PersistProcessor> schemaIdAndSerVersionToProcessors;

@@ -86,7 +90,8 @@ public LookupLevels(
             Function localFileFactory,
             LookupStoreFactory lookupStoreFactory,
             Function bfGenerator,
-            Cache lookupFileCache) {
+            Cache lookupFileCache,
+            DeletionVector.Factory dvFactory) {
         this.schemaFunction = schemaFunction;
         this.currentSchemaId = currentSchemaId;
         this.levels = levels;
@@ -99,6 +104,7 @@ public LookupLevels(
         this.lookupStoreFactory = lookupStoreFactory;
         this.bfGenerator = bfGenerator;
         this.lookupFileCache = lookupFileCache;
+        this.dvFactory = dvFactory;
         this.ownCachedFiles = new HashSet<>();
         this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>();
         levels.addDropFileCallback(this);
@@ -129,7 +135,45 @@ public void notifyDropFile(String file) {

     @Nullable
     public T lookup(InternalRow key, int startLevel) throws IOException {
-        return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);
+        T result = LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);
+        if (result != null && isDeletedByDeletionVector(result)) {
+            // The hit may be served by a cached lookup file built before the latest
+            // deletion vector update: data files are immutable, but their deletion
+            // vectors are not, and lookup files freeze the deletion state of their
+            // build time. A hit whose position is marked deleted means the newest
+            // version of this key is gone; deeper levels only hold older versions,
+            // so the key must be reported as absent instead of continuing the search.
+            return null;
+        }
+        return result;
+    }
+
+    /**
+     * Checks a lookup hit against the deletion vector that {@code dvFactory} currently provides
+     * for the file the hit was read from.
+     *
+     * @param result non-null lookup hit
+     * @return {@code true} only if the hit exposes a file name and row position and that
+     *     position is marked deleted; {@code false} if the position is not deleted, the file
+     *     has no deletion vector, or the hit carries no position information
+     * @throws IOException if creating the deletion vector for the file fails
+     */
+    private boolean isDeletedByDeletionVector(T result) throws IOException {
+        String fileName;
+        long rowPosition;
+        if (result instanceof PositionedKeyValue) {
+            PositionedKeyValue positioned = (PositionedKeyValue) result;
+            fileName = positioned.fileName();
+            rowPosition = positioned.rowPosition();
+        } else if (result instanceof FilePosition) {
+            FilePosition position = (FilePosition) result;
+            fileName = position.fileName();
+            rowPosition = position.rowPosition();
+        } else {
+            // No position information persisted (e.g. value-only processors), cannot
+            // validate the hit against the deletion vector.
+            return false;
+        }
+        return dvFactory.create(fileName).map(dv -> dv.isDeleted(rowPosition)).orElse(false);
     }

     @Nullable
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
index 89d19313844e..8233915ed383 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java
@@ -309,7 +309,12 @@ private MergeTreeCompactRewriter createRewriter(
         }
         LookupLevels lookupLevels =
                 createLookupLevels(
-                        partition, bucket, levels, processorFactory, lookupReaderFactory);
+                        partition,
+                        bucket,
+                        levels,
+                        processorFactory,
+                        lookupReaderFactory,
+                        dvFactory);
         RemoteLookupFileManager remoteLookupFileManager = null;
         if (options.lookupRemoteFileEnabled()) {
             remoteLookupFileManager =
@@ -351,7 +356,8 @@ private LookupLevels createLookupLevels(
             int bucket,
             Levels levels,
             PersistProcessor.Factory processorFactory,
-            FileReaderFactory readerFactory) {
+            FileReaderFactory readerFactory,
+            DeletionVector.Factory dvFactory) {
         if (ioManager == null) {
             throw new RuntimeException(
                     "Can not use lookup, there is no temp disk directory to use.");
@@ -384,7 +390,8 @@ private LookupLevels createLookupLevels(
                         .getPathFile(),
                 lookupStoreFactory,
                 bfGenerator(options),
-                lookupFileCache);
+                lookupFileCache,
+                dvFactory);
     }

     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index f2e83d073fc0..f7cd5a9a1567 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -168,7 +168,9 @@ private void newLookupLevels(BinaryRow partition, int bucket, List
                         .getPathFile(),
                 lookupStoreFactory,
                 bfGenerator(options),
-                lookupFileCache);
+                lookupFileCache,
+                // TODO pass DeletionVector factory (see reader factory above)
+                DeletionVector.emptyFactory());

         // Optimization - download lookup files if already persisted to object store
         // We download these files if three conditions are met
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 7bc95449d5a6..2c575f0b45e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -210,7 +210,8 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max
                         4096,
                         new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.01),
-                LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
+                LookupFile.createCache(Duration.ofHours(1), maxDiskSize),
+                DeletionVector.emptyFactory());
     }

     private KeyValue kv(int key, int value) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 6aac8e1fe7bb..1c9cb7e47838 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BitmapDeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FlushingFileFormat;
 import org.apache.paimon.fs.FileIOFinder;
@@ -37,7 +38,9 @@
 import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.lookup.DefaultLookupSerializerFactory;
+import org.apache.paimon.mergetree.lookup.PersistValueAndPosProcessor;
 import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
+import org.apache.paimon.mergetree.lookup.PositionedKeyValue;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -66,6 +69,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Function;

@@ -270,6 +274,62 @@ public void testLookupLevel0() throws Exception {
         assertThat(kv.value().getInt(1)).isEqualTo(11);
     }

+    @Test
+    public void testLookupRespectsDeletionVectorUpdates() throws IOException {
+        Levels levels = new Levels(comparator, Arrays.asList(newFile(1, kv(1, 11))), 3);
+        Map<String, DeletionVector> deletionVectors = new HashMap<>();
+
+        // try-with-resources closes lookupLevels even if an assertion below fails
+        try (LookupLevels<PositionedKeyValue> lookupLevels =
+                createPositionedLookupLevels(
+                        levels, fileName -> Optional.ofNullable(deletionVectors.get(fileName)))) {
+            // first lookup hits the live row and warms the lookup file cache
+            PositionedKeyValue hit = lookupLevels.lookup(row(1), 1);
+            assertThat(hit).isNotNull();
+            assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11);
+
+            // a deletion of an unrelated position must not affect the live row
+            BitmapDeletionVector unrelated = new BitmapDeletionVector();
+            unrelated.delete(hit.rowPosition() + 1);
+            deletionVectors.put(hit.fileName(), unrelated);
+            hit = lookupLevels.lookup(row(1), 1);
+            assertThat(hit).isNotNull();
+            assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11);
+
+            // mark the returned position deleted; data files are immutable so the cached
+            // lookup file is not rebuilt and would keep serving the stale row
+            BitmapDeletionVector deletionVector = new BitmapDeletionVector();
+            deletionVector.delete(hit.rowPosition());
+            deletionVectors.put(hit.fileName(), deletionVector);
+
+            // the hit must now be validated against the current deletion vector
+            assertThat(lookupLevels.lookup(row(1), 1)).isNull();
+        }
+    }
+
+    /** Creates position-aware lookup levels wired to the given deletion vector factory. */
+    private LookupLevels<PositionedKeyValue> createPositionedLookupLevels(
+            Levels levels, DeletionVector.Factory dvFactory) {
+        return new LookupLevels<>(
+                schemaId -> rowType,
+                0L,
+                levels,
+                comparator,
+                keyType,
+                PersistValueAndPosProcessor.factory(rowType),
+                new DefaultLookupSerializerFactory(),
+                file -> createReaderFactory().createRecordReader(file),
+                file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
+                new SortLookupStoreFactory(
+                        new RowCompactedSerializer(keyType).createSliceComparator(),
+                        new CacheManager(MemorySize.ofMebiBytes(1)),
+                        4096,
+                        new CompressOptions("none", 1)),
+                rowCount -> BloomFilter.builder(rowCount, 0.05),
+                LookupFile.createCache(Duration.ofHours(1), MemorySize.ofMebiBytes(10)),
+                dvFactory);
+    }
+
     private LookupLevels createLookupLevels(Levels levels, MemorySize maxDiskSize) {
         return new LookupLevels<>(
                 schemaId -> rowType,
@@ -287,7 +347,8 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD
                         4096,
                         new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.05),
-                LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
+                LookupFile.createCache(Duration.ofHours(1), maxDiskSize),
+                DeletionVector.emptyFactory());
     }

     private KeyValue kv(int key, int value) {