Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: CI

on:
push:
branches:
- main
paths-ignore:
- "*.md"
pull_request:
branches:
- main
paths-ignore:
- "*.md"

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-latest

strategy:
matrix:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why matrix for one runner one version? just use jobs

java-version: [21]

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up JDK ${{ matrix.java-version }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: "temurin"
cache: maven

- name: Cache Maven dependencies
uses: actions/cache@v4
with:
path: ~/.m2/repository
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2

- name: Run tests
run: mvn clean test
97 changes: 92 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ A simple Java-based message queue implementation with file storage.

## Features

- **Thread-safe message queue** using `BlockingQueue`
- **Thread-safe message queue** using `BlockingQueue` with read-write locks
- **Unique message IDs** - automatically generated UUIDs for each message
- **File-based storage** - messages survive application restarts
- **Async batch persistence** - high-performance file storage with configurable batch sizes
- **File-based storage** - messages survive application restarts with automatic loading
- **Producer/Consumer pattern** - separate classes for producing and consuming messages
- **Configurable settings** - customizable batch size and logging options

## Classes

Expand All @@ -25,11 +27,15 @@ Represents a message with:

The main messages queue implementation with:

- `enqueue(Message message)` - adds message to queue and saves to file
- `enqueue(Message message)` - adds message to queue with async persistence
- `dequeue()` - removes and returns message from queue
- `size()` - returns current queue size
- `flush()` - forces all pending messages to be written to disk
- `shutdown()` - gracefully shuts down the queue, ensuring all messages are persisted
- **Async batch persistence** - messages are written to file in configurable batches for optimal performance
- **Configurable settings** - batch size and console logging can be customized
- Automatic file loading on startup
- Saves messages to a specified file
- Thread-safe operations with read-write locks

### [`Producer`](src/main/java/com/ofek/queue/Producer.java)

Expand All @@ -51,10 +57,15 @@ Demo application showing producer/consumer usage pattern

## Usage

### Basic Usage

```java
// Create a message queue with file storage
// Create a message queue with default settings (batch size: 100, no console logging)
MessageQueue queue = new MessageQueue("messages.log");

// Or create with custom configuration
MessageQueue queue = new MessageQueue("messages.log", 50, true); // batch size: 50, console logging enabled

// Create producer and consumer
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Expand All @@ -69,8 +80,31 @@ System.out.println("Delivered: " + delivered);

// Check queue size
int size = queue.size();

// Gracefully shutdown (automatically flushes pending messages)
queue.shutdown();
```

### Configuration Options

The MessageQueue constructor accepts the following parameters:

- **filename** (String): The file path where messages will be persisted
- **batchSize** (int, optional): Number of messages to batch before writing to file (default: 100)
- **enableConsoleLogging** (boolean, optional): Whether to log enqueue operations to console (default: false)

### Async Batch Persistence

The MessageQueue implements an efficient async batch persistence mechanism:

- **Non-blocking enqueue**: Messages are added to the queue immediately without waiting for disk I/O
- **Background persistence**: A dedicated thread handles writing messages to file
- **Batch optimization**: Messages are written in configurable batches to reduce file I/O overhead
- **Automatic flushing**: The system ensures data integrity by flushing batches to disk
- **Graceful shutdown**: Call `shutdown()` to ensure all pending messages are persisted (includes automatic flush)

This design provides high throughput for message enqueuing while maintaining data durability.

## Building and Running

This is a Maven project. To build and run:
Expand All @@ -86,6 +120,59 @@ mvn exec:java -Dexec.mainClass="com.ofek.queue.App"
mvn test
```

## Performance Testing

This project includes comprehensive performance testing tools to measure queue performance under various conditions.

### Quick Performance Test

Run the automated performance test script:

```bash
# Make script executable (first time only)
chmod +x run_performance_test.sh

# Run performance tests
./run_performance_test.sh
```

This will:

- Test with 1K, 10K, 100K, and 1M messages
- Run both single-threaded and multi-threaded tests
- Generate a detailed performance report (`performance_report.log`)

### Manual Performance Tests

#### Basic Performance Test

```bash
# Compile test classes
mvn test-compile

# Run basic performance test
mvn exec:java -Dexec.mainClass="com.ofek.queue.PerformanceTest" -Dexec.classpathScope="test"
```

### Performance Test Outputs

The performance tests generate multiple output files:

**`performance_report.log`** - Human-readable performance report with:

- System information (Java version, OS, memory)
- Test results for different message counts
- Single-threaded and multi-threaded performance metrics
- Memory usage information

### Sample Performance Results

Typical results on a modern system:

```
Messages: 100,000 | Enqueue: 2,450.32 ms | Dequeue: 1,234.56 ms | Total: 3,684.88 ms | Throughput: 27,140.23 msg/sec
```

## File Format

Messages are stored in pipe-delimited format:
Expand Down
35 changes: 32 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,39 @@

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.13.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.18.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.5.3</version>
<configuration>
<parallel>all</parallel>
<threadCount>8</threadCount>
<properties>
<configurationParameters>
junit.jupiter.execution.parallel.enabled=true
junit.jupiter.execution.parallel.mode.default=concurrent
junit.jupiter.execution.parallel.mode.classes.default=concurrent
junit.jupiter.execution.parallel.mode.methods.default=concurrent
</configurationParameters>
</properties>
</configuration>
</plugin>
</plugins>
</build>
</project>
38 changes: 38 additions & 0 deletions run_performance_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash

echo "Message Queue Performance Test"
echo "=============================="
echo

# Optimized GC, Heap and others Java settings
export MAVEN_OPTS="-Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=2m -XX:+G1UseAdaptiveIHOP -XX:G1MixedGCCountTarget=4 -Xlog:gc*:logs/gc.log"

# Clean and compile the project
echo "Compiling project..."
mvn clean compile test-compile -q

if [ $? -ne 0 ]; then
echo "Error: Compilation failed"
exit 1
fi

echo "Compilation successful!"
echo

# Run the performance test
echo "Starting performance test..."
echo

# Run the test
mvn exec:java \
-Dexec.mainClass="com.ofek.queue.PerformanceTest" \
-Dexec.classpathScope="test"

if [ $? -eq 0 ]; then
echo
echo "Performance test completed successfully!"
echo
else
echo "Error: Performance test failed"
exit 1
fi
21 changes: 13 additions & 8 deletions src/main/java/com/ofek/queue/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@

public class App {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue("messages.log");
MessageQueue queue = new MessageQueue("logs/messages.log", 100, false);

// Create producer and consumer instances
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

producer.produce("Hello, World!");
producer.produce("Second message");
producer.produce("Third message");
System.out.println("Starting message production...");
for (int i = 1; i <= 502; i++) {
producer.produce("Hello, World!" + i);
}

System.out.println("Queue size: " + queue.size());

// Loop to poll all messages
System.out.println("Polling messages:");
System.out.println("Polling messages...");
while (queue.size() > 0) {
Message delivered = consumer.poll();
System.out.println("Delivered: " + delivered);
consumer.poll();
}

System.out.println("Queue size: " + queue.size());
System.out.println("Queue size after pooling: " + queue.size());

queue.shutdown();
System.out.println("Queue shutdown complete.");
}
}
Loading