From da9972d0e0d2b90c9b38d6e85245b1b72e184d0f Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Mon, 27 Apr 2026 22:59:48 +0545 Subject: [PATCH 01/14] feat(ai-proxy): support Bedrock ConverseStream streaming Phase 2 adds streaming via Bedrock's /converse-stream endpoint with AWS EventStream binary framing. - new ai-transport.eventstream codec parses/encodes EventStream frames (12-byte prelude + headers + payload + trailing CRC, validated via ngx.crc32_long); same API surface as ai-transport.sse so providers pick a framing module by name - ai-providers.base.parse_streaming_response selects sse vs aws-eventstream framing via provider.streaming_framing - bedrock provider declares streaming_framing = "aws-eventstream" and routes ctx.var.request_type == "ai_stream" to /converse-stream - bedrock-converse protocol decodes EventStream events: contentBlockDelta -> texts; metadata -> usage_and_done; exception :message-type -> done with warning - streaming tests (single + multi) cover path routing, SigV4 still validating, raw EventStream forwarded, token usage extracted from metadata, and a non-streaming control on the same route. Fixture is a real recorded /converse-stream response. 
- docs (en/zh) for ai-proxy and ai-proxy-multi updated --- .../plugins/ai-protocols/bedrock-converse.lua | 86 +++++- apisix/plugins/ai-providers/base.lua | 33 +- apisix/plugins/ai-providers/bedrock.lua | 10 +- apisix/plugins/ai-transport/eventstream.lua | 288 ++++++++++++++++++ docs/en/latest/plugins/ai-proxy-multi.md | 21 +- docs/en/latest/plugins/ai-proxy.md | 32 +- docs/zh/latest/plugins/ai-proxy-multi.md | 21 +- docs/zh/latest/plugins/ai-proxy.md | 32 +- .../bedrock/bedrock-converse-streaming.bin | Bin 0 -> 1086 bytes t/lib/server.lua | 120 +++++++- t/plugin/ai-proxy-bedrock-single.t | 70 +++++ t/plugin/ai-proxy-bedrock.t | 116 +++++++ 12 files changed, 774 insertions(+), 55 deletions(-) create mode 100644 apisix/plugins/ai-transport/eventstream.lua create mode 100644 t/fixtures/bedrock/bedrock-converse-streaming.bin diff --git a/apisix/plugins/ai-protocols/bedrock-converse.lua b/apisix/plugins/ai-protocols/bedrock-converse.lua index 0e0d2061a4cb..99e727d7e75b 100644 --- a/apisix/plugins/ai-protocols/bedrock-converse.lua +++ b/apisix/plugins/ai-protocols/bedrock-converse.lua @@ -17,7 +17,9 @@ --- Bedrock Converse protocol adapter (client-side). -- Handles detection and response parsing for the Amazon Bedrock --- Converse API format. Non-streaming only in this phase. +-- Converse API format. Streaming uses the /converse-stream endpoint with +-- AWS EventStream binary framing; this module's parse_sse_event consumes +-- the {headers, payload} event shape produced by ai-transport.eventstream. local core = require("apisix.core") local string_sub = string.sub @@ -38,22 +40,92 @@ end --- Check whether the request is a streaming request. --- Streaming is not supported in this phase. +-- The Bedrock Converse API itself has no `stream` body field — we use +-- `body.stream = true` as the gateway-side opt-in to route the request to +-- /converse-stream (which returns AWS EventStream binary frames). 
function _M.is_streaming(body) - return false + return type(body) == "table" and body.stream == true end --- Prepare the outgoing request body for the target provider. --- TODO: support streaming. Bedrock uses a separate /converse-stream endpoint --- with the AWS EventStream binary protocol, which we don't yet implement. --- For now, strip the `stream` field so we only send non-streaming requests --- to /converse. +-- Strip our gateway-side `stream` flag; Bedrock rejects unknown body fields +-- and decides streaming purely by URL (/converse vs /converse-stream). function _M.prepare_outgoing_request(body) body.stream = nil end +--- Parse a streaming event from Bedrock's EventStream framing. +-- Event shape comes from ai-transport.eventstream: {headers, payload}. +-- Bedrock standard headers: :event-type, :content-type, :message-type. +-- :message-type is "event" for normal events and "exception" for typed +-- streaming errors (throttlingException, modelStreamErrorException, etc.). +function _M.parse_sse_event(event, ctx, state) + if type(event) ~= "table" or type(event.headers) ~= "table" then + return { type = "skip" } + end + + local message_type = event.headers[":message-type"] + if message_type == "exception" or message_type == "error" then + local err_type = event.headers[":exception-type"] + or event.headers[":error-code"] + or "unknown" + core.log.warn("Bedrock streaming exception: type=", err_type, + ", payload=", event.payload or "") + return { type = "done" } + end + + local event_type = event.headers[":event-type"] + if not event_type then + return { type = "skip" } + end + + if event_type == "contentBlockDelta" then + local data, err = core.json.decode(event.payload, { null_as_nil = true }) + if not data then + core.log.warn("failed to decode contentBlockDelta payload: ", err) + return { type = "skip" } + end + if type(data.delta) == "table" and type(data.delta.text) == "string" then + return { type = "delta", texts = { data.delta.text } } + end + -- 
toolUse partial-JSON deltas and other delta shapes don't contribute + -- to llm_response_text; let the raw bytes pass through unchanged. + return { type = "skip" } + end + + if event_type == "messageStop" then + return { type = "done" } + end + + if event_type == "metadata" then + local data, err = core.json.decode(event.payload, { null_as_nil = true }) + if not data or type(data.usage) ~= "table" then + if err then + core.log.warn("failed to decode metadata payload: ", err) + end + return { type = "skip" } + end + local raw = data.usage + return { + type = "usage_and_done", + usage = { + prompt_tokens = raw.inputTokens or 0, + completion_tokens = raw.outputTokens or 0, + total_tokens = raw.totalTokens + or (raw.inputTokens or 0) + (raw.outputTokens or 0), + }, + raw_usage = raw, + } + end + + -- messageStart / contentBlockStart / contentBlockStop carry no metrics + -- or visible text and aren't surfaced to the client beyond passthrough. + return { type = "skip" } +end + + --- Extract token usage from a non-streaming Bedrock response. -- Bedrock format: res_body.usage.inputTokens / outputTokens / totalTokens function _M.extract_usage(res_body) diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 253e048f649a..2b887c6de3cd 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -28,10 +28,20 @@ local mt = { -- Maximum SSE buffer size per request (1 MB). local MAX_SSE_BUF_SIZE = 1024 * 1024 local core = require("apisix.core") local plugin = require("apisix.plugin") local url = require("socket.url") local sse = require("apisix.plugins.ai-transport.sse") +local eventstream = require("apisix.plugins.ai-transport.eventstream") local transport_http = require("apisix.plugins.ai-transport.http") local transport_auth = require("apisix.plugins.ai-transport.auth") local log_sanitize = require("apisix.utils.log-sanitize") + +-- Streaming framings selectable via provider.streaming_framing. +-- Each module exposes split_buf(buf) -> (complete, remainder) and +-- decode(buf) -> array of events. The event shape is framing-specific; +-- the protocol's parse_sse_event must understand it. +-- NOTE: this table must be declared AFTER the require() block above -- +-- a Lua chunk executes top-to-bottom, so referencing the `sse` and +-- `eventstream` locals before their declarations would capture nil +-- globals and break framing lookup for every streaming request. +local FRAMINGS = { + sse = sse, + ["aws-eventstream"] = eventstream, +} @@ -360,13 +370,20 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf) end ---- Process streaming SSE response. --- Uses target protocol for SSE parsing and converter (if present) for --- transforming events to client format. +--- Process streaming response. +-- Uses target protocol for event parsing and converter (if present) for +-- transforming events to client format. The wire framing (SSE vs AWS +-- EventStream binary) is selected by provider.streaming_framing; the +-- protocol module's parse_sse_event must understand the resulting event +-- shape. -- @param target_proto table The protocol module for the provider's native protocol -- @param converter table|nil The converter module (if protocol conversion needed) -- @param conf table|nil Plugin configuration (used for stream duration and size limits) function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf) + local framing = FRAMINGS[self.streaming_framing or "sse"] + if not framing then + return 500, "unknown streaming framing: " .. 
tostring(self.streaming_framing) + end local body_reader = res.body_reader local contents = {} local sse_state = { is_first = true } @@ -406,14 +423,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co end if not chunk then if #sse_buf > 0 then - core.log.warn("dropping incomplete SSE frame at EOF, size: ", + core.log.warn("dropping incomplete stream frame at EOF, size: ", #sse_buf) end if converter and not output_sent then local msg = "streaming response completed without producing " .. "any output; the upstream likely returned a " - .. "different SSE format than the converter expects" + .. "different stream format than the converter expects" core.log.error(msg) return 502, msg end @@ -428,13 +445,13 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co end sse_buf = sse_buf .. chunk - local complete, remainder = sse.split_buf(sse_buf) + local complete, remainder = framing.split_buf(sse_buf) if #remainder > MAX_SSE_BUF_SIZE then - core.log.warn("SSE remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting") + core.log.warn("stream remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting") remainder = "" end sse_buf = remainder - local events = complete ~= "" and sse.decode(complete) or {} + local events = complete ~= "" and framing.decode(complete) or {} ctx.llm_response_contents_in_chunk = {} local converted_chunks = {} diff --git a/apisix/plugins/ai-providers/bedrock.lua b/apisix/plugins/ai-providers/bedrock.lua index 8fb2f0a1e620..e4b826ec6b43 100644 --- a/apisix/plugins/ai-providers/bedrock.lua +++ b/apisix/plugins/ai-providers/bedrock.lua @@ -21,6 +21,7 @@ local ngx_escape_uri = ngx.escape_uri local host_template = "bedrock-runtime.%s.amazonaws.com" local chat_path_template = "/model/%s/converse" +local stream_path_template = "/model/%s/converse-stream" local function get_host(region) return str_fmt(host_template, region) @@ -57,6 +58,9 @@ return 
require("apisix.plugins.ai-providers.base").new({ get_node = get_node, remove_model = true, aws_sigv4 = true, + -- Bedrock ConverseStream uses AWS EventStream binary framing on the + -- /converse-stream endpoint, not Server-Sent Events. + streaming_framing = "aws-eventstream", capabilities = { ["bedrock-converse"] = { host = function(conf) @@ -75,7 +79,11 @@ return require("apisix.plugins.ai-providers.base").new({ -- contain "/" (e.g. "...:application-inference-profile/abc") -- and ":". auth-aws.lua's normalize_and_encode_path is -- idempotent so this pre-encoding is preserved end-to-end. - return str_fmt(chat_path_template, ngx_escape_uri(model)) + local template = chat_path_template + if ctx.var.request_type == "ai_stream" then + template = stream_path_template + end + return str_fmt(template, ngx_escape_uri(model)) end, rewrite_request_body = rewrite_converse_request_body, }, diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua new file mode 100644 index 000000000000..3666394894e6 --- /dev/null +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -0,0 +1,288 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- AWS EventStream binary framing codec. 
+-- Used by Bedrock ConverseStream, Kinesis SubscribeToShard, S3 SelectObjectContent, +-- and Transcribe streaming. Each frame: +-- +-- prelude (12 bytes) +-- total_length BE uint32 -- entire frame including trailing CRC +-- headers_length BE uint32 -- size of the headers section +-- prelude_crc BE uint32 -- CRC32 of the first 8 bytes +-- headers (headers_length bytes) +-- repeated entries: name_len(u8) name(bytes) value_type(u8) value... +-- payload (total_length - 16 - headers_length bytes) +-- message_crc (4 bytes BE uint32) -- CRC32 of bytes [0, total_length-4) +-- +-- This module provides the same API surface as ai-transport.sse so the base +-- provider can pick a framing module by name. + +local core = require("apisix.core") +local ngx = ngx +local ngx_crc32 = ngx.crc32_long +local string_byte = string.byte +local string_sub = string.sub +local string_char = string.char +local table_concat = table.concat +local type = type + +-- Hard cap on a single frame size to avoid memory blowups on malformed input. +-- AWS documents ConverseStream frames as well under 1 MiB; pick 16 MiB to be +-- safe for other services that use this codec. +local MAX_FRAME_SIZE = 16 * 1024 * 1024 + +-- Header value type tags (AWS EventStream spec). Bedrock only sends type 7 +-- (string), but we decode 6/7 for robustness against other AWS services. 
+local TYPE_TRUE = 0 +local TYPE_FALSE = 1 +local TYPE_BYTE = 2 +local TYPE_SHORT = 3 +local TYPE_INTEGER = 4 +local TYPE_LONG = 5 +local TYPE_BYTE_ARRAY = 6 +local TYPE_STRING = 7 +local TYPE_TIMESTAMP = 8 +local TYPE_UUID = 9 + +local _M = {} + + +local function read_u32_be(s, pos) + local b1, b2, b3, b4 = string_byte(s, pos, pos + 3) + if not b4 then + return nil + end + return b1 * 16777216 + b2 * 65536 + b3 * 256 + b4 +end + + +local function read_u16_be(s, pos) + local b1, b2 = string_byte(s, pos, pos + 1) + if not b2 then + return nil + end + return b1 * 256 + b2 +end + + +local function write_u32_be(n) + return string_char( + math.floor(n / 16777216) % 256, + math.floor(n / 65536) % 256, + math.floor(n / 256) % 256, + n % 256 + ) +end + + +local function write_u16_be(n) + return string_char(math.floor(n / 256) % 256, n % 256) +end + + +-- Decode a single frame's headers section. +-- @param s string Full frame buffer +-- @param start int 1-based start of headers +-- @param stop int 1-based end of headers (inclusive) +-- @return table|nil Map of header name -> string value +-- @return string|nil error +local function parse_headers(s, start, stop) + local headers = {} + local pos = start + while pos <= stop do + local name_len = string_byte(s, pos) + if not name_len then + return nil, "truncated header entry" + end + pos = pos + 1 + if pos + name_len - 1 > stop then + return nil, "header name extends past headers section" + end + local name = string_sub(s, pos, pos + name_len - 1) + pos = pos + name_len + + local value_type = string_byte(s, pos) + if not value_type then + return nil, "missing header value type" + end + pos = pos + 1 + + if value_type == TYPE_STRING or value_type == TYPE_BYTE_ARRAY then + local val_len = read_u16_be(s, pos) + if not val_len then + return nil, "truncated header value length" + end + pos = pos + 2 + if pos + val_len - 1 > stop then + return nil, "header value extends past headers section" + end + headers[name] = string_sub(s, 
pos, pos + val_len - 1) + pos = pos + val_len + elseif value_type == TYPE_TRUE then + headers[name] = true + elseif value_type == TYPE_FALSE then + headers[name] = false + elseif value_type == TYPE_BYTE then + headers[name] = string_byte(s, pos) + pos = pos + 1 + elseif value_type == TYPE_SHORT then + headers[name] = read_u16_be(s, pos) + pos = pos + 2 + elseif value_type == TYPE_INTEGER then + headers[name] = read_u32_be(s, pos) + pos = pos + 4 + elseif value_type == TYPE_LONG then + -- 64-bit ints don't fit in a Lua double; keep raw bytes. + headers[name] = string_sub(s, pos, pos + 7) + pos = pos + 8 + elseif value_type == TYPE_TIMESTAMP then + headers[name] = string_sub(s, pos, pos + 7) + pos = pos + 8 + elseif value_type == TYPE_UUID then + headers[name] = string_sub(s, pos, pos + 15) + pos = pos + 16 + else + return nil, "unknown header value type: " .. value_type + end + end + return headers +end + + +--- Split a buffer at the last complete frame boundary. +-- @param buf string +-- @return string complete Concatenated complete frames (or "" if none). +-- @return string remainder Bytes after the last complete frame. +function _M.split_buf(buf) + local len = #buf + local pos = 1 + while pos + 11 <= len do + local total_length = read_u32_be(buf, pos) + if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then + -- Corrupt prelude. Return everything consumed so far as complete; + -- decode() will surface the error when it sees the bad prelude. + break + end + if pos + total_length - 1 > len then + break + end + pos = pos + total_length + end + if pos == 1 then + return "", buf + end + return string_sub(buf, 1, pos - 1), string_sub(buf, pos) +end + + +--- Decode a buffer of complete frames into events. +-- @param buf string Buffer containing zero or more complete frames. +-- @return table Array of {headers = {string -> string}, payload = string}. 
+function _M.decode(buf) + local events = {} + local len = #buf + local pos = 1 + while pos <= len do + if pos + 11 > len then + core.log.warn("eventstream: truncated prelude at offset ", pos - 1, + " (buffer ", len, " bytes)") + return events + end + local total_length = read_u32_be(buf, pos) + local headers_length = read_u32_be(buf, pos + 4) + local prelude_crc = read_u32_be(buf, pos + 8) + + if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then + core.log.warn("eventstream: invalid total_length ", + tostring(total_length), " at offset ", pos - 1) + return events + end + if headers_length > total_length - 16 then + core.log.warn("eventstream: headers_length ", headers_length, + " exceeds frame body") + return events + end + if pos + total_length - 1 > len then + core.log.warn("eventstream: incomplete frame at offset ", pos - 1) + return events + end + + local computed_prelude_crc = ngx_crc32(string_sub(buf, pos, pos + 7)) + if computed_prelude_crc ~= prelude_crc then + core.log.warn("eventstream: prelude CRC mismatch at offset ", pos - 1, + " expected ", prelude_crc, " got ", computed_prelude_crc) + return events + end + + local headers_start = pos + 12 + local payload_start = headers_start + headers_length + local payload_end = pos + total_length - 5 -- inclusive + local message_crc = read_u32_be(buf, payload_end + 1) + + local computed_message_crc = ngx_crc32(string_sub(buf, pos, payload_end)) + if computed_message_crc ~= message_crc then + core.log.warn("eventstream: message CRC mismatch at offset ", pos - 1, + " expected ", message_crc, " got ", computed_message_crc) + return events + end + + local headers, herr + if headers_length > 0 then + headers, herr = parse_headers(buf, headers_start, payload_start - 1) + if not headers then + core.log.warn("eventstream: failed to parse headers: ", herr) + return events + end + else + headers = {} + end + + events[#events + 1] = { + headers = headers, + payload = string_sub(buf, payload_start, 
payload_end), + } + pos = pos + total_length + end + return events +end + + +--- Encode an event into a single frame. Used for tests and fixture authoring. +-- @param headers table Map of header name -> string value (only string +-- values supported; this is what Bedrock emits). +-- @param payload string +-- @return string Encoded frame bytes. +function _M.encode(headers, payload) + local header_parts = {} + for name, value in pairs(headers) do + if type(value) ~= "string" then + return nil, "encode supports string-typed headers only" + end + header_parts[#header_parts + 1] = string_char(#name) .. name + .. string_char(TYPE_STRING) .. write_u16_be(#value) .. value + end + local headers_bytes = table_concat(header_parts) + local total_length = 12 + #headers_bytes + #payload + 4 + local prelude = write_u32_be(total_length) .. write_u32_be(#headers_bytes) + local prelude_crc = write_u32_be(ngx_crc32(prelude)) + local body_so_far = prelude .. prelude_crc .. headers_bytes .. payload + local message_crc = write_u32_be(ngx_crc32(body_so_far)) + return body_so_far .. message_crc +end + + +return _M diff --git a/docs/en/latest/plugins/ai-proxy-multi.md b/docs/en/latest/plugins/ai-proxy-multi.md index ca0a9b23953e..771801ba0f93 100644 --- a/docs/en/latest/plugins/ai-proxy-multi.md +++ b/docs/en/latest/plugins/ai-proxy-multi.md @@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc When an instance's `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array. -| Name | Type | Required | Description | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | True | An array of message objects. 
| -| `messages.role` | String | True | Role of the message (`user`, `assistant`). | -| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | -| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | -| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. | +| Name | Type | Required | Description | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | True | An array of message objects. | +| `messages.role` | String | True | Role of the message (`user`, `assistant`). | +| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | +| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | +| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. | +| `stream` | Boolean | False | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. | ## Attributes @@ -2028,10 +2029,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. 
Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted. -Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin. - ::: +#### Streaming with Bedrock `ConverseStream` + +To enable streaming, send the same Converse request body with `"stream": true`. The Plugin routes the request to Bedrock's `/model//converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. The response `Content-Type` is `application/vnd.amazon.eventstream`; clients must parse the binary framing themselves (most AWS SDKs do this automatically). + ### Proxy to Embedding Models The following example demonstrates how you can configure the `ai-proxy-multi` Plugin to proxy requests and load balance between embedding models. diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md index 5bf7ebdb14b1..40e138b18d7f 100644 --- a/docs/en/latest/plugins/ai-proxy.md +++ b/docs/en/latest/plugins/ai-proxy.md @@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc When `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array. -| Name | Type | Required | Description | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | True | An array of message objects. | -| `messages.role` | String | True | Role of the message (`user`, `assistant`). | -| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | -| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). 
| -| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. | +| Name | Type | Required | Description | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | True | An array of message objects. | +| `messages.role` | String | True | Role of the message (`user`, `assistant`). | +| `messages.content` | Array | True | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). | +| `system` | Array | False | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). | +| `inferenceConfig` | Object | False | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. | +| `stream` | Boolean | False | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. | ## Attributes @@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted. -Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin. - ::: +#### Streaming with Bedrock `ConverseStream` + +To enable streaming, send the same Converse request body with `"stream": true`. 
The Plugin routes the request to Bedrock's `/model//converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. The response `Content-Type` is `application/vnd.amazon.eventstream`; clients must parse the binary framing themselves (most AWS SDKs do this automatically). + +```shell +curl "http://127.0.0.1:9080/bedrock/converse" -X POST \ + -H "Content-Type: application/json" \ + --data '{ + "stream": true, + "messages": [ + {"role": "user", "content": [{"text": "What is 1+1?"}]} + ] + }' --output - +``` + ### Proxy to OpenAI Embedding Models The following example demonstrates how you can configure the `ai-proxy` Plugin to proxy requests to embedding models. This example will use the OpenAI embedding model endpoint. diff --git a/docs/zh/latest/plugins/ai-proxy-multi.md b/docs/zh/latest/plugins/ai-proxy-multi.md index 358b7da0c584..8148def3c2a4 100644 --- a/docs/zh/latest/plugins/ai-proxy-multi.md +++ b/docs/zh/latest/plugins/ai-proxy-multi.md @@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem'; 当某个实例的 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。 -| 名称 | 类型 | 必选项 | 描述 | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | 是 | 消息对象数组。 | -| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | -| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | -| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | -| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| 名称 | 类型 | 必选项 | 描述 | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | 是 | 消息对象数组。 | +| 
`messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | +| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | +| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | +| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| `stream` | Boolean | 否 | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 | ## 属性 @@ -1841,10 +1842,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。 -插件目前尚不支持流式响应(Bedrock `ConverseStream`)。 - ::: +#### 使用 Bedrock `ConverseStream` 进行流式响应 + +要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model//converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。 + ### 代理到嵌入模型 以下示例演示了如何配置 `ai-proxy-multi` 插件以代理请求并在嵌入模型之间进行负载均衡。 diff --git a/docs/zh/latest/plugins/ai-proxy.md b/docs/zh/latest/plugins/ai-proxy.md index 0babeb969136..425961251139 100644 --- a/docs/zh/latest/plugins/ai-proxy.md +++ b/docs/zh/latest/plugins/ai-proxy.md @@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem'; 当 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。 -| 名称 | 类型 | 必选项 | 描述 | -| ------------------ | ------ | -------- | ---------------------------------------------------------------------------------------------------- | -| `messages` | Array | 是 | 消息对象数组。 | -| 
`messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | -| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | -| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | -| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| 名称 | 类型 | 必选项 | 描述 | +| ------------------ | ------- | -------- | ---------------------------------------------------------------------------------------------------- | +| `messages` | Array | 是 | 消息对象数组。 | +| `messages.role` | String | 是 | 消息的角色(`user`、`assistant`)。 | +| `messages.content` | Array | 是 | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 | +| `system` | Array | 否 | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 | +| `inferenceConfig` | Object | 否 | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 | +| `stream` | Boolean | 否 | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 | ## 属性 @@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。 -插件目前尚不支持流式响应(Bedrock `ConverseStream`)。 - ::: +#### 使用 Bedrock `ConverseStream` 进行流式响应 + +要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model//converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。 + +```shell +curl "http://127.0.0.1:9080/bedrock/converse" -X POST \ + -H "Content-Type: application/json" \ + --data '{ + "stream": true, + "messages": [ + {"role": "user", "content": [{"text": "What is 1+1?"}]} + 
] + }' --output - +``` + ### 代理到 OpenAI 嵌入模型 以下示例演示了如何配置 `ai-proxy` 插件以将请求代理到嵌入模型。此示例将使用 OpenAI 嵌入模型端点。 diff --git a/t/fixtures/bedrock/bedrock-converse-streaming.bin b/t/fixtures/bedrock/bedrock-converse-streaming.bin new file mode 100644 index 0000000000000000000000000000000000000000..db4e9fb5e5e88dbfbdcf1bedf538e0fa1d4c90b7 GIT binary patch literal 1086 zcmc(e%Ss$U6oyMA7#IaLZ&2=}y-gShjV|JKyvDhB3jwKgAGMvDsjz@|a8)~4i4aZKz8{IGNr1rSaZ7x1KE^KUWdD|7q zEgonI$*eJ+L7|Yx{`zo45Jyxi``=&nHiL9EjvkfTsf>d}@y|4ArD^3to!f&JcN0x+ z?{S1W5?LOk)I7HSfUhd;?*u$PJokEt^)%tlUVO8knYaIOR)b;fQ$5;!J37YW?AM!K zLO!Ef-XG~cXvXeLn^QvV|7VvUCgGQvDZufG$*KJG%3Y5>+LmTEg9li$X^s@ znmJxp0fj4`5Ks~bJjWzql!E@76V1~go69|##=kSr;$iZa6J1aFA(nLo6e$nAwzv*4 fHIt*eLr{aDthHQfG|}m(QONtYH*tJBc5?I+uOoC( literal 0 HcmV?d00001 diff --git a/t/lib/server.lua b/t/lib/server.lua index 97908e214e72..bf7c530e1516 100644 --- a/t/lib/server.lua +++ b/t/lib/server.lua @@ -1036,6 +1036,114 @@ function _M.bedrock_converse() end +-- Mock for Bedrock /converse-stream. Reuses the same SigV4 + body shape +-- validation as bedrock_converse(), then serves the recorded EventStream +-- binary fixture so streaming tests can assert end-to-end framing, +-- response-text aggregation, and token-usage extraction. 
+function _M.bedrock_converse_stream() + local json = require("cjson.safe") + + ngx.log(ngx.WARN, "[test] received uri: ", ngx.var.request_uri) + + if ngx.req.get_method() ~= "POST" then + ngx.status = 400 + ngx.say("Unsupported request method: ", ngx.req.get_method()) + return + end + + local headers = ngx.req.get_headers() + local auth_header = headers["authorization"] + local amz_date = headers["x-amz-date"] + if not auth_header or not amz_date then + ngx.status = 403 + ngx.say(json.encode({message = "Missing Authentication Token"})) + return + end + + if not auth_header:match("^AWS4%-HMAC%-SHA256 ") then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization header missing AWS4-HMAC-SHA256 algorithm prefix" + })) + return + end + + if not auth_header:match( + "Credential=AKIAIOSFODNN7EXAMPLE/%d%d%d%d%d%d%d%d/us%-east%-1/bedrock/aws4_request" + ) then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization Credential scope does not match expected " + .. "AKIAIOSFODNN7EXAMPLE//us-east-1/bedrock/aws4_request" + })) + return + end + + local hex64 = string.rep("%x", 64) + if not auth_header:match("Signature=" .. hex64) then + ngx.status = 403 + ngx.say(json.encode({ + message = "Authorization Signature is missing or not 64 hex chars" + })) + return + end + + if not amz_date:match("^%d%d%d%d%d%d%d%dT%d%d%d%d%d%dZ$") then + ngx.status = 403 + ngx.say(json.encode({ + message = "X-Amz-Date header does not match YYYYMMDDTHHMMSSZ format" + })) + return + end + + ngx.req.read_body() + local body, err = json.decode(ngx.req.get_body_data() or "") + if not body then + ngx.status = 400 + ngx.say(json.encode({message = "Invalid JSON: " .. (err or "")})) + return + end + + -- Bedrock decides streaming purely by URL; the gateway must strip its + -- internal `stream` flag from the body before forwarding. 
+ if body.stream ~= nil then + ngx.status = 400 + ngx.say(json.encode({ + message = "stream field should not be in request body for /converse-stream" + })) + return + end + + if body.model then + ngx.status = 400 + ngx.say(json.encode({ + message = "model field should not be in request body" + })) + return + end + + if not body.messages or #body.messages < 1 then + ngx.status = 400 + ngx.say(json.encode({message = "messages is required"})) + return + end + + local fixture_loader = require("lib.fixture_loader") + local content, ferr = fixture_loader.load("bedrock/bedrock-converse-streaming.bin") + if not content then + ngx.status = 500 + ngx.say(ferr) + return + end + + ngx.header["Content-Type"] = "application/vnd.amazon.eventstream" + ngx.header["Cache-Control"] = "no-cache" + ngx.header["Transfer-Encoding"] = "chunked" + ngx.print(content) + ngx.flush(true) +end + + -- Error endpoints for ai-request-rewrite tests. function _M.bad_request() ngx.status = 400 @@ -1078,9 +1186,15 @@ end function _M.go() local uri = ngx.var.uri - -- Bedrock Converse API: /model//converse where can contain - -- ':' and percent-encoded sequences (URL-encoded ARNs), which the path-to- - -- function-name conversion below can't represent. Dispatch directly. + -- Bedrock Converse API: /model//converse(-stream) where + -- can contain ':' and percent-encoded sequences (URL-encoded ARNs), + -- which the path-to-function-name conversion below can't represent. + -- Dispatch directly. Match streaming first since /converse is a suffix + -- of /converse-stream. 
+ if uri:match("^/model/.+/converse%-stream$") then + inject_headers() + return _M.bedrock_converse_stream() + end if uri:match("^/model/.+/converse$") then inject_headers() return _M.bedrock_converse() diff --git a/t/plugin/ai-proxy-bedrock-single.t b/t/plugin/ai-proxy-bedrock-single.t index 349036908604..fc7e7d3b5a97 100644 --- a/t/plugin/ai-proxy-bedrock-single.t +++ b/t/plugin/ai-proxy-bedrock-single.t @@ -427,3 +427,73 @@ POST /single-ai/body-model-only/converse --- error_code: 400 --- response_body eval qr/could not resolve upstream path/ + + + +=== TEST 16: route for streaming (single ai-proxy, no path on endpoint) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/9', + ngx.HTTP_PUT, + [[{ + "uri": "/single-ai/stream/converse", + "plugins": { + "ai-proxy": { + "provider": "bedrock", + "auth": { + "aws": { + "access_key_id": "AKIAIOSFODNN7EXAMPLE", + "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + } + }, + "provider_conf": { + "region": "us-east-1" + }, + "options": { + "model": "anthropic.claude-3-5-sonnet-20241022-v2:0" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 17: stream request hits /converse-stream and forwards EventStream bytes +--- request +POST /single-ai/stream/converse +{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]} +--- error_code: 200 +--- response_body eval +qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream} +--- response_headers +Content-Type: application/vnd.amazon.eventstream + + + +=== TEST 18: non-stream request on the same route still hits /converse +--- request +POST /single-ai/stream/converse 
+{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]} +--- error_code: 200 +--- response_body eval +qr/"text"\s*:\s*"Hello!"/ +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)} diff --git a/t/plugin/ai-proxy-bedrock.t b/t/plugin/ai-proxy-bedrock.t index 06be07905463..69ee9f95fdd7 100644 --- a/t/plugin/ai-proxy-bedrock.t +++ b/t/plugin/ai-proxy-bedrock.t @@ -516,3 +516,119 @@ POST /ai/body-model-only/converse --- error_code: 400 --- response_body eval qr/could not resolve upstream path/ + + + +=== TEST 17: route for streaming (no path on endpoint, model from options) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/9', + ngx.HTTP_PUT, + [[{ + "uri": "/ai/stream/converse", + "plugins": { + "ai-proxy-multi": { + "instances": [ + { + "name": "bedrock-stream", + "provider": "bedrock", + "weight": 1, + "auth": { + "aws": { + "access_key_id": "AKIAIOSFODNN7EXAMPLE", + "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + } + }, + "provider_conf": { + "region": "us-east-1" + }, + "options": { + "model": "anthropic.claude-3-5-sonnet-20241022-v2:0" + }, + "override": { + "endpoint": "http://127.0.0.1:1980" + } + } + ], + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 18: stream request hits /converse-stream and forwards EventStream bytes +--- request +POST /ai/stream/converse +{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]} +--- error_code: 200 +--- response_body eval +qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream} +--- response_headers +Content-Type: application/vnd.amazon.eventstream + + + +=== TEST 19: stream request 
aggregates response text and token usage +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local httpc = http.new() + local res, err = httpc:request_uri("http://127.0.0.1:" .. ngx.var.server_port + .. "/ai/stream/converse", { + method = "POST", + headers = {["Content-Type"] = "application/json"}, + body = [[{"stream":true,"messages":[{"role":"user","content":[{"text":"hi"}]}]}]], + }) + if not res then + ngx.status = 500 + ngx.say(err) + return + end + ngx.status = res.status + -- Body is binary EventStream; expose payload-bearing keywords so the + -- test can assert frame ordering and token-bearing metadata payload. + local body = res.body + local found = {} + for _, name in ipairs({"messageStart", "contentBlockDelta", + "Hello", "messageStop", "metadata", + "inputTokens", "outputTokens"}) do + if body:find(name, 1, true) then + found[#found + 1] = name + end + end + ngx.say(table.concat(found, ",")) + } + } +--- request +GET /t +--- error_code: 200 +--- response_body +messageStart,contentBlockDelta,Hello,messageStop,metadata,inputTokens,outputTokens + + + +=== TEST 20: non-stream request still hits /converse (control) +--- request +POST /ai/stream/converse +{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]} +--- error_code: 200 +--- response_body eval +qr/"text"\s*:\s*"Hello!"/ +--- error_log eval +qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)} From 16c099ae9ffa9f94b795a4fe67ddaf7d9e3fbb00 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:07:04 +0545 Subject: [PATCH 02/14] refactor(ai-proxy): remove unused encode helper from eventstream codec The eventstream module's encode() function and write_u32_be/write_u16_be helpers had no callers (no tests, no plugins, no transport modules referenced them). Drop them to keep only what's exercised. Decode path is unchanged. 
--- apisix/plugins/ai-transport/eventstream.lua | 42 --------------------- 1 file changed, 42 deletions(-) diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index 3666394894e6..0dfe7a0f32a9 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -36,9 +36,6 @@ local ngx = ngx local ngx_crc32 = ngx.crc32_long local string_byte = string.byte local string_sub = string.sub -local string_char = string.char -local table_concat = table.concat -local type = type -- Hard cap on a single frame size to avoid memory blowups on malformed input. -- AWS documents ConverseStream frames as well under 1 MiB; pick 16 MiB to be @@ -79,21 +76,6 @@ local function read_u16_be(s, pos) end -local function write_u32_be(n) - return string_char( - math.floor(n / 16777216) % 256, - math.floor(n / 65536) % 256, - math.floor(n / 256) % 256, - n % 256 - ) -end - - -local function write_u16_be(n) - return string_char(math.floor(n / 256) % 256, n % 256) -end - - -- Decode a single frame's headers section. -- @param s string Full frame buffer -- @param start int 1-based start of headers @@ -261,28 +243,4 @@ function _M.decode(buf) end ---- Encode an event into a single frame. Used for tests and fixture authoring. --- @param headers table Map of header name -> string value (only string --- values supported; this is what Bedrock emits). --- @param payload string --- @return string Encoded frame bytes. -function _M.encode(headers, payload) - local header_parts = {} - for name, value in pairs(headers) do - if type(value) ~= "string" then - return nil, "encode supports string-typed headers only" - end - header_parts[#header_parts + 1] = string_char(#name) .. name - .. string_char(TYPE_STRING) .. write_u16_be(#value) .. value - end - local headers_bytes = table_concat(header_parts) - local total_length = 12 + #headers_bytes + #payload + 4 - local prelude = write_u32_be(total_length) .. 
write_u32_be(#headers_bytes) - local prelude_crc = write_u32_be(ngx_crc32(prelude)) - local body_so_far = prelude .. prelude_crc .. headers_bytes .. payload - local message_crc = write_u32_be(ngx_crc32(body_so_far)) - return body_so_far .. message_crc -end - - return _M From aa11f79ccfef9ee626905e963ede1a67e1712f38 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:08:07 +0545 Subject: [PATCH 03/14] fix(ai-proxy): move FRAMINGS table below require() statements luacheck flagged sse/eventstream as undefined because the FRAMINGS map referenced them before the require() lines that define them. Move the table below the requires so the references resolve at parse time. --- apisix/plugins/ai-providers/base.lua | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 2b887c6de3cd..66c14b592af3 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -28,15 +28,6 @@ local mt = { -- Maximum SSE buffer size per request (1 MB). local MAX_SSE_BUF_SIZE = 1024 * 1024 --- Streaming framings selectable via provider.streaming_framing. --- Each module exposes split_buf(buf) -> (complete, remainder) and --- decode(buf) -> array of events. The event shape is framing-specific; --- the protocol's parse_sse_event must understand it. -local FRAMINGS = { - sse = sse, - ["aws-eventstream"] = eventstream, -} - local core = require("apisix.core") local plugin = require("apisix.plugin") local url = require("socket.url") @@ -59,6 +50,15 @@ local math = math local ipairs = ipairs local setmetatable = setmetatable +-- Streaming framings selectable via provider.streaming_framing. +-- Each module exposes split_buf(buf) -> (complete, remainder) and +-- decode(buf) -> array of events. The event shape is framing-specific; +-- the protocol's parse_sse_event must understand it. 
+local FRAMINGS = { + sse = sse, + ["aws-eventstream"] = eventstream, +} + function _M.new(opt) return setmetatable(opt, mt) From 84561c3fcd84c846b55fd3065b268d087ff25b7f Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:08:48 +0545 Subject: [PATCH 04/14] ci(eclint): exclude binary fixtures from whitespace rules The recorded /converse-stream EventStream fixture (t/fixtures/bedrock/bedrock-converse-streaming.bin) tripped eclint's trim_trailing_whitespace rule because the random binary content happened to include the byte sequence 0x20 0x0a. Add a [*.bin] block mirroring the existing [*.pb] precedent so binary fixtures are skipped by EditorConfig-aware tools. --- .editorconfig | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.editorconfig b/.editorconfig index 38c0dec25ef9..925edb3fbbb7 100644 --- a/.editorconfig +++ b/.editorconfig @@ -52,6 +52,12 @@ insert_final_newline = unset trim_trailing_whitespace = unset end_of_line = unset +[*.bin] +indent_style = unset +insert_final_newline = unset +trim_trailing_whitespace = unset +end_of_line = unset + [*.sse] trim_trailing_whitespace = unset insert_final_newline = unset From c8aef96a37735190a7f76e2a2c1155160a8d6c29 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:09:52 +0545 Subject: [PATCH 05/14] fix(ai-proxy): localize tostring in base.lua The streaming framing error path uses tostring() to format the framing name when it's missing or unknown. EE's lj-releng linter requires every Lua global referenced inside a module to be aliased to a local at the top of the file. Add the alias for parity with other locals already declared (table, pairs, type, math, ipairs, setmetatable). 
--- apisix/plugins/ai-providers/base.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 66c14b592af3..91f053db771c 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -49,6 +49,7 @@ local type = type local math = math local ipairs = ipairs local setmetatable = setmetatable +local tostring = tostring -- Streaming framings selectable via provider.streaming_framing. -- Each module exposes split_buf(buf) -> (complete, remainder) and From d2b5c862a3b804892a1f4cc90ea29b251b11914c Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:11:53 +0545 Subject: [PATCH 06/14] fix(ai-proxy): validate buffer bounds for fixed-width EventStream header types parse_headers() validated the bounds of TYPE_STRING and TYPE_BYTE_ARRAY values but not the fixed-width types (BYTE, SHORT, INTEGER, LONG, TIMESTAMP, UUID). On a truncated headers section, read_u16_be / read_u32_be silently return nil and that nil was being stored as the header value, masking the framing error. Add explicit pos + size - 1 <= stop checks before each fixed-width read so a malformed frame surfaces a clear error instead of a partially parsed event. 
--- apisix/plugins/ai-transport/eventstream.lua | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index 0dfe7a0f32a9..9e38680ec6fa 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -119,22 +119,40 @@ local function parse_headers(s, start, stop) elseif value_type == TYPE_FALSE then headers[name] = false elseif value_type == TYPE_BYTE then + if pos > stop then + return nil, "truncated header byte value" + end headers[name] = string_byte(s, pos) pos = pos + 1 elseif value_type == TYPE_SHORT then + if pos + 1 > stop then + return nil, "truncated header short value" + end headers[name] = read_u16_be(s, pos) pos = pos + 2 elseif value_type == TYPE_INTEGER then + if pos + 3 > stop then + return nil, "truncated header integer value" + end headers[name] = read_u32_be(s, pos) pos = pos + 4 elseif value_type == TYPE_LONG then + if pos + 7 > stop then + return nil, "truncated header long value" + end -- 64-bit ints don't fit in a Lua double; keep raw bytes. headers[name] = string_sub(s, pos, pos + 7) pos = pos + 8 elseif value_type == TYPE_TIMESTAMP then + if pos + 7 > stop then + return nil, "truncated header timestamp value" + end headers[name] = string_sub(s, pos, pos + 7) pos = pos + 8 elseif value_type == TYPE_UUID then + if pos + 15 > stop then + return nil, "truncated header uuid value" + end headers[name] = string_sub(s, pos, pos + 15) pos = pos + 16 else From 075a7b542e08b61ff7503e6881944d31f2190cb9 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:12:48 +0545 Subject: [PATCH 07/14] docs(ai-proxy): correct split_buf corrupt-prelude comment The comment claimed decode() would surface the error, but split_buf only invokes decode() on bytes it has already advanced past. 
When the corrupt prelude is at offset 0, split_buf returns ("", buf) and decode() is never called on those bytes. Update the comment to describe the actual behavior in both cases (corrupt frame at offset 0 vs. after some valid frames). --- apisix/plugins/ai-transport/eventstream.lua | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index 9e38680ec6fa..523e5294295e 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -173,8 +173,12 @@ function _M.split_buf(buf) while pos + 11 <= len do local total_length = read_u32_be(buf, pos) if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then - -- Corrupt prelude. Return everything consumed so far as complete; - -- decode() will surface the error when it sees the bad prelude. + -- Corrupt prelude. Stop scanning and leave the invalid bytes in + -- the remainder. If this happens after one or more valid frames, + -- those are returned as `complete` and decode() processes them + -- normally; the corrupt frame stays in `remainder` for the next + -- read. If it happens at offset 0, split_buf returns ("", buf) + -- and the corrupt bytes accumulate until MAX_REMAINDER trips. break end if pos + total_length - 1 > len then From 69f0fd3e38b48e8bb4c0ac401b79f87e0d48a48e Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:13:44 +0545 Subject: [PATCH 08/14] fix(ai-proxy): don't log raw Bedrock exception payloads The streaming exception path logged event.payload verbatim. Bedrock exception payloads are upstream-controlled JSON and may include partial model output, prompt fragments, or other request content. Log only the typed error and the payload size to avoid leaking that data into error logs. 
--- apisix/plugins/ai-protocols/bedrock-converse.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/ai-protocols/bedrock-converse.lua b/apisix/plugins/ai-protocols/bedrock-converse.lua index 99e727d7e75b..ded17d21f379 100644 --- a/apisix/plugins/ai-protocols/bedrock-converse.lua +++ b/apisix/plugins/ai-protocols/bedrock-converse.lua @@ -71,8 +71,11 @@ function _M.parse_sse_event(event, ctx, state) local err_type = event.headers[":exception-type"] or event.headers[":error-code"] or "unknown" + -- Don't log event.payload: it's upstream-controlled JSON that may + -- contain partial completions, prompt fragments, or other request + -- content. Log just the typed error and the payload size. core.log.warn("Bedrock streaming exception: type=", err_type, - ", payload=", event.payload or "") + ", payload_size=", #(event.payload or "")) return { type = "done" } end From 68b767cb12ddd95573490c2af926b3ad4517afa9 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:15:40 +0545 Subject: [PATCH 09/14] fix(ai-proxy): use framing-specific buffer cap in streaming loop The 1 MiB MAX_SSE_BUF_SIZE was applied to every framing. AWS EventStream allows frames up to 16 MiB; a valid >1 MiB frame split across reads would have its in-progress bytes silently discarded by the cap. Each framing module now exposes its own `max_remainder` (sse: 1 MiB, unchanged; eventstream: MAX_FRAME_SIZE = 16 MiB), and parse_streaming_response reads framing.max_remainder instead of the hardcoded constant. A 1 MiB fallback is kept for any framing that forgets to declare one. 
--- apisix/plugins/ai-providers/base.lua | 8 +++----- apisix/plugins/ai-transport/eventstream.lua | 8 +++++++- apisix/plugins/ai-transport/sse.lua | 8 +++++++- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index 91f053db771c..ba0708c96c1d 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -25,9 +25,6 @@ local mt = { __index = _M } --- Maximum SSE buffer size per request (1 MB). -local MAX_SSE_BUF_SIZE = 1024 * 1024 - local core = require("apisix.core") local plugin = require("apisix.plugin") local url = require("socket.url") @@ -447,8 +444,9 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co sse_buf = sse_buf .. chunk local complete, remainder = framing.split_buf(sse_buf) - if #remainder > MAX_SSE_BUF_SIZE then - core.log.warn("stream remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting") + local max_remainder = framing.max_remainder or 1024 * 1024 + if #remainder > max_remainder then + core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting") remainder = "" end sse_buf = remainder diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index 523e5294295e..c01330999e1c 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -55,7 +55,13 @@ local TYPE_STRING = 7 local TYPE_TIMESTAMP = 8 local TYPE_UUID = 9 -local _M = {} +local _M = { + -- Cap on bytes split_buf may leave in `remainder`. The streaming loop + -- in ai-providers.base uses this to bound the buffer when a frame has + -- not yet completed. A single in-progress frame can be up to + -- MAX_FRAME_SIZE bytes, so the remainder cap matches that. 
+ max_remainder = MAX_FRAME_SIZE, +} local function read_u32_be(s, pos) diff --git a/apisix/plugins/ai-transport/sse.lua b/apisix/plugins/ai-transport/sse.lua index 249fcf9f8b5d..ad8ab271224d 100644 --- a/apisix/plugins/ai-transport/sse.lua +++ b/apisix/plugins/ai-transport/sse.lua @@ -24,7 +24,13 @@ local tonumber = tonumber local tostring = tostring local ipairs = ipairs -local _M = {} +local _M = { + -- Cap on bytes split_buf may leave in `remainder`. Used by the streaming + -- loop in ai-providers.base to bound the buffer when frames don't + -- complete. SSE frames are small (text events delimited by blank lines), + -- so 1 MiB is plenty. + max_remainder = 1024 * 1024, +} --- Decode an SSE text chunk into a list of event tables. From 43909ff8ed4cd85eb4dfbfa63931f1241ebd2609 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:18:48 +0545 Subject: [PATCH 10/14] fix(ai-proxy): validate prelude CRC inside split_buf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit split_buf advanced pos based on total_length alone. If a chunk contained [valid_frame, corrupt_frame, valid_frame], split_buf would consume all three into `complete`, decode() would succeed on frame 1 then stop on frame 2's CRC mismatch, and frame 3 would be silently lost — its bytes had already been advanced past and weren't preserved in remainder. Validate the prelude CRC inside split_buf before advancing. On mismatch, leave the corrupt frame and everything after it in remainder so the caller (or max_remainder cap) can handle it. Message CRC is still checked by decode() — accepted trade-off vs. CRCing the full frame twice on every iteration. 
--- apisix/plugins/ai-transport/eventstream.lua | 29 ++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index c01330999e1c..0fc034c56a43 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -170,6 +170,19 @@ end --- Split a buffer at the last complete frame boundary. +-- A "complete" frame here is one whose prelude length fields are sane, +-- whose full byte range is present, AND whose prelude CRC validates. +-- The CRC check matters because split_buf advances pos based on +-- total_length alone — without it, a frame with a sane length but bad +-- prelude CRC would be consumed into `complete`, decode() would stop on +-- it, and any valid frames behind it in the same chunk would be lost +-- (already past pos, not preserved in remainder). Validating here keeps +-- corrupt frames in `remainder` so the caller can either resync or trip +-- max_remainder. Message CRC is intentionally not checked here (decode() +-- handles that); a frame with a good prelude but bad payload CRC is rare +-- and any frames behind it in the same chunk would still be advanced +-- past — accepted trade-off vs. the cost of computing the message CRC +-- twice on every frame. -- @param buf string -- @return string complete Concatenated complete frames (or "" if none). -- @return string remainder Bytes after the last complete frame. @@ -179,15 +192,19 @@ function _M.split_buf(buf) while pos + 11 <= len do local total_length = read_u32_be(buf, pos) if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then - -- Corrupt prelude. Stop scanning and leave the invalid bytes in - -- the remainder. If this happens after one or more valid frames, - -- those are returned as `complete` and decode() processes them - -- normally; the corrupt frame stays in `remainder` for the next - -- read. 
If it happens at offset 0, split_buf returns ("", buf) - -- and the corrupt bytes accumulate until MAX_REMAINDER trips. + -- Corrupt total_length field. Stop and leave the bytes in the + -- remainder; decode() never sees them. break end if pos + total_length - 1 > len then + -- Frame not yet fully in buffer — wait for more chunks. + break + end + local prelude_crc = read_u32_be(buf, pos + 8) + if ngx_crc32(string_sub(buf, pos, pos + 7)) ~= prelude_crc then + -- Prelude CRC mismatch. Don't advance: keep this corrupt + -- frame and everything after in `remainder`, so we don't + -- silently consume valid frames sitting behind it. break end pos = pos + total_length From beb8fe7af009040b894453fdcab40c9bb71fff32 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 09:23:11 +0545 Subject: [PATCH 11/14] fix(ai-proxy): localize tostring in eventstream.lua The decode() warn path uses tostring() to format an invalid total_length field. lj-releng requires every Lua global referenced inside a module to be aliased to a local at the top of the file. --- apisix/plugins/ai-transport/eventstream.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/eventstream.lua index 0fc034c56a43..6b8af8e19e97 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/eventstream.lua @@ -36,6 +36,7 @@ local ngx = ngx local ngx_crc32 = ngx.crc32_long local string_byte = string.byte local string_sub = string.sub +local tostring = tostring -- Hard cap on a single frame size to avoid memory blowups on malformed input. 
-- AWS documents ConverseStream frames as well under 1 MiB; pick 16 MiB to be From f7c8b760b7de8095619331839a180330a429bfd0 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 10:02:17 +0545 Subject: [PATCH 12/14] fix(ai-proxy): dispatch streaming response when upstream is AWS EventStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The streaming-vs-non-streaming choice in ai-proxy/base.lua was made by testing whether the upstream Content-Type contains 'text/event-stream'. Bedrock ConverseStream returns 'application/vnd.amazon.eventstream', which doesn't match — so even though the gateway correctly routed the request to /converse-stream, the response was processed by parse_response (non-streaming), bypassing the framing-aware parse_streaming_response that decodes events for usage and response-text extraction. In passthrough mode (Bedrock has no converter) the bytes still reached the client because parse_response falls through to lua_response_filter on a JSON-decode failure, but llm_prompt_tokens / llm_completion_tokens / llm_response_text were never populated and the SSE-event parser was never invoked. Recognize 'application/vnd.amazon.eventstream' as a streaming response content-type alongside 'text/event-stream', and switch to plain-text matching so the dot in 'application/vnd.amazon.eventstream' isn't interpreted as a regex wildcard. 
--- apisix/plugins/ai-proxy/base.lua | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index 1795c4c62c0f..302c27986543 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -225,8 +225,18 @@ function _M.before_proxy(conf, ctx, on_error) core.response.set_header("Content-Type", content_type) -- Step 5: Parse response + -- Streaming responses arrive with provider-specific framing + -- content-types: SSE for OpenAI/Anthropic/etc., AWS EventStream + -- binary frames for Bedrock ConverseStream. The framing module + -- is selected inside parse_streaming_response via + -- provider.streaming_framing. local code, body - if content_type and core.string.find(content_type, "text/event-stream") then + local is_streaming_resp = content_type and ( + core.string.find(content_type, "text/event-stream", 1, true) or + core.string.find(content_type, + "application/vnd.amazon.eventstream", 1, true) + ) + if is_streaming_resp then local target_proto_module = protocols.get(target_proto) if not target_proto_module then core.log.error("no protocol module for streaming target: ", target_proto) From 9bdf2c4cf4c45e8c2cdcf4b324ff4aee45b2863c Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Tue, 28 Apr 2026 10:02:29 +0545 Subject: [PATCH 13/14] test(ai-proxy): assert streaming pipeline runs in Bedrock TEST 19 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous TEST 19 only checked that the raw EventStream bytes were forwarded to the client and that token-related strings appeared in the binary. Both of those would still pass even if parse_streaming_response never ran (parse_response falls through to plain byte forwarding on JSON-decode failure, and the binary contains 'inputTokens'/'outputTokens' literally), so the gateway's usage-extraction path wasn't actually exercised. 
Add an error_log assertion for 'got token usage from ai service' — that log line is only emitted from parse_streaming_response when the Bedrock metadata frame is parsed and merge_usage() runs. This catches a regression where the response handler bypasses parse_streaming_response (as the just-fixed Content-Type dispatch bug did). --- t/plugin/ai-proxy-bedrock.t | 2 ++ 1 file changed, 2 insertions(+) diff --git a/t/plugin/ai-proxy-bedrock.t b/t/plugin/ai-proxy-bedrock.t index 69ee9f95fdd7..00afa3e3a975 100644 --- a/t/plugin/ai-proxy-bedrock.t +++ b/t/plugin/ai-proxy-bedrock.t @@ -620,6 +620,8 @@ GET /t --- error_code: 200 --- response_body messageStart,contentBlockDelta,Hello,messageStop,metadata,inputTokens,outputTokens +--- error_log eval +qr/got token usage from ai service/ From 3b667a8545c17eb273e72580f3a9a4c0bc0b0f09 Mon Sep 17 00:00:00 2001 From: Abhishek Choudhary Date: Wed, 6 May 2026 11:16:08 +0800 Subject: [PATCH 14/14] refactor(ai-proxy): rename eventstream module to aws-eventstream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The transport module implements AWS EventStream binary framing — a name specific to AWS, not a generic event stream concept. Renaming so reviewers and maintainers can tell at a glance that this is AWS-specific. --- apisix/plugins/ai-protocols/bedrock-converse.lua | 4 ++-- apisix/plugins/ai-providers/base.lua | 4 ++-- .../{eventstream.lua => aws-eventstream.lua} | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) rename apisix/plugins/ai-transport/{eventstream.lua => aws-eventstream.lua} (94%) diff --git a/apisix/plugins/ai-protocols/bedrock-converse.lua b/apisix/plugins/ai-protocols/bedrock-converse.lua index ded17d21f379..5c7112c063df 100644 --- a/apisix/plugins/ai-protocols/bedrock-converse.lua +++ b/apisix/plugins/ai-protocols/bedrock-converse.lua @@ -19,7 +19,7 @@ -- Handles detection and response parsing for the Amazon Bedrock -- Converse API format. 
Streaming uses the /converse-stream endpoint with -- AWS EventStream binary framing; this module's parse_sse_event consumes --- the {headers, payload} event shape produced by ai-transport.eventstream. +-- the {headers, payload} event shape produced by ai-transport.aws-eventstream. local core = require("apisix.core") local string_sub = string.sub @@ -57,7 +57,7 @@ end --- Parse a streaming event from Bedrock's EventStream framing. --- Event shape comes from ai-transport.eventstream: {headers, payload}. +-- Event shape comes from ai-transport.aws-eventstream: {headers, payload}. -- Bedrock standard headers: :event-type, :content-type, :message-type. -- :message-type is "event" for normal events and "exception" for typed -- streaming errors (throttlingException, modelStreamErrorException, etc.). diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index ba0708c96c1d..5aaddbe1ac4a 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -29,7 +29,7 @@ local core = require("apisix.core") local plugin = require("apisix.plugin") local url = require("socket.url") local sse = require("apisix.plugins.ai-transport.sse") -local eventstream = require("apisix.plugins.ai-transport.eventstream") +local aws_eventstream = require("apisix.plugins.ai-transport.aws-eventstream") local transport_http = require("apisix.plugins.ai-transport.http") local transport_auth = require("apisix.plugins.ai-transport.auth") local log_sanitize = require("apisix.utils.log-sanitize") @@ -54,7 +54,7 @@ local tostring = tostring -- the protocol's parse_sse_event must understand it. 
local FRAMINGS = { sse = sse, - ["aws-eventstream"] = eventstream, + ["aws-eventstream"] = aws_eventstream, } diff --git a/apisix/plugins/ai-transport/eventstream.lua b/apisix/plugins/ai-transport/aws-eventstream.lua similarity index 94% rename from apisix/plugins/ai-transport/eventstream.lua rename to apisix/plugins/ai-transport/aws-eventstream.lua index 6b8af8e19e97..09f017843685 100644 --- a/apisix/plugins/ai-transport/eventstream.lua +++ b/apisix/plugins/ai-transport/aws-eventstream.lua @@ -226,7 +226,7 @@ function _M.decode(buf) local pos = 1 while pos <= len do if pos + 11 > len then - core.log.warn("eventstream: truncated prelude at offset ", pos - 1, + core.log.warn("aws-eventstream: truncated prelude at offset ", pos - 1, " (buffer ", len, " bytes)") return events end @@ -235,23 +235,23 @@ function _M.decode(buf) local prelude_crc = read_u32_be(buf, pos + 8) if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then - core.log.warn("eventstream: invalid total_length ", + core.log.warn("aws-eventstream: invalid total_length ", tostring(total_length), " at offset ", pos - 1) return events end if headers_length > total_length - 16 then - core.log.warn("eventstream: headers_length ", headers_length, + core.log.warn("aws-eventstream: headers_length ", headers_length, " exceeds frame body") return events end if pos + total_length - 1 > len then - core.log.warn("eventstream: incomplete frame at offset ", pos - 1) + core.log.warn("aws-eventstream: incomplete frame at offset ", pos - 1) return events end local computed_prelude_crc = ngx_crc32(string_sub(buf, pos, pos + 7)) if computed_prelude_crc ~= prelude_crc then - core.log.warn("eventstream: prelude CRC mismatch at offset ", pos - 1, + core.log.warn("aws-eventstream: prelude CRC mismatch at offset ", pos - 1, " expected ", prelude_crc, " got ", computed_prelude_crc) return events end @@ -263,7 +263,7 @@ function _M.decode(buf) local computed_message_crc = ngx_crc32(string_sub(buf, pos, 
payload_end)) if computed_message_crc ~= message_crc then - core.log.warn("eventstream: message CRC mismatch at offset ", pos - 1, + core.log.warn("aws-eventstream: message CRC mismatch at offset ", pos - 1, " expected ", message_crc, " got ", computed_message_crc) return events end @@ -272,7 +272,7 @@ function _M.decode(buf) if headers_length > 0 then headers, herr = parse_headers(buf, headers_start, payload_start - 1) if not headers then - core.log.warn("eventstream: failed to parse headers: ", herr) + core.log.warn("aws-eventstream: failed to parse headers: ", herr) return events end else