Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ received by your Aperture instance and then forwarded to it, tsheadroom hands
the request's `messages` array to Headroom's `compress()` function — which
crushes bulky, low-information content (large tool outputs, search results,
logs) while leaving prompts and recent turns intact — and returns a `modify`
action with the compressed body. If there's nothing worthwhile to compress,or
action with the compressed body. If there's nothing worthwhile to compress, or
anything goes wrong, it returns `allow` and the request passes through
unchanged. **It never blocks a request.**

Expand Down Expand Up @@ -95,14 +95,13 @@ wish to wrap tsheadroom in a `systemd` or similar service manager.
| `-python` | `python3` | Interpreter with `headroom-ai` installed (used to launch workers). |
| `-worker` | `worker.py` | Path to the worker script. |
| `-hostname` | `tsheadroom` | Node name on the tailnet. |
| `-pool-size` | `max(4, GOMAXPROCS)` | Number of persistent Python workers. |
| `-deadline` | `4s` | Per-request fail-open deadline (client-facing). Keep it under Aperture's hook `timeout`. |
| `-max-compress` | `60s` | Hard cap on a single worker call before it's recycled. Must exceed `-deadline`; covers one-time ML model loads (see "Tuning compression"). |
| `-pool-size` | `8` | Number of persistent Python workers. Each holds a resident copy of the ML model (~600MB) when text compression is active (~4.8GB at 8), so raise it deliberately. |
| `-max-compress` | `60s` | Hard cap on a single worker call before the worker is recycled; the sole worker-side timeout. Covers one-time ML model loads (see "Tuning compression"). |
| `-addr` | `:80` | Listen address on the tsnet node. |
| `-state-dir` | tsnet default | tsnet state directory. **Persist this** — it holds the node identity (`tailscaled.state`). See note below. |
| `-config` | `tsheadroom.config.json` | Path to the tunable compress-config file (see "Tuning compression"). Loaded at startup, rewritten on `PUT /config`. |
| `-local-addr` | (off) | Serve plain HTTP here instead of tsnet — for local testing only. |
| `-v` | off | Log a one-line per-request summary (`in/out` sizes, `modify`/`allow`) to stdout. |
| `-v` | off | Log a one-line per-request summary (`in/out` sizes, timing, `model_limit`, `modify`/`allow`) to stdout. |

