diff --git a/CHANGES.txt b/CHANGES.txt index a3698b71e..5b8a72589 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Fix LEAK DETECTED errors during bulk read (CASSANALYTICS-87) * Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84) * Analytics job fails when source table has secondary indexes (CASSANALYTICS-86) * Set KeyStore to be optional (CASSANALYTICS-69) diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 4ed69f401..b72288575 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -92,6 +92,7 @@ import org.apache.cassandra.spark.reader.CompactionStreamScanner; import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.reader.IndexReader; +import org.apache.cassandra.spark.reader.IndexSummaryComponent; import org.apache.cassandra.spark.reader.ReaderUtils; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.SchemaBuilder; @@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner, { try { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); + IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); Pair keys = summary == null ? 
null : Pair.of(summary.first(), summary.last()); if (summary == null) { diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java index 4dd993e01..5e1981491 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java @@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary, @NotNull SSTable ssTable, @NotNull Stats stats) throws IOException { - long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); + long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); // Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset); @@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner, stats.readPartitionIndexDb((ByteBuffer) key.rewind(), token); return token; } + + /** + * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for + * + * @param summary IndexSummary from Summary.db file + * @param partitioner Cassandra partitioner to hash partition keys to token + * @param token the token we are trying to find + * @return offset into the Index.db file for the closest partition in the Summary.db file that precedes the token we are looking for + */ + public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return summary.getPosition(binarySearchSummary(summary, partitioner, token)); + } + + /** + * The class is private on purpose. 
+ * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get a segmentation fault. + */ + private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList + { + final IPartitioner partitioner; + final IndexSummary summary; + + IndexSummaryTokenList(IPartitioner partitioner, + IndexSummary summary) + { + this.partitioner = partitioner; + this.summary = summary; + } + + public int size() + { + return summary.size(); + } + + public BigInteger tokenAt(int index) + { + return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); + } + } + + public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return SummaryDbUtils.binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); + } } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index f13a39043..9a1c62692 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -34,6 +34,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -73,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable, now = System.nanoTime(); if (rangeFilter != null) { - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); + IndexSummaryComponent summary = 
SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (summary != null) { this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), @@ -87,10 +88,13 @@ public IndexReader(@NotNull SSTable ssTable, return; } - skipAhead = summary.summary().getPosition( - SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) - ); - stats.indexSummaryFileRead(System.nanoTime() - now); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + skipAhead = indexSummary.getPosition( + IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) + ); + stats.indexSummaryFileRead(System.nanoTime() - now); + } now = System.nanoTime(); } } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java new file mode 100644 index 000000000..414cfb61a --- /dev/null +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.RebufferingChannelInputStream; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.jetbrains.annotations.Nullable; + +public class IndexSummaryComponent implements AutoCloseable +{ + private final IndexSummary indexSummary; + private final DecoratedKey firstKey; + private final DecoratedKey lastKey; + + /** + * Read and deserialize the Summary.db file + * + * @param summaryStream input stream for Summary.db file + * @param partitioner token partitioner + * @param minIndexInterval min index interval + * @param maxIndexInterval max index interval + * @return the deserialized IndexSummaryComponent, or null when summaryStream is null + * @throws IOException io exception + */ + @Nullable + static IndexSummaryComponent readSummary(InputStream summaryStream, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException + { + if (summaryStream == null) + { + return null; + } + + int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream); + try (DataInputStream is = new DataInputStream(summaryStream); + DataInputPlus.DataInputStreamPlus dis = new RebufferingChannelInputStream(is, bufferSize)) + { + IndexSummary indexSummary = IndexSummary.serializer.deserialize(dis, partitioner, minIndexInterval, maxIndexInterval); + DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); + DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); + return new IndexSummaryComponent(indexSummary, firstKey, lastKey); + } + } + + 
IndexSummaryComponent(IndexSummary indexSummary, + DecoratedKey firstKey, + DecoratedKey lastKey) + { + this.indexSummary = indexSummary; + this.firstKey = firstKey; + this.lastKey = lastKey; + } + + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() + { + return indexSummary.sharedCopy(); + } + + public DecoratedKey first() + { + return firstKey; + } + + public DecoratedKey last() + { + return lastKey; + } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. + public void close() throws Exception + { + indexSummary.close(); + } +} diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index dd4a59ca1..df085663b 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -249,14 +249,16 @@ public SSTableReader(@NotNull TableMetadata metadata, Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable); this.version = descriptor.version; - SummaryDbUtils.Summary summary = null; + IndexSummaryComponent summary = null; Pair keys = null; + IndexSummary indexSummary = null; // indexSummary is only assigned when readIndexOffset is enabled try { now = System.nanoTime(); summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); stats.readSummaryDb(ssTable, System.nanoTime() - now); keys = Pair.of(summary.first(), summary.last()); + indexSummary = readIndexOffset ? 
summary.summarySharedCopy() : null; } catch (IOException exception) { @@ -390,11 +392,13 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; - if (readIndexOffset && summary != null) + if (indexSummary != null) { - SummaryDbUtils.Summary finalSummary = summary; - extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(finalSummary.summary(), range)); + try (IndexSummary indexSummaryCopy = indexSummary) + { + extractRange(sparkRangeFilter, partitionKeyFilters) + .ifPresent(range -> readOffsets(indexSummaryCopy, range)); + } } else { diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java deleted file mode 100644 index a724035a1..000000000 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.cassandra.spark.reader; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigInteger; -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.RebufferingChannelInputStream; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.spark.data.SSTable; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -/** - * Helper methods for reading the Summary.db SSTable file component - */ -public final class SummaryDbUtils -{ - public static class Summary - { - private final IndexSummary indexSummary; - private final DecoratedKey firstKey; - private final DecoratedKey lastKey; - - Summary(IndexSummary indexSummary, - DecoratedKey firstKey, - DecoratedKey lastKey) - { - this.indexSummary = indexSummary; - this.firstKey = firstKey; - this.lastKey = lastKey; - } - - public IndexSummary summary() - { - return indexSummary; - } - - public DecoratedKey first() - { - return firstKey; - } - - public DecoratedKey last() - { - return lastKey; - } - } - - private SummaryDbUtils() - { - throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); - } - - public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException - { - return readSummary(ssTable, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval); - } - - @Nullable - public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException - { - try (InputStream in = ssTable.openSummaryStream()) - { - return 
readSummary(in, partitioner, minIndexInterval, maxIndexInterval); - } - } - - /** - * Read and deserialize the Summary.db file - * - * @param summaryStream input stream for Summary.db file - * @param partitioner token partitioner - * @param minIndexInterval min index interval - * @param maxIndexInterval max index interval - * @return Summary object - * @throws IOException io exception - */ - @Nullable - static Summary readSummary(InputStream summaryStream, - IPartitioner partitioner, - int minIndexInterval, - int maxIndexInterval) throws IOException - { - if (summaryStream == null) - { - return null; - } - - int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream); - try (DataInputStream is = new DataInputStream(summaryStream); - DataInputPlus.DataInputStreamPlus dis = new RebufferingChannelInputStream(is, bufferSize)) - { - IndexSummary indexSummary = IndexSummary.serializer.deserialize(dis, partitioner, minIndexInterval, maxIndexInterval); - DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); - DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); - return new Summary(indexSummary, firstKey, lastKey); - } - } - - public interface TokenList - { - int size(); - - BigInteger tokenAt(int index); - } - - /** - * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for - * - * @param summary IndexSummary from Summary.db file - * @param partitioner Cassandra partitioner to hash partition keys to token - * @param token the token we are trying to find - * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for - */ - public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return summary.getPosition(binarySearchSummary(summary, partitioner, token)); - } - - public static class IndexSummaryTokenList implements TokenList - { 
- final IPartitioner partitioner; - final IndexSummary summary; - - IndexSummaryTokenList(IPartitioner partitioner, - IndexSummary summary) - { - this.partitioner = partitioner; - this.summary = summary; - } - - public int size() - { - return summary.size(); - } - - public BigInteger tokenAt(int index) - { - return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); - } - } - - public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); - } - - /** - * Binary search the Summary.db file to find nearest index offset in Index.db for a given token. - * Method lifted from org.apache.cassandra.io.sstable.IndexSummary.binarySearch(PartitionPosition key) and reworked for tokens. - * - * @param tokenList list of tokens to binary search - * @param token token to find - * @return closest offset in Index.db preceding token - */ - public static int binarySearchSummary(TokenList tokenList, BigInteger token) - { - int low = 0; - int mid = tokenList.size(); - int high = mid - 1; - int result = -1; - while (low <= high) - { - mid = low + high >> 1; - result = token.compareTo(tokenList.tokenAt(mid)); - if (result > 0) - { - low = mid + 1; - } - else if (result < 0) - { - high = mid - 1; - } - else - { - break; // Exact match - } - } - - // If: - // 1) result < 0: the token is less than nearest sampled token found at mid, so we need to start from mid - 1. - // 2) result == 0: we found an exact match for the token in the sample, - // but there may be token collisions in Data.db so start from mid -1 to be safe. - // 3) result > 0: the nearest sample token at mid is less than the token so we can start from that position. - return result <= 0 ? 
Math.max(0, mid - 1) : mid; - } -} diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java index a9440f763..db90a44ba 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java @@ -165,10 +165,10 @@ public void testReadFirstLastPartitionKey() // Read Summary.db file for first and last partition keys from Summary.db Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); - SummaryDbUtils.Summary summaryKeys; + IndexSummaryComponent summaryKeys; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summaryKeys = SummaryDbUtils.readSummary(in, Murmur3Partitioner.instance, 128, 2048); + summaryKeys = IndexSummaryComponent.readSummary(in, Murmur3Partitioner.instance, 128, 2048); } assertThat(summaryKeys).isNotNull(); assertThat(summaryKeys.first()).isNotNull(); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index 1fd7af85b..2b6a3aaf2 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -82,7 +82,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsStats(ssTable0)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0)).isFalse(); - SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); + IndexSummaryComponent key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); if (ssTable0.isBigFormat()) { 
assertThat(key1).isNotNull(); @@ -161,7 +161,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1)).isFalse(); if (ssTable1.isBigFormat()) { - SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); + IndexSummaryComponent key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); assertThat(key3.first()).isNotEqualTo(key1.first()); assertThat(key3.last()).isNotEqualTo(key1.last()); assertThat(SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable1).left) diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index fae43c532..f5a9c3816 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -393,10 +393,10 @@ public void testSkipNoPartitions() Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(in, - metadata.partitioner, - metadata.params.minIndexInterval, - metadata.params.maxIndexInterval); + IndexSummaryComponent summary = IndexSummaryComponent.readSummary(in, + metadata.partitioner, + metadata.params.minIndexInterval, + metadata.params.maxIndexInterval); first = summary.first(); last = summary.last(); } diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index 5e787067c..b171f2803 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ 
b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -34,6 +34,7 @@ import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.TestUtils; @@ -117,20 +118,22 @@ public void testSearchSummary() assertThat(ssTable).as("Could not find SSTable").isNotNull(); // Binary search Summary.db file in token order and verify offsets are ordered - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(metadata, ssTable); - long previous = -1; - for (BigInteger token : tokens) + try (IndexSummary indexSummary = SummaryDbUtils.readSummary(metadata, ssTable).summarySharedCopy()) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(summary.summary(), iPartitioner, token); - if (previous < 0) + long previous = -1; + for (BigInteger token : tokens) { - assertThat(offset).isEqualTo(0); + long offset = IndexDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + if (previous < 0) + { + assertThat(offset).isEqualTo(0); + } + else + { + assertThat(previous).isLessThanOrEqualTo(offset); + } + previous = offset; } - else - { - assertThat(previous).isLessThanOrEqualTo(offset); - } - previous = offset; } } catch (IOException exception) diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 25a8954a1..12add5526 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -92,6 +92,7 @@ 
import org.apache.cassandra.spark.reader.CompactionStreamScanner; import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.reader.IndexReader; +import org.apache.cassandra.spark.reader.IndexSummaryComponent; import org.apache.cassandra.spark.reader.ReaderUtils; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.SchemaBuilder; @@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner, { try { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); + IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); Pair keys = summary == null ? null : Pair.of(summary.first(), summary.last()); if (summary == null) { diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java index d48892bbd..dc9b7f8c6 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java @@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary, @NotNull SSTable ssTable, @NotNull Stats stats) throws IOException { - long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); + long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); // Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset); @@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner, stats.readPartitionIndexDb((ByteBuffer) 
key.rewind(), token); return token; } + + /** + * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for + * + * @param summary IndexSummary from Summary.db file + * @param partitioner Cassandra partitioner to hash partition keys to token + * @param token the token we are trying to find + * @return offset into the Index.db file for the closest partition in the Summary.db file that precedes the token we are looking for + */ + public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return summary.getPosition(binarySearchSummary(summary, partitioner, token)); + } + + /** + * The class is private on purpose. + * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get a segmentation fault. + */ + private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList + { + final IPartitioner partitioner; + final IndexSummary summary; + + IndexSummaryTokenList(IPartitioner partitioner, + IndexSummary summary) + { + this.partitioner = partitioner; + this.summary = summary; + } + + public int size() + { + return summary.size(); + } + + public BigInteger tokenAt(int index) + { + return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); + } + } + + public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return SummaryDbUtils.binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index c855b7af4..ed53c2be0 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java 
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -73,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable, now = System.nanoTime(); if (rangeFilter != null) { - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); + IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (summary != null) { this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), @@ -87,10 +88,12 @@ public IndexReader(@NotNull SSTable ssTable, return; } - skipAhead = summary.summary().getPosition( - SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) - ); - stats.indexSummaryFileRead(System.nanoTime() - now); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + skipAhead = indexSummary.getPosition( + IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())); + stats.indexSummaryFileRead(System.nanoTime() - now); + } now = System.nanoTime(); } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java new file mode 100644 index 000000000..f9995803a --- /dev/null +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.jetbrains.annotations.Nullable; + +public class IndexSummaryComponent implements AutoCloseable +{ + private final IndexSummary indexSummary; + private final DecoratedKey firstKey; + private final DecoratedKey lastKey; + + /** + * Read and deserialize the Summary.db file + * + * @param summaryStream input stream for Summary.db file + * @param partitioner token partitioner + * @param minIndexInterval min index interval + * @param maxIndexInterval max index interval + * @return the deserialized IndexSummaryComponent, or null when summaryStream is null + * @throws IOException io exception + */ + @Nullable + static IndexSummaryComponent readSummary(InputStream summaryStream, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException + { + if (summaryStream == null) + { + return null; + } + + try (DataInputStream is = new DataInputStream(summaryStream)) + { + IndexSummary indexSummary = 
IndexSummary.serializer.deserialize(is, partitioner, minIndexInterval, maxIndexInterval); + DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); + DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); + return new IndexSummaryComponent(indexSummary, firstKey, lastKey); + } + } + + IndexSummaryComponent(IndexSummary indexSummary, + DecoratedKey firstKey, + DecoratedKey lastKey) + { + this.indexSummary = indexSummary; + this.firstKey = firstKey; + this.lastKey = lastKey; + } + + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() + { + return indexSummary.sharedCopy(); + } + + public DecoratedKey first() + { + return firstKey; + } + + public DecoratedKey last() + { + return lastKey; + } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. 
+ public void close() throws Exception + { + indexSummary.close(); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java index 083a0ea2c..261295f24 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.slf4j.Logger; @@ -56,17 +57,24 @@ public class SSTableCache private static final Logger LOGGER = LoggerFactory.getLogger(SSTableCache.class); public static final SSTableCache INSTANCE = new SSTableCache(); - private final Cache summary = buildCache(propOrDefault("sbr.cache.summary.maxEntries", 4096), - propOrDefault("sbr.cache.summary.expireAfterMins", 15)); - private final Cache> index = buildCache(propOrDefault("sbr.cache.index.maxEntries", 128), - propOrDefault("sbr.cache.index.expireAfterMins", 60)); - private final Cache> stats = buildCache(propOrDefault("sbr.cache.stats.maxEntries", 16384), - propOrDefault("sbr.cache.stats.expireAfterMins", 60)); - private final Cache filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384), - propOrDefault("sbr.cache.filter.expireAfterMins", 60)); + private final Cache summary = buildCache( + propOrDefault("sbr.cache.summary.maxEntries", 4096), + propOrDefault("sbr.cache.summary.expireAfterMins", 15)); + + private final Cache> index = buildCache( + propOrDefault("sbr.cache.index.maxEntries", 128), + propOrDefault("sbr.cache.index.expireAfterMins", 60)); + + private final Cache> stats = buildCache( + propOrDefault("sbr.cache.stats.maxEntries", 16384), + 
propOrDefault("sbr.cache.stats.expireAfterMins", 60)); + + private final Cache filter = buildCache( + propOrDefault("sbr.cache.filter.maxEntries", 16384), + propOrDefault("sbr.cache.filter.expireAfterMins", 60)); // if compression is disabled then the CompressionInfo.db file will not exist // therefore we can cache as Optional to a) avoid null errors in the cache and b) record that the component does not exist - private final Cache> compressionMetadata = buildCache( + private final Cache> compressionMetadata = buildCache( propOrDefault("sbr.cache.compressionInfo.maxEntries", 128), propOrDefault("sbr.cache.compressionInfo.expireAfterMins", 15)); @@ -107,11 +115,33 @@ private Cache buildCache(int size, int expireAfterMins) return CacheBuilder.newBuilder() .expireAfterAccess(expireAfterMins, TimeUnit.MINUTES) .maximumSize(size) + .removalListener(notification -> { + // This listener eliminates the LEAK DETECTED errors. + // How they happen: + // 1. AutoCloseable objects (e.g. IndexSummary and BloomFilter) are evicted from the cache + // 2. The JVM garbage-collects them, but close() is never called explicitly to decrement the reference count + // 3. The Reference-Reaper thread releases the object and prints the LEAK DETECTED error + // The listener fixes this by closing the object when it is evicted. 
+ Object val = notification.getValue(); + if (val instanceof AutoCloseable) + { + String typeLiteral = val.getClass().getName(); + try + { + LOGGER.debug("Evicting auto-closable of type: {}", typeLiteral); + ((AutoCloseable) val).close(); + } + catch (Exception e) + { + LOGGER.error("Exception closing cached instance of {}", typeLiteral, e); + } + } + }) .build(); } - public SummaryDbUtils.Summary keysFromSummary(@NotNull TableMetadata metadata, - @NotNull SSTable ssTable) throws IOException + public IndexSummaryComponent keysFromSummary(@NotNull TableMetadata metadata, + @NotNull SSTable ssTable) throws IOException { return get(summary, ssTable, () -> SummaryDbUtils.readSummary(metadata, ssTable)); } @@ -211,4 +241,14 @@ private static IOException toIOException(Throwable throwable) IOException ioException = ThrowableUtils.rootCause(throwable, IOException.class); return ioException != null ? ioException : new IOException(ThrowableUtils.rootCause(throwable)); } + + @VisibleForTesting + void invalidate(SSTable sstable) + { + summary.invalidate(sstable); + index.invalidate(sstable); + stats.invalidate(sstable); + filter.invalidate(sstable); + compressionMetadata.invalidate(sstable); + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index 0426fcf91..4392594e5 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -247,14 +247,16 @@ public SSTableReader(@NotNull TableMetadata metadata, Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable); this.version = descriptor.version; - SummaryDbUtils.Summary summary = null; + IndexSummaryComponent summary = null; Pair keys = null; + IndexSummary indexSummary = null; // 
indexSummary is only assigned when readIndexOffset is enabled try { now = System.nanoTime(); summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); stats.readSummaryDb(ssTable, System.nanoTime() - now); keys = Pair.of(summary.first(), summary.last()); + indexSummary = readIndexOffset ? summary.summarySharedCopy() : null; } catch (IOException exception) { @@ -388,11 +390,13 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; - if (readIndexOffset && summary != null) + if (indexSummary != null) { - SummaryDbUtils.Summary finalSummary = summary; - extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(finalSummary.summary(), range)); + try (IndexSummary indexSummaryCopy = indexSummary) + { + extractRange(sparkRangeFilter, partitionKeyFilters) + .ifPresent(range -> readOffsets(indexSummaryCopy, range)); + } } else { diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index a7cc98dc0..f0991220c 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -19,103 +19,42 @@ package org.apache.cassandra.spark.reader; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; -import java.nio.ByteBuffer; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.SSTable; -import org.apache.cassandra.utils.ByteBufferUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * Helper methods for 
reading the Summary.db SSTable file component + *
+ * This class is Cassandra version agnostic; it is not duplicated in the other bridges */ public final class SummaryDbUtils { - public static class Summary - { - private final IndexSummary indexSummary; - private final DecoratedKey firstKey; - private final DecoratedKey lastKey; - - Summary(IndexSummary indexSummary, - DecoratedKey firstKey, - DecoratedKey lastKey) - { - this.indexSummary = indexSummary; - this.firstKey = firstKey; - this.lastKey = lastKey; - } - - public IndexSummary summary() - { - return indexSummary; - } - - public DecoratedKey first() - { - return firstKey; - } - - public DecoratedKey last() - { - return lastKey; - } - } - private SummaryDbUtils() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException + public static IndexSummaryComponent readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException { return readSummary(ssTable, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval); } @Nullable - public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException - { - try (InputStream in = ssTable.openSummaryStream()) - { - return readSummary(in, partitioner, minIndexInterval, maxIndexInterval); - } - } - - /** - * Read and deserialize the Summary.db file - * - * @param summaryStream input stream for Summary.db file - * @param partitioner token partitioner - * @param minIndexInterval min index interval - * @param maxIndexInterval max index interval - * @return Summary object - * @throws IOException io exception - */ - @Nullable - static Summary readSummary(InputStream summaryStream, - IPartitioner partitioner, - int minIndexInterval, - int maxIndexInterval) throws IOException + public static IndexSummaryComponent readSummary(@NotNull 
SSTable sstable, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException { - if (summaryStream == null) + try (InputStream in = sstable.openSummaryStream()) { - return null; - } - - try (DataInputStream is = new DataInputStream(summaryStream)) - { - IndexSummary indexSummary = IndexSummary.serializer.deserialize(is, partitioner, minIndexInterval, maxIndexInterval); - DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); - DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); - return new Summary(indexSummary, firstKey, lastKey); + return IndexSummaryComponent.readSummary(in, partitioner, minIndexInterval, maxIndexInterval); } } @@ -126,47 +65,6 @@ public interface TokenList BigInteger tokenAt(int index); } - /** - * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for - * - * @param summary IndexSummary from Summary.db file - * @param partitioner Cassandra partitioner to hash partition keys to token - * @param token the token we are trying to find - * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for - */ - public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return summary.getPosition(binarySearchSummary(summary, partitioner, token)); - } - - public static class IndexSummaryTokenList implements TokenList - { - final IPartitioner partitioner; - final IndexSummary summary; - - IndexSummaryTokenList(IPartitioner partitioner, - IndexSummary summary) - { - this.partitioner = partitioner; - this.summary = summary; - } - - public int size() - { - return summary.size(); - } - - public BigInteger tokenAt(int index) - { - return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); - } - } - - public static int 
binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); - } - /** * Binary search the Summary.db file to find nearest index offset in Index.db for a given token. * Method lifted from org.apache.cassandra.io.sstable.IndexSummary.binarySearch(PartitionPosition key) and reworked for tokens. diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java index e27dd593b..772d0dc06 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java @@ -161,10 +161,10 @@ public void testReadFirstLastPartitionKey() // Read Summary.db file for first and last partition keys from Summary.db Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); - SummaryDbUtils.Summary summaryKeys; + IndexSummaryComponent summaryKeys; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summaryKeys = SummaryDbUtils.readSummary(in, Murmur3Partitioner.instance, 128, 2048); + summaryKeys = IndexSummaryComponent.readSummary(in, Murmur3Partitioner.instance, 128, 2048); } assertThat(summaryKeys).isNotNull(); assertThat(summaryKeys.first()).isNotNull(); diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index b5c9928fe..257288492 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -78,7 +78,7 @@ public void testCache() 
assertThat(SSTableCache.INSTANCE.containsStats(ssTable0)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0)).isFalse(); - SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); + IndexSummaryComponent key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); assertThat(key1).isNotNull(); assertThat(SSTableCache.INSTANCE.containsSummary(ssTable0)).isTrue(); assertThat(SSTableCache.INSTANCE.containsIndex(ssTable0)).isFalse(); @@ -133,7 +133,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsStats(ssTable1)).isFalse(); assertThat(SSTableCache.INSTANCE.containsFilter(ssTable1)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1)).isFalse(); - SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); + IndexSummaryComponent key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); assertThat(key3.first()).isNotEqualTo(key1.first()); assertThat(key3.last()).isNotEqualTo(key1.last()); Pair key4 = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable1); diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index d3b4ac383..e700356ea 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -375,13 +375,13 @@ public void testSkipNoPartitions() SSTable dataFile = TestSSTable.firstIn(directory.path()); Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); TableMetadata metadata = tableMetadata(schema, partitioner); - SummaryDbUtils.Summary summary; + IndexSummaryComponent summary; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summary = 
SummaryDbUtils.readSummary(in, - metadata.partitioner, - metadata.params.minIndexInterval, - metadata.params.maxIndexInterval); + summary = IndexSummaryComponent.readSummary(in, + metadata.partitioner, + metadata.params.minIndexInterval, + metadata.params.maxIndexInterval); } // Set Spark token range equal to SSTable token range TokenRange sparkTokenRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index 6ba0ea4c3..504f4d3dc 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -33,6 +33,7 @@ import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -114,20 +115,22 @@ public void testSearchSummary() assertThat(ssTable).as("Could not find SSTable").isNotNull(); // Binary search Summary.db file in token order and verify offsets are ordered - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(metadata, ssTable); - long previous = -1; - for (BigInteger token : tokens) + try (IndexSummary indexSummary = SummaryDbUtils.readSummary(metadata, ssTable).summarySharedCopy()) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(summary.summary(), iPartitioner, token); - if (previous < 0) + long previous = -1; + for (BigInteger token : tokens) { - assertThat(offset).isEqualTo(0); + long offset = IndexDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + if (previous < 0) + { + assertThat(offset).isEqualTo(0); 
+ } + else + { + assertThat(previous).isLessThanOrEqualTo(offset); + } + previous = offset; } - else - { - assertThat(previous).isLessThanOrEqualTo(offset); - } - previous = offset; } } catch (IOException exception)