Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ insert_final_newline = unset
trim_trailing_whitespace = unset
end_of_line = unset

[*.bin]
indent_style = unset
insert_final_newline = unset
trim_trailing_whitespace = unset
end_of_line = unset

[*.sse]
trim_trailing_whitespace = unset
insert_final_newline = unset
89 changes: 82 additions & 7 deletions apisix/plugins/ai-protocols/bedrock-converse.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

--- Bedrock Converse protocol adapter (client-side).
-- Handles detection and response parsing for the Amazon Bedrock
-- Converse API format. Streaming uses the /converse-stream endpoint with
-- AWS EventStream binary framing; this module's parse_sse_event consumes
-- the {headers, payload} event shape produced by ai-transport.aws-eventstream.

local core = require("apisix.core")
local string_sub = string.sub
Expand All @@ -38,22 +40,95 @@ end


--- Check whether the request is a streaming request.
-- The Bedrock Converse API itself has no `stream` body field — the gateway
-- treats `body.stream == true` as the opt-in that routes the request to
-- /converse-stream (which returns AWS EventStream binary frames).
-- Note: the stale `return false` from the pre-streaming phase made the real
-- check unreachable; only the current implementation is kept.
-- @param body table|nil Decoded request body (may be any type from callers).
-- @treturn boolean true only when body is a table carrying stream == true.
function _M.is_streaming(body)
    return type(body) == "table" and body.stream == true
end


--- Prepare the outgoing request body for the target provider.
-- Strips the gateway-side `stream` flag before the body is forwarded:
-- Bedrock rejects unknown body fields and decides streaming purely by the
-- URL (/converse vs /converse-stream), so the flag must not leak upstream.
-- (The stale TODO about unsupported streaming contradicted this and is gone.)
-- @param body table Decoded request body; mutated in place.
function _M.prepare_outgoing_request(body)
    body.stream = nil
end


--- Parse one streaming event decoded from Bedrock's EventStream framing.
-- Events arrive as {headers, payload} tables produced by
-- ai-transport.aws-eventstream. Bedrock's standard headers are
-- :event-type, :content-type and :message-type; :message-type is "event"
-- for ordinary events and "exception" for typed streaming errors
-- (throttlingException, modelStreamErrorException, and friends).
-- @param event table Decoded frame: { headers = {...}, payload = "..." }.
-- @param ctx table Request context (unused here, kept for the interface).
-- @param state table Per-stream parser state (unused here).
-- @treturn table One of { type = "skip" | "done" | "delta" | "usage_and_done" }.
function _M.parse_sse_event(event, ctx, state)
    if type(event) ~= "table" then
        return { type = "skip" }
    end
    local headers = event.headers
    if type(headers) ~= "table" then
        return { type = "skip" }
    end

    local msg_kind = headers[":message-type"]
    if msg_kind == "exception" or msg_kind == "error" then
        local typed_err = headers[":exception-type"]
                          or headers[":error-code"]
                          or "unknown"
        -- The payload is upstream-controlled JSON that may carry partial
        -- completions or prompt fragments, so only the typed error and the
        -- payload size are logged — never the payload body itself.
        core.log.warn("Bedrock streaming exception: type=", typed_err,
                      ", payload_size=", #(event.payload or ""))
        return { type = "done" }
    end

    local kind = headers[":event-type"]
    if not kind then
        return { type = "skip" }
    end

    if kind == "messageStop" then
        return { type = "done" }
    end

    if kind == "contentBlockDelta" then
        local decoded, decode_err =
            core.json.decode(event.payload, { null_as_nil = true })
        if not decoded then
            core.log.warn("failed to decode contentBlockDelta payload: ",
                          decode_err)
            return { type = "skip" }
        end
        local delta = decoded.delta
        if type(delta) == "table" and type(delta.text) == "string" then
            return { type = "delta", texts = { delta.text } }
        end
        -- Non-text delta shapes (toolUse partial-JSON deltas etc.) add
        -- nothing to llm_response_text; the raw bytes still pass through.
        return { type = "skip" }
    end

    if kind == "metadata" then
        local decoded, decode_err =
            core.json.decode(event.payload, { null_as_nil = true })
        if not decoded or type(decoded.usage) ~= "table" then
            if decode_err then
                core.log.warn("failed to decode metadata payload: ", decode_err)
            end
            return { type = "skip" }
        end
        local usage = decoded.usage
        local prompt = usage.inputTokens or 0
        local completion = usage.outputTokens or 0
        return {
            type = "usage_and_done",
            usage = {
                prompt_tokens = prompt,
                completion_tokens = completion,
                total_tokens = usage.totalTokens or (prompt + completion),
            },
            raw_usage = usage,
        }
    end

    -- messageStart / contentBlockStart / contentBlockStop carry no metrics
    -- or visible text; nothing to surface beyond passthrough.
    return { type = "skip" }
end


--- Extract token usage from a non-streaming Bedrock response.
-- Bedrock format: res_body.usage.inputTokens / outputTokens / totalTokens
function _M.extract_usage(res_body)
Expand Down
40 changes: 28 additions & 12 deletions apisix/plugins/ai-providers/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ local mt = {
__index = _M
}

-- Maximum SSE buffer size per request (1 MB).
local MAX_SSE_BUF_SIZE = 1024 * 1024

local core = require("apisix.core")
local plugin = require("apisix.plugin")
local url = require("socket.url")
local sse = require("apisix.plugins.ai-transport.sse")
local aws_eventstream = require("apisix.plugins.ai-transport.aws-eventstream")
local transport_http = require("apisix.plugins.ai-transport.http")
local transport_auth = require("apisix.plugins.ai-transport.auth")
local log_sanitize = require("apisix.utils.log-sanitize")
Expand All @@ -48,6 +46,16 @@ local type = type
local math = math
local ipairs = ipairs
local setmetatable = setmetatable
local tostring = tostring

-- Streaming framings selectable via provider.streaming_framing.
-- Each module exposes split_buf(buf) -> (complete, remainder) and
-- decode(buf) -> array of events. The event shape is framing-specific;
-- the protocol's parse_sse_event must understand it.
local FRAMINGS = {
sse = sse,
["aws-eventstream"] = aws_eventstream,
}


function _M.new(opt)
Expand Down Expand Up @@ -360,13 +368,20 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf)
end


