diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..86654b6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,45 @@ +name: CI + +on: + push: + branches: + - main + paths-ignore: + - "*.md" + pull_request: + branches: + - main + paths-ignore: + - "*.md" + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + + strategy: + matrix: + java-version: [21] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up JDK ${{ matrix.java-version }} + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java-version }} + distribution: "temurin" + cache: maven + + - name: Cache Maven dependencies + uses: actions/cache@v4 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2 + + - name: Run tests + run: mvn clean test diff --git a/README.md b/README.md index 4ab4ac1..6f9ac72 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,12 @@ A simple Java-based message queue implementation with file storage. ## Features -- **Thread-safe message queue** using `BlockingQueue` +- **Thread-safe message queue** using `BlockingQueue` with read-write locks - **Unique message IDs** - automatically generated UUIDs for each message -- **File-based storage** - messages survive application restarts +- **Async batch persistence** - high-performance file storage with configurable batch sizes +- **File-based storage** - messages survive application restarts with automatic loading - **Producer/Consumer pattern** - separate classes for producing and consuming messages +- **Configurable settings** - customizable batch size and logging options ## Classes @@ -25,11 +27,15 @@ Represents a message with: The main messages queue implementation with: -- `enqueue(Message message)` - adds message to queue and saves to file +- `enqueue(Message message)` - adds message to queue with async persistence - `dequeue()` - removes and returns message from queue - `size()` - returns current queue size +- `flush()` - forces all pending messages to be written to disk +- `shutdown()` - gracefully shuts down the queue, ensuring all messages are persisted +- **Async batch persistence** - messages are written to file in configurable batches for optimal performance +- **Configurable settings** - batch size and console logging can be customized - Automatic file loading on startup -- Saves messages to a specified file +- Thread-safe operations with read-write locks ### [`Producer`](src/main/java/com/ofek/queue/Producer.java) @@ -51,10 +57,15 @@ Demo application showing producer/consumer usage pattern ## Usage +### Basic Usage + ```java -// Create a message queue with file storage +// Create a message queue with default settings (batch size: 100, no console logging) MessageQueue queue = new MessageQueue("messages.log"); +// Or create with custom configuration +MessageQueue queue = new MessageQueue("messages.log", 50, true); // batch size: 50, console logging enabled + // Create producer and consumer Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); @@ -69,8 +80,31 @@ System.out.println("Delivered: " + delivered); // Check queue size int size = queue.size(); + +// Gracefully shutdown (automatically flushes pending messages) +queue.shutdown(); ``` +### Configuration Options + +The MessageQueue constructor accepts the following parameters: + +- **filename** (String): The file path where messages will be persisted +- **batchSize** (int, optional): Number of messages to batch before writing to file (default: 100) +- **enableConsoleLogging** (boolean, optional): Whether to log enqueue operations to console (default: false) + +### Async Batch Persistence + +The MessageQueue implements an efficient async batch persistence mechanism: + +- **Non-blocking enqueue**: Messages are added to the queue immediately without waiting for disk I/O +- **Background persistence**: A dedicated thread handles writing messages to file +- **Batch optimization**: Messages are written in configurable batches to reduce file I/O overhead +- **Automatic flushing**: The system ensures data integrity by flushing batches to disk +- **Graceful shutdown**: Call `shutdown()` to ensure all pending messages are persisted (includes automatic flush) + +This design provides high throughput for message enqueuing while maintaining data durability. + ## Building and Running This is a Maven project. To build and run: @@ -86,6 +120,59 @@ mvn exec:java -Dexec.mainClass="com.ofek.queue.App" mvn test ``` +## Performance Testing + +This project includes comprehensive performance testing tools to measure queue performance under various conditions. + +### Quick Performance Test + +Run the automated performance test script: + +```bash +# Make script executable (first time only) +chmod +x run_performance_test.sh + +# Run performance tests +./run_performance_test.sh +``` + +This will: + +- Test with 1K, 10K, 100K, and 1M messages +- Run both single-threaded and multi-threaded tests +- Generate a detailed performance report (`performance_report.log`) + +### Manual Performance Tests + +#### Basic Performance Test + +```bash +# Compile test classes +mvn test-compile + +# Run basic performance test +mvn exec:java -Dexec.mainClass="com.ofek.queue.PerformanceTest" -Dexec.classpathScope="test" +``` + +### Performance Test Outputs + +The performance tests generate multiple output files: + +**`performance_report.log`** - Human-readable performance report with: + +- System information (Java version, OS, memory) +- Test results for different message counts +- Single-threaded and multi-threaded performance metrics +- Memory usage information + +### Sample Performance Results + +Typical results on a modern system: + +``` +Messages: 100,000 | Enqueue: 2,450.32 ms | Dequeue: 1,234.56 ms | Total: 3,684.88 ms | Throughput: 27,140.23 msg/sec +``` + ## File Format Messages are stored in pipe-delimited format: diff --git a/pom.xml b/pom.xml index 7f9e136..16884f1 100644 --- a/pom.xml +++ b/pom.xml @@ -16,10 +16,39 @@ - junit - junit - 3.8.1 + org.junit.jupiter + junit-jupiter + 5.13.3 + test + + + + org.mockito + mockito-core + 5.18.0 test + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.3 + + all + 8 + + + junit.jupiter.execution.parallel.enabled=true + junit.jupiter.execution.parallel.mode.default=concurrent + junit.jupiter.execution.parallel.mode.classes.default=concurrent + junit.jupiter.execution.parallel.mode.methods.default=concurrent + + + + + + diff --git a/run_performance_test.sh b/run_performance_test.sh new file mode 100755 index 0000000..ccb7adb --- /dev/null +++ b/run_performance_test.sh @@ -0,0 +1,38 @@ +#!/bin/bash + +echo "Message Queue Performance Test" +echo "==============================" +echo + +# Optimized GC, Heap and others Java settings +export MAVEN_OPTS="-Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=2m -XX:+G1UseAdaptiveIHOP -XX:G1MixedGCCountTarget=4 -Xlog:gc*:logs/gc.log" + +# Clean and compile the project +echo "Compiling project..." +mvn clean compile test-compile -q + +if [ $? -ne 0 ]; then + echo "Error: Compilation failed" + exit 1 +fi + +echo "Compilation successful!" +echo + +# Run the performance test +echo "Starting performance test..." +echo + +# Run the test +mvn exec:java \ +-Dexec.mainClass="com.ofek.queue.PerformanceTest" \ +-Dexec.classpathScope="test" + +if [ $? -eq 0 ]; then + echo + echo "Performance test completed successfully!" + echo +else + echo "Error: Performance test failed" + exit 1 +fi diff --git a/src/main/java/com/ofek/queue/App.java b/src/main/java/com/ofek/queue/App.java index 8103200..b92dc2a 100644 --- a/src/main/java/com/ofek/queue/App.java +++ b/src/main/java/com/ofek/queue/App.java @@ -2,23 +2,28 @@ public class App { public static void main(String[] args) { - MessageQueue queue = new MessageQueue("messages.log"); + MessageQueue queue = new MessageQueue("logs/messages.log", 100, false); // Create producer and consumer instances Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); - producer.produce("Hello, World!"); - producer.produce("Second message"); - producer.produce("Third message"); + System.out.println("Starting message production..."); + for (int i = 1; i <= 502; i++) { + producer.produce("Hello, World!" + i); + } + + System.out.println("Queue size: " + queue.size()); // Loop to poll all messages - System.out.println("Polling messages:"); + System.out.println("Polling messages..."); while (queue.size() > 0) { - Message delivered = consumer.poll(); - System.out.println("Delivered: " + delivered); + consumer.poll(); } - System.out.println("Queue size: " + queue.size()); + System.out.println("Queue size after pooling: " + queue.size()); + + queue.shutdown(); + System.out.println("Queue shutdown complete."); } } diff --git a/src/main/java/com/ofek/queue/MessageQueue.java b/src/main/java/com/ofek/queue/MessageQueue.java index 2c9b1ab..e617210 100644 --- a/src/main/java/com/ofek/queue/MessageQueue.java +++ b/src/main/java/com/ofek/queue/MessageQueue.java @@ -8,48 +8,179 @@ import java.nio.file.StandardOpenOption; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; public class MessageQueue { private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private final BlockingQueue persistenceQueue = new LinkedBlockingQueue<>(); private final Path messagesFilePath; + private final ExecutorService persistenceExecutor; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + // Configuration + private final int batchSize; + private final boolean enableConsoleLogging; + + /** + * Creates a MessageQueue with default settings (batch size 100, no console + * logging) + */ public MessageQueue(String filename) { + this(filename, 100, false); + } + + /** + * Creates a MessageQueue with custom configuration + * + * @param filename The file to persist messages to + * @param batchSize Number of messages to batch before writing to + * file + * @param enableConsoleLogging Whether to log enqueue operations to console + */ + public MessageQueue(String filename, int batchSize, boolean enableConsoleLogging) { this.messagesFilePath = Path.of(filename); + this.batchSize = batchSize; + this.enableConsoleLogging = enableConsoleLogging; + this.persistenceExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "MessageQueue-Persistence"); + t.setDaemon(true); + return t; + }); + loadMessagesFromFile(); + startPersistenceWorker(); } + /** + * Adds a message to the queue. This operation is non-blocking and thread-safe. + * The message will be persisted to file asynchronously in batches. + */ public void enqueue(Message message) { - queue.offer(message); - saveMessageToFile(message); - System.out.println("Enqueued: " + message); + lock.readLock().lock(); + try { + queue.offer(message); + persistenceQueue.offer(message); // Async persistence + if (enableConsoleLogging) { + System.out.println("Enqueued: " + message); + } + } finally { + lock.readLock().unlock(); + } } + /** + * Removes and returns a message from the queue, or null if empty. + * This operation is thread-safe and allows concurrent access. + */ public Message dequeue() { - return queue.poll(); + lock.readLock().lock(); + try { + return queue.poll(); + } finally { + lock.readLock().unlock(); + } } + /** + * Returns the current number of messages in the queue. + */ public int size() { - return queue.size(); + lock.readLock().lock(); + try { + return queue.size(); + } finally { + lock.readLock().unlock(); + } } - private void saveMessageToFile(Message message) { + /** + * Shuts down the message queue, ensuring all messages are persisted. + * Call this method before application shutdown. + */ + public void shutdown() { + persistenceExecutor.shutdown(); + try { + // Wait for the persistence worker to finish + if (!persistenceExecutor.awaitTermination(1, TimeUnit.SECONDS)) { + persistenceExecutor.shutdownNow(); + // Wait a bit more for tasks to respond to being cancelled + if (!persistenceExecutor.awaitTermination(2, TimeUnit.SECONDS)) { + System.err.println("Persistence worker did not terminate gracefully"); + } + } + } catch (InterruptedException ie) { + // Re-interrupt the thread if we were interrupted while waiting + persistenceExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void startPersistenceWorker() { + persistenceExecutor.submit(() -> { + List batch = new ArrayList<>(batchSize); + try { + while (!Thread.currentThread().isInterrupted()) { + Message message = persistenceQueue.take(); // Blocking to wait for messages + batch.add(message); + persistenceQueue.drainTo(batch, batchSize - batch.size()); + + // Write batch to file + if (batch.size() >= batchSize) { + saveBatchToFile(batch); + } + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + System.err.println("Error in persistence worker: " + e.getMessage()); + } finally { + // Ensure any remaining messages are saved + if (!batch.isEmpty()) { + saveBatchToFile(batch); + } + if (!persistenceQueue.isEmpty()) { + List remainingMessages = new ArrayList<>(); + persistenceQueue.drainTo(remainingMessages); + saveBatchToFile(remainingMessages); + } + } + }); + } + + private void saveBatchToFile(List messages) { + if (messages.isEmpty()) + return; + try (BufferedWriter writer = Files.newBufferedWriter( messagesFilePath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { - writer.write(message.getId() + "|" + message.getPayload()); - writer.newLine(); + for (Message message : messages) { + writer.write(message.getId() + "|" + message.getPayload()); + writer.newLine(); + } + writer.flush(); // Ensure data is written } catch (IOException e) { - System.out.println("Error saving message to file: " + e.getMessage()); + System.err.println("Error saving messages to file: " + e.getMessage()); + } finally { + messages.clear(); } } private void loadMessagesFromFile() { if (!Files.exists(messagesFilePath)) { - System.out.println("File " + messagesFilePath + " doesn't exist. Starting with empty queue."); + if (enableConsoleLogging) { + System.out.println("File " + messagesFilePath + " doesn't exist. Starting with empty queue."); + } return; } try (BufferedReader reader = Files.newBufferedReader(messagesFilePath)) { String line; + int loadedCount = 0; while ((line = reader.readLine()) != null) { String[] parts = line.split("\\|", 2); if (parts.length == 2) { @@ -57,12 +188,14 @@ private void loadMessagesFromFile() { String payload = parts[1]; Message message = new Message(id, payload); queue.offer(message); + loadedCount++; } } + if (enableConsoleLogging) { + System.out.println("Loaded " + loadedCount + " messages from file"); + } } catch (IOException e) { - System.out.println("Error loading messages from file: " + e.getMessage()); + System.err.println("Error loading messages from file: " + e.getMessage()); } - } - } diff --git a/src/test/java/com/ofek/queue/AppTest.java b/src/test/java/com/ofek/queue/AppTest.java deleted file mode 100644 index cc93fa5..0000000 --- a/src/test/java/com/ofek/queue/AppTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.ofek.queue; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -/** - * Unit test for simple App. - */ -public class AppTest - extends TestCase -{ - /** - * Create the test case - * - * @param testName name of the test case - */ - public AppTest( String testName ) - { - super( testName ); - } - - /** - * @return the suite of tests being tested - */ - public static Test suite() - { - return new TestSuite( AppTest.class ); - } - - /** - * Rigourous Test :-) - */ - public void testApp() - { - assertTrue( true ); - } -} diff --git a/src/test/java/com/ofek/queue/ConsumerTest.java b/src/test/java/com/ofek/queue/ConsumerTest.java new file mode 100644 index 0000000..725acea --- /dev/null +++ b/src/test/java/com/ofek/queue/ConsumerTest.java @@ -0,0 +1,186 @@ +package com.ofek.queue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.nio.file.Path; + +@DisplayName("Consumer Tests") +public class ConsumerTest { + + @TempDir + Path tempDir; + + private MessageQueue messageQueue; + private Producer producer; + private Consumer consumer; + + @BeforeEach + void setUp() { + Path testFile = tempDir.resolve("consumer_test.log"); + messageQueue = new MessageQueue(testFile.toString(), 10, false); + producer = new Producer(messageQueue); + consumer = new Consumer(messageQueue); + } + + @AfterEach + void tearDown() { + if (messageQueue != null) { + messageQueue.shutdown(); + } + } + + @Test + @DisplayName("Should consume single message") + void testConsumeSingleMessage() { + producer.produce("Test message"); + + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("Test message", message.getPayload()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should consume multiple messages in FIFO order") + void testConsumeMultipleMessages() { + int messageCount = 5; + + for (int i = 0; i < messageCount; i++) { + producer.produce("Message " + i); + } + + assertEquals(messageCount, messageQueue.size()); + + for (int i = 0; i < messageCount; i++) { + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("Message " + i, message.getPayload()); + } + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should return null when consuming from empty queue") + void testConsumeFromEmptyQueue() { + assertEquals(0, messageQueue.size()); + + Message message = consumer.poll(); + assertNull(message); + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle repeated polling of empty queue") + void testRepeatedPollingEmptyQueue() { + assertEquals(0, messageQueue.size()); + + for (int i = 0; i < 10; i++) { + Message message = consumer.poll(); + assertNull(message); + } + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should consume messages as they are produced") + void testConsumeAsProduced() { + // Initially empty + assertEquals(0, messageQueue.size()); + assertNull(consumer.poll()); + + // Produce and consume one by one + producer.produce("Message 1"); + assertEquals(1, messageQueue.size()); + + Message msg1 = consumer.poll(); + assertNotNull(msg1); + assertEquals("Message 1", msg1.getPayload()); + assertEquals(0, messageQueue.size()); + + producer.produce("Message 2"); + assertEquals(1, messageQueue.size()); + + Message msg2 = consumer.poll(); + assertNotNull(msg2); + assertEquals("Message 2", msg2.getPayload()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle large messages") + void testConsumeLargeMessage() { + StringBuilder largePayload = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + largePayload.append("Large payload content "); + } + + producer.produce(largePayload.toString()); + + Message message = consumer.poll(); + assertNotNull(message); + assertEquals(largePayload.toString(), message.getPayload()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle empty payload messages") + void testConsumeEmptyPayload() { + producer.produce(""); + + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("", message.getPayload()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle partial queue consumption") + void testPartialConsumption() { + int totalMessages = 10; + int consumeCount = 5; + + // Produce all messages + for (int i = 0; i < totalMessages; i++) { + producer.produce("Message " + i); + } + + assertEquals(totalMessages, messageQueue.size()); + + // Consume only part of the messages + for (int i = 0; i < consumeCount; i++) { + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("Message " + i, message.getPayload()); + } + + assertEquals(totalMessages - consumeCount, messageQueue.size()); + + // Verify remaining messages are still in correct order + for (int i = consumeCount; i < totalMessages; i++) { + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("Message " + i, message.getPayload()); + } + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle special characters in consumed messages") + void testSpecialCharactersInConsumption() { + String specialPayload = "Special chars: !@#$%^&*()_+[]{}|;:,.<>?`~"; + producer.produce(specialPayload); + + Message message = consumer.poll(); + assertNotNull(message); + assertEquals(specialPayload, message.getPayload()); + } +} diff --git a/src/test/java/com/ofek/queue/IntegrationTest.java b/src/test/java/com/ofek/queue/IntegrationTest.java new file mode 100644 index 0000000..8a019a3 --- /dev/null +++ b/src/test/java/com/ofek/queue/IntegrationTest.java @@ -0,0 +1,320 @@ +package com.ofek.queue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@DisplayName("Integration Tests") +public class IntegrationTest { + + @TempDir + Path tempDir; + + private Path testFile; + + @BeforeEach + void setUp() { + testFile = tempDir.resolve("integration_test.log"); + } + + @Test + @DisplayName("Should handle complete workflow: produce, consume, and persist") + void testCompleteWorkflow() throws InterruptedException, IOException { + int messageCount = 24; + int batchSize = 6; + + // Create queue with specific batch size + MessageQueue batchQueue = new MessageQueue(testFile.toString(), batchSize, false); + Producer batchProducer = new Producer(batchQueue); + Consumer batchConsumer = new Consumer(batchQueue); + + // Produce messages + for (int i = 0; i < messageCount; i++) { + batchProducer.produce("Workflow message " + i); + } + + assertEquals(messageCount, batchQueue.size()); + + // Consume half of the messages + int consumeCount = messageCount / 2; + for (int i = 0; i < consumeCount; i++) { + Message message = batchConsumer.poll(); + assertNotNull(message); + assertEquals("Workflow message " + i, message.getPayload()); + } + + assertEquals(messageCount - consumeCount, batchQueue.size()); + + // Wait for persistence and verify file content + Thread.sleep(200); // Allow persistence to occur + + // Shutdown to ensure all messages are persisted + batchQueue.shutdown(); + Thread.sleep(200); + + // Verify file contains all produced messages + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(messageCount, lines.size()); + + // Verify recovery works + MessageQueue recoveredQueue = new MessageQueue(testFile.toString(), batchSize, false); + assertEquals(messageCount, recoveredQueue.size()); + + // Verify messages are in correct order + Consumer recoveredConsumer = new Consumer(recoveredQueue); + for (int i = 0; i < messageCount; i++) { + Message message = recoveredConsumer.poll(); + assertNotNull(message); + assertEquals("Workflow message " + i, message.getPayload()); + } + + recoveredQueue.shutdown(); + } + + @Test + @DisplayName("Should handle concurrent producers and consumers with persistence") + void testConcurrentOperationsWithPersistence() throws InterruptedException, IOException { + int numProducers = 4; + int numConsumers = 2; + int messagesPerProducer = 50; + int batchSize = 10; + + MessageQueue concurrentQueue = new MessageQueue(testFile.toString(), batchSize, false); + + ExecutorService executor = Executors.newFixedThreadPool(numProducers + numConsumers); + CountDownLatch producerLatch = new CountDownLatch(numProducers); + CountDownLatch consumerLatch = new CountDownLatch(numConsumers); + + AtomicInteger producedCount = new AtomicInteger(0); + AtomicInteger consumedCount = new AtomicInteger(0); + + // Start producers + for (int i = 0; i < numProducers; i++) { + final int producerId = i; + executor.submit(() -> { + try { + Producer threadProducer = new Producer(concurrentQueue); + for (int j = 0; j < messagesPerProducer; j++) { + threadProducer.produce("Producer-" + producerId + "-Message-" + j); + producedCount.incrementAndGet(); + } + } finally { + producerLatch.countDown(); + } + }); + } + + // Start consumers + for (int i = 0; i < numConsumers; i++) { + executor.submit(() -> { + try { + Consumer threadConsumer = new Consumer(concurrentQueue); + while (producedCount.get() < numProducers * messagesPerProducer || concurrentQueue.size() > 0) { + Message message = threadConsumer.poll(); + if (message != null) { + consumedCount.incrementAndGet(); + } + Thread.yield(); + } + } finally { + consumerLatch.countDown(); + } + }); + } + + // Wait for completion + assertTrue(producerLatch.await(10, TimeUnit.SECONDS)); + assertTrue(consumerLatch.await(10, TimeUnit.SECONDS)); + + executor.shutdown(); + + // Verify all messages were processed + assertEquals(numProducers * messagesPerProducer, producedCount.get()); + assertEquals(numProducers * messagesPerProducer, consumedCount.get()); + assertEquals(0, concurrentQueue.size()); + + // Shutdown and verify persistence + concurrentQueue.shutdown(); + Thread.sleep(200); + + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(numProducers * messagesPerProducer, lines.size()); + } + + @Test + @DisplayName("Should handle system restart simulation") + void testSystemRestartSimulation() throws InterruptedException, IOException { + int messageCount = 100; + int batchSize = 20; + + // Initial system operation + MessageQueue initialQueue = new MessageQueue(testFile.toString(), batchSize, false); + Producer initialProducer = new Producer(initialQueue); + Consumer initialConsumer = new Consumer(initialQueue); + + // Produce messages + for (int i = 0; i < messageCount; i++) { + initialProducer.produce("Restart test message " + i); + } + + // Consume some messages + int consumedInPhase1 = 30; + for (int i = 0; i < consumedInPhase1; i++) { + Message message = initialConsumer.poll(); + assertNotNull(message); + assertEquals("Restart test message " + i, message.getPayload()); + } + + // Simulate system shutdown + initialQueue.shutdown(); + Thread.sleep(300); + + // System restart + MessageQueue restartedQueue = new MessageQueue(testFile.toString(), batchSize, false); + + // Verify remaining messages are recovered + int expectedRecoveredCount = messageCount; // All messages should be persisted + assertEquals(expectedRecoveredCount, restartedQueue.size()); + + // Continue consuming + Consumer restartedConsumer = new Consumer(restartedQueue); + for (int i = 0; i < messageCount; i++) { + Message message = restartedConsumer.poll(); + assertNotNull(message); + assertEquals("Restart test message " + i, message.getPayload()); + } + + assertEquals(0, restartedQueue.size()); + + restartedQueue.shutdown(); + } + + @Test + @DisplayName("Should handle varying batch sizes correctly") + void testVaryingBatchSizes() throws InterruptedException, IOException { + int[] batchSizes = { 1, 5, 10, 25, 50 }; + int messagesPerBatch = 100; + + for (int batchSize : batchSizes) { + Path batchTestFile = tempDir.resolve("batch_test_" + batchSize + ".log"); + + MessageQueue batchQueue = new MessageQueue(batchTestFile.toString(), batchSize, false); + Producer batchProducer = new Producer(batchQueue); + + // Produce messages + for (int i = 0; i < messagesPerBatch; i++) { + batchProducer.produce("Batch-" + batchSize + "-Message-" + i); + } + + // Shutdown to ensure persistence + batchQueue.shutdown(); + Thread.sleep(200); + + // Verify file content + assertTrue(Files.exists(batchTestFile)); + List lines = Files.readAllLines(batchTestFile); + assertEquals(messagesPerBatch, lines.size()); + + // Verify recovery + MessageQueue recoveredQueue = new MessageQueue(batchTestFile.toString(), batchSize, false); + assertEquals(messagesPerBatch, recoveredQueue.size()); + + recoveredQueue.shutdown(); + } + } + + @Test + @DisplayName("Should handle mixed message sizes") + void testMixedMessageSizes() throws InterruptedException, IOException { + MessageQueue mixedQueue = new MessageQueue(testFile.toString(), 10, false); + Producer mixedProducer = new Producer(mixedQueue); + Consumer mixedConsumer = new Consumer(mixedQueue); + + // Produce messages of varying sizes + mixedProducer.produce("Short"); + mixedProducer.produce("Medium length message with some content"); + + StringBuilder longMessage = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + longMessage.append("Long message content "); + } + mixedProducer.produce(longMessage.toString()); + + mixedProducer.produce(""); // Empty message + mixedProducer.produce("Final message"); + + assertEquals(5, mixedQueue.size()); + + // Consume and verify + Message msg1 = mixedConsumer.poll(); + assertEquals("Short", msg1.getPayload()); + + Message msg2 = mixedConsumer.poll(); + assertEquals("Medium length message with some content", msg2.getPayload()); + + Message msg3 = mixedConsumer.poll(); + assertEquals(longMessage.toString(), msg3.getPayload()); + + Message msg4 = mixedConsumer.poll(); + assertEquals("", msg4.getPayload()); + + Message msg5 = mixedConsumer.poll(); + assertEquals("Final message", msg5.getPayload()); + + assertEquals(0, mixedQueue.size()); + + // Shutdown and verify persistence + mixedQueue.shutdown(); + Thread.sleep(200); + + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(5, lines.size()); + } + + @Test + @DisplayName("Should handle rapid start-stop cycles") + void testRapidStartStopCycles() throws InterruptedException, IOException { + int cycles = 5; + int messagesPerCycle = 20; + + for (int cycle = 0; cycle < cycles; cycle++) { + MessageQueue cycleQueue = new MessageQueue(testFile.toString(), 10, false); + Producer cycleProducer = new Producer(cycleQueue); + + // Produce messages + for (int i = 0; i < messagesPerCycle; i++) { + cycleProducer.produce("Cycle-" + cycle + "-Message-" + i); + } + + // Quick shutdown + cycleQueue.shutdown(); + Thread.sleep(100); + } + + // Verify all messages were persisted + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(cycles * messagesPerCycle, lines.size()); + + // Verify recovery + MessageQueue finalQueue = new MessageQueue(testFile.toString(), 10, false); + assertEquals(cycles * messagesPerCycle, finalQueue.size()); + + finalQueue.shutdown(); + } +} diff --git a/src/test/java/com/ofek/queue/MessageQueueTest.java b/src/test/java/com/ofek/queue/MessageQueueTest.java new file mode 100644 index 0000000..543e759 --- /dev/null +++ b/src/test/java/com/ofek/queue/MessageQueueTest.java @@ -0,0 +1,384 @@ +package com.ofek.queue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@DisplayName("MessageQueue Tests") +public class MessageQueueTest { + + @TempDir + Path tempDir; + + private MessageQueue messageQueue; + private Producer producer; + private Consumer consumer; + private Path testFile; + + @BeforeEach + void setUp() { + testFile = tempDir.resolve("message_queue_test.log"); + messageQueue = new MessageQueue(testFile.toString(), 5, false); + producer = new Producer(messageQueue); + consumer = new Consumer(messageQueue); + } + + @AfterEach + void tearDown() { + if (messageQueue != null) { + messageQueue.shutdown(); + } + } + + @Test + @DisplayName("Should enqueue and dequeue messages in FIFO order") + void testBasicEnqueueDequeue() { + // Enqueue messages + producer.produce("First message"); + producer.produce("Second message"); + producer.produce("Third message"); + + assertEquals(3, messageQueue.size()); + + // Dequeue messages and verify order + Message first = consumer.poll(); + Message second = consumer.poll(); + Message third = consumer.poll(); + + assertNotNull(first); + assertNotNull(second); + assertNotNull(third); + + assertEquals("First message", first.getPayload()); + assertEquals("Second message", second.getPayload()); + assertEquals("Third message", third.getPayload()); + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle empty queue gracefully") + void testEmptyQueueHandling() { + assertEquals(0, messageQueue.size()); + + Message message = consumer.poll(); + assertNull(message); + + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle random number of messages") + void testRandomNumberOfMessages() throws InterruptedException { + Random random = new Random(); + int messageCount = random.nextInt(1000) + 1; // 1 to 1000 messages + + // Enqueue random number of messages + for (int i = 0; i < messageCount; i++) { + producer.produce("Random message " + i); + } + + assertEquals(messageCount, messageQueue.size()); + + // Dequeue all messages + int dequeuedCount = 0; + while (messageQueue.size() > 0) { + Message message = consumer.poll(); + if (message != null) { + dequeuedCount++; + assertTrue(message.getPayload().startsWith("Random message")); + } + } + + assertEquals(messageCount, dequeuedCount); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should persist messages when batch size is reached") + void testBatchSizePersistence() throws InterruptedException, IOException { + int batchSize = 3; + MessageQueue batchQueue = new MessageQueue(testFile.toString(), batchSize, + false); + Producer batchProducer = new Producer(batchQueue); + + // Send exactly batch size messages + for (int i = 0; i < batchSize; i++) { + batchProducer.produce("Batch message " + i); + } + + // Wait for persistence + Thread.sleep(100); + + // Verify messages are saved to file + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(batchSize, lines.size()); + + // Verify file format + for (int i = 0; i < batchSize; i++) { + String line = lines.get(i); + assertTrue(line.contains("Batch message " + i)); + assertTrue(line.contains("|")); // ID|payload format + } + + batchQueue.shutdown(); + } + + @Test + @DisplayName("Should handle messages less than batch size") + void testLessThanBatchSize() throws InterruptedException, IOException { + int batchSize = 10; + MessageQueue batchQueue = new MessageQueue(testFile.toString(), batchSize, + false); + Producer batchProducer = new Producer(batchQueue); + + // Send fewer messages than batch size + int messageCount = 3; + for (int i = 0; i < messageCount; i++) { + batchProducer.produce("Small batch message " + i); + } + + // Shutdown to force persistence of remaining messages + batchQueue.shutdown(); + + // Wait for shutdown to complete + Thread.sleep(200); + + // Verify messages are saved to file + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(messageCount, lines.size()); + + for (int i = 0; i < messageCount; i++) { + String line = lines.get(i); + assertTrue(line.contains("Small batch message " + i)); + } + } + + @Test + @DisplayName("Should handle messages greater than batch size") + void testGreaterThanBatchSize() throws InterruptedException, IOException { + int batchSize = 3; + MessageQueue batchQueue = new MessageQueue(testFile.toString(), batchSize, false); + Producer batchProducer = new Producer(batchQueue); + + // Send more messages than batch size + int messageCount = 10; + for (int i = 0; i < messageCount; i++) { + batchProducer.produce("Large batch message " + i); + } + + // Shutdown to persist remaining messages + batchQueue.shutdown(); + Thread.sleep(100); + + // Verify all messages are saved to file + assertTrue(Files.exists(testFile)); + + List lines = Files.readAllLines(testFile); + assertEquals(messageCount, lines.size()); + + for (int i = 0; i < messageCount; i++) { + String line = lines.get(i); + assertTrue(line.contains("Large batch message " + i)); + } + } + + @Test + @DisplayName("Should handle messages equal to batch size") + void testEqualToBatchSize() throws InterruptedException, IOException { + int batchSize = 5; + MessageQueue batchQueue = new MessageQueue(testFile.toString(), batchSize, + false); + Producer batchProducer = new Producer(batchQueue); + + // Send exactly batch size messages + for (int i = 0; i < batchSize; i++) { + batchProducer.produce("Equal batch message " + i); + } + + // Wait for persistence + Thread.sleep(100); + + // Verify messages are saved to file + assertTrue(Files.exists(testFile)); + List lines = Files.readAllLines(testFile); + assertEquals(batchSize, lines.size()); + + batchQueue.shutdown(); + } + + @Test + @DisplayName("Should recover messages from file on startup") + void testMessageRecovery() throws InterruptedException, IOException { + // Create and populate queue + MessageQueue originalQueue = new MessageQueue(testFile.toString(), 3, false); + Producer originalProducer = new Producer(originalQueue); + + for (int i = 0; i < 5; i++) { + originalProducer.produce("Recovery message " + i); + } + + // Shutdown to ensure persistence + originalQueue.shutdown(); + Thread.sleep(100); + + // Create new queue with same file - should recover messages + MessageQueue recoveredQueue = new MessageQueue(testFile.toString(), 3, + false); + + // Verify recovered messages + assertEquals(5, recoveredQueue.size()); + + Consumer recoveredConsumer = new Consumer(recoveredQueue); + for (int i = 0; i < 5; i++) { + Message message = recoveredConsumer.poll(); + assertNotNull(message); + assertEquals("Recovery message " + i, message.getPayload()); + } + + recoveredQueue.shutdown(); + } + + @Test + @DisplayName("Should handle concurrent producers and consumers") + void testConcurrentAccess() throws InterruptedException { + int numProducers = 5; + int numConsumers = 3; + int messagesPerProducer = 100; + + ExecutorService executor = Executors.newFixedThreadPool(numProducers + numConsumers); + CountDownLatch producerLatch = new CountDownLatch(numProducers); + CountDownLatch consumerLatch = new CountDownLatch(numConsumers); + + AtomicInteger producedCount = new AtomicInteger(0); + AtomicInteger consumedCount = new AtomicInteger(0); + + // Start producers + for (int i = 0; i < numProducers; i++) { + final int producerId = i; + executor.submit(() -> { + try { + for (int j = 0; j < messagesPerProducer; j++) { + producer.produce("Producer-" + producerId + "-Message-" + j); + producedCount.incrementAndGet(); + } + } finally { + producerLatch.countDown(); + } + }); + } + + // Start consumers + for (int i = 0; i < numConsumers; i++) { + executor.submit(() -> { + try { + while (producedCount.get() < numProducers * messagesPerProducer || + messageQueue.size() > 0) { + Message message = consumer.poll(); + if (message != null) { + consumedCount.incrementAndGet(); + } + } + } finally { + consumerLatch.countDown(); + } + }); + } + + // Wait for completion + assertTrue(producerLatch.await(5, TimeUnit.SECONDS)); + assertTrue(consumerLatch.await(5, TimeUnit.SECONDS)); + + executor.shutdown(); + + // Verify all messages were processed + assertEquals(numProducers * messagesPerProducer, producedCount.get()); + assertEquals(numProducers * messagesPerProducer, consumedCount.get()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should handle large messages") + void testLargeMessages() throws InterruptedException { + // Create large payload (1MB) + StringBuilder largePayload = new StringBuilder(); + for (int i = 0; i < 1024 * 1024; i++) { + largePayload.append("A"); + } + + producer.produce(largePayload.toString()); + assertEquals(1, messageQueue.size()); + + Message message = consumer.poll(); + assertNotNull(message); + assertEquals(largePayload.toString(), message.getPayload()); + assertEquals(0, messageQueue.size()); + } + + @Test + @DisplayName("Should maintain message uniqueness with UUID") + void testMessageIdUniqueness() { + producer.produce("Same payload"); + producer.produce("Same payload"); + producer.produce("Same payload"); + + Message first = consumer.poll(); + Message second = consumer.poll(); + Message third = consumer.poll(); + + assertNotNull(first); + assertNotNull(second); + assertNotNull(third); + + // All should have same payload but different IDs + assertEquals("Same payload", first.getPayload()); + assertEquals("Same payload", second.getPayload()); + assertEquals("Same payload", third.getPayload()); + + assertNotEquals(first.getId(), second.getId()); + assertNotEquals(second.getId(), third.getId()); + assertNotEquals(first.getId(), third.getId()); + } + + @Test + @DisplayName("Should handle queue operations during shutdown") + void testOperationsDuringShutdown() throws InterruptedException { + // Add some messages + producer.produce("Before shutdown"); + assertEquals(1, messageQueue.size()); + + // Shutdown queue + messageQueue.shutdown(); + + // Operations should still work until the queue is fully shut down + Message message = consumer.poll(); + assertNotNull(message); + assertEquals("Before shutdown", message.getPayload()); + + // check if message saved to file + assertTrue(Files.exists(testFile)); + try { + List lines = Files.readAllLines(testFile); + assertEquals(1, lines.size()); + assertTrue(lines.get(0).contains("Before shutdown")); + } catch (IOException e) { + fail("Failed to read from file after shutdown: " + e.getMessage()); + } + } +} diff --git a/src/test/java/com/ofek/queue/MessageTest.java b/src/test/java/com/ofek/queue/MessageTest.java new file mode 100644 index 0000000..d783838 --- /dev/null +++ b/src/test/java/com/ofek/queue/MessageTest.java @@ -0,0 +1,104 @@ +package com.ofek.queue; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.DisplayName; +import static org.junit.jupiter.api.Assertions.*; + +@DisplayName("Message Tests") +public class MessageTest { + + @Test + @DisplayName("Should create message with auto-generated ID") + void testMessageCreationWithAutoId() { + String payload = "Test payload"; + Message message = new Message(payload); + + assertNotNull(message.getId()); + assertFalse(message.getId().isEmpty()); + assertEquals(payload, message.getPayload()); + } + + @Test + @DisplayName("Should create message with specified ID") + void testMessageCreationWithSpecificId() { + String id = "custom-id-123"; + String payload = "Test payload"; + Message message = new Message(id, payload); + + assertEquals(id, message.getId()); + assertEquals(payload, message.getPayload()); + } + + @Test + @DisplayName("Should generate unique IDs for different messages") + void testUniqueIdGeneration() { + Message message1 = new Message("Same payload"); + Message message2 = new Message("Same payload"); + Message message3 = new Message("Same payload"); + + assertNotEquals(message1.getId(), message2.getId()); + assertNotEquals(message2.getId(), message3.getId()); + assertNotEquals(message1.getId(), message3.getId()); + } + + @Test + @DisplayName("Should handle empty payload") + void testEmptyPayload() { + Message message = new Message(""); + + assertNotNull(message.getId()); + assertEquals("", message.getPayload()); + } + + @Test + @DisplayName("Should handle very long payload") + void testLongPayload() { + StringBuilder longPayload = new StringBuilder(); + for (int i = 0; i < 10000; i++) { + longPayload.append("This is a very long payload. "); + } + + Message message = new Message(longPayload.toString()); + + assertNotNull(message.getId()); + assertEquals(longPayload.toString(), message.getPayload()); + } + + @Test + @DisplayName("Should have proper toString representation") + void testToString() { + Message message = new Message("test-id", "test payload"); + String toString = message.toString(); + + assertTrue(toString.contains("test-id")); + assertTrue(toString.contains("test payload")); + assertTrue(toString.contains("Message{")); + } + + @Test + @DisplayName("Should handle special characters in payload") + void testSpecialCharacters() { + String specialPayload = "Special chars: !@#$%^&*()_+[]{}|;:,.<>?`~"; + Message message = new Message(specialPayload); + + assertEquals(specialPayload, message.getPayload()); + } + + @Test + @DisplayName("Should handle newlines and tabs in payload") + void testNewlinesAndTabs() { + String payload = "Line 1\nLine 2\tTabbed content\r\nWindows line ending"; + Message message = new Message(payload); + + assertEquals(payload, message.getPayload()); + } + + @Test + @DisplayName("Should handle pipe character in payload (potential file format conflict)") + void testPipeCharacterInPayload() { + String payload = "This|has|pipe|characters"; + Message message = new Message(payload); + + assertEquals(payload, message.getPayload()); + } +} diff --git a/src/test/java/com/ofek/queue/PerformanceTest.java b/src/test/java/com/ofek/queue/PerformanceTest.java new file mode 100644 index 0000000..a587540 --- /dev/null +++ b/src/test/java/com/ofek/queue/PerformanceTest.java @@ -0,0 +1,286 @@ +package com.ofek.queue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class PerformanceTest { + + private static final String PERFORMANCE_REPORT_FILE = "logs/performance_report.log"; + private static final String PERF_TEST_MESSAGES_DIR = "logs/perf_test_messages"; + private static final int[] MESSAGE_COUNTS = { 1000, 10000, 100000, 1000000 }; + + // Intelligent thread count based on system capabilities + private static final int CPU_CORES = Runtime.getRuntime().availableProcessors(); + private static final int THREAD_COUNT = CPU_CORES * 2; + + private static final String PAYLOAD = generatePayload(100); + + public static void main(String[] args) { + PerformanceTest test = new PerformanceTest(); + try { + test.runAllTests(); + } catch (Exception e) { + System.err.println("Error running performance tests: " + e.getMessage()); + e.printStackTrace(); + } + } + + public void runAllTests() throws IOException { + System.out.println("Starting Performance Tests..."); + System.out.println("================================="); + System.out.println("System Info:"); + System.out.println(" CPU Cores: " + CPU_CORES); + System.out.println(" Thread Count: " + THREAD_COUNT); + System.out.println("================================="); + + // Initialize report file and create performance test messages directory + initializeReportFile(); + createPerformanceTestDirectory(); + + // Run single-threaded tests + for (int messageCount : MESSAGE_COUNTS) { + System.out.println("\nTesting with " + messageCount + " messages (Single-threaded):"); + runSingleThreadedTest(messageCount); + } + + // Run multi-threaded tests + for (int messageCount : MESSAGE_COUNTS) { + System.out.println( + "\nTesting with " + messageCount + " messages (Multi-threaded, " + THREAD_COUNT + " threads):"); + runMultiThreadedTest(messageCount); + } + + System.out.println("\nPerformance tests completed. Results saved to: " + PERFORMANCE_REPORT_FILE); + System.out.println("Message files saved to: " + PERF_TEST_MESSAGES_DIR); + } + + private void runSingleThreadedTest(int messageCount) throws IOException { + // Create unique filename for this test run + String testFileName = String.format("%s/single_threaded_%d_messages.log", PERF_TEST_MESSAGES_DIR, messageCount); + + // Clean up any existing messages file + cleanupMessagesFile(testFileName); + + MessageQueue queue = new MessageQueue(testFileName); + Producer producer = new Producer(queue); + Consumer consumer = new Consumer(queue); + + // Test enqueue performance + long startTime = System.nanoTime(); + for (int i = 0; i < messageCount; i++) { + producer.produce(PAYLOAD); + } + long enqueueTime = System.nanoTime() - startTime; + + // Test dequeue performance + startTime = System.nanoTime(); + int processedMessages = 0; + while (queue.size() > 0) { + Message message = consumer.poll(); + if (message != null) { + processedMessages++; + } + } + long dequeueTime = System.nanoTime() - startTime; + + // Calculate metrics + double enqueueTimeMs = enqueueTime / 1_000_000.0; + double dequeueTimeMs = dequeueTime / 1_000_000.0; + double totalTimeMs = enqueueTimeMs + dequeueTimeMs; + double throughputMsg = messageCount / (totalTimeMs / 1000.0); + + // Display results + String results = String.format( + "Messages: %,d | Enqueue: %.2f ms | Dequeue: %.2f ms | Total: %.2f ms | Throughput: %.2f msg/sec | Processed: %d", + messageCount, enqueueTimeMs, dequeueTimeMs, totalTimeMs, throughputMsg, processedMessages); + System.out.println(results); + System.out.println("Message file saved: " + testFileName); + + // Save to report + saveToReport("SINGLE-THREADED TEST", messageCount, results, enqueueTimeMs, dequeueTimeMs, totalTimeMs, + throughputMsg, testFileName); + + // Clean up + queue.shutdown(); + // Note: We don't delete the message file anymore so you can inspect it + } + + private void runMultiThreadedTest(int messageCount) throws IOException { + // Create unique filename for this test run + String testFileName = String.format("%s/multi_threaded_%d_threads_%d_messages.log", PERF_TEST_MESSAGES_DIR, + THREAD_COUNT, messageCount); + + // Clean up any existing messages file + cleanupMessagesFile(testFileName); + + MessageQueue queue = new MessageQueue(testFileName); + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT * 2); // Producers + Consumers + + int messagesPerThread = messageCount / THREAD_COUNT; + List> futures = new ArrayList<>(); + + long startTime = System.nanoTime(); + + // Start producer threads + for (int t = 0; t < THREAD_COUNT; t++) { + Future future = executor.submit(() -> { + Producer producer = new Producer(queue); + for (int i = 0; i < messagesPerThread; i++) { + producer.produce(PAYLOAD); + } + }); + futures.add(future); + } + + // Wait for all producers to finish + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + System.err.println("Producer thread error: " + e.getMessage()); + } + } + + long enqueueTime = System.nanoTime() - startTime; + + // Start consumer threads + futures.clear(); + startTime = System.nanoTime(); + + for (int t = 0; t < THREAD_COUNT; t++) { + Future future = executor.submit(() -> { + Consumer consumer = new Consumer(queue); + while (queue.size() > 0) { + Message message = consumer.poll(); + if (message == null) { + break; // No more messages + } + } + }); + futures.add(future); + } + + // Wait for all consumers to finish + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + System.err.println("Consumer thread error: " + e.getMessage()); + } + } + + long dequeueTime = System.nanoTime() - startTime; + + queue.shutdown(); + executor.shutdown(); + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Calculate metrics + double enqueueTimeMs = enqueueTime / 1_000_000.0; + double dequeueTimeMs = dequeueTime / 1_000_000.0; + double totalTimeMs = enqueueTimeMs + dequeueTimeMs; + double throughputMsg = messageCount / (totalTimeMs / 1000.0); + + // Display results + String results = String.format( + "Messages: %,d | Threads: %d | Enqueue: %.2f ms | Dequeue: %.2f ms | Total: %.2f ms | Throughput: %.2f msg/sec", + messageCount, THREAD_COUNT, enqueueTimeMs, dequeueTimeMs, totalTimeMs, throughputMsg); + System.out.println(results); + System.out.println("Message file saved: " + testFileName); + + // Save to report + saveToReport("MULTI-THREADED TEST (" + THREAD_COUNT + " threads)", messageCount, results, enqueueTimeMs, + dequeueTimeMs, totalTimeMs, throughputMsg, testFileName); + + // Note: We don't delete the message file anymore so you can inspect it + } + + private static String generatePayload(int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('A' + (i % 26))); + } + return sb.toString(); + } + + private void cleanupMessagesFile(String filePath) { + try { + Files.deleteIfExists(Path.of(filePath)); + } catch (IOException e) { + System.err.println("Error cleaning up messages file: " + e.getMessage()); + } + } + + private void createPerformanceTestDirectory() throws IOException { + Path perfTestDir = Path.of(PERF_TEST_MESSAGES_DIR); + if (!Files.exists(perfTestDir)) { + Files.createDirectories(perfTestDir); + System.out.println("Created performance test messages directory: " + PERF_TEST_MESSAGES_DIR); + } + } + + private void initializeReportFile() throws IOException { + Path reportPath = Path.of(PERFORMANCE_REPORT_FILE); + + String header = String.format( + "MESSAGE QUEUE PERFORMANCE TEST REPORT\n" + + "=====================================\n" + + "Test Date: %s\n" + + "Java Version: %s\n" + + "OS: %s %s\n" + + "Available Processors: %d\n" + + "Max Memory: %,d MB\n\n", + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), + System.getProperty("java.version"), + System.getProperty("os.name"), + System.getProperty("os.version"), + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().maxMemory() / (1024 * 1024)); + + Files.write(reportPath, header.getBytes(), + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + } + + private void saveToReport(String testType, int messageCount, String results, + double enqueueTimeMs, double dequeueTimeMs, + double totalTimeMs, double throughputMsg, String messageFilePath) throws IOException { + Path reportPath = Path.of(PERFORMANCE_REPORT_FILE); + + String report = String.format( + "%s\n" + + "Message Count: %,d\n" + + "Message File: %s\n" + + "Results: %s\n" + + "Detailed Metrics:\n" + + " - Enqueue Time: %.2f ms (%.2f msg/sec)\n" + + " - Dequeue Time: %.2f ms (%.2f msg/sec)\n" + + " - Total Time: %.2f ms\n" + + " - Overall Throughput: %.2f msg/sec\n" + + " - Memory Usage: %,d MB\n\n", + testType, + messageCount, + messageFilePath, + results, + enqueueTimeMs, messageCount / (enqueueTimeMs / 1000.0), + dequeueTimeMs, messageCount / (dequeueTimeMs / 1000.0), + totalTimeMs, + throughputMsg, + (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / (1024 * 1024)); + + Files.write(reportPath, report.getBytes(), StandardOpenOption.APPEND); + } +} diff --git a/src/test/java/com/ofek/queue/ProducerTest.java b/src/test/java/com/ofek/queue/ProducerTest.java new file mode 100644 index 0000000..2bbdc9c --- /dev/null +++ b/src/test/java/com/ofek/queue/ProducerTest.java @@ -0,0 +1,126 @@ +package com.ofek.queue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.io.TempDir; +import static org.junit.jupiter.api.Assertions.*; + +import java.nio.file.Path; + +@DisplayName("Producer Tests") +public class ProducerTest { + + @TempDir + Path tempDir; + + private MessageQueue messageQueue; + private Producer producer; + + @BeforeEach + void setUp() { + Path testFile = tempDir.resolve("producer_test.log"); + messageQueue = new MessageQueue(testFile.toString(), 10, false); + producer = new Producer(messageQueue); + } + + @AfterEach + void tearDown() { + if (messageQueue != null) { + messageQueue.shutdown(); + } + } + + @Test + @DisplayName("Should produce single message") + void testProduceSingleMessage() { + producer.produce("Test message"); + + assertEquals(1, messageQueue.size()); + + Message message = messageQueue.dequeue(); + assertNotNull(message); + assertEquals("Test message", message.getPayload()); + } + + @Test + @DisplayName("Should produce multiple messages") + void testProduceMultipleMessages() { + int messageCount = 5; + + for (int i = 0; i < messageCount; i++) { + producer.produce("Message " + i); + } + + assertEquals(messageCount, messageQueue.size()); + + for (int i = 0; i < messageCount; i++) { + Message message = messageQueue.dequeue(); + assertNotNull(message); + assertEquals("Message " + i, message.getPayload()); + } + } + + @Test + @DisplayName("Should handle empty payload") + void testProduceEmptyPayload() { + producer.produce(""); + + assertEquals(1, messageQueue.size()); + + Message message = messageQueue.dequeue(); + assertNotNull(message); + assertEquals("", message.getPayload()); + } + + @Test + @DisplayName("Should handle large payload") + void testProduceLargePayload() { + StringBuilder largePayload = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + largePayload.append("Large payload content "); + } + + producer.produce(largePayload.toString()); + + assertEquals(1, messageQueue.size()); + + Message message = messageQueue.dequeue(); + assertNotNull(message); + assertEquals(largePayload.toString(), message.getPayload()); + } + + @Test + @DisplayName("Should generate unique message IDs") + void testUniqueMessageIds() { + producer.produce("Same payload"); + producer.produce("Same payload"); + producer.produce("Same payload"); + + Message msg1 = messageQueue.dequeue(); + Message msg2 = messageQueue.dequeue(); + Message msg3 = messageQueue.dequeue(); + + assertNotNull(msg1); + assertNotNull(msg2); + assertNotNull(msg3); + + assertNotEquals(msg1.getId(), msg2.getId()); + assertNotEquals(msg2.getId(), msg3.getId()); + assertNotEquals(msg1.getId(), msg3.getId()); + } + + @Test + @DisplayName("Should handle special characters in payload") + void testSpecialCharactersInPayload() { + String specialPayload = "Special chars: !@#$%^&*()_+[]{}|;:,.<>?`~"; + producer.produce(specialPayload); + + assertEquals(1, messageQueue.size()); + + Message message = messageQueue.dequeue(); + assertNotNull(message); + assertEquals(specialPayload, message.getPayload()); + } +}