From 55ede39625c6c921e3a12d056618071dd62579b8 Mon Sep 17 00:00:00 2001 From: Luke Kosewski Date: Wed, 10 Jun 2026 04:40:51 +0000 Subject: [PATCH 1/2] feat: add /metrics Prometheus endpoint Synthesize a Prometheus /metrics endpoint from the per-request data we already receive, plus live worker-pool state. tsheadroom calls headroom's bare compress() in isolated workers, so headroom's own aggregate metrics (proxy/client Prometheus, OTel) are never populated by us; instead we accumulate our own lifetime counters in Go. Two families are emitted: - headroom_*: reuse the names headroom's proxy emits for the subset we can populate faithfully (requests, tokens in/saved, our processing overhead, per-provider/model, inbound HTTP), so existing headroom dashboards can be repointed at this endpoint. - tsheadroom_*: native metrics with no headroom analog (pool saturation gauges, per-outcome counts, cold starts, tokens_after). Response-dependent proxy metrics (completion tokens, TTFB, cache read/write/bust, per-transform timing, waste signals) have no source on a pre_request hook and are deliberately omitted rather than reported as misleading zeros. Also parse aperture's always-present metadata.{provider,model} (sent regardless of the hook's send config) and prefer metadata.model as the authoritative model for compress() and the metric labels, falling back to the request body's model field. The Metrics type is nil-safe so the handler tests construct a Handler without one. The pool exposes a busy-slot atomic for the live gauges. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Luke Kosewski --- handler.go | 57 ++++++++-- main.go | 8 +- metrics.go | 282 ++++++++++++++++++++++++++++++++++++++++++++++++ metrics_test.go | 182 +++++++++++++++++++++++++++++++ pool.go | 12 +++ 5 files changed, 529 insertions(+), 12 deletions(-) create mode 100644 metrics.go create mode 100644 metrics_test.go diff --git a/handler.go b/handler.go index dc2bd47..0cda91f 100644 --- a/handler.go +++ b/handler.go @@ -18,11 +18,23 @@ import ( const maxBody = 50 << 20 // 50 MiB // hookCallData is the subset of aperture's HookCallData we consume. The hook is -// configured with send: ["request_body"], so that is the only field we read. +// configured with send: ["request_body"], so request_body is gated on that; the +// metadata object is always sent regardless of send, which is where aperture +// puts the resolved provider and model. type hookCallData struct { + Metadata hookMetadata `json:"metadata"` RequestBody json.RawMessage `json:"request_body"` } +// hookMetadata is the subset of aperture's always-present HookMetadata we use. +// provider and model are populated unconditionally by aperture (not gated by +// the hook's send config), so they're our authoritative source for both the +// per-provider/per-model metrics and the model passed to compress(). +type hookMetadata struct { + Provider string `json:"provider"` + Model string `json:"model"` +} + // guardrailResponse is aperture's GuardrailResponse. We only ever emit "allow" // or "modify" — never "block" — and always with HTTP 200. // @@ -67,17 +79,26 @@ type Handler struct { comp compressor settings *settingsStore // current compress knobs, read per request log *slog.Logger // operational logs + warnings (stderr) + metrics *Metrics // lifetime counters for /metrics (nil-safe; nil in tests) verbose bool // when set, emit a per-request summary to out out io.Writer // destination for verbose summaries (stdout) } -// summary holds the numbers reported in the -v per-request line. +// summary holds the numbers reported in the -v per-request line and folded into +// /metrics. type summary struct { inMessages int // number of messages received inBytes int // serialized size of received messages outBytes int // serialized size of returned messages + provider string // aperture-resolved provider (metrics label) + model string // aperture-resolved model (metrics label) + + tokensBefore int // res.TokensBefore (0 when no worker result) + tokensSaved int // res.TokensSaved (0 when no worker result) + tokensAfter int // res.TokensAfter (0 when no worker result) + workerMs float64 // worker-reported compress() time (0 when no worker result) cold bool // worker's first real request (paid the cold model load) modelLimit int // context-window limit the worker compressed against (0 when no result) @@ -85,6 +106,9 @@ type summary struct { } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.metrics.inboundStart() + defer h.metrics.inboundDone() + if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return @@ -92,11 +116,13 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() resp, s := h.process(r) + durMs := float64(time.Since(start).Microseconds()) / 1000 if h.verbose { - fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t model_limit=%d -> %s\n", - s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.modelLimit, s.reason) + fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%.0f worker_ms=%.0f cold=%t model_limit=%d -> %s\n", + s.inMessages, s.inBytes, s.outBytes, durMs, s.workerMs, s.cold, s.modelLimit, s.reason) } + h.metrics.record(s, durMs) writeJSON(w, http.StatusOK, resp) } @@ -121,25 +147,30 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { // return the whole thing (aperture rejects non-object modified bodies). var reqBody map[string]any if err := json.Unmarshal(data.RequestBody, &reqBody); err != nil { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} } // v1 handles only the `messages` shape (Anthropic/OpenAI). Anything else // (e.g. Gemini's `contents`, embeddings) passes through untouched. rawMessages, ok := reqBody["messages"] if !ok { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} } messages, ok := rawMessages.([]any) if !ok { - return allow, summary{reason: "allow(passthrough)"} + return allow, summary{reason: "allow(passthrough)", provider: data.Metadata.Provider, model: data.Metadata.Model} + } + // Prefer aperture's resolved metadata.model (always present, authoritative); + // fall back to the request body's own model field. Empty -> worker default. + model := data.Metadata.Model + if model == "" { + model, _ = reqBody["model"].(string) } - model, _ := reqBody["model"].(string) // absent/non-string -> worker default // Byte sizes are only used for the -v summary; skip the marshal otherwise // (messages can be multi-MB). The message count is cheap, so keep it. // For an allow result, output size equals input size (body unchanged). - s := summary{inMessages: len(messages)} + s := summary{inMessages: len(messages), provider: data.Metadata.Provider, model: model} if h.verbose { s.inBytes = jsonLen(messages) s.outBytes = s.inBytes @@ -161,11 +192,15 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { s.reason = "allow(error)" return allow, s } - // Worker timing/cold/limit are available for both noop and modify; record - // them so the -v line shows them regardless of the outcome. + // Worker timing/cold/limit and token counts are available for both noop and + // modify; record them so the -v line and /metrics reflect them regardless of + // the outcome. s.workerMs = res.ElapsedMs s.cold = res.ColdFirstCall s.modelLimit = res.ModelLimit + s.tokensBefore = res.TokensBefore + s.tokensSaved = res.TokensSaved + s.tokensAfter = res.TokensAfter if res.TokensSaved <= 0 { // No-op: nothing meaningful to change, so don't rewrite the body. s.reason = "allow(noop)" diff --git a/main.go b/main.go index b95c6c2..49ece6f 100644 --- a/main.go +++ b/main.go @@ -66,17 +66,23 @@ func main() { }, *maxCompress, log) defer pool.Shutdown() + metrics := newMetrics() + metrics.poolStats = pool.stats + handler := &Handler{ comp: pool, settings: settings, log: log, + metrics: metrics, verbose: *verbose, out: os.Stdout, } - // /config is the runtime tuning API; everything else is the aperture hook. + // /config is the runtime tuning API; /metrics is the Prometheus scrape + // endpoint; everything else is the aperture hook. mux := http.NewServeMux() mux.Handle("/config", &configHandler{store: settings, log: log}) + mux.Handle("/metrics", metrics) mux.Handle("/", handler) httpSrv := &http.Server{Handler: mux} diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..f9e2282 --- /dev/null +++ b/metrics.go @@ -0,0 +1,282 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package main + +import ( + "fmt" + "math" + "net/http" + "sort" + "strings" + "sync" +) + +// Metrics accumulates lifetime counters for the /metrics endpoint. It is the +// Go-side synthesis of the per-request numbers we already receive (token +// savings, timing, outcome) plus live pool state — tsheadroom calls headroom's +// bare compress() in isolated workers, so headroom's own aggregate metrics +// (proxy/client Prometheus, OTel) are never populated by us and can't be +// surfaced. See the README "/metrics" section for which headroom_* names we +// emit faithfully and which proxy-only families we deliberately omit. +// +// All methods are safe on a nil *Metrics so tests (and any future caller) can +// construct a Handler without one. Every mutating method takes the mutex; the +// scrape (export) snapshots under the same lock. +type Metrics struct { + mu sync.Mutex + + // Processed hook calls (POSTs we ran through compression). + requestsTotal int64 + requestsFailed int64 // compress returned an error (we failed open) + tokensInput int64 // Σ tokens_before + tokensSaved int64 // Σ tokens_saved + tokensAfter int64 // Σ tokens_after (post-compression) + coldStarts int64 // worker first-real-call (paid the ML model load) + + // Per-request overhead: our full processing time, the latency we add to a + // request (headroom calls this "overhead" — optimization time, excludes LLM). + overheadSumMs float64 + overheadMinMs float64 + overheadMaxMs float64 + overheadCount int64 + + byProvider map[string]int64 + byModel map[string]int64 + byOutcome map[string]int64 + + // Inbound HTTP requests to the hook handler (method-agnostic): total seen, + // completed, and currently in flight. active doubles as a saturation signal. + inboundTotal int64 + inboundCompleted int64 + inboundActive int64 + + // poolStats, when set, returns (total slots, busy slots) for the live pool + // gauges. nil in tests or before wiring; the gauges are then omitted. + poolStats func() (total, busy int) +} + +func newMetrics() *Metrics { + return &Metrics{ + byProvider: map[string]int64{}, + byModel: map[string]int64{}, + byOutcome: map[string]int64{}, + overheadMinMs: math.Inf(1), + } +} + +// inboundStart records a newly-accepted inbound request (any method). +func (m *Metrics) inboundStart() { + if m == nil { + return + } + m.mu.Lock() + m.inboundTotal++ + m.inboundActive++ + m.mu.Unlock() +} + +// inboundDone records that an inbound request finished (any method/outcome). +func (m *Metrics) inboundDone() { + if m == nil { + return + } + m.mu.Lock() + m.inboundCompleted++ + if m.inboundActive > 0 { + m.inboundActive-- + } + m.mu.Unlock() +} + +// record folds one processed hook call into the lifetime counters. durMs is the +// handler's full processing time for the request. +func (m *Metrics) record(s summary, durMs float64) { + if m == nil { + return + } + outcome := outcomeFromReason(s.reason) + + m.mu.Lock() + defer m.mu.Unlock() + + m.requestsTotal++ + m.byOutcome[outcome]++ + m.byProvider[labelOrUnknown(s.provider)]++ + m.byModel[labelOrUnknown(s.model)]++ + m.tokensInput += int64(s.tokensBefore) + m.tokensSaved += int64(s.tokensSaved) + m.tokensAfter += int64(s.tokensAfter) + if s.cold { + m.coldStarts++ + } + if outcome == "error" { + m.requestsFailed++ + } + + m.overheadSumMs += durMs + m.overheadCount++ + if durMs < m.overheadMinMs { + m.overheadMinMs = durMs + } + if durMs > m.overheadMaxMs { + m.overheadMaxMs = durMs + } +} + +// outcomeFromReason maps the handler's verbose reason to a stable, low- +// cardinality outcome label for tsheadroom_requests_by_outcome. +func outcomeFromReason(reason string) string { + switch reason { + case "modify": + return "modify" + case "allow(noop)": + return "noop" + case "allow(error)": + return "error" + case "allow(read-error)": + return "read_error" + default: // allow(passthrough) and any future allow(...) shapes + return "passthrough" + } +} + +func labelOrUnknown(v string) string { + if v == "" { + return "unknown" + } + return v +} + +// ServeHTTP exposes the metrics in Prometheus text exposition format. +func (m *Metrics) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + _, _ = w.Write([]byte(m.export())) +} + +// export renders the current counters as Prometheus text. It mirrors the metric +// names headroom's proxy emits (so existing scrapers/dashboards can be +// repointed) for the subset we can populate faithfully, then adds tsheadroom_* +// native metrics that have no headroom analog (live pool saturation, per- +// outcome counts, cold starts, post-compression tokens). +func (m *Metrics) export() string { + if m == nil { + return "" + } + m.mu.Lock() + requestsTotal := m.requestsTotal + requestsFailed := m.requestsFailed + tokensInput := m.tokensInput + tokensSaved := m.tokensSaved + tokensAfter := m.tokensAfter + coldStarts := m.coldStarts + overheadSum := m.overheadSumMs + overheadCount := m.overheadCount + overheadMin := m.overheadMinMs + overheadMax := m.overheadMaxMs + inboundTotal := m.inboundTotal + inboundCompleted := m.inboundCompleted + inboundActive := m.inboundActive + byProvider := snapshotMap(m.byProvider) + byModel := snapshotMap(m.byModel) + byOutcome := snapshotMap(m.byOutcome) + m.mu.Unlock() + + var b strings.Builder + + // --- headroom_* : faithful subset (repoint-compatible) --- + scalar(&b, "headroom_requests_total", "counter", "Total compression hook calls processed", requestsTotal) + scalar(&b, "headroom_requests_failed_total", "counter", "Hook calls where compression errored and we failed open", requestsFailed) + scalar(&b, "headroom_tokens_input_total", "counter", "Total input tokens seen by compression (tokens_before)", tokensInput) + scalar(&b, "headroom_tokens_saved_total", "counter", "Tokens saved by compression (tokens_saved)", tokensSaved) + + overheadMinOut := 0.0 + if overheadCount > 0 { + overheadMinOut = round2(overheadMin) + } + scalar(&b, "headroom_overhead_ms_sum", "counter", "Sum of tsheadroom processing time in milliseconds (excludes upstream LLM)", round2(overheadSum)) + scalar(&b, "headroom_overhead_ms_count", "counter", "Count of observed tsheadroom overhead samples", overheadCount) + scalar(&b, "headroom_overhead_ms_min", "gauge", "Minimum observed tsheadroom overhead in milliseconds", overheadMinOut) + scalar(&b, "headroom_overhead_ms_max", "gauge", "Maximum observed tsheadroom overhead in milliseconds", round2(overheadMax)) + + scalar(&b, "headroom_inbound_requests_total", "counter", "All inbound HTTP requests accepted by the hook handler", inboundTotal) + scalar(&b, "headroom_inbound_requests_completed_total", "counter", "Inbound HTTP requests completed by the hook handler", inboundCompleted) + scalar(&b, "headroom_inbound_requests_active", "gauge", "Inbound HTTP requests currently in flight in the hook handler", inboundActive) + + labeled(&b, "headroom_requests_by_provider", "counter", "Requests by provider", "provider", byProvider) + labeled(&b, "headroom_requests_by_model", "counter", "Requests by model", "model", byModel) + + // --- tsheadroom_* : native metrics (no headroom analog) --- + scalar(&b, "tsheadroom_tokens_after_total", "counter", "Total output tokens after compression (tokens_after)", tokensAfter) + scalar(&b, "tsheadroom_cold_starts_total", "counter", "Worker first-real-call events that paid the one-time ML model load", coldStarts) + labeled(&b, "tsheadroom_requests_by_outcome", "counter", "Requests by outcome", "outcome", byOutcome) + + if m.poolStats != nil { + total, busy := m.poolStats() + scalar(&b, "tsheadroom_pool_slots_total", "gauge", "Total worker slots in the pool", int64(total)) + scalar(&b, "tsheadroom_pool_slots_busy", "gauge", "Worker slots currently running a compression", int64(busy)) + } + + return b.String() +} + +func snapshotMap(src map[string]int64) map[string]int64 { + out := make(map[string]int64, len(src)) + for k, v := range src { + out[k] = v + } + return out +} + +// scalar appends one unlabeled metric (HELP/TYPE/value) in Prometheus format. +// value may be an integer or float; it is rendered without scientific notation. +func scalar(b *strings.Builder, name, typ, help string, value any) { + fmt.Fprintf(b, "# HELP %s %s\n# TYPE %s %s\n%s %s\n\n", name, help, name, typ, name, fmtValue(value)) +} + +// labeled appends a metric family with one label dimension, sorted by label +// value for stable output. The family header is always emitted (even when +// empty) so dashboards can discover it on a fresh boot. +func labeled(b *strings.Builder, name, typ, help, label string, values map[string]int64) { + fmt.Fprintf(b, "# HELP %s %s\n# TYPE %s %s\n", name, help, name, typ) + keys := make([]string, 0, len(values)) + for k := range values { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + fmt.Fprintf(b, "%s{%s=\"%s\"} %d\n", name, label, escapeLabel(k), values[k]) + } + b.WriteString("\n") +} + +func fmtValue(v any) string { + switch n := v.(type) { + case int64: + return fmt.Sprintf("%d", n) + case float64: + return strings.TrimSuffix(strings.TrimRight(fmt.Sprintf("%.2f", n), "0"), ".") + default: + return fmt.Sprintf("%v", n) + } +} + +func round2(f float64) float64 { + if math.IsInf(f, 0) || math.IsNaN(f) { + return 0 + } + return math.Round(f*100) / 100 +} + +// escapeLabel escapes a Prometheus label value (backslash, newline, quote), +// matching headroom's exporter so identical label values render identically. +func escapeLabel(v string) string { + v = strings.ReplaceAll(v, "\\", "\\\\") + v = strings.ReplaceAll(v, "\n", "\\n") + v = strings.ReplaceAll(v, "\"", "\\\"") + return v +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..4dfdba4 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package main + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// metricLine returns the value rendered for an exact metric line (name plus +// optional label set), or "" if the line is absent. It matches on the text up +// to the value so callers can assert exact emitted numbers. +func metricLine(t *testing.T, export, prefix string) string { + t.Helper() + for _, line := range strings.Split(export, "\n") { + if strings.HasPrefix(line, "#") { + continue + } + if strings.HasPrefix(line, prefix+" ") { + return strings.TrimSpace(strings.TrimPrefix(line, prefix)) + } + } + return "" +} + +func TestMetrics_NilSafe(t *testing.T) { + var m *Metrics // never constructed — mirrors a Handler built without metrics + // None of these may panic. + m.inboundStart() + m.record(summary{reason: "modify"}, 5) + m.inboundDone() + if got := m.export(); got != "" { + t.Fatalf("nil export = %q, want empty", got) + } +} + +func TestMetrics_RecordAndExport(t *testing.T) { + m := newMetrics() + m.poolStats = func() (int, int) { return 8, 3 } + + // A successful modify, a no-op, and a failed (fail-open) request. + m.record(summary{ + reason: "modify", provider: "anthropic", model: "claude-x", + tokensBefore: 100, tokensSaved: 40, tokensAfter: 60, cold: true, + }, 12.5) + m.record(summary{ + reason: "allow(noop)", provider: "anthropic", model: "claude-x", + tokensBefore: 10, tokensSaved: 0, tokensAfter: 10, + }, 2) + m.record(summary{reason: "allow(error)", provider: "openai", model: "gpt-x"}, 1) + + out := m.export() + + checks := map[string]string{ + "headroom_requests_total": "3", + "headroom_requests_failed_total": "1", + "headroom_tokens_input_total": "110", + "headroom_tokens_saved_total": "40", + "headroom_overhead_ms_count": "3", + "headroom_inbound_requests_total": "0", // no inbound* recorded here + "tsheadroom_tokens_after_total": "70", + "tsheadroom_cold_starts_total": "1", + "tsheadroom_pool_slots_total": "8", + "tsheadroom_pool_slots_busy": "3", + `headroom_requests_by_provider{provider="anthropic"}`: "2", + `headroom_requests_by_provider{provider="openai"}`: "1", + `headroom_requests_by_model{model="claude-x"}`: "2", + `tsheadroom_requests_by_outcome{outcome="modify"}`: "1", + `tsheadroom_requests_by_outcome{outcome="noop"}`: "1", + `tsheadroom_requests_by_outcome{outcome="error"}`: "1", + } + for prefix, want := range checks { + if got := metricLine(t, out, prefix); got != want { + t.Errorf("%s = %q, want %q\n--- export ---\n%s", prefix, got, want, out) + } + } + + // Every emitted family must carry HELP/TYPE headers. + for _, name := range []string{"headroom_requests_total", "headroom_requests_by_provider", "tsheadroom_pool_slots_busy"} { + if !strings.Contains(out, "# TYPE "+name+" ") { + t.Errorf("missing # TYPE header for %s", name) + } + } +} + +func TestMetrics_UnknownLabelFallback(t *testing.T) { + m := newMetrics() + m.record(summary{reason: "allow(passthrough)"}, 1) // no provider/model + out := m.export() + if got := metricLine(t, out, `headroom_requests_by_provider{provider="unknown"}`); got != "1" { + t.Errorf("empty provider should fall back to unknown; export:\n%s", out) + } + if got := metricLine(t, out, `headroom_requests_by_model{model="unknown"}`); got != "1" { + t.Errorf("empty model should fall back to unknown; export:\n%s", out) + } +} + +func TestMetrics_InboundGauge(t *testing.T) { + m := newMetrics() + m.inboundStart() + m.inboundStart() + m.inboundDone() // one still in flight + out := m.export() + if got := metricLine(t, out, "headroom_inbound_requests_total"); got != "2" { + t.Errorf("inbound_total = %q, want 2", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_completed_total"); got != "1" { + t.Errorf("inbound_completed = %q, want 1", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_active"); got != "1" { + t.Errorf("inbound_active = %q, want 1", got) + } +} + +func TestMetrics_PoolGaugesOmittedWhenUnset(t *testing.T) { + m := newMetrics() // poolStats left nil + if strings.Contains(m.export(), "tsheadroom_pool_slots_total") { + t.Error("pool gauges must be omitted when poolStats is nil") + } +} + +func TestMetrics_HTTPEndpoint(t *testing.T) { + m := newMetrics() + m.record(summary{reason: "modify", provider: "anthropic", model: "claude-x", tokensSaved: 5}, 1) + + // GET serves the exposition. + rec := httptest.NewRecorder() + m.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("GET /metrics = %d, want 200", rec.Code) + } + if ct := rec.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") { + t.Errorf("Content-Type = %q, want text/plain...", ct) + } + if !strings.Contains(rec.Body.String(), "headroom_tokens_saved_total 5") { + t.Errorf("body missing recorded metric:\n%s", rec.Body.String()) + } + + // Non-GET is rejected. + rec = httptest.NewRecorder() + m.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/metrics", nil)) + if rec.Code != http.StatusMethodNotAllowed { + t.Errorf("POST /metrics = %d, want 405", rec.Code) + } +} + +// TestMetrics_HandlerIntegration drives a request through the Handler and +// confirms the provider/model from aperture metadata land in /metrics. +func TestMetrics_HandlerIntegration(t *testing.T) { + h := newTestHandler(func(_ context.Context, req compressRequest) (*compressResult, error) { + if req.Model != "claude-from-metadata" { + t.Errorf("compress model = %q, want metadata model", req.Model) + } + return &compressResult{Messages: []any{"x"}, TokensBefore: 50, TokensSaved: 20, TokensAfter: 30}, nil + }) + h.metrics = newMetrics() + + body := `{"metadata":{"provider":"anthropic","model":"claude-from-metadata"}, + "request_body":{"messages":[{"role":"user","content":"big"}]}}` + resp := doHook(t, h, body) + if resp.Action != "modify" { + t.Fatalf("action = %q, want modify", resp.Action) + } + + out := h.metrics.export() + if got := metricLine(t, out, `headroom_requests_by_provider{provider="anthropic"}`); got != "1" { + t.Errorf("provider from metadata not recorded; export:\n%s", out) + } + if got := metricLine(t, out, `headroom_requests_by_model{model="claude-from-metadata"}`); got != "1" { + t.Errorf("model from metadata not recorded; export:\n%s", out) + } + if got := metricLine(t, out, "headroom_tokens_saved_total"); got != "20" { + t.Errorf("tokens_saved = %q, want 20", got) + } + if got := metricLine(t, out, "headroom_inbound_requests_active"); got != "0" { + t.Errorf("inbound_active = %q, want 0 after request completes", got) + } +} diff --git a/pool.go b/pool.go index 6477c0d..7d09cef 100644 --- a/pool.go +++ b/pool.go @@ -14,6 +14,7 @@ import ( "os" "os/exec" "sync" + "sync/atomic" "syscall" "time" ) @@ -98,11 +99,19 @@ type Pool struct { maxCompress time.Duration // hard cap on a single worker call before recycle log *slog.Logger + size int // number of slots (for the saturation gauge) + busy atomic.Int64 // slots currently running a compression (for the gauge) + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } +// stats reports the pool's total and currently-busy slot counts for /metrics. +func (p *Pool) stats() (total, busy int) { + return p.size, int(p.busy.Load()) +} + // NewPool starts `size` slot goroutines, each spawning and supervising a worker // launched as `python -u script`. maxCompress is the hard cap on a single // worker call (see runSlot) — distinct from the caller's fail-open deadline. @@ -132,6 +141,7 @@ func newPool(size int, newCmd func() *exec.Cmd, maxCompress time.Duration, log * newCmd: newCmd, maxCompress: maxCompress, log: log, + size: size, ctx: ctx, cancel: cancel, } @@ -209,7 +219,9 @@ func (p *Pool) runSlot(idx int) { // send never blocks even after the client has given up. start := time.Now() hardCtx, cancel := context.WithTimeout(p.ctx, p.maxCompress) + p.busy.Add(1) res, err := w.do(hardCtx, j.req) + p.busy.Add(-1) cancel() dur := time.Since(start) if err != nil { From 5126569bd86ab9fc044b37979bfe5d36d54c5d61 Mon Sep 17 00:00:00 2001 From: Luke Kosewski Date: Wed, 10 Jun 2026 04:41:00 +0000 Subject: [PATCH 2/2] docs: document the /metrics endpoint Add a Metrics section to the README: the curl invocation, tables for the headroom_* (repoint-compatible) and tsheadroom_* (native) families, the rationale for the omitted proxy-only metrics, and an access note. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Luke Kosewski --- README.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/README.md b/README.md index fa3bc02..ff6adda 100644 --- a/README.md +++ b/README.md @@ -386,6 +386,40 @@ Tunable parameters (these mirror Headroom's `CompressConfig`): > **Access**: the `/config` endpoint is gated only by your [tailnet policy file](https://tailscale.com/docs/reference/syntax/policy-file): anyone who can reach the device can read and change its configuration. Lock the device down accordingly (see [Security](#security-and-data-handling)). +## Metrics (`/metrics`) + +tsheadroom exposes a Prometheus endpoint at `GET /metrics` on the same device. It uses the standard text exposition format, so any existing scraper works: + +```sh +curl -s http://tsheadroom..ts.net/metrics +``` + +Two families are emitted. The `headroom_*` metrics reuse the names Headroom's own proxy emits, so if you already scrape a Headroom proxy you can point the same dashboards at this endpoint: + +| metric | type | meaning | +|---|---|---| +| `headroom_requests_total` | counter | Compression hook calls processed. | +| `headroom_requests_failed_total` | counter | Calls where compression errored and we failed open. | +| `headroom_tokens_input_total` | counter | Input tokens seen (`tokens_before`). | +| `headroom_tokens_saved_total` | counter | Tokens saved by compression (`tokens_saved`). | +| `headroom_overhead_ms_{sum,count,min,max}` | counter/gauge | tsheadroom processing time per request, in ms (excludes the upstream LLM — that's what Headroom calls "overhead"). | +| `headroom_requests_by_provider{provider}` | counter | Requests per provider (from Aperture metadata). | +| `headroom_requests_by_model{model}` | counter | Requests per model (from Aperture metadata). | +| `headroom_inbound_requests_total` / `_completed_total` / `_active` | counter/gauge | Inbound HTTP requests to the hook handler; `_active` is the live in-flight count. | + +The `tsheadroom_*` metrics have no Headroom analog and describe this service specifically: + +| metric | type | meaning | +|---|---|---| +| `tsheadroom_pool_slots_total` / `tsheadroom_pool_slots_busy` | gauge | Worker-pool size and how many slots are compressing right now. When `busy` sits at `total`, the pool is saturated and new requests queue (see [ML model loading and timeouts](#ml-model-loading-and-timeouts)). | +| `tsheadroom_requests_by_outcome{outcome}` | counter | Requests by outcome: `modify`, `noop`, `error`, `passthrough`, `read_error`. | +| `tsheadroom_cold_starts_total` | counter | Worker first-real-call events that paid the one-time ML model load. | +| `tsheadroom_tokens_after_total` | counter | Output tokens after compression (`tokens_after`), so you can compute the realized ratio. | + +> **Why a subset?** tsheadroom is a `pre_request` hook, not a proxy: it runs `headroom.compress()` and never sees the upstream response. So Headroom proxy metrics that depend on the response or on prompt caching — completion tokens, TTFB, end-to-end latency, cache reads/writes and busts, per-transform timing, waste signals — have no source here and are deliberately omitted rather than reported as misleading zeros. + +> **Access**: like `/config`, `/metrics` is gated only by your tailnet policy file. Anyone who can reach the device can read it; it exposes aggregate counts and model/provider names, not request contents. + ### ML model loading and timeouts The ML text compressor loads a ~600 MB model on first use (one-time, then cached on disk and resident in each worker). This load takes several seconds; a large, late-session conversation can itself take a few seconds to compress. There is one worker timeout, and the client-facing wait is owned by Aperture: