From 68af392b6fb93232dd9b0e501275f0034b362556 Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 4 Jun 2026 15:36:11 -0400 Subject: [PATCH 1/5] fix(worker-pool): drain-aware worker eviction (do-not-disrupt + grace + health guard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On 2026-06-04 a Karpenter Drift roll (Bottlerocket AMI 1.61→1.62) tainted worker nodes out from under running queries: 31 in-flight queries were killed, each a median ~5s after its node was tainted while the node still had ~108s of life left. The control plane itself canceled them (3 failed health checks → "worker unresponsive"), even though the workers were draining. The CP drains its own sessions on a roll (900s grace, unbounded — control.go:46); workers had none of that. This gives a busy worker the same protection. 1a (controlplane/worker_disruption_guard.go): the CP stamps karpenter.sh/do-not-disrupt on a worker while it serves a session and clears it when idle, so Karpenter skips a node running a query. Reconciled every 5s on the shared K8sWorkerPool, covering both the flat pool and OrgReservedPool (which share its worker map). Patches only on busy<->idle transitions. Requires the new pods:patch RBAC. 1c (k8s_pool.go): worker pods set terminationGracePeriodSeconds=600 (was unset → 30s default) — a real drain window for in-flight queries on SIGTERM. 2a (k8s_pool.go HealthCheckLoop): a worker failing health checks while its pod is already Terminating (planned node drain) is no longer marked Lost / canceled; the loop defers to the informer-driven pod-terminated path so the worker drains. Terminating is read from the pod informer cache — no API call, no new RBAC. Protects against voluntary disruption only. Involuntary loss (spot reclaim, node failure) is unchanged; that residual tail needs transparent statement retry for commit-safe statements (follow-up). Disruption budgets are intentionally not touched: workers already had Drift budget=1 and it did not prevent the kills. Tests: - unit: reconciler set/clear/idempotent/skip-exiting; pod-Terminating cache check; pod-spec grace assertion (TestK8sPool_SpawnWorkerCreatesCorrectPod). - manifest: k8s/rbac.yaml now grants pods:patch. - e2e-mw-dev/harness.sh: a busy worker carries do-not-disrupt and an idle worker clears it; worker terminationGracePeriodSeconds=600. Co-Authored-By: Claude Opus 4.8 (1M context) --- controlplane/k8s_pool.go | 33 +++- controlplane/k8s_pool_test.go | 6 + controlplane/worker_disruption_guard.go | 191 +++++++++++++++++++ controlplane/worker_disruption_guard_test.go | 140 ++++++++++++++ controlplane/worker_mgr.go | 42 ++-- k8s/rbac.yaml | 5 +- tests/e2e-mw-dev/harness.sh | 54 +++++- tests/manifests/manifests_test.go | 15 ++ 8 files changed, 459 insertions(+), 27 deletions(-) create mode 100644 controlplane/worker_disruption_guard.go create mode 100644 controlplane/worker_disruption_guard_test.go diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 7ecdeed1..0b51b447 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -253,6 +253,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( observeControlPlaneWorkers(0) go pool.idleReaper() + go pool.disruptionGuardReconciler() return pool, nil } @@ -725,11 +726,16 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p Labels: podLabels, }, Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, - ServiceAccountName: p.workerServiceAccountName(), - AutomountServiceAccountToken: boolPtr(false), - PriorityClassName: p.workerPriorityClassName, - NodeSelector: p.nodeSelectorForProfile(profile), + RestartPolicy: corev1.RestartPolicyNever, + // Give in-flight queries a real drain window on SIGTERM instead of + // the 30s k8s default. Pairs with karpenter.sh/do-not-disrupt + // (set on busy workers) so a node roll defers to the query; this is + // the fallback for the residual race and involuntary eviction. + TerminationGracePeriodSeconds: int64Ptr(workerTerminationGracePeriodSeconds), + ServiceAccountName: p.workerServiceAccountName(), + AutomountServiceAccountToken: boolPtr(false), + PriorityClassName: p.workerPriorityClassName, + NodeSelector: p.nodeSelectorForProfile(profile), SecurityContext: &corev1.PodSecurityContext{ RunAsNonRoot: boolPtr(true), RunAsUser: int64Ptr(1000), @@ -2622,6 +2628,23 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat slog.Warn("K8s worker health check failed.", "id", lease.workerID, "error", healthErr, "consecutive_failures", count) if count >= maxConsecutiveHealthFailures { + // Planned node disruption (Karpenter drift/ + // consolidation, kubelet drain) is not a worker + // crash: the pod is already Terminating. Marking + // it Lost here cancels the in-flight query after + // ~3 health intervals (~5s), even though the + // worker is draining and its node typically lives + // ~100s+ longer. Defer to the informer-driven + // w.done path above, which fires when the pod is + // actually gone and runs the same cleanup — so the + // query drains (bounded by the pod's + // terminationGracePeriodSeconds) instead of being + // killed mid-flight. Cache-only read; no API call. + if p.workerPodTerminating(p.workerPodName(w)) { + slog.Warn("K8s worker failing health checks while pod is Terminating (planned node disruption); deferring to graceful drain instead of marking lost.", + "id", lease.workerID, "pod", p.workerPodName(w), "consecutive_failures", count) + return + } lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginHealthCheckCrash) if err != nil { slog.Error("K8s worker unresponsive but lease validation failed; leaving cleanup to retry.", "id", lease.workerID, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "consecutive_failures", count, "error", err) diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index c1d39415..186a5fea 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -3552,6 +3552,12 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) { t.Fatal("expected automountServiceAccountToken=false for shared warm worker pods") } + // Drain-aware eviction: a real grace window for in-flight queries, not the + // 30s k8s default. + if pod.Spec.TerminationGracePeriodSeconds == nil || *pod.Spec.TerminationGracePeriodSeconds != workerTerminationGracePeriodSeconds { + t.Fatalf("expected terminationGracePeriodSeconds=%d, got %v", workerTerminationGracePeriodSeconds, pod.Spec.TerminationGracePeriodSeconds) + } + if pod.Spec.SecurityContext == nil || pod.Spec.SecurityContext.RunAsNonRoot == nil || !*pod.Spec.SecurityContext.RunAsNonRoot { t.Fatal("expected runAsNonRoot=true") } diff --git a/controlplane/worker_disruption_guard.go b/controlplane/worker_disruption_guard.go new file mode 100644 index 00000000..d6b4ddbe --- /dev/null +++ b/controlplane/worker_disruption_guard.go @@ -0,0 +1,191 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// Drain-aware worker eviction (Tier 1a + Tier 2a of the 2026-06-04 RCA). +// +// A Karpenter Drift roll tainted worker nodes out from under running queries: +// each in-flight query died a median 5s after its node was tainted, while the +// node still had ~108s of life left, and the control plane *itself* canceled +// the query (3 failed health checks → "worker unresponsive"). 31 queries were +// killed this way. The control plane drains correctly on its own roll (900s +// grace, unbounded drain); workers had none of that. These two mechanisms give +// a busy worker the same protection: +// +// 1a. While a worker is running a query it carries karpenter.sh/do-not-disrupt, +// so Karpenter's *voluntary* disruption (drift, consolidation, expiry) +// skips its node. The annotation is cleared when the worker goes idle. +// 2a. If a worker fails health checks while its pod is already Terminating +// (a planned node drain), the health loop does NOT mark it Lost / cancel +// its query — it defers to the informer-driven pod-terminated path, letting +// the worker drain (bounded by terminationGracePeriodSeconds). +// +// This protects against *voluntary* disruption only. Involuntary loss (spot +// reclaim, node/hardware failure) still kills the query; that residual tail is +// handled by transparent statement retry for commit-safe statements (separate +// change), not here. + +const ( + // karpenterDoNotDisruptAnnotation, when set on a pod, tells Karpenter not to + // voluntarily disrupt the node hosting it (drift/consolidation/expiry). + // Involuntary disruption (spot reclaim, NodePool terminationGracePeriod + // ceiling) is unaffected — which is why we still need the Tier 2a guard and + // a bounded grace period. + karpenterDoNotDisruptAnnotation = "karpenter.sh/do-not-disrupt" + karpenterDoNotDisruptValue = "true" + + // disruptionGuardReconcileInterval is how often the busy/idle → annotation + // reconcile runs. A worker busy for longer than this is protected before the + // next Karpenter disruption decision; the only exposure is a query that both + // starts and is selected for voluntary disruption within one interval, which + // would be a sub-interval query anyway. Patches only fire on transitions, so + // steady state costs zero API calls. + disruptionGuardReconcileInterval = 5 * time.Second + + // workerTerminationGracePeriodSeconds is the worker pod's grace period. + // Unset historically (→ k8s default 30s), far too short for analytical + // queries. This is the fallback drain window for the residual race (worker + // became busy inside the reconcile gap) and for involuntary eviction; the + // primary protection for long queries is the do-not-disrupt annotation + // above. The NodePool-level terminationGracePeriod (infra) remains the hard + // ceiling so a security roll always completes. + workerTerminationGracePeriodSeconds int64 = 600 +) + +// disruptionGuardReconciler keeps karpenter.sh/do-not-disrupt in sync with each +// worker's busy/idle state. Started once from newK8sWorkerPool alongside +// idleReaper. Covers both the flat pool and OrgReservedPool, which share this +// pool's worker map. +func (p *K8sWorkerPool) disruptionGuardReconciler() { + ticker := time.NewTicker(disruptionGuardReconcileInterval) + defer ticker.Stop() + for { + select { + case <-p.shutdownCh: + return + case <-ticker.C: + p.reconcileDisruptionGuards(context.Background()) + } + } +} + +// reconcileDisruptionGuards patches the do-not-disrupt annotation onto workers +// that became busy and removes it from workers that went idle. It snapshots the +// desired vs. applied state under the lock, then performs K8s API calls without +// holding it. +// +// doNotDisruptApplied is in-memory, so a CP restart resets it to false. The +// failure direction is safe: a still-busy worker (busy=true, applied=false) +// gets re-patched set on the next tick; the only residue is a worker that went +// idle exactly across the restart, whose stale annotation merely defers its +// node's consolidation until the idle reaper retires it. It never clears a busy +// worker's annotation. (Initializing applied from the live pod annotation on +// adoption would remove even that residue — a follow-up.) +func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) { + type guardTarget struct { + worker *ManagedWorker + podName string + busy bool + } + + var targets []guardTarget + p.mu.RLock() + for _, w := range p.workers { + select { + case <-w.done: + continue // worker exiting; nothing to guard + default: + } + busy := w.activeSessions > 0 + if busy == w.doNotDisruptApplied { + continue // already in desired state + } + targets = append(targets, guardTarget{worker: w, podName: p.workerPodName(w), busy: busy}) + } + p.mu.RUnlock() + + for _, t := range targets { + if t.podName == "" { + continue + } + if err := p.patchWorkerDoNotDisrupt(ctx, t.podName, t.busy); err != nil { + if apierrors.IsNotFound(err) { + // Pod already gone — drop the optimistic applied flag so a + // replacement pod with the same worker is re-evaluated. + p.mu.Lock() + t.worker.doNotDisruptApplied = false + p.mu.Unlock() + continue + } + slog.Warn("Failed to reconcile worker do-not-disrupt annotation.", + "worker", t.worker.ID, "pod", t.podName, "busy", t.busy, "error", err) + continue + } + p.mu.Lock() + t.worker.doNotDisruptApplied = t.busy + p.mu.Unlock() + slog.Debug("Reconciled worker do-not-disrupt annotation.", + "worker", t.worker.ID, "pod", t.podName, "do_not_disrupt", t.busy) + } +} + +// patchWorkerDoNotDisrupt sets (enable) or removes (disable) the +// karpenter.sh/do-not-disrupt annotation on a worker pod via a JSON merge patch. +// A null value removes the key. +func (p *K8sWorkerPool) patchWorkerDoNotDisrupt(ctx context.Context, podName string, enable bool) error { + var value interface{} + if enable { + value = karpenterDoNotDisruptValue + } else { + value = nil // JSON merge patch: null deletes the annotation key + } + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + karpenterDoNotDisruptAnnotation: value, + }, + }, + } + data, err := json.Marshal(patch) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + _, err = p.clientset.CoreV1().Pods(p.namespace).Patch(ctx, podName, types.MergePatchType, data, metav1.PatchOptions{}) + return err +} + +// workerPodTerminating reports whether the worker's pod is already being torn +// down (has a deletionTimestamp), read from the pod informer cache — no API +// call, no extra RBAC. The control plane evicts workers via Karpenter node +// drains and kubelet, both of which set deletionTimestamp before the worker +// stops answering health checks; a genuine crash leaves it nil. Used by the +// health-check loop to distinguish a planned drain (let the query finish) from +// a crash (mark Lost). +func (p *K8sWorkerPool) workerPodTerminating(podName string) bool { + if p.informer == nil || podName == "" { + return false + } + obj, exists, err := p.informer.GetIndexer().GetByKey(p.namespace + "/" + podName) + if err != nil || !exists { + return false + } + pod, ok := obj.(*corev1.Pod) + if !ok { + return false + } + return pod.DeletionTimestamp != nil +} diff --git a/controlplane/worker_disruption_guard_test.go b/controlplane/worker_disruption_guard_test.go new file mode 100644 index 00000000..e078dc39 --- /dev/null +++ b/controlplane/worker_disruption_guard_test.go @@ -0,0 +1,140 @@ +//go:build kubernetes + +package controlplane + +import ( + "context" + "sync/atomic" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + k8stesting "k8s.io/client-go/testing" +) + +// TestReconcileDisruptionGuardsSetsAndClears verifies the reconciler stamps +// karpenter.sh/do-not-disrupt onto a busy worker's pod and removes it once the +// worker goes idle, and that it only patches on busy<->idle transitions. +func TestReconcileDisruptionGuardsSetsAndClears(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + busyPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "wpod-busy", Namespace: "default", + Labels: map[string]string{"duckgres/worker-id": "1"}, + }} + idlePod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "wpod-idle", Namespace: "default", + Labels: map[string]string{"duckgres/worker-id": "2"}, + Annotations: map[string]string{karpenterDoNotDisruptAnnotation: karpenterDoNotDisruptValue}, + }} + for _, pod := range []*corev1.Pod{busyPod, idlePod} { + if _, err := cs.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("create pod %s: %v", pod.Name, err) + } + } + + var patches int32 + cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { + atomic.AddInt32(&patches, 1) + return false, nil, nil // fall through to the default reactor (applies the patch) + }) + + // w1 busy and not yet guarded; w2 idle but still carrying the annotation. + pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-busy", activeSessions: 1, doNotDisruptApplied: false, done: make(chan struct{})} + pool.workers[2] = &ManagedWorker{ID: 2, podName: "wpod-idle", activeSessions: 0, doNotDisruptApplied: true, done: make(chan struct{})} + + pool.reconcileDisruptionGuards(ctx) + + got, err := cs.CoreV1().Pods("default").Get(ctx, "wpod-busy", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if got.Annotations[karpenterDoNotDisruptAnnotation] != karpenterDoNotDisruptValue { + t.Fatalf("busy worker: want do-not-disrupt=true, got annotations=%v", got.Annotations) + } + if !pool.workers[1].doNotDisruptApplied { + t.Fatal("busy worker: expected doNotDisruptApplied=true after reconcile") + } + + got, err = cs.CoreV1().Pods("default").Get(ctx, "wpod-idle", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if _, ok := got.Annotations[karpenterDoNotDisruptAnnotation]; ok { + t.Fatalf("idle worker: want do-not-disrupt cleared, got annotations=%v", got.Annotations) + } + if pool.workers[2].doNotDisruptApplied { + t.Fatal("idle worker: expected doNotDisruptApplied=false after reconcile") + } + + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected exactly 2 patches (1 set, 1 clear), got %d", n) + } + + // Steady state: nothing changed, so no further patches. + pool.reconcileDisruptionGuards(ctx) + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected no additional patches on steady state, got %d", n) + } +} + +// TestReconcileDisruptionGuardsSkipsExitingWorker verifies a worker whose done +// channel is closed (exiting) is not patched — nothing to guard. +func TestReconcileDisruptionGuardsSkipsExitingWorker(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + if _, err := cs.CoreV1().Pods("default").Create(ctx, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "wpod-exiting", Namespace: "default", + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + var patches int32 + cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { + atomic.AddInt32(&patches, 1) + return false, nil, nil + }) + + done := make(chan struct{}) + close(done) + pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-exiting", activeSessions: 1, done: done} + + pool.reconcileDisruptionGuards(ctx) + if n := atomic.LoadInt32(&patches); n != 0 { + t.Fatalf("expected no patches for an exiting worker, got %d", n) + } +} + +// TestWorkerPodTerminating verifies the cache-only Terminating check that the +// health-check loop uses to distinguish a planned node drain from a crash. +func TestWorkerPodTerminating(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + pool.informer = informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer() + + now := metav1.Now() + terminating := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "wpod-term", Namespace: "default", DeletionTimestamp: &now}} + alive := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "wpod-alive", Namespace: "default"}} + if err := pool.informer.GetIndexer().Add(terminating); err != nil { + t.Fatal(err) + } + if err := pool.informer.GetIndexer().Add(alive); err != nil { + t.Fatal(err) + } + + if !pool.workerPodTerminating("wpod-term") { + t.Fatal("expected Terminating pod (deletionTimestamp set) → true") + } + if pool.workerPodTerminating("wpod-alive") { + t.Fatal("expected live pod → false") + } + if pool.workerPodTerminating("wpod-missing") { + t.Fatal("expected pod absent from cache → false") + } + if pool.workerPodTerminating("") { + t.Fatal("expected empty pod name → false") + } +} diff --git a/controlplane/worker_mgr.go b/controlplane/worker_mgr.go index 86ed8e32..6764b56e 100644 --- a/controlplane/worker_mgr.go +++ b/controlplane/worker_mgr.go @@ -26,25 +26,29 @@ import ( // ManagedWorker represents a duckdb-service worker process. type ManagedWorker struct { - ID int - podName string - nodeName string //nolint:unused // only set in kubernetes warm-pool; drives cache-locality-aware scheduling - image string //nolint:unused // only set in kubernetes warm-pool; carried through runtime store records - profile WorkerProfile //nolint:unused // only set in kubernetes warm-pool; pod-shape this worker was spawned with (zero = default exclusive) - cmd *exec.Cmd - socketPath string - bearerToken string - client *flightsql.Client - parentListener net.Listener // CP-side listener; lifecycle managed by releaseSocket - prebound *preboundSocket // non-nil if using a pre-bound socket slot - releaseOnce sync.Once // ensures releaseWorkerSocket body runs exactly once - done chan struct{} // closed when process exits - exitErr error - activeSessions int // Number of sessions currently assigned to this worker - lastUsed time.Time // Last time a session was destroyed on this worker - sharedState SharedWorkerState - reservedAt time.Time //nolint:unused // only set in kubernetes warm-pool reservation path - peakSessions int // High-water mark of concurrent sessions (for retirement metrics) + ID int + podName string + nodeName string //nolint:unused // only set in kubernetes warm-pool; drives cache-locality-aware scheduling + image string //nolint:unused // only set in kubernetes warm-pool; carried through runtime store records + profile WorkerProfile //nolint:unused // only set in kubernetes warm-pool; pod-shape this worker was spawned with (zero = default exclusive) + cmd *exec.Cmd + socketPath string + bearerToken string + client *flightsql.Client + parentListener net.Listener // CP-side listener; lifecycle managed by releaseSocket + prebound *preboundSocket // non-nil if using a pre-bound socket slot + releaseOnce sync.Once // ensures releaseWorkerSocket body runs exactly once + done chan struct{} // closed when process exits + exitErr error + activeSessions int // Number of sessions currently assigned to this worker + lastUsed time.Time // Last time a session was destroyed on this worker + // doNotDisruptApplied tracks whether karpenter.sh/do-not-disrupt is currently + // set on this worker's pod, so the disruption-guard reconciler only issues a + // patch on busy<->idle transitions rather than every tick. + doNotDisruptApplied bool //nolint:unused // only set in kubernetes warm-pool disruption guard + sharedState SharedWorkerState + reservedAt time.Time //nolint:unused // only set in kubernetes warm-pool reservation path + peakSessions int // High-water mark of concurrent sessions (for retirement metrics) // ownerEpoch is guarded by epochMu so cred-refresh's // "RefreshLease then SetOwnerEpoch" sequence appears atomic to // concurrent readers (notably ShutdownAll's lease minting). diff --git a/k8s/rbac.yaml b/k8s/rbac.yaml index 1e482f93..d0754d27 100644 --- a/k8s/rbac.yaml +++ b/k8s/rbac.yaml @@ -17,10 +17,11 @@ metadata: name: duckgres-control-plane namespace: duckgres rules: - # Manage worker pods + # Manage worker pods. "patch" is required to set/clear the + # karpenter.sh/do-not-disrupt annotation on busy workers (drain-aware eviction). - apiGroups: [""] resources: ["pods"] - verbs: ["create", "delete", "get", "list", "watch"] + verbs: ["create", "delete", "get", "list", "patch", "watch"] # Validate the shared neutral worker startup config before spawn - apiGroups: [""] resources: ["configmaps"] diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index 19a97577..60cb7f82 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -327,6 +327,57 @@ assert_worker_pod() { if echo "$mounts" | grep -q '/var/run/secrets/kubernetes.io/serviceaccount'; then fail "worker $pod mounts a kubernetes.io/serviceaccount token" fi + + # Drain window: workers carry a real terminationGracePeriodSeconds (600), not + # the 30s k8s default, so an in-flight query gets time to drain on SIGTERM + # (drain-aware eviction). TestK8sWorkerPodCreation-adjacent. + gp="$(k get pod "$pod" -o jsonpath='{.spec.terminationGracePeriodSeconds}')" + [ "$gp" = "600" ] || fail "worker $pod terminationGracePeriodSeconds '$gp' != 600" +} + +# ---- drain-aware eviction (do-not-disrupt on busy workers) ---------------- +# Regression guard for 2026-06-04: a Karpenter drift roll killed 31 in-flight +# queries because busy worker nodes were valid voluntary-disruption candidates. +# Now the CP stamps karpenter.sh/do-not-disrupt on a worker while it serves a +# session and clears it when idle, so Karpenter skips a node running a query. +# We hold a session open long enough for the CP reconciler (~5s) to stamp the +# annotation, assert some busy worker carries it, then assert it clears once the +# session ends. (Asserting Karpenter actually defers needs a real node drain, +# which is out of scope for the in-Job harness; see README. The annotation +# contract is what gates Karpenter, so that is what we assert.) +DND_JSONPATH='{.metadata.annotations.karpenter\.sh/do-not-disrupt}' +worker_dnd() { k get pod "$1" -o jsonpath="$DND_JSONPATH" 2>/dev/null; } +assert_drain_aware_eviction() { # org password + log "drain-aware eviction: busy worker carries do-not-disrupt, idle worker clears it" + wait_worker "$1" "$2" ducklake + + # Hold a session open ~30s. SELECT 1 proves the worker is acquired; the + # trailing sleep keeps stdin (and thus the connection, and activeSessions>0) + # open without blocking the harness. + { printf 'SELECT 1;\n'; sleep 30; } | PGPASSWORD="$2" psql \ + "sslmode=require host=$1$SNI_SUFFIX hostaddr=$CP_IP port=5432 user=root dbname=ducklake" \ + -v ON_ERROR_STOP=1 -tA >/dev/null 2>&1 & + hold_pid=$! + + busy="" + for _ in $(seq 1 12); do # up to ~24s for the reconciler to stamp it + for p in $(k get pods -l app=duckgres-worker -o jsonpath='{.items[*].metadata.name}' 2>/dev/null); do + [ "$(worker_dnd "$p")" = "true" ] && { busy="$p"; break; } + done + [ -n "$busy" ] && break + sleep 2 + done + [ -n "$busy" ] || { kill "$hold_pid" 2>/dev/null; fail "no busy worker received karpenter.sh/do-not-disrupt"; } + log " busy worker $busy carries do-not-disrupt=true" + + wait "$hold_pid" 2>/dev/null || true # session ends -> worker goes idle + cleared="" + for _ in $(seq 1 15); do # up to ~30s for the reconciler to clear it + [ -z "$(worker_dnd "$busy")" ] && { cleared=1; break; } + sleep 2 + done + [ -n "$cleared" ] || fail "do-not-disrupt not cleared on idle worker $busy" + log " do-not-disrupt cleared after session end on $busy" } # ---- resilience ----------------------------------------------------------- @@ -464,6 +515,7 @@ main() { assert_fork_extensions "$CNPG" "$cnpg_pw" # after a DuckLake R/W (httpfs loaded) rw_iceberg "$CNPG" "$cnpg_pw" assert_worker_pod + assert_drain_aware_eviction "$CNPG" "$cnpg_pw" concurrent_connections "$CNPG" "$cnpg_pw" concurrent_writers "$CNPG" "$cnpg_pw" durability_across_restart "$CNPG" "$cnpg_pw" @@ -488,7 +540,7 @@ main() { # end-to-end would need a warm target >0 in the per-PR CP (see README). log "SKIP shared-warm-activation + version-reaper (CP runs warm-target=0; see README)" - log "PASS: wire + warm-pool-backpressure + activation(DuckLake/Iceberg) + ext-forks + worker-pod + concurrency + durability + crash-recovery + isolation + lifecycle-teardown, on cnpg & ext" + log "PASS: wire + warm-pool-backpressure + activation(DuckLake/Iceberg) + ext-forks + worker-pod + drain-aware-eviction + concurrency + durability + crash-recovery + isolation + lifecycle-teardown, on cnpg & ext" } main "$@" diff --git a/tests/manifests/manifests_test.go b/tests/manifests/manifests_test.go index d0d536b6..7a8a6b6c 100644 --- a/tests/manifests/manifests_test.go +++ b/tests/manifests/manifests_test.go @@ -27,6 +27,21 @@ func TestControlPlaneRBACIncludesLeaseAccess(t *testing.T) { } } +func TestControlPlaneRBACIncludesWorkerPodPatch(t *testing.T) { + // Drain-aware eviction sets/clears karpenter.sh/do-not-disrupt on busy + // worker pods via a merge patch, so the control plane Role must grant + // "patch" on pods (in addition to create/delete/get/list/watch). + content := readManifest(t, "k8s", "rbac.yaml") + for _, want := range []string{ + `resources: ["pods"]`, + `verbs: ["create", "delete", "get", "list", "patch", "watch"]`, + } { + if !strings.Contains(content, want) { + t.Fatalf("expected %q in k8s/rbac.yaml", want) + } + } +} + func TestControlPlaneRBACIncludesSharedWorkerConfigMapRead(t *testing.T) { content := readManifest(t, "k8s", "rbac.yaml") for _, want := range []string{ From 8afdf2799a23cd84b1ff9ada085b47dd176d81a1 Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 4 Jun 2026 15:59:25 -0400 Subject: [PATCH 2/5] fix(worker-pool): only defer eviction for BUSY workers; note NodePool grace dependency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Holistic review against the live Karpenter config (v1.9.0, karpenter.sh/v1) surfaced two issues: 1. 2a regression: deferring "mark Lost" whenever a worker's pod was Terminating kept the worker in the pool until the pod was actually deleted. findIdle/ leastLoaded only skip workers whose done channel is closed (pod gone), so a Terminating-but-not-deleted IDLE worker stayed acquirable — a new session could be routed to a shutting-down pod, for no benefit (no query to protect). Fix: gate the deferral on activeSessions>0. Idle workers are marked Lost promptly as before; only busy workers (a query to drain) defer. 2. The duckgres-workers / -colocated / -cp NodePools all have terminationGracePeriod: None. In Karpenter v1, do-not-disrupt then blocks Drift/expiration INDEFINITELY (no forced removal), so a long/stuck/idle-held session could wedge a security AMI roll. The do-not-disrupt change (1a) MUST ship with a NodePool terminationGracePeriod ceiling. Documented as a blocking companion infra change in the PR; not fixable in this repo (ArgoCD-managed karpenter-config). Co-Authored-By: Claude Opus 4.8 (1M context) --- controlplane/k8s_pool.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 0b51b447..6d615b16 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -2630,19 +2630,30 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat if count >= maxConsecutiveHealthFailures { // Planned node disruption (Karpenter drift/ // consolidation, kubelet drain) is not a worker - // crash: the pod is already Terminating. Marking - // it Lost here cancels the in-flight query after - // ~3 health intervals (~5s), even though the + // crash: the pod is already Terminating. Marking a + // BUSY worker Lost here cancels its in-flight query + // after ~3 health intervals (~5s), even though the // worker is draining and its node typically lives - // ~100s+ longer. Defer to the informer-driven - // w.done path above, which fires when the pod is - // actually gone and runs the same cleanup — so the - // query drains (bounded by the pod's - // terminationGracePeriodSeconds) instead of being - // killed mid-flight. Cache-only read; no API call. - if p.workerPodTerminating(p.workerPodName(w)) { - slog.Warn("K8s worker failing health checks while pod is Terminating (planned node disruption); deferring to graceful drain instead of marking lost.", - "id", lease.workerID, "pod", p.workerPodName(w), "consecutive_failures", count) + // ~100s+ longer. For a busy worker, defer to the + // informer-driven w.done path above (fires when the + // pod is actually gone and runs the same cleanup) so + // the query drains, bounded by the pod's + // terminationGracePeriodSeconds. Cache-only read. + // + // Gate on activeSessions>0: an idle worker has no + // query to protect, and leaving a Terminating-but- + // not-yet-deleted worker in the pool would let + // AcquireWorker route a new session to a shutting- + // down pod (findIdle/leastLoaded only skip workers + // whose done channel is closed, i.e. already gone). + // So idle workers fall through and are marked Lost + // promptly, exactly as before this change. + p.mu.RLock() + active := w.activeSessions + p.mu.RUnlock() + if active > 0 && p.workerPodTerminating(p.workerPodName(w)) { + slog.Warn("Busy K8s worker failing health checks while pod is Terminating (planned node drain); deferring to graceful drain instead of canceling its query.", + "id", lease.workerID, "pod", p.workerPodName(w), "active_sessions", active, "consecutive_failures", count) return } lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginHealthCheckCrash) From 5d99d797236937c2e605eb46d69214011bf6a0b6 Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 4 Jun 2026 16:32:47 -0400 Subject: [PATCH 3/5] fix(worker-pool): make do-not-disrupt reconcile stateless (survive CP failover) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reconciler tracked applied-state in an in-memory ManagedWorker field. That loses the orphan-clear case on CP death/failover: when a CP dies, its sessions die with it (worker goes idle) but the worker pod keeps the do-not-disrupt annotation the dead CP stamped. A surviving/replacement CP adopts the worker with applied=false and activeSessions=0, so applied==busy==false and the reconciler skips it forever — leaving an orphaned annotation that suppresses Karpenter consolidation until the idle reaper happens to delete the pod. Reconcile against the pod's ACTUAL annotation read from the pod informer cache (new podHasDoNotDisrupt, no API call) instead of an in-memory flag. Any CP now self-corrects: it sees desired=idle vs current=annotated and clears the orphan. Removes the ManagedWorker field entirely (worker_mgr.go back to unchanged). Also gates the 2a health-check deferral on activeSessions>0 (prior commit), so a Terminating *idle* worker is still marked Lost promptly rather than lingering in the acquirable pool. New regression test TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover covers the orphan path; steady-state still issues zero patches. Note (no code change): verified against Karpenter v1 docs that spec.template.spec.terminationGracePeriod DOES make Drift bypass do-not-disrupt after the grace ("a node may be disrupted via drift even if there are pods with ... the karpenter.sh/do-not-disrupt annotation"), so the charts#11756 ceiling correctly bounds the hold for AMI/CVE drift — not just expiration. Co-Authored-By: Claude Opus 4.8 (1M context) --- controlplane/worker_disruption_guard.go | 60 +++++--- controlplane/worker_disruption_guard_test.go | 151 ++++++++++++------- controlplane/worker_mgr.go | 42 +++--- 3 files changed, 151 insertions(+), 102 deletions(-) diff --git a/controlplane/worker_disruption_guard.go b/controlplane/worker_disruption_guard.go index d6b4ddbe..1b153ae1 100644 --- a/controlplane/worker_disruption_guard.go +++ b/controlplane/worker_disruption_guard.go @@ -82,17 +82,20 @@ func (p *K8sWorkerPool) disruptionGuardReconciler() { } // reconcileDisruptionGuards patches the do-not-disrupt annotation onto workers -// that became busy and removes it from workers that went idle. It snapshots the -// desired vs. applied state under the lock, then performs K8s API calls without -// holding it. +// that became busy and removes it from workers that went idle. // -// doNotDisruptApplied is in-memory, so a CP restart resets it to false. The -// failure direction is safe: a still-busy worker (busy=true, applied=false) -// gets re-patched set on the next tick; the only residue is a worker that went -// idle exactly across the restart, whose stale annotation merely defers its -// node's consolidation until the idle reaper retires it. It never clears a busy -// worker's annotation. (Initializing applied from the live pod annotation on -// adoption would remove even that residue — a follow-up.) +// Desired state (busy) is compared against the pod's ACTUAL annotation, read +// from the pod informer cache — not an in-memory flag. This keeps the reconcile +// stateless and self-correcting across control-plane restarts and failover: +// when a CP dies, the worker's session(s) die with it (so the worker goes idle) +// but its pod keeps the annotation the dead CP stamped. A surviving/replacement +// CP that adopts the worker has no in-memory memory of having set it; reading +// the live annotation lets it see desired=idle vs current=set and clear the +// orphan. (An in-memory "applied" flag would miss this — it would read +// applied=false==busy=false and never clear the stale annotation.) +// +// It snapshots desired state under the lock, reads current state from the +// cache, and issues K8s API calls without holding the lock. func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) { type guardTarget struct { worker *ManagedWorker @@ -108,11 +111,7 @@ func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) { continue // worker exiting; nothing to guard default: } - busy := w.activeSessions > 0 - if busy == w.doNotDisruptApplied { - continue // already in desired state - } - targets = append(targets, guardTarget{worker: w, podName: p.workerPodName(w), busy: busy}) + targets = append(targets, guardTarget{worker: w, podName: p.workerPodName(w), busy: w.activeSessions > 0}) } p.mu.RUnlock() @@ -120,27 +119,40 @@ func (p *K8sWorkerPool) reconcileDisruptionGuards(ctx context.Context) { if t.podName == "" { continue } + if t.busy == p.podHasDoNotDisrupt(t.podName) { + continue // pod already in the desired state (steady state: no API call) + } if err := p.patchWorkerDoNotDisrupt(ctx, t.podName, t.busy); err != nil { if apierrors.IsNotFound(err) { - // Pod already gone — drop the optimistic applied flag so a - // replacement pod with the same worker is re-evaluated. - p.mu.Lock() - t.worker.doNotDisruptApplied = false - p.mu.Unlock() - continue + continue // pod gone; nothing to guard } slog.Warn("Failed to reconcile worker do-not-disrupt annotation.", "worker", t.worker.ID, "pod", t.podName, "busy", t.busy, "error", err) continue } - p.mu.Lock() - t.worker.doNotDisruptApplied = t.busy - p.mu.Unlock() slog.Debug("Reconciled worker do-not-disrupt annotation.", "worker", t.worker.ID, "pod", t.podName, "do_not_disrupt", t.busy) } } +// podHasDoNotDisrupt reports whether the worker pod currently carries the +// karpenter.sh/do-not-disrupt annotation, read from the pod informer cache +// (no API call). Returns false if the pod is not in the cache yet. +func (p *K8sWorkerPool) podHasDoNotDisrupt(podName string) bool { + if p.informer == nil || podName == "" { + return false + } + obj, exists, err := p.informer.GetIndexer().GetByKey(p.namespace + "/" + podName) + if err != nil || !exists { + return false + } + pod, ok := obj.(*corev1.Pod) + if !ok { + return false + } + return pod.Annotations[karpenterDoNotDisruptAnnotation] == karpenterDoNotDisruptValue +} + // patchWorkerDoNotDisrupt sets (enable) or removes (disable) the // karpenter.sh/do-not-disrupt annotation on a worker pod via a JSON merge patch. // A null value removes the key. diff --git a/controlplane/worker_disruption_guard_test.go b/controlplane/worker_disruption_guard_test.go index e078dc39..eca7bb17 100644 --- a/controlplane/worker_disruption_guard_test.go +++ b/controlplane/worker_disruption_guard_test.go @@ -11,93 +11,125 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" ) -// TestReconcileDisruptionGuardsSetsAndClears verifies the reconciler stamps -// karpenter.sh/do-not-disrupt onto a busy worker's pod and removes it once the -// worker goes idle, and that it only patches on busy<->idle transitions. +// guardPodAnnotated builds a worker pod with/without the do-not-disrupt annotation. +func guardPodAnnotated(name string, annotated bool) *corev1.Pod { + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default"}} + if annotated { + pod.Annotations = map[string]string{karpenterDoNotDisruptAnnotation: karpenterDoNotDisruptValue} + } + return pod +} + +// withGuardInformer attaches a (non-running) pod informer to the pool and seeds +// the cache, so podHasDoNotDisrupt reads the pods we stage. Returns a function +// to (re)sync the cache from a set of pods, simulating the informer observing a +// patch. +func withGuardInformer(t *testing.T, pool *K8sWorkerPool, cs *fake.Clientset, pods ...*corev1.Pod) func(...*corev1.Pod) { + t.Helper() + pool.informer = informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer() + idx := pool.informer.GetIndexer() + sync := func(ps ...*corev1.Pod) { + for _, p := range ps { + if err := idx.Add(p); err != nil { + t.Fatalf("seed informer: %v", err) + } + } + } + sync(pods...) + return sync +} + +func countPatchReactor(cs *fake.Clientset, n *int32) { + cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { + atomic.AddInt32(n, 1) + return false, nil, nil // fall through to default reactor (applies the patch) + }) +} + +// TestReconcileDisruptionGuardsSetsAndClears: a busy worker's pod gets the +// annotation; an idle worker's pod has it removed; steady state issues no patch. func TestReconcileDisruptionGuardsSetsAndClears(t *testing.T) { pool, cs := newTestK8sPool(t, 5) ctx := context.Background() - busyPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "wpod-busy", Namespace: "default", - Labels: map[string]string{"duckgres/worker-id": "1"}, - }} - idlePod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "wpod-idle", Namespace: "default", - Labels: map[string]string{"duckgres/worker-id": "2"}, - Annotations: map[string]string{karpenterDoNotDisruptAnnotation: karpenterDoNotDisruptValue}, - }} + busyPod := guardPodAnnotated("wpod-busy", false) + idlePod := guardPodAnnotated("wpod-idle", true) for _, pod := range []*corev1.Pod{busyPod, idlePod} { if _, err := cs.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}); err != nil { t.Fatalf("create pod %s: %v", pod.Name, err) } } + resync := withGuardInformer(t, pool, cs, busyPod, idlePod) var patches int32 - cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { - atomic.AddInt32(&patches, 1) - return false, nil, nil // fall through to the default reactor (applies the patch) - }) + countPatchReactor(cs, &patches) - // w1 busy and not yet guarded; w2 idle but still carrying the annotation. - pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-busy", activeSessions: 1, doNotDisruptApplied: false, done: make(chan struct{})} - pool.workers[2] = &ManagedWorker{ID: 2, podName: "wpod-idle", activeSessions: 0, doNotDisruptApplied: true, done: make(chan struct{})} + pool.workers[1] = &ManagedWorker{ID: 1, podName: "wpod-busy", activeSessions: 1, done: make(chan struct{})} + pool.workers[2] = &ManagedWorker{ID: 2, podName: "wpod-idle", activeSessions: 0, done: make(chan struct{})} pool.reconcileDisruptionGuards(ctx) - got, err := cs.CoreV1().Pods("default").Get(ctx, "wpod-busy", metav1.GetOptions{}) - if err != nil { - t.Fatal(err) + if got := podAnno(t, cs, "wpod-busy"); got != karpenterDoNotDisruptValue { + t.Fatalf("busy worker: want do-not-disrupt=true, got %q", got) } - if got.Annotations[karpenterDoNotDisruptAnnotation] != karpenterDoNotDisruptValue { - t.Fatalf("busy worker: want do-not-disrupt=true, got annotations=%v", got.Annotations) + if got := podAnno(t, cs, "wpod-idle"); got != "" { + t.Fatalf("idle worker: want do-not-disrupt cleared, got %q", got) } - if !pool.workers[1].doNotDisruptApplied { - t.Fatal("busy worker: expected doNotDisruptApplied=true after reconcile") + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected exactly 2 patches (1 set, 1 clear), got %d", n) } - got, err = cs.CoreV1().Pods("default").Get(ctx, "wpod-idle", metav1.GetOptions{}) - if err != nil { - t.Fatal(err) - } - if _, ok := got.Annotations[karpenterDoNotDisruptAnnotation]; ok { - t.Fatalf("idle worker: want do-not-disrupt cleared, got annotations=%v", got.Annotations) - } - if pool.workers[2].doNotDisruptApplied { - t.Fatal("idle worker: expected doNotDisruptApplied=false after reconcile") + // Steady state: cache now reflects the patched annotations -> no more patches. + resync(guardPodAnnotated("wpod-busy", true), guardPodAnnotated("wpod-idle", false)) + pool.reconcileDisruptionGuards(ctx) + if n := atomic.LoadInt32(&patches); n != 2 { + t.Fatalf("expected no additional patches in steady state, got %d", n) } +} - if n := atomic.LoadInt32(&patches); n != 2 { - t.Fatalf("expected exactly 2 patches (1 set, 1 clear), got %d", n) +// TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover is the +// CP-failover regression: a CP stamps do-not-disrupt on a busy worker, then +// dies; its sessions die with it so the worker is idle, but the pod keeps the +// annotation. A surviving/replacement CP (no in-memory record of having set it) +// must still clear the orphan. The cache-based reconcile does; an in-memory +// "applied" flag would not (it would read applied=false==busy=false and skip). +func TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) + ctx := context.Background() + + stale := guardPodAnnotated("wpod-orphan", true) // annotation left by the dead CP + if _, err := cs.CoreV1().Pods("default").Create(ctx, stale, metav1.CreateOptions{}); err != nil { + t.Fatal(err) } + withGuardInformer(t, pool, cs, stale) + + // Adopted worker, freshly in memory: idle (sessions died with the old CP). + pool.workers[7] = &ManagedWorker{ID: 7, podName: "wpod-orphan", activeSessions: 0, done: make(chan struct{})} - // Steady state: nothing changed, so no further patches. pool.reconcileDisruptionGuards(ctx) - if n := atomic.LoadInt32(&patches); n != 2 { - t.Fatalf("expected no additional patches on steady state, got %d", n) + + if got := podAnno(t, cs, "wpod-orphan"); got != "" { + t.Fatalf("orphaned do-not-disrupt not cleared after failover: got %q", got) } } -// TestReconcileDisruptionGuardsSkipsExitingWorker verifies a worker whose done -// channel is closed (exiting) is not patched — nothing to guard. +// TestReconcileDisruptionGuardsSkipsExitingWorker: a worker whose done channel +// is closed (exiting) is not patched. func TestReconcileDisruptionGuardsSkipsExitingWorker(t *testing.T) { pool, cs := newTestK8sPool(t, 5) ctx := context.Background() - if _, err := cs.CoreV1().Pods("default").Create(ctx, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "wpod-exiting", Namespace: "default", - }}, metav1.CreateOptions{}); err != nil { + if _, err := cs.CoreV1().Pods("default").Create(ctx, guardPodAnnotated("wpod-exiting", false), metav1.CreateOptions{}); err != nil { t.Fatal(err) } + withGuardInformer(t, pool, cs, guardPodAnnotated("wpod-exiting", false)) var patches int32 - cs.PrependReactor("patch", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { - atomic.AddInt32(&patches, 1) - return false, nil, nil - }) + countPatchReactor(cs, &patches) done := make(chan struct{}) close(done) @@ -109,8 +141,8 @@ func TestReconcileDisruptionGuardsSkipsExitingWorker(t *testing.T) { } } -// TestWorkerPodTerminating verifies the cache-only Terminating check that the -// health-check loop uses to distinguish a planned node drain from a crash. +// TestWorkerPodTerminating verifies the cache-only Terminating check the health +// loop uses to distinguish a planned node drain from a crash. func TestWorkerPodTerminating(t *testing.T) { pool, cs := newTestK8sPool(t, 5) pool.informer = informers.NewSharedInformerFactory(cs, 0).Core().V1().Pods().Informer() @@ -126,15 +158,24 @@ func TestWorkerPodTerminating(t *testing.T) { } if !pool.workerPodTerminating("wpod-term") { - t.Fatal("expected Terminating pod (deletionTimestamp set) → true") + t.Fatal("expected Terminating pod (deletionTimestamp set) -> true") } if pool.workerPodTerminating("wpod-alive") { - t.Fatal("expected live pod → false") + t.Fatal("expected live pod -> false") } if pool.workerPodTerminating("wpod-missing") { - t.Fatal("expected pod absent from cache → false") + t.Fatal("expected pod absent from cache -> false") } if pool.workerPodTerminating("") { - t.Fatal("expected empty pod name → false") + t.Fatal("expected empty pod name -> false") + } +} + +func podAnno(t *testing.T, cs *fake.Clientset, name string) string { + t.Helper() + pod, err := cs.CoreV1().Pods("default").Get(context.Background(), name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("get pod %s: %v", name, err) } + return pod.Annotations[karpenterDoNotDisruptAnnotation] } diff --git a/controlplane/worker_mgr.go b/controlplane/worker_mgr.go index 6764b56e..86ed8e32 100644 --- a/controlplane/worker_mgr.go +++ b/controlplane/worker_mgr.go @@ -26,29 +26,25 @@ import ( // ManagedWorker represents a duckdb-service worker process. type ManagedWorker struct { - ID int - podName string - nodeName string //nolint:unused // only set in kubernetes warm-pool; drives cache-locality-aware scheduling - image string //nolint:unused // only set in kubernetes warm-pool; carried through runtime store records - profile WorkerProfile //nolint:unused // only set in kubernetes warm-pool; pod-shape this worker was spawned with (zero = default exclusive) - cmd *exec.Cmd - socketPath string - bearerToken string - client *flightsql.Client - parentListener net.Listener // CP-side listener; lifecycle managed by releaseSocket - prebound *preboundSocket // non-nil if using a pre-bound socket slot - releaseOnce sync.Once // ensures releaseWorkerSocket body runs exactly once - done chan struct{} // closed when process exits - exitErr error - activeSessions int // Number of sessions currently assigned to this worker - lastUsed time.Time // Last time a session was destroyed on this worker - // doNotDisruptApplied tracks whether karpenter.sh/do-not-disrupt is currently - // set on this worker's pod, so the disruption-guard reconciler only issues a - // patch on busy<->idle transitions rather than every tick. - doNotDisruptApplied bool //nolint:unused // only set in kubernetes warm-pool disruption guard - sharedState SharedWorkerState - reservedAt time.Time //nolint:unused // only set in kubernetes warm-pool reservation path - peakSessions int // High-water mark of concurrent sessions (for retirement metrics) + ID int + podName string + nodeName string //nolint:unused // only set in kubernetes warm-pool; drives cache-locality-aware scheduling + image string //nolint:unused // only set in kubernetes warm-pool; carried through runtime store records + profile WorkerProfile //nolint:unused // only set in kubernetes warm-pool; pod-shape this worker was spawned with (zero = default exclusive) + cmd *exec.Cmd + socketPath string + bearerToken string + client *flightsql.Client + parentListener net.Listener // CP-side listener; lifecycle managed by releaseSocket + prebound *preboundSocket // non-nil if using a pre-bound socket slot + releaseOnce sync.Once // ensures releaseWorkerSocket body runs exactly once + done chan struct{} // closed when process exits + exitErr error + activeSessions int // Number of sessions currently assigned to this worker + lastUsed time.Time // Last time a session was destroyed on this worker + sharedState SharedWorkerState + reservedAt time.Time //nolint:unused // only set in kubernetes warm-pool reservation path + peakSessions int // High-water mark of concurrent sessions (for retirement metrics) // ownerEpoch is guarded by epochMu so cred-refresh's // "RefreshLease then SetOwnerEpoch" sequence appears atomic to // concurrent readers (notably ShutdownAll's lease minting). From 1577ffa1e47eaa088b24a63dfc0f80a8ecee1dc0 Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 4 Jun 2026 16:42:05 -0400 Subject: [PATCH 4/5] test(worker-pool): -race concurrency test for disruption-guard reconcile Loops reconcileDisruptionGuards + workerPodTerminating against 16 goroutines flipping activeSessions under the pool lock (as Acquire/ReleaseWorker do). Asserts the snapshot-under-RLock / patch-without-lock discipline is race-free. Co-Authored-By: Claude Opus 4.8 (1M context) --- controlplane/worker_disruption_guard_test.go | 63 ++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/controlplane/worker_disruption_guard_test.go b/controlplane/worker_disruption_guard_test.go index eca7bb17..5997ede1 100644 --- a/controlplane/worker_disruption_guard_test.go +++ b/controlplane/worker_disruption_guard_test.go @@ -4,6 +4,8 @@ package controlplane import ( "context" + "fmt" + "sync" "sync/atomic" "testing" @@ -171,6 +173,67 @@ func TestWorkerPodTerminating(t *testing.T) { } } +// TestReconcileDisruptionGuardsNoDataRace runs the reconciler in a tight loop +// while many goroutines flip activeSessions under the pool lock (as Acquire/ +// ReleaseWorker do). Run with -race, this asserts the reconciler's snapshot- +// under-RLock / patch-without-lock discipline has no data race against concurrent +// session churn. +func TestReconcileDisruptionGuardsNoDataRace(t *testing.T) { + pool, cs := newTestK8sPool(t, 100) + ctx := context.Background() + + const n = 16 + var pods []*corev1.Pod + for i := 1; i <= n; i++ { + name := fmt.Sprintf("wpod-%d", i) + pod := guardPodAnnotated(name, i%2 == 0) + if _, err := cs.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + pods = append(pods, pod) + pool.workers[i] = &ManagedWorker{ID: i, podName: name, done: make(chan struct{})} + } + withGuardInformer(t, pool, cs, pods...) + + stop := make(chan struct{}) + + // Mutators: flip activeSessions under the pool lock, mirroring Acquire/Release. + var muWG sync.WaitGroup + for i := 1; i <= n; i++ { + muWG.Add(1) + go func(id int) { + defer muWG.Done() + for j := 0; j < 2000; j++ { + pool.mu.Lock() + if w := pool.workers[id]; w != nil { + w.activeSessions ^= 1 + } + pool.mu.Unlock() + } + }(i) + } + + // Reconciler + health-path read, looping concurrently with the mutators. + var recWG sync.WaitGroup + recWG.Add(1) + go func() { + defer recWG.Done() + for { + select { + case <-stop: + return + default: + pool.reconcileDisruptionGuards(ctx) + pool.workerPodTerminating("wpod-1") + } + } + }() + + muWG.Wait() + close(stop) + recWG.Wait() +} + func podAnno(t *testing.T, cs *fake.Clientset, name string) string { t.Helper() pod, err := cs.CoreV1().Pods("default").Get(context.Background(), name, metav1.GetOptions{}) From 295e6ef57f756520444efc8b880f8ddac830f344 Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 4 Jun 2026 19:18:05 -0400 Subject: [PATCH 5/5] fix(worker-pool): address dual-review findings (adopted-worker visibility + scheduling) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the dual-model review of #682. BLOCKING — relabel adopted pods so the informer sees them The pod informer is label-scoped to duckgres/control-plane= (cpID = os.Hostname(), per CP pod). adoptClaimedWorker did not relabel, so after a CP restart/rollout an adopted worker kept the dead CP's label and was invisible to the new CP's informer — defeating podHasDoNotDisrupt (stale annotation never cleared; busy adopted worker patched every 5s forever), workerPodTerminating (2a never protected adopted workers), and informer-driven cleanup. Now adoptClaimedWorker merge-patches duckgres/control-plane to this CP. should-fix — don't hand a new session to a Terminating worker isGenericSessionSchedulableWorkerLocked and OrgReservedPool. workerReadyForSchedulingLocked now exclude workers whose pod is Terminating, so the 2a deferral can't leave a shutting-down pod schedulable once its session releases. Codex P1 follow-up: the runtime-store claim path bypasses those predicates, so adoptClaimedWorker also rejects a claim whose pod is already Terminating (routes into the existing claim-retire/fallback). should-fix — shrink the reconcile race window 5s -> 2s (eager-stamp on the 0->1 session transition deferred: it touches the hot acquire path on both pools, and with the terminationGracePeriod ceiling a disruption inside the window is non-fatal anyway.) test — harness assert_drain_aware_eviction attributes the annotation to its OWN held session via a baseline diff (not "first annotated pod"), and drops $(seq) for a POSIX while-loop (#!/bin/sh + set -eu). New unit test TestRelabelAdoptedPodToThisCP; reconcile/terminating/-race tests updated. Charts rollout-ordering + 1h-semantics documented in PostHog/charts#11756. Co-Authored-By: Claude Opus 4.8 (1M context) --- controlplane/k8s_pool.go | 27 +++++++++++- controlplane/org_reserved_pool.go | 5 +++ controlplane/worker_disruption_guard.go | 38 +++++++++++++++-- controlplane/worker_disruption_guard_test.go | 38 ++++++++++++++++- tests/e2e-mw-dev/harness.sh | 43 +++++++++++++------- 5 files changed, 129 insertions(+), 22 deletions(-) diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 6d615b16..78d89069 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -1885,6 +1885,27 @@ func (p *K8sWorkerPool) adoptClaimedWorker(ctx context.Context, claimed *configs return nil, fmt.Errorf("get claimed worker pod %s: %w", claimed.PodName, err) } + // Don't adopt a worker whose pod is already Terminating (its node is being + // drained): activating it would hand a new session to a shutting-down pod. + // The local scheduler predicates (isGenericSessionSchedulableWorkerLocked / + // workerReadyForSchedulingLocked) only cover workers already in p.workers, + // not runtime-store claims, so gate the claim here. The error routes into the + // caller's claim-retire/fallback path (reserveClaimedWorker callers). + if pod.DeletionTimestamp != nil { + return nil, fmt.Errorf("claimed worker pod %s is terminating; not adopting", claimed.PodName) + } + + // Relabel the adopted pod to this control plane so our label-scoped pod + // informer sees it; otherwise the do-not-disrupt reconcile, the planned-drain + // health guard, and informer-driven pod-terminated cleanup are all blind to + // adopted workers. Best-effort: the worker still functions if this fails. + if pod.Labels["duckgres/control-plane"] != p.cpID { + if relErr := p.relabelAdoptedPodToThisCP(ctx, claimed.PodName); relErr != nil { + slog.Warn("Failed to relabel adopted worker pod to this control plane; informer-based guards may not apply until relabel succeeds.", + "worker", claimed.WorkerID, "worker_pod", claimed.PodName, "error", relErr) + } + } + // For hot-idle workers, skip the epoch-validated health check. The worker's // epoch and CP instance ID are from the previous owner, and ClaimHotIdleWorker // already bumped the epoch in the DB. The health check requires exact epoch @@ -3456,7 +3477,11 @@ func (p *K8sWorkerPool) dropLocalWorkerIfSameLeaseLocked(lease workerLeaseSnapsh } func (p *K8sWorkerPool) isGenericSessionSchedulableWorkerLocked(w *ManagedWorker) bool { - return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle + // Exclude a worker whose pod is already Terminating (planned node drain): the + // 2a health guard keeps such a busy worker in the pool to let its query + // drain, but it must not receive a NEW session on a shutting-down pod. + return w.SharedState().NormalizedLifecycle() == WorkerLifecycleIdle && + !p.workerPodTerminating(p.workerPodName(w)) } func (p *K8sWorkerPool) isWarmIdleWorkerLocked(w *ManagedWorker) bool { diff --git a/controlplane/org_reserved_pool.go b/controlplane/org_reserved_pool.go index 0b5a0449..f4142e82 100644 --- a/controlplane/org_reserved_pool.go +++ b/controlplane/org_reserved_pool.go @@ -343,6 +343,11 @@ func (p *OrgReservedPool) workerReadyForSchedulingLocked(w *ManagedWorker) bool if !p.workerBelongsToOrgLocked(w) { return false } + // Don't hand a new session to a worker whose pod is already Terminating + // (planned node drain) — see isGenericSessionSchedulableWorkerLocked. + if p.shared.workerPodTerminating(p.shared.workerPodName(w)) { + return false + } lifecycle := w.SharedState().NormalizedLifecycle() return lifecycle == WorkerLifecycleHot } diff --git a/controlplane/worker_disruption_guard.go b/controlplane/worker_disruption_guard.go index 1b153ae1..4e1f29c8 100644 --- a/controlplane/worker_disruption_guard.go +++ b/controlplane/worker_disruption_guard.go @@ -49,10 +49,14 @@ const ( // disruptionGuardReconcileInterval is how often the busy/idle → annotation // reconcile runs. A worker busy for longer than this is protected before the // next Karpenter disruption decision; the only exposure is a query that both - // starts and is selected for voluntary disruption within one interval, which - // would be a sub-interval query anyway. Patches only fire on transitions, so - // steady state costs zero API calls. - disruptionGuardReconcileInterval = 5 * time.Second + // starts and is selected for voluntary disruption within one interval. Kept + // at the health-check cadence (2s) to bound that race window; with the + // NodePool terminationGracePeriod ceiling in place a disruption inside the + // window is non-fatal anyway (the query still gets its drain). Eager-stamping + // on the 0→1 session transition would close the window entirely but touches + // the hot acquire path on both pools — deferred. Patches fire only on + // transitions, so steady state costs zero API calls regardless of cadence. + disruptionGuardReconcileInterval = 2 * time.Second // workerTerminationGracePeriodSeconds is the worker pod's grace period. // Unset historically (→ k8s default 30s), far too short for analytical @@ -180,6 +184,32 @@ func (p *K8sWorkerPool) patchWorkerDoNotDisrupt(ctx context.Context, podName str return err } +// relabelAdoptedPodToThisCP patches a worker pod's duckgres/control-plane label +// to this control plane's ID. Called when one CP adopts a worker spawned by +// another (failover/rollout). The pod informer is label-scoped to +// duckgres/control-plane= (see startInformer), so without this an adopted +// worker is invisible to THIS CP's informer cache — making podHasDoNotDisrupt, +// workerPodTerminating, and the informer-driven pod-terminated cleanup all blind +// to it (the do-not-disrupt reconcile and the planned-drain health guard would +// silently not apply, and a stale do-not-disrupt annotation would never clear). +func (p *K8sWorkerPool) relabelAdoptedPodToThisCP(ctx context.Context, podName string) error { + patch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + "duckgres/control-plane": p.cpID, + }, + }, + } + data, err := json.Marshal(patch) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + _, err = p.clientset.CoreV1().Pods(p.namespace).Patch(ctx, podName, types.MergePatchType, data, metav1.PatchOptions{}) + return err +} + // workerPodTerminating reports whether the worker's pod is already being torn // down (has a deletionTimestamp), read from the pod informer cache — no API // call, no extra RBAC. The control plane evicts workers via Karpenter node diff --git a/controlplane/worker_disruption_guard_test.go b/controlplane/worker_disruption_guard_test.go index 5997ede1..1d21a499 100644 --- a/controlplane/worker_disruption_guard_test.go +++ b/controlplane/worker_disruption_guard_test.go @@ -93,12 +93,46 @@ func TestReconcileDisruptionGuardsSetsAndClears(t *testing.T) { } } +// TestRelabelAdoptedPodToThisCP verifies the adoption relabel makes a pod +// spawned by another CP carry THIS CP's control-plane label (so this CP's +// label-scoped informer can see it) while preserving the pod's other labels. +func TestRelabelAdoptedPodToThisCP(t *testing.T) { + pool, cs := newTestK8sPool(t, 5) // pool.cpID == "test-cp" + ctx := context.Background() + + if _, err := cs.CoreV1().Pods("default").Create(ctx, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: "wpod-adopted", + Namespace: "default", + Labels: map[string]string{"duckgres/control-plane": "old-cp", "duckgres/worker-id": "9"}, + }}, metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + if err := pool.relabelAdoptedPodToThisCP(ctx, "wpod-adopted"); err != nil { + t.Fatal(err) + } + + got, err := cs.CoreV1().Pods("default").Get(ctx, "wpod-adopted", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + if got.Labels["duckgres/control-plane"] != "test-cp" { + t.Fatalf("want control-plane=test-cp after relabel, got %q", got.Labels["duckgres/control-plane"]) + } + if got.Labels["duckgres/worker-id"] != "9" { + t.Fatalf("merge patch clobbered other labels: %v", got.Labels) + } +} + // TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover is the // CP-failover regression: a CP stamps do-not-disrupt on a busy worker, then // dies; its sessions die with it so the worker is idle, but the pod keeps the // annotation. A surviving/replacement CP (no in-memory record of having set it) -// must still clear the orphan. The cache-based reconcile does; an in-memory -// "applied" flag would not (it would read applied=false==busy=false and skip). +// must still clear the orphan. In production the pod becomes visible to the new +// CP's label-scoped informer via relabelAdoptedPodToThisCP on adoption (tested +// above); this test seeds the cache to that post-relabel state and asserts the +// cache-based reconcile clears the orphan. An in-memory "applied" flag would not +// (it would read applied=false==busy=false and skip). func TestReconcileDisruptionGuardsClearsStaleAnnotationAfterFailover(t *testing.T) { pool, cs := newTestK8sPool(t, 5) ctx := context.Background() diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index 60cb7f82..cab9588c 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -347,36 +347,49 @@ assert_worker_pod() { # contract is what gates Karpenter, so that is what we assert.) DND_JSONPATH='{.metadata.annotations.karpenter\.sh/do-not-disrupt}' worker_dnd() { k get pod "$1" -o jsonpath="$DND_JSONPATH" 2>/dev/null; } +# Space-padded list of worker pods currently carrying do-not-disrupt. +annotated_workers() { + out=" " + for p in $(k get pods -l app=duckgres-worker -o jsonpath='{.items[*].metadata.name}' 2>/dev/null); do + [ "$(worker_dnd "$p")" = "true" ] && out="$out$p " + done + printf '%s' "$out" +} assert_drain_aware_eviction() { # org password log "drain-aware eviction: busy worker carries do-not-disrupt, idle worker clears it" wait_worker "$1" "$2" ducklake - # Hold a session open ~30s. SELECT 1 proves the worker is acquired; the - # trailing sleep keeps stdin (and thus the connection, and activeSessions>0) - # open without blocking the harness. - { printf 'SELECT 1;\n'; sleep 30; } | PGPASSWORD="$2" psql \ + # Attribute the annotation to OUR session: snapshot which workers are already + # annotated, then look for a NEWLY annotated one while we hold a session (a + # pre-existing busy worker must not make this pass for the wrong reason). + before="$(annotated_workers)" + + # Hold a session open ~40s. SELECT 1 proves the worker is acquired; the + # trailing sleep keeps stdin (and the connection, so activeSessions>0) open + # without blocking the harness. + { printf 'SELECT 1;\n'; sleep 40; } | PGPASSWORD="$2" psql \ "sslmode=require host=$1$SNI_SUFFIX hostaddr=$CP_IP port=5432 user=root dbname=ducklake" \ -v ON_ERROR_STOP=1 -tA >/dev/null 2>&1 & hold_pid=$! - busy="" - for _ in $(seq 1 12); do # up to ~24s for the reconciler to stamp it - for p in $(k get pods -l app=duckgres-worker -o jsonpath='{.items[*].metadata.name}' 2>/dev/null); do - [ "$(worker_dnd "$p")" = "true" ] && { busy="$p"; break; } + busy=""; i=0 + while [ "$i" -lt 15 ]; do # up to ~30s for the reconciler to stamp it + for p in $(annotated_workers); do + case "$before" in *" $p "*) : ;; *) busy="$p"; break ;; esac done [ -n "$busy" ] && break - sleep 2 + sleep 2; i=$((i + 1)) done - [ -n "$busy" ] || { kill "$hold_pid" 2>/dev/null; fail "no busy worker received karpenter.sh/do-not-disrupt"; } - log " busy worker $busy carries do-not-disrupt=true" + [ -n "$busy" ] || { kill "$hold_pid" 2>/dev/null; fail "no worker acquired karpenter.sh/do-not-disrupt while a session was held"; } + log " session worker $busy carries do-not-disrupt=true" wait "$hold_pid" 2>/dev/null || true # session ends -> worker goes idle - cleared="" - for _ in $(seq 1 15); do # up to ~30s for the reconciler to clear it + cleared=""; i=0 + while [ "$i" -lt 20 ]; do # up to ~40s for the reconciler to clear it [ -z "$(worker_dnd "$busy")" ] && { cleared=1; break; } - sleep 2 + sleep 2; i=$((i + 1)) done - [ -n "$cleared" ] || fail "do-not-disrupt not cleared on idle worker $busy" + [ -n "$cleared" ] || fail "do-not-disrupt not cleared on idle worker $busy after session end" log " do-not-disrupt cleared after session end on $busy" }