diff --git a/README.md b/README.md index bd69d6a..6d1532a 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,9 @@ cq proxy status --port 19280 cq proxy install # Install the user launch agent cq proxy uninstall # Remove the user launch agent cq proxy restart # Restart the user launch agent +cq proxy pin # Show the pinned Claude account, if any +cq proxy pin +cq proxy pin --clear # Clear the pinned Claude account ``` The proxy config is stored at `$XDG_CONFIG_HOME/cq/proxy.json`, or `~/.config/cq/proxy.json` when `XDG_CONFIG_HOME` is not set. If it does not exist, `cq proxy start` creates it with a random local token. @@ -74,6 +77,7 @@ Important `proxy.json` fields: | `claude_upstream` | `https://api.anthropic.com` | Anthropic API upstream. | | `codex_upstream` | `https://chatgpt.com/backend-api/codex` | Codex backend upstream. | | `local_token` | generated | Required bearer token for local proxy requests. | +| `pinned_claude_account` | unset | Optional Claude account email or UUID to force proxy selection. | | `headroom` | `false` | Enables the headroom compression bridge when true. | | `headroom_mode` | `cache` | Compression strategy when set; valid values are `cache` and `token`. | @@ -85,7 +89,8 @@ Important `proxy.json` fields: cq models refresh # Refresh registry data and publish caches cq models list # List active registry models cq models list --json # JSON model list -cq models list --provider codex # Filter by provider: codex or anthropic +cq models list --provider codex # Filter by provider +cq models list --provider anthropic cq models overlay add --provider codex --id gpt-5.5 --clone-from gpt-5.4 cq models overlay remove --provider codex --id gpt-5.5 @@ -100,7 +105,7 @@ A registry refresh publishes provider-specific caches where supported: - Claude Code model capabilities: `$CLAUDE_CONFIG_DIR/cache/model-capabilities.json`, or `~/.claude/cache/model-capabilities.json`. - Claude Code picker options: `additionalModelOptionsCache` in `~/.claude.json`. -Claude Code still needs `ANTHROPIC_BASE_URL` pointed at the running proxy for runtime API traffic. The `/model` picker is populated from Claude Code config/cache files, so `cq models refresh` and the proxy publish registry-backed picker entries there. +Claude Code still needs `ANTHROPIC_BASE_URL` pointed at the running proxy for runtime API traffic. The `/model` picker is populated from Claude Code config/cache files, so `cq models refresh` and the proxy publish registry-backed picker entries there. The proxy also re-publishes picker entries automatically when it detects drift. ## Background Agent @@ -122,7 +127,7 @@ For each provider, cq displays remaining quota as a percentage bar, pace indicat | Environment variable | Default | Description | |----------------------|---------|-------------| -| `CQ_TTL` | `30s` | Quota cache duration, e.g. `1m`, `5m`. | +| `CQ_TTL` | `30` | Quota cache duration in seconds, e.g. `60`, `300`. | | `XDG_CONFIG_HOME` | `~/.config` | Base directory for cq config files. | | `XDG_CACHE_HOME` | platform cache dir | Base directory for cq quota cache files. | | `CLAUDE_CONFIG_DIR` | `~/.claude` | Claude Code config directory for model capability cache publication. | @@ -138,6 +143,8 @@ For each provider, cq displays remaining quota as a percentage bar, pace indicat | `~/.claude/.credentials.json` | Claude credentials read/written for account management. | | `~/.claude.json` | Claude Code global config; cq writes managed model picker entries. | | `~/.codex/models_cache.json` | Codex model cache populated by registry refresh. | +| `~/Library/Logs/cq/proxy.log` | macOS launch agent log for the proxy service. | +| `~/Library/Logs/cq/refresh.log` | macOS launch agent log for quota refresh. | ## Licence diff --git a/internal/proxy/codex_compact.go b/internal/proxy/codex_compact.go new file mode 100644 index 0000000..44aa9b9 --- /dev/null +++ b/internal/proxy/codex_compact.go @@ -0,0 +1,133 @@ +package proxy + +import ( + "bytes" + "fmt" + "io" + "net/http" + "os" +) + +// handleCodexCompactResponsesRoute handles POST /v1/responses/compact. +func (s *Server) handleCodexCompactResponsesRoute(w http.ResponseWriter, r *http.Request) { + if isWebSocketUpgrade(r) { + rejectCodexCompactWebSocket(w, codexCompactResponsesPath) + return + } + s.handleNativeCodexCompact(w, r, codexCompactResponsesPath) +} + +// handleCodexCompactResponsesGetRoute handles GET /v1/responses/compact. +func (s *Server) handleCodexCompactResponsesGetRoute(w http.ResponseWriter, r *http.Request) { + handleCodexCompactGet(w, r, codexCompactResponsesPath) +} + +// handleLegacyCodexCompactResponsesRoute handles POST /responses/compact. +func (s *Server) handleLegacyCodexCompactResponsesRoute(w http.ResponseWriter, r *http.Request) { + if isWebSocketUpgrade(r) { + rejectCodexCompactWebSocket(w, legacyCodexCompactResponsesPath) + return + } + s.handleNativeCodexCompact(w, r, legacyCodexCompactResponsesPath) +} + +// handleLegacyCodexCompactResponsesGetRoute handles GET /responses/compact. +func (s *Server) handleLegacyCodexCompactResponsesGetRoute(w http.ResponseWriter, r *http.Request) { + handleCodexCompactGet(w, r, legacyCodexCompactResponsesPath) +} + +func handleCodexCompactGet(w http.ResponseWriter, r *http.Request, requestPath string) { + if isWebSocketUpgrade(r) { + rejectCodexCompactWebSocket(w, requestPath) + return + } + w.Header().Set("Allow", http.MethodPost) + writeError(w, http.StatusMethodNotAllowed, "invalid_request_error", fmt.Sprintf("%s only supports POST", requestPath)) +} + +func rejectCodexCompactWebSocket(w http.ResponseWriter, requestPath string) { + writeError(w, http.StatusBadRequest, "invalid_request_error", + fmt.Sprintf("websocket transport is not supported on %s; use %s", requestPath, codexAppServerPath)) +} + +// handleNativeCodexCompact forwards a compact request to the upstream +// /responses/compact endpoint using CodexTransport for auth injection. +// No headroom compression is applied — compact requests already represent +// a summarisation boundary; compressing them further is counterproductive. +func (s *Server) handleNativeCodexCompact(w http.ResponseWriter, r *http.Request, requestPath string) { + if s.CodexTransport == nil { + writeError(w, http.StatusServiceUnavailable, "api_error", "no codex accounts configured") + return + } + + // Buffer request body. + body, err := io.ReadAll(io.LimitReader(r.Body, maxRequestBody+1)) + r.Body.Close() + if err != nil { + writeError(w, http.StatusBadRequest, "invalid_request_error", "failed to read request body") + return + } + if len(body) > maxRequestBody { + writeError(w, http.StatusRequestEntityTooLarge, "invalid_request_error", "request body exceeds 10 MiB") + return + } + + model := extractModel(body) + fmt.Fprintf(os.Stderr, "cq: route POST %s model=%q provider=codex (native compact)\n", requestPath, model) + + // Build upstream request targeting /responses/compact (no headroom applied). + upstreamURL := s.Config.CodexUpstream + "/responses/compact" + upReq, err := http.NewRequestWithContext(r.Context(), http.MethodPost, upstreamURL, bytes.NewReader(body)) + if err != nil { + writeError(w, http.StatusInternalServerError, "api_error", fmt.Sprintf("create upstream request: %v", err)) + return + } + upReq.ContentLength = int64(len(body)) + upReq.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(body)), nil + } + + // Forward all original headers; transport will override auth. + for key, vals := range r.Header { + for _, v := range vals { + upReq.Header.Add(key, v) + } + } + if upReq.Header.Get("Content-Type") == "" { + upReq.Header.Set("Content-Type", "application/json") + } + + // Transport handles auth injection and account rotation. + resp, err := s.CodexTransport.RoundTrip(upReq) + if err != nil { + writeError(w, http.StatusBadGateway, "api_error", fmt.Sprintf("codex upstream error: %v", err)) + return + } + defer resp.Body.Close() + + fmt.Fprintf(os.Stderr, "cq: proxy POST %s → %d (codex native compact)\n", upstreamURL, resp.StatusCode) + + // Forward response headers, status, and body. + for key, vals := range resp.Header { + for _, v := range vals { + w.Header().Add(key, v) + } + } + w.WriteHeader(resp.StatusCode) + + if f, ok := w.(http.Flusher); ok { + buf := make([]byte, 4096) + for { + n, readErr := resp.Body.Read(buf) + if n > 0 { + w.Write(buf[:n]) + f.Flush() + } + if readErr != nil { + break + } + } + } else { + io.Copy(w, resp.Body) + } +} diff --git a/internal/proxy/codex_compact_test.go b/internal/proxy/codex_compact_test.go new file mode 100644 index 0000000..9259b89 --- /dev/null +++ b/internal/proxy/codex_compact_test.go @@ -0,0 +1,282 @@ +package proxy + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + codex "github.com/jacobcxdev/cq/internal/provider/codex" +) + +// TestServer_CodexCompactPaths_ForwardToCompactEndpointWithCodexAuth verifies +// that both /v1/responses/compact and /responses/compact forward to upstream +// /responses/compact with Codex auth injected and the response body proxied. +func TestServer_CodexCompactPaths_ForwardToCompactEndpointWithCodexAuth(t *testing.T) { + tests := []struct { + name string + requestPath string + }{ + {"canonical path", codexCompactResponsesPath}, + {"legacy path", legacyCodexCompactResponsesPath}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + var gotPath, gotAuth, gotAcctID string + inner := roundTripFunc(func(r *http.Request) (*http.Response, error) { + gotPath = r.URL.Path + gotAuth = r.Header.Get("Authorization") + gotAcctID = r.Header.Get("ChatGPT-Account-ID") + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"object":"response.compact","output":"compact result"}`)), + }, nil + }) + + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com", + LocalToken: "tok", + }, + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{ + AccessToken: "codex-tok", + AccountID: "acct-1", + }}, + Inner: inner, + }, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + body := `{"model":"gpt-5.4","previous_response_id":"resp_abc"}` + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, tt.requestPath, strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if gotPath != "/responses/compact" { + t.Errorf("upstream path = %q, want /responses/compact", gotPath) + } + if gotAuth != "Bearer codex-tok" { + t.Errorf("upstream auth = %q, want Bearer codex-tok", gotAuth) + } + if gotAcctID != "acct-1" { + t.Errorf("upstream account-id = %q, want acct-1", gotAcctID) + } + if !strings.Contains(w.Body.String(), "output") { + t.Errorf("response body should contain output: %s", w.Body.String()) + } + }) + } +} + +// TestServer_CodexCompact_NoTransport verifies that POST /responses/compact +// with nil CodexTransport returns 503. +func TestServer_CodexCompact_NoTransport(t *testing.T) { + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com", + LocalToken: "tok", + }, + CodexTransport: nil, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/responses/compact", strings.NewReader(`{"model":"gpt-5.4"}`)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("status = %d, want 503", w.Code) + } +} + +// TestServer_CodexCompact_RejectsWebsocket verifies that POST /responses/compact +// with a WebSocket upgrade header returns 400 mentioning codexAppServerPath. +func TestServer_CodexCompact_RejectsWebsocket(t *testing.T) { + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/responses/compact", nil) + req.Header.Set("Connection", "Upgrade") + req.Header.Set("Upgrade", "websocket") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", w.Code) + } + if !strings.Contains(w.Body.String(), codexAppServerPath) { + t.Errorf("body = %q, want mention of %s", w.Body.String(), codexAppServerPath) + } +} + +// TestServer_CodexCompact_RejectsGetWebsocket verifies that real WebSocket +// upgrade requests on compact paths hit the explicit compact rejection. +func TestServer_CodexCompact_RejectsGetWebsocket(t *testing.T) { + tests := []struct { + name string + path string + }{ + {"canonical path", codexCompactResponsesPath}, + {"legacy path", legacyCodexCompactResponsesPath}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, tt.path, nil) + req.Header.Set("Connection", "Upgrade") + req.Header.Set("Upgrade", "websocket") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400, body: %s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "invalid_request_error") { + t.Errorf("body = %q, want invalid_request_error", w.Body.String()) + } + if !strings.Contains(w.Body.String(), codexAppServerPath) { + t.Errorf("body = %q, want mention of %s", w.Body.String(), codexAppServerPath) + } + }) + } +} + +// TestServer_CodexCompact_GetMethodNotAllowed verifies that non-upgrade GET +// requests on compact paths do not fall through to the authenticated proxy. +func TestServer_CodexCompact_GetMethodNotAllowed(t *testing.T) { + tests := []struct { + name string + path string + }{ + {"canonical path", codexCompactResponsesPath}, + {"legacy path", legacyCodexCompactResponsesPath}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, tt.path, nil) + handler.ServeHTTP(w, req) + + if w.Code != http.StatusMethodNotAllowed { + t.Fatalf("status = %d, want 405, body: %s", w.Code, w.Body.String()) + } + if got := w.Header().Get("Allow"); got != http.MethodPost { + t.Errorf("Allow = %q, want %s", got, http.MethodPost) + } + }) + } +} + +// TestServer_CodexCompact_DoesNotUseHeadroom verifies that a native compact +// request does not invoke the headroom bridge, and the upstream receives the +// original request body unmodified. +func TestServer_CodexCompact_DoesNotUseHeadroom(t *testing.T) { + bridgeCalled := false + + var gotBody []byte + inner := roundTripFunc(func(r *http.Request) (*http.Response, error) { + gotBody, _ = io.ReadAll(r.Body) + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"object":"response.compact"}`)), + }, nil + }) + + bridge := fakeBridgeRaw(t, func(reqBytes []byte) []byte { + var req headroomResponsesRequest + if err := json.Unmarshal(reqBytes, &req); err == nil && req.Operation == "compress_responses" { + bridgeCalled = true + } + return nil + }) + + originalBody := `{"model":"gpt-5.4","previous_response_id":"resp_abc"}` + + srv := &Server{ + Config: &Config{ + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{AccessToken: "codex-tok"}}, + Inner: inner, + }, + Headroom: bridge, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/responses/compact", strings.NewReader(originalBody)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if bridgeCalled { + t.Error("headroom bridge compress_responses should not be called for compact requests") + } + if string(gotBody) != originalBody { + t.Errorf("upstream body = %s, want original %s", gotBody, originalBody) + } +} diff --git a/internal/proxy/codex_handler_test.go b/internal/proxy/codex_handler_test.go index a912a6c..715d9e2 100644 --- a/internal/proxy/codex_handler_test.go +++ b/internal/proxy/codex_handler_test.go @@ -681,6 +681,67 @@ func TestServer_ClaudeCountTokensStillRoutesToClaude(t *testing.T) { } } +func TestServer_ClaudeCodeOpenAIRequestsUseResponsesNotCompactEndpoint(t *testing.T) { + var gotPath string + + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.test", + CodexUpstream: "https://codex.test", + LocalToken: "tok", + }, + Transport: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + t.Fatal("claude upstream should not be called for Codex model") + return nil, fmt.Errorf("unexpected claude upstream request") + }), + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{ + account: &codex.CodexAccount{AccessToken: "codex-tok"}, + }, + Inner: roundTripFunc(func(r *http.Request) (*http.Response, error) { + gotPath = r.URL.Path + if got := r.Header.Get("Authorization"); got != "Bearer codex-tok" { + t.Fatalf("upstream auth = %q, want Bearer codex-tok", got) + } + body := strings.Join([]string{ + `data: {"type":"response.created","response":{"id":"resp_123"}}`, + `data: {"type":"response.output_item.added","item":{"type":"message","role":"assistant"}}`, + `data: {"type":"response.content_part.added","part":{"type":"output_text"}}`, + `data: {"type":"response.output_text.delta","delta":"ok"}`, + `data: {"type":"response.content_part.done","part":{"type":"output_text"}}`, + `data: {"type":"response.output_item.done","item":{"type":"message"}}`, + `data: {"type":"response.completed","response":{"status":"completed","usage":{"input_tokens":1,"output_tokens":1}}}`, + `data: [DONE]`, + }, "\n\n") + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: io.NopCloser(strings.NewReader(body)), + }, nil + }), + }, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + + w := httptest.NewRecorder() + body := `{"model":"gpt-5.4","max_tokens":64,"messages":[{"role":"user","content":"Summarise this conversation."}]}` + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer tok") + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if gotPath != "/responses" { + t.Fatalf("upstream path = %q, want /responses", gotPath) + } +} + func TestServer_CodexRouting(t *testing.T) { // Claude upstream — should NOT be hit for Codex model. claudeUpstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { diff --git a/internal/proxy/server.go b/internal/proxy/server.go index e899e82..c8b6f1e 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -25,9 +25,11 @@ import ( const maxRequestBody = 10 << 20 // 10 MiB const ( - codexResponsesPath = "/v1/responses" - legacyCodexResponsesPath = "/responses" - codexAppServerPath = "/app-server" + codexResponsesPath = "/v1/responses" + legacyCodexResponsesPath = "/responses" + codexCompactResponsesPath = "/v1/responses/compact" + legacyCodexCompactResponsesPath = "/responses/compact" + codexAppServerPath = "/app-server" ) // RegistryRefresher is the interface for triggering a registry refresh. @@ -112,6 +114,10 @@ func (s *Server) handler() (http.Handler, error) { mux.HandleFunc("POST /v1/registry/refresh", s.handleRegistryRefresh) mux.HandleFunc(codexResponsesPath, s.handleCodexResponsesRoute) mux.HandleFunc(legacyCodexResponsesPath, s.handleLegacyCodexResponsesRoute) + mux.HandleFunc("GET "+codexCompactResponsesPath, s.handleCodexCompactResponsesGetRoute) + mux.HandleFunc("GET "+legacyCodexCompactResponsesPath, s.handleLegacyCodexCompactResponsesGetRoute) + mux.HandleFunc("POST "+codexCompactResponsesPath, s.handleCodexCompactResponsesRoute) + mux.HandleFunc("POST "+legacyCodexCompactResponsesPath, s.handleLegacyCodexCompactResponsesRoute) mux.HandleFunc(codexAppServerPath, s.handleCodexAppServerRoute) mux.HandleFunc("/", s.proxyHandler(upstream)) return mux, nil