Skip to content

Commit 2a32386

Browse files
committed
feat: replicate trigger_source, root_trigger_source, and is_warm_start to clickhouse
1 parent efcafdf commit 2a32386

File tree

8 files changed

+72
-5
lines changed

8 files changed

+72
-5
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2222
import { tryCatch } from "@trigger.dev/core/utils";
2323
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
2424
import { unsafeExtractIdempotencyKeyScope, unsafeExtractIdempotencyKeyUser } from "@trigger.dev/core/v3/serverOnly";
25+
import { RunAnnotations } from "@trigger.dev/core/v3";
2526
import { type TaskRun } from "@trigger.dev/database";
2627
import { nanoid } from "nanoid";
2728
import EventEmitter from "node:events";
@@ -866,6 +867,8 @@ export class RunsReplicationService {
866867
? calculateErrorFingerprint(run.error)
867868
: '';
868869

870+
const annotations = this.#parseAnnotations(run.annotations);
871+
869872
// Return array matching TASK_RUN_COLUMNS order
870873
return [
871874
run.runtimeEnvironmentId, // environment_id
@@ -916,9 +919,16 @@ export class RunsReplicationService {
916919
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
917920
run.masterQueue ?? "", // worker_queue
918921
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
922+
annotations?.triggerSource ?? "", // trigger_source
923+
annotations?.rootTriggerSource ?? "", // root_trigger_source
924+
run.isWarmStart ?? null, // is_warm_start
919925
];
920926
}
921927

928+
#parseAnnotations(annotations: unknown) {
929+
return RunAnnotations.safeParse(annotations).data;
930+
}
931+
922932
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
923933
const payload = await this.#prepareJson(run.payload, run.payloadType);
924934

apps/webapp/test/runsReplicationService.part1.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ describe("RunsReplicationService (part 1/2)", () => {
8686
organizationId: organization.id,
8787
environmentType: "DEVELOPMENT",
8888
engine: "V2",
89+
annotations: {
90+
triggerSource: "api",
91+
triggerAction: "trigger",
92+
rootTriggerSource: "dashboard",
93+
},
94+
isWarmStart: true,
8995
},
9096
});
9197

