-
Notifications
You must be signed in to change notification settings - Fork 484
Add Apache Airflow provider integration page #6437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,212 @@ | ||
| --- | ||
| sidebar_label: 'Apache Airflow' | ||
| slug: /integrations/airflow | ||
| description: 'Orchestrate ClickHouse queries and data loads from Apache Airflow using the ClickHouse provider' | ||
| title: 'Connect Apache Airflow to ClickHouse' | ||
| doc_type: 'guide' | ||
| integration: | ||
| - support_level: 'core' | ||
| - category: 'data_ingestion' | ||
| - website: 'https://airflow.apache.org/' | ||
| keywords: ['Apache Airflow', 'orchestration', 'DAG', 'clickhouse-connect', 'SQLExecuteQueryOperator', 'ClickHouseHook', 'workflow', 'scheduler'] | ||
| --- | ||
|
|
||
| import ClickHouseSupportedBadge from '@theme/badges/ClickHouseSupported'; | ||
|
|
||
| <ClickHouseSupportedBadge/> | ||
|
|
||
| [Apache Airflow](https://airflow.apache.org/) is an open-source platform for authoring, scheduling, and monitoring workflows as code. Workflows are defined as directed acyclic graphs (DAGs) of tasks written in Python. | ||
|
|
||
| The `apache-airflow-providers-clickhousedb` provider connects Airflow to ClickHouse, letting you run queries, create tables, and load data as part of a DAG. It connects over the [HTTP interface](/interfaces/http) using the [`clickhouse-connect`](/integrations/python) client, and exposes ClickHouse through Airflow's common SQL framework, so the standard `SQLExecuteQueryOperator` handles DDL, DML, and analytical queries with no ClickHouse-specific operator required. | ||
|
Check notice on line 20 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
|
|
||
| ## Install the provider {#install-the-provider} | ||
|
|
||
| Install the provider into the environment where your Airflow scheduler and workers run: | ||
|
|
||
| ```bash | ||
|
Check notice on line 26 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
| pip install apache-airflow-providers-clickhousedb | ||
| ``` | ||
|
|
||
| The provider depends on `apache-airflow-providers-common-sql` and `clickhouse-connect`, which are installed alongside it. To pass query results to pandas or polars DataFrames, install the optional extras: | ||
|
|
||
| ```bash | ||
|
Check notice on line 32 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
| pip install 'apache-airflow-providers-common-sql[pandas,polars]' | ||
| ``` | ||
|
|
||
| ## Create a ClickHouse connection {#create-a-clickhouse-connection} | ||
|
|
||
| The provider registers a connection type of `clickhouse`. Create a connection from the Airflow UI under **Admin > Connections**, or define one through the CLI or an environment variable. | ||
|
|
||
| In the UI, select **ClickHouse** as the connection type and fill in the fields: | ||
|
|
||
| | Field | Description | Default | | ||
| |--------------------------|--------------------------------------------------------------------------|-------------------------------| | ||
| | **Host** | ClickHouse server hostname, for example `abc123.clickhouse.cloud` | `localhost` | | ||
| | **Port** | HTTP(S) port | `8123` (plain), `8443` (TLS) | | ||
| | **Login** | ClickHouse username | `default` | | ||
| | **Password** | ClickHouse user password | (empty) | | ||
| | **Database** | Default database for the connection. The UI labels this **Database**; it is the `schema` field when you define the connection by URI or JSON. | `default` | | ||
|
|
||
| For [ClickHouse Cloud](/cloud/overview) or any self-hosted cluster with TLS enabled, set `secure` to `true` in the **Extra** field and use the TLS port (`8443`). | ||
|
|
||
| ### Extra connection options {#extra-connection-options} | ||
|
|
||
| The provider exposes additional options as dedicated fields in the connection form. When you define the connection by URI, JSON, or environment variable instead, supply them as keys in the `extra` JSON object. All are optional: | ||
|
|
||
| | `extra` key | UI field | Default | Description | | ||
| |------------------------|------------------------------|---------|----------------------------------------------------------------------------------------------| | ||
| | `secure` | Use TLS (HTTPS) | `false` | Enable HTTPS/TLS. | | ||
| | `verify` | Verify SSL Certificate | `true` | Verify the server TLS certificate when `secure` is `true`. Set to `false` for self-signed certificates. | | ||
| | `connect_timeout` | Connection Timeout (seconds) | `10` | HTTP connection timeout in seconds. | | ||
| | `send_receive_timeout` | Query Timeout (seconds) | `300` | Query read/write timeout in seconds. Increase this for long-running analytical queries. | | ||
| | `compress` | Enable LZ4 Compression | `true` | Enable LZ4 result compression. | | ||
| | `client_name` | Client Name | (empty) | A label appended to the Airflow version identifier in the ClickHouse `User-Agent` and the `client_name` column of [`system.query_log`](/operations/system-tables/query_log). | | ||
| | `session_settings` | Session Settings (JSON) | (empty) | [ClickHouse session settings](/operations/settings/settings) applied to every query on the connection, for example `{"max_execution_time": 300, "max_threads": 8}`. | | ||
| | `client_kwargs` | Client kwargs (JSON) | (empty) | Additional keyword arguments forwarded to `clickhouse_connect.get_client()`, for example an `http_proxy`. | | ||
|
|
||
| ### Define a connection without the UI {#define-a-connection-without-the-ui} | ||
|
|
||
| Set the connection through an environment variable. The URI form covers host, credentials, and database: | ||
|
|
||
| ```bash | ||
|
Check notice on line 71 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
| export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='clickhouse://default:password@localhost:8123/my_database' | ||
| ``` | ||
|
|
||
| All components of the URI must be URL-encoded. For TLS, timeouts, or session settings, use the JSON form, which exposes the **Extra** fields: | ||
|
|
||
| ```bash | ||
|
Check notice on line 77 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
| export AIRFLOW_CONN_CLICKHOUSE_DEFAULT='{ | ||
| "conn_type": "clickhouse", | ||
| "host": "abc123.clickhouse.cloud", | ||
| "port": 8443, | ||
| "login": "default", | ||
| "password": "secret", | ||
| "schema": "my_database", | ||
| "extra": { | ||
| "secure": true, | ||
| "session_settings": { | ||
| "max_execution_time": 300, | ||
| "max_memory_usage": 10000000000 | ||
| } | ||
| } | ||
| }' | ||
| ``` | ||
|
|
||
| All hooks and operators use the connection ID `clickhouse_default` unless you specify another. | ||
|
|
||
| ## Run queries with SQLExecuteQueryOperator {#run-queries} | ||
|
|
||
| Set the operator's `conn_id` to your ClickHouse connection. The following DAG creates a table, inserts rows, reads them back, and drops the table: | ||
|
Check notice on line 99 in docs/integrations/data-ingestion/etl-tools/airflow-and-clickhouse.md
|
||
|
|
||
| ```python | ||
| from datetime import datetime | ||
|
|
||
| from airflow import DAG | ||
| from airflow.providers.common.sql.hooks.sql import fetch_all_handler | ||
| from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator | ||
|
|
||
| CLICKHOUSE_CONN_ID = "clickhouse_default" | ||
| CLICKHOUSE_TABLE = "airflow_example" | ||
|
|
||
| with DAG( | ||
| dag_id="example_clickhouse", | ||
| start_date=datetime(2021, 1, 1), | ||
| default_args={"conn_id": CLICKHOUSE_CONN_ID}, | ||
| schedule="@once", | ||
| catchup=False, | ||
| ) as dag: | ||
| create_table = SQLExecuteQueryOperator( | ||
| task_id="create_table", | ||
| sql=f""" | ||
| CREATE TABLE IF NOT EXISTS {CLICKHOUSE_TABLE} ( | ||
| id UInt32, | ||
| name String, | ||
| ts DateTime DEFAULT now() | ||
| ) ENGINE = MergeTree() | ||
| ORDER BY id | ||
| """, | ||
| ) | ||
|
|
||
| insert_rows = SQLExecuteQueryOperator( | ||
| task_id="insert_rows", | ||
| sql=f""" | ||
| INSERT INTO {CLICKHOUSE_TABLE} (id, name) VALUES | ||
| (1, 'Alice'), | ||
| (2, 'Bob'), | ||
| (3, 'Charlie') | ||
| """, | ||
| ) | ||
|
|
||
| read_rows = SQLExecuteQueryOperator( | ||
| task_id="read_rows", | ||
| sql=f"SELECT id, name FROM {CLICKHOUSE_TABLE} ORDER BY id", | ||
| handler=fetch_all_handler, | ||
| ) | ||
|
|
||
| drop_table = SQLExecuteQueryOperator( | ||
| task_id="drop_table", | ||
| sql=f"DROP TABLE IF EXISTS {CLICKHOUSE_TABLE}", | ||
| ) | ||
|
|
||
| create_table >> insert_rows >> read_rows >> drop_table | ||
| ``` | ||
|
|
||
| Query results are fetched with the default `handler` (`fetch_all_handler`). To return something other than the full result set, pass a different handler, such as `fetch_one_handler` for the first row only. | ||
|
|
||
| ### Target a different database per task {#target-a-different-database} | ||
|
|
||
| When one connection points at a cluster and individual tasks query different databases, override the database through `hook_params` instead of creating a separate connection: | ||
|
|
||
| ```python | ||
| read_rows = SQLExecuteQueryOperator( | ||
| task_id="read_rows", | ||
| conn_id=CLICKHOUSE_CONN_ID, | ||
| sql="SELECT count() FROM events", | ||
| hook_params={"database": "analytics"}, | ||
| ) | ||
| ``` | ||
|
|
||
| ## Use the hook directly {#use-the-hook-directly} | ||
|
|
||
| For work that doesn't fit a SQL operator — bulk inserts, streaming, or ClickHouse-specific client calls — use `ClickHouseHook` inside a Python task. | ||
|
|
||
| The hook's `bulk_insert_rows` method uses the native columnar insert path in `clickhouse-connect`, which is much faster than row-by-row inserts for large datasets. Set `batch_size` to bound peak memory on very large inputs: | ||
|
|
||
| ```python | ||
| from airflow.providers.clickhousedb.hooks.clickhouse import ClickHouseHook | ||
|
|
||
| hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default") | ||
|
|
||
| hook.bulk_insert_rows( | ||
| table="events", | ||
| rows=[("user1", "click"), ("user2", "view")], | ||
| column_names=["user_id", "action"], | ||
| batch_size=1000, | ||
| ) | ||
| ``` | ||
|
|
||
| Call `get_client()` to reach the underlying `clickhouse-connect` client for anything the hook doesn't expose directly: | ||
|
|
||
| ```python | ||
| client = hook.get_client() | ||
| total = client.query("SELECT count() FROM events").result_rows[0][0] | ||
| ``` | ||
|
|
||
| ### Apply session settings {#apply-session-settings} | ||
|
|
||
| Pass [session settings](/operations/settings/settings) when constructing the hook, either directly or through an operator's `hook_params`. Settings passed to the constructor are merged on top of any `session_settings` defined in the connection's **Extra** field, and the constructor values win on conflicting keys: | ||
|
|
||
| ```python | ||
| hook = ClickHouseHook( | ||
| clickhouse_conn_id="clickhouse_default", | ||
| session_settings={"max_execution_time": 60, "max_threads": 4}, | ||
| ) | ||
| ``` | ||
|
|
||
| ## Related content {#related-content} | ||
|
|
||
| - [`clickhouse-connect` Python client](/integrations/python) | ||
| - [ClickHouse HTTP interface](/interfaces/http) | ||
| - [ClickHouse session settings reference](/operations/settings/settings) | ||
| - [`apache-airflow-providers-clickhousedb` reference docs](https://airflow.apache.org/docs/apache-airflow-providers-clickhousedb/) | ||
| - [Provider package on PyPI](https://pypi.org/project/apache-airflow-providers-clickhousedb/) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure about this, but I think we need to use
clickhousedbas the driver name here.I think
clickhousealso works but that is also used by the moduleclickhouse-sqlalchemy, so it may cause problems if both modules are installed.Checking the code of
clickhouse-connectI can see both strings may be used, so to be honest I'm not sure which should we add here 🤔 CC @joe-clickhouseclickhousedb, which looks like the "official" one as it's defined at setup time: https://github.com/ClickHouse/clickhouse-connect/blob/main/setup.py#L83-L84clickhouse, which is defined only if this class is imported, which made me thing it's not available all the time: https://github.com/ClickHouse/clickhouse-connect/blob/main/clickhouse_connect/cc_sqlalchemy/__init__.py#L13-L14