Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,10 @@

%% Queue actions that we will process later such that we can confirm and reject
%% delivery IDs in ranges to reduce the number of DISPOSITION frames sent to the client.
stashed_rejected = [] :: [{rejected, rabbit_amqqueue:name(), [delivery_number(),...]}],
stashed_rejected = [] :: [{rejected,
rabbit_amqqueue:name(),
rabbit_queue_type:reject_reason(),
[delivery_number(),...]}],
stashed_settled = [] :: [{settled, rabbit_amqqueue:name(), [delivery_number(),...]}],
%% Classic queues that are down.
stashed_down = []:: [rabbit_amqqueue:name()],
Expand Down Expand Up @@ -692,7 +695,17 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
%% Order is important:
%% 1. Process queue rejections.
{RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0),
send_dispositions(RejectedIds, #'v1_0.rejected'{}, Writer, ChannelNum),
maps:foreach(
fun({QNameBin, Reason}, Ids) ->
Info = {map,
[{{symbol, <<"queue">>}, {utf8, QNameBin}},
{{symbol, <<"reason">>}, {symbol, reject_reason_to_binary(Reason)}}]},
Rej = #'v1_0.rejected'{
error = #'v1_0.error'{
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
info = Info}},
send_dispositions(Ids, Rej, Writer, ChannelNum)
end, RejectedIds),
%% 2. Process queue confirmations.
{AcceptedIds0, GrantCredits1, State2} = handle_stashed_settled(GrantCredits0, State1),
%% 3. Process unavailable classic queues.
Expand All @@ -716,22 +729,28 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
State.

