Commit 22e3faf

coji and claude authored
feat: add purgeRuns API and retainRuns auto-cleanup option (#109)
* feat: add purgeRuns API and retainRuns auto-cleanup option

- `durably.purgeRuns({ olderThan, limit })` for manual batch deletion of terminal runs (completed, failed, cancelled) with cascading cleanup of steps, logs, and labels
- `retainRuns: '30d'` option on createDurably for automatic periodic purge (runs once per 60s during worker polling, batch size 100)
- Migration v2: adds (status, completed_at) index for efficient purge queries
- Duration parser supports 'd' (days), 'h' (hours), 'm' (minutes)
- 11 new tests covering all purge scenarios

Closes #88

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: simplify purge implementation per review

- Extract cascadeDeleteRuns() helper to eliminate duplication between deleteRun() and purgeRuns()
- Extract TERMINAL_STATUSES constant to module scope
- Move PURGE_INTERVAL_MS to module scope (was recreated per processOne call)
- Make auto-purge fire-and-forget (void) so it doesn't block job claiming or lease renewal in the worker polling loop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: consolidate purge index into migration v1

No backward compatibility needed — merge the (status, completed_at) index into the single v1 migration instead of keeping a separate v2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address simplify review findings

- Move TERMINAL_STATUSES after RunStatus type definition (ordering)
- Replace void fire-and-forget with .catch() to prevent unhandled rejection
- Use single Date.now() variable in purge block instead of calling 3 times
- Add comment documenting intentional immediate purge on first cycle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* perf: remove redundant getRuns({ status: 'leased' }) from processOne hot path

The claimNext SQL already contains an activeLeaseGuard subquery that checks NOT EXISTS for active leases with the same concurrency key. The JS-side getRuns → filter → excludeConcurrencyKeys flow was redundant and added a full table scan + JOIN + JSON parse per poll cycle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add purgeRuns and retainRuns to website API reference

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: fix purgeRuns signature — olderThan is required Date, not optional string

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address CodeRabbit review findings

- Move auto-purge after claimNext so it never serializes with job claiming
- Add positive auto-purge test with backdated completed_at
- Clarify retainRuns runs only during worker polling in docs
- Fix purgeRuns(options?) → purgeRuns(options) in API quick reference
- Clarify olderThan matches completedAt, not just "completed" status

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 459962d commit 22e3faf

12 files changed

Lines changed: 476 additions & 99 deletions

CLAUDE.md

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ Five tables: `durably_runs`, `durably_run_labels`, `durably_steps`, `durably_log
 - `leaseRenewIntervalMs`: 5000ms
 - `leaseMs`: 30000ms (lease duration; expired leases are reclaimed)
 - `preserveSteps`: false (deletes step output data when runs reach terminal state)
+- `retainRuns`: undefined (no automatic cleanup; set e.g. `'30d'` to auto-delete terminal runs)
 
 ## Browser Constraints (by design)
 
packages/durably/docs/llms.md

Lines changed: 16 additions & 0 deletions
@@ -36,6 +36,7 @@ const durably = createDurably({
   leaseRenewIntervalMs: 5000, // Lease renewal interval (ms)
   leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed
   preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup)
+  retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units)
   // Optional: type-safe labels with Zod schema
   // labels: z.object({ organizationId: z.string(), env: z.string() }),
   jobs: {
@@ -229,6 +230,21 @@ await durably.cancel(runId)
 await durably.deleteRun(runId)
 ```
 
+### Purge Old Runs
+
+Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date.
+Pending and leased runs are never deleted.
+
+```ts
+// Delete terminal runs older than 30 days
+const deleted = await durably.purgeRuns({
+  olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
+  limit: 500, // optional batch size (default: 1000)
+})
+```
+
+For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100.
+
 ## Events
 
 Subscribe to job execution events:
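
Because a single `purgeRuns` call deletes at most `limit` runs, a large backlog needs several calls. The following is a minimal sketch of a drain loop, assuming only the `purgeRuns({ olderThan, limit })` signature documented in the hunk above. `PurgeClient`, `purgeAll`, and `fakeClient` are hypothetical names introduced for illustration, and the fake stands in for a real durably instance.

```typescript
// Hypothetical minimal shape; only the purgeRuns signature comes from the commit.
interface PurgeClient {
  purgeRuns(options: { olderThan: Date; limit?: number }): Promise<number>
}

// Call purgeRuns repeatedly until a batch comes back smaller than the limit.
async function purgeAll(
  client: PurgeClient,
  olderThan: Date,
  batchSize = 500,
): Promise<number> {
  let total = 0
  for (;;) {
    const deleted = await client.purgeRuns({ olderThan, limit: batchSize })
    total += deleted
    if (deleted < batchSize) return total
  }
}

// In-memory stand-in for a durably instance (illustration only).
function fakeClient(backlog: number) {
  const state = { remaining: backlog, calls: 0 }
  const client: PurgeClient = {
    async purgeRuns({ limit = 1000 }) {
      state.calls++
      const n = Math.min(limit, state.remaining)
      state.remaining -= n
      return n
    },
  }
  return { client, state }
}
```

Stopping when a batch is smaller than the limit avoids one extra empty query in the common case; when the backlog is an exact multiple of the batch size, the loop issues one final call that returns 0.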

packages/durably/src/durably.ts

Lines changed: 62 additions & 13 deletions
@@ -71,6 +71,12 @@ export interface DurablyOptions<
    * ```
    */
   jobs?: TJobs
+  /**
+   * Auto-delete terminal runs older than the specified duration.
+   * Only runs in terminal states (completed, failed, cancelled) are purged.
+   * @example '30d' (30 days), '24h' (24 hours), '60m' (60 minutes)
+   */
+  retainRuns?: string
 }
 
 /**
@@ -83,6 +89,25 @@ const DEFAULTS = {
   preserveSteps: false,
 } as const
 
+function parseDuration(value: string): number {
+  const match = value.match(/^(\d+)(d|h|m)$/)
+  if (!match) {
+    throw new Error(
+      `Invalid duration format: "${value}". Use e.g. '30d', '24h', '60m'`,
+    )
+  }
+  const num = Number.parseInt(match[1], 10)
+  const unit = match[2]
+  const multipliers: Record<string, number> = {
+    d: 86400000,
+    h: 3600000,
+    m: 60000,
+  }
+  return num * multipliers[unit]
+}
+
+const PURGE_INTERVAL_MS = 60_000
+
 const ulid = monotonicFactory()
 const BROWSER_SINGLETON_REGISTRY_KEY = '__durablyBrowserSingletonRegistry'
 const BROWSER_LOCAL_DIALECT_KEY = '__durablyBrowserLocalKey'
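
To make the unit arithmetic in `parseDuration` concrete, here is a standalone copy of the parser with a few worked values. `parseDurationSketch` and `isValidDuration` are names chosen for this illustration only; the function in the diff is module-private and is not shown as exported.

```typescript
// Standalone copy of the duration parser from the hunk above, for illustration.
function parseDurationSketch(value: string): number {
  const match = value.match(/^(\d+)(d|h|m)$/)
  if (!match) {
    throw new Error(`Invalid duration format: "${value}"`)
  }
  const multipliers: Record<string, number> = {
    d: 86_400_000, // 24 * 60 * 60 * 1000
    h: 3_600_000, // 60 * 60 * 1000
    m: 60_000, // 60 * 1000
  }
  return Number.parseInt(match[1], 10) * multipliers[match[2]]
}

const thirtyDays = parseDurationSketch('30d') // 2_592_000_000 ms
const oneDay = parseDurationSketch('24h') // 86_400_000 ms
const oneHour = parseDurationSketch('60m') // 3_600_000 ms

// Anything outside <digits><d|h|m> is rejected, e.g. seconds or fractions.
function isValidDuration(value: string): boolean {
  try {
    parseDurationSketch(value)
    return true
  } catch {
    return false
  }
}
```

Inputs such as `'30s'`, `'1.5h'`, or `' 30d'` do not match the anchored pattern and throw. `'0d'` does match and yields 0, which, read against the diff's logic, would place the purge cutoff at the current time.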
@@ -307,6 +332,13 @@ export interface Durably<
    */
   deleteRun(runId: string): Promise<void>
 
+  /**
+   * Delete terminal runs older than the specified cutoff.
+   * Only runs in terminal states (completed, failed, cancelled) are purged.
+   * @returns Number of deleted runs
+   */
+  purgeRuns(options: { olderThan: Date; limit?: number }): Promise<number>
+
   /**
    * Get a run by ID
    * @example
@@ -376,6 +408,8 @@ interface DurablyState<
   migrated: boolean
   leaseMs: number
   leaseRenewIntervalMs: number
+  retainRunsMs: number | null
+  lastPurgeAt: number
   releaseBrowserSingleton: () => void
 }
 
@@ -864,26 +898,38 @@ function createDurablyInstance<
       })
     },
 
+    async purgeRuns(options: {
+      olderThan: Date
+      limit?: number
+    }): Promise<number> {
+      return storage.purgeRuns({
+        olderThan: options.olderThan.toISOString(),
+        limit: options.limit,
+      })
+    },
+
     async processOne(options?: { workerId?: string }): Promise<boolean> {
       const workerId = options?.workerId ?? defaultWorkerId()
       const now = new Date().toISOString()
 
       await storage.releaseExpiredLeases(now)
 
-      const leasedRuns = await storage.getRuns({ status: 'leased' })
-      const excludeConcurrencyKeys = leasedRuns
-        .filter(
-          (entry): entry is Run<TLabels> & { concurrencyKey: string } =>
-            entry.concurrencyKey !== null &&
-            entry.leaseExpiresAt !== null &&
-            entry.leaseExpiresAt > now,
-        )
-        .map((entry) => entry.concurrencyKey)
-
-      const run = await storage.claimNext(workerId, now, state.leaseMs, {
-        excludeConcurrencyKeys,
-      })
+      const run = await storage.claimNext(workerId, now, state.leaseMs)
       if (!run) {
+        // Auto-purge old terminal runs if retainRuns is configured.
+        // Runs after claimNext so purge never serializes with job claiming.
+        // lastPurgeAt starts at 0, so the first idle cycle purges immediately.
+        if (
+          state.retainRunsMs !== null &&
+          Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS
+        ) {
+          const purgeNow = Date.now()
+          state.lastPurgeAt = purgeNow
+          const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString()
+          storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => {
+            // Purge failure is non-fatal — will retry on next interval
+          })
+        }
         return false
       }
 
@@ -978,6 +1024,7 @@ export function createDurably<
       options.leaseRenewIntervalMs ?? DEFAULTS.leaseRenewIntervalMs,
     leaseMs: options.leaseMs ?? DEFAULTS.leaseMs,
     preserveSteps: options.preserveSteps ?? DEFAULTS.preserveSteps,
+    retainRunsMs: options.retainRuns ? parseDuration(options.retainRuns) : null,
   }
 
   const db = new Kysely<Database>({ dialect: options.dialect })
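
The idle-cycle throttle that `processOne` applies using `retainRunsMs` can be looked at in isolation. The following is a minimal sketch, assuming an injected clock and a `purge` callback in place of `Date.now()` and `storage.purgeRuns`. `createIdlePurger`, `PurgeFn`, and the parameter names are hypothetical and not part of durably.

```typescript
// Hypothetical callback standing in for storage.purgeRuns.
type PurgeFn = (cutoffIso: string, limit: number) => Promise<number>

function createIdlePurger(
  retainMs: number | null,
  purge: PurgeFn,
  now: () => number = Date.now,
  intervalMs = 60_000,
) {
  // Starting at 0 means the first idle cycle purges immediately
  // (with a wall-clock `now`, the elapsed time is always >= intervalMs).
  let lastPurgeAt = 0

  // Call on an idle poll cycle; returns true when a purge was started.
  return function onIdle(): boolean {
    if (retainMs === null) return false // auto-cleanup not configured
    const current = now()
    if (current - lastPurgeAt < intervalMs) return false // throttled
    lastPurgeAt = current
    const cutoff = new Date(current - retainMs).toISOString()
    // Not awaited: a failed purge is swallowed and retried on a later interval.
    purge(cutoff, 100).catch(() => {})
    return true
  }
}
```

Recording `lastPurgeAt` before the purge settles means a slow or failing purge cannot cause a burst of retries on consecutive idle cycles; the trade-off is that a failed batch waits a full interval before the next attempt.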
@@ -1023,6 +1070,8 @@ export function createDurably<
     migrated: false,
     leaseMs: config.leaseMs,
     leaseRenewIntervalMs: config.leaseRenewIntervalMs,
+    retainRunsMs: config.retainRunsMs,
+    lastPurgeAt: 0,
     releaseBrowserSingleton,
   }
 
packages/durably/src/migrations.ts

Lines changed: 7 additions & 0 deletions
@@ -80,6 +80,13 @@ const migrations: Migration[] = [
         .columns(['job_name', 'created_at'])
         .execute()
 
+      await db.schema
+        .createIndex('idx_durably_runs_status_completed')
+        .ifNotExists()
+        .on('durably_runs')
+        .columns(['status', 'completed_at'])
+        .execute()
+
       // Create normalized labels table for indexed label filtering
       await db.schema
         .createTable('durably_run_labels')

packages/durably/src/storage.ts

Lines changed: 46 additions & 29 deletions
@@ -11,6 +11,9 @@ export type RunStatus =
   | 'failed'
   | 'cancelled'
 
+/** Run statuses that represent terminal (non-active) states */
+const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled']
+
 /**
  * Run data for creating a new run
  */
@@ -125,10 +128,6 @@ export interface ProgressData {
   message?: string
 }
 
-export interface ClaimOptions {
-  excludeConcurrencyKeys?: string[]
-}
-
 export type DatabaseBackend = 'generic' | 'postgres'
 
 /**
@@ -169,7 +168,6 @@ export interface Store<
     workerId: string,
     now: string,
     leaseMs: number,
-    options?: ClaimOptions,
   ): Promise<Run<TLabels> | null>
   renewLease(
     runId: string,
@@ -215,6 +213,9 @@ export interface Store<
     progress: ProgressData | null,
   ): Promise<void>
 
+  // Purge
+  purgeRuns(options: { olderThan: string; limit?: number }): Promise<number>
+
   // Logs
   createLog(input: CreateLogInput): Promise<Log>
   getLogs(runId: string): Promise<Log[]>
@@ -361,6 +362,20 @@ export function createKyselyStore(
 ): Store<Record<string, string>> {
   const withWriteLock = createWriteMutex()
 
+  /** Delete runs and all associated data (steps, logs, labels) in dependency order */
+  async function cascadeDeleteRuns(
+    trx: Kysely<Database>,
+    ids: string[],
+  ): Promise<void> {
+    await trx.deleteFrom('durably_steps').where('run_id', 'in', ids).execute()
+    await trx.deleteFrom('durably_logs').where('run_id', 'in', ids).execute()
+    await trx
+      .deleteFrom('durably_run_labels')
+      .where('run_id', 'in', ids)
+      .execute()
+    await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute()
+  }
+
   async function insertLabelRows(
     executor: Kysely<Database>,
     runId: string,
@@ -648,30 +663,40 @@ export function createKyselyStore(
 
     async deleteRun(runId: string) {
       await db.transaction().execute(async (trx) => {
-        await trx
-          .deleteFrom('durably_steps')
-          .where('run_id', '=', runId)
-          .execute()
-        await trx
-          .deleteFrom('durably_logs')
-          .where('run_id', '=', runId)
-          .execute()
-        await trx
-          .deleteFrom('durably_run_labels')
-          .where('run_id', '=', runId)
+        await cascadeDeleteRuns(trx, [runId])
+      })
+    },
+
+    async purgeRuns(options: {
+      olderThan: string
+      limit?: number
+    }): Promise<number> {
+      const limit = options.limit ?? 1000
+
+      return await db.transaction().execute(async (trx) => {
+        const rows = await trx
+          .selectFrom('durably_runs')
+          .select('id')
+          .where('status', 'in', TERMINAL_STATUSES)
+          .where('completed_at', '<', options.olderThan)
+          .orderBy('completed_at', 'asc')
+          .limit(limit)
           .execute()
-        await trx.deleteFrom('durably_runs').where('id', '=', runId).execute()
+
+        if (rows.length === 0) return 0
+
+        const ids = rows.map((r) => r.id)
+        await cascadeDeleteRuns(trx, ids)
+        return ids.length
       })
     },
 
     async claimNext(
       workerId: string,
       now: string,
       leaseMs: number,
-      options?: ClaimOptions,
     ): Promise<Run | null> {
       const leaseExpiresAt = new Date(Date.parse(now) + leaseMs).toISOString()
-      const excludeConcurrencyKeys = options?.excludeConcurrencyKeys ?? []
       const activeLeaseGuard = sql<boolean>`
         (
           concurrency_key IS NULL
@@ -689,7 +714,7 @@ export function createKyselyStore(
 
       if (backend === 'postgres') {
         return await db.transaction().execute(async (trx) => {
-          const skipKeys = [...excludeConcurrencyKeys]
+          const skipKeys: string[] = []
 
           // Loop: on concurrency-key conflict, exclude that key and retry
           // to find the next eligible candidate in the same transaction.
@@ -790,15 +815,6 @@ export function createKyselyStore(
         .orderBy('id', 'asc')
         .limit(1)
 
-      if (excludeConcurrencyKeys.length > 0) {
-        subquery = subquery.where((eb) =>
-          eb.or([
-            eb('concurrency_key', 'is', null),
-            eb('concurrency_key', 'not in', excludeConcurrencyKeys),
-          ]),
-        )
-      }
-
       const row = await db
         .updateTable('durably_runs')
         .set({
@@ -1035,6 +1051,7 @@ export function createKyselyStore(
     'enqueueMany',
     'updateRun',
     'deleteRun',
+    'purgeRuns',
     'claimNext',
     'renewLease',
     'releaseExpiredLeases',
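
The select-then-cascade flow of the storage-level `purgeRuns` can be sketched against plain arrays. This is an in-memory illustration only: `MemoryDb`, `purgeInMemory`, and the row shapes are hypothetical, the status list is an assumption based on the statuses named in this commit, and there is no transaction. It also shows why comparing `completed_at` as a string works when every timestamp comes from `Date.prototype.toISOString()`: the format is fixed-width UTC, so for four-digit years lexicographic order matches chronological order.

```typescript
// Assumed status set; the terminal subset mirrors TERMINAL_STATUSES in the diff.
type Status = 'pending' | 'leased' | 'completed' | 'failed' | 'cancelled'
const TERMINAL: Status[] = ['completed', 'failed', 'cancelled']

interface RunRow { id: string; status: Status; completedAt: string | null }
interface ChildRow { runId: string }
interface MemoryDb {
  runs: RunRow[]
  steps: ChildRow[]
  logs: ChildRow[]
  labels: ChildRow[]
}

function purgeInMemory(db: MemoryDb, olderThanIso: string, limit = 1000): number {
  // 1. Select terminal runs whose completion time is before the cutoff.
  const candidates: { id: string; completedAt: string }[] = []
  for (const r of db.runs) {
    if (
      TERMINAL.includes(r.status) &&
      r.completedAt !== null &&
      r.completedAt < olderThanIso // ISO strings compare chronologically
    ) {
      candidates.push({ id: r.id, completedAt: r.completedAt })
    }
  }
  // 2. Oldest first, capped at the batch limit.
  candidates.sort((a, b) =>
    a.completedAt < b.completedAt ? -1 : a.completedAt > b.completedAt ? 1 : 0,
  )
  const ids = new Set(candidates.slice(0, limit).map((c) => c.id))
  if (ids.size === 0) return 0

  // 3. Delete dependent rows first, then the runs themselves.
  db.steps = db.steps.filter((s) => !ids.has(s.runId))
  db.logs = db.logs.filter((l) => !ids.has(l.runId))
  db.labels = db.labels.filter((l) => !ids.has(l.runId))
  db.runs = db.runs.filter((r) => !ids.has(r.id))
  return ids.size
}
```

Ordering by completion time before applying the limit means repeated batches work through the backlog from the oldest runs forward, so a partially completed purge never leaves older runs behind newer ones.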

packages/durably/tests/node/migration-consolidated.test.ts

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ describe('migration consolidated schema', () => {
     expect(indexNames).toContain('idx_durably_runs_status_created')
     expect(indexNames).toContain('idx_durably_runs_status_lease_expires')
     expect(indexNames).toContain('idx_durably_runs_job_created')
+    expect(indexNames).toContain('idx_durably_runs_status_completed')
 
     // Steps indexes
     expect(indexNames).toContain('idx_durably_steps_run_index')

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+import { createNodeDialect } from '../helpers/node-dialect'
+import { createPurgeTests } from '../shared/purge.shared'
+
+createPurgeTests(createNodeDialect)
