Skip to content

Commit bb9625c

Browse files
committed
sync
1 parent 29048b8 commit bb9625c

File tree

10 files changed

+83
-37
lines changed

10 files changed

+83
-37
lines changed

tests/__init__.py

Whitespace-only changes.

tests/conftest.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,18 @@ async def topic_path(driver, topic_consumer, database) -> str:
160160

161161
@pytest.fixture()
162162
@pytest.mark.asyncio()
163-
async def topic_with_messages(driver, topic_path):
163+
async def topic_with_messages(driver, topic_consumer, database):
164+
topic_path = database + "/test-topic-with-messages"
165+
try:
166+
await driver.topic_client.drop_topic(topic_path)
167+
except issues.SchemeError:
168+
pass
169+
170+
await driver.topic_client.create_topic(
171+
path=topic_path,
172+
consumers=[topic_consumer],
173+
)
174+
164175
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id", codec=ydb.TopicCodec.RAW)
165176
await writer.write_with_ack(
166177
[
@@ -175,6 +186,7 @@ async def topic_with_messages(driver, topic_path):
175186
]
176187
)
177188
await writer.close()
189+
return topic_path
178190

179191

180192
@pytest.fixture()

tests/ssl/__init__.py

Whitespace-only changes.

tests/table/__init__.py

Whitespace-only changes.

tests/topics/__init__.py

Whitespace-only changes.

tests/topics/test_topic_reader.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
@pytest.mark.asyncio
77
class TestTopicReaderAsyncIO:
8-
async def test_read_batch(self, driver, topic_path, topic_with_messages, topic_consumer):
9-
reader = driver.topic_client.reader(topic_path, topic_consumer)
8+
async def test_read_batch(self, driver, topic_with_messages, topic_consumer):
9+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
1010
batch = await reader.receive_batch()
1111

1212
assert batch is not None
@@ -18,30 +18,30 @@ async def test_link_to_client(self, driver, topic_path, topic_consumer):
1818
reader = driver.topic_client.reader(topic_path, topic_consumer)
1919
assert reader._parent is driver.topic_client
2020

21-
async def test_read_message(self, driver, topic_path, topic_with_messages, topic_consumer):
22-
reader = driver.topic_client.reader(topic_path, topic_consumer)
21+
async def test_read_message(self, driver, topic_with_messages, topic_consumer):
22+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
2323
msg = await reader.receive_message()
2424

2525
assert msg is not None
2626
assert msg.seqno
2727

2828
await reader.close()
2929

30-
async def test_read_and_commit_with_close_reader(self, driver, topic_path, topic_with_messages, topic_consumer):
31-
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
30+
async def test_read_and_commit_with_close_reader(self, driver, topic_with_messages, topic_consumer):
31+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3232
message = await reader.receive_message()
3333
reader.commit(message)
3434

35-
async with driver.topic_client.reader(topic_path, topic_consumer) as reader:
35+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3636
message2 = await reader.receive_message()
3737
assert message != message2
3838

39-
async def test_read_and_commit_with_ack(self, driver, topic_path, topic_with_messages, topic_consumer):
40-
reader = driver.topic_client.reader(topic_path, topic_consumer)
39+
async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic_consumer):
40+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
4141
batch = await reader.receive_batch()
4242
await reader.commit_with_ack(batch)
4343

44-
reader = driver.topic_client.reader(topic_path, topic_consumer)
44+
reader = driver.topic_client.reader(topic_with_messages, topic_consumer)
4545
batch2 = await reader.receive_batch()
4646
assert batch.messages[0] != batch2.messages[0]
4747

@@ -73,8 +73,8 @@ def decode(b: bytes):
7373

7474

