Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.5.0
-----
* Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.common.Digest;
import org.apache.cassandra.spark.common.SSTables;
import org.apache.cassandra.spark.data.FileSystemSSTable;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.LocalDataLayer;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.DigestAlgorithm;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -227,6 +229,9 @@ public synchronized Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterC
};
Set<Path> dataFilePaths = new HashSet<>();
Map<Path, Digest> fileDigests = new HashMap<>();
// FIXME: CQLSSTableWriter may produce an incomplete Filter.db file, so it is rebuilt manually (see CASSANDRA-21423).
// Rebuild the Filter.db files before calculating their digests
rebuildFilterComponents(writerContext, sstableFilter);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(getOutDir(), sstableFilter))
{
for (Path path : stream)
Expand Down Expand Up @@ -281,22 +286,43 @@ public synchronized void close(BulkWriterContext writerContext) throws IOExcepti
// Only process new SSTables produced during final flush
DirectoryStream.Filter<Path> unhashedFilter = path -> !hashedFiles.contains(path);

for (Path dataFile : getDataFileStream(unhashedFilter))
// FIXME: CQLSSTableWriter may produce an incomplete Filter.db file, so it is rebuilt manually (see CASSANDRA-21423).
rebuildFilterComponents(writerContext, unhashedFilter);

try (DirectoryStream<Path> dataFileStream = getDataFileStream(unhashedFilter))
{
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
// is what we validated. Then we send these along with the files and the
// receiving end re-hashes the files to make sure they still match.
Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
overallFileDigests.putAll(newFileDigests);
newlyHashedFiles.addAll(newFileDigests.keySet());
sstableCount += 1;
for (Path dataFile : dataFileStream)
{
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
// is what we validated. Then we send these along with the files and the
// receiving end re-hashes the files to make sure they still match.
Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
overallFileDigests.putAll(newFileDigests);
newlyHashedFiles.addAll(newFileDigests.keySet());
sstableCount += 1;
}
}
// Only calculate size for newly hashed files, not all files in overallFileDigests
// (previously hashed files may have been deleted by prepareSStablesToSend)
bytesWritten += calculatedTotalSize(newlyHashedFiles);
validateSSTables(writerContext);
}

/**
 * Rebuilds the bloom filter of every data file in the output directory that is accepted by
 * the given filter.
 * <p>
 * The method is {@code protected} so that subclasses can replace or skip the rebuild.
 *
 * @param writerContext context supplying the bridge that performs the rebuild and the
 *                      settings used to build the local data layer
 * @param filter        additional filter selecting which data files are processed
 * @throws IOException if the data files in the output directory cannot be listed
 */
protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
                                       @NotNull DirectoryStream.Filter<Path> filter) throws IOException
{
    // The layer is only consulted for its partitioner and table definition,
    // so no explicit set of data file paths is supplied
    LocalDataLayer localLayer = buildLocalDataLayer(writerContext, getOutDir(), null);
    try (DirectoryStream<Path> selectedDataFiles = getDataFileStream(filter))
    {
        for (Path dataFilePath : selectedDataFiles)
        {
            FileSystemSSTable localSSTable = new FileSystemSSTable(dataFilePath,
                                                                   false,
                                                                   BufferingInputStreamStats::doNothingStats);
            writerContext.bridge()
                         .rebuildBloomFilter(localLayer.partitioner(),
                                             localLayer.cqlTable(),
                                             localSSTable,
                                             getOutDir());
            LOGGER.debug("Rebuilt bloom filter for sstable {}", dataFilePath);
        }
    }
}

