diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
index 4260d96654d73..c256b8cbebb2d 100644
--- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
+++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
@@ -120,5 +120,13 @@
     "message" : [
       "The specified start is greater than the end for topic partition ."
     ]
+  },
+  "KAFKA_MALFORMED_RECORD_TIMESTAMP" : {
+    "message" : [
+      "Record at topic <topic> partition <partition> offset <offset> has timestamp value <timestamp> that overflows when converting to Spark's microsecond-precision TimestampType.",
+      "Kafka record timestamps must be in milliseconds since epoch per the Kafka protocol. The provided value suggests the producer may be writing timestamps in microseconds or nanoseconds instead.",
+      "Ensure the Kafka producer sets the ProducerRecord timestamp in milliseconds."
+    ],
+    "sqlState" : "22008"
   }
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index 9db9f69c54dfe..17472fb920f19 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -178,6 +178,26 @@ object KafkaExceptions {
         "assignedPartitions" -> assignedPartitions.toString))
   }
 
+  /**
+   * Error for a Kafka record whose timestamp overflows when converted to Spark's
+   * microsecond-precision TimestampType (record timestamps must be epoch millis).
+   */
+  def kafkaMalformedRecordTimestamp(
+      topic: String,
+      partition: Int,
+      offset: Long,
+      timestamp: Long,
+      cause: Throwable): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_MALFORMED_RECORD_TIMESTAMP",
+      messageParameters = Map(
+        "topic" -> topic,
+        "partition" -> partition.toString,
+        "offset" -> offset.toString,
+        "timestamp" -> timestamp.toString),
+      cause = cause)
+  }
+
   def nullTopicInData(): KafkaIllegalStateException = {
     new KafkaIllegalStateException(
       errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala
index 8d0bcc5816775..212b08ee3979b 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToRowConverter.scala
@@ -39,13 +39,13 @@ private[kafka010] class KafkaRecordToRowConverter {
   val toInternalRowWithoutHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id
+      parseTimestamp(cr), cr.timestampType.id
     )
 
   val toInternalRowWithHeaders: Record => InternalRow =
     (cr: Record) => InternalRow(
       cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
-      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id,
+      parseTimestamp(cr), cr.timestampType.id,
       if (cr.headers.iterator().hasNext) {
         new GenericArrayData(cr.headers.iterator().asScala
           .map(header =>
@@ -90,4 +90,19 @@
   def kafkaSchema(includeHeaders: Boolean): StructType = {
     if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
   }
+
+  /**
+   * Converts the record timestamp (epoch milliseconds, per the Kafka protocol)
+   * to Spark's microsecond-precision TimestampType value, mapping arithmetic
+   * overflow to the KAFKA_MALFORMED_RECORD_TIMESTAMP error.
+   */
+  private def parseTimestamp(cr: Record): Long = {
+    try {
+      DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp))
+    } catch {
+      case e: ArithmeticException =>
+        throw KafkaExceptions.kafkaMalformedRecordTimestamp(
+          cr.topic(), cr.partition(), cr.offset(), cr.timestamp(), e)
+    }
+  }
 }