-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjob.ts
More file actions
469 lines (429 loc) · 14.4 KB
/
job.ts
File metadata and controls
469 lines (429 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
import type { JobId, SessionId, TraceId } from "@arcp/core";
import { buildEnvelope } from "@arcp/core/envelope";
import {
CancelledError,
HeartbeatLostError,
InternalError,
InvalidRequestError,
} from "@arcp/core/errors";
import type { Logger } from "@arcp/core/logger";
import type {
JobErrorPayload,
JobResultPayload,
JobStateName,
Lease,
LeaseConstraints,
} from "@arcp/core/messages";
import { newJobId, newMessageId, nowTimestamp } from "@arcp/core/util";
import type { EventSeqSource, JobOptions, JobSend } from "./types.js";
/**
 * Everything a {@link Job} needs from its surroundings, passed to the
 * constructor as a single bag.
 */
export interface JobDependencies {
  /** Per-job configuration (identity, agent, lease, budget, heartbeat settings). */
  readonly options: JobOptions;
  /** Logger used for diagnostics such as failed envelope emission. */
  readonly logger: Logger;
  /** Transport used to deliver every envelope the job emits. */
  readonly send: JobSend;
  /** Source of the session-scoped `event_seq` numbers stamped on job events. */
  readonly seq: EventSeqSource;
}
// ARCP v1.1 §7-§8 job execution.
//
// State machine: pending → running → {success | error | cancelled | timed_out}.
// All event-bearing envelopes (job.event / job.result / job.error) carry a
// session-scoped `event_seq` stamped by the SessionContext at emit time.
//
// Each key maps a state to the set of states it may legally move to.
const JOB_TRANSITIONS = {
  // Not started yet: may begin running, or end without ever running.
  pending: new Set<JobStateName>(["running", "cancelled", "error", "timed_out"]),
  // Executing: may finish in any terminal state.
  running: new Set<JobStateName>(["success", "error", "cancelled", "timed_out"]),
  // Terminal states: no outgoing transitions.
  success: new Set<JobStateName>(),
  error: new Set<JobStateName>(),
  cancelled: new Set<JobStateName>(),
  timed_out: new Set<JobStateName>(),
} as const satisfies Record<JobStateName, ReadonlySet<JobStateName>>;

/**
 * States with no legal successor, derived from the transition table so the
 * two can never drift apart. Insertion order follows the table's key order.
 */
const TERMINAL = new Set<JobStateName>();
for (const [stateName, successors] of Object.entries(JOB_TRANSITIONS)) {
  // Sound cast: `satisfies Record<JobStateName, …>` rejects any extra keys,
  // so every key of the table is a JobStateName.
  if (successors.size === 0) TERMINAL.add(stateName as JobStateName);
}
/**
 * Per-job state machine (§7.3 / §8).
 *
 * Owns the job's lifecycle, the abort signal exposed to the agent, the
 * heartbeat watchdog, and the emission of `job.accepted` /
 * `job.event` / `job.result` / `job.error` envelopes. The session
 * provides the monotonic `event_seq` source.
 *
 * Within this class, terminal transitions happen only inside
 * {@link Job.emitResult} and {@link Job.emitErrorEnvelope}, so each terminal
 * state reached through them is paired with its terminal envelope.
 */
export class Job {
  public readonly jobId: JobId;
  public readonly sessionId: SessionId;
  public readonly agent: string;
  /** v1.1 §7.5 — resolved version, or null if none was advertised. */
  public readonly agentVersion: string | null;
  public readonly lease: Lease;
  public readonly leaseConstraints: LeaseConstraints | undefined;
  /** v1.1 §9.6 — mutable per-currency budget counters. */
  public readonly budget: Map<string, number>;
  /** v1.1 §9.6 — initial budget for inclusion in `job.accepted`. */
  public readonly initialBudget: Map<string, number>;
  public readonly parentJobId: JobId | undefined;
  public readonly delegateId: string | undefined;
  public readonly traceId: TraceId | undefined;
  /** Timestamp at which the job was constructed (for §6.6 listing). */
  public readonly createdAt: string = nowTimestamp();
  /**
   * v1.1 §6.6 / §7.6 — the principal that submitted this job. Used to scope
   * cross-session observation (`session.list_jobs`, `job.subscribe`). Set by
   * the runtime on job creation.
   */
  public submitterPrincipal: string | undefined = undefined;
  /**
   * v1.1 §7.6 — the session that owns the job's event stream (i.e., the
   * submitter's session). Subscribers tap this session's event log for
   * history replay. Set by the runtime on job creation. Typed structurally
   * (only `state.id` is required) rather than as `SessionContext` to avoid
   * an import cycle.
   */
  public owningSession: { state: { id: SessionId | undefined } } | undefined =
    undefined;
  public state: JobStateName = "pending";
  public readonly abortController = new AbortController();
  /** v1.1 §8.4 — set true after the first `result_chunk` event is emitted. */
  public chunkedResultStarted = false;
  /** Track last-emitted remaining per currency for chatty-emit debounce. */
  private readonly lastEmittedRemaining = new Map<string, number>();
  /** Consecutive watchdog firings with no intervening heartbeat. */
  private missedHeartbeats = 0;
  private heartbeatTimer: ReturnType<typeof setTimeout> | null = null;
  /** Miss count at which the job is failed with HEARTBEAT_LOST (default 2). */
  private readonly missesAllowed: number;
  private readonly heartbeatIntervalMs: number;
  private readonly send: JobSend;
  private readonly seq: EventSeqSource;
  public readonly logger: Logger;

  /**
   * Build a job in the `pending` state from its dependency bag.
   *
   * A job id is generated when `options.jobId` is absent. Both budget maps
   * are independent copies of `options.initialBudget`.
   */
  public constructor(deps: JobDependencies) {
    const { options, send, seq, logger } = deps;
    this.send = send;
    this.seq = seq;
    this.logger = logger;
    this.jobId = options.jobId ?? newJobId();
    this.sessionId = options.sessionId;
    this.agent = options.agent;
    this.agentVersion = options.agentVersion ?? null;
    this.lease = options.lease;
    this.leaseConstraints = options.leaseConstraints;
    // Two separate copies: `budget` is decremented, `initialBudget` is kept
    // intact for `job.accepted` and the debounce threshold.
    this.initialBudget = new Map(options.initialBudget);
    this.budget = new Map(options.initialBudget);
    this.parentJobId = options.parentJobId;
    this.delegateId = options.delegateId;
    this.traceId = options.traceId;
    this.heartbeatIntervalMs = options.heartbeatIntervalSeconds * 1000;
    this.missesAllowed = options.missedHeartbeatsAllowed ?? 2;
  }

  /** Wire-form `agent` string: `name@version` if a version is set, else bare name. */
  public get agentRef(): string {
    return this.agentVersion === null
      ? this.agent
      : `${this.agent}@${this.agentVersion}`;
  }

  /**
   * v1.1 §9.6: decrement the matching budget counter from a `metric` event
   * whose name begins with `cost.` and whose unit matches a budgeted
   * currency. Returns the new remaining value, or `null` if no counter is
   * affected. Negative and non-finite values are ignored, as is the
   * runtime's own `cost.budget.remaining` metric.
   */
  public applyCostMetric(
    name: string,
    value: number,
    unit: string | undefined,
  ): number | null {
    if (!name.startsWith("cost.")) return null;
    if (name === "cost.budget.remaining") return null;
    if (unit === undefined) return null;
    if (!Number.isFinite(value) || value < 0) return null;
    const current = this.budget.get(unit);
    if (current === undefined) return null;
    const next = current - value;
    this.budget.set(unit, next);
    return next;
  }

  /**
   * Whether to emit a debounced `cost.budget.remaining` metric for `currency`.
   * Emits on the first call for a currency. After that, emits when the
   * remaining amount has moved by at least 5% of the initial budget since
   * the last emit, or when it has reached zero or below. Records the
   * emitted value whenever it returns true.
   */
  public shouldEmitBudgetRemaining(currency: string): boolean {
    const remaining = this.budget.get(currency);
    if (remaining === undefined) return false;
    const initial = this.initialBudget.get(currency) ?? 0;
    const last = this.lastEmittedRemaining.get(currency);
    if (last === undefined) {
      this.lastEmittedRemaining.set(currency, remaining);
      return true;
    }
    const threshold = initial * 0.05;
    if (Math.abs(last - remaining) >= threshold || remaining <= 0) {
      this.lastEmittedRemaining.set(currency, remaining);
      return true;
    }
    return false;
  }

  /** Abort signal handed to the agent; aborted by `cancel`, `abortHard` or the watchdog. */
  public get signal(): AbortSignal {
    return this.abortController.signal;
  }

  /** True once the job has reached success, error, cancelled or timed_out. */
  public get isTerminal(): boolean {
    return TERMINAL.has(this.state);
  }

  /**
   * Move the job to `next`. Transitioning to the current state is a no-op.
   *
   * @throws InvalidRequestError if `JOB_TRANSITIONS` does not allow the move.
   */
  public transition(next: JobStateName): void {
    if (this.state === next) return;
    const allowed = JOB_TRANSITIONS[this.state];
    if (!allowed.has(next)) {
      throw new InvalidRequestError(
        `Illegal job transition: ${this.state} → ${next}`,
        {
          details: { from: this.state, to: next, jobId: this.jobId },
        },
      );
    }
    this.state = next;
  }

  /** Arm the heartbeat watchdog (replacing any pending timer). */
  public startWatchdog(): void {
    this.armWatchdog();
  }

  /** Record liveness: reset the miss counter and re-arm the watchdog. */
  public markHeartbeat(): void {
    this.missedHeartbeats = 0;
    this.armWatchdog();
  }

  /**
   * Cooperatively cancel the job by aborting its signal with a
   * `CancelledError`; armed timer will follow per §7.4. Does not change
   * state or emit anything itself. No-op once terminal.
   */
  public cancel(reason: string): void {
    if (this.isTerminal) return;
    this.abortController.abort(new CancelledError(reason));
  }

  /**
   * Force-fail the job after a hard kill / grace expiry.
   *
   * Aborts the agent's signal with an `InternalError`, then emits a terminal
   * `job.error` (`INTERNAL_ERROR`, retryable). The state transition and the
   * watchdog disarm are performed by `emitErrorEnvelope`, which returns early
   * once the job is terminal. Transitioning here first would suppress the
   * envelope entirely. No-op once terminal.
   */
  public abortHard(reason: string): void {
    if (this.isTerminal) return;
    this.abortController.abort(new InternalError(reason));
    void this.emitErrorEnvelope({
      final_status: "error",
      code: "INTERNAL_ERROR",
      message: reason,
      retryable: true,
    });
  }

  // -------- Outbound envelopes ---------------------------------------

  /**
   * Emit `job.accepted`. Does NOT carry `event_seq`.
   *
   * Optional payload fields (`lease_constraints`, `budget`, `parent_job_id`,
   * `delegate_id`, `trace_id`) are omitted entirely when unset. `budget` is
   * included only if at least one currency was budgeted.
   */
  public async emitAccepted(): Promise<void> {
    const budget =
      this.initialBudget.size > 0
        ? Object.fromEntries(this.initialBudget)
        : undefined;
    const env = buildEnvelope({
      id: newMessageId(),
      type: "job.accepted" as const,
      payload: {
        job_id: this.jobId,
        agent: this.agentRef,
        lease: this.lease,
        ...(this.leaseConstraints === undefined
          ? {}
          : { lease_constraints: this.leaseConstraints }),
        ...(budget === undefined ? {} : { budget }),
        accepted_at: this.createdAt,
        ...(this.parentJobId === undefined
          ? {}
          : { parent_job_id: this.parentJobId }),
        ...(this.delegateId === undefined
          ? {}
          : { delegate_id: this.delegateId }),
        ...(this.traceId === undefined ? {} : { trace_id: this.traceId }),
      },
      optional: {
        session_id: this.sessionId,
        job_id: this.jobId,
        ...(this.traceId === undefined ? {} : { trace_id: this.traceId }),
      },
    });
    await this.send(env);
  }

  /**
   * Mark transition pending → running, emit a `status` event with
   * `phase: "running"`, and arm the heartbeat watchdog.
   *
   * @throws InvalidRequestError if the current state cannot move to running.
   */
  public async emitRunning(): Promise<void> {
    this.transition("running");
    await this.emitEventKind("status", { phase: "running" });
    this.startWatchdog();
  }

  /**
   * Emit a `job.event` with a specific kind and body. Stamps `event_seq`.
   * Emitting counts as a heartbeat. Silently dropped once the job is terminal.
   *
   * v1.1: tracks `chunkedResultStarted` when a `result_chunk` body is
   * emitted, so the terminal `job.result` enforcement can reject mixing
   * inline result with chunks (§8.4).
   */
  public async emitEventKind(kind: string, body: unknown): Promise<void> {
    if (this.isTerminal) return;
    this.markHeartbeat();
    if (kind === "result_chunk") this.chunkedResultStarted = true;
    const env = buildEnvelope({
      id: newMessageId(),
      type: "job.event" as const,
      payload: { kind, ts: nowTimestamp(), body },
      optional: {
        session_id: this.sessionId,
        job_id: this.jobId,
        event_seq: this.seq.nextEventSeq(),
        ...(this.traceId === undefined ? {} : { trace_id: this.traceId }),
      },
    });
    await this.send(env);
  }

  /**
   * Emit `job.result` (success terminal). Stamps `event_seq`. No-op once
   * the job is terminal. Send failures propagate to the caller.
   *
   * v1.1 §8.4: if `result_chunk` events have been emitted on this job, the
   * terminating `job.result` MUST carry `result_id` and MUST NOT carry an
   * inline `result` value.
   *
   * @throws InvalidRequestError on a §8.4 violation (checked before any
   *   state change) or if the job cannot transition to success.
   */
  public async emitResult(result: JobResultPayload): Promise<void> {
    if (this.isTerminal) return;
    if (this.chunkedResultStarted) {
      if (result.result_id === undefined) {
        throw new InvalidRequestError(
          "job.result MUST carry result_id when result_chunk events were emitted",
        );
      }
      if (result.result !== undefined) {
        throw new InvalidRequestError(
          "job.result MUST NOT carry inline `result` when result_chunk events were emitted",
        );
      }
    }
    this.transition("success");
    this.disarmWatchdog();
    const env = buildEnvelope({
      id: newMessageId(),
      type: "job.result" as const,
      payload: result,
      optional: {
        session_id: this.sessionId,
        job_id: this.jobId,
        event_seq: this.seq.nextEventSeq(),
        ...(this.traceId === undefined ? {} : { trace_id: this.traceId }),
      },
    });
    await this.send(env);
  }

  /**
   * Emit `job.error` (error / cancelled / timed_out terminal). Stamps
   * `event_seq`. Disarms the watchdog and transitions to
   * `payload.final_status` before sending. No-op once the job is terminal.
   * A send failure is logged rather than rethrown.
   */
  public async emitErrorEnvelope(payload: JobErrorPayload): Promise<void> {
    if (this.isTerminal) return;
    this.disarmWatchdog();
    const target: JobStateName = payload.final_status;
    this.transition(target);
    const env = buildEnvelope({
      id: newMessageId(),
      type: "job.error" as const,
      payload,
      optional: {
        session_id: this.sessionId,
        job_id: this.jobId,
        event_seq: this.seq.nextEventSeq(),
        ...(this.traceId === undefined ? {} : { trace_id: this.traceId }),
      },
    });
    try {
      await this.send(env);
    } catch (error) {
      this.logger.error(
        { err: error, jobId: this.jobId },
        "failed to emit job.error envelope",
      );
    }
  }

  // -------- Watchdog ----------------------------------------------

  /** (Re)start the single-shot heartbeat timer unless the job is terminal. */
  private armWatchdog(): void {
    this.disarmWatchdog();
    if (this.isTerminal) return;
    this.heartbeatTimer = setTimeout(() => {
      this.onWatchdogFire();
    }, this.heartbeatIntervalMs);
    // Don't let a pending watchdog keep the Node.js process alive.
    this.heartbeatTimer.unref();
  }

  /** Cancel the pending heartbeat timer, if any. */
  private disarmWatchdog(): void {
    if (this.heartbeatTimer !== null) {
      clearTimeout(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Count one missed heartbeat. Once `missesAllowed` consecutive misses
   * accumulate, abort the signal with `HeartbeatLostError` and emit a
   * non-retryable `HEARTBEAT_LOST` job.error. Otherwise re-arm.
   */
  private onWatchdogFire(): void {
    if (this.isTerminal) return;
    this.missedHeartbeats += 1;
    if (this.missedHeartbeats >= this.missesAllowed) {
      const err = new HeartbeatLostError(
        `Job ${this.jobId} failed: ${this.missedHeartbeats} consecutive missed heartbeats`,
      );
      this.abortController.abort(err);
      void this.emitErrorEnvelope({
        final_status: "error",
        code: "HEARTBEAT_LOST",
        message: err.message,
        retryable: false,
      });
      return;
    }
    this.armWatchdog();
  }
}
/**
 * Registry of a session's jobs, keyed by job id, with bulk cancel/abort
 * helpers. Owned by {@link SessionContext}.
 */
export class JobManager {
  private readonly jobs = new Map<string, Job>();

  /** Store `job` under its own `jobId`, replacing any previous entry. */
  public register(job: Job): void {
    const { jobId } = job;
    this.jobs.set(jobId, job);
  }

  /** Look up a job by id; `undefined` if not registered. */
  public get(jobId: string): Job | undefined {
    const found = this.jobs.get(jobId);
    return found;
  }

  /** Whether a job with this id is currently registered. */
  public has(jobId: string): boolean {
    const present = this.jobs.has(jobId);
    return present;
  }

  /** Drop the job from the registry (no-op for unknown ids). */
  public retire(jobId: string): void {
    this.jobs.delete(jobId);
  }

  /** Snapshot of all registered jobs, in registration order. */
  public list(): readonly Job[] {
    return Array.from(this.jobs.values());
  }

  /**
   * Cooperatively cancel every non-terminal job.
   * @returns how many jobs had `cancel` invoked on them.
   */
  public cancelAll(reason: string): number {
    let cancelled = 0;
    for (const job of this.jobs.values()) {
      if (job.isTerminal) continue;
      job.cancel(reason);
      cancelled++;
    }
    return cancelled;
  }

  /** Hard-abort every non-terminal job (used during shutdown). */
  public abortAll(reason: string): void {
    for (const job of this.jobs.values()) {
      if (job.isTerminal) continue;
      job.abortHard(reason);
    }
  }
}
export { makeJobContext } from "./job-context.js";
// Re-export commonly used error types so consumers can import in one place.
export {
TimeoutError,
CancelledError,
HeartbeatLostError,
InternalError,
} from "@arcp/core/errors";