diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index 88cb6954..b33be458 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -467,15 +467,27 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man if w.Iceberg.ResolvedBackend() != configstore.IcebergBackendLakekeeper { return } + log := slog.With("org", w.OrgID, "phase", "lakekeeper") + if w.Iceberg.LakekeeperEndpoint != "" { - // Already provisioned. Future drift correction (e.g. image bumps) - // will be handled by the operator's reconcile loop reading the CR - // spec on every tick; nothing to do here. + // Already provisioned. Re-apply just the CR spec so changes to the + // desired shape (resources, podMetadata, image, ...) converge onto the + // existing CR. Cheap + idempotent: skips the DB/Secret/REST pipeline; + // the operator only rolls the Deployment when the spec actually changes. + // (The operator can't add fields that aren't in the CR, so a spec change + // in the provisioner must be written back here — it won't appear on its + // own.) + inputs, err := c.lakekeeperInputs(ctx, w) + if err != nil { + log.Warn("Failed to resolve lakekeeper inputs for CR drift correction.", "error", err) + return + } + if err := c.lakekeeperProvisioner.EnsureCRSpec(ctx, w, inputs); err != nil { + log.Warn("Lakekeeper CR drift correction failed.", "error", err) + } return } - log := slog.With("org", w.OrgID, "phase", "lakekeeper") - inputs, err := c.lakekeeperInputs(ctx, w) if err != nil { log.Warn("Failed to resolve lakekeeper provisioning inputs.", "error", err) diff --git a/controlplane/provisioner/controller_lakekeeper_test.go b/controlplane/provisioner/controller_lakekeeper_test.go index ac149e9b..51c9a124 100644 --- a/controlplane/provisioner/controller_lakekeeper_test.go +++ b/controlplane/provisioner/controller_lakekeeper_test.go @@ -107,8 +107,15 @@ func TestReconcileLakekeeper_SkipsWhenIcebergDisabled(t *testing.T) { } } -func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) { +func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) { + // An already-provisioned org (LakekeeperEndpoint set) must NOT be skipped: + // the Ready loop re-applies just the CR spec so changes to the desired shape + // (resources, podMetadata, image, ...) converge onto the existing CR. The + // full DB/Secret/REST pipeline must NOT run — only EnsureCR. called := false + k8sClient, dyn, _ := newFakeLakekeeperClient() + p := NewLakekeeperProvisioner(newFakeStore(), k8sClient, WithImage("drift:test")) + store := newFakeStore() store.warehouses["acme"] = &configstore.ManagedWarehouse{ OrgID: "acme", @@ -119,15 +126,32 @@ func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) { LakekeeperEndpoint: "http://lk-acme.lakekeeper.svc:8181/catalog", }, } - c := NewControllerWithClient(store, nil, 0) - c.lakekeeperProvisioner = &LakekeeperProvisioner{} - c.lakekeeperInputs = func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) { - called = true - return ProvisioningInputs{}, nil - } + c := NewControllerWithClient(store, nil, 0). + WithLakekeeperProvisioner(p, func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) { + called = true + // S3 deliberately unset: drift correction re-applies only the CR + // spec, so it must not depend on S3/REST inputs. + return ProvisioningInputs{PGHost: "acme-pg", PGPort: 5432, PGSSLMode: "require"}, nil + }) + c.reconcileLakekeeper(context.Background(), store.warehouses["acme"]) - if called { - t.Errorf("inputs resolver should not be called when LakekeeperEndpoint already set") + + if !called { + t.Fatalf("inputs resolver should be called for CR drift correction when already provisioned") + } + // EnsureCRSpec create-or-updated the CR with the desired spec, without the + // DB/Secret/REST pipeline running. + got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), LakekeeperResourceName("acme"), metav1.GetOptions{}) + if err != nil { + t.Fatalf("CR not applied by drift correction: %v", err) + } + if spec := got.Object["spec"].(map[string]interface{}); spec["image"] != "drift:test" { + t.Errorf("CR image = %v, want drift:test", spec["image"]) + } + // The warehouse row stays untouched — drift correction never re-runs the + // pipeline, so it doesn't rewrite the persisted warehouse name. + if store.warehouses["acme"].Iceberg.LakekeeperWarehouse != "" { + t.Errorf("drift correction unexpectedly wrote the warehouse row") } } diff --git a/controlplane/provisioner/lakekeeper_provisioner.go b/controlplane/provisioner/lakekeeper_provisioner.go index 618fc66c..9550133b 100644 --- a/controlplane/provisioner/lakekeeper_provisioner.go +++ b/controlplane/provisioner/lakekeeper_provisioner.go @@ -233,23 +233,7 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore } // 3. Apply the Lakekeeper CR pointing at the org's PG + the Secret. - pgPort := in.PGPort - if pgPort == 0 { - pgPort = 5432 - } - if err := p.k8s.EnsureCR(ctx, LakekeeperCRSpec{ - OrgID: w.OrgID, - Image: p.image, - Replicas: 1, - PGHost: in.PGHost, - PGPort: pgPort, - PGDatabase: dbName, - SecretName: secretName, - BaseURI: baseURL, - PGSSLMode: in.PGSSLMode, - ServiceAccountName: LakekeeperServiceAccountName(w.OrgID), - KubernetesAuthAudiences: in.KubernetesAuthAudiences, - }); err != nil { + if err := p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in)); err != nil { return fmt.Errorf("ensure lakekeeper cr: %w", err) } @@ -335,6 +319,54 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore return nil } +// buildCRSpec assembles the desired Lakekeeper CR spec for an org. Shared by +// EnsureForOrg (initial provisioning) and EnsureCRSpec (drift correction) so the +// two never diverge — every field the operator renders into the Deployment is +// defined in exactly one place. +func (p *LakekeeperProvisioner) buildCRSpec(w *configstore.ManagedWarehouse, in ProvisioningInputs) LakekeeperCRSpec { + dbName := lakekeeperDBName(w.OrgID) + if in.PGPreProvisioned { + dbName = in.PGDatabase + } + pgPort := in.PGPort + if pgPort == 0 { + pgPort = 5432 + } + resourceName := LakekeeperResourceName(w.OrgID) + return LakekeeperCRSpec{ + OrgID: w.OrgID, + Image: p.image, + Replicas: 1, + PGHost: in.PGHost, + PGPort: pgPort, + PGDatabase: dbName, + SecretName: resourceName, + BaseURI: fmt.Sprintf("http://%s.%s.svc:8181", resourceName, p.k8s.namespace), + PGSSLMode: in.PGSSLMode, + ServiceAccountName: LakekeeperServiceAccountName(w.OrgID), + KubernetesAuthAudiences: in.KubernetesAuthAudiences, + } +} + +// EnsureCRSpec re-applies only the Lakekeeper CR spec for an org, skipping the +// database / Secret / REST-warehouse pipeline. It's the drift-correction path +// for already-provisioned orgs: when the desired CR shape changes (resources, +// podMetadata, image, ...), the controller's Ready loop calls this so existing +// CRs converge without re-running full provisioning. Idempotent — EnsureCR does +// create-or-update and preserves the operator-owned status. Field validation is +// delegated to EnsureCR (OrgID/Image/PGHost/PGDatabase/SecretName required), so +// a momentarily-incomplete inputs resolution surfaces as a logged error and a +// skipped tick rather than a partial write. +func (p *LakekeeperProvisioner) EnsureCRSpec(ctx context.Context, w *configstore.ManagedWarehouse, in ProvisioningInputs) error { + if w == nil { + return errors.New("EnsureCRSpec: warehouse is nil") + } + if !isValidOrgIDLabel(w.OrgID) { + return fmt.Errorf("EnsureCRSpec: orgID %q is not a valid K8s label value", w.OrgID) + } + return p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in)) +} + // DeleteForOrg tears down the per-org Lakekeeper instance that EnsureForOrg // created: the CR (which cascades to the operator-managed Deployment, Service, // and migration Job via ownerReferences) plus the standalone Secret and