Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.github.fridujo.rabbitmq.mock;

import static org.junit.jupiter.api.Assertions.*;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.*;

import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

class ExchangeRoutingTest {

private Connection connection;
private Channel channel;
private final String exchangeName = "testExchange";
private final String queueName = "testQueue";
private final String routingKey = "test.key";

@BeforeEach
void setUp() throws IOException, TimeoutException {
ConnectionFactory factory = new MockConnectionFactory();
connection = factory.newConnection();
channel = connection.createChannel();

// Create the exchange and queue
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}

@AfterEach
void tearDown() throws IOException, TimeoutException {
channel.close();
connection.close();
}

@Test
void testMessageIsRoutedToBoundQueue() throws Exception {
String message = "Test Exchange Routing";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

GetResponse response = channel.basicGet(queueName, true);
assertNotNull(response);
assertEquals(message, new String(response.getBody(), StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.fridujo.rabbitmq.mock;

import static org.junit.jupiter.api.Assertions.*;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.*;

import org.junit.jupiter.api.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

class MessageFlowIntegrationTest{

private Connection connection;
private Channel channel;
private final String exchangeName = "integrationExchange";
private final String queueName = "integrationQueue";
private final String routingKey = "integration.key";

@BeforeEach
void setUp() throws IOException, TimeoutException {//setup connectuion mock and channel as all other tests
ConnectionFactory factory = new MockConnectionFactory();
connection = factory.newConnection();
channel = connection.createChannel();

// Setup exchange and queue
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}

@AfterEach//close channel/connection when complete
void tearDown() throws IOException, TimeoutException {
channel.close();
connection.close();
}

@Test
void testEndToEndMessageFlow() throws Exception {
String message = "Top-Down Test Message";
CountDownLatch latch = new CountDownLatch(1); // Synchronization aid
String[] receivedMessage = new String[1]; // Store received message

// Start a consumer
channel.basicConsume(queueName, true, (consumerTag, delivery) -> {
receivedMessage[0] = new String(delivery.getBody(), StandardCharsets.UTF_8);
latch.countDown(); // Signal that message is received
}, consumerTag -> {});

// Publish message
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

// Wait for the message to be received
boolean messageReceived = latch.await(2, TimeUnit.SECONDS);

// Validate that the message was successfully processed
assertTrue(messageReceived, "Consumer should have received the message.");
assertEquals(message, receivedMessage[0], "Received message should match the sent message.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.github.fridujo.rabbitmq.mock;
import static org.junit.jupiter.api.Assertions.*;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.*;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

class MessagePublishingTest {

private Connection connection;
private Channel channel;

@BeforeEach
void setUp() throws IOException, TimeoutException { // Create a mock of an in-memory RabbitMQ mock
ConnectionFactory factory = new MockConnectionFactory();
connection = factory.newConnection();
channel = connection.createChannel();
}

@Test
void testMessagePublishingToQueue() throws IOException {
String exchangeName = "testExchange";
String queueName = "testQueue";
String routingKey = "testKey";
String message = "Test message for RabbitMQ";

// Declare exchange and queue, then bind them
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// Publish a message
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

// Consume the message from the queue
GetResponse response = channel.basicGet(queueName, true);
assertNotNull(response);
assertEquals(message, new String(response.getBody(), StandardCharsets.UTF_8));
}
}
130 changes: 130 additions & 0 deletions src/test/java/com/github/fridujo/rabbitmq/mock/QueueConsumerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.github.fridujo.rabbitmq.mock;

import static org.junit.jupiter.api.Assertions.*;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.*;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class QueueConsumerTest {

private Connection connection;
private Channel channel;
private final String queueName = "consumer-test-queue";

@BeforeEach
void setUp() throws IOException, TimeoutException {//Setup the mock for the connection factory
ConnectionFactory factory = new MockConnectionFactory();
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
}

@Test
void testQueueConsumerReceivesMessagesOfDifferentSizes() throws Exception {//sends various sized messaages from empty to large
String[] testMessages = {
"", // Empty message
"Short",
"This is a typical message",
"00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" // Large message
};

for (String message : testMessages) {//publish to queue
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
}

CountDownLatch latch = new CountDownLatch(testMessages.length);
AtomicReference<String> lastReceivedMessage = new AtomicReference<>();

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
lastReceivedMessage.set(new String(delivery.getBody(), StandardCharsets.UTF_8));
latch.countDown();
};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

latch.await();
assertEquals(testMessages[testMessages.length - 1], lastReceivedMessage.get(), "The last received message should match the last sent message.");
}

@Test
void testConsumerWithManualAcknowledgment() throws Exception {
String message = "Manual Acknowledgment Test";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> receivedMessage = new AtomicReference<>();

channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
receivedMessage.set(new String(delivery.getBody(), StandardCharsets.UTF_8));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
latch.countDown();
}, consumerTag -> {});

latch.await();
assertEquals(message, receivedMessage.get(), "Message should be received and acknowledged.");

// Verify that the queue is empty after acknowledgment
GetResponse response = channel.basicGet(queueName, false);
assertNull(response, "Queue should be empty after message acknowledgment.");
}

@Test
void testConsumerRejectsMessageWithoutRequeue() throws Exception {
String message = "Reject Without Requeue";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> receivedMessage = new AtomicReference<>();

channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
receivedMessage.set(new String(delivery.getBody(), StandardCharsets.UTF_8));
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); // reject without requeue
latch.countDown();
}, consumerTag -> {});

latch.await();
assertEquals(message, receivedMessage.get(), "Message should be received before rejection.");

// verify that the queue is empty
GetResponse response = channel.basicGet(queueName, false);
assertNull(response, "Queue should be empty after message rejection.");
}

@Test
void testConsumerRejectsMessageWithRequeue() throws Exception {
String message = "Reject With Requeue";
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> receivedMessage = new AtomicReference<>();

channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
receivedMessage.set(new String(delivery.getBody(), StandardCharsets.UTF_8));
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true); // Reject with requeue
latch.countDown();
}, consumerTag -> {});

latch.await();
assertEquals(message, receivedMessage.get(), "Message should be received before rejection.");

// verify that the message is still in the queue
GetResponse response = channel.basicGet(queueName, false);
assertNotNull(response, "Queue should still have the message after requeue.");
assertEquals(message, new String(response.getBody(), StandardCharsets.UTF_8), "Requeued message should match the original.");
}

@Test
void testConsumerHandlesEmptyQueueGracefully() throws Exception {
GetResponse response = channel.basicGet(queueName, false);
assertNull(response, "Queue should be empty, and consumer should not fail.");
}
}