From b7ea0eb102bc591e12298ac7fc3582cfd9a93a04 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Tue, 17 Jun 2025 16:49:00 +0200 Subject: [PATCH 1/2] CASSANALYTICS-32 wip --- .../spark/bulkwriter/BulkSparkConf.java | 3 ++ .../CassandraBulkSourceRelation.java | 6 ++-- .../spark/bulkwriter/CassandraJobInfo.java | 6 ++++ .../spark/bulkwriter/DirectStreamResult.java | 5 ++-- .../spark/bulkwriter/DirectStreamSession.java | 3 +- .../cassandra/spark/bulkwriter/JobInfo.java | 2 ++ .../spark/bulkwriter/SortedSSTableWriter.java | 29 +++++++++++++++++-- .../spark/bulkwriter/StreamResult.java | 5 +++- .../spark/bulkwriter/WriterOptions.java | 1 + .../CloudStorageStreamResult.java | 8 +++-- .../CloudStorageStreamSession.java | 3 +- .../bulkwriter/CommitCoordinatorTest.java | 2 +- .../ImportCompletionCoordinatorTest.java | 2 +- 13 files changed, 61 insertions(+), 14 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 7fdc48f6a..3d4924791 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -121,6 +121,7 @@ public class BulkSparkConf implements Serializable public final Integer sstableDataSizeInMiB; public final int commitBatchSize; public final boolean skipExtendedVerify; + public final boolean skipRowsViolatingConstraints; public final WriteMode writeMode; public final int commitThreadsPerInstance; public final double importCoordinatorTimeoutMultiplier; @@ -166,6 +167,8 @@ public BulkSparkConf(SparkConf conf, Map options) this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name()); this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true, "skip extended verification of SSTables by 
Cassandra"); + this.skipRowsViolatingConstraints = MapUtils.getBoolean(options, WriterOptions.SKIP_ROWS_VIOLATING_CONSTRAINTS.name(), false, + "skip rows which failed to be written because of violated constraints"); this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM")); String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null); if (!consistencyLevel.isLocal() && dc != null) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index f80ae5ca6..b466235b6 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -199,13 +199,14 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum(); long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum(); + long rowsViolatedConstraints = streamResults.stream().mapToLong(res -> res.rowsViolatedConstraints).sum(); boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected); onCloudStorageTransport(context -> waitForImportCompletion(context, rowCount, totalBytesWritten, hasClusterTopologyChanged, streamResults)); LOGGER.info("Bulk writer job complete. 
rowCount={} totalBytes={} hasClusterTopologyChanged={}", rowCount, totalBytesWritten, hasClusterTopologyChanged); - publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged); + publishSuccessfulJobStats(rowCount, totalBytesWritten, rowsViolatedConstraints, hasClusterTopologyChanged); } catch (Throwable throwable) { @@ -300,13 +301,14 @@ private void awaitImportCompletion(TransportContext.CloudStorageTransportContext importCoordinator.await(); } - private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged) + private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, long rowsViolatingConstraints, boolean hasClusterTopologyChanged) { writerContext.jobStats().publish(new HashMap() // type declaration required to compile with java8 {{ put("jobId", writerContext.job().getId().toString()); put("transportInfo", writerContext.job().transportInfo().toString()); put("rowsWritten", Long.toString(rowCount)); + put("rowsViolatingConstraints", Long.toString(rowsViolatingConstraints)); put("bytesWritten", Long.toString(totalBytesWritten)); put("jobStatus", "Succeeded"); put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged)); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java index 3759ea1c3..9cf73c841 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java @@ -76,6 +76,12 @@ public boolean skipExtendedVerify() return conf.skipExtendedVerify; } + @Override + public boolean skipRowsViolatingConstraints() + { + return conf.skipRowsViolatingConstraints; + } + @Override public boolean getSkipClean() { diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamResult.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamResult.java index 948566a7b..62ac3befa 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamResult.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamResult.java @@ -32,9 +32,9 @@ public class DirectStreamResult extends StreamResult public DirectStreamResult(String sessionID, Range tokenRange, List failures, List passed, - long rowCount, long bytesWritten) + long rowCount, long bytesWritten, long rowsViolatedConstraints) { - super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten); + super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints); } public void setCommitResults(List commitResult) @@ -53,6 +53,7 @@ public String toString() + ", commitResults=" + commitResults + ", passed=" + passed + ", bytesWritten=" + bytesWritten + + ", rowsViolatedConstraints=" + rowsViolatedConstraints + '}'; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java index 857e8b157..0a8e38c31 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java @@ -123,7 +123,8 @@ protected StreamResult doFinalizeStream() errors, new ArrayList<>(replicas), sstableWriter.rowCount(), - sstableWriter.bytesWritten()); + sstableWriter.bytesWritten(), + sstableWriter.rowsViolatedConstraints()); List cr; try { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index bbbb03b75..66a8882a0 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -70,6 +70,8 @@ default String getId() boolean skipExtendedVerify(); + boolean skipRowsViolatingConstraints(); + boolean getSkipClean(); /** diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java index e3b534c56..7e49c2d52 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java @@ -77,12 +77,14 @@ public class SortedSSTableWriter private BigInteger maxToken = null; private final Map overallFileDigests = new HashMap<>(); private final DigestAlgorithm digestAlgorithm; + private final boolean skipRowsViolatingConstraints; private volatile boolean isClosed = false; private int sstableCount = 0; private long rowCount = 0; private long bytesWritten = 0; + private long rowsViolatedConstraints = 0; public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir, DigestAlgorithm digestAlgorithm, @@ -92,6 +94,7 @@ public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter this.outDir = outDir; this.digestAlgorithm = digestAlgorithm; this.partitionId = partitionId; + this.skipRowsViolatingConstraints = false; } public SortedSSTableWriter(BulkWriterContext writerContext, Path outDir, DigestAlgorithm digestAlgorithm, int partitionId) @@ -99,6 +102,7 @@ public SortedSSTableWriter(BulkWriterContext writerContext, Path outDir, DigestA this.outDir = outDir; this.digestAlgorithm = 
digestAlgorithm; this.partitionId = partitionId; + this.skipRowsViolatingConstraints = writerContext.job().skipRowsViolatingConstraints(); String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion(); String packageVersion = getPackageVersion(lowestCassandraVersion); @@ -137,8 +141,24 @@ public void addRow(BigInteger token, Map boundValues) throws IOE } // rows are sorted. Therefore, only update the maxToken maxToken = token; - cqlSSTableWriter.addRow(boundValues); - rowCount += 1; + try + { + cqlSSTableWriter.addRow(boundValues); + rowCount += 1; + } + catch (Throwable t) + { + if (t.getCause() != null && t.getCause().getClass().getName().equals("org.apache.cassandra.cql3.constraints.ConstraintViolationException")) + { + rowsViolatedConstraints += 1; + if (!skipRowsViolatingConstraints) + throw t; + } + else + { + throw t; + } + } } public void setSSTablesProducedListener(Consumer> listener) @@ -324,4 +344,9 @@ public Map fileDigestMap() { return Collections.unmodifiableMap(overallFileDigests); } + + public long rowsViolatedConstraints() + { + return rowsViolatedConstraints; + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamResult.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamResult.java index 56b2dbba5..df286b108 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamResult.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamResult.java @@ -34,13 +34,15 @@ public abstract class StreamResult implements Serializable public final List passed; public final long rowCount; public final long bytesWritten; + public final long rowsViolatedConstraints; protected StreamResult(String sessionID, Range tokenRange, List failures, List passed, long rowCount, - long bytesWritten) + long bytesWritten, + long rowsViolatedConstraints) { this.sessionID = sessionID; this.tokenRange = 
tokenRange; @@ -48,5 +50,6 @@ protected StreamResult(String sessionID, this.passed = passed; this.rowCount = rowCount; this.bytesWritten = bytesWritten; + this.rowsViolatedConstraints = rowsViolatedConstraints; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index 6574e2782..2ab28a8d8 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -45,6 +45,7 @@ public enum WriterOptions implements WriterOption COMMIT_THREADS_PER_INSTANCE, COMMIT_BATCH_SIZE, SKIP_EXTENDED_VERIFY, + SKIP_ROWS_VIOLATING_CONSTRAINTS, WRITE_MODE, KEYSTORE_PASSWORD, KEYSTORE_PATH, diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamResult.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamResult.java index 971786c8d..54cb48360 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamResult.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamResult.java @@ -44,7 +44,7 @@ public class CloudStorageStreamResult extends StreamResult public static CloudStorageStreamResult empty(String sessionID, Range tokenRange) { - return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0); + return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0, 0); } public CloudStorageStreamResult(String sessionID, @@ -54,9 +54,10 @@ public CloudStorageStreamResult(String sessionID, Set createdRestoreSlices, int objectCount, 
long rowCount, - long bytesWritten) + long bytesWritten, + long rowsViolatedConstraints) { - super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten); + super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints); this.createdRestoreSlices = Collections.unmodifiableSet(createdRestoreSlices); this.objectCount = objectCount; } @@ -70,6 +71,7 @@ public String toString() + ", objectCount=" + objectCount + ", rowCount=" + rowCount + ", bytesWritten=" + bytesWritten + + ", rowsViolatedConstraints=" + rowsViolatedConstraints + ", failures=" + failures + ", createdRestoreSlices=" + createdRestoreSlices + ", passed=" + passed diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java index 964c4edf3..611fbd666 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamSession.java @@ -189,7 +189,8 @@ protected StreamResult doFinalizeStream() createdRestoreSlices, sliceCount, sstableWriter.rowCount(), - sstableWriter.bytesWritten()); + sstableWriter.bytesWritten(), + sstableWriter.rowsViolatedConstraints()); LOGGER.info("StreamResult: {}", streamResult); // CL validation is only required for non-coordinated write. // For coordinated write, executors only need to make sure the bundles are uploaded and restore slice is created on sidecar. 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinatorTest.java index 258cfe2d4..cd88b3563 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinatorTest.java @@ -232,7 +232,7 @@ DirectStreamResult build() DirectStreamResult sr = new DirectStreamResult(UUID.randomUUID().toString(), TEST_RANGE, buildFailures(), - buildPassed(), 0, 0); + buildPassed(), 0, 0, 0); if (successfulCommits > 0 || failedCommits > 0) { List commitResults = new ArrayList<>(); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java index dbc0d2717..a6e23a0d5 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/ImportCompletionCoordinatorTest.java @@ -481,7 +481,7 @@ else if (failedPerReplica-- > 0) } CloudStorageStreamResult result = new CloudStorageStreamResult("", mock(Range.class), Collections.emptyList(), passedReplicaSet, createdRestoreSlices, - createdRestoreSlices.size(), 0, 0); + createdRestoreSlices.size(), 0, 0, 0); resultList.add(result); } return resultList; From 71c95c63badceb7bbabd945d906d88061b390f3f Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Thu, 3 Jul 2025 09:49:03 +0200 Subject: [PATCH 2/2] review --- .../java/org/apache/cassandra/spark/bulkwriter/JobInfo.java | 5 +++++ .../cassandra/spark/bulkwriter/SortedSSTableWriter.java | 2 +- 
.../cassandra/spark/bulkwriter/MockBulkWriterContext.java | 6 ++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index 66a8882a0..34c8d2747 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -70,6 +70,11 @@ default String getId() boolean skipExtendedVerify(); + /** + * Defaults to false; in that case, when a row violates a constraint, the job will fail. + * + * @return true if rows violating constraints will not fail a job, false otherwise + */ boolean skipRowsViolatingConstraints(); boolean getSkipClean(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java index 7e49c2d52..6cc65bf6d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java @@ -148,7 +148,7 @@ public void addRow(BigInteger token, Map boundValues) throws IOE } catch (Throwable t) { - if (t.getCause() != null && t.getCause().getClass().getName().equals("org.apache.cassandra.cql3.constraints.ConstraintViolationException")) + if (t.getCause() != null && "org.apache.cassandra.cql3.constraints.ConstraintViolationException".equals(t.getCause().getClass().getName())) { rowsViolatedConstraints += 1; if (!skipRowsViolatingConstraints) throw t; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index e9f86caa4..d7d402bb0 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -259,6 +259,12 @@ public boolean skipExtendedVerify() return false; } + @Override + public boolean skipRowsViolatingConstraints() + { + return false; + } + @Override public boolean getSkipClean() {