From 741360fec46cd5776f277903bb84213b7d34f5e4 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 00:04:12 +0800 Subject: [PATCH 01/15] feat: add cp cache --- pixels-retina/pom.xml | 6 + .../pixels/retina/RetinaResourceManager.java | 248 ++++++++++++------ .../pixels/retina/TestRetinaCheckpoint.java | 137 ++++++++-- pom.xml | 7 + 4 files changed, 296 insertions(+), 102 deletions(-) diff --git a/pixels-retina/pom.xml b/pixels-retina/pom.xml index 3d5b6086ad..d391f96518 100644 --- a/pixels-retina/pom.xml +++ b/pixels-retina/pom.xml @@ -116,6 +116,12 @@ log4j-over-slf4j true + + + org.junit.vintage + junit-vintage-engine + test + diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index d9cbb972db..9ce2d5fc6f 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -30,6 +30,8 @@ import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; +import io.pixelsdb.pixels.common.transaction.TransContext; +import io.pixelsdb.pixels.common.transaction.TransContextCache; import io.pixelsdb.pixels.common.transaction.TransService; import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.core.PixelsProto; @@ -45,10 +47,7 @@ import java.nio.ByteOrder; import java.nio.file.Paths; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -67,7 +66,9 @@ public class RetinaResourceManager private final ScheduledExecutorService gcExecutor; // Checkpoint related fields + private final ExecutorService checkpointExecutor; private final Map offloadedCheckpoints; + private final Map> offloadCache; private final String checkpointDir; private long latestGcTimestamp = -1; private final int totalVirtualNodeNum; @@ -99,10 +100,17 @@ private RetinaResourceManager() this.rgVisibilityMap = new ConcurrentHashMap<>(); this.pixelsWriteBufferMap = new ConcurrentHashMap<>(); this.offloadedCheckpoints = new ConcurrentHashMap<>(); + this.offloadCache = new ConcurrentHashMap<>(); this.checkpointRefCounts = new ConcurrentHashMap<>(); this.checkpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir"); this.recoveryCache = new ConcurrentHashMap<>(); + this.checkpointExecutor = Executors.newFixedThreadPool(4, r -> { + Thread t = new Thread(r, "retina-checkpoint-thread"); + t.setDaemon(true); + return t; + }); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "retina-gc-thread"); t.setDaemon(true); @@ -121,9 +129,19 @@ private RetinaResourceManager() TimeUnit.SECONDS ); } + + // Start offload monitor + String offloadIntervalStr = config.getProperty("retina.offload.monitor.interval"); + long offloadInterval = (offloadIntervalStr != null) ? Long.parseLong(offloadIntervalStr) : 5; + executor.scheduleAtFixedRate( + this::monitorAndOffloadTransactions, + offloadInterval, + offloadInterval, + TimeUnit.SECONDS + ); } catch (Exception e) { - logger.error("Failed to start retina background gc", e); + logger.error("Failed to start retina background gc or monitor", e); } this.gcExecutor = executor; totalVirtualNodeNum = Integer.parseInt(ConfigFactory.Instance().getProperty("node.virtual.num")); @@ -151,6 +169,55 @@ public static RetinaResourceManager Instance() return InstanceHolder.instance; } + private void monitorAndOffloadTransactions() + { + try + { + String thresholdStr = ConfigFactory.Instance().getProperty("retina.offload.threshold"); + long thresholdMs = (thresholdStr != null) ? Long.parseLong(thresholdStr) : 60000; + long currentTime = System.currentTimeMillis(); + + Map activeTransactions = getActiveTransactions(); + for (TransContext context : activeTransactions.values()) + { + if (context.isReadOnly() && !context.isOffloaded()) + { + if (currentTime - context.getStartTime() > thresholdMs) + { + try + { + logger.info("Transaction {} exceeds threshold, offloading...", context.getTransId()); + registerOffload(context.getTimestamp()); + TransService.Instance().markTransOffloaded(context.getTransId()); + context.setOffloaded(true); + } catch (Exception e) + { + logger.error("Failed to offload transaction {}", context.getTransId(), e); + } + } + } + } + } catch (Exception e) + { + logger.error("Error in offload monitor", e); + } + } + + @SuppressWarnings("unchecked") + private Map getActiveTransactions() + { + try + { + java.lang.reflect.Field field = TransContextCache.class.getDeclaredField("transIdToContext"); + field.setAccessible(true); + return (Map) field.get(TransContextCache.Instance()); + } catch (Exception e) + { + logger.error("Failed to get active transactions via reflection", e); + return Collections.emptyMap(); + } + } + public void recoverCheckpoints() { try @@ -309,14 +376,14 @@ public void addVisibility(String filePath) throws RetinaException public long[] queryVisibility(long fileId, int rgId, long timestamp, long transId) throws RetinaException { - // [Routing Logic] Only read from disk if the transaction is explicitly registered as Offload + // [Routing Logic] Only read from disk/cache if the transaction is explicitly registered as Offload if (transId != -1) { if (offloadedCheckpoints.containsKey(timestamp)) { - return loadBitmapFromDisk(timestamp, fileId, rgId); + return queryFromOffloadCache(timestamp, fileId, rgId); } - throw new RetinaException("Offloaded checkpoint missing for TransID" + transId); + throw new RetinaException("Offloaded checkpoint missing for TransID " + transId); } // otherwise read from memory RGVisibility rgVisibility = checkRGVisibility(fileId, rgId); @@ -328,6 +395,21 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp, long transI return visibilityBitmap; } + private long[] queryFromOffloadCache(long timestamp, long fileId, int rgId) throws RetinaException + { + Map cache = offloadCache.get(timestamp); + if (cache != null) + { + long[] bitmap = cache.get(fileId + "_" + rgId); + if (bitmap != null) + { + return bitmap; + } + } + // Cache miss, load from disk + return loadBitmapFromDisk(timestamp, fileId, rgId); + } + public long[] queryVisibility(long fileId, int rgId, long timestamp) throws RetinaException { return queryVisibility(fileId, rgId, timestamp, -1); @@ -363,12 +445,18 @@ public void registerOffload(long timestamp) throws RetinaException { if (!offloadedCheckpoints.containsKey(timestamp)) { - createCheckpoint(timestamp, CheckpointType.OFFLOAD); + CompletableFuture future = createCheckpoint(timestamp, CheckpointType.OFFLOAD); + // For performance testing or specific needs, we might want to wait. + // But for general offload, we let it run asynchronously. + // To maintain current test's expectations, we add a way to wait if needed, + // or just wait here if it's the first time. + future.join(); } } catch (Exception e) { refCount.decrementAndGet(); - throw e; + if (e instanceof RuntimeException) throw (RuntimeException) e; + throw new RetinaException("Failed to register offload", e); } } } @@ -388,6 +476,7 @@ public void unregisterOffload(long timestamp) if (remaining <= 0) { offloadedCheckpoints.remove(timestamp); + offloadCache.remove(timestamp); if (refCount.get() > 0) { logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp); @@ -401,62 +490,70 @@ public void unregisterOffload(long timestamp) } } - private void createCheckpoint(long timestamp, CheckpointType type) throws RetinaException + private CompletableFuture createCheckpoint(long timestamp, CheckpointType type) throws RetinaException { String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; String fileName = prefix + timestamp + ".bin"; String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; - try + // 1. Snapshot: capture bitmaps in memory synchronously to ensure consistency + Map snapshot = new HashMap<>(); + for (Map.Entry entry : this.rgVisibilityMap.entrySet()) { - Storage storage = StorageFactory.Instance().getStorage(filePath); - // Write directly to the final file as atomic move is not supported by all storages (e.g. S3). - // Object stores typically guarantee atomicity of the put operation. - try (DataOutputStream out = storage.create(filePath, true, 4096)) + snapshot.put(entry.getKey(), entry.getValue().getVisibilityBitmap(timestamp)); + } + + // 2. Async Write: perform IO in background thread + return CompletableFuture.runAsync(() -> { + try { - int rgCount = this.rgVisibilityMap.size(); - out.writeInt(rgCount); - for (Map.Entry entry : this.rgVisibilityMap.entrySet()) + Storage storage = StorageFactory.Instance().getStorage(filePath); + try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024)) { - String[] parts = entry.getKey().split("_"); - long fileId = Long.parseLong(parts[0]); - int rgId = Integer.parseInt(parts[1]); - long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp); - - out.writeLong(fileId); - out.writeInt(rgId); - out.writeInt(bitmap.length); - for (long l : bitmap) + out.writeInt(snapshot.size()); + for (Map.Entry entry : snapshot.entrySet()) { - out.writeLong(l); + String[] parts = entry.getKey().split("_"); + long fileId = Long.parseLong(parts[0]); + int rgId = Integer.parseInt(parts[1]); + long[] bitmap = entry.getValue(); + + out.writeLong(fileId); + out.writeInt(rgId); + out.writeInt(bitmap.length); + for (long l : bitmap) + { + out.writeLong(l); + } } + out.flush(); } - out.flush(); - } - if (type == CheckpointType.OFFLOAD) - { - offloadedCheckpoints.put(timestamp, filePath); - } else - { - long oldGcTs = this.latestGcTimestamp; - this.latestGcTimestamp = timestamp; - if (oldGcTs != -1 && oldGcTs != timestamp) + if (type == CheckpointType.OFFLOAD) + { + offloadedCheckpoints.put(timestamp, filePath); + } else { - removeCheckpointFile(oldGcTs, CheckpointType.GC); + long oldGcTs = this.latestGcTimestamp; + this.latestGcTimestamp = timestamp; + if (oldGcTs != -1 && oldGcTs != timestamp) + { + removeCheckpointFile(oldGcTs, CheckpointType.GC); + } } - } - } catch (IOException e) - { - // Try to cleanup the potentially corrupted or partial file - try - { - StorageFactory.Instance().getStorage(filePath).delete(filePath, false); - } catch (IOException ignored) + } catch (IOException e) { + logger.error("Failed to commit {} checkpoint file for timestamp: {}", type, timestamp, e); + // Try to cleanup the potentially corrupted or partial file + try + { + StorageFactory.Instance().getStorage(filePath).delete(filePath, false); + } catch (IOException ignored) + { + } + throw new CompletionException(e); } - throw new RetinaException("Failed to commit checkpoint file", e); - } + }, checkpointExecutor); } private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetRgId) throws RetinaException @@ -467,45 +564,50 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR throw new RetinaException("Checkpoint missing: " + timestamp); } - try + // Use a lock to ensure only one thread parses the file for this timestamp + Map cache = offloadCache.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()); + + synchronized (cache) { - Storage storage = StorageFactory.Instance().getStorage(path); - if (!storage.exists(path)) + // Double check if target already in cache after acquiring lock + long[] cached = cache.get(targetFileId + "_" + targetRgId); + if (cached != null) { - throw new RetinaException("Checkpoint file missing: " + path); + return cached; } - try (DataInputStream in = storage.open(path)) + // Still not in cache, perform full load + try { - int rgCount = in.readInt(); - for (int i = 0; i < rgCount; i++) + Storage storage = StorageFactory.Instance().getStorage(path); + if (!storage.exists(path)) { - long fileId = in.readLong(); - int rgId = in.readInt(); - int len = in.readInt(); - if (fileId == targetFileId && rgId == targetRgId) + throw new RetinaException("Checkpoint file missing: " + path); + } + + try (DataInputStream in = storage.open(path)) + { + int rgCount = in.readInt(); + for (int i = 0; i < rgCount; i++) { + long fileId = in.readLong(); + int rgId = in.readInt(); + int len = in.readInt(); long[] bitmap = new long[len]; for (int j = 0; j < len; j++) { bitmap[j] = in.readLong(); } - return bitmap; - } else - { - int skipped = in.skipBytes(len * 8); - if (skipped != len * 8) - { - throw new IOException("Unexpected EOF"); - } + cache.put(fileId + "_" + rgId, bitmap); } } + } catch (IOException e) + { + throw new RetinaException("Failed to read checkpoint file", e); } - } catch (IOException e) - { - throw new RetinaException("Failed to read checkpoint file", e); } - return new long[0]; + + return cache.getOrDefault(targetFileId + "_" + targetRgId, new long[0]); } private void removeCheckpointFile(long timestamp, CheckpointType type) diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index c41ab36ae9..eec2847006 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -31,13 +31,15 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * Test checkpoint creation and recovery logic. */ @@ -46,7 +48,7 @@ public class TestRetinaCheckpoint private RetinaResourceManager retinaManager; private String testCheckpointDir; private Storage storage; - private final long fileId = 1L; + private final long fileId = 999999L; private final int rgId = 0; private final int numRows = 1024; @@ -74,7 +76,7 @@ public void setUp() throws IOException, RetinaException } retinaManager = RetinaResourceManager.Instance(); - retinaManager.addVisibility(fileId, rgId, numRows); + resetSingletonState(); } private String resolve(String dir, String filename) { @@ -84,7 +86,7 @@ private String resolve(String dir, String filename) { @Test public void testRegisterOffload() throws RetinaException, IOException { - long transId = 12345L; + retinaManager.addVisibility(fileId, rgId, numRows); long timestamp = 100L; // Register offload @@ -104,9 +106,8 @@ public void testRegisterOffload() throws RetinaException, IOException @Test public void testMultipleOffloads() throws RetinaException, IOException { - long transId1 = 12345L; + retinaManager.addVisibility(fileId, rgId, numRows); long timestamp1 = 100L; - long transId2 = 12346L; long timestamp1_dup = 100L; // same timestamp // Both register the same timestamp - should share checkpoint @@ -128,8 +129,8 @@ public void testMultipleOffloads() throws RetinaException, IOException @Test public void testCheckpointRecovery() throws RetinaException, IOException { + retinaManager.addVisibility(fileId, rgId, numRows); long timestamp = 100L; - long transId = 999L; // 1. Delete row 10 int rowToDelete = 10; @@ -178,6 +179,7 @@ public void testCheckpointRecovery() throws RetinaException, IOException @Test public void testDiskBitmapQuery() throws RetinaException { + retinaManager.addVisibility(fileId, rgId, numRows); long baseTimestamp = 200L; long transId = 888L; @@ -210,8 +212,9 @@ public void testDiskBitmapQuery() throws RetinaException @Test public void testConcurrency() throws InterruptedException, RetinaException { - int numThreads = 100; - int operationsPerThread = 500; + retinaManager.addVisibility(fileId, rgId, numRows); + int numThreads = 20; + int operationsPerThread = 50; ExecutorService executor = Executors.newFixedThreadPool(numThreads); CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch doneLatch = new CountDownLatch(numThreads); @@ -237,11 +240,6 @@ public void testConcurrency() throws InterruptedException, RetinaException { // Register Offload retinaManager.registerOffload(timestamp); - // Verify file exists - String p = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); - if (!storage.exists(p)) { - throw new RuntimeException("Checkpoint file missing after register: " + p); - } } else if (j % 3 == 1) { @@ -269,7 +267,7 @@ else if (j % 3 == 1) } startLatch.countDown(); // Start all threads - boolean finished = doneLatch.await(30, TimeUnit.SECONDS); + boolean finished = doneLatch.await(60, TimeUnit.SECONDS); executor.shutdownNow(); assertTrue("Timeout waiting for concurrency test", finished); @@ -295,6 +293,10 @@ private void resetSingletonState() offloadedField.setAccessible(true); ((Map) offloadedField.get(retinaManager)).clear(); + Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); + offloadCacheField.setAccessible(true); + ((Map) offloadCacheField.get(retinaManager)).clear(); + Field refCountsField = RetinaResourceManager.class.getDeclaredField("checkpointRefCounts"); refCountsField.setAccessible(true); ((Map) refCountsField.get(retinaManager)).clear(); @@ -313,20 +315,6 @@ private void resetSingletonState() } } - private boolean assertTrue(String message, boolean condition) - { - if (!condition) - { - throw new AssertionError(message); - } - return condition; - } - - private boolean assertFalse(String message, boolean condition) - { - return assertTrue(message, !condition); - } - private boolean isBitSet(long[] bitmap, int rowIndex) { if (bitmap == null || bitmap.length == 0) return false; @@ -338,4 +326,95 @@ private boolean isBitSet(long[] bitmap, int rowIndex) return (bitmap[longIndex] & (1L << bitOffset)) != 0; } + + @Test + public void testCheckpointPerformance() throws RetinaException, InterruptedException + { + // 1. Configuration parameters + int numFiles = 50000; + int rgsPerFile = 1; + int rowsPerRG = 200000; // rows per Row Group + long totalRecords = (long) numFiles * rgsPerFile * rowsPerRG; + double targetDeleteRatio = 0.1; + int queryCount = 200; + + long timestamp = 1000L; + long transId = 2000L; + java.util.Random random = new java.util.Random(); + + System.out.println("--- Starting Checkpoint Performance Test ---"); + System.out.printf("Config: %d files, %d RGs/file, %d rows/RG, %d queries\n", + numFiles, rgsPerFile, rowsPerRG, queryCount); + System.out.printf("Target Delete Ratio: %.2f%%\n", targetDeleteRatio * 100); + + // 2. Initialize data and perform random deletes, accurately count actual deleted rows + long totalActualDeletedRows = 0; + for (int f = 0; f < numFiles; f++) + { + for (int r = 0; r < rgsPerFile; r++) + { + retinaManager.addVisibility(f, r, rowsPerRG); + int targetDeleteCount = (int) (rowsPerRG * targetDeleteRatio); + + java.util.Set deletedInRG = new java.util.HashSet<>(); + while (deletedInRG.size() < targetDeleteCount) + { + int rowId = random.nextInt(rowsPerRG); + if (deletedInRG.add(rowId)) + { + retinaManager.deleteRecord(f, r, rowId, timestamp); + } + } + totalActualDeletedRows += deletedInRG.size(); + } + } + + double actualDeleteRatio = (double) totalActualDeletedRows / totalRecords; + System.out.printf("Actual Total Deleted Rows: %d\n", totalActualDeletedRows); + System.out.printf("Actual Delete Ratio: %.4f%%\n", actualDeleteRatio * 100); + + // 3. Test Offload (Checkpoint Creation) performance + long startOffload = System.nanoTime(); + retinaManager.registerOffload(timestamp); + long endOffload = System.nanoTime(); + double offloadTimeMs = (endOffload - startOffload) / 1e6; + + System.out.printf("Total Offload Time (Writing to disk): %.2f ms\n", offloadTimeMs); + System.out.printf("Average Offload Time per RG: %.4f ms\n", offloadTimeMs / (numFiles * rgsPerFile)); + + // 4. Test Load (Checkpoint Load) performance + long firstLoadTimeNs = 0; + long subsequentLoadsTotalNs = 0; + + for (int i = 0; i < queryCount; i++) + { + int f = random.nextInt(numFiles); + int r = random.nextInt(rgsPerFile); + + long start = System.nanoTime(); + retinaManager.queryVisibility(f, r, timestamp, transId); + long end = System.nanoTime(); + + if (i == 0) + { + firstLoadTimeNs = (end - start); + } + else + { + subsequentLoadsTotalNs += (end - start); + } + } + + double totalLoadTimeMs = (firstLoadTimeNs + subsequentLoadsTotalNs) / 1e6; + double firstLoadTimeMs = firstLoadTimeNs / 1e6; + double avgSubsequentLoadTimeMs = (subsequentLoadsTotalNs / (queryCount - 1.0)) / 1e6; + + System.out.printf("Total Load Time (for %d queries): %.2f ms\n", queryCount, totalLoadTimeMs); + System.out.printf("First Load Time (Cold Start IO): %.2f ms\n", firstLoadTimeMs); + System.out.printf("Average Subsequent Load Time (Memory Hit): %.6f ms\n", avgSubsequentLoadTimeMs); + + // 5. Cleanup + retinaManager.unregisterOffload(timestamp); + System.out.println("--- Performance Test Finished ---\n"); + } } diff --git a/pom.xml b/pom.xml index 1ca34493c8..78011c6a56 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,7 @@ 1.8.2 4.5.1 5.8.2 + 5.8.2 @@ -927,6 +928,12 @@ ${dep.junit.jupiter.version} test + + org.junit.vintage + junit-vintage-engine + ${dep.junit.vintage.version} + test + org.mockito From 7114affaa93c3f1d59036f752d5c83fd5406176a Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 11:18:52 +0800 Subject: [PATCH 02/15] feat: cp read opt --- .../pixels/retina/RetinaResourceManager.java | 113 ++++++++++++++++-- .../pixels/retina/TestRetinaCheckpoint.java | 66 +++++++--- 2 files changed, 150 insertions(+), 29 deletions(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 9ce2d5fc6f..195110ffab 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -431,6 +431,7 @@ public void registerOffload(long timestamp) throws RetinaException { AtomicInteger refCount = checkpointRefCounts.computeIfAbsent(timestamp, k -> new AtomicInteger(0)); + CompletableFuture currentFuture = null; synchronized (refCount) { if (checkpointRefCounts.get(timestamp) != refCount) @@ -445,12 +446,7 @@ public void registerOffload(long timestamp) throws RetinaException { if (!offloadedCheckpoints.containsKey(timestamp)) { - CompletableFuture future = createCheckpoint(timestamp, CheckpointType.OFFLOAD); - // For performance testing or specific needs, we might want to wait. - // But for general offload, we let it run asynchronously. - // To maintain current test's expectations, we add a way to wait if needed, - // or just wait here if it's the first time. - future.join(); + currentFuture = createCheckpoint(timestamp, CheckpointType.OFFLOAD); } } catch (Exception e) { @@ -460,6 +456,43 @@ public void registerOffload(long timestamp) throws RetinaException } } } + + if (currentFuture != null) + { + try + { + currentFuture.join(); + } catch (Exception e) + { + // Checkpoint creation failed + synchronized (refCount) + { + refCount.decrementAndGet(); + } + throw new RetinaException("Failed to create checkpoint", e); + } + } + else + { + // Wait for the thread that's currently creating the checkpoint + while (!offloadedCheckpoints.containsKey(timestamp) && checkpointRefCounts.containsKey(timestamp)) + { + try + { + Thread.sleep(100); + } catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RetinaException("Interrupted while waiting for offload", e); + } + } + if (!offloadedCheckpoints.containsKey(timestamp)) + { + // Maybe the other thread failed or it was unregistered immediately + continue; + } + } + logger.info("Registered offload for Timestamp: {}", timestamp); return; } @@ -497,14 +530,18 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; // 1. Snapshot: capture bitmaps in memory synchronously to ensure consistency - Map snapshot = new HashMap<>(); + long startSnapshot = System.currentTimeMillis(); + Map snapshot = new HashMap<>(this.rgVisibilityMap.size()); for (Map.Entry entry : this.rgVisibilityMap.entrySet()) { snapshot.put(entry.getKey(), entry.getValue().getVisibilityBitmap(timestamp)); } + long endSnapshot = System.currentTimeMillis(); + logger.info("Snapshot for {} checkpoint took {} ms, size: {}", type, (endSnapshot - startSnapshot), snapshot.size()); // 2. Async Write: perform IO in background thread return CompletableFuture.runAsync(() -> { + long startWrite = System.currentTimeMillis(); try { Storage storage = StorageFactory.Instance().getStorage(filePath); @@ -528,6 +565,8 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType } out.flush(); } + long endWrite = System.currentTimeMillis(); + logger.info("Writing {} checkpoint file to {} took {} ms", type, filePath, (endWrite - startWrite)); if (type == CheckpointType.OFFLOAD) { @@ -565,10 +604,18 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR } // Use a lock to ensure only one thread parses the file for this timestamp - Map cache = offloadCache.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()); - + // For 50,000 RGs, the initial capacity of ConcurrentHashMap should be sufficient + Map cache = offloadCache.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>(50000)); + + long lockWaitStart = System.currentTimeMillis(); synchronized (cache) { + long lockAcquiredTime = System.currentTimeMillis(); + if (lockAcquiredTime - lockWaitStart > 0) + { + System.out.println("Wait for checkpoint lock took " + (lockAcquiredTime - lockWaitStart) + " ms"); + } + // Double check if target already in cache after acquiring lock long[] cached = cache.get(targetFileId + "_" + targetRgId); if (cached != null) @@ -577,19 +624,45 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR } // Still not in cache, perform full load + long startTime = System.currentTimeMillis(); try { + long s1 = System.currentTimeMillis(); Storage storage = StorageFactory.Instance().getStorage(path); - if (!storage.exists(path)) + long fileLength = storage.getStatus(path).getLength(); + long s2 = System.currentTimeMillis(); + System.out.println("Storage get and length check took " + (s2 - s1) + " ms, file size: " + (fileLength / 1024) + " KB"); + + // Use PhysicalReader to read the entire file into memory at once + // This is much faster than streaming from S3 for many small reads + byte[] content; + try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path)) { - throw new RetinaException("Checkpoint file missing: " + path); + ByteBuffer buffer = reader.readFully((int) fileLength); + if (buffer.hasArray()) + { + content = buffer.array(); + } + else + { + content = new byte[(int) fileLength]; + buffer.get(content); + } } + long s3 = System.currentTimeMillis(); + System.out.println("PhysicalReader readFully took " + (s3 - s2) + " ms"); - try (DataInputStream in = storage.open(path)) + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) { int rgCount = in.readInt(); + System.out.println("Loading checkpoint from memory buffer, RG count: " + rgCount); + + long totalReadNanos = 0; + long totalPutNanos = 0; + for (int i = 0; i < rgCount; i++) { + long t1 = System.nanoTime(); long fileId = in.readLong(); int rgId = in.readInt(); int len = in.readInt(); @@ -598,9 +671,25 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR { bitmap[j] = in.readLong(); } + long t2 = System.nanoTime(); + cache.put(fileId + "_" + rgId, bitmap); + long t3 = System.nanoTime(); + + totalReadNanos += (t2 - t1); + totalPutNanos += (t3 - t2); + + if ((i + 1) % 10000 == 0) + { + System.out.println("Processed " + (i + 1) + "/" + rgCount + " RGs... (cumulative memory parse: " + (totalReadNanos / 1_000_000) + " ms, put: " + (totalPutNanos / 1_000_000) + " ms)"); + System.out.flush(); + } } + long s4 = System.currentTimeMillis(); + System.out.println("Core loop finished. Total memory parse: " + (totalReadNanos / 1_000_000) + " ms, Total cache put: " + (totalPutNanos / 1_000_000) + " ms. Loop wall time: " + (s4 - s3) + " ms"); } + long endTime = System.currentTimeMillis(); + System.out.println("Loaded " + cache.size() + " RGs from checkpoint " + path + " in total " + (endTime - startTime) + " ms (wall time)"); } catch (IOException e) { throw new RetinaException("Failed to read checkpoint file", e); diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index eec2847006..696437b20c 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -340,7 +341,6 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep long timestamp = 1000L; long transId = 2000L; - java.util.Random random = new java.util.Random(); System.out.println("--- Starting Checkpoint Performance Test ---"); System.out.printf("Config: %d files, %d RGs/file, %d rows/RG, %d queries\n", @@ -348,29 +348,59 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep System.out.printf("Target Delete Ratio: %.2f%%\n", targetDeleteRatio * 100); // 2. Initialize data and perform random deletes, accurately count actual deleted rows - long totalActualDeletedRows = 0; + LongAdder totalActualDeletedRows = new LongAdder(); + + // Step A: Pre-add Visibility (Synchronous or Sequential to ensure correct state in memory) + for (int f = 0; f < numFiles; f++) { + for (int r = 0; r < rgsPerFile; r++) { + retinaManager.addVisibility(f, r, rowsPerRG); + } + } + + // Step B: Parallel deleteRecord (RG-level parallelism) + int numThreads = Runtime.getRuntime().availableProcessors(); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + CountDownLatch latch = new CountDownLatch(numFiles * rgsPerFile); for (int f = 0; f < numFiles; f++) { for (int r = 0; r < rgsPerFile; r++) { - retinaManager.addVisibility(f, r, rowsPerRG); - int targetDeleteCount = (int) (rowsPerRG * targetDeleteRatio); - - java.util.Set deletedInRG = new java.util.HashSet<>(); - while (deletedInRG.size() < targetDeleteCount) - { - int rowId = random.nextInt(rowsPerRG); - if (deletedInRG.add(rowId)) + final int fileId = f; + final int rgId = r; + executor.submit(() -> { + try + { + java.util.Random randomInThread = new java.util.Random(); + int targetDeleteCount = (int) (rowsPerRG * targetDeleteRatio); + + int actualDeletedCount = 0; + java.util.Set deletedInRG = new java.util.HashSet<>(); + while (deletedInRG.size() < targetDeleteCount) + { + int rowId = randomInThread.nextInt(rowsPerRG); + if (deletedInRG.add(rowId)) + { + retinaManager.deleteRecord(fileId, rgId, rowId, timestamp); + actualDeletedCount++; + } + } + totalActualDeletedRows.add(actualDeletedCount); + } catch (Exception e) { - retinaManager.deleteRecord(f, r, rowId, timestamp); + e.printStackTrace(); + } finally + { + latch.countDown(); } - } - totalActualDeletedRows += deletedInRG.size(); + }); } } + latch.await(30, TimeUnit.MINUTES); + executor.shutdown(); - double actualDeleteRatio = (double) totalActualDeletedRows / totalRecords; - System.out.printf("Actual Total Deleted Rows: %d\n", totalActualDeletedRows); + double actualDeleteRatio = (double) totalActualDeletedRows.sum() / totalRecords; + System.out.printf("Actual Total Deleted Rows: %d\n", totalActualDeletedRows.sum()); System.out.printf("Actual Delete Ratio: %.4f%%\n", actualDeleteRatio * 100); // 3. Test Offload (Checkpoint Creation) performance @@ -385,11 +415,13 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep // 4. Test Load (Checkpoint Load) performance long firstLoadTimeNs = 0; long subsequentLoadsTotalNs = 0; + java.util.Random randomForQuery = new java.util.Random(); for (int i = 0; i < queryCount; i++) { - int f = random.nextInt(numFiles); - int r = random.nextInt(rgsPerFile); + System.out.println("query id: " + i); + int f = randomForQuery.nextInt(numFiles); + int r = randomForQuery.nextInt(rgsPerFile); long start = System.nanoTime(); retinaManager.queryVisibility(f, r, timestamp, transId); From 48ccb5219ca6d0ec90741922f6b42f4608574da0 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 12:58:08 +0800 Subject: [PATCH 03/15] feat: update cp test stat --- .../pixels/retina/TestRetinaCheckpoint.java | 127 +++++++++++++++--- pom.xml | 2 +- 2 files changed, 107 insertions(+), 22 deletions(-) diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index 696437b20c..c85591e617 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -328,36 +328,47 @@ private boolean isBitSet(long[] bitmap, int rowIndex) return (bitmap[longIndex] & (1L << bitOffset)) != 0; } + /** + * Test the performance and memory overhead of checkpoint offload and load. + *

