Skip to content

Commit 44f147e

Browse files
matt-aitken and claude
committed
fix(webapp): use composite keyset cursor for run pagination
listRunIds/listRuns order by the composite key (created_at, run_id) but the cursor predicate cut on run_id alone. That is only sound when run_id lexicographic order matches created_at order. When a burst of runs is created such that the two diverge, keyset pagination both re-includes already-returned runs (duplicates) and drops runs it should return (skips). For bulk replay this produced duplicate runs; for the dashboard and runs.list it could silently skip or repeat runs at page boundaries. - Encode cursors as the composite (created_at, run_id) key (v2_<createdAtMs>_<runId>) and cut on the matching tuple predicate ((created_at, run_id) < / > (...)). The ORDER BY is unchanged, so the table's primary-key alignment (and query performance) is preserved. - Cursors are server-issued opaque tokens (the SDK just echoes pagination.next/previous back), so this needs no client update. Legacy bare-run_id cursors decode to the old run_id-only predicate for backwards compatibility with in-flight cursors. - Add listRunIdsWithCursor for forward-only batch iteration (bulk actions) so the created_at component is sourced from the same query that orders the rows. - ClickHouse getTaskRunsQueryBuilder now also selects toUnixTimestamp64Milli(created_at) AS created_at_ms. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent d1f4302 commit 44f147e

7 files changed

Lines changed: 473 additions & 30 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix unsound keyset pagination in `ClickHouseRunsRepository` that could duplicate
7+
or skip runs. The list query orders by `(created_at, run_id)` but the cursor
8+
predicate cut on `run_id` alone, which is only sound when run_id order matches
9+
created_at order. When a burst of runs is created such that the two diverge,
10+
bulk replay could re-process already-replayed runs, and dashboard / `runs.list`
11+
pagination could silently skip or repeat runs at page boundaries.
12+
13+
Cursors now encode the composite `(created_at, run_id)` key (`v2_<ms>_<runId>`)
14+
and the query cuts on the matching tuple. Cursors are server-issued opaque
15+
tokens, so this needs no SDK update; legacy bare-run_id cursors decode to the
16+
old `run_id`-only predicate for backwards compatibility with in-flight cursors.

apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ import {
1010
convertRunListInputOptionsToFilterRunsOptions,
1111
} from "./runsRepository.server";
1212
import parseDuration from "parse-duration";
13+
import { decodeRunsCursor, encodeRunsCursor } from "./runsCursor.server";
14+
15+
type RunCursorRow = { runId: string; createdAt: number };
1316