7575
class TestTopicReaderSync:
76-
def test_read_batch(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
77-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
76+
def test_read_batch(self, driver_sync, topic_with_messages, topic_consumer):
77+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
7878
batch = reader.receive_batch()
7979

8080
assert batch is not None
@@ -86,30 +86,30 @@ def test_link_to_client(self, driver_sync, topic_path, topic_consumer):
8686
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
8787
assert reader._parent is driver_sync.topic_client
8888

89-
def test_read_message(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
90-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
89+
def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
90+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
9191
msg = reader.receive_message()
9292

9393
assert msg is not None
9494
assert msg.seqno
9595

9696
reader.close()
9797

98-
def test_read_and_commit_with_close_reader(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
99-
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
98+
def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_messages, topic_consumer):
99+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
100100
message = reader.receive_message()
101101
reader.commit(message)
102102

103-
with driver_sync.topic_client.reader(topic_path, topic_consumer) as reader:
103+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
104104
message2 = reader.receive_message()
105105
assert message != message2
106106

107-
def test_read_and_commit_with_ack(self, driver_sync, topic_path, topic_with_messages, topic_consumer):
108-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
107+
def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_consumer):
108+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
109109
batch = reader.receive_batch()
110110
reader.commit_with_ack(batch)
111111

112-
reader = driver_sync.topic_client.reader(topic_path, topic_consumer)
112+
reader = driver_sync.topic_client.reader(topic_with_messages, topic_consumer)
113113
batch2 = reader.receive_batch()
114114
assert batch.messages[0] != batch2.messages[0]
115115

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,14 +274,15 @@ async def to_thread(func, /, *args, **kwargs):
274274
return await loop.run_in_executor(None, func_call)
275275

276276

277-
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> ProtoDuration:
277+
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> Optional[ProtoDuration]:
278278
if t is None:
279279
return None
280+
280281
res = ProtoDuration()
281282
res.FromTimedelta(t)
282283

283284

284-
def proto_timestamp_from_datetime(t: Optional[datetime.datetime]) -> ProtoTimeStamp:
285+
def proto_timestamp_from_datetime(t: Optional[datetime.datetime]) -> Optional[ProtoTimeStamp]:
285286
if t is None:
286287
return None
287288

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -410,17 +410,23 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
410410
class TopicReadSettings(IToProto):
411411
path: str
412412
partition_ids: List[int] = field(default_factory=list)
413-
max_lag_seconds: Union[datetime.timedelta, None] = None
414-
read_from: Union[int, float, datetime.datetime, None] = None
413+
max_lag: Optional[datetime.timedelta] = None
414+
read_from: Optional[datetime.datetime] = None
415415

416416
def to_proto(
417417
self,
418418
) -> ydb_topic_pb2.StreamReadMessage.InitRequest.TopicReadSettings:
419419
res = ydb_topic_pb2.StreamReadMessage.InitRequest.TopicReadSettings()
420420
res.path = self.path
421421
res.partition_ids.extend(self.partition_ids)
422-
if self.max_lag_seconds is not None:
423-
res.max_lag = proto_duration_from_timedelta(self.max_lag_seconds)
422+
max_lag = proto_duration_from_timedelta(self.max_lag)
423+
if max_lag is not None:
424+
res.max_lag = max_lag
425+
426+
read_from = proto_timestamp_from_datetime(self.read_from)
427+
if read_from is not None:
428+
res.read_from = read_from
429+
424430
return res
425431

426432
@dataclass

ydb/_topic_reader/topic_reader.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,39 @@
1414
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1515

1616

17-
class Selector:
17+
class PublicTopicSelector:
1818
path: str
1919
partitions: Union[None, int, List[int]]
20-
read_from_timestamp_ms: Optional[int]
21-
max_time_lag_ms: Optional[int]
20+
read_from: Optional[datetime.datetime]
21+
max_lag: Optional[datetime.timedelta]
2222

2323
def __init__(self, path, *, partitions: Union[None, int, List[int]] = None):
2424
self.path = path
2525
self.partitions = partitions
2626

27+
def _to_topic_read_settings(self)->StreamReadMessage.InitRequest.TopicReadSettings:
28+
partitions = self.partitions
29+
if partitions is None:
30+
partitions = []
31+
32+
elif not isinstance(partitions, list):
33+
partitions = [partitions]
34+
35+
return StreamReadMessage.InitRequest.TopicReadSettings(
36+
path=self.path,
37+
partition_ids=partitions,
38+
max_lag=self.max_lag,
39+
read_from=self.read_from,
40+
)
41+
42+
43+
TopicSelectorTypes = Union[str, PublicTopicSelector, List[Union[str, PublicTopicSelector]]]
44+
2745

2846
@dataclass
2947
class PublicReaderSettings:
3048
consumer: str
31-
topic: str
49+
topic: TopicSelectorTypes
3250
buffer_size_bytes: int = 50 * 1024 * 1024
3351

3452
decoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
@@ -39,12 +57,21 @@ class PublicReaderSettings:
3957
update_token_interval: Union[int, float] = 3600
4058

4159
def _init_message(self) -> StreamReadMessage.InitRequest:
60+
if isinstance(self.topic, list):
61+
selectors = self.topic
62+
else:
63+
selectors = [self.topic]
64+
65+
for index, selector in enumerate(selectors):
66+
if isinstance(selector, str):
67+
selectors[index] = PublicTopicSelector(path=selector)
68+
elif isinstance(selector, PublicTopicSelector):
69+
pass
70+
else:
71+
raise TypeError("Value of %s not supported as topic selector" % type(selector))
72+
4273
return StreamReadMessage.InitRequest(
43-
topics_read_settings=[
44-
StreamReadMessage.InitRequest.TopicReadSettings(
45-
path=self.topic,
46-
)
47-
],
74+
topics_read_settings=list(map(PublicTopicSelector._to_topic_read_settings, selectors)), # type: ignore
4875
consumer=self.consumer,
4976
)
5077

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ async def test_init_reader(self, stream, default_reader_settings):
576576
StreamReadMessage.InitRequest.TopicReadSettings(
577577
path="/local/test-topic",
578578
partition_ids=[],
579-
max_lag_seconds=None,
579+
max_lag=None,
580580
read_from=None,
581581
)
582582
],

0 commit comments

Comments
 (0)