From 4b924ea4d06960bde8a157d507bece51f1c9af39 Mon Sep 17 00:00:00 2001 From: Dmitri Tikhonov Date: Mon, 26 Jun 2023 15:43:55 +0000 Subject: [PATCH 1/2] Handle early server errors (issue #2417) --- packages/client/lib/client/commands-queue.ts | 15 ++++++++++++++- packages/client/lib/client/index.ts | 3 ++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 7fffed86580..546289d404b 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -43,6 +43,7 @@ export default class RedisCommandsQueue { readonly #waitingToBeSent = new LinkedList(); readonly #waitingForReply = new LinkedList(); readonly #onShardedChannelMoved: OnShardedChannelMoved; + readonly #earlyServerError: Function; readonly #pubSub = new PubSub(); @@ -98,10 +99,12 @@ export default class RedisCommandsQueue { constructor( maxLength: number | null | undefined, - onShardedChannelMoved: OnShardedChannelMoved + onShardedChannelMoved: OnShardedChannelMoved, + earlyServerError: Function ) { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; + this.#earlyServerError = earlyServerError; } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions): Promise { @@ -237,6 +240,16 @@ export default class RedisCommandsQueue { } onReplyChunk(chunk: Buffer): void { + if (this.#waitingForReply.length === 0) { + this.#waitingForReply.push({ + resolve: (obj) => { + this.#earlyServerError(new Error(`unexpected response from server ${obj}`)); + }, + reject: e => this.#earlyServerError(e), + channelsCounter: 0, + returnBuffers: false + }); + } this.#decoder.write(chunk); } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5b9badf3f37..24cad0acc16 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -248,7 +248,8 @@ export default class RedisClient< #initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( this.#options?.commandsQueueMaxLength, - (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) + (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners), + (err: Error) => this.emit('error', err) ); } From 69526130452f76b263b58542a954339840fd6340 Mon Sep 17 00:00:00 2001 From: Dmitri Tikhonov Date: Fri, 30 Jun 2023 14:12:42 +0000 Subject: [PATCH 2/2] Move handler into onReply() --- packages/client/lib/client/commands-queue.ts | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 546289d404b..f1a05681a02 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -88,7 +88,14 @@ export default class RedisCommandsQueue { } } - const { resolve, reject } = this.#waitingForReply.shift()!; + let resolve, reject; + if (this.#waitingForReply.length > 0) { + ({ resolve, reject } = this.#waitingForReply.shift()!); + } else { + resolve = (obj: any) => this.#earlyServerError(new Error(`unexpected response from server ${obj}`)); + reject = (e: Error) => this.#earlyServerError(e); + } + if (reply instanceof ErrorReply) { reject(reply); } else { @@ -240,16 +247,6 @@ export default class RedisCommandsQueue { } onReplyChunk(chunk: Buffer): void { - if (this.#waitingForReply.length === 0) { - this.#waitingForReply.push({ - resolve: (obj) => { - this.#earlyServerError(new Error(`unexpected response from server ${obj}`)); - }, - reject: e => this.#earlyServerError(e), - channelsCounter: 0, - returnBuffers: false - }); - } this.#decoder.write(chunk); }