diff --git a/CHANGELOG.md b/CHANGELOG.md index fc2106b..c021310 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.18.0 [unreleased] +### Bug Fixes + +1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` failing in batching mode. + ## 0.17.0 [2026-01-08] ### Features diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index aa57830..b071856 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -391,12 +391,9 @@ def write(self, bucket: str, org: str = None, _async_req = True if self._write_options.write_type == WriteType.asynchronous else False - # Filter out serializer-specific kwargs before passing to _post_write - http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} - def write_payload(payload): final_string = b'\n'.join(payload[1]) - return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs) + return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs) results = list(map(write_payload, payloads.items())) if not _async_req: @@ -586,11 +583,13 @@ def _retry_callback_delegate(exception): return _BatchResponse(data=batch_item) def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs): + # Filter out serializer-specific kwargs before passing to the write service + http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS} return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision, no_sync=no_sync, async_req=_async_req, content_type="text/plain; charset=utf-8", - **kwargs) + **http_kwargs) def _to_response(self, data: _BatchItem, delay: timedelta): diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index
554a971..4f7b7ef 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -1,12 +1,13 @@ import logging import os -import pyarrow -import pytest import random import string import time import unittest +import pandas as pd +import pyarrow +import pytest from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ @@ -48,6 +49,68 @@ def tearDown(self): if self.client: self.client.close() + def test_write_dataframe(self): + measurement = f'test{random_hex(3)}'.lower() + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.5] + }) + self.client.write_dataframe(df, measurement=measurement, timestamp_column='time', tags=['city']) + self.client.flush() + + result = self.client.query(query=f"select * from {measurement}", mode="pandas") + + self.assertIsNotNone(result) + self.assertEqual(2, len(result.get('city'))) + self.assertEqual(2, len(result.get('temperature'))) + + def test_write_dataframe_with_batch(self): + self.client = InfluxDBClient3(host=self.host, + database=self.database, + token=self.token, + write_client_options=write_client_options( + write_options=WriteOptions(batch_size=100) + )) + measurement = f'test{random_hex(3)}'.lower() + df = pd.DataFrame({ + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), + 'city': ['London', 'Paris'], + 'temperature': [15.0, 18.5] + }) + self.client.write_dataframe( + df, + measurement=measurement, + timestamp_column='time', + tags=['city'] + ) + self.client.flush() + + result = self.client.query(query=f"select * from {measurement}", mode="pandas") + + self.assertIsNotNone(result) + self.assertEqual(2, len(result.get('city'))) + self.assertEqual(2, len(result.get('temperature'))) + + def test_write_csv_file_with_batch(self): + client = InfluxDBClient3(host=self.host, + 
database=self.database, + token=self.token, + write_client_options=write_client_options( + write_options=WriteOptions(batch_size=100) + )) + measurement = f'test{random_hex(3)}'.lower() + client.write_file( + measurement_name=measurement, + file='tests/data/iot.csv', + timestamp_column='time', tag_columns=["name"]) + client.flush() + + result = client.query(query=f"select * from {measurement}", mode="pandas") + self.assertIsNotNone(result) + self.assertEqual(3, len(result.get('building'))) + self.assertEqual(3, len(result.get('temperature'))) + def test_write_and_query(self): test_id = time.time_ns() self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") diff --git a/tests/test_polars.py b/tests/test_polars.py index 17ccf14..70571c9 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -96,6 +96,4 @@ def test_write_polars_batching(self): async_req=ANY, content_type=ANY, urlopen_kw=ANY, - data_frame_measurement_name='measurement', - data_frame_timestamp_column='time', body=b'measurement temperature=22.4 1722470400000000000\nmeasurement temperature=21.8 1722474000000000000')