diff --git a/.docker/clickhouse/single_node/config.xml b/.docker/clickhouse/single_node/config.xml
index c0f76f36..076ed6ce 100644
--- a/.docker/clickhouse/single_node/config.xml
+++ b/.docker/clickhouse/single_node/config.xml
@@ -1,13 +1,8 @@
- 8123
- 9000
-
- users.xml
default
default
-
5368709120
/var/lib/clickhouse/
@@ -15,9 +10,10 @@
/var/lib/clickhouse/user_files/
/var/lib/clickhouse/access/
3
+ /var/lib/clickhouse/format_schemas/
- debug
+ warning
/var/log/clickhouse-server/clickhouse-server.log
/var/log/clickhouse-server/clickhouse-server.err.log
1000M
@@ -37,5 +33,11 @@
+
+
+ users.xml
+
+
+
SQL_
diff --git a/.docker/clickhouse/single_node/docker_related_config.xml b/.docker/clickhouse/single_node/docker_related_config.xml
new file mode 100644
index 00000000..e02bb050
--- /dev/null
+++ b/.docker/clickhouse/single_node/docker_related_config.xml
@@ -0,0 +1,5 @@
+
+ 0.0.0.0
+ 8123
+ 9000
+
\ No newline at end of file
diff --git a/.docker/clickhouse/single_node_tls/Dockerfile b/.docker/clickhouse/single_node_tls/Dockerfile
index 0fd1a6f5..ca4438ae 100644
--- a/.docker/clickhouse/single_node_tls/Dockerfile
+++ b/.docker/clickhouse/single_node_tls/Dockerfile
@@ -1,4 +1,4 @@
-FROM clickhouse/clickhouse-server:24.8-alpine
+FROM clickhouse/clickhouse-server:25.1-alpine
COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
&& chmod 600 /etc/clickhouse-server/certs/* \
diff --git a/.docker/clickhouse/single_node_tls/certificates/ca.crt b/.docker/clickhouse/single_node_tls/certificates/ca.crt
index 2dcfce0c..b6160d31 100644
--- a/.docker/clickhouse/single_node_tls/certificates/ca.crt
+++ b/.docker/clickhouse/single_node_tls/certificates/ca.crt
@@ -1,12 +1,14 @@
-----BEGIN CERTIFICATE-----
-MIIBxDCCAWoCCQCC7Dz9F36rcTAKBggqhkjOPQQDAjBqMQswCQYDVQQGEwJVUzER
-MA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xp
-Y2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdDAeFw0y
-MzA0MjYyMTM4MzhaFw00MzA0MjYyMTM4MzhaMGoxCzAJBgNVBAYTAlVTMREwDwYD
-VQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQKDA9DbGlja0hv
-dXNlIEluYy4xHTAbBgNVBAMMFGNsaWNraG91c2VfdGVzdF9yb290MFkwEwYHKoZI
-zj0CAQYIKoZIzj0DAQcDQgAE8ajzpmv1YDspmgGcE+KjB2SxAQJ2/awkkP/SBvjw
-enD0ibQG5fyA5vxhPv7ImbnqebPS1NXwIt4HCkLXKVPDnzAKBggqhkjOPQQDAgNI
-ADBFAiAlQ8IWL7OQua7/dFaE8xbFy/hoKnLvuigDg9MAJNJUXwIhAIa0c3pT6z9P
-OX2Sw5mfl/YEDTgsG033S1MeAha3707H
+MIICOTCCAd+gAwIBAgIUDVFiObYZ48KdDkTlhKzVRf/KfJ0wCgYIKoZIzj0EAwIw
+ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52
+ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz
+ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTgzOTA5WhcNMjUwNTExMTgzOTA5WjBqMQsw
+CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY
+MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rl
+c3Rfcm9vdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABPGo86Zr9WA7KZoBnBPi
+owdksQECdv2sJJD/0gb48Hpw9Im0BuX8gOb8YT7+yJm56nmz0tTV8CLeBwpC1ylT
+w5+jYzBhMB0GA1UdDgQWBBSSPtUyuGF0HFuucyfFfWwWMAnF9jAfBgNVHSMEGDAW
+gBSSPtUyuGF0HFuucyfFfWwWMAnF9jAPBgNVHRMBAf8EBTADAQH/MA4GA1UdDwEB
+/wQEAwIBBjAKBggqhkjOPQQDAgNIADBFAiBdgpWahGxpRC1q2faCmxuAnK4Q6CMp
+cMybM4fhdKqhiQIhAM07skDAKqviL8mkZY6XDnHlFdpqnAXBXVHKrsDQTMz/
-----END CERTIFICATE-----
diff --git a/.docker/clickhouse/single_node_tls/certificates/client.crt b/.docker/clickhouse/single_node_tls/certificates/client.crt
index 366e1985..97ba6db0 100644
--- a/.docker/clickhouse/single_node_tls/certificates/client.crt
+++ b/.docker/clickhouse/single_node_tls/certificates/client.crt
@@ -1,12 +1,15 @@
-----BEGIN CERTIFICATE-----
-MIIBuDCCAV8CCQCvYwZhuT/WEjAKBggqhkjOPQQDAjBqMQswCQYDVQQGEwJVUzER
-MA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xp
-Y2tIb3VzZSBJbmMuMR0wGwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdDAeFw0y
-MzA0MjYyMjAzMjZaFw00MzA0MjYyMjAzMjZaMF8xCzAJBgNVBAYTAlVTMREwDwYD
-VQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQKDA9DbGlja0hv
-dXNlIEluYy4xEjAQBgNVBAMMCWNlcnRfdXNlcjBZMBMGByqGSM49AgEGCCqGSM49
-AwEHA0IABIEhqR0FcbBp0ZdQ6t9c9+rxRVS8TZXlPY2kGlFMkW5AY8/Y05L1q7Cx
-mJiwZl6+4U/j8m0EhtVREywb1PENR20wCgYIKoZIzj0EAwIDRwAwRAIgRp0AWMOq
-OA8lJTd1h2GrAWDMpiNamMUvLyksxLq5SrgCIA5AwncaSEqGHboq1zHMj0Qnqnua
-JQJAbhcsh4sxk8AY
+MIICQzCCAemgAwIBAgIUeggQ6+OCjtT3i7jASzwA1qfdDn0wCgYIKoZIzj0EAwIw
+ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52
+ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz
+ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTk1MjMyWhcNNDUwNDExMTk1MjMyWjBfMQsw
+CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY
+MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMRIwEAYDVQQDDAljZXJ0X3VzZXIwWTAT
+BgcqhkjOPQIBBggqhkjOPQMBBwNCAASBIakdBXGwadGXUOrfXPfq8UVUvE2V5T2N
+pBpRTJFuQGPP2NOS9auwsZiYsGZevuFP4/JtBIbVURMsG9TxDUdto3gwdjAdBgNV
+HQ4EFgQUJuFP4dlFGBW3wK6vUkqvSxaLMhswHwYDVR0jBBgwFoAUkj7VMrhhdBxb
+rnMnxX1sFjAJxfYwDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0l
+AQH/BAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwIDSAAwRQIgVrbKF3pqkvivLjhz
+uhMREwZtkK5jcQboVmHVtKQpkWACIQDYiwq+e8x/CdFdTiZwGrfliPy/pfBSvPSD
+sIRougm0nA==
-----END CERTIFICATE-----
diff --git a/.docker/clickhouse/single_node_tls/certificates/server.crt b/.docker/clickhouse/single_node_tls/certificates/server.crt
index 1adb2502..7e40825a 100644
--- a/.docker/clickhouse/single_node_tls/certificates/server.crt
+++ b/.docker/clickhouse/single_node_tls/certificates/server.crt
@@ -1,17 +1,15 @@
-----BEGIN CERTIFICATE-----
-MIICrjCCAlSgAwIBAgIJAK9jBmG5P9YRMAoGCCqGSM49BAMCMGoxCzAJBgNVBAYT
-AlVTMREwDwYDVQQIDAhDb2xvcmFkbzEPMA0GA1UEBwwGRGVudmVyMRgwFgYDVQQK
-DA9DbGlja0hvdXNlIEluYy4xHTAbBgNVBAMMFGNsaWNraG91c2VfdGVzdF9yb290
-MB4XDTIzMDQyNjIxNTAxOVoXDTQzMDQyNjIxNTAxOVowbTELMAkGA1UEBhMCVVMx
-ETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52ZXIxGDAWBgNVBAoMD0Ns
-aWNrSG91c2UgSW5jLjEgMB4GA1UEAwwXc2VydmVyMS5jbGlja2hvdXNlLnRlc3Qw
-WTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhjivoy18D47i18Jqg6m9yI17ndMWA
-kuyPhXFLgW1PpU2wk3DvpUbkKUxUPlKsNwuHEKJ4kcparrrwWGxKT2Dmo4HfMIHc
-MIGEBgNVHSMEfTB7oW6kbDBqMQswCQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3Jh
-ZG8xDzANBgNVBAcMBkRlbnZlcjEYMBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMR0w
-GwYDVQQDDBRjbGlja2hvdXNlX3Rlc3Rfcm9vdIIJAILsPP0XfqtxMAkGA1UdEwQC
-MAAwCwYDVR0PBAQDAgTwMDsGA1UdEQQ0MDKCF3NlcnZlcjEuY2xpY2tob3VzZS50
-ZXN0ghdzZXJ2ZXIyLmNsaWNraG91c2UudGVzdDAKBggqhkjOPQQDAgNIADBFAiBM
-71Vx9q964BRd9+N0zpbax+N+jWFJQfkOic4wlsPZ7QIhAPBU9Kfbi3Iwy3XwWBOv
-YZsvoFRxUfG2RRRlz5cGgKIa
+MIICZjCCAg2gAwIBAgIUeggQ6+OCjtT3i7jASzwA1qfdDnswCgYIKoZIzj0EAwIw
+ajELMAkGA1UEBhMCVVMxETAPBgNVBAgMCENvbG9yYWRvMQ8wDQYDVQQHDAZEZW52
+ZXIxGDAWBgNVBAoMD0NsaWNrSG91c2UgSW5jLjEdMBsGA1UEAwwUY2xpY2tob3Vz
+ZV90ZXN0X3Jvb3QwHhcNMjUwNDExMTkzNTE0WhcNNDUwNDExMTkzNTE0WjBtMQsw
+CQYDVQQGEwJVUzERMA8GA1UECAwIQ29sb3JhZG8xDzANBgNVBAcMBkRlbnZlcjEY
+MBYGA1UECgwPQ2xpY2tIb3VzZSBJbmMuMSAwHgYDVQQDDBdjbGlja2hvdXNlX3Rl
+c3Rfc2VydmVyMTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABGGOK+jLXwPjuLXw
+mqDqb3IjXud0xYCS7I+FcUuBbU+lTbCTcO+lRuQpTFQ+Uqw3C4cQoniRylquuvBY
+bEpPYOajgY0wgYowHQYDVR0OBBYEFMT7NvpCkmSa2HYEyql/pUCxdkWQMB8GA1Ud
+IwQYMBaAFJI+1TK4YXQcW65zJ8V9bBYwCcX2MAwGA1UdEwEB/wQCMAAwIgYDVR0R
+BBswGYIXc2VydmVyMS5jbGlja2hvdXNlLnRlc3QwFgYDVR0lAQH/BAwwCgYIKwYB
+BQUHAwEwCgYIKoZIzj0EAwIDRwAwRAIgDUXjls0mpQwTOJyw9zy0zOA0kfU+fldI
+S4qsQwKhpmECID2eUcgU2zv0koUcE1M6UyVzQrJfJviUR48bh8rgkykg
-----END CERTIFICATE-----
diff --git a/.docker/clickhouse/single_node_tls/config.xml b/.docker/clickhouse/single_node_tls/config.xml
index d25ffbaa..4ff6cc01 100644
--- a/.docker/clickhouse/single_node_tls/config.xml
+++ b/.docker/clickhouse/single_node_tls/config.xml
@@ -1,11 +1,6 @@
- 8443
- 9440
- 0.0.0.0
-
- users.xml
default
default
@@ -15,9 +10,10 @@
/var/lib/clickhouse/tmp/
/var/lib/clickhouse/user_files/
/var/lib/clickhouse/access/
+ /var/lib/clickhouse/format_schemas/
- debug
+ warning
/var/log/clickhouse-server/clickhouse-server.log
/var/log/clickhouse-server/clickhouse-server.err.log
1000M
@@ -45,4 +41,10 @@
SQL_
+
+
+
+ users.xml
+
+
diff --git a/.docker/clickhouse/single_node_tls/docker_related_config.xml b/.docker/clickhouse/single_node_tls/docker_related_config.xml
new file mode 100644
index 00000000..80d38d81
--- /dev/null
+++ b/.docker/clickhouse/single_node_tls/docker_related_config.xml
@@ -0,0 +1,5 @@
+
+ 0.0.0.0
+ 8443
+ 9440
+
\ No newline at end of file
diff --git a/.github/workflows/on_push.yml b/.github/workflows/on_push.yml
index 3f20a155..f33bce0e 100644
--- a/.github/workflows/on_push.yml
+++ b/.github/workflows/on_push.yml
@@ -1,33 +1,33 @@
-name: 'Lint and Test'
+name: "Lint and Test"
on:
pull_request:
branches:
- timeplus
paths-ignore:
- - 'VERSION'
- - 'LICENSE'
- - '**.md'
- - 'examples'
- - 'publish.yaml'
- - '.github/workflows/clickhouse_ci.yml'
- - '.github/workflows/on_push.yml'
+ - "VERSION"
+ - "LICENSE"
+ - "**.md"
+ - "examples"
+ - "publish.yaml"
+ - ".github/workflows/clickhouse_ci.yml"
+ - ".github/workflows/on_push.yml"
workflow_dispatch:
push:
branches-ignore:
- - '*_test'
- - '*_dev'
- - '*_build'
- - 'release_*'
+ - "*_test"
+ - "*_dev"
+ - "*_build"
+ - "release_*"
- timeplus
paths-ignore:
- - 'VERSION'
- - 'LICENSE'
- - '**.md'
- - 'examples'
- - 'publish.yaml'
- - '.github/workflows/clickhouse_ci.yml'
- - '.github/workflows/on_push.yml'
+ - "VERSION"
+ - "LICENSE"
+ - "**.md"
+ - "examples"
+ - "publish.yaml"
+ - ".github/workflows/clickhouse_ci.yml"
+ - ".github/workflows/on_push.yml"
jobs:
lint:
@@ -59,11 +59,11 @@ jobs:
strategy:
matrix:
python-version:
- - '3.9'
- - '3.10'
- - '3.11'
- - '3.12'
- - '3.13'
+ - "3.9"
+ - "3.10"
+ - "3.11"
+ - "3.12"
+ - "3.13"
timeplus-version:
- latest
@@ -76,19 +76,19 @@ jobs:
env:
TIMEPLUS_CONNECT_TEST_TP_VERSION: ${{ matrix.timeplus-version }}
with:
- compose-file: 'docker-compose.yml'
- down-flags: '--volumes'
+ compose-file: "docker-compose.yml"
+ down-flags: "--volumes"
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install pip
- run: python -m pip install --upgrade pip
+ run: python -m pip install --upgrade pip
- name: Install Test Dependencies
run: pip install -r tests/test_requirements.txt
- name: Build cython extensions
run: python setup.py build_ext --inplace
- - name: "Add distribution info" # This lets SQLAlchemy find entry points
+ - name: "Add distribution info" # This lets SQLAlchemy find entry points
run: python setup.py develop
- name: Add ClickHouse TLS instance to /etc/hosts
run: |
@@ -97,7 +97,7 @@ jobs:
- name: Run tests
env:
# CLICKHOUSE_CONNECT_TEST_TLS: 1
- CLICKHOUSE_CONNECT_TEST_DOCKER: 'False'
+ CLICKHOUSE_CONNECT_TEST_DOCKER: "False"
CLICKHOUSE_CONNECT_TEST_FUZZ: 50
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests
@@ -113,41 +113,3 @@ jobs:
CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST }}
if: "${{ env.CLOUD_HOST != '' }}"
run: echo "HAS_SECRETS=true" >> $GITHUB_OUTPUT
-
- # cloud-tests:
- # runs-on: ubuntu-latest
- # name: Cloud Tests Py=${{ matrix.python-version }}
- # needs: check-secret
- # if: needs.check-secret.outputs.has_secrets == 'true'
-
- # strategy:
- # matrix:
- # python-version:
- # - '3.10'
- # - '3.11'
-
- # steps:
- # - name: Checkout
- # uses: actions/checkout@v4
- # - name: Set up Python ${{ matrix.python-version }}
- # uses: actions/setup-python@v5
- # with:
- # python-version: ${{ matrix.python-version }}
- # - name: Install dependencies
- # run: |
- # python -m pip install --upgrade pip
- # pip install -r tests/test_requirements.txt
- # - name: Build cython extensions
- # run: python setup.py build_ext --inplace
- # - name: "Add distribution info" # This lets SQLAlchemy find entry points
- # run: python setup.py develop
- # - name: Run tests
- # env:
- # CLICKHOUSE_CONNECT_TEST_FUZZ: 10
- # CLICKHOUSE_CONNECT_TEST_CLOUD: 'True'
- # CLICKHOUSE_CONNECT_TEST_PORT: 8443
- # CLICKHOUSE_CONNECT_TEST_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
- # CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
- # CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_PRIVATE_KEY }}
- # SQLALCHEMY_SILENCE_UBER_WARNING: 1
- # run: pytest tests/integration_tests
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index b917514a..2878e91f 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -12,18 +12,18 @@ on:
- release
- build
env:
- CIBW_SKIP: 'cp36-* cp37-* pp37-*'
+ CIBW_SKIP: 'cp36-* cp37-* cp38-* pp37-*'
jobs:
build_x86_manylinux_wheels:
name: Build x86 manylinux wheels on Linux
runs-on: ubuntu-latest
env:
- CIBW_SKIP: 'cp36-* cp37-* pp37-* *-musllinux*'
+ CIBW_SKIP: 'cp36-* cp37-* pp37-* pp38-* *-musllinux*'
steps:
- uses: actions/checkout@v4
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-x86-manylinux
@@ -33,11 +33,11 @@ jobs:
name: Build x86 musllinux wheels on Linux
runs-on: ubuntu-latest
env:
- CIBW_SKIP: 'cp36-* cp37-* pp37-* *-manylinux*'
+ CIBW_SKIP: 'cp36-* cp37-* pp37-* pp38-* *-manylinux*'
steps:
- uses: actions/checkout@v4
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-x86-musllinux
@@ -56,7 +56,7 @@ jobs:
with:
platforms: all
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-aarch64-manylinux
@@ -75,7 +75,7 @@ jobs:
with:
platforms: all
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-aarch64-musllinux
@@ -86,7 +86,7 @@ jobs:
runs-on: ubuntu-latest
env:
CIBW_ARCHS_LINUX: aarch64
- CIBW_BUILD: 'pp*'
+ CIBW_BUILD: 'pp39-* pp310-*'
steps:
- uses: actions/checkout@v4
- name: Set up QEMU
@@ -94,7 +94,7 @@ jobs:
with:
platforms: all
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-aarch64-pypy
@@ -106,7 +106,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
env:
CIBW_ARCHS_MACOS: x86_64 arm64
- uses: actions/upload-artifact@v4
@@ -122,7 +122,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build wheels
- uses: pypa/cibuildwheel@v2.21.3
+ uses: pypa/cibuildwheel@v2.22.0
- uses: actions/upload-artifact@v4
with:
name: build-windows
diff --git a/.github/workflows/timeplus_ci.yml b/.github/workflows/timeplus_ci.yml
index 1cbc43af..ca51abff 100644
--- a/.github/workflows/timeplus_ci.yml
+++ b/.github/workflows/timeplus_ci.yml
@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest
name: Timeplus CI Tests
env:
- CLICKHOUSE_CONNECT_TEST_DOCKER: 'False'
+ CLICKHOUSE_CONNECT_TEST_DOCKER: "False"
CLICKHOUSE_CONNECT_TEST_FUZZ: 50
steps:
- name: Checkout
@@ -25,18 +25,9 @@ jobs:
run: pip install -r tests/test_requirements.txt
- name: Build cython extensions
run: python setup.py build_ext --inplace
- - name: "Add distribution info" # This lets SQLAlchemy find entry points
+ - name: "Add distribution info" # This lets SQLAlchemy find entry points
run: python setup.py develop
- # - name: run ClickHouse Cloud tests
- # env:
- # CLICKHOUSE_CONNECT_TEST_PORT: 3218
- # CLICKHOUSE_CONNECT_TEST_CLOUD: 'True'
- # CLICKHOUSE_CONNECT_TEST_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
- # CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
- # CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_PRIVATE_KEY }}
- # run: pytest tests/integration_tests
-
- name: Run Timeplus Container (LATEST)
run: TIMEPLUS_CONNECT_TEST_TP_VERSION=latest docker compose up -d timeplus
- name: Run LATEST tests
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3a6667ce..7785ca2a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,12 @@
# ClickHouse Connect ChangeLog
+### WARNING -- Breaking change for AsyncClient close()
+The AsyncClient close() method is now async and should be called as an async function.
+
### WARNING -- Python 3.8 EOL
Python 3.8 was EOL on 2024-10-07. It is no longer tested, and versions after 2025-04-07 will not include Python
-3.8 wheel distributions.
+3.8 wheel distributions. As of version 0.8.15, wheels are not built for Python 3.8 AARCH64 versions due to
+missing dependencies in the build chain.
### WARNING -- JSON Incompatibility between versions 22.8 and 22.10
The internal serialization format for experimental JSON was updated in ClickHouse version 24.10. `clickhouse-connect`
@@ -17,6 +21,47 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a
instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction.
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`.
+## 0.8.17, 2025-04-10
+
+### Improvements
+- The parameter `transport_settings` has been added to the Client query and insert methods. For the HTTP client (currently
+the only option), this dictionary of string is directly translated into additional HTTP headers at a query level. This can
+be used to provide additional proxy directives or other extra 'non-ClickHouse' information that is passed via headers.
+Thanks to [Paweł Szczur](https://github.com/orian) of PostHog for the original PR!
+- There was previously no way to add a path to the ClickHouse server host in cases where the ClickHouse server was
+behind a proxy that used path based routing (such as `https://big_proxy:8080/clickhouse). The new `proxy_path`
+`get_client` argument can now be used to set that path. Closes https://github.com/ClickHouse/clickhouse-connect/issues/486
+
+### Bug Fix
+- Version 0.8.16 introduced a bug where changing a Client setting value and then changing that setting value back to the
+ original server value would fail to restore the original setting. This has been fixed. Closes
+ https://github.com/ClickHouse/clickhouse-connect/issues/487
+
+## 0.8.16, 2025-03-28
+### Bug Fixes
+- Don't send a setting value if the setting is already correct according to the `system.settings` table.
+Closes https://github.com/ClickHouse/clickhouse-connect/issues/469
+- Ensure that the http `user_agent` header is in ascii. Note this could lead to an incorrectly encoded `os_user` if the
+os_user is not an Ascii string. Closes https://github.com/ClickHouse/clickhouse-connect/issues/484
+- Fix "cannot access local variable" exception where the http client encounters an unexpected streaming error. Also
+log that unexpected streaming error to assist debugging. Closes https://github.com/ClickHouse/clickhouse-connect/issues/483
+- Check that arrow/pandas is installed when calling `query_df` and `query_arrow` and raise a more meaningful exception
+if the required library is absent. Closes https://github.com/ClickHouse/clickhouse-connect/issues/477
+
+### Improvements
+- Some typing hints have been corrected. Thanks to [Avery Fischer](https://github.com/biggerfisch) for the PR!
+- The docker based tests have been fixed to work with security improvements in recent ClickHouse releases
+- Query string cleanup is now (in theory) microseconds faster. Thanks to [Sviatoslav Bobryshev](https://github.com/sbobryshev)
+for the optimization
+
+## 0.8.15, 2025-01-25
+### Bug Fix
+- The async client was not shutting down its associated executor thread pool, result in a memory leak if multiple
+async clients were created. Closes https://github.com/ClickHouse/clickhouse-connect/issues/424. Note that the `close`
+function for the async client is now async to cleanly close down the pool. The recommended way to use an async client
+is now within an AsyncContext. See the associated [PR](https://github.com/ClickHouse/clickhouse-connect/pull/457) for details.
+Thanks to ClickHouse core developer @pufit for the fix!
+
## 0.8.14, 2025-01-13
### Bug Fix
- Fix an edge case where a Pandas dataframe that contains _only_ Int64 (or smaller) values would cause an exception when
diff --git a/docker-compose.yml b/docker-compose.yml
index 8b59af15..211edbf2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,30 +1,14 @@
services:
timeplus:
image: "d.timeplus.com/timeplus-io/proton:latest"
- container_name: 'timeplus-connect-timeplus-server'
+ container_name: "timeplus-connect-timeplus-server"
ports:
- - '8463:8463'
- - '8123:8123'
- - '3218:3218'
+ - "8463:8463"
+ - "8123:8123"
+ - "3218:3218"
ulimits:
nofile:
soft: 262144
hard: 262144
volumes:
- /mnt/timeplusd:/var/lib/timeplusd
-
- clickhouse_tls:
- build:
- context: ./
- dockerfile: .docker/clickhouse/single_node_tls/Dockerfile
- container_name: 'clickhouse-connect-clickhouse-server-tls'
- ports:
- - '10843:8443'
- - '10840:9440'
- ulimits:
- nofile:
- soft: 262144
- hard: 262144
- volumes:
- - './.docker/clickhouse/single_node_tls/config.xml:/etc/clickhouse-server/config.xml'
- - './.docker/clickhouse/single_node_tls/users.xml:/etc/clickhouse-server/users.xml'
diff --git a/examples/run_async.py b/examples/run_async.py
index a9c1a70c..6479999e 100644
--- a/examples/run_async.py
+++ b/examples/run_async.py
@@ -41,6 +41,7 @@ async def semaphore_wrapper(sm: asyncio.Semaphore, num: int):
semaphore = asyncio.Semaphore(SEMAPHORE)
await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)])
+ await client.close()
async def main():
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index 9270cff0..0c66114c 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -2,8 +2,9 @@
import os
import time
from subprocess import Popen, PIPE
-from typing import Iterator, NamedTuple, Sequence, Optional, Callable
+from typing import Iterator, NamedTuple, Sequence, Optional, Callable, AsyncContextManager
+import pytest_asyncio
from pytest import fixture
from timeplus_connect import common
@@ -126,9 +127,10 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) -
sys.stderr.write('Successfully stopped docker compose')
-@fixture(scope='session', autouse=True, name='test_async_client')
-def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]:
- yield AsyncClient(client=test_client)
+@pytest_asyncio.fixture(scope='session', autouse=True, name='test_async_client')
+async def test_async_client_fixture(test_client: Client) -> AsyncContextManager[AsyncClient]:
+ async with AsyncClient(client=test_client) as client:
+ yield client
@fixture(scope='session', name='table_context')
diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py
index b326b23c..10d571f9 100644
--- a/tests/integration_tests/test_client.py
+++ b/tests/integration_tests/test_client.py
@@ -41,6 +41,13 @@ def test_client_name(test_client: Client):
assert 'py/' in user_agent
+# def test_transport_settings(test_client: Client):
+# result = test_client.query('SELECT name,database FROM system.tables',
+# transport_settings={'X-Workload': 'ONLINE'})
+# assert result.column_names == ('name', 'database')
+# assert len(result.result_set) > 0
+
+
def test_none_database(test_client: Client):
old_db = test_client.database
test_db = test_client.command('select current_database()')
@@ -174,8 +181,10 @@ def test_error_decode(test_client: Client):
def test_command_as_query(test_client: Client):
+ # Test that non-SELECT and non-INSERT statements are treated as commands and
+ # just return the QueryResult metadata
result = test_client.query("SET count_distinct_implementation = 'uniq'")
- assert result.first_item['written_rows'] == 0
+ assert 'query_id' in result.first_item
def test_show_create(test_client: Client):
diff --git a/tests/integration_tests/test_dynamic.py b/tests/integration_tests/test_dynamic.py
index 59d1a0c8..0b8692c1 100644
--- a/tests/integration_tests/test_dynamic.py
+++ b/tests/integration_tests/test_dynamic.py
@@ -18,8 +18,8 @@ def test_variant(test_client: Client, table_context: Callable):
type_available(test_client, 'variant')
with table_context('basic_variants', [
'key int32',
- 'v1 Variant(uint64, string, array(uint64), )',
- 'v2 Variant(ipv4, Decimal(10, 2))']):
+ 'v1 variant(uint64, string, array(uint64), )',
+ 'v2 variant(ipv4, decimal(10, 2))']):
data = [[1, 58322, None],
[2, 'a string', 55.2],
[3, 'bef56f14-0870-4f82-a35e-9a47eff45a5b', 777.25],
@@ -37,9 +37,9 @@ def test_nested_variant(test_client: Client, table_context: Callable):
type_available(test_client, 'variant')
with table_context('nested_variants', [
'key int32',
- 'm1 map(string, Variant(string, uint128, Bool))',
- 't1 tuple(int64, Variant(Bool, string, int32))',
- 'a1 array(array(Variant(string, DateTime, float64)))',
+ 'm1 map(string, variant(string, uint128, bool))',
+ 't1 tuple(int64, variant(bool, string, int32))',
+ 'a1 array(array(variant(string, datetime, float64)))',
]):
data = [[1,
{'k1': 'string1', 'k2': 34782477743, 'k3':True},
@@ -64,7 +64,7 @@ def test_nested_variant(test_client: Client, table_context: Callable):
def test_dynamic_nested(test_client: Client, table_context: Callable):
type_available(test_client, 'dynamic')
with table_context('nested_dynamics', [
- 'm2 map(string, Dynamic)'
+ 'm2 map(string, dynamic)'
], order_by='()'):
data = [({'k4': 'string8', 'k5': 5000},)]
test_client.insert('nested_dynamics', data)
@@ -76,8 +76,8 @@ def test_dynamic(test_client: Client, table_context: Callable):
type_available(test_client, 'dynamic')
with table_context('basic_dynamic', [
'key uint64',
- 'v1 Dynamic',
- 'v2 Dynamic']):
+ 'v1 dynamic',
+ 'v2 dynamic']):
data = [[1, 58322, 15.5],
[3, 'bef56f14-0870-4f82-a35e-9a47eff45a5b', 777.25],
[2, 'a string', 55.2],
@@ -94,8 +94,8 @@ def test_basic_json(test_client: Client, table_context: Callable):
type_available(test_client, 'json')
with table_context('new_json_basic', [
'key int32',
- 'value JSON',
- "null_value JSON"
+ 'value json',
+ "null_value json"
]):
jv3 = {'key3': 752, 'value.2': 'v2_rules', 'blank': None}
jv1 = {'key1': 337, 'value.2': 'vvvv', 'HKD@spéçiäl': 'Special K', 'blank': 'not_really_blank'}
@@ -122,7 +122,7 @@ def test_basic_json(test_client: Client, table_context: Callable):
null_json3 = result[2][2]
assert null_json3['nk2']['space key'] == 'spacey'
- set_write_format('JSON', 'string')
+ set_write_format('json', 'string')
test_client.insert('new_json_basic', [[999, '{"key4": 283, "value.2": "str_value"}', '{"nk1":53}']])
result = test_client.query('SELECT value.key4, null_value.nk1 FROM new_json_basic ORDER BY key').result_set
assert result[3][0] == 283
@@ -133,7 +133,7 @@ def test_typed_json(test_client: Client, table_context: Callable):
type_available(test_client, 'json')
with table_context('new_json_typed', [
'key int32',
- 'value JSON(max_dynamic_paths=150, `a.b` DateTime64(3), SKIP a.c)'
+ 'value json(max_dynamic_paths=150, `a.b` datetime64(3), SKIP a.c)'
]):
v1 = '{"a":{"b":"2020-10-15T10:15:44.877", "c":"skip_me"}}'
test_client.insert('new_json_typed', [[1, v1]])
@@ -148,10 +148,21 @@ def test_complex_json(test_client: Client, table_context: Callable):
pytest.skip('Complex JSON broken before 24.10')
with table_context('new_json_complex', [
'key int32',
- 'value tuple(t JSON)'
+ 'value tuple(t json)'
]):
data = [[100, ({'a': 'qwe123', 'b': 'main', 'c': None},)]]
test_client.insert('new_json_complex', data)
result = test_client.query('SELECT * except _tp_time FROM new_json_complex ORDER BY key')
json1 = result.result_set[0][1]
assert json1['t']['a'] == 'qwe123'
+
+
+def test_json_str_time(test_client: Client):
+ if not test_client.min_version('2.9'):
+ pytest.skip('JSON string/numbers bug before 2.9, skipping')
+ result = test_client.query("SELECT '{\"timerange\": \"2025-01-01T00:00:00+0000\"}'::json").result_set
+ assert result[0][0]['timerange'] == datetime.datetime(2025, 1, 1)
+
+ # The following query is broken -- looks like something to do with Nullable(String) in the Tuple
+ # result = test_client.query("SELECT'{\"k\": [123, \"xyz\"]}'::json",
+ # settings={'input_format_json_read_numbers_as_strings': 0}).result_set
diff --git a/tests/integration_tests/test_jwt_auth.py b/tests/integration_tests/test_jwt_auth.py
index 678f128d..15466b96 100644
--- a/tests/integration_tests/test_jwt_auth.py
+++ b/tests/integration_tests/test_jwt_auth.py
@@ -1,8 +1,5 @@
-from datetime import datetime, timezone, timedelta
from os import environ
-# pylint: disable=no-member
-import jwt
import pytest
from timeplus_connect.driver import create_client, ProgrammingError, create_async_client
@@ -158,12 +155,4 @@ def make_access_token():
secret = environ.get(JWT_SECRET_ENV_KEY)
if not secret:
raise ValueError(f'{JWT_SECRET_ENV_KEY} environment variable is not set')
- payload = {
- 'iss': 'ClickHouse',
- 'sub': 'CI_Test',
- 'aud': '1f7f78b8-da67-480b-8913-726fdd31d2fc',
- 'clickhouse:roles': ['default'],
- 'clickhouse:grants': [],
- 'exp': datetime.now(tz=timezone.utc) + timedelta(minutes=15)
- }
- return jwt.encode(payload, secret, algorithm='RS256')
+ return secret
diff --git a/tests/integration_tests/test_session_id.py b/tests/integration_tests/test_session_id.py
index 9a0350eb..7781960b 100644
--- a/tests/integration_tests/test_session_id.py
+++ b/tests/integration_tests/test_session_id.py
@@ -46,7 +46,7 @@ async def test_async_client_default_session_id(test_config: TestConfig):
user=test_config.username,
password=test_config.password)
assert async_client.get_client_setting(SESSION_KEY) is None
- async_client.close()
+ await async_client.close()
@pytest.mark.asyncio
@@ -62,7 +62,7 @@ async def test_async_client_autogenerate_session_id(test_config: TestConfig):
uuid.UUID(session_id)
except ValueError:
pytest.fail(f"Invalid session_id: {session_id}")
- async_client.close()
+ await async_client.close()
@pytest.mark.asyncio
@@ -75,4 +75,4 @@ async def test_async_client_custom_session_id(test_config: TestConfig):
password=test_config.password,
session_id=session_id)
assert async_client.get_client_setting(SESSION_KEY) == session_id
- async_client.close()
+ await async_client.close()
diff --git a/tests/integration_tests/test_sqlalchemy/test_basics.py b/tests/integration_tests/test_sqlalchemy/test_basics.py
index 084c0ed0..f52cb9f1 100644
--- a/tests/integration_tests/test_sqlalchemy/test_basics.py
+++ b/tests/integration_tests/test_sqlalchemy/test_basics.py
@@ -53,7 +53,7 @@ def test_execute(test_engine: Engine):
assert len(rows) == 2
rows = list(row for row in conn.execute('DROP STREAM IF EXISTS dummy_table'))
- assert rows[0][0] == 0
+ assert len(rows) > 0 # This is just the metadata from the "command" QueryResult
rows = list(row for row in conn.execute('DESCRIBE dummy'))
assert len(rows) == 3
diff --git a/timeplus_connect/__version__.py b/timeplus_connect/__version__.py
index 5e80f2e4..fdf6a632 100644
--- a/timeplus_connect/__version__.py
+++ b/timeplus_connect/__version__.py
@@ -1 +1 @@
-version = '0.8.16'
+version = '0.8.17'
diff --git a/timeplus_connect/cc_sqlalchemy/dialect.py b/timeplus_connect/cc_sqlalchemy/dialect.py
index dc5be9dc..c9bee988 100644
--- a/timeplus_connect/cc_sqlalchemy/dialect.py
+++ b/timeplus_connect/cc_sqlalchemy/dialect.py
@@ -1,6 +1,6 @@
+from sqlalchemy import text
from sqlalchemy.engine.default import DefaultDialect
-from sqlalchemy.sql import text
from timeplus_connect import dbapi
@@ -33,6 +33,11 @@ class TimeplusDialect(DefaultDialect):
ischema_names = ischema_names
inspector = TpInspector
+ # pylint: disable=method-hidden
+ @classmethod
+ def dbapi(cls):
+ return dbapi
+
@classmethod
def import_dbapi(cls):
return dbapi
@@ -47,8 +52,8 @@ def get_schema_names(connection, **_):
@staticmethod
def has_database(connection, db_name):
- return (connection.execute('SELECT name FROM system.databases ' +
- f'WHERE name = {format_str(db_name)}')).rowcount > 0
+ return (connection.execute(text('SELECT name FROM system.databases ' +
+ f'WHERE name = {format_str(db_name)}'))).rowcount > 0
def get_table_names(self, connection, schema=None, **kw):
cmd = text('SHOW STREAMS') # Wrap in text() to make it an executable SQLAlchemy statement
@@ -93,7 +98,7 @@ def get_check_constraints(self, connection, table_name, schema=None, **kw):
return []
def has_table(self, connection, table_name, schema=None, **_kw):
- result = connection.execute(f'EXISTS STREAM {full_table(table_name, schema)}')
+ result = connection.execute(text(f'EXISTS STREAM {full_table(table_name, schema)}'))
row = result.fetchone()
return row[0] == 1
diff --git a/timeplus_connect/common.py b/timeplus_connect/common.py
index 49437f34..bc796e8f 100644
--- a/timeplus_connect/common.py
+++ b/timeplus_connect/common.py
@@ -41,8 +41,9 @@ def build_client_name(client_name: str):
os_user = f'; os_user:{getpass.getuser()}'
except Exception: # pylint: disable=broad-except
pass
- return (f'{client_name}{product_name}timeplus-connect/{version()}' +
+ full_name = (f'{client_name}{product_name}timeplus-connect/{version()}' +
f' (lv:py/{py_version}; mode:sync; os:{sys.platform}{os_user})')
+ return full_name.encode('ascii', 'ignore').decode()
def get_setting(name: str):
diff --git a/timeplus_connect/datatypes/base.py b/timeplus_connect/datatypes/base.py
index dfff3300..fd89d503 100644
--- a/timeplus_connect/datatypes/base.py
+++ b/timeplus_connect/datatypes/base.py
@@ -356,7 +356,7 @@ def _active_null(self, ctx: QueryContext):
class UnsupportedType(TimeplusType, ABC, registered=False):
"""
- Base class for ClickHouse types that can't be serialized/deserialized into Python types.
+ Base class for Timeplus types that can't be serialized/deserialized into Python types.
Mostly useful just for DDL statements
"""
def __init__(self, type_def: TypeDef):
diff --git a/timeplus_connect/datatypes/registry.py b/timeplus_connect/datatypes/registry.py
index 5b0aa587..68e1ac19 100644
--- a/timeplus_connect/datatypes/registry.py
+++ b/timeplus_connect/datatypes/registry.py
@@ -38,9 +38,9 @@ def parse_name(name: str) -> Tuple[str, str, TypeDef]:
elif base.startswith('variant'):
keys, values = parse_columns(base[7:])
base = 'variant'
- elif base.startswith('JSON') and len(base) > 4 and base[4] == '(':
+ elif base.startswith('json') and len(base) > 4 and base[4] == '(':
keys, values = parse_columns(base[4:])
- base = 'JSON'
+ base = 'json'
# timeplusd doesn't support geometric type.
# elif base == 'Point':
# values = ('Float64', 'Float64')
diff --git a/timeplus_connect/driver/__init__.py b/timeplus_connect/driver/__init__.py
index 05e65d0a..97ce38b5 100644
--- a/timeplus_connect/driver/__init__.py
+++ b/timeplus_connect/driver/__init__.py
@@ -13,8 +13,8 @@
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
def create_client(*,
- host: str = None,
- username: str = None,
+ host: Optional[str] = None,
+ username: Optional[str] = None,
password: str = '',
access_token: Optional[str] = None,
database: str = 'default',
diff --git a/timeplus_connect/driver/asyncclient.py b/timeplus_connect/driver/asyncclient.py
index 53b889fd..639dea95 100644
--- a/timeplus_connect/driver/asyncclient.py
+++ b/timeplus_connect/driver/asyncclient.py
@@ -64,11 +64,12 @@ def min_version(self, version_str: str) -> bool:
"""
return self.client.min_version(version_str)
- def close(self):
+ async def close(self):
"""
Subclass implementation to close the connection to the server/deallocate the client
"""
self.client.close()
+ await asyncio.to_thread(self.executor.shutdown, True)
async def query(self,
query: Optional[str] = None,
@@ -84,7 +85,8 @@ async def query(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> QueryResult:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QueryResult:
"""
Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix.
For parameters, see the create_query_context method.
@@ -96,7 +98,7 @@ def _query():
column_formats=column_formats, encoding=encoding, use_none=use_none,
column_oriented=column_oriented, use_numpy=use_numpy, max_str_len=max_str_len,
context=context, query_tz=query_tz, column_tzs=column_tzs,
- external_data=external_data)
+ external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query)
@@ -113,7 +115,9 @@ async def query_column_block_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None,
+ ) -> StreamContext:
"""
Variation of main query method that returns a stream of column oriented blocks.
For parameters, see the create_query_context method.
@@ -125,7 +129,7 @@ def _query_column_block_stream():
query_formats=query_formats, column_formats=column_formats,
encoding=encoding, use_none=use_none, context=context,
query_tz=query_tz, column_tzs=column_tzs,
- external_data=external_data)
+ external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_column_block_stream)
@@ -142,7 +146,8 @@ async def query_row_block_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Variation of main query method that returns a stream of row oriented blocks.
For parameters, see the create_query_context method.
@@ -154,7 +159,7 @@ def _query_row_block_stream():
query_formats=query_formats, column_formats=column_formats,
encoding=encoding, use_none=use_none, context=context,
query_tz=query_tz, column_tzs=column_tzs,
- external_data=external_data)
+ external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_row_block_stream)
@@ -171,7 +176,8 @@ async def query_rows_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Variation of main query method that returns a stream of row oriented blocks.
For parameters, see the create_query_context method.
@@ -183,7 +189,7 @@ def _query_rows_stream():
query_formats=query_formats, column_formats=column_formats,
encoding=encoding, use_none=use_none, context=context,
query_tz=query_tz, column_tzs=column_tzs,
- external_data=external_data)
+ external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_rows_stream)
@@ -195,7 +201,8 @@ async def raw_query(self,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> bytes:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> bytes:
"""
Query method that simply returns the raw ClickHouse format bytes.
:param query: Query statement/format string
@@ -205,12 +212,14 @@ async def raw_query(self,
:param use_database Send the database parameter to ClickHouse so the command will be executed in the client
database context
:param external_data External data to send with the query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: bytes representing raw ClickHouse return value based on format
"""
def _raw_query():
return self.client.raw_query(query=query, parameters=parameters, settings=settings, fmt=fmt,
- use_database=use_database, external_data=external_data)
+ use_database=use_database, external_data=external_data,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _raw_query)
@@ -221,7 +230,8 @@ async def raw_stream(self, query: str,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> io.IOBase:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase:
"""
Query method that returns the result as an io.IOBase iterator.
:param query: Query statement/format string
@@ -231,12 +241,13 @@ async def raw_stream(self, query: str,
:param use_database Send the database parameter to ClickHouse so the command will be executed in the client
database context
:param external_data External data to send with the query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: io.IOBase stream/iterator for the result
"""
def _raw_stream():
return self.client.raw_stream(query=query, parameters=parameters, settings=settings, fmt=fmt,
- use_database=use_database, external_data=external_data)
+ use_database=use_database, external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _raw_stream)
@@ -252,7 +263,8 @@ async def query_np(self,
use_none: Optional[bool] = None,
max_str_len: Optional[int] = None,
context: QueryContext = None,
- external_data: Optional[ExternalData] = None):
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method that returns the results as a numpy array.
For parameter values, see the create_query_context method.
@@ -263,7 +275,7 @@ def _query_np():
return self.client.query_np(query=query, parameters=parameters, settings=settings,
query_formats=query_formats, column_formats=column_formats, encoding=encoding,
use_none=use_none, max_str_len=max_str_len, context=context,
- external_data=external_data)
+ external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_np)
@@ -279,7 +291,8 @@ async def query_np_stream(self,
use_none: Optional[bool] = None,
max_str_len: Optional[int] = None,
context: QueryContext = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a stream of numpy arrays.
For parameter values, see the create_query_context method.
@@ -290,7 +303,7 @@ def _query_np_stream():
return self.client.query_np_stream(query=query, parameters=parameters, settings=settings,
query_formats=query_formats, column_formats=column_formats,
encoding=encoding, use_none=use_none, max_str_len=max_str_len,
- context=context, external_data=external_data)
+ context=context, external_data=external_data, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_np_stream)
@@ -310,7 +323,8 @@ async def query_df(self,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
context: QueryContext = None,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None):
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method that results the results as a pandas dataframe.
For parameter values, see the create_query_context method.
@@ -322,7 +336,8 @@ def _query_df():
query_formats=query_formats, column_formats=column_formats, encoding=encoding,
use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values,
query_tz=query_tz, column_tzs=column_tzs, context=context,
- external_data=external_data, use_extended_dtypes=use_extended_dtypes)
+ external_data=external_data, use_extended_dtypes=use_extended_dtypes,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_df)
@@ -342,7 +357,8 @@ async def query_df_stream(self,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
context: QueryContext = None,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None) -> StreamContext:
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a StreamContext.
For parameter values, see the create_query_context method.
@@ -355,7 +371,8 @@ def _query_df_stream():
encoding=encoding,
use_none=use_none, max_str_len=max_str_len, use_na_values=use_na_values,
query_tz=query_tz, column_tzs=column_tzs, context=context,
- external_data=external_data, use_extended_dtypes=use_extended_dtypes)
+ external_data=external_data, use_extended_dtypes=use_extended_dtypes,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_df_stream)
@@ -379,7 +396,8 @@ def create_query_context(self,
streaming: bool = False,
as_pandas: bool = False,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None) -> QueryContext:
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QueryContext:
"""
Creates or updates a reusable QueryContext object
:param query: Query statement/format string
@@ -409,6 +427,7 @@ def create_query_context(self,
:param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as
pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray
and StringArray. Defaulted to True for query_df methods
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Reusable QueryContext
"""
@@ -421,14 +440,16 @@ def create_query_context(self,
use_na_values=use_na_values,
streaming=streaming, as_pandas=as_pandas,
external_data=external_data,
- use_extended_dtypes=use_extended_dtypes)
+ use_extended_dtypes=use_extended_dtypes,
+ transport_settings=transport_settings)
async def query_arrow(self,
query: str,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
settings: Optional[Dict[str, Any]] = None,
use_strings: Optional[bool] = None,
- external_data: Optional[ExternalData] = None):
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method using the ClickHouse Arrow format to return a PyArrow table
:param query: Query statement/format string
@@ -436,12 +457,14 @@ async def query_arrow(self,
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
:param external_data ClickHouse "external data" to send with query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: PyArrow.Table
"""
def _query_arrow():
return self.client.query_arrow(query=query, parameters=parameters, settings=settings,
- use_strings=use_strings, external_data=external_data)
+ use_strings=use_strings, external_data=external_data,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_arrow)
@@ -452,7 +475,8 @@ async def query_arrow_stream(self,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
settings: Optional[Dict[str, Any]] = None,
use_strings: Optional[bool] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a stream of Arrow tables
:param query: Query statement/format string
@@ -460,12 +484,14 @@ async def query_arrow_stream(self,
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
:param external_data ClickHouse "external data" to send with query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Generator that yields a PyArrow.Table for per block representing the result set
"""
def _query_arrow_stream():
return self.client.query_arrow_stream(query=query, parameters=parameters, settings=settings,
- use_strings=use_strings, external_data=external_data)
+ use_strings=use_strings, external_data=external_data,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _query_arrow_stream)
@@ -477,7 +503,8 @@ async def command(self,
data: Union[str, bytes] = None,
settings: Dict[str, Any] = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]:
"""
Client method that returns a single value instead of a result set
:param cmd: ClickHouse query/command as a python format string
@@ -488,13 +515,15 @@ async def command(self,
database context. Otherwise, no database will be specified with the command. This is useful for determining
the default user database
:param external_data ClickHouse "external data" to send with command/query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary
if no data returned
"""
def _command():
return self.client.command(cmd=cmd, parameters=parameters, data=data, settings=settings,
- use_database=use_database, external_data=external_data)
+ use_database=use_database, external_data=external_data,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _command)
@@ -522,7 +551,8 @@ async def insert(self,
column_type_names: Sequence[str] = None,
column_oriented: bool = False,
settings: Optional[Dict[str, Any]] = None,
- context: InsertContext = None) -> QuerySummary:
+ context: InsertContext = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments
other than data are ignored
@@ -539,13 +569,15 @@ async def insert(self,
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param context: Optional reusable insert context to allow repeated inserts into the same table with
different data batches
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: QuerySummary with summary information, throws exception if insert fails
"""
def _insert():
return self.client.insert(table=table, data=data, column_names=column_names, database=database,
column_types=column_types, column_type_names=column_type_names,
- column_oriented=column_oriented, settings=settings, context=context)
+ column_oriented=column_oriented, settings=settings, context=context,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _insert)
@@ -558,7 +590,8 @@ async def insert_df(self, table: str = None,
column_names: Optional[Sequence[str]] = None,
column_types: Sequence[TimeplusType] = None,
column_type_names: Sequence[str] = None,
- context: InsertContext = None) -> QuerySummary:
+ context: InsertContext = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored
:param table: ClickHouse table
@@ -573,6 +606,7 @@ async def insert_df(self, table: str = None,
retrieved from the server
:param context: Optional reusable insert context to allow repeated inserts into the same table with
different data batches
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: QuerySummary with summary information, throws exception if insert fails
"""
@@ -580,7 +614,7 @@ def _insert_df():
return self.client.insert_df(table=table, df=df, database=database, settings=settings,
column_names=column_names,
column_types=column_types, column_type_names=column_type_names,
- context=context)
+ context=context, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _insert_df)
@@ -588,18 +622,21 @@ def _insert_df():
async def insert_arrow(self, table: str,
arrow_table, database: str = None,
- settings: Optional[Dict] = None) -> QuerySummary:
+ settings: Optional[Dict] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format
:param table: ClickHouse table
:param arrow_table: PyArrow Table object
:param database: Optional ClickHouse database
:param settings: Optional dictionary of ClickHouse settings (key/string values)
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: QuerySummary with summary information, throws exception if insert fails
"""
def _insert_arrow():
- return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings)
+ return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database,
+ settings=settings, transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _insert_arrow)
@@ -613,7 +650,8 @@ async def create_insert_context(self,
column_type_names: Sequence[str] = None,
column_oriented: bool = False,
settings: Optional[Dict[str, Any]] = None,
- data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext:
+ data: Optional[Sequence[Sequence[Any]]] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> InsertContext:
"""
Builds a reusable insert context to hold state for a duration of an insert
:param table: Target table
@@ -627,13 +665,15 @@ async def create_insert_context(self,
:param column_oriented: If true the data is already "pivoted" in column form
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param data: Initial dataset for insert
- :return Reusable insert context
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
+ :return: Reusable insert context
"""
def _create_insert_context():
return self.client.create_insert_context(table=table, column_names=column_names, database=database,
column_types=column_types, column_type_names=column_type_names,
- column_oriented=column_oriented, settings=settings, data=data)
+ column_oriented=column_oriented, settings=settings, data=data,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _create_insert_context)
@@ -658,7 +698,8 @@ async def raw_insert(self, table: str,
insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
settings: Optional[Dict] = None,
fmt: Optional[str] = None,
- compression: Optional[str] = None) -> QuerySummary:
+ compression: Optional[str] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert data already formatted in a bytes object
:param table: Table name (whether qualified with the database name or not)
@@ -666,13 +707,21 @@ async def raw_insert(self, table: str,
:param insert_block: Binary or string data already in a recognized ClickHouse format
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param compression: Recognized ClickHouse `Accept-Encoding` header compression value
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:param fmt: Valid clickhouse format
"""
def _raw_insert():
return self.client.raw_insert(table=table, column_names=column_names, insert_block=insert_block,
- settings=settings, fmt=fmt, compression=compression)
+ settings=settings, fmt=fmt, compression=compression,
+ transport_settings=transport_settings)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _raw_insert)
return result
+
+ async def __aenter__(self) -> "AsyncClient":
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+ await self.close()
diff --git a/timeplus_connect/driver/binding.py b/timeplus_connect/driver/binding.py
index 1d611b23..e266ad75 100644
--- a/timeplus_connect/driver/binding.py
+++ b/timeplus_connect/driver/binding.py
@@ -40,8 +40,7 @@ def quote_identifier(identifier: str):
def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
server_tz: Optional[tzinfo] = None) -> str:
- while query.endswith(';'):
- query = query[:-1]
+ query = query.rstrip(";")
if not parameters:
return query
if hasattr(parameters, 'items'):
@@ -52,8 +51,7 @@ def finalize_query(query: str, parameters: Optional[Union[Sequence, Dict[str, An
# pylint: disable=too-many-locals,too-many-branches
def bind_query(query: str, parameters: Optional[Union[Sequence, Dict[str, Any]]],
server_tz: Optional[tzinfo] = None) -> Tuple[str, Dict[str, str]]:
- while query.endswith(';'):
- query = query[:-1]
+ query = query.rstrip(";")
if not parameters:
return query, {}
diff --git a/timeplus_connect/driver/client.py b/timeplus_connect/driver/client.py
index 3390707c..e2680907 100644
--- a/timeplus_connect/driver/client.py
+++ b/timeplus_connect/driver/client.py
@@ -19,6 +19,7 @@
from timeplus_connect.driver.exceptions import ProgrammingError, OperationalError
from timeplus_connect.driver.external import ExternalData
from timeplus_connect.driver.insert import InsertContext
+from timeplus_connect.driver.options import check_arrow, check_pandas, check_numpy
from timeplus_connect.driver.summary import QuerySummary
from timeplus_connect.driver.models import ColumnDef, SettingDef, SettingStatus
from timeplus_connect.driver.query import QueryResult, to_arrow, to_arrow_batches, QueryContext, arrow_buffer
@@ -68,7 +69,7 @@ def __init__(self,
self.uri = uri
self._init_common_settings(apply_server_timezone)
- def _init_common_settings(self, apply_server_timezone:Optional[Union[str, bool]] ):
+ def _init_common_settings(self, apply_server_timezone: Optional[Union[str, bool]]):
self.server_tz, dst_safe = pytz.UTC, True
self.server_version, server_tz = \
tuple(self.command('SELECT version(), timezone()', use_database=False))
@@ -122,8 +123,16 @@ def _validate_settings(self, settings: Optional[Dict[str, Any]]) -> Dict[str, st
return validated
def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Optional[str]:
+ str_value = str(value)
+ if value is True:
+ str_value = '1'
+ elif value is False:
+ str_value = '0'
if key not in self.valid_transport_settings:
setting_def = self.server_settings.get(key)
+ current_setting = self.get_client_setting(key)
+ if setting_def and setting_def.value == str_value and (current_setting is None or current_setting == setting_def.value):
+ return None # don't send settings that are already the expected value
if setting_def is None or setting_def.readonly:
if key in self.optional_transport_settings:
return None
@@ -134,9 +143,7 @@ def _validate_setting(self, key: str, value: Any, invalid_action: str) -> Option
return None
else:
raise ProgrammingError(f'Setting {key} is unknown or readonly') from None
- if isinstance(value, bool):
- return '1' if value else '0'
- return str(value)
+ return str_value
def _setting_status(self, key: str) -> SettingStatus:
comp_setting = self.server_settings.get(key)
@@ -205,14 +212,15 @@ def query(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> QueryResult:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QueryResult:
"""
Main query method for SELECT, DESCRIBE and other SQL statements that return a result matrix. For
parameters, see the create_query_context method
:return: QueryResult -- data and metadata from response
"""
if query and query.lower().strip().startswith('select __connect_version__'):
- return QueryResult([[f'ClickHouse Connect v.{version()} ⓒ ClickHouse Inc.']], None,
+ return QueryResult([[f'Timeplus Connect v.{version()} ⓒ Timeplus Inc.']], None,
('connect_version',), (get_from_name('string'),))
kwargs = locals().copy()
del kwargs['self']
@@ -221,7 +229,8 @@ def query(self,
response = self.command(query,
parameters=query_context.parameters,
settings=query_context.settings,
- external_data=query_context.external_data)
+ external_data=query_context.external_data,
+ transport_settings=query_context.transport_settings)
if isinstance(response, QuerySummary):
return response.as_query_result()
return QueryResult([response] if isinstance(response, list) else [[response]])
@@ -238,7 +247,8 @@ def query_column_block_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Variation of main query method that returns a stream of column oriented blocks. For
parameters, see the create_query_context method.
@@ -257,7 +267,8 @@ def query_row_block_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Variation of main query method that returns a stream of row oriented blocks. For
parameters, see the create_query_context method.
@@ -276,7 +287,8 @@ def query_rows_stream(self,
context: QueryContext = None,
query_tz: Optional[Union[str, tzinfo]] = None,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Variation of main query method that returns a stream of row oriented blocks. For
parameters, see the create_query_context method.
@@ -290,16 +302,18 @@ def raw_query(self, query: str,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> bytes:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> bytes:
"""
Query method that simply returns the raw ClickHouse format bytes
:param query: Query statement/format string
:param parameters: Optional dictionary used to format the query
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param fmt: ClickHouse output format
- :param use_database Send the database parameter to ClickHouse so the command will be executed in the client
+ :param use_database: Send the database parameter to ClickHouse so the command will be executed in the client
database context.
- :param external_data External data to send with the query
+ :param external_data: External data to send with the query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: bytes representing raw ClickHouse return value based on format
"""
@@ -309,7 +323,8 @@ def raw_stream(self, query: str,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> io.IOBase:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase:
"""
Query method that returns the result as an io.IOBase iterator
:param query: Query statement/format string
@@ -318,7 +333,8 @@ def raw_stream(self, query: str,
:param fmt: ClickHouse output format
:param use_database Send the database parameter to ClickHouse so the command will be executed in the client
database context.
- :param external_data External data to send with the query
+ :param external_data: External data to send with the query.
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: io.IOBase stream/iterator for the result
"""
@@ -333,12 +349,14 @@ def query_np(self,
use_none: Optional[bool] = None,
max_str_len: Optional[int] = None,
context: QueryContext = None,
- external_data: Optional[ExternalData] = None):
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method that returns the results as a numpy array. For parameter values, see the
create_query_context method
:return: Numpy array representing the result set
"""
+ check_numpy()
return self._context_query(locals(), use_numpy=True).np_result
# pylint: disable=duplicate-code,too-many-arguments,unused-argument
@@ -352,12 +370,14 @@ def query_np_stream(self,
use_none: Optional[bool] = None,
max_str_len: Optional[int] = None,
context: QueryContext = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a stream of numpy arrays. For parameter values, see the
create_query_context method
:return: Generator that yield a numpy array per block representing the result set
"""
+ check_numpy()
return self._context_query(locals(), use_numpy=True, streaming=True).np_stream
# pylint: disable=duplicate-code,unused-argument
@@ -375,12 +395,14 @@ def query_df(self,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
context: QueryContext = None,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None):
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method that results the results as a pandas dataframe. For parameter values, see the
create_query_context method
:return: Pandas dataframe representing the result set
"""
+ check_pandas()
return self._context_query(locals(), use_numpy=True, as_pandas=True).df_result
# pylint: disable=duplicate-code,unused-argument
@@ -398,12 +420,14 @@ def query_df_stream(self,
column_tzs: Optional[Dict[str, Union[str, tzinfo]]] = None,
context: QueryContext = None,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None) -> StreamContext:
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a StreamContext. For parameter values, see the
create_query_context method
:return: Generator that yields a Pandas dataframe per block representing the result set
"""
+ check_pandas()
return self._context_query(locals(), use_numpy=True,
as_pandas=True,
streaming=True).df_stream
@@ -426,7 +450,8 @@ def create_query_context(self,
streaming: bool = False,
as_pandas: bool = False,
external_data: Optional[ExternalData] = None,
- use_extended_dtypes: Optional[bool] = None) -> QueryContext:
+ use_extended_dtypes: Optional[bool] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QueryContext:
"""
Creates or updates a reusable QueryContext object
:param query: Query statement/format string
@@ -444,10 +469,10 @@ def create_query_context(self,
structured array even with ClickHouse variable length String columns. If 0, Numpy arrays for
String columns will always be object arrays
:param context: An existing QueryContext to be updated with any provided parameter values
- :param query_tz Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
+ :param query_tz: Either a string or a pytz tzinfo object. (Strings will be converted to tzinfo objects).
Values for any DateTime or DateTime64 column in the query will be converted to Python datetime.datetime
objects with the selected timezone.
- :param column_tzs A dictionary of column names to tzinfo objects (or strings that will be converted to
+ :param column_tzs: A dictionary of column names to tzinfo objects (or strings that will be converted to
tzinfo objects). The timezone will be applied to datetime objects returned in the query
:param use_na_values: Deprecated alias for use_advanced_dtypes
:param as_pandas Return the result columns as pandas.Series objects
@@ -456,6 +481,7 @@ def create_query_context(self,
:param use_extended_dtypes: Only relevant to Pandas Dataframe queries. Use Pandas "missing types", such as
pandas.NA and pandas.NaT for ClickHouse NULL values, as well as extended Pandas dtypes such as IntegerArray
and StringArray. Defaulted to True for query_df methods
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Reusable QueryContext
"""
if context:
@@ -475,7 +501,8 @@ def create_query_context(self,
as_pandas=as_pandas,
use_extended_dtypes=use_extended_dtypes,
streaming=streaming,
- external_data=external_data)
+ external_data=external_data,
+ transport_settings=transport_settings)
if use_numpy and max_str_len is None:
max_str_len = 0
if use_extended_dtypes is None:
@@ -499,51 +526,60 @@ def create_query_context(self,
as_pandas=as_pandas,
streaming=streaming,
apply_server_tz=self.apply_server_timezone,
- external_data=external_data)
+ external_data=external_data,
+ transport_settings=transport_settings)
def query_arrow(self,
query: str,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
settings: Optional[Dict[str, Any]] = None,
use_strings: Optional[bool] = None,
- external_data: Optional[ExternalData] = None):
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Query method using the ClickHouse Arrow format to return a PyArrow table
:param query: Query statement/format string
:param parameters: Optional dictionary used to format the query
:param settings: Optional dictionary of ClickHouse settings (key/string values)
- :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
- :param external_data ClickHouse "external data" to send with query
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
+ :param external_data: ClickHouse "external data" to send with query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: PyArrow.Table
"""
+ check_arrow()
settings = self._update_arrow_settings(settings, use_strings)
return to_arrow(self.raw_query(query,
parameters,
settings,
fmt='Arrow',
- external_data=external_data))
+ external_data=external_data,
+ transport_settings=transport_settings))
def query_arrow_stream(self,
query: str,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
settings: Optional[Dict[str, Any]] = None,
use_strings: Optional[bool] = None,
- external_data: Optional[ExternalData] = None) -> StreamContext:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> StreamContext:
"""
Query method that returns the results as a stream of Arrow tables
:param query: Query statement/format string
:param parameters: Optional dictionary used to format the query
:param settings: Optional dictionary of ClickHouse settings (key/string values)
- :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
- :param external_data ClickHouse "external data" to send with query
+ :param use_strings: Convert ClickHouse String type to Arrow string type (instead of binary)
+ :param external_data: ClickHouse "external data" to send with query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Generator that yields a PyArrow.Table for per block representing the result set
"""
+ check_arrow()
settings = self._update_arrow_settings(settings, use_strings)
return to_arrow_batches(self.raw_stream(query,
parameters,
settings,
fmt='ArrowStream',
- external_data=external_data))
+ external_data=external_data,
+ transport_settings=transport_settings))
def _update_arrow_settings(self,
settings: Optional[Dict[str, Any]],
@@ -568,7 +604,8 @@ def command(self,
data: Union[str, bytes] = None,
settings: Dict[str, Any] = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]:
"""
Client method that returns a single value instead of a result set
:param cmd: ClickHouse query/command as a python format string
@@ -576,9 +613,10 @@ def command(self,
:param data: Optional 'data' for the command (for INSERT INTO in particular)
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param use_database: Send the database parameter to ClickHouse so the command will be executed in the client
- database context. Otherwise, no database will be specified with the command. This is useful for determining
+ database context. Otherwise, no database will be specified with the command. This is useful for determining
the default user database
- :param external_data ClickHouse "external data" to send with command/query
+ :param external_data: ClickHouse "external data" to send with command/query
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: Decoded response from ClickHouse as either a string, int, or sequence of strings, or QuerySummary
if no data returned
"""
@@ -599,7 +637,8 @@ def insert(self,
column_type_names: Sequence[str] = None,
column_oriented: bool = False,
settings: Optional[Dict[str, Any]] = None,
- context: InsertContext = None) -> QuerySummary:
+ context: InsertContext = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Method to insert multiple rows/data matrix of native Python objects. If context is specified arguments
other than data are ignored
@@ -616,6 +655,7 @@ def insert(self,
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param context: Optional reusable insert context to allow repeated inserts into the same table with
different data batches
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: QuerySummary with summary information, throws exception if insert fails
"""
if (context is None or context.empty) and data is None:
@@ -627,7 +667,8 @@ def insert(self,
column_types,
column_type_names,
column_oriented,
- settings)
+ settings,
+ transport_settings=transport_settings)
if data is not None:
if not context.empty:
raise ProgrammingError('Attempting to insert new data with non-empty insert context') from None
@@ -641,7 +682,8 @@ def insert_df(self, table: str = None,
column_names: Optional[Sequence[str]] = None,
column_types: Sequence[TimeplusType] = None,
column_type_names: Sequence[str] = None,
- context: InsertContext = None) -> QuerySummary:
+ context: InsertContext = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert a pandas DataFrame into ClickHouse. If context is specified arguments other than df are ignored
:param table: ClickHouse table
@@ -656,8 +698,10 @@ def insert_df(self, table: str = None,
retrieved from the server
:param context: Optional reusable insert context to allow repeated inserts into the same table with
different data batches
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
:return: QuerySummary with summary information, throws exception if insert fails
"""
+ check_pandas()
if context is None:
if column_names is None:
column_names = df.columns
@@ -669,24 +713,28 @@ def insert_df(self, table: str = None,
database,
column_types=column_types,
column_type_names=column_type_names,
- settings=settings, context=context)
+ settings=settings,
+ transport_settings=transport_settings,
+ context=context)
def insert_arrow(self, table: str,
arrow_table,
database: str = None,
- settings: Optional[Dict] = None) -> QuerySummary:
+ settings: Optional[Dict] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert a PyArrow table DataFrame into ClickHouse using raw Arrow format
:param table: ClickHouse table
:param arrow_table: PyArrow Table object
:param database: Optional ClickHouse database
:param settings: Optional dictionary of ClickHouse settings (key/string values)
- :return: QuerySummary with summary information, throws exception if insert fails
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
"""
+ check_arrow()
full_table = table if '.' in table or not database else f'{database}.{table}'
compression = self.write_compression if self.write_compression in ('zstd', 'lz4') else None
column_names, insert_block = arrow_buffer(arrow_table, compression)
- return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow')
+ return self.raw_insert(full_table, column_names, insert_block, settings, 'Arrow', transport_settings)
def create_insert_context(self,
table: str,
@@ -696,7 +744,8 @@ def create_insert_context(self,
column_type_names: Sequence[str] = None,
column_oriented: bool = False,
settings: Optional[Dict[str, Any]] = None,
- data: Optional[Sequence[Sequence[Any]]] = None) -> InsertContext:
+ data: Optional[Sequence[Sequence[Any]]] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> InsertContext:
"""
Builds a reusable insert context to hold state for a duration of an insert
:param table: Target table
@@ -710,7 +759,8 @@ def create_insert_context(self,
:param column_oriented: If true the data is already "pivoted" in column form
:param settings: Optional dictionary of ClickHouse settings (key/string values)
:param data: Initial dataset for insert
- :return Reusable insert context
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
+ :return: Reusable insert context
"""
full_table = table
if '.' not in table:
@@ -746,6 +796,7 @@ def create_insert_context(self,
column_types,
column_oriented=column_oriented,
settings=settings,
+ transport_settings=transport_settings,
data=data)
def min_version(self, version_str: str) -> bool:
@@ -786,15 +837,17 @@ def raw_insert(self, table: str,
insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
settings: Optional[Dict] = None,
fmt: Optional[str] = None,
- compression: Optional[str] = None) -> QuerySummary:
+ compression: Optional[str] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
Insert data already formatted in a bytes object
:param table: Table name (whether qualified with the database name or not)
:param column_names: Sequence of column names
:param insert_block: Binary or string data already in a recognized ClickHouse format
:param settings: Optional dictionary of ClickHouse settings (key/string values)
- :param compression: Recognized ClickHouse `Accept-Encoding` header compression value
:param fmt: Valid clickhouse format
+ :param compression: Recognized ClickHouse `Accept-Encoding` header compression value
+ :param transport_settings: Optional dictionary of transport level settings (HTTP headers, etc.)
"""
@abstractmethod
diff --git a/timeplus_connect/driver/context.py b/timeplus_connect/driver/context.py
index 13276aa7..bf8f34c8 100644
--- a/timeplus_connect/driver/context.py
+++ b/timeplus_connect/driver/context.py
@@ -16,7 +16,8 @@ def __init__(self,
column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
encoding: Optional[str] = None,
use_extended_dtypes: bool = False,
- use_numpy: bool = False):
+ use_numpy: bool = False,
+ transport_settings: Optional[Dict[str, str]] = None):
self.settings = settings or {}
if query_formats is None:
self.type_formats = _empty_map
@@ -36,6 +37,7 @@ def __init__(self,
for type_name, fmt in fmt.items()}
self.query_formats = query_formats or {}
self.column_formats = column_formats or {}
+ self.transport_settings = transport_settings
self.column_name = None
self.encoding = encoding
self.use_numpy = use_numpy
diff --git a/timeplus_connect/driver/httpclient.py b/timeplus_connect/driver/httpclient.py
index d5ea36e1..9261e07d 100644
--- a/timeplus_connect/driver/httpclient.py
+++ b/timeplus_connect/driver/httpclient.py
@@ -74,12 +74,16 @@ def __init__(self,
apply_server_timezone: Optional[Union[str, bool]] = None,
show_clickhouse_errors: Optional[bool] = None,
autogenerate_session_id: Optional[bool] = None,
- tls_mode: Optional[str] = None):
+ tls_mode: Optional[str] = None,
+ proxy_path: str = ''):
"""
- Create an HTTP ClickHouse Connect client
+ Create an HTTP Timeplus Connect client
See timeplus_connect.get_client for parameters
"""
- self.url = f'{interface}://{host}:{port}'
+ proxy_path = proxy_path.lstrip('/')
+ if proxy_path:
+ proxy_path = '/' + proxy_path
+ self.url = f'{interface}://{host}:{port}{proxy_path}'
self.headers = {}
self.params = dict_copy(HttpClient.params)
ch_settings = dict_copy(settings, self.params)
@@ -212,7 +216,7 @@ def _query_with_context(self, context: QueryContext) -> QueryResult:
response = self._raw_request(f'{context.final_query}\n FORMAT JSON',
params, headers, retries=self.query_retries)
json_result = json.loads(response.data)
- # ClickHouse will respond with a JSON object of meta, data, and some other objects
+ # Timeplus will respond with a JSON object of meta, data, and some other objects
# We just grab the column names and column types from the metadata sub object
names: List[str] = []
types: List[TimeplusType] = []
@@ -237,7 +241,7 @@ def _query_with_context(self, context: QueryContext) -> QueryResult:
headers['Content-Type'] = 'text/plain; charset=utf-8'
response = self._raw_request(body,
params,
- headers,
+ dict_copy(headers, context.transport_settings),
stream=True,
retries=self.query_retries,
fields=fields,
@@ -275,7 +279,7 @@ def error_handler(resp: HTTPResponse):
if self.database:
params['database'] = self.database
params.update(self._validate_settings(context.settings))
-
+ headers = dict_copy(headers, context.transport_settings)
response = self._raw_request(block_gen, params, headers, error_handler=error_handler, server_wait=False)
logger.debug('Context insert response code: %d, content: %s', response.status, response.data)
context.data = None
@@ -286,7 +290,8 @@ def raw_insert(self, table: str = None,
insert_block: Union[str, bytes, Generator[bytes, None, None], BinaryIO] = None,
settings: Optional[Dict] = None,
fmt: Optional[str] = None,
- compression: Optional[str] = None) -> QuerySummary:
+ compression: Optional[str] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> QuerySummary:
"""
See BaseClient doc_string for this method
"""
@@ -306,6 +311,7 @@ def raw_insert(self, table: str = None,
if self.database:
params['database'] = self.database
params.update(self._validate_settings(settings or {}))
+ headers = dict_copy(headers, transport_settings)
response = self._raw_request(insert_block, params, headers, server_wait=False)
logger.debug('Raw insert response code: %d, content: %s', response.status, response.data)
return QuerySummary(self._summary(response))
@@ -327,7 +333,8 @@ def command(self,
data: Union[str, bytes] = None,
settings: Optional[Dict] = None,
use_database: int = True,
- external_data: Optional[ExternalData] = None) -> Union[str, int, Sequence[str], QuerySummary]:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> Union[str, int, Sequence[str], QuerySummary]:
"""
See BaseClient doc_string for this method
"""
@@ -355,7 +362,7 @@ def command(self,
if use_database and self.database:
params['database'] = self.database
params.update(self._validate_settings(settings or {}))
-
+ headers = dict_copy(headers, transport_settings)
method = 'POST' if payload or fields else 'GET'
response = self._raw_request(payload, params, headers, method, fields=fields, server_wait=False)
if response.data:
@@ -407,16 +414,18 @@ def _raw_request(self,
data = data.encode()
headers = dict_copy(self.headers, headers)
attempts = 0
+ final_params = {}
if server_wait:
- params['wait_end_of_query'] = '1'
- # We can't actually read the progress headers, but we enable them so ClickHouse sends something
+ final_params['wait_end_of_query'] = '1'
+ # We can't actually read the progress headers, but we enable them so Timeplus sends something
# to keep the connection alive when waiting for long-running queries and (2) to get summary information
# if not streaming
if self._send_progress:
- params['send_progress_in_http_headers'] = '1'
+ final_params['send_progress_in_http_headers'] = '1'
if self._progress_interval:
- params['http_headers_progress_interval_ms'] = self._progress_interval
- final_params = dict_copy(self.params, params)
+ final_params['http_headers_progress_interval_ms'] = self._progress_interval
+ final_params = dict_copy(self.params, final_params)
+ final_params = dict_copy(final_params, params)
url = f'{self.url}?{urlencode(final_params)}'
kwargs = {
'headers': headers,
@@ -447,7 +456,7 @@ def _raw_request(self,
except HTTPError as ex:
if isinstance(ex.__context__, ConnectionResetError):
# The server closed the connection, probably because the Keep Alive has expired
- # We should be safe to retry, as ClickHouse should not have processed anything on a connection
+ # We should be safe to retry, as Timeplus should not have processed anything on a connection
# that it killed. We also only retry this once, as multiple disconnects are unlikely to be
# related to the Keep Alive settings
if attempts == 1:
@@ -475,24 +484,27 @@ def raw_query(self, query: str,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> bytes:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> bytes:
"""
See BaseClient doc_string for this method
"""
body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data)
- return self._raw_request(body, params, fields=fields).data
+ return self._raw_request(body, params, fields=fields, headers=transport_settings).data
def raw_stream(self, query: str,
parameters: Optional[Union[Sequence, Dict[str, Any]]] = None,
settings: Optional[Dict[str, Any]] = None,
fmt: str = None,
use_database: bool = True,
- external_data: Optional[ExternalData] = None) -> io.IOBase:
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> io.IOBase:
"""
See BaseClient doc_string for this method
"""
body, params, fields = self._prep_raw_query(query, parameters, settings, fmt, use_database, external_data)
- return self._raw_request(body, params, fields=fields, stream=True, server_wait=False)
+ return self._raw_request(body, params, fields=fields, stream=True, server_wait=False,
+ headers=transport_settings)
def _prep_raw_query(self, query: str,
parameters: Optional[Union[Sequence, Dict[str, Any]]],
@@ -526,7 +538,7 @@ def ping(self):
return True
# proton hasn't HTTP handle for path /ping
# try:
- # response = self.http.request('GET', f'{self.url}/ping', timeout=3)
+ # response = self.http.request('GET', f'{self.url}/ping', timeout=3, preload_content=True)
# return 200 <= response.status < 300
# except HTTPError:
# logger.debug('ping failed', exc_info=True)
diff --git a/timeplus_connect/driver/httputil.py b/timeplus_connect/driver/httputil.py
index 0937e556..df370ed7 100644
--- a/timeplus_connect/driver/httputil.py
+++ b/timeplus_connect/driver/httputil.py
@@ -228,12 +228,13 @@ def buffered():
read_gen = response.stream(chunk_size, decompress is None)
while True:
while not done:
+ chunk = None
try:
chunk = next(read_gen, None) # Always try to read at least one chunk if there are any left
except Exception: # pylint: disable=broad-except
# By swallowing an unexpected exception reading the stream, we will let consumers decide how to
# handle the unexpected end of stream
- pass
+ logger.warning('unexpected failure to read next chunk', exc_info=True)
if not chunk:
done = True
break
diff --git a/timeplus_connect/driver/insert.py b/timeplus_connect/driver/insert.py
index 6f8225f8..cce021c9 100644
--- a/timeplus_connect/driver/insert.py
+++ b/timeplus_connect/driver/insert.py
@@ -42,8 +42,9 @@ def __init__(self,
compression: Optional[Union[str, bool]] = None,
query_formats: Optional[Dict[str, str]] = None,
column_formats: Optional[Dict[str, Union[str, Dict[str, str]]]] = None,
- block_size: Optional[int] = None):
- super().__init__(settings, query_formats, column_formats)
+ block_size: Optional[int] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
+ super().__init__(settings, query_formats, column_formats, transport_settings=transport_settings)
self.table = table
self.column_names = column_names
self.column_types = column_types
diff --git a/timeplus_connect/driver/query.py b/timeplus_connect/driver/query.py
index e7b74e6a..6cd49243 100644
--- a/timeplus_connect/driver/query.py
+++ b/timeplus_connect/driver/query.py
@@ -3,7 +3,7 @@
import pytz
from io import IOBase
-from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator
+from typing import Any, Tuple, Dict, Sequence, Optional, Union, Generator, BinaryIO
from datetime import tzinfo
from pytz.exceptions import UnknownTimeZoneError
@@ -52,7 +52,8 @@ def __init__(self,
as_pandas: bool = False,
streaming: bool = False,
apply_server_tz: bool = False,
- external_data: Optional[ExternalData] = None):
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None):
"""
Initializes various configuration settings for the query context
@@ -85,7 +86,8 @@ def __init__(self,
column_formats,
encoding,
use_extended_dtypes if use_extended_dtypes is not None else False,
- use_numpy if use_numpy is not None else False)
+ use_numpy if use_numpy is not None else False,
+ transport_settings=transport_settings)
self.query = query
self.parameters = parameters or {}
self.use_none = True if use_none is None else use_none
@@ -189,7 +191,8 @@ def updated_copy(self,
use_extended_dtypes: Optional[bool] = None,
as_pandas: bool = False,
streaming: bool = False,
- external_data: Optional[ExternalData] = None) -> 'QueryContext':
+ external_data: Optional[ExternalData] = None,
+ transport_settings: Optional[Dict[str, str]] = None) -> 'QueryContext':
"""
Creates Query context copy with parameters overridden/updated as appropriate.
"""
@@ -210,7 +213,8 @@ def updated_copy(self,
as_pandas,
streaming,
self.apply_server_tz,
- self.external_data if external_data is None else external_data)
+ self.external_data if external_data is None else external_data,
+ self.transport_settings if transport_settings is None else transport_settings)
def _update_query(self):
self.final_query, self.bind_params = bind_query(self.query, self.parameters, self.server_tz)
@@ -374,7 +378,7 @@ def to_arrow_batches(buffer: IOBase) -> StreamContext:
return StreamContext(buffer, reader)
-def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], bytes]:
+def arrow_buffer(table, compression: Optional[str] = None) -> Tuple[Sequence[str], Union[bytes, BinaryIO]]:
pyarrow = check_arrow()
options = None
if compression in ('zstd', 'lz4'):