@@ -111,6 +117,9 @@ describe("RunsReplicationService (part 1/2)", () => {
111117
organization_id: organization.id,
112118
environment_type: "DEVELOPMENT",
113119
engine: "V2",
120+
trigger_source: "api",
121+
root_trigger_source: "dashboard",
122+
is_warm_start: true,
114123
})
115124
);
116125

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- +goose Up
2+
ALTER TABLE trigger_dev.task_runs_v2
3+
ADD COLUMN trigger_source LowCardinality(String) DEFAULT '';
4+
5+
ALTER TABLE trigger_dev.task_runs_v2
6+
ADD COLUMN root_trigger_source LowCardinality(String) DEFAULT '';
7+
8+
ALTER TABLE trigger_dev.task_runs_v2
9+
ADD COLUMN is_warm_start Nullable(UInt8) DEFAULT NULL;
10+
11+
-- +goose Down
12+
ALTER TABLE trigger_dev.task_runs_v2
13+
DROP COLUMN trigger_source;
14+
15+
ALTER TABLE trigger_dev.task_runs_v2
16+
DROP COLUMN root_trigger_source;
17+
18+
ALTER TABLE trigger_dev.task_runs_v2
19+
DROP COLUMN is_warm_start;

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ describe("Task Runs V2", () => {
8282
["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids
8383
"", // worker_queue
8484
null, // max_duration_in_seconds
85+
"", // trigger_source
86+
"", // root_trigger_source
87+
null, // is_warm_start
8588
];
8689

8790
const [insertError, insertResult] = await insert([taskRunData]);
@@ -210,6 +213,9 @@ describe("Task Runs V2", () => {
210213
[], // bulk_action_group_ids
211214
"", // worker_queue
212215
null, // max_duration_in_seconds
216+
"", // trigger_source
217+
"", // root_trigger_source
218+
null, // is_warm_start
213219
];
214220

215221
const run2: TaskRunInsertArray = [
@@ -261,6 +267,9 @@ describe("Task Runs V2", () => {
261267
[], // bulk_action_group_ids
262268
"", // worker_queue
263269
null, // max_duration_in_seconds
270+
"", // trigger_source
271+
"", // root_trigger_source
272+
null, // is_warm_start
264273
];
265274

266275
const [insertError, insertResult] = await insert([run1, run2]);
@@ -359,6 +368,9 @@ describe("Task Runs V2", () => {
359368
[], // bulk_action_group_ids
360369
"", // worker_queue
361370
null, // max_duration_in_seconds
371+
"", // trigger_source
372+
"", // root_trigger_source
373+
null, // is_warm_start
362374
];
363375

364376
const [insertError, insertResult] = await insert([taskRun]);

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ export const TaskRunV2 = z.object({
4949
bulk_action_group_ids: z.array(z.string()).default([]),
5050
worker_queue: z.string().default(""),
5151
max_duration_in_seconds: z.number().int().nullish(),
52+
trigger_source: z.string().default(""),
53+
root_trigger_source: z.string().default(""),
54+
is_warm_start: z.boolean().nullish(),
5255
_version: z.string(),
5356
_is_deleted: z.number().int().default(0),
5457
});
@@ -105,6 +108,9 @@ export const TASK_RUN_COLUMNS = [
105108
"bulk_action_group_ids",
106109
"worker_queue",
107110
"max_duration_in_seconds",
111+
"trigger_source",
112+
"root_trigger_source",
113+
"is_warm_start",
108114
] as const;
109115

110116
export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number];
@@ -168,6 +174,9 @@ export type TaskRunFieldTypes = {
168174
bulk_action_group_ids: string[];
169175
worker_queue: string;
170176
max_duration_in_seconds: number | null;
177+
trigger_source: string;
178+
root_trigger_source: string;
179+
is_warm_start: boolean | null;
171180
};
172181

173182
/**
@@ -302,6 +311,9 @@ export type TaskRunInsertArray = [
302311
bulk_action_group_ids: string[],
303312
worker_queue: string,
304313
max_duration_in_seconds: number | null,
314+
trigger_source: string,
315+
root_trigger_source: string,
316+
is_warm_start: boolean | null,
305317
];
306318

307319
/**
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."TaskRun" ADD COLUMN "isWarmStart" BOOLEAN;

internal-packages/database/prisma/schema.prisma

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ model BackgroundWorkerFile {
537537
}
538538

539539
model Prompt {
540-
id String @id @default(cuid())
541-
friendlyId String @unique @map("friendly_id")
540+
id String @id @default(cuid())
541+
friendlyId String @unique @map("friendly_id")
542542
slug String
543543
description String?
544544
type String @default("text") // "text" | "chat"
545545

546-
organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
546+
organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
547547
organizationId String
548548

549549
project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
@@ -558,7 +558,7 @@ model Prompt {
558558
defaultModel String?
559559
defaultConfig Json?
560560

561-
tags String[] @default([])
561+
tags String[] @default([])
562562
archivedAt DateTime?
563563

564564
createdAt DateTime @default(now())
@@ -840,6 +840,9 @@ model TaskRun {
840840
/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
841841
annotations Json?
842842

843+
/// Whether the latest attempt was a warm start. Null until first attempt starts.
844+
isWarmStart Boolean?
845+
843846
/// Run output
844847
output String?
845848
outputType String @default("application/json")
@@ -857,7 +860,6 @@ model TaskRun {
857860
/// Store the stream keys that are being used by the run
858861
realtimeStreams String[] @default([])
859862

860-
861863
@@unique([oneTimeUseToken])
862864
@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
863865
// Finding child runs

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ export class RunAttemptSystem {
402402
status: "EXECUTING",
403403
attemptNumber: nextAttemptNumber,
404404
executedAt: taskRun.attemptNumber === null ? new Date() : undefined,
405+
isWarmStart: isWarmStart ?? false,
405406
},
406407
select: {
407408
id: true,

0 commit comments

Comments
 (0)