Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 23 additions & 60 deletions internal/controller/clickhouse/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"net"
"strconv"
"sync"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/google/uuid"
Expand Down Expand Up @@ -57,14 +56,16 @@ type commander struct {
auth clickhouse.Auth
dialer controllerutil.DialContextFunc

lock sync.RWMutex
conns map[v1.ClickHouseReplicaID]clickhouse.Conn
entry *connCacheEntry
}

func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret, dialer controllerutil.DialContextFunc) *commander {
func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secret *corev1.Secret, dialer controllerutil.DialContextFunc, cache *connCache) *commander {
log = log.Named("commander")
credHash, _ := controllerutil.DeepHashObject(secret.Data[SecretKeyManagementPassword])

return &commander{
log: log.Named("commander"),
conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{},
log: log,
entry: cache.Get(cluster.NamespacedName(), credHash, log),
cluster: cluster,
Comment thread
GrigoryPervakov marked this conversation as resolved.
dialer: dialer,
auth: clickhouse.Auth{
Expand All @@ -74,19 +75,6 @@ func newCommander(log controllerutil.Logger, cluster *v1.ClickHouseCluster, secr
}
}

func (cmd *commander) Close() {
cmd.lock.Lock()
defer cmd.lock.Unlock()

for id, conn := range cmd.conns {
if err := conn.Close(); err != nil {
cmd.log.Warn("error closing connection", "error", err, "replica_id", id)
}
}

cmd.conns = map[v1.ClickHouseReplicaID]clickhouse.Conn{}
}

type replicaProbe struct {
Version string
ReloadConfigRevision string
Expand Down Expand Up @@ -474,53 +462,28 @@ func (cmd *commander) CleanupDatabaseReplicas(
}

func (cmd *commander) getConn(id v1.ClickHouseReplicaID) (clickhouse.Conn, error) {
cmd.lock.RLock()
conn, ok := cmd.conns[id]
cmd.lock.RUnlock()

if ok {
return conn, nil
}

cmd.lock.Lock()
defer cmd.lock.Unlock()
return cmd.entry.Conn(id, func() (clickhouse.Conn, error) {
cmd.log.Debug("creating new ClickHouse connection", "replica_id", id)

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{net.JoinHostPort(cmd.cluster.HostnameByID(id), strconv.FormatInt(int64(PortManagement), 10))},
Auth: cmd.auth,
DialContext: cmd.dialer,
Debugf: func(format string, args ...any) {
cmd.log.Debug(fmt.Sprintf(format, args...))
},
})
if err != nil {
cmd.log.Error(err, "failed to open ClickHouse connection", "replica_id", id)
return nil, fmt.Errorf("open ClickHouse connection: %w", err)
}

// Check if another goroutine created the connection while we were waiting for the lock
if conn, ok := cmd.conns[id]; ok {
return conn, nil
}

cmd.log.Debug("creating new ClickHouse connection", "replica_id", id)

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{net.JoinHostPort(cmd.cluster.HostnameByID(id), strconv.FormatInt(int64(PortManagement), 10))},
Auth: cmd.auth,
DialContext: cmd.dialer,
Debugf: func(format string, args ...any) {
cmd.log.Debug(fmt.Sprintf(format, args...))
},
})
if err != nil {
cmd.log.Error(err, "failed to open ClickHouse connection", "replica_id", id)
return nil, fmt.Errorf("open ClickHouse connection: %w", err)
}

cmd.conns[id] = conn

return conn, nil
}

func (cmd *commander) getAnyConn(ctx context.Context) (v1.ClickHouseReplicaID, clickhouse.Conn, error) {
cmd.lock.RLock()
defer cmd.lock.RUnlock()

for id, conn := range cmd.conns {
if conn.Ping(ctx) == nil {
return id, conn, nil
}
}

return v1.ClickHouseReplicaID{}, nil, errors.New("no available connections")
return cmd.entry.AnyConn(ctx)
}

func (cmd *commander) ensureReplicaDefaultDatabaseEngine(ctx context.Context, log controllerutil.Logger, id v1.ClickHouseReplicaID) error {
Expand Down
73 changes: 42 additions & 31 deletions internal/controller/clickhouse/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-logr/zapr"
"github.com/google/uuid"
"github.com/moby/moby/api/types/container"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/network"
"github.com/testcontainers/testcontainers-go/wait"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand All @@ -32,7 +32,6 @@ const (
keeperHostname = "test-keeper"
clickhouseHostnameFormat = "test-clickhouse-0-%d-0"
testPassword = "test-password"
testUsername = "operator"
keeperImage = "clickhouse/clickhouse-keeper:26.5"
clickhouseImage = "clickhouse/clickhouse-server:26.5"
testConfigRevision = "test-revision-v1"
Expand Down Expand Up @@ -150,7 +149,17 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {

chPort := strconv.FormatInt(PortNative, 10) + "/tcp"
chHTTPPort := strconv.FormatInt(PortHTTP, 10) + "/tcp"
conns := map[v1.ClickHouseReplicaID]clickhouse.Conn{}
hostTargets := map[string]string{}
cluster := v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: v1.ClickHouseClusterSpec{
Shards: new(int32(1)),
Replicas: new(testReplicas),
},
}

for i := range testReplicas {
By(fmt.Sprintf("starting ClickHouse node %d", i))
Expand Down Expand Up @@ -194,36 +203,33 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {
port, err := ctr.MappedPort(ctx, chPort)
Expect(err).NotTo(HaveOccurred())

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{net.JoinHostPort(host, port.Port())},
Auth: clickhouse.Auth{Username: testUsername, Password: testPassword},
})
Expect(err).NotTo(HaveOccurred())
Expect(conn.Ping(ctx)).To(Succeed())

conns[v1.ClickHouseReplicaID{ShardID: 0, Index: i}] = conn
hostTargets[cluster.HostnameByID(v1.ClickHouseReplicaID{Index: i})] = net.JoinHostPort(host, port.Port())
}

logger := zap.NewRaw(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))
logf.SetLogger(zapr.NewLogger(logger))
zapLogger := zap.NewRaw(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))
logf.SetLogger(zapr.NewLogger(zapLogger))
logger := controllerutil.NewLogger(zapLogger)

cmd = &commander{
log: controllerutil.NewLogger(logger),
cluster: &v1.ClickHouseCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: v1.ClickHouseClusterSpec{
Shards: new(int32(1)),
Replicas: new(testReplicas),
},
},
auth: clickhouse.Auth{Username: testUsername, Password: testPassword},
conns: conns,
dialer := func(ctx context.Context, addr string) (net.Conn, error) {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("split addr %q: %w", addr, err)
}

target, ok := hostTargets[host]
if !ok {
return nil, fmt.Errorf("no test container for host %q", host)
}

return (&net.Dialer{}).DialContext(ctx, "tcp", target)
}

secret := &corev1.Secret{Data: map[string][]byte{SecretKeyManagementPassword: []byte(testPassword)}}
cache := newConnCache()
cmd = newCommander(logger, &cluster, secret, dialer, cache)

DeferCleanup(func() {
cmd.Close()
cache.Close(logger)
})
})

Expand Down Expand Up @@ -261,8 +267,9 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {
id0 := v1.ClickHouseReplicaID{ShardID: 0, Index: 0}
dbUUID := uuid.New().String()
q := `CREATE DATABASE testdb UUID '%s' ENGINE=Replicated('/clickhouse/databases/testdb', '{shard}', '{replica}')`
err := cmd.conns[id0].Exec(ctx, fmt.Sprintf(q, dbUUID))
conn, err := cmd.getConn(id0)
Expect(err).NotTo(HaveOccurred())
Expect(conn.Exec(ctx, fmt.Sprintf(q, dbUUID))).To(Succeed())

dbs, err := cmd.Databases(ctx, id0)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -286,7 +293,8 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {
It("should sync all replicas in shard", func(ctx context.Context) {
By("creating test tables")

conn := cmd.conns[v1.ClickHouseReplicaID{ShardID: 0, Index: 0}]
conn, err := cmd.getConn(v1.ClickHouseReplicaID{ShardID: 0, Index: 0})
Expect(err).NotTo(HaveOccurred())
Expect(conn.Exec(ctx, `CREATE TABLE testdb.test (id UInt64) ENGINE=ReplicatedMergeTree ORDER BY id`)).To(Succeed())
Expect(conn.Exec(ctx, `INSERT INTO testdb.test SELECT number FROM numbers(10)`)).To(Succeed())

Expand All @@ -301,9 +309,12 @@ var _ = Describe("commander", Ordered, Label("integration"), func() {
}, "5s", "100ms").To(Succeed())

var count uint64
for id, conn := range cmd.conns {
for id := range cmd.cluster.ReplicaIDs() {
By(fmt.Sprintf("verifying data is replicated to replica %d", id.Index))

conn, err := cmd.getConn(id)
Expect(err).NotTo(HaveOccurred())

row := conn.QueryRow(ctx, `SELECT count() FROM testdb.test`)
Expect(row.Err()).ToNot(HaveOccurred())
Expect(row.Scan(&count)).To(Succeed())
Expand Down
132 changes: 132 additions & 0 deletions internal/controller/clickhouse/conncache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package clickhouse

import (
"context"
"errors"
"maps"
"sync"

"github.com/ClickHouse/clickhouse-go/v2"
"k8s.io/apimachinery/pkg/types"

v1 "github.com/ClickHouse/clickhouse-operator/api/v1alpha1"
"github.com/ClickHouse/clickhouse-operator/internal/controllerutil"
)

type connCache struct {
mu sync.Mutex
entries map[types.NamespacedName]*connCacheEntry
}

type connCacheEntry struct {
mu sync.Mutex
credHash string
conns map[v1.ClickHouseReplicaID]clickhouse.Conn
}

func newConnCache() *connCache {
return &connCache{entries: map[types.NamespacedName]*connCacheEntry{}}
}

func (c *connCache) Get(key types.NamespacedName, credHash string, log controllerutil.Logger) *connCacheEntry {
if c == nil { // no pool wired (tests)
return &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}}
}

c.mu.Lock()
defer c.mu.Unlock()

e, ok := c.entries[key]
if !ok {
e = &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}}
c.entries[key] = e

return e
}

if e.credHash == credHash {
return e
}

e.Close(log)
e = &connCacheEntry{credHash: credHash, conns: map[v1.ClickHouseReplicaID]clickhouse.Conn{}}
c.entries[key] = e

return e
}

func (c *connCache) Evict(key types.NamespacedName, log controllerutil.Logger) {
if c == nil {
return
}

c.mu.Lock()
defer c.mu.Unlock()

if e, ok := c.entries[key]; ok {
e.Close(log)
delete(c.entries, key)
}
}

func (c *connCache) Close(log controllerutil.Logger) {
if c == nil {
return
}

c.mu.Lock()
defer c.mu.Unlock()

for key, e := range c.entries {
e.Close(log)
delete(c.entries, key)
}
}

func (e *connCacheEntry) Conn(id v1.ClickHouseReplicaID, dial func() (clickhouse.Conn, error)) (clickhouse.Conn, error) {
e.mu.Lock()
defer e.mu.Unlock()

if conn, ok := e.conns[id]; ok {
return conn, nil
}

conn, err := dial()
if err != nil {
return nil, err
}

e.conns[id] = conn

return conn, nil
}

func (e *connCacheEntry) AnyConn(ctx context.Context) (v1.ClickHouseReplicaID, clickhouse.Conn, error) {
conns := func() map[v1.ClickHouseReplicaID]clickhouse.Conn {
e.mu.Lock()
defer e.mu.Unlock()

return maps.Clone(e.conns)
}()

for id, conn := range conns {
if conn.Ping(ctx) == nil {
return id, conn, nil
}
}

return v1.ClickHouseReplicaID{}, nil, errors.New("no available connections")
}

func (e *connCacheEntry) Close(log controllerutil.Logger) {
e.mu.Lock()
defer e.mu.Unlock()

for id, conn := range e.conns {
if err := conn.Close(); err != nil {
log.Warn("error closing pooled connection", "error", err, "replica_id", id)
}
}

clear(e.conns)
}
Loading
Loading