@@ -121,6 +121,7 @@ public class BulkSparkConf implements Serializable
public final Integer sstableDataSizeInMiB;
public final int commitBatchSize;
public final boolean skipExtendedVerify;
public final boolean skipRowsViolatingConstraints;
public final WriteMode writeMode;
public final int commitThreadsPerInstance;
public final double importCoordinatorTimeoutMultiplier;
@@ -166,6 +167,8 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
"skip extended verification of SSTables by Cassandra");
this.skipRowsViolatingConstraints = MapUtils.getBoolean(options, WriterOptions.SKIP_ROWS_VIOLATING_CONSTRAINTS.name(), false,
"skip rows which failed to be written because of violated constraints");
this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
if (!consistencyLevel.isLocal() && dc != null)
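Because the option defaults to false, constraint violations fail the job unless the caller opts in explicitly. A minimal sketch of that opt-in, under the assumptions noted in the comments:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;

// Hedged sketch: building a BulkSparkConf that opts in to the new behavior.
// Only TABLE and SKIP_ROWS_VIOLATING_CONSTRAINTS appear in this diff; any other
// required options (e.g. keyspace) are elided, and exact key-casing handling by
// MapUtils is an assumption.
static BulkSparkConf optInConf(SparkConf sparkConf)
{
    Map<String, String> options = new HashMap<>();
    options.put("TABLE", "my_table");                        // illustrative table name
    options.put("SKIP_ROWS_VIOLATING_CONSTRAINTS", "true");  // defaults to false when absent
    return new BulkSparkConf(sparkConf, options);
}
```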
@@ -199,13 +199,14 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str

long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
long rowsViolatedConstraints = streamResults.stream().mapToLong(res -> res.rowsViolatedConstraints).sum();
boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected);

onCloudStorageTransport(context -> waitForImportCompletion(context, rowCount, totalBytesWritten, hasClusterTopologyChanged, streamResults));

LOGGER.info("Bulk writer job complete. rowCount={} totalBytes={} hasClusterTopologyChanged={}",
rowCount, totalBytesWritten, hasClusterTopologyChanged);
publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged);
publishSuccessfulJobStats(rowCount, totalBytesWritten, rowsViolatedConstraints, hasClusterTopologyChanged);
}
catch (Throwable throwable)
{
@@ -300,13 +301,14 @@ private void awaitImportCompletion(TransportContext.CloudStorageTransportContext
importCoordinator.await();
}

private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged)
private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, long rowsViolatingConstraints, boolean hasClusterTopologyChanged)
{
writerContext.jobStats().publish(new HashMap<String, String>() // type declaration required to compile with java8
{{
put("jobId", writerContext.job().getId().toString());
put("transportInfo", writerContext.job().transportInfo().toString());
put("rowsWritten", Long.toString(rowCount));
put("rowsViolatingConstraints", Long.toString(rowsViolatingConstraints));
put("bytesWritten", Long.toString(totalBytesWritten));
put("jobStatus", "Succeeded");
put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged));
@@ -76,6 +76,12 @@ public boolean skipExtendedVerify()
return conf.skipExtendedVerify;
}

@Override
public boolean skipRowsViolatingConstraints()
{
return conf.skipRowsViolatingConstraints;
}

@Override
public boolean getSkipClean()
{
@@ -32,9 +32,9 @@ public class DirectStreamResult extends StreamResult

public DirectStreamResult(String sessionID, Range<BigInteger> tokenRange,
List<StreamError> failures, List<RingInstance> passed,
long rowCount, long bytesWritten)
long rowCount, long bytesWritten, long rowsViolatedConstraints)
{
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten);
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints);
}

public void setCommitResults(List<CommitResult> commitResult)
@@ -53,6 +53,7 @@ public String toString()
+ ", commitResults=" + commitResults
+ ", passed=" + passed
+ ", bytesWritten=" + bytesWritten
+ ", rowsViolatedConstraints=" + rowsViolatedConstraints
+ '}';
}
}
@@ -123,7 +123,8 @@ protected StreamResult doFinalizeStream()
errors,
new ArrayList<>(replicas),
sstableWriter.rowCount(),
sstableWriter.bytesWritten());
sstableWriter.bytesWritten(),
sstableWriter.rowsViolatedConstraints());
List<CommitResult> cr;
try
{
@@ -70,6 +70,13 @@ default String getId()

boolean skipExtendedVerify();

/**
* Defaults to false; in that case, a job will fail when a row violates a constraint.
*
* @return true if rows violating constraints will not fail a job, false otherwise
*/
boolean skipRowsViolatingConstraints();

boolean getSkipClean();

/**
@@ -77,12 +77,14 @@ public class SortedSSTableWriter
private BigInteger maxToken = null;
private final Map<Path, Digest> overallFileDigests = new HashMap<>();
private final DigestAlgorithm digestAlgorithm;
private final boolean skipRowsViolatingConstraints;

private volatile boolean isClosed = false;

private int sstableCount = 0;
private long rowCount = 0;
private long bytesWritten = 0;
private long rowsViolatedConstraints = 0;

public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir,
DigestAlgorithm digestAlgorithm,
@@ -92,13 +94,15 @@ public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter
this.outDir = outDir;
this.digestAlgorithm = digestAlgorithm;
this.partitionId = partitionId;
this.skipRowsViolatingConstraints = false;
}

public SortedSSTableWriter(BulkWriterContext writerContext, Path outDir, DigestAlgorithm digestAlgorithm, int partitionId)
{
this.outDir = outDir;
this.digestAlgorithm = digestAlgorithm;
this.partitionId = partitionId;
this.skipRowsViolatingConstraints = writerContext.job().skipRowsViolatingConstraints();

String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion();
String packageVersion = getPackageVersion(lowestCassandraVersion);
@@ -137,8 +141,24 @@ public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOE
}
// rows are sorted. Therefore, only update the maxToken
maxToken = token;
cqlSSTableWriter.addRow(boundValues);
rowCount += 1;
try
{
cqlSSTableWriter.addRow(boundValues);
rowCount += 1;
}
catch (Throwable t)
{
if (t.getCause() != null && "org.apache.cassandra.cql3.constraints.ConstraintViolationException".equals(t.getCause().getClass().getName()))
{
rowsViolatedConstraints += 1;
if (!skipRowsViolatingConstraints)
{
throw t;
}
}
else
{
throw t;
}
}
}

public void setSSTablesProducedListener(Consumer<Set<SSTableDescriptor>> listener)
@@ -324,4 +344,9 @@ public Map<Path, Digest> fileDigestMap()
{
return Collections.unmodifiableMap(overallFileDigests);
}

public long rowsViolatedConstraints()
{
return rowsViolatedConstraints;
}
}
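The catch block in addRow identifies a constraint violation by comparing the cause's fully qualified class name rather than using instanceof, which avoids a compile-time dependency on the Cassandra 5 constraints package. A standalone sketch of that pattern (the helper name is illustrative):

```java
// Hedged sketch of the cause-inspection pattern used in addRow above: matching the
// wrapped exception by fully qualified class name avoids linking against the
// Cassandra 5 constraints classes at compile time. The helper name is illustrative.
static boolean isConstraintViolation(Throwable t)
{
    Throwable cause = t.getCause();
    return cause != null
           && "org.apache.cassandra.cql3.constraints.ConstraintViolationException"
                  .equals(cause.getClass().getName());
}
```

Note that only the immediate cause is inspected; a violation wrapped more than one level deep would be rethrown rather than counted.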
@@ -34,19 +34,22 @@ public abstract class StreamResult implements Serializable
public final List<RingInstance> passed;
public final long rowCount;
public final long bytesWritten;
public final long rowsViolatedConstraints;

protected StreamResult(String sessionID,
Range<BigInteger> tokenRange,
List<StreamError> failures,
List<RingInstance> passed,
long rowCount,
long bytesWritten)
long bytesWritten,
long rowsViolatedConstraints)
{
this.sessionID = sessionID;
this.tokenRange = tokenRange;
this.failures = failures;
this.passed = passed;
this.rowCount = rowCount;
this.bytesWritten = bytesWritten;
this.rowsViolatedConstraints = rowsViolatedConstraints;
}
}
@@ -45,6 +45,7 @@ public enum WriterOptions implements WriterOption
COMMIT_THREADS_PER_INSTANCE,
COMMIT_BATCH_SIZE,
SKIP_EXTENDED_VERIFY,
SKIP_ROWS_VIOLATING_CONSTRAINTS,
[Review thread on SKIP_ROWS_VIOLATING_CONSTRAINTS]

Contributor:
Can you maybe add a comment here that this feature is only available in Cassandra 5.0? Or is it 6?

smiklosovic (Contributor, Author), Jun 19, 2025:
@frankgh thank you for the review, but I am not sure this approach is still viable based on the discussion in the related ticket. Did I interpret that comment correctly in thinking that what I did here is not, by itself, enough to consider CASSANALYTICS-32 resolved, since we still need to expose the failed rows somehow, but that it is a nice-to-have first step in that direction which is already valuable as such?

https://issues.apache.org/jira/browse/CASSANALYTICS-32?focusedCommentId=17980384&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17980384

Contributor:
Yeah, I saw the comment from Yifan after I reviewed the PR. I think it's a good first approach; you need to explicitly enable this feature in order to write the data that does not violate the constraints.

WRITE_MODE,
KEYSTORE_PASSWORD,
KEYSTORE_PATH,
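End to end, the option travels through the Spark writer options into BulkSparkConf. A hedged usage sketch follows; the data-sink format string and option handling are assumptions based on how the bulk writer's other options (e.g. BULK_WRITER_CL) are supplied, not something this diff confirms:

```java
// Hedged sketch: enabling the option on a Spark bulk write. `df` is an existing
// Dataset<Row>; the format string and option names are assumptions noted above.
df.write()
  .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
  .option("BULK_WRITER_CL", "LOCAL_QUORUM")
  .option("SKIP_ROWS_VIOLATING_CONSTRAINTS", "true")  // count and skip violating rows
  .mode("append")
  .save();
```

With the option enabled, rows that violate a constraint are counted in rowsViolatedConstraints and surfaced in the published job stats instead of failing the job.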
@@ -44,7 +44,7 @@ public class CloudStorageStreamResult extends StreamResult

public static CloudStorageStreamResult empty(String sessionID, Range<BigInteger> tokenRange)
{
return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0);
return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0, 0);
}

public CloudStorageStreamResult(String sessionID,
@@ -54,9 +54,10 @@ public CloudStorageStreamResult(String sessionID,
Set<CreatedRestoreSlice> createdRestoreSlices,
int objectCount,
long rowCount,
long bytesWritten)
long bytesWritten,
long rowsViolatedConstraints)
{
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten);
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints);
this.createdRestoreSlices = Collections.unmodifiableSet(createdRestoreSlices);
this.objectCount = objectCount;
}
@@ -70,6 +71,7 @@ public String toString()
+ ", objectCount=" + objectCount
+ ", rowCount=" + rowCount
+ ", bytesWritten=" + bytesWritten
+ ", rowsViolatedConstraints=" + rowsViolatedConstraints
+ ", failures=" + failures
+ ", createdRestoreSlices=" + createdRestoreSlices
+ ", passed=" + passed
@@ -189,7 +189,8 @@ protected StreamResult doFinalizeStream()
createdRestoreSlices,
sliceCount,
sstableWriter.rowCount(),
sstableWriter.bytesWritten());
sstableWriter.bytesWritten(),
sstableWriter.rowsViolatedConstraints());
LOGGER.info("StreamResult: {}", streamResult);
// CL validation is only required for non-coordinated write.
// For coordinated write, executors only need to make sure the bundles are uploaded and restore slice is created on sidecar.
@@ -232,7 +232,7 @@ DirectStreamResult build()
DirectStreamResult sr = new DirectStreamResult(UUID.randomUUID().toString(),
TEST_RANGE,
buildFailures(),
buildPassed(), 0, 0);
buildPassed(), 0, 0, 0);
if (successfulCommits > 0 || failedCommits > 0)
{
List<CommitResult> commitResults = new ArrayList<>();
@@ -259,6 +259,12 @@ public boolean skipExtendedVerify()
return false;
}

@Override
public boolean skipRowsViolatingConstraints()
{
return false;
}

@Override
public boolean getSkipClean()
{
@@ -481,7 +481,7 @@ else if (failedPerReplica-- > 0)
}
CloudStorageStreamResult result = new CloudStorageStreamResult("", mock(Range.class), Collections.emptyList(),
passedReplicaSet, createdRestoreSlices,
createdRestoreSlices.size(), 0, 0);
createdRestoreSlices.size(), 0, 0, 0);
resultList.add(result);
}
return resultList;