Skip to content

Commit 618f25c

Browse files
author
Valeriya Popova
committed
better tests fot topic_writer/topic_reader
1 parent 805d91f commit 618f25c

File tree

2 files changed

+37
-44
lines changed

2 files changed

+37
-44
lines changed

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,8 @@ def second_partition_session(
125125

126126
return stream_reader_started._partition_sessions[partition_session.id]
127127

128-
@pytest.fixture()
129-
async def stream_reader_started(
130-
self, stream, default_reader_settings, request
131-
) -> ReaderStream:
132-
133-
settings, token_getter = getattr(
134-
request, "param", (default_reader_settings, None)
135-
)
136-
reader = ReaderStream(
137-
self.default_reader_reconnector_id, settings, token_getter
138-
)
128+
async def get_started_reader(self, stream, *args, **kwargs) -> ReaderStream:
129+
reader = ReaderStream(self.default_reader_reconnector_id, *args, **kwargs)
139130
init_message = object()
140131

141132
# noinspection PyTypeChecker
@@ -164,6 +155,12 @@ async def stream_reader_started(
164155

165156
return reader
166157

158+
@pytest.fixture()
159+
async def stream_reader_started(
160+
self, stream, default_reader_settings
161+
) -> ReaderStream:
162+
return await self.get_started_reader(stream, default_reader_settings)
163+
167164
@pytest.fixture()
168165
async def stream_reader(self, stream_reader_started: ReaderStream):
169166
yield stream_reader_started
@@ -1012,21 +1009,16 @@ async def test_receive_batch_nowait(self, stream, stream_reader, partition_sessi
10121009
with pytest.raises(asyncio.QueueEmpty):
10131010
stream.from_client.get_nowait()
10141011

1015-
@pytest.mark.parametrize(
1016-
"stream_reader_started",
1017-
[
1018-
(
1019-
PublicReaderSettings(
1020-
consumer="test-consumer",
1021-
topic="test-topic",
1022-
update_token_interval=0.1,
1023-
),
1024-
lambda: "foo-bar",
1025-
)
1026-
],
1027-
indirect=True,
1028-
)
1029-
async def test_update_token(self, stream, stream_reader_started: ReaderStream):
1012+
async def test_update_token(self, stream):
1013+
settings = PublicReaderSettings(
1014+
consumer="test-consumer",
1015+
topic="test-topic",
1016+
update_token_interval=0.1,
1017+
)
1018+
reader = await self.get_started_reader(
1019+
stream, settings, get_token_function=lambda: "foo-bar"
1020+
)
1021+
10301022
assert stream.from_client.empty()
10311023

10321024
expected = StreamReadMessage.FromClient(UpdateTokenRequest(token="foo-bar"))
@@ -1046,7 +1038,7 @@ async def test_update_token(self, stream, stream_reader_started: ReaderStream):
10461038
got = await wait_for_fast(stream.from_client.get())
10471039
assert expected == got
10481040

1049-
await stream_reader_started.close()
1041+
await reader.close()
10501042

10511043

10521044
@pytest.mark.asyncio

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ def stream(self):
6363
yield stream
6464
stream.close()
6565

66-
@pytest.fixture
67-
async def writer_and_stream(self, stream, request) -> WriterWithMockedStream:
66+
@staticmethod
67+
async def get_started_writer(stream, *args, **kwargs) -> WriterAsyncIOStream:
6868
stream.from_server.put_nowait(
6969
StreamWriteMessage.InitResponse(
7070
last_seq_no=4,
@@ -75,9 +75,7 @@ async def writer_and_stream(self, stream, request) -> WriterWithMockedStream:
7575
)
7676
)
7777

78-
params = getattr(request, "param", ())
79-
writer = WriterAsyncIOStream(*params)
80-
78+
writer = WriterAsyncIOStream(*args, **kwargs)
8179
await writer._start(
8280
stream,
8381
init_message=StreamWriteMessage.InitRequest(
@@ -91,6 +89,11 @@ async def writer_and_stream(self, stream, request) -> WriterWithMockedStream:
9189
),
9290
)
9391
await stream.from_client.get()
92+
return writer
93+
94+
@pytest.fixture
95+
async def writer_and_stream(self, stream) -> WriterWithMockedStream:
96+
writer = await self.get_started_writer(stream)
9497

9598
yield TestWriterAsyncIOStream.WriterWithMockedStream(
9699
stream=stream,
@@ -164,25 +167,23 @@ async def test_write_a_message(self, writer_and_stream: WriterWithMockedStream):
164167
sent_message = await writer_and_stream.stream.from_client.get()
165168
assert expected_message == sent_message
166169

167-
@pytest.mark.parametrize(
168-
"writer_and_stream", [(0.1, lambda: "foo-bar")], indirect=True
169-
)
170-
async def test_update_token(self, writer_and_stream: WriterWithMockedStream):
171-
assert writer_and_stream.stream.from_client.empty()
170+
async def test_update_token(self, stream: StreamMock):
171+
writer = await self.get_started_writer(
172+
stream, update_token_interval=0.1, get_token_function=lambda: "foo-bar"
173+
)
174+
assert stream.from_client.empty()
172175

173176
expected = StreamWriteMessage.FromClient(UpdateTokenRequest(token="foo-bar"))
174-
got = await wait_for_fast(writer_and_stream.stream.from_client.get())
177+
got = await wait_for_fast(stream.from_client.get())
175178
assert expected == got, "send update token request"
176179

177180
await asyncio.sleep(0.2)
178-
assert (
179-
writer_and_stream.stream.from_client.empty()
180-
), "no answer - no new update request"
181+
assert stream.from_client.empty(), "no answer - no new update request"
181182

182-
await writer_and_stream.stream.from_server.put(UpdateTokenResponse())
183-
receive_task = asyncio.create_task(writer_and_stream.writer.receive())
183+
await stream.from_server.put(UpdateTokenResponse())
184+
receive_task = asyncio.create_task(writer.receive())
184185

185-
got = await wait_for_fast(writer_and_stream.stream.from_client.get())
186+
got = await wait_for_fast(stream.from_client.get())
186187
assert expected == got
187188

188189
receive_task.cancel()

0 commit comments

Comments
 (0)