Skip to content

Commit c4fdca4

Browse files
committed
Provide queue name and reason to publisher for rejected messages
## What? 1. This commit adds the name of the queue that rejected a message as well as the reason why the message was rejected in the Rejected outcome for AMQP 1.0. 2. For MQTT 5.0 publishers, the reason is provided in the PUBACK packet. ## Why? It may be helpful for publishers to know which queue out of multilple target queues rejected the message. One such use case is described in #1443 It may also be helpful to know for publishers whether the given target queue rejected the message because its maximum queue length was reached or because the target queue happens to be unavailable. ## How? RabbitMQ will include the UTF-8 encoded queue name (rather than an AMQP address) as well as the reason in the `info` field of the Rejected outcome's `error` field: * `queue: <queue name>` * `reason: maxlen | unavailable` For MQTT 5.0, if the queue length limit is exceeded, RabbitMQ will return the reason code for `Quota exceeded` to the publisher.
1 parent 8911fa6 commit c4fdca4

File tree

11 files changed

+229
-124
lines changed

11 files changed

+229
-124
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,10 @@
373373

374374
%% Queue actions that we will process later such that we can confirm and reject
375375
%% delivery IDs in ranges to reduce the number of DISPOSITION frames sent to the client.
376-
stashed_rejected = [] :: [{rejected, rabbit_amqqueue:name(), [delivery_number(),...]}],
376+
stashed_rejected = [] :: [{rejected,
377+
rabbit_amqqueue:name(),
378+
rabbit_queue_type:reject_reason(),
379+
[delivery_number(),...]}],
377380
stashed_settled = [] :: [{settled, rabbit_amqqueue:name(), [delivery_number(),...]}],
378381
%% Classic queues that are down.
379382
stashed_down = []:: [rabbit_amqqueue:name()],
@@ -692,7 +695,17 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
692695
%% Order is important:
693696
%% 1. Process queue rejections.
694697
{RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0),
695-
send_dispositions(RejectedIds, #'v1_0.rejected'{}, Writer, ChannelNum),
698+
maps:foreach(
699+
fun({QNameBin, Reason}, Ids) ->
700+
Info = {map,
701+
[{{symbol, <<"queue">>}, {utf8, QNameBin}},
702+
{{symbol, <<"reason">>}, {symbol, reject_reason_to_binary(Reason)}}]},
703+
Rej = #'v1_0.rejected'{
704+
error = #'v1_0.error'{
705+
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
706+
info = Info}},
707+
send_dispositions(Ids, Rej, Writer, ChannelNum)
708+
end, RejectedIds),
696709
%% 2. Process queue confirmations.
697710
{AcceptedIds0, GrantCredits1, State2} = handle_stashed_settled(GrantCredits0, State1),
698711
%% 3. Process unavailable classic queues.
@@ -716,22 +729,28 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
716729
State.
717730

