Skip to content

Commit b7fa420

Browse files
committed
refactor: consolidate compute gateway clients into shared @internal/compute package
1 parent 2219d11 commit b7fa420

File tree

8 files changed

+253
-194
lines changed

8 files changed

+253
-194
lines changed

apps/supervisor/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"dependencies": {
1616
"@aws-sdk/client-ecr": "^3.839.0",
1717
"@kubernetes/client-node": "^1.0.0",
18+
"@internal/compute": "workspace:*",
1819
"@trigger.dev/core": "workspace:*",
1920
"dockerode": "^4.0.6",
2021
"prom-client": "^15.1.0",

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 45 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
type WorkloadManagerCreateOptions,
77
type WorkloadManagerOptions,
88
} from "./types.js";
9+
import { ComputeClient } from "@internal/compute";
910
import { env } from "../env.js";
1011
import { getRunnerId } from "../util.js";
1112
import { buildOtlpTracePayload } from "../otlpPayload.js";
@@ -20,13 +21,20 @@ type ComputeWorkloadManagerOptions = WorkloadManagerOptions & {
2021

2122
export class ComputeWorkloadManager implements WorkloadManager {
2223
private readonly logger = new SimpleStructuredLogger("compute-workload-manager");
24+
private readonly compute: ComputeClient;
2325

2426
constructor(private opts: ComputeWorkloadManagerOptions) {
2527
if (opts.workloadApiDomain) {
2628
this.logger.warn("⚠️ Custom workload API domain", {
2729
domain: opts.workloadApiDomain,
2830
});
2931
}
32+
33+
this.compute = new ComputeClient({
34+
gatewayUrl: opts.gatewayUrl,
35+
authToken: opts.gatewayAuthToken,
36+
timeoutMs: opts.gatewayTimeoutMs,
37+
});
3038
}
3139

3240
async create(opts: WorkloadManagerCreateOptions) {
@@ -73,19 +81,9 @@ export class ComputeWorkloadManager implements WorkloadManager {
7381
Object.assign(envVars, this.opts.additionalEnvVars);
7482
}
7583

76-
const headers: Record<string, string> = {
77-
"Content-Type": "application/json",
78-
};
79-
80-
if (this.opts.gatewayAuthToken) {
81-
headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`;
82-
}
83-
84-
// Strip image digest — resolve by tag, not digest
84+
// Strip image digest - resolve by tag, not digest
8585
const imageRef = opts.image.split("@")[0]!;
8686

87-
const url = `${this.opts.gatewayUrl}/api/instances`;
88-
8987
// Wide event: single canonical log line emitted in finally
9088
const event: Record<string, unknown> = {
9189
// High-cardinality identifiers
@@ -105,58 +103,34 @@ export class ComputeWorkloadManager implements WorkloadManager {
105103
warmStartCheckMs: opts.warmStartCheckMs,
106104
// Request
107105
image: imageRef,
108-
url,
109106
};
110107

111108
const startMs = performance.now();
112109

113110
try {
114-
const [fetchError, response] = await tryCatch(
115-
fetch(url, {
116-
method: "POST",
117-
headers,
118-
signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs),
119-
body: JSON.stringify({
120-
name: runnerId,
121-
image: imageRef,
122-
env: envVars,
123-
cpu: opts.machine.cpu,
124-
memory_gb: opts.machine.memory,
125-
metadata: {
126-
runId: opts.runFriendlyId,
127-
envId: opts.envId,
128-
envType: opts.envType,
129-
orgId: opts.orgId,
130-
projectId: opts.projectId,
131-
deploymentVersion: opts.deploymentVersion,
132-
machine: opts.machine.name,
133-
},
134-
}),
111+
const [error, data] = await tryCatch(
112+
this.compute.instances.create({
113+
name: runnerId,
114+
image: imageRef,
115+
env: envVars,
116+
cpu: opts.machine.cpu,
117+
memory_gb: opts.machine.memory,
118+
metadata: {
119+
runId: opts.runFriendlyId,
120+
envId: opts.envId,
121+
envType: opts.envType,
122+
orgId: opts.orgId,
123+
projectId: opts.projectId,
124+
deploymentVersion: opts.deploymentVersion,
125+
machine: opts.machine.name,
126+
},
135127
})
136128
);
137129

138-
if (fetchError) {
139-
event.error = fetchError instanceof Error ? fetchError.message : String(fetchError);
130+
if (error) {
131+
event.error = error instanceof Error ? error.message : String(error);
140132
event.errorType =
141-
fetchError instanceof DOMException && fetchError.name === "TimeoutError"
142-
? "timeout"
143-
: "fetch";
144-
return;
145-
}
146-
147-
event.status = response.status;
148-
149-
if (!response.ok) {
150-
const [bodyError, body] = await tryCatch(response.text());
151-
event.responseBody = bodyError ? undefined : body;
152-
return;
153-
}
154-
155-
const [parseError, data] = await tryCatch(response.json());
156-
157-
if (parseError) {
158-
event.error = parseError instanceof Error ? parseError.message : String(parseError);
159-
event.errorType = "parse";
133+
error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch";
160134
return;
161135
}
162136

@@ -176,34 +150,17 @@ export class ComputeWorkloadManager implements WorkloadManager {
176150
}
177151
}
178152

179-
private get authHeaders(): Record<string, string> {
180-
const headers: Record<string, string> = {
181-
"Content-Type": "application/json",
182-
};
183-
if (this.opts.gatewayAuthToken) {
184-
headers["Authorization"] = `Bearer ${this.opts.gatewayAuthToken}`;
185-
}
186-
return headers;
187-
}
188-
189153
async snapshot(opts: {
190154
runnerId: string;
191155
callbackUrl: string;
192156
metadata: Record<string, string>;
193157
}): Promise<boolean> {
194-
const url = `${this.opts.gatewayUrl}/api/instances/${opts.runnerId}/snapshot`;
195-
196-
const [error, response] = await tryCatch(
197-
fetch(url, {
198-
method: "POST",
199-
headers: this.authHeaders,
200-
signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs),
201-
body: JSON.stringify({
202-
callback: {
203-
url: opts.callbackUrl,
204-
metadata: opts.metadata,
205-
},
206-
}),
158+
const [error] = await tryCatch(
159+
this.compute.instances.snapshot(opts.runnerId, {
160+
callback: {
161+
url: opts.callbackUrl,
162+
metadata: opts.metadata,
163+
},
207164
})
208165
);
209166

@@ -215,28 +172,12 @@ export class ComputeWorkloadManager implements WorkloadManager {
215172
return false;
216173
}
217174

218-
if (response.status !== 202) {
219-
this.logger.error("snapshot request rejected", {
220-
runnerId: opts.runnerId,
221-
status: response.status,
222-
});
223-
return false;
224-
}
225-
226175
this.logger.debug("snapshot request accepted", { runnerId: opts.runnerId });
227176
return true;
228177
}
229178

230179
async deleteInstance(runnerId: string): Promise<boolean> {
231-
const url = `${this.opts.gatewayUrl}/api/instances/${runnerId}`;
232-
233-
const [error, response] = await tryCatch(
234-
fetch(url, {
235-
method: "DELETE",
236-
headers: this.authHeaders,
237-
signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs),
238-
})
239-
);
180+
const [error] = await tryCatch(this.compute.instances.delete(runnerId));
240181

241182
if (error) {
242183
this.logger.error("delete instance failed", {
@@ -246,14 +187,6 @@ export class ComputeWorkloadManager implements WorkloadManager {
246187
return false;
247188
}
248189

249-
if (!response.ok) {
250-
this.logger.error("delete instance rejected", {
251-
runnerId,
252-
status: response.status,
253-
});
254-
return false;
255-
}
256-
257190
this.logger.debug("delete instance success", { runnerId });
258191
return true;
259192
}
@@ -329,8 +262,6 @@ export class ComputeWorkloadManager implements WorkloadManager {
329262
projectId?: string;
330263
dequeuedAt?: Date;
331264
}): Promise<boolean> {
332-
const url = `${this.opts.gatewayUrl}/api/snapshots/${opts.snapshotId}/restore`;
333-
334265
const metadata: Record<string, string> = {
335266
TRIGGER_RUNNER_ID: opts.runnerId,
336267
TRIGGER_RUN_ID: opts.runFriendlyId,
@@ -341,23 +272,19 @@ export class ComputeWorkloadManager implements WorkloadManager {
341272
TRIGGER_WORKER_INSTANCE_NAME: env.TRIGGER_WORKER_INSTANCE_NAME,
342273
};
343274

344-
const body = {
345-
name: opts.runnerId,
346-
metadata,
347-
cpu: opts.machine.cpu,
348-
memory_mb: opts.machine.memory * 1024,
349-
};
350-
351-
this.logger.verbose("restore request body", { url, body });
275+
this.logger.verbose("restore request body", {
276+
snapshotId: opts.snapshotId,
277+
runnerId: opts.runnerId,
278+
});
352279

353280
const startMs = performance.now();
354281

355-
const [error, response] = await tryCatch(
356-
fetch(url, {
357-
method: "POST",
358-
headers: this.authHeaders,
359-
signal: AbortSignal.timeout(this.opts.gatewayTimeoutMs),
360-
body: JSON.stringify(body),
282+
const [error] = await tryCatch(
283+
this.compute.snapshots.restore(opts.snapshotId, {
284+
name: opts.runnerId,
285+
metadata,
286+
cpu: opts.machine.cpu,
287+
memory_mb: opts.machine.memory * 1024,
361288
})
362289
);
363290

@@ -373,16 +300,6 @@ export class ComputeWorkloadManager implements WorkloadManager {
373300
return false;
374301
}
375302

376-
if (!response.ok) {
377-
this.logger.error("restore request rejected", {
378-
snapshotId: opts.snapshotId,
379-
runnerId: opts.runnerId,
380-
status: response.status,
381-
durationMs,
382-
});
383-
return false;
384-
}
385-
386303
this.logger.debug("restore request success", {
387304
snapshotId: opts.snapshotId,
388305
runnerId: opts.runnerId,
@@ -448,4 +365,3 @@ export class ComputeWorkloadManager implements WorkloadManager {
448365
sendOtlpTrace(payload);
449366
}
450367
}
451-

apps/webapp/app/v3/services/computeTemplateCreation.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ComputeGatewayClient } from "@internal/compute";
1+
import { ComputeClient } from "@internal/compute";
22
import { env } from "~/env.server";
33
import { logger } from "~/services/logger.server";
44
import type { PrismaClientOrTransaction } from "~/db.server";
@@ -10,11 +10,11 @@ import { FailDeploymentService } from "./failDeployment.server";
1010
type TemplateCreationMode = "required" | "shadow" | "skip";
1111

1212
export class ComputeTemplateCreationService {
13-
private client: ComputeGatewayClient | undefined;
13+
private client: ComputeClient | undefined;
1414

1515
constructor() {
1616
if (env.COMPUTE_GATEWAY_URL) {
17-
this.client = new ComputeGatewayClient({
17+
this.client = new ComputeClient({
1818
gatewayUrl: env.COMPUTE_GATEWAY_URL,
1919
authToken: env.COMPUTE_GATEWAY_AUTH_TOKEN,
2020
timeoutMs: 5 * 60 * 1000, // 5 minutes
@@ -158,7 +158,7 @@ export class ComputeTemplateCreationService {
158158
}
159159

160160
try {
161-
await this.client.createTemplate({
161+
await this.client.templates.create({
162162
image: imageReference,
163163
cpu: 0.5,
164164
memory_mb: 512,

0 commit comments

Comments
 (0)