Skip to content

Commit eee3ad1

Browse files
test(e2e): Run controller in e2e tests process, dump namespace objects and pod logs on failure.
1 parent 36ba3c3 commit eee3ad1

27 files changed

Lines changed: 815 additions & 513 deletions

.github/workflows/ci.yaml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,6 @@ jobs:
225225
- name: Go Mod
226226
run: go mod download
227227

228-
- name: Build image
229-
run: make docker-build IMG="clickhouse.com/clickhouse-operator:v0.0.1" BUILD_TIME=e2e
230-
231228
- name: Run compatibility e2e tests
232229
run: make test-compat-e2e
233230
env:
@@ -238,7 +235,7 @@ jobs:
238235
if: ${{ !cancelled() }}
239236
with:
240237
name: compat-e2e-report-${{ strategy.job-index }}
241-
path: "**/report/*.xml"
238+
path: "**/report/*"
242239
if-no-files-found: error
243240
overwrite: true
244241

@@ -261,9 +258,6 @@ jobs:
261258
- name: Go Mod
262259
run: go mod download
263260

264-
- name: Build image
265-
run: make docker-build IMG="clickhouse.com/clickhouse-operator:v0.0.1" BUILD_TIME=e2e
266-
267261
- name: Create k8s Kind Cluster
268262
uses: helm/kind-action@v1
269263
with:
@@ -280,7 +274,7 @@ jobs:
280274
if: ${{ !cancelled() }}
281275
with:
282276
name: e2e-report-${{ matrix.scope }}
283-
path: "**/report/*.xml"
277+
path: "**/report/*"
284278
if-no-files-found: error
285279
overwrite: true
286280

.golangci.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ linters:
103103
- "github.com/onsi/gomega"
104104
- name: package-comments
105105
disabled: true
106+
staticcheck:
107+
dot-import-whitelist:
108+
- "github.com/onsi/ginkgo/v2"
109+
- "github.com/onsi/gomega"
106110
gosec:
107111
excludes:
108112
- G204

cmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,11 +195,11 @@ func run() error {
195195
upgradeChecker = upgrade.NewChecker(updater)
196196
}
197197

