From 90f6c93f09f460520057ee2e17aace4921071b57 Mon Sep 17 00:00:00 2001 From: bentsileviav Date: Tue, 23 Jun 2026 22:57:15 +0300 Subject: [PATCH 1/2] Add Apache Airflow provider integration page --- .../data-ingestion/data-ingestion-index.md | 3 +- .../etl-tools/airflow-and-clickhouse.md | 212 ++++++++++++++++++ sidebars.js | 1 + static/integrations-fallback.json | 4 +- 4 files changed, 217 insertions(+), 3 deletions(-) create mode 100644 docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md diff --git a/docs/integrations/data-ingestion/data-ingestion-index.md b/docs/integrations/data-ingestion/data-ingestion-index.md index a5a26050e01..ecfb030bc85 100644 --- a/docs/integrations/data-ingestion/data-ingestion-index.md +++ b/docs/integrations/data-ingestion/data-ingestion-index.md @@ -1,6 +1,6 @@ --- slug: /integrations/data-ingestion-overview -keywords: [ 'Airbyte', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ] +keywords: [ 'Airbyte', 'Apache Airflow', 'Airflow', 'Apache Spark', 'Spark', 'Azure Synapse', 'Amazon Glue', 'Apache Beam', 'dbt', 'Fivetran', 'NiFi', 'dlt', 'Vector' ] title: 'Data ingestion' description: 'Landing page for the data ingestion section' doc_type: 'landing-page' @@ -13,6 +13,7 @@ For more information check out the pages below: | Data Ingestion Tool | Description | |------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [Airbyte](/integrations/airbyte) | An open-source data integration platform. It allows the creation of ELT data pipelines and is shipped with more than 140 out-of-the-box connectors. | +| [Apache Airflow](/integrations/airflow) | An open-source platform for authoring, scheduling, and monitoring workflows as code, with a provider for running ClickHouse queries and data loads from DAGs. | | [Apache Spark](/integrations/apache-spark) | A multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters | | [Apache Flink](/integrations/apache-flink) | Real-time data ingestion and processing into ClickHouse through Flink's DataStream API with support for batch writes | | [Amazon Glue](/integrations/glue) | A fully managed, serverless data integration service provided by Amazon Web Services (AWS) simplifying the process of discovering, preparing, and transforming data for analytics, machine learning, and application development. | diff --git a/docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md b/docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md new file mode 100644 index 00000000000..9d17ef4fe45 --- /dev/null +++ b/docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md @@ -0,0 +1,212 @@ +--- +sidebar_label: 'Apache Airflow' +slug: /integrations/airflow +description: 'Orchestrate ClickHouse queries and data loads from Apache Airflow using the ClickHouse provider' +title: 'Connect Apache Airflow to ClickHouse' +doc_type: 'guide' +integration: + - support_level: 'core' + - category: 'data_ingestion' + - website: 'https://airflow.apache.org/' +keywords: ['Apache Airflow', 'orchestration', 'DAG', 'clickhouse-connect', 'SQLExecuteQueryOperator', 'ClickHouseHook', 'workflow', 'scheduler'] +--- + +import ClickHouseSupportedBadge from '@theme/badges/ClickHouseSupported'; + + + +[Apache Airflow](https://airflow.apache.org/) is an open-source platform for authoring, scheduling, and monitoring workflows as code. Workflows are defined as directed acyclic graphs (DAGs) of tasks written in Python. + +The `apache-airflow-providers-clickhousedb` provider connects Airflow to ClickHouse, letting you run queries, create tables, and load data as part of a DAG. It connects over the [HTTP interface](/interfaces/http) using the [`clickhouse-connect`](/integrations/python) client, and exposes ClickHouse through Airflow's common SQL framework, so the standard `SQLExecuteQueryOperator` handles DDL, DML, and analytical queries with no ClickHouse-specific operator required. + +## Install the provider {#install-the-provider} + +Install the provider into the environment where your Airflow scheduler and workers run: + +```bash +pip install apache-airflow-providers-clickhousedb +``` + +The provider depends on `apache-airflow-providers-common-sql` and `clickhouse-connect`, which are installed alongside it. To pass query results to pandas or polars DataFrames, install the optional extras: + +```bash +pip install 'apache-airflow-providers-common-sql[pandas,polars]' +``` + +## Create a ClickHouse connection {#create-a-clickhouse-connection} + +The provider registers a connection type of `clickhouse`. Create a connection from the Airflow UI under **Admin > Connections**, or define one through the CLI or an environment variable. + +In the UI, select **ClickHouse** as the connection type and fill in the fields: + +| Field | Description | Default | +|--------------------------|--------------------------------------------------------------------------|-------------------------------| +| **Host** | ClickHouse server hostname, for example `abc123.clickhouse.cloud` | `localhost` | +| **Port** | HTTP(S) port | `8123` (plain), `8443` (TLS) | +| **Login** | ClickHouse username | `default` | +| **Password** | ClickHouse user password | (empty) | +| **Database** | Default database for the connection. The UI labels this **Database**; it is the `schema` field when you define the connection by URI or JSON. | `default` | + +For [ClickHouse Cloud](/cloud/overview) or any self-hosted cluster with TLS enabled, set `secure` to `true` in the **Extra** field and use the TLS port (`8443`). + +### Extra connection options {#extra-connection-options} + +The provider exposes additional options as dedicated fields in the connection form. When you define the connection by URI, JSON, or environment variable instead, supply them as keys in the `extra` JSON object. All are optional: + +| `extra` key | UI field | Default | Description | +|------------------------|------------------------------|---------|----------------------------------------------------------------------------------------------| +| `secure` | Use TLS (HTTPS) | `false` | Enable HTTPS/TLS. | +| `verify` | Verify SSL Certificate | `true` | Verify the server TLS certificate when `secure` is `true`. Set to `false` for self-signed certificates. | +| `connect_timeout` | Connection Timeout (seconds) | `10` | HTTP connection timeout in seconds. | +| `send_receive_timeout` | Query Timeout (seconds) | `300` | Query read/write timeout in seconds. Increase this for long-running analytical queries. | +| `compress` | Enable LZ4 Compression | `true` | Enable LZ4 result compression. | +| `client_name` | Client Name | (empty) | A label appended to the Airflow version identifier in the ClickHouse `User-Agent` and the `client_name` column of [`system.query_log`](/operations/system-tables/query_log). | +| `session_settings` | Session Settings (JSON) | (empty) | [ClickHouse session settings](/operations/settings/settings) applied to every query on the connection, for example `{"max_execution_time": 300, "max_threads": 8}`. | +| `client_kwargs` | Client kwargs (JSON) | (empty) | Additional keyword arguments forwarded to `clickhouse_connect.get_client()`, for example an `http_proxy`. | + +### Define a connection without the UI {#define-a-connection-without-the-ui} + +Set the connection through an environment variable. The URI form covers host, credentials, and database: + +```bash +export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:password@localhost:8123/my_database' +``` + +All components of the URI must be URL-encoded. For TLS, timeouts, or session settings, use the JSON form, which exposes the **Extra** fields: + +```bash +export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{ + "conn_type": "clickhouse", + "host": "abc123.clickhouse.cloud", + "port": 8443, + "login": "default", + "password": "secret", + "schema": "my_database", + "extra": { + "secure": true, + "session_settings": { + "max_execution_time": 300, + "max_memory_usage": 10000000000 + } + } +}' +``` + +All hooks and operators use the connection ID `clickhouse_default` unless you specify another. + +## Run queries with SQLExecuteQueryOperator {#run-queries} + +Set the operator's `conn_id` to your ClickHouse connection. The following DAG creates a table, inserts rows, reads them back, and drops the table: + +```python +from datetime import datetime + +from airflow import DAG +from airflow.providers.common.sql.hooks.sql import fetch_all_handler +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator + +CLICKHOUSE_CONN_ID = "clickhouse_default" +CLICKHOUSE_TABLE = "airflow_example" + +with DAG( + dag_id="example_clickhouse", + start_date=datetime(2021, 1, 1), + default_args={"conn_id": CLICKHOUSE_CONN_ID}, + schedule="@once", + catchup=False, +) as dag: + create_table = SQLExecuteQueryOperator( + task_id="create_table", + sql=f""" + CREATE TABLE IF NOT EXISTS {CLICKHOUSE_TABLE} ( + id UInt32, + name String, + ts DateTime DEFAULT now() + ) ENGINE = MergeTree() + ORDER BY id + """, + ) + + insert_rows = SQLExecuteQueryOperator( + task_id="insert_rows", + sql=f""" + INSERT INTO {CLICKHOUSE_TABLE} (id, name) VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Charlie') + """, + ) + + read_rows = SQLExecuteQueryOperator( + task_id="read_rows", + sql=f"SELECT id, name FROM {CLICKHOUSE_TABLE} ORDER BY id", + handler=fetch_all_handler, + ) + + drop_table = SQLExecuteQueryOperator( + task_id="drop_table", + sql=f"DROP TABLE IF EXISTS {CLICKHOUSE_TABLE}", + ) + + create_table >> insert_rows >> read_rows >> drop_table +``` + +Query results are fetched with the default `handler` (`fetch_all_handler`). To return something other than the full result set, pass a different handler, such as `fetch_one_handler` for the first row only. + +### Target a different database per task {#target-a-different-database} + +When one connection points at a cluster and individual tasks query different databases, override the database through `hook_params` instead of creating a separate connection: + +```python +read_rows = SQLExecuteQueryOperator( + task_id="read_rows", + conn_id=CLICKHOUSE_CONN_ID, + sql="SELECT count() FROM events", + hook_params={"database": "analytics"}, +) +``` + +## Use the hook directly {#use-the-hook-directly} + +For work that doesn't fit a SQL operator — bulk inserts, streaming, or ClickHouse-specific client calls — use `ClickHouseHook` inside a Python task. + +The hook's `bulk_insert_rows` method uses the native columnar insert path in `clickhouse-connect`, which is much faster than row-by-row inserts for large datasets. Set `batch_size` to bound peak memory on very large inputs: + +```python +from airflow.providers.clickhousedb.hooks.clickhouse import ClickHouseHook + +hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default") + +hook.bulk_insert_rows( + table="events", + rows=[("user1", "click"), ("user2", "view")], + column_names=["user_id", "action"], + batch_size=1000, +) +``` + +Call `get_client()` to reach the underlying `clickhouse-connect` client for anything the hook doesn't expose directly: + +```python +client = hook.get_client() +total = client.query("SELECT count() FROM events").result_rows[0][0] +``` + +### Apply session settings {#apply-session-settings} + +Pass [session settings](/operations/settings/settings) when constructing the hook, either directly or through an operator's `hook_params`. Settings passed to the constructor are merged on top of any `session_settings` defined in the connection's **Extra** field, and the constructor values win on conflicting keys: + +```python +hook = ClickHouseHook( + clickhouse_conn_id="clickhouse_default", + session_settings={"max_execution_time": 60, "max_threads": 4}, +) +``` + +## Related content {#related-content} + +- [`clickhouse-connect` Python client](/integrations/python) +- [ClickHouse HTTP interface](/interfaces/http) +- [ClickHouse session settings reference](/operations/settings/settings) +- [`apache-airflow-providers-clickhousedb` reference docs](https://airflow.apache.org/docs/apache-airflow-providers-clickhousedb/) +- [Provider package on PyPI](https://pypi.org/project/apache-airflow-providers-clickhousedb/) diff --git a/sidebars.js b/sidebars.js index 977e76e65e2..baf164da462 100644 --- a/sidebars.js +++ b/sidebars.js @@ -1135,6 +1135,7 @@ const sidebars = { }, items: [ 'integrations/data-ingestion/etl-tools/airbyte-and-clickhouse', + 'integrations/data-ingestion/etl-tools/airflow-and-clickhouse', 'integrations/data-ingestion/etl-tools/apify-and-clickhouse', { type: 'category', diff --git a/static/integrations-fallback.json b/static/integrations-fallback.json index 94d38ffd1b7..1f8c9ae3417 100644 --- a/static/integrations-fallback.json +++ b/static/integrations-fallback.json @@ -1735,8 +1735,8 @@ "name": "Apache Airflow", "slug": "apache_airflow", "category": "DATA_INGESTION", - "supportLevel": "COMMUNITY", - "docsLink": "https://github.com/bryzgaloff/airflow-clickhouse-plugin", + "supportLevel": "CORE", + "docsLink": "https://clickhouse.com/docs/integrations/airflow", "logo": { "id": 2661, "documentId": "xe6c78wt9ztzz2w8aii5w76u", From 76b83c85a82b63bc3fb3b71dfb4da96ae7e78175 Mon Sep 17 00:00:00 2001 From: Dominic Tran Date: Wed, 24 Jun 2026 12:15:33 -0500 Subject: [PATCH 2/2] adding Airflow to heading exceptions, fixing CI --- styles/ClickHouse/Headings.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/styles/ClickHouse/Headings.yml b/styles/ClickHouse/Headings.yml index 75452145210..33a5bf74c72 100644 --- a/styles/ClickHouse/Headings.yml +++ b/styles/ClickHouse/Headings.yml @@ -25,6 +25,8 @@ exceptions: - Arrow - AWS - Apache + - Airflow + - Apache Airflow - AggregatingMergeTree - Amazon - Amazon Web Services