diff --git a/packages/playground/src/initializer-example/job/JobEmail.ts b/packages/playground/src/initializer-example/job/JobEmail.ts new file mode 100644 index 00000000000..f7686ed8178 --- /dev/null +++ b/packages/playground/src/initializer-example/job/JobEmail.ts @@ -0,0 +1,11 @@ +import { Type } from '@w5s/core'; +import { Console } from '@w5s/console'; +import { Job } from '../../job/Job.js'; + +export const SendEmail = Job + .define('SendEmail', Type.Object({ + email: Type.string, + subject: Type.string, + body: Type.Option(Type.string), + })) + .implement((request) => Console.log('Sending email to', request)); diff --git a/packages/playground/src/job/Job.spec.ts b/packages/playground/src/job/Job.spec.ts new file mode 100644 index 00000000000..b0df03f7a76 --- /dev/null +++ b/packages/playground/src/job/Job.spec.ts @@ -0,0 +1,68 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { Type, Result } from '@w5s/core'; +import { Task } from '@w5s/task'; +import { Job } from './Job.js'; +import { JobEnqueue } from './JobEnqueue.js'; +import { JobHandler } from './JobHandler.js'; +import type { JobId } from './JobId.js'; +import { configuration } from './configuration.js'; +import type { JobProvider } from './JobProvider.js'; + +describe('Job', () => { + const previousConfiguration = configuration.current; + + beforeEach(() => { + configuration.current = previousConfiguration; + }); + + it('defines a module and enqueues requests with performLater', async () => { + const provider = { + enqueue: vi.fn(async () => ({ + jobId: 'job-1' as JobId, + providerJobId: undefined, + })), + } satisfies JobProvider; + configuration.current = { provider }; + + const EmailJob = Job.define('EmailJob', Type.Object({ + to: Type.string, + retries: Type.number, + })); + + expect(EmailJob.jobName).toBe('EmailJob'); + + const result = await EmailJob.performLater({ to: 'dev@w5s.io', retries: 2 }).run(); + + expect(Result.getOrThrow(result)).toBe('job-1'); + expect(provider.enqueue).toHaveBeenCalledTimes(1); + expect(provider.enqueue).toHaveBeenCalledWith( + expect.objectContaining({ + jobName: 'EmailJob', + }), + { + jobName: 'EmailJob', + parameters: { to: 'dev@w5s.io', retries: 2 }, + }, + JobEnqueue.Immediate, + ); + }); + + it('registers handlers via implement', () => { + const handler = vi.fn(() => Task.resolve()); + const ReportJob = Job.define('ReportJob', Type.Object({ + reportId: Type.string, + })); + + Job.implement(ReportJob, handler); + + const registered = JobHandler.get('ReportJob'); + expect(registered).toBeDefined(); + + const request = { jobName: 'ReportJob', parameters: { reportId: 'r-1' } }; + const task = registered?.(request); + + expect(task).toBeDefined(); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith(request); + }); +}); diff --git a/packages/playground/src/job/Job.ts b/packages/playground/src/job/Job.ts index 6c6266dd9bf..ff5c52f9a2e 100644 --- a/packages/playground/src/job/Job.ts +++ b/packages/playground/src/job/Job.ts @@ -1,46 +1,93 @@ -import { Codec, lazy, Tag, Type } from '@w5s/core'; +import { Type } from '@w5s/core'; import { Task } from '@w5s/task'; -import { randomUUID, type UUID } from '@w5s/uuid'; +import { configuration } from './configuration.js'; +import type { JobRequest } from './JobRequest.js'; +import { JobEnqueue } from './JobEnqueue.js'; +import type { JobId } from './JobId.js'; +import { JobHandler } from './JobHandler.js'; export namespace Job { - export interface Module { - jobName: Request['_']; + export interface Module { + /** + * The name of the job, used to identify the job type and retrieve the corresponding handler. + */ + jobName: Request['jobName']; + + /** + * The codec for the job request, used to encode and decode job requests when enqueuing and executing jobs. + */ Request: Type.Module; - create(payload: Request['payload']): Task; + /** + * Enqueues a job to be executed later with the given parameters. + * + * @param parameters The parameters of the job, which will be passed to the job handler when the job is executed. + */ + performLater(parameters: Request['parameters']): Task; + + /** + * Defines a job implementation + * + * @example + * ```ts + * const MyJob = Job.define('MyJob', Type.Object({ + * foo: Type.string, + * })); + * + * Job.implement(MyJob, (request) => + * Console.log(request.parameters.foo) + * ); + * ``` + * @param handler + */ + implement(handler: JobHandler): void; } } -export type JobId = UUID & Tag<'JobId'>; - export const Job = { - nextJobId: randomUUID() as Task, - define(jobName: JobName, PayloadType: Type.Module): Job.Module<{ _: JobName; payload: Payload }> { + define(jobName: JobName, ParameterType: Type.Module): Job.Module<{ jobName: JobName; parameters: Payload }> { const Request = Type.Object({ - _: Type.constant(jobName), - payload: PayloadType, - }, `${jobName}Job`); - return { + jobName: Type.constant(jobName), + parameters: ParameterType, + }, jobName); + + const instance: Job.Module<{ jobName: JobName; parameters: Payload }> = { jobName, Request, - create(payload) { - const request = { _: jobName, payload }; - const requestEncoded = lazy(() => Codec.encode(Request, request)); - return Task.andThen(Job.nextJobId, (jobId) => { - return Task.create(async () => { - console.log(`Running job ${jobName} with payload:`, payload, requestEncoded()); - return Task.ok(jobId as JobId); - }); + performLater(parameters) { + const request = { jobName, parameters }; + return Task.create(async () => { + const { provider } = configuration.current; + + const { jobId } = await provider.enqueue(instance, request, JobEnqueue.Immediate); + return Task.ok(jobId); }); }, + implement(handler) { + Job.implement(instance, handler); + }, }; + return instance; }, -}; - -// export const Blah = Job.define('Blah', Type.Object({ -// foo: Type.string, -// bar: Type.number, -// })); -// const jobId = await Blah.create({ foo: 'hello', bar: 42 }).run(); + /** + * Defines a job implementation + * + * @example + * ```ts + * const MyJob = Job.define('MyJob', Type.Object({ + * foo: Type.string, + * })); + * + * Job.implement(MyJob, (request) => + * Console.log(request.parameters.foo) + * ); + * ``` + * @param module + * @param handler + */ + implement(module: Job.Module, handler: JobHandler): void { + JobHandler.register(module.jobName, handler); + }, +}; diff --git a/packages/playground/src/job/JobEnqueue.ts b/packages/playground/src/job/JobEnqueue.ts new file mode 100644 index 00000000000..91d4d6ab016 --- /dev/null +++ b/packages/playground/src/job/JobEnqueue.ts @@ -0,0 +1,21 @@ +export type JobEnqueue = + | { _: 'JobEnqueueImmediate' } + | { _: 'JobEnqueueDelayed'; delay: number }; +export const JobEnqueue = { + /** + * Immediate job enqueue option, the job will be executed as soon as possible. + * + * @example + * JobEnqueue.Immediate + */ + Immediate: { _: 'JobEnqueueImmediate' } satisfies JobEnqueue, + + /** + * Creates a delayed job enqueue option. + * + * @example + * JobEnqueue.Delayed(5000) // The job will be executed after 5 seconds + * @param delay + */ + Delayed: (delay: number): JobEnqueue => ({ _: 'JobEnqueueDelayed', delay }), +}; diff --git a/packages/playground/src/job/JobHandler.spec.ts b/packages/playground/src/job/JobHandler.spec.ts new file mode 100644 index 00000000000..6cf6147c5d6 --- /dev/null +++ b/packages/playground/src/job/JobHandler.spec.ts @@ -0,0 +1,72 @@ +import { beforeEach, describe, expect, it } from 'vitest'; +import { useState } from '@w5s/application'; +import { JobHandler } from './JobHandler.js'; +import { meta } from './meta.js'; +import type { JobRequest } from './JobRequest.js'; + +function getHandlers() { + return useState(meta, 'handlers', new Map() as ReadonlyMap); +} + +describe('JobHandler', () => { + // eslint-disable-next-line ts/consistent-type-assertions + const generateHandler = (): JobHandler => () => ({}) as never; + + beforeEach(() => { + getHandlers().current = new Map(); + }); + + it('registers a handler for a job name', () => { + const handler = generateHandler(); + + JobHandler.register('email', handler); + + const handlers = getHandlers().current; + expect(handlers.size).toBe(1); + expect(handlers.get('email')).toBe(handler); + }); + + it('replaces an existing handler for the same job name', () => { + const previousHandler = generateHandler(); + const nextHandler = generateHandler(); + + JobHandler.register('email', previousHandler); + const previousMap = getHandlers().current; + + JobHandler.register('email', nextHandler); + + const handlers = getHandlers().current; + expect(handlers).not.toBe(previousMap); + expect(handlers.size).toBe(1); + expect(handlers.get('email')).toBe(nextHandler); + }); + + it('unregisters only the requested handler', () => { + const emailHandler = generateHandler(); + const smsHandler = generateHandler(); + + JobHandler.register('email', emailHandler); + JobHandler.register('sms', smsHandler); + + JobHandler.unregister('email'); + + const handlers = getHandlers().current; + expect(handlers.has('email')).toBe(false); + expect(handlers.get('sms')).toBe(smsHandler); + expect(handlers.size).toBe(1); + }); + + it('does nothing when unregistering a missing job name', () => { + const existing = generateHandler(); + + JobHandler.register('existing', existing); + const previousMap = getHandlers().current; + + JobHandler.unregister('missing'); + + const handlers = getHandlers().current; + expect(handlers).not.toBe(previousMap); + expect(handlers.get('existing')).toBe(existing); + expect(handlers.size).toBe(1); + }); +}); diff --git a/packages/playground/src/job/JobHandler.ts b/packages/playground/src/job/JobHandler.ts new file mode 100644 index 00000000000..3fa2c34ec5f --- /dev/null +++ b/packages/playground/src/job/JobHandler.ts @@ -0,0 +1,64 @@ +import { useState } from '@w5s/application'; +import { meta } from './meta.js'; +import type { JobRequest } from './JobRequest.js'; +import type { Option } from '@w5s/core'; +import type { TaskLike } from '@w5s/task'; + +export interface JobHandler { + (request: Request): TaskLike; +} + +const __handlers = useState(meta, 'handlers', new Map() as ReadonlyMap>); + +/** + * @namespace + */ +export const JobHandler = { + /** + * Retrieves a job handler for the specified type, if it exists. + * + * @example + * ```ts + * const handler = JobHandler.get('my-job-type'); + * if (handler) { + * handler({ ... }); // call the handler with a job request + * } + * ``` + * @param jobName + */ + get(jobName: Request['jobName']): Option> { + return __handlers.current.get(jobName) as Option>; + }, + + /** + * Registers a job handler for the specified type. + * + * @example + * ```ts + * JobHandler.register('my-job-type', (request) => { + * // handle the job request here + * console.log(request); + * }); + * ``` + * @param jobName + * @param handler + */ + register(jobName: Request['jobName'], handler: JobHandler): void { + __handlers.current = new Map(__handlers.current).set(jobName, handler); + }, + + /** + * Unregisters a job handler for the specified type. + * + * @example + * ```ts + * JobHandler.unregister('my-job-type'); + * ``` + * @param jobName + */ + unregister(jobName: JobRequest['jobName']): void { + const newHandlers = new Map(__handlers.current); + newHandlers.delete(jobName); + __handlers.current = newHandlers; + }, +}; diff --git a/packages/playground/src/job/JobId.ts b/packages/playground/src/job/JobId.ts new file mode 100644 index 00000000000..3b443452c0f --- /dev/null +++ b/packages/playground/src/job/JobId.ts @@ -0,0 +1,19 @@ +import type { Tag } from '@w5s/core'; + +/** + * A unique identifier for a job. + */ +export type JobId = string & Tag<'JobId'>; + +/** + * Creates a JobId from a string value. + * + * @example + * ```typescript + * const jobId = JobId('my-job-id'); + * ``` + * @param value + */ +export function JobId(value: string): JobId { + return value as JobId; +} diff --git a/packages/playground/src/job/JobProvider.ts b/packages/playground/src/job/JobProvider.ts index 981dfd1c282..40ef9ae6dcd 100644 --- a/packages/playground/src/job/JobProvider.ts +++ b/packages/playground/src/job/JobProvider.ts @@ -1,16 +1,40 @@ +import type { Option } from '@w5s/core'; +import type { JobEnqueue } from './JobEnqueue.js'; import type { JobRequest } from './JobRequest.js'; +import type { JobId } from './JobId.js'; +import type { Job } from './Job.js'; -export type JobEnqueueOptions = - | { _: 'immediate' } - | { _: 'delayed'; delay: number }; +export interface JobEnqueueResult { + jobId: JobId; + + /** + * Internal job id assigned by the provider, if any. + */ + providerJobId: Option; +} export interface JobProvider { /** * Enqueue a job to be executed with the given request and options. * The provider is responsible for executing the job at the appropriate time based on the options provided. * - * @param request - * @param options + * @example + * ```ts + * const provider: JobProvider = { + * async enqueue(request, options) { + * // Enqueue the job using your preferred method (e.g., setTimeout, a job queue library, etc.) + * const jobId = await myJobQueue.enqueue(request, options); + * return { jobId, providerJobId: jobId.toString() }; + * }, + * }; + * + * const MyJob = Job.define('MyJob', Type.Object({ foo: Type.string })); + * MyJob.performLater({ foo: 'bar' }); + * ``` + * @param jobModule the job module defining the job type and request codec + * @param request the job request containing the parameters for the job + * @param options the options for enqueuing the job, such as delay or immediate execution + * @returns a promise that resolves to the result of enqueuing the job, including the assigned job ID */ - enqueue(request: Request, options: JobEnqueueOptions): Promise; + enqueue(jobModule: Job.Module, request: Request, options: JobEnqueue): Promise; } diff --git a/packages/playground/src/job/JobRequest.ts b/packages/playground/src/job/JobRequest.ts index 1c322753524..b41f0735824 100644 --- a/packages/playground/src/job/JobRequest.ts +++ b/packages/playground/src/job/JobRequest.ts @@ -1,4 +1,12 @@ export interface JobRequest { - _: string; - payload: Record; + /** + * The name of the job type, used to identify the appropriate handler for processing the job. + * This should be a string that uniquely identifies the type of job being requested. + */ + jobName: string; + + /** + * The payload of the job request, containing the data necessary for processing the job. + */ + parameters: Record; } diff --git a/packages/playground/src/job/JobRunner.ts b/packages/playground/src/job/JobRunner.ts new file mode 100644 index 00000000000..e937c8336f0 --- /dev/null +++ b/packages/playground/src/job/JobRunner.ts @@ -0,0 +1,25 @@ +import type { Option } from '@w5s/core'; +import { from } from '@w5s/task/dist/Task/from.js'; +import { andThen } from '@w5s/task/dist/Task/andThen.js'; +import { reject } from '@w5s/task/dist/Task/reject.js'; +import type { JobRequest } from './JobRequest.js'; +import { JobHandler } from './JobHandler.js'; +import { InvariantError } from '@w5s/error/dist/InvariantError.js'; +import type { Task } from '@w5s/task'; + +function dispatch(request: JobRequest): Task { + const handlerTask = from, never>(({ resolve }) => resolve(JobHandler.get(request.jobName))); + + return andThen(handlerTask, (handler) => + handler == null + ? reject(new InvariantError(`No handler found for job: ${request.jobName}`)) + : handler(request), + ); +} + +/** + * @namespace + */ +export const JobRunner = { + dispatch, +}; diff --git a/packages/playground/src/job/MemoryJobProvider.spec.ts b/packages/playground/src/job/MemoryJobProvider.spec.ts index c04bec722a9..62d566f46d3 100644 --- a/packages/playground/src/job/MemoryJobProvider.spec.ts +++ b/packages/playground/src/job/MemoryJobProvider.spec.ts @@ -1,24 +1,32 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import { MemoryJobProvider } from './MemoryJobProvider.js'; +import { JobEnqueue } from './JobEnqueue.js'; +import { Option } from '@w5s/core'; +import type { JobId } from './JobId.js'; +import type { Job } from './Job.js'; describe(MemoryJobProvider, () => { afterEach(() => { vi.useRealTimers(); }); + const jobEmailModule = {} as any as Job.Module; it('enqueues immediate jobs in memory', async () => { - const provider = new MemoryJobProvider(); - const request = { _: 'email', payload: { userId: '1' } }; + const provider = new MemoryJobProvider({ nextJobId: () => 'job-1' as JobId }); + const request = { jobName: 'email', jobPayload: { userId: '1' } }; - await provider.enqueue(request, { _: 'immediate' }); + const result = await provider.enqueue(jobEmailModule, request, JobEnqueue.Immediate); + expect(result).toEqual({ jobId: 'job-1', providerJobId: Option.None }); expect(provider.size).toBe(1); expect(provider.peek()).toEqual({ + jobId: 'job-1', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), }); expect(provider.dequeue()).toEqual({ + jobId: 'job-1', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), @@ -28,11 +36,12 @@ describe(MemoryJobProvider, () => { it('keeps delayed jobs out of the queue until ready', async () => { vi.useFakeTimers(); - const provider = new MemoryJobProvider({ now: () => Date.now() }); - const request = { _: 'reminder', payload: { userId: '1' } }; + const provider = new MemoryJobProvider({ now: () => Date.now(), nextJobId: () => 'job-2' as JobId }); + const request = { jobName: 'reminder', jobPayload: { userId: '1' } }; - await provider.enqueue(request, { _: 'delayed', delay: 100 }); + const result = await provider.enqueue(jobEmailModule, request, JobEnqueue.Delayed(100)); + expect(result).toEqual({ jobId: 'job-2', providerJobId: Option.None }); expect(provider.size).toBe(0); vi.advanceTimersByTime(99); expect(provider.size).toBe(0); @@ -40,6 +49,7 @@ describe(MemoryJobProvider, () => { vi.advanceTimersByTime(1); expect(provider.size).toBe(1); expect(provider.dequeue()).toEqual({ + jobId: 'job-2', request, enqueuedAt: expect.any(Number), availableAt: expect.any(Number), @@ -50,8 +60,8 @@ describe(MemoryJobProvider, () => { vi.useFakeTimers(); const provider = new MemoryJobProvider({ now: () => Date.now() }); - await provider.enqueue({ _: 'immediate', payload: {} }, { _: 'immediate' }); - await provider.enqueue({ _: 'delayed', payload: {} }, { _: 'delayed', delay: 100 }); + await provider.enqueue(jobEmailModule, { jobName: 'immediate', jobPayload: {} }, JobEnqueue.Immediate); + await provider.enqueue(jobEmailModule, { jobName: 'delayed', jobPayload: {} }, JobEnqueue.Delayed(100)); provider.clear(); vi.advanceTimersByTime(100); diff --git a/packages/playground/src/job/MemoryJobProvider.ts b/packages/playground/src/job/MemoryJobProvider.ts index f0d4be43f9d..9aa7e35ca08 100644 --- a/packages/playground/src/job/MemoryJobProvider.ts +++ b/packages/playground/src/job/MemoryJobProvider.ts @@ -1,7 +1,12 @@ -import type { JobEnqueueOptions, JobProvider } from './JobProvider.js'; +import { Option } from '@w5s/core'; +import type { JobEnqueue } from './JobEnqueue.js'; +import type { JobProvider } from './JobProvider.js'; import type { JobRequest } from './JobRequest.js'; +import type { Job } from './Job.js'; +import { JobId } from './JobId.js'; export interface MemoryJobQueueEntry { + readonly jobId: JobId; readonly request: Request; readonly enqueuedAt: number; readonly availableAt: number; @@ -9,15 +14,18 @@ export interface MemoryJobQueueEntry { export interface MemoryJobProviderOptions { readonly now?: () => number; + readonly nextJobId?: () => JobId; } export class MemoryJobProvider implements JobProvider { readonly #queue: MemoryJobQueueEntry[] = []; readonly #timers = new Set>(); readonly #now: () => number; + readonly #nextJobId: () => JobId; constructor(options: MemoryJobProviderOptions = {}) { this.#now = options.now ?? (() => Date.now()); + this.#nextJobId = options.nextJobId ?? (() => JobId(globalThis.crypto.randomUUID())); } get size(): number { @@ -40,26 +48,29 @@ export class MemoryJobProvider implements JobProvider { this.#queue.length = 0; } - async enqueue(request: Request, options: JobEnqueueOptions): Promise { + async enqueue(_jobModule: Job.Module, request: Request, options: JobEnqueue) { + const jobId = this.#nextJobId(); const enqueuedAt = this.#now(); - const availableAt = options._ === 'delayed' + const availableAt = options._ === 'JobEnqueueDelayed' ? enqueuedAt + options.delay : enqueuedAt; const entry: MemoryJobQueueEntry = { + jobId, request, enqueuedAt, availableAt, }; - if (options._ === 'delayed') { + if (options._ === 'JobEnqueueDelayed') { const timer = setTimeout(() => { this.#timers.delete(timer); this.#queue.push(entry); }, options.delay); this.#timers.add(timer); - return; + return { jobId, providerJobId: Option.None }; } this.#queue.push(entry); + return { jobId, providerJobId: Option.None }; } } diff --git a/packages/playground/src/job/index.spec.ts b/packages/playground/src/job/index.spec.ts new file mode 100644 index 00000000000..2a20adfe37c --- /dev/null +++ b/packages/playground/src/job/index.spec.ts @@ -0,0 +1,16 @@ +import { describe, it, expect } from 'vitest'; +import * as Module from './index.js'; + +describe('index', () => { + it('exports', () => { + expect(Object.keys(Module).toSorted()).toEqual( + [ + // Public API + 'Job', + 'JobEnqueue', + 'JobHandler', + 'JobId', + ].toSorted(), + ); + }); +}); diff --git a/packages/playground/src/job/index.ts b/packages/playground/src/job/index.ts new file mode 100644 index 00000000000..a258b110baf --- /dev/null +++ b/packages/playground/src/job/index.ts @@ -0,0 +1,5 @@ +export * from './Job.js'; +export * from './JobEnqueue.js'; +export * from './JobHandler.js'; +export * from './JobId.js'; +export * from './JobRequest.js';