Skip to content
11 changes: 11 additions & 0 deletions packages/playground/src/initializer-example/job/JobEmail.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Type } from '@w5s/core';
import { Console } from '@w5s/console';
import { Job } from '../../job/Job.js';

export const SendEmail = Job
.define('SendEmail', Type.Object({
email: Type.string,
subject: Type.string,
body: Type.Option(Type.string),
}))
.implement((request) => Console.log('Sending email to', request));
68 changes: 68 additions & 0 deletions packages/playground/src/job/Job.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { Type, Result } from '@w5s/core';
import { Task } from '@w5s/task';
import { Job } from './Job.js';
import { JobEnqueue } from './JobEnqueue.js';
import { JobHandler } from './JobHandler.js';
import type { JobId } from './JobId.js';
import { configuration } from './configuration.js';
import type { JobProvider } from './JobProvider.js';

describe('Job', () => {
const previousConfiguration = configuration.current;

beforeEach(() => {
configuration.current = previousConfiguration;
});

it('defines a module and enqueues requests with performLater', async () => {
const provider = {
enqueue: vi.fn(async () => ({
jobId: 'job-1' as JobId,
providerJobId: undefined,
})),
} satisfies JobProvider;
configuration.current = { provider };

const EmailJob = Job.define('EmailJob', Type.Object({
to: Type.string,
retries: Type.number,
}));

expect(EmailJob.jobName).toBe('EmailJob');

const result = await EmailJob.performLater({ to: 'dev@w5s.io', retries: 2 }).run();

expect(Result.getOrThrow(result)).toBe('job-1');
expect(provider.enqueue).toHaveBeenCalledTimes(1);
expect(provider.enqueue).toHaveBeenCalledWith(
expect.objectContaining({
jobName: 'EmailJob',
}),
{
jobName: 'EmailJob',
parameters: { to: 'dev@w5s.io', retries: 2 },
},
JobEnqueue.Immediate,
);
});

it('registers handlers via implement', () => {
const handler = vi.fn(() => Task.resolve());
const ReportJob = Job.define('ReportJob', Type.Object({
reportId: Type.string,
}));

Job.implement(ReportJob, handler);

const registered = JobHandler.get('ReportJob');
expect(registered).toBeDefined();

const request = { jobName: 'ReportJob', parameters: { reportId: 'r-1' } };
const task = registered?.(request);

expect(task).toBeDefined();
expect(handler).toHaveBeenCalledTimes(1);
expect(handler).toHaveBeenCalledWith(request);
});
});
103 changes: 75 additions & 28 deletions packages/playground/src/job/Job.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,93 @@
import { Codec, lazy, Tag, Type } from '@w5s/core';
import { Type } from '@w5s/core';
import { Task } from '@w5s/task';
import { randomUUID, type UUID } from '@w5s/uuid';
import { configuration } from './configuration.js';
import type { JobRequest } from './JobRequest.js';
import { JobEnqueue } from './JobEnqueue.js';
import type { JobId } from './JobId.js';
import { JobHandler } from './JobHandler.js';

