Skip to content

Commit a1edf34

Browse files
committed
Support for ip addresses discovery in C++ and Python sdk, KIKIMR-13572
ref:9032969 sync: https://proxy.sandbox.yandex-team.ru/2700957527
1 parent afb20b7 commit a1edf34

File tree

5 files changed

+112
-31
lines changed

5 files changed

+112
-31
lines changed

ydb/aio/connection.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
_get_request_timeout,
1414
_set_server_timeouts,
1515
_RpcState as RpcState,
16+
EndpointOptions,
1617
channel_factory,
1718
YDB_DATABASE_HEADER,
1819
YDB_TRACE_ID_HEADER,
@@ -106,10 +107,17 @@ class Connection:
106107
"closing",
107108
)
108109

109-
def __init__(self, endpoint: str, driver_config: DriverConfig = None):
110+
def __init__(
111+
self,
112+
endpoint: str,
113+
driver_config: DriverConfig = None,
114+
endpoint_options: EndpointOptions = None,
115+
):
110116
global _stubs_list
111117
self.endpoint = endpoint
112-
self._channel = channel_factory(self.endpoint, driver_config, grpc.aio)
118+
self._channel = channel_factory(
119+
self.endpoint, driver_config, grpc.aio, endpoint_options=endpoint_options
120+
)
113121
self._driver_config = driver_config
114122

115123
self._stub_instances = {}

ydb/aio/pool.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,31 +123,40 @@ async def execute_discovery(self):
123123
return False
124124

125125
resolved_endpoints = set(
126-
details.endpoint for details in resolve_details.endpoints
126+
endpoint
127+
for resolved_endpoint in resolve_details.endpoints
128+
for endpoint, endpoint_options in resolved_endpoint.endpoints_with_options()
127129
)
128130
for cached_endpoint in self._cache.values():
129131
if cached_endpoint.endpoint not in resolved_endpoints:
130132
self._cache.make_outdated(cached_endpoint)
131133

132134
for resolved_endpoint in resolve_details.endpoints:
133-
if self._cache.size >= self._max_size or self._cache.already_exists(
134-
resolved_endpoint.endpoint
135-
):
136-
continue
137-
138135
if self._ssl_required and not resolved_endpoint.ssl:
139136
continue
140137

141138
if not self._ssl_required and resolved_endpoint.ssl:
142139
continue
143140

144-
endpoint = resolved_endpoint.endpoint
145141
preferred = resolve_details.self_location == resolved_endpoint.location
146142

147-
ready_connection = Connection(endpoint, self._driver_config)
148-
await ready_connection.connection_ready(ready_timeout=self._ready_timeout)
143+
for (
144+
endpoint,
145+
endpoint_options,
146+
) in resolved_endpoint.endpoints_with_options():
147+
if self._cache.size >= self._max_size or self._cache.already_exists(
148+
endpoint
149+
):
150+
continue
151+
152+
ready_connection = Connection(
153+
endpoint, self._driver_config, endpoint_options=endpoint_options
154+
)
155+
await ready_connection.connection_ready(
156+
ready_timeout=self._ready_timeout
157+
)
149158

150-
self._cache.add(ready_connection, preferred)
159+
self._cache.add(ready_connection, preferred)
151160

152161
await self._cache.cleanup_outdated()
153162
return self._cache.size > 0

ydb/connection.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,18 @@ def _get_request_timeout(settings):
162162
return settings.timeout
163163

164164

165-
def _construct_channel_options(driver_config):
165+
class EndpointOptions(object):
166+
__slots__ = ("ssl_target_name_override",)
167+
168+
def __init__(self, ssl_target_name_override=None):
169+
self.ssl_target_name_override = ssl_target_name_override
170+
171+
172+
def _construct_channel_options(driver_config, endpoint_options=None):
166173
"""
167174
Constructs gRPC channel initialization options
168175
:param driver_config: A driver config instance
176+
:param endpoint_options: Endpoint options
169177
:return: A channel initialization options
170178
"""
171179
_max_message_size = 64 * 10 ** 6
@@ -187,6 +195,14 @@ def _construct_channel_options(driver_config):
187195
("grpc.keepalive_permit_without_calls", 0),
188196
]
189197
)
198+
if endpoint_options is not None:
199+
if endpoint_options.ssl_target_name_override:
200+
_default_connect_options.append(
201+
(
202+
"grpc.ssl_target_name_override",
203+
endpoint_options.ssl_target_name_override,
204+
)
205+
)
190206
if driver_config.channel_options is None:
191207
return _default_connect_options
192208
channel_options = copy.deepcopy(driver_config.channel_options)
@@ -276,9 +292,11 @@ def _set_server_timeouts(request, settings, default_value):
276292
_set_duration(request.operation_params.cancel_after, cancel_after)
277293

278294

279-
def channel_factory(endpoint, driver_config, channel_provider=None):
295+
def channel_factory(
296+
endpoint, driver_config, channel_provider=None, endpoint_options=None
297+
):
280298
channel_provider = channel_provider if channel_provider is not None else grpc
281-
options = _construct_channel_options(driver_config)
299+
options = _construct_channel_options(driver_config, endpoint_options)
282300
logger.debug("Channel options: {}".format(options))
283301

284302
if driver_config.root_certificates is None and not driver_config.secure_channel:
@@ -306,7 +324,7 @@ class Connection(object):
306324
"closing",
307325
)
308326

309-
def __init__(self, endpoint, driver_config=None):
327+
def __init__(self, endpoint, driver_config=None, endpoint_options=None):
310328
"""
311329
Object that wraps gRPC channel and encapsulates gRPC request execution logic
312330
:param endpoint: endpoint to connect (in pattern host:port), constructed by user or
@@ -315,7 +333,9 @@ def __init__(self, endpoint, driver_config=None):
315333
"""
316334
global _stubs_list
317335
self.endpoint = endpoint
318-
self._channel = channel_factory(self.endpoint, driver_config)
336+
self._channel = channel_factory(
337+
self.endpoint, driver_config, endpoint_options=endpoint_options
338+
)
319339
self._driver_config = driver_config
320340
self._call_states = {}
321341
self._stub_instances = {}
@@ -436,8 +456,10 @@ def __call__(
436456
self._finish_call(rpc_state)
437457

438458
@classmethod
439-
def ready_factory(cls, endpoint, driver_config, ready_timeout=10):
440-
candidate = cls(endpoint, driver_config)
459+
def ready_factory(
460+
cls, endpoint, driver_config, ready_timeout=10, endpoint_options=None
461+
):
462+
candidate = cls(endpoint, driver_config, endpoint_options=endpoint_options)
441463
ready_future = candidate.ready_future()
442464
try:
443465
ready_future.result(timeout=ready_timeout)

ydb/pool.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -215,30 +215,40 @@ def execute_discovery(self):
215215
return False
216216

217217
resolved_endpoints = set(
218-
details.endpoint for details in resolve_details.endpoints
218+
endpoint
219+
for resolved_endpoint in resolve_details.endpoints
220+
for endpoint, endpoint_options in resolved_endpoint.endpoints_with_options()
219221
)
220222
for cached_endpoint in self._cache.values():
221223
if cached_endpoint.endpoint not in resolved_endpoints:
222224
self._cache.make_outdated(cached_endpoint)
223225

224226
for resolved_endpoint in resolve_details.endpoints:
225-
if self._cache.size >= self._max_size or self._cache.already_exists(
226-
resolved_endpoint.endpoint
227-
):
228-
continue
229-
230227
if self._ssl_required and not resolved_endpoint.ssl:
231228
continue
232229

233230
if not self._ssl_required and resolved_endpoint.ssl:
234231
continue
235232

236-
endpoint = resolved_endpoint.endpoint
237233
preferred = resolve_details.self_location == resolved_endpoint.location
238-
ready_connection = connection_impl.Connection.ready_factory(
239-
endpoint, self._driver_config, self._ready_timeout
240-
)
241-
self._cache.add(ready_connection, preferred)
234+
235+
for (
236+
endpoint,
237+
endpoint_options,
238+
) in resolved_endpoint.endpoints_with_options():
239+
if (
240+
self._cache.size >= self._max_size
241+
or self._cache.already_exists(endpoint)
242+
):
243+
continue
244+
245+
ready_connection = connection_impl.Connection.ready_factory(
246+
endpoint,
247+
self._driver_config,
248+
self._ready_timeout,
249+
endpoint_options=endpoint_options,
250+
)
251+
self._cache.add(ready_connection, preferred)
242252

243253
self._cache.cleanup_outdated()
244254

ydb/resolver.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,46 @@
1010

1111

1212
class EndpointInfo(object):
13-
__slots__ = ("address", "endpoint", "location", "port", "ssl")
13+
__slots__ = (
14+
"address",
15+
"endpoint",
16+
"location",
17+
"port",
18+
"ssl",
19+
"ipv4_addrs",
20+
"ipv6_addrs",
21+
"ssl_target_name_override",
22+
)
1423

1524
def __init__(self, endpoint_info):
1625
self.address = endpoint_info.address
1726
self.endpoint = "%s:%s" % (endpoint_info.address, endpoint_info.port)
1827
self.location = endpoint_info.location
1928
self.port = endpoint_info.port
2029
self.ssl = endpoint_info.ssl
30+
self.ipv4_addrs = tuple(endpoint_info.ip_v4)
31+
self.ipv6_addrs = tuple(endpoint_info.ip_v6)
32+
self.ssl_target_name_override = endpoint_info.ssl_target_name_override
33+
34+
def endpoints_with_options(self):
35+
if self.ssl:
36+
if self.ssl_target_name_override:
37+
endpoint_options = conn_impl.EndpointOptions(
38+
self.ssl_target_name_override
39+
)
40+
elif self.ipv6_addrs or self.ipv4_addrs:
41+
endpoint_options = conn_impl.EndpointOptions(self.address)
42+
else:
43+
endpoint_options = None
44+
else:
45+
endpoint_options = None
46+
if self.ipv6_addrs or self.ipv4_addrs:
47+
for ipv6addr in self.ipv6_addrs:
48+
yield ("ipv6:[%s]:%s" % (ipv6addr, self.port), endpoint_options)
49+
for ipv4addr in self.ipv4_addrs:
50+
yield ("ipv4:%s:%s" % (ipv4addr, self.port), endpoint_options)
51+
else:
52+
yield (self.endpoint, endpoint_options)
2153

2254
def __str__(self):
2355
return "<Endpoint %s, location %s, ssl: %s>" % (

0 commit comments

Comments
 (0)