Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ Important `proxy.json` fields:
| `local_token` | generated | Required bearer token for local proxy requests. |
| `pinned_claude_account` | unset | Optional Claude account email or UUID to force proxy selection. |
| `diagnostics_log` | unset | Optional JSONL routing diagnostics log path for advanced local debugging. |
| `payload_diagnostics_log` | unset | Optional JSONL payload diagnostics log path. **Disabled by default.** See warning below. |
| `headroom` | `false` | Enables the headroom compression bridge when true. |
| `headroom_mode` | `cache` | Compression strategy when set; valid values are `cache` and `token`. |

Routing diagnostics are disabled by default. To enable them, set `diagnostics_log` in `proxy.json` to a local file path and restart the proxy. The log is append-only JSONL containing redacted route metadata such as method, path, provider, route kind, status, latency, selected-account hint, failover flag, and safe error code. It is intended for advanced local debugging and UAT, and enabling it does not change routing policy.

**Payload diagnostics** (`payload_diagnostics_log`) are disabled by default. To enable them, set `payload_diagnostics_log` in `proxy.json` to a local file path and **restart the proxy** (hot reload is not supported). Each entry in the log is a JSONL record with fields including `time`, `method`, `path`, `provider`, `route_kind`, `model`, `client_kind`, `session_key`, `session_source`, `session_signal`, `frame_index`, `body_bytes`, and `body`. Codex WebSocket client text frames are logged with `route_kind: codex_websocket_frame` so native Codex sessions can expose signals such as new session, continuation, long session, clear, and compact transitions.

> **WARNING:** The payload diagnostics log contains **raw request bodies** including prompts, tool inputs, system prompts, compact summaries, and message content. It is unsafe to share without careful review. The log does not record headers, tokens, or credential values by itself, but the body content may contain sensitive information. The `session_key` and `session_source` fields are derived correlation metadata — `session_key` is a deterministic 12-hex-character hash of a session identifier (never the raw value), and `session_source` identifies which signal was used (e.g. `x-claude-code-session-id`, `session_id`, `x-codex-window-id`, `body:conversation_id`, `body:thread_id`, `body:previous_response_id`, `ws:thread_id`, `ws:previous_response_id`). Payload diagnostics can derive the key from known conversation/thread identifiers in JSON request bodies or Codex WebSocket frames when otherwise-identical local client sessions have no differentiating session header.

## Model Registry