198-
if err = keeper.SetupWithManager(mgr, zapLogger, upgradeChecker); err != nil {
198+
if err = keeper.SetupWithManager(mgr, zapLogger, upgradeChecker, nil); err != nil {
199199
return fmt.Errorf("unable to setup KeeperCluster controller: %w", err)
200200
}
201201

202-
if err = clickhouse.SetupWithManager(mgr, zapLogger, upgradeChecker); err != nil {
202+
if err = clickhouse.SetupWithManager(mgr, zapLogger, upgradeChecker, nil); err != nil {
203203
return fmt.Errorf("unable to setup ClickHouseCluster controller: %w", err)
204204
}
205205

internal/controller/clickhouse/commands.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"database/sql"
66
"errors"
77
"fmt"
8+
"net"
9+
"strconv"
810
"sync"
911

1012
"github.com/ClickHouse/clickhouse-go/v2"
@@ -53,16 +55,18 @@ type commander struct {
5355
log controllerutil.Logger
5456
cluster *v1.ClickHouseCluster
5557
auth clickhouse.Auth
58+
dialer controllerutil.DialContextFunc
5659

5760
lock sync.RWMutex
5861
conns map[v1.ClickHouseReplicaID]clickhouse.Conn
5962
}
6063

61-
func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret) *commander {
64+
func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret, dialer controllerutil.DialContextFunc) *commander {
6265
return &commander{
6366
log: log.Named("commander"),
6467
conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{},
6568
cluster: cluster,
69+
dialer: dialer,
6670
auth: clickhouse.Auth{
6771
Username: OperatorManagementUsername,
6872
Password: string(secret.Data[SecretKeyManagementPassword]),
@@ -409,8 +413,9 @@ func (cmd *commander) getConn(id v1.ClickHouseReplicaID) (clickhouse.Conn, error
409413
cmd.log.Debug("creating new ClickHouse connection", "replica_id", id)
410414

411415
conn, err := clickhouse.Open(&clickhouse.Options{
412-
Addr: []string{fmt.Sprintf("%s:%d", cmd.cluster.HostnameByID(id), PortManagement)},
413-
Auth: cmd.auth,
416+
Addr: []string{net.JoinHostPort(cmd.cluster.HostnameByID(id), strconv.FormatInt(int64(PortManagement), 10))},
417+
Auth: cmd.auth,
418+
DialContext: cmd.dialer,
414419
Debugf: func(format string, args ...any) {
415420
cmd.log.Debug(fmt.Sprintf(format, args...))
416421
},

internal/controller/clickhouse/controller.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
corev1 "k8s.io/api/core/v1"
1010
policyv1 "k8s.io/api/policy/v1"
1111
"k8s.io/apimachinery/pkg/api/errors"
12-
"k8s.io/apimachinery/pkg/api/meta"
1312
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1413
"k8s.io/apimachinery/pkg/runtime"
1514
"k8s.io/apimachinery/pkg/types"
@@ -37,6 +36,7 @@ type ClusterController struct {
3736
Logger controllerutil.Logger
3837
Webhook webhookv1.ClickHouseClusterWebhook
3938
Checker *upgrade.Checker
39+
Dialer controllerutil.DialContextFunc
4040
}
4141

4242
// +kubebuilder:rbac:groups=clickhouse.com,resources=clickhouseclusters,verbs=get;list;watch;create;update;patch;delete
@@ -75,7 +75,7 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c
7575
}
7676

7777
if _, err := cc.Webhook.ValidateCreate(ctx, cluster); err != nil {
78-
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
78+
chctrl.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
7979
Type: string(v1.ConditionTypeSpecValid),
8080
Status: metav1.ConditionFalse,
8181
Reason: string(v1.ConditionReasonSpecInvalid),
@@ -90,7 +90,7 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c
9090
return ctrl.Result{}, nil
9191
}
9292

93-
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
93+
chctrl.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
9494
Type: string(v1.ConditionTypeSpecValid),
9595
Status: metav1.ConditionTrue,
9696
Reason: string(v1.ConditionReasonSpecValid),
@@ -129,8 +129,13 @@ func (cc *ClusterController) GetVersionChecker() *upgrade.Checker {
129129
return cc.Checker
130130
}
131131

132+
// GetDialer returns the custom dialer, or nil.
133+
func (cc *ClusterController) GetDialer() controllerutil.DialContextFunc {
134+
return cc.Dialer
135+
}
136+
132137
// SetupWithManager sets up the controller with the Manager.
133-
func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgrade.Checker) error {
138+
func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgrade.Checker, dialer controllerutil.DialContextFunc) error {
134139
namedLogger := log.Named("clickhouse")
135140

136141
clickhouseController := &ClusterController{
@@ -140,6 +145,7 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr
140145
Logger: namedLogger,
141146
Webhook: webhookv1.ClickHouseClusterWebhook{Log: namedLogger},
142147
Checker: checker,
148+
Dialer: dialer,
143149
}
144150

145151
err := ctrl.NewControllerManagedBy(mgr).

internal/controller/clickhouse/controller_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package clickhouse
22

33
import (
44
"context"
5+
"errors"
6+
"net"
57
"testing"
68

79
. "github.com/onsi/ginkgo/v2"
@@ -75,6 +77,9 @@ var _ = When("reconciling ClickHouseCluster", Ordered, func() {
7577
Webhook: webhookv1.ClickHouseClusterWebhook{
7678
Log: suite.Log.Named("clickhouse-webhook"),
7779
},
80+
Dialer: func(context.Context, string) (net.Conn, error) {
81+
return nil, errors.New("disabled")
82+
},
7883
}
7984

8085
keeper := &v1.KeeperCluster{

internal/controller/clickhouse/sync.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ type clickhouseReconciler struct {
118118
commander *commander
119119

120120
versionProbe chctrl.VersionProbeResult
121+
readyReplicas int
121122
databasesInSync bool
122123
staleReplicasCleanedUp bool
123124
pvcRevision string
@@ -173,7 +174,7 @@ func (r *clickhouseReconciler) sync(ctx context.Context, log ctrlutil.Logger) (c
173174
unknownConditions = append(unknownConditions, r.NewCondition(cond, metav1.ConditionUnknown, v1.ConditionReasonStepFailed, errMsg))
174175
}
175176

176-
meta.SetStatusCondition(&unknownConditions, r.NewCondition(v1.ConditionTypeReconcileSucceeded, metav1.ConditionFalse, v1.ConditionReasonStepFailed, errMsg))
177+
chctrl.SetStatusCondition(&unknownConditions, r.NewCondition(v1.ConditionTypeReconcileSucceeded, metav1.ConditionFalse, v1.ConditionReasonStepFailed, errMsg))
177178
r.SetConditions(log, unknownConditions)
178179

179180
if updateErr := r.UpsertStatus(ctx, log); updateErr != nil {
@@ -272,7 +273,7 @@ func (r *clickhouseReconciler) reconcileCommonResources(ctx context.Context, log
272273
log.Debug("cluster secret is up to date")
273274
}
274275

275-
r.commander = newCommander(log, r.Cluster, &r.secret)
276+
r.commander = newCommander(log, r.Cluster, &r.secret, r.GetDialer())
276277

277278
return nil, nil
278279
}
@@ -297,14 +298,20 @@ func (r *clickhouseReconciler) reconcileClusterRevisions(ctx context.Context, lo
297298
Namespace: r.Cluster.Namespace,
298299
Name: r.Cluster.Spec.KeeperClusterRef.Name,
299300
}, &r.keeper); err != nil {
301+
if k8serrors.IsNotFound(err) {
302+
log.Debug("keeper cluster not found, waiting")
303+
304+
return &ctrl.Result{RequeueAfter: chctrl.RequeueOnRefreshTimeout}, nil
305+
}
306+
300307
return nil, fmt.Errorf("get keeper cluster: %w", err)
301308
}
302309

303310
if cond := meta.FindStatusCondition(r.keeper.Status.Conditions, string(v1.ConditionTypeReady)); cond == nil || cond.Status != metav1.ConditionTrue {
304311
if cond == nil {
305-
log.Warn("keeper cluster is not ready")
312+
log.Info("keeper cluster is not ready")
306313
} else {
307-
log.Warn("keeper cluster is not ready", "reason", cond.Reason, "message", cond.Message)
314+
log.Info("keeper cluster is not ready", "reason", cond.Reason, "message", cond.Message)
308315
}
309316
}
310317

@@ -377,7 +384,7 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context,
377384
pinged := false
378385
version := ""
379386

380-
if !hasError {
387+
if !hasError && sts.Status.ReadyReplicas > 0 {
381388
ctx, cancel := context.WithTimeout(ctx, chctrl.LoadReplicaStateTimeout)
382389
defer cancel()
383390

@@ -419,6 +426,10 @@ func (r *clickhouseReconciler) reconcileActiveReplicaStatus(ctx context.Context,
419426
}
420427

421428
for id, state := range states {
429+
if state.Ready() {
430+
r.readyReplicas++
431+
}
432+
422433
if exists := r.SetReplica(id, state); exists {
423434
log.Debug(fmt.Sprintf("multiple StatefulSets for single replica %v", id),
424435
"replica_id", id, "statefulset", state.StatefulSet.Name)
@@ -491,7 +502,12 @@ func (r *clickhouseReconciler) reconcileReplicaResources(ctx context.Context, lo
491502

492503
func (r *clickhouseReconciler) reconcileReplicateSchema(ctx context.Context, log ctrlutil.Logger) (*ctrl.Result, error) {
493504
if !r.Cluster.Spec.Settings.EnableDatabaseSync {
494-
log.Info("database sync is disabled, skipping")
505+
log.Debug("database sync is disabled, skipping")
506+
return nil, nil
507+
}
508+
509+
if r.readyReplicas < 2 {
510+
log.Info("no ready replicas to replicate schema, skipping")
495511
return nil, nil
496512
}
497513

@@ -502,11 +518,6 @@ func (r *clickhouseReconciler) reconcileReplicateSchema(ctx context.Context, log
502518
}
503519
}
504520

505-
if readyReplicas == nil {
506-
log.Info("no ready replicas to replicate schema, skipping")
507-
return nil, nil
508-
}
509-
510521
hasNotSynced := false
511522
replicaDatabases := ctrlutil.ExecuteParallel(readyReplicas, func(id v1.ClickHouseReplicaID) (v1.ClickHouseReplicaID, map[string]databaseDescriptor, error) {
512523
if err := r.commander.EnsureDefaultDatabaseEngine(ctx, log, id); err != nil {
@@ -675,7 +686,7 @@ func (r *clickhouseReconciler) reconcileCleanUp(ctx context.Context, log ctrluti
675686
}
676687
}
677688

678-
if r.Cluster.Spec.Settings.EnableDatabaseSync {
689+
if r.Cluster.Spec.Settings.EnableDatabaseSync && r.readyReplicas > 0 {
679690
if err := r.commander.CleanupDatabaseReplicas(ctx, log, runningStaleReplicas); err != nil {
680691
log.Warn("failed to cleanup database replicas", "error", err)
681692

internal/controller/keeper/commands.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"strconv"
1111
"strings"
1212

13-
"golang.org/x/net/proxy"
14-
1513
"github.com/ClickHouse/clickhouse-operator/internal/controllerutil"
1614
)
1715

@@ -34,26 +32,26 @@ type serverStatus struct {
3432
Version string
3533
}
3634

37-
func getConnection(ctx context.Context, hostname string, tlsRequired bool) (net.Conn, error) {
38-
var d proxy.ContextDialer = &net.Dialer{}
39-
35+
func getConnection(ctx context.Context, dialer controllerutil.DialContextFunc, hostname string, tlsRequired bool) (net.Conn, error) {
4036
port := PortNative
4137
if tlsRequired {
42-
d = &tls.Dialer{
43-
NetDialer: &net.Dialer{},
44-
Config: &tls.Config{
45-
//nolint:gosec // User managed certificate may be outdated or issued for other hostnames.
46-
InsecureSkipVerify: true,
47-
},
48-
}
4938
port = PortNativeSecure
5039
}
5140

52-
conn, err := d.DialContext(ctx, "tcp", fmt.Sprintf("%s:%d", hostname, port))
41+
addr := net.JoinHostPort(hostname, strconv.FormatInt(int64(port), 10))
42+
43+
conn, err := dialer(ctx, addr)
5344
if err != nil {
5445
return nil, fmt.Errorf("connect to %s: %w", hostname, err)
5546
}
5647

48+
if tlsRequired {
49+
return tls.Client(conn, &tls.Config{
50+
//nolint:gosec // User managed certificate may be outdated or issued for other hostnames.
51+
InsecureSkipVerify: true,
52+
}), nil
53+
}
54+
5755
return conn, nil
5856
}
5957

@@ -126,8 +124,8 @@ func queryKeeper(ctx context.Context, log controllerutil.Logger, conn net.Conn)
126124
return result, nil
127125
}
128126

129-
func getServerStatus(ctx context.Context, log controllerutil.Logger, hostname string, tlsRequired bool) serverStatus {
130-
conn, err := getConnection(ctx, hostname, tlsRequired)
127+
func getServerStatus(ctx context.Context, log controllerutil.Logger, dialer controllerutil.DialContextFunc, hostname string, tlsRequired bool) serverStatus {
128+
conn, err := getConnection(ctx, dialer, hostname, tlsRequired)
131129
if err != nil {
132130
log.Info("failed to get keeper connection", "error", err)
133131
return serverStatus{}

internal/controller/keeper/controller.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
corev1 "k8s.io/api/core/v1"
1010
policyv1 "k8s.io/api/policy/v1"
1111
"k8s.io/apimachinery/pkg/api/errors"
12-
"k8s.io/apimachinery/pkg/api/meta"
1312
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1413
"k8s.io/apimachinery/pkg/runtime"
1514
"k8s.io/client-go/tools/events"
@@ -34,6 +33,7 @@ type ClusterController struct {
3433
Logger controllerutil.Logger
3534
Webhook webhookv1.KeeperClusterWebhook
3635
Checker *upgrade.Checker
36+
Dialer controllerutil.DialContextFunc
3737
}
3838

3939
// +kubebuilder:rbac:groups=clickhouse.com,resources=keeperclusters,verbs=get;list;watch;create;update;patch;delete
@@ -72,7 +72,7 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c
7272
}
7373

7474
if _, err := cc.Webhook.ValidateCreate(ctx, cluster); err != nil {
75-
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
75+
chctrl.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
7676
Type: string(v1.ConditionTypeSpecValid),
7777
Status: metav1.ConditionFalse,
7878
Reason: string(v1.ConditionReasonSpecInvalid),
@@ -87,7 +87,7 @@ func (cc *ClusterController) Reconcile(ctx context.Context, req ctrl.Request) (c
8787
return ctrl.Result{}, nil
8888
}
8989

90-
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
90+
chctrl.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
9191
Type: string(v1.ConditionTypeSpecValid),
9292
Status: metav1.ConditionTrue,
9393
Reason: string(v1.ConditionReasonSpecValid),
@@ -127,8 +127,13 @@ func (cc *ClusterController) GetVersionChecker() *upgrade.Checker {
127127
return cc.Checker
128128
}
129129

130+
// GetDialer returns the custom dialer, or nil.
131+
func (cc *ClusterController) GetDialer() controllerutil.DialContextFunc {
132+
return cc.Dialer
133+
}
134+
130135
// SetupWithManager sets up the controller with the Manager.
131-
func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgrade.Checker) error {
136+
func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgrade.Checker, dialer controllerutil.DialContextFunc) error {
132137
namedLogger := log.Named("keeper")
133138

134139
keeperController := &ClusterController{
@@ -138,6 +143,7 @@ func SetupWithManager(mgr ctrl.Manager, log controllerutil.Logger, checker *upgr
138143
Logger: namedLogger,
139144
Webhook: webhookv1.KeeperClusterWebhook{Log: namedLogger},
140145
Checker: checker,
146+
Dialer: dialer,
141147
}
142148

143149
err := ctrl.NewControllerManagedBy(mgr).

0 commit comments

Comments
 (0)