Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions lib/ai/clients/mastra.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def agent_names
request['Origin'] = Ai.config.origin
request['Authorization'] = "Bearer #{Ai.config.api_key}" if Ai.config.api_key.present?

http = build_http
response = http.request(request)
unless response.is_a?(Net::HTTPSuccess)
raise Ai::Error, "Mastra error – could not fetch agents: #{response.body}"
Expand Down Expand Up @@ -97,7 +98,8 @@ def run_workflow(workflow_name, input:)
end

# Step 2: Stream the workflow – we only need to consume the stream so that we know when it finishes
stream_url = URI.join(@base_uri, "api/workflows/#{workflow_name}/streamVNext?runId=#{run_id}")
stream_url =
URI.join(@base_uri, "api/workflows/#{workflow_name}/streamVNext?runId=#{run_id}")
stream_request_body = { inputData: JSON.parse(input.to_json), runtimeContext: {} }.to_json
stream_response =
http_post(stream_url, body: stream_request_body, stream: true) do |response|
Expand All @@ -119,6 +121,7 @@ def run_workflow(workflow_name, input:)
.config
.api_key
.present?
http = build_http
result_response = http.request(result_request)

unless result_response.is_a?(Net::HTTPSuccess)
Expand Down Expand Up @@ -153,6 +156,7 @@ def workflow(workflow_name)
request['Origin'] = Ai.config.origin
request['Authorization'] = "Bearer #{Ai.config.api_key}" if Ai.config.api_key.present?

http = build_http
response = http.request(request)

unless response.is_a?(Net::HTTPSuccess)
Expand Down Expand Up @@ -181,10 +185,12 @@ def workflow(workflow_name)
private

sig { returns(Net::HTTP) }
def http
@http ||= T.let(Net::HTTP.new(@base_uri.host, @base_uri.port), T.nilable(Net::HTTP))
@http.use_ssl = (@base_uri.scheme == 'https')
@http
def build_http
# Create a new connection for each request - thread-safe
# This ensures each thread/request gets its own HTTP connection with its own SSL context
http_instance = Net::HTTP.new(@base_uri.host, @base_uri.port)
http_instance.use_ssl = (@base_uri.scheme == 'https')
http_instance
Comment thread
oriolgual marked this conversation as resolved.
end

sig { params(options: T::Hash[Symbol, T.anything]).returns(T::Hash[Symbol, T.anything]) }
Expand All @@ -208,6 +214,7 @@ def http_post(url, body: nil, stream: false, &blk)
request['Authorization'] = "Bearer #{Ai.config.api_key}" if Ai.config.api_key.present?
request.body = body if body

http = build_http
if stream && blk
http.request(request, &blk)
else
Expand Down Expand Up @@ -247,6 +254,7 @@ def response(url:, messages:, options:)
serialized_messages = messages.map(&:as_json)
request.body = { messages: serialized_messages, **camelized_options }.to_json

http = build_http
response = http.request(request)

unless response.is_a?(Net::HTTPSuccess)
Expand Down