diff --git a/src/main/java/com/ofek/queue/Message.java b/src/main/java/com/ofek/queue/Message.java index af74fa2..b7116eb 100644 --- a/src/main/java/com/ofek/queue/Message.java +++ b/src/main/java/com/ofek/queue/Message.java @@ -1,37 +1,34 @@ package com.ofek.queue; import java.io.Serializable; -import java.util.UUID; +import java.nio.charset.StandardCharsets; public class Message implements Serializable { - private final String id; - private final String payload; + private final byte[] payload; - // Constructor for new messages (generates new ID) - public Message(String payload) { - this.id = UUID.randomUUID().toString(); - this.payload = payload; + // Constructor for new messages + public Message(byte[] payload) { + this.payload = payload.clone(); } - // Constructor for loading existing messages (preserves original ID) - public Message(String id, String payload) { - this.id = id; - this.payload = payload; + // Constructor for String payload + public Message(String payload) { + this.payload = payload.getBytes(StandardCharsets.UTF_8); } - public String getId() { - return id; + public byte[] getPayload() { + return payload.clone(); } - public String getPayload() { - return payload; + // Get payload as String + public String getPayloadAsString() { + return new String(payload, StandardCharsets.UTF_8); } @Override public String toString() { return "Message{" + - "id='" + id + '\'' + - ", payload='" + payload + '\'' + + ", payload='" + getPayloadAsString() + '\'' + '}'; } } \ No newline at end of file diff --git a/src/main/java/com/ofek/queue/MessageQueue.java b/src/main/java/com/ofek/queue/MessageQueue.java index e617210..475dbb7 100644 --- a/src/main/java/com/ofek/queue/MessageQueue.java +++ b/src/main/java/com/ofek/queue/MessageQueue.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.Base64; public class MessageQueue { private final BlockingQueue queue = new LinkedBlockingQueue<>(); @@ -159,7 +160,9 @@ private void saveBatchToFile(List messages) { try (BufferedWriter writer = Files.newBufferedWriter( messagesFilePath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { for (Message message : messages) { - writer.write(message.getId() + "|" + message.getPayload()); + // Encode byte array as Base64 for safe text storage + String encodedPayload = Base64.getEncoder().encodeToString(message.getPayload()); + writer.write(encodedPayload); writer.newLine(); } writer.flush(); // Ensure data is written @@ -182,14 +185,12 @@ private void loadMessagesFromFile() { String line; int loadedCount = 0; while ((line = reader.readLine()) != null) { - String[] parts = line.split("\\|", 2); - if (parts.length == 2) { - String id = parts[0]; - String payload = parts[1]; - Message message = new Message(id, payload); - queue.offer(message); - loadedCount++; - } + String encodedPayload = line; + // Decode Base64 back to byte array + byte[] payload = Base64.getDecoder().decode(encodedPayload); + Message message = new Message(payload); + queue.offer(message); + loadedCount++; } if (enableConsoleLogging) { System.out.println("Loaded " + loadedCount + " messages from file"); diff --git a/src/main/java/com/ofek/queue/Producer.java b/src/main/java/com/ofek/queue/Producer.java index 8f5f93f..2abec5d 100644 --- a/src/main/java/com/ofek/queue/Producer.java +++ b/src/main/java/com/ofek/queue/Producer.java @@ -12,4 +12,9 @@ public void produce(String payload) { messageQueue.enqueue(message); } + public void produce(byte[] payload) { + Message message = new Message(payload); + messageQueue.enqueue(message); + } + } diff --git a/src/test/java/com/ofek/queue/ConsumerTest.java b/src/test/java/com/ofek/queue/ConsumerTest.java index 725acea..628c02a 100644 --- a/src/test/java/com/ofek/queue/ConsumerTest.java +++ b/src/test/java/com/ofek/queue/ConsumerTest.java @@ -41,7 +41,7 @@ void testConsumeSingleMessage() { Message message = consumer.poll(); assertNotNull(message); - assertEquals("Test message", message.getPayload()); + assertEquals("Test message", message.getPayloadAsString()); assertEquals(0, messageQueue.size()); } @@ -59,7 +59,7 @@ void testConsumeMultipleMessages() { for (int i = 0; i < messageCount; i++) { Message message = consumer.poll(); assertNotNull(message); - assertEquals("Message " + i, message.getPayload()); + assertEquals("Message " + i, message.getPayloadAsString()); } assertEquals(0, messageQueue.size()); @@ -102,7 +102,7 @@ void testConsumeAsProduced() { Message msg1 = consumer.poll(); assertNotNull(msg1); - assertEquals("Message 1", msg1.getPayload()); + assertEquals("Message 1", msg1.getPayloadAsString()); assertEquals(0, messageQueue.size()); producer.produce("Message 2"); @@ -110,7 +110,7 @@ void testConsumeAsProduced() { Message msg2 = consumer.poll(); assertNotNull(msg2); - assertEquals("Message 2", msg2.getPayload()); + assertEquals("Message 2", msg2.getPayloadAsString()); assertEquals(0, messageQueue.size()); } @@ -126,7 +126,7 @@ void testConsumeLargeMessage() { Message message = consumer.poll(); assertNotNull(message); - assertEquals(largePayload.toString(), message.getPayload()); + assertEquals(largePayload.toString(), message.getPayloadAsString()); assertEquals(0, messageQueue.size()); } @@ -137,7 +137,7 @@ void testConsumeEmptyPayload() { Message message = consumer.poll(); assertNotNull(message); - assertEquals("", message.getPayload()); + assertEquals("", message.getPayloadAsString()); assertEquals(0, messageQueue.size()); } @@ -158,7 +158,7 @@ void testPartialConsumption() { for (int i = 0; i < consumeCount; i++) { Message message = consumer.poll(); assertNotNull(message); - assertEquals("Message " + i, message.getPayload()); + assertEquals("Message " + i, message.getPayloadAsString()); } assertEquals(totalMessages - consumeCount, messageQueue.size()); @@ -167,7 +167,7 @@ void testPartialConsumption() { for (int i = consumeCount; i < totalMessages; i++) { Message message = consumer.poll(); assertNotNull(message); - assertEquals("Message " + i, message.getPayload()); + assertEquals("Message " + i, message.getPayloadAsString()); } assertEquals(0, messageQueue.size()); @@ -181,6 +181,6 @@ void testSpecialCharactersInConsumption() { Message message = consumer.poll(); assertNotNull(message); - assertEquals(specialPayload, message.getPayload()); + assertEquals(specialPayload, message.getPayloadAsString()); } } diff --git a/src/test/java/com/ofek/queue/IntegrationTest.java b/src/test/java/com/ofek/queue/IntegrationTest.java index 8a019a3..6c09049 100644 --- a/src/test/java/com/ofek/queue/IntegrationTest.java +++ b/src/test/java/com/ofek/queue/IntegrationTest.java @@ -52,7 +52,7 @@ void testCompleteWorkflow() throws InterruptedException, IOException { for (int i = 0; i < consumeCount; i++) { Message message = batchConsumer.poll(); assertNotNull(message); - assertEquals("Workflow message " + i, message.getPayload()); + assertEquals("Workflow message " + i, message.getPayloadAsString()); } assertEquals(messageCount - consumeCount, batchQueue.size()); @@ -78,7 +78,7 @@ void testCompleteWorkflow() throws InterruptedException, IOException { for (int i = 0; i < messageCount; i++) { Message message = recoveredConsumer.poll(); assertNotNull(message); - assertEquals("Workflow message " + i, message.getPayload()); + assertEquals("Workflow message " + i, message.getPayloadAsString()); } recoveredQueue.shutdown(); @@ -176,7 +176,7 @@ void testSystemRestartSimulation() throws InterruptedException, IOException { for (int i = 0; i < consumedInPhase1; i++) { Message message = initialConsumer.poll(); assertNotNull(message); - assertEquals("Restart test message " + i, message.getPayload()); + assertEquals("Restart test message " + i, message.getPayloadAsString()); } // Simulate system shutdown @@ -195,7 +195,7 @@ void testSystemRestartSimulation() throws InterruptedException, IOException { for (int i = 0; i < messageCount; i++) { Message message = restartedConsumer.poll(); assertNotNull(message); - assertEquals("Restart test message " + i, message.getPayload()); + assertEquals("Restart test message " + i, message.getPayloadAsString()); } assertEquals(0, restartedQueue.size()); @@ -261,19 +261,19 @@ void testMixedMessageSizes() throws InterruptedException, IOException { // Consume and verify Message msg1 = mixedConsumer.poll(); - assertEquals("Short", msg1.getPayload()); + assertEquals("Short", msg1.getPayloadAsString()); Message msg2 = mixedConsumer.poll(); - assertEquals("Medium length message with some content", msg2.getPayload()); + assertEquals("Medium length message with some content", msg2.getPayloadAsString()); Message msg3 = mixedConsumer.poll(); - assertEquals(longMessage.toString(), msg3.getPayload()); + assertEquals(longMessage.toString(), msg3.getPayloadAsString()); Message msg4 = mixedConsumer.poll(); - assertEquals("", msg4.getPayload()); + assertEquals("", msg4.getPayloadAsString()); Message msg5 = mixedConsumer.poll(); - assertEquals("Final message", msg5.getPayload()); + assertEquals("Final message", msg5.getPayloadAsString()); assertEquals(0, mixedQueue.size()); diff --git a/src/test/java/com/ofek/queue/MessageQueueTest.java b/src/test/java/com/ofek/queue/MessageQueueTest.java index 543e759..febd0dc 100644 --- a/src/test/java/com/ofek/queue/MessageQueueTest.java +++ b/src/test/java/com/ofek/queue/MessageQueueTest.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Base64; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -63,9 +64,9 @@ void testBasicEnqueueDequeue() { assertNotNull(second); assertNotNull(third); - assertEquals("First message", first.getPayload()); - assertEquals("Second message", second.getPayload()); - assertEquals("Third message", third.getPayload()); + assertEquals("First message", first.getPayloadAsString()); + assertEquals("Second message", second.getPayloadAsString()); + assertEquals("Third message", third.getPayloadAsString()); assertEquals(0, messageQueue.size()); } @@ -100,7 +101,7 @@ void testRandomNumberOfMessages() throws InterruptedException { Message message = consumer.poll(); if (message != null) { dequeuedCount++; - assertTrue(message.getPayload().startsWith("Random message")); + assertTrue(message.getPayloadAsString().startsWith("Random message")); } } @@ -131,9 +132,8 @@ void testBatchSizePersistence() throws InterruptedException, IOException { // Verify file format for (int i = 0; i < batchSize; i++) { - String line = lines.get(i); - assertTrue(line.contains("Batch message " + i)); - assertTrue(line.contains("|")); // ID|payload format + String decodedMessage = new String(Base64.getDecoder().decode(lines.get(i))); + assertTrue(decodedMessage.contains("Batch message " + i)); } batchQueue.shutdown(); @@ -166,7 +166,8 @@ void testLessThanBatchSize() throws InterruptedException, IOException { for (int i = 0; i < messageCount; i++) { String line = lines.get(i); - assertTrue(line.contains("Small batch message " + i)); + String decodedMessage = new String(Base64.getDecoder().decode(line)); + assertTrue(decodedMessage.contains("Small batch message " + i)); } } @@ -195,7 +196,8 @@ void testGreaterThanBatchSize() throws InterruptedException, IOException { for (int i = 0; i < messageCount; i++) { String line = lines.get(i); - assertTrue(line.contains("Large batch message " + i)); + String decodedMessage = new String(Base64.getDecoder().decode(line)); + assertTrue(decodedMessage.contains("Large batch message " + i)); } } @@ -249,7 +251,7 @@ void testMessageRecovery() throws InterruptedException, IOException { for (int i = 0; i < 5; i++) { Message message = recoveredConsumer.poll(); assertNotNull(message); - assertEquals("Recovery message " + i, message.getPayload()); + assertEquals("Recovery message " + i, message.getPayloadAsString()); } recoveredQueue.shutdown(); @@ -327,35 +329,10 @@ void testLargeMessages() throws InterruptedException { Message message = consumer.poll(); assertNotNull(message); - assertEquals(largePayload.toString(), message.getPayload()); + assertEquals(largePayload.toString(), message.getPayloadAsString()); assertEquals(0, messageQueue.size()); } - @Test - @DisplayName("Should maintain message uniqueness with UUID") - void testMessageIdUniqueness() { - producer.produce("Same payload"); - producer.produce("Same payload"); - producer.produce("Same payload"); - - Message first = consumer.poll(); - Message second = consumer.poll(); - Message third = consumer.poll(); - - assertNotNull(first); - assertNotNull(second); - assertNotNull(third); - - // All should have same payload but different IDs - assertEquals("Same payload", first.getPayload()); - assertEquals("Same payload", second.getPayload()); - assertEquals("Same payload", third.getPayload()); - - assertNotEquals(first.getId(), second.getId()); - assertNotEquals(second.getId(), third.getId()); - assertNotEquals(first.getId(), third.getId()); - } - @Test @DisplayName("Should handle queue operations during shutdown") void testOperationsDuringShutdown() throws InterruptedException { @@ -369,14 +346,15 @@ void testOperationsDuringShutdown() throws InterruptedException { // Operations should still work until the queue is fully shut down Message message = consumer.poll(); assertNotNull(message); - assertEquals("Before shutdown", message.getPayload()); + assertEquals("Before shutdown", message.getPayloadAsString()); // check if message saved to file assertTrue(Files.exists(testFile)); try { List lines = Files.readAllLines(testFile); assertEquals(1, lines.size()); - assertTrue(lines.get(0).contains("Before shutdown")); + String decodedMessage = new String(Base64.getDecoder().decode(lines.get(0))); + assertTrue(decodedMessage.contains("Before shutdown")); } catch (IOException e) { fail("Failed to read from file after shutdown: " + e.getMessage()); } diff --git a/src/test/java/com/ofek/queue/MessageTest.java b/src/test/java/com/ofek/queue/MessageTest.java index d783838..47741b3 100644 --- a/src/test/java/com/ofek/queue/MessageTest.java +++ b/src/test/java/com/ofek/queue/MessageTest.java @@ -3,51 +3,18 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.DisplayName; import static org.junit.jupiter.api.Assertions.*; +import java.nio.charset.StandardCharsets; @DisplayName("Message Tests") public class MessageTest { - @Test - @DisplayName("Should create message with auto-generated ID") - void testMessageCreationWithAutoId() { - String payload = "Test payload"; - Message message = new Message(payload); - - assertNotNull(message.getId()); - assertFalse(message.getId().isEmpty()); - assertEquals(payload, message.getPayload()); - } - - @Test - @DisplayName("Should create message with specified ID") - void testMessageCreationWithSpecificId() { - String id = "custom-id-123"; - String payload = "Test payload"; - Message message = new Message(id, payload); - - assertEquals(id, message.getId()); - assertEquals(payload, message.getPayload()); - } - - @Test - @DisplayName("Should generate unique IDs for different messages") - void testUniqueIdGeneration() { - Message message1 = new Message("Same payload"); - Message message2 = new Message("Same payload"); - Message message3 = new Message("Same payload"); - - assertNotEquals(message1.getId(), message2.getId()); - assertNotEquals(message2.getId(), message3.getId()); - assertNotEquals(message1.getId(), message3.getId()); - } - @Test @DisplayName("Should handle empty payload") void testEmptyPayload() { Message message = new Message(""); - assertNotNull(message.getId()); - assertEquals("", message.getPayload()); + assertEquals("", message.getPayloadAsString()); + assertArrayEquals("".getBytes(StandardCharsets.UTF_8), message.getPayload()); } @Test @@ -60,19 +27,17 @@ void testLongPayload() { Message message = new Message(longPayload.toString()); - assertNotNull(message.getId()); - assertEquals(longPayload.toString(), message.getPayload()); + assertEquals(longPayload.toString(), message.getPayloadAsString()); + assertArrayEquals(longPayload.toString().getBytes(StandardCharsets.UTF_8), message.getPayload()); } @Test @DisplayName("Should have proper toString representation") void testToString() { - Message message = new Message("test-id", "test payload"); + Message message = new Message("test payload"); String toString = message.toString(); - assertTrue(toString.contains("test-id")); assertTrue(toString.contains("test payload")); - assertTrue(toString.contains("Message{")); } @Test @@ -81,7 +46,8 @@ void testSpecialCharacters() { String specialPayload = "Special chars: !@#$%^&*()_+[]{}|;:,.<>?`~"; Message message = new Message(specialPayload); - assertEquals(specialPayload, message.getPayload()); + assertEquals(specialPayload, message.getPayloadAsString()); + assertArrayEquals(specialPayload.getBytes(StandardCharsets.UTF_8), message.getPayload()); } @Test @@ -90,15 +56,98 @@ void testNewlinesAndTabs() { String payload = "Line 1\nLine 2\tTabbed content\r\nWindows line ending"; Message message = new Message(payload); - assertEquals(payload, message.getPayload()); + assertEquals(payload, message.getPayloadAsString()); + assertArrayEquals(payload.getBytes(StandardCharsets.UTF_8), message.getPayload()); } @Test - @DisplayName("Should handle pipe character in payload (potential file format conflict)") - void testPipeCharacterInPayload() { - String payload = "This|has|pipe|characters"; - Message message = new Message(payload); + @DisplayName("Should handle byte array constructor") + void testByteArrayConstructor() { + byte[] originalBytes = "Hello World".getBytes(StandardCharsets.UTF_8); + Message message = new Message(originalBytes); + + assertArrayEquals(originalBytes, message.getPayload()); + assertEquals("Hello World", message.getPayloadAsString()); + } + + @Test + @DisplayName("Should handle empty byte array") + void testEmptyByteArray() { + byte[] emptyBytes = new byte[0]; + Message message = new Message(emptyBytes); + + assertArrayEquals(emptyBytes, message.getPayload()); + assertEquals("", message.getPayloadAsString()); + } + + @Test + @DisplayName("Should handle large byte array") + void testLargeByteArray() { + byte[] largeBytes = new byte[50000]; + for (int i = 0; i < largeBytes.length; i++) { + largeBytes[i] = (byte) (i % 256); + } + + Message message = new Message(largeBytes); + + assertArrayEquals(largeBytes, message.getPayload()); + assertEquals(50000, message.getPayload().length); + } + + @Test + @DisplayName("Should handle UTF-8 encoded bytes correctly") + void testUtf8EncodedBytes() { + String unicodeText = "Hello 世界 🌍 مرحبا"; + byte[] utf8Bytes = unicodeText.getBytes(StandardCharsets.UTF_8); + + Message message = new Message(utf8Bytes); + + assertArrayEquals(utf8Bytes, message.getPayload()); + assertEquals(unicodeText, message.getPayloadAsString()); + } + + @Test + @DisplayName("Should handle control characters in byte array") + void testControlCharactersInByteArray() { + byte[] controlChars = { + 0, // NULL + 9, // TAB + 10, // LF (newline) + 13, // CR (carriage return) + 27, // ESC + 127 // DEL + }; + + Message message = new Message(controlChars); + + assertArrayEquals(controlChars, message.getPayload()); + // Should not crash when converting to string + String result = message.getPayloadAsString(); + assertNotNull(result); + } + + @Test + @DisplayName("Should preserve message immutability") + void testMessageImmutability() { + byte[] originalBytes = "test".getBytes(StandardCharsets.UTF_8); + Message message = new Message(originalBytes); + + // Modify the original array + originalBytes[0] = (byte) 'X'; + + // Message should not be affected + assertNotEquals(originalBytes, message.getPayload()); + } + + @Test + @DisplayName("Should handle byte array vs string constructor equivalence") + void testByteArrayStringEquivalence() { + String testString = "Test message with special chars: àáâãäå"; + + Message messageFromString = new Message(testString); + Message messageFromBytes = new Message(testString.getBytes(StandardCharsets.UTF_8)); - assertEquals(payload, message.getPayload()); + assertArrayEquals(messageFromString.getPayload(), messageFromBytes.getPayload()); + assertEquals(messageFromString.getPayloadAsString(), messageFromBytes.getPayloadAsString()); } } diff --git a/src/test/java/com/ofek/queue/ProducerTest.java b/src/test/java/com/ofek/queue/ProducerTest.java index 2bbdc9c..7f8bf1e 100644 --- a/src/test/java/com/ofek/queue/ProducerTest.java +++ b/src/test/java/com/ofek/queue/ProducerTest.java @@ -41,7 +41,7 @@ void testProduceSingleMessage() { Message message = messageQueue.dequeue(); assertNotNull(message); - assertEquals("Test message", message.getPayload()); + assertEquals("Test message", message.getPayloadAsString()); } @Test @@ -58,7 +58,7 @@ void testProduceMultipleMessages() { for (int i = 0; i < messageCount; i++) { Message message = messageQueue.dequeue(); assertNotNull(message); - assertEquals("Message " + i, message.getPayload()); + assertEquals("Message " + i, message.getPayloadAsString()); } } @@ -71,7 +71,7 @@ void testProduceEmptyPayload() { Message message = messageQueue.dequeue(); assertNotNull(message); - assertEquals("", message.getPayload()); + assertEquals("", message.getPayloadAsString()); } @Test @@ -88,27 +88,7 @@ void testProduceLargePayload() { Message message = messageQueue.dequeue(); assertNotNull(message); - assertEquals(largePayload.toString(), message.getPayload()); - } - - @Test - @DisplayName("Should generate unique message IDs") - void testUniqueMessageIds() { - producer.produce("Same payload"); - producer.produce("Same payload"); - producer.produce("Same payload"); - - Message msg1 = messageQueue.dequeue(); - Message msg2 = messageQueue.dequeue(); - Message msg3 = messageQueue.dequeue(); - - assertNotNull(msg1); - assertNotNull(msg2); - assertNotNull(msg3); - - assertNotEquals(msg1.getId(), msg2.getId()); - assertNotEquals(msg2.getId(), msg3.getId()); - assertNotEquals(msg1.getId(), msg3.getId()); + assertEquals(largePayload.toString(), message.getPayloadAsString()); } @Test @@ -121,6 +101,6 @@ void testSpecialCharactersInPayload() { Message message = messageQueue.dequeue(); assertNotNull(message); - assertEquals(specialPayload, message.getPayload()); + assertEquals(specialPayload, message.getPayloadAsString()); } }