Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Run with config file:
| `DUCKGRES_K8S_SHARED_WARM_TARGET` | Default-image neutral shared warm-worker target for K8s multi-tenant mode (`0` disables prewarm; subject to `DUCKGRES_K8S_MAX_WORKERS`) | `0` |
| `DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED` | Enable configstore-driven dynamic warm-capacity target computation from recent no-idle misses | `true` |
| `DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW` | Recent no-idle miss window used for dynamic warm-capacity demand | `2m` |
| `DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT` | Optional server-side wait budget for retryable no-idle warm-pool misses; `0`/unset uses `DUCKGRES_WORKER_QUEUE_TIMEOUT` in remote K8s mode, and the effective worker queue timeout is extended when needed | `0` |
| `DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER` | Recent misses required for one extra dynamic warm worker | `8` |
| `DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL` | Retention TTL for warm-capacity miss buckets; clamped to at least the miss window | `15m` |
| `DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING` | Max dynamic extra warm workers per image (`0` means unlimited) | `0` |
Expand Down
2 changes: 1 addition & 1 deletion configresolve/cliflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
k8sSharedWarmTarget := fs.Int("k8s-shared-warm-target", 0, "Neutral shared warm-worker target for K8s multi-tenant mode, 0=disabled (env: DUCKGRES_K8S_SHARED_WARM_TARGET)")
k8sDynamicWarmCapacityEnabled := fs.Bool("k8s-dynamic-warm-capacity-enabled", true, "Enable configstore-driven dynamic warm-capacity target computation (default true; use --k8s-dynamic-warm-capacity-enabled=false to disable; env: DUCKGRES_K8S_DYNAMIC_WARM_CAPACITY_ENABLED)")
k8sWarmCapacityMissWindow := fs.String("k8s-warm-capacity-miss-window", "", "Recent no-idle miss window for dynamic warm-capacity demand (default: 2m) (env: DUCKGRES_K8S_WARM_CAPACITY_MISS_WINDOW)")
k8sWarmAcquireTimeout := fs.String("k8s-warm-acquire-timeout", "", "How long a session-acquire blocks server-side for a warm colocated worker before returning backpressure (e.g. 5m; default 0 = fail fast) (env: DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT)")
k8sWarmAcquireTimeout := fs.String("k8s-warm-acquire-timeout", "", "How long a session-acquire waits server-side for a warm worker before timing out (e.g. 5m; default uses worker-queue-timeout in remote mode) (env: DUCKGRES_K8S_WARM_ACQUIRE_TIMEOUT)")
k8sWarmCapacityMissesPerWorker := fs.Int("k8s-warm-capacity-misses-per-worker", 0, "Recent misses required for one extra dynamic warm worker (default: 8) (env: DUCKGRES_K8S_WARM_CAPACITY_MISSES_PER_WORKER)")
k8sWarmCapacityDemandTTL := fs.String("k8s-warm-capacity-demand-ttl", "", "Retention TTL for warm-capacity miss buckets (default: 15m) (env: DUCKGRES_K8S_WARM_CAPACITY_DEMAND_TTL)")
k8sWarmCapacityDynamicImageCeiling := fs.Int("k8s-warm-capacity-dynamic-image-ceiling", 0, "Max dynamic extra warm workers per image, 0=unlimited (env: DUCKGRES_K8S_WARM_CAPACITY_DYNAMIC_IMAGE_CEILING)")
Expand Down
3 changes: 2 additions & 1 deletion configresolve/resolve_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func TestResolveEffectiveK8sDynamicWarmCapacityPrecedence(t *testing.T) {
}

