Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,65 @@ public boolean addVisibility(String filePath) throws RetinaException
return true;
}

public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
public static final class VisibilityResult
{
private final long[][] bitmaps;
private final String checkpointPath;

private VisibilityResult(long[][] bitmaps, String checkpointPath)
{
this.bitmaps = bitmaps;
this.checkpointPath = checkpointPath;
}

public static VisibilityResult fromBitmaps(long[][] bitmaps)
{
return new VisibilityResult(bitmaps, null);
}

public static VisibilityResult fromCheckpointPath(String path)
{
return new VisibilityResult(null, path);
}

public boolean isOffloaded()
{
return checkpointPath != null;
}

public long[][] getBitmaps()
{
if (isOffloaded())
{
throw new IllegalStateException("Data is offloaded to checkpoint, use getCheckpointPath() instead.");
}
return bitmaps;
}

public String getCheckpointPath()
{
return checkpointPath;
}
}

public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
{
return queryVisibility(fileId, rgIds, timestamp, -1);
}

public VisibilityResult queryVisibility(long fileId, int[] rgIds, long timestamp, long transId) throws RetinaException
{
String token = UUID.randomUUID().toString();
RetinaProto.QueryVisibilityRequest request = RetinaProto.QueryVisibilityRequest.newBuilder()
RetinaProto.QueryVisibilityRequest.Builder requestBuilder = RetinaProto.QueryVisibilityRequest.newBuilder()
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
.setFileId(fileId)
.addAllRgIds(Arrays.stream(rgIds).boxed().collect(Collectors.toList()))
.setTimestamp(timestamp)
.build();
RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(request);
.setTimestamp(timestamp);
if (transId != -1)
{
requestBuilder.setTransId(transId);
}
RetinaProto.QueryVisibilityResponse response = this.stub.queryVisibility(requestBuilder.build());
if (response.getHeader().getErrorCode() != 0)
{
throw new RetinaException("Failed to query visibility: " + response.getHeader().getErrorCode()
Expand All @@ -334,13 +383,19 @@ public long[][] queryVisibility(long fileId, int[] rgIds, long timestamp) throws
{
throw new RetinaException("Response token does not match");
}

if (response.getCheckpointPath() != null && !response.getCheckpointPath().isEmpty())
{
return VisibilityResult.fromCheckpointPath(response.getCheckpointPath());
}

long[][] visibilityBitmaps = new long[rgIds.length][];
for (int i = 0; i < response.getBitmapsCount(); i++)
{
RetinaProto.VisibilityBitmap bitmap = response.getBitmaps(i);
visibilityBitmaps[i] = bitmap.getBitmapList().stream().mapToLong(Long::longValue).toArray();
}
return visibilityBitmaps;
return VisibilityResult.fromBitmaps(visibilityBitmaps);
}

public boolean reclaimVisibility(long fileId, int[] rgIds, long timestamp) throws RetinaException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

public class RetinaUtils
{
public static final String CHECKPOINT_PREFIX_GC = "vis_gc_";
public static final String CHECKPOINT_PREFIX_OFFLOAD = "vis_offload_";
public static final String CHECKPOINT_SUFFIX = ".bin";

private static volatile RetinaUtils instance;
private final int bucketNum;
private final int defaultRetinaPort;
Expand Down Expand Up @@ -103,7 +107,7 @@ public static RetinaService getRetinaServiceFromBucketId(int bucketId)
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}

public static RetinaService getRetinaServiceFromPath(String path)
public static RetinaService getRetinaServiceFromPath(String path)
{
String retinaHost = extractRetinaHostNameFromPath(path);
if(retinaHost == null || retinaHost.equals(Constants.LOAD_DEFAULT_RETINA_PREFIX))
Expand All @@ -113,6 +117,16 @@ public static RetinaService getRetinaServiceFromPath(String path)
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}

public static String getCheckpointFileName(String prefix, String hostname, long timestamp)
{
return prefix + hostname + "_" + timestamp + CHECKPOINT_SUFFIX;
}

public static String getCheckpointPrefix(String typePrefix, String hostname)
{
return typePrefix + hostname + "_";
}

private static String extractRetinaHostNameFromPath(String path)
{
if (path == null || path.isEmpty()) {
Expand Down
4 changes: 4 additions & 0 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ retina.buffer.flush.count=20
retina.buffer.flush.interval=30
# interval in seconds for retina visibility garbage
retina.gc.interval=300
# number of threads for retina checkpoint
retina.checkpoint.threads=4
# retina buffer reader prefetch threads num
retina.reader.prefetch.threads=8
# retina service init threads num
Expand All @@ -297,6 +299,8 @@ retina.metrics.log.interval=-1
retina.upsert-mode.enabled=false
# offloading threshold for long query in seconds
pixels.transaction.offload.threshold=1800
# lease duration for retina offload cache in seconds, default 600s
retina.offload.cache.lease.duration=600
# snapshot storage directory
pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints

Expand Down
6 changes: 6 additions & 0 deletions pixels-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
<scope>test</scope>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,21 @@ else if (predicate.matchesNone())
{
MetadataService metadataService = MetadataService.Instance();
long fileId = metadataService.getFileId(physicalReader.getPathUri());
rgVisibilityBitmaps = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp());
RetinaService.VisibilityResult result = retinaService.queryVisibility(fileId, targetRGs, option.getTransTimestamp(), option.getTransId());
if (result.isOffloaded())
{
String checkpointPath = result.getCheckpointPath();
rgVisibilityBitmaps = new long[targetRGNum][];
for (int i = 0; i < targetRGNum; i++)
{
rgVisibilityBitmaps[i] = VisibilityCheckpointCache.getInstance()
.getVisibilityBitmap(option.getTransTimestamp(), checkpointPath, fileId, targetRGs[i]);
}
}
else
{
rgVisibilityBitmaps = result.getBitmaps();
}
} catch (IOException e)
{
logger.error("Failed to get path uri");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2024 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.core.reader;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Client-side cache for visibility checkpoints.
* This cache is used by PixelsReader to avoid repeated loading of checkpoint files from storage.
*/
public class VisibilityCheckpointCache
{
private static final Logger logger = LogManager.getLogger(VisibilityCheckpointCache.class);
private static final VisibilityCheckpointCache instance = new VisibilityCheckpointCache();

private final Cache<Long, Map<String, long[]>> cache;

private VisibilityCheckpointCache()
{
long leaseDuration = Long.parseLong(ConfigFactory.Instance().getProperty("retina.offload.cache.lease.duration"));

this.cache = Caffeine.newBuilder()
.expireAfterAccess(leaseDuration, TimeUnit.SECONDS)
.removalListener((key, value, cause) -> {
logger.info("Client-side visibility cache for timestamp {} evicted due to {}", key, cause);
})
.build();
}

public static VisibilityCheckpointCache getInstance()
{
return instance;
}

public long[] getVisibilityBitmap(long timestamp, String checkpointPath, long targetFileId, int targetRgId) throws IOException
{
Map<String, long[]> timestampCache = cache.getIfPresent(timestamp);

if (timestampCache == null)
{
synchronized (this)
{
timestampCache = cache.getIfPresent(timestamp);
if (timestampCache == null)
{
timestampCache = loadCheckpointFile(checkpointPath);
cache.put(timestamp, timestampCache);
}
}
}

String rgKey = targetFileId + "_" + targetRgId;
return timestampCache.getOrDefault(rgKey, null);
}

private Map<String, long[]> loadCheckpointFile(String path) throws IOException
{
long start = System.currentTimeMillis();
Storage storage = StorageFactory.Instance().getStorage(path);
long fileLength = storage.getStatus(path).getLength();

byte[] content;
try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
{
ByteBuffer buffer = reader.readFully((int) fileLength);
if (buffer.hasArray())
{
content = buffer.array();
}
else
{
content = new byte[(int) fileLength];
buffer.get(content);
}
}

Map<String, long[]> timestampCache;
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content)))
{
int rgCount = in.readInt();
// Initial capacity: count / 0.75 + 1 to avoid rehash
timestampCache = new ConcurrentHashMap<>((int) (rgCount / 0.75) + 1);

for (int i = 0; i < rgCount; i++)
{
long fileId = in.readLong();
int rgId = in.readInt();
int len = in.readInt();
long[] bitmap = new long[len];
for (int j = 0; j < len; j++)
{
bitmap[j] = in.readLong();
}
timestampCache.put(fileId + "_" + rgId, bitmap);
}
}
long end = System.currentTimeMillis();
logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (end - start), timestampCache.size());
return timestampCache;
}
}
Loading