From eb8f9df2f50d6c8049dbcc8f1593f3553514d4cd Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Sat, 2 May 2026 16:05:58 +0530 Subject: [PATCH 1/2] feat(spanner): make affinity keys configurable via spanner_grpc_config.json --- handwritten/spanner/src/index.ts | 3 +- .../spanner/src/spanner_grpc_config.json | 33 +++++++-- handwritten/spanner/src/transaction.ts | 73 +++++++++++++++++-- handwritten/spanner/test/index.ts | 4 +- .../spanner/test/multiplexed-session.ts | 14 +++- 5 files changed, 107 insertions(+), 20 deletions(-) diff --git a/handwritten/spanner/src/index.ts b/handwritten/spanner/src/index.ts index e46f34d109a2..77ed43ffc6d5 100644 --- a/handwritten/spanner/src/index.ts +++ b/handwritten/spanner/src/index.ts @@ -423,7 +423,8 @@ class Spanner extends GrpcService { // Enable grpc-gcp support 'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer, 'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride, - 'grpc.gcpApiConfig': grpcGcp.createGcpApiConfig(gcpApiConfig), + // Bypass createGcpApiConfig to preserve our custom metadata fields + 'grpc.gcpApiConfig': gcpApiConfig, grpc, }, options || {}, diff --git a/handwritten/spanner/src/spanner_grpc_config.json b/handwritten/spanner/src/spanner_grpc_config.json index bd640bdbe0b9..a13e0ecfaaa9 100644 --- a/handwritten/spanner/src/spanner_grpc_config.json +++ b/handwritten/spanner/src/spanner_grpc_config.json @@ -29,49 +29,67 @@ "name": ["/google.spanner.v1.Spanner/ExecuteSql"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key" } }, { "name": ["/google.spanner.v1.Spanner/ExecuteStreamingSql"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key" } }, { "name": ["/google.spanner.v1.Spanner/Read"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key" } }, { "name": ["/google.spanner.v1.Spanner/StreamingRead"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key" } }, { "name": ["/google.spanner.v1.Spanner/BeginTransaction"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key" } }, { "name": ["/google.spanner.v1.Spanner/Commit"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key", + "unbindKeyLocation": "METADATA", + "unbindMetadataKey": "x-grpc-gcp-unbind" } }, { "name": ["/google.spanner.v1.Spanner/Rollback"], "affinity": { "command": "BOUND", - "affinityKey": "session" + "affinityKey": "session", + "affinityKeyLocation": "METADATA", + "metadataKey": "x-grpc-gcp-affinity-key", + "unbindKeyLocation": "METADATA", + "unbindMetadataKey": "x-grpc-gcp-unbind" } }, { @@ -90,3 +108,4 @@ } ] } + diff --git a/handwritten/spanner/src/transaction.ts b/handwritten/spanner/src/transaction.ts index 75d4b2d00794..c1750c250dc0 100644 --- a/handwritten/spanner/src/transaction.ts +++ b/handwritten/spanner/src/transaction.ts @@ -55,6 +55,9 @@ import { } from './instrument'; import {RunTransactionOptions} from './transaction-runner'; import {injectRequestIDIntoHeaders, nextNthRequest} from './request_id_header'; +import * as uuid from 'uuid'; + +const gcpApiConfig = require('./spanner_grpc_config.json'); export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -296,6 +299,7 @@ export class Snapshot extends EventEmitter { | undefined | null; id?: Uint8Array | string; + public _affinityKey?: string; multiplexedSessionPreviousTransactionId?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -365,8 +369,39 @@ export class Snapshot extends EventEmitter { this.ended = false; this.session = session; this.queryOptions = Object.assign({}, queryOptions); - this.request = session.request.bind(session); - this.requestStream = session.requestStream.bind(session); + // If the session is multiplexed, generate a unique affinity key (UUID) for this + // specific transaction/snapshot. This allows requests using the same shared + // multiplexed session to be distributed across different gRPC channels. + if (session.metadata && session.metadata.multiplexed) { + this._affinityKey = uuid.v4(); + } + const getMetadataHeaderName = (rpcMethodName: string): string => { + const method = + rpcMethodName.charAt(0).toUpperCase() + rpcMethodName.slice(1); + const fullRpcPath = `/google.spanner.v1.Spanner/${method}`; + const methodConfig = gcpApiConfig.method.find(m => + m.name.includes(fullRpcPath), + ); + return methodConfig?.affinity?.metadataKey || 'x-grpc-gcp-affinity-key'; + }; + + this.request = (config: any, callback: Function) => { + if (this._affinityKey) { + const headerName = getMetadataHeaderName(config.method); + config.headers = config.headers || {}; + config.headers[headerName] = this._affinityKey; + } + return session.request(config, callback); + }; + + this.requestStream = (config: any) => { + if (this._affinityKey) { + const headerName = getMetadataHeaderName(config.method); + config.headers = config.headers || {}; + config.headers[headerName] = this._affinityKey; + } + return session.requestStream(config); + }; const readOnly = Snapshot.encodeTimestampBounds(options || {}); this._options = {readOnly}; @@ -1324,7 +1359,7 @@ export class Snapshot extends EventEmitter { * options as well as several convenience properties. * * @see [Query Syntax](https://cloud.google.com/spanner/docs/query-syntax) - * @see [ExecuteSql API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.ExecuteSql) + * @see [ExecuteSql API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSql) * * @typedef {object} ExecuteSqlRequest * @property {string} resumeToken The token used to resume getting results. @@ -2453,6 +2488,20 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } + // Create a copy to avoid leaking the unbind header to the global commonHeaders_. + const requestHeaders = Object.assign({}, headers); + + // Signal to grpc-gcp to unbind the affinity key and clean up memory + // since this transaction is now complete. + if (this._affinityKey) { + const commitConfig = gcpApiConfig.method.find(m => + m.name.includes('/google.spanner.v1.Spanner/Commit'), + ); + const unbindHeaderName = + commitConfig?.affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; + requestHeaders[unbindHeaderName] = 'true'; + } + span.addEvent('Starting Commit'); const database = this.session.parent as Database; @@ -2463,7 +2512,7 @@ export class Transaction extends Dml { reqOpts, gaxOpts: gaxOpts, headers: injectRequestIDIntoHeaders( - headers, + requestHeaders, this.session, nextNthRequest(database), 1, @@ -2819,13 +2868,27 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } + // Create a copy to avoid leaking the unbind header to the global commonHeaders_. + const requestHeaders = Object.assign({}, headers); + + // Signal to grpc-gcp to unbind the affinity key and clean up memory + // since this transaction is now complete. + if (this._affinityKey) { + const rollbackConfig = gcpApiConfig.method.find(m => + m.name.includes('/google.spanner.v1.Spanner/Rollback'), + ); + const unbindHeaderName = + rollbackConfig?.affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; + requestHeaders[unbindHeaderName] = 'true'; + } + this.request( { client: 'SpannerClient', method: 'rollback', reqOpts, gaxOpts, - headers: headers, + headers: requestHeaders, }, (err: null | ServiceError) => { if (err) { diff --git a/handwritten/spanner/test/index.ts b/handwritten/spanner/test/index.ts index 39f4dfdd24aa..2591a5ec0bb1 100644 --- a/handwritten/spanner/test/index.ts +++ b/handwritten/spanner/test/index.ts @@ -232,9 +232,7 @@ describe('Spanner', () => { 'grpc.callInvocationTransformer': fakeGrpcGcp().gcpCallInvocationTransformer, 'grpc.channelFactoryOverride': fakeGrpcGcp().gcpChannelFactoryOverride, - 'grpc.gcpApiConfig': { - calledWith_: apiConfig, - }, + 'grpc.gcpApiConfig': apiConfig, }); it('should localize a cached gapic client map', () => { diff --git a/handwritten/spanner/test/multiplexed-session.ts b/handwritten/spanner/test/multiplexed-session.ts index d21f912f5e8f..ba934dd25d5a 100644 --- a/handwritten/spanner/test/multiplexed-session.ts +++ b/handwritten/spanner/test/multiplexed-session.ts @@ -43,7 +43,11 @@ describe('MultiplexedSession', () => { return Object.assign(new Session(DATABASE, name), props, { create: sandbox.stub().resolves(), - transaction: sandbox.stub().returns(new FakeTransaction()), + transaction: sandbox.stub().callsFake(() => { + const txn = new FakeTransaction(); + (txn as any)._affinityKey = 'mock-uuid'; + return txn; + }), }); }; @@ -185,13 +189,15 @@ describe('MultiplexedSession', () => { }); }); - it('should pass back the session and txn', done => { - const fakeTxn = new FakeTransaction() as unknown as Transaction; + it('should pass back the session and txn with affinity key', done => { sandbox.stub(multiplexedSession, '_getSession').resolves(fakeMuxSession); multiplexedSession.getSession((err, session, txn) => { assert.ifError(err); assert.strictEqual(session, fakeMuxSession); - assert.deepStrictEqual(txn, fakeTxn); + assert(txn); + assert(txn._affinityKey); + assert.strictEqual(typeof txn._affinityKey, 'string'); + assert(txn._affinityKey.length > 0); done(); }); }); From 77436fe3420f519e69baff0f3e230f7735a14448 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Sun, 3 May 2026 07:21:13 +0530 Subject: [PATCH 2/2] gemini review comments --- handwritten/spanner/src/transaction.ts | 51 ++++++++++++++++++-------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/handwritten/spanner/src/transaction.ts b/handwritten/spanner/src/transaction.ts index c1750c250dc0..91c467c04a22 100644 --- a/handwritten/spanner/src/transaction.ts +++ b/handwritten/spanner/src/transaction.ts @@ -59,6 +59,18 @@ import * as uuid from 'uuid'; const gcpApiConfig = require('./spanner_grpc_config.json'); +// Pre-compute a map for O(1) affinity lookups +const methodToAffinityMap = new Map(); +if (gcpApiConfig && gcpApiConfig.method) { + gcpApiConfig.method.forEach((m: any) => { + if (m.name && m.affinity) { + m.name.forEach((name: string) => { + methodToAffinityMap.set(name, m.affinity); + }); + } + }); +} + export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; const RETRY_INFO_BIN = 'google.rpc.retryinfo-bin'; @@ -379,17 +391,21 @@ export class Snapshot extends EventEmitter { const method = rpcMethodName.charAt(0).toUpperCase() + rpcMethodName.slice(1); const fullRpcPath = `/google.spanner.v1.Spanner/${method}`; - const methodConfig = gcpApiConfig.method.find(m => - m.name.includes(fullRpcPath), - ); - return methodConfig?.affinity?.metadataKey || 'x-grpc-gcp-affinity-key'; + + const affinity = methodToAffinityMap.get(fullRpcPath); + return affinity?.metadataKey || 'x-grpc-gcp-affinity-key'; }; this.request = (config: any, callback: Function) => { if (this._affinityKey) { const headerName = getMetadataHeaderName(config.method); - config.headers = config.headers || {}; - config.headers[headerName] = this._affinityKey; + config = { + ...config, + headers: { + ...(config.headers || {}), + [headerName]: this._affinityKey, + }, + }; } return session.request(config, callback); }; @@ -397,8 +413,13 @@ export class Snapshot extends EventEmitter { this.requestStream = (config: any) => { if (this._affinityKey) { const headerName = getMetadataHeaderName(config.method); - config.headers = config.headers || {}; - config.headers[headerName] = this._affinityKey; + config = { + ...config, + headers: { + ...(config.headers || {}), + [headerName]: this._affinityKey, + }, + }; } return session.requestStream(config); }; @@ -1359,7 +1380,7 @@ export class Snapshot extends EventEmitter { * options as well as several convenience properties. * * @see [Query Syntax](https://cloud.google.com/spanner/docs/query-syntax) - * @see [ExecuteSql API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSql) + * @see [ExecuteSql API Documentation](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.Spanner.ExecuteSql) * * @typedef {object} ExecuteSqlRequest * @property {string} resumeToken The token used to resume getting results. @@ -2494,11 +2515,11 @@ export class Transaction extends Dml { // Signal to grpc-gcp to unbind the affinity key and clean up memory // since this transaction is now complete. if (this._affinityKey) { - const commitConfig = gcpApiConfig.method.find(m => - m.name.includes('/google.spanner.v1.Spanner/Commit'), + const affinity = methodToAffinityMap.get( + '/google.spanner.v1.Spanner/Commit', ); const unbindHeaderName = - commitConfig?.affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; + affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; requestHeaders[unbindHeaderName] = 'true'; } @@ -2874,11 +2895,11 @@ export class Transaction extends Dml { // Signal to grpc-gcp to unbind the affinity key and clean up memory // since this transaction is now complete. if (this._affinityKey) { - const rollbackConfig = gcpApiConfig.method.find(m => - m.name.includes('/google.spanner.v1.Spanner/Rollback'), + const affinity = methodToAffinityMap.get( + '/google.spanner.v1.Spanner/Rollback', ); const unbindHeaderName = - rollbackConfig?.affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; + affinity?.unbindMetadataKey || 'x-grpc-gcp-unbind'; requestHeaders[unbindHeaderName] = 'true'; }