From cc42bac43ae49ca7836bbfabaa5534d642e4cf82 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Tue, 21 Jan 2025 15:46:22 -0700 Subject: [PATCH 01/31] Gg/update test jwt (#458) * update test jwt, ensure query params are final * tweak test job * Assume CI "secret" is full JWT * Fix lint --- .github/workflows/on_push.yml | 84 ++++++++---------------- .github/workflows/timeplus_ci.yml | 13 +--- tests/integration_tests/test_jwt_auth.py | 11 +--- timeplus_connect/driver/httpclient.py | 10 +-- 4 files changed, 38 insertions(+), 80 deletions(-) diff --git a/.github/workflows/on_push.yml b/.github/workflows/on_push.yml index 3f20a155..f147e923 100644 --- a/.github/workflows/on_push.yml +++ b/.github/workflows/on_push.yml @@ -1,33 +1,33 @@ -name: 'Lint and Test' +name: "Lint and Test" on: pull_request: branches: - timeplus paths-ignore: - - 'VERSION' - - 'LICENSE' - - '**.md' - - 'examples' - - 'publish.yaml' - - '.github/workflows/clickhouse_ci.yml' - - '.github/workflows/on_push.yml' + - "VERSION" + - "LICENSE" + - "**.md" + - "examples" + - "publish.yaml" + - ".github/workflows/clickhouse_ci.yml" + - ".github/workflows/on_push.yml" workflow_dispatch: push: branches-ignore: - - '*_test' - - '*_dev' - - '*_build' - - 'release_*' + - "*_test" + - "*_dev" + - "*_build" + - "release_*" - timeplus paths-ignore: - - 'VERSION' - - 'LICENSE' - - '**.md' - - 'examples' - - 'publish.yaml' - - '.github/workflows/clickhouse_ci.yml' - - '.github/workflows/on_push.yml' + - "VERSION" + - "LICENSE" + - "**.md" + - "examples" + - "publish.yaml" + - ".github/workflows/clickhouse_ci.yml" + - ".github/workflows/on_push.yml" jobs: lint: @@ -59,11 +59,11 @@ jobs: strategy: matrix: python-version: - - '3.9' - - '3.10' - - '3.11' - - '3.12' - - '3.13' + - "3.9" + - "3.10" + - "3.11" + - "3.12" + - "3.13" timeplus-version: - latest @@ -76,19 +76,19 @@ jobs: env: TIMEPLUS_CONNECT_TEST_TP_VERSION: ${{ matrix.timeplus-version }} with: - compose-file: 'docker-compose.yml' - down-flags: '--volumes' + compose-file: "docker-compose.yml" + down-flags: "--volumes" - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install pip - run: python -m pip install --upgrade pip + run: python -m pip install --upgrade pip - name: Install Test Dependencies run: pip install -r tests/test_requirements.txt - name: Build cython extensions run: python setup.py build_ext --inplace - - name: "Add distribution info" # This lets SQLAlchemy find entry points + - name: "Add distribution info" # This lets SQLAlchemy find entry points run: python setup.py develop - name: Add ClickHouse TLS instance to /etc/hosts run: | @@ -97,7 +97,7 @@ jobs: - name: Run tests env: # CLICKHOUSE_CONNECT_TEST_TLS: 1 - CLICKHOUSE_CONNECT_TEST_DOCKER: 'False' + CLICKHOUSE_CONNECT_TEST_DOCKER: "False" CLICKHOUSE_CONNECT_TEST_FUZZ: 50 SQLALCHEMY_SILENCE_UBER_WARNING: 1 run: pytest tests @@ -125,29 +125,3 @@ jobs: # python-version: # - '3.10' # - '3.11' - - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Set up Python ${{ matrix.python-version }} - # uses: actions/setup-python@v5 - # with: - # python-version: ${{ matrix.python-version }} - # - name: Install dependencies - # run: | - # python -m pip install --upgrade pip - # pip install -r tests/test_requirements.txt - # - name: Build cython extensions - # run: python setup.py build_ext --inplace - # - name: "Add distribution info" # This lets SQLAlchemy find entry points - # run: python setup.py develop - # - name: Run tests - # env: - # CLICKHOUSE_CONNECT_TEST_FUZZ: 10 - # CLICKHOUSE_CONNECT_TEST_CLOUD: 'True' - # CLICKHOUSE_CONNECT_TEST_PORT: 8443 - # CLICKHOUSE_CONNECT_TEST_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} - # CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} - # CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_PRIVATE_KEY }} - # SQLALCHEMY_SILENCE_UBER_WARNING: 1 - # run: pytest tests/integration_tests diff --git a/.github/workflows/timeplus_ci.yml b/.github/workflows/timeplus_ci.yml index 1cbc43af..ca51abff 100644 --- a/.github/workflows/timeplus_ci.yml +++ b/.github/workflows/timeplus_ci.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest name: Timeplus CI Tests env: - CLICKHOUSE_CONNECT_TEST_DOCKER: 'False' + CLICKHOUSE_CONNECT_TEST_DOCKER: "False" CLICKHOUSE_CONNECT_TEST_FUZZ: 50 steps: - name: Checkout @@ -25,18 +25,9 @@ jobs: run: pip install -r tests/test_requirements.txt - name: Build cython extensions run: python setup.py build_ext --inplace - - name: "Add distribution info" # This lets SQLAlchemy find entry points + - name: "Add distribution info" # This lets SQLAlchemy find entry points run: python setup.py develop - # - name: run ClickHouse Cloud tests - # env: - # CLICKHOUSE_CONNECT_TEST_PORT: 3218 - # CLICKHOUSE_CONNECT_TEST_CLOUD: 'True' - # CLICKHOUSE_CONNECT_TEST_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} - # CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} - # CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_PRIVATE_KEY }} - # run: pytest tests/integration_tests - - name: Run Timeplus Container (LATEST) run: TIMEPLUS_CONNECT_TEST_TP_VERSION=latest docker compose up -d timeplus - name: Run LATEST tests diff --git a/tests/integration_tests/test_jwt_auth.py b/tests/integration_tests/test_jwt_auth.py index 678f128d..f5fce873 100644 --- a/tests/integration_tests/test_jwt_auth.py +++ b/tests/integration_tests/test_jwt_auth.py @@ -1,4 +1,3 @@ -from datetime import datetime, timezone, timedelta from os import environ # pylint: disable=no-member @@ -158,12 +157,4 @@ def make_access_token(): secret = environ.get(JWT_SECRET_ENV_KEY) if not secret: raise ValueError(f'{JWT_SECRET_ENV_KEY} environment variable is not set') - payload = { - 'iss': 'ClickHouse', - 'sub': 'CI_Test', - 'aud': '1f7f78b8-da67-480b-8913-726fdd31d2fc', - 'clickhouse:roles': ['default'], - 'clickhouse:grants': [], - 'exp': datetime.now(tz=timezone.utc) + timedelta(minutes=15) - } - return jwt.encode(payload, secret, algorithm='RS256') + return secret diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index d5ea36e1..6e3501d2 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -407,16 +407,18 @@ def _raw_request(self, data = data.encode() headers = dict_copy(self.headers, headers) attempts = 0 + final_params = {} if server_wait: - params['wait_end_of_query'] = '1' + final_params['wait_end_of_query'] = '1' # We can't actually read the progress headers, but we enable them so ClickHouse sends something # to keep the connection alive when waiting for long-running queries and (2) to get summary information # if not streaming if self._send_progress: - params['send_progress_in_http_headers'] = '1' + final_params['send_progress_in_http_headers'] = '1' if self._progress_interval: - params['http_headers_progress_interval_ms'] = self._progress_interval - final_params = dict_copy(self.params, params) + final_params['http_headers_progress_interval_ms'] = self._progress_interval + final_params = dict_copy(self.params, final_params) + final_params = dict_copy(final_params, params) url = f'{self.url}?{urlencode(final_params)}' kwargs = { 'headers': headers, From 971301de781730cde1a6f0db899f57b83ce7a766 Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 22:04:35 -0500 Subject: [PATCH 02/31] Fix memory leak in AsyncClient (#457) * Fix memory leak in AsyncClient * Graceful shutdown * fix test * fix tests * fix tests --- examples/run_async.py | 1 + tests/integration_tests/conftest.py | 10 ++++++---- tests/integration_tests/test_session_id.py | 6 +++--- timeplus_connect/driver/asyncclient.py | 9 ++++++++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/examples/run_async.py b/examples/run_async.py index a9c1a70c..6479999e 100644 --- a/examples/run_async.py +++ b/examples/run_async.py @@ -41,6 +41,7 @@ async def semaphore_wrapper(sm: asyncio.Semaphore, num: int): semaphore = asyncio.Semaphore(SEMAPHORE) await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)]) + await client.close() async def main(): diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 9270cff0..0c66114c 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -2,8 +2,9 @@ import os import time from subprocess import Popen, PIPE -from typing import Iterator, NamedTuple, Sequence, Optional, Callable +from typing import Iterator, NamedTuple, Sequence, Optional, Callable, AsyncContextManager +import pytest_asyncio from pytest import fixture from timeplus_connect import common @@ -126,9 +127,10 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) - sys.stderr.write('Successfully stopped docker compose') -@fixture(scope='session', autouse=True, name='test_async_client') -def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]: - yield AsyncClient(client=test_client) +@pytest_asyncio.fixture(scope='session', autouse=True, name='test_async_client') +async def test_async_client_fixture(test_client: Client) -> AsyncContextManager[AsyncClient]: + async with AsyncClient(client=test_client) as client: + yield client @fixture(scope='session', name='table_context') diff --git a/tests/integration_tests/test_session_id.py b/tests/integration_tests/test_session_id.py index 9a0350eb..7781960b 100644 --- a/tests/integration_tests/test_session_id.py +++ b/tests/integration_tests/test_session_id.py @@ -46,7 +46,7 @@ async def test_async_client_default_session_id(test_config: TestConfig): user=test_config.username, password=test_config.password) assert async_client.get_client_setting(SESSION_KEY) is None - async_client.close() + await async_client.close() @pytest.mark.asyncio @@ -62,7 +62,7 @@ async def test_async_client_autogenerate_session_id(test_config: TestConfig): uuid.UUID(session_id) except ValueError: pytest.fail(f"Invalid session_id: {session_id}") - async_client.close() + await async_client.close() @pytest.mark.asyncio @@ -75,4 +75,4 @@ async def test_async_client_custom_session_id(test_config: TestConfig): password=test_config.password, session_id=session_id) assert async_client.get_client_setting(SESSION_KEY) == session_id - async_client.close() + await async_client.close() diff --git a/timeplus_connect/driver/asyncclient.py b/timeplus_connect/driver/asyncclient.py index 53b889fd..28a8d6ec 100644 --- a/timeplus_connect/driver/asyncclient.py +++ b/timeplus_connect/driver/asyncclient.py @@ -64,11 +64,12 @@ def min_version(self, version_str: str) -> bool: """ return self.client.min_version(version_str) - def close(self): + async def close(self): """ Subclass implementation to close the connection to the server/deallocate the client """ self.client.close() + await asyncio.to_thread(self.executor.shutdown, True) async def query(self, query: Optional[str] = None, @@ -676,3 +677,9 @@ def _raw_insert(): loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_insert) return result + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() From ff40f45d9c71b7824ab2115940933293590a24df Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 25 Jan 2025 14:47:47 -0700 Subject: [PATCH 03/31] Fix lint (#459) --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a6667ce..cd9acd2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # ClickHouse Connect ChangeLog +### WARNING -- Breaking change for AsyncClient close() +The AsyncClient close() method is now async and should be called as an async function. + ### WARNING -- Python 3.8 EOL Python 3.8 was EOL on 2024-10-07. It is no longer tested, and versions after 2025-04-07 will not include Python 3.8 wheel distributions. @@ -17,6 +20,14 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction. The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`. +## 0.8.15, 2025-01-25 +### Bug Fix +- The async client was not shutting down its associated executor thread pool, result in a memory leak if multiple +async clients were created. Closes https://github.com/ClickHouse/clickhouse-connect/issues/424. Note that the `close` +function for the async client is now async to cleanly close down the pool. The recommended way to use an async client +is now within an AsyncContext. See the associated [PR](https://github.com/ClickHouse/clickhouse-connect/pull/457) for details. +Thanks to ClickHouse core developer @pufit for the fix! + ## 0.8.14, 2025-01-13 ### Bug Fix - Fix an edge case where a Pandas dataframe that contains _only_ Int64 (or smaller) values would cause an exception when From ec77aa5edd5e3603e0c9535103f2dcc2736ad121 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 25 Jan 2025 16:19:41 -0700 Subject: [PATCH 04/31] Exclude 3.8 Aarch64 builds (#460) * Fix lint * Exclude pypy 38 build * Exclude all Python 3.8 builds * Update changelog re Python 3.8 aarch64 wheels --- .github/workflows/publish.yml | 22 +++++++++++----------- CHANGELOG.md | 3 ++- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index b917514a..2878e91f 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -12,18 +12,18 @@ on: - release - build env: - CIBW_SKIP: 'cp36-* cp37-* pp37-*' + CIBW_SKIP: 'cp36-* cp37-* cp38-* pp37-*' jobs: build_x86_manylinux_wheels: name: Build x86 manylinux wheels on Linux runs-on: ubuntu-latest env: - CIBW_SKIP: 'cp36-* cp37-* pp37-* *-musllinux*' + CIBW_SKIP: 'cp36-* cp37-* pp37-* pp38-* *-musllinux*' steps: - uses: actions/checkout@v4 - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-x86-manylinux @@ -33,11 +33,11 @@ jobs: name: Build x86 musllinux wheels on Linux runs-on: ubuntu-latest env: - CIBW_SKIP: 'cp36-* cp37-* pp37-* *-manylinux*' + CIBW_SKIP: 'cp36-* cp37-* pp37-* pp38-* *-manylinux*' steps: - uses: actions/checkout@v4 - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-x86-musllinux @@ -56,7 +56,7 @@ jobs: with: platforms: all - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-aarch64-manylinux @@ -75,7 +75,7 @@ jobs: with: platforms: all - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-aarch64-musllinux @@ -86,7 +86,7 @@ jobs: runs-on: ubuntu-latest env: CIBW_ARCHS_LINUX: aarch64 - CIBW_BUILD: 'pp*' + CIBW_BUILD: 'pp39-* pp310-*' steps: - uses: actions/checkout@v4 - name: Set up QEMU @@ -94,7 +94,7 @@ jobs: with: platforms: all - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-aarch64-pypy @@ -106,7 +106,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 env: CIBW_ARCHS_MACOS: x86_64 arm64 - uses: actions/upload-artifact@v4 @@ -122,7 +122,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Build wheels - uses: pypa/cibuildwheel@v2.21.3 + uses: pypa/cibuildwheel@v2.22.0 - uses: actions/upload-artifact@v4 with: name: build-windows diff --git a/CHANGELOG.md b/CHANGELOG.md index cd9acd2a..883fbcb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ The AsyncClient close() method is now async and should be called as an async fun ### WARNING -- Python 3.8 EOL Python 3.8 was EOL on 2024-10-07. It is no longer tested, and versions after 2025-04-07 will not include Python -3.8 wheel distributions. +3.8 wheel distributions. As of version 0.8.15, wheels are not built for Python 3.8 AARCH64 versions due to +missing dependencies in the build chain. ### WARNING -- JSON Incompatibility between versions 22.8 and 22.10 The internal serialization format for experimental JSON was updated in ClickHouse version 24.10. `clickhouse-connect` From d94e1ffcfa0d2666f55f2a114ffd837766775324 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sun, 2 Feb 2025 13:42:40 -0700 Subject: [PATCH 05/31] Gg/update test matrix (#464) * Update some tests * Fix lint * Skip JSON buggy test --- tests/integration_tests/test_client.py | 4 +++- tests/integration_tests/test_dynamic.py | 7 +++++++ tests/integration_tests/test_sqlalchemy/test_basics.py | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index b326b23c..105c5f13 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -174,8 +174,10 @@ def test_error_decode(test_client: Client): def test_command_as_query(test_client: Client): + # Test that non-SELECT and non-INSERT statements are treated as commands and + # just return the QueryResult metadata result = test_client.query("SET count_distinct_implementation = 'uniq'") - assert result.first_item['written_rows'] == 0 + assert 'query_id' in result.first_item def test_show_create(test_client: Client): diff --git a/tests/integration_tests/test_dynamic.py b/tests/integration_tests/test_dynamic.py index 59d1a0c8..819aefed 100644 --- a/tests/integration_tests/test_dynamic.py +++ b/tests/integration_tests/test_dynamic.py @@ -155,3 +155,10 @@ def test_complex_json(test_client: Client, table_context: Callable): result = test_client.query('SELECT * except _tp_time FROM new_json_complex ORDER BY key') json1 = result.result_set[0][1] assert json1['t']['a'] == 'qwe123' + + +def test_json_str_time(test_client: Client): + if not test_client.min_version('25.1'): + pytest.skip('JSON string/numbers bug before 25.1, skipping') + result = test_client.query("SELECT '{\"timerange\": \"2025-01-01T00:00:00+0000\"}'::JSON").result_set + assert result[0][0]['timerange'] == datetime.datetime(2025, 1, 1) diff --git a/tests/integration_tests/test_sqlalchemy/test_basics.py b/tests/integration_tests/test_sqlalchemy/test_basics.py index 084c0ed0..f52cb9f1 100644 --- a/tests/integration_tests/test_sqlalchemy/test_basics.py +++ b/tests/integration_tests/test_sqlalchemy/test_basics.py @@ -53,7 +53,7 @@ def test_execute(test_engine: Engine): assert len(rows) == 2 rows = list(row for row in conn.execute('DROP STREAM IF EXISTS dummy_table')) - assert rows[0][0] == 0 + assert len(rows) > 0 # This is just the metadata from the "command" QueryResult rows = list(row for row in conn.execute('DESCRIBE dummy')) assert len(rows) == 3 From fce0d4ec28aafd009ae129a6ca70cb9cdc2f2e24 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Wed, 5 Feb 2025 08:51:32 -0700 Subject: [PATCH 06/31] Fix CI tests with default user (#465) --- .docker/clickhouse/single_node/config.xml | 2 +- .docker/clickhouse/single_node_tls/config.xml | 2 +- docker-compose.yml | 20 ++++++++++--------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/.docker/clickhouse/single_node/config.xml b/.docker/clickhouse/single_node/config.xml index c0f76f36..cba58a5f 100644 --- a/.docker/clickhouse/single_node/config.xml +++ b/.docker/clickhouse/single_node/config.xml @@ -17,7 +17,7 @@ 3 - debug + warning /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.err.log 1000M diff --git a/.docker/clickhouse/single_node_tls/config.xml b/.docker/clickhouse/single_node_tls/config.xml index d25ffbaa..a087ac4d 100644 --- a/.docker/clickhouse/single_node_tls/config.xml +++ b/.docker/clickhouse/single_node_tls/config.xml @@ -17,7 +17,7 @@ /var/lib/clickhouse/access/ - debug + warning /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.err.log 1000M diff --git a/docker-compose.yml b/docker-compose.yml index 8b59af15..238419e6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ services: timeplus: image: "d.timeplus.com/timeplus-io/proton:latest" - container_name: 'timeplus-connect-timeplus-server' + container_name: "timeplus-connect-timeplus-server" ports: - - '8463:8463' - - '8123:8123' - - '3218:3218' + - "8463:8463" + - "8123:8123" + - "3218:3218" ulimits: nofile: soft: 262144 @@ -17,14 +17,16 @@ services: build: context: ./ dockerfile: .docker/clickhouse/single_node_tls/Dockerfile - container_name: 'clickhouse-connect-clickhouse-server-tls' + container_name: "clickhouse-connect-clickhouse-server-tls" + environment: + CLICKHOUSE_SKIP_USER_SETUP: 1 ports: - - '10843:8443' - - '10840:9440' + - "10843:8443" + - "10840:9440" ulimits: nofile: soft: 262144 hard: 262144 volumes: - - './.docker/clickhouse/single_node_tls/config.xml:/etc/clickhouse-server/config.xml' - - './.docker/clickhouse/single_node_tls/users.xml:/etc/clickhouse-server/users.xml' + - "./.docker/clickhouse/single_node_tls/config.xml:/etc/clickhouse-server/config.xml" + - "./.docker/clickhouse/single_node_tls/users.xml:/etc/clickhouse-server/users.xml" From c89705f88607dfc03a3a2ea6aa110c229879e830 Mon Sep 17 00:00:00 2001 From: Sviatoslav Bobryshev <61021258+sbobryshev@users.noreply.github.com> Date: Sat, 15 Feb 2025 20:10:28 +0300 Subject: [PATCH 07/31] Replace removal of ; in the loop line with rstrip (#472) --- timeplus_connect/driver/binding.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/timeplus_connect/driver/binding.py b/timeplus_connect/driver/binding.py index 1d611b23..e266ad75 100644 --- a/timeplus_connect/driver/binding.py +++ b/timeplus_connect/driver/binding.py @@ -40,8 +40,7 @@ def quote_identifier(identifier: str): def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], server_tz: Optional[tzinfo] = None) -> str: - while query.endswith(';'): - query = query[:-1] + query = query.rstrip(";") if not parameters: return query if hasattr(parameters, 'items'): @@ -52,8 +51,7 @@ def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, An # pylint: disable=too-many-locals,too-many-branches def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]: - while query.endswith(';'): - query = query[:-1] + query = query.rstrip(";") if not parameters: return query, {} From a2b0adf66b2a176ff912e425a4c5a2838ad3913a Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 15 Feb 2025 15:29:06 -0700 Subject: [PATCH 08/31] Docker test fixes (#473) * Don't prevent settings that don't change the value * Add docker related config file * Fix typo --- .docker/clickhouse/single_node/config.xml | 12 +++++++----- .../single_node/docker_related_config.xml | 5 +++++ .docker/clickhouse/single_node_tls/Dockerfile | 2 +- .docker/clickhouse/single_node_tls/config.xml | 12 +++++++----- .../single_node_tls/docker_related_config.xml | 5 +++++ docker-compose.yml | 18 ------------------ tests/integration_tests/test_dynamic.py | 4 ++++ timeplus_connect/driver/client.py | 11 +++++++---- 8 files changed, 36 insertions(+), 33 deletions(-) create mode 100644 .docker/clickhouse/single_node/docker_related_config.xml create mode 100644 .docker/clickhouse/single_node_tls/docker_related_config.xml diff --git a/.docker/clickhouse/single_node/config.xml b/.docker/clickhouse/single_node/config.xml index cba58a5f..076ed6ce 100644 --- a/.docker/clickhouse/single_node/config.xml +++ b/.docker/clickhouse/single_node/config.xml @@ -1,13 +1,8 @@ - 8123 - 9000 - - users.xml default default - 5368709120 /var/lib/clickhouse/ @@ -15,6 +10,7 @@ /var/lib/clickhouse/user_files/ /var/lib/clickhouse/access/ 3 + /var/lib/clickhouse/format_schemas/ warning @@ -37,5 +33,11 @@ session_log
+ + + users.xml + + + SQL_
diff --git a/.docker/clickhouse/single_node/docker_related_config.xml b/.docker/clickhouse/single_node/docker_related_config.xml new file mode 100644 index 00000000..e02bb050 --- /dev/null +++ b/.docker/clickhouse/single_node/docker_related_config.xml @@ -0,0 +1,5 @@ + + 0.0.0.0 + 8123 + 9000 + \ No newline at end of file diff --git a/.docker/clickhouse/single_node_tls/Dockerfile b/.docker/clickhouse/single_node_tls/Dockerfile index 0fd1a6f5..ca4438ae 100644 --- a/.docker/clickhouse/single_node_tls/Dockerfile +++ b/.docker/clickhouse/single_node_tls/Dockerfile @@ -1,4 +1,4 @@ -FROM clickhouse/clickhouse-server:24.8-alpine +FROM clickhouse/clickhouse-server:25.1-alpine COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \ && chmod 600 /etc/clickhouse-server/certs/* \ diff --git a/.docker/clickhouse/single_node_tls/config.xml b/.docker/clickhouse/single_node_tls/config.xml index a087ac4d..4ff6cc01 100644 --- a/.docker/clickhouse/single_node_tls/config.xml +++ b/.docker/clickhouse/single_node_tls/config.xml @@ -1,11 +1,6 @@ - 8443 - 9440 - 0.0.0.0 - - users.xml default default @@ -15,6 +10,7 @@ /var/lib/clickhouse/tmp/ /var/lib/clickhouse/user_files/ /var/lib/clickhouse/access/ + /var/lib/clickhouse/format_schemas/ warning @@ -45,4 +41,10 @@ SQL_ + + + + users.xml + + diff --git a/.docker/clickhouse/single_node_tls/docker_related_config.xml b/.docker/clickhouse/single_node_tls/docker_related_config.xml new file mode 100644 index 00000000..80d38d81 --- /dev/null +++ b/.docker/clickhouse/single_node_tls/docker_related_config.xml @@ -0,0 +1,5 @@ + + 0.0.0.0 + 8443 + 9440 + \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 238419e6..211edbf2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,21 +12,3 @@ services: hard: 262144 volumes: - /mnt/timeplusd:/var/lib/timeplusd - - clickhouse_tls: - build: - context: ./ - dockerfile: .docker/clickhouse/single_node_tls/Dockerfile - container_name: "clickhouse-connect-clickhouse-server-tls" - environment: - CLICKHOUSE_SKIP_USER_SETUP: 1 - ports: - - "10843:8443" - - "10840:9440" - ulimits: - nofile: - soft: 262144 - hard: 262144 - volumes: - - "./.docker/clickhouse/single_node_tls/config.xml:/etc/clickhouse-server/config.xml" - - "./.docker/clickhouse/single_node_tls/users.xml:/etc/clickhouse-server/users.xml" diff --git a/tests/integration_tests/test_dynamic.py b/tests/integration_tests/test_dynamic.py index 819aefed..11f65ecc 100644 --- a/tests/integration_tests/test_dynamic.py +++ b/tests/integration_tests/test_dynamic.py @@ -162,3 +162,7 @@ def test_json_str_time(test_client: Client): pytest.skip('JSON string/numbers bug before 25.1, skipping') result = test_client.query("SELECT '{\"timerange\": \"2025-01-01T00:00:00+0000\"}'::JSON").result_set assert result[0][0]['timerange'] == datetime.datetime(2025, 1, 1) + + # The following query is broken -- looks like something to do with Nullable(String) in the Tuple + # result = test_client.query("SELECT'{\"k\": [123, \"xyz\"]}'::JSON", + # settings={'input_format_json_read_numbers_as_strings': 0}).result_set diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index 3390707c..cc962939 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -122,9 +122,14 @@ def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, st return validated def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]: + new_value = str(value) + if value is True: + new_value = '1' + elif value is False: + new_value = '0' if key not in self.valid_transport_settings: setting_def = self.server_settings.get(key) - if setting_def is None or setting_def.readonly: + if setting_def is None or (setting_def.readonly and setting_def.value != new_value): if key in self.optional_transport_settings: return None if invalid_action == 'send': @@ -134,9 +139,7 @@ def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Option return None else: raise ProgrammingError(f'Setting {key} is unknown or readonly') from None - if isinstance(value, bool): - return '1' if value else '0' - return str(value) + return new_value def _setting_status(self, key: str) -> SettingStatus: comp_setting = self.server_settings.get(key) From 0c6abd129041ccf7e71e3c6b6587901e50c0e4ce Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Fri, 21 Feb 2025 14:52:29 -0700 Subject: [PATCH 09/31] Update README.md doc link (#476) From a65635f74d6c24c4b876e0eafb36182d984083ca Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Tue, 21 Jan 2025 15:46:22 -0700 Subject: [PATCH 10/31] Gg/update test jwt (#458) * update test jwt, ensure query params are final * tweak test job * Assume CI "secret" is full JWT * Fix lint --- .github/workflows/on_push.yml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/.github/workflows/on_push.yml b/.github/workflows/on_push.yml index f147e923..f33bce0e 100644 --- a/.github/workflows/on_push.yml +++ b/.github/workflows/on_push.yml @@ -113,15 +113,3 @@ jobs: CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST }} if: "${{ env.CLOUD_HOST != '' }}" run: echo "HAS_SECRETS=true" >> $GITHUB_OUTPUT - - # cloud-tests: - # runs-on: ubuntu-latest - # name: Cloud Tests Py=${{ matrix.python-version }} - # needs: check-secret - # if: needs.check-secret.outputs.has_secrets == 'true' - - # strategy: - # matrix: - # python-version: - # - '3.10' - # - '3.11' From 6f9ccde206c72162b3cdd0966983df79a29a618a Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 22:04:35 -0500 Subject: [PATCH 11/31] Fix memory leak in AsyncClient (#457) * Fix memory leak in AsyncClient * Graceful shutdown * fix test * fix tests * fix tests From 2802cec417640b97d69002bd72306178a5857b75 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 25 Jan 2025 14:47:47 -0700 Subject: [PATCH 12/31] Fix lint (#459) From c50f66edafe89a53dd15e006e7bee1c62ee7adca Mon Sep 17 00:00:00 2001 From: "Avery Fischer (biggerfisch)" Date: Fri, 21 Mar 2025 18:18:12 +0100 Subject: [PATCH 13/31] Correct typing of create_client(host, username) (#482) The parameters `host` and `username` of `create_client` actually do accept None values, as demonstrated by their default values being `None` and the docstring explaining default behavior when not-set. Correcting these types (by marking as Optional) allows users using `dsn` or default behavior to not see type-checking errors. --- timeplus_connect/driver/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timeplus_connect/driver/__init__.py b/timeplus_connect/driver/__init__.py index 05e65d0a..97ce38b5 100644 --- a/timeplus_connect/driver/__init__.py +++ b/timeplus_connect/driver/__init__.py @@ -13,8 +13,8 @@ # pylint: disable=too-many-arguments,too-many-locals,too-many-branches def create_client(*, - host: str = None, - username: str = None, + host: Optional[str] = None, + username: Optional[str] = None, password: str = '', access_token: Optional[str] = None, database: str = 'default', From f40d684cd0d3e0a63c89931e9b65a14bc0f572cf Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Fri, 28 Mar 2025 16:59:36 -0600 Subject: [PATCH 14/31] Release 0 8 16 (#485) * Check for optional libraries in client methods * Log unexpected http next chunk unexpected * Log unexpected http next chunk unexpected * Updates for 0.8.16 release --- CHANGELOG.md | 17 +++++++++++++++++ timeplus_connect/common.py | 3 ++- timeplus_connect/driver/client.py | 22 ++++++++++++++++------ timeplus_connect/driver/httpclient.py | 2 +- timeplus_connect/driver/httputil.py | 3 ++- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 883fbcb2..f00faea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,23 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction. The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`. +## 0.8.16, 2025-03-28 +### Bug Fixes +- Don't send a setting value if the setting is already correct according to the `system.settings` table. +Closes https://github.com/ClickHouse/clickhouse-connect/issues/469 +- Ensure that the http `user_agent` header is in ascii. Note this could lead to an incorrectly encoded `os_user` if the +os_user is not an Ascii string. Closes https://github.com/ClickHouse/clickhouse-connect/issues/484 +- Fix "cannot access local variable" exception where the http client encounters an unexpected streaming error. Also +log that unexpected streaming error to assist debugging. Closes https://github.com/ClickHouse/clickhouse-connect/issues/483 +- Check that arrow/pandas is installed when calling `query_df` and `query_arrow` and raise a more meaningful exception +if the required library is absent. Closes https://github.com/ClickHouse/clickhouse-connect/issues/477 + +### Improvements +- Some typing hints have been corrected. Thanks to [Avery Fischer](https://github.com/biggerfisch) for the PR! +- The docker based tests have been fixed to work with security improvements in recent ClickHouse releases +- Query string cleanup is now (in theory) microseconds faster. Thanks to [Sviatoslav Bobryshev](https://github.com/sbobryshev) +for the optimization + ## 0.8.15, 2025-01-25 ### Bug Fix - The async client was not shutting down its associated executor thread pool, result in a memory leak if multiple diff --git a/timeplus_connect/common.py b/timeplus_connect/common.py index 49437f34..bc796e8f 100644 --- a/timeplus_connect/common.py +++ b/timeplus_connect/common.py @@ -41,8 +41,9 @@ def build_client_name(client_name: str): os_user = f'; os_user:{getpass.getuser()}' except Exception: # pylint: disable=broad-except pass - return (f'{client_name}{product_name}timeplus-connect/{version()}' + + full_name = (f'{client_name}{product_name}timeplus-connect/{version()}' + f' (lv:py/{py_version}; mode:sync; os:{sys.platform}{os_user})') + return full_name.encode('ascii', 'ignore').decode() def get_setting(name: str): diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index cc962939..c387254d 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -68,7 +68,7 @@ def __init__(self, self.uri = uri self._init_common_settings(apply_server_timezone) - def _init_common_settings(self, apply_server_timezone:Optional[Union[str, bool]] ): + def _init_common_settings(self, apply_server_timezone: Optional[Union[str, bool]]): self.server_tz, dst_safe = pytz.UTC, True self.server_version, server_tz = \ tuple(self.command('SELECT version(), timezone()', use_database=False)) @@ -122,14 +122,16 @@ def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, st return validated def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]: - new_value = str(value) + str_value = str(value) if value is True: - new_value = '1' + str_value = '1' elif value is False: - new_value = '0' + str_value = '0' if key not in self.valid_transport_settings: setting_def = self.server_settings.get(key) - if setting_def is None or (setting_def.readonly and setting_def.value != new_value): + if setting_def and setting_def.value == str_value: + return None # don't send settings that are already the expected value + if setting_def is None or setting_def.readonly: if key in self.optional_transport_settings: return None if invalid_action == 'send': @@ -139,7 +141,7 @@ def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Option return None else: raise ProgrammingError(f'Setting {key} is unknown or readonly') from None - return new_value + return str_value def _setting_status(self, key: str) -> SettingStatus: comp_setting = self.server_settings.get(key) @@ -342,6 +344,7 @@ def query_np(self, create_query_context method :return: Numpy array representing the result set """ + check_numpy() return self._context_query(locals(), use_numpy=True).np_result # pylint: disable=duplicate-code,too-many-arguments,unused-argument @@ -361,6 +364,7 @@ def query_np_stream(self, create_query_context method :return: Generator that yield a numpy array per block representing the result set """ + check_numpy() return self._context_query(locals(), use_numpy=True, streaming=True).np_stream # pylint: disable=duplicate-code,unused-argument @@ -384,6 +388,7 @@ def query_df(self, create_query_context method :return: Pandas dataframe representing the result set """ + check_pandas() return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result # pylint: disable=duplicate-code,unused-argument @@ -407,6 +412,7 @@ def query_df_stream(self, create_query_context method :return: Generator that yields a Pandas dataframe per block representing the result set """ + check_pandas() return self._context_query(locals(), use_numpy=True, as_pandas=True, streaming=True).df_stream @@ -519,6 +525,7 @@ def query_arrow(self, :param external_data ClickHouse "external data" to send with query :return: PyArrow.Table """ + check_arrow() settings = self._update_arrow_settings(settings, use_strings) return to_arrow(self.raw_query(query, parameters, @@ -541,6 +548,7 @@ def query_arrow_stream(self, :param external_data ClickHouse "external data" to send with query :return: Generator that yields a PyArrow.Table for per block representing the result set """ + check_arrow() settings = self._update_arrow_settings(settings, use_strings) return to_arrow_batches(self.raw_stream(query, parameters, @@ -661,6 +669,7 @@ def insert_df(self, table: str = None, different data batches :return: QuerySummary with summary information, throws exception if insert fails """ + check_pandas() if context is None: if column_names is None: column_names = df.columns @@ -686,6 +695,7 @@ def insert_arrow(self, table: str, :param settings: Optional dictionary of ClickHouse settings (key/string values) :return: QuerySummary with summary information, throws exception if insert fails """ + check_arrow() full_table = table if '.' in table or not database else f'{database}.{table}' compression = self.write_compression if self.write_compression in ('zstd', 'lz4') else None column_names, insert_block = arrow_buffer(arrow_table, compression) diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index 6e3501d2..b56e27d2 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -528,7 +528,7 @@ def ping(self): return True # proton hasn't HTTP handle for path /ping # try: - # response = self.http.request('GET', f'{self.url}/ping', timeout=3) + # response = self.http.request('GET', f'{self.url}/ping', timeout=3, preload_content=True) # return 200 <= response.status < 300 # except HTTPError: # logger.debug('ping failed', exc_info=True) diff --git a/timeplus_connect/driver/httputil.py b/timeplus_connect/driver/httputil.py index 0937e556..df370ed7 100644 --- a/timeplus_connect/driver/httputil.py +++ b/timeplus_connect/driver/httputil.py @@ -228,12 +228,13 @@ def buffered(): read_gen = response.stream(chunk_size, decompress is None) while True: while not done: + chunk = None try: chunk = next(read_gen, None) # Always try to read at least one chunk if there are any left except Exception: # pylint: disable=broad-except # By swallowing an unexpected exception reading the stream, we will let consumers decide how to # handle the unexpected end of stream - pass + logger.warning('unexpected failure to read next chunk', exc_info=True) if not chunk: done = True break From f9079a97cbd3da58e185e1a40478e05eeef04311 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 25 Jan 2025 16:19:41 -0700 Subject: [PATCH 15/31] Exclude 3.8 Aarch64 builds (#460) * Fix lint * Exclude pypy 38 build * Exclude all Python 3.8 builds * Update changelog re Python 3.8 aarch64 wheels From d5a3c96ff8fddae83f9b863ec37455d7d9d18eb6 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sun, 2 Feb 2025 13:42:40 -0700 Subject: [PATCH 16/31] Gg/update test matrix (#464) * Update some tests * Fix lint * Skip JSON buggy test From dfb19b550c8fde63aca447398bd89c17fe22f90e Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Fri, 11 Apr 2025 14:12:03 -0600 Subject: [PATCH 17/31] Updates for 0.8.17 release (#488) * Updates for 0.8.17 release * Update test matrix * Try to punt on SSL issues * Update TLS test certificates --- .../single_node_tls/certificates/ca.crt | 22 ++++++++------- .../single_node_tls/certificates/client.crt | 23 ++++++++------- .../single_node_tls/certificates/server.crt | 28 +++++++++---------- CHANGELOG.md | 11 ++++++++ timeplus_connect/__version__.py | 2 +- timeplus_connect/driver/client.py | 3 +- timeplus_connect/driver/httpclient.py | 8 ++++-- timeplus_connect/driver/query.py | 4 +-- 8 files changed, 60 insertions(+), 41 deletions(-) diff --git a/.docker/clickhouse/single_node_tls/certificates/ca.crt b/.docker/clickhouse/single_node_tls/certificates/ca.crt index 2dcfce0c..b6160d31 100644 --- a/.docker/clickhouse/single_node_tls/certificates/ca.crt +++ b/.docker/clickhouse/single_node_tls/certificates/ca.crt @@ -1,12 +1,14 @@ -----BEGIN CERTIFICATE----- -MIIBxDCCAWoCCQCC7Dz9F36rcTAKBggqhkjOPQQDAjBqMQswCQYDVQQGEwJVUzER -MA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xp -Y2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdDAeFw0y -MzA0MjYyMTM4MzhaFw00MzA0MjYyMTM4MzhaMGoxCzAJBgNVBAYTAlVTMREwDwYD -VQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQKDA9DbGlja0hv -dXNlIEluYy4xHTAbBgNVBAMMFGNsaWNraG91c2VfdGVzdF9yb290MFkwEwYHKoZI -zj0CAQYIKoZIzj0DAQcDQgAE8ajzpmv1YDspmgGcE+KjB2SxAQJ2/awkkP/SBvjw -enD0ibQG5fyA5vxhPv7ImbnqebPS1NXwIt4HCkLXKVPDnzAKBggqhkjOPQQDAgNI -ADBFAiAlQ8IWL7OQua7/dFaE8xbFy/hoKnLvuigDg9MAJNJUXwIhAIa0c3pT6z9P -OX2Sw5mfl/YEDTgsG033S1MeAha3707H +MIICOTCCAd+gAwIBAgIUDVFiObYZ48KdDkTlhKzVRf/KfJ0wCgYIKoZIzj0EAwIw +ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52 +ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz +ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTgzOTA5WhcNMjUwNTExMTgzOTA5WjBqMQsw +CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY +MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rl +c3Rfcm9vdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABPGo86Zr9WA7KZoBnBPi +owdksQECdv2sJJD/0gb48Hpw9Im0BuX8gOb8YT7+yJm56nmz0tTV8CLeBwpC1ylT +w5+jYzBhMB0GA1UdDgQWBBSSPtUyuGF0HFuucyfFfWwWMAnF9jAfBgNVHSMEGDAW +gBSSPtUyuGF0HFuucyfFfWwWMAnF9jAPBgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB +/wQEAwIBBjAKBggqhkjOPQQDAgNIADBFAiBdgpWahGxpRC1q2faCmxuAnK4Q6CMp +cMybM4fhdKqhiQIhAM07skDAKqviL8mkZY6XDnHlFdpqnAXBXVHKrsDQTMz/ -----END CERTIFICATE----- diff --git a/.docker/clickhouse/single_node_tls/certificates/client.crt b/.docker/clickhouse/single_node_tls/certificates/client.crt index 366e1985..97ba6db0 100644 --- a/.docker/clickhouse/single_node_tls/certificates/client.crt +++ b/.docker/clickhouse/single_node_tls/certificates/client.crt @@ -1,12 +1,15 @@ -----BEGIN CERTIFICATE----- -MIIBuDCCAV8CCQCvYwZhuT/WEjAKBggqhkjOPQQDAjBqMQswCQYDVQQGEwJVUzER -MA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xp -Y2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdDAeFw0y -MzA0MjYyMjAzMjZaFw00MzA0MjYyMjAzMjZaMF8xCzAJBgNVBAYTAlVTMREwDwYD -VQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQKDA9DbGlja0hv -dXNlIEluYy4xEjAQBgNVBAMMCWNlcnRfdXNlcjBZMBMGByqGSM49AgEGCCqGSM49 -AwEHA0IABIEhqR0FcbBp0ZdQ6t9c9+rxRVS8TZXlPY2kGlFMkW5AY8/Y05L1q7Cx -mJiwZl6+4U/j8m0EhtVREywb1PENR20wCgYIKoZIzj0EAwIDRwAwRAIgRp0AWMOq -OA8lJTd1h2GrAWDMpiNamMUvLyksxLq5SrgCIA5AwncaSEqGHboq1zHMj0Qnqnua -JQJAbhcsh4sxk8AY +MIICQzCCAemgAwIBAgIUeggQ6+OCjtT3i7jASzwA1qfdDn0wCgYIKoZIzj0EAwIw +ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52 +ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz +ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTk1MjMyWhcNNDUwNDExMTk1MjMyWjBfMQsw +CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY +MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMRIwEAYDVQQDDAljZXJ0X3VzZXIwWTAT +BgcqhkjOPQIBBggqhkjOPQMBBwNCAASBIakdBXGwadGXUOrfXPfq8UVUvE2V5T2N +pBpRTJFuQGPP2NOS9auwsZiYsGZevuFP4/JtBIbVURMsG9TxDUdto3gwdjAdBgNV +HQ4EFgQUJuFP4dlFGBW3wK6vUkqvSxaLMhswHwYDVR0jBBgwFoAUkj7VMrhhdBxb +rnMnxX1sFjAJxfYwDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0l +AQH/BAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwIDSAAwRQIgVrbKF3pqkvivLjhz +uhMREwZtkK5jcQboVmHVtKQpkWACIQDYiwq+e8x/CdFdTiZwGrfliPy/pfBSvPSD +sIRougm0nA== -----END CERTIFICATE----- diff --git a/.docker/clickhouse/single_node_tls/certificates/server.crt b/.docker/clickhouse/single_node_tls/certificates/server.crt index 1adb2502..7e40825a 100644 --- a/.docker/clickhouse/single_node_tls/certificates/server.crt +++ b/.docker/clickhouse/single_node_tls/certificates/server.crt @@ -1,17 +1,15 @@ -----BEGIN CERTIFICATE----- -MIICrjCCAlSgAwIBAgIJAK9jBmG5P9YRMAoGCCqGSM49BAMCMGoxCzAJBgNVBAYT -AlVTMREwDwYDVQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQK -DA9DbGlja0hvdXNlIEluYy4xHTAbBgNVBAMMFGNsaWNraG91c2VfdGVzdF9yb290 -MB4XDTIzMDQyNjIxNTAxOVoXDTQzMDQyNjIxNTAxOVowbTELMAkGA1UEBhMCVVMx -ETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52ZXIxGDAWBgNVBAoMD0Ns -aWNrSG91c2UgSW5jLjEgMB4GA1UEAwwXc2VydmVyMS5jbGlja2hvdXNlLnRlc3Qw -WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhjivoy18D47i18Jqg6m9yI17ndMWA -kuyPhXFLgW1PpU2wk3DvpUbkKUxUPlKsNwuHEKJ4kcparrrwWGxKT2Dmo4HfMIHc -MIGEBgNVHSMEfTB7oW6kbDBqMQswCQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3Jh -ZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMR0w -GwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdIIJAILsPP0XfqtxMAkGA1UdEwQC -MAAwCwYDVR0PBAQDAgTwMDsGA1UdEQQ0MDKCF3NlcnZlcjEuY2xpY2tob3VzZS50 -ZXN0ghdzZXJ2ZXIyLmNsaWNraG91c2UudGVzdDAKBggqhkjOPQQDAgNIADBFAiBM -71Vx9q964BRd9+N0zpbax+N+jWFJQfkOic4wlsPZ7QIhAPBU9Kfbi3Iwy3XwWBOv -YZsvoFRxUfG2RRRlz5cGgKIa +MIICZjCCAg2gAwIBAgIUeggQ6+OCjtT3i7jASzwA1qfdDnswCgYIKoZIzj0EAwIw +ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52 +ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz +ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTkzNTE0WhcNNDUwNDExMTkzNTE0WjBtMQsw +CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY +MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMSAwHgYDVQQDDBdjbGlja2hvdXNlX3Rl +c3Rfc2VydmVyMTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGGOK+jLXwPjuLXw +mqDqb3IjXud0xYCS7I+FcUuBbU+lTbCTcO+lRuQpTFQ+Uqw3C4cQoniRylquuvBY +bEpPYOajgY0wgYowHQYDVR0OBBYEFMT7NvpCkmSa2HYEyql/pUCxdkWQMB8GA1Ud +IwQYMBaAFJI+1TK4YXQcW65zJ8V9bBYwCcX2MAwGA1UdEwEB/wQCMAAwIgYDVR0R +BBswGYIXc2VydmVyMS5jbGlja2hvdXNlLnRlc3QwFgYDVR0lAQH/BAwwCgYIKwYB +BQUHAwEwCgYIKoZIzj0EAwIDRwAwRAIgDUXjls0mpQwTOJyw9zy0zOA0kfU+fldI +S4qsQwKhpmECID2eUcgU2zv0koUcE1M6UyVzQrJfJviUR48bh8rgkykg -----END CERTIFICATE----- diff --git a/CHANGELOG.md b/CHANGELOG.md index f00faea9..b71a9aaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,17 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction. The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`. +## 0.8.17, 2025-04-10 +### Bug Fix +- Version 0.8.16 introduced a bug where changing a Client setting value and then changing that setting value back to the +original server value would fail to restore the original setting. This has been fixed. Closes +https://github.com/ClickHouse/clickhouse-connect/issues/487 + +### Improvement +- There was previously no way to add a path to the ClickHouse server host in cases where the ClickHouse server was +behind a proxy that used path based routing (such as `https://big_proxy:8080/clickhouse). The new `proxy_path` +`get_client` argument can now be used to set that path. Closes https://github.com/ClickHouse/clickhouse-connect/issues/486 + ## 0.8.16, 2025-03-28 ### Bug Fixes - Don't send a setting value if the setting is already correct according to the `system.settings` table. diff --git a/timeplus_connect/__version__.py b/timeplus_connect/__version__.py index 5e80f2e4..fdf6a632 100644 --- a/timeplus_connect/__version__.py +++ b/timeplus_connect/__version__.py @@ -1 +1 @@ -version = '0.8.16' +version = '0.8.17' diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index c387254d..2c241cb1 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -129,7 +129,8 @@ def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Option str_value = '0' if key not in self.valid_transport_settings: setting_def = self.server_settings.get(key) - if setting_def and setting_def.value == str_value: + current_setting = self.get_client_setting(key) + if setting_def and setting_def.value == str_value and (current_setting is None or current_setting == setting_def.value): return None # don't send settings that are already the expected value if setting_def is None or setting_def.readonly: if key in self.optional_transport_settings: diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index b56e27d2..5822f187 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -74,12 +74,16 @@ def __init__(self, apply_server_timezone: Optional[Union[str, bool]] = None, show_clickhouse_errors: Optional[bool] = None, autogenerate_session_id: Optional[bool] = None, - tls_mode: Optional[str] = None): + tls_mode: Optional[str] = None, + proxy_path: str = ''): """ Create an HTTP ClickHouse Connect client See timeplus_connect.get_client for parameters """ - self.url = f'{interface}://{host}:{port}' + proxy_path = proxy_path.lstrip('/') + if proxy_path: + proxy_path = '/' + proxy_path + self.url = f'{interface}://{host}:{port}{proxy_path}' self.headers = {} self.params = dict_copy(HttpClient.params) ch_settings = dict_copy(settings, self.params) diff --git a/timeplus_connect/driver/query.py b/timeplus_connect/driver/query.py index e7b74e6a..d975758a 100644 --- a/timeplus_connect/driver/query.py +++ b/timeplus_connect/driver/query.py @@ -3,7 +3,7 @@ import pytz from io import IOBase -from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator +from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator, BinaryIO from datetime import tzinfo from pytz.exceptions import UnknownTimeZoneError @@ -374,7 +374,7 @@ def to_arrow_batches(buffer: IOBase) -> StreamContext: return StreamContext(buffer, reader) -def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], bytes]: +def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], Union[bytes, BinaryIO]]: pyarrow = check_arrow() options = None if compression in ('zstd', 'lz4'): From d92035a2bc34bdad8ab144a45be5e96305bd5708 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Wed, 5 Feb 2025 08:51:32 -0700 Subject: [PATCH 18/31] Fix CI tests with default user (#465) From e61c6b0788b041a0b18bac73fd7d3fb4a42580ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Szczur?= Date: Sat, 12 Apr 2025 14:51:25 +0200 Subject: [PATCH 19/31] Add param extra_http_headers to query/command methods (#489) * Add param extra_http_headers to query/command methods * add test, fix dict copy --------- Co-authored-by: Geoff Genz --- tests/integration_tests/test_client.py | 7 ++ timeplus_connect/driver/client.py | 88 +++++++++++++++++--------- timeplus_connect/driver/httpclient.py | 16 +++-- timeplus_connect/driver/query.py | 10 ++- 4 files changed, 83 insertions(+), 38 deletions(-) diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 105c5f13..2ee0f27f 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -41,6 +41,13 @@ def test_client_name(test_client: Client): assert 'py/' in user_agent +def test_extra_http_headers(test_client: Client): + result = test_client.query('SELECT name,database FROM system.tables', + extra_http_headers={'X-Workload': 'ONLINE'}) + assert result.column_names == ('name', 'database') + assert len(result.result_set) > 0 + + def test_none_database(test_client: Client): old_db = test_client.database test_db = test_client.command('select current_database()') diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index 2c241cb1..3f24ee02 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -211,7 +211,8 @@ def query(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> QueryResult: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> QueryResult: """ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For parameters, see the create_query_context method @@ -227,7 +228,8 @@ def query(self, response = self.command(query, parameters=query_context.parameters, settings=query_context.settings, - external_data=query_context.external_data) + external_data=query_context.external_data, + extra_http_headers=query_context.extra_http_headers) if isinstance(response, QuerySummary): return response.as_query_result() return QueryResult([response] if isinstance(response, list) else [[response]]) @@ -244,7 +246,8 @@ def query_column_block_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -263,7 +266,8 @@ def query_row_block_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -282,7 +286,8 @@ def query_rows_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -296,16 +301,19 @@ def raw_query(self, query: str, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) :param fmt: ClickHouse output format - :param use_database Send the database parameter to ClickHouse so the command will be executed in the client + :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client database context. - :param external_data External data to send with the query + :param external_data: External data to send with the query + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: bytes representing raw ClickHouse return value based on format """ @@ -315,7 +323,8 @@ def raw_stream(self, query: str, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator :param query: Query statement/format string @@ -324,7 +333,9 @@ def raw_stream(self, query: str, :param fmt: ClickHouse output format :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context. - :param external_data External data to send with the query + :param external_data: External data to send with the query. + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: io.IOBase stream/iterator for the result """ @@ -339,7 +350,8 @@ def query_np(self, use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method @@ -359,7 +371,8 @@ def query_np_stream(self, use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method @@ -383,7 +396,8 @@ def query_df(self, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None): + use_extended_dtypes: Optional[bool] = None, + extra_http_headers: Optional[Dict[str, str]] = None): """ Query method that results the results as a pandas dataframe. For parameter values, see the create_query_context method @@ -407,7 +421,8 @@ def query_df_stream(self, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> StreamContext: + use_extended_dtypes: Optional[bool] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext. For parameter values, see the create_query_context method @@ -436,7 +451,8 @@ def create_query_context(self, streaming: bool = False, as_pandas: bool = False, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> QueryContext: + use_extended_dtypes: Optional[bool] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -454,10 +470,10 @@ def create_query_context(self, structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for String columns will always be object arrays :param context: An existing QueryContext to be updated with any provided parameter values - :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). + :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects). Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime objects with the selected timezone. - :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to + :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to tzinfo objects). The timezone will be applied to datetime objects returned in the query :param use_na_values: Deprecated alias for use_advanced_dtypes :param as_pandas Return the result columns as pandas.Series objects @@ -466,6 +482,8 @@ def create_query_context(self, :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: Reusable QueryContext """ if context: @@ -509,21 +527,25 @@ def create_query_context(self, as_pandas=as_pandas, streaming=streaming, apply_server_tz=self.apply_server_timezone, - external_data=external_data) + external_data=external_data, + extra_http_headers=extra_http_headers) def query_arrow(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) - :param external_data ClickHouse "external data" to send with query + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data: ClickHouse "external data" to send with query + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: PyArrow.Table """ check_arrow() @@ -532,21 +554,25 @@ def query_arrow(self, parameters, settings, fmt='Arrow', - external_data=external_data)) + external_data=external_data, + extra_http_headers=extra_http_headers)) def query_arrow_stream(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string :param parameters: Optional dictionary used to format the query :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) - :param external_data ClickHouse "external data" to send with query + :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) + :param external_data: ClickHouse "external data" to send with query + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: Generator that yields a PyArrow.Table for per block representing the result set """ check_arrow() @@ -555,7 +581,8 @@ def query_arrow_stream(self, parameters, settings, fmt='ArrowStream', - external_data=external_data)) + external_data=external_data, + extra_http_headers=extra_http_headers)) def _update_arrow_settings(self, settings: Optional[Dict[str, Any]], @@ -580,7 +607,8 @@ def command(self, data: Union[str, bytes] = None, settings: Dict[str, Any] = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string @@ -588,9 +616,11 @@ def command(self, :param data: Optional 'data' for the command (for INSERT INTO in particular) :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client - database context. Otherwise, no database will be specified with the command. This is useful for determining + database context. Otherwise, no database will be specified with the command. This is useful for determining the default user database - :param external_data ClickHouse "external data" to send with command/query + :param external_data: ClickHouse "external data" to send with command/query + :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, + useful if using Proxy. :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index 5822f187..309e61c3 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -203,7 +203,7 @@ def _prep_query(self, context: QueryContext): return final_query + fmt def _query_with_context(self, context: QueryContext) -> QueryResult: - headers = {} + headers = dict_copy(context.extra_http_headers) params = {} if self.database: params['database'] = self.database @@ -331,7 +331,8 @@ def command(self, data: Union[str, bytes] = None, settings: Optional[Dict] = None, use_database: int = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ See BaseClient doc_string for this method """ @@ -481,24 +482,27 @@ def raw_query(self, query: str, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> bytes: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) - return self._raw_request(body, params, fields=fields).data + return self._raw_request(body, params, fields=fields, headers=extra_http_headers).data def raw_stream(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> io.IOBase: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) - return self._raw_request(body, params, fields=fields, stream=True, server_wait=False) + return self._raw_request(body, params, fields=fields, stream=True, server_wait=False, + headers=extra_http_headers) def _prep_raw_query(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], diff --git a/timeplus_connect/driver/query.py b/timeplus_connect/driver/query.py index d975758a..5807e671 100644 --- a/timeplus_connect/driver/query.py +++ b/timeplus_connect/driver/query.py @@ -52,7 +52,8 @@ def __init__(self, as_pandas: bool = False, streaming: bool = False, apply_server_tz: bool = False, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None): """ Initializes various configuration settings for the query context @@ -116,6 +117,7 @@ def __init__(self, self.as_pandas = as_pandas self.use_pandas_na = as_pandas and pd_extended_dtypes self.streaming = streaming + self.extra_http_headers = extra_http_headers self._update_query() @property @@ -189,7 +191,8 @@ def updated_copy(self, use_extended_dtypes: Optional[bool] = None, as_pandas: bool = False, streaming: bool = False, - external_data: Optional[ExternalData] = None) -> 'QueryContext': + external_data: Optional[ExternalData] = None, + extra_http_headers: Optional[Dict[str, str]] = None) -> 'QueryContext': """ Creates Query context copy with parameters overridden/updated as appropriate. """ @@ -210,7 +213,8 @@ def updated_copy(self, as_pandas, streaming, self.apply_server_tz, - self.external_data if external_data is None else external_data) + self.external_data if external_data is None else external_data, + self.extra_http_headers if extra_http_headers is None else extra_http_headers) def _update_query(self): self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz) From 94693aff3504d84de2684077ecb5fd3b970fe2e0 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 12 Apr 2025 16:29:10 -0600 Subject: [PATCH 20/31] Change http_headers to transport settings, add transport settings to async client and insert methods (#490) --- CHANGELOG.md | 15 ++-- tests/integration_tests/test_client.py | 4 +- timeplus_connect/driver/asyncclient.py | 120 +++++++++++++++++-------- timeplus_connect/driver/client.py | 92 ++++++++++--------- timeplus_connect/driver/context.py | 4 +- timeplus_connect/driver/httpclient.py | 22 ++--- timeplus_connect/driver/insert.py | 5 +- timeplus_connect/driver/query.py | 10 +-- 8 files changed, 166 insertions(+), 106 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b71a9aaf..7785ca2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,16 +22,21 @@ instead of being passed as ClickHouse server settings. This is in conjunction wi The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`. ## 0.8.17, 2025-04-10 -### Bug Fix -- Version 0.8.16 introduced a bug where changing a Client setting value and then changing that setting value back to the -original server value would fail to restore the original setting. This has been fixed. Closes -https://github.com/ClickHouse/clickhouse-connect/issues/487 -### Improvement +### Improvements +- The parameter `transport_settings` has been added to the Client query and insert methods. For the HTTP client (currently +the only option), this dictionary of string is directly translated into additional HTTP headers at a query level. This can +be used to provide additional proxy directives or other extra 'non-ClickHouse' information that is passed via headers. +Thanks to [Paweł Szczur](https://github.com/orian) of PostHog for the original PR! - There was previously no way to add a path to the ClickHouse server host in cases where the ClickHouse server was behind a proxy that used path based routing (such as `https://big_proxy:8080/clickhouse). The new `proxy_path` `get_client` argument can now be used to set that path. Closes https://github.com/ClickHouse/clickhouse-connect/issues/486 +### Bug Fix +- Version 0.8.16 introduced a bug where changing a Client setting value and then changing that setting value back to the + original server value would fail to restore the original setting. This has been fixed. Closes + https://github.com/ClickHouse/clickhouse-connect/issues/487 + ## 0.8.16, 2025-03-28 ### Bug Fixes - Don't send a setting value if the setting is already correct according to the `system.settings` table. diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 2ee0f27f..6cbbee4d 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -41,9 +41,9 @@ def test_client_name(test_client: Client): assert 'py/' in user_agent -def test_extra_http_headers(test_client: Client): +def test_transport_settings(test_client: Client): result = test_client.query('SELECT name,database FROM system.tables', - extra_http_headers={'X-Workload': 'ONLINE'}) + transport_settings={'X-Workload': 'ONLINE'}) assert result.column_names == ('name', 'database') assert len(result.result_set) > 0 diff --git a/timeplus_connect/driver/asyncclient.py b/timeplus_connect/driver/asyncclient.py index 28a8d6ec..639dea95 100644 --- a/timeplus_connect/driver/asyncclient.py +++ b/timeplus_connect/driver/asyncclient.py @@ -85,7 +85,8 @@ async def query(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> QueryResult: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryResult: """ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For parameters, see the create_query_context method. @@ -97,7 +98,7 @@ def _query(): column_formats=column_formats, encoding=encoding, use_none=use_none, column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query) @@ -114,7 +115,9 @@ async def query_column_block_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None, + ) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -126,7 +129,7 @@ def _query_column_block_stream(): query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_column_block_stream) @@ -143,7 +146,8 @@ async def query_row_block_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -155,7 +159,7 @@ def _query_row_block_stream(): query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_row_block_stream) @@ -172,7 +176,8 @@ async def query_rows_stream(self, context: QueryContext = None, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -184,7 +189,7 @@ def _query_rows_stream(): query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, context=context, query_tz=query_tz, column_tzs=column_tzs, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_rows_stream) @@ -196,7 +201,8 @@ async def raw_query(self, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> bytes: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes. :param query: Query statement/format string @@ -206,12 +212,14 @@ async def raw_query(self, :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: bytes representing raw ClickHouse return value based on format """ def _raw_query(): return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_query) @@ -222,7 +230,8 @@ async def raw_stream(self, query: str, settings: Optional[Dict[str, Any]] = None, fmt: str = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> io.IOBase: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator. :param query: Query statement/format string @@ -232,12 +241,13 @@ async def raw_stream(self, query: str, :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context :param external_data External data to send with the query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: io.IOBase stream/iterator for the result """ def _raw_stream(): return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_stream) @@ -253,7 +263,8 @@ async def query_np(self, use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method. @@ -264,7 +275,7 @@ def _query_np(): return self.client.query_np(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, context=context, - external_data=external_data) + external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np) @@ -280,7 +291,8 @@ async def query_np_stream(self, use_none: Optional[bool] = None, max_str_len: Optional[int] = None, context: QueryContext = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method. @@ -291,7 +303,7 @@ def _query_np_stream(): return self.client.query_np_stream(query=query, parameters=parameters, settings=settings, query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, - context=context, external_data=external_data) + context=context, external_data=external_data, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_np_stream) @@ -311,7 +323,8 @@ async def query_df(self, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None): + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method that results the results as a pandas dataframe. For parameter values, see the create_query_context method. @@ -323,7 +336,8 @@ def _query_df(): query_formats=query_formats, column_formats=column_formats, encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df) @@ -343,7 +357,8 @@ async def query_df_stream(self, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> StreamContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext. For parameter values, see the create_query_context method. @@ -356,7 +371,8 @@ def _query_df_stream(): encoding=encoding, use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values, query_tz=query_tz, column_tzs=column_tzs, context=context, - external_data=external_data, use_extended_dtypes=use_extended_dtypes) + external_data=external_data, use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_df_stream) @@ -380,7 +396,8 @@ def create_query_context(self, streaming: bool = False, as_pandas: bool = False, external_data: Optional[ExternalData] = None, - use_extended_dtypes: Optional[bool] = None) -> QueryContext: + use_extended_dtypes: Optional[bool] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -410,6 +427,7 @@ def create_query_context(self, :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Reusable QueryContext """ @@ -422,14 +440,16 @@ def create_query_context(self, use_na_values=use_na_values, streaming=streaming, as_pandas=as_pandas, external_data=external_data, - use_extended_dtypes=use_extended_dtypes) + use_extended_dtypes=use_extended_dtypes, + transport_settings=transport_settings) async def query_arrow(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None): + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string @@ -437,12 +457,14 @@ async def query_arrow(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: PyArrow.Table """ def _query_arrow(): return self.client.query_arrow(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow) @@ -453,7 +475,8 @@ async def query_arrow_stream(self, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, - external_data: Optional[ExternalData] = None) -> StreamContext: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string @@ -461,12 +484,14 @@ async def query_arrow_stream(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data ClickHouse "external data" to send with query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Generator that yields a PyArrow.Table for per block representing the result set """ def _query_arrow_stream(): return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings, - use_strings=use_strings, external_data=external_data) + use_strings=use_strings, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _query_arrow_stream) @@ -478,7 +503,8 @@ async def command(self, data: Union[str, bytes] = None, settings: Dict[str, Any] = None, use_database: bool = True, - external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]: + external_data: Optional[ExternalData] = None, + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string @@ -489,13 +515,15 @@ async def command(self, database context. Otherwise, no database will be specified with the command. This is useful for determining the default user database :param external_data ClickHouse "external data" to send with command/query + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ def _command(): return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings, - use_database=use_database, external_data=external_data) + use_database=use_database, external_data=external_data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _command) @@ -523,7 +551,8 @@ async def insert(self, column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments other than data are ignored @@ -540,13 +569,15 @@ async def insert(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert(): return self.client.insert(table=table, data=data, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, context=context) + column_oriented=column_oriented, settings=settings, context=context, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert) @@ -559,7 +590,8 @@ async def insert_df(self, table: str = None, column_names: Optional[Sequence[str]] = None, column_types: Sequence[TimeplusType] = None, column_type_names: Sequence[str] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored :param table: ClickHouse table @@ -574,6 +606,7 @@ async def insert_df(self, table: str = None, retrieved from the server :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ @@ -581,7 +614,7 @@ def _insert_df(): return self.client.insert_df(table=table, df=df, database=database, settings=settings, column_names=column_names, column_types=column_types, column_type_names=column_type_names, - context=context) + context=context, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_df) @@ -589,18 +622,21 @@ def _insert_df(): async def insert_arrow(self, table: str, arrow_table, database: str = None, - settings: Optional[Dict] = None) -> QuerySummary: + settings: Optional[Dict] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format :param table: ClickHouse table :param arrow_table: PyArrow Table object :param database: Optional ClickHouse database :param settings: Optional dictionary of ClickHouse settings (key/string values) + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ def _insert_arrow(): - return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings) + return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, + settings=settings, transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _insert_arrow) @@ -614,7 +650,8 @@ async def create_insert_context(self, column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + data: Optional[Sequence[Sequence[Any]]] = None, + transport_settings: Optional[Dict[str, str]] = None) -> InsertContext: """ Builds a reusable insert context to hold state for a duration of an insert :param table: Target table @@ -628,13 +665,15 @@ async def create_insert_context(self, :param column_oriented: If true the data is already "pivoted" in column form :param settings: Optional dictionary of ClickHouse settings (key/string values) :param data: Initial dataset for insert - :return Reusable insert context + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) + :return: Reusable insert context """ def _create_insert_context(): return self.client.create_insert_context(table=table, column_names=column_names, database=database, column_types=column_types, column_type_names=column_type_names, - column_oriented=column_oriented, settings=settings, data=data) + column_oriented=column_oriented, settings=settings, data=data, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _create_insert_context) @@ -659,7 +698,8 @@ async def raw_insert(self, table: str, insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert data already formatted in a bytes object :param table: Table name (whether qualified with the database name or not) @@ -667,12 +707,14 @@ async def raw_insert(self, table: str, :param insert_block: Binary or string data already in a recognized ClickHouse format :param settings: Optional dictionary of ClickHouse settings (key/string values) :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :param fmt: Valid clickhouse format """ def _raw_insert(): return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block, - settings=settings, fmt=fmt, compression=compression) + settings=settings, fmt=fmt, compression=compression, + transport_settings=transport_settings) loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_insert) diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index 3f24ee02..6c045e79 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -212,7 +212,7 @@ def query(self, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> QueryResult: + transport_settings: Optional[Dict[str, str]] = None) -> QueryResult: """ Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For parameters, see the create_query_context method @@ -229,7 +229,7 @@ def query(self, parameters=query_context.parameters, settings=query_context.settings, external_data=query_context.external_data, - extra_http_headers=query_context.extra_http_headers) + transport_settings=query_context.transport_settings) if isinstance(response, QuerySummary): return response.as_query_result() return QueryResult([response] if isinstance(response, list) else [[response]]) @@ -247,7 +247,7 @@ def query_column_block_stream(self, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of column oriented blocks. For parameters, see the create_query_context method. @@ -267,7 +267,7 @@ def query_row_block_stream(self, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -287,7 +287,7 @@ def query_rows_stream(self, query_tz: Optional[Union[str, tzinfo]] = None, column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Variation of main query method that returns a stream of row oriented blocks. For parameters, see the create_query_context method. @@ -302,7 +302,7 @@ def raw_query(self, query: str, fmt: str = None, use_database: bool = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> bytes: + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ Query method that simply returns the raw ClickHouse format bytes :param query: Query statement/format string @@ -312,8 +312,7 @@ def raw_query(self, query: str, :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client database context. :param external_data: External data to send with the query - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: bytes representing raw ClickHouse return value based on format """ @@ -324,7 +323,7 @@ def raw_stream(self, query: str, fmt: str = None, use_database: bool = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> io.IOBase: + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ Query method that returns the result as an io.IOBase iterator :param query: Query statement/format string @@ -334,8 +333,7 @@ def raw_stream(self, query: str, :param use_database Send the database parameter to ClickHouse so the command will be executed in the client database context. :param external_data: External data to send with the query. - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: io.IOBase stream/iterator for the result """ @@ -351,7 +349,7 @@ def query_np(self, max_str_len: Optional[int] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None): + transport_settings: Optional[Dict[str, str]] = None): """ Query method that returns the results as a numpy array. For parameter values, see the create_query_context method @@ -372,7 +370,7 @@ def query_np_stream(self, max_str_len: Optional[int] = None, context: QueryContext = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of numpy arrays. For parameter values, see the create_query_context method @@ -397,7 +395,7 @@ def query_df(self, context: QueryContext = None, external_data: Optional[ExternalData] = None, use_extended_dtypes: Optional[bool] = None, - extra_http_headers: Optional[Dict[str, str]] = None): + transport_settings: Optional[Dict[str, str]] = None): """ Query method that results the results as a pandas dataframe. For parameter values, see the create_query_context method @@ -422,7 +420,7 @@ def query_df_stream(self, context: QueryContext = None, external_data: Optional[ExternalData] = None, use_extended_dtypes: Optional[bool] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a StreamContext. For parameter values, see the create_query_context method @@ -452,7 +450,7 @@ def create_query_context(self, as_pandas: bool = False, external_data: Optional[ExternalData] = None, use_extended_dtypes: Optional[bool] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> QueryContext: + transport_settings: Optional[Dict[str, str]] = None) -> QueryContext: """ Creates or updates a reusable QueryContext object :param query: Query statement/format string @@ -482,8 +480,7 @@ def create_query_context(self, :param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray and StringArray. Defaulted to True for query_df methods - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Reusable QueryContext """ if context: @@ -503,7 +500,8 @@ def create_query_context(self, as_pandas=as_pandas, use_extended_dtypes=use_extended_dtypes, streaming=streaming, - external_data=external_data) + external_data=external_data, + transport_settings=transport_settings) if use_numpy and max_str_len is None: max_str_len = 0 if use_extended_dtypes is None: @@ -528,7 +526,7 @@ def create_query_context(self, streaming=streaming, apply_server_tz=self.apply_server_timezone, external_data=external_data, - extra_http_headers=extra_http_headers) + transport_settings=transport_settings) def query_arrow(self, query: str, @@ -536,7 +534,7 @@ def query_arrow(self, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None): + transport_settings: Optional[Dict[str, str]] = None): """ Query method using the ClickHouse Arrow format to return a PyArrow table :param query: Query statement/format string @@ -544,8 +542,7 @@ def query_arrow(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data: ClickHouse "external data" to send with query - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: PyArrow.Table """ check_arrow() @@ -555,7 +552,7 @@ def query_arrow(self, settings, fmt='Arrow', external_data=external_data, - extra_http_headers=extra_http_headers)) + transport_settings=transport_settings)) def query_arrow_stream(self, query: str, @@ -563,7 +560,7 @@ def query_arrow_stream(self, settings: Optional[Dict[str, Any]] = None, use_strings: Optional[bool] = None, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> StreamContext: + transport_settings: Optional[Dict[str, str]] = None) -> StreamContext: """ Query method that returns the results as a stream of Arrow tables :param query: Query statement/format string @@ -571,8 +568,7 @@ def query_arrow_stream(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary) :param external_data: ClickHouse "external data" to send with query - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Generator that yields a PyArrow.Table for per block representing the result set """ check_arrow() @@ -582,7 +578,7 @@ def query_arrow_stream(self, settings, fmt='ArrowStream', external_data=external_data, - extra_http_headers=extra_http_headers)) + transport_settings=transport_settings)) def _update_arrow_settings(self, settings: Optional[Dict[str, Any]], @@ -608,7 +604,7 @@ def command(self, settings: Dict[str, Any] = None, use_database: bool = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ Client method that returns a single value instead of a result set :param cmd: ClickHouse query/command as a python format string @@ -619,8 +615,7 @@ def command(self, database context. Otherwise, no database will be specified with the command. This is useful for determining the default user database :param external_data: ClickHouse "external data" to send with command/query - :param extra_http_headers: Optional dictionary of extra HTTP headers to pass to ClickHouse, - useful if using Proxy. + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary if no data returned """ @@ -641,7 +636,8 @@ def insert(self, column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments other than data are ignored @@ -658,6 +654,7 @@ def insert(self, :param settings: Optional dictionary of ClickHouse settings (key/string values) :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ if (context is None or context.empty) and data is None: @@ -669,7 +666,8 @@ def insert(self, column_types, column_type_names, column_oriented, - settings) + settings, + transport_settings=transport_settings) if data is not None: if not context.empty: raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None @@ -683,7 +681,8 @@ def insert_df(self, table: str = None, column_names: Optional[Sequence[str]] = None, column_types: Sequence[TimeplusType] = None, column_type_names: Sequence[str] = None, - context: InsertContext = None) -> QuerySummary: + context: InsertContext = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored :param table: ClickHouse table @@ -698,6 +697,7 @@ def insert_df(self, table: str = None, retrieved from the server :param context: Optional reusable insert context to allow repeated inserts into the same table with different data batches + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) :return: QuerySummary with summary information, throws exception if insert fails """ check_pandas() @@ -712,25 +712,28 @@ def insert_df(self, table: str = None, database, column_types=column_types, column_type_names=column_type_names, - settings=settings, context=context) + settings=settings, + transport_settings=transport_settings, + context=context) def insert_arrow(self, table: str, arrow_table, database: str = None, - settings: Optional[Dict] = None) -> QuerySummary: + settings: Optional[Dict] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format :param table: ClickHouse table :param arrow_table: PyArrow Table object :param database: Optional ClickHouse database :param settings: Optional dictionary of ClickHouse settings (key/string values) - :return: QuerySummary with summary information, throws exception if insert fails + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) """ check_arrow() full_table = table if '.' in table or not database else f'{database}.{table}' compression = self.write_compression if self.write_compression in ('zstd', 'lz4') else None column_names, insert_block = arrow_buffer(arrow_table, compression) - return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow') + return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow', transport_settings) def create_insert_context(self, table: str, @@ -740,7 +743,8 @@ def create_insert_context(self, column_type_names: Sequence[str] = None, column_oriented: bool = False, settings: Optional[Dict[str, Any]] = None, - data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext: + data: Optional[Sequence[Sequence[Any]]] = None, + transport_settings: Optional[Dict[str, str]] = None) -> InsertContext: """ Builds a reusable insert context to hold state for a duration of an insert :param table: Target table @@ -754,7 +758,8 @@ def create_insert_context(self, :param column_oriented: If true the data is already "pivoted" in column form :param settings: Optional dictionary of ClickHouse settings (key/string values) :param data: Initial dataset for insert - :return Reusable insert context + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) + :return: Reusable insert context """ full_table = table if '.' not in table: @@ -790,6 +795,7 @@ def create_insert_context(self, column_types, column_oriented=column_oriented, settings=settings, + transport_settings=transport_settings, data=data) def min_version(self, version_str: str) -> bool: @@ -830,15 +836,17 @@ def raw_insert(self, table: str, insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ Insert data already formatted in a bytes object :param table: Table name (whether qualified with the database name or not) :param column_names: Sequence of column names :param insert_block: Binary or string data already in a recognized ClickHouse format :param settings: Optional dictionary of ClickHouse settings (key/string values) - :param compression: Recognized ClickHouse `Accept-Encoding` header compression value :param fmt: Valid clickhouse format + :param compression: Recognized ClickHouse `Accept-Encoding` header compression value + :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.) """ @abstractmethod diff --git a/timeplus_connect/driver/context.py b/timeplus_connect/driver/context.py index 13276aa7..bf8f34c8 100644 --- a/timeplus_connect/driver/context.py +++ b/timeplus_connect/driver/context.py @@ -16,7 +16,8 @@ def __init__(self, column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, encoding: Optional[str] = None, use_extended_dtypes: bool = False, - use_numpy: bool = False): + use_numpy: bool = False, + transport_settings: Optional[Dict[str, str]] = None): self.settings = settings or {} if query_formats is None: self.type_formats = _empty_map @@ -36,6 +37,7 @@ def __init__(self, for type_name, fmt in fmt.items()} self.query_formats = query_formats or {} self.column_formats = column_formats or {} + self.transport_settings = transport_settings self.column_name = None self.encoding = encoding self.use_numpy = use_numpy diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index 309e61c3..e83a92b3 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -203,7 +203,7 @@ def _prep_query(self, context: QueryContext): return final_query + fmt def _query_with_context(self, context: QueryContext) -> QueryResult: - headers = dict_copy(context.extra_http_headers) + headers = {} params = {} if self.database: params['database'] = self.database @@ -241,7 +241,7 @@ def _query_with_context(self, context: QueryContext) -> QueryResult: headers['Content-Type'] = 'text/plain; charset=utf-8' response = self._raw_request(body, params, - headers, + dict_copy(headers, context.transport_settings), stream=True, retries=self.query_retries, fields=fields, @@ -279,7 +279,7 @@ def error_handler(resp: HTTPResponse): if self.database: params['database'] = self.database params.update(self._validate_settings(context.settings)) - + headers = dict_copy(headers, context.transport_settings) response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False) logger.debug('Context insert response code: %d, content: %s', response.status, response.data) context.data = None @@ -290,7 +290,8 @@ def raw_insert(self, table: str = None, insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None, settings: Optional[Dict] = None, fmt: Optional[str] = None, - compression: Optional[str] = None) -> QuerySummary: + compression: Optional[str] = None, + transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary: """ See BaseClient doc_string for this method """ @@ -310,6 +311,7 @@ def raw_insert(self, table: str = None, if self.database: params['database'] = self.database params.update(self._validate_settings(settings or {})) + headers = dict_copy(headers, transport_settings) response = self._raw_request(insert_block, params, headers, server_wait=False) logger.debug('Raw insert response code: %d, content: %s', response.status, response.data) return QuerySummary(self._summary(response)) @@ -332,7 +334,7 @@ def command(self, settings: Optional[Dict] = None, use_database: int = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: + transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]: """ See BaseClient doc_string for this method """ @@ -360,7 +362,7 @@ def command(self, if use_database and self.database: params['database'] = self.database params.update(self._validate_settings(settings or {})) - + headers = dict_copy(headers, transport_settings) method = 'POST' if payload or fields else 'GET' response = self._raw_request(payload, params, headers, method, fields=fields, server_wait=False) if response.data: @@ -483,12 +485,12 @@ def raw_query(self, query: str, fmt: str = None, use_database: bool = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> bytes: + transport_settings: Optional[Dict[str, str]] = None) -> bytes: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) - return self._raw_request(body, params, fields=fields, headers=extra_http_headers).data + return self._raw_request(body, params, fields=fields, headers=transport_settings).data def raw_stream(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]] = None, @@ -496,13 +498,13 @@ def raw_stream(self, query: str, fmt: str = None, use_database: bool = True, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> io.IOBase: + transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase: """ See BaseClient doc_string for this method """ body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data) return self._raw_request(body, params, fields=fields, stream=True, server_wait=False, - headers=extra_http_headers) + headers=transport_settings) def _prep_raw_query(self, query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]], diff --git a/timeplus_connect/driver/insert.py b/timeplus_connect/driver/insert.py index 6f8225f8..cce021c9 100644 --- a/timeplus_connect/driver/insert.py +++ b/timeplus_connect/driver/insert.py @@ -42,8 +42,9 @@ def __init__(self, compression: Optional[Union[str, bool]] = None, query_formats: Optional[Dict[str, str]] = None, column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None, - block_size: Optional[int] = None): - super().__init__(settings, query_formats, column_formats) + block_size: Optional[int] = None, + transport_settings: Optional[Dict[str, str]] = None): + super().__init__(settings, query_formats, column_formats, transport_settings=transport_settings) self.table = table self.column_names = column_names self.column_types = column_types diff --git a/timeplus_connect/driver/query.py b/timeplus_connect/driver/query.py index 5807e671..6cd49243 100644 --- a/timeplus_connect/driver/query.py +++ b/timeplus_connect/driver/query.py @@ -53,7 +53,7 @@ def __init__(self, streaming: bool = False, apply_server_tz: bool = False, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None): + transport_settings: Optional[Dict[str, str]] = None): """ Initializes various configuration settings for the query context @@ -86,7 +86,8 @@ def __init__(self, column_formats, encoding, use_extended_dtypes if use_extended_dtypes is not None else False, - use_numpy if use_numpy is not None else False) + use_numpy if use_numpy is not None else False, + transport_settings=transport_settings) self.query = query self.parameters = parameters or {} self.use_none = True if use_none is None else use_none @@ -117,7 +118,6 @@ def __init__(self, self.as_pandas = as_pandas self.use_pandas_na = as_pandas and pd_extended_dtypes self.streaming = streaming - self.extra_http_headers = extra_http_headers self._update_query() @property @@ -192,7 +192,7 @@ def updated_copy(self, as_pandas: bool = False, streaming: bool = False, external_data: Optional[ExternalData] = None, - extra_http_headers: Optional[Dict[str, str]] = None) -> 'QueryContext': + transport_settings: Optional[Dict[str, str]] = None) -> 'QueryContext': """ Creates Query context copy with parameters overridden/updated as appropriate. """ @@ -214,7 +214,7 @@ def updated_copy(self, streaming, self.apply_server_tz, self.external_data if external_data is None else external_data, - self.extra_http_headers if extra_http_headers is None else extra_http_headers) + self.transport_settings if transport_settings is None else transport_settings) def _update_query(self): self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz) From 474bfbd3129163e27a217f7f212e038c29ba67a1 Mon Sep 17 00:00:00 2001 From: lakako <46197434+lakako@users.noreply.github.com> Date: Mon, 21 Apr 2025 22:20:40 +0800 Subject: [PATCH 21/31] wrap sql with text() (#491) --- timeplus_connect/cc_sqlalchemy/dialect.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/timeplus_connect/cc_sqlalchemy/dialect.py b/timeplus_connect/cc_sqlalchemy/dialect.py index dc5be9dc..a151383a 100644 --- a/timeplus_connect/cc_sqlalchemy/dialect.py +++ b/timeplus_connect/cc_sqlalchemy/dialect.py @@ -1,4 +1,5 @@ +from sqlalchemy import text from sqlalchemy.engine.default import DefaultDialect from sqlalchemy.sql import text @@ -47,8 +48,8 @@ def get_schema_names(connection, **_): @staticmethod def has_database(connection, db_name): - return (connection.execute('SELECT name FROM system.databases ' + - f'WHERE name = {format_str(db_name)}')).rowcount > 0 + return (connection.execute(text('SELECT name FROM system.databases ' + + f'WHERE name = {format_str(db_name)}'))).rowcount > 0 def get_table_names(self, connection, schema=None, **kw): cmd = text('SHOW STREAMS') # Wrap in text() to make it an executable SQLAlchemy statement @@ -93,7 +94,7 @@ def get_check_constraints(self, connection, table_name, schema=None, **kw): return [] def has_table(self, connection, table_name, schema=None, **_kw): - result = connection.execute(f'EXISTS STREAM {full_table(table_name, schema)}') + result = connection.execute(text(f'EXISTS STREAM {full_table(table_name, schema)}')) row = result.fetchone() return row[0] == 1 From f5022ac2ea3d19cd1d80d2e76cc211db39d71d9b Mon Sep 17 00:00:00 2001 From: Sviatoslav Bobryshev <61021258+sbobryshev@users.noreply.github.com> Date: Sat, 15 Feb 2025 20:10:28 +0300 Subject: [PATCH 22/31] Replace removal of ; in the loop line with rstrip (#472) From bf0f62b7e7b7f0ed948c130b44e13c8b6a1d3f27 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Sat, 15 Feb 2025 15:29:06 -0700 Subject: [PATCH 23/31] Docker test fixes (#473) * Don't prevent settings that don't change the value * Add docker related config file * Fix typo From 4815635020408b2781cb2235b2771da499e8a229 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 22:40:42 -0700 Subject: [PATCH 24/31] Update test_dynamic.py for variant and json data types --- tests/integration_tests/test_dynamic.py | 34 ++++++++++++------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/integration_tests/test_dynamic.py b/tests/integration_tests/test_dynamic.py index 11f65ecc..0b8692c1 100644 --- a/tests/integration_tests/test_dynamic.py +++ b/tests/integration_tests/test_dynamic.py @@ -18,8 +18,8 @@ def test_variant(test_client: Client, table_context: Callable): type_available(test_client, 'variant') with table_context('basic_variants', [ 'key int32', - 'v1 Variant(uint64, string, array(uint64), )', - 'v2 Variant(ipv4, Decimal(10, 2))']): + 'v1 variant(uint64, string, array(uint64), )', + 'v2 variant(ipv4, decimal(10, 2))']): data = [[1, 58322, None], [2, 'a string', 55.2], [3, 'bef56f14-0870-4f82-a35e-9a47eff45a5b', 777.25], @@ -37,9 +37,9 @@ def test_nested_variant(test_client: Client, table_context: Callable): type_available(test_client, 'variant') with table_context('nested_variants', [ 'key int32', - 'm1 map(string, Variant(string, uint128, Bool))', - 't1 tuple(int64, Variant(Bool, string, int32))', - 'a1 array(array(Variant(string, DateTime, float64)))', + 'm1 map(string, variant(string, uint128, bool))', + 't1 tuple(int64, variant(bool, string, int32))', + 'a1 array(array(variant(string, datetime, float64)))', ]): data = [[1, {'k1': 'string1', 'k2': 34782477743, 'k3':True}, @@ -64,7 +64,7 @@ def test_nested_variant(test_client: Client, table_context: Callable): def test_dynamic_nested(test_client: Client, table_context: Callable): type_available(test_client, 'dynamic') with table_context('nested_dynamics', [ - 'm2 map(string, Dynamic)' + 'm2 map(string, dynamic)' ], order_by='()'): data = [({'k4': 'string8', 'k5': 5000},)] test_client.insert('nested_dynamics', data) @@ -76,8 +76,8 @@ def test_dynamic(test_client: Client, table_context: Callable): type_available(test_client, 'dynamic') with table_context('basic_dynamic', [ 'key uint64', - 'v1 Dynamic', - 'v2 Dynamic']): + 'v1 dynamic', + 'v2 dynamic']): data = [[1, 58322, 15.5], [3, 'bef56f14-0870-4f82-a35e-9a47eff45a5b', 777.25], [2, 'a string', 55.2], @@ -94,8 +94,8 @@ def test_basic_json(test_client: Client, table_context: Callable): type_available(test_client, 'json') with table_context('new_json_basic', [ 'key int32', - 'value JSON', - "null_value JSON" + 'value json', + "null_value json" ]): jv3 = {'key3': 752, 'value.2': 'v2_rules', 'blank': None} jv1 = {'key1': 337, 'value.2': 'vvvv', 'HKD@spéçiäl': 'Special K', 'blank': 'not_really_blank'} @@ -122,7 +122,7 @@ def test_basic_json(test_client: Client, table_context: Callable): null_json3 = result[2][2] assert null_json3['nk2']['space key'] == 'spacey' - set_write_format('JSON', 'string') + set_write_format('json', 'string') test_client.insert('new_json_basic', [[999, '{"key4": 283, "value.2": "str_value"}', '{"nk1":53}']]) result = test_client.query('SELECT value.key4, null_value.nk1 FROM new_json_basic ORDER BY key').result_set assert result[3][0] == 283 @@ -133,7 +133,7 @@ def test_typed_json(test_client: Client, table_context: Callable): type_available(test_client, 'json') with table_context('new_json_typed', [ 'key int32', - 'value JSON(max_dynamic_paths=150, `a.b` DateTime64(3), SKIP a.c)' + 'value json(max_dynamic_paths=150, `a.b` datetime64(3), SKIP a.c)' ]): v1 = '{"a":{"b":"2020-10-15T10:15:44.877", "c":"skip_me"}}' test_client.insert('new_json_typed', [[1, v1]]) @@ -148,7 +148,7 @@ def test_complex_json(test_client: Client, table_context: Callable): pytest.skip('Complex JSON broken before 24.10') with table_context('new_json_complex', [ 'key int32', - 'value tuple(t JSON)' + 'value tuple(t json)' ]): data = [[100, ({'a': 'qwe123', 'b': 'main', 'c': None},)]] test_client.insert('new_json_complex', data) @@ -158,11 +158,11 @@ def test_complex_json(test_client: Client, table_context: Callable): def test_json_str_time(test_client: Client): - if not test_client.min_version('25.1'): - pytest.skip('JSON string/numbers bug before 25.1, skipping') - result = test_client.query("SELECT '{\"timerange\": \"2025-01-01T00:00:00+0000\"}'::JSON").result_set + if not test_client.min_version('2.9'): + pytest.skip('JSON string/numbers bug before 2.9, skipping') + result = test_client.query("SELECT '{\"timerange\": \"2025-01-01T00:00:00+0000\"}'::json").result_set assert result[0][0]['timerange'] == datetime.datetime(2025, 1, 1) # The following query is broken -- looks like something to do with Nullable(String) in the Tuple - # result = test_client.query("SELECT'{\"k\": [123, \"xyz\"]}'::JSON", + # result = test_client.query("SELECT'{\"k\": [123, \"xyz\"]}'::json", # settings={'input_format_json_read_numbers_as_strings': 0}).result_set From 43a8afb28d1d70b44467d3baf7ae7ad9544e54da Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 22:45:33 -0700 Subject: [PATCH 25/31] Update httpclient.py update comments --- timeplus_connect/driver/httpclient.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py index e83a92b3..9261e07d 100644 --- a/timeplus_connect/driver/httpclient.py +++ b/timeplus_connect/driver/httpclient.py @@ -77,7 +77,7 @@ def __init__(self, tls_mode: Optional[str] = None, proxy_path: str = ''): """ - Create an HTTP ClickHouse Connect client + Create an HTTP Timeplus Connect client See timeplus_connect.get_client for parameters """ proxy_path = proxy_path.lstrip('/') @@ -216,7 +216,7 @@ def _query_with_context(self, context: QueryContext) -> QueryResult: response = self._raw_request(f'{context.final_query}\n FORMAT JSON', params, headers, retries=self.query_retries) json_result = json.loads(response.data) - # ClickHouse will respond with a JSON object of meta, data, and some other objects + # Timeplus will respond with a JSON object of meta, data, and some other objects # We just grab the column names and column types from the metadata sub object names: List[str] = [] types: List[TimeplusType] = [] @@ -417,7 +417,7 @@ def _raw_request(self, final_params = {} if server_wait: final_params['wait_end_of_query'] = '1' - # We can't actually read the progress headers, but we enable them so ClickHouse sends something + # We can't actually read the progress headers, but we enable them so Timeplus sends something # to keep the connection alive when waiting for long-running queries and (2) to get summary information # if not streaming if self._send_progress: @@ -456,7 +456,7 @@ def _raw_request(self, except HTTPError as ex: if isinstance(ex.__context__, ConnectionResetError): # The server closed the connection, probably because the Keep Alive has expired - # We should be safe to retry, as ClickHouse should not have processed anything on a connection + # We should be safe to retry, as Timeplus should not have processed anything on a connection # that it killed. We also only retry this once, as multiple disconnects are unlikely to be # related to the Keep Alive settings if attempts == 1: From 555fc20ac5c3c7c40830f4766e28bd25771ad4c3 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 22:51:25 -0700 Subject: [PATCH 26/31] Update dialect.py --- timeplus_connect/cc_sqlalchemy/dialect.py | 1 - 1 file changed, 1 deletion(-) diff --git a/timeplus_connect/cc_sqlalchemy/dialect.py b/timeplus_connect/cc_sqlalchemy/dialect.py index a151383a..e94e109e 100644 --- a/timeplus_connect/cc_sqlalchemy/dialect.py +++ b/timeplus_connect/cc_sqlalchemy/dialect.py @@ -1,7 +1,6 @@ from sqlalchemy import text from sqlalchemy.engine.default import DefaultDialect -from sqlalchemy.sql import text from timeplus_connect import dbapi From 0997ca1f576ff3179790aacdb4b9272fc962dfc1 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 22:55:39 -0700 Subject: [PATCH 27/31] Update client.py --- timeplus_connect/driver/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index 6c045e79..b6f394ab 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -19,6 +19,7 @@ from timeplus_connect.driver.exceptions import ProgrammingError, OperationalError from timeplus_connect.driver.external import ExternalData from timeplus_connect.driver.insert import InsertContext +from timeplus_connect.driver.options import check_arrow, check_pandas, check_numpy from timeplus_connect.driver.summary import QuerySummary from timeplus_connect.driver.models import ColumnDef, SettingDef, SettingStatus from timeplus_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer From 2b74e3e2fd83968780ae28eede560415182c4ed0 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 22:58:29 -0700 Subject: [PATCH 28/31] Update test_jwt_auth.py --- tests/integration_tests/test_jwt_auth.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integration_tests/test_jwt_auth.py b/tests/integration_tests/test_jwt_auth.py index f5fce873..15466b96 100644 --- a/tests/integration_tests/test_jwt_auth.py +++ b/tests/integration_tests/test_jwt_auth.py @@ -1,7 +1,5 @@ from os import environ -# pylint: disable=no-member -import jwt import pytest from timeplus_connect.driver import create_client, ProgrammingError, create_async_client From 66bc12f8c3028b5efc0983764d172f9463b54134 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Thu, 24 Apr 2025 23:16:06 -0700 Subject: [PATCH 29/31] bring back dbapi, otherwise test fails --- timeplus_connect/cc_sqlalchemy/dialect.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/timeplus_connect/cc_sqlalchemy/dialect.py b/timeplus_connect/cc_sqlalchemy/dialect.py index e94e109e..c9bee988 100644 --- a/timeplus_connect/cc_sqlalchemy/dialect.py +++ b/timeplus_connect/cc_sqlalchemy/dialect.py @@ -33,6 +33,11 @@ class TimeplusDialect(DefaultDialect): ischema_names = ischema_names inspector = TpInspector + # pylint: disable=method-hidden + @classmethod + def dbapi(cls): + return dbapi + @classmethod def import_dbapi(cls): return dbapi From cd02a86d3afda17db154a015a0515f5e8438b294 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Fri, 25 Apr 2025 08:49:01 -0700 Subject: [PATCH 30/31] diable test_transport_settings in test_client.py --- tests/integration_tests/test_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 6cbbee4d..10d571f9 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -41,11 +41,11 @@ def test_client_name(test_client: Client): assert 'py/' in user_agent -def test_transport_settings(test_client: Client): - result = test_client.query('SELECT name,database FROM system.tables', - transport_settings={'X-Workload': 'ONLINE'}) - assert result.column_names == ('name', 'database') - assert len(result.result_set) > 0 +# def test_transport_settings(test_client: Client): +# result = test_client.query('SELECT name,database FROM system.tables', +# transport_settings={'X-Workload': 'ONLINE'}) +# assert result.column_names == ('name', 'database') +# assert len(result.result_set) > 0 def test_none_database(test_client: Client): From 9b8c378d697d0da22b413310f3177d3441201921 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Fri, 25 Apr 2025 17:37:13 -0700 Subject: [PATCH 31/31] fix the JSON->json data type name issue --- timeplus_connect/datatypes/base.py | 2 +- timeplus_connect/datatypes/registry.py | 4 ++-- timeplus_connect/driver/client.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/timeplus_connect/datatypes/base.py b/timeplus_connect/datatypes/base.py index dfff3300..fd89d503 100644 --- a/timeplus_connect/datatypes/base.py +++ b/timeplus_connect/datatypes/base.py @@ -356,7 +356,7 @@ def _active_null(self, ctx: QueryContext): class UnsupportedType(TimeplusType, ABC, registered=False): """ - Base class for ClickHouse types that can't be serialized/deserialized into Python types. + Base class for Timeplus types that can't be serialized/deserialized into Python types. Mostly useful just for DDL statements """ def __init__(self, type_def: TypeDef): diff --git a/timeplus_connect/datatypes/registry.py b/timeplus_connect/datatypes/registry.py index 5b0aa587..68e1ac19 100644 --- a/timeplus_connect/datatypes/registry.py +++ b/timeplus_connect/datatypes/registry.py @@ -38,9 +38,9 @@ def parse_name(name: str) -> Tuple[str, str, TypeDef]: elif base.startswith('variant'): keys, values = parse_columns(base[7:]) base = 'variant' - elif base.startswith('JSON') and len(base) > 4 and base[4] == '(': + elif base.startswith('json') and len(base) > 4 and base[4] == '(': keys, values = parse_columns(base[4:]) - base = 'JSON' + base = 'json' # timeplusd doesn't support geometric type. # elif base == 'Point': # values = ('Float64', 'Float64') diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py index b6f394ab..e2680907 100644 --- a/timeplus_connect/driver/client.py +++ b/timeplus_connect/driver/client.py @@ -220,7 +220,7 @@ def query(self, :return: QueryResult -- data and metadata from response """ if query and query.lower().strip().startswith('select __connect_version__'): - return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None, + return QueryResult([[f'Timeplus Connect v.{version()} ⓒ Timeplus Inc.']], None, ('connect_version',), (get_from_name('string'),)) kwargs = locals().copy() del kwargs['self']