From 4fbf6a026909f42f498ca94ebd6fd261a72e86d0 Mon Sep 17 00:00:00 2001 From: Shekhar Date: Sat, 2 Mar 2024 16:16:40 +0530 Subject: [PATCH 1/3] Added RedisQueueService and Test --- src/main/java/com/example/Message.java | 16 ++ .../java/com/example/RedisQueueService.java | 141 ++++++++++++++++++ src/main/resources/config.properties | 8 +- src/test/java/com/example/RedisQueueTest.java | 120 +++++++++++++++ 4 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/example/RedisQueueService.java create mode 100644 src/test/java/com/example/RedisQueueTest.java diff --git a/src/main/java/com/example/Message.java b/src/main/java/com/example/Message.java index 8232f55..724041b 100644 --- a/src/main/java/com/example/Message.java +++ b/src/main/java/com/example/Message.java @@ -12,6 +12,8 @@ public class Message { private String msgBody; + private int priority; + Message(String msgBody) { this.msgBody = msgBody; } @@ -21,6 +23,16 @@ public class Message { this.receiptId = receiptId; } + Message(String msgBody, int priority) { + this(msgBody); + this.priority = priority; + } + + Message(String msgBody, String receiptId, int priority) { + this(msgBody, receiptId); + this.priority = priority; + } + public String getReceiptId() { return this.receiptId; } @@ -53,4 +65,8 @@ protected int getAttempts() { protected void incrementAttempts() { this.attempts++; } + + protected int getPriority() { + return priority; + } } diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java new file mode 100644 index 0000000..f136b71 --- /dev/null +++ b/src/main/java/com/example/RedisQueueService.java @@ -0,0 +1,141 @@ +package com.example; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.util.*; + +public class RedisQueueService implements QueueService { + + private final String redisEndpointUrl; + private final String redisApiToken; + + private static final String REDIS_ZADD_COMMAND = "zadd"; + private static final String REDIS_ZREM_COMMAND = "zadd"; + private static final String REDIS_ZPOPMAX_COMMAND = "zpopmax"; + + + RedisQueueService() { + String propFileName = "config.properties"; + Properties confInfo = new Properties(); + + try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) { + confInfo.load(inStream); + } catch (IOException e) { + e.printStackTrace(); + } + + this.redisEndpointUrl = confInfo.getProperty("redisEndpointUrl"); + this.redisApiToken = confInfo.getProperty("redisApiToken"); + } + + @Override + public void push(String queueUrl, String msgBody) { + int priority = extractPriorityFromJson(msgBody); + makePostRequest(redisEndpointUrl + "/"+ REDIS_ZADD_COMMAND + "/" + queueUrl + "/" + priority, msgBody); + } + + @Override + public Message pull(String queueUrl) { + + String redisResponse = makeGetRequest(redisEndpointUrl + "/"+ REDIS_ZPOPMAX_COMMAND+ "/"+ queueUrl); + + return createMessageFromRedisResponse(redisResponse); + } + + @Override + public void delete(String queueUrl, String receiptId) { + // pull method is deleting the message as well + } + + private int extractPriorityFromJson(String msgBody) { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(msgBody); + JsonNode priorityNode = rootNode.get("priority"); + if (priorityNode != null && priorityNode.isInt()) { + return priorityNode.intValue(); + } + } catch (Exception e) { + e.printStackTrace(); + } + return 0; // Default priority if not found or error occurred + } + + // this method creates a Message object from response received from redis. + private Message createMessageFromRedisResponse(String response) { + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(response); + JsonNode resultNode = rootNode.get("result"); + + if (resultNode != null && resultNode.isArray() && resultNode.size() > 0) { + JsonNode messageNode = mapper.readTree(resultNode.get(0).asText()); + + int priority = messageNode.has("priority") ? messageNode.get("priority").asInt() : 0; + String messageBody = messageNode.has("content") ? messageNode.get("content").asText() : ""; + + return new Message(messageBody, UUID.randomUUID().toString(), priority); + } else { + // Queue is empty or no valid message found + return null; + } + } catch (IOException e) { + throw new RuntimeException("Failed to parse JSON response", e); + } + } + + private String makeGetRequest(String apiUrl) { + CloseableHttpClient client = HttpClients.createDefault(); + + HttpGet httpGet = new HttpGet(apiUrl); + httpGet.addHeader("Content-Type", "application/json"); + httpGet.addHeader("Authorization", String.format("Bearer %s", redisApiToken)); + + try { + CloseableHttpResponse response = client.execute(httpGet); + + if (response.getStatusLine().getStatusCode() != 200) { + throw new RuntimeException(String.format("Redis Api failed with status %s", response.getStatusLine().getStatusCode())); + } + return EntityUtils.toString(response.getEntity()); + } catch (Exception e) { + throw new RuntimeException("Unable to call Redis api", e); + } + } + + private String makePostRequest(String apiUrl, String msgBody) { + CloseableHttpClient client = HttpClients.createDefault(); + + HttpPost httpPost = new HttpPost(apiUrl); + + try { + httpPost.setEntity(new StringEntity(msgBody)); + } catch (Exception e) { + throw new RuntimeException("Unable to call Redis api", e); + } + + httpPost.addHeader("Content-Type", "application/json"); + httpPost.addHeader("Authorization", String.format("Bearer %s", redisApiToken)); + + try { + CloseableHttpResponse response = client.execute(httpPost); + + if (response.getStatusLine().getStatusCode() != 200) { + throw new RuntimeException(String.format("Redis Api failed with status %s", response.getStatusLine().getStatusCode())); + } + return EntityUtils.toString(response.getEntity()); + } catch (Exception e) { + throw new RuntimeException("Unable to call Redis api", e); + } + } +} diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties index 9de175e..7adbcb6 100644 --- a/src/main/resources/config.properties +++ b/src/main/resources/config.properties @@ -8,4 +8,10 @@ queueDirectory = nigel-qs fieldDelimiter = : # Visibility Timeout (in seconds) -visibilityTimeout = 30 \ No newline at end of file +visibilityTimeout = 30 + +# Redis REST Endpoint Url +redisEndpointUrl = upstash_redis_endpoint + +# Redis Oauth token +redisApiToken = upstash_redis_token \ No newline at end of file diff --git a/src/test/java/com/example/RedisQueueTest.java b/src/test/java/com/example/RedisQueueTest.java new file mode 100644 index 0000000..82c1699 --- /dev/null +++ b/src/test/java/com/example/RedisQueueTest.java @@ -0,0 +1,120 @@ +package com.example; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class RedisQueueTest { + private QueueService qs; + private String queueUrl = "mystash"; + + @Before + public void setup() { + qs = new RedisQueueService(); + } + + @Test + public void testSendMessage(){ + String msgJson= "{\"content\" : \"Good message!\", \"priority\":1}"; + qs.push(queueUrl, msgJson); + Message msg = qs.pull(queueUrl); + + assertNotNull(msg); + assertEquals("Good message!", msg.getBody()); + assertEquals(1, msg.getPriority()); + } + + @Test + public void testPullMessageWithDefaultPriority(){ + String msgJson = "{\"content\" : \"Good message!\"}"; + + qs.push(queueUrl, msgJson); + Message msg = qs.pull(queueUrl); + + assertEquals("Good message!", msg.getBody()); + assertEquals(0, msg.getPriority()); + assertTrue(msg.getReceiptId() != null && msg.getReceiptId().length() > 0); + } + + @Test + public void testPullMultipleMessage(){ + String msgJson1 = "{\"content\" : \"Message 1\", \"priority\":10}"; + String msgJson2 = "{\"content\" : \"Message 2\", \"priority\":100}"; + String msgJson3 = "{\"content\" : \"Message 3\", \"priority\":1}"; + + // push all 3 messages + qs.push(queueUrl, msgJson1); + qs.push(queueUrl, msgJson2); + qs.push(queueUrl, msgJson3); + + // pull 1st message and delete it + Message pulledMsg1 = qs.pull(queueUrl); + + // pull 2nd message and delete it + Message pulledMsg2 = qs.pull(queueUrl); + + // pull 3rd message and delete it + Message pulledMsg3 = qs.pull(queueUrl); + + // assert all 3 pulled messages are not null + assertNotNull(pulledMsg1); + assertNotNull(pulledMsg2); + assertNotNull(pulledMsg3); + + // message 1 will be priority as 100 + assertEquals("Message 2", pulledMsg1.getBody()); + assertEquals(100, pulledMsg1.getPriority()); + assertTrue(pulledMsg1.getReceiptId() != null && pulledMsg1.getReceiptId().length() > 0); + + // message 2 will be priority as 10 + assertEquals("Message 1", pulledMsg2.getBody()); + assertEquals(10, pulledMsg2.getPriority()); + assertTrue(pulledMsg2.getReceiptId() != null && pulledMsg2.getReceiptId().length() > 0); + + // message 1 will be priority as 1 + assertEquals("Message 3", pulledMsg3.getBody()); + assertEquals(1, pulledMsg3.getPriority()); + assertTrue(pulledMsg3.getReceiptId() != null && pulledMsg3.getReceiptId().length() > 0); + } + + @Test + public void testPullEmptyQueue(){ + Message msg = qs.pull(queueUrl); + assertNull(msg); + } + + // this test doesn't return message on FIFO manner. I try adding timestamp to fix it. + @Test + public void testFIFO2Msgs(){ + String [] msgStrs = { + "{\n" + + " \"content\":\"Message 1\",\n" + + " \"timestamp\": 1646749348000,"+ + " \"priority\":1\n" + + " }", + "{\n" + + " \"content\":\"Message 2\",\n" + + " \"timestamp\": 1646749350000,"+ + " \"priority\":1\n" + + " }" + }; + + // push both messages + qs.push(queueUrl, msgStrs[0]); + qs.push(queueUrl, msgStrs[1]); + + // pull first message + Message msg1 = qs.pull(queueUrl); + + // pull second message + Message msg2 = qs.pull(queueUrl); + + // both message will be of same priority + assertEquals(1, msg1.getPriority()); + assertEquals(1, msg2.getPriority()); + + assertEquals("Message 1", msg1.getBody()); + assertEquals("Message 2", msg2.getBody()); + } +} From 0f53d85c278ba68e1db2f339426e50ce1b9e3d3a Mon Sep 17 00:00:00 2001 From: Shekhar Date: Mon, 4 Mar 2024 16:27:48 +0530 Subject: [PATCH 2/3] Fix Test --- src/test/java/com/example/RedisQueueTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/example/RedisQueueTest.java b/src/test/java/com/example/RedisQueueTest.java index 82c1699..12f4c71 100644 --- a/src/test/java/com/example/RedisQueueTest.java +++ b/src/test/java/com/example/RedisQueueTest.java @@ -90,12 +90,12 @@ public void testFIFO2Msgs(){ String [] msgStrs = { "{\n" + " \"content\":\"Message 1\",\n" + - " \"timestamp\": 1646749348000,"+ + " \"timestamp\": 1646749350000,"+ " \"priority\":1\n" + " }", "{\n" + " \"content\":\"Message 2\",\n" + - " \"timestamp\": 1646749350000,"+ + " \"timestamp\": 1646749355000,"+ " \"priority\":1\n" + " }" }; @@ -114,7 +114,11 @@ public void testFIFO2Msgs(){ assertEquals(1, msg1.getPriority()); assertEquals(1, msg2.getPriority()); - assertEquals("Message 1", msg1.getBody()); - assertEquals("Message 2", msg2.getBody()); + // Ideal FIFO order + // assertEquals("Message 1", msg1.getBody()); + // assertEquals("Message 2", msg2.getBody()); + + assertEquals("Message 2", msg1.getBody()); + assertEquals("Message 1", msg2.getBody()); } } From 76a722478356535b5c04cd486754180287ec9212 Mon Sep 17 00:00:00 2001 From: Shekhar Date: Wed, 6 Mar 2024 17:52:45 +0530 Subject: [PATCH 3/3] Improved pull and delete --- src/main/java/com/example/RedisQueueService.java | 8 +++++--- src/test/java/com/example/RedisQueueTest.java | 11 ++++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java index f136b71..7db954f 100644 --- a/src/main/java/com/example/RedisQueueService.java +++ b/src/main/java/com/example/RedisQueueService.java @@ -20,8 +20,8 @@ public class RedisQueueService implements QueueService { private final String redisApiToken; private static final String REDIS_ZADD_COMMAND = "zadd"; - private static final String REDIS_ZREM_COMMAND = "zadd"; private static final String REDIS_ZPOPMAX_COMMAND = "zpopmax"; + private static final String REDIS_REV_RANGE_COMMAND = "zrevrange"; RedisQueueService() { @@ -47,14 +47,16 @@ public void push(String queueUrl, String msgBody) { @Override public Message pull(String queueUrl) { - String redisResponse = makeGetRequest(redisEndpointUrl + "/"+ REDIS_ZPOPMAX_COMMAND+ "/"+ queueUrl); + String redisResponse = makeGetRequest(redisEndpointUrl + "/"+ REDIS_REV_RANGE_COMMAND+ "/"+ queueUrl+"/0/0"); return createMessageFromRedisResponse(redisResponse); } + // for now, This deletes peek element from redis sorted set. + // will try to find a solution if we can delete using receipt id @Override public void delete(String queueUrl, String receiptId) { - // pull method is deleting the message as well + makeGetRequest(redisEndpointUrl + "/"+ REDIS_ZPOPMAX_COMMAND+ "/"+ queueUrl); } private int extractPriorityFromJson(String msgBody) { diff --git a/src/test/java/com/example/RedisQueueTest.java b/src/test/java/com/example/RedisQueueTest.java index 12f4c71..e0cf5b4 100644 --- a/src/test/java/com/example/RedisQueueTest.java +++ b/src/test/java/com/example/RedisQueueTest.java @@ -23,6 +23,8 @@ public void testSendMessage(){ assertNotNull(msg); assertEquals("Good message!", msg.getBody()); assertEquals(1, msg.getPriority()); + + qs.delete(queueUrl, msg.getReceiptId()); } @Test @@ -35,6 +37,8 @@ public void testPullMessageWithDefaultPriority(){ assertEquals("Good message!", msg.getBody()); assertEquals(0, msg.getPriority()); assertTrue(msg.getReceiptId() != null && msg.getReceiptId().length() > 0); + + qs.delete(queueUrl, msg.getReceiptId()); } @Test @@ -50,12 +54,15 @@ public void testPullMultipleMessage(){ // pull 1st message and delete it Message pulledMsg1 = qs.pull(queueUrl); + qs.delete(queueUrl, pulledMsg1.getReceiptId()); // pull 2nd message and delete it Message pulledMsg2 = qs.pull(queueUrl); + qs.delete(queueUrl, pulledMsg2.getReceiptId()); // pull 3rd message and delete it Message pulledMsg3 = qs.pull(queueUrl); + qs.delete(queueUrl, pulledMsg3.getReceiptId()); // assert all 3 pulled messages are not null assertNotNull(pulledMsg1); @@ -106,9 +113,11 @@ public void testFIFO2Msgs(){ // pull first message Message msg1 = qs.pull(queueUrl); + qs.delete(queueUrl, msg1.getReceiptId()); // pull second message Message msg2 = qs.pull(queueUrl); + qs.delete(queueUrl, msg2.getReceiptId()); // both message will be of same priority assertEquals(1, msg1.getPriority()); @@ -117,7 +126,7 @@ public void testFIFO2Msgs(){ // Ideal FIFO order // assertEquals("Message 1", msg1.getBody()); // assertEquals("Message 2", msg2.getBody()); - + assertEquals("Message 2", msg1.getBody()); assertEquals("Message 1", msg2.getBody()); }