Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@
- Use `just` recipes instead of raw commands (run `just` to see all recipes).
- `just ci` runs the full local CI pipeline (lint + unit + integration + controlplane tests).
- `just lint` runs `golangci-lint` (not `go vet` — CI uses golangci-lint).

## Security / Data Handling
- **This repo is public.** Never expose customer or internal data in anything that lands here — PR titles/bodies, commit messages, code, comments, or test fixtures.
- This includes customer/org IDs and UUIDs, customer names, internal hostnames/cluster names/endpoints, secrets, and internal-only identifiers.
- When a diagnostic detail is needed for context, redact it (`<org-id>`, `org A`/`org B`, "the prod cluster") or keep it in the local conversation only — not in the published artifact.
23 changes: 9 additions & 14 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,20 +470,15 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man
log := slog.With("org", w.OrgID, "phase", "lakekeeper")

if w.Iceberg.LakekeeperEndpoint != "" {
// Already provisioned. Re-apply just the CR spec so changes to the
// desired shape (resources, podMetadata, image, ...) converge onto the
// existing CR. Cheap + idempotent: skips the DB/Secret/REST pipeline;
// the operator only rolls the Deployment when the spec actually changes.
// (The operator can't add fields that aren't in the CR, so a spec change
// in the provisioner must be written back here — it won't appear on its
// own.)
inputs, err := c.lakekeeperInputs(ctx, w)
if err != nil {
log.Warn("Failed to resolve lakekeeper inputs for CR drift correction.", "error", err)
return
}
if err := c.lakekeeperProvisioner.EnsureCRSpec(ctx, w, inputs); err != nil {
log.Warn("Lakekeeper CR drift correction failed.", "error", err)
// Already provisioned. Converge the pod-shape fields (replicas, resource
// requests, scrape annotations) onto the org's existing CR(s) via a
// label-matched merge patch — no inputs needed, never recreates under a
// new name. The operator rolls the Deployment only when the spec actually
// changes. (The operator can't add fields that aren't in the CR, so a
// spec change in the provisioner must be written back here — it won't
// appear on its own.)
if err := c.lakekeeperProvisioner.PatchPodShape(ctx, w.OrgID); err != nil {
log.Warn("Lakekeeper CR pod-shape drift correction failed.", "error", err)
}
return
}
Expand Down
60 changes: 40 additions & 20 deletions controlplane/provisioner/controller_lakekeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/posthog/duckgres/controlplane/configstore"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestReconcileDeleting_TearsDownLakekeeper(t *testing.T) {
Expand Down Expand Up @@ -108,13 +109,30 @@ func TestReconcileLakekeeper_SkipsWhenIcebergDisabled(t *testing.T) {
}

func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) {
// An already-provisioned org (LakekeeperEndpoint set) must NOT be skipped:
// the Ready loop re-applies just the CR spec so changes to the desired shape
// (resources, podMetadata, image, ...) converge onto the existing CR. The
// full DB/Secret/REST pipeline must NOT run — only EnsureCR.
// An already-provisioned org (LakekeeperEndpoint set) is NOT skipped: the
// Ready loop patches the pod shape (replicas/requests/scrape) onto the org's
// existing CR, matched by the duckgres/active-org label. No inputs are
// resolved, and no duplicate CR is created under the recomputed name.
called := false
k8sClient, dyn, _ := newFakeLakekeeperClient()
p := NewLakekeeperProvisioner(newFakeStore(), k8sClient, WithImage("drift:test"))
p := NewLakekeeperProvisioner(newFakeStore(), k8sClient)

// Seed a legacy-named CR carrying the org label. Its name intentionally
// differs from LakekeeperResourceName("acme") to prove the patch is matched
// by label, not by a recomputed name (the post-#632 hyphenation bug).
seed := &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "lakekeeper.k8s.lakekeeper.io/v1alpha1",
"kind": "Lakekeeper",
"metadata": map[string]interface{}{
"name": "lakekeeper-acme-legacy",
"namespace": k8sClient.namespace,
"labels": map[string]interface{}{"duckgres/active-org": "acme"},
},
"spec": map[string]interface{}{"replicas": int64(1)},
}}
if _, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Create(context.Background(), seed, metav1.CreateOptions{}); err != nil {
t.Fatalf("seed CR: %v", err)
}

store := newFakeStore()
store.warehouses["acme"] = &configstore.ManagedWarehouse{
Expand All @@ -129,29 +147,31 @@ func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) {
c := NewControllerWithClient(store, nil, 0).
WithLakekeeperProvisioner(p, func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) {
called = true
// S3 deliberately unset: drift correction re-applies only the CR
// spec, so it must not depend on S3/REST inputs.
return ProvisioningInputs{PGHost: "acme-pg", PGPort: 5432, PGSSLMode: "require"}, nil
return ProvisioningInputs{}, nil
})

