Skip to content

Commit c6d9195

Browse files
author
test
committed
fix(logs): preserve fallback diagnostics semantics
Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers.
1 parent 2901db4 commit c6d9195

File tree

7 files changed

+194
-57
lines changed

7 files changed

+194
-57
lines changed

apps/sim/lib/logs/execution/diagnostics.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { countTraceSpans } from '@/lib/logs/execution/trace-span-count'
12
import type { ExecutionFinalizationPath } from '@/lib/logs/types'
23
import { isExecutionFinalizationPath } from '@/lib/logs/types'
34

@@ -14,21 +15,6 @@ type ExecutionData = {
1415
finalizationPath?: unknown
1516
}
1617

17-
function countTraceSpans(traceSpans: unknown[] | undefined): number {
18-
if (!Array.isArray(traceSpans) || traceSpans.length === 0) {
19-
return 0
20-
}
21-
22-
return traceSpans.reduce<number>((count, span) => {
23-
const children =
24-
span && typeof span === 'object' && 'children' in span && Array.isArray(span.children)
25-
? (span.children as unknown[])
26-
: undefined
27-
28-
return count + 1 + countTraceSpans(children)
29-
}, 0)
30-
}
31-
3218
export function buildExecutionDiagnostics(params: {
3319
status: string
3420
level?: string | null

apps/sim/lib/logs/execution/logger.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { databaseMock, loggerMock } from '@sim/testing'
22
import { beforeEach, describe, expect, test, vi } from 'vitest'
3-
import { ExecutionLogger } from './logger'
3+
import { ExecutionLogger } from '@/lib/logs/execution/logger'
44

55
vi.mock('@sim/db', () => databaseMock)
66

apps/sim/lib/logs/execution/logger.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { redactApiKeys } from '@/lib/core/security/redaction'
2323
import { filterForDisplay } from '@/lib/core/utils/display-filters'
2424
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
2525
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
26+
import { countTraceSpans } from '@/lib/logs/execution/trace-span-count'
2627
import type {
2728
BlockOutputData,
2829
ExecutionEnvironment,
@@ -49,14 +50,6 @@ export interface ToolCall {
4950

5051
const logger = createLogger('ExecutionLogger')
5152

52-
function countTraceSpans(traceSpans?: TraceSpan[]): number {
53-
if (!Array.isArray(traceSpans) || traceSpans.length === 0) {
54-
return 0
55-
}
56-
57-
return traceSpans.reduce((count, span) => count + 1 + countTraceSpans(span.children), 0)
58-
}
59-
6053
export class ExecutionLogger implements IExecutionLoggerService {
6154
private buildCompletedExecutionData(params: {
6255
existingExecutionData?: WorkflowExecutionLog['executionData']

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,64 @@ describe('LoggingSession completion retries', () => {
157157
expect(session.hasCompleted()).toBe(true)
158158
})
159159

160+
it('preserves successful final output during fallback completion', async () => {
161+
const session = new LoggingSession('workflow-1', 'execution-5', 'api', 'req-1')
162+
163+
completeWorkflowExecutionMock
164+
.mockRejectedValueOnce(new Error('success finalize failed'))
165+
.mockResolvedValueOnce({})
166+
167+
await expect(
168+
session.safeComplete({ finalOutput: { ok: true, stage: 'done' } })
169+
).resolves.toBeUndefined()
170+
171+
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
172+
expect.objectContaining({
173+
executionId: 'execution-5',
174+
finalOutput: { ok: true, stage: 'done' },
175+
finalizationPath: 'fallback_completed',
176+
})
177+
)
178+
})
179+
180+
it('preserves accumulated cost during fallback completion', async () => {
181+
const session = new LoggingSession('workflow-1', 'execution-6', 'api', 'req-1') as any
182+
183+
session.accumulatedCost = {
184+
total: 12,
185+
input: 5,
186+
output: 7,
187+
tokens: { input: 11, output: 13, total: 24 },
188+
models: {
189+
'test-model': {
190+
input: 5,
191+
output: 7,
192+
total: 12,
193+
tokens: { input: 11, output: 13, total: 24 },
194+
},
195+
},
196+
}
197+
session.costFlushed = true
198+
199+
completeWorkflowExecutionMock
200+
.mockRejectedValueOnce(new Error('success finalize failed'))
201+
.mockResolvedValueOnce({})
202+
203+
await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined()
204+
205+
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
206+
expect.objectContaining({
207+
executionId: 'execution-6',
208+
costSummary: expect.objectContaining({
209+
totalCost: 12,
210+
totalInputCost: 5,
211+
totalOutputCost: 7,
212+
totalTokens: 24,
213+
}),
214+
})
215+
)
216+
})
217+
160218
it('persists failed error semantics when completeWithError receives non-error trace spans', async () => {
161219
const session = new LoggingSession('workflow-1', 'execution-4', 'api', 'req-1')
162220
const traceSpans = [
@@ -294,6 +352,75 @@ describe('LoggingSession completion retries', () => {
294352
expect(session.complete).toHaveBeenCalledTimes(1)
295353
})
296354

355+
it('drains fire-and-forget cost flushes before terminal completion', async () => {
356+
let releaseFlush: (() => void) | undefined
357+
const flushPromise = new Promise<void>((resolve) => {
358+
releaseFlush = resolve
359+
})
360+
361+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
362+
session.flushAccumulatedCost = vi.fn(() => flushPromise)
363+
session.complete = vi.fn().mockResolvedValue(undefined)
364+
365+
await session.onBlockComplete('block-2', 'Transform', 'function', {
366+
endedAt: '2025-01-01T00:00:01.000Z',
367+
output: { value: true },
368+
cost: { total: 1, input: 1, output: 0 },
369+
tokens: { input: 1, output: 0, total: 1 },
370+
model: 'test-model',
371+
})
372+
373+
const completionPromise = session.safeComplete({ finalOutput: { ok: true } })
374+
375+
await Promise.resolve()
376+
377+
expect(session.complete).not.toHaveBeenCalled()
378+
379+
releaseFlush?.()
380+
381+
await completionPromise
382+
383+
expect(session.flushAccumulatedCost).toHaveBeenCalledTimes(1)
384+
expect(session.complete).toHaveBeenCalledTimes(1)
385+
})
386+
387+
it('keeps draining when new progress writes arrive during drain', async () => {
388+
let releaseFirst: (() => void) | undefined
389+
let releaseSecond: (() => void) | undefined
390+
const firstPromise = new Promise<void>((resolve) => {
391+
releaseFirst = resolve
392+
})
393+
const secondPromise = new Promise<void>((resolve) => {
394+
releaseSecond = resolve
395+
})
396+
397+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
398+
399+
void session.trackProgressWrite(firstPromise)
400+
401+
const drainPromise = session.drainPendingProgressWrites()
402+
403+
await Promise.resolve()
404+
405+
void session.trackProgressWrite(secondPromise)
406+
releaseFirst?.()
407+
408+
await Promise.resolve()
409+
410+
let drained = false
411+
void drainPromise.then(() => {
412+
drained = true
413+
})
414+
415+
await Promise.resolve()
416+
expect(drained).toBe(false)
417+
418+
releaseSecond?.()
419+
await drainPromise
420+
421+
expect(session.pendingProgressWrites.size).toBe(0)
422+
})
423+
297424
it('marks pause completion as terminal and prevents duplicate pause finalization', async () => {
298425
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
299426
session.completeExecutionWithFinalization = vi.fn().mockResolvedValue(undefined)

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,9 @@ export class LoggingSession {
224224
}
225225

226226
private async drainPendingProgressWrites(): Promise<void> {
227-
if (this.pendingProgressWrites.size === 0) {
228-
return
227+
while (this.pendingProgressWrites.size > 0) {
228+
await Promise.allSettled(Array.from(this.pendingProgressWrites))
229229
}
230-
231-
await Promise.allSettled(Array.from(this.pendingProgressWrites))
232230
}
233231

234232
private async completeExecutionWithFinalization(params: {
@@ -330,7 +328,7 @@ export class LoggingSession {
330328
}
331329
}
332330

333-
void this.flushAccumulatedCost()
331+
void this.trackProgressWrite(this.flushAccumulatedCost())
334332
}
335333

336334
private async flushAccumulatedCost(): Promise<void> {
@@ -921,6 +919,7 @@ export class LoggingSession {
921919
errorMessage: `Failed to store trace spans: ${errorMsg}`,
922920
isError: false,
923921
finalizationPath: 'fallback_completed',
922+
finalOutput: params.finalOutput || {},
924923
})
925924
}
926925
}
@@ -947,6 +946,9 @@ export class LoggingSession {
947946
params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
948947
isError: true,
949948
finalizationPath: 'force_failed',
949+
finalOutput: {
950+
error: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`,
951+
},
950952
status: 'failed',
951953
})
952954
}
@@ -975,6 +977,7 @@ export class LoggingSession {
975977
errorMessage: 'Execution was cancelled',
976978
isError: false,
977979
finalizationPath: 'cancelled',
980+
finalOutput: { cancelled: true },
978981
status: 'cancelled',
979982
})
980983
}
@@ -1001,6 +1004,7 @@ export class LoggingSession {
10011004
errorMessage: 'Execution paused but failed to store full trace spans',
10021005
isError: false,
10031006
finalizationPath: 'paused',
1007+
finalOutput: { paused: true },
10041008
status: 'pending',
10051009
})
10061010
}
@@ -1054,6 +1058,7 @@ export class LoggingSession {
10541058
errorMessage: string
10551059
isError: boolean
10561060
finalizationPath: ExecutionFinalizationPath
1061+
finalOutput?: Record<string, unknown>
10571062
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
10581063
}): Promise<void> {
10591064
if (this.completed || this.completing) {
@@ -1066,25 +1071,45 @@ export class LoggingSession {
10661071
)
10671072

10681073
try {
1069-
const costSummary = params.traceSpans?.length
1070-
? calculateCostSummary(params.traceSpans)
1071-
: {
1072-
totalCost: BASE_EXECUTION_CHARGE,
1073-
totalInputCost: 0,
1074-
totalOutputCost: 0,
1075-
totalTokens: 0,
1076-
totalPromptTokens: 0,
1077-
totalCompletionTokens: 0,
1074+
const hasAccumulatedCost =
1075+
this.costFlushed ||
1076+
this.accumulatedCost.total > BASE_EXECUTION_CHARGE ||
1077+
this.accumulatedCost.tokens.total > 0 ||
1078+
Object.keys(this.accumulatedCost.models).length > 0
1079+
1080+
const costSummary = hasAccumulatedCost
1081+
? {
1082+
totalCost: this.accumulatedCost.total,
1083+
totalInputCost: this.accumulatedCost.input,
1084+
totalOutputCost: this.accumulatedCost.output,
1085+
totalTokens: this.accumulatedCost.tokens.total,
1086+
totalPromptTokens: this.accumulatedCost.tokens.input,
1087+
totalCompletionTokens: this.accumulatedCost.tokens.output,
10781088
baseExecutionCharge: BASE_EXECUTION_CHARGE,
1079-
modelCost: 0,
1080-
models: {},
1089+
modelCost: Math.max(0, this.accumulatedCost.total - BASE_EXECUTION_CHARGE),
1090+
models: this.accumulatedCost.models,
10811091
}
1092+
: params.traceSpans?.length
1093+
? calculateCostSummary(params.traceSpans)
1094+
: {
1095+
totalCost: BASE_EXECUTION_CHARGE,
1096+
totalInputCost: 0,
1097+
totalOutputCost: 0,
1098+
totalTokens: 0,
1099+
totalPromptTokens: 0,
1100+
totalCompletionTokens: 0,
1101+
baseExecutionCharge: BASE_EXECUTION_CHARGE,
1102+
modelCost: 0,
1103+
models: {},
1104+
}
1105+
1106+
const finalOutput = params.finalOutput || { _fallback: true, error: params.errorMessage }
10821107

10831108
await this.completeExecutionWithFinalization({
10841109
endedAt: params.endedAt || new Date().toISOString(),
10851110
totalDurationMs: params.totalDurationMs || 0,
10861111
costSummary,
1087-
finalOutput: { _fallback: true, error: params.errorMessage },
1112+
finalOutput,
10881113
traceSpans: [],
10891114
finalizationPath: params.finalizationPath,
10901115
completionFailure: params.errorMessage,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
export function countTraceSpans(traceSpans?: unknown[]): number {
2+
if (!Array.isArray(traceSpans) || traceSpans.length === 0) {
3+
return 0
4+
}
5+
6+
return traceSpans.reduce<number>((count, span) => {
7+
const children =
8+
span && typeof span === 'object' && 'children' in span && Array.isArray(span.children)
9+
? (span.children as unknown[])
10+
: undefined
11+
12+
return count + 1 + countTraceSpans(children)
13+
}, 0)
14+
}

apps/sim/lib/workflows/executor/execution-core.test.ts

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
615615
expect(wasExecutionFinalizedByCore('engine failed', 'execution-a')).toBe(true)
616616
})
617617

618-
it('falls back to error finalization when success finalization rejects', async () => {
618+
it('does not replace a successful outcome when success finalization rejects', async () => {
619619
executorExecuteMock.mockResolvedValue({
620620
success: true,
621621
status: 'completed',
@@ -626,22 +626,14 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
626626

627627
safeCompleteMock.mockRejectedValue(new Error('completion failed'))
628628

629-
await expect(
630-
executeWorkflowCore({
631-
snapshot: createSnapshot() as any,
632-
callbacks: {},
633-
loggingSession: loggingSession as any,
634-
})
635-
).rejects.toThrow('completion failed')
629+
const result = await executeWorkflowCore({
630+
snapshot: createSnapshot() as any,
631+
callbacks: {},
632+
loggingSession: loggingSession as any,
633+
})
636634

637-
expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
638-
expect(safeCompleteWithErrorMock).toHaveBeenCalledWith(
639-
expect.objectContaining({
640-
error: expect.objectContaining({
641-
message: 'completion failed',
642-
}),
643-
})
644-
)
635+
expect(result).toMatchObject({ status: 'completed', success: true })
636+
expect(safeCompleteWithErrorMock).not.toHaveBeenCalled()
645637
})
646638

647639
it('does not replace a successful outcome when cancellation cleanup fails', async () => {

0 commit comments

Comments
 (0)