Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/class/externals.class.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,37 @@
// Import Internal Dependencies
import { Incomer, type IncomerOptions } from "./incomer.class.js";
import { EventCallbackResponse, Incomer, type IncomerOptions } from "./incomer.class.js";
import { Dispatcher } from "./dispatcher.class.js";
import type {
CallBackEventMessage,
GenericEvent
} from "../types/index.js";

export class Externals<T extends GenericEvent = GenericEvent> {
public incomer: Incomer<T>;
public dispatcher: Dispatcher<T>;
export class Externals<
TListenedEvents extends GenericEvent = GenericEvent,
KCastedEvents extends GenericEvent = GenericEvent
> {
public incomer: Incomer<KCastedEvents, TListenedEvents>;
public dispatcher: Dispatcher<TListenedEvents | KCastedEvents>;

constructor(
options: IncomerOptions<T>
options: IncomerOptions<TListenedEvents, KCastedEvents>
) {
this.incomer = new Incomer({
const opts: IncomerOptions<KCastedEvents, TListenedEvents> = {
...options,
eventCallback: options.eventCallback as unknown as (
message: CallBackEventMessage<KCastedEvents>
) => Promise<EventCallbackResponse>,
eventsCast: options.eventsSubscribe.map((val) => val.name),
eventsSubscribe: options.eventsCast.map((eventCast) => {
return {
name: eventCast
};
}),
externalsInitialized: true
});
};
this.incomer = new Incomer<KCastedEvents, TListenedEvents>(opts);

this.dispatcher = new Dispatcher({
this.dispatcher = new Dispatcher<TListenedEvents | KCastedEvents>({
...options,
pingInterval: Number(process.env.MYUNISOFT_DISPATCHER_PING) || undefined,
checkLastActivityInterval: Number(process.env.MYUNISOFT_DISPATCHER_ACTIVITY_CHECK) || undefined,
Expand Down
52 changes: 29 additions & 23 deletions src/class/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,22 @@ function isUnresolvedEvent(value: EventCallbackResponse): value is EventCallback
return value.ok && Symbol.for(value.val.status) === UNRESOLVED;
}

export type IncomerOptions<T extends GenericEvent = GenericEvent> = {
export type IncomerOptions<
TListenedEvents extends GenericEvent = GenericEvent,
KCastedEvents extends GenericEvent = GenericEvent
> = {
name: string;
redis: RedisAdapter;
subscriber: RedisAdapter;
logger?: Logger;
standardLog?: StandardLog<T>;
standardLog?: StandardLog<TListenedEvents | KCastedEvents>;
eventsCast: string[];
eventsSubscribe: EventSubscribe[];
eventsValidation?: {
eventsValidationFn?: eventsValidationFn<T>;
customValidationCbFn?: customValidationCbFn<T>;
eventsValidationFn?: eventsValidationFn<TListenedEvents | KCastedEvents>;
customValidationCbFn?: customValidationCbFn<TListenedEvents | KCastedEvents>;
};
eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
eventCallback: (message: CallBackEventMessage<TListenedEvents>) => Promise<EventCallbackResponse>;
dispatcherInactivityOptions?: {
/* interval between two pings */
maxPingInterval?: number;
Expand All @@ -124,16 +127,17 @@ export type IncomerOptions<T extends GenericEvent = GenericEvent> = {
};

export class Incomer <
T extends GenericEvent = GenericEvent
TListenedEvents extends GenericEvent = GenericEvent,
KCastedEvents extends GenericEvent = GenericEvent
> extends EventEmitter {
readonly name: string;
readonly eventCallback: (message: CallBackEventMessage<T>) => Promise<EventCallbackResponse>;
readonly eventCallback: (message: CallBackEventMessage<TListenedEvents>) => Promise<EventCallbackResponse>;

public dispatcherConnectionState = false;
public baseUUID = randomUUID();
public providedUUID: string;

private incomerChannel: Channel<IncomerChannelMessages<T>["IncomerMessages"] | RetryMessage>;
private incomerChannel: Channel<IncomerChannelMessages<TListenedEvents | KCastedEvents>["IncomerMessages"] | RetryMessage>;
private incomerChannelName: string;
private subscriber: RedisAdapter;
private defaultIncomerTransactionStore: TransactionStore<"incomer">;
Expand All @@ -150,7 +154,7 @@ export class Incomer <
#publishInterval: number;
#maxPingInterval: number;

#standardLogFn: StandardLog<T>;
#standardLogFn: StandardLog<TListenedEvents | KCastedEvents>;
#dispatcherChannel: Channel<IncomerRegistrationMessage>;
#dispatcherTransactionStore: TransactionStore<"dispatcher">;

Expand All @@ -164,11 +168,11 @@ export class Incomer <
private lastActivity: number;
#idleTime: number;
#eventsValidationFn: Map<string, ValidateFunction<Record<string, any>> | NestedValidationFunctions> | undefined;
#customValidationCbFn: ((event: T) => void) | undefined;
#customValidationCbFn: ((event: KCastedEvents | TListenedEvents) => void) | undefined;

public externals: Externals<T> | undefined;
public externals: Externals<TListenedEvents, KCastedEvents> | undefined;

constructor(options: IncomerOptions<T>) {
constructor(options: IncomerOptions<TListenedEvents, KCastedEvents>) {
super();

Object.assign(this, {}, options);
Expand Down Expand Up @@ -288,7 +292,7 @@ export class Incomer <
transactionId: transaction.redisMetadata.transactionId,
origin: transaction.redisMetadata.origin
}
} as unknown as IncomerChannelMessages<T>["IncomerMessages"]);
} as unknown as IncomerChannelMessages<KCastedEvents>["IncomerMessages"]);

this.logger.info(
this.#standardLogFn({
Expand Down Expand Up @@ -555,16 +559,16 @@ export class Incomer <
}

public async publish(
event: T
event: KCastedEvents
) {
const formattedEvent = {
...event,
redisMetadata: {
origin: this.providedUUID,
incomerName: this.name
}
} as unknown as Omit<EventMessage<T>, "redisMetadata"> & {
redisMetadata: Omit<EventMessage<T>["redisMetadata"], "transactionId">
} as unknown as Omit<EventMessage<KCastedEvents>, "redisMetadata"> & {
redisMetadata: Omit<EventMessage<KCastedEvents>["redisMetadata"], "transactionId">
};

if (this.#eventsValidationFn) {
Expand Down Expand Up @@ -596,7 +600,7 @@ export class Incomer <
...formattedEvent.redisMetadata,
transactionId: (transaction.redisMetadata as any).transactionId
}
} as unknown as EventMessage<T>;
} as unknown as EventMessage<KCastedEvents>;

if (!this.dispatcherConnectionState) {
this.logger.info(this.#standardLogFn({
Expand Down Expand Up @@ -634,7 +638,7 @@ export class Incomer <
}

const formattedMessage: DispatcherApprovementMessage |
IncomerChannelMessages<T>["DispatcherMessages"] = JSON.parse(message);
IncomerChannelMessages<TListenedEvents>["DispatcherMessages"] = JSON.parse(message);

if (
(formattedMessage.redisMetadata && formattedMessage.redisMetadata.origin === this.providedUUID) ||
Expand Down Expand Up @@ -698,7 +702,7 @@ export class Incomer <

private async handleIncomerMessages(
channel: string,
message: IncomerChannelMessages<T>["DispatcherMessages"]
message: IncomerChannelMessages<TListenedEvents>["DispatcherMessages"]
): Promise<void> {
const { name } = message;

Expand All @@ -708,7 +712,7 @@ export class Incomer <
return;
}

await this.customEvent({ name, channel, message: message as DistributedEventMessage<T> });
await this.customEvent({ name, channel, message: message as DistributedEventMessage<TListenedEvents> });
}

private async handlePing(channel: string, message: DispatcherPingMessage) {
Expand Down Expand Up @@ -743,7 +747,7 @@ export class Incomer <
}

private async customEvent(
options: { name: string, channel: string, message: DistributedEventMessage<T> }
options: { name: string, channel: string, message: DistributedEventMessage<TListenedEvents> }
) {
const { message, channel } = options;
const { redisMetadata, ...event } = message;
Expand All @@ -762,7 +766,7 @@ export class Incomer <
throw new Error(`Unknown Event ${event.name}`);
}

this.#customValidationCbFn(event as unknown as T);
this.#customValidationCbFn(event as unknown as TListenedEvents);
}

const spreadTransaction = await this.#dispatcherTransactionStore.getTransactionById(transactionId);
Expand All @@ -771,7 +775,9 @@ export class Incomer <
throw new Error(`Unable to find the given spread transaction ${transactionId}`);
}

const callbackResult = await this.eventCallback({ ...event, eventTransactionId } as unknown as CallBackEventMessage<T>);
const callbackResult = await this.eventCallback(
{ ...event, eventTransactionId } as unknown as CallBackEventMessage<TListenedEvents>
);

let reason = callbackResult.val;
if (callbackResult.ok) {
Expand Down
Loading