From 207d4955e821f6184156fb425ba21f9a3c831ce6 Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Thu, 4 Sep 2025 16:13:16 -0700 Subject: [PATCH 1/7] CASSANALYTICS-87: Fix LEAK DETECTED errors during bulk read Patch by Yifan Cai, James Berragan; Reviewed by TBD for CASSANALYTICS-87 --- CHANGES.txt | 1 + cassandra-five-zero-bridge/build.gradle | 1 + .../spark/reader/SummaryDbUtils.java | 8 +++- .../spark/reader/SSTableCacheTests.java | 41 +++++++++++++++++++ cassandra-four-zero-bridge/build.gradle | 1 + .../cassandra/spark/reader/SSTableCache.java | 33 +++++++++++++++ .../spark/reader/SummaryDbUtils.java | 8 +++- .../spark/reader/SSTableCacheTests.java | 41 +++++++++++++++++++ gradle.properties | 1 + 9 files changed, 133 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a3698b71e..5b8a72589 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Fix LEAK DETECTED errors during bulk read (CASSANALYTICS-87) * Create bridge modules for Cassandra 5.0 (CASSANALYTICS-84) * Analytics job fails when source table has secondary indexes (CASSANALYTICS-86) * Set KeyStore to be optional (CASSANALYTICS-69) diff --git a/cassandra-five-zero-bridge/build.gradle b/cassandra-five-zero-bridge/build.gradle index 0d730f2bd..af27a47df 100644 --- a/cassandra-five-zero-bridge/build.gradle +++ b/cassandra-five-zero-bridge/build.gradle @@ -72,6 +72,7 @@ dependencies { testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4') + testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}") diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index a724035a1..11e20fa90 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -41,7 +41,7 @@ */ public final class SummaryDbUtils { - public static class Summary + public static class Summary implements AutoCloseable { private final IndexSummary indexSummary; private final DecoratedKey firstKey; @@ -70,6 +70,12 @@ public DecoratedKey last() { return lastKey; } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. 
+ public void close() throws Exception + { + indexSummary.close(); + } } private SummaryDbUtils() diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index 1fd7af85b..58cb76f31 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -25,8 +25,11 @@ import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import com.github.valfirst.slf4jtest.TestLogger; +import com.github.valfirst.slf4jtest.TestLoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; @@ -52,6 +55,13 @@ public class SSTableCacheTests { private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref"); + + @AfterEach + void cleanup() + { + REF_LOGGER.clear(); + } // CHECKSTYLE IGNORE: Long method @Test @@ -191,4 +201,35 @@ public void testCache() } }); } + + @Test + void testNoLeakDetectedError() throws Exception + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + try (TemporaryDirectory directory = new TemporaryDirectory()) + { + // Write an SSTable + TestSchema schema = TestSchema.basic(BRIDGE); + schema.writeSSTable(directory, BRIDGE, partitioner, + writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index))); + SSTable sstable = TestSSTable.firstIn(directory.path()); + TableMetadata metadata = new SchemaBuilder(schema.createStatement, + schema.keyspace, + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + ImmutableMap.of("replication_factor", 1)), + partitioner).tableMetaData(); + + SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); + assertThat(summary).isNotNull(); + assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); + SSTableCache.INSTANCE.invalidate(sstable); + summary = null; + assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); + // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. 
+            System.gc();
+            Thread.sleep(1000);
+            System.gc();
+            assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty();
+        }
+    }
 }
diff --git a/cassandra-four-zero-bridge/build.gradle b/cassandra-four-zero-bridge/build.gradle
index add277fb0..959e655e1 100644
--- a/cassandra-four-zero-bridge/build.gradle
+++ b/cassandra-four-zero-bridge/build.gradle
@@ -60,6 +60,7 @@ dependencies {
     testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
     testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
     testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4')
+    testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}")
     testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}")
     testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}")

diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index 083a0ea2c..038900e47 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.slf4j.Logger;
@@ -107,6 +108,28 @@ private <T> Cache<SSTable, T> buildCache(int size, int expireAfterMins)
         return CacheBuilder.newBuilder()
                            .expireAfterAccess(expireAfterMins, TimeUnit.MINUTES)
                            .maximumSize(size)
+                           .removalListener(notification -> {
+                               // This removal listener eliminates the LEAK DETECTED errors.
+                               // How the leak happens:
+                               // 1. AutoCloseable objects (e.g. IndexSummary and BloomFilter) are evicted from the cache
+                               // 2. The evicted objects are garbage collected, but close() is never called to decrement the reference count
+                               // 3. The Reference-Reaper thread releases the object and prints the LEAK DETECTED error
+                               // The listener fixes this by closing the object when it is evicted.
+                               Object val = notification.getValue();
+                               if (val instanceof AutoCloseable)
+                               {
+                                   String typeLiteral = val.getClass().getName();
+                                   try
+                                   {
+                                       LOGGER.debug("Evicting auto-closeable of type: {}", typeLiteral);
+                                       ((AutoCloseable) val).close();
+                                   }
+                                   catch (Exception e)
+                                   {
+                                       LOGGER.error("Exception closing cached instance of {}", typeLiteral, e);
+                                   }
+                               }
+                           })
                            .build();
     }

@@ -211,4 +234,14 @@ private static IOException toIOException(Throwable throwable)
         IOException ioException = ThrowableUtils.rootCause(throwable, IOException.class);
         return ioException != null ?
ioException : new IOException(ThrowableUtils.rootCause(throwable)); } + + @VisibleForTesting + void invalidate(SSTable sstable) + { + summary.invalidate(sstable); + index.invalidate(sstable); + stats.invalidate(sstable); + filter.invalidate(sstable); + compressionMetadata.invalidate(sstable); + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index a7cc98dc0..f91337181 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -39,7 +39,7 @@ */ public final class SummaryDbUtils { - public static class Summary + public static class Summary implements AutoCloseable { private final IndexSummary indexSummary; private final DecoratedKey firstKey; @@ -68,6 +68,12 @@ public DecoratedKey last() { return lastKey; } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. + public void close() throws Exception + { + indexSummary.close(); + } } private SummaryDbUtils() diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index b5c9928fe..96fda2684 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -26,8 +26,11 @@ import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import com.github.valfirst.slf4jtest.TestLogger; +import com.github.valfirst.slf4jtest.TestLoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.Descriptor; @@ -50,6 +53,13 @@ public class SSTableCacheTests { private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref"); + + @AfterEach + void cleanup() + { + REF_LOGGER.clear(); + } @Test public void testCache() @@ -159,4 +169,35 @@ public void testCache() } }); } + + @Test + void testNoLeakDetectedError() throws Exception + { + Partitioner partitioner = Partitioner.Murmur3Partitioner; + try (TemporaryDirectory directory = new TemporaryDirectory()) + { + // Write an SSTable + TestSchema schema = TestSchema.basic(BRIDGE); + schema.writeSSTable(directory, BRIDGE, partitioner, + writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index))); + SSTable sstable = TestSSTable.firstIn(directory.path()); + TableMetadata metadata = new SchemaBuilder(schema.createStatement, + schema.keyspace, + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + ImmutableMap.of("replication_factor", 1)), + partitioner).tableMetaData(); + + SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); + assertThat(summary).isNotNull(); + assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); + SSTableCache.INSTANCE.invalidate(sstable); + summary = null; + 
assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); + // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. + System.gc(); + Thread.sleep(1000); + System.gc(); + assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty(); + } + } } diff --git a/gradle.properties b/gradle.properties index 4312771ca..7d2afb34b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -34,6 +34,7 @@ scala=2.12 spark=3 kryoVersion=4.0.2 slf4jApiVersion=1.7.30 +slf4jTestVersion=2.3.0 guavaVersion=16.0.1 # force version 4.5.1 of vertx to prevent issues initializing io.vertx.core.json.jackson.JacksonCodec, # which requires a newer version of jackson, which is not available in spark 2 From 18ad5cb742ee537a0cfc71d7f8871c8dfced8e92 Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Thu, 4 Sep 2025 21:24:23 -0700 Subject: [PATCH 2/7] code style --- .../org/apache/cassandra/spark/reader/SSTableCacheTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index 58cb76f31..a7bc718d2 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -201,7 +201,7 @@ public void testCache() } }); } - + @Test void testNoLeakDetectedError() throws Exception { From 61add5b66b8d29e387ab60248f89f4d747aaa0f0 Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Fri, 5 Sep 2025 19:18:34 -0700 Subject: [PATCH 3/7] avoid early release of index summary --- .../cassandra/spark/reader/IndexReader.java | 12 ++++++--- .../cassandra/spark/reader/SSTableReader.java | 11 +++++--- .../spark/reader/SummaryDbUtils.java | 16 +++++++++--- .../spark/reader/SummaryDbTests.java | 25 +++++++++++-------- .../cassandra/spark/reader/IndexReader.java | 12 ++++++--- .../cassandra/spark/reader/SSTableReader.java | 11 +++++--- .../spark/reader/SummaryDbUtils.java | 16 +++++++++--- .../spark/reader/SummaryDbTests.java | 25 +++++++++++-------- 8 files changed, 86 insertions(+), 42 deletions(-) diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index f13a39043..d9bc52b40 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -34,6 +34,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -87,10 +88,13 @@ public IndexReader(@NotNull SSTable ssTable, return; } - skipAhead = summary.summary().getPosition( - SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) - ); - stats.indexSummaryFileRead(System.nanoTime() - now); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + skipAhead = indexSummary.getPosition( + SummaryDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, 
rangeFilter.tokenRange().firstEnclosedValue()) + ); + stats.indexSummaryFileRead(System.nanoTime() - now); + } now = System.nanoTime(); } } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index dd4a59ca1..da61cf1d0 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -390,11 +390,16 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; + // The summary might be evicted already if downloading other components takes too long; + // Get it from cache again to avoid using the already evicted object + summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (readIndexOffset && summary != null) { - SummaryDbUtils.Summary finalSummary = summary; - extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(finalSummary.summary(), range)); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + extractRange(sparkRangeFilter, partitionKeyFilters) + .ifPresent(range -> readOffsets(indexSummary, range)); + } } else { diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index 11e20fa90..b07d68c95 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -56,9 +56,14 @@ public static class Summary implements AutoCloseable this.lastKey = lastKey; } - public IndexSummary summary() + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() { - return indexSummary; + return indexSummary.sharedCopy(); } public DecoratedKey first() @@ -149,7 +154,12 @@ public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner p return summary.getPosition(binarySearchSummary(summary, partitioner, token)); } - public static class IndexSummaryTokenList implements TokenList + /** + * The class is private on purpose. + * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. 
+ */ + private static class IndexSummaryTokenList implements TokenList { final IPartitioner partitioner; final IndexSummary summary; diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index 5e787067c..b13b07e9d 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -34,6 +34,7 @@ import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.TestUtils; @@ -117,20 +118,22 @@ public void testSearchSummary() assertThat(ssTable).as("Could not find SSTable").isNotNull(); // Binary search Summary.db file in token order and verify offsets are ordered - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(metadata, ssTable); - long previous = -1; - for (BigInteger token : tokens) + try (IndexSummary indexSummary = SummaryDbUtils.readSummary(metadata, ssTable).summarySharedCopy()) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(summary.summary(), iPartitioner, token); - if (previous < 0) + long previous = -1; + for (BigInteger token : tokens) { - assertThat(offset).isEqualTo(0); + long offset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + if (previous < 0) + { + assertThat(offset).isEqualTo(0); + } + else + { + assertThat(previous).isLessThanOrEqualTo(offset); + } + previous = offset; } - else - { - assertThat(previous).isLessThanOrEqualTo(offset); - } - previous = offset; } } catch (IOException exception) diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index c855b7af4..aa8d132cb 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -87,10 +88,13 @@ public IndexReader(@NotNull SSTable ssTable, return; } - skipAhead = summary.summary().getPosition( - SummaryDbUtils.binarySearchSummary(summary.summary(), metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) - ); - stats.indexSummaryFileRead(System.nanoTime() - now); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + skipAhead = indexSummary.getPosition( + SummaryDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) + ); + stats.indexSummaryFileRead(System.nanoTime() - now); + } now = System.nanoTime(); } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java 
b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index 0426fcf91..3ee3d507e 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -388,11 +388,16 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; + // The summary might be evicted already if downloading other components takes too long; + // Get it from cache again to avoid using the already evicted object + summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (readIndexOffset && summary != null) { - SummaryDbUtils.Summary finalSummary = summary; - extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(finalSummary.summary(), range)); + try (IndexSummary indexSummary = summary.summarySharedCopy()) + { + extractRange(sparkRangeFilter, partitionKeyFilters) + .ifPresent(range -> readOffsets(indexSummary, range)); + } } else { diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index f91337181..a17927d0b 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -54,9 +54,14 @@ public static class Summary implements AutoCloseable this.lastKey = lastKey; } - public IndexSummary summary() + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() { - return indexSummary; + return indexSummary.sharedCopy(); } public DecoratedKey first() @@ -145,7 +150,12 @@ public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner p return summary.getPosition(binarySearchSummary(summary, partitioner, token)); } - public static class IndexSummaryTokenList implements TokenList + /** + * The class is private on purpose. + * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. 
+ */ + private static class IndexSummaryTokenList implements TokenList { final IPartitioner partitioner; final IndexSummary summary; diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index 6ba0ea4c3..5a7816f9c 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -33,6 +33,7 @@ import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.FileType; @@ -114,20 +115,22 @@ public void testSearchSummary() assertThat(ssTable).as("Could not find SSTable").isNotNull(); // Binary search Summary.db file in token order and verify offsets are ordered - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(metadata, ssTable); - long previous = -1; - for (BigInteger token : tokens) + try (IndexSummary indexSummary = SummaryDbUtils.readSummary(metadata, ssTable).summarySharedCopy()) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(summary.summary(), iPartitioner, token); - if (previous < 0) + long previous = -1; + for (BigInteger token : tokens) { - assertThat(offset).isEqualTo(0); + long offset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + if (previous < 0) + { + assertThat(offset).isEqualTo(0); + } + else + { + assertThat(previous).isLessThanOrEqualTo(offset); + } + previous = offset; } - else - { - assertThat(previous).isLessThanOrEqualTo(offset); - } - previous = offset; } } catch (IOException exception) From 3e01b9967fed656b51daeb680220a40a4485123d Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Fri, 5 Sep 2025 19:41:24 -0700 Subject: [PATCH 4/7] Refactor: make SummaryDbUtils version agnostic. 
--- .../bridge/CassandraBridgeImplementation.java | 3 +- .../cassandra/spark/reader/IndexDbUtils.java | 48 +++- .../cassandra/spark/reader/IndexReader.java | 4 +- .../spark/reader/IndexSummaryComponent.java | 106 ++++++++ .../cassandra/spark/reader/SSTableReader.java | 2 +- .../spark/reader/SummaryDbUtils.java | 229 ------------------ .../spark/reader/ReaderUtilsTests.java | 4 +- .../spark/reader/SSTableCacheTests.java | 6 +- .../spark/reader/SSTableReaderTests.java | 8 +- .../spark/reader/SummaryDbTests.java | 2 +- .../bridge/CassandraBridgeImplementation.java | 3 +- .../cassandra/spark/reader/IndexDbUtils.java | 48 +++- .../cassandra/spark/reader/IndexReader.java | 5 +- .../spark/reader/IndexSummaryComponent.java | 102 ++++++++ .../cassandra/spark/reader/SSTableCache.java | 29 ++- .../cassandra/spark/reader/SSTableReader.java | 2 +- .../spark/reader/SummaryDbUtils.java | 136 +---------- .../spark/reader/ReaderUtilsTests.java | 4 +- .../spark/reader/SSTableCacheTests.java | 6 +- .../spark/reader/SSTableReaderTests.java | 10 +- .../spark/reader/SummaryDbTests.java | 2 +- 21 files changed, 360 insertions(+), 399 deletions(-) create mode 100644 cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java delete mode 100644 cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java create mode 100644 cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 4ed69f401..b72288575 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -92,6 +92,7 @@ import org.apache.cassandra.spark.reader.CompactionStreamScanner; import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.reader.IndexReader; +import org.apache.cassandra.spark.reader.IndexSummaryComponent; import org.apache.cassandra.spark.reader.ReaderUtils; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.SchemaBuilder; @@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner, { try { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); + IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); Pair keys = summary == null ? 
null : Pair.of(summary.first(), summary.last()); if (summary == null) { diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java index 4dd993e01..5e1981491 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java @@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary, @NotNull SSTable ssTable, @NotNull Stats stats) throws IOException { - long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); + long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); // Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset); @@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner, stats.readPartitionIndexDb((ByteBuffer) key.rewind(), token); return token; } + + /** + * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for + * + * @param summary IndexSummary from Summary.db file + * @param partitioner Cassandra partitioner to hash partition keys to token + * @param token the token we are trying to find + * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for + */ + public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return summary.getPosition(binarySearchSummary(summary, partitioner, token)); + } + + /** + * The class is private on purpose. + * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. 
+ */ + private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList + { + final IPartitioner partitioner; + final IndexSummary summary; + + IndexSummaryTokenList(IPartitioner partitioner, + IndexSummary summary) + { + this.partitioner = partitioner; + this.summary = summary; + } + + public int size() + { + return summary.size(); + } + + public BigInteger tokenAt(int index) + { + return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); + } + } + + public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return SummaryDbUtils.binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); + } } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index d9bc52b40..9a1c62692 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -74,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable, now = System.nanoTime(); if (rangeFilter != null) { - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); + IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (summary != null) { this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), @@ -91,7 +91,7 @@ public IndexReader(@NotNull SSTable ssTable, try (IndexSummary indexSummary = summary.summarySharedCopy()) { skipAhead = indexSummary.getPosition( - SummaryDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) + IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) ); stats.indexSummaryFileRead(System.nanoTime() - now); } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java new file mode 100644 index 000000000..414cfb61a --- /dev/null +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.RebufferingChannelInputStream; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.jetbrains.annotations.Nullable; + +public class IndexSummaryComponent implements AutoCloseable +{ + private final IndexSummary indexSummary; + private final DecoratedKey firstKey; + private final DecoratedKey lastKey; + + /** + * Read and deserialize the Summary.db file + * + * @param summaryStream input stream for Summary.db file + * @param partitioner token partitioner + * @param minIndexInterval min index interval + * @param maxIndexInterval max index interval + * @return Summary object + * @throws IOException io exception + */ + @Nullable + static IndexSummaryComponent readSummary(InputStream summaryStream, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException + { + if (summaryStream == null) + { + return null; + } + + int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream); + try (DataInputStream is = new DataInputStream(summaryStream); + DataInputPlus.DataInputStreamPlus dis = new RebufferingChannelInputStream(is, bufferSize)) + { + IndexSummary indexSummary = IndexSummary.serializer.deserialize(dis, partitioner, minIndexInterval, maxIndexInterval); + DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); + DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); + return new IndexSummaryComponent(indexSummary, firstKey, lastKey); + } + } + + IndexSummaryComponent(IndexSummary indexSummary, + DecoratedKey firstKey, + DecoratedKey lastKey) + { + this.indexSummary = indexSummary; + this.firstKey = firstKey; + this.lastKey = lastKey; + } + + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() + { + return indexSummary.sharedCopy(); + } + + public DecoratedKey first() + { + return firstKey; + } + + public DecoratedKey last() + { + return lastKey; + } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. 
+ public void close() throws Exception + { + indexSummary.close(); + } +} diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index da61cf1d0..fe1744d05 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -249,7 +249,7 @@ public SSTableReader(@NotNull TableMetadata metadata, Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable); this.version = descriptor.version; - SummaryDbUtils.Summary summary = null; + IndexSummaryComponent summary = null; Pair keys = null; try { diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java deleted file mode 100644 index b07d68c95..000000000 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.spark.reader; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigInteger; -import java.nio.ByteBuffer; - -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.indexsummary.IndexSummary; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.RebufferingChannelInputStream; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.spark.data.SSTable; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -/** - * Helper methods for reading the Summary.db SSTable file component - */ -public final class SummaryDbUtils -{ - public static class Summary implements AutoCloseable - { - private final IndexSummary indexSummary; - private final DecoratedKey firstKey; - private final DecoratedKey lastKey; - - Summary(IndexSummary indexSummary, - DecoratedKey firstKey, - DecoratedKey lastKey) - { - this.indexSummary = indexSummary; - this.firstKey = firstKey; - this.lastKey = lastKey; - } - - /** - * Get a shared copy of the IndexSummary, whose reference count is incremented. - * It is important to close the shared copy to decrement the reference count. 
- * @return a shared copy of the IndexSummary object - */ - public IndexSummary summarySharedCopy() - { - return indexSummary.sharedCopy(); - } - - public DecoratedKey first() - { - return firstKey; - } - - public DecoratedKey last() - { - return lastKey; - } - - @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. - public void close() throws Exception - { - indexSummary.close(); - } - } - - private SummaryDbUtils() - { - throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); - } - - public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException - { - return readSummary(ssTable, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval); - } - - @Nullable - public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException - { - try (InputStream in = ssTable.openSummaryStream()) - { - return readSummary(in, partitioner, minIndexInterval, maxIndexInterval); - } - } - - /** - * Read and deserialize the Summary.db file - * - * @param summaryStream input stream for Summary.db file - * @param partitioner token partitioner - * @param minIndexInterval min index interval - * @param maxIndexInterval max index interval - * @return Summary object - * @throws IOException io exception - */ - @Nullable - static Summary readSummary(InputStream summaryStream, - IPartitioner partitioner, - int minIndexInterval, - int maxIndexInterval) throws IOException - { - if (summaryStream == null) - { - return null; - } - - int bufferSize = ReaderUtils.inputStreamBufferSize(summaryStream); - try (DataInputStream is = new DataInputStream(summaryStream); - DataInputPlus.DataInputStreamPlus dis = new RebufferingChannelInputStream(is, bufferSize)) - { - IndexSummary indexSummary = IndexSummary.serializer.deserialize(dis, partitioner, minIndexInterval, maxIndexInterval); - DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); - DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(dis)); - return new Summary(indexSummary, firstKey, lastKey); - } - } - - public interface TokenList - { - int size(); - - BigInteger tokenAt(int index); - } - - /** - * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for - * - * @param summary IndexSummary from Summary.db file - * @param partitioner Cassandra partitioner to hash partition keys to token - * @param token the token we are trying to find - * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for - */ - public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return summary.getPosition(binarySearchSummary(summary, partitioner, token)); - } - - /** - * The class is private on purpose. - * Think carefully if you want to open up the access modifier from private to public. - * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. 
- */ - private static class IndexSummaryTokenList implements TokenList - { - final IPartitioner partitioner; - final IndexSummary summary; - - IndexSummaryTokenList(IPartitioner partitioner, - IndexSummary summary) - { - this.partitioner = partitioner; - this.summary = summary; - } - - public int size() - { - return summary.size(); - } - - public BigInteger tokenAt(int index) - { - return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); - } - } - - public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); - } - - /** - * Binary search the Summary.db file to find nearest index offset in Index.db for a given token. - * Method lifted from org.apache.cassandra.io.sstable.IndexSummary.binarySearch(PartitionPosition key) and reworked for tokens. - * - * @param tokenList list of tokens to binary search - * @param token token to find - * @return closest offset in Index.db preceding token - */ - public static int binarySearchSummary(TokenList tokenList, BigInteger token) - { - int low = 0; - int mid = tokenList.size(); - int high = mid - 1; - int result = -1; - while (low <= high) - { - mid = low + high >> 1; - result = token.compareTo(tokenList.tokenAt(mid)); - if (result > 0) - { - low = mid + 1; - } - else if (result < 0) - { - high = mid - 1; - } - else - { - break; // Exact match - } - } - - // If: - // 1) result < 0: the token is less than nearest sampled token found at mid, so we need to start from mid - 1. - // 2) result == 0: we found an exact match for the token in the sample, - // but there may be token collisions in Data.db so start from mid -1 to be safe. - // 3) result > 0: the nearest sample token at mid is less than the token so we can start from that position. - return result <= 0 ? 
Math.max(0, mid - 1) : mid; - } -} diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java index a9440f763..db90a44ba 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java @@ -165,10 +165,10 @@ public void testReadFirstLastPartitionKey() // Read Summary.db file for first and last partition keys from Summary.db Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); - SummaryDbUtils.Summary summaryKeys; + IndexSummaryComponent summaryKeys; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summaryKeys = SummaryDbUtils.readSummary(in, Murmur3Partitioner.instance, 128, 2048); + summaryKeys = IndexSummaryComponent.readSummary(in, Murmur3Partitioner.instance, 128, 2048); } assertThat(summaryKeys).isNotNull(); assertThat(summaryKeys.first()).isNotNull(); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index a7bc718d2..75ed51f22 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -92,7 +92,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsStats(ssTable0)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0)).isFalse(); - SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); + IndexSummaryComponent key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); if (ssTable0.isBigFormat()) { assertThat(key1).isNotNull(); @@ -171,7 +171,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1)).isFalse(); if (ssTable1.isBigFormat()) { - SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); + IndexSummaryComponent key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); assertThat(key3.first()).isNotEqualTo(key1.first()); assertThat(key3.last()).isNotEqualTo(key1.last()); assertThat(SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable1).left) @@ -219,7 +219,7 @@ void testNoLeakDetectedError() throws Exception ImmutableMap.of("replication_factor", 1)), partitioner).tableMetaData(); - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); + IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); assertThat(summary).isNotNull(); assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); SSTableCache.INSTANCE.invalidate(sstable); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index fae43c532..f5a9c3816 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -393,10 +393,10 @@ public void testSkipNoPartitions() Path summaryFile = 
TestSSTable.firstIn(directory.path(), FileType.SUMMARY); try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(in, - metadata.partitioner, - metadata.params.minIndexInterval, - metadata.params.maxIndexInterval); + IndexSummaryComponent summary = IndexSummaryComponent.readSummary(in, + metadata.partitioner, + metadata.params.minIndexInterval, + metadata.params.maxIndexInterval); first = summary.first(); last = summary.last(); } diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index b13b07e9d..b171f2803 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -123,7 +123,7 @@ public void testSearchSummary() long previous = -1; for (BigInteger token : tokens) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + long offset = IndexDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); if (previous < 0) { assertThat(offset).isEqualTo(0); diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 25a8954a1..12add5526 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -92,6 +92,7 @@ import org.apache.cassandra.spark.reader.CompactionStreamScanner; import org.apache.cassandra.spark.reader.IndexEntry; import org.apache.cassandra.spark.reader.IndexReader; +import org.apache.cassandra.spark.reader.IndexSummaryComponent; import org.apache.cassandra.spark.reader.ReaderUtils; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.SchemaBuilder; @@ -567,7 +568,7 @@ protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner, { try { - SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); + IndexSummaryComponent summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); Pair keys = summary == null ? 
null : Pair.of(summary.first(), summary.last()); if (summary == null) { diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java index d48892bbd..dc9b7f8c6 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexDbUtils.java @@ -53,7 +53,7 @@ public static Long findDataDbOffset(@NotNull IndexSummary indexSummary, @NotNull SSTable ssTable, @NotNull Stats stats) throws IOException { - long searchStartOffset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); + long searchStartOffset = findIndexOffsetInSummary(indexSummary, partitioner, range.firstEnclosedValue()); // Open the Index.db, skip to nearest offset found in Summary.db and find start & end offset for the Data.db file return findDataDbOffset(range, partitioner, ssTable, stats, searchStartOffset); @@ -171,4 +171,50 @@ static BigInteger readNextToken(@NotNull IPartitioner partitioner, stats.readPartitionIndexDb((ByteBuffer) key.rewind(), token); return token; } + + /** + * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for + * + * @param summary IndexSummary from Summary.db file + * @param partitioner Cassandra partitioner to hash partition keys to token + * @param token the token we are trying to find + * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for + */ + public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return summary.getPosition(binarySearchSummary(summary, partitioner, token)); + } + + /** + * The class is private on purpose. + * Think carefully if you want to open up the access modifier from private to public. + * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. 
+ */ + private static class IndexSummaryTokenList implements SummaryDbUtils.TokenList + { + final IPartitioner partitioner; + final IndexSummary summary; + + IndexSummaryTokenList(IPartitioner partitioner, + IndexSummary summary) + { + this.partitioner = partitioner; + this.summary = summary; + } + + public int size() + { + return summary.size(); + } + + public BigInteger tokenAt(int index) + { + return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); + } + } + + public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) + { + return SummaryDbUtils.binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java index aa8d132cb..ed53c2be0 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java @@ -74,7 +74,7 @@ public IndexReader(@NotNull SSTable ssTable, now = System.nanoTime(); if (rangeFilter != null) { - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); + IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); if (summary != null) { this.ssTableRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), @@ -91,8 +91,7 @@ public IndexReader(@NotNull SSTable ssTable, try (IndexSummary indexSummary = summary.summarySharedCopy()) { skipAhead = indexSummary.getPosition( - SummaryDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue()) - ); + IndexDbUtils.binarySearchSummary(indexSummary, metadata.partitioner, rangeFilter.tokenRange().firstEnclosedValue())); stats.indexSummaryFileRead(System.nanoTime() - now); } now = System.nanoTime(); diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java new file mode 100644 index 000000000..f9995803a --- /dev/null +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexSummaryComponent.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.reader; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.io.sstable.IndexSummary; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.jetbrains.annotations.Nullable; + +public class IndexSummaryComponent implements AutoCloseable +{ + private final IndexSummary indexSummary; + private final DecoratedKey firstKey; + private final DecoratedKey lastKey; + + /** + * Read and deserialize the Summary.db file + * + * @param summaryStream input stream for Summary.db file + * @param partitioner token partitioner + * @param minIndexInterval min index interval + * @param maxIndexInterval max index interval + * @return Summary object + * @throws IOException io exception + */ + @Nullable + static IndexSummaryComponent readSummary(InputStream summaryStream, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException + { + if (summaryStream == null) + { + return null; + } + + try (DataInputStream is = new DataInputStream(summaryStream)) + { + IndexSummary indexSummary = IndexSummary.serializer.deserialize(is, partitioner, minIndexInterval, maxIndexInterval); + DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); + DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); + return new IndexSummaryComponent(indexSummary, firstKey, lastKey); + } + } + + IndexSummaryComponent(IndexSummary indexSummary, + DecoratedKey firstKey, + DecoratedKey lastKey) + { + this.indexSummary = indexSummary; + this.firstKey = firstKey; + this.lastKey = lastKey; + } + + /** + * Get a shared copy of the IndexSummary, whose reference count is incremented. + * It is important to close the shared copy to decrement the reference count. + * @return a shared copy of the IndexSummary object + */ + public IndexSummary summarySharedCopy() + { + return indexSummary.sharedCopy(); + } + + public DecoratedKey first() + { + return firstKey; + } + + public DecoratedKey last() + { + return lastKey; + } + + @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. 
+    public void close() throws Exception
+    {
+        indexSummary.close();
+    }
+}
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
index 038900e47..261295f24 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java
@@ -57,17 +57,24 @@ public class SSTableCache
     private static final Logger LOGGER = LoggerFactory.getLogger(SSTableCache.class);
     public static final SSTableCache INSTANCE = new SSTableCache();
-    private final Cache<SSTable, SummaryDbUtils.Summary> summary = buildCache(propOrDefault("sbr.cache.summary.maxEntries", 4096),
-                                                                              propOrDefault("sbr.cache.summary.expireAfterMins", 15));
-    private final Cache<SSTable, Pair<DecoratedKey, DecoratedKey>> index = buildCache(propOrDefault("sbr.cache.index.maxEntries", 128),
-                                                                                      propOrDefault("sbr.cache.index.expireAfterMins", 60));
-    private final Cache<SSTable, Map<MetadataType, MetadataComponent>> stats = buildCache(propOrDefault("sbr.cache.stats.maxEntries", 16384),
-                                                                                          propOrDefault("sbr.cache.stats.expireAfterMins", 60));
-    private final Cache<SSTable, BloomFilter> filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384),
-                                                                  propOrDefault("sbr.cache.filter.expireAfterMins", 60));
+    private final Cache<SSTable, IndexSummaryComponent> summary = buildCache(
+            propOrDefault("sbr.cache.summary.maxEntries", 4096),
+            propOrDefault("sbr.cache.summary.expireAfterMins", 15));
+
+    private final Cache<SSTable, Pair<DecoratedKey, DecoratedKey>> index = buildCache(
+            propOrDefault("sbr.cache.index.maxEntries", 128),
+            propOrDefault("sbr.cache.index.expireAfterMins", 60));
+
+    private final Cache<SSTable, Map<MetadataType, MetadataComponent>> stats = buildCache(
+            propOrDefault("sbr.cache.stats.maxEntries", 16384),
+            propOrDefault("sbr.cache.stats.expireAfterMins", 60));
+
+    private final Cache<SSTable, BloomFilter> filter = buildCache(
+            propOrDefault("sbr.cache.filter.maxEntries", 16384),
+            propOrDefault("sbr.cache.filter.expireAfterMins", 60));
     // if compression is disabled then the CompressionInfo.db file will not exist
     // therefore we can cache as Optional to a) avoid null errors in the cache and b) record that the component does not exist
-    private final Cache<SSTable, Optional<CompressionMetadata>> compressionMetadata = buildCache(
+    private final Cache<SSTable, Optional<CompressionMetadata>> compressionMetadata = buildCache(
             propOrDefault("sbr.cache.compressionInfo.maxEntries", 128),
             propOrDefault("sbr.cache.compressionInfo.expireAfterMins", 15));
@@ -133,8 +140,8 @@ private Cache buildCache(int size, int expireAfterMins)
                     .build();
     }
 
-    public SummaryDbUtils.Summary keysFromSummary(@NotNull TableMetadata metadata,
-                                                  @NotNull SSTable ssTable) throws IOException
+    public IndexSummaryComponent keysFromSummary(@NotNull TableMetadata metadata,
+                                                 @NotNull SSTable ssTable) throws IOException
     {
         return get(summary, ssTable, () -> SummaryDbUtils.readSummary(metadata, ssTable));
     }
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 3ee3d507e..54ddcc390 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -247,7 +247,7 @@ public SSTableReader(@NotNull TableMetadata metadata,
         Descriptor descriptor = ReaderUtils.constructDescriptor(metadata.keyspace, metadata.name, ssTable);
         this.version = descriptor.version;
 
-        SummaryDbUtils.Summary summary = null;
+        IndexSummaryComponent summary = null;
         Pair<DecoratedKey, DecoratedKey> keys = null;
         try
         {
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java index a17927d0b..f0991220c 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SummaryDbUtils.java @@ -19,114 +19,42 @@ package org.apache.cassandra.spark.reader; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; -import java.nio.ByteBuffer; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.sstable.IndexSummary; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.spark.data.SSTable; -import org.apache.cassandra.utils.ByteBufferUtil; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * Helper methods for reading the Summary.db SSTable file component + *
+ * This class is Cassandra version agnostic; it is not duplicated in the other bridges */ public final class SummaryDbUtils { - public static class Summary implements AutoCloseable - { - private final IndexSummary indexSummary; - private final DecoratedKey firstKey; - private final DecoratedKey lastKey; - - Summary(IndexSummary indexSummary, - DecoratedKey firstKey, - DecoratedKey lastKey) - { - this.indexSummary = indexSummary; - this.firstKey = firstKey; - this.lastKey = lastKey; - } - - /** - * Get a shared copy of the IndexSummary, whose reference count is incremented. - * It is important to close the shared copy to decrement the reference count. - * @return a shared copy of the IndexSummary object - */ - public IndexSummary summarySharedCopy() - { - return indexSummary.sharedCopy(); - } - - public DecoratedKey first() - { - return firstKey; - } - - public DecoratedKey last() - { - return lastKey; - } - - @Override // The method is expected to be called when evicting the object from sstable cache; do not call it explicitly. - public void close() throws Exception - { - indexSummary.close(); - } - } - private SummaryDbUtils() { throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); } - public static Summary readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException + public static IndexSummaryComponent readSummary(@NotNull TableMetadata metadata, @NotNull SSTable ssTable) throws IOException { return readSummary(ssTable, metadata.partitioner, metadata.params.minIndexInterval, metadata.params.maxIndexInterval); } @Nullable - public static Summary readSummary(@NotNull SSTable ssTable, IPartitioner partitioner, int minIndexInterval, int maxIndexInterval) throws IOException + public static IndexSummaryComponent readSummary(@NotNull SSTable sstable, + IPartitioner partitioner, + int minIndexInterval, + int maxIndexInterval) throws IOException { - try (InputStream in = ssTable.openSummaryStream()) + try (InputStream in = sstable.openSummaryStream()) { - return readSummary(in, partitioner, minIndexInterval, maxIndexInterval); - } - } - - /** - * Read and deserialize the Summary.db file - * - * @param summaryStream input stream for Summary.db file - * @param partitioner token partitioner - * @param minIndexInterval min index interval - * @param maxIndexInterval max index interval - * @return Summary object - * @throws IOException io exception - */ - @Nullable - static Summary readSummary(InputStream summaryStream, - IPartitioner partitioner, - int minIndexInterval, - int maxIndexInterval) throws IOException - { - if (summaryStream == null) - { - return null; - } - - try (DataInputStream is = new DataInputStream(summaryStream)) - { - IndexSummary indexSummary = IndexSummary.serializer.deserialize(is, partitioner, minIndexInterval, maxIndexInterval); - DecoratedKey firstKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); - DecoratedKey lastKey = partitioner.decorateKey(ByteBufferUtil.readWithLength(is)); - return new Summary(indexSummary, firstKey, lastKey); + return IndexSummaryComponent.readSummary(in, partitioner, minIndexInterval, maxIndexInterval); } } @@ -137,52 +65,6 @@ public interface TokenList BigInteger tokenAt(int index); } - /** - * Binary search Summary.db to find nearest offset in Index.db that precedes the token we are looking for - * - * @param summary IndexSummary from Summary.db file - * @param partitioner Cassandra partitioner to hash partition keys to token - * @param token the token 
we are trying to find - * @return offset into the Index.db file for the closest to partition in the Summary.db file that precedes the token we are looking for - */ - public static long findIndexOffsetInSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return summary.getPosition(binarySearchSummary(summary, partitioner, token)); - } - - /** - * The class is private on purpose. - * Think carefully if you want to open up the access modifier from private to public. - * IndexSummary's underlying memory could be released. You do not want to leak the reference and get segment fault. - */ - private static class IndexSummaryTokenList implements TokenList - { - final IPartitioner partitioner; - final IndexSummary summary; - - IndexSummaryTokenList(IPartitioner partitioner, - IndexSummary summary) - { - this.partitioner = partitioner; - this.summary = summary; - } - - public int size() - { - return summary.size(); - } - - public BigInteger tokenAt(int index) - { - return ReaderUtils.tokenToBigInteger(partitioner.decorateKey(ByteBuffer.wrap(summary.getKey(index))).getToken()); - } - } - - public static int binarySearchSummary(IndexSummary summary, IPartitioner partitioner, BigInteger token) - { - return binarySearchSummary(new IndexSummaryTokenList(partitioner, summary), token); - } - /** * Binary search the Summary.db file to find nearest index offset in Index.db for a given token. * Method lifted from org.apache.cassandra.io.sstable.IndexSummary.binarySearch(PartitionPosition key) and reworked for tokens. diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java index e27dd593b..772d0dc06 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/ReaderUtilsTests.java @@ -161,10 +161,10 @@ public void testReadFirstLastPartitionKey() // Read Summary.db file for first and last partition keys from Summary.db Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); - SummaryDbUtils.Summary summaryKeys; + IndexSummaryComponent summaryKeys; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summaryKeys = SummaryDbUtils.readSummary(in, Murmur3Partitioner.instance, 128, 2048); + summaryKeys = IndexSummaryComponent.readSummary(in, Murmur3Partitioner.instance, 128, 2048); } assertThat(summaryKeys).isNotNull(); assertThat(summaryKeys.first()).isNotNull(); diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index 96fda2684..f43f1812a 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -88,7 +88,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsStats(ssTable0)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable0)).isFalse(); - SummaryDbUtils.Summary key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); + IndexSummaryComponent key1 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable0); assertThat(key1).isNotNull(); 
assertThat(SSTableCache.INSTANCE.containsSummary(ssTable0)).isTrue(); assertThat(SSTableCache.INSTANCE.containsIndex(ssTable0)).isFalse(); @@ -143,7 +143,7 @@ public void testCache() assertThat(SSTableCache.INSTANCE.containsStats(ssTable1)).isFalse(); assertThat(SSTableCache.INSTANCE.containsFilter(ssTable1)).isFalse(); assertThat(SSTableCache.INSTANCE.containsCompressionMetadata(ssTable1)).isFalse(); - SummaryDbUtils.Summary key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); + IndexSummaryComponent key3 = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable1); assertThat(key3.first()).isNotEqualTo(key1.first()); assertThat(key3.last()).isNotEqualTo(key1.last()); Pair key4 = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable1); @@ -187,7 +187,7 @@ void testNoLeakDetectedError() throws Exception ImmutableMap.of("replication_factor", 1)), partitioner).tableMetaData(); - SummaryDbUtils.Summary summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); + IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); assertThat(summary).isNotNull(); assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); SSTableCache.INSTANCE.invalidate(sstable); diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java index d3b4ac383..e700356ea 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java @@ -375,13 +375,13 @@ public void testSkipNoPartitions() SSTable dataFile = TestSSTable.firstIn(directory.path()); Path summaryFile = TestSSTable.firstIn(directory.path(), FileType.SUMMARY); TableMetadata metadata = tableMetadata(schema, partitioner); - SummaryDbUtils.Summary summary; + IndexSummaryComponent summary; try (InputStream in = new BufferedInputStream(Files.newInputStream(summaryFile))) { - summary = SummaryDbUtils.readSummary(in, - metadata.partitioner, - metadata.params.minIndexInterval, - metadata.params.maxIndexInterval); + summary = IndexSummaryComponent.readSummary(in, + metadata.partitioner, + metadata.params.minIndexInterval, + metadata.params.maxIndexInterval); } // Set Spark token range equal to SSTable token range TokenRange sparkTokenRange = TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()), diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java index 5a7816f9c..504f4d3dc 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SummaryDbTests.java @@ -120,7 +120,7 @@ public void testSearchSummary() long previous = -1; for (BigInteger token : tokens) { - long offset = SummaryDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); + long offset = IndexDbUtils.findIndexOffsetInSummary(indexSummary, iPartitioner, token); if (previous < 0) { assertThat(offset).isEqualTo(0); From 222475142fcfaf69041cab9ace93cef290d98423 Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Fri, 5 Sep 2025 19:50:01 -0700 Subject: [PATCH 5/7] test update --- cassandra-five-zero-bridge/build.gradle | 1 + 
.../org/apache/cassandra/spark/reader/SSTableCacheTests.java | 4 +++- cassandra-four-zero-bridge/build.gradle | 1 + .../org/apache/cassandra/spark/reader/SSTableCacheTests.java | 4 +++- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cassandra-five-zero-bridge/build.gradle b/cassandra-five-zero-bridge/build.gradle index af27a47df..222d7c91b 100644 --- a/cassandra-five-zero-bridge/build.gradle +++ b/cassandra-five-zero-bridge/build.gradle @@ -73,6 +73,7 @@ dependencies { testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4') testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}") + testImplementation(group: 'com.google.guava', name: 'guava', version: "${project.guavaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}") diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index 75ed51f22..fc4fc9df1 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -227,7 +229,7 @@ void testNoLeakDetectedError() throws Exception assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. 
System.gc(); - Thread.sleep(1000); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); System.gc(); assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty(); } diff --git a/cassandra-four-zero-bridge/build.gradle b/cassandra-four-zero-bridge/build.gradle index 959e655e1..1cf63dccf 100644 --- a/cassandra-four-zero-bridge/build.gradle +++ b/cassandra-four-zero-bridge/build.gradle @@ -61,6 +61,7 @@ dependencies { testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4') testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}") + testImplementation(group: 'com.google.guava', name: 'guava', version: "${project.guavaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}") diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index f43f1812a..bed30185a 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -195,7 +197,7 @@ void testNoLeakDetectedError() throws Exception assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. System.gc(); - Thread.sleep(1000); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); System.gc(); assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty(); } From 2991034a83b6f705959be00c7702f79ac9fc2fab Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Sun, 7 Sep 2025 22:12:18 -0700 Subject: [PATCH 6/7] avoid redownloading summary component if cache evicted --- .../apache/cassandra/spark/reader/SSTableReader.java | 11 +++++------ .../apache/cassandra/spark/reader/SSTableReader.java | 11 +++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index fe1744d05..df085663b 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -251,12 +251,14 @@ public SSTableReader(@NotNull TableMetadata metadata, IndexSummaryComponent summary = null; Pair keys = null; + IndexSummary indexSummary = null; // indexSummary is only assigned when readIndexOffset is enabled try { now = System.nanoTime(); summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); stats.readSummaryDb(ssTable, System.nanoTime() - now); keys = Pair.of(summary.first(), summary.last()); + indexSummary = readIndexOffset ? 
summary.summarySharedCopy() : null; } catch (IOException exception) { @@ -390,15 +392,12 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; - // The summary might be evicted already if downloading other components takes too long; - // Get it from cache again to avoid using the already evicted object - summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); - if (readIndexOffset && summary != null) + if (indexSummary != null) { - try (IndexSummary indexSummary = summary.summarySharedCopy()) + try (IndexSummary indexSummaryCopy = indexSummary) { extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(indexSummary, range)); + .ifPresent(range -> readOffsets(indexSummaryCopy, range)); } } else diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java index 54ddcc390..4392594e5 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java @@ -249,12 +249,14 @@ public SSTableReader(@NotNull TableMetadata metadata, IndexSummaryComponent summary = null; Pair keys = null; + IndexSummary indexSummary = null; // indexSummary is only assigned when readIndexOffset is enabled try { now = System.nanoTime(); summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); stats.readSummaryDb(ssTable, System.nanoTime() - now); keys = Pair.of(summary.first(), summary.last()); + indexSummary = readIndexOffset ? summary.summarySharedCopy() : null; } catch (IOException exception) { @@ -388,15 +390,12 @@ public SSTableReader(@NotNull TableMetadata metadata, buildColumnFilter(metadata, columnFilter)); this.metadata = metadata; - // The summary might be evicted already if downloading other components takes too long; - // Get it from cache again to avoid using the already evicted object - summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable); - if (readIndexOffset && summary != null) + if (indexSummary != null) { - try (IndexSummary indexSummary = summary.summarySharedCopy()) + try (IndexSummary indexSummaryCopy = indexSummary) { extractRange(sparkRangeFilter, partitionKeyFilters) - .ifPresent(range -> readOffsets(indexSummary, range)); + .ifPresent(range -> readOffsets(indexSummaryCopy, range)); } } else From 4190d40539e944c551b936ebbc4be8eb7a66c6f7 Mon Sep 17 00:00:00 2001 From: Yifan Cai Date: Mon, 8 Sep 2025 00:46:27 -0700 Subject: [PATCH 7/7] revert slf4j-test and the new test; it slows down test execution too much as all logs are stored and synchronized. 
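
With the log-based leak assertion gone, the protection against LEAK DETECTED rests on the
AutoCloseable IndexSummaryComponent being closed when it is evicted from SSTableCache, and on
callers holding a shared copy of the IndexSummary only for the duration of their read. A minimal
sketch of the caller pattern this series relies on (illustrative only, not part of this patch;
it assumes a caller that already has a TableMetadata, an SSTable and a BigInteger token in scope,
and it elides IOException handling):

    IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
    // Take a shared copy; this increments the underlying Ref count, so cache eviction of the
    // component cannot release the underlying memory while it is still being read.
    try (IndexSummary copy = summary.summarySharedCopy())
    {
        long offset = copy.getPosition(
                IndexDbUtils.binarySearchSummary(copy, metadata.partitioner, token));
        // ... read Index.db starting from 'offset' ...
    }
    // Closing the copy decrements the Ref count; the cached original is closed on eviction.
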
--- cassandra-five-zero-bridge/build.gradle | 2 - .../spark/reader/SSTableCacheTests.java | 43 ------------------- cassandra-four-zero-bridge/build.gradle | 2 - .../spark/reader/SSTableCacheTests.java | 43 ------------------- gradle.properties | 1 - 5 files changed, 91 deletions(-) diff --git a/cassandra-five-zero-bridge/build.gradle b/cassandra-five-zero-bridge/build.gradle index 222d7c91b..0d730f2bd 100644 --- a/cassandra-five-zero-bridge/build.gradle +++ b/cassandra-five-zero-bridge/build.gradle @@ -72,8 +72,6 @@ dependencies { testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4') - testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}") - testImplementation(group: 'com.google.guava', name: 'guava', version: "${project.guavaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}") diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index fc4fc9df1..2b6a3aaf2 100644 --- a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -22,16 +22,11 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Uninterruptibles; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import com.github.valfirst.slf4jtest.TestLogger; -import com.github.valfirst.slf4jtest.TestLoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; @@ -57,13 +52,6 @@ public class SSTableCacheTests { private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); - private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref"); - - @AfterEach - void cleanup() - { - REF_LOGGER.clear(); - } // CHECKSTYLE IGNORE: Long method @Test @@ -203,35 +191,4 @@ public void testCache() } }); } - - @Test - void testNoLeakDetectedError() throws Exception - { - Partitioner partitioner = Partitioner.Murmur3Partitioner; - try (TemporaryDirectory directory = new TemporaryDirectory()) - { - // Write an SSTable - TestSchema schema = TestSchema.basic(BRIDGE); - schema.writeSSTable(directory, BRIDGE, partitioner, - writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index))); - SSTable sstable = TestSSTable.firstIn(directory.path()); - TableMetadata metadata = new SchemaBuilder(schema.createStatement, - schema.keyspace, - new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, - ImmutableMap.of("replication_factor", 1)), - partitioner).tableMetaData(); - - IndexSummaryComponent summary = 
SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); - assertThat(summary).isNotNull(); - assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); - SSTableCache.INSTANCE.invalidate(sstable); - summary = null; - assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); - // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. - System.gc(); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - System.gc(); - assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty(); - } - } } diff --git a/cassandra-four-zero-bridge/build.gradle b/cassandra-four-zero-bridge/build.gradle index 1cf63dccf..add277fb0 100644 --- a/cassandra-four-zero-bridge/build.gradle +++ b/cassandra-four-zero-bridge/build.gradle @@ -60,8 +60,6 @@ dependencies { testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.github.luben', name: 'zstd-jni', version: '1.5.0-4') - testImplementation(group: 'com.github.valfirst', name: 'slf4j-test', version: "${project.slf4jTestVersion}") - testImplementation(group: 'com.google.guava', name: 'guava', version: "${project.guavaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna', version: "${jnaVersion}") testRuntimeOnly(group: 'net.java.dev.jna', name: 'jna-platform', version: "${jnaVersion}") diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java index bed30185a..257288492 100644 --- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java +++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableCacheTests.java @@ -23,16 +23,11 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Uninterruptibles; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import com.github.valfirst.slf4jtest.TestLogger; -import com.github.valfirst.slf4jtest.TestLoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeImplementation; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.Descriptor; @@ -55,13 +50,6 @@ public class SSTableCacheTests { private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); - private static final TestLogger REF_LOGGER = TestLoggerFactory.getTestLogger("org.apache.cassandra.utils.concurrent.Ref"); - - @AfterEach - void cleanup() - { - REF_LOGGER.clear(); - } @Test public void testCache() @@ -171,35 +159,4 @@ public void testCache() } }); } - - @Test - void testNoLeakDetectedError() throws Exception - { - Partitioner partitioner = Partitioner.Murmur3Partitioner; - try (TemporaryDirectory directory = new TemporaryDirectory()) - { - // Write an SSTable - TestSchema schema = TestSchema.basic(BRIDGE); - schema.writeSSTable(directory, BRIDGE, partitioner, - writer -> IntStream.range(0, 10).forEach(index -> writer.write(index, 0, index))); - SSTable sstable = TestSSTable.firstIn(directory.path()); - TableMetadata metadata = new SchemaBuilder(schema.createStatement, - 
schema.keyspace, - new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, - ImmutableMap.of("replication_factor", 1)), - partitioner).tableMetaData(); - - IndexSummaryComponent summary = SSTableCache.INSTANCE.keysFromSummary(metadata, sstable); - assertThat(summary).isNotNull(); - assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isTrue(); - SSTableCache.INSTANCE.invalidate(sstable); - summary = null; - assertThat(SSTableCache.INSTANCE.containsSummary(sstable)).isFalse(); - // trigger GC and wait a bit before asserting LEAK DETECTED is not logged. - System.gc(); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - System.gc(); - assertThat(REF_LOGGER.getAllLoggingEvents()).isEmpty(); - } - } } diff --git a/gradle.properties b/gradle.properties index 7d2afb34b..4312771ca 100644 --- a/gradle.properties +++ b/gradle.properties @@ -34,7 +34,6 @@ scala=2.12 spark=3 kryoVersion=4.0.2 slf4jApiVersion=1.7.30 -slf4jTestVersion=2.3.0 guavaVersion=16.0.1 # force version 4.5.1 of vertx to prevent issues initializing io.vertx.core.json.jackson.JacksonCodec, # which requires a newer version of jackson, which is not available in spark 2