Skip to content

Commit dbc22a1

Browse files
committed
add topic control plane async
1 parent 5a107cc commit dbc22a1

File tree

12 files changed

+853
-61
lines changed

12 files changed

+853
-61
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Add control plane operations for topic api: create, drop
2+
13
## 3.0.1b4 ##
24
* Initial implementation of topic reader
35

tests/conftest.py

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pytest
55
import ydb
66
import time
7-
import subprocess
7+
from ydb import issues
88

99

1010
@pytest.fixture(autouse=True, scope="session")
@@ -105,30 +105,21 @@ def topic_consumer():
105105

106106

107107
@pytest.fixture()
108-
def topic_path(endpoint, topic_consumer) -> str:
109-
subprocess.run(
110-
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic"""
111-
% endpoint,
112-
shell=True,
113-
capture_output=True,
114-
)
115-
res = subprocess.run(
116-
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic"""
117-
% endpoint,
118-
shell=True,
119-
capture_output=True,
120-
)
121-
assert res.returncode == 0, res.stderr + res.stdout
108+
@pytest.mark.asyncio()
109+
async def topic_path(driver, topic_consumer, database) -> str:
110+
topic_path = database + "/test-topic"
111+
112+
try:
113+
await driver.topic_client.drop_topic(topic_path)
114+
except issues.SchemeError:
115+
pass
122116

