Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ interface BlockEventHandlerConfig {
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
accumulatedBlockStates: Map<string, BlockState>
executedBlockIds: Set<string>
Expand Down Expand Up @@ -309,6 +310,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
Expand All @@ -328,9 +330,18 @@ export function useWorkflowExecution() {
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
if (isActive) {
const count = activeBlockRefCounts.get(blockId) ?? 0
activeBlockRefCounts.set(blockId, count + 1)
activeBlocksSet.add(blockId)
} else {
activeBlocksSet.delete(blockId)
const count = activeBlockRefCounts.get(blockId) ?? 1
const next = count - 1
if (next <= 0) {
activeBlockRefCounts.delete(blockId)
activeBlocksSet.delete(blockId)
} else {
activeBlockRefCounts.set(blockId, next)
}
}
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}
Expand Down Expand Up @@ -1280,6 +1291,7 @@ export function useWorkflowExecution() {
}

const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
Expand All @@ -1292,6 +1304,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
Expand Down Expand Up @@ -1902,13 +1915,15 @@ export function useWorkflowExecution() {
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()

try {
const blockHandlers = buildBlockEventHandlers({
workflowId,
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
Expand Down Expand Up @@ -2104,6 +2119,7 @@ export function useWorkflowExecution() {

const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
Expand All @@ -2115,6 +2131,7 @@ export function useWorkflowExecution() {
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export async function executeWorkflowWithFullLogging(
const workflowEdges = useWorkflowStore.getState().edges

const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()

const payload: any = {
input: options.workflowInput,
Expand Down Expand Up @@ -103,6 +104,8 @@ export async function executeWorkflowWithFullLogging(

switch (event.type) {
case 'block:started': {
const startCount = activeBlockRefCounts.get(event.data.blockId) ?? 0
activeBlockRefCounts.set(event.data.blockId, startCount + 1)
activeBlocksSet.add(event.data.blockId)
setActiveBlocks(wfId, new Set(activeBlocksSet))

Expand All @@ -115,8 +118,14 @@ export async function executeWorkflowWithFullLogging(
break
}

case 'block:completed':
activeBlocksSet.delete(event.data.blockId)
case 'block:completed': {
const completeCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
if (completeCount <= 1) {
activeBlockRefCounts.delete(event.data.blockId)
activeBlocksSet.delete(event.data.blockId)
} else {
activeBlockRefCounts.set(event.data.blockId, completeCount - 1)
}
setActiveBlocks(wfId, new Set(activeBlocksSet))

setBlockRunStatus(wfId, event.data.blockId, 'success')
Expand Down Expand Up @@ -144,9 +153,16 @@ export async function executeWorkflowWithFullLogging(
options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {})
}
break
}

case 'block:error':
activeBlocksSet.delete(event.data.blockId)
case 'block:error': {
const errorCount = activeBlockRefCounts.get(event.data.blockId) ?? 1
if (errorCount <= 1) {
activeBlockRefCounts.delete(event.data.blockId)
activeBlocksSet.delete(event.data.blockId)
} else {
activeBlockRefCounts.set(event.data.blockId, errorCount - 1)
}
setActiveBlocks(wfId, new Set(activeBlocksSet))

setBlockRunStatus(wfId, event.data.blockId, 'error')
Expand All @@ -171,6 +187,7 @@ export async function executeWorkflowWithFullLogging(
iterationContainerId: event.data.iterationContainerId,
})
break
}

case 'execution:completed':
executionResult = {
Expand Down
4 changes: 2 additions & 2 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ export class BlockExecutor {
block: SerializedBlock,
executionOrder: number
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

Expand Down Expand Up @@ -456,7 +456,7 @@ export class BlockExecutor {
executionOrder: number,
endedAt: string
): void {
const blockId = node.id
const blockId = node.metadata?.originalBlockId ?? node.id
Comment thread
waleedlatif1 marked this conversation as resolved.
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

Expand Down