Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) (

observeControlPlaneWorkers(0)
go pool.idleReaper()
go pool.disruptionGuardReconciler()

return pool, nil
}
Expand Down Expand Up @@ -725,11 +726,16 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p
Labels: podLabels,
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
ServiceAccountName: p.workerServiceAccountName(),
AutomountServiceAccountToken: boolPtr(false),
PriorityClassName: p.workerPriorityClassName,
NodeSelector: p.nodeSelectorForProfile(profile),
RestartPolicy: corev1.RestartPolicyNever,
// Give in-flight queries a real drain window on SIGTERM instead of
// the 30s k8s default. Pairs with karpenter.sh/do-not-disrupt
// (set on busy workers) so a node roll defers to the query; this is
// the fallback for the residual race and involuntary eviction.
TerminationGracePeriodSeconds: int64Ptr(workerTerminationGracePeriodSeconds),
ServiceAccountName: p.workerServiceAccountName(),
AutomountServiceAccountToken: boolPtr(false),
PriorityClassName: p.workerPriorityClassName,
NodeSelector: p.nodeSelectorForProfile(profile),
SecurityContext: &corev1.PodSecurityContext{
RunAsNonRoot: boolPtr(true),
RunAsUser: int64Ptr(1000),
Expand Down Expand Up @@ -1879,6 +1885,27 @@ func (p *K8sWorkerPool) adoptClaimedWorker(ctx context.Context, claimed *configs
return nil, fmt.Errorf("get claimed worker pod %s: %w", claimed.PodName, err)
}

// Don't adopt a worker whose pod is already Terminating (its node is being
// drained): activating it would hand a new session to a shutting-down pod.
// The local scheduler predicates (isGenericSessionSchedulableWorkerLocked /
// workerReadyForSchedulingLocked) only cover workers already in p.workers,
// not runtime-store claims, so gate the claim here. The error routes into the
// caller's claim-retire/fallback path (reserveClaimedWorker callers).
if pod.DeletionTimestamp != nil {
return nil, fmt.Errorf("claimed worker pod %s is terminating; not adopting", claimed.PodName)
}

// Relabel the adopted pod to this control plane so our label-scoped pod
// informer sees it; otherwise the do-not-disrupt reconcile, the planned-drain
// health guard, and informer-driven pod-terminated cleanup are all blind to
// adopted workers. Best-effort: the worker still functions if this fails.
if pod.Labels["duckgres/control-plane"] != p.cpID {
if relErr := p.relabelAdoptedPodToThisCP(ctx, claimed.PodName); relErr != nil {
slog.Warn("Failed to relabel adopted worker pod to this control plane; informer-based guards may not apply until relabel succeeds.",
"worker", claimed.WorkerID, "worker_pod", claimed.PodName, "error", relErr)
}
}

// For hot-idle workers, skip the epoch-validated health check. The worker's
// epoch and CP instance ID are from the previous owner, and ClaimHotIdleWorker
// already bumped the epoch in the DB. The health check requires exact epoch
Expand Down Expand Up @@ -2622,6 +2649,34 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat
slog.Warn("K8s worker health check failed.", "id", lease.workerID, "error", healthErr, "consecutive_failures", count)

if count >= maxConsecutiveHealthFailures {
// Planned node disruption (Karpenter drift/
// consolidation, kubelet drain) is not a worker
// crash: the pod is already Terminating. Marking a
// BUSY worker Lost here cancels its in-flight query
// after ~3 health intervals (~5s), even though the
// worker is draining and its node typically lives
// ~100s+ longer. For a busy worker, defer to the
// informer-driven w.done path above (fires when the
// pod is actually gone and runs the same cleanup) so
// the query drains, bounded by the pod's
// terminationGracePeriodSeconds. Cache-only read.
//
// Gate on activeSessions>0: an idle worker has no
// query to protect, and leaving a Terminating-but-
// not-yet-deleted worker in the pool would let
// AcquireWorker route a new session to a shutting-
// down pod (findIdle/leastLoaded only skip workers
// whose done channel is closed, i.e. already gone).
// So idle workers fall through and are marked Lost
// promptly, exactly as before this change.
p.mu.RLock()
active := w.activeSessions
p.mu.RUnlock()
if active > 0 && p.workerPodTerminating(p.workerPodName(w)) {
slog.Warn("Busy K8s worker failing health checks while pod is Terminating (planned node drain); deferring to graceful drain instead of canceling its query.",
"id", lease.workerID, "pod", p.workerPodName(w), "active_sessions", active, "consecutive_failures", count)
return
}
lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginHealthCheckCrash)
if err != nil {
slog.Error("K8s worker unresponsive but lease validation failed; leaving cleanup to retry.", "id", lease.workerID, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "consecutive_failures", count, "error", err)
Expand Down Expand Up @@ -3422,7 +3477,11 @@ func (p *K8sWorkerPool) dropLocalWorkerIfSameLeaseLocked(lease workerLeaseSnapsh
}

func (p *K8sWorkerPool) isGenericSessionSchedulableWorkerLocked(w *ManagedWorker) bool {
return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle
// Exclude a worker whose pod is already Terminating (planned node drain): the
// 2a health guard keeps such a busy worker in the pool to let its query
// drain, but it must not receive a NEW session on a shutting-down pod.
return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle &&
!p.workerPodTerminating(p.workerPodName(w))
}

func (p *K8sWorkerPool) isWarmIdleWorkerLocked(w *ManagedWorker) bool {
Expand Down
6 changes: 6 additions & 0 deletions controlplane/k8s_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3552,6 +3552,12 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) {
t.Fatal("expected automountServiceAccountToken=false for shared warm worker pods")
}

// Drain-aware eviction: a real grace window for in-flight queries, not the
// 30s k8s default.
if pod.Spec.TerminationGracePeriodSeconds == nil || *pod.Spec.TerminationGracePeriodSeconds != workerTerminationGracePeriodSeconds {
t.Fatalf("expected terminationGracePeriodSeconds=%d, got %v", workerTerminationGracePeriodSeconds, pod.Spec.TerminationGracePeriodSeconds)
}

if pod.Spec.SecurityContext == nil || pod.Spec.SecurityContext.RunAsNonRoot == nil || !*pod.Spec.SecurityContext.RunAsNonRoot {
t.Fatal("expected runAsNonRoot=true")
}
Expand Down
5 changes: 5 additions & 0 deletions controlplane/org_reserved_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ func (p *OrgReservedPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool
if !p.workerBelongsToOrgLocked(w) {
return false
}
// Don't hand a new session to a worker whose pod is already Terminating
// (planned node drain) — see isGenericSessionSchedulableWorkerLocked.
if p.shared.workerPodTerminating(p.shared.workerPodName(w)) {
return false
}
lifecycle := w.SharedState().NormalizedLifecycle()
return lifecycle == WorkerLifecycleHot
}
Expand Down
233 changes: 233 additions & 0 deletions controlplane/worker_disruption_guard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//go:build kubernetes

package controlplane

import (
"context"
"encoding/json"
"log/slog"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// Drain-aware worker eviction (Tier 1a + Tier 2a of the 2026-06-04 RCA).
//
// A Karpenter Drift roll tainted worker nodes out from under running queries:
// each in-flight query died a median 5s after its node was tainted, while the
// node still had ~108s of life left, and the control plane *itself* canceled
// the query (3 failed health checks → "worker unresponsive"). 31 queries were
// killed this way. The control plane drains correctly on its own roll (900s
// grace, unbounded drain); workers had none of that. These two mechanisms give
// a busy worker the same protection:
//
// 1a. While a worker is running a query it carries karpenter.sh/do-not-disrupt,
// so Karpenter's *voluntary* disruption (drift, consolidation, expiry)
// skips its node. The annotation is cleared when the worker goes idle.
// 2a. If a worker fails health checks while its pod is already Terminating
// (a planned node drain), the health loop does NOT mark it Lost / cancel
// its query — it defers to the informer-driven pod-terminated path, letting
// the worker drain (bounded by terminationGracePeriodSeconds).
//
// This protects against *voluntary* disruption only. Involuntary loss (spot
// reclaim, node/hardware failure) still kills the query; that residual tail is
// handled by transparent statement retry for commit-safe statements (separate
// change), not here.

const (
	// karpenterDoNotDisruptAnnotation / karpenterDoNotDisruptValue form the pod
	// annotation that asks Karpenter to leave the hosting node alone for
	// voluntary disruption (drift, consolidation, expiry). It does nothing for
	// involuntary disruption (spot reclaim, the NodePool terminationGracePeriod
	// ceiling), so the planned-drain health guard and a bounded grace period
	// are still required alongside it.
	karpenterDoNotDisruptAnnotation = "karpenter.sh/do-not-disrupt"
	karpenterDoNotDisruptValue      = "true"

	// disruptionGuardReconcileInterval is the period of the loop that syncs
	// the annotation with each worker's busy/idle state.
	//
	// Exposure: a query that starts AND has its node picked for voluntary
	// disruption inside a single interval is not yet annotated. The interval
	// matches the health-check cadence (2s) to keep that window small, and the
	// NodePool terminationGracePeriod ceiling means such a query still gets a
	// drain window rather than being killed outright.
	//
	// Stamping the annotation eagerly on the 0→1 session transition would
	// remove the window altogether, but it would touch the hot acquire path of
	// both pools, so it is deferred. Because patches are only sent when the
	// desired and actual state differ, a short interval adds no API traffic in
	// steady state.
	disruptionGuardReconcileInterval = 2 * time.Second

	// workerTerminationGracePeriodSeconds is the terminationGracePeriodSeconds
	// stamped on worker pods (ten minutes). It used to be unset, which meant
	// the 30s Kubernetes default — far too short for analytical queries.
	//
	// It is the fallback drain window, covering the reconcile-gap race and
	// involuntary eviction; long queries are primarily protected by the
	// do-not-disrupt annotation. The infra-owned NodePool terminationGracePeriod
	// stays the hard upper bound so that a security roll always completes.
	workerTerminationGracePeriodSeconds int64 = 10 * 60
)

// disruptionGuardReconciler is the background loop that runs
// reconcileDisruptionGuards once per disruptionGuardReconcileInterval so that
// every worker pod's karpenter.sh/do-not-disrupt annotation follows the
// worker's busy/idle state.
//
// It blocks until p.shutdownCh is signalled, so it is meant to be run on its
// own goroutine; newK8sWorkerPool launches it next to idleReaper. Each pass is
// given a background context; individual patch calls apply their own timeout.
func (p *K8sWorkerPool) disruptionGuardReconciler() {
	tick := time.NewTicker(disruptionGuardReconcileInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			p.reconcileDisruptionGuards(context.Background())
		case <-p.shutdownCh:
			return
		}
	}
}

// reconcileDisruptionGuards performs one reconcile pass over the pool: a worker
// with at least one active session should carry karpenter.sh/do-not-disrupt on
// its pod, and a worker with none should not.
//
// The pass has two phases:
//
//  1. Under p.mu (read lock), record for every worker that is not already
//     exiting (done channel still open) and has a pod name whether it is busy.
//  2. With the lock released, compare that desired state with the annotation
//     currently on the pod as seen by the informer cache, and patch only the
//     pods where the two disagree. In steady state this issues no API calls.
//
// The "current" side is deliberately the pod's real annotation rather than a
// flag remembered in this process. After a control-plane crash or failover the
// sessions of the dead CP are gone (the worker is idle) while the pod still
// carries the annotation that CP applied; the CP that adopts the worker never
// set it and would have nothing recorded locally. Reading the live annotation
// makes the orphan show up as desired=idle / current=set and get cleared,
// which a locally tracked "applied" flag would never detect.
//
// A NotFound from the patch means the pod is already gone and is ignored; any
// other failure is logged at Warn and left for the next pass to retry.
func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) {
	type desiredGuard struct {
		worker  *ManagedWorker
		podName string
		busy    bool
	}

	// Phase 1: snapshot desired state while holding the read lock.
	var snapshot []desiredGuard
	p.mu.RLock()
	for _, w := range p.workers {
		exiting := false
		select {
		case <-w.done:
			exiting = true // worker is on its way out; nothing to guard
		default:
		}
		if exiting {
			continue
		}
		name := p.workerPodName(w)
		if name == "" {
			continue // no pod to annotate
		}
		snapshot = append(snapshot, desiredGuard{
			worker:  w,
			podName: name,
			busy:    w.activeSessions > 0,
		})
	}
	p.mu.RUnlock()

	// Phase 2: talk to the cache / API server without the lock held.
	for _, g := range snapshot {
		if p.podHasDoNotDisrupt(g.podName) == g.busy {
			continue // already in the desired state
		}

		err := p.patchWorkerDoNotDisrupt(ctx, g.podName, g.busy)
		switch {
		case err == nil:
			slog.Debug("Reconciled worker do-not-disrupt annotation.",
				"worker", g.worker.ID, "pod", g.podName, "do_not_disrupt", g.busy)
		case apierrors.IsNotFound(err):
			// Pod disappeared between the snapshot and the patch.
		default:
			slog.Warn("Failed to reconcile worker do-not-disrupt annotation.",
				"worker", g.worker.ID, "pod", g.podName, "busy", g.busy, "error", err)
		}
	}
}

// podHasDoNotDisrupt answers, from the pod informer cache only (no API
// request), whether the named worker pod is annotated
// karpenter.sh/do-not-disrupt=true.
//
// It returns false whenever the answer cannot be determined: no informer, an
// empty pod name, a cache lookup error, a pod that is not (yet) in the cache,
// or a cached object that is not a *corev1.Pod.
func (p *K8sWorkerPool) podHasDoNotDisrupt(podName string) bool {
	if podName == "" || p.informer == nil {
		return false
	}

	// The indexer is keyed by "<namespace>/<name>".
	key := p.namespace + "/" + podName
	cached, found, err := p.informer.GetIndexer().GetByKey(key)
	if !found || err != nil {
		return false
	}

	if pod, isPod := cached.(*corev1.Pod); isPod {
		return pod.Annotations[karpenterDoNotDisruptAnnotation] == karpenterDoNotDisruptValue
	}
	return false
}

// patchWorkerDoNotDisrupt adds or removes the karpenter.sh/do-not-disrupt
// annotation on the named worker pod using a JSON merge patch.
//
// With enable=true the annotation is set to karpenterDoNotDisruptValue; with
// enable=false the key is sent as JSON null, which in a merge patch deletes
// it. The API call is bounded by a 10-second timeout derived from ctx. The
// marshal or patch error is returned unchanged so callers can classify it
// (for example with apierrors.IsNotFound).
func (p *K8sWorkerPool) patchWorkerDoNotDisrupt(ctx context.Context, podName string, enable bool) error {
	// Default to the "remove" form (null) and overwrite when enabling.
	annotations := map[string]interface{}{
		karpenterDoNotDisruptAnnotation: nil,
	}
	if enable {
		annotations[karpenterDoNotDisruptAnnotation] = karpenterDoNotDisruptValue
	}

	body, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": annotations,
		},
	})
	if err != nil {
		return err
	}

	patchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	_, err = p.clientset.CoreV1().Pods(p.namespace).Patch(patchCtx, podName, types.MergePatchType, body, metav1.PatchOptions{})
	return err
}

