From 1dfd375e40705bd02f2c6517fcddfbcebdfd83c4 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Mon, 18 May 2026 15:08:13 +0200 Subject: [PATCH 1/2] fix(agents-server): release pull-wake claim row even when in-memory token is missing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The release path in callback-forward was gated by `stillOwnsClaim`, an in-memory check that fails after server restart or when a newer wake on the same stream evicts the token. When that happened, the consumer_claims row stayed at status=active indefinitely and the entity remained stuck at status=running long after `done` arrived. Decouple the concerns: - materializeReleasedClaim runs whenever epoch is defined (DB identity is sufficient to release the row). Now returns `{ claim, entityCleared }` where entityCleared is true iff our (consumerId, epoch) was the active dispatch and we just cleared it. - updateStatus(idle) and onEntityChanged fire when `entityCleared || stillOwnsClaim` — covers happy path, server restart, retry-after-failed- updateStatus, while still leaving status=running when a newer wake holds the entity's active dispatch. - clearStream remains gated by stillOwnsClaim so we never clear another consumer's token from under it. Regression tests in test/webhook-forward-routing.test.ts cover the three failure modes (lost token, evicted token, retry). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../release-pull-wake-claim-after-dispatch.md | 5 + packages/agents-server/src/entity-registry.ts | 13 +- .../src/routing/internal-router.ts | 46 +++-- .../test/webhook-forward-routing.test.ts | 195 ++++++++++++++++++ 4 files changed, 243 insertions(+), 16 deletions(-) create mode 100644 .changeset/release-pull-wake-claim-after-dispatch.md diff --git a/.changeset/release-pull-wake-claim-after-dispatch.md b/.changeset/release-pull-wake-claim-after-dispatch.md new file mode 100644 index 0000000000..47e221bdae --- /dev/null +++ b/.changeset/release-pull-wake-claim-after-dispatch.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-server': patch +--- + +Fix pull-wake claims leaking in `consumer_claims` after dispatch. The release path in `callback-forward` was gated entirely on the in-memory write-token state, so any condition that lost or evicted the token (server restart, a newer wake on the same stream) would prevent `materializeReleasedClaim` from running and leave the DB row pinned at `status='active'`. The fix decouples the durable-row release (keyed by `consumerId + epoch`) from in-memory token cleanup, and uses `entityCleared || stillOwnsClaim` to gate the entity status transition back to `idle`. Includes regression tests in `test/webhook-forward-routing.test.ts`. diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index a0c5b23426..3b84ce89d0 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -378,7 +378,7 @@ export class PostgresRegistry { async materializeReleasedClaim( input: MaterializeReleasedClaimInput - ): Promise { + ): Promise<{ claim: ConsumerClaim | null; entityCleared: boolean }> { const releasedAt = input.releasedAt ?? new Date() const rows = await this.db .update(consumerClaims) @@ -398,8 +398,13 @@ export class PostgresRegistry { .returning() const claim = rows[0] ? this.rowToConsumerClaim(rows[0]) : null + let entityCleared = false if (claim) { - await this.db + // entityCleared distinguishes "we were the active dispatch and now it's + // empty" from "a newer claim was already active for this entity." The + // WHERE clause matches our (consumerId, epoch) so an evicted-by-newer + // case correctly returns zero rows. + const cleared = await this.db .update(entityDispatchState) .set({ activeConsumerId: null, @@ -419,8 +424,10 @@ export class PostgresRegistry { eq(entityDispatchState.activeEpoch, input.epoch) ) ) + .returning({ entityUrl: entityDispatchState.entityUrl }) + entityCleared = cleared.length > 0 } - return claim + return { claim, entityCleared } } async getActiveClaimsForRunner( diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index caa5067ae3..6f5f4c0350 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -624,8 +624,15 @@ async function callbackForward( const entity = await ctx.entityManager.registry.getEntityByStream( target.primaryStream ) - if (entity && stillOwnsClaim) { - if (epoch !== undefined) { + + // Release the consumer_claims row by its DB identity (consumerId, + // epoch). The in-memory write token is a separate concern (write + // authorization during the run); release of the durable row must + // succeed even if the token was lost (server restart) or evicted + // (a later wake re-minted for the same stream). + let entityCleared = false + if (epoch !== undefined) { + const result = await ctx.entityManager.registry.materializeReleasedClaim?.({ consumerId, epoch, @@ -643,28 +650,41 @@ async function callbackForward( }) : undefined, }) - } + entityCleared = result?.entityCleared ?? false + } + + // Transition entity back to idle when either signal says it's safe: + // - entityCleared: our release just cleared the entity's active + // dispatch state, so no in-flight wake remains. + // - stillOwnsClaim: this consumer is still the in-memory write-token + // owner, so no newer wake has displaced it. Covers two cases: + // (a) retry of a failed done (first attempt cleared the DB state + // but failed to update status), (b) server restart scenarios where + // the token is intact even though entityDispatchState may diverge. + // If both are false, a newer wake owns the entity — leave status as-is. + if (entity && (entityCleared || stillOwnsClaim)) { await ctx.entityManager.registry.updateStatus(entity.url, `idle`) - ctx.runtime.claimWriteTokens.clearStream( - ctx.service, - target.primaryStream - ) await ctx.entityBridgeManager.onEntityChanged(entity.url) serverLog.info( `[callback-forward] status updated to idle for ${entity.url}` ) - } else if (stillOwnsClaim) { + } else if (!entity) { + serverLog.warn( + `[callback-forward] done received but no entity found for stream=${target.primaryStream}` + ) + } + + // Clear the in-memory write token only if this consumer still owns it. + // If a newer wake has taken over, that newer wake owns the token now + // and we must not clear it out from under it. + if (stillOwnsClaim) { ctx.runtime.claimWriteTokens.clearStream( ctx.service, target.primaryStream ) } else if (entity) { serverLog.info( - `[callback-forward] done ignored for stale claim stream=${target.primaryStream} consumer=${consumerId}` - ) - } else { - serverLog.warn( - `[callback-forward] done received but no entity found for stream=${target.primaryStream}` + `[callback-forward] done arrived after in-memory token evicted (stream=${target.primaryStream} consumer=${consumerId})` ) } } else if (requestBody?.done === true) { diff --git a/packages/agents-server/test/webhook-forward-routing.test.ts b/packages/agents-server/test/webhook-forward-routing.test.ts index 409021d32e..e60d28bd40 100644 --- a/packages/agents-server/test/webhook-forward-routing.test.ts +++ b/packages/agents-server/test/webhook-forward-routing.test.ts @@ -100,6 +100,11 @@ function buildContext(overrides: Partial = {}): TenantContext { registry: { getEntityByStream: vi.fn().mockResolvedValue(entity), updateStatus: vi.fn().mockResolvedValue(undefined), + materializeReleasedClaim: vi.fn().mockResolvedValue({ + claim: null, + entityCleared: true, + }), + materializeHeartbeatClaim: vi.fn().mockResolvedValue(undefined), }, enrichPayload: vi.fn(async (payload: Record) => ({ ...payload, @@ -580,4 +585,194 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { fetchSpy.mockRestore() } }) + + describe(`claim release on done callback (regression for #4340)`, () => { + const upstreamOk = (): void => { + vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ ok: true, next_wake: false }), { + headers: { 'content-type': `application/json` }, + }) + ) + } + + it(`marks the consumer_claims row released and the entity idle on done`, async () => { + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + // Simulate the runtime's claim phase having minted a token. + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-1` + ) + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect(response.status).toBe(200) + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + ackedStreams: [{ path: `/horton/demo/main`, offset: `1` }], + }) + ) + expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith( + `/horton/demo`, + `idle` + ) + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + + it(`still releases the consumer_claims row when the in-memory write token is missing (e.g. server restart)`, async () => { + // Reproduces the production failure mode where the in-memory + // ClaimWriteTokenStore is empty (server restarted between dispatch and + // done, or another wake evicted the token) but the consumer_claims row + // is still active in the DB. Today the release path skips + // materializeReleasedClaim under this condition and the row leaks. + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + // Intentionally do NOT mint a write token — simulates the token + // being lost between the runtime's claim phase and its done call. + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect(response.status).toBe(200) + // The DB row identity (consumerId, epoch) is sufficient to release + // the claim — release must not depend on in-memory write-token state. + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + }) + ) + // The entity should transition back to idle so the UI no longer + // shows it as "running" after the agent finishes. + expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith( + `/horton/demo`, + `idle` + ) + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + + it(`releases an earlier wake's claim even after a later wake evicted its in-memory token`, async () => { + // Same defect, different cause: two wakes for the same entity arrive + // close together. The second's mint evicts the first's token. The + // first wake's done call then finds stillOwnsClaim=false and skips + // the release. + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + // Simulate the DB state where consumer-old's row is still active but + // entityDispatchState's active claim is consumer-new (the later wake). + ;( + ctx.entityManager.registry.materializeReleasedClaim as any + ).mockResolvedValue({ claim: null, entityCleared: false }) + + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-1` + ) + // A second wake arrives and re-mints for the same stream, evicting + // wake-1 from the in-memory store. + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-2` + ) + + try { + await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + // The DB row for wake-1 must still be released. + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + }) + ) + // But the entity status must NOT be set to idle — wake-2 is still + // in flight and owns the entity's active dispatch. + expect(ctx.entityManager.registry.updateStatus).not.toHaveBeenCalled() + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + }) }) From 8b288ee1f8bca524c3687b7dd100953ad219f5fc Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 20 May 2026 10:17:44 +0200 Subject: [PATCH 2/2] test(agents-server): reframe done-release tests around expected behavior MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename the describe block and the three tests so they read as positive behavioral contracts ("releases ... and marks the entity idle when ...") rather than as bug-reproduction narratives. Drop comments that described the production failure mode now that the names carry the intent. Also assert in the newer-wake-takes-over case that wake-2's in-memory write token is preserved by wake-1's done — protects against future regressions that might clear the wrong consumer's token from the shared map. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../test/webhook-forward-routing.test.ts | 42 +++++++------------ 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/packages/agents-server/test/webhook-forward-routing.test.ts b/packages/agents-server/test/webhook-forward-routing.test.ts index e60d28bd40..17f46b156d 100644 --- a/packages/agents-server/test/webhook-forward-routing.test.ts +++ b/packages/agents-server/test/webhook-forward-routing.test.ts @@ -586,7 +586,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { } }) - describe(`claim release on done callback (regression for #4340)`, () => { + describe(`callback-forward done releases the durable claim independently of the in-memory write token`, () => { const upstreamOk = (): void => { vi.spyOn(globalThis, `fetch`).mockResolvedValue( new Response(JSON.stringify({ ok: true, next_wake: false }), { @@ -595,7 +595,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { ) } - it(`marks the consumer_claims row released and the entity idle on done`, async () => { + it(`releases the consumer_claims row and marks the entity idle when the consumer holds the in-memory write token`, async () => { const select = selectDb([ { callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, @@ -606,7 +606,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { const ctx = buildContext({ pgDb: { select: select.select } as any, }) - // Simulate the runtime's claim phase having minted a token. ctx.runtime.claimWriteTokens.mint( `tenant-a`, `/horton/demo/main`, @@ -649,12 +648,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { } }) - it(`still releases the consumer_claims row when the in-memory write token is missing (e.g. server restart)`, async () => { - // Reproduces the production failure mode where the in-memory - // ClaimWriteTokenStore is empty (server restarted between dispatch and - // done, or another wake evicted the token) but the consumer_claims row - // is still active in the DB. Today the release path skips - // materializeReleasedClaim under this condition and the row leaks. + it(`releases the consumer_claims row and marks the entity idle when no in-memory write token is present for the consumer`, async () => { const select = selectDb([ { callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, @@ -665,8 +659,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { const ctx = buildContext({ pgDb: { select: select.select } as any, }) - // Intentionally do NOT mint a write token — simulates the token - // being lost between the runtime's claim phase and its done call. try { const response = await globalRouter.fetch( @@ -686,8 +678,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { ) expect(response.status).toBe(200) - // The DB row identity (consumerId, epoch) is sufficient to release - // the claim — release must not depend on in-memory write-token state. expect( ctx.entityManager.registry.materializeReleasedClaim ).toHaveBeenCalledWith( @@ -696,8 +686,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { epoch: 7, }) ) - // The entity should transition back to idle so the UI no longer - // shows it as "running" after the agent finishes. expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith( `/horton/demo`, `idle` @@ -707,11 +695,7 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { } }) - it(`releases an earlier wake's claim even after a later wake evicted its in-memory token`, async () => { - // Same defect, different cause: two wakes for the same entity arrive - // close together. The second's mint evicts the first's token. The - // first wake's done call then finds stillOwnsClaim=false and skips - // the release. + it(`releases the consumer_claims row for the old consumer when a newer consumer has taken over the in-memory token, without disturbing the newer consumer's token or the entity status`, async () => { const select = selectDb([ { callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, @@ -722,8 +706,8 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { const ctx = buildContext({ pgDb: { select: select.select } as any, }) - // Simulate the DB state where consumer-old's row is still active but - // entityDispatchState's active claim is consumer-new (the later wake). + // The newer consumer is the active dispatch in the DB, so releasing + // the older consumer's row must not clear the entity's dispatch state. ;( ctx.entityManager.registry.materializeReleasedClaim as any ).mockResolvedValue({ claim: null, entityCleared: false }) @@ -733,8 +717,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { `/horton/demo/main`, `wake-1` ) - // A second wake arrives and re-mints for the same stream, evicting - // wake-1 from the in-memory store. ctx.runtime.claimWriteTokens.mint( `tenant-a`, `/horton/demo/main`, @@ -758,7 +740,6 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { ctx ) - // The DB row for wake-1 must still be released. expect( ctx.entityManager.registry.materializeReleasedClaim ).toHaveBeenCalledWith( @@ -767,9 +748,16 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { epoch: 7, }) ) - // But the entity status must NOT be set to idle — wake-2 is still - // in flight and owns the entity's active dispatch. + // The newer consumer owns the entity now — its status must stay + // as-is and its in-memory write token must remain intact. expect(ctx.entityManager.registry.updateStatus).not.toHaveBeenCalled() + expect( + ctx.runtime.claimWriteTokens.owns( + `tenant-a`, + `/horton/demo/main`, + `wake-2` + ) + ).toBe(true) } finally { vi.mocked(globalThis.fetch).mockRestore() }