Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 92 additions & 53 deletions handwritten/spanner/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,37 @@
* limitations under the License.
*/

import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {promisifyAll} from '@google-cloud/promisify';
import {isEmpty, toArray} from './helper';
import { DateStruct, PreciseDate } from '@google-cloud/precise-date';
import { promisifyAll } from '@google-cloud/promisify';
import { isEmpty, toArray } from './helper';
import Long = require('long');
import {EventEmitter} from 'events';
import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax';
import {common as p} from 'protobufjs';
import {finished, Readable, PassThrough, Stream} from 'stream';
import { EventEmitter } from 'events';
import { grpc, CallOptions, ServiceError, Status, GoogleError } from 'google-gax';
import { common as p } from 'protobufjs';
import { finished, Readable, PassThrough, Stream } from 'stream';

import {codec, Json, JSONOptions, Type, Value} from './codec';
import { codec, Json, JSONOptions, Type, Value } from './codec';
import {
PartialResultStream,
partialResultStream,
ResumeToken,
Row,
} from './partial-result-stream';
import {Session} from './session';
import {Key} from './table';
import {Span} from './instrument';
import {google as spannerClient} from '../protos/protos';
import { Session } from './session';
import { Key } from './table';
import { Span } from './instrument';
import { google as spannerClient } from '../protos/protos';
import {
NormalCallback,
addLeaderAwareRoutingHeader,
getCommonHeaders,
} from './common';
import {google} from '../protos/protos';
import { google } from '../protos/protos';
import IsolationLevel = google.spanner.v1.TransactionOptions.IsolationLevel;
import IAny = google.protobuf.IAny;
import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Database, Spanner} from '.';
import { Database, Spanner } from '.';
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import {
ObservabilityOptions,
Expand All @@ -53,8 +53,8 @@ import {
setSpanErrorAndException,
traceConfig,
} from './instrument';
import {RunTransactionOptions} from './transaction-runner';
import {injectRequestIDIntoHeaders, nextNthRequest} from './request_id_header';
import { RunTransactionOptions } from './transaction-runner';
import { injectRequestIDIntoHeaders, nextNthRequest } from './request_id_header';

export type Rows = Array<Row | Json>;
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
Expand Down Expand Up @@ -128,10 +128,10 @@ export interface CommitOptions {

export interface Statement {
sql: string;
params?: {[param: string]: Value};
types?: Type | {[param: string]: Value};
params?: { [param: string]: Value };
types?: Type | { [param: string]: Value };
// This property is used internally as a mapping for types. Do not set it manually
paramTypes?: {[k: string]: google.spanner.v1.Type} | null;
paramTypes?: { [k: string]: google.spanner.v1.Type } | null;
}

export interface ExecuteSqlRequest extends Statement, RequestOptions {
Expand Down Expand Up @@ -296,6 +296,7 @@ export class Snapshot extends EventEmitter {
| undefined
| null;
id?: Uint8Array | string;
public _affinityKey?: string;
multiplexedSessionPreviousTransactionId?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
Expand All @@ -305,7 +306,7 @@ export class Snapshot extends EventEmitter {
requestStream: (config: {}) => Readable;
session: Session;
queryOptions?: IQueryOptions;
commonHeaders_: {[k: string]: string};
commonHeaders_: { [k: string]: string };
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
_observabilityOptions?: ObservabilityOptions;
_traceConfig: traceConfig;
Expand Down Expand Up @@ -365,11 +366,31 @@ export class Snapshot extends EventEmitter {
this.ended = false;
this.session = session;
this.queryOptions = Object.assign({}, queryOptions);
this.request = session.request.bind(session);
this.requestStream = session.requestStream.bind(session);
// If the session is multiplexed, generate a unique affinity key (UUID) for this
// specific transaction/snapshot. This allows requests using the same shared
// multiplexed session to be distributed across different gRPC channels.
if (session.metadata && session.metadata.multiplexed) {
this._affinityKey = require('uuid').v4();
}

this.request = (config: any, callback: Function) => {
if (this._affinityKey) {
config.headers = config.headers || {};
config.headers['x-grpc-gcp-affinity-key'] = this._affinityKey;
}
return session.request(config, callback);
};

this.requestStream = (config: any) => {
if (this._affinityKey) {
config.headers = config.headers || {};
config.headers['x-grpc-gcp-affinity-key'] = this._affinityKey;
}
return session.requestStream(config);
};
Comment on lines +376 to +390
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation mutates the config object by directly modifying its headers property. This can lead to unexpected side effects if the caller reuses the configuration object for multiple requests. It is safer to create a shallow copy of the configuration and its headers to ensure immutability of the input.

    this.request = (config: any, callback: Function) => {
      const requestConfig = this._affinityKey
        ? Object.assign({}, config, {
            headers: Object.assign({}, config.headers, {
              'x-grpc-gcp-affinity-key': this._affinityKey,
            }),
          })
        : config;
      return session.request(requestConfig, callback);
    };

    this.requestStream = (config: any) => {
      const requestConfig = this._affinityKey
        ? Object.assign({}, config, {
            headers: Object.assign({}, config.headers, {
              'x-grpc-gcp-affinity-key': this._affinityKey,
            }),
          })
        : config;
      return session.requestStream(requestConfig);
    };


const readOnly = Snapshot.encodeTimestampBounds(options || {});
this._options = {readOnly};
this._options = { readOnly };
this._dbName = (this.session.parent as Database).formattedName_;
this._waitingRequests = [];
this._inlineBeginStarted = false;
Expand Down Expand Up @@ -463,13 +484,13 @@ export class Snapshot extends EventEmitter {
} else if (lowPriority.length > 0) {
// RULE 2: If only 'insert' key(s) exist, find the one with
// highest number of values
const {bestCandidates} = lowPriority.reduce(
const { bestCandidates } = lowPriority.reduce(
(acc, mutation) => {
const size = mutation.insert?.values?.length || 0;

if (size > acc.maxSize) {
// New largest size found, start a new list
return {maxSize: size, bestCandidates: [mutation]};
return { maxSize: size, bestCandidates: [mutation] };
}
if (size === acc.maxSize) {
// Same size as current max, add to list
Expand Down Expand Up @@ -905,7 +926,7 @@ export class Snapshot extends EventEmitter {
if (attempt === 1) {
span.addEvent('Starting stream');
} else {
span.addEvent('Re-attempting start stream', {attempt: attempt});
span.addEvent('Re-attempting start stream', { attempt: attempt });
}
} else {
span.addEvent('Resuming stream', {
Expand All @@ -917,7 +938,7 @@ export class Snapshot extends EventEmitter {
return this.requestStream({
client: 'SpannerClient',
method: 'streamingRead',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
reqOpts: Object.assign({}, reqOpts, { resumeToken }),
gaxOpts: gaxOptions,
headers: injectRequestIDIntoHeaders(
headers,
Expand Down Expand Up @@ -1420,7 +1441,7 @@ export class Snapshot extends EventEmitter {
*/
runStream(query: string | ExecuteSqlRequest): PartialResultStream {
if (typeof query === 'string') {
query = {sql: query} as ExecuteSqlRequest;
query = { sql: query } as ExecuteSqlRequest;
}

query = Object.assign({}, query) as ExecuteSqlRequest;
Expand All @@ -1445,7 +1466,7 @@ export class Snapshot extends EventEmitter {

const sanitizeRequest = () => {
query = query as ExecuteSqlRequest;
const {params, paramTypes} = Snapshot.encodeParams(query);
const { params, paramTypes } = Snapshot.encodeParams(query);
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
if (this.id) {
transaction.id = this.id as Uint8Array;
Expand Down Expand Up @@ -1512,7 +1533,7 @@ export class Snapshot extends EventEmitter {
if (attempt === 1) {
span.addEvent('Starting stream');
} else {
span.addEvent('Re-attempting start stream', {attempt: attempt});
span.addEvent('Re-attempting start stream', { attempt: attempt });
}
} else {
span.addEvent('Resuming stream', {
Expand All @@ -1536,7 +1557,7 @@ export class Snapshot extends EventEmitter {
return this.requestStream({
client: 'SpannerClient',
method: 'executeStreamingSql',
reqOpts: Object.assign({}, reqOpts, {resumeToken}),
reqOpts: Object.assign({}, reqOpts, { resumeToken }),
gaxOpts: gaxOptions,
headers: injectRequestIDIntoHeaders(
headers,
Expand Down Expand Up @@ -1665,7 +1686,7 @@ export class Snapshot extends EventEmitter {
options: TimestampBounds,
): spannerClient.spanner.v1.TransactionOptions.IReadOnly {
const readOnly: spannerClient.spanner.v1.TransactionOptions.IReadOnly = {};
const {returnReadTimestamp = true} = options;
const { returnReadTimestamp = true } = options;

if (options.minReadTimestamp instanceof PreciseDate) {
readOnly.minReadTimestamp = (
Expand Down Expand Up @@ -1713,8 +1734,8 @@ export class Snapshot extends EventEmitter {
static encodeParams(request: ExecuteSqlRequest) {
const typeMap = request.types || {};

const params: p.IStruct = {fields: request.params?.fields || {}};
const paramTypes: {[field: string]: spannerClient.spanner.v1.Type} =
const params: p.IStruct = { fields: request.params?.fields || {} };
const paramTypes: { [field: string]: spannerClient.spanner.v1.Type } =
request.paramTypes || {};

if (request.params && !request.params.fields) {
Expand Down Expand Up @@ -1751,7 +1772,7 @@ export class Snapshot extends EventEmitter {
});
}

return {params, paramTypes};
return { params, paramTypes };
}

/**
Expand Down Expand Up @@ -1787,12 +1808,12 @@ export class Snapshot extends EventEmitter {
resp: spannerClient.spanner.v1.ITransaction,
span: Span,
): void {
const {id, readTimestamp} = resp;
const { id, readTimestamp } = resp;

this.id = id!;
this.metadata = resp;

span.addEvent('Transaction Creation Done', {id: this.id.toString()});
span.addEvent('Transaction Creation Done', { id: this.id.toString() });

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
Expand Down Expand Up @@ -1822,7 +1843,7 @@ export class Snapshot extends EventEmitter {
// Queue subsequent requests.
return (resumeToken?: ResumeToken): Readable => {
const streamProxy = new Readable({
read() {},
read() { },
});

this._waitingRequests.push(() => {
Expand Down Expand Up @@ -1906,7 +1927,7 @@ export class Dml extends Snapshot {
callback?: RunUpdateCallback,
): void | Promise<RunUpdateResponse> {
if (typeof query === 'string') {
query = {sql: query} as ExecuteSqlRequest;
query = { sql: query } as ExecuteSqlRequest;
}

return startTrace(
Expand Down Expand Up @@ -2051,7 +2072,7 @@ export class Transaction extends Dml {
super(session, undefined, queryOptions);

this._queuedMutations = [];
this._options = {readWrite: options};
this._options = { readWrite: options };
this._options.isolationLevel = IsolationLevel.ISOLATION_LEVEL_UNSPECIFIED;
this.requestOptions = requestOptions;
this._retryCommit = false;
Expand Down Expand Up @@ -2167,11 +2188,11 @@ export class Transaction extends Dml {
const statements: spannerClient.spanner.v1.ExecuteBatchDmlRequest.IStatement[] =
queries.map(query => {
if (typeof query === 'string') {
return {sql: query};
return { sql: query };
}
const {sql} = query;
const {params, paramTypes} = Snapshot.encodeParams(query);
return {sql, params, paramTypes};
const { sql } = query;
const { params, paramTypes } = Snapshot.encodeParams(query);
return { sql, params, paramTypes };
});

const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
Expand Down Expand Up @@ -2235,7 +2256,7 @@ export class Transaction extends Dml {

if (err) {
const rowCounts: number[] = [];
batchUpdateError = Object.assign(err, {rowCounts});
batchUpdateError = Object.assign(err, { rowCounts });
setSpanError(span, batchUpdateError);
span.end();
callback!(batchUpdateError, rowCounts, resp);
Expand All @@ -2244,18 +2265,18 @@ export class Transaction extends Dml {

this._updatePrecommitToken(resp);

const {resultSets, status} = resp;
const { resultSets, status } = resp;
for (const resultSet of resultSets) {
if (!this.id && resultSet.metadata?.transaction) {
this._update(resultSet.metadata.transaction, span);
}
}
const rowCounts: number[] = resultSets.map(({stats}) => {
const rowCounts: number[] = resultSets.map(({ stats }) => {
return (
(stats &&
Number(
stats[
(stats as spannerClient.spanner.v1.ResultSetStats).rowCount!
(stats as spannerClient.spanner.v1.ResultSetStats).rowCount!
],
)) ||
0
Expand Down Expand Up @@ -2453,6 +2474,15 @@ export class Transaction extends Dml {
addLeaderAwareRoutingHeader(headers);
}

// Create a copy to avoid leaking the unbind header to the global commonHeaders_.
const requestHeaders = Object.assign({}, headers);

// Signal to grpc-gcp to unbind the affinity key and clean up memory
// since this transaction is now complete.
if (this._affinityKey) {
requestHeaders['x-grpc-gcp-unbind'] = 'true';
}

span.addEvent('Starting Commit');

const database = this.session.parent as Database;
Expand All @@ -2463,7 +2493,7 @@ export class Transaction extends Dml {
reqOpts,
gaxOpts: gaxOpts,
headers: injectRequestIDIntoHeaders(
headers,
requestHeaders,
this.session,
nextNthRequest(database),
1,
Expand Down Expand Up @@ -2819,13 +2849,22 @@ export class Transaction extends Dml {
addLeaderAwareRoutingHeader(headers);
}

// Create a copy to avoid leaking the unbind header to the global commonHeaders_.
const requestHeaders = Object.assign({}, headers);

// Signal to grpc-gcp to unbind the affinity key and clean up memory
// since this transaction is now complete.
if (this._affinityKey) {
requestHeaders['x-grpc-gcp-unbind'] = 'true';
}

this.request(
{
client: 'SpannerClient',
method: 'rollback',
reqOpts,
gaxOpts,
headers: headers,
headers: requestHeaders,
},
(err: null | ServiceError) => {
if (err) {
Expand Down Expand Up @@ -3052,7 +3091,7 @@ function buildMutation(
});

const mutation: spannerClient.spanner.v1.IMutation = {
[method]: {table, columns, values},
[method]: { table, columns, values },
};
return mutation as spannerClient.spanner.v1.Mutation;
}
Expand All @@ -3072,7 +3111,7 @@ function buildDeleteMutation(
keys: toArray(keys).map(codec.convertToListValue),
};
const mutation: spannerClient.spanner.v1.IMutation = {
delete: {table, keySet},
delete: { table, keySet },
};
return mutation as spannerClient.spanner.v1.Mutation;
}
Expand Down Expand Up @@ -3265,7 +3304,7 @@ export class PartitionedDml extends Dml {
options = {} as spannerClient.spanner.v1.TransactionOptions.PartitionedDml,
) {
super(session);
this._options = {partitionedDml: options};
this._options = { partitionedDml: options };
}
/**
* Use option excludeTxnFromChangeStreams to exclude partitionedDml
Expand Down
Loading
Loading