Skip to content

Commit b839bd3

Browse files
committed
added support many topics in one reader
1 parent bb9625c commit b839bd3

File tree

6 files changed

+56
-12
lines changed

6 files changed

+56
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added support to set many topics and topic reader settings for read in one reader
2+
13
## 3.2.2 ##
24
* Fixed set keep_in_cache algorithm
35

tests/conftest.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,24 @@ async def topic_path(driver, topic_consumer, database) -> str:
158158
return topic_path
159159

160160

161+
@pytest.fixture()
162+
@pytest.mark.asyncio()
163+
async def topic2_path(driver, topic_consumer, database) -> str:
164+
topic_path = database + "/test-topic2"
165+
166+
try:
167+
await driver.topic_client.drop_topic(topic_path)
168+
except issues.SchemeError:
169+
pass
170+
171+
await driver.topic_client.create_topic(
172+
path=topic_path,
173+
consumers=[topic_consumer],
174+
)
175+
176+
return topic_path
177+
178+
161179
@pytest.fixture()
162180
@pytest.mark.asyncio()
163181
async def topic_with_messages(driver, topic_consumer, database):

tests/topics/test_topic_reader.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,31 @@ def decode(b: bytes):
7171
batch = await reader.receive_batch()
7272
assert batch.messages[0].data.decode() == "123"
7373

74+
async def test_read_from_two_topics(self, driver, topic_path, topic2_path, topic_consumer):
75+
async with driver.topic_client.writer(topic_path) as writer:
76+
await writer.write("1")
77+
await writer.flush()
78+
79+
async with driver.topic_client.writer(topic2_path) as writer:
80+
await writer.write("2")
81+
await writer.flush()
82+
83+
messages = []
84+
async with driver.topic_client.reader(
85+
[
86+
topic_path,
87+
ydb.TopicReaderSelector(path=topic2_path),
88+
],
89+
consumer=topic_consumer,
90+
) as reader:
91+
for _ in range(2):
92+
message = await reader.receive_message()
93+
messages.append(message)
94+
95+
messages = [message.data.decode() for message in messages]
96+
messages.sort()
97+
assert messages == ["1", "2"]
98+
7499

75100
class TestTopicReaderSync:
76101
def test_read_batch(self, driver_sync, topic_with_messages, topic_consumer):

ydb/_topic_reader/topic_reader.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,14 @@
1414
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1515

1616

17+
@dataclass
1718
class PublicTopicSelector:
1819
path: str
19-
partitions: Union[None, int, List[int]]
20-
read_from: Optional[datetime.datetime]
21-
max_lag: Optional[datetime.timedelta]
22-
23-
def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
24-
self.path = path
25-
self.partitions = partitions
20+
partitions: Union[None, int, List[int]] = None
21+
read_from: Optional[datetime.datetime] = None
22+
max_lag: Optional[datetime.timedelta] = None
2623

27-
def _to_topic_read_settings(self)->StreamReadMessage.InitRequest.TopicReadSettings:
24+
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
2825
partitions = self.partitions
2926
if partitions is None:
3027
partitions = []
@@ -71,7 +68,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
7168
raise TypeError("Value of %s not supported as topic selector" % type(selector))
7269

7370
return StreamReadMessage.InitRequest(
74-
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
71+
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
7572
consumer=self.consumer,
7673
)
7774

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ async def _connection_loop(self):
164164
attempt = 0
165165
self._state_changed.set()
166166
await self._stream_reader.wait_error()
167-
except issues.Error as err:
167+
except BaseException as err:
168168
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
169169
if not retry_info.is_retriable:
170170
self._set_first_error(err)

ydb/topic.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"TopicMeteringMode",
1212
"TopicReader",
1313
"TopicReaderAsyncIO",
14+
"TopicReaderSelector",
1415
"TopicReaderSettings",
1516
"TopicStatWindow",
1617
"TopicWriter",
@@ -30,6 +31,7 @@
3031

3132
from ._topic_reader.topic_reader import (
3233
PublicReaderSettings as TopicReaderSettings,
34+
PublicTopicSelector as TopicReaderSelector,
3335
)
3436

3537
from ._topic_reader.topic_reader_sync import TopicReaderSync as TopicReader
@@ -151,7 +153,7 @@ async def drop_topic(self, path: str):
151153

152154
def reader(
153155
self,
154-
topic: str,
156+
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
155157
consumer: str,
156158
buffer_size_bytes: int = 50 * 1024 * 1024,
157159
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes
@@ -300,7 +302,7 @@ def drop_topic(self, path: str):
300302

301303
def reader(
302304
self,
303-
topic: str,
305+
topic: Union[str, TopicReaderSelector, List[Union[str, TopicReaderSelector]]],
304306
consumer: str,
305307
buffer_size_bytes: int = 50 * 1024 * 1024,
306308
# decoders: map[codec_code] func(encoded_bytes)->decoded_bytes

0 commit comments

Comments
 (0)