diff --git a/.editorconfig b/.editorconfig index 38c0dec25ef9..925edb3fbbb7 100644 --- a/.editorconfig +++ b/.editorconfig @@ -52,6 +52,12 @@ insert_final_newline = unset trim_trailing_whitespace = unset end_of_line = unset +[*.bin] +indent_style = unset +insert_final_newline = unset +trim_trailing_whitespace = unset +end_of_line = unset + [*.sse] trim_trailing_whitespace = unset insert_final_newline = unset diff --git a/apisix/plugins/ai-protocols/bedrock-converse.lua b/apisix/plugins/ai-protocols/bedrock-converse.lua index 0e0d2061a4cb..5c7112c063df 100644 --- a/apisix/plugins/ai-protocols/bedrock-converse.lua +++ b/apisix/plugins/ai-protocols/bedrock-converse.lua @@ -17,7 +17,9 @@ --- Bedrock Converse protocol adapter (client-side). -- Handles detection and response parsing for the Amazon Bedrock --- Converse API format. Non-streaming only in this phase. +-- Converse API format. Streaming uses the /converse-stream endpoint with +-- AWS EventStream binary framing; this module's parse_sse_event consumes +-- the {headers, payload} event shape produced by ai-transport.aws-eventstream. local core = require("apisix.core") local string_sub = string.sub @@ -38,22 +40,95 @@ end --- Check whether the request is a streaming request. --- Streaming is not supported in this phase. +-- The Bedrock Converse API itself has no `stream` body field — we use +-- `body.stream = true` as the gateway-side opt-in to route the request to +-- /converse-stream (which returns AWS EventStream binary frames). function _M.is_streaming(body) - return false + return type(body) == "table" and body.stream == true end --- Prepare the outgoing request body for the target provider. --- TODO: support streaming. Bedrock uses a separate /converse-stream endpoint --- with the AWS EventStream binary protocol, which we don't yet implement. --- For now, strip the `stream` field so we only send non-streaming requests --- to /converse. 
+-- Strip our gateway-side `stream` flag; Bedrock rejects unknown body fields +-- and decides streaming purely by URL (/converse vs /converse-stream). function _M.prepare_outgoing_request(body) body.stream = nil end +--- Parse a streaming event from Bedrock's EventStream framing. +-- Event shape comes from ai-transport.aws-eventstream: {headers, payload}. +-- Bedrock standard headers: :event-type, :content-type, :message-type. +-- :message-type is "event" for normal events and "exception" for typed +-- streaming errors (throttlingException, modelStreamErrorException, etc.). +function _M.parse_sse_event(event, ctx, state) + if type(event) ~= "table" or type(event.headers) ~= "table" then + return { type = "skip" } + end + + local message_type = event.headers[":message-type"] + if message_type == "exception" or message_type == "error" then + local err_type = event.headers[":exception-type"] + or event.headers[":error-code"] + or "unknown" + -- Don't log event.payload: it's upstream-controlled JSON that may + -- contain partial completions, prompt fragments, or other request + -- content. Log just the typed error and the payload size. + core.log.warn("Bedrock streaming exception: type=", err_type, + ", payload_size=", #(event.payload or "")) + return { type = "done" } + end + + local event_type = event.headers[":event-type"] + if not event_type then + return { type = "skip" } + end + + if event_type == "contentBlockDelta" then + local data, err = core.json.decode(event.payload, { null_as_nil = true }) + if not data then + core.log.warn("failed to decode contentBlockDelta payload: ", err) + return { type = "skip" } + end + if type(data.delta) == "table" and type(data.delta.text) == "string" then + return { type = "delta", texts = { data.delta.text } } + end + -- toolUse partial-JSON deltas and other delta shapes don't contribute + -- to llm_response_text; let the raw bytes pass through unchanged. 
+ return { type = "skip" } + end + + if event_type == "messageStop" then + return { type = "done" } + end + + if event_type == "metadata" then + local data, err = core.json.decode(event.payload, { null_as_nil = true }) + if not data or type(data.usage) ~= "table" then + if err then + core.log.warn("failed to decode metadata payload: ", err) + end + return { type = "skip" } + end + local raw = data.usage + return { + type = "usage_and_done", + usage = { + prompt_tokens = raw.inputTokens or 0, + completion_tokens = raw.outputTokens or 0, + total_tokens = raw.totalTokens + or (raw.inputTokens or 0) + (raw.outputTokens or 0), + }, + raw_usage = raw, + } + end + + -- messageStart / contentBlockStart / contentBlockStop carry no metrics + -- or visible text and aren't surfaced to the client beyond passthrough. + return { type = "skip" } +end + + --- Extract token usage from a non-streaming Bedrock response. -- Bedrock format: res_body.usage.inputTokens / outputTokens / totalTokens function _M.extract_usage(res_body) diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 253e048f649a..5aaddbe1ac4a 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -25,13 +25,11 @@ local mt = { __index = _M } --- Maximum SSE buffer size per request (1 MB). 
-local MAX_SSE_BUF_SIZE = 1024 * 1024 - local core = require("apisix.core") local plugin = require("apisix.plugin") local url = require("socket.url") local sse = require("apisix.plugins.ai-transport.sse") +local aws_eventstream = require("apisix.plugins.ai-transport.aws-eventstream") local transport_http = require("apisix.plugins.ai-transport.http") local transport_auth = require("apisix.plugins.ai-transport.auth") local log_sanitize = require("apisix.utils.log-sanitize") @@ -48,6 +46,16 @@ local type = type local math = math local ipairs = ipairs local setmetatable = setmetatable +local tostring = tostring + +-- Streaming framings selectable via provider.streaming_framing. +-- Each module exposes split_buf(buf) -> (complete, remainder) and +-- decode(buf) -> array of events. The event shape is framing-specific; +-- the protocol's parse_sse_event must understand it. +local FRAMINGS = { + sse = sse, + ["aws-eventstream"] = aws_eventstream, +} function _M.new(opt) @@ -360,13 +368,20 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) end ---- Process streaming SSE response. --- Uses target protocol for SSE parsing and converter (if present) for --- transforming events to client format. +--- Process streaming response. +-- Uses target protocol for event parsing and converter (if present) for +-- transforming events to client format. The wire framing (SSE vs AWS +-- EventStream binary) is selected by provider.streaming_framing; the +-- protocol module's parse_sse_event must understand the resulting event +-- shape. 
-- @param target_proto table The protocol module for the provider's native protocol -- @param converter table|nil The converter module (if protocol conversion needed) -- @param conf table|nil Plugin configuration (used for stream duration and size limits) function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf) + local framing = FRAMINGS[self.streaming_framing or "sse"] + if not framing then + return 500, "unknown streaming framing: " .. tostring(self.streaming_framing) + end local body_reader = res.body_reader local contents = {} local sse_state = { is_first = true } @@ -406,14 +421,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co end if not chunk then if #sse_buf > 0 then - core.log.warn("dropping incomplete SSE frame at EOF, size: ", + core.log.warn("dropping incomplete stream frame at EOF, size: ", #sse_buf) end if converter and not output_sent then local msg = "streaming response completed without producing " .. "any output; the upstream likely returned a " - .. "different SSE format than the converter expects" + .. "different stream format than the converter expects" core.log.error(msg) return 502, msg end @@ -428,13 +443,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co end sse_buf = sse_buf .. 
chunk - local complete, remainder = sse.split_buf(sse_buf) - if #remainder > MAX_SSE_BUF_SIZE then - core.log.warn("SSE remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting") + local complete, remainder = framing.split_buf(sse_buf) + local max_remainder = framing.max_remainder or 1024 * 1024 + if #remainder > max_remainder then + core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting") remainder = "" end sse_buf = remainder - local events = complete ~= "" and sse.decode(complete) or {} + local events = complete ~= "" and framing.decode(complete) or {} ctx.llm_response_contents_in_chunk = {} local converted_chunks = {} diff --git a/apisix/plugins/ai-providers/bedrock.lua b/apisix/plugins/ai-providers/bedrock.lua index 8fb2f0a1e620..e4b826ec6b43 100644 --- a/apisix/plugins/ai-providers/bedrock.lua +++ b/apisix/plugins/ai-providers/bedrock.lua @@ -21,6 +21,7 @@ local ngx_escape_uri = ngx.escape_uri local host_template = "bedrock-runtime.%s.amazonaws.com" local chat_path_template = "/model/%s/converse" +local stream_path_template = "/model/%s/converse-stream" local function get_host(region) return str_fmt(host_template, region) @@ -57,6 +58,9 @@ return require("apisix.plugins.ai-providers.base").new({ get_node = get_node, remove_model = true, aws_sigv4 = true, + -- Bedrock ConverseStream uses AWS EventStream binary framing on the + -- /converse-stream endpoint, not Server-Sent Events. + streaming_framing = "aws-eventstream", capabilities = { ["bedrock-converse"] = { host = function(conf) @@ -75,7 +79,11 @@ return require("apisix.plugins.ai-providers.base").new({ -- contain "/" (e.g. "...:application-inference-profile/abc") -- and ":". auth-aws.lua's normalize_and_encode_path is -- idempotent so this pre-encoding is preserved end-to-end. 
- return str_fmt(chat_path_template, ngx_escape_uri(model)) + local template = chat_path_template + if ctx.var.request_type == "ai_stream" then + template = stream_path_template + end + return str_fmt(template, ngx_escape_uri(model)) end, rewrite_request_body = rewrite_converse_request_body, }, diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index 1795c4c62c0f..302c27986543 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -225,8 +225,18 @@ function _M.before_proxy(conf, ctx, on_error) core.response.set_header("Content-Type", content_type) -- Step 5: Parse response + -- Streaming responses arrive with provider-specific framing + -- content-types: SSE for OpenAI/Anthropic/etc., AWS EventStream + -- binary frames for Bedrock ConverseStream. The framing module + -- is selected inside parse_streaming_response via + -- provider.streaming_framing. local code, body - if content_type and core.string.find(content_type, "text/event-stream") then + local is_streaming_resp = content_type and ( + core.string.find(content_type, "text/event-stream", 1, true) or + core.string.find(content_type, + "application/vnd.amazon.eventstream", 1, true) + ) + if is_streaming_resp then local target_proto_module = protocols.get(target_proto) if not target_proto_module then core.log.error("no protocol module for streaming target: ", target_proto) diff --git a/apisix/plugins/ai-transport/aws-eventstream.lua b/apisix/plugins/ai-transport/aws-eventstream.lua new file mode 100644 index 000000000000..09f017843685 --- /dev/null +++ b/apisix/plugins/ai-transport/aws-eventstream.lua @@ -0,0 +1,292 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. 
+-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- AWS EventStream binary framing codec. +-- Used by Bedrock ConverseStream, Kinesis SubscribeToShard, S3 SelectObjectContent, +-- and Transcribe streaming. Each frame: +-- +-- prelude (12 bytes) +-- total_length BE uint32 -- entire frame including trailing CRC +-- headers_length BE uint32 -- size of the headers section +-- prelude_crc BE uint32 -- CRC32 of the first 8 bytes +-- headers (headers_length bytes) +-- repeated entries: name_len(u8) name(bytes) value_type(u8) value... +-- payload (total_length - 16 - headers_length bytes) +-- message_crc (4 bytes BE uint32) -- CRC32 of bytes [0, total_length-4) +-- +-- This module provides the same API surface as ai-transport.sse so the base +-- provider can pick a framing module by name. + +local core = require("apisix.core") +local ngx = ngx +local ngx_crc32 = ngx.crc32_long +local string_byte = string.byte +local string_sub = string.sub +local tostring = tostring + +-- Hard cap on a single frame size to avoid memory blowups on malformed input. +-- AWS documents ConverseStream frames as well under 1 MiB; pick 16 MiB to be +-- safe for other services that use this codec. +local MAX_FRAME_SIZE = 16 * 1024 * 1024 + +-- Header value type tags (AWS EventStream spec). Bedrock only sends type 7 +-- (string), but we decode 6/7 for robustness against other AWS services. 
+local TYPE_TRUE = 0 +local TYPE_FALSE = 1 +local TYPE_BYTE = 2 +local TYPE_SHORT = 3 +local TYPE_INTEGER = 4 +local TYPE_LONG = 5 +local TYPE_BYTE_ARRAY = 6 +local TYPE_STRING = 7 +local TYPE_TIMESTAMP = 8 +local TYPE_UUID = 9 + +local _M = { + -- Cap on bytes split_buf may leave in `remainder`. The streaming loop + -- in ai-providers.base uses this to bound the buffer when a frame has + -- not yet completed. A single in-progress frame can be up to + -- MAX_FRAME_SIZE bytes, so the remainder cap matches that. + max_remainder = MAX_FRAME_SIZE, +} + + +local function read_u32_be(s, pos) + local b1, b2, b3, b4 = string_byte(s, pos, pos + 3) + if not b4 then + return nil + end + return b1 * 16777216 + b2 * 65536 + b3 * 256 + b4 +end + + +local function read_u16_be(s, pos) + local b1, b2 = string_byte(s, pos, pos + 1) + if not b2 then + return nil + end + return b1 * 256 + b2 +end + + +-- Decode a single frame's headers section. +-- @param s string Full frame buffer +-- @param start int 1-based start of headers +-- @param stop int 1-based end of headers (inclusive) +-- @return table|nil Map of header name -> string value +-- @return string|nil error +local function parse_headers(s, start, stop) + local headers = {} + local pos = start + while pos <= stop do + local name_len = string_byte(s, pos) + if not name_len then + return nil, "truncated header entry" + end + pos = pos + 1 + if pos + name_len - 1 > stop then + return nil, "header name extends past headers section" + end + local name = string_sub(s, pos, pos + name_len - 1) + pos = pos + name_len + + local value_type = string_byte(s, pos) + if not value_type then + return nil, "missing header value type" + end + pos = pos + 1 + + if value_type == TYPE_STRING or value_type == TYPE_BYTE_ARRAY then + local val_len = read_u16_be(s, pos) + if not val_len then + return nil, "truncated header value length" + end + pos = pos + 2 + if pos + val_len - 1 > stop then + return nil, "header value extends past headers section" + 
end + headers[name] = string_sub(s, pos, pos + val_len - 1) + pos = pos + val_len + elseif value_type == TYPE_TRUE then + headers[name] = true + elseif value_type == TYPE_FALSE then + headers[name] = false + elseif value_type == TYPE_BYTE then + if pos > stop then + return nil, "truncated header byte value" + end + headers[name] = string_byte(s, pos) + pos = pos + 1 + elseif value_type == TYPE_SHORT then + if pos + 1 > stop then + return nil, "truncated header short value" + end + headers[name] = read_u16_be(s, pos) + pos = pos + 2 + elseif value_type == TYPE_INTEGER then + if pos + 3 > stop then + return nil, "truncated header integer value" + end + headers[name] = read_u32_be(s, pos) + pos = pos + 4 + elseif value_type == TYPE_LONG then + if pos + 7 > stop then + return nil, "truncated header long value" + end + -- 64-bit ints don't fit in a Lua double; keep raw bytes. + headers[name] = string_sub(s, pos, pos + 7) + pos = pos + 8 + elseif value_type == TYPE_TIMESTAMP then + if pos + 7 > stop then + return nil, "truncated header timestamp value" + end + headers[name] = string_sub(s, pos, pos + 7) + pos = pos + 8 + elseif value_type == TYPE_UUID then + if pos + 15 > stop then + return nil, "truncated header uuid value" + end + headers[name] = string_sub(s, pos, pos + 15) + pos = pos + 16 + else + return nil, "unknown header value type: " .. value_type + end + end + return headers +end + + +--- Split a buffer at the last complete frame boundary. +-- A "complete" frame here is one whose prelude length fields are sane, +-- whose full byte range is present, AND whose prelude CRC validates. +-- The CRC check matters because split_buf advances pos based on +-- total_length alone — without it, a frame with a sane length but bad +-- prelude CRC would be consumed into `complete`, decode() would stop on +-- it, and any valid frames behind it in the same chunk would be lost +-- (already past pos, not preserved in remainder). 
Validating here keeps +-- corrupt frames in `remainder` so the caller can either resync or trip +-- max_remainder. Message CRC is intentionally not checked here (decode() +-- handles that); a frame with a good prelude but bad payload CRC is rare +-- and any frames behind it in the same chunk would still be advanced +-- past — accepted trade-off vs. the cost of computing the message CRC +-- twice on every frame. +-- @param buf string +-- @return string complete Concatenated complete frames (or "" if none). +-- @return string remainder Bytes after the last complete frame. +function _M.split_buf(buf) + local len = #buf + local pos = 1 + while pos + 11 <= len do + local total_length = read_u32_be(buf, pos) + if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then + -- Corrupt total_length field. Stop and leave the bytes in the + -- remainder; decode() never sees them. + break + end + if pos + total_length - 1 > len then + -- Frame not yet fully in buffer — wait for more chunks. + break + end + local prelude_crc = read_u32_be(buf, pos + 8) + if ngx_crc32(string_sub(buf, pos, pos + 7)) ~= prelude_crc then + -- Prelude CRC mismatch. Don't advance: keep this corrupt + -- frame and everything after in `remainder`, so we don't + -- silently consume valid frames sitting behind it. + break + end + pos = pos + total_length + end + if pos == 1 then + return "", buf + end + return string_sub(buf, 1, pos - 1), string_sub(buf, pos) +end + + +--- Decode a buffer of complete frames into events. +-- @param buf string Buffer containing zero or more complete frames. +-- @return table Array of {headers = {string -> string}, payload = string}. 
+function _M.decode(buf) + local events = {} + local len = #buf + local pos = 1 + while pos <= len do + if pos + 11 > len then + core.log.warn("aws-eventstream: truncated prelude at offset ", pos - 1, + " (buffer ", len, " bytes)") + return events + end + local total_length = read_u32_be(buf, pos) + local headers_length = read_u32_be(buf, pos + 4) + local prelude_crc = read_u32_be(buf, pos + 8) + + if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then + core.log.warn("aws-eventstream: invalid total_length ", + tostring(total_length), " at offset ", pos - 1) + return events + end + if headers_length > total_length - 16 then + core.log.warn("aws-eventstream: headers_length ", headers_length, + " exceeds frame body") + return events + end + if pos + total_length - 1 > len then + core.log.warn("aws-eventstream: incomplete frame at offset ", pos - 1) + return events + end + + local computed_prelude_crc = ngx_crc32(string_sub(buf, pos, pos + 7)) + if computed_prelude_crc ~= prelude_crc then + core.log.warn("aws-eventstream: prelude CRC mismatch at offset ", pos - 1, + " expected ", prelude_crc, " got ", computed_prelude_crc) + return events + end + + local headers_start = pos + 12 + local payload_start = headers_start + headers_length + local payload_end = pos + total_length - 5 -- inclusive + local message_crc = read_u32_be(buf, payload_end + 1) + + local computed_message_crc = ngx_crc32(string_sub(buf, pos, payload_end)) + if computed_message_crc ~= message_crc then + core.log.warn("aws-eventstream: message CRC mismatch at offset ", pos - 1, + " expected ", message_crc, " got ", computed_message_crc) + return events + end + + local headers, herr + if headers_length > 0 then + headers, herr = parse_headers(buf, headers_start, payload_start - 1) + if not headers then + core.log.warn("aws-eventstream: failed to parse headers: ", herr) + return events + end + else + headers = {} + end + + events[#events + 1] = { + headers = headers, + payload = 
string_sub(buf, payload_start, payload_end), + } + pos = pos + total_length + end + return events +end + + +return _M diff --git a/apisix/plugins/ai-transport/sse.lua b/apisix/plugins/ai-transport/sse.lua index 249fcf9f8b5d..ad8ab271224d 100644 --- a/apisix/plugins/ai-transport/sse.lua +++ b/apisix/plugins/ai-transport/sse.lua @@ -24,7 +24,13 @@ local tonumber = tonumber local tostring = tostring local ipairs = ipairs -local _M = {} +local _M = { + -- Cap on bytes split_buf may leave in `remainder`. Used by the streaming + -- loop in ai-providers.base to bound the buffer when frames don't + -- complete. SSE frames are small (text events delimited by blank lines), + -- so 1 MiB is plenty. + max_remainder = 1024 * 1024, +} --- Decode an SSE text chunk into a list of event tables. diff --git a/docs/en/latest/plugins/ai-proxy-multi.md b/docs/en/latest/plugins/ai-proxy-multi.md index ca0a9b23953e..771801ba0f93 100644 --- a/docs/en/latest/plugins/ai-proxy-multi.md +++ b/docs/en/latest/plugins/ai-proxy-multi.md @@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc When an instance's `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array. -| Name | Type | Required | Description | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | True | An array of message objects. | -| `messages.role` | String | True | Role of the message (`user`, `assistant`). | -| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | -| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). 
| -| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. | +| Name | Type | Required | Description | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | True | An array of message objects. | +| `messages.role` | String | True | Role of the message (`user`, `assistant`). | +| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | +| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | +| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. | +| `stream` | Boolean | False | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. | ## Attributes @@ -2028,10 +2029,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted. -Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin. - ::: +#### Streaming with Bedrock `ConverseStream` + +To enable streaming, send the same Converse request body with `"stream": true`. 
The Plugin routes the request to Bedrock's `/model/{model-id}/converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. The response `Content-Type` is `application/vnd.amazon.eventstream`; clients must parse the binary framing themselves (most AWS SDKs do this automatically). + ### Proxy to Embedding Models The following example demonstrates how you can configure the `ai-proxy-multi` Plugin to proxy requests and load balance between embedding models. diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md index 5bf7ebdb14b1..40e138b18d7f 100644 --- a/docs/en/latest/plugins/ai-proxy.md +++ b/docs/en/latest/plugins/ai-proxy.md @@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc When `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array. -| Name | Type | Required | Description | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | True | An array of message objects. | -| `messages.role` | String | True | Role of the message (`user`, `assistant`). | -| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | -| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | -| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. 
| +| Name | Type | Required | Description | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | True | An array of message objects. | +| `messages.role` | String | True | Role of the message (`user`, `assistant`). | +| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | +| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | +| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. | +| `stream` | Boolean | False | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. | ## Attributes @@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted. -Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin. - ::: +#### Streaming with Bedrock `ConverseStream` + +To enable streaming, send the same Converse request body with `"stream": true`. The Plugin routes the request to Bedrock's `/model/{model-id}/converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. 
The response `Content-Type` is `application/vnd.amazon.eventstream`; clients must parse the binary framing themselves (most AWS SDKs do this automatically). + +```shell +curl "http://127.0.0.1:9080/bedrock/converse" -X POST \ + -H "Content-Type: application/json" \ + --data '{ + "stream": true, + "messages": [ + {"role": "user", "content": [{"text": "What is 1+1?"}]} + ] + }' --output - +``` + ### Proxy to OpenAI Embedding Models The following example demonstrates how you can configure the `ai-proxy` Plugin to proxy requests to embedding models. This example will use the OpenAI embedding model endpoint. diff --git a/docs/zh/latest/plugins/ai-proxy-multi.md b/docs/zh/latest/plugins/ai-proxy-multi.md index 358b7da0c584..8148def3c2a4 100644 --- a/docs/zh/latest/plugins/ai-proxy-multi.md +++ b/docs/zh/latest/plugins/ai-proxy-multi.md @@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem'; 当某个实例的 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。 -| 名称 | 类型 | 必选项 | 描述 | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | 是 | 消息对象数组。 | -| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | -| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | -| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | -| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| 名称 | 类型 | 必选项 | 描述 | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | 是 | 消息对象数组。 | +| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | +| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 
1+1?"}]`)。 | +| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | +| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| `stream` | Boolean | 否 | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 | ## 属性 @@ -1841,10 +1842,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。 -插件目前尚不支持流式响应(Bedrock `ConverseStream`)。 - ::: +#### 使用 Bedrock `ConverseStream` 进行流式响应 + +要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model/{model-id}/converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。 + ### 代理到嵌入模型 以下示例演示了如何配置 `ai-proxy-multi` 插件以代理请求并在嵌入模型之间进行负载均衡。 diff --git a/docs/zh/latest/plugins/ai-proxy.md b/docs/zh/latest/plugins/ai-proxy.md index 0babeb969136..425961251139 100644 --- a/docs/zh/latest/plugins/ai-proxy.md +++ b/docs/zh/latest/plugins/ai-proxy.md @@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem'; 当 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。 -| 名称 | 类型 | 必选项 | 描述 | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | 是 | 消息对象数组。 | -| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | -| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | 
-| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | -| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| 名称 | 类型 | 必选项 | 描述 | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | 是 | 消息对象数组。 | +| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | +| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | +| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | +| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| `stream` | Boolean | 否 | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 | ## 属性 @@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。 -插件目前尚不支持流式响应(Bedrock `ConverseStream`)。 - ::: +#### 使用 Bedrock `ConverseStream` 进行流式响应 + +要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model/<model-id>/converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。 + +```shell
+curl "http://127.0.0.1:9080/bedrock/converse" -X POST \ + -H "Content-Type: application/json" \ + --data '{ + "stream": true, + "messages": [ + {"role": "user", "content": [{"text": "What is 1+1?"}]} + ] + }' --output - +``` + ### 代理到 OpenAI 嵌入模型 以下示例演示了如何配置 `ai-proxy` 插件以将请求代理到嵌入模型。此示例将使用 OpenAI 嵌入模型端点。 diff --git 
a/t/fixtures/bedrock/bedrock-converse-streaming.bin b/t/fixtures/bedrock/bedrock-converse-streaming.bin new file mode 100644 index 000000000000..db4e9fb5e5e8 Binary files /dev/null and b/t/fixtures/bedrock/bedrock-converse-streaming.bin differ diff --git a/t/lib/server.lua b/t/lib/server.lua index 97908e214e72..bf7c530e1516 100644 --- a/t/lib/server.lua +++ b/t/lib/server.lua @@ -1036,6 +1036,114 @@ function _M.bedrock_converse() end +-- Mock for Bedrock /converse-stream. Reuses the same SigV4 + body shape +-- validation as bedrock_converse(), then serves the recorded EventStream +-- binary fixture so streaming tests can assert end-to-end framing, +-- response-text aggregation, and token-usage extraction. +function _M.bedrock_converse_stream() + local json = require("cjson.safe") + + ngx.log(ngx.WARN, "[test] received uri: ", ngx.var.request_uri) + + if ngx.req.get_method() ~= "POST" then + ngx.status = 400 + ngx.say("Unsupported request method: ", ngx.req.get_method()) + return + end + + local headers = ngx.req.get_headers() + local auth_header = headers["authorization"] + local amz_date = headers["x-amz-date"] + if not auth_header or not amz_date then + ngx.status = 403 + ngx.say(json.encode({message = "Missing Authentication Token"})) + return + end + + if not auth_header:match("^AWS4%-HMAC%-SHA256 ") then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization header missing AWS4-HMAC-SHA256 algorithm prefix" + })) + return + end + + if not auth_header:match( + "Credential=AKIAIOSFODNN7EXAMPLE/%d%d%d%d%d%d%d%d/us%-east%-1/bedrock/aws4_request" + ) then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization Credential scope does not match expected " + .. "AKIAIOSFODNN7EXAMPLE//us-east-1/bedrock/aws4_request" + })) + return + end + + local hex64 = string.rep("%x", 64) + if not auth_header:match("Signature=" .. 
hex64) then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization Signature is missing or not 64 hex chars" + })) + return + end + + if not amz_date:match("^%d%d%d%d%d%d%d%dT%d%d%d%d%d%dZ$") then + ngx.status = 403 + ngx.say(json.encode({ + message = "X-Amz-Date header does not match YYYYMMDDTHHMMSSZ format" + })) + return + end + + ngx.req.read_body() + local body, err = json.decode(ngx.req.get_body_data() or "") + if not body then + ngx.status = 400 + ngx.say(json.encode({message = "Invalid JSON: " .. (err or "")})) + return + end + + -- Bedrock decides streaming purely by URL; the gateway must strip its + -- internal `stream` flag from the body before forwarding. + if body.stream ~= nil then + ngx.status = 400 + ngx.say(json.encode({ + message = "stream field should not be in request body for /converse-stream" + })) + return + end + + if body.model then + ngx.status = 400 + ngx.say(json.encode({ + message = "model field should not be in request body" + })) + return + end + + if not body.messages or #body.messages < 1 then + ngx.status = 400 + ngx.say(json.encode({message = "messages is required"})) + return + end + + local fixture_loader = require("lib.fixture_loader") + local content, ferr = fixture_loader.load("bedrock/bedrock-converse-streaming.bin") + if not content then + ngx.status = 500 + ngx.say(ferr) + return + end + + ngx.header["Content-Type"] = "application/vnd.amazon.eventstream" + ngx.header["Cache-Control"] = "no-cache" + ngx.header["Transfer-Encoding"] = "chunked" + ngx.print(content) + ngx.flush(true) +end + + -- Error endpoints for ai-request-rewrite tests. function _M.bad_request() ngx.status = 400 @@ -1078,9 +1186,15 @@ end function _M.go() local uri = ngx.var.uri - -- Bedrock Converse API: /model/<model-id>/converse where <model-id> can contain - -- ':' and percent-encoded sequences (URL-encoded ARNs), which the path-to- - -- function-name conversion below can't represent. Dispatch directly. 
+ -- Bedrock Converse API: /model/<model-id>/converse(-stream) where <model-id> + can contain ':' and percent-encoded sequences, + which the path-to-function-name conversion below can't represent. + Dispatch directly. Match streaming first since /converse is a suffix + of /converse-stream. + if uri:match("^/model/.+/converse%-stream$") then + inject_headers() + return _M.bedrock_converse_stream() + end if uri:match("^/model/.+/converse$") then inject_headers() return _M.bedrock_converse() diff --git a/t/plugin/ai-proxy-bedrock-single.t b/t/plugin/ai-proxy-bedrock-single.t index 349036908604..fc7e7d3b5a97 100644 --- a/t/plugin/ai-proxy-bedrock-single.t +++ b/t/plugin/ai-proxy-bedrock-single.t @@ -427,3 +427,73 @@ POST /single-ai/body-model-only/converse --- error_code: 400 --- response_body eval qr/could not resolve upstream path/ + + + +=== TEST 16: route for streaming (single ai-proxy, no path on endpoint) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/9', + ngx.HTTP_PUT, + [[{ + "uri": "/single-ai/stream/converse", + "plugins": { + "ai-proxy": { + "provider": "bedrock", + "auth": { + "aws": { + "access_key_id": "AKIAIOSFODNN7EXAMPLE", + "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + } + }, + "provider_conf": { + "region": "us-east-1" + }, + "options": { + "model": "anthropic.claude-3-5-sonnet-20241022-v2:0" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 17: stream request hits /converse-stream and forwards EventStream bytes +--- request +POST /single-ai/stream/converse +{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]} +--- error_code: 200 +--- response_body eval +qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s +--- error_log 
eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream} +--- response_headers +Content-Type: application/vnd.amazon.eventstream + + + +=== TEST 18: non-stream request on the same route still hits /converse +--- request +POST /single-ai/stream/converse +{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]} +--- error_code: 200 +--- response_body eval +qr/"text"\s*:\s*"Hello!"/ +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)} diff --git a/t/plugin/ai-proxy-bedrock.t b/t/plugin/ai-proxy-bedrock.t index 06be07905463..00afa3e3a975 100644 --- a/t/plugin/ai-proxy-bedrock.t +++ b/t/plugin/ai-proxy-bedrock.t @@ -516,3 +516,121 @@ POST /ai/body-model-only/converse --- error_code: 400 --- response_body eval qr/could not resolve upstream path/ + + + +=== TEST 17: route for streaming (no path on endpoint, model from options) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/9', + ngx.HTTP_PUT, + [[{ + "uri": "/ai/stream/converse", + "plugins": { + "ai-proxy-multi": { + "instances": [ + { + "name": "bedrock-stream", + "provider": "bedrock", + "weight": 1, + "auth": { + "aws": { + "access_key_id": "AKIAIOSFODNN7EXAMPLE", + "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + } + }, + "provider_conf": { + "region": "us-east-1" + }, + "options": { + "model": "anthropic.claude-3-5-sonnet-20241022-v2:0" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + } + } + ], + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 18: stream request hits /converse-stream and forwards EventStream bytes +--- request +POST /ai/stream/converse +{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]} +--- error_code: 200 +--- response_body eval 
+qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream} +--- response_headers +Content-Type: application/vnd.amazon.eventstream + + + +=== TEST 19: stream request aggregates response text and token usage +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local httpc = http.new() + local res, err = httpc:request_uri("http://127.0.0.1:" .. ngx.var.server_port + .. "/ai/stream/converse", { + method = "POST", + headers = {["Content-Type"] = "application/json"}, + body = [[{"stream":true,"messages":[{"role":"user","content":[{"text":"hi"}]}]}]], + }) + if not res then + ngx.status = 500 + ngx.say(err) + return + end + ngx.status = res.status + -- Body is binary EventStream; expose payload-bearing keywords so the + -- test can assert frame ordering and token-bearing metadata payload. + local body = res.body + local found = {} + for _, name in ipairs({"messageStart", "contentBlockDelta", + "Hello", "messageStop", "metadata", + "inputTokens", "outputTokens"}) do + if body:find(name, 1, true) then + found[#found + 1] = name + end + end + ngx.say(table.concat(found, ",")) + } + } +--- request +GET /t +--- error_code: 200 +--- response_body +messageStart,contentBlockDelta,Hello,messageStop,metadata,inputTokens,outputTokens +--- error_log eval +qr/got token usage from ai service/ + + + +=== TEST 20: non-stream request still hits /converse (control) +--- request +POST /ai/stream/converse +{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]} +--- error_code: 200 +--- response_body eval +qr/"text"\s*:\s*"Hello!"/ +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)}