Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions .github/workflows/ci-maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,4 @@ jobs:
restore-keys: ${{ runner.os }}-test-${{ hashFiles('**/pom.xml') }}-${{ matrix.java-version }}

- name: Test with Maven
run: mvn test-compile test --file pom.xml

integration-test:
needs: test
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v6

- name: Set up JDK
uses: actions/setup-java@v5
with:
java-version: 17
distribution: "corretto"
cache: "maven"

- name: Cache Maven packages
uses: actions/cache@v5
with:
path: ~/.m2
key: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-integration-test-${{ hashFiles('**/pom.xml') }}

- name: Integration Test with Maven
run: mvn test-compile failsafe:integration-test failsafe:verify --file pom.xml
run: mvn test-compile test --file pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void run() {
*/
@SneakyThrows
public void shutdown() {
await().thenAccept(result -> {
await().thenRun(() -> {
try {
LOGGER.warn("Shutdown consumer {}", getClass().getSimpleName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.amazon.sqs.messaging.lib.concurrent;

import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

Expand Down Expand Up @@ -46,11 +47,7 @@ void testSuccessSucceededTaskCount() throws InterruptedException {

for(int i = 0; i < 300; i++) {
amazonSqsThreadPoolExecutor.execute(() -> {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
e.printStackTrace();
}
await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true);
});
}

Expand Down Expand Up @@ -95,11 +92,7 @@ void testSuccessActiveTaskCount() throws InterruptedException {
for(int i = 0; i < 10; i++) {
amazonSqsThreadPoolExecutor.execute(() -> {
while(true) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
e.printStackTrace();
}
await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true);
}
});
}
Expand All @@ -121,11 +114,7 @@ void testSuccessBlockingSubmissionPolicy() throws InterruptedException {

amazonSqsThreadPoolExecutor.execute(() -> {
while(true) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
e.printStackTrace();
}
await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void testSuccessWhenIsEmpty() throws InterruptedException {
assertThat(ringBlockingQueue.take().getValue(), is(1));
});

Thread.sleep(2000);
await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true);

producer.submit(() -> {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(0).build());
Expand Down Expand Up @@ -126,7 +126,7 @@ void testSuccessWhenIsFull() throws InterruptedException {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(1).build());
});

Thread.sleep(2000);
await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true);

consumer.submit(() -> {
assertThat(ringBlockingQueue.take().getValue(), is(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;

import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -189,10 +190,11 @@ void testRunWithFifoqueuePublishesSynchronously() throws Exception {
final RequestEntry<String> entry = buildRequestEntry("fifo-message");
queueRequests.put(entry);

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getHandleResponseCallCount(), greaterThanOrEqualTo(1));
});
});
}

Expand All @@ -208,9 +210,10 @@ void testRunWithNonFifoqueuePublishesAsynchronously() throws Exception {
final RequestEntry<String> entry = buildRequestEntry("non-fifo-message");
queueRequests.put(entry);

Thread.sleep(LINGER_MS * 3);

verify(executorService, atLeastOnce()).execute(any(Runnable.class));
await()
.untilAsserted(() ->
verify(executorService, atLeastOnce()).execute(any(Runnable.class))
);
});
}

Expand All @@ -224,10 +227,11 @@ void testRunHandlesPublishExceptionWithoutCrashing() throws Exception {
final RequestEntry<String> entry = buildRequestEntry("error-message");
queueRequests.put(entry);

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getLastError(), is(notNullValue()));
await()
.untilAsserted(() -> {
assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getLastError(), is(notNullValue()));
});
});
}

Expand All @@ -240,20 +244,22 @@ void testRunRecordsCorrectExceptionOnPublishFailure() throws Exception {

queueRequests.put(buildRequestEntry("fail-message"));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getLastError(), instanceOf(RuntimeException.class));
assertThat(consumer.getLastError().getMessage(), containsString("publish failed"));
await()
.untilAsserted(() -> {
assertThat(consumer.getLastError(), instanceOf(RuntimeException.class));
assertThat(consumer.getLastError().getMessage(), containsString("publish failed"));
});
});
}

@Test
void testRunDoesNotPublishWhenQueueIsEmpty() throws Exception {
Thread.sleep(LINGER_MS * 3);

context(consumer -> {
assertThat(consumer.getPublishCallCount(), is(0));
assertThat(consumer.getHandleErrorCallCount(), is(0));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), is(0));
assertThat(consumer.getHandleErrorCallCount(), is(0));
});
});
}

Expand All @@ -266,9 +272,10 @@ void testRunPublishesMultipleEntriesInSingleBatch() throws Exception {
queueRequests.put(buildRequestEntry("message-" + i));
}

Thread.sleep(LINGER_MS * 4);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
await()
.untilAsserted(() ->
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1))
);
});
}

