Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.2.0
-----
* Fix LEAK DETECTED errors during bulk read (CASSANALYTICS-87)
* Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84)
* Analytics job fails when source table has secondary indexes (CASSANALYTICS-86)
* Set KeyStore to be optional (CASSANALYTICS-69)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.cassandra.spark.reader.CompactionStreamScanner;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.reader.IndexReader;
import org.apache.cassandra.spark.reader.IndexSummaryComponent;
import org.apache.cassandra.spark.reader.ReaderUtils;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.SchemaBuilder;
Expand Down Expand Up @@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner,
{
try
{
SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval);
IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval);
Pair<DecoratedKey, DecoratedKey> keys = summary == null ? null : Pair.of(summary.first(), summary.last());
if (summary == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary,
@NotNull SSTable ssTable,
@NotNull Stats stats) throws IOException
{
long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue());
long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue());

// Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file
return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset);
Expand Down Expand Up @@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner,
stats.readPartitionIndexDb((ByteBuffer) key.rewind(), token);
return token;
}

/**
 * Binary search Summary.db to find the nearest offset in Index.db that precedes the token we are looking for.
 *
 * @param summary     IndexSummary deserialized from the Summary.db file
 * @param partitioner Cassandra partitioner used to hash partition keys to tokens
 * @param token       the token we are trying to find
 * @return offset into the Index.db file of the closest partition in the Summary.db file that precedes the token we are looking for
 */
public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
{
    // Locate the summary entry closest to (not after) the token, then map it to its Index.db offset
    int summaryIndex = binarySearchSummary(summary, partitioner, token);
    return summary.getPosition(summaryIndex);
}

/**
* The class is private on purpose.
* Think carefully if you want to open up the access modifier from private to public.
* IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault.
*/
private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList
{
final IPartitioner partitioner;
final IndexSummary summary;

IndexSummaryTokenList(IPartitioner partitioner,
IndexSummary summary)
{
this.partitioner = partitioner;
this.summary = summary;
}

public int size()
{
return summary.size();
}

public BigInteger tokenAt(int index)
{
return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken());
}
}

/**
 * Binary search the Summary.db entries for the position nearest to, and not after, the given token.
 *
 * @param summary     IndexSummary deserialized from the Summary.db file
 * @param partitioner Cassandra partitioner used to hash partition keys to tokens
 * @param token       the token we are trying to find
 * @return index into the summary of the entry that precedes the token
 */
public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token)
{
    SummaryDbUtils.TokenList tokenList = new IndexSummaryTokenList(partitioner, summary);
    return SummaryDbUtils.binarySearchSummary(tokenList, token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.FileType;
Expand Down Expand Up @@ -73,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable,
now = System.nanoTime();
if (rangeFilter != null)
{
SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
if (summary != null)
{
this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()),
Expand All @@ -87,10 +88,13 @@ public IndexReader(@NotNull SSTable ssTable,
return;
}

skipAhead = summary.summary().getPosition(
SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
);
stats.indexSummaryFileRead(System.nanoTime() - now);
try (IndexSummary indexSummary = summary.summarySharedCopy())
{
skipAhead = indexSummary.getPosition(
IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())
);
stats.indexSummaryFileRead(System.nanoTime() - now);
}
now = System.nanoTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.reader;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.RebufferingChannelInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.jetbrains.annotations.Nullable;

/**
 * Holds the deserialized contents of an SSTable Summary.db component: the
 * {@link IndexSummary} together with the first and last partition keys recorded
 * in the file.
 * <p>
 * Closing this object releases the underlying IndexSummary memory; callers that
 * need to keep the summary alive must take a {@link #summarySharedCopy()} and
 * close that copy when done.
 */
public class IndexSummaryComponent implements AutoCloseable
{
    private final IndexSummary indexSummary;
    private final DecoratedKey firstKey;
    private final DecoratedKey lastKey;

    /**
     * Read and deserialize the Summary.db file.
     *
     * @param summaryStream    input stream for Summary.db file; may be null when the component is absent
     * @param partitioner      token partitioner
     * @param minIndexInterval min index interval
     * @param maxIndexInterval max index interval
     * @return the deserialized component, or null when no stream was supplied
     * @throws IOException io exception
     */
    @Nullable
    static IndexSummaryComponent readSummary(InputStream summaryStream,
                                             IPartitioner partitioner,
                                             int minIndexInterval,
                                             int maxIndexInterval) throws IOException
    {
        if (summaryStream == null)
        {
            return null;
        }

        int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream);
        try (DataInputStream dataIn = new DataInputStream(summaryStream);
             DataInputPlus.DataInputStreamPlus input = new RebufferingChannelInputStream(dataIn, bufferSize))
        {
            // Component layout: the serialized IndexSummary followed by the
            // length-prefixed first and last partition keys, in that order
            IndexSummary summary = IndexSummary.serializer.deserialize(input, partitioner, minIndexInterval, maxIndexInterval);
            DecoratedKey first = partitioner.decorateKey(ByteBufferUtil.readWithLength(input));
            DecoratedKey last = partitioner.decorateKey(ByteBufferUtil.readWithLength(input));
            return new IndexSummaryComponent(summary, first, last);
        }
    }

    IndexSummaryComponent(IndexSummary indexSummary,
                          DecoratedKey firstKey,
                          DecoratedKey lastKey)
    {
        this.indexSummary = indexSummary;
        this.firstKey = firstKey;
        this.lastKey = lastKey;
    }

    /**
     * Get a shared copy of the IndexSummary, whose reference count is incremented.
     * It is important to close the shared copy to decrement the reference count.
     *
     * @return a shared copy of the IndexSummary object
     */
    public IndexSummary summarySharedCopy()
    {
        return indexSummary.sharedCopy();
    }

    /**
     * @return the first partition key recorded in the Summary.db file
     */
    public DecoratedKey first()
    {
        return firstKey;
    }

    /**
     * @return the last partition key recorded in the Summary.db file
     */
    public DecoratedKey last()
    {
        return lastKey;
    }

    @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly.
    public void close() throws Exception
    {
        indexSummary.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,16 @@ public SSTableReader(@NotNull TableMetadata metadata,
Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
this.version = descriptor.version;

SummaryDbUtils.Summary summary = null;
IndexSummaryComponent summary = null;
Pair<DecoratedKey, DecoratedKey> keys = null;
IndexSummary indexSummary = null; // indexSummary is only assigned when readIndexOffset is enabled
try
{
now = System.nanoTime();
summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
stats.readSummaryDb(ssTable, System.nanoTime() - now);
keys = Pair.of(summary.first(), summary.last());
indexSummary = readIndexOffset ? summary.summarySharedCopy() : null;
}
catch (IOException exception)
{
Expand Down Expand Up @@ -390,11 +392,13 @@ public SSTableReader(@NotNull TableMetadata metadata,
buildColumnFilter(metadata, columnFilter));
this.metadata = metadata;

if (readIndexOffset && summary != null)
if (indexSummary != null)
{
SummaryDbUtils.Summary finalSummary = summary;
extractRange(sparkRangeFilter, partitionKeyFilters)
.ifPresent(range -> readOffsets(finalSummary.summary(), range));
try (IndexSummary indexSummaryCopy = indexSummary)
{
extractRange(sparkRangeFilter, partitionKeyFilters)
.ifPresent(range -> readOffsets(indexSummaryCopy, range));
}
}
else
{
Expand Down
Loading