--- Process streaming SSE response.
-- Uses target protocol for SSE parsing and converter (if present) for
-- transforming events to client format.
--- Process streaming response.
-- Uses target protocol for event parsing and converter (if present) for
-- transforming events to client format. The wire framing (SSE vs AWS
-- EventStream binary) is selected by provider.streaming_framing; the
-- protocol module's parse_sse_event must understand the resulting event
-- shape.
-- @param target_proto table The protocol module for the provider's native protocol
-- @param converter table|nil The converter module (if protocol conversion needed)
-- @param conf table|nil Plugin configuration (used for stream duration and size limits)
function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf)
local framing = FRAMINGS[self.streaming_framing or "sse"]
if not framing then
return 500, "unknown streaming framing: " .. tostring(self.streaming_framing)
end
local body_reader = res.body_reader
local contents = {}
local sse_state = { is_first = true }
Expand Down Expand Up @@ -406,14 +421,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
end
if not chunk then
if #sse_buf > 0 then
core.log.warn("dropping incomplete SSE frame at EOF, size: ",
core.log.warn("dropping incomplete stream frame at EOF, size: ",
#sse_buf)
end

if converter and not output_sent then
local msg = "streaming response completed without producing "
.. "any output; the upstream likely returned a "
.. "different SSE format than the converter expects"
.. "different stream format than the converter expects"
core.log.error(msg)
return 502, msg
end
Expand All @@ -428,13 +443,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
end

sse_buf = sse_buf .. chunk
local complete, remainder = sse.split_buf(sse_buf)
if #remainder > MAX_SSE_BUF_SIZE then
core.log.warn("SSE remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting")
local complete, remainder = framing.split_buf(sse_buf)
local max_remainder = framing.max_remainder or 1024 * 1024
if #remainder > max_remainder then
core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting")
remainder = ""
end
sse_buf = remainder
local events = complete ~= "" and sse.decode(complete) or {}
local events = complete ~= "" and framing.decode(complete) or {}
ctx.llm_response_contents_in_chunk = {}
local converted_chunks = {}

Expand Down
10 changes: 9 additions & 1 deletion apisix/plugins/ai-providers/bedrock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ local ngx_escape_uri = ngx.escape_uri

local host_template = "bedrock-runtime.%s.amazonaws.com"
local chat_path_template = "/model/%s/converse"
local stream_path_template = "/model/%s/converse-stream"

local function get_host(region)
return str_fmt(host_template, region)
Expand Down Expand Up @@ -57,6 +58,9 @@ return require("apisix.plugins.ai-providers.base").new({
get_node = get_node,
remove_model = true,
aws_sigv4 = true,
-- Bedrock ConverseStream uses AWS EventStream binary framing on the
-- /converse-stream endpoint, not Server-Sent Events.
streaming_framing = "aws-eventstream",
capabilities = {
["bedrock-converse"] = {
host = function(conf)
Expand All @@ -75,7 +79,11 @@ return require("apisix.plugins.ai-providers.base").new({
-- contain "/" (e.g. "...:application-inference-profile/abc")
-- and ":". auth-aws.lua's normalize_and_encode_path is
-- idempotent so this pre-encoding is preserved end-to-end.
return str_fmt(chat_path_template, ngx_escape_uri(model))
local template = chat_path_template
if ctx.var.request_type == "ai_stream" then
template = stream_path_template
end
return str_fmt(template, ngx_escape_uri(model))
end,
rewrite_request_body = rewrite_converse_request_body,
},
Expand Down
12 changes: 11 additions & 1 deletion apisix/plugins/ai-proxy/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,18 @@ function _M.before_proxy(conf, ctx, on_error)
core.response.set_header("Content-Type", content_type)

-- Step 5: Parse response
-- Streaming responses arrive with provider-specific framing
-- content-types: SSE for OpenAI/Anthropic/etc., AWS EventStream
-- binary frames for Bedrock ConverseStream. The framing module
-- is selected inside parse_streaming_response via
-- provider.streaming_framing.
local code, body
if content_type and core.string.find(content_type, "text/event-stream") then
local is_streaming_resp = content_type and (
core.string.find(content_type, "text/event-stream", 1, true) or
core.string.find(content_type,
"application/vnd.amazon.eventstream", 1, true)
)
if is_streaming_resp then
local target_proto_module = protocols.get(target_proto)
if not target_proto_module then
core.log.error("no protocol module for streaming target: ", target_proto)
Expand Down
Loading
Loading