`TS_AUTHKEY` (environment) provides the tailnet auth key on first start. You
may harmlessly omit it for future restarts, once the state-dir has been
Expand Down Expand Up @@ -153,7 +152,7 @@ node:
"headroom": {
"url": "http://tsheadroom.<your-tailnet>.ts.net/",
"fail_policy": "fail_open", // default; if tsheadroom is unreachable, send uncompressed
"timeout": "5s", // must be >= tsheadroom's -deadline (4s)
"timeout": "30s", // tsheadroom's only client-facing latency ceiling (see below)
},
},
```
Expand Down Expand Up @@ -186,9 +185,16 @@ Notes:
(the model name is read from inside the body).
- **`fail_open` is the right policy.** tsheadroom always answers `200` with
`allow`/`modify` and never blocks; `fail_open` ensures that if the node is
*unreachable*, requests still proceed (just uncompressed).
- Set the hook **`timeout` ≥ tsheadroom's `-deadline`** so tsheadroom's own
fail-open fires before Aperture times the call out.
*unreachable* — or a compression runs long — requests still proceed (just
uncompressed).
- **The hook `timeout` is tsheadroom's only client-facing latency ceiling.**
tsheadroom has no soft timeout of its own: it waits for the compression and
returns, bounded only by this `timeout` (Aperture gives up and forwards the
original) and by its internal `-max-compress` worker cap. Set it to how long
you're willing to make a request wait for compression — `30s` matches
Headroom's own compression budget and comfortably fits large, late-session
requests. Lowering it just trades compression on the slowest requests for
latency; there's no separate tsheadroom knob to keep in sync.
- Scope `models` to target specific providers, and use `preference` if you stack
it with other guardrail hooks.

Expand Down Expand Up @@ -241,27 +247,44 @@ change config). Lock the node down accordingly, or restrict who can reach it.
### ML model loading and timeouts

The ML text compressor loads a ~600MB model on first use (one-time, then cached
on disk and resident in each worker). This load takes several seconds — longer
than the `-deadline` — so tsheadroom handles it with two separate timeouts:

- **`-deadline` (4s)** bounds the *client* response: if a call is still running,
tsheadroom fails open (`allow`) so Aperture is never held up.
- **`-max-compress` (60s)** bounds the *worker*: the call keeps running in the
background past the deadline, so the model finishes loading and the worker
stays warm. Only a call exceeding the hard cap is treated as wedged and
recycled.

The result: enabling text compression at runtime via `PUT /config` costs **at
most one** uncompressed (`allow`) request while the model loads, then it works —
**no restart needed**. To avoid even that one, workers **preload** the model at
startup whenever the persisted config has a text knob enabled (so new/restarted
workers come up warm).
on disk and resident in each worker). This load takes several seconds, and a
large, late-session conversation can itself take a few seconds to compress. There
is **one** timeout that bounds the worker, and the client-facing wait is owned by
Aperture:

- **`-max-compress` (60s)** bounds the *worker*: a single call may run this long
before the worker is treated as wedged and recycled. tsheadroom imposes no
shorter deadline of its own — a slow call runs to completion (the model
finishes loading, the worker stays warm) and the result is used if Aperture is
still waiting.
- **Aperture's hook `timeout`** bounds the *client*: when it fires, Aperture
(with `fail_open`) forwards the original request uncompressed. This is the only
client-facing latency ceiling, and it belongs in the Aperture config — there is
no tsheadroom-side `-deadline` to keep in sync. Set it to your tolerance for
waiting on compression (`30s` recommended; it matches Headroom's own budget and
fits large requests).

The result: a cold worker pays **at most one** uncompressed (`allow`) request
while the model loads, then it works — **no restart needed**. To avoid even that,
workers **preload** the model at startup whenever the persisted config could
route content through the ML compressor. With `headroom-ai[ml]` installed this is
the **default**: `compress_system_messages` defaults on, and Headroom also uses
the ML Kompress model as its fallback for tool/mixed content — so the model is
needed for ordinary traffic even under an otherwise-default config, and workers
come up warm. (Each preloaded worker holds its own resident copy of the model,
which is why `-pool-size` defaults low — see the flag table.)

The model is downloaded from HuggingFace on first ever use (cached under
`~/.cache/huggingface` thereafter). A worker has ~60s to report ready, so a slow
*cold* download on a fresh host can exceed that and make workers retry; warm the
cache once (start with a text knob on, or run a single `compress` under the
worker's interpreter) before serving production traffic.
`~/.cache/huggingface` thereafter). transformers revalidates that cache against
the HuggingFace Hub on each cold load; tsheadroom skips that network round-trip
(and its anonymous rate-limiting) by running the workers **offline once the model
is cached** (`HF_HUB_OFFLINE`/`TRANSFORMERS_OFFLINE`, set automatically). To
instead stay online with higher rate limits — e.g. to let a fresh host download
the model — set **`HF_TOKEN`** in the environment; it is honored only when set,
and takes precedence over the automatic offline mode. A worker has ~60s to report
ready, so a slow *cold* download on a fresh host can exceed that and make workers
retry; warm the cache once (start a worker with the cache empty, or run a single
`compress` under the worker's interpreter) before serving production traffic.

## What actually gets compressed

Expand Down
8 changes: 7 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ func (s CompressSettings) validate() error {
// preload that model at startup (passed to them via TSHEADROOM_PRELOAD; see
// worker.py's _warmup). Keep this in sync with any new ML-driving knob added to
// CompressSettings.
//
// CompressSystemMessages defaults to true, so with headroom-ai[ml] installed the
// ML model is needed for ordinary traffic even under an otherwise-default config
// (and headroom also uses Kompress as its fallback for tool/mixed content). We
// therefore include it here, which makes preload on by default — workers come up
// warm instead of cold-loading the ~600MB model on their first live request.
func (s CompressSettings) textEnabled() bool {
return s.CompressUserMessages || s.TargetRatio != nil
return s.CompressUserMessages || s.CompressSystemMessages || s.TargetRatio != nil
}

// settingsStore holds the current settings behind an atomic pointer (read once
Expand Down
65 changes: 52 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ type hookCallData struct {

// guardrailResponse is aperture's GuardrailResponse. We only ever emit "allow"
// or "modify" — never "block" — and always with HTTP 200.
//
// Why never "block": Headroom's compress() is best-effort and has no "halt this
// request" signal. It never raises on an over-limit conversation; it just
// returns the most-compressed messages it can, even if they still exceed the
// model's context window. So nothing compress() returns ever means "refuse" —
// every compress error or shortfall collapses back to "allow" (forward the
// original, unchanged). If a conversation is genuinely too big even after
// compression, we forward it and the upstream provider returns its own native,
// model-tailored "prompt is too long" error, which request/response clients
// (Claude Code and friends) already recover from by auto-compacting and retrying.
//
// WHERE THIS WOULD BREAK — and why it can't today: the above assumes the client
// recovers from the provider's overflow error. A streaming/WebSocket client of
// the kind Headroom's own proxy deliberately fail-CLOSES for — one that decides
// when to compact from the upstream-reported token usage that our compression
// deflates, and that treats a mid-stream refuse (1009/413) as a fatal connection
// error — could instead lock up when we fail open on an oversized frame. We
// cannot hit that case: Aperture's hook protocol is request/response only (one
// discrete HTTP POST per call; no WebSocket, no incremental frame delivery), so
// no such client exists on this path. THERE ARE NONE TODAY. If Aperture ever
// grows WebSocket/streaming hook support, revisit this: we may then need a real
// "block" path that mimics the provider's overflow error shape so those clients
// compact instead of hanging.
type guardrailResponse struct {
Action string `json:"action"`
RequestBody any `json:"request_body,omitempty"`
Expand All @@ -43,8 +66,7 @@ type compressor interface {
type Handler struct {
comp compressor
settings *settingsStore // current compress knobs, read per request
deadline time.Duration
log *slog.Logger // operational logs + warnings (stderr)
log *slog.Logger // operational logs + warnings (stderr)

verbose bool // when set, emit a per-request summary to out
out io.Writer // destination for verbose summaries (stdout)
Expand All @@ -55,6 +77,11 @@ type summary struct {
inMessages int // number of messages received
inBytes int // serialized size of received messages
outBytes int // serialized size of returned messages

workerMs float64 // worker-reported compress() time (0 when no worker result)
cold bool // worker's first real request (paid the cold model load)
modelLimit int // context-window limit the worker compressed against (0 when no result)
reason string // why this action was chosen: modify / allow(noop|error|passthrough|read-error)
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -63,11 +90,12 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

start := time.Now()
resp, s := h.process(r)

if h.verbose {
fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d -> %s\n",
s.inMessages, s.inBytes, s.outBytes, resp.Action)
fmt.Fprintf(h.out, "request in_msgs=%d in_bytes=%d out_bytes=%d dur_ms=%d worker_ms=%.0f cold=%t model_limit=%d -> %s\n",
s.inMessages, s.inBytes, s.outBytes, time.Since(start).Milliseconds(), s.workerMs, s.cold, s.modelLimit, s.reason)
}
writeJSON(w, http.StatusOK, resp)
}
Expand All @@ -81,30 +109,30 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) {
body, err := io.ReadAll(io.LimitReader(r.Body, maxBody))
if err != nil {
h.log.Warn("read request body failed; allowing", "err", err)
return allow, summary{}
return allow, summary{reason: "allow(read-error)"}
}

var data hookCallData
if err := json.Unmarshal(body, &data); err != nil || len(data.RequestBody) == 0 {
return allow, summary{}
return allow, summary{reason: "allow(passthrough)"}
}

// request_body must be a JSON object so we can splice messages back and
// return the whole thing (aperture rejects non-object modified bodies).
var reqBody map[string]any
if err := json.Unmarshal(data.RequestBody, &reqBody); err != nil {
return allow, summary{}
return allow, summary{reason: "allow(passthrough)"}
}

// v1 handles only the `messages` shape (Anthropic/OpenAI). Anything else
// (e.g. Gemini's `contents`, embeddings) passes through untouched.
rawMessages, ok := reqBody["messages"]
if !ok {
return allow, summary{}
return allow, summary{reason: "allow(passthrough)"}
}
messages, ok := rawMessages.([]any)
if !ok {
return allow, summary{}
return allow, summary{reason: "allow(passthrough)"}
}
model, _ := reqBody["model"].(string) // absent/non-string -> worker default

Expand All @@ -117,20 +145,30 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) {
s.outBytes = s.inBytes
}

ctx, cancel := context.WithTimeout(r.Context(), h.deadline)
defer cancel()

res, err := h.comp.Compress(ctx, compressRequest{
// No tsheadroom-side timeout: the wait is bounded by the request context
// (aperture hangs up at its hook timeout) and by the pool's hard cap (a
// runaway worker is recycled). A soft deadline here would only fail open
// *before* aperture would have — abandoning exactly the slow, large-context
// compressions this tool exists to perform. The latency ceiling belongs to
// aperture's per-hook `timeout`, which is owned by the caller.
res, err := h.comp.Compress(r.Context(), compressRequest{
Messages: messages,
Model: model,
Config: h.settings.get(),
})
if err != nil {
h.log.Warn("compress failed; allowing", "err", err)
s.reason = "allow(error)"
return allow, s
}
// Worker timing/cold/limit are available for both noop and modify; record
// them so the -v line shows them regardless of the outcome.
s.workerMs = res.ElapsedMs
s.cold = res.ColdFirstCall
s.modelLimit = res.ModelLimit
if res.TokensSaved <= 0 {
// No-op: nothing meaningful to change, so don't rewrite the body.
s.reason = "allow(noop)"
return allow, s
}

Expand All @@ -141,6 +179,7 @@ func (h *Handler) process(r *http.Request) (guardrailResponse, summary) {
if h.verbose {
s.outBytes = jsonLen(res.Messages)
}
s.reason = "modify"
return guardrailResponse{Action: "modify", RequestBody: reqBody}, s
}

Expand Down
2 changes: 0 additions & 2 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)

// fakeCompressor lets each test dictate the pool's behavior.
Expand All @@ -29,7 +28,6 @@ func newTestHandler(fn func(ctx context.Context, req compressRequest) (*compress
return &Handler{
comp: fakeCompressor{fn: fn},
settings: loadSettings("", quietLog()),
deadline: time.Second,
log: quietLog(),
out: io.Discard,
}
Expand Down
13 changes: 2 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"

Expand All @@ -32,9 +31,8 @@ import (
func main() {
var (
hostname = flag.String("hostname", "tsheadroom", "tsnet hostname (how this node appears on the tailnet)")
poolSize = flag.Int("pool-size", max(4, runtime.GOMAXPROCS(0)), "number of Python compression workers")
deadline = flag.Duration("deadline", 4*time.Second, "per-request fail-open deadline (keep under aperture's hook timeout)")
maxCompress = flag.Duration("max-compress", 60*time.Second, "hard cap on a single worker call before it's recycled (must exceed -deadline; covers one-time model loads)")
poolSize = flag.Int("pool-size", 8, "number of Python compression workers")
maxCompress = flag.Duration("max-compress", 60*time.Second, "hard cap on a single worker call before it's recycled (covers one-time model loads); the sole worker-side timeout")
python = flag.String("python", "python3", "Python interpreter with headroom-ai installed")
script = flag.String("worker", "worker.py", "path to worker.py")
addr = flag.String("addr", ":80", "listen address on the tsnet node")
Expand All @@ -60,11 +58,6 @@ func main() {
}
settings := loadSettings(configPath, log)

if *maxCompress <= *deadline {
log.Warn("max-compress should exceed deadline; slow calls will be killed before the client's fail-open",
"max_compress", *maxCompress, "deadline", *deadline)
}

// Workers preload the ML model at startup when text compression is enabled;
// the decision lives here (single source of truth) and is re-evaluated at
// each spawn, so a worker respawned after a runtime change is up to date.
Expand All @@ -76,7 +69,6 @@ func main() {
handler := &Handler{
comp: pool,
settings: settings,
deadline: *deadline,
log: log,
verbose: *verbose,
out: os.Stdout,
Expand Down Expand Up @@ -105,7 +97,6 @@ func main() {
"hostname", *hostname,
"addr", listenAddr(*localAddr, *addr),
"pool_size", *poolSize,
"deadline", *deadline,
"max_compress", *maxCompress,
"python", *python,
"config", configPath,
Expand Down
Loading
Loading