Skip to content

Commit f3024b4

Browse files
atched issue with duration
1 parent d15f080 commit f3024b4

11 files changed

Lines changed: 530 additions & 73 deletions

File tree

packages/task/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@durable-effect/task",
3-
"version": "0.0.4",
3+
"version": "0.0.7",
44
"type": "module",
55
"main": "./dist/index.js",
66
"types": "./dist/index.d.ts",
@@ -28,7 +28,7 @@
2828
"test:watch": "vitest"
2929
},
3030
"peerDependencies": {
31-
"effect": "^4.0.0-beta.21",
31+
"effect": "^4.0.0-beta.38",
3232
"@cloudflare/workers-types": ">=4.0.0"
3333
},
3434
"peerDependenciesMeta": {
@@ -37,7 +37,7 @@
3737
}
3838
},
3939
"devDependencies": {
40-
"effect": "4.0.0-beta.21",
40+
"effect": "4.0.0-beta.38",
4141
"vitest": "^2.1.0"
4242
}
4343
}

packages/task/src/Task.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { TaskDefineConfig, TaskDefinition } from "./TaskDefinition.js"
77
export const Task = {
88
define<S, E, EErr, AErr, R, OErr = never, GErr = never>(
99
config: TaskDefineConfig<S, E, EErr, AErr, R, OErr, GErr>,
10-
): TaskDefinition<S, E, EErr | AErr | OErr | GErr, R> {
10+
): TaskDefinition<S, E, EErr, AErr, R, OErr, GErr> {
1111
return {
1212
_tag: "TaskDefinition",
1313
state: config.state,

packages/task/src/TaskDefinition.ts

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export interface TaskDefineConfig<
3838
) => Effect.Effect<void, AErr, R>
3939
readonly onError?: (
4040
ctx: TaskContext<S>,
41-
error: unknown,
41+
error: EErr | AErr,
4242
) => Effect.Effect<void, OErr, R>
4343
readonly onClientGetState?: (
4444
ctx: TaskContext<S>,
@@ -66,44 +66,53 @@ export interface TaskDefineConfigVoid<
6666
) => Effect.Effect<void, AErr, R>
6767
readonly onError?: (
6868
ctx: TaskContext<S>,
69-
error: unknown,
69+
error: EErr | AErr,
7070
) => Effect.Effect<void, OErr, R>
7171
}
7272

7373
// ---------------------------------------------------------------------------
7474
// TaskDefinition — the pure definition value
7575
// ---------------------------------------------------------------------------
7676

77-
export interface TaskDefinition<S, E, Err, R> {
77+
export interface TaskDefinition<S, E, EErr, AErr, R, OErr = never, GErr = never> {
7878
readonly _tag: "TaskDefinition"
7979
readonly state: PureSchema<S>
8080
readonly event: PureSchema<E>
8181
readonly onEvent: (
8282
ctx: TaskContext<S>,
8383
event: E,
84-
) => Effect.Effect<void, Err, R>
84+
) => Effect.Effect<void, EErr, R>
8585
readonly onAlarm: (
8686
ctx: TaskContext<S>,
87-
) => Effect.Effect<void, Err, R>
87+
) => Effect.Effect<void, AErr, R>
8888
readonly onError?: (
8989
ctx: TaskContext<S>,
90-
error: unknown,
91-
) => Effect.Effect<void, Err, R>
90+
error: EErr | AErr,
91+
) => Effect.Effect<void, OErr, R>
9292
readonly onClientGetState?: (
9393
ctx: TaskContext<S>,
9494
state: S | null,
95-
) => Effect.Effect<S | null, Err, R>
95+
) => Effect.Effect<S | null, GErr, R>
9696
}
9797

98+
// ---------------------------------------------------------------------------
99+
// TaskErrors — extract the full error union from a TaskDefinition
100+
// ---------------------------------------------------------------------------
101+
102+
export type TaskErrors<D> =
103+
D extends TaskDefinition<any, any, infer EErr, infer AErr, any, infer OErr, infer GErr>
104+
? EErr | AErr | OErr | GErr
105+
: never
106+
98107
// ---------------------------------------------------------------------------
99108
// withServices — wraps handlers with Effect.provide to eliminate R,
100109
// returning TaskDefinition<S, E, Err, never> (preserving S and E).
101110
// ---------------------------------------------------------------------------
102111

103-
export function withServices<S, E, Err, R>(
104-
definition: TaskDefinition<S, E, Err, R>,
112+
export function withServices<S, E, EErr, AErr, R, OErr, GErr>(
113+
definition: TaskDefinition<S, E, EErr, AErr, R, OErr, GErr>,
105114
layer: Layer.Layer<R>,
106-
): TaskDefinition<S, E, Err, never> {
115+
): TaskDefinition<S, E, EErr, AErr, never, OErr, GErr> {
107116
return {
108117
_tag: "TaskDefinition",
109118
state: definition.state,

packages/task/src/cloudflare/createTasks.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ import { TaskClientError } from "./errors.js"
1111
// ---------------------------------------------------------------------------
1212

1313
export type EventOf<
14-
T extends Record<string, TaskDefinition<any, any, any, never>>,
14+
T extends Record<string, TaskDefinition<any, any, any, any, never>>,
1515
K extends keyof T,
16-
> = T[K] extends TaskDefinition<any, infer E, any, any> ? E : never
16+
> = T[K] extends TaskDefinition<any, infer E, any, any, any, any, any> ? E : never
1717

1818
export type StateOf<
19-
T extends Record<string, TaskDefinition<any, any, any, never>>,
19+
T extends Record<string, TaskDefinition<any, any, any, any, never>>,
2020
K extends keyof T,
21-
> = T[K] extends TaskDefinition<infer S, any, any, any> ? S : never
21+
> = T[K] extends TaskDefinition<infer S, any, any, any, any, any, any> ? S : never
2222

2323
// ---------------------------------------------------------------------------
2424
// Structural types — avoids depending on CF ambient types
@@ -46,7 +46,7 @@ export interface TaskHandle<S, E> {
4646
// ---------------------------------------------------------------------------
4747

4848
export interface TasksAccessor<
49-
T extends Record<string, TaskDefinition<any, any, any, never>>,
49+
T extends Record<string, TaskDefinition<any, any, any, any, never>>,
5050
> {
5151
<K extends keyof T & string>(
5252
doNamespace: DurableObjectNamespaceLike,
@@ -59,7 +59,7 @@ export interface TasksAccessor<
5959
// ---------------------------------------------------------------------------
6060

6161
export function createTasks<
62-
const T extends Record<string, TaskDefinition<any, any, any, never>>,
62+
const T extends Record<string, TaskDefinition<any, any, any, any, never>>,
6363
>(
6464
definitions: T,
6565
): {

packages/task/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export { Alarm, AlarmError } from "./services/Alarm.js"
1414
// User-facing API
1515
export { Task } from "./Task.js"
1616
export type { TaskContext } from "./TaskContext.js"
17-
export type { PureSchema, TaskDefineConfig, TaskDefinition } from "./TaskDefinition.js"
17+
export type { PureSchema, TaskDefineConfig, TaskDefinition, TaskErrors } from "./TaskDefinition.js"
1818
export { withServices } from "./TaskDefinition.js"
1919

2020
// Framework services

packages/task/src/services/TaskRegistry.ts

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ function buildTaskContext<S>(
9595

9696
const scheduleIn = (delay: Duration.Input): Effect.Effect<void, TaskError> =>
9797
Effect.gen(function* () {
98-
const ms = Duration.toMillis(delay)
98+
const ms = Duration.toMillis(Duration.fromInputUnsafe(delay))
9999
const ts = Date.now() + ms
100100
yield* alarm.set(ts).pipe(Effect.mapError(mapAlarmError))
101101
yield* storage.set(ALARM_KEY, ts).pipe(Effect.mapError(mapStorageError))
@@ -140,18 +140,53 @@ function cleanup(
140140
}).pipe(Effect.mapError((e) => new TaskExecutionError({ cause: e })))
141141
}
142142

143+
// ---------------------------------------------------------------------------
144+
// Internal: type guard for PurgeSignal
145+
// ---------------------------------------------------------------------------
146+
147+
function isPurgeSignal(error: unknown): error is PurgeSignal {
148+
return error instanceof PurgeSignal
149+
}
150+
151+
// ---------------------------------------------------------------------------
152+
// Internal: handle errors from a handler effect, routing PurgeSignal to
153+
// cleanup and other errors to onError (if provided).
154+
// ---------------------------------------------------------------------------
155+
156+
function handleHandlerError<S, HErr, OErr>(
157+
ctx: TaskContext<S>,
158+
error: HErr,
159+
onError: ((ctx: TaskContext<S>, error: HErr) => Effect.Effect<void, OErr, never>) | undefined,
160+
storage: Storage["Service"],
161+
alarm: Alarm["Service"],
162+
): Effect.Effect<void, TaskExecutionError> {
163+
if (isPurgeSignal(error)) {
164+
return cleanup(storage, alarm)
165+
}
166+
167+
if (!onError) {
168+
return Effect.fail(new TaskExecutionError({ cause: error }))
169+
}
170+
171+
return onError(ctx, error).pipe(
172+
Effect.catch((oErr) =>
173+
isPurgeSignal(oErr)
174+
? cleanup(storage, alarm)
175+
: Effect.fail(new TaskExecutionError({ cause: oErr })),
176+
),
177+
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
178+
)
179+
}
180+
143181
// ---------------------------------------------------------------------------
144182
// Internal: build RegisteredTask from a fully-resolved definition (R = never).
145183
//
146184
// All handler effects in the definition have R = never, so no layer provision
147-
// is needed. S, E, Err are captured in the closures via the generic params.
148-
//
149-
// The error widening trick (mapError to Err | PurgeSignal) lets catchTag
150-
// find the PurgeSignal tag even when Err is a generic type parameter.
185+
// is needed. S, E, EErr, AErr are captured in closures via generic params.
151186
// ---------------------------------------------------------------------------
152187

153-
function buildRegisteredTask<S, E, Err>(
154-
definition: TaskDefinition<S, E, Err, never>,
188+
function buildRegisteredTask<S, E, EErr, AErr, OErr, GErr>(
189+
definition: TaskDefinition<S, E, EErr, AErr, never, OErr, GErr>,
155190
): RegisteredTask {
156191
const decodeEvent = Schema.decodeUnknownEffect(definition.event)
157192
const decodeState = Schema.decodeUnknownEffect(definition.state)
@@ -171,26 +206,9 @@ function buildRegisteredTask<S, E, Err>(
171206

172207
const ctx = buildTaskContext(storage, alarm, id, name, decodeState, encodeState)
173208

174-
// Widen error to include PurgeSignal so catchTag can resolve the tag
175-
const withPurge = definition.onEvent(ctx, event).pipe(
176-
Effect.mapError((e): Err | PurgeSignal => e),
177-
Effect.catchTag("PurgeSignal", () => cleanup(storage, alarm)),
178-
)
179-
180-
if (!definition.onError) {
181-
yield* withPurge.pipe(
182-
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
183-
)
184-
return
185-
}
186-
187-
yield* withPurge.pipe(
209+
yield* definition.onEvent(ctx, event).pipe(
188210
Effect.catch((error) =>
189-
definition.onError!(ctx, error).pipe(
190-
Effect.mapError((e): Err | PurgeSignal => e),
191-
Effect.catchTag("PurgeSignal", () => cleanup(storage, alarm)),
192-
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
193-
)
211+
handleHandlerError(ctx, error, definition.onError, storage, alarm),
194212
),
195213
)
196214
})
@@ -202,27 +220,16 @@ function buildRegisteredTask<S, E, Err>(
202220
name: string,
203221
): Effect.Effect<void, TaskExecutionError> =>
204222
Effect.gen(function* () {
205-
const ctx = buildTaskContext(storage, alarm, id, name, decodeState, encodeState)
206-
207-
const withPurge = definition.onAlarm(ctx).pipe(
208-
Effect.mapError((e): Err | PurgeSignal => e),
209-
Effect.catchTag("PurgeSignal", () => cleanup(storage, alarm)),
223+
// Clear the alarm bookmark — the alarm has fired, so it's no longer pending
224+
yield* storage.delete(ALARM_KEY).pipe(
225+
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
210226
)
211227

212-
if (!definition.onError) {
213-
yield* withPurge.pipe(
214-
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
215-
)
216-
return
217-
}
228+
const ctx = buildTaskContext(storage, alarm, id, name, decodeState, encodeState)
218229

219-
yield* withPurge.pipe(
230+
yield* definition.onAlarm(ctx).pipe(
220231
Effect.catch((error) =>
221-
definition.onError!(ctx, error).pipe(
222-
Effect.mapError((e): Err | PurgeSignal => e),
223-
Effect.catchTag("PurgeSignal", () => cleanup(storage, alarm)),
224-
Effect.mapError((e) => new TaskExecutionError({ cause: e })),
225-
)
232+
handleHandlerError(ctx, error, definition.onError, storage, alarm),
226233
),
227234
)
228235
})
@@ -258,8 +265,8 @@ function buildRegisteredTask<S, E, Err>(
258265
// registerTask — for definitions with no service requirements (R = never)
259266
// ---------------------------------------------------------------------------
260267

261-
export function registerTask<S, E, Err>(
262-
definition: TaskDefinition<S, E, Err, never>,
268+
export function registerTask<S, E, EErr, AErr, OErr, GErr>(
269+
definition: TaskDefinition<S, E, EErr, AErr, never, OErr, GErr>,
263270
): RegisteredTask {
264271
return buildRegisteredTask(definition)
265272
}
@@ -272,11 +279,11 @@ export function registerTask<S, E, Err>(
272279
// shared buildRegisteredTask.
273280
// ---------------------------------------------------------------------------
274281

275-
export function registerTaskWithLayer<S, E, Err, R>(
276-
definition: TaskDefinition<S, E, Err, R>,
282+
export function registerTaskWithLayer<S, E, EErr, AErr, R, OErr, GErr>(
283+
definition: TaskDefinition<S, E, EErr, AErr, R, OErr, GErr>,
277284
layer: Layer.Layer<R>,
278285
): RegisteredTask {
279-
const resolved: TaskDefinition<S, E, Err, never> = {
286+
const resolved: TaskDefinition<S, E, EErr, AErr, never, OErr, GErr> = {
280287
_tag: "TaskDefinition",
281288
state: definition.state,
282289
event: definition.event,

0 commit comments

Comments
 (0)