Skip to content

Add self-checkpointing support for pooled streaming processors#4636

Open
abuijze wants to merge 2 commits into
mainfrom
feature/event-handler-checkpointing
Open

Add self-checkpointing support for pooled streaming processors#4636
abuijze wants to merge 2 commits into
mainfrom
feature/event-handler-checkpointing

Conversation

@abuijze

@abuijze abuijze commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

Background

A pooled streaming processor stores a segment's TrackingToken once per batch, in the same transaction as the batch. That's correct for a synchronous, transactional projection, but wrong for a handler whose work isn't durable when the event handler returns -- one that persists asynchronously (off the processing thread, or via a longer-running process), or builds an in-memory model it snapshots later. Committing the batch doesn't make that work durable, so storing the batch-end token advertises progress that hasn't actually been made, and a restart would skip events that were never persisted.

This change lets an event-handling unit manage its own progress. A unit that implements Checkpointing decides when its work for a segment is durable and asks the processor to advance the stored token through a CheckpointTrigger; the processor stores the token only after the unit confirms durability, under a transaction it controls, so the stored token is always a genuine resume point. A processor whose every handler is self-checkpointing advances only on request ("fully-deferred"); one that also has an ordinary handler keeps checkpointing every batch as before.

Usage

A projection carries its @EventHandler methods and implements Checkpointing. The example below processes entirely in memory and persists only when the segment is released -- the simplest opt-in, and a clear case of deferral: onCheckpointAdvanced is the only required method, and since the projection never requests a checkpoint while running, the framework invokes it only on release (onSegmentReleased delegates to it by default):

class AccountProjection implements Checkpointing {

    private final ExternalStore store;
    private final Map<String, Long> balances = new ConcurrentHashMap<>();

    AccountProjection(ExternalStore store) {
        this.store = store;
    }

    @EventHandler
    void on(AccountCredited event) {
        balances.merge(event.accountId(), event.amount(), Long::sum); // pure in-memory; nothing persisted per batch
    }

    @Override
    public CompletableFuture<TrackingToken> onCheckpointAdvanced(Segment segment, TrackingToken requested) {
        return store.saveAsync(balances).thenApply(ignored -> TrackingToken.LATEST);
    }
}

It's registered like any other projection; implementing the interface is the entire opt-in, with no checkpoint-specific configuration:

EventProcessorModule.pooledStreaming("account-projection")
                    .eventHandlingComponents(c -> c.autodetected(
                            "accountProjection",
                            cfg -> new AccountProjection(cfg.getComponent(ExternalStore.class))));

Because the token advances only on release, a hard crash (no graceful release) rebuilds the in-memory model by reprocessing from the last stored token -- the trade-off for full-speed in-memory processing, and the expected behaviour for an in-memory read model.

To advance the stored token during processing instead (bounding the replay window) -- e.g. once an asynchronous write is durable, or on a timer -- request it through a CheckpointTrigger: take it as a handler parameter to request while handling an event, or retain it from onSegmentClaimed to request out-of-band (an async callback, a @Scheduled snapshot).

Introduce the Checkpointing operations interface and the per-segment
CheckpointTrigger (messaging streaming.checkpoint package), letting an
event-handling unit manage when its segment's TrackingToken advances.

Add the EventHandlingComponent#unwrap(Class) convention to detect such a
unit, forwarded through DelegatingEventHandlingComponent and
SequenceOverridingEventHandlingComponent and bridged to an annotated POJO
via AnnotatedEventHandlingComponent / AnnotatedHandlerInspector.

Extend WorkPackage and Coordinator to request, reconcile and store
checkpoints and to flush on segment release, and have
PooledStreamingEventProcessor resolve the participants and select auto
vs fully-deferred mode.

Register CheckpointTrigger and Segment handler-parameter resolvers
(checkpoint.annotation and segmenting.annotation).
@abuijze abuijze requested a review from a team as a code owner June 4, 2026 16:16
@abuijze abuijze requested review from MateuszNaKodach, hatzlj and smcvb and removed request for a team June 4, 2026 16:16
Place the volatile modifier before the @nullable type-use annotation on
lastConsumedToken, so the annotation sits adjacent to its type. This
follows the JLS modifier ordering and matches the field nullability
convention used elsewhere in the codebase.
@sonarqubecloud

sonarqubecloud Bot commented Jun 5, 2026

Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
B Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

Catch issues before they fail your Quality Gate with our IDE extension SonarQube for IDE

@laura-devriendt-lemon laura-devriendt-lemon left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some suggestions and questions

private final Coordinator coordinator;
private final WorkPackage.EventFilter workPackageEventFilter;
private final List<Checkpointing> checkpointingParticipants;
private final boolean autoMode;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autoMode name isn't self-explanatory => "auto" mode of what? The reader has to trace the field to figure out it means "stores a checkpoint at the batch-end token every batch." Could we rename it to autoCheckpointing? That says what it does, and it lines up with the prose already used in WorkPackage ("the auto-checkpointing path", "the auto component"), so code and javadoc would use the same word. Also at workpackage?

