From f8a467f1e506e1bc8271b8ac7a3e1fafddca377d Mon Sep 17 00:00:00 2001 From: yugeeklab Date: Thu, 11 Jun 2026 15:03:03 +0900 Subject: [PATCH] [spark] Fix restarted streaming query re-scanning full snapshot instead of resuming from checkpoint planInputPartitions clamped the checkpointed start offset up to initOffset whenever it compared lower. initOffset is recomputed from the current table state on every restart, so with scan modes like latest-full it always points at the current snapshot with scanSnapshot=true. Any restarted query therefore dropped its valid checkpointed position, skipped the changelog gap, and re-emitted the entire snapshot as +I rows. Fall back to initOffset only when the checkpointed snapshot has actually expired (older than earliestSnapshotId). Co-Authored-By: Claude Fable 5 --- .../spark/sources/PaimonMicroBatchStream.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala index e8fbea0e69a4..c3d2dfc8812d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala @@ -116,7 +116,17 @@ class PaimonMicroBatchStream( override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = { val startOffset = { val startOffset0 = PaimonSourceOffset(start) - if (startOffset0.compareTo(initOffset) < 0) { + // Fall back to initOffset only when the checkpointed snapshot has expired. + // initOffset is recomputed from the current table state on every (re)start, + // so with scan modes like latest-full it points at the current snapshot with + // scanSnapshot=true. Clamping a still-valid checkpointed offset up to it made + // a restarted query silently skip the changelog gap and re-scan the whole + // snapshot, re-emitting every row as +I. + if (startOffset0.snapshotId < table.snapshotManager().earliestSnapshotId()) { + logWarning( + s"Checkpointed start offset $startOffset0 is no longer available " + + s"(earliest snapshot: ${table.snapshotManager().earliestSnapshotId()}), " + + s"falling back to $initOffset.") initOffset } else { startOffset0