diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 129aeb82f4549..eab0f865604f4 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -189,6 +189,9 @@ function run_group_3 { run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions" run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions" + run_test "Wordcount Hadoop S3 SeaweedFS read-write end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_seaweedfs" + run_test "Wordcount Presto S3 SeaweedFS read end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh presto_seaweedfs_read" + run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4" run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh" diff --git a/flink-end-to-end-tests/test-scripts/common_s3.sh b/flink-end-to-end-tests/test-scripts/common_s3.sh index 903fb6a3ff8f7..6de1604a61c22 100644 --- a/flink-end-to-end-tests/test-scripts/common_s3.sh +++ b/flink-end-to-end-tests/test-scripts/common_s3.sh @@ -18,7 +18,7 @@ ################################################################################ if [[ $S3_SOURCED ]]; then - echo "Only source common_s3.sh or common_s3_minio.sh in the same test, previously sourced $S3_SOURCED" && exit 1 + echo "Only source common_s3.sh or common_s3_seaweedfs.sh in the same test, previously sourced $S3_SOURCED" && exit 1 fi export S3_SOURCED="common_s3.sh" diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh b/flink-end-to-end-tests/test-scripts/common_s3_seaweedfs.sh similarity index 66% rename from flink-end-to-end-tests/test-scripts/common_s3_minio.sh rename to flink-end-to-end-tests/test-scripts/common_s3_seaweedfs.sh index d3d08392caac7..bd371d16f27a2 100644 --- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh +++ b/flink-end-to-end-tests/test-scripts/common_s3_seaweedfs.sh @@ -18,9 +18,9 @@ ################################################################################ if [[ $S3_SOURCED ]]; then - echo "Only source common_s3.sh or common_s3_minio.sh in the same test, previously sourced $S3_SOURCED" && exit 1 + echo "Only source common_s3.sh or common_s3_seaweedfs.sh in the same test, previously sourced $S3_SOURCED" && exit 1 fi -export S3_SOURCED="common_s3_minio.sh" +export S3_SOURCED="common_s3_seaweedfs.sh" # export credentials into environment variables for AWS client export AWS_REGION=us-east-1 @@ -32,48 +32,49 @@ IT_CASE_S3_BUCKET=test-data S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/words" ################################### -# Starts a docker container for s3 minio. +# Starts a docker container for s3 seaweedfs. # # Globals: # TEST_INFRA_DIR # AWS_ACCESS_KEY_ID # AWS_SECRET_ACCESS_KEY # Exports: -# MINIO_CONTAINER_ID +# SEAWEEDFS_CONTAINER_ID # S3_ENDPOINT ################################### function s3_start { - echo "Spawning minio for s3 tests" - export MINIO_CONTAINER_ID=$(docker run -d \ + echo "Spawning seaweedfs for s3 tests" + export SEAWEEDFS_CONTAINER_ID=$(docker run -d \ -P \ - --mount type=bind,source="$TEST_INFRA_DIR",target=/data \ - -e "MINIO_ACCESS_KEY=$AWS_ACCESS_KEY_ID" -e "MINIO_SECRET_KEY=$AWS_SECRET_ACCESS_KEY" -e "MINIO_DOMAIN=localhost" \ - minio/minio \ + -e "AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID" -e "AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \ + chrislusf/seaweedfs \ server \ - /data) - while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do + -s3 \ + -s3.port=8333 \ + -dir=/data) + while [[ "$(docker inspect -f {{.State.Running}} "$SEAWEEDFS_CONTAINER_ID")" -ne "true" ]]; do sleep 0.1 done - export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')" - echo "Started minio @ $S3_ENDPOINT" + export S3_ENDPOINT="http://$(docker port "$SEAWEEDFS_CONTAINER_ID" 8333 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')" + echo "Started seaweedfs @ $S3_ENDPOINT" on_exit s3_stop } ################################### -# Stops the docker container of minio. +# Stops the docker container of seaweedfs. # # Globals: -# MINIO_CONTAINER_ID +# SEAWEEDFS_CONTAINER_ID ################################### function s3_stop { - docker kill "$MINIO_CONTAINER_ID" - docker rm "$MINIO_CONTAINER_ID" + docker kill "$SEAWEEDFS_CONTAINER_ID" + docker rm "$SEAWEEDFS_CONTAINER_ID" export S3_ENDPOINT= - export MINIO_CONTAINER_ID= + export SEAWEEDFS_CONTAINER_ID= } -# always start it while sourcing, so that MINIO_CONTAINER_ID is available from parent script -if [[ $MINIO_CONTAINER_ID ]]; then +# always start it while sourcing, so that SEAWEEDFS_CONTAINER_ID is available from parent script +if [[ $SEAWEEDFS_CONTAINER_ID ]]; then s3_stop fi s3_start @@ -95,9 +96,9 @@ function s3_setup { set_config_key "s3.access-key" "$AWS_ACCESS_KEY_ID" set_config_key "s3.secret-key" "$AWS_SECRET_ACCESS_KEY" - # change endpoint to minio's location + # change endpoint to seaweedfs's location set_config_key "s3.endpoint" "$S3_ENDPOINT" - # If the test is using virtual host style (default), then it tries to reach minio on .localhost:, + # If the test is using virtual host style (default), then it tries to reach seaweedfs on .localhost:, # which docker does not properly forward. set_config_key "s3.path.style.access" "true" set_config_key "s3.path-style-access" "true" @@ -107,12 +108,17 @@ function s3_setup_with_provider { add_optional_plugin "s3-fs-$1" # reads (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) set_config_key "$2" "com.amazonaws.auth.EnvironmentVariableCredentialsProvider" - # change endpoint to minio's location + # change endpoint to seaweedfs's location set_config_key "s3.endpoint" "$S3_ENDPOINT" - # If the test is using virtual host style (default), then it tries to reach minio on .localhost:, + # If the test is using virtual host style (default), then it tries to reach seaweedfs on .localhost:, # which docker does not properly forward. set_config_key "s3.path.style.access" "true" set_config_key "s3.path-style-access" "true" } source "$(dirname "$0")"/common_s3_operations.sh + +# SeaweedFS does not expose on-disk directories as S3 buckets, +# so the bucket must be created explicitly +aws_cli s3 mb "s3://$IT_CASE_S3_BUCKET" +aws_cli s3 cp "/hostdir/test-data/words" "$S3_TEST_DATA_WORDS_URI" diff --git a/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh b/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh index 62f2455d5ba9d..726370f6dbd68 100755 --- a/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh +++ b/flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh @@ -38,11 +38,13 @@ case $INPUT_TYPE in on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'" fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true) ;; - (hadoop_minio) - source "$(dirname "$0")"/common_s3_minio.sh + (hadoop_seaweedfs) + source "$(dirname "$0")"/common_s3_seaweedfs.sh s3_setup hadoop ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" - OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX" + OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX" + on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'" + fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true) ;; (hadoop_with_provider) source "$(dirname "$0")"/common_s3.sh @@ -60,11 +62,10 @@ case $INPUT_TYPE in on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'" fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true) ;; - (presto_minio) - source "$(dirname "$0")"/common_s3_minio.sh + (presto_seaweedfs_read) + source "$(dirname "$0")"/common_s3_seaweedfs.sh s3_setup presto - ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX" - OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX" + ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output ${OUTPUT_PATH}" ;; (dummy-fs) source "$(dirname "$0")"/common_dummy_fs.sh @@ -84,7 +85,7 @@ start_cluster # The test may run against different source types. # But the sources should provide the same test data, so the checksum stays the same for all tests. ${FLINK_DIR}/bin/flink run -p 1 ${FLINK_DIR}/examples/streaming/WordCount.jar ${ARGS} -# Fetches result from AWS s3 to the OUTPUT_PATH, no-op for other filesystems and minio-based tests +# Fetches result from AWS s3 to the OUTPUT_PATH, no-op for other filesystems and seaweedfs-based tests # it seems we need a function for retry_times function fetch_it() { diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh index 47fba1d0c90d7..f35369f95f337 100755 --- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh +++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh @@ -62,7 +62,7 @@ function get_total_number_of_valid_lines { if [ "${OUT_TYPE}" == "local" ]; then echo "[INFO] Test run in local environment: No S3 environment is loaded." elif [ "${OUT_TYPE}" == "s3" ]; then - source "$(dirname "$0")"/common_s3_minio.sh + source "$(dirname "$0")"/common_s3_seaweedfs.sh s3_setup hadoop # overwrites JOB_OUTPUT_PATH to point to S3 @@ -80,11 +80,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g } - # overwrites implementation for local runs - function get_total_number_of_valid_lines { - s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-" - } - # make sure we delete the file at the end function out_cleanup { s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}" diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh index addc04f28bdad..cb810a6c86c3b 100755 --- a/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh +++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh @@ -46,7 +46,7 @@ fi # setup materialized table data dir echo "[INFO] Start S3 env" -source "$(dirname "$0")"/common_s3_minio.sh +source "$(dirname "$0")"/common_s3_seaweedfs.sh s3_setup hadoop S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/" MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}/test_materialized_table-$(uuidgen)" diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml index b1d3246825dc9..fc3c6a66a54f3 100644 --- a/flink-filesystems/flink-s3-fs-base/pom.xml +++ b/flink-filesystems/flink-s3-fs-base/pom.xml @@ -34,7 +34,7 @@ under the License. 1.12.779 true --add-opens=java.base/java.util=ALL-UNNAMED @@ -245,7 +245,7 @@ under the License. - + org.apache.flink flink-runtime diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnSeaweedFsS3StoreITCase.java similarity index 84% rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnSeaweedFsS3StoreITCase.java index b80a952fb2ef7..cb5a7521f430d 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnMinioS3StoreITCase.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAApplicationRunOnSeaweedFsS3StoreITCase.java @@ -44,20 +44,22 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * {@code ApplicationRunOnMinioS3StoreITCase} covers an application run where the HA data is stored - * in Minio. The implementation verifies whether the {@code JobResult} was written into the - * FileSystem-backed {@code ApplicationResultStore}. + * {@code HAApplicationRunOnSeaweedFsS3StoreITCase} covers an application run where the HA data is + * stored in SeaweedFs. The implementation verifies whether the {@code JobResult} was written into + * the FileSystem-backed {@code ApplicationResultStore}. */ -public abstract class HAApplicationRunOnMinioS3StoreITCase extends AbstractHAApplicationRunITCase { +public abstract class HAApplicationRunOnSeaweedFsS3StoreITCase + extends AbstractHAApplicationRunITCase { private static final String CLUSTER_ID = "test-cluster"; private static final String APPLICATION_RESULT_STORE_FOLDER = "ars"; @RegisterExtension @Order(2) - private static final AllCallbackWrapper> - MINIO_EXTENSION = - new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + private static final AllCallbackWrapper> + SEAWEEDFS_EXTENSION = + new AllCallbackWrapper<>( + new TestContainerExtension<>(SeaweedFsTestContainer::new)); @RegisterExtension @Order(3) @@ -71,19 +73,19 @@ public abstract class HAApplicationRunOnMinioS3StoreITCase extends AbstractHAApp .build(); }); - private static MinioTestContainer getMinioContainer() { - return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + private static SeaweedFsTestContainer getSeaweedFsContainer() { + return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer(); } private static String createS3URIWithSubPath(String... subfolders) { - return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); + return getSeaweedFsContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); } private static List getObjectsFromApplicationResultStore() { - return getMinioContainer() + return getSeaweedFsContainer() .getClient() .listObjects( - getMinioContainer().getDefaultBucketName(), + getSeaweedFsContainer().getDefaultBucketName(), createSubPath(CLUSTER_ID, APPLICATION_RESULT_STORE_FOLDER)) .getObjectSummaries(); } @@ -96,7 +98,7 @@ private static String createSubPath(String... subfolders) { private static Configuration createConfiguration() { final Configuration config = new Configuration(); - getMinioContainer().setS3ConfigOptions(config); + getSeaweedFsContainer().setS3ConfigOptions(config); // ApplicationResultStore configuration config.set(ApplicationResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE); @@ -142,7 +144,7 @@ protected void runAfterApplicationTermination() throws Exception { ::hasValidDirtyApplicationResultStoreEntryExtension)); final String objContent = - getMinioContainer() + getSeaweedFsContainer() .getClient() .getObjectAsString(objRef.getBucketName(), objRef.getKey()); assertThat(objContent).contains(ApplicationState.FINISHED.name()); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnSeaweedFsS3StoreITCase.java similarity index 84% rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnSeaweedFsS3StoreITCase.java index 95c1d33f47374..d047d36f2d0a6 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnMinioS3StoreITCase.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/HAJobRunOnSeaweedFsS3StoreITCase.java @@ -44,20 +44,21 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * {@code HAJobRunOnMinioS3StoreITCase} covers a job run where the HA data is stored in Minio. The - * implementation verifies whether the {@code JobResult} was written into the FileSystem-backed - * {@code JobResultStore}. + * {@code HAJobRunOnSeaweedFsS3StoreITCase} covers a job run where the HA data is stored in + * SeaweedFs. The implementation verifies whether the {@code JobResult} was written into the + * FileSystem-backed {@code JobResultStore}. */ -public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCase { +public abstract class HAJobRunOnSeaweedFsS3StoreITCase extends AbstractHAJobRunITCase { private static final String CLUSTER_ID = "test-cluster"; private static final String JOB_RESULT_STORE_FOLDER = "jrs"; @RegisterExtension @Order(2) - private static final AllCallbackWrapper> - MINIO_EXTENSION = - new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + private static final AllCallbackWrapper> + SEAWEEDFS_EXTENSION = + new AllCallbackWrapper<>( + new TestContainerExtension<>(SeaweedFsTestContainer::new)); @RegisterExtension @Order(3) @@ -71,19 +72,19 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas .build(); }); - private static MinioTestContainer getMinioContainer() { - return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + private static SeaweedFsTestContainer getSeaweedFsContainer() { + return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer(); } private static String createS3URIWithSubPath(String... subfolders) { - return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); + return getSeaweedFsContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); } private static List getObjectsFromJobResultStore() { - return getMinioContainer() + return getSeaweedFsContainer() .getClient() .listObjects( - getMinioContainer().getDefaultBucketName(), + getSeaweedFsContainer().getDefaultBucketName(), createSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER)) .getObjectSummaries(); } @@ -96,7 +97,7 @@ private static String createSubPath(String... subfolders) { private static Configuration createConfiguration() { final Configuration config = new Configuration(); - getMinioContainer().setS3ConfigOptions(config); + getSeaweedFsContainer().setS3ConfigOptions(config); // JobResultStore configuration config.set(JobResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE); @@ -136,7 +137,7 @@ protected void runAfterJobTermination() throws Exception { .matches(not(FileSystemJobResultStore::hasValidDirtyJobResultStoreEntryExtension)); final String objContent = - getMinioContainer() + getSeaweedFsContainer() .getClient() .getObjectAsString(objRef.getBucketName(), objRef.getKey()); assertThat(objContent).contains(ApplicationStatus.SUCCEEDED.name()); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnSeaweedFsITCase.java similarity index 93% rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnSeaweedFsITCase.java index cefcc3f1f810c..93ec971536572 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnMinioITCase.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S5CmdOnSeaweedFsITCase.java @@ -73,20 +73,21 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * {@code HAJobRunOnMinioS3StoreITCase} covers a job run where the HA data is stored in Minio. The - * implementation verifies whether the {@code JobResult} was written into the FileSystem-backed - * {@code JobResultStore}. + * {@code HAJobRunOnSeaweedFsS3StoreITCase} covers a job run where the HA data is stored in + * SeaweedFs. The implementation verifies whether the {@code JobResult} was written into the + * FileSystem-backed {@code JobResultStore}. */ @ExtendWith(TestLoggerExtension.class) -public abstract class S5CmdOnMinioITCase { +public abstract class S5CmdOnSeaweedFsITCase { private static final int CHECKPOINT_INTERVAL = 100; @RegisterExtension @Order(1) - private static final AllCallbackWrapper> - MINIO_EXTENSION = - new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new)); + private static final AllCallbackWrapper> + SEAWEEDFS_EXTENSION = + new AllCallbackWrapper<>( + new TestContainerExtension<>(SeaweedFsTestContainer::new)); @RegisterExtension @Order(2) @@ -103,17 +104,18 @@ public abstract class S5CmdOnMinioITCase { private static Configuration createConfiguration() { final Configuration config = new Configuration(); - getMinioContainer().setS3ConfigOptions(config); + getSeaweedFsContainer().setS3ConfigOptions(config); File credentialsFile = new File(temporaryDirectory, "credentials"); try { // It looks like on the CI machines s5cmd by default is using some other default // authentication mechanism, that takes precedence over passing secret and access keys // via environment variables. For example maybe there exists a credentials file in the - // default location with secrets from the S3, not MinIO. To circumvent it, lets use our - // own credentials file with secrets for MinIO. + // default location with secrets from the S3, not SeaweedFs. To circumvent it, lets use + // our + // own credentials file with secrets for SeaweedFs. checkState(credentialsFile.createNewFile()); - getMinioContainer().writeCredentialsFile(credentialsFile); + getSeaweedFsContainer().writeCredentialsFile(credentialsFile); config.set( S5CMD_EXTRA_ARGS, S5CMD_EXTRA_ARGS.defaultValue() @@ -132,8 +134,8 @@ private static Configuration createConfiguration() { @TempDir public static File temporaryDirectory; - private static MinioTestContainer getMinioContainer() { - return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + private static SeaweedFsTestContainer getSeaweedFsContainer() { + return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer(); } @BeforeAll @@ -342,7 +344,7 @@ public void initializeState(FunctionInitializationContext context) throws Except } private static String createS3URIWithSubPath(String... subfolders) { - return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); + return getSeaweedFsContainer().getS3UriForDefaultBucket() + createSubPath(subfolders); } private static String createSubPath(String... subfolders) { diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainer.java similarity index 83% rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainer.java index 162bf1cadd2e1..a1d8ceed18792 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainer.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainer.java @@ -39,38 +39,39 @@ import java.time.Duration; import java.util.Locale; -/** {@code MinioTestContainer} provides a {@code Minio} test instance. */ -class MinioTestContainer extends GenericContainer { +/** {@code SeaweedFsTestContainer} provides a {@code SeaweedFs} test instance. */ +class SeaweedFsTestContainer extends GenericContainer { - private static final int DEFAULT_PORT = 9000; + private static final int DEFAULT_PORT = 8333; - private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER"; - private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD"; + private static final String AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"; + private static final String AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"; private static final String DEFAULT_STORAGE_DIRECTORY = "/data"; - private static final String HEALTH_ENDPOINT = "/minio/health/ready"; + private static final String HEALTH_ENDPOINT = "/healthz"; private final String accessKey; private final String secretKey; private final String defaultBucketName; - public MinioTestContainer() { + public SeaweedFsTestContainer() { this(randomString("bucket", 6)); } - public MinioTestContainer(String defaultBucketName) { - super(DockerImageVersions.MINIO); + public SeaweedFsTestContainer(String defaultBucketName) { + super(DockerImageVersions.SEAWEEDFS); this.accessKey = randomString("accessKey", 10); // secrets must have at least 8 characters this.secretKey = randomString("secret", 10); this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName); - withNetworkAliases(randomString("minio", 6)); + withNetworkAliases(randomString("seaweedfs", 6)); addExposedPort(DEFAULT_PORT); - withEnv(MINIO_ACCESS_KEY, this.accessKey); - withEnv(MINIO_SECRET_KEY, this.secretKey); - withCommand("server", DEFAULT_STORAGE_DIRECTORY); + withEnv(AWS_ACCESS_KEY_ID, this.accessKey); + withEnv(AWS_SECRET_ACCESS_KEY, this.secretKey); + withCommand( + "server", "-s3", "-s3.port=" + DEFAULT_PORT, "-dir=" + DEFAULT_STORAGE_DIRECTORY); setWaitStrategy( new HttpWaitStrategy() .forPort(DEFAULT_PORT) @@ -91,7 +92,7 @@ private static String randomString(String prefix, int length) { return String.format("%s-%s", prefix, Base58.randomString(length).toLowerCase(Locale.ROOT)); } - /** Creates {@link AmazonS3} client for accessing the {@code Minio} instance. */ + /** Creates {@link AmazonS3} client for accessing the {@code SeaweedFs} instance. */ public AmazonS3 getClient() { return AmazonS3Client.builder() .withCredentials( @@ -109,9 +110,9 @@ private String getHttpEndpoint() { } /** - * Initializes the Minio instance (i.e. creating the default bucket and initializing Flink's + * Initializes the SeaweedFs instance (i.e. creating the default bucket and initializing Flink's * FileSystems). Additionally, the passed Flink {@link Configuration} is extended by all - * relevant parameter to access the {@code Minio} instance. + * relevant parameter to access the {@code SeaweedFs} instance. */ public void setS3ConfigOptions(Configuration config) { config.set(AbstractS3FileSystemFactory.ENDPOINT, getHttpEndpoint()); diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainerTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainerTest.java similarity index 89% rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainerTest.java rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainerTest.java index fd8e48c86b76e..3e82b6f023424 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/MinioTestContainerTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/SeaweedFsTestContainerTest.java @@ -35,22 +35,22 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** - * {@code MinioTestContainerTest} tests some basic functionality provided by {@link - * MinioTestContainer}. + * {@code SeaweedFsTestContainerTest} tests some basic functionality provided by {@link + * SeaweedFsTestContainer}. */ -class MinioTestContainerTest { +class SeaweedFsTestContainerTest { private static final String DEFAULT_BUCKET_NAME = "test-bucket"; @RegisterExtension - private static final EachCallbackWrapper> - MINIO_EXTENSION = + private static final EachCallbackWrapper> + SEAWEEDFS_EXTENSION = new EachCallbackWrapper<>( new TestContainerExtension<>( - () -> new MinioTestContainer(DEFAULT_BUCKET_NAME))); + () -> new SeaweedFsTestContainer(DEFAULT_BUCKET_NAME))); - private static MinioTestContainer getTestContainer() { - return MINIO_EXTENSION.getCustomExtension().getTestContainer(); + private static SeaweedFsTestContainer getTestContainer() { + return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer(); } private static AmazonS3 getClient() { diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java index cd4bb089ff207..3db2cdfe39c6e 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAApplicationRunOnHadoopS3FileSystemITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.fs.s3hadoop; -import org.apache.flink.fs.s3.common.HAApplicationRunOnMinioS3StoreITCase; +import org.apache.flink.fs.s3.common.HAApplicationRunOnSeaweedFsS3StoreITCase; -/** Runs the {@link HAApplicationRunOnMinioS3StoreITCase} on the Hadoop S3 file system. */ -class HAApplicationRunOnHadoopS3FileSystemITCase extends HAApplicationRunOnMinioS3StoreITCase {} +/** Runs the {@link HAApplicationRunOnSeaweedFsS3StoreITCase} on the Hadoop S3 file system. */ +class HAApplicationRunOnHadoopS3FileSystemITCase extends HAApplicationRunOnSeaweedFsS3StoreITCase {} diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAJobRunOnHadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAJobRunOnHadoopS3FileSystemITCase.java index 22ec2ffe2b0ad..107235cf09a9b 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAJobRunOnHadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HAJobRunOnHadoopS3FileSystemITCase.java @@ -18,7 +18,7 @@ package org.apache.flink.fs.s3hadoop; -import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase; +import org.apache.flink.fs.s3.common.HAJobRunOnSeaweedFsS3StoreITCase; -/** Runs the {@link HAJobRunOnMinioS3StoreITCase} on the Hadoop S3 file system. */ -class HAJobRunOnHadoopS3FileSystemITCase extends HAJobRunOnMinioS3StoreITCase {} +/** Runs the {@link HAJobRunOnSeaweedFsS3StoreITCase} on the Hadoop S3 file system. */ +class HAJobRunOnHadoopS3FileSystemITCase extends HAJobRunOnSeaweedFsS3StoreITCase {} diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java index bae4d6c0b301a..5f939e5feb3fb 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/S5CmdOnHadoopS3FileSystemITCase.java @@ -18,8 +18,8 @@ package org.apache.flink.fs.s3hadoop; -import org.apache.flink.fs.s3.common.HAJobRunOnMinioS3StoreITCase; -import org.apache.flink.fs.s3.common.S5CmdOnMinioITCase; +import org.apache.flink.fs.s3.common.HAJobRunOnSeaweedFsS3StoreITCase; +import org.apache.flink.fs.s3.common.S5CmdOnSeaweedFsITCase; -/** Runs the {@link HAJobRunOnMinioS3StoreITCase} on the Hadoop S3 file system. */ -public class S5CmdOnHadoopS3FileSystemITCase extends S5CmdOnMinioITCase {} +/** Runs the {@link HAJobRunOnSeaweedFsS3StoreITCase} on the Hadoop S3 file system. */ +public class S5CmdOnHadoopS3FileSystemITCase extends S5CmdOnSeaweedFsITCase {} diff --git a/flink-filesystems/flink-s3-fs-native/README.md b/flink-filesystems/flink-s3-fs-native/README.md index a3dc5c866c5e2..96ac27a907ac0 100644 --- a/flink-filesystems/flink-s3-fs-native/README.md +++ b/flink-filesystems/flink-s3-fs-native/README.md @@ -63,8 +63,8 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"), | s3.access-key | (none) | AWS access key (fallback key: `s3.access.key`) | | s3.secret-key | (none) | AWS secret key (fallback key: `s3.secret.key`) | | s3.region | (auto-detect) | AWS region (auto-detected via AWS_REGION, ~/.aws/config, EC2 metadata) | -| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) | -| s3.path-style-access | false | Use path-style access for S3 (required by most S3-compatible servers such as MinIO; fallback key: `s3.path.style.access`) | +| s3.endpoint | (none) | Custom S3 endpoint (for SeaweedFS, LocalStack, etc.) | +| s3.path-style-access | false | Use path-style access for S3 (required by most S3-compatible servers such as SeaweedFS; fallback key: `s3.path.style.access`) | | s3.chunked-encoding.enabled | true | Enable chunked encoding for S3 requests. Disable for S3-compatible servers that do not support it | | s3.checksum-validation.enabled | true | Enable checksum validation for S3 requests. Disable for S3-compatible servers that do not support it | | s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB to 5GB) | @@ -271,20 +271,20 @@ s3.assume-role.session-duration: 3600 # 1 hour } ``` -## MinIO and S3-Compatible Storage +## SeaweedFS and S3-Compatible Storage -For S3-compatible servers (MinIO, LocalStack, Ceph RGW, etc.), set the endpoint plus any compatibility flags the server requires. These flags are not auto-detected from the endpoint value — the defaults target AWS S3 and must be overridden explicitly: +For S3-compatible servers (SeaweedFS, LocalStack, Ceph RGW, etc.), set the endpoint plus any compatibility flags the server requires. These flags are not auto-detected from the endpoint value — the defaults target AWS S3 and must be overridden explicitly: ```yaml -s3.access-key: minioadmin -s3.secret-key: minioadmin -s3.endpoint: http://localhost:9000 +s3.endpoint: http://localhost:8333 +fs.s3.aws.credentials.provider: AnonymousCredentialsProvider -# Required: MinIO does not support virtual-hosted-style addressing. +# Required: SeaweedFS serves the S3 API in path-style and does not support +# virtual-hosted-style addressing by default. s3.path-style-access: true -# Required: MinIO does not support AWS chunked encoding or AWS-style -# checksum trailers used by the SDK by default. +# Required: many S3-compatible servers do not support AWS chunked encoding or +# AWS-style checksum trailers used by the SDK by default. s3.chunked-encoding.enabled: false s3.checksum-validation.enabled: false ``` @@ -402,25 +402,20 @@ Key classes: mvn clean package ``` -## Testing with MinIO +## Testing with SeaweedFS ```bash -# Start MinIO -docker run -d -p 9000:9000 -p 9001:9001 \ - -e "MINIO_ROOT_USER=minioadmin" \ - -e "MINIO_ROOT_PASSWORD=minioadmin" \ - minio/minio server /data --console-address ":9001" +# Start SeaweedFS +docker run -d --name seaweedfs -p 8333:8333 chrislusf/seaweedfs server -s3 -dir=/data # Create bucket -mc alias set local http://localhost:9000 minioadmin minioadmin -mc mb local/test-bucket +docker exec -i seaweedfs sh -c 'echo "s3.bucket.create -name test-bucket" | weed shell' -# Run Flink with MinIO +# Run Flink with SeaweedFS export FLINK_HOME=/path/to/flink cat > $FLINK_HOME/conf/config.yaml <