c.reconcileLakekeeper(context.Background(), store.warehouses["acme"])

if !called {
t.Fatalf("inputs resolver should be called for CR drift correction when already provisioned")
// Pod-shape drift correction needs no inputs — the resolver must not run.
if called {
t.Errorf("inputs resolver should NOT be called for pod-shape drift correction")
}
// EnsureCRSpec create-or-updated the CR with the desired spec, without the
// DB/Secret/REST pipeline running.
got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), LakekeeperResourceName("acme"), metav1.GetOptions{})
// The existing legacy-named CR was patched in place by label.
got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), "lakekeeper-acme-legacy", metav1.GetOptions{})
if err != nil {
t.Fatalf("CR not applied by drift correction: %v", err)
t.Fatalf("get patched CR: %v", err)
}
spec := got.Object["spec"].(map[string]interface{})
if rv := spec["replicas"]; rv != int64(lakekeeperPodReplicas) && rv != float64(lakekeeperPodReplicas) {
t.Errorf("replicas = %v, want %d", rv, lakekeeperPodReplicas)
}
if spec := got.Object["spec"].(map[string]interface{}); spec["image"] != "drift:test" {
t.Errorf("CR image = %v, want drift:test", spec["image"])
ann := spec["podMetadata"].(map[string]interface{})["annotations"].(map[string]interface{})
if ann["prometheus.io/scrape"] != "true" {
t.Errorf("scrape annotation not applied: %v", ann)
}
// The warehouse row stays untouched — drift correction never re-runs the
// pipeline, so it doesn't rewrite the persisted warehouse name.
if store.warehouses["acme"].Iceberg.LakekeeperWarehouse != "" {
t.Errorf("drift correction unexpectedly wrote the warehouse row")
// No duplicate CR created under the recomputed (hyphenated) name.
if _, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), LakekeeperResourceName("acme"), metav1.GetOptions{}); err == nil {
t.Errorf("drift correction created a duplicate CR under the recomputed name")
}
}

Expand Down
119 changes: 91 additions & 28 deletions controlplane/provisioner/lakekeeper_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package provisioner

