Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions IMPLEMENTATION_PLAN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# WebSocket Mode Implementation Plan for OpenAI Responses API

## Overview
Implement WebSocket support for LocalAI's OpenAI API-compatible Responses endpoint, enabling persistent WebSocket connections for long-running, tool-call-heavy agentic workflows.

## Technical Requirements

### 1. WebSocket Endpoint
- **Endpoint**: `ws://<host>:<port>/v1/responses`
- **Upgrade**: HTTP upgrade from POST /v1/responses when `Upgrade: websocket` header is present

### 2. Message Types (Client → Server)

#### response.create (Initial Turn)
```json
{
"type": "response.create",
"model": "gpt-4o",
"store": false,
"input": [...],
"tools": []
}
```

#### response.create with Continuation (Subsequent Turns)
```json
{
"type": "response.create",
"model": "gpt-4o",
"store": false,
"previous_response_id": "resp_123",
"input": [...],
"tools": []
}
```

### 3. Response Events (Server → Client)

1. **response.created** - Response object created
2. **response.progress** - Incremental output
3. **response.function_call_arguments.delta** - Streaming function arguments
4. **response.function_call_arguments.done** - Function call complete
5. **response.done** - Final response

### 4. Connection Management
- Track active connections with 60-minute timeout
- Connection-local cache for responses (when store=false)
- One in-flight response at a time per connection

### 5. Error Handling
- `previous_response_not_found` (400)
- `websocket_connection_limit_reached` (400)

## Implementation Steps

### Step 1: Add WebSocket Schema Types
- Add WebSocket message types to `core/schema/openresponses.go`
- Add connection-related types

### Step 2: Add WebSocket Route
- Modify `core/http/routes/openresponses.go` to handle WebSocket upgrade
- Add GET /v1/responses WebSocket endpoint

### Step 3: Create WebSocket Handler
- Create `core/http/endpoints/openresponses/websocket.go`
- Implement connection handling
- Implement message parsing
- Implement event streaming

### Step 4: Add Connection Store
- Implement connection management in store
- Add 60-minute timeout
- Add connection-local cache

