Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Change log
==========

X.X.X
-----

* Add PubSub functionality: publish/subscribe/psubscribe/unsubscribe/punsubscribe

0.9.0
-----

Expand Down
68 changes: 67 additions & 1 deletion src/eredis_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@
%% Specific pools (Redis nodes), named cluster
-export([get_pool_by_command/2, get_pool_by_key/2, get_all_pools/1]).

%% Get key slot and node info
-export([get_key_slot_node/1, get_key_slot_node/2]).

%% PubSub functionality (default cluster)
-export([publish/2, spublish/2]).

%% PubSub functionality (named cluster)
-export([publish/3, spublish/3]).

-ifdef(TEST).
-export([get_key_slot/1]).
-export([get_key_from_command/1]).
Expand Down Expand Up @@ -1214,7 +1223,7 @@ arg_after_keyword(Keyword, [Arg|Args]) ->
end.

memory_arg([Subcommand | Args]) when is_binary(Subcommand) ->
memory_arg([binary_to_list(Subcommand) | Args]);
memory_arg([binary_to_list(Subcommand)|Args]);
memory_arg([Subcommand | Args]) ->
case string:to_lower(Subcommand) of
"usage" -> nth_arg(1, Args);
Expand All @@ -1238,6 +1247,63 @@ resource_queue_redesign_log(Cluster, Command, Result) ->
ok
end.

%% =============================================================================
%% @doc Publish a message to a channel.
%%
%% Returns the number of clients that received the message.
%% @end
%% =============================================================================
-spec publish(Channel::anystring(), Message::anystring()) ->
{ok, non_neg_integer()} | {error, Reason::term()}.
publish(Channel, Message) ->
publish(?default_cluster, Channel, Message).

%% @doc Publish a message to a channel on a specific cluster.
-spec publish(Cluster::atom(), Channel::anystring(), Message::anystring()) ->
{ok, non_neg_integer()} | {error, Reason::term()}.
publish(Cluster, Channel, Message) ->
Command = ["PUBLISH", Channel, Message],
query(Cluster, Command, Channel).

%% =============================================================================
%% @doc Publish a message using the SPUBLISH command (Redis 7.0+).
%%
%% SPUBLISH is a global publish command introduced in Redis 7.0 that
%% ensures the message is published to all shards in the cluster.
%% Returns the number of clients that received the message.
%% @end
%% =============================================================================
-spec spublish(Channel::anystring(), Message::anystring()) ->
{ok, non_neg_integer()} | {error, Reason::term()}.
spublish(Channel, Message) ->
spublish(?default_cluster, Channel, Message).

%% @doc Publish a message using the SPUBLISH command (Redis 7.0+) in a named cluster.
-spec spublish(Cluster::atom(), Channel::anystring(), Message::anystring()) ->
{ok, non_neg_integer()} | {error, Reason::term()}.
spublish(Cluster, Channel, Message) ->
Command = ["SPUBLISH", Channel, Message],
% Using Channel as routing key since SPUBLISH handles global publishing automatically
query(Cluster, Command, Channel).

%% =============================================================================
%% @doc Returns the hash slot and the connection pool for the Redis node responsible
%% for the key in the default cluster.
%% @end
%% =============================================================================
-spec get_key_slot_node(Key::anystring()) -> {integer(), atom()} | {integer(), undefined}.
get_key_slot_node(Key) ->
get_key_slot_node(?default_cluster, Key).

%% @doc Like get_key_slot_node/1 for a named cluster.
-spec get_key_slot_node(Cluster::atom(), Key::anystring()) ->
{integer(), atom()} | {integer(), undefined}.
get_key_slot_node(Cluster, Key) ->
Slot = get_key_slot(Key),
State = eredis_cluster_monitor:get_state(Cluster),
{Pool, _Version} = eredis_cluster_monitor:get_pool_by_slot(Slot, State),
{Slot, Pool}.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

Expand Down