Skip to content

Commit 171f814

Browse files
new tracking logic
1 parent ee671ab commit 171f814

18 files changed

Lines changed: 101 additions & 715 deletions

File tree

.npmrc

Lines changed: 0 additions & 1 deletion
This file was deleted.

examples/effect-worker-v2/src/workflows/index.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,9 @@ export const { Workflows, WorkflowClient } = createDurableWorkflows(
4141
},
4242
{
4343
tracker: {
44-
env: "production",
45-
serviceKey: "finance-workflows",
46-
endpoint: "https://tanstack-trpc-on-cloudflare.backpine.workers.dev/sync",
47-
retry: {
48-
maxAttempts: 3,
49-
},
44+
endpoint: "http://localhost:3000/sync",
45+
env: "dev",
46+
serviceKey: "my-service-key",
5047
},
5148
},
5249
);

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@durable-effect/core",
3-
"version": "0.0.1-next.12",
3+
"version": "0.0.1-next.13",
44
"type": "module",
55
"main": "./dist/index.js",
66
"types": "./dist/index.d.ts",

packages/core/src/tracker/http-batch.ts

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import { EventTracker, type EventTrackerService, type BaseTrackingEvent } from "
1313
export interface HttpBatchTrackerConfig {
1414
/** URL to send events to */
1515
readonly endpoint: string;
16+
/** Environment identifier (e.g., "production", "staging") */
17+
readonly env: string;
18+
/** User-defined service key for identifying the service */
19+
readonly serviceKey: string;
1620
/** Maximum events per batch (default: 100) */
1721
readonly batchSize?: number;
1822
/** Headers to include in requests */
@@ -85,61 +89,82 @@ export function createHttpBatchTracker<E extends BaseTrackingEvent>(
8589
retry: { ...DEFAULT_CONFIG.retry, ...config.retry },
8690
};
8791

92+
// Default enrichment adds env and serviceKey to events
93+
const defaultEnrich = (event: E): E & { env: string; serviceKey: string } => ({
94+
...event,
95+
env: config.env,
96+
serviceKey: config.serviceKey,
97+
});
98+
99+
// Compose enrichment: apply default first, then custom if provided
100+
const enrich = enrichFn
101+
? (event: E) => enrichFn(defaultEnrich(event) as E)
102+
: defaultEnrich;
103+
88104
return Effect.gen(function* () {
89105
const buffer = yield* Ref.make<E[]>([]);
90106

91107
// Send batch to endpoint
92108
const sendBatch = (events: E[]): Effect.Effect<void> => {
93-
if (events.length === 0) return Effect.void;
109+
if (events.length === 0) {
110+
return Effect.void;
111+
}
94112

95-
// Apply enrichment if provided
96-
const wireEvents = enrichFn ? events.map(enrichFn) : events;
113+
// Apply enrichment (adds env and serviceKey)
114+
const wireEvents = events.map(enrich);
97115

98116
return Effect.tryPromise({
99117
try: async () => {
118+
const body = JSON.stringify({ events: wireEvents });
119+
100120
const response = await fetch(cfg.endpoint, {
101121
method: "POST",
102122
headers: {
103123
"Content-Type": "application/json",
104124
...cfg.headers,
105125
},
106-
body: JSON.stringify({ events: wireEvents }),
126+
body,
107127
});
108128

109129
if (!response.ok) {
130+
const errorText = await response.text().catch(() => "unable to read response");
110131
throw new HttpTrackerError(
111132
"send",
112-
`HTTP ${response.status}`,
133+
`HTTP ${response.status}: ${errorText}`,
113134
response.status,
114135
);
115136
}
116137
},
117-
catch: (error) =>
118-
error instanceof HttpTrackerError
138+
catch: (error) => {
139+
return error instanceof HttpTrackerError
119140
? error
120-
: new HttpTrackerError("send", error),
141+
: new HttpTrackerError("send", error);
142+
},
121143
}).pipe(
122144
Effect.retry(
123145
Schedule.exponential(Duration.millis(cfg.retry.initialDelayMs)).pipe(
124146
Schedule.compose(Schedule.recurs(cfg.retry.maxAttempts)),
125147
),
126148
),
127-
// Don't fail on tracking error - just log and continue
149+
// Don't fail on tracking error - silently continue
128150
Effect.catchAll(() => Effect.void),
129151
);
130152
};
131153

132154
// Flush current buffer
133155
const flush = (): Effect.Effect<void> =>
134-
Ref.getAndSet(buffer, []).pipe(Effect.flatMap(sendBatch));
156+
Effect.gen(function* () {
157+
const events = yield* Ref.getAndSet(buffer, []);
158+
yield* sendBatch(events);
159+
});
135160

136161
const service: EventTrackerService<E> = {
137162
emit: (event) =>
138163
Effect.gen(function* () {
139164
yield* Ref.update(buffer, (events) => [...events, event]);
165+
const current = yield* Ref.get(buffer);
140166

141167
// Check if we should flush
142-
const current = yield* Ref.get(buffer);
143168
if (current.length >= cfg.batchSize) {
144169
yield* flush();
145170
}

packages/core/src/tracker/tracker.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,31 @@ export class EventTracker extends Context.Tag("@durable-effect/EventTracker")<
7373
export const emitEvent = <E extends BaseTrackingEvent>(
7474
event: E,
7575
): Effect.Effect<void> =>
76-
Effect.flatMap(Effect.serviceOption(EventTracker), (option) =>
77-
option._tag === "Some" ? option.value.emit(event) : Effect.void,
78-
);
76+
Effect.flatMap(Effect.serviceOption(EventTracker), (option) => {
77+
if (option._tag === "Some") {
78+
console.log(`[Tracker] emitEvent: type=${event.type}, eventId=${event.eventId}`);
79+
return option.value.emit(event);
80+
} else {
81+
console.log(`[Tracker] emitEvent: tracker not available, event type=${event.type} dropped`);
82+
return Effect.void;
83+
}
84+
});
7985

8086
/**
8187
* Flush events using the tracker from context.
8288
* Safe to call - does nothing if tracker not available.
8389
*/
8490
export const flushEvents: Effect.Effect<void> = Effect.flatMap(
8591
Effect.serviceOption(EventTracker),
86-
(option) => (option._tag === "Some" ? option.value.flush() : Effect.void),
92+
(option) => {
93+
if (option._tag === "Some") {
94+
console.log("[Tracker] flushEvents: flushing...");
95+
return option.value.flush();
96+
} else {
97+
console.log("[Tracker] flushEvents: tracker not available, nothing to flush");
98+
return Effect.void;
99+
}
100+
},
87101
);
88102

89103
/**

packages/workflow/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@durable-effect/workflow",
3-
"version": "0.0.1-next.26",
3+
"version": "0.0.1-next.28",
44
"type": "module",
55
"main": "./dist/index.js",
66
"types": "./dist/index.d.ts",

packages/workflow/src/engine/engine.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
HttpBatchTrackerLayer,
2323
NoopTrackerLayer,
2424
flushEvents,
25-
} from "../tracker";
25+
} from "@durable-effect/core";
2626
import {
2727
PurgeManager,
2828
PurgeManagerLayer,
@@ -171,13 +171,12 @@ export function createDurableWorkflows<const W extends WorkflowRegistry>(
171171
Effect.gen(function* () {
172172
const orchestrator = yield* WorkflowOrchestrator;
173173
const result = yield* orchestrator.start(call);
174+
// Flush events in the same execution context to preserve tracker instance
175+
yield* flushEvents;
174176
return result;
175177
}),
176178
);
177179

178-
// Fire-and-forget event flushing - don't block response
179-
this.ctx.waitUntil(this.#runEffect(flushEvents));
180-
181180
return { id: result.id, completed: result.completed };
182181
}
183182

@@ -186,13 +185,12 @@ export function createDurableWorkflows<const W extends WorkflowRegistry>(
186185
Effect.gen(function* () {
187186
const orchestrator = yield* WorkflowOrchestrator;
188187
const result = yield* orchestrator.queue(call);
188+
// Flush events in the same execution context to preserve tracker instance
189+
yield* flushEvents;
189190
return result;
190191
}),
191192
);
192193

193-
// Fire-and-forget event flushing - don't block response
194-
this.ctx.waitUntil(this.#runEffect(flushEvents));
195-
196194
return { id: result.id };
197195
}
198196

@@ -216,11 +214,10 @@ export function createDurableWorkflows<const W extends WorkflowRegistry>(
216214
// Not a purge alarm - handle as workflow alarm (resume/recovery)
217215
const orchestrator = yield* WorkflowOrchestrator;
218216
yield* orchestrator.handleAlarm();
217+
// Flush events in the same execution context to preserve tracker instance
218+
yield* flushEvents;
219219
}),
220220
);
221-
222-
// Fire-and-forget event flushing - don't block alarm completion
223-
this.ctx.waitUntil(this.#runEffect(flushEvents));
224221
}
225222

226223
async cancel(options?: { reason?: string }): Promise<{

packages/workflow/src/engine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import type { WorkflowRegistry, WorkflowCall } from "../orchestrator/types";
44
import type { WorkflowStatus } from "../state/types";
5-
import type { HttpBatchTrackerConfig } from "../tracker/http-batch";
5+
import type { HttpBatchTrackerConfig } from "@durable-effect/core";
66
import type { RecoveryConfig } from "../recovery/config";
77
import type { WorkflowClientFactory } from "../client/types";
88
import type { PurgeConfig } from "../purge/config";

packages/workflow/src/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,16 +220,16 @@ export {
220220
type CancelResult as ClientCancelResult,
221221
} from "./client";
222222

223-
// Tracker
223+
// Tracker (re-exported from @durable-effect/core)
224224
export {
225225
// Service
226226
EventTracker,
227227
emitEvent,
228228
flushEvents,
229229
type EventTrackerService,
230-
// Event types from core (re-exported for convenience)
231-
createBaseEvent,
232-
enrichEvent,
230+
// Event types
231+
createWorkflowBaseEvent,
232+
enrichWorkflowEvent,
233233
type InternalWorkflowEvent,
234234
type InternalBaseEvent,
235235
type InternalWorkflowStartedEvent,
@@ -249,7 +249,7 @@ export {
249249
createInMemoryTracker,
250250
createInMemoryTrackerLayer,
251251
type InMemoryTrackerHandle,
252-
} from "./tracker";
252+
} from "@durable-effect/core";
253253

254254
// Re-export as Workflow namespace for convenience
255255
export * as Workflow from "./primitives";

packages/workflow/src/orchestrator/orchestrator.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// packages/workflow/src/orchestrator/orchestrator.ts
22

33
import { Context, Effect, Layer } from "effect";
4-
import { createBaseEvent } from "@durable-effect/core";
4+
import { createWorkflowBaseEvent, emitEvent } from "@durable-effect/core";
55
import { StorageAdapter } from "../adapters/storage";
66
import { SchedulerAdapter } from "../adapters/scheduler";
77
import { RuntimeAdapter } from "../adapters/runtime";
@@ -10,7 +10,6 @@ import { Start, Queue, Resume, Cancel } from "../state/types";
1010
import { RecoveryManager } from "../recovery/manager";
1111
import { WorkflowExecutor, resultToTransition } from "../executor";
1212
import { OrchestratorError, StorageError } from "../errors";
13-
import { emitEvent } from "../tracker";
1413
import { PurgeManager, type TerminalState } from "../purge";
1514
import type { WorkflowDefinition } from "../primitives/make";
1615
import { WorkflowRegistryTag, WorkflowNotFoundError } from "./registry";
@@ -157,7 +156,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
157156

158157
// Emit workflow.started event
159158
yield* emitEvent({
160-
...createBaseEvent(
159+
...createWorkflowBaseEvent(
161160
runtime.instanceId,
162161
call.workflow,
163162
call.executionId,
@@ -180,7 +179,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
180179
yield* stateMachine.applyTransition(transition);
181180

182181
// Emit result event
183-
const baseEvent = createBaseEvent(
182+
const baseEvent = createWorkflowBaseEvent(
184183
runtime.instanceId,
185184
call.workflow,
186185
call.executionId,
@@ -277,7 +276,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
277276

278277
// Emit workflow.queued event
279278
yield* emitEvent({
280-
...createBaseEvent(
279+
...createWorkflowBaseEvent(
281280
runtime.instanceId,
282281
call.workflow,
283282
call.executionId,
@@ -320,7 +319,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
320319
executionId?: string,
321320
) =>
322321
Effect.gen(function* () {
323-
const baseEvent = createBaseEvent(
322+
const baseEvent = createWorkflowBaseEvent(
324323
runtime.instanceId,
325324
workflowName,
326325
executionId,
@@ -379,11 +378,13 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
379378

380379
const definition = yield* registry.get(state.workflowName);
381380

382-
yield* stateMachine.applyTransition(new Start({ input: state.input }));
381+
yield* stateMachine.applyTransition(
382+
new Start({ input: state.input }),
383+
);
383384

384385
// Emit workflow.started event
385386
yield* emitEvent({
386-
...createBaseEvent(
387+
...createWorkflowBaseEvent(
387388
runtime.instanceId,
388389
state.workflowName,
389390
state.executionId,
@@ -428,7 +429,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
428429

429430
// Emit workflow.resumed event
430431
yield* emitEvent({
431-
...createBaseEvent(
432+
...createWorkflowBaseEvent(
432433
runtime.instanceId,
433434
state.workflowName,
434435
state.executionId,
@@ -475,7 +476,7 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
475476

476477
// Emit workflow.resumed for recovery (similar to resume)
477478
yield* emitEvent({
478-
...createBaseEvent(
479+
...createWorkflowBaseEvent(
479480
runtime.instanceId,
480481
state.workflowName,
481482
state.executionId,
@@ -558,14 +559,14 @@ export const createWorkflowOrchestrator = <W extends WorkflowRegistry>() =>
558559
const completedSteps = yield* stateMachine.getCompletedSteps();
559560

560561
yield* stateMachine.applyTransition(
561-
new Cancel({ reason: options?.reason, completedSteps })
562+
new Cancel({ reason: options?.reason, completedSteps }),
562563
);
563564

564565
// Emit workflow.cancelled event
565566
const state = yield* stateMachine.getState();
566567
if (state) {
567568
yield* emitEvent({
568-
...createBaseEvent(
569+
...createWorkflowBaseEvent(
569570
runtime.instanceId,
570571
state.workflowName,
571572
state.executionId,

0 commit comments

Comments
 (0)