From 24ef9cc3e39eee8a28448ecf4ea2790137a10553 Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Fri, 5 Apr 2024 13:57:27 -0700 Subject: [PATCH 1/5] CASSANDRA-19480 Additional task execution specific instrumentation of job stats --- .../cassandra/spark/JobEventDetail.java | 44 ++++++ .../CassandraBulkSourceRelation.java | 42 +++-- .../CassandraBulkWriterContext.java | 15 +- .../spark/bulkwriter/JobStatsListener.java | 145 ++++++++++++++++++ .../cassandra/spark/common/JobStats.java | 35 +++++ .../spark/data/CassandraDataLayer.java | 79 +++++++++- .../spark/data/CassandraDataSourceHelper.java | 1 + .../spark/data/PartitionedDataLayer.java | 1 + 8 files changed, 334 insertions(+), 28 deletions(-) create mode 100644 cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java create mode 100644 cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java create mode 100644 cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java new file mode 100644 index 000000000..1369cd638 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark; + +import java.util.Map; + +public class JobEventDetail +{ + private String internalJobID; + private Map jobStats; + + public JobEventDetail(String internalJobID, Map jobStats) + { + this.internalJobID = internalJobID; + this.jobStats = jobStats; + } + + public String internalJobID() + { + return internalJobID; + } + + public Map jobStats() + { + return jobStats; + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 50a01193f..309f4aade 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.UUID; @@ -65,6 +66,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta private final SQLContext sqlContext; private final JavaSparkContext sparkContext; private final Broadcast broadcastContext; + private final JobStatsListener jobStatsListener; private final BulkWriteValidator writeValidator; private HeartbeatReporter heartbeatReporter; private long startTimeNanos; @@ -79,6 +81,14 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s ReplicaAwareFailureHandler failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); onCloudStorageTransport(ignored -> this.heartbeatReporter = new HeartbeatReporter()); + this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { + if (writerContext.job().getId().toString().equals(jobEventDetail.internalJobID())) + { + writerContext.jobStats().publish(jobEventDetail.jobStats()); + } + }); + + this.sparkContext.sc().addSparkListener(jobStatsListener); } @Override @@ -114,7 +124,6 @@ public void insert(@NotNull Dataset data, boolean overwrite) { validateJob(overwrite); this.startTimeNanos = System.nanoTime(); - maybeEnableTransportExtension(); Tokenizer tokenizer = new Tokenizer(writerContext); TableSchema tableSchema = writerContext.schema().getTableSchema(); @@ -186,7 +195,8 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum(); long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum(); - boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected); + boolean hasClusterTopologyChanged = writeResults.stream() + .anyMatch(WriteResult::isClusterResizeDetected); onCloudStorageTransport(context -> { LOGGER.info("Waiting for Cassandra to complete import slices. rows={} bytes={} cluster_resized={}", @@ -217,7 +227,7 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str markRestoreJobAsSucceeded(context); }); - LOGGER.info("Bulk writer job complete. rows={} bytes={} cluster_resized={}", + LOGGER.info("Bulk writer job complete. rows={} bytes={} cluster_resize={}", rowCount, totalBytesWritten, hasClusterTopologyChanged); @@ -225,8 +235,7 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str } catch (Throwable throwable) { - publishFailureJobStats(throwable.getMessage()); - LOGGER.error("Bulk Write Failed.", throwable); + LOGGER.error("Bulk Write Failed", throwable); RuntimeException failure = new RuntimeException("Bulk Write to Cassandra has failed", throwable); try { @@ -258,28 +267,17 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged) { - writerContext.jobStats().publish(new HashMap() // type declaration required to compile with java8 - {{ + Map stats = new HashMap<>() + { + { put("jobId", writerContext.job().getId().toString()); put("transportInfo", writerContext.job().transportInfo().toString()); put("rowsWritten", Long.toString(rowCount)); put("bytesWritten", Long.toString(totalBytesWritten)); - put("jobStatus", "Succeeded"); put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged)); - put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis())); - }}); - } - - private void publishFailureJobStats(String reason) - { - writerContext.jobStats().publish(new HashMap() // type declaration required to compile with java8 - {{ - put("jobId", writerContext.job().getId().toString()); - put("transportInfo", writerContext.job().transportInfo().toString()); - put("jobStatus", "Failed"); - put("failureReason", reason); - put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis())); - }}); + } + }; + writerContext.jobStats().publish(stats); } /** diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 89f2bb911..ce4108071 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -124,6 +124,9 @@ public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext, BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), strOptions); CassandraClusterInfo clusterInfo = new CassandraClusterInfo(conf); + + clusterInfo.startupValidate(); + CassandraBulkWriterContext bulkWriterContext = new CassandraBulkWriterContext(conf, clusterInfo, dfSchema, sparkContext); ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(), ScalaFunctions.wrapLambda(bulkWriterContext::shutdown)); @@ -135,12 +138,11 @@ private void publishInitialJobStats(String sparkVersion) { Map initialJobStats = new HashMap() // type declaration required to compile with java8 {{ - put("jobId", jobInfo.getId().toString()); put("sparkVersion", sparkVersion); - put("keyspace", jobInfo.getId().toString()); - put("table", jobInfo.getId().toString()); + put("keyspace", jobInfo.getId()); + put("table", jobInfo.qualifiedTableName().toString()); }}; - jobStatsPublisher.publish(initialJobStats); + publish(initialJobStats); } @Override @@ -256,4 +258,9 @@ protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf, lowestCassandraVersion, jobInfo.qualifiedTableName().quoteIdentifiers()); } + + public void publish(Map stats) + { + LOGGER.info("Job Stats:" + stats); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java new file mode 100644 index 000000000..4f068531a --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.commons.math3.stat.StatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.spark.JobEventDetail; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.JobFailed; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerTaskEnd; + +/** + * Spark listener implementation to capture stats on completion of jobs and tasks. + */ +public class JobStatsListener extends SparkListener +{ + private final Map jobIdToTaskRetryStats = new HashMap<>(); + private final Map> jobIdToTaskMetrics = new HashMap<>(); + private final Map jobIdToStartTimes = new HashMap<>(); + private final Map internalJobIdMapping = new HashMap<>(); + + private static int jobId = -1; + private static final Logger LOGGER = LoggerFactory.getLogger(JobStatsListener.class); + private final Consumer jobCompletionConsumer; + + public JobStatsListener(Consumer jobCompletionConsumer) + { + this.jobCompletionConsumer = jobCompletionConsumer; + } + + @Override + public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) + { + // Calculate max task retries across all tasks in job + int attempt = taskEnd.taskInfo().attemptNumber(); + jobIdToTaskRetryStats.compute(jobId, (k, v) -> (v == null || attempt > v) ? attempt : v); + // Persist all task metrics for the job - across all stages + jobIdToTaskMetrics.computeIfAbsent(jobId, k -> new HashSet<>()).add(taskEnd.taskMetrics()); + LOGGER.debug("Task END for jobId:{} task:{} task attempt:{}} Reason:{}", + jobId, + taskEnd.taskInfo().taskId(), + taskEnd.taskInfo().attemptNumber(), + taskEnd.reason()); + } + + @Override + public void onJobStart(SparkListenerJobStart jobStart) + { + String internalJobId = (String) jobStart.properties().get("spark.jobGroup.id"); + jobId = Integer.valueOf(jobStart.jobId()); + internalJobIdMapping.put(jobId, internalJobId); + jobIdToStartTimes.put(jobId, System.nanoTime()); + } + + @Override + public void onJobEnd(SparkListenerJobEnd jobEnd) + { + boolean jobFailed = false; + String reason = "null"; + if (jobEnd.jobResult() instanceof JobFailed) + { + jobFailed = true; + JobFailed result = (JobFailed) jobEnd.jobResult(); + reason = result.exception().getCause().getMessage(); + } + + long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - jobIdToStartTimes.get(jobId)); + String internalJobId = internalJobIdMapping.get(jobId); + String jobStatus = (jobFailed) ? "Failed" : "Succeeded"; + Map jobStats = new HashMap<>(); + jobStats.put("jobId", internalJobId); + jobStats.put("jobStatus", jobStatus); + jobStats.put("failureReason", reason); + jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); + + LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", + jobId, + jobStatus, + reason, + elapsedTimeMillis); + + jobStats.putAll(getJobMetrics(jobId)); + jobCompletionConsumer.accept(new JobEventDetail(internalJobId, jobStats)); + cleanup(jobId); + } + + public synchronized Map getJobMetrics(int jobId) + { + Map jobMetrics = new HashMap<>(); + if (jobIdToTaskMetrics.containsKey(jobId)) + { + List runTimes = jobIdToTaskMetrics.get(jobId) + .stream() + .map(TaskMetrics::executorRunTime) + .collect(Collectors.toList()); + + double[] runTimesArray = runTimes.stream().mapToDouble(Long::doubleValue).toArray(); + jobMetrics.put("maxTaskRuntimeMillis", String.valueOf(StatUtils.max(runTimesArray))); + jobMetrics.put("meanTaskRuntimeMillis", String.valueOf(StatUtils.mean(runTimesArray))); + jobMetrics.put("p50TaskRuntimeMillis", String.valueOf(StatUtils.percentile(runTimesArray, 50))); + jobMetrics.put("p95TaskRuntimeMillis", String.valueOf(StatUtils.percentile(runTimesArray, 95))); + jobMetrics.put("maxTaskRetriesMillis", String.valueOf(jobIdToTaskRetryStats.get(jobId))); + } + return jobMetrics; + } + + private void cleanup(int jobId) + { + jobIdToStartTimes.remove(jobId); + internalJobIdMapping.remove(jobId); + jobIdToTaskMetrics.remove(jobId); + jobIdToTaskRetryStats.remove(jobId); + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java new file mode 100644 index 000000000..9688cd487 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.common; + +import java.util.Map; + +/** + * Interface to provide functionality to report Spark Job Statistics and/or properties + * that can optionally be instrumented. The default implementation merely logs these + * stats at the end of the job. + */ +public interface JobStats +{ + /** + * Publish the accumulated job attributes to be persisted and summarized + */ + void publish(Map stats); +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 5bd2a1d07..260d83349 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -41,6 +42,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,6 +53,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +78,8 @@ import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; +import org.apache.cassandra.spark.bulkwriter.JobStatsListener; +import org.apache.cassandra.spark.common.JobStats; import org.apache.cassandra.spark.config.SchemaFeature; import org.apache.cassandra.spark.config.SchemaFeatureSet; import org.apache.cassandra.spark.data.partitioner.CassandraInstance; @@ -85,6 +90,7 @@ import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator; import org.apache.cassandra.spark.sparksql.RowBuilder; import org.apache.cassandra.spark.stats.Stats; +import org.apache.cassandra.spark.utils.BuildInfo; import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.spark.utils.ReaderTimeProvider; import org.apache.cassandra.spark.utils.ScalaFunctions; @@ -94,6 +100,7 @@ import org.apache.cassandra.spark.validation.SidecarValidation; import org.apache.cassandra.spark.validation.StartupValidatable; import org.apache.cassandra.spark.validation.StartupValidator; +import org.apache.spark.SparkContext; import org.apache.spark.sql.types.DataType; import org.apache.spark.util.ShutdownHookManager; import org.jetbrains.annotations.NotNull; @@ -101,7 +108,7 @@ import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED; -public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable +public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable, JobStats { private static final long serialVersionUID = -9038926850642710787L; @@ -132,6 +139,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV protected boolean useIncrementalRepair; protected List requestedFeatures; protected Map rfMap; + + private final AtomicReference jobId; + private final JobStatsListener jobStatsListener; @Nullable protected String lastModifiedTimestampField; // volatile in order to publish the reference for visibility @@ -144,6 +154,8 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV @VisibleForTesting transient Map instanceMap; + private String internalJobId; + public CassandraDataLayer(@NotNull ClientConfig options, @NotNull Sidecar.ClientConfig sidecarClientConfig, @Nullable SslConfig sslConfig) @@ -163,6 +175,18 @@ public CassandraDataLayer(@NotNull ClientConfig options, this.useIncrementalRepair = options.useIncrementalRepair(); this.lastModifiedTimestampField = options.lastModifiedTimestampField(); this.requestedFeatures = options.requestedFeatures(); + this.jobId = new AtomicReference<>(); + + // Note: Consumers are called for all jobs and task. We ONLY have to publish for existing job + this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { + if (!internalJobId.isEmpty() && internalJobId.equals(jobEventDetail.internalJobID())) + { + Map stats = new HashMap<>(); + stats.put("jobId", StringUtils.defaultString(internalJobId, "null")); + stats.putAll(jobEventDetail.jobStats()); + publish(stats); + } + }); } // For serialization @@ -209,6 +233,17 @@ protected CassandraDataLayer(@Nullable String keyspace, this.useIncrementalRepair = useIncrementalRepair; this.lastModifiedTimestampField = lastModifiedTimestampField; this.requestedFeatures = requestedFeatures; + this.jobId = new AtomicReference<>(); + this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { + if (!internalJobId.isEmpty() && internalJobId.equals(jobEventDetail.internalJobID())) + { + Map stats = new HashMap<>(); + stats.put("jobId", StringUtils.defaultString(internalJobId, "null")); + stats.putAll(jobEventDetail.jobStats()); + publish(stats); + } + }); + if (lastModifiedTimestampField != null) { aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField); @@ -222,7 +257,11 @@ protected CassandraDataLayer(@Nullable String keyspace, public void initialize(@NotNull ClientConfig options) { - dialHome(options); + SparkContext.getOrCreate().addSparkListener(jobStatsListener); + String description = "Cassandra Bulk Read for table " + table; + internalJobId = UUID.randomUUID().toString(); + SparkContext.getOrCreate().setJobGroup(internalJobId, description, false); + publishJobStats(internalJobId, options); timeProvider = new ReaderTimeProvider(); LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={} referenceEpoch={}", @@ -791,6 +830,42 @@ private static String readNullable(ObjectInputStream in) throws IOException return null; } + private void publishJobStats(String internalJobId, @NotNull ClientConfig options) + { + Map jobStats = new HashMap() + { + { + put("application", "Cassandra Spark Bulk Reader"); + put("jobId", StringUtils.defaultString(internalJobId, "null")); + put("keyspace", StringUtils.defaultString(keyspace, "null")); + put("table", StringUtils.defaultString(table, "null")); + put("snapshotName", snapshotName); + put("datacenter", datacenter); + put("numCores", String.valueOf(options.numCores())); + put("defaultParallelism", String.valueOf(options.defaultParallelism())); + put("jvmVersion", System.getProperty("java.version")); + put("useMtls", String.valueOf(true)); + put("maxRetries", String.valueOf(sidecarClientConfig.maxRetries())); + put("maxPoolSize", String.valueOf(sidecarClientConfig.maxPoolSize())); + put("timeoutSeconds", String.valueOf(sidecarClientConfig.timeoutSeconds())); + put("millisToSleep", String.valueOf(sidecarClientConfig.millisToSleep())); + put("maxBufferSize", String.valueOf(sidecarClientConfig.maxBufferSize())); + put("chunkSize", String.valueOf(sidecarClientConfig.chunkBufferSize())); + put("consistencyLevel", String.valueOf(consistencyLevel)); + put("analyticsVersion", BuildInfo.BUILD_VERSION_AND_REVISION); + put("sparkVersion", SparkContext.getOrCreate().version()); + put("quoteIdentifiers", String.valueOf(quoteIdentifiers)); + put("dataTransport", "DIRECT"); + } + }; + publish(jobStats); + } + + public void publish(Map stats) + { + LOGGER.info("Job Stats:" + stats); + } + // Kryo Serialization public static class Serializer extends com.esotericsoftware.kryo.Serializer diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java index 2ae3370fa..09c138d0d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java @@ -129,6 +129,7 @@ protected static CassandraDataLayer createAndInitCassandraDataLayer( dataLayer.startupValidate(); + return dataLayer; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java index ccb9f3d5e..22c9906fa 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java @@ -285,6 +285,7 @@ public SSTablesSupplier sstables(int partitionId, int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter); ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas( consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId); + if (replicaSet.primary().size() < minReplicas) { // Could not find enough primary replicas to meet consistency level From 37c78eb34f165ce0129d5630e39e3658ffc6d385 Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Mon, 15 Apr 2024 19:02:32 -0500 Subject: [PATCH 2/5] Adds minor changes and tests to validate the stats listener --- .../CassandraBulkSourceRelation.java | 1 + .../cassandra/spark/common/JobStats.java | 35 ----- .../stats}/JobStatsListener.java | 2 +- .../spark/common/stats/JobStatsPublisher.java | 5 +- .../spark/common/stats/LogStatsPublisher.java | 5 + .../spark/data/CassandraDataLayer.java | 21 ++- .../spark/JobStatsListenerTests.java | 130 ++++++++++++++++++ .../bulkwriter/MockBulkWriterContext.java | 10 +- 8 files changed, 159 insertions(+), 50 deletions(-) delete mode 100644 cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java rename cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/{bulkwriter => common/stats}/JobStatsListener.java (99%) create mode 100644 cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 309f4aade..8df98cbbf 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -41,6 +41,7 @@ import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.common.client.ClientException; +import org.apache.cassandra.spark.common.stats.JobStatsListener; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportHandler; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java deleted file mode 100644 index 9688cd487..000000000 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/JobStats.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.spark.common; - -import java.util.Map; - -/** - * Interface to provide functionality to report Spark Job Statistics and/or properties - * that can optionally be instrumented. The default implementation merely logs these - * stats at the end of the job. - */ -public interface JobStats -{ - /** - * Publish the accumulated job attributes to be persisted and summarized - */ - void publish(Map stats); -} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java similarity index 99% rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java rename to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java index 4f068531a..af42c4b36 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobStatsListener.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.cassandra.spark.bulkwriter; +package org.apache.cassandra.spark.common.stats; import java.util.HashMap; import java.util.HashSet; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java index 9027ce437..5abcea258 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java @@ -30,8 +30,9 @@ public interface JobStatsPublisher { /** * Publish the job attributes to be persisted and summarized - * - * @param stats the stats to publish + * @param stats mapping of the metric names and their values */ void publish(Map stats); + + Map stats(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java index ce6ac0c1b..380ad417c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java @@ -37,4 +37,9 @@ public void publish(Map stats) { LOGGER.info("Job Stats: {}", stats); } + + public Map stats() + { + throw new UnsupportedOperationException("Operation not supported for log publishing"); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 260d83349..1befdadac 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -78,8 +78,9 @@ import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; -import org.apache.cassandra.spark.bulkwriter.JobStatsListener; -import org.apache.cassandra.spark.common.JobStats; +import org.apache.cassandra.spark.common.stats.JobStatsListener; +import org.apache.cassandra.spark.common.stats.JobStatsPublisher; +import org.apache.cassandra.spark.common.stats.LogStatsPublisher; import org.apache.cassandra.spark.config.SchemaFeature; import org.apache.cassandra.spark.config.SchemaFeatureSet; import org.apache.cassandra.spark.data.partitioner.CassandraInstance; @@ -108,7 +109,7 @@ import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED; -public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable, JobStats +public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable, Serializable { private static final long serialVersionUID = -9038926850642710787L; @@ -148,6 +149,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV protected volatile CqlTable cqlTable; protected transient TimeProvider timeProvider; protected transient SidecarClient sidecar; + private transient JobStatsPublisher jobStatsPublisher; private SslConfig sslConfig; @@ -176,6 +178,7 @@ public CassandraDataLayer(@NotNull ClientConfig options, this.lastModifiedTimestampField = options.lastModifiedTimestampField(); this.requestedFeatures = options.requestedFeatures(); this.jobId = new AtomicReference<>(); + this.jobStatsPublisher = new LogStatsPublisher(); // Note: Consumers are called for all jobs and task. We ONLY have to publish for existing job this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { @@ -184,7 +187,7 @@ public CassandraDataLayer(@NotNull ClientConfig options, Map stats = new HashMap<>(); stats.put("jobId", StringUtils.defaultString(internalJobId, "null")); stats.putAll(jobEventDetail.jobStats()); - publish(stats); + jobStatsPublisher.publish(stats); } }); } @@ -234,13 +237,14 @@ protected CassandraDataLayer(@Nullable String keyspace, this.lastModifiedTimestampField = lastModifiedTimestampField; this.requestedFeatures = requestedFeatures; this.jobId = new AtomicReference<>(); + this.jobStatsPublisher = new LogStatsPublisher(); this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { if (!internalJobId.isEmpty() && internalJobId.equals(jobEventDetail.internalJobID())) { Map stats = new HashMap<>(); stats.put("jobId", StringUtils.defaultString(internalJobId, "null")); stats.putAll(jobEventDetail.jobStats()); - publish(stats); + jobStatsPublisher.publish(stats); } }); @@ -858,12 +862,7 @@ private void publishJobStats(String internalJobId, @NotNull ClientConfig options put("dataTransport", "DIRECT"); } }; - publish(jobStats); - } - - public void publish(Map stats) - { - LOGGER.info("Job Stats:" + stats); + jobStatsPublisher.publish(jobStats); } // Kryo Serialization diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java new file mode 100644 index 000000000..494d096da --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark; + +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.spark.common.stats.JobStatsListener; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.TaskEndReason; +import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.StageInfo; +import org.apache.spark.scheduler.TaskInfo; +import org.mockito.Mockito; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.when; + +public class JobStatsListenerTests +{ + private SparkContext sparkContext; + + @BeforeEach + public void setUp() + { + SparkConf conf = new SparkConf().setMaster("local").setAppName("test"); + sparkContext = new SparkContext(conf); + } + + @AfterEach + public void tearDown() + { + sparkContext.stop(); + } + + @Test + public void testOnEventHandling() + { + + List stageInfoList = Collections.emptyList(); + Seq stageInfoSeq = JavaConverters.asScalaIteratorConverter(stageInfoList.iterator()) + .asScala().toSeq(); + AtomicReference jobEvent = new AtomicReference<>(); + String jobId = UUID.randomUUID().toString(); + + JobStatsListener listener = new JobStatsListener((jobEventDetail) -> { + System.out.println("Job End Called"); + jobEvent.set(jobEventDetail); + }); + + Properties p = new Properties(); + p.setProperty("spark.jobGroup.id", jobId); + SparkListenerJobStart jobStartEvent = new SparkListenerJobStart(1, + System.currentTimeMillis(), + stageInfoSeq, + p); + listener.onJobStart(jobStartEvent); + listener.onTaskEnd(createMockTaskEndEvent()); + SparkListenerJobEnd mockJobEnd = Mockito.mock(SparkListenerJobEnd.class); + listener.onJobEnd(mockJobEnd); + + assertThat(jobEvent.get().internalJobID()).isEqualTo(jobId); + assertThat(jobEvent.get().jobStats().get("jobId")).isEqualTo(jobId); + assertThat(jobEvent.get().jobStats().get("jobStatus")).isEqualTo("Succeeded"); + assertThat(jobEvent.get().jobStats().get("maxTaskRetriesMillis")).isEqualTo("1"); + assertThat(jobEvent.get().jobStats().get("p50TaskRuntimeMillis")).isEqualTo("1.0"); + assertThat(jobEvent.get().jobStats().get("meanTaskRuntimeMillis")).isEqualTo("1.0"); + assertThat(jobEvent.get().jobStats().get("maxTaskRuntimeMillis")).isEqualTo("1.0"); + assertThat(jobEvent.get().jobStats().get("p95TaskRuntimeMillis")).isEqualTo("1.0"); + + // Validate that the previous job's metrics are reset after job completion + listener.onJobStart(jobStartEvent); + listener.onJobEnd(mockJobEnd); + assertFalse(jobEvent.get().jobStats().containsKey("maxTaskRetriesMillis")); + assertFalse(jobEvent.get().jobStats().containsKey("p50TaskRuntimeMillis")); + assertFalse(jobEvent.get().jobStats().containsKey("meanTaskRuntimeMillis")); + assertFalse(jobEvent.get().jobStats().containsKey("maxTaskRuntimeMillis")); + assertFalse(jobEvent.get().jobStats().containsKey("p95TaskRuntimeMillis")); + + } + + private SparkListenerTaskEnd createMockTaskEndEvent() + { + SparkListenerTaskEnd mockTaskEnd = Mockito.mock(SparkListenerTaskEnd.class); + TaskMetrics mTaskMetrics = Mockito.mock(TaskMetrics.class); + when(mTaskMetrics.executorRunTime()).thenReturn(1L); + TaskInfo mTaskInfo = Mockito.mock(TaskInfo.class); + when(mTaskInfo.taskId()).thenReturn(1L); + when(mTaskInfo.attemptNumber()).thenReturn(1); + + TaskEndReason mTaskEndReason = Mockito.mock(TaskEndReason.class); + when(mTaskEndReason.toString()).thenReturn("Success"); + + when(mockTaskEnd.taskMetrics()).thenReturn(mTaskMetrics); + when(mockTaskEnd.taskInfo()).thenReturn(mTaskInfo); + when(mockTaskEnd.reason()).thenReturn(mTaskEndReason); + return mockTaskEnd; + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index 205d4129c..e605fa6aa 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -81,12 +82,19 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); private ConsistencyLevel.CL consistencyLevel; private int sstableDataSizeInMB = 128; + private int sstableWriteBatchSize = 2; + private Map jobStats = new HashMap<>(); private CassandraBridge bridge = CassandraBridgeFactory.get(CassandraVersion.FOURZERO); @Override public void publish(Map stats) { - // DO NOTHING + jobStats.putAll(stats); + } + + public Map stats() + { + return jobStats; } public interface CommitResultSupplier extends BiFunction, String, DirectDataTransferApi.RemoteCommitResult> From 8f7bf340d5669a64ee809905b4d5b589466c2180 Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Mon, 15 Apr 2024 19:16:36 -0500 Subject: [PATCH 3/5] Prevent errors from stats listeners from propagating --- .../spark/common/stats/JobStatsListener.java | 100 +++++++++++------- 1 file changed, 63 insertions(+), 37 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java index af42c4b36..4cb4fa61e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java @@ -62,57 +62,83 @@ public JobStatsListener(Consumer jobCompletionConsumer) @Override public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { - // Calculate max task retries across all tasks in job - int attempt = taskEnd.taskInfo().attemptNumber(); - jobIdToTaskRetryStats.compute(jobId, (k, v) -> (v == null || attempt > v) ? attempt : v); - // Persist all task metrics for the job - across all stages - jobIdToTaskMetrics.computeIfAbsent(jobId, k -> new HashSet<>()).add(taskEnd.taskMetrics()); - LOGGER.debug("Task END for jobId:{} task:{} task attempt:{}} Reason:{}", - jobId, - taskEnd.taskInfo().taskId(), - taskEnd.taskInfo().attemptNumber(), - taskEnd.reason()); + try + { + // Calculate max task retries across all tasks in job + int attempt = taskEnd.taskInfo().attemptNumber(); + jobIdToTaskRetryStats.compute(jobId, (k, v) -> (v == null || attempt > v) ? attempt : v); + // Persist all task metrics for the job - across all stages + jobIdToTaskMetrics.computeIfAbsent(jobId, k -> new HashSet<>()).add(taskEnd.taskMetrics()); + LOGGER.debug("Task END for jobId:{} task:{} task attempt:{}} Reason:{}", + jobId, + taskEnd.taskInfo().taskId(), + taskEnd.taskInfo().attemptNumber(), + taskEnd.reason()); + + } + catch (Exception e) + { + LOGGER.warn("Failed to process job stats for the task completion event with jobId: {}", + internalJobIdMapping.get(jobId), e); + } } @Override public void onJobStart(SparkListenerJobStart jobStart) { String internalJobId = (String) jobStart.properties().get("spark.jobGroup.id"); - jobId = Integer.valueOf(jobStart.jobId()); - internalJobIdMapping.put(jobId, internalJobId); - jobIdToStartTimes.put(jobId, System.nanoTime()); + try + { + + jobId = Integer.valueOf(jobStart.jobId()); + internalJobIdMapping.put(jobId, internalJobId); + jobIdToStartTimes.put(jobId, System.nanoTime()); + } + catch (Exception e) + { + LOGGER.warn("Failed to process job stats for the job start event with jobId: {}", internalJobId, e); + } } @Override public void onJobEnd(SparkListenerJobEnd jobEnd) { - boolean jobFailed = false; - String reason = "null"; - if (jobEnd.jobResult() instanceof JobFailed) + try + { + boolean jobFailed = false; + String reason = "null"; + if (jobEnd.jobResult() instanceof JobFailed) + { + jobFailed = true; + JobFailed result = (JobFailed) jobEnd.jobResult(); + reason = result.exception().getCause().getMessage(); + } + + long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - jobIdToStartTimes.get(jobId)); + String internalJobId = internalJobIdMapping.get(jobId); + String jobStatus = (jobFailed) ? "Failed" : "Succeeded"; + Map jobStats = new HashMap<>(); + jobStats.put("jobId", internalJobId); + jobStats.put("jobStatus", jobStatus); + jobStats.put("failureReason", reason); + jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); + + LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", + jobId, + jobStatus, + reason, + elapsedTimeMillis); + + jobStats.putAll(getJobMetrics(jobId)); + jobCompletionConsumer.accept(new JobEventDetail(internalJobId, jobStats)); + cleanup(jobId); + } + catch (Exception e) { - jobFailed = true; - JobFailed result = (JobFailed) jobEnd.jobResult(); - reason = result.exception().getCause().getMessage(); + LOGGER.warn("Failed to process job stats for the job completion event with jobId: {}", + internalJobIdMapping.get(jobId), e); } - long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - jobIdToStartTimes.get(jobId)); - String internalJobId = internalJobIdMapping.get(jobId); - String jobStatus = (jobFailed) ? "Failed" : "Succeeded"; - Map jobStats = new HashMap<>(); - jobStats.put("jobId", internalJobId); - jobStats.put("jobStatus", jobStatus); - jobStats.put("failureReason", reason); - jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); - - LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", - jobId, - jobStatus, - reason, - elapsedTimeMillis); - - jobStats.putAll(getJobMetrics(jobId)); - jobCompletionConsumer.accept(new JobEventDetail(internalJobId, jobStats)); - cleanup(jobId); } public synchronized Map getJobMetrics(int jobId) From dd97edb392f6fad8a6d8f02be8be9e3081591535 Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Thu, 25 Apr 2024 16:11:27 -0700 Subject: [PATCH 4/5] Fix for java8 --- .../cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 8df98cbbf..a4ecf82d1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -268,7 +268,7 @@ private void persist(@NotNull JavaPairRDD sortedRDD, Str private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged) { - Map stats = new HashMap<>() + Map stats = new HashMap() { { put("jobId", writerContext.job().getId().toString()); From 7bb12eb970edae5bc393172a47c1b3016707d14b Mon Sep 17 00:00:00 2001 From: Arjun Ashok Date: Thu, 25 Apr 2024 16:40:57 -0700 Subject: [PATCH 5/5] Cleanup after rebase --- .../spark/bulkwriter/CassandraBulkWriterContext.java | 3 --- .../cassandra/spark/common/stats/LogStatsPublisher.java | 1 + .../org/apache/cassandra/spark/data/CassandraDataLayer.java | 5 ----- .../apache/cassandra/spark/data/PartitionedDataLayer.java | 1 - 4 files changed, 1 insertion(+), 9 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index ce4108071..0477a2007 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -68,7 +68,6 @@ protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf, { this.conf = conf; this.clusterInfo = clusterInfo; - clusterInfo.startupValidate(); this.jobStatsPublisher = new LogStatsPublisher(); lowestCassandraVersion = clusterInfo.getLowestCassandraVersion(); this.bridge = CassandraBridgeFactory.get(lowestCassandraVersion); @@ -124,9 +123,7 @@ public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext, BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), strOptions); CassandraClusterInfo clusterInfo = new CassandraClusterInfo(conf); - clusterInfo.startupValidate(); - CassandraBulkWriterContext bulkWriterContext = new CassandraBulkWriterContext(conf, clusterInfo, dfSchema, sparkContext); ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(), ScalaFunctions.wrapLambda(bulkWriterContext::shutdown)); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java index 380ad417c..5408428db 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java @@ -38,6 +38,7 @@ public void publish(Map stats) LOGGER.info("Job Stats: {}", stats); } + @Override public Map stats() { throw new UnsupportedOperationException("Operation not supported for log publishing"); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 1befdadac..f6d6e8d34 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -42,7 +42,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -140,8 +139,6 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV protected boolean useIncrementalRepair; protected List requestedFeatures; protected Map rfMap; - - private final AtomicReference jobId; private final JobStatsListener jobStatsListener; @Nullable protected String lastModifiedTimestampField; @@ -177,7 +174,6 @@ public CassandraDataLayer(@NotNull ClientConfig options, this.useIncrementalRepair = options.useIncrementalRepair(); this.lastModifiedTimestampField = options.lastModifiedTimestampField(); this.requestedFeatures = options.requestedFeatures(); - this.jobId = new AtomicReference<>(); this.jobStatsPublisher = new LogStatsPublisher(); // Note: Consumers are called for all jobs and task. We ONLY have to publish for existing job @@ -236,7 +232,6 @@ protected CassandraDataLayer(@Nullable String keyspace, this.useIncrementalRepair = useIncrementalRepair; this.lastModifiedTimestampField = lastModifiedTimestampField; this.requestedFeatures = requestedFeatures; - this.jobId = new AtomicReference<>(); this.jobStatsPublisher = new LogStatsPublisher(); this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { if (!internalJobId.isEmpty() && internalJobId.equals(jobEventDetail.internalJobID())) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java index 22c9906fa..ccb9f3d5e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/PartitionedDataLayer.java @@ -285,7 +285,6 @@ public SSTablesSupplier sstables(int partitionId, int minReplicas = consistencyLevel.blockFor(replicationFactor, datacenter); ReplicaSet replicaSet = PartitionedDataLayer.splitReplicas( consistencyLevel, datacenter, instRanges, replicas, this::getAvailability, minReplicas, partitionId); - if (replicaSet.primary().size() < minReplicas) { // Could not find enough primary replicas to meet consistency level