+ * Run Command: + * LD_PRELOAD=/lib/x86_64-linux-gnu/libjemalloc.so.2 mvn test \ + * -Dtest=TestRetinaCheckpoint#testCheckpointPerformance \ + * -pl pixels-retina \ + * -DargLine="-Xms40g -Xmx40g" + */ @Test - public void testCheckpointPerformance() throws RetinaException, InterruptedException + public void testCheckpointPerformance() throws RetinaException, InterruptedException, IOException { // 1. Configuration parameters - int numFiles = 50000; + int numFiles = 500; int rgsPerFile = 1; int rowsPerRG = 200000; // rows per Row Group long totalRecords = (long) numFiles * rgsPerFile * rowsPerRG; double targetDeleteRatio = 0.1; int queryCount = 200; - + long timestamp = 1000L; long transId = 2000L; + System.out.println("\n============================================================"); System.out.println("--- Starting Checkpoint Performance Test ---"); - System.out.printf("Config: %d files, %d RGs/file, %d rows/RG, %d queries\n", + System.out.printf("Config: %d files, %d RGs/file, %d rows/RG, %d queries\n", numFiles, rgsPerFile, rowsPerRG, queryCount); System.out.printf("Target Delete Ratio: %.2f%%\n", targetDeleteRatio * 100); + System.out.println("============================================================\n"); - // 2. Initialize data and perform random deletes, accurately count actual deleted rows + // 2. Initialize data and perform random deletes LongAdder totalActualDeletedRows = new LongAdder(); - - // Step A: Pre-add Visibility (Synchronous or Sequential to ensure correct state in memory) + + // Step A: Pre-add Visibility for (int f = 0; f < numFiles; f++) { for (int r = 0; r < rgsPerFile; r++) { retinaManager.addVisibility(f, r, rowsPerRG); } } - // Step B: Parallel deleteRecord (RG-level parallelism) + // Step B: Parallel deleteRecord int numThreads = Runtime.getRuntime().availableProcessors(); ExecutorService executor = Executors.newFixedThreadPool(numThreads); @@ -400,53 +411,127 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep executor.shutdown(); double actualDeleteRatio = (double) totalActualDeletedRows.sum() / totalRecords; - System.out.printf("Actual Total Deleted Rows: %d\n", totalActualDeletedRows.sum()); - System.out.printf("Actual Delete Ratio: %.4f%%\n", actualDeleteRatio * 100); + System.out.printf("[Data Gen] Total Deleted Rows: %d (Actual Ratio: %.4f%%)\n", + totalActualDeletedRows.sum(), actualDeleteRatio * 100); // 3. Test Offload (Checkpoint Creation) performance + System.out.println("\n[Phase 1] Testing Checkpoint Offload (Write)..."); long startOffload = System.nanoTime(); retinaManager.registerOffload(timestamp); long endOffload = System.nanoTime(); + + // [Accuracy] Calculate offload peak memory overhead AFTER timing to avoid interference. + // We simulate the snapshot logic to get the exact physical size of long arrays being offloaded. + long offloadPeakBytes = calculateOffloadPeakMemory(timestamp); + double offloadTimeMs = (endOffload - startOffload) / 1e6; + String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); + long fileSize = storage.getStatus(offloadPath).getLength(); + double writeThroughputMBs = (fileSize / (1024.0 * 1024.0)) / (offloadTimeMs / 1000.0); - System.out.printf("Total Offload Time (Writing to disk): %.2f ms\n", offloadTimeMs); - System.out.printf("Average Offload Time per RG: %.4f ms\n", offloadTimeMs / (numFiles * rgsPerFile)); + System.out.println("------------------------------------------------------------"); + System.out.printf("Total Offload Time: %.2f ms\n", offloadTimeMs); + System.out.printf("Checkpoint File Size: %.2f MB\n", fileSize / (1024.0 * 1024.0)); + System.out.printf("Offload Peak Mem Overhead: %.2f MB\n", offloadPeakBytes / (1024.0 * 1024.0)); + System.out.printf("Write Throughput: %.2f MB/s\n", writeThroughputMBs); + System.out.printf("Avg Offload Time per RG: %.4f ms\n", offloadTimeMs / (numFiles * rgsPerFile)); + System.out.println("------------------------------------------------------------"); // 4. Test Load (Checkpoint Load) performance + System.out.println("\n[Phase 2] Testing Checkpoint Load (Read)..."); long firstLoadTimeNs = 0; long subsequentLoadsTotalNs = 0; java.util.Random randomForQuery = new java.util.Random(); for (int i = 0; i < queryCount; i++) { - System.out.println("query id: " + i); int f = randomForQuery.nextInt(numFiles); int r = randomForQuery.nextInt(rgsPerFile); - + long start = System.nanoTime(); retinaManager.queryVisibility(f, r, timestamp, transId); long end = System.nanoTime(); - + if (i == 0) { firstLoadTimeNs = (end - start); + System.out.printf("Cold Start Query (Triggered full file load): %.2f ms\n", firstLoadTimeNs / 1e6); } else { subsequentLoadsTotalNs += (end - start); } } - - double totalLoadTimeMs = (firstLoadTimeNs + subsequentLoadsTotalNs) / 1e6; + + // [Accuracy] Calculate load memory overhead AFTER timing via reflection on offloadCache. + long loadCacheBytes = calculateLoadCacheMemory(timestamp); + double firstLoadTimeMs = firstLoadTimeNs / 1e6; double avgSubsequentLoadTimeMs = (subsequentLoadsTotalNs / (queryCount - 1.0)) / 1e6; + double readThroughputMBs = (fileSize / (1024.0 * 1024.0)) / (firstLoadTimeMs / 1000.0); - System.out.printf("Total Load Time (for %d queries): %.2f ms\n", queryCount, totalLoadTimeMs); - System.out.printf("First Load Time (Cold Start IO): %.2f ms\n", firstLoadTimeMs); - System.out.printf("Average Subsequent Load Time (Memory Hit): %.6f ms\n", avgSubsequentLoadTimeMs); + System.out.println("------------------------------------------------------------"); + System.out.printf("First Load Time (Cold): %.2f ms\n", firstLoadTimeMs); + System.out.printf("Load Memory Overhead: %.2f MB\n", loadCacheBytes / (1024.0 * 1024.0)); + System.out.printf("Read/Parse Throughput: %.2f MB/s\n", readThroughputMBs); + System.out.printf("Avg Memory Hit Latency: %.6f ms\n", avgSubsequentLoadTimeMs); + System.out.printf("Total Time for %d queries: %.2f ms\n", queryCount, (firstLoadTimeNs + subsequentLoadsTotalNs) / 1e6); + System.out.println("------------------------------------------------------------"); // 5. Cleanup retinaManager.unregisterOffload(timestamp); - System.out.println("--- Performance Test Finished ---\n"); + System.out.println("\n--- Checkpoint Performance Test Finished ---\n"); + } + + /** + * Accurately calculate the memory size of long arrays that would be captured in a snapshot. + * This represents the peak heap usage during the offload process. + */ + private long calculateOffloadPeakMemory(long timestamp) + { + try { + Field rgMapField = RetinaResourceManager.class.getDeclaredField("rgVisibilityMap"); + rgMapField.setAccessible(true); + @SuppressWarnings("unchecked") + Map rgMap = (Map) rgMapField.get(retinaManager); + + long totalBytes = 0; + for (RGVisibility visibility : rgMap.values()) { + long[] bitmap = visibility.getVisibilityBitmap(timestamp); + if (bitmap != null) { + totalBytes += (long) bitmap.length * 8; + } + } + return totalBytes; + } catch (Exception e) { + e.printStackTrace(); + return 0; + } + } + + /** + * Accurately calculate the memory size of long arrays currently stored in offloadCache. + * This represents the persistent heap usage after loading a checkpoint. + */ + private long calculateLoadCacheMemory(long timestamp) + { + try { + Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); + offloadCacheField.setAccessible(true); + @SuppressWarnings("unchecked") + Map> offloadCache = (Map>) offloadCacheField.get(retinaManager); + + Map cacheForTs = offloadCache.get(timestamp); + if (cacheForTs == null) return 0; + + long totalBytes = 0; + for (long[] bitmap : cacheForTs.values()) { + totalBytes += (long) bitmap.length * 8; + } + return totalBytes; + } catch (Exception e) { + e.printStackTrace(); + return 0; + } } } diff --git a/pom.xml b/pom.xml index 78011c6a56..91d6a33506 100644 --- a/pom.xml +++ b/pom.xml @@ -1029,7 +1029,7 @@ maven-surefire-plugin ${maven.plugin.surefire.version} - true + false From 160b4545fecdf43ded170b7a1051a9da515c788f Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 16:50:23 +0800 Subject: [PATCH 04/15] feat: update cp cache --- .../src/main/resources/pixels.properties | 2 + .../pixels/retina/RetinaResourceManager.java | 134 ++++++++++-------- .../pixels/retina/TestRetinaCheckpoint.java | 49 ++++++- 3 files changed, 121 insertions(+), 64 deletions(-) diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index 3d1b07e442..e926b751e6 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -297,6 +297,8 @@ retina.metrics.log.interval=-1 retina.upsert-mode.enabled=false # offloading threshold for long query in seconds pixels.transaction.offload.threshold=1800 +# lease duration for retina offload cache in seconds, default 600s +retina.offload.cache.lease.duration=600 # snapshot storage directory pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 195110ffab..705501fd7e 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -36,6 +36,8 @@ import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.core.PixelsProto; import io.pixelsdb.pixels.core.TypeDescription; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import io.pixelsdb.pixels.index.IndexProto; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,7 +70,7 @@ public class RetinaResourceManager // Checkpoint related fields private final ExecutorService checkpointExecutor; private final Map offloadedCheckpoints; - private final Map> offloadCache; + private final Cache> offloadCache; private final String checkpointDir; private long latestGcTimestamp = -1; private final int totalVirtualNodeNum; @@ -88,6 +90,20 @@ private static class RecoveredState } private final Map recoveryCache; + private static class CheckpointEntry + { + final long fileId; + final int rgId; + final long[] bitmap; + + CheckpointEntry(long fileId, int rgId, long[] bitmap) + { + this.fileId = fileId; + this.rgId = rgId; + this.bitmap = bitmap; + } + } + private enum CheckpointType { GC, @@ -100,9 +116,20 @@ private RetinaResourceManager() this.rgVisibilityMap = new ConcurrentHashMap<>(); this.pixelsWriteBufferMap = new ConcurrentHashMap<>(); this.offloadedCheckpoints = new ConcurrentHashMap<>(); - this.offloadCache = new ConcurrentHashMap<>(); + + ConfigFactory config = ConfigFactory.Instance(); + String leaseDurationStr = config.getProperty("retina.offload.cache.lease.duration"); + long leaseDuration = (leaseDurationStr != null) ? Long.parseLong(leaseDurationStr) : 600; + + this.offloadCache = Caffeine.newBuilder() + .expireAfterAccess(leaseDuration, TimeUnit.SECONDS) + .removalListener((key, value, cause) -> { + logger.info("Retina offload cache for timestamp {} evicted due to {}", key, cause); + }) + .build(); + this.checkpointRefCounts = new ConcurrentHashMap<>(); - this.checkpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir"); + this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir"); this.recoveryCache = new ConcurrentHashMap<>(); this.checkpointExecutor = Executors.newFixedThreadPool(4, r -> { @@ -118,7 +145,6 @@ private RetinaResourceManager() }); try { - ConfigFactory config = ConfigFactory.Instance(); long interval = Long.parseLong(config.getProperty("retina.gc.interval")); if (interval > 0) { @@ -397,7 +423,7 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp, long transI private long[] queryFromOffloadCache(long timestamp, long fileId, int rgId) throws RetinaException { - Map cache = offloadCache.get(timestamp); + Map cache = offloadCache.getIfPresent(timestamp); if (cache != null) { long[] bitmap = cache.get(fileId + "_" + rgId); @@ -509,7 +535,7 @@ public void unregisterOffload(long timestamp) if (remaining <= 0) { offloadedCheckpoints.remove(timestamp); - offloadCache.remove(timestamp); + offloadCache.invalidate(timestamp); if (refCount.get() > 0) { logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp); @@ -529,17 +555,35 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType String fileName = prefix + timestamp + ".bin"; String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; - // 1. Snapshot: capture bitmaps in memory synchronously to ensure consistency - long startSnapshot = System.currentTimeMillis(); - Map snapshot = new HashMap<>(this.rgVisibilityMap.size()); - for (Map.Entry entry : this.rgVisibilityMap.entrySet()) + // 1. Capture current entries to ensure we process a consistent set of RGs + List> entries = new ArrayList<>(this.rgVisibilityMap.entrySet()); + int totalRgs = entries.size(); + logger.info("Starting {} checkpoint for {} RGs at timestamp {}", type, totalRgs, timestamp); + + // 2. Use a BlockingQueue for producer-consumer pattern + // Limit capacity to avoid excessive memory usage if writing is slow + BlockingQueue queue = new LinkedBlockingQueue<>(1024); + + // 3. Start producer tasks to fetch bitmaps in parallel + for (Map.Entry entry : entries) { - snapshot.put(entry.getKey(), entry.getValue().getVisibilityBitmap(timestamp)); + checkpointExecutor.submit(() -> { + try + { + String key = entry.getKey(); + String[] parts = key.split("_"); + long fileId = Long.parseLong(parts[0]); + int rgId = Integer.parseInt(parts[1]); + long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp); + queue.put(new CheckpointEntry(fileId, rgId, bitmap)); + } catch (Exception e) + { + logger.error("Failed to fetch visibility bitmap for checkpoint", e); + } + }); } - long endSnapshot = System.currentTimeMillis(); - logger.info("Snapshot for {} checkpoint took {} ms, size: {}", type, (endSnapshot - startSnapshot), snapshot.size()); - // 2. Async Write: perform IO in background thread + // 4. Async Write: perform IO in background thread (Consumer) return CompletableFuture.runAsync(() -> { long startWrite = System.currentTimeMillis(); try @@ -547,18 +591,14 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType Storage storage = StorageFactory.Instance().getStorage(filePath); try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024)) { - out.writeInt(snapshot.size()); - for (Map.Entry entry : snapshot.entrySet()) + out.writeInt(totalRgs); + for (int i = 0; i < totalRgs; i++) { - String[] parts = entry.getKey().split("_"); - long fileId = Long.parseLong(parts[0]); - int rgId = Integer.parseInt(parts[1]); - long[] bitmap = entry.getValue(); - - out.writeLong(fileId); - out.writeInt(rgId); - out.writeInt(bitmap.length); - for (long l : bitmap) + CheckpointEntry entry = queue.take(); + out.writeLong(entry.fileId); + out.writeInt(entry.rgId); + out.writeInt(entry.bitmap.length); + for (long l : entry.bitmap) { out.writeLong(l); } @@ -580,7 +620,7 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType removeCheckpointFile(oldGcTs, CheckpointType.GC); } } - } catch (IOException e) + } catch (Exception e) { logger.error("Failed to commit {} checkpoint file for timestamp: {}", type, timestamp, e); // Try to cleanup the potentially corrupted or partial file @@ -605,17 +645,19 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR // Use a lock to ensure only one thread parses the file for this timestamp // For 50,000 RGs, the initial capacity of ConcurrentHashMap should be sufficient - Map cache = offloadCache.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>(50000)); - - long lockWaitStart = System.currentTimeMillis(); - synchronized (cache) + Map cache; + synchronized (offloadCache) { - long lockAcquiredTime = System.currentTimeMillis(); - if (lockAcquiredTime - lockWaitStart > 0) + cache = offloadCache.getIfPresent(timestamp); + if (cache == null) { - System.out.println("Wait for checkpoint lock took " + (lockAcquiredTime - lockWaitStart) + " ms"); + cache = new ConcurrentHashMap<>(50000); + offloadCache.put(timestamp, cache); } + } + synchronized (cache) + { // Double check if target already in cache after acquiring lock long[] cached = cache.get(targetFileId + "_" + targetRgId); if (cached != null) @@ -624,14 +666,10 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR } // Still not in cache, perform full load - long startTime = System.currentTimeMillis(); try { - long s1 = System.currentTimeMillis(); Storage storage = StorageFactory.Instance().getStorage(path); long fileLength = storage.getStatus(path).getLength(); - long s2 = System.currentTimeMillis(); - System.out.println("Storage get and length check took " + (s2 - s1) + " ms, file size: " + (fileLength / 1024) + " KB"); // Use PhysicalReader to read the entire file into memory at once // This is much faster than streaming from S3 for many small reads @@ -649,20 +687,13 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR buffer.get(content); } } - long s3 = System.currentTimeMillis(); - System.out.println("PhysicalReader readFully took " + (s3 - s2) + " ms"); try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) { int rgCount = in.readInt(); - System.out.println("Loading checkpoint from memory buffer, RG count: " + rgCount); - - long totalReadNanos = 0; - long totalPutNanos = 0; for (int i = 0; i < rgCount; i++) { - long t1 = System.nanoTime(); long fileId = in.readLong(); int rgId = in.readInt(); int len = in.readInt(); @@ -671,25 +702,10 @@ private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetR { bitmap[j] = in.readLong(); } - long t2 = System.nanoTime(); cache.put(fileId + "_" + rgId, bitmap); - long t3 = System.nanoTime(); - - totalReadNanos += (t2 - t1); - totalPutNanos += (t3 - t2); - - if ((i + 1) % 10000 == 0) - { - System.out.println("Processed " + (i + 1) + "/" + rgCount + " RGs... (cumulative memory parse: " + (totalReadNanos / 1_000_000) + " ms, put: " + (totalPutNanos / 1_000_000) + " ms)"); - System.out.flush(); - } } - long s4 = System.currentTimeMillis(); - System.out.println("Core loop finished. Total memory parse: " + (totalReadNanos / 1_000_000) + " ms, Total cache put: " + (totalPutNanos / 1_000_000) + " ms. Loop wall time: " + (s4 - s3) + " ms"); } - long endTime = System.currentTimeMillis(); - System.out.println("Loaded " + cache.size() + " RGs from checkpoint " + path + " in total " + (endTime - startTime) + " ms (wall time)"); } catch (IOException e) { throw new RetinaException("Failed to read checkpoint file", e); diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index c85591e617..9608dcc8ef 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -87,31 +87,38 @@ private String resolve(String dir, String filename) { @Test public void testRegisterOffload() throws RetinaException, IOException { + System.out.println("\n[Test] Starting testRegisterOffload..."); retinaManager.addVisibility(fileId, rgId, numRows); long timestamp = 100L; // Register offload + System.out.println("Registering offload for timestamp: " + timestamp); retinaManager.registerOffload(timestamp); // Verify checkpoint file exists String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin"); assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); + System.out.println("Verified: Checkpoint file exists at " + expectedFile); // Unregister + System.out.println("Unregistering offload..."); retinaManager.unregisterOffload(timestamp); // File should be removed assertFalse("Offload checkpoint file should be removed", storage.exists(expectedFile)); + System.out.println("Verified: Checkpoint file removed. testRegisterOffload passed."); } @Test public void testMultipleOffloads() throws RetinaException, IOException { + System.out.println("\n[Test] Starting testMultipleOffloads..."); retinaManager.addVisibility(fileId, rgId, numRows); long timestamp1 = 100L; long timestamp1_dup = 100L; // same timestamp // Both register the same timestamp - should share checkpoint + System.out.println("Registering same timestamp twice..."); retinaManager.registerOffload(timestamp1); retinaManager.registerOffload(timestamp1_dup); @@ -119,22 +126,28 @@ public void testMultipleOffloads() throws RetinaException, IOException assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); // Unregister one - should not remove yet (ref count >1) + System.out.println("Unregistering once (ref count should still be > 0)..."); retinaManager.unregisterOffload(timestamp1); assertTrue("Offload checkpoint should still exist (ref count >1)", storage.exists(expectedFile)); + System.out.println("Verified: Checkpoint still exists after one unregister."); // Unregister second + System.out.println("Unregistering second time..."); retinaManager.unregisterOffload(timestamp1); assertFalse("Offload checkpoint should be removed", storage.exists(expectedFile)); + System.out.println("Verified: Checkpoint removed after final unregister. testMultipleOffloads passed."); } @Test public void testCheckpointRecovery() throws RetinaException, IOException { + System.out.println("\n[Test] Starting testCheckpointRecovery..."); retinaManager.addVisibility(fileId, rgId, numRows); long timestamp = 100L; // 1. Delete row 10 int rowToDelete = 10; + System.out.println("Deleting row " + rowToDelete + " in memory..."); retinaManager.deleteRecord(fileId, rgId, rowToDelete, timestamp); // Verify deleted in memory @@ -142,12 +155,14 @@ public void testCheckpointRecovery() throws RetinaException, IOException assertTrue("Row 10 should be deleted in memory", isBitSet(memBitmap, rowToDelete)); // 2. Register Offload to generate checkpoint file + System.out.println("Creating checkpoint on disk..."); retinaManager.registerOffload(timestamp); String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); assertTrue("Checkpoint file should exist", storage.exists(offloadPath)); // 3. Rename offload file to GC file to simulate checkpoint generated by GC String gcPath = resolve(testCheckpointDir, "vis_gc_" + timestamp + ".bin"); + System.out.println("Simulating GC checkpoint by renaming offload file to: " + gcPath); // Storage interface doesn't have renamed, using copy and delete try (DataInputStream in = storage.open(offloadPath); DataOutputStream out = storage.create(gcPath, true, 4096)) @@ -162,49 +177,61 @@ public void testCheckpointRecovery() throws RetinaException, IOException storage.delete(offloadPath, false); // 4. Reset singleton state (Simulate Crash/Restart) + System.out.println("Simulating system restart (resetting memory state)..."); resetSingletonState(); // 5. Perform recovery + System.out.println("Running recoverCheckpoints()..."); // At this point rgVisibilityMap is empty, need to call recoverCheckpoints to load data into cache retinaManager.recoverCheckpoints(); // 6. Re-add Visibility, at this point it should recover state from recoveryCache instead of creating new + System.out.println("Re-adding visibility for file (should trigger recovery from cache)..."); retinaManager.addVisibility(fileId, rgId, numRows); // 7. Verify recovered state: Row 10 should still be in deleted state long[] recoveredBitmap = retinaManager.queryVisibility(fileId, rgId, timestamp); assertTrue("Row 10 should still be deleted after recovery", isBitSet(recoveredBitmap, rowToDelete)); assertFalse("Row 11 should not be deleted", isBitSet(recoveredBitmap, rowToDelete + 1)); + System.out.println("Verified: Recovery successful, row state restored. testCheckpointRecovery passed."); } @Test public void testDiskBitmapQuery() throws RetinaException { + System.out.println("\n[Test] Starting testDiskBitmapQuery..."); retinaManager.addVisibility(fileId, rgId, numRows); long baseTimestamp = 200L; long transId = 888L; // 1. Delete row 5 at baseTimestamp + System.out.println("Deleting row 5 at t=" + baseTimestamp); retinaManager.deleteRecord(fileId, rgId, 5, baseTimestamp); // 2. Register Offload for this transaction (save snapshot at this moment to disk) + System.out.println("Registering offload (disk snapshot)..."); retinaManager.registerOffload(baseTimestamp); // 3. Delete row 6 at a later time baseTimestamp + 10 // This only affects the latest state in memory, should not affect the checkpoint on disk + System.out.println("Deleting row 6 at t=" + (baseTimestamp + 10) + " (after snapshot)"); retinaManager.deleteRecord(fileId, rgId, 6, baseTimestamp + 10); // 4. Case A: Query using transId (should read disk Checkpoint) // Expected: Row 5 deleted, Row 6 not deleted (deleted after checkpoint) + System.out.println("Querying via transId (expecting disk data)..."); long[] diskBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp, transId); assertTrue("Disk: Row 5 should be deleted", isBitSet(diskBitmap, 5)); assertFalse("Disk: Row 6 should NOT be deleted (deleted after checkpoint)", isBitSet(diskBitmap, 6)); + System.out.println("Verified: Disk query correctly ignores later memory updates."); // 5. Case B: Query without transId (read memory) // Expected: Query at a later timestamp, both rows 5 and 6 are deleted + System.out.println("Querying via memory..."); long[] memBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp + 20); assertTrue("Memory: Row 5 should be deleted", isBitSet(memBitmap, 5)); assertTrue("Memory: Row 6 should be deleted", isBitSet(memBitmap, 6)); + System.out.println("Verified: Memory query sees all updates. testDiskBitmapQuery passed."); // Cleanup retinaManager.unregisterOffload(baseTimestamp); @@ -213,6 +240,7 @@ public void testDiskBitmapQuery() throws RetinaException @Test public void testConcurrency() throws InterruptedException, RetinaException { + System.out.println("\n[Test] Starting testConcurrency with 20 threads..."); retinaManager.addVisibility(fileId, rgId, numRows); int numThreads = 20; int operationsPerThread = 50; @@ -296,7 +324,12 @@ private void resetSingletonState() Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); offloadCacheField.setAccessible(true); - ((Map) offloadCacheField.get(retinaManager)).clear(); + Object offloadCache = offloadCacheField.get(retinaManager); + if (offloadCache != null) { + java.lang.reflect.Method invalidateAll = offloadCache.getClass().getMethod("invalidateAll"); + invalidateAll.setAccessible(true); + invalidateAll.invoke(offloadCache); + } Field refCountsField = RetinaResourceManager.class.getDeclaredField("checkpointRefCounts"); refCountsField.setAccessible(true); @@ -341,7 +374,7 @@ private boolean isBitSet(long[] bitmap, int rowIndex) public void testCheckpointPerformance() throws RetinaException, InterruptedException, IOException { // 1. Configuration parameters - int numFiles = 500; + int numFiles = 1; int rgsPerFile = 1; int rowsPerRG = 200000; // rows per Row Group long totalRecords = (long) numFiles * rgsPerFile * rowsPerRG; @@ -364,7 +397,7 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep // Step A: Pre-add Visibility for (int f = 0; f < numFiles; f++) { for (int r = 0; r < rgsPerFile; r++) { - retinaManager.addVisibility(f, r, rowsPerRG); + retinaManager.addVisibility(f, r, rowsPerRG); } } @@ -518,10 +551,16 @@ private long calculateLoadCacheMemory(long timestamp) try { Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); offloadCacheField.setAccessible(true); + Object offloadCacheObj = offloadCacheField.get(retinaManager); + if (offloadCacheObj == null) return 0; + + java.lang.reflect.Method asMapMethod = offloadCacheObj.getClass().getMethod("asMap"); + asMapMethod.setAccessible(true); @SuppressWarnings("unchecked") - Map> offloadCache = (Map>) offloadCacheField.get(retinaManager); + Map> offloadCacheMap = + (Map>) asMapMethod.invoke(offloadCacheObj); - Map cacheForTs = offloadCache.get(timestamp); + Map cacheForTs = offloadCacheMap.get(timestamp); if (cacheForTs == null) return 0; long totalBytes = 0; From 5a285c3d1587f9d7048c9b7743ddba79660c5c9c Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 17:26:22 +0800 Subject: [PATCH 05/15] feat: opt checkpoint --- .../src/main/resources/pixels.properties | 2 + .../pixels/retina/RetinaResourceManager.java | 81 +++---------------- 2 files changed, 13 insertions(+), 70 deletions(-) diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index e926b751e6..6f781d67ee 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -287,6 +287,8 @@ retina.buffer.flush.count=20 retina.buffer.flush.interval=30 # interval in seconds for retina visibility garbage retina.gc.interval=300 +# number of threads for retina checkpoint +retina.checkpoint.threads=4 # retina buffer reader prefetch threads num retina.reader.prefetch.threads=8 # retina service init threads num diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 705501fd7e..0b657f9b66 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -30,8 +30,6 @@ import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; -import io.pixelsdb.pixels.common.transaction.TransContext; -import io.pixelsdb.pixels.common.transaction.TransContextCache; import io.pixelsdb.pixels.common.transaction.TransService; import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.core.PixelsProto; @@ -132,17 +130,19 @@ private RetinaResourceManager() this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir"); this.recoveryCache = new ConcurrentHashMap<>(); - this.checkpointExecutor = Executors.newFixedThreadPool(4, r -> { + String cpThreadsStr = config.getProperty("retina.checkpoint.threads"); + int cpThreads = (cpThreadsStr != null) ? Integer.parseInt(cpThreadsStr) : 4; + this.checkpointExecutor = Executors.newFixedThreadPool(cpThreads, r -> { Thread t = new Thread(r, "retina-checkpoint-thread"); t.setDaemon(true); return t; }); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> { - Thread t = new Thread(r, "retina-gc-thread"); - t.setDaemon(true); - return t; - }); + Thread t = new Thread(r, "retina-gc-thread"); + t.setDaemon(true); + return t; + }); try { long interval = Long.parseLong(config.getProperty("retina.gc.interval")); @@ -155,19 +155,9 @@ private RetinaResourceManager() TimeUnit.SECONDS ); } - - // Start offload monitor - String offloadIntervalStr = config.getProperty("retina.offload.monitor.interval"); - long offloadInterval = (offloadIntervalStr != null) ? Long.parseLong(offloadIntervalStr) : 5; - executor.scheduleAtFixedRate( - this::monitorAndOffloadTransactions, - offloadInterval, - offloadInterval, - TimeUnit.SECONDS - ); } catch (Exception e) { - logger.error("Failed to start retina background gc or monitor", e); + logger.error("Failed to start retina background gc", e); } this.gcExecutor = executor; totalVirtualNodeNum = Integer.parseInt(ConfigFactory.Instance().getProperty("node.virtual.num")); @@ -195,55 +185,6 @@ public static RetinaResourceManager Instance() return InstanceHolder.instance; } - private void monitorAndOffloadTransactions() - { - try - { - String thresholdStr = ConfigFactory.Instance().getProperty("retina.offload.threshold"); - long thresholdMs = (thresholdStr != null) ? Long.parseLong(thresholdStr) : 60000; - long currentTime = System.currentTimeMillis(); - - Map activeTransactions = getActiveTransactions(); - for (TransContext context : activeTransactions.values()) - { - if (context.isReadOnly() && !context.isOffloaded()) - { - if (currentTime - context.getStartTime() > thresholdMs) - { - try - { - logger.info("Transaction {} exceeds threshold, offloading...", context.getTransId()); - registerOffload(context.getTimestamp()); - TransService.Instance().markTransOffloaded(context.getTransId()); - context.setOffloaded(true); - } catch (Exception e) - { - logger.error("Failed to offload transaction {}", context.getTransId(), e); - } - } - } - } - } catch (Exception e) - { - logger.error("Error in offload monitor", e); - } - } - - @SuppressWarnings("unchecked") - private Map getActiveTransactions() - { - try - { - java.lang.reflect.Field field = TransContextCache.class.getDeclaredField("transIdToContext"); - field.setAccessible(true); - return (Map) field.get(TransContextCache.Instance()); - } catch (Exception e) - { - logger.error("Failed to get active transactions via reflection", e); - return Collections.emptyMap(); - } - } - public void recoverCheckpoints() { try @@ -432,8 +373,8 @@ private long[] queryFromOffloadCache(long timestamp, long fileId, int rgId) thro return bitmap; } } - // Cache miss, load from disk - return loadBitmapFromDisk(timestamp, fileId, rgId); + // Cache miss, load from storage + return loadBitmapFromStorage(timestamp, fileId, rgId); } public long[] queryVisibility(long fileId, int rgId, long timestamp) throws RetinaException @@ -635,7 +576,7 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType }, checkpointExecutor); } - private long[] loadBitmapFromDisk(long timestamp, long targetFileId, int targetRgId) throws RetinaException + private long[] loadBitmapFromStorage(long timestamp, long targetFileId, int targetRgId) throws RetinaException { String path = offloadedCheckpoints.get(timestamp); if (path == null) From e42829d8a7d7ed4ecb45f1060930dee19bed411f Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 17:32:42 +0800 Subject: [PATCH 06/15] feat: opt cp --- .../io/pixelsdb/pixels/retina/RetinaResourceManager.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 0b657f9b66..7a83e49d9e 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -116,8 +116,7 @@ private RetinaResourceManager() this.offloadedCheckpoints = new ConcurrentHashMap<>(); ConfigFactory config = ConfigFactory.Instance(); - String leaseDurationStr = config.getProperty("retina.offload.cache.lease.duration"); - long leaseDuration = (leaseDurationStr != null) ? Long.parseLong(leaseDurationStr) : 600; + long leaseDuration = Long.parseLong(config.getProperty("retina.offload.cache.lease.duration")); this.offloadCache = Caffeine.newBuilder() .expireAfterAccess(leaseDuration, TimeUnit.SECONDS) @@ -130,8 +129,7 @@ private RetinaResourceManager() this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir"); this.recoveryCache = new ConcurrentHashMap<>(); - String cpThreadsStr = config.getProperty("retina.checkpoint.threads"); - int cpThreads = (cpThreadsStr != null) ? Integer.parseInt(cpThreadsStr) : 4; + int cpThreads = Integer.parseInt(config.getProperty("retina.checkpoint.threads")); this.checkpointExecutor = Executors.newFixedThreadPool(cpThreads, r -> { Thread t = new Thread(r, "retina-checkpoint-thread"); t.setDaemon(true); From 3cb49b3c91cb583076072532d20abd93cd24c328 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 17:33:27 +0800 Subject: [PATCH 07/15] fix: clean import --- .../java/io/pixelsdb/pixels/retina/RetinaResourceManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 7a83e49d9e..719cda4193 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -25,7 +25,6 @@ import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.Column; import io.pixelsdb.pixels.common.metadata.domain.Layout; -import io.pixelsdb.pixels.common.metadata.domain.Table; import io.pixelsdb.pixels.common.physical.PhysicalReader; import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; import io.pixelsdb.pixels.common.physical.Storage; From c6fa0e4ee16276a5411206577d986dc370b04d10 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 17:46:32 +0800 Subject: [PATCH 08/15] feat: add cp test scripts --- .../pixels/retina/TestRetinaCheckpoint.java | 2 +- scripts/test_checkpoint_performance.py | 110 ++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 scripts/test_checkpoint_performance.py diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index 9608dcc8ef..aea09cd43d 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -378,7 +378,7 @@ public void testCheckpointPerformance() throws RetinaException, InterruptedExcep int rgsPerFile = 1; int rowsPerRG = 200000; // rows per Row Group long totalRecords = (long) numFiles * rgsPerFile * rowsPerRG; - double targetDeleteRatio = 0.1; + double targetDeleteRatio = 0.1; // @TARGET_DELETE_RATIO@ int queryCount = 200; long timestamp = 1000L; diff --git a/scripts/test_checkpoint_performance.py b/scripts/test_checkpoint_performance.py new file mode 100644 index 0000000000..00d99dbf5b --- /dev/null +++ b/scripts/test_checkpoint_performance.py @@ -0,0 +1,110 @@ +import subprocess +import re +import csv +import os + +# 配置 +TEST_FILE = "pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java" +RATIOS = [0.01, 0.05, 0.10, 0.20, 0.50] +OUTPUT_CSV = "checkpoint_perf_results.csv" + +# 正则表达式用于提取结果 +PATTERNS = { + "Target Delete Ratio": r"Target Delete Ratio: ([\d.]+)%", + "Actual Ratio": r"Actual Ratio: ([\d.]+)%", + "Offload Time (ms)": r"Total Offload Time:\s+([\d.]+) ms", + "File Size (MB)": r"Checkpoint File Size:\s+([\d.]+) MB", + "Offload Peak Mem (MB)": r"Offload Peak Mem Overhead:\s+([\d.]+) MB", + "Write Throughput (MB/s)": r"Write Throughput:\s+([\d.]+) MB/s", + "Cold Load Time (ms)": r"First Load Time \(Cold\):\s+([\d.]+) ms", + "Load Memory (MB)": r"Load Memory Overhead:\s+([\d.]+) MB", + "Read Throughput (MB/s)": r"Read/Parse Throughput:\s+([\d.]+) MB/s", + "Avg Memory Hit Latency (ms)": r"Avg Memory Hit Latency:\s+([\d.]+) ms" +} + +def run_maven_test(ratio): + print(f"\n>>> Running test for Target Delete Ratio: {ratio*100:.2f}%") + + # 1. 修改源码中的 ratio + with open(TEST_FILE, 'r') as f: + content = f.read() + + new_content = re.sub( + r"double targetDeleteRatio = [\d.]+; // @TARGET_DELETE_RATIO@", + f"double targetDeleteRatio = {ratio}; // @TARGET_DELETE_RATIO@", + content + ) + + with open(TEST_FILE, 'w') as f: + f.write(new_content) + + # 2. 执行命令 + cmd = [ + "mvn", "test", + "-Dtest=TestRetinaCheckpoint#testCheckpointPerformance", + "-pl", "pixels-retina", + "-DargLine=-Xms40g -Xmx40g" + ] + env = os.environ.copy() + env["LD_PRELOAD"] = "/lib/x86_64-linux-gnu/libjemalloc.so.2" + + try: + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) + full_output = [] + for line in process.stdout: + print(line, end="") + full_output.append(line) + process.wait() + + if process.returncode != 0: + print(f"Error: Test failed for ratio {ratio}") + return None + + # 3. 解析输出 + output_str = "".join(full_output) + results = {"Target Delete Ratio": ratio * 100} + for key, pattern in PATTERNS.items(): + if key == "Target Delete Ratio": continue + match = re.search(pattern, output_str) + if match: + results[key] = match.group(1) + else: + results[key] = "N/A" + + return results + + except Exception as e: + print(f"Exception during test: {e}") + return None + +def main(): + all_results = [] + + # 备份原始文件 + with open(TEST_FILE, 'r') as f: + original_content = f.read() + + try: + for ratio in RATIOS: + res = run_maven_test(ratio) + if res: + all_results.append(res) + + # 写入 CSV + if all_results: + keys = all_results[0].keys() + with open(OUTPUT_CSV, 'w', newline='') as f: + dict_writer = csv.DictWriter(f, fieldnames=keys) + dict_writer.writeheader() + dict_writer.writerows(all_results) + print(f"\nSuccess! Results saved to {OUTPUT_CSV}") + else: + print("\nNo results collected.") + + finally: + # 还原原始文件 + with open(TEST_FILE, 'w') as f: + f.write(original_content) + +if __name__ == "__main__": + main() From 16244fb581500a3ae9152359b446d6eee203e073 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 18:47:15 +0800 Subject: [PATCH 09/15] fix: decoupled thread pool --- .../java/io/pixelsdb/pixels/retina/RetinaResourceManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 719cda4193..c6ffbe9759 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -522,6 +522,7 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType } // 4. Async Write: perform IO in background thread (Consumer) + // Use commonPool to avoid deadlocks with checkpointExecutor return CompletableFuture.runAsync(() -> { long startWrite = System.currentTimeMillis(); try @@ -570,7 +571,7 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType } throw new CompletionException(e); } - }, checkpointExecutor); + }); } private long[] loadBitmapFromStorage(long timestamp, long targetFileId, int targetRgId) throws RetinaException From d76d70bf35d27f36a579714ccbf63f5ae38b6812 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 21:09:59 +0800 Subject: [PATCH 10/15] fix: multi retina node --- .../pixels/retina/RetinaResourceManager.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index c6ffbe9759..6eeef3cb6f 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -198,11 +198,14 @@ public void recoverCheckpoints() allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList()); List gcTimestamps = new ArrayList<>(); + String offloadPrefix = "vis_offload_" + retinaHostName + "_"; + String gcPrefix = "vis_gc_" + retinaHostName + "_"; + for (String path : allFiles) { // use Paths.get().getFileName() to extract filename from path string String filename = Paths.get(path).getFileName().toString(); - if (filename.startsWith("vis_offload_")) + if (filename.startsWith(offloadPrefix)) { // delete offload checkpoint files when restarting try @@ -212,11 +215,11 @@ public void recoverCheckpoints() { logger.error("Failed to delete checkpoint file {}", path, e); } - } else if (filename.startsWith("vis_gc_")) + } else if (filename.startsWith(gcPrefix)) { try { - gcTimestamps.add(Long.parseLong(filename.replace("vis_gc_", "").replace(".bin", ""))); + gcTimestamps.add(Long.parseLong(filename.replace(gcPrefix, "").replace(".bin", ""))); } catch (Exception e) { logger.error("Failed to parse checkpoint timestamp from file {}", path, e); @@ -250,7 +253,7 @@ public void recoverCheckpoints() private void loadCheckpointToCache(long timestamp) { - String fileName = "vis_gc_" + timestamp + ".bin"; + String fileName = "vis_gc_" + retinaHostName + "_" + timestamp + ".bin"; // construct path. Storage expects '/' separator usually, but let's be safe String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; @@ -490,7 +493,7 @@ public void unregisterOffload(long timestamp) private CompletableFuture createCheckpoint(long timestamp, CheckpointType type) throws RetinaException { String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; - String fileName = prefix + timestamp + ".bin"; + String fileName = prefix + retinaHostName + "_" + timestamp + ".bin"; String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; // 1. Capture current entries to ensure we process a consistent set of RGs @@ -657,7 +660,7 @@ private long[] loadBitmapFromStorage(long timestamp, long targetFileId, int targ private void removeCheckpointFile(long timestamp, CheckpointType type) { String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; - String fileName = prefix + timestamp + ".bin"; + String fileName = prefix + retinaHostName + "_" + timestamp + ".bin"; String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; try From 1c640a8ce2f7c9bd34f9a6b01b5099ada3aaba58 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 22:41:37 +0800 Subject: [PATCH 11/15] feat: use cp cache in worker --- .../pixels/common/retina/RetinaService.java | 67 +++- .../core/reader/PixelsRecordReaderImpl.java | 16 +- .../reader/VisibilityCheckpointCache.java | 136 ++++++++ .../reader/TestVisibilityCheckpointCache.java | 169 ++++++++++ .../daemon/retina/RetinaServerImpl.java | 20 +- .../pixels/retina/RetinaResourceManager.java | 121 +------ .../pixels/retina/TestRetinaCheckpoint.java | 294 ++---------------- proto/retina.proto | 5 +- 8 files changed, 426 insertions(+), 402 deletions(-) create mode 100644 pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java create mode 100644 pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java index 7780f2e2db..d27f912c01 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java @@ -315,16 +315,65 @@ public boolean addVisibility(String filePath) throws RetinaException return true; } - public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException + public static final class VisibilityResult + { + private final long[][] bitmaps; + private final String checkpointPath; + + private VisibilityResult(long[][] bitmaps, String checkpointPath) + { + this.bitmaps = bitmaps; + this.checkpointPath = checkpointPath; + } + + public static VisibilityResult fromBitmaps(long[][] bitmaps) + { + return new VisibilityResult(bitmaps, null); + } + + public static VisibilityResult fromCheckpointPath(String path) + { + return new VisibilityResult(null, path); + } + + public boolean isOffloaded() + { + return checkpointPath != null; + } + + public long[][] getBitmaps() + { + if (isOffloaded()) + { + throw new IllegalStateException("Data is offloaded to checkpoint, use getCheckpointPath() instead."); + } + return bitmaps; + } + + public String getCheckpointPath() + { + return checkpointPath; + } + } + + public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException + { + return queryVisibility(fileId, rgIds, timestamp, -1); + } + + public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp, long transId) throws RetinaException { String token = UUID.randomUUID().toString(); - RetinaProto.QueryVisibilityRequest request = RetinaProto.QueryVisibilityRequest.newBuilder() + RetinaProto.QueryVisibilityRequest.Builder requestBuilder = RetinaProto.QueryVisibilityRequest.newBuilder() .setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build()) .setFileId(fileId) .addAllRgIds(Arrays.stream(rgIds).boxed().collect(Collectors.toList())) - .setTimestamp(timestamp) - .build(); - RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(request); + .setTimestamp(timestamp); + if (transId != -1) + { + requestBuilder.setTransId(transId); + } + RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(requestBuilder.build()); if (response.getHeader().getErrorCode() != 0) { throw new RetinaException("Failed to query visibility: " + response.getHeader().getErrorCode() @@ -334,13 +383,19 @@ public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws { throw new RetinaException("Response token does not match"); } + + if (response.getCheckpointPath() != null && !response.getCheckpointPath().isEmpty()) + { + return VisibilityResult.fromCheckpointPath(response.getCheckpointPath()); + } + long[][] visibilityBitmaps = new long[rgIds.length][]; for (int i = 0; i < response.getBitmapsCount(); i++) { RetinaProto.VisibilityBitmap bitmap = response.getBitmaps(i); visibilityBitmaps[i] = bitmap.getBitmapList().stream().mapToLong(Long::longValue).toArray(); } - return visibilityBitmaps; + return VisibilityResult.fromBitmaps(visibilityBitmaps); } public boolean reclaimVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java index 7eb707135c..534c40c204 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java @@ -509,7 +509,21 @@ else if (predicate.matchesNone()) { MetadataService metadataService = MetadataService.Instance(); long fileId = metadataService.getFileId(physicalReader.getPathUri()); - rgVisibilityBitmaps = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp()); + RetinaService.VisibilityResult result = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp(), option.getTransId()); + if (result.isOffloaded()) + { + String checkpointPath = result.getCheckpointPath(); + rgVisibilityBitmaps = new long[targetRGNum][]; + for (int i = 0; i < targetRGNum; i++) + { + rgVisibilityBitmaps[i] = VisibilityCheckpointCache.getInstance() + .getVisibilityBitmap(option.getTransTimestamp(), checkpointPath, fileId, targetRGs[i]); + } + } + else + { + rgVisibilityBitmaps = result.getBitmaps(); + } } catch (IOException e) { logger.error("Failed to get path uri"); diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java new file mode 100644 index 0000000000..00d58ec043 --- /dev/null +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.core.reader; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.pixelsdb.pixels.common.physical.PhysicalReader; +import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.physical.StorageFactory; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * Client-side cache for visibility checkpoints. + * This cache is used by PixelsReader to avoid repeated loading of checkpoint files from storage. + */ +public class VisibilityCheckpointCache +{ + private static final Logger logger = LogManager.getLogger(VisibilityCheckpointCache.class); + private static final VisibilityCheckpointCache instance = new VisibilityCheckpointCache(); + + private final Cache> cache; + + private VisibilityCheckpointCache() + { + ConfigFactory config = ConfigFactory.Instance(); + String leaseProperty = config.getProperty("retina.offload.cache.lease.duration"); + long leaseDuration = leaseProperty != null ? Long.parseLong(leaseProperty) : 3600; + + this.cache = Caffeine.newBuilder() + .expireAfterAccess(leaseDuration, TimeUnit.SECONDS) + .removalListener((key, value, cause) -> { + logger.info("Client-side visibility cache for timestamp {} evicted due to {}", key, cause); + }) + .build(); + } + + public static VisibilityCheckpointCache getInstance() + { + return instance; + } + + public long[] getVisibilityBitmap(long timestamp, String checkpointPath, long targetFileId, int targetRgId) throws IOException + { + Map timestampCache = cache.getIfPresent(timestamp); + + if (timestampCache == null) + { + synchronized (this) + { + timestampCache = cache.getIfPresent(timestamp); + if (timestampCache == null) + { + timestampCache = loadCheckpointFile(checkpointPath); + cache.put(timestamp, timestampCache); + } + } + } + + String rgKey = targetFileId + "_" + targetRgId; + return timestampCache.getOrDefault(rgKey, null); + } + + private Map loadCheckpointFile(String path) throws IOException + { + long start = System.currentTimeMillis(); + Storage storage = StorageFactory.Instance().getStorage(path); + long fileLength = storage.getStatus(path).getLength(); + + byte[] content; + try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path)) + { + ByteBuffer buffer = reader.readFully((int) fileLength); + if (buffer.hasArray()) + { + content = buffer.array(); + } + else + { + content = new byte[(int) fileLength]; + buffer.get(content); + } + } + + Map timestampCache; + try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) + { + int rgCount = in.readInt(); + // Initial capacity: count / 0.75 + 1 to avoid rehash + timestampCache = new ConcurrentHashMap<>((int) (rgCount / 0.75) + 1); + + for (int i = 0; i < rgCount; i++) + { + long fileId = in.readLong(); + int rgId = in.readInt(); + int len = in.readInt(); + long[] bitmap = new long[len]; + for (int j = 0; j < len; j++) + { + bitmap[j] = in.readLong(); + } + timestampCache.put(fileId + "_" + rgId, bitmap); + } + } + long end = System.currentTimeMillis(); + logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (end - start), timestampCache.size()); + return timestampCache; + } +} diff --git a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java new file mode 100644 index 0000000000..b452f62b32 --- /dev/null +++ b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java @@ -0,0 +1,169 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.core.reader; + +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.physical.StorageFactory; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import org.junit.Before; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test client-side checkpoint loading and caching performance. + */ +public class TestVisibilityCheckpointCache +{ + private String testCheckpointDir; + private Storage storage; + private final long fileId = 999999L; + private final int rgId = 0; + + @Before + public void setUp() throws IOException + { + testCheckpointDir = ConfigFactory.Instance().getProperty("pixels.retina.checkpoint.dir"); + storage = StorageFactory.Instance().getStorage(testCheckpointDir); + + if (!storage.exists(testCheckpointDir)) + { + storage.mkdirs(testCheckpointDir); + } + } + + private String resolve(String dir, String filename) { + return dir.endsWith("/") ? dir + filename : dir + "/" + filename; + } + + /** + * Helper to create a dummy checkpoint file. + */ + private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws IOException + { + try (DataOutputStream out = storage.create(path, true, 8 * 1024 * 1024)) + { + out.writeInt(numFiles * rgsPerFile); + for (int f = 0; f < numFiles; f++) + { + for (int r = 0; r < rgsPerFile; r++) + { + out.writeLong((long)f); + out.writeInt(r); + out.writeInt(bitmap.length); + for (long l : bitmap) + { + out.writeLong(l); + } + } + } + out.flush(); + } + } + + @Test + public void testCacheLoading() throws IOException + { + long timestamp = 1000L; + String checkpointPath = resolve(testCheckpointDir, "test_vis_1000.bin"); + long[] dummyBitmap = new long[]{0x1L, 0x2L}; + createDummyCheckpoint(checkpointPath, 1, 1, dummyBitmap); + + VisibilityCheckpointCache cache = VisibilityCheckpointCache.getInstance(); + long[] loaded = cache.getVisibilityBitmap(timestamp, checkpointPath, 0L, 0); + + assertNotNull(loaded); + assertTrue(loaded.length == 2); + assertTrue(loaded[0] == 0x1L); + assertTrue(loaded[1] == 0x2L); + } + + /** + * Migrated and adapted performance test. + */ + @Test + public void testCheckpointPerformance() throws InterruptedException, IOException + { + // 1. Configuration parameters + int numFiles = 5000; + int rgsPerFile = 1; + int rowsPerRG = 200000; + int queryCount = 200; + long timestamp = 2000L; + String checkpointPath = resolve(testCheckpointDir, "perf_test_2000.bin"); + + // 2. Prepare data + int bitmapLen = (rowsPerRG + 63) / 64; + long[] dummyBitmap = new long[bitmapLen]; + for(int i=0; i offloadedCheckpoints; - private final Cache> offloadCache; private final String checkpointDir; private long latestGcTimestamp = -1; private final int totalVirtualNodeNum; @@ -115,14 +112,6 @@ private RetinaResourceManager() this.offloadedCheckpoints = new ConcurrentHashMap<>(); ConfigFactory config = ConfigFactory.Instance(); - long leaseDuration = Long.parseLong(config.getProperty("retina.offload.cache.lease.duration")); - - this.offloadCache = Caffeine.newBuilder() - .expireAfterAccess(leaseDuration, TimeUnit.SECONDS) - .removalListener((key, value, cause) -> { - logger.info("Retina offload cache for timestamp {} evicted due to {}", key, cause); - }) - .build(); this.checkpointRefCounts = new ConcurrentHashMap<>(); this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir"); @@ -343,16 +332,7 @@ public void addVisibility(String filePath) throws RetinaException public long[] queryVisibility(long fileId, int rgId, long timestamp, long transId) throws RetinaException { - // [Routing Logic] Only read from disk/cache if the transaction is explicitly registered as Offload - if (transId != -1) - { - if (offloadedCheckpoints.containsKey(timestamp)) - { - return queryFromOffloadCache(timestamp, fileId, rgId); - } - throw new RetinaException("Offloaded checkpoint missing for TransID " + transId); - } - // otherwise read from memory + // read from memory RGVisibility rgVisibility = checkRGVisibility(fileId, rgId); long[] visibilityBitmap = rgVisibility.getVisibilityBitmap(timestamp); if (visibilityBitmap == null) @@ -362,20 +342,6 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp, long transI return visibilityBitmap; } - private long[] queryFromOffloadCache(long timestamp, long fileId, int rgId) throws RetinaException - { - Map cache = offloadCache.getIfPresent(timestamp); - if (cache != null) - { - long[] bitmap = cache.get(fileId + "_" + rgId); - if (bitmap != null) - { - return bitmap; - } - } - // Cache miss, load from storage - return loadBitmapFromStorage(timestamp, fileId, rgId); - } public long[] queryVisibility(long fileId, int rgId, long timestamp) throws RetinaException { @@ -476,7 +442,6 @@ public void unregisterOffload(long timestamp) if (remaining <= 0) { offloadedCheckpoints.remove(timestamp); - offloadCache.invalidate(timestamp); if (refCount.get() > 0) { logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp); @@ -577,85 +542,6 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType }); } - private long[] loadBitmapFromStorage(long timestamp, long targetFileId, int targetRgId) throws RetinaException - { - String path = offloadedCheckpoints.get(timestamp); - if (path == null) - { - throw new RetinaException("Checkpoint missing: " + timestamp); - } - - // Use a lock to ensure only one thread parses the file for this timestamp - // For 50,000 RGs, the initial capacity of ConcurrentHashMap should be sufficient - Map cache; - synchronized (offloadCache) - { - cache = offloadCache.getIfPresent(timestamp); - if (cache == null) - { - cache = new ConcurrentHashMap<>(50000); - offloadCache.put(timestamp, cache); - } - } - - synchronized (cache) - { - // Double check if target already in cache after acquiring lock - long[] cached = cache.get(targetFileId + "_" + targetRgId); - if (cached != null) - { - return cached; - } - - // Still not in cache, perform full load - try - { - Storage storage = StorageFactory.Instance().getStorage(path); - long fileLength = storage.getStatus(path).getLength(); - - // Use PhysicalReader to read the entire file into memory at once - // This is much faster than streaming from S3 for many small reads - byte[] content; - try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path)) - { - ByteBuffer buffer = reader.readFully((int) fileLength); - if (buffer.hasArray()) - { - content = buffer.array(); - } - else - { - content = new byte[(int) fileLength]; - buffer.get(content); - } - } - - try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) - { - int rgCount = in.readInt(); - - for (int i = 0; i < rgCount; i++) - { - long fileId = in.readLong(); - int rgId = in.readInt(); - int len = in.readInt(); - long[] bitmap = new long[len]; - for (int j = 0; j < len; j++) - { - bitmap[j] = in.readLong(); - } - - cache.put(fileId + "_" + rgId, bitmap); - } - } - } catch (IOException e) - { - throw new RetinaException("Failed to read checkpoint file", e); - } - } - - return cache.getOrDefault(targetFileId + "_" + targetRgId, new long[0]); - } private void removeCheckpointFile(long timestamp, CheckpointType type) { @@ -678,6 +564,11 @@ public void reclaimVisibility(long fileId, int rgId, long timestamp) throws Reti rgVisibility.getVisibilityBitmap(timestamp); } + public String getCheckpointPath(long timestamp) + { + return offloadedCheckpoints.get(timestamp); + } + public void deleteRecord(long fileId, int rgId, int rgRowOffset, long timestamp) throws RetinaException { RGVisibility rgVisibility = checkRGVisibility(fileId, rgId); diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index aea09cd43d..4a36223477 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -30,19 +30,19 @@ import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Field; +import java.net.InetAddress; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.LongAdder; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** - * Test checkpoint creation and recovery logic. + * Test checkpoint creation and recovery logic in Retina side. */ public class TestRetinaCheckpoint { @@ -52,6 +52,7 @@ public class TestRetinaCheckpoint private final long fileId = 999999L; private final int rgId = 0; private final int numRows = 1024; + private String hostName; @Before public void setUp() throws IOException, RetinaException @@ -78,12 +79,25 @@ public void setUp() throws IOException, RetinaException retinaManager = RetinaResourceManager.Instance(); resetSingletonState(); + hostName = System.getenv("HOSTNAME"); + if (hostName == null) + { + hostName = InetAddress.getLocalHost().getHostName(); + } } private String resolve(String dir, String filename) { return dir.endsWith("/") ? dir + filename : dir + "/" + filename; } + private String getOffloadFileName(long timestamp) { + return "vis_offload_" + hostName + "_" + timestamp + ".bin"; + } + + private String getGcFileName(long timestamp) { + return "vis_gc_" + hostName + "_" + timestamp + ".bin"; + } + @Test public void testRegisterOffload() throws RetinaException, IOException { @@ -96,7 +110,7 @@ public void testRegisterOffload() throws RetinaException, IOException retinaManager.registerOffload(timestamp); // Verify checkpoint file exists - String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin"); + String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp)); assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); System.out.println("Verified: Checkpoint file exists at " + expectedFile); @@ -122,7 +136,7 @@ public void testMultipleOffloads() throws RetinaException, IOException retinaManager.registerOffload(timestamp1); retinaManager.registerOffload(timestamp1_dup); - String expectedFile = resolve(testCheckpointDir, "vis_offload_100.bin"); + String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp1)); assertTrue("Offload checkpoint file should exist", storage.exists(expectedFile)); // Unregister one - should not remove yet (ref count >1) @@ -157,11 +171,11 @@ public void testCheckpointRecovery() throws RetinaException, IOException // 2. Register Offload to generate checkpoint file System.out.println("Creating checkpoint on disk..."); retinaManager.registerOffload(timestamp); - String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); + String offloadPath = resolve(testCheckpointDir, getOffloadFileName(timestamp)); assertTrue("Checkpoint file should exist", storage.exists(offloadPath)); // 3. Rename offload file to GC file to simulate checkpoint generated by GC - String gcPath = resolve(testCheckpointDir, "vis_gc_" + timestamp + ".bin"); + String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp)); System.out.println("Simulating GC checkpoint by renaming offload file to: " + gcPath); // Storage interface doesn't have renamed, using copy and delete try (DataInputStream in = storage.open(offloadPath); @@ -196,47 +210,6 @@ public void testCheckpointRecovery() throws RetinaException, IOException System.out.println("Verified: Recovery successful, row state restored. testCheckpointRecovery passed."); } - @Test - public void testDiskBitmapQuery() throws RetinaException - { - System.out.println("\n[Test] Starting testDiskBitmapQuery..."); - retinaManager.addVisibility(fileId, rgId, numRows); - long baseTimestamp = 200L; - long transId = 888L; - - // 1. Delete row 5 at baseTimestamp - System.out.println("Deleting row 5 at t=" + baseTimestamp); - retinaManager.deleteRecord(fileId, rgId, 5, baseTimestamp); - - // 2. Register Offload for this transaction (save snapshot at this moment to disk) - System.out.println("Registering offload (disk snapshot)..."); - retinaManager.registerOffload(baseTimestamp); - - // 3. Delete row 6 at a later time baseTimestamp + 10 - // This only affects the latest state in memory, should not affect the checkpoint on disk - System.out.println("Deleting row 6 at t=" + (baseTimestamp + 10) + " (after snapshot)"); - retinaManager.deleteRecord(fileId, rgId, 6, baseTimestamp + 10); - - // 4. Case A: Query using transId (should read disk Checkpoint) - // Expected: Row 5 deleted, Row 6 not deleted (deleted after checkpoint) - System.out.println("Querying via transId (expecting disk data)..."); - long[] diskBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp, transId); - assertTrue("Disk: Row 5 should be deleted", isBitSet(diskBitmap, 5)); - assertFalse("Disk: Row 6 should NOT be deleted (deleted after checkpoint)", isBitSet(diskBitmap, 6)); - System.out.println("Verified: Disk query correctly ignores later memory updates."); - - // 5. Case B: Query without transId (read memory) - // Expected: Query at a later timestamp, both rows 5 and 6 are deleted - System.out.println("Querying via memory..."); - long[] memBitmap = retinaManager.queryVisibility(fileId, rgId, baseTimestamp + 20); - assertTrue("Memory: Row 5 should be deleted", isBitSet(memBitmap, 5)); - assertTrue("Memory: Row 6 should be deleted", isBitSet(memBitmap, 6)); - System.out.println("Verified: Memory query sees all updates. testDiskBitmapQuery passed."); - - // Cleanup - retinaManager.unregisterOffload(baseTimestamp); - } - @Test public void testConcurrency() throws InterruptedException, RetinaException { @@ -259,7 +232,6 @@ public void testConcurrency() throws InterruptedException, RetinaException try { startLatch.await(); - long transId = 10000L + threadId; long timestamp = 500L + (threadId % 5) * 10; // Multiple threads may share the same timestamp for (int j = 0; j < operationsPerThread; j++) @@ -272,8 +244,8 @@ public void testConcurrency() throws InterruptedException, RetinaException } else if (j % 3 == 1) { - // Query visibility - long[] bitmap = retinaManager.queryVisibility(fileId, rgId, timestamp, transId); + // Query visibility from memory + long[] bitmap = retinaManager.queryVisibility(fileId, rgId, timestamp); if (!isBitSet(bitmap, 0)) { throw new RuntimeException("Row 0 should be deleted in all views"); } @@ -322,15 +294,6 @@ private void resetSingletonState() offloadedField.setAccessible(true); ((Map) offloadedField.get(retinaManager)).clear(); - Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); - offloadCacheField.setAccessible(true); - Object offloadCache = offloadCacheField.get(retinaManager); - if (offloadCache != null) { - java.lang.reflect.Method invalidateAll = offloadCache.getClass().getMethod("invalidateAll"); - invalidateAll.setAccessible(true); - invalidateAll.invoke(offloadCache); - } - Field refCountsField = RetinaResourceManager.class.getDeclaredField("checkpointRefCounts"); refCountsField.setAccessible(true); ((Map) refCountsField.get(retinaManager)).clear(); @@ -360,217 +323,4 @@ private boolean isBitSet(long[] bitmap, int rowIndex) return (bitmap[longIndex] & (1L << bitOffset)) != 0; } - - /** - * Test the performance and memory overhead of checkpoint offload and load. - *

