99import java .io .IOException ;
1010import java .net .ServerSocket ;
1111import java .time .Duration ;
12- import java .util .HashMap ;
13- import java .util .List ;
14- import java .util .Map ;
15- import java .util .Set ;
12+ import java .util .*;
1613import java .util .concurrent .CopyOnWriteArrayList ;
1714
1815@ SuppressWarnings ("unused" )
@@ -521,45 +518,32 @@ public void onMessage(String channel, String message) {
521518 }, "Redis-Subscriber-" + channel ).start ();
522519 }
523520
524- public boolean enqueue (String queueName , String message ) {
521+ public Long enqueue (String queueName , String message ) {
525522 if (useMockFallback ) {
526523 return mockRedis .enqueue (queueName , message );
527524 }
528-
529525 if (!isConnected ()) {
530526 Log .error ("Not connected to Redis server. Cannot enqueue message." );
531- return false ;
527+ return - 1L ;
532528 }
533-
534529 try (Jedis jedis = jedisPool .getResource ()) {
535- jedis .rpush (queueName , message );
536- return true ;
537- } catch (JedisConnectionException e ) {
538- Log .error ("Jedis connection error during RPUSH operation" , e );
539- connected = false ;
540- return false ;
530+ return jedis .rpush (queueName , message );
541531 } catch (Exception e ) {
542- Log .error ("Error enqueueing message to Redis" , e );
543- return false ;
532+ Log .error ("Error enqueueing message to Redis queue " + queueName , e );
533+ return - 1L ;
544534 }
545535 }
546536
547537 public String dequeue (String queueName ) {
548538 if (useMockFallback ) {
549539 return mockRedis .dequeue (queueName );
550540 }
551-
552541 if (!isConnected ()) {
553542 Log .error ("Not connected to Redis server. Cannot dequeue message." );
554543 return null ;
555544 }
556-
557545 try (Jedis jedis = jedisPool .getResource ()) {
558546 return jedis .lpop (queueName );
559- } catch (JedisConnectionException e ) {
560- Log .error ("Jedis connection error during LPOP operation" , e );
561- connected = false ;
562- return null ;
563547 } catch (Exception e ) {
564548 Log .error ("Error dequeuing message from Redis" , e );
565549 return null ;
@@ -598,6 +582,70 @@ public void registerQueueConsumer(String queueName, QueueMessageConsumer consume
598582 }, "Redis-Queue-Consumer-" + queueName ).start ();
599583 }
600584
585+ public void registerReliableQueueConsumer (String queueName , QueueMessageConsumer consumer ) {
586+ if (useMockFallback ) {
587+ mockRedis .registerQueueConsumer (queueName , consumer ); // Mock uses simple consumer
588+ return ;
589+ }
590+ if (!isConnected ()) {
591+ Log .error ("Not connected to Redis. Cannot register queue consumer for " + queueName );
592+ return ;
593+ }
594+
595+ final String processingQueueName = queueName + ":processing:" + UUID .randomUUID ().toString ();
596+
597+ new Thread (() -> {
598+ try {
599+ while (isConnected ()) {
600+ try (Jedis jedis = jedisPool .getResource ()) {
601+ String message = jedis .brpoplpush (queueName , processingQueueName , 0 );
602+
603+ if (message != null ) {
604+ try {
605+ consumer .onMessageReceived (queueName , message );
606+ jedis .lrem (processingQueueName , -1 , message );
607+ } catch (Exception e ) {
608+ Log .error ("Worker failed to process message. It remains in " + processingQueueName + " for recovery." , e );
609+ }
610+ }
611+ }
612+ }
613+ } catch (JedisConnectionException e ) {
614+ Log .error ("Redis connection lost for consumer on queue " + queueName , e );
615+ connected = false ;
616+ } catch (Exception e ) {
617+ Log .error ("Error in reliable queue consumer for " + queueName , e );
618+ }
619+ }, "Reliable-Queue-Consumer-" + queueName ).start ();
620+ }
621+
622+ public long getQueueLength (String queueName ) {
623+ if (useMockFallback ) {
624+ return mockRedis .getQueueLength (queueName );
625+ }
626+ if (!isConnected ()) return -1 ;
627+ try (Jedis jedis = jedisPool .getResource ()) {
628+ return jedis .llen (queueName );
629+ } catch (Exception e ) {
630+ Log .error ("Error getting queue length for " + queueName , e );
631+ return -1 ;
632+ }
633+ }
634+
635+ public Long getPositionInQueue (String queueName , String message ) {
636+ if (useMockFallback ) {
637+ return mockRedis .getPositionInQueue (queueName , message );
638+ }
639+ if (!isConnected ()) return null ;
640+ try (Jedis jedis = jedisPool .getResource ()) {
641+ return jedis .lpos (queueName , message );
642+ } catch (Exception e ) {
643+ Log .error ("Error getting position in queue for " + queueName , e );
644+ return null ;
645+ }
646+ }
647+
648+
601649 public interface MessageListener {
602650 void onMessage (String channel , String message );
603651 }
@@ -723,30 +771,27 @@ public void subscribe(String channel, MessageListener messageListener) {
723771 }
724772
725773 subscribers .computeIfAbsent (channel , k -> new CopyOnWriteArrayList <>())
726- .add (messageListener );
774+ .add (messageListener );
727775 }
728776
729- public boolean enqueue (String queueName , String message ) {
777+ public Long enqueue (String queueName , String message ) {
730778 if (!isRunning ()) {
731779 Log .error ("Mock Redis server is not running. Cannot enqueue message." );
732- return false ;
780+ return - 1L ;
733781 }
734782
735- messageQueues .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>())
736- .add (message );
783+ List < String > queue = messageQueues .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>());
784+ queue .add (message );
737785
738786 List <QueueMessageConsumer > consumers = queueConsumers .get (queueName );
739787 if (consumers != null && !consumers .isEmpty ()) {
740788 QueueMessageConsumer consumer = consumers .get (0 );
741-
742- List <String > queue = messageQueues .get (queueName );
743789 if (!queue .isEmpty ()) {
744790 String msg = queue .remove (0 );
745791 new Thread (() -> consumer .onMessageReceived (queueName , msg )).start ();
746792 }
747793 }
748-
749- return true ;
794+ return (long ) queue .size ();
750795 }
751796
752797 public String dequeue (String queueName ) {
@@ -770,14 +815,29 @@ public void registerQueueConsumer(String queueName, QueueMessageConsumer consume
770815 }
771816
772817 queueConsumers .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>())
773- .add (consumer );
818+ .add (consumer );
774819
775820 List <String > queue = messageQueues .get (queueName );
776821 if (queue != null && !queue .isEmpty ()) {
777822 String message = queue .remove (0 );
778823 new Thread (() -> consumer .onMessageReceived (queueName , message )).start ();
779824 }
780825 }
826+
827+ @ Override
828+ public long getQueueLength (String queueName ) {
829+ if (!isRunning ()) return -1 ;
830+ return messageQueues .getOrDefault (queueName , new ArrayList <>()).size ();
831+ }
832+
833+ @ Override
834+ public Long getPositionInQueue (String queueName , String message ) {
835+ if (!isRunning ()) return null ;
836+ List <String > queue = messageQueues .get (queueName );
837+ if (queue == null ) return null ;
838+ long index = queue .indexOf (message );
839+ return index == -1 ? null : index ;
840+ }
781841 }
782842
783843 public Map <String , String > getDebugInfo () {
@@ -868,17 +928,17 @@ public Map<String, Long> getKeyspaceStats() {
868928
869929 Set <String > keys = jedis .keys ("*" );
870930 stats .put ("string_keys" , keys .stream ()
871- .filter (k -> jedis .type (k ).equals ("string" ))
872- .count ());
931+ .filter (k -> jedis .type (k ).equals ("string" ))
932+ .count ());
873933 stats .put ("list_keys" , keys .stream ()
874- .filter (k -> jedis .type (k ).equals ("list" ))
875- .count ());
934+ .filter (k -> jedis .type (k ).equals ("list" ))
935+ .count ());
876936 stats .put ("set_keys" , keys .stream ()
877- .filter (k -> jedis .type (k ).equals ("set" ))
878- .count ());
937+ .filter (k -> jedis .type (k ).equals ("set" ))
938+ .count ());
879939 stats .put ("hash_keys" , keys .stream ()
880- .filter (k -> jedis .type (k ).equals ("hash" ))
881- .count ());
940+ .filter (k -> jedis .type (k ).equals ("hash" ))
941+ .count ());
882942
883943 return stats ;
884944 } catch (Exception e ) {
@@ -912,4 +972,4 @@ public void onCommand(String command) {
912972 }
913973 }, "Redis-Monitor" ).start ();
914974 }
915- }
975+ }
0 commit comments