Skip to content

Commit 49328c1

Browse files
committed
wip
1 parent 42e8233 commit 49328c1

File tree

2 files changed

+93
-29
lines changed

2 files changed

+93
-29
lines changed

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,14 @@ import { setTimeout } from "node:timers/promises";
88
import { RedisTcpSocketOptions } from "./socket";
99
import diagnostics_channel from "node:diagnostics_channel";
1010

11-
type RedisType = RedisClient<any, any, any, any, any>
11+
type RedisType = RedisClient<any, any, any, any, any>;
1212

1313
export const SMIGRATED_EVENT = "__SMIGRATED";
14+
export interface SMigratedEvent {
15+
source: { host: string, port: number };
16+
destination: { host: string, port: number };
17+
ranges: (number | [number, number])[]
18+
}
1419

1520
export const MAINTENANCE_EVENTS = {
1621
PAUSE_WRITING: "pause-writing",
@@ -87,7 +92,7 @@ export default class EnterpriseMaintenanceManager {
8792

8893
if (!host) return;
8994

90-
const tls = options.socket?.tls ?? false
95+
const tls = options.socket?.tls ?? false;
9196

9297
const movingEndpointType = await determineEndpoint(tls, host, options);
9398
return {
@@ -129,12 +134,12 @@ export default class EnterpriseMaintenanceManager {
129134
const type = String(push[0]);
130135

131136
emitDiagnostics({
132-
type,
133-
timestamp: Date.now(),
134-
data: {
135-
push: push.map(String),
136-
},
137-
});
137+
type,
138+
timestamp: Date.now(),
139+
data: {
140+
push: push.map(String),
141+
},
142+
});
138143
switch (type) {
139144
case PN.MOVING: {
140145
// [ 'MOVING', '17', '15', '54.78.247.156:12075' ]
@@ -223,7 +228,7 @@ export default class EnterpriseMaintenanceManager {
223228

224229
// If the URL is provided, it takes precedense
225230
// the options object could just be mutated
226-
if(this.#options.url) {
231+
if (this.#options.url) {
227232
const u = new URL(this.#options.url);
228233
u.hostname = host;
229234
u.port = String(port);
@@ -232,15 +237,17 @@ export default class EnterpriseMaintenanceManager {
232237
this.#options.socket = {
233238
...this.#options.socket,
234239
host,
235-
port
236-
}
240+
port,
241+
};
237242
}
238243
const tmpClient = this.#client.duplicate();
239-
tmpClient.on('error', (error: unknown) => {
244+
tmpClient.on("error", (error: unknown) => {
240245
//We dont know how to handle tmp client errors
241-
dbgMaintenance(`[ERR]`, error)
246+
dbgMaintenance(`[ERR]`, error);
242247
});
243-
dbgMaintenance(`Tmp client created in ${( performance.now() - start ).toFixed(2)}ms`);
248+
dbgMaintenance(
249+
`Tmp client created in ${(performance.now() - start).toFixed(2)}ms`,
250+
);
244251
dbgMaintenance(
245252
`Set timeout for tmp client to ${this.#options.maintRelaxedSocketTimeout}`,
246253
);
@@ -251,7 +258,9 @@ export default class EnterpriseMaintenanceManager {
251258
dbgMaintenance(`Connecting tmp client: ${host}:${port}`);
252259
start = performance.now();
253260
await tmpClient.connect();
254-
dbgMaintenance(`Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`);
261+
dbgMaintenance(
262+
`Connected to tmp client in ${(performance.now() - start).toFixed(2)}ms`,
263+
);
255264
// 3 [EVENT] New socket connected
256265

257266
dbgMaintenance(`Wait for all in-flight commands to complete`);
@@ -297,12 +306,11 @@ export default class EnterpriseMaintenanceManager {
297306

298307
const update: MaintenanceUpdate = {
299308
relaxedCommandTimeout: undefined,
300-
relaxedSocketTimeout: undefined
309+
relaxedSocketTimeout: undefined,
301310
};
302311

303312
this.#client._maintenanceUpdate(update);
304313
};
305-
306314
}
307315

308316
export type MovingEndpointType =
@@ -341,9 +349,7 @@ async function determineEndpoint(
341349
): Promise<MovingEndpointType> {
342350
assert(options.maintEndpointType !== undefined);
343351
if (options.maintEndpointType !== "auto") {
344-
dbgMaintenance(
345-
`Determine endpoint type: ${options.maintEndpointType}`,
346-
);
352+
dbgMaintenance(`Determine endpoint type: ${options.maintEndpointType}`);
347353
return options.maintEndpointType;
348354
}
349355

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions
77
import calculateSlot from 'cluster-key-slot';
88
import { RedisSocketOptions } from '../client/socket';
99
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
10-
import { SMIGRATED_EVENT, dbgMaintenance } from '../client/enterprise-maintenance-manager';
10+
import { SMIGRATED_EVENT, SMigratedEvent, dbgMaintenance } from '../client/enterprise-maintenance-manager';
1111

1212
interface NodeAddress {
1313
host: string;
@@ -114,7 +114,6 @@ export default class RedisClusterSlots<
114114
readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
115115
pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
116116
clientSideCache?: PooledClientSideCacheProvider;
117-
smigratedSequenceIdsSeen = new Set<number>();
118117

119118
#isOpen = false;
120119

@@ -254,15 +253,74 @@ export default class RedisClusterSlots<
254253
}
255254
}
256255

257-
#handleSmigrated = async (sequenceId: number) => {
258-
dbgMaintenance(`[CLUSTER_SLOTS]: handle smigrated`)
259-
if(this.smigratedSequenceIdsSeen.has(sequenceId)) {
260-
dbgMaintenance(`[CLUSTER_SLOTS]: smigrated sequence id ${sequenceId} already seen, skipping rediscovery`);
256+
#handleSmigrated = async (event: SMigratedEvent) => {
257+
dbgMaintenance(`[CSlots]: handle smigrated`, event);
258+
259+
// slots = new Array<Shard<M, F, S, RESP, TYPE_MAPPING>>(RedisClusterSlots.#SLOTS);
260+
// masters = new Array<MasterNode<M, F, S, RESP, TYPE_MAPPING>>();
261+
// replicas = new Array<ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
262+
// readonly nodeByAddress = new Map<string, MasterNode<M, F, S, RESP, TYPE_MAPPING> | ShardNode<M, F, S, RESP, TYPE_MAPPING>>();
263+
// pubSubNode?: PubSubNode<M, F, S, RESP, TYPE_MAPPING>;
264+
265+
const sourceAddress = `${event.source.host}:${event.source.port}`;
266+
const sourceNode = this.nodeByAddress.get(sourceAddress);
267+
if(!sourceNode) {
268+
dbgMaintenance(`[CSlots]: address ${sourceAddress} not in 'nodeByAddress', abort SMIGRATED handling`);
269+
return;
270+
}
271+
272+
// 1. Pausing
273+
//TODO - check the single pubsubnode
274+
sourceNode.client?._pause();
275+
if('pubSub' in sourceNode) {
276+
sourceNode.pubSub?.client._pause();
277+
}
278+
279+
const destinationAddress = `${event.destination.host}:${event.destination.port}`;
280+
let destinationNode = this.nodeByAddress.get(destinationAddress);
281+
let destinationShard: Shard<M, F, S, RESP, TYPE_MAPPING>;
282+
283+
// 2. Create new Master
284+
if(!destinationNode) {
285+
const promises: Promise<unknown>[] = [];
286+
destinationNode = this.#initiateSlotNode({ host: event.destination.host, port: event.destination.port, id: 'asdff' }, false, true, new Set(), promises);
287+
await Promise.all(promises);
288+
// 2.1 Pause
289+
destinationNode.client?._pause();
290+
// In case destination node didnt exist, this means Shard didnt exist as well, so creating a new Shard is completely fine
291+
destinationShard = {
292+
master: destinationNode
293+
};
261294
} else {
262-
dbgMaintenance(`[CLUSTER_SLOTS]: smigrated sequence id ${sequenceId} is new, triggering rediscovery`);
263-
this.smigratedSequenceIdsSeen.add(sequenceId);
264-
await this.#discoverWithRootNodes()
295+
// In case destination node existed, this means there was a Shard already, so its best if we can find it.
296+
const existingShard = this.slots.find(shard => shard.master.host === event.destination.host && shard.master.port === event.destination.port);
297+
if(!existingShard) {
298+
dbgMaintenance("Could not find shard");
299+
throw new Error('Could not find shard');
300+
}
301+
destinationShard = existingShard;
265302
}
303+
304+
// 3. Soft update shards
305+
for(const range of event.ranges) {
306+
if(typeof range === 'number') {
307+
this.slots[range] = destinationShard;
308+
} else {
309+
for (let slot = range[0]; slot <= range[1]; slot++) {
310+
this.slots[slot] = destinationShard;
311+
}
312+
}
313+
}
314+
315+
// 4. For all affected clients (normal, pubsub, spubsub):
316+
// 4.1 Wait for inflight commands to complete
317+
// 4.2 Extract commands, channels, sharded channels
318+
// 4.3 Kill if no slots are pointing to it
319+
//
320+
321+
// 5. Prepend extracted commands, chans
322+
// 5.1 Unpause
323+
266324
}
267325

268326
async #getShards(rootNode: RedisClusterClientOptions) {

0 commit comments

Comments
 (0)