From 9511787c7b7d05e9cafdd01d86daa5ebd0577323 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Fri, 8 May 2026 23:19:32 +0200 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=94=A8=20Refactor=20JobEnqueueOptions?= =?UTF-8?q?=20to=20use=20consistent=20naming=20for=20enqueue=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/Job.ts | 14 +++++++++---- packages/playground/src/job/JobEnqueue.ts | 21 +++++++++++++++++++ packages/playground/src/job/JobHandler.ts | 9 ++++++++ packages/playground/src/job/JobProvider.ts | 7 ++----- .../src/job/MemoryJobProvider.spec.ts | 8 +++---- .../playground/src/job/MemoryJobProvider.ts | 9 ++++---- 6 files changed, 51 insertions(+), 17 deletions(-) create mode 100644 packages/playground/src/job/JobEnqueue.ts create mode 100644 packages/playground/src/job/JobHandler.ts diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 6c6266dd9bf..35050ad7f6d 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -1,13 +1,16 @@ import { Codec, lazy, Tag, Type } from '@w5s/core'; import { Task } from '@w5s/task'; import { randomUUID, type UUID } from '@w5s/uuid'; +import { configuration } from './configuration.js'; +import type { JobRequest } from './JobRequest.js'; +import { JobEnqueue } from './JobEnqueue.js'; export namespace Job { export interface Module { jobName: Request['_']; Request: Type.Module; - create(payload: Request['payload']): Task; + performNow(payload: Request['payload']): Task; } } @@ -21,15 +24,18 @@ export const Job = { _: Type.constant(jobName), payload: PayloadType, }, `${jobName}Job`); + return { jobName, Request, - create(payload) { + performNow(payload) { const request = { _: jobName, payload }; const requestEncoded = lazy(() => Codec.encode(Request, request)); return Task.andThen(Job.nextJobId, (jobId) => { return Task.create(async () => { - console.log(`Running job ${jobName} with payload:`, payload, requestEncoded()); + const { provider } = configuration.current; + + await provider.enqueue(requestEncoded() as JobRequest, JobEnqueue.Immediate); return Task.ok(jobId as JobId); }); }); @@ -43,4 +49,4 @@ export const Job = { // bar: Type.number, // })); -// const jobId = await Blah.create({ foo: 'hello', bar: 42 }).run(); +// const jobId = await Blah.performNow({ foo: 'hello', bar: 42 }).run(); diff --git a/packages/playground/src/job/JobEnqueue.ts b/packages/playground/src/job/JobEnqueue.ts new file mode 100644 index 00000000000..91d4d6ab016 --- /dev/null +++ b/packages/playground/src/job/JobEnqueue.ts @@ -0,0 +1,21 @@ +export type JobEnqueue = + | { _: 'JobEnqueueImmediate' } + | { _: 'JobEnqueueDelayed'; delay: number }; +export const JobEnqueue = { + /** + * Immediate job enqueue option, the job will be executed as soon as possible. + * + * @example + * JobEnqueue.Immediate + */ + Immediate: { _: 'JobEnqueueImmediate' } satisfies JobEnqueue, + + /** + * Creates a delayed job enqueue option. + * + * @example + * JobEnqueue.Delayed(5000) // The job will be executed after 5 seconds + * @param delay + */ + Delayed: (delay: number): JobEnqueue => ({ _: 'JobEnqueueDelayed', delay }), +}; diff --git a/packages/playground/src/job/JobHandler.ts b/packages/playground/src/job/JobHandler.ts new file mode 100644 index 00000000000..9cbca37d08e --- /dev/null +++ b/packages/playground/src/job/JobHandler.ts @@ -0,0 +1,9 @@ +import { useState } from '@w5s/application'; +import { meta } from './meta.js'; +import type { JobRequest } from './JobRequest.js'; + +export const handlers = useState(meta, 'handlers', new Map()); + +export interface JobHandler { + (request: JobRequest): Promise | void; +} diff --git a/packages/playground/src/job/JobProvider.ts b/packages/playground/src/job/JobProvider.ts index 981dfd1c282..7b187734496 100644 --- a/packages/playground/src/job/JobProvider.ts +++ b/packages/playground/src/job/JobProvider.ts @@ -1,9 +1,6 @@ +import type { JobEnqueue } from './JobEnqueue.js'; import type { JobRequest } from './JobRequest.js'; -export type JobEnqueueOptions = - | { _: 'immediate' } - | { _: 'delayed'; delay: number }; - export interface JobProvider { /** * Enqueue a job to be executed with the given request and options. @@ -12,5 +9,5 @@ export interface JobProvider { * @param request * @param options */ - enqueue(request: Request, options: JobEnqueueOptions): Promise; + enqueue(request: Request, options: JobEnqueue): Promise; } diff --git a/packages/playground/src/job/MemoryJobProvider.spec.ts b/packages/playground/src/job/MemoryJobProvider.spec.ts index c04bec722a9..383ea3d10b3 100644 --- a/packages/playground/src/job/MemoryJobProvider.spec.ts +++ b/packages/playground/src/job/MemoryJobProvider.spec.ts @@ -10,7 +10,7 @@ describe(MemoryJobProvider, () => { const provider = new MemoryJobProvider(); const request = { _: 'email', payload: { userId: '1' } }; - await provider.enqueue(request, { _: 'immediate' }); + await provider.enqueue(request, { _: 'JobEnqueueImmediate' }); expect(provider.size).toBe(1); expect(provider.peek()).toEqual({ @@ -31,7 +31,7 @@ describe(MemoryJobProvider, () => { const provider = new MemoryJobProvider({ now: () => Date.now() }); const request = { _: 'reminder', payload: { userId: '1' } }; - await provider.enqueue(request, { _: 'delayed', delay: 100 }); + await provider.enqueue(request, { _: 'JobEnqueueDelayed', delay: 100 }); expect(provider.size).toBe(0); vi.advanceTimersByTime(99); @@ -50,8 +50,8 @@ describe(MemoryJobProvider, () => { vi.useFakeTimers(); const provider = new MemoryJobProvider({ now: () => Date.now() }); - await provider.enqueue({ _: 'immediate', payload: {} }, { _: 'immediate' }); - await provider.enqueue({ _: 'delayed', payload: {} }, { _: 'delayed', delay: 100 }); + await provider.enqueue({ _: 'immediate', payload: {} }, { _: 'JobEnqueueImmediate' }); + await provider.enqueue({ _: 'delayed', payload: {} }, { _: 'JobEnqueueDelayed', delay: 100 }); provider.clear(); vi.advanceTimersByTime(100); diff --git a/packages/playground/src/job/MemoryJobProvider.ts b/packages/playground/src/job/MemoryJobProvider.ts index f0d4be43f9d..669cc6d5af4 100644 --- a/packages/playground/src/job/MemoryJobProvider.ts +++ b/packages/playground/src/job/MemoryJobProvider.ts @@ -1,4 +1,5 @@ -import type { JobEnqueueOptions, JobProvider } from './JobProvider.js'; +import type { JobEnqueue } from './JobEnqueue.js'; +import type { JobProvider } from './JobProvider.js'; import type { JobRequest } from './JobRequest.js'; export interface MemoryJobQueueEntry { @@ -40,9 +41,9 @@ export class MemoryJobProvider implements JobProvider { this.#queue.length = 0; } - async enqueue(request: Request, options: JobEnqueueOptions): Promise { + async enqueue(request: Request, options: JobEnqueue): Promise { const enqueuedAt = this.#now(); - const availableAt = options._ === 'delayed' + const availableAt = options._ === 'JobEnqueueDelayed' ? enqueuedAt + options.delay : enqueuedAt; const entry: MemoryJobQueueEntry = { @@ -51,7 +52,7 @@ export class MemoryJobProvider implements JobProvider { availableAt, }; - if (options._ === 'delayed') { + if (options._ === 'JobEnqueueDelayed') { const timer = setTimeout(() => { this.#timers.delete(timer); this.#queue.push(entry); From 1c55c5507b9ea7938255e5bb03e36dd9b53080e2 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Sat, 9 May 2026 15:45:12 +0200 Subject: [PATCH 2/7] =?UTF-8?q?=F0=9F=94=A7=20Refactor=20JobRequest=20and?= =?UTF-8?q?=20Job=20module=20for=20consistent=20naming=20and=20improved=20?= =?UTF-8?q?type=20safety?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/Job.ts | 14 +++--- packages/playground/src/job/JobHandler.ts | 44 +++++++++++++++++-- packages/playground/src/job/JobRequest.ts | 12 ++++- .../src/job/MemoryJobProvider.spec.ts | 13 +++--- 4 files changed, 65 insertions(+), 18 deletions(-) diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 35050ad7f6d..74e3c80874b 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -6,11 +6,11 @@ import type { JobRequest } from './JobRequest.js'; import { JobEnqueue } from './JobEnqueue.js'; export namespace Job { - export interface Module { - jobName: Request['_']; + export interface Module { + jobName: Request['jobName']; Request: Type.Module; - performNow(payload: Request['payload']): Task; + performNow(payload: Request['jobPayload']): Task; } } @@ -19,17 +19,17 @@ export type JobId = UUID & Tag<'JobId'>; export const Job = { nextJobId: randomUUID() as Task, - define(jobName: JobName, PayloadType: Type.Module): Job.Module<{ _: JobName; payload: Payload }> { + define(jobName: JobName, PayloadType: Type.Module): Job.Module<{ jobName: JobName; jobPayload: Payload }> { const Request = Type.Object({ - _: Type.constant(jobName), - payload: PayloadType, + jobName: Type.constant(jobName), + jobPayload: PayloadType, }, `${jobName}Job`); return { jobName, Request, performNow(payload) { - const request = { _: jobName, payload }; + const request = { jobName, jobPayload: payload }; const requestEncoded = lazy(() => Codec.encode(Request, request)); return Task.andThen(Job.nextJobId, (jobId) => { return Task.create(async () => { diff --git a/packages/playground/src/job/JobHandler.ts b/packages/playground/src/job/JobHandler.ts index 9cbca37d08e..90237bc4c44 100644 --- a/packages/playground/src/job/JobHandler.ts +++ b/packages/playground/src/job/JobHandler.ts @@ -1,9 +1,47 @@ import { useState } from '@w5s/application'; import { meta } from './meta.js'; import type { JobRequest } from './JobRequest.js'; - -export const handlers = useState(meta, 'handlers', new Map()); +import type { TaskLike } from '@w5s/task'; export interface JobHandler { - (request: JobRequest): Promise | void; + (request: JobRequest): TaskLike; } + +const __handlers = useState(meta, 'handlers', new Map() as ReadonlyMap); + +/** + * @namespace + */ +export const JobHandler = { + /** + * Registers a job handler for the specified type. + * + * @example + * ```ts + * JobHandler.register('my-job-type', (request) => { + * // handle the job request here + * return Console.log(request); + * }); + * ``` + * @param jobName + * @param handler + */ + register(jobName: Request['jobName'], handler: JobHandler): void { + __handlers.current = new Map(__handlers.current).set(jobName, handler); + }, + + /** + * Unregisters a job handler for the specified type. + * + * @example + * ```ts + * JobHandler.unregister('my-job-type'); + * ``` + * @param jobName + */ + unregister(jobName: JobRequest['jobName']): void { + const newHandlers = new Map(__handlers.current); + newHandlers.delete(jobName); + __handlers.current = newHandlers; + }, +}; diff --git a/packages/playground/src/job/JobRequest.ts b/packages/playground/src/job/JobRequest.ts index 1c322753524..f79ce910fc3 100644 --- a/packages/playground/src/job/JobRequest.ts +++ b/packages/playground/src/job/JobRequest.ts @@ -1,4 +1,12 @@ export interface JobRequest { - _: string; - payload: Record; + /** + * The name of the job type, used to identify the appropriate handler for processing the job. + * This should be a string that uniquely identifies the type of job being requested. + */ + jobName: string; + + /** + * The payload of the job request, containing the data necessary for processing the job. + */ + jobPayload: Record; } diff --git a/packages/playground/src/job/MemoryJobProvider.spec.ts b/packages/playground/src/job/MemoryJobProvider.spec.ts index 383ea3d10b3..ae52d26defb 100644 --- a/packages/playground/src/job/MemoryJobProvider.spec.ts +++ b/packages/playground/src/job/MemoryJobProvider.spec.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import { MemoryJobProvider } from './MemoryJobProvider.js'; +import { JobEnqueue } from './JobEnqueue.js'; describe(MemoryJobProvider, () => { afterEach(() => { @@ -8,9 +9,9 @@ describe(MemoryJobProvider, () => { it('enqueues immediate jobs in memory', async () => { const provider = new MemoryJobProvider(); - const request = { _: 'email', payload: { userId: '1' } }; + const request = { jobName: 'email', jobPayload: { userId: '1' } }; - await provider.enqueue(request, { _: 'JobEnqueueImmediate' }); + await provider.enqueue(request, JobEnqueue.Immediate); expect(provider.size).toBe(1); expect(provider.peek()).toEqual({ @@ -29,9 +30,9 @@ describe(MemoryJobProvider, () => { it('keeps delayed jobs out of the queue until ready', async () => { vi.useFakeTimers(); const provider = new MemoryJobProvider({ now: () => Date.now() }); - const request = { _: 'reminder', payload: { userId: '1' } }; + const request = { jobName: 'reminder', jobPayload: { userId: '1' } }; - await provider.enqueue(request, { _: 'JobEnqueueDelayed', delay: 100 }); + await provider.enqueue(request, JobEnqueue.Delayed(100)); expect(provider.size).toBe(0); vi.advanceTimersByTime(99); @@ -50,8 +51,8 @@ describe(MemoryJobProvider, () => { vi.useFakeTimers(); const provider = new MemoryJobProvider({ now: () => Date.now() }); - await provider.enqueue({ _: 'immediate', payload: {} }, { _: 'JobEnqueueImmediate' }); - await provider.enqueue({ _: 'delayed', payload: {} }, { _: 'JobEnqueueDelayed', delay: 100 }); + await provider.enqueue({ jobName: 'immediate', jobPayload: {} }, JobEnqueue.Immediate); + await provider.enqueue({ jobName: 'delayed', jobPayload: {} }, JobEnqueue.Delayed(100)); provider.clear(); vi.advanceTimersByTime(100); From 91e493301eb98761f565f7f5e3f574c685c70d49 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Sat, 9 May 2026 23:58:43 +0200 Subject: [PATCH 3/7] =?UTF-8?q?=E2=9C=A8=20Introduce=20JobId=20type=20and?= =?UTF-8?q?=20update=20MemoryJobProvider=20to=20return=20jobId=20in=20enqu?= =?UTF-8?q?eue=20result?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/Job.spec.ts | 65 +++++++++++++++++ packages/playground/src/job/Job.ts | 67 +++++++++++------ .../playground/src/job/JobHandler.spec.ts | 72 +++++++++++++++++++ packages/playground/src/job/JobHandler.ts | 27 +++++-- packages/playground/src/job/JobId.ts | 19 +++++ packages/playground/src/job/JobProvider.ts | 13 +++- packages/playground/src/job/JobRequest.ts | 2 +- .../src/job/MemoryJobProvider.spec.ts | 15 ++-- .../playground/src/job/MemoryJobProvider.ts | 13 +++- packages/playground/src/job/index.spec.ts | 16 +++++ packages/playground/src/job/index.ts | 5 ++ 11 files changed, 278 insertions(+), 36 deletions(-) create mode 100644 packages/playground/src/job/Job.spec.ts create mode 100644 packages/playground/src/job/JobHandler.spec.ts create mode 100644 packages/playground/src/job/JobId.ts create mode 100644 packages/playground/src/job/index.spec.ts create mode 100644 packages/playground/src/job/index.ts diff --git a/packages/playground/src/job/Job.spec.ts b/packages/playground/src/job/Job.spec.ts new file mode 100644 index 00000000000..0bd82ead7f9 --- /dev/null +++ b/packages/playground/src/job/Job.spec.ts @@ -0,0 +1,65 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { Type, Result } from '@w5s/core'; +import { Task } from '@w5s/task'; +import { Job } from './Job.js'; +import { JobEnqueue } from './JobEnqueue.js'; +import { JobHandler } from './JobHandler.js'; +import type { JobId } from './JobId.js'; +import { configuration } from './configuration.js'; +import type { JobProvider } from './JobProvider.js'; + +describe('Job', () => { + const previousConfiguration = configuration.current; + + beforeEach(() => { + configuration.current = previousConfiguration; + }); + + it('defines a module and enqueues requests with performLater', async () => { + const provider = { + enqueue: vi.fn(async () => ({ + jobId: 'job-1' as JobId, + providerJobId: undefined, + })), + } satisfies JobProvider; + configuration.current = { provider }; + + const EmailJob = Job.define('EmailJob', Type.Object({ + to: Type.string, + retries: Type.number, + })); + + expect(EmailJob.jobName).toBe('EmailJob'); + + const result = await EmailJob.performLater({ to: 'dev@w5s.io', retries: 2 }).run(); + + expect(Result.getOrThrow(result)).toBe('job-1'); + expect(provider.enqueue).toHaveBeenCalledTimes(1); + expect(provider.enqueue).toHaveBeenCalledWith( + { + jobName: 'EmailJob', + parameters: { to: 'dev@w5s.io', retries: 2 }, + }, + JobEnqueue.Immediate, + ); + }); + + it('registers handlers via implement', () => { + const handler = vi.fn(() => Task.resolve()); + const ReportJob = Job.define('ReportJob', Type.Object({ + reportId: Type.string, + })); + + Job.implement(ReportJob, handler); + + const registered = JobHandler.get('ReportJob'); + expect(registered).toBeDefined(); + + const request = { jobName: 'ReportJob', parameters: { reportId: 'r-1' } }; + const task = registered?.(request); + + expect(task).toBeDefined(); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith(request); + }); +}); diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 74e3c80874b..0f51530f46b 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -1,52 +1,73 @@ -import { Codec, lazy, Tag, Type } from '@w5s/core'; +import { Codec, lazy, Type } from '@w5s/core'; import { Task } from '@w5s/task'; -import { randomUUID, type UUID } from '@w5s/uuid'; import { configuration } from './configuration.js'; import type { JobRequest } from './JobRequest.js'; import { JobEnqueue } from './JobEnqueue.js'; +import type { JobId } from './JobId.js'; +import { JobHandler } from './JobHandler.js'; export namespace Job { export interface Module { + /** + * The name of the job, used to identify the job type and retrieve the corresponding handler. + */ jobName: Request['jobName']; + + /** + * The codec for the job request, used to encode and decode job requests when enqueuing and executing jobs. + */ Request: Type.Module; - performNow(payload: Request['jobPayload']): Task; + /** + * Enqueues a job to be executed later with the given parameters. + * + * @param parameters The parameters of the job, which will be passed to the job handler when the job is executed. + */ + performLater(parameters: Request['parameters']): Task; } } -export type JobId = UUID & Tag<'JobId'>; - export const Job = { - nextJobId: randomUUID() as Task, - define(jobName: JobName, PayloadType: Type.Module): Job.Module<{ jobName: JobName; jobPayload: Payload }> { + define(jobName: JobName, ParameterType: Type.Module): Job.Module<{ jobName: JobName; parameters: Payload }> { const Request = Type.Object({ jobName: Type.constant(jobName), - jobPayload: PayloadType, + parameters: ParameterType, }, `${jobName}Job`); return { jobName, Request, - performNow(payload) { - const request = { jobName, jobPayload: payload }; + performLater(parameters) { + const request = { jobName, parameters }; const requestEncoded = lazy(() => Codec.encode(Request, request)); - return Task.andThen(Job.nextJobId, (jobId) => { - return Task.create(async () => { - const { provider } = configuration.current; + return Task.create(async () => { + const { provider } = configuration.current; - await provider.enqueue(requestEncoded() as JobRequest, JobEnqueue.Immediate); - return Task.ok(jobId as JobId); - }); + const { jobId } = await provider.enqueue(requestEncoded() as JobRequest, JobEnqueue.Immediate); + return Task.ok(jobId); }); }, }; }, -}; -// export const Blah = Job.define('Blah', Type.Object({ -// foo: Type.string, -// bar: Type.number, -// })); - -// const jobId = await Blah.performNow({ foo: 'hello', bar: 42 }).run(); + /** + * Defines a job implementation + * + * @example + * ```ts + * const MyJob = Job.define('MyJob', Type.Object({ + * foo: Type.string, + * })); + * + * Job.implement(MyJob, (request) => + * Console.log(request.parameters.foo) + * ); + * ``` + * @param module + * @param handler + */ + implement(module: Job.Module, handler: JobHandler): void { + JobHandler.register(module.jobName, handler); + }, +}; diff --git a/packages/playground/src/job/JobHandler.spec.ts b/packages/playground/src/job/JobHandler.spec.ts new file mode 100644 index 00000000000..6cf6147c5d6 --- /dev/null +++ b/packages/playground/src/job/JobHandler.spec.ts @@ -0,0 +1,72 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { useState } from '@w5s/application'; +import { JobHandler } from './JobHandler.js'; +import { meta } from './meta.js'; +import type { JobRequest } from './JobRequest.js'; + +function getHandlers() { + return useState(meta, 'handlers', new Map() as ReadonlyMap); +} + +describe('JobHandler', () => { + // eslint-disable-next-line ts/consistent-type-assertions + const generateHandler = (): JobHandler => () => ({}) as never; + + beforeEach(() => { + getHandlers().current = new Map(); + }); + + it('registers a handler for a job name', () => { + const handler = generateHandler(); + + JobHandler.register('email', handler); + + const handlers = getHandlers().current; + expect(handlers.size).toBe(1); + expect(handlers.get('email')).toBe(handler); + }); + + it('replaces an existing handler for the same job name', () => { + const previousHandler = generateHandler(); + const nextHandler = generateHandler(); + + JobHandler.register('email', previousHandler); + const previousMap = getHandlers().current; + + JobHandler.register('email', nextHandler); + + const handlers = getHandlers().current; + expect(handlers).not.toBe(previousMap); + expect(handlers.size).toBe(1); + expect(handlers.get('email')).toBe(nextHandler); + }); + + it('unregisters only the requested handler', () => { + const emailHandler = generateHandler(); + const smsHandler = generateHandler(); + + JobHandler.register('email', emailHandler); + JobHandler.register('sms', smsHandler); + + JobHandler.unregister('email'); + + const handlers = getHandlers().current; + expect(handlers.has('email')).toBe(false); + expect(handlers.get('sms')).toBe(smsHandler); + expect(handlers.size).toBe(1); + }); + + it('does nothing when unregistering a missing job name', () => { + const existing = generateHandler(); + + JobHandler.register('existing', existing); + const previousMap = getHandlers().current; + + JobHandler.unregister('missing'); + + const handlers = getHandlers().current; + expect(handlers).not.toBe(previousMap); + expect(handlers.get('existing')).toBe(existing); + expect(handlers.size).toBe(1); + }); +}); diff --git a/packages/playground/src/job/JobHandler.ts b/packages/playground/src/job/JobHandler.ts index 90237bc4c44..3fa2c34ec5f 100644 --- a/packages/playground/src/job/JobHandler.ts +++ b/packages/playground/src/job/JobHandler.ts @@ -1,18 +1,35 @@ import { useState } from '@w5s/application'; import { meta } from './meta.js'; import type { JobRequest } from './JobRequest.js'; +import type { Option } from '@w5s/core'; import type { TaskLike } from '@w5s/task'; -export interface JobHandler { - (request: JobRequest): TaskLike; +export interface JobHandler { + (request: Request): TaskLike; } -const __handlers = useState(meta, 'handlers', new Map() as ReadonlyMap); +const __handlers = useState(meta, 'handlers', new Map() as ReadonlyMap>); /** * @namespace */ export const JobHandler = { + /** + * Retrieves a job handler for the specified type, if it exists. + * + * @example + * ```ts + * const handler = JobHandler.get('my-job-type'); + * if (handler) { + * handler({ ... }); // call the handler with a job request + * } + * ``` + * @param jobName + */ + get(jobName: Request['jobName']): Option> { + return __handlers.current.get(jobName) as Option>; + }, + /** * Registers a job handler for the specified type. * @@ -20,13 +37,13 @@ export const JobHandler = { * ```ts * JobHandler.register('my-job-type', (request) => { * // handle the job request here - * return Console.log(request); + * console.log(request); * }); * ``` * @param jobName * @param handler */ - register(jobName: Request['jobName'], handler: JobHandler): void { + register(jobName: Request['jobName'], handler: JobHandler): void { __handlers.current = new Map(__handlers.current).set(jobName, handler); }, diff --git a/packages/playground/src/job/JobId.ts b/packages/playground/src/job/JobId.ts new file mode 100644 index 00000000000..3b443452c0f --- /dev/null +++ b/packages/playground/src/job/JobId.ts @@ -0,0 +1,19 @@ +import type { Tag } from '@w5s/core'; + +/** + * A unique identifier for a job. + */ +export type JobId = string & Tag<'JobId'>; + +/** + * Creates a JobId from a string value. + * + * @example + * ```typescript + * const jobId = JobId('my-job-id'); + * ``` + * @param value + */ +export function JobId(value: string): JobId { + return value as JobId; +} diff --git a/packages/playground/src/job/JobProvider.ts b/packages/playground/src/job/JobProvider.ts index 7b187734496..77ed453a5f0 100644 --- a/packages/playground/src/job/JobProvider.ts +++ b/packages/playground/src/job/JobProvider.ts @@ -1,5 +1,16 @@ +import type { Option } from '@w5s/core'; import type { JobEnqueue } from './JobEnqueue.js'; import type { JobRequest } from './JobRequest.js'; +import type { JobId } from './JobId.js'; + +export interface JobEnqueueResult { + jobId: JobId; + + /** + * Internal job id assigned by the provider, if any. + */ + providerJobId: Option; +} export interface JobProvider { /** @@ -9,5 +20,5 @@ export interface JobProvider { * @param request * @param options */ - enqueue(request: Request, options: JobEnqueue): Promise; + enqueue(request: Request, options: JobEnqueue): Promise; } diff --git a/packages/playground/src/job/JobRequest.ts b/packages/playground/src/job/JobRequest.ts index f79ce910fc3..b41f0735824 100644 --- a/packages/playground/src/job/JobRequest.ts +++ b/packages/playground/src/job/JobRequest.ts @@ -8,5 +8,5 @@ export interface JobRequest { /** * The payload of the job request, containing the data necessary for processing the job. */ - jobPayload: Record; + parameters: Record; } diff --git a/packages/playground/src/job/MemoryJobProvider.spec.ts b/packages/playground/src/job/MemoryJobProvider.spec.ts index ae52d26defb..db20fa53eeb 100644 --- a/packages/playground/src/job/MemoryJobProvider.spec.ts +++ b/packages/playground/src/job/MemoryJobProvider.spec.ts @@ -1,6 +1,8 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import { MemoryJobProvider } from './MemoryJobProvider.js'; import { JobEnqueue } from './JobEnqueue.js'; +import { Option } from '@w5s/core'; +import type { JobId } from './JobId.js'; describe(MemoryJobProvider, () => { afterEach(() => { @@ -8,18 +10,21 @@ describe(MemoryJobProvider, () => { }); it('enqueues immediate jobs in memory', async () => { - const provider = new MemoryJobProvider(); + const provider = new MemoryJobProvider({ nextJobId: () => 'job-1' as JobId }); const request = { jobName: 'email', jobPayload: { userId: '1' } }; - await provider.enqueue(request, JobEnqueue.Immediate); + const result = await provider.enqueue(request, JobEnqueue.Immediate); + expect(result).toEqual({ jobId: 'job-1', providerJobId: Option.None }); expect(provider.size).toBe(1); expect(provider.peek()).toEqual({ + jobId: 'job-1', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), }); expect(provider.dequeue()).toEqual({ + jobId: 'job-1', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), @@ -29,11 +34,12 @@ describe(MemoryJobProvider, () => { it('keeps delayed jobs out of the queue until ready', async () => { vi.useFakeTimers(); - const provider = new MemoryJobProvider({ now: () => Date.now() }); + const provider = new MemoryJobProvider({ now: () => Date.now(), nextJobId: () => 'job-2' as JobId }); const request = { jobName: 'reminder', jobPayload: { userId: '1' } }; - await provider.enqueue(request, JobEnqueue.Delayed(100)); + const result = await provider.enqueue(request, JobEnqueue.Delayed(100)); + expect(result).toEqual({ jobId: 'job-2', providerJobId: Option.None }); expect(provider.size).toBe(0); vi.advanceTimersByTime(99); expect(provider.size).toBe(0); @@ -41,6 +47,7 @@ describe(MemoryJobProvider, () => { vi.advanceTimersByTime(1); expect(provider.size).toBe(1); expect(provider.dequeue()).toEqual({ + jobId: 'job-2', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), diff --git a/packages/playground/src/job/MemoryJobProvider.ts b/packages/playground/src/job/MemoryJobProvider.ts index 669cc6d5af4..a875c623bc8 100644 --- a/packages/playground/src/job/MemoryJobProvider.ts +++ b/packages/playground/src/job/MemoryJobProvider.ts @@ -1,8 +1,11 @@ +import { Option } from '@w5s/core'; import type { JobEnqueue } from './JobEnqueue.js'; import type { JobProvider } from './JobProvider.js'; import type { JobRequest } from './JobRequest.js'; +import { JobId } from './JobId.js'; export interface MemoryJobQueueEntry { + readonly jobId: JobId; readonly request: Request; readonly enqueuedAt: number; readonly availableAt: number; @@ -10,15 +13,18 @@ export interface MemoryJobQueueEntry { export interface MemoryJobProviderOptions { readonly now?: () => number; + readonly nextJobId?: () => JobId; } export class MemoryJobProvider implements JobProvider { readonly #queue: MemoryJobQueueEntry[] = []; readonly #timers = new Set>(); readonly #now: () => number; + readonly #nextJobId: () => JobId; constructor(options: MemoryJobProviderOptions = {}) { this.#now = options.now ?? (() => Date.now()); + this.#nextJobId = options.nextJobId ?? (() => JobId(globalThis.crypto.randomUUID())); } get size(): number { @@ -41,12 +47,14 @@ export class MemoryJobProvider implements JobProvider { this.#queue.length = 0; } - async enqueue(request: Request, options: JobEnqueue): Promise { + async enqueue(request: Request, options: JobEnqueue) { + const jobId = this.#nextJobId(); const enqueuedAt = this.#now(); const availableAt = options._ === 'JobEnqueueDelayed' ? enqueuedAt + options.delay : enqueuedAt; const entry: MemoryJobQueueEntry = { + jobId, request, enqueuedAt, availableAt, @@ -58,9 +66,10 @@ export class MemoryJobProvider implements JobProvider { this.#queue.push(entry); }, options.delay); this.#timers.add(timer); - return; + return { jobId, providerJobId: Option.None }; } this.#queue.push(entry); + return { jobId, providerJobId: Option.None }; } } diff --git a/packages/playground/src/job/index.spec.ts b/packages/playground/src/job/index.spec.ts new file mode 100644 index 00000000000..2a20adfe37c --- /dev/null +++ b/packages/playground/src/job/index.spec.ts @@ -0,0 +1,16 @@ +import { describe, it, expect } from 'vitest'; +import * as Module from './index.js'; + +describe('index', () => { + it('exports', () => { + expect(Object.keys(Module).toSorted()).toEqual( + [ + // Public API + 'Job', + 'JobEnqueue', + 'JobHandler', + 'JobId', + ].toSorted(), + ); + }); +}); diff --git a/packages/playground/src/job/index.ts b/packages/playground/src/job/index.ts new file mode 100644 index 00000000000..a258b110baf --- /dev/null +++ b/packages/playground/src/job/index.ts @@ -0,0 +1,5 @@ +export * from './Job.js'; +export * from './JobEnqueue.js'; +export * from './JobHandler.js'; +export * from './JobId.js'; +export * from './JobRequest.js'; From df7d3d1f2123c6741155c704650b2ddea3918c10 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Fri, 22 May 2026 21:48:45 +0200 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=94=A7=20Refactor=20job=20enqueueing?= =?UTF-8?q?=20to=20include=20job=20module=20in=20MemoryJobProvider=20and?= =?UTF-8?q?=20Job=20definitions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/Job.spec.ts | 3 +++ packages/playground/src/job/Job.ts | 8 +++---- packages/playground/src/job/JobProvider.ts | 22 ++++++++++++++++--- .../src/job/MemoryJobProvider.spec.ts | 10 +++++---- .../playground/src/job/MemoryJobProvider.ts | 3 ++- 5 files changed, 34 insertions(+), 12 deletions(-) diff --git a/packages/playground/src/job/Job.spec.ts b/packages/playground/src/job/Job.spec.ts index 0bd82ead7f9..b0df03f7a76 100644 --- a/packages/playground/src/job/Job.spec.ts +++ b/packages/playground/src/job/Job.spec.ts @@ -36,6 +36,9 @@ describe('Job', () => { expect(Result.getOrThrow(result)).toBe('job-1'); expect(provider.enqueue).toHaveBeenCalledTimes(1); expect(provider.enqueue).toHaveBeenCalledWith( + expect.objectContaining({ + jobName: 'EmailJob', + }), { jobName: 'EmailJob', parameters: { to: 'dev@w5s.io', retries: 2 }, diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 0f51530f46b..9c339f1e461 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -1,4 +1,4 @@ -import { Codec, lazy, Type } from '@w5s/core'; +import { Type } from '@w5s/core'; import { Task } from '@w5s/task'; import { configuration } from './configuration.js'; import type { JobRequest } from './JobRequest.js'; @@ -35,20 +35,20 @@ export const Job = { parameters: ParameterType, }, `${jobName}Job`); - return { + const instance: Job.Module<{ jobName: JobName; parameters: Payload }> = { jobName, Request, performLater(parameters) { const request = { jobName, parameters }; - const requestEncoded = lazy(() => Codec.encode(Request, request)); return Task.create(async () => { const { provider } = configuration.current; - const { jobId } = await provider.enqueue(requestEncoded() as JobRequest, JobEnqueue.Immediate); + const { jobId } = await provider.enqueue(instance, request, JobEnqueue.Immediate); return Task.ok(jobId); }); }, }; + return instance; }, /** diff --git a/packages/playground/src/job/JobProvider.ts b/packages/playground/src/job/JobProvider.ts index 77ed453a5f0..40ef9ae6dcd 100644 --- a/packages/playground/src/job/JobProvider.ts +++ b/packages/playground/src/job/JobProvider.ts @@ -2,6 +2,7 @@ import type { Option } from '@w5s/core'; import type { JobEnqueue } from './JobEnqueue.js'; import type { JobRequest } from './JobRequest.js'; import type { JobId } from './JobId.js'; +import type { Job } from './Job.js'; export interface JobEnqueueResult { jobId: JobId; @@ -17,8 +18,23 @@ export interface JobProvider { * Enqueue a job to be executed with the given request and options. * The provider is responsible for executing the job at the appropriate time based on the options provided. * - * @param request - * @param options + * @example + * ```ts + * const provider: JobProvider = { + * async enqueue(request, options) { + * // Enqueue the job using your preferred method (e.g., setTimeout, a job queue library, etc.) + * const jobId = await myJobQueue.enqueue(request, options); + * return { jobId, providerJobId: jobId.toString() }; + * }, + * }; + * + * const MyJob = Job.define('MyJob', Type.Object({ foo: Type.string })); + * MyJob.performLater({ foo: 'bar' }); + * ``` + * @param jobModule the job module defining the job type and request codec + * @param request the job request containing the parameters for the job + * @param options the options for enqueuing the job, such as delay or immediate execution + * @returns a promise that resolves to the result of enqueuing the job, including the assigned job ID */ - enqueue(request: Request, options: JobEnqueue): Promise; + enqueue(jobModule: Job.Module, request: Request, options: JobEnqueue): Promise; } diff --git a/packages/playground/src/job/MemoryJobProvider.spec.ts b/packages/playground/src/job/MemoryJobProvider.spec.ts index db20fa53eeb..62d566f46d3 100644 --- a/packages/playground/src/job/MemoryJobProvider.spec.ts +++ b/packages/playground/src/job/MemoryJobProvider.spec.ts @@ -3,17 +3,19 @@ import { MemoryJobProvider } from './MemoryJobProvider.js'; import { JobEnqueue } from './JobEnqueue.js'; import { Option } from '@w5s/core'; import type { JobId } from './JobId.js'; +import type { Job } from './Job.js'; describe(MemoryJobProvider, () => { afterEach(() => { vi.useRealTimers(); }); + const jobEmailModule = {} as any as Job.Module; it('enqueues immediate jobs in memory', async () => { const provider = new MemoryJobProvider({ nextJobId: () => 'job-1' as JobId }); const request = { jobName: 'email', jobPayload: { userId: '1' } }; - const result = await provider.enqueue(request, JobEnqueue.Immediate); + const result = await provider.enqueue(jobEmailModule, request, JobEnqueue.Immediate); expect(result).toEqual({ jobId: 'job-1', providerJobId: Option.None }); expect(provider.size).toBe(1); @@ -37,7 +39,7 @@ describe(MemoryJobProvider, () => { const provider = new MemoryJobProvider({ now: () => Date.now(), nextJobId: () => 'job-2' as JobId }); const request = { jobName: 'reminder', jobPayload: { userId: '1' } }; - const result = await provider.enqueue(request, JobEnqueue.Delayed(100)); + const result = await provider.enqueue(jobEmailModule, request, JobEnqueue.Delayed(100)); expect(result).toEqual({ jobId: 'job-2', providerJobId: Option.None }); expect(provider.size).toBe(0); @@ -58,8 +60,8 @@ describe(MemoryJobProvider, () => { vi.useFakeTimers(); const provider = new MemoryJobProvider({ now: () => Date.now() }); - await provider.enqueue({ jobName: 'immediate', jobPayload: {} }, JobEnqueue.Immediate); - await provider.enqueue({ jobName: 'delayed', jobPayload: {} }, JobEnqueue.Delayed(100)); + await provider.enqueue(jobEmailModule, { jobName: 'immediate', jobPayload: {} }, JobEnqueue.Immediate); + await provider.enqueue(jobEmailModule, { jobName: 'delayed', jobPayload: {} }, JobEnqueue.Delayed(100)); provider.clear(); vi.advanceTimersByTime(100); diff --git a/packages/playground/src/job/MemoryJobProvider.ts b/packages/playground/src/job/MemoryJobProvider.ts index a875c623bc8..9aa7e35ca08 100644 --- a/packages/playground/src/job/MemoryJobProvider.ts +++ b/packages/playground/src/job/MemoryJobProvider.ts @@ -2,6 +2,7 @@ import { Option } from '@w5s/core'; import type { JobEnqueue } from './JobEnqueue.js'; import type { JobProvider } from './JobProvider.js'; import type { JobRequest } from './JobRequest.js'; +import type { Job } from './Job.js'; import { JobId } from './JobId.js'; export interface MemoryJobQueueEntry { @@ -47,7 +48,7 @@ export class MemoryJobProvider implements JobProvider { this.#queue.length = 0; } - async enqueue(request: Request, options: JobEnqueue) { + async enqueue(_jobModule: Job.Module, request: Request, options: JobEnqueue) { const jobId = this.#nextJobId(); const enqueuedAt = this.#now(); const availableAt = options._ === 'JobEnqueueDelayed' From d4f1f681b468ec391d9af039f340526f4a609f9a Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Fri, 22 May 2026 23:26:13 +0200 Subject: [PATCH 5/7] =?UTF-8?q?=E2=9C=A8=20Add=20JobEmail=20implementation?= =?UTF-8?q?=20and=20update=20Job=20module=20to=20support=20job=20definitio?= =?UTF-8?q?ns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/initializer-example/job/JobEmail.ts | 11 ++++++++++ packages/playground/src/job/Job.ts | 22 ++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 packages/playground/src/initializer-example/job/JobEmail.ts diff --git a/packages/playground/src/initializer-example/job/JobEmail.ts b/packages/playground/src/initializer-example/job/JobEmail.ts new file mode 100644 index 00000000000..f7686ed8178 --- /dev/null +++ b/packages/playground/src/initializer-example/job/JobEmail.ts @@ -0,0 +1,11 @@ +import { Type } from '@w5s/core'; +import { Console } from '@w5s/console'; +import { Job } from '../../job/Job.js'; + +export const SendEmail = Job + .define('SendEmail', Type.Object({ + email: Type.string, + subject: Type.string, + body: Type.Option(Type.string), + })) + .implement((request) => Console.log('Sending email to', request)); diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 9c339f1e461..ff5c52f9a2e 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -24,6 +24,23 @@ export namespace Job { * @param parameters The parameters of the job, which will be passed to the job handler when the job is executed. */ performLater(parameters: Request['parameters']): Task; + + /** + * Defines a job implementation + * + * @example + * ```ts + * const MyJob = Job.define('MyJob', Type.Object({ + * foo: Type.string, + * })); + * + * Job.implement(MyJob, (request) => + * Console.log(request.parameters.foo) + * ); + * ``` + * @param handler + */ + implement(handler: JobHandler): void; } } @@ -33,7 +50,7 @@ export const Job = { const Request = Type.Object({ jobName: Type.constant(jobName), parameters: ParameterType, - }, `${jobName}Job`); + }, jobName); const instance: Job.Module<{ jobName: JobName; parameters: Payload }> = { jobName, @@ -47,6 +64,9 @@ export const Job = { return Task.ok(jobId); }); }, + implement(handler) { + Job.implement(instance, handler); + }, }; return instance; }, From 66d96a62dbdebaf1dbaa7360ff6a92914d32a029 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Sun, 24 May 2026 00:21:20 +0200 Subject: [PATCH 6/7] =?UTF-8?q?=E2=9C=A8=20Implement=20JobRunner=20with=20?= =?UTF-8?q?dispatch=20function=20for=20job=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/JobRunner.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 packages/playground/src/job/JobRunner.ts diff --git a/packages/playground/src/job/JobRunner.ts b/packages/playground/src/job/JobRunner.ts new file mode 100644 index 00000000000..5acd9c57d0d --- /dev/null +++ b/packages/playground/src/job/JobRunner.ts @@ -0,0 +1,17 @@ +import { Task } from '@w5s/task'; +import type { JobRequest } from './JobRequest.js'; +import { JobHandler } from './JobHandler.js'; + +function dispatch(request: JobRequest): Task { + const handler = JobHandler.get(request.jobName); + return (handler == null + ? Task.resolve() + : Task.from(handler(request))); +} + +/** + * @namespace + */ +export const JobRunner = { + dispatch, +}; From afdd774dbb725d04d13bed5392b65baf6b5b1f83 Mon Sep 17 00:00:00 2001 From: Julien Polo Date: Mon, 25 May 2026 22:52:45 +0200 Subject: [PATCH 7/7] =?UTF-8?q?=F0=9F=94=A7=20Refactor=20dispatch=20functi?= =?UTF-8?q?on=20in=20JobRunner=20to=20improve=20error=20handling=20with=20?= =?UTF-8?q?InvariantError?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/playground/src/job/JobRunner.ts | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/packages/playground/src/job/JobRunner.ts b/packages/playground/src/job/JobRunner.ts index 5acd9c57d0d..e937c8336f0 100644 --- a/packages/playground/src/job/JobRunner.ts +++ b/packages/playground/src/job/JobRunner.ts @@ -1,12 +1,20 @@ -import { Task } from '@w5s/task'; +import type { Option } from '@w5s/core'; +import { from } from '@w5s/task/dist/Task/from.js'; +import { andThen } from '@w5s/task/dist/Task/andThen.js'; +import { reject } from '@w5s/task/dist/Task/reject.js'; import type { JobRequest } from './JobRequest.js'; import { JobHandler } from './JobHandler.js'; +import { InvariantError } from '@w5s/error/dist/InvariantError.js'; +import type { Task } from '@w5s/task'; -function dispatch(request: JobRequest): Task { - const handler = JobHandler.get(request.jobName); - return (handler == null - ? Task.resolve() - : Task.from(handler(request))); +function dispatch(request: JobRequest): Task { + const handlerTask = from, never>(({ resolve }) => resolve(JobHandler.get(request.jobName))); + + return andThen(handlerTask, (handler) => + handler == null + ? reject(new InvariantError(`No handler found for job: ${request.jobName}`)) + : handler(request), + ); } /**