123-
res = subprocess.run(
124-
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic consumer add --consumer %s /local/test-topic"""
125-
% (endpoint, topic_consumer),
126-
shell=True,
127-
capture_output=True,
117+
await driver.topic_client.create_topic(
118+
path=topic_path,
119+
consumers=[topic_consumer],
128120
)
129-
assert res.returncode == 0, res.stderr + res.stdout
130121

131-
return "/local/test-topic"
122+
return topic_path
132123

133124

134125
@pytest.fixture()

tests/topics/test_control_plane.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import os.path
2+
3+
import pytest
4+
5+
from ydb import issues
6+
7+
8+
@pytest.mark.asyncio
9+
class TestTopicClientControlPlaneAsyncIO:
10+
async def test_create_topic(self, driver, database):
11+
client = driver.topic_client
12+
13+
topic_path = database + "/my-test-topic"
14+
15+
await client.create_topic(topic_path)
16+
17+
with pytest.raises(issues.SchemeError):
18+
# double create is ok - try create topic with bad path
19+
await client.create_topic(database)
20+
21+
async def test_drop_topic(self, driver, topic_path):
22+
client = driver.topic_client
23+
24+
await client.drop_topic(topic_path)
25+
26+
with pytest.raises(issues.SchemeError):
27+
await client.drop_topic(topic_path)
28+
29+
async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
30+
res = await driver.topic_client.describe(topic_path)
31+
32+
assert res.self.name == os.path.basename(topic_path)
33+
34+
has_consumer = False
35+
for consumer in res.consumers:
36+
if consumer.name == topic_consumer:
37+
has_consumer = True
38+
break
39+
40+
assert has_consumer

ydb/_apis.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,8 @@ class TableService(object):
103103
class TopicService(object):
104104
Stub = ydb_topic_v1_pb2_grpc.TopicServiceStub
105105

106+
CreateTopic = "CreateTopic"
107+
DescribeTopic = "DescribeTopic"
108+
DropTopic = "DropTopic"
106109
StreamRead = "StreamRead"
107110
StreamWrite = "StreamWrite"

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,25 @@
1+
from __future__ import annotations
2+
13
import abc
24
import asyncio
5+
import datetime
36
import typing
7+
from typing import (
8+
Optional,
9+
Any,
10+
Iterator,
11+
AsyncIterator,
12+
Callable,
13+
Iterable,
14+
Union,
15+
Coroutine,
16+
)
417
from dataclasses import dataclass
518

619
import grpc
720
from google.protobuf.message import Message
21+
from google.protobuf.duration_pb2 import Duration as ProtoDuration
22+
from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp
823

924
import ydb.aio
1025

@@ -21,14 +36,35 @@
2136
class IFromProto(abc.ABC):
2237
@staticmethod
2338
@abc.abstractmethod
24-
def from_proto(msg: Message) -> typing.Any:
25-
pass
39+
def from_proto(msg: Message) -> Any:
40+
...
41+
42+
43+
class IFromProtoWithProtoType(IFromProto):
44+
@staticmethod
45+
@abc.abstractmethod
46+
def empty_proto_message() -> Message:
47+
...
2648

2749

2850
class IToProto(abc.ABC):
2951
@abc.abstractmethod
3052
def to_proto(self) -> Message:
31-
pass
53+
...
54+
55+
56+
class IFromPublic(abc.ABC):
57+
58+
@staticmethod
59+
@abc.abstractmethod
60+
def from_public(o: typing.Any) -> typing.Any:
61+
...
62+
63+
64+
class IToPublic(abc.ABC):
65+
@abc.abstractmethod
66+
def to_public(self) -> typing.Any:
67+
...
3268

3369

3470
class UnknownGrpcMessageError(issues.Error):
@@ -76,7 +112,7 @@ def __next__(self):
76112

77113

78114
class SyncIteratorToAsyncIterator:
79-
def __init__(self, sync_iterator: typing.Iterator):
115+
def __init__(self, sync_iterator: Iterator):
80116
self._sync_iterator = sync_iterator
81117

82118
def __aiter__(self):
@@ -92,21 +128,21 @@ async def __anext__(self):
92128

93129
class IGrpcWrapperAsyncIO(abc.ABC):
94130
@abc.abstractmethod
95-
async def receive(self) -> typing.Any:
131+
async def receive(self) -> Any:
96132
...
97133

98134
@abc.abstractmethod
99135
def write(self, wrap_message: IToProto):
100136
...
101137

102138

103-
SupportedDriverType = typing.Union[ydb.Driver, ydb.aio.Driver]
139+
SupportedDriverType = Union[ydb.Driver, ydb.aio.Driver]
104140

105141

106142
class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):
107143
from_client_grpc: asyncio.Queue
108-
from_server_grpc: typing.AsyncIterator
109-
convert_server_grpc_to_wrapper: typing.Callable[[typing.Any], typing.Any]
144+
from_server_grpc: AsyncIterator
145+
convert_server_grpc_to_wrapper: Callable[[Any], Any]
110146
_connection_state: str
111147

112148
def __init__(self, convert_server_grpc_to_wrapper):
@@ -140,7 +176,7 @@ async def _start_sync_driver(self, driver: ydb.Driver, stub, method):
140176
)
141177
self.from_server_grpc = SyncIteratorToAsyncIterator(stream_call.__iter__())
142178

143-
async def receive(self) -> typing.Any:
179+
async def receive(self) -> Any:
144180
# todo handle grpc exceptions and convert it to internal exceptions
145181
try:
146182
grpc_message = await self.from_server_grpc.__anext__()
@@ -168,7 +204,7 @@ class ServerStatus(IFromProto):
168204
def __init__(
169205
self,
170206
status: issues.StatusCode,
171-
issues: typing.Iterable[typing.Any],
207+
issues: Iterable[Any],
172208
):
173209
self.status = status
174210
self.issues = issues
@@ -178,7 +214,7 @@ def __str__(self):
178214

179215
@staticmethod
180216
def from_proto(
181-
msg: typing.Union[
217+
msg: Union[
182218
ydb_topic_pb2.StreamReadMessage.FromServer,
183219
ydb_topic_pb2.StreamWriteMessage.FromServer,
184220
]
@@ -198,11 +234,37 @@ def issue_to_str(cls, issue: ydb_issue_message_pb2.IssueMessage):
198234

199235

200236
def callback_from_asyncio(
201-
callback: typing.Union[typing.Callable, typing.Coroutine]
237+
callback: Union[Callable, Coroutine]
202238
) -> [asyncio.Future, asyncio.Task]:
203239
loop = asyncio.get_running_loop()
204240

205241
if asyncio.iscoroutinefunction(callback):
206242
return loop.create_task(callback())
207243
else:
208244
return loop.run_in_executor(None, callback)
245+
246+
247+
def proto_duration_from_timedelta(t: Optional[datetime.timedelta]) -> ProtoDuration:
248+
if t is None:
249+
return None
250+
res = ProtoDuration()
251+
res.FromTimedelta(t)
252+
253+
254+
def proto_timestamp_from_datetime(t: Optional[datetime.datetime]) -> ProtoTimeStamp:
255+
if t is None:
256+
return None
257+
258+
res = ProtoTimeStamp()
259+
res.FromDatetime(t)
260+
261+
262+
def datetime_from_proto_timestamp(ts: Optional[ProtoTimeStamp]) -> Optional[datetime.datetime]:
263+
if ts is None:
264+
return None
265+
return ts.ToDatetime()
266+
267+
def timedelta_from_proto_duration(d: Optional[ProtoDuration]) -> Optional[datetime.timedelta]:
268+
if d is None:
269+
return None
270+
return d.ToTimedelta()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import datetime
2+
import enum
3+
from dataclasses import dataclass
4+
from typing import List
5+
6+
7+
@dataclass
8+
class Entry:
9+
name: str
10+
owner: str
11+
type: "Entry.Type"
12+
effective_permissions: "Permissions"
13+
permissions: "Permissions"
14+
size_bytes: int
15+
created_at: datetime.datetime
16+
17+
class Type(enum.IntEnum):
18+
UNSPECIFIED = 0
19+
DIRECTORY = 1
20+
TABLE = 2
21+
PERS_QUEUE_GROUP = 3
22+
DATABASE = 4
23+
RTMR_VOLUME = 5
24+
BLOCK_STORE_VOLUME = 6
25+
COORDINATION_NODE = 7
26+
COLUMN_STORE = 12
27+
COLUMN_TABLE = 13
28+
SEQUENCE = 15
29+
REPLICATION = 16
30+
TOPIC = 17
31+
32+
33+
@dataclass
34+
class Permissions:
35+
subject: str
36+
permission_names: List[str]

0 commit comments

Comments
 (0)