Commit 454d736

Kafka Connect: Support Confluent Cloud connectors (open-metadata#23780)
1 parent da8c50d commit 454d736

9 files changed: +2141 −55 lines
Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
source:
  type: kafkaconnect
  serviceName: confluent_cdc_mysql_postgres
  serviceConnection:
    config:
      type: KafkaConnect
      hostPort: http://localhost:8083
      # For Kafka Connect, choose one of the following authentication methods:

      # Option 1: Username/Password Authentication (for self-hosted Kafka Connect)
      KafkaConnectConfig:
        username: admin
        password: admin_password

      # Option 2: API Key Authentication (for Confluent Cloud)
      # KafkaConnectConfig:
      #   username: YOUR_CONFLUENT_CLOUD_API_KEY
      #   password: YOUR_CONFLUENT_CLOUD_API_SECRET

      # Option 3: No Authentication
      # KafkaConnectConfig: null

      verifySSL: true
      messagingServiceName: KafkaProd

      # Optional: Filter CDC connectors/pipelines using regex patterns
      # pipelineFilterPattern:
      #   includes:
      #     - "mysql-cdc-.*"
      #     - "postgres-sink-.*"
      #   excludes:
      #     - ".*test.*"
      #     - ".*dev.*"

  sourceConfig:
    config:
      type: PipelineMetadata
      lineageInformation:
        dbServiceNames:
          - "MysqlProd"
          - "PostgresProd"
        storageServiceNames: []

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  loggerLevel: INFO  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <your-jwt-token-here>

# To run this workflow:
#   metadata ingest -c /path/to/confluent_cdc.yaml

# Prerequisites:
#   1. OpenMetadata server running at http://localhost:8585
#   2. MySQL database service already ingested into OpenMetadata
#   3. Postgres database service already ingested into OpenMetadata
#   4. Kafka messaging service already ingested into OpenMetadata
#   5. Kafka Connect connectors configured and running
#
# The connector will create:
#   - Pipeline entities for each CDC connector
#   - Table → Topic lineage with column-level mapping
#   - Topic → Table lineage with column-level mapping
#   - Complete end-to-end lineage: MySQL columns → Kafka topic fields → Postgres columns
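
A quick way to sanity-check the connection settings above before running the workflow: the Kafka Connect REST API returns the list of deployed connector names on a plain GET. A minimal sketch, assuming the Option 1 basic-auth credentials shown above and the default local endpoint:

    # Expect a JSON array of connector names, e.g. ["mysql-cdc-orders", ...]
    curl -u admin:admin_password http://localhost:8083/connectors

A 401/403 response points at the KafkaConnectConfig credentials; a connection error points at hostPort.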
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
# Example: Confluent Cloud CDC Configuration
# This example shows how to connect to Confluent Cloud CDC connectors
# that replicate data from MySQL to PostgreSQL via Kafka

source:
  type: kafkaconnect
  serviceName: confluent_cloud_cdc_production
  serviceConnection:
    config:
      type: KafkaConnect
      # Confluent Cloud Kafka Connect REST API endpoint
      hostPort: https://pkc-xxxxx.us-east-1.aws.confluent.cloud:443

      # Confluent Cloud API Key authentication
      KafkaConnectConfig:
        username: YOUR_CONFLUENT_CLOUD_API_KEY
        password: YOUR_CONFLUENT_CLOUD_API_SECRET

      verifySSL: true
      messagingServiceName: KafkaCloudProd

      # Filter to include only production CDC connectors
      pipelineFilterPattern:
        includes:
          - "prod-mysql-cdc-.*"
          - "prod-postgres-sink-.*"
        excludes:
          - ".*staging.*"
          - ".*test.*"

  sourceConfig:
    config:
      type: PipelineMetadata
      lineageInformation:
        dbServiceNames:
          - "MysqlProd"
          - "PostgresProd"
        storageServiceNames: []

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  loggerLevel: INFO
  openMetadataServerConfig:
    hostPort: https://your-openmetadata-server.com/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: <your-jwt-token>

# Example CDC Connector Configuration that this workflow will discover:
#
# MySQL CDC Source Connector:
#   {
#     "name": "prod-mysql-cdc-customers",
#     "config": {
#       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
#       "database.hostname": "mysql.example.com",
#       "database.name": "ecommerce",
#       "table.include.list": "ecommerce.customers,ecommerce.orders",
#       "database.server.name": "prod-mysql",
#       "topics": "prod-mysql.ecommerce.customers"
#     }
#   }
#
# Postgres Sink Connector:
#   {
#     "name": "prod-postgres-sink-customers",
#     "config": {
#       "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
#       "connection.url": "jdbc:postgresql://postgres.example.com:5432/analytics",
#       "topics": "prod-mysql.ecommerce.customers",
#       "table.name.format": "customers",
#       "schema": "public"
#     }
#   }
#
# This will create lineage:
#   MySQL:    ecommerce.customers (id, name, email, created_at)
#
#   Kafka:    prod-mysql.ecommerce.customers (id, name, email, created_at) [topic schema fields]
#
#   Postgres: analytics.public.customers (id, name, email, created_at)
#
# With column-level lineage showing exact field mappings at each hop!
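
With the filter patterns above, only connectors matching the prod-* names become pipelines. To confirm a matched connector is actually running before ingestion, the standard Kafka Connect status endpoint can be queried; a sketch, assuming the hostPort above serves the Connect REST API and accepts the same API key pair:

    # Expect "state": "RUNNING" for the connector and each of its tasks
    curl -u YOUR_CONFLUENT_CLOUD_API_KEY:YOUR_CONFLUENT_CLOUD_API_SECRET \
      "https://pkc-xxxxx.us-east-1.aws.confluent.cloud:443/connectors/prod-mysql-cdc-customers/status"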
Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
# Example: Local Confluent CDC Setup for Development
# This example shows a typical local development setup with:
#   - Local Kafka Connect cluster
#   - MySQL source database
#   - PostgreSQL target database
#   - Local Kafka broker

source:
  type: kafkaconnect
  serviceName: local_cdc_dev
  serviceConnection:
    config:
      type: KafkaConnect
      # Local Kafka Connect REST API
      hostPort: http://localhost:8083

      # No authentication for local development
      # KafkaConnectConfig: null

      # Or use basic auth if configured
      KafkaConnectConfig:
        username: connect-user
        password: connect-password

      verifySSL: false  # Typically disabled for local development
      messagingServiceName: KafkaLocal

  sourceConfig:
    config:
      type: PipelineMetadata
      lineageInformation:
        dbServiceNames:
          - "MysqlLocal"
          - "PostgresLocal"
        storageServiceNames: []

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  loggerLevel: DEBUG  # Use DEBUG for development to see detailed logs
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg

# ============================================================================
# Local Development Setup Guide
# ============================================================================
#
# 1. Start local infrastructure:
#      docker-compose up -d   # Start MySQL, Postgres, Kafka, Kafka Connect
#
# 2. Ingest database metadata into OpenMetadata:
#      # MySQL
#      metadata ingest -c mysql_local.yaml
#
#      # PostgreSQL
#      metadata ingest -c postgres_local.yaml
#
#      # Kafka
#      metadata ingest -c kafka_local.yaml
#
# 3. Configure CDC connectors in Kafka Connect:
#
#    # MySQL CDC Source Connector (Debezium)
#    curl -X POST http://localhost:8083/connectors \
#      -H "Content-Type: application/json" \
#      -d '{
#        "name": "mysql-source-customers",
#        "config": {
#          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
#          "database.hostname": "mysql",
#          "database.port": "3306",
#          "database.user": "debezium",
#          "database.password": "dbz",
#          "database.server.id": "184054",
#          "database.server.name": "local-mysql",
#          "database.name": "testdb",
#          "table.include.list": "testdb.customers,testdb.orders",
#          "database.history.kafka.bootstrap.servers": "kafka:9092",
#          "database.history.kafka.topic": "schema-changes.testdb"
#        }
#      }'
#
#    # PostgreSQL Sink Connector
#    curl -X POST http://localhost:8083/connectors \
#      -H "Content-Type: application/json" \
#      -d '{
#        "name": "postgres-sink-customers",
#        "config": {
#          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
#          "connection.url": "jdbc:postgresql://postgres:5432/warehouse",
#          "connection.user": "postgres",
#          "connection.password": "postgres",
#          "topics": "local-mysql.testdb.customers",
#          "table.name.format": "customers",
#          "auto.create": "true",
#          "auto.evolve": "true",
#          "insert.mode": "upsert",
#          "pk.mode": "record_key",
#          "delete.enabled": "true"
#        }
#      }'
#
# 4. Run this CDC workflow:
#      metadata ingest -c confluent_cdc_local.yaml
#
# 5. View lineage in OpenMetadata UI:
#      http://localhost:8585
#      Navigate to: Pipeline Services → local_cdc_dev → Pipelines
#      Click on a pipeline to see column-level lineage
#
# Expected Lineage:
#   testdb.customers.id      → local-mysql.testdb.customers.id      → warehouse.public.customers.id
#   testdb.customers.name    → local-mysql.testdb.customers.name    → warehouse.public.customers.name
#   testdb.customers.email   → local-mysql.testdb.customers.email   → warehouse.public.customers.email
#   testdb.customers.created → local-mysql.testdb.customers.created → warehouse.public.customers.created
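
Between steps 3 and 4 it is worth confirming that both connectors came up cleanly, since a failed task produces no lineage to ingest. A short check using the standard Kafka Connect status endpoint, assuming the basic-auth values from this file (drop -u if the local cluster runs without auth):

    # Each call should report "state": "RUNNING" for the connector and all tasks
    curl -u connect-user:connect-password http://localhost:8083/connectors/mysql-source-customers/status
    curl -u connect-user:connect-password http://localhost:8083/connectors/postgres-sink-customers/status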
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
source:
  type: kafkaconnect
  serviceName: confluent_cloud_cdc
  serviceConnection:
    config:
      type: KafkaConnect
      # IMPORTANT: Use the Kafka Connect API endpoint, NOT the Kafka broker endpoint
      # Format:  https://api.confluent.cloud/connect/v1/environments/{ENV_ID}/clusters/{CLUSTER_ID}
      # Example: https://api.confluent.cloud/connect/v1/environments/env-abc123/clusters/lkc-xyz789
      hostPort: https://api.confluent.cloud/connect/v1/environments/YOUR_ENV_ID/clusters/YOUR_CONNECT_CLUSTER_ID

      # Use Kafka Connect API Key (NOT Kafka broker API key)
      KafkaConnectConfig:
        username: YOUR_KAFKA_CONNECT_API_KEY
        password: YOUR_KAFKA_CONNECT_API_SECRET

      verifySSL: true

      # REQUIRED for Confluent Cloud: Specify your Kafka messaging service
      # This should match the service name you used when ingesting Kafka topics
      messagingServiceName: confluent_kafka_cloud

      # Optional: Filter specific connectors
      # pipelineFilterPattern:
      #   includes:
      #     - "mysql-cdc-.*"
      #     - "postgres-sink-.*"

  sourceConfig:
    config:
      type: PipelineMetadata
      # Specify the database services where your tables are ingested
      lineageInformation:
        dbServiceNames:
          - local_mysql     # Replace with your MySQL service name
          - local_postgres  # Replace with your Postgres service name
        storageServiceNames: []  # Add S3/GCS service names if using sink connectors to storage

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  loggerLevel: DEBUG  # Use DEBUG to see detailed connection info
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
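
The endpoint format above can be checked independently of OpenMetadata: Confluent Cloud's managed Connect API is Kafka Connect compatible and lists a cluster's connectors under the same base path. A sketch using the placeholder IDs and the Connect-scoped key pair from this file:

    # Expect a JSON array of connector names; a 401 usually means the key pair
    # is a Kafka broker API key rather than a Kafka Connect API key
    curl -u "YOUR_KAFKA_CONNECT_API_KEY:YOUR_KAFKA_CONNECT_API_SECRET" \
      https://api.confluent.cloud/connect/v1/environments/YOUR_ENV_ID/clusters/YOUR_CONNECT_CLUSTER_ID/connectors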
