Skip to content

Commit 954db7b

Browse files
committed
feat: srw
1 parent e8b2445 commit 954db7b

File tree

8 files changed

+1165
-383
lines changed

8 files changed

+1165
-383
lines changed

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@
8080
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
8181
from aws_advanced_python_wrapper.iam_plugin import IamAuthPluginFactory
8282
from aws_advanced_python_wrapper.plugin import CanReleaseResources
83-
from aws_advanced_python_wrapper.read_write_splitting_plugin import \
84-
ReadWriteSplittingPluginFactory
83+
from aws_advanced_python_wrapper.read_write_splitting_plugin import ReadWriteSplittingPluginFactory
84+
from aws_advanced_python_wrapper.simple_read_write_splitting_plugin import SimpleReadWriteSplittingPluginFactory
8585
from aws_advanced_python_wrapper.stale_dns_plugin import StaleDnsPluginFactory
8686
from aws_advanced_python_wrapper.utils.cache_map import CacheMap
8787
from aws_advanced_python_wrapper.utils.decorators import \
@@ -760,6 +760,7 @@ class PluginManager(CanReleaseResources):
760760
"host_monitoring_v2": HostMonitoringV2PluginFactory,
761761
"failover": FailoverPluginFactory,
762762
"read_write_splitting": ReadWriteSplittingPluginFactory,
763+
"srw": SimpleReadWriteSplittingPluginFactory,
763764
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
764765
"stale_dns": StaleDnsPluginFactory,
765766
"custom_endpoint": CustomEndpointPluginFactory,
@@ -784,6 +785,7 @@ class PluginManager(CanReleaseResources):
784785
AuroraConnectionTrackerPluginFactory: 100,
785786
StaleDnsPluginFactory: 200,
786787
ReadWriteSplittingPluginFactory: 300,
788+
SimpleReadWriteSplittingPluginFactory: 310,
787789
FailoverPluginFactory: 400,
788790
HostMonitoringPluginFactory: 500,
789791
HostMonitoringV2PluginFactory: 510,

aws_advanced_python_wrapper/read_write_splitting_plugin.py

Lines changed: 254 additions & 120 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ OpenTelemetryFactory.WrongParameterType="[OpenTelemetryFactory] Wrong parameter
286286

287287
Plugin.UnsupportedMethod=[Plugin] '{}' is not supported by this plugin.
288288

289-
PluginManager.ConfigurationProfileNotFound=PluginManager] Configuration profile '{}' not found.
289+
PluginManager.ConfigurationProfileNotFound=[PluginManager] Configuration profile '{}' not found.
290290
PluginManager.InvalidPlugin=[PluginManager] Invalid plugin requested: '{}'.
291291
PluginManager.MethodInvokedAgainstOldConnection = [PluginManager] The internal connection has changed since '{}' was created. This is likely due to failover or read-write splitting functionality. To ensure you are using the updated connection, please re-create Cursor objects after failover and/or setting readonly.
292292
PluginManager.PipelineNone=[PluginManager] A pipeline was requested but the created pipeline evaluated to None.
@@ -357,8 +357,9 @@ ReadWriteSplittingPlugin.ErrorVerifyingInitialHostSpecRole=[ReadWriteSplittingPl
357357
ReadWriteSplittingPlugin.ExceptionWhileExecutingCommand=[ReadWriteSplittingPlugin] Detected an exception while executing a command: '{}'
358358
ReadWriteSplittingPlugin.ExecutingAgainstOldConnection=[ReadWriteSplittingPlugin] Executing method against old connection: '{}'
359359
ReadWriteSplittingPlugin.FailedToConnectToReader=[ReadWriteSplittingPlugin] Failed to connect to reader host: '{}'
360+
ReadWriteSplittingPlugin.FailedToConnectToWriter=[ReadWriteSplittingPlugin] Failed to connect to writer host: '{}'
360361
ReadWriteSplittingPlugin.FailoverExceptionWhileExecutingCommand=[ReadWriteSplittingPlugin] Detected a failover exception while executing a command: '{}'
361-
ReadWriteSplittingPlugin.FallbackToWriter=[ReadWriteSplittingPlugin] Failed to switch to a reader; the current writer will be used as a fallback: '{}'
362+
ReadWriteSplittingPlugin.FallbackToCurrentConnection=[ReadWriteSplittingPlugin] Failed to switch to a reader; the current connection will be used as a fallback: '{}'
362363
ReadWriteSplittingPlugin.NoReadersAvailable=[ReadWriteSplittingPlugin] The plugin was unable to establish a reader connection to any reader instance.
363364
ReadWriteSplittingPlugin.NoReadersFound=[ReadWriteSplittingPlugin] A reader instance was requested via set_read_only, but there are no readers in the host list. The current writer will be used as a fallback: '{}'
364365
ReadWriteSplittingPlugin.NoWriterFound=[ReadWriteSplittingPlugin] No writer was found in the current host list. This may occur if the writer is not in the list of allowed hosts.
@@ -382,6 +383,9 @@ RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector
382383
WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs= [WeightedRandomHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
383384
WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight=[WeightedRandomHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
384385

386+
SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter=[SimpleReadWriteSplittingPlugin] Configuration parameter {} is required.
387+
SimpleReadWriteSplittingPlugin.IncorrectConfiguration=[SimpleReadWriteSplittingPlugin] Unable to verify connections with this current configuration. Ensure a correct value is provided to the configuration parameter {}.
388+
385389
SqlAlchemyPooledConnectionProvider.PoolNone=[SqlAlchemyPooledConnectionProvider] Attempted to find or create a pool for '{}' but the result of the attempt evaluated to None.
386390
SqlAlchemyPooledConnectionProvider.UnableToCreateDefaultKey=[SqlAlchemyPooledConnectionProvider] Unable to create a default key for internal connection pools. By default, the user parameter is used, but the given user evaluated to None or the empty string (""). Please ensure you have passed a valid user in the connection properties.
387391

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from enum import Enum, auto
18+
from time import perf_counter_ns, sleep
19+
from typing import TYPE_CHECKING, Callable, Optional
20+
21+
from aws_advanced_python_wrapper.host_availability import HostAvailability
22+
from aws_advanced_python_wrapper.read_write_splitting_plugin import ReadWriteSplittingConnectionManager, ConnectionHandler
23+
from aws_advanced_python_wrapper.utils.rds_url_type import RdsUrlType
24+
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
25+
26+
if TYPE_CHECKING:
27+
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
28+
from aws_advanced_python_wrapper.host_list_provider import HostListProviderService
29+
from aws_advanced_python_wrapper.pep249 import Connection
30+
from aws_advanced_python_wrapper.plugin_service import PluginService
31+
from aws_advanced_python_wrapper.utils.properties import Properties
32+
33+
from aws_advanced_python_wrapper.errors import AwsWrapperError
34+
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
35+
from aws_advanced_python_wrapper.plugin import PluginFactory
36+
from aws_advanced_python_wrapper.utils.messages import Messages
37+
from aws_advanced_python_wrapper.utils.properties import WrapperProperties
38+
39+
class EndpointBasedConnectionHandler(ConnectionHandler):
40+
"""Endpoint based implementation of connection handling logic."""
41+
def __init__(self, plugin_service: PluginService, props: Properties):
42+
srw_read_endpoint = WrapperProperties.SRW_READ_ENDPOINT.get(props)
43+
if srw_read_endpoint is None:
44+
raise AwsWrapperError(Messages.get_formatted("SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter", WrapperProperties.SRW_READ_ENDPOINT.name))
45+
self.read_endpoint = srw_read_endpoint
46+
47+
srw_write_endpoint = WrapperProperties.SRW_WRITE_ENDPOINT.get(props)
48+
if srw_write_endpoint is None:
49+
raise AwsWrapperError(Messages.get_formatted("SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter", WrapperProperties.SRW_WRITE_ENDPOINT.name))
50+
self.write_endpoint = srw_write_endpoint
51+
52+
self.verify_new_connections = WrapperProperties.SRW_VERIFY_NEW_CONNECTIONS.get_bool(props)
53+
if self.verify_new_connections is True:
54+
srw_connect_retry_timeout_ms = WrapperProperties.SRW_CONNECT_RETRY_TIMEOUT_MS.get_int(props)
55+
if srw_connect_retry_timeout_ms <= 0:
56+
raise ValueError(Messages.get_formatted("SimpleReadWriteSplittingPlugin.IncorrectConfiguration", WrapperProperties.SRW_CONNECT_RETRY_TIMEOUT_MS.name))
57+
self.connect_retry_timeout_ms = srw_connect_retry_timeout_ms
58+
59+
srw_connect_retry_interval_ms = WrapperProperties.SRW_CONNECT_RETRY_INTERVAL_MS.get_int(props)
60+
if srw_connect_retry_interval_ms <= 0:
61+
raise ValueError(Messages.get_formatted("SimpleReadWriteSplittingPlugin.IncorrectConfiguration", WrapperProperties.SRW_CONNECT_RETRY_INTERVAL_MS.name))
62+
self.connect_retry_interval_ms = srw_connect_retry_interval_ms
63+
64+
self.verify_opened_connection_type = EndpointBasedConnectionHandler._parse_connection_type(WrapperProperties.SRW_VERIFY_INITIAL_CONNECTION_TYPE.get(props))
65+
66+
self._plugin_service = plugin_service
67+
self._properties = props
68+
self._rds_utils = RdsUtils()
69+
self._host_list_provider_service: Optional[HostListProviderService] = None
70+
self._write_endpoint_host_info = None
71+
self._read_endpoint_host_info = None
72+
73+
def open_new_writer_connection(self) -> Optional[tuple[Connection, HostInfo]]:
74+
if self._write_endpoint_host_info is None:
75+
self._write_endpoint_host_info = self._create_host_info(self.write_endpoint, HostRole.WRITER)
76+
77+
conn: Optional[Connection] = None
78+
if self.verify_new_connections:
79+
conn = self._get_verified_connection(self._properties, self._write_endpoint_host_info, HostRole.WRITER)
80+
else:
81+
conn = self._plugin_service.connect(self._write_endpoint_host_info, self._properties, None)
82+
83+
return conn, self._write_endpoint_host_info
84+
85+
def open_new_reader_connection(self) -> Optional[tuple[Connection, HostInfo]]:
86+
if self._read_endpoint_host_info is None:
87+
self._read_endpoint_host_info = self._create_host_info(self.read_endpoint, HostRole.READER)
88+
89+
conn: Optional[Connection] = None
90+
if self.verify_new_connections:
91+
conn = self._get_verified_connection(self._properties, self._read_endpoint_host_info, HostRole.READER)
92+
else:
93+
conn = self._plugin_service.connect(self._read_endpoint_host_info, self._properties, None)
94+
95+
return conn, self._read_endpoint_host_info
96+
97+
def get_verified_initial_connection(self, host_info: HostInfo, props: Properties, is_initial_connection: bool, connect_func: Callable) -> Optional[Connection]:
98+
if not is_initial_connection or not self.verify_new_connections:
99+
return connect_func()
100+
101+
url_type: RdsUrlType = self._rds_utils.identify_rds_type(host_info.host)
102+
103+
conn: Optional[Connection] = None
104+
105+
if url_type == RdsUrlType.RDS_WRITER_CLUSTER or (self.verify_opened_connection_type is not None and self.verify_opened_connection_type == HostRole.WRITER):
106+
conn = self._get_verified_connection(props, host_info, HostRole.WRITER, connect_func)
107+
elif url_type == RdsUrlType.RDS_READER_CLUSTER or (self.verify_opened_connection_type is not None and self.verify_opened_connection_type == HostRole.READER):
108+
conn = self._get_verified_connection(props, host_info, HostRole.READER, connect_func)
109+
110+
if conn is None:
111+
conn = connect_func()
112+
113+
self._set_initial_connection_host_info(conn, host_info)
114+
return conn
115+
116+
def _set_initial_connection_host_info(self, conn: Connection, host_info: HostInfo):
117+
if self.set_host_list_provider_service is None:
118+
return
119+
120+
if host_info is None:
121+
try:
122+
host_info = self._plugin_service.identify_connection(conn)
123+
except Exception as e:
124+
return
125+
126+
if host_info is not None and self._host_list_provider_service is not None:
127+
self._host_list_provider_service.initial_connection_host_info = host_info
128+
129+
def _get_verified_connection(self, props: Properties, host_info: HostInfo, role: HostRole, connect_func: Callable = None) -> Connection:
130+
end_time_nano = perf_counter_ns() + (self.connect_retry_timeout_ms * 1000000)
131+
132+
candidate_conn: Optional[Connection]
133+
attempt = 0
134+
135+
while perf_counter_ns() < end_time_nano:
136+
attempt += 1
137+
candidate_conn = None
138+
139+
try:
140+
if connect_func is not None:
141+
candidate_conn = connect_func()
142+
elif host_info is not None:
143+
candidate_conn = self._plugin_service.connect(host_info, props, None)
144+
else:
145+
return None
146+
147+
if candidate_conn is None:
148+
self._delay()
149+
continue
150+
151+
actual_role = self._plugin_service.get_host_role(candidate_conn)
152+
153+
instance_connected_to = self._plugin_service.identify_connection(candidate_conn)
154+
155+
if actual_role != role:
156+
ReadWriteSplittingConnectionManager.close_connection(candidate_conn)
157+
self._delay()
158+
continue
159+
160+
return candidate_conn
161+
162+
except Exception as e:
163+
ReadWriteSplittingConnectionManager.close_connection(candidate_conn)
164+
self._delay()
165+
166+
return None
167+
168+
def old_reader_can_be_used(self, reader_host_info: HostInfo) -> bool:
169+
# Assume that the old reader can always be used, no topology-based information to check.
170+
return True
171+
172+
def need_connect_to_writer(self) -> bool:
173+
# SetReadOnly(true) will always connect to the read_endpoint, and not the writer.
174+
return False
175+
176+
def refresh_and_store_host_list(self, current_conn: Connection, driver_dialect: DriverDialect):
177+
# Endpoint based connections do not require a host list.
178+
return
179+
180+
def should_update_writer_with_current_conn(self, current_conn: Connection, current_host: HostInfo, writer_conn: Connection) -> bool:
181+
return self.is_writer_host(current_host) and current_conn != writer_conn and (not self.verify_new_connections or self._plugin_service.get_host_role(current_conn) == HostRole.WRITER)
182+
183+
def should_update_reader_with_current_conn(self, current_conn: Connection, current_host: HostInfo, reader_conn: Connection) -> bool:
184+
return self.is_reader_host(current_host) and current_conn != reader_conn and (not self.verify_new_connections or self._plugin_service.get_host_role(current_conn) == HostRole.READER)
185+
186+
def is_writer_host(self, current_host: HostInfo) -> bool:
187+
return current_host.host.casefold() == self.write_endpoint.casefold() or current_host.url.casefold() == self.write_endpoint.casefold()
188+
189+
def is_reader_host(self, current_host: HostInfo) -> bool:
190+
return current_host.host.casefold() == self.read_endpoint.casefold() or current_host.url.casefold() == self.read_endpoint.casefold()
191+
192+
def _create_host_info(self, endpoint, role: HostRole) -> HostInfo:
193+
endpoint = endpoint.strip()
194+
host = endpoint
195+
port = self._plugin_service.database_dialect.default_port
196+
colon_index = endpoint.rfind(":")
197+
198+
if colon_index != -1:
199+
port_str = endpoint[colon_index + 1:]
200+
if port_str.isdigit():
201+
host = endpoint[:colon_index]
202+
port = int(port_str)
203+
else:
204+
if (self.set_host_list_provider_service is not None and self.set_host_list_provider_service.initial_connection_host_info is not None and
205+
self.set_host_list_provider_service.initial_connection_host_info.port != HostInfo.NO_PORT):
206+
port = self.set_host_list_provider_service.initial_connection_host_info.port
207+
208+
return HostInfo(
209+
host=host,
210+
port=port,
211+
role=role,
212+
availability=HostAvailability.AVAILABLE)
213+
214+
def _delay(self):
215+
sleep(self.connect_retry_interval_ms / 1000)
216+
217+
@staticmethod
218+
def _parse_connection_type(phase_str: Optional[str]) -> HostRole:
219+
if not phase_str:
220+
return None
221+
222+
phase_upper = phase_str.lower()
223+
if phase_upper == "reader":
224+
return HostRole.READER
225+
elif phase_upper == "writer":
226+
return HostRole.WRITER
227+
else:
228+
raise ValueError(Messages.get_formatted("SimpleReadWriteSplittingPlugin.IncorrectConfiguration", WrapperProperties.SRW_VERIFY_INITIAL_CONNECTION_TYPE.name))
229+
230+
231+
class SimpleReadWriteSplittingPlugin(ReadWriteSplittingConnectionManager):
232+
def __init__(self, plugin_service, props: Properties):
233+
# The simple read/write splitting plugin handles connections based on configuration parameter endpoints.
234+
connection_handler = EndpointBasedConnectionHandler(
235+
plugin_service,
236+
props,
237+
)
238+
239+
super().__init__(plugin_service, props, connection_handler)
240+
241+
class SimpleReadWriteSplittingPluginFactory(PluginFactory):
242+
def get_instance(self, plugin_service, props: Properties):
243+
return SimpleReadWriteSplittingPlugin(plugin_service, props)

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,37 @@ class WrapperProperties:
462462
1000
463463
)
464464

465+
# Simple Read/Write Splitting
466+
SRW_READ_ENDPOINT = WrapperProperty(
467+
"srw_read_endpoint",
468+
"The read-only endpoint that should be used to connect to a reader.",
469+
None)
470+
471+
SRW_WRITE_ENDPOINT = WrapperProperty(
472+
"srw_write_endpoint",
473+
"The read-write/cluster endpoint that should be used to connect to the writer.",
474+
None)
475+
476+
SRW_VERIFY_NEW_CONNECTIONS = WrapperProperty(
477+
"srw_verify_new_connections",
478+
"Enables role-verification for new connections made by the Simple Read/Write Splitting Plugin..",
479+
True)
480+
481+
SRW_VERIFY_INITIAL_CONNECTION_TYPE = WrapperProperty(
482+
"srw_verify_initial_connection_type",
483+
"Force to verify an initial connection to be either a writer or a reader.",
484+
None)
485+
486+
SRW_CONNECT_RETRY_TIMEOUT_MS = WrapperProperty(
487+
"srw_connect_retry_timeout_ms",
488+
"Maximum allowed time in milliseconds for the plugin to retry opening a connection.",
489+
60000)
490+
491+
SRW_CONNECT_RETRY_INTERVAL_MS = WrapperProperty(
492+
"srw_connect_retry_interval_ms",
493+
"Time in milliseconds between each retry of opening a connection.",
494+
1000)
495+
465496

466497
class PropertiesUtils:
467498
_MONITORING_PROPERTY_PREFIX = "monitoring-"

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ build-backend = "poetry.core.masonry.api"
7575
[tool.pytest.ini_options]
7676
filterwarnings = [
7777
'ignore:cache could not write path',
78-
'ignore:could not create cache path'
78+
'ignore:could not create cache path',
79+
'ignore:Exception during reset or similar:pytest.PytestUnhandledThreadExceptionWarning'
7980
]
8081

0 commit comments

Comments
 (0)