Skip to content

Commit 2e3000a

Browse files
Merge pull request #37 from backpine/dep-injection
added server provider to jobs
2 parents 25e49df + 8cf69a0 commit 2e3000a

13 files changed

Lines changed: 498 additions & 171 deletions

File tree

examples/effect-worker-v2/src/jobs/basic-debounce.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ class Random extends Context.Tag("MyRandomService")<
66
{ readonly next: Effect.Effect<number> }
77
>() {}
88

9+
import { Layer } from "effect";
10+
11+
const RandomLive = Layer.succeed(Random, {
12+
next: Effect.sync(() => Math.random()),
13+
});
14+
915
// =============================================================================
1016
// Debounce Job - Batches events and flushes after delay
1117
// =============================================================================
@@ -69,5 +75,5 @@ export const debounceExample = Debounce.make({
6975
yield* Effect.log(
7076
`Debounce flushed! Events: ${eventCount}, Last action: ${state?.actionId}, Reason: ${ctx.flushReason}`,
7177
);
72-
}),
78+
}).pipe(Effect.provide(RandomLive)),
7379
});

packages/jobs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@durable-effect/jobs",
3-
"version": "0.0.1-next.5",
3+
"version": "0.0.1-next.6",
44
"type": "module",
55
"main": "./dist/index.js",
66
"types": "./dist/index.d.ts",

