diff --git a/README.md b/README.md index bf8e250c..9409ee26 100644 --- a/README.md +++ b/README.md @@ -226,6 +226,7 @@ Run with config file: | `DUCKGRES_K8S_SHARED_WARM_TARGET` | Default-image neutral shared warm-worker target for K8s multi-tenant mode (`0` disables prewarm; subject to `DUCKGRES_K8S_MAX_WORKERS`) | `0` | | `DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED` | Enable configstore-driven dynamic warm-capacity target computation from recent no-idle misses | `true` | | `DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW` | Recent no-idle miss window used for dynamic warm-capacity demand | `2m` | +| `DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT` | Optional server-side wait budget for retryable no-idle warm-pool misses; `0`/unset uses `DUCKGRES_WORKER_QUEUE_TIMEOUT` in remote K8s mode, and the effective worker queue timeout is extended when needed | `0` | | `DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER` | Recent misses required for one extra dynamic warm worker | `8` | | `DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL` | Retention TTL for warm-capacity miss buckets; clamped to at least the miss window | `15m` | | `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING` | Max dynamic extra warm workers per image (`0` means unlimited) | `0` | diff --git a/configresolve/cliflags.go b/configresolve/cliflags.go index feefece9..48cb86af 100644 --- a/configresolve/cliflags.go +++ b/configresolve/cliflags.go @@ -76,7 +76,7 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { k8sSharedWarmTarget := fs.Int("k8s-shared-warm-target", 0, "Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled (env: DUCKGRES_K8S_SHARED_WARM_TARGET)") k8sDynamicWarmCapacityEnabled := fs.Bool("k8s-dynamic-warm-capacity-enabled", true, "Enable configstore-driven dynamic warm-capacity target computation (default true; use --k8s-dynamic-warm-capacity-enabled=false to disable; env: DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED)") k8sWarmCapacityMissWindow := fs.String("k8s-warm-capacity-miss-window", "", 
"Recent no-idle miss window for dynamic warm-capacity demand (default: 2m) (env: DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW)") - k8sWarmAcquireTimeout := fs.String("k8s-warm-acquire-timeout", "", "How long a session-acquire blocks server-side for a warm colocated worker before returning backpressure (e.g. 5m; default 0 = fail fast) (env: DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT)") + k8sWarmAcquireTimeout := fs.String("k8s-warm-acquire-timeout", "", "How long a session-acquire waits server-side for a warm worker before timing out (e.g. 5m; default uses worker-queue-timeout in remote mode) (env: DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT)") k8sWarmCapacityMissesPerWorker := fs.Int("k8s-warm-capacity-misses-per-worker", 0, "Recent misses required for one extra dynamic warm worker (default: 8) (env: DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER)") k8sWarmCapacityDemandTTL := fs.String("k8s-warm-capacity-demand-ttl", "", "Retention TTL for warm-capacity miss buckets (default: 15m) (env: DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL)") k8sWarmCapacityDynamicImageCeiling := fs.Int("k8s-warm-capacity-dynamic-image-ceiling", 0, "Max dynamic extra warm workers per image, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING)") diff --git a/configresolve/resolve_k8s_test.go b/configresolve/resolve_k8s_test.go index 8c253763..c4a7f3a5 100644 --- a/configresolve/resolve_k8s_test.go +++ b/configresolve/resolve_k8s_test.go @@ -115,7 +115,8 @@ func TestResolveEffectiveK8sDynamicWarmCapacityPrecedence(t *testing.T) { } func TestResolveEffectiveK8sWarmAcquireTimeout(t *testing.T) { - // Default: unset everywhere => 0 (fail-fast). + // Default: unset everywhere => 0. Remote control-plane startup expands this + // to worker_queue_timeout so no-idle warm misses wait at the session boundary. 
def := ResolveEffective(&configloader.FileConfig{}, CLIInputs{}, func(string) string { return "" }, nil) if def.K8sWarmAcquireTimeout != 0 { t.Fatalf("expected default warm-acquire-timeout 0, got %s", def.K8sWarmAcquireTimeout) diff --git a/controlplane/control.go b/controlplane/control.go index 8e8496fd..39165f4f 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -40,7 +40,7 @@ type ControlPlaneConfig struct { SocketDir string ConfigPath string // Path to config file, passed to workers HealthCheckInterval time.Duration - WorkerQueueTimeout time.Duration // How long to wait for an available worker/org connection slot (default: 60s) + WorkerQueueTimeout time.Duration // How long to wait for an available worker/org connection slot (default: 60s; extended to cover warm acquire waits) WorkerIdleTimeout time.Duration // How long to keep an idle worker alive (default: 5m) RetireOnSessionEnd bool // When true, process workers are retired immediately after their last session ends. HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode. 
@@ -115,7 +115,7 @@ type K8sConfig struct { SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled) DynamicWarmCapacityEnabled bool // Enable configstore-driven dynamic warm-capacity target computation WarmCapacityMissWindow time.Duration // Window of recent no-idle misses that contributes to dynamic targets - WarmAcquireTimeout time.Duration // Server-side block window for a session-acquire that misses the warm pool (0 = fail fast) + WarmAcquireTimeout time.Duration // Server-side block window for a session-acquire that misses the warm pool (0 = use worker queue timeout) WarmCapacityMissesPerWorker int // Number of recent misses that translate to one extra warm worker WarmCapacityDemandTTL time.Duration // Retention TTL for warm-capacity miss buckets WarmCapacityDynamicImageCeiling int // Max dynamic extra warm workers per image (0 = unlimited) @@ -256,6 +256,12 @@ func RunControlPlane(cfg ControlPlaneConfig) { if cfg.WorkerQueueTimeout == 0 { cfg.WorkerQueueTimeout = 60 * time.Second } + if cfg.WorkerBackend == "remote" { + if cfg.K8s.WarmAcquireTimeout == 0 { + cfg.K8s.WarmAcquireTimeout = cfg.WorkerQueueTimeout + } + cfg.WorkerQueueTimeout = effectiveSessionAcquireTimeout(cfg.WorkerQueueTimeout, cfg.K8s.WarmAcquireTimeout) + } if cfg.WorkerIdleTimeout == 0 { cfg.WorkerIdleTimeout = 5 * time.Minute } @@ -727,6 +733,17 @@ func createSessionWithRegisteredCancel( return createFn(ctx) } +func effectiveSessionAcquireTimeout(workerQueueTimeout, warmAcquireTimeout time.Duration) time.Duration { + if warmAcquireTimeout <= 0 { + return workerQueueTimeout + } + warmWaitBudget := warmAcquireTimeout + WarmAcquireRetryInterval + if workerQueueTimeout <= 0 || warmWaitBudget > workerQueueTimeout { + return warmWaitBudget + } + return workerQueueTimeout +} + func sessionCreationErrorResponse(err error) (code string, message string) { var capacityErr *WarmCapacityExhaustedError switch { diff --git a/controlplane/control_cancel_test.go 
b/controlplane/control_cancel_test.go index 78bcb44f..95fbd1d4 100644 --- a/controlplane/control_cancel_test.go +++ b/controlplane/control_cancel_test.go @@ -63,6 +63,46 @@ func TestCreateSessionWithRegisteredCancel_CancelQueryCancelsWait(t *testing.T) } } +func TestEffectiveSessionAcquireTimeoutExtendsForWarmAcquire(t *testing.T) { + tests := []struct { + name string + workerQueueTimeout time.Duration + warmAcquireTimeout time.Duration + want time.Duration + }{ + { + name: "worker queue only", + workerQueueTimeout: 60 * time.Second, + want: 60 * time.Second, + }, + { + name: "warm acquire below queue", + workerQueueTimeout: 60 * time.Second, + warmAcquireTimeout: 30 * time.Second, + want: 60 * time.Second, + }, + { + name: "warm acquire exceeds queue", + workerQueueTimeout: 60 * time.Second, + warmAcquireTimeout: 5 * time.Minute, + want: 5*time.Minute + WarmAcquireRetryInterval, + }, + { + name: "warm acquire only", + warmAcquireTimeout: 90 * time.Second, + want: 90*time.Second + WarmAcquireRetryInterval, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := effectiveSessionAcquireTimeout(tt.workerQueueTimeout, tt.warmAcquireTimeout); got != tt.want { + t.Fatalf("effectiveSessionAcquireTimeout() = %s, want %s", got, tt.want) + } + }) + } +} + func TestSessionCreationErrorResponse(t *testing.T) { t.Run("cancelled", func(t *testing.T) { code, message := sessionCreationErrorResponse(context.Canceled) @@ -99,7 +139,7 @@ func TestSessionCreationErrorResponse(t *testing.T) { if code != "53300" { t.Fatalf("code = %q, want 53300", code) } - want := "no warm Duckgres worker is currently available; retry in about 45 seconds" + want := "timed out waiting for an available worker" if message != want { t.Fatalf("message = %q, want %q", message, want) } diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index 7ecdeed1..3a144b37 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -61,8 +61,8 @@ type 
K8sWorkerPool struct { nextWorkerID int spawning int maxWorkers int - // warmAcquireTimeout: server-side block window for a session-acquire that - // missed the warm pool (0 = fail fast). Read by OrgReservedPool.AcquireWorker. + // warmAcquireTimeout: optional server-side block window for a session-acquire + // that missed the warm pool. Read by OrgReservedPool.AcquireWorker. warmAcquireTimeout time.Duration minWorkers int // perImageWarmTarget is an additive floor on top of minWorkers: for each @@ -1653,9 +1653,9 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor // Runtime-store-less K8s mode uses the local worker map as its source // of truth. Filter by assignment.Image so a per-org pin is honored: if // the assignment names a specific image and no in-memory warm worker - // matches, fail fast with warm-capacity backpressure. Warm capacity is - // supplied by configured warm reconciliation rather than by the - // foreground user connection. + // matches, return the internal warm-capacity signal. Session acquisition + // keeps retryable no-idle misses waiting instead of exposing this to the + // client. // Without this filter, a pinned org could be handed a default-image // warm worker, and activation would fail with a DuckLake/extension // version mismatch. diff --git a/controlplane/org_reserved_pool.go b/controlplane/org_reserved_pool.go index 0b5a0449..89b669e3 100644 --- a/controlplane/org_reserved_pool.go +++ b/controlplane/org_reserved_pool.go @@ -4,26 +4,11 @@ package controlplane import ( "context" - "errors" "fmt" "log/slog" "time" - - "github.com/posthog/duckgres/controlplane/configstore" ) -// isRetryableWarmMiss reports whether a worker-acquire error is a transient -// "no idle warm worker" miss — the only capacity miss that resolves on its own -// once the warm pool replenishes. Org/global-cap and shutdown misses are not -// retried (waiting won't change them). 
-func isRetryableWarmMiss(err error) bool { - var capErr *WarmCapacityExhaustedError - if !errors.As(err, &capErr) { - return false - } - return capErr.missReason() == configstore.WorkerClaimMissReasonNoIdle -} - // OrgReservedPool presents one org's reserved slice of a shared K8s warm pool. // It preserves the existing WorkerPool contract for SessionManager while ensuring // workers are reserved to a single org for their lifetime and retired after use. @@ -77,11 +62,13 @@ func (p *OrgReservedPool) assignedColocatedResourcesLocked() (cpu int, memBytes } func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProfile) (*ManagedWorker, error) { - // Server-side patience: block up to warmAcquireTimeout for the warm pool to - // replenish before surfacing a "no warm worker" miss to the client. Always - // bounded by the request ctx, so a client with a short deadline still fails - // fast. 0 = legacy fail-fast. - warmDeadline := time.Now().Add(p.shared.warmAcquireTimeout) + // Server-side patience: transient no-idle warm-pool misses stay internal and + // are retried until the caller context or optional warm-acquire deadline + // expires. Org/global-cap and shutdown misses still return immediately. + var warmDeadline time.Time + if p.shared.warmAcquireTimeout > 0 { + warmDeadline = time.Now().Add(p.shared.warmAcquireTimeout) + } // Throttle warm-miss recording across the wait's repeated polls. var lastWarmMissAt time.Time for { @@ -151,16 +138,12 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProf if recordMiss { lastWarmMissAt = time.Now() } - // Server-side patience: a transient no-idle miss on a colocated - // request resolves once the warm pool replenishes (a colocated - // shape may first need a cold node, minutes). Wait and retry until - // warmAcquireTimeout elapses rather than failing immediately; - // bounded by ctx. 
Restricted to colocated requests — a default / - // exclusive miss replenishes quickly and the client retries, so we - // don't block interactive/default traffic (e.g. data imports) for - // minutes. - if p.shared.warmAcquireTimeout > 0 && isColocated && isRetryableWarmMiss(err) && time.Now().Before(warmDeadline) { - timer := time.NewTimer(WarmAcquireRetryInterval) + if isRetryableWarmMiss(err) { + wait, ok := warmAcquireRetryDelay(ctx, warmDeadline) + if !ok { + return nil, context.DeadlineExceeded + } + timer := time.NewTimer(wait) select { case <-ctx.Done(): timer.Stop() @@ -201,6 +184,29 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProf } } +func warmAcquireRetryDelay(ctx context.Context, warmDeadline time.Time) (time.Duration, bool) { + wait := WarmAcquireRetryInterval + if !warmDeadline.IsZero() { + remaining := time.Until(warmDeadline) + if remaining <= 0 { + return 0, false + } + if remaining < wait { + wait = remaining + } + } + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining <= 0 { + return 0, false + } + if remaining < wait { + wait = remaining + } + } + return wait, true +} + func (p *OrgReservedPool) ReleaseWorker(id int) { if p.shared.TransitionToHotIdleIfNoSessions(id) { return diff --git a/controlplane/org_reserved_pool_test.go b/controlplane/org_reserved_pool_test.go index 28eef015..9fd066bf 100644 --- a/controlplane/org_reserved_pool_test.go +++ b/controlplane/org_reserved_pool_test.go @@ -322,6 +322,62 @@ func TestIsRetryableWarmMiss(t *testing.T) { } } +func TestOrgReservedPoolWaitsForNoIdleWarmMissUntilWarmDeadline(t *testing.T) { + shared, _ := newTestK8sPool(t, 5) + shared.warmAcquireTimeout = 20 * time.Millisecond + + pool := NewOrgReservedPool(shared, "analytics", 1, shared.workerImage, nil, 0, 0) + + start := time.Now() + got, err := pool.AcquireWorker(context.Background(), nil) + if err != context.DeadlineExceeded { + t.Fatalf("expected deadline exceeded after 
// TestOrgReservedPoolAcquireNoIdleMissReturnsWorkerWhenWarmCapacityArrivesBeforeDeadline
// starts an acquire against a pool with a 50ms warm-acquire timeout, adds a
// neutral warm worker (ID 7) shortly afterwards, and expects the pending
// acquire to return that worker instead of an error.
func TestOrgReservedPoolAcquireNoIdleMissReturnsWorkerWhenWarmCapacityArrivesBeforeDeadline(t *testing.T) {
	shared, _ := newTestK8sPool(t, 5)
	shared.warmAcquireTimeout = 50 * time.Millisecond
	// Stub the health check so it always succeeds.
	shared.healthCheckFunc = func(ctx context.Context, worker *ManagedWorker) error {
		return nil
	}

	pool := NewOrgReservedPool(shared, "analytics", 1, shared.workerImage, nil, 0, 0)
	// Stub activation so it always succeeds.
	pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker) error {
		return nil
	}

	type acquireResult struct {
		worker *ManagedWorker
		err    error
	}
	// Capacity 1, so the goroutine's single send never blocks.
	resultCh := make(chan acquireResult, 1)
	go func() {
		worker, err := pool.AcquireWorker(context.Background(), nil)
		resultCh <- acquireResult{worker: worker, err: err}
	}()

	// NOTE(review): the sleep presumably lets the acquire observe its first
	// miss before capacity arrives; nothing synchronizes that — confirm.
	time.Sleep(5 * time.Millisecond)
	shared.mu.Lock()
	addNeutralWarmWorker(shared, 7)
	shared.mu.Unlock()

	select {
	case result := <-resultCh:
		if result.err != nil {
			t.Fatalf("expected worker after warm capacity arrived, got error %v", result.err)
		}
		if result.worker == nil || result.worker.ID != 7 {
			t.Fatalf("expected worker 7, got %#v", result.worker)
		}
	case <-time.After(500 * time.Millisecond):
		// Fail rather than hang if the acquire never returns.
		t.Fatal("timed out waiting for acquire to use arriving warm capacity")
	}
}
ctx, acquireSpan := server.Tracer().Start(ctx, "duckgres.worker_acquire") slog.Debug("Acquiring worker for session.", "pid", pid, "user", username) - worker, err := sm.pool.AcquireWorker(ctx, profile) - if err != nil { + var worker *ManagedWorker + for { + worker, err = sm.pool.AcquireWorker(ctx, profile) + if err == nil { + break + } + if isRetryableWarmMiss(err) { + if waitErr := waitForWarmAcquireRetry(ctx); waitErr != nil { + acquireSpan.End() + return 0, nil, fmt.Errorf("acquire worker: %w", waitErr) + } + continue + } var capacityErr *WarmCapacityExhaustedError if errors.As(err, &capacityErr) { missReason := capacityErr.missReason() @@ -300,6 +311,31 @@ func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, usernam return pid, exec, nil } +func waitForWarmAcquireRetry(ctx context.Context) error { + wait := WarmAcquireRetryInterval + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining <= 0 { + if err := ctx.Err(); err != nil { + return err + } + return context.DeadlineExceeded + } + if remaining < wait { + wait = remaining + } + } + + timer := time.NewTimer(wait) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (sm *SessionManager) resolveSessionLimits(memoryLimit string, threads int) (string, int) { if sm.rebalancer == nil { return memoryLimit, threads diff --git a/controlplane/session_mgr_test.go b/controlplane/session_mgr_test.go index 83e79f4d..f6e55ab9 100644 --- a/controlplane/session_mgr_test.go +++ b/controlplane/session_mgr_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/posthog/duckgres/controlplane/configstore" "github.com/posthog/duckgres/server/flightclient" dto "github.com/prometheus/client_model/go" ) @@ -60,7 +61,7 @@ func (p *acquireErrorPool) ShutdownAll() {} func TestCreateSessionObservesWarmCapacityExhaustion(t *testing.T) { controlPlaneWorkerAcquireFailuresCounter.Reset() sm := 
NewSessionManager(&acquireErrorPool{ - err: NewWarmCapacityExhaustedError(30 * time.Second), + err: NewWarmCapacityExhaustedErrorForReason(configstore.WorkerClaimMissReasonOrgCap, 30*time.Second), }, nil) _, _, err := sm.CreateSession(context.Background(), "root", 1001, "", 0, nil) @@ -82,6 +83,28 @@ func TestCreateSessionObservesWarmCapacityExhaustion(t *testing.T) { } } +func TestCreateSessionNoIdleWarmCapacityWaitsForCallerDeadline(t *testing.T) { + sm := NewSessionManager(&acquireErrorPool{ + err: NewWarmCapacityExhaustedError(30 * time.Second), + }, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + start := time.Now() + _, _, err := sm.CreateSession(ctx, "root", 1001, "", 0, nil) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected deadline exceeded, got %v", err) + } + var capacityErr *WarmCapacityExhaustedError + if errors.As(err, &capacityErr) { + t.Fatalf("expected no-idle warm capacity miss to stay internal, got %v", err) + } + if elapsed := time.Since(start); elapsed < 20*time.Millisecond { + t.Fatalf("expected create session to wait for caller deadline, returned after %s", elapsed) + } +} + func TestOnWorkerCrash_MarksExecutorsDead(t *testing.T) { pool := &FlightWorkerPool{ workers: make(map[int]*ManagedWorker), diff --git a/controlplane/warm_capacity_policy.go b/controlplane/warm_capacity_policy.go index aadf8603..6b676c29 100644 --- a/controlplane/warm_capacity_policy.go +++ b/controlplane/warm_capacity_policy.go @@ -1,7 +1,6 @@ package controlplane import ( - "fmt" "time" "github.com/posthog/duckgres/controlplane/configstore" @@ -45,7 +44,7 @@ func warmCapacityMissPolicyForReason(reason configstore.WorkerClaimMissReason) w func (p warmCapacityMissPolicy) errorString(retryAfter time.Duration) string { switch p.messageKind { case warmCapacityMessageNoIdle: - return fmt.Sprintf("warm worker capacity exhausted; retry in about %s", 
normalizedWarmCapacityRetryAfter(retryAfter).Round(time.Second)) + return "timed out waiting for warm worker capacity" case warmCapacityMessageOrgCap: return "warm worker capacity exhausted for organization" case warmCapacityMessageGlobalCap: @@ -60,7 +59,7 @@ func (p warmCapacityMissPolicy) errorString(retryAfter time.Duration) string { func (p warmCapacityMissPolicy) sqlMessage(retryAfter time.Duration) string { switch p.messageKind { case warmCapacityMessageNoIdle: - return fmt.Sprintf("no warm Duckgres worker is currently available; retry in about %d seconds", warmCapacityRetrySeconds(retryAfter)) + return "timed out waiting for an available worker" case warmCapacityMessageOrgCap: return "Duckgres worker capacity for this organization is currently exhausted; retry later" case warmCapacityMessageGlobalCap: diff --git a/controlplane/warm_capacity_retry.go b/controlplane/warm_capacity_retry.go new file mode 100644 index 00000000..fdf10c32 --- /dev/null +++ b/controlplane/warm_capacity_retry.go @@ -0,0 +1,17 @@ +package controlplane + +import ( + "errors" + + "github.com/posthog/duckgres/controlplane/configstore" +) + +// isRetryableWarmMiss reports whether a worker-acquire error is a transient +// no-idle warm-worker miss. Org/global-cap and shutdown misses are not retried. +func isRetryableWarmMiss(err error) bool { + var capErr *WarmCapacityExhaustedError + if !errors.As(err, &capErr) { + return false + } + return capErr.missReason() == configstore.WorkerClaimMissReasonNoIdle +} diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index 9415a773..30b7caa6 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -26,9 +26,10 @@ const WarmAcquireRetryInterval = 2 * time.Second // keep driving the warm reconciler. const WarmMissRecordInterval = 30 * time.Second -// WarmCapacityExhaustedError is returned when a user request misses the ready -// warm pool. 
The caller should fail fast with a retryable capacity response -// instead of waiting for a foreground cold worker spawn. +// WarmCapacityExhaustedError is returned internally when a worker acquire misses +// ready warm capacity. Retryable no-idle misses are kept inside session acquire; +// quota and shutdown misses still surface to callers because waiting will not +// resolve them. type WarmCapacityExhaustedError struct { // RetryAfter is the client-facing retry hint for protocol-specific error responses. RetryAfter time.Duration @@ -111,11 +112,9 @@ type K8sWorkerPoolConfig struct { SecretName string // Base name for per-worker K8s Secrets containing RPC bearer token and TLS material ConfigMap string // ConfigMap name for duckgres.yaml MaxWorkers int - // WarmAcquireTimeout: how long a session-acquire blocks server-side waiting - // for a warm worker to become available before returning the retryable - // "no warm worker" backpressure. 0 = fail fast (legacy behavior). Bounded - // per-request by the client's connection context, so a client with a short - // deadline still fails fast. + // WarmAcquireTimeout: optional server-side warm-pool wait budget. In remote + // control-plane mode, 0 means use the worker queue timeout. Bounded + // per-request by the client's connection context. WarmAcquireTimeout time.Duration IdleTimeout time.Duration ConfigPath string // Path inside worker pod where config is mounted diff --git a/duckgres.example.yaml b/duckgres.example.yaml index 56978522..27ad74df 100644 --- a/duckgres.example.yaml +++ b/duckgres.example.yaml @@ -128,6 +128,9 @@ ducklake: # # image-specific warm targets after recent no-idle misses. # dynamic_warm_capacity_enabled: true # warm_capacity_miss_window: "2m" +# # Optional wait budget for no-idle warm-pool misses. Unset/0 uses +# # worker_queue_timeout in remote K8s mode. +# warm_acquire_timeout: "0s" # warm_capacity_misses_per_worker: 8 # # Retain miss buckets at least as long as the miss window. 
# warm_capacity_demand_ttl: "15m" diff --git a/tests/e2e-mw-dev/README.md b/tests/e2e-mw-dev/README.md index c1393490..e5b4b068 100644 --- a/tests/e2e-mw-dev/README.md +++ b/tests/e2e-mw-dev/README.md @@ -38,12 +38,11 @@ client-go: - **wire/query** — `SELECT 1` round-trips; 5 concurrent connections stay distinct (ported from `TestK8sMultipleConcurrentConnections`). -- **warm-pool backpressure** — a cold-pool burst of sessions outruns the worker - pool (`shared_warm_target=0`); the CP must answer the surplus with the - graceful client-visible `no warm Duckgres worker … retry in about 45 seconds` - hint (not a hang/500/drop), and the pool must then drain so a retrying - connection succeeds. The harness asserts the hint **and** handles it (the - concurrency tests retry through it). +- **warm-pool wait/recovery** — a cold-pool burst of sessions outruns the worker + pool (`shared_warm_target=0`); the CP must keep retryable no-idle misses + internal, avoid the old client-visible no-warm retry hint, and then drain so a + retrying connection succeeds. The concurrency tests retry through bounded + session-startup timeouts. - **activation** — DuckLake **and** Iceberg catalogs attach and read/write. - **extension forks** — the bundled `ducklake`/`httpfs` extensions are the PostHog forks, not upstream (ported from the `*IsBundledFork` tests). diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index 19a97577..2b05dcd3 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -107,17 +107,11 @@ resolve_cp_ip() { [ -n "$CP_IP" ] || fail "could not resolve $PGHOST" } -# Warm-pool backpressure ("no warm Duckgres worker … retry in about 45 seconds") -# is a FEATURE, not an error: with shared_warm_target=0 the per-org pool is cold, -# so ANY new-session acquisition — a fresh catalog (ducklake→iceberg), a burst, -# or the first connect after the pool churned (worker kills / idle timeout) — can -# transiently get it while the CP spawns a worker. 
It is a FATAL at session -# create, BEFORE any SQL runs, so retrying the whole command is safe (no -# half-applied INSERT). So every harness query tolerates it via bounded retry. -# Auth failures and real SQL errors are NOT retried — they surface immediately, -# so this never feeds the rate limiter or masks a genuine failure. (The -# backpressure *contract itself* is asserted separately in -# warm_capacity_backpressure, which uses raw psql to observe the hint.) +# Warm-pool acquisition can wait while the CP replenishes capacity. If the +# caller deadline expires first, retrying the whole command is safe because the +# failure happens at session create, BEFORE any SQL runs (no half-applied +# INSERT). Auth failures and real SQL errors are NOT retried — they surface +# immediately, so this never feeds the rate limiter or masks a genuine failure. _pg_exec() { # org password dbname sql -> prints output; rc 0 ok / 1 real error a=0 out="" while [ "$a" -lt 12 ]; do @@ -127,7 +121,7 @@ _pg_exec() { # org password dbname sql -> prints output; rc 0 ok / 1 real error printf %s "$out"; return 0 fi case "$out" in - *"capacity exhausted"*|*"no warm Duckgres worker"*|*"no warm worker"*|\ + *"capacity exhausted"*|*"timed out waiting for an available worker"*|\ *"still provisioning"*|*"failed to initialize session"*) sleep 10; a=$((a + 1)); continue ;; *) printf %s "$out" >&2; return 1 ;; @@ -156,7 +150,7 @@ pg_try() { # org password dbname sql printf %s "$out"; return 0 fi case "$out" in - *"capacity exhausted"*|*"no warm Duckgres worker"*|*"no warm worker"*|\ + *"capacity exhausted"*|*"timed out waiting for an available worker"*|\ *"still provisioning"*|*"failed to initialize session"*) sleep 10; a=$((a + 1)); continue ;; *) printf %s "$out"; return 1 ;; @@ -168,10 +162,9 @@ pg_try() { # org password dbname sql # Connect preflight: a warm worker isn't always available the instant a # warehouse goes ready (shared_warm_target=0, so the first connection for an # org cold-spawns a 
worker; a second org can find the pool momentarily -# exhausted). The CP returns a transient "no warm Duckgres worker is currently -# available; retry in ~45s" / "still provisioning" / "failed to initialize -# session". Retry `SELECT 1` through those, bounded, BEFORE the R/W. Auth -# failures are NOT in this set, so this never feeds the rate limiter. +# exhausted). Retry `SELECT 1` through startup timeouts / "still provisioning" / +# "failed to initialize session", bounded, BEFORE the R/W. Auth failures are NOT +# in this set, so this never feeds the rate limiter. wait_worker() { # org password catalog attempt=0 while [ "$attempt" -lt 12 ]; do @@ -184,7 +177,7 @@ wait_worker() { # org password catalog sleep 15 attempt=$((attempt + 1)) done - fail "no warm worker for $1/$3 after retries" + fail "worker not ready for $1/$3 after retries" } # ---- wire protocol -------------------------------------------------------- @@ -194,31 +187,25 @@ basic_query() { # org password [ "$n" = "1" ] || fail "$1 SELECT 1 returned '$n'" } -# Warm-pool backpressure is a FEATURE: when a burst of sessions outruns the -# cold worker pool (shared_warm_target=0), the CP rejects the surplus with a -# graceful, client-visible "no warm Duckgres worker is currently available; -# retry in about 45 seconds" rather than hanging, 500-ing, or dropping the -# connection. Assert both halves of the contract: (1) under a cold-pool burst at -# least one connection receives that exact graceful hint, and (2) the pool then -# drains so a (retrying) connection succeeds. Run this BEFORE the heavier -# concurrency tests, while only one worker is warm, so the burst reliably -# exceeds instantaneous spawn capacity. +# Warm-pool wait contract: when a burst of sessions outruns the cold worker pool +# (shared_warm_target=0), the CP must not expose the old no-warm retry hint. +# The pool should still recover so a retrying connection succeeds. 
Run this +# BEFORE the heavier concurrency tests, while only one worker is warm. warm_capacity_backpressure() { # org password - log "warm-pool backpressure contract on $1" - burst=12; seen=/tmp/bp_seen; rm -f "$seen"; pids="" + log "warm-pool wait contract on $1" + burst=12; bad=/tmp/bp_bad; rm -f "$bad"; pids="" i=0 while [ "$i" -lt "$burst" ]; do ( o="$(PGPASSWORD="$2" psql \ "sslmode=require host=$1$SNI_SUFFIX hostaddr=$CP_IP port=5432 user=root dbname=ducklake" \ -tAc 'SELECT 1' 2>&1 || true)" case "$o" in - *"no warm Duckgres worker"*|*"retry in about"*|*"capacity exhausted"*) echo x >> "$seen" ;; + *"no warm Duckgres"*worker*|*"retry in"*seconds*) echo "$o" >> "$bad" ;; esac ) & pids="$pids $!"; i=$((i + 1)) done for p in $pids; do wait "$p" || true; done - [ -s "$seen" ] || fail "expected graceful 'retry in ~45s' backpressure under a cold-pool burst of $burst, but no connection saw it" - log "backpressure observed: $(wc -l < "$seen" | tr -d ' ')/$burst connections got the graceful retry hint" + [ ! -s "$bad" ] || fail "old no-warm retry hint leaked under a cold-pool burst of $burst: $(head -1 "$bad")" # The pool must recover: a retrying connection succeeds. v="$(pgc "$1" "$2" ducklake 'SELECT 1')" [ "$v" = "1" ] || fail "pool did not recover after backpressure (got '$v')"