Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 61 additions & 12 deletions handwritten/spanner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ export interface SpannerOptions extends GrpcClientOptions {
>;
observabilityOptions?: ObservabilityOptions;
disableBuiltInMetrics?: boolean;
/**
* Experimental. Number of independent Spanner data clients/channels to use
* for transaction RPCs. When set, grpc-gcp channel pooling is disabled and
* read/write transactions are assigned to these clients round-robin.
*/
numChannels?: number;
interceptors?: any[];
sessionLabels?: {[key: string]: string};
/**
Expand All @@ -187,6 +193,7 @@ export interface RequestConfig {
reqOpts: any;
gaxOpts?: CallOptions;
headers: {[k: string]: string};
channelHint?: number;
}
export interface CreateInstanceRequest {
config?: string;
Expand Down Expand Up @@ -327,6 +334,8 @@ class Spanner extends GrpcService {
private _isInSecureCredentials: boolean;
private static _isAFEServerTimingEnabled: boolean | undefined;
readonly _nthClientId: number;
private _numChannels: number;
private _nextChannelHint: number;

/**
* Placeholder used to auto populate a column with the commit timestamp.
Expand Down Expand Up @@ -399,6 +408,11 @@ class Spanner extends GrpcService {
}

constructor(options?: SpannerOptions) {
const requestedNumChannels =
options?.numChannels && options.numChannels > 0
? Math.floor(options.numChannels)
: 0;
const numChannels = requestedNumChannels;
const scopes: Array<{}> = [];
const clientClasses = [
v1.DatabaseAdminClient,
Expand All @@ -413,21 +427,36 @@ class Spanner extends GrpcService {
}
}

options = Object.assign(
{
libName: 'gccl',
libVersion: require('../../package.json').version,
scopes,
// Add grpc keep alive setting
'grpc.keepalive_time_ms': 120000,
const defaultOptions = {
libName: 'gccl',
libVersion: require('../../package.json').version,
scopes,
// Add grpc keep alive setting
'grpc.keepalive_time_ms': 120000,
grpc,
};
if (!numChannels) {
Object.assign(defaultOptions, {
// Enable grpc-gcp support
'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer,
'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride,
'grpc.gcpApiConfig': grpcGcp.createGcpApiConfig(gcpApiConfig),
grpc,
},
});
}
options = Object.assign(
defaultOptions,
options || {},
) as {} as SpannerOptions;
if (numChannels) {
Object.assign(defaultOptions, {
// Keep each generated gRPC channel on its own HTTP/2 transport.
'grpc.use_local_subchannel_pool': 1,
});
delete options['grpc.callInvocationTransformer'];
delete options['grpc.channelFactoryOverride'];
delete options['grpc.gcpApiConfig'];
delete options.numChannels;
}

const directedReadOptions = options.directedReadOptions
? options.directedReadOptions
Expand Down Expand Up @@ -505,6 +534,17 @@ class Spanner extends GrpcService {
this._universeDomain = universeEndpoint;
this.projectId_ = options.projectId;
this.configureMetrics_(options.disableBuiltInMetrics);
this._numChannels = numChannels;
this._nextChannelHint = 0;
}

_nextTransactionChannelHint(): number | undefined {
if (!this._numChannels) {
return undefined;
}
const channelHint = this._nextChannelHint;
this._nextChannelHint = (this._nextChannelHint + 1) % this._numChannels;
return channelHint;
}

get universeDomain() {
Expand Down Expand Up @@ -1681,14 +1721,23 @@ class Spanner extends GrpcService {
return;
}
const clientName = config.client;
let channelHint: number | undefined;
if (clientName === 'SpannerClient' && this._numChannels) {
channelHint =
typeof config.channelHint === 'number'
? config.channelHint % this._numChannels
: this._nextTransactionChannelHint();
}
const clientKey =
channelHint === undefined ? clientName : `${clientName}:${channelHint}`;
try {
if (!this.clients_.has(clientName)) {
this.clients_.set(clientName, new v1[clientName](this.options));
if (!this.clients_.has(clientKey)) {
this.clients_.set(clientKey, new v1[clientName](this.options));
}
} catch (err) {
callback(err, null);
}
const gaxClient = this.clients_.get(clientName)!;
const gaxClient = this.clients_.get(clientKey)!;
let reqOpts = extend(true, {}, config.reqOpts);
reqOpts = replaceProjectIdToken(reqOpts, projectId!);
// It would have been preferable to replace the projectId already in the
Expand Down
18 changes: 18 additions & 0 deletions handwritten/spanner/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ export class Snapshot extends EventEmitter {
_traceConfig: traceConfig;
protected _dbName?: string;
protected _mutationKey: spannerClient.spanner.v1.Mutation | null;
protected _channelHint?: number;

/**
* The transaction ID.
Expand Down Expand Up @@ -620,6 +621,7 @@ export class Snapshot extends EventEmitter {
method: 'beginTransaction',
reqOpts,
gaxOpts,
channelHint: this._ensureChannelHint(),
headers: injectRequestIDIntoHeaders(headers, this.session),
},
(
Expand Down Expand Up @@ -919,6 +921,7 @@ export class Snapshot extends EventEmitter {
method: 'streamingRead',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
channelHint: this._ensureChannelHint(),
headers: injectRequestIDIntoHeaders(
headers,
this.session,
Expand Down Expand Up @@ -1538,6 +1541,7 @@ export class Snapshot extends EventEmitter {
method: 'executeStreamingSql',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
gaxOpts: gaxOptions,
channelHint: this._ensureChannelHint(),
headers: injectRequestIDIntoHeaders(
headers,
this.session,
Expand Down Expand Up @@ -1853,6 +1857,17 @@ export class Snapshot extends EventEmitter {
protected _getSpanner(): Spanner {
return this.session.parent.parent.parent as Spanner;
}

protected _ensureChannelHint(): number | undefined {
if (this._channelHint === undefined) {
const spanner = this._getSpanner();
this._channelHint =
typeof spanner._nextTransactionChannelHint === 'function'
? spanner._nextTransactionChannelHint()
: undefined;
}
return this._channelHint;
}
}

/*! Developer Documentation
Expand Down Expand Up @@ -2225,6 +2240,7 @@ export class Transaction extends Dml {
method: 'executeBatchDml',
reqOpts,
gaxOpts,
channelHint: this._ensureChannelHint(),
headers: headers,
},
(
Expand Down Expand Up @@ -2462,6 +2478,7 @@ export class Transaction extends Dml {
method: 'commit',
reqOpts,
gaxOpts: gaxOpts,
channelHint: this._ensureChannelHint(),
headers: injectRequestIDIntoHeaders(
headers,
this.session,
Expand Down Expand Up @@ -2825,6 +2842,7 @@ export class Transaction extends Dml {
method: 'rollback',
reqOpts,
gaxOpts,
channelHint: this._ensureChannelHint(),
headers: headers,
},
(err: null | ServiceError) => {
Expand Down
44 changes: 44 additions & 0 deletions handwritten/spanner/test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,50 @@ describe('Spanner', () => {
);
});

it('should use numChannels instead of grpc-gcp when explicitly provided', () => {
const options = extend({}, OPTIONS, {
numChannels: 4,
});
const spanner = new Spanner(options);
const expectedOptions = extend({}, OPTIONS, {
libName: 'gccl',
libVersion: require('../../package.json').version,
scopes: [],
grpc,
'grpc.keepalive_time_ms': 120000,
'grpc.use_local_subchannel_pool': 1,
});

assert.deepStrictEqual(
getFake(spanner.auth).calledWith_[0],
expectedOptions,
);
});

it('should use grpc-gcp when a custom grpc.gcpApiConfig is provided', () => {
const customGcpApiConfig = {channelPool: {maxSize: 2}};
const options = extend({}, OPTIONS, {
'grpc.gcpApiConfig': customGcpApiConfig,
});
const spanner = new Spanner(options);
const expectedOptions = extend({}, OPTIONS, {
libName: 'gccl',
libVersion: require('../../package.json').version,
scopes: [],
grpc,
'grpc.keepalive_time_ms': 120000,
'grpc.callInvocationTransformer':
fakeGrpcGcp().gcpCallInvocationTransformer,
'grpc.channelFactoryOverride': fakeGrpcGcp().gcpChannelFactoryOverride,
'grpc.gcpApiConfig': customGcpApiConfig,
});

assert.deepStrictEqual(
getFake(spanner.auth).calledWith_[0],
expectedOptions,
);
});

it('should combine and uniquify all gapic client scopes', () => {
const expectedScopes = ['a', 'b', 'c'];
fakeV1.DatabaseAdminClient.scopes = ['a', 'c'];
Expand Down
Loading