diff --git a/README.md b/README.md index fa3bc02..ff6adda 100644 --- a/README.md +++ b/README.md @@ -386,6 +386,40 @@ Tunable parameters (these mirror Headroom's `CompressConfig`): > **Access**: the `/config` endpoint is gated only by your [tailnet policy file](https://tailscale.com/docs/reference/syntax/policy-file): anyone who can reach the device can read and change its configuration. Lock the device down accordingly (see [Security](#security-and-data-handling)). +## Metrics (`/metrics`) + +tsheadroom exposes a Prometheus endpoint at `GET /metrics` on the same device. It uses the standard text exposition format, so any existing scraper works: + +```sh +curl -s http://tsheadroom.<your-tailnet>.ts.net/metrics +``` + +Two families are emitted. The `headroom_*` metrics reuse the names Headroom's own proxy emits, so if you already scrape a Headroom proxy you can point the same dashboards at this endpoint: + +| metric | type | meaning | +|---|---|---| +| `headroom_requests_total` | counter | Compression hook calls processed. | +| `headroom_requests_failed_total` | counter | Calls where compression errored and we failed open. | +| `headroom_tokens_input_total` | counter | Input tokens seen (`tokens_before`). | +| `headroom_tokens_saved_total` | counter | Tokens saved by compression (`tokens_saved`). | +| `headroom_overhead_ms_{sum,count,min,max}` | counter/gauge | tsheadroom processing time per request, in ms (excludes the upstream LLM — that's what Headroom calls "overhead"). | +| `headroom_requests_by_provider{provider}` | counter | Requests per provider (from Aperture metadata). | +| `headroom_requests_by_model{model}` | counter | Requests per model (from Aperture metadata). | +| `headroom_inbound_requests_total` / `_completed_total` / `_active` | counter/gauge | Inbound HTTP requests to the hook handler; `_active` is the live in-flight count. 
| + +The `tsheadroom_*` metrics have no Headroom analog and describe this service specifically: + +| metric | type | meaning | +|---|---|---| +| `tsheadroom_pool_slots_total` / `tsheadroom_pool_slots_busy` | gauge | Worker-pool size and how many slots are compressing right now. When `busy` sits at `total`, the pool is saturated and new requests queue (see [ML model loading and timeouts](#ml-model-loading-and-timeouts)). | +| `tsheadroom_requests_by_outcome{outcome}` | counter | Requests by outcome: `modify`, `noop`, `error`, `passthrough`, `read_error`. | +| `tsheadroom_cold_starts_total` | counter | Worker first-real-call events that paid the one-time ML model load. | +| `tsheadroom_tokens_after_total` | counter | Output tokens after compression (`tokens_after`), so you can compute the realized ratio. | + +> **Why a subset?** tsheadroom is a `pre_request` hook, not a proxy: it runs `headroom.compress()` and never sees the upstream response. So Headroom proxy metrics that depend on the response or on prompt caching — completion tokens, TTFB, end-to-end latency, cache reads/writes and busts, per-transform timing, waste signals — have no source here and are deliberately omitted rather than reported as misleading zeros. + +> **Access**: like `/config`, `/metrics` is gated only by your tailnet policy file. Anyone who can reach the device can read it; it exposes aggregate counts and model/provider names, not request contents. + ### ML model loading and timeouts The ML text compressor loads a ~600 MB model on first use (one-time, then cached on disk and resident in each worker). This load takes several seconds; a large, late-session conversation can itself take a few seconds to compress. 
There is one worker timeout, and the client-facing wait is owned by Aperture: diff --git a/handler.go b/handler.go index dc2bd47..0cda91f 100644 --- a/handler.go +++ b/handler.go @@ -18,11 +18,23 @@ import ( const maxBody = 50 << 20 // 50 MiB // hookCallData is the subset of aperture's HookCallData we consume. The hook is -// configured with send: ["request_body"], so that is the only field we read. +// configured with send: ["request_body"], so request_body is gated on that; the +// metadata object is always sent regardless of send, which is where aperture +// puts the resolved provider and model. type hookCallData struct { + Metadata hookMetadata `json:"metadata"` RequestBody json.RawMessage `json:"request_body"` } +// hookMetadata is the subset of aperture's always-present HookMetadata we use. +// provider and model are populated unconditionally by aperture (not gated by +// the hook's send config), so they're our authoritative source for both the +// per-provider/per-model metrics and the model passed to compress(). +type hookMetadata struct { + Provider string `json:"provider"` + Model string `json:"model"` +} + // guardrailResponse is aperture's GuardrailResponse. We only ever emit "allow" // or "modify" — never "block" — and always with HTTP 200. // @@ -67,17 +79,26 @@ type Handler struct { comp compressor settings *settingsStore // current compress knobs, read per request log *slog.Logger // operational logs + warnings (stderr) + metrics *Metrics // lifetime counters for /metrics (nil-safe; nil in tests) verbose bool // when set, emit a per-request summary to out out io.Writer // destination for verbose summaries (stdout) } -// summary holds the numbers reported in the -v per-request line. +// summary holds the numbers reported in the -v per-request line and folded into +// /metrics. 
type summary struct { inMessages int // number of messages received inBytes int // serialized size of received messages outBytes int // serialized size of returned messages + provider string // aperture-resolved provider (metrics label) + model string // aperture-resolved model (metrics label) + + tokensBefore int // res.TokensBefore (0 when no worker result) + tokensSaved int // res.TokensSaved (0 when no worker result) + tokensAfter int // res.TokensAfter (0 when no worker result) + workerMs float64 // worker-reported compress() time (0 when no worker result) cold bool // worker's first real request (paid the cold model load) modelLimit int // context-window limit the worker compressed against (0 when no result) @@ -85,6 +106,9 @@ type summary struct { } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.metrics.inboundStart() + defer h.metrics.inboundDone() + if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return @@ -92,11 +116,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() resp, s := h.process(r) + durMs := float64(time.Since(start).Microseconds()) / 1000 if h.verbose { - fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t model_limit=%d -> %s\n", - s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.modelLimit, s.reason) + fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%.0f worker_ms=%.0f cold=%t model_limit=%d -> %s\n", + s.inMessages, s.inBytes, s.outBytes, durMs, s.workerMs, s.cold, s.modelLimit, s.reason) } + h.metrics.record(s, durMs) writeJSON(w, http.StatusOK, resp) } @@ -121,25 +147,30 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { // return the whole thing (aperture rejects non-object modified bodies). 
var reqBody map[string]any if err := json.Unmarshal(data.RequestBody, &reqBody); err != nil { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} } // v1 handles only the `messages` shape (Anthropic/OpenAI). Anything else // (e.g. Gemini's `contents`, embeddings) passes through untouched. rawMessages, ok := reqBody["messages"] if !ok { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} } messages, ok := rawMessages.([]any) if !ok { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} + } + // Prefer aperture's resolved metadata.model (always present, authoritative); + // fall back to the request body's own model field. Empty -> worker default. + model := data.Metadata.Model + if model == "" { + model, _ = reqBody["model"].(string) } - model, _ := reqBody["model"].(string) // absent/non-string -> worker default // Byte sizes are only used for the -v summary; skip the marshal otherwise // (messages can be multi-MB). The message count is cheap, so keep it. // For an allow result, output size equals input size (body unchanged). - s := summary{inMessages: len(messages)} + s := summary{inMessages: len(messages), provider: data.Metadata.Provider, model: model} if h.verbose { s.inBytes = jsonLen(messages) s.outBytes = s.inBytes @@ -161,11 +192,15 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { s.reason = "allow(error)" return allow, s } - // Worker timing/cold/limit are available for both noop and modify; record - // them so the -v line shows them regardless of the outcome. 
+ // Worker timing/cold/limit and token counts are available for both noop and + // modify; record them so the -v line and /metrics reflect them regardless of + // the outcome. s.workerMs = res.ElapsedMs s.cold = res.ColdFirstCall s.modelLimit = res.ModelLimit + s.tokensBefore = res.TokensBefore + s.tokensSaved = res.TokensSaved + s.tokensAfter = res.TokensAfter if res.TokensSaved <= 0 { // No-op: nothing meaningful to change, so don't rewrite the body. s.reason = "allow(noop)" diff --git a/main.go b/main.go index b95c6c2..49ece6f 100644 --- a/main.go +++ b/main.go @@ -66,17 +66,23 @@ func main() { }, *maxCompress, log) defer pool.Shutdown() + metrics := newMetrics() + metrics.poolStats = pool.stats + handler := &Handler{ comp: pool, settings: settings, log: log, + metrics: metrics, verbose: *verbose, out: os.Stdout, } - // /config is the runtime tuning API; everything else is the aperture hook. + // /config is the runtime tuning API; /metrics is the Prometheus scrape + // endpoint; everything else is the aperture hook. mux := http.NewServeMux() mux.Handle("/config", &configHandler{store: settings, log: log}) + mux.Handle("/metrics", metrics) mux.Handle("/", handler) httpSrv := &http.Server{Handler: mux} diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..f9e2282 --- /dev/null +++ b/metrics.go @@ -0,0 +1,282 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package main + +import ( + "fmt" + "math" + "net/http" + "sort" + "strings" + "sync" +) + +// Metrics accumulates lifetime counters for the /metrics endpoint. It is the +// Go-side synthesis of the per-request numbers we already receive (token +// savings, timing, outcome) plus live pool state — tsheadroom calls headroom's +// bare compress() in isolated workers, so headroom's own aggregate metrics +// (proxy/client Prometheus, OTel) are never populated by us and can't be +// surfaced. 
See the README "/metrics" section for which headroom_* names we +// emit faithfully and which proxy-only families we deliberately omit. +// +// All methods are safe on a nil *Metrics so tests (and any future caller) can +// construct a Handler without one. Every mutating method takes the mutex; the +// scrape (export) snapshots under the same lock. +type Metrics struct { + mu sync.Mutex + + // Processed hook calls (POSTs we ran through compression). + requestsTotal int64 + requestsFailed int64 // compress returned an error (we failed open) + tokensInput int64 // Σ tokens_before + tokensSaved int64 // Σ tokens_saved + tokensAfter int64 // Σ tokens_after (post-compression) + coldStarts int64 // worker first-real-call (paid the ML model load) + + // Per-request overhead: our full processing time, the latency we add to a + // request (headroom calls this "overhead" — optimization time, excludes LLM). + overheadSumMs float64 + overheadMinMs float64 + overheadMaxMs float64 + overheadCount int64 + + byProvider map[string]int64 + byModel map[string]int64 + byOutcome map[string]int64 + + // Inbound HTTP requests to the hook handler (method-agnostic): total seen, + // completed, and currently in flight. active doubles as a saturation signal. + inboundTotal int64 + inboundCompleted int64 + inboundActive int64 + + // poolStats, when set, returns (total slots, busy slots) for the live pool + // gauges. nil in tests or before wiring; the gauges are then omitted. + poolStats func() (total, busy int) +} + +func newMetrics() *Metrics { + return &Metrics{ + byProvider: map[string]int64{}, + byModel: map[string]int64{}, + byOutcome: map[string]int64{}, + overheadMinMs: math.Inf(1), + } +} + +// inboundStart records a newly-accepted inbound request (any method). +func (m *Metrics) inboundStart() { + if m == nil { + return + } + m.mu.Lock() + m.inboundTotal++ + m.inboundActive++ + m.mu.Unlock() +} + +// inboundDone records that an inbound request finished (any method/outcome). 
+func (m *Metrics) inboundDone() {
+	if m == nil {
+		return
+	}
+	m.mu.Lock()
+	m.inboundCompleted++
+	if m.inboundActive > 0 {
+		m.inboundActive--
+	}
+	m.mu.Unlock()
+}
+
+// record folds one processed hook call into the lifetime counters; durMs is the handler's full processing time for it.
+// It is a no-op on a nil receiver. A negative s.tokensSaved is clamped to 0 so the exported counter never decreases.
+func (m *Metrics) record(s summary, durMs float64) {
+	if m == nil {
+		return
+	}
+	outcome := outcomeFromReason(s.reason)
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.requestsTotal++
+	m.byOutcome[outcome]++
+	m.byProvider[labelOrUnknown(s.provider)]++
+	m.byModel[labelOrUnknown(s.model)]++
+	m.tokensInput += int64(s.tokensBefore)
+	m.tokensSaved += int64(max(s.tokensSaved, 0)) // the handler treats <= 0 as a noop; never move a counter backwards
+	m.tokensAfter += int64(s.tokensAfter)
+	if s.cold {
+		m.coldStarts++
+	}
+	if outcome == "error" {
+		m.requestsFailed++
+	}
+
+	m.overheadSumMs += durMs
+	m.overheadCount++
+	if durMs < m.overheadMinMs {
+		m.overheadMinMs = durMs
+	}
+	if durMs > m.overheadMaxMs {
+		m.overheadMaxMs = durMs
+	}
+}
+
+// outcomeFromReason maps the handler's verbose reason to a stable, low-
+// cardinality outcome label for tsheadroom_requests_by_outcome.
+func outcomeFromReason(reason string) string {
+	switch reason {
+	case "modify":
+		return "modify"
+	case "allow(noop)":
+		return "noop"
+	case "allow(error)":
+		return "error"
+	case "allow(read-error)":
+		return "read_error"
+	default: // allow(passthrough) and any future allow(...) shapes
+		return "passthrough"
+	}
+}
+
+func labelOrUnknown(v string) string {
+	if v == "" {
+		return "unknown"
+	}
+	return v
+}
+
+// ServeHTTP exposes the metrics in Prometheus text exposition format.
+func (m *Metrics) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != http.MethodGet {
+		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+		return
+	}
+	w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
+	_, _ = w.Write([]byte(m.export()))
+}
+
+// export renders the current counters as Prometheus text. 
It mirrors the metric +// names headroom's proxy emits (so existing scrapers/dashboards can be +// repointed) for the subset we can populate faithfully, then adds tsheadroom_* +// native metrics that have no headroom analog (live pool saturation, per- +// outcome counts, cold starts, post-compression tokens). +func (m *Metrics) export() string { + if m == nil { + return "" + } + m.mu.Lock() + requestsTotal := m.requestsTotal + requestsFailed := m.requestsFailed + tokensInput := m.tokensInput + tokensSaved := m.tokensSaved + tokensAfter := m.tokensAfter + coldStarts := m.coldStarts + overheadSum := m.overheadSumMs + overheadCount := m.overheadCount + overheadMin := m.overheadMinMs + overheadMax := m.overheadMaxMs + inboundTotal := m.inboundTotal + inboundCompleted := m.inboundCompleted + inboundActive := m.inboundActive + byProvider := snapshotMap(m.byProvider) + byModel := snapshotMap(m.byModel) + byOutcome := snapshotMap(m.byOutcome) + m.mu.Unlock() + + var b strings.Builder + + // --- headroom_* : faithful subset (repoint-compatible) --- + scalar(&b, "headroom_requests_total", "counter", "Total compression hook calls processed", requestsTotal) + scalar(&b, "headroom_requests_failed_total", "counter", "Hook calls where compression errored and we failed open", requestsFailed) + scalar(&b, "headroom_tokens_input_total", "counter", "Total input tokens seen by compression (tokens_before)", tokensInput) + scalar(&b, "headroom_tokens_saved_total", "counter", "Tokens saved by compression (tokens_saved)", tokensSaved) + + overheadMinOut := 0.0 + if overheadCount > 0 { + overheadMinOut = round2(overheadMin) + } + scalar(&b, "headroom_overhead_ms_sum", "counter", "Sum of tsheadroom processing time in milliseconds (excludes upstream LLM)", round2(overheadSum)) + scalar(&b, "headroom_overhead_ms_count", "counter", "Count of observed tsheadroom overhead samples", overheadCount) + scalar(&b, "headroom_overhead_ms_min", "gauge", "Minimum observed tsheadroom overhead in 
milliseconds", overheadMinOut) + scalar(&b, "headroom_overhead_ms_max", "gauge", "Maximum observed tsheadroom overhead in milliseconds", round2(overheadMax)) + + scalar(&b, "headroom_inbound_requests_total", "counter", "All inbound HTTP requests accepted by the hook handler", inboundTotal) + scalar(&b, "headroom_inbound_requests_completed_total", "counter", "Inbound HTTP requests completed by the hook handler", inboundCompleted) + scalar(&b, "headroom_inbound_requests_active", "gauge", "Inbound HTTP requests currently in flight in the hook handler", inboundActive) + + labeled(&b, "headroom_requests_by_provider", "counter", "Requests by provider", "provider", byProvider) + labeled(&b, "headroom_requests_by_model", "counter", "Requests by model", "model", byModel) + + // --- tsheadroom_* : native metrics (no headroom analog) --- + scalar(&b, "tsheadroom_tokens_after_total", "counter", "Total output tokens after compression (tokens_after)", tokensAfter) + scalar(&b, "tsheadroom_cold_starts_total", "counter", "Worker first-real-call events that paid the one-time ML model load", coldStarts) + labeled(&b, "tsheadroom_requests_by_outcome", "counter", "Requests by outcome", "outcome", byOutcome) + + if m.poolStats != nil { + total, busy := m.poolStats() + scalar(&b, "tsheadroom_pool_slots_total", "gauge", "Total worker slots in the pool", int64(total)) + scalar(&b, "tsheadroom_pool_slots_busy", "gauge", "Worker slots currently running a compression", int64(busy)) + } + + return b.String() +} + +func snapshotMap(src map[string]int64) map[string]int64 { + out := make(map[string]int64, len(src)) + for k, v := range src { + out[k] = v + } + return out +} + +// scalar appends one unlabeled metric (HELP/TYPE/value) in Prometheus format. +// value may be an integer or float; it is rendered without scientific notation. 
+func scalar(b *strings.Builder, name, typ, help string, value any) { + fmt.Fprintf(b, "# HELP %s %s\n# TYPE %s %s\n%s %s\n\n", name, help, name, typ, name, fmtValue(value)) +} + +// labeled appends a metric family with one label dimension, sorted by label +// value for stable output. The family header is always emitted (even when +// empty) so dashboards can discover it on a fresh boot. +func labeled(b *strings.Builder, name, typ, help, label string, values map[string]int64) { + fmt.Fprintf(b, "# HELP %s %s\n# TYPE %s %s\n", name, help, name, typ) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Fprintf(b, "%s{%s=\"%s\"} %d\n", name, label, escapeLabel(k), values[k]) + } + b.WriteString("\n") +} + +func fmtValue(v any) string { + switch n := v.(type) { + case int64: + return fmt.Sprintf("%d", n) + case float64: + return strings.TrimSuffix(strings.TrimRight(fmt.Sprintf("%.2f", n), "0"), ".") + default: + return fmt.Sprintf("%v", n) + } +} + +func round2(f float64) float64 { + if math.IsInf(f, 0) || math.IsNaN(f) { + return 0 + } + return math.Round(f*100) / 100 +} + +// escapeLabel escapes a Prometheus label value (backslash, newline, quote), +// matching headroom's exporter so identical label values render identically. +func escapeLabel(v string) string { + v = strings.ReplaceAll(v, "\\", "\\\\") + v = strings.ReplaceAll(v, "\n", "\\n") + v = strings.ReplaceAll(v, "\"", "\\\"") + return v +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..4dfdba4 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package main + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// metricLine returns the value rendered for an exact metric line (name plus +// optional label set), or "" if the line is absent. 
It matches on the text up +// to the value so callers can assert exact emitted numbers. +func metricLine(t *testing.T, export, prefix string) string { + t.Helper() + for _, line := range strings.Split(export, "\n") { + if strings.HasPrefix(line, "#") { + continue + } + if strings.HasPrefix(line, prefix+" ") { + return strings.TrimSpace(strings.TrimPrefix(line, prefix)) + } + } + return "" +} + +func TestMetrics_NilSafe(t *testing.T) { + var m *Metrics // never constructed — mirrors a Handler built without metrics + // None of these may panic. + m.inboundStart() + m.record(summary{reason: "modify"}, 5) + m.inboundDone() + if got := m.export(); got != "" { + t.Fatalf("nil export = %q, want empty", got) + } +} + +func TestMetrics_RecordAndExport(t *testing.T) { + m := newMetrics() + m.poolStats = func() (int, int) { return 8, 3 } + + // A successful modify, a no-op, and a failed (fail-open) request. + m.record(summary{ + reason: "modify", provider: "anthropic", model: "claude-x", + tokensBefore: 100, tokensSaved: 40, tokensAfter: 60, cold: true, + }, 12.5) + m.record(summary{ + reason: "allow(noop)", provider: "anthropic", model: "claude-x", + tokensBefore: 10, tokensSaved: 0, tokensAfter: 10, + }, 2) + m.record(summary{reason: "allow(error)", provider: "openai", model: "gpt-x"}, 1) + + out := m.export() + + checks := map[string]string{ + "headroom_requests_total": "3", + "headroom_requests_failed_total": "1", + "headroom_tokens_input_total": "110", + "headroom_tokens_saved_total": "40", + "headroom_overhead_ms_count": "3", + "headroom_inbound_requests_total": "0", // no inbound* recorded here + "tsheadroom_tokens_after_total": "70", + "tsheadroom_cold_starts_total": "1", + "tsheadroom_pool_slots_total": "8", + "tsheadroom_pool_slots_busy": "3", + `headroom_requests_by_provider{provider="anthropic"}`: "2", + `headroom_requests_by_provider{provider="openai"}`: "1", + `headroom_requests_by_model{model="claude-x"}`: "2", + 
`tsheadroom_requests_by_outcome{outcome="modify"}`: "1", + `tsheadroom_requests_by_outcome{outcome="noop"}`: "1", + `tsheadroom_requests_by_outcome{outcome="error"}`: "1", + } + for prefix, want := range checks { + if got := metricLine(t, out, prefix); got != want { + t.Errorf("%s = %q, want %q\n--- export ---\n%s", prefix, got, want, out) + } + } + + // Every emitted family must carry HELP/TYPE headers. + for _, name := range []string{"headroom_requests_total", "headroom_requests_by_provider", "tsheadroom_pool_slots_busy"} { + if !strings.Contains(out, "# TYPE "+name+" ") { + t.Errorf("missing # TYPE header for %s", name) + } + } +} + +func TestMetrics_UnknownLabelFallback(t *testing.T) { + m := newMetrics() + m.record(summary{reason: "allow(passthrough)"}, 1) // no provider/model + out := m.export() + if got := metricLine(t, out, `headroom_requests_by_provider{provider="unknown"}`); got != "1" { + t.Errorf("empty provider should fall back to unknown; export:\n%s", out) + } + if got := metricLine(t, out, `headroom_requests_by_model{model="unknown"}`); got != "1" { + t.Errorf("empty model should fall back to unknown; export:\n%s", out) + } +} + +func TestMetrics_InboundGauge(t *testing.T) { + m := newMetrics() + m.inboundStart() + m.inboundStart() + m.inboundDone() // one still in flight + out := m.export() + if got := metricLine(t, out, "headroom_inbound_requests_total"); got != "2" { + t.Errorf("inbound_total = %q, want 2", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_completed_total"); got != "1" { + t.Errorf("inbound_completed = %q, want 1", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_active"); got != "1" { + t.Errorf("inbound_active = %q, want 1", got) + } +} + +func TestMetrics_PoolGaugesOmittedWhenUnset(t *testing.T) { + m := newMetrics() // poolStats left nil + if strings.Contains(m.export(), "tsheadroom_pool_slots_total") { + t.Error("pool gauges must be omitted when poolStats is nil") + } +} + +func 
TestMetrics_HTTPEndpoint(t *testing.T) { + m := newMetrics() + m.record(summary{reason: "modify", provider: "anthropic", model: "claude-x", tokensSaved: 5}, 1) + + // GET serves the exposition. + rec := httptest.NewRecorder() + m.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("GET /metrics = %d, want 200", rec.Code) + } + if ct := rec.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") { + t.Errorf("Content-Type = %q, want text/plain...", ct) + } + if !strings.Contains(rec.Body.String(), "headroom_tokens_saved_total 5") { + t.Errorf("body missing recorded metric:\n%s", rec.Body.String()) + } + + // Non-GET is rejected. + rec = httptest.NewRecorder() + m.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/metrics", nil)) + if rec.Code != http.StatusMethodNotAllowed { + t.Errorf("POST /metrics = %d, want 405", rec.Code) + } +} + +// TestMetrics_HandlerIntegration drives a request through the Handler and +// confirms the provider/model from aperture metadata land in /metrics. 
+func TestMetrics_HandlerIntegration(t *testing.T) { + h := newTestHandler(func(_ context.Context, req compressRequest) (*compressResult, error) { + if req.Model != "claude-from-metadata" { + t.Errorf("compress model = %q, want metadata model", req.Model) + } + return &compressResult{Messages: []any{"x"}, TokensBefore: 50, TokensSaved: 20, TokensAfter: 30}, nil + }) + h.metrics = newMetrics() + + body := `{"metadata":{"provider":"anthropic","model":"claude-from-metadata"}, + "request_body":{"messages":[{"role":"user","content":"big"}]}}` + resp := doHook(t, h, body) + if resp.Action != "modify" { + t.Fatalf("action = %q, want modify", resp.Action) + } + + out := h.metrics.export() + if got := metricLine(t, out, `headroom_requests_by_provider{provider="anthropic"}`); got != "1" { + t.Errorf("provider from metadata not recorded; export:\n%s", out) + } + if got := metricLine(t, out, `headroom_requests_by_model{model="claude-from-metadata"}`); got != "1" { + t.Errorf("model from metadata not recorded; export:\n%s", out) + } + if got := metricLine(t, out, "headroom_tokens_saved_total"); got != "20" { + t.Errorf("tokens_saved = %q, want 20", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_active"); got != "0" { + t.Errorf("inbound_active = %q, want 0 after request completes", got) + } +} diff --git a/pool.go b/pool.go index 6477c0d..7d09cef 100644 --- a/pool.go +++ b/pool.go @@ -14,6 +14,7 @@ import ( "os" "os/exec" "sync" + "sync/atomic" "syscall" "time" ) @@ -98,11 +99,19 @@ type Pool struct { maxCompress time.Duration // hard cap on a single worker call before recycle log *slog.Logger + size int // number of slots (for the saturation gauge) + busy atomic.Int64 // slots currently running a compression (for the gauge) + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } +// stats reports the pool's total and currently-busy slot counts for /metrics. 
+func (p *Pool) stats() (total, busy int) { + return p.size, int(p.busy.Load()) +} + // NewPool starts `size` slot goroutines, each spawning and supervising a worker // launched as `python -u script`. maxCompress is the hard cap on a single // worker call (see runSlot) — distinct from the caller's fail-open deadline. @@ -132,6 +141,7 @@ func newPool(size int, newCmd func() *exec.Cmd, maxCompress time.Duration, log * newCmd: newCmd, maxCompress: maxCompress, log: log, + size: size, ctx: ctx, cancel: cancel, } @@ -209,7 +219,9 @@ func (p *Pool) runSlot(idx int) { // send never blocks even after the client has given up. start := time.Now() hardCtx, cancel := context.WithTimeout(p.ctx, p.maxCompress) + p.busy.Add(1) res, err := w.do(hardCtx, j.req) + p.busy.Add(-1) cancel() dur := time.Since(start) if err != nil {