diff --git a/.gitignore b/.gitignore
index c27e6ec..d5e2459 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,5 @@
nigel*
target/
.DS_Store
+.vscode
+*.log
diff --git a/pom.xml b/pom.xml
index 85d4420..16c976c 100755
--- a/pom.xml
+++ b/pom.xml
@@ -30,5 +30,30 @@
aws-java-sdk
1.10.65
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.13.0
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.13.0
+
+
+ com.jcabi
+ jcabi-log
+ 0.24.1
+
+
+ redis.clients
+ jedis
+ 3.7.0
+
+
+ com.google.code.gson
+ gson
+ 2.8.9
+
diff --git a/src/main/java/com/example/InMemoryQueueService.java b/src/main/java/com/example/InMemoryQueueService.java
index 76605cc..974f02a 100755
--- a/src/main/java/com/example/InMemoryQueueService.java
+++ b/src/main/java/com/example/InMemoryQueueService.java
@@ -4,11 +4,11 @@
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-public class InMemoryQueueService implements QueueService {
- private final Map> queues;
+public class InMemoryQueueService implements PriorityQueueService {
+ private final Map> queues;
private long visibilityTimeout;
@@ -27,46 +27,51 @@ public class InMemoryQueueService implements QueueService {
}
@Override
- public void push(String queueUrl, String msgBody) {
- Queue queue = queues.get(queueUrl);
+ public void push(String queueUrl, String msgBody, int rank) {
+ // priority --> numerically lowest rank, lowest time, message
+ PriorityBlockingQueue queue = queues.get(queueUrl);
if (queue == null) {
- queue = new ConcurrentLinkedQueue<>();
+ queue = new PriorityBlockingQueue();
queues.put(queueUrl, queue);
}
- queue.add(new Message(msgBody));
+ Long now = now();
+ PriorityMessage priorityMessage = new PriorityMessage(rank,now, new Message(msgBody));
+ queue.add(priorityMessage);
}
@Override
public Message pull(String queueUrl) {
- Queue queue = queues.get(queueUrl);
+ PriorityBlockingQueue queue = queues.get(queueUrl);
if (queue == null) {
return null;
}
-
long nowTime = now();
- Optional msgOpt = queue.stream().filter(m -> m.isVisibleAt(nowTime)).findFirst();
- if (msgOpt.isEmpty()) {
- return null;
- } else {
- Message msg = msgOpt.get();
- msg.setReceiptId(UUID.randomUUID().toString());
- msg.incrementAttempts();
- msg.setVisibleFrom(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(visibilityTimeout));
- return new Message(msg.getBody(), msg.getReceiptId());
+ while (!queue.isEmpty()) {
+ PriorityMessage priorityMessage = queue.poll();
+ if (priorityMessage == null) {
+ return null;
+ } else {
+ Message msg = priorityMessage.getMessage();
+ msg.setReceiptId(UUID.randomUUID().toString());
+ msg.incrementAttempts();
+ msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout));
+
+ return new Message(msg.getBody(), msg.getReceiptId());
+ }
}
+ return null;
}
@Override
public void delete(String queueUrl, String receiptId) {
- Queue queue = queues.get(queueUrl);
+ PriorityBlockingQueue queue = queues.get(queueUrl);
if (queue != null) {
long nowTime = now();
-
- for (Iterator it = queue.iterator(); it.hasNext(); ) {
- Message msg = it.next();
- if (!msg.isVisibleAt(nowTime) && msg.getReceiptId().equals(receiptId)) {
- it.remove();
+ for (PriorityMessage priorityMessage : queue) {
+ Message message = priorityMessage.getMessage();
+ if (!message.isVisibleAt(nowTime) && message.getReceiptId().equals(receiptId)) {
+ queue.remove(priorityMessage);
break;
}
}
diff --git a/src/main/java/com/example/Main.java b/src/main/java/com/example/Main.java
new file mode 100644
index 0000000..9ca4025
--- /dev/null
+++ b/src/main/java/com/example/Main.java
@@ -0,0 +1,16 @@
+package com.example;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+
+public class Main {
+ public static Logger logger = LogManager.getLogger(Main.class);
+
+ public static void main(String[] args) {
+ logger.trace("We've just greeted the user!");
+ logger.info("Hello");
+ logger.debug("Hello");
+ System.out.println("Hello");
+ }
+}
diff --git a/src/main/java/com/example/Message.java b/src/main/java/com/example/Message.java
index 8232f55..a4fa158 100644
--- a/src/main/java/com/example/Message.java
+++ b/src/main/java/com/example/Message.java
@@ -12,6 +12,11 @@ public class Message {
private String msgBody;
+
+ Message() {
+
+ }
+
Message(String msgBody) {
this.msgBody = msgBody;
}
diff --git a/src/main/java/com/example/PriorityMessage.java b/src/main/java/com/example/PriorityMessage.java
new file mode 100644
index 0000000..76d2d02
--- /dev/null
+++ b/src/main/java/com/example/PriorityMessage.java
@@ -0,0 +1,54 @@
+package com.example;
+
+import java.io.Serializable;
+
+public class PriorityMessage implements Comparable,Serializable{
+ private Integer rank;
+ private Long time;
+ private Message message;
+
+ public PriorityMessage() {
+
+ }
+
+ public PriorityMessage(Integer rank, Long time, Message msg) {
+ this.rank = rank;
+ this.time = time;
+ this.message = msg;
+ }
+
+ public Integer getRank() {
+ return rank;
+ }
+
+ public void setRank(Integer rank) {
+ this.rank = rank;
+ }
+
+ public Long getTime() {
+ return time;
+ }
+
+ public void setTime(Long time) {
+ this.time = time;
+ }
+
+
+ public Message getMessage() {
+ return message;
+ }
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+ @Override
+ public int compareTo(PriorityMessage other) {
+ // Higher rank messages should come after lower rank messages
+ int rankComparison = Integer.compare(this.rank, other.getRank());
+ if (rankComparison == 0) {
+ return Long.compare(this.time, other.time);
+ }
+ return rankComparison;
+ }
+}
diff --git a/src/main/java/com/example/PriorityQueueService.java b/src/main/java/com/example/PriorityQueueService.java
new file mode 100644
index 0000000..1034215
--- /dev/null
+++ b/src/main/java/com/example/PriorityQueueService.java
@@ -0,0 +1,11 @@
+package com.example;
+
+public interface PriorityQueueService {
+
+ public void push(String queueUrl, String msgBody, int rank);
+
+ public Message pull(String queueUrl);
+
+ public void delete(String queueUrl, String receiptId);
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/example/RedisQueueService.java b/src/main/java/com/example/RedisQueueService.java
new file mode 100644
index 0000000..442f955
--- /dev/null
+++ b/src/main/java/com/example/RedisQueueService.java
@@ -0,0 +1,124 @@
+package com.example;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Tuple;
+
+public class RedisQueueService implements PriorityQueueService {
+
+ private final Jedis jedis;
+ private Integer visibilityTimeout;
+ private final Logger logger = LogManager.getLogger(RedisQueueService.class);
+ private final Gson gson = new GsonBuilder().serializeNulls().create();
+
+ public RedisQueueService() {
+ String propFileName = "config.properties";
+ Properties confInfo = new Properties();
+ try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) {
+ confInfo.load(inStream);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ this.visibilityTimeout = Integer.parseInt(confInfo.getProperty("visibilityTimeout", "30"));
+
+ final String host = confInfo.getProperty("host", "apn1-pet-wombat-34614.upstash.io");
+ final int port = Integer.parseInt(confInfo.getProperty("port", "34614"));
+ final boolean isSSL = Boolean.parseBoolean(confInfo.getProperty("ssl", "true"));
+
+ this.jedis = new Jedis(host,port,isSSL);
+ this.jedis.auth(confInfo.getProperty("password", "None"));
+ }
+
+ @Override
+ public void push(String queueUrl, String msgBody, int rank) {
+ Long now = now();
+ PriorityMessage priorityMessage = new PriorityMessage(rank, now, new Message(msgBody));
+ try {
+ String serializedMessage = gson.toJson(priorityMessage);
+ logger.debug(serializedMessage);
+ this.jedis.zadd(queueUrl, score(priorityMessage), serializedMessage);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ @Override
+ public Message pull(String queueUrl) {
+ Long nowTime = now();
+ try {
+ Set tuples = this.jedis.zrangeWithScores(queueUrl, 0, 0);
+ logger.debug(tuples.toString());
+ for (Tuple tuple : tuples) {
+ String deserializedMessage = tuple.getElement();
+ logger.debug(deserializedMessage);
+ PriorityMessage priorityMessage = gson.fromJson(deserializedMessage, PriorityMessage.class);
+ if (priorityMessage != null && priorityMessage.getMessage() != null) {
+ Message msg = priorityMessage.getMessage();
+ msg.setReceiptId(UUID.randomUUID().toString());
+ msg.incrementAttempts();
+ msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout));
+
+ return new Message(msg.getBody(), msg.getReceiptId());
+ }
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public void delete(String queueUrl, String receiptId) {
+ try {
+ Set members = this.jedis.zrange(queueUrl, 0, -1);
+ for (String member : members) {
+ logger.info(member);
+ PriorityMessage priorityMessage = gson.fromJson(member, PriorityMessage.class);
+ Message message = priorityMessage.getMessage();
+ if (message.getReceiptId() != null && message.getReceiptId().equals(receiptId)) {
+ this.jedis.zrem(queueUrl, member);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ long now() {
+ return System.currentTimeMillis();
+ }
+
+ private double score(PriorityMessage priorityMessage) {
+ double messageScore = (double) priorityMessage.getRank() + (double) priorityMessage.getTime() / 1e12;
+ System.out.printf("Score :%f\n", messageScore);
+ return messageScore;
+ }
+
+ public static void main(String[] args) {
+ RedisQueueService redisQueueService = new RedisQueueService();
+ // redisQueueService.push("abc.com", "Hello", 1);
+ // redisQueueService.push("abc.com", "Hi", 2);
+ // redisQueueService.push("abc.com", "Hiya", 3);
+
+ Message message = redisQueueService.pull("abc.com");
+ if (message != null) {
+ System.out.println(message.getBody() + message.getReceiptId() + message.getAttempts());
+ }
+
+ redisQueueService.delete("abc.com", "123");
+
+ }
+}
diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties
index 9de175e..8982662 100644
--- a/src/main/resources/config.properties
+++ b/src/main/resources/config.properties
@@ -8,4 +8,10 @@ queueDirectory = nigel-qs
fieldDelimiter = :
# Visibility Timeout (in seconds)
-visibilityTimeout = 30
\ No newline at end of file
+visibilityTimeout = 30
+
+# Redis DB
+host = apn1-pet-wombat-34614.upstash.io
+port = 34614
+ssl = true
+password = a60ea24024a240a09842e11688682b6b
\ No newline at end of file
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..01ae38c
--- /dev/null
+++ b/src/main/resources/log4j2.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/test/java/com/example/InMemoryQueueTest.java b/src/test/java/com/example/InMemoryQueueTest.java
index 26bf4ae..cb210a6 100755
--- a/src/test/java/com/example/InMemoryQueueTest.java
+++ b/src/test/java/com/example/InMemoryQueueTest.java
@@ -5,12 +5,15 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
public class InMemoryQueueTest {
- private QueueService qs;
+ private PriorityQueueService qs;
private String queueUrl = "https://sqs.ap-1.amazonaws.com/007/MyQueue";
+ public static Logger logger = LogManager.getLogger(InMemoryQueueTest.class);
@Before
public void setup() {
@@ -20,7 +23,7 @@ public void setup() {
@Test
public void testSendMessage(){
- qs.push(queueUrl, "Good message!");
+ qs.push(queueUrl, "Good message!",1);
Message msg = qs.pull(queueUrl);
assertNotNull(msg);
@@ -31,7 +34,7 @@ public void testSendMessage(){
public void testPullMessage(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }";
- qs.push(queueUrl, msgBody);
+ qs.push(queueUrl, msgBody,1);
Message msg = qs.pull(queueUrl);
assertEquals(msgBody, msg.getBody());
@@ -46,7 +49,7 @@ public void testPullEmptyQueue(){
@Test
public void testDoublePull(){
- qs.push(queueUrl, "Message A.");
+ qs.push(queueUrl, "Message A.",1);
qs.pull(queueUrl);
Message msg = qs.pull(queueUrl);
assertNull(msg);
@@ -56,7 +59,7 @@ public void testDoublePull(){
public void testDeleteMessage(){
String msgBody = "{ \"name\":\"John\", \"age\":30, \"car\":null }";
- qs.push(queueUrl, msgBody);
+ qs.push(queueUrl, msgBody,1);
Message msg = qs.pull(queueUrl);
qs.delete(queueUrl, msg.getReceiptId());
@@ -77,9 +80,9 @@ public void testFIFO3Msgs(){
" \"car3\":\"Fiat\"\n" +
" }\n" +
" }"};
- qs.push(queueUrl, msgStrs[0]);
- qs.push(queueUrl, msgStrs[1]);
- qs.push(queueUrl, msgStrs[2]);
+ qs.push(queueUrl, msgStrs[0],1);
+ qs.push(queueUrl, msgStrs[1],2);
+ qs.push(queueUrl, msgStrs[2],3);
Message msg1 = qs.pull(queueUrl);
Message msg2 = qs.pull(queueUrl);
Message msg3 = qs.pull(queueUrl);
@@ -90,14 +93,13 @@ public void testFIFO3Msgs(){
@Test
public void testAckTimeout(){
- InMemoryQueueService queueService = new InMemoryQueueService() {
+ PriorityQueueService queueService = new InMemoryQueueService() {
long now() {
return System.currentTimeMillis() + 1000 * 30 + 1;
}
};
- queueService.push(queueUrl, "Message A.");
- queueService.pull(queueUrl);
+ queueService.push(queueUrl, "Message A.",1);
Message msg = queueService.pull(queueUrl);
assertTrue(msg != null && msg.getBody() == "Message A.");
}
diff --git a/src/test/java/com/example/RedisQueueServiceTest.java b/src/test/java/com/example/RedisQueueServiceTest.java
new file mode 100644
index 0000000..6b054a4
--- /dev/null
+++ b/src/test/java/com/example/RedisQueueServiceTest.java
@@ -0,0 +1,36 @@
+package com.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+
+public class RedisQueueServiceTest {
+
+ private RedisQueueService redisQueueService = new RedisQueueService();
+ public static Logger logger = LogManager.getLogger();
+
+ @Test
+ public void testDelete() {
+
+ }
+
+ @Test
+ public void testPull() {
+ redisQueueService.push("test2.com", "Pull testing",2);
+ Message message = redisQueueService.pull("test2.com");
+ assertEquals("Pull testing", message.getBody());
+ assertTrue(message.getReceiptId() != null && message.getReceiptId().length() > 0);
+ }
+
+ @Test
+ public void testPush() {
+ redisQueueService.push("test.com", "Push testing",1);
+ Message message = redisQueueService.pull("test.com");
+
+ assertEquals("Push testing", message.getBody());
+ assertTrue(message.getReceiptId() != null && message.getReceiptId().length() > 0);
+ }
+}