Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,27 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man
if w.Iceberg.ResolvedBackend() != configstore.IcebergBackendLakekeeper {
return
}
log := slog.With("org", w.OrgID, "phase", "lakekeeper")

if w.Iceberg.LakekeeperEndpoint != "" {
// Already provisioned. Future drift correction (e.g. image bumps)
// will be handled by the operator's reconcile loop reading the CR
// spec on every tick; nothing to do here.
// Already provisioned. Re-apply just the CR spec so changes to the
// desired shape (resources, podMetadata, image, ...) converge onto the
// existing CR. Cheap + idempotent: skips the DB/Secret/REST pipeline;
// the operator only rolls the Deployment when the spec actually changes.
// (The operator can't add fields that aren't in the CR, so a spec change
// in the provisioner must be written back here — it won't appear on its
// own.)
inputs, err := c.lakekeeperInputs(ctx, w)
if err != nil {
log.Warn("Failed to resolve lakekeeper inputs for CR drift correction.", "error", err)
return
}
if err := c.lakekeeperProvisioner.EnsureCRSpec(ctx, w, inputs); err != nil {
log.Warn("Lakekeeper CR drift correction failed.", "error", err)
}
return
}

log := slog.With("org", w.OrgID, "phase", "lakekeeper")

inputs, err := c.lakekeeperInputs(ctx, w)
if err != nil {
log.Warn("Failed to resolve lakekeeper provisioning inputs.", "error", err)
Expand Down
42 changes: 33 additions & 9 deletions controlplane/provisioner/controller_lakekeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,15 @@ func TestReconcileLakekeeper_SkipsWhenIcebergDisabled(t *testing.T) {
}
}

func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) {
func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) {
// An already-provisioned org (LakekeeperEndpoint set) must NOT be skipped:
// the Ready loop re-applies just the CR spec so changes to the desired shape
// (resources, podMetadata, image, ...) converge onto the existing CR. The
// full DB/Secret/REST pipeline must NOT run — only EnsureCR.
called := false
k8sClient, dyn, _ := newFakeLakekeeperClient()
p := NewLakekeeperProvisioner(newFakeStore(), k8sClient, WithImage("drift:test"))

store := newFakeStore()
store.warehouses["acme"] = &configstore.ManagedWarehouse{
OrgID: "acme",
Expand All @@ -119,15 +126,32 @@ func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) {
LakekeeperEndpoint: "http://lk-acme.lakekeeper.svc:8181/catalog",
},
}
c := NewControllerWithClient(store, nil, 0)
c.lakekeeperProvisioner = &LakekeeperProvisioner{}
c.lakekeeperInputs = func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) {
called = true
return ProvisioningInputs{}, nil
}
c := NewControllerWithClient(store, nil, 0).
WithLakekeeperProvisioner(p, func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) {
called = true
// S3 deliberately unset: drift correction re-applies only the CR
// spec, so it must not depend on S3/REST inputs.
return ProvisioningInputs{PGHost: "acme-pg", PGPort: 5432, PGSSLMode: "require"}, nil
})

c.reconcileLakekeeper(context.Background(), store.warehouses["acme"])
if called {
t.Errorf("inputs resolver should not be called when LakekeeperEndpoint already set")

if !called {
t.Fatalf("inputs resolver should be called for CR drift correction when already provisioned")
}
// EnsureCRSpec create-or-updated the CR with the desired spec, without the
// DB/Secret/REST pipeline running.
got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), LakekeeperResourceName("acme"), metav1.GetOptions{})
if err != nil {
t.Fatalf("CR not applied by drift correction: %v", err)
}
if spec := got.Object["spec"].(map[string]interface{}); spec["image"] != "drift:test" {
t.Errorf("CR image = %v, want drift:test", spec["image"])
}
// The warehouse row stays untouched — drift correction never re-runs the
// pipeline, so it doesn't rewrite the persisted warehouse name.
if store.warehouses["acme"].Iceberg.LakekeeperWarehouse != "" {
t.Errorf("drift correction unexpectedly wrote the warehouse row")
}
}

Expand Down
66 changes: 49 additions & 17 deletions controlplane/provisioner/lakekeeper_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,23 +233,7 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
}

// 3. Apply the Lakekeeper CR pointing at the org's PG + the Secret.
pgPort := in.PGPort
if pgPort == 0 {
pgPort = 5432
}
if err := p.k8s.EnsureCR(ctx, LakekeeperCRSpec{
OrgID: w.OrgID,
Image: p.image,
Replicas: 1,
PGHost: in.PGHost,
PGPort: pgPort,
PGDatabase: dbName,
SecretName: secretName,
BaseURI: baseURL,
PGSSLMode: in.PGSSLMode,
ServiceAccountName: LakekeeperServiceAccountName(w.OrgID),
KubernetesAuthAudiences: in.KubernetesAuthAudiences,
}); err != nil {
if err := p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in)); err != nil {
return fmt.Errorf("ensure lakekeeper cr: %w", err)
}

Expand Down Expand Up @@ -335,6 +319,54 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
return nil
}

// buildCRSpec assembles the desired Lakekeeper CR spec for an org. Shared by
// EnsureForOrg (initial provisioning) and EnsureCRSpec (drift correction) so the
// two never diverge — every field the operator renders into the Deployment is
// defined in exactly one place.
func (p *LakekeeperProvisioner) buildCRSpec(w *configstore.ManagedWarehouse, in ProvisioningInputs) LakekeeperCRSpec {
dbName := lakekeeperDBName(w.OrgID)
if in.PGPreProvisioned {
dbName = in.PGDatabase
}
pgPort := in.PGPort
if pgPort == 0 {
pgPort = 5432
}
resourceName := LakekeeperResourceName(w.OrgID)
return LakekeeperCRSpec{
OrgID: w.OrgID,
Image: p.image,
Replicas: 1,
PGHost: in.PGHost,
PGPort: pgPort,
PGDatabase: dbName,
SecretName: resourceName,
BaseURI: fmt.Sprintf("http://%s.%s.svc:8181", resourceName, p.k8s.namespace),
PGSSLMode: in.PGSSLMode,
ServiceAccountName: LakekeeperServiceAccountName(w.OrgID),
KubernetesAuthAudiences: in.KubernetesAuthAudiences,
}
}

// EnsureCRSpec re-applies only the Lakekeeper CR spec for an org, skipping the
// database / Secret / REST-warehouse pipeline. It's the drift-correction path
// for already-provisioned orgs: when the desired CR shape changes (resources,
// podMetadata, image, ...), the controller's Ready loop calls this so existing
// CRs converge without re-running full provisioning. Idempotent — EnsureCR does
// create-or-update and preserves the operator-owned status. Field validation is
// delegated to EnsureCR (OrgID/Image/PGHost/PGDatabase/SecretName required), so
// a momentarily-incomplete inputs resolution surfaces as a logged error and a
// skipped tick rather than a partial write.
func (p *LakekeeperProvisioner) EnsureCRSpec(ctx context.Context, w *configstore.ManagedWarehouse, in ProvisioningInputs) error {
if w == nil {
return errors.New("EnsureCRSpec: warehouse is nil")
}
if !isValidOrgIDLabel(w.OrgID) {
return fmt.Errorf("EnsureCRSpec: orgID %q is not a valid K8s label value", w.OrgID)
}
return p.k8s.EnsureCR(ctx, p.buildCRSpec(w, in))
}

// DeleteForOrg tears down the per-org Lakekeeper instance that EnsureForOrg
// created: the CR (which cascades to the operator-managed Deployment, Service,
// and migration Job via ownerReferences) plus the standalone Secret and
Expand Down
Loading