Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
nigel*
target/
.DS_Store
.vscode
*.log
25 changes: 25 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,30 @@
<artifactId>aws-java-sdk</artifactId>
<version>1.10.65</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>com.jcabi</groupId>
<artifactId>jcabi-log</artifactId>
<version>0.24.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
</dependencies>
</project>
53 changes: 29 additions & 24 deletions src/main/java/com/example/InMemoryQueueService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InMemoryQueueService implements QueueService {
private final Map<String, Queue<Message>> queues;
public class InMemoryQueueService implements PriorityQueueService {
private final Map<String, PriorityBlockingQueue<PriorityMessage>> queues;

private long visibilityTimeout;

Expand All @@ -27,46 +27,51 @@ public class InMemoryQueueService implements QueueService {
}

@Override
public void push(String queueUrl, String msgBody) {
Queue<Message> queue = queues.get(queueUrl);
public void push(String queueUrl, String msgBody, int rank) {
// priority --> numerically lowest rank, lowest time, message
PriorityBlockingQueue<PriorityMessage> queue = queues.get(queueUrl);
if (queue == null) {
queue = new ConcurrentLinkedQueue<>();
queue = new PriorityBlockingQueue<PriorityMessage>();
queues.put(queueUrl, queue);
}
queue.add(new Message(msgBody));
Long now = now();
PriorityMessage priorityMessage = new PriorityMessage(rank,now, new Message(msgBody));
queue.add(priorityMessage);
}

@Override
public Message pull(String queueUrl) {
Queue<Message> queue = queues.get(queueUrl);
PriorityBlockingQueue<PriorityMessage> queue = queues.get(queueUrl);
if (queue == null) {
return null;
}

long nowTime = now();
Optional<Message> msgOpt = queue.stream().filter(m -> m.isVisibleAt(nowTime)).findFirst();
if (msgOpt.isEmpty()) {
return null;
} else {
Message msg = msgOpt.get();
msg.setReceiptId(UUID.randomUUID().toString());
msg.incrementAttempts();
msg.setVisibleFrom(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(visibilityTimeout));

return new Message(msg.getBody(), msg.getReceiptId());
while (!queue.isEmpty()) {
PriorityMessage priorityMessage = queue.poll();
if (priorityMessage == null) {
return null;
} else {
Message msg = priorityMessage.getMessage();
msg.setReceiptId(UUID.randomUUID().toString());
msg.incrementAttempts();
msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout));

return new Message(msg.getBody(), msg.getReceiptId());
}
}
return null;
}

@Override
public void delete(String queueUrl, String receiptId) {
Queue<Message> queue = queues.get(queueUrl);
PriorityBlockingQueue<PriorityMessage> queue = queues.get(queueUrl);
if (queue != null) {
long nowTime = now();

for (Iterator<Message> it = queue.iterator(); it.hasNext(); ) {
Message msg = it.next();
if (!msg.isVisibleAt(nowTime) && msg.getReceiptId().equals(receiptId)) {
it.remove();
for (PriorityMessage priorityMessage : queue) {
Message message = priorityMessage.getMessage();
if (!message.isVisibleAt(nowTime) && message.getReceiptId().equals(receiptId)) {
queue.remove(priorityMessage);
break;
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/example/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.example;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class Main {
public static Logger logger = LogManager.getLogger(Main.class);

public static void main(String[] args) {
logger.trace("We've just greeted the user!");
logger.info("Hello");
logger.debug("Hello");
System.out.println("Hello");
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/example/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ public class Message {

private String msgBody;


Message() {

}

Message(String msgBody) {
this.msgBody = msgBody;
}
Expand Down
54 changes: 54 additions & 0 deletions src/main/java/com/example/PriorityMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.example;

import java.io.Serializable;

public class PriorityMessage implements Comparable<PriorityMessage>,Serializable{
private Integer rank;
private Long time;
private Message message;

public PriorityMessage() {

}

public PriorityMessage(Integer rank, Long time, Message msg) {
this.rank = rank;
this.time = time;
this.message = msg;
}

public Integer getRank() {
return rank;
}

public void setRank(Integer rank) {
this.rank = rank;
}

public Long getTime() {
return time;
}

public void setTime(Long time) {
this.time = time;
}


public Message getMessage() {
return message;
}

public void setMessage(Message message) {
this.message = message;
}

@Override
public int compareTo(PriorityMessage other) {
// Higher rank messages should come after lower rank messages
int rankComparison = Integer.compare(this.rank, other.getRank());
if (rankComparison == 0) {
return Long.compare(this.time, other.time);
}
return rankComparison;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/example/PriorityQueueService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.example;

public interface PriorityQueueService {

public void push(String queueUrl, String msgBody, int rank);

public Message pull(String queueUrl);

public void delete(String queueUrl, String receiptId);

}
124 changes: 124 additions & 0 deletions src/main/java/com/example/RedisQueueService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.example;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

public class RedisQueueService implements PriorityQueueService {

private final Jedis jedis;
private Integer visibilityTimeout;
private final Logger logger = LogManager.getLogger(RedisQueueService.class);
private final Gson gson = new GsonBuilder().serializeNulls().create();

public RedisQueueService() {
String propFileName = "config.properties";
Properties confInfo = new Properties();
try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(propFileName)) {
confInfo.load(inStream);
} catch (IOException e) {
e.printStackTrace();
}
this.visibilityTimeout = Integer.parseInt(confInfo.getProperty("visibilityTimeout", "30"));

final String host = confInfo.getProperty("host", "apn1-pet-wombat-34614.upstash.io");
final int port = Integer.parseInt(confInfo.getProperty("port", "34614"));
final boolean isSSL = Boolean.parseBoolean(confInfo.getProperty("ssl", "true"));

this.jedis = new Jedis(host,port,isSSL);
this.jedis.auth(confInfo.getProperty("password", "None"));
}

@Override
public void push(String queueUrl, String msgBody, int rank) {
Long now = now();
PriorityMessage priorityMessage = new PriorityMessage(rank, now, new Message(msgBody));
try {
String serializedMessage = gson.toJson(priorityMessage);
logger.debug(serializedMessage);
this.jedis.zadd(queueUrl, score(priorityMessage), serializedMessage);
} catch (Exception ex) {
ex.printStackTrace();
}
}

@Override
public Message pull(String queueUrl) {
Long nowTime = now();
try {
Set<Tuple> tuples = this.jedis.zrangeWithScores(queueUrl, 0, 0);
logger.debug(tuples.toString());
for (Tuple tuple : tuples) {
String deserializedMessage = tuple.getElement();
logger.debug(deserializedMessage);
PriorityMessage priorityMessage = gson.fromJson(deserializedMessage, PriorityMessage.class);
if (priorityMessage != null && priorityMessage.getMessage() != null) {
Message msg = priorityMessage.getMessage();
msg.setReceiptId(UUID.randomUUID().toString());
msg.incrementAttempts();
msg.setVisibleFrom(nowTime + TimeUnit.SECONDS.toMillis(visibilityTimeout));

return new Message(msg.getBody(), msg.getReceiptId());
}
}

} catch (Exception e) {
e.printStackTrace();
}
return null;
}

@Override
public void delete(String queueUrl, String receiptId) {
try {
Set<String> members = this.jedis.zrange(queueUrl, 0, -1);
for (String member : members) {
logger.info(member);
PriorityMessage priorityMessage = gson.fromJson(member, PriorityMessage.class);
Message message = priorityMessage.getMessage();
if (message.getReceiptId() != null && message.getReceiptId().equals(receiptId)) {
this.jedis.zrem(queueUrl, member);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

long now() {
return System.currentTimeMillis();
}

private double score(PriorityMessage priorityMessage) {
double messageScore = (double) priorityMessage.getRank() + (double) priorityMessage.getTime() / 1e12;
System.out.printf("Score :%f\n", messageScore);
return messageScore;
}

public static void main(String[] args) {
RedisQueueService redisQueueService = new RedisQueueService();
// redisQueueService.push("abc.com", "Hello", 1);
// redisQueueService.push("abc.com", "Hi", 2);
// redisQueueService.push("abc.com", "Hiya", 3);

Message message = redisQueueService.pull("abc.com");
if (message != null) {
System.out.println(message.getBody() + message.getReceiptId() + message.getAttempts());
}

redisQueueService.delete("abc.com", "123");

}
}
8 changes: 7 additions & 1 deletion src/main/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ queueDirectory = nigel-qs
fieldDelimiter = :

# Visibility Timeout (in seconds)
visibilityTimeout = 30
visibilityTimeout = 30

# Redis DB
host = apn1-pet-wombat-34614.upstash.io
port = 34614
ssl = true
password = a60ea24024a240a09842e11688682b6b
18 changes: 18 additions & 0 deletions src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Appenders>
<Console name="ConsoleAppender" target="SYSTEM_OUT">
<!-- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" /> -->
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %highlight{%level}{FATAL=bg_red, ERROR=red, WARN=yellow, INFO=green, DEBUG=blue} - %msg%n" />
</Console>
<File name="FileAppender" fileName="application-${date:yyyyMMdd}.log" immediateFlush="false" append="true">
<PatternLayout pattern="%d{yyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</File>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="ConsoleAppender" />
<AppenderRef ref="FileAppender"/>
</Root>
</Loggers>
</Configuration>
Loading