diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java
index 7780f2e2d..d27f912c0 100644
--- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java
@@ -315,16 +315,65 @@ public boolean addVisibility(String filePath) throws RetinaException
return true;
}
- public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
+ public static final class VisibilityResult
+ {
+ private final long[][] bitmaps;
+ private final String checkpointPath;
+
+ private VisibilityResult(long[][] bitmaps, String checkpointPath)
+ {
+ this.bitmaps = bitmaps;
+ this.checkpointPath = checkpointPath;
+ }
+
+ public static VisibilityResult fromBitmaps(long[][] bitmaps)
+ {
+ return new VisibilityResult(bitmaps, null);
+ }
+
+ public static VisibilityResult fromCheckpointPath(String path)
+ {
+ return new VisibilityResult(null, path);
+ }
+
+ public boolean isOffloaded()
+ {
+ return checkpointPath != null;
+ }
+
+ public long[][] getBitmaps()
+ {
+ if (isOffloaded())
+ {
+ throw new IllegalStateException("Data is offloaded to checkpoint, use getCheckpointPath() instead.");
+ }
+ return bitmaps;
+ }
+
+ public String getCheckpointPath()
+ {
+ return checkpointPath;
+ }
+ }
+
+ public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
+ {
+ return queryVisibility(fileId, rgIds, timestamp, -1);
+ }
+
+ public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp, long transId) throws RetinaException
{
String token = UUID.randomUUID().toString();
- RetinaProto.QueryVisibilityRequest request = RetinaProto.QueryVisibilityRequest.newBuilder()
+ RetinaProto.QueryVisibilityRequest.Builder requestBuilder = RetinaProto.QueryVisibilityRequest.newBuilder()
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
.setFileId(fileId)
.addAllRgIds(Arrays.stream(rgIds).boxed().collect(Collectors.toList()))
- .setTimestamp(timestamp)
- .build();
- RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(request);
+ .setTimestamp(timestamp);
+ if (transId != -1)
+ {
+ requestBuilder.setTransId(transId);
+ }
+ RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(requestBuilder.build());
if (response.getHeader().getErrorCode() != 0)
{
throw new RetinaException("Failed to query visibility: " + response.getHeader().getErrorCode()
@@ -334,13 +383,19 @@ public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws
{
throw new RetinaException("Response token does not match");
}
+
+ if (response.getCheckpointPath() != null && !response.getCheckpointPath().isEmpty())
+ {
+ return VisibilityResult.fromCheckpointPath(response.getCheckpointPath());
+ }
+
long[][] visibilityBitmaps = new long[rgIds.length][];
for (int i = 0; i < response.getBitmapsCount(); i++)
{
RetinaProto.VisibilityBitmap bitmap = response.getBitmaps(i);
visibilityBitmaps[i] = bitmap.getBitmapList().stream().mapToLong(Long::longValue).toArray();
}
- return visibilityBitmaps;
+ return VisibilityResult.fromBitmaps(visibilityBitmaps);
}
public boolean reclaimVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java
index 2ba8d0c4b..143a43d84 100644
--- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java
@@ -29,6 +29,10 @@
public class RetinaUtils
{
+ public static final String CHECKPOINT_PREFIX_GC = "vis_gc_";
+ public static final String CHECKPOINT_PREFIX_OFFLOAD = "vis_offload_";
+ public static final String CHECKPOINT_SUFFIX = ".bin";
+
private static volatile RetinaUtils instance;
private final int bucketNum;
private final int defaultRetinaPort;
@@ -103,7 +107,7 @@ public static RetinaService getRetinaServiceFromBucketId(int bucketId)
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}
- public static RetinaService getRetinaServiceFromPath(String path)
+ public static RetinaService getRetinaServiceFromPath(String path)
{
String retinaHost = extractRetinaHostNameFromPath(path);
if(retinaHost == null || retinaHost.equals(Constants.LOAD_DEFAULT_RETINA_PREFIX))
@@ -113,6 +117,16 @@ public static RetinaService getRetinaServiceFromPath(String path)
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}
+ public static String getCheckpointFileName(String prefix, String hostname, long timestamp)
+ {
+ return prefix + hostname + "_" + timestamp + CHECKPOINT_SUFFIX;
+ }
+
+ public static String getCheckpointPrefix(String typePrefix, String hostname)
+ {
+ return typePrefix + hostname + "_";
+ }
+
private static String extractRetinaHostNameFromPath(String path)
{
if (path == null || path.isEmpty()) {
diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties
index 3d1b07e44..6f781d67e 100644
--- a/pixels-common/src/main/resources/pixels.properties
+++ b/pixels-common/src/main/resources/pixels.properties
@@ -287,6 +287,8 @@ retina.buffer.flush.count=20
retina.buffer.flush.interval=30
# interval in seconds for retina visibility garbage
retina.gc.interval=300
+# number of threads for retina checkpoint
+retina.checkpoint.threads=4
# retina buffer reader prefetch threads num
retina.reader.prefetch.threads=8
# retina service init threads num
@@ -297,6 +299,8 @@ retina.metrics.log.interval=-1
retina.upsert-mode.enabled=false
# offloading threshold for long query in seconds
pixels.transaction.offload.threshold=1800
+# lease duration for retina offload cache in seconds, default 600s
+retina.offload.cache.lease.duration=600
# snapshot storage directory
pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints
diff --git a/pixels-core/pom.xml b/pixels-core/pom.xml
index 952f37b95..76cdceaa0 100644
--- a/pixels-core/pom.xml
+++ b/pixels-core/pom.xml
@@ -59,6 +59,12 @@
test
true
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
index 7eb707135..534c40c20 100644
--- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
+++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
@@ -509,7 +509,21 @@ else if (predicate.matchesNone())
{
MetadataService metadataService = MetadataService.Instance();
long fileId = metadataService.getFileId(physicalReader.getPathUri());
- rgVisibilityBitmaps = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp());
+ RetinaService.VisibilityResult result = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp(), option.getTransId());
+ if (result.isOffloaded())
+ {
+ String checkpointPath = result.getCheckpointPath();
+ rgVisibilityBitmaps = new long[targetRGNum][];
+ for (int i = 0; i < targetRGNum; i++)
+ {
+ rgVisibilityBitmaps[i] = VisibilityCheckpointCache.getInstance()
+ .getVisibilityBitmap(option.getTransTimestamp(), checkpointPath, fileId, targetRGs[i]);
+ }
+ }
+ else
+ {
+ rgVisibilityBitmaps = result.getBitmaps();
+ }
} catch (IOException e)
{
logger.error("Failed to get path uri");
diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java
new file mode 100644
index 000000000..ee7845dff
--- /dev/null
+++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2024 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+package io.pixelsdb.pixels.core.reader;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import io.pixelsdb.pixels.common.physical.PhysicalReader;
+import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.common.physical.StorageFactory;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client-side cache for visibility checkpoints.
+ * This cache is used by PixelsReader to avoid repeated loading of checkpoint files from storage.
+ */
+public class VisibilityCheckpointCache
+{
+ private static final Logger logger = LogManager.getLogger(VisibilityCheckpointCache.class);
+ private static final VisibilityCheckpointCache instance = new VisibilityCheckpointCache();
+
+ private final Cache> cache;
+
+ private VisibilityCheckpointCache()
+ {
+ long leaseDuration = Long.parseLong(ConfigFactory.Instance().getProperty("retina.offload.cache.lease.duration"));
+
+ this.cache = Caffeine.newBuilder()
+ .expireAfterAccess(leaseDuration, TimeUnit.SECONDS)
+ .removalListener((key, value, cause) -> {
+ logger.info("Client-side visibility cache for timestamp {} evicted due to {}", key, cause);
+ })
+ .build();
+ }
+
+ public static VisibilityCheckpointCache getInstance()
+ {
+ return instance;
+ }
+
+ public long[] getVisibilityBitmap(long timestamp, String checkpointPath, long targetFileId, int targetRgId) throws IOException
+ {
+ Map timestampCache = cache.getIfPresent(timestamp);
+
+ if (timestampCache == null)
+ {
+ synchronized (this)
+ {
+ timestampCache = cache.getIfPresent(timestamp);
+ if (timestampCache == null)
+ {
+ timestampCache = loadCheckpointFile(checkpointPath);
+ cache.put(timestamp, timestampCache);
+ }
+ }
+ }
+
+ String rgKey = targetFileId + "_" + targetRgId;
+ return timestampCache.getOrDefault(rgKey, null);
+ }
+
+ private Map loadCheckpointFile(String path) throws IOException
+ {
+ long start = System.currentTimeMillis();
+ Storage storage = StorageFactory.Instance().getStorage(path);
+ long fileLength = storage.getStatus(path).getLength();
+
+ byte[] content;
+ try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
+ {
+ ByteBuffer buffer = reader.readFully((int) fileLength);
+ if (buffer.hasArray())
+ {
+ content = buffer.array();
+ }
+ else
+ {
+ content = new byte[(int) fileLength];
+ buffer.get(content);
+ }
+ }
+
+ Map timestampCache;
+ try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content)))
+ {
+ int rgCount = in.readInt();
+ // Initial capacity: count / 0.75 + 1 to avoid rehash
+ timestampCache = new ConcurrentHashMap<>((int) (rgCount / 0.75) + 1);
+
+ for (int i = 0; i < rgCount; i++)
+ {
+ long fileId = in.readLong();
+ int rgId = in.readInt();
+ int len = in.readInt();
+ long[] bitmap = new long[len];
+ for (int j = 0; j < len; j++)
+ {
+ bitmap[j] = in.readLong();
+ }
+ timestampCache.put(fileId + "_" + rgId, bitmap);
+ }
+ }
+ long end = System.currentTimeMillis();
+ logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (end - start), timestampCache.size());
+ return timestampCache;
+ }
+}
diff --git a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java
new file mode 100644
index 000000000..bfcec50ee
--- /dev/null
+++ b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+package io.pixelsdb.pixels.core.reader;
+
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.common.physical.StorageFactory;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test client-side checkpoint loading and caching performance.
+ */
+public class TestVisibilityCheckpointCache
+{
+ private String testCheckpointDir;
+ private Storage storage;
+ private final long fileId = 999999L;
+ private final int rgId = 0;
+
+ @Before
+ public void setUp() throws IOException
+ {
+ testCheckpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir");
+ storage = StorageFactory.Instance().getStorage(testCheckpointDir);
+
+ if (!storage.exists(testCheckpointDir))
+ {
+ storage.mkdirs(testCheckpointDir);
+ }
+ }
+
+ private String resolve(String dir, String filename) {
+ return dir.endsWith("/") ? dir + filename : dir + "/" + filename;
+ }
+
+ /**
+ * Helper to create a dummy checkpoint file.
+ */
+ private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws IOException
+ {
+ try (DataOutputStream out = storage.create(path, true, 8 * 1024 * 1024))
+ {
+ out.writeInt(numFiles * rgsPerFile);
+ for (int f = 0; f < numFiles; f++)
+ {
+ for (int r = 0; r < rgsPerFile; r++)
+ {
+ out.writeLong((long)f);
+ out.writeInt(r);
+ out.writeInt(bitmap.length);
+ for (long l : bitmap)
+ {
+ out.writeLong(l);
+ }
+ }
+ }
+ out.flush();
+ }
+ }
+
+ @Test
+ public void testCacheLoading() throws IOException
+ {
+ long timestamp = 1000L;
+ String checkpointPath = resolve(testCheckpointDir, "vis_gc_tencent_100.bin");
+ long[] dummyBitmap = new long[]{0x1L, 0x2L};
+ createDummyCheckpoint(checkpointPath, 1, 1, dummyBitmap);
+
+ VisibilityCheckpointCache cache = VisibilityCheckpointCache.getInstance();
+ long[] loaded = cache.getVisibilityBitmap(timestamp, checkpointPath, 0L, 0);
+
+ assertNotNull(loaded);
+ assertTrue(loaded.length == 2);
+ assertTrue(loaded[0] == 0x1L);
+ assertTrue(loaded[1] == 0x2L);
+ }
+
+ /**
+ * Migrated and adapted performance test.
+ */
+ @Test
+ public void testCheckpointPerformance() throws InterruptedException, IOException
+ {
+ // 1. Configuration parameters
+ int numFiles = 5000;
+ int rgsPerFile = 1;
+ int rowsPerRG = 200000;
+ int queryCount = 200;
+ long timestamp = 2000L;
+ String checkpointPath = resolve(testCheckpointDir, "perf_test_2000.bin");
+
+ // 2. Prepare data
+ int bitmapLen = (rowsPerRG + 63) / 64;
+ long[] dummyBitmap = new long[bitmapLen];
+ for(int i=0; ilog4j-over-slf4j
true
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java
index 954c29710..a986717f9 100644
--- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java
+++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java
@@ -63,14 +63,17 @@ public class RGVisibility implements AutoCloseable
* Constructor creates C++ object and returns handle.
*/
private final AtomicLong nativeHandle = new AtomicLong();
+ private final long recordNum;
public RGVisibility(long rgRecordNum)
{
+ this.recordNum = rgRecordNum;
this.nativeHandle.set(createNativeObject(rgRecordNum));
}
public RGVisibility(long rgRecordNum, long timestamp, long[] initialBitmap)
{
+ this.recordNum = rgRecordNum;
if (initialBitmap == null)
{
this.nativeHandle.set(createNativeObject(rgRecordNum));
@@ -80,6 +83,11 @@ public RGVisibility(long rgRecordNum, long timestamp, long[] initialBitmap)
}
}
+ public long getRecordNum()
+ {
+ return recordNum;
+ }
+
@Override
public void close()
{
diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
index d9cbb972d..a28df69a4 100644
--- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
+++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
@@ -25,13 +25,13 @@
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Column;
import io.pixelsdb.pixels.common.metadata.domain.Layout;
-import io.pixelsdb.pixels.common.metadata.domain.Table;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.transaction.TransService;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.RetinaUtils;
import io.pixelsdb.pixels.core.PixelsProto;
import io.pixelsdb.pixels.core.TypeDescription;
import io.pixelsdb.pixels.index.IndexProto;
@@ -45,10 +45,7 @@
import java.nio.ByteOrder;
import java.nio.file.Paths;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -67,25 +64,30 @@ public class RetinaResourceManager
private final ScheduledExecutorService gcExecutor;
// Checkpoint related fields
+ private final ExecutorService checkpointExecutor;
private final Map offloadedCheckpoints;
+ private final Map> checkpointFutures;
private final String checkpointDir;
private long latestGcTimestamp = -1;
private final int totalVirtualNodeNum;
private final Map checkpointRefCounts;
- private static class RecoveredState
+ private static class CheckpointEntry
{
- final long timestamp;
+ final long fileId;
+ final int rgId;
+ final int recordNum;
final long[] bitmap;
- RecoveredState(long timestamp, long[] bitmap)
+ CheckpointEntry(long fileId, int rgId, int recordNum, long[] bitmap)
{
- this.timestamp = timestamp;
+ this.fileId = fileId;
+ this.rgId = rgId;
+ this.recordNum = recordNum;
this.bitmap = bitmap;
}
}
- private final Map recoveryCache;
private enum CheckpointType
{
@@ -99,18 +101,27 @@ private RetinaResourceManager()
this.rgVisibilityMap = new ConcurrentHashMap<>();
this.pixelsWriteBufferMap = new ConcurrentHashMap<>();
this.offloadedCheckpoints = new ConcurrentHashMap<>();
+ this.checkpointFutures = new ConcurrentHashMap<>();
+
+ ConfigFactory config = ConfigFactory.Instance();
+
this.checkpointRefCounts = new ConcurrentHashMap<>();
- this.checkpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir");
- this.recoveryCache = new ConcurrentHashMap<>();
+ this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir");
+
+ int cpThreads = Integer.parseInt(config.getProperty("retina.checkpoint.threads"));
+ this.checkpointExecutor = Executors.newFixedThreadPool(cpThreads, r -> {
+ Thread t = new Thread(r, "retina-checkpoint-thread");
+ t.setDaemon(true);
+ return t;
+ });
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
- Thread t = new Thread(r, "retina-gc-thread");
- t.setDaemon(true);
- return t;
- });
+ Thread t = new Thread(r, "retina-gc-thread");
+ t.setDaemon(true);
+ return t;
+ });
try
{
- ConfigFactory config = ConfigFactory.Instance();
long interval = Long.parseLong(config.getProperty("retina.gc.interval"));
if (interval > 0)
{
@@ -151,131 +162,15 @@ public static RetinaResourceManager Instance()
return InstanceHolder.instance;
}
- public void recoverCheckpoints()
- {
- try
- {
- Storage storage = StorageFactory.Instance().getStorage(checkpointDir);
- if (!storage.exists(checkpointDir))
- {
- storage.mkdirs(checkpointDir);
- return;
- }
-
- List allFiles = storage.listPaths(checkpointDir);
- // filter only .bin files
- allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList());
-
- List gcTimestamps = new ArrayList<>();
- for (String path : allFiles)
- {
- // use Paths.get().getFileName() to extract filename from path string
- String filename = Paths.get(path).getFileName().toString();
- if (filename.startsWith("vis_offload_"))
- {
- // delete offload checkpoint files when restarting
- try
- {
- storage.delete(path, false);
- } catch (IOException e)
- {
- logger.error("Failed to delete checkpoint file {}", path, e);
- }
- } else if (filename.startsWith("vis_gc_"))
- {
- try
- {
- gcTimestamps.add(Long.parseLong(filename.replace("vis_gc_", "").replace(".bin", "")));
- } catch (Exception e)
- {
- logger.error("Failed to parse checkpoint timestamp from file {}", path, e);
- }
- }
- }
-
- if (gcTimestamps.isEmpty())
- {
- return;
- }
-
- Collections.sort(gcTimestamps);
- long latestTs = gcTimestamps.get(gcTimestamps.size() - 1);
- this.latestGcTimestamp = latestTs;
- logger.info("Loading system state from GC checkpoint: {}", latestTs);
-
- // load to recoveryCache
- loadCheckpointToCache(latestTs);
-
- // delete old GC checkpoint files
- for (int i = 0; i < gcTimestamps.size() - 1; i++)
- {
- removeCheckpointFile(gcTimestamps.get(i), CheckpointType.GC);
- }
- } catch (IOException e)
- {
- logger.error("Failed to recover checkpoints", e);
- }
- }
-
- private void loadCheckpointToCache(long timestamp)
- {
- String fileName = "vis_gc_" + timestamp + ".bin";
- // construct path. Storage expects '/' separator usually, but let's be safe
- String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName;
-
- try
- {
- Storage storage = StorageFactory.Instance().getStorage(path);
- if (!storage.exists(path))
- {
- return;
- }
-
- try (DataInputStream in = storage.open(path))
- {
- int rgCount = in.readInt();
- for (int i = 0; i < rgCount; i++)
- {
- long fileId = in.readLong();
- int rgId = in.readInt();
- int len = in.readInt();
- long[] bitmap = new long[len];
- for (int j = 0; j < len; j++)
- {
- bitmap[j] = in.readLong();
- }
- recoveryCache.put(fileId + "_" + rgId, new RecoveredState(timestamp, bitmap));
- }
- }
- } catch (IOException e)
- {
- logger.error("Failed to read checkpoint file: {}", e);
- }
- }
-
public void addVisibility(long fileId, int rgId, int recordNum)
{
String rgKey = fileId + "_" + rgId;
- RecoveredState recoveredState = recoveryCache.remove(rgKey);
-
- RGVisibility rgVisibility;
- if (recoveredState != null)
+ if (rgVisibilityMap.containsKey(rgKey))
{
- rgVisibility = new RGVisibility(recordNum, recoveredState.timestamp, recoveredState.bitmap);
- } else
- {
- rgVisibility = new RGVisibility(recordNum);
+ return;
}
- rgVisibilityMap.put(rgKey, rgVisibility);
- }
- public void finishRecovery()
- {
- if (!recoveryCache.isEmpty())
- {
- logger.info("Dropping {} orphaned entries from recovery cache.", recoveryCache.size());
- recoveryCache.clear();
- }
+ rgVisibilityMap.put(rgKey, new RGVisibility(recordNum));
}
public void addVisibility(String filePath) throws RetinaException
@@ -309,16 +204,7 @@ public void addVisibility(String filePath) throws RetinaException
public long[] queryVisibility(long fileId, int rgId, long timestamp, long transId) throws RetinaException
{
- // [Routing Logic] Only read from disk if the transaction is explicitly registered as Offload
- if (transId != -1)
- {
- if (offloadedCheckpoints.containsKey(timestamp))
- {
- return loadBitmapFromDisk(timestamp, fileId, rgId);
- }
- throw new RetinaException("Offloaded checkpoint missing for TransID" + transId);
- }
- // otherwise read from memory
+ // read from memory
RGVisibility rgVisibility = checkRGVisibility(fileId, rgId);
long[] visibilityBitmap = rgVisibility.getVisibilityBitmap(timestamp);
if (visibilityBitmap == null)
@@ -328,6 +214,7 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp, long transI
return visibilityBitmap;
}
+
public long[] queryVisibility(long fileId, int rgId, long timestamp) throws RetinaException
{
return queryVisibility(fileId, rgId, timestamp, -1);
@@ -345,35 +232,57 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp) throws Reti
*/
public void registerOffload(long timestamp) throws RetinaException
{
- while (true)
+ AtomicInteger refCount = checkpointRefCounts.computeIfAbsent(timestamp, k -> new AtomicInteger(0));
+ CompletableFuture future;
+
+ synchronized (refCount)
{
- AtomicInteger refCount = checkpointRefCounts.computeIfAbsent(timestamp, k -> new AtomicInteger(0));
+ refCount.incrementAndGet();
- synchronized (refCount)
+ // If checkpoint already exists and is fully committed, just return
+ if (offloadedCheckpoints.containsKey(timestamp))
{
- if (checkpointRefCounts.get(timestamp) != refCount)
- {
- continue;
- }
+ logger.info("Registered offload for Timestamp: {} (already exists)", timestamp);
+ return;
+ }
- int currentRef = refCount.incrementAndGet();
- if (currentRef == 1)
- {
+ // Check if there is an existing future
+ future = checkpointFutures.get(timestamp);
+ if (future != null && future.isCompletedExceptionally())
+ {
+ // If previous attempt failed, remove it so we can retry
+ checkpointFutures.remove(timestamp, future);
+ future = null;
+ }
+
+ if (future == null)
+ {
+ future = checkpointFutures.computeIfAbsent(timestamp, k -> {
try
{
- if (!offloadedCheckpoints.containsKey(timestamp))
- {
- createCheckpoint(timestamp, CheckpointType.OFFLOAD);
- }
- } catch (Exception e)
+ return createCheckpoint(timestamp, CheckpointType.OFFLOAD);
+ } catch (RetinaException e)
{
- refCount.decrementAndGet();
- throw e;
+ throw new CompletionException(e);
}
- }
+ });
}
+ }
+
+ try
+ {
+ future.join();
logger.info("Registered offload for Timestamp: {}", timestamp);
- return;
+ } catch (Exception e)
+ {
+ synchronized (refCount)
+ {
+ refCount.decrementAndGet();
+ // We don't remove from checkpointFutures here anymore,
+ // because it's handled above in the synchronized block for retries
+ // or let the next caller handle it.
+ }
+ throw new RetinaException("Failed to create checkpoint for timestamp: " + timestamp, e);
}
}
@@ -388,6 +297,7 @@ public void unregisterOffload(long timestamp)
if (remaining <= 0)
{
offloadedCheckpoints.remove(timestamp);
+ checkpointFutures.remove(timestamp);
if (refCount.get() > 0)
{
logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp);
@@ -401,117 +311,116 @@ public void unregisterOffload(long timestamp)
}
}
- private void createCheckpoint(long timestamp, CheckpointType type) throws RetinaException
+ private CompletableFuture createCheckpoint(long timestamp, CheckpointType type) throws RetinaException
{
- String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_";
- String fileName = prefix + timestamp + ".bin";
+ String prefix = (type == CheckpointType.GC) ? RetinaUtils.CHECKPOINT_PREFIX_GC : RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD;
+ String fileName = RetinaUtils.getCheckpointFileName(prefix, retinaHostName, timestamp);
String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName;
- try
+ // 1. Capture current entries to ensure we process a consistent set of RGs
+ List> entries = new ArrayList<>(this.rgVisibilityMap.entrySet());
+ int totalRgs = entries.size();
+ logger.info("Starting {} checkpoint for {} RGs at timestamp {}", type, totalRgs, timestamp);
+
+ // 2. Use a BlockingQueue for producer-consumer pattern
+ // Limit capacity to avoid excessive memory usage if writing is slow
+ BlockingQueue queue = new LinkedBlockingQueue<>(1024);
+
+ // 3. Start producer tasks to fetch bitmaps in parallel
+ for (Map.Entry entry : entries)
{
- Storage storage = StorageFactory.Instance().getStorage(filePath);
- // Write directly to the final file as atomic move is not supported by all storages (e.g. S3).
- // Object stores typically guarantee atomicity of the put operation.
- try (DataOutputStream out = storage.create(filePath, true, 4096))
- {
- int rgCount = this.rgVisibilityMap.size();
- out.writeInt(rgCount);
- for (Map.Entry entry : this.rgVisibilityMap.entrySet())
+ checkpointExecutor.submit(() -> {
+ try
{
- String[] parts = entry.getKey().split("_");
+ String key = entry.getKey();
+ String[] parts = key.split("_");
long fileId = Long.parseLong(parts[0]);
int rgId = Integer.parseInt(parts[1]);
- long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp);
-
- out.writeLong(fileId);
- out.writeInt(rgId);
- out.writeInt(bitmap.length);
- for (long l : bitmap)
- {
- out.writeLong(l);
- }
- }
- out.flush();
- }
-
- if (type == CheckpointType.OFFLOAD)
- {
- offloadedCheckpoints.put(timestamp, filePath);
- } else
- {
- long oldGcTs = this.latestGcTimestamp;
- this.latestGcTimestamp = timestamp;
- if (oldGcTs != -1 && oldGcTs != timestamp)
+ RGVisibility rgVisibility = entry.getValue();
+ long[] bitmap = rgVisibility.getVisibilityBitmap(timestamp);
+ queue.put(new CheckpointEntry(fileId, rgId, (int) rgVisibility.getRecordNum(), bitmap));
+ } catch (Exception e)
{
- removeCheckpointFile(oldGcTs, CheckpointType.GC);
+ logger.error("Failed to fetch visibility bitmap for checkpoint", e);
}
- }
- } catch (IOException e)
- {
- // Try to cleanup the potentially corrupted or partial file
- try
- {
- StorageFactory.Instance().getStorage(filePath).delete(filePath, false);
- } catch (IOException ignored)
- {
- }
- throw new RetinaException("Failed to commit checkpoint file", e);
+ });
}
- }
- private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetRgId) throws RetinaException
- {
- String path = offloadedCheckpoints.get(timestamp);
- if (path == null)
- {
- throw new RetinaException("Checkpoint missing: " + timestamp);
- }
-
- try
- {
- Storage storage = StorageFactory.Instance().getStorage(path);
- if (!storage.exists(path))
+ // 4. Async Write: perform IO in background thread (Consumer)
+ // Use commonPool to avoid deadlocks with checkpointExecutor
+ return CompletableFuture.runAsync(() -> {
+ // Lock on filePath string intern to ensure only one thread writes to the same file
+ synchronized (filePath.intern())
{
- throw new RetinaException("Checkpoint file missing: " + path);
- }
+ if (type == CheckpointType.OFFLOAD && offloadedCheckpoints.containsKey(timestamp))
+ {
+ return;
+ }
+ if (type == CheckpointType.GC && timestamp <= latestGcTimestamp)
+ {
+ return;
+ }
- try (DataInputStream in = storage.open(path))
- {
- int rgCount = in.readInt();
- for (int i = 0; i < rgCount; i++)
+ long startWrite = System.currentTimeMillis();
+ try
{
- long fileId = in.readLong();
- int rgId = in.readInt();
- int len = in.readInt();
- if (fileId == targetFileId && rgId == targetRgId)
+ Storage storage = StorageFactory.Instance().getStorage(filePath);
+ // Use a temporary file to ensure atomic commit
+ // Although LocalFS lacks rename, using a synchronized block here
+ // makes it safe within this JVM instance.
+ try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024))
{
- long[] bitmap = new long[len];
- for (int j = 0; j < len; j++)
+ out.writeInt(totalRgs);
+ for (int i = 0; i < totalRgs; i++)
{
- bitmap[j] = in.readLong();
+ CheckpointEntry entry = queue.take();
+ out.writeLong(entry.fileId);
+ out.writeInt(entry.rgId);
+ out.writeInt(entry.recordNum);
+ out.writeInt(entry.bitmap.length);
+ for (long l : entry.bitmap)
+ {
+ out.writeLong(l);
+ }
}
- return bitmap;
+ out.flush();
+ }
+ long endWrite = System.currentTimeMillis();
+ logger.info("Writing {} checkpoint file to {} took {} ms", type, filePath, (endWrite - startWrite));
+
+ if (type == CheckpointType.OFFLOAD)
+ {
+ offloadedCheckpoints.put(timestamp, filePath);
} else
{
- int skipped = in.skipBytes(len * 8);
- if (skipped != len * 8)
+ long oldGcTs = this.latestGcTimestamp;
+ this.latestGcTimestamp = timestamp;
+ if (oldGcTs != -1 && oldGcTs != timestamp)
{
- throw new IOException("Unexpected EOF");
+ removeCheckpointFile(oldGcTs, CheckpointType.GC);
}
}
+ } catch (Exception e)
+ {
+ logger.error("Failed to commit {} checkpoint file for timestamp: {}", type, timestamp, e);
+ // Try to cleanup the potentially corrupted or partial file
+ try
+ {
+ StorageFactory.Instance().getStorage(filePath).delete(filePath, false);
+ } catch (IOException ignored)
+ {
+ }
+ throw new CompletionException(e);
}
}
- } catch (IOException e)
- {
- throw new RetinaException("Failed to read checkpoint file", e);
- }
- return new long[0];
+ });
}
+
private void removeCheckpointFile(long timestamp, CheckpointType type)
{
- String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_";
- String fileName = prefix + timestamp + ".bin";
+ String prefix = (type == CheckpointType.GC) ? RetinaUtils.CHECKPOINT_PREFIX_GC : RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD;
+ String fileName = RetinaUtils.getCheckpointFileName(prefix, retinaHostName, timestamp);
String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName;
try
@@ -529,6 +438,11 @@ public void reclaimVisibility(long fileId, int rgId, long timestamp) throws Reti
rgVisibility.getVisibilityBitmap(timestamp);
}
+ public String getCheckpointPath(long timestamp)
+ {
+ return offloadedCheckpoints.get(timestamp);
+ }
+
public void deleteRecord(long fileId, int rgId, int rgRowOffset, long timestamp) throws RetinaException
{
RGVisibility rgVisibility = checkRGVisibility(fileId, rgId);
@@ -753,4 +667,102 @@ private void runGC()
logger.error("Error while running GC", e);
}
}
+
+ public void recoverCheckpoints()
+ {
+ try
+ {
+ Storage storage = StorageFactory.Instance().getStorage(checkpointDir);
+ if (!storage.exists(checkpointDir))
+ {
+ storage.mkdirs(checkpointDir);
+ return;
+ }
+
+ List allFiles = storage.listPaths(checkpointDir);
+ // filter only .bin files
+ allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList());
+
+ List gcTimestamps = new ArrayList<>();
+ String offloadPrefix = RetinaUtils.getCheckpointPrefix(RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD, retinaHostName);
+ String gcPrefix = RetinaUtils.getCheckpointPrefix(RetinaUtils.CHECKPOINT_PREFIX_GC, retinaHostName);
+
+ for (String path : allFiles)
+ {
+ // use Paths.get().getFileName() to extract filename from path string
+ String filename = Paths.get(path).getFileName().toString();
+ if (filename.startsWith(offloadPrefix))
+ {
+ // delete offload checkpoint files when restarting
+ try
+ {
+ storage.delete(path, false);
+ } catch (IOException e)
+ {
+ logger.error("Failed to delete checkpoint file {}", path, e);
+ }
+ } else if (filename.startsWith(gcPrefix))
+ {
+ try
+ {
+ gcTimestamps.add(Long.parseLong(filename.replace(gcPrefix, "").replace(".bin", "")));
+ } catch (Exception e)
+ {
+ logger.error("Failed to parse checkpoint timestamp from file {}", path, e);
+ }
+ }
+ }
+
+ if (gcTimestamps.isEmpty())
+ {
+ return;
+ }
+
+ Collections.sort(gcTimestamps);
+ long latestTs = gcTimestamps.get(gcTimestamps.size() - 1);
+ this.latestGcTimestamp = latestTs;
+ logger.info("Loading system state from GC checkpoint: {}", latestTs);
+
+ // load to rgVisibilityMap
+ String fileName = RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_GC, retinaHostName, latestTs);
+ String latestPath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName;
+
+ try
+ {
+ Storage latestStorage = StorageFactory.Instance().getStorage(latestPath);
+ if (latestStorage.exists(latestPath))
+ {
+ try (DataInputStream in = latestStorage.open(latestPath))
+ {
+ int rgCount = in.readInt();
+ for (int i = 0; i < rgCount; i++)
+ {
+ long fileId = in.readLong();
+ int rgId = in.readInt();
+ int recordNum = in.readInt();
+ int len = in.readInt();
+ long[] bitmap = new long[len];
+ for (int j = 0; j < len; j++)
+ {
+ bitmap[j] = in.readLong();
+ }
+ rgVisibilityMap.put(fileId + "_" + rgId, new RGVisibility(recordNum, latestTs, bitmap));
+ }
+ }
+ }
+ } catch (IOException e)
+ {
+ logger.error("Failed to read checkpoint file: {}", e);
+ }
+
+ // delete old GC checkpoint files
+ for (int i = 0; i < gcTimestamps.size() - 1; i++)
+ {
+ removeCheckpointFile(gcTimestamps.get(i), CheckpointType.GC);
+ }
+ } catch (IOException e)
+ {
+ logger.error("Failed to recover checkpoints", e);
+ }
+ }
}
diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
index c41ab36ae..b1fab563a 100644
--- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
+++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
@@ -23,6 +23,7 @@
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.RetinaUtils;
import org.junit.Before;
import org.junit.Test;
@@ -30,25 +31,29 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.net.InetAddress;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
- * Test checkpoint creation and recovery logic.
+ * Test checkpoint creation and recovery logic in Retina side.
*/
public class TestRetinaCheckpoint
{
private RetinaResourceManager retinaManager;
private String testCheckpointDir;
private Storage storage;
- private final long fileId = 1L;
+ private final long fileId = 999999L;
private final int rgId = 0;
private final int numRows = 1024;
+ private String hostName;
@Before
public void setUp() throws IOException, RetinaException
@@ -74,65 +79,90 @@ public void setUp() throws IOException, RetinaException
}
retinaManager = RetinaResourceManager.Instance();
- retinaManager.addVisibility(fileId, rgId, numRows);
+ resetSingletonState();
+ hostName = System.getenv("HOSTNAME");
+ if (hostName == null)
+ {
+ hostName = InetAddress.getLocalHost().getHostName();
+ }
}
private String resolve(String dir, String filename) {
return dir.endsWith("/") ? dir + filename : dir + "/" + filename;
}
+ private String getOffloadFileName(long timestamp) {
+ return RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD, hostName, timestamp);
+ }
+
+ private String getGcFileName(long timestamp) {
+ return RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_GC, hostName, timestamp);
+ }
+
@Test
public void testRegisterOffload() throws RetinaException, IOException
{
- long transId = 12345L;
+ System.out.println("\n[Test] Starting testRegisterOffload...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
long timestamp = 100L;
// Register offload
+ System.out.println("Registering offload for timestamp: " + timestamp);
retinaManager.registerOffload(timestamp);
// Verify checkpoint file exists
- String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin");
+ String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp));
assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile));
+ System.out.println("Verified: Checkpoint file exists at " + expectedFile);
// Unregister
+ System.out.println("Unregistering offload...");
retinaManager.unregisterOffload(timestamp);
// File should be removed
assertFalse("Offload checkpoint file should be removed", storage.exists(expectedFile));
+ System.out.println("Verified: Checkpoint file removed. testRegisterOffload passed.");
}
@Test
public void testMultipleOffloads() throws RetinaException, IOException
{
- long transId1 = 12345L;
+ System.out.println("\n[Test] Starting testMultipleOffloads...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
long timestamp1 = 100L;
- long transId2 = 12346L;
long timestamp1_dup = 100L; // same timestamp
// Both register the same timestamp - should share checkpoint
+ System.out.println("Registering same timestamp twice...");
retinaManager.registerOffload(timestamp1);
retinaManager.registerOffload(timestamp1_dup);
- String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin");
+ String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp1));
assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile));
// Unregister one - should not remove yet (ref count >1)
+ System.out.println("Unregistering once (ref count should still be > 0)...");
retinaManager.unregisterOffload(timestamp1);
assertTrue("Offload checkpoint should still exist (ref count >1)", storage.exists(expectedFile));
+ System.out.println("Verified: Checkpoint still exists after one unregister.");
// Unregister second
+ System.out.println("Unregistering second time...");
retinaManager.unregisterOffload(timestamp1);
assertFalse("Offload checkpoint should be removed", storage.exists(expectedFile));
+ System.out.println("Verified: Checkpoint removed after final unregister. testMultipleOffloads passed.");
}
@Test
public void testCheckpointRecovery() throws RetinaException, IOException
{
+ System.out.println("\n[Test] Starting testCheckpointRecovery...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
long timestamp = 100L;
- long transId = 999L;
// 1. Delete row 10
int rowToDelete = 10;
+ System.out.println("Deleting row " + rowToDelete + " in memory...");
retinaManager.deleteRecord(fileId, rgId, rowToDelete, timestamp);
// Verify deleted in memory
@@ -140,13 +170,15 @@ public void testCheckpointRecovery() throws RetinaException, IOException
assertTrue("Row 10 should be deleted in memory", isBitSet(memBitmap, rowToDelete));
// 2. Register Offload to generate checkpoint file
+ System.out.println("Creating checkpoint on disk...");
retinaManager.registerOffload(timestamp);
- String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin");
+ String offloadPath = resolve(testCheckpointDir, getOffloadFileName(timestamp));
assertTrue("Checkpoint file should exist", storage.exists(offloadPath));
// 3. Rename offload file to GC file to simulate checkpoint generated by GC
- String gcPath = resolve(testCheckpointDir, "vis_gc_" + timestamp + ".bin");
- // Storage interface doesn't have renamed, using copy and delete
+ String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp));
+ System.out.println("Simulating GC checkpoint by renaming offload file to: " + gcPath);
+ // Storage interface doesn't have rename, using copy and delete
try (DataInputStream in = storage.open(offloadPath);
DataOutputStream out = storage.create(gcPath, true, 4096))
{
@@ -160,58 +192,140 @@ public void testCheckpointRecovery() throws RetinaException, IOException
storage.delete(offloadPath, false);
// 4. Reset singleton state (Simulate Crash/Restart)
+ System.out.println("Simulating system restart (resetting memory state)...");
resetSingletonState();
// 5. Perform recovery
- // At this point rgVisibilityMap is empty, need to call recoverCheckpoints to load data into cache
+ System.out.println("Running recoverCheckpoints()...");
+ // At this point rgVisibilityMap is empty, recoverCheckpoints will load data directly into rgVisibilityMap
retinaManager.recoverCheckpoints();
- // 6. Re-add Visibility, at this point it should recover state from recoveryCache instead of creating new
- retinaManager.addVisibility(fileId, rgId, numRows);
-
- // 7. Verify recovered state: Row 10 should still be in deleted state
+ // 6. Verify recovered state immediately after recovery
+ System.out.println("Verifying recovered state immediately after recoverCheckpoints()...");
long[] recoveredBitmap = retinaManager.queryVisibility(fileId, rgId, timestamp);
- assertTrue("Row 10 should still be deleted after recovery", isBitSet(recoveredBitmap, rowToDelete));
+ assertTrue("Row 10 should be deleted after recovery", isBitSet(recoveredBitmap, rowToDelete));
assertFalse("Row 11 should not be deleted", isBitSet(recoveredBitmap, rowToDelete + 1));
+
+ // 7. Re-add Visibility, at this point it should see that it already exists in rgVisibilityMap
+ System.out.println("Re-adding visibility for file (should skip as it already exists)...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
+
+ // 8. Verify state still correct
+ long[] finalBitmap = retinaManager.queryVisibility(fileId, rgId, timestamp);
+ assertTrue("Row 10 should still be deleted", isBitSet(finalBitmap, rowToDelete));
+ System.out.println("Verified: Recovery successful, row state restored directly to map. testCheckpointRecovery passed.");
}
@Test
- public void testDiskBitmapQuery() throws RetinaException
+ public void testCheckpointRetryAfterFailure() throws RetinaException, IOException
{
- long baseTimestamp = 200L;
- long transId = 888L;
+ System.out.println("\n[Test] Starting testCheckpointRetryAfterFailure...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
+ long timestamp = 123L;
+
+ String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp));
+
+ // 1. Pre-create a DIRECTORY with the same name to cause creation failure
+ storage.mkdirs(expectedFile);
+ System.out.println("Created a directory at " + expectedFile + " to simulate failure.");
+
+ // 2. Try to register offload - should fail
+ try
+ {
+ retinaManager.registerOffload(timestamp);
+ assertTrue("Should have thrown an exception", false);
+ } catch (RetinaException e)
+ {
+ System.out.println("Expected failure occurred: " + e.getMessage());
+ }
+
+ // 3. Remove the directory
+ storage.delete(expectedFile, true);
+ assertFalse("Directory should be removed", storage.exists(expectedFile));
+
+ // 4. Try again - should succeed now because we clear failed futures
+ System.out.println("Retrying registration...");
+ retinaManager.registerOffload(timestamp);
+
+ assertTrue("Offload checkpoint file should exist after retry", storage.exists(expectedFile));
+ System.out.println("Verified: Retry successful. testCheckpointRetryAfterFailure passed.");
+ }
+
+ @Test
+ public void testMultiRGCheckpoint() throws RetinaException, IOException
+ {
+ System.out.println("\n[Test] Starting testMultiRGCheckpoint...");
+ int numRgs = 3;
+ for (int i = 0; i < numRgs; i++)
+ {
+ retinaManager.addVisibility(fileId, i, numRows);
+ }
+ long timestamp = 200L;
- // 1. Delete row 5 at baseTimestamp
- retinaManager.deleteRecord(fileId, rgId, 5, baseTimestamp);
+ // Delete records in different RGs
+ retinaManager.deleteRecord(fileId, 0, 10, timestamp);
+ retinaManager.deleteRecord(fileId, 1, 20, timestamp);
+ retinaManager.deleteRecord(fileId, 2, 30, timestamp);
- // 2. Register Offload for this transaction (save snapshot at this moment to disk)
- retinaManager.registerOffload(baseTimestamp);
+ // Create checkpoint
+ retinaManager.registerOffload(timestamp);
+ String offloadPath = resolve(testCheckpointDir, getOffloadFileName(timestamp));
+
+ // Simulating GC checkpoint for recovery
+ String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp));
+ try (DataInputStream in = storage.open(offloadPath);
+ DataOutputStream out = storage.create(gcPath, true, 4096))
+ {
+ byte[] buffer = new byte[4096];
+ int bytesRead;
+ while ((bytesRead = in.read(buffer)) != -1)
+ {
+ out.write(buffer, 0, bytesRead);
+ }
+ }
+
+ // Reset and recover
+ resetSingletonState();
+ retinaManager.recoverCheckpoints();
- // 3. Delete row 6 at a later time baseTimestamp + 10
- // This only affects the latest state in memory, should not affect the checkpoint on disk
- retinaManager.deleteRecord(fileId, rgId, 6, baseTimestamp + 10);
+ // Verify all RGs
+ assertTrue("RG 0 row 10 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 0, timestamp), 10));
+ assertTrue("RG 1 row 20 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 1, timestamp), 20));
+ assertTrue("RG 2 row 30 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 2, timestamp), 30));
+
+ System.out.println("Verified: Multi-RG state correctly restored. testMultiRGCheckpoint passed.");
+ }
- // 4. Case A: Query using transId (should read disk Checkpoint)
- // Expected: Row 5 deleted, Row 6 not deleted (deleted after checkpoint)
- long[] diskBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp, transId);
- assertTrue("Disk: Row 5 should be deleted", isBitSet(diskBitmap, 5));
- assertFalse("Disk: Row 6 should NOT be deleted (deleted after checkpoint)", isBitSet(diskBitmap, 6));
+ @Test
+ public void testCheckpointDataIntegrity() throws RetinaException, IOException
+ {
+ System.out.println("\n[Test] Starting testCheckpointDataIntegrity...");
+ int numRgs = 5;
+ for (int i = 0; i < numRgs; i++)
+ {
+ retinaManager.addVisibility(fileId, i, numRows);
+ }
+ long timestamp = 300L;
- // 5. Case B: Query without transId (read memory)
- // Expected: Query at a later timestamp, both rows 5 and 6 are deleted
- long[] memBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp + 20);
- assertTrue("Memory: Row 5 should be deleted", isBitSet(memBitmap, 5));
- assertTrue("Memory: Row 6 should be deleted", isBitSet(memBitmap, 6));
+ retinaManager.registerOffload(timestamp);
+ String path = resolve(testCheckpointDir, getOffloadFileName(timestamp));
- // Cleanup
- retinaManager.unregisterOffload(baseTimestamp);
+ // Directly read file to verify header
+ try (DataInputStream in = storage.open(path))
+ {
+ int savedRgs = in.readInt();
+ assertTrue("Saved RG count " + savedRgs + " should match " + numRgs, savedRgs == numRgs);
+ }
+ System.out.println("Verified: Data integrity (header) is correct. testCheckpointDataIntegrity passed.");
}
@Test
public void testConcurrency() throws InterruptedException, RetinaException
{
- int numThreads = 100;
- int operationsPerThread = 500;
+ System.out.println("\n[Test] Starting testConcurrency with 20 threads...");
+ retinaManager.addVisibility(fileId, rgId, numRows);
+ int numThreads = 20;
+ int operationsPerThread = 50;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(numThreads);
@@ -227,7 +341,6 @@ public void testConcurrency() throws InterruptedException, RetinaException
try
{
startLatch.await();
- long transId = 10000L + threadId;
long timestamp = 500L + (threadId % 5) * 10; // Multiple threads may share the same timestamp
for (int j = 0; j < operationsPerThread; j++)
@@ -237,16 +350,11 @@ public void testConcurrency() throws InterruptedException, RetinaException
{
// Register Offload
retinaManager.registerOffload(timestamp);
- // Verify file exists
- String p = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin");
- if (!storage.exists(p)) {
- throw new RuntimeException("Checkpoint file missing after register: " + p);
- }
}
else if (j % 3 == 1)
{
- // Query visibility
- long[] bitmap = retinaManager.queryVisibility(fileId, rgId, timestamp, transId);
+ // Query visibility from memory
+ long[] bitmap = retinaManager.queryVisibility(fileId, rgId, timestamp);
if (!isBitSet(bitmap, 0)) {
throw new RuntimeException("Row 0 should be deleted in all views");
}
@@ -269,7 +377,7 @@ else if (j % 3 == 1)
}
startLatch.countDown(); // Start all threads
- boolean finished = doneLatch.await(30, TimeUnit.SECONDS);
+ boolean finished = doneLatch.await(60, TimeUnit.SECONDS);
executor.shutdownNow();
assertTrue("Timeout waiting for concurrency test", finished);
@@ -303,30 +411,12 @@ private void resetSingletonState()
gcTimestampField.setAccessible(true);
gcTimestampField.setLong(retinaManager, -1L);
- Field recoveryCacheField = RetinaResourceManager.class.getDeclaredField("recoveryCache");
- recoveryCacheField.setAccessible(true);
- ((Map, ?>) recoveryCacheField.get(retinaManager)).clear();
-
} catch (Exception e)
{
throw new RuntimeException("Failed to reset singleton state", e);
}
}
- private boolean assertTrue(String message, boolean condition)
- {
- if (!condition)
- {
- throw new AssertionError(message);
- }
- return condition;
- }
-
- private boolean assertFalse(String message, boolean condition)
- {
- return assertTrue(message, !condition);
- }
-
private boolean isBitSet(long[] bitmap, int rowIndex)
{
if (bitmap == null || bitmap.length == 0) return false;
diff --git a/pom.xml b/pom.xml
index 1ca34493c..78011c6a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,7 @@
1.8.2
4.5.1
5.8.2
+ 5.8.2
@@ -927,6 +928,12 @@
${dep.junit.jupiter.version}
test
+
+ org.junit.vintage
+ junit-vintage-engine
+ ${dep.junit.vintage.version}
+ test
+
org.mockito
diff --git a/proto/retina.proto b/proto/retina.proto
index e41cb93f5..9c625aecd 100644
--- a/proto/retina.proto
+++ b/proto/retina.proto
@@ -55,8 +55,8 @@ message RequestHeader {
message ResponseHeader {
string token = 1;
- int32 errorCode = 2; // errorCode == 0 means success
- string errorMsg = 3; // empty if errorCode == 0
+ int32 errorCode = 2; // errorCode == 0 means success
+ string errorMsg = 3; // empty if errorCode == 0
}
// auxiliary structure
@@ -135,6 +135,7 @@ message QueryVisibilityRequest {
message QueryVisibilityResponse {
ResponseHeader header = 1;
repeated VisibilityBitmap bitmaps = 2;
+ string checkpointPath = 3;
}
message ReclaimVisibilityRequest {