Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,7 @@ export class AgentBlockHandler implements BlockHandler {
verbosity: providerRequest.verbosity,
thinkingLevel: providerRequest.thinkingLevel,
previousInteractionId: providerRequest.previousInteractionId,
abortSignal: ctx.abortSignal,
})

return this.processProviderResponse(response, block, responseFormat)
Expand Down
37 changes: 24 additions & 13 deletions apps/sim/providers/anthropic/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,20 @@ const ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS = 21333
*/
async function createMessage(
anthropic: Anthropic,
payload: AnthropicPayload
payload: AnthropicPayload,
abortSignal?: AbortSignal
): Promise<Anthropic.Messages.Message> {
const options = abortSignal ? { signal: abortSignal } : undefined
if (payload.max_tokens > ANTHROPIC_SDK_NON_STREAMING_MAX_TOKENS && !payload.stream) {
const stream = anthropic.messages.stream(payload as Anthropic.Messages.MessageStreamParams)
const stream = anthropic.messages.stream(
payload as Anthropic.Messages.MessageStreamParams,
options
)
return stream.finalMessage()
}
return anthropic.messages.create(
payload as Anthropic.Messages.MessageCreateParamsNonStreaming
payload as Anthropic.Messages.MessageCreateParamsNonStreaming,
options
) as Promise<Anthropic.Messages.Message>
}

