-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathidempotencyKeys.server.ts
More file actions
124 lines (109 loc) · 4.8 KB
/
idempotencyKeys.server.ts
File metadata and controls
124 lines (109 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import type { RunEngine } from "~/v3/runEngine.server";
import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
/**
 * Variant returned when an existing run was found for the request's idempotency key
 * and is being reused; `run` is that existing run.
 */
type CachedIdempotentRun = {
  isCached: true;
  run: TaskRun;
};

/**
 * Variant returned when no existing run is being reused. Carries the idempotency key
 * (if any) and the expiry resolved from the request.
 */
type UncachedIdempotencyKey = {
  isCached: false;
  idempotencyKey?: string;
  idempotencyKeyExpiresAt?: Date;
};

/**
 * Outcome of resolving an idempotency key for a trigger request.
 * Discriminated on `isCached`.
 */
export type IdempotencyKeyConcernResult = CachedIdempotentRun | UncachedIdempotencyKey;
/**
 * Resolves the idempotency key of a trigger request against existing task runs.
 *
 * For a request carrying an idempotency key, looks up a run in the same runtime
 * environment with the same task identifier and key. The existing run is either
 * reused (`isCached: true`) or has its key cleared so the request can proceed
 * with `isCached: false`.
 */
export class IdempotencyKeyConcern {
  constructor(
    private readonly prisma: PrismaClientOrTransaction,
    private readonly engine: RunEngine,
    private readonly traceEventConcern: TraceEventConcern
  ) {}

  /**
   * Decides whether a trigger request should reuse an existing run.
   *
   * The key is taken from `request.options` first, then from `request.body.options`.
   * The expiry is taken from `request.options.idempotencyKeyExpiresAt`, then from the
   * TTL in the request body, and finally defaults to 30 days from now.
   *
   * @param request - The trigger request being processed.
   * @returns `{ isCached: true, run }` when a matching run exists whose key has not
   *   expired and whose status does not require the key to be cleared. Otherwise
   *   `{ isCached: false, idempotencyKey, idempotencyKeyExpiresAt }`; the expiry is
   *   populated even when the request carries no key.
   */
  async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
    const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;

    const idempotencyKeyExpiresAt =
      request.options?.idempotencyKeyExpiresAt ??
      resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ??
      new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days

    // Every non-cached outcome returns the same payload.
    const notCached: IdempotencyKeyConcernResult = {
      isCached: false,
      idempotencyKey,
      idempotencyKeyExpiresAt,
    };

    if (!idempotencyKey) {
      return notCached;
    }

    const existingRun = await this.prisma.taskRun.findFirst({
      where: {
        runtimeEnvironmentId: request.environment.id,
        idempotencyKey,
        taskIdentifier: request.taskId,
      },
      include: {
        associatedWaitpoint: true,
      },
    });

    if (!existingRun) {
      return notCached;
    }

    // The idempotency key has expired
    if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
      // Log the resolved key: it may have come from the request body rather than request.options.
      logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
        idempotencyKey,
        run: existingRun,
      });

      await this.clearIdempotencyKey(existingRun.id, idempotencyKey);

      return notCached;
    }

    // If the existing run failed or was expired, we clear the key and do a new run
    if (shouldIdempotencyKeyBeCleared(existingRun.status)) {
      logger.debug("[TriggerTaskService][call] Idempotency key should be cleared", {
        idempotencyKey,
        runStatus: existingRun.status,
        runId: existingRun.id,
      });

      await this.clearIdempotencyKey(existingRun.id, idempotencyKey);

      return notCached;
    }

    // We have an idempotent run, so we return it
    const associatedWaitpoint = existingRun.associatedWaitpoint;
    const parentRunId = request.body.options?.parentRunId;
    const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;

    // We're using `andWait` so we need to block the parent run with a waitpoint
    if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
      await this.traceEventConcern.traceIdempotentRun(
        request,
        {
          existingRun,
          idempotencyKey,
          incomplete: associatedWaitpoint.status === "PENDING",
          isError: associatedWaitpoint.outputIsError,
        },
        async (event) => {
          // For "replay" links the span id is used as-is; otherwise it is prefixed
          // with the traceparent span id when one is present.
          const parentSpanId =
            request.options?.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId;
          const spanId = parentSpanId ? `${parentSpanId}:${event.spanId}` : event.spanId;

          // Block the parent run with the existing run's waitpoint
          await this.engine.blockRunWithWaitpoint({
            runId: RunId.fromFriendlyId(parentRunId),
            waitpoints: associatedWaitpoint.id,
            spanIdToComplete: spanId,
            batch: request.options?.batchId
              ? {
                  id: request.options.batchId,
                  index: request.options.batchIndex ?? 0,
                }
              : undefined,
            projectId: request.environment.projectId,
            organizationId: request.environment.organizationId,
            tx: this.prisma,
          });
        }
      );
    }

    return { isCached: true, run: existingRun };
  }

  /**
   * Removes the idempotency key and its expiry from a run.
   * The update only matches if the run still carries `idempotencyKey`.
   */
  private async clearIdempotencyKey(runId: string, idempotencyKey: string): Promise<void> {
    await this.prisma.taskRun.updateMany({
      where: { id: runId, idempotencyKey },
      data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
    });
  }
}