diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 7ecdeed1..78d89069 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -253,6 +253,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( observeControlPlaneWorkers(0) go pool.idleReaper() + go pool.disruptionGuardReconciler() return pool, nil } @@ -725,11 +726,16 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p Labels: podLabels, }, Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, - ServiceAccountName: p.workerServiceAccountName(), - AutomountServiceAccountToken: boolPtr(false), - PriorityClassName: p.workerPriorityClassName, - NodeSelector: p.nodeSelectorForProfile(profile), + RestartPolicy: corev1.RestartPolicyNever, + // Give in-flight queries a real drain window on SIGTERM instead of + // the 30s k8s default. Pairs with karpenter.sh/do-not-disrupt + // (set on busy workers) so a node roll defers to the query; this is + // the fallback for the residual race and involuntary eviction. + TerminationGracePeriodSeconds: int64Ptr(workerTerminationGracePeriodSeconds), + ServiceAccountName: p.workerServiceAccountName(), + AutomountServiceAccountToken: boolPtr(false), + PriorityClassName: p.workerPriorityClassName, + NodeSelector: p.nodeSelectorForProfile(profile), SecurityContext: &corev1.PodSecurityContext{ RunAsNonRoot: boolPtr(true), RunAsUser: int64Ptr(1000), @@ -1879,6 +1885,27 @@ func (p *K8sWorkerPool) adoptClaimedWorker(ctx context.Context, claimed *configs return nil, fmt.Errorf("get claimed worker pod %s: %w", claimed.PodName, err) } + // Don't adopt a worker whose pod is already Terminating (its node is being + // drained): activating it would hand a new session to a shutting-down pod. + // The local scheduler predicates (isGenericSessionSchedulableWorkerLocked / + // workerReadyForSchedulingLocked) only cover workers already in p.workers, + // not runtime-store claims, so gate the claim here. The error routes into the + // caller's claim-retire/fallback path (reserveClaimedWorker callers). + if pod.DeletionTimestamp != nil { + return nil, fmt.Errorf("claimed worker pod %s is terminating; not adopting", claimed.PodName) + } + + // Relabel the adopted pod to this control plane so our label-scoped pod + // informer sees it; otherwise the do-not-disrupt reconcile, the planned-drain + // health guard, and informer-driven pod-terminated cleanup are all blind to + // adopted workers. Best-effort: the worker still functions if this fails. + if pod.Labels["duckgres/control-plane"] != p.cpID { + if relErr := p.relabelAdoptedPodToThisCP(ctx, claimed.PodName); relErr != nil { + slog.Warn("Failed to relabel adopted worker pod to this control plane; informer-based guards may not apply until relabel succeeds.", + "worker", claimed.WorkerID, "worker_pod", claimed.PodName, "error", relErr) + } + } + // For hot-idle workers, skip the epoch-validated health check. The worker's // epoch and CP instance ID are from the previous owner, and ClaimHotIdleWorker // already bumped the epoch in the DB. The health check requires exact epoch @@ -2622,6 +2649,34 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat slog.Warn("K8s worker health check failed.", "id", lease.workerID, "error", healthErr, "consecutive_failures", count) if count >= maxConsecutiveHealthFailures { + // Planned node disruption (Karpenter drift/ + // consolidation, kubelet drain) is not a worker + // crash: the pod is already Terminating. Marking a + // BUSY worker Lost here cancels its in-flight query + // after ~3 health intervals (~5s), even though the + // worker is draining and its node typically lives + // ~100s+ longer. For a busy worker, defer to the + // informer-driven w.done path above (fires when the + // pod is actually gone and runs the same cleanup) so + // the query drains, bounded by the pod's + // terminationGracePeriodSeconds. Cache-only read. + // + // Gate on activeSessions>0: an idle worker has no + // query to protect, and leaving a Terminating-but- + // not-yet-deleted worker in the pool would let + // AcquireWorker route a new session to a shutting- + // down pod (findIdle/leastLoaded only skip workers + // whose done channel is closed, i.e. already gone). + // So idle workers fall through and are marked Lost + // promptly, exactly as before this change. + p.mu.RLock() + active := w.activeSessions + p.mu.RUnlock() + if active > 0 && p.workerPodTerminating(p.workerPodName(w)) { + slog.Warn("Busy K8s worker failing health checks while pod is Terminating (planned node drain); deferring to graceful drain instead of canceling its query.", + "id", lease.workerID, "pod", p.workerPodName(w), "active_sessions", active, "consecutive_failures", count) + return + } lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginHealthCheckCrash) if err != nil { slog.Error("K8s worker unresponsive but lease validation failed; leaving cleanup to retry.", "id", lease.workerID, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "consecutive_failures", count, "error", err) @@ -3422,7 +3477,11 @@ func (p *K8sWorkerPool) dropLocalWorkerIfSameLeaseLocked(lease workerLeaseSnapsh } func (p *K8sWorkerPool) isGenericSessionSchedulableWorkerLocked(w *ManagedWorker) bool { - return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle + // Exclude a worker whose pod is already Terminating (planned node drain): the + // 2a health guard keeps such a busy worker in the pool to let its query + // drain, but it must not receive a NEW session on a shutting-down pod. + return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle && + !p.workerPodTerminating(p.workerPodName(w)) } func (p *K8sWorkerPool) isWarmIdleWorkerLocked(w *ManagedWorker) bool { diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index c1d39415..186a5fea 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -3552,6 +3552,12 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) { t.Fatal("expected automountServiceAccountToken=false for shared warm worker pods") } + // Drain-aware eviction: a real grace window for in-flight queries, not the + // 30s k8s default. + if pod.Spec.TerminationGracePeriodSeconds == nil || *pod.Spec.TerminationGracePeriodSeconds != workerTerminationGracePeriodSeconds { + t.Fatalf("expected terminationGracePeriodSeconds=%d, got %v", workerTerminationGracePeriodSeconds, pod.Spec.TerminationGracePeriodSeconds) + } + if pod.Spec.SecurityContext == nil || pod.Spec.SecurityContext.RunAsNonRoot == nil || !*pod.Spec.SecurityContext.RunAsNonRoot { t.Fatal("expected runAsNonRoot=true") } diff --git a/controlplane/org_reserved_pool.go b/controlplane/org_reserved_pool.go index 0b5a0449..f4142e82 100644 --- a/controlplane/org_reserved_pool.go +++ b/controlplane/org_reserved_pool.go @@ -343,6 +343,11 @@ func (p *OrgReservedPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool if !p.workerBelongsToOrgLocked(w) { return false } + // Don't hand a new session to a worker whose pod is already Terminating + // (planned node drain) — see isGenericSessionSchedulableWorkerLocked. + if p.shared.workerPodTerminating(p.shared.workerPodName(w)) { + return false + } lifecycle := w.SharedState().NormalizedLifecycle() return lifecycle == WorkerLifecycleHot } diff --git a/controlplane/worker_disruption_guard.go b/controlplane/worker_disruption_guard.go new file mode 100644 index 00000000..4e1f29c8 --- /dev/null +++ b/controlplane/worker_disruption_guard.go @@ -0,0 +1,233 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// Drain-aware worker eviction (Tier 1a + Tier 2a of the 2026-06-04 RCA). +// +// A Karpenter Drift roll tainted worker nodes out from under running queries: +// each in-flight query died a median 5s after its node was tainted, while the +// node still had ~108s of life left, and the control plane *itself* canceled +// the query (3 failed health checks → "worker unresponsive"). 31 queries were +// killed this way. The control plane drains correctly on its own roll (900s +// grace, unbounded drain); workers had none of that. These two mechanisms give +// a busy worker the same protection: +// +// 1a. While a worker is running a query it carries karpenter.sh/do-not-disrupt, +// so Karpenter's *voluntary* disruption (drift, consolidation, expiry) +// skips its node. The annotation is cleared when the worker goes idle. +// 2a. If a worker fails health checks while its pod is already Terminating +// (a planned node drain), the health loop does NOT mark it Lost / cancel +// its query — it defers to the informer-driven pod-terminated path, letting +// the worker drain (bounded by terminationGracePeriodSeconds). +// +// This protects against *voluntary* disruption only. Involuntary loss (spot +// reclaim, node/hardware failure) still kills the query; that residual tail is +// handled by transparent statement retry for commit-safe statements (separate +// change), not here. + +const ( + // karpenterDoNotDisruptAnnotation, when set on a pod, tells Karpenter not to + // voluntarily disrupt the node hosting it (drift/consolidation/expiry). + // Involuntary disruption (spot reclaim, NodePool terminationGracePeriod + // ceiling) is unaffected — which is why we still need the Tier 2a guard and + // a bounded grace period. + karpenterDoNotDisruptAnnotation = "karpenter.sh/do-not-disrupt" + karpenterDoNotDisruptValue = "true" + + // disruptionGuardReconcileInterval is how often the busy/idle → annotation + // reconcile runs. A worker busy for longer than this is protected before the + // next Karpenter disruption decision; the only exposure is a query that both + // starts and is selected for voluntary disruption within one interval. Kept + // at the health-check cadence (2s) to bound that race window; with the + // NodePool terminationGracePeriod ceiling in place a disruption inside the + // window is non-fatal anyway (the query still gets its drain). Eager-stamping + // on the 0→1 session transition would close the window entirely but touches + // the hot acquire path on both pools — deferred. Patches fire only on + // transitions, so steady state costs zero API calls regardless of cadence. + disruptionGuardReconcileInterval = 2 * time.Second + + // workerTerminationGracePeriodSeconds is the worker pod's grace period. + // Unset historically (→ k8s default 30s), far too short for analytical + // queries. This is the fallback drain window for the residual race (worker + // became busy inside the reconcile gap) and for involuntary eviction; the + // primary protection for long queries is the do-not-disrupt annotation + // above. The NodePool-level terminationGracePeriod (infra) remains the hard + // ceiling so a security roll always completes. + workerTerminationGracePeriodSeconds int64 = 600 +) + +// disruptionGuardReconciler keeps karpenter.sh/do-not-disrupt in sync with each +// worker's busy/idle state. Started once from newK8sWorkerPool alongside +// idleReaper. Covers both the flat pool and OrgReservedPool, which share this +// pool's worker map. +func (p *K8sWorkerPool) disruptionGuardReconciler() { + ticker := time.NewTicker(disruptionGuardReconcileInterval) + defer ticker.Stop() + for { + select { + case <-p.shutdownCh: + return + case <-ticker.C: + p.reconcileDisruptionGuards(context.Background()) + } + } +} + +// reconcileDisruptionGuards patches the do-not-disrupt annotation onto workers +// that became busy and removes it from workers that went idle. +// +// Desired state (busy) is compared against the pod's ACTUAL annotation, read +// from the pod informer cache — not an in-memory flag. This keeps the reconcile +// stateless and self-correcting across control-plane restarts and failover: +// when a CP dies, the worker's session(s) die with it (so the worker goes idle) +// but its pod keeps the annotation the dead CP stamped. A surviving/replacement +// CP that adopts the worker has no in-memory memory of having set it; reading +// the live annotation lets it see desired=idle vs current=set and clear the +// orphan. (An in-memory "applied" flag would miss this — it would read +// applied=false==busy=false and never clear the stale annotation.) +// +// It snapshots desired state under the lock, reads current state from the +// cache, and issues K8s API calls without holding the lock. +func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) { + type guardTarget struct { + worker *ManagedWorker + podName string + busy bool + } + + var targets []guardTarget + p.mu.RLock() + for _, w := range p.workers { + select { + case <-w.done: + continue // worker exiting; nothing to guard + default: + } + targets = append(targets, guardTarget{worker: w, podName: p.workerPodName(w), busy: w.activeSessions > 0}) + } + p.mu.RUnlock() + + for _, t := range targets { + if t.podName == "" { + continue + } + if t.busy == p.podHasDoNotDisrupt(t.podName) { + continue // pod already in the desired state (steady state: no API call) + } + if err := p.patchWorkerDoNotDisrupt(ctx, t.podName, t.busy); err != nil { + if apierrors.IsNotFound(err) { + continue // pod gone; nothing to guard + } + slog.Warn("Failed to reconcile worker do-not-disrupt annotation.", + "worker", t.worker.ID, "pod", t.podName, "busy", t.busy, "error", err) + continue + } + slog.Debug("Reconciled worker do-not-disrupt annotation.", + "worker", t.worker.ID, "pod", t.podName, "do_not_disrupt", t.busy) + } +} + +// podHasDoNotDisrupt reports whether the worker pod currently carries the +// karpenter.sh/do-not-disrupt annotation, read from the pod informer cache +// (no API call). Returns false if the pod is not in the cache yet. +func (p *K8sWorkerPool) podHasDoNotDisrupt(podName string) bool { + if p.informer == nil || podName == "" { + return false + } + obj, exists, err := p.informer.GetIndexer().GetByKey(p.namespace + "/" + podName) + if err != nil || !exists { + return false + } + pod, ok := obj.(*corev1.Pod) + if !ok { + return false + } + return pod.Annotations[karpenterDoNotDisruptAnnotation] == karpenterDoNotDisruptValue +} + +// patchWorkerDoNotDisrupt sets (enable) or removes (disable) the +// karpenter.sh/do-not-disrupt annotation on a worker pod via a JSON merge patch. +// A null value removes the key. +func (p *K8sWorkerPool) patchWorkerDoNotDisrupt(ctx context.Context, podName string, enable bool) error { + var value interface{} + if enable { + value = karpenterDoNotDisruptValue + } else { + value = nil // JSON merge patch: null deletes the annotation key + } + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + karpenterDoNotDisruptAnnotation: value, + }, + }, + } + data, err := json.Marshal(patch) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + _, err = p.clientset.CoreV1().Pods(p.namespace).Patch(ctx, podName, types.MergePatchType, data, metav1.PatchOptions{}) + return err +} + +// relabelAdoptedPodToThisCP patches a worker pod's duckgres/control-plane label +// to this control plane's ID. Called when one CP adopts a worker spawned by +// another (failover/rollout). The pod informer is label-scoped to +// duckgres/control-plane= (see startInformer), so without this an adopted +// worker is invisible to THIS CP's informer cache — making podHasDoNotDisrupt, +// workerPodTerminating, and the informer-driven pod-terminated cleanup all blind +// to it (the do-not-disrupt reconcile and the planned-drain health guard would +// silently not apply, and a stale do-not-disrupt annotation would never clear). +func (p *K8sWorkerPool) relabelAdoptedPodToThisCP(ctx context.Context, podName string) error { + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + "duckgres/control-plane": p.cpID, + }, + }, + } + data, err := json.Marshal(patch) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + _, err = p.clientset.CoreV1().Pods(p.namespace).Patch(ctx, podName, types.MergePatchType, data, metav1.PatchOptions{}) + return err +} + +// workerPodTerminating reports whether the worker's pod is already being torn +// down (has a deletionTimestamp), read from the pod informer cache — no API +// call, no extra RBAC. The control plane evicts workers via Karpenter node +// drains and kubelet, both of which set deletionTimestamp before the worker +// stops answering health checks; a genuine crash leaves it nil. Used by the +// health-check loop to distinguish a planned drain (let the query finish) from +// a crash (mark Lost). +func (p *K8sWorkerPool) workerPodTerminating(podName string) bool { + if p.informer == nil || podName == "" { + return false + } + obj, exists, err := p.informer.GetIndexer().GetByKey(p.namespace + "/" + podName) + if err != nil || !exists { + return false + } + pod, ok := obj.(*corev1.Pod) + if !ok { + return false + } + return pod.DeletionTimestamp != nil +} diff --git a/controlplane/worker_disruption_guard_test.go b/controlplane/worker_disruption_guard_test.go new file mode 100644 index 00000000..1d21a499 --- /dev/null +++ b/controlplane/worker_disruption_guard_test.go @@ -0,0 +1,278 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" +) + +// guardPodAnnotated builds a worker pod with/without the do-not-disrupt annotation. +func guardPodAnnotated(name string, annotated bool) *corev1.Pod { + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"}} + if annotated { + pod.Annotations = map[string]string{karpenterDoNotDisruptAnnotation: karpenterDoNotDisruptValue} + } + return pod +} + +// withGuardInformer attaches a (non-running) pod informer to the pool and seeds +// the cache, so podHasDoNotDisrupt reads the pods we stage. Returns a function +// to (re)sync the cache from a set of pods, simulating the informer observing a +// patch. +func withGuardInformer(t *testing.T, pool *K8sWorkerPool, cs *fake.Clientset, pods ...*corev1.Pod) func(...*corev1.Pod) { + t.Helper() + pool.informer = informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer() + idx := pool.informer.GetIndexer() + sync := func(ps ...*corev1.Pod) { + for _, p := range ps { + if err := idx.Add(p); err != nil { + t.Fatalf("seed informer: %v", err) + } + } + } + sync(pods...) + return sync +} + +func countPatchReactor(cs *fake.Clientset, n *int32) { + cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { + atomic.AddInt32(n, 1) + return false, nil, nil // fall through to default reactor (applies the patch) + }) +} + +// TestReconcileDisruptionGuardsSetsAndClears: a busy worker's pod gets the +// annotation; an idle worker's pod has it removed; steady state issues no patch. +func TestReconcileDisruptionGuardsSetsAndClears(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + busyPod := guardPodAnnotated("wpod-busy", false) + idlePod := guardPodAnnotated("wpod-idle", true) + for _, pod := range []*corev1.Pod{busyPod, idlePod} { + if _, err := cs.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("create pod %s: %v", pod.Name, err) + } + } + resync := withGuardInformer(t, pool, cs, busyPod, idlePod) + + var patches int32 + countPatchReactor(cs, &patches) + + pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-busy", activeSessions: 1, done: make(chan struct{})} + pool.workers[2] = &ManagedWorker{ID: 2, podName: "wpod-idle", activeSessions: 0, done: make(chan struct{})} + + pool.reconcileDisruptionGuards(ctx) + + if got := podAnno(t, cs, "wpod-busy"); got != karpenterDoNotDisruptValue { + t.Fatalf("busy worker: want do-not-disrupt=true, got %q", got) + } + if got := podAnno(t, cs, "wpod-idle"); got != "" { + t.Fatalf("idle worker: want do-not-disrupt cleared, got %q", got) + } + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected exactly 2 patches (1 set, 1 clear), got %d", n) + } + + // Steady state: cache now reflects the patched annotations -> no more patches. + resync(guardPodAnnotated("wpod-busy", true), guardPodAnnotated("wpod-idle", false)) + pool.reconcileDisruptionGuards(ctx) + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected no additional patches in steady state, got %d", n) + } +} + +// TestRelabelAdoptedPodToThisCP verifies the adoption relabel makes a pod +// spawned by another CP carry THIS CP's control-plane label (so this CP's +// label-scoped informer can see it) while preserving the pod's other labels. +func TestRelabelAdoptedPodToThisCP(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) // pool.cpID == "test-cp" + ctx := context.Background() + + if _, err := cs.CoreV1().Pods("default").Create(ctx, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "wpod-adopted", + Namespace: "default", + Labels: map[string]string{"duckgres/control-plane": "old-cp", "duckgres/worker-id": "9"}, + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + if err := pool.relabelAdoptedPodToThisCP(ctx, "wpod-adopted"); err != nil { + t.Fatal(err) + } + + got, err := cs.CoreV1().Pods("default").Get(ctx, "wpod-adopted", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if got.Labels["duckgres/control-plane"] != "test-cp" { + t.Fatalf("want control-plane=test-cp after relabel, got %q", got.Labels["duckgres/control-plane"]) + } + if got.Labels["duckgres/worker-id"] != "9" { + t.Fatalf("merge patch clobbered other labels: %v", got.Labels) + } +} + +// TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover is the +// CP-failover regression: a CP stamps do-not-disrupt on a busy worker, then +// dies; its sessions die with it so the worker is idle, but the pod keeps the +// annotation. A surviving/replacement CP (no in-memory record of having set it) +// must still clear the orphan. In production the pod becomes visible to the new +// CP's label-scoped informer via relabelAdoptedPodToThisCP on adoption (tested +// above); this test seeds the cache to that post-relabel state and asserts the +// cache-based reconcile clears the orphan. An in-memory "applied" flag would not +// (it would read applied=false==busy=false and skip). +func TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + stale := guardPodAnnotated("wpod-orphan", true) // annotation left by the dead CP + if _, err := cs.CoreV1().Pods("default").Create(ctx, stale, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + withGuardInformer(t, pool, cs, stale) + + // Adopted worker, freshly in memory: idle (sessions died with the old CP). + pool.workers[7] = &ManagedWorker{ID: 7, podName: "wpod-orphan", activeSessions: 0, done: make(chan struct{})} + + pool.reconcileDisruptionGuards(ctx) + + if got := podAnno(t, cs, "wpod-orphan"); got != "" { + t.Fatalf("orphaned do-not-disrupt not cleared after failover: got %q", got) + } +} + +// TestReconcileDisruptionGuardsSkipsExitingWorker: a worker whose done channel +// is closed (exiting) is not patched. +func TestReconcileDisruptionGuardsSkipsExitingWorker(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + if _, err := cs.CoreV1().Pods("default").Create(ctx, guardPodAnnotated("wpod-exiting", false), metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + withGuardInformer(t, pool, cs, guardPodAnnotated("wpod-exiting", false)) + + var patches int32 + countPatchReactor(cs, &patches) + + done := make(chan struct{}) + close(done) + pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-exiting", activeSessions: 1, done: done} + + pool.reconcileDisruptionGuards(ctx) + if n := atomic.LoadInt32(&patches); n != 0 { + t.Fatalf("expected no patches for an exiting worker, got %d", n) + } +} + +// TestWorkerPodTerminating verifies the cache-only Terminating check the health +// loop uses to distinguish a planned node drain from a crash. +func TestWorkerPodTerminating(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + pool.informer = informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer() + + now := metav1.Now() + terminating := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "wpod-term", Namespace: "default", DeletionTimestamp: &now}} + alive := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "wpod-alive", Namespace: "default"}} + if err := pool.informer.GetIndexer().Add(terminating); err != nil { + t.Fatal(err) + } + if err := pool.informer.GetIndexer().Add(alive); err != nil { + t.Fatal(err) + } + + if !pool.workerPodTerminating("wpod-term") { + t.Fatal("expected Terminating pod (deletionTimestamp set) -> true") + } + if pool.workerPodTerminating("wpod-alive") { + t.Fatal("expected live pod -> false") + } + if pool.workerPodTerminating("wpod-missing") { + t.Fatal("expected pod absent from cache -> false") + } + if pool.workerPodTerminating("") { + t.Fatal("expected empty pod name -> false") + } +} + +// TestReconcileDisruptionGuardsNoDataRace runs the reconciler in a tight loop +// while many goroutines flip activeSessions under the pool lock (as Acquire/ +// ReleaseWorker do). Run with -race, this asserts the reconciler's snapshot- +// under-RLock / patch-without-lock discipline has no data race against concurrent +// session churn. +func TestReconcileDisruptionGuardsNoDataRace(t *testing.T) { + pool, cs := newTestK8sPool(t, 100) + ctx := context.Background() + + const n = 16 + var pods []*corev1.Pod + for i := 1; i <= n; i++ { + name := fmt.Sprintf("wpod-%d", i) + pod := guardPodAnnotated(name, i%2 == 0) + if _, err := cs.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + pods = append(pods, pod) + pool.workers[i] = &ManagedWorker{ID: i, podName: name, done: make(chan struct{})} + } + withGuardInformer(t, pool, cs, pods...) + + stop := make(chan struct{}) + + // Mutators: flip activeSessions under the pool lock, mirroring Acquire/Release. + var muWG sync.WaitGroup + for i := 1; i <= n; i++ { + muWG.Add(1) + go func(id int) { + defer muWG.Done() + for j := 0; j < 2000; j++ { + pool.mu.Lock() + if w := pool.workers[id]; w != nil { + w.activeSessions ^= 1 + } + pool.mu.Unlock() + } + }(i) + } + + // Reconciler + health-path read, looping concurrently with the mutators. + var recWG sync.WaitGroup + recWG.Add(1) + go func() { + defer recWG.Done() + for { + select { + case <-stop: + return + default: + pool.reconcileDisruptionGuards(ctx) + pool.workerPodTerminating("wpod-1") + } + } + }() + + muWG.Wait() + close(stop) + recWG.Wait() +} + +func podAnno(t *testing.T, cs *fake.Clientset, name string) string { + t.Helper() + pod, err := cs.CoreV1().Pods("default").Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pod %s: %v", name, err) + } + return pod.Annotations[karpenterDoNotDisruptAnnotation] +} diff --git a/k8s/rbac.yaml b/k8s/rbac.yaml index 1e482f93..d0754d27 100644 --- a/k8s/rbac.yaml +++ b/k8s/rbac.yaml @@ -17,10 +17,11 @@ metadata: name: duckgres-control-plane namespace: duckgres rules: - # Manage worker pods + # Manage worker pods. "patch" is required to set/clear the + # karpenter.sh/do-not-disrupt annotation on busy workers (drain-aware eviction). - apiGroups: [""] resources: ["pods"] - verbs: ["create", "delete", "get", "list", "watch"] + verbs: ["create", "delete", "get", "list", "patch", "watch"] # Validate the shared neutral worker startup config before spawn - apiGroups: [""] resources: ["configmaps"] diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index 19a97577..cab9588c 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -327,6 +327,70 @@ assert_worker_pod() { if echo "$mounts" | grep -q '/var/run/secrets/kubernetes.io/serviceaccount'; then fail "worker $pod mounts a kubernetes.io/serviceaccount token" fi + + # Drain window: workers carry a real terminationGracePeriodSeconds (600), not + # the 30s k8s default, so an in-flight query gets time to drain on SIGTERM + # (drain-aware eviction). TestK8sWorkerPodCreation-adjacent. + gp="$(k get pod "$pod" -o jsonpath='{.spec.terminationGracePeriodSeconds}')" + [ "$gp" = "600" ] || fail "worker $pod terminationGracePeriodSeconds '$gp' != 600" +} + +# ---- drain-aware eviction (do-not-disrupt on busy workers) ---------------- +# Regression guard for 2026-06-04: a Karpenter drift roll killed 31 in-flight +# queries because busy worker nodes were valid voluntary-disruption candidates. +# Now the CP stamps karpenter.sh/do-not-disrupt on a worker while it serves a +# session and clears it when idle, so Karpenter skips a node running a query. +# We hold a session open long enough for the CP reconciler (~5s) to stamp the +# annotation, assert some busy worker carries it, then assert it clears once the +# session ends. (Asserting Karpenter actually defers needs a real node drain, +# which is out of scope for the in-Job harness; see README. The annotation +# contract is what gates Karpenter, so that is what we assert.) +DND_JSONPATH='{.metadata.annotations.karpenter\.sh/do-not-disrupt}' +worker_dnd() { k get pod "$1" -o jsonpath="$DND_JSONPATH" 2>/dev/null; } +# Space-padded list of worker pods currently carrying do-not-disrupt. +annotated_workers() { + out=" " + for p in $(k get pods -l app=duckgres-worker -o jsonpath='{.items[*].metadata.name}' 2>/dev/null); do + [ "$(worker_dnd "$p")" = "true" ] && out="$out$p " + done + printf '%s' "$out" +} +assert_drain_aware_eviction() { # org password + log "drain-aware eviction: busy worker carries do-not-disrupt, idle worker clears it" + wait_worker "$1" "$2" ducklake + + # Attribute the annotation to OUR session: snapshot which workers are already + # annotated, then look for a NEWLY annotated one while we hold a session (a + # pre-existing busy worker must not make this pass for the wrong reason). + before="$(annotated_workers)" + + # Hold a session open ~40s. SELECT 1 proves the worker is acquired; the + # trailing sleep keeps stdin (and the connection, so activeSessions>0) open + # without blocking the harness. + { printf 'SELECT 1;\n'; sleep 40; } | PGPASSWORD="$2" psql \ + "sslmode=require host=$1$SNI_SUFFIX hostaddr=$CP_IP port=5432 user=root dbname=ducklake" \ + -v ON_ERROR_STOP=1 -tA >/dev/null 2>&1 & + hold_pid=$! + + busy=""; i=0 + while [ "$i" -lt 15 ]; do # up to ~30s for the reconciler to stamp it + for p in $(annotated_workers); do + case "$before" in *" $p "*) : ;; *) busy="$p"; break ;; esac + done + [ -n "$busy" ] && break + sleep 2; i=$((i + 1)) + done + [ -n "$busy" ] || { kill "$hold_pid" 2>/dev/null; fail "no worker acquired karpenter.sh/do-not-disrupt while a session was held"; } + log " session worker $busy carries do-not-disrupt=true" + + wait "$hold_pid" 2>/dev/null || true # session ends -> worker goes idle + cleared=""; i=0 + while [ "$i" -lt 20 ]; do # up to ~40s for the reconciler to clear it + [ -z "$(worker_dnd "$busy")" ] && { cleared=1; break; } + sleep 2; i=$((i + 1)) + done + [ -n "$cleared" ] || fail "do-not-disrupt not cleared on idle worker $busy after session end" + log " do-not-disrupt cleared after session end on $busy" } # ---- resilience ----------------------------------------------------------- @@ -464,6 +528,7 @@ main() { assert_fork_extensions "$CNPG" "$cnpg_pw" # after a DuckLake R/W (httpfs loaded) rw_iceberg "$CNPG" "$cnpg_pw" assert_worker_pod + assert_drain_aware_eviction "$CNPG" "$cnpg_pw" concurrent_connections "$CNPG" "$cnpg_pw" concurrent_writers "$CNPG" "$cnpg_pw" durability_across_restart "$CNPG" "$cnpg_pw" @@ -488,7 +553,7 @@ main() { # end-to-end would need a warm target >0 in the per-PR CP (see README). log "SKIP shared-warm-activation + version-reaper (CP runs warm-target=0; see README)" - log "PASS: wire + warm-pool-backpressure + activation(DuckLake/Iceberg) + ext-forks + worker-pod + concurrency + durability + crash-recovery + isolation + lifecycle-teardown, on cnpg & ext" + log "PASS: wire + warm-pool-backpressure + activation(DuckLake/Iceberg) + ext-forks + worker-pod + drain-aware-eviction + concurrency + durability + crash-recovery + isolation + lifecycle-teardown, on cnpg & ext" } main "$@" diff --git a/tests/manifests/manifests_test.go b/tests/manifests/manifests_test.go index d0d536b6..7a8a6b6c 100644 --- a/tests/manifests/manifests_test.go +++ b/tests/manifests/manifests_test.go @@ -27,6 +27,21 @@ func TestControlPlaneRBACIncludesLeaseAccess(t *testing.T) { } } +func TestControlPlaneRBACIncludesWorkerPodPatch(t *testing.T) { + // Drain-aware eviction sets/clears karpenter.sh/do-not-disrupt on busy + // worker pods via a merge patch, so the control plane Role must grant + // "patch" on pods (in addition to create/delete/get/list/watch). + content := readManifest(t, "k8s", "rbac.yaml") + for _, want := range []string{ + `resources: ["pods"]`, + `verbs: ["create", "delete", "get", "list", "patch", "watch"]`, + } { + if !strings.Contains(content, want) { + t.Fatalf("expected %q in k8s/rbac.yaml", want) + } + } +} + func TestControlPlaneRBACIncludesSharedWorkerConfigMapRead(t *testing.T) { content := readManifest(t, "k8s", "rbac.yaml") for _, want := range []string{