Expand All @@ -282,9 +289,10 @@ void testRunRespectMaxBatchSizeByPublishingInMultipleBatches() throws Exception
queueRequests.put(buildRequestEntry("msg-" + i));
}

Thread.sleep(LINGER_MS * 6);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
await()
.untilAsserted(() ->
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2))
);
});
}

Expand Down Expand Up @@ -344,9 +352,10 @@ void testPublishDecoratorIsAppliedBeforePublish() throws Exception {
context(trackingDecorator, consumer -> {
queueRequests.put(buildRequestEntry("decorated-message"));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
await()
.untilAsserted(() ->
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1))
);
});
}

Expand All @@ -357,10 +366,11 @@ void testPublishDecoratorIdentityDoesNotAlterRequest() throws Exception {
context(consumer -> {
queueRequests.put(buildRequestEntry("identity-message"));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
});
});
}

Expand All @@ -371,10 +381,11 @@ void testCanAddPayloadAllowsEntryWellBelowSizeThreshold() throws Exception {
context(consumer -> {
queueRequests.put(buildRequestEntry("small-payload"));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
await()
.untilAsserted(() -> {
assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
});
});
}

Expand All @@ -386,10 +397,11 @@ void testCanAddPayloadAllowsEntryExactlyAtSizeThreshold() throws Exception {
final String payloadAtThreshold = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold());
queueRequests.put(buildRequestEntry(payloadAtThreshold));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
await()
.untilAsserted(() -> {
assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), is(0));
});
});
}

Expand All @@ -401,12 +413,13 @@ void testCanAddPayloadRejectsEntryExceedingSizeThreshold() throws Exception {
final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 1);
queueRequests.put(buildRequestEntry(oversizedPayload));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class));
assertThat(consumer.getLastError().getMessage(), containsString("1024KB"));
await()
.untilAsserted(() -> {
assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
assertThat(consumer.getLastError(), instanceOf(MaximumAllowedMessageException.class));
assertThat(consumer.getLastError().getMessage(), containsString("1024KB"));
});
});
}

Expand All @@ -421,10 +434,11 @@ void testCanAddPayloadStopsAccumulatingWhenBatchExceedsThreshold() throws Except
queueRequests.put(buildRequestEntry(buildPayloadOfBytes(halfThreshold)));
queueRequests.put(buildRequestEntry("small-overflow"));

Thread.sleep(LINGER_MS * 6);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getTotalPublishedEntries(), is(3));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getTotalPublishedEntries(), is(3));
});
});
}

Expand All @@ -438,10 +452,11 @@ void testCanAddPayloadPublishesFirstEntryAloneWhenItFillsThreshold() throws Exce
queueRequests.put(buildRequestEntry(buildPayloadOfBytes(fullThreshold)));
queueRequests.put(buildRequestEntry("second-entry"));

Thread.sleep(LINGER_MS * 6);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getPublishedBatchSizes().get(0), is(1));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getPublishedBatchSizes().get(0), is(1));
});
});
}

Expand All @@ -455,10 +470,11 @@ void testCanAddPayloadAllowsMultipleSmallEntriesUpToThreshold() throws Exception
queueRequests.put(buildRequestEntry("entry-" + i));
}

Thread.sleep(LINGER_MS * 4);

assertThat(consumer.getTotalPublishedEntries(), is(10));
assertThat(consumer.getHandleErrorCallCount(), is(0));
await()
.untilAsserted(() -> {
assertThat(consumer.getTotalPublishedEntries(), is(10));
assertThat(consumer.getHandleErrorCallCount(), is(0));
});
});
}

Expand All @@ -473,10 +489,11 @@ void testCanAddPayloadSplitsBatchWhenCumulativeSizeExceedsThreshold() throws Exc
queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize)));
queueRequests.put(buildRequestEntry(buildPayloadOfBytes(chunkSize)));

Thread.sleep(LINGER_MS * 8);

assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getTotalPublishedEntries(), is(3));
await()
.untilAsserted(() -> {
assertThat(consumer.getPublishCallCount(), greaterThanOrEqualTo(2));
assertThat(consumer.getTotalPublishedEntries(), is(3));
});
});
}

Expand All @@ -488,10 +505,11 @@ void testCanAddPayloadDoesNotPublishEmptyBatchWhenAllEntriesExceedThreshold() th
final String oversizedPayload = buildPayloadOfBytes(TestableAmazonSqsConsumer.batchSizeBytesThreshold() + 100);
queueRequests.put(buildRequestEntry(oversizedPayload));

Thread.sleep(LINGER_MS * 3);

assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
await()
.untilAsserted(() -> {
assertThat(consumer.getTotalPublishedEntries(), is(1));
assertThat(consumer.getHandleErrorCallCount(), greaterThanOrEqualTo(1));
});
});
}

Expand Down
Loading