From 26b1080bb48214a8457b2598f7390cb8e8358307 Mon Sep 17 00:00:00 2001 From: Kavpreet Grewal Date: Thu, 26 Mar 2026 18:33:30 +0000 Subject: [PATCH] [SPARK-56243] Throw detailed error on malformed Kafka record timestamps --- .../error/kafka-error-conditions.json | 8 ++++++++ .../spark/sql/kafka010/KafkaExceptions.scala | 19 +++++++++++++++++++ .../kafka010/KafkaRecordToRowConverter.scala | 19 +++++++++++++++++-- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json index 4260d96654d73..c256b8cbebb2d 100644 --- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json +++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json @@ -120,5 +120,13 @@ "message" : [ "The specified start is greater than the end for topic partition ." ] + }, + "KAFKA_MALFORMED_RECORD_TIMESTAMP" : { + "message" : [ + "Record at topic <topic> partition <partition> offset <offset> has timestamp value <timestamp> that overflows when converting to Spark's microsecond-precision TimestampType.", + "Kafka record timestamps must be in milliseconds since epoch per the Kafka protocol. The provided value suggests the producer may be writing timestamps in microseconds or nanoseconds instead.", + "Ensure the Kafka producer sets the ProducerRecord timestamp in milliseconds." 
+ ], + "sqlState": "22008" } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala index 9db9f69c54dfe..17472fb920f19 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala @@ -178,6 +178,25 @@ object KafkaExceptions { "assignedPartitions" -> assignedPartitions.toString)) } + def kafkaMalformedRecordTimestamp( + topic: String, + partition: Int, + offset: Long, + timestamp: Long, + cause: Throwable + ): KafkaIllegalStateException = { + new KafkaIllegalStateException( + errorClass = "KAFKA_MALFORMED_RECORD_TIMESTAMP", + messageParameters = Map( + "topic" -> topic, + "partition" -> partition.toString, + "offset" -> offset.toString, + "timestamp" -> timestamp.toString + ), + cause = cause + ) + } + def nullTopicInData(): KafkaIllegalStateException = { new KafkaIllegalStateException( errorClass = "KAFKA_NULL_TOPIC_IN_DATA", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala index 8d0bcc5816775..212b08ee3979b 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala @@ -39,13 +39,13 @@ private[kafka010] class KafkaRecordToRowConverter { val toInternalRowWithoutHeaders: Record => InternalRow = (cr: Record) => InternalRow( cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset, - DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id + parseTimestamp(cr), cr.timestampType.id ) val toInternalRowWithHeaders: 
Record => InternalRow = (cr: Record) => InternalRow( cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset, - DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id, + parseTimestamp(cr), cr.timestampType.id, if (cr.headers.iterator().hasNext) { new GenericArrayData(cr.headers.iterator().asScala .map(header => @@ -90,4 +90,19 @@ private[kafka010] object KafkaRecordToRowConverter { def kafkaSchema(includeHeaders: Boolean): StructType = { if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders } + + private def parseTimestamp(cr: Record): Long = { + try { + DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)) + } catch { + case e: ArithmeticException => + throw KafkaExceptions.kafkaMalformedRecordTimestamp( + cr.topic(), + cr.partition(), + cr.offset(), + cr.timestamp(), + e + ) + } + } }