feat(cluster): provider request routing + rank-1 coordinator opt-out + failure modes #198
anupsv wants to merge 10 commits into
…+ failure modes
- ProviderLoop.handleInferenceRequest checks ClusterDiscovery for an active TP or PP engine (rank 0 only) and dispatches through it; falls back to the local BatchScheduler when no cluster session is active or on rank 1.
- Cluster engine token streams are decoded via NaiveStreamingDetokenizer, boxed into the same SSE/encrypted-chunk format as the local path, and bounded by a 120 s per-request timeout (returns HTTP 503 on breach).
- Premature stream end (0 tokens from a non-empty prompt) is treated as a link failure and returned as HTTP 503 so the coordinator can retry.
- ClusterRole field added to HeartbeatMessage (Go) and ProviderMessage.Heartbeat (Swift) with json:"cluster_role,omitempty" backward compat.
- ClusterDiscovery.clusterRole exposed as a public accessor; set at rank election time and cleared by sessionDegraded() on link loss.
- ProviderState.clusterRole written by a 5 s background sync task in ProviderLoop; read by CoordinatorClient.buildHeartbeatJSON().
- Registry.FindProviderWithTrust skips providers with ClusterRole==1; IsClusterRank1() helper added for other routing eligibility checks.
- Registry.Heartbeat propagates ClusterRole from each heartbeat message.
- BatchScheduler.currentTokenizer() exposes the loaded tokenizer for the cluster dispatch path without breaking existing interfaces.
- StartCommand wires ClusterDiscovery into ProviderLoop after construction.
- New tests: coordinator/registry/registry_test.go (5 sub-tests covering rank-1 exclusion, nil eligibility, IsClusterRank1, heartbeat propagation); provider-swift/Tests/ProviderCoreTests/ClusterRequestRoutingTests.swift (8 tests covering heartbeat wire format, ProviderState.clusterRole, and codec round-trips).
- scripts/smoke-tp.sh: hardware-gated two-Mac smoke script; not runnable in CI.
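The two 503 failure modes listed above (timeout breach and a stream that ends with zero tokens) can be sketched as follows. This is a minimal illustration in Go, not the provider code, which is Swift; `collectTokens`, `statusFor`, and `errEmptyStream` are hypothetical names, and the fallback to 500 for other errors is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// errEmptyStream marks a stream that closed with zero tokens for a non-empty prompt.
var errEmptyStream = errors.New("cluster stream ended before any token")

// collectTokens drains the token channel until it closes or the timeout fires.
func collectTokens(tokens <-chan int, promptLen int, timeout time.Duration) ([]int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	var out []int
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err() // watchdog fired
		case t, ok := <-tokens:
			if !ok {
				if len(out) == 0 && promptLen > 0 {
					return nil, errEmptyStream // treated as a link failure
				}
				return out, nil
			}
			out = append(out, t)
		}
	}
}

// statusFor maps the dispatch outcome to an HTTP status. Both cluster failure
// modes become 503 so the coordinator can retry on another provider.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, errEmptyStream):
		return http.StatusServiceUnavailable
	default:
		return http.StatusInternalServerError // assumption, not stated in the PR
	}
}

func main() {
	stalled := make(chan int) // a rank-0 engine that never yields a token
	_, err := collectTokens(stalled, 4, 50*time.Millisecond)
	fmt.Println(statusFor(err))
}
```

In the PR the bound is 120 s; the sketch uses a short timeout only so that it finishes quickly.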
Benchmark Results
- 1-provider-streaming: 1 provider, 1 user, 30 requests, concurrency=5, streaming=true. Assertion Report: FAIL
- 1-provider-non-streaming: 1 provider, 1 user, 20 requests, concurrency=5, streaming=false. Assertion Report: FAIL
- 7-provider-multi-model: 7 providers, 5 users, 50 requests, concurrency=10, streaming=true. Assertion Report: FAIL
- 3-provider-high-concurrency: 3 providers, 10 users, 60 requests, concurrency=20, streaming=true. Assertion Report: FAIL
- 1-provider-queue-saturation: 1 provider, 10 users, 40 requests, concurrency=15, streaming=true. Assertion Report: FAIL
- 3-provider-20-users: 3 providers, 20 users, 60 requests, concurrency=10, streaming=true. Assertion Report: FAIL
- 1-provider-scaling: 1 provider, 5 users, 30 requests, concurrency=10, streaming=true. Assertion Report: FAIL
- 3-provider-scaling: 3 providers, 5 users, 30 requests, concurrency=10, streaming=true. Assertion Report: FAIL
- 5-provider-scaling: 5 providers, 5 users, 30 requests, concurrency=10, streaming=true. Assertion Report: FAIL
- 3-provider-heavy-100conc-10kb: 3 providers, 20 users, 100 requests, concurrency=100, streaming=true. Assertion Report: FAIL
…, honor --parallelism pp
Five-round review of the 4-PR cluster stack (4a/4b/4c/4d) surfaced
four critical/high bugs (C1, C2, H1, H3). This commit fixes them.
C1: jaccl bootstrap silently degraded to singleton on failure
DistributedGroupBootstrap.initializeGroup() called
DistributedGroup.initialize(strict: false). On jaccl init failure,
mlx_distributed_init falls back to a singleton group (size=1) — the
bootstrap "succeeded" but the TP engine then ran with no actual work
distribution, wasting the peer Mac's compute.
Switch to strict: true (returns nil ctx on failure → wrapper returns
nil → bootstrap throws). Add belt-and-braces size > 1 check in case
a future MLX release changes strict semantics.
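The strict-init plus size check described in C1 amounts to rejecting two distinct outcomes: a failed init and a "successful" singleton. A minimal Go sketch of that guard, with a hypothetical `group` type standing in for the distributed group (the real code is Swift and wraps MLX):

```go
package main

import (
	"errors"
	"fmt"
)

// group is a hypothetical stand-in for an initialized distributed group.
type group struct{ size int }

var (
	errInitFailed = errors.New("distributed init failed")
	errSingleton  = errors.New("distributed group has size 1; refusing to run TP alone")
)

// verifyGroup rejects a failed init (nil) and, as a second line of defense,
// a group that initialized but contains only this process.
func verifyGroup(g *group) error {
	if g == nil {
		return errInitFailed
	}
	if g.size <= 1 {
		return errSingleton
	}
	return nil
}

func main() {
	fmt.Println(verifyGroup(nil))
	fmt.Println(verifyGroup(&group{size: 1}))
	fmt.Println(verifyGroup(&group{size: 2}))
}
```

The second check is redundant while strict init behaves as described, which is the point of keeping it.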
C2: PP hiddenDim set to layer count, not actual hidden dimension
ClusterDiscovery's tryBuildRank0PPEngine / tryBuildRank1PPServer
passed `hiddenDim: model.vocabularySize > 0 ? model.kvHeads.count : 0`
— meaningless conditional that produced the layer count (e.g., 32).
TensorCrypto.openActivation uses config.hiddenDim to reshape the
sealed activation tensor back to [B, seqLen, hiddenDim] on rank 1.
Wrong dimension → malformed tensor → PP decode bricked.
Extend ClusterModelLoader to expose hiddenSize / numLayers /
vocabSize from config.json via a small mirror struct (the MLXLLM
LlamaConfiguration fields are module-internal). Return PPLoadResult
/ TPLoadResult bundles with the real dimensions. Wire the correct
hiddenSize through to EncryptedPipelineConfig.
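To see why a wrong hiddenDim breaks the reshape in C2, consider the byte arithmetic: a bf16 activation of shape [B, seqLen, hiddenDim] occupies exactly B × seqLen × hiddenDim × 2 bytes. The Go sketch below checks that relation; `activationShape` is a hypothetical helper, and the hidden size 4096 is only an illustrative value.

```go
package main

import (
	"errors"
	"fmt"
)

const bytesPerBF16 = 2

var errShape = errors.New("activation byte count does not match [B, seqLen, hiddenDim]")

// activationShape returns the target shape only if the payload holds exactly
// batch*seqLen*hiddenDim bf16 values.
func activationShape(payloadLen, batch, seqLen, hiddenDim int) ([3]int, error) {
	if batch <= 0 || seqLen <= 0 || hiddenDim <= 0 {
		return [3]int{}, errShape
	}
	if payloadLen != batch*seqLen*hiddenDim*bytesPerBF16 {
		return [3]int{}, errShape
	}
	return [3]int{batch, seqLen, hiddenDim}, nil
}

func main() {
	// An 8-token prefill at hidden size 4096 is 1*8*4096*2 = 65536 bytes.
	payload := 1 * 8 * 4096 * bytesPerBF16

	// Passing the layer count (32) instead of the hidden size cannot match.
	_, err := activationShape(payload, 1, 8, 32)
	fmt.Println(err)

	shape, _ := activationShape(payload, 1, 8, 4096)
	fmt.Println(shape)
}
```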
H1: ClusterModelLoader re-initialized DistributedGroup via no-arg init
load() called MLX.DistributedGroup() which also falls back to
singleton on failure — a second silent-degradation path independent
of the bootstrap.
Change load() to take the bootstrap-verified group as an explicit
parameter; assert size > 1 at the loader boundary too. The internal
MLX.DistributedGroup() call still happens (LlamaModelTP expects the
upstream class, not the ProviderCore wrapper) but now we verify its
size matches before constructing the model.
H3: --parallelism pp operator choice was not honored
ClusterDiscovery always tried jaccl bootstrap first, even when the
operator explicitly chose --parallelism pp. With C1 fixed, this
would now hard-fail at the strict bootstrap — making PP-only setups
unable to form a cluster.
Thread parallelismPreference through ClusterDiscovery.init from the
StartCommand. On rank 0: branch before bootstrap — .pp builds PP
engine directly, .single skips cluster entirely, .tp aborts on
bootstrap failure (refuses to silently downgrade to PP), .auto
retains the try-TP-then-fall-back-to-PP behavior. On rank 1: the
inferenceHandler now routes by frame type, lazily building either
TensorParallelServer (for TP frames) or EncryptedPipelineServer
(for PP frames) on first receive — rank 1 figures out the mode by
observing what rank 0 sends.
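The rank-0 branching described in H3 can be summarized as a small decision function. This is a Go sketch of the stated behavior only; `Preference`, `Plan`, and `planRank0` are hypothetical names, and the real logic lives in the Swift ClusterDiscovery.

```go
package main

import (
	"errors"
	"fmt"
)

type Preference int

const (
	Auto Preference = iota
	TP
	PP
	Single
)

type Plan int

const (
	PlanNone  Plan = iota // no cluster engine
	PlanTP                // tensor-parallel engine
	PlanPP                // pipeline-parallel engine
	PlanAbort             // refuse to form a cluster
)

var errBootstrap = errors.New("jaccl bootstrap failed")

// planRank0 decides what rank 0 builds. bootstrap is invoked only when the
// preference allows TP, so a PP-only setup never touches jaccl.
func planRank0(pref Preference, bootstrap func() error) Plan {
	switch pref {
	case PP:
		return PlanPP
	case Single:
		return PlanNone
	case TP:
		if bootstrap() != nil {
			return PlanAbort // never silently downgrade to PP
		}
		return PlanTP
	default: // Auto: try TP, fall back to PP
		if bootstrap() != nil {
			return PlanPP
		}
		return PlanTP
	}
}

func main() {
	failing := func() error { return errBootstrap }
	fmt.Println(planRank0(PP, failing) == PlanPP)
	fmt.Println(planRank0(TP, failing) == PlanAbort)
	fmt.Println(planRank0(Auto, failing) == PlanPP)
}
```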
Other findings, not fixed (documented):
M1: actor reentrancy in TP/PP generate — concurrent generate()
calls would interleave at await points and corrupt self.cache.
PR 4b explicitly scopes to one-at-a-time; needs a guard before
v2 enables concurrent requests.
M2: setenv not thread-safe — acknowledged in code, low risk.
M3: hardcoded splitLayer = numLayers / 2 — works for symmetric
Macs but not flexible across heterogeneous hardware.
M4: rank-1 PP default-case logs and loops; could hang on a stream
of unknown frames.
M5: cluster model selection — engine preloaded with one model;
consumer requests for another model fall through to local.
M6: first cluster request always falls through to local path
because BatchScheduler.currentTokenizer() is only populated
after the first submit(). Subsequent requests use cluster.
Latency-wrong for the first call; functional.
M7: 60-second polling loop for session-ready in startAsRank0.
Functional but inelegant; callback-based wakeup would be
cleaner.
M8: PP server's seqLen == -1 legacy stop sentinel duplicates the
ppSessionEnd signal.
… retry, PP server resilience
Ten additional review rounds across the cluster stack surfaced five
more critical/high bugs (C6, C5, C4, H5, H6). This commit fixes them;
the remaining mediums are documented in the trailer.
C6 (round 9): TP rank 1 never evaluated the model output, so jaccl
allreduce nodes inside the sharded layers were never triggered. MLX is
lazy — `_ = model.callAsFunction(...)` builds the computation graph
but doesn't execute it. Rank 0 evals its result and triggers its half
of the per-layer allreduce, which then waits indefinitely for rank 1
to participate. Pre-fix behavior: TP appears to start, then deadlocks
on the first layer's allreduce. Tests didn't catch this because the
test setup uses world=1 (singleton) groups where allreduce is a no-op.
Fix: TensorParallelServer.handleFrame now calls `eval(prefillLogits)`
and `eval(decodeLogits)` after `model.callAsFunction(...)`, forcing
the graph (including the allreduce nodes) to execute before the next
frame is processed.
C5 (round 7): TensorCrypto.sealActivation hardcoded bfloat16 via
`mlx_array_data_bfloat16(activation.ctx)`. Llama-3.x checkpoints are
bf16, so this happens to work today. A future TP-supported model
(Qwen3, Gemma3) with fp16 activations would silently produce wrong
bytes — the C function either returns nil or reinterprets fp16 as
bf16 depending on the mlx-c version. Either way: garbage on rank 1.
Fix: validate `activation.dtype == .bfloat16` at seal time and throw
`TensorCryptoError.dtypeMismatch` if not. Documents the assumption
explicitly; future fix is to embed a dtype byte in the wire format and
extend `openActivation` to decode accordingly.
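The future fix mentioned in C5 (a dtype byte in the wire format) might look roughly like the Go sketch below. Nothing here is implemented in this PR: the tag values, `sealHeader`, and `openHeader` are hypothetical and only show how the receiver could learn the element type instead of assuming bf16.

```go
package main

import (
	"errors"
	"fmt"
)

// DType is a one-byte tag prepended to the raw activation bytes.
type DType byte

const (
	DTypeBF16 DType = 1 // hypothetical tag values
	DTypeFP16 DType = 2
)

var errDType = errors.New("unsupported activation dtype")

func known(dt DType) bool { return dt == DTypeBF16 || dt == DTypeFP16 }

// sealHeader prefixes the payload with its dtype tag.
func sealHeader(dt DType, raw []byte) ([]byte, error) {
	if !known(dt) {
		return nil, errDType
	}
	out := make([]byte, 0, 1+len(raw))
	out = append(out, byte(dt))
	return append(out, raw...), nil
}

// openHeader reads the tag back and rejects anything it does not recognize.
func openHeader(frame []byte) (DType, []byte, error) {
	if len(frame) < 1 || !known(DType(frame[0])) {
		return 0, nil, errDType
	}
	return DType(frame[0]), frame[1:], nil
}

func main() {
	frame, _ := sealHeader(DTypeFP16, []byte{0x00, 0x3c})
	dt, payload, err := openHeader(frame)
	fmt.Println(dt == DTypeFP16, len(payload), err)
}
```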
C4 (round 7): EncryptedPipelineEngine's `sendActivationAndReceiveToken`
retried up to 3× on send/receive failure. PP is stateful in a way
that breaks under retry: if the send succeeded but the receive timed
out, rank 1 has already advanced its KV cache and emitted the response
token. Re-sending the same activation would advance rank 1's cache a
second time, producing wrong tokens for the rest of the request.
Fix: drop the retry. Fail-fast is the correct semantic — the caller
bubbles the failure to the consumer as 503, and the coordinator
re-routes the request to a different provider with fresh KV state.
H5 (round 10): A single corrupted ppActivation frame (malformed JSON,
bad ciphertext, MLX shape error) propagated through `try` all the way
up to `peer.serve()`, killing the rank-1 PP serve loop entirely. Any
single bad frame would bring down the whole cluster's PP path until
ClusterDiscovery rebuilt the session.
Fix: wrap each fallible step in `do/catch` so a single corrupted frame
just logs and continues. The serve loop survives malformed payloads,
crypto failures, and transient MLX errors. Rank 0 will time out on its
own receive and retry / re-establish the session.
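The H5 hardening boils down to handling errors per frame inside the loop rather than letting them escape it. A minimal Go sketch of that shape follows; `handleFrame` is a hypothetical stand-in for the decode, decrypt, and model steps, and the real loop is Swift `do/catch`.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

var errBadFrame = errors.New("malformed frame")

// handleFrame stands in for decode + decrypt + model step; here an empty
// frame plays the role of a corrupted one.
func handleFrame(frame []byte) (int, error) {
	if len(frame) == 0 {
		return 0, errBadFrame
	}
	return int(frame[0]), nil
}

// serve processes every frame it receives. A failing frame is logged and
// skipped, so one bad payload does not end the loop.
func serve(frames <-chan []byte) (tokens []int, dropped int) {
	for f := range frames {
		tok, err := handleFrame(f)
		if err != nil {
			log.Printf("dropping frame: %v", err)
			dropped++
			continue
		}
		tokens = append(tokens, tok)
	}
	return tokens, dropped
}

func main() {
	frames := make(chan []byte, 3)
	frames <- []byte{11}
	frames <- nil // corrupted frame
	frames <- []byte{12}
	close(frames)
	tokens, dropped := serve(frames)
	fmt.Println(tokens, dropped)
}
```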
H6 (round 13): PP server reset its KV cache only on `ppSessionEnd` or
the legacy `seqLen=-1` sentinel. If rank 0 crashed / restarted /
dropped mid-decode without sending sessionEnd, the next request's
prefill would land on stale K/V state, producing garbage tokens.
Fix: track the active UID server-side. When `ppActivation` arrives
with a uid different from the active one, reset the cache before
running prefill. Combined with the H5 hardening, this makes the PP
server resilient to abrupt rank-0 restarts without requiring
ClusterDiscovery rebuild on each.
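The UID tracking described in H6 can be sketched as a small state machine: a request with an unseen uid starts from an empty cache, whether or not the previous request ended cleanly. The Go sketch below models the KV cache as a length counter; `ppServer` and `onActivation` are hypothetical names.

```go
package main

import "fmt"

// ppServer models only the state relevant to the reset rule.
type ppServer struct {
	activeUID string
	cacheLen  int // number of positions currently held in the KV cache
	resets    int
}

// onActivation resets the cache when the uid differs from the active one,
// then accounts for the positions carried by this frame.
func (s *ppServer) onActivation(uid string, seqLen int) {
	if uid != s.activeUID {
		s.cacheLen = 0
		s.resets++
		s.activeUID = uid
	}
	s.cacheLen += seqLen
}

func main() {
	s := &ppServer{}
	s.onActivation("req-a", 5) // prefill
	s.onActivation("req-a", 1) // decode step
	// rank 0 restarts here without sending sessionEnd
	s.onActivation("req-b", 4) // new prefill lands on a fresh cache
	fmt.Println(s.cacheLen, s.resets)
}
```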
Mediums noted (not fixed):
M9: ppActivation JSON-encodes the sealed activation bytes as base64,
inflating multi-MB prefill frames by roughly a third (4 output bytes
per 3 input bytes). Bandwidth-only, not correctness.
M10: No telemetry on the cluster path. Logs only. Ops can't query
"TP fallback rate to PP" or "cluster session uptime".
M11: No graceful shutdown for ClusterDiscovery on ProviderLoop
teardown. Resources held in long-lived service scenarios; fine
at process exit.
M12: Round 6 — rank 0 / rank 1 parallelism preference mismatch isn't
detected. Both Macs passing the same --parallelism flag is the
common case; explicit mismatch is operator misuse.
M13: Round 9 — TP server doesn't deduplicate or sequence frames from
rank 0. A buggy rank 0 could send duplicate stepTokens and
rank 1 would double-advance.
M14: Round 11 — no version field in the cluster frame format. Future
protocol changes need an in-band version negotiation.
Tests: dispatcher + request routing suites (22 tests) all green.
The TP / PP decode-loop tests run under singleton groups, so the
allreduce eval fix (C6) is verified by code review + the smoke test
on real hardware. The remaining cluster decode-loop tests crash
before completion due to a pre-existing mlx_distributed_init env-var
issue unrelated to these changes.
…ire (C7)
C7 (round 19): TP control frames (promptTokens, stepToken, sessionStop,
jacclBootstrap), ping/pong, and rank-1's pong response all traveled
PLAINTEXT over the Thunderbolt cable. Anyone with physical link access
could:
- Observe consumer prompt token IDs and trivially recover the prompt
text via the model's tokenizer
- Observe sampled token IDs (the response stream)
- Observe the jaccl coordinator port + session ID
The earlier comment in ClusterControlMessage.swift claimed "All frames
travel over the encrypted ThunderboltLink; the link-layer AES-256-GCM
wrapping in ClusterFrame.encode/decode covers every type" — but the
actual ClusterFrame.encode/decode functions did NO encryption (just
prepended a type byte and read it back). PP was the only path with any
encryption (via inner TensorCrypto.sealActivation), but the OUTER frame
was still plaintext.
Fix: add `ClusterLinkSeal.seal/open` helpers (AES-256-GCM using the
session key established at handshake) and wire them into every
post-handshake send/receive path:
- ClusterSession.sendInferenceFrame: seal frame before conn.send
- ClusterSession.receiveInferenceFrame: open after conn.receive
- ClusterSession.handleConnection (rank 1 dispatch loop): open every
received frame; seal every sent pong before conn.send
- ClusterSession.sendPing (rank 0 health ping): seal ping, open pong
- ClusterDiscovery.startAsRank0 jacclBootstrap send: route through
sendInferenceFrame instead of raw conn.send so it's sealed
- EncryptedPipelineServer.handleRequest: open received frames in the
internal receive loop; seal ppToken before send (matches the outer
sealing that sendInferenceFrame applies on rank 0)
PP's inner TensorCrypto.sealActivation/sealToken remains in place as
defense in depth — the inner cipher provides per-tensor integrity that
the outer link layer doesn't independently audit. Performance impact
is negligible (AES-GCM at multi-GB/s on Apple Silicon; one round per
frame).
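For readers unfamiliar with the pattern, an AES-256-GCM seal/open pair over a shared session key can be sketched with Go's standard library as below. This is not the Swift `ClusterLinkSeal`; in particular, the random 12-byte nonce prepended to the ciphertext is an assumption, since the PR does not describe the nonce scheme.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newAEAD builds an AES-256-GCM instance from a 32-byte session key.
func newAEAD(key []byte) (cipher.AEAD, error) {
	if len(key) != 32 {
		return nil, errors.New("AES-256 needs a 32-byte key")
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal returns nonce || ciphertext || tag for one frame.
func seal(key, frame []byte) ([]byte, error) {
	aead, err := newAEAD(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return aead.Seal(nonce, nonce, frame, nil), nil
}

// open splits off the nonce and authenticates and decrypts the rest.
func open(key, sealed []byte) ([]byte, error) {
	aead, err := newAEAD(key)
	if err != nil {
		return nil, err
	}
	n := aead.NonceSize()
	if len(sealed) < n {
		return nil, errors.New("sealed frame too short")
	}
	return aead.Open(nil, sealed[:n], sealed[n:], nil)
}

func main() {
	key := make([]byte, 32) // all-zero key, for demonstration only
	sealed, _ := seal(key, []byte("stepToken:42"))
	plain, err := open(key, sealed)
	fmt.Println(string(plain), err)

	sealed[len(sealed)-1] ^= 1 // flip one bit of the tag
	_, err = open(key, sealed)
	fmt.Println(err != nil)
}
```

Each sealed frame carries 28 bytes of overhead in this layout (12-byte nonce plus 16-byte tag).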
Mediums noted across rounds 16-22 (not fixed):
M15: actor reentrancy race on concurrent generate() calls misroutes
response tokens across requests (PP/TP both — session helpers
are actor-isolated per call but inter-call interleaving via
await isn't serialized)
M16: engine fetches sessionKey at start of step; a mid-step session
reconnect would use the old key on the new connection — packets
sealed with the old key fail authentication on the new peer
M17: cluster path only checks tokenizer.eosToken; Llama-3.x has both
<|end_of_text|> and <|eot_id|> — wouldn't stop on the alternate
…(C8, H8)
C8 (round 31): The entire 4-PR cluster stack was dead code in
production. ClusterDiscovery.setModelDirectory was defined but never
called from anywhere in production code paths. As a result,
`tryBuildRank0Engine` and `tryBuildRank1Server` always bailed early
at `guard let modelDir = modelDirectory else { ... }`, the cluster
engine accessors stayed nil forever, and every inference request fell
through to the local BatchScheduler. Tests passed because they
construct engines directly, bypassing ClusterDiscovery.
Fix: ProviderLoop.ensureModelLoaded now calls
`discovery.setModelDirectory(modelPath)` after the model loads
successfully. The cluster engine gets built on the next jaccl
bootstrap (or immediately if bootstrap already completed and was
waiting for the model directory).
H8 (round 31): Consumer requests a different model from the one the
cluster engine currently has loaded → silently produces garbage
tokens. The cluster engine doesn't track its loaded model id; the
dispatcher doesn't compare against `chatRequest.model`.
Fix: ClusterDiscovery.setModelDirectory now tears down existing
engines on a path change, so the next inference request rebuilds the
engine against the new model directory. The first request after a
model switch eats the engine-construction latency; subsequent
requests reuse the rebuilt engine.
Mediums noted across rounds 23-31 (not fixed):
M18: No upper cap on `maxTokens` for cluster path; consumer could
request 200k tokens and OOM the cluster.
M19: No prompt-length cap; pathological prompts can attempt to
allocate enormous activation tensors.
M20: AES-GCM frames don't include a sequence counter; an attacker
with MITM access to the cable could replay sealed frames out
of order (low practical risk on a direct Thunderbolt point-to-
point link).
M21: Hardcoded `jacclPort=29400` — no fallback if the port is
held by another process.
M22: No check that ownRank matches the rank that jaccl assigned to
this process after bootstrap. If jaccl flips them, the model
loads with the wrong shard.
…n already established
Subtle follow-up to C8: setting modelDirectory AFTER jaccl bootstrap
already completed didn't trigger engine construction. The session-
establishment path (startAsRank0 / rank-1 bootstrapHandler) calls
tryBuildRank{0,1}Engine right after jaccl init, but those bail when
modelDirectory is nil and never retry. With the C8 fix only,
ensureModelLoaded now sets modelDirectory but the engine still wasn't
being built because nothing observed the path change.
Fix: setModelDirectory now inspects _activeSession / _activePeer and
the _distributedGroup, and invokes the appropriate engine builder
(TP if jaccl initialized, PP otherwise). The retry is idempotent —
the builders bail if the engine is already constructed.
This is the actual completion of C8 — without it, the cluster
framework remained dead in the common ordering (cluster handshake
finishes before first consumer request triggers model load).
Round 37: sessionDegraded cleared engines + clusterRole but kept the
_activeSession / _activePeer handles non-nil for diagnostic purposes.
After my round-32 C8 fix that retries engine construction from
setModelDirectory when a session is 'active', this gap meant: a
sessionDegraded → setModelDirectory sequence would happily build an
engine pointing at a dead session. The next inference request 503s on
first send.
Fix: clear _activeSession / _activePeer in sessionDegraded too. The
cluster is genuinely inert after degradation until a fresh handshake
re-establishes everything.
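The ordering rules from C8, its follow-up, H8, and round 37 interact, so a compact model may help. The Go sketch below is a simplification under stated assumptions: `discovery` is a hypothetical struct, engines are represented by a string, and the real code is a Swift actor.

```go
package main

import "fmt"

// discovery models only the fields that decide whether an engine gets built.
type discovery struct {
	modelDir      string
	sessionActive bool
	groupReady    bool   // jaccl group initialized
	engine        string // "", "tp", or "pp"
	builds        int
}

// tryBuild is idempotent: it bails if an engine exists, the model directory
// is unknown, or there is no live session.
func (d *discovery) tryBuild() {
	if d.engine != "" || d.modelDir == "" || !d.sessionActive {
		return
	}
	if d.groupReady {
		d.engine = "tp"
	} else {
		d.engine = "pp"
	}
	d.builds++
}

// setModelDirectory tears down the engine on a path change (H8) and retries
// construction in case the session was already established (C8 follow-up).
func (d *discovery) setModelDirectory(path string) {
	if path != d.modelDir {
		d.engine = ""
	}
	d.modelDir = path
	d.tryBuild()
}

func (d *discovery) sessionEstablished(groupReady bool) {
	d.sessionActive, d.groupReady = true, groupReady
	d.tryBuild()
}

// sessionDegraded clears the session handles too (round 37), so a later
// setModelDirectory cannot build against a dead session.
func (d *discovery) sessionDegraded() {
	d.engine, d.sessionActive, d.groupReady = "", false, false
}

func main() {
	d := &discovery{}
	d.sessionEstablished(true)       // handshake finishes first
	d.setModelDirectory("/models/a") // model loads later; engine is built now
	fmt.Println(d.engine, d.builds)
	d.sessionDegraded()
	d.setModelDirectory("/models/b")
	fmt.Println(d.engine == "")
}
```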
…C10)
PR 4d added the cluster_role exclusion to FindProvider /
FindProviderWithTrust, but those are NOT the production routing path
for consumer inference. The actual path is:
api/consumer.go:164 → Registry.ReserveProviderEx
→ selectBestCandidateLockedFull → snapshotProviderLocked
snapshotProviderLocked had no ClusterRole check, so a rank-1 cluster
provider with high decode TPS could outscore the rank-0 partner and
win routing. The consumer request would then either fail downstream
(no clusterEngine on rank 1) or be served single-rank, defeating the
entire point of the cluster_role opt-out.
The pre-flight QuickCapacityCheck (api/consumer.go 429/503 gate) had
the same omission — without the gate it counts rank-1 as a candidate
and reports capacity that ReserveProviderEx then refuses, leaving
consumers with confusing failure modes.
Fix: add `if p.ClusterRole != nil && *p.ClusterRole == 1 { skip }`
to both snapshotProviderLocked (C9) and the QuickCapacityCheck
structural-gates block (C10). Same predicate as the existing
FindProviderWithTrust gate in registry.go:1521.
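The predicate quoted in the fix can be illustrated in isolation. In the Go sketch below, the `Provider` struct is reduced to three fields and selection is simplified to "highest decode TPS wins"; the real scoring in selectBestCandidateLockedFull is not shown in this PR text, and the `*int` type for ClusterRole is inferred from the predicate.

```go
package main

import "fmt"

// Provider is a reduced stand-in for the registry's provider record.
type Provider struct {
	ID          string
	ClusterRole *int // nil = not clustered, 0 = rank 0, 1 = rank 1
	DecodeTPS   float64
}

func intPtr(v int) *int { return &v }

// isClusterRank1 is the gate predicate: only an explicit rank 1 is excluded.
func isClusterRank1(p *Provider) bool {
	return p.ClusterRole != nil && *p.ClusterRole == 1
}

// bestCandidate skips rank-1 providers, then picks the highest decode TPS.
func bestCandidate(providers []*Provider) *Provider {
	var best *Provider
	for _, p := range providers {
		if isClusterRank1(p) {
			continue
		}
		if best == nil || p.DecodeTPS > best.DecodeTPS {
			best = p
		}
	}
	return best
}

func main() {
	rank0 := &Provider{ID: "rank0", ClusterRole: intPtr(0), DecodeTPS: 40}
	rank1 := &Provider{ID: "rank1", ClusterRole: intPtr(1), DecodeTPS: 90}
	fmt.Println(bestCandidate([]*Provider{rank1, rank0}).ID)
	fmt.Println(bestCandidate([]*Provider{rank1}) == nil)
}
```

Without the gate, the faster rank-1 provider would win in the first call, which is the failure mode described above.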
Tests: three regression tests in scheduler_test.go that fail without
the fix and pass with it — covering the rank-1-excluded case, the
rank-1-only-returns-nil case, and the QuickCapacityCheck path. Full
coordinator suite remains green (./...).
ModelCapacitySnapshot in registry.go claimed to apply "the same gates
as snapshotProviderLocked" but was missing the ClusterRole check —
the same omission fixed in round 41. The snapshot feeds:
- GET /v1/models/capacity (api/capacity.go) — polled by upstream
routers like OpenRouter before dispatching requests
- GET /v1/models (api/consumer.go handleListModels) — RoutableProviders,
WarmProviders, CanAccept fields advertised to consumers
A rank-1 cluster provider would inflate RoutableProviders /
WarmProviders / CanAccept, lying to upstream callers about how many
providers can actually serve a model. Upstream dispatches based on
these counts would land at the coordinator, only for ReserveProviderEx
to correctly refuse — surfacing as 503s with phantom capacity.
Fix mirrors the round-41 pattern: insert the ClusterRole gate right
after the Status check and before the trust check.
Round 42 mediums noted for later (not blocking):
- ListModels ProviderCount field (registry.go:1632) — same root
cause but observability-only; consumer-facing /v1/models lies
about provider count for cluster models
- ListRDMAEnabledPeersForCaller (registry.go:2198) — account-scoped
RDMA peer listing doesn't filter rank-1; could advertise an
already-paired Mac as available to a sibling rank-0 of the same
account
Test: TestModelCapacitySnapshotExcludesRank1 — fails without the
fix (RoutableProviders=2), passes with it (RoutableProviders=1).
Full coordinator suite green.
The wire protocol defines `cluster_role` ∈ {nil, 0, 1}: nil = not
clustered, 0 = rank-0 initiator (routable), 1 = rank-1 responder
(non-routable). Every routing gate added in PR 4d / rounds 41-42
matches the rank-1 case with `*p.ClusterRole == 1` — but the
coordinator stored `msg.ClusterRole` verbatim with no validation.
A provider sending `cluster_role=2` (or any non-{0,1}) would:
- Not match `*p.ClusterRole == 1` at any gate, so it would be
treated as routable
- Be selected by ReserveProviderEx, ModelCapacitySnapshot, and
FindProviderWithTrust — bypassing the entire round-41/42 fix
This is realistic both as a provider bug (off-by-one when computing
its own rank) and as a malicious provider that wants to evade the
rank-1 exclusion to draw traffic. The fix:
- Validate on ingest in Heartbeat. If `msg.ClusterRole` is non-nil
and its value is not in {0, 1}, log a warning and clamp it to 1.
- Fail-closed: malformed cluster state is treated as non-routable
(worst case: a buggy provider is excluded), not as routable
(worst case: traffic dispatched to a non-functional node).
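A minimal Go sketch of the clamp-on-ingest rule follows. The `json:"cluster_role,omitempty"` tag comes from the PR; the `*int` field type, the reduced `HeartbeatMessage`, and the `normalizeClusterRole` / `ingest` helpers are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// HeartbeatMessage is reduced to the one field under discussion.
type HeartbeatMessage struct {
	ClusterRole *int `json:"cluster_role,omitempty"`
}

// normalizeClusterRole keeps nil, 0, and 1 as they are and clamps any other
// value to 1, so malformed state fails closed (non-routable).
func normalizeClusterRole(role *int) *int {
	if role == nil {
		return nil
	}
	if *role != 0 && *role != 1 {
		log.Printf("invalid cluster_role %d, clamping to 1", *role)
		one := 1
		return &one
	}
	return role
}

// ingest decodes a heartbeat and returns the role the registry would store.
func ingest(raw []byte) (*int, error) {
	var msg HeartbeatMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	return normalizeClusterRole(msg.ClusterRole), nil
}

func main() {
	for _, raw := range []string{`{}`, `{"cluster_role":0}`, `{"cluster_role":2}`} {
		role, err := ingest([]byte(raw))
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		if role == nil {
			fmt.Println(raw, "-> nil")
		} else {
			fmt.Println(raw, "->", *role)
		}
	}
}
```

A heartbeat from a provider that predates the field decodes to nil and stays eligible, which matches the backward-compatibility behavior described for the field.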
Tests:
- TestHeartbeatClampsMalformedClusterRole: sends cluster_role=2,
verifies the stored value is 1, then verifies ReserveProviderEx
correctly excludes the provider end-to-end. Fails without the
fix (a stored ClusterRole=2 does not match the == 1 gate, so the
provider stays routable).
- TestHeartbeatPreservesValidClusterRoles: round-trips 0 and 1
unchanged so the clamp doesn't break the normal path.
Full coordinator suite green.
Summary
This is PR 4d (the final integration PR in the 4-PR cluster inference stack). PRs 4a–4c shipped the jaccl bootstrap, TP decode loop, and PP decode loop. This PR wires everything together so consumer inference requests actually run on the cluster.
- `ProviderLoop.handleInferenceRequest` now checks `ClusterDiscovery` for an active `TensorParallelEngine` (TP) or `EncryptedPipelineEngine` (PP). On rank 0 with an active session, requests go through the cluster engine; rank 1 and non-clustered providers fall through to the existing `BatchScheduler` path.
- `AsyncStream<Int>` token IDs are decoded via `NaiveStreamingDetokenizer`, then emitted as the same SSE/encrypted-chunk format the local path uses. No new public type needed: the adapter is private to `ProviderLoop`.
- Per-request timeout of 120 s (`withTaskGroup` + watchdog task) → HTTP 503. Premature stream end (0 tokens from non-empty prompt) → HTTP 503. Both cause the coordinator to retry on a different provider. No mid-generation single-rank fallback (KV cache state would be inconsistent).
- `cluster_role`: new optional field on `HeartbeatMessage` (Go) and `ProviderMessage.Heartbeat` (Swift). `ClusterDiscovery` sets its `clusterRole` at rank election and clears it via `sessionDegraded()` on link loss. A 5 s background sync task in `ProviderLoop` writes `ProviderState.clusterRole`; `CoordinatorClient.buildHeartbeatJSON()` reads it.
- `FindProviderWithTrust` skips providers with `ClusterRole == 1`. `IsClusterRank1()` helper added. `Heartbeat()` propagates the field from each heartbeat message. Pre-PR-4d providers (missing field) are treated as nil → eligible.
Test plan
- `cd coordinator && go build ./...`: clean
- `cd provider-swift && swift build`: clean (warnings pre-existing)
- `cd coordinator && go test ./registry/ -run TestClusterRank1 -v`: 5 sub-tests pass
- `cd coordinator && go test ./registry/`: full suite, no regressions
- `swift test --filter "ClusterRequestRouting"`: 8 tests pass
- `scripts/smoke-tp.sh`: hardware-gated; requires two TB5-connected Macs. Run after flashing both Macs with this build. See script header for instructions.
Deferred
- …`LlamaModelTPQ`): follow-up PR
- `ClusterDiscovery` unit tests in Swift (actor requires a real `AttestationSigner`): covered by smoke script instead