Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.18.0 [unreleased]

### Bug Fixes

1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` fail with batching mode.

## 0.17.0 [2026-01-08]

### Features
Expand Down
9 changes: 4 additions & 5 deletions influxdb_client_3/write_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,9 @@ def write(self, bucket: str, org: str = None,

_async_req = True if self._write_options.write_type == WriteType.asynchronous else False

# Filter out serializer-specific kwargs before passing to _post_write
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}

def write_payload(payload):
final_string = b'\n'.join(payload[1])
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs)
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs)

results = list(map(write_payload, payloads.items()))
if not _async_req:
Expand Down Expand Up @@ -586,11 +583,13 @@ def _retry_callback_delegate(exception):
return _BatchResponse(data=batch_item)

def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
# Filter out serializer-specific kwargs before passing to _post_write
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
no_sync=no_sync,
async_req=_async_req,
content_type="text/plain; charset=utf-8",
**kwargs)
**http_kwargs)

def _to_response(self, data: _BatchItem, delay: timedelta):

Expand Down
67 changes: 65 additions & 2 deletions tests/test_influxdb_client_3_integration.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import logging
import os
import pyarrow
import pytest
import random
import string
import time
import unittest

import pandas as pd
import pyarrow
import pytest
from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError

from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \
Expand Down Expand Up @@ -48,6 +49,68 @@ def tearDown(self):
if self.client:
self.client.close()

def test_write_dataframe(self):
measurement = f'test{random_hex(3)}'.lower()
df = pd.DataFrame({
'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
'city': ['London', 'Paris'],
'temperature': [15.0, 18.5]
})
self.client.write_dataframe(df, measurement=measurement, timestamp_column='time', tags=['city'])
self.client.flush()

result = self.client.query(query=f"select * from {measurement}", mode="pandas")

self.assertIsNotNone(result)
self.assertEqual(2, len(result.get('city')))
self.assertEqual(2, len(result.get('temperature')))

def test_write_dataframe_with_batch(self):
self.client = InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=write_client_options(
write_options=WriteOptions(batch_size=100)
))
measurement = f'test{random_hex(3)}'.lower()
df = pd.DataFrame({
'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
'city': ['London', 'Paris'],
'temperature': [15.0, 18.5]
})
self.client.write_dataframe(
df,
measurement=measurement,
timestamp_column='time',
tags=['city']
)
self.client.flush()

result = self.client.query(query=f"select * from {measurement}", mode="pandas")

self.assertIsNotNone(result)
self.assertEqual(2, len(result.get('city')))
self.assertEqual(2, len(result.get('temperature')))

def test_write_csv_file_with_batch(self):
client = InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
write_client_options=write_client_options(
write_options=WriteOptions(batch_size=100)
))
measurement = f'test{random_hex(3)}'.lower()
client.write_file(
measurement_name=measurement,
file='tests/data/iot.csv',
timestamp_column='time', tag_columns=["name"])
client.flush()

result = client.query(query=f"select * from {measurement}", mode="pandas")
self.assertIsNotNone(result)
self.assertEqual(3, len(result.get('building')))
self.assertEqual(3, len(result.get('temperature')))

def test_write_and_query(self):
test_id = time.time_ns()
self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i")
Expand Down
2 changes: 0 additions & 2 deletions tests/test_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,4 @@ def test_write_polars_batching(self):
async_req=ANY,
content_type=ANY,
urlopen_kw=ANY,
data_frame_measurement_name='measurement',
data_frame_timestamp_column='time',
body=b'measurement temperature=22.4 1722470400000000000\nmeasurement temperature=21.8 1722474000000000000')