Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ jobs:
MINIO_ROOT_PASSWORD: miniosecret
AWS_EC2_METADATA_DISABLED: true

- name: Generate ClickHouse TLS certificates
run: |
mkdir -p ch-certs
# CA
openssl genrsa -out ch-certs/ca.key 2048
openssl req -new -x509 -key ch-certs/ca.key -out ch-certs/ca.crt -days 3650 -subj "/CN=ClickHouse-CA"
# Server cert (CN=localhost, SAN for TLS 1.3)
openssl genrsa -out ch-certs/server.key 2048
openssl req -new -key ch-certs/server.key -out ch-certs/server.csr -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
openssl x509 -req -days 3650 -in ch-certs/server.csr -CA ch-certs/ca.crt -CAkey ch-certs/ca.key -CAcreateserial -out ch-certs/server.crt -copy_extensions copyall
# Client cert for mTLS (CN=peerdb-client)
openssl genrsa -out ch-certs/client.key 2048
openssl req -new -key ch-certs/client.key -out ch-certs/client.csr -subj "/CN=peerdb-client"
openssl x509 -req -days 3650 -in ch-certs/client.csr -CA ch-certs/ca.crt -CAkey ch-certs/ca.key -CAcreateserial -out ch-certs/client.crt

- name: create postgres extensions, increase logical replication limits, and setup catalog database
run: >
docker exec "${{ job.services.catalog.id }}" apk add --no-cache build-base git &&
Expand Down Expand Up @@ -292,16 +307,39 @@ jobs:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
</default>
<peerdb_tls>
<ssl_certificates>
<common_name>peerdb-client</common_name>
</ssl_certificates>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</peerdb_tls>
</users>
<logger><level>none</level></logger>
<path>var/lib/clickhouse</path>
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
<tcp_port>9000</tcp_port>
<tcp_port_secure>9440</tcp_port_secure>
<http_port remove="1"/>
<postgresql_port remove="1"/>
<mysql_port remove="1"/>
<openSSL>
<server>
<certificateFile>../ch-certs/server.crt</certificateFile>
<privateKeyFile>../ch-certs/server.key</privateKeyFile>
<caConfig>../ch-certs/ca.crt</caConfig>
<verificationMode>relaxed</verificationMode>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<macros>
<shard>1</shard>
<replica>1</replica>
Expand Down Expand Up @@ -347,16 +385,39 @@ jobs:
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
</default>
<peerdb_tls>
<ssl_certificates>
<common_name>peerdb-client</common_name>
</ssl_certificates>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</peerdb_tls>
</users>
<logger><level>none</level></logger>
<path>var/lib/clickhouse</path>
<tmp_path>var/lib/clickhouse/tmp</tmp_path>
<user_files_path>var/lib/clickhouse/user_files</user_files_path>
<format_schema_path>var/lib/clickhouse/format_schemas</format_schema_path>
<tcp_port>9001</tcp_port>
<tcp_port_secure>9441</tcp_port_secure>
<http_port remove="1"/>
<postgresql_port remove="1"/>
<mysql_port remove="1"/>
<openSSL>
<server>
<certificateFile>../ch-certs/server.crt</certificateFile>
<privateKeyFile>../ch-certs/server.key</privateKeyFile>
<caConfig>../ch-certs/ca.crt</caConfig>
<verificationMode>relaxed</verificationMode>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<macros>
<shard>2</shard>
<replica>1</replica>
Expand Down Expand Up @@ -511,6 +572,12 @@ jobs:
FLOW_TESTS_AWS_ACCESS_KEY_ID: ${{ steps.setup-aws.outputs.aws-access-key-id }}
FLOW_TESTS_AWS_SECRET_ACCESS_KEY: ${{ steps.setup-aws.outputs.aws-secret-access-key }}
FLOW_TESTS_AWS_SESSION_TOKEN: ${{ steps.setup-aws.outputs.aws-session-token }}
# ClickHouse TLS/mTLS test certificates
PEERDB_CLICKHOUSE_TLS_PORT: "9440"
PEERDB_CLICKHOUSE_TLS_CLUSTER_PORT: "9441"
PEERDB_CLICKHOUSE_TLS_CA_CERT_PATH: ${{ github.workspace }}/ch-certs/ca.crt
PEERDB_CLICKHOUSE_TLS_CLIENT_CERT_PATH: ${{ github.workspace }}/ch-certs/client.crt
PEERDB_CLICKHOUSE_TLS_CLIENT_KEY_PATH: ${{ github.workspace }}/ch-certs/client.key