## Files to Modify/Create
1. `core/schema/openresponses.go` - Add WebSocket types
2. `core/http/routes/openresponses.go` - Add WebSocket route
3. `core/http/endpoints/openresponses/websocket.go` - New WebSocket handler (create)
4. `core/http/endpoints/openresponses/store.go` - Add connection management
6 changes: 6 additions & 0 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type RunCMD struct {
BackendGalleries string `env:"LOCALAI_BACKEND_GALLERIES,BACKEND_GALLERIES" help:"JSON list of backend galleries" group:"backends" default:"${backends}"`
Galleries string `env:"LOCALAI_GALLERIES,GALLERIES" help:"JSON list of galleries" group:"models" default:"${galleries}"`
AutoloadGalleries bool `env:"LOCALAI_AUTOLOAD_GALLERIES,AUTOLOAD_GALLERIES" group:"models" default:"true"`
BackendImagesReleaseTag string `env:"LOCALAI_BACKEND_IMAGES_RELEASE_TAG,BACKEND_IMAGES_RELEASE_TAG" help:"Fallback release tag for backend images" group:"backends" default:"latest"`
BackendImagesBranchTag string `env:"LOCALAI_BACKEND_IMAGES_BRANCH_TAG,BACKEND_IMAGES_BRANCH_TAG" help:"Fallback branch tag for backend images" group:"backends" default:"master"`
BackendDevSuffix string `env:"LOCALAI_BACKEND_DEV_SUFFIX,BACKEND_DEV_SUFFIX" help:"Development suffix for backend images" group:"backends" default:"development"`
AutoloadBackendGalleries bool `env:"LOCALAI_AUTOLOAD_BACKEND_GALLERIES,AUTOLOAD_BACKEND_GALLERIES" group:"backends" default:"true"`
PreloadModels string `env:"LOCALAI_PRELOAD_MODELS,PRELOAD_MODELS" help:"A List of models to apply in JSON at start" group:"models"`
Models []string `env:"LOCALAI_MODELS,MODELS" help:"A List of model configuration URLs to load" group:"models"`
Expand Down Expand Up @@ -102,6 +105,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
system.WithBackendSystemPath(r.BackendsSystemPath),
system.WithModelPath(r.ModelsPath),
system.WithBackendPath(r.BackendsPath),
system.WithBackendImagesReleaseTag(r.BackendImagesReleaseTag),
system.WithBackendImagesBranchTag(r.BackendImagesBranchTag),
system.WithBackendDevSuffix(r.BackendDevSuffix),
)
if err != nil {
return err
Expand Down
41 changes: 18 additions & 23 deletions core/gallery/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
Expand All @@ -25,33 +24,29 @@ const (
runFile = "run.sh"
)

// Environment variables for configurable fallback URI patterns
// Default fallback tag values
const (
// Default fallback tag values
defaultLatestTag = "latest"
defaultMasterTag = "master"
defaultDevSuffix = "development"

// Environment variable names
envLatestTag = "LOCALAI_BACKEND_IMAGES_RELEASE_TAG"
envMasterTag = "LOCALAI_BACKEND_IMAGES_BRANCH_TAG"
envDevSuffix = "LOCALAI_BACKEND_DEV_SUFFIX"
)

// getFallbackTagValues returns the configurable fallback tag values from environment variables
func getFallbackTagValues() (latestTag, masterTag, devSuffix string) {
latestTag = os.Getenv(envLatestTag)
masterTag = os.Getenv(envMasterTag)
devSuffix = os.Getenv(envDevSuffix)

// Use defaults if environment variables are not set
if latestTag == "" {
// getFallbackTagValues returns the configurable fallback tag values from SystemState
func getFallbackTagValues(systemState *system.SystemState) (latestTag, masterTag, devSuffix string) {
// Use SystemState fields if set, otherwise use defaults
if systemState.BackendImagesReleaseTag != "" {
latestTag = systemState.BackendImagesReleaseTag
} else {
latestTag = defaultLatestTag
}
if masterTag == "" {
if systemState.BackendImagesBranchTag != "" {
masterTag = systemState.BackendImagesBranchTag
} else {
masterTag = defaultMasterTag
}
if devSuffix == "" {
if systemState.BackendDevSuffix != "" {
devSuffix = systemState.BackendDevSuffix
} else {
devSuffix = defaultDevSuffix
}

Expand Down Expand Up @@ -172,8 +167,8 @@ func InstallBackendFromGallery(ctx context.Context, galleries []config.Gallery,
}

func InstallBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, config *GalleryBackend, downloadStatus func(string, string, string, float64)) error {
// Get configurable fallback tag values from environment variables
latestTag, masterTag, devSuffix := getFallbackTagValues()
// Get configurable fallback tag values from SystemState
latestTag, masterTag, devSuffix := getFallbackTagValues(systemState)

// Create base path if it doesn't exist
err := os.MkdirAll(systemState.Backend.BackendsPath, 0750)
Expand Down Expand Up @@ -225,7 +220,7 @@ func InstallBackend(ctx context.Context, systemState *system.SystemState, modelL
}

// Try fallback: replace latestTag + "-" with masterTag + "-" in the URI
fallbackURI := strings.Replace(string(config.URI), latestTag + "-", masterTag + "-", 1)
fallbackURI := strings.Replace(string(config.URI), latestTag+"-", masterTag+"-", 1)
if fallbackURI != string(config.URI) {
xlog.Debug("Trying fallback URI", "original", config.URI, "fallback", fallbackURI)
if err := downloader.URI(fallbackURI).DownloadFileWithContext(ctx, backendPath, "", 1, 1, downloadStatus); err == nil {
Expand All @@ -234,7 +229,7 @@ func InstallBackend(ctx context.Context, systemState *system.SystemState, modelL
} else {
// Try another fallback: add "-" + devSuffix suffix to the backend name
// For example: master-gpu-nvidia-cuda-13-ace-step -> master-gpu-nvidia-cuda-13-ace-step-development
if !strings.Contains(fallbackURI, "-" + devSuffix) {
if !strings.Contains(fallbackURI, "-"+devSuffix) {
// Extract backend name from URI and add -development
parts := strings.Split(fallbackURI, "-")
if len(parts) >= 2 {
Expand Down Expand Up @@ -441,7 +436,7 @@ func ListSystemBackends(systemState *system.SystemState) (SystemBackends, error)

metaMap[dir] = metadata

// Concrete backend entry
// Concrete-backend entry
if _, err := os.Stat(run); err == nil {
backends[dir] = SystemBackend{
Name: dir,
Expand Down
12 changes: 12 additions & 0 deletions core/http/routes/openresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ func RegisterOpenResponsesRoutes(app *echo.Echo,
cancelResponseHandler := openresponses.CancelResponseEndpoint()
app.POST("/v1/responses/:id/cancel", cancelResponseHandler, middleware.TraceMiddleware(application))
app.POST("/responses/:id/cancel", cancelResponseHandler, middleware.TraceMiddleware(application))

// WebSocket endpoint for OpenAI Responses API WebSocket Mode
websocketHandler := openresponses.WebSocketEndpoint(
application.ModelConfigLoader(),
application.ModelLoader(),
application.TemplatesEvaluator(),
application.ApplicationConfig(),
)

// WebSocket at /v1/responses (GET method for upgrade)
app.GET("/v1/responses", websocketHandler, middleware.TraceMiddleware(application))
app.GET("/responses", websocketHandler, middleware.TraceMiddleware(application))
}

// setOpenResponsesRequestContext sets up the context and cancel function for Open Responses requests
Expand Down
70 changes: 70 additions & 0 deletions core/schema/openresponses.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package schema

import (
"time"
"context"
)

Expand All @@ -17,7 +18,7 @@
// OpenResponsesRequest represents a request to the Open Responses API
// https://www.openresponses.org/specification
type OpenResponsesRequest struct {
Model string `json:"model"`

Check failure on line 21 in core/schema/openresponses.go

View workflow job for this annotation

GitHub Actions / tests-apple (1.25.x)

other declaration of ORWebSocketMessage

Check failure on line 21 in core/schema/openresponses.go

View workflow job for this annotation

GitHub Actions / tests-linux (1.25.x)

other declaration of ORWebSocketMessage
Input interface{} `json:"input"` // string or []ORItemParam
Tools []ORFunctionTool `json:"tools,omitempty"`
ToolChoice interface{} `json:"tool_choice,omitempty"` // "auto"|"required"|"none"|{type:"function",name:"..."}
Expand Down Expand Up @@ -309,3 +310,72 @@
Logprobs: orLogprobs, // REQUIRED - must always be present as array (empty if none)
}
}

// WebSocket message types for Open Responses API WebSocket Mode
// https://developers.openai.com/api/docs/guides/websocket-mode

// ORWebSocketMessage represents a WebSocket message (client -> server or server -> client)
type ORWebSocketMessage struct {
Type string `json:"type"` // response.create, response.created, response.progress, etc.
}

// ORWebSocketClientMessage represents a client message to the WebSocket endpoint
type ORWebSocketClientMessage struct {
Type string `json:"type"` // "response.create"
Model string `json:"model,omitempty"`
Input interface{} `json:"input,omitempty"`

Check failure on line 326 in core/schema/openresponses.go

View workflow job for this annotation

GitHub Actions / tests-apple (1.25.x)

ORWebSocketMessage redeclared in this block

Check failure on line 326 in core/schema/openresponses.go

View workflow job for this annotation

GitHub Actions / tests-linux (1.25.x)

ORWebSocketMessage redeclared in this block
Tools []ORFunctionTool `json:"tools,omitempty"`
ToolChoice interface{} `json:"tool_choice,omitempty"`
MaxOutputTokens *int `json:"max_output_tokens,omitempty"`
Temperature *float64 `json:"temperature,omitempty"`
TopP *float64 `json:"top_p,omitempty"`
Truncation string `json:"truncation,omitempty"`
Instructions string `json:"instructions,omitempty"`
Reasoning *ORReasoningParam `json:"reasoning,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
PreviousResponseID string `json:"previous_response_id,omitempty"`
Store *bool `json:"store,omitempty"`
TextFormat interface{} `json:"text_format,omitempty"`
ServiceTier string `json:"service_tier,omitempty"`
AllowedTools []string `json:"allowed_tools,omitempty"`
ParallelToolCalls *bool `json:"parallel_tool_calls,omitempty"`
PresencePenalty *float64 `json:"presence_penalty,omitempty"`
FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"`
TopLogprobs *int `json:"top_logprobs,omitempty"`
MaxToolCalls *int `json:"max_tool_calls,omitempty"`
Generate *bool `json:"generate,omitempty"` // If false, just warm up and return response_id
}

// ORWebSocketServerEvent represents a server event to the WebSocket
type ORWebSocketServerEvent struct {
Type string `json:"type"` // response.created, response.progress, etc.
ResponseID string `json:"response_id,omitempty"`
Response *ORResponseResource `json:"response,omitempty"`
OutputIndex *int `json:"output_index,omitempty"`
Output []ORItemField `json:"output,omitempty"`
ItemID string `json:"item_id,omitempty"`
Item *ORItemField `json:"item,omitempty"`
ContentIndex *int `json:"content_index,omitempty"`
Delta *string `json:"delta,omitempty"`
Text *string `json:"text,omitempty"`
CallID string `json:"call_id,omitempty"`
Arguments *string `json:"arguments,omitempty"`
Error *ORError `json:"error,omitempty"`
}

// ORWebSocketError represents a WebSocket error event
type ORWebSocketError struct {
Type string `json:"type"` // error
Code string `json:"code,omitempty"` // previous_response_not_found, websocket_connection_limit_reached, etc.
Message string `json:"message"`
Param string `json:"param,omitempty"`
}

// ConnectionLocalCacheEntry represents a cached response in connection-local storage
type ConnectionLocalCacheEntry struct {
ResponseID string
Response *ORResponseResource
Input *ORWebSocketClientMessage
CachedAt time.Time
ExpiresAt *time.Time
}
23 changes: 23 additions & 0 deletions pkg/system/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type SystemState struct {
VRAM uint64

systemCapabilities string

// Backend image fallback tag configuration
BackendImagesReleaseTag string
BackendImagesBranchTag string
BackendDevSuffix string
}

type SystemStateOptions func(*SystemState)
Expand All @@ -43,6 +48,24 @@ func WithModelPath(path string) SystemStateOptions {
}
}

func WithBackendImagesReleaseTag(tag string) SystemStateOptions {
return func(s *SystemState) {
s.BackendImagesReleaseTag = tag
}
}

func WithBackendImagesBranchTag(tag string) SystemStateOptions {
return func(s *SystemState) {
s.BackendImagesBranchTag = tag
}
}

func WithBackendDevSuffix(suffix string) SystemStateOptions {
return func(s *SystemState) {
s.BackendDevSuffix = suffix
}
}

func GetSystemState(opts ...SystemStateOptions) (*SystemState, error) {
state := &SystemState{}
for _, opt := range opts {
Expand Down
Loading