Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger";
import { tryCatch } from "@trigger.dev/core/utils";
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
import { unsafeExtractIdempotencyKeyScope, unsafeExtractIdempotencyKeyUser } from "@trigger.dev/core/v3/serverOnly";
import { RunAnnotations } from "@trigger.dev/core/v3";
import { type TaskRun } from "@trigger.dev/database";
import { nanoid } from "nanoid";
import EventEmitter from "node:events";
Expand Down Expand Up @@ -866,6 +867,8 @@ export class RunsReplicationService {
? calculateErrorFingerprint(run.error)
: '';

const annotations = this.#parseAnnotations(run.annotations);

// Return array matching TASK_RUN_COLUMNS order
return [
run.runtimeEnvironmentId, // environment_id
Expand Down Expand Up @@ -916,9 +919,16 @@ export class RunsReplicationService {
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
annotations?.triggerSource ?? "", // trigger_source
annotations?.rootTriggerSource ?? "", // root_trigger_source
run.isWarmStart ?? null, // is_warm_start
];
}

#parseAnnotations(annotations: unknown) {
return RunAnnotations.safeParse(annotations).data;
}

async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
const payload = await this.#prepareJson(run.payload, run.payloadType);

Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/test/runsReplicationService.part1.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ describe("RunsReplicationService (part 1/2)", () => {
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
annotations: {
triggerSource: "api",
triggerAction: "trigger",
rootTriggerSource: "dashboard",
},
isWarmStart: true,
},
});

Expand All @@ -111,6 +117,9 @@ describe("RunsReplicationService (part 1/2)", () => {
organization_id: organization.id,
environment_type: "DEVELOPMENT",
engine: "V2",
trigger_source: "api",
root_trigger_source: "dashboard",
is_warm_start: 1,
})
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- +goose Up
ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN trigger_source LowCardinality(String) DEFAULT '';

ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN root_trigger_source LowCardinality(String) DEFAULT '';

ALTER TABLE trigger_dev.task_runs_v2
ADD COLUMN is_warm_start Nullable(UInt8) DEFAULT NULL;

-- +goose Down
ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN trigger_source;

ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN root_trigger_source;

ALTER TABLE trigger_dev.task_runs_v2
DROP COLUMN is_warm_start;
12 changes: 12 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ describe("Task Runs V2", () => {
["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids
"", // worker_queue
null, // max_duration_in_seconds
"", // trigger_source
"", // root_trigger_source
null, // is_warm_start
];

const [insertError, insertResult] = await insert([taskRunData]);
Expand Down Expand Up @@ -210,6 +213,9 @@ describe("Task Runs V2", () => {
[], // bulk_action_group_ids
"", // worker_queue
null, // max_duration_in_seconds
"", // trigger_source
"", // root_trigger_source
null, // is_warm_start
];

const run2: TaskRunInsertArray = [
Expand Down Expand Up @@ -261,6 +267,9 @@ describe("Task Runs V2", () => {
[], // bulk_action_group_ids
"", // worker_queue
null, // max_duration_in_seconds
"", // trigger_source
"", // root_trigger_source
null, // is_warm_start
];

const [insertError, insertResult] = await insert([run1, run2]);
Expand Down Expand Up @@ -359,6 +368,9 @@ describe("Task Runs V2", () => {
[], // bulk_action_group_ids
"", // worker_queue
null, // max_duration_in_seconds
"", // trigger_source
"", // root_trigger_source
null, // is_warm_start
];

const [insertError, insertResult] = await insert([taskRun]);
Expand Down
12 changes: 12 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ export const TaskRunV2 = z.object({
bulk_action_group_ids: z.array(z.string()).default([]),
worker_queue: z.string().default(""),
max_duration_in_seconds: z.number().int().nullish(),
trigger_source: z.string().default(""),
root_trigger_source: z.string().default(""),
is_warm_start: z.boolean().nullish(),
_version: z.string(),
_is_deleted: z.number().int().default(0),
});
Expand Down Expand Up @@ -105,6 +108,9 @@ export const TASK_RUN_COLUMNS = [
"bulk_action_group_ids",
"worker_queue",
"max_duration_in_seconds",
"trigger_source",
"root_trigger_source",
"is_warm_start",
] as const;

export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number];
Expand Down Expand Up @@ -168,6 +174,9 @@ export type TaskRunFieldTypes = {
bulk_action_group_ids: string[];
worker_queue: string;
max_duration_in_seconds: number | null;
trigger_source: string;
root_trigger_source: string;
is_warm_start: boolean | null;
};

/**
Expand Down Expand Up @@ -302,6 +311,9 @@ export type TaskRunInsertArray = [
bulk_action_group_ids: string[],
worker_queue: string,
max_duration_in_seconds: number | null,
trigger_source: string,
root_trigger_source: string,
is_warm_start: boolean | null,
];

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."TaskRun" ADD COLUMN "isWarmStart" BOOLEAN;
12 changes: 7 additions & 5 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -537,13 +537,13 @@ model BackgroundWorkerFile {
}

model Prompt {
id String @id @default(cuid())
friendlyId String @unique @map("friendly_id")
id String @id @default(cuid())
friendlyId String @unique @map("friendly_id")
slug String
description String?
type String @default("text") // "text" | "chat"

organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade, onUpdate: Cascade)
organizationId String

project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
Expand All @@ -558,7 +558,7 @@ model Prompt {
defaultModel String?
defaultConfig Json?

tags String[] @default([])
tags String[] @default([])
archivedAt DateTime?

createdAt DateTime @default(now())
Expand Down Expand Up @@ -840,6 +840,9 @@ model TaskRun {
/// Structured annotations: triggerSource, triggerAction, rootTriggerSource, rootScheduleId
annotations Json?

/// Whether the latest attempt was a warm start. Null until first attempt starts.
isWarmStart Boolean?

/// Run output
output String?
outputType String @default("application/json")
Expand All @@ -857,7 +860,6 @@ model TaskRun {
/// Store the stream keys that are being used by the run
realtimeStreams String[] @default([])


@@unique([oneTimeUseToken])
@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
// Finding child runs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ export class RunAttemptSystem {
status: "EXECUTING",
attemptNumber: nextAttemptNumber,
executedAt: taskRun.attemptNumber === null ? new Date() : undefined,
isWarmStart: isWarmStart ?? false,
},
select: {
id: true,
Expand Down
Loading