Skip to content

Commit c6e00e6

Browse files
committed
Implement async context manager protocol for AIO clients
When AIOProducer or AIOConsumer is used as the target of a `with:` statement, automatically close the connection at the end of the context.
1 parent 97b0dd1 commit c6e00e6

4 files changed

Lines changed: 37 additions & 0 deletions

File tree

requirements/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
typing-extensions; python_version < "3.11"

src/confluent_kafka/aio/_AIOConsumer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
import concurrent.futures
1717
from typing import Any, Callable, Dict, Optional, Tuple
1818

19+
try:
20+
from typing import Self
21+
except ImportError:
22+
# FIXME: remove once we depend on Python >= 3.11
23+
from typing_extensions import Self
24+
1925
import confluent_kafka
2026

2127
from . import _common as _common
@@ -46,6 +52,12 @@ def __init__(
4652

4753
self._consumer: confluent_kafka.Consumer = confluent_kafka.Consumer(consumer_conf)
4854

55+
async def __aenter__(self) -> Self:
56+
return self
57+
58+
async def __aexit__(self, *_) -> None:
59+
await self.close()
60+
4961
async def _call(self, blocking_task: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
5062
return await _common.async_call(self.executor, blocking_task, *args, **kwargs)
5163

src/confluent_kafka/aio/producer/_AIOProducer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
import logging
1818
from typing import Any, Callable, Dict, Optional
1919

20+
try:
21+
from typing import Self
22+
except ImportError:
23+
# FIXME: remove once we depend on Python >= 3.11
24+
from typing_extensions import Self
25+
2026
import confluent_kafka
2127

2228
from .. import _common as _common
@@ -70,6 +76,12 @@ def __init__(
7076
if buffer_timeout > 0:
7177
self._buffer_timeout_manager.start_timeout_monitoring()
7278

79+
async def __aenter__(self) -> Self:
80+
return self
81+
82+
async def __aexit__(self, *_) -> None:
83+
await self.close()
84+
7385
async def close(self) -> None:
7486
"""Close the producer and cleanup resources
7587

tests/test_AIOProducer.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ async def test_close_method(self, mock_producer, mock_common, basic_config):
8080
await producer2.close()
8181
assert producer2._is_closed is True
8282

83+
@pytest.mark.asyncio
84+
async def test_async_context_manager(self, mock_producer, mock_common, basic_config):
85+
with AIOProducer(basic_config) as producer:
86+
assert producer._is_closed is False
87+
assert producer._is_closed is True
88+
89+
with AIOProducer(basic_config) as producer2:
90+
assert producer2._is_closed is False
91+
await producer2.close()
92+
await producer2.close()
93+
assert producer2._is_closed is True
94+
8395
@pytest.mark.asyncio
8496
async def test_call_method_executor_usage(self, mock_producer, mock_common, basic_config):
8597
producer = AIOProducer(basic_config)

0 commit comments

Comments
 (0)