Skip to content

Commit 5fe62ac

Browse files
treysper and erindru authored
Feat!: add clickhouse engine adapter (#3019)
Co-authored-by: Erin Drummond <erin.dru@gmail.com>
1 parent bb6c3d8 commit 5fe62ac

34 files changed

+1337
-76
lines changed

.circleci/continue_config.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ workflows:
315315
- mssql
316316
- trino
317317
- spark
318+
- clickhouse
319+
- clickhouse-cluster
318320
- engine_tests_cloud:
319321
name: cloud_engine_<< matrix.engine >>
320322
context:
@@ -328,6 +330,7 @@ workflows:
328330
- databricks
329331
- redshift
330332
- bigquery
333+
- clickhouse-cloud
331334
filters:
332335
branches:
333336
only:

.circleci/wait-for-db.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ probe_port() {
2525
done
2626
}
2727

28+
clickhouse_ready() {
29+
probe_port 8123
30+
}
31+
2832
postgres_ready() {
2933
probe_port 5432
3034
}

Makefile

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ install-doc:
1010
pip3 install -r ./docs/requirements.txt
1111

1212
install-engine-test:
13-
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql]"
13+
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql,clickhouse]"
1414

1515
install-pre-commit:
1616
pre-commit install
@@ -86,9 +86,9 @@ dev-publish: ui-build clean-build publish
8686
jupyter-example:
8787
jupyter lab tests/slows/jupyter/example_outputs.ipynb
8888

89-
engine-up: engine-mssql-up engine-mysql-up engine-postgres-up engine-spark-up engine-trino-up
89+
engine-up: engine-clickhouse-up engine-mssql-up engine-mysql-up engine-postgres-up engine-spark-up engine-trino-up
9090

91-
engine-down: engine-mssql-down engine-mysql-down engine-postgres-down engine-spark-down engine-trino-down
91+
engine-down: engine-clickhouse-down engine-mssql-down engine-mysql-down engine-postgres-down engine-spark-down engine-trino-down
9292

9393
fast-test:
9494
pytest -n auto -m "fast and not cicdonly"
@@ -166,6 +166,12 @@ engine-%-down:
166166
# Docker Engines #
167167
##################
168168

169+
clickhouse-test: engine-clickhouse-up
170+
pytest -n auto -x -m "clickhouse" --junitxml=test-results/junit-clickhouse.xml
171+
172+
clickhouse-cluster-test: engine-clickhouse-up
173+
pytest -n auto -x -m "clickhouse_cluster" --junitxml=test-results/junit-clickhouse-cluster.xml
174+
169175
duckdb-test: engine-duckdb-install
170176
pytest -n auto -x -m "duckdb" --junitxml=test-results/junit-duckdb.xml
171177

@@ -200,3 +206,6 @@ databricks-test: guard-DATABRICKS_CATALOG guard-DATABRICKS_SERVER_HOSTNAME guard
200206

201207
redshift-test: guard-REDSHIFT_HOST guard-REDSHIFT_USER guard-REDSHIFT_PASSWORD guard-REDSHIFT_DATABASE engine-redshift-install
202208
pytest -n auto -x -m "redshift" --junitxml=test-results/junit-redshift.xml
209+
210+
clickhouse-cloud-test: guard-CLICKHOUSE_CLOUD_HOST guard-CLICKHOUSE_CLOUD_USERNAME guard-CLICKHOUSE_CLOUD_PASSWORD engine-clickhouse-install
211+
pytest -n auto -x -m "clickhouse_cloud" --junitxml=test-results/junit-clickhouse-cloud.xml

