Skip to content

Commit 36ba3c3

Browse files
fix(reconcile): refactor PVC reconciliation, add conflict retries (#139)
1 parent 72ded3f commit 36ba3c3

11 files changed

Lines changed: 439 additions & 105 deletions

File tree

internal/controller/clickhouse/controller_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ import (
1212
corev1 "k8s.io/api/core/v1"
1313
policyv1 "k8s.io/api/policy/v1"
1414
"k8s.io/apimachinery/pkg/api/meta"
15+
"k8s.io/apimachinery/pkg/api/resource"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/types"
1718
"k8s.io/client-go/kubernetes/scheme"
1819
"k8s.io/client-go/tools/events"
1920
"k8s.io/utils/ptr"
2021
ctrl "sigs.k8s.io/controller-runtime"
22+
"sigs.k8s.io/controller-runtime/pkg/client"
2123

2224
v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1"
2325
"github.com/ClickHouse/clickhouse-operator/internal/controller/testutil"
@@ -339,4 +341,84 @@ var _ = When("reconciling ClickHouseCluster", Ordered, func() {
339341
Expect(suite.Client.List(ctx, &pdbs, listOpts)).To(Succeed())
340342
Expect(pdbs.Items).To(BeEmpty())
341343
})
344+
345+
It("should update all replica resources, but not proceed to the next if failed", func(ctx context.Context) {
346+
By("creating a new cluster with DataVolumeClaimSpec")
347+
348+
pvcCR := &v1.ClickHouseCluster{
349+
ObjectMeta: metav1.ObjectMeta{
350+
Name: "pvc-test",
351+
Namespace: "default",
352+
},
353+
Spec: v1.ClickHouseClusterSpec{
354+
Replicas: ptr.To[int32](2),
355+
Shards: ptr.To[int32](1),
356+
KeeperClusterRef: &corev1.LocalObjectReference{Name: keeperName},
357+
DataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
358+
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
359+
Resources: corev1.VolumeResourceRequirements{
360+
Requests: corev1.ResourceList{
361+
corev1.ResourceStorage: resource.MustParse("10Gi"),
362+
},
363+
},
364+
},
365+
},
366+
}
367+
Expect(suite.Client.Create(ctx, pvcCR)).To(Succeed())
368+
369+
By("reconcile to create all resources including STS with VolumeClaimTemplates")
370+
371+
_, err := controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
372+
Expect(err).NotTo(HaveOccurred())
373+
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
374+
testutil.AssertEvents(recorder.Events, map[string]int{
375+
"ClusterNotReady": 1,
376+
})
377+
378+
By("marking StatefulSets as ready")
379+
testutil.ReconcileStatefulSets(ctx, pvcCR, suite)
380+
381+
By("recording STS state before PVC change")
382+
383+
replicaID := v1.ClickHouseReplicaID{ShardID: 0, Index: 1}
384+
stsName := pvcCR.StatefulSetNameByReplicaID(replicaID)
385+
386+
var sts appsv1.StatefulSet
387+
Expect(suite.Client.Get(ctx, types.NamespacedName{Namespace: pvcCR.Namespace, Name: stsName}, &sts)).To(Succeed())
388+
389+
By("make STS and PVC changes")
390+
391+
pvcCR.Spec.DataVolumeClaimSpec.StorageClassName = new("changed")
392+
pvcCR.Spec.ContainerTemplate.Image = v1.ContainerImage{Tag: "changed"}
393+
Expect(suite.Client.Update(ctx, pvcCR)).To(Succeed())
394+
395+
By("reconcile updated CR")
396+
397+
_, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
398+
Expect(err).NotTo(HaveOccurred())
399+
testutil.AssertEvents(recorder.Events, map[string]int{
400+
"FailedUpdate": 1,
401+
})
402+
403+
By("ensuring STS updated")
404+
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
405+
Expect(suite.Client.Get(ctx, client.ObjectKeyFromObject(&sts), &sts)).To(Succeed())
406+
Expect(sts.Annotations[controllerutil.AnnotationSpecHash]).To(Equal(pvcCR.Status.StatefulSetRevision))
407+
408+
By("retry reconcile")
409+
410+
_, err = controller.Reconcile(ctx, ctrl.Request{NamespacedName: pvcCR.NamespacedName()})
411+
Expect(err).NotTo(HaveOccurred())
412+
testutil.AssertEvents(recorder.Events, map[string]int{
413+
"FailedUpdate": 1,
414+
})
415+
416+
By("ensuring next replica not changed")
417+
Expect(suite.Client.Get(ctx, pvcCR.NamespacedName(), pvcCR)).To(Succeed())
418+
Expect(suite.Client.Get(ctx, types.NamespacedName{
419+
Namespace: pvcCR.Namespace,
420+
Name: pvcCR.StatefulSetNameByReplicaID(v1.ClickHouseReplicaID{ShardID: 0, Index: 0}),
421+
}, &sts)).To(Succeed())
422+
Expect(sts.Annotations[controllerutil.AnnotationSpecHash]).ToNot(Equal(pvcCR.Status.StatefulSetRevision))
423+
})
342424
})

