Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/Stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { BaseDomainEvent } from "event-types/_base";
import type {
DomainEvent,
PoppedEvent,
Expand Down
11 changes: 11 additions & 0 deletions core/src/aggregators/Aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import type { Stack } from "../Stack";

export interface Aggregator {
getStack(): Stack;
dispatchEvent(event: DomainEvent): void;
subscribeChanges: (
listener: (effects: Effect[], stack: Stack) => void,
) => () => void;
}
97 changes: 97 additions & 0 deletions core/src/aggregators/SyncAggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { aggregate } from "../aggregate";
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import { produceEffects } from "../produceEffects";
import type { Stack } from "../Stack";
import { delay } from "../utils/delay";
import type { Publisher } from "../utils/publishers/Publisher";
import type { Scheduler } from "../utils/schedulers/Scheduler";
import type { Aggregator } from "./Aggregator";

export class SyncAggregator implements Aggregator {
private events: DomainEvent[];
private latestStackSnapshot: Stack;
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
private updateScheduler: Scheduler;

constructor(options: {
initialEvents: DomainEvent[];
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
updateScheduler: Scheduler;
}) {
this.events = options.initialEvents;
this.latestStackSnapshot = aggregate(this.events, Date.now());
this.changePublisher = options.changePublisher;
this.updateScheduler = options.updateScheduler;
}

getStack(): Stack {
return this.latestStackSnapshot;
}

dispatchEvent(event: DomainEvent): void {
this.events.push(event);
this.updateSnapshot();
}

subscribeChanges(
listener: (effects: Effect[], stack: Stack) => void,
): () => void {
return this.changePublisher.subscribe(({ effects, stack }) => {
listener(effects, stack);
});
}

private updateSnapshot(): void {
const previousSnapshot = this.latestStackSnapshot;
const currentSnapshot = aggregate(this.events, Date.now());
const effects = produceEffects(previousSnapshot, currentSnapshot);

if (effects.length > 0) {
this.latestStackSnapshot = currentSnapshot;
this.changePublisher.publish({
effects,
stack: this.latestStackSnapshot,
});
}

const earliestUpcomingTransitionStateUpdate =
this.calculateEarliestUpcomingTransitionStateUpdate();

if (earliestUpcomingTransitionStateUpdate) {
this.updateScheduler.schedule(async (options) => {
await delay(
earliestUpcomingTransitionStateUpdate.timestamp - Date.now(),
{ signal: options?.signal },
);

if (options?.signal?.aborted) return;

this.updateSnapshot();
});
}
}

private calculateEarliestUpcomingTransitionStateUpdate(): {
event: DomainEvent;
timestamp: number;
} | null {
const activeActivities = this.latestStackSnapshot.activities.filter(
(activity) =>
activity.transitionState === "enter-active" ||
activity.transitionState === "exit-active",
);
const mostRecentlyActivatedActivity = activeActivities.sort(
(a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd,
)[0];

return mostRecentlyActivatedActivity
? {
event:
mostRecentlyActivatedActivity.exitedBy ??
mostRecentlyActivatedActivity.enteredBy,
timestamp: mostRecentlyActivatedActivity.estimatedTransitionEnd,
}
: null;
}
}
60 changes: 19 additions & 41 deletions core/src/makeCoreStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import isEqual from "react-fast-compare";
import { aggregate } from "./aggregate";
import type { Aggregator } from "./aggregators/Aggregator";
import { SyncAggregator } from "./aggregators/SyncAggregator";
import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types";
import { makeEvent } from "./event-utils";
import type { StackflowActions, StackflowPlugin } from "./interfaces";
import { produceEffects } from "./produceEffects";
import type { Stack } from "./Stack";
import { divideBy, once } from "./utils";
import { Mutex } from "./utils/Mutex";
import { makeActions } from "./utils/makeActions";
import { ScheduledPublisher } from "./utils/publishers/ScheduledPublisher";
import { SequentialScheduler } from "./utils/schedulers/SequentialScheduler";
import { SwitchScheduler } from "./utils/schedulers/SwitchScheduler";
import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks";

const SECOND = 1000;

// 60FPS
const INTERVAL_MS = SECOND / 60;

export type MakeCoreStoreOptions = {
initialEvents: DomainEvent[];
initialContext?: any;
Expand Down Expand Up @@ -76,39 +73,26 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
options.handlers?.onInitialActivityNotFound?.();
}

const events: { value: DomainEvent[] } = {
value: [...initialRemainingEvents, ...initialPushedEvents],
};
const aggregator: Aggregator = new SyncAggregator({
initialEvents: [...initialRemainingEvents, ...initialPushedEvents],
changePublisher: new ScheduledPublisher(
new SequentialScheduler(new Mutex()),
),
updateScheduler: new SwitchScheduler(new Mutex()),
});

const stack = {
value: aggregate(events.value, new Date().getTime()),
};
aggregator.subscribeChanges((effects) => {
triggerPostEffectHooks(effects, pluginInstances, actions);
});

const actions: StackflowActions = {
getStack() {
return stack.value;
return aggregator.getStack();
},
dispatchEvent(name, params) {
const newEvent = makeEvent(name, params);
const nextStackValue = aggregate(
[...events.value, newEvent],
new Date().getTime(),
);

events.value.push(newEvent);
setStackValue(nextStackValue);

const interval = setInterval(() => {
const nextStackValue = aggregate(events.value, new Date().getTime());

if (!isEqual(stack.value, nextStackValue)) {
setStackValue(nextStackValue);
}

if (nextStackValue.globalTransitionState === "idle") {
clearInterval(interval);
}
}, INTERVAL_MS);
aggregator.dispatchEvent(newEvent);
},
push: () => {},
replace: () => {},
Expand All @@ -120,12 +104,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
resume: () => {},
};

const setStackValue = (nextStackValue: Stack) => {
const effects = produceEffects(stack.value, nextStackValue);
stack.value = nextStackValue;
triggerPostEffectHooks(effects, pluginInstances, actions);
};

// Initialize action methods after actions object is fully created
Object.assign(
actions,
Expand All @@ -145,7 +123,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
});
});
}),
pullEvents: () => events.value,
pullEvents: () => aggregator.getStack().events,
subscribe(listener) {
storeListeners.push(listener);

Expand Down
14 changes: 7 additions & 7 deletions core/src/produceEffects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import type { Stack } from "./Stack";
import { omit } from "./utils";

export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] {
const output: Effect[] = [];

const somethingChanged = !isEqual(prevOutput, nextOutput);
if (isEqual(prevOutput, nextOutput)) {
return [];
}

if (somethingChanged) {
output.push({
const output: Effect[] = [
{
_TAG: "%SOMETHING_CHANGED%",
});
}
},
];

const isPaused =
prevOutput.globalTransitionState !== "paused" &&
Expand Down
19 changes: 19 additions & 0 deletions core/src/utils/Mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { SequentialScheduler } from "./schedulers/SequentialScheduler";

export class Mutex {
private sequentialScheduler: SequentialScheduler = new SequentialScheduler();

acquire(options?: { signal?: AbortSignal }): Promise<LockHandle> {
return new Promise((resolve, reject) => {
this.sequentialScheduler
.schedule(() => new Promise<void>((release) => resolve({ release })), {
signal: options?.signal,
})
.catch(reject);
});
}
}

export interface LockHandle {
release: () => void;
}
27 changes: 27 additions & 0 deletions core/src/utils/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { getAbortReason } from "./getAbortReason";

export function delay(
ms: number,
options?: { signal?: AbortSignal },
): Promise<void> {
return new Promise((resolve, reject) => {
const signal = options?.signal;

if (signal?.aborted) throw getAbortReason(signal);

const abortHandler = () => {
if (!signal) return;

clearTimeout(timeoutId);
reject(getAbortReason(signal));
};
const timeoutId = setTimeout(() => {
if (signal?.aborted) abortHandler();
else resolve();

signal?.removeEventListener("abort", abortHandler);
}, ms);

signal?.addEventListener("abort", abortHandler, { once: true });
});
}
7 changes: 7 additions & 0 deletions core/src/utils/getAbortReason.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function getAbortReason(signal: AbortSignal): unknown {
if (!signal.aborted) throw new Error("the signal was not aborted");

return (
signal.reason ?? new DOMException("an operation was aborted", "AbortError")
);
}
4 changes: 4 additions & 0 deletions core/src/utils/publishers/Publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface Publisher<T> {
publish(value: T): void;
subscribe(subscriber: (value: T) => void): () => void;
}
39 changes: 39 additions & 0 deletions core/src/utils/publishers/ScheduledPublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Scheduler } from "../schedulers/Scheduler";
import type { Publisher } from "./Publisher";

export class ScheduledPublisher<T> implements Publisher<T> {
private scheduler: Scheduler;
private subscribers: ((value: T) => void)[];
private handlePublishError: (error: unknown, value: T) => void;

constructor(
scheduler: Scheduler,
options?: { handlePublishError?: (error: unknown, value: T) => void },
) {
this.scheduler = scheduler;
this.subscribers = [];
this.handlePublishError = options?.handlePublishError ?? (() => {});
}

publish(value: T): void {
const subscribers = this.subscribers.slice();

this.scheduler.schedule(async () => {
for (const subscriber of subscribers) {
try {
subscriber(value);
} catch (error) {
this.handlePublishError(error, value);
}
}
});
}

subscribe(subscriber: (value: T) => void): () => void {
this.subscribers.push(subscriber);

return () => {
this.subscribers = this.subscribers.filter((s) => s !== subscriber);
};
}
}
6 changes: 6 additions & 0 deletions core/src/utils/schedulers/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface Scheduler {
schedule<T>(
task: (options?: { signal?: AbortSignal }) => Promise<T>,
options?: { signal?: AbortSignal },
): Promise<T>;
}
Loading
Loading