diff --git a/handwritten/spanner/src/transaction.ts b/handwritten/spanner/src/transaction.ts index 75d4b2d00794..5f18ae64001e 100644 --- a/handwritten/spanner/src/transaction.ts +++ b/handwritten/spanner/src/transaction.ts @@ -14,37 +14,37 @@ * limitations under the License. */ -import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; -import {promisifyAll} from '@google-cloud/promisify'; -import {isEmpty, toArray} from './helper'; +import { DateStruct, PreciseDate } from '@google-cloud/precise-date'; +import { promisifyAll } from '@google-cloud/promisify'; +import { isEmpty, toArray } from './helper'; import Long = require('long'); -import {EventEmitter} from 'events'; -import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; -import {common as p} from 'protobufjs'; -import {finished, Readable, PassThrough, Stream} from 'stream'; +import { EventEmitter } from 'events'; +import { grpc, CallOptions, ServiceError, Status, GoogleError } from 'google-gax'; +import { common as p } from 'protobufjs'; +import { finished, Readable, PassThrough, Stream } from 'stream'; -import {codec, Json, JSONOptions, Type, Value} from './codec'; +import { codec, Json, JSONOptions, Type, Value } from './codec'; import { PartialResultStream, partialResultStream, ResumeToken, Row, } from './partial-result-stream'; -import {Session} from './session'; -import {Key} from './table'; -import {Span} from './instrument'; -import {google as spannerClient} from '../protos/protos'; +import { Session } from './session'; +import { Key } from './table'; +import { Span } from './instrument'; +import { google as spannerClient } from '../protos/protos'; import { NormalCallback, addLeaderAwareRoutingHeader, getCommonHeaders, } from './common'; -import {google} from '../protos/protos'; +import { google } from '../protos/protos'; import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel; import IAny = google.protobuf.IAny; import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; -import {Database, Spanner} from '.'; +import { Database, Spanner } from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; import { ObservabilityOptions, @@ -53,8 +53,8 @@ import { setSpanErrorAndException, traceConfig, } from './instrument'; -import {RunTransactionOptions} from './transaction-runner'; -import {injectRequestIDIntoHeaders, nextNthRequest} from './request_id_header'; +import { RunTransactionOptions } from './transaction-runner'; +import { injectRequestIDIntoHeaders, nextNthRequest } from './request_id_header'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -128,10 +128,10 @@ export interface CommitOptions { export interface Statement { sql: string; - params?: {[param: string]: Value}; - types?: Type | {[param: string]: Value}; + params?: { [param: string]: Value }; + types?: Type | { [param: string]: Value }; // This property is used internally as a mapping for types. Do not set it manually - paramTypes?: {[k: string]: google.spanner.v1.Type} | null; + paramTypes?: { [k: string]: google.spanner.v1.Type } | null; } export interface ExecuteSqlRequest extends Statement, RequestOptions { @@ -296,6 +296,7 @@ export class Snapshot extends EventEmitter { | undefined | null; id?: Uint8Array | string; + public _affinityKey?: string; multiplexedSessionPreviousTransactionId?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -305,7 +306,7 @@ export class Snapshot extends EventEmitter { requestStream: (config: {}) => Readable; session: Session; queryOptions?: IQueryOptions; - commonHeaders_: {[k: string]: string}; + commonHeaders_: { [k: string]: string }; requestOptions?: Pick; _observabilityOptions?: ObservabilityOptions; _traceConfig: traceConfig; @@ -365,11 +366,31 @@ export class Snapshot extends EventEmitter { this.ended = false; this.session = session; this.queryOptions = Object.assign({}, queryOptions); - this.request = session.request.bind(session); - this.requestStream = session.requestStream.bind(session); + // If the session is multiplexed, generate a unique affinity key (UUID) for this + // specific transaction/snapshot. This allows requests using the same shared + // multiplexed session to be distributed across different gRPC channels. + if (session.metadata && session.metadata.multiplexed) { + this._affinityKey = require('uuid').v4(); + } + + this.request = (config: any, callback: Function) => { + if (this._affinityKey) { + config.headers = config.headers || {}; + config.headers['x-grpc-gcp-affinity-key'] = this._affinityKey; + } + return session.request(config, callback); + }; + + this.requestStream = (config: any) => { + if (this._affinityKey) { + config.headers = config.headers || {}; + config.headers['x-grpc-gcp-affinity-key'] = this._affinityKey; + } + return session.requestStream(config); + }; const readOnly = Snapshot.encodeTimestampBounds(options || {}); - this._options = {readOnly}; + this._options = { readOnly }; this._dbName = (this.session.parent as Database).formattedName_; this._waitingRequests = []; this._inlineBeginStarted = false; @@ -463,13 +484,13 @@ export class Snapshot extends EventEmitter { } else if (lowPriority.length > 0) { // RULE 2: If only 'insert' key(s) exist, find the one with // highest number of values - const {bestCandidates} = lowPriority.reduce( + const { bestCandidates } = lowPriority.reduce( (acc, mutation) => { const size = mutation.insert?.values?.length || 0; if (size > acc.maxSize) { // New largest size found, start a new list - return {maxSize: size, bestCandidates: [mutation]}; + return { maxSize: size, bestCandidates: [mutation] }; } if (size === acc.maxSize) { // Same size as current max, add to list @@ -905,7 +926,7 @@ export class Snapshot extends EventEmitter { if (attempt === 1) { span.addEvent('Starting stream'); } else { - span.addEvent('Re-attempting start stream', {attempt: attempt}); + span.addEvent('Re-attempting start stream', { attempt: attempt }); } } else { span.addEvent('Resuming stream', { @@ -917,7 +938,7 @@ export class Snapshot extends EventEmitter { return this.requestStream({ client: 'SpannerClient', method: 'streamingRead', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), + reqOpts: Object.assign({}, reqOpts, { resumeToken }), gaxOpts: gaxOptions, headers: injectRequestIDIntoHeaders( headers, @@ -1420,7 +1441,7 @@ export class Snapshot extends EventEmitter { */ runStream(query: string | ExecuteSqlRequest): PartialResultStream { if (typeof query === 'string') { - query = {sql: query} as ExecuteSqlRequest; + query = { sql: query } as ExecuteSqlRequest; } query = Object.assign({}, query) as ExecuteSqlRequest; @@ -1445,7 +1466,7 @@ export class Snapshot extends EventEmitter { const sanitizeRequest = () => { query = query as ExecuteSqlRequest; - const {params, paramTypes} = Snapshot.encodeParams(query); + const { params, paramTypes } = Snapshot.encodeParams(query); const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; if (this.id) { transaction.id = this.id as Uint8Array; @@ -1512,7 +1533,7 @@ export class Snapshot extends EventEmitter { if (attempt === 1) { span.addEvent('Starting stream'); } else { - span.addEvent('Re-attempting start stream', {attempt: attempt}); + span.addEvent('Re-attempting start stream', { attempt: attempt }); } } else { span.addEvent('Resuming stream', { @@ -1536,7 +1557,7 @@ export class Snapshot extends EventEmitter { return this.requestStream({ client: 'SpannerClient', method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), + reqOpts: Object.assign({}, reqOpts, { resumeToken }), gaxOpts: gaxOptions, headers: injectRequestIDIntoHeaders( headers, @@ -1665,7 +1686,7 @@ export class Snapshot extends EventEmitter { options: TimestampBounds, ): spannerClient.spanner.v1.TransactionOptions.IReadOnly { const readOnly: spannerClient.spanner.v1.TransactionOptions.IReadOnly = {}; - const {returnReadTimestamp = true} = options; + const { returnReadTimestamp = true } = options; if (options.minReadTimestamp instanceof PreciseDate) { readOnly.minReadTimestamp = ( @@ -1713,8 +1734,8 @@ export class Snapshot extends EventEmitter { static encodeParams(request: ExecuteSqlRequest) { const typeMap = request.types || {}; - const params: p.IStruct = {fields: request.params?.fields || {}}; - const paramTypes: {[field: string]: spannerClient.spanner.v1.Type} = + const params: p.IStruct = { fields: request.params?.fields || {} }; + const paramTypes: { [field: string]: spannerClient.spanner.v1.Type } = request.paramTypes || {}; if (request.params && !request.params.fields) { @@ -1751,7 +1772,7 @@ export class Snapshot extends EventEmitter { }); } - return {params, paramTypes}; + return { params, paramTypes }; } /** @@ -1787,12 +1808,12 @@ export class Snapshot extends EventEmitter { resp: spannerClient.spanner.v1.ITransaction, span: Span, ): void { - const {id, readTimestamp} = resp; + const { id, readTimestamp } = resp; this.id = id!; this.metadata = resp; - span.addEvent('Transaction Creation Done', {id: this.id.toString()}); + span.addEvent('Transaction Creation Done', { id: this.id.toString() }); if (readTimestamp) { this.readTimestampProto = readTimestamp; @@ -1822,7 +1843,7 @@ export class Snapshot extends EventEmitter { // Queue subsequent requests. return (resumeToken?: ResumeToken): Readable => { const streamProxy = new Readable({ - read() {}, + read() { }, }); this._waitingRequests.push(() => { @@ -1906,7 +1927,7 @@ export class Dml extends Snapshot { callback?: RunUpdateCallback, ): void | Promise { if (typeof query === 'string') { - query = {sql: query} as ExecuteSqlRequest; + query = { sql: query } as ExecuteSqlRequest; } return startTrace( @@ -2051,7 +2072,7 @@ export class Transaction extends Dml { super(session, undefined, queryOptions); this._queuedMutations = []; - this._options = {readWrite: options}; + this._options = { readWrite: options }; this._options.isolationLevel = IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED; this.requestOptions = requestOptions; this._retryCommit = false; @@ -2167,11 +2188,11 @@ export class Transaction extends Dml { const statements: spannerClient.spanner.v1.ExecuteBatchDmlRequest.IStatement[] = queries.map(query => { if (typeof query === 'string') { - return {sql: query}; + return { sql: query }; } - const {sql} = query; - const {params, paramTypes} = Snapshot.encodeParams(query); - return {sql, params, paramTypes}; + const { sql } = query; + const { params, paramTypes } = Snapshot.encodeParams(query); + return { sql, params, paramTypes }; }); const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; @@ -2235,7 +2256,7 @@ export class Transaction extends Dml { if (err) { const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); + batchUpdateError = Object.assign(err, { rowCounts }); setSpanError(span, batchUpdateError); span.end(); callback!(batchUpdateError, rowCounts, resp); @@ -2244,18 +2265,18 @@ export class Transaction extends Dml { this._updatePrecommitToken(resp); - const {resultSets, status} = resp; + const { resultSets, status } = resp; for (const resultSet of resultSets) { if (!this.id && resultSet.metadata?.transaction) { this._update(resultSet.metadata.transaction, span); } } - const rowCounts: number[] = resultSets.map(({stats}) => { + const rowCounts: number[] = resultSets.map(({ stats }) => { return ( (stats && Number( stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! ], )) || 0 @@ -2453,6 +2474,15 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } + // Create a copy to avoid leaking the unbind header to the global commonHeaders_. + const requestHeaders = Object.assign({}, headers); + + // Signal to grpc-gcp to unbind the affinity key and clean up memory + // since this transaction is now complete. + if (this._affinityKey) { + requestHeaders['x-grpc-gcp-unbind'] = 'true'; + } + span.addEvent('Starting Commit'); const database = this.session.parent as Database; @@ -2463,7 +2493,7 @@ export class Transaction extends Dml { reqOpts, gaxOpts: gaxOpts, headers: injectRequestIDIntoHeaders( - headers, + requestHeaders, this.session, nextNthRequest(database), 1, @@ -2819,13 +2849,22 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } + // Create a copy to avoid leaking the unbind header to the global commonHeaders_. + const requestHeaders = Object.assign({}, headers); + + // Signal to grpc-gcp to unbind the affinity key and clean up memory + // since this transaction is now complete. + if (this._affinityKey) { + requestHeaders['x-grpc-gcp-unbind'] = 'true'; + } + this.request( { client: 'SpannerClient', method: 'rollback', reqOpts, gaxOpts, - headers: headers, + headers: requestHeaders, }, (err: null | ServiceError) => { if (err) { @@ -3052,7 +3091,7 @@ function buildMutation( }); const mutation: spannerClient.spanner.v1.IMutation = { - [method]: {table, columns, values}, + [method]: { table, columns, values }, }; return mutation as spannerClient.spanner.v1.Mutation; } @@ -3072,7 +3111,7 @@ function buildDeleteMutation( keys: toArray(keys).map(codec.convertToListValue), }; const mutation: spannerClient.spanner.v1.IMutation = { - delete: {table, keySet}, + delete: { table, keySet }, }; return mutation as spannerClient.spanner.v1.Mutation; } @@ -3265,7 +3304,7 @@ export class PartitionedDml extends Dml { options = {} as spannerClient.spanner.v1.TransactionOptions.PartitionedDml, ) { super(session); - this._options = {partitionedDml: options}; + this._options = { partitionedDml: options }; } /** * Use option excludeTxnFromChangeStreams to exclude partitionedDml diff --git a/handwritten/spanner/test/multiplexed-session.ts b/handwritten/spanner/test/multiplexed-session.ts index d21f912f5e8f..ba934dd25d5a 100644 --- a/handwritten/spanner/test/multiplexed-session.ts +++ b/handwritten/spanner/test/multiplexed-session.ts @@ -43,7 +43,11 @@ describe('MultiplexedSession', () => { return Object.assign(new Session(DATABASE, name), props, { create: sandbox.stub().resolves(), - transaction: sandbox.stub().returns(new FakeTransaction()), + transaction: sandbox.stub().callsFake(() => { + const txn = new FakeTransaction(); + (txn as any)._affinityKey = 'mock-uuid'; + return txn; + }), }); }; @@ -185,13 +189,15 @@ describe('MultiplexedSession', () => { }); }); - it('should pass back the session and txn', done => { - const fakeTxn = new FakeTransaction() as unknown as Transaction; + it('should pass back the session and txn with affinity key', done => { sandbox.stub(multiplexedSession, '_getSession').resolves(fakeMuxSession); multiplexedSession.getSession((err, session, txn) => { assert.ifError(err); assert.strictEqual(session, fakeMuxSession); - assert.deepStrictEqual(txn, fakeTxn); + assert(txn); + assert(txn._affinityKey); + assert.strictEqual(typeof txn._affinityKey, 'string'); + assert(txn._affinityKey.length > 0); done(); }); });