Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStoreWriter;
import org.apache.paimon.mergetree.lookup.FilePosition;
import org.apache.paimon.mergetree.lookup.LookupSerializerFactory;
import org.apache.paimon.mergetree.lookup.PersistProcessor;
import org.apache.paimon.mergetree.lookup.PositionedKeyValue;
import org.apache.paimon.mergetree.lookup.RemoteFileDownloader;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
Expand Down Expand Up @@ -69,6 +72,7 @@ public class LookupLevels<T> implements Levels.DropFileCallback, Closeable {
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Cache<String, LookupFile> lookupFileCache;
private final DeletionVector.Factory dvFactory;
private final Set<String> ownCachedFiles;
private final Map<Pair<Long, String>, PersistProcessor<T>> schemaIdAndSerVersionToProcessors;

Expand All @@ -86,7 +90,8 @@ public LookupLevels(
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
Cache<String, LookupFile> lookupFileCache,
DeletionVector.Factory dvFactory) {
this.schemaFunction = schemaFunction;
this.currentSchemaId = currentSchemaId;
this.levels = levels;
Expand All @@ -99,6 +104,7 @@ public LookupLevels(
this.lookupStoreFactory = lookupStoreFactory;
this.bfGenerator = bfGenerator;
this.lookupFileCache = lookupFileCache;
this.dvFactory = dvFactory;
this.ownCachedFiles = new HashSet<>();
this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>();
levels.addDropFileCallback(this);
Expand Down Expand Up @@ -129,7 +135,36 @@ public void notifyDropFile(String file) {

@Nullable
public T lookup(InternalRow key, int startLevel) throws IOException {
return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);
T result = LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check only covers the case where a deleted row is hit in a cached lookup file. When the cache is cold or the cached lookup file has been evicted, the lookup may still return an older version from a deeper level. createSstFileFromDataFile uses a reader built with dvFactory to build the lookup file, and that reader filters out rows deleted by the DV first. So the latest record at the current level is filtered out, the level returns null, and LookupUtils.lookup continues to the next level, possibly returning an old version. This is semantically inconsistent with the comment 'deleted hit means newest version is gone; must not continue deeper'. I suggest not applying the DV when building the lookup SST: either keep the position and have LookupLevels verify it and short-circuit, or explicitly distinguish between "hit but deleted by DV" and "current file has no key".

if (result != null && isDeletedByDeletionVector(result)) {
// The hit may be served by a cached lookup file built before the latest
// deletion vector update: data files are immutable, but their deletion
// vectors are not, and lookup files freeze the deletion state of their
// build time. A hit whose position is marked deleted means the newest
// version of this key is gone; deeper levels only hold older versions,
// so the key must be reported as absent instead of continuing the search.
return null;
}
return result;
}

/**
 * Tells whether the row behind a lookup hit is marked deleted by the deletion vector that
 * {@code dvFactory} provides for the hit's data file at the time of the call.
 *
 * @param result the lookup hit to validate
 * @return {@code true} only when the hit exposes a file name and row position and the deletion
 *     vector obtained for that file reports the position as deleted; {@code false} when the hit
 *     carries no position information or no deletion vector is available for the file
 * @throws IOException declared by this method; presumably raised when the deletion vector
 *     cannot be obtained from the factory
 */
private boolean isDeletedByDeletionVector(T result) throws IOException {
    boolean positionedKeyValueHit = result instanceof PositionedKeyValue;
    if (!positionedKeyValueHit && !(result instanceof FilePosition)) {
        // Hits without persisted position information (e.g. value-only processors)
        // cannot be checked against a deletion vector.
        return false;
    }

    String dataFileName =
            positionedKeyValueHit
                    ? ((PositionedKeyValue) result).fileName()
                    : ((FilePosition) result).fileName();
    long positionInFile =
            positionedKeyValueHit
                    ? ((PositionedKeyValue) result).rowPosition()
                    : ((FilePosition) result).rowPosition();

    // An absent deletion vector means nothing in the file is deleted.
    return dvFactory
            .create(dataFileName)
            .filter(vector -> vector.isDeleted(positionInFile))
            .isPresent();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,12 @@ private MergeTreeCompactRewriter createRewriter(
}
LookupLevels<?> lookupLevels =
createLookupLevels(
partition, bucket, levels, processorFactory, lookupReaderFactory);
partition,
bucket,
levels,
processorFactory,
lookupReaderFactory,
dvFactory);
RemoteLookupFileManager<?> remoteLookupFileManager = null;
if (options.lookupRemoteFileEnabled()) {
remoteLookupFileManager =
Expand Down Expand Up @@ -351,7 +356,8 @@ private <T> LookupLevels<T> createLookupLevels(
int bucket,
Levels levels,
PersistProcessor.Factory<T> processorFactory,
FileReaderFactory<KeyValue> readerFactory) {
FileReaderFactory<KeyValue> readerFactory,
DeletionVector.Factory dvFactory) {
if (ioManager == null) {
throw new RuntimeException(
"Can not use lookup, there is no temp disk directory to use.");
Expand Down Expand Up @@ -384,7 +390,8 @@ private <T> LookupLevels<T> createLookupLevels(
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
lookupFileCache,
dvFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
.getPathFile(),
lookupStoreFactory,
bfGenerator(options),
lookupFileCache);
lookupFileCache,
// TODO pass DeletionVector factory (see reader factory above)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LocalTableQuery still passes DeletionVector.emptyFactory() to both the reader and LookupLevels, and it uses PersistValueProcessor, which has no position information available for the new validation. Long-lived cache paths such as the Flink lookup service / partial lookup may therefore still return deleted cached values even after a DV update on the same data file. The TODO here is not a small issue: the PR title says that LookupLevels now respects the current DV, but this public lookup path still completely disregards the DV.

DeletionVector.emptyFactory());

// Optimization - download lookup files if already persisted to object store
// We download these files if three conditions are met
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private LookupLevels<Boolean> createContainsLevels(Levels levels, MemorySize max
4096,
new CompressOptions("none", 1)),
rowCount -> BloomFilter.builder(rowCount, 0.01),
LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
LookupFile.createCache(Duration.ofHours(1), maxDiskSize),
DeletionVector.emptyFactory());
}

private KeyValue kv(int key, int value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.BitmapDeletionVector;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FlushingFileFormat;
import org.apache.paimon.fs.FileIOFinder;
Expand All @@ -37,7 +38,9 @@
import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.mergetree.lookup.DefaultLookupSerializerFactory;
import org.apache.paimon.mergetree.lookup.PersistValueAndPosProcessor;
import org.apache.paimon.mergetree.lookup.PersistValueProcessor;
import org.apache.paimon.mergetree.lookup.PositionedKeyValue;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
Expand Down Expand Up @@ -66,6 +69,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

Expand Down Expand Up @@ -270,6 +274,61 @@ public void testLookupLevel0() throws Exception {
assertThat(kv.value().getInt(1)).isEqualTo(11);
}

/**
 * Verifies that a lookup hit is validated against the deletion vector the factory supplies at
 * lookup time: a deletion at an unrelated position leaves the row visible, while a deletion at
 * the hit's own position makes the lookup return {@code null}.
 */
@Test
public void testLookupRespectsDeletionVectorUpdates() throws IOException {
    Levels levels = new Levels(comparator, Arrays.asList(newFile(1, kv(1, 11))), 3);
    // Mutable registry backing the deletion vector factory, so the test can change the
    // deletion state between lookups.
    Map<String, DeletionVector> deletionVectors = new HashMap<>();

    // try-with-resources: the lookup levels are closed even when an assertion fails, so a
    // failing run does not leak the lookup levels or their local lookup files.
    try (LookupLevels<PositionedKeyValue> lookupLevels =
            createPositionedLookupLevels(
                    levels, fileName -> Optional.ofNullable(deletionVectors.get(fileName)))) {

        // first lookup hits the live row and warms the lookup file cache
        PositionedKeyValue hit = lookupLevels.lookup(row(1), 1);
        assertThat(hit).isNotNull();
        assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11);

        // a deletion of an unrelated position must not affect the live row
        BitmapDeletionVector unrelated = new BitmapDeletionVector();
        unrelated.delete(hit.rowPosition() + 1);
        deletionVectors.put(hit.fileName(), unrelated);
        hit = lookupLevels.lookup(row(1), 1);
        assertThat(hit).isNotNull();
        assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11);

        // mark the returned position deleted; data files are immutable so the cached
        // lookup file is not rebuilt and would keep serving the stale row
        BitmapDeletionVector deletionVector = new BitmapDeletionVector();
        deletionVector.delete(hit.rowPosition());
        deletionVectors.put(hit.fileName(), deletionVector);

        // the hit must now be validated against the current deletion vector
        assertThat(lookupLevels.lookup(row(1), 1)).isNull();
    }
}

/**
 * Builds a {@link LookupLevels} whose hits are {@link PositionedKeyValue}s, wired to the given
 * deletion vector factory.
 *
 * @param levels the levels to look up in
 * @param dvFactory the deletion vector factory handed to the lookup levels
 * @return lookup levels using {@link PersistValueAndPosProcessor} over {@code rowType}
 */
private LookupLevels<PositionedKeyValue> createPositionedLookupLevels(
        Levels levels, DeletionVector.Factory dvFactory) {
    SortLookupStoreFactory storeFactory =
            new SortLookupStoreFactory(
                    new RowCompactedSerializer(keyType).createSliceComparator(),
                    new CacheManager(MemorySize.ofMebiBytes(1)),
                    4096,
                    new CompressOptions("none", 1));
    // Every lookup file gets its own randomly named file under the test's temp directory.
    Function<String, File> localFiles =
            file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID());
    Function<Long, BloomFilter.Builder> bloomFilters =
            rowCount -> BloomFilter.builder(rowCount, 0.05);

    return new LookupLevels<>(
            schemaId -> rowType,
            0L,
            levels,
            comparator,
            keyType,
            PersistValueAndPosProcessor.factory(rowType),
            new DefaultLookupSerializerFactory(),
            file -> createReaderFactory().createRecordReader(file),
            localFiles,
            storeFactory,
            bloomFilters,
            LookupFile.createCache(Duration.ofHours(1), MemorySize.ofMebiBytes(10)),
            dvFactory);
}

private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxDiskSize) {
return new LookupLevels<>(
schemaId -> rowType,
Expand All @@ -287,7 +346,8 @@ private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxD
4096,
new CompressOptions("none", 1)),
rowCount -> BloomFilter.builder(rowCount, 0.05),
LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
LookupFile.createCache(Duration.ofHours(1), maxDiskSize),
DeletionVector.emptyFactory());
}

private KeyValue kv(int key, int value) {
Expand Down