// relabelAdoptedPodToThisCP rewrites the duckgres/control-plane label on the
// named worker pod to p.cpID using a JSON merge patch, bounded by a 10-second
// timeout derived from ctx.
//
// It is used when this control plane adopts a worker that a different control
// plane spawned (failover or rollout). The pod informer only watches pods
// labelled duckgres/control-plane=<cpID> (see startInformer), so until the
// label is rewritten the adopted pod never appears in this CP's cache. That
// would leave podHasDoNotDisrupt, workerPodTerminating and the informer-driven
// pod-terminated cleanup unable to see it: the do-not-disrupt reconcile and
// the planned-drain health guard would silently skip the worker, and a stale
// do-not-disrupt annotation would never be removed.
func (p *K8sWorkerPool) relabelAdoptedPodToThisCP(ctx context.Context, podName string) error {
	labels := map[string]interface{}{
		"duckgres/control-plane": p.cpID,
	}
	body, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": labels,
		},
	})
	if err != nil {
		return err
	}

	patchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	_, err = p.clientset.CoreV1().Pods(p.namespace).Patch(patchCtx, podName, types.MergePatchType, body, metav1.PatchOptions{})
	return err
}

// workerPodTerminating tells whether the named worker pod has a
// deletionTimestamp, i.e. is already being torn down. The answer comes from
// the pod informer cache, so it costs no API call.
//
// Karpenter node drains and the kubelet set deletionTimestamp before the
// worker stops answering health checks, whereas a genuine crash leaves it nil.
// The health-check loop relies on that to tell a planned drain (let the query
// finish) apart from a crash (mark the worker Lost).
//
// It returns false whenever the pod cannot be inspected: no informer, an empty
// pod name, a cache lookup error, a pod missing from the cache, or a cached
// object that is not a *corev1.Pod.
func (p *K8sWorkerPool) workerPodTerminating(podName string) bool {
	if podName == "" || p.informer == nil {
		return false
	}

	// The indexer is keyed by "<namespace>/<name>".
	key := p.namespace + "/" + podName
	cached, found, err := p.informer.GetIndexer().GetByKey(key)
	if !found || err != nil {
		return false
	}

	if pod, isPod := cached.(*corev1.Pod); isPod {
		return pod.DeletionTimestamp != nil
	}
	return false
}
Loading
Loading