diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml index bba26e3..d1c2ef3 100644 --- a/.github/workflows/ci-maven.yml +++ b/.github/workflows/ci-maven.yml @@ -54,28 +54,4 @@ jobs: restore-keys: ${{ runner.os }}-test-${{ hashFiles('**/pom.xml') }}-${{ matrix.java-version }} - name: Test with Maven - run: mvn test-compile test --file pom.xml - - integration-test: - needs: test - runs-on: ubuntu-latest - steps: - - name: Checkout repository - uses: actions/checkout@v6 - - - name: Set up JDK - uses: actions/setup-java@v5 - with: - java-version: 17 - distribution: "corretto" - cache: "maven" - - - name: Cache Maven packages - uses: actions/cache@v5 - with: - path: ~/.m2 - key: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }} - - - name: Integration Test with Maven - run: mvn test-compile failsafe:integration-test failsafe:verify --file pom.xml \ No newline at end of file + run: mvn test-compile test --file pom.xml \ No newline at end of file diff --git a/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java b/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java index 2f9ee96..92389f9 100644 --- a/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java +++ b/amazon-sqs-java-messaging-lib-template/src/main/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumer.java @@ -203,7 +203,7 @@ public void run() { */ @SneakyThrows public void shutdown() { - await().thenAccept(result -> { + await().thenRun(() -> { try { LOGGER.warn("Shutdown consumer {}", getClass().getSimpleName()); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java index 4870d68..2adedec 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/AmazonSqsThreadPoolExecutorTest.java @@ -17,6 +17,7 @@ package com.amazon.sqs.messaging.lib.concurrent; import static org.assertj.core.api.Assertions.catchThrowableOfType; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -46,11 +47,7 @@ void testSuccessSucceededTaskCount() throws InterruptedException { for(int i = 0; i < 300; i++) { amazonSqsThreadPoolExecutor.execute(() -> { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); }); } @@ -95,11 +92,7 @@ void testSuccessActiveTaskCount() throws InterruptedException { for(int i = 0; i < 10; i++) { amazonSqsThreadPoolExecutor.execute(() -> { while(true) { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); } @@ -121,11 +114,7 @@ void testSuccessBlockingSubmissionPolicy() throws InterruptedException { amazonSqsThreadPoolExecutor.execute(() -> { while(true) { - try { - Thread.sleep(1); - } catch (final InterruptedException e) { - e.printStackTrace(); - } + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index 044ec01..a07a692 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -97,7 +97,7 @@ void testSuccessWhenIsEmpty() throws InterruptedException { assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - Thread.sleep(2000); + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); @@ -126,7 +126,7 @@ void testSuccessWhenIsFull() throws InterruptedException { ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - Thread.sleep(2000); + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); diff --git a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java index f8a3f55..4403a05 100644 --- a/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java +++ b/amazon-sqs-java-messaging-lib-template/src/test/java/com/amazon/sqs/messaging/lib/core/AbstractAmazonSqsConsumerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; import java.util.LinkedList; import java.util.List; @@ -189,10 +190,11 @@ void testRunWithFifoqueuePublishesSynchronously() throws Exception { final RequestEntry entry = buildRequestEntry("fifo-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1)); + }); }); } @@ -208,9 +210,10 @@ void testRunWithNonFifoqueuePublishesAsynchronously() throws Exception { final RequestEntry entry = buildRequestEntry("non-fifo-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - verify(executorService, atLeastOnce()).execute(any(Runnable.class)); + await() + .untilAsserted(() -> + verify(executorService, atLeastOnce()).execute(any(Runnable.class)) + ); }); } @@ -224,10 +227,11 @@ void testRunHandlesPublishExceptionWithoutCrashing() throws Exception { final RequestEntry entry = buildRequestEntry("error-message"); queueRequests.put(entry); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getLastError(), is(notNullValue())); + await() + .untilAsserted(() -> { + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getLastError(), is(notNullValue())); + }); }); } @@ -240,20 +244,22 @@ void testRunRecordsCorrectExceptionOnPublishFailure() throws Exception { queueRequests.put(buildRequestEntry("fail-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getLastError(), instanceOf(RuntimeException.class)); - assertThat(consumer.getLastError().getMessage(), containsString("publish failed")); + await() + .untilAsserted(() -> { + assertThat(consumer.getLastError(), instanceOf(RuntimeException.class)); + assertThat(consumer.getLastError().getMessage(), containsString("publish failed")); + }); }); } @Test void testRunDoesNotPublishWhenQueueIsEmpty() throws Exception { - Thread.sleep(LINGER_MS * 3); - context(consumer -> { - assertThat(consumer.getPublishCallCount(), is(0)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), is(0)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -266,9 +272,10 @@ void testRunPublishesMultipleEntriesInSingleBatch() throws Exception { queueRequests.put(buildRequestEntry("message-" + i)); } - Thread.sleep(LINGER_MS * 4); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)) + ); }); } @@ -282,9 +289,10 @@ void testRunRespectMaxBatchSizeByPublishingInMultipleBatches() throws Exception queueRequests.put(buildRequestEntry("msg-" + i)); } - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)) + ); }); } @@ -344,9 +352,10 @@ void testPublishDecoratorIsAppliedBeforePublish() throws Exception { context(trackingDecorator, consumer -> { queueRequests.put(buildRequestEntry("decorated-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)) + ); }); } @@ -357,10 +366,11 @@ void testPublishDecoratorIdentityDoesNotAlterRequest() throws Exception { context(consumer -> { queueRequests.put(buildRequestEntry("identity-message")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -371,10 +381,11 @@ void testCanAddPayloadAllowsEntryWellBelowSizeThreshold() throws Exception { context(consumer -> { queueRequests.put(buildRequestEntry("small-payload")); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -386,10 +397,11 @@ void testCanAddPayloadAllowsEntryExactlyAtSizeThreshold() throws Exception { final String payloadAtThreshold = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold()); queueRequests.put(buildRequestEntry(payloadAtThreshold)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -401,12 +413,13 @@ void testCanAddPayloadRejectsEntryExceedingSizeThreshold() throws Exception { final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 1); queueRequests.put(buildRequestEntry(oversizedPayload)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); - assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class)); - assertThat(consumer.getLastError().getMessage(), containsString("1024KB")); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class)); + assertThat(consumer.getLastError().getMessage(), containsString("1024KB")); + }); }); } @@ -421,10 +434,11 @@ void testCanAddPayloadStopsAccumulatingWhenBatchExceedsThreshold() throws Except queueRequests.put(buildRequestEntry(buildPayloadOfBytes(halfThreshold))); queueRequests.put(buildRequestEntry("small-overflow")); - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getTotalPublishedEntries(), is(3)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getTotalPublishedEntries(), is(3)); + }); }); } @@ -438,10 +452,11 @@ void testCanAddPayloadPublishesFirstEntryAloneWhenItFillsThreshold() throws Exce queueRequests.put(buildRequestEntry(buildPayloadOfBytes(fullThreshold))); queueRequests.put(buildRequestEntry("second-entry")); - Thread.sleep(LINGER_MS * 6); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getPublishedBatchSizes().get(0), is(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getPublishedBatchSizes().get(0), is(1)); + }); }); } @@ -455,10 +470,11 @@ void testCanAddPayloadAllowsMultipleSmallEntriesUpToThreshold() throws Exception queueRequests.put(buildRequestEntry("entry-" + i)); } - Thread.sleep(LINGER_MS * 4); - - assertThat(consumer.getTotalPublishedEntries(), is(10)); - assertThat(consumer.getHandleErrorCallCount(), is(0)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(10)); + assertThat(consumer.getHandleErrorCallCount(), is(0)); + }); }); } @@ -473,10 +489,11 @@ void testCanAddPayloadSplitsBatchWhenCumulativeSizeExceedsThreshold() throws Exc queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize))); queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize))); - Thread.sleep(LINGER_MS * 8); - - assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); - assertThat(consumer.getTotalPublishedEntries(), is(3)); + await() + .untilAsserted(() -> { + assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2)); + assertThat(consumer.getTotalPublishedEntries(), is(3)); + }); }); } @@ -488,10 +505,11 @@ void testCanAddPayloadDoesNotPublishEmptyBatchWhenAllEntriesExceedThreshold() th final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 100); queueRequests.put(buildRequestEntry(oversizedPayload)); - Thread.sleep(LINGER_MS * 3); - - assertThat(consumer.getTotalPublishedEntries(), is(1)); - assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + await() + .untilAsserted(() -> { + assertThat(consumer.getTotalPublishedEntries(), is(1)); + assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1)); + }); }); } diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java index b95f88b..9cce97f 100644 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java @@ -16,6 +16,7 @@ package com.amazon.sqs.messaging.lib.core; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; @@ -217,7 +219,7 @@ void testSuccessBlockingSubmissionPolicy() { when(amazonSQS.sendMessageBatch(any())).thenAnswer(invocation -> { while (true) { - Thread.sleep(1); + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java deleted file mode 100644 index 7fa4d0a..0000000 --- a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sqs.messaging.lib.core; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.containers.localstack.LocalStackContainer.Service; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import com.amazon.sqs.messaging.lib.concurrent.RingBufferBlockingQueue; -import com.amazon.sqs.messaging.lib.model.QueueProperty; -import com.amazon.sqs.messaging.lib.model.RequestEntry; -import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.PurgeQueueRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; - -// @formatter:off -@Testcontainers -class AmazonSqsTemplateIT { - - @Container - static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) - .withEnv("LOCALSTACK_HOST", "localhost:4566") - .withEnv("SQS_ENDPOINT_STRATEGY", "dynamic") - .withReuse(true) - .withServices(Service.SQS); - - private static AmazonSQS amazonSQS; - - private static String standardQueueUrl; - - private static String fifoQueueUrl; - - @BeforeAll - static void setup() { - amazonSQS = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(new EndpointConfiguration(localstack.getEndpoint().toString(), localstack.getRegion())) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(localstack.getAccessKey(), localstack.getSecretKey()))) - .build(); - - standardQueueUrl = amazonSQS.createQueue( - new CreateQueueRequest().withQueueName("integration-test-standard-" + UUID.randomUUID()) - ).getQueueUrl(); - - final Map fifoAttributes = new HashMap<>(); - fifoAttributes.put("FifoQueue", "true"); - fifoAttributes.put("ContentBasedDeduplication", "true"); - - fifoQueueUrl = amazonSQS.createQueue( - new CreateQueueRequest() - .withQueueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") - .withAttributes(fifoAttributes) - ).getQueueUrl(); - } - - @AfterAll - static void teardown() { - if (amazonSQS != null) { - try { - amazonSQS.deleteQueue(standardQueueUrl); - } catch (final Exception e) { - // ignore - } - try { - amazonSQS.deleteQueue(fifoQueueUrl); - } catch (final Exception e) { - // ignore - } - } - } - - @BeforeEach - void before() { - purgeQueue(standardQueueUrl); - purgeQueue(fifoQueueUrl); - } - - @Test - void testSendSingleMessage() { - final String messageBody = "hello-sqs-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - } - - @Test - void testSendMultipleMessages() { - final int messageCount = 500; - final List messageBodies = IntStream.range(0, messageCount) - .mapToObj(i -> "msg-" + i + "-" + UUID.randomUUID()) - .collect(Collectors.toList()); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); - - try { - messageBodies.forEach(body -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(body) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - assertThat(receivedBodies.containsAll(messageBodies), is(true)); - } - - @Test - void testSendMessagesExceedingBatchSize() { - final int messageCount = 25; - final int maxBatchSize = 10; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, maxBatchSize, 10); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("batch-test-" + i) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testSendMessagesWithLinger() { - final int messageCount = 20; - final long linger = 200L; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, linger, 10, 5); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("linger-test-" + i) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testSendMessageWithMessageAttributes() { - final String messageBody = "attr-test-" + UUID.randomUUID(); - - final Map expectedAttributes = new HashMap<>(); - expectedAttributes.put("string-attr", new MessageAttributeValue().withDataType("String").withStringValue("hello")); - expectedAttributes.put("number-attr", new MessageAttributeValue().withDataType("Number").withStringValue("42")); - - final Map messageHeaders = new HashMap<>(); - messageHeaders.put("string-attr", "hello"); - messageHeaders.put("number-attr", 42); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withMessageHeaders(messageHeaders) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final ReceiveMessageResult result = amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(standardQueueUrl) - .withMaxNumberOfMessages(1) - .withWaitTimeSeconds(5) - .withMessageAttributeNames("All")); - - assertThat(result.getMessages(), hasSize(1)); - - final Message message = result.getMessages().get(0); - assertThat(message.getBody(), is(messageBody)); - assertThat(message.getMessageAttributes().get("string-attr").getStringValue(), is("hello")); - assertThat(message.getMessageAttributes().get("number-attr").getStringValue(), is("42")); - } - - @Test - void testSendLargeMessage() { - final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(largeBody) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(standardQueueUrl, 1, 10); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(largeBody)); - } - - @Test - void testSendMessageExceedingMaxSize() { - final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - final RequestEntry entry = RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(oversizedBody) - .build(); - - final AtomicReference responseFailEntry = new AtomicReference<>(); - - try { - template.send(entry).addCallback(null, failCallback -> { - responseFailEntry.set(failCallback); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - assertThat(responseFailEntry.get().getCode(), is("000")); - assertThat(responseFailEntry.get().getId(), is(entry.getId())); - assertThat(responseFailEntry.get().getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); - assertThat(responseFailEntry.get().getSenderFault(), is(true)); - } - - @Test - void testShutdownDrainsPendingMessages() { - final int messageCount = 5; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); - - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("drain-test-" + i) - .build()); - }); - - template.shutdown(); - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testTemplateLifecycle() { - final String messageBody = "lifecycle-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); - - final CompletableFuture awaitFuture = template.await(); - assertThat(awaitFuture, notNullValue()); - - awaitFuture.thenRun(template::shutdown).join(); - - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - } - - @Test - void testSendSingleFifoMessage() { - final String messageBody = "fifo-single-" + UUID.randomUUID(); - final String groupId = UUID.randomUUID().toString(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withGroupId(groupId) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(fifoQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - assertThat(messages.get(0).getAttributes().get("MessageGroupId"), is(groupId)); - } - - @Test - void testSendFifoMessagesWithOrdering() { - final int messageCount = 100; - final String groupId = UUID.randomUUID().toString(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("ordered-" + i) - .withGroupId(groupId) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(fifoQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - - for (int i = 0; i < receivedBodies.size(); i++) { - assertThat(receivedBodies.get(i), is("ordered-" + i)); - } - } - - @Test - void testSendFifoMessageWithDeduplication() { - final String deduplicationId = UUID.randomUUID().toString(); - final String groupId = UUID.randomUUID().toString(); - final String messageBody = "dedup-test-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withGroupId(groupId) - .withDeduplicationId(deduplicationId) - .build()); - - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody + "-duplicate") - .withGroupId(groupId) - .withDeduplicationId(deduplicationId) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(fifoQueueUrl, 2, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).getBody(), is(messageBody)); - } - - private AmazonSqsTemplate createTemplate(final String queueUrl, final boolean fifo, - final long linger, final int maxBatchSize, final int maxPoolSize) { - - final QueueProperty queueProperty = QueueProperty.builder() - .fifo(fifo) - .linger(linger) - .maxBatchSize(maxBatchSize) - .maximumPoolSize(maxPoolSize) - .queueUrl(queueUrl) - .build(); - - return new AmazonSqsTemplate<>(amazonSQS, queueProperty, new RingBufferBlockingQueue<>(1024)); - } - - private List receiveMessages(final String queueUrl, final int maxNumberOfMessages, final int waitTimeSeconds) { - return amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(queueUrl) - .withMaxNumberOfMessages(maxNumberOfMessages) - .withWaitTimeSeconds(waitTimeSeconds) - .withMessageAttributeNames("All") - .withAttributeNames("All") - ).getMessages(); - } - - private List drainQueue(final String queueUrl, final int expectedCount) { - final List allBodies = new LinkedList<>(); - - while (allBodies.size() < expectedCount) { - final ReceiveMessageResult result = amazonSQS.receiveMessage( - new ReceiveMessageRequest() - .withQueueUrl(queueUrl) - .withMaxNumberOfMessages(10) - .withWaitTimeSeconds(5) - .withMessageAttributeNames("All") - .withAttributeNames("All")); - - if (result.getMessages().isEmpty()) { - break; - } - - result.getMessages().forEach(msg -> { - allBodies.add(msg.getBody()); - amazonSQS.deleteMessage(queueUrl, msg.getReceiptHandle()); - }); - } - - return allBodies; - } - - private void purgeQueue(final String queueUrl) { - amazonSQS.purgeQueue(new PurgeQueueRequest(queueUrl)); - } - -} -// @formatter:on diff --git a/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java new file mode 100644 index 0000000..3ca1e34 --- /dev/null +++ b/amazon-sqs-java-messaging-lib-v1/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java @@ -0,0 +1,644 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sqs.messaging.lib.core; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.localstack.LocalStackContainer.Service; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import com.amazon.sqs.messaging.lib.concurrent.RingBufferBlockingQueue; +import com.amazon.sqs.messaging.lib.model.QueueProperty; +import com.amazon.sqs.messaging.lib.model.RequestEntry; +import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; +import com.amazon.sqs.messaging.lib.model.ResponseSuccessEntry; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageSystemAttributeName; +import com.amazonaws.services.sqs.model.PurgeQueueRequest; +import com.amazonaws.services.sqs.model.QueueAttributeName; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; + +import lombok.SneakyThrows; + +// @formatter:off +@Testcontainers +@SuppressWarnings("resource") +class AmazonSqsTemplateIntegrationTest { + + @Container + static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) + .withEnv("LOCALSTACK_HOST", "localhost:4566") + .withEnv("SQS_ENDPOINT_STRATEGY", "dynamic") + .withReuse(true) + .withServices(Service.SQS); + + private static AmazonSQS sqsClient; + + private static String standardQueueUrl; + + private static String fifoQueueUrl; + + @BeforeAll + static void setup() { + sqsClient = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(new EndpointConfiguration(localstack.getEndpoint().toString(), localstack.getRegion())) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(localstack.getAccessKey(), localstack.getSecretKey()))) + .build(); + + standardQueueUrl = sqsClient.createQueue( + new CreateQueueRequest().withQueueName("integration-test-standard-" + UUID.randomUUID()) + ).getQueueUrl(); + + final Map fifoAttributes = new HashMap<>(); + fifoAttributes.put(QueueAttributeName.FifoQueue.toString(), "true"); + fifoAttributes.put(QueueAttributeName.ContentBasedDeduplication.toString(), "true"); + + fifoQueueUrl = sqsClient.createQueue(new CreateQueueRequest() + .withQueueName("it-fifo-queue.fifo") + .withAttributes(fifoAttributes)).getQueueUrl(); + } + + @AfterAll + static void teardown() { + if (Objects.nonNull(sqsClient)) { + sqsClient.shutdown(); + } + + if (Objects.nonNull(localstack)) { + localstack.close(); + } + } + + @BeforeEach + void before() { + purgeQueue(standardQueueUrl); + purgeQueue(fifoQueueUrl); + } + + private AmazonSqsTemplate createTemplate( + final String queueUrl, + final boolean fifo, + final long linger, + final int maxBatchSize, + final int maxPoolSize) { + + final QueueProperty queueProperty = QueueProperty.builder() + .fifo(fifo) + .linger(linger) + .maxBatchSize(maxBatchSize) + .maximumPoolSize(maxPoolSize) + .queueUrl(queueUrl) + .build(); + + return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); + } + + private void purgeQueue(final String queueUrl) { + sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl)); + } + + private ReceiveMessageResult receiveMessage(final String queueUrl, final Integer maxNumberOfMessages, final Integer waitTimeSeconds) { + final ReceiveMessageResult result = sqsClient.receiveMessage( + new ReceiveMessageRequest(queueUrl) + .withMaxNumberOfMessages(maxNumberOfMessages) + .withWaitTimeSeconds(waitTimeSeconds) + .withAttributeNames(QueueAttributeName.All) + .withMessageAttributeNames("All")); + + result.getMessages().forEach(message -> sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()))); + + return result; + } + + @SneakyThrows + private void countDownLatch(final Integer count, final Consumer consumer) { + final CountDownLatch countDownLatch = new CountDownLatch(count); + + consumer.accept(countDownLatch); + + countDownLatch.await(1L, TimeUnit.MINUTES); + } + + @Test + void testSendSingleMessage() { + final String messageBody = "hello-sqs-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final String id = UUID.randomUUID().toString(); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) + .withValue(messageBody) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final ReceiveMessageResult result = receiveMessage(standardQueueUrl, 1, 5); + + assertThat(result.getMessages(), hasSize(1)); + + final Message message = result.getMessages().get(0); + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + } + + @Test + void testSendMultipleMessages() { + final int messageCount = 500; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add( + template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("msg-" + i + "-" + UUID.randomUUID()) + .build()) + ); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("msg-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessagesExceedingBatchSize() { + final int messageCount = 25; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("batch-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("batch-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessagesWithLinger() { + final int messageCount = 20; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 10, 5); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("linger-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("linger-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessageWithgetMessageAttributes() { + final String messageBody = "attr-test-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + final Map messageHeaders = new HashMap<>(); + messageHeaders.put("string-attr", "hello"); + messageHeaders.put("number-attr", 42); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody) + .withMessageHeaders(messageHeaders) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getMessageAttributes().get("string-attr").getStringValue(), is("hello")); + assertThat(message.getMessageAttributes().get("number-attr").getStringValue(), is("42")); + }); + } + + @Test + void testSendLargeMessage() { + final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); + + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(largeBody) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), is(largeBody)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessageExceedingMaxSize() { + countDownLatch(1, countDownLatch -> { + final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); + + final RequestEntry entry = RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(oversizedBody) + .build(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(entry); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(null, failureResult -> { + assertThat(failureResult.getCode(), is("000")); + assertThat(failureResult.getId(), is(entry.getId())); + assertThat(failureResult.getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); + assertThat(failureResult.getSenderFault(), is(true)); + countDownLatch.countDown(); + }); + }); + + final List messages = receiveMessage(standardQueueUrl, 10, 5).getMessages(); + + assertThat(messages, hasSize(0)); + } + + @Test + void testShutdownDrainsPendingMessages() { + final int messageCount = 5; + + countDownLatch(messageCount, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); + + final List> futures = new ArrayList<>(); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("drain-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("drain-test-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testTemplateLifecycle() { + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("lifecycle-" + UUID.randomUUID()) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("lifecycle-")); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendSingleFifoMessage() { + final String messageBody = "fifo-single-" + UUID.randomUUID(); + final String id = UUID.randomUUID().toString(); + final String groupId = id; + + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) + .withValue(messageBody) + .withGroupId(groupId) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("fifo-single-")); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendFifoMessagesWithOrdering() { + final int messageCount = 100; + final String groupId = UUID.randomUUID().toString(); + + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("ordered-" + i) + .withGroupId(groupId) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.getBody(), containsString("ordered-")); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendFifoMessageWithDeduplication() { + final String deduplicationId = UUID.randomUUID().toString(); + final String groupId = UUID.randomUUID().toString(); + final String messageBody = "dedup-test-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody) + .withGroupId(groupId) + .withDeduplicationId(deduplicationId) + .build())); + + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody + "-duplicate") + .withGroupId(groupId) + .withDeduplicationId(deduplicationId) + .build())); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).getMessages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.getBody(), is(messageBody)); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageGroupId.toString()), is(groupId)); + assertThat(message.getAttributes().get(MessageSystemAttributeName.MessageDeduplicationId.toString()), is(deduplicationId)); + assertThat(message.getMessageAttributes().keySet(), hasSize(0)); + }); + } + +} +// @formatter:on diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java index e86595a..3cc38fd 100644 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsProducerAsyncTest.java @@ -16,6 +16,7 @@ package com.amazon.sqs.messaging.lib.core; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; @@ -218,7 +220,7 @@ void testSuccessBlockingSubmissionPolicy() { when(amazonSQS.sendMessageBatch(any(SendMessageBatchRequest.class))).thenAnswer(invocation -> { while (true) { - Thread.sleep(1); + await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); } }); diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java deleted file mode 100644 index 49f1296..0000000 --- a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIT.java +++ /dev/null @@ -1,496 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sqs.messaging.lib.core; - -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.localstack.LocalStackContainer; -import org.testcontainers.containers.localstack.LocalStackContainer.Service; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -import com.amazon.sqs.messaging.lib.concurrent.RingBufferBlockingQueue; -import com.amazon.sqs.messaging.lib.model.QueueProperty; -import com.amazon.sqs.messaging.lib.model.RequestEntry; -import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; - -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; -import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; -import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; -import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; - -// @formatter:off -@Testcontainers -class AmazonSqsTemplateIT { - - @Container - static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) - .withEnv("LOCALSTACK_HOST", "localhost:4566") - .withEnv("SQS_ENDPOINT_STRATEGY", "dynamic") - .withReuse(true) - .withServices(Service.SQS); - - private static SqsClient sqsClient; - - private static String standardQueueUrl; - - private static String fifoQueueUrl; - - @BeforeAll - static void setup() { - sqsClient = SqsClient.builder() - .endpointOverride(localstack.getEndpoint()) - .region(Region.of(localstack.getRegion())) - .credentialsProvider(StaticCredentialsProvider.create( - AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) - .build(); - - standardQueueUrl = sqsClient.createQueue( - CreateQueueRequest.builder() - .queueName("integration-test-standard-" + UUID.randomUUID()) - .build() - ).queueUrl(); - - final Map fifoAttributes = new HashMap<>(); - fifoAttributes.put("FifoQueue", "true"); - fifoAttributes.put("ContentBasedDeduplication", "true"); - - fifoQueueUrl = sqsClient.createQueue( - CreateQueueRequest.builder() - .queueName("integration-test-fifo-" + UUID.randomUUID() + ".fifo") - .attributesWithStrings(fifoAttributes) - .build() - ).queueUrl(); - } - - @AfterAll - static void teardown() { - if (sqsClient != null) { - try { - sqsClient.deleteQueue(r -> r.queueUrl(standardQueueUrl)); - } catch (final Exception e) { - // ignore - } - try { - sqsClient.deleteQueue(r -> r.queueUrl(fifoQueueUrl)); - } catch (final Exception e) { - // ignore - } - } - } - - @BeforeEach - void before() { - purgeQueue(standardQueueUrl); - purgeQueue(fifoQueueUrl); - } - - @Test - void testSendSingleMessage() { - final String messageBody = "hello-sqs-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); - } - - @Test - void testSendMultipleMessages() { - final int messageCount = 500; - final List messageBodies = IntStream.range(0, messageCount) - .mapToObj(i -> "msg-" + i + "-" + UUID.randomUUID()) - .collect(Collectors.toList()); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); - - try { - messageBodies.forEach(body -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(body) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - assertThat(receivedBodies.containsAll(messageBodies), is(true)); - } - - @Test - void testSendMessagesExceedingBatchSize() { - final int messageCount = 25; - final int maxBatchSize = 10; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, maxBatchSize, 10); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("batch-test-" + i) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testSendMessagesWithLinger() { - final int messageCount = 20; - final long linger = 200L; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, linger, 10, 5); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("linger-test-" + i) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testSendMessageWithMessageAttributes() { - final String messageBody = "attr-test-" + UUID.randomUUID(); - - final Map messageHeaders = new HashMap<>(); - messageHeaders.put("string-attr", "hello"); - messageHeaders.put("number-attr", 42); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withMessageHeaders(messageHeaders) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - - final Message message = messages.get(0); - assertThat(message.body(), is(messageBody)); - - final Map attrs = message.messageAttributes(); - assertThat(attrs.get("string-attr").stringValue(), is("hello")); - assertThat(attrs.get("number-attr").stringValue(), is("42")); - } - - @Test - void testSendLargeMessage() { - final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(largeBody) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(standardQueueUrl, 1, 10); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(largeBody)); - } - - @Test - void testSendMessageExceedingMaxSize() { - final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - final AtomicReference responseFailEntry = new AtomicReference<>(); - - final RequestEntry entry = RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(oversizedBody) - .build(); - - try { - template.send(entry).addCallback(null, failCallback -> { - responseFailEntry.set(failCallback); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - assertThat(responseFailEntry.get().getCode(), is("000")); - assertThat(responseFailEntry.get().getId(), is(entry.getId())); - assertThat(responseFailEntry.get().getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); - assertThat(responseFailEntry.get().getSenderFault(), is(true)); - } - - @Test - void testShutdownDrainsPendingMessages() { - final int messageCount = 5; - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); - - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("drain-test-" + i) - .build()); - }); - - template.shutdown(); - - final List receivedBodies = drainQueue(standardQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - } - - @Test - void testTemplateLifecycle() { - final String messageBody = "lifecycle-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); - - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .build()); - - final CompletableFuture awaitFuture = template.await(); - assertThat(awaitFuture, notNullValue()); - - awaitFuture.thenRun(template::shutdown).join(); - - final List messages = receiveMessages(standardQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); - } - - @Test - void testSendSingleFifoMessage() { - final String messageBody = "fifo-single-" + UUID.randomUUID(); - final String groupId = UUID.randomUUID().toString(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withGroupId(groupId) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = receiveMessages(fifoQueueUrl, 1, 5); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0).body(), is(messageBody)); - assertThat(messages.get(0).attributesAsStrings().get("MessageGroupId"), is(groupId)); - } - - @Test - void testSendFifoMessagesWithOrdering() { - final int messageCount = 100; - final String groupId = UUID.randomUUID().toString(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); - - try { - IntStream.range(0, messageCount).forEach(i -> { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue("ordered-" + i) - .withGroupId(groupId) - .build()); - }); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List receivedBodies = drainQueue(fifoQueueUrl, messageCount); - assertThat(receivedBodies, hasSize(messageCount)); - - for (int i = 0; i < receivedBodies.size(); i++) { - assertThat(receivedBodies.get(i), is("ordered-" + i)); - } - } - - @Test - void testSendFifoMessageWithDeduplication() { - final String deduplicationId = UUID.randomUUID().toString(); - final String groupId = UUID.randomUUID().toString(); - final String messageBody = "dedup-test-" + UUID.randomUUID(); - - final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); - - try { - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody) - .withGroupId(groupId) - .withDeduplicationId(deduplicationId) - .build()); - - template.send(RequestEntry.builder() - .withId(UUID.randomUUID().toString()) - .withValue(messageBody + "-duplicate") - .withGroupId(groupId) - .withDeduplicationId(deduplicationId) - .build()); - - template.await().join(); - } finally { - template.shutdown(); - } - - final List messages = drainQueue(fifoQueueUrl, 2); - assertThat(messages, hasSize(1)); - assertThat(messages.get(0), is(messageBody)); - } - - private AmazonSqsTemplate createTemplate(final String queueUrl, final boolean fifo, - final long linger, final int maxBatchSize, final int maxPoolSize) { - - final QueueProperty queueProperty = QueueProperty.builder() - .fifo(fifo) - .linger(linger) - .maxBatchSize(maxBatchSize) - .maximumPoolSize(maxPoolSize) - .queueUrl(queueUrl) - .build(); - - return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); - } - - private List receiveMessages(final String queueUrl, final int maxNumberOfMessages, final int waitTimeSeconds) { - return sqsClient.receiveMessage( - ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(maxNumberOfMessages) - .waitTimeSeconds(waitTimeSeconds) - .messageAttributeNames("All") - .attributeNamesWithStrings("All") - .build() - ).messages(); - } - - private List drainQueue(final String queueUrl, final int expectedCount) { - final List allBodies = new LinkedList<>(); - - while (allBodies.size() < expectedCount) { - final List messages = sqsClient.receiveMessage( - ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(10) - .waitTimeSeconds(5) - .messageAttributeNames("All") - .attributeNamesWithStrings("All") - .build() - ).messages(); - - if (messages.isEmpty()) { - break; - } - - messages.forEach(msg -> { - allBodies.add(msg.body()); - sqsClient.deleteMessage(DeleteMessageRequest.builder() - .queueUrl(queueUrl) - .receiptHandle(msg.receiptHandle()) - .build()); - }); - } - - return allBodies; - } - - private void purgeQueue(final String queueUrl) { - sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); - } -} -// @formatter:on diff --git a/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java new file mode 100644 index 0000000..883d7dc --- /dev/null +++ b/amazon-sqs-java-messaging-lib-v2/src/test/java/com/amazon/sqs/messaging/lib/core/AmazonSqsTemplateIntegrationTest.java @@ -0,0 +1,640 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sqs.messaging.lib.core; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.containers.localstack.LocalStackContainer.Service; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import com.amazon.sqs.messaging.lib.concurrent.RingBufferBlockingQueue; +import com.amazon.sqs.messaging.lib.model.QueueProperty; +import com.amazon.sqs.messaging.lib.model.RequestEntry; +import com.amazon.sqs.messaging.lib.model.ResponseFailEntry; +import com.amazon.sqs.messaging.lib.model.ResponseSuccessEntry; + +import lombok.SneakyThrows; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; +import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest; +import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +// @formatter:off +@Testcontainers +@SuppressWarnings("resource") +class AmazonSqsTemplateIntegrationTest { + + @Container + static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.4.0")) + .withEnv("LOCALSTACK_HOST", "localhost:4566") + .withEnv("SQS_ENDPOINT_STRATEGY", "dynamic") + .withReuse(true) + .withServices(Service.SQS); + + private static SqsClient sqsClient; + + private static String standardQueueUrl; + + private static String fifoQueueUrl; + + @BeforeAll + static void setup() { + sqsClient = SqsClient.builder() + .endpointOverride(localstack.getEndpoint()) + .region(Region.of(localstack.getRegion())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey()))) + .build(); + + standardQueueUrl = sqsClient.createQueue(request -> request.queueName("it-standard-queue")).queueUrl(); + + final Map fifoAttributes = new EnumMap<>(QueueAttributeName.class); + fifoAttributes.put(QueueAttributeName.FIFO_QUEUE, "true"); + fifoAttributes.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"); + + fifoQueueUrl = sqsClient.createQueue(request -> request + .queueName("it-fifo-queue.fifo") + .attributes(fifoAttributes)).queueUrl(); + } + + @AfterAll + static void teardown() { + if (Objects.nonNull(sqsClient)) { + sqsClient.close(); + } + + if (Objects.nonNull(localstack)) { + localstack.close(); + } + } + + @BeforeEach + void before() { + purgeQueue(standardQueueUrl); + purgeQueue(fifoQueueUrl); + } + + private AmazonSqsTemplate createTemplate( + final String queueUrl, + final boolean fifo, + final long linger, + final int maxBatchSize, + final int maxPoolSize) { + + final QueueProperty queueProperty = QueueProperty.builder() + .fifo(fifo) + .linger(linger) + .maxBatchSize(maxBatchSize) + .maximumPoolSize(maxPoolSize) + .queueUrl(queueUrl) + .build(); + + return new AmazonSqsTemplate<>(sqsClient, queueProperty, new RingBufferBlockingQueue<>(1024)); + } + + private void purgeQueue(final String queueUrl) { + sqsClient.purgeQueue(PurgeQueueRequest.builder().queueUrl(queueUrl).build()); + } + + private ReceiveMessageResponse receiveMessage(final String queueUrl, final Integer maxNumberOfMessages, final Integer waitTimeSeconds) { + final ReceiveMessageResponse result = sqsClient.receiveMessage(request -> request + .queueUrl(queueUrl) + .maxNumberOfMessages(maxNumberOfMessages) + .waitTimeSeconds(waitTimeSeconds) + .attributeNames(QueueAttributeName.ALL) + .messageAttributeNames("All")); + + result.messages().forEach(message -> sqsClient.deleteMessage(request -> request.receiptHandle(message.receiptHandle()).queueUrl(queueUrl))); + + return result; + } + + @SneakyThrows + private void countDownLatch(final Integer count, final Consumer consumer) { + final CountDownLatch countDownLatch = new CountDownLatch(count); + + consumer.accept(countDownLatch); + + countDownLatch.await(1L, TimeUnit.MINUTES); + } + + @Test + void testSendSingleMessage() { + final String messageBody = "hello-sqs-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final String id = UUID.randomUUID().toString(); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) + .withValue(messageBody) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final ReceiveMessageResponse result = receiveMessage(standardQueueUrl, 1, 5); + + assertThat(result.messages(), hasSize(1)); + + final Message message = result.messages().get(0); + assertThat(message.body(), is(messageBody)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + } + + @Test + void testSendMultipleMessages() { + final int messageCount = 500; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add( + template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("msg-" + i + "-" + UUID.randomUUID()) + .build()) + ); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("msg-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessagesExceedingBatchSize() { + final int messageCount = 25; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 50L, 10, 10); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("batch-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("batch-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessagesWithLinger() { + final int messageCount = 20; + + countDownLatch(messageCount, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 10, 5); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("linger-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("linger-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessageWithMessageAttributes() { + final String messageBody = "attr-test-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + final Map messageHeaders = new HashMap<>(); + messageHeaders.put("string-attr", "hello"); + messageHeaders.put("number-attr", 42); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody) + .withMessageHeaders(messageHeaders) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.body(), is(messageBody)); + assertThat(message.messageAttributes().get("string-attr").stringValue(), is("hello")); + assertThat(message.messageAttributes().get("number-attr").stringValue(), is("42")); + }); + } + + @Test + void testSendLargeMessage() { + final String largeBody = RandomStringUtils.secure().nextAlphabetic(262_144); + + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 200L, 5, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(largeBody) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.body(), is(largeBody)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendMessageExceedingMaxSize() { + countDownLatch(1, countDownLatch -> { + final String oversizedBody = RandomStringUtils.secure().nextAlphabetic((1024 * 1024) + 1); + + final RequestEntry entry = RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(oversizedBody) + .build(); + + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(entry); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(null, failureResult -> { + assertThat(failureResult.getCode(), is("000")); + assertThat(failureResult.getId(), is(entry.getId())); + assertThat(failureResult.getMessage(), containsString("The maximum allowed message size exceeding 1024KB (1,048,576 bytes).")); + assertThat(failureResult.getSenderFault(), is(true)); + countDownLatch.countDown(); + }); + }); + + final List messages = receiveMessage(standardQueueUrl, 10, 5).messages(); + + assertThat(messages, hasSize(0)); + } + + @Test + void testShutdownDrainsPendingMessages() { + final int messageCount = 5; + + countDownLatch(messageCount, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 10_000L, 10, 5); + + final List> futures = new ArrayList<>(); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("drain-test-" + i) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("drain-test-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testTemplateLifecycle() { + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(standardQueueUrl, false, 100L, 10, 5); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("lifecycle-" + UUID.randomUUID()) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(standardQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("lifecycle-")); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendSingleFifoMessage() { + final String messageBody = "fifo-single-" + UUID.randomUUID(); + final String id = UUID.randomUUID().toString(); + final String groupId = id; + + countDownLatch(1, countDownLatch -> { + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + + final ListenableFuture future = template.send(RequestEntry.builder() + .withId(id) + .withValue(messageBody) + .withGroupId(groupId) + .build()); + + template.await().thenRun(template::shutdown).join(); + + future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), is(id)); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + }); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("fifo-single-")); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendFifoMessagesWithOrdering() { + final int messageCount = 100; + final String groupId = UUID.randomUUID().toString(); + + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 50L, 10, 1); + + IntStream.range(0, messageCount).forEach(i -> { + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue("ordered-" + i) + .withGroupId(groupId) + .build())); + }); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < messageCount) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(messageCount)); + + messages.forEach(message -> { + assertThat(message.body(), containsString("ordered-")); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + + @Test + void testSendFifoMessageWithDeduplication() { + final String deduplicationId = UUID.randomUUID().toString(); + final String groupId = UUID.randomUUID().toString(); + final String messageBody = "dedup-test-" + UUID.randomUUID(); + + countDownLatch(1, countDownLatch -> { + final List> futures = new ArrayList<>(); + + final AmazonSqsTemplate template = createTemplate(fifoQueueUrl, true, 100L, 10, 1); + + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody) + .withGroupId(groupId) + .withDeduplicationId(deduplicationId) + .build())); + + futures.add(template.send(RequestEntry.builder() + .withId(UUID.randomUUID().toString()) + .withValue(messageBody + "-duplicate") + .withGroupId(groupId) + .withDeduplicationId(deduplicationId) + .build())); + + template.await().thenRun(template::shutdown).join(); + + futures.forEach(future -> future.addCallback(result -> { + assertThat(result, notNullValue()); + assertThat(result.getId(), notNullValue()); + assertThat(result.getMessageId(), notNullValue()); + assertThat(result.getSequenceNumber(), notNullValue()); + countDownLatch.countDown(); + })); + }); + + final List messages = new LinkedList<>(); + + while (messages.size() < 1) { + messages.addAll(receiveMessage(fifoQueueUrl, 10, 5).messages()); + } + + assertThat(messages, hasSize(1)); + + messages.forEach(message -> { + assertThat(message.body(), is(messageBody)); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID), is(groupId)); + assertThat(message.attributes().get(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID), is(deduplicationId)); + assertThat(message.messageAttributes().keySet(), hasSize(0)); + }); + } + +} +// @formatter:on diff --git a/pom.xml b/pom.xml index 5f4989d..7ef7667 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,6 @@ 4.11.0 5.10.2 2.16.1 - 1.37 4.2.2 3.24.2 3.20.0 @@ -46,7 +45,6 @@ 3.3.0 3.2.1 2.22.2 - 3.1.2 3.2.0 3.2.8 0.8.12 @@ -283,16 +281,11 @@ maven-source-plugin ${maven-source-plugin.version} - - org.apache.maven.plugins - maven-surefire-plugin - ${maven-surefire-plugin.version} - - - org.apache.maven.plugins - maven-failsafe-plugin - ${maven-failsafe-plugin.version} - + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + org.apache.maven.plugins maven-javadoc-plugin @@ -637,25 +630,5 @@ - - - integration-test - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - integration-test - verify - - - - - - - \ No newline at end of file