Server-sent events (SSE; using ReadableStream) proof of concept using UnJS Nitro and a TS JSDoc frontend.
nitro-sse-counter$ npm i
> nitro-sse-counter@0.0.0 prepare
> nitropack prepare
added 397 packages, and audited 399 packages in 31s
70 packages are looking for funding
run `npm fund` for details
found 0 vulnerabilities
nitro-sse-counter$ npm run client:build
> nitro-sse-counter@0.0.0 client:build
> ./node_modules/.bin/rollup -c src/client/rollup.config.mjs
src/client/entry.js → src/public/main.js...
created src/public/main.js in 413ms
nitro-sse-counter$ cp .env.example .env
nitro-sse-counter$ npm run dev
> nitro-sse-counter@0.0.0 dev
> nitropack dev
➜ Local: http://localhost:3000/
➜ Network: use --host to expose
✔ Nitro built in 566 ms
After launching the server open 3 separate tabs to http://localhost:3000/ in the browser. Then switch to incognito mode and open another 2 tabs to http://localhost:3000/ (alternatively open them in a different (make of) browser).
⚠️ Caution: Don't open too many tabs/pages with event sources to the same server in the same browser or things will invariably break.
HTML Living Standard, 9.2.7 Authoring notes: “Clients that support HTTP's per-server connection limitation might run into trouble when opening multiple pages from a site if each page has an EventSource to the same domain. Authors can avoid this using the relatively complex mechanism of using unique domain names per connection, or by allowing the user to enable or disable the EventSource functionality on a per-page basis, or by sharing a single EventSource object using a shared worker.”
Google Chrome for example has a limit of 6 connections per domain per browser under HTTP/1.1 (this limitation does not apply to HTTP/2 and beyond, though with HTTP/3 WebTransport is likely a better fit).
All five tabs will show a count of “0”.
- On the incognito side click the “Increment” button on one of the tabs. Once it changes to “1” check the other tab to discover that its count has changed to “1” as well. The three tabs on the normal side are still showing a count of “0”.
The server uses an HttpOnly cookie (with h3 session support) to associate a page with a CounterRecord.
While each page has its own event source, all pages associated with the same browser session share the same CounterRecord. When one of the pages within the session is incremented, all pages on the same session will be updated with the new value.
Because the normal and incognito sessions are kept separate only the incognito tabs were updated.
- Switch to the normal tabs and increment one of them twice. All normal tabs will now show a count of “2” while the incognito tabs still show a count of “1”.
Advancing the count to “10” will cause the server to close all the SSE responses that are associated with that CounterRecord.
Normally the client event sources will attempt to reconnect immediately but in this case the client will simply close the event source once it receives an error.
Reloading one of the tabs will create an entirely new CounterRecord on the server and start the process all over again. The other tabs however won't be notified as they have lost their SSE connection. Reloading them will synchronize them with the other tab as the shared session will give them access to the same CounterRecord.
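The close-on-error behavior described above can be sketched as follows. This is only an illustration under stated assumptions, not the repository's /app/inbound module: the EventSourceLike type and the connect function are hypothetical names, introduced so the sketch runs without a browser.

```typescript
// Hypothetical, minimal shape of the event source members used here.
type EventSourceLike = {
  onmessage: ((event: { data: string }) => void) | null;
  onerror: (() => void) | null;
  close: () => void;
};

// Forward counts to `onCount`. On the first error the source is closed
// so that it does not try to reconnect.
function connect(
  source: EventSourceLike,
  onCount: (count: number) => void,
  onEnd: () => void
): void {
  source.onmessage = (event) => onCount(Number(event.data));
  source.onerror = () => {
    source.close();
    source.onmessage = null;
    source.onerror = null;
    onEnd();
  };
}

// Stand-in source for demonstration purposes only
const log: string[] = [];
const fake: EventSourceLike = {
  onmessage: null,
  onerror: null,
  close: () => {
    log.push('closed');
  },
};

connect(
  fake,
  (count) => {
    log.push(`count:${count}`);
  },
  () => {
    log.push('ended');
  }
);

fake.onmessage?.({ data: '10' }); // the final count arrives
fake.onerror?.(); // the server closed the response
```

Closing inside the error handler is what prevents the reconnect attempt; the tab then stays disconnected until it is reloaded.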
The figure below roughly outlines the relationships between the various modules that support their collaborations.
(Made with excalidraw.)
- /routes/index returns the page's HTML. It checks whether a CounterRecord is currently associated with the client session. If one is found the current count is interpolated into the page. Otherwise a new CounterRecord isn't created until the page connects with an event source.
- /middleware just moves some data from the request into the event context for easy consumption.
- /api/counter returns an event-stream to carry count updates to the client via a dispatch() callback registered with hookable set up in /utils/hooks. It submits a task to the task-queue to addObserver and countUnicast. countUnicast sends the most recent count as the first message through the event stream that was just created.
- /api/increment triggers an eventual increment (and subsequent count update message to every observer) of the CounterRecord associated with the request's session by submitting an updateBroadcast task (increment & notifyObservers) to task-queue. 202 Accepted is returned if the CounterRecord exists, otherwise 409 Conflict is returned.
- /server/task-queue exists to manage countUnicast and updateBroadcast tasks. It sequences tasks belonging to the same CounterRecord while those of other records can happen concurrently. It removes unnecessary countUnicast tasks when an updateBroadcast is already queued or when it's queued while countUnicast tasks are waiting.
- /server/counter furnishes the addObserver, dropObserver, increment, and counterRecordFromEvent tasks. The CounterRecord itself is handled by Unstorage. counter has its own internal task queue to sequence operations against the same CounterRecord as accesses outside of those from countUnicast and updateBroadcast tasks (managed by task-queue) could otherwise still lead to race conditions. addObserver either creates a new CounterRecord or increments the observer count on an already existing CounterRecord; dropObserver decrements the observer count or removes the CounterRecord entirely if there are no more observers; increment increments the count, stores the updated record and returns a copy of the updated record; counterRecordFromEvent returns a copy of the record if it currently exists.
- /entry bootstraps the client side JavaScript by wrapping the browser dependencies, injecting the wrappers into the core app and then binding the UI components to the app and their respective DOM regions.
- /components/count is the UI component for updating and visually adjusting the count's appearance based on count and availableStatus notifications from the app.
- /components/trigger is the UI component that forwards the button click to the app and adjusts the button's visual appearance based on availableStatus notifications from the app.
- /components/status is the UI component that updates and visually adjusts the status messages, received via status notifications from the app.
- /app/inbound wraps the event source. start() creates the event source and forwards any messages or errors to subscribers.
- /app/outbound wraps the POST fetch() to the /api/increment endpoint. The status code is mapped to a boolean result.
- /app/context acts as the marshalling yard for the counter's availableStatus. It specifically monitors the messages from inbound to make the required adjustments to the availableStatus and notifies all subscribers of the change. Other forced changes are reported via sendAvailable(). When the availableStatus changes to UNAVAILABLE an appropriate status message is selected and forwarded to /app/status.
- /app/increment is responsible for issuing the increment() command via /app/outbound and forcing /app/context into a WAIT status. It will also force the UNAVAILABLE status should the increment() command fail to be accepted.
- /app/count filters the inbound update messages and forwards them as plain count updates to its subscriber (/components/count).
- /app/status forwards any status messages originating anywhere from inside the app to its subscriber (/components/status).
The following activity diagrams highlight some collaborations for some key capabilities.
The counter stream connects at the end of the entry script that runs when the / page loads.
- The entry script causes the app/inbound start() to execute which issues a GET request to the server's /api/counter endpoint when inbound creates the event source.
- First an addObserver task is submitted. It will create and store a fresh CounterRecord (if one already existed, it is simply updated with an incremented observer count).
- A ReadableStream is created while a dispatch() callback is registered with the counterHooks and a countUnicast task is submitted to eventually send the first message with the current count through the stream. Finally that stream is returned in the HTTP response (which is kept open for further messages).
- With the event source created inbound's addRouter registers a message and error handler on it to notify its subscribers of these events.
- A user click on a DOM button marked with the js:c-trigger (JavaScript hooks, component namespaces) class initiates the increment command.
- components/trigger invokes the increment() action on /app/increment.
- First the WAIT availableStatus is sent to app/context which broadcasts it to all its subscribers, which include components/trigger. It promptly applies the js:c-trigger--wait and js:c-trigger--disabled classes to the DOM button.
- Second /app/outbound's increment() function is called resulting in an HTTP POST request to /api/increment. This causes an updateBroadcast (increment & notifyObservers) task to be submitted (and a 202 Accepted status to be returned).
Eventually the /server/task-queue executes the updateBroadcast task.
- First /server/counter performs the increment on the appropriate CounterRecord, updates the storage and returns the updated record.
- Second that record is used to broadcast the update. The necessary arguments for the dispatch() callbacks registered with counterHooks are passed to hookable which then notifies every registered callback.
- The dispatch() created by /api/counter uses send() provided by /server/event-stream to send the update through the ReadableStream.
- The updated count is received as a MessageEvent by /app/inbound which forwards it as a CountUpdate to all its subscribers.
- /app/count is one such subscriber. After receiving the CountUpdate it notifies /components/count of the update which modifies the Text node inside its element marked with a js:c-count class.
- /app/context is the other CountMessage subscriber. The CountUpdate causes it to change the previously set WAIT availableStatus to READY and broadcast the change to its subscribers. /components/trigger is one such subscriber which in response to READY removes both the js:c-trigger--wait and js:c-trigger--disabled classes from its DOM button, thereby enabling the “Increment” button to be clicked once again.
- Middleware transferring common values from request to H3EventContext.
- Session cookies with h3.
- Shared data managed in server (memory) storage with Unstorage.
- Client specific event dispatchers related to shared server data managed with hooks.
Though not strictly necessary a ReadableStream is returned in the server-sent events (features) response to carry the events from the server to the client. The ReadableStream is a web standard API and the event stream (mostly) decouples it from the node specific request API.
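To make the wire format concrete: server-sent events travel through the stream as text blocks of field:value lines, each block terminated by a blank line. The sketch below parses such blocks the way a consumer of the stream might. It is a simplified illustration, not code from this repository; parseEvent and parseChunk are hypothetical names and only the id and data fields are handled.

```typescript
// One parsed server-sent event (only the fields used in this demo)
type ParsedEvent = { id?: string; data: string };

// Parse a single event block (the text between blank lines).
// Simplification: only `id` and `data` fields are recognised.
function parseEvent(block: string): ParsedEvent {
  const data: string[] = [];
  let id: string | undefined;
  for (const line of block.split('\n')) {
    if (line === '' || line.startsWith(':')) continue; // blank or comment line
    const colon = line.indexOf(':');
    const field = colon < 0 ? line : line.slice(0, colon);
    let value = colon < 0 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // one optional space
    if (field === 'data') data.push(value);
    else if (field === 'id') id = value;
  }
  const joined = data.join('\n'); // multiple data lines are joined by newlines
  return id === undefined ? { data: joined } : { id, data: joined };
}

// Split an already decoded chunk of the stream into its events
function parseChunk(text: string): ParsedEvent[] {
  return text
    .split('\n\n')
    .filter((block) => block.length > 0)
    .map(parseEvent);
}

// Two events: one with an id, one without
const events = parseChunk('id:1700000000000\ndata:3\n\n' + 'data:4\n\n');
```

In the browser the EventSource performs this parsing itself and delivers data and lastEventId on each MessageEvent.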
// file: src/server/event-stream.ts
import { IncomingMessage } from 'node:http';
import { ReadableStream } from 'node:stream/web';
import { TextEncoder } from 'node:util';
export type SourceController = {
send: (data: string, id?: string) => void;
close: () => void;
};
type InitSource = (controller: SourceController) => () => void;
function makeEventStream(request: IncomingMessage, init: InitSource) {
// listen to the request closing ASAP
let cleanup: (() => void) | undefined;
let closeStream: (() => void) | undefined;
let onClientClose: (() => void) | undefined = () => {
if (onClientClose) {
request.removeListener('close', onClientClose);
onClientClose = undefined;
}
closeStream?.();
};
request.addListener('close', onClientClose);
return new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
const send = (data: string, id?: string) => {
const payload = (id ? 'id:' + id + '\ndata:' : 'data:') + data + '\n\n';
controller.enqueue(encoder.encode(payload));
};
closeStream = () => {
if (!cleanup) return;
cleanup();
cleanup = undefined;
controller.close();
};
cleanup = init({ send, close: closeStream });
if (!onClientClose) {
// client closed request early
closeStream();
return;
}
},
});
}
export { makeEventStream };
The event handler increments the observer count on the counter record. The initialization function registers the client event dispatch with the server counter hooks and submits a task to (eventually) send this client the most current count.
The cleanup function is responsible for unregistering the client event dispatch and adjusting the observer count (potentially dropping the counter record). The cleanup is called when the event stream is closed by either the server or client.
// file: src/api/counter.ts
import { CONTEXT_SESSION_ID, refreshCounterId } from '../server/session';
import { makeEventStream, type SourceController } from '../server/event-stream';
import { submitTask } from '../server/task-queue';
import {
addObserver,
dropObserver,
type CountDispatch,
type CounterRecord,
} from '../server/counter';
function submitCountUnicast(record: CounterRecord, dispatch: CountDispatch) {
const task = () => {
dispatch(record.count, record.lastEventId);
};
// ejectable by non-ejectable task
// with same priority (counter) id
// i.e. the `increment` will notify the observer
// of the latest value already
//
submitTask(task, record.id, true);
}
function makeInitFn(record: CounterRecord, sessionId: string) {
return function init(controller: SourceController) {
const { send, close } = controller;
const dispatch = (count: number, id: string) => {
send(String(count), id);
if (count > 9) close();
};
const unsubscribe = counterHooks.hook(record.id, dispatch);
submitCountUnicast(record, dispatch);
return function cleanup() {
unsubscribe();
dropObserver(record.id, sessionId);
};
};
}
export default defineEventHandler(async (event) => {
const record = await addObserver(event, refreshCounterId);
const sessionId = event.context[CONTEXT_SESSION_ID] as string;
const init = makeInitFn(record, sessionId);
setHeader(event, 'cache-control', 'no-cache');
setHeader(event, 'connection', 'keep-alive');
setHeader(event, 'content-type', 'text/event-stream');
setResponseStatus(event, 200);
return makeEventStream(event.node.req, init);
});
Every export in the utils directory or its subdirectories becomes available globally.
So counterHooks is available throughout the server.
The ID to a CounterRecord is used as the key to counterHooks.
This key acts much in the same way as an event type identifier (e.g. click).
It allows multiple (CountDispatch) callbacks to be registered against a single key.
const unsubscribe = counterHooks.hook(record.id, dispatch) registers the dispatch callback against the record.id key, returning an unsubscribe thunk for unregistering later.
In the increment endpoint counterHooks.callHook(record.id, record.count, record.lastEventId) is then used to invoke all CountDispatch callbacks registered against record.id with the arguments (record.count, record.lastEventId).
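The register/broadcast/unsubscribe cycle described above can be tried out with a small stand-in. The makeKeyedHooks function below is hypothetical and is not hookable's implementation (the real library does more, for example it supports asynchronous hooks); it only mirrors the hook and callHook usage shown in this document.

```typescript
type CountDispatch = (count: number, id: string) => void;

// Hypothetical stand-in for a keyed callback registry
function makeKeyedHooks() {
  const registry = new Map<string, Set<CountDispatch>>();

  // Register a callback against a key; returns an unsubscribe thunk
  const hook = (key: string, dispatch: CountDispatch) => {
    const callbacks = registry.get(key) ?? new Set<CountDispatch>();
    callbacks.add(dispatch);
    registry.set(key, callbacks);
    return () => {
      callbacks.delete(dispatch);
    };
  };

  // Invoke every callback currently registered against the key
  const callHook = (key: string, count: number, id: string) => {
    const callbacks = registry.get(key);
    if (!callbacks) return;
    for (const dispatch of Array.from(callbacks)) dispatch(count, id);
  };

  return { hook, callHook };
}

const hooks = makeKeyedHooks();
const received: string[] = [];

// Two observers of counter-1, one observer of counter-2
const unsubscribeA = hooks.hook('counter-1', (count) => {
  received.push(`A:${count}`);
});
hooks.hook('counter-1', (count) => {
  received.push(`B:${count}`);
});
hooks.hook('counter-2', (count) => {
  received.push(`C:${count}`);
});

hooks.callHook('counter-1', 1, 'e1'); // reaches A and B only
unsubscribeA();
hooks.callHook('counter-1', 2, 'e2'); // reaches B only
```

Keying by counter ID is what keeps an increment in one browser session from reaching the pages of another session.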
// file: src/utils/hooks.ts
import { createHooks } from 'hookable';
import type { CountDispatch } from '../server/counter';
export interface CounterHooks {
[key: string]: CountDispatch;
}
export const counterHooks = createHooks<CounterHooks>();
The session correlates the client to the counter record maintained by the server. While the counter record is dropped when the last observer closes, the session persists as it may not be possible to update/remove the session cookie on the client at that time. However when a new record is initialized, the cookie is updated with a different counter ID. The session ID is separate from the counter ID; the session record only holds the counter ID needed to access the counter record.
// file: src/server/session.ts
import crypto from 'uncrypto';
import { updateSession, getSession, type SessionConfig } from 'h3';
import type { EventHandlerRequest, H3Event } from 'h3';
type SessionRecord = {
counterId: string;
};
const CONTEXT_SESSION_ID = 'session-id';
if (!process.env.SESSION_SECRET) throw Error('SESSION_SECRET must be set');
const config: SessionConfig = {
cookie: {
// domain?: string
// encode?: (value: string) => string;
// expires?: Date
httpOnly: true,
// maxAge?: number
path: '/',
sameSite: 'lax',
secure: true,
},
password: process.env.SESSION_SECRET,
maxAge: 86400, // 24h
name: '__session',
};
// Note: Despite type, `session.data` may not have `counterId` property
const sessionFromEvent = (event: H3Event<EventHandlerRequest>) =>
getSession<SessionRecord>(event, config);
async function refreshCounterId(event: H3Event<EventHandlerRequest>) {
const counterId = crypto.randomUUID();
await updateSession(event, config, { counterId });
return counterId;
}
export { CONTEXT_SESSION_ID, refreshCounterId, sessionFromEvent };
Middleware is responsible for creating the session cookie and transferring some request data to the event context.
// file: src/middleware/1.session.ts
import { CONTEXT_COUNTER } from '../server/counter';
import { CONTEXT_SESSION_ID, sessionFromEvent } from '../server/session';
import { CONTEXT_URL, urlFromRequest } from '../server/url';
export default defineEventHandler(async (event) => {
const url = urlFromRequest(event.node.req);
if (url) event.context[CONTEXT_URL] = url;
const session = await sessionFromEvent(event);
event.context[CONTEXT_SESSION_ID] = session.id;
const record = session.data;
if (record) event.context[CONTEXT_COUNTER] = record.counterId;
});
The task queue manages the event dispatches. It is used to eject redundant tasks, i.e. a unicast count update is redundant if an increment to the respective counter is already scheduled or will be scheduled before the entire queue of tasks is run.
For the same counter the increment task is considered non-ejectable while a unicast update is ejectable. An increment task increments the counter record and notifies all active observers of that record. A unicast update is the initial observer specific event that just communicates the current counter value.
- An ejectable task can be added as long as no task or only ejectable tasks are on the queue.
- Queuing a non-ejectable task will cause all ejectable tasks on the queue to be ejected.
- Once a non-ejectable task is on the queue no other tasks, ejectable or non-ejectable, can be queued until all the tasks on the queue have run.
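The three rules above can be condensed into a small decision function. This is a sketch for one priority ID only; the Pending and Outcome types and the decide function are hypothetical names that do not appear in src/server/task-queue.ts.

```typescript
// What the queue currently holds for one priority ID
type Pending = 'none' | 'ejectable' | 'non-ejectable';

// What happens to a newly submitted task
type Outcome = 'queue' | 'append' | 'replace-ejectables' | 'drop';

function decide(pending: Pending, ejectable: boolean): Outcome {
  // Nothing queued for this ID yet: any task is accepted
  if (pending === 'none') return 'queue';

  // A non-ejectable task is waiting: it will already deliver the latest
  // value to every observer, so the new task is redundant
  if (pending === 'non-ejectable') return 'drop';

  // Only ejectable tasks are waiting: another ejectable one joins them,
  // a non-ejectable one takes their place
  return ejectable ? 'append' : 'replace-ejectables';
}

// A unicast (ejectable) followed by an increment (non-ejectable)
const unicastFirst = decide('none', true);
const incrementAfterUnicast = decide('ejectable', false);
const unicastAfterIncrement = decide('non-ejectable', true);
```

In the counter's terms: a waiting increment makes later unicasts and increments for the same counter unnecessary until the queue has run, while an arriving increment makes the waiting unicasts unnecessary.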
The tasks are managed with respect to a priority ID. In this case the counter ID serves as the priority ID.
All active ejectable tasks are on the taskQueue.
The first ejectable TaskRecord (for a specific priorityId) in the queue acts as the head of a linked list of all ejectable tasks for that particular priorityId.
The linked list is traversed when all the ejectable tasks are ejected.
priorityMap acts as an index by priorityId into the taskQueue; it indexes only non-ejectable TaskRecords or any ejectable TaskRecords at the head of their respective linked lists.
// file: src/server/task-queue.ts
type TaskRecord = {
task: () => void;
priorityId: string | undefined;
eject: boolean;
next: TaskRecord | undefined;
};
type PriorityMap = Map<string, TaskRecord>;
const RUN_DELAY = 300;
const makeTaskRecord = (
task: () => void,
priorityId?: string,
eject: boolean = false
) => ({ task, priorityId, eject, next: undefined });
let queuedId: ReturnType<typeof setTimeout> | undefined;
const taskQueue: TaskRecord[] = [];
const priorityMap: PriorityMap = new Map();
// Process **everything** on the queue
function runQueue() {
priorityMap.clear();
for (let i = 0; i < taskQueue.length; i += 1) taskQueue[i].task();
taskQueue.length = 0;
queuedId = undefined;
}
function queueTask(record: TaskRecord) {
taskQueue.push(record);
if (typeof queuedId !== 'undefined') return;
queuedId = setTimeout(runQueue, RUN_DELAY);
}
// For non-ejectable or first ejectable tasks
function queueTaskWithId(record: TaskRecord) {
if (record.priorityId !== undefined) priorityMap.set(record.priorityId, record);
queueTask(record);
}
// For ejectable tasks after the first
function appendEjectable(head: TaskRecord, last: TaskRecord) {
let record = head;
for (; record.next; record = record.next);
record.next = last;
// queue the newly linked task (not the previous tail)
queueTask(last);
}
function ejectTasks(record: TaskRecord) {
for (let next: TaskRecord | undefined = record; next; next = next.next) {
const index = taskQueue.indexOf(next);
if (index > -1) taskQueue.splice(index, 1);
}
}
function submitTask(
task: () => void,
priorityId?: string,
eject: boolean = false
) {
if (typeof priorityId === 'undefined') {
queueTask(makeTaskRecord(task));
return;
}
const found = priorityMap.get(priorityId);
if (!found) {
queueTaskWithId(makeTaskRecord(task, priorityId, eject));
return;
}
if (!found.eject) {
// found task will take care of it
return;
}
const record = makeTaskRecord(task, priorityId, eject);
if (record.eject) {
appendEjectable(found, record);
return;
}
// new task will replace ejectables
ejectTasks(found);
queueTaskWithId(record);
}
export { submitTask };
The CounterRecords are managed with the nitro storage layer using the default Unstorage memory driver.
The module exports:
- addObserver adds a new CounterRecord when there is no counter with a matching counterId; otherwise it increments the observer count and updates the stored CounterRecord.
- dropObserver decrements the observer count and updates the stored CounterRecord unless there are no more observers, in which case the CounterRecord is removed entirely.
- increment increments the count of the CounterRecord and updates it.
- counterRecordFromEvent retrieves the CounterRecord associated with the current session if there is one.
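Stripped of storage access and queuing, the record bookkeeping behind these exports can be sketched with pure functions. The function names below (withObserverAdded, withObserverDropped, incremented) are hypothetical and only illustrate the rules; the module's actual functions read and write the record through the storage layer.

```typescript
type CounterRecord = {
  id: string;
  lastEventId: string;
  count: number;
  observers: number;
};

// No record yet: start a fresh one with a single observer;
// otherwise count one more observer.
function withObserverAdded(
  record: CounterRecord | undefined,
  freshId: string,
  eventId: string
): CounterRecord {
  return record
    ? { ...record, observers: record.observers + 1 }
    : { id: freshId, lastEventId: eventId, count: 0, observers: 1 };
}

// The last observer leaving removes the record (signalled by `undefined`)
function withObserverDropped(record: CounterRecord): CounterRecord | undefined {
  return record.observers < 2
    ? undefined
    : { ...record, observers: record.observers - 1 };
}

// An increment bumps the count and stamps a new event ID
function incremented(record: CounterRecord, eventId: string): CounterRecord {
  return { ...record, count: record.count + 1, lastEventId: eventId };
}

// Two pages connect, one increments, both disconnect
const first = withObserverAdded(undefined, 'counter-1', 'e0');
const second = withObserverAdded(first, 'unused', 'e0');
const bumped = incremented(second, 'e1');
const oneLeft = withObserverDropped(bumped);
const noneLeft = oneLeft ? withObserverDropped(oneLeft) : undefined;
```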
Each of these functions queues a TaskRecord on taskQueue (distinct from src/server/task-queue.ts) in order to prevent race conditions against a shared record by separate clients. The TaskRecord's id is the session ID, not the counter ID. This makes it possible to queue an addObserver task even in the absence of a counter ID (when the CounterRecord doesn't yet exist).
To be clear: tasks for different session IDs (different CounterRecords) are allowed to execute concurrently. Tasks for the same session ID (same CounterRecord) have to run sequentially.
- When a task is added to the taskQueue the task isn't run if there already is a task on the queue for the same session ID.
- When the task is added to the queue and it is the first task of that session ID runTasks is launched immediately. Once the task completes, the task is removed from the queue and the queue is checked for new tasks belonging to that session ID and if any are found they are run in insertion order.
locate is simply an object with a withId and countWithId function.
In synchronous code sections it is perfectly safe to set locate.id and then use either function in order to avoid creating fresh one-off function expressions.
The class expression is used to ensure that the arrow function's this refers to the locate object.
// file: src/server/counter.ts
import { CONTEXT_SESSION_ID } from './session';
import type { EventHandlerRequest, H3Event } from 'h3';
type RequestEvent = H3Event<EventHandlerRequest>;
export type CountDispatch = (count: number, id: string) => void;
export type CounterRecord = {
id: string;
lastEventId: string;
count: number;
observers: number;
};
type TaskRecord = {
id: string;
task: () => Promise<void>;
};
// --- Task Queue
const taskQueue: TaskRecord[] = [];
const locate = new (class {
id = '';
readonly withId = (record: TaskRecord) => this.id === record.id;
readonly countWithId = (count: number, record: TaskRecord) =>
this.id === record.id ? count + 1 : count;
})();
// Continue running until there are no more tasks with the same
// session IDs in taskQueue
async function runTasks(record: TaskRecord) {
for (
;
typeof record !== 'undefined';
locate.id = record.id, record = taskQueue.find(locate.withId)
) {
await record.task();
const index = taskQueue.indexOf(record);
taskQueue.splice(index, 1);
}
}
function queueTask(record: TaskRecord) {
locate.id = record.id;
const count = taskQueue.reduce(locate.countWithId, 0);
taskQueue.push(record);
// Others with identical session ID already running
if (count > 0) return;
runTasks(record);
}
// --- Storage Interaction
//
const CONTEXT_COUNTER = 'counter';
const STORAGE_COUNTER = 'counter';
const makeCounterRecord = (id: string, lastEventId: string): CounterRecord => ({
id,
lastEventId,
count: 0,
observers: 1,
});
// inferred return type should also include `undefined`
async function counterRecord(id: unknown): Promise<CounterRecord | void> {
if (typeof id !== 'string') return undefined;
return (
(await useStorage(STORAGE_COUNTER).getItem<CounterRecord>(id)) ?? undefined
);
}
const removeRecord = (id: string) => useStorage(STORAGE_COUNTER).removeItem(id);
const updateRecord = (record: CounterRecord) =>
useStorage(STORAGE_COUNTER).setItem<CounterRecord>(record.id, record);
// --- Tasks
//
function counterRecordFromEvent(event: RequestEvent) {
const id = event.context[CONTEXT_SESSION_ID];
const counterId = event.context[CONTEXT_COUNTER];
return new Promise<CounterRecord | void>((resolve, reject) => {
const task = async () => {
try {
const record = await counterRecord(counterId);
return resolve(record);
} catch (error) {
reject(error);
}
};
queueTask({ id, task });
});
}
function increment(event: RequestEvent) {
const id = event.context[CONTEXT_SESSION_ID];
const counterId = event.context[CONTEXT_COUNTER];
return new Promise<CounterRecord | void>((resolve, reject) => {
const task = async () => {
try {
const record = await counterRecord(counterId);
if (!record) return resolve(undefined);
record.count += 1;
record.lastEventId = String(Date.now());
await updateRecord(record);
return resolve(record);
} catch (error) {
reject(error);
}
};
queueTask({ id, task });
});
}
function addObserver(
event: RequestEvent,
refreshId: (event: RequestEvent) => Promise<string>
) {
const id = event.context[CONTEXT_SESSION_ID];
const counterId = event.context[CONTEXT_COUNTER];
return new Promise<CounterRecord>((resolve, reject) => {
const task = async () => {
try {
let record = await counterRecord(counterId);
if (record) {
record.observers += 1;
} else {
const freshId = await refreshId(event);
record = makeCounterRecord(freshId, String(Date.now()));
}
await updateRecord(record);
return resolve(record);
} catch (error) {
reject(error);
}
};
queueTask({ id, task });
});
}
function dropObserver(counterId: string, id: string) {
return new Promise<CounterRecord | void>((resolve, reject) => {
const task = async () => {
try {
const record = await counterRecord(counterId);
if (!record) return resolve(undefined);
if (record.observers < 2) {
await removeRecord(record.id);
return resolve(undefined);
}
record.observers -= 1;
await updateRecord(record);
return resolve(record);
} catch (error) {
reject(error);
}
};
queueTask({ id, task });
});
}
export {
CONTEXT_COUNTER,
addObserver,
counterRecordFromEvent,
dropObserver,
increment,
};
The handler first verifies that the CounterRecord exists before it proceeds to submit a task to increment the count and notify all of its observers.
callHook is used to broadcast the updated count to the observing clients.
// file: src/api/increment.post.ts
import { submitTask } from '../server/task-queue';
import {
counterRecordFromEvent,
increment,
type CounterRecord,
} from '../server/counter';
import type { EventHandlerRequest, H3Event } from 'h3';
function notifyObservers(record: void | CounterRecord) {
if (!record) return;
counterHooks.callHook(record.id, record.count, record.lastEventId);
}
const makeUpdateBroadcast = (event: H3Event<EventHandlerRequest>) => () =>
increment(event).then(notifyObservers);
export default defineEventHandler(async (event) => {
const record = await counterRecordFromEvent(event);
if (!record) {
sendNoContent(event, 409);
return;
}
// non-ejectable, non-duplicated task
submitTask(makeUpdateBroadcast(event), record.id);
sendNoContent(event, 202);
});
Renders the static HTML portion of the page while injecting the current count of the CounterRecord if one already exists. The CounterRecord will only be created once the first page's main.js script attempts to connect to the Event Source at /api/counter.
// file: src/routes/index.ts
import { counterRecordFromEvent } from '../server/counter';
export default defineEventHandler(async (event) => {
const title = 'SSE Counter';
const record = await counterRecordFromEvent(event);
const count = record ? String(record.count) : ' ';
// prettier-ignore
return (
'<!doctype html><html lang="en"><head>' +
'<meta name="viewport" content="width=device-width, initial-scale=1.0">' +
`<title>Nitro - ${title}</title>` +
'<link rel="icon" href="favicon.ico">' +
'<link href="https://fonts.googleapis.com/css?family=IBM+Plex+Sans:400,600&display=swap" rel="stylesheet">' +
'<link rel="stylesheet" href="styles.css">' +
'<script type="module" src="main.js"></script>' +
'</head><body>' +
`<h1>${title}</h1>` +
'<div class="c-counter">' +
'<dl>' +
'<dt>Count</dt>' +
`<dd aria-live="assertive" aria-disabled="false" class="c-counter__count js:c-count">${count}</dd>` +
'</dl>' +
'<div class="c-counter__increment">' +
'<button class="js:c-trigger c-counter__button">Increment</button>' +
'<p aria-live="assertive" class="js:c-status c-counter__status"></p>' +
'</div>' +
'</div>' +
'<footer>' +
'<div class="center">' +
'<p>Served with <a href="https://unjs.io/">UnJS</a> <a href="https://nitro.unjs.io/">Nitro</a>.</p>' +
'<p>Frontend' +
'<ul>' +
'<li>Using <a href="https://github.com/WebReflection/qsa-observer">qsa-observer</a></li>' +
'<li>Type checked with ' +
'<a href="https://www.typescriptlang.org/docs/handbook/jsdoc-supported-types.html">TS JSDoc</a>.' +
'</li>' +
'<li>Bundled with <a href="https://rollupjs.org/">Rollup</a>.</li>' +
'<li>Design repurposed from <a href="https://codepen.io/sandrina-p/pen/WNRYabB">here</a>.</li>' +
'<li>CSS reset from <a href="https://andy-bell.co.uk/a-more-modern-css-reset/">here</a>.</li>' +
'</ul>' +
'</p>' +
'</div>' +
'</footer>' +
'</body></html>'
);
});
The client application starts with assembleApp() which constructs the inbound and outbound dependencies before injecting them into the app factory. Once assembled hookupUI() makes it possible for the UI components to connect to the requisite capabilities of the core app.
// @ts-check
// file: src/client/entry.js
import { define } from './components/registry';
import * as count from './components/count';
import * as status from './components/status';
import * as trigger from './components/trigger';
import { makeInbound } from './app/inbound';
import { makeOutbound } from './app/outbound';
import { makeApp } from './app/index';
/** @typedef { ReturnType<typeof makeApp> } App */
function assembleApp() {
const inbound = makeInbound('/api/counter');
const outbound = makeOutbound('/api/increment');
return makeApp({ inbound, outbound });
}
/** @param { App } app
* @returns { void }
*/
function hookupUI(app) {
define(
count.NAME,
count.makeSpec(app.subscribeCount, app.subscribeAvailable)
);
define(status.NAME, status.makeSpec(app.subscribeStatus));
define(trigger.NAME, trigger.makeSpec(app.increment, app.subscribeAvailable));
app.start();
}
hookupUI(assembleApp());
The core app consists of four parts:
- status: The routing point for any status displays to the UI. Any status message to be displayed must go through status.send(). status is only designed to accept a single observer (the status component).
- context: The routing point for the current availableStatus. Every change must go through context.sendAvailable(). The context receives status as a dependency as it sends an "error" or "end of count" status display when availableStatus becomes UNAVAILABLE (due to a CountError or CountEnd message being received). context subscribes to inbound for CountMessages; CountUpdate messages change the availableStatus from WAIT to READY. context is designed to accept multiple observers (increment, trigger and count components).
- count: Subscribes to inbound messages and processes the CountUpdate by updating its single observer (count component).
- increment: Issues an increment command to outbound right after it sets the availableStatus (from READY) to WAIT. If its command is not accepted it switches the availableStatus to UNAVAILABLE.
The exposed App API consists of:
- increment to dispatch an increment command.
- start to establish the inbound connection.
- subscribeAvailable to subscribe to the current availableStatus.
- subscribeStatus to subscribe to the latest status text.
- subscribeCount to subscribe to the latest count.
The App starts out in READY (the initial value of context.available). The availableStatus is one of:
- READY to issue an increment command.
- WAIT while waiting for an increment command to complete.
- UNAVAILABLE is entered when either an error is encountered while trying to establish a connection or because the count was ended by the server.
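The ./available module is not listed in this section. The sketch below restates the three states and the transitions described above as a pure function. It assumes availableStatus is a plain frozen object (the concrete values are made up for illustration), and nextAvailable is a hypothetical helper that does not exist in the app; the app itself spreads this logic across context and increment.

```javascript
// Assumed shape; the values in the repository may differ.
const availableStatus = Object.freeze({
  UNAVAILABLE: -1,
  WAIT: 0,
  READY: 1,
});

/** Hypothetical helper: the status that follows a given occurrence.
 * @param { number } current
 * @param { 'dispatch' | 'update' | 'rejected' | 'end' | 'error' } occurrence
 * @return { number }
 */
function nextAvailable(current, occurrence) {
  // UNAVAILABLE is terminal: nothing changes once it is reached
  if (current === availableStatus.UNAVAILABLE) return current;

  switch (occurrence) {
    case 'dispatch': // an increment command was issued
      return current === availableStatus.READY
        ? availableStatus.WAIT
        : current;
    case 'update': // a CountUpdate arrived
      return current === availableStatus.WAIT
        ? availableStatus.READY
        : current;
    default: // rejected command, end of count, or connection error
      return availableStatus.UNAVAILABLE;
  }
}

const afterDispatch = nextAvailable(availableStatus.READY, 'dispatch');
const afterUpdate = nextAvailable(afterDispatch, 'update');
const afterEnd = nextAvailable(afterUpdate, 'end');
```

Here afterDispatch is WAIT, afterUpdate is READY again, and afterEnd is UNAVAILABLE, from which no further occurrence leads anywhere else.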
// @ts-check
// file: src/client/app/index.js
/** @typedef { import('../types').CountSink } CountSink */
/** @typedef { import('../types').AvailableStatus } AvailableStatus */
/** @typedef { import('../types').AvailableSink } AvailableSink */
/** @typedef { import('../types').CountMessage } Message */
/** @typedef { import('../types').Inbound } Inbound */
/** @typedef { import('../types').Outbound } Outbound */
/** @typedef { import('../types').Status } Status */
/** @typedef { import('../types').StatusSink } StatusSink */
import { Sinks } from '../lib/sinks';
import { availableStatus } from './available';
/** @param { Inbound['subscribe'] } subscribe
* @param { StatusSink } sendStatus
*/
function makeContext(subscribe, sendStatus) {
/** @type { Status | undefined } */
let status = {
error: true,
message: 'Connection failed. Reload to retry.',
};
// Once status is undefined we're done
/** @type { Sinks<AvailableStatus> } */
const sinks = new Sinks();
const context = {
/** @type { AvailableStatus } */
available: availableStatus.READY,
/** @param { AvailableStatus } available */
sendAvailable: (available) => {
if (!status) return;
context.available = available;
sinks.send(available);
if (available !== availableStatus.UNAVAILABLE) return;
// signing off…
sendStatus(status);
status = undefined;
context.unsubscribe();
},
/** @param { AvailableSink } sink
*/
subscribe: (sink) => {
const unsubscribe = sinks.add(sink);
sink(context.available);
return unsubscribe;
},
unsubscribe: (() => {
/** @type { (() => void) | undefined } */
let remove = subscribe(handler);
return () => {
if (!remove) return;
remove();
remove = undefined;
};
/** @param { Message } message */
function handler(message) {
switch (message.kind) {
case 'end': {
status = {
error: false,
message: 'Count complete. Reload to restart.',
};
context.sendAvailable(availableStatus.UNAVAILABLE);
return;
}
case 'error': {
context.sendAvailable(availableStatus.UNAVAILABLE);
return;
}
default: {
if (context.available === availableStatus.WAIT)
context.sendAvailable(availableStatus.READY);
return;
}
}
}
})(),
};
return context;
}
/** @param { Inbound['subscribe'] } subscribe
*/
function makeCount(subscribe) {
/** @type { CountSink | undefined } */
let sink;
return {
/** @param { CountSink } nextSink
* @return { () => void }
*/
subscribe: (nextSink) => {
sink = nextSink;
return () => {
if (sink === nextSink) sink = undefined;
};
},
unsubscribe: (() => {
/** @type { (() => void) | undefined } */
let remove = subscribe(handler);
return () => {
if (!remove) return;
remove();
remove = undefined;
};
/** @param { Message } message */
function handler(message) {
if (message.kind === 'update' && sink) sink(message.count);
}
})(),
};
}
/** @param { () => Promise<boolean> } incrementFn
* @param { AvailableSink } sendAvailable
*/
function makeIncrement(incrementFn, sendAvailable) {
/** @param { boolean } accepted */
const postIncrement = (accepted) => {
if (accepted) return;
sendAvailable(availableStatus.UNAVAILABLE);
};
let done = false;
const increment = {
/** @type { AvailableSink } */
availableSink: (available) => {
if (done) return;
if (available === availableStatus.UNAVAILABLE) done = true;
},
/** @type { undefined | (() => void)} */
unsubscribe: undefined,
dispatch() {
if (done) return;
sendAvailable(availableStatus.WAIT);
incrementFn().then(postIncrement);
},
};
return increment;
}
function makeStatus() {
/** @type { undefined | StatusSink } */
let sink;
const status = {
/** @param { Status} message
* @return { void }
*/
send: (message) => {
if (sink) sink(message);
},
/** @param { StatusSink } nextSink
* @return { () => void }
*/
subscribe: (nextSink) => {
sink = nextSink;
return () => {
if (sink === nextSink) sink = undefined;
};
},
};
return status;
}
/** @param { {
* inbound: Inbound
* outbound: Outbound
* } } api
*/
function makeApp({ inbound, outbound }) {
const status = makeStatus();
const context = makeContext(inbound.subscribe, status.send);
const count = makeCount(inbound.subscribe);
const increment = makeIncrement(outbound.increment, context.sendAvailable);
// Internal registrations
increment.unsubscribe = context.subscribe(increment.availableSink);
return {
increment: increment.dispatch,
start: inbound.start,
subscribeAvailable: context.subscribe,
subscribeStatus: status.subscribe,
subscribeCount: count.subscribe,
};
}
export { makeApp };

The inbound module isolates the core's dependency on the browser's EventSource.
eventRouter is an EventListenerObject (handleEvent, DOM Spec) used in lieu of the typical EventListener callback function.
Apart from the handleEvent(event: Event) method, this particular EventListenerObject carries information relevant to the event context:
- href: the URL to connect the event source to
- source: the EventSource, once it has been connected
- sinks: the subscribed MessageSinks (typed as Sinks<CountMessage> in the HandlerObject typedef)
- status: the state of the message connection
  - undefined: not yet connected
  - true: connected and operational
  - false: errored or completed and therefore closed
makeInbound exposes two functions:
- subscribe to add a MessageSink to the eventRouter
- start to attempt to connect the event source with the configured href, later attaching eventRouter to the message and error events, preparing it to emit CountMessages.
In this simple case the MessageEvent will only ever hold the updated count on the data property and the lastEventId property is ignored. In practice any type of serialized data (preferably represented via a discriminated union) could be attached. MessageEvents can have types other than 'message' for custom events. lastEventId is usually used to implement the ability to catch up on past events after the event source loses the connection and automatically reconnects.
- dispatchUpdate sends the updated count to the Sink<CountMessage> as a CountUpdate
- MESSAGE_END is sent to Sink<CountMessage> when the count has finished
- MESSAGE_ERROR is sent to Sink<CountMessage> when an error occurs before the connection can be established.
After receiving an error Event it is assumed that the count has completed if the current status === true, causing MESSAGE_END to be sent; otherwise MESSAGE_ERROR is sent.
In either case the eventRouter is disposed of and cleaned up.
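To make the remark about custom event types and lastEventId more concrete, here is a sketch of how a single event is framed in the text/event-stream format. formatEvent is a hypothetical helper that does not appear in this project; only the field names (id, event, data) and the blank-line terminator come from the HTML Living Standard.

```javascript
/** Hypothetical helper: frames one server-sent event as text.
 * @param { { data: string, id?: string, event?: string } } fields
 * @return { string }
 */
function formatEvent({ data, id, event }) {
  const lines = [];
  // id ends up as MessageEvent.lastEventId on the client
  if (id !== undefined) lines.push(`id: ${id}`);
  // without an event field the client dispatches a 'message' event
  if (event !== undefined) lines.push(`event: ${event}`);
  // each line of a multi-line payload needs its own data field
  for (const line of data.split('\n')) lines.push(`data: ${line}`);
  // the blank line is what dispatches the event
  return lines.join('\n') + '\n\n';
}

// A count update carrying an id
const frame = formatEvent({ id: '7', data: '3' });
// A custom event type with a two-line payload
const custom = formatEvent({ event: 'tick', data: 'a\nb' });
```

When an id has been received, the browser includes it in a Last-Event-ID request header on automatic reconnection, which is what gives a server the chance to replay missed events. The counter in this project does not rely on that.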
// @ts-check
// file: src/client/app/inbound.js
/** @typedef { import('../types.ts').CountEnd } CountEnd */
/** @typedef { import('../types.ts').CountError } CountError */
/** @typedef { import('../types.ts').CountMessage } CountMessage */
/** @typedef { import('../types.ts').CountUpdate } CountUpdate */
/** @typedef { import('../types.ts').Inbound } Inbound */
/** @typedef { import('../types.ts').MessageSink } MessageSink */
import { Sinks } from '../lib/sinks';
/** @typedef { EventListenerObject & {
* status: undefined | boolean;
* href: string;
* sinks: Sinks<CountMessage>;
* source: void | EventSource;
* } } HandlerObject */
/** @type { CountEnd } */
const MESSAGE_END = {
kind: 'end',
};
/** @type { CountError } */
const MESSAGE_ERROR = {
kind: 'error',
reason: 'Failed to open connection',
};
/** @param { MessageSink } send
* @param { MessageEvent<string> } event
*/
function dispatchUpdate(send, event) {
const count = Number(event.data);
if (Number.isNaN(count)) return;
/** @type { CountUpdate } */
const message = {
kind: 'update',
count,
};
send(message);
}
/** @param { HandlerObject } router */
function disposeRouter(router) {
if (router.source) {
router.source.removeEventListener('message', router);
router.source.removeEventListener('error', router);
if (router.source.readyState < 2) router.source.close();
}
router.source = undefined;
router.sinks.clear();
router.status = false;
}
/** @param { EventSource } source
* @param { HandlerObject } router
*/
function addRouter(source, router) {
source.addEventListener('message', router);
source.addEventListener('error', router);
}
/** @param { string } href
*/
function makeInbound(href) {
/** @type { HandlerObject } */
const eventRouter = {
status: undefined,
href,
sinks: new Sinks(),
source: undefined,
handleEvent(event) {
if (this.sinks.size < 1 || this.status === false) return;
if (event instanceof MessageEvent) {
this.status = true;
dispatchUpdate(this.sinks.send, event);
return;
}
if (event.type === 'error') {
if (this.status === true) {
this.sinks.send(MESSAGE_END);
} else {
this.sinks.send(MESSAGE_ERROR);
}
disposeRouter(this);
}
},
};
const start = () => {
eventRouter.source = new EventSource(eventRouter.href);
if (eventRouter.sinks.size > 0) {
addRouter(eventRouter.source, eventRouter);
}
};
/** @param { MessageSink } sink */
const subscribe = (sink) => {
const size = eventRouter.sinks.size;
const unsubscribe = eventRouter.sinks.add(sink);
if (size < 1 && eventRouter.source) {
addRouter(eventRouter.source, eventRouter);
}
return unsubscribe;
};
/** @type { Inbound } */
return {
start,
subscribe,
};
}
export { makeInbound };

The outbound module isolates the core's dependency on the browser's fetch.
The increment command simply requests that the count on the counter record currently associated with the client session be incremented.
A return status of 202 Accepted indicates that the task has been queued and will be completed some time soon (it's complete once the corresponding MessageEvent arrives via the inbound event source).
A return status of 409 Conflict indicates that the counter identified in the session no longer exists (i.e. there is no one left listening) and that the command cannot be honoured. 404 Not Found is more closely associated with the resource identified by the URL (e.g. /api/increment); given that the session data is out of band, a different status seemed more appropriate.
In this case the value of the response's ok property is enough to communicate success or failure of the increment command to the core.
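For reference, response.ok is true for any status in the 200 to 299 range, so 202 counts as success and 409 as failure. The sketch below spells that out and shows a variant of the outbound factory with an injectable fetch. Both isOk and makeOutboundWith (with its fetchFn parameter) are assumptions added for illustration; they are not part of the module. Treating a rejected fetch as "not accepted" is likewise a choice made in this sketch only.

```javascript
/** Mirrors the Fetch definition of Response.ok.
 * @param { number } status
 * @return { boolean }
 */
const isOk = (status) => status >= 200 && status <= 299;

/** Hypothetical variant with the fetch function passed in.
 * @param { string } href
 * @param { (href: string, init: { method: string }) => Promise<{ ok: boolean }> } fetchFn
 */
function makeOutboundWith(href, fetchFn) {
  const increment = async () => {
    try {
      const response = await fetchFn(href, { method: 'POST' });
      return response.ok;
    } catch {
      // network failure: report the command as not accepted
      return false;
    }
  };
  return { increment };
}

// Stub standing in for a server that answers 409 Conflict
const stubFetch = async () => ({ ok: isOk(409) });
const outbound = makeOutboundWith('/api/increment', stubFetch);
// outbound.increment() resolves to false with this stub
```

Passing the fetch function in keeps the factory testable outside the browser; in the page itself the global fetch would be supplied.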
// @ts-check
// file: src/client/app/outbound.js
/** @param { string } href
*/
function makeOutbound(href) {
const increment = async () => {
const response = await fetch(href, { method: 'POST' });
return response.ok;
};
return {
increment,
};
}
export { makeOutbound };

The component modules isolate the core app from the browser's DOM API. The registry handles the component lifecycles.
This simple component registry uses the querySelectorAll Observer (qsa-observer), which itself is based on MutationObserver.
// file: src/client/types.ts
// …
/*
A specification object
(https://gist.github.com/benpriebe/55b7e950b5e9d056b47e?permalink_comment_id=2229105#gistcomment-2229105)
is a configurations/customizations "bag" that is passed
to a factory or builder function to configure and/or
customize the item under assembly.
Douglas Crockford also called them _Object Specifiers_
in _JavaScript: The Good Parts_.
The term was also used in the original
(https://shripadk.github.io/react/docs/top-level-api.html#react.createclass)
React documentation:
`function createClass(object specification)` Create a component
given a **specification**. … For more information about
the **«specification object»**, see Component Specs and Lifecycle
(https://shripadk.github.io/react/docs/component-specs.html).
*/
export type QsaoSpec = {
mounted?: (element: Element) => void;
unmounted?: (element: Element) => void;
};
// …

The registry Map maps a component's selector to its QsaoSpec.
The QsaoSpec can provide mounted and unmounted lifecycle functions.
mounted is invoked when an element with the registered component name in its classList is inserted into the DOM.
unmounted is invoked when an element with the registered component name in its classList is removed from the DOM.
The define function derives a class selector from the supplied component name and stores the Qsao lifecycle spec in the registry Map before applying it to the current DOM state.
// @ts-check
// file: src/client/components/registry.js
import Qsao from 'qsa-observer';
/** @typedef { import('../types').QsaoSpec } Spec */
// set up component registry
/** @type { Map<string, Spec> } */
const registry = new Map();
/** @type { string[] } */
const query = [];
const root = self.document;
const qsao = Qsao({
query,
root,
handle(element, mounted, selector) {
const spec = registry.get(selector);
if (!spec) return;
(mounted ? spec.mounted : spec.unmounted)?.(element);
},
});
/** @param { string} name
* @param { Spec } spec
* @return { void }
*/
const define = (name, spec) => {
const selector = '.' + name;
if (query.includes(selector)) return;
query.push(selector);
registry.set(selector, spec);
qsao.parse(root.querySelectorAll(selector));
};
export { define };

The status component is the simplest of the components.
Before it can provide a QsaoSpec it needs to be supplied with a means to subscribe to the core app status.
The makeSpec factory function takes the required subscribe function and returns the status's QsaoSpec needed for component registration.
In the mounted lifecycle function a Binder object is stored in the instances WeakMap against the root element's object reference. The Binder holds:
- root: the element that represents the root of the component inside the DOM.
- text: the Text to be updated with the status message (reportedly updating nodeValue is faster than textContent)
- unsubscribe: to remove the subscription to the core app's status message updates.
It's at this point that the component instance subscribes to the core app's status messages which are handled by onStatus.
onStatus updates the Text node referenced in the binder and sets/resets the error class on the root element.
The unmounted lifecycle function removes the status subscription and drops the element's Binder from the instances WeakMap.
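The mounted/unmounted pairing can be tried outside the browser. The sketch below uses a plain object in place of a DOM element and a hypothetical makeStatusSource as a stand-in for the core app's status subscription; makeSketchSpec is likewise made up. Only the WeakMap bookkeeping is meant to reflect the component described above.

```javascript
// Hypothetical single-observer source (stands in for the app's status)
function makeStatusSource() {
  /** @type { undefined | ((status: { message: string }) => void) } */
  let sink;
  return {
    send: (status) => {
      if (sink) sink(status);
    },
    subscribe: (next) => {
      sink = next;
      return () => {
        if (sink === next) sink = undefined;
      };
    },
  };
}

// Hypothetical spec factory following the mounted/unmounted shape
function makeSketchSpec(subscribe) {
  const instances = new WeakMap();
  return {
    mounted(element) {
      const binder = { root: element, unsubscribe: () => {} };
      binder.unsubscribe = subscribe((status) => {
        // stand-in for updating the Text node
        binder.root.text = status.message;
      });
      instances.set(element, binder);
    },
    unmounted(element) {
      const binder = instances.get(element);
      if (!binder) return;
      binder.unsubscribe();
      instances.delete(element);
    },
  };
}

const source = makeStatusSource();
const spec = makeSketchSpec(source.subscribe);
const element = { text: '' }; // stand-in for the paragraph element

spec.mounted(element);
source.send({ message: 'first' }); // reaches the element
spec.unmounted(element);
source.send({ message: 'second' }); // ignored after unmount
```

Keying the WeakMap by element means a binder never keeps a removed element alive, and the stored unsubscribe thunk is what stops later status messages from reaching it.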
// @ts-check
// file: src/client/components/status.js
/** @typedef { import('../types').QsaoSpec } Spec */
/** @typedef { import('../types').Status } Status */
/** @typedef { import('../types').StatusSink } StatusSink */
/** @typedef { object } Binder
* @property { HTMLParagraphElement } root
* @property { Text } text
* @property { () => void } unsubscribe
*/
const NAME = 'js\\:c-status';
const MODIFIER_ERROR = 'js:c-status--error';
const noOp = () => {};
/** @param { Binder } binder
* @param { Status } status
* @return { void }
*/
function onStatus(binder, status) {
binder.root.classList.toggle(MODIFIER_ERROR, status.error);
binder.text.nodeValue = status.message;
}
/** @param { (fn: StatusSink) => (() => void) } subscribe
* @return { Spec }
*/
function makeSpec(subscribe) {
/** @type { WeakMap<Element, Binder> } */
const instances = new WeakMap();
/** @type { Spec } */
const spec = {
mounted(element) {
if (!(element instanceof HTMLParagraphElement)) return;
const binder = {
root: element,
text: new Text(''),
unsubscribe: noOp,
};
binder.root.appendChild(binder.text);
binder.unsubscribe = subscribe((status) => onStatus(binder, status));
instances.set(element, binder);
},
unmounted(element) {
const instance = instances.get(element);
if (!instance) return;
instance.unsubscribe();
instances.delete(element);
},
};
return spec;
}
export { NAME, makeSpec };

The count component displays the count but visually also communicates when the count has completed. So makeSpec has to be supplied with both subscribeCount and subscribeAvailable before the QsaoSpec needed for registration is produced.
The mounted lifecycle function stores a Binder object in the instances WeakMap under the root element's reference.
The count's Binder holds:
- root: the element that acts as the root of the component inside the DOM.
- text: the Text node holding the most recent count value.
- disabled: reflecting the state of the counter.
- unsubscribes: an array that holds the unsubscribe thunks from subscribeCount and subscribeAvailable.
The onAvailable callback compares the availableStatus to the component's disabled state.
UNAVAILABLE implies disabled === true.
If the state of disabled needs to be synchronized, the value is updated and the MODIFIER_DISABLED class is set on the root element accordingly.
The onCount callback updates the text node to the specified count.
The unmounted lifecycle function invokes all the unsubscribe thunks held in unsubscribes and deletes the Binder from instances.
// @ts-check
// file: src/client/components/count.js
/** @typedef { import('../types').QsaoSpec } Spec */
/** @typedef { import('../types').AvailableSink } AvailableSink */
/** @typedef { import('../types').AvailableStatus } AvailableStatus */
/** @typedef { import('../types').CountSink } CountSink */
/** @typedef { object } Binder
* @property { HTMLElement } root
* @property { Text } text
* @property { boolean } disabled
* @property { (() => void)[] } unsubscribes
*/
import { availableStatus } from '../app/available';
const NAME = 'js\\:c-count';
const MODIFIER_DISABLED = 'js:c-count--disabled';
/** @param { Binder } binder
* @param { AvailableStatus } status
* @return { void }
*/
function onAvailable(binder, status) {
const isDisabled = status === availableStatus.UNAVAILABLE;
if (binder.disabled === isDisabled) return;
binder.disabled = isDisabled;
binder.root.classList.toggle(MODIFIER_DISABLED, isDisabled);
}
/** @param { Binder } binder
* @param { number } count
* @return { void }
*/
function onCount(binder, count) {
if (binder.disabled) return;
binder.text.nodeValue = String(count);
}
/** @param { Element } root
* @return { Text }
*/
function ensureTextNode(root) {
const first = root.firstChild;
if (first instanceof Text) return first;
const text = new Text('');
if (first) {
root.replaceChild(text, first);
} else {
root.appendChild(text);
}
return text;
}
/** @param { (fn: CountSink) => (() => void) } subscribeCount
* @param { (fn: AvailableSink) => (() => void) } subscribeAvailable
* @return { Spec }
*/
function makeSpec(subscribeCount, subscribeAvailable) {
/** @type { WeakMap<Element, Binder> } */
const instances = new WeakMap();
/** @type { Spec } */
const spec = {
mounted(element) {
if (!(element instanceof HTMLElement)) return;
const text = ensureTextNode(element);
/** @type { Binder } */
const binder = {
root: element,
text,
disabled: element.classList.contains(MODIFIER_DISABLED),
unsubscribes: [],
};
binder.unsubscribes[0] = subscribeCount((count) =>
onCount(binder, count)
);
binder.unsubscribes[1] = subscribeAvailable((status) =>
onAvailable(binder, status)
);
instances.set(element, binder);
},
unmounted(element) {
const instance = instances.get(element);
if (!instance) return;
for (const unsubscribe of instance.unsubscribes) unsubscribe();
instances.delete(element);
},
};
return spec;
}
export { NAME, makeSpec };

The trigger component manages the button for the increment command/action. The MODIFIER_DISABLED class name relates to the disabled/enabled appearance, while MODIFIER_WAIT will further augment the disabled appearance to suggest that it will resolve in time (i.e. show a spinner). makeSpec has to be supplied with the action function to be invoked on click and a subscribe function for the availableStatus before it can furnish the QsaoSpec for component registration.
The mounted lifecycle function stores a Binder object in the instances WeakMap under the HTMLButtonElement's reference.
The trigger's Binder holds:
- root: the HTMLButtonElement representing the component in the DOM
- disabled: tracks whether the component is considered "disabled" (which is mirrored in the button's aria-disabled attribute)
- click: the click listener that is used by the button element to invoke the action
- unsubscribe: to stop receiving availableStatus updates from the core app.
The component subscribes to the app's availableStatus updates with the onAvailable callback and adds the click listener to the button element to trigger the action.
The onAvailable callback synchronizes the button's appearance based on availableStatus updates:
- READY: component is not disabled or waiting
- WAIT: component is disabled and waiting (i.e. showing spinner)
- UNAVAILABLE: component is disabled but is not waiting (i.e. no spinner)
The unmounted lifecycle function removes the click listener, unsubscribes from availableStatus updates and removes the component's Binder from instances.
// @ts-check
// file: src/client/components/trigger.js
/** @typedef { import('../types').QsaoSpec } Spec */
/** @typedef { import('../types').AvailableStatus } AvailableStatus */
/** @typedef { import('../types').AvailableSink } AvailableSink */
/** @typedef { object } Binder
* @property { HTMLButtonElement } root
* @property { boolean } disabled
* @property { () => void } click
* @property { () => void } unsubscribe
*/
import { availableStatus } from '../app/available';
const NAME = 'js\\:c-trigger';
const MODIFIER_DISABLED = 'js:c-trigger--disabled';
const MODIFIER_WAIT = 'js:c-trigger--wait';
const noOp = () => {};
/** @param { Binder } binder
* @param { AvailableStatus } status
* @return { void }
*/
function onAvailable(binder, status) {
const [disabled, wait] =
status === availableStatus.READY
? [false, false]
: status === availableStatus.WAIT
? [true, true]
: [true, false];
binder.root.classList.toggle(MODIFIER_WAIT, wait);
binder.disabled = disabled;
binder.root.classList.toggle(MODIFIER_DISABLED, disabled);
binder.root.setAttribute('aria-disabled', String(disabled));
}
/** @param { () => void } action
* @param { (fn: AvailableSink) => (() => void) } subscribe
* @return { Spec }
*/
function makeSpec(action, subscribe) {
/** @type { WeakMap<Element, Binder> } */
const instances = new WeakMap();
/** @type { Spec } */
const spec = {
mounted(element) {
if (!(element instanceof HTMLButtonElement)) return;
const binder = {
root: element,
disabled: false,
click: () => {
if (binder.disabled) return;
action();
},
unsubscribe: noOp,
};
binder.unsubscribe = subscribe((status) => onAvailable(binder, status));
instances.set(element, binder);
binder.root.addEventListener('click', binder.click);
},
unmounted(element) {
const instance = instances.get(element);
if (!instance) return;
instance.unsubscribe();
instance.root.removeEventListener('click', instance.click);
instances.delete(element);
},
};
return spec;
}
export { NAME, makeSpec };

