forked from shiuu/queue-service
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRedisQueueService.java
More file actions
143 lines (114 loc) · 5.02 KB
/
RedisQueueService.java
File metadata and controls
143 lines (114 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.example;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
/**
 * {@link QueueService} implementation that stores messages in a Redis sorted set reached through
 * an HTTP endpoint.
 *
 * <p>Every operation is translated into a single HTTP request whose path is
 * {@code <redisEndpointUrl>/<command>/<queueUrl>/...}; the queue URL is used as the sorted-set key
 * and the message priority as the score. The endpoint URL and the bearer token are read from the
 * {@code redisEndpointUrl} and {@code redisApiToken} entries of {@code config.properties} on the
 * classpath.
 *
 * <p>NOTE(review): {@code queueUrl} is placed into the request path as-is (no URL encoding), so
 * queue names containing {@code /} or other reserved characters are presumably not supported -
 * confirm with callers before changing.
 */
public class RedisQueueService implements QueueService {
    private static final String CONFIG_FILE_NAME = "config.properties";
    private static final String REDIS_ZADD_COMMAND = "zadd";
    private static final String REDIS_ZPOPMAX_COMMAND = "zpopmax";
    private static final String REDIS_REV_RANGE_COMMAND = "zrevrange";

    /** Shared mapper: building an ObjectMapper is costly and a configured instance can be reused. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String redisEndpointUrl;
    private final String redisApiToken;

    /**
     * Loads the endpoint URL and API token from {@code config.properties} on the classpath.
     *
     * @throws IllegalStateException if the properties file is missing or cannot be read
     */
    RedisQueueService() {
        Properties confInfo = new Properties();
        try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(CONFIG_FILE_NAME)) {
            // getResourceAsStream returns null (it does not throw) when the resource is absent.
            if (inStream == null) {
                throw new IllegalStateException(CONFIG_FILE_NAME + " was not found on the classpath");
            }
            confInfo.load(inStream);
        } catch (IOException e) {
            // Fail fast: continuing would leave the endpoint and token null and break every call.
            throw new IllegalStateException("Unable to read " + CONFIG_FILE_NAME, e);
        }
        this.redisEndpointUrl = confInfo.getProperty("redisEndpointUrl");
        this.redisApiToken = confInfo.getProperty("redisApiToken");
    }

    /**
     * Adds a message to the queue via {@code zadd}, scored by the integer {@code "priority"} field
     * of the message body (0 when the body has no such field or is not JSON).
     *
     * @param queueUrl sorted-set key identifying the queue
     * @param msgBody  message payload, sent as the POST body
     * @throws RuntimeException if the HTTP call fails or returns a non-200 status
     */
    @Override
    public void push(String queueUrl, String msgBody) {
        int priority = extractPriorityFromJson(msgBody);
        makePostRequest(commandUrl(REDIS_ZADD_COMMAND, queueUrl) + "/" + priority, msgBody);
    }

    /**
     * Reads the first element of the reverse-ordered range ({@code zrevrange <queueUrl> 0 0}).
     * This request only reads; removal is done by {@link #delete(String, String)}.
     *
     * @param queueUrl sorted-set key identifying the queue
     * @return the message, with a freshly generated random receipt id, or {@code null} when the
     *         response carries no result element
     * @throws RuntimeException if the HTTP call fails or the response is not valid JSON
     */
    @Override
    public Message pull(String queueUrl) {
        String redisResponse = makeGetRequest(commandUrl(REDIS_REV_RANGE_COMMAND, queueUrl) + "/0/0");
        return createMessageFromRedisResponse(redisResponse);
    }

    /**
     * Removes the element returned by {@code zpopmax} from the queue.
     *
     * <p>For now {@code receiptId} is ignored: whatever {@code zpopmax} pops at call time is
     * removed, which is not necessarily the message the receipt id was issued for. A way to
     * delete by receipt id is still to be found.
     *
     * @param queueUrl  sorted-set key identifying the queue
     * @param receiptId currently unused
     * @throws RuntimeException if the HTTP call fails or returns a non-200 status
     */
    @Override
    public void delete(String queueUrl, String receiptId) {
        makeGetRequest(commandUrl(REDIS_ZPOPMAX_COMMAND, queueUrl));
    }

    /** Builds {@code <endpoint>/<command>/<queueUrl>}. */
    private String commandUrl(String command, String queueUrl) {
        return redisEndpointUrl + "/" + command + "/" + queueUrl;
    }

    /**
     * Returns the integer {@code "priority"} field of a JSON message body.
     *
     * @param msgBody message body; may be {@code null} or non-JSON
     * @return the priority, or 0 when the body is not JSON or has no integer {@code "priority"}
     */
    private int extractPriorityFromJson(String msgBody) {
        JsonNode rootNode = parseJsonOrNull(msgBody);
        if (rootNode == null) {
            return 0;
        }
        JsonNode priorityNode = rootNode.get("priority");
        if (priorityNode != null && priorityNode.isInt()) {
            return priorityNode.intValue();
        }
        return 0; // Default priority when the field is absent or not an integer
    }

    /**
     * Builds a {@link Message} from the JSON response of the range request.
     *
     * <p>The response is expected to hold a {@code "result"} array whose first element is the
     * stored message text. When that text is a JSON object, its {@code "content"} and
     * {@code "priority"} fields are used (defaulting to {@code ""} and 0). When it is anything
     * else, the raw text becomes the message body with priority 0, so that bodies accepted by
     * {@link #push(String, String)} can always be pulled again.
     *
     * @param response raw response body
     * @return the message, or {@code null} when there is no result element
     * @throws RuntimeException if {@code response} itself is not valid JSON
     */
    private Message createMessageFromRedisResponse(String response) {
        JsonNode rootNode;
        try {
            rootNode = MAPPER.readTree(response);
        } catch (IOException e) {
            throw new RuntimeException("Failed to parse JSON response", e);
        }

        JsonNode resultNode = rootNode == null ? null : rootNode.get("result");
        if (resultNode == null || !resultNode.isArray() || resultNode.size() == 0) {
            // Queue is empty or no valid message found
            return null;
        }

        String member = resultNode.get(0).asText();
        String receiptId = UUID.randomUUID().toString();

        JsonNode messageNode = parseJsonOrNull(member);
        if (messageNode == null || !messageNode.isObject()) {
            // Stored text is not a JSON object: hand it back unchanged with the default priority.
            return new Message(member, receiptId, 0);
        }

        int priority = messageNode.has("priority") ? messageNode.get("priority").asInt() : 0;
        String messageBody = messageNode.has("content") ? messageNode.get("content").asText() : "";
        return new Message(messageBody, receiptId, priority);
    }

    /** Parses {@code text} as JSON, returning {@code null} when it is {@code null} or not JSON. */
    private static JsonNode parseJsonOrNull(String text) {
        if (text == null) {
            return null;
        }
        try {
            return MAPPER.readTree(text);
        } catch (IOException ignored) {
            // Plain-text content is a legitimate input here, not an error worth reporting.
            return null;
        }
    }

    /**
     * Sends an authenticated GET request and returns the response body.
     *
     * @param apiUrl full request URL
     * @return response body decoded as text
     * @throws RuntimeException if the call fails or the status is not 200
     */
    private String makeGetRequest(String apiUrl) {
        HttpGet httpGet = new HttpGet(apiUrl);
        httpGet.addHeader("Content-Type", "application/json");
        httpGet.addHeader("Authorization", String.format("Bearer %s", redisApiToken));
        // try-with-resources releases the connection and the client even when the call fails.
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpGet)) {
            return readSuccessfulBody(response);
        } catch (IOException e) {
            throw new RuntimeException("Unable to call Redis api", e);
        }
    }

    /**
     * Sends an authenticated POST request with {@code msgBody} as payload and returns the
     * response body.
     *
     * @param apiUrl  full request URL
     * @param msgBody payload; must not be {@code null}
     * @return response body decoded as text
     * @throws IllegalArgumentException if {@code msgBody} is {@code null}
     * @throws RuntimeException         if the call fails or the status is not 200
     */
    private String makePostRequest(String apiUrl, String msgBody) {
        if (msgBody == null) {
            throw new IllegalArgumentException("msgBody must not be null");
        }
        HttpPost httpPost = new HttpPost(apiUrl);
        // Explicit UTF-8: without a charset the entity is encoded as ISO-8859-1, which cannot
        // represent most non-Latin characters.
        httpPost.setEntity(new StringEntity(msgBody, StandardCharsets.UTF_8));
        httpPost.addHeader("Content-Type", "application/json");
        httpPost.addHeader("Authorization", String.format("Bearer %s", redisApiToken));
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(httpPost)) {
            return readSuccessfulBody(response);
        } catch (IOException e) {
            throw new RuntimeException("Unable to call Redis api", e);
        }
    }

    /**
     * Returns the body of a 200 response.
     *
     * @throws RuntimeException if the status code is not 200
     * @throws IOException      if the body cannot be read
     */
    private static String readSuccessfulBody(CloseableHttpResponse response) throws IOException {
        int status = response.getStatusLine().getStatusCode();
        if (status != 200) {
            throw new RuntimeException(String.format("Redis Api failed with status %s", status));
        }
        // UTF-8 is only the fallback; a charset declared by the response still takes precedence.
        return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
    }
}