diff --git a/CHANGES.txt b/CHANGES.txt index ec3409eac..58d4cf960 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.5.0 ----- + * Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167) * Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171) * Spark 4.0 Support (CASSANALYTICS-34) * Add IAM credential support for S3 storage transport (CASSANALYTICS-155) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java index 3d09be476..877aafa15 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java @@ -43,12 +43,14 @@ import org.apache.cassandra.bridge.SSTableDescriptor; import org.apache.cassandra.spark.common.Digest; import org.apache.cassandra.spark.common.SSTables; +import org.apache.cassandra.spark.data.FileSystemSSTable; import org.apache.cassandra.spark.data.FileType; import org.apache.cassandra.spark.data.LocalDataLayer; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.reader.StreamScanner; import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter; +import org.apache.cassandra.spark.stats.BufferingInputStreamStats; import org.apache.cassandra.spark.utils.DigestAlgorithm; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -227,6 +229,9 @@ public synchronized Map prepareSStablesToSend(@NotNull BulkWriterC }; Set dataFilePaths = new HashSet<>(); Map fileDigests = new HashMap<>(); + // FIXME: CQLSSTableWriter may produce incomplete Filter.db file, rebuilding it manually (see CASSANDRA-21423). 
+ // rebuild Filter.db files before calculating their digest + rebuildFilterComponents(writerContext, sstableFilter); try (DirectoryStream stream = Files.newDirectoryStream(getOutDir(), sstableFilter)) { for (Path path : stream) @@ -281,15 +286,21 @@ public synchronized void close(BulkWriterContext writerContext) throws IOExcepti // Only process new SSTables produced during final flush DirectoryStream.Filter unhashedFilter = path -> !hashedFiles.contains(path); - for (Path dataFile : getDataFileStream(unhashedFilter)) + // FIXME: CQLSSTableWriter may produce incomplete Filter.db file, rebuilding it manually (see CASSANDRA-21423). + rebuildFilterComponents(writerContext, unhashedFilter); + + try (DirectoryStream dataFileStream = getDataFileStream(unhashedFilter)) { - // NOTE: We calculate file hashes before re-reading so that we know what we hashed - // is what we validated. Then we send these along with the files and the - // receiving end re-hashes the files to make sure they still match. - Map newFileDigests = calculateFileDigestMap(dataFile); - overallFileDigests.putAll(newFileDigests); - newlyHashedFiles.addAll(newFileDigests.keySet()); - sstableCount += 1; + for (Path dataFile : dataFileStream) + { + // NOTE: We calculate file hashes before re-reading so that we know what we hashed + // is what we validated. Then we send these along with the files and the + // receiving end re-hashes the files to make sure they still match. 
+ Map newFileDigests = calculateFileDigestMap(dataFile); + overallFileDigests.putAll(newFileDigests); + newlyHashedFiles.addAll(newFileDigests.keySet()); + sstableCount += 1; + } } // Only calculate size for newly hashed files, not all files in overallFileDigests // (previously hashed files may have been deleted by prepareSStablesToSend) @@ -297,6 +308,21 @@ public synchronized void close(BulkWriterContext writerContext) throws IOExcepti validateSSTables(writerContext); } + protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, + @NotNull DirectoryStream.Filter filter) throws IOException + { + LocalDataLayer layer = buildLocalDataLayer(writerContext, getOutDir(), null); + try (DirectoryStream dataFileStream = getDataFileStream(filter)) + { + for (Path dataFile : dataFileStream) + { + FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, false, BufferingInputStreamStats::doNothingStats); + writerContext.bridge().rebuildBloomFilter(layer.partitioner(), layer.cqlTable(), ssTable, getOutDir()); + LOGGER.debug("Rebuilt bloom filter for sstable {}", dataFile); + } + } + } + @VisibleForTesting public void validateSSTables(@NotNull BulkWriterContext writerContext) { @@ -319,26 +345,7 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull // and then validate all of them in parallel threads try { - CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion()); - String keyspace = writerContext.job().qualifiedTableName().keyspace(); - String schema = writerContext.schema().getTableSchema().createStatement; - Partitioner partitioner = writerContext.cluster().getPartitioner(); - Set udtStatements = writerContext.schema().getUserDefinedTypeStatements(); - LocalDataLayer layer = new LocalDataLayer(version, - partitioner, - keyspace, - schema, - udtStatements, - Collections.emptyList() /* requestedFeatures */, - false /* useSSTableInputStream */, - null /* 
statsClass */, - SSTableTimeRangeFilter.ALL, - outputDirectory.toString()); - if (dataFilePaths != null) - { - layer.setDataFilePaths(dataFilePaths); - } - + LocalDataLayer layer = buildLocalDataLayer(writerContext, outputDirectory, dataFilePaths); try (StreamScanner scanner = layer.openCompactionScanner(partitionId, Collections.emptyList(), null)) { while (scanner.next()) @@ -354,6 +361,30 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull } } + private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory, @Nullable Set dataFilePaths) + { + CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion()); + String keyspace = writerContext.job().qualifiedTableName().keyspace(); + String schema = writerContext.schema().getTableSchema().createStatement; + Partitioner partitioner = writerContext.cluster().getPartitioner(); + Set udtStatements = writerContext.schema().getUserDefinedTypeStatements(); + LocalDataLayer layer = new LocalDataLayer(version, + partitioner, + keyspace, + schema, + udtStatements, + Collections.emptyList() /* requestedFeatures */, + false /* useSSTableInputStream */, + null /* statsClass */, + SSTableTimeRangeFilter.ALL, + outputDirectory.toString()); + if (dataFilePaths != null) + { + layer.setDataFilePaths(dataFilePaths); + } + return layer; + } + private DirectoryStream getDataFileStream(DirectoryStream.Filter filter) throws IOException { // Combine the data file filter with the provided filter diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java index 64b57e1cd..0a30cde03 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java @@ -69,7 +69,8 @@ import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter; /** - * Basic DataLayer implementation to read SSTables from local file system. Mostly used for testing. + * Basic DataLayer implementation to read SSTables from local file system. + * Mostly used for testing, but also by the bulk writer to validate written SSTables and rebuild their bloom filters. */ @SuppressWarnings({"unused", "WeakerAccess"}) public class LocalDataLayer extends DataLayer implements Serializable diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java index 0498b7e85..39a4daf6c 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java @@ -19,6 +19,7 @@ package org.apache.cassandra.spark.bulkwriter; +import java.nio.file.DirectoryStream; import java.nio.file.Path; import org.apache.cassandra.spark.utils.DigestAlgorithm; @@ -36,4 +37,11 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext) { // Skip validation for these tests } + + @Override + protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, + @NotNull DirectoryStream.Filter filter) + { + // Skip the bloom filter rebuild for these tests + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java index 271c760d6..2bd5b5634 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java 
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java @@ -19,17 +19,23 @@ package org.apache.cassandra.spark.bulkwriter; +import java.io.File; import java.io.IOException; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.AfterAll; @@ -45,12 +52,20 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.apache.cassandra.bridge.BloomFilter; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CassandraVersionFeatures; import org.apache.cassandra.bridge.SSTableDescriptor; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.Digest; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.FileSystemSSTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import 
org.apache.cassandra.spark.stats.BufferingInputStreamStats; import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; import org.jetbrains.annotations.NotNull; @@ -148,6 +163,122 @@ public void canCreateWriterForVersion(String version) throws IOException tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths); } + @ParameterizedTest + @MethodSource("supportedVersions") + public void testBloomFilterRebuild(String version) throws IOException + { + int rowCount = 50_000; + CassandraBridge bridge = CassandraBridgeFactory.get(version); + MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM); + Partitioner partitioner = writerContext.getPartitioner(); + CqlTable cqlTable = bridge.buildSchema(writerContext.schema().getTableSchema().createStatement, + writerContext.qualifiedTableName().keyspace(), + new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, + ImmutableMap.of("replication_factor", 1)), + partitioner, + Collections.emptySet()); + SortedMap> sortedKeys = new TreeMap<>(); + for (int i = 0; i < rowCount; ++i) + { + List keys = ImmutableList.of(String.valueOf(i), "1"); + AbstractMap.SimpleEntry partitionKey = bridge.getPartitionKey(cqlTable, partitioner, keys); + sortedKeys.put(partitionKey.getValue(), keys); + } + + SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1); + List allSSTables = new ArrayList<>(); + tw.setSSTablesProducedListener(allSSTables::addAll); + for (BigInteger token : sortedKeys.keySet()) + { + List partitionKey = sortedKeys.get(token); + tw.addRow(token, + ImmutableMap.of("id", Integer.parseInt(partitionKey.get(0)), + "date", Integer.parseInt(partitionKey.get(1)), + "course", "foo", "marks", 1)); + } + tw.close(writerContext); + + assertThat(allSSTables).hasSize(1); + + Set filterFilePaths = new HashSet<>(); + try (DirectoryStream filterFileStream = 
Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db")) + { + filterFileStream.forEach(filterFilePaths::add); + } + + assertThat(filterFilePaths).hasSize(1); + + Path filterFile = filterFilePaths.iterator().next(); + String dataFileName = filterFile.toFile().getName().replace("-Filter", "-Data"); + Path dataFilePath = filterFile.getParent().resolve(dataFileName); + FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false, BufferingInputStreamStats::doNothingStats); + + BloomFilter bloomFilter = bridge.openBloomFilter(partitioner, + writerContext.qualifiedTableName().keyspace(), + writerContext.qualifiedTableName().table(), + ssTable); + + // The second key column is always 1 in the inserted data, so the key ("7", "2") was never written + List searchKeys = bridge.encodePartitionKeys(partitioner, + writerContext.qualifiedTableName().keyspace(), + writerContext.schema().getTableSchema().createStatement, + ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2"))); + + assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue(); + // Potentially flaky assertion: bloom filters may return false positives, but with such a limited data set + // one is unlikely to occur. 
+ assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue(); + } + + /** + * Verifies that a failure while rebuilding the bloom filter is tolerated: the write still completes + * and no {@code Filter.db} file is left behind. + */ + @ParameterizedTest + @MethodSource("supportedVersions") + public void testBloomFilterRebuildErrorHandling(String version) throws IOException + { + MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM); + SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1) + { + @Override + protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext, + @NotNull DirectoryStream.Filter filter) throws IOException + { + try + { + // temporarily move index file to simulate error in bloom filter rebuild process + try (DirectoryStream indexFileStream = Files.newDirectoryStream(getOutDir(), "*.db")) + { + indexFileStream.forEach(indexFilePath -> { + for (String indexSuffix : Arrays.asList("Partitions.db", "Index.db")) + { + if (indexFilePath.toFile().getName().endsWith(indexSuffix)) + { + File indexFile = indexFilePath.toFile(); + boolean moved = indexFile.renameTo(new File(indexFile.getAbsolutePath() + "_hidden")); + assertThat(moved).isTrue(); + } + } + }); + } + super.rebuildFilterComponents(writerContext, filter); + } + finally + { + // Always move the index files back, even if hiding them or the rebuild itself failed, + // so that later steps (e.g. SSTable validation in close) do not fail for an unrelated reason + try (DirectoryStream hiddenFileStream = Files.newDirectoryStream(getOutDir(), "*_hidden")) + { + hiddenFileStream.forEach(hiddenFilePath -> { + File hiddenFile = hiddenFilePath.toFile(); + boolean moved = hiddenFile.renameTo(new File(hiddenFile.getParent(), hiddenFile.getName().replace("_hidden", ""))); + assertThat(moved).isTrue(); + }); + } + } + } + }; + List allSSTables = new ArrayList<>(); + tw.setSSTablesProducedListener(allSSTables::addAll); + tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1)); + tw.close(writerContext); + assertThat(allSSTables).hasSize(1); + // verify that no bloom filter file is left behind after the failed rebuild + try (DirectoryStream filterFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Filter.db")) + { 
+ assertThat(filterFileStream.iterator().hasNext()).isFalse(); + } + } + /** * Tests the race condition fix between prepareSStablesToSend (called from background threads) * and close (called from the main thread). This test exercises CASSANALYTICS-107. diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java index 9025d882b..701ca337d 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java @@ -31,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.cassandra.bridge.CassandraBridge; @@ -467,7 +468,7 @@ private String getKeyDef() String clusteringKey = primaryColumns.stream() .map(this::maybeQuoteIdentifierIfRequested) .collect(Collectors.joining(",")); - return "PRIMARY KEY (" + partitionKey + clusteringKey + ")"; + return "PRIMARY KEY (" + partitionKey + (StringUtils.isNotBlank(clusteringKey) ? 
("," + clusteringKey) : "") + ")"; } @Override diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 6bbf7f142..67c5bc7a0 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -556,6 +556,21 @@ public abstract BloomFilter openBloomFilter(@NotNull Partitioner partitioner, @NotNull String table, @NotNull SSTable ssTable) throws IOException; + /** + * Recreates the bloom filter of the given SSTable from the partition keys in its primary index. + * An existing {@code Filter.db} file is overwritten. + * + * @param partitioner Cassandra partitioner used to decorate the partition keys + * @param cqltable CQL table the SSTable belongs to + * @param ssTable SSTable whose bloom filter is rebuilt + * @param directory directory holding the SSTable components; the {@code Filter.db} file is written here + * @throws IOException if an I/O error occurs that the implementation does not handle itself + */ + public abstract void rebuildBloomFilter(@NotNull Partitioner partitioner, + @NotNull CqlTable cqltable, + @NotNull SSTable ssTable, + @NotNull Path directory) throws IOException; + /** * @param partitioner Cassandra partitioner * @param keyspace keyspace name diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 88d5c6df0..5dd648552 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -71,12 +71,14 @@ import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableTombstoneWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.sstable.format.SSTableReader; import 
org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils; import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileOutputStreamPlus; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; @@ -115,6 +117,8 @@ import org.apache.cassandra.util.IntWrapper; import org.apache.cassandra.utils.BloomFilter; import org.apache.cassandra.utils.CompressionUtilImplementation; +import org.apache.cassandra.utils.FilterDbUtils; +import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.TimeUUID; import org.apache.cassandra.utils.TokenUtils; import org.jetbrains.annotations.NotNull; @@ -348,6 +352,64 @@ public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner parti }; } + @Override + public void rebuildBloomFilter(@NotNull Partitioner partitioner, + @NotNull CqlTable cqltable, + @NotNull SSTable ssTable, + @NotNull Path directory) throws IOException + { + String keyspace = cqltable.keyspace(); + String table = cqltable.table(); + IPartitioner iPartitioner = getPartitioner(partitioner); + SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, partitioner); + TableMetadata tableMetadata = schemaBuilder.tableMetaData(); + + if (tableMetadata.params.bloomFilterFpChance == 1.0) + { + return; // bloom filter has been disabled for the table + } + + Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable); + File filterFile = new File(directory, descriptor.fileFor(SSTableFormat.Components.FILTER).name()); + try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, ssTable, tableMetadata)) + { + Function tracker = bytes -> { + DecoratedKey key = iPartitioner.decorateKey(bytes); + filter.add(key); + return false; + }; + 
if (ssTable.isBtiFormat()) + { + BtiReaderUtils.readPrimaryIndex(ssTable, iPartitioner, descriptor, + tableMetadata.params.crcCheckChance, tracker); + } + else + { + try (InputStream primaryIndex = ssTable.openPrimaryIndexStream()) + { + if (primaryIndex == null) + { + throw new IOException("Could not read Index.db file"); + } + ReaderUtils.readPrimaryIndex(primaryIndex, tracker); + } + } + + try (FileOutputStreamPlus stream = filterFile.newOutputStream(File.WriteMode.OVERWRITE)) + { + filter.serialize(stream, descriptor.version.hasOldBfFormat()); + stream.flush(); + stream.sync(); + } + } + catch (Exception e) + { + LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}", directory, ssTable.getDataFileName(), e); + // Remove potentially corrupted bloom filter. It will be rebuilt by Cassandra during sstable import. + Files.deleteIfExists(filterFile.toPath()); + } + } + private BloomFilter openBloomFilter(Descriptor descriptor, SSTable ssTable) throws IOException { return ReaderUtils.readFilter(ssTable, descriptor); diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 943a3b76a..5a8f9ed8b 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; @@ -44,6 +45,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -68,6 +70,7 @@ 
import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.SSTableTombstoneWriter; @@ -75,6 +78,8 @@ import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; @@ -111,7 +116,11 @@ import org.apache.cassandra.util.CompressionUtil; import org.apache.cassandra.util.IntWrapper; import org.apache.cassandra.utils.BloomFilter; +import org.apache.cassandra.utils.BloomFilterSerializer; import org.apache.cassandra.utils.CompressionUtilImplementation; +import org.apache.cassandra.utils.FilterDbUtils; +import org.apache.cassandra.utils.IFilter; +import org.apache.cassandra.utils.SyncUtil; import org.apache.cassandra.utils.TokenUtils; import org.apache.cassandra.utils.UUIDGen; import org.jetbrains.annotations.NotNull; @@ -324,6 +333,58 @@ public List encodePartitionKeys(Partitioner partitioner, String keys return keys.stream().map(key -> buildPartitionKey(table, key)).collect(Collectors.toList()); } + @Override + public void rebuildBloomFilter(@NotNull Partitioner partitioner, + @NotNull CqlTable cqltable, + @NotNull SSTable ssTable, + @NotNull Path directory) throws IOException + { + String keyspace = cqltable.keyspace(); + String table = cqltable.table(); + IPartitioner iPartitioner = getPartitioner(partitioner); + SchemaBuilder schemaBuilder = new SchemaBuilder(cqltable, 
partitioner); + TableMetadata tableMetadata = schemaBuilder.tableMetaData(); + + if (tableMetadata.params.bloomFilterFpChance == 1.0) + { + return; // bloom filter has been disabled for the table + } + + Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable); + File filterFile = new File(directory.toFile(), descriptor.relativeFilenameFor(Component.FILTER)); + try (IFilter filter = FilterDbUtils.buildBloomFilter(cqltable, ssTable, tableMetadata)) + { + Function tracker = bytes -> { + DecoratedKey key = iPartitioner.decorateKey(bytes); + filter.add(key); + return false; + }; + + try (InputStream primaryIndex = ssTable.openPrimaryIndexStream()) + { + if (primaryIndex == null) + { + throw new IOException("Could not read Index.db file"); + } + ReaderUtils.readPrimaryIndex(primaryIndex, tracker); + } + + try (FileOutputStream fos = new FileOutputStream(filterFile, false); + DataOutputStreamPlus stream = new BufferedDataOutputStreamPlus(fos)) + { + BloomFilterSerializer.serialize((BloomFilter) filter, stream); + stream.flush(); + SyncUtil.sync(fos); + } + } + catch (Exception e) + { + LOGGER.error("Failed to rebuild bloom filter for sstable {}/{}", directory, ssTable.getDataFileName(), e); + // Remove potentially corrupted bloom filter. It will be rebuilt by Cassandra during sstable import. + Files.deleteIfExists(filterFile.toPath()); + } + } + @Override public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner partitioner, String keyspace, diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java new file mode 100644 index 000000000..d8fc46fae --- /dev/null +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/utils/FilterDbUtils.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.utils; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; + +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.SSTable; +import org.apache.cassandra.spark.reader.ReaderUtils; +import org.jetbrains.annotations.NotNull; + +/** + * Helper methods for managing bloom filters. 
+ */ +public final class FilterDbUtils +{ + private FilterDbUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + /** + * Creates a bloom filter for the given SSTable, sized from the row count recorded in the SSTable's + * stats metadata and configured with the table's {@code bloomFilterFpChance} parameter. + * Keys are not added here; that is left to the caller. + * + * @param cqltable CQL table the SSTable belongs to (supplies keyspace and table name) + * @param ssTable SSTable whose stats metadata is read + * @param tableMetadata table metadata supplying the false-positive chance + * @return the filter returned by {@code FilterFactory.getFilter} + * @throws IOException if the stats metadata cannot be read or does not contain the STATS component + */ + public static IFilter buildBloomFilter(@NotNull CqlTable cqltable, + @NotNull SSTable ssTable, + @NotNull TableMetadata tableMetadata) throws IOException + { + String keyspace = cqltable.keyspace(); + String table = cqltable.table(); + + Map componentMap = ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, EnumSet.of(MetadataType.STATS)); + StatsMetadata statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS); + if (statsMetadata == null) + { + // Fail with a descriptive error instead of a NullPointerException on the field access below + throw new IOException("Stats metadata is missing for sstable " + ssTable.getDataFileName()); + } + + // NOTE(review): totalRows counts rows rather than partition keys; presumably an acceptable + // upper bound for sizing the filter of freshly written SSTables - confirm + return FilterFactory.getFilter(statsMetadata.totalRows, tableMetadata.params.bloomFilterFpChance); + } +}