From 7553ff021e25ce572d782d928b0a1eda9e415b35 Mon Sep 17 00:00:00 2001 From: Leo Singer Date: Wed, 28 Jan 2026 22:36:46 -0500 Subject: [PATCH] Implement async context manager protocol for AIO clients When AIOProducer or AIOConsumer is used as the target of a `with:` statement, automatically close the connection at the end of the context. --- CHANGELOG.md | 8 ++++++++ requirements/requirements.txt | 1 + src/confluent_kafka/aio/_AIOConsumer.py | 9 +++++++++ src/confluent_kafka/aio/producer/_AIOProducer.py | 9 +++++++++ tests/test_AIOConsumer.py | 6 ++++++ tests/test_AIOProducer.py | 12 ++++++++++++ 6 files changed, 45 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46111c5ba..4d2a5f3c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Confluent Python Client for Apache Kafka - CHANGELOG +## v2.13.3 - unreleased + +### Enhancements + +- Implement async context manager protocol for AIOProducer and AIOConsumer (#2180) + +### Fixes + ## v2.13.2 - 2026-03-02 v2.13.2 is a maintenance release with the following fixes and enhancements: diff --git a/requirements/requirements.txt b/requirements/requirements.txt index e69de29bb..fa6e39aa2 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -0,0 +1 @@ +typing-extensions; python_version < "3.11" diff --git a/src/confluent_kafka/aio/_AIOConsumer.py b/src/confluent_kafka/aio/_AIOConsumer.py index 0ff986a62..fb0157070 100644 --- a/src/confluent_kafka/aio/_AIOConsumer.py +++ b/src/confluent_kafka/aio/_AIOConsumer.py @@ -16,6 +16,9 @@ import concurrent.futures from typing import Any, Callable, Dict, Optional, Tuple +# FIXME: import from typing once we depend on Python >= 3.11 +from typing_extensions import Self + import confluent_kafka from . import _common as _common @@ -46,6 +49,12 @@ def __init__( self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(consumer_conf) + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *_) -> None: + await self.close() + async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: return await _common.async_call(self.executor, blocking_task, *args, **kwargs) diff --git a/src/confluent_kafka/aio/producer/_AIOProducer.py b/src/confluent_kafka/aio/producer/_AIOProducer.py index a5fc28727..adf6f86b2 100644 --- a/src/confluent_kafka/aio/producer/_AIOProducer.py +++ b/src/confluent_kafka/aio/producer/_AIOProducer.py @@ -17,6 +17,9 @@ import logging from typing import Any, Callable, Dict, Optional +# FIXME: import from typing once we depend on Python >= 3.11 +from typing_extensions import Self + import confluent_kafka from .. import _common as _common @@ -70,6 +73,12 @@ def __init__( if buffer_timeout > 0: self._buffer_timeout_manager.start_timeout_monitoring() + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *_) -> None: + await self.close() + async def close(self) -> None: """Close the producer and cleanup resources diff --git a/tests/test_AIOConsumer.py b/tests/test_AIOConsumer.py index 61ff6ae80..48c92c776 100644 --- a/tests/test_AIOConsumer.py +++ b/tests/test_AIOConsumer.py @@ -165,3 +165,9 @@ async def test_network_error_handling(self, mock_consumer, mock_common, basic_co await consumer.poll(timeout=1.0) assert exc_info.value.args[0].code() == KafkaError._TRANSPORT + + @pytest.mark.asyncio + async def test_async_context_manager(self, mock_consumer, mock_common, basic_config): + """Test AIOConsumer handles network errors gracefully.""" + async with AIOConsumer(basic_config) as _: + pass diff --git a/tests/test_AIOProducer.py b/tests/test_AIOProducer.py index 6cfa55833..0db882288 100644 --- a/tests/test_AIOProducer.py +++ b/tests/test_AIOProducer.py @@ -80,6 +80,18 @@ async def test_close_method(self, mock_producer, mock_common, basic_config): await producer2.close() assert producer2._is_closed is True + @pytest.mark.asyncio + async def test_async_context_manager(self, mock_producer, mock_common, basic_config): + async with AIOProducer(basic_config) as producer: + assert producer._is_closed is False + assert producer._is_closed is True + + async with AIOProducer(basic_config) as producer2: + assert producer2._is_closed is False + await producer2.close() + await producer2.close() + assert producer2._is_closed is True + @pytest.mark.asyncio async def test_call_method_executor_usage(self, mock_producer, mock_common, basic_config): producer = AIOProducer(basic_config)