Skip to content

Commit 16f347c

Browse files
committed
add sync client
1 parent dbc22a1 commit 16f347c

File tree

5 files changed

+134
-5
lines changed

5 files changed

+134
-5
lines changed

tests/conftest.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,23 @@ async def driver(endpoint, database, event_loop):
9999
await driver.stop(timeout=10)
100100

101101

102+
@pytest.fixture()
103+
async def driver_sync(endpoint, database, event_loop):
104+
driver_config = ydb.DriverConfig(
105+
endpoint,
106+
database,
107+
credentials=ydb.construct_credentials_from_environ(),
108+
root_certificates=ydb.load_ydb_root_certificate(),
109+
)
110+
111+
driver = ydb.Driver(driver_config=driver_config)
112+
driver.wait(timeout=15)
113+
114+
yield driver
115+
116+
driver.stop(timeout=10)
117+
118+
102119
@pytest.fixture()
103120
def topic_consumer():
104121
return "fixture-consumer"

tests/topics/test_control_plane.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,37 @@ async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
3838
break
3939

4040
assert has_consumer
41+
42+
43+
class TestTopicClientControlPlane:
44+
def test_create_topic(self, driver_sync, database):
45+
client = driver_sync.topic_client
46+
47+
topic_path = database + "/my-test-topic"
48+
49+
client.create_topic(topic_path)
50+
51+
with pytest.raises(issues.SchemeError):
52+
# double create is ok - try create topic with bad path
53+
client.create_topic(database)
54+
55+
def test_drop_topic(self, driver_sync, topic_path):
56+
client = driver_sync.topic_client
57+
58+
client.drop_topic(topic_path)
59+
60+
with pytest.raises(issues.SchemeError):
61+
client.drop_topic(topic_path)
62+
63+
def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
64+
res = driver_sync.topic_client.describe(topic_path)
65+
66+
assert res.self.name == os.path.basename(topic_path)
67+
68+
has_consumer = False
69+
for consumer in res.consumers:
70+
if consumer.name == topic_consumer:
71+
has_consumer = True
72+
break
73+
74+
assert has_consumer

ydb/aio/driver.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import ydb
55
from .. import _utilities
66
from ydb.driver import get_config
7-
from .. import topic
8-
97

108
def default_credentials(credentials=None):
119
if credentials is not None:
@@ -81,6 +79,8 @@ def __init__(
8179
credentials=None,
8280
**kwargs
8381
):
82+
from .. import topic # local import for prevent cycle import error
83+
8484
config = get_config(
8585
driver_config,
8686
connection_string,

ydb/driver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ def __init__(
231231
:param database: A database path
232232
:param credentials: A credentials. If not specifed credentials constructed by default.
233233
"""
234+
from . import topic # local import for prevent cycle import error
235+
234236
driver_config = get_config(
235237
driver_config,
236238
connection_string,
@@ -246,3 +248,4 @@ def __init__(
246248

247249
self.scheme_client = scheme.SchemeClient(self)
248250
self.table_client = table.TableClient(self, driver_config.table_client_settings)
251+
self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings)

ydb/topic.py

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from . import aio, Credentials, _apis
66

77
from . import scheme
8+
from . import driver
89

910
from ._grpc.grpcwrapper.ydb_topic_public_types import (
1011
DropTopicRequestParams as _DropTopicRequestParams,
@@ -43,6 +44,14 @@
4344

4445
from ._grpc.grpcwrapper import ydb_topic as _ydb_topic
4546
from ._grpc.grpcwrapper import ydb_topic_public_types as _ydb_topic_public_types
47+
from ._grpc.grpcwrapper.ydb_topic_public_types import (
48+
PublicDescribeTopicResult as TopicDescription,
49+
PublicMultipleWindowsStat as TopicStatWindow,
50+
PublicPartitionStats as TopicPartitionStats,
51+
PublicCodec as TopicCodec,
52+
PublicConsumer as TopicConsumer,
53+
PublicMeteringMode as TopicMeteringMode,
54+
)
4655

4756

4857
class TopicClientAsyncIO:
@@ -94,7 +103,7 @@ async def create_topic(
94103
_wrap_operation,
95104
)
96105

97-
async def describe(self, path: str, include_stats: bool = False):
106+
async def describe(self, path: str, include_stats: bool = False) -> TopicDescription:
98107
args = locals().copy()
99108
del args["self"]
100109
req = _DescribeTopicRequestParams(**args)
@@ -164,8 +173,74 @@ def topic_writer(
164173

165174

166175
class TopicClient:
167-
def __init__(self, driver, topic_client_settings: "TopicClientSettings" = None):
168-
pass
176+
_driver: driver.Driver
177+
_credentials: Union[Credentials, None]
178+
179+
def __init__(self, driver: driver.Driver, topic_client_settings: "TopicClientSettings" = None):
180+
self._driver = driver
181+
182+
def create_topic(
183+
self,
184+
path: str,
185+
min_active_partitions: Optional[
186+
int
187+
] = None, # Minimum partition count auto merge would stop working at.
188+
partition_count_limit: Optional[
189+
int
190+
] = None, # Limit for total partition count, including active (open for write) and read-only partitions.
191+
retention_period: Optional[
192+
datetime.timedelta
193+
] = None, # How long data in partition should be stored
194+
retention_storage_mb: Optional[
195+
int
196+
] = None, # How much data in partition should be stored
197+
# List of allowed codecs for writers.
198+
# Writes with codec not from this list are forbidden.
199+
supported_codecs: Optional[List[Union[TopicCodec, int]]] = None,
200+
partition_write_speed_bytes_per_second: Optional[
201+
int
202+
] = None, # Partition write speed in bytes per second
203+
partition_write_burst_bytes: Optional[
204+
int
205+
] = None, # Burst size for write in partition, in bytes
206+
# User and server attributes of topic. Server attributes starts from "_" and will be validated by server.
207+
attributes: Optional[Dict[str, str]] = None,
208+
# List of consumers for this topic
209+
consumers: Optional[List[Union[TopicConsumer, str]]] = None,
210+
# Metering mode for the topic in a serverless database
211+
metering_mode: Optional[TopicMeteringMode] = None,
212+
):
213+
args = locals().copy()
214+
del args["self"]
215+
req = _ydb_topic_public_types.CreateTopicRequestParams(**args)
216+
req = _ydb_topic.CreateTopicRequest.from_public(req)
217+
self._driver(
218+
req.to_proto(),
219+
_apis.TopicService.Stub,
220+
_apis.TopicService.CreateTopic,
221+
_wrap_operation,
222+
)
223+
224+
def describe(self, path: str, include_stats: bool = False) -> TopicDescription:
225+
args = locals().copy()
226+
del args["self"]
227+
req = _DescribeTopicRequestParams(**args)
228+
res = self._driver(
229+
req.to_proto(),
230+
_apis.TopicService.Stub,
231+
_apis.TopicService.DescribeTopic,
232+
_create_result_wrapper(_ydb_topic.DescribeTopicResult),
233+
) # type: _ydb_topic.DescribeTopicResult
234+
return res.to_public()
235+
236+
def drop_topic(self, path: str):
237+
req = _DropTopicRequestParams(path=path)
238+
self._driver(
239+
req.to_proto(),
240+
_apis.TopicService.Stub,
241+
_apis.TopicService.DropTopic,
242+
_wrap_operation,
243+
)
169244

170245
def topic_reader(
171246
self,

0 commit comments

Comments
 (0)