diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java
new file mode 100644
index 000000000..1369cd638
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/JobEventDetail.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark;
+
+import java.util.Map;
+
+/**
+ * Value object pairing a Spark job's internal ID with the stats captured for that job.
+ */
+public class JobEventDetail
+{
+    private final String internalJobID;
+    private final Map<String, String> jobStats;
+
+    public JobEventDetail(String internalJobID, Map<String, String> jobStats)
+    {
+        this.internalJobID = internalJobID;
+        this.jobStats = jobStats;
+    }
+
+    public String internalJobID()
+    {
+        return internalJobID;
+    }
+
+    public Map<String, String> jobStats()
+    {
+        return jobStats;
+    }
+}
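
[Review note] JobEventDetail is a plain value carrier pairing a job's internal ID with its
collected stats. A minimal sketch of how a JobStatsListener consumer might use it; the
expectedJobId and jobStatsPublisher names are illustrative, not part of this change:

    import java.util.function.Consumer;
    import org.apache.cassandra.spark.JobEventDetail;

    // A listener observes every Spark job in the context, so consumers must
    // filter on the internal job ID before publishing.
    Consumer<JobEventDetail> consumer = detail -> {
        if (expectedJobId.equals(detail.internalJobID()))
        {
            jobStatsPublisher.publish(detail.jobStats());  // hypothetical publisher
        }
    };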
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 50a01193f..a4ecf82d1 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.UUID;
@@ -40,6 +41,7 @@
 import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.common.client.ClientException;
+import org.apache.cassandra.spark.common.stats.JobStatsListener;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportExtension;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportHandler;
@@ -65,6 +67,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements InsertableRelation
     private final SQLContext sqlContext;
     private final JavaSparkContext sparkContext;
     private final Broadcast<BulkWriterContext> broadcastContext;
+    private final JobStatsListener jobStatsListener;
     private final BulkWriteValidator writeValidator;
     private HeartbeatReporter heartbeatReporter;
     private long startTimeNanos;
@@ -79,6 +82,15 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext sqlContext)
         ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, failureHandler);
         onCloudStorageTransport(ignored -> this.heartbeatReporter = new HeartbeatReporter());
+        // The listener sees events for every Spark job; only publish stats for this writer's job
+        this.jobStatsListener = new JobStatsListener(jobEventDetail -> {
+            if (writerContext.job().getId().toString().equals(jobEventDetail.internalJobID()))
+            {
+                writerContext.jobStats().publish(jobEventDetail.jobStats());
+            }
+        });
+
+        this.sparkContext.sc().addSparkListener(jobStatsListener);
     }
 
 @@ -114,7 +125,6 @@ public void insert(@NotNull Dataset<Row> data, boolean overwrite)
     {
         validateJob(overwrite);
         this.startTimeNanos = System.nanoTime();
-        maybeEnableTransportExtension();
 
         Tokenizer tokenizer = new Tokenizer(writerContext);
         TableSchema tableSchema = writerContext.schema().getTableSchema();
@@ -186,7 +196,8 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
 
             long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
             long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
-            boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected);
+            boolean hasClusterTopologyChanged = writeResults.stream()
+                                                            .anyMatch(WriteResult::isClusterResizeDetected);
 
             onCloudStorageTransport(context -> {
                 LOGGER.info("Waiting for Cassandra to complete import slices. rows={} bytes={} cluster_resized={}",
@@ -217,7 +228,7 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
                 markRestoreJobAsSucceeded(context);
             });
 
-            LOGGER.info("Bulk writer job complete. rows={} bytes={} cluster_resize={}",
+            LOGGER.info("Bulk writer job complete. rows={} bytes={} cluster_resized={}",
                         rowCount,
                         totalBytesWritten,
                         hasClusterTopologyChanged);
@@ -225,8 +236,7 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
         }
         catch (Throwable throwable)
         {
-            publishFailureJobStats(throwable.getMessage());
-            LOGGER.error("Bulk Write Failed.", throwable);
+            LOGGER.error("Bulk Write Failed", throwable);
             RuntimeException failure = new RuntimeException("Bulk Write to Cassandra has failed", throwable);
             try
             {
@@ -258,28 +268,17 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
 
     private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged)
     {
-        writerContext.jobStats().publish(new HashMap<String, String>() // type declaration required to compile with java8
-        {{
+        Map<String, String> stats = new HashMap<String, String>() // type declaration required to compile with java8
+        {
+            {
                 put("jobId", writerContext.job().getId().toString());
                 put("transportInfo", writerContext.job().transportInfo().toString());
                 put("rowsWritten", Long.toString(rowCount));
                 put("bytesWritten", Long.toString(totalBytesWritten));
-                put("jobStatus", "Succeeded");
                 put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged));
-                put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis()));
-        }});
-    }
-
-    private void publishFailureJobStats(String reason)
-    {
-        writerContext.jobStats().publish(new HashMap<String, String>() // type declaration required to compile with java8
-        {{
-            put("jobId", writerContext.job().getId().toString());
-            put("transportInfo", writerContext.job().transportInfo().toString());
-            put("jobStatus", "Failed");
-            put("failureReason", reason);
-            put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis()));
-        }});
+            }
+        };
+        writerContext.jobStats().publish(stats);
     }
 
     /**
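
[Review note] The consumer above matches writerContext.job().getId() against the listener's
internal job ID, which JobStatsListener reads from the spark.jobGroup.id property on job
start. This assumes the write path sets the Spark job group to the writer job ID before any
action runs; a sketch of that assumed wiring (not shown in this diff):

    // Must happen before the first action triggered by insert(), or the IDs never match.
    sparkContext.sc().setJobGroup(writerContext.job().getId().toString(),
                                  "Cassandra Bulk Write", false);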
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -68,7 +68,6 @@ protected CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
     {
         this.conf = conf;
         this.clusterInfo = clusterInfo;
-        clusterInfo.startupValidate();
         this.jobStatsPublisher = new LogStatsPublisher();
         lowestCassandraVersion = clusterInfo.getLowestCassandraVersion();
         this.bridge = CassandraBridgeFactory.get(lowestCassandraVersion);
@@ -124,6 +123,7 @@ public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext,
         BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), strOptions);
         CassandraClusterInfo clusterInfo = new CassandraClusterInfo(conf);
+        clusterInfo.startupValidate();
         CassandraBulkWriterContext bulkWriterContext = new CassandraBulkWriterContext(conf, clusterInfo, dfSchema, sparkContext);
         ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(),
                                             ScalaFunctions.wrapLambda(bulkWriterContext::shutdown));
@@ -135,12 +135,11 @@ private void publishInitialJobStats(String sparkVersion)
     {
         Map<String, String> initialJobStats = new HashMap<String, String>() // type declaration required to compile with java8
         {{
-            put("jobId", jobInfo.getId().toString());
             put("sparkVersion", sparkVersion);
-            put("keyspace", jobInfo.getId().toString());
-            put("table", jobInfo.getId().toString());
+            put("keyspace", jobInfo.qualifiedTableName().keyspace());
+            put("table", jobInfo.qualifiedTableName().table());
         }};
-        jobStatsPublisher.publish(initialJobStats);
+        publish(initialJobStats);
     }
 
     @Override
@@ -256,4 +255,9 @@ protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf,
                                          lowestCassandraVersion,
                                          jobInfo.qualifiedTableName().quoteIdentifiers());
     }
+
+    public void publish(Map<String, String> stats)
+    {
+        LOGGER.info("Job Stats: {}", stats);
+    }
 }
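
[Review note] The double-brace HashMap initializer retained in publishInitialJobStats compiles
under Java 8 but allocates an anonymous subclass that holds a reference to the enclosing
context; a plain map is the lighter equivalent (sketch only, using the accessors this diff
already uses):

    Map<String, String> initialJobStats = new HashMap<>();
    initialJobStats.put("sparkVersion", sparkVersion);
    initialJobStats.put("keyspace", jobInfo.qualifiedTableName().keyspace());
    initialJobStats.put("table", jobInfo.qualifiedTableName().table());
    publish(initialJobStats);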
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java
new file mode 100644
index 000000000..4cb4fa61e
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsListener.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.stats;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.math3.stat.StatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.spark.JobEventDetail;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.JobFailed;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+
+/**
+ * Spark listener implementation to capture stats on completion of jobs and tasks.
+ */
+public class JobStatsListener extends SparkListener
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobStatsListener.class);
+
+    private final Map<Integer, Integer> jobIdToTaskRetryStats = new HashMap<>();
+    private final Map<Integer, Set<TaskMetrics>> jobIdToTaskMetrics = new HashMap<>();
+    private final Map<Integer, Long> jobIdToStartTimes = new HashMap<>();
+    private final Map<Integer, String> internalJobIdMapping = new HashMap<>();
+    private final Consumer<JobEventDetail> jobCompletionConsumer;
+
+    // ID of the most recently started job; task-end events do not carry a job ID themselves
+    private int jobId = -1;
+
+    public JobStatsListener(Consumer<JobEventDetail> jobCompletionConsumer)
+    {
+        this.jobCompletionConsumer = jobCompletionConsumer;
+    }
+
+    @Override
+    public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd)
+    {
+        try
+        {
+            // Track the maximum task attempt number across all tasks in the job
+            int attempt = taskEnd.taskInfo().attemptNumber();
+            jobIdToTaskRetryStats.compute(jobId, (k, v) -> (v == null || attempt > v) ? attempt : v);
+            // Persist all task metrics for the job - across all stages
+            jobIdToTaskMetrics.computeIfAbsent(jobId, k -> new HashSet<>()).add(taskEnd.taskMetrics());
+            LOGGER.debug("Task END for jobId:{} task:{} task attempt:{} Reason:{}",
+                         jobId,
+                         taskEnd.taskInfo().taskId(),
+                         taskEnd.taskInfo().attemptNumber(),
+                         taskEnd.reason());
+        }
+        catch (Exception e)
+        {
+            LOGGER.warn("Failed to process job stats for the task completion event with jobId: {}",
+                        internalJobIdMapping.get(jobId), e);
+        }
+    }
+
+    @Override
+    public synchronized void onJobStart(SparkListenerJobStart jobStart)
+    {
+        String internalJobId = (String) jobStart.properties().get("spark.jobGroup.id");
+        try
+        {
+            jobId = jobStart.jobId();
+            internalJobIdMapping.put(jobId, internalJobId);
+            jobIdToStartTimes.put(jobId, System.nanoTime());
+        }
+        catch (Exception e)
+        {
+            LOGGER.warn("Failed to process job stats for the job start event with jobId: {}", internalJobId, e);
+        }
+    }
+
+    @Override
+    public synchronized void onJobEnd(SparkListenerJobEnd jobEnd)
+    {
+        try
+        {
+            boolean jobFailed = false;
+            String reason = "null";
+            if (jobEnd.jobResult() instanceof JobFailed)
+            {
+                jobFailed = true;
+                JobFailed result = (JobFailed) jobEnd.jobResult();
+                // The failure may have no cause; fall back to the exception itself
+                Throwable cause = result.exception().getCause();
+                reason = (cause != null ? cause : result.exception()).getMessage();
+            }
+
+            long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - jobIdToStartTimes.get(jobId));
+            String internalJobId = internalJobIdMapping.get(jobId);
+            String jobStatus = jobFailed ?
"Failed" : "Succeeded"; + Map jobStats = new HashMap<>(); + jobStats.put("jobId", internalJobId); + jobStats.put("jobStatus", jobStatus); + jobStats.put("failureReason", reason); + jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); + + LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", + jobId, + jobStatus, + reason, + elapsedTimeMillis); + + jobStats.putAll(getJobMetrics(jobId)); + jobCompletionConsumer.accept(new JobEventDetail(internalJobId, jobStats)); + cleanup(jobId); + } + catch (Exception e) + { + LOGGER.warn("Failed to process job stats for the job completion event with jobId: {}", + internalJobIdMapping.get(jobId), e); + } + + } + + public synchronized Map getJobMetrics(int jobId) + { + Map jobMetrics = new HashMap<>(); + if (jobIdToTaskMetrics.containsKey(jobId)) + { + List runTimes = jobIdToTaskMetrics.get(jobId) + .stream() + .map(TaskMetrics::executorRunTime) + .collect(Collectors.toList()); + + double[] runTimesArray = runTimes.stream().mapToDouble(Long::doubleValue).toArray(); + jobMetrics.put("maxTaskRuntimeMillis", String.valueOf(StatUtils.max(runTimesArray))); + jobMetrics.put("meanTaskRuntimeMillis", String.valueOf(StatUtils.mean(runTimesArray))); + jobMetrics.put("p50TaskRuntimeMillis", String.valueOf(StatUtils.percentile(runTimesArray, 50))); + jobMetrics.put("p95TaskRuntimeMillis", String.valueOf(StatUtils.percentile(runTimesArray, 95))); + jobMetrics.put("maxTaskRetriesMillis", String.valueOf(jobIdToTaskRetryStats.get(jobId))); + } + return jobMetrics; + } + + private void cleanup(int jobId) + { + jobIdToStartTimes.remove(jobId); + internalJobIdMapping.remove(jobId); + jobIdToTaskMetrics.remove(jobId); + jobIdToTaskRetryStats.remove(jobId); + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java index 9027ce437..5abcea258 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/JobStatsPublisher.java @@ -30,8 +30,9 @@ public interface JobStatsPublisher { /** * Publish the job attributes to be persisted and summarized - * - * @param stats the stats to publish + * @param stats mapping of the metric names and their values */ void publish(Map stats); + + Map stats(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java index ce6ac0c1b..5408428db 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/stats/LogStatsPublisher.java @@ -37,4 +37,10 @@ public void publish(Map stats) { LOGGER.info("Job Stats: {}", stats); } + + @Override + public Map stats() + { + throw new UnsupportedOperationException("Operation not supported for log publishing"); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 5bd2a1d07..f6d6e8d34 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5bd2a1d07..f6d6e8d34 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -34,6 +34,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
@@ -51,6 +52,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +77,9 @@
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider;
 import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException;
+import org.apache.cassandra.spark.common.stats.JobStatsListener;
+import org.apache.cassandra.spark.common.stats.JobStatsPublisher;
+import org.apache.cassandra.spark.common.stats.LogStatsPublisher;
 import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.config.SchemaFeatureSet;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
@@ -85,6 +90,7 @@
 import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
 import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.ReaderTimeProvider;
 import org.apache.cassandra.spark.utils.ScalaFunctions;
@@ -94,6 +100,7 @@
 import org.apache.cassandra.spark.validation.SidecarValidation;
 import org.apache.cassandra.spark.validation.StartupValidatable;
 import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.spark.SparkContext;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.util.ShutdownHookManager;
 import org.jetbrains.annotations.NotNull;
@@ -132,18 +139,22 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupValidatable
     protected boolean useIncrementalRepair;
     protected List<SchemaFeature> requestedFeatures;
     protected Map<String, ReplicationFactor> rfMap;
+    private final JobStatsListener jobStatsListener;
     @Nullable
     protected String lastModifiedTimestampField;
     // volatile in order to publish the reference for visibility
     protected volatile CqlTable cqlTable;
     protected transient TimeProvider timeProvider;
     protected transient SidecarClient sidecar;
+    private transient JobStatsPublisher jobStatsPublisher;
     private SslConfig sslConfig;
 
     @VisibleForTesting
     transient Map<String, SidecarInstance> instanceMap;
 
+    private String internalJobId;
+
     public CassandraDataLayer(@NotNull ClientConfig options,
                               @NotNull Sidecar.ClientConfig sidecarClientConfig,
                               @Nullable SslConfig sslConfig)
@@ -163,6 +174,18 @@ public CassandraDataLayer(@NotNull ClientConfig options,
         this.useIncrementalRepair = options.useIncrementalRepair();
         this.lastModifiedTimestampField = options.lastModifiedTimestampField();
         this.requestedFeatures = options.requestedFeatures();
+        this.jobStatsPublisher = new LogStatsPublisher();
+
+        // Note: the consumer is invoked for all Spark jobs and tasks; only publish stats for this job
+        this.jobStatsListener = new JobStatsListener(jobEventDetail -> {
+            if (internalJobId != null && internalJobId.equals(jobEventDetail.internalJobID()))
+            {
+                Map<String, String> stats = new HashMap<>();
+                stats.put("jobId", internalJobId);
+                stats.putAll(jobEventDetail.jobStats());
+                jobStatsPublisher.publish(stats);
+            }
+        });
     }
 
 // For serialization
@@ -209,6 +232,17 @@ protected CassandraDataLayer(@Nullable String keyspace,
         this.useIncrementalRepair = useIncrementalRepair;
         this.lastModifiedTimestampField = lastModifiedTimestampField;
         this.requestedFeatures = requestedFeatures;
+        this.jobStatsPublisher = new LogStatsPublisher();
+        this.jobStatsListener = new JobStatsListener(jobEventDetail -> {
+            if (internalJobId != null && internalJobId.equals(jobEventDetail.internalJobID()))
+            {
+                Map<String, String> stats = new HashMap<>();
+                stats.put("jobId", internalJobId);
+                stats.putAll(jobEventDetail.jobStats());
+                jobStatsPublisher.publish(stats);
+            }
+        });
+
         if (lastModifiedTimestampField != null)
         {
             aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField);
@@ -222,7 +256,11 @@ public void initialize(@NotNull ClientConfig options)
     {
-        dialHome(options);
+        SparkContext.getOrCreate().addSparkListener(jobStatsListener);
+        String description = "Cassandra Bulk Read for table " + table;
+        internalJobId = UUID.randomUUID().toString();
+        SparkContext.getOrCreate().setJobGroup(internalJobId, description, false);
+        publishJobStats(internalJobId, options);
         timeProvider = new ReaderTimeProvider();
 
         LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={} referenceEpoch={}",
@@ -791,6 +829,37 @@ private static String readNullable(ObjectInputStream in) throws IOException
         return null;
     }
 
+    private void publishJobStats(String internalJobId, @NotNull ClientConfig options)
+    {
+        Map<String, String> jobStats = new HashMap<String, String>()
+        {
+            {
+                put("application", "Cassandra Spark Bulk Reader");
+                put("jobId", internalJobId);
+                put("keyspace", StringUtils.defaultString(keyspace, "null"));
+                put("table", StringUtils.defaultString(table, "null"));
+                put("snapshotName", snapshotName);
+                put("datacenter", datacenter);
+                put("numCores", String.valueOf(options.numCores()));
+                put("defaultParallelism", String.valueOf(options.defaultParallelism()));
+                put("jvmVersion", System.getProperty("java.version"));
+                put("useMtls", String.valueOf(true));
+                put("maxRetries", String.valueOf(sidecarClientConfig.maxRetries()));
+                put("maxPoolSize", String.valueOf(sidecarClientConfig.maxPoolSize()));
+                put("timeoutSeconds", String.valueOf(sidecarClientConfig.timeoutSeconds()));
+                put("millisToSleep", String.valueOf(sidecarClientConfig.millisToSleep()));
+                put("maxBufferSize", String.valueOf(sidecarClientConfig.maxBufferSize()));
+                put("chunkSize", String.valueOf(sidecarClientConfig.chunkBufferSize()));
+                put("consistencyLevel", String.valueOf(consistencyLevel));
+                put("analyticsVersion", BuildInfo.BUILD_VERSION_AND_REVISION);
+                put("sparkVersion", SparkContext.getOrCreate().version());
+                put("quoteIdentifiers", String.valueOf(quoteIdentifiers));
+                put("dataTransport", "DIRECT");
+            }
+        };
+        jobStatsPublisher.publish(jobStats);
+    }
+
     // Kryo Serialization
     public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraDataLayer>
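
[Review note] The two CassandraDataLayer constructors build identical listener consumers; a
private helper would remove the duplication. Sketch only, using the field names from this diff:

    private JobStatsListener createJobStatsListener()
    {
        return new JobStatsListener(jobEventDetail -> {
            if (internalJobId != null && internalJobId.equals(jobEventDetail.internalJobID()))
            {
                Map<String, String> stats = new HashMap<>();
                stats.put("jobId", internalJobId);
                stats.putAll(jobEventDetail.jobStats());
                jobStatsPublisher.publish(stats);
            }
        });
    }

Each constructor would then assign this.jobStatsListener = createJobStatsListener();.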
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
index 2ae3370fa..09c138d0d 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataSourceHelper.java
@@ -129,6 +129,7 @@ protected static CassandraDataLayer createAndInitCassandraDataLayer(
 
         dataLayer.startupValidate();
+
         return dataLayer;
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java
new file mode 100644
index 000000000..494d096da
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/JobStatsListenerTests.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.spark.common.stats.JobStatsListener;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.TaskEndReason;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.scheduler.TaskInfo;
+import org.mockito.Mockito;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.when;
+
+public class JobStatsListenerTests
+{
+    private SparkContext sparkContext;
+
+    @BeforeEach
+    public void setUp()
+    {
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
+        sparkContext = new SparkContext(conf);
+    }
+
+    @AfterEach
+    public void tearDown()
+    {
+        sparkContext.stop();
+    }
+
+    @Test
+    public void testOnEventHandling()
+    {
+        List<StageInfo> stageInfoList = Collections.emptyList();
+        Seq<StageInfo> stageInfoSeq = JavaConverters.asScalaIteratorConverter(stageInfoList.iterator())
+                                                    .asScala().toSeq();
+        AtomicReference<JobEventDetail> jobEvent = new AtomicReference<>();
+        String jobId = UUID.randomUUID().toString();
+
+        JobStatsListener listener = new JobStatsListener(jobEvent::set);
+
+        Properties p = new Properties();
+        p.setProperty("spark.jobGroup.id", jobId);
+        SparkListenerJobStart jobStartEvent = new SparkListenerJobStart(1,
+                                                                        System.currentTimeMillis(),
+                                                                        stageInfoSeq,
+                                                                        p);
+        listener.onJobStart(jobStartEvent);
+        listener.onTaskEnd(createMockTaskEndEvent());
+        SparkListenerJobEnd mockJobEnd = Mockito.mock(SparkListenerJobEnd.class);
+        listener.onJobEnd(mockJobEnd);
+
+        assertThat(jobEvent.get().internalJobID()).isEqualTo(jobId);
+        assertThat(jobEvent.get().jobStats().get("jobId")).isEqualTo(jobId);
+        assertThat(jobEvent.get().jobStats().get("jobStatus")).isEqualTo("Succeeded");
+        assertThat(jobEvent.get().jobStats().get("maxTaskRetries")).isEqualTo("1");
+        assertThat(jobEvent.get().jobStats().get("p50TaskRuntimeMillis")).isEqualTo("1.0");
+        assertThat(jobEvent.get().jobStats().get("meanTaskRuntimeMillis")).isEqualTo("1.0");
+        assertThat(jobEvent.get().jobStats().get("maxTaskRuntimeMillis")).isEqualTo("1.0");
+        assertThat(jobEvent.get().jobStats().get("p95TaskRuntimeMillis")).isEqualTo("1.0");
+
+        // Validate that the previous job's metrics are reset after job completion
+        listener.onJobStart(jobStartEvent);
+        listener.onJobEnd(mockJobEnd);
+        assertFalse(jobEvent.get().jobStats().containsKey("maxTaskRetries"));
+        assertFalse(jobEvent.get().jobStats().containsKey("p50TaskRuntimeMillis"));
+        assertFalse(jobEvent.get().jobStats().containsKey("meanTaskRuntimeMillis"));
+        assertFalse(jobEvent.get().jobStats().containsKey("maxTaskRuntimeMillis"));
+        assertFalse(jobEvent.get().jobStats().containsKey("p95TaskRuntimeMillis"));
+    }
+
+    private SparkListenerTaskEnd createMockTaskEndEvent()
+    {
+        SparkListenerTaskEnd mockTaskEnd = Mockito.mock(SparkListenerTaskEnd.class);
+        TaskMetrics mTaskMetrics = Mockito.mock(TaskMetrics.class);
+        when(mTaskMetrics.executorRunTime()).thenReturn(1L);
+        TaskInfo mTaskInfo = Mockito.mock(TaskInfo.class);
+        when(mTaskInfo.taskId()).thenReturn(1L);
+        when(mTaskInfo.attemptNumber()).thenReturn(1);
+
+        TaskEndReason mTaskEndReason = Mockito.mock(TaskEndReason.class);
+        when(mTaskEndReason.toString()).thenReturn("Success");
+
+        when(mockTaskEnd.taskMetrics()).thenReturn(mTaskMetrics);
+        when(mockTaskEnd.taskInfo()).thenReturn(mTaskInfo);
+        when(mockTaskEnd.reason()).thenReturn(mTaskEndReason);
+        return mockTaskEnd;
+    }
+}
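
[Review note] The mocked SparkListenerJobEnd returns null from jobResult(), so the listener
reports "Succeeded" only because the instanceof JobFailed check fails. Stubbing an explicit
success result makes that intent visible (JobSucceeded is a Scala case object, hence the
MODULE$ access):

    SparkListenerJobEnd mockJobEnd = Mockito.mock(SparkListenerJobEnd.class);
    when(mockJobEnd.jobResult()).thenReturn(org.apache.spark.scheduler.JobSucceeded$.MODULE$);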
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 205d4129c..e605fa6aa 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -23,6 +23,7 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -81,12 +82,20 @@ public class MockBulkWriterContext
                       new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)});
     private ConsistencyLevel.CL consistencyLevel;
     private int sstableDataSizeInMB = 128;
+    private int sstableWriteBatchSize = 2;
+    private final Map<String, String> jobStats = new HashMap<>();
     private CassandraBridge bridge = CassandraBridgeFactory.get(CassandraVersion.FOURZERO);
 
     @Override
     public void publish(Map<String, String> stats)
     {
-        // DO NOTHING
+        jobStats.putAll(stats);
+    }
+
+    @Override
+    public Map<String, String> stats()
+    {
+        return jobStats;
     }
 
     public interface CommitResultSupplier extends BiFunction<List<String>, String, DirectDataTransferApi.RemoteCommitResult>
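
[Review note] With the mock now retaining published stats, tests can assert on them directly.
A hypothetical usage, assuming the existing MockBulkWriterContext test constructor:

    MockBulkWriterContext context = new MockBulkWriterContext(ring);  // 'ring' as in existing tests
    context.publish(Collections.singletonMap("jobStatus", "Succeeded"));
    assertThat(context.stats()).containsEntry("jobStatus", "Succeeded");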