func TestResolveEffectiveK8sWarmAcquireTimeout(t *testing.T) {
// Default: unset everywhere => 0 (fail-fast).
// Default: unset everywhere => 0. Remote control-plane startup expands this
// to worker_queue_timeout so no-idle warm misses wait at the session boundary.
def := ResolveEffective(&configloader.FileConfig{}, CLIInputs{}, func(string) string { return "" }, nil)
if def.K8sWarmAcquireTimeout != 0 {
t.Fatalf("expected default warm-acquire-timeout 0, got %s", def.K8sWarmAcquireTimeout)
Expand Down
21 changes: 19 additions & 2 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ControlPlaneConfig struct {
SocketDir string
ConfigPath string // Path to config file, passed to workers
HealthCheckInterval time.Duration
WorkerQueueTimeout time.Duration // How long to wait for an available worker/org connection slot (default: 60s)
WorkerQueueTimeout time.Duration // How long to wait for an available worker/org connection slot (default: 60s; extended to cover warm acquire waits)
WorkerIdleTimeout time.Duration // How long to keep an idle worker alive (default: 5m)
RetireOnSessionEnd bool // When true, process workers are retired immediately after their last session ends.
HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
Expand Down Expand Up @@ -115,7 +115,7 @@ type K8sConfig struct {
SharedWarmTarget int // Neutral shared warm-worker target for K8s multi-tenant mode (0 = disabled)
DynamicWarmCapacityEnabled bool // Enable configstore-driven dynamic warm-capacity target computation
WarmCapacityMissWindow time.Duration // Window of recent no-idle misses that contributes to dynamic targets
WarmAcquireTimeout time.Duration // Server-side block window for a session-acquire that misses the warm pool (0 = fail fast)
WarmAcquireTimeout time.Duration // Server-side block window for a session-acquire that misses the warm pool (0 = use worker queue timeout)
WarmCapacityMissesPerWorker int // Number of recent misses that translate to one extra warm worker
WarmCapacityDemandTTL time.Duration // Retention TTL for warm-capacity miss buckets
WarmCapacityDynamicImageCeiling int // Max dynamic extra warm workers per image (0 = unlimited)
Expand Down Expand Up @@ -256,6 +256,12 @@ func RunControlPlane(cfg ControlPlaneConfig) {
if cfg.WorkerQueueTimeout == 0 {
cfg.WorkerQueueTimeout = 60 * time.Second
}
if cfg.WorkerBackend == "remote" {
if cfg.K8s.WarmAcquireTimeout == 0 {
cfg.K8s.WarmAcquireTimeout = cfg.WorkerQueueTimeout
}
cfg.WorkerQueueTimeout = effectiveSessionAcquireTimeout(cfg.WorkerQueueTimeout, cfg.K8s.WarmAcquireTimeout)
}
if cfg.WorkerIdleTimeout == 0 {
cfg.WorkerIdleTimeout = 5 * time.Minute
}
Expand Down Expand Up @@ -727,6 +733,17 @@ func createSessionWithRegisteredCancel(
return createFn(ctx)
}

// effectiveSessionAcquireTimeout returns the timeout to use for session
// acquisition given the configured worker-queue timeout and warm-acquire
// timeout.
//
// When no warm-acquire timeout is set (<= 0) the worker-queue timeout is
// returned untouched. Otherwise the result is the larger of the worker-queue
// timeout and the warm-acquire timeout plus one WarmAcquireRetryInterval; a
// non-positive worker-queue timeout always yields the warm wait budget.
func effectiveSessionAcquireTimeout(workerQueueTimeout, warmAcquireTimeout time.Duration) time.Duration {
	// Nothing to extend for: the queue timeout stands as configured.
	if warmAcquireTimeout <= 0 {
		return workerQueueTimeout
	}

	// The warm wait is padded by one retry interval before it is compared
	// against the queue timeout.
	budget := warmAcquireTimeout + WarmAcquireRetryInterval

	// A positive queue timeout that already covers the padded warm wait wins.
	if workerQueueTimeout > 0 && workerQueueTimeout >= budget {
		return workerQueueTimeout
	}
	return budget
}

func sessionCreationErrorResponse(err error) (code string, message string) {
var capacityErr *WarmCapacityExhaustedError
switch {
Expand Down
42 changes: 41 additions & 1 deletion controlplane/control_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,46 @@ func TestCreateSessionWithRegisteredCancel_CancelQueryCancelsWait(t *testing.T)
}
}

// TestEffectiveSessionAcquireTimeoutExtendsForWarmAcquire checks that
// effectiveSessionAcquireTimeout keeps the worker-queue timeout when it already
// covers the warm-acquire wait, and otherwise returns the warm-acquire timeout
// plus one WarmAcquireRetryInterval.
func TestEffectiveSessionAcquireTimeoutExtendsForWarmAcquire(t *testing.T) {
	type scenario struct {
		name               string
		workerQueueTimeout time.Duration
		warmAcquireTimeout time.Duration
		want               time.Duration
	}

	scenarios := []scenario{
		// Warm acquire unset: queue timeout is returned as-is.
		{name: "worker queue only", workerQueueTimeout: 60 * time.Second, want: 60 * time.Second},
		// Warm acquire fits inside the queue timeout: no extension.
		{name: "warm acquire below queue", workerQueueTimeout: 60 * time.Second, warmAcquireTimeout: 30 * time.Second, want: 60 * time.Second},
		// Warm acquire is longer: result grows to the padded warm wait.
		{name: "warm acquire exceeds queue", workerQueueTimeout: 60 * time.Second, warmAcquireTimeout: 5 * time.Minute, want: 5*time.Minute + WarmAcquireRetryInterval},
		// Queue timeout unset: the padded warm wait is used on its own.
		{name: "warm acquire only", warmAcquireTimeout: 90 * time.Second, want: 90*time.Second + WarmAcquireRetryInterval},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			got := effectiveSessionAcquireTimeout(sc.workerQueueTimeout, sc.warmAcquireTimeout)
			if got != sc.want {
				t.Fatalf("effectiveSessionAcquireTimeout() = %s, want %s", got, sc.want)
			}
		})
	}
}

func TestSessionCreationErrorResponse(t *testing.T) {
t.Run("cancelled", func(t *testing.T) {
code, message := sessionCreationErrorResponse(context.Canceled)
Expand Down Expand Up @@ -99,7 +139,7 @@ func TestSessionCreationErrorResponse(t *testing.T) {
if code != "53300" {
t.Fatalf("code = %q, want 53300", code)
}
want := "no warm Duckgres worker is currently available; retry in about 45 seconds"
want := "timed out waiting for an available worker"
if message != want {
t.Fatalf("message = %q, want %q", message, want)
}
Expand Down
10 changes: 5 additions & 5 deletions controlplane/k8s_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type K8sWorkerPool struct {
nextWorkerID int
spawning int
maxWorkers int
// warmAcquireTimeout: server-side block window for a session-acquire that
// missed the warm pool (0 = fail fast). Read by OrgReservedPool.AcquireWorker.
// warmAcquireTimeout: optional server-side block window for a session-acquire
// that missed the warm pool. Read by OrgReservedPool.AcquireWorker.
warmAcquireTimeout time.Duration
minWorkers int
// perImageWarmTarget is an additive floor on top of minWorkers: for each
Expand Down Expand Up @@ -1653,9 +1653,9 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor
// Runtime-store-less K8s mode uses the local worker map as its source
// of truth. Filter by assignment.Image so a per-org pin is honored: if
// the assignment names a specific image and no in-memory warm worker
// matches, fail fast with warm-capacity backpressure. Warm capacity is
// supplied by configured warm reconciliation rather than by the
// foreground user connection.
// matches, return the internal warm-capacity signal. Session acquisition
// keeps retryable no-idle misses waiting instead of exposing this to the
// client.
// Without this filter, a pinned org could be handed a default-image
// warm worker, and activation would fail with a DuckLake/extension
// version mismatch.
Expand Down
66 changes: 36 additions & 30 deletions controlplane/org_reserved_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,11 @@ package controlplane

import (
"context"
"errors"
"fmt"
"log/slog"
"time"

"github.com/posthog/duckgres/controlplane/configstore"
)

// isRetryableWarmMiss reports whether err wraps a WarmCapacityExhaustedError
// whose miss reason is "no idle warm worker". That is the one capacity miss
// that clears by itself when the warm pool replenishes; org/global-cap and
// shutdown misses are not retried because waiting does not change them.
func isRetryableWarmMiss(err error) bool {
	var exhausted *WarmCapacityExhaustedError
	// Short-circuit keeps missReason from being consulted unless err actually
	// carries a warm-capacity error.
	return errors.As(err, &exhausted) &&
		exhausted.missReason() == configstore.WorkerClaimMissReasonNoIdle
}

// OrgReservedPool presents one org's reserved slice of a shared K8s warm pool.
// It preserves the existing WorkerPool contract for SessionManager while ensuring
// workers are reserved to a single org for their lifetime and retired after use.
Expand Down Expand Up @@ -77,11 +62,13 @@ func (p *OrgReservedPool) assignedColocatedResourcesLocked() (cpu int, memBytes
}

func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProfile) (*ManagedWorker, error) {
// Server-side patience: block up to warmAcquireTimeout for the warm pool to
// replenish before surfacing a "no warm worker" miss to the client. Always
// bounded by the request ctx, so a client with a short deadline still fails
// fast. 0 = legacy fail-fast.
warmDeadline := time.Now().Add(p.shared.warmAcquireTimeout)
// Server-side patience: transient no-idle warm-pool misses stay internal and
// are retried until the caller context or optional warm-acquire deadline
// expires. Org/global-cap and shutdown misses still return immediately.
var warmDeadline time.Time
if p.shared.warmAcquireTimeout > 0 {
warmDeadline = time.Now().Add(p.shared.warmAcquireTimeout)
}
// Throttle warm-miss recording across the wait's repeated polls.
var lastWarmMissAt time.Time
for {
Expand Down Expand Up @@ -151,16 +138,12 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProf
if recordMiss {
lastWarmMissAt = time.Now()
}
// Server-side patience: a transient no-idle miss on a colocated
// request resolves once the warm pool replenishes (a colocated
// shape may first need a cold node, minutes). Wait and retry until
// warmAcquireTimeout elapses rather than failing immediately;
// bounded by ctx. Restricted to colocated requests — a default /
// exclusive miss replenishes quickly and the client retries, so we
// don't block interactive/default traffic (e.g. data imports) for
// minutes.
if p.shared.warmAcquireTimeout > 0 && isColocated && isRetryableWarmMiss(err) && time.Now().Before(warmDeadline) {
timer := time.NewTimer(WarmAcquireRetryInterval)
if isRetryableWarmMiss(err) {
wait, ok := warmAcquireRetryDelay(ctx, warmDeadline)
if !ok {
return nil, context.DeadlineExceeded
}
timer := time.NewTimer(wait)
select {
case <-ctx.Done():
timer.Stop()
Expand Down Expand Up @@ -201,6 +184,29 @@ func (p *OrgReservedPool) AcquireWorker(ctx context.Context, profile *WorkerProf
}
}

// warmAcquireRetryDelay computes how long to sleep before the next warm-pool
// acquire attempt.
//
// The delay starts at WarmAcquireRetryInterval and is shortened so it never
// runs past warmDeadline (ignored when zero) or the deadline carried by ctx
// (ignored when ctx has none). The boolean is false, with a zero delay, as soon
// as either applicable deadline has no time remaining.
func warmAcquireRetryDelay(ctx context.Context, warmDeadline time.Time) (time.Duration, bool) {
	delay := WarmAcquireRetryInterval

	// capTo shortens delay to the time left before limit and reports whether
	// any time is left at all.
	capTo := func(limit time.Time) bool {
		left := time.Until(limit)
		if left <= 0 {
			return false
		}
		if left < delay {
			delay = left
		}
		return true
	}

	// The warm deadline is checked first; a zero value means "no warm deadline".
	if !warmDeadline.IsZero() && !capTo(warmDeadline) {
		return 0, false
	}
	if ctxDeadline, hasDeadline := ctx.Deadline(); hasDeadline && !capTo(ctxDeadline) {
		return 0, false
	}
	return delay, true
}

func (p *OrgReservedPool) ReleaseWorker(id int) {
if p.shared.TransitionToHotIdleIfNoSessions(id) {
return
Expand Down
56 changes: 56 additions & 0 deletions controlplane/org_reserved_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,62 @@ func TestIsRetryableWarmMiss(t *testing.T) {
}
}

// TestOrgReservedPoolWaitsForNoIdleWarmMissUntilWarmDeadline asserts that
// AcquireWorker, when it never obtains a worker, returns
// context.DeadlineExceeded and does so no earlier than the configured
// warmAcquireTimeout.
func TestOrgReservedPoolWaitsForNoIdleWarmMissUntilWarmDeadline(t *testing.T) {
	shared, _ := newTestK8sPool(t, 5)
	shared.warmAcquireTimeout = 20 * time.Millisecond

	orgPool := NewOrgReservedPool(shared, "analytics", 1, shared.workerImage, nil, 0, 0)

	// Background context: only the warm-acquire deadline can end the wait.
	began := time.Now()
	worker, acquireErr := orgPool.AcquireWorker(context.Background(), nil)
	if acquireErr != context.DeadlineExceeded {
		t.Fatalf("expected deadline exceeded after waiting for warm capacity, got worker=%v err=%v", worker, acquireErr)
	}

	waited := time.Since(began)
	if waited < shared.warmAcquireTimeout {
		t.Fatalf("expected acquire to wait at least %s, returned after %s", shared.warmAcquireTimeout, waited)
	}
}

func TestOrgReservedPoolAcquireNoIdleMissReturnsWorkerWhenWarmCapacityArrivesBeforeDeadline(t *testing.T) {
shared, _ := newTestK8sPool(t, 5)
shared.warmAcquireTimeout = 50 * time.Millisecond
shared.healthCheckFunc = func(ctx context.Context, worker *ManagedWorker) error {
return nil
}

pool := NewOrgReservedPool(shared, "analytics", 1, shared.workerImage, nil, 0, 0)
pool.activateReservedWorker = func(ctx context.Context, worker *ManagedWorker) error {
return nil
}

type acquireResult struct {
worker *ManagedWorker
err error
}
resultCh := make(chan acquireResult, 1)
go func() {
worker, err := pool.AcquireWorker(context.Background(), nil)
resultCh <- acquireResult{worker: worker, err: err}
}()

time.Sleep(5 * time.Millisecond)
shared.mu.Lock()
addNeutralWarmWorker(shared, 7)
shared.mu.Unlock()

select {
case result := <-resultCh:
if result.err != nil {
t.Fatalf("expected worker after warm capacity arrived, got error %v", result.err)
}
if result.worker == nil || result.worker.ID != 7 {
t.Fatalf("expected worker 7, got %#v", result.worker)
}
case <-time.After(500 * time.Millisecond):
t.Fatal("timed out waiting for acquire to use arriving warm capacity")
}
}

func TestOrgReservedPoolAcquireWaitsWhenSharedWarmWorkerBusyAtCapacity(t *testing.T) {
shared, _ := newTestK8sPool(t, 5)
worker := &ManagedWorker{ID: 3, activeSessions: 1, done: make(chan struct{})}
Expand Down
40 changes: 38 additions & 2 deletions controlplane/session_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,19 @@ func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, usernam
acquireStart := time.Now()
ctx, acquireSpan := server.Tracer().Start(ctx, "duckgres.worker_acquire")
slog.Debug("Acquiring worker for session.", "pid", pid, "user", username)
worker, err := sm.pool.AcquireWorker(ctx, profile)
if err != nil {
var worker *ManagedWorker
for {
worker, err = sm.pool.AcquireWorker(ctx, profile)
if err == nil {
break
}
if isRetryableWarmMiss(err) {
if waitErr := waitForWarmAcquireRetry(ctx); waitErr != nil {
acquireSpan.End()
return 0, nil, fmt.Errorf("acquire worker: %w", waitErr)
}
continue
}
var capacityErr *WarmCapacityExhaustedError
if errors.As(err, &capacityErr) {
missReason := capacityErr.missReason()
Expand Down Expand Up @@ -300,6 +311,31 @@ func (sm *SessionManager) CreateSessionWithProtocol(ctx context.Context, usernam
return pid, exec, nil
}

// waitForWarmAcquireRetry sleeps before the next warm-acquire attempt.
//
// It waits WarmAcquireRetryInterval, shortened to the time left before ctx's
// deadline when that is sooner. It returns nil once the wait elapses, ctx.Err()
// if ctx finishes first, and an error immediately when the deadline has already
// passed (ctx.Err() if set, otherwise context.DeadlineExceeded).
func waitForWarmAcquireRetry(ctx context.Context) error {
	pause := WarmAcquireRetryInterval

	if ctxDeadline, hasDeadline := ctx.Deadline(); hasDeadline {
		left := time.Until(ctxDeadline)
		switch {
		case left <= 0:
			// Deadline already behind us: prefer the context's own error and
			// fall back to DeadlineExceeded if it has not been set yet.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			return context.DeadlineExceeded
		case left < pause:
			pause = left
		}
	}

	retryTimer := time.NewTimer(pause)
	defer retryTimer.Stop()

	select {
	case <-retryTimer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (sm *SessionManager) resolveSessionLimits(memoryLimit string, threads int) (string, int) {
if sm.rebalancer == nil {
return memoryLimit, threads
Expand Down
Loading
Loading