Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/preserve-lease-on-heartbeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-ax/agents-server': patch
---

Fix `materializeHeartbeatClaim` nulling out `consumer_claims.lease_expires_at` when called without a lease argument. The heartbeat path is now an alive-ping only — it updates `last_heartbeat_at` and leaves the lease (set at claim materialization time from the upstream `lease_ttl_ms`) intact. Callers that genuinely want to extend the lease can still pass `leaseExpiresAt` explicitly.
8 changes: 7 additions & 1 deletion packages/agents-server/src/entity-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,17 @@ export class PostgresRegistry {
input: MaterializeHeartbeatClaimInput
): Promise<void> {
const heartbeatAt = input.heartbeatAt ?? new Date()
// Only touch leaseExpiresAt when the caller explicitly provides one.
// The lease was set at materializeActiveClaim time from the upstream
// lease_ttl_ms and remains the authoritative expiry; heartbeats are
// alive-pings, not lease extensions.
await this.db
.update(consumerClaims)
.set({
lastHeartbeatAt: heartbeatAt,
leaseExpiresAt: input.leaseExpiresAt ?? null,
...(input.leaseExpiresAt !== undefined
? { leaseExpiresAt: input.leaseExpiresAt }
: {}),
updatedAt: heartbeatAt,
})
.where(
Expand Down
97 changes: 97 additions & 0 deletions packages/agents-server/test/consumer-claim-registry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { and, eq } from 'drizzle-orm'
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'
import { createDb } from '../src/db/index'
import { consumerClaims } from '../src/db/schema'
import { PostgresRegistry } from '../src/entity-registry'
import {
TEST_POSTGRES_URL,
resetElectricAgentsTestBackend,
} from './test-backend'

describe(`PostgresRegistry consumer-claim heartbeat (regression for #4341)`, () => {
let registry: PostgresRegistry
let db: ReturnType<typeof createDb>[`db`]
let client: ReturnType<typeof createDb>[`client`]

beforeAll(async () => {
await resetElectricAgentsTestBackend()
const connection = createDb(TEST_POSTGRES_URL)
db = connection.db
client = connection.client
registry = new PostgresRegistry(db)
}, 120_000)

beforeEach(async () => {
await resetElectricAgentsTestBackend()
}, 120_000)

afterAll(async () => {
await client?.end()
}, 120_000)

async function readLease(
consumerId: string,
epoch: number
): Promise<Date | null> {
const rows = await db
.select()
.from(consumerClaims)
.where(
and(
eq(consumerClaims.consumerId, consumerId),
eq(consumerClaims.epoch, epoch)
)
)
.limit(1)
return rows[0]?.leaseExpiresAt ?? null
}

it(`preserves lease_expires_at when heartbeat is called without one`, async () => {
const claimedAt = new Date(`2026-05-19T10:00:00Z`)
const lease = new Date(`2026-05-19T10:00:30Z`)
await registry.materializeActiveClaim({
consumerId: `wake-preserve`,
epoch: 1,
entityUrl: `/horton/preserve`,
streamPath: `/horton/preserve/main`,
claimedAt,
leaseExpiresAt: lease,
})

expect(await readLease(`wake-preserve`, 1)).toEqual(lease)

// Heartbeat with no leaseExpiresAt — must not null the column.
await registry.materializeHeartbeatClaim({
consumerId: `wake-preserve`,
epoch: 1,
heartbeatAt: new Date(`2026-05-19T10:00:10Z`),
})

expect(await readLease(`wake-preserve`, 1)).toEqual(lease)
})

it(`updates lease_expires_at when heartbeat explicitly provides one`, async () => {
const claimedAt = new Date(`2026-05-19T10:00:00Z`)
const initialLease = new Date(`2026-05-19T10:00:30Z`)
const extendedLease = new Date(`2026-05-19T10:01:00Z`)
await registry.materializeActiveClaim({
consumerId: `wake-extend`,
epoch: 1,
entityUrl: `/horton/extend`,
streamPath: `/horton/extend/main`,
claimedAt,
leaseExpiresAt: initialLease,
})

expect(await readLease(`wake-extend`, 1)).toEqual(initialLease)

await registry.materializeHeartbeatClaim({
consumerId: `wake-extend`,
epoch: 1,
heartbeatAt: new Date(`2026-05-19T10:00:20Z`),
leaseExpiresAt: extendedLease,
})

expect(await readLease(`wake-extend`, 1)).toEqual(extendedLease)
})
})
Loading