From 8a28b1f285e987222428bffcc9a49f66ce26daec Mon Sep 17 00:00:00 2001
From: Benjamin Knofe-Vider
Date: Fri, 5 Jun 2026 14:00:53 +0200
Subject: [PATCH] feat(lakekeeper): drift-correct existing CRs instead of
 skipping provisioned orgs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

reconcileLakekeeper previously early-returned for any org with
LakekeeperEndpoint already set, on the assumption that the operator's
reconcile loop would carry future drift. That holds for fields already
in the CR (e.g. image), but the operator can't add fields that aren't
there — so a new field in the desired CR spec (resources, podMetadata,
...) never reaches existing per-org Lakekeepers.

The Ready loop now re-applies just the CR spec for already-provisioned
orgs:

- buildCRSpec(w, in): single source of truth for the desired CR spec,
  shared by EnsureForOrg and the new path so the two never diverge.
- EnsureCRSpec(ctx, w, in): lightweight drift correction — calls only
  k8s.EnsureCR (create-or-update), skipping the DB/Secret/REST
  pipeline. Idempotent; preserves the operator-owned status.
- controller: the LakekeeperEndpoint-set branch resolves inputs and
  calls EnsureCRSpec instead of returning immediately. The operator
  rolls the Deployment only when the spec actually changes.

