diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java index b86c2538..559056d3 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-firehose-e2e-tests/src/test/java/org/apache/flink/connector/firehose/table/test/KinesisFirehoseTableITTest.java @@ -25,7 +25,6 @@ import org.apache.flink.test.resources.ResourceTestUtils; import org.apache.flink.test.util.SQLJobSubmission; import org.apache.flink.util.DockerImageVersions; -import org.apache.flink.util.TestLogger; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -35,16 +34,17 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.core.SdkSystemSetting; import software.amazon.awssdk.http.SdkHttpClient; @@ -74,7 +74,9 @@ import static org.apache.flink.connector.firehose.sink.testutils.KinesisFirehoseTestUtils.createFirehoseClient; /** End to End test for Kinesis Firehose Table sink API. */ -public class KinesisFirehoseTableITTest extends TestLogger { +@Testcontainers +@Timeout(value = 10, unit = TimeUnit.MINUTES) +public class KinesisFirehoseTableITTest { private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseTableITTest.class); @@ -95,9 +97,7 @@ public class KinesisFirehoseTableITTest extends TestLogger { private static final int NUM_ELEMENTS = 5; private static final Network network = Network.newNetwork(); - @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); - - @ClassRule + @Container public static LocalstackContainer mockFirehoseContainer = new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) .withNetwork(network) @@ -117,7 +117,7 @@ public class KinesisFirehoseTableITTest extends TestLogger { public static final FlinkContainers FLINK = FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); - @Before + @BeforeEach public void setup() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); @@ -139,17 +139,17 @@ public void setup() throws Exception { LOG.info("Done setting up the localstack."); } - @BeforeClass + @BeforeAll public static void setupFlink() throws Exception { FLINK.start(); } - @AfterClass + @AfterAll public static void stopFlink() { FLINK.stop(); } - @After + @AfterEach public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java index 65e74866..ac4b5102 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-kinesis-streams-e2e-tests/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java @@ -36,14 +36,13 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.collect.ImmutableList; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.rules.Timeout; import org.rnorth.ducttape.ratelimits.RateLimiter; import org.rnorth.ducttape.ratelimits.RateLimiterBuilder; import org.slf4j.Logger; @@ -79,6 +78,7 @@ /** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */ @Testcontainers @ExtendWith(MiniClusterExtension.class) +@Timeout(value = 10, unit = TimeUnit.MINUTES) public class KinesisStreamsTableApiIT { private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class); @@ -96,8 +96,6 @@ public class KinesisStreamsTableApiIT { ResourceTestUtils.getResource(".*kinesis-streams.jar"); private static final Network network = Network.newNetwork(); - @ClassRule public static final Timeout TIMEOUT = new Timeout(10, TimeUnit.MINUTES); - @Container public static final LocalstackContainer LOCALSTACK_CONTAINER = new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION)) @@ -123,17 +121,17 @@ public class KinesisStreamsTableApiIT { public static final FlinkContainers FLINK = FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); - @BeforeClass + @BeforeAll public static void setupFlink() throws Exception { FLINK.start(); } - @AfterClass + @AfterAll public static void stopFlink() { FLINK.stop(); } - @Before + @BeforeEach public void setUp() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); httpClient = AWSServicesTestUtils.createHttpClient(); @@ -144,7 +142,7 @@ public void setUp() throws Exception { prepareStream(LARGE_ORDERS_STREAM); } - @After + @AfterEach public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); AWSGeneralUtil.closeResources(httpClient, kinesisClient); diff --git a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java index 97a4e169..3258052d 100644 --- a/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java +++ b/flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java @@ -28,18 +28,18 @@ import org.apache.flink.connector.testframe.container.TestcontainersSettings; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.DockerImageVersions; -import org.apache.flink.util.TestLogger; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.core.SdkSystemSetting; import software.amazon.awssdk.http.SdkHttpClient; @@ -55,7 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; /** End to End test for SQS sink API. */ -public class SqsSinkITTest extends TestLogger { +@Testcontainers +public class SqsSinkITTest { private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class); @@ -65,7 +66,7 @@ public class SqsSinkITTest extends TestLogger { private SqsClient sqsClient; private static final Network network = Network.newNetwork(); - @ClassRule + @Container public static LocalstackContainer mockSqsContainer = new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK)) .withNetwork(network) @@ -85,7 +86,7 @@ public class SqsSinkITTest extends TestLogger { public static final FlinkContainers FLINK = FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build(); - @Before + @BeforeEach public void setup() throws Exception { httpClient = AWSServicesTestUtils.createHttpClient(); sqsClient = createSqsClient(mockSqsContainer.getEndpoint(), httpClient); @@ -94,17 +95,17 @@ public void setup() throws Exception { LOG.info("Done setting up the localstack."); } - @BeforeClass + @BeforeAll public static void setupFlink() throws Exception { FLINK.start(); } - @AfterClass + @AfterAll public static void stopFlink() { FLINK.stop(); } - @After + @AfterEach public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); httpClient.close(); diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java index 631fb3e7..2a743d82 100644 --- a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java +++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java @@ -33,20 +33,20 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.StringUtils; -import org.apache.flink.util.TestLogger; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -73,7 +73,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for Glue Schema Registry AVRO format using Localstack. */ -public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger { +@Testcontainers +public class GlueSchemaRegistryAvroKinesisITCase { private static final Logger LOGGER = LoggerFactory.getLogger(GlueSchemaRegistryAvroKinesisITCase.class); @@ -95,21 +96,21 @@ public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger { private KinesisClient kinesisClient; private GSRKinesisPubsubClient gsrKinesisPubsubClient; - @ClassRule + @Container public static LocalstackContainer mockKinesisContainer = new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION)) .withNetworkAliases("localstack"); - @Before + @BeforeEach public void setup() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); - Assume.assumeTrue( - "Access key not configured, skipping test...", - !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY)); - Assume.assumeTrue( - "Secret key not configured, skipping test...", - !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY)); + Assumptions.assumeTrue( + !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY), + "Access key not configured, skipping test..."); + Assumptions.assumeTrue( + !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY), + "Secret key not configured, skipping test..."); StaticCredentialsProvider gsrCredentialsProvider = StaticCredentialsProvider.create( @@ -128,7 +129,7 @@ public void setup() throws Exception { LOGGER.info("Done setting up the localstack."); } - @After + @AfterEach public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); AWSGeneralUtil.closeResources(httpClient, kinesisClient); @@ -142,7 +143,7 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception { for (GenericRecord msg : messages) { gsrKinesisPubsubClient.sendMessage(getSchema().toString(), INPUT_STREAM, msg); } - log.info("generated records"); + LOGGER.info("generated records"); DataStream input = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "source") @@ -155,11 +156,11 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception { List results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM, OUTPUT_STREAM_ARN); while (deadline.hasTimeLeft() && results.size() < messages.size()) { - log.info("waiting for results.."); + LOGGER.info("waiting for results.."); Thread.sleep(1000); results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM, OUTPUT_STREAM_ARN); } - log.info("results: {}", results); + LOGGER.info("results: {}", results); assertThat(results).containsExactlyInAnyOrderElementsOf(messages); } diff --git a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java index 6a9d7525..cc3be2a0 100644 --- a/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java +++ b/flink-connector-aws-e2e-tests/flink-formats-json-glue-schema-registry-e2e-tests/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java @@ -33,17 +33,17 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.StringUtils; -import org.apache.flink.util.TestLogger; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -69,7 +69,8 @@ import static org.assertj.core.api.Assertions.assertThat; /** End-to-end test for Glue Schema Registry Json format using Localstack. */ -public class GlueSchemaRegistryJsonKinesisITCase extends TestLogger { +@Testcontainers +public class GlueSchemaRegistryJsonKinesisITCase { private static final Logger LOGGER = LoggerFactory.getLogger(GlueSchemaRegistryJsonKinesisITCase.class); @@ -92,21 +93,21 @@ public class GlueSchemaRegistryJsonKinesisITCase extends TestLogger { private KinesisClient kinesisClient; private GSRKinesisPubsubClient gsrKinesisPubsubClient; - @ClassRule + @Container public static LocalstackContainer mockKinesisContainer = new LocalstackContainer(DockerImageName.parse(LOCALSTACK_DOCKER_IMAGE_VERSION)) .withNetworkAliases("localstack"); - @Before + @BeforeEach public void setup() throws Exception { System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false"); - Assume.assumeTrue( - "Access key not configured, skipping test...", - !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY)); - Assume.assumeTrue( - "Secret key not configured, skipping test...", - !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY)); + Assumptions.assumeTrue( + !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY), + "Access key not configured, skipping test..."); + Assumptions.assumeTrue( + !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY), + "Secret key not configured, skipping test..."); StaticCredentialsProvider gsrCredentialsProvider = StaticCredentialsProvider.create( @@ -126,7 +127,7 @@ public void setup() throws Exception { LOGGER.info("Done setting up the localstack."); } - @After + @AfterEach public void teardown() { System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property()); AWSGeneralUtil.closeResources(httpClient, kinesisClient); @@ -139,7 +140,7 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception { for (JsonDataWithSchema msg : messages) { gsrKinesisPubsubClient.sendMessage(msg.getSchema(), INPUT_STREAM, msg); } - log.info("generated records"); + LOGGER.info("generated records"); DataStream input = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "source") @@ -152,11 +153,11 @@ public void testGSRJsonGenericFormatWithFlink() throws Exception { List results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM, OUTPUT_STREAM_ARN); while (deadline.hasTimeLeft() && results.size() < messages.size()) { - log.info("waiting for results.."); + LOGGER.info("waiting for results.."); Thread.sleep(1000); results = gsrKinesisPubsubClient.readAllMessages(OUTPUT_STREAM, OUTPUT_STREAM_ARN); } - log.info("results: {}", results); + LOGGER.info("results: {}", results); assertThat(results).containsExactlyInAnyOrderElementsOf(messages); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java index 0a77c96c..36f442cd 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSourceFactoryTest.java @@ -40,9 +40,8 @@ import org.apache.flink.table.factories.TestFormatFactory; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.types.DataType; -import org.apache.flink.util.TestLogger; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -60,7 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link KinesisDynamicSource} created by {@link KinesisDynamicTableFactory}. */ -public class KinesisDynamicTableSourceFactoryTest extends TestLogger { +public class KinesisDynamicTableSourceFactoryTest { @Test public void testGoodTableSource() {