Expand Down Expand Up @@ -367,10 +373,13 @@ export async function executeAnthropicProviderRequest(
const providerStartTime = Date.now()
const providerStartTimeISO = new Date(providerStartTime).toISOString()

const streamResponse = await anthropic.messages.create({
...payload,
stream: true,
} as Anthropic.Messages.MessageCreateParamsStreaming)
const streamResponse = await anthropic.messages.create(
{
...payload,
stream: true,
} as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAnthropicStream(
Expand Down Expand Up @@ -461,7 +470,7 @@ export async function executeAnthropicProviderRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await createMessage(anthropic, payload)
let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -708,7 +717,7 @@ export async function executeAnthropicProviderRequest(

const nextModelStartTime = Date.now()

currentResponse = await createMessage(anthropic, nextPayload)
currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -758,7 +767,8 @@ export async function executeAnthropicProviderRequest(
}

const streamResponse = await anthropic.messages.create(
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
Expand Down Expand Up @@ -860,7 +870,7 @@ export async function executeAnthropicProviderRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await createMessage(anthropic, payload)
let currentResponse = await createMessage(anthropic, payload, request.abortSignal)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -1118,7 +1128,7 @@ export async function executeAnthropicProviderRequest(

const nextModelStartTime = Date.now()

currentResponse = await createMessage(anthropic, nextPayload)
currentResponse = await createMessage(anthropic, nextPayload, request.abortSignal)

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -1182,7 +1192,8 @@ export async function executeAnthropicProviderRequest(
}

const streamResponse = await anthropic.messages.create(
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming
streamingPayload as Anthropic.Messages.MessageCreateParamsStreaming,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
Expand Down
20 changes: 16 additions & 4 deletions apps/sim/providers/azure-openai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ async function executeChatCompletionsRequest(
stream: true,
stream_options: { include_usage: true },
}
const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
const streamResponse = await azureOpenAI.chat.completions.create(
streamingParams,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
Expand Down Expand Up @@ -243,7 +246,10 @@ async function executeChatCompletionsRequest(
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = (await azureOpenAI.chat.completions.create(payload)) as ChatCompletion
let currentResponse = (await azureOpenAI.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as ChatCompletion
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -421,7 +427,10 @@ async function executeChatCompletionsRequest(
}

const nextModelStartTime = Date.now()
currentResponse = (await azureOpenAI.chat.completions.create(nextPayload)) as ChatCompletion
currentResponse = (await azureOpenAI.chat.completions.create(
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as ChatCompletion

const nextCheckResult = checkForForcedToolUsage(
currentResponse,
Expand Down Expand Up @@ -471,7 +480,10 @@ async function executeChatCompletionsRequest(
stream: true,
stream_options: { include_usage: true },
}
const streamResponse = await azureOpenAI.chat.completions.create(streamingParams)
const streamResponse = await azureOpenAI.chat.completions.create(
streamingParams,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => {
Expand Down
25 changes: 20 additions & 5 deletions apps/sim/providers/bedrock/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ export const bedrockProvider: ProviderConfig = {
inferenceConfig,
})

const streamResponse = await client.send(command)
const streamResponse = await client.send(
command,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

if (!streamResponse.stream) {
throw new Error('No stream returned from Bedrock')
Expand Down Expand Up @@ -379,7 +382,10 @@ export const bedrockProvider: ProviderConfig = {
toolConfig,
})

let currentResponse = await client.send(command)
let currentResponse = await client.send(
command,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)
const firstResponseTime = Date.now() - initialCallTime

let content = ''
Expand Down Expand Up @@ -628,7 +634,10 @@ export const bedrockProvider: ProviderConfig = {
: undefined,
})

currentResponse = await client.send(nextCommand)
currentResponse = await client.send(
nextCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

const nextToolUseContentBlocks = (currentResponse.output?.message?.content || []).filter(
(block): block is ContentBlock & { toolUse: ToolUseBlock } => 'toolUse' in block
Expand Down Expand Up @@ -696,7 +705,10 @@ export const bedrockProvider: ProviderConfig = {
},
})

const structuredResponse = await client.send(structuredOutputCommand)
const structuredResponse = await client.send(
structuredOutputCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)
const structuredOutputEndTime = Date.now()

timeSegments.push({
Expand Down Expand Up @@ -782,7 +794,10 @@ export const bedrockProvider: ProviderConfig = {
toolConfig: streamToolConfig,
})

const streamResponse = await client.send(streamCommand)
const streamResponse = await client.send(
streamCommand,
request.abortSignal ? { abortSignal: request.abortSignal } : undefined
)

if (!streamResponse.stream) {
throw new Error('No stream returned from Bedrock')
Expand Down
27 changes: 19 additions & 8 deletions apps/sim/providers/cerebras/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ export const cerebrasProvider: ProviderConfig = {
if (request.stream && (!tools || tools.length === 0)) {
logger.info('Using streaming response for Cerebras request (no tools)')

const streamResponse: any = await client.chat.completions.create({
...payload,
stream: true,
})
const streamResponse: any = await client.chat.completions.create(
{
...payload,
stream: true,
},
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromCerebrasStream(streamResponse, (content, usage) => {
Expand Down Expand Up @@ -179,7 +182,10 @@ export const cerebrasProvider: ProviderConfig = {
}
const initialCallTime = Date.now()

let currentResponse = (await client.chat.completions.create(payload)) as CerebrasResponse
let currentResponse = (await client.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -365,7 +371,8 @@ export const cerebrasProvider: ProviderConfig = {
finalPayload.tool_choice = 'none'

const finalResponse = (await client.chat.completions.create(
finalPayload
finalPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse

const nextModelEndTime = Date.now()
Expand Down Expand Up @@ -401,7 +408,8 @@ export const cerebrasProvider: ProviderConfig = {

const nextModelStartTime = Date.now()
currentResponse = (await client.chat.completions.create(
nextPayload
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)) as CerebrasResponse

const nextModelEndTime = Date.now()
Expand Down Expand Up @@ -443,7 +451,10 @@ export const cerebrasProvider: ProviderConfig = {
stream: true,
}

const streamResponse: any = await client.chat.completions.create(streamingPayload)
const streamResponse: any = await client.chat.completions.create(
streamingPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

Expand Down
26 changes: 19 additions & 7 deletions apps/sim/providers/deepseek/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ export const deepseekProvider: ProviderConfig = {
if (request.stream && (!tools || tools.length === 0)) {
logger.info('Using streaming response for DeepSeek request (no tools)')

const streamResponse = await deepseek.chat.completions.create({
...payload,
stream: true,
})
const streamResponse = await deepseek.chat.completions.create(
{
...payload,
stream: true,
},
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromDeepseekStream(
Expand Down Expand Up @@ -183,7 +186,10 @@ export const deepseekProvider: ProviderConfig = {
const forcedTools = preparedTools?.forcedTools || []
let usedForcedTools: string[] = []

let currentResponse = await deepseek.chat.completions.create(payload)
let currentResponse = await deepseek.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -375,7 +381,10 @@ export const deepseekProvider: ProviderConfig = {
}

const nextModelStartTime = Date.now()
currentResponse = await deepseek.chat.completions.create(nextPayload)
currentResponse = await deepseek.chat.completions.create(
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

if (
typeof nextPayload.tool_choice === 'object' &&
Expand Down Expand Up @@ -439,7 +448,10 @@ export const deepseekProvider: ProviderConfig = {
stream: true,
}

const streamResponse = await deepseek.chat.completions.create(streamingPayload)
const streamResponse = await deepseek.chat.completions.create(
streamingPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

Expand Down
3 changes: 3 additions & 0 deletions apps/sim/providers/gemini/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,9 @@ export async function executeGeminiRequest(
// Build configuration
const geminiConfig: GenerateContentConfig = {}

if (request.abortSignal) {
geminiConfig.abortSignal = request.abortSignal
}
if (request.temperature !== undefined) {
geminiConfig.temperature = request.temperature
}
Expand Down
26 changes: 19 additions & 7 deletions apps/sim/providers/groq/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ export const groqProvider: ProviderConfig = {
const providerStartTime = Date.now()
const providerStartTimeISO = new Date(providerStartTime).toISOString()

const streamResponse = await groq.chat.completions.create({
...payload,
stream: true,
})
const streamResponse = await groq.chat.completions.create(
{
...payload,
stream: true,
},
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const streamingResult = {
stream: createReadableStreamFromGroqStream(streamResponse as any, (content, usage) => {
Expand Down Expand Up @@ -185,7 +188,10 @@ export const groqProvider: ProviderConfig = {
try {
const initialCallTime = Date.now()

let currentResponse = await groq.chat.completions.create(payload)
let currentResponse = await groq.chat.completions.create(
payload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)
const firstResponseTime = Date.now() - initialCallTime

let content = currentResponse.choices[0]?.message?.content || ''
Expand Down Expand Up @@ -355,7 +361,10 @@ export const groqProvider: ProviderConfig = {
}

const nextModelStartTime = Date.now()
currentResponse = await groq.chat.completions.create(nextPayload)
currentResponse = await groq.chat.completions.create(
nextPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const nextModelEndTime = Date.now()
const thisModelTime = nextModelEndTime - nextModelStartTime
Expand Down Expand Up @@ -396,7 +405,10 @@ export const groqProvider: ProviderConfig = {
stream: true,
}

const streamResponse = await groq.chat.completions.create(streamingPayload)
const streamResponse = await groq.chat.completions.create(
streamingPayload,
request.abortSignal ? { signal: request.abortSignal } : undefined
)

const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)

Expand Down
Loading