This is what makes the resources + podMetadata additions (PR #684) land
on existing catalog pods rather than only new ones. Off main;
complementary to #684 — until that merges, EnsureCR writes the same
spec it does today, so this is a no-op on existing CRs.

Rewrites TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned ->
_DriftCorrectsWhenAlreadyProvisioned to assert the CR is re-applied
(image only, no pipeline side effects). All provisioner tests pass.
--- controlplane/provisioner/controller.go | 22 +++++-- .../provisioner/controller_lakekeeper_test.go | 42 +++++++++--- .../provisioner/lakekeeper_provisioner.go | 66 ++++++++++++++----- 3 files changed, 99 insertions(+), 31 deletions(-) diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index 88cb6954..b33be458 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -467,15 +467,27 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man if w.Iceberg.ResolvedBackend() != configstore.IcebergBackendLakekeeper { return } + log := slog.With("org", w.OrgID, "phase", "lakekeeper") + if w.Iceberg.LakekeeperEndpoint != "" { - // Already provisioned. Future drift correction (e.g. image bumps) - // will be handled by the operator's reconcile loop reading the CR - // spec on every tick; nothing to do here. + // Already provisioned. Re-apply just the CR spec so changes to the + // desired shape (resources, podMetadata, image, ...) converge onto the + // existing CR. Cheap + idempotent: skips the DB/Secret/REST pipeline; + // the operator only rolls the Deployment when the spec actually changes. + // (The operator can't add fields that aren't in the CR, so a spec change + // in the provisioner must be written back here — it won't appear on its + // own.) 
+ inputs, err := c.lakekeeperInputs(ctx, w) + if err != nil { + log.Warn("Failed to resolve lakekeeper inputs for CR drift correction.", "error", err) + return + } + if err := c.lakekeeperProvisioner.EnsureCRSpec(ctx, w, inputs); err != nil { + log.Warn("Lakekeeper CR drift correction failed.", "error", err) + } return } - log := slog.With("org", w.OrgID, "phase", "lakekeeper") - inputs, err := c.lakekeeperInputs(ctx, w) if err != nil { log.Warn("Failed to resolve lakekeeper provisioning inputs.", "error", err) diff --git a/controlplane/provisioner/controller_lakekeeper_test.go b/controlplane/provisioner/controller_lakekeeper_test.go index ac149e9b..51c9a124 100644 --- a/controlplane/provisioner/controller_lakekeeper_test.go +++ b/controlplane/provisioner/controller_lakekeeper_test.go @@ -107,8 +107,15 @@ func TestReconcileLakekeeper_SkipsWhenIcebergDisabled(t *testing.T) { } } -func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) { +func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) { + // An already-provisioned org (LakekeeperEndpoint set) must NOT be skipped: + // the Ready loop re-applies just the CR spec so changes to the desired shape + // (resources, podMetadata, image, ...) converge onto the existing CR. The + // full DB/Secret/REST pipeline must NOT run — only EnsureCR. 
called := false + k8sClient, dyn, _ := newFakeLakekeeperClient() + p := NewLakekeeperProvisioner(newFakeStore(), k8sClient, WithImage("drift:test")) + store := newFakeStore() store.warehouses["acme"] = &configstore.ManagedWarehouse{ OrgID: "acme", @@ -119,15 +126,32 @@ func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) { LakekeeperEndpoint: "http://lk-acme.lakekeeper.svc:8181/catalog", }, } - c := NewControllerWithClient(store, nil, 0) - c.lakekeeperProvisioner = &LakekeeperProvisioner{} - c.lakekeeperInputs = func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) { - called = true - return ProvisioningInputs{}, nil - } + c := NewControllerWithClient(store, nil, 0). + WithLakekeeperProvisioner(p, func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) { + called = true + // S3 deliberately unset: drift correction re-applies only the CR + // spec, so it must not depend on S3/REST inputs. + return ProvisioningInputs{PGHost: "acme-pg", PGPort: 5432, PGSSLMode: "require"}, nil + }) + c.reconcileLakekeeper(context.Background(), store.warehouses["acme"]) - if called { - t.Errorf("inputs resolver should not be called when LakekeeperEndpoint already set") + + if !called { + t.Fatalf("inputs resolver should be called for CR drift correction when already provisioned") + } + // EnsureCRSpec create-or-updated the CR with the desired spec, without the + // DB/Secret/REST pipeline running. 
+ got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), LakekeeperResourceName("acme"), metav1.GetOptions{}) + if err != nil { + t.Fatalf("CR not applied by drift correction: %v", err) + } + if spec := got.Object["spec"].(map[string]interface{}); spec["image"] != "drift:test" { + t.Errorf("CR image = %v, want drift:test", spec["image"]) + } + // The warehouse row stays untouched — drift correction never re-runs the + // pipeline, so it doesn't rewrite the persisted warehouse name. + if store.warehouses["acme"].Iceberg.LakekeeperWarehouse != "" { + t.Errorf("drift correction unexpectedly wrote the warehouse row") } } diff --git a/controlplane/provisioner/lakekeeper_provisioner.go b/controlplane/provisioner/lakekeeper_provisioner.go index 618fc66c..9550133b 100644 --- a/controlplane/provisioner/lakekeeper_provisioner.go +++ b/controlplane/provisioner/lakekeeper_provisioner.go @@ -233,23 +233,7 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore } // 3. Apply the Lakekeeper CR pointing at the org's PG + the Secret. - pgPort := in.PGPort - if pgPort == 0 { - pgPort = 5432 - } - if err := p.k8s.EnsureCR(ctx, LakekeeperCRSpec{ - OrgID: w.OrgID, - Image: p.image, - Replicas: 1, - PGHost: in.PGHost, - PGPort: pgPort, - PGDatabase: dbName, - SecretName: secretName, - BaseURI: baseURL, - PGSSLMode: in.PGSSLMode, - ServiceAccountName: LakekeeperServiceAccountName(w.OrgID), - KubernetesAuthAudiences: in.KubernetesAuthAudiences, - }); err != nil { + if err := p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in)); err != nil { return fmt.Errorf("ensure lakekeeper cr: %w", err) } @@ -335,6 +319,54 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore return nil } +// buildCRSpec assembles the desired Lakekeeper CR spec for an org. 
Shared by +// EnsureForOrg (initial provisioning) and EnsureCRSpec (drift correction) so the +// two never diverge — every field the operator renders into the Deployment is +// defined in exactly one place. +func (p *LakekeeperProvisioner) buildCRSpec(w *configstore.ManagedWarehouse, in ProvisioningInputs) LakekeeperCRSpec { + dbName := lakekeeperDBName(w.OrgID) + if in.PGPreProvisioned { + dbName = in.PGDatabase + } + pgPort := in.PGPort + if pgPort == 0 { + pgPort = 5432 + } + resourceName := LakekeeperResourceName(w.OrgID) + return LakekeeperCRSpec{ + OrgID: w.OrgID, + Image: p.image, + Replicas: 1, + PGHost: in.PGHost, + PGPort: pgPort, + PGDatabase: dbName, + SecretName: resourceName, + BaseURI: fmt.Sprintf("http://%s.%s.svc:8181", resourceName, p.k8s.namespace), + PGSSLMode: in.PGSSLMode, + ServiceAccountName: LakekeeperServiceAccountName(w.OrgID), + KubernetesAuthAudiences: in.KubernetesAuthAudiences, + } +} + +// EnsureCRSpec re-applies only the Lakekeeper CR spec for an org, skipping the +// database / Secret / REST-warehouse pipeline. It's the drift-correction path +// for already-provisioned orgs: when the desired CR shape changes (resources, +// podMetadata, image, ...), the controller's Ready loop calls this so existing +// CRs converge without re-running full provisioning. Idempotent — EnsureCR does +// create-or-update and preserves the operator-owned status. Field validation is +// delegated to EnsureCR (OrgID/Image/PGHost/PGDatabase/SecretName required), so +// a momentarily-incomplete inputs resolution surfaces as a logged error and a +// skipped tick rather than a partial write. 
+func (p *LakekeeperProvisioner) EnsureCRSpec(ctx context.Context, w *configstore.ManagedWarehouse, in ProvisioningInputs) error { + if w == nil { + return errors.New("EnsureCRSpec: warehouse is nil") + } + if !isValidOrgIDLabel(w.OrgID) { + return fmt.Errorf("EnsureCRSpec: orgID %q is not a valid K8s label value", w.OrgID) + } + return p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in)) +} + // DeleteForOrg tears down the per-org Lakekeeper instance that EnsureForOrg // created: the CR (which cascades to the operator-managed Deployment, Service, // and migration Job via ownerReferences) plus the standalone Secret and