718731
handle_stashed_rejected(#state{stashed_rejected = []} = State) ->
719-
{[], #{}, State};
732+
{#{}, #{}, State};
720733
handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
721734
stashed_rejected = Actions,
722735
incoming_links = Links} = State0) ->
723736
{Ids, GrantCredits, Ls} =
724737
lists:foldl(
725-
fun({rejected, _QName, Correlations}, Accum) ->
738+
fun({rejected, #resource{name = QNameBin}, Reason, Correlations}, Accum) ->
726739
lists:foldl(
727740
fun({HandleInt, DeliveryId}, {Ids0, GrantCreds0, Links0} = Acc) ->
728741
case Links0 of
729742
#{HandleInt := Link0 = #incoming_link{incoming_unconfirmed_map = U0}} ->
730743
case maps:take(DeliveryId, U0) of
731744
{{_, Settled, _}, U} ->
732745
Ids1 = case Settled of
733-
true -> Ids0;
734-
false -> [DeliveryId | Ids0]
746+
true ->
747+
Ids0;
748+
false ->
749+
maps:update_with(
750+
{QNameBin, Reason},
751+
fun(L) -> [DeliveryId | L] end,
752+
[DeliveryId],
753+
Ids0)
735754
end,
736755
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
737756
{Link, GrantCreds} = maybe_grant_link_credit(
@@ -745,7 +764,7 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
745764
Acc
746765
end
747766
end, Accum, Correlations)
748-
end, {[], #{}, Links}, Actions),
767+
end, {#{}, #{}, Links}, Actions),
749768

750769
State = State0#state{stashed_rejected = [],
751770
incoming_links = Ls},
@@ -2116,7 +2135,7 @@ handle_queue_actions(Actions, State) ->
21162135
lists:foldl(
21172136
fun ({settled, _QName, _DelIds} = Action, #state{stashed_settled = As} = S) ->
21182137
S#state{stashed_settled = [Action | As]};
2119-
({rejected, _QName, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
2138+
({rejected, _QName, _Reason, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
21202139
S#state{stashed_rejected = [Action | As]};
21212140
({deliver, CTag, AckRequired, Msgs}, S0) ->
21222141
lists:foldl(fun(Msg, S) ->
@@ -2584,6 +2603,11 @@ rejected(DeliveryId, Error) ->
25842603
settled = true,
25852604
state = #'v1_0.rejected'{error = Error}}.
25862605

2606+
reject_reason_to_binary(maxlen) ->
2607+
<<"maxlen">>;
2608+
reject_reason_to_binary(down) ->
2609+
<<"unavailable">>.
2610+
25872611
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
25882612
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
25892613
true ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2787,7 +2787,7 @@ handle_queue_actions(Actions, State) ->
27872787
lists:foldl(
27882788
fun({settled, QRef, MsgSeqNos}, S0) ->
27892789
confirm(MsgSeqNos, QRef, S0);
2790-
({rejected, _QRef, MsgSeqNos}, S0) ->
2790+
({rejected, _QRef, _Reason, MsgSeqNos}, S0) ->
27912791
{U, Rej} =
27922792
lists:foldr(
27932793
fun(SeqNo, {U1, Acc}) ->

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ handle_event(QName, {reject_publish, SeqNo, _QPid},
390390
%% It does not matter which queue rejected the message,
391391
%% if any queue did, it should not be confirmed.
392392
{U, Rejected} = reject_seq_no(SeqNo, U0),
393-
Actions = [{rejected, QName, Rejected}],
393+
Actions = [{rejected, QName, maxlen, Rejected}],
394394
{ok, State#?STATE{unconfirmed = U}, Actions};
395395
handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
396396
unconfirmed = U0} = State0) ->
@@ -405,11 +405,19 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
405405
maps:filter(fun (_, #msg_status{pending = Pids}) ->
406406
lists:member(Pid, Pids)
407407
end, U0)),
408-
{Unconfirmed, Settled, Rejected} =
409-
settle_seq_nos(MsgSeqNos, Pid, U0, down),
410-
Actions = settlement_action(
411-
settled, QName, Settled,
412-
settlement_action(rejected, QName, Rejected, Actions0)),
408+
{Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down),
409+
Actions1 = case Rejected of
410+
[] ->
411+
Actions0;
412+
_ ->
413+
[{rejected, QName, down, Rejected} | Actions0]
414+
end,
415+
Actions = case Settled of
416+
[] ->
417+
Actions1;
418+
_ ->
419+
[{settled, QName, Settled} | Actions1]
420+
end,
413421
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
414422
true ->
415423
%% any abnormal exit should be considered a full reject of the
@@ -425,17 +433,12 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
425433
end, [], U0),
426434
U = maps:without(MsgIds, U0),
427435
{ok, State#?STATE{unconfirmed = U},
428-
[{rejected, QName, MsgIds} | Actions0]}
436+
[{rejected, QName, down, MsgIds} | Actions0]}
429437
end;
430438
handle_event(_QName, Action, State)
431439
when element(1, Action) =:= credit_reply ->
432440
{ok, State, [Action]}.
433441

434-
settlement_action(_Type, _QRef, [], Acc) ->
435-
Acc;
436-
settlement_action(Type, QRef, MsgSeqs, Acc) ->
437-
[{Type, QRef, MsgSeqs} | Acc].
438-
439442
supports_stateful_delivery() -> true.
440443

441444
-spec deliver([{amqqueue:target(), state()}],

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ wait_for_queue_deleted(QRef, N) ->
232232
end.
233233

234234
-spec lookup_topology(state()) -> state().
235-
lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) ->
235+
lookup_topology(#state{queue_ref = #resource{virtual_host = Vhost,
236+
kind = queue} = QRef} = State) ->
236237
{ok, Q} = rabbit_amqqueue:lookup(QRef),
237238
DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>,
238239
fun(_Pol, QArg) -> QArg end, Q),
@@ -253,7 +254,7 @@ handle_queue_actions(Actions, State0) ->
253254
S1 = handle_settled(QRef, MsgSeqs, S0),
254255
S2 = ack(S1),
255256
maybe_cancel_timer(S2);
256-
({rejected, QRef, MsgSeqs}, S0) ->
257+
({rejected, QRef, _Reason, MsgSeqs}, S0) ->
257258
handle_rejected(QRef, MsgSeqs, S0);
258259
({queue_down, _QRef}, S0) ->
259260
%% target classic queue is down, but not deleted
@@ -291,8 +292,8 @@ rejected(SeqNo, Qs, Pendings)
291292
Pendings);
292293
false ->
293294
?LOG_DEBUG("Ignoring rejection for unknown sequence number ~b "
294-
"from target dead letter queues ~tp",
295-
[SeqNo, Qs]),
295+
"from target dead letter queues ~tp",
296+
[SeqNo, Qs]),
296297
Pendings
297298
end.
298299

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181

8282
-type queue_name() :: rabbit_amqqueue:name().
8383
-type queue_state() :: term().
84+
-type reject_reason() :: maxlen | down.
8485
%% sequence number typically
8586
-type correlation() :: term().
8687
-type arguments() :: queue_arguments | consumer_arguments.
@@ -101,6 +102,7 @@
101102
%% indicate to the queue type module that a message has been delivered
102103
%% fully to the queue
103104
{settled, queue_name(), [correlation()]} |
105+
{rejected, queue_name(), reject_reason(), [correlation()]} |
104106
{deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} |
105107
{block | unblock, QueueName :: term()} |
106108
credit_reply_action().
@@ -158,6 +160,7 @@
158160
credit_reply_action/0,
159161
action/0,
160162
actions/0,
163+
reject_reason/0,
161164
settle_op/0,
162165
queue_type/0,
163166
credit/0,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1125,7 +1125,7 @@ deliver(QSs, Msg0, Options) ->
11251125
case deliver0(QName, Correlation, Msg, S0) of
11261126
{reject_publish, S} ->
11271127
{[{Q, S} | Qs],
1128-
[{rejected, QName, [Correlation]} | Actions]};
1128+
[{rejected, QName, maxlen, [Correlation]} | Actions]};
11291129
{ok, S, As} ->
11301130
{[{Q, S} | Qs], As ++ Actions}
11311131
end

0 commit comments

Comments
 (0)