diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 8186a5b304969..87e6f7ea79b45 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -88,6 +88,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.apache.kafka.streams.utils.TestUtils.waitForApplicationState; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -162,11 +163,27 @@ public static void closeCluster() { private String changelog3; private static final List> STANDARD_INPUT_DATA = - asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); - private static final List> COUNT_OUTPUT_DATA = - asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)); - private static final List> SUM_OUTPUT_DATA = - asList(pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C", 400L), pair("C", 350L)); + asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L), // For Partition0 + pair("D", 100L) // For Partition 1 + ); + + private static final List> COUNT_OUTPUT_DATA_PARTITION0_PRIORITY = + asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L), + pair("D", 1L) + ); + private static final List> COUNT_OUTPUT_DATA_PARTITION1_PRIORITY = + asList(pair("D", 1L), + pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L) + ); + + private static final List> SUM_OUTPUT_DATA_PARTITION0_PRIORITY = + asList(pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C", 400L), pair("C", 350L), + pair("D", 100L) + ); + private static final List> SUM_OUTPUT_DATA_PARTITION1_PRIORITY = + asList(pair("D", 100L), + pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C", 400L), pair("C", 350L) + ); private static final String TOPIC_PREFIX = "unique_topic_prefix"; private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); @@ -311,8 +328,11 @@ public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exc .toStream().to(OUTPUT_STREAM_1); streams.addNamedTopology(topology1Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - final List> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5); - assertThat(results, equalTo(COUNT_OUTPUT_DATA)); + final List> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6); + assertThat(results, anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); final Set allTopics = CLUSTER.getAllTopicsInCluster(); assertThat(allTopics.contains(TOPIC_PREFIX + "-" + "topology-1" + "-store-changelog"), is(true)); @@ -329,9 +349,18 @@ public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersiste streams.addNamedTopology(topology3Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); assertThat(CLUSTER.getAllTopicsInCluster().containsAll(asList(changelog1, changelog2, changelog3)), is(true)); } @@ -359,8 +388,14 @@ public void shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorr streams.addNamedTopology(topology2Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); final ReadOnlyKeyValueStore store = streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType( @@ -431,7 +466,10 @@ public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() streams.start(); streams.addNamedTopology(topology1Builder.build()).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -445,8 +483,14 @@ public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopo waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -463,9 +507,18 @@ public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTo waitForApplicationState(Collections.singletonList(streams), State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -481,14 +534,20 @@ public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws streams2.addNamedTopology(topology1Builder2.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); final AddNamedTopologyResult result = streams.addNamedTopology(topology2Builder.build()); final AddNamedTopologyResult result2 = streams2.addNamedTopology(topology2Builder2.build()); result.all().get(); result2.all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -501,7 +560,10 @@ public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMul streams2.addNamedTopology(topology1Builder2.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); final RemoveNamedTopologyResult result = streams.removeNamedTopology(TOPOLOGY_1, true); streams2.removeNamedTopology(TOPOLOGY_1, true).all().get(); @@ -537,7 +599,10 @@ public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMul assertThat(streams.getAllTopologies(), equalTo(singleton(topology2Client1))); assertThat(streams2.getAllTopologies(), equalTo(singleton(topology2Client2))); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -553,8 +618,14 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw streams.addNamedTopology(topology1Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); streams.removeNamedTopology(TOPOLOGY_1).all().get(); streams.cleanUpNamedTopology(TOPOLOGY_1); @@ -567,8 +638,14 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA); streams.addNamedTopology(topology1Builder2.build()).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } finally { CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1); @@ -585,9 +662,18 @@ public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws E streams.addNamedTopology(topology3Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -600,9 +686,18 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop streams.addNamedTopology(topology3Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } @Test @@ -617,8 +712,14 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop final NamedTopology namedTopology = topology1Builder.build(); streams.addNamedTopology(namedTopology).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); streams.removeNamedTopology("topology-1", true).all().get(); streams.cleanUpNamedTopology("topology-1"); @@ -637,8 +738,14 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop final NamedTopology namedTopologyDup = topology1BuilderDup.build(); streams.addNamedTopology(namedTopologyDup).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); } finally { CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); } @@ -655,8 +762,14 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop final NamedTopology namedTopology = topology1Builder.build(); streams.addNamedTopology(namedTopology).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); streams.removeNamedTopology(TOPOLOGY_1, true).all().get(); streams.cleanUpNamedTopology(TOPOLOGY_1); @@ -675,8 +788,14 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop final NamedTopology namedTopologyDup = topology1BuilderDup.build(); streams.addNamedTopology(namedTopologyDup).all().get(); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 6), anyOf( + equalTo(COUNT_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(COUNT_OUTPUT_DATA_PARTITION1_PRIORITY) + )); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 6), anyOf( + equalTo(SUM_OUTPUT_DATA_PARTITION0_PRIORITY), + equalTo(SUM_OUTPUT_DATA_PARTITION1_PRIORITY) + )); CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); }