- name: Upload peer-flow logs
if: always()
Expand Down
5 changes: 4 additions & 1 deletion flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,10 @@ func (h *FlowRequestHandler) CreatePeer(
req *protos.CreatePeerRequest,
) (*protos.CreatePeerResponse, APIError) {
if !req.DisableValidation {
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{Peer: req.Peer})
status, validateErr := h.ValidatePeer(ctx, &protos.ValidatePeerRequest{
Peer: req.Peer,
Flags: req.ValidationFlags,
})
if validateErr != nil {
return nil, validateErr
}
Expand Down
112 changes: 91 additions & 21 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,30 +231,100 @@ func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error {
return nil
}

func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
var tlsSetting *tls.Config
if !config.DisableTls {
tlsSetting = &tls.Config{MinVersion: tls.VersionTLS13}
if config.Certificate != nil || config.PrivateKey != nil {
if config.Certificate == nil || config.PrivateKey == nil {
return nil, errors.New("both certificate and private key must be provided if using certificate-based authentication")
}
cert, err := tls.X509KeyPair([]byte(*config.Certificate), []byte(*config.PrivateKey))
if err != nil {
return nil, fmt.Errorf("failed to parse provided certificate: %w", err)
}
tlsSetting.Certificates = []tls.Certificate{cert}
// configureK8sSecretTLS configures the tls.Config to load client certificates
// dynamically from a Kubernetes TLS Secret via the informer cache.
// On the initial call it waits up to ~30s for the Secret to appear (e.g.
// while cert-manager fulfills the Certificate CR).
func configureK8sSecretTLS(ctx context.Context, tlsConfig *tls.Config, secretName string) error {
secretStore, err := utils.GetK8sSecretStore()
if err != nil {
return fmt.Errorf("failed to initialize K8s Secret store for TLS certificate Secret %q: %w", secretName, err)
}

_, caCertPEM, err := secretStore.WaitForTLSCertificate(ctx, secretName)
if err != nil {
return fmt.Errorf("failed to load TLS certificate from Secret %q: %w", secretName, err)
}
if len(caCertPEM) > 0 {
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM(caCertPEM) {
return fmt.Errorf("failed to parse CA certificate from Secret %q", secretName)
}
if config.RootCa != nil {
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM([]byte(*config.RootCa)) {
return nil, errors.New("failed to parse provided root CA")
}
tlsSetting.RootCAs = caPool
tlsConfig.RootCAs = caPool
}

tlsConfig.GetClientCertificate = func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
cert, _, err := secretStore.GetTLSCertificate(secretName)
if err != nil {
return nil, fmt.Errorf("failed to load TLS certificate from Secret %q: %w", secretName, err)
}
if config.TlsHost != "" {
tlsSetting.ServerName = config.TlsHost
return cert, nil
}

return nil
}

// configureInlineTLS configures the tls.Config using inline certificate/key
// PEM values from the ClickHouse peer config.
func configureInlineTLS(tlsConfig *tls.Config, config *protos.ClickhouseConfig) error {
if config.Certificate != nil || config.PrivateKey != nil {
if config.Certificate == nil || config.PrivateKey == nil {
return errors.New("both certificate and private key must be provided if using certificate-based authentication")
}
cert, err := tls.X509KeyPair([]byte(*config.Certificate), []byte(*config.PrivateKey))
if err != nil {
return fmt.Errorf("failed to parse provided certificate: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if config.RootCa != nil {
caPool := x509.NewCertPool()
if !caPool.AppendCertsFromPEM([]byte(*config.RootCa)) {
return errors.New("failed to parse provided root CA")
}
tlsConfig.RootCAs = caPool
}
return nil
}

// buildTLSConfig builds the TLS configuration for a ClickHouse connection.
// When K8s TLS Secret loading is enabled and the secret name is configured,
// it will load client certificates dynamically from the specified Kubernetes TLS secret.
// Otherwise, inline certificates are used.
func buildTLSConfig(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (*tls.Config, error) {
if config.DisableTls {
return nil, nil
}

tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS13,
ServerName: config.TlsHost,
}
secretName := config.GetTlsCertificateSecretName()

if secretName != "" {
k8sTLSEnabled, err := internal.PeerDBClickHouseTLSK8sSecretEnabled(ctx, env)
if err != nil {
return nil, fmt.Errorf("failed to check K8s TLS Secret feature flag: %w", err)
}
if k8sTLSEnabled {
if err := configureK8sSecretTLS(ctx, tlsConfig, secretName); err != nil {
return nil, err
}
}
}

if err := configureInlineTLS(tlsConfig, config); err != nil {
return nil, err
}

return tlsConfig, nil
}

func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error) {
tlsSetting, err := buildTLSConfig(ctx, env, config)
if err != nil {
return nil, err
}

settings := clickhouse.Settings{
Expand Down
Loading
Loading