diff --git a/README.md b/README.md index 2806585..9ccd942 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,8 @@ cq proxy pin cq proxy pin --clear # Clear the pinned Claude account ``` +Use `cq proxy pin --clear` to clear a pin. `clear` and `remove` are reserved words, not valid literal pin values. + The proxy config is stored at `$XDG_CONFIG_HOME/cq/proxy.json`, or `~/.config/cq/proxy.json` when `XDG_CONFIG_HOME` is not set. If it does not exist, `cq proxy start` creates it with a random local token. Important `proxy.json` fields: @@ -88,9 +90,12 @@ Important `proxy.json` fields: | `codex_upstream` | `https://chatgpt.com/backend-api/codex` | Codex backend upstream. | | `local_token` | generated | Required bearer token for local proxy requests. | | `pinned_claude_account` | unset | Optional Claude account email or UUID to force proxy selection. | +| `diagnostics_log` | unset | Optional JSONL routing diagnostics log path for advanced local debugging. | | `headroom` | `false` | Enables the headroom compression bridge when true. | | `headroom_mode` | `cache` | Compression strategy when set; valid values are `cache` and `token`. | +Routing diagnostics are disabled by default. To enable them, set `diagnostics_log` in `proxy.json` to a local file path and restart the proxy. The log is append-only JSONL containing redacted route metadata such as method, path, provider, route kind, status, latency, selected-account hint, failover flag, and safe error code. It is intended for advanced local debugging and UAT, and enabling it does not change routing policy. + ## Model Registry `cq models` manages the local model registry used by the proxy, Claude Code model caches, and Codex model cache integration. diff --git a/cmd/cq/proxy.go b/cmd/cq/proxy.go index e0c93ac..70573e2 100644 --- a/cmd/cq/proxy.go +++ b/cmd/cq/proxy.go @@ -57,6 +57,16 @@ func runProxyPin(args []string) error { return fmt.Errorf("load config: %w", err) } + // cq proxy pin (no args) — show current pin + if len(args) == 0 { + if cfg.PinnedClaudeAccount == "" { + fmt.Println("No pin is active. All Claude requests use automatic account selection.") + } else { + fmt.Printf("Pinned Claude account: %s\n", cfg.PinnedClaudeAccount) + } + return nil + } + // cq proxy pin --clear if len(args) == 1 && args[0] == "--clear" { cfg.PinnedClaudeAccount = "" @@ -70,25 +80,30 @@ func runProxyPin(args []string) error { // cq proxy pin if len(args) == 1 { - cfg.PinnedClaudeAccount = args[0] + arg := args[0] + lower := strings.ToLower(arg) + + // Reject reserved words that look like commands but aren't flags. + if lower == "clear" || lower == "remove" { + fmt.Fprintf(os.Stderr, "Usage: cq proxy pin [--clear | ]\n") + return fmt.Errorf("reserved word %q is not valid; did you mean --clear?", arg) + } + + // Reject any argument that looks like an unknown flag. + if strings.HasPrefix(arg, "-") { + fmt.Fprintf(os.Stderr, "Usage: cq proxy pin [--clear | ]\n") + return fmt.Errorf("unknown flag %q", arg) + } + + cfg.PinnedClaudeAccount = arg if err := proxy.SaveConfig(cfg); err != nil { return fmt.Errorf("save config: %w", err) } - fmt.Printf("Pinned Claude account set to %q.\n", args[0]) + fmt.Printf("Pinned Claude account set to %q.\n", arg) fmt.Println("A running proxy will pick up the change shortly.") return nil } - // cq proxy pin (no args) — show current pin - if len(args) == 0 { - if cfg.PinnedClaudeAccount == "" { - fmt.Println("No pin is active. All Claude requests use automatic account selection.") - } else { - fmt.Printf("Pinned Claude account: %s\n", cfg.PinnedClaudeAccount) - } - return nil - } - fmt.Fprintf(os.Stderr, "Usage: cq proxy pin [--clear | ]\n") return fmt.Errorf("unexpected arguments") } @@ -310,6 +325,21 @@ func runProxyStart(opts proxyCommandOptions) error { } } + var diagnostics *proxy.DiagnosticsWriter + if cfg.DiagnosticsLog != "" { + diagnostics, err = proxy.OpenDiagnosticsWriter(cfg.DiagnosticsLog) + if err != nil { + fmt.Fprintf(os.Stderr, "cq: diagnostics: %v (continuing without diagnostics)\n", err) + } else { + fmt.Fprintf(os.Stderr, "cq: diagnostics enabled\n") + defer func() { + if err := diagnostics.Close(); err != nil { + fmt.Fprintf(os.Stderr, "cq: diagnostics: close: %v\n", err) + } + }() + } + } + srv := &proxy.Server{ Config: cfg, Selector: selector, @@ -319,6 +349,7 @@ func runProxyStart(opts proxyCommandOptions) error { CodexTransport: codexTransport, CodexUpgradeTransport: codexUpgradeTransport, Headroom: headroom, + Diag: diagnostics, HeadroomMode: resolvedMode, Catalog: catalog, Refresher: proxyRefresher, diff --git a/cmd/cq/proxy_pin_test.go b/cmd/cq/proxy_pin_test.go new file mode 100644 index 0000000..d24618e --- /dev/null +++ b/cmd/cq/proxy_pin_test.go @@ -0,0 +1,195 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" + + "github.com/jacobcxdev/cq/internal/proxy" +) + +// setupPinTest isolates proxy config to a temp dir and optionally seeds an +// existing pin value. Returns the config dir path for inspection. +func setupPinTest(t *testing.T, existingPin string) string { + t.Helper() + dir := t.TempDir() + t.Setenv("XDG_CONFIG_HOME", dir) + + if existingPin != "" { + // Seed a config with the given pin so tests that need a pre-existing + // value can verify it remains unchanged. + cfg, err := proxy.LoadConfig() + if err != nil { + t.Fatalf("seed LoadConfig: %v", err) + } + cfg.PinnedClaudeAccount = existingPin + if err := proxy.SaveConfig(cfg); err != nil { + t.Fatalf("seed SaveConfig: %v", err) + } + } + return filepath.Join(dir, "cq") +} + +// loadPin reads the persisted pin from the proxy config under XDG_CONFIG_HOME. +func loadPin(t *testing.T) string { + t.Helper() + cfg, err := proxy.LoadConfig() + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + return cfg.PinnedClaudeAccount +} + +func TestProxyPin(t *testing.T) { + t.Run("no args no pin configured prints message", func(t *testing.T) { + setupPinTest(t, "") + // No pin is set; runProxyPin(nil) should return nil and print no-pin message. + if err := runProxyPin(nil); err != nil { + t.Fatalf("runProxyPin(nil) returned error: %v", err) + } + }) + + t.Run("no args with pin configured prints pin", func(t *testing.T) { + setupPinTest(t, "pinned@example.com") + if err := runProxyPin(nil); err != nil { + t.Fatalf("runProxyPin(nil) returned error: %v", err) + } + // Pin should remain unchanged. + if got := loadPin(t); got != "pinned@example.com" { + t.Errorf("pin = %q, want %q", got, "pinned@example.com") + } + }) + + t.Run("--clear clears existing pin", func(t *testing.T) { + setupPinTest(t, "user@example.com") + if err := runProxyPin([]string{"--clear"}); err != nil { + t.Fatalf("runProxyPin(--clear) returned error: %v", err) + } + if got := loadPin(t); got != "" { + t.Errorf("pin after --clear = %q, want empty", got) + } + }) + + t.Run("clear (bare word) returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"clear"}) + if err == nil { + t.Fatal("runProxyPin(clear) expected error, got nil") + } + if !strings.Contains(err.Error(), "clear") { + t.Errorf("error %q does not mention 'clear'", err.Error()) + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("remove (bare word) returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"remove"}) + if err == nil { + t.Fatal("runProxyPin(remove) expected error, got nil") + } + if !strings.Contains(err.Error(), "remove") { + t.Errorf("error %q does not mention 'remove'", err.Error()) + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("CLEAR (case-insensitive) returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"CLEAR"}) + if err == nil { + t.Fatal("runProxyPin(CLEAR) expected error, got nil") + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("REMOVE (case-insensitive) returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"REMOVE"}) + if err == nil { + t.Fatal("runProxyPin(REMOVE) expected error, got nil") + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("unknown flag returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"--help"}) + if err == nil { + t.Fatal("runProxyPin(--help) expected error, got nil") + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("other flag-like arg returns error and leaves pin unchanged", func(t *testing.T) { + setupPinTest(t, "user@example.com") + err := runProxyPin([]string{"--unknown"}) + if err == nil { + t.Fatal("runProxyPin(--unknown) expected error, got nil") + } + if got := loadPin(t); got != "user@example.com" { + t.Errorf("pin changed to %q, want %q", got, "user@example.com") + } + }) + + t.Run("valid email sets pin", func(t *testing.T) { + setupPinTest(t, "") + if err := runProxyPin([]string{"new@example.com"}); err != nil { + t.Fatalf("runProxyPin(email) returned error: %v", err) + } + if got := loadPin(t); got != "new@example.com" { + t.Errorf("pin = %q, want %q", got, "new@example.com") + } + }) + + t.Run("UUID-like value sets pin", func(t *testing.T) { + setupPinTest(t, "") + uuid := "550e8400-e29b-41d4-a716-446655440000" + if err := runProxyPin([]string{uuid}); err != nil { + t.Fatalf("runProxyPin(uuid) returned error: %v", err) + } + if got := loadPin(t); got != uuid { + t.Errorf("pin = %q, want %q", got, uuid) + } + }) + + t.Run("multiple args returns usage error", func(t *testing.T) { + setupPinTest(t, "") + err := runProxyPin([]string{"one@example.com", "two@example.com"}) + if err == nil { + t.Fatal("runProxyPin with multiple args expected error, got nil") + } + }) +} + +// TestProxyPinNoConfigDirCreation verifies that read-only operations (show +// current pin) do not fail when XDG_CONFIG_HOME is set to a non-existent path. +// The LoadConfig path will create the directory on first run, so this test +// just verifies no crash occurs on a fresh temp dir with no prior config. +func TestProxyPinFreshConfig(t *testing.T) { + dir := t.TempDir() + // Point at a sub-directory that doesn't exist yet. + configHome := filepath.Join(dir, "new-config") + t.Setenv("XDG_CONFIG_HOME", configHome) + + // LoadConfig will create the dir and generate a default config. + if err := runProxyPin(nil); err != nil { + t.Fatalf("runProxyPin(nil) on fresh config: %v", err) + } + + // Verify the config file was created. + if _, err := os.Stat(filepath.Join(configHome, "cq", "proxy.json")); err != nil { + t.Errorf("proxy.json not created: %v", err) + } +} diff --git a/docs/screenshot.png b/docs/screenshot.png index 75147f4..64d9fb8 100644 Binary files a/docs/screenshot.png and b/docs/screenshot.png differ diff --git a/internal/proxy/codex_compact.go b/internal/proxy/codex_compact.go index 44aa9b9..47e0ca8 100644 --- a/internal/proxy/codex_compact.go +++ b/internal/proxy/codex_compact.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "os" + "time" ) // handleCodexCompactResponsesRoute handles POST /v1/responses/compact. @@ -55,6 +56,28 @@ func rejectCodexCompactWebSocket(w http.ResponseWriter, requestPath string) { // No headroom compression is applied — compact requests already represent // a summarisation boundary; compressing them further is counterproductive. func (s *Server) handleNativeCodexCompact(w http.ResponseWriter, r *http.Request, requestPath string) { + start := time.Now() + var model string + ctx, routeDiag := withRouteDiagnostics(r.Context()) + if wrapped, rec := s.wrapDiagnosticsResponseWriter(w); rec != nil { + w = wrapped + defer func() { + event := RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "codex", + RouteKind: "codex_compact", + Model: model, + StatusCode: rec.statusCode(), + LatencyMS: time.Since(start).Milliseconds(), + Error: rec.diagnosticsError(), + } + event.applyRouteDiagnostics(routeDiag) + s.emitDiagnostics(event) + }() + } + if s.CodexTransport == nil { writeError(w, http.StatusServiceUnavailable, "api_error", "no codex accounts configured") return @@ -72,12 +95,12 @@ func (s *Server) handleNativeCodexCompact(w http.ResponseWriter, r *http.Request return } - model := extractModel(body) + model = extractModel(body) fmt.Fprintf(os.Stderr, "cq: route POST %s model=%q provider=codex (native compact)\n", requestPath, model) // Build upstream request targeting /responses/compact (no headroom applied). upstreamURL := s.Config.CodexUpstream + "/responses/compact" - upReq, err := http.NewRequestWithContext(r.Context(), http.MethodPost, upstreamURL, bytes.NewReader(body)) + upReq, err := http.NewRequestWithContext(ctx, http.MethodPost, upstreamURL, bytes.NewReader(body)) if err != nil { writeError(w, http.StatusInternalServerError, "api_error", fmt.Sprintf("create upstream request: %v", err)) return diff --git a/internal/proxy/codex_selector.go b/internal/proxy/codex_selector.go index a9177f3..f6a3539 100644 --- a/internal/proxy/codex_selector.go +++ b/internal/proxy/codex_selector.go @@ -163,3 +163,10 @@ func codexAcctIdentifier(a *codex.CodexAccount) string { } return a.AccessToken } + +func codexAccountHint(a *codex.CodexAccount) string { + if a == nil { + return "" + } + return redactedAccountHint("codex", a.AccountID, a.Email, a.RecordKey, a.AccessToken) +} diff --git a/internal/proxy/codex_transport.go b/internal/proxy/codex_transport.go index fda8a05..2694c1a 100644 --- a/internal/proxy/codex_transport.go +++ b/internal/proxy/codex_transport.go @@ -47,6 +47,7 @@ func (t *CodexTokenTransport) RoundTrip(req *http.Request) (*http.Response, erro if err != nil { return nil, err } + noteRouteAccount(req.Context(), codexAccountHint(acct), false) resp, err := t.doRequest(req, acct) if err != nil { @@ -190,6 +191,7 @@ func (t *CodexTokenTransport) handleUnauthorized(req *http.Request, failedAcct * codexAcctIdentifier(failedAcct), codexAcctIdentifier(alt)) t.persistSwitch(alt) + noteRouteAccount(req.Context(), codexAccountHint(alt), true) resp, err := t.doRequest(req, alt) if err != nil { return nil, err @@ -241,6 +243,7 @@ func (t *CodexTokenTransport) handle429(req *http.Request, resp *http.Response, return makeBufferedResponse(fallbackResp, fallbackBody), nil } + noteRouteAccount(req.Context(), codexAccountHint(alt), true) altResp, err := t.doRequest(req, alt) if err != nil { return nil, err diff --git a/internal/proxy/config.go b/internal/proxy/config.go index 84b89ae..b9c78cd 100644 --- a/internal/proxy/config.go +++ b/internal/proxy/config.go @@ -36,6 +36,7 @@ type Config struct { // PinnedClaudeAccount forces the proxy to route all Claude requests through // a specific account identified by email or AccountUUID. Omitted when empty. PinnedClaudeAccount string `json:"pinned_claude_account,omitempty"` + DiagnosticsLog string `json:"diagnostics_log,omitempty"` } // ResolvedHeadroomMode returns the effective HeadroomMode for this config. diff --git a/internal/proxy/config_test.go b/internal/proxy/config_test.go new file mode 100644 index 0000000..53bffd8 --- /dev/null +++ b/internal/proxy/config_test.go @@ -0,0 +1,100 @@ +package proxy + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" +) + +func TestConfigDiagnosticsLogJSONRoundTrip(t *testing.T) { + cfg := Config{ + Port: DefaultPort, + ClaudeUpstream: DefaultUpstream, + CodexUpstream: DefaultCodexUpstream, + LocalToken: "tok", + DiagnosticsLog: "/tmp/cq-routes.jsonl", + } + data, err := json.Marshal(cfg) + if err != nil { + t.Fatal(err) + } + + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + t.Fatal(err) + } + if string(raw["diagnostics_log"]) != `"/tmp/cq-routes.jsonl"` { + t.Fatalf("diagnostics_log = %s, want configured path in %s", raw["diagnostics_log"], data) + } + + var roundTrip Config + if err := json.Unmarshal(data, &roundTrip); err != nil { + t.Fatal(err) + } + if roundTrip.DiagnosticsLog != cfg.DiagnosticsLog { + t.Fatalf("DiagnosticsLog = %q, want %q", roundTrip.DiagnosticsLog, cfg.DiagnosticsLog) + } +} + +func TestConfigDiagnosticsLogDefaultDisabled(t *testing.T) { + var cfg Config + if err := json.Unmarshal([]byte(`{"port":19280,"local_token":"tok"}`), &cfg); err != nil { + t.Fatal(err) + } + if cfg.DiagnosticsLog != "" { + t.Fatalf("DiagnosticsLog = %q, want empty", cfg.DiagnosticsLog) + } + + data, err := json.Marshal(Config{Port: DefaultPort, LocalToken: "tok"}) + if err != nil { + t.Fatal(err) + } + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + t.Fatal(err) + } + if _, ok := raw["diagnostics_log"]; ok { + t.Fatalf("diagnostics_log should be omitted when empty: %s", data) + } +} + +func TestConfigDiagnosticsLogPersisted(t *testing.T) { + configHome := t.TempDir() + t.Setenv("XDG_CONFIG_HOME", configHome) + path := filepath.Join(t.TempDir(), "routes.jsonl") + + if err := SaveConfig(&Config{ + LocalToken: "tok", + DiagnosticsLog: path, + }); err != nil { + t.Fatalf("SaveConfig: %v", err) + } + + data, err := os.ReadFile(filepath.Join(configHome, "cq", "proxy.json")) + if err != nil { + t.Fatalf("read proxy.json: %v", err) + } + if !json.Valid(data) { + t.Fatalf("proxy.json is not valid JSON: %s", data) + } + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + t.Fatal(err) + } + var persisted string + if err := json.Unmarshal(raw["diagnostics_log"], &persisted); err != nil { + t.Fatalf("unmarshal diagnostics_log: %v", err) + } + if persisted != path { + t.Fatalf("persisted diagnostics_log = %q, want %q in %s", persisted, path, data) + } + + cfg, err := LoadConfig() + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + if cfg.DiagnosticsLog != path { + t.Fatalf("loaded DiagnosticsLog = %q, want %q", cfg.DiagnosticsLog, path) + } +} diff --git a/internal/proxy/diag.go b/internal/proxy/diag.go new file mode 100644 index 0000000..6ba571a --- /dev/null +++ b/internal/proxy/diag.go @@ -0,0 +1,131 @@ +package proxy + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "os" + "sync" + "time" +) + +type RouteEvent struct { + Time time.Time `json:"time"` + Method string `json:"method"` + Path string `json:"path"` + Provider string `json:"provider"` + RouteKind string `json:"route_kind,omitempty"` + Model string `json:"model,omitempty"` + AccountHint string `json:"account_hint,omitempty"` + PinActive bool `json:"pin_active,omitempty"` + Failover bool `json:"failover,omitempty"` + StatusCode int `json:"status_code,omitempty"` + LatencyMS int64 `json:"latency_ms,omitempty"` + Error string `json:"error,omitempty"` +} + +type routeDiagnosticsContextKey struct{} + +type routeDiagnostics struct { + mu sync.Mutex + accountHint string + failover bool +} + +func withRouteDiagnostics(ctx context.Context) (context.Context, *routeDiagnostics) { + diag := &routeDiagnostics{} + return context.WithValue(ctx, routeDiagnosticsContextKey{}, diag), diag +} + +func noteRouteAccount(ctx context.Context, accountHint string, failover bool) { + if ctx == nil { + return + } + diag, _ := ctx.Value(routeDiagnosticsContextKey{}).(*routeDiagnostics) + if diag == nil { + return + } + diag.mu.Lock() + defer diag.mu.Unlock() + if accountHint != "" { + diag.accountHint = accountHint + } + diag.failover = diag.failover || failover +} + +func (d *routeDiagnostics) fields() (accountHint string, failover bool) { + if d == nil { + return "", false + } + d.mu.Lock() + defer d.mu.Unlock() + return d.accountHint, d.failover +} + +func (event *RouteEvent) applyRouteDiagnostics(diag *routeDiagnostics) { + if event == nil { + return + } + accountHint, failover := diag.fields() + if accountHint != "" { + event.AccountHint = accountHint + } + if failover { + event.Failover = true + } +} + +func redactedAccountHint(prefix string, identifiers ...string) string { + for _, identifier := range identifiers { + if identifier == "" { + continue + } + sum := sha256.Sum256([]byte(identifier)) + return prefix + ":" + hex.EncodeToString(sum[:])[:12] + } + return "" +} + +type DiagnosticsWriter struct { + mu sync.Mutex + file *os.File +} + +func OpenDiagnosticsWriter(path string) (*DiagnosticsWriter, error) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600) + if err != nil { + return nil, err + } + if err := f.Chmod(0o600); err != nil { + _ = f.Close() + return nil, err + } + return &DiagnosticsWriter{file: f}, nil +} + +func (w *DiagnosticsWriter) Write(event RouteEvent) error { + if w == nil { + return nil + } + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { + return nil + } + return json.NewEncoder(w.file).Encode(event) +} + +func (w *DiagnosticsWriter) Close() error { + if w == nil { + return nil + } + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { + return nil + } + err := w.file.Close() + w.file = nil + return err +} diff --git a/internal/proxy/diag_test.go b/internal/proxy/diag_test.go new file mode 100644 index 0000000..5c23af4 --- /dev/null +++ b/internal/proxy/diag_test.go @@ -0,0 +1,173 @@ +package proxy + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" +) + +func TestDiagnosticsWriterCreatesAndAppendsJSONL(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + + w, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + if err := w.Write(RouteEvent{ + Time: time.Unix(1, 0).UTC(), + Method: "POST", + Path: "/v1/messages", + Provider: "claude", + RouteKind: "anthropic_messages", + Model: "claude-sonnet", + StatusCode: 200, + LatencyMS: 12, + }); err != nil { + t.Fatalf("Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + w, err = OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("reopen diagnostics writer: %v", err) + } + if err := w.Write(RouteEvent{ + Time: time.Unix(2, 0).UTC(), + Method: "POST", + Path: "/responses", + Provider: "codex", + RouteKind: "codex_native", + Model: "gpt-5.4", + StatusCode: 201, + LatencyMS: 34, + }); err != nil { + t.Fatalf("append Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("append Close: %v", err) + } + + events := readDiagnosticsEvents(t, path) + if len(events) != 2 { + t.Fatalf("events = %d, want 2", len(events)) + } + if events[0].Path != "/v1/messages" || events[1].Path != "/responses" { + t.Fatalf("events paths = %q, %q", events[0].Path, events[1].Path) + } + if runtime.GOOS != "windows" { + info, err := os.Stat(path) + if err != nil { + t.Fatalf("stat diagnostics log: %v", err) + } + if got := info.Mode().Perm(); got != 0o600 { + t.Fatalf("file mode = %#o, want 0600", got) + } + } +} + +func TestDiagnosticsWriterNilSafe(t *testing.T) { + var w *DiagnosticsWriter + if err := w.Write(RouteEvent{Path: "/v1/messages"}); err != nil { + t.Fatalf("nil Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("nil Close: %v", err) + } +} + +func TestDiagnosticsWriterConcurrentWritesProduceValidJSONLines(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + w, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + + const count = 64 + var wg sync.WaitGroup + wg.Add(count) + for i := 0; i < count; i++ { + i := i + go func() { + defer wg.Done() + if err := w.Write(RouteEvent{ + Time: time.Unix(int64(i), 0).UTC(), + Method: "POST", + Path: "/v1/messages", + Provider: "claude", + StatusCode: 200, + }); err != nil { + t.Errorf("Write(%d): %v", i, err) + } + }() + } + wg.Wait() + + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + + events := readDiagnosticsEvents(t, path) + if len(events) != count { + t.Fatalf("events = %d, want %d", len(events), count) + } + for i, ev := range events { + if ev.Method != "POST" || ev.Path != "/v1/messages" || ev.Provider != "claude" { + t.Fatalf("event %d = %+v", i, ev) + } + } +} + +func TestDiagnosticsWriterCloseSafeAndStopsWrites(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + w, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + if err := w.Write(RouteEvent{Time: time.Unix(1, 0).UTC(), Method: "GET", Path: "/health", Provider: "proxy"}); err != nil { + t.Fatalf("Write: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("second Close: %v", err) + } + if err := w.Write(RouteEvent{Time: time.Unix(2, 0).UTC(), Method: "GET", Path: "/health", Provider: "proxy"}); err != nil { + t.Fatalf("Write after Close: %v", err) + } + + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events after closed write = %d, want 1", len(events)) + } +} + +func readDiagnosticsEvents(t *testing.T, path string) []RouteEvent { + t.Helper() + f, err := os.Open(path) + if err != nil { + t.Fatalf("open diagnostics log: %v", err) + } + defer f.Close() + + var events []RouteEvent + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var event RouteEvent + if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { + t.Fatalf("invalid diagnostics JSON line %q: %v", scanner.Text(), err) + } + events = append(events, event) + } + if err := scanner.Err(); err != nil { + t.Fatalf("scan diagnostics log: %v", err) + } + return events +} diff --git a/internal/proxy/server.go b/internal/proxy/server.go index c8b6f1e..1ce4869 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -56,6 +56,7 @@ type Server struct { CodexTransport http.RoundTripper CodexUpgradeTransport http.RoundTripper // HTTP/1.1-only transport for WebSocket upgrades Headroom *HeadroomBridge + Diag *DiagnosticsWriter // HeadroomMode is the resolved compression mode. Only meaningful when // Headroom is non-nil. Reported in the /health response. HeadroomMode HeadroomMode @@ -316,14 +317,42 @@ func (s *Server) handleLegacyCodexResponsesRoute(w http.ResponseWriter, r *http. func (s *Server) handleCodexAppServerRoute(w http.ResponseWriter, r *http.Request) { if !isWebSocketUpgrade(r) { + start := time.Now() + message := fmt.Sprintf("%s requires websocket upgrade", codexAppServerPath) w.Header().Set("Upgrade", "websocket") - writeError(w, http.StatusUpgradeRequired, "invalid_request_error", fmt.Sprintf("%s requires websocket upgrade", codexAppServerPath)) + writeError(w, http.StatusUpgradeRequired, "invalid_request_error", message) + s.emitDiagnostics(RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "codex", + RouteKind: "codex_app_server", + StatusCode: http.StatusUpgradeRequired, + LatencyMS: time.Since(start).Milliseconds(), + Error: diagnosticsErrorCode("invalid_request_error", message), + }) return } s.proxyCodexAppServer(w, r) } -func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + start := time.Now() + if wrapped, rec := s.wrapDiagnosticsResponseWriter(w); rec != nil { + w = wrapped + defer func() { + s.emitDiagnostics(RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "proxy", + RouteKind: "health", + StatusCode: rec.statusCode(), + LatencyMS: time.Since(start).Milliseconds(), + }) + }() + } + var claudeCount int if s.Discover != nil { claudeCount = len(s.Discover()) @@ -339,6 +368,9 @@ func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { "claude": claudeCount, "codex": codexCount, }, + "diagnostics": map[string]bool{ + "enabled": s.Diag != nil, + }, } if s.Headroom != nil { switch s.HeadroomMode { @@ -379,6 +411,28 @@ func (s *Server) isValidToken(token string) bool { // so only local processes can reach this endpoint. Codex CLI in ChatGPT auth // mode doesn't support custom API keys, so we can't require the proxy token. func (s *Server) handleNativeCodex(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var model string + ctx, routeDiag := withRouteDiagnostics(r.Context()) + if wrapped, rec := s.wrapDiagnosticsResponseWriter(w); rec != nil { + w = wrapped + defer func() { + event := RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "codex", + RouteKind: "codex_native", + Model: model, + StatusCode: rec.statusCode(), + LatencyMS: time.Since(start).Milliseconds(), + Error: rec.diagnosticsError(), + } + event.applyRouteDiagnostics(routeDiag) + s.emitDiagnostics(event) + }() + } + if s.CodexTransport == nil { writeError(w, http.StatusServiceUnavailable, "api_error", "no codex accounts configured") return @@ -396,7 +450,7 @@ func (s *Server) handleNativeCodex(w http.ResponseWriter, r *http.Request) { return } - model := extractModel(body) + model = extractModel(body) fmt.Fprintf(os.Stderr, "cq: route POST /responses model=%q provider=codex (native)\n", model) // Compress Responses API input via headroom bridge if available. @@ -420,7 +474,7 @@ func (s *Server) handleNativeCodex(w http.ResponseWriter, r *http.Request) { // Build upstream request — forward as-is, no translation. upstreamURL := s.Config.CodexUpstream + "/responses" - upReq, err := http.NewRequestWithContext(r.Context(), "POST", upstreamURL, bytes.NewReader(body)) + upReq, err := http.NewRequestWithContext(ctx, "POST", upstreamURL, bytes.NewReader(body)) if err != nil { writeError(w, http.StatusInternalServerError, "api_error", fmt.Sprintf("create upstream request: %v", err)) return @@ -486,6 +540,33 @@ func (s *Server) handleNativeCodex(w http.ResponseWriter, r *http.Request) { // headroom compression — the handshake body is minimal and the subsequent // binary/text frames are not buffered by this proxy. func (s *Server) proxyCodexUpgrade(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var rec *diagnosticsResponseWriter + ctx, routeDiag := withRouteDiagnostics(r.Context()) + r = r.WithContext(ctx) + if wrapped, recorder := s.wrapDiagnosticsResponseWriter(w); recorder != nil { + w = wrapped + rec = recorder + defer func() { + status := rec.statusCode() + if rec.status == 0 { + status = http.StatusSwitchingProtocols + } + event := RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "codex", + RouteKind: "codex_legacy_websocket", + StatusCode: status, + LatencyMS: time.Since(start).Milliseconds(), + Error: rec.diagnosticsError(), + } + event.applyRouteDiagnostics(routeDiag) + s.emitDiagnostics(event) + }() + } + codexUpstream, err := url.Parse(s.Config.CodexUpstream) if err != nil { writeError(w, http.StatusInternalServerError, "api_error", "invalid codex upstream URL") @@ -519,14 +600,41 @@ func (s *Server) proxyCodexUpgrade(w http.ResponseWriter, r *http.Request) { // inspect that frame before selecting an account and opening the upstream // websocket. func (s *Server) proxyCodexAppServer(w http.ResponseWriter, r *http.Request) { + start := time.Now() + statusCode := 0 + requestedModel := "" + diagError := "" + ctx, routeDiag := withRouteDiagnostics(r.Context()) + r = r.WithContext(ctx) + defer func() { + event := RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: "codex", + RouteKind: "codex_app_server", + Model: requestedModel, + StatusCode: statusCode, + LatencyMS: time.Since(start).Milliseconds(), + Error: diagError, + } + event.applyRouteDiagnostics(routeDiag) + s.emitDiagnostics(event) + }() + transport, err := s.codexAppServerTransport() if err != nil { + statusCode = http.StatusServiceUnavailable + diagError = diagnosticsErrorCode("api_error", err.Error()) writeError(w, http.StatusServiceUnavailable, "api_error", err.Error()) return } upstreamURL, err := codexAppServerWebSocketURL(s.Config.CodexUpstream) if err != nil { - writeError(w, http.StatusInternalServerError, "api_error", "invalid codex upstream URL") + statusCode = http.StatusInternalServerError + message := "invalid codex upstream URL" + diagError = diagnosticsErrorCode("api_error", message) + writeError(w, http.StatusInternalServerError, "api_error", message) return } @@ -538,6 +646,7 @@ func (s *Server) proxyCodexAppServer(w http.ResponseWriter, r *http.Request) { if err != nil { return } + statusCode = http.StatusSwitchingProtocols defer clientConn.Close() clientConn.SetReadLimit(maxRequestBody) @@ -545,7 +654,6 @@ func (s *Server) proxyCodexAppServer(w http.ResponseWriter, r *http.Request) { if err != nil { return } - requestedModel := "" if messageType == websocket.TextMessage { requestedModel = extractCodexAppServerThreadStartModel(message) } @@ -554,6 +662,7 @@ func (s *Server) proxyCodexAppServer(w http.ResponseWriter, r *http.Request) { upstreamConn, acct, err := s.dialCodexAppServer(r.Context(), transport, upstreamURL, r.Header, requestedModel) if err != nil { + diagError = diagnosticsErrorCode("api_error", "codex upstream error: "+err.Error()) _ = clientConn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "upstream error"), time.Now().Add(time.Second)) return } @@ -614,6 +723,7 @@ func (s *Server) dialCodexAppServer(ctx context.Context, transport *CodexTokenTr } return nil, nil, fmt.Errorf("no alternate codex account available for app-server websocket") } + noteRouteAccount(ctx, codexAccountHint(acct), len(excluded) > 0) conn, resp, body, err := dialCodexAppServerWithAccount(ctx, upstreamURL, incomingHeaders, acct) if err == nil { if persistSwitch { @@ -768,9 +878,34 @@ func (s *Server) proxyHandler(upstream *url.URL) http.HandlerFunc { } return func(w http.ResponseWriter, r *http.Request) { + start := time.Now() var routeModel string var routeProvider Provider var buf []byte + ctx, routeDiag := withRouteDiagnostics(r.Context()) + r = r.WithContext(ctx) + if diagnosticsAnthropicRouteKind(r.URL.Path) != "" { + if wrapped, rec := s.wrapDiagnosticsResponseWriter(w); rec != nil { + w = wrapped + defer func() { + provider := providerName(routeProvider) + event := RouteEvent{ + Time: start.UTC(), + Method: r.Method, + Path: r.URL.Path, + Provider: provider, + RouteKind: diagnosticsAnthropicRouteKind(r.URL.Path), + Model: routeModel, + PinActive: provider == "claude" && s.claudePinActive(), + StatusCode: rec.statusCode(), + LatencyMS: time.Since(start).Milliseconds(), + Error: rec.diagnosticsError(), + } + event.applyRouteDiagnostics(routeDiag) + s.emitDiagnostics(event) + }() + } + } // Auth check: accept local proxy token or a known Claude account token. token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ") @@ -838,6 +973,99 @@ func (s *Server) proxyHandler(upstream *url.URL) http.HandlerFunc { } } +type diagnosticsResponseWriter struct { + http.ResponseWriter + status int + diagnosticError string +} + +func (w *diagnosticsResponseWriter) WriteHeader(status int) { + if status >= 200 && w.status == 0 { + w.status = status + } + w.ResponseWriter.WriteHeader(status) +} + +func (w *diagnosticsResponseWriter) Write(b []byte) (int, error) { + if w.status == 0 { + w.status = http.StatusOK + } + return w.ResponseWriter.Write(b) +} + +func (w *diagnosticsResponseWriter) statusCode() int { + if w.status == 0 { + return http.StatusOK + } + return w.status +} + +func (w *diagnosticsResponseWriter) SetDiagnosticsError(err string) { + w.diagnosticError = err +} + +func (w *diagnosticsResponseWriter) diagnosticsError() string { + return w.diagnosticError +} + +func (w *diagnosticsResponseWriter) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} + +type diagnosticsFlushWriter struct { + *diagnosticsResponseWriter +} + +func (w diagnosticsFlushWriter) Flush() { + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +func (s *Server) wrapDiagnosticsResponseWriter(w http.ResponseWriter) (http.ResponseWriter, *diagnosticsResponseWriter) { + if s == nil || s.Diag == nil { + return w, nil + } + rec := &diagnosticsResponseWriter{ResponseWriter: w} + if _, ok := w.(http.Flusher); ok { + return diagnosticsFlushWriter{diagnosticsResponseWriter: rec}, rec + } + return rec, rec +} + +func (s *Server) emitDiagnostics(event RouteEvent) { + if s == nil || s.Diag == nil { + return + } + if event.Time.IsZero() { + event.Time = time.Now().UTC() + } + if err := s.Diag.Write(event); err != nil { + fmt.Fprintf(os.Stderr, "cq: diagnostics: write: %v\n", err) + } +} + +func (s *Server) claudePinActive() bool { + if s == nil { + return false + } + if selector, ok := s.Selector.(interface{ Pin() string }); ok { + return selector.Pin() != "" + } + return s.Config != nil && s.Config.PinnedClaudeAccount != "" +} + +func diagnosticsAnthropicRouteKind(path string) string { + switch path { + case "/v1/messages": + return "anthropic_messages" + case countTokensPath: + return "anthropic_count_tokens" + default: + return "" + } +} + func providerName(provider Provider) string { switch provider { case ProviderCodex: @@ -898,6 +1126,9 @@ func truncateDebugText(text string) string { } func writeError(w http.ResponseWriter, status int, errType, message string) { + if rec, ok := w.(interface{ SetDiagnosticsError(string) }); ok { + rec.SetDiagnosticsError(diagnosticsErrorCode(errType, message)) + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(map[string]any{ @@ -908,3 +1139,54 @@ func writeError(w http.ResponseWriter, status int, errType, message string) { }, }) } + +func diagnosticsErrorCode(errType, message string) string { + msg := strings.ToLower(message) + switch { + case strings.Contains(msg, "invalid proxy token"): + return errType + ":invalid_proxy_token" + case strings.Contains(msg, "no codex accounts configured") || + strings.Contains(msg, "no codex accounts available") || + strings.Contains(msg, "no codex accounts with valid tokens and quota"): + return errType + ":no_codex_accounts" + case strings.Contains(msg, "websocket transport is not supported"): + return errType + ":unsupported_websocket_transport" + case strings.Contains(msg, "requires websocket upgrade"): + return errType + ":websocket_upgrade_required" + case strings.Contains(msg, "only supports"): + return errType + ":method_not_allowed" + case strings.Contains(msg, "invalid codex upstream url") || + strings.Contains(msg, "unsupported codex upstream scheme"): + return errType + ":invalid_codex_upstream" + case strings.Contains(msg, "create upstream request"): + return errType + ":invalid_upstream" + case strings.Contains(msg, "codex upstream error") || + strings.Contains(msg, "codex upstream:") || + strings.Contains(msg, "codex websocket upgrade failed"): + return errType + ":codex_upstream_error" + case strings.Contains(msg, "request translation failed"): + return errType + ":request_translation_failed" + case strings.Contains(msg, "failed to read request body"): + return errType + ":read_request_body" + case strings.Contains(msg, "request body exceeds"): + return errType + ":request_body_too_large" + case strings.Contains(msg, "not a codex model"): + return errType + ":invalid_route_model" + case strings.Contains(msg, "stream collection failed"): + return errType + ":stream_collection_failed" + case strings.Contains(msg, "response assembly failed"): + return errType + ":response_assembly_failed" + case strings.Contains(msg, "decode count_tokens response"): + return errType + ":decode_count_tokens_response" + case strings.Contains(msg, "model registry refresher not configured"): + return errType + ":model_registry_refresher_not_configured" + case strings.Contains(msg, "model registry not configured"): + return errType + ":model_registry_not_configured" + case strings.Contains(msg, "registry refresh failed"): + return errType + ":registry_refresh_failed" + case errType == "api_error": + return errType + ":upstream_error" + default: + return errType + } +} diff --git a/internal/proxy/server_test.go b/internal/proxy/server_test.go index 4f9d64f..1801c7f 100644 --- a/internal/proxy/server_test.go +++ b/internal/proxy/server_test.go @@ -8,6 +8,8 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" + "path/filepath" "strings" "testing" "time" @@ -17,6 +19,7 @@ import ( "github.com/jacobcxdev/cq/internal/keyring" claude "github.com/jacobcxdev/cq/internal/provider/claude" codex "github.com/jacobcxdev/cq/internal/provider/codex" + "github.com/jacobcxdev/cq/internal/quota" ) func mustParseURL(s string) *url.URL { @@ -60,6 +63,1178 @@ func TestServer_HealthEndpoint(t *testing.T) { } } +type diagnosticsControllerTestWriter struct { + header http.Header + statuses []int + body []byte + flushed bool + writeDeadline time.Time +} + +func (w *diagnosticsControllerTestWriter) Header() http.Header { + if w.header == nil { + w.header = http.Header{} + } + return w.header +} + +func (w *diagnosticsControllerTestWriter) Write(b []byte) (int, error) { + if !w.hasFinalStatus() { + w.statuses = append(w.statuses, http.StatusOK) + } + w.body = append(w.body, b...) + return len(b), nil +} + +func (w *diagnosticsControllerTestWriter) WriteHeader(status int) { + w.statuses = append(w.statuses, status) +} + +func (w *diagnosticsControllerTestWriter) Flush() { + w.flushed = true +} + +func (w *diagnosticsControllerTestWriter) SetWriteDeadline(deadline time.Time) error { + w.writeDeadline = deadline + return nil +} + +func (w *diagnosticsControllerTestWriter) hasFinalStatus() bool { + for _, status := range w.statuses { + if status >= 200 { + return true + } + } + return false +} + +func TestDiagnosticsResponseWriterRecordsFinalNonInformationalStatus(t *testing.T) { + underlying := &diagnosticsControllerTestWriter{} + rec := &diagnosticsResponseWriter{ResponseWriter: underlying} + + rec.WriteHeader(http.StatusEarlyHints) + rec.WriteHeader(http.StatusAccepted) + rec.WriteHeader(http.StatusInternalServerError) + + if got := rec.statusCode(); got != http.StatusAccepted { + t.Fatalf("statusCode = %d, want %d", got, http.StatusAccepted) + } + wantStatuses := []int{http.StatusEarlyHints, http.StatusAccepted, http.StatusInternalServerError} + if fmt.Sprint(underlying.statuses) != fmt.Sprint(wantStatuses) { + t.Fatalf("underlying statuses = %v, want %v", underlying.statuses, wantStatuses) + } + + underlying = &diagnosticsControllerTestWriter{} + rec = &diagnosticsResponseWriter{ResponseWriter: underlying} + rec.WriteHeader(http.StatusEarlyHints) + if _, err := rec.Write([]byte("ok")); err != nil { + t.Fatalf("Write: %v", err) + } + if got := rec.statusCode(); got != http.StatusOK { + t.Fatalf("statusCode after informational then Write = %d, want %d", got, http.StatusOK) + } +} + +func TestDiagnosticsResponseWriterUnwrapsForResponseController(t *testing.T) { + underlying := &diagnosticsControllerTestWriter{} + wrapped, rec := (&Server{Diag: &DiagnosticsWriter{}}).wrapDiagnosticsResponseWriter(underlying) + if rec == nil { + t.Fatal("recorder is nil") + } + if _, ok := wrapped.(http.Flusher); !ok { + t.Fatal("wrapped writer does not preserve http.Flusher") + } + + deadline := time.Unix(123, 0).UTC() + if err := http.NewResponseController(wrapped).SetWriteDeadline(deadline); err != nil { + t.Fatalf("SetWriteDeadline: %v", err) + } + if !underlying.writeDeadline.Equal(deadline) { + t.Fatalf("write deadline = %v, want %v", underlying.writeDeadline, deadline) + } + if err := http.NewResponseController(wrapped).Flush(); err != nil { + t.Fatalf("Flush: %v", err) + } + if !underlying.flushed { + t.Fatal("underlying writer was not flushed") + } +} + +func TestServerDiagnosticsClaudeRouteEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + future := time.Now().UnixMilli() + 3600_000 + claudeAccount := keyring.ClaudeOAuth{Email: "user@test.com", AccountUUID: "account-uuid-secret", AccessToken: "real-token", ExpiresAt: future} + sel := &fakeSelector{accounts: []keyring.ClaudeOAuth{ + claudeAccount, + }} + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: "local-tok", + PinnedClaudeAccount: "user@test.com", + }, + Transport: &TokenTransport{ + Selector: sel, + Inner: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + return makeResponse(http.StatusOK, `{"id":"msg_123"}`), nil + }), + }, + Diag: diag, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(`{"model":"claude-sonnet","messages":[]}`)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodPost || ev.Path != "/v1/messages" || ev.Provider != "claude" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "anthropic_messages" { + t.Fatalf("RouteKind = %q, want anthropic_messages", ev.RouteKind) + } + if ev.Model != "claude-sonnet" { + t.Fatalf("Model = %q, want claude-sonnet", ev.Model) + } + if !ev.PinActive { + t.Fatal("PinActive = false, want true") + } + if ev.AccountHint != claudeAccountHint(&claudeAccount) { + t.Fatalf("AccountHint = %q, want redacted hint %q", ev.AccountHint, claudeAccountHint(&claudeAccount)) + } + if ev.Failover { + t.Fatal("Failover = true, want false") + } + if ev.StatusCode != http.StatusOK { + t.Fatalf("StatusCode = %d, want 200", ev.StatusCode) + } + if ev.Time.IsZero() { + t.Fatal("Time is zero") + } + assertDiagnosticsLogDoesNotContain(t, path, "local-tok") + assertDiagnosticsLogDoesNotContain(t, path, "user@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "account-uuid-secret") + assertDiagnosticsLogDoesNotContain(t, path, "real-token") +} + +func TestServerDiagnosticsClaudeRouteRecordsFailover(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + future := time.Now().UnixMilli() + 3600_000 + accounts := []keyring.ClaudeOAuth{ + {Email: "primary@test.com", AccountUUID: "primary-uuid", AccessToken: "primary-token", ExpiresAt: future}, + {Email: "fallback@test.com", AccountUUID: "fallback-uuid", AccessToken: "fallback-token", ExpiresAt: future}, + } + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: "local-tok", + }, + Transport: &TokenTransport{ + Selector: &fakeSelector{accounts: accounts}, + Inner: roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch req.Header.Get("Authorization") { + case "Bearer primary-token": + return makeResponse(http.StatusTooManyRequests, `{"error":"rate_limited"}`), nil + case "Bearer fallback-token": + return makeResponse(http.StatusOK, `{"id":"msg_456"}`), nil + default: + t.Fatalf("unexpected Authorization = %q", req.Header.Get("Authorization")) + return nil, nil + } + }), + }, + Diag: diag, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(`{"model":"claude-sonnet","messages":[]}`)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.AccountHint != claudeAccountHint(&accounts[1]) { + t.Fatalf("AccountHint = %q, want fallback hint %q", ev.AccountHint, claudeAccountHint(&accounts[1])) + } + if !ev.Failover { + t.Fatal("Failover = false, want true") + } + assertDiagnosticsLogDoesNotContain(t, path, "primary@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "fallback@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "primary-uuid") + assertDiagnosticsLogDoesNotContain(t, path, "fallback-uuid") + assertDiagnosticsLogDoesNotContain(t, path, "primary-token") + assertDiagnosticsLogDoesNotContain(t, path, "fallback-token") +} + +func TestServerDiagnosticsClaudeTransportFailureEmitsSafeError(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + future := time.Now().UnixMilli() + 3600_000 + acct := keyring.ClaudeOAuth{Email: "error@test.com", AccountUUID: "error-uuid", AccessToken: "error-token", ExpiresAt: future} + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: "local-tok", + }, + Transport: &TokenTransport{ + Selector: &fakeSelector{accounts: []keyring.ClaudeOAuth{acct}}, + Inner: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + return nil, fmt.Errorf("dial failed for error-token") + }), + }, + Diag: diag, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(`{"model":"claude-sonnet","messages":[]}`)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusBadGateway { + t.Fatalf("status = %d, want 502, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Error != "api_error:upstream_error" { + t.Fatalf("Error = %q, want safe upstream error code", ev.Error) + } + if ev.AccountHint != claudeAccountHint(&acct) { + t.Fatalf("AccountHint = %q, want redacted hint %q", ev.AccountHint, claudeAccountHint(&acct)) + } + assertDiagnosticsLogDoesNotContain(t, path, "error@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "error-uuid") + assertDiagnosticsLogDoesNotContain(t, path, "error-token") +} + +func TestServerDiagnosticsClaudeRouteReadsLiveSelectorPin(t *testing.T) { + future := time.Now().UnixMilli() + 3600_000 + accounts := []keyring.ClaudeOAuth{ + {Email: "fallback@test.com", AccountUUID: "uuid-fallback", AccessToken: "fallback-token", ExpiresAt: future}, + {Email: "pinned@test.com", AccountUUID: "uuid-pin", AccessToken: "pinned-token", ExpiresAt: future}, + } + + for _, tc := range []struct { + name string + configPin string + livePin string + quota QuotaReader + wantPin bool + }{ + { + name: "set by config reload", + livePin: "pinned@test.com", + wantPin: true, + }, + { + name: "cleared by config reload", + configPin: "pinned@test.com", + wantPin: false, + }, + { + name: "cleared by automatic expiry", + configPin: "pinned@test.com", + livePin: "pinned@test.com", + quota: stubQuotaReader{ + "uuid-pin": { + Result: quota.Result{ + Status: quota.StatusExhausted, + Windows: map[quota.WindowName]quota.Window{ + "5h": {RemainingPct: 0}, + }, + }, + FetchedAt: time.Now(), + }, + }, + wantPin: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + inner := innerSelectorFunc(func(_ context.Context, exclude ...string) (*keyring.ClaudeOAuth, error) { + excludeSet := make(map[string]bool, len(exclude)) + for _, e := range exclude { + excludeSet[e] = true + } + for i := range accounts { + acct := &accounts[i] + if isExcluded(acct, excludeSet) { + continue + } + result := *acct + return &result, nil + } + return nil, fmt.Errorf("no accounts available") + }) + selector := NewPinnedClaudeSelector(inner, func() []keyring.ClaudeOAuth { return accounts }, tc.livePin, tc.quota) + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: "local-tok", + PinnedClaudeAccount: tc.configPin, + }, + Selector: selector, + Transport: &TokenTransport{ + Selector: selector, + Inner: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + return makeResponse(http.StatusOK, `{"id":"msg_123"}`), nil + }), + }, + Diag: diag, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(`{"model":"claude-sonnet","messages":[]}`)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + if events[0].PinActive != tc.wantPin { + t.Fatalf("PinActive = %v, want %v; event = %+v", events[0].PinActive, tc.wantPin, events[0]) + } + }) + } +} + +func TestServerDiagnosticsCodexRouteEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + codexAccount := codex.CodexAccount{Email: "codex-user@test.com", AccountID: "codex-account-secret", AccessToken: "codex-tok"} + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codexAccount}, + Inner: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusAccepted, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"resp_123"}`)), + }, nil + }), + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, codexResponsesPath, strings.NewReader(`{"model":"gpt-5.4","input":"hello"}`)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusAccepted { + t.Fatalf("status = %d, want 202, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodPost || ev.Path != codexResponsesPath || ev.Provider != "codex" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "codex_native" { + t.Fatalf("RouteKind = %q, want codex_native", ev.RouteKind) + } + if ev.Model != "gpt-5.4" { + t.Fatalf("Model = %q, want gpt-5.4", ev.Model) + } + if ev.StatusCode != http.StatusAccepted { + t.Fatalf("StatusCode = %d, want 202", ev.StatusCode) + } + if ev.AccountHint != codexAccountHint(&codexAccount) { + t.Fatalf("AccountHint = %q, want redacted hint %q", ev.AccountHint, codexAccountHint(&codexAccount)) + } + if ev.Failover { + t.Fatal("Failover = true, want false") + } + assertDiagnosticsLogDoesNotContain(t, path, "codex-user@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "codex-account-secret") + assertDiagnosticsLogDoesNotContain(t, path, "codex-tok") +} + +func TestServerDiagnosticsCodexRouteRecordsFailover(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + accounts := []codex.CodexAccount{ + {Email: "primary-codex@test.com", AccountID: "primary-codex-account", AccessToken: "primary-codex-token"}, + {Email: "fallback-codex@test.com", AccountID: "fallback-codex-account", AccessToken: "fallback-codex-token"}, + } + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "tok", + }, + CodexTransport: &CodexTokenTransport{ + Selector: &multiCodexSelector{accounts: accounts}, + Inner: roundTripFunc(func(req *http.Request) (*http.Response, error) { + switch req.Header.Get("Authorization") { + case "Bearer primary-codex-token": + return makeResponse(http.StatusTooManyRequests, `{"error":{"code":"rate_limit_exceeded"}}`), nil + case "Bearer fallback-codex-token": + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"resp_456"}`)), + }, nil + default: + t.Fatalf("unexpected Authorization = %q", req.Header.Get("Authorization")) + return nil, nil + } + }), + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, codexResponsesPath, strings.NewReader(`{"model":"gpt-5.4","input":"hello"}`)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.AccountHint != codexAccountHint(&accounts[1]) { + t.Fatalf("AccountHint = %q, want fallback hint %q", ev.AccountHint, codexAccountHint(&accounts[1])) + } + if !ev.Failover { + t.Fatal("Failover = false, want true") + } + assertDiagnosticsLogDoesNotContain(t, path, "primary-codex@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "fallback-codex@test.com") + assertDiagnosticsLogDoesNotContain(t, path, "primary-codex-account") + assertDiagnosticsLogDoesNotContain(t, path, "fallback-codex-account") + assertDiagnosticsLogDoesNotContain(t, path, "primary-codex-token") + assertDiagnosticsLogDoesNotContain(t, path, "fallback-codex-token") +} + +func TestServerDiagnosticsCodexNoTransportEmitsSafeError(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "local-token-secret", + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, codexResponsesPath, strings.NewReader(`{"model":"gpt-5.4","input":"hello"}`)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status = %d, want 503, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Error != "api_error:no_codex_accounts" { + t.Fatalf("Error = %q, want no account code", ev.Error) + } + if ev.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("StatusCode = %d, want 503", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, "local-token-secret") +} + +func TestServerDiagnosticsCountTokensRouteEmitsEvents(t *testing.T) { + for _, tc := range []struct { + name string + model string + wantProvider string + wantBody string + }{ + {name: "claude", model: "claude-sonnet-4-6", wantProvider: "claude", wantBody: `{"input_tokens":321}`}, + {name: "codex", model: "gpt-5.4", wantProvider: "codex", wantBody: `{"input_tokens":123}`}, + } { + t.Run(tc.name, func(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + codexTransport := &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{AccessToken: "codex-tok", AccountID: "acct"}}, + Inner: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if tc.wantProvider != "codex" { + t.Fatal("codex upstream should not be called") + } + if !strings.HasSuffix(r.URL.Path, "/v1/responses/input_tokens") { + t.Fatalf("codex path = %q, want suffix /v1/responses/input_tokens", r.URL.Path) + } + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"object":"response.input_tokens","input_tokens":123}`)), + }, nil + }), + } + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: "local-tok", + }, + Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) { + if tc.wantProvider != "claude" { + t.Fatal("claude upstream should not be called") + } + if r.URL.Path != countTokensPath { + t.Fatalf("claude path = %q, want %q", r.URL.Path, countTokensPath) + } + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(tc.wantBody)), + }, nil + }), + CodexTransport: codexTransport, + Diag: diag, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + body := fmt.Sprintf(`{"model":%q,"messages":[{"role":"user","content":"hi"}]}`, tc.model) + req := httptest.NewRequest(http.MethodPost, countTokensPath, strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if strings.TrimSpace(w.Body.String()) != tc.wantBody { + t.Fatalf("body = %s, want %s", w.Body.String(), tc.wantBody) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodPost || ev.Path != countTokensPath || ev.Provider != tc.wantProvider { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "anthropic_count_tokens" { + t.Fatalf("RouteKind = %q, want anthropic_count_tokens", ev.RouteKind) + } + if ev.Model != tc.model { + t.Fatalf("Model = %q, want %s", ev.Model, tc.model) + } + if ev.StatusCode != http.StatusOK { + t.Fatalf("StatusCode = %d, want 200", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, "local-tok") + }) + } +} + +func TestServerDiagnosticsLegacyCodexRouteEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + const localToken = "secret-proxy-token" + var gotPath string + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com", + LocalToken: localToken, + }, + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{AccessToken: "codex-tok"}}, + Inner: roundTripFunc(func(r *http.Request) (*http.Response, error) { + gotPath = r.URL.Path + return &http.Response{ + StatusCode: http.StatusCreated, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"resp_legacy"}`)), + }, nil + }), + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, legacyCodexResponsesPath, strings.NewReader(`{"model":"gpt-5.4","input":"hello"}`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+localToken) + handler.ServeHTTP(w, req) + + if w.Code != http.StatusCreated { + t.Fatalf("status = %d, want 201, body: %s", w.Code, w.Body.String()) + } + if gotPath != "/responses" { + t.Fatalf("upstream path = %q, want /responses", gotPath) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodPost || ev.Path != legacyCodexResponsesPath || ev.Provider != "codex" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "codex_native" { + t.Fatalf("RouteKind = %q, want codex_native", ev.RouteKind) + } + if ev.Model != "gpt-5.4" { + t.Fatalf("Model = %q, want gpt-5.4", ev.Model) + } + if ev.StatusCode != http.StatusCreated { + t.Fatalf("StatusCode = %d, want 201", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, localToken) +} + +func TestServerDiagnosticsLegacyCodexWebsocketRouteEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + upgrader := websocket.Upgrader{CheckOrigin: func(_ *http.Request) bool { return true }} + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/responses" { + t.Errorf("upstream path = %q, want /responses", r.URL.Path) + } + if got := r.Header.Get("Authorization"); got != "Bearer codex-tok" { + t.Errorf("upstream auth = %q, want Bearer codex-tok", got) + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Errorf("upstream upgrade error = %v", err) + return + } + defer conn.Close() + messageType, message, err := conn.ReadMessage() + if err != nil { + t.Errorf("upstream read error = %v", err) + return + } + if string(message) != "ping" { + t.Errorf("upstream message = %q, want ping", message) + } + if err := conn.WriteMessage(messageType, []byte("pong")); err != nil { + t.Errorf("upstream write error = %v", err) + } + })) + defer upstream.Close() + + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: upstream.URL, + LocalToken: "local-tok", + }, + CodexUpgradeTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{AccessToken: "codex-tok"}}, + Inner: http.DefaultTransport, + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + proxy := httptest.NewServer(handler) + defer proxy.Close() + + wsURL := "ws" + strings.TrimPrefix(proxy.URL, "http") + legacyCodexResponsesPath + conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) + if err != nil { + if resp != nil { + defer resp.Body.Close() + } + t.Fatalf("Dial() error = %v", err) + } + if err := conn.WriteMessage(websocket.TextMessage, []byte("ping")); err != nil { + t.Fatalf("WriteMessage() error = %v", err) + } + if _, message, err := conn.ReadMessage(); err != nil { + t.Fatalf("ReadMessage() error = %v", err) + } else if string(message) != "pong" { + t.Fatalf("message = %q, want pong", message) + } + _ = conn.Close() + + events := waitForDiagnosticsEvents(t, path, 1) + ev := events[0] + if ev.Method != http.MethodGet || ev.Path != legacyCodexResponsesPath || ev.Provider != "codex" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "codex_legacy_websocket" { + t.Fatalf("RouteKind = %q, want codex_legacy_websocket", ev.RouteKind) + } + if ev.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("StatusCode = %d, want 101", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, "codex-tok") +} + +func TestServerDiagnosticsCompactRoutesEmitEvents(t *testing.T) { + for _, tc := range []struct { + name string + path string + }{ + {name: "canonical", path: codexCompactResponsesPath}, + {name: "legacy", path: legacyCodexCompactResponsesPath}, + } { + t.Run(tc.name, func(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + var gotPath string + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com", + LocalToken: "tok", + }, + CodexTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{AccessToken: "codex-tok"}}, + Inner: roundTripFunc(func(r *http.Request) (*http.Response, error) { + gotPath = r.URL.Path + return &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(`{"object":"response.compact"}`)), + }, nil + }), + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, tc.path, strings.NewReader(`{"model":"gpt-5.4","previous_response_id":"resp_abc"}`)) + req.Header.Set("Content-Type", "application/json") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if gotPath != "/responses/compact" { + t.Fatalf("upstream path = %q, want /responses/compact", gotPath) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodPost || ev.Path != tc.path || ev.Provider != "codex" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "codex_compact" { + t.Fatalf("RouteKind = %q, want codex_compact", ev.RouteKind) + } + if ev.Model != "gpt-5.4" { + t.Fatalf("Model = %q, want gpt-5.4", ev.Model) + } + if ev.StatusCode != http.StatusOK { + t.Fatalf("StatusCode = %d, want 200", ev.StatusCode) + } + }) + } +} + +func TestServerDiagnosticsLegacyCodexAppServerNonUpgradeRejectionEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + const localToken = "secret-proxy-token" + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "https://chatgpt.com/backend-api/codex", + LocalToken: localToken, + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, codexAppServerPath, nil) + handler.ServeHTTP(w, req) + + if w.Code != http.StatusUpgradeRequired { + t.Fatalf("status = %d, want 426, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodGet || ev.Path != codexAppServerPath || ev.Provider != "codex" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "codex_app_server" { + t.Fatalf("RouteKind = %q, want codex_app_server", ev.RouteKind) + } + if ev.StatusCode != http.StatusUpgradeRequired { + t.Fatalf("StatusCode = %d, want 426", ev.StatusCode) + } + if ev.Error != "invalid_request_error:websocket_upgrade_required" { + t.Fatalf("Error = %q, want websocket upgrade error code", ev.Error) + } + assertDiagnosticsLogDoesNotContain(t, path, localToken) +} + +func TestServerDiagnosticsCodexAppServerInvalidUpstreamEmitsSafeError(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + CodexUpstream: "ftp://chatgpt.example", + LocalToken: "local-token-secret", + }, + CodexUpgradeTransport: &CodexTokenTransport{ + Selector: &fakeCodexSelector{account: &codex.CodexAccount{Email: "codex@test.com", AccountID: "codex-account", AccessToken: "codex-token"}}, + Inner: http.DefaultTransport, + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, codexAppServerPath, nil) + req.Header.Set("Connection", "Upgrade") + req.Header.Set("Upgrade", "websocket") + handler.ServeHTTP(w, req) + + if w.Code != http.StatusInternalServerError { + t.Fatalf("status = %d, want 500, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Error != "api_error:invalid_codex_upstream" { + t.Fatalf("Error = %q, want invalid upstream code", ev.Error) + } + if ev.StatusCode != http.StatusInternalServerError { + t.Fatalf("StatusCode = %d, want 500", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, "local-token-secret") + assertDiagnosticsLogDoesNotContain(t, path, "codex-token") +} + +func TestServerDiagnosticsHealthEmitsEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + + const localToken = "secret-proxy-token" + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: localToken, + }, + Diag: diag, + } + + handler, err := srv.handler() + if err != nil { + t.Fatalf("handler() error = %v", err) + } + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/health", nil) + handler.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200, body: %s", w.Code, w.Body.String()) + } + if err := diag.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + events := readDiagnosticsEvents(t, path) + if len(events) != 1 { + t.Fatalf("events = %d, want 1", len(events)) + } + ev := events[0] + if ev.Method != http.MethodGet || ev.Path != "/health" || ev.Provider != "proxy" { + t.Fatalf("event route = %+v", ev) + } + if ev.RouteKind != "health" { + t.Fatalf("RouteKind = %q, want health", ev.RouteKind) + } + if ev.StatusCode != http.StatusOK { + t.Fatalf("StatusCode = %d, want 200", ev.StatusCode) + } + assertDiagnosticsLogDoesNotContain(t, path, localToken) +} + +func TestServerDiagnosticsDisabledNoEvent(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + future := time.Now().UnixMilli() + 3600_000 + sel := &fakeSelector{accounts: []keyring.ClaudeOAuth{ + {Email: "user@test.com", AccessToken: "real-token", ExpiresAt: future}, + }} + srv := &Server{ + Config: &Config{ + ClaudeUpstream: "https://api.anthropic.com", + LocalToken: "local-tok", + }, + Transport: &TokenTransport{ + Selector: sel, + Inner: roundTripFunc(func(_ *http.Request) (*http.Response, error) { + return makeResponse(http.StatusOK, `{"id":"msg_123"}`), nil + }), + }, + } + + handler := srv.proxyHandler(mustParseURL(srv.Config.ClaudeUpstream)) + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/messages", strings.NewReader(`{"model":"claude-sonnet","messages":[]}`)) + req.Header.Set("Authorization", "Bearer local-tok") + req.Header.Set("Content-Type", "application/json") + handler(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", w.Code) + } + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatalf("diagnostics file exists or stat failed: %v", err) + } +} + +func TestServerHealthReportsDiagnosticsEnabled(t *testing.T) { + for _, tc := range []struct { + name string + enabled bool + }{ + {name: "disabled", enabled: false}, + {name: "enabled", enabled: true}, + } { + t.Run(tc.name, func(t *testing.T) { + path := filepath.Join(t.TempDir(), "routes.jsonl") + srv := &Server{} + if tc.enabled { + diag, err := OpenDiagnosticsWriter(path) + if err != nil { + t.Fatalf("OpenDiagnosticsWriter: %v", err) + } + defer diag.Close() + srv.Diag = diag + } + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/health", nil) + srv.handleHealth(w, req) + + var resp struct { + Diagnostics struct { + Enabled bool `json:"enabled"` + } `json:"diagnostics"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatal(err) + } + if resp.Diagnostics.Enabled != tc.enabled { + t.Fatalf("diagnostics.enabled = %v, want %v", resp.Diagnostics.Enabled, tc.enabled) + } + if strings.Contains(w.Body.String(), path) { + t.Fatalf("health leaked diagnostics path: %s", w.Body.String()) + } + }) + } +} + +func assertDiagnosticsLogDoesNotContain(t *testing.T, path, needle string) { + t.Helper() + raw, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read diagnostics log: %v", err) + } + if strings.Contains(string(raw), needle) { + t.Fatalf("diagnostics log leaked %q: %s", needle, raw) + } +} + +func waitForDiagnosticsEvents(t *testing.T, path string, want int) []RouteEvent { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for { + events := readDiagnosticsEvents(t, path) + if len(events) >= want { + return events + } + if time.Now().After(deadline) { + t.Fatalf("events = %d, want at least %d", len(events), want) + } + time.Sleep(10 * time.Millisecond) + } +} + func TestServer_InvalidToken(t *testing.T) { upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { t.Error("request should not reach upstream") diff --git a/internal/proxy/transport.go b/internal/proxy/transport.go index 5b87076..fa03128 100644 --- a/internal/proxy/transport.go +++ b/internal/proxy/transport.go @@ -59,6 +59,7 @@ func (t *TokenTransport) RoundTrip(req *http.Request) (*http.Response, error) { if err != nil { return nil, err } + noteRouteAccount(req.Context(), claudeAccountHint(acct), false) // Refresh upfront if token is already expired. token := acct.AccessToken @@ -160,6 +161,7 @@ func (t *TokenTransport) handle429(req *http.Request, resp *http.Response, faile token = refreshed } + noteRouteAccount(req.Context(), claudeAccountHint(alt), true) altResp, err := t.doRequest(req, token) if err != nil { last429Resp.Body.Close() @@ -387,6 +389,13 @@ func acctIdentifier(a *keyring.ClaudeOAuth) string { return a.AccessToken } +func claudeAccountHint(a *keyring.ClaudeOAuth) string { + if a == nil { + return "" + } + return redactedAccountHint("claude", a.AccountUUID, a.Email, a.AccessToken) +} + func tracksExhaustion(req *http.Request) bool { return req != nil && req.URL != nil && req.URL.Path == "/v1/messages" }