From ec9bbb3fbaf488a870b60be331fb42bcb5150973 Mon Sep 17 00:00:00 2001 From: Evgeny Skvortsov <71790359+EvgSkv@users.noreply.github.com> Date: Thu, 29 Jan 2026 01:34:04 +0000 Subject: [PATCH 1/2] Add ClickHouse support --- colab_logica.py | 46 ++++- common/clickhouse_logica.py | 192 ++++++++++++++++++ common/logica_lib.py | 7 + .../dialect_libraries/clickhouse_library.py | 55 +++++ compiler/dialects.py | 58 ++++++ compiler/expr_translate.py | 39 +++- compiler/universe.py | 2 +- .../clickhouse/shipping_funfacts_test.l | 134 ++++++++++++ .../clickhouse/shipping_funfacts_test.txt | 10 + integration_tests/run_tests.py | 5 +- logica.py | 23 ++- type_inference/research/infer.py | 49 ++++- 12 files changed, 603 insertions(+), 17 deletions(-) create mode 100644 common/clickhouse_logica.py create mode 100644 compiler/dialect_libraries/clickhouse_library.py create mode 100644 integration_tests/dialects/clickhouse/shipping_funfacts_test.l create mode 100644 integration_tests/dialects/clickhouse/shipping_funfacts_test.txt diff --git a/colab_logica.py b/colab_logica.py index 5544aa20..8f083763 100755 --- a/colab_logica.py +++ b/colab_logica.py @@ -18,11 +18,13 @@ from decimal import Decimal import getpass +import io import json import re from .common import color from .common import concertina_lib +from .common import clickhouse_logica from .common import duckdb_logica from .common import psql_logica @@ -215,9 +217,30 @@ def RunSQL(sql, engine, connection=None, is_final=False): ShowError("Error while executing SQL:\n%s" % e) raise e return None + elif engine == 'clickhouse': + # ClickHouse runner uses HTTP and doesn't require a DB-API connection. + # For the final predicate we return a DataFrame for display. + # For non-final statements we use FORMAT Null to minimize transfer. 
+ if is_final: + engine_settings = dict(connection or {}) + engine_settings['settings'] = dict(engine_settings.get('settings') or {}) + engine_settings['settings']['output_format_json_named_tuples_as_objects'] = 1 + + json_text = clickhouse_logica.RunQuery( + sql, + output_format='json', + engine_settings=engine_settings) + if not json_text.strip(): + return pandas.DataFrame() + return pandas.read_json(io.StringIO(json_text), lines=True) + else: + clickhouse_logica.RunQuery( + sql.rstrip().rstrip(';') + ' FORMAT Null', + output_format='csv', + engine_settings=connection) + return None else: - raise Exception('Logica only supports BigQuery, PostgreSQL and SQLite ' - 'for now.') + raise Exception('Unsupported engine: %s' % engine) def Ingress(table_name, csv_file_name): @@ -314,6 +337,15 @@ def RegisterTableLocation(self, predicate, table_location): CONNECTION_USED = None + +class ClickHouseRunner(object): + def __init__(self, engine_settings=None): + # We pass engine_settings via the "connection" parameter of RunSQL. + self.engine_settings = engine_settings or {} + + def __call__(self, sql, engine, is_final): + return RunSQL(sql, engine, connection=self.engine_settings, is_final=is_final) + def Logica(line, cell, run_query): """Running Logica predicates and storing results.""" predicates, maybe_storage_file = ParseListAndMaybeFile(line) @@ -427,9 +459,15 @@ def Logica(line, cell, run_query): elif engine == 'bigquery': EnsureAuthenticatedUser() sql_runner = RunSQL + elif engine == 'clickhouse': + # Connection settings can be provided via @Engine("clickhouse", ...) + # annotation or environment variables (see common/clickhouse_logica.py). 
+ engine_settings = ( + program.annotations.annotations.get('@Engine', {}) + .get('clickhouse', {})) + sql_runner = ClickHouseRunner(engine_settings=engine_settings) else: - raise Exception('Logica only supports BigQuery, PostgreSQL and SQLite ' - 'for now.') + raise Exception('Unsupported engine: %s' % engine) try: result_map = concertina_lib.ExecuteLogicaProgram( executions, sql_runner=sql_runner, sql_engine=engine, diff --git a/common/clickhouse_logica.py b/common/clickhouse_logica.py new file mode 100644 index 00000000..8269132d --- /dev/null +++ b/common/clickhouse_logica.py @@ -0,0 +1,192 @@ +#!/usr/bin/python +# +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ClickHouse execution helpers for Logica. + +This is a minimal HTTP client implementation meant for local/dev usage. +Connection parameters can be provided either via @Engine("clickhouse", ...) +settings or via environment variables. + +Environment variables: + LOGICA_CLICKHOUSE_HOST (default: 127.0.0.1) + LOGICA_CLICKHOUSE_PORT (default: 8123) + LOGICA_CLICKHOUSE_USER (default: default) + LOGICA_CLICKHOUSE_PASSWORD (default: "") + LOGICA_CLICKHOUSE_DATABASE (default: default) +""" + +from __future__ import annotations + +import csv +import io +import os +import re +import base64 +import urllib.parse +import urllib.request +import urllib.error + +if '.' 
not in __package__: + from common import sqlite3_logica + from common import color +else: + from ..common import sqlite3_logica + from ..common import color + + +_FORMAT_RE = re.compile(r"\bFORMAT\b", re.IGNORECASE) + + +class ClickHouseQueryError(RuntimeError): + def __init__(self, message, *, url=None, status=None, body=None, sql=None): + super().__init__(message) + self.url = url + self.status = status + self.body = body + self.sql = sql + + +class ClickHouseCliError(RuntimeError): + pass + + +def _FormatCliError(e: ClickHouseQueryError) -> str: + details = '' + if getattr(e, 'status', None) is not None: + details += f'HTTP {e.status}. ' + body = getattr(e, 'body', None) + if body: + details += body.strip() + else: + details += str(e) + return color.Format('[ {error}Error{end} ] ClickHouse query failed: {msg}', + {'msg': details}) + + +def RunQueryCli(sql, *, output_format='pretty', engine_settings=None) -> bytes: + """Run a query for the Logica CLI. + + Returns bytes ready to be printed. + Raises ClickHouseCliError with a color-formatted message on failure. + """ + try: + return RunQuery(sql, output_format=output_format, + engine_settings=engine_settings).encode() + except ClickHouseQueryError as e: + raise ClickHouseCliError(_FormatCliError(e)) + + +def _coalesce(first, second): + return first if first is not None else second + + +def GetConnectionSettings(engine_settings=None): + engine_settings = engine_settings or {} + host = _coalesce(engine_settings.get('host'), os.environ.get('LOGICA_CLICKHOUSE_HOST')) or '127.0.0.1' + port = int(_coalesce(engine_settings.get('port'), os.environ.get('LOGICA_CLICKHOUSE_PORT')) or 8123) + user = _coalesce(engine_settings.get('user'), os.environ.get('LOGICA_CLICKHOUSE_USER')) or 'default' + password = _coalesce(engine_settings.get('password'), os.environ.get('LOGICA_CLICKHOUSE_PASSWORD')) + if password is None: + # Default ClickHouse setups typically have an empty password for user + # 'default'. 
Users can override via @Engine(..., password: ...) or env var. + password = '' + database = _coalesce(engine_settings.get('database'), os.environ.get('LOGICA_CLICKHOUSE_DATABASE')) or 'default' + query_settings = engine_settings.get('settings') or {} + if query_settings is None: + query_settings = {} + if not isinstance(query_settings, dict): + raise ValueError('ClickHouse engine_settings.settings must be a dict, got: %r' % (query_settings,)) + return { + 'host': host, + 'port': port, + 'user': user, + 'password': password, + 'database': database, + 'settings': query_settings, + } + + +def _http_query(sql, *, settings, fmt): + # Use a named format so we can parse results. + if not _FORMAT_RE.search(sql): + sql = sql.rstrip().rstrip(';') + f' FORMAT {fmt}' + + # Use POST to avoid URL length limits (compiled SQL can be large). + params = {'database': settings['database']} + for k, v in (settings.get('settings') or {}).items(): + if v is None: + continue + params[str(k)] = str(v) + url = f"http://{settings['host']}:{settings['port']}/?" + urllib.parse.urlencode(params) + req = urllib.request.Request( + url, + data=(sql + "\n").encode('utf-8'), + method='POST') + + # Preemptive basic auth avoids extra 401 roundtrip. + token = base64.b64encode( + f"{settings['user']}:{settings['password']}".encode('utf-8')).decode('ascii') + req.add_header('Authorization', f'Basic {token}') + req.add_header('Content-Type', 'text/plain; charset=utf-8') + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.read().decode('utf-8', errors='replace') + except urllib.error.HTTPError as e: + # ClickHouse sometimes returns query errors with HTTP status codes like + # 404 and a useful plain-text body. Surface that body to the user. 
+ try: + body = e.read().decode('utf-8', errors='replace') + except Exception: + body = None + raise ClickHouseQueryError( + 'ClickHouse HTTP error', + url=getattr(e, 'filename', None), + status=getattr(e, 'code', None), + body=body, + sql=sql, + ) + except urllib.error.URLError as e: + raise ClickHouseQueryError( + f'ClickHouse connection error: {e}', + url=url, + sql=sql, + ) + + +def RunQuery(sql, output_format='pretty', engine_settings=None): + """Run a query on ClickHouse and return formatted output as a string.""" + settings = GetConnectionSettings(engine_settings) + + if output_format == 'csv': + return _http_query(sql, settings=settings, fmt='CSVWithNames') + + if output_format == 'json': + return _http_query(sql, settings=settings, fmt='JSONEachRow') + + # pretty / artistictable + body = _http_query(sql, settings=settings, fmt='TabSeparatedWithNames') + if not body.strip(): + return '' + + reader = csv.reader(io.StringIO(body), delimiter='\t') + try: + header = next(reader) + except StopIteration: + return '' + + rows = [row for row in reader] + return sqlite3_logica.ArtisticTable(header, rows) diff --git a/common/logica_lib.py b/common/logica_lib.py index 22fcdb2f..faf3fb1d 100644 --- a/common/logica_lib.py +++ b/common/logica_lib.py @@ -25,6 +25,7 @@ from common import duckdb_logica from common import sqlite3_logica from common import psql_logica + from common import clickhouse_logica from compiler import functors from compiler import rule_translate from compiler import universe @@ -34,6 +35,7 @@ from ..common import duckdb_logica from ..common import sqlite3_logica from ..common import psql_logica + from ..common import clickhouse_logica from ..compiler import functors from ..compiler import rule_translate from ..compiler import universe @@ -122,6 +124,11 @@ def RunQuery(sql, duckdb_logica.ConnectClingo(connection, logical_context=logical_context) df = connection.sql(sql).df() return sqlite3_logica.DataframeAsArtisticTable(df) + elif engine == 
'clickhouse': + return clickhouse_logica.RunQuery( + sql, + output_format=output_format, + engine_settings=settings) else: assert False, 'Unknown engine: %s' % engine o, _ = p.communicate(sql.encode()) diff --git a/compiler/dialect_libraries/clickhouse_library.py b/compiler/dialect_libraries/clickhouse_library.py new file mode 100644 index 00000000..d960db8b --- /dev/null +++ b/compiler/dialect_libraries/clickhouse_library.py @@ -0,0 +1,55 @@ +#!/usr/bin/python +# +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +library = """ +->(left:, right:) = {arg: left, value: right}; +`=`(left:, right:) = right :- left == right; + +Arrow(left, right) = arrow :- + left == arrow.arg, + right == arrow.value; + +# Aggregates. +ArgMin(arr) = SqlExpr( + "argMin({a}, {v})", {a:, v:}) :- Arrow(a, v) == arr; + +ArgMax(arr) = SqlExpr( + "argMax({a}, {v})", {a:, v:}) :- Arrow(a, v) == arr; + +# Best-effort top-k helpers using tuple sorting. +ArgMaxK(a, l) = SqlExpr( + "arraySlice(arrayMap(x -> x.2, arrayReverseSort(groupArray(({value}, {arg})))), 1, {lim})", + {arg: a.arg, value: a.value, lim: l}); + +ArgMinK(a, l) = SqlExpr( + "arraySlice(arrayMap(x -> x.2, arraySort(groupArray(({value}, {arg})))), 1, {lim})", + {arg: a.arg, value: a.value, lim: l}); + +Array(a) = SqlExpr( + "groupArray({value} ORDER BY {arg})", + {arg: a.arg, value: a.value}); + +RecordAsJson(r) = SqlExpr("toJSONString({x})", {x: r}); + +# Hash helpers. 
+Fingerprint(s) = SqlExpr("reinterpretAsInt64(cityHash64(toString({s})))", {s:}); +NaturalHash(x) = Fingerprint(x); + +Chr(x) = SqlExpr("char({x})", {x:}); + +Num(a) = a; +Str(a) = a; +""" diff --git a/compiler/dialects.py b/compiler/dialects.py index ad706302..78c59be3 100755 --- a/compiler/dialects.py +++ b/compiler/dialects.py @@ -26,6 +26,7 @@ from compiler.dialect_libraries import presto_library from compiler.dialect_libraries import databricks_library from compiler.dialect_libraries import duckdb_library + from compiler.dialect_libraries import clickhouse_library else: from ..compiler.dialect_libraries import bq_library from ..compiler.dialect_libraries import psql_library @@ -34,6 +35,8 @@ from ..compiler.dialect_libraries import presto_library from ..compiler.dialect_libraries import databricks_library from ..compiler.dialect_libraries import duckdb_library + from ..compiler.dialect_libraries import clickhouse_library + def Get(engine): return DIALECTS[engine]() @@ -239,6 +242,60 @@ def DecorateCombineRule(self, rule, var): return rule +class ClickHouseDialect(Dialect): + """ClickHouse SQL dialect (minimal). + + This is intentionally small: enough to compile and run basic programs. + """ + + def Name(self): + return 'ClickHouse' + + def BuiltInFunctions(self): + return { + 'Range': 'range(%s)', + 'ToString': 'toString(%s)', + 'ToInt64': 'toInt64(%s)', + 'ToFloat64': 'toFloat64(%s)', + 'Element': 'arrayElement({0}, {1} + 1)', + 'Size': 'length(%s)', + 'List': 'groupArray(%s)', + 'Set': 'groupUniqArray(%s)', + 'Count': 'countDistinct(%s)', + 'AnyValue': 'any(%s)', + 'ArrayConcat': 'arrayConcat({0}, {1})', + # The compiler implements "x in some_array" as an UNNEST with a + # generated table alias; this extracts the element from that alias. 
+ 'ValueOfUnnested': '{0}.unnested_pod', + } + + def InfixOperators(self): + return { + '++': 'concat(%s, %s)', + 'in': 'has({right}, {left})', + } + + def Subscript(self, record, subscript, record_is_table): + if record_is_table: + return '%s.%s' % (record, subscript) + key = str(subscript).replace('\\', '\\\\').replace("'", "\\'") + return "tupleElement(%s, '%s')" % (record, key) + + def LibraryProgram(self): + return clickhouse_library.library + + def UnnestPhrase(self): + # ClickHouse doesn't support arrayJoin(...) as a table function in FROM. + # Use a subquery that produces a single column. + return '(select arrayJoin({0}) as unnested_pod) as {1}' + + def ArrayPhrase(self): + return '[%s]' + + def GroupBySpecBy(self): + return 'expr' + + class Presto(Dialect): def Name(self): @@ -464,5 +521,6 @@ def IsPostgreSQLish(self): 'trino': Trino, 'databricks': Databricks, 'duckdb': DuckDB, + 'clickhouse': ClickHouseDialect, } diff --git a/compiler/expr_translate.py b/compiler/expr_translate.py index 418ecb22..085919ff 100755 --- a/compiler/expr_translate.py +++ b/compiler/expr_translate.py @@ -249,14 +249,14 @@ def IntLiteral(self, literal): return str(literal['number']) def StrLiteral(self, literal): - if self.dialect.Name() in ["DuckDB"]: # PostreSQL too? + if self.dialect.Name() in ["DuckDB"]: # PostgreSQL too? return 'E\'%s\'' % ( literal['the_string'] .replace('\\', '\\\\') .replace("'", "''") .replace('\t', r'\t') .replace('\n', r'\n')) - if self.dialect.Name() in ["PostgreSQL", "Presto", "Trino", "SqLite"]: + if self.dialect.Name() in ["PostgreSQL", "Presto", "Trino", "SqLite", "ClickHouse"]: # TODO: Do this safely. return '\'%s\'' % (literal['the_string'].replace("'", "''")) @@ -269,6 +269,18 @@ def ListLiteralInternals(self, literal): def ListLiteral(self, literal, element_type_name, full_expression): # <-- for error. 
internals = self.ListLiteralInternals(literal) + + if self.convert_to_json: + return '[%s]' % internals + + if self.dialect.Name() == 'ClickHouse': + # Be maximally deterministic: always pin the array element type. + # This also avoids ClickHouse interpreting [] as Array(Nothing). + if not element_type_name: + raise self.exception_maker( + 'Type is needed, but not determined for %s. Please give hints with ~ operator!' % + color.Warn(full_expression['expression_heritage'])) + return "CAST([%s], 'Array(%s)')" % (internals, element_type_name) if self.dialect.IsPostgreSQLish() and not element_type_name: raise self.exception_maker( 'Type is needed, but not determined for %s. Please give hints with ~ operator!' % @@ -278,9 +290,6 @@ def ListLiteral(self, literal, element_type_name, if self.dialect.IsPostgreSQLish() else '') array_phrase = self.dialect.ArrayPhrase() - if self.convert_to_json: - array_phrase = '[%s]' - suffix = '' return (array_phrase % internals) + suffix def BoolLiteral(self, literal): @@ -350,6 +359,15 @@ def Record(self, record, record_type=None): if self.convert_to_json: return self.RecordAsJson(record) # TODO: Move this to dialects.py. + if self.dialect.Name() == 'ClickHouse': + # For ClickHouse we rely on type inference to populate type_name with a + # rendered Tuple(...) type. 
+ assert record_type, json.dumps(record, indent=' ') + args = ', '.join( + self.ConvertToSql(f_v['value']['expression']) + for f_v in sorted(record['field_value'], + key=lambda x: StrIntKey(x['field']))) + return "CAST(tuple(%s), '%s')" % (args, record_type) if self.dialect.Name() == 'SqLite': arguments_str = ', '.join( "'%s', %s" % (f_v['field'], @@ -658,7 +676,16 @@ def ConvertToSql(self, expression): 'an incomplete type {warning}{type}{end}.', dict( record=expression['expression_heritage'], type=rendered_type))) - + + if self.dialect.Name() == 'ClickHouse' and record_type is None: + rendered_type = expression.get('type', {}).get('rendered_type', None) + raise self.exception_maker(color.Format( + 'Record needs type in ClickHouse: ' + '{warning}{record}{end} was inferred only ' + 'an incomplete type {warning}{type}{end}.', dict( + record=expression['expression_heritage'], + type=rendered_type))) + return self.Record( record, record_type=record_type) diff --git a/compiler/universe.py b/compiler/universe.py index 8366a2c7..8d598993 100755 --- a/compiler/universe.py +++ b/compiler/universe.py @@ -337,7 +337,7 @@ def Engine(self): return engine def EngineTypechecksByDefault(self, engine): - return engine in ['psql', 'duckdb'] + return engine in ['psql', 'duckdb', 'clickhouse'] def ShouldTypecheck(self): engine = self.Engine() diff --git a/integration_tests/dialects/clickhouse/shipping_funfacts_test.l b/integration_tests/dialects/clickhouse/shipping_funfacts_test.l new file mode 100644 index 00000000..d25a7f44 --- /dev/null +++ b/integration_tests/dialects/clickhouse/shipping_funfacts_test.l @@ -0,0 +1,134 @@ +# +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# A creative ClickHouse integration test exercising: +# - Records + field access (r.from / r.to) +# - Aggregations (+=, Count=, ArgMax=) +# - Deterministic ordering of output + +@Engine("clickhouse"); + +# --- Universe: The Astra Freight Guild --- + +Planet(name: "Astra", sector: "Core"); +Planet(name: "Borealis", sector: "Rim"); +Planet(name: "Cygnus", sector: "Core"); +Planet(name: "Draco", sector: "Rim"); +Planet(name: "Erebus", sector: "Void"); + +Ship(name: "Kestrel", captain: "Mira"); +Ship(name: "Orchid", captain: "Jae"); +Ship(name: "Nomad", captain: "Sol"); +Ship(name: "Pioneer", captain: "Rin"); + +Crew(ship: "Kestrel", member: "Mira"); +Crew(ship: "Kestrel", member: "Ivo"); +Crew(ship: "Kestrel", member: "Tala"); + +Crew(ship: "Orchid", member: "Jae"); +Crew(ship: "Orchid", member: "Kato"); + +Crew(ship: "Nomad", member: "Sol"); +Crew(ship: "Nomad", member: "Uma"); +Crew(ship: "Nomad", member: "Bea"); +Crew(ship: "Nomad", member: "Nox"); + +Crew(ship: "Pioneer", member: "Rin"); + +# Shipment(id:, ship:, from:, to:, weight:, day:) +Shipment(id: 1, ship: "Kestrel", from: "Astra", to: "Borealis", weight: 12, day: 1); +Shipment(id: 2, ship: "Kestrel", from: "Astra", to: "Cygnus", weight: 7, day: 1); +Shipment(id: 3, ship: "Orchid", from: "Cygnus", to: "Astra", weight: 9, day: 1); +Shipment(id: 4, ship: "Nomad", from: "Draco", to: "Borealis", weight: 18, day: 1); +Shipment(id: 5, ship: "Pioneer", from: "Erebus", to: "Astra", weight: 3, day: 1); + +Shipment(id: 6, ship: "Nomad", from: "Borealis", to: "Astra", weight: 14, day: 2); +Shipment(id: 7, 
ship: "Orchid", from: "Astra", to: "Draco", weight: 7, day: 2); +Shipment(id: 8, ship: "Kestrel", from: "Cygnus", to: "Erebus", weight: 4, day: 2); +Shipment(id: 9, ship: "Nomad", from: "Draco", to: "Cygnus", weight: 12, day: 2); +Shipment(id: 10, ship: "Pioneer", from: "Erebus", to: "Borealis", weight: 9, day: 2); + +Shipment(id: 11, ship: "Kestrel", from: "Astra", to: "Borealis", weight: 6, day: 3); +Shipment(id: 12, ship: "Orchid", from: "Borealis", to: "Draco", weight: 2, day: 3); +Shipment(id: 13, ship: "Nomad", from: "Cygnus", to: "Astra", weight: 16, day: 3); +Shipment(id: 14, ship: "Nomad", from: "Cygnus", to: "Draco", weight: 1, day: 3); +Shipment(id: 15, ship: "Pioneer", from: "Astra", to: "Erebus", weight: 2, day: 3); + +Shipment(id: 16, ship: "Kestrel", from: "Draco", to: "Borealis", weight: 10, day: 4); +Shipment(id: 17, ship: "Orchid", from: "Astra", to: "Cygnus", weight: 3, day: 4); +Shipment(id: 18, ship: "Nomad", from: "Borealis", to: "Cygnus", weight: 10, day: 4); +Shipment(id: 19, ship: "Pioneer", from: "Erebus", to: "Draco", weight: 8, day: 4); + +# --- Derived metrics (aggregation syntax) --- + +Outgoing(planet: p, total? += w) distinct :- Shipment(from: p, weight: w); +Incoming(planet: p, total? += w) distinct :- Shipment(to: p, weight: w); + +PartnerCount(planet: p, n? Count= dest) distinct :- Shipment(from: p, to: dest); + +RouteWeight(route: r, total? += w) distinct :- + Shipment(from: f, to: t, weight: w), + r = {from: f, to: t}; + +DayWeight(day: d, total? += w) distinct :- Shipment(day: d, weight: w); + +CrewCount(ship: s, crew? Count= m) distinct :- Crew(ship: s, member: m); +ShipWeight(ship: s, total? 
+= w) distinct :- Shipment(ship: s, weight: w); + +ShipEfficiency(ship: s, eff: eff) :- + ShipWeight(ship: s, total: total), + CrewCount(ship: s, crew: crew), + eff = total / crew; + +# --- Winners (aggregation operators) --- + +HubPlanet() ArgMax= p -> n :- PartnerCount(planet: p, n: n); +BusiestDay() ArgMax= d -> total :- DayWeight(day: d, total: total); +HeaviestRoute() ArgMax= r -> total :- RouteWeight(route: r, total: total); +BestShip() ArgMax= s -> eff :- ShipEfficiency(ship: s, eff: eff); + +MostOutgoingPlanet() ArgMax= p -> w :- Outgoing(planet: p, total: w); +MostIncomingPlanet() ArgMax= p -> w :- Incoming(planet: p, total: w); + +# --- Fun facts output --- + +@OrderBy(FunFact, "kind", "subject"); + +FunFact(kind: "Hub planet", subject: p, value: ToString(n) ++ " destinations") :- + p = HubPlanet(), + PartnerCount(planet: p, n: n); + +FunFact(kind: "Busiest day", subject: ToString(d), value: ToString(w) ++ " total tons") :- + d = BusiestDay(), + DayWeight(day: d, total: w); + +FunFact(kind: "Heaviest route", subject: r.from ++ " -> " ++ r.to, value: ToString(w) ++ " tons") :- + r = HeaviestRoute(), + RouteWeight(route: r, total: w); + +FunFact(kind: "Most outgoing", subject: p, value: ToString(w) ++ " tons shipped") :- + p = MostOutgoingPlanet(), + Outgoing(planet: p, total: w); + +FunFact(kind: "Most incoming", subject: p, value: ToString(w) ++ " tons received") :- + p = MostIncomingPlanet(), + Incoming(planet: p, total: w); + +FunFact(kind: "Best ship", subject: s ++ " (captain " ++ cap ++ ")", value: ToString(eff) ++ " tons/crew") :- + s = BestShip(), + Ship(name: s, captain: cap), + ShipEfficiency(ship: s, eff: eff); + +Test := FunFact(); diff --git a/integration_tests/dialects/clickhouse/shipping_funfacts_test.txt b/integration_tests/dialects/clickhouse/shipping_funfacts_test.txt new file mode 100644 index 00000000..40eb122d --- /dev/null +++ b/integration_tests/dialects/clickhouse/shipping_funfacts_test.txt @@ -0,0 +1,10 @@ 
++----------------+-----------------------+------------------+ +| kind | subject | value | ++----------------+-----------------------+------------------+ +| Best ship | Pioneer (captain Rin) | 22 tons/crew | +| Busiest day | 1 | 49 total tons | +| Heaviest route | Draco -> Borealis | 28 tons | +| Hub planet | Astra | 4 destinations | +| Most incoming | Borealis | 55 tons received | +| Most outgoing | Draco | 40 tons shipped | ++----------------+-----------------------+------------------+ \ No newline at end of file diff --git a/integration_tests/run_tests.py b/integration_tests/run_tests.py index d85281c8..5e65775b 100755 --- a/integration_tests/run_tests.py +++ b/integration_tests/run_tests.py @@ -45,7 +45,7 @@ def RunTest(name, src=None, golden=None, predicate=None, duckify_psql=True) -def RunAll(test_presto=False, test_trino=False, test_clingo=True): +def RunAll(test_presto=False, test_trino=False, test_clingo=True, test_clickhouse=False): """Running all tests.""" # Uncomment to test writing tables. 
# RunTest("ground_test") @@ -65,6 +65,9 @@ def RunAll(test_presto=False, test_trino=False, test_clingo=True): RunTest("dialects/trino/joins_test") RunTest("dialects/trino/joins_test") + if test_clickhouse: + RunTest("dialects/clickhouse/shipping_funfacts_test") + if test_clingo: from common import duckdb_logica RunTest('clingo_sum_test') diff --git a/logica.py b/logica.py index 34b8cf2b..ffb5d8ff 100755 --- a/logica.py +++ b/logica.py @@ -43,6 +43,7 @@ if __name__ == '__main__' and not __package__: from common import color from common import sqlite3_logica + from common import clickhouse_logica from common import clingo_logica from common import duckdb_logica from compiler import functors @@ -55,6 +56,7 @@ else: from .common import color from .common import sqlite3_logica + from .common import clickhouse_logica from .common import clingo_logica from .common import duckdb_logica from .compiler import functors @@ -210,9 +212,10 @@ def main(argv): if command == 'infer_types': # This disallows getting types of program with type errors. # logic_program = universe.LogicaProgram(parsed_rules) - # TODO: Find a way to get engine from program. But it should not matter - # for inference. It only patters for compiling. - typing_engine = infer.TypesInferenceEngine(parsed_rules, "psql") + # Dialect mostly doesn't matter for inference itself, but it does matter + # for dialect-specific type renderings stored in the syntax tree. 
+ annotations = universe.Annotations(parsed_rules, user_flags={}) + typing_engine = infer.TypesInferenceEngine(parsed_rules, annotations.Engine()) typing_engine.InferTypes() # print(parsed_rules) print(json.dumps(parsed_rules, sort_keys=True, indent=' ')) @@ -331,6 +334,20 @@ def main(argv): ['--output-format=ALIGNED']), stdin=subprocess.PIPE, stdout=subprocess.PIPE) o, _ = p.communicate(formatted_sql.encode()) + elif engine == 'clickhouse': + engine_settings = {} + if ('@Engine' in logic_program.annotations.annotations and + 'clickhouse' in logic_program.annotations.annotations['@Engine']): + engine_settings = logic_program.annotations.annotations['@Engine']['clickhouse'] + output_format = 'csv' if command == 'run_to_csv' else 'pretty' + try: + o = clickhouse_logica.RunQueryCli( + formatted_sql, + output_format=output_format, + engine_settings=engine_settings) + except clickhouse_logica.ClickHouseCliError as e: + print(str(e)) + sys.exit(1) elif engine == 'presto': a = logic_program.annotations.annotations['@Engine']['presto'] catalog = a.get('catalog', 'memory') diff --git a/type_inference/research/infer.py b/type_inference/research/infer.py index 2f98ffc1..f2fa43fc 100644 --- a/type_inference/research/infer.py +++ b/type_inference/research/infer.py @@ -674,6 +674,35 @@ def __init__(self, parsed_rules, dialect): self.psql_type_cache = {} self.dialect = dialect + def ClickHouseType(self, t): + def _ClickHouseFieldName(key): + if isinstance(key, int): + return 'col%d' % key + return str(key) + + # Note: this is an informational rendering for ClickHouse, stored in + # node['type']['type_name']. 
+ if t == 'Str': + return 'String' + if t == 'Num': + return 'Float64' + if t == 'Bool': + return 'Bool' + if t == 'Time': + return 'DateTime' + if isinstance(t, list): + [e] = t + return 'Array(%s)' % self.ClickHouseType(e) + if isinstance(t, dict): + parts = [] + for k, v in sorted(t.items(), key=reference_algebra.StrIntKey): + n = _ClickHouseFieldName(k) + rendered_v = self.ClickHouseType(v) + assert rendered_v is not None, v + parts.append('%s %s' % (n, rendered_v)) + return 'Tuple(%s)' % ', '.join(parts) + assert False, t + def ActPopulatingTypeMap(self, node): if 'type' in node: t = node['type']['the_type'] @@ -685,10 +714,16 @@ def ActPopulatingTypeMap(self, node): if reference_algebra.IsFullyDefined(t): self.psql_type_cache[t_rendering] = self.PsqlType(t) if isinstance(t, dict) and reference_algebra.IsFullyDefined(t): - node['type']['type_name'] = RecordTypeName(t_rendering) + if self.dialect == 'clickhouse': + node['type']['type_name'] = self.ClickHouseType(t) + else: + node['type']['type_name'] = RecordTypeName(t_rendering) if isinstance(t, list) and reference_algebra.IsFullyDefined(t): [e] = t - node['type']['element_type_name'] = self.PsqlType(e) + if self.dialect == 'clickhouse': + node['type']['element_type_name'] = self.ClickHouseType(e) + else: + node['type']['element_type_name'] = self.PsqlType(e) def CollectTypes(self): Walk(self.parsed_rules, self.ActPopulatingTypeMap) @@ -719,6 +754,14 @@ def PsqlType(self, t): assert False, t def BuildPsqlDefinitions(self): + # ClickHouse doesn't need (and doesn't support) PostgreSQL-style CREATE TYPE + # preambles. We still run type inference to attach fully-defined record + # types to expressions (e.g. for named tuples). + if self.dialect == 'clickhouse': + self.definitions = {} + self.typing_preamble = '' + return + for t in self.psql_struct_type_name: arg_name = lambda x: ( '"cast"' if x == 'cast' # Escaping keyword. 
@@ -756,6 +799,8 @@ def BuildPsqlDefinitions(self): self.typing_preamble = BuildPreamble(self.definitions, self.dialect) def BuildPreamble(definitions, dialect): + if dialect == 'clickhouse': + return '' # Gentle touch of genious here. Sorting definitions by length of the # full type description automatically means that simpler types are defined # before the ones that depend on them. From 4decd5330180b8ee4b28b631f68cdf4fdca67526 Mon Sep 17 00:00:00 2001 From: Evgeny Skvortsov <71790359+EvgSkv@users.noreply.github.com> Date: Thu, 29 Jan 2026 04:24:44 +0000 Subject: [PATCH 2/2] Add ClickHouse support via Concertina --- colab_logica.py | 43 +++++++----- common/clickhouse_logica.py | 127 +++++++++++++++++++++++++++++------- compiler/universe.py | 29 ++++++-- tools/run_in_terminal.py | 11 +++- 4 files changed, 163 insertions(+), 47 deletions(-) diff --git a/colab_logica.py b/colab_logica.py index 8f083763..4766bd90 100755 --- a/colab_logica.py +++ b/colab_logica.py @@ -220,25 +220,34 @@ def RunSQL(sql, engine, connection=None, is_final=False): elif engine == 'clickhouse': # ClickHouse runner uses HTTP and doesn't require a DB-API connection. # For the final predicate we return a DataFrame for display. - # For non-final statements we use FORMAT Null to minimize transfer. + # For non-final statements we execute raw statements (DDL-safe). 
if is_final: - engine_settings = dict(connection or {}) - engine_settings['settings'] = dict(engine_settings.get('settings') or {}) - engine_settings['settings']['output_format_json_named_tuples_as_objects'] = 1 - - json_text = clickhouse_logica.RunQuery( - sql, - output_format='json', - engine_settings=engine_settings) - if not json_text.strip(): - return pandas.DataFrame() - return pandas.read_json(io.StringIO(json_text), lines=True) + try: + engine_settings = dict(connection or {}) + engine_settings['settings'] = dict(engine_settings.get('settings') or {}) + engine_settings['settings']['output_format_json_named_tuples_as_objects'] = 1 + + json_text = clickhouse_logica.RunQuery( + sql, + output_format='json', + engine_settings=engine_settings) + if not json_text.strip(): + return pandas.DataFrame() + return pandas.read_json(io.StringIO(json_text), lines=True) + except clickhouse_logica.ClickHouseQueryError as e: + print("\n--- SQL ---") + print(sql) + ShowError(clickhouse_logica.FormatQueryError(e)) + raise else: - clickhouse_logica.RunQuery( - sql.rstrip().rstrip(';') + ' FORMAT Null', - output_format='csv', - engine_settings=connection) - return None + try: + clickhouse_logica.RunStatement(sql, engine_settings=connection) + return None + except clickhouse_logica.ClickHouseQueryError as e: + print("\n--- SQL ---") + print(sql) + ShowError(clickhouse_logica.FormatQueryError(e)) + raise else: raise Exception('Unsupported engine: %s' % engine) diff --git a/common/clickhouse_logica.py b/common/clickhouse_logica.py index 8269132d..4578c54e 100644 --- a/common/clickhouse_logica.py +++ b/common/clickhouse_logica.py @@ -47,7 +47,7 @@ from ..common import color -_FORMAT_RE = re.compile(r"\bFORMAT\b", re.IGNORECASE) +FORMAT_RE = re.compile(r"\bFORMAT\b", re.IGNORECASE) class ClickHouseQueryError(RuntimeError): @@ -63,19 +63,26 @@ class ClickHouseCliError(RuntimeError): pass -def _FormatCliError(e: ClickHouseQueryError) -> str: +def FormatCliError(e: ClickHouseQueryError) 
-> str: details = '' if getattr(e, 'status', None) is not None: details += f'HTTP {e.status}. ' body = getattr(e, 'body', None) if body: details += body.strip() + if ('Multi-statements are not allowed' in body or + ('Syntax error' in body and '-- Interacting with table' in body)): + details += '\nTip: use `logica.py ... run_in_terminal ...` (Concertina) for @Ground/multi-step programs.' else: details += str(e) return color.Format('[ {error}Error{end} ] ClickHouse query failed: {msg}', {'msg': details}) +def FormatQueryError(e: ClickHouseQueryError) -> str: + return FormatCliError(e) + + def RunQueryCli(sql, *, output_format='pretty', engine_settings=None) -> bytes: """Run a query for the Logica CLI. @@ -86,24 +93,24 @@ def RunQueryCli(sql, *, output_format='pretty', engine_settings=None) -> bytes: return RunQuery(sql, output_format=output_format, engine_settings=engine_settings).encode() except ClickHouseQueryError as e: - raise ClickHouseCliError(_FormatCliError(e)) + raise ClickHouseCliError(FormatCliError(e)) -def _coalesce(first, second): +def Coalesce(first, second): return first if first is not None else second def GetConnectionSettings(engine_settings=None): engine_settings = engine_settings or {} - host = _coalesce(engine_settings.get('host'), os.environ.get('LOGICA_CLICKHOUSE_HOST')) or '127.0.0.1' - port = int(_coalesce(engine_settings.get('port'), os.environ.get('LOGICA_CLICKHOUSE_PORT')) or 8123) - user = _coalesce(engine_settings.get('user'), os.environ.get('LOGICA_CLICKHOUSE_USER')) or 'default' - password = _coalesce(engine_settings.get('password'), os.environ.get('LOGICA_CLICKHOUSE_PASSWORD')) + host = Coalesce(engine_settings.get('host'), os.environ.get('LOGICA_CLICKHOUSE_HOST')) or '127.0.0.1' + port = int(Coalesce(engine_settings.get('port'), os.environ.get('LOGICA_CLICKHOUSE_PORT')) or 8123) + user = Coalesce(engine_settings.get('user'), os.environ.get('LOGICA_CLICKHOUSE_USER')) or 'default' + password = Coalesce(engine_settings.get('password'), 
os.environ.get('LOGICA_CLICKHOUSE_PASSWORD')) if password is None: # Default ClickHouse setups typically have an empty password for user # 'default'. Users can override via @Engine(..., password: ...) or env var. password = '' - database = _coalesce(engine_settings.get('database'), os.environ.get('LOGICA_CLICKHOUSE_DATABASE')) or 'default' + database = Coalesce(engine_settings.get('database'), os.environ.get('LOGICA_CLICKHOUSE_DATABASE')) or 'default' query_settings = engine_settings.get('settings') or {} if query_settings is None: query_settings = {} @@ -119,11 +126,60 @@ def GetConnectionSettings(engine_settings=None): } -def _http_query(sql, *, settings, fmt): - # Use a named format so we can parse results. - if not _FORMAT_RE.search(sql): - sql = sql.rstrip().rstrip(';') + f' FORMAT {fmt}' +class Connection(object): + def __init__(self, engine_settings=None): + self.settings = GetConnectionSettings(engine_settings) + + def RunStatement(self, sql): + return HttpRequest(sql, settings=self.settings) + + def RunQueryHeaderRows(self, sql): + body = HttpQuery(sql, settings=self.settings, fmt='TabSeparatedWithNames') + if not body.strip(): + return [], [] + reader = csv.reader(io.StringIO(body), delimiter='\t') + try: + header = next(reader) + except StopIteration: + return [], [] + rows = [row for row in reader] + return header, rows + + def RunQuery(self, sql, output_format='pretty'): + if output_format == 'csv': + return HttpQuery(sql, settings=self.settings, fmt='CSVWithNames') + if output_format == 'json': + return HttpQuery(sql, settings=self.settings, fmt='JSONEachRow') + (header, rows) = self.RunQueryHeaderRows(sql) + if not header and not rows: + return '' + return sqlite3_logica.ArtisticTable(header, rows) + + +def Connect(engine_settings=None): + return Connection(engine_settings) + + +def ClickHouseConnect(logic_program_or_engine_settings=None): + """Compatibility helper mirroring sqlite3_logica.SqliteConnect(). 
+ By default connects to localhost ClickHouse with user 'default' and empty + password (can be overridden by env vars or @Engine("clickhouse", ...) settings). + """ + engine_settings = None + if isinstance(logic_program_or_engine_settings, dict) or logic_program_or_engine_settings is None: + engine_settings = logic_program_or_engine_settings + else: + # Treat as LogicaProgram-like object. + try: + annotations = logic_program_or_engine_settings.annotations.annotations + engine_settings = annotations.get('@Engine', {}).get('clickhouse') + except Exception: + engine_settings = None + return Connection(engine_settings) + + +def HttpRequest(sql, *, settings): # Use POST to avoid URL length limits (compiled SQL can be large). params = {'database': settings['database']} for k, v in (settings.get('settings') or {}).items(): @@ -167,26 +223,47 @@ def _http_query(sql, *, settings, fmt): ) -def RunQuery(sql, output_format='pretty', engine_settings=None): - """Run a query on ClickHouse and return formatted output as a string.""" - settings = GetConnectionSettings(engine_settings) +def HttpQuery(sql, *, settings, fmt=None): + # Append a FORMAT clause only when requested (DDL doesn't accept FORMAT). 
+ if fmt and not FORMAT_RE.search(sql): + sql = sql.rstrip().rstrip(';') + f' FORMAT {fmt}' + return HttpRequest(sql, settings=settings) - if output_format == 'csv': - return _http_query(sql, settings=settings, fmt='CSVWithNames') - if output_format == 'json': - return _http_query(sql, settings=settings, fmt='JSONEachRow') +def RunStatement(sql, *, engine_settings=None): + """Execute a statement and return the raw response body.""" + settings = GetConnectionSettings(engine_settings) + return HttpRequest(sql, settings=settings) - # pretty / artistictable - body = _http_query(sql, settings=settings, fmt='TabSeparatedWithNames') + +def RunQueryHeaderRows(sql, *, engine_settings=None): + """Run a query and return (header, rows) for Concertina runners.""" + settings = GetConnectionSettings(engine_settings) + body = HttpQuery(sql, settings=settings, fmt='TabSeparatedWithNames') if not body.strip(): - return '' + return [], [] reader = csv.reader(io.StringIO(body), delimiter='\t') try: header = next(reader) except StopIteration: - return '' - + return [], [] rows = [row for row in reader] + return header, rows + + +def RunQuery(sql, output_format='pretty', engine_settings=None): + """Run a query on ClickHouse and return formatted output as a string.""" + settings = GetConnectionSettings(engine_settings) + + if output_format == 'csv': + return HttpQuery(sql, settings=settings, fmt='CSVWithNames') + + if output_format == 'json': + return HttpQuery(sql, settings=settings, fmt='JSONEachRow') + + # pretty / artistictable + (header, rows) = RunQueryHeaderRows(sql, engine_settings=engine_settings) + if not header and not rows: + return '' return sqlite3_logica.ArtisticTable(header, rows) diff --git a/compiler/universe.py b/compiler/universe.py index 8d598993..53763f71 100755 --- a/compiler/universe.py +++ b/compiler/universe.py @@ -325,6 +325,8 @@ def Dataset(self): # This change is intended for all engines in the future. 
if self.Engine() in ['psql', 'duckdb']: default_dataset = 'logica_home' + if self.Engine() == 'clickhouse': + default_dataset = 'default' if self.Engine() == 'sqlite' and 'logica_home' in self.AttachedDatabases(): default_dataset = 'logica_home' return self.ExtractSingleton('@Dataset', default_dataset) @@ -1298,6 +1300,20 @@ def __init__(self, program, allocator, execution): self.allocator = allocator self.execution = execution + def AddClickhouseDropAction(self, table, ground): + # Hacking because ClickHouse doesn't support multi-statements. + drop_action = '__drop__' + table + drop_statement = ( + 'DROP TABLE IF EXISTS %s%s' % ( + ground.table_name, + self.execution.dialect.MaybeCascadingDeletionWord())) + drop_statement = FormatSql(drop_statement) + drop_statement = self.program.UseFlagsAsParameters(drop_statement) + self.execution.table_to_export_map[drop_action] = drop_statement + self.execution.export_statements.append(drop_statement) + self.execution.defines_and_exports.append(drop_statement) + self.execution.dependency_edges.append((drop_action, table)) + def TranslateTableAttachedToFile(self, table, ground, external_vocabulary, edge_needed=True): """Translates file-attached table. 
Appends exports and defines.""" @@ -1332,12 +1348,17 @@ def TranslateTableAttachedToFile(self, table, ground, external_vocabulary, maybe_copy = '' if ground.copy_to_file: maybe_copy = f'COPY {ground.table_name} TO \'{ground.copy_to_file}\';\n' - export_statement = ( - maybe_drop_table + + create_statement = ( 'CREATE TABLE {name} AS {dependency_sql}'.format( name=ground.table_name, - dependency_sql=FormatSql(dependency_sql)) + - maybe_copy) + dependency_sql=FormatSql(dependency_sql))) + + if self.program.annotations.Engine() == 'clickhouse': + if ground.overwrite: + self.AddClickhouseDropAction(table, ground) + export_statement = create_statement + else: + export_statement = maybe_drop_table + create_statement + maybe_copy export_statement = self.program.UseFlagsAsParameters(export_statement) # It's cheap to store a string multiple times in Python, as it's stored diff --git a/tools/run_in_terminal.py b/tools/run_in_terminal.py index e5c8360d..50c4a5b9 100644 --- a/tools/run_in_terminal.py +++ b/tools/run_in_terminal.py @@ -27,6 +27,7 @@ from common import psql_logica from common import sqlite3_logica from common import duckdb_logica + from common import clickhouse_logica from compiler import functors from compiler import rule_translate from type_inference.research import infer @@ -37,6 +38,7 @@ from ..common import psql_logica from ..common import sqlite3_logica from ..common import duckdb_logica + from ..common import clickhouse_logica from ..compiler import functors from ..compiler import rule_translate from ..type_inference.research import infer @@ -45,7 +47,7 @@ class SqlRunner(object): def __init__(self, engine, logic_program=None): self.engine = engine - assert engine in ['sqlite', 'bigquery', 'psql', 'duckdb'] + assert engine in ['sqlite', 'bigquery', 'psql', 'duckdb', 'clickhouse'] if engine == 'sqlite': self.connection = sqlite3_logica.SqliteConnect() else: @@ -63,6 +65,8 @@ def __init__(self, engine, logic_program=None): self.connection = 
psql_logica.ConnectToPostgres('environment') if engine == 'duckdb': self.connection = duckdb_logica.GetConnection(logic_program) + if engine == 'clickhouse': + self.connection = clickhouse_logica.ClickHouseConnect(logic_program) self.bq_credentials = credentials self.bq_project = project @@ -113,6 +117,11 @@ def RunSQL(sql, engine, connection=None, is_final=False, return cur.columns, cur.fetchall() else: connection.sql(sql) + elif engine == 'clickhouse': + if is_final: + return connection.RunQueryHeaderRows(sql) + else: + connection.RunStatement(sql) else: raise Exception('Logica only supports BigQuery, PostgreSQL and SQLite '