export namespace Job {
export interface Module<Request extends { _: string; payload: unknown }> {
jobName: Request['_'];
export interface Module<Request extends JobRequest> {
/**
* The name of the job, used to identify the job type and retrieve the corresponding handler.
*/
jobName: Request['jobName'];

/**
* The codec for the job request, used to encode and decode job requests when enqueuing and executing jobs.
*/
Request: Type.Module<Request>;

create(payload: Request['payload']): Task<JobId, never>;
/**
* Enqueues a job to be executed later with the given parameters.
*
* @param parameters The parameters of the job, which will be passed to the job handler when the job is executed.
*/
performLater(parameters: Request['parameters']): Task<JobId, never>;

/**
* Defines a job implementation
*
* @example
* ```ts
* const MyJob = Job.define('MyJob', Type.Object({
* foo: Type.string,
* }));
*
* Job.implement(MyJob, (request) =>
* Console.log(request.parameters.foo)
* );
* ```
* @param handler
*/
implement(handler: JobHandler<Request>): void;
}
}

export type JobId = UUID & Tag<'JobId'>;

export const Job = {
nextJobId: randomUUID() as Task<JobId, never>,

define<JobName extends string, Payload>(jobName: JobName, PayloadType: Type.Module<Payload>): Job.Module<{ _: JobName; payload: Payload }> {
define<JobName extends JobRequest['jobName'], Payload extends JobRequest['parameters']>(jobName: JobName, ParameterType: Type.Module<Payload>): Job.Module<{ jobName: JobName; parameters: Payload }> {
const Request = Type.Object({
_: Type.constant(jobName),
payload: PayloadType,
}, `${jobName}Job`);
return {
jobName: Type.constant(jobName),
parameters: ParameterType,
}, jobName);

const instance: Job.Module<{ jobName: JobName; parameters: Payload }> = {
jobName,
Request,
create(payload) {
const request = { _: jobName, payload };
const requestEncoded = lazy(() => Codec.encode(Request, request));
return Task.andThen(Job.nextJobId, (jobId) => {
return Task.create<JobId, never>(async () => {
console.log(`Running job ${jobName} with payload:`, payload, requestEncoded());
return Task.ok(jobId as JobId);
});
performLater(parameters) {
const request = { jobName, parameters };
return Task.create<JobId, never>(async () => {
const { provider } = configuration.current;

const { jobId } = await provider.enqueue(instance, request, JobEnqueue.Immediate);
return Task.ok(jobId);
});
},
implement(handler) {
Job.implement(instance, handler);
},
};
return instance;
},
};

// export const Blah = Job.define('Blah', Type.Object({
// foo: Type.string,
// bar: Type.number,
// }));

// const jobId = await Blah.create({ foo: 'hello', bar: 42 }).run();
/**
* Defines a job implementation
*
* @example
* ```ts
* const MyJob = Job.define('MyJob', Type.Object({
* foo: Type.string,
* }));
*
* Job.implement(MyJob, (request) =>
* Console.log(request.parameters.foo)
* );
* ```
* @param module
* @param handler
*/
implement<Request extends JobRequest>(module: Job.Module<Request>, handler: JobHandler<Request>): void {
JobHandler.register(module.jobName, handler);
},
};
21 changes: 21 additions & 0 deletions packages/playground/src/job/JobEnqueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export type JobEnqueue =
| { _: 'JobEnqueueImmediate' }
| { _: 'JobEnqueueDelayed'; delay: number };
export const JobEnqueue = {
/**
* Immediate job enqueue option, the job will be executed as soon as possible.
*
* @example
* JobEnqueue.Immediate
*/
Immediate: { _: 'JobEnqueueImmediate' } satisfies JobEnqueue,

/**
* Creates a delayed job enqueue option.
*
* @example
* JobEnqueue.Delayed(5000) // The job will be executed after 5 seconds
* @param delay
*/
Delayed: (delay: number): JobEnqueue => ({ _: 'JobEnqueueDelayed', delay }),
};
72 changes: 72 additions & 0 deletions packages/playground/src/job/JobHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { beforeEach, describe, expect, it } from 'vitest';
import { useState } from '@w5s/application';
import { JobHandler } from './JobHandler.js';
import { meta } from './meta.js';
import type { JobRequest } from './JobRequest.js';

function getHandlers() {
return useState(meta, 'handlers', new Map<JobRequest['jobName'], JobHandler>() as ReadonlyMap<JobRequest['jobName'], JobHandler>);
}

describe('JobHandler', () => {
// eslint-disable-next-line ts/consistent-type-assertions
const generateHandler = (): JobHandler => () => ({}) as never;

beforeEach(() => {
getHandlers().current = new Map();
});

it('registers a handler for a job name', () => {
const handler = generateHandler();

JobHandler.register('email', handler);

const handlers = getHandlers().current;
expect(handlers.size).toBe(1);
expect(handlers.get('email')).toBe(handler);
});

it('replaces an existing handler for the same job name', () => {
const previousHandler = generateHandler();
const nextHandler = generateHandler();

JobHandler.register('email', previousHandler);
const previousMap = getHandlers().current;

JobHandler.register('email', nextHandler);

const handlers = getHandlers().current;
expect(handlers).not.toBe(previousMap);
expect(handlers.size).toBe(1);
expect(handlers.get('email')).toBe(nextHandler);
});

it('unregisters only the requested handler', () => {
const emailHandler = generateHandler();
const smsHandler = generateHandler();

JobHandler.register('email', emailHandler);
JobHandler.register('sms', smsHandler);

JobHandler.unregister('email');

const handlers = getHandlers().current;
expect(handlers.has('email')).toBe(false);
expect(handlers.get('sms')).toBe(smsHandler);
expect(handlers.size).toBe(1);
});

it('does nothing when unregistering a missing job name', () => {
const existing = generateHandler();

JobHandler.register('existing', existing);
const previousMap = getHandlers().current;

JobHandler.unregister('missing');

const handlers = getHandlers().current;
expect(handlers).not.toBe(previousMap);
expect(handlers.get('existing')).toBe(existing);
expect(handlers.size).toBe(1);
});
});
64 changes: 64 additions & 0 deletions packages/playground/src/job/JobHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { useState } from '@w5s/application';
import { meta } from './meta.js';
import type { JobRequest } from './JobRequest.js';
import type { Option } from '@w5s/core';
import type { TaskLike } from '@w5s/task';

export interface JobHandler<Request = JobRequest> {
(request: Request): TaskLike<void, never>;
}

const __handlers = useState(meta, 'handlers', new Map<JobRequest['jobName'], JobHandler>() as ReadonlyMap<JobRequest['jobName'], JobHandler<any>>);

/**
* @namespace
*/
export const JobHandler = {
/**
* Retrieves a job handler for the specified type, if it exists.
*
* @example
* ```ts
* const handler = JobHandler.get('my-job-type');
* if (handler) {
* handler({ ... }); // call the handler with a job request
* }
* ```
* @param jobName
*/
get<Request extends JobRequest>(jobName: Request['jobName']): Option<JobHandler<Request>> {
return __handlers.current.get(jobName) as Option<JobHandler<Request>>;
},

/**
* Registers a job handler for the specified type.
*
* @example
* ```ts
* JobHandler.register('my-job-type', (request) => {
* // handle the job request here
* console.log(request);
* });
* ```
* @param jobName
* @param handler
*/
register<Request extends JobRequest>(jobName: Request['jobName'], handler: JobHandler<Request>): void {
__handlers.current = new Map(__handlers.current).set(jobName, handler);
},

/**
* Unregisters a job handler for the specified type.
*
* @example
* ```ts
* JobHandler.unregister('my-job-type');
* ```
* @param jobName
*/
unregister(jobName: JobRequest['jobName']): void {
const newHandlers = new Map(__handlers.current);
newHandlers.delete(jobName);
__handlers.current = newHandlers;
},
};
19 changes: 19 additions & 0 deletions packages/playground/src/job/JobId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import type { Tag } from '@w5s/core';

/**
* A unique identifier for a job.
*/
export type JobId = string & Tag<'JobId'>;

/**
* Creates a JobId from a string value.
*
* @example
* ```typescript
* const jobId = JobId('my-job-id');
* ```
* @param value
*/
export function JobId(value: string): JobId {
return value as JobId;
}
Loading