packages/jobs/src/definitions/continuous.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import type { JobRetryConfig } from "../retry/types";
1616
/**
1717
* Input config for creating a continuous job definition.
1818
*/
19-
export interface ContinuousMakeConfig<S, E, R> {
19+
export interface ContinuousMakeConfig<S, E> {
2020
/**
2121
* Schema for validating and serializing state.
2222
* Accepts any Effect Schema (Struct, Class, etc.)
@@ -73,8 +73,20 @@ export interface ContinuousMakeConfig<S, E, R> {
7373

7474
/**
7575
* The function to execute on schedule.
76+
*
77+
* Must return Effect<void, E, never> - all service requirements must be satisfied.
78+
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
79+
*
80+
* @example
81+
* ```ts
82+
* execute: (ctx) =>
83+
* Effect.gen(function* () {
84+
* const random = yield* Random;
85+
* // ...
86+
* }).pipe(Effect.provide(RandomLive))
87+
* ```
7688
*/
77-
execute(ctx: ContinuousContext<S>): Effect.Effect<void, E, R>;
89+
execute(ctx: ContinuousContext<S>): Effect.Effect<void, E, never>;
7890
}
7991

8092
/**
@@ -116,9 +128,9 @@ export const Continuous = {
116128
* @param config - Configuration for the job
117129
* @returns An UnregisteredContinuousDefinition that can be registered
118130
*/
119-
make: <S, E = never, R = never>(
120-
config: ContinuousMakeConfig<S, E, R>
121-
): UnregisteredContinuousDefinition<S, E, R> => ({
131+
make: <S, E = never>(
132+
config: ContinuousMakeConfig<S, E>
133+
): UnregisteredContinuousDefinition<S, E> => ({
122134
_tag: "ContinuousDefinition",
123135
stateSchema: config.stateSchema,
124136
schedule: config.schedule,

packages/jobs/src/definitions/debounce.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@ import type { JobRetryConfig } from "../retry/types";
1515

1616
/**
1717
* Configuration for creating a debounce job definition.
18+
*
19+
* Note: All handler functions must return Effect with R = never.
20+
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
1821
*/
19-
export interface DebounceMakeConfig<I, S, E, R> {
22+
export interface DebounceMakeConfig<I, S, E> {
2023
/**
2124
* Schema for validating incoming events.
2225
*/
@@ -76,14 +79,24 @@ export interface DebounceMakeConfig<I, S, E, R> {
7679

7780
/**
7881
* Reducer for each incoming event. Defaults to returning the latest event.
82+
* Must return Effect<S, never, never> - all service requirements must be satisfied.
7983
*/
80-
onEvent?(ctx: DebounceEventContext<I, S>): Effect.Effect<S, never, R>;
84+
onEvent?(ctx: DebounceEventContext<I, S>): Effect.Effect<S, never, never>;
8185

8286
/**
8387
* Effect executed when the debounce flushes.
88+
* Must return Effect<void, E, never> - all service requirements must be satisfied.
89+
*
90+
* @example
91+
* ```ts
92+
* execute: (ctx) =>
93+
* Effect.gen(function* () {
94+
* const random = yield* Random;
95+
* // ...
96+
* }).pipe(Effect.provide(RandomLive))
97+
* ```
8498
*/
85-
execute(ctx: DebounceExecuteContext<S>): Effect.Effect<void, E, R>;
86-
99+
execute(ctx: DebounceExecuteContext<S>): Effect.Effect<void, E, never>;
87100
}
88101

89102
/**
@@ -99,14 +112,14 @@ export interface DebounceMakeConfig<I, S, E, R> {
99112
* ```
100113
*/
101114
export const Debounce = {
102-
make: <I, S = I, E = never, R = never>(
103-
config: DebounceMakeConfig<I, S, E, R>
104-
): UnregisteredDebounceDefinition<I, S, E, R> => ({
115+
make: <I, S = I, E = never>(
116+
config: DebounceMakeConfig<I, S, E>
117+
): UnregisteredDebounceDefinition<I, S, E> => ({
105118
_tag: "DebounceDefinition",
106119
eventSchema: config.eventSchema,
107120
stateSchema: (config.stateSchema ?? config.eventSchema) as Schema.Schema<
108121
S,
109-
any,
122+
unknown,
110123
never
111124
>,
112125
flushAfter: config.flushAfter,

packages/jobs/src/definitions/task.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ import type {
2222
* - Execute runs when alarm fires
2323
* - User controls the full lifecycle via schedule/clear
2424
*
25+
* Note: All handler functions must return Effect with R = never.
26+
* If your effect requires services, provide them via .pipe(Effect.provide(layer)).
27+
*
2528
* @typeParam S - State type (inferred from stateSchema)
2629
* @typeParam E - Event type (inferred from eventSchema)
2730
* @typeParam Err - Error type (inferred from handlers)
28-
* @typeParam R - Effect requirements (inferred from handlers)
2931
*/
30-
export interface TaskMakeConfig<S, E, Err, R> {
32+
export interface TaskMakeConfig<S, E, Err> {
3133
/**
3234
* Schema for validating and serializing state.
3335
* State is persisted durably and survives restarts.
@@ -42,6 +44,7 @@ export interface TaskMakeConfig<S, E, Err, R> {
4244

4345
/**
4446
* Handler called for each incoming event.
47+
* Must return Effect<void, Err, never> - all service requirements must be satisfied.
4548
*
4649
* The event is passed as the first parameter (not on ctx) to make it
4750
* clear that it's a direct value, not an Effect that needs yielding.
@@ -76,10 +79,11 @@ export interface TaskMakeConfig<S, E, Err, R> {
7679
* })
7780
* ```
7881
*/
79-
onEvent(event: E, ctx: TaskEventContext<S>): Effect.Effect<void, Err, R>;
82+
onEvent(event: E, ctx: TaskEventContext<S>): Effect.Effect<void, Err, never>;
8083

8184
/**
8285
* Handler called when the scheduled alarm fires.
86+
* Must return Effect<void, Err, never> - all service requirements must be satisfied.
8387
*
8488
* Responsibilities:
8589
* - Process the current state
@@ -100,11 +104,12 @@ export interface TaskMakeConfig<S, E, Err, R> {
100104
* })
101105
* ```
102106
*/
103-
execute(ctx: TaskExecuteContext<S>): Effect.Effect<void, Err, R>;
107+
execute(ctx: TaskExecuteContext<S>): Effect.Effect<void, Err, never>;
104108

105109
/**
106110
* Optional handler called when either `onEvent` or `execute` completes
107111
* and no alarm is scheduled.
112+
* Must return Effect<void, never, never> - all service requirements must be satisfied.
108113
*
109114
* Use this to:
110115
* - Schedule delayed cleanup
@@ -119,10 +124,11 @@ export interface TaskMakeConfig<S, E, Err, R> {
119124
* })
120125
* ```
121126
*/
122-
readonly onIdle?: (ctx: TaskIdleContext<S>) => Effect.Effect<void, never, R>;
127+
readonly onIdle?: (ctx: TaskIdleContext<S>) => Effect.Effect<void, never, never>;
123128

124129
/**
125130
* Optional error handler for onEvent/execute failures.
131+
* Must return Effect<void, never, never> - all service requirements must be satisfied.
126132
*
127133
* Use this to:
128134
* - Log errors
@@ -151,7 +157,7 @@ export interface TaskMakeConfig<S, E, Err, R> {
151157
readonly onError?: (
152158
error: Err,
153159
ctx: TaskErrorContext<S>
154-
) => Effect.Effect<void, never, R>;
160+
) => Effect.Effect<void, never, never>;
155161

156162
/**
157163
* Control logging for this job.
@@ -269,9 +275,9 @@ export const Task = {
269275
* @param config - Configuration for the task
270276
* @returns An UnregisteredTaskDefinition that can be registered
271277
*/
272-
make: <S, E, Err = never, R = never>(
273-
config: TaskMakeConfig<S, E, Err, R>
274-
): UnregisteredTaskDefinition<S, E, Err, R> => ({
278+
make: <S, E, Err = never>(
279+
config: TaskMakeConfig<S, E, Err>
280+
): UnregisteredTaskDefinition<S, E, Err> => ({
275281
_tag: "TaskDefinition",
276282
stateSchema: config.stateSchema,
277283
eventSchema: config.eventSchema,

packages/jobs/src/handlers/continuous/handler.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export const ContinuousHandlerLayer = Layer.effect(
5050

5151
const getDefinition = (
5252
name: string,
53-
): Effect.Effect<StoredContinuousDefinition<any, any>, JobNotFoundError> => {
53+
): Effect.Effect<StoredContinuousDefinition, JobNotFoundError> => {
5454
const def = registryService.registry.continuous[name];
5555
if (!def) {
5656
return Effect.fail(new JobNotFoundError({ type: "continuous", name }));
@@ -78,7 +78,7 @@ export const ContinuousHandlerLayer = Layer.effect(
7878
});
7979

8080
const scheduleNext = (
81-
def: StoredContinuousDefinition<any, any>,
81+
def: StoredContinuousDefinition,
8282
): Effect.Effect<void, SchedulerError | ExecutionError> => {
8383
const schedule = def.schedule;
8484
switch (schedule._tag) {
@@ -94,7 +94,7 @@ export const ContinuousHandlerLayer = Layer.effect(
9494
};
9595

9696
const runExecution = (
97-
def: StoredContinuousDefinition<any, any>,
97+
def: StoredContinuousDefinition,
9898
runCount: number,
9999
id?: string,
100100
) =>
@@ -107,7 +107,7 @@ export const ContinuousHandlerLayer = Layer.effect(
107107
id,
108108
run: (ctx: ContinuousContext<unknown>) => def.execute(ctx),
109109
createContext: (base) => {
110-
const proxyHolder: StateHolder<any> = {
110+
const proxyHolder: StateHolder<unknown> = {
111111
get current() {
112112
return base.getState();
113113
},
@@ -132,9 +132,9 @@ export const ContinuousHandlerLayer = Layer.effect(
132132
});
133133

134134
const handleStart = (
135-
def: StoredContinuousDefinition<any, any>,
135+
def: StoredContinuousDefinition,
136136
request: ContinuousRequest,
137-
): Effect.Effect<ContinuousResponse, HandlerError, any> =>
137+
): Effect.Effect<ContinuousResponse, HandlerError> =>
138138
Effect.gen(function* () {
139139
const existing = yield* metadata.get();
140140
if (existing) {
@@ -251,8 +251,8 @@ export const ContinuousHandlerLayer = Layer.effect(
251251
});
252252

253253
const handleTrigger = (
254-
def: StoredContinuousDefinition<any, any>,
255-
): Effect.Effect<ContinuousResponse, HandlerError, any> =>
254+
def: StoredContinuousDefinition,
255+
): Effect.Effect<ContinuousResponse, HandlerError> =>
256256
Effect.gen(function* () {
257257
const existing = yield* metadata.get();
258258
if (
@@ -321,7 +321,7 @@ export const ContinuousHandlerLayer = Layer.effect(
321321
});
322322

323323
const handleGetState = (
324-
def: StoredContinuousDefinition<any, any>,
324+
def: StoredContinuousDefinition,
325325
): Effect.Effect<ContinuousResponse, HandlerError> =>
326326
Effect.gen(function* () {
327327
const stateService = yield* createEntityStateService(

packages/jobs/src/handlers/debounce/handler.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export const DebounceHandlerLayer = Layer.effect(
5959

6060
const getDefinition = (
6161
name: string,
62-
): Effect.Effect<StoredDebounceDefinition<any, any, any>, JobNotFoundError> => {
62+
): Effect.Effect<StoredDebounceDefinition, JobNotFoundError> => {
6363
const def = registryService.registry.debounce[name];
6464
if (!def) {
6565
return Effect.fail(new JobNotFoundError({ type: "debounce", name }));
@@ -91,7 +91,7 @@ export const DebounceHandlerLayer = Layer.effect(
9191
});
9292

9393
const runFlush = (
94-
def: StoredDebounceDefinition<any, any, any>,
94+
def: StoredDebounceDefinition,
9595
flushReason: "maxEvents" | "flushAfter" | "manual",
9696
id?: string,
9797
) =>
@@ -102,7 +102,7 @@ export const DebounceHandlerLayer = Layer.effect(
102102
retryConfig: def.retry,
103103
runCount: 0, // Debounce doesn't track runCount persistently in same way
104104
id,
105-
run: (ctx: DebounceExecuteContext<any>) => def.execute(ctx),
105+
run: (ctx: DebounceExecuteContext<unknown>) => def.execute(ctx),
106106
createContext: (base) => {
107107
return {
108108
state: Effect.succeed(base.getState()), // Snapshotted state
@@ -116,14 +116,14 @@ export const DebounceHandlerLayer = Layer.effect(
116116
flushReason,
117117
attempt: base.attempt,
118118
isRetry: base.isRetry,
119-
} as DebounceExecuteContext<any>;
119+
} as DebounceExecuteContext<unknown>;
120120
},
121121
});
122122

123123
const handleAdd = (
124-
def: StoredDebounceDefinition<any, any, any>,
124+
def: StoredDebounceDefinition,
125125
request: DebounceRequest,
126-
): Effect.Effect<DebounceResponse, HandlerError, any> =>
126+
): Effect.Effect<DebounceResponse, HandlerError> =>
127127
Effect.gen(function* () {
128128
const meta = yield* metadata.get();
129129
const created = !meta;
@@ -158,13 +158,12 @@ export const DebounceHandlerLayer = Layer.effect(
158158
const stateForContext = currentState ?? (validatedEvent as unknown);
159159

160160
const onEvent = def.onEvent!;
161-
// Cast is still needed unless we fix Definition generic constraints
162161
const reducedState = yield* onEvent({
163162
event: validatedEvent as unknown,
164163
state: stateForContext,
165164
eventCount: nextCount,
166165
instanceId: runtime.instanceId,
167-
} as any);
166+
});
168167

169168
yield* stateService.set(reducedState);
170169
yield* setEventCount(nextCount);
@@ -244,9 +243,9 @@ export const DebounceHandlerLayer = Layer.effect(
244243
});
245244

246245
const handleFlush = (
247-
def: StoredDebounceDefinition<any, any, any>,
246+
def: StoredDebounceDefinition,
248247
reason: "manual" | "flushAfter" | "maxEvents",
249-
): Effect.Effect<DebounceResponse, HandlerError, any> =>
248+
): Effect.Effect<DebounceResponse, HandlerError> =>
250249
Effect.gen(function* () {
251250
const meta = yield* metadata.get();
252251
if (!meta) {
@@ -353,7 +352,7 @@ export const DebounceHandlerLayer = Layer.effect(
353352
});
354353

355354
const handleGetState = (
356-
def: StoredDebounceDefinition<any, any, any>,
355+
def: StoredDebounceDefinition,
357356
): Effect.Effect<DebounceResponse, HandlerError> =>
358357
Effect.gen(function* () {
359358
const stateService = yield* withStorage(

0 commit comments

Comments
 (0)