From c97e4f58e1d972ec94b064d2261b0c68eaa2e98d Mon Sep 17 00:00:00 2001 From: Luke Kosewski Date: Tue, 9 Jun 2026 04:56:11 +0000 Subject: [PATCH 1/3] fix: preload ML model by default; tame HF Hub loads; add per-request diagnostics With headroom-ai[ml] installed, the ML text compressor (Kompress/ModernBERT) is reached under the default config: compress_system_messages defaults on, and headroom also uses Kompress as its fallback for tool/mixed content. But textEnabled() only checked compress_user_messages/target_ratio, so preload was off by default and every worker cold-loaded the ~600MB model on its first live request. Spread across the pool, that caused staggered per-slot loads and, on a large request hitting a still-cold worker, a deadline blow -> fail-open -> uncompressed passthrough. Fixes: - config.go: textEnabled() now includes CompressSystemMessages, so workers preload at startup under the default config and come up warm. - worker.py: configure HF env before importing headroom. Stay online when HF_TOKEN is set (honored only when set); otherwise go offline iff the models are already cached, removing the per-cold-load HF Hub revalidation round-trip (and its anonymous rate-limiting). Operator-set OFFLINE vars are respected. - main.go: -pool-size now defaults to 4 (was max(4, GOMAXPROCS)); drop the now-unused runtime import. Diagnostics: - handler.go: -v line distinguishes allow(noop|error|passthrough|read-error) from modify, and adds dur_ms, worker_ms, and cold. - pool.go: log slow worker calls (with cold_first_call) at Info, and log the previously-invisible "client deadline elapsed; worker continues warming" fail-open transition. - worker.py: return elapsed_ms + cold_first_call per response; emit a structured model-preload / warmup-failure line to stderr. README: pool-size default + memory caveat, preload-by-default behavior, and the HF offline-when-cached / HF_TOKEN opt-in. 
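The worker.py HF environment rule above can be restated as a pure function, which makes the three cases easy to check in isolation. This is a minimal sketch, not the code in this patch: `decide_hf_env` and its `models_cached` argument are hypothetical names, and the real logic lives in worker.py's `_configure_hf_env()`, which mutates `os.environ` directly.

```python
from __future__ import annotations


def decide_hf_env(env: dict[str, str], models_cached: bool) -> dict[str, str]:
    """Return a copy of env with the offline flags this rule would add.

    Hypothetical helper for illustration only; not part of the patch.
    """
    out = dict(env)
    # A non-empty HF_TOKEN means "stay online": change nothing.
    if out.get("HF_TOKEN"):
        return out
    # No token: go offline only when the models are already cached.
    # setdefault leaves any operator-set value untouched.
    if models_cached:
        out.setdefault("HF_HUB_OFFLINE", "1")
        out.setdefault("TRANSFORMERS_OFFLINE", "1")
    return out
```

A fresh host with no token and no cache gets no offline flags, so the first download can still happen.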
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Luke Kosewski --- README.md | 30 ++++++++++++++-------- config.go | 8 +++++- handler.go | 26 +++++++++++++------ main.go | 3 +-- pool.go | 23 ++++++++++++++++- worker.py | 74 +++++++++++++++++++++++++++++++++++++++++++++++++----- 6 files changed, 137 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 3ca8e1e..1e45629 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ wish to wrap tsheadroom in a `systemd` or similar service manager. | `-python` | `python3` | Interpreter with `headroom-ai` installed (used to launch workers). | | `-worker` | `worker.py` | Path to the worker script. | | `-hostname` | `tsheadroom` | Node name on the tailnet. | -| `-pool-size` | `max(4, GOMAXPROCS)` | Number of persistent Python workers. | +| `-pool-size` | `4` | Number of persistent Python workers. Each holds a resident copy of the ML model (~600MB) when text compression is active, so raise it deliberately. | | `-deadline` | `4s` | Per-request fail-open deadline (client-facing). Keep it under Aperture's hook `timeout`. | | `-max-compress` | `60s` | Hard cap on a single worker call before it's recycled. Must exceed `-deadline`; covers one-time ML model loads (see "Tuning compression"). | | `-addr` | `:80` | Listen address on the tsnet node. | @@ -251,17 +251,27 @@ than the `-deadline` — so tsheadroom handles it with two separate timeouts: stays warm. Only a call exceeding the hard cap is treated as wedged and recycled. -The result: enabling text compression at runtime via `PUT /config` costs **at -most one** uncompressed (`allow`) request while the model loads, then it works — -**no restart needed**. To avoid even that one, workers **preload** the model at -startup whenever the persisted config has a text knob enabled (so new/restarted -workers come up warm). +The result: a cold worker pays **at most one** uncompressed (`allow`) request +while the model loads, then it works — **no restart needed**. 
To avoid even that, +workers **preload** the model at startup whenever the persisted config could +route content through the ML compressor. With `headroom-ai[ml]` installed this is +the **default**: `compress_system_messages` defaults on, and Headroom also uses +the ML Kompress model as its fallback for tool/mixed content — so the model is +needed for ordinary traffic even under an otherwise-default config, and workers +come up warm. (Each preloaded worker holds its own resident copy of the model, +which is why `-pool-size` defaults low — see the flag table.) The model is downloaded from HuggingFace on first ever use (cached under -`~/.cache/huggingface` thereafter). A worker has ~60s to report ready, so a slow -*cold* download on a fresh host can exceed that and make workers retry; warm the -cache once (start with a text knob on, or run a single `compress` under the -worker's interpreter) before serving production traffic. +`~/.cache/huggingface` thereafter). transformers revalidates that cache against +the HuggingFace Hub on each cold load; tsheadroom skips that network round-trip +(and its anonymous rate-limiting) by running the workers **offline once the model +is cached** (`HF_HUB_OFFLINE`/`TRANSFORMERS_OFFLINE`, set automatically). To +instead stay online with higher rate limits — e.g. to let a fresh host download +the model — set **`HF_TOKEN`** in the environment; it is honored only when set, +and takes precedence over the automatic offline mode. A worker has ~60s to report +ready, so a slow *cold* download on a fresh host can exceed that and make workers +retry; warm the cache once (start a worker with the cache empty, or run a single +`compress` under the worker's interpreter) before serving production traffic. 
## What actually gets compressed diff --git a/config.go b/config.go index 6160181..6cb5073 100644 --- a/config.go +++ b/config.go @@ -70,8 +70,14 @@ func (s CompressSettings) validate() error { // preload that model at startup (passed to them via TSHEADROOM_PRELOAD; see // worker.py's _warmup). Keep this in sync with any new ML-driving knob added to // CompressSettings. +// +// CompressSystemMessages defaults to true, so with headroom-ai[ml] installed the +// ML model is needed for ordinary traffic even under an otherwise-default config +// (and headroom also uses Kompress as its fallback for tool/mixed content). We +// therefore include it here, which makes preload on by default — workers come up +// warm instead of cold-loading the ~600MB model on their first live request. func (s CompressSettings) textEnabled() bool { - return s.CompressUserMessages || s.TargetRatio != nil + return s.CompressUserMessages || s.CompressSystemMessages || s.TargetRatio != nil } // settingsStore holds the current settings behind an atomic pointer (read once diff --git a/handler.go b/handler.go index 51eb6fb..77309b0 100644 --- a/handler.go +++ b/handler.go @@ -55,6 +55,10 @@ type summary struct { inMessages int // number of messages received inBytes int // serialized size of received messages outBytes int // serialized size of returned messages + + workerMs float64 // worker-reported compress() time (0 when no worker result) + cold bool // worker's first real request (paid the cold model load) + reason string // why this action was chosen: modify / allow(noop|error|passthrough|read-error) } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -63,11 +67,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + start := time.Now() resp, s := h.process(r) if h.verbose { - fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d -> %s\n", - s.inMessages, s.inBytes, s.outBytes, resp.Action) + fmt.Fprintf(h.out, "request in_msgs=%d 
in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t -> %s\n", + s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.reason) } writeJSON(w, http.StatusOK, resp) } @@ -81,30 +86,30 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { body, err := io.ReadAll(io.LimitReader(r.Body, maxBody)) if err != nil { h.log.Warn("read request body failed; allowing", "err", err) - return allow, summary{} + return allow, summary{reason: "allow(read-error)"} } var data hookCallData if err := json.Unmarshal(body, &data); err != nil || len(data.RequestBody) == 0 { - return allow, summary{} + return allow, summary{reason: "allow(passthrough)"} } // request_body must be a JSON object so we can splice messages back and // return the whole thing (aperture rejects non-object modified bodies). var reqBody map[string]any if err := json.Unmarshal(data.RequestBody, &reqBody); err != nil { - return allow, summary{} + return allow, summary{reason: "allow(passthrough)"} } // v1 handles only the `messages` shape (Anthropic/OpenAI). Anything else // (e.g. Gemini's `contents`, embeddings) passes through untouched. rawMessages, ok := reqBody["messages"] if !ok { - return allow, summary{} + return allow, summary{reason: "allow(passthrough)"} } messages, ok := rawMessages.([]any) if !ok { - return allow, summary{} + return allow, summary{reason: "allow(passthrough)"} } model, _ := reqBody["model"].(string) // absent/non-string -> worker default @@ -127,10 +132,16 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { }) if err != nil { h.log.Warn("compress failed; allowing", "err", err) + s.reason = "allow(error)" return allow, s } + // Worker timing/cold are available for both noop and modify; record them so + // the -v line shows them regardless of the outcome. + s.workerMs = res.ElapsedMs + s.cold = res.ColdFirstCall if res.TokensSaved <= 0 { // No-op: nothing meaningful to change, so don't rewrite the body. 
+ s.reason = "allow(noop)" return allow, s } @@ -141,6 +152,7 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { if h.verbose { s.outBytes = jsonLen(res.Messages) } + s.reason = "modify" return guardrailResponse{Action: "modify", RequestBody: reqBody}, s } diff --git a/main.go b/main.go index fdee202..9bb1522 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ import ( "os" "os/signal" "path/filepath" - "runtime" "syscall" "time" @@ -32,7 +31,7 @@ import ( func main() { var ( hostname = flag.String("hostname", "tsheadroom", "tsnet hostname (how this node appears on the tailnet)") - poolSize = flag.Int("pool-size", max(4, runtime.GOMAXPROCS(0)), "number of Python compression workers") + poolSize = flag.Int("pool-size", 4, "number of Python compression workers") deadline = flag.Duration("deadline", 4*time.Second, "per-request fail-open deadline (keep under aperture's hook timeout)") maxCompress = flag.Duration("max-compress", 60*time.Second, "hard cap on a single worker call before it's recycled (must exceed -deadline; covers one-time model loads)") python = flag.String("python", "python3", "Python interpreter with headroom-ai installed") diff --git a/pool.go b/pool.go index fb73021..475d531 100644 --- a/pool.go +++ b/pool.go @@ -36,6 +36,10 @@ type compressResult struct { TokensSaved int `json:"tokens_saved"` CompressionRatio float64 `json:"compression_ratio"` TransformsApplied []string `json:"transforms_applied"` + + // Diagnostics added by worker.py (not part of headroom's CompressResult). + ElapsedMs float64 `json:"elapsed_ms"` // worker-side compress() wall time + ColdFirstCall bool `json:"cold_first_call"` // this worker's first real request } // requestEnvelope / responseEnvelope are the NDJSON framing on the wire. 
@@ -56,6 +60,10 @@ const ( shutdownGrace = 5 * time.Second initialBackoff = 200 * time.Millisecond maxBackoff = 10 * time.Second + // slowCallLog is the worker-call duration above which we log a one-line + // notice at Info (visible without -v). It's meant to surface the one-time + // cold ML model load and any genuinely slow compress. + slowCallLog = 2 * time.Second ) // workerReadyTimeout bounds how long startWorker waits for a worker's @@ -151,6 +159,12 @@ func (p *Pool) Compress(ctx context.Context, req compressRequest) (*compressResu case r := <-j.resp: return r.result, r.err case <-ctx.Done(): + // The client's fail-open deadline fired while the worker is still + // running. We return now (the handler fails open and the request passes + // uncompressed), but the worker keeps going under the hard cap and stays + // warm for the next call. Log it so this "passthrough while warming" + // case — most likely on a cold worker's first large request — is visible. + p.log.Warn("client deadline elapsed before worker finished; failing open (worker continues, warming)", "err", ctx.Err()) return nil, ctx.Err() } } @@ -189,13 +203,20 @@ func (p *Pool) runSlot(idx int) { // and leaves the worker warm. Only a real error or the hard cap // (genuinely wedged) recycles the worker. j.resp is buffered, so the // send never blocks even after the client has given up. + start := time.Now() hardCtx, cancel := context.WithTimeout(p.ctx, p.maxCompress) res, err := w.do(hardCtx, j.req) cancel() + dur := time.Since(start) if err != nil { - p.log.Warn("worker request failed; recycling", "slot", idx, "err", err) + p.log.Warn("worker request failed; recycling", "slot", idx, "dur", dur, "err", err) w.kill() w = p.spawn(idx) + } else if dur >= slowCallLog { + // Surface slow calls without requiring -v; cold_first_call + // pinpoints the one-time ML model load. 
+ p.log.Info("slow worker call", "slot", idx, "dur", dur, + "cold_first_call", res.ColdFirstCall, "worker_ms", res.ElapsedMs) } j.resp <- jobResult{result: res, err: err} } diff --git a/worker.py b/worker.py index efa9ec2..e0e1a15 100644 --- a/worker.py +++ b/worker.py @@ -38,15 +38,61 @@ import json import os import sys +import time from typing import Any -from headroom import compress +# Kompress (the ML text compressor) loads ModernBERT via transformers' +# from_pretrained(), which — without local_files_only — revalidates the cache +# against the HuggingFace Hub on every cold load. That network round-trip shows +# up as "unauthenticated requests to the HF Hub", adds latency to the first +# request a worker serves, and risks anonymous rate-limiting across a pool. We +# tame it *before* importing headroom (so transformers sees the env at import). +_KOMPRESS_REPOS = ("answerdotai/ModernBERT-base", "chopratejas/kompress-base") + + +def _models_cached() -> bool: + """True if the Kompress models are already in the local HF cache, so it's + safe to run transformers offline (no network needed to load them).""" + try: + from huggingface_hub import scan_cache_dir + + repos = {r.repo_id for r in scan_cache_dir().repos} + except Exception: # noqa: BLE001 - hub missing/unscannable -> assume not cached + return False + return set(_KOMPRESS_REPOS).issubset(repos) + + +def _configure_hf_env() -> None: + """Avoid a per-cold-load HF Hub round-trip on the model-load path. + + - If HF_TOKEN is set, stay online: the token raises rate limits and lets a + fresh host download the model. We change nothing else. + - Otherwise, if the models are already cached locally, force offline mode so + from_pretrained never touches the network. If they're not cached yet + (fresh host, no token), stay online so the first download can happen. + + Operator-set HF_HUB_OFFLINE / TRANSFORMERS_OFFLINE are respected (setdefault). 
+ """ + if os.environ.get("HF_TOKEN"): + return + if _models_cached(): + os.environ.setdefault("HF_HUB_OFFLINE", "1") + os.environ.setdefault("TRANSFORMERS_OFFLINE", "1") + + +_configure_hf_env() + +from headroom import compress # noqa: E402 - must follow _configure_hf_env() # The protocol stream. main() repoints this at a private dup of the original # stdout and redirects fd 1 to stderr, so library output (HF/transformers/torch # progress, stray prints) can never corrupt the NDJSON the Go parent reads. _proto = sys.stdout +# Flips to True after this worker serves its first real request, so we can flag +# the cold call (the one that may pay the lazy model load) to the Go side. +_served_a_request = False + def _result_to_dict(result: Any) -> dict[str, Any]: """Project a CompressResult onto the plain fields the Go side consumes.""" @@ -77,8 +123,20 @@ def _compress(payload: dict[str, Any]) -> dict[str, Any]: if model: kwargs["model"] = model + global _served_a_request + cold_first_call = not _served_a_request + _served_a_request = True + + t0 = time.monotonic() result = compress(messages, **kwargs) - return _result_to_dict(result) + elapsed_ms = (time.monotonic() - t0) * 1000.0 + + out = _result_to_dict(result) + # Diagnostics for the Go side (handler -v line / pool slow-call log). A cold + # first call that's also slow is the lazy ML-model-load case. + out["elapsed_ms"] = round(elapsed_ms, 1) + out["cold_first_call"] = cold_first_call + return out def _emit(obj: dict[str, Any]) -> None: @@ -99,16 +157,20 @@ def _warmup() -> None: Always builds the (lazy) pipeline. When preload is requested, also runs a sizable user-message compress to force the ML model (Kompress) to load now — - otherwise that multi-second load would land on the first real request. - Best-effort: failures here never block serving.""" + otherwise that multi-second load would land on the first real request. 
The + load time is logged (to stderr, which the Go parent captures) so cold-start + cost is visible. Best-effort: failures here never block serving.""" try: if _preload_requested(): + t0 = time.monotonic() big = "The system processes the request and returns a response. " * 80 compress([{"role": "user", "content": big}], compress_user_messages=True) + dt = time.monotonic() - t0 + print(f"tsheadroom: ML model preload complete in {dt:.1f}s", file=sys.stderr, flush=True) else: compress([{"role": "user", "content": "warmup"}]) - except Exception: - pass + except Exception as e: # noqa: BLE001 - warmup is best-effort + print(f"tsheadroom: warmup failed ({type(e).__name__}): {e}", file=sys.stderr, flush=True) def main() -> int: From 6e2098cd93bcc61f7c936f758e7f96a36599e427 Mon Sep 17 00:00:00 2001 From: Luke Kosewski Date: Tue, 9 Jun 2026 07:39:40 +0000 Subject: [PATCH 2/3] fix: size compression to the model's real context window headroom.compress() drives its aggressiveness off context_pressure = tokens_before / model_limit, but defaults model_limit to a flat 200000 regardless of the model. So we were over-compressing big-context models (e.g. a 1M-token Gemini) and mis-sizing others. Resolve the real limit per request and pass it to compress(): - a tsheadroom-side override table of precompiled, case-insensitive, unanchored regexes (e.g. claude-opus-?4.8 -> 1,000,000), consulted first because the bundled Headroom registry lists no current Claude 4.x model and would otherwise default them to 200K; - then ModelRegistry.get_context_limit(model, default=200000); - then 200000. The resolved limit is surfaced back through the worker response, compressResult.ModelLimit, summary.modelLimit, and the -v line for visibility. An explicit model_limit in the runtime config is never overridden. 
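To see why the flat default matters, the resolution order and the pressure ratio can be sketched together. This is an illustrative stand-in under stated assumptions: `resolve_limit`, `context_pressure`, and the `registry` callable are hypothetical names (the patch itself uses `_context_limit()` and Headroom's `ModelRegistry`), and how `compress()` acts on the ratio internally is not shown here.

```python
from __future__ import annotations

import re
from typing import Callable, Optional

DEFAULT_LIMIT = 200_000
# Same pattern as the override described above: case-insensitive, unanchored.
OVERRIDES = ((re.compile(r"claude-opus-?4.8", re.IGNORECASE), 1_000_000),)


def resolve_limit(model: str,
                  registry: Optional[Callable[[str, int], int]] = None) -> int:
    """Overrides first, then a registry lookup (if any), then the default."""
    for pattern, limit in OVERRIDES:
        if pattern.search(model):
            return limit
    if registry is not None:
        # `registry` stands in for a lookup that takes (model, default).
        return int(registry(model, DEFAULT_LIMIT))
    return DEFAULT_LIMIT


def context_pressure(tokens_before: int, model_limit: int) -> float:
    """The ratio named in the commit message: tokens_before / model_limit."""
    return tokens_before / model_limit
```

For a 300,000-token conversation, the ratio is 1.5 against the flat 200,000 default but 0.3 against a 1,000,000-token window, so the same input looks over the limit in one case and comfortably under it in the other.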
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Luke Kosewski --- handler.go | 16 +++++++------ pool.go | 1 + tests/test_worker.py | 37 ++++++++++++++++++++++++++++++ worker.py | 54 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 7 deletions(-) diff --git a/handler.go b/handler.go index 77309b0..0fa07bb 100644 --- a/handler.go +++ b/handler.go @@ -56,9 +56,10 @@ type summary struct { inBytes int // serialized size of received messages outBytes int // serialized size of returned messages - workerMs float64 // worker-reported compress() time (0 when no worker result) - cold bool // worker's first real request (paid the cold model load) - reason string // why this action was chosen: modify / allow(noop|error|passthrough|read-error) + workerMs float64 // worker-reported compress() time (0 when no worker result) + cold bool // worker's first real request (paid the cold model load) + modelLimit int // context-window limit the worker compressed against (0 when no result) + reason string // why this action was chosen: modify / allow(noop|error|passthrough|read-error) } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -71,8 +72,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { resp, s := h.process(r) if h.verbose { - fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t -> %s\n", - s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.reason) + fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t model_limit=%d -> %s\n", + s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.modelLimit, s.reason) } writeJSON(w, http.StatusOK, resp) } @@ -135,10 +136,11 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { s.reason = "allow(error)" return allow, s } - // Worker timing/cold are available for both 
noop and modify; record them so - // the -v line shows them regardless of the outcome. + // Worker timing/cold/limit are available for both noop and modify; record + // them so the -v line shows them regardless of the outcome. s.workerMs = res.ElapsedMs s.cold = res.ColdFirstCall + s.modelLimit = res.ModelLimit if res.TokensSaved <= 0 { // No-op: nothing meaningful to change, so don't rewrite the body. s.reason = "allow(noop)" diff --git a/pool.go b/pool.go index 475d531..e810e48 100644 --- a/pool.go +++ b/pool.go @@ -40,6 +40,7 @@ type compressResult struct { // Diagnostics added by worker.py (not part of headroom's CompressResult). ElapsedMs float64 `json:"elapsed_ms"` // worker-side compress() wall time ColdFirstCall bool `json:"cold_first_call"` // this worker's first real request + ModelLimit int `json:"model_limit"` // context-window limit compressed against } // requestEnvelope / responseEnvelope are the NDJSON framing on the wire. diff --git a/tests/test_worker.py b/tests/test_worker.py index 4b18e55..e3f7882 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -114,6 +114,43 @@ def test_non_dict_config_ignored(self): worker._compress({"messages": [{"role": "user"}], "config": "nope"}) self.assertEqual(self.last_kwargs(), {}) + def test_model_limit_override_regex_matches_variants(self): + # claude-opus-4-8 (1M context) is unknown to Headroom's registry, so the + # tsheadroom override supplies it. The override is a case-insensitive, + # unanchored regex (claude-opus-?4.8), so it covers dated/[1m]/punctuation + # variants, a provider-qualified prefix, and any case. + for model in ( + "claude-opus-4-8", + "claude-opus-4-8-20251101", + "claude-opus-4-8[1m]", + "claude-opus4-8", # optional dash after "opus" + "claude-opus-4.8", # "." 
matches the dot too + "anthropic/claude-opus-4-8", # unanchored: matches mid-string + "CLAUDE-OPUS-4-8", # case-insensitive + ): + worker._compress({"messages": [{"role": "user"}], "model": model}) + self.assertEqual(self.last_kwargs().get("model_limit"), 1_000_000, model) + + def test_model_limit_override_does_not_overmatch(self): + # A different Opus generation must not trip the opus-4-8 override. + worker._compress({"messages": [{"role": "user"}], "model": "claude-opus-4-1-20250805"}) + self.assertEqual(self.last_kwargs().get("model_limit"), worker._DEFAULT_MODEL_LIMIT) + + def test_model_limit_defaults_when_unresolved(self): + # Registry is stubbed out in these tests, so an unlisted model with no + # override falls back to the 200K default. + worker._compress({"messages": [{"role": "user"}], "model": "some-unknown-model"}) + self.assertEqual(self.last_kwargs().get("model_limit"), worker._DEFAULT_MODEL_LIMIT) + + def test_explicit_config_model_limit_wins(self): + # An operator-set model_limit in config is never overridden by the lookup. + worker._compress({ + "messages": [{"role": "user"}], + "model": "claude-opus-4-8", + "config": {"model_limit": 12345}, + }) + self.assertEqual(self.last_kwargs().get("model_limit"), 12345) + def test_non_list_messages_raises(self): with self.assertRaises(ValueError): worker._compress({"messages": "not a list"}) diff --git a/worker.py b/worker.py index e0e1a15..3fba16d 100644 --- a/worker.py +++ b/worker.py @@ -37,6 +37,7 @@ import json import os +import re import sys import time from typing import Any @@ -93,6 +94,50 @@ def _configure_hf_env() -> None: # the cold call (the one that may pay the lazy model load) to the Go side. _served_a_request = False +# Default context window when we can't resolve a model's real limit. Matches +# compress()'s own default; correct for most current Claude models. +_DEFAULT_MODEL_LIMIT = 200000 + +# tsheadroom-side context-window overrides, consulted BEFORE Headroom's registry. 
+# The bundled registry lists no current Claude 4.x model, so without these they +# fall to _DEFAULT_MODEL_LIMIT (200K) — which silently over-compresses a model +# whose real window is larger. Each entry is a precompiled case-insensitive regex +# matched with re.search (unanchored), so it matches anywhere in the model string +# and tolerates punctuation/prefix/suffix drift: r"claude-opus-?4.8" matches +# claude-opus-4-8, claude-opus4-8, claude-opus-4-8[1m], claude-opus-4.8, and a +# provider-qualified anthropic/claude-opus-4-8. Tighten a pattern if it would +# over-match a same-family model with a different window. +_MODEL_LIMIT_OVERRIDES: tuple[tuple[re.Pattern[str], int], ...] = ( + (re.compile(r"claude-opus-?4.8", re.IGNORECASE), 1_000_000), +) + +try: + from headroom.models.registry import ModelRegistry as _ModelRegistry +except Exception: # noqa: BLE001 - older headroom without the registry + _ModelRegistry = None + + +def _context_limit(model: str) -> int: + """Resolve a model's context-window limit. + + compress() drives its aggressiveness off context_pressure = + tokens_before / model_limit, but defaults model_limit to a flat 200000 + regardless of `model`. So passing the real limit matters: a big-context + model (e.g. a 1M-token Gemini, or claude-opus-4-8) would otherwise be + over-compressed, and a smaller one under-compressed. 
Resolution order: + tsheadroom overrides (regex search) -> Headroom's registry -> the 200K + default (used for models neither source knows, which is correct for the + current 200K Claude models the registry doesn't list yet).""" + for pat, limit in _MODEL_LIMIT_OVERRIDES: + if pat.search(model): + return limit + if _ModelRegistry is not None: + try: + return int(_ModelRegistry.get_context_limit(model, default=_DEFAULT_MODEL_LIMIT)) + except Exception: # noqa: BLE001 - registry shape changed; fall back + pass + return _DEFAULT_MODEL_LIMIT + def _result_to_dict(result: Any) -> dict[str, Any]: """Project a CompressResult onto the plain fields the Go side consumes.""" @@ -122,6 +167,14 @@ def _compress(payload: dict[str, Any]) -> dict[str, Any]: model = payload.get("model") if model: kwargs["model"] = model + # Resolve the real context window so compression aggressiveness is sized + # to the model, not compress()'s flat 200000 default. Don't override an + # explicit model_limit from config. + if "model_limit" not in kwargs: + kwargs["model_limit"] = _context_limit(model) + # 0 when neither config nor a model supplied one (compress() then uses its + # own default); surfaced on the -v line for visibility. + model_limit = int(kwargs.get("model_limit", 0)) global _served_a_request cold_first_call = not _served_a_request @@ -136,6 +189,7 @@ def _compress(payload: dict[str, Any]) -> dict[str, Any]: # first call that's also slow is the lazy ML-model-load case. 
out["elapsed_ms"] = round(elapsed_ms, 1) out["cold_first_call"] = cold_first_call + out["model_limit"] = model_limit return out From 528e1de7fa82b2356f47d2c2fb13c9ad8502b6ed Mon Sep 17 00:00:00 2001 From: Luke Kosewski Date: Tue, 9 Jun 2026 07:41:48 +0000 Subject: [PATCH 3/3] fix: remove the per-request soft deadline; rely on aperture's hook timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tsheadroom's 4s -deadline was a third timeout layered below two that already do the job: aperture's per-hook `timeout` (the client-facing latency ceiling, owned by the caller, which fail-opens on expiry) and the pool's -max-compress worker cap. Its only unique effect was to abandon compression *earlier* than aperture would have — exactly the slow, large-context requests this tool exists to compress. Remove it entirely: the handler passes the request context straight to the pool (cancelled when aperture's hook timeout fires or the client disconnects); a slow call runs to completion under -max-compress and leaves the worker warm. compress() has no "halt" signal, so every error still collapses to allow — tsheadroom never blocks. The guardrailResponse doc records why, and the WebSocket-client caveat that can't arise on aperture's request/response hook protocol today. Also: - raise the default worker pool 4 -> 8 (more concurrency headroom under the wider budget; ~4.8GB resident model RAM at 8); - README: recommend a 30s aperture hook timeout, drop the -deadline flag row and the two-timeouts framing, document the single worker cap; - refresh pool.go comments that still called the request context a "fail-open deadline". 
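The two remaining bounds can be summarized as a small decision table. This is a simplified sketch, assuming the defaults described above (a 30s hook timeout and a 60s `-max-compress`); `classify_call` and its return labels are hypothetical, and the behavior exactly at each boundary is an assumption rather than something this patch specifies.

```python
from __future__ import annotations


def classify_call(compress_s: float,
                  hook_timeout_s: float = 30.0,
                  max_compress_s: float = 60.0) -> tuple[str, str]:
    """Return (what the client gets, what happens to the worker).

    Hypothetical helper for illustration only; not part of the patch.
    """
    if compress_s > max_compress_s:
        # Hard cap exceeded: the call is treated as wedged.
        return ("uncompressed", "recycled")
    if compress_s > hook_timeout_s:
        # Aperture gave up first; the worker finishes and stays warm.
        return ("uncompressed", "warm")
    # Finished in time: the worker's result (modify or no-op) is used.
    return ("result-used", "warm")
```

With the defaults, a 3s call returns its result, a 45s call passes through uncompressed while leaving the worker warm, and a 75s call passes through uncompressed and recycles the worker.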
Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Luke Kosewski --- README.md | 49 +++++++++++++++++++++++++++++++------------------ handler.go | 37 +++++++++++++++++++++++++++++++------ handler_test.go | 2 -- main.go | 12 ++---------- pool.go | 37 ++++++++++++++++++++----------------- 5 files changed, 84 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index 1e45629..bf57c9f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ received by your Aperture instance and then forwarded to it, tsheadroom hands the request's `messages` array to Headroom's `compress()` function — which crushes bulky, low-information content (large tool outputs, search results, logs) while leaving prompts and recent turns intact — and returns a `modify` -action with the compressed body. If there's nothing worthwhile to compress,or +action with the compressed body. If there's nothing worthwhile to compress, or anything goes wrong, it returns `allow` and the request passes through unchanged. **It never blocks a request.** @@ -95,14 +95,13 @@ wish to wrap tsheadroom in a `systemd` or similar service manager. | `-python` | `python3` | Interpreter with `headroom-ai` installed (used to launch workers). | | `-worker` | `worker.py` | Path to the worker script. | | `-hostname` | `tsheadroom` | Node name on the tailnet. | -| `-pool-size` | `4` | Number of persistent Python workers. Each holds a resident copy of the ML model (~600MB) when text compression is active, so raise it deliberately. | -| `-deadline` | `4s` | Per-request fail-open deadline (client-facing). Keep it under Aperture's hook `timeout`. | -| `-max-compress` | `60s` | Hard cap on a single worker call before it's recycled. Must exceed `-deadline`; covers one-time ML model loads (see "Tuning compression"). | +| `-pool-size` | `8` | Number of persistent Python workers. Each holds a resident copy of the ML model (~600MB) when text compression is active (~4.8GB at 8), so raise it deliberately. 
| +| `-max-compress` | `60s` | Hard cap on a single worker call before the worker is recycled; the sole worker-side timeout. Covers one-time ML model loads (see "Tuning compression"). | | `-addr` | `:80` | Listen address on the tsnet node. | | `-state-dir` | tsnet default | tsnet state directory. **Persist this** — it holds the node identity (`tailscaled.state`). See note below. | | `-config` | `tsheadroom.config.json` | Path to the tunable compress-config file (see "Tuning compression"). Loaded at startup, rewritten on `PUT /config`. | | `-local-addr` | (off) | Serve plain HTTP here instead of tsnet — for local testing only. | -| `-v` | off | Log a one-line per-request summary (`in/out` sizes, `modify`/`allow`) to stdout. | +| `-v` | off | Log a one-line per-request summary (`in/out` sizes, timing, `model_limit`, `modify`/`allow`) to stdout. | `TS_AUTHKEY` (environment) provides the tailnet auth key on first start. You may harmlessly omit it for future restarts, once the state-dir has been @@ -153,7 +152,7 @@ node: "headroom": { "url": "http://tsheadroom..ts.net/", "fail_policy": "fail_open", // default; if tsheadroom is unreachable, send uncompressed - "timeout": "5s", // must be >= tsheadroom's -deadline (4s) + "timeout": "30s", // tsheadroom's only client-facing latency ceiling (see below) }, }, ``` @@ -186,9 +185,16 @@ Notes: (the model name is read from inside the body). - **`fail_open` is the right policy.** tsheadroom always answers `200` with `allow`/`modify` and never blocks; `fail_open` ensures that if the node is - *unreachable*, requests still proceed (just uncompressed). -- Set the hook **`timeout` ≥ tsheadroom's `-deadline`** so tsheadroom's own - fail-open fires before Aperture times the call out. + *unreachable* — or a compression runs long — requests still proceed (just + uncompressed). 
+- **The hook `timeout` is tsheadroom's only client-facing latency ceiling.** + tsheadroom has no soft timeout of its own: it waits for the compression and + returns, bounded only by this `timeout` (Aperture gives up and forwards the + original) and by its internal `-max-compress` worker cap. Set it to how long + you're willing to make a request wait for compression — `30s` matches + Headroom's own compression budget and comfortably fits large, late-session + requests. Lowering it just trades compression on the slowest requests for + latency; there's no separate tsheadroom knob to keep in sync. - Scope `models` to target specific providers, and use `preference` if you stack it with other guardrail hooks. @@ -241,15 +247,22 @@ change config). Lock the node down accordingly, or restrict who can reach it. ### ML model loading and timeouts The ML text compressor loads a ~600MB model on first use (one-time, then cached -on disk and resident in each worker). This load takes several seconds — longer -than the `-deadline` — so tsheadroom handles it with two separate timeouts: - -- **`-deadline` (4s)** bounds the *client* response: if a call is still running, - tsheadroom fails open (`allow`) so Aperture is never held up. -- **`-max-compress` (60s)** bounds the *worker*: the call keeps running in the - background past the deadline, so the model finishes loading and the worker - stays warm. Only a call exceeding the hard cap is treated as wedged and - recycled. +on disk and resident in each worker). This load takes several seconds, and a +large, late-session conversation can itself take a few seconds to compress. There +is **one** timeout that bounds the worker, and the client-facing wait is owned by +Aperture: + +- **`-max-compress` (60s)** bounds the *worker*: a single call may run this long + before the worker is treated as wedged and recycled. 
tsheadroom imposes no + shorter deadline of its own — a slow call runs to completion (the model + finishes loading, the worker stays warm) and the result is used if Aperture is + still waiting. +- **Aperture's hook `timeout`** bounds the *client*: when it fires, Aperture + (with `fail_open`) forwards the original request uncompressed. This is the only + client-facing latency ceiling, and it belongs in the Aperture config — there is + no tsheadroom-side `-deadline` to keep in sync. Set it to your tolerance for + waiting on compression (`30s` recommended; it matches Headroom's own budget and + fits large requests). The result: a cold worker pays **at most one** uncompressed (`allow`) request while the model loads, then it works — **no restart needed**. To avoid even that, diff --git a/handler.go b/handler.go index 0fa07bb..e248e2c 100644 --- a/handler.go +++ b/handler.go @@ -25,6 +25,29 @@ type hookCallData struct { // guardrailResponse is aperture's GuardrailResponse. We only ever emit "allow" // or "modify" — never "block" — and always with HTTP 200. +// +// Why never "block": Headroom's compress() is best-effort and has no "halt this +// request" signal. It never raises on an over-limit conversation; it just +// returns the most-compressed messages it can, even if they still exceed the +// model's context window. So nothing compress() returns ever means "refuse" — +// every compress error or shortfall collapses back to "allow" (forward the +// original, unchanged). If a conversation is genuinely too big even after +// compression, we forward it and the upstream provider returns its own native, +// model-tailored "prompt is too long" error, which request/response clients +// (Claude Code and friends) already recover from by auto-compacting and retrying. +// +// WHERE THIS WOULD BREAK — and why it can't today: the above assumes the client +// recovers from the provider's overflow error. 
A streaming/WebSocket client of +// the kind Headroom's own proxy deliberately fail-CLOSES for — one that decides +// when to compact from the upstream-reported token usage that our compression +// deflates, and that treats a mid-stream refuse (1009/413) as a fatal connection +// error — could instead lock up when we fail open on an oversized frame. We +// cannot hit that case: Aperture's hook protocol is request/response only (one +// discrete HTTP POST per call; no WebSocket, no incremental frame delivery), so +// no such client exists on this path. THERE ARE NONE TODAY. If Aperture ever +// grows WebSocket/streaming hook support, revisit this: we may then need a real +// "block" path that mimics the provider's overflow error shape so those clients +// compact instead of hanging. type guardrailResponse struct { Action string `json:"action"` RequestBody any `json:"request_body,omitempty"` @@ -43,8 +66,7 @@ type compressor interface { type Handler struct { comp compressor settings *settingsStore // current compress knobs, read per request - deadline time.Duration - log *slog.Logger // operational logs + warnings (stderr) + log *slog.Logger // operational logs + warnings (stderr) verbose bool // when set, emit a per-request summary to out out io.Writer // destination for verbose summaries (stdout) @@ -123,10 +145,13 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) { s.outBytes = s.inBytes } - ctx, cancel := context.WithTimeout(r.Context(), h.deadline) - defer cancel() - - res, err := h.comp.Compress(ctx, compressRequest{ + // No tsheadroom-side timeout: the wait is bounded by the request context + // (aperture hangs up at its hook timeout) and by the pool's hard cap (a + // runaway worker is recycled). A soft deadline here would only fail open + // *before* aperture would have — abandoning exactly the slow, large-context + // compressions this tool exists to perform. 
The latency ceiling belongs to + // aperture's per-hook `timeout`, which is owned by the caller. + res, err := h.comp.Compress(r.Context(), compressRequest{ Messages: messages, Model: model, Config: h.settings.get(), diff --git a/handler_test.go b/handler_test.go index 42695a0..908ceef 100644 --- a/handler_test.go +++ b/handler_test.go @@ -13,7 +13,6 @@ import ( "net/http/httptest" "strings" "testing" - "time" ) // fakeCompressor lets each test dictate the pool's behavior. @@ -29,7 +28,6 @@ func newTestHandler(fn func(ctx context.Context, req compressRequest) (*compress return &Handler{ comp: fakeCompressor{fn: fn}, settings: loadSettings("", quietLog()), - deadline: time.Second, log: quietLog(), out: io.Discard, } diff --git a/main.go b/main.go index 9bb1522..b95c6c2 100644 --- a/main.go +++ b/main.go @@ -31,9 +31,8 @@ import ( func main() { var ( hostname = flag.String("hostname", "tsheadroom", "tsnet hostname (how this node appears on the tailnet)") - poolSize = flag.Int("pool-size", 4, "number of Python compression workers") - deadline = flag.Duration("deadline", 4*time.Second, "per-request fail-open deadline (keep under aperture's hook timeout)") - maxCompress = flag.Duration("max-compress", 60*time.Second, "hard cap on a single worker call before it's recycled (must exceed -deadline; covers one-time model loads)") + poolSize = flag.Int("pool-size", 8, "number of Python compression workers") + maxCompress = flag.Duration("max-compress", 60*time.Second, "hard cap on a single worker call before it's recycled (covers one-time model loads); the sole worker-side timeout") python = flag.String("python", "python3", "Python interpreter with headroom-ai installed") script = flag.String("worker", "worker.py", "path to worker.py") addr = flag.String("addr", ":80", "listen address on the tsnet node") @@ -59,11 +58,6 @@ func main() { } settings := loadSettings(configPath, log) - if *maxCompress <= *deadline { - log.Warn("max-compress should exceed deadline; slow calls will 
be killed before the client's fail-open", - "max_compress", *maxCompress, "deadline", *deadline) - } - // Workers preload the ML model at startup when text compression is enabled; // the decision lives here (single source of truth) and is re-evaluated at // each spawn, so a worker respawned after a runtime change is up to date. @@ -75,7 +69,6 @@ func main() { handler := &Handler{ comp: pool, settings: settings, - deadline: *deadline, log: log, verbose: *verbose, out: os.Stdout, @@ -104,7 +97,6 @@ func main() { "hostname", *hostname, "addr", listenAddr(*localAddr, *addr), "pool_size", *poolSize, - "deadline", *deadline, "max_compress", *maxCompress, "python", *python, "config", configPath, diff --git a/pool.go b/pool.go index e810e48..e32bacd 100644 --- a/pool.go +++ b/pool.go @@ -73,8 +73,9 @@ const ( var workerReadyTimeout = 60 * time.Second // job is one unit of work handed to a slot goroutine. It carries no context: -// the client's fail-open deadline is enforced by Compress's own select, while -// the worker runs under the pool's hard cap (see runSlot). +// the caller's request context (cancelled when aperture's hook timeout fires or +// the client disconnects) is enforced by Compress's own select, while the worker +// runs under the pool's hard cap (see runSlot). type job struct { req compressRequest resp chan jobResult @@ -141,12 +142,13 @@ func newPool(size int, newCmd func() *exec.Cmd, maxCompress time.Duration, log * return p } -// Compress submits a job and waits for the result, bounded by ctx (the client's -// fail-open deadline). If ctx fires — while waiting for a free slot or while the -// worker is still running — the caller gets an error and fails open, but the -// worker keeps running under the pool's hard cap (see runSlot). That lets a slow -// first call (e.g. a one-time model load) finish and leave the worker warm, so -// the next call succeeds — no restart needed. 
+// Compress submits a job and waits for the result, bounded by ctx (the request +// context — cancelled when aperture's hook timeout fires or the client +// disconnects; tsheadroom sets no deadline of its own). If ctx fires — while +// waiting for a free slot or while the worker is still running — the caller gets +// an error and fails open, but the worker keeps running under the pool's hard cap +// (see runSlot). That lets a slow first call (e.g. a one-time model load) finish +// and leave the worker warm, so the next call succeeds — no restart needed. func (p *Pool) Compress(ctx context.Context, req compressRequest) (*compressResult, error) { j := job{req: req, resp: make(chan jobResult, 1)} select { @@ -160,12 +162,13 @@ func (p *Pool) Compress(ctx context.Context, req compressRequest) (*compressResu case r := <-j.resp: return r.result, r.err case <-ctx.Done(): - // The client's fail-open deadline fired while the worker is still - // running. We return now (the handler fails open and the request passes - // uncompressed), but the worker keeps going under the hard cap and stays - // warm for the next call. Log it so this "passthrough while warming" - // case — most likely on a cold worker's first large request — is visible. - p.log.Warn("client deadline elapsed before worker finished; failing open (worker continues, warming)", "err", ctx.Err()) + // The request context was cancelled (aperture's hook timeout fired, or + // the client went away) while the worker is still running. We return now + // (the handler fails open and the request passes uncompressed), but the + // worker keeps going under the hard cap and stays warm for the next call. + // Log it so this "passthrough while warming" case — most likely on a cold + // worker's first large request — is visible. 
+ p.log.Warn("request canceled before worker finished; failing open (worker continues, warming)", "err", ctx.Err()) return nil, ctx.Err() } } @@ -198,9 +201,9 @@ func (p *Pool) runSlot(idx int) { return // spawn only returns nil on pool shutdown } } - // Run under the pool's hard cap, NOT the client's deadline. The - // client already fails open on its own ctx in Compress; here we let - // the worker keep going so a slow call (e.g. a model load) finishes + // Run under the pool's hard cap, NOT the request context. Compress + // already fails open on that ctx; here we let the worker keep going + // so a slow call (e.g. a model load) finishes // and leaves the worker warm. Only a real error or the hard cap // (genuinely wedged) recycles the worker. j.resp is buffered, so the // send never blocks even after the client has given up.