Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ const isNullish = (value: unknown): value is null | undefined =>
value === null || value === undefined;

export class EventEmitter<E extends Record<string, unknown[]>> {
#listeners: {
private listeners: {
[K in keyof E]?: Array<{
once: boolean;
cb: (...args: E[K]) => void;
}>;
} = {};
#globalWriters: WritableStreamDefaultWriter<Entry<E, keyof E>>[] = [];
#onWriters: {
private globalWriters: WritableStreamDefaultWriter<Entry<E, keyof E>>[] = [];
private onWriters: {
[K in keyof E]?: WritableStreamDefaultWriter<E[K]>[];
} = {};
#limit: number;
private limit: number;

/**
* @param maxListenersPerEvent - if set to 0, no limit is applied. defaults to 10
*/
constructor(maxListenersPerEvent?: number) {
this.#limit = maxListenersPerEvent ?? 10;
this.limit = maxListenersPerEvent ?? 10;
}

/**
Expand All @@ -42,33 +42,33 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
listener?: (...args: E[K]) => void,
): this | AsyncIterableIterator<E[K]> {
if (listener) {
if (!this.#listeners[eventName]) {
this.#listeners[eventName] = [];
if (!this.listeners[eventName]) {
this.listeners[eventName] = [];
}
if (
this.#limit !== 0 &&
this.#listeners[eventName]!.length >= this.#limit
this.limit !== 0 &&
this.listeners[eventName]!.length >= this.limit
) {
throw new TypeError("Listeners limit reached: limit is " + this.#limit);
throw new TypeError("Listeners limit reached: limit is " + this.limit);
}
this.#listeners[eventName]!.push({
this.listeners[eventName]!.push({
once: false,
cb: listener,
});
return this;
} else {
if (!this.#onWriters[eventName]) {
this.#onWriters[eventName] = [];
if (!this.onWriters[eventName]) {
this.onWriters[eventName] = [];
}
if (
this.#limit !== 0 &&
this.#onWriters[eventName]!.length >= this.#limit
this.limit !== 0 &&
this.onWriters[eventName]!.length >= this.limit
) {
throw new TypeError("Listeners limit reached: limit is " + this.#limit);
throw new TypeError("Listeners limit reached: limit is " + this.limit);
}

const { readable, writable } = new TransformStream<E[K], E[K]>();
this.#onWriters[eventName]!.push(writable.getWriter());
this.onWriters[eventName]!.push(writable.getWriter());
return readable[Symbol.asyncIterator]();
}
}
Expand All @@ -88,24 +88,24 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
eventName: K,
listener?: (...args: E[K]) => void,
): this | Promise<E[K]> {
if (!this.#listeners[eventName]) {
this.#listeners[eventName] = [];
if (!this.listeners[eventName]) {
this.listeners[eventName] = [];
}
if (
this.#limit !== 0 &&
this.#listeners[eventName]!.length >= this.#limit
this.limit !== 0 &&
this.listeners[eventName]!.length >= this.limit
) {
throw new TypeError("Listeners limit reached: limit is " + this.#limit);
throw new TypeError("Listeners limit reached: limit is " + this.limit);
}
if (listener) {
this.#listeners[eventName]!.push({
this.listeners[eventName]!.push({
once: true,
cb: listener,
});
return this;
} else {
return new Promise((res) => {
this.#listeners[eventName]!.push({
this.listeners[eventName]!.push({
once: true,
cb: (...args) => res(args),
});
Expand All @@ -126,37 +126,37 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
): Promise<this> {
if (!isNullish(eventName)) {
if (listener) {
this.#listeners[eventName] = this.#listeners[eventName]?.filter(
this.listeners[eventName] = this.listeners[eventName]?.filter(
({ cb }) => cb !== listener,
);
} else {
if (this.#onWriters[eventName]) {
for (const writer of this.#onWriters[eventName]!) {
if (this.onWriters[eventName]) {
for (const writer of this.onWriters[eventName]!) {
await writer.close();
}
delete this.#onWriters[eventName];
delete this.onWriters[eventName];
}

delete this.#listeners[eventName];
delete this.listeners[eventName];
}
} else {
for (
const writers of Object.values(
this.#onWriters,
this.onWriters,
) as WritableStreamDefaultWriter<E[K]>[][]
) {
for (const writer of writers) {
await writer.close();
}
}
this.#onWriters = {};
this.onWriters = {};

for (const writer of this.#globalWriters) {
for (const writer of this.globalWriters) {
await writer.close();
}

this.#globalWriters = [];
this.#listeners = {};
this.globalWriters = [];
this.listeners = {};
}
return this;
}
Expand All @@ -167,7 +167,7 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
* arguments to each.
*/
async emit<K extends keyof E>(eventName: K, ...args: E[K]): Promise<void> {
const listeners = this.#listeners[eventName]?.slice() ?? [];
const listeners = this.listeners[eventName]?.slice() ?? [];
for (const { cb, once } of listeners) {
cb(...args);

Expand All @@ -176,12 +176,12 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
}
}

if (this.#onWriters[eventName]) {
for (const writer of this.#onWriters[eventName]!) {
if (this.onWriters[eventName]) {
for (const writer of this.onWriters[eventName]!) {
await writer.write(args);
}
}
for (const writer of this.#globalWriters) {
for (const writer of this.globalWriters) {
await writer.write({
name: eventName,
value: args,
Expand All @@ -192,15 +192,15 @@ export class EventEmitter<E extends Record<string, unknown[]>> {
[Symbol.asyncIterator]<K extends keyof E>(): AsyncIterableIterator<
{ [V in K]: Entry<E, V> }[K]
> {
if (this.#limit !== 0 && this.#globalWriters.length >= this.#limit) {
throw new TypeError("Listeners limit reached: limit is " + this.#limit);
if (this.limit !== 0 && this.globalWriters.length >= this.limit) {
throw new TypeError("Listeners limit reached: limit is " + this.limit);
}

const { readable, writable } = new TransformStream<
Entry<E, K>,
Entry<E, K>
>();
this.#globalWriters.push(writable.getWriter());
this.globalWriters.push(writable.getWriter());
return readable[Symbol.asyncIterator]();
}
}