handle_stashed_rejected(#state{stashed_rejected = []} = State) ->
{[], #{}, State};
{#{}, #{}, State};
handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
stashed_rejected = Actions,
incoming_links = Links} = State0) ->
{Ids, GrantCredits, Ls} =
lists:foldl(
fun({rejected, _QName, Correlations}, Accum) ->
fun({rejected, #resource{name = QNameBin}, Reason, Correlations}, Accum) ->
lists:foldl(
fun({HandleInt, DeliveryId}, {Ids0, GrantCreds0, Links0} = Acc) ->
case Links0 of
#{HandleInt := Link0 = #incoming_link{incoming_unconfirmed_map = U0}} ->
case maps:take(DeliveryId, U0) of
{{_, Settled, _}, U} ->
Ids1 = case Settled of
true -> Ids0;
false -> [DeliveryId | Ids0]
true ->
Ids0;
false ->
maps:update_with(
{QNameBin, Reason},
fun(L) -> [DeliveryId | L] end,
[DeliveryId],
Ids0)
end,
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
{Link, GrantCreds} = maybe_grant_link_credit(
Expand All @@ -745,7 +764,7 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
Acc
end
end, Accum, Correlations)
end, {[], #{}, Links}, Actions),
end, {#{}, #{}, Links}, Actions),

State = State0#state{stashed_rejected = [],
incoming_links = Ls},
Expand Down Expand Up @@ -2116,7 +2135,7 @@ handle_queue_actions(Actions, State) ->
lists:foldl(
fun ({settled, _QName, _DelIds} = Action, #state{stashed_settled = As} = S) ->
S#state{stashed_settled = [Action | As]};
({rejected, _QName, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
({rejected, _QName, _Reason, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
S#state{stashed_rejected = [Action | As]};
({deliver, CTag, AckRequired, Msgs}, S0) ->
lists:foldl(fun(Msg, S) ->
Expand Down Expand Up @@ -2584,6 +2603,11 @@ rejected(DeliveryId, Error) ->
settled = true,
state = #'v1_0.rejected'{error = Error}}.

reject_reason_to_binary(maxlen) ->
<<"maxlen">>;
reject_reason_to_binary(down) ->
<<"unavailable">>.

maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
true ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2787,7 +2787,7 @@ handle_queue_actions(Actions, State) ->
lists:foldl(
fun({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
({rejected, _QRef, _Reason, MsgSeqNos}, S0) ->
{U, Rej} =
lists:foldr(
fun(SeqNo, {U1, Acc}) ->
Expand Down
27 changes: 15 additions & 12 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ handle_event(QName, {reject_publish, SeqNo, _QPid},
%% It does not matter which queue rejected the message,
%% if any queue did, it should not be confirmed.
{U, Rejected} = reject_seq_no(SeqNo, U0),
Actions = [{rejected, QName, Rejected}],
Actions = [{rejected, QName, maxlen, Rejected}],
{ok, State#?STATE{unconfirmed = U}, Actions};
handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
unconfirmed = U0} = State0) ->
Expand All @@ -405,11 +405,19 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
maps:filter(fun (_, #msg_status{pending = Pids}) ->
lists:member(Pid, Pids)
end, U0)),
{Unconfirmed, Settled, Rejected} =
settle_seq_nos(MsgSeqNos, Pid, U0, down),
Actions = settlement_action(
settled, QName, Settled,
settlement_action(rejected, QName, Rejected, Actions0)),
{Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down),
Actions1 = case Rejected of
[] ->
Actions0;
_ ->
[{rejected, QName, down, Rejected} | Actions0]
end,
Actions = case Settled of
[] ->
Actions1;
_ ->
[{settled, QName, Settled} | Actions1]
end,
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
true ->
%% any abnormal exit should be considered a full reject of the
Expand All @@ -425,17 +433,12 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
end, [], U0),
U = maps:without(MsgIds, U0),
{ok, State#?STATE{unconfirmed = U},
[{rejected, QName, MsgIds} | Actions0]}
[{rejected, QName, down, MsgIds} | Actions0]}
end;
handle_event(_QName, Action, State)
when element(1, Action) =:= credit_reply ->
{ok, State, [Action]}.

settlement_action(_Type, _QRef, [], Acc) ->
Acc;
settlement_action(Type, QRef, MsgSeqs, Acc) ->
[{Type, QRef, MsgSeqs} | Acc].

supports_stateful_delivery() -> true.

-spec deliver([{amqqueue:target(), state()}],
Expand Down
9 changes: 5 additions & 4 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ wait_for_queue_deleted(QRef, N) ->
end.

-spec lookup_topology(state()) -> state().
lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) ->
lookup_topology(#state{queue_ref = #resource{virtual_host = Vhost,
kind = queue} = QRef} = State) ->
{ok, Q} = rabbit_amqqueue:lookup(QRef),
DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>,
fun(_Pol, QArg) -> QArg end, Q),
Expand All @@ -253,7 +254,7 @@ handle_queue_actions(Actions, State0) ->
S1 = handle_settled(QRef, MsgSeqs, S0),
S2 = ack(S1),
maybe_cancel_timer(S2);
({rejected, QRef, MsgSeqs}, S0) ->
({rejected, QRef, _Reason, MsgSeqs}, S0) ->
handle_rejected(QRef, MsgSeqs, S0);
({queue_down, _QRef}, S0) ->
%% target classic queue is down, but not deleted
Expand Down Expand Up @@ -291,8 +292,8 @@ rejected(SeqNo, Qs, Pendings)
Pendings);
false ->
?LOG_DEBUG("Ignoring rejection for unknown sequence number ~b "
"from target dead letter queues ~tp",
[SeqNo, Qs]),
"from target dead letter queues ~tp",
[SeqNo, Qs]),
Pendings
end.

Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@

-type queue_name() :: rabbit_amqqueue:name().
-type queue_state() :: term().
-type reject_reason() :: maxlen | down.
%% sequence number typically
-type correlation() :: term().
-type arguments() :: queue_arguments | consumer_arguments.
Expand All @@ -101,6 +102,7 @@
%% indicate to the queue type module that a message has been delivered
%% fully to the queue
{settled, queue_name(), [correlation()]} |
{rejected, queue_name(), reject_reason(), [correlation()]} |
{deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} |
{block | unblock, QueueName :: term()} |
credit_reply_action().
Expand Down Expand Up @@ -158,6 +160,7 @@
credit_reply_action/0,
action/0,
actions/0,
reject_reason/0,
settle_op/0,
queue_type/0,
credit/0,
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ deliver(QSs, Msg0, Options) ->
case deliver0(QName, Correlation, Msg, S0) of
{reject_publish, S} ->
{[{Q, S} | Qs],
[{rejected, QName, [Correlation]} | Actions]};
[{rejected, QName, maxlen, [Correlation]} | Actions]};
{ok, S, As} ->
{[{Q, S} | Qs], As ++ Actions}
end
Expand Down
Loading
Loading