internal/controller/clickhouse/sync.go

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func compareReplicaID(a, b v1.ClickHouseReplicaID) int {
3434
type replicaState struct {
3535
Error bool `json:"error"`
3636
StatefulSet *appsv1.StatefulSet
37+
PVC *corev1.PersistentVolumeClaim
3738
Pinged bool
3839
Version string
3940
}
@@ -55,20 +56,30 @@ func (r replicaState) Ready() bool {
5556
return r.Pinged && r.StatefulSet.Status.ReadyReplicas == 1 // Not reliable, but allows to wait until pod is `green`
5657
}
5758

58-
func (r replicaState) HasStatefulSetDiff(rec *clickhouseReconciler) bool {
59+
func (r replicaState) HasDiff(rec *clickhouseReconciler) bool {
5960
if r.StatefulSet == nil {
6061
return true
6162
}
6263

63-
return ctrlutil.GetSpecHashFromObject(r.StatefulSet) != rec.Cluster.Status.StatefulSetRevision
64-
}
64+
if ctrlutil.GetSpecHashFromObject(r.StatefulSet) != rec.Cluster.Status.StatefulSetRevision {
65+
return true
66+
}
6567

66-
func (r replicaState) HasConfigMapDiff(rec *clickhouseReconciler) bool {
67-
if r.StatefulSet == nil {
68+
if ctrlutil.GetConfigHashFromObject(r.StatefulSet) != rec.Cluster.Status.ConfigurationRevision {
6869
return true
6970
}
7071

71-
return ctrlutil.GetConfigHashFromObject(r.StatefulSet) != rec.Cluster.Status.ConfigurationRevision
72+
if rec.Cluster.Spec.DataVolumeClaimSpec != nil {
73+
if r.PVC == nil {
74+
return true
75+
}
76+
77+
if ctrlutil.GetSpecHashFromObject(r.PVC) != rec.pvcRevision {
78+
return true
79+
}
80+
}
81+
82+
return false
7283
}
7384

7485
func (r replicaState) UpdateStage(rec *clickhouseReconciler) chctrl.ReplicaUpdateStage {
@@ -84,7 +95,7 @@ func (r replicaState) UpdateStage(rec *clickhouseReconciler) chctrl.ReplicaUpdat
8495
return chctrl.StageUpdating
8596
}
8697

87-
if r.HasConfigMapDiff(rec) || r.HasStatefulSetDiff(rec) {
98+
if r.HasDiff(rec) {
8899
return chctrl.StageHasDiff
89100
}
90101

@@ -109,6 +120,7 @@ type clickhouseReconciler struct {
109120
versionProbe chctrl.VersionProbeResult
110121
databasesInSync bool
111122
staleReplicasCleanedUp bool
123+
pvcRevision string
112124
}
113125

114126
type reconcileFunc func(context.Context, ctrlutil.Logger) (*ctrl.Result, error)
@@ -316,6 +328,13 @@ func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, lo
316328
log.Debug(fmt.Sprintf("observed new StatefulSet revision %q", stsRevision))
317329
}
318330

331+
if r.Cluster.Spec.DataVolumeClaimSpec != nil {
332+
r.pvcRevision, err = ctrlutil.DeepHashObject(r.Cluster.Spec.DataVolumeClaimSpec)
333+
if err != nil {
334+
return nil, fmt.Errorf("get PVC revision: %w", err)
335+
}
336+
}
337+
319338
probeResult, err := r.VersionProbe(ctx, log, chctrl.VersionProbeConfig{
320339
Binary: "clickhouse-server",
321340
Labels: r.Cluster.Spec.Labels,
@@ -370,10 +389,19 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context,
370389
}
371390
}
372391

392+
var pvc *corev1.PersistentVolumeClaim
393+
if r.Cluster.Spec.DataVolumeClaimSpec != nil {
394+
pvc, err = r.GetPVCByStatefulSet(ctx, log.With("replica_id", id), &sts)
395+
if err != nil {
396+
log.Error(err, "failed to get PVC for replica", "replica_id", id)
397+
}
398+
}
399+
373400
log.Debug("load replica state done", "replica_id", id, "statefulset", sts.Name)
374401

375402
return id, replicaState{
376403
StatefulSet: &sts,
404+
PVC: pvc,
377405
Error: hasError,
378406
Pinged: pinged,
379407
Version: version,
@@ -686,7 +714,7 @@ func (r *clickhouseReconciler) reconcileConditions(ctx context.Context, log ctrl
686714
hasReady = true
687715
}
688716

689-
if replica.HasConfigMapDiff(r) || replica.HasStatefulSetDiff(r) || !replica.Updated() {
717+
if replica.HasDiff(r) || !replica.Updated() {
690718
notUpdatedReplicas = append(notUpdatedReplicas, id)
691719
}
692720
}
@@ -812,15 +840,19 @@ func (r *clickhouseReconciler) updateReplica(ctx context.Context, log ctrlutil.L
812840

813841
replica := r.Replica(id)
814842

815-
result, err := r.ReconcileReplicaResources(ctx, log, id, chctrl.ReplicaUpdateInput{
816-
ExistingSTS: replica.StatefulSet,
817-
DesiredConfigMap: configMap,
818-
DesiredSTS: statefulSet,
819-
HasError: replica.Error,
843+
result, err := r.ReconcileReplicaResources(ctx, log, chctrl.ReplicaUpdateInput{
820844
ConfigurationRevision: r.Cluster.Status.ConfigurationRevision,
821-
StatefulSetRevision: r.Cluster.Status.StatefulSetRevision,
822-
BreakingSTSVersion: breakingStatefulSetVersion,
823-
DataVolumeClaimSpec: r.Cluster.Spec.DataVolumeClaimSpec,
845+
DesiredConfigMap: configMap,
846+
847+
StatefulSetRevision: r.Cluster.Status.StatefulSetRevision,
848+
ExistingSTS: replica.StatefulSet,
849+
DesiredSTS: statefulSet,
850+
HasError: replica.Error,
851+
BreakingSTSVersion: breakingStatefulSetVersion,
852+
853+
PVCRevision: r.pvcRevision,
854+
ExistingPVC: replica.PVC,
855+
DesiredPVCSpec: r.Cluster.Spec.DataVolumeClaimSpec,
824856
})
825857
if err != nil {
826858
return nil, fmt.Errorf("reconcile replica %s resources: %w", id, err)

internal/controller/clickhouse/templates.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ func getStatefulSetRevision(r *clickhouseReconciler) (string, error) {
168168
return "", fmt.Errorf("generate template StatefulSet: %w", err)
169169
}
170170

171+
sts.Spec.VolumeClaimTemplates = nil
172+
171173
hash, err := controllerutil.DeepHashObject(sts)
172174
if err != nil {
173175
return "", fmt.Errorf("hash template StatefulSet: %w", err)

internal/controller/clickhouse/templates_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
. "github.com/onsi/gomega"
99
corev1 "k8s.io/api/core/v1"
1010
policyv1 "k8s.io/api/policy/v1"
11+
"k8s.io/apimachinery/pkg/api/resource"
1112
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1213
"k8s.io/apimachinery/pkg/util/intstr"
1314
"k8s.io/utils/ptr"
@@ -300,6 +301,41 @@ var _ = Describe("PDB", func() {
300301
})
301302
})
302303

304+
var _ = Describe("getStatefulSetRevision", func() {
305+
It("should not depend on data disk spec", func() {
306+
r := clickhouseReconciler{
307+
reconcilerBase: reconcilerBase{
308+
Cluster: &v1.ClickHouseCluster{
309+
ObjectMeta: metav1.ObjectMeta{
310+
Name: "test",
311+
},
312+
Spec: v1.ClickHouseClusterSpec{
313+
Replicas: ptr.To[int32](1),
314+
DataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{
315+
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
316+
Resources: corev1.VolumeResourceRequirements{
317+
Requests: corev1.ResourceList{
318+
corev1.ResourceStorage: resource.MustParse("10Gi"),
319+
},
320+
},
321+
},
322+
},
323+
},
324+
},
325+
}
326+
327+
rev, err := getStatefulSetRevision(&r)
328+
Expect(err).ToNot(HaveOccurred())
329+
Expect(rev).ToNot(BeEmpty())
330+
331+
r.Cluster.Spec.DataVolumeClaimSpec.Resources.Requests[corev1.ResourceStorage] = resource.MustParse("20Gi")
332+
rev2, err := getStatefulSetRevision(&r)
333+
Expect(err).ToNot(HaveOccurred())
334+
335+
Expect(rev2).To(Equal(rev), "StatefulSet revision should not change when data disk spec changes")
336+
})
337+
})
338+
303339
func checkVolumeMounts(volumes []corev1.Volume, mounts []corev1.VolumeMount) {
304340
volumeMap := map[string]struct{}{
305341
internal.PersistentVolumeName: {},

0 commit comments

Comments
 (0)