From c3f0e943a14386a95f5d86385ac50584207cd178 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 16 Jan 2026 11:06:29 +0700 Subject: [PATCH 1/6] feat: add support for writing Pandas DataFrames with tests --- .../write_client/client/write_api.py | 9 ++-- tests/test_influxdb_client_3_integration.py | 52 +++++++++++++++++-- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index aa57830..b071856 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -391,12 +391,9 @@ def write(self, bucket: str, org: str = None, _async_req = True if self._write_options.write_type == WriteType.asynchronous else False - # Filter out serializer-specific kwargs before passing to _post_write - http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} - def write_payload(payload): final_string = b'\n'.join(payload[1]) - return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs) + return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs) results = list(map(write_payload, payloads.items())) if not _async_req: @@ -586,11 +583,13 @@ def _retry_callback_delegate(exception): return _BatchResponse(data=batch_item) def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs): + # Filter out serializer-specific kwargs before passing to _post_write + http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, no_sync=no_sync, async_req=_async_req, content_type="text/plain; charset=utf-8", - **kwargs) + **http_kwargs) def _to_response(self, data: _BatchItem, delay: timedelta): diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 554a971..8f8c170 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -1,16 +1,17 @@ import logging import os -import pyarrow -import pytest import random import string import time import unittest +import pyarrow +import pytest from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError - +from datetime import datetime +import pandas as pd from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ - WriteType, InfluxDB3ClientQueryError + WriteType, InfluxDB3ClientQueryError, Point from influxdb_client_3.exceptions import InfluxDBError from tests.util import asyncio_run, lp_to_py_object @@ -48,6 +49,49 @@ def tearDown(self): if self.client: self.client.close() + def test_write_dataframe(self): + measurement = f'test{random_hex(3)}'.lower() + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.5] + }) + self.client.write_dataframe(df, measurement=measurement, timestamp_column='time', tags=['city']) + self.client.flush() + + result = self.client.query(query=f"select * from {measurement}", mode="pandas") + + self.assertEqual(2, len(result)) + self.assertEqual(2, len(result.get('city'))) + self.assertEqual(2, len(result.get('temperature'))) + + def test_write_dataframe_with_batch(self): + self.client = InfluxDBClient3(host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options( + write_options=WriteOptions(batch_size=100) + )) + measurement = f'test{random_hex(3)}'.lower() + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.5] + }) + self.client.write_dataframe( + df, + measurement=measurement, + timestamp_column='time', + tags=['city'] + ) + self.client.flush() + + result = self.client.query(query=f"select * from {measurement}", mode="pandas") + + self.assertIsNotNone(result) + self.assertEqual(2, len(result.get('city'))) + self.assertEqual(2, len(result.get('temperature'))) + def test_write_and_query(self): test_id = time.time_ns() self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") From 39d69e2ce3bf017c1fdeec3e84fd9e3f89ef522d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 16 Jan 2026 11:28:46 +0700 Subject: [PATCH 2/6] feat: add test for writing CSV files with batching --- tests/test_influxdb_client_3_integration.py | 22 ++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 8f8c170..4be119d 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -61,7 +61,7 @@ def test_write_dataframe(self): result = self.client.query(query=f"select * from {measurement}", mode="pandas") - self.assertEqual(2, len(result)) + self.assertIsNotNone(result) self.assertEqual(2, len(result.get('city'))) self.assertEqual(2, len(result.get('temperature'))) @@ -92,6 +92,26 @@ def test_write_dataframe_with_batch(self): self.assertEqual(2, len(result.get('city'))) self.assertEqual(2, len(result.get('temperature'))) + def test_write_csv_file_with_batch(self): + client = InfluxDBClient3(host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options( + write_options=WriteOptions(batch_size=100) + )) + measurement = f'test{random_hex(3)}'.lower() + client.write_file( + measurement_name=measurement, + file='tests/data/iot.csv', + timestamp_column='time', tag_columns=["name"]) + client.flush() + + result = client.query(query=f"select * from {measurement}", mode="pandas") + self.assertIsNotNone(result) + self.assertEqual(3, len(result.get('building'))) + self.assertEqual(3, len(result.get('temperature'))) + + def test_write_and_query(self): test_id = time.time_ns() self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") From eeb9c629f650ca05578be50044d4cb773d91a86d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 16 Jan 2026 11:39:45 +0700 Subject: [PATCH 3/6] chore: update CHANGELOG with bug fix for write_file and write_dataframe batching issues --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc2106b..1c1989b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.18.0 [unreleased] +### Bug Fixes + +1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix InfluxDBClient3.write_file() and InfluxDBClient3.write_dataframe() fail with batching mode. + ## 0.17.0 [2026-01-08] ### Features From 3781bdf20edee62e3e79883f10d4affee668c7a3 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 16 Jan 2026 11:42:01 +0700 Subject: [PATCH 4/6] chore: clean up unused imports in test_influxdb_client_3_integration.py --- tests/test_influxdb_client_3_integration.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 4be119d..4f7b7ef 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -5,13 +5,13 @@ import time import unittest +import pandas as pd import pyarrow import pytest from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError -from datetime import datetime -import pandas as pd + from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ - WriteType, InfluxDB3ClientQueryError, Point + WriteType, InfluxDB3ClientQueryError from influxdb_client_3.exceptions import InfluxDBError from tests.util import asyncio_run, lp_to_py_object @@ -111,7 +111,6 @@ def test_write_csv_file_with_batch(self): self.assertEqual(3, len(result.get('building'))) self.assertEqual(3, len(result.get('temperature'))) - def test_write_and_query(self): test_id = time.time_ns() self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") From 751a3ff04b1323ef7dbcbabaa0f1275a096e09d5 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Fri, 16 Jan 2026 14:03:23 +0700 Subject: [PATCH 5/6] chore: remove unused test parameters in test_polars.py --- tests/test_polars.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_polars.py b/tests/test_polars.py index 17ccf14..70571c9 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -96,6 +96,4 @@ def test_write_polars_batching(self): async_req=ANY, content_type=ANY, urlopen_kw=ANY, - data_frame_measurement_name='measurement', - data_frame_timestamp_column='time', body=b'measurement temperature=22.4 1722470400000000000\nmeasurement temperature=21.8 1722474000000000000') From 69b467fc717da4ea22449b0dead8eb10b09fbcb1 Mon Sep 17 00:00:00 2001 From: sonnh <46211823+NguyenHoangSon96@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:36:51 +0700 Subject: [PATCH 6/6] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jakub Bednář --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c1989b..c021310 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Bug Fixes -1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix InfluxDBClient3.write_file() and InfluxDBClient3.write_dataframe() fail with batching mode. +1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` fail with batching mode. ## 0.17.0 [2026-01-08]