Skip to content

Commit cd996ca

Browse files
committed
add slotnumber to commands
1 parent 71bafd1 commit cd996ca

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ export interface CommandOptions<T = TypeMapping> {
1919
* Timeout for the command in milliseconds
2020
*/
2121
timeout?: number;
22+
/**
23+
* @internal
24+
* The slot the command is targeted to (if any)
25+
*/
26+
slotNumber?: number;
2227
}
2328

2429
export interface CommandToWrite extends CommandWaitingForReply {
@@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
3338
listener: () => unknown;
3439
originalTimeout: number | undefined;
3540
} | undefined;
41+
slotNumber?: number
3642
}
3743

3844
interface CommandWaitingForReply {
@@ -219,6 +225,7 @@ export default class RedisCommandsQueue {
219225
channelsCounter: undefined,
220226
typeMapping: options?.typeMapping
221227
};
228+
value.slotNumber = options?.slotNumber
222229

223230
// If #maintenanceCommandTimeout was explicitly set, we should
224231
// use it instead of the timeout provided by the command

packages/client/lib/cluster/index.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,16 @@ export default class RedisCluster<
416416
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
417417
): Promise<T> {
418418
const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
419-
let client = await this._slots.getClient(firstKey, isReadonly);
419+
let { client, slotNumber } = await this._slots.getClientAndSlotNumber(firstKey, isReadonly);
420420
let i = 0;
421421

422422
let myFn = fn;
423423

424424
while (true) {
425425
try {
426+
if(options !== undefined) {
427+
options.slotNumber = slotNumber;
428+
}
426429
return await myFn(client, options);
427430
} catch (err) {
428431
myFn = fn;
@@ -451,7 +454,9 @@ export default class RedisCluster<
451454

452455
if (err.message.startsWith('MOVED')) {
453456
await this._slots.rediscover(client);
454-
client = await this._slots.getClient(firstKey, isReadonly);
457+
const clientAndSlot = await this._slots.getClientAndSlotNumber(firstKey, isReadonly);
458+
client = clientAndSlot.client;
459+
slotNumber = clientAndSlot.slotNumber;
455460
continue;
456461
}
457462

@@ -485,11 +490,11 @@ export default class RedisCluster<
485490
type Multi = new (...args: ConstructorParameters<typeof RedisClusterMultiCommand>) => RedisClusterMultiCommandType<[], M, F, S, RESP, TYPE_MAPPING>;
486491
return new ((this as any).Multi as Multi)(
487492
async (firstKey, isReadonly, commands) => {
488-
const client = await this._self._slots.getClient(firstKey, isReadonly);
493+
const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly);
489494
return client._executeMulti(commands);
490495
},
491496
async (firstKey, isReadonly, commands) => {
492-
const client = await this._self._slots.getClient(firstKey, isReadonly);
497+
const { client } = await this._self._slots.getClientAndSlotNumber(firstKey, isReadonly);
493498
return client._executePipeline(commands);
494499
},
495500
routing,

0 commit comments

Comments
 (0)