|
| 1 | +import type { StepExecutionResult } from '../types/execution'; |
| 2 | +import type { CollectionSchema, RecordRef } from '../types/record'; |
| 3 | +import type { RecordTaskStepDefinition } from '../types/step-definition'; |
| 4 | +import type { TriggerActionStepExecutionData } from '../types/step-execution-data'; |
| 5 | + |
| 6 | +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; |
| 7 | +import { DynamicStructuredTool } from '@langchain/core/tools'; |
| 8 | +import { z } from 'zod'; |
| 9 | + |
| 10 | +import { NoActionsError, WorkflowExecutorError } from '../errors'; |
| 11 | +import BaseStepExecutor from './base-step-executor'; |
| 12 | + |
| 13 | +const TRIGGER_ACTION_SYSTEM_PROMPT = `You are an AI agent triggering an action on a record based on a user request. |
| 14 | +Select the action to trigger. |
| 15 | +
|
| 16 | +Important rules: |
| 17 | +- Be precise: only trigger the action directly relevant to the request. |
| 18 | +- Final answer is definitive, you won't receive any other input from the user. |
| 19 | +- Do not refer to yourself as "I" in the response, use a passive formulation instead.`; |
| 20 | + |
| 21 | +interface TriggerTarget { |
| 22 | + selectedRecordRef: RecordRef; |
| 23 | + actionDisplayName: string; |
| 24 | +} |
| 25 | + |
| 26 | +export default class TriggerActionStepExecutor extends BaseStepExecutor<RecordTaskStepDefinition> { |
| 27 | + async execute(): Promise<StepExecutionResult> { |
| 28 | + // Branch A -- Re-entry with user confirmation |
| 29 | + if (this.context.userConfirmed !== undefined) { |
| 30 | + return this.handleConfirmation(); |
| 31 | + } |
| 32 | + |
| 33 | + // Branches B & C -- First call |
| 34 | + return this.handleFirstCall(); |
| 35 | + } |
| 36 | + |
| 37 | + private async handleConfirmation(): Promise<StepExecutionResult> { |
| 38 | + const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); |
| 39 | + const execution = stepExecutions.find( |
| 40 | + (e): e is TriggerActionStepExecutionData => |
| 41 | + e.type === 'trigger-action' && e.stepIndex === this.context.stepIndex, |
| 42 | + ); |
| 43 | + |
| 44 | + if (!execution?.pendingAction) { |
| 45 | + throw new WorkflowExecutorError('No pending action found for this step'); |
| 46 | + } |
| 47 | + |
| 48 | + if (!this.context.userConfirmed) { |
| 49 | + await this.context.runStore.saveStepExecution(this.context.runId, { |
| 50 | + ...execution, |
| 51 | + executionResult: { skipped: true }, |
| 52 | + }); |
| 53 | + |
| 54 | + return this.buildOutcomeResult('success'); |
| 55 | + } |
| 56 | + |
| 57 | + const { selectedRecordRef, pendingAction } = execution; |
| 58 | + const target: TriggerTarget = { |
| 59 | + selectedRecordRef, |
| 60 | + actionDisplayName: pendingAction.actionDisplayName, |
| 61 | + }; |
| 62 | + |
| 63 | + return this.resolveAndExecute(target, execution); |
| 64 | + } |
| 65 | + |
| 66 | + private async handleFirstCall(): Promise<StepExecutionResult> { |
| 67 | + const { stepDefinition: step } = this.context; |
| 68 | + const records = await this.getAvailableRecordRefs(); |
| 69 | + |
| 70 | + let target: TriggerTarget; |
| 71 | + |
| 72 | + try { |
| 73 | + const selectedRecordRef = await this.selectRecordRef(records, step.prompt); |
| 74 | + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); |
| 75 | + const args = await this.selectAction(schema, step.prompt); |
| 76 | + target = { selectedRecordRef, actionDisplayName: args.actionDisplayName }; |
| 77 | + } catch (error) { |
| 78 | + if (error instanceof WorkflowExecutorError) { |
| 79 | + return this.buildOutcomeResult('error', error.message); |
| 80 | + } |
| 81 | + |
| 82 | + throw error; |
| 83 | + } |
| 84 | + |
| 85 | + // Branch B -- automaticExecution |
| 86 | + if (step.automaticExecution) { |
| 87 | + return this.resolveAndExecute(target); |
| 88 | + } |
| 89 | + |
| 90 | + // Branch C -- Awaiting confirmation |
| 91 | + await this.context.runStore.saveStepExecution(this.context.runId, { |
| 92 | + type: 'trigger-action', |
| 93 | + stepIndex: this.context.stepIndex, |
| 94 | + pendingAction: { actionDisplayName: target.actionDisplayName }, |
| 95 | + selectedRecordRef: target.selectedRecordRef, |
| 96 | + }); |
| 97 | + |
| 98 | + return this.buildOutcomeResult('awaiting-input'); |
| 99 | + } |
| 100 | + |
| 101 | + /** |
| 102 | + * Resolves the action name, calls executeAction, and persists execution data. |
| 103 | + * When `existingExecution` is provided (confirmation flow), it is spread into the |
| 104 | + * saved execution to preserve pendingAction for traceability. |
| 105 | + */ |
| 106 | + private async resolveAndExecute( |
| 107 | + target: TriggerTarget, |
| 108 | + existingExecution?: TriggerActionStepExecutionData, |
| 109 | + ): Promise<StepExecutionResult> { |
| 110 | + const { selectedRecordRef, actionDisplayName } = target; |
| 111 | + let actionName: string; |
| 112 | + |
| 113 | + try { |
| 114 | + const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); |
| 115 | + actionName = this.resolveActionName(schema, actionDisplayName); |
| 116 | + // Return value intentionally discarded: action results may contain client data |
| 117 | + // and must not leave the client's infrastructure (privacy constraint). |
| 118 | + await this.context.agentPort.executeAction(selectedRecordRef.collectionName, actionName, [ |
| 119 | + selectedRecordRef.recordId, |
| 120 | + ]); |
| 121 | + } catch (error) { |
| 122 | + if (error instanceof WorkflowExecutorError) { |
| 123 | + return this.buildOutcomeResult('error', error.message); |
| 124 | + } |
| 125 | + |
| 126 | + throw error; |
| 127 | + } |
| 128 | + |
| 129 | + await this.context.runStore.saveStepExecution(this.context.runId, { |
| 130 | + ...existingExecution, |
| 131 | + type: 'trigger-action', |
| 132 | + stepIndex: this.context.stepIndex, |
| 133 | + executionParams: { actionDisplayName, actionName }, |
| 134 | + executionResult: { success: true }, |
| 135 | + selectedRecordRef, |
| 136 | + }); |
| 137 | + |
| 138 | + return this.buildOutcomeResult('success'); |
| 139 | + } |
| 140 | + |
| 141 | + private async selectAction( |
| 142 | + schema: CollectionSchema, |
| 143 | + prompt: string | undefined, |
| 144 | + ): Promise<{ actionDisplayName: string; reasoning: string }> { |
| 145 | + const tool = this.buildSelectActionTool(schema); |
| 146 | + const messages = [ |
| 147 | + ...(await this.buildPreviousStepsMessages()), |
| 148 | + new SystemMessage(TRIGGER_ACTION_SYSTEM_PROMPT), |
| 149 | + new SystemMessage( |
| 150 | + `The selected record belongs to the "${schema.collectionDisplayName}" collection.`, |
| 151 | + ), |
| 152 | + new HumanMessage(`**Request**: ${prompt ?? 'Trigger the relevant action.'}`), |
| 153 | + ]; |
| 154 | + |
| 155 | + return this.invokeWithTool<{ actionDisplayName: string; reasoning: string }>(messages, tool); |
| 156 | + } |
| 157 | + |
| 158 | + private buildSelectActionTool(schema: CollectionSchema): DynamicStructuredTool { |
| 159 | + if (schema.actions.length === 0) { |
| 160 | + throw new NoActionsError(schema.collectionName); |
| 161 | + } |
| 162 | + |
| 163 | + const displayNames = schema.actions.map(a => a.displayName) as [string, ...string[]]; |
| 164 | + const technicalNames = schema.actions |
| 165 | + .map(a => `${a.displayName} (technical name: ${a.name})`) |
| 166 | + .join(', '); |
| 167 | + |
| 168 | + return new DynamicStructuredTool({ |
| 169 | + name: 'select-action', |
| 170 | + description: 'Select the action to trigger on the record.', |
| 171 | + schema: z.object({ |
| 172 | + actionDisplayName: z |
| 173 | + .enum(displayNames) |
| 174 | + .describe(`The display name of the action to trigger. Available: ${technicalNames}`), |
| 175 | + reasoning: z.string().describe('Why this action was chosen'), |
| 176 | + }), |
| 177 | + func: undefined, |
| 178 | + }); |
| 179 | + } |
| 180 | + |
| 181 | + private resolveActionName(schema: CollectionSchema, displayName: string): string { |
| 182 | + const action = |
| 183 | + schema.actions.find(a => a.displayName === displayName) ?? |
| 184 | + schema.actions.find(a => a.name === displayName); |
| 185 | + |
| 186 | + if (!action) { |
| 187 | + throw new WorkflowExecutorError( |
| 188 | + `Action "${displayName}" not found in collection "${schema.collectionName}"`, |
| 189 | + ); |
| 190 | + } |
| 191 | + |
| 192 | + return action.name; |
| 193 | + } |
| 194 | +} |
0 commit comments