Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 47 additions & 34 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1513,40 +1513,53 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
{ok, pos_integer()} | {error, pos_integer(), term()}}].
shrink_all(Node) ->
?LOG_INFO("Asked to remove all quorum queue replicas from node ~ts", [Node]),
[begin
QName = amqqueue:get_name(Q),
?LOG_INFO("~ts: removing member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
Size = length(get_nodes(Q)),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, cluster_change_not_permitted} ->
%% this could be timing related and due to a new leader just being
%% elected but it's noop command not been committed yet.
%% lets sleep and retry once
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
"as cluster change is not permitted. "
"retrying once in 500ms",
[rabbit_misc:rs(QName), Node]),
timer:sleep(500),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, Err} ->
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end;
{error, Err} ->
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end
end || Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
lists:member(Node, get_nodes(Q))].

Parent = self(),
%% This operation is mostly bound by I/O so this default is set high:
Size = application:get_env(rabbit, quorum_queue_shrink_batch_size, 64),
Chunks = ra_lib:lists_chunk(Size, [Q || Q <- rabbit_amqqueue:list(),
amqqueue:get_type(Q) == ?MODULE,
lists:member(Node, get_nodes(Q))]),
lists:append([begin
Running = [spawn(fun() ->
Res = shrink(Node, Q),
Parent ! {self(), Res}
end) || Q <- Chunk],
[receive
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should introduce a timeout here. All timeout defaults are wrong but something like 10s or 15s won't suffer from false positives much but avoids blocking the parent process.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wonder if we should also spawn_monitor and look for 'DOWN's as well. I haven't seen this code crash before but it wouldn't hurt to catch exits and avoid this list comprehension getting stuck

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be even better. Both are just typical defensive techniques.

{Pid, Res} ->
Res
end || Pid <- Running]
end || Chunk <- Chunks]).

shrink(Node, Q) ->
QName = amqqueue:get_name(Q),
?LOG_INFO("~ts: removing member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
Size = length(get_nodes(Q)),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, cluster_change_not_permitted} ->
%% this could be timing related and due to a new leader just being
%% elected but it's noop command not been committed yet.
%% lets sleep and retry once
?LOG_INFO("~ts: failed to remove member (replica) on node ~w "
"as cluster change is not permitted. "
"retrying once in 500ms",
[rabbit_misc:rs(QName), Node]),
timer:sleep(500),
case delete_member(Q, Node) of
ok ->
{QName, {ok, Size-1}};
{error, Err} ->
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end;
{error, Err} ->
?LOG_WARNING("~ts: failed to remove member (replica) on node ~w, error: ~w",
[rabbit_misc:rs(QName), Node, Err]),
{QName, {error, Size, Err}}
end.

grow(Node, VhostSpec, QueueSpec, Strategy) ->
grow(Node, VhostSpec, QueueSpec, Strategy, promotable).
Expand Down
Loading