examples/sushi/models/raw_marketing.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,15 @@ def execute(
5959
"updated_at": [exec_time] * num_customers,
6060
}
6161
)
62-
df = df_new.merge(df_existing, on="customer_id", how="left", suffixes=(None, "_old"))
62+
63+
# clickhouse returns a dataframe with no columns if the query is empty, so we can't merge
64+
if not df_existing.empty:
65+
df = df_new.merge(df_existing, on="customer_id", how="left", suffixes=(None, "_old"))
66+
else:
67+
df = df_new
68+
df["status_old"] = pd.NA
69+
df["updated_at_old"] = pd.NA
70+
6371
df["updated_at"] = pd.to_datetime(
6472
np.where( # type: ignore
6573
df["status_old"] != df["status"], execution_time, df["updated_at_old"]

pytest.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ markers =
2020
# Engine Adapters
2121
engine: test all engine adapters
2222
bigquery: test for BigQuery
23+
clickhouse: test for Clickhouse (standalone mode)
24+
clickhouse_cluster: test for Clickhouse (cluster mode)
25+
clickhouse_cloud: test for Clickhouse (cloud mode)
2326
databricks: test for Databricks
2427
duckdb: test for DuckDB
2528
motherduck: test for MotherDuck

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,14 @@
4747
"requests",
4848
"rich[jupyter]",
4949
"ruamel.yaml",
50-
"sqlglot[rs]~=25.17.0",
50+
"sqlglot[rs]~=25.18.0",
5151
],
5252
extras_require={
5353
"bigquery": [
5454
"google-cloud-bigquery[pandas]",
5555
"google-cloud-bigquery-storage",
5656
],
57+
"clickhouse": ["clickhouse-connect"],
5758
"databricks": ["databricks-sql-connector"],
5859
"dev": [
5960
f"apache-airflow=={os.environ.get('AIRFLOW_VERSION', '2.9.1')}",

sqlmesh/core/config/connection.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@
3939
logger = logging.getLogger(__name__)
4040

4141
RECOMMENDED_STATE_SYNC_ENGINES = {"postgres", "gcp_postgres", "mysql", "duckdb"}
42+
FORBIDDEN_STATE_SYNC_ENGINES = {
43+
# Do not support row-level operations
44+
"spark",
45+
"trino",
46+
# Nullable types are problematic
47+
"clickhouse",
48+
}
4249

4350

4451
class ConnectionConfig(abc.ABC, BaseConfig):
@@ -84,9 +91,14 @@ def _cursor_init(self) -> t.Optional[t.Callable[[t.Any], None]]:
8491

8592
@property
8693
def is_recommended_for_state_sync(self) -> bool:
87-
"""Whether this connection is recommended for being used as a state sync for production state syncs"""
94+
"""Whether this engine is recommended for being used as a state sync for production state syncs"""
8895
return self.type_ in RECOMMENDED_STATE_SYNC_ENGINES
8996

97+
@property
98+
def is_forbidden_for_state_sync(self) -> bool:
99+
"""Whether this engine is forbidden from being used as a state sync"""
100+
return self.type_ in FORBIDDEN_STATE_SYNC_ENGINES
101+
90102
@property
91103
def _connection_factory_with_kwargs(self) -> t.Callable[[], t.Any]:
92104
"""A function that is called to return a connection object for the given Engine Adapter"""
@@ -1364,6 +1376,73 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
13641376
}
13651377

13661378

1379+
class ClickhouseConnectionConfig(ConnectionConfig):
1380+
"""
1381+
Clickhouse Connection Configuration.
1382+
1383+
Property reference: https://clickhouse.com/docs/en/integrations/python#client-initialization
1384+
"""
1385+
1386+
host: str
1387+
username: str
1388+
password: t.Optional[str] = None
1389+
port: t.Optional[int] = None
1390+
cluster: t.Optional[str] = None
1391+
connect_timeout: int = 10
1392+
send_receive_timeout: int = 300
1393+
verify: bool = True
1394+
query_limit: int = 0
1395+
use_compression: bool = True
1396+
compression_method: t.Optional[str] = None
1397+
1398+
concurrent_tasks: int = 1
1399+
register_comments: bool = True
1400+
pre_ping: bool = False
1401+
1402+
type_: Literal["clickhouse"] = Field(alias="type", default="clickhouse")
1403+
1404+
@property
1405+
def _connection_kwargs_keys(self) -> t.Set[str]:
1406+
kwargs = {
1407+
"host",
1408+
"username",
1409+
"port",
1410+
"password",
1411+
"connect_timeout",
1412+
"send_receive_timeout",
1413+
"verify",
1414+
"query_limit",
1415+
}
1416+
return kwargs
1417+
1418+
@property
1419+
def _engine_adapter(self) -> t.Type[EngineAdapter]:
1420+
return engine_adapter.ClickhouseEngineAdapter
1421+
1422+
@property
1423+
def _connection_factory(self) -> t.Callable:
1424+
from clickhouse_connect.dbapi import connect # type: ignore
1425+
1426+
return connect
1427+
1428+
@property
1429+
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
1430+
return {"cluster": self.cluster}
1431+
1432+
@property
1433+
def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
1434+
from sqlmesh import __version__
1435+
1436+
# False = no compression
1437+
# True = Clickhouse default compression method
1438+
# string = specific compression method
1439+
compress: bool | str = self.use_compression
1440+
if compress and self.compression_method:
1441+
compress = self.compression_method
1442+
1443+
return {"compress": compress, "client_name": f"SQLMesh/{__version__}"}
1444+
1445+
13671446
CONNECTION_CONFIG_TO_TYPE = {
13681447
# Map all subclasses of ConnectionConfig to the value of their `type_` field.
13691448
tpe.all_field_infos()["type_"].default: tpe

sqlmesh/core/config/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def create_state_sync(self, context: GenericContext) -> StateSync:
8383
context.config.get_state_connection(context.gateway) or context._connection_config
8484
)
8585
engine_adapter = state_connection.create_engine_adapter()
86-
if not engine_adapter.SUPPORTS_ROW_LEVEL_OP:
86+
if state_connection.is_forbidden_for_state_sync:
8787
raise ConfigError(
8888
f"The {engine_adapter.DIALECT.upper()} engine cannot be used to store SQLMesh state - please specify a different `state_connection` engine."
8989
+ " See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#gateways for more information."

sqlmesh/core/dialect.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,9 @@ def _parse_props(self: Parser) -> t.Optional[exp.Expression]:
374374
name = key.name.lower()
375375
if name == "when_matched":
376376
value: t.Optional[exp.Expression] = self._parse_when_matched()[0]
377+
elif name == "time_data_type":
378+
# TODO: if we make *_data_type a convention to parse things into exp.DataType, we could make this more generic
379+
value = self._parse_types(schema=True)
377380
elif self._match(TokenType.L_PAREN):
378381
value = self.expression(exp.Tuple, expressions=self._parse_csv(self._parse_equality))
379382
self._match_r_paren()

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
EngineAdapterWithIndexSupport,
88
)
99
from sqlmesh.core.engine_adapter.bigquery import BigQueryEngineAdapter
10+
from sqlmesh.core.engine_adapter.clickhouse import ClickhouseEngineAdapter
1011
from sqlmesh.core.engine_adapter.databricks import DatabricksEngineAdapter
1112
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
1213
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
@@ -21,6 +22,7 @@
2122
"hive": SparkEngineAdapter,
2223
"spark": SparkEngineAdapter,
2324
"bigquery": BigQueryEngineAdapter,
25+
"clickhouse": ClickhouseEngineAdapter,
2426
"duckdb": DuckDBEngineAdapter,
2527
"snowflake": SnowflakeEngineAdapter,
2628
"databricks": DatabricksEngineAdapter,

0 commit comments

Comments (0)