From d43596e2c2b4781dc21b602c7afdff6e34f90c1c Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 22 Jun 2026 10:16:24 -0700 Subject: [PATCH 1/7] updated type mapping docs for kafka sink connector --- .../kafka/kafka-clickhouse-connect-sink.md | 97 ++++++++++--------- 1 file changed, 52 insertions(+), 45 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 27561bbd017..28e1a7bae44 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -29,6 +29,7 @@ The [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.htm |----------------------------------|--------------------|---------------|--------------------| | 1.0.0 | > 23.3 | > 2.7 | > 6.1 | + ### Main features {#main-features} - Shipped with out-of-the-box exactly-once semantics. It's powered by a new ClickHouse core feature named [KeeperMap](https://github.com/ClickHouse/ClickHouse/pull/39976) (used as a state store by the connector) and allows for minimalistic architecture. @@ -121,6 +122,7 @@ The full table of configuration options: | `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flush when `exactlyOnce=false`. `0` disables time-based flushing. Default value is `0`. Only required for time-base threshold. Only effective when `bufferCount > 0`. | `"0"` | | `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` | + ### Target tables {#target-tables} ClickHouse Connect Sink reads messages from Kafka topics and writes them to appropriate tables.
ClickHouse Connect Sink writes data into existing tables. Please, make sure a target table with an appropriate schema was created in ClickHouse before starting to insert data into it. @@ -158,6 +160,7 @@ Sink, use [Kafka Connect Transformations](https://docs.confluent.io/platform/cur | org.apache.kafka.connect.data.Timestamp | Int32 / Date32 | ✅ | No | | org.apache.kafka.connect.data.Decimal | Decimal | ✅ | No | + - (1) - JSON is supported only when ClickHouse settings has `input_format_binary_read_json_as_string=1`. This works only for RowBinary format family and the setting affects all columns in the insert request so they all should be a string. Connector will convert STRUCT to a JSON string in this case. - (2) - When struct has unions like `oneof` then converter should be configured to NOT add prefix/suffix to a field names. There is `generate.index.for.unions=false` [setting for `ProtobufConverter`](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf). @@ -234,9 +237,7 @@ The connector can consume data from multiple topics } ``` -#### Using with different data formats {#using-with-different-data-formats} - -##### Avro schema support {#avro-schema-support} +### Avro schema support {#avro-schema-support} ```json { @@ -251,7 +252,7 @@ The connector can consume data from multiple topics } ``` -###### Avro type mapping {#avro-type-mapping} +#### Avro type mapping {#avro-type-mapping} The type mapping below is defined by `io.confluent.connect.avro.AvroConverter`, the official Avro serializer/deserializer implementation in Kafka Connect. See the Kafka Connect [docs](https://docs.confluent.io/platform/current/connect/userguide.html#avro) for advanced information on conversion logic. ✅: Supported @@ -279,7 +280,7 @@ The type mapping below is defined by `io.confluent.connect.avro.AvroConverter`, Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. 
-###### Unsupported Avro schemas {#unsupported-avro-schemas} +#### Unsupported Avro schemas {#unsupported-avro-schemas} The following Avro schemas are unsupported by the connector: - fixed `decimal` logical type @@ -319,7 +320,7 @@ The following Avro schemas are unsupported by the connector: } ``` -##### Protobuf schema support {#protobuf-schema-support} +### Protobuf schema support {#protobuf-schema-support} ```json { @@ -336,7 +337,7 @@ The following Avro schemas are unsupported by the connector: Please note: if you encounter issues with missing classes, not every environment comes with the protobuf converter and you may need an alternate release of the jar bundled with dependencies. -###### Protobuf type mapping {#proto-type-mapping} +#### Protobuf Type Mapping {#proto-type-mapping} The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConverter`, the official Protobuf serializer/deserializer implementation in Kafka Connect. See the Kafka Connect [docs](https://docs.confluent.io/platform/current/connect/userguide.html#json-schema-and-protobuf) for advanced information on conversion logic. ✅: Supported @@ -345,44 +346,48 @@ The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConv ️⚠️: Partially supported -| Protobuf Type | Kafka Connect Type | Supported | Notes | -|-----------------------------------------|-----------------------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| double | FLOAT64 | ✅ | | -| float | FLOAT32 | ✅ | | -| int32 | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| sint32 | INT8/INT16/INT32 | ✅ | Defaults to INT32. 
Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| sfixed32 | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| uint32 | INT64 | ✅ | | -| fixed32 | INT64 | ✅ | | -| int64 | INT64 | ✅ | | -| uint64 | INT64 | ✅ | | -| sint64 | INT64 | ✅ | | -| fixed64 | INT64 | ✅ | | -| sfixed64 | INT64 | ✅ | | -| bool | BOOLEAN | ✅ | | -| string | STRING | ✅ | | -| bytes | BYTES | ✅ | | -| enum | INT32/STRING | ✅ | Defaults to STRING. Resolves to INT32 if `int.for.enums=true` (see [schema registry docs](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf)) | -| message | STRUCT | ⚠️ | See Unsupported schemas section below | -| repeated T (where T is not a map entry) | ARRAY | ✅ | | -| `map` | MAP | ✅ | | -| oneof | STRUCT | ⚠️ | See section below on translating oneof to ClickHouse schema | -| google.protobuf.DoubleValue | FLOAT64 | ✅ | | -| google.protobuf.FloatValue | FLOAT32 | ✅ | | -| google.protobuf.Int64Value | INT64 | ✅ | | -| google.protobuf.UInt64Value | INT64 | ✅ | | -| google.protobuf.UInt32Value | INT64 | ✅ | | -| google.protobuf.Int32Value | INT32 | ✅ | | -| google.protobuf.BoolValue | BOOLEAN | ✅ | | -| google.protobuf.StringValue | STRING | ✅ | | -| google.protobuf.BytesValue | BYTES | ✅ | | -| google.protobuf.Timestamp | org.apache.kafka.connect.data.Timestamp | ✅ | | -| google.type.Date | org.apache.kafka.connect.data.Date | ✅ | | -| google.type.TimeOfDay | org.apache.kafka.connect.data.Time | ✅ | | +| Protobuf Type | Kafka Connect Type | ClickHouse Type | Supported | Notes | 
+|-----------------------------------------|-----------------------------------------|------------------------------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| double | FLOAT64 | Float64 | ✅ | | +| float | FLOAT32 | Float32 | ✅ | | +| int32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| sint32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| sfixed32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| uint32 | INT64 | UInt32 | ✅ | | +| fixed32 | INT64 | UInt32 | ✅ | | +| int64 | INT64 | Int64 | ✅ | | +| uint64 | INT64 | UInt64 | ✅ | | +| sint64 | INT64 | Int64 | ✅ | | +| fixed64 | INT64 | UInt64 | ✅ | | +| sfixed64 | INT64 | Int64 | ✅ | | +| bool | BOOLEAN | Bool | ✅ | | +| string | STRING | String | ✅ | | +| bytes | BYTES | String | ✅ | | +| enum | INT32/STRING | Int32 | ✅ | Defaults to STRING. 
Resolves to INT32 if `int.for.enums=true` (see [schema registry docs](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf)) | +| message | STRUCT | Tuple / JSON | ⚠️ | See Unsupported schemas section below | +| repeated T (where T is not a map entry) | ARRAY | Array(T) | ✅ | | +| `map` | MAP | Map(K, V) | ✅ | | +| oneof | STRUCT | Tuple / Variant | ⚠️ | See section below on translating oneof to ClickHouse schema | +| google.protobuf.DoubleValue | FLOAT64 | Nullable(Float64) | ✅ | | +| google.protobuf.FloatValue | FLOAT32 | Nullable(Float32) | ✅ | | +| google.protobuf.Int64Value | INT64 | Nullable(Int64) | ✅ | | +| google.protobuf.UInt64Value | INT64 | Nullable(UInt64) | ✅ | | +| google.protobuf.UInt32Value | INT64 | Nullable(UInt32) | ✅ | | +| google.protobuf.Int32Value | INT32 | Nullable(Int32) | ✅ | | +| google.protobuf.BoolValue | BOOLEAN | Nullable(Bool) | ✅ | | +| google.protobuf.StringValue | STRING | Nullable(String) | ✅ | | +| google.protobuf.BytesValue | BYTES | Nullable(String) | ✅ | | +| google.protobuf.Timestamp | org.apache.kafka.connect.data.Timestamp | DateTime64(3) | ✅ | | +| google.type.Date | org.apache.kafka.connect.data.Date | Date | ✅ | | +| google.type.TimeOfDay | org.apache.kafka.connect.data.Time | Int32 / Int64 | ✅ | | +| google.protobuf.Duration | STRUCT | Tuple(`seconds` Int64, `nano` Nullable(Int32)) | ✅ | | +| google.protobuf.Any | _N/A_ | _N/A_ | ❌ | | +| google.protobuf.Empty | _N/A_ | _N/A_ | ❌ | | + Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. -###### Note on translating `oneof` fields to ClickHouse columns {#oneof-translation} +#### Note on translating `oneof` fields to ClickHouse columns {#oneof-translation} The connector does not support translating Protobuf unions (`oneof`) to the ClickHouse Variant type. Instead, list the `oneof` fields as individual nullable fields in your ClickHouse table schema. 
For example: @@ -410,7 +415,7 @@ CREATE TABLE IF NOT EXISTS `StringIntUnion` ) ENGINE = ...; ``` -###### Unsupported Protobuf schemas {#unsupported-proto-schemas} +#### Unsupported Protobuf schemas {#unsupported-proto-schemas} The following Protobuf schemas are unsupported by the connector: - multi-message unions (**before CH version 26.1**) ```protobuf @@ -438,7 +443,7 @@ message TwoRecords { From CH version 26.1 onwards, this schema is supported when `allow_experimental_nullable_tuple_type=1` (see [this documentation page](https://clickhouse.com/docs/operations/settings/settings#allow_experimental_nullable_tuple_type)). -##### JSON schema support {#json-schema-support} +### JSON schema support {#json-schema-support} ```json { @@ -451,7 +456,7 @@ From CH version 26.1 onwards, this schema is supported when `allow_experimental_ } ``` -##### String support {#string-support} +### String support {#string-support} The connector supports the String Converter in different ClickHouse formats: [JSON](/interfaces/formats/JSONEachRow), [CSV](/interfaces/formats/CSV), and [TSV](/interfaces/formats/TabSeparated). @@ -530,6 +535,7 @@ com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id} | `recordProcessingTime` | long | Total time in nanoseconds spent grouping and converting records to a unified structure. | | `taskProcessingTime` | long | Total time in nanoseconds spent processing and inserting data into ClickHouse. | + #### Kafka Producer/Consumer Metrics {#kafka-producer-consumer-metrics} The connector exposes standard Kafka producer and consumer metrics that provide insights into data flow, throughput, and performance. @@ -893,6 +899,7 @@ Monitor these key metrics: | OutOfMemory errors | Batch size too large | Reduce `max.poll.records`, `max.partition.fetch.bytes` | | Uneven task load | Uneven partition distribution | Rebalance partitions or adjust `tasks.max` | + #### Best practices summary {#performance-best-practices} 1. 
**Start with defaults**, then measure and tune based on actual performance From b401260b394fd28f7c40a17bb0dcdafce9bd007e Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 22 Jun 2026 10:26:23 -0700 Subject: [PATCH 2/7] compacted tables in kafka docs --- .../kafka/kafka-clickhouse-connect-sink.md | 117 ++++++++++-------- 1 file changed, 64 insertions(+), 53 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 28e1a7bae44..ac768ec2eb2 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -30,6 +30,8 @@ The [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.htm | 1.0.0 | > 23.3 | > 2.7 | > 6.1 | + + ### Main features {#main-features} - Shipped with out-of-the-box exactly-once semantics. It's powered by a new ClickHouse core feature named [KeeperMap](https://github.com/ClickHouse/ClickHouse/pull/39976) (used as a state store by the connector) and allows for minimalistic architecture. @@ -123,6 +125,8 @@ The full table of configuration options: | `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` | + + ### Target tables {#target-tables} ClickHouse Connect Sink reads messages from Kafka topics and writes them to appropriate tables. ClickHouse Connect Sink writes data into existing tables. Please, make sure a target table with an appropriate schema was created in ClickHouse before starting to insert data into it. 
@@ -161,6 +165,8 @@ Sink, use [Kafka Connect Transformations](https://docs.confluent.io/platform/cur | org.apache.kafka.connect.data.Decimal | Decimal | ✅ | No | + + - (1) - JSON is supported only when ClickHouse settings has `input_format_binary_read_json_as_string=1`. This works only for RowBinary format family and the setting affects all columns in the insert request so they all should be a string. Connector will convert STRUCT to a JSON string in this case. - (2) - When struct has unions like `oneof` then converter should be configured to NOT add prefix/suffix to a field names. There is `generate.index.for.unions=false` [setting for `ProtobufConverter`](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf). @@ -261,22 +267,23 @@ The type mapping below is defined by `io.confluent.connect.avro.AvroConverter`, ️⚠️: Partially supported -| Avro Type | Kafka Connect Type | Supported | Notes | -|-----------|--------------------|-----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| null | _N/A_ | ❌ | Not supported as a standalone type, but can be used in unions | -| boolean | BOOLEAN | ✅ | | -| int | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has property `connect.type=int8` (analagously for INT16 if `connect.type=int16`) | -| long | INT64 | ✅ | | -| float | FLOAT32 | ✅ | | -| double | FLOAT64 | ✅ | | -| bytes | BYTES | ✅ | | -| string | STRING | ✅ | | -| record | STRUCT | ✅ | | -| enum | STRING | ✅ | | -| array | ARRAY/MAP | ✅ | Defaults to ARRAY. 
Resolves to MAP if the field was originally constructed via `AvroData.fromConnectSchema` ([source](https://github.com/confluentinc/schema-registry/blob/174907bfc0d9424e8d02e788f450f4afcdda1750/avro-data/src/main/java/io/confluent/connect/avro/AvroData.java#L943)) | -| map | MAP | ✅ | | -| union | STRUCT/`` | ⚠️ | Defaults to STRUCT. Resolves to the singleton type `T` in the union definition if `flatten.singleton.unions=true` (see [docs](https://docs.confluent.io/cloud/current/connectors/reference/connector-configuration.html#value-converter-flatten-singleton-unions)) | -| fixed | BYTES | ⚠️ | Fixed `decimal` logical type is not supported (see below) | +| Avro Type | Kafka Connect Type | Supported | Notes | +|---|---|---|---| +| null | _N/A_ | ❌ | Not supported as a standalone type, but can be used in unions | +| boolean | BOOLEAN | ✅ | | +| int | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has property `connect.type=int8` (analagously for INT16 if `connect.type=int16`) | +| long | INT64 | ✅ | | +| float | FLOAT32 | ✅ | | +| double | FLOAT64 | ✅ | | +| bytes | BYTES | ✅ | | +| string | STRING | ✅ | | +| record | STRUCT | ✅ | | +| enum | STRING | ✅ | | +| array | ARRAY/MAP | ✅ | Defaults to ARRAY. Resolves to MAP if the field was originally constructed via `AvroData.fromConnectSchema` ([source](https://github.com/confluentinc/schema-registry/blob/174907bfc0d9424e8d02e788f450f4afcdda1750/avro-data/src/main/java/io/confluent/connect/avro/AvroData.java#L943)) | +| map | MAP | ✅ | | +| union | STRUCT/`` | ⚠️ | Defaults to STRUCT. 
Resolves to the singleton type `T` in the union definition if `flatten.singleton.unions=true` (see [docs](https://docs.confluent.io/cloud/current/connectors/reference/connector-configuration.html#value-converter-flatten-singleton-unions)) | +| fixed | BYTES | ⚠️ | Fixed `decimal` logical type is not supported (see below) | + Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. @@ -346,43 +353,43 @@ The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConv ️⚠️: Partially supported -| Protobuf Type | Kafka Connect Type | ClickHouse Type | Supported | Notes | -|-----------------------------------------|-----------------------------------------|------------------------------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| double | FLOAT64 | Float64 | ✅ | | -| float | FLOAT32 | Float32 | ✅ | | -| int32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| sint32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| sfixed32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | -| uint32 | INT64 | UInt32 | ✅ | | -| fixed32 | INT64 | UInt32 | ✅ | | -| int64 | INT64 | Int64 | ✅ | | -| uint64 | INT64 | UInt64 | ✅ | | -| sint64 | INT64 | Int64 | ✅ | | -| fixed64 | INT64 | UInt64 | ✅ | | -| sfixed64 | INT64 | Int64 | ✅ | | -| bool | BOOLEAN | Bool | ✅ | | -| string | STRING | String | ✅ | | -| bytes | BYTES | String | ✅ | | -| enum | INT32/STRING | Int32 | ✅ | Defaults to STRING. 
Resolves to INT32 if `int.for.enums=true` (see [schema registry docs](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf)) | -| message | STRUCT | Tuple / JSON | ⚠️ | See Unsupported schemas section below | -| repeated T (where T is not a map entry) | ARRAY | Array(T) | ✅ | | -| `map` | MAP | Map(K, V) | ✅ | | -| oneof | STRUCT | Tuple / Variant | ⚠️ | See section below on translating oneof to ClickHouse schema | -| google.protobuf.DoubleValue | FLOAT64 | Nullable(Float64) | ✅ | | -| google.protobuf.FloatValue | FLOAT32 | Nullable(Float32) | ✅ | | -| google.protobuf.Int64Value | INT64 | Nullable(Int64) | ✅ | | -| google.protobuf.UInt64Value | INT64 | Nullable(UInt64) | ✅ | | -| google.protobuf.UInt32Value | INT64 | Nullable(UInt32) | ✅ | | -| google.protobuf.Int32Value | INT32 | Nullable(Int32) | ✅ | | -| google.protobuf.BoolValue | BOOLEAN | Nullable(Bool) | ✅ | | -| google.protobuf.StringValue | STRING | Nullable(String) | ✅ | | -| google.protobuf.BytesValue | BYTES | Nullable(String) | ✅ | | -| google.protobuf.Timestamp | org.apache.kafka.connect.data.Timestamp | DateTime64(3) | ✅ | | -| google.type.Date | org.apache.kafka.connect.data.Date | Date | ✅ | | -| google.type.TimeOfDay | org.apache.kafka.connect.data.Time | Int32 / Int64 | ✅ | | -| google.protobuf.Duration | STRUCT | Tuple(`seconds` Int64, `nano` Nullable(Int32)) | ✅ | | -| google.protobuf.Any | _N/A_ | _N/A_ | ❌ | | -| google.protobuf.Empty | _N/A_ | _N/A_ | ❌ | | +| Protobuf Type | Kafka Connect Type | ClickHouse Type | Supported | Notes | +|---|---|---|---|---| +| double | FLOAT64 | Float64 | ✅ | | +| float | FLOAT32 | Float32 | ✅ | | +| int32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| sint32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. 
Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| sfixed32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | +| uint32 | INT64 | UInt32 | ✅ | | +| fixed32 | INT64 | UInt32 | ✅ | | +| int64 | INT64 | Int64 | ✅ | | +| uint64 | INT64 | UInt64 | ✅ | | +| sint64 | INT64 | Int64 | ✅ | | +| fixed64 | INT64 | UInt64 | ✅ | | +| sfixed64 | INT64 | Int64 | ✅ | | +| bool | BOOLEAN | Bool | ✅ | | +| string | STRING | String | ✅ | | +| bytes | BYTES | String | ✅ | | +| enum | INT32/STRING | Int32 | ✅ | Defaults to STRING. Resolves to INT32 if `int.for.enums=true` (see [schema registry docs](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf)) | +| message | STRUCT | Tuple / JSON | ⚠️ | See Unsupported schemas section below | +| repeated T (where T is not a map entry) | ARRAY | Array(T) | ✅ | | +| `map` | MAP | Map(K, V) | ✅ | | +| oneof | STRUCT | Tuple / Variant | ⚠️ | See section below on translating oneof to ClickHouse schema | +| google.protobuf.DoubleValue | FLOAT64 | Nullable(Float64) | ✅ | | +| google.protobuf.FloatValue | FLOAT32 | Nullable(Float32) | ✅ | | +| google.protobuf.Int64Value | INT64 | Nullable(Int64) | ✅ | | +| google.protobuf.UInt64Value | INT64 | Nullable(UInt64) | ✅ | | +| google.protobuf.UInt32Value | INT64 | Nullable(UInt32) | ✅ | | +| google.protobuf.Int32Value | INT32 | Nullable(Int32) | ✅ | | +| google.protobuf.BoolValue | BOOLEAN | Nullable(Bool) | ✅ | | +| google.protobuf.StringValue | STRING | Nullable(String) | ✅ | | +| google.protobuf.BytesValue | BYTES | Nullable(String) | ✅ | | +| google.protobuf.Timestamp | org.apache.kafka.connect.data.Timestamp | DateTime64(3) | ✅ | | +| google.type.Date | org.apache.kafka.connect.data.Date | Date | ✅ | | +| google.type.TimeOfDay | org.apache.kafka.connect.data.Time | Int32 / Int64 | ✅ | | +| 
google.protobuf.Duration | STRUCT | Tuple(`seconds` Int64, `nano` Nullable(Int32)) | ✅ | | +| google.protobuf.Any | _N/A_ | _N/A_ | ❌ | | +| google.protobuf.Empty | _N/A_ | _N/A_ | ❌ | | Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. @@ -536,6 +543,8 @@ com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id} | `taskProcessingTime` | long | Total time in nanoseconds spent processing and inserting data into ClickHouse. | + + #### Kafka Producer/Consumer Metrics {#kafka-producer-consumer-metrics} The connector exposes standard Kafka producer and consumer metrics that provide insights into data flow, throughput, and performance. @@ -900,6 +909,8 @@ Monitor these key metrics: | Uneven task load | Uneven partition distribution | Rebalance partitions or adjust `tasks.max` | + + #### Best practices summary {#performance-best-practices} 1. **Start with defaults**, then measure and tune based on actual performance From e80c8c59a199a761cc4a9e5895308bfb98ccd8eb Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 22 Jun 2026 10:58:49 -0700 Subject: [PATCH 3/7] fixed capitalization --- .../data-ingestion/kafka/kafka-clickhouse-connect-sink.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index ac768ec2eb2..a1a71c32a87 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -344,7 +344,7 @@ The following Avro schemas are unsupported by the connector: Please note: if you encounter issues with missing classes, not every environment comes with the protobuf converter and you may need an alternate release of the jar bundled with dependencies. 
-#### Protobuf Type Mapping {#proto-type-mapping} +#### Protobuf type mapping {#proto-type-mapping} The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConverter`, the official Protobuf serializer/deserializer implementation in Kafka Connect. See the Kafka Connect [docs](https://docs.confluent.io/platform/current/connect/userguide.html#json-schema-and-protobuf) for advanced information on conversion logic. ✅: Supported @@ -422,7 +422,7 @@ CREATE TABLE IF NOT EXISTS `StringIntUnion` ) ENGINE = ...; ``` -#### Unsupported Protobuf schemas {#unsupported-proto-schemas} +#### Unsupported protobuf schemas {#unsupported-proto-schemas} The following Protobuf schemas are unsupported by the connector: - multi-message unions (**before CH version 26.1**) ```protobuf From 2a11e6e651c8eb32fa3c9ba333821407baa2cb2d Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 22 Jun 2026 11:17:22 -0700 Subject: [PATCH 4/7] removed extra blank lines --- .../kafka/kafka-clickhouse-connect-sink.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index a1a71c32a87..90e57b1fca9 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -29,9 +29,6 @@ The [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.htm |----------------------------------|--------------------|---------------|--------------------| | 1.0.0 | > 23.3 | > 2.7 | > 6.1 | - - - ### Main features {#main-features} - Shipped with out-of-the-box exactly-once semantics. It's powered by a new ClickHouse core feature named [KeeperMap](https://github.com/ClickHouse/ClickHouse/pull/39976) (used as a state store by the connector) and allows for minimalistic architecture. 
@@ -124,9 +121,6 @@ The full table of configuration options: | `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flush when `exactlyOnce=false`. `0` disables time-based flushing. Default value is `0`. Only required for time-base threshold. Only effective when `bufferCount > 0`. | `"0"` | | `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. This does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` | - - - ### Target tables {#target-tables} ClickHouse Connect Sink reads messages from Kafka topics and writes them to appropriate tables. ClickHouse Connect Sink writes data into existing tables. Please, make sure a target table with an appropriate schema was created in ClickHouse before starting to insert data into it. @@ -164,9 +158,6 @@ Sink, use [Kafka Connect Transformations](https://docs.confluent.io/platform/cur | org.apache.kafka.connect.data.Timestamp | Int32 / Date32 | ✅ | No | | org.apache.kafka.connect.data.Decimal | Decimal | ✅ | No | - - - - (1) - JSON is supported only when ClickHouse settings has `input_format_binary_read_json_as_string=1`. This works only for RowBinary format family and the setting affects all columns in the insert request so they all should be a string. Connector will convert STRUCT to a JSON string in this case. - (2) - When struct has unions like `oneof` then converter should be configured to NOT add prefix/suffix to a field names. There is `generate.index.for.unions=false` [setting for `ProtobufConverter`](https://docs.confluent.io/platform/current/schema-registry/connect.html#protobuf). @@ -284,7 +275,6 @@ The type mapping below is defined by `io.confluent.connect.avro.AvroConverter`, | union | STRUCT/`` | ⚠️ | Defaults to STRUCT. 
Resolves to the singleton type `T` in the union definition if `flatten.singleton.unions=true` (see [docs](https://docs.confluent.io/cloud/current/connectors/reference/connector-configuration.html#value-converter-flatten-singleton-unions)) | | fixed | BYTES | ⚠️ | Fixed `decimal` logical type is not supported (see below) | - Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. #### Unsupported Avro schemas {#unsupported-avro-schemas} @@ -391,7 +381,6 @@ The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConv | google.protobuf.Any | _N/A_ | _N/A_ | ❌ | | | google.protobuf.Empty | _N/A_ | _N/A_ | ❌ | | - Refer to [Supported data types](#supported-data-types) for the mapping between Kafka Connect types and ClickHouse types. #### Note on translating `oneof` fields to ClickHouse columns {#oneof-translation} @@ -542,9 +531,6 @@ com.clickhouse:type=ClickHouseKafkaConnector,name=SinkTask{id} | `recordProcessingTime` | long | Total time in nanoseconds spent grouping and converting records to a unified structure. | | `taskProcessingTime` | long | Total time in nanoseconds spent processing and inserting data into ClickHouse. | - - - #### Kafka Producer/Consumer Metrics {#kafka-producer-consumer-metrics} The connector exposes standard Kafka producer and consumer metrics that provide insights into data flow, throughput, and performance. @@ -908,9 +894,6 @@ Monitor these key metrics: | OutOfMemory errors | Batch size too large | Reduce `max.poll.records`, `max.partition.fetch.bytes` | | Uneven task load | Uneven partition distribution | Rebalance partitions or adjust `tasks.max` | - - - #### Best practices summary {#performance-best-practices} 1. 
**Start with defaults**, then measure and tune based on actual performance From 2a7c7a4050413884482019c04fed246bda5d94f6 Mon Sep 17 00:00:00 2001 From: Dominic Tran Date: Mon, 22 Jun 2026 15:15:08 -0500 Subject: [PATCH 5/7] adding Protobuf to exceptions so that Protobuf passes CI --- .../data-ingestion/kafka/kafka-clickhouse-connect-sink.md | 2 +- styles/ClickHouse/Headings.yml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 90e57b1fca9..de0dde21af8 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -411,7 +411,7 @@ CREATE TABLE IF NOT EXISTS `StringIntUnion` ) ENGINE = ...; ``` -#### Unsupported protobuf schemas {#unsupported-proto-schemas} +#### Unsupported Protobuf schemas {#unsupported-proto-schemas} The following Protobuf schemas are unsupported by the connector: - multi-message unions (**before CH version 26.1**) ```protobuf diff --git a/styles/ClickHouse/Headings.yml b/styles/ClickHouse/Headings.yml index ef4bcc1437a..d2df3f20ee4 100644 --- a/styles/ClickHouse/Headings.yml +++ b/styles/ClickHouse/Headings.yml @@ -226,6 +226,7 @@ exceptions: - RowBinary - MessagePack - Protocol Buffers + - Protobuf - Cap'n Proto - TabSeparated - TabSeparatedRaw From 0efdc6e4dbf17d7894b70e2ebcd14485a73789eb Mon Sep 17 00:00:00 2001 From: Dominic Tran Date: Mon, 22 Jun 2026 15:17:48 -0500 Subject: [PATCH 6/7] Update docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md --- .../data-ingestion/kafka/kafka-clickhouse-connect-sink.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index de0dde21af8..06d4081e73b 100644
--- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -262,7 +262,7 @@ The type mapping below is defined by `io.confluent.connect.avro.AvroConverter`, |---|---|---|---| | null | _N/A_ | ❌ | Not supported as a standalone type, but can be used in unions | | boolean | BOOLEAN | ✅ | | -| int | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has property `connect.type=int8` (analagously for INT16 if `connect.type=int16`) | +| int | INT8/INT16/INT32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has property `connect.type=int8` (analogously for INT16 if `connect.type=int16`) | | long | INT64 | ✅ | | | float | FLOAT32 | ✅ | | | double | FLOAT64 | ✅ | | From 560304061368612025062a4b2093633365361a6f Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Mon, 22 Jun 2026 13:40:06 -0700 Subject: [PATCH 7/7] forces left justify for header --- .../data-ingestion/kafka/kafka-clickhouse-connect-sink.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md index 06d4081e73b..344e013f206 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md +++ b/docs/integrations/data-ingestion/kafka/kafka-clickhouse-connect-sink.md @@ -344,7 +344,7 @@ The type mapping below is defined by `io.confluent.connect.protobuf.ProtobufConv ️⚠️: Partially supported | Protobuf Type | Kafka Connect Type | ClickHouse Type | Supported | Notes | -|---|---|---|---|---| +|:---|:---|:---|:---|:---| | double | FLOAT64 | Float64 | ✅ | | | float | FLOAT32 | Float32 | ✅ | | | int32 | INT8/INT16/INT32 | Int32 | ✅ | Defaults to INT32. Resolves to INT8 if the schema has option `connect.type=int8` (analogously for INT16 if `connect.type=int16`) |
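
The `ClickHouse Type` column that this series adds to the Protobuf type mapping table can be read as a guide for writing the target table. The sketch below is illustrative only and is not part of the patch: the message `ProtoEvent`, the table `proto_events`, and every field name are hypothetical, and the `MergeTree` engine with `ORDER BY` on `id` is an assumption, not something the documentation prescribes. Each column type is taken from the matching row of the table.

```protobuf
// Hypothetical message, used only to illustrate the mapping rows.
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

message ProtoEvent {
  uint64 id = 1;                             // uint64 row
  uint32 count = 2;                          // uint32 row
  google.protobuf.DoubleValue score = 3;     // wrapper type row
  google.protobuf.Timestamp created_at = 4;  // Timestamp row
  repeated string tags = 5;                  // repeated T row
  map<string, string> attrs = 6;             // map row
}
```

```sql
-- Hypothetical target table; column types follow the ClickHouse Type column.
CREATE TABLE IF NOT EXISTS `proto_events`
(
    `id`         UInt64,             -- uint64 -> UInt64
    `count`      UInt32,             -- uint32 -> UInt32
    `score`      Nullable(Float64),  -- google.protobuf.DoubleValue -> Nullable(Float64)
    `created_at` DateTime64(3),      -- google.protobuf.Timestamp -> DateTime64(3)
    `tags`       Array(String),      -- repeated T -> Array(T)
    `attrs`      Map(String, String) -- map -> Map(K, V)
) ENGINE = MergeTree ORDER BY `id`;  -- engine and sort key are assumptions
```

As the target tables section notes, the connector writes into existing tables, so a table along these lines would need to be created before the connector starts inserting.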