`cq models` manages the local model registry used by the proxy, Claude Code model caches, and Codex model cache integration.
Expand Down
19 changes: 18 additions & 1 deletion cmd/cq/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func runProxyStart(opts proxyCommandOptions) error {
claudeProvider := claudeprov.New(refreshClient)
quotaCache := proxy.NewQuotaCache(claudeProvider.FetchAccountUsage, cache.DefaultDir())
baseSelector := proxy.NewAccountSelector(discover, activeEmail, quotaCache)
selector := proxy.NewPinnedClaudeSelector(baseSelector, discover, cfg.PinnedClaudeAccount, quotaCache)
affinitySelector := proxy.NewSessionAffinitySelector(baseSelector, discover, quotaCache)
selector := proxy.NewPinnedClaudeSelector(affinitySelector, discover, cfg.PinnedClaudeAccount, quotaCache)
selector.SetPinExpireFunc(clearPersistedClaudePin)
if cfg.PinnedClaudeAccount != "" {
fmt.Fprintf(os.Stderr, "cq: pinned claude account: %s\n", cfg.PinnedClaudeAccount)
Expand Down Expand Up @@ -340,6 +341,21 @@ func runProxyStart(opts proxyCommandOptions) error {
}
}

var payloadDiag *proxy.PayloadWriter
if cfg.PayloadDiagnosticsLog != "" {
payloadDiag, err = proxy.OpenPayloadWriter(cfg.PayloadDiagnosticsLog)
if err != nil {
fmt.Fprintf(os.Stderr, "cq: payload diagnostics: %v (continuing without payload diagnostics)\n", err)
} else {
fmt.Fprintf(os.Stderr, "cq: payload diagnostics enabled — WARNING: log contains raw request bodies including prompts and message content\n")
defer func() {
if err := payloadDiag.Close(); err != nil {
fmt.Fprintf(os.Stderr, "cq: payload diagnostics: close: %v\n", err)
}
}()
}
}

srv := &proxy.Server{
Config: cfg,
Selector: selector,
Expand All @@ -350,6 +366,7 @@ func runProxyStart(opts proxyCommandOptions) error {
CodexUpgradeTransport: codexUpgradeTransport,
Headroom: headroom,
Diag: diagnostics,
PayloadDiag: payloadDiag,
HeadroomMode: resolvedMode,
Catalog: catalog,
Refresher: proxyRefresher,
Expand Down
19 changes: 19 additions & 0 deletions internal/proxy/codex_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (s *Server) handleNativeCodexCompact(w http.ResponseWriter, r *http.Request
Error: rec.diagnosticsError(),
}
event.applyRouteDiagnostics(routeDiag)
event.applySessionCorrelation(r.Header)
s.emitDiagnostics(event)
}()
}
Expand All @@ -98,6 +99,24 @@ func (s *Server) handleNativeCodexCompact(w http.ResponseWriter, r *http.Request
model = extractModel(body)
fmt.Fprintf(os.Stderr, "cq: route POST %s model=%q provider=codex (native compact)\n", requestPath, model)

// Emit payload diagnostics before forwarding.
if s.PayloadDiag != nil {
sessionKey, sessionSource := payloadSessionCorrelation(r.Header, body)
s.emitPayloadDiagnostics(PayloadEvent{
Time: time.Now().UTC(),
Method: r.Method,
Path: r.URL.Path,
Provider: "codex",
RouteKind: "codex_compact",
Model: model,
ClientKind: clientKindFromUserAgent(r.Header.Get("User-Agent")),
SessionKey: sessionKey,
SessionSource: sessionSource,
BodyBytes: len(body),
Body: encodeBody(body),
})
}

// Build upstream request targeting /responses/compact (no headroom applied).
upstreamURL := s.Config.CodexUpstream + "/responses/compact"
upReq, err := http.NewRequestWithContext(ctx, http.MethodPost, upstreamURL, bytes.NewReader(body))
Expand Down
68 changes: 52 additions & 16 deletions internal/proxy/codex_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,25 @@ func (s *codexSelector) Select(ctx context.Context, exclude ...string) (*codex.C
}

func (s *codexSelector) selectAccount(accounts []codex.CodexAccount, excludeSet map[string]bool, requestedModel string, requireCompatible bool) *codex.CodexAccount {
var best *codex.CodexAccount
bestRemaining := -1

for i := range accounts {
a := &accounts[i]
if !s.isEligible(a, excludeSet, requestedModel, requireCompatible) {
continue
}
if a.IsActive {
result := *a
return &result
remaining := s.accountRemaining(a)
if s.betterCandidate(a, remaining, best, bestRemaining) {
best = a
bestRemaining = remaining
}
}

for i := range accounts {
a := &accounts[i]
if !s.isEligible(a, excludeSet, requestedModel, requireCompatible) {
continue
}
result := *a
return &result
if best == nil {
return nil
}
return nil
result := *best
return &result
}

func (s *codexSelector) isEligible(a *codex.CodexAccount, excludeSet map[string]bool, requestedModel string, requireCompatible bool) bool {
Expand All @@ -90,6 +89,38 @@ func (s *codexSelector) isEligible(a *codex.CodexAccount, excludeSet map[string]
return true
}

func (s *codexSelector) accountRemaining(a *codex.CodexAccount) int {
if s.quota == nil {
return -1
}
snap, ok := s.snapshot(a)
if !ok || time.Since(snap.FetchedAt) > transientQuotaMaxAge {
return -1
}
return snap.Result.MinRemainingPct()
}

func (s *codexSelector) betterCandidate(candidate *codex.CodexAccount, candidateRemaining int, current *codex.CodexAccount, currentRemaining int) bool {
if current == nil {
return true
}
if candidateRemaining >= 0 || currentRemaining >= 0 {
if candidateRemaining != currentRemaining {
if candidateRemaining == 0 && currentRemaining < 0 {
return false
}
if currentRemaining == 0 && candidateRemaining < 0 {
return true
}
return candidateRemaining > currentRemaining
}
}
if candidate.IsActive != current.IsActive {
return candidate.IsActive
}
return false
}

func codexRequestedModel(ctx context.Context) string {
if ctx == nil {
return ""
Expand All @@ -116,10 +147,7 @@ func (s *codexSelector) hasQuota(a *codex.CodexAccount) bool {
if s.quota == nil {
return true
}
snap, ok := s.quota.Snapshot(a.AccountID)
if !ok {
snap, ok = s.quota.Snapshot(a.Email)
}
snap, ok := s.snapshot(a)
if !ok {
return true
}
Expand All @@ -132,6 +160,14 @@ func (s *codexSelector) hasQuota(a *codex.CodexAccount) bool {
return snap.Result.MinRemainingPct() != 0
}

func (s *codexSelector) snapshot(a *codex.CodexAccount) (QuotaSnapshot, bool) {
snap, ok := s.quota.Snapshot(a.AccountID)
if !ok {
snap, ok = s.quota.Snapshot(a.Email)
}
return snap, ok
}

func codexAcctExcluded(a *codex.CodexAccount, excludeSet map[string]bool) bool {
return (a.Email != "" && excludeSet[a.Email]) ||
(a.AccountID != "" && excludeSet[a.AccountID]) ||
Expand Down
22 changes: 22 additions & 0 deletions internal/proxy/codex_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,28 @@ func TestCodexSelector_SkipsExhaustedAccounts(t *testing.T) {
}
}

func TestCodexSelector_PrefersHigherQuotaOverActiveLowQuota(t *testing.T) {
now := time.Now()
quotaReader := stubQuotaReader{
"low": {Result: quota.Result{Windows: map[quota.WindowName]quota.Window{quota.Window5Hour: {RemainingPct: 5}}}, FetchedAt: now},
"high": {Result: quota.Result{Windows: map[quota.WindowName]quota.Window{quota.Window5Hour: {RemainingPct: 80}}}, FetchedAt: now},
}
sel := NewCodexSelector(func() []codex.CodexAccount {
return []codex.CodexAccount{
{AccountID: "low", Email: "low@test.com", AccessToken: "t1", IsActive: true},
{AccountID: "high", Email: "high@test.com", AccessToken: "t2", IsActive: false},
}
}, quotaReader)

acct, err := sel.Select(context.Background())
if err != nil {
t.Fatalf("Select error: %v", err)
}
if acct == nil || acct.Email != "high@test.com" {
t.Fatalf("got %+v, want high@test.com", acct)
}
}

func TestCodexSelector_DoesNotSwitchWhenAllAccountsExhausted(t *testing.T) {
now := time.Now()
quotaReader := stubQuotaReader{
Expand Down
7 changes: 7 additions & 0 deletions internal/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ type Config struct {
// a specific account identified by email or AccountUUID. Omitted when empty.
PinnedClaudeAccount string `json:"pinned_claude_account,omitempty"`
DiagnosticsLog string `json:"diagnostics_log,omitempty"`
// PayloadDiagnosticsLog is the optional path to a JSONL file for payload
// diagnostics. When set, the proxy logs request body metadata (including raw
// request bodies) for every buffered request. Disabled by default.
// WARNING: this log contains raw request bodies including prompts, tool
// inputs, system prompts, compact summaries, and message content. Do not
// share without review. Requires a proxy restart to take effect.
PayloadDiagnosticsLog string `json:"payload_diagnostics_log,omitempty"`
}

// ResolvedHeadroomMode returns the effective HeadroomMode for this config.
Expand Down
52 changes: 52 additions & 0 deletions internal/proxy/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,58 @@ func TestConfigDiagnosticsLogDefaultDisabled(t *testing.T) {
}
}

func TestConfigPayloadDiagnosticsLogJSONRoundTrip(t *testing.T) {
cfg := Config{
Port: DefaultPort,
ClaudeUpstream: DefaultUpstream,
CodexUpstream: DefaultCodexUpstream,
LocalToken: "tok",
PayloadDiagnosticsLog: "/tmp/cq-payloads.jsonl",
}
data, err := json.Marshal(cfg)
if err != nil {
t.Fatal(err)
}

var raw map[string]json.RawMessage
if err := json.Unmarshal(data, &raw); err != nil {
t.Fatal(err)
}
if string(raw["payload_diagnostics_log"]) != `"/tmp/cq-payloads.jsonl"` {
t.Fatalf("payload_diagnostics_log = %s, want configured path in %s", raw["payload_diagnostics_log"], data)
}

var roundTrip Config
if err := json.Unmarshal(data, &roundTrip); err != nil {
t.Fatal(err)
}
if roundTrip.PayloadDiagnosticsLog != cfg.PayloadDiagnosticsLog {
t.Fatalf("PayloadDiagnosticsLog = %q, want %q", roundTrip.PayloadDiagnosticsLog, cfg.PayloadDiagnosticsLog)
}
}

func TestConfigPayloadDiagnosticsLogDefaultDisabled(t *testing.T) {
var cfg Config
if err := json.Unmarshal([]byte(`{"port":19280,"local_token":"tok"}`), &cfg); err != nil {
t.Fatal(err)
}
if cfg.PayloadDiagnosticsLog != "" {
t.Fatalf("PayloadDiagnosticsLog = %q, want empty", cfg.PayloadDiagnosticsLog)
}

data, err := json.Marshal(Config{Port: DefaultPort, LocalToken: "tok"})
if err != nil {
t.Fatal(err)
}
var raw map[string]json.RawMessage
if err := json.Unmarshal(data, &raw); err != nil {
t.Fatal(err)
}
if _, ok := raw["payload_diagnostics_log"]; ok {
t.Fatalf("payload_diagnostics_log should be omitted when empty: %s", data)
}
}

func TestConfigDiagnosticsLogPersisted(t *testing.T) {
configHome := t.TempDir()
t.Setenv("XDG_CONFIG_HOME", configHome)
Expand Down
Loading
Loading