1417
export class ClickHouseRunsRepository implements IRunsRepository {
1518
constructor(private readonly options: RunsRepositoryOptions) {}
@@ -18,25 +21,51 @@ export class ClickHouseRunsRepository implements IRunsRepository {
1821
return "clickhouse";
1922
}
2023

21-
async listRunIds(options: ListRunsOptions) {
24+
/**
25+
* Runs the keyset-paginated query and returns `{ runId, createdAt }` rows
26+
* (one extra beyond `page.size` to signal "has more"). The ordering is always
27+
* the composite `(created_at, run_id)`; the cursor predicate must match it.
28+
*
29+
* v2 cursors carry both components, so we cut on the `(created_at, run_id)`
30+
* tuple — sound regardless of how run_id order relates to created_at order.
31+
* Legacy bare-run_id cursors fall back to the old `run_id`-only predicate
32+
* (knowingly unsound) for backwards compatibility with in-flight cursors.
33+
*/
34+
private async listRunRows(options: ListRunsOptions): Promise<RunCursorRow[]> {
2235
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
2336
applyRunFiltersToQueryBuilder(
2437
queryBuilder,
2538
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
2639
);
2740

41+
const forward = options.page.direction === "forward" || !options.page.direction;
42+
2843
if (options.page.cursor) {
29-
if (options.page.direction === "forward" || !options.page.direction) {
30-
queryBuilder
31-
.where("run_id < {runId: String}", { runId: options.page.cursor })
32-
.orderBy("created_at DESC, run_id DESC")
33-
.limit(options.page.size + 1);
44+
const decoded = decodeRunsCursor(options.page.cursor);
45+
46+
if (forward) {
47+
if (decoded.kind === "composite") {
48+
queryBuilder.where(
49+
"(created_at, run_id) < (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
50+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
51+
);
52+
} else {
53+
queryBuilder.where("run_id < {runId: String}", { runId: decoded.runId });
54+
}
55+
queryBuilder.orderBy("created_at DESC, run_id DESC");
3456
} else {
35-
queryBuilder
36-
.where("run_id > {runId: String}", { runId: options.page.cursor })
37-
.orderBy("created_at ASC, run_id ASC")
38-
.limit(options.page.size + 1);
57+
if (decoded.kind === "composite") {
58+
queryBuilder.where(
59+
"(created_at, run_id) > (fromUnixTimestamp64Milli({cursorCreatedAt: Int64}), {runId: String})",
60+
{ cursorCreatedAt: decoded.createdAt, runId: decoded.runId }
61+
);
62+
} else {
63+
queryBuilder.where("run_id > {runId: String}", { runId: decoded.runId });
64+
}
65+
queryBuilder.orderBy("created_at ASC, run_id ASC");
3966
}
67+
68+
queryBuilder.limit(options.page.size + 1);
4069
} else {
4170
// Initial page - no cursor provided
4271
queryBuilder.orderBy("created_at DESC, run_id DESC").limit(options.page.size + 1);
@@ -48,8 +77,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
4877
throw queryError;
4978
}
5079

51-
const runIds = result.map((row) => row.run_id);
52-
return runIds;
80+
return result.map((row) => ({ runId: row.run_id, createdAt: row.created_at_ms }));
81+
}
82+
83+
async listRunIds(options: ListRunsOptions) {
  // Callers of this method only need the ids, so the createdAt component of
  // each row returned by listRunRows is dropped here.
  const ids: string[] = [];
  for (const { runId } of await this.listRunRows(options)) {
    ids.push(runId);
  }
  return ids;
}
87+
88+
/**
89+
* Forward-only batch iteration (bulk actions). Returns up to `page.size` run
90+
* ids plus the composite cursor for the next batch (null when this batch is
91+
* empty). The `created_at` component comes from the same query that orders the
92+
* rows, so the next batch's tuple predicate is always consistent.
93+
*/
94+
async listRunIdsWithCursor(
  options: ListRunsOptions
): Promise<{ runIds: string[]; nextCursor: string | null }> {
  // Trim the result to at most `page.size` rows so only the batch itself is
  // returned to the caller.
  const batch = (await this.listRunRows(options)).slice(0, options.page.size);
  const runIds = batch.map(({ runId }) => runId);

  // An empty batch has no final row, so there is nothing to resume from.
  const tail = batch[batch.length - 1];
  if (!tail) {
    return { runIds, nextCursor: null };
  }

  // The cursor is built from the last row of this batch: both components come
  // from the same row.
  return { runIds, nextCursor: encodeRunsCursor(tail.createdAt, tail.runId) };
}
54106

55107
async listFriendlyRunIds(options: ListRunsOptions) {
@@ -76,10 +128,15 @@ export class ClickHouseRunsRepository implements IRunsRepository {
76128
}
77129

78130
async listRuns(options: ListRunsOptions) {
79-
const runIds = await this.listRunIds(options);
131+
const rows = await this.listRunRows(options);
80132

81133
// If there are more runs than the page size, we need to fetch the next page
82-
const hasMore = runIds.length > options.page.size;
134+
const hasMore = rows.length > options.page.size;
135+
136+
// Cursors carry both (created_at, run_id) so the next/prev page predicate
137+
// matches the composite ordering — see runsCursor.server.ts.
138+
const cursorFor = (row: RunCursorRow | undefined): string | null =>
139+
row ? encodeRunsCursor(row.createdAt, row.runId) : null;
83140

84141
let nextCursor: string | null = null;
85142
let previousCursor: string | null = null;
@@ -88,30 +145,31 @@ export class ClickHouseRunsRepository implements IRunsRepository {
88145
const direction = options.page.direction ?? "forward";
89146
switch (direction) {
90147
case "forward": {
91-
previousCursor = options.page.cursor ? runIds.at(0) ?? null : null;
148+
previousCursor = options.page.cursor ? cursorFor(rows.at(0)) : null;
92149
if (hasMore) {
93-
// The next cursor should be the last run ID from this page
94-
nextCursor = runIds[options.page.size - 1];
150+
// The next cursor should be the last run from this page
151+
nextCursor = cursorFor(rows[options.page.size - 1]);
95152
}
96153
break;
97154
}
98155
case "backward": {
99-
const reversedRunIds = [...runIds].reverse();
156+
const reversedRows = [...rows].reverse();
100157
if (hasMore) {
101-
previousCursor = reversedRunIds.at(1) ?? null;
102-
nextCursor = reversedRunIds.at(options.page.size) ?? null;
158+
previousCursor = cursorFor(reversedRows.at(1));
159+
nextCursor = cursorFor(reversedRows.at(options.page.size));
103160
} else {
104-
nextCursor = reversedRunIds.at(options.page.size - 1) ?? null;
161+
nextCursor = cursorFor(reversedRows.at(options.page.size - 1));
105162
}
106163

107164
break;
108165
}
109166
}
110167

111-
const runIdsToReturn =
168+
const runIdsToReturn = (
112169
options.page.direction === "backward" && hasMore
113-
? runIds.slice(1, options.page.size + 1)
114-
: runIds.slice(0, options.page.size);
170+
? rows.slice(1, options.page.size + 1)
171+
: rows.slice(0, options.page.size)
172+
).map((row) => row.runId);
115173

116174
let runs = await this.options.prisma.taskRun.findMany({
117175
where: {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Cursor encoding for keyset pagination over `(created_at, run_id)`.
3+
*
4+
* The list query orders by the composite key `(created_at, run_id)`, so a sound
5+
* cursor must carry BOTH components — cutting on `run_id` alone re-includes and
6+
* skips rows whenever `run_id` order diverges from `created_at` order.
7+
*
8+
* A v2 cursor is `v2_<createdAtMs>_<runId>`. Run ids are cuids (no underscore,
9+
* never start with `v2_`), so a bare run_id is unambiguously a *legacy* cursor:
10+
* we decode it to `{ kind: "legacy" }` and fall back to the old `run_id`-only
11+
* predicate. Cursors are server-issued opaque tokens (the SDK just echoes
12+
* `pagination.next`/`previous` back), so this format change needs no client
13+
* update — only legacy in-flight cursors, which drain naturally.
14+
*/
15+
/** Prefix that marks a cursor as the composite (v2) format. */
const RUNS_CURSOR_V2_PREFIX = "v2_";

/**
 * Shape of the `<createdAtMs>` component as `encodeRunsCursor` writes it for an
 * integer millisecond timestamp: an optional sign followed by decimal digits.
 * Anything else (decimals, exponents, hex, whitespace) was not produced by the
 * encoder and must not be coerced into a number.
 */
const RUNS_CURSOR_CREATED_AT_PATTERN = /^-?\d+$/;

/**
 * Result of decoding a pagination cursor.
 *
 * - `composite`: a well-formed v2 cursor carrying both key components.
 * - `legacy`: any other string, returned verbatim as `runId`.
 */
export type DecodedRunsCursor =
  | { kind: "composite"; createdAt: number; runId: string }
  | { kind: "legacy"; runId: string };

/**
 * Builds a v2 cursor of the form `v2_<createdAtMs>_<runId>`.
 *
 * @param createdAtMs - Creation time in milliseconds; expected to be an integer
 *   so that the cursor round-trips through `decodeRunsCursor`.
 * @param runId - Run id; may itself contain underscores, since decoding splits
 *   on the first underscore after the prefix only.
 * @returns The encoded cursor string.
 */
export function encodeRunsCursor(createdAtMs: number, runId: string): string {
  return `${RUNS_CURSOR_V2_PREFIX}${createdAtMs}_${runId}`;
}

/**
 * Decodes a cursor string.
 *
 * A cursor is treated as composite only when it has the `v2_` prefix, an
 * integer timestamp component within the safe-integer range, and a non-empty
 * run id. Every other input — including a malformed `v2_`-prefixed string — is
 * returned as a legacy cursor with the whole input as `runId`.
 *
 * The timestamp is validated textually before conversion because `Number()`
 * alone also accepts forms such as `"1.5"`, `"1e3"`, `"0x10"` and `" 12"`,
 * none of which are valid integer millisecond timestamps.
 *
 * @param cursor - Cursor string received from the client.
 * @returns The decoded cursor.
 */
export function decodeRunsCursor(cursor: string): DecodedRunsCursor {
  if (cursor.startsWith(RUNS_CURSOR_V2_PREFIX)) {
    const rest = cursor.slice(RUNS_CURSOR_V2_PREFIX.length);
    // Split on the FIRST underscore: everything after it belongs to the run id.
    const separatorIndex = rest.indexOf("_");

    // `> 0` rejects both "no separator" (-1) and an empty timestamp (0).
    if (separatorIndex > 0) {
      const rawCreatedAt = rest.slice(0, separatorIndex);
      const runId = rest.slice(separatorIndex + 1);

      if (runId.length > 0 && RUNS_CURSOR_CREATED_AT_PATTERN.test(rawCreatedAt)) {
        const createdAt = Number(rawCreatedAt);
        // Digit strings too long to be represented exactly are rejected rather
        // than silently rounded to a different timestamp.
        if (Number.isSafeInteger(createdAt)) {
          return { kind: "composite", createdAt, runId };
        }
      }
    }
  }

  return { kind: "legacy", runId: cursor };
}

apps/webapp/app/services/runsRepository/runsRepository.server.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,15 @@ export type TagList = {
130130
export interface IRunsRepository {
131131
name: string;
132132
listRunIds(options: ListRunsOptions): Promise<string[]>;
133+
/**
134+
* Forward-only batch iteration (bulk actions). Returns up to `page.size` run
135+
* ids and the composite cursor for the next batch (null when the batch is
136+
* empty). Keeping cursor construction here ensures the `created_at` component
137+
* comes from the same source as the ordering.
138+
*/
139+
listRunIdsWithCursor(
140+
options: ListRunsOptions
141+
): Promise<{ runIds: string[]; nextCursor: string | null }>;
133142
/** Returns friendly IDs (e.g., run_xxx) instead of internal UUIDs. Used for ClickHouse task_events queries. */
134143
listFriendlyRunIds(options: ListRunsOptions): Promise<string[]>;
135144
listRuns(options: ListRunsOptions): Promise<{
@@ -169,6 +178,23 @@ export class RunsRepository implements IRunsRepository {
169178
);
170179
}
171180

181+
async listRunIdsWithCursor(
  options: ListRunsOptions
): Promise<{ runIds: string[]; nextCursor: string | null }> {
  // Span attributes identify which repository served the call and for which
  // organization / project / environment.
  const { organizationId, projectId, environmentId } = options;
  const spanOptions = {
    attributes: {
      "repository.name": "clickhouse",
      organizationId,
      projectId,
      environmentId,
    },
  };

  // Delegate to the ClickHouse repository inside a traced span.
  return startActiveSpan(
    "runsRepository.listRunIdsWithCursor",
    async () => this.clickHouseRunsRepository.listRunIdsWithCursor(options),
    spanOptions
  );
}
197+
172198
async listFriendlyRunIds(options: ListRunsOptions): Promise<string[]> {
173199
return startActiveSpan(
174200
"runsRepository.listFriendlyRunIds",

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,10 @@ export class BulkActionService extends BaseService {
159159
throw new Error(`Bulk action group has invalid query name: ${group.queryName}`);
160160
}
161161

162-
// 2. Get the runs to process in this batch
163-
const runIds = await runsRepository.listRunIds({
162+
// 2. Get the runs to process in this batch, plus the cursor for the next
163+
// batch. The cursor is a composite (created_at, run_id) keyset cursor so the
164+
// next batch can't re-include or skip runs.
165+
const { runIds: runIdsToProcess, nextCursor } = await runsRepository.listRunIdsWithCursor({
164166
...filters,
165167
page: {
166168
size: env.BULK_ACTION_BATCH_SIZE,
@@ -172,8 +174,6 @@ export class BulkActionService extends BaseService {
172174
// 3. Process the runs
173175
let successCount = 0;
174176
let failureCount = 0;
175-
// Slice because we fetch an extra for the cursor
176-
const runIdsToProcess = runIds.slice(0, env.BULK_ACTION_BATCH_SIZE);
177177

178178
switch (group.type) {
179179
case BulkActionType.CANCEL: {
@@ -292,7 +292,7 @@ export class BulkActionService extends BaseService {
292292
const updatedGroup = await this._prisma.bulkActionGroup.update({
293293
where: { id: bulkActionId },
294294
data: {
295-
cursor: runIdsToProcess.at(runIdsToProcess.length - 1),
295+
cursor: nextCursor,
296296
successCount: {
297297
increment: successCount,
298298
},

0 commit comments

Comments
 (0)