From c87f77099d3b4953ae8278ed3e6cfba9c254a99f Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Sat, 2 Mar 2024 15:18:33 +0530 Subject: [PATCH 01/10] Add loggers --- .gitignore | 2 ++ pom.xml | 15 +++++++++++++++ src/main/java/com/example/Main.java | 16 ++++++++++++++++ src/main/resources/log4j2.xml | 18 ++++++++++++++++++ 4 files changed, 51 insertions(+) create mode 100644 src/main/java/com/example/Main.java create mode 100644 src/main/resources/log4j2.xml diff --git a/.gitignore b/.gitignore index c27e6ec..d5e2459 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ nigel* target/ .DS_Store +.vscode +*.log diff --git a/pom.xml b/pom.xml index 85d4420..c77cd08 100755 --- a/pom.xml +++ b/pom.xml @@ -30,5 +30,20 @@ aws-java-sdk 1.10.65 + + org.apache.logging.log4j + log4j-api + 2.13.0 + + + org.apache.logging.log4j + log4j-core + 2.13.0 + + + com.jcabi + jcabi-log + 0.24.1 + diff --git a/src/main/java/com/example/Main.java b/src/main/java/com/example/Main.java new file mode 100644 index 0000000..9ca4025 --- /dev/null +++ b/src/main/java/com/example/Main.java @@ -0,0 +1,16 @@ +package com.example; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class Main { + public static Logger logger = LogManager.getLogger(Main.class); + + public static void main(String[] args) { + logger.trace("We've just greeted the user!"); + logger.info("Hello"); + logger.debug("Hello"); + System.out.println("Hello"); + } +} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..01ae38c --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file From 97331ba7c103c46e9564a83c6a00e6ebcf8876f9 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Sun, 3 Mar 2024 22:22:26 +0530 Subject: [PATCH 02/10] Add Priority Queue Service interface and corresponding message --- .../java/com/example/PriorityMessage.java | 48 +++++++++++++++++++ .../com/example/PriorityQueueService.java | 11 +++++ 2 files changed, 59 insertions(+) create mode 100644 src/main/java/com/example/PriorityMessage.java create mode 100644 src/main/java/com/example/PriorityQueueService.java diff --git a/src/main/java/com/example/PriorityMessage.java b/src/main/java/com/example/PriorityMessage.java new file mode 100644 index 0000000..26fcbfb --- /dev/null +++ b/src/main/java/com/example/PriorityMessage.java @@ -0,0 +1,48 @@ +package com.example; + +public class PriorityMessage implements Comparable{ + private Integer rank; + private Long time; + private Message message; + + public PriorityMessage(Integer rank, Long time, Message message) { + this.rank = rank; + this.time = time; + this.message = message; + } + + public Integer getRank() { + return rank; + } + + public void setFirst(Integer rank) { + this.rank = rank; + } + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + + public Message getMesssage() { + return message; + } + + public void setMessage(Message message) { + this.message = message; + } + + @Override + public int compareTo(PriorityMessage other) { + // Higher rank messages should come after lower rank messages + int rankComparison = Integer.compare(this.rank, other.getRank()); + if (rankComparison == 0) { + return Long.compare(this.time, other.time); + } + return rankComparison; + } +} diff --git a/src/main/java/com/example/PriorityQueueService.java b/src/main/java/com/example/PriorityQueueService.java new file mode 100644 index 0000000..1034215 --- /dev/null +++ b/src/main/java/com/example/PriorityQueueService.java @@ -0,0 +1,11 @@ +package com.example; + +public interface PriorityQueueService { + + public void push(String queueUrl, String msgBody, int rank); + + public Message pull(String queueUrl); + + public void delete(String queueUrl, String receiptId); + +} \ No newline at end of file From fb7ca4241a864479c782c6c99e592484bbf52b35 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Sun, 3 Mar 2024 22:22:56 +0530 Subject: [PATCH 03/10] Update push,pull and delete for Priority queue --- .../com/example/InMemoryQueueService.java | 53 ++++++++++--------- .../java/com/example/InMemoryQueueTest.java | 27 ++++++---- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/example/InMemoryQueueService.java b/src/main/java/com/example/InMemoryQueueService.java index 76605cc..493fb1d 100755 --- a/src/main/java/com/example/InMemoryQueueService.java +++ b/src/main/java/com/example/InMemoryQueueService.java @@ -4,11 +4,11 @@ import java.io.InputStream; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -public class InMemoryQueueService implements QueueService { - private final Map> queues; +public class InMemoryQueueService implements PriorityQueueService { + private final Map> queues; private long visibilityTimeout; @@ -27,46 +27,51 @@ public class InMemoryQueueService implements QueueService { } @Override - public void push(String queueUrl, String msgBody) { - Queue queue = queues.get(queueUrl); + public void push(String queueUrl, String msgBody, int rank) { + // priority --> numerically lowest rank, lowest time, message + PriorityBlockingQueue queue = queues.get(queueUrl); if (queue == null) { - queue = new ConcurrentLinkedQueue<>(); + queue = new PriorityBlockingQueue(); queues.put(queueUrl, queue); } - queue.add(new Message(msgBody)); + Long now = now(); + PriorityMessage priorityMessage = new PriorityMessage(rank,now, new Message(msgBody)); + queue.add(priorityMessage); } @Override public Message pull(String queueUrl) { - Queue queue = queues.get(queueUrl); + PriorityBlockingQueue queue = queues.get(queueUrl); if (queue == null) { return null; } - long nowTime = now(); - Optional msgOpt = queue.stream().filter(m -> m.isVisibleAt(nowTime)).findFirst(); - if (msgOpt.isEmpty()) { - return null; - } else { - Message msg = msgOpt.get(); - msg.setReceiptId(UUID.randomUUID().toString()); - msg.incrementAttempts(); - msg.setVisibleFrom(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(visibilityTimeout)); - return new Message(msg.getBody(), msg.getReceiptId()); + while (!queue.isEmpty()) { + PriorityMessage priorityMessage = queue.poll(); + if (priorityMessage == null) { + return null; + } else { + Message msg = priorityMessage.getMesssage(); + msg.setReceiptId(UUID.randomUUID().toString()); + msg.incrementAttempts(); + msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout)); + + return new Message(msg.getBody(), msg.getReceiptId()); + } } + return null; } @Override public void delete(String queueUrl, String receiptId) { - Queue queue = queues.get(queueUrl); + PriorityBlockingQueue queue = queues.get(queueUrl); if (queue != null) { long nowTime = now(); - - for (Iterator it = queue.iterator(); it.hasNext(); ) { - Message msg = it.next(); - if (!msg.isVisibleAt(nowTime) && msg.getReceiptId().equals(receiptId)) { - it.remove(); + for (PriorityMessage priorityMessage : queue) { + Message message = priorityMessage.getMesssage(); + if (!message.isVisibleAt(nowTime) && message.getReceiptId().equals(receiptId)) { + queue.remove(priorityMessage); break; } } diff --git a/src/test/java/com/example/InMemoryQueueTest.java b/src/test/java/com/example/InMemoryQueueTest.java index 26bf4ae..f0b1c35 100755 --- a/src/test/java/com/example/InMemoryQueueTest.java +++ b/src/test/java/com/example/InMemoryQueueTest.java @@ -5,12 +5,15 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.Before; import org.junit.Test; public class InMemoryQueueTest { - private QueueService qs; + private PriorityQueueService qs; private String queueUrl = "https://sqs.ap-1.amazonaws.com/007/MyQueue"; + public static Logger logger = LogManager.getLogger(InMemoryQueueTest.class); @Before public void setup() { @@ -20,8 +23,11 @@ public void setup() { @Test public void testSendMessage(){ - qs.push(queueUrl, "Good message!"); + qs.push(queueUrl, "Good message!",1); + logger.info(queueUrl); Message msg = qs.pull(queueUrl); + logger.info(msg.toString()); + logger.info(msg.getBody()); assertNotNull(msg); assertEquals("Good message!", msg.getBody()); @@ -31,7 +37,7 @@ public void testSendMessage(){ public void testPullMessage(){ String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }"; - qs.push(queueUrl, msgBody); + qs.push(queueUrl, msgBody,1); Message msg = qs.pull(queueUrl); assertEquals(msgBody, msg.getBody()); @@ -46,7 +52,7 @@ public void testPullEmptyQueue(){ @Test public void testDoublePull(){ - qs.push(queueUrl, "Message A."); + qs.push(queueUrl, "Message A.",1); qs.pull(queueUrl); Message msg = qs.pull(queueUrl); assertNull(msg); @@ -56,7 +62,7 @@ public void testDoublePull(){ public void testDeleteMessage(){ String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }"; - qs.push(queueUrl, msgBody); + qs.push(queueUrl, msgBody,1); Message msg = qs.pull(queueUrl); qs.delete(queueUrl, msg.getReceiptId()); @@ -77,9 +83,9 @@ public void testFIFO3Msgs(){ " \"car3\":\"Fiat\"\n" + " }\n" + " }"}; - qs.push(queueUrl, msgStrs[0]); - qs.push(queueUrl, msgStrs[1]); - qs.push(queueUrl, msgStrs[2]); + qs.push(queueUrl, msgStrs[0],1); + qs.push(queueUrl, msgStrs[1],2); + qs.push(queueUrl, msgStrs[2],3); Message msg1 = qs.pull(queueUrl); Message msg2 = qs.pull(queueUrl); Message msg3 = qs.pull(queueUrl); @@ -90,14 +96,13 @@ public void testFIFO3Msgs(){ @Test public void testAckTimeout(){ - InMemoryQueueService queueService = new InMemoryQueueService() { + PriorityQueueService queueService = new InMemoryQueueService() { long now() { return System.currentTimeMillis() + 1000 * 30 + 1; } }; - queueService.push(queueUrl, "Message A."); - queueService.pull(queueUrl); + queueService.push(queueUrl, "Message A.",1); Message msg = queueService.pull(queueUrl); assertTrue(msg != null && msg.getBody() == "Message A."); } From 9a5095c31b05dd19040051187d9ff4946e72a15e Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Sun, 3 Mar 2024 22:23:49 +0530 Subject: [PATCH 04/10] Remove logs --- src/test/java/com/example/InMemoryQueueTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/com/example/InMemoryQueueTest.java b/src/test/java/com/example/InMemoryQueueTest.java index f0b1c35..cb210a6 100755 --- a/src/test/java/com/example/InMemoryQueueTest.java +++ b/src/test/java/com/example/InMemoryQueueTest.java @@ -24,10 +24,7 @@ public void setup() { @Test public void testSendMessage(){ qs.push(queueUrl, "Good message!",1); - logger.info(queueUrl); Message msg = qs.pull(queueUrl); - logger.info(msg.toString()); - logger.info(msg.getBody()); assertNotNull(msg); assertEquals("Good message!", msg.getBody()); From 1d4d59b0fa5595755b9634a416546493e09560a9 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 14:49:19 +0530 Subject: [PATCH 05/10] Add Redis Queue Service with Gson package --- pom.xml | 10 ++ .../com/example/InMemoryQueueService.java | 4 +- src/main/java/com/example/Message.java | 5 + .../java/com/example/PriorityMessage.java | 16 ++- .../java/com/example/RedisQueueService.java | 106 ++++++++++++++++++ 5 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 src/main/java/com/example/RedisQueueService.java diff --git a/pom.xml b/pom.xml index c77cd08..16c976c 100755 --- a/pom.xml +++ b/pom.xml @@ -45,5 +45,15 @@ jcabi-log 0.24.1 + + redis.clients + jedis + 3.7.0 + + + com.google.code.gson + gson + 2.8.9 + diff --git a/src/main/java/com/example/InMemoryQueueService.java b/src/main/java/com/example/InMemoryQueueService.java index 493fb1d..974f02a 100755 --- a/src/main/java/com/example/InMemoryQueueService.java +++ b/src/main/java/com/example/InMemoryQueueService.java @@ -52,7 +52,7 @@ public Message pull(String queueUrl) { if (priorityMessage == null) { return null; } else { - Message msg = priorityMessage.getMesssage(); + Message msg = priorityMessage.getMessage(); msg.setReceiptId(UUID.randomUUID().toString()); msg.incrementAttempts(); msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout)); @@ -69,7 +69,7 @@ public void delete(String queueUrl, String receiptId) { if (queue != null) { long nowTime = now(); for (PriorityMessage priorityMessage : queue) { - Message message = priorityMessage.getMesssage(); + Message message = priorityMessage.getMessage(); if (!message.isVisibleAt(nowTime) && message.getReceiptId().equals(receiptId)) { queue.remove(priorityMessage); break; diff --git a/src/main/java/com/example/Message.java b/src/main/java/com/example/Message.java index 8232f55..a4fa158 100644 --- a/src/main/java/com/example/Message.java +++ b/src/main/java/com/example/Message.java @@ -12,6 +12,11 @@ public class Message { private String msgBody; + + Message() { + + } + Message(String msgBody) { this.msgBody = msgBody; } diff --git a/src/main/java/com/example/PriorityMessage.java b/src/main/java/com/example/PriorityMessage.java index 26fcbfb..76d2d02 100644 --- a/src/main/java/com/example/PriorityMessage.java +++ b/src/main/java/com/example/PriorityMessage.java @@ -1,21 +1,27 @@ package com.example; -public class PriorityMessage implements Comparable{ +import java.io.Serializable; + +public class PriorityMessage implements Comparable,Serializable{ private Integer rank; private Long time; private Message message; + + public PriorityMessage() { + + } - public PriorityMessage(Integer rank, Long time, Message message) { + public PriorityMessage(Integer rank, Long time, Message msg) { this.rank = rank; this.time = time; - this.message = message; + this.message = msg; } public Integer getRank() { return rank; } - public void setFirst(Integer rank) { + public void setRank(Integer rank) { this.rank = rank; } @@ -28,7 +34,7 @@ public void setTime(Long time) { } - public Message getMesssage() { + public Message getMessage() { return message; } diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java new file mode 100644 index 0000000..9ff2703 --- /dev/null +++ b/src/main/java/com/example/RedisQueueService.java @@ -0,0 +1,106 @@ +package com.example; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Tuple; + +public class RedisQueueService implements PriorityQueueService { + + private final Jedis jedis; + private Integer visibilityTimeout; + private final Logger logger = LogManager.getLogger(RedisQueueService.class); + private final Gson gson = new GsonBuilder().serializeNulls().create(); + + public RedisQueueService() { + this.jedis = new Jedis("apn1-pet-wombat-34614.upstash.io", 34614, true); + this.jedis.auth("a60ea24024a240a09842e11688682b6b"); + + String propFileName = "config.properties"; + Properties confInfo = new Properties(); + try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) { + confInfo.load(inStream); + } catch (IOException e) { + e.printStackTrace(); + } + this.visibilityTimeout = Integer.parseInt(confInfo.getProperty("visibilityTimeout", "30")); + } + + @Override + public void push(String queueUrl, String msgBody, int rank) { + Long now = now(); + PriorityMessage priorityMessage = new PriorityMessage(rank, now, new Message(msgBody)); + try { + String serializedMessage = gson.toJson(priorityMessage); + logger.debug(serializedMessage); + this.jedis.zadd(queueUrl, score(priorityMessage), serializedMessage); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + @Override + public Message pull(String queueUrl) { + Long nowTime = now(); + try { + Set tuples = this.jedis.zrangeWithScores(queueUrl, 0, 0); + logger.debug(tuples.toString()); + for (Tuple tuple : tuples) { + String deserializedMessage = tuple.getElement(); + logger.debug(deserializedMessage); + PriorityMessage priorityMessage = gson.fromJson(deserializedMessage, PriorityMessage.class); + if (priorityMessage != null && priorityMessage.getMessage() != null) { + Message msg = priorityMessage.getMessage(); + msg.setReceiptId(UUID.randomUUID().toString()); + msg.incrementAttempts(); + msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout)); + + return new Message(msg.getBody(), msg.getReceiptId()); + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + @Override + public void delete(String queueUrl, String receiptId) { + + } + + long now() { + return System.currentTimeMillis(); + } + + private double score(PriorityMessage priorityMessage) { + double messageScore = (double) priorityMessage.getRank() + (double) priorityMessage.getTime() / 1e12; + System.out.printf("Score :%f\n", messageScore); + return messageScore; + } + + public static void main(String[] args) { + RedisQueueService redisQueueService = new RedisQueueService(); + // redisQueueService.push("abc.com", "Hello", 1); + // redisQueueService.push("abc.com", "Hi", 2); + // redisQueueService.push("abc.com", "Hiya", 3); + + Message message = redisQueueService.pull("abc.com"); + if (message != null) { + System.out.println(message.getBody() + message.getReceiptId() + message.getAttempts()); + } + + } +} From 844e127d1fabde73767070cd2259f63770c8e251 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 14:58:26 +0530 Subject: [PATCH 06/10] Move Redis DB details to config file --- src/main/java/com/example/RedisQueueService.java | 10 +++++++--- src/main/resources/config.properties | 8 +++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java index 9ff2703..781fd3f 100644 --- a/src/main/java/com/example/RedisQueueService.java +++ b/src/main/java/com/example/RedisQueueService.java @@ -24,9 +24,6 @@ public class RedisQueueService implements PriorityQueueService { private final Gson gson = new GsonBuilder().serializeNulls().create(); public RedisQueueService() { - this.jedis = new Jedis("apn1-pet-wombat-34614.upstash.io", 34614, true); - this.jedis.auth("a60ea24024a240a09842e11688682b6b"); - String propFileName = "config.properties"; Properties confInfo = new Properties(); try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) { @@ -35,6 +32,13 @@ public RedisQueueService() { e.printStackTrace(); } this.visibilityTimeout = Integer.parseInt(confInfo.getProperty("visibilityTimeout", "30")); + + final String host = confInfo.getProperty("host", "apn1-pet-wombat-34614.upstash.io"); + final int port = Integer.parseInt(confInfo.getProperty("port", "34614")); + final boolean isSSL = Boolean.parseBoolean(confInfo.getProperty("ssl", "true")); + + this.jedis = new Jedis(host,port,isSSL); + this.jedis.auth(confInfo.getProperty("password", "None")); } @Override diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties index 9de175e..8982662 100644 --- a/src/main/resources/config.properties +++ b/src/main/resources/config.properties @@ -8,4 +8,10 @@ queueDirectory = nigel-qs fieldDelimiter = : # Visibility Timeout (in seconds) -visibilityTimeout = 30 \ No newline at end of file +visibilityTimeout = 30 + +# Redis DB +host = apn1-pet-wombat-34614.upstash.io +port = 34614 +ssl = true +password = a60ea24024a240a09842e11688682b6b \ No newline at end of file From f05903e3b779bb8d795128d519189d5e1291db9c Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 17:24:11 +0530 Subject: [PATCH 07/10] Add unit tests --- .../com/example/RedisQueueServiceTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 src/test/java/com/example/RedisQueueServiceTest.java diff --git a/src/test/java/com/example/RedisQueueServiceTest.java b/src/test/java/com/example/RedisQueueServiceTest.java new file mode 100644 index 0000000..264afc8 --- /dev/null +++ b/src/test/java/com/example/RedisQueueServiceTest.java @@ -0,0 +1,36 @@ +package com.example; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Test; + +public class RedisQueueServiceTest { + + private RedisQueueService redisQueueService = new RedisQueueService(); + public static Logger logger = LogManager.getLogger(); + + @Test + public void testDelete() { + + } + + @Test + public void testPull() { + redisQueueService.push("test2.com", "Pull testing",2); + Message message = redisQueueService.pull("test2.com"); + assertEquals("Pull testing", message.getBody()); + assertTrue(message.getReceiptId() != null && message.getReceiptId().length() > 0); + } + + @Test + public void testPush() { + redisQueueService.push("test.com", "Push testing",1); + Message message = redisQueueService.pull("test.com"); + + assertEquals("Hello testing", message.getBody()); + assertTrue(message.getReceiptId() != null && message.getReceiptId().length() > 0); + } +} From 29ae8e7cb8e052aaddd4052bb1c7ebbbf6dc2157 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 17:24:33 +0530 Subject: [PATCH 08/10] wip: delete method of redis queue service --- src/main/java/com/example/RedisQueueService.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java index 781fd3f..e763968 100644 --- a/src/main/java/com/example/RedisQueueService.java +++ b/src/main/java/com/example/RedisQueueService.java @@ -82,7 +82,14 @@ public Message pull(String queueUrl) { @Override public void delete(String queueUrl, String receiptId) { - + try { + Set members = this.jedis.zrange(queueUrl, 0, -1); + for (String member : members) { + logger.info(member); + } + } catch (Exception e) { + e.printStackTrace(); + } } long now() { @@ -105,6 +112,8 @@ public static void main(String[] args) { if (message != null) { System.out.println(message.getBody() + message.getReceiptId() + message.getAttempts()); } + + redisQueueService.delete("abc.com", "123"); } } From 769f48c71915d9b82efd8311c81a8fd160f89e18 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 18:42:03 +0530 Subject: [PATCH 09/10] Fix push test --- src/test/java/com/example/RedisQueueServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/example/RedisQueueServiceTest.java b/src/test/java/com/example/RedisQueueServiceTest.java index 264afc8..6b054a4 100644 --- a/src/test/java/com/example/RedisQueueServiceTest.java +++ b/src/test/java/com/example/RedisQueueServiceTest.java @@ -30,7 +30,7 @@ public void testPush() { redisQueueService.push("test.com", "Push testing",1); Message message = redisQueueService.pull("test.com"); - assertEquals("Hello testing", message.getBody()); + assertEquals("Push testing", message.getBody()); assertTrue(message.getReceiptId() != null && message.getReceiptId().length() > 0); } } From e064b4001b0b9312cad653eb5af14377cfa57332 Mon Sep 17 00:00:00 2001 From: amankr1279 Date: Tue, 5 Mar 2024 18:42:16 +0530 Subject: [PATCH 10/10] Add delete functionality --- src/main/java/com/example/RedisQueueService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java index e763968..442f955 100644 --- a/src/main/java/com/example/RedisQueueService.java +++ b/src/main/java/com/example/RedisQueueService.java @@ -86,6 +86,11 @@ public void delete(String queueUrl, String receiptId) { Set members = this.jedis.zrange(queueUrl, 0, -1); for (String member : members) { logger.info(member); + PriorityMessage priorityMessage = gson.fromJson(member, PriorityMessage.class); + Message message = priorityMessage.getMessage(); + if (message.getReceiptId() != null && message.getReceiptId().equals(receiptId)) { + this.jedis.zrem(queueUrl, member); + } } } catch (Exception e) { e.printStackTrace();