Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ function run_group_3 {
run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"

run_test "Wordcount Hadoop S3 SeaweedFS read-write end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh hadoop_seaweedfs"
run_test "Wordcount Presto S3 SeaweedFS read end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh presto_seaweedfs_read"

run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"

run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/test-scripts/common_s3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
################################################################################

if [[ $S3_SOURCED ]]; then
echo "Only source common_s3.sh or common_s3_minio.sh in the same test, previously sourced $S3_SOURCED" && exit 1
echo "Only source common_s3.sh or common_s3_seaweedfs.sh in the same test, previously sourced $S3_SOURCED" && exit 1
fi
export S3_SOURCED="common_s3.sh"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
################################################################################

if [[ $S3_SOURCED ]]; then
echo "Only source common_s3.sh or common_s3_minio.sh in the same test, previously sourced $S3_SOURCED" && exit 1
echo "Only source common_s3.sh or common_s3_seaweedfs.sh in the same test, previously sourced $S3_SOURCED" && exit 1
fi
export S3_SOURCED="common_s3_minio.sh"
export S3_SOURCED="common_s3_seaweedfs.sh"

# export credentials into environment variables for AWS client
export AWS_REGION=us-east-1
Expand All @@ -32,48 +32,49 @@ IT_CASE_S3_BUCKET=test-data
S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/words"

###################################
# Starts a docker container for s3 minio.
# Starts a SeaweedFS docker container that serves the S3 API.
#
# Globals:
# TEST_INFRA_DIR
# AWS_ACCESS_KEY_ID
# AWS_SECRET_ACCESS_KEY
# Exports:
# MINIO_CONTAINER_ID
# SEAWEEDFS_CONTAINER_ID
# S3_ENDPOINT
###################################
function s3_start {
echo "Spawning minio for s3 tests"
export MINIO_CONTAINER_ID=$(docker run -d \
echo "Spawning seaweedfs for s3 tests"
export SEAWEEDFS_CONTAINER_ID=$(docker run -d \
-P \
--mount type=bind,source="$TEST_INFRA_DIR",target=/data \
-e "MINIO_ACCESS_KEY=$AWS_ACCESS_KEY_ID" -e "MINIO_SECRET_KEY=$AWS_SECRET_ACCESS_KEY" -e "MINIO_DOMAIN=localhost" \
minio/minio \
-e "AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID" -e "AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \
chrislusf/seaweedfs \
server \
/data)
while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do
-s3 \
-s3.port=8333 \
-dir=/data)
while [[ "$(docker inspect -f {{.State.Running}} "$SEAWEEDFS_CONTAINER_ID")" -ne "true" ]]; do
sleep 0.1
done
export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
echo "Started minio @ $S3_ENDPOINT"
export S3_ENDPOINT="http://$(docker port "$SEAWEEDFS_CONTAINER_ID" 8333 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
echo "Started seaweedfs @ $S3_ENDPOINT"
on_exit s3_stop
}

###################################
# Stops the docker container of minio.
# Stops and removes the SeaweedFS docker container.
#
# Globals:
# MINIO_CONTAINER_ID
# SEAWEEDFS_CONTAINER_ID
###################################
function s3_stop {
docker kill "$MINIO_CONTAINER_ID"
docker rm "$MINIO_CONTAINER_ID"
docker kill "$SEAWEEDFS_CONTAINER_ID"
docker rm "$SEAWEEDFS_CONTAINER_ID"
export S3_ENDPOINT=
export MINIO_CONTAINER_ID=
export SEAWEEDFS_CONTAINER_ID=
}

# always start it while sourcing, so that MINIO_CONTAINER_ID is available from parent script
if [[ $MINIO_CONTAINER_ID ]]; then
# Always start the container while sourcing, so that SEAWEEDFS_CONTAINER_ID is available to the parent script
if [[ $SEAWEEDFS_CONTAINER_ID ]]; then
s3_stop
fi
s3_start
Expand All @@ -95,9 +96,9 @@ function s3_setup {

set_config_key "s3.access-key" "$AWS_ACCESS_KEY_ID"
set_config_key "s3.secret-key" "$AWS_SECRET_ACCESS_KEY"
# change endpoint to minio's location
# change endpoint to seaweedfs's location
set_config_key "s3.endpoint" "$S3_ENDPOINT"
# If the test is using virtual host style (default), then it tries to reach minio on <bucket>.localhost:<port>,
# If the test is using virtual host style (default), then it tries to reach seaweedfs on <bucket>.localhost:<port>,
# which docker does not properly forward.
set_config_key "s3.path.style.access" "true"
set_config_key "s3.path-style-access" "true"
Expand All @@ -107,12 +108,17 @@ function s3_setup_with_provider {
add_optional_plugin "s3-fs-$1"
# reads (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
set_config_key "$2" "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"
# change endpoint to minio's location
# change endpoint to seaweedfs's location
set_config_key "s3.endpoint" "$S3_ENDPOINT"
# If the test is using virtual host style (default), then it tries to reach minio on <bucket>.localhost:<port>,
# If the test is using virtual host style (default), then it tries to reach seaweedfs on <bucket>.localhost:<port>,
# which docker does not properly forward.
set_config_key "s3.path.style.access" "true"
set_config_key "s3.path-style-access" "true"
}

source "$(dirname "$0")"/common_s3_operations.sh

# SeaweedFS does not expose on-disk directories as S3 buckets,
# so the bucket must be created explicitly
aws_cli s3 mb "s3://$IT_CASE_S3_BUCKET"
aws_cli s3 cp "/hostdir/test-data/words" "$S3_TEST_DATA_WORDS_URI"
17 changes: 9 additions & 8 deletions flink-end-to-end-tests/test-scripts/test_batch_wordcount.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ case $INPUT_TYPE in
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true)
;;
(hadoop_minio)
source "$(dirname "$0")"/common_s3_minio.sh
(hadoop_seaweedfs)
source "$(dirname "$0")"/common_s3_seaweedfs.sh
s3_setup hadoop
ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true)
;;
(hadoop_with_provider)
source "$(dirname "$0")"/common_s3.sh
Expand All @@ -60,11 +62,10 @@ case $INPUT_TYPE in
on_exit "s3_delete_by_full_path_prefix '$S3_PREFIX'"
fetch_complete_result=(s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_PREFIX}" true)
;;
(presto_minio)
source "$(dirname "$0")"/common_s3_minio.sh
(presto_seaweedfs_read)
source "$(dirname "$0")"/common_s3_seaweedfs.sh
s3_setup presto
ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
OUTPUT_PATH="$TEST_INFRA_DIR/$IT_CASE_S3_BUCKET/$S3_PREFIX"
ARGS="--execution-mode BATCH --input ${S3_TEST_DATA_WORDS_URI} --output ${OUTPUT_PATH}"
;;
(dummy-fs)
source "$(dirname "$0")"/common_dummy_fs.sh
Expand All @@ -84,7 +85,7 @@ start_cluster
# The test may run against different source types.
# But the sources should provide the same test data, so the checksum stays the same for all tests.
${FLINK_DIR}/bin/flink run -p 1 ${FLINK_DIR}/examples/streaming/WordCount.jar ${ARGS}
# Fetches result from AWS s3 to the OUTPUT_PATH, no-op for other filesystems and minio-based tests
# Fetches result from s3 to the OUTPUT_PATH for the cases that set fetch_complete_result (AWS s3 and hadoop_seaweedfs), no-op for other filesystems and the presto_seaweedfs_read test

# it seems we need a function for retry_times
function fetch_it() {
Expand Down
7 changes: 1 addition & 6 deletions flink-end-to-end-tests/test-scripts/test_file_sink.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ function get_total_number_of_valid_lines {
if [ "${OUT_TYPE}" == "local" ]; then
echo "[INFO] Test run in local environment: No S3 environment is loaded."
elif [ "${OUT_TYPE}" == "s3" ]; then
source "$(dirname "$0")"/common_s3_minio.sh
source "$(dirname "$0")"/common_s3_seaweedfs.sh
s3_setup hadoop

# overwrites JOB_OUTPUT_PATH to point to S3
Expand All @@ -80,11 +80,6 @@ elif [ "${OUT_TYPE}" == "s3" ]; then
find "${LOCAL_JOB_OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}

# overwrites implementation for local runs
function get_total_number_of_valid_lines {
s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
}

# make sure we delete the file at the end
function out_cleanup {
s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fi

# setup materialized table data dir
echo "[INFO] Start S3 env"
source "$(dirname "$0")"/common_s3_minio.sh
source "$(dirname "$0")"/common_s3_seaweedfs.sh
s3_setup hadoop
S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}/test_materialized_table-$(uuidgen)"
Expand Down
4 changes: 2 additions & 2 deletions flink-filesystems/flink-s3-fs-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ under the License.
<fs.s3.aws.version>1.12.779</fs.s3.aws.version>
<japicmp.skip>true</japicmp.skip>
<surefire.module.config> <!--
S5CmdOnMinioITCase uses ArraysAsListSerializer indirectly
S5CmdOnSeaweedFsITCase uses ArraysAsListSerializer indirectly
-->--add-opens=java.base/java.util=ALL-UNNAMED
</surefire.module.config>
</properties>
Expand Down Expand Up @@ -245,7 +245,7 @@ under the License.
</exclusions>
</dependency>

<!-- flink-runtime and curator dependencies are needed for the HAJobRunOnMinioS3StoreITCase -->
<!-- flink-runtime and curator dependencies are needed for the HAJobRunOnSeaweedFsS3StoreITCase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,22 @@
import static org.assertj.core.api.Assertions.assertThat;

/**
* {@code ApplicationRunOnMinioS3StoreITCase} covers an application run where the HA data is stored
* in Minio. The implementation verifies whether the {@code JobResult} was written into the
* FileSystem-backed {@code ApplicationResultStore}.
* {@code HAApplicationRunOnSeaweedFsS3StoreITCase} covers an application run where the HA data is
* stored in SeaweedFS. The implementation verifies whether the {@code JobResult} was written into
* the FileSystem-backed {@code ApplicationResultStore}.
*/
public abstract class HAApplicationRunOnMinioS3StoreITCase extends AbstractHAApplicationRunITCase {
public abstract class HAApplicationRunOnSeaweedFsS3StoreITCase
extends AbstractHAApplicationRunITCase {

private static final String CLUSTER_ID = "test-cluster";
private static final String APPLICATION_RESULT_STORE_FOLDER = "ars";

@RegisterExtension
@Order(2)
private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
MINIO_EXTENSION =
new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new));
private static final AllCallbackWrapper<TestContainerExtension<SeaweedFsTestContainer>>
SEAWEEDFS_EXTENSION =
new AllCallbackWrapper<>(
new TestContainerExtension<>(SeaweedFsTestContainer::new));

@RegisterExtension
@Order(3)
Expand All @@ -71,19 +73,19 @@ public abstract class HAApplicationRunOnMinioS3StoreITCase extends AbstractHAApp
.build();
});

private static MinioTestContainer getMinioContainer() {
return MINIO_EXTENSION.getCustomExtension().getTestContainer();
private static SeaweedFsTestContainer getSeaweedFsContainer() {
return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer();
}

private static String createS3URIWithSubPath(String... subfolders) {
return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders);
return getSeaweedFsContainer().getS3UriForDefaultBucket() + createSubPath(subfolders);
}

private static List<S3ObjectSummary> getObjectsFromApplicationResultStore() {
return getMinioContainer()
return getSeaweedFsContainer()
.getClient()
.listObjects(
getMinioContainer().getDefaultBucketName(),
getSeaweedFsContainer().getDefaultBucketName(),
createSubPath(CLUSTER_ID, APPLICATION_RESULT_STORE_FOLDER))
.getObjectSummaries();
}
Expand All @@ -96,7 +98,7 @@ private static String createSubPath(String... subfolders) {
private static Configuration createConfiguration() {
final Configuration config = new Configuration();

getMinioContainer().setS3ConfigOptions(config);
getSeaweedFsContainer().setS3ConfigOptions(config);

// ApplicationResultStore configuration
config.set(ApplicationResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE);
Expand Down Expand Up @@ -142,7 +144,7 @@ protected void runAfterApplicationTermination() throws Exception {
::hasValidDirtyApplicationResultStoreEntryExtension));

final String objContent =
getMinioContainer()
getSeaweedFsContainer()
.getClient()
.getObjectAsString(objRef.getBucketName(), objRef.getKey());
assertThat(objContent).contains(ApplicationState.FINISHED.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@
import static org.assertj.core.api.Assertions.assertThat;

/**
* {@code HAJobRunOnMinioS3StoreITCase} covers a job run where the HA data is stored in Minio. The
* implementation verifies whether the {@code JobResult} was written into the FileSystem-backed
* {@code JobResultStore}.
* {@code HAJobRunOnSeaweedFsS3StoreITCase} covers a job run where the HA data is stored in
* SeaweedFS. The implementation verifies whether the {@code JobResult} was written into the
* FileSystem-backed {@code JobResultStore}.
*/
public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCase {
public abstract class HAJobRunOnSeaweedFsS3StoreITCase extends AbstractHAJobRunITCase {

private static final String CLUSTER_ID = "test-cluster";
private static final String JOB_RESULT_STORE_FOLDER = "jrs";

@RegisterExtension
@Order(2)
private static final AllCallbackWrapper<TestContainerExtension<MinioTestContainer>>
MINIO_EXTENSION =
new AllCallbackWrapper<>(new TestContainerExtension<>(MinioTestContainer::new));
private static final AllCallbackWrapper<TestContainerExtension<SeaweedFsTestContainer>>
SEAWEEDFS_EXTENSION =
new AllCallbackWrapper<>(
new TestContainerExtension<>(SeaweedFsTestContainer::new));

@RegisterExtension
@Order(3)
Expand All @@ -71,19 +72,19 @@ public abstract class HAJobRunOnMinioS3StoreITCase extends AbstractHAJobRunITCas
.build();
});

private static MinioTestContainer getMinioContainer() {
return MINIO_EXTENSION.getCustomExtension().getTestContainer();
private static SeaweedFsTestContainer getSeaweedFsContainer() {
return SEAWEEDFS_EXTENSION.getCustomExtension().getTestContainer();
}

private static String createS3URIWithSubPath(String... subfolders) {
return getMinioContainer().getS3UriForDefaultBucket() + createSubPath(subfolders);
return getSeaweedFsContainer().getS3UriForDefaultBucket() + createSubPath(subfolders);
}

private static List<S3ObjectSummary> getObjectsFromJobResultStore() {
return getMinioContainer()
return getSeaweedFsContainer()
.getClient()
.listObjects(
getMinioContainer().getDefaultBucketName(),
getSeaweedFsContainer().getDefaultBucketName(),
createSubPath(CLUSTER_ID, JOB_RESULT_STORE_FOLDER))
.getObjectSummaries();
}
Expand All @@ -96,7 +97,7 @@ private static String createSubPath(String... subfolders) {
private static Configuration createConfiguration() {
final Configuration config = new Configuration();

getMinioContainer().setS3ConfigOptions(config);
getSeaweedFsContainer().setS3ConfigOptions(config);

// JobResultStore configuration
config.set(JobResultStoreOptions.DELETE_ON_COMMIT, Boolean.FALSE);
Expand Down Expand Up @@ -136,7 +137,7 @@ protected void runAfterJobTermination() throws Exception {
.matches(not(FileSystemJobResultStore::hasValidDirtyJobResultStoreEntryExtension));

final String objContent =
getMinioContainer()
getSeaweedFsContainer()
.getClient()
.getObjectAsString(objRef.getBucketName(), objRef.getKey());
assertThat(objContent).contains(ApplicationStatus.SUCCEEDED.name());
Expand Down
Loading