diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala index e8fbea0e69a4..c3d2dfc8812d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala @@ -116,7 +116,17 @@ class PaimonMicroBatchStream( override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = { val startOffset = { val startOffset0 = PaimonSourceOffset(start) - if (startOffset0.compareTo(initOffset) < 0) { + // Fall back to initOffset only when the checkpointed snapshot has expired. + // initOffset is recomputed from the current table state on every (re)start, + // so with scan modes like latest-full it points at the current snapshot with + // scanSnapshot=true. Clamping a still-valid checkpointed offset up to it made + // a restarted query silently skip the changelog gap and re-scan the whole + // snapshot, re-emitting every row as +I. + if (startOffset0.snapshotId < table.snapshotManager().earliestSnapshotId()) { + logWarning( + s"Checkpointed start offset $startOffset0 is no longer available " + + s"(earliest snapshot: ${table.snapshotManager().earliestSnapshotId()}), " + + s"falling back to $initOffset.") initOffset } else { startOffset0