Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions src/main/java/com/ofek/queue/Message.java
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
package com.ofek.queue;

import java.io.Serializable;
import java.util.UUID;
import java.nio.charset.StandardCharsets;

public class Message implements Serializable {
private final String id;
private final String payload;
private final byte[] payload;

// Constructor for new messages (generates new ID)
public Message(String payload) {
this.id = UUID.randomUUID().toString();
this.payload = payload;
// Constructor for new messages
public Message(byte[] payload) {
this.payload = payload.clone();
}

// Constructor for loading existing messages (preserves original ID)
public Message(String id, String payload) {
this.id = id;
this.payload = payload;
// Constructor for String payload
public Message(String payload) {
this.payload = payload.getBytes(StandardCharsets.UTF_8);
}

public String getId() {
return id;
public byte[] getPayload() {
return payload.clone();
}

public String getPayload() {
return payload;
// Get payload as String
public String getPayloadAsString() {
return new String(payload, StandardCharsets.UTF_8);
}

@Override
public String toString() {
return "Message{" +
"id='" + id + '\'' +
", payload='" + payload + '\'' +
", payload='" + getPayloadAsString() + '\'' +
'}';
}
}
19 changes: 10 additions & 9 deletions src/main/java/com/ofek/queue/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.Base64;

public class MessageQueue {
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -159,7 +160,9 @@ private void saveBatchToFile(List<Message> messages) {
try (BufferedWriter writer = Files.newBufferedWriter(
messagesFilePath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
for (Message message : messages) {
writer.write(message.getId() + "|" + message.getPayload());
// Encode byte array as Base64 for safe text storage
String encodedPayload = Base64.getEncoder().encodeToString(message.getPayload());
writer.write(encodedPayload);
writer.newLine();
}
writer.flush(); // Ensure data is written
Expand All @@ -182,14 +185,12 @@ private void loadMessagesFromFile() {
String line;
int loadedCount = 0;
while ((line = reader.readLine()) != null) {
String[] parts = line.split("\\|", 2);
if (parts.length == 2) {
String id = parts[0];
String payload = parts[1];
Message message = new Message(id, payload);
queue.offer(message);
loadedCount++;
}
String encodedPayload = line;
// Decode Base64 back to byte array
byte[] payload = Base64.getDecoder().decode(encodedPayload);
Message message = new Message(payload);
queue.offer(message);
loadedCount++;
}
Copy link

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an unmatched closing brace. The if statement on line 189 that checks parts.length == 2 was removed, but its closing brace remains, creating a syntax error.

Copilot uses AI. Check for mistakes.
if (enableConsoleLogging) {
System.out.println("Loaded " + loadedCount + " messages from file");
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/ofek/queue/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ public void produce(String payload) {
messageQueue.enqueue(message);
}

public void produce(byte[] payload) {
Message message = new Message(payload);
messageQueue.enqueue(message);
}

}
18 changes: 9 additions & 9 deletions src/test/java/com/ofek/queue/ConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void testConsumeSingleMessage() {

Message message = consumer.poll();
assertNotNull(message);
assertEquals("Test message", message.getPayload());
assertEquals("Test message", message.getPayloadAsString());
assertEquals(0, messageQueue.size());
}

Expand All @@ -59,7 +59,7 @@ void testConsumeMultipleMessages() {
for (int i = 0; i < messageCount; i++) {
Message message = consumer.poll();
assertNotNull(message);
assertEquals("Message " + i, message.getPayload());
assertEquals("Message " + i, message.getPayloadAsString());
}

assertEquals(0, messageQueue.size());
Expand Down Expand Up @@ -102,15 +102,15 @@ void testConsumeAsProduced() {

Message msg1 = consumer.poll();
assertNotNull(msg1);
assertEquals("Message 1", msg1.getPayload());
assertEquals("Message 1", msg1.getPayloadAsString());
assertEquals(0, messageQueue.size());

producer.produce("Message 2");
assertEquals(1, messageQueue.size());

Message msg2 = consumer.poll();
assertNotNull(msg2);
assertEquals("Message 2", msg2.getPayload());
assertEquals("Message 2", msg2.getPayloadAsString());
assertEquals(0, messageQueue.size());
}

Expand All @@ -126,7 +126,7 @@ void testConsumeLargeMessage() {

Message message = consumer.poll();
assertNotNull(message);
assertEquals(largePayload.toString(), message.getPayload());
assertEquals(largePayload.toString(), message.getPayloadAsString());
assertEquals(0, messageQueue.size());
}

Expand All @@ -137,7 +137,7 @@ void testConsumeEmptyPayload() {

Message message = consumer.poll();
assertNotNull(message);
assertEquals("", message.getPayload());
assertEquals("", message.getPayloadAsString());
assertEquals(0, messageQueue.size());
}

Expand All @@ -158,7 +158,7 @@ void testPartialConsumption() {
for (int i = 0; i < consumeCount; i++) {
Message message = consumer.poll();
assertNotNull(message);
assertEquals("Message " + i, message.getPayload());
assertEquals("Message " + i, message.getPayloadAsString());
}

assertEquals(totalMessages - consumeCount, messageQueue.size());
Expand All @@ -167,7 +167,7 @@ void testPartialConsumption() {
for (int i = consumeCount; i < totalMessages; i++) {
Message message = consumer.poll();
assertNotNull(message);
assertEquals("Message " + i, message.getPayload());
assertEquals("Message " + i, message.getPayloadAsString());
}

assertEquals(0, messageQueue.size());
Expand All @@ -181,6 +181,6 @@ void testSpecialCharactersInConsumption() {

Message message = consumer.poll();
assertNotNull(message);
assertEquals(specialPayload, message.getPayload());
assertEquals(specialPayload, message.getPayloadAsString());
}
}
18 changes: 9 additions & 9 deletions src/test/java/com/ofek/queue/IntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void testCompleteWorkflow() throws InterruptedException, IOException {
for (int i = 0; i < consumeCount; i++) {
Message message = batchConsumer.poll();
assertNotNull(message);
assertEquals("Workflow message " + i, message.getPayload());
assertEquals("Workflow message " + i, message.getPayloadAsString());
}

assertEquals(messageCount - consumeCount, batchQueue.size());
Expand All @@ -78,7 +78,7 @@ void testCompleteWorkflow() throws InterruptedException, IOException {
for (int i = 0; i < messageCount; i++) {
Message message = recoveredConsumer.poll();
assertNotNull(message);
assertEquals("Workflow message " + i, message.getPayload());
assertEquals("Workflow message " + i, message.getPayloadAsString());
}

recoveredQueue.shutdown();
Expand Down Expand Up @@ -176,7 +176,7 @@ void testSystemRestartSimulation() throws InterruptedException, IOException {
for (int i = 0; i < consumedInPhase1; i++) {
Message message = initialConsumer.poll();
assertNotNull(message);
assertEquals("Restart test message " + i, message.getPayload());
assertEquals("Restart test message " + i, message.getPayloadAsString());
}

// Simulate system shutdown
Expand All @@ -195,7 +195,7 @@ void testSystemRestartSimulation() throws InterruptedException, IOException {
for (int i = 0; i < messageCount; i++) {
Message message = restartedConsumer.poll();
assertNotNull(message);
assertEquals("Restart test message " + i, message.getPayload());
assertEquals("Restart test message " + i, message.getPayloadAsString());
}

assertEquals(0, restartedQueue.size());
Expand Down Expand Up @@ -261,19 +261,19 @@ void testMixedMessageSizes() throws InterruptedException, IOException {

// Consume and verify
Message msg1 = mixedConsumer.poll();
assertEquals("Short", msg1.getPayload());
assertEquals("Short", msg1.getPayloadAsString());

Message msg2 = mixedConsumer.poll();
assertEquals("Medium length message with some content", msg2.getPayload());
assertEquals("Medium length message with some content", msg2.getPayloadAsString());

Message msg3 = mixedConsumer.poll();
assertEquals(longMessage.toString(), msg3.getPayload());
assertEquals(longMessage.toString(), msg3.getPayloadAsString());

Message msg4 = mixedConsumer.poll();
assertEquals("", msg4.getPayload());
assertEquals("", msg4.getPayloadAsString());

Message msg5 = mixedConsumer.poll();
assertEquals("Final message", msg5.getPayload());
assertEquals("Final message", msg5.getPayloadAsString());

assertEquals(0, mixedQueue.size());

Expand Down
54 changes: 16 additions & 38 deletions src/test/java/com/ofek/queue/MessageQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -63,9 +64,9 @@ void testBasicEnqueueDequeue() {
assertNotNull(second);
assertNotNull(third);

assertEquals("First message", first.getPayload());
assertEquals("Second message", second.getPayload());
assertEquals("Third message", third.getPayload());
assertEquals("First message", first.getPayloadAsString());
assertEquals("Second message", second.getPayloadAsString());
assertEquals("Third message", third.getPayloadAsString());

assertEquals(0, messageQueue.size());
}
Expand Down Expand Up @@ -100,7 +101,7 @@ void testRandomNumberOfMessages() throws InterruptedException {
Message message = consumer.poll();
if (message != null) {
dequeuedCount++;
assertTrue(message.getPayload().startsWith("Random message"));
assertTrue(message.getPayloadAsString().startsWith("Random message"));
}
}

Expand Down Expand Up @@ -131,9 +132,8 @@ void testBatchSizePersistence() throws InterruptedException, IOException {

// Verify file format
for (int i = 0; i < batchSize; i++) {
String line = lines.get(i);
assertTrue(line.contains("Batch message " + i));
assertTrue(line.contains("|")); // ID|payload format
String decodedMessage = new String(Base64.getDecoder().decode(lines.get(i)));
Copy link

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Base64 decoding should specify the character encoding explicitly. Use new String(Base64.getDecoder().decode(lines.get(i)), StandardCharsets.UTF_8) to ensure consistent encoding behavior across different platforms.

Copilot uses AI. Check for mistakes.
assertTrue(decodedMessage.contains("Batch message " + i));
}

batchQueue.shutdown();
Expand Down Expand Up @@ -166,7 +166,8 @@ void testLessThanBatchSize() throws InterruptedException, IOException {

for (int i = 0; i < messageCount; i++) {
String line = lines.get(i);
assertTrue(line.contains("Small batch message " + i));
String decodedMessage = new String(Base64.getDecoder().decode(line));
Copy link

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Base64 decoding should specify the character encoding explicitly. Use new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8) to ensure consistent encoding behavior across different platforms.

Suggested change
String decodedMessage = new String(Base64.getDecoder().decode(line));
String decodedMessage = new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8);

Copilot uses AI. Check for mistakes.
assertTrue(decodedMessage.contains("Small batch message " + i));
}
}

Expand Down Expand Up @@ -195,7 +196,8 @@ void testGreaterThanBatchSize() throws InterruptedException, IOException {

for (int i = 0; i < messageCount; i++) {
String line = lines.get(i);
assertTrue(line.contains("Large batch message " + i));
String decodedMessage = new String(Base64.getDecoder().decode(line));
Copy link

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Base64 decoding should specify the character encoding explicitly. Use new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8) to ensure consistent encoding behavior across different platforms.

Suggested change
String decodedMessage = new String(Base64.getDecoder().decode(line));
String decodedMessage = new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8);

Copilot uses AI. Check for mistakes.
assertTrue(decodedMessage.contains("Large batch message " + i));
}
}

Expand Down Expand Up @@ -249,7 +251,7 @@ void testMessageRecovery() throws InterruptedException, IOException {
for (int i = 0; i < 5; i++) {
Message message = recoveredConsumer.poll();
assertNotNull(message);
assertEquals("Recovery message " + i, message.getPayload());
assertEquals("Recovery message " + i, message.getPayloadAsString());
}

recoveredQueue.shutdown();
Expand Down Expand Up @@ -327,35 +329,10 @@ void testLargeMessages() throws InterruptedException {

Message message = consumer.poll();
assertNotNull(message);
assertEquals(largePayload.toString(), message.getPayload());
assertEquals(largePayload.toString(), message.getPayloadAsString());
assertEquals(0, messageQueue.size());
}

@Test
@DisplayName("Should maintain message uniqueness with UUID")
void testMessageIdUniqueness() {
producer.produce("Same payload");
producer.produce("Same payload");
producer.produce("Same payload");

Message first = consumer.poll();
Message second = consumer.poll();
Message third = consumer.poll();

assertNotNull(first);
assertNotNull(second);
assertNotNull(third);

// All should have same payload but different IDs
assertEquals("Same payload", first.getPayload());
assertEquals("Same payload", second.getPayload());
assertEquals("Same payload", third.getPayload());

assertNotEquals(first.getId(), second.getId());
assertNotEquals(second.getId(), third.getId());
assertNotEquals(first.getId(), third.getId());
}

@Test
@DisplayName("Should handle queue operations during shutdown")
void testOperationsDuringShutdown() throws InterruptedException {
Expand All @@ -369,14 +346,15 @@ void testOperationsDuringShutdown() throws InterruptedException {
// Operations should still work until the queue is fully shut down
Message message = consumer.poll();
assertNotNull(message);
assertEquals("Before shutdown", message.getPayload());
assertEquals("Before shutdown", message.getPayloadAsString());

// check if message saved to file
assertTrue(Files.exists(testFile));
try {
List<String> lines = Files.readAllLines(testFile);
assertEquals(1, lines.size());
assertTrue(lines.get(0).contains("Before shutdown"));
String decodedMessage = new String(Base64.getDecoder().decode(lines.get(0)));
Copy link

Copilot AI Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Base64 decoding should specify the character encoding explicitly. Use new String(Base64.getDecoder().decode(lines.get(0)), StandardCharsets.UTF_8) to ensure consistent encoding behavior across different platforms.

Suggested change
String decodedMessage = new String(Base64.getDecoder().decode(lines.get(0)));
String decodedMessage = new String(Base64.getDecoder().decode(lines.get(0)), StandardCharsets.UTF_8);

Copilot uses AI. Check for mistakes.
assertTrue(decodedMessage.contains("Before shutdown"));
} catch (IOException e) {
fail("Failed to read from file after shutdown: " + e.getMessage());
}
Expand Down
Loading