Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,13 @@
"message" : [
"The specified start <offsetType> <startOffset> is greater than the end <offsetType> <endOffset> for topic <topic> partition <partition>."
]
},
"KAFKA_MALFORMED_RECORD_TIMESTAMP" : {
"message" : [
"Record at topic <topic> partition <partition> offset <offset> has timestamp value <timestamp> that overflows when converting to Spark's microsecond-precision TimestampType.",
"Kafka record timestamps must be in milliseconds since epoch per the Kafka protocol. The provided value suggests the producer may be writing timestamps in microseconds or nanoseconds instead.",
"Ensure the Kafka producer sets the ProducerRecord timestamp in milliseconds."
],
"sqlState" : "22008"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,25 @@ object KafkaExceptions {
"assignedPartitions" -> assignedPartitions.toString))
}

def kafkaMalformedRecordTimestamp(
topic: String,
partition: Int,
offset: Long,
timestamp: Long,
cause: Throwable
): KafkaIllegalStateException = {
new KafkaIllegalStateException(
errorClass = "KAFKA_MALFORMED_RECORD_TIMESTAMP",
messageParameters = Map(
"topic" -> topic,
"partition" -> partition.toString,
"offset" -> offset.toString,
"timestamp" -> timestamp.toString
),
cause = cause
)
}

def nullTopicInData(): KafkaIllegalStateException = {
new KafkaIllegalStateException(
errorClass = "KAFKA_NULL_TOPIC_IN_DATA",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ private[kafka010] class KafkaRecordToRowConverter {
val toInternalRowWithoutHeaders: Record => InternalRow =
(cr: Record) => InternalRow(
cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id
parseTimestamp(cr), cr.timestampType.id
)

val toInternalRowWithHeaders: Record => InternalRow =
(cr: Record) => InternalRow(
cr.key, cr.value, UTF8String.fromString(cr.topic), cr.partition, cr.offset,
DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp)), cr.timestampType.id,
parseTimestamp(cr), cr.timestampType.id,
if (cr.headers.iterator().hasNext) {
new GenericArrayData(cr.headers.iterator().asScala
.map(header =>
Expand Down Expand Up @@ -90,4 +90,19 @@ private[kafka010] object KafkaRecordToRowConverter {
def kafkaSchema(includeHeaders: Boolean): StructType = {
if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}

private def parseTimestamp(cr: Record): Long = {
try {
DateTimeUtils.fromJavaTimestamp(new Timestamp(cr.timestamp))
} catch {
case e: ArithmeticException =>
throw KafkaExceptions.kafkaMalformedRecordTimestamp(
cr.topic(),
cr.partition(),
cr.offset(),
cr.timestamp(),
e
)
}
}
}