Skip to content

Commit dda52b9

Browse files
committed
Fix hang while write messages to closed driver.
Close #296
1 parent 9097cf4 commit dda52b9

File tree

4 files changed

+22
-15
lines changed

4 files changed

+22
-15
lines changed

tests/topics/test_topic_writer.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from __future__ import annotations
2+
3+
import asyncio
24
from typing import List
35

46
import pytest
@@ -120,11 +122,12 @@ class TestException(Exception):
120122

121123
with pytest.raises(TestException):
122124
async with driver.topic_client.writer(topic_path) as writer:
123-
driver.stop() # will raise exception on topic writer __exit__
124-
try:
125-
writer.write("123")
126-
except ydb.Error:
127-
pass
125+
await writer.wait_init()
126+
await driver.stop() # will raise exception on topic writer __exit__
127+
128+
# ensure writer has exception internally
129+
with pytest.raises((ydb.Error, asyncio.CancelledError)):
130+
await writer.write_with_ack("123")
128131

129132
raise TestException()
130133

@@ -257,10 +260,11 @@ class TestException(Exception):
257260

258261
with pytest.raises(TestException):
259262
with driver_sync.topic_client.writer(topic_path) as writer:
263+
writer.wait_init()
260264
driver_sync.stop() # will raise exception on topic writer __exit__
261-
try:
262-
writer.write("123")
263-
except ydb.Error:
264-
pass
265+
266+
# ensure writer has exception internally
267+
with pytest.raises(ydb.Error):
268+
writer.write_with_ack("123")
265269

266270
raise TestException()

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ async def __aenter__(self) -> "WriterAsyncIO":
6565
return self
6666

6767
async def __aexit__(self, exc_type, exc_val, exc_tb):
68-
await self.close()
68+
try:
69+
await self.close()
70+
except BaseException as e:
71+
if exc_val is None:
72+
raise
6973

7074
def __del__(self):
7175
if self._closed or self._loop.is_closed():

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ def __enter__(self):
5959
def __exit__(self, exc_type, exc_val, exc_tb):
6060
try:
6161
self.close()
62-
except ydb.Error:
63-
if exc_val:
64-
raise exc_val
65-
raise
62+
except BaseException as e:
63+
if exc_val is None:
64+
raise
6665

6766
def __del__(self):
6867
self.close(flush=False)

ydb/topic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ def writer(
337337
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
338338
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
339339
) -> TopicWriter:
340-
args = locals().copy()
340+
args = locals()
341341
del args["self"]
342342
self._check_closed()
343343

0 commit comments

Comments
 (0)