diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index f01de944..b2e480fb 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -5,7 +5,6 @@ on: push: branches: - main - - dev/v3 paths-ignore: - "**/*.md" - "**/*.jpg" diff --git a/common/lib/connection_info.ts b/common/lib/connection_info.ts new file mode 100644 index 00000000..2d195120 --- /dev/null +++ b/common/lib/connection_info.ts @@ -0,0 +1,35 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ClientWrapper } from "./client_wrapper"; + +export class ConnectionInfo { + private readonly _client: ClientWrapper; + private readonly _isPooled: boolean; + + constructor(client: ClientWrapper, isPooled: boolean) { + this._client = client; + this._isPooled = isPooled; + } + + get client(): ClientWrapper { + return this._client; + } + + get isPooled(): boolean { + return this._isPooled; + } +} diff --git a/common/lib/connection_provider.ts b/common/lib/connection_provider.ts index 8da16f5d..9f60b31b 100644 --- a/common/lib/connection_provider.ts +++ b/common/lib/connection_provider.ts @@ -17,10 +17,10 @@ import { HostRole } from "./host_role"; import { HostInfo } from "./host_info"; import { PluginService } from "./plugin_service"; -import { ClientWrapper } from "./client_wrapper"; +import { ConnectionInfo } from "./connection_info"; export interface ConnectionProvider { - connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise; + connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise; acceptsUrl(hostInfo: HostInfo, props: Map): boolean; acceptsStrategy(role: HostRole, strategy: string): boolean; getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo; diff --git a/common/lib/driver_connection_provider.ts b/common/lib/driver_connection_provider.ts index 7affca52..6a4cd034 100644 --- a/common/lib/driver_connection_provider.ts +++ b/common/lib/driver_connection_provider.ts @@ -30,6 +30,7 @@ import { logger } from "../logutils"; import { ClientWrapper } from "./client_wrapper"; import { RoundRobinHostSelector } from "./round_robin_host_selector"; import { DriverDialect } from "./driver_dialect/driver_dialect"; +import { ConnectionInfo } from "./connection_info"; export class DriverConnectionProvider implements ConnectionProvider { private static readonly acceptedStrategies: Map = new Map([ @@ -46,7 +47,7 @@ export class DriverConnectionProvider implements ConnectionProvider { return true; } - async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { + async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { let resultTargetClient; const resultProps = new Map(props); resultProps.set(WrapperProperties.HOST.name, hostInfo.host); @@ -92,7 +93,7 @@ export class DriverConnectionProvider implements ConnectionProvider { resultTargetClient = driverDialect.connect(hostInfo, resultProps); } pluginService.attachErrorListener(resultTargetClient); - return resultTargetClient; + return new ConnectionInfo(resultTargetClient, false); } getHostInfoByStrategy(hosts: HostInfo[], role: HostRole, strategy: string, props?: Map): HostInfo { diff --git a/common/lib/host_info.ts b/common/lib/host_info.ts index 566f5cfa..3ac57751 100644 --- a/common/lib/host_info.ts +++ b/common/lib/host_info.ts @@ -65,10 +65,6 @@ export class HostInfo { return this.port != HostInfo.NO_PORT; } - getHostAndPort(): string { - return this.isPortSpecified() ? this.host + ":" + this.port : this.host; - } - addAlias(...alias: string[]) { if (!alias || alias.length < 1) { return; diff --git a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts index a5a3e2f0..958f4046 100644 --- a/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts +++ b/common/lib/host_list_provider/monitoring/cluster_topology_monitor.ts @@ -528,9 +528,9 @@ export class HostMonitor { } const latestWriterHostInfo: HostInfo = hosts.find((x) => x.role === HostRole.WRITER); - if (latestWriterHostInfo && writerHostInfo && latestWriterHostInfo.getHostAndPort() !== writerHostInfo.getHostAndPort()) { + if (latestWriterHostInfo && writerHostInfo && latestWriterHostInfo.hostAndPort !== writerHostInfo.hostAndPort) { this.writerChanged = true; - logger.debug(Messages.get("HostMonitor.writerHostChanged", writerHostInfo.getHostAndPort(), latestWriterHostInfo.getHostAndPort())); + logger.debug(Messages.get("HostMonitor.writerHostChanged", writerHostInfo.hostAndPort, latestWriterHostInfo.hostAndPort)); this.monitor.updateTopologyCache(hosts); logger.debug(logTopology(hosts, `[hostMonitor ${this.hostInfo.hostId}] `)); } diff --git a/common/lib/internal_pooled_connection_provider.ts b/common/lib/internal_pooled_connection_provider.ts index 300442ad..fe028fe9 100644 --- a/common/lib/internal_pooled_connection_provider.ts +++ b/common/lib/internal_pooled_connection_provider.ts @@ -39,6 +39,7 @@ import { LeastConnectionsHostSelector } from "./least_connections_host_selector" import { PoolClientWrapper } from "./pool_client_wrapper"; import { logger } from "../logutils"; import { SlidingExpirationCacheWithCleanupTask } from "./utils/sliding_expiration_cache_with_cleanup_task"; +import { ConnectionInfo } from "./connection_info"; export class InternalPooledConnectionProvider implements PooledConnectionProvider, CanReleaseResources { static readonly CACHE_CLEANUP_NANOS: bigint = BigInt(10 * 60_000_000_000); // 10 minutes @@ -79,7 +80,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide return RdsUrlType.RDS_INSTANCE === urlType; } - async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { + async connect(hostInfo: HostInfo, pluginService: PluginService, props: Map): Promise { const resultProps = new Map(props); resultProps.set(WrapperProperties.HOST.name, hostInfo.host); if (hostInfo.isPortSpecified()) { @@ -122,7 +123,7 @@ export class InternalPooledConnectionProvider implements PooledConnectionProvide const poolClient = await this.getPoolConnection(connectionHostInfo, props); pluginService.attachErrorListener(poolClient); - return poolClient; + return new ConnectionInfo(poolClient, true); } async getPoolConnection(hostInfo: HostInfo, props: Map) { diff --git a/common/lib/plugin_manager.ts b/common/lib/plugin_manager.ts index 49256696..410b4cf9 100644 --- a/common/lib/plugin_manager.ts +++ b/common/lib/plugin_manager.ts @@ -32,6 +32,7 @@ import { TelemetryTraceLevel } from "./utils/telemetry/telemetry_trace_level"; import { ConnectionProvider } from "./connection_provider"; import { ConnectionPluginFactory } from "./plugin_factory"; import { ConfigurationProfile } from "./profile/configuration_profile"; +import { CoreServicesContainer } from "./utils/core_services_container"; type PluginFunc = (plugin: ConnectionPlugin, targetFunc: () => Promise) => Promise; @@ -395,6 +396,7 @@ export class PluginManager { } PluginManager.STRATEGY_PLUGIN_CHAIN_CACHE.clear(); + CoreServicesContainer.releaseResources(); PluginManager.PLUGINS = new Set(); } diff --git a/common/lib/plugin_service.ts b/common/lib/plugin_service.ts index e2aae506..9aa5e732 100644 --- a/common/lib/plugin_service.ts +++ b/common/lib/plugin_service.ts @@ -150,6 +150,10 @@ export interface PluginService extends ErrorHandler { getStatus(clazz: any, key: string): T; isPluginInUse(plugin: any): boolean; + + isPooledClient(): boolean; + + setIsPooledClient(isPooledClient: boolean): void; } export class PluginServiceImpl implements PluginService, HostListProviderService { @@ -172,6 +176,8 @@ export class PluginServiceImpl implements PluginService, HostListProviderService protected static readonly statusesExpiringCache: CacheMap = new CacheMap(); protected static readonly DEFAULT_STATUS_CACHE_EXPIRE_NANO: number = 3_600_000_000_000; // 60 minutes + protected pooledClient: boolean | null = null; + constructor( container: PluginServiceManagerContainer, client: AwsClient, @@ -782,4 +788,12 @@ export class PluginServiceImpl implements PluginService, HostListProviderService isPluginInUse(plugin: any) { return this.pluginServiceManagerContainer.pluginManager!.isPluginInUse(plugin); } + + isPooledClient(): boolean | null { + return this.pooledClient; + } + + setIsPooledClient(isPooledClient: boolean): void { + this.pooledClient = isPooledClient; + } } diff --git a/common/lib/plugins/bluegreen/blue_green_interim_status.ts b/common/lib/plugins/bluegreen/blue_green_interim_status.ts index c2c97931..4cf7bb3f 100644 --- a/common/lib/plugins/bluegreen/blue_green_interim_status.ts +++ b/common/lib/plugins/bluegreen/blue_green_interim_status.ts @@ -104,7 +104,7 @@ export class BlueGreenInterimStatus { this.startTopology == null ? "" : this.startTopology - .map((x) => x.getHostAndPort() + x.role) + .map((x) => x.hostAndPort + x.role) .sort() .join(",") ); @@ -114,7 +114,7 @@ export class BlueGreenInterimStatus { this.currentTopology == null ? "" : this.currentTopology - .map((x) => x.getHostAndPort() + x.role) + .map((x) => x.hostAndPort + x.role) .sort() .join(",") ); diff --git a/common/lib/plugins/bluegreen/blue_green_status_provider.ts b/common/lib/plugins/bluegreen/blue_green_status_provider.ts index c503f133..0919fc78 100644 --- a/common/lib/plugins/bluegreen/blue_green_status_provider.ts +++ b/common/lib/plugins/bluegreen/blue_green_status_provider.ts @@ -874,7 +874,7 @@ export class BlueGreenStatusProvider { logger.debug( "Corresponding hosts:\n" + Array.from(this.correspondingHosts.entries()) - .map(([key, value]) => ` ${key} -> ${value.right == null ? "" : value.right.getHostAndPort()}`) + .map(([key, value]) => ` ${key} -> ${value.right == null ? "" : value.right.hostAndPort}`) .join("\n") ); diff --git a/common/lib/plugins/bluegreen/routing/base_connect_routing.ts b/common/lib/plugins/bluegreen/routing/base_connect_routing.ts index 5dfdd25c..9a0deecc 100644 --- a/common/lib/plugins/bluegreen/routing/base_connect_routing.ts +++ b/common/lib/plugins/bluegreen/routing/base_connect_routing.ts @@ -25,7 +25,7 @@ import { PluginService } from "../../../plugin_service"; export abstract class BaseConnectRouting extends BaseRouting implements ConnectRouting { isMatch(hostInfo: HostInfo, hostRole: BlueGreenRole): boolean { return ( - (this.hostAndPort === null || this.hostAndPort === (hostInfo ?? hostInfo.getHostAndPort().toLowerCase())) && + (this.hostAndPort === null || this.hostAndPort === (hostInfo ?? hostInfo.hostAndPort.toLowerCase())) && (this.role === null || this.role === hostRole) ); } diff --git a/common/lib/plugins/bluegreen/routing/base_execute_routing.ts b/common/lib/plugins/bluegreen/routing/base_execute_routing.ts index 80d7d50e..e631442b 100644 --- a/common/lib/plugins/bluegreen/routing/base_execute_routing.ts +++ b/common/lib/plugins/bluegreen/routing/base_execute_routing.ts @@ -24,7 +24,7 @@ import { ConnectionPlugin } from "../../../connection_plugin"; export abstract class BaseExecuteRouting extends BaseRouting implements ExecuteRouting { isMatch(hostInfo: HostInfo, hostRole: BlueGreenRole): boolean { return ( - (this.hostAndPort === null || this.hostAndPort === (hostInfo ?? hostInfo.getHostAndPort().toLowerCase())) && + (this.hostAndPort === null || this.hostAndPort === (hostInfo ?? hostInfo.hostAndPort.toLowerCase())) && (this.role === null || this.role === hostRole) ); } diff --git a/common/lib/plugins/bluegreen/routing/substitute_connect_routing.ts b/common/lib/plugins/bluegreen/routing/substitute_connect_routing.ts index 1d453e46..94f35677 100644 --- a/common/lib/plugins/bluegreen/routing/substitute_connect_routing.ts +++ b/common/lib/plugins/bluegreen/routing/substitute_connect_routing.ts @@ -99,12 +99,12 @@ export class SubstituteConnectRouting extends BaseConnectRouting { // try with another IAM host } } - throw new AwsWrapperError(Messages.get("Bgd.inProgressCantOpenConnection", this.substituteHost.getHostAndPort())); + throw new AwsWrapperError(Messages.get("Bgd.inProgressCantOpenConnection", this.substituteHost.hostAndPort)); } toString(): string { - return `${this.constructor.name} [${this.hostAndPort ?? ""}, ${this.role?.name ?? ""}, substitute: ${this.substituteHost?.getHostAndPort() ?? ""}, iamHosts: ${ - this.iamHosts?.map((host) => host.getHostAndPort()).join(", ") ?? "" + return `${this.constructor.name} [${this.hostAndPort ?? ""}, ${this.role?.name ?? ""}, substitute: ${this.substituteHost?.hostAndPort ?? ""}, iamHosts: ${ + this.iamHosts?.map((host) => host.hostAndPort).join(", ") ?? "" }]`; } } diff --git a/common/lib/plugins/connection_tracker/opened_connection_tracker.ts b/common/lib/plugins/connection_tracker/opened_connection_tracker.ts index 364a123b..6c8b6a58 100644 --- a/common/lib/plugins/connection_tracker/opened_connection_tracker.ts +++ b/common/lib/plugins/connection_tracker/opened_connection_tracker.ts @@ -36,7 +36,7 @@ export class OpenedConnectionTracker { // Check if the connection was established using an instance endpoint if (OpenedConnectionTracker.rdsUtils.isRdsInstance(hostInfo.host)) { - this.trackConnection(hostInfo.getHostAndPort(), client); + this.trackConnection(hostInfo.hostAndPort, client); return; } diff --git a/common/lib/plugins/default_plugin.ts b/common/lib/plugins/default_plugin.ts index ddfcf5f9..3af9b3fc 100644 --- a/common/lib/plugins/default_plugin.ts +++ b/common/lib/plugins/default_plugin.ts @@ -29,6 +29,7 @@ import { AwsWrapperError } from "../utils/errors"; import { HostAvailability } from "../host_availability/host_availability"; import { ClientWrapper } from "../client_wrapper"; import { TelemetryTraceLevel } from "../utils/telemetry/telemetry_trace_level"; +import { ConnectionInfo } from "../connection_info"; export class DefaultPlugin extends AbstractConnectionPlugin { id: string = uniqueId("_defaultPlugin"); @@ -79,10 +80,11 @@ export class DefaultPlugin extends AbstractConnectionPlugin { TelemetryTraceLevel.NESTED ); - const result = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props)); + const result: ConnectionInfo = await telemetryContext.start(async () => await connProvider.connect(hostInfo, this.pluginService, props)); this.pluginService.setAvailability(hostInfo.allAliases, HostAvailability.AVAILABLE); - await this.pluginService.updateDialect(result); - return result; + this.pluginService.setIsPooledClient(result.isPooled); + await this.pluginService.updateDialect(result.client); + return result.client; } override async execute(methodName: string, methodFunc: () => Promise): Promise { diff --git a/common/lib/plugins/failover/failover_plugin.ts b/common/lib/plugins/failover/failover_plugin.ts index e6d7f4d7..4145e519 100644 --- a/common/lib/plugins/failover/failover_plugin.ts +++ b/common/lib/plugins/failover/failover_plugin.ts @@ -17,14 +17,14 @@ import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; import { logger } from "../../../logutils"; import { - HostInfo, AwsWrapperError, FailoverFailedError, FailoverSuccessError, - TransactionResolutionUnknownError, - UnavailableHostError, + HostAvailability, + HostInfo, HostRole, - HostAvailability + TransactionResolutionUnknownError, + UnavailableHostError } from "../../"; import { PluginService } from "../../plugin_service"; import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; @@ -255,6 +255,7 @@ export class FailoverPlugin extends AbstractConnectionPlugin { // Verify there aren't any unexpected error emitted while the connection was idle. if (this.pluginService.hasNetworkError()) { // Throw the unexpected error directly to be handled. + throw this.pluginService.getUnexpectedError(); } diff --git a/common/lib/plugins/failover2/failover2_plugin.ts b/common/lib/plugins/failover2/failover2_plugin.ts index 5d7060d0..3b4a7f04 100644 --- a/common/lib/plugins/failover2/failover2_plugin.ts +++ b/common/lib/plugins/failover2/failover2_plugin.ts @@ -138,7 +138,7 @@ export class Failover2Plugin extends AbstractConnectionPlugin implements CanRele return await this._staleDnsHelper.getVerifiedConnection(hostInfo.host, isInitialConnection, this.hostListProviderService!, props, connectFunc); } - const hostInfoWithAvailability: HostInfo = this.pluginService.getHosts().find((x) => x.getHostAndPort() === hostInfo.getHostAndPort()); + const hostInfoWithAvailability: HostInfo = this.pluginService.getHosts().find((x) => x.hostAndPort === hostInfo.hostAndPort); let client: ClientWrapper = null; if (!hostInfoWithAvailability || hostInfoWithAvailability.getAvailability() != HostAvailability.NOT_AVAILABLE) { diff --git a/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts b/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts new file mode 100644 index 00000000..7507ac66 --- /dev/null +++ b/common/lib/plugins/read_write_splitting/abstract_read_write_splitting_plugin.ts @@ -0,0 +1,306 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { CanReleaseResources } from "../../can_release_resources"; +import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; +import { ClientWrapper } from "../../client_wrapper"; +import { PluginService } from "../../plugin_service"; +import { HostListProviderService } from "../../host_list_provider_service"; +import { HostInfo } from "../../host_info"; +import { HostChangeOptions } from "../../host_change_options"; +import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; +import { Messages } from "../../utils/messages"; +import { logger } from "../../../logutils"; +import { HostRole } from "../../host_role"; +import { SqlMethodUtils } from "../../utils/sql_method_utils"; +import { FailoverError } from "../../utils/errors"; +import { WrapperProperties } from "../../wrapper_property"; +import { convertMsToNanos, getTimeInNanos, logAndThrowError } from "../../utils/utils"; +import { CacheItem } from "../../utils/cache_item"; + +export abstract class AbstractReadWriteSplittingPlugin extends AbstractConnectionPlugin implements CanReleaseResources { + private static readonly subscribedMethods: Set = new Set(["initHostProvider", "connect", "notifyConnectionChanged", "query"]); + + protected _hostListProviderService: HostListProviderService | undefined; + protected pluginService: PluginService; + protected readonly _properties: Map; + protected readerHostInfo?: HostInfo = undefined; + protected writerHostInfo?: HostInfo = undefined; + protected isReaderClientFromInternalPool: boolean = false; + protected isWriterClientFromInternalPool: boolean = false; + + protected writerTargetClient: ClientWrapper | undefined; + protected readerCacheItem: CacheItem; + protected readonly readerSelectorStrategy: string = ""; + + private _inReadWriteSplit = false; + + protected constructor(pluginService: PluginService, properties: Map) { + super(); + this.pluginService = pluginService; + this._properties = properties; + this.readerSelectorStrategy = WrapperProperties.READER_HOST_SELECTOR_STRATEGY.get(properties); + } + + override getSubscribedMethods(): Set { + return AbstractReadWriteSplittingPlugin.subscribedMethods; + } + + override initHostProvider( + hostInfo: HostInfo, + props: Map, + hostListProviderService: HostListProviderService, + initHostProviderFunc: () => void + ) { + this._hostListProviderService = hostListProviderService; + initHostProviderFunc(); + } + + override async notifyConnectionChanged(changes: Set): Promise { + try { + await this.updateInternalClientInfo(); + } catch (e) { + // pass + } + if (this._inReadWriteSplit) { + return Promise.resolve(OldConnectionSuggestionAction.PRESERVE); + } + return Promise.resolve(OldConnectionSuggestionAction.NO_OPINION); + } + + async updateInternalClientInfo(): Promise { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost === null || currentTargetClient === null) { + return; + } + + if (this.shouldUpdateWriterClient(currentTargetClient, currentHost)) { + this.setWriterClient(currentTargetClient, currentHost); + } else if (this.shouldUpdateReaderClient(currentTargetClient, currentHost)) { + await this.setReaderClient(currentTargetClient, currentHost); + } + } + + setWriterClient(writerTargetClient: ClientWrapper | undefined, writerHostInfo: HostInfo): void { + this.writerTargetClient = writerTargetClient; + this.writerHostInfo = writerHostInfo; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.hostAndPort)); + } + + async setReaderClient(readerTargetClient: ClientWrapper | undefined, readerHost: HostInfo): Promise { + await this.closeReaderClientIfIdle(); + this.readerCacheItem = new CacheItem(readerTargetClient, this.getKeepAliveTimeout(this.isReaderClientFromInternalPool)); + this.readerHostInfo = readerHost; + logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderClient", readerHost.hostAndPort)); + } + + async switchClientIfRequired(readOnly: boolean) { + const currentClient = this.pluginService.getCurrentClient(); + if (!(await currentClient.isValid())) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedClient", currentClient.targetClient?.id ?? "undefined client")); + } + + await this.refreshAndStoreTopology(currentClient.targetClient); + + const currentHost = this.pluginService.getCurrentHostInfo(); + if (currentHost == null) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.unavailableHostInfo")); + } else if (readOnly) { + if (!this.pluginService.isInTransaction() && currentHost.role != HostRole.READER) { + try { + await this.switchToReaderTargetClient(); + } catch (error: any) { + if (!(await currentClient.isValid())) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", error.message)); + } + logger.warn(Messages.get("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url)); + } + } + } else if (currentHost.role != HostRole.WRITER) { + if (this.pluginService.isInTransaction()) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction")); + } + try { + await this.switchToWriterTargetClient(); + } catch (error: any) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter", error.message)); + } + } + } + + override async execute(methodName: string, executeFunc: () => Promise, methodArgs: any): Promise { + const statement = SqlMethodUtils.parseMethodArgs(methodArgs, this.pluginService.getDriverDialect()); + const statements = SqlMethodUtils.parseMultiStatementQueries(statement); + + const updateReadOnly: boolean | undefined = SqlMethodUtils.doesSetReadOnly(statements, this.pluginService.getDialect()); + if (updateReadOnly !== undefined) { + try { + await this.switchClientIfRequired(updateReadOnly); + } catch (error) { + await this.closeIdleClients(); + throw error; + } + } + + try { + return await executeFunc(); + } catch (error: any) { + if (error instanceof FailoverError) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand", methodName)); + await this.closeIdleClients(); + } else { + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorWhileExecutingCommand", methodName, error.message)); + } + + throw error; + } + } + + async switchCurrentTargetClientTo(newTargetClient: ClientWrapper | undefined, newClientHost: HostInfo | undefined) { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + + if (currentTargetClient === newTargetClient) { + return; + } + if (newClientHost && newTargetClient) { + try { + await this.pluginService.setCurrentClient(newTargetClient, newClientHost); + logger.debug(Messages.get("ReadWriteSplittingPlugin.settingCurrentClient", newTargetClient.id, newClientHost.hostAndPort)); + } catch (error) { + // pass + } + } + } + + async switchToWriterTargetClient() { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (this.isWriter(currentHost) && (await this.isTargetClientUsable(currentClient.targetClient))) { + // Already connected to writer. + return; + } + this._inReadWriteSplit = true; + if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + await this.initializeWriterClient(); + } else { + await this.switchCurrentTargetClientTo(this.writerTargetClient, this.writerHostInfo); + } + + if (this.isReaderClientFromInternalPool) { + await this.closeReaderClientIfIdle(); + } + + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", this.writerHostInfo.hostAndPort)); + } + + async switchToReaderTargetClient() { + const currentHost = this.pluginService.getCurrentHostInfo(); + const currentClient = this.pluginService.getCurrentClient(); + if (currentHost !== null && currentHost?.role === HostRole.READER && currentClient) { + // Already connected to reader. + return; + } + + await this.closeReaderIfNecessary(); + + this._inReadWriteSplit = true; + if (this.readerCacheItem == null || !(await this.isTargetClientUsable(this.readerCacheItem.get()))) { + await this.initializeReaderClient(); + } else { + try { + await this.switchCurrentTargetClientTo(this.readerCacheItem.get(), this.readerHostInfo); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this.readerHostInfo.hostAndPort)); + } catch (error: any) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this.readerHostInfo.hostAndPort, error.message)); + await this.closeReaderClientIfIdle(); + await this.initializeReaderClient(); + } + } + if (this.isWriterClientFromInternalPool) { + await this.closeWriterClientIfIdle(); + } + } + + async isTargetClientUsable(targetClient: ClientWrapper | undefined): Promise { + if (!targetClient) { + return Promise.resolve(false); + } + return await this.pluginService.isClientValid(targetClient); + } + + async closeWriterClientIfIdle() { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + if (this.writerTargetClient == null || this.writerTargetClient === currentTargetClient) { + return; + } + + try { + if (await this.isTargetClientUsable(this.writerTargetClient)) { + await this.pluginService.abortTargetClient(this.writerTargetClient); + } + } catch (error) { + // ignore + } + this.writerTargetClient = undefined; + } + + async closeReaderClientIfIdle(): Promise { + const currentTargetClient = this.pluginService.getCurrentClient().targetClient; + const readerClient = this.readerCacheItem?.get(true); + if (readerClient == null || readerClient === currentTargetClient) { + return; + } + + try { + if (await this.isTargetClientUsable(readerClient)) { + await this.pluginService.abortTargetClient(readerClient); + } + } catch (error) { + // ignore + } + this.readerCacheItem = null; + this.readerHostInfo = undefined; + } + + async closeIdleClients() { + logger.debug(Messages.get("ReadWriteSplittingPlugin.closingInternalClients")); + await this.closeReaderClientIfIdle(); + await this.closeWriterClientIfIdle(); + } + + protected getKeepAliveTimeout(isPooledClient: boolean): bigint { + if (isPooledClient) { + return BigInt(0); + } + const keepAliveMs = WrapperProperties.CACHED_READER_KEEP_ALIVE_TIMEOUT.get(this._properties); + + return keepAliveMs > 0 ? getTimeInNanos() + convertMsToNanos(keepAliveMs) : BigInt(0); + } + + async releaseResources() { + await this.closeIdleClients(); + } + + protected abstract shouldUpdateReaderClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean; + protected abstract shouldUpdateWriterClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean; + protected abstract isWriter(currentHost: HostInfo): boolean; + protected abstract isReader(currentHost: HostInfo): boolean; + protected abstract refreshAndStoreTopology(currentClient: ClientWrapper | undefined): Promise; + protected abstract initializeWriterClient(): Promise; + protected abstract initializeReaderClient(): Promise; + protected abstract closeReaderIfNecessary(): Promise; +} diff --git a/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts b/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts index c48fd009..14963552 100644 --- a/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts +++ b/common/lib/plugins/read_write_splitting/read_write_splitting_plugin.ts @@ -14,38 +14,19 @@ * limitations under the License. */ -import { AbstractConnectionPlugin } from "../../abstract_connection_plugin"; -import { - HostInfo, - FailoverError, - HostRole -} from "../../index"; +import { HostInfo, HostRole } from "../../index"; import { PluginService } from "../../plugin_service"; import { HostListProviderService } from "../../host_list_provider_service"; -import { OldConnectionSuggestionAction } from "../../old_connection_suggestion_action"; -import { HostChangeOptions } from "../../host_change_options"; -import { WrapperProperties } from "../../wrapper_property"; import { Messages } from "../../utils/messages"; -import { logger } from "../../../logutils"; -import { SqlMethodUtils } from "../../utils/sql_method_utils"; import { ClientWrapper } from "../../client_wrapper"; -import { getWriter, logAndThrowError } from "../../utils/utils"; -import { CanReleaseResources } from "../../can_release_resources"; -import { PoolClientWrapper } from "../../pool_client_wrapper"; - -export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implements CanReleaseResources { - private static readonly subscribedMethods: Set = new Set(["initHostProvider", "connect", "notifyConnectionChanged", "query"]); - private readonly readerSelectorStrategy: string = ""; +import { containsHostAndPort, getWriter, logAndThrowError, logTopology } from "../../utils/utils"; +import { AbstractReadWriteSplittingPlugin } from "./abstract_read_write_splitting_plugin"; +import { WrapperProperties } from "../../wrapper_property"; +import { logger } from "../../../logutils"; +import { CacheItem } from "../../utils/cache_item"; - private _hostListProviderService: HostListProviderService | undefined; - private pluginService: PluginService; - private readonly _properties: Map; - private _readerHostInfo?: HostInfo = undefined; - private isReaderClientFromInternalPool: boolean = false; - private isWriterClientFromInternalPool: boolean = false; - private _inReadWriteSplit = false; - writerTargetClient: ClientWrapper | undefined; - readerTargetClient: ClientWrapper | undefined; +export class ReadWriteSplittingPlugin extends AbstractReadWriteSplittingPlugin { + protected hosts: HostInfo[] = []; constructor(pluginService: PluginService, properties: Map); constructor( @@ -62,53 +43,10 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement writerClient?: ClientWrapper, readerClient?: ClientWrapper ) { - super(); - this.pluginService = pluginService; - this._properties = properties; - this.readerSelectorStrategy = WrapperProperties.READER_HOST_SELECTOR_STRATEGY.get(properties); + super(pluginService, properties); this._hostListProviderService = hostListProviderService; this.writerTargetClient = writerClient; - this.readerTargetClient = readerClient; - } - - override getSubscribedMethods(): Set { - return ReadWriteSplittingPlugin.subscribedMethods; - } - - override initHostProvider( - hostInfo: HostInfo, - props: Map, - hostListProviderService: HostListProviderService, - initHostProviderFunc: () => void - ) { - this._hostListProviderService = hostListProviderService; - initHostProviderFunc(); - } - - override notifyConnectionChanged(changes: Set): Promise { - try { - this.updateInternalClientInfo(); - } catch (e) { - // pass - } - if (this._inReadWriteSplit) { - return Promise.resolve(OldConnectionSuggestionAction.PRESERVE); - } - return Promise.resolve(OldConnectionSuggestionAction.NO_OPINION); - } - - updateInternalClientInfo(): void { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; - const currentHost = this.pluginService.getCurrentHostInfo(); - if (currentHost === null || currentTargetClient === null) { - return; - } - - if (currentHost.role === HostRole.WRITER) { - this.setWriterClient(currentTargetClient, currentHost); - } else { - this.setReaderClient(currentTargetClient, currentHost); - } + this.readerCacheItem = new CacheItem(readerClient, BigInt(0)); } override async connect( @@ -143,151 +81,116 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement return result; } - override async execute(methodName: string, executeFunc: () => Promise, methodArgs: any): Promise { - const statement = SqlMethodUtils.parseMethodArgs(methodArgs, this.pluginService.getDriverDialect()); - const statements = SqlMethodUtils.parseMultiStatementQueries(statement); + protected isWriter(currentHost: HostInfo): boolean { + return HostRole.WRITER === currentHost.role; + } + + protected isReader(currentHost: HostInfo): boolean { + return HostRole.READER === currentHost.role; + } - const updateReadOnly: boolean | undefined = SqlMethodUtils.doesSetReadOnly(statements, this.pluginService.getDialect()); - if (updateReadOnly !== undefined) { + protected async refreshAndStoreTopology(currentClient: ClientWrapper | undefined): Promise { + if (await this.pluginService.isClientValid(currentClient)) { try { - await this.switchClientIfRequired(updateReadOnly); - } catch (error) { - await this.closeIdleClients(); - throw error; + await this.pluginService.refreshHostList(); + } catch { + // pass } } - try { - return await executeFunc(); - } catch (error: any) { - if (error instanceof FailoverError) { - logger.debug(Messages.get("ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand", methodName)); - await this.closeIdleClients(); - } else { - logger.debug(Messages.get("ReadWriteSplittingPlugin.errorWhileExecutingCommand", methodName, error.message)); - } - - throw error; + this.hosts = this.pluginService.getHosts(); + if (this.hosts == null || this.hosts.length === 0) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.emptyHostList")); } - } - setWriterClient(writerTargetClient: ClientWrapper | undefined, writerHostInfo: HostInfo): void { - this.writerTargetClient = writerTargetClient; - logger.debug(Messages.get("ReadWriteSplittingPlugin.setWriterClient", writerHostInfo.url)); + this.writerHostInfo = getWriter(this.hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); } - setReaderClient(readerTargetClient: ClientWrapper | undefined, readerHost: HostInfo): void { - this.readerTargetClient = readerTargetClient; - this._readerHostInfo = readerHost; - logger.debug(Messages.get("ReadWriteSplittingPlugin.setReaderClient", readerHost.url)); - } + protected async forceRefreshAndStoreTopology(currentClient: ClientWrapper | undefined): Promise { + if (await this.pluginService.isClientValid(currentClient)) { + try { + await this.pluginService.forceRefreshHostList(); + } catch { + // ignore + } + } - async getNewWriterClient(writerHost: HostInfo) { - const props = new Map(this._properties); - props.set(WrapperProperties.HOST.name, writerHost.host); - try { - const copyProps = new Map(props); - WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(copyProps, false); - const targetClient = await this.pluginService.connect(writerHost, copyProps, this); - this.isWriterClientFromInternalPool = targetClient instanceof PoolClientWrapper; - this.setWriterClient(targetClient, writerHost); - await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); - } catch (any) { - logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToWriter", writerHost.url)); + this.hosts = this.pluginService.getHosts(); + if (this.hosts == null || this.hosts.length === 0) { + logAndThrowError(Messages.get("ReadWriteSplittingPlugin.emptyHostList")); } + + this.writerHostInfo = getWriter(this.hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); } - async switchClientIfRequired(readOnly: boolean) { - const currentClient = this.pluginService.getCurrentClient(); - if (!(await currentClient.isValid())) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedClient", currentClient.targetClient?.id ?? "undefined client")); - } - try { - await this.pluginService.refreshHostList(); - } catch { - // pass - } + override async initializeWriterClient(): Promise { + let client: ClientWrapper = await this.connectToWriter(); - const hosts: HostInfo[] = this.pluginService.getHosts(); - if (hosts == null || hosts.length === 0) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.emptyHostList")); - } - const currentHost = this.pluginService.getCurrentHostInfo(); - if (currentHost == null) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.unavailableHostInfo")); - } else if (readOnly) { - if (!this.pluginService.isInTransaction() && currentHost.role != HostRole.READER) { + if (!this.isWriter(client.hostInfo)) { + // refresh and store topology updates this.writerHostInfo. + await this.forceRefreshAndStoreTopology(client); + + if (client !== this.readerCacheItem.get() && client !== this.pluginService.getCurrentClient().targetClient) { try { - await this.switchToReaderTargetClient(hosts); - } catch (error: any) { - if (!(await currentClient.isValid())) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", error.message)); - } - logger.warn(Messages.get("ReadWriteSplittingPlugin.fallbackToWriter", currentHost.url)); + await client.end(); + } catch (error) { + // Ignore } } - } else if (currentHost.role != HostRole.WRITER) { - if (this.pluginService.isInTransaction()) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction")); - } - try { - await this.switchToWriterTargetClient(hosts); - } catch (error: any) { - logAndThrowError(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter", error.message)); - } + + client = await this.connectToWriter(); } + this.isWriterClientFromInternalPool = this.pluginService.isPooledClient(); + this.setWriterClient(client, this.writerHostInfo); + await this.switchCurrentTargetClientTo(this.writerTargetClient, this.writerHostInfo); } - async switchCurrentTargetClientTo(newTargetClient: ClientWrapper | undefined, newClientHost: HostInfo | undefined) { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; - - if (currentTargetClient === newTargetClient) { - return; - } - if (newClientHost && newTargetClient) { - try { - await this.pluginService.setCurrentClient(newTargetClient, newClientHost); - logger.debug(Messages.get("ReadWriteSplittingPlugin.settingCurrentClient", newTargetClient.id, newClientHost.url)); - } catch (error) { - // pass - } - } + private async connectToWriter() { + const copyProps = new Map(this._properties); + copyProps.set(WrapperProperties.HOST.name, this.writerHostInfo.host); + return await this.pluginService.connect(this.writerHostInfo, copyProps, this); } - async initializeReaderClient(hosts: HostInfo[]) { - if (hosts.length === 1) { - const writerHost = getWriter(hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); - if (writerHost) { - if (!(await this.isTargetClientUsable(this.writerTargetClient))) { - await this.getNewWriterClient(writerHost); - } - logger.warn(Messages.get("ReadWriteSplittingPlugin.noReadersFound", writerHost.url)); + override async initializeReaderClient() { + if (this.hosts.length === 1) { + if (!(await this.isTargetClientUsable(this.writerTargetClient))) { + await this.initializeWriterClient(); } + logger.warn(Messages.get("ReadWriteSplittingPlugin.noReadersFound", this.writerHostInfo.hostAndPort)); } else { await this.getNewReaderClient(); + logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this.readerHostInfo.hostAndPort)); } } - async getNewReaderClient() { + override shouldUpdateReaderClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean { + return this.isReader(host); + } + + override shouldUpdateWriterClient(currentClient: ClientWrapper | undefined, host: HostInfo): boolean { + return this.isWriter(host); + } + + protected async getNewReaderClient() { let targetClient = undefined; let readerHost: HostInfo | undefined = undefined; - const connectAttempts = this.pluginService.getHosts().length; + + const hostCandidates: HostInfo[] = this.getReaderHostCandidates(); + + const connectAttempts = hostCandidates.length * 2; for (let i = 0; i < connectAttempts; i++) { const host = this.pluginService.getHostInfoByStrategy(HostRole.READER, this.readerSelectorStrategy); if (host) { - const props = new Map(this._properties); - props.set(WrapperProperties.HOST.name, host.host); - try { - const copyProps = new Map(props); - WrapperProperties.ENABLE_CLUSTER_AWARE_FAILOVER.set(copyProps, false); + const copyProps = new Map(this._properties); + copyProps.set(WrapperProperties.HOST.name, host.host); targetClient = await this.pluginService.connect(host, copyProps, this); - this.isReaderClientFromInternalPool = targetClient instanceof PoolClientWrapper; + this.isReaderClientFromInternalPool = this.pluginService.isPooledClient(); readerHost = host; break; } catch (any) { - logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", host.url)); + logger.warn(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", host.hostAndPort)); } } } @@ -295,99 +198,19 @@ export class ReadWriteSplittingPlugin extends AbstractConnectionPlugin implement logAndThrowError(Messages.get("ReadWriteSplittingPlugin.noReadersAvailable")); return; } - logger.debug(Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", readerHost.url)); - this.setReaderClient(targetClient, readerHost); - await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); - } - - async switchToWriterTargetClient(hosts: HostInfo[]) { - const currentHost = this.pluginService.getCurrentHostInfo(); - const currentClient = this.pluginService.getCurrentClient(); - if (currentHost !== null && currentHost?.role === HostRole.WRITER && (await currentClient.isValid())) { - return; - } - this._inReadWriteSplit = true; - const writerHost = getWriter(hosts, Messages.get("ReadWriteSplittingPlugin.noWriterFound")); - if (!writerHost) { - return; - } - if (!(await this.isTargetClientUsable(this.writerTargetClient))) { - await this.getNewWriterClient(writerHost); - } else if (this.writerTargetClient) { - await this.switchCurrentTargetClientTo(this.writerTargetClient, writerHost); - } - - logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", writerHost.url)); - if (this.isReaderClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.readerTargetClient); - } + logger.debug(Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", readerHost.hostAndPort)); + await this.setReaderClient(targetClient, readerHost); + await this.switchCurrentTargetClientTo(this.readerCacheItem.get(), this.readerHostInfo); } - async switchToReaderTargetClient(hosts: HostInfo[]) { - const currentHost = this.pluginService.getCurrentHostInfo(); - const currentClient = this.pluginService.getCurrentClient(); - if (currentHost !== null && currentHost?.role === HostRole.READER && currentClient) { - return; - } - - if (this._readerHostInfo && !hosts.some((hostInfo: HostInfo) => hostInfo.host === this._readerHostInfo?.host)) { - // The old reader cannot be used anymore because it is no longer in the list of allowed hosts. - await this.closeTargetClientIfIdle(this.readerTargetClient); + protected async closeReaderIfNecessary(): Promise { + if (this.readerHostInfo != null && !containsHostAndPort(this.hosts, this.readerHostInfo.hostAndPort)) { + logger.debug(Messages.get("ReadWriteSplittingPlugin.previousReaderNotAllowed", this.readerHostInfo.toString(), logTopology(this.hosts, ""))); + await this.closeReaderClientIfIdle(); } - - this._inReadWriteSplit = true; - if (!(await this.isTargetClientUsable(this.readerTargetClient))) { - await this.initializeReaderClient(hosts); - } else if (this.readerTargetClient != null && this._readerHostInfo != null) { - try { - await this.switchCurrentTargetClientTo(this.readerTargetClient, this._readerHostInfo); - logger.debug(Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", this._readerHostInfo.url)); - } catch (error: any) { - logger.debug(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", this._readerHostInfo.url)); - await this.pluginService.abortTargetClient(this.readerTargetClient); - this.readerTargetClient = undefined; - this._readerHostInfo = undefined; - await this.initializeReaderClient(hosts); - } - } - if (this.isWriterClientFromInternalPool) { - await this.closeTargetClientIfIdle(this.writerTargetClient); - } - } - - async isTargetClientUsable(targetClient: ClientWrapper | undefined): Promise { - if (!targetClient) { - return Promise.resolve(false); - } - return await this.pluginService.isClientValid(targetClient); - } - - async closeTargetClientIfIdle(internalTargetClient: ClientWrapper | undefined) { - const currentTargetClient = this.pluginService.getCurrentClient().targetClient; - try { - if (internalTargetClient != null && internalTargetClient !== currentTargetClient && (await this.isTargetClientUsable(internalTargetClient))) { - await this.pluginService.abortTargetClient(internalTargetClient); - - if (internalTargetClient === this.writerTargetClient) { - this.writerTargetClient = undefined; - } - if (internalTargetClient === this.readerTargetClient) { - this.readerTargetClient = undefined; - this._readerHostInfo = undefined; - } - } - } catch (error) { - // ignore - } - } - - async closeIdleClients() { - logger.debug(Messages.get("ReadWriteSplittingPlugin.closingInternalClients")); - await this.closeTargetClientIfIdle(this.readerTargetClient); - await this.closeTargetClientIfIdle(this.writerTargetClient); } - async releaseResources() { - await this.closeIdleClients(); + protected getReaderHostCandidates(): HostInfo[] | undefined { + return this.pluginService.getHosts(); } } diff --git a/common/lib/utils/cache_item.ts b/common/lib/utils/cache_item.ts new file mode 100644 index 00000000..1abf2b76 --- /dev/null +++ b/common/lib/utils/cache_item.ts @@ -0,0 +1,38 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export class CacheItem { + constructor( + readonly item: T, + readonly expirationTime: bigint + ) {} + + isExpired(): boolean { + if (this.expirationTime <= 0) { + // No expiration time. + return false; + } + return process.hrtime.bigint() > this.expirationTime; + } + + get(returnExpired: boolean = false): T | null { + return this.isExpired() && !returnExpired ? null : this.item; + } + + toString(): string { + return `CacheItem [item=${this.item}, expirationTime=${this.expirationTime}]`; + } +} diff --git a/common/lib/utils/core_services_container.ts b/common/lib/utils/core_services_container.ts index c9ab89fe..8ae9f27f 100644 --- a/common/lib/utils/core_services_container.ts +++ b/common/lib/utils/core_services_container.ts @@ -45,4 +45,8 @@ export class CoreServicesContainer { // getMonitorService(): MonitorService { // return this.monitorService; // } + + static releaseResources(): void { + CoreServicesContainer.INSTANCE.storageService.releaseResources(); + } } diff --git a/common/lib/utils/messages.ts b/common/lib/utils/messages.ts index 5c0af393..fc145146 100644 --- a/common/lib/utils/messages.ts +++ b/common/lib/utils/messages.ts @@ -116,7 +116,7 @@ const MESSAGES: Record = { "AuroraStaleDnsHelper.staleDnsDetected": "Stale DNS data detected. Opening a connection to '%s'.", "ReadWriteSplittingPlugin.setReadOnlyOnClosedClient": "setReadOnly cannot be called on a closed client '%s'.", "ReadWriteSplittingPlugin.errorSwitchingToCachedReader": - "An error occurred while trying to switch to a cached reader client: '%s'. The driver will attempt to establish a new reader client.", + "An error occurred while trying to switch to a cached reader client: '%s'. Error message: '%s'. The driver will attempt to establish a new reader client.", "ReadWriteSplittingPlugin.errorSwitchingToReader": "An error occurred while trying to switch to a reader client: '%s'.", "ReadWriteSplittingPlugin.errorSwitchingToWriter": "An error occurred while trying to switch to a writer client: '%s'.", "ReadWriteSplittingPlugin.closingInternalClients": "Closing all internal clients except for the current one.", @@ -137,6 +137,7 @@ const MESSAGES: Record = { "ReadWriteSplittingPlugin.failoverErrorWhileExecutingCommand": "Detected a failover error while executing a command: '%s'", "ReadWriteSplittingPlugin.noReadersAvailable": "The plugin was unable to establish a reader client to any reader instance.", "ReadWriteSplittingPlugin.successfullyConnectedToReader": "Successfully connected to a new reader host: '%s'", + "ReadWriteSplittingPlugin.previousReaderNotAllowed": "The previous reader connection cannot be used because it is no longer in the list of allowed hosts. Previous reader: %s. Allowed hosts: %s", "ReadWriteSplittingPlugin.failedToConnectToReader": "Failed to connect to reader host: '%s'", "ReadWriteSplittingPlugin.unsupportedHostSelectorStrategy": "Unsupported host selection strategy '%s' specified in plugin configuration parameter 'readerHostSelectorStrategy'. Please visit the Read/Write Splitting Plugin documentation for all supported strategies.", diff --git a/common/lib/utils/storage/storage_service.ts b/common/lib/utils/storage/storage_service.ts index d13bc052..8dee5a74 100644 --- a/common/lib/utils/storage/storage_service.ts +++ b/common/lib/utils/storage/storage_service.ts @@ -106,6 +106,12 @@ export interface StorageService { * @returns The number of items stored for the given item class */ size(itemClass: Constructor): number; + + /** + * Cleanup method to stop the cleanup interval timer. + * Should be called when the service is no longer needed. + */ + releaseResources(): void; } type CacheSupplier = () => ExpirationCache; @@ -133,7 +139,6 @@ export class StorageServiceImpl implements StorageService { } protected removeExpiredItems(): void { - logger.debug("StorageServiceImpl: Removing expired items"); for (const cache of this.caches.values()) { cache.removeExpiredEntries(); } @@ -250,11 +255,7 @@ export class StorageServiceImpl implements StorageService { StorageServiceImpl.defaultCacheSuppliers.set(itemClass, supplier); } - /** - * Cleanup method to stop the cleanup interval timer. - * Should be called when the service is no longer needed. - */ - destroy(): void { + releaseResources(): void { if (this.cleanupIntervalHandle) { clearInterval(this.cleanupIntervalHandle); this.cleanupIntervalHandle = undefined; diff --git a/common/lib/utils/utils.ts b/common/lib/utils/utils.ts index 32dd0a47..244967c8 100644 --- a/common/lib/utils/utils.ts +++ b/common/lib/utils/utils.ts @@ -117,6 +117,14 @@ export function isDialectTopologyAware(dialect: any): dialect is TopologyAwareDa return dialect; } +export function containsHostAndPort(hosts: HostInfo[] | null | undefined, hostAndPort: string): boolean { + if (!hosts || hosts.length === 0) { + return false; + } + + return hosts.some((host) => host.hostAndPort === hostAndPort); +} + export class Pair { private readonly _left: K; private readonly _right: V; diff --git a/common/lib/wrapper_property.ts b/common/lib/wrapper_property.ts index 215714c7..3718693f 100644 --- a/common/lib/wrapper_property.ts +++ b/common/lib/wrapper_property.ts @@ -473,6 +473,13 @@ export class WrapperProperties { false ); + static readonly CACHED_READER_KEEP_ALIVE_TIMEOUT = new WrapperProperty( + "cachedReaderKeepAliveTimeoutMs", + "The time in milliseconds to keep a reader connection alive in the cache. " + + "Default value 0 means the Wrapper will keep reusing the same cached reader connection.", + 0 + ); + private static readonly PREFIXES = [ WrapperProperties.MONITORING_PROPERTY_PREFIX, ClusterTopologyMonitorImpl.MONITORING_PROPERTY_PREFIX, diff --git a/jest.integration.config.json b/jest.integration.config.json index a6d021d3..f172bcad 100644 --- a/jest.integration.config.json +++ b/jest.integration.config.json @@ -1,7 +1,7 @@ { "testTimeout": 3600000, "moduleFileExtensions": ["ts", "js", "json"], - "testMatch": ["/tests/integration/container/tests/*.(spec|test).ts|tsx"], + "testMatch": ["/tests/integration/container/tests/read_write_splitting*.(spec|test).ts|tsx"], "transform": { "^.+\\.ts$": [ "ts-jest", diff --git a/tests/unit/read_write_splitting.test.ts b/tests/unit/read_write_splitting.test.ts index aba156d7..1c9c0c74 100644 --- a/tests/unit/read_write_splitting.test.ts +++ b/tests/unit/read_write_splitting.test.ts @@ -82,6 +82,16 @@ const mockExecuteFuncThrowsFailoverSuccessError = jest.fn(() => { throw new FailoverSuccessError("test"); }); +class TestReadWriteSplitting extends ReadWriteSplittingPlugin { + getWriterTargetClient(): ClientWrapper | undefined { + return this.writerTargetClient; + } + + getReaderTargetClient(): ClientWrapper | undefined { + return this.readerCacheItem?.get(); + } +} + describe("reader write splitting test", () => { beforeEach(() => { when(mockPluginService.getHostListProvider()).thenReturn(instance(mockHostListProvider)); @@ -116,7 +126,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -127,7 +137,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.refreshHostList()).once(); verify(mockPluginService.setCurrentClient(mockReaderWrapper, readerHost1)).once(); - expect(target.readerTargetClient).toBe(mockReaderWrapper); + expect(target.getReaderTargetClient()).toBe(mockReaderWrapper); }); it("test set read only false", async () => { @@ -140,7 +150,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockWriterWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -150,7 +160,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(false); verify(mockPluginService.setCurrentClient(mockWriterWrapper, writerHost)).once(); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); }); it("test set read only true already on reader", async () => { @@ -164,7 +174,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -174,8 +184,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(mockReaderWrapper); - expect(target.writerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(mockReaderWrapper); + expect(target.getWriterTargetClient()).toEqual(undefined); }); it("test set read only false already on reader", async () => { @@ -188,7 +198,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenResolve(mockReaderWrapper); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -198,8 +208,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(false); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test set read only true one host", async () => { @@ -214,7 +224,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(anything(), anything(), anything())).thenReturn(Promise.resolve(mockWriterWrapper)); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -225,8 +235,8 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); - expect(target.writerTargetClient).toEqual(mockWriterWrapper); + expect(target.getReaderTargetClient()).toEqual(undefined); + expect(target.getWriterTargetClient()).toEqual(mockWriterWrapper); }); it("test connect incorrect host role", async () => { @@ -242,7 +252,7 @@ describe("reader write splitting test", () => { when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); when(mockHostListProviderService.getHostListProvider()).thenReturn(mockHostListProviderInstance); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -264,7 +274,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(readerHost1); when(await mockPluginService.connect(writerHost, properties)).thenReject(); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -287,7 +297,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.connect(readerHost1 || readerHost2, properties)).thenReject(); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -297,7 +307,7 @@ describe("reader write splitting test", () => { await target.switchClientIfRequired(true); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test set read only on closed connection", async () => { @@ -308,7 +318,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(instance(mockClosedWriterClient)); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -318,7 +328,7 @@ describe("reader write splitting test", () => { await expect(async () => await target.switchClientIfRequired(true)).rejects.toThrow(AwsWrapperError); verify(mockPluginService.setCurrentClient(anything(), anything())).never(); - expect(target.readerTargetClient).toEqual(undefined); + expect(target.getReaderTargetClient()).toEqual(undefined); }); it("test execute failover to new writer", async () => { @@ -330,7 +340,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(mockNewWriterClient); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(await mockPluginService.isClientValid(mockWriterWrapper)).thenReturn(true); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -352,7 +362,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentClient()).thenReturn(mockWriterClient); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -372,7 +382,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost); when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderService, @@ -394,7 +404,7 @@ describe("reader write splitting test", () => { when(mockPluginService.acceptsStrategy(anything(), anything())).thenReturn(true); when(mockHostListProviderService.isStaticHostListProvider()).thenReturn(false); - const target = new ReadWriteSplittingPlugin( + const target = new TestReadWriteSplitting( mockPluginServiceInstance, properties, mockHostListProviderServiceInstance, @@ -411,6 +421,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); when(mockPluginService.getHostInfoByStrategy(anything(), anything())).thenReturn(readerHost1); when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(mockPluginService.isPooledClient()).thenReturn(true); when(await mockWriterClient.isValid()).thenReturn(true); when(mockPluginService.getCurrentHostInfo()).thenReturn(writerHost).thenReturn(writerHost).thenReturn(readerHost1); when(mockDriverDialect.connect(anything(), anything())).thenReturn(Promise.resolve(poolClientWrapper)); @@ -431,7 +442,7 @@ describe("reader write splitting test", () => { const spyTarget = instance(target); await spyTarget.switchClientIfRequired(true); await spyTarget.switchClientIfRequired(false); - verify(target.closeTargetClientIfIdle(poolClientWrapper)).once(); + verify(target.closeReaderClientIfIdle()).twice(); }); it("test pooled writer connection after set read only", async () => { @@ -439,6 +450,7 @@ describe("reader write splitting test", () => { when(mockPluginService.getHosts()).thenReturn(singleReaderTopology); when(mockPluginService.getHostInfoByStrategy(HostRole.READER, "random")).thenReturn(readerHost1); when(mockPluginService.getCurrentClient()).thenReturn(instance(mockWriterClient)); + when(mockPluginService.isPooledClient()).thenReturn(true); when(await mockWriterClient.isValid()).thenReturn(true); when(mockPluginService.getCurrentHostInfo()) .thenReturn(writerHost) @@ -480,6 +492,7 @@ describe("reader write splitting test", () => { await spyTarget.switchClientIfRequired(false); await spyTarget.switchClientIfRequired(true); - verify(target.closeTargetClientIfIdle(poolClientWrapper)).twice(); + verify(target.closeReaderClientIfIdle()).times(3); + verify(target.closeWriterClientIfIdle()).once(); }); });