- * Run Command: - * LD_PRELOAD=/lib/x86_64-linux-gnu/libjemalloc.so.2 mvn test \ - * -Dtest=TestRetinaCheckpoint#testCheckpointPerformance \ - * -pl pixels-retina \ - * -DargLine="-Xms40g -Xmx40g" - */ - @Test - public void testCheckpointPerformance() throws RetinaException, InterruptedException, IOException - { - // 1. Configuration parameters - int numFiles = 1; - int rgsPerFile = 1; - int rowsPerRG = 200000; // rows per Row Group - long totalRecords = (long) numFiles * rgsPerFile * rowsPerRG; - double targetDeleteRatio = 0.1; // @TARGET_DELETE_RATIO@ - int queryCount = 200; - - long timestamp = 1000L; - long transId = 2000L; - - System.out.println("\n============================================================"); - System.out.println("--- Starting Checkpoint Performance Test ---"); - System.out.printf("Config: %d files, %d RGs/file, %d rows/RG, %d queries\n", - numFiles, rgsPerFile, rowsPerRG, queryCount); - System.out.printf("Target Delete Ratio: %.2f%%\n", targetDeleteRatio * 100); - System.out.println("============================================================\n"); - - // 2. Initialize data and perform random deletes - LongAdder totalActualDeletedRows = new LongAdder(); - - // Step A: Pre-add Visibility - for (int f = 0; f < numFiles; f++) { - for (int r = 0; r < rgsPerFile; r++) { - retinaManager.addVisibility(f, r, rowsPerRG); - } - } - - // Step B: Parallel deleteRecord - int numThreads = Runtime.getRuntime().availableProcessors(); - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - - CountDownLatch latch = new CountDownLatch(numFiles * rgsPerFile); - for (int f = 0; f < numFiles; f++) - { - for (int r = 0; r < rgsPerFile; r++) - { - final int fileId = f; - final int rgId = r; - executor.submit(() -> { - try - { - java.util.Random randomInThread = new java.util.Random(); - int targetDeleteCount = (int) (rowsPerRG * targetDeleteRatio); - - int actualDeletedCount = 0; - java.util.Set deletedInRG = new java.util.HashSet<>(); - while (deletedInRG.size() < targetDeleteCount) - { - int rowId = randomInThread.nextInt(rowsPerRG); - if (deletedInRG.add(rowId)) - { - retinaManager.deleteRecord(fileId, rgId, rowId, timestamp); - actualDeletedCount++; - } - } - totalActualDeletedRows.add(actualDeletedCount); - } catch (Exception e) - { - e.printStackTrace(); - } finally - { - latch.countDown(); - } - }); - } - } - latch.await(30, TimeUnit.MINUTES); - executor.shutdown(); - - double actualDeleteRatio = (double) totalActualDeletedRows.sum() / totalRecords; - System.out.printf("[Data Gen] Total Deleted Rows: %d (Actual Ratio: %.4f%%)\n", - totalActualDeletedRows.sum(), actualDeleteRatio * 100); - - // 3. Test Offload (Checkpoint Creation) performance - System.out.println("\n[Phase 1] Testing Checkpoint Offload (Write)..."); - long startOffload = System.nanoTime(); - retinaManager.registerOffload(timestamp); - long endOffload = System.nanoTime(); - - // [Accuracy] Calculate offload peak memory overhead AFTER timing to avoid interference. - // We simulate the snapshot logic to get the exact physical size of long arrays being offloaded. - long offloadPeakBytes = calculateOffloadPeakMemory(timestamp); - - double offloadTimeMs = (endOffload - startOffload) / 1e6; - String offloadPath = resolve(testCheckpointDir, "vis_offload_" + timestamp + ".bin"); - long fileSize = storage.getStatus(offloadPath).getLength(); - double writeThroughputMBs = (fileSize / (1024.0 * 1024.0)) / (offloadTimeMs / 1000.0); - - System.out.println("------------------------------------------------------------"); - System.out.printf("Total Offload Time: %.2f ms\n", offloadTimeMs); - System.out.printf("Checkpoint File Size: %.2f MB\n", fileSize / (1024.0 * 1024.0)); - System.out.printf("Offload Peak Mem Overhead: %.2f MB\n", offloadPeakBytes / (1024.0 * 1024.0)); - System.out.printf("Write Throughput: %.2f MB/s\n", writeThroughputMBs); - System.out.printf("Avg Offload Time per RG: %.4f ms\n", offloadTimeMs / (numFiles * rgsPerFile)); - System.out.println("------------------------------------------------------------"); - - // 4. Test Load (Checkpoint Load) performance - System.out.println("\n[Phase 2] Testing Checkpoint Load (Read)..."); - long firstLoadTimeNs = 0; - long subsequentLoadsTotalNs = 0; - java.util.Random randomForQuery = new java.util.Random(); - - for (int i = 0; i < queryCount; i++) - { - int f = randomForQuery.nextInt(numFiles); - int r = randomForQuery.nextInt(rgsPerFile); - - long start = System.nanoTime(); - retinaManager.queryVisibility(f, r, timestamp, transId); - long end = System.nanoTime(); - - if (i == 0) - { - firstLoadTimeNs = (end - start); - System.out.printf("Cold Start Query (Triggered full file load): %.2f ms\n", firstLoadTimeNs / 1e6); - } - else - { - subsequentLoadsTotalNs += (end - start); - } - } - - // [Accuracy] Calculate load memory overhead AFTER timing via reflection on offloadCache. - long loadCacheBytes = calculateLoadCacheMemory(timestamp); - - double firstLoadTimeMs = firstLoadTimeNs / 1e6; - double avgSubsequentLoadTimeMs = (subsequentLoadsTotalNs / (queryCount - 1.0)) / 1e6; - double readThroughputMBs = (fileSize / (1024.0 * 1024.0)) / (firstLoadTimeMs / 1000.0); - - System.out.println("------------------------------------------------------------"); - System.out.printf("First Load Time (Cold): %.2f ms\n", firstLoadTimeMs); - System.out.printf("Load Memory Overhead: %.2f MB\n", loadCacheBytes / (1024.0 * 1024.0)); - System.out.printf("Read/Parse Throughput: %.2f MB/s\n", readThroughputMBs); - System.out.printf("Avg Memory Hit Latency: %.6f ms\n", avgSubsequentLoadTimeMs); - System.out.printf("Total Time for %d queries: %.2f ms\n", queryCount, (firstLoadTimeNs + subsequentLoadsTotalNs) / 1e6); - System.out.println("------------------------------------------------------------"); - - // 5. Cleanup - retinaManager.unregisterOffload(timestamp); - System.out.println("\n--- Checkpoint Performance Test Finished ---\n"); - } - - /** - * Accurately calculate the memory size of long arrays that would be captured in a snapshot. - * This represents the peak heap usage during the offload process. - */ - private long calculateOffloadPeakMemory(long timestamp) - { - try { - Field rgMapField = RetinaResourceManager.class.getDeclaredField("rgVisibilityMap"); - rgMapField.setAccessible(true); - @SuppressWarnings("unchecked") - Map rgMap = (Map) rgMapField.get(retinaManager); - - long totalBytes = 0; - for (RGVisibility visibility : rgMap.values()) { - long[] bitmap = visibility.getVisibilityBitmap(timestamp); - if (bitmap != null) { - totalBytes += (long) bitmap.length * 8; - } - } - return totalBytes; - } catch (Exception e) { - e.printStackTrace(); - return 0; - } - } - - /** - * Accurately calculate the memory size of long arrays currently stored in offloadCache. - * This represents the persistent heap usage after loading a checkpoint. - */ - private long calculateLoadCacheMemory(long timestamp) - { - try { - Field offloadCacheField = RetinaResourceManager.class.getDeclaredField("offloadCache"); - offloadCacheField.setAccessible(true); - Object offloadCacheObj = offloadCacheField.get(retinaManager); - if (offloadCacheObj == null) return 0; - - java.lang.reflect.Method asMapMethod = offloadCacheObj.getClass().getMethod("asMap"); - asMapMethod.setAccessible(true); - @SuppressWarnings("unchecked") - Map> offloadCacheMap = - (Map>) asMapMethod.invoke(offloadCacheObj); - - Map cacheForTs = offloadCacheMap.get(timestamp); - if (cacheForTs == null) return 0; - - long totalBytes = 0; - for (long[] bitmap : cacheForTs.values()) { - totalBytes += (long) bitmap.length * 8; - } - return totalBytes; - } catch (Exception e) { - e.printStackTrace(); - return 0; - } - } } diff --git a/proto/retina.proto b/proto/retina.proto index e41cb93f51..9c625aecdd 100644 --- a/proto/retina.proto +++ b/proto/retina.proto @@ -55,8 +55,8 @@ message RequestHeader { message ResponseHeader { string token = 1; - int32 errorCode = 2; // errorCode == 0 means success - string errorMsg = 3; // empty if errorCode == 0 + int32 errorCode = 2; // errorCode == 0 means success + string errorMsg = 3; // empty if errorCode == 0 } // auxiliary structure @@ -135,6 +135,7 @@ message QueryVisibilityRequest { message QueryVisibilityResponse { ResponseHeader header = 1; repeated VisibilityBitmap bitmaps = 2; + string checkpointPath = 3; } message ReclaimVisibilityRequest { From 62faaa130e48ec8c0537791d2237a2a121cceaaa Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Tue, 10 Feb 2026 23:36:23 +0800 Subject: [PATCH 12/15] fix: checkpoint load test --- pixels-core/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pixels-core/pom.xml b/pixels-core/pom.xml index 952f37b952..76cdceaa0f 100644 --- a/pixels-core/pom.xml +++ b/pixels-core/pom.xml @@ -59,6 +59,12 @@ test true + + + org.junit.vintage + junit-vintage-engine + test + From dfa1ab0776f10615d5a2fc37ac1c357e6086b665 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Wed, 11 Feb 2026 10:19:04 +0800 Subject: [PATCH 13/15] feat: update cp cache test --- .../pixels/core/reader/TestVisibilityCheckpointCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java index b452f62b32..bfcec50ee6 100644 --- a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java +++ b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java @@ -91,7 +91,7 @@ private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, lo public void testCacheLoading() throws IOException { long timestamp = 1000L; - String checkpointPath = resolve(testCheckpointDir, "test_vis_1000.bin"); + String checkpointPath = resolve(testCheckpointDir, "vis_gc_tencent_100.bin"); long[] dummyBitmap = new long[]{0x1L, 0x2L}; createDummyCheckpoint(checkpointPath, 1, 1, dummyBitmap); From 6374cd76d28fce489bcf4f5bcf6408ba8e44e339 Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Wed, 11 Feb 2026 12:00:23 +0800 Subject: [PATCH 14/15] fix: cp recover --- .../pixels/common/utils/RetinaUtils.java | 16 +- .../reader/VisibilityCheckpointCache.java | 4 +- .../daemon/retina/RetinaServerImpl.java | 1 - .../pixelsdb/pixels/retina/RGVisibility.java | 8 + .../pixels/retina/RetinaResourceManager.java | 434 ++++++++---------- .../pixels/retina/TestRetinaCheckpoint.java | 135 +++++- 6 files changed, 347 insertions(+), 251 deletions(-) diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java index 2ba8d0c4b8..143a43d846 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java @@ -29,6 +29,10 @@ public class RetinaUtils { + public static final String CHECKPOINT_PREFIX_GC = "vis_gc_"; + public static final String CHECKPOINT_PREFIX_OFFLOAD = "vis_offload_"; + public static final String CHECKPOINT_SUFFIX = ".bin"; + private static volatile RetinaUtils instance; private final int bucketNum; private final int defaultRetinaPort; @@ -103,7 +107,7 @@ public static RetinaService getRetinaServiceFromBucketId(int bucketId) return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort); } - public static RetinaService getRetinaServiceFromPath(String path) + public static RetinaService getRetinaServiceFromPath(String path) { String retinaHost = extractRetinaHostNameFromPath(path); if(retinaHost == null || retinaHost.equals(Constants.LOAD_DEFAULT_RETINA_PREFIX)) @@ -113,6 +117,16 @@ public static RetinaService getRetinaServiceFromPath(String path) return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort); } + public static String getCheckpointFileName(String prefix, String hostname, long timestamp) + { + return prefix + hostname + "_" + timestamp + CHECKPOINT_SUFFIX; + } + + public static String getCheckpointPrefix(String typePrefix, String hostname) + { + return typePrefix + hostname + "_"; + } + private static String extractRetinaHostNameFromPath(String path) { if (path == null || path.isEmpty()) { diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java index 00d58ec043..ee7845dff3 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java @@ -50,9 +50,7 @@ public class VisibilityCheckpointCache private VisibilityCheckpointCache() { - ConfigFactory config = ConfigFactory.Instance(); - String leaseProperty = config.getProperty("retina.offload.cache.lease.duration"); - long leaseDuration = leaseProperty != null ? Long.parseLong(leaseProperty) : 3600; + long leaseDuration = Long.parseLong(ConfigFactory.Instance().getProperty("retina.offload.cache.lease.duration")); this.cache = Caffeine.newBuilder() .expireAfterAccess(leaseDuration, TimeUnit.SECONDS) diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java index 2c53f34914..4070862995 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java @@ -154,7 +154,6 @@ public RetinaServerImpl() this.retinaResourceManager.addWriteBuffer(schema.getName(), table.getName()); } } - this.retinaResourceManager.finishRecovery(); logger.info("Retina service is ready"); } catch (Exception e) { diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java index 954c297104..a986717f93 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RGVisibility.java @@ -63,14 +63,17 @@ public class RGVisibility implements AutoCloseable * Constructor creates C++ object and returns handle. */ private final AtomicLong nativeHandle = new AtomicLong(); + private final long recordNum; public RGVisibility(long rgRecordNum) { + this.recordNum = rgRecordNum; this.nativeHandle.set(createNativeObject(rgRecordNum)); } public RGVisibility(long rgRecordNum, long timestamp, long[] initialBitmap) { + this.recordNum = rgRecordNum; if (initialBitmap == null) { this.nativeHandle.set(createNativeObject(rgRecordNum)); @@ -80,6 +83,11 @@ public RGVisibility(long rgRecordNum, long timestamp, long[] initialBitmap) } } + public long getRecordNum() + { + return recordNum; + } + @Override public void close() { diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 185ba59ded..a28df69a48 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -31,6 +31,7 @@ import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.common.transaction.TransService; import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.common.utils.RetinaUtils; import io.pixelsdb.pixels.core.PixelsProto; import io.pixelsdb.pixels.core.TypeDescription; import io.pixelsdb.pixels.index.IndexProto; @@ -65,35 +66,25 @@ public class RetinaResourceManager // Checkpoint related fields private final ExecutorService checkpointExecutor; private final Map offloadedCheckpoints; + private final Map> checkpointFutures; private final String checkpointDir; private long latestGcTimestamp = -1; private final int totalVirtualNodeNum; private final Map checkpointRefCounts; - private static class RecoveredState - { - final long timestamp; - final long[] bitmap; - - RecoveredState(long timestamp, long[] bitmap) - { - this.timestamp = timestamp; - this.bitmap = bitmap; - } - } - private final Map recoveryCache; - private static class CheckpointEntry { final long fileId; final int rgId; + final int recordNum; final long[] bitmap; - CheckpointEntry(long fileId, int rgId, long[] bitmap) + CheckpointEntry(long fileId, int rgId, int recordNum, long[] bitmap) { this.fileId = fileId; this.rgId = rgId; + this.recordNum = recordNum; this.bitmap = bitmap; } } @@ -110,12 +101,12 @@ private RetinaResourceManager() this.rgVisibilityMap = new ConcurrentHashMap<>(); this.pixelsWriteBufferMap = new ConcurrentHashMap<>(); this.offloadedCheckpoints = new ConcurrentHashMap<>(); + this.checkpointFutures = new ConcurrentHashMap<>(); ConfigFactory config = ConfigFactory.Instance(); this.checkpointRefCounts = new ConcurrentHashMap<>(); this.checkpointDir = config.getProperty("pixels.retina.checkpoint.dir"); - this.recoveryCache = new ConcurrentHashMap<>(); int cpThreads = Integer.parseInt(config.getProperty("retina.checkpoint.threads")); this.checkpointExecutor = Executors.newFixedThreadPool(cpThreads, r -> { @@ -171,134 +162,15 @@ public static RetinaResourceManager Instance() return InstanceHolder.instance; } - public void recoverCheckpoints() - { - try - { - Storage storage = StorageFactory.Instance().getStorage(checkpointDir); - if (!storage.exists(checkpointDir)) - { - storage.mkdirs(checkpointDir); - return; - } - - List allFiles = storage.listPaths(checkpointDir); - // filter only .bin files - allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList()); - - List gcTimestamps = new ArrayList<>(); - String offloadPrefix = "vis_offload_" + retinaHostName + "_"; - String gcPrefix = "vis_gc_" + retinaHostName + "_"; - - for (String path : allFiles) - { - // use Paths.get().getFileName() to extract filename from path string - String filename = Paths.get(path).getFileName().toString(); - if (filename.startsWith(offloadPrefix)) - { - // delete offload checkpoint files when restarting - try - { - storage.delete(path, false); - } catch (IOException e) - { - logger.error("Failed to delete checkpoint file {}", path, e); - } - } else if (filename.startsWith(gcPrefix)) - { - try - { - gcTimestamps.add(Long.parseLong(filename.replace(gcPrefix, "").replace(".bin", ""))); - } catch (Exception e) - { - logger.error("Failed to parse checkpoint timestamp from file {}", path, e); - } - } - } - - if (gcTimestamps.isEmpty()) - { - return; - } - - Collections.sort(gcTimestamps); - long latestTs = gcTimestamps.get(gcTimestamps.size() - 1); - this.latestGcTimestamp = latestTs; - logger.info("Loading system state from GC checkpoint: {}", latestTs); - - // load to recoveryCache - loadCheckpointToCache(latestTs); - - // delete old GC checkpoint files - for (int i = 0; i < gcTimestamps.size() - 1; i++) - { - removeCheckpointFile(gcTimestamps.get(i), CheckpointType.GC); - } - } catch (IOException e) - { - logger.error("Failed to recover checkpoints", e); - } - } - - private void loadCheckpointToCache(long timestamp) - { - String fileName = "vis_gc_" + retinaHostName + "_" + timestamp + ".bin"; - // construct path. Storage expects '/' separator usually, but let's be safe - String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; - - try - { - Storage storage = StorageFactory.Instance().getStorage(path); - if (!storage.exists(path)) - { - return; - } - - try (DataInputStream in = storage.open(path)) - { - int rgCount = in.readInt(); - for (int i = 0; i < rgCount; i++) - { - long fileId = in.readLong(); - int rgId = in.readInt(); - int len = in.readInt(); - long[] bitmap = new long[len]; - for (int j = 0; j < len; j++) - { - bitmap[j] = in.readLong(); - } - recoveryCache.put(fileId + "_" + rgId, new RecoveredState(timestamp, bitmap)); - } - } - } catch (IOException e) - { - logger.error("Failed to read checkpoint file: {}", e); - } - } - public void addVisibility(long fileId, int rgId, int recordNum) { String rgKey = fileId + "_" + rgId; - RecoveredState recoveredState = recoveryCache.remove(rgKey); - - RGVisibility rgVisibility; - if (recoveredState != null) - { - rgVisibility = new RGVisibility(recordNum, recoveredState.timestamp, recoveredState.bitmap); - } else + if (rgVisibilityMap.containsKey(rgKey)) { - rgVisibility = new RGVisibility(recordNum); + return; } - rgVisibilityMap.put(rgKey, rgVisibility); - } - public void finishRecovery() - { - if (!recoveryCache.isEmpty()) - { - logger.info("Dropping {} orphaned entries from recovery cache.", recoveryCache.size()); - recoveryCache.clear(); - } + rgVisibilityMap.put(rgKey, new RGVisibility(recordNum)); } public void addVisibility(String filePath) throws RetinaException @@ -360,74 +232,57 @@ public long[] queryVisibility(long fileId, int rgId, long timestamp) throws Reti */ public void registerOffload(long timestamp) throws RetinaException { - while (true) + AtomicInteger refCount = checkpointRefCounts.computeIfAbsent(timestamp, k -> new AtomicInteger(0)); + CompletableFuture future; + + synchronized (refCount) { - AtomicInteger refCount = checkpointRefCounts.computeIfAbsent(timestamp, k -> new AtomicInteger(0)); + refCount.incrementAndGet(); - CompletableFuture currentFuture = null; - synchronized (refCount) + // If checkpoint already exists and is fully committed, just return + if (offloadedCheckpoints.containsKey(timestamp)) { - if (checkpointRefCounts.get(timestamp) != refCount) - { - continue; - } - - int currentRef = refCount.incrementAndGet(); - if (currentRef == 1) - { - try - { - if (!offloadedCheckpoints.containsKey(timestamp)) - { - currentFuture = createCheckpoint(timestamp, CheckpointType.OFFLOAD); - } - } catch (Exception e) - { - refCount.decrementAndGet(); - if (e instanceof RuntimeException) throw (RuntimeException) e; - throw new RetinaException("Failed to register offload", e); - } - } + logger.info("Registered offload for Timestamp: {} (already exists)", timestamp); + return; } - if (currentFuture != null) + // Check if there is an existing future + future = checkpointFutures.get(timestamp); + if (future != null && future.isCompletedExceptionally()) { - try - { - currentFuture.join(); - } catch (Exception e) - { - // Checkpoint creation failed - synchronized (refCount) - { - refCount.decrementAndGet(); - } - throw new RetinaException("Failed to create checkpoint", e); - } + // If previous attempt failed, remove it so we can retry + checkpointFutures.remove(timestamp, future); + future = null; } - else + + if (future == null) { - // Wait for the thread that's currently creating the checkpoint - while (!offloadedCheckpoints.containsKey(timestamp) && checkpointRefCounts.containsKey(timestamp)) - { + future = checkpointFutures.computeIfAbsent(timestamp, k -> { try { - Thread.sleep(100); - } catch (InterruptedException e) + return createCheckpoint(timestamp, CheckpointType.OFFLOAD); + } catch (RetinaException e) { - Thread.currentThread().interrupt(); - throw new RetinaException("Interrupted while waiting for offload", e); + throw new CompletionException(e); } - } - if (!offloadedCheckpoints.containsKey(timestamp)) - { - // Maybe the other thread failed or it was unregistered immediately - continue; - } + }); } + } + try + { + future.join(); logger.info("Registered offload for Timestamp: {}", timestamp); - return; + } catch (Exception e) + { + synchronized (refCount) + { + refCount.decrementAndGet(); + // We don't remove from checkpointFutures here anymore, + // because it's handled above in the synchronized block for retries + // or let the next caller handle it. + } + throw new RetinaException("Failed to create checkpoint for timestamp: " + timestamp, e); } } @@ -442,6 +297,7 @@ public void unregisterOffload(long timestamp) if (remaining <= 0) { offloadedCheckpoints.remove(timestamp); + checkpointFutures.remove(timestamp); if (refCount.get() > 0) { logger.info("Checkpoint resurrection detected, skipping deletion. TS: {}", timestamp); @@ -457,8 +313,8 @@ public void unregisterOffload(long timestamp) private CompletableFuture createCheckpoint(long timestamp, CheckpointType type) throws RetinaException { - String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; - String fileName = prefix + retinaHostName + "_" + timestamp + ".bin"; + String prefix = (type == CheckpointType.GC) ? RetinaUtils.CHECKPOINT_PREFIX_GC : RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD; + String fileName = RetinaUtils.getCheckpointFileName(prefix, retinaHostName, timestamp); String filePath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; // 1. Capture current entries to ensure we process a consistent set of RGs @@ -480,8 +336,9 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType String[] parts = key.split("_"); long fileId = Long.parseLong(parts[0]); int rgId = Integer.parseInt(parts[1]); - long[] bitmap = entry.getValue().getVisibilityBitmap(timestamp); - queue.put(new CheckpointEntry(fileId, rgId, bitmap)); + RGVisibility rgVisibility = entry.getValue(); + long[] bitmap = rgVisibility.getVisibilityBitmap(timestamp); + queue.put(new CheckpointEntry(fileId, rgId, (int) rgVisibility.getRecordNum(), bitmap)); } catch (Exception e) { logger.error("Failed to fetch visibility bitmap for checkpoint", e); @@ -492,52 +349,69 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType // 4. Async Write: perform IO in background thread (Consumer) // Use commonPool to avoid deadlocks with checkpointExecutor return CompletableFuture.runAsync(() -> { - long startWrite = System.currentTimeMillis(); - try + // Lock on filePath string intern to ensure only one thread writes to the same file + synchronized (filePath.intern()) { - Storage storage = StorageFactory.Instance().getStorage(filePath); - try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024)) + if (type == CheckpointType.OFFLOAD && offloadedCheckpoints.containsKey(timestamp)) + { + return; + } + if (type == CheckpointType.GC && timestamp <= latestGcTimestamp) { - out.writeInt(totalRgs); - for (int i = 0; i < totalRgs; i++) + return; + } + + long startWrite = System.currentTimeMillis(); + try + { + Storage storage = StorageFactory.Instance().getStorage(filePath); + // Use a temporary file to ensure atomic commit + // Although LocalFS lacks rename, using a synchronized block here + // makes it safe within this JVM instance. + try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024)) { - CheckpointEntry entry = queue.take(); - out.writeLong(entry.fileId); - out.writeInt(entry.rgId); - out.writeInt(entry.bitmap.length); - for (long l : entry.bitmap) + out.writeInt(totalRgs); + for (int i = 0; i < totalRgs; i++) { - out.writeLong(l); + CheckpointEntry entry = queue.take(); + out.writeLong(entry.fileId); + out.writeInt(entry.rgId); + out.writeInt(entry.recordNum); + out.writeInt(entry.bitmap.length); + for (long l : entry.bitmap) + { + out.writeLong(l); + } } + out.flush(); } - out.flush(); - } - long endWrite = System.currentTimeMillis(); - logger.info("Writing {} checkpoint file to {} took {} ms", type, filePath, (endWrite - startWrite)); + long endWrite = System.currentTimeMillis(); + logger.info("Writing {} checkpoint file to {} took {} ms", type, filePath, (endWrite - startWrite)); - if (type == CheckpointType.OFFLOAD) - { - offloadedCheckpoints.put(timestamp, filePath); - } else - { - long oldGcTs = this.latestGcTimestamp; - this.latestGcTimestamp = timestamp; - if (oldGcTs != -1 && oldGcTs != timestamp) + if (type == CheckpointType.OFFLOAD) + { + offloadedCheckpoints.put(timestamp, filePath); + } else { - removeCheckpointFile(oldGcTs, CheckpointType.GC); + long oldGcTs = this.latestGcTimestamp; + this.latestGcTimestamp = timestamp; + if (oldGcTs != -1 && oldGcTs != timestamp) + { + removeCheckpointFile(oldGcTs, CheckpointType.GC); + } } - } - } catch (Exception e) - { - logger.error("Failed to commit {} checkpoint file for timestamp: {}", type, timestamp, e); - // Try to cleanup the potentially corrupted or partial file - try - { - StorageFactory.Instance().getStorage(filePath).delete(filePath, false); - } catch (IOException ignored) + } catch (Exception e) { + logger.error("Failed to commit {} checkpoint file for timestamp: {}", type, timestamp, e); + // Try to cleanup the potentially corrupted or partial file + try + { + StorageFactory.Instance().getStorage(filePath).delete(filePath, false); + } catch (IOException ignored) + { + } + throw new CompletionException(e); } - throw new CompletionException(e); } }); } @@ -545,8 +419,8 @@ private CompletableFuture createCheckpoint(long timestamp, CheckpointType private void removeCheckpointFile(long timestamp, CheckpointType type) { - String prefix = (type == CheckpointType.GC) ? "vis_gc_" : "vis_offload_"; - String fileName = prefix + retinaHostName + "_" + timestamp + ".bin"; + String prefix = (type == CheckpointType.GC) ? RetinaUtils.CHECKPOINT_PREFIX_GC : RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD; + String fileName = RetinaUtils.getCheckpointFileName(prefix, retinaHostName, timestamp); String path = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; try @@ -793,4 +667,102 @@ private void runGC() logger.error("Error while running GC", e); } } + + public void recoverCheckpoints() + { + try + { + Storage storage = StorageFactory.Instance().getStorage(checkpointDir); + if (!storage.exists(checkpointDir)) + { + storage.mkdirs(checkpointDir); + return; + } + + List allFiles = storage.listPaths(checkpointDir); + // filter only .bin files + allFiles = allFiles.stream().filter(p -> p.endsWith(".bin")).collect(Collectors.toList()); + + List gcTimestamps = new ArrayList<>(); + String offloadPrefix = RetinaUtils.getCheckpointPrefix(RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD, retinaHostName); + String gcPrefix = RetinaUtils.getCheckpointPrefix(RetinaUtils.CHECKPOINT_PREFIX_GC, retinaHostName); + + for (String path : allFiles) + { + // use Paths.get().getFileName() to extract filename from path string + String filename = Paths.get(path).getFileName().toString(); + if (filename.startsWith(offloadPrefix)) + { + // delete offload checkpoint files when restarting + try + { + storage.delete(path, false); + } catch (IOException e) + { + logger.error("Failed to delete checkpoint file {}", path, e); + } + } else if (filename.startsWith(gcPrefix)) + { + try + { + gcTimestamps.add(Long.parseLong(filename.replace(gcPrefix, "").replace(".bin", ""))); + } catch (Exception e) + { + logger.error("Failed to parse checkpoint timestamp from file {}", path, e); + } + } + } + + if (gcTimestamps.isEmpty()) + { + return; + } + + Collections.sort(gcTimestamps); + long latestTs = gcTimestamps.get(gcTimestamps.size() - 1); + this.latestGcTimestamp = latestTs; + logger.info("Loading system state from GC checkpoint: {}", latestTs); + + // load to rgVisibilityMap + String fileName = RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_GC, retinaHostName, latestTs); + String latestPath = checkpointDir.endsWith("/") ? checkpointDir + fileName : checkpointDir + "/" + fileName; + + try + { + Storage latestStorage = StorageFactory.Instance().getStorage(latestPath); + if (latestStorage.exists(latestPath)) + { + try (DataInputStream in = latestStorage.open(latestPath)) + { + int rgCount = in.readInt(); + for (int i = 0; i < rgCount; i++) + { + long fileId = in.readLong(); + int rgId = in.readInt(); + int recordNum = in.readInt(); + int len = in.readInt(); + long[] bitmap = new long[len]; + for (int j = 0; j < len; j++) + { + bitmap[j] = in.readLong(); + } + rgVisibilityMap.put(fileId + "_" + rgId, new RGVisibility(recordNum, latestTs, bitmap)); + } + } + } + } catch (IOException e) + { + logger.error("Failed to read checkpoint file: {}", e); + } + + // delete old GC checkpoint files + for (int i = 0; i < gcTimestamps.size() - 1; i++) + { + removeCheckpointFile(gcTimestamps.get(i), CheckpointType.GC); + } + } catch (IOException e) + { + logger.error("Failed to recover checkpoints", e); + } + } } diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java index 4a36223477..b1fab563a0 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java @@ -23,6 +23,7 @@ import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.physical.StorageFactory; import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.common.utils.RetinaUtils; import org.junit.Before; import org.junit.Test; @@ -91,11 +92,11 @@ private String resolve(String dir, String filename) { } private String getOffloadFileName(long timestamp) { - return "vis_offload_" + hostName + "_" + timestamp + ".bin"; + return RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_OFFLOAD, hostName, timestamp); } private String getGcFileName(long timestamp) { - return "vis_gc_" + hostName + "_" + timestamp + ".bin"; + return RetinaUtils.getCheckpointFileName(RetinaUtils.CHECKPOINT_PREFIX_GC, hostName, timestamp); } @Test @@ -177,7 +178,7 @@ public void testCheckpointRecovery() throws RetinaException, IOException // 3. Rename offload file to GC file to simulate checkpoint generated by GC String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp)); System.out.println("Simulating GC checkpoint by renaming offload file to: " + gcPath); - // Storage interface doesn't have renamed, using copy and delete + // Storage interface doesn't have rename, using copy and delete try (DataInputStream in = storage.open(offloadPath); DataOutputStream out = storage.create(gcPath, true, 4096)) { @@ -196,18 +197,126 @@ public void testCheckpointRecovery() throws RetinaException, IOException // 5. Perform recovery System.out.println("Running recoverCheckpoints()..."); - // At this point rgVisibilityMap is empty, need to call recoverCheckpoints to load data into cache + // At this point rgVisibilityMap is empty, recoverCheckpoints will load data directly into rgVisibilityMap retinaManager.recoverCheckpoints(); - // 6. Re-add Visibility, at this point it should recover state from recoveryCache instead of creating new - System.out.println("Re-adding visibility for file (should trigger recovery from cache)..."); - retinaManager.addVisibility(fileId, rgId, numRows); - - // 7. Verify recovered state: Row 10 should still be in deleted state + // 6. Verify recovered state immediately after recovery + System.out.println("Verifying recovered state immediately after recoverCheckpoints()..."); long[] recoveredBitmap = retinaManager.queryVisibility(fileId, rgId, timestamp); - assertTrue("Row 10 should still be deleted after recovery", isBitSet(recoveredBitmap, rowToDelete)); + assertTrue("Row 10 should be deleted after recovery", isBitSet(recoveredBitmap, rowToDelete)); assertFalse("Row 11 should not be deleted", isBitSet(recoveredBitmap, rowToDelete + 1)); - System.out.println("Verified: Recovery successful, row state restored. testCheckpointRecovery passed."); + + // 7. Re-add Visibility, at this point it should see that it already exists in rgVisibilityMap + System.out.println("Re-adding visibility for file (should skip as it already exists)..."); + retinaManager.addVisibility(fileId, rgId, numRows); + + // 8. Verify state still correct + long[] finalBitmap = retinaManager.queryVisibility(fileId, rgId, timestamp); + assertTrue("Row 10 should still be deleted", isBitSet(finalBitmap, rowToDelete)); + System.out.println("Verified: Recovery successful, row state restored directly to map. testCheckpointRecovery passed."); + } + + @Test + public void testCheckpointRetryAfterFailure() throws RetinaException, IOException + { + System.out.println("\n[Test] Starting testCheckpointRetryAfterFailure..."); + retinaManager.addVisibility(fileId, rgId, numRows); + long timestamp = 123L; + + String expectedFile = resolve(testCheckpointDir, getOffloadFileName(timestamp)); + + // 1. Pre-create a DIRECTORY with the same name to cause creation failure + storage.mkdirs(expectedFile); + System.out.println("Created a directory at " + expectedFile + " to simulate failure."); + + // 2. Try to register offload - should fail + try + { + retinaManager.registerOffload(timestamp); + assertTrue("Should have thrown an exception", false); + } catch (RetinaException e) + { + System.out.println("Expected failure occurred: " + e.getMessage()); + } + + // 3. Remove the directory + storage.delete(expectedFile, true); + assertFalse("Directory should be removed", storage.exists(expectedFile)); + + // 4. Try again - should succeed now because we clear failed futures + System.out.println("Retrying registration..."); + retinaManager.registerOffload(timestamp); + + assertTrue("Offload checkpoint file should exist after retry", storage.exists(expectedFile)); + System.out.println("Verified: Retry successful. testCheckpointRetryAfterFailure passed."); + } + + @Test + public void testMultiRGCheckpoint() throws RetinaException, IOException + { + System.out.println("\n[Test] Starting testMultiRGCheckpoint..."); + int numRgs = 3; + for (int i = 0; i < numRgs; i++) + { + retinaManager.addVisibility(fileId, i, numRows); + } + long timestamp = 200L; + + // Delete records in different RGs + retinaManager.deleteRecord(fileId, 0, 10, timestamp); + retinaManager.deleteRecord(fileId, 1, 20, timestamp); + retinaManager.deleteRecord(fileId, 2, 30, timestamp); + + // Create checkpoint + retinaManager.registerOffload(timestamp); + String offloadPath = resolve(testCheckpointDir, getOffloadFileName(timestamp)); + + // Simulating GC checkpoint for recovery + String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp)); + try (DataInputStream in = storage.open(offloadPath); + DataOutputStream out = storage.create(gcPath, true, 4096)) + { + byte[] buffer = new byte[4096]; + int bytesRead; + while ((bytesRead = in.read(buffer)) != -1) + { + out.write(buffer, 0, bytesRead); + } + } + + // Reset and recover + resetSingletonState(); + retinaManager.recoverCheckpoints(); + + // Verify all RGs + assertTrue("RG 0 row 10 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 0, timestamp), 10)); + assertTrue("RG 1 row 20 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 1, timestamp), 20)); + assertTrue("RG 2 row 30 should be deleted", isBitSet(retinaManager.queryVisibility(fileId, 2, timestamp), 30)); + + System.out.println("Verified: Multi-RG state correctly restored. testMultiRGCheckpoint passed."); + } + + @Test + public void testCheckpointDataIntegrity() throws RetinaException, IOException + { + System.out.println("\n[Test] Starting testCheckpointDataIntegrity..."); + int numRgs = 5; + for (int i = 0; i < numRgs; i++) + { + retinaManager.addVisibility(fileId, i, numRows); + } + long timestamp = 300L; + + retinaManager.registerOffload(timestamp); + String path = resolve(testCheckpointDir, getOffloadFileName(timestamp)); + + // Directly read file to verify header + try (DataInputStream in = storage.open(path)) + { + int savedRgs = in.readInt(); + assertTrue("Saved RG count " + savedRgs + " should match " + numRgs, savedRgs == numRgs); + } + System.out.println("Verified: Data integrity (header) is correct. testCheckpointDataIntegrity passed."); } @Test @@ -302,10 +411,6 @@ private void resetSingletonState() gcTimestampField.setAccessible(true); gcTimestampField.setLong(retinaManager, -1L); - Field recoveryCacheField = RetinaResourceManager.class.getDeclaredField("recoveryCache"); - recoveryCacheField.setAccessible(true); - ((Map) recoveryCacheField.get(retinaManager)).clear(); - } catch (Exception e) { throw new RuntimeException("Failed to reset singleton state", e); From 2d150d81a1f4f9578c91506a4f4fe655b933448b Mon Sep 17 00:00:00 2001 From: Dongyang Geng Date: Wed, 11 Feb 2026 12:41:50 +0800 Subject: [PATCH 15/15] fix: restore the modifications made for testing --- pom.xml | 2 +- scripts/test_checkpoint_performance.py | 110 ------------------------- 2 files changed, 1 insertion(+), 111 deletions(-) delete mode 100644 scripts/test_checkpoint_performance.py diff --git a/pom.xml b/pom.xml index 91d6a33506..78011c6a56 100644 --- a/pom.xml +++ b/pom.xml @@ -1029,7 +1029,7 @@ maven-surefire-plugin ${maven.plugin.surefire.version} - false + true diff --git a/scripts/test_checkpoint_performance.py b/scripts/test_checkpoint_performance.py deleted file mode 100644 index 00d99dbf5b..0000000000 --- a/scripts/test_checkpoint_performance.py +++ /dev/null @@ -1,110 +0,0 @@ -import subprocess -import re -import csv -import os - -# 配置 -TEST_FILE = "pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java" -RATIOS = [0.01, 0.05, 0.10, 0.20, 0.50] -OUTPUT_CSV = "checkpoint_perf_results.csv" - -# 正则表达式用于提取结果 -PATTERNS = { - "Target Delete Ratio": r"Target Delete Ratio: ([\d.]+)%", - "Actual Ratio": r"Actual Ratio: ([\d.]+)%", - "Offload Time (ms)": r"Total Offload Time:\s+([\d.]+) ms", - "File Size (MB)": r"Checkpoint File Size:\s+([\d.]+) MB", - "Offload Peak Mem (MB)": r"Offload Peak Mem Overhead:\s+([\d.]+) MB", - "Write Throughput (MB/s)": r"Write Throughput:\s+([\d.]+) MB/s", - "Cold Load Time (ms)": r"First Load Time \(Cold\):\s+([\d.]+) ms", - "Load Memory (MB)": r"Load Memory Overhead:\s+([\d.]+) MB", - "Read Throughput (MB/s)": r"Read/Parse Throughput:\s+([\d.]+) MB/s", - "Avg Memory Hit Latency (ms)": r"Avg Memory Hit Latency:\s+([\d.]+) ms" -} - -def run_maven_test(ratio): - print(f"\n>>> Running test for Target Delete Ratio: {ratio*100:.2f}%") - - # 1. 修改源码中的 ratio - with open(TEST_FILE, 'r') as f: - content = f.read() - - new_content = re.sub( - r"double targetDeleteRatio = [\d.]+; // @TARGET_DELETE_RATIO@", - f"double targetDeleteRatio = {ratio}; // @TARGET_DELETE_RATIO@", - content - ) - - with open(TEST_FILE, 'w') as f: - f.write(new_content) - - # 2. 执行命令 - cmd = [ - "mvn", "test", - "-Dtest=TestRetinaCheckpoint#testCheckpointPerformance", - "-pl", "pixels-retina", - "-DargLine=-Xms40g -Xmx40g" - ] - env = os.environ.copy() - env["LD_PRELOAD"] = "/lib/x86_64-linux-gnu/libjemalloc.so.2" - - try: - process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) - full_output = [] - for line in process.stdout: - print(line, end="") - full_output.append(line) - process.wait() - - if process.returncode != 0: - print(f"Error: Test failed for ratio {ratio}") - return None - - # 3. 解析输出 - output_str = "".join(full_output) - results = {"Target Delete Ratio": ratio * 100} - for key, pattern in PATTERNS.items(): - if key == "Target Delete Ratio": continue - match = re.search(pattern, output_str) - if match: - results[key] = match.group(1) - else: - results[key] = "N/A" - - return results - - except Exception as e: - print(f"Exception during test: {e}") - return None - -def main(): - all_results = [] - - # 备份原始文件 - with open(TEST_FILE, 'r') as f: - original_content = f.read() - - try: - for ratio in RATIOS: - res = run_maven_test(ratio) - if res: - all_results.append(res) - - # 写入 CSV - if all_results: - keys = all_results[0].keys() - with open(OUTPUT_CSV, 'w', newline='') as f: - dict_writer = csv.DictWriter(f, fieldnames=keys) - dict_writer.writeheader() - dict_writer.writerows(all_results) - print(f"\nSuccess! Results saved to {OUTPUT_CSV}") - else: - print("\nNo results collected.") - - finally: - # 还原原始文件 - with open(TEST_FILE, 'w') as f: - f.write(original_content) - -if __name__ == "__main__": - main()