[#4632] fix(messaging): token store intit loses spring transaction pooled streaming processor #4645
Conversation
|
| }); | ||
| // The initial TrackingToken can resolve on a foreign thread: the Axon Server connector completes its future on | ||
| // a gRPC callback thread. Persisting on that thread fails, because the unit of work's transaction is bound to | ||
| // the thread that began it (issue #4632). So we resolve the token first, then dispatch the persist to this |
There was a problem hiding this comment.
Do we want to reference issues in the comments? I believe we can skip it, because the commit should reference the issue that it belongs to.
There was a problem hiding this comment.
I would remove it entriely and add to the commit something like that:
The initial TrackingToken can resolve on a foreign thread: the Axon Server connector completes its future on a gRPC callback thread (issue #4632). A unit of work's transaction is bound to the thread that began it, so the persist may not run as a continuation inside a unit of work started on another thread. Therefore, the token is resolved first, and the persist runs in its own unit of work, dispatched to this coordinator's executor so the blocking database work never occupies the connector's callback thread.
| // then initialization completes successfully and the persist observed a bound transaction. The precise | ||
| // correctness invariant is that initializeTokenSegments runs on a thread that owns the transaction (which it | ||
| // began); any fix satisfying that passes, regardless of which thread that turns out to be. | ||
| assertThatCode(() -> started.orTimeout(5, TimeUnit.SECONDS).join()) |
There was a problem hiding this comment.
The comment above this assertion says the test is implementation-agnostic ("any fix satisfying that passes, regardless of which thread that turns out to be"), but the final assertion then pins the persist to COORDINATOR_THREAD_NAME — so the test contradicts its own stated contract.
The behavioral invariants we actually care about are:
initializeTokenSegmentsruns on a thread that owns the transaction (covered by thetransactionBoundDuringInitializeassertion), and- it never runs on the connector's callback thread.
Pinning the exact executor means a future valid refactoring (e.g., a dedicated init executor, or a structural fix inside UnitOfWork that re-dispatches continuations) fails this test for the wrong reason. Suggest asserting the negative instead:
assertThat(initializeThread.get())
.as("initializeTokenSegments must never run on the connector thread that completed the initial-token future")
.isNotEqualTo(GRPC_THREAD_NAME);Alternatively, if locking the dispatch to the coordinator's executor is intentional (to also guard against blocking gRPC callback threads with DB I/O), keep the strict assertion but drop the "regardless of which thread" sentence from the comment so the test and its documentation agree.
| } | ||
| logger.info("Processor [{}]. Initializing ({}) segments", | ||
| name, initialSegmentCount); | ||
| return initialToken.apply(eventSource) |
There was a problem hiding this comment.
One semantic change here worth calling out explicitly: initialToken.apply(eventSource) now runs outside any UnitOfWork. Previously it executed inside the invocation phase of the (single) unit of work, so it ran with an active transaction bound to the invoking thread.
The function signature (Function<TrackingTokenSource, CompletableFuture<TrackingToken>>) never exposed a ProcessingContext, so running inside a transaction was arguably never part of the contract. But a custom initialToken that implicitly relied on the thread-bound transaction — e.g., one resolving a token by querying a JPA-backed event store through a Spring shared EntityManager — would have silently worked before and will now fail or open its own resources. I hope everything will work since we use the TransactionalProviders and we didn't make any mistake regarding that before.
I think this is the right trade-off (it's exactly what makes the fix work: no foreign-thread future may resolve inside a started unit of work), and the default token functions only call firstToken/latestToken/tokenAt which manage their own resources. Just confirming: are we aware of and OK with this behavior change for custom initialToken implementations? If so, it may deserve a sentence in the PR description / release notes so it's a documented decision rather than an accident.



Fix: PooledStreamingEventProcessor JPA token-store init loses Spring transaction on Axon Server (#4632)
The problem
On a fresh database, a
PooledStreamingEventProcessorusing Axon Server (DCB) for events + a JPATokenStorefails to start. Token-store initialization throwsTransactionRequiredException, retries 30×, and the application context fails to start.Root cause
Coordinator.initializeTokenStore()chained the JPA persist directly onto the Axon Server token future:The Axon Server connector resolves the initial token over gRPC and completes that future on its own gRPC callback thread. So the
.thenComposecontinuation, the JPA persist, runs on the connector thread. But the unit of work began its Spring transaction on a different thread, and Spring transactions are bound to the thread that started them. On the connector thread there is no bound transaction =>TransactionRequiredException.Options considered
joinAndUnwrap) before persisting => simplest, but Replace usages ofFutureUtils#joinAndUnwrapinPooledStreamingEventProcessor#3773 explicitly removed blocking from the PSEP, so this is off the table.thenCompose=> correct, but the whole transaction would then run on the connector's gRPC thread (DB work on a thread the connector owns and sizes for its own callbacks, risks starving it).Chosen solution
Option 3. We check segments in their own unit of work, and only when initialization is needed, resolve the token and dispatch the persist to the coordinator's own executor in a separate unit of work:
Why
thenComposeAsync(..., executorService)and notthenCompose? A plainthenComposeruns on whatever thread completed the upstream future, i.e. the connector's gRPC thread, so the new unit of work's begin/persist/commit would all execute there.thenComposeAsyncwith the coordinator's own executor moves that work onto a thread we control, where begin, persist, and commit stay co-located.