-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathtraceEvents.server.ts
More file actions
125 lines (119 loc) · 3.94 KB
/
traceEvents.server.ts
File metadata and controls
125 lines (119 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
import { TaskRun } from "@trigger.dev/database";
import { IEventRepository } from "~/v3/eventRepository/eventRepository.types";
import { getEventRepository } from "~/v3/eventRepository/index.server";
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
export class DefaultTraceEventsConcern implements TraceEventConcern {
async #getEventRepository(
request: TriggerTaskRequest,
parentStore: string | undefined
): Promise<{ repository: IEventRepository; store: string }> {
return await getEventRepository(
request.environment.organization.featureFlags as Record<string, unknown>,
parentStore
);
}
async traceRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { repository, store } = await this.#getEventRepository(request, parentStore);
return await repository.traceEvent(
request.taskId,
{
context: request.options?.traceContext,
spanParentAsLink: request.options?.spanParentAsLink,
kind: "SERVER",
environment: request.environment,
taskSlug: request.taskId,
attributes: {
properties: {},
style: {
icon: request.options?.customIcon ?? "task",
},
},
incomplete: true,
immediate: true,
startTime: request.options?.overrideCreatedAt
? BigInt(request.options.overrideCreatedAt.getTime()) * BigInt(1000000)
: undefined,
},
async (event, traceContext, traceparent) => {
return await callback(
{
traceId: event.traceId,
spanId: event.spanId,
traceContext,
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
},
store
);
}
);
}
async traceIdempotentRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
idempotencyKey: string;
incomplete: boolean;
isError: boolean;
},
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { existingRun, idempotencyKey, incomplete, isError } = options;
const { repository, store } = await this.#getEventRepository(request, parentStore);
return await repository.traceEvent(
`${request.taskId} (cached)`,
{
context: request.options?.traceContext,
spanParentAsLink: request.options?.spanParentAsLink,
kind: "SERVER",
environment: request.environment,
taskSlug: request.taskId,
attributes: {
properties: {
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
},
style: {
icon: "task-cached",
},
runId: existingRun.friendlyId,
},
incomplete,
isError,
immediate: true,
},
async (event, traceContext, traceparent) => {
//log a message
await repository.recordEvent(
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
{
taskSlug: request.taskId,
environment: request.environment,
attributes: {
runId: existingRun.friendlyId,
},
context: request.options?.traceContext,
parentId: event.spanId,
}
);
return await callback(
{
traceId: event.traceId,
spanId: event.spanId,
traceContext,
traceparent,
setAttribute: (key, value) => event.setAttribute(key as any, value),
failWithError: event.failWithError.bind(event),
},
store
);
}
);
}
}