From e879cee92e4a782567610a4dbbf7b4e1bfdc5808 Mon Sep 17 00:00:00 2001 From: Nandana Dileep <110280757+nandanadileep@users.noreply.github.com> Date: Sun, 28 Jun 2026 20:39:49 +0530 Subject: [PATCH] fix(watchdog): force-kill stuck-busy backends instead of deadlocking the loader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the watchdog's busy-killer decides a backend has been busy past the busy timeout, it shuts it down via ModelLoader.ShutdownModel -> deleteProcess, which grabs ml.mu and then waits for IsBusy() to clear BEFORE stopping the process. But a backend that exceeds the busy timeout is, by definition, stuck on an in-flight gRPC call, so the graceful wait never returns, ml.mu is held forever, and every other ml.Load blocks — including the shared opus backend load at the start of every realtime (WebRTC) session. New realtime connections then hang at "Connected, waiting for session..." whenever the watchdog is enabled, while logs repeatedly print the watchdog's busy / "active connection" line. Fix: add a force shutdown path (ShutdownModelForce / deleteProcess(s, force=true)) that stops the process FIRST — dropping the stuck call's gRPC connection and unblocking it — instead of waiting on it. Route the watchdog's busy-killer and busy LRU / group / memory evictions through the force path; keep the graceful wait for idle and user-initulated unloads. Graceful/unforced kills are unchanged. Regression test: the watchdog busy-killer uses ShutdownModelForce. Fixes #10391 --- pkg/model/loader.go | 18 ++++++-- pkg/model/process.go | 55 +++++++++++++++--------- pkg/model/watchdog.go | 87 +++++++++++++++++++++++++++++--------- pkg/model/watchdog_test.go | 51 ++++++++++++++++++++++ 4 files changed, 168 insertions(+), 43 deletions(-) diff --git a/pkg/model/loader.go b/pkg/model/loader.go index 5eb40cdb9837..4a72e27a5b11 100644 --- a/pkg/model/loader.go +++ b/pkg/model/loader.go @@ -369,7 +369,19 @@ func (ml *ModelLoader) ShutdownModel(modelName string) error { ml.mu.Lock() defer ml.mu.Unlock() - return ml.deleteProcess(modelName) + return ml.deleteProcess(modelName, false) +} + +// ShutdownModelForce stops a backend without waiting for an in-flight gRPC +// call to finish first. It is used by the watchdog's busy-killer, which only +// fires once a backend has been stuck on a call past the busy timeout — the +// graceful ShutdownModel would block forever on that stuck call (while +// holding ml.mu), preventing every other model load. See deleteProcess. +func (ml *ModelLoader) ShutdownModelForce(modelName string) error { + ml.mu.Lock() + defer ml.mu.Unlock() + + return ml.deleteProcess(modelName, true) } func (ml *ModelLoader) CheckIsLoaded(s string) *Model { @@ -411,7 +423,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model { // Timeouts may mean the node is busy, so keep the model cached. if isConnectionError(err) { xlog.Warn("Remote model unreachable (connection error), removing from cache", "model", s, "error", err) - if delErr := ml.deleteProcess(s); delErr != nil { + if delErr := ml.deleteProcess(s, false); delErr != nil { xlog.Error("error cleaning up remote model", "error", delErr, "model", s) } return nil @@ -422,7 +434,7 @@ func (ml *ModelLoader) checkIsLoaded(s string) *Model { if !process.IsAlive() { xlog.Debug("GRPC Process is not responding", "model", s) // stop and delete the process, this forces to re-load the model and re-create again the service - err := ml.deleteProcess(s) + err := ml.deleteProcess(s, false) if err != nil { xlog.Error("error stopping process", "error", err, "process", s) } diff --git a/pkg/model/process.go b/pkg/model/process.go index 95e3e0758464..a6fc9cef4542 100644 --- a/pkg/model/process.go +++ b/pkg/model/process.go @@ -22,40 +22,57 @@ var ( modelNotFoundErr = errors.New("model not found") ) -func (ml *ModelLoader) deleteProcess(s string) error { +// deleteProcess stops and removes a backend. The force flag trades a graceful +// shutdown for a prompt one and is meant for the watchdog's busy-killer: a +// backend that has been busy past the watchdog timeout is, by definition, +// stuck in an in-flight gRPC call. Waiting for that call to finish before +// stopping the process (the graceful path) would block forever — and since +// deleteProcess runs under ml.mu, it would stall every other model load, +// including the shared opus backend load at the start of every realtime +// (WebRTC) session, hanging new connections at "Connected, waiting for +// session...". The force path stops the process first, which drops the +// in-flight call's gRPC connection and unblocks it, then cleans up. +func (ml *ModelLoader) deleteProcess(s string, force bool) error { model, ok := ml.store.Get(s) if !ok { xlog.Debug("Model not found", "model", s) return modelNotFoundErr } - retries := 1 - for model.GRPC(false, ml.wd).IsBusy() { - xlog.Debug("Model busy. Waiting.", "model", s) - dur := time.Duration(retries*2) * time.Second - if dur > retryTimeout { - dur = retryTimeout - } - time.Sleep(dur) - retries++ + if !force { + retries := 1 + for model.GRPC(false, ml.wd).IsBusy() { + xlog.Debug("Model busy. Waiting.", "model", s) + dur := time.Duration(retries*2) * time.Second + if dur > retryTimeout { + dur = retryTimeout + } + time.Sleep(dur) + retries++ - if retries > 10 && forceBackendShutdown { - xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries) - break + if retries > 10 && forceBackendShutdown { + xlog.Warn("Model is still busy after retries. Forcing shutdown.", "model", s, "retries", retries) + break + } } } - xlog.Debug("Deleting process", "model", s) + xlog.Debug("Deleting process", "model", s, "force", force) // Run unload hooks (e.g. close MCP sessions) for _, hook := range ml.onUnloadHooks { hook(s) } - // Free GPU resources before stopping the process to ensure VRAM is released - xlog.Debug("Calling Free() to release GPU resources", "model", s) - if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil { - xlog.Warn("Error freeing GPU resources", "error", err, "model", s) + // Free GPU resources before stopping the process to ensure VRAM is + // released. Skipped on force-shutdown: a stuck-busy backend won't answer + // a Free RPC (it's hung on the same stuck call), and stopping the process + // releases its VRAM anyway. + if !force { + xlog.Debug("Calling Free() to release GPU resources", "model", s) + if err := model.GRPC(false, ml.wd).Free(context.Background()); err != nil { + xlog.Warn("Error freeing GPU resources", "error", err, "model", s) + } } process := model.Process() @@ -103,7 +120,7 @@ func (ml *ModelLoader) StopGRPC(filter GRPCProcessFilter) error { return true }) for _, k := range toDelete { - e := ml.deleteProcess(k) + e := ml.deleteProcess(k, false) err = errors.Join(err, e) } return err diff --git a/pkg/model/watchdog.go b/pkg/model/watchdog.go index d6dd18da8d89..a961333477d8 100644 --- a/pkg/model/watchdog.go +++ b/pkg/model/watchdog.go @@ -62,6 +62,11 @@ type WatchDog struct { type ProcessManager interface { ShutdownModel(modelName string) error + // ShutdownModelForce stops the backend without waiting for an in-flight + // gRPC call to finish. Used when the watchdog evicts a backend that is + // stuck busy: the graceful ShutdownModel would block on that stuck call + // (while holding the loader's mutex), stalling every other model load. + ShutdownModelForce(modelName string) error } // NewWatchDog creates a new WatchDog with the provided options. @@ -342,6 +347,17 @@ type modelUsageInfo struct { sizeBytes int64 // on-disk file size; 0 if unknown } +// evictionTarget is a model selected for eviction, retaining whether it was busy +// at selection time. A busy target must be shut down via ShutdownModelForce: +// the graceful path waits for its in-flight gRPC call to finish, but a busy +// eviction only happens when that call is stuck (busy-killer) or the operator +// opted into forceEvictionWhenBusy — either way, waiting would deadlock while +// holding the loader mutex. +type evictionTarget struct { + model string + wasBusy bool +} + // EnforceLRULimitResult contains the result of LRU enforcement type EnforceLRULimitResult struct { EvictedCount int // Number of models successfully evicted @@ -410,13 +426,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { needMore := len(modelsToShutdown) < modelsToEvict && skippedBusyCount > 0 wd.Unlock() - // Now shutdown models without holding the watchdog lock to prevent deadlock - for _, model := range modelsToShutdown { - if err := wd.pm.ShutdownModel(model); err != nil { - xlog.Error("[WatchDog] error shutting down model during LRU eviction", "error", err, "model", model) - } - xlog.Debug("[WatchDog] LRU eviction complete", "model", model) - } + // Now shutdown models without holding the watchdog lock to prevent + // deadlock. Busy targets go through the force path so the loader doesn't + // wait on their stuck in-flight call (which would re-deadlock here). + wd.shutdownEvicted(modelsToShutdown, "LRU eviction") if needMore { xlog.Warn("[WatchDog] LRU eviction incomplete", "evicted", len(modelsToShutdown), "needed", modelsToEvict, "skippedBusy", skippedBusyCount, "reason", "some models are busy with active API calls") @@ -431,9 +444,10 @@ func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { // collectEvictionsLocked walks `candidates` (already in eviction order) and // untracks up to `maxToEvict` models that are eligible for eviction. Pinned // models are always skipped; busy models are skipped unless `force` is true. -// Returns the names of evicted models and the number skipped because they -// were busy. Must be called with wd.Lock() held. -func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []string, skippedBusy int) { +// Returns the evicted models (with their busy state at selection time) and +// the number skipped because they were busy. Must be called with wd.Lock() +// held. +func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvict int, force bool) (evicted []evictionTarget, skippedBusy int) { for i := 0; len(evicted) < maxToEvict && i < len(candidates); i++ { m := candidates[i] if wd.pinnedModels[m.model] { @@ -447,12 +461,31 @@ func (wd *WatchDog) collectEvictionsLocked(candidates []modelUsageInfo, maxToEvi continue } xlog.Info("[WatchDog] evicting model", "model", m.model, "busy", isBusy) - evicted = append(evicted, m.model) + evicted = append(evicted, evictionTarget{model: m.model, wasBusy: isBusy}) wd.untrack(m.address) } return evicted, skippedBusy } +// shutdownEvicted shuts down each evicted model, using the force path for any +// that were busy at selection time so a stuck in-flight call doesn't deadlock +// the graceful ShutdownModel (which waits for that call while holding the +// loader's mutex). +func (wd *WatchDog) shutdownEvicted(targets []evictionTarget, label string) { + for _, t := range targets { + var err error + if t.wasBusy { + err = wd.pm.ShutdownModelForce(t.model) + } else { + err = wd.pm.ShutdownModel(t.model) + } + if err != nil { + xlog.Error("[WatchDog] error shutting down model during "+label, "error", err, "model", t.model, "busy", t.wasBusy) + } + xlog.Debug("[WatchDog] "+label+" complete", "model", t.model, "busy", t.wasBusy) + } +} + // EnforceGroupExclusivity evicts every loaded model that shares at least one // concurrency group with the requested model. The pinned/busy/retry semantics // match EnforceLRULimit so the loader's retry loop can stay generic. @@ -502,12 +535,7 @@ func (wd *WatchDog) EnforceGroupExclusivity(requestedModel string) EnforceLRULim needMore := len(modelsToShutdown) < len(conflicts) wd.Unlock() - for _, m := range modelsToShutdown { - if err := wd.pm.ShutdownModel(m); err != nil { - xlog.Error("[WatchDog] error shutting down model during group eviction", "error", err, "model", m) - } - xlog.Debug("[WatchDog] Group eviction complete", "model", m) - } + wd.shutdownEvicted(modelsToShutdown, "group eviction") if needMore { xlog.Warn("[WatchDog] Group eviction incomplete", "requested", requestedModel, "evicted", len(modelsToShutdown), "needed", len(conflicts), "skippedBusy", skippedBusyCount, "reason", "some conflicts are busy or pinned") @@ -623,12 +651,19 @@ func (wd *WatchDog) checkBusy() { } wd.Unlock() - // Now shutdown models without holding the watchdog lock to prevent deadlock + // The busy-killer targets backends whose in-flight gRPC call has been + // stuck past the busy timeout. Use the force path so the loader stops + // the process FIRST (dropping the stuck call's gRPC connection) instead + // of waiting for that call to finish — the graceful path would block on + // that stuck call while holding the loader mutex and stall every other + // model load (notably the opus backend load at the start of every + // realtime WebRTC session, which hangs new sessions at "Connected, + // waiting for session..."). for _, model := range modelsToShutdown { - if err := wd.pm.ShutdownModel(model); err != nil { + if err := wd.pm.ShutdownModelForce(model); err != nil { xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) } - xlog.Debug("[WatchDog] model shut down", "model", model) + xlog.Debug("[WatchDog] busy model shut down", "model", model) } } @@ -743,10 +778,20 @@ func (wd *WatchDog) evictLRUModel() { xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed) + // A busy target only gets here when forceEvictionWhenBusy is true, i.e. + // the operator accepted evicting models with active calls. Route it + // through the force path so the loader stops the process first instead + // of blocking on the stuck in-flight call while holding its mutex. + _, wasBusy := wd.busyTime[lruModel.address] + wd.Unlock() // Shutdown the model - if err := wd.pm.ShutdownModel(lruModel.model); err != nil && err != modelNotFoundErr { + shutdown := wd.pm.ShutdownModel + if wasBusy { + shutdown = wd.pm.ShutdownModelForce + } + if err := shutdown(lruModel.model); err != nil && err != modelNotFoundErr { xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model) } else { // Untrack the model diff --git a/pkg/model/watchdog_test.go b/pkg/model/watchdog_test.go index a8bd47bf0929..e158b84c005e 100644 --- a/pkg/model/watchdog_test.go +++ b/pkg/model/watchdog_test.go @@ -13,6 +13,8 @@ import ( type mockProcessManager struct { mu sync.Mutex shutdownCalls []string + forceCalls []string + gracefulCalls []string shutdownErrors map[string]error } @@ -27,6 +29,18 @@ func (m *mockProcessManager) ShutdownModel(modelName string) error { m.mu.Lock() defer m.mu.Unlock() m.shutdownCalls = append(m.shutdownCalls, modelName) + m.gracefulCalls = append(m.gracefulCalls, modelName) + if err, ok := m.shutdownErrors[modelName]; ok { + return err + } + return nil +} + +func (m *mockProcessManager) ShutdownModelForce(modelName string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.shutdownCalls = append(m.shutdownCalls, modelName) + m.forceCalls = append(m.forceCalls, modelName) if err, ok := m.shutdownErrors[modelName]; ok { return err } @@ -41,6 +55,14 @@ func (m *mockProcessManager) getShutdownCalls() []string { return result } +func (m *mockProcessManager) getForceShutdownCalls() []string { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]string, len(m.forceCalls)) + copy(result, m.forceCalls) + return result +} + var _ = Describe("WatchDog", func() { var ( wd *model.WatchDog @@ -666,6 +688,35 @@ var _ = Describe("WatchDog", func() { }) }) + Context("Busy Killer", func() { + It("force-shuts down a model that is stuck busy past the busy timeout", func() { + // Regression: a backend stuck on an in-flight gRPC call must be + // killed via the force path (stop the process first), not the + // graceful one (wait for the stuck call to finish, which would + // deadlock while holding the loader mutex and stall every other + // model load — e.g. the opus backend load at the start of every + // realtime WebRTC session, hanging new "Connected, waiting for + // session..." connections). + wd = model.NewWatchDog( + model.WithProcessManager(pm), + model.WithBusyTimeout(10*time.Millisecond), + model.WithBusyCheck(true), + model.WithWatchdogInterval(20*time.Millisecond), + ) + + wd.AddAddressModelMap("addr1", "stuckModel") + wd.Mark("addr1") // busy — simulates an in-flight gRPC call + + go wd.Run() + defer wd.Shutdown() + + Eventually(func() []string { + return pm.getForceShutdownCalls() + }, "300ms", "10ms").Should(ContainElement("stuckModel")) + Expect(pm.getShutdownCalls()).To(ContainElement("stuckModel")) + }) + }) + Context("Concurrency Groups", func() { Describe("ReplaceModelGroups / GetModelGroups", func() { It("returns nil for unknown models", func() {