diff --git a/docs/content/docs/connectors/datastream/redis.md b/docs/content/docs/connectors/datastream/redis.md
new file mode 100644
index 0000000..46c866f
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/redis.md
@@ -0,0 +1,58 @@
+---
+title: Redis
+weight: 5
+type: docs
+aliases:
+---
+
+
+# Redis Connector
+
+This connector provides sinks that can request document actions to an
+[Redis](https://redis.io/). To use this connector, add the following
+dependencies to your project:
+
+
+
+
+ | Redis version |
+ Maven Dependency |
+
+
+
+
+ | 7.x |
+ {{< connector_artifact flink-connector-redis-streams 3.0.0 >}} |
+
+
+
+
+{{< py_download_link "redis" >}}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Redis
+
+Instructions for setting up a Redis cluster can be found
+[here](https://redis.io/docs/getting-started/installation/).
+
+
diff --git a/flink-connector-redis-streams/pom.xml b/flink-connector-redis-streams/pom.xml
new file mode 100644
index 0000000..4a8b3b1
--- /dev/null
+++ b/flink-connector-redis-streams/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ flink-connector-redis-parent
+ org.apache.flink
+ 3.0-SNAPSHOT
+
+ 4.0.0
+
+ flink-connector-redis-streams
+ Flink : Connectors : Redis : Streams
+
+ jar
+
+
+
+ redis.clients
+ jedis
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java
+
+
+
+
+
\ No newline at end of file
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java
new file mode 100644
index 0000000..0b98578
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommand.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
+
+import redis.clients.jedis.StreamEntryID;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/** A Redis Streams Command. */
+public class RedisStreamsCommand implements Serializable {
+
+ private transient StreamEntryID streamId = null;
+ public final String key;
+ public final Map value;
+
+ private RedisStreamsCommand(String key, Map value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public void send(JedisConnector connector) {
+ this.streamId =
+ connector
+ .getJedisCommands()
+ .xadd(
+ key,
+ (this.streamId != null) ? this.streamId : StreamEntryID.NEW_ENTRY,
+ value);
+ }
+
+ public boolean sendCorrectly() {
+ return true;
+ }
+
+ public boolean sendIncorrectly() {
+ return !sendCorrectly();
+ }
+
+ public long getMessageSize() {
+ return this.key.length()
+ + this.value.entrySet().stream()
+ .map(k -> k.getKey().length() + k.getValue().length())
+ .reduce(Integer::sum)
+ .orElse(0);
+ }
+
+ /** The builder for {@link RedisStreamsCommand}. */
+ public static class Builder {
+ private String key;
+ private Map value;
+
+ public Builder withKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public Builder withValue(Map value) {
+ this.value = value;
+ return this;
+ }
+
+ public RedisStreamsCommand build() {
+ return new RedisStreamsCommand(key, value);
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java
new file mode 100644
index 0000000..e1e98b0
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsCommandSerializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *
+ * @param The type of the element handled by this {@code RedisSerializer}
+ */
+public interface RedisStreamsCommandSerializer
+ extends ElementConverter {}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java
new file mode 100644
index 0000000..5b068c2
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSink.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.redis.streams.sink.config.JedisConfig;
+import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
+import org.apache.flink.connector.redis.streams.sink.connection.JedisConnectorBuilder;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A sink for publishing data into Redis.
+ *
+ * @param
+ */
+public class RedisStreamsSink extends AsyncSinkBase {
+
+ private final JedisConfig jedisConfig;
+
+ public RedisStreamsSink(
+ JedisConfig jedisConfig,
+ RedisStreamsCommandSerializer converter,
+ AsyncSinkWriterConfiguration asyncConfig) {
+ super(
+ converter,
+ asyncConfig.getMaxBatchSize(),
+ asyncConfig.getMaxInFlightRequests(),
+ asyncConfig.getMaxBufferedRequests(),
+ asyncConfig.getMaxBatchSizeInBytes(),
+ asyncConfig.getMaxTimeInBufferMS(),
+ asyncConfig.getMaxRecordSizeInBytes());
+ this.jedisConfig = jedisConfig;
+ }
+
+ @Override
+ public RedisStreamsWriter createWriter(InitContext initContext) throws IOException {
+ return restoreWriter(initContext, Collections.emptyList());
+ }
+
+ @Override
+ public RedisStreamsWriter restoreWriter(
+ InitContext initContext,
+ Collection> recoveredState)
+ throws IOException {
+ AsyncSinkWriterConfiguration asyncConfig =
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(getMaxBatchSize())
+ .setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
+ .setMaxInFlightRequests(getMaxInFlightRequests())
+ .setMaxBufferedRequests(getMaxBufferedRequests())
+ .setMaxTimeInBufferMS(getMaxTimeInBufferMS())
+ .setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
+ .build();
+ JedisConnector connection = JedisConnectorBuilder.build(jedisConfig);
+ return new RedisStreamsWriter<>(
+ connection, getElementConverter(), asyncConfig, initContext, recoveredState);
+ }
+
+ @Override
+ public SimpleVersionedSerializer>
+ getWriterStateSerializer() {
+ return new RedisStreamsStateSerializer();
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java
new file mode 100644
index 0000000..1673c1e
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** The Redis implementation for {@link SimpleVersionedSerializer}. */
+public class RedisStreamsStateSerializer
+ implements SimpleVersionedSerializer> {
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(BufferedRequestState obj) throws IOException {
+ Collection> bufferState =
+ obj.getBufferedRequestEntries();
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+
+ out.writeInt(getVersion());
+ out.writeInt(bufferState.size());
+
+ for (RequestEntryWrapper wrapper : bufferState) {
+ RedisStreamsCommand command = wrapper.getRequestEntry();
+ writeString(out, command.key);
+ out.writeInt(command.value.size());
+ for (Map.Entry entry : command.value.entrySet()) {
+ writeString(out, entry.getKey());
+ writeString(out, entry.getValue());
+ }
+ }
+
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public BufferedRequestState deserialize(int version, byte[] serialized)
+ throws IOException {
+ try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ final DataInputStream in = new DataInputStream(bais)) {
+
+ int byteVersion = in.readInt();
+
+ int bufferSize = in.readInt();
+ List> state = new ArrayList<>();
+ for (int bs = 0; bs < bufferSize; bs++) {
+ String key = readString(in);
+
+ int valueSize = in.readInt();
+ Map values = new HashMap<>();
+ for (int i = 0; i < valueSize; i++) {
+ String eKey = readString(in);
+ String eValue = readString(in);
+ values.put(eKey, eValue);
+ }
+
+ RedisStreamsCommand command =
+ RedisStreamsCommand.builder().withKey(key).withValue(values).build();
+
+ state.add(new RequestEntryWrapper<>(command, command.getMessageSize()));
+ }
+ return new BufferedRequestState<>(state);
+ }
+ }
+
+ private void writeString(final DataOutputStream out, String value) throws IOException {
+ out.writeInt(value.length());
+ out.writeBytes(value);
+ }
+
+ private String readString(final DataInputStream in) throws IOException {
+ int sizeToRead = in.readInt();
+ byte[] bytesRead = new byte[sizeToRead];
+ int sizeRead = in.read(bytesRead);
+
+ if (sizeToRead != sizeRead) {
+ throw new IOException(
+ String.format("Expected to read %s but read %s", sizeToRead, sizeRead));
+ }
+
+ return new String(bytesRead);
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java
new file mode 100644
index 0000000..516d664
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.redis.streams.sink.connection.JedisConnector;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+class RedisStreamsWriter extends AsyncSinkWriter {
+
+ private final JedisConnector jedisConnector;
+
+ public RedisStreamsWriter(
+ JedisConnector jedisConnector,
+ ElementConverter elementConverter,
+ AsyncSinkWriterConfiguration asyncConfig,
+ Sink.InitContext initContext,
+ Collection> recoveredState) {
+ super(elementConverter, initContext, asyncConfig, recoveredState);
+ this.jedisConnector = jedisConnector;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List requestEntries,
+ Consumer> requestResult) {
+ List errors =
+ requestEntries.stream()
+ .peek(command -> command.send(this.jedisConnector))
+ .filter(RedisStreamsCommand::sendIncorrectly)
+ .collect(Collectors.toList());
+
+ requestResult.accept(errors);
+ }
+
+ @Override
+ protected long getSizeInBytes(RedisStreamsCommand command) {
+ return command.getMessageSize();
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java
new file mode 100644
index 0000000..076d74f
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisClusterConfig.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Configuration for Jedis cluster. */
+public class JedisClusterConfig extends JedisConfig {
+
+ private final Set nodes;
+ private final int maxRedirections;
+
+ /**
+ * Jedis cluster configuration. The list of node is mandatory, and when nodes is not set, it
+ * throws NullPointerException.
+ *
+ * @param nodes list of node information for JedisCluster
+ * @param connectionTimeout socket / connection timeout. The default is 2000
+ * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+ * @param maxTotal the maximum number of objects that can be allocated by the pool
+ * @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+ * @param password the password of redis cluster
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are
+ * returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle
+ * object evictor, default value is false
+ * @throws NullPointerException if parameter {@code nodes} is {@code null}
+ */
+ private JedisClusterConfig(
+ Set nodes,
+ int connectionTimeout,
+ int maxRedirections,
+ int maxTotal,
+ int maxIdle,
+ int minIdle,
+ String password,
+ boolean testOnBorrow,
+ boolean testOnReturn,
+ boolean testWhileIdle) {
+ super(
+ connectionTimeout,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ password,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+
+ checkNotNull(nodes, "Node information should be presented");
+ checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
+ this.nodes = new HashSet<>(nodes);
+ this.maxRedirections = maxRedirections;
+ }
+
+ /**
+ * Returns nodes.
+ *
+ * @return list of node information
+ */
+ public Set getNodes() {
+ Set ret = new HashSet<>();
+ for (InetSocketAddress node : nodes) {
+ ret.add(new HostAndPort(node.getHostName(), node.getPort()));
+ }
+ return ret;
+ }
+
+ /**
+ * Returns limit of redirection.
+ *
+ * @return limit of redirection
+ */
+ public int getMaxRedirections() {
+ return maxRedirections;
+ }
+
+ /** Builder for initializing {@link JedisClusterConfig}. */
+ public static class Builder {
+ private Set nodes;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int maxRedirections = 5;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+ private String password;
+
+ /**
+ * Sets list of node.
+ *
+ * @param nodes list of node
+ * @return Builder itself
+ */
+ public Builder setNodes(Set nodes) {
+ this.nodes = nodes;
+ return this;
+ }
+
+ /**
+ * Sets socket / connection timeout.
+ *
+ * @param timeout socket / connection timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Sets limit of redirection.
+ *
+ * @param maxRedirections limit of redirection, default value is 5
+ * @return Builder itself
+ */
+ public Builder setMaxRedirections(int maxRedirections) {
+ this.maxRedirections = maxRedirections;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool,
+ * default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value
+ * is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code password} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param password the password for accessing redis cluster
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they
+ * are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created
+ * with this configuration instance. Setting this to true will also set default idle-testing
+ * parameters provided in Jedis
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the
+ * idle object evictor
+ * @return Builder itself
+ * @see redis.clients.jedis.JedisPoolConfig
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
+ /**
+ * Builds JedisClusterConfig.
+ *
+ * @return JedisClusterConfig
+ */
+ public JedisClusterConfig build() {
+ return new JedisClusterConfig(
+ nodes,
+ timeout,
+ maxRedirections,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ password,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java
new file mode 100644
index 0000000..7b8c65e
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisConfig.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Base class for Flink Redis configuration. */
+public abstract class JedisConfig implements Serializable {
+
+ protected final int maxTotal;
+ protected final int maxIdle;
+ protected final int minIdle;
+ protected final int connectionTimeout;
+ protected final String password;
+
+ protected final boolean testOnBorrow;
+ protected final boolean testOnReturn;
+ protected final boolean testWhileIdle;
+
+ protected JedisConfig(
+ int connectionTimeout,
+ int maxTotal,
+ int maxIdle,
+ int minIdle,
+ String password,
+ boolean testOnBorrow,
+ boolean testOnReturn,
+ boolean testWhileIdle) {
+
+ checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
+ checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
+ checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
+ checkArgument(minIdle >= 0, "minIdle value can not be negative");
+
+ this.connectionTimeout = connectionTimeout;
+ this.maxTotal = maxTotal;
+ this.maxIdle = maxIdle;
+ this.minIdle = minIdle;
+ this.testOnBorrow = testOnBorrow;
+ this.testOnReturn = testOnReturn;
+ this.testWhileIdle = testWhileIdle;
+ this.password = password;
+ }
+
+ /**
+ * Returns timeout.
+ *
+ * @return connection timeout
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Get the value for the {@code maxTotal} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @return The current setting of {@code maxTotal} for this configuration instance
+ * @see GenericObjectPoolConfig#getMaxTotal()
+ */
+ public int getMaxTotal() {
+ return maxTotal;
+ }
+
+ /**
+ * Get the value for the {@code maxIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @return The current setting of {@code maxIdle} for this configuration instance
+ * @see GenericObjectPoolConfig#getMaxIdle()
+ */
+ public int getMaxIdle() {
+ return maxIdle;
+ }
+
+ /**
+ * Get the value for the {@code minIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @return The current setting of {@code minIdle} for this configuration instance
+ * @see GenericObjectPoolConfig#getMinIdle()
+ */
+ public int getMinIdle() {
+ return minIdle;
+ }
+
+ /**
+ * Returns password.
+ *
+ * @return password
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * Get the value for the {@code testOnBorrow} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @return The current setting of {@code testOnBorrow} for this configuration instance
+ * @see GenericObjectPoolConfig#getTestOnBorrow()
+ */
+ public boolean getTestOnBorrow() {
+ return testOnBorrow;
+ }
+
+ /**
+ * Get the value for the {@code testOnReturn} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @return The current setting of {@code testOnReturn} for this configuration instance
+ * @see GenericObjectPoolConfig#getTestOnReturn()
+ */
+ public boolean getTestOnReturn() {
+ return testOnReturn;
+ }
+
+ /**
+ * Get the value for the {@code testWhileIdle} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @return The current setting of {@code testWhileIdle} for this configuration instance
+ * @see GenericObjectPoolConfig#getTestWhileIdle()
+ */
+ public boolean getTestWhileIdle() {
+ return testWhileIdle;
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java
new file mode 100644
index 0000000..dfe85c8
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisPoolConfig.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Configuration for Jedis pool. */
+public class JedisPoolConfig extends JedisConfig {
+
+ private final String host;
+ private final int port;
+ private final int database;
+
+ /**
+ * Jedis pool configuration. The host is mandatory, and when host is not set, it throws
+ * NullPointerException.
+ *
+ * @param host hostname or IP
+ * @param port port, default value is 6379
+ * @param connectionTimeout socket / connection timeout, default value is 2000 milli second
+ * @param password password, if any
+ * @param database database index
+ * @param maxTotal the maximum number of objects that can be allocated by the pool, default
+ * value is 8
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are
+ * returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle
+ * object evictor, default value is false
+ * @throws NullPointerException if parameter {@code host} is {@code null}
+ */
+ private JedisPoolConfig(
+ String host,
+ int port,
+ int connectionTimeout,
+ String password,
+ int database,
+ int maxTotal,
+ int maxIdle,
+ int minIdle,
+ boolean testOnBorrow,
+ boolean testOnReturn,
+ boolean testWhileIdle) {
+ super(
+ connectionTimeout,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ password,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+
+ checkNotNull(host, "Host information should be presented");
+ this.host = host;
+ this.port = port;
+ this.database = database;
+ }
+
+ /**
+ * Returns host.
+ *
+ * @return hostname or IP
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Returns port.
+ *
+ * @return port
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns database index.
+ *
+ * @return database index
+ */
+ public int getDatabase() {
+ return database;
+ }
+
+ /** Builder for initializing {@link JedisPoolConfig}. */
+ public static class Builder {
+ private String host;
+ private int port = Protocol.DEFAULT_PORT;
+ private int timeout = Protocol.DEFAULT_TIMEOUT;
+ private int database = Protocol.DEFAULT_DATABASE;
+ private String password;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool,
+ * default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value
+ * is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Sets host.
+ *
+ * @param host host
+ * @return Builder itself
+ */
+ public Builder setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ /**
+ * Sets port.
+ *
+ * @param port port, default value is 6379
+ * @return Builder itself
+ */
+ public Builder setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ /**
+ * Sets timeout.
+ *
+ * @param timeout timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setTimeout(int timeout) {
+ this.timeout = timeout;
+ return this;
+ }
+
+ /**
+ * Sets database index.
+ *
+ * @param database database index, default value is 0
+ * @return Builder itself
+ */
+ public Builder setDatabase(int database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Sets password.
+ *
+ * @param password password, if any
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they
+ * are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created
+ * with this configuration instance. Setting this to true will also set default idle-testing
+ * parameters provided in Jedis
+ *
+ * @see redis.clients.jedis.JedisPoolConfig
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the
+ * idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
+ /**
+ * Builds JedisPoolConfig.
+ *
+ * @return JedisPoolConfig
+ */
+ public JedisPoolConfig build() {
+ return new JedisPoolConfig(
+ host,
+ port,
+ timeout,
+ password,
+ database,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java
new file mode 100644
index 0000000..49253a5
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/config/JedisSentinelConfig.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Configuration for Jedis Sentinel pool. */
+public class JedisSentinelConfig extends JedisConfig {
+ private final String masterName;
+ private final Set sentinels;
+ private final int soTimeout;
+ private final int database;
+
+ /**
+ * Jedis Sentinels config. The master name and sentinels are mandatory, and when you didn't set
+ * these, it throws NullPointerException.
+ *
+ * @param masterName master name of the replica set
+ * @param sentinels set of sentinel hosts
+ * @param connectionTimeout timeout connection timeout
+ * @param soTimeout timeout socket timeout
+ * @param password password, if any
+ * @param database database database index
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
+ * @param maxIdle the cap on the number of "idle" instances in the pool
+ * @param minIdle the minimum number of idle objects to maintain in the pool
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are
+ * returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle
+ * object evictor, default value is false
+ * @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
+ * @throws IllegalArgumentException if {@code sentinels} are empty
+ */
+ private JedisSentinelConfig(
+ String masterName,
+ Set sentinels,
+ int connectionTimeout,
+ int soTimeout,
+ String password,
+ int database,
+ int maxTotal,
+ int maxIdle,
+ int minIdle,
+ boolean testOnBorrow,
+ boolean testOnReturn,
+ boolean testWhileIdle) {
+ super(
+ connectionTimeout,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ password,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+
+ checkNotNull(masterName, "Master name should be presented");
+ checkNotNull(sentinels, "Sentinels information should be presented");
+ checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
+
+ this.masterName = masterName;
+ this.sentinels = new HashSet<>(sentinels);
+ this.soTimeout = soTimeout;
+ this.database = database;
+ }
+
+ /**
+ * Returns master name of the replica set.
+ *
+ * @return master name of the replica set.
+ */
+ public String getMasterName() {
+ return masterName;
+ }
+
+ /**
+ * Returns Sentinels host addresses.
+ *
+ * @return Set of Sentinels host addresses
+ */
+ public Set getSentinels() {
+ return sentinels;
+ }
+
+ /**
+ * Returns socket timeout.
+ *
+ * @return socket timeout
+ */
+ public int getSoTimeout() {
+ return soTimeout;
+ }
+
+ /**
+ * Returns database index.
+ *
+ * @return database index
+ */
+ public int getDatabase() {
+ return database;
+ }
+
+ /** Builder for initializing {@link JedisSentinelConfig}. */
+ public static class Builder {
+ private String masterName;
+ private Set sentinels;
+ private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+ private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+ private String password;
+ private int database = Protocol.DEFAULT_DATABASE;
+ private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+ private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+ private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
+
+ /**
+ * Sets master name of the replica set.
+ *
+ * @param masterName master name of the replica set
+ * @return Builder itself
+ */
+ public Builder setMasterName(String masterName) {
+ this.masterName = masterName;
+ return this;
+ }
+
+ /**
+ * Sets sentinels address.
+ *
+ * @param sentinels host set of the sentinels
+ * @return Builder itself
+ */
+ public Builder setSentinels(Set sentinels) {
+ this.sentinels = sentinels;
+ return this;
+ }
+
+ /**
+ * Sets connection timeout.
+ *
+ * @param connectionTimeout connection timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+
+ /**
+ * Sets socket timeout.
+ *
+ * @param soTimeout socket timeout, default value is 2000
+ * @return Builder itself
+ */
+ public Builder setSoTimeout(int soTimeout) {
+ this.soTimeout = soTimeout;
+ return this;
+ }
+
+ /**
+ * Sets password.
+ *
+ * @param password password, if any
+ * @return Builder itself
+ */
+ public Builder setPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ /**
+ * Sets database index.
+ *
+ * @param database database index, default value is 0
+ * @return Builder itself
+ */
+ public Builder setDatabase(int database) {
+ this.database = database;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxTotal} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool,
+ * default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxTotal(int maxTotal) {
+ this.maxTotal = maxTotal;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code maxIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
+ * @return Builder itself
+ */
+ public Builder setMaxIdle(int maxIdle) {
+ this.maxIdle = maxIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code minIdle} configuration attribute for pools to be created with
+ * this configuration instance.
+ *
+ * @param minIdle the minimum number of idle objects to maintain in the pool, default value
+ * is 0
+ * @return Builder itself
+ */
+ public Builder setMinIdle(int minIdle) {
+ this.minIdle = minIdle;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being
+ * returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute for pools to be created
+ * with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they
+ * are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute for pools to be created
+ * with this configuration instance. Setting this to true will also set default idle-testing
+ * parameters provided in Jedis
+ *
+ * @see redis.clients.jedis.JedisPoolConfig
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the
+ * idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
+ /**
+ * Builds JedisSentinelConfig.
+ *
+ * @return JedisSentinelConfig
+ */
+ public JedisSentinelConfig build() {
+ return new JedisSentinelConfig(
+ masterName,
+ sentinels,
+ connectionTimeout,
+ soTimeout,
+ password,
+ database,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ testOnBorrow,
+ testOnReturn,
+ testWhileIdle);
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java
new file mode 100644
index 0000000..8c2fb8f
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.connection;
+
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.clients.jedis.commands.JedisCommands;
+
+import java.io.Serializable;
+
+/** A connector to Redis. */
+public class JedisConnector implements AutoCloseable, Serializable {
+
+ private transient JedisCluster jedisCluster;
+ private transient JedisPool jedisPool;
+ private transient JedisSentinelPool jedisSentinelPool;
+
+ public JedisConnector(JedisCluster jedisCluster) {
+ this.jedisCluster = jedisCluster;
+ }
+
+ public JedisConnector(JedisPool jedisPool) {
+ this.jedisPool = jedisPool;
+ }
+
+ public JedisConnector(JedisSentinelPool jedisSentinelPool) {
+ this.jedisSentinelPool = jedisSentinelPool;
+ }
+
+ public JedisCommands getJedisCommands() {
+ if (jedisCluster != null) {
+ return jedisCluster;
+ }
+ if (jedisPool != null) {
+ return jedisPool.getResource();
+ }
+ if (jedisSentinelPool != null) {
+ return jedisSentinelPool.getResource();
+ }
+
+ throw new IllegalArgumentException("No redis connection found");
+ }
+
+ @Override
+ public void close() {
+ if (jedisCluster != null) {
+ jedisCluster.close();
+ }
+ if (jedisPool != null) {
+ jedisPool.close();
+ }
+ if (jedisSentinelPool != null) {
+ jedisSentinelPool.close();
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java
new file mode 100644
index 0000000..ef04e55
--- /dev/null
+++ b/flink-connector-redis-streams/src/main/java/org/apache/flink/connector/redis/streams/sink/connection/JedisConnectorBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink.connection;
+
+import org.apache.flink.connector.redis.streams.sink.config.JedisClusterConfig;
+import org.apache.flink.connector.redis.streams.sink.config.JedisConfig;
+import org.apache.flink.connector.redis.streams.sink.config.JedisPoolConfig;
+import org.apache.flink.connector.redis.streams.sink.config.JedisSentinelConfig;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.Objects;
+
+/** The builder for {@link JedisConnector}. */
+public class JedisConnectorBuilder {
+
+ /**
+ * Initialize the {@link JedisConnector} based on the instance type.
+ *
+ * @param jedisConfig configuration base
+ * @return @throws IllegalArgumentException if not valid configuration is provided
+ */
+ public static JedisConnector build(JedisConfig jedisConfig) {
+ if (jedisConfig instanceof JedisPoolConfig) {
+ JedisPoolConfig jedisPoolConfig = (JedisPoolConfig) jedisConfig;
+ return JedisConnectorBuilder.build(jedisPoolConfig);
+ } else if (jedisConfig instanceof JedisClusterConfig) {
+ JedisClusterConfig jedisClusterConfig = (JedisClusterConfig) jedisConfig;
+ return JedisConnectorBuilder.build(jedisClusterConfig);
+ } else if (jedisConfig instanceof JedisSentinelConfig) {
+ JedisSentinelConfig jedisSentinelConfig = (JedisSentinelConfig) jedisConfig;
+ return JedisConnectorBuilder.build(jedisSentinelConfig);
+ } else {
+ throw new IllegalArgumentException("Jedis configuration not found");
+ }
+ }
+
+ /**
+ * Builds container for single Redis environment.
+ *
+ * @param jedisPoolConfig configuration for JedisPool
+ * @return container for single Redis environment
+ * @throws NullPointerException if jedisPoolConfig is null
+ */
+ public static JedisConnector build(JedisPoolConfig jedisPoolConfig) {
+ Objects.requireNonNull(jedisPoolConfig, "Redis pool config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
+ getGenericObjectPoolConfig(jedisPoolConfig);
+
+ JedisPool jedisPool =
+ new JedisPool(
+ genericObjectPoolConfig,
+ jedisPoolConfig.getHost(),
+ jedisPoolConfig.getPort(),
+ jedisPoolConfig.getConnectionTimeout(),
+ jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
+ return new JedisConnector(jedisPool);
+ }
+
+ /**
+ * Builds container for Redis Cluster environment.
+ *
+ * @param jedisClusterConfig configuration for JedisCluster
+ * @return container for Redis Cluster environment
+ * @throws NullPointerException if jedisClusterConfig is null
+ */
+ public static JedisConnector build(JedisClusterConfig jedisClusterConfig) {
+ Objects.requireNonNull(jedisClusterConfig, "Redis cluster config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
+ getGenericObjectPoolConfig(jedisClusterConfig);
+
+ JedisCluster jedisCluster =
+ new JedisCluster(
+ jedisClusterConfig.getNodes(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
+ return new JedisConnector(jedisCluster);
+ }
+
+ /**
+ * Builds container for Redis Sentinel environment.
+ *
+ * @param jedisSentinelConfig configuration for JedisSentinel
+ * @return container for Redis sentinel environment
+ * @throws NullPointerException if jedisSentinelConfig is null
+ */
+ public static JedisConnector build(JedisSentinelConfig jedisSentinelConfig) {
+ Objects.requireNonNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
+
+ GenericObjectPoolConfig genericObjectPoolConfig =
+ getGenericObjectPoolConfig(jedisSentinelConfig);
+
+ JedisSentinelPool jedisSentinelPool =
+ new JedisSentinelPool(
+ jedisSentinelConfig.getMasterName(),
+ jedisSentinelConfig.getSentinels(),
+ genericObjectPoolConfig,
+ jedisSentinelConfig.getConnectionTimeout(),
+ jedisSentinelConfig.getSoTimeout(),
+ jedisSentinelConfig.getPassword(),
+ jedisSentinelConfig.getDatabase());
+ return new JedisConnector(jedisSentinelPool);
+ }
+
+ public static GenericObjectPoolConfig getGenericObjectPoolConfig(JedisConfig jedisConfig) {
+ GenericObjectPoolConfig genericObjectPoolConfig =
+ jedisConfig.getTestWhileIdle()
+ ? new redis.clients.jedis.JedisPoolConfig()
+ : new GenericObjectPoolConfig<>();
+ genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
+ genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
+ genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+ genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+ genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
+
+ return genericObjectPoolConfig;
+ }
+}
diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java
new file mode 100644
index 0000000..b7f32b2
--- /dev/null
+++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/BaseITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
+
+/** A redis container for testing. */
+@Testcontainers
+public class BaseITCase {
+
+ @Container
+ private GenericContainer> redis =
+ new GenericContainer<>(DockerImageName.parse("redis:7.0.5-alpine"))
+ .withExposedPorts(6379);
+
+ protected Jedis jedis;
+
+ public static MiniClusterWithClientResource cluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .build());
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ cluster.before();
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ cluster.after();
+ }
+
+ @BeforeEach
+ public void setUp() {
+ jedis = new Jedis(redisHost(), redisPort());
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ jedis.close();
+ }
+
+ public String redisHost() {
+ return redis.getHost();
+ }
+
+ public Integer redisPort() {
+ return redis.getFirstMappedPort();
+ }
+}
diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java
new file mode 100644
index 0000000..90b2b03
--- /dev/null
+++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsSinkTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.redis.streams.sink.config.JedisPoolConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RedisStreamsSinkTest extends BaseITCase {
+
+ @Test
+ public void testStreamCommand() throws Exception {
+
+ JedisPoolConfig jedisConfig =
+ new JedisPoolConfig.Builder().setHost(redisHost()).setPort(redisPort()).build();
+
+ RedisStreamsCommandSerializer> serializer =
+ new TestCommandSerializer();
+
+ AsyncSinkWriterConfiguration asyncConfig =
+ AsyncSinkWriterConfiguration.builder()
+ .setMaxBatchSize(5)
+ .setMaxBatchSizeInBytes(1000)
+ .setMaxInFlightRequests(5)
+ .setMaxBufferedRequests(6)
+ .setMaxTimeInBufferMS(10000)
+ .setMaxRecordSizeInBytes(1000)
+ .build();
+
+ RedisStreamsSink> underTest =
+ new RedisStreamsSink<>(jedisConfig, serializer, asyncConfig);
+
+ List> source =
+ Arrays.asList(
+ Tuple3.of("one", "onekey", "onevalue"),
+ Tuple3.of("two", "firstkey", "firstvalue"),
+ Tuple3.of("two", "secontkey", "secondvalue"));
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.fromCollection(source).sinkTo(underTest);
+ env.execute();
+
+ // verify results
+ assertEquals(1, jedis.xlen("one"));
+ assertEquals(2, jedis.xlen("two"));
+ }
+
+ public static class TestCommandSerializer
+ implements RedisStreamsCommandSerializer> {
+ @Override
+ public RedisStreamsCommand apply(
+ Tuple3 input, SinkWriter.Context context) {
+ return RedisStreamsCommand.builder()
+ .withKey(input.f0)
+ .withValue(
+ new HashMap() {
+ {
+ put(input.f1, input.f2);
+ }
+ })
+ .build();
+ }
+ }
+}
diff --git a/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java
new file mode 100644
index 0000000..7e1863e
--- /dev/null
+++ b/flink-connector-redis-streams/src/test/java/org/apache/flink/connector/redis/streams/sink/RedisStreamsStateSerializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redis.streams.sink;
+
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RedisStreamsStateSerializerTest {
+
+ @Test
+ void testSerDe() throws IOException {
+
+ RedisStreamsCommand command1 =
+ RedisStreamsCommand.builder()
+ .withKey("test1")
+ .withValue(
+ new HashMap() {
+ {
+ put("first", "1");
+ put("second", "2");
+ }
+ })
+ .build();
+
+ RedisStreamsCommand command2 =
+ RedisStreamsCommand.builder()
+ .withKey("test2")
+ .withValue(
+ new HashMap() {
+ {
+ put("third", "3");
+ put("fourth", "4");
+ }
+ })
+ .build();
+
+ List> state = new ArrayList<>();
+ state.add(new RequestEntryWrapper<>(command1, command1.getMessageSize()));
+ state.add(new RequestEntryWrapper<>(command2, command2.getMessageSize()));
+
+ RedisStreamsStateSerializer serializer = new RedisStreamsStateSerializer();
+
+ byte[] serialized = serializer.serialize(new BufferedRequestState<>(state));
+ BufferedRequestState deserialized =
+ serializer.deserialize(1, serialized);
+ assertEquals(2, deserialized.getBufferedRequestEntries().size());
+
+ RedisStreamsCommand newCommand =
+ deserialized.getBufferedRequestEntries().get(0).getRequestEntry();
+ assertEquals(command1.key, newCommand.key);
+ assertEquals(command1.value, newCommand.value);
+
+ newCommand = deserialized.getBufferedRequestEntries().get(1).getRequestEntry();
+ assertEquals(command2.key, newCommand.key);
+ assertEquals(command2.value, newCommand.value);
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..bb6b39d
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,457 @@
+
+
+
+
+
+ io.github.zentol.flink
+ flink-connector-parent
+ 1.0
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-connector-redis-parent
+ 3.0-SNAPSHOT
+
+ Flink : Connectors : Redis : Parent
+ pom
+ 2022
+
+
+ https://github.com/apache/flink-connector-redis-streams
+ git@github.com:apache/flink-connector-redis-streams.git
+
+ scm:git:https://gitbox.apache.org/repos/asf/flink-connector-redis-stream.git
+
+
+
+
+ flink-connector-redis-streams
+
+
+
+ 1.16.0
+ 15.0
+ 4.2.3
+
+ 2.13.4.20221013
+ 5.8.1
+ 3.21.0
+ 0.22.0
+ 1.17.2
+ 2.21.0
+
+ false
+ 1.15.0
+
+ 1.7.36
+ 2.17.2
+
+
+
+ flink-connector-redis-parent
+
+
+
+
+ org.apache.flink
+ flink-shaded-force-shading
+ ${flink.shaded.version}
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+
+
+ com.google.code.findbugs
+ jsr305
+ provided
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ jar
+ test
+
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ test
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+ test
+
+
+
+
+ org.apache.flink
+ flink-architecture-tests-test
+ test
+
+
+ org.apache.flink
+ flink-architecture-tests-production
+ test
+
+
+
+
+
+
+
+
+
+
+
+ redis.clients
+ jedis
+ ${jedis.version}
+ compile
+
+
+ org.slf4j
+ slf4j.api
+
+
+
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+ org.apache.flink
+ flink-test-utils-junit
+ ${flink.version}
+ test
+
+
+
+
+
+ org.apache.flink
+ flink-architecture-tests-base
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-architecture-tests-test
+ ${flink.version}
+ test
+
+
+
+ org.apache.flink
+ flink-architecture-tests-production
+ ${flink.version}
+ test
+
+
+
+
+ com.google.code.findbugs
+ jsr305
+ 1.3.9
+
+
+
+ commons-codec
+ commons-codec
+ 1.15
+
+
+
+ org.apache.httpcomponents
+ httpcore
+ 4.4.14
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.13
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+
+
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ ${log4j.version}
+
+
+
+
+ com.fasterxml.jackson
+ jackson-bom
+ pom
+ import
+ ${jackson-bom.version}
+
+
+
+
+ org.junit
+ junit-bom
+ ${junit5.version}
+ pom
+ import
+
+
+
+ org.assertj
+ assertj-core
+ ${assertj.version}
+ test
+
+
+
+
+ com.esotericsoftware.kryo
+ kryo
+ 2.24.0
+
+
+
+
+ org.objenesis
+ objenesis
+ 2.1
+
+
+
+ org.testcontainers
+ testcontainers-bom
+ ${testcontainers.version}
+ pom
+ import
+
+
+
+ com.tngtech.archunit
+ archunit
+ ${archunit.version}
+ test
+
+
+
+ com.tngtech.archunit
+ archunit-junit5
+ ${archunit.version}
+ test
+
+
+
+
+
+
+
+
+
+
+ sql-jars
+
+
+ !skipSqlJars
+
+
+
+
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ false
+
+
+ org.apache.flink
+ flink-ci-tools
+ ${flink.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+
+ io.github.zentol.japicmp
+ japicmp-maven-plugin
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ false
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ com.diffplug.spotless
+ spotless-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ org.commonjava.maven.plugins
+ directory-maven-plugin
+
+
+
+
\ No newline at end of file
diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties
new file mode 100644
index 0000000..b28a9e3
--- /dev/null
+++ b/tools/ci/log4j.properties
@@ -0,0 +1,43 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = ConsoleAppender
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+appender.file.name = FileAppender
+appender.file.type = FILE
+appender.file.fileName = ${sys:log.dir}/mvn-${sys:mvn.forkNumber:-output}.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
+appender.file.createOnDemand = true
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
+logger.netty.level = ERROR
\ No newline at end of file
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 0000000..2841ea4
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,561 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
new file mode 100644
index 0000000..f0de8cd
--- /dev/null
+++ b/tools/maven/suppressions.xml
@@ -0,0 +1,26 @@
+
+
+
+
+
+
+
\ No newline at end of file