.map(c -> c.unwrap(Checkpointing.class))
.flatMap(Optional::stream)
.toList();
this.autoMode = checkpointingParticipants.size() < eventHandlingComponents.size();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if i understood correctly => if even one handler is ordinary (synchronous, durable-on-commit), the processor must keep storing per batch to protect that handler => so it stays in auto mode, and the checkpointing handlers get dragged along (forced to cover the batch-end token). Only when nobody needs per-batch storage can it fully defer.

It gets forced to confirm durability every batch with nothing telling the user. I get why (one shared token per segment can only be safe for the handler that can't reprocess), but it's invisible. Could this be documented so the user is aware of this and what they need to do if they do not want this behavior or/and maybe log a line at startup when a Checkpointing component shares a processor with an ordinary one, so the downgrade to the auto mode isn't silent?

WorkPackage workPackage = workPackageFactory.apply(segment, token);
workPackage.onBatchProcessed(() -> resetRetryExponentialBackoff(segment.getSegmentId()));
// Hand the per-segment CheckpointTrigger to any self-checkpointing components on this segment.
workPackage.notifySegmentClaimed();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion the comment here is a sign the method name could carry more. Personally I'd lean towards renaming notifySegmentClaimed() to something like handCheckpointTriggerToParticipants() and dropping the comment.

The name would then say what happens on its own, and where it's called already conveys the "on claim" timing. Just a suggestion, but I tend to prefer letting the name do the work over an explanatory comment.

WorkPackage workPackage = workPackageFactory.apply(segment, token);
workPackage.onBatchProcessed(() -> resetRetryExponentialBackoff(segment.getSegmentId()));
// Hand the per-segment CheckpointTrigger to any self-checkpointing components on this segment.
workPackage.notifySegmentClaimed();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notifySegmentClaimed() invokes onSegmentClaimed code synchronously and unwrapped, whereas the segmentChangeListener.onSegmentClaimed(...) call right below it is wrapped in .handle(...) that logs and continues.

A throwing participant therefore propagates up to abortAndScheduleRetry, tearing down the whole claim cycle, while a throwing listener is contained. Was the asymmetry intended?

* (if it advances). Invoked by the {@link Coordinator} while the token-store claim is still held, before the claim
* is released. With no participant it is a no-op (the per-batch store already covered progress).
*/
CompletableFuture<Void> finalCheckpoint(ProcessingContext ctx) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misreading this but the three javadocs around the release path seem to disagree about which token gets stored, and I can't tell which is intended.

finalCheckpoint's comment says lowest:

// ...then stores the lowerBound of the reported tokens within the given ctx

but the code goes for the highest via reconcile, and only uses lowerBound if reconcile throws:

.thenCompose(reported -> reconcile(reported).exceptionally(error -> {
    return lowerBound(reported.values());   // only on reconcile failure
}))

reconcile's own javadoc explains why it prefers the highest, to avoid idempotency reliance:

// ...which is only safe if that processing is idempotent. To avoid relying on
// idempotency, this reconciles the reported positions: it takes the highest reported
// position and re-requests every component that has not yet reached it...

That makes sense in the running checkpoint() path. But on release, onSegmentReleased's contract seems to say the opposite — that reprocessing is expected and fine:

// Returning a token below upTo is the normal lagging case: the uncovered tail
// ...is simply reprocessed on the next claim.

So on the release path, reconcile is trying to avoid the very reprocessing that onSegmentReleased already accepts. And mechanically, reconcile re-invokes onCheckpointAdvanced(participant, agreed) on a component after it already got onSegmentReleased (where the contract says it may "discard the segment's local state"), so I'm not sure it could advance at that point anyway, which would just send us to the lowerBound fallback.

I'm probably missing something, but I can't reconcile (no pun intended) the three: was finalCheckpoint meant to store lowerBound directly like its comment says, with reconcile reserved for the running path? Or is reconcile-on-release intentional in which case should onSegmentReleased's "lagging is normal" wording and finalCheckpoint's "stores the lowerBound" comment be updated to match? Either way the docs point three different directions right now. (Also: the only release test uses a single participant, so this multi-participant path isn't exercised either way.)

});
unitOfWork.onInvocation(ctx -> batchProcessor.process(eventBatch, ctx).asCompletableFuture());
// One transaction handles the batch AND stores the checkpoint for this cycle (if any).
unitOfWork.onPrepareCommit(this::checkpoint);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Folding checkpoint() into the batch transaction via onPrepareCommit means the batch tx stays open while we await participant.onCheckpointAdvanced(...) (awaited without a timeout). In auto mode this happens every batch, so a slow participant flush holds the batch's connection/locks open, and a participant failure rolls back the co-located ordinary handler's work too.

