diff --git a/internal/controller/clickhouse/commands.go b/internal/controller/clickhouse/commands.go index e3643b2..11cb03a 100644 --- a/internal/controller/clickhouse/commands.go +++ b/internal/controller/clickhouse/commands.go @@ -7,7 +7,6 @@ import ( "fmt" "net" "strconv" - "sync" "github.com/ClickHouse/clickhouse-go/v2" "github.com/google/uuid" @@ -57,14 +56,16 @@ type commander struct { auth clickhouse.Auth dialer controllerutil.DialContextFunc - lock sync.RWMutex - conns map[v1.ClickHouseReplicaID]clickhouse.Conn + entry *connCacheEntry } -func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret, dialer controllerutil.DialContextFunc) *commander { +func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret, dialer controllerutil.DialContextFunc, cache *connCache) *commander { + log = log.Named("commander") + credHash, _ := controllerutil.DeepHashObject(secret.Data[SecretKeyManagementPassword]) + return &commander{ - log: log.Named("commander"), - conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}, + log: log, + entry: cache.Get(cluster.NamespacedName(), credHash, log), cluster: cluster, dialer: dialer, auth: clickhouse.Auth{ @@ -74,19 +75,6 @@ func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secr } } -func (cmd *commander) Close() { - cmd.lock.Lock() - defer cmd.lock.Unlock() - - for id, conn := range cmd.conns { - if err := conn.Close(); err != nil { - cmd.log.Warn("error closing connection", "error", err, "replica_id", id) - } - } - - cmd.conns = map[v1.ClickHouseReplicaID]clickhouse.Conn{} -} - type replicaProbe struct { Version string ReloadConfigRevision string @@ -474,53 +462,28 @@ func (cmd *commander) CleanupDatabaseReplicas( } func (cmd *commander) getConn(id v1.ClickHouseReplicaID) (clickhouse.Conn, error) { - cmd.lock.RLock() - conn, ok := cmd.conns[id] - cmd.lock.RUnlock() - - if ok { - return conn, nil - } - - cmd.lock.Lock() - defer cmd.lock.Unlock() + return cmd.entry.Conn(id, func() (clickhouse.Conn, error) { + cmd.log.Debug("creating new ClickHouse connection", "replica_id", id) + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{net.JoinHostPort(cmd.cluster.HostnameByID(id), strconv.FormatInt(int64(PortManagement), 10))}, + Auth: cmd.auth, + DialContext: cmd.dialer, + Debugf: func(format string, args ...any) { + cmd.log.Debug(fmt.Sprintf(format, args...)) + }, + }) + if err != nil { + cmd.log.Error(err, "failed to open ClickHouse connection", "replica_id", id) + return nil, fmt.Errorf("open ClickHouse connection: %w", err) + } - // Check if another goroutine created the connection while we were waiting for the lock - if conn, ok := cmd.conns[id]; ok { return conn, nil - } - - cmd.log.Debug("creating new ClickHouse connection", "replica_id", id) - - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{net.JoinHostPort(cmd.cluster.HostnameByID(id), strconv.FormatInt(int64(PortManagement), 10))}, - Auth: cmd.auth, - DialContext: cmd.dialer, - Debugf: func(format string, args ...any) { - cmd.log.Debug(fmt.Sprintf(format, args...)) - }, }) - if err != nil { - cmd.log.Error(err, "failed to open ClickHouse connection", "replica_id", id) - return nil, fmt.Errorf("open ClickHouse connection: %w", err) - } - - cmd.conns[id] = conn - - return conn, nil } func (cmd *commander) getAnyConn(ctx context.Context) (v1.ClickHouseReplicaID, clickhouse.Conn, error) { - cmd.lock.RLock() - defer cmd.lock.RUnlock() - - for id, conn := range cmd.conns { - if conn.Ping(ctx) == nil { - return id, conn, nil - } - } - - return v1.ClickHouseReplicaID{}, nil, errors.New("no available connections") + return cmd.entry.AnyConn(ctx) } func (cmd *commander) ensureReplicaDefaultDatabaseEngine(ctx context.Context, log controllerutil.Logger, id v1.ClickHouseReplicaID) error { diff --git a/internal/controller/clickhouse/commands_test.go b/internal/controller/clickhouse/commands_test.go index 39a18a2..f48127b 100644 --- a/internal/controller/clickhouse/commands_test.go +++ b/internal/controller/clickhouse/commands_test.go @@ -9,7 +9,6 @@ import ( "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/go-logr/zapr" "github.com/google/uuid" "github.com/moby/moby/api/types/container" @@ -18,6 +17,7 @@ import ( "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/network" "github.com/testcontainers/testcontainers-go/wait" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -32,7 +32,6 @@ const ( keeperHostname = "test-keeper" clickhouseHostnameFormat = "test-clickhouse-0-%d-0" testPassword = "test-password" - testUsername = "operator" keeperImage = "clickhouse/clickhouse-keeper:26.5" clickhouseImage = "clickhouse/clickhouse-server:26.5" testConfigRevision = "test-revision-v1" @@ -150,7 +149,17 @@ var _ = Describe("commander", Ordered, Label("integration"), func() { chPort := strconv.FormatInt(PortNative, 10) + "/tcp" chHTTPPort := strconv.FormatInt(PortHTTP, 10) + "/tcp" - conns := map[v1.ClickHouseReplicaID]clickhouse.Conn{} + hostTargets := map[string]string{} + cluster := v1.ClickHouseCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1.ClickHouseClusterSpec{ + Shards: new(int32(1)), + Replicas: new(testReplicas), + }, + } for i := range testReplicas { By(fmt.Sprintf("starting ClickHouse node %d", i)) @@ -194,36 +203,33 @@ var _ = Describe("commander", Ordered, Label("integration"), func() { port, err := ctr.MappedPort(ctx, chPort) Expect(err).NotTo(HaveOccurred()) - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{net.JoinHostPort(host, port.Port())}, - Auth: clickhouse.Auth{Username: testUsername, Password: testPassword}, - }) - Expect(err).NotTo(HaveOccurred()) - Expect(conn.Ping(ctx)).To(Succeed()) - - conns[v1.ClickHouseReplicaID{ShardID: 0, Index: i}] = conn + hostTargets[cluster.HostnameByID(v1.ClickHouseReplicaID{Index: i})] = net.JoinHostPort(host, port.Port()) } - logger := zap.NewRaw(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)) - logf.SetLogger(zapr.NewLogger(logger)) + zapLogger := zap.NewRaw(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)) + logf.SetLogger(zapr.NewLogger(zapLogger)) + logger := controllerutil.NewLogger(zapLogger) - cmd = &commander{ - log: controllerutil.NewLogger(logger), - cluster: &v1.ClickHouseCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: v1.ClickHouseClusterSpec{ - Shards: new(int32(1)), - Replicas: new(testReplicas), - }, - }, - auth: clickhouse.Auth{Username: testUsername, Password: testPassword}, - conns: conns, + dialer := func(ctx context.Context, addr string) (net.Conn, error) { + host, _, err := net.SplitHostPort(addr) + if err != nil { + return nil, fmt.Errorf("split addr %q: %w", addr, err) + } + + target, ok := hostTargets[host] + if !ok { + return nil, fmt.Errorf("no test container for host %q", host) + } + + return (&net.Dialer{}).DialContext(ctx, "tcp", target) } + + secret := &corev1.Secret{Data: map[string][]byte{SecretKeyManagementPassword: []byte(testPassword)}} + cache := newConnCache() + cmd = newCommander(logger, &cluster, secret, dialer, cache) + DeferCleanup(func() { - cmd.Close() + cache.Close(logger) }) }) @@ -261,8 +267,9 @@ var _ = Describe("commander", Ordered, Label("integration"), func() { id0 := v1.ClickHouseReplicaID{ShardID: 0, Index: 0} dbUUID := uuid.New().String() q := `CREATE DATABASE testdb UUID '%s' ENGINE=Replicated('/clickhouse/databases/testdb', '{shard}', '{replica}')` - err := cmd.conns[id0].Exec(ctx, fmt.Sprintf(q, dbUUID)) + conn, err := cmd.getConn(id0) Expect(err).NotTo(HaveOccurred()) + Expect(conn.Exec(ctx, fmt.Sprintf(q, dbUUID))).To(Succeed()) dbs, err := cmd.Databases(ctx, id0) Expect(err).NotTo(HaveOccurred()) @@ -286,7 +293,8 @@ var _ = Describe("commander", Ordered, Label("integration"), func() { It("should sync all replicas in shard", func(ctx context.Context) { By("creating test tables") - conn := cmd.conns[v1.ClickHouseReplicaID{ShardID: 0, Index: 0}] + conn, err := cmd.getConn(v1.ClickHouseReplicaID{ShardID: 0, Index: 0}) + Expect(err).NotTo(HaveOccurred()) Expect(conn.Exec(ctx, `CREATE TABLE testdb.test (id UInt64) ENGINE=ReplicatedMergeTree ORDER BY id`)).To(Succeed()) Expect(conn.Exec(ctx, `INSERT INTO testdb.test SELECT number FROM numbers(10)`)).To(Succeed()) @@ -301,9 +309,12 @@ var _ = Describe("commander", Ordered, Label("integration"), func() { }, "5s", "100ms").To(Succeed()) var count uint64 - for id, conn := range cmd.conns { + for id := range cmd.cluster.ReplicaIDs() { By(fmt.Sprintf("verifying data is replicated to replica %d", id.Index)) + conn, err := cmd.getConn(id) + Expect(err).NotTo(HaveOccurred()) + row := conn.QueryRow(ctx, `SELECT count() FROM testdb.test`) Expect(row.Err()).ToNot(HaveOccurred()) Expect(row.Scan(&count)).To(Succeed()) diff --git a/internal/controller/clickhouse/conncache.go b/internal/controller/clickhouse/conncache.go new file mode 100644 index 0000000..0098bcd --- /dev/null +++ b/internal/controller/clickhouse/conncache.go @@ -0,0 +1,132 @@ +package clickhouse + +import ( + "context" + "errors" + "maps" + "sync" + + "github.com/ClickHouse/clickhouse-go/v2" + "k8s.io/apimachinery/pkg/types" + + v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1" + "github.com/ClickHouse/clickhouse-operator/internal/controllerutil" +) + +type connCache struct { + mu sync.Mutex + entries map[types.NamespacedName]*connCacheEntry +} + +type connCacheEntry struct { + mu sync.Mutex + credHash string + conns map[v1.ClickHouseReplicaID]clickhouse.Conn +} + +func newConnCache() *connCache { + return &connCache{entries: map[types.NamespacedName]*connCacheEntry{}} +} + +func (c *connCache) Get(key types.NamespacedName, credHash string, log controllerutil.Logger) *connCacheEntry { + if c == nil { // no pool wired (tests) + return &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}} + } + + c.mu.Lock() + defer c.mu.Unlock() + + e, ok := c.entries[key] + if !ok { + e = &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}} + c.entries[key] = e + + return e + } + + if e.credHash == credHash { + return e + } + + e.Close(log) + e = &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}} + c.entries[key] = e + + return e +} + +func (c *connCache) Evict(key types.NamespacedName, log controllerutil.Logger) { + if c == nil { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + if e, ok := c.entries[key]; ok { + e.Close(log) + delete(c.entries, key) + } +} + +func (c *connCache) Close(log controllerutil.Logger) { + if c == nil { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + for key, e := range c.entries { + e.Close(log) + delete(c.entries, key) + } +} + +func (e *connCacheEntry) Conn(id v1.ClickHouseReplicaID, dial func() (clickhouse.Conn, error)) (clickhouse.Conn, error) { + e.mu.Lock() + defer e.mu.Unlock() + + if conn, ok := e.conns[id]; ok { + return conn, nil + } + + conn, err := dial() + if err != nil { + return nil, err + } + + e.conns[id] = conn + + return conn, nil +} + +func (e *connCacheEntry) AnyConn(ctx context.Context) (v1.ClickHouseReplicaID, clickhouse.Conn, error) { + conns := func() map[v1.ClickHouseReplicaID]clickhouse.Conn { + e.mu.Lock() + defer e.mu.Unlock() + + return maps.Clone(e.conns) + }() + + for id, conn := range conns { + if conn.Ping(ctx) == nil { + return id, conn, nil + } + } + + return v1.ClickHouseReplicaID{}, nil, errors.New("no available connections") +} + +func (e *connCacheEntry) Close(log controllerutil.Logger) { + e.mu.Lock() + defer e.mu.Unlock() + + for id, conn := range e.conns { + if err := conn.Close(); err != nil { + log.Warn("error closing pooled connection", "error", err, "replica_id", id) + } + } + + clear(e.conns) +} diff --git a/internal/controller/clickhouse/controller.go b/internal/controller/clickhouse/controller.go index 388d966..22253e6 100644 --- a/internal/controller/clickhouse/controller.go +++ b/internal/controller/clickhouse/controller.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -38,6 +39,7 @@ type ClusterController struct { Checker *upgrade.Checker Dialer controllerutil.DialContextFunc EnablePDB bool + connCache *connCache } const keeperClusterReferenceField = "clickhouse.com/keeperClusterReference" @@ -72,6 +74,8 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c if err != nil { if errors.IsNotFound(err) { cc.Logger.Info("clickhouse cluster not found") + cc.connCache.Evict(req.NamespacedName, cc.Logger) + return ctrl.Result{}, nil } @@ -117,6 +121,7 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c Dialer: cc.Dialer, Checker: cc.Checker, EnablePDB: cc.EnablePDB, + connCache: cc.connCache, Cluster: cluster, ReplicaState: map[v1.ClickHouseReplicaID]replicaState{}, @@ -165,6 +170,7 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr Checker: checker, Dialer: dialer, EnablePDB: enablePDB, + connCache: newConnCache(), } if err := mgr.GetFieldIndexer().IndexField(context.Background(), &v1.ClickHouseCluster{}, keeperClusterReferenceField, func(obj client.Object) []string { @@ -202,6 +208,16 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr return fmt.Errorf("setup ClickHouse controller: %w", err) } + // Close pooled connections on manager stop so ClickHouse sees a graceful close. + if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { + <-ctx.Done() + clickhouseController.connCache.Close(namedLogger) + + return nil + })); err != nil { + return fmt.Errorf("close cached ClickHouse connections: %w", err) + } + return nil } diff --git a/internal/controller/clickhouse/sync.go b/internal/controller/clickhouse/sync.go index ed2efcf..8dd01a8 100644 --- a/internal/controller/clickhouse/sync.go +++ b/internal/controller/clickhouse/sync.go @@ -88,6 +88,7 @@ type clickhouseReconciler struct { Dialer ctrlutil.DialContextFunc Checker *upgrade.Checker EnablePDB bool + connCache *connCache Cluster *v1.ClickHouseCluster ReplicaState map[v1.ClickHouseReplicaID]replicaState @@ -119,12 +120,6 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c v1.ClickHouseConditionTypeSchemaInSync, }) - defer func() { - if r.commander != nil { - r.commander.Close() - } - }() - steps := []chctrl.ReconcileStep{ {Name: "VersionProbe", Fn: r.reconcileVersionProbe, Always: true}, {Name: "Service", Fn: r.reconcileService, Always: true}, @@ -252,13 +247,13 @@ func (r *clickhouseReconciler) reconcileClusterSecret(ctx context.Context, log c secretExists = false } else { - r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer) + r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer, r.connCache) } if !r.versionProbe.Completed() { log.Info("version probe is not completed yet, skipping cluster secret templating") - return chctrl.StepBlocked(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepBlocked(chctrl.RequeueProbePoll), nil } var isSecretUpdated bool @@ -274,7 +269,7 @@ func (r *clickhouseReconciler) reconcileClusterSecret(ctx context.Context, log c } // Create or recreate commander with new credentials - r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer) + r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer, r.connCache) if !secretExists { log.Info("cluster secret not found, creating", "secret", r.Cluster.SecretName()) @@ -335,12 +330,12 @@ func (r *clickhouseReconciler) reconcileExternalSecret(ctx context.Context, log r.secret.Data = make(map[string][]byte) } - r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer) + r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer, r.connCache) if !r.versionProbe.Completed() { log.Info("version probe is not completed yet, skipping external secret validation") - return chctrl.StepBlocked(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepBlocked(chctrl.RequeueProbePoll), nil } var missingKeys []int @@ -388,7 +383,7 @@ func (r *clickhouseReconciler) reconcileExternalSecret(ctx context.Context, log return chctrl.StepResult{}, fmt.Errorf("fill external secret %q: %w", r.Cluster.SecretName(), err) } - r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer) + r.commander = newCommander(log, r.Cluster, &r.secret, r.Dialer, r.connCache) r.SetCondition(metav1.Condition{ Type: v1.ClickHouseConditionTypeExternalSecretValid, @@ -618,7 +613,7 @@ func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, lo if !r.versionProbe.Completed() { log.Info("version probe is not completed yet, waiting") - return chctrl.StepBlocked(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepBlocked(chctrl.RequeueProbePoll), nil } cfgRev, err := getConfigurationRevisions(r) @@ -732,7 +727,7 @@ func (r *clickhouseReconciler) reconcileReplicaResources(ctx context.Context, lo case chctrl.StageNotReadyUpToDate, chctrl.StageUpdating: log.Info("waiting for updated replicas to become ready", "replicas", replicasInStatus, "priority", highestStage.String()) - requeueAfter = chctrl.RequeueOnRefreshTimeout + requeueAfter = chctrl.RequeueProbePoll case chctrl.StageHasDiff: // Leave one replica to rolling update. replicasInStatus must not be empty. // Prefer replicas with higher id. @@ -745,7 +740,7 @@ func (r *clickhouseReconciler) reconcileReplicaResources(ctx context.Context, lo log.Info(fmt.Sprintf("updating chosen replica %v with priority %s: %v", chosenReplica, highestStage.String(), replicasInStatus)) - requeueAfter = chctrl.RequeueOnRefreshTimeout + requeueAfter = chctrl.RequeueProbePoll replicasInStatus = []v1.ClickHouseReplicaID{chosenReplica} case chctrl.StageNotExists, chctrl.StageError: @@ -856,7 +851,7 @@ func (r *clickhouseReconciler) reconcileDatabaseSync(ctx context.Context, log ct Message: "Some databases are not created on all replicas", }) - return chctrl.StepRequeue(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepRequeue(chctrl.RequeueProbePoll), nil case !staleReplicasCleanedUp: r.SetCondition(metav1.Condition{ @@ -866,7 +861,7 @@ func (r *clickhouseReconciler) reconcileDatabaseSync(ctx context.Context, log ct Message: "Some stale replicas are not cleaned up", }) - return chctrl.StepRequeue(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepRequeue(chctrl.RequeueProbePoll), nil default: r.SetCondition(metav1.Condition{ diff --git a/internal/controller/constants.go b/internal/controller/constants.go index 259864c..cdfa6a5 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -8,6 +8,7 @@ import ( const ( RequeueOnRefreshTimeout = time.Second + RequeueProbePoll = 5 * time.Second LoadReplicaStateTimeout = 10 * time.Second WarningsPollInterval = 30 * time.Second TLSFileMode int32 = 0444 diff --git a/internal/controller/keeper/sync.go b/internal/controller/keeper/sync.go index 2e5196e..674c9d9 100644 --- a/internal/controller/keeper/sync.go +++ b/internal/controller/keeper/sync.go @@ -299,7 +299,7 @@ func (r *keeperReconciler) reconcileActiveReplicaStatus(ctx context.Context, log r.evaluateReplicaConditions() if !r.HorizontalScaleAllowed { - return chctrl.StepRequeue(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepRequeue(chctrl.RequeueProbePoll), nil } return chctrl.StepContinue(), nil @@ -374,7 +374,7 @@ func (r *keeperReconciler) reconcileQuorumMembership(_ context.Context, log ctrl // For running cluster, allow quorum membership changes only in stable state. if !r.HorizontalScaleAllowed { log.Info("Delaying horizontal scaling, cluster is not in stable state") - return chctrl.StepRequeue(chctrl.RequeueOnRefreshTimeout), nil + return chctrl.StepRequeue(chctrl.RequeueProbePoll), nil } // Add single replica in quorum, allocating the first free id. @@ -481,7 +481,7 @@ func (r *keeperReconciler) reconcileReplicaResources(ctx context.Context, log ct case chctrl.StageNotReadyUpToDate, chctrl.StageUpdating: log.Info("waiting for updated replicas to become ready", "replicas", replicasInStatus, "priority", highestStage.String()) - requeueAfter = chctrl.RequeueOnRefreshTimeout + requeueAfter = chctrl.RequeueProbePoll case chctrl.StageHasDiff: // Leave one replica to rolling update. replicasInStatus must not be empty. // Prefer replicas with higher id. @@ -494,7 +494,7 @@ func (r *keeperReconciler) reconcileReplicaResources(ctx context.Context, log ct log.Info(fmt.Sprintf("updating chosen replica %d with priority %s: %v", chosenReplica, highestStage.String(), replicasInStatus)) - requeueAfter = chctrl.RequeueOnRefreshTimeout + requeueAfter = chctrl.RequeueProbePoll replicasInStatus = []v1.KeeperReplicaID{chosenReplica} case chctrl.StageNotExists, chctrl.StageError: diff --git a/internal/controller/resourcemanager.go b/internal/controller/resourcemanager.go index a435996..d0981d1 100644 --- a/internal/controller/resourcemanager.go +++ b/internal/controller/resourcemanager.go @@ -264,7 +264,7 @@ func (rm *ResourceManager) ReconcileReplicaResources( return nil, fmt.Errorf("create replica: %w", err) } - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } statefulSet.Spec.VolumeClaimTemplates = input.Existing.STS.Spec.VolumeClaimTemplates @@ -286,7 +286,7 @@ func (rm *ResourceManager) ReconcileReplicaResources( return nil, fmt.Errorf("recreate replica: %w", err) } - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } } @@ -294,7 +294,7 @@ func (rm *ResourceManager) ReconcileReplicaResources( log.Debug("StatefulSet is up to date", "statefulset", statefulSet.Name) if configChanged { - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } // Delete stuck pod if in error state so the StatefulSet controller can recreate it @@ -305,12 +305,12 @@ func (rm *ResourceManager) ReconcileReplicaResources( err = rm.ctrl.GetClient().Get(ctx, types.NamespacedName{Namespace: input.Existing.STS.Namespace, Name: podName}, pod) if err != nil { if k8serrors.IsNotFound(err) { - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } log.Warn("failed to get error pod", "pod", podName, "error", err) - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } if pod.Labels[appsv1.ControllerRevisionHashLabelKey] != input.Existing.STS.Status.UpdateRevision { @@ -320,7 +320,7 @@ func (rm *ResourceManager) ReconcileReplicaResources( log.Warn("failed to delete stuck pod", "pod", podName, "error", err) } - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } } @@ -340,7 +340,7 @@ func (rm *ResourceManager) ReconcileReplicaResources( return nil, fmt.Errorf("update replica: %w", err) } - return &ctrlruntime.Result{RequeueAfter: RequeueOnRefreshTimeout}, nil + return &ctrlruntime.Result{RequeueAfter: RequeueProbePoll}, nil } func diffFilter(specFields []string) gcmp.Option {