Skip to content

Commit 561ce12

Browse files
Nick Ficanoclaude
andcommitted
effect(runtime-session-job): add Effect-shaped twins for SessionContext, Job, JobManager, lease
Adds Effect.Service-backed twins (SessionContextService, JobService, JobManagerService) plus a fiber-safe watchdogEffect to the runtime package, alongside thin Effect wrappers for the pure lease validators. Legacy SessionContext/Job/JobManager/lease classes are untouched — they remain the protocol-conformance backbone consumed by ARCPServer; the new services delegate to them via structural *Like interfaces so tests can stub without instantiating the full class graph. watchdogEffect uses the Ref<deadline> + Schedule.fixed("250 millis") poll-fiber pattern from #44, surfacing TaggedHeartbeatLost on the typed-error channel. It complements (does not replace) the legacy setTimeout watchdog inside Job, addressing #25 without touching the integration-tested watchdog timing. 24 new tests; all 5 job-lifecycle and 30 v1-1-features SDK integration tests pass unchanged; 18 runtime lease tests pass unchanged. Closes #44 Closes #25 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cef3df3 commit 561ce12

7 files changed

Lines changed: 1214 additions & 0 deletions

File tree

packages/runtime/src/index.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,30 @@ export {
88
SessionState,
99
} from "@arcp/core/state";
1010
export { Job, JobManager, makeJobContext } from "./job.js";
11+
export {
12+
type JobEffect,
13+
type JobManagerEffect,
14+
JobManagerService,
15+
JobService,
16+
jobLayer,
17+
jobManagerLayer,
18+
makeJobEffect,
19+
makeJobManagerEffect,
20+
watchdogEffect,
21+
} from "./job-effect.js";
22+
export {
23+
assertLeaseConstraintsSubsetEffect,
24+
assertLeaseSubsetEffect,
25+
validateLeaseConstraintsEffect,
26+
validateLeaseOpEffect,
27+
type ValidateLeaseOpFailure,
28+
} from "./lease-effect.js";
29+
export {
30+
makeSessionContextEffect,
31+
type SessionContextEffect,
32+
SessionContextService,
33+
sessionContextLayer,
34+
} from "./session-effect.js";
1135
export {
1236
assertLeaseConstraintsSubset,
1337
assertLeaseSubset,

packages/runtime/src/job-effect.ts

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
// Effect-shaped surfaces over the legacy {@link Job} and {@link JobManager}.
2+
//
3+
// As with `session-effect.ts`, the legacy `Job` class owns the
4+
// integration-tested §7/§8 wire emission, watchdog timing, and state
5+
// machine; rewriting its internals would risk the 35+ SDK integration
6+
// tests that pin behavior. This module exposes `Effect`-typed twins
7+
// (`JobService` per-job, `JobManagerService` per-session) that delegate
8+
// to a backing `Job`/`JobManager` supplied at layer construction.
9+
//
10+
// Additionally exposed is {@link watchdogEffect}: an Effect-native
11+
// heartbeat watchdog that complements (does not replace) the legacy
12+
// `setTimeout`-driven watchdog inside `Job`. It is intended for new
13+
// Effect-graph callers that want a typed `TaggedHeartbeatLost` failure
14+
// without bridging `AbortSignal` manually. Reset semantics use a
15+
// `Ref<Instant>` deadline polled by a `Schedule.fixed("250 millis")`
16+
// fiber, matching the 250 ms resolution called out in #44.
17+
18+
import {
19+
type JobId,
20+
TaggedHeartbeatLost,
21+
type TaggedInvalidRequest,
22+
type TaggedSdkError,
23+
taggedFromARCP,
24+
} from "@arcp/core";
25+
import { ARCPError as ARCPErrorClass } from "@arcp/core/errors";
26+
import type {
27+
JobErrorPayload,
28+
JobResultPayload,
29+
JobStateName,
30+
} from "@arcp/core/messages";
31+
import { Effect, Layer, Ref, Schedule } from "effect";
32+
33+
// Doc-only reference: Job/JobManager are the concrete legacy classes this
34+
// service is designed around. See `./job.ts`.
35+
36+
/**
37+
* Structural subset of `Job` this twin actually touches. Exposed so tests
38+
* can supply a minimal stub.
39+
*/
40+
export interface JobLike {
41+
readonly jobId: JobId;
42+
readonly state: JobStateName;
43+
readonly isTerminal: boolean;
44+
emitAccepted(): Promise<void>;
45+
emitRunning(): Promise<void>;
46+
emitEventKind(kind: string, body: unknown): Promise<void>;
47+
emitResult(result: JobResultPayload): Promise<void>;
48+
emitErrorEnvelope(payload: JobErrorPayload): Promise<void>;
49+
transition(next: JobStateName): void;
50+
markHeartbeat(): void;
51+
cancel(reason: string): void;
52+
abortHard(reason: string): void;
53+
}
54+
55+
/** Structural subset of {@link JobManager} this twin actually touches. */
56+
export interface JobManagerLike {
57+
register(job: JobLike): void;
58+
get(jobId: string): JobLike | undefined;
59+
has(jobId: string): boolean;
60+
retire(jobId: string): void;
61+
list(): readonly JobLike[];
62+
cancelAll(reason: string): number;
63+
abortAll(reason: string): void;
64+
}
65+
66+
/**
67+
* Per-job operations exposed by {@link JobService}. Delegates straight to
68+
* the backing {@link Job}; the only translation is the throw→typed-error
69+
* lift via {@link taggedFromARCP}.
70+
*/
71+
export interface JobEffect {
72+
readonly jobId: JobId;
73+
readonly emitAccepted: Effect.Effect<void, TaggedSdkError>;
74+
readonly emitRunning: Effect.Effect<void, TaggedSdkError>;
75+
readonly emitEventKind: (
76+
kind: string,
77+
body: unknown,
78+
) => Effect.Effect<void, TaggedSdkError>;
79+
readonly emitResult: (
80+
result: JobResultPayload,
81+
) => Effect.Effect<void, TaggedSdkError>;
82+
readonly emitErrorEnvelope: (
83+
payload: JobErrorPayload,
84+
) => Effect.Effect<void, TaggedSdkError>;
85+
readonly transition: (
86+
next: JobStateName,
87+
) => Effect.Effect<void, TaggedInvalidRequest>;
88+
readonly markHeartbeat: Effect.Effect<void>;
89+
readonly cancel: (reason: string) => Effect.Effect<void>;
90+
readonly abortHard: (reason: string) => Effect.Effect<void>;
91+
readonly state: Effect.Effect<JobStateName>;
92+
readonly isTerminal: Effect.Effect<boolean>;
93+
}
94+
95+
/** Per-job-manager operations exposed by {@link JobManagerService}. */
96+
export interface JobManagerEffect {
97+
readonly register: (job: JobLike) => Effect.Effect<void>;
98+
readonly get: (jobId: string) => Effect.Effect<JobLike | undefined>;
99+
readonly has: (jobId: string) => Effect.Effect<boolean>;
100+
readonly retire: (jobId: string) => Effect.Effect<void>;
101+
readonly list: Effect.Effect<readonly JobLike[]>;
102+
readonly cancelAll: (reason: string) => Effect.Effect<number>;
103+
readonly abortAll: (reason: string) => Effect.Effect<void>;
104+
}
105+
106+
/**
107+
* Effect-shaped twin of the per-job state machine. Bind via
108+
* {@link jobLayer}; the `.Default` stub is a defect (configuration bug),
109+
* not a typed failure.
110+
*
111+
* @example
112+
* ```ts
113+
* const program = Effect.gen(function* () {
114+
* const job = yield* JobService
115+
* yield* job.emitAccepted
116+
* yield* job.emitRunning
117+
* yield* job.emitEventKind("status", { phase: "halfway" })
118+
* yield* job.emitResult({ final_status: "success", result: 42 })
119+
* }).pipe(Effect.provide(jobLayer(legacyJob)))
120+
* ```
121+
*/
122+
export class JobService extends Effect.Service<JobService>()(
123+
"arcp/JobService",
124+
{ succeed: unboundJobStub() },
125+
) {}
126+
127+
/** Effect-shaped twin of {@link JobManager}. */
128+
export class JobManagerService extends Effect.Service<JobManagerService>()(
129+
"arcp/JobManagerService",
130+
{ succeed: unboundJobManagerStub() },
131+
) {}
132+
133+
/**
134+
* Build a {@link JobService} layer backed by a legacy {@link Job}. Ops
135+
* delegate through the legacy class so the §7/§8 wire emission and
136+
* integration-tested watchdog timing stay authoritative.
137+
*/
138+
export function jobLayer(job: JobLike): Layer.Layer<JobService> {
139+
return Layer.succeed(JobService, JobService.make(makeJobEffect(job)));
140+
}
141+
142+
/** Build a {@link JobManagerService} layer backed by a legacy {@link JobManager}. */
143+
export function jobManagerLayer(
144+
manager: JobManagerLike,
145+
): Layer.Layer<JobManagerService> {
146+
return Layer.succeed(
147+
JobManagerService,
148+
JobManagerService.make(makeJobManagerEffect(manager)),
149+
);
150+
}
151+
152+
/**
153+
* Construct the {@link JobEffect} ops record for a given legacy job.
154+
* Exported alongside the layer factory for callers that already hold the
155+
* legacy instance and want to bridge inline.
156+
*/
157+
export function makeJobEffect(job: JobLike): JobEffect {
158+
return {
159+
jobId: job.jobId,
160+
emitAccepted: liftSend(() => job.emitAccepted()),
161+
emitRunning: liftSend(() => job.emitRunning()),
162+
emitEventKind: (kind, body) => liftSend(() => job.emitEventKind(kind, body)),
163+
emitResult: (result) => liftSend(() => job.emitResult(result)),
164+
emitErrorEnvelope: (payload) =>
165+
liftSend(() => job.emitErrorEnvelope(payload)),
166+
transition: (next) => transitionEffect(job, next),
167+
markHeartbeat: Effect.sync(() => {
168+
job.markHeartbeat();
169+
}),
170+
cancel: (reason) =>
171+
Effect.sync(() => {
172+
job.cancel(reason);
173+
}),
174+
abortHard: (reason) =>
175+
Effect.sync(() => {
176+
job.abortHard(reason);
177+
}),
178+
state: Effect.sync(() => job.state),
179+
isTerminal: Effect.sync(() => job.isTerminal),
180+
};
181+
}
182+
183+
/** Construct the {@link JobManagerEffect} ops record for a given legacy manager. */
184+
export function makeJobManagerEffect(
185+
manager: JobManagerLike,
186+
): JobManagerEffect {
187+
return {
188+
register: (job) =>
189+
Effect.sync(() => {
190+
manager.register(job);
191+
}),
192+
get: (jobId) => Effect.sync(() => manager.get(jobId)),
193+
has: (jobId) => Effect.sync(() => manager.has(jobId)),
194+
retire: (jobId) =>
195+
Effect.sync(() => {
196+
manager.retire(jobId);
197+
}),
198+
list: Effect.sync(() => manager.list()),
199+
cancelAll: (reason) => Effect.sync(() => manager.cancelAll(reason)),
200+
abortAll: (reason) =>
201+
Effect.sync(() => {
202+
manager.abortAll(reason);
203+
}),
204+
};
205+
}
206+
207+
/**
208+
* Build a fiber-safe heartbeat watchdog. The returned record carries:
209+
*
210+
* - `reset`: an `Effect<void>` that re-arms the deadline to `now + thresholdMs`.
211+
* Call this whenever an event suggests the peer is still alive.
212+
* - `await`: an `Effect<never, TaggedHeartbeatLost>` that polls the deadline
213+
* on a `Schedule.fixed("250 millis")` cadence; it fails the moment
214+
* `now >= deadline`. Fork this onto a daemon fiber and `Effect.race` the
215+
* job's workflow against it for the §6.4 "heartbeat lost" failure
216+
* pattern called out in #44.
217+
*
218+
* No transport coupling — this is the Effect-shape twin of the legacy
219+
* `Job`-owned `setTimeout` watchdog, intended for new Effect-graph callers
220+
* that want typed-error semantics. The legacy watchdog stays in place for
221+
* the existing `Job` class consumers.
222+
*
223+
* @param thresholdMs grace period (ms) between resets before
224+
* {@link TaggedHeartbeatLost} fires
225+
* @param label optional context tag included in the failure message
226+
*/
227+
export function watchdogEffect(
228+
thresholdMs: number,
229+
label?: string,
230+
): Effect.Effect<{
231+
readonly reset: Effect.Effect<void>;
232+
readonly await: Effect.Effect<never, TaggedHeartbeatLost>;
233+
}> {
234+
return Effect.gen(function* () {
235+
const deadline = yield* Ref.make(Date.now() + thresholdMs);
236+
const reset = Ref.set(deadline, Date.now() + thresholdMs);
237+
const await_ = watchdogPoll(deadline, label);
238+
return { reset, await: await_ };
239+
});
240+
}
241+
242+
function watchdogPoll(
243+
deadline: Ref.Ref<number>,
244+
label: string | undefined,
245+
): Effect.Effect<never, TaggedHeartbeatLost> {
246+
const tick = Effect.gen(function* () {
247+
const d = yield* Ref.get(deadline);
248+
if (Date.now() >= d) {
249+
yield* Effect.fail(
250+
new TaggedHeartbeatLost({
251+
message:
252+
label === undefined
253+
? "watchdog: heartbeat threshold exceeded"
254+
: `watchdog: heartbeat threshold exceeded (${label})`,
255+
}),
256+
);
257+
}
258+
});
259+
// Repeat forever on a 250 ms cadence; the typed failure short-circuits
260+
// the loop on the first missed deadline. Cast to `never` since the
261+
// success branch is unreachable.
262+
return tick.pipe(
263+
Effect.repeat(Schedule.fixed("250 millis")),
264+
) as Effect.Effect<never, TaggedHeartbeatLost>;
265+
}
266+
267+
// ---------------------------------------------------------------------------
268+
// Internal helpers
269+
// ---------------------------------------------------------------------------
270+
271+
function liftSend(thunk: () => Promise<void>): Effect.Effect<void, TaggedSdkError> {
272+
return Effect.tryPromise({
273+
try: thunk,
274+
catch: (cause) => liftToTagged(cause),
275+
});
276+
}
277+
278+
function transitionEffect(
279+
job: JobLike,
280+
next: JobStateName,
281+
): Effect.Effect<void, TaggedInvalidRequest> {
282+
return Effect.try({
283+
try: () => {
284+
job.transition(next);
285+
},
286+
catch: (cause) => liftToTagged(cause) as TaggedInvalidRequest,
287+
});
288+
}
289+
290+
function liftToTagged(cause: unknown): TaggedSdkError {
291+
if (cause instanceof ARCPErrorClass) return taggedFromARCP(cause);
292+
throw cause as Error;
293+
}
294+
295+
function unboundJobStub(): JobEffect {
296+
const die = (): Effect.Effect<never> =>
297+
Effect.die("JobService not bound; provide jobLayer");
298+
return {
299+
jobId: "" as JobId,
300+
emitAccepted: die(),
301+
emitRunning: die(),
302+
emitEventKind: () => die(),
303+
emitResult: () => die(),
304+
emitErrorEnvelope: () => die(),
305+
transition: () => die(),
306+
markHeartbeat: die(),
307+
cancel: () => die(),
308+
abortHard: () => die(),
309+
state: die(),
310+
isTerminal: die(),
311+
};
312+
}
313+
314+
function unboundJobManagerStub(): JobManagerEffect {
315+
const die = (): Effect.Effect<never> =>
316+
Effect.die("JobManagerService not bound; provide jobManagerLayer");
317+
return {
318+
register: () => die(),
319+
get: () => die(),
320+
has: () => die(),
321+
retire: () => die(),
322+
list: die(),
323+
cancelAll: () => die(),
324+
abortAll: () => die(),
325+
};
326+
}

0 commit comments

Comments
 (0)