Skip to content

Commit 6f0b65a

Browse files
author
Valeriya Popova
committed
topic-writer: fix default flush on close parameter, renaming
1 parent 6307e59 commit 6f0b65a

File tree

2 files changed

+23
-19
lines changed

2 files changed

+23
-19
lines changed

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ def __del__(self):
6565

6666
self._loop.call_soon(self.close)
6767

68-
async def close(self):
68+
async def close(self, *, flush: bool = True):
6969
if self._closed:
7070
return
7171

7272
self._closed = True
7373

74-
await self._reconnector.close()
74+
await self._reconnector.close(flush)
7575

7676
async def write_with_ack(
7777
self,
@@ -109,13 +109,13 @@ async def write_with_ack_future(
109109
For wait with timeout use asyncio.wait_for.
110110
"""
111111
if isinstance(messages, PublicMessage):
112-
futures = await self._reconnector.write_with_ack([messages])
112+
futures = await self._reconnector.write_with_ack_future([messages])
113113
return futures[0]
114114
if isinstance(messages, list):
115115
for m in messages:
116116
if not isinstance(m, PublicMessage):
117117
raise NotImplementedError()
118-
return await self._reconnector.write_with_ack(messages)
118+
return await self._reconnector.write_with_ack_future(messages)
119119
raise NotImplementedError()
120120

121121
async def write(
@@ -185,7 +185,7 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
185185
asyncio.create_task(self._connection_loop(), name="connection_loop")
186186
]
187187

188-
async def close(self, flush: bool = True):
188+
async def close(self, flush: bool):
189189
if self._closed:
190190
return
191191

@@ -223,7 +223,7 @@ async def wait_init(self) -> PublicWriterInitInfo:
223223
async def wait_stop(self) -> Exception:
224224
return await self._stop_reason
225225

226-
async def write_with_ack(
226+
async def write_with_ack_future(
227227
self, messages: List[PublicMessage]
228228
) -> List[asyncio.Future]:
229229
# todo check internal buffer limit

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
278278
seqno=2,
279279
created_at=now,
280280
)
281-
await reconnector.write_with_ack([message1, message2])
281+
await reconnector.write_with_ack_future([message1, message2])
282282

283283
# sent to first stream
284284
stream_writer = get_stream_writer()
@@ -300,7 +300,7 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
300300
assert second_sent_msg == expected_messages
301301

302302
second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2))
303-
await reconnector.close()
303+
await reconnector.close(flush=True)
304304

305305
async def test_stop_on_unexpected_exception(
306306
self, reconnector: WriterAsyncIOReconnector, get_stream_writer
@@ -320,7 +320,7 @@ class TestException(Exception):
320320

321321
async def wait_stop():
322322
while True:
323-
await reconnector.write_with_ack([message])
323+
await reconnector.write_with_ack_future([message])
324324
await asyncio.sleep(0.1)
325325

326326
await asyncio.wait_for(wait_stop(), 1)
@@ -363,7 +363,7 @@ async def test_write_message(
363363
data="123",
364364
seqno=3,
365365
)
366-
await reconnector.write_with_ack([message])
366+
await reconnector.write_with_ack_future([message])
367367

368368
sent_messages = await asyncio.wait_for(stream_writer.from_client.get(), 1)
369369
assert sent_messages == [InternalMessage(message)]
@@ -382,8 +382,8 @@ async def test_auto_seq_no(
382382

383383
reconnector = WriterAsyncIOReconnector(default_driver, settings)
384384

385-
await reconnector.write_with_ack([PublicMessage(data="123")])
386-
await reconnector.write_with_ack([PublicMessage(data="456")])
385+
await reconnector.write_with_ack_future([PublicMessage(data="123")])
386+
await reconnector.write_with_ack_future([PublicMessage(data="456")])
387387

388388
stream_writer = get_stream_writer()
389389

@@ -398,22 +398,26 @@ async def test_auto_seq_no(
398398
] == sent
399399

400400
with pytest.raises(TopicWriterError):
401-
await reconnector.write_with_ack(
401+
await reconnector.write_with_ack_future(
402402
[PublicMessage(seqno=last_seq_no + 3, data="123")]
403403
)
404404

405405
await reconnector.close(flush=False)
406406

407407
async def test_deny_double_seqno(self, reconnector: WriterAsyncIOReconnector):
408-
await reconnector.write_with_ack([PublicMessage(seqno=10, data="123")])
408+
await reconnector.write_with_ack_future([PublicMessage(seqno=10, data="123")])
409409

410410
with pytest.raises(TopicWriterError):
411-
await reconnector.write_with_ack([PublicMessage(seqno=9, data="123")])
411+
await reconnector.write_with_ack_future(
412+
[PublicMessage(seqno=9, data="123")]
413+
)
412414

413415
with pytest.raises(TopicWriterError):
414-
await reconnector.write_with_ack([PublicMessage(seqno=10, data="123")])
416+
await reconnector.write_with_ack_future(
417+
[PublicMessage(seqno=10, data="123")]
418+
)
415419

416-
await reconnector.write_with_ack([PublicMessage(seqno=11, data="123")])
420+
await reconnector.write_with_ack_future([PublicMessage(seqno=11, data="123")])
417421

418422
await reconnector.close(flush=False)
419423

@@ -426,7 +430,7 @@ async def test_auto_created_at(
426430
settings = copy.deepcopy(default_settings)
427431
settings.auto_created_at = True
428432
reconnector = WriterAsyncIOReconnector(default_driver, settings)
429-
await reconnector.write_with_ack([PublicMessage(seqno=4, data="123")])
433+
await reconnector.write_with_ack_future([PublicMessage(seqno=4, data="123")])
430434

431435
stream_writer = get_stream_writer()
432436
sent = await stream_writer.from_client.get()
@@ -451,7 +455,7 @@ def __init__(self):
451455
self.futures = []
452456
self.messages_writted = asyncio.Event()
453457

454-
async def write_with_ack(self, messages: typing.List[InternalMessage]):
458+
async def write_with_ack_future(self, messages: typing.List[InternalMessage]):
455459
async with self.lock:
456460
futures = [asyncio.Future() for _ in messages]
457461
self.messages.extend(messages)

0 commit comments

Comments
 (0)