I think it's forced by the shared-token atomicity, so not asking for a code change but could we document the behavior? The "awaits without a timeout, can stall the segment" risk is already well covered in the Checkpointing javadoc. What I don't see documented is that this await sits inside the batch transaction (onPrepareCommit), so in auto mode it holds the batch's connection/locks open every batch, and a participant failure rolls back the co-located ordinary handler's work too. Could we add a line on that, plus a note recommending async/deferred handlers go in their own processor to avoid it?

* @see CheckpointTrigger
* @since 5.2.0
*/
public interface Checkpointing {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will there come a separate pr for antora docs?

* @param <C> The capability type.
* @return An {@link Optional} holding the resolved capability, or empty if not available.
*/
default <C> Optional<C> unwrap(Class<C> componentType) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New public method without an @SInCE 5.2.0 tag.

* @param <B> the behavior type
* @return an {@link Optional} holding the resolved behavior, or empty if the {@code target} does not provide it
*/
public <B> Optional<B> resolveBehavior(T target, Class<B> behaviorType) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New public method without an @ since 5.2.0 tag.

class Release {

@Test
void finalCheckpointStoresTheLowerBoundOfReleasedHighWaterMarks() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses a single participant, so reconcile short-circuits and the multi-participant release behavior isn't exercised. Could we add a 2-participant release test to pin the intended behavior?

@laura-devriendt-lemon

Copy link
Copy Markdown
Contributor

Something that could be usefull => I ran claude to ask if there where some behavior tests missing this was the output:

1. Multi-participant release. Every release test uses a single participant, so reconcile short-circuits (size == 1) and the release reconcile/lowerBound logic is never exercised. Add a release test with 2+ participants reporting different positions.

2. Auto-mode participant failure. AutoModeWithParticipant only has the happy path. Missing: a participant that can't cover the batch-end token in auto mode → the checkpoint fails in onPrepareCommit → the batch rolls back and the worker aborts, nothing stored.(it's only tested in deferred mode via aParticipantThatCannotBeReconciled).

3. Multiple segments, one projection instance. The whole "retain the trigger keyed by segment" contract is untested — both ITs use initialSegmentCount(1). Add a test with 2+ segments where one projection instance gets a distinct trigger per segment and checkpoints them at independent positions. This would catch the classic bug where someone stores this.trigger = trigger and clobbers segment 0 when segment 1 is claimed.

4. Mixed-mode end-to-end (ordinary + checkpointing in one processor). Every e2e test is a single checkpointing handler (fully-deferred). There's no test of a real processor running auto mode with a mix — verifying the ordinary handler stores every batch and the checkpointing one is dragged up to batch-end. This is the configuration most likely to surprise user

Moderate

5. Auto mode with multiple participants — reconcile running through the auto path (batch-end floor + several participants at different positions). Currently auto mode is only tested with one participant.

6. Checkpointing component in a subscribing (non-streaming) processor. The Checkpointing javadoc claims the callbacks never fire and the behavior is "inert" there. No test verifies that — i.e. a Checkpointing component runs as a plain handler in a SubscribingEventProcessor with no checkpoint calls, and a CheckpointTrigger parameter in that setting fails loudly. (The resolver's "no trigger present" case is unit-tested, but not end-to-end in a real non-streaming processor.)

7. Running-path regression guard. storeIfAdvanced's "ignore a token that regresses below the stored one" is only tested on release (aReleaseTokenBehindTheLastCheckpointIsIgnored...). Add a running-path case: request a checkpoint at position 5 (stored), then request at 3 → ignored, stored token stays 5.

8. Concurrent requests merging via upperBound. "only rises" behavior isn't directly tested: requests for 5 and 8 racing in from different threads should collapse to 8; a request for 3 after 8 should be a no-op. aRequestArrivingWhileTheWorkerIsRunning covers the re-entrant scheduling race but not the merge semantics.

Lower / edge

9. Non-comparable tokens still stored. storeIfAdvanced explicitly documents that tokens "merely not comparable" (replay boundary, partial multi-source advances) are still stored — but every test uses GlobalSequenceTrackingToken (always comparable), so that branch is untested.

10. Segment split/merge with an active Checkpointing component — does split/merge fire onSegmentReleased on the old work package and onSegmentClaimed on the new ones correctly? Untested.

11. Reset/replay interaction — what happens to a Checkpointing component when the processor is reset (token rewound)? Untested; may be fine, but worth a sanity test.

12. onCheckpointAdvanced returning LATEST as its return value. resolveLatest is applied to the return of onCheckpointAdvanced (not just the trigger and onSegmentReleased), but only the trigger/release LATEST paths are tested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants