diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 7fffed86580..f1a05681a02 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -43,6 +43,7 @@ export default class RedisCommandsQueue { readonly #waitingToBeSent = new LinkedList(); readonly #waitingForReply = new LinkedList(); readonly #onShardedChannelMoved: OnShardedChannelMoved; + readonly #earlyServerError: Function; readonly #pubSub = new PubSub(); @@ -87,7 +88,14 @@ export default class RedisCommandsQueue { } } - const { resolve, reject } = this.#waitingForReply.shift()!; + let resolve, reject; + if (this.#waitingForReply.length > 0) { + ({ resolve, reject } = this.#waitingForReply.shift()!); + } else { + resolve = (obj: any) => this.#earlyServerError(new Error(`unexpected response from server ${obj}`)); + reject = (e: Error) => this.#earlyServerError(e); + } + if (reply instanceof ErrorReply) { reject(reply); } else { @@ -98,10 +106,12 @@ export default class RedisCommandsQueue { constructor( maxLength: number | null | undefined, - onShardedChannelMoved: OnShardedChannelMoved + onShardedChannelMoved: OnShardedChannelMoved, + earlyServerError: Function ) { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; + this.#earlyServerError = earlyServerError; } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions): Promise { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5b9badf3f37..24cad0acc16 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -248,7 +248,8 @@ export default class RedisClient< #initiateQueue(): RedisCommandsQueue { return new RedisCommandsQueue( this.#options?.commandsQueueMaxLength, - (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) + (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners), + (err: Error) => this.emit('error', err) ); }