1 change: 1 addition & 0 deletions .gitignore
@@ -56,6 +56,7 @@ venv.bak/
# Performance test results
tests/performance/results/
.coverage*
coverage*

# Local test script
tools/tmp/*
49 changes: 49 additions & 0 deletions app/api/routes/proxy.py
@@ -15,6 +15,7 @@
EmbeddingsRequest,
ImageEditsRequest,
ImageGenerationRequest,
ResponsesRequest,
)
from app.core.async_cache import forge_scope_cache_async, get_forge_scope_cache_async
from app.core.database import get_async_db
@@ -283,3 +284,51 @@ async def create_embeddings(
raise HTTPException(
status_code=500, detail=f"Error processing request: {str(err)}"
) from err

@router.post("/responses")
async def create_responses(
request: Request,
responses_request: ResponsesRequest,
user_details: dict[str, Any] = Depends(get_user_details_by_api_key),
db: AsyncSession = Depends(get_async_db),
) -> Any:
"""
Create a response (OpenAI-compatible endpoint).
"""
try:
user = user_details["user"]
api_key_id = user_details["api_key_id"]
provider_service = await ProviderService.async_get_instance(user, db, api_key_id=api_key_id)
allowed_provider_names = await _get_allowed_provider_names(request, db)

response = await provider_service.process_request(
"responses",
responses_request.model_dump(mode="json", exclude_unset=True),
allowed_provider_names=allowed_provider_names,
)

# Check if it's a streaming response
if inspect.isasyncgen(response):
headers = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Prevent Nginx buffering
}

return StreamingResponse(
response, media_type="text/event-stream", headers=headers
)

# Otherwise, return the JSON response directly
return response
except NotImplementedError as err:
raise HTTPException(
status_code=404, detail=f"Error processing request: {str(err)}"
) from err
except ValueError as err:
raise HTTPException(status_code=400, detail=str(err)) from err
except Exception as err:
raise HTTPException(
status_code=500, detail=f"Error processing request: {str(err)}"
) from err
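
For reviewers, a minimal client sketch against the new route. The mount prefix (application root) and the `Authorization: Bearer <forge-api-key>` scheme are assumptions inferred from the existing OpenAI-compatible proxy routes, not something this diff pins down.

```python
# Hypothetical usage sketch for the new /responses proxy route (not part of the diff).
# Assumes the proxy runs at http://localhost:8000 and accepts the Forge API key
# as a Bearer token, mirroring the other proxy routes.
import requests

resp = requests.post(
    "http://localhost:8000/responses",
    headers={"Authorization": "Bearer <forge-api-key>"},
    json={
        "model": "gpt-4o-mini",               # any model an allowed provider exposes
        "input": "Say hello in one sentence.",
        "stream": False,                      # True switches the route to text/event-stream
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json())
```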
199 changes: 199 additions & 0 deletions app/api/schemas/openai.py
@@ -252,3 +252,202 @@ class EmbeddingsRequest(BaseModel):
encoding_format: str | None = 'float'
# input_type is for cohere embeddings only
input_type: str | None = 'search_document'


# ---------------------------------------------------------------------------
# OpenAI Responses Request
# https://platform.openai.com/docs/api-reference/responses/create
# ---------------------------------------------------------------------------
class ResponsesInputTextItem(BaseModel):
text: str
type: str # always input_text

class ResponsesInputImageItem(BaseModel):
detail: str | None = 'auto'
type: str # always input_image
file_id: str | None = None
image_url: str | None = None

class ResponsesInputFileItem(BaseModel):
type: str # always input_file
file_data: str | None = None
file_id: str | None = None
file_url: str | None = None
filename: str | None = None

class ResponsesInputAudioItem(BaseModel):
input_audio: object
type: str # always input_audio

class ResponsesInputMessageItem(BaseModel):
role: str
type: str | None = None
content: str | list[ResponsesInputTextItem | ResponsesInputImageItem | ResponsesInputFileItem | ResponsesInputAudioItem]


class ResponsesItemInputMessage(BaseModel):
role: str
content: list[ResponsesInputTextItem | ResponsesInputImageItem | ResponsesInputFileItem | ResponsesInputAudioItem]
status: str | None = None
type: str | None = None

class ResponsesItemOutputMessage(BaseModel):
content: list[object]
id: str
role: str
status: str
type: str

class ResponsesItemFileSearchToolCall(BaseModel):
id: str
query: str
status: str
type: str
results: list[object]

class ResponsesItemComputerToolCall(BaseModel):
action: object
call_id: str
id: str
pending_safety_checks: list[object]
status: str
type: str

class ResponsesItemComputerToolCallOutput(BaseModel):
call_id: str
output: object
type: str
acknowledged_safety_checks: list[object] | None = None
id: str | None = None
status: str | None = None

class ResponsesItemWebSearchToolCall(BaseModel):
action: object
id: str
status: str
type: str

class ResponsesItemFunctionToolCall(BaseModel):
arguments: str
call_id: str
name: str
type: str
id: str | None = None
status: str | None = None

class ResponsesItemFunctionToolCallOutput(BaseModel):
call_id: str
output: str | list[object]
type: str
id: str | None = None
status: str | None = None

class ResponsesItemReasoning(BaseModel):
id: str
summary: list[object]
type: str
content: list[object] | None = None
encrypted_content: str | None = None
status: str | None = None

class ResponsesItemImageGenerationCall(BaseModel):
id: str
result: str
status: str
type: str

class ResponsesItemCodeInterpreterToolCall(BaseModel):
code: str
container_id: str
id: str
outputs: list[object]
status: str
type: str

class ResponsesItemLocalShellCall(BaseModel):
action: object
call_id: str
id: str
status: str
type: str

class ResponsesItemLocalShellCallOutput(BaseModel):
id: str
output: str
type: str
status: str | None = None

class ResponsesItemMCPListTools(BaseModel):
id: str
server_label: str
tools: list[object]
type: str
error: str | None = None

class ResponsesItemMCPApprovalRequest(BaseModel):
arguments: str
id: str
name: str
server_label: str
type: str

class ResponsesItemMCPApprovalResponse(BaseModel):
approval_request_id: str
approve: bool
type: str
id: str | None = None
reason: str | None = None

class ResponsesItemMCPToolCall(BaseModel):
arguments: str
id: str
name: str
server_label: str
type: str
error: str | None = None
output: str | None = None

class ResponsesItemCustomToolCallOutput(BaseModel):
call_id: str
output: str | list[object]
type: str
id: str | None = None

class ResponsesItemCustomToolCall(BaseModel):
call_id: str
input: str
name: str
type: str
id: str | None = None

class ResponsesItemReference(BaseModel):
id: str
type: str

class ResponsesRequest(BaseModel):
background: bool | None = False
conversation: str | object | None = None
include: list[Any] | None = None
input: str | list[ResponsesInputMessageItem | ResponsesItemReference | ResponsesItemInputMessage | ResponsesItemFileSearchToolCall | ResponsesItemComputerToolCall | ResponsesItemWebSearchToolCall | ResponsesItemFunctionToolCall | ResponsesItemReasoning | ResponsesItemImageGenerationCall | ResponsesItemCodeInterpreterToolCall | ResponsesItemLocalShellCall | ResponsesItemMCPListTools | ResponsesItemMCPApprovalRequest | ResponsesItemMCPApprovalResponse | ResponsesItemMCPToolCall | ResponsesItemCustomToolCallOutput | ResponsesItemCustomToolCall] | None = None
instructions: str | None = None
max_output_tokens: int | None = None
max_tool_calls: int | None = None
metadata: dict[Any, Any] | None = None
model: str | None = None
parallel_tool_calls: bool | None = True
previous_response_id: str | None = None
prompt: object | None = None
prompt_cache_key: str | None = None
reasoning: object | None = None
safety_identifier: str | None = None
service_tier: str | None = 'auto'
store: bool | None = True
stream: bool | None = False
stream_options: object | None = None
temperature: float | None = 1.0
text: object | None = None
tool_choice: str | object | None = None
tools: list[Any] | None = None
top_logprobs: int | None = None
top_p: float | None = 1.0
truncation: str | None = 'disabled'
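
A quick validation sketch showing how this schema is serialized by the route above; the payload values are illustrative only.

```python
# Illustrative only: validate a minimal Responses payload and serialize it the way
# the /responses route does (mode="json", exclude_unset=True), so defaults such as
# store=True or temperature=1.0 are not forwarded unless the caller actually set them.
from app.api.schemas.openai import ResponsesRequest

req = ResponsesRequest(model="gpt-4o-mini", input="Hello", stream=False)
payload = req.model_dump(mode="json", exclude_unset=True)
print(payload)  # {'model': 'gpt-4o-mini', 'input': 'Hello', 'stream': False}
```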
6 changes: 6 additions & 0 deletions app/services/provider_service.py
Expand Up @@ -620,6 +620,12 @@ async def process_request(
payload,
api_key,
)
elif "responses" == endpoint:
result = await adapter.process_responses(
endpoint,
payload,
api_key,
)
elif "images/generations" in endpoint:
# TODO: we only support openai for now
if provider_name != "openai":
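Worth noting: this branch calls `adapter.process_responses` unconditionally, so any provider whose adapter keeps the base-class stub raises `NotImplementedError`, which the route above turns into HTTP 404. A stripped-down, self-contained illustration of that dispatch-plus-fallback pattern follows; the names are placeholders, not the real service code.

```python
# Self-contained illustration of the endpoint dispatch used above (names are placeholders).
import asyncio
from typing import Any

class FakeAdapter:
    async def process_responses(self, endpoint: str, payload: dict[str, Any], api_key: str) -> Any:
        return {"object": "response", "echo": payload.get("input")}

async def process_request(adapter: Any, endpoint: str, payload: dict[str, Any], api_key: str) -> Any:
    if endpoint == "responses":
        return await adapter.process_responses(endpoint, payload, api_key)
    # adapters without support would raise NotImplementedError here, mapped to 404 by the route
    raise NotImplementedError(f"unsupported endpoint: {endpoint}")

print(asyncio.run(process_request(FakeAdapter(), "responses", {"input": "hi"}, "sk-test")))
```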
21 changes: 21 additions & 0 deletions app/services/providers/base.py
@@ -36,6 +36,27 @@ async def process_completion(
) -> Any:
"""Process a completion request"""
pass

async def process_responses(
self,
endpoint: str,
payload: dict[str, Any],
api_key: str,
base_url: str | None = None,
) -> Any:
"""Process a response request"""
# TODO: currently it's openai only
raise NotImplementedError("Process response is not implemented")

async def process_conversations(
self,
endpoint: str,
payload: dict[str, Any],
api_key: str,
base_url: str | None = None,
) -> Any:
"""Process a conversations request"""
raise NotImplementedError("Process conversations is not implemented")

@abstractmethod
async def process_embeddings(
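Because these defaults are concrete rather than abstract, existing adapters keep working unchanged; a provider opts in by overriding the method with the same signature. A hedged sketch of such an override follows — the class, provider, and base URL are placeholders, not code in this repository.

```python
# Placeholder sketch of opting a provider into the Responses endpoint. Only the method
# signature mirrors the base-class stub added in this diff; everything else is illustrative.
from typing import Any

import aiohttp

class ExampleProviderAdapter:  # in the real code this would subclass the provider base class
    provider_name = "example"
    _base_url = "https://api.example-provider.invalid/v1"

    async def process_responses(
        self,
        endpoint: str,
        payload: dict[str, Any],
        api_key: str,
        base_url: str | None = None,
    ) -> Any:
        url = f"{base_url or self._base_url}/{endpoint}"
        headers = {"Authorization": f"Bearer {api_key}"}
        async with aiohttp.ClientSession() as session, session.post(url, headers=headers, json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()
```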
65 changes: 65 additions & 0 deletions app/services/providers/openai_adapter.py
@@ -352,3 +352,68 @@ async def process_embeddings(
"usage": total_usage,
}
return final_response

async def process_responses(
self,
endpoint: str,
payload: dict[str, Any],
api_key: str,
base_url: str | None = None,
) -> Any:
"""Process a response request using OpenAI API"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}

url = f"{base_url or self._base_url}/{endpoint}"

# Check if streaming is requested
streaming = payload.get("stream", False)
if streaming:
# For streaming, return a streaming generator
async def stream_response() -> AsyncGenerator[bytes, None]:
async with (
aiohttp.ClientSession() as session,
session.post(
url, headers=headers, json=payload
) as response,
):
if response.status != HTTPStatus.OK:
error_text = await response.text()
logger.error(
f"Responses Streaming API error for {self.provider_name}: {error_text}"
)
raise ProviderAPIException(
provider_name=self.provider_name,
error_code=response.status,
error_message=error_text,
)

# Stream the response back
async for chunk in response.content:
if chunk:
yield chunk

# Return the streaming generator
return stream_response()
else:
# For non-streaming, use the regular approach
async with (
aiohttp.ClientSession() as session,
session.post(
url, headers=headers, json=payload
) as response,
):
if response.status != HTTPStatus.OK:
error_text = await response.text()
logger.error(
f"Responses API error for {self.provider_name}: {error_text}"
)
raise ProviderAPIException(
provider_name=self.provider_name,
error_code=response.status,
error_message=error_text,
)

return await response.json()
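
For context, a small self-contained consumer showing the two return shapes of `process_responses` — an async generator of raw SSE byte chunks when `stream` is true, otherwise a parsed JSON dict — using the same `inspect.isasyncgen` check the route relies on. The `consume` helper is hypothetical, not part of the diff.

```python
# Hypothetical consumer for the two return shapes of process_responses (not part of the diff).
import inspect
from typing import Any

async def consume(result: Any) -> None:
    if inspect.isasyncgen(result):
        async for chunk in result:  # raw bytes straight from the upstream event stream
            print(chunk.decode("utf-8", errors="replace"), end="")
    else:
        print(result)  # already a parsed JSON dict
```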