From eeb5559b0579b0c8851f4e0b91baacab1a8f7f20 Mon Sep 17 00:00:00 2001 From: Ming Wen Date: Sat, 28 Feb 2026 17:21:30 +0800 Subject: [PATCH 1/3] feat(ai-proxy): support automatic protocol translation from Claude to OpenAI This feature enables the `ai-proxy` plugin to automatically detect requests directed to `/v1/messages` (the Claude API format), translate them into the OpenAI API format (`/v1/chat/completions`), forward them to an OpenAI-compatible upstream, and translate the response (including SSE streams) back into the Claude API format. Added a new module `claude_to_openai.lua` for the core translation logic, integrated it into `ai-proxy/base.lua` and `ai-drivers/openai-base.lua`, and added a comprehensive test suite `t/plugin/ai-proxy-claude.t`. Also updated the documentation with usage examples and an architecture diagram. --- apisix/plugins/ai-drivers/openai-base.lua | 13 + apisix/plugins/ai-proxy/base.lua | 5 + .../ai-proxy/converter/claude_to_openai.lua | 176 +++++++ docs/en/latest/plugins/ai-proxy.md | 88 ++++ t/plugin/ai-proxy-claude.t | 468 ++++++++++++++++++ 5 files changed, 750 insertions(+) create mode 100644 apisix/plugins/ai-proxy/converter/claude_to_openai.lua create mode 100644 t/plugin/ai-proxy-claude.t diff --git a/apisix/plugins/ai-drivers/openai-base.lua b/apisix/plugins/ai-drivers/openai-base.lua index 4f279bbc3eab..ce9dc8724785 100644 --- a/apisix/plugins/ai-drivers/openai-base.lua +++ b/apisix/plugins/ai-drivers/openai-base.lua @@ -16,6 +16,8 @@ -- local _M = {} +local claude_converter = require("apisix.plugins.ai-proxy.converter.claude_to_openai") + local mt = { __index = _M } @@ -62,6 +64,10 @@ function _M.validate_request(ctx) return nil, err end + if ctx.ai_client_protocol == "claude" then + request_table = claude_converter.convert_request(request_table) + end + return request_table, nil end @@ -147,6 +153,7 @@ local function read_response(conf, ctx, res, response_filter) ::CONTINUE:: end + if ctx.ai_client_protocol == "claude" then chunk = claude_converter.convert_sse_events(ctx, chunk) end plugin.lua_response_filter(ctx, res.headers, chunk) end end @@ -208,6 +215,12 @@ local function read_response(conf, ctx, res, response_filter) ctx.var.llm_response_text = content_to_check end end + if ctx.ai_client_protocol == "claude" and res.status == 200 then + local res_body_tab = core.json.decode(raw_res_body) + if res_body_tab then + raw_res_body = core.json.encode(claude_converter.convert_response(res_body_tab)) + end + end plugin.lua_response_filter(ctx, headers, raw_res_body) end diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua index 324ac2da5f51..b610954867eb 100644 --- a/apisix/plugins/ai-proxy/base.lua +++ b/apisix/plugins/ai-proxy/base.lua @@ -53,6 +53,11 @@ function _M.before_proxy(conf, ctx, on_error) local ai_instance = ctx.picked_ai_instance local ai_driver = require("apisix.plugins.ai-drivers." .. ai_instance.provider) + local is_claude = core.string.has_suffix(ctx.var.uri, "/v1/messages") + if is_claude then + ctx.ai_client_protocol = "claude" + end + local request_body, err = ai_driver.validate_request(ctx) if not request_body then return 400, err diff --git a/apisix/plugins/ai-proxy/converter/claude_to_openai.lua b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua new file mode 100644 index 000000000000..258d0b1a1d35 --- /dev/null +++ b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua @@ -0,0 +1,176 @@ +local core = require("apisix.core") +local type = type +local sse = require("apisix.plugins.ai-drivers.sse") +local table = table +local ipairs = ipairs + +local _M = {} + +function _M.convert_request(request_table) + local openai_req = core.table.clone(request_table) + + if openai_req.system then + local system_content + if type(openai_req.system) == "string" then + system_content = openai_req.system + elseif type(openai_req.system) == "table" then + system_content = "" + for _, block in ipairs(openai_req.system) do + if type(block) == "table" and block.type == "text" then + system_content = system_content .. block.text + end + end + end + + if system_content and system_content ~= "" then + if not openai_req.messages then + openai_req.messages = {} + end + core.table.insert(openai_req.messages, 1, { + role = "system", + content = system_content + }) + end + openai_req.system = nil + end + + return openai_req +end + +function _M.convert_response(openai_res) + local content = "" + local finish_reason = "end_turn" + + if openai_res.choices and openai_res.choices[1] then + if openai_res.choices[1].message then + content = openai_res.choices[1].message.content or "" + end + if openai_res.choices[1].finish_reason ~= "stop" and openai_res.choices[1].finish_reason ~= nil then + finish_reason = openai_res.choices[1].finish_reason + end + end + + local input_tokens = 0 + local output_tokens = 0 + if openai_res.usage then + input_tokens = openai_res.usage.prompt_tokens or 0 + output_tokens = openai_res.usage.completion_tokens or 0 + end + + return { + id = openai_res.id or "msg_unknown", + type = "message", + role = "assistant", + model = openai_res.model or "unknown", + content = { + { + type = "text", + text = content + } + }, + stop_reason = finish_reason, + stop_sequence = core.json.null, + usage = { + input_tokens = input_tokens, + output_tokens = output_tokens + } + } +end + +function _M.convert_sse_events(ctx, chunk) + local events = sse.decode(chunk) + if not events or #events == 0 then + return chunk + end + + local out_events = {} + + for _, event in ipairs(events) do + if event.type == "message" and event.data ~= "[DONE]" then + local data, err = core.json.decode(event.data) + if data then + if not ctx.claude_sse_started then + ctx.claude_sse_started = true + core.table.insert(out_events, "event: message_start\ndata: " .. core.json.encode({ + type = "message_start", + message = { + id = data.id or "msg_unknown", + type = "message", + role = "assistant", + model = data.model or "unknown", + content = {}, + stop_reason = core.json.null, + stop_sequence = core.json.null, + usage = { input_tokens = 0, output_tokens = 0 } + } + }) .. "\n\n") + + core.table.insert(out_events, "event: content_block_start\ndata: " .. core.json.encode({ + type = "content_block_start", + index = 0, + content_block = { type = "text", text = "" } + }) .. "\n\n") + end + + if data.choices and data.choices[1] then + local choice = data.choices[1] + if choice.delta and choice.delta.content and choice.delta.content ~= "" then + core.table.insert(out_events, "event: content_block_delta\ndata: " .. core.json.encode({ + type = "content_block_delta", + index = 0, + delta = { type = "text_delta", text = choice.delta.content } + }) .. "\n\n") + end + + if choice.finish_reason and choice.finish_reason ~= core.json.null then + local stop_reason = choice.finish_reason == "stop" and "end_turn" or choice.finish_reason + ctx.claude_stop_reason = stop_reason + + core.table.insert(out_events, "event: content_block_stop\ndata: " .. core.json.encode({ + type = "content_block_stop", + index = 0 + }) .. "\n\n") + end + end + + if data.usage and type(data.usage) == "table" then + core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ + type = "message_delta", + delta = { + stop_reason = ctx.claude_stop_reason or "end_turn", + stop_sequence = core.json.null + }, + usage = { + output_tokens = data.usage.completion_tokens or 0 + } + }) .. "\n\n") + + ctx.claude_message_delta_emitted = true + end + end + elseif event.type == "message" and event.data == "[DONE]" then + if not ctx.claude_message_delta_emitted and ctx.claude_stop_reason then + core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ + type = "message_delta", + delta = { + stop_reason = ctx.claude_stop_reason, + stop_sequence = core.json.null + }, + usage = { output_tokens = 0 } + }) .. "\n\n") + end + + core.table.insert(out_events, "event: message_stop\ndata: " .. core.json.encode({ + type = "message_stop" + }) .. "\n\n") + end + end + + if #out_events > 0 then + return table.concat(out_events, "") + else + return "" + end +end + +return _M diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md index 56bc7e1f081d..256cedc73274 100644 --- a/docs/en/latest/plugins/ai-proxy.md +++ b/docs/en/latest/plugins/ai-proxy.md @@ -37,6 +37,28 @@ description: The ai-proxy Plugin simplifies access to LLM and embedding models p The `ai-proxy` Plugin simplifies access to LLM and embedding models by transforming Plugin configurations into the designated request format. It supports the integration with OpenAI, DeepSeek, Azure, AIMLAPI, Anthropic, OpenRouter, Gemini, Vertex AI, and other OpenAI-compatible APIs. +### Automatic Protocol Translation + +The `ai-proxy` plugin supports automatic protocol detection and translation from the Claude API format to the OpenAI API format. If a client sends a request to a route ending with `/v1/messages` (the standard Claude API endpoint), the plugin will automatically convert the request from Claude's format to OpenAI's format, send it to the upstream OpenAI-compatible service, and translate the response back to Claude's format. This is particularly useful when using AI tools or extensions that strictly require the Claude API, allowing them to work seamlessly with an OpenAI-compatible backend. + +#### Architecture + +```mermaid +sequenceDiagram + participant Client as Client (Claude API) + participant APISIX as APISIX (ai-proxy) + participant Upstream as Upstream (OpenAI API) + + Client->>APISIX: POST /v1/messages (Claude format) + Note over APISIX: Detects /v1/messages
Translates Request + APISIX->>Upstream: POST /v1/chat/completions (OpenAI format) + Upstream-->>APISIX: Response (JSON / SSE) + Note over APISIX: Translates Response + APISIX-->>Client: Response (Claude format) +``` + + + In addition, the Plugin also supports logging LLM request information in the access log, such as token usage, model, time to the first response, and more. ## Request Format @@ -89,6 +111,72 @@ admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"/ ::: +### Protocol Translation: Claude to OpenAI + +This example demonstrates how to configure the plugin to accept Claude API requests and translate them to an OpenAI backend. The plugin automatically detects the `/v1/messages` endpoint. + +Create a Route mapped to the `/v1/messages` endpoint: + +```shell +curl "http://127.0.0.1:9180/apisix/admin/routes/claude-to-openai" -X PUT \ + -H "X-API-KEY: ${admin_key}" \ + -d '{ + "uri": "/v1/messages", + "plugins": { + "ai-proxy": { + "auth": { + "header": { + "Authorization": "Bearer " + } + }, + "provider": "openai" + } + } + }' +``` + +Send a request using the Claude API format: + +```shell +curl -X POST http://127.0.0.1:9080/v1/messages \ + -H "Content-Type: application/json" \ + -H "x-api-key: mock-key" \ + -d '{ + "model": "claude-3-opus-20240229", + "max_tokens": 1024, + "system": "You are a helpful assistant.", + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] + }' +``` + +The response will be seamlessly translated back to the Claude API format: + +```json +{ + "id": "chatcmpl-9q...", + "type": "message", + "role": "assistant", + "model": "gpt-4o-2024-05-13", + "content": [ + { + "type": "text", + "text": "Hello! How can I help you today?" + } + ], + "stop_reason": "end_turn", + "stop_sequence": null, + "usage": { + "input_tokens": 14, + "output_tokens": 9 + } +} +``` + ### Proxy to OpenAI The following example demonstrates how you can configure the API key, model, and other parameters in the `ai-proxy` Plugin and configure the Plugin on a Route to proxy user prompts to OpenAI. diff --git a/t/plugin/ai-proxy-claude.t b/t/plugin/ai-proxy-claude.t new file mode 100644 index 000000000000..1b4f2b3dca22 --- /dev/null +++ b/t/plugin/ai-proxy-claude.t @@ -0,0 +1,468 @@ +use t::APISIX 'no_plan'; + +log_level("info"); +repeat_each(1); +no_long_string(); +no_root_location(); + +my $resp_file = 't/assets/openai-compatible-api-response.json'; +open(my $fh, '<', $resp_file) or die "Could not open file '$resp_file' $!"; +my $resp = do { local $/; <$fh> }; +close($fh); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } + + my $http_config = $block->http_config // <<_EOC_; + server { + server_name openai; + listen 6724; + + default_type 'application/json'; + + location /v1/chat/completions { + content_by_lua_block { + local json = require("cjson.safe") + ngx.req.read_body() + local body, err = ngx.req.get_body_data() + body, err = json.decode(body) + + if not body.messages or #body.messages < 1 then + ngx.status = 400 + ngx.say([[{ "error": "bad request"}]] ) + return + end + + -- Check if it is a Claude to OpenAI conversion + local is_claude = ngx.req.get_headers()["X-Claude-Test"] + if is_claude == "system" then + if body.messages[1].role == "system" and body.messages[1].content == "You are a bot" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "system_array" then + if body.messages[1].role == "system" and body.messages[1].content == "Text1Text2" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "no_system" then + if body.messages[1].role == "user" and body.messages[1].content == "Hello!" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "upstream_error" then + ngx.status = 401 + ngx.say([[{"error": {"message": "Unauthorized"}}]]) + return + elseif is_claude == "missing_usage" then + ngx.status = 200 + local no_usage_resp = [[{"id":"chatcmpl-123","object":"chat.completion","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"message":{"role":"assistant","content":"Hello without usage"},"finish_reason":"stop"}]}]] + ngx.say(no_usage_resp) + return + elseif is_claude == "streaming" then + ngx.req.read_body() + ngx.header.content_type = "text/event-stream" + ngx.header.cache_control = "no-cache" + ngx.header.connection = "keep-alive" + + ngx.say('data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + ngx.say('data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + ngx.say('data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + ngx.say('data: [DONE]') + ngx.say('') + ngx.flush(true) + return + elseif is_claude == "streaming_diff_reason" then + ngx.req.read_body() + ngx.header.content_type = "text/event-stream" + ngx.header.cache_control = "no-cache" + ngx.header.connection = "keep-alive" + + ngx.say('data: {"id":"chatcmpl-124","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + ngx.say('data: {"id":"chatcmpl-124","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"Hello again"},"finish_reason":null}]}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + -- Finish reason is length, not stop + ngx.say('data: {"id":"chatcmpl-124","object":"chat.completion.chunk","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"length"}]}') + ngx.say('') + ngx.flush(true) + ngx.sleep(0.1) + + ngx.say('data: [DONE]') + ngx.say('') + ngx.flush(true) + return + end + + ngx.status = 200 + ngx.say([[$resp]]) + } + } + } +_EOC_ + + $block->set_value("http_config", $http_config); +}); + +run_tests(); + +__DATA__ + +=== TEST 1: setup route +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/v1/messages", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer token" + } + }, + "override": { + "endpoint": "http://127.0.0.1:6724/v1/chat/completions" + } + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 2: Basic Chat Request (Claude -> OpenAI -> Claude) +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "system": "You are a bot", + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: system +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 3: Basic Chat Request with Complex System Prompt (Claude -> OpenAI -> Claude) +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "system": [ + { + "type": "text", + "text": "Text1" + }, + { + "type": "text", + "text": "Text2" + } + ], + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: system_array +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 4: SSE Streaming Test (Claude -> OpenAI -> Claude) +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local httpc = http.new() + local core = require("apisix.core") + + local ok, err = httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = 9080, + }) + + if not ok then + ngx.status = 500 + ngx.say(err) + return + end + + local params = { + method = "POST", + headers = { + ["Content-Type"] = "application/json", + ["X-Claude-Test"] = "streaming", + }, + path = "/v1/messages", + body = [[{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { "role": "user", "content": "hello" } + ], + "stream": true + }]], + } + + local res, err = httpc:request(params) + if not res then + ngx.status = 500 + ngx.say(err) + return + end + + local final_res = {} + while true do + local chunk, err = res.body_reader() + if err then + break + end + if not chunk then + break + end + core.table.insert_tail(final_res, chunk) + end + + ngx.print(table.concat(final_res, "")) + } + } +--- response_body_like eval +qr/event: message_start/ + + + +=== TEST 5: Abnormal Test - Missing messages +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "system": "You are a bot" +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +--- error_code: 400 +--- response_body_like eval +qr/request format doesn't match/ + + + +=== TEST 6: Basic Chat Request (No System Prompt) +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: no_system +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 7: Abnormal Test - Empty messages array +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +--- error_code: 400 +--- response_body_like eval +qr/request format doesn't match/ + + + +=== TEST 8: Upstream Error Passed Through (e.g., 401 Unauthorized) +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: upstream_error +--- error_code: 401 +--- response_body_like eval +qr/Unauthorized/ + + + +=== TEST 9: Response missing usage data +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: missing_usage +--- error_code: 200 +--- response_body_like eval +qr/"Hello without usage"/ + + + +=== TEST 10: SSE Streaming with different stop reason (length) +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local httpc = http.new() + local core = require("apisix.core") + + local ok, err = httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = 9080, + }) + + if not ok then + ngx.status = 500 + ngx.say(err) + return + end + + local params = { + method = "POST", + headers = { + ["Content-Type"] = "application/json", + ["X-Claude-Test"] = "streaming_diff_reason", + }, + path = "/v1/messages", + body = [[{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { "role": "user", "content": "hello" } + ], + "stream": true + }]], + } + + local res, err = httpc:request(params) + if not res then + ngx.status = 500 + ngx.say(err) + return + end + + local final_res = {} + while true do + local chunk, err = res.body_reader() + if err then + break + end + if not chunk then + break + end + core.table.insert_tail(final_res, chunk) + end + + ngx.print(table.concat(final_res, "")) + } + } +--- response_body_like eval +qr/"stop_reason":"length"/ From 6cc4bb0782c63776ce12908fdec9c210a01d4e6e Mon Sep 17 00:00:00 2001 From: Ming Wen Date: Mon, 2 Mar 2026 16:27:52 +0800 Subject: [PATCH 2/3] feat(ai-proxy): improve Claude protocol translation Add full request field mapping, strict SSE sequence handling, and clearer error paths for Claude to OpenAI translation. Tests now cover stop_sequences, temperature/top_p, content arrays, non-text content errors, and full SSE ordering. Documentation includes field mapping and streaming guarantees. --- apisix/plugins/ai-drivers/openai-base.lua | 10 +- .../ai-proxy/converter/claude_to_openai.lua | 266 ++++++++++++------ docs/en/latest/plugins/ai-proxy.md | 23 ++ t/plugin/ai-proxy-claude.t | 193 ++++++++++++- 4 files changed, 401 insertions(+), 91 deletions(-) diff --git a/apisix/plugins/ai-drivers/openai-base.lua b/apisix/plugins/ai-drivers/openai-base.lua index ce9dc8724785..e7d02a123327 100644 --- a/apisix/plugins/ai-drivers/openai-base.lua +++ b/apisix/plugins/ai-drivers/openai-base.lua @@ -65,7 +65,11 @@ function _M.validate_request(ctx) end if ctx.ai_client_protocol == "claude" then - request_table = claude_converter.convert_request(request_table) + local converted, err = claude_converter.convert_request(request_table) + if not converted then + return nil, err + end + request_table = converted end return request_table, nil @@ -172,6 +176,10 @@ local function read_response(conf, ctx, res, response_filter) core.log.warn("invalid response body from ai service: ", raw_res_body, " err: ", err, ", it will cause token usage not available") else + if ctx.ai_client_protocol == "claude" and res.status ~= 200 then + plugin.lua_response_filter(ctx, headers, raw_res_body) + return + end if response_filter then local resp = { headers = headers, diff --git a/apisix/plugins/ai-proxy/converter/claude_to_openai.lua b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua index 258d0b1a1d35..b4fb34f04cdc 100644 --- a/apisix/plugins/ai-proxy/converter/claude_to_openai.lua +++ b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua @@ -6,26 +6,68 @@ local ipairs = ipairs local _M = {} +local function concat_text_blocks(blocks, context) + if type(blocks) ~= "table" then + return nil, "unsupported content type in " .. context + end + + if blocks.type ~= nil then + if blocks.type ~= "text" or type(blocks.text) ~= "string" then + return nil, "unsupported content type in " .. context + end + return blocks.text + end + + local result = {} + for _, block in ipairs(blocks) do + if type(block) ~= "table" or block.type ~= "text" or type(block.text) ~= "string" then + return nil, "unsupported content type in " .. context + end + core.table.insert(result, block.text) + end + + return table.concat(result, "") +end + +local function normalize_stop_sequences(stop_sequences) + if type(stop_sequences) == "string" then + return stop_sequences + end + + if type(stop_sequences) == "table" then + local stops = {} + for _, item in ipairs(stop_sequences) do + if type(item) ~= "string" then + return nil, "request format doesn't match: stop_sequences must be string array" + end + core.table.insert(stops, item) + end + return stops + end + + return nil, "request format doesn't match: stop_sequences must be string or array" +end + function _M.convert_request(request_table) local openai_req = core.table.clone(request_table) - + + if type(openai_req.messages) ~= "table" or #openai_req.messages == 0 then + return nil, "request format doesn't match: messages is required" + end + if openai_req.system then local system_content if type(openai_req.system) == "string" then system_content = openai_req.system - elseif type(openai_req.system) == "table" then - system_content = "" - for _, block in ipairs(openai_req.system) do - if type(block) == "table" and block.type == "text" then - system_content = system_content .. block.text - end + else + local err + system_content, err = concat_text_blocks(openai_req.system, "system") + if err then + return nil, err end end - + if system_content and system_content ~= "" then - if not openai_req.messages then - openai_req.messages = {} - end core.table.insert(openai_req.messages, 1, { role = "system", content = system_content @@ -34,29 +76,64 @@ function _M.convert_request(request_table) openai_req.system = nil end + for _, message in ipairs(openai_req.messages) do + if type(message) == "table" and message.content ~= nil then + if type(message.content) == "table" then + local merged, err = concat_text_blocks(message.content, "messages") + if err then + return nil, err + end + message.content = merged + elseif type(message.content) ~= "string" then + return nil, "unsupported content type in messages" + end + end + end + + if openai_req.stop_sequences ~= nil then + local stop, err = normalize_stop_sequences(openai_req.stop_sequences) + if err then + return nil, err + end + openai_req.stop = stop + openai_req.stop_sequences = nil + end + + if openai_req.temperature ~= nil and type(openai_req.temperature) ~= "number" then + return nil, "request format doesn't match: temperature must be number" + end + + if openai_req.top_p ~= nil and type(openai_req.top_p) ~= "number" then + return nil, "request format doesn't match: top_p must be number" + end + return openai_req end function _M.convert_response(openai_res) local content = "" local finish_reason = "end_turn" - + if openai_res.choices and openai_res.choices[1] then if openai_res.choices[1].message then content = openai_res.choices[1].message.content or "" end - if openai_res.choices[1].finish_reason ~= "stop" and openai_res.choices[1].finish_reason ~= nil then - finish_reason = openai_res.choices[1].finish_reason + if openai_res.choices[1].finish_reason ~= nil then + if openai_res.choices[1].finish_reason == "stop" then + finish_reason = "end_turn" + else + finish_reason = openai_res.choices[1].finish_reason + end end end - + local input_tokens = 0 local output_tokens = 0 if openai_res.usage then input_tokens = openai_res.usage.prompt_tokens or 0 output_tokens = openai_res.usage.completion_tokens or 0 end - + return { id = openai_res.id or "msg_unknown", type = "message", @@ -82,95 +159,114 @@ function _M.convert_sse_events(ctx, chunk) if not events or #events == 0 then return chunk end - + local out_events = {} - + + local function emit_message_start(data) + if ctx.claude_sse_started then + return + end + ctx.claude_sse_started = true + core.table.insert(out_events, "event: message_start\ndata: " .. core.json.encode({ + type = "message_start", + message = { + id = data and data.id or "msg_unknown", + type = "message", + role = "assistant", + model = data and data.model or "unknown", + content = {}, + stop_reason = core.json.null, + stop_sequence = core.json.null, + usage = { input_tokens = 0, output_tokens = 0 } + } + }) .. "\n\n") + + core.table.insert(out_events, "event: content_block_start\ndata: " .. core.json.encode({ + type = "content_block_start", + index = 0, + content_block = { type = "text", text = "" } + }) .. "\n\n") + end + + local function emit_content_block_stop() + if ctx.claude_content_block_stopped then + return + end + ctx.claude_content_block_stopped = true + core.table.insert(out_events, "event: content_block_stop\ndata: " .. core.json.encode({ + type = "content_block_stop", + index = 0 + }) .. "\n\n") + end + + local function emit_message_delta(output_tokens) + if ctx.claude_message_delta_emitted then + return + end + ctx.claude_message_delta_emitted = true + core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ + type = "message_delta", + delta = { + stop_reason = ctx.claude_stop_reason or "end_turn", + stop_sequence = core.json.null + }, + usage = { + output_tokens = output_tokens or 0 + } + }) .. "\n\n") + end + for _, event in ipairs(events) do if event.type == "message" and event.data ~= "[DONE]" then local data, err = core.json.decode(event.data) - if data then - if not ctx.claude_sse_started then - ctx.claude_sse_started = true - core.table.insert(out_events, "event: message_start\ndata: " .. core.json.encode({ - type = "message_start", - message = { - id = data.id or "msg_unknown", - type = "message", - role = "assistant", - model = data.model or "unknown", - content = {}, - stop_reason = core.json.null, - stop_sequence = core.json.null, - usage = { input_tokens = 0, output_tokens = 0 } - } - }) .. "\n\n") - - core.table.insert(out_events, "event: content_block_start\ndata: " .. core.json.encode({ - type = "content_block_start", + if not data then + core.log.warn("failed to decode SSE data: ", err) + return chunk + end + + emit_message_start(data) + + if data.choices and data.choices[1] then + local choice = data.choices[1] + if choice.delta and choice.delta.content and choice.delta.content ~= "" then + core.table.insert(out_events, "event: content_block_delta\ndata: " .. core.json.encode({ + type = "content_block_delta", index = 0, - content_block = { type = "text", text = "" } + delta = { type = "text_delta", text = choice.delta.content } }) .. "\n\n") end - - if data.choices and data.choices[1] then - local choice = data.choices[1] - if choice.delta and choice.delta.content and choice.delta.content ~= "" then - core.table.insert(out_events, "event: content_block_delta\ndata: " .. core.json.encode({ - type = "content_block_delta", - index = 0, - delta = { type = "text_delta", text = choice.delta.content } - }) .. "\n\n") - end - - if choice.finish_reason and choice.finish_reason ~= core.json.null then - local stop_reason = choice.finish_reason == "stop" and "end_turn" or choice.finish_reason - ctx.claude_stop_reason = stop_reason - - core.table.insert(out_events, "event: content_block_stop\ndata: " .. core.json.encode({ - type = "content_block_stop", - index = 0 - }) .. "\n\n") + + if choice.finish_reason and choice.finish_reason ~= core.json.null then + if choice.finish_reason == "stop" then + ctx.claude_stop_reason = "end_turn" + else + ctx.claude_stop_reason = choice.finish_reason end - end - - if data.usage and type(data.usage) == "table" then - core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ - type = "message_delta", - delta = { - stop_reason = ctx.claude_stop_reason or "end_turn", - stop_sequence = core.json.null - }, - usage = { - output_tokens = data.usage.completion_tokens or 0 - } - }) .. "\n\n") - - ctx.claude_message_delta_emitted = true + emit_content_block_stop() end end + + if data.usage and type(data.usage) == "table" then + ctx.claude_pending_output_tokens = data.usage.completion_tokens or 0 + end elseif event.type == "message" and event.data == "[DONE]" then - if not ctx.claude_message_delta_emitted and ctx.claude_stop_reason then - core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ - type = "message_delta", - delta = { - stop_reason = ctx.claude_stop_reason, - stop_sequence = core.json.null - }, - usage = { output_tokens = 0 } - }) .. "\n\n") + emit_message_start(nil) + if not ctx.claude_content_block_stopped then + ctx.claude_stop_reason = ctx.claude_stop_reason or "end_turn" + emit_content_block_stop() end - + emit_message_delta(ctx.claude_pending_output_tokens or 0) core.table.insert(out_events, "event: message_stop\ndata: " .. core.json.encode({ type = "message_stop" }) .. "\n\n") end end - + if #out_events > 0 then return table.concat(out_events, "") - else - return "" end + + return chunk end return _M diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md index 256cedc73274..4e96fdf709c3 100644 --- a/docs/en/latest/plugins/ai-proxy.md +++ b/docs/en/latest/plugins/ai-proxy.md @@ -41,6 +41,21 @@ The `ai-proxy` Plugin simplifies access to LLM and embedding models by transform The `ai-proxy` plugin supports automatic protocol detection and translation from the Claude API format to the OpenAI API format. If a client sends a request to a route ending with `/v1/messages` (the standard Claude API endpoint), the plugin will automatically convert the request from Claude's format to OpenAI's format, send it to the upstream OpenAI-compatible service, and translate the response back to Claude's format. This is particularly useful when using AI tools or extensions that strictly require the Claude API, allowing them to work seamlessly with an OpenAI-compatible backend. +#### Field Mapping + +The following table shows how the plugin maps fields when translating Claude requests into OpenAI-compatible requests: + +| Claude Field | OpenAI Field | Notes | +| --- | --- | --- | +| `model` | `model` | Pass-through | +| `max_tokens` | `max_tokens` | Pass-through | +| `temperature` | `temperature` | Pass-through | +| `top_p` | `top_p` | Pass-through | +| `stop_sequences` | `stop` | Supports string or string array | +| `system` | `messages[1]` | `system` is prepended as a `role=system` message | + +For `system` or `messages[].content` arrays, only `type: text` blocks are supported. Any non-text block results in a `400` error. + #### Architecture ```mermaid @@ -57,6 +72,14 @@ sequenceDiagram APISIX-->>Client: Response (Claude format) ``` +#### Streaming Guarantees + +When streaming responses, the plugin emits the Claude SSE event sequence in strict order: + +`message_start` → `content_block_start` → `content_block_delta`* → `content_block_stop` → `message_delta` → `message_stop` + +If the upstream does not include token usage in the stream, the plugin still emits `message_delta` with `output_tokens` set to `0`. + In addition, the Plugin also supports logging LLM request information in the access log, such as token usage, model, time to the first response, and more. diff --git a/t/plugin/ai-proxy-claude.t b/t/plugin/ai-proxy-claude.t index 1b4f2b3dca22..e61cfea0fcd9 100644 --- a/t/plugin/ai-proxy-claude.t +++ b/t/plugin/ai-proxy-claude.t @@ -73,6 +73,46 @@ add_block_preprocessor(sub { ngx.status = 401 ngx.say([[{"error": {"message": "Unauthorized"}}]]) return + elseif is_claude == "stop_sequences_string" then + if body.stop == "STOP" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "stop_sequences_array" then + if type(body.stop) == "table" and body.stop[1] == "STOP1" and body.stop[2] == "STOP2" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "temperature_top_p" then + if body.temperature == 0.2 and body.top_p == 0.9 then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end + elseif is_claude == "content_array" then + if body.messages[1].content == "HelloWorld" then + ngx.status = 200 + ngx.say([[$resp]]) + return + else + ngx.status = 500 + ngx.say("conversion failed") + return + end elseif is_claude == "missing_usage" then ngx.status = 200 local no_usage_resp = [[{"id":"chatcmpl-123","object":"chat.completion","created":1694268190,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"message":{"role":"assistant","content":"Hello without usage"},"finish_reason":"stop"}]}]] @@ -298,7 +338,17 @@ qr/"role":"assistant"/ } } --- response_body_like eval -qr/event: message_start/ +qr/event: message_start/[0] +--- response_body_like eval +qr/event: content_block_start/[1] +--- response_body_like eval +qr/event: content_block_delta/[2] +--- response_body_like eval +qr/event: content_block_stop/[3] +--- response_body_like eval +qr/event: message_delta/[4] +--- response_body_like eval +qr/event: message_stop/[5] @@ -359,7 +409,130 @@ qr/request format doesn't match/ -=== TEST 8: Upstream Error Passed Through (e.g., 401 Unauthorized) +=== TEST 8: stop_sequences string maps to stop +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "stop_sequences": "STOP", + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: stop_sequences_string +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 9: stop_sequences array maps to stop +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "stop_sequences": ["STOP1", "STOP2"], + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: stop_sequences_array +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 10: temperature and top_p passthrough +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "temperature": 0.2, + "top_p": 0.9, + "messages": [ + { + "role": "user", + "content": "Hello!" + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: temperature_top_p +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 11: messages content as array of text blocks +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": [ + { "type": "text", "text": "Hello" }, + { "type": "text", "text": "World" } + ] + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +X-Claude-Test: content_array +--- error_code: 200 +--- response_body_like eval +qr/"role":"assistant"/ + + + +=== TEST 12: Abnormal Test - non-text content in messages +--- request +POST /v1/messages +{ + "model": "claude-3-5-sonnet", + "max_tokens": 1024, + "messages": [ + { + "role": "user", + "content": [ + { "type": "image", "text": "bad" } + ] + } + ] +} +--- more_headers +Authorization: Bearer token +Content-Type: application/json +--- error_code: 400 +--- response_body_like eval +qr/unsupported content type/ + + + +=== TEST 13: Upstream Error Passed Through (e.g., 401 Unauthorized) --- request POST /v1/messages { @@ -382,7 +555,7 @@ qr/Unauthorized/ -=== TEST 9: Response missing usage data +=== TEST 14: Response missing usage data --- request POST /v1/messages { @@ -405,7 +578,7 @@ qr/"Hello without usage"/ -=== TEST 10: SSE Streaming with different stop reason (length) +=== TEST 15: SSE Streaming with different stop reason (length) --- config location /t { content_by_lua_block { @@ -465,4 +638,14 @@ qr/"Hello without usage"/ } } --- response_body_like eval -qr/"stop_reason":"length"/ +qr/event: message_start/[0] +--- response_body_like eval +qr/event: content_block_start/[1] +--- response_body_like eval +qr/event: content_block_delta/[2] +--- response_body_like eval +qr/event: content_block_stop/[3] +--- response_body_like eval +qr/"stop_reason":"length"/[4] +--- response_body_like eval +qr/event: message_stop/[5] From ff746e59caae2b08802675c80a2619bda5aa1597 Mon Sep 17 00:00:00 2001 From: Ming Wen Date: Thu, 5 Mar 2026 15:23:35 +0800 Subject: [PATCH 3/3] fix(ai-proxy): address P1/P2 review issues in Claude-to-OpenAI converter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix shallow copy bug: deep-copy messages array to prevent mutation - Fix error path skipping observability: remove early return, let full finalization pipeline run for token tracking/metrics/logging - Fix SSE fail-closed: decode failures emit Claude-format error events - Fix SSE chunk buffering: buffer incomplete chunks across boundaries - Fix SSE [DONE] matching: use event.type=="done" matching sse.decode - Complete stop_reason mapping: length→max_tokens, tool_calls→tool_use - Namespace ctx state: consolidate ctx.claude_* under ctx.claude_state - Convert error responses to Claude format with proper error types - Improve error messages with specific type values - Fix test assertions and add extra_yaml_config --- apisix/plugins/ai-drivers/openai-base.lua | 24 +-- .../ai-proxy/converter/claude_to_openai.lua | 146 ++++++++++++++---- t/plugin/ai-proxy-claude.t | 37 ++--- 3 files changed, 142 insertions(+), 65 deletions(-) diff --git a/apisix/plugins/ai-drivers/openai-base.lua b/apisix/plugins/ai-drivers/openai-base.lua index e7d02a123327..14f6789bae10 100644 --- a/apisix/plugins/ai-drivers/openai-base.lua +++ b/apisix/plugins/ai-drivers/openai-base.lua @@ -157,8 +157,16 @@ local function read_response(conf, ctx, res, response_filter) ::CONTINUE:: end - if ctx.ai_client_protocol == "claude" then chunk = claude_converter.convert_sse_events(ctx, chunk) end + if ctx.ai_client_protocol == "claude" then + local converted = claude_converter.convert_sse_events(ctx, chunk) + if converted then + chunk = converted + else + goto NEXT_CHUNK + end + end plugin.lua_response_filter(ctx, res.headers, chunk) + ::NEXT_CHUNK:: end end @@ -176,10 +184,6 @@ local function read_response(conf, ctx, res, response_filter) core.log.warn("invalid response body from ai service: ", raw_res_body, " err: ", err, ", it will cause token usage not available") else - if ctx.ai_client_protocol == "claude" and res.status ~= 200 then - plugin.lua_response_filter(ctx, headers, raw_res_body) - return - end if response_filter then local resp = { headers = headers, @@ -223,10 +227,12 @@ local function read_response(conf, ctx, res, response_filter) ctx.var.llm_response_text = content_to_check end end - if ctx.ai_client_protocol == "claude" and res.status == 200 then - local res_body_tab = core.json.decode(raw_res_body) - if res_body_tab then - raw_res_body = core.json.encode(claude_converter.convert_response(res_body_tab)) + if ctx.ai_client_protocol == "claude" and res_body then + if res.status == 200 then + raw_res_body = core.json.encode(claude_converter.convert_response(res_body)) + else + raw_res_body = core.json.encode( + claude_converter.convert_error_response(res.status, res_body)) end end plugin.lua_response_filter(ctx, headers, raw_res_body) diff --git a/apisix/plugins/ai-proxy/converter/claude_to_openai.lua b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua index b4fb34f04cdc..9b944c53666a 100644 --- a/apisix/plugins/ai-proxy/converter/claude_to_openai.lua +++ b/apisix/plugins/ai-proxy/converter/claude_to_openai.lua @@ -3,17 +3,43 @@ local type = type local sse = require("apisix.plugins.ai-drivers.sse") local table = table local ipairs = ipairs +local tostring = tostring local _M = {} +-- Map OpenAI finish_reason to Claude stop_reason +local STOP_REASON_MAP = { + stop = "end_turn", + length = "max_tokens", + tool_calls = "tool_use", + content_filter = "end_turn", +} + +local function map_stop_reason(finish_reason) + if not finish_reason or finish_reason == core.json.null then + return "end_turn" + end + return STOP_REASON_MAP[finish_reason] or "end_turn" +end + +-- Deep-copy messages array to avoid mutating the original request table +local function deep_copy_messages(messages) + local copy = core.table.new(#messages, 0) + for i, msg in ipairs(messages) do + copy[i] = core.table.clone(msg) + end + return copy +end + local function concat_text_blocks(blocks, context) if type(blocks) ~= "table" then - return nil, "unsupported content type in " .. context + return nil, "unsupported content type in " .. context .. ": expected table, got " + .. type(blocks) end if blocks.type ~= nil then if blocks.type ~= "text" or type(blocks.text) ~= "string" then - return nil, "unsupported content type in " .. context + return nil, "unsupported content type in " .. context .. ": " .. tostring(blocks.type) end return blocks.text end @@ -21,7 +47,8 @@ local function concat_text_blocks(blocks, context) local result = {} for _, block in ipairs(blocks) do if type(block) ~= "table" or block.type ~= "text" or type(block.text) ~= "string" then - return nil, "unsupported content type in " .. context + local block_type = type(block) == "table" and tostring(block.type) or type(block) + return nil, "unsupported content type in " .. context .. ": " .. block_type end core.table.insert(result, block.text) end @@ -50,6 +77,7 @@ end function _M.convert_request(request_table) local openai_req = core.table.clone(request_table) + openai_req.messages = deep_copy_messages(request_table.messages) if type(openai_req.messages) ~= "table" or #openai_req.messages == 0 then return nil, "request format doesn't match: messages is required" @@ -118,13 +146,7 @@ function _M.convert_response(openai_res) if openai_res.choices[1].message then content = openai_res.choices[1].message.content or "" end - if openai_res.choices[1].finish_reason ~= nil then - if openai_res.choices[1].finish_reason == "stop" then - finish_reason = "end_turn" - else - finish_reason = openai_res.choices[1].finish_reason - end - end + finish_reason = map_stop_reason(openai_res.choices[1].finish_reason) end local input_tokens = 0 @@ -154,19 +176,76 @@ function _M.convert_response(openai_res) } end +local OPENAI_TO_CLAUDE_ERROR_TYPE = { + ["401"] = "authentication_error", + ["403"] = "permission_error", + ["404"] = "not_found_error", + ["429"] = "rate_limit_error", + ["500"] = "api_error", +} + +function _M.convert_error_response(status, openai_body) + local message = "unknown error" + if type(openai_body) == "table" and type(openai_body.error) == "table" then + message = openai_body.error.message or message + elseif type(openai_body) == "table" and openai_body.error then + message = tostring(openai_body.error) + end + + local error_type = OPENAI_TO_CLAUDE_ERROR_TYPE[tostring(status)] or "api_error" + + return { + type = "error", + error = { + type = error_type, + message = message, + } + } +end + +local function get_claude_state(ctx) + if not ctx.claude_state then + ctx.claude_state = { + sse_started = false, + content_block_stopped = false, + message_delta_emitted = false, + stop_reason = nil, + pending_output_tokens = 0, + sse_buffer = "", + } + end + return ctx.claude_state +end + function _M.convert_sse_events(ctx, chunk) - local events = sse.decode(chunk) + local state = get_claude_state(ctx) + + local buffered = state.sse_buffer .. chunk + if not core.string.has_suffix(buffered, "\n\n") then + state.sse_buffer = buffered + return nil + end + state.sse_buffer = "" + + local events = sse.decode(buffered) if not events or #events == 0 then - return chunk + core.log.warn("SSE decode returned no events for buffered chunk") + return "event: error\ndata: " .. core.json.encode({ + type = "error", + error = { + type = "api_error", + message = "failed to decode SSE events", + } + }) .. "\n\n" end local out_events = {} local function emit_message_start(data) - if ctx.claude_sse_started then + if state.sse_started then return end - ctx.claude_sse_started = true + state.sse_started = true core.table.insert(out_events, "event: message_start\ndata: " .. core.json.encode({ type = "message_start", message = { @@ -189,10 +268,10 @@ function _M.convert_sse_events(ctx, chunk) end local function emit_content_block_stop() - if ctx.claude_content_block_stopped then + if state.content_block_stopped then return end - ctx.claude_content_block_stopped = true + state.content_block_stopped = true core.table.insert(out_events, "event: content_block_stop\ndata: " .. core.json.encode({ type = "content_block_stop", index = 0 @@ -200,14 +279,14 @@ function _M.convert_sse_events(ctx, chunk) end local function emit_message_delta(output_tokens) - if ctx.claude_message_delta_emitted then + if state.message_delta_emitted then return end - ctx.claude_message_delta_emitted = true + state.message_delta_emitted = true core.table.insert(out_events, "event: message_delta\ndata: " .. core.json.encode({ type = "message_delta", delta = { - stop_reason = ctx.claude_stop_reason or "end_turn", + stop_reason = state.stop_reason or "end_turn", stop_sequence = core.json.null }, usage = { @@ -221,7 +300,14 @@ function _M.convert_sse_events(ctx, chunk) local data, err = core.json.decode(event.data) if not data then core.log.warn("failed to decode SSE data: ", err) - return chunk + core.table.insert(out_events, "event: error\ndata: " .. core.json.encode({ + type = "error", + error = { + type = "api_error", + message = "failed to decode upstream SSE event", + } + }) .. "\n\n") + goto CONTINUE end emit_message_start(data) @@ -237,36 +323,34 @@ function _M.convert_sse_events(ctx, chunk) end if choice.finish_reason and choice.finish_reason ~= core.json.null then - if choice.finish_reason == "stop" then - ctx.claude_stop_reason = "end_turn" - else - ctx.claude_stop_reason = choice.finish_reason - end + state.stop_reason = map_stop_reason(choice.finish_reason) emit_content_block_stop() end end if data.usage and type(data.usage) == "table" then - ctx.claude_pending_output_tokens = data.usage.completion_tokens or 0 + state.pending_output_tokens = data.usage.completion_tokens or 0 end - elseif event.type == "message" and event.data == "[DONE]" then + elseif event.type == "done" then emit_message_start(nil) - if not ctx.claude_content_block_stopped then - ctx.claude_stop_reason = ctx.claude_stop_reason or "end_turn" + if not state.content_block_stopped then + state.stop_reason = state.stop_reason or "end_turn" emit_content_block_stop() end - emit_message_delta(ctx.claude_pending_output_tokens or 0) + emit_message_delta(state.pending_output_tokens or 0) core.table.insert(out_events, "event: message_stop\ndata: " .. core.json.encode({ type = "message_stop" }) .. "\n\n") end + + ::CONTINUE:: end if #out_events > 0 then return table.concat(out_events, "") end - return chunk + return nil end return _M diff --git a/t/plugin/ai-proxy-claude.t b/t/plugin/ai-proxy-claude.t index e61cfea0fcd9..a89020b3585f 100644 --- a/t/plugin/ai-proxy-claude.t +++ b/t/plugin/ai-proxy-claude.t @@ -17,6 +17,13 @@ add_block_preprocessor(sub { $block->set_value("request", "GET /t"); } + my $user_yaml_config = <<_EOC_; +plugins: + - ai-proxy + - prometheus +_EOC_ + $block->set_value("extra_yaml_config", $user_yaml_config); + my $http_config = $block->http_config // <<_EOC_; server { server_name openai; @@ -338,17 +345,7 @@ qr/"role":"assistant"/ } } --- response_body_like eval -qr/event: message_start/[0] ---- response_body_like eval -qr/event: content_block_start/[1] ---- response_body_like eval -qr/event: content_block_delta/[2] ---- response_body_like eval -qr/event: content_block_stop/[3] ---- response_body_like eval -qr/event: message_delta/[4] ---- response_body_like eval -qr/event: message_stop/[5] +qr/(?s)event: message_start.*event: content_block_start.*event: content_block_delta.*event: content_block_stop.*event: message_delta.*event: message_stop/ @@ -528,11 +525,11 @@ Authorization: Bearer token Content-Type: application/json --- error_code: 400 --- response_body_like eval -qr/unsupported content type/ +qr/unsupported content type in messages: image/ -=== TEST 13: Upstream Error Passed Through (e.g., 401 Unauthorized) +=== TEST 13: Upstream Error Passed Through in Claude format (e.g., 401 Unauthorized) --- request POST /v1/messages { @@ -551,7 +548,7 @@ Content-Type: application/json X-Claude-Test: upstream_error --- error_code: 401 --- response_body_like eval -qr/Unauthorized/ +qr/"type":"error".*"authentication_error".*Unauthorized/ @@ -638,14 +635,4 @@ qr/"Hello without usage"/ } } --- response_body_like eval -qr/event: message_start/[0] ---- response_body_like eval -qr/event: content_block_start/[1] ---- response_body_like eval -qr/event: content_block_delta/[2] ---- response_body_like eval -qr/event: content_block_stop/[3] ---- response_body_like eval -qr/"stop_reason":"length"/[4] ---- response_body_like eval -qr/event: message_stop/[5] +qr/(?s)event: message_start.*event: content_block_start.*event: content_block_delta.*event: content_block_stop.*"stop_reason":"max_tokens".*event: message_stop/