import (
"context"
"encoding/json"
"fmt"
"regexp"

Expand All @@ -12,6 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -32,12 +34,15 @@ func isValidOrgIDLabel(orgID string) bool {
// by default but we co-locate the CRs to keep RBAC tight.
const LakekeeperNamespace = "lakekeeper"

// Per-org Lakekeeper pod resource shape. Requests == limits → Guaranteed QoS.
// Lakekeeper is a light Rust REST catalog (mostly idle metadata ops), so a
// modest fixed shape is plenty; bump these if a tenant needs more headroom.
// Per-org Lakekeeper pod resource shape. Requests only (no limits) → Burstable
// QoS: a CPU limit would CFS-throttle the catalog, and we intentionally leave
// memory unbounded too. Lakekeeper is a light Rust REST catalog (mostly idle
// metadata ops), so a modest request floor is plenty; bump if a tenant needs
// more headroom.
const (
lakekeeperPodCPU = "500m"
lakekeeperPodMemory = "512Mi"
lakekeeperPodCPU = "500m"
lakekeeperPodMemory = "512Mi"
lakekeeperPodReplicas = 2
)

// lakekeeperMetricsPort is the operator's default metrics container port
Expand All @@ -46,6 +51,78 @@ const (
// the value advertised to vmagent via the prometheus.io/port pod annotation.
const lakekeeperMetricsPort = "9000"

// lakekeeperResourceRequests returns the CR spec.resources block: requests only,
// no limits (Burstable). Shared by EnsureCR (create) and PatchPodShape (drift)
// so the create and drift paths never diverge.
func lakekeeperResourceRequests() map[string]interface{} {
return map[string]interface{}{
"requests": map[string]interface{}{
"cpu": lakekeeperPodCPU,
"memory": lakekeeperPodMemory,
},
}
}

// lakekeeperPodMetadata returns the CR spec.podMetadata block carrying the
// Prometheus scrape annotations (see EnsureCR for the rationale).
func lakekeeperPodMetadata() map[string]interface{} {
return map[string]interface{}{
"annotations": map[string]interface{}{
"prometheus.io/scrape": "true",
"prometheus.io/port": lakekeeperMetricsPort,
"prometheus.io/path": "/metrics",
},
}
}

// PatchPodShape converges the pod-shape fields (replicas + resource requests +
// scrape annotations) onto every existing Lakekeeper CR for the org, matched by
// the duckgres/active-org label.
//
// It deliberately does NOT recompute the CR name from the orgID. Post-#632,
// LakekeeperResourceName preserves hyphens, but a legacy org's CR — and its
// Secret, ServiceAccount, and EKS pod-identity, all derived from the no-hyphen
// Duckling XR name — keeps the de-hyphenated name. Looking up by label patches
// whatever name actually exists (no-dash for legacy orgs, hyphenated for new
// ones) instead of minting a duplicate CR under a name that has no matching
// Secret/SA/pod-identity.
//
// Uses a JSON merge patch, which carries no resourceVersion, so it never races
// the operator's frequent status writes (the "object has been modified"
// conflicts seen under multiple control-plane replicas). limits is explicitly
// nulled so the patch strips any stale CPU/memory limit (requests-only shape).
func (c *LakekeeperK8sClient) PatchPodShape(ctx context.Context, orgID string) error {
if !isValidOrgIDLabel(orgID) {
return fmt.Errorf("PatchPodShape: orgID %q is not a valid K8s label value", orgID)
}
resource := c.dynamic.Resource(lakekeeperGVR).Namespace(c.namespace)
list, err := resource.List(ctx, metav1.ListOptions{
LabelSelector: "duckgres/active-org=" + orgID,
})
if err != nil {
return fmt.Errorf("list lakekeeper CRs for org %s: %w", orgID, err)
}
resources := lakekeeperResourceRequests()
resources["limits"] = nil // merge-patch removes any pre-existing limit block
patch, err := json.Marshal(map[string]interface{}{
"spec": map[string]interface{}{
"replicas": lakekeeperPodReplicas,
"resources": resources,
"podMetadata": lakekeeperPodMetadata(),
},
})
if err != nil {
return fmt.Errorf("marshal pod-shape patch: %w", err)
}
for i := range list.Items {
name := list.Items[i].GetName()
if _, err := resource.Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("patch lakekeeper CR %s pod shape: %w", name, err)
}
}
return nil
}

// lakekeeperGVR matches the operator at /Users/james/opt/ph/lakekeeper-operator.
var lakekeeperGVR = schema.GroupVersionResource{
Group: "lakekeeper.k8s.lakekeeper.io",
Expand Down Expand Up @@ -265,7 +342,7 @@ func (c *LakekeeperK8sClient) EnsureCR(ctx context.Context, spec LakekeeperCRSpe
return fmt.Errorf("EnsureCR: missing required field in spec: %+v", spec)
}
if spec.Replicas == 0 {
spec.Replicas = 1
spec.Replicas = lakekeeperPodReplicas
}
if spec.PGPort == 0 {
spec.PGPort = 5432
Expand Down Expand Up @@ -341,21 +418,13 @@ func (c *LakekeeperK8sClient) EnsureCR(ctx context.Context, spec LakekeeperCRSpe
},
},
},
// Pin a fixed pod shape with requests == limits → Guaranteed QoS.
// The managed-warehouse clusters require it; an unbounded catalog
// pod runs BestEffort and is first evicted under node pressure.
// Lakekeeper is a light Rust REST catalog, so a modest shape is
// plenty — tune the consts if a tenant needs more.
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": lakekeeperPodCPU,
"memory": lakekeeperPodMemory,
},
"limits": map[string]interface{}{
"cpu": lakekeeperPodCPU,
"memory": lakekeeperPodMemory,
},
},
// Pin a resource request floor (requests only, no limits →
// Burstable). Without it the catalog pod runs BestEffort and is
// first evicted under node pressure; a CPU limit would CFS-throttle
// it. Lakekeeper is a light Rust REST catalog, so a modest floor is
// plenty — tune the consts if a tenant needs more. Kept in sync with
// the drift-correction patch in PatchPodShape.
"resources": lakekeeperResourceRequests(),
// Stamp Prometheus scrape annotations onto the operator-managed
// pods. The managed-warehouse clusters have no prometheus-operator;
// vmagent discovers targets by pod annotation (kubernetes_sd), and
Expand All @@ -367,13 +436,7 @@ func (c *LakekeeperK8sClient) EnsureCR(ctx context.Context, spec LakekeeperCRSpe
// posthog/serviceaccountname); on an operator without it the CRD
// prunes the field and these annotations are dropped — a safe no-op
// until the new operator image ships.
"podMetadata": map[string]interface{}{
"annotations": map[string]interface{}{
"prometheus.io/scrape": "true",
"prometheus.io/port": lakekeeperMetricsPort,
"prometheus.io/path": "/metrics",
},
},
"podMetadata": lakekeeperPodMetadata(),
},
},
}
Expand Down
71 changes: 65 additions & 6 deletions controlplane/provisioner/lakekeeper_k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,18 @@ func TestEnsureCR_CreateAndShape(t *testing.T) {
if pg["host"] != spec.PGHost || pg["database"] != spec.PGDatabase {
t.Errorf("pg host/db = %v/%v, want %s/%s", pg["host"], pg["database"], spec.PGHost, spec.PGDatabase)
}
// Resources are pinned with requests == limits (Guaranteed QoS).
// Resources are requests-only (no limits → Burstable).
res := specMap["resources"].(map[string]interface{})
reqs := res["requests"].(map[string]interface{})
lims := res["limits"].(map[string]interface{})
if reqs["cpu"] != lakekeeperPodCPU || lims["cpu"] != lakekeeperPodCPU {
t.Errorf("cpu req/lim = %v/%v, want %s/%s", reqs["cpu"], lims["cpu"], lakekeeperPodCPU, lakekeeperPodCPU)
if reqs["cpu"] != lakekeeperPodCPU || reqs["memory"] != lakekeeperPodMemory {
t.Errorf("requests cpu/mem = %v/%v, want %s/%s", reqs["cpu"], reqs["memory"], lakekeeperPodCPU, lakekeeperPodMemory)
}
if reqs["memory"] != lakekeeperPodMemory || lims["memory"] != lakekeeperPodMemory {
t.Errorf("memory req/lim = %v/%v, want %s/%s", reqs["memory"], lims["memory"], lakekeeperPodMemory, lakekeeperPodMemory)
if _, hasLimits := res["limits"]; hasLimits {
t.Errorf("resources.limits set, want none (requests-only): %v", res["limits"])
}
// Two replicas.
if specMap["replicas"] != int64(lakekeeperPodReplicas) {
t.Errorf("replicas = %v, want %d", specMap["replicas"], lakekeeperPodReplicas)
}
// Prometheus scrape annotations are stamped onto the pod via podMetadata.
ann := specMap["podMetadata"].(map[string]interface{})["annotations"].(map[string]interface{})
Expand All @@ -248,6 +251,62 @@ func TestEnsureCR_CreateAndShape(t *testing.T) {
}
}

func TestPatchPodShape_LabelMatchedStripsLimits(t *testing.T) {
c, dc, _ := newFakeLakekeeperClient()
ctx := context.Background()
// Two CRs for one org under different names (legacy de-hyphenated + new
// hyphenated), both label-tagged; the first carries a stale limits block.
names := []string{"lakekeeper-acme", "lakekeeper-a-c-m-e"}
for i, name := range names {
spec := map[string]interface{}{"replicas": int64(1)}
if i == 0 {
spec["resources"] = map[string]interface{}{
"limits": map[string]interface{}{"cpu": "250m", "memory": "256Mi"},
"requests": map[string]interface{}{"cpu": "250m", "memory": "256Mi"},
}
}
cr := &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "lakekeeper.k8s.lakekeeper.io/v1alpha1",
"kind": "Lakekeeper",
"metadata": map[string]interface{}{
"name": name,
"namespace": "lakekeeper",
"labels": map[string]interface{}{"duckgres/active-org": "acme"},
},
"spec": spec,
}}
if _, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Create(ctx, cr, metav1.CreateOptions{}); err != nil {
t.Fatalf("seed %s: %v", name, err)
}
}

if err := c.PatchPodShape(ctx, "acme"); err != nil {
t.Fatalf("PatchPodShape: %v", err)
}

for _, name := range names {
got, err := dc.Resource(lakekeeperGVR).Namespace("lakekeeper").Get(ctx, name, metav1.GetOptions{})
if err != nil {
t.Fatalf("get %s: %v", name, err)
}
spec := got.Object["spec"].(map[string]interface{})
if rv := spec["replicas"]; rv != int64(lakekeeperPodReplicas) && rv != float64(lakekeeperPodReplicas) {
t.Errorf("%s replicas = %v, want %d", name, rv, lakekeeperPodReplicas)
}
res := spec["resources"].(map[string]interface{})
if _, ok := res["limits"]; ok {
t.Errorf("%s resources.limits still present after patch: %v", name, res["limits"])
}
if res["requests"].(map[string]interface{})["cpu"] != lakekeeperPodCPU {
t.Errorf("%s requests.cpu = %v, want %s", name, res["requests"], lakekeeperPodCPU)
}
ann := spec["podMetadata"].(map[string]interface{})["annotations"].(map[string]interface{})
if ann["prometheus.io/scrape"] != "true" {
t.Errorf("%s scrape annotation not applied: %v", name, ann)
}
}
}

func TestEnsureCR_KubernetesAuthOff_OmitsAuthenticationBlock(t *testing.T) {
c, dc, _ := newFakeLakekeeperClient()
ctx := context.Background()
Expand Down
Loading
Loading