@VisibleForTesting
public void validateSSTables(@NotNull BulkWriterContext writerContext)
{
Expand All @@ -319,26 +345,7 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull
// and then validate all of them in parallel threads
try
{
CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
String keyspace = writerContext.job().qualifiedTableName().keyspace();
String schema = writerContext.schema().getTableSchema().createStatement;
Partitioner partitioner = writerContext.cluster().getPartitioner();
Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements();
LocalDataLayer layer = new LocalDataLayer(version,
partitioner,
keyspace,
schema,
udtStatements,
Collections.emptyList() /* requestedFeatures */,
false /* useSSTableInputStream */,
null /* statsClass */,
SSTableTimeRangeFilter.ALL,
outputDirectory.toString());
if (dataFilePaths != null)
{
layer.setDataFilePaths(dataFilePaths);
}

LocalDataLayer layer = buildLocalDataLayer(writerContext, outputDirectory, dataFilePaths);
try (StreamScanner<RowData> scanner = layer.openCompactionScanner(partitionId, Collections.emptyList(), null))
{
while (scanner.next())
Expand All @@ -354,6 +361,30 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull
}
}

/**
 * Creates a {@link LocalDataLayer} over the given output directory, configured from the writer
 * context: the lowest Cassandra version in the cluster, the cluster partitioner, the job's
 * keyspace, the table create statement and the user-defined type statements.
 *
 * @param writerContext   context providing cluster, job and schema information
 * @param outputDirectory directory the data layer reads from
 * @param dataFilePaths   data files to set on the layer; when {@code null} no paths are set
 * @return the configured data layer
 */
private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext writerContext,
                                           @NotNull Path outputDirectory,
                                           @Nullable Set<Path> dataFilePaths)
{
    CassandraVersion lowestVersion =
        CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
    String keyspaceName = writerContext.job().qualifiedTableName().keyspace();
    String createStatement = writerContext.schema().getTableSchema().createStatement;
    Partitioner clusterPartitioner = writerContext.cluster().getPartitioner();
    Set<String> udtCreateStatements = writerContext.schema().getUserDefinedTypeStatements();

    LocalDataLayer dataLayer = new LocalDataLayer(lowestVersion,
                                                  clusterPartitioner,
                                                  keyspaceName,
                                                  createStatement,
                                                  udtCreateStatements,
                                                  Collections.emptyList() /* requestedFeatures */,
                                                  false /* useSSTableInputStream */,
                                                  null /* statsClass */,
                                                  SSTableTimeRangeFilter.ALL,
                                                  outputDirectory.toString());
    if (dataFilePaths == null)
    {
        // No explicit data file set was requested; return the layer as constructed
        return dataLayer;
    }
    dataLayer.setDataFilePaths(dataFilePaths);
    return dataLayer;
}

private DirectoryStream<Path> getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
{
// Combine the data file filter with the provided filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@
import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;

/**
* Basic DataLayer implementation to read SSTables from local file system. Mostly used for testing.
* Basic DataLayer implementation to read SSTables from local file system.
* Mostly used for testing, but also for validating SSTables during bulk data insertion.
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class LocalDataLayer extends DataLayer implements Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.cassandra.spark.bulkwriter;

import java.nio.file.DirectoryStream;
import java.nio.file.Path;

import org.apache.cassandra.spark.utils.DigestAlgorithm;
Expand All @@ -36,4 +37,11 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext)
{
// Skip validation for these tests
}

/**
 * No-op override: the bloom filter rebuild is deliberately skipped for tests using this writer.
 */
@Override
protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
@NotNull DirectoryStream.Filter<Path> filter)
{
// Intentionally empty: skip the bloom filter rebuild for these tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,31 @@

package org.apache.cassandra.spark.bulkwriter;

import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -45,12 +52,20 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.cassandra.bridge.BloomFilter;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CassandraVersionFeatures;
import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.Digest;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.FileSystemSSTable;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -148,6 +163,122 @@ public void canCreateWriterForVersion(String version) throws IOException
tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
}

@ParameterizedTest
@MethodSource("supportedVersions")
public void testBloomFilterRebuild(String version) throws IOException
{
int rowCount = 50_000;
CassandraBridge bridge = CassandraBridgeFactory.get(version);
MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
Partitioner partitioner = writerContext.getPartitioner();
CqlTable cqlTable = bridge.buildSchema(writerContext.schema().getTableSchema().createStatement,
writerContext.qualifiedTableName().keyspace(),
new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
ImmutableMap.of("replication_factor", 1)),
partitioner,
Collections.emptySet());
SortedMap<BigInteger, List<String>> sortedKeys = new TreeMap<>();
for (int i = 0; i < rowCount; ++i)
{
List<String> keys = ImmutableList.of(String.valueOf(i), "1");
AbstractMap.SimpleEntry<ByteBuffer, BigInteger> partitionKey = bridge.getPartitionKey(cqlTable, partitioner, keys);
sortedKeys.put(partitionKey.getValue(), keys);
}

SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
List<SSTableDescriptor> allSSTables = new ArrayList<>();
tw.setSSTablesProducedListener(allSSTables::addAll);
for (BigInteger token : sortedKeys.keySet())
{
List<String> partitionKey = sortedKeys.get(token);
tw.addRow(token,
ImmutableMap.of("id", Integer.parseInt(partitionKey.get(0)),
"date", Integer.parseInt(partitionKey.get(1)),
"course", "foo", "marks", 1));
}
tw.close(writerContext);

assertThat(allSSTables).hasSize(1);

Set<Path> filterFilePaths = new HashSet<>();
try (DirectoryStream<Path> filterFileStream = Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db"))
{
filterFileStream.forEach(filterFilePaths::add);
}

assertThat(filterFilePaths).hasSize(1);

Path filterFile = filterFilePaths.iterator().next();
String dataFileName = filterFile.toFile().getName().replace("-Filter", "-Data");
Path dataFilePath = filterFile.getParent().resolve(dataFileName);
FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false, BufferingInputStreamStats::doNothingStats);

BloomFilter bloomFilter = bridge.openBloomFilter(partitioner,
writerContext.qualifiedTableName().keyspace(),
writerContext.qualifiedTableName().table(),
ssTable);

// second column is always set to 1 when inserting data
List<ByteBuffer> searchKeys = bridge.encodePartitionKeys(partitioner,
writerContext.qualifiedTableName().keyspace(),
writerContext.schema().getTableSchema().createStatement,
ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));

assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
// Potentially flaky assertion: bloom filters can return false positives, but since we are using a limited data set,
// a false positive is unlikely to happen here.
assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're using a limited set of data in here, it's very unlikely we'll ever hit this... so maybe just leave a comment. If the test changes, we could hit a bloom filter false positive, and it'll cause the test to fail. I know it's really, really unlikely, so please just document it.

}

/**
 * Verifies that closing the writer still succeeds and reports one SSTable when the bloom filter
 * rebuild runs while the index component is missing, and that no Filter.db file exists afterwards.
 * <p>
 * The failure is simulated by an anonymous writer subclass that renames the index files
 * (names ending in {@code Partitions.db} or {@code Index.db}) out of the way around the call
 * to the real rebuild, and renames them back afterwards.
 */
@ParameterizedTest
@MethodSource("supportedVersions")
public void testBloomFilterRebuildErrorHandling(String version) throws IOException
{
    MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
    SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1)
    {
        @Override
        protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
                                               @NotNull DirectoryStream.Filter<Path> filter) throws IOException
        {
            List<String> indexSuffixes = Arrays.asList("Partitions.db", "Index.db");
            // temporarily move index files to simulate an error in the bloom filter rebuild process
            try (DirectoryStream<Path> indexFileStream = Files.newDirectoryStream(getOutDir(), "*.db"))
            {
                for (Path indexFilePath : indexFileStream)
                {
                    File indexFile = indexFilePath.toFile();
                    if (indexSuffixes.stream().anyMatch(indexFile.getName()::endsWith))
                    {
                        boolean moved = indexFile.renameTo(new File(indexFile.getAbsolutePath() + "_hidden"));
                        assertThat(moved).isTrue();
                    }
                }
            }
            try
            {
                super.rebuildFilterComponents(writerContext, filter);
            }
            finally
            {
                // move the index files back, even when the rebuild throws, so that the
                // SSTables are not left without their index component
                try (DirectoryStream<Path> hiddenFileStream = Files.newDirectoryStream(getOutDir(), "*_hidden"))
                {
                    for (Path hiddenFilePath : hiddenFileStream)
                    {
                        File hiddenFile = hiddenFilePath.toFile();
                        boolean moved = hiddenFile.renameTo(new File(hiddenFile.getParent(), hiddenFile.getName().replace("_hidden", "")));
                        assertThat(moved).isTrue();
                    }
                }
            }
        }
    };
    List<SSTableDescriptor> allSSTables = new ArrayList<>();
    tw.setSSTablesProducedListener(allSSTables::addAll);
    tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1));
    tw.close(writerContext);
    assertThat(allSSTables).hasSize(1);
    // verify that bloom filter was not created
    try (DirectoryStream<Path> filterFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Filter.db"))
    {
        assertThat(filterFileStream.iterator().hasNext()).isFalse();
    }
}

/**
* Tests the race condition fix between prepareSStablesToSend (called from background threads)
* and close (called from the main thread). This test exercises CASSANALYTICS-107.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import org.apache.cassandra.bridge.CassandraBridge;
Expand Down Expand Up @@ -467,7 +468,7 @@ private String getKeyDef()
String clusteringKey = primaryColumns.stream()
.map(this::maybeQuoteIdentifierIfRequested)
.collect(Collectors.joining(","));
return "PRIMARY KEY (" + partitionKey + clusteringKey + ")";
return "PRIMARY KEY (" + partitionKey + (StringUtils.isNotBlank(clusteringKey) ? ("," + clusteringKey) : "") + ")";
}

@Override
Expand Down
Loading
Loading