diff --git a/handwritten/spanner/src/index.ts b/handwritten/spanner/src/index.ts index e46f34d109a2..0d364d210bca 100644 --- a/handwritten/spanner/src/index.ts +++ b/handwritten/spanner/src/index.ts @@ -166,6 +166,12 @@ export interface SpannerOptions extends GrpcClientOptions { >; observabilityOptions?: ObservabilityOptions; disableBuiltInMetrics?: boolean; + /** + * Experimental. Number of independent Spanner data clients/channels to use + * for transaction RPCs. When set, grpc-gcp channel pooling is disabled and + * read/write transactions are assigned to these clients round-robin. + */ + numChannels?: number; interceptors?: any[]; sessionLabels?: {[key: string]: string}; /** @@ -187,6 +193,7 @@ export interface RequestConfig { reqOpts: any; gaxOpts?: CallOptions; headers: {[k: string]: string}; + channelHint?: number; } export interface CreateInstanceRequest { config?: string; @@ -327,6 +334,8 @@ class Spanner extends GrpcService { private _isInSecureCredentials: boolean; private static _isAFEServerTimingEnabled: boolean | undefined; readonly _nthClientId: number; + private _numChannels: number; + private _nextChannelHint: number; /** * Placeholder used to auto populate a column with the commit timestamp. @@ -399,6 +408,11 @@ class Spanner extends GrpcService { } constructor(options?: SpannerOptions) { + const requestedNumChannels = + options?.numChannels && options.numChannels > 0 + ? Math.floor(options.numChannels) + : 0; + const numChannels = requestedNumChannels; const scopes: Array<{}> = []; const clientClasses = [ v1.DatabaseAdminClient, @@ -413,21 +427,36 @@ class Spanner extends GrpcService { } } - options = Object.assign( - { - libName: 'gccl', - libVersion: require('../../package.json').version, - scopes, - // Add grpc keep alive setting - 'grpc.keepalive_time_ms': 120000, + const defaultOptions = { + libName: 'gccl', + libVersion: require('../../package.json').version, + scopes, + // Add grpc keep alive setting + 'grpc.keepalive_time_ms': 120000, + grpc, + }; + if (!numChannels) { + Object.assign(defaultOptions, { // Enable grpc-gcp support 'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer, 'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride, 'grpc.gcpApiConfig': grpcGcp.createGcpApiConfig(gcpApiConfig), - grpc, - }, + }); + } + options = Object.assign( + defaultOptions, options || {}, ) as {} as SpannerOptions; + if (numChannels) { + Object.assign(defaultOptions, { + // Keep each generated gRPC channel on its own HTTP/2 transport. + 'grpc.use_local_subchannel_pool': 1, + }); + delete options['grpc.callInvocationTransformer']; + delete options['grpc.channelFactoryOverride']; + delete options['grpc.gcpApiConfig']; + delete options.numChannels; + } const directedReadOptions = options.directedReadOptions ? options.directedReadOptions @@ -505,6 +534,17 @@ class Spanner extends GrpcService { this._universeDomain = universeEndpoint; this.projectId_ = options.projectId; this.configureMetrics_(options.disableBuiltInMetrics); + this._numChannels = numChannels; + this._nextChannelHint = 0; + } + + _nextTransactionChannelHint(): number | undefined { + if (!this._numChannels) { + return undefined; + } + const channelHint = this._nextChannelHint; + this._nextChannelHint = (this._nextChannelHint + 1) % this._numChannels; + return channelHint; } get universeDomain() { @@ -1681,14 +1721,23 @@ class Spanner extends GrpcService { return; } const clientName = config.client; + let channelHint: number | undefined; + if (clientName === 'SpannerClient' && this._numChannels) { + channelHint = + typeof config.channelHint === 'number' + ? config.channelHint % this._numChannels + : this._nextTransactionChannelHint(); + } + const clientKey = + channelHint === undefined ? clientName : `${clientName}:${channelHint}`; try { - if (!this.clients_.has(clientName)) { - this.clients_.set(clientName, new v1[clientName](this.options)); + if (!this.clients_.has(clientKey)) { + this.clients_.set(clientKey, new v1[clientName](this.options)); } } catch (err) { callback(err, null); } - const gaxClient = this.clients_.get(clientName)!; + const gaxClient = this.clients_.get(clientKey)!; let reqOpts = extend(true, {}, config.reqOpts); reqOpts = replaceProjectIdToken(reqOpts, projectId!); // It would have been preferable to replace the projectId already in the diff --git a/handwritten/spanner/src/transaction.ts b/handwritten/spanner/src/transaction.ts index 75d4b2d00794..dd60041e5317 100644 --- a/handwritten/spanner/src/transaction.ts +++ b/handwritten/spanner/src/transaction.ts @@ -311,6 +311,7 @@ export class Snapshot extends EventEmitter { _traceConfig: traceConfig; protected _dbName?: string; protected _mutationKey: spannerClient.spanner.v1.Mutation | null; + protected _channelHint?: number; /** * The transaction ID. @@ -620,6 +621,7 @@ export class Snapshot extends EventEmitter { method: 'beginTransaction', reqOpts, gaxOpts, + channelHint: this._ensureChannelHint(), headers: injectRequestIDIntoHeaders(headers, this.session), }, ( @@ -919,6 +921,7 @@ export class Snapshot extends EventEmitter { method: 'streamingRead', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, + channelHint: this._ensureChannelHint(), headers: injectRequestIDIntoHeaders( headers, this.session, @@ -1538,6 +1541,7 @@ export class Snapshot extends EventEmitter { method: 'executeStreamingSql', reqOpts: Object.assign({}, reqOpts, {resumeToken}), gaxOpts: gaxOptions, + channelHint: this._ensureChannelHint(), headers: injectRequestIDIntoHeaders( headers, this.session, @@ -1853,6 +1857,17 @@ export class Snapshot extends EventEmitter { protected _getSpanner(): Spanner { return this.session.parent.parent.parent as Spanner; } + + protected _ensureChannelHint(): number | undefined { + if (this._channelHint === undefined) { + const spanner = this._getSpanner(); + this._channelHint = + typeof spanner._nextTransactionChannelHint === 'function' + ? spanner._nextTransactionChannelHint() + : undefined; + } + return this._channelHint; + } } /*! Developer Documentation @@ -2225,6 +2240,7 @@ export class Transaction extends Dml { method: 'executeBatchDml', reqOpts, gaxOpts, + channelHint: this._ensureChannelHint(), headers: headers, }, ( @@ -2462,6 +2478,7 @@ export class Transaction extends Dml { method: 'commit', reqOpts, gaxOpts: gaxOpts, + channelHint: this._ensureChannelHint(), headers: injectRequestIDIntoHeaders( headers, this.session, @@ -2825,6 +2842,7 @@ export class Transaction extends Dml { method: 'rollback', reqOpts, gaxOpts, + channelHint: this._ensureChannelHint(), headers: headers, }, (err: null | ServiceError) => { diff --git a/handwritten/spanner/test/index.ts b/handwritten/spanner/test/index.ts index 39f4dfdd24aa..4595048507bd 100644 --- a/handwritten/spanner/test/index.ts +++ b/handwritten/spanner/test/index.ts @@ -258,6 +258,50 @@ describe('Spanner', () => { ); }); + it('should use numChannels instead of grpc-gcp when explicitly provided', () => { + const options = extend({}, OPTIONS, { + numChannels: 4, + }); + const spanner = new Spanner(options); + const expectedOptions = extend({}, OPTIONS, { + libName: 'gccl', + libVersion: require('../../package.json').version, + scopes: [], + grpc, + 'grpc.keepalive_time_ms': 120000, + 'grpc.use_local_subchannel_pool': 1, + }); + + assert.deepStrictEqual( + getFake(spanner.auth).calledWith_[0], + expectedOptions, + ); + }); + + it('should use grpc-gcp when a custom grpc.gcpApiConfig is provided', () => { + const customGcpApiConfig = {channelPool: {maxSize: 2}}; + const options = extend({}, OPTIONS, { + 'grpc.gcpApiConfig': customGcpApiConfig, + }); + const spanner = new Spanner(options); + const expectedOptions = extend({}, OPTIONS, { + libName: 'gccl', + libVersion: require('../../package.json').version, + scopes: [], + grpc, + 'grpc.keepalive_time_ms': 120000, + 'grpc.callInvocationTransformer': + fakeGrpcGcp().gcpCallInvocationTransformer, + 'grpc.channelFactoryOverride': fakeGrpcGcp().gcpChannelFactoryOverride, + 'grpc.gcpApiConfig': customGcpApiConfig, + }); + + assert.deepStrictEqual( + getFake(spanner.auth).calledWith_[0], + expectedOptions, + ); + }); + it('should combine and uniquify all gapic client scopes', () => { const expectedScopes = ['a', 'b', 'c']; fakeV1.DatabaseAdminClient.scopes = ['a', 'c'];