From 9526cbafbc347071a485d347095f642f77881d78 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 19 May 2026 13:16:57 +0200 Subject: [PATCH] fix(agents-server): preserve consumer_claims.lease_expires_at across heartbeats The per-wake heartbeat caller in callback-forward does not pass leaseExpiresAt to materializeHeartbeatClaim. The registry method was unconditionally writing `input.leaseExpiresAt ?? null`, so the first heartbeat (~10s after dispatch) was nulling the lease and leaving every active claim row without an expiry for the rest of its lifetime. Treat heartbeats as alive-pings only: update last_heartbeat_at and leave lease_expires_at alone unless the caller explicitly provides a new lease. The lease set by materializeActiveClaim from the upstream lease_ttl_ms stays authoritative. Adds an integration test that materializes an active claim with a lease, heartbeats without one, and verifies the lease is preserved (and a second test verifying the lease IS updated when explicitly extended). Fixes #4341. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/preserve-lease-on-heartbeat.md | 5 + packages/agents-server/src/entity-registry.ts | 8 +- .../test/consumer-claim-registry.test.ts | 97 +++++++++++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 .changeset/preserve-lease-on-heartbeat.md create mode 100644 packages/agents-server/test/consumer-claim-registry.test.ts diff --git a/.changeset/preserve-lease-on-heartbeat.md b/.changeset/preserve-lease-on-heartbeat.md new file mode 100644 index 0000000000..80e4539cf2 --- /dev/null +++ b/.changeset/preserve-lease-on-heartbeat.md @@ -0,0 +1,5 @@ +--- +'@electric-ax/agents-server': patch +--- + +Fix `materializeHeartbeatClaim` nulling out `consumer_claims.lease_expires_at` when called without a lease argument. The heartbeat path is now an alive-ping only — it updates `last_heartbeat_at` and leaves the lease (set at claim materialization time from the upstream `lease_ttl_ms`) intact. Callers that genuinely want to extend the lease can still pass `leaseExpiresAt` explicitly. diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index a0c5b23426..4eb2bede8e 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -360,11 +360,17 @@ export class PostgresRegistry { input: MaterializeHeartbeatClaimInput ): Promise { const heartbeatAt = input.heartbeatAt ?? new Date() + // Only touch leaseExpiresAt when the caller explicitly provides one. + // The lease was set at materializeActiveClaim time from the upstream + // lease_ttl_ms and remains the authoritative expiry; heartbeats are + // alive-pings, not lease extensions. await this.db .update(consumerClaims) .set({ lastHeartbeatAt: heartbeatAt, - leaseExpiresAt: input.leaseExpiresAt ?? null, + ...(input.leaseExpiresAt !== undefined + ? { leaseExpiresAt: input.leaseExpiresAt } + : {}), updatedAt: heartbeatAt, }) .where( diff --git a/packages/agents-server/test/consumer-claim-registry.test.ts b/packages/agents-server/test/consumer-claim-registry.test.ts new file mode 100644 index 0000000000..e41166dd78 --- /dev/null +++ b/packages/agents-server/test/consumer-claim-registry.test.ts @@ -0,0 +1,97 @@ +import { and, eq } from 'drizzle-orm' +import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { createDb } from '../src/db/index' +import { consumerClaims } from '../src/db/schema' +import { PostgresRegistry } from '../src/entity-registry' +import { + TEST_POSTGRES_URL, + resetElectricAgentsTestBackend, +} from './test-backend' + +describe(`PostgresRegistry consumer-claim heartbeat (regression for #4341)`, () => { + let registry: PostgresRegistry + let db: ReturnType[`db`] + let client: ReturnType[`client`] + + beforeAll(async () => { + await resetElectricAgentsTestBackend() + const connection = createDb(TEST_POSTGRES_URL) + db = connection.db + client = connection.client + registry = new PostgresRegistry(db) + }, 120_000) + + beforeEach(async () => { + await resetElectricAgentsTestBackend() + }, 120_000) + + afterAll(async () => { + await client?.end() + }, 120_000) + + async function readLease( + consumerId: string, + epoch: number + ): Promise { + const rows = await db + .select() + .from(consumerClaims) + .where( + and( + eq(consumerClaims.consumerId, consumerId), + eq(consumerClaims.epoch, epoch) + ) + ) + .limit(1) + return rows[0]?.leaseExpiresAt ?? null + } + + it(`preserves lease_expires_at when heartbeat is called without one`, async () => { + const claimedAt = new Date(`2026-05-19T10:00:00Z`) + const lease = new Date(`2026-05-19T10:00:30Z`) + await registry.materializeActiveClaim({ + consumerId: `wake-preserve`, + epoch: 1, + entityUrl: `/horton/preserve`, + streamPath: `/horton/preserve/main`, + claimedAt, + leaseExpiresAt: lease, + }) + + expect(await readLease(`wake-preserve`, 1)).toEqual(lease) + + // Heartbeat with no leaseExpiresAt — must not null the column. + await registry.materializeHeartbeatClaim({ + consumerId: `wake-preserve`, + epoch: 1, + heartbeatAt: new Date(`2026-05-19T10:00:10Z`), + }) + + expect(await readLease(`wake-preserve`, 1)).toEqual(lease) + }) + + it(`updates lease_expires_at when heartbeat explicitly provides one`, async () => { + const claimedAt = new Date(`2026-05-19T10:00:00Z`) + const initialLease = new Date(`2026-05-19T10:00:30Z`) + const extendedLease = new Date(`2026-05-19T10:01:00Z`) + await registry.materializeActiveClaim({ + consumerId: `wake-extend`, + epoch: 1, + entityUrl: `/horton/extend`, + streamPath: `/horton/extend/main`, + claimedAt, + leaseExpiresAt: initialLease, + }) + + expect(await readLease(`wake-extend`, 1)).toEqual(initialLease) + + await registry.materializeHeartbeatClaim({ + consumerId: `wake-extend`, + epoch: 1, + heartbeatAt: new Date(`2026-05-19T10:00:20Z`), + leaseExpiresAt: extendedLease, + }) + + expect(await readLease(`wake-extend`, 1)).toEqual(extendedLease) + }) +})