From 0303f8d7553c7e4965bcbf9e3ae74461ad61b5c5 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Thu, 28 May 2026 02:06:37 -0300 Subject: [PATCH 1/5] test(integration): improve tests Signed-off-by: Marcos Tischer Vallim --- .../lib/core/AmazonSqsTemplateIT.java | 604 +++++++++++------- .../lib/core/AmazonSqsTemplateIT.java | 596 ++++++++++------- pom.xml | 21 +- 3 files changed, 762 insertions(+), 459 deletions(-) diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java index 7fa4d0a..72506b8 100644 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java @@ -17,19 +17,21 @@ package com.amazon.sqs.messaging.lib.core; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.commons.lang3.RandomStringUtils; @@ -47,20 +49,26 @@ import com.amazon.sqs.messaging.lib.model.QueueProperty; import com.amazon.sqs.messaging.lib.model.RequestEntry; import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; +import com.amazon.sqs.messaging.lib.model.ResponseSuccessEntry; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.MessageSystemAttributeName; import com.amazonaws.services.sqs.model.PurgeQueueRequest; +import com.amazonaws.services.sqs.model.QueueAttributeName; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import lombok.SneakyThrows; + // @formatter:off @Testcontainers +@SuppressWarnings("resource") class AmazonSqsTemplateIT { @Container @@ -70,7 +78,7 @@ class AmazonSqsTemplateIT { .withReuse(true) .withServices(Service.SQS); - private static AmazonSQS amazonSQS; + private static AmazonSQS sqsClient; private static String standardQueueUrl; @@ -78,20 +86,20 @@ class AmazonSqsTemplateIT { @BeforeAll static void setup() { - amazonSQS = AmazonSQSClientBuilder.standard() + sqsClient = AmazonSQSClientBuilder.standard() .withEndpointConfiguration(new EndpointConfiguration(localstack.getEndpoint().toString(), localstack.getRegion())) .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(localstack.getAccessKey(), localstack.getSecretKey()))) .build(); - standardQueueUrl = amazonSQS.createQueue( + standardQueueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName("integration-test-standard-" + UUID.randomUUID()) ).getQueueUrl(); final Map fifoAttributes = new HashMap<>(); - fifoAttributes.put("FifoQueue", "true"); - fifoAttributes.put("ContentBasedDeduplication", "true"); + fifoAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); + fifoAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); - fifoQueueUrl = amazonSQS.createQueue( + fifoQueueUrl = sqsClient.createQueue( new CreateQueueRequest() .withQueueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") .withAttributes(fifoAttributes) @@ -100,17 +108,12 @@ static void setup() { @AfterAll static void teardown() { - if (amazonSQS != null) { - try { - amazonSQS.deleteQueue(standardQueueUrl); - } catch (final Exception e) { - // ignore - } - try { - amazonSQS.deleteQueue(fifoQueueUrl); - } catch (final Exception e) { - // ignore - } + if (Objects.nonNull(sqsClient)) { + sqsClient.shutdown(); + } + + if (Objects.nonNull(localstack)) { + localstack.close(); } } @@ -120,258 +123,427 @@ void before() { purgeQueue(fifoQueueUrl); } + private AmazonSqsTemplate createTemplate( + final String queueUrl, + final boolean fifo, + final long linger, + final int maxBatchSize, + final int maxPoolSize) { + + final QueueProperty queueProperty = QueueProperty.builder() + .fifo(fifo) + .linger(linger) + .maxBatchSize(maxBatchSize) + .maximumPoolSize(maxPoolSize) + .queueUrl(queueUrl) + .build(); + + return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); + } + + private void purgeQueue(final String queueUrl) { + sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)); + } + + private ReceiveMessageResult receiveMessage(final String queueUrl, final Integer maxNumberOfMessages, final Integer waitTimeSeconds) { + final ReceiveMessageResult result = sqsClient.receiveMessage( + new ReceiveMessageRequest(queueUrl) + .withMaxNumberOfMessages(maxNumberOfMessages) + .withWaitTimeSeconds(waitTimeSeconds) + .withAttributeNames(QueueAttributeName.All) + .withMessageAttributeNames("All")); + + result.getMessages().forEach(message -> sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()))); + + return result; + } + + @SneakyThrows + private void countDownLatch(final Integer count, final Consumer consumer) { + final CountDownLatch countDownLatch = new CountDownLatch(count); + + consumer.accept(countDownLatch); + + countDownLatch.await(1L, TimeUnit.MINUTES); + } + @Test void testSendSingleMessage() { final String messageBody = "hello-sqs-" + UUID.randomUUID(); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + countDownLatch(1, countDownLatch -> { - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final String id = UUID.randomUUID().toString(); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) .withValue(messageBody) .build()); - template.await().join(); - } finally { - template.shutdown(); - } + template.await().thenRun(template::shutdown).join(); - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final ReceiveMessageResult result = receiveMessage(standardQueueUrl, 1, 5); + + assertThat(result.getMessages(), hasSize(1)); + + final Message message = result.getMessages().get(0); + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); } @Test void testSendMultipleMessages() { final int messageCount = 500; - final List messageBodies = IntStream.range(0, messageCount) - .mapToObj(i -> "msg-" + i + "-" + UUID.randomUUID()) - .collect(Collectors.toList()); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); - try { - messageBodies.forEach(body -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(body) - .build()); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add( + template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("msg-" + i + "-" + UUID.randomUUID()) + .build()) + ); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - assertThat(receivedBodies.containsAll(messageBodies), is(true)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("msg-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessagesExceedingBatchSize() { final int messageCount = 25; - final int maxBatchSize = 10; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, maxBatchSize, 10); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("batch-test-" + i) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("batch-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessagesWithLinger() { final int messageCount = 20; - final long linger = 200L; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, linger, 10, 5); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 10, 5); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("linger-test-" + i) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("linger-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test - void testSendMessageWithMessageAttributes() { + void testSendMessageWithgetMessageAttributes() { final String messageBody = "attr-test-" + UUID.randomUUID(); - final Map expectedAttributes = new HashMap<>(); - expectedAttributes.put("string-attr", new MessageAttributeValue().withDataType("String").withStringValue("hello")); - expectedAttributes.put("number-attr", new MessageAttributeValue().withDataType("Number").withStringValue("42")); - - final Map messageHeaders = new HashMap<>(); - messageHeaders.put("string-attr", "hello"); - messageHeaders.put("number-attr", 42); + countDownLatch(1, countDownLatch -> { + final Map messageHeaders = new HashMap<>(); + messageHeaders.put("string-attr", "hello"); + messageHeaders.put("number-attr", 42); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - try { - template.send(RequestEntry.builder() + final ListenableFuture future = template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody) .withMessageHeaders(messageHeaders) .build()); - template.await().join(); - } finally { - template.shutdown(); - } + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); - final ReceiveMessageResult result = amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(standardQueueUrl) - .withMaxNumberOfMessages(1) - .withWaitTimeSeconds(5) - .withMessageAttributeNames("All")); + final List messages = new LinkedList<>(); - assertThat(result.getMessages(), hasSize(1)); + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } - final Message message = result.getMessages().get(0); - assertThat(message.getBody(), is(messageBody)); - assertThat(message.getMessageAttributes().get("string-attr").getStringValue(), is("hello")); - assertThat(message.getMessageAttributes().get("number-attr").getStringValue(), is("42")); + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getMessageAttributes().get("string-attr").getStringValue(), is("hello")); + assertThat(message.getMessageAttributes().get("number-attr").getStringValue(), is("42")); + }); } @Test void testSendLargeMessage() { final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); - try { - template.send(RequestEntry.builder() + final ListenableFuture future = template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(largeBody) .build()); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); } - final List messages = receiveMessages(standardQueueUrl, 1, 10); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(largeBody)); + + messages.forEach(message -> { + assertThat(message.getBody(), is(largeBody)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessageExceedingMaxSize() { - final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); + countDownLatch(1, countDownLatch -> { + final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + final RequestEntry entry = RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(oversizedBody) + .build(); - final RequestEntry entry = RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(oversizedBody) - .build(); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - final AtomicReference responseFailEntry = new AtomicReference<>(); + final ListenableFuture future = template.send(entry); - try { - template.send(entry).addCallback(null, failCallback -> { - responseFailEntry.set(failCallback); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(null, failureResult -> { + assertThat(failureResult.getCode(), is("000")); + assertThat(failureResult.getId(), is(entry.getId())); + assertThat(failureResult.getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); + assertThat(failureResult.getSenderFault(), is(true)); + countDownLatch.countDown(); }); + }); - template.await().join(); - } finally { - template.shutdown(); - } + final List messages = receiveMessage(standardQueueUrl, 10, 5).getMessages(); - assertThat(responseFailEntry.get().getCode(), is("000")); - assertThat(responseFailEntry.get().getId(), is(entry.getId())); - assertThat(responseFailEntry.get().getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); - assertThat(responseFailEntry.get().getSenderFault(), is(true)); + assertThat(messages, hasSize(0)); } @Test void testShutdownDrainsPendingMessages() { final int messageCount = 5; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); + countDownLatch(messageCount, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("drain-test-" + i) - .build()); + final List> futures = new ArrayList<>(); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("drain-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); }); - template.shutdown(); + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + messages.forEach(message -> { + assertThat(message.getBody(), containsString("drain-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test void testTemplateLifecycle() { - final String messageBody = "lifecycle-" + UUID.randomUUID(); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("lifecycle-" + UUID.randomUUID()) + .build()); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + template.await().thenRun(template::shutdown).join(); - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); - final CompletableFuture awaitFuture = template.await(); - assertThat(awaitFuture, notNullValue()); + final List messages = new LinkedList<>(); - awaitFuture.thenRun(template::shutdown).join(); + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } - final List messages = receiveMessages(standardQueueUrl, 1, 5); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("lifecycle-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendSingleFifoMessage() { final String messageBody = "fifo-single-" + UUID.randomUUID(); - final String groupId = UUID.randomUUID().toString(); + final String id = UUID.randomUUID().toString(); + final String groupId = id; - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) .withValue(messageBody) .withGroupId(groupId) .build()); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); } - final List messages = receiveMessages(fifoQueueUrl, 1, 5); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - assertThat(messages.get(0).getAttributes().get("MessageGroupId"), is(groupId)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("fifo-single-")); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test @@ -379,28 +551,43 @@ void testSendFifoMessagesWithOrdering() { final int messageCount = 100; final String groupId = UUID.randomUUID().toString(); - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("ordered-" + i) .withGroupId(groupId) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); - } + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); - final List receivedBodies = drainQueue(fifoQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + final List messages = new LinkedList<>(); - for (int i = 0; i < receivedBodies.size(); i++) { - assertThat(receivedBodies.get(i), is("ordered-" + i)); + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("ordered-")); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } @Test @@ -409,85 +596,50 @@ void testSendFifoMessageWithDeduplication() { final String groupId = UUID.randomUUID().toString(); final String messageBody = "dedup-test-" + UUID.randomUUID(); - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); - try { - template.send(RequestEntry.builder() + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody) .withGroupId(groupId) .withDeduplicationId(deduplicationId) - .build()); + .build())); - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody + "-duplicate") .withGroupId(groupId) .withDeduplicationId(deduplicationId) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(fifoQueueUrl, 2, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - } - - private AmazonSqsTemplate createTemplate(final String queueUrl, final boolean fifo, - final long linger, final int maxBatchSize, final int maxPoolSize) { + .build())); - final QueueProperty queueProperty = QueueProperty.builder() - .fifo(fifo) - .linger(linger) - .maxBatchSize(maxBatchSize) - .maximumPoolSize(maxPoolSize) - .queueUrl(queueUrl) - .build(); + template.await().thenRun(template::shutdown).join(); - return new AmazonSqsTemplate<>(amazonSQS, queueProperty, new RingBufferBlockingQueue<>(1024)); - } + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); - private List receiveMessages(final String queueUrl, final int maxNumberOfMessages, final int waitTimeSeconds) { - return amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(queueUrl) - .withMaxNumberOfMessages(maxNumberOfMessages) - .withWaitTimeSeconds(waitTimeSeconds) - .withMessageAttributeNames("All") - .withAttributeNames("All") - ).getMessages(); - } + final List messages = new LinkedList<>(); - private List drainQueue(final String queueUrl, final int expectedCount) { - final List allBodies = new LinkedList<>(); - - while (allBodies.size() < expectedCount) { - final ReceiveMessageResult result = amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(queueUrl) - .withMaxNumberOfMessages(10) - .withWaitTimeSeconds(5) - .withMessageAttributeNames("All") - .withAttributeNames("All")); - - if (result.getMessages().isEmpty()) { - break; - } - - result.getMessages().forEach(msg -> { - allBodies.add(msg.getBody()); - amazonSQS.deleteMessage(queueUrl, msg.getReceiptHandle()); - }); + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); } - return allBodies; - } + assertThat(messages, hasSize(1)); - private void purgeQueue(final String queueUrl) { - amazonSQS.purgeQueue(new PurgeQueueRequest(queueUrl)); + messages.forEach(message -> { + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageDeduplicationId.toString()), is(deduplicationId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); } } diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java index 49f1296..635804c 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java @@ -17,19 +17,21 @@ package com.amazon.sqs.messaging.lib.core; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.commons.lang3.RandomStringUtils; @@ -47,20 +49,23 @@ import com.amazon.sqs.messaging.lib.model.QueueProperty; import com.amazon.sqs.messaging.lib.model.RequestEntry; import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; +import com.amazon.sqs.messaging.lib.model.ResponseSuccessEntry; +import lombok.SneakyThrows; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; -import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; // @formatter:off @Testcontainers +@SuppressWarnings("resource") class AmazonSqsTemplateIT { @Container @@ -91,31 +96,26 @@ static void setup() { .build() ).queueUrl(); - final Map fifoAttributes = new HashMap<>(); - fifoAttributes.put("FifoQueue", "true"); - fifoAttributes.put("ContentBasedDeduplication", "true"); + final Map fifoAttributes = new HashMap<>(); + fifoAttributes.put(QueueAttributeName.FIFO_QUEUE, "true"); + fifoAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"); fifoQueueUrl = sqsClient.createQueue( CreateQueueRequest.builder() .queueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") - .attributesWithStrings(fifoAttributes) + .attributes(fifoAttributes) .build() ).queueUrl(); } @AfterAll static void teardown() { - if (sqsClient != null) { - try { - sqsClient.deleteQueue(r -> r.queueUrl(standardQueueUrl)); - } catch (final Exception e) { - // ignore - } - try { - sqsClient.deleteQueue(r -> r.queueUrl(fifoQueueUrl)); - } catch (final Exception e) { - // ignore - } + if (Objects.nonNull(sqsClient)) { + sqsClient.close(); + } + + if (Objects.nonNull(localstack)) { + localstack.close(); } } @@ -125,250 +125,427 @@ void before() { purgeQueue(fifoQueueUrl); } + private AmazonSqsTemplate createTemplate( + final String queueUrl, + final boolean fifo, + final long linger, + final int maxBatchSize, + final int maxPoolSize) { + + final QueueProperty queueProperty = QueueProperty.builder() + .fifo(fifo) + .linger(linger) + .maxBatchSize(maxBatchSize) + .maximumPoolSize(maxPoolSize) + .queueUrl(queueUrl) + .build(); + + return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); + } + + private void purgeQueue(final String queueUrl) { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + } + + private ReceiveMessageResponse receiveMessage(final String queueUrl, final Integer maxNumberOfMessages, final Integer waitTimeSeconds) { + final ReceiveMessageResponse result = sqsClient.receiveMessage(request -> request + .queueUrl(queueUrl) + .maxNumberOfMessages(maxNumberOfMessages) + .waitTimeSeconds(waitTimeSeconds) + .attributeNames(QueueAttributeName.ALL) + .messageAttributeNames("All")); + + result.messages().forEach(message -> sqsClient.deleteMessage(request -> request.receiptHandle(message.receiptHandle()).queueUrl(queueUrl))); + + return result; + } + + @SneakyThrows + private void countDownLatch(final Integer count, final Consumer consumer) { + final CountDownLatch countDownLatch = new CountDownLatch(count); + + consumer.accept(countDownLatch); + + countDownLatch.await(1L, TimeUnit.MINUTES); + } + @Test void testSendSingleMessage() { final String messageBody = "hello-sqs-" + UUID.randomUUID(); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + countDownLatch(1, countDownLatch -> { - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final String id = UUID.randomUUID().toString(); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) .withValue(messageBody) .build()); - template.await().join(); - } finally { - template.shutdown(); - } + template.await().thenRun(template::shutdown).join(); - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final ReceiveMessageResponse result = receiveMessage(standardQueueUrl, 1, 5); + + assertThat(result.messages(), hasSize(1)); + + final Message message = result.messages().get(0); + assertThat(message.body(), is(messageBody)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); } @Test void testSendMultipleMessages() { final int messageCount = 500; - final List messageBodies = IntStream.range(0, messageCount) - .mapToObj(i -> "msg-" + i + "-" + UUID.randomUUID()) - .collect(Collectors.toList()); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); - try { - messageBodies.forEach(body -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(body) - .build()); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add( + template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("msg-" + i + "-" + UUID.randomUUID()) + .build()) + ); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - assertThat(receivedBodies.containsAll(messageBodies), is(true)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("msg-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessagesExceedingBatchSize() { final int messageCount = 25; - final int maxBatchSize = 10; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, maxBatchSize, 10); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("batch-test-" + i) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("batch-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessagesWithLinger() { final int messageCount = 20; - final long linger = 200L; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, linger, 10, 5); + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 10, 5); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("linger-test-" + i) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); } - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("linger-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessageWithMessageAttributes() { final String messageBody = "attr-test-" + UUID.randomUUID(); - final Map messageHeaders = new HashMap<>(); - messageHeaders.put("string-attr", "hello"); - messageHeaders.put("number-attr", 42); + countDownLatch(1, countDownLatch -> { + final Map messageHeaders = new HashMap<>(); + messageHeaders.put("string-attr", "hello"); + messageHeaders.put("number-attr", 42); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - try { - template.send(RequestEntry.builder() + final ListenableFuture future = template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody) .withMessageHeaders(messageHeaders) .build()); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); } - final List messages = receiveMessages(standardQueueUrl, 1, 5); assertThat(messages, hasSize(1)); - final Message message = messages.get(0); - assertThat(message.body(), is(messageBody)); - - final Map attrs = message.messageAttributes(); - assertThat(attrs.get("string-attr").stringValue(), is("hello")); - assertThat(attrs.get("number-attr").stringValue(), is("42")); + messages.forEach(message -> { + assertThat(message.body(), is(messageBody)); + assertThat(message.messageAttributes().get("string-attr").stringValue(), is("hello")); + assertThat(message.messageAttributes().get("number-attr").stringValue(), is("42")); + }); } @Test void testSendLargeMessage() { final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); - try { - template.send(RequestEntry.builder() + final ListenableFuture future = template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(largeBody) .build()); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); } - final List messages = receiveMessages(standardQueueUrl, 1, 10); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(largeBody)); + + messages.forEach(message -> { + assertThat(message.body(), is(largeBody)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendMessageExceedingMaxSize() { - final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); + countDownLatch(1, countDownLatch -> { + final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + final RequestEntry entry = RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(oversizedBody) + .build(); - final AtomicReference responseFailEntry = new AtomicReference<>(); + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - final RequestEntry entry = RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(oversizedBody) - .build(); + final ListenableFuture future = template.send(entry); - try { - template.send(entry).addCallback(null, failCallback -> { - responseFailEntry.set(failCallback); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(null, failureResult -> { + assertThat(failureResult.getCode(), is("000")); + assertThat(failureResult.getId(), is(entry.getId())); + assertThat(failureResult.getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); + assertThat(failureResult.getSenderFault(), is(true)); + countDownLatch.countDown(); }); + }); - template.await().join(); - } finally { - template.shutdown(); - } + final List messages = receiveMessage(standardQueueUrl, 10, 5).messages(); - assertThat(responseFailEntry.get().getCode(), is("000")); - assertThat(responseFailEntry.get().getId(), is(entry.getId())); - assertThat(responseFailEntry.get().getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); - assertThat(responseFailEntry.get().getSenderFault(), is(true)); + assertThat(messages, hasSize(0)); } @Test void testShutdownDrainsPendingMessages() { final int messageCount = 5; - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); + countDownLatch(messageCount, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("drain-test-" + i) - .build()); + final List> futures = new ArrayList<>(); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("drain-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); }); - template.shutdown(); + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + messages.forEach(message -> { + assertThat(message.body(), containsString("drain-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testTemplateLifecycle() { - final String messageBody = "lifecycle-" + UUID.randomUUID(); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("lifecycle-" + UUID.randomUUID()) + .build()); - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); + template.await().thenRun(template::shutdown).join(); - final CompletableFuture awaitFuture = template.await(); - assertThat(awaitFuture, notNullValue()); + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); - awaitFuture.thenRun(template::shutdown).join(); + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } - final List messages = receiveMessages(standardQueueUrl, 1, 5); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("lifecycle-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test void testSendSingleFifoMessage() { final String messageBody = "fifo-single-" + UUID.randomUUID(); - final String groupId = UUID.randomUUID().toString(); + final String id = UUID.randomUUID().toString(); + final String groupId = id; - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) .withValue(messageBody) .withGroupId(groupId) .build()); - template.await().join(); - } finally { - template.shutdown(); + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); } - final List messages = receiveMessages(fifoQueueUrl, 1, 5); assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); - assertThat(messages.get(0).attributesAsStrings().get("MessageGroupId"), is(groupId)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("fifo-single-")); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test @@ -376,28 +553,43 @@ void testSendFifoMessagesWithOrdering() { final int messageCount = 100; final String groupId = UUID.randomUUID().toString(); - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); - try { IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue("ordered-" + i) .withGroupId(groupId) - .build()); + .build())); }); - template.await().join(); - } finally { - template.shutdown(); - } + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); - final List receivedBodies = drainQueue(fifoQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); + final List messages = new LinkedList<>(); - for (int i = 0; i < receivedBodies.size(); i++) { - assertThat(receivedBodies.get(i), is("ordered-" + i)); + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("ordered-")); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } @Test @@ -406,91 +598,51 @@ void testSendFifoMessageWithDeduplication() { final String groupId = UUID.randomUUID().toString(); final String messageBody = "dedup-test-" + UUID.randomUUID(); - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - try { - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody) .withGroupId(groupId) .withDeduplicationId(deduplicationId) - .build()); + .build())); - template.send(RequestEntry.builder() + futures.add(template.send(RequestEntry.builder() .withId(UUID.randomUUID().toString()) .withValue(messageBody + "-duplicate") .withGroupId(groupId) .withDeduplicationId(deduplicationId) - .build()); + .build())); - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = drainQueue(fifoQueueUrl, 2); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0), is(messageBody)); - } - - private AmazonSqsTemplate createTemplate(final String queueUrl, final boolean fifo, - final long linger, final int maxBatchSize, final int maxPoolSize) { - - final QueueProperty queueProperty = QueueProperty.builder() - .fifo(fifo) - .linger(linger) - .maxBatchSize(maxBatchSize) - .maximumPoolSize(maxPoolSize) - .queueUrl(queueUrl) - .build(); + template.await().thenRun(template::shutdown).join(); - return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); - } + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); - private List receiveMessages(final String queueUrl, final int maxNumberOfMessages, final int waitTimeSeconds) { - return sqsClient.receiveMessage( - ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(maxNumberOfMessages) - .waitTimeSeconds(waitTimeSeconds) - .messageAttributeNames("All") - .attributeNamesWithStrings("All") - .build() - ).messages(); - } + final List messages = new LinkedList<>(); - private List drainQueue(final String queueUrl, final int expectedCount) { - final List allBodies = new LinkedList<>(); - - while (allBodies.size() < expectedCount) { - final List messages = sqsClient.receiveMessage( - ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(10) - .waitTimeSeconds(5) - .messageAttributeNames("All") - .attributeNamesWithStrings("All") - .build() - ).messages(); - - if (messages.isEmpty()) { - break; - } - - messages.forEach(msg -> { - allBodies.add(msg.body()); - sqsClient.deleteMessage(DeleteMessageRequest.builder() - .queueUrl(queueUrl) - .receiptHandle(msg.receiptHandle()) - .build()); - }); + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); } - return allBodies; - } + assertThat(messages, hasSize(1)); - private void purgeQueue(final String queueUrl) { - sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + messages.forEach(message -> { + assertThat(message.body(), is(messageBody)); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID), is(deduplicationId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); } + } // @formatter:on diff --git a/pom.xml b/pom.xml index 5f4989d..269d17c 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,6 @@ 4.11.0 5.10.2 2.16.1 - 1.37 4.2.2 3.24.2 3.20.0 @@ -283,16 +282,16 @@ maven-source-plugin ${maven-source-plugin.version} - - org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin.version} - - - org.apache.maven.plugins - maven-failsafe-plugin - ${maven-failsafe-plugin.version} - + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + org.apache.maven.plugins maven-javadoc-plugin From aaf2e32ef39ca9f1644b9354a9a3930865d19ae9 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Thu, 28 May 2026 22:37:03 -0300 Subject: [PATCH 2/5] test(integration): improve tests Signed-off-by: Marcos Tischer Vallim --- .../lib/core/AmazonSqsTemplateIT.java | 8 +++----- .../lib/core/AmazonSqsTemplateIT.java | 19 +++++-------------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java index 72506b8..4066b39 100644 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java @@ -99,11 +99,9 @@ static void setup() { fifoAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); fifoAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); - fifoQueueUrl = sqsClient.createQueue( - new CreateQueueRequest() - .withQueueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") - .withAttributes(fifoAttributes) - ).getQueueUrl(); + fifoQueueUrl = sqsClient.createQueue(new CreateQueueRequest() + .withQueueName("it-fifo-queue.fifo") + .withAttributes(fifoAttributes)).getQueueUrl(); } @AfterAll diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java index 635804c..cd1557e 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java @@ -56,7 +56,6 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; @@ -86,26 +85,18 @@ static void setup() { sqsClient = SqsClient.builder() .endpointOverride(localstack.getEndpoint()) .region(Region.of(localstack.getRegion())) - .credentialsProvider(StaticCredentialsProvider.create( - AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) .build(); - standardQueueUrl = sqsClient.createQueue( - CreateQueueRequest.builder() - .queueName("integration-test-standard-" + UUID.randomUUID()) - .build() - ).queueUrl(); + standardQueueUrl = sqsClient.createQueue(request -> request.queueName("it-standard-queue")).queueUrl(); final Map fifoAttributes = new HashMap<>(); fifoAttributes.put(QueueAttributeName.FIFO_QUEUE, "true"); fifoAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"); - fifoQueueUrl = sqsClient.createQueue( - CreateQueueRequest.builder() - .queueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") - .attributes(fifoAttributes) - .build() - ).queueUrl(); + fifoQueueUrl = sqsClient.createQueue(request -> request + .queueName("it-fifo-queue.fifo") + .attributes(fifoAttributes)).queueUrl(); } @AfterAll From f4c66fb69cbdc76172b4f40d85e9ab06ef357749 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Thu, 28 May 2026 23:02:47 -0300 Subject: [PATCH 3/5] test(integration): improve tests Signed-off-by: Marcos Tischer Vallim --- .github/workflows/ci-gates.yml | 2 +- pom.xml | 20 -------------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/.github/workflows/ci-gates.yml b/.github/workflows/ci-gates.yml index 9b772c4..73a9822 100644 --- a/.github/workflows/ci-gates.yml +++ b/.github/workflows/ci-gates.yml @@ -30,7 +30,7 @@ jobs: restore-keys: ${{ runner.os }}-sonar-${{ hashFiles('**/pom.xml') }} - name: Build and analyze - run: mvn -B verify sonar:sonar -P sonar + run: mvn -B verify failsafe:integration-test failsafe:verify sonar:sonar -P sonar env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} diff --git a/pom.xml b/pom.xml index 269d17c..b25e137 100644 --- a/pom.xml +++ b/pom.xml @@ -636,25 +636,5 @@ - - - integration-test - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - integration-test - verify - - - - - - - \ No newline at end of file From 379748129258a98dd70786d0537a19649751452b Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Fri, 29 May 2026 00:05:59 -0300 Subject: [PATCH 4/5] test(integration): improve tests --- .github/workflows/ci-gates.yml | 2 +- .github/workflows/ci-maven.yml | 26 +-- .../lib/core/AbstractAmazonSqsConsumer.java | 2 +- .../AmazonSqsThreadPoolExecutorTest.java | 19 +-- .../RingBufferBlockingQueueTest.java | 4 +- .../core/AbstractAmazonSqsConsumerTest.java | 150 ++++++++++-------- .../lib/core/AmazonSqsProducerAsyncTest.java | 4 +- ... => AmazonSqsTemplateIntegrationTest.java} | 2 +- .../lib/core/AmazonSqsProducerAsyncTest.java | 4 +- ... => AmazonSqsTemplateIntegrationTest.java} | 2 +- pom.xml | 6 - 11 files changed, 101 insertions(+), 120 deletions(-) rename amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/{AmazonSqsTemplateIT.java => AmazonSqsTemplateIntegrationTest.java} (99%) rename amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/{AmazonSqsTemplateIT.java => AmazonSqsTemplateIntegrationTest.java} (99%) diff --git a/.github/workflows/ci-gates.yml b/.github/workflows/ci-gates.yml index 73a9822..9b772c4 100644 --- a/.github/workflows/ci-gates.yml +++ b/.github/workflows/ci-gates.yml @@ -30,7 +30,7 @@ jobs: restore-keys: ${{ runner.os }}-sonar-${{ hashFiles('**/pom.xml') }} - name: Build and analyze - run: mvn -B verify failsafe:integration-test failsafe:verify sonar:sonar -P sonar + run: mvn -B verify sonar:sonar -P sonar env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml index bba26e3..d1c2ef3 100644 --- a/.github/workflows/ci-maven.yml +++ b/.github/workflows/ci-maven.yml @@ -54,28 +54,4 @@ jobs: restore-keys: ${{ runner.os }}-test-${{ hashFiles('**/pom.xml') }}-${{ matrix.java-version }} - name: Test with Maven - run: mvn test-compile test --file pom.xml - - integration-test: - needs: test - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v6 - - - name: Set up JDK - uses: actions/setup-java@v5 - with: - java-version: 17 - distribution: "corretto" - cache: "maven" - - - name: Cache Maven packages - uses: actions/cache@v5 - with: - path: ~/.m2 - key: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }} - - - name: Integration Test with Maven - run: mvn test-compile failsafe:integration-test failsafe:verify --file pom.xml \ No newline at end of file + run: mvn test-compile test --file pom.xml \ No newline at end of file diff --git a/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java b/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java index 2f9ee96..92389f9 100644 --- a/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java +++ b/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java @@ -203,7 +203,7 @@ public void run() { */ @SneakyThrows public void shutdown() { - await().thenAccept(result -> { + await().thenRun(() -> { try { LOGGER.warn("Shutdown consumer {}", getClass().getSimpleName()); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java index 4870d68..2adedec 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java @@ -17,6 +17,7 @@ package com.amazon.sqs.messaging.lib.concurrent; import static org.assertj.core.api.Assertions.catchThrowableOfType; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -46,11 +47,7 @@ void testSuccessSucceededTaskCount() throws InterruptedException { for(int i = 0; i < 300; i++) { amazonSqsThreadPoolExecutor.execute(() -> { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); }); } @@ -95,11 +92,7 @@ void testSuccessActiveTaskCount() throws InterruptedException { for(int i = 0; i < 10; i++) { amazonSqsThreadPoolExecutor.execute(() -> { while(true) { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); } @@ -121,11 +114,7 @@ void testSuccessBlockingSubmissionPolicy() throws InterruptedException { amazonSqsThreadPoolExecutor.execute(() -> { while(true) { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index 044ec01..a07a692 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -97,7 +97,7 @@ void testSuccessWhenIsEmpty() throws InterruptedException { assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - Thread.sleep(2000); + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); @@ -126,7 +126,7 @@ void testSuccessWhenIsFull() throws InterruptedException { ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - Thread.sleep(2000); + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java index f8a3f55..4403a05 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import java.util.LinkedList; import java.util.List; @@ -189,10 +190,11 @@ void testRunWithFifoqueuePublishesSynchronously() throws Exception { final RequestEntry entry = buildRequestEntry("fifo-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1)); + }); }); } @@ -208,9 +210,10 @@ void testRunWithNonFifoqueuePublishesAsynchronously() throws Exception { final RequestEntry entry = buildRequestEntry("non-fifo-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - verify(executorService, atLeastOnce()).execute(any(Runnable.class)); + await() + .untilAsserted(() -> + verify(executorService, atLeastOnce()).execute(any(Runnable.class)) + ); }); } @@ -224,10 +227,11 @@ void testRunHandlesPublishExceptionWithoutCrashing() throws Exception { final RequestEntry entry = buildRequestEntry("error-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getLastError(), is(notNullValue())); + await() + .untilAsserted(() -> { + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getLastError(), is(notNullValue())); + }); }); } @@ -240,20 +244,22 @@ void testRunRecordsCorrectExceptionOnPublishFailure() throws Exception { queueRequests.put(buildRequestEntry("fail-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getLastError(), instanceOf(RuntimeException.class)); - assertThat(consumer.getLastError().getMessage(), containsString("publish failed")); + await() + .untilAsserted(() -> { + assertThat(consumer.getLastError(), instanceOf(RuntimeException.class)); + assertThat(consumer.getLastError().getMessage(), containsString("publish failed")); + }); }); } @Test void testRunDoesNotPublishWhenQueueIsEmpty() throws Exception { - Thread.sleep(LINGER_MS * 3); - context(consumer -> { - assertThat(consumer.getPublishCallCount(), is(0)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), is(0)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -266,9 +272,10 @@ void testRunPublishesMultipleEntriesInSingleBatch() throws Exception { queueRequests.put(buildRequestEntry("message-" + i)); } - Thread.sleep(LINGER_MS * 4); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)) + ); }); } @@ -282,9 +289,10 @@ void testRunRespectMaxBatchSizeByPublishingInMultipleBatches() throws Exception queueRequests.put(buildRequestEntry("msg-" + i)); } - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)) + ); }); } @@ -344,9 +352,10 @@ void testPublishDecoratorIsAppliedBeforePublish() throws Exception { context(trackingDecorator, consumer -> { queueRequests.put(buildRequestEntry("decorated-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)) + ); }); } @@ -357,10 +366,11 @@ void testPublishDecoratorIdentityDoesNotAlterRequest() throws Exception { context(consumer -> { queueRequests.put(buildRequestEntry("identity-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -371,10 +381,11 @@ void testCanAddPayloadAllowsEntryWellBelowSizeThreshold() throws Exception { context(consumer -> { queueRequests.put(buildRequestEntry("small-payload")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -386,10 +397,11 @@ void testCanAddPayloadAllowsEntryExactlyAtSizeThreshold() throws Exception { final String payloadAtThreshold = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold()); queueRequests.put(buildRequestEntry(payloadAtThreshold)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -401,12 +413,13 @@ void testCanAddPayloadRejectsEntryExceedingSizeThreshold() throws Exception { final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 1); queueRequests.put(buildRequestEntry(oversizedPayload)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class)); - assertThat(consumer.getLastError().getMessage(), containsString("1024KB")); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class)); + assertThat(consumer.getLastError().getMessage(), containsString("1024KB")); + }); }); } @@ -421,10 +434,11 @@ void testCanAddPayloadStopsAccumulatingWhenBatchExceedsThreshold() throws Except queueRequests.put(buildRequestEntry(buildPayloadOfBytes(halfThreshold))); queueRequests.put(buildRequestEntry("small-overflow")); - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getTotalPublishedEntries(), is(3)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getTotalPublishedEntries(), is(3)); + }); }); } @@ -438,10 +452,11 @@ void testCanAddPayloadPublishesFirstEntryAloneWhenItFillsThreshold() throws Exce queueRequests.put(buildRequestEntry(buildPayloadOfBytes(fullThreshold))); queueRequests.put(buildRequestEntry("second-entry")); - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getPublishedBatchSizes().get(0), is(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getPublishedBatchSizes().get(0), is(1)); + }); }); } @@ -455,10 +470,11 @@ void testCanAddPayloadAllowsMultipleSmallEntriesUpToThreshold() throws Exception queueRequests.put(buildRequestEntry("entry-" + i)); } - Thread.sleep(LINGER_MS * 4); - - assertThat(consumer.getTotalPublishedEntries(), is(10)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(10)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -473,10 +489,11 @@ void testCanAddPayloadSplitsBatchWhenCumulativeSizeExceedsThreshold() throws Exc queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize))); queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize))); - Thread.sleep(LINGER_MS * 8); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getTotalPublishedEntries(), is(3)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getTotalPublishedEntries(), is(3)); + }); }); } @@ -488,10 +505,11 @@ void testCanAddPayloadDoesNotPublishEmptyBatchWhenAllEntriesExceedThreshold() th final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 100); queueRequests.put(buildRequestEntry(oversizedPayload)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + }); }); } diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java index b95f88b..9cce97f 100644 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java @@ -16,6 +16,7 @@ package com.amazon.sqs.messaging.lib.core; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; @@ -217,7 +219,7 @@ void testSuccessBlockingSubmissionPolicy() { when(amazonSQS.sendMessageBatch(any())).thenAnswer(invocation -> { while (true) { - Thread.sleep(1); + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java similarity index 99% rename from amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java rename to amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java index 4066b39..3ca1e34 100644 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java @@ -69,7 +69,7 @@ // @formatter:off @Testcontainers @SuppressWarnings("resource") -class AmazonSqsTemplateIT { +class AmazonSqsTemplateIntegrationTest { @Container static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java index e86595a..3cc38fd 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java @@ -16,6 +16,7 @@ package com.amazon.sqs.messaging.lib.core; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; @@ -218,7 +220,7 @@ void testSuccessBlockingSubmissionPolicy() { when(amazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenAnswer(invocation -> { while (true) { - Thread.sleep(1); + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java similarity index 99% rename from amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java rename to amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java index cd1557e..98a1aeb 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java @@ -65,7 +65,7 @@ // @formatter:off @Testcontainers @SuppressWarnings("resource") -class AmazonSqsTemplateIT { +class AmazonSqsTemplateIntegrationTest { @Container static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) diff --git a/pom.xml b/pom.xml index b25e137..7ef7667 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,6 @@ 3.3.0 3.2.1 2.22.2 - 3.1.2 3.2.0 3.2.8 0.8.12 @@ -287,11 +286,6 @@ maven-surefire-plugin ${maven-surefire-plugin.version} - - org.apache.maven.plugins - maven-failsafe-plugin - ${maven-failsafe-plugin.version} - org.apache.maven.plugins maven-javadoc-plugin From 6e35a82b8f4a7530bf8fdb267c997730372a76c7 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Fri, 29 May 2026 00:10:06 -0300 Subject: [PATCH 5/5] test(integration): improve tests --- .../messaging/lib/core/AmazonSqsTemplateIntegrationTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java index 98a1aeb..883d7dc 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.is; import java.util.ArrayList; +import java.util.EnumMap; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -90,7 +91,7 @@ static void setup() { standardQueueUrl = sqsClient.createQueue(request -> request.queueName("it-standard-queue")).queueUrl(); - final Map fifoAttributes = new HashMap<>(); + final Map fifoAttributes = new EnumMap<>(QueueAttributeName.class); fifoAttributes.put(QueueAttributeName.FIFO_QUEUE, "true"); fifoAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true");