Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.RedisClusterClient;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.providers.ClusterConnectionProvider;

import java.time.Duration;
Expand Down Expand Up @@ -66,6 +68,8 @@
* Uses the native {@link JedisCluster} api where possible and falls back to direct node communication using
* {@link Jedis} where needed.
* <p>
* Pipelines and transactions are not supported in cluster mode.
* <p>
* This class is not Thread-safe and instances should not be shared across threads.
*
* @author Christoph Strobl
Expand All @@ -76,17 +80,32 @@
* @author Pavel Khokhlov
* @author Liming Deng
* @author John Blum
* @author Tihomir Mateev
* @since 1.7
*/
@NullUnmarked
public class JedisClusterConnection implements RedisClusterConnection {
public class JedisClusterConnection extends JedisConnection implements RedisClusterConnection {

private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
JedisExceptionConverter.INSTANCE);

private final Log log = LogFactory.getLog(getClass());

private final JedisCluster cluster;
private final UnifiedJedis cluster;

/**
* Cluster-safe invoker that only supports direct execution.
* Pipelines and transactions are not supported in cluster mode.
*/
private final JedisInvoker clusterInvoker = new JedisInvoker((directFunction, pipelineFunction, converter, nullDefault) -> {
try {
Object result = directFunction.apply(getCluster());
return result != null ? converter.convert(result) : nullDefault.get();
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
});

private final JedisClusterGeoCommands geoCommands = new JedisClusterGeoCommands(this);
private final JedisClusterHashCommands hashCommands = new JedisClusterHashCommands(this);
private final JedisClusterHyperLogLogCommands hllCommands = new JedisClusterHyperLogLogCommands(this);
Expand All @@ -104,16 +123,16 @@ public class JedisClusterConnection implements RedisClusterConnection {
private final ClusterCommandExecutor clusterCommandExecutor;
private final boolean disposeClusterCommandExecutorOnClose;

private volatile @Nullable JedisSubscription subscription;

/**
* Create new {@link JedisClusterConnection} utilizing native connections via {@link JedisCluster}.
* Create new {@link JedisClusterConnection} utilizing native connections via {@link UnifiedJedis} based {@link RedisClusterClient}.
*
* @param cluster must not be {@literal null}.
*/
public JedisClusterConnection(@NonNull JedisCluster cluster) {
public JedisClusterConnection(@NonNull UnifiedJedis cluster) {

Assert.notNull(cluster, "JedisCluster must not be null");
super(cluster);

Assert.notNull(cluster, "UnifiedJedis must not be null");

this.cluster = cluster;

Expand All @@ -135,29 +154,31 @@ public JedisClusterConnection(@NonNull JedisCluster cluster) {
}

/**
* Create new {@link JedisClusterConnection} utilizing native connections via {@link JedisCluster} running commands
* Create new {@link JedisClusterConnection} utilizing native connections via {@link UnifiedJedis} running commands
* across the cluster via given {@link ClusterCommandExecutor}. Uses {@link JedisClusterTopologyProvider} by default.
*
* @param cluster must not be {@literal null}.
* @param executor must not be {@literal null}.
*/
public JedisClusterConnection(@NonNull JedisCluster cluster, @NonNull ClusterCommandExecutor executor) {
public JedisClusterConnection(@NonNull UnifiedJedis cluster, @NonNull ClusterCommandExecutor executor) {
this(cluster, executor, new JedisClusterTopologyProvider(cluster));
}

/**
* Create new {@link JedisClusterConnection} utilizing native connections via {@link JedisCluster} running commands
* Create new {@link JedisClusterConnection} utilizing native connections via {@link UnifiedJedis} running commands
* across the cluster via given {@link ClusterCommandExecutor} and using the given {@link ClusterTopologyProvider}.
*
* @param cluster must not be {@literal null}.
* @param executor must not be {@literal null}.
* @param topologyProvider must not be {@literal null}.
* @since 2.2
*/
public JedisClusterConnection(@NonNull JedisCluster cluster, @NonNull ClusterCommandExecutor executor,
public JedisClusterConnection(@NonNull UnifiedJedis cluster, @NonNull ClusterCommandExecutor executor,
@NonNull ClusterTopologyProvider topologyProvider) {

Assert.notNull(cluster, "JedisCluster must not be null");
super(cluster);

Assert.notNull(cluster, "UnifiedJedis must not be null");
Assert.notNull(executor, "ClusterCommandExecutor must not be null");
Assert.notNull(topologyProvider, "ClusterTopologyProvider must not be null");

Expand Down Expand Up @@ -354,60 +375,6 @@ public void unwatch() {
throw new InvalidDataAccessApiUsageException("UNWATCH is currently not supported in cluster mode");
}

@Override
public boolean isSubscribed() {
JedisSubscription subscription = this.subscription;
return (subscription != null && subscription.isAlive());
}

@Override
public Subscription getSubscription() {
return this.subscription;
}

@Override
public Long publish(byte @NonNull [] channel, byte @NonNull [] message) {

try {
return this.cluster.publish(channel, message);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

@Override
public void subscribe(@NonNull MessageListener listener, byte @NonNull [] @NonNull... channels) {

if (isSubscribed()) {
String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels";
throw new RedisSubscribedConnectionException(message);
}
try {
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
cluster.subscribe(jedisPubSub, channels);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

@Override
public void pSubscribe(@NonNull MessageListener listener, byte @NonNull [] @NonNull... patterns) {

if (isSubscribed()) {
String message = "Connection already subscribed; use the connection Subscription to cancel or add new channels";
throw new RedisSubscribedConnectionException(message);
}

try {
JedisMessageListener jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
cluster.psubscribe(jedisPubSub, patterns);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

@Override
public void select(int dbIndex) {

Expand Down Expand Up @@ -631,18 +598,8 @@ public ClusterInfo clusterGetClusterInfo() {
return new ClusterInfo(JedisConverters.toProperties(source));
}

/*
* Little helpers to make it work
*/
protected DataAccessException convertJedisAccessException(Exception cause) {

DataAccessException translated = EXCEPTION_TRANSLATION.translate(cause);

return translated != null ? translated : new RedisSystemException(cause.getMessage(), cause);
}

@Override
public void close() throws DataAccessException {
protected void doClose() {

if (!closed && disposeClusterCommandExecutorOnClose) {
try {
Expand All @@ -661,7 +618,7 @@ public boolean isClosed() {
}

@Override
public JedisCluster getNativeConnection() {
public UnifiedJedis getNativeConnection() {
return cluster;
}

Expand Down Expand Up @@ -723,7 +680,7 @@ protected interface JedisMultiKeyClusterCommandCallback<T> extends MultiKeyClust
@NullMarked
static class JedisClusterNodeResourceProvider implements ClusterNodeResourceProvider {

private final JedisCluster cluster;
private final UnifiedJedis cluster;
private final ClusterTopologyProvider topologyProvider;
private final @Nullable ClusterConnectionProvider connectionHandler;

Expand All @@ -733,7 +690,7 @@ static class JedisClusterNodeResourceProvider implements ClusterNodeResourceProv
* @param cluster should not be {@literal null}.
* @param topologyProvider must not be {@literal null}.
*/
JedisClusterNodeResourceProvider(JedisCluster cluster, ClusterTopologyProvider topologyProvider) {
JedisClusterNodeResourceProvider(UnifiedJedis cluster, ClusterTopologyProvider topologyProvider) {

this.cluster = cluster;
this.topologyProvider = topologyProvider;
Expand Down Expand Up @@ -767,7 +724,7 @@ public Jedis getResourceForSpecificNode(RedisClusterNode node) {

private @Nullable ConnectionPool getResourcePoolForSpecificNode(RedisClusterNode node) {

Map<String, ConnectionPool> clusterNodes = cluster.getClusterNodes();
Map<String, ConnectionPool> clusterNodes = getClusterNodesMap(cluster);
HostAndPort hap = JedisConverters.toHostAndPort(node);
String key = JedisClusterInfoCache.getNodeKey(hap);

Expand Down Expand Up @@ -810,7 +767,7 @@ public void returnResourceForSpecificNode(@NonNull RedisClusterNode node, @NonNu
@NullMarked
public static class JedisClusterTopologyProvider implements ClusterTopologyProvider {

private final JedisCluster cluster;
private final UnifiedJedis cluster;

private final long cacheTimeMs;

Expand All @@ -821,7 +778,7 @@ public static class JedisClusterTopologyProvider implements ClusterTopologyProvi
*
* @param cluster must not be {@literal null}.
*/
public JedisClusterTopologyProvider(JedisCluster cluster) {
public JedisClusterTopologyProvider(UnifiedJedis cluster) {
this(cluster, Duration.ofMillis(100));
}

Expand All @@ -832,9 +789,9 @@ public JedisClusterTopologyProvider(JedisCluster cluster) {
* @param cacheTimeout must not be {@literal null}.
* @since 2.2
*/
public JedisClusterTopologyProvider(JedisCluster cluster, Duration cacheTimeout) {
public JedisClusterTopologyProvider(UnifiedJedis cluster, Duration cacheTimeout) {

Assert.notNull(cluster, "JedisCluster must not be null");
Assert.notNull(cluster, "UnifiedJedis must not be null");
Assert.notNull(cacheTimeout, "Cache timeout must not be null");
Assert.isTrue(!cacheTimeout.isNegative(), "Cache timeout must not be negative");

Expand All @@ -852,7 +809,7 @@ public ClusterTopology getTopology() {
}

Map<String, Exception> errors = new LinkedHashMap<>();
List<Entry<String, ConnectionPool>> list = new ArrayList<>(cluster.getClusterNodes().entrySet());
List<Entry<String, ConnectionPool>> list = new ArrayList<>(getClusterNodesMap(cluster).entrySet());

Collections.shuffle(list);

Expand Down Expand Up @@ -885,7 +842,7 @@ public ClusterTopology getTopology() {
*
* @return {@literal true} to use the cached {@link ClusterTopology}; {@literal false} to fetch a new cluster
* topology.
* @see #JedisClusterTopologyProvider(JedisCluster, Duration)
* @see #JedisClusterTopologyProvider(UnifiedJedis, Duration)
* @since 3.3.4
*/
protected boolean shouldUseCachedValue(@Nullable JedisClusterTopology topology) {
Expand Down Expand Up @@ -923,15 +880,68 @@ long getMaxTime() {
}
}

protected JedisCluster getCluster() {
protected UnifiedJedis getCluster() {
return cluster;
}

@Override
public UnifiedJedis getJedis() {
return cluster;
}

/**
* Obtain a {@link JedisInvoker} to call Jedis methods on the cluster.
* <p>
* This invoker only supports direct execution mode. Pipelines and transactions
* are not supported in cluster mode.
*
* @return the {@link JedisInvoker}.
* @since 3.5
*/
@Override
public JedisInvoker invoke() {
return this.clusterInvoker;
}

/**
* Obtain a {@link JedisInvoker} for status commands on the cluster.
* <p>
* In cluster mode, this returns the same invoker as {@link #invoke()} since
* pipelines and transactions are not supported.
*
* @return the {@link JedisInvoker}.
* @since 3.5
*/
@Override
public JedisInvoker invokeStatus() {
return this.clusterInvoker;
}

protected ClusterCommandExecutor getClusterCommandExecutor() {
return clusterCommandExecutor;
}

protected ClusterTopologyProvider getTopologyProvider() {
return topologyProvider;
}

/**
* Get cluster nodes map from a {@link UnifiedJedis} instance. This method handles both
* {@link JedisCluster} and {@link RedisClusterClient} by invoking the {@code getClusterNodes()}
* method via reflection since it's not part of the {@link UnifiedJedis} base class.
*
* @param cluster the cluster client (either JedisCluster or RedisClusterClient)
* @return map of node addresses to connection pools
*/
@SuppressWarnings("unchecked")
static Map<String, ConnectionPool> getClusterNodesMap(UnifiedJedis cluster) {
if (cluster instanceof JedisCluster jedisCluster) {
return jedisCluster.getClusterNodes();
}
if (cluster instanceof RedisClusterClient redisClusterClient) {
return redisClusterClient.getClusterNodes();
}
throw new IllegalArgumentException(
"Unsupported UnifiedJedis type: " + cluster.getClass().getName() + ". Expected JedisCluster or RedisClusterClient.");
}
}
Loading
Loading