[Bug] Spark: restarted streaming query drops valid checkpoint offset and re-scans the full snapshot (scan.mode=latest-full) #8205

@yugeeklab

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

master (1.5-SNAPSHOT), also present in release-1.4

Compute Engine

Spark 3.5, Structured Streaming (micro-batch), scan.mode = latest-full

Minimal reproduce step

  1. Start a streaming query reading a primary-key table (spark.readStream.table(...) with scan.mode=latest-full), let it commit a few batches.
  2. Kill the query and restart it with the same checkpoint.
  3. The first batch after restart is empty; the second batch re-reads the entire snapshot instead of resuming from the checkpointed offset, and the changelog between the checkpointed offset and the restart-time snapshot is skipped.

Offset WAL across one observed restart (5-minute trigger, ~300k-row source):

batch N    {"snapshotId":134483,"index":20,"scanSnapshot":false}   <- committed before stop
batch N+1  {"snapshotId":134491,"index":29,"scanSnapshot":false}   <- 0 rows, 2s
batch N+2  {"snapshotId":134511,"index":29,"scanSnapshot":false}   <- 1,055,509 input rows

What doesn't meet your expectations?

PaimonMicroBatchStream#planInputPartitions clamps the checkpointed start offset up to initOffset whenever it compares lower:

if (startOffset0.compareTo(initOffset) < 0) { initOffset } else { startOffset0 }

initOffset is a lazy val recomputed from the current table state on every restart; with scan.mode=latest-full it always points at the current snapshot with scanSnapshot=true. After any downtime the checkpointed offset compares lower, so a perfectly valid checkpoint is silently discarded:

  • the batch planned against the old WAL end offset becomes empty (all splits filtered by the (start, end] range check),
  • the next batch re-emits the whole snapshot as +I rows carrying their stored field values (e.g. old event-time columns), and
  • the changelog gap between the checkpointed offset and the restart-time snapshot is never read.
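The clamp's effect can be reproduced outside Spark with a simplified model. This is a sketch, not the connector's code: `Offset` is a hypothetical stand-in whose fields mirror the WAL JSON above, its ordering (snapshotId first, then index) is an assumption, and the `initOffset` value used in the usage note is illustrative.

```scala
object ClampSketch {
  // Hypothetical stand-in for the connector's offset type.
  // Assumption: offsets order by snapshotId, then by index.
  final case class Offset(snapshotId: Long, index: Long, scanSnapshot: Boolean)
      extends Ordered[Offset] {
    def compare(that: Offset): Int = {
      val bySnapshot = java.lang.Long.compare(snapshotId, that.snapshotId)
      if (bySnapshot != 0) bySnapshot
      else java.lang.Long.compare(index, that.index)
    }
  }

  // Same shape as the expression quoted from planInputPartitions:
  // any checkpointed offset that compares lower than initOffset is replaced by it.
  def currentStart(checkpointed: Offset, initOffset: Offset): Offset =
    if (checkpointed.compareTo(initOffset) < 0) initOffset else checkpointed
}
```

With the committed offset from batch N, `Offset(134483, 20, scanSnapshot = false)`, and an illustrative restart-time `initOffset` of `Offset(134491, 0, scanSnapshot = true)`, `currentStart` returns `initOffset`, so the checkpointed position is lost even though snapshot 134483 may still be readable.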

For stateful consumers (e.g. SCD2 history building), the re-emission applies historical rows a second time; in our production pipeline this corrupted downstream state.

Anything else?

Proposed fix (PR follows): fall back to initOffset only when the checkpointed snapshot has actually expired (startOffset0.snapshotId < earliestSnapshotId()); otherwise resume from the checkpointed offset as-is.
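A minimal sketch of the proposed rule, not the actual patch: it uses a hypothetical `Offset` case class that mirrors the WAL JSON fields, and `earliestSnapshotId` is passed in as a plain parameter in place of the table's `earliestSnapshotId()` lookup. The snapshot IDs in the usage note are illustrative.

```scala
object ResumeSketch {
  // Hypothetical stand-in for the connector's offset type.
  final case class Offset(snapshotId: Long, index: Long, scanSnapshot: Boolean)

  // Fall back to initOffset only when the checkpointed snapshot has expired,
  // i.e. its id is lower than the earliest snapshot still retained.
  // Otherwise resume from the checkpointed offset unchanged.
  def resumeStart(
      checkpointed: Offset,
      initOffset: Offset,
      earliestSnapshotId: Long): Offset =
    if (checkpointed.snapshotId < earliestSnapshotId) initOffset
    else checkpointed
}
```

For example, with a checkpointed `Offset(134483, 20, scanSnapshot = false)` and an assumed earliest retained snapshot of 134400, `resumeStart` keeps the checkpointed offset; if the earliest retained snapshot were 134490, it would fall back to `initOffset`.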

Are you willing to submit a PR?

  • I'm willing to submit a PR!
