88} from './interfaces' ;
99
1010import { createRedisConnection } from './redis.utils' ;
11- import { CONNECT_EVENT , ERROR_EVENT } from '@nestjs/microservices/constants ' ;
11+ import { RedisEventsMap } from '@nestjs/microservices/events/redis.events ' ;
1212import { deserialize , serialize } from './streams.utils' ;
1313import { RedisStreamContext } from './stream.context' ;
1414import { Observable } from 'rxjs' ;
@@ -37,7 +37,7 @@ export class RedisStreamStrategy
3737 this . handleError ( this . client ) ;
3838
3939 // when server instance connect, bind handlers.
40- this . redis . on ( CONNECT_EVENT , ( ) => {
40+ this . redis . on ( RedisEventsMap . CONNECT , ( ) => {
4141 this . logger . log (
4242 'Redis connected successfully on ' +
4343 ( this . options . connection ?. url ??
@@ -150,13 +150,13 @@ export class RedisStreamStrategy
150150 if ( ! this . client ) throw new Error ( 'Redis client instance not found.' ) ;
151151
152152 const commandArgs : RedisValue [ ] = [ ] ;
153- if ( this . options . streams ?. maxLen ) {
154- commandArgs . push ( " MAXLEN" )
155- commandArgs . push ( "~" )
156- commandArgs . push ( this . options . streams . maxLen . toString ( ) )
153+ if ( this . options . streams ?. maxLen ) {
154+ commandArgs . push ( ' MAXLEN' ) ;
155+ commandArgs . push ( '~' ) ;
156+ commandArgs . push ( this . options . streams . maxLen . toString ( ) ) ;
157157 }
158- commandArgs . push ( "*" )
159-
158+ commandArgs . push ( '*' ) ;
159+
160160 await this . client . xadd (
161161 responseObj . stream ,
162162 ...commandArgs ,
@@ -339,7 +339,7 @@ export class RedisStreamStrategy
339339
340340 // for redis instances. need to add mechanism to try to connect back.
341341 public handleError ( stream : any ) {
342- stream . on ( ERROR_EVENT , ( err : any ) => {
342+ stream . on ( RedisEventsMap . ERROR , ( err : any ) => {
343343 this . logger . error ( 'Redis instance error: ' + err ) ;
344344 this . close ( ) ;
345345 } ) ;
@@ -350,4 +350,18 @@ export class RedisStreamStrategy
350350 this . redis && this . redis . quit ( ) ;
351351 this . client && this . client . quit ( ) ;
352352 }
353+
354+ public on <
355+ EventKey extends string = string ,
356+ EventCallback extends Function = Function ,
357+ > ( event : EventKey , callback : EventCallback ) : this {
358+ if ( this . redis ) {
359+ this . redis . on ( event , callback as unknown as ( ...args : unknown [ ] ) => void ) ;
360+ }
361+ return this ;
362+ }
363+
364+ public unwrap < T > ( ) : T {
365+ return this . redis as T ;
366+ }
353367}
0 commit comments