Skip to content

Commit 9b11653

Browse files
committed
Provide queue name and reason to publisher for rejected messages
## What? 1. This commit adds the name of the queue that rejected a message as well as the reason why the message was rejected in the Rejected outcome for AMQP 1.0. 2. For MQTT 5.0 publishers, the reason is provided in the PUBACK packet. ## Why? It may be helpful for publishers to know which queue out of multilple target queues rejected the message. One such use case is described in #1443 It may also be helpful to know for publishers whether the given target queue rejected the message because its maximum queue length was reached or because the target queue happens to be unavailable. ## How? RabbitMQ will include the UTF-8 encoded queue name (rather than an AMQP address) as well as the reason in the `info` field of the Rejected outcome's `error` field: * `queue: <queue name>` * `reason: maxlen | unavailable` For MQTT 5.0, if the queue length limit is exceeded, RabbitMQ will return the reason code for `Quota exceeded` to the publisher.
1 parent 506176e commit 9b11653

File tree

10 files changed

+88
-41
lines changed

10 files changed

+88
-41
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,10 @@
373373

374374
%% Queue actions that we will process later such that we can confirm and reject
375375
%% delivery IDs in ranges to reduce the number of DISPOSITION frames sent to the client.
376-
stashed_rejected = [] :: [{rejected, rabbit_amqqueue:name(), [delivery_number(),...]}],
376+
stashed_rejected = [] :: [{rejected,
377+
rabbit_amqqueue:name(),
378+
rabbit_queue_type:reject_reason(),
379+
[delivery_number(),...]}],
377380
stashed_settled = [] :: [{settled, rabbit_amqqueue:name(), [delivery_number(),...]}],
378381
%% Classic queues that are down.
379382
stashed_down = []:: [rabbit_amqqueue:name()],
@@ -692,7 +695,17 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
692695
%% Order is important:
693696
%% 1. Process queue rejections.
694697
{RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0),
695-
send_dispositions(RejectedIds, #'v1_0.rejected'{}, Writer, ChannelNum),
698+
maps:foreach(
699+
fun({QNameBin, Reason}, Ids) ->
700+
Info = {map,
701+
[{{symbol, <<"queue">>}, {utf8, QNameBin}},
702+
{{symbol, <<"reason">>}, {symbol, reject_reason_to_binary(Reason)}}]},
703+
Rej = #'v1_0.rejected'{
704+
error = #'v1_0.error'{
705+
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
706+
info = Info}},
707+
send_dispositions(Ids, Rej, Writer, ChannelNum)
708+
end, RejectedIds),
696709
%% 2. Process queue confirmations.
697710
{AcceptedIds0, GrantCredits1, State2} = handle_stashed_settled(GrantCredits0, State1),
698711
%% 3. Process unavailable classic queues.
@@ -716,22 +729,28 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer,
716729
State.
717730

718731
handle_stashed_rejected(#state{stashed_rejected = []} = State) ->
719-
{[], #{}, State};
732+
{#{}, #{}, State};
720733
handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
721734
stashed_rejected = Actions,
722735
incoming_links = Links} = State0) ->
723736
{Ids, GrantCredits, Ls} =
724737
lists:foldl(
725-
fun({rejected, _QName, Correlations}, Accum) ->
738+
fun({rejected, #resource{name = QNameBin}, Reason, Correlations}, Accum) ->
726739
lists:foldl(
727740
fun({HandleInt, DeliveryId}, {Ids0, GrantCreds0, Links0} = Acc) ->
728741
case Links0 of
729742
#{HandleInt := Link0 = #incoming_link{incoming_unconfirmed_map = U0}} ->
730743
case maps:take(DeliveryId, U0) of
731744
{{_, Settled, _}, U} ->
732745
Ids1 = case Settled of
733-
true -> Ids0;
734-
false -> [DeliveryId | Ids0]
746+
true ->
747+
Ids0;
748+
false ->
749+
maps:update_with(
750+
{QNameBin, Reason},
751+
fun(L) -> [DeliveryId | L] end,
752+
[DeliveryId],
753+
Ids0)
735754
end,
736755
Link1 = Link0#incoming_link{incoming_unconfirmed_map = U},
737756
{Link, GrantCreds} = maybe_grant_link_credit(
@@ -745,7 +764,7 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit},
745764
Acc
746765
end
747766
end, Accum, Correlations)
748-
end, {[], #{}, Links}, Actions),
767+
end, {#{}, #{}, Links}, Actions),
749768

750769
State = State0#state{stashed_rejected = [],
751770
incoming_links = Ls},
@@ -2116,7 +2135,7 @@ handle_queue_actions(Actions, State) ->
21162135
lists:foldl(
21172136
fun ({settled, _QName, _DelIds} = Action, #state{stashed_settled = As} = S) ->
21182137
S#state{stashed_settled = [Action | As]};
2119-
({rejected, _QName, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
2138+
({rejected, _QName, _Reason, _DelIds} = Action, #state{stashed_rejected = As} = S) ->
21202139
S#state{stashed_rejected = [Action | As]};
21212140
({deliver, CTag, AckRequired, Msgs}, S0) ->
21222141
lists:foldl(fun(Msg, S) ->
@@ -2584,6 +2603,11 @@ rejected(DeliveryId, Error) ->
25842603
settled = true,
25852604
state = #'v1_0.rejected'{error = Error}}.
25862605

2606+
reject_reason_to_binary(maxlen) ->
2607+
<<"maxlen">>;
2608+
reject_reason_to_binary(down) ->
2609+
<<"unavailable">>.
2610+
25872611
maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) ->
25882612
case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of
25892613
true ->

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2787,7 +2787,7 @@ handle_queue_actions(Actions, State) ->
27872787
lists:foldl(
27882788
fun({settled, QRef, MsgSeqNos}, S0) ->
27892789
confirm(MsgSeqNos, QRef, S0);
2790-
({rejected, _QRef, MsgSeqNos}, S0) ->
2790+
({rejected, _QRef, _Reason, MsgSeqNos}, S0) ->
27912791
{U, Rej} =
27922792
lists:foldr(
27932793
fun(SeqNo, {U1, Acc}) ->

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ handle_event(QName, {reject_publish, SeqNo, _QPid},
390390
%% It does not matter which queue rejected the message,
391391
%% if any queue did, it should not be confirmed.
392392
{U, Rejected} = reject_seq_no(SeqNo, U0),
393-
Actions = [{rejected, QName, Rejected}],
393+
Actions = [{rejected, QName, maxlen, Rejected}],
394394
{ok, State#?STATE{unconfirmed = U}, Actions};
395395
handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
396396
unconfirmed = U0} = State0) ->
@@ -405,11 +405,19 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
405405
maps:filter(fun (_, #msg_status{pending = Pids}) ->
406406
lists:member(Pid, Pids)
407407
end, U0)),
408-
{Unconfirmed, Settled, Rejected} =
409-
settle_seq_nos(MsgSeqNos, Pid, U0, down),
410-
Actions = settlement_action(
411-
settled, QName, Settled,
412-
settlement_action(rejected, QName, Rejected, Actions0)),
408+
{Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down),
409+
Actions1 = case Rejected of
410+
[] ->
411+
Actions0;
412+
_ ->
413+
[{rejected, QName, down, Rejected} | Actions0]
414+
end,
415+
Actions = case Settled of
416+
[] ->
417+
Actions1;
418+
_ ->
419+
[{settled, QName, Settled} | Actions1]
420+
end,
413421
{ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
414422
true ->
415423
%% any abnormal exit should be considered a full reject of the
@@ -425,17 +433,12 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored,
425433
end, [], U0),
426434
U = maps:without(MsgIds, U0),
427435
{ok, State#?STATE{unconfirmed = U},
428-
[{rejected, QName, MsgIds} | Actions0]}
436+
[{rejected, QName, down, MsgIds} | Actions0]}
429437
end;
430438
handle_event(_QName, Action, State)
431439
when element(1, Action) =:= credit_reply ->
432440
{ok, State, [Action]}.
433441

434-
settlement_action(_Type, _QRef, [], Acc) ->
435-
Acc;
436-
settlement_action(Type, QRef, MsgSeqs, Acc) ->
437-
[{Type, QRef, MsgSeqs} | Acc].
438-
439442
supports_stateful_delivery() -> true.
440443

441444
-spec deliver([{amqqueue:target(), state()}],

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ wait_for_queue_deleted(QRef, N) ->
232232
end.
233233

234234
-spec lookup_topology(state()) -> state().
235-
lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) ->
235+
lookup_topology(#state{queue_ref = #resource{virtual_host = Vhost,
236+
kind = queue} = QRef} = State) ->
236237
{ok, Q} = rabbit_amqqueue:lookup(QRef),
237238
DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>,
238239
fun(_Pol, QArg) -> QArg end, Q),
@@ -253,7 +254,7 @@ handle_queue_actions(Actions, State0) ->
253254
S1 = handle_settled(QRef, MsgSeqs, S0),
254255
S2 = ack(S1),
255256
maybe_cancel_timer(S2);
256-
({rejected, QRef, MsgSeqs}, S0) ->
257+
({rejected, QRef, _Reason, MsgSeqs}, S0) ->
257258
handle_rejected(QRef, MsgSeqs, S0);
258259
({queue_down, _QRef}, S0) ->
259260
%% target classic queue is down, but not deleted
@@ -291,8 +292,8 @@ rejected(SeqNo, Qs, Pendings)
291292
Pendings);
292293
false ->
293294
?LOG_DEBUG("Ignoring rejection for unknown sequence number ~b "
294-
"from target dead letter queues ~tp",
295-
[SeqNo, Qs]),
295+
"from target dead letter queues ~tp",
296+
[SeqNo, Qs]),
296297
Pendings
297298
end.
298299

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181

8282
-type queue_name() :: rabbit_amqqueue:name().
8383
-type queue_state() :: term().
84+
-type reject_reason() :: maxlen | down.
8485
%% sequence number typically
8586
-type correlation() :: term().
8687
-type arguments() :: queue_arguments | consumer_arguments.
@@ -101,6 +102,7 @@
101102
%% indicate to the queue type module that a message has been delivered
102103
%% fully to the queue
103104
{settled, queue_name(), [correlation()]} |
105+
{rejected, queue_name(), reject_reason(), [correlation()]} |
104106
{deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} |
105107
{block | unblock, QueueName :: term()} |
106108
credit_reply_action().
@@ -158,6 +160,7 @@
158160
credit_reply_action/0,
159161
action/0,
160162
actions/0,
163+
reject_reason/0,
161164
settle_op/0,
162165
queue_type/0,
163166
credit/0,

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1125,7 +1125,7 @@ deliver(QSs, Msg0, Options) ->
11251125
case deliver0(QName, Correlation, Msg, S0) of
11261126
{reject_publish, S} ->
11271127
{[{Q, S} | Qs],
1128-
[{rejected, QName, [Correlation]} | Actions]};
1128+
[{rejected, QName, maxlen, [Correlation]} | Actions]};
11291129
{ok, S, As} ->
11301130
{[{Q, S} | Qs], As ++ Actions}
11311131
end

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ invalid_transfer_settled_flag(Config) ->
10531053

10541054
quorum_queue_rejects(Config) ->
10551055
{_, Session, LinkPair} = Init = init(Config),
1056-
QName = atom_to_binary(?FUNCTION_NAME),
1056+
QName = <<"🎄"/utf8>>,
10571057
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
10581058
<<"x-max-length">> => {ulong, 1},
10591059
<<"x-overflow">> => {utf8, <<"reject-publish">>}}},
@@ -1068,12 +1068,11 @@ quorum_queue_rejects(Config) ->
10681068
%% Therefore, we expect the first 2 messages to be accepted.
10691069
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag a">>, <<>>, false)),
10701070
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag b">>, <<>>, false)),
1071-
[receive {amqp10_disposition, {accepted, DTag}} -> ok
1072-
after 30000 -> ct:fail({missing_accepted, DTag})
1073-
end || DTag <- [<<"tag a">>, <<"tag b">>]],
1071+
ok = wait_for_accepted(<<"tag a">>),
1072+
ok = wait_for_accepted(<<"tag b">>),
10741073

10751074
%% From now on the quorum queue should reject our publishes.
1076-
%% Send many messages aync.
1075+
%% Send many messages async.
10771076
NumMsgs = 20,
10781077
DTags = [begin
10791078
DTag = integer_to_binary(N),
@@ -1086,8 +1085,13 @@ quorum_queue_rejects(Config) ->
10861085
%% and the final one as unsettled again
10871086
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag d">>, <<>>, false)),
10881087

1089-
[receive {amqp10_disposition, {rejected, DTag}} -> ok
1090-
after 30000 -> ct:fail({missing_rejected, DTag})
1088+
[receive {amqp10_disposition, {{rejected, Error}, DTag}} ->
1089+
?assertMatch(
1090+
#'v1_0.error'{
1091+
info = {map, [{{symbol, <<"queue">>}, {utf8, QName}},
1092+
{{symbol, <<"reason">>}, {symbol, <<"maxlen">>}}]}},
1093+
Error)
1094+
after 9000 -> ct:fail({missing_rejected, DTag})
10911095
end || DTag <- DTags ++ [<<"tag d">>]],
10921096

10931097
ok = amqp10_client:detach_link(Sender),
@@ -3659,7 +3663,11 @@ target_classic_queue_down(Config) ->
36593663
%% Instead, the server should reject messages that are sent to classic queues that are down.
36603664
DTag2 = <<"t2">>,
36613665
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
3662-
ok = wait_for_settlement(DTag2, rejected),
3666+
ExpectedErr = #'v1_0.error'{
3667+
condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
3668+
info = {map, [{{symbol, <<"queue">>}, {utf8, QName}},
3669+
{{symbol, <<"reason">>}, {symbol, <<"unavailable">>}}]}},
3670+
ok = wait_for_settlement(DTag2, {rejected, ExpectedErr}),
36633671

36643672
ok = rabbit_ct_broker_helpers:start_node(Config, ClassicQueueNode),
36653673
%% Now that the classic queue is up again, we should be able to attach a new receiver

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,15 +2053,20 @@ handle_queue_actions(Actions, #state{} = State0) ->
20532053
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
20542054
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
20552055
S#state{unacked_client_pubs = U};
2056-
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
2057-
cfg = #cfg{proto_ver = ProtoVer}}) ->
2056+
({rejected, _QName, Reason, PktIds},
2057+
#state{unacked_client_pubs = U0,
2058+
cfg = #cfg{proto_ver = ProtoVer}} = S0) ->
20582059
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
20592060
S = S0#state{unacked_client_pubs = U},
20602061
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
20612062
%% rejected messages since we can only (but must not) send a positive ack.
20622063
case ProtoVer of
20632064
?MQTT_PROTO_V5 ->
2064-
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
2065+
RC = case Reason of
2066+
maxlen -> ?RC_QUOTA_EXCEEDED;
2067+
_ -> ?RC_IMPLEMENTATION_SPECIFIC_ERROR
2068+
end,
2069+
send_puback(RejectPktIds, RC, S);
20652070
_ ->
20662071
ok
20672072
end,

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,11 +384,13 @@ quorum_queue_rejects(Config) ->
384384
%% The queue will reject m3.
385385
V = ?config(mqtt_version, Config),
386386
if V =:= v3 orelse V =:= v4 ->
387-
%% v3 and v4 do not support NACKs. Therefore, the server should drop the message.
387+
%% v3 and v4 do not support NACKs.
388+
%% Therefore, the server should drop the message.
388389
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700));
389390
V =:= v5 ->
390-
%% v5 supports NACKs. Therefore, the server should send us a NACK.
391-
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
391+
%% v5 supports NACKs.
392+
%% Therefore, the server should send us a NACK with the correct reason.
393+
?assertMatch({ok, #{reason_code_name := quota_exceeded}},
392394
emqtt:publish(C, Name, <<"m3">>, qos1))
393395
end,
394396

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,8 @@ handle_dest_queue_actions(Actions, State) ->
656656
lists:foldl(
657657
fun({settled, QName, MsgSeqNos}, S0) ->
658658
confirm(MsgSeqNos, QName, S0);
659-
({rejected, _QName, MsgSeqNos}, #{dest := Dst = #{unconfirmed := U0}} = S0) ->
659+
({rejected, _QName, _Reason, MsgSeqNos},
660+
#{dest := Dst = #{unconfirmed := U0}} = S0) ->
660661
{U, Rej} =
661662
lists:foldr(
662663
fun(SeqNo, {U1, Acc}) ->

0 commit comments

Comments
 (0)