diff --git a/.changeset/release-pull-wake-claim-after-dispatch.md b/.changeset/release-pull-wake-claim-after-dispatch.md new file mode 100644 index 0000000000..47e221bdae --- /dev/null +++ b/.changeset/release-pull-wake-claim-after-dispatch.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-server': patch +--- + +Fix pull-wake claims leaking in `consumer_claims` after dispatch. The release path in `callback-forward` was gated entirely on the in-memory write-token state, so any condition that lost or evicted the token (server restart, a newer wake on the same stream) would prevent `materializeReleasedClaim` from running and leave the DB row pinned at `status='active'`. The fix decouples the durable-row release (keyed by `consumerId + epoch`) from in-memory token cleanup, and uses `entityCleared || stillOwnsClaim` to gate the entity status transition back to `idle`. Includes regression tests in `test/webhook-forward-routing.test.ts`. diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index a0c5b23426..3b84ce89d0 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -378,7 +378,7 @@ export class PostgresRegistry { async materializeReleasedClaim( input: MaterializeReleasedClaimInput - ): Promise { + ): Promise<{ claim: ConsumerClaim | null; entityCleared: boolean }> { const releasedAt = input.releasedAt ?? new Date() const rows = await this.db .update(consumerClaims) @@ -398,8 +398,13 @@ export class PostgresRegistry { .returning() const claim = rows[0] ? this.rowToConsumerClaim(rows[0]) : null + let entityCleared = false if (claim) { - await this.db + // entityCleared distinguishes "we were the active dispatch and now it's + // empty" from "a newer claim was already active for this entity." The + // WHERE clause matches our (consumerId, epoch) so an evicted-by-newer + // case correctly returns zero rows. + const cleared = await this.db .update(entityDispatchState) .set({ activeConsumerId: null, @@ -419,8 +424,10 @@ export class PostgresRegistry { eq(entityDispatchState.activeEpoch, input.epoch) ) ) + .returning({ entityUrl: entityDispatchState.entityUrl }) + entityCleared = cleared.length > 0 } - return claim + return { claim, entityCleared } } async getActiveClaimsForRunner( diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index caa5067ae3..6f5f4c0350 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -624,8 +624,15 @@ async function callbackForward( const entity = await ctx.entityManager.registry.getEntityByStream( target.primaryStream ) - if (entity && stillOwnsClaim) { - if (epoch !== undefined) { + + // Release the consumer_claims row by its DB identity (consumerId, + // epoch). The in-memory write token is a separate concern (write + // authorization during the run); release of the durable row must + // succeed even if the token was lost (server restart) or evicted + // (a later wake re-minted for the same stream). + let entityCleared = false + if (epoch !== undefined) { + const result = await ctx.entityManager.registry.materializeReleasedClaim?.({ consumerId, epoch, @@ -643,28 +650,41 @@ async function callbackForward( }) : undefined, }) - } + entityCleared = result?.entityCleared ?? false + } + + // Transition entity back to idle when either signal says it's safe: + // - entityCleared: our release just cleared the entity's active + // dispatch state, so no in-flight wake remains. + // - stillOwnsClaim: this consumer is still the in-memory write-token + // owner, so no newer wake has displaced it. Covers two cases: + // (a) retry of a failed done (first attempt cleared the DB state + // but failed to update status), (b) server restart scenarios where + // the token is intact even though entityDispatchState may diverge. + // If both are false, a newer wake owns the entity — leave status as-is. + if (entity && (entityCleared || stillOwnsClaim)) { await ctx.entityManager.registry.updateStatus(entity.url, `idle`) - ctx.runtime.claimWriteTokens.clearStream( - ctx.service, - target.primaryStream - ) await ctx.entityBridgeManager.onEntityChanged(entity.url) serverLog.info( `[callback-forward] status updated to idle for ${entity.url}` ) - } else if (stillOwnsClaim) { + } else if (!entity) { + serverLog.warn( + `[callback-forward] done received but no entity found for stream=${target.primaryStream}` + ) + } + + // Clear the in-memory write token only if this consumer still owns it. + // If a newer wake has taken over, that newer wake owns the token now + // and we must not clear it out from under it. + if (stillOwnsClaim) { ctx.runtime.claimWriteTokens.clearStream( ctx.service, target.primaryStream ) } else if (entity) { serverLog.info( - `[callback-forward] done ignored for stale claim stream=${target.primaryStream} consumer=${consumerId}` - ) - } else { - serverLog.warn( - `[callback-forward] done received but no entity found for stream=${target.primaryStream}` + `[callback-forward] done arrived after in-memory token evicted (stream=${target.primaryStream} consumer=${consumerId})` ) } } else if (requestBody?.done === true) { diff --git a/packages/agents-server/test/webhook-forward-routing.test.ts b/packages/agents-server/test/webhook-forward-routing.test.ts index 409021d32e..17f46b156d 100644 --- a/packages/agents-server/test/webhook-forward-routing.test.ts +++ b/packages/agents-server/test/webhook-forward-routing.test.ts @@ -100,6 +100,11 @@ function buildContext(overrides: Partial = {}): TenantContext { registry: { getEntityByStream: vi.fn().mockResolvedValue(entity), updateStatus: vi.fn().mockResolvedValue(undefined), + materializeReleasedClaim: vi.fn().mockResolvedValue({ + claim: null, + entityCleared: true, + }), + materializeHeartbeatClaim: vi.fn().mockResolvedValue(undefined), }, enrichPayload: vi.fn(async (payload: Record) => ({ ...payload, @@ -580,4 +585,182 @@ describe(`webhook forwarding for Durable Streams subscriptions`, () => { fetchSpy.mockRestore() } }) + + describe(`callback-forward done releases the durable claim independently of the in-memory write token`, () => { + const upstreamOk = (): void => { + vi.spyOn(globalThis, `fetch`).mockResolvedValue( + new Response(JSON.stringify({ ok: true, next_wake: false }), { + headers: { 'content-type': `application/json` }, + }) + ) + } + + it(`releases the consumer_claims row and marks the entity idle when the consumer holds the in-memory write token`, async () => { + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-1` + ) + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect(response.status).toBe(200) + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + ackedStreams: [{ path: `/horton/demo/main`, offset: `1` }], + }) + ) + expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith( + `/horton/demo`, + `idle` + ) + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + + it(`releases the consumer_claims row and marks the entity idle when no in-memory write token is present for the consumer`, async () => { + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + + try { + const response = await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect(response.status).toBe(200) + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + }) + ) + expect(ctx.entityManager.registry.updateStatus).toHaveBeenCalledWith( + `/horton/demo`, + `idle` + ) + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + + it(`releases the consumer_claims row for the old consumer when a newer consumer has taken over the in-memory token, without disturbing the newer consumer's token or the entity status`, async () => { + const select = selectDb([ + { + callbackUrl: `http://durable.local/v1/stream-meta/subscriptions/horton-handler/callback?service=tenant-a`, + primaryStream: `/horton/demo/main`, + }, + ]) + upstreamOk() + const ctx = buildContext({ + pgDb: { select: select.select } as any, + }) + // The newer consumer is the active dispatch in the DB, so releasing + // the older consumer's row must not clear the entity's dispatch state. + ;( + ctx.entityManager.registry.materializeReleasedClaim as any + ).mockResolvedValue({ claim: null, entityCleared: false }) + + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-1` + ) + ctx.runtime.claimWriteTokens.mint( + `tenant-a`, + `/horton/demo/main`, + `wake-2` + ) + + try { + await globalRouter.fetch( + new Request(`http://agents.local/_electric/callback-forward/wake-1`, { + method: `POST`, + headers: { + 'content-type': `application/json`, + authorization: `Bearer callback-token`, + }, + body: JSON.stringify({ + epoch: 7, + acks: [{ path: `/horton/demo/main`, offset: `1` }], + done: true, + }), + }), + ctx + ) + + expect( + ctx.entityManager.registry.materializeReleasedClaim + ).toHaveBeenCalledWith( + expect.objectContaining({ + consumerId: `wake-1`, + epoch: 7, + }) + ) + // The newer consumer owns the entity now — its status must stay + // as-is and its in-memory write token must remain intact. + expect(ctx.entityManager.registry.updateStatus).not.toHaveBeenCalled() + expect( + ctx.runtime.claimWriteTokens.owns( + `tenant-a`, + `/horton/demo/main`, + `wake-2` + ) + ).toBe(true) + } finally { + vi.mocked(globalThis.fetch).mockRestore() + } + }) + }) })