From c4fdca4fa9d09a90b95c2d400a6ade3d8f28d76d Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 5 Dec 2025 11:30:52 +0100 Subject: [PATCH] Provide queue name and reason to publisher for rejected messages ## What? 1. This commit adds the name of the queue that rejected a message as well as the reason why the message was rejected in the Rejected outcome for AMQP 1.0. 2. For MQTT 5.0 publishers, the reason is provided in the PUBACK packet. ## Why? It may be helpful for publishers to know which queue out of multilple target queues rejected the message. One such use case is described in https://github.com/rabbitmq/rabbitmq-server/issues/1443 It may also be helpful to know for publishers whether the given target queue rejected the message because its maximum queue length was reached or because the target queue happens to be unavailable. ## How? RabbitMQ will include the UTF-8 encoded queue name (rather than an AMQP address) as well as the reason in the `info` field of the Rejected outcome's `error` field: * `queue: ` * `reason: maxlen | unavailable` For MQTT 5.0, if the queue length limit is exceeded, RabbitMQ will return the reason code for `Quota exceeded` to the publisher. --- deps/rabbit/src/rabbit_amqp_session.erl | 40 +++- deps/rabbit/src/rabbit_channel.erl | 2 +- deps/rabbit/src/rabbit_classic_queue.erl | 27 ++- deps/rabbit/src/rabbit_fifo_dlx_worker.erl | 9 +- deps/rabbit/src/rabbit_queue_type.erl | 3 + deps/rabbit/src/rabbit_quorum_queue.erl | 2 +- deps/rabbit/test/amqp_client_SUITE.erl | 223 +++++++++++------- .../src/rabbit_mqtt_processor.erl | 11 +- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 8 +- .../src/rabbit_local_shovel.erl | 3 +- release-notes/4.3.0.md | 25 ++ 11 files changed, 229 insertions(+), 124 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 8e1f074dc5fd..8b12a2b1de86 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -373,7 +373,10 @@ %% Queue actions that we will process later such that we can confirm and reject %% delivery IDs in ranges to reduce the number of DISPOSITION frames sent to the client. - stashed_rejected = [] :: [{rejected, rabbit_amqqueue:name(), [delivery_number(),...]}], + stashed_rejected = [] :: [{rejected, + rabbit_amqqueue:name(), + rabbit_queue_type:reject_reason(), + [delivery_number(),...]}], stashed_settled = [] :: [{settled, rabbit_amqqueue:name(), [delivery_number(),...]}], %% Classic queues that are down. stashed_down = []:: [rabbit_amqqueue:name()], @@ -692,7 +695,17 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer, %% Order is important: %% 1. Process queue rejections. {RejectedIds, GrantCredits0, State1} = handle_stashed_rejected(State0), - send_dispositions(RejectedIds, #'v1_0.rejected'{}, Writer, ChannelNum), + maps:foreach( + fun({QNameBin, Reason}, Ids) -> + Info = {map, + [{{symbol, <<"queue">>}, {utf8, QNameBin}}, + {{symbol, <<"reason">>}, {symbol, reject_reason_to_binary(Reason)}}]}, + Rej = #'v1_0.rejected'{ + error = #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + info = Info}}, + send_dispositions(Ids, Rej, Writer, ChannelNum) + end, RejectedIds), %% 2. Process queue confirmations. {AcceptedIds0, GrantCredits1, State2} = handle_stashed_settled(GrantCredits0, State1), %% 3. Process unavailable classic queues. @@ -716,13 +729,13 @@ send_delivery_state_changes(State0 = #state{cfg = #cfg{writer_pid = Writer, State. handle_stashed_rejected(#state{stashed_rejected = []} = State) -> - {[], #{}, State}; + {#{}, #{}, State}; handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit}, stashed_rejected = Actions, incoming_links = Links} = State0) -> {Ids, GrantCredits, Ls} = lists:foldl( - fun({rejected, _QName, Correlations}, Accum) -> + fun({rejected, #resource{name = QNameBin}, Reason, Correlations}, Accum) -> lists:foldl( fun({HandleInt, DeliveryId}, {Ids0, GrantCreds0, Links0} = Acc) -> case Links0 of @@ -730,8 +743,14 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit}, case maps:take(DeliveryId, U0) of {{_, Settled, _}, U} -> Ids1 = case Settled of - true -> Ids0; - false -> [DeliveryId | Ids0] + true -> + Ids0; + false -> + maps:update_with( + {QNameBin, Reason}, + fun(L) -> [DeliveryId | L] end, + [DeliveryId], + Ids0) end, Link1 = Link0#incoming_link{incoming_unconfirmed_map = U}, {Link, GrantCreds} = maybe_grant_link_credit( @@ -745,7 +764,7 @@ handle_stashed_rejected(#state{cfg = #cfg{max_link_credit = MaxLinkCredit}, Acc end end, Accum, Correlations) - end, {[], #{}, Links}, Actions), + end, {#{}, #{}, Links}, Actions), State = State0#state{stashed_rejected = [], incoming_links = Ls}, @@ -2116,7 +2135,7 @@ handle_queue_actions(Actions, State) -> lists:foldl( fun ({settled, _QName, _DelIds} = Action, #state{stashed_settled = As} = S) -> S#state{stashed_settled = [Action | As]}; - ({rejected, _QName, _DelIds} = Action, #state{stashed_rejected = As} = S) -> + ({rejected, _QName, _Reason, _DelIds} = Action, #state{stashed_rejected = As} = S) -> S#state{stashed_rejected = [Action | As]}; ({deliver, CTag, AckRequired, Msgs}, S0) -> lists:foldl(fun(Msg, S) -> @@ -2584,6 +2603,11 @@ rejected(DeliveryId, Error) -> settled = true, state = #'v1_0.rejected'{error = Error}}. +reject_reason_to_binary(maxlen) -> + <<"maxlen">>; +reject_reason_to_binary(down) -> + <<"unavailable">>. + maybe_grant_link_credit(Credit, MaxLinkCredit, DeliveryCount, NumUnconfirmed, Handle) -> case grant_link_credit(Credit, MaxLinkCredit, NumUnconfirmed) of true -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 3c7c865fcb00..19175cbd65b3 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2787,7 +2787,7 @@ handle_queue_actions(Actions, State) -> lists:foldl( fun({settled, QRef, MsgSeqNos}, S0) -> confirm(MsgSeqNos, QRef, S0); - ({rejected, _QRef, MsgSeqNos}, S0) -> + ({rejected, _QRef, _Reason, MsgSeqNos}, S0) -> {U, Rej} = lists:foldr( fun(SeqNo, {U1, Acc}) -> diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index c817ef7fc110..ef54c61697e9 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -390,7 +390,7 @@ handle_event(QName, {reject_publish, SeqNo, _QPid}, %% It does not matter which queue rejected the message, %% if any queue did, it should not be confirmed. {U, Rejected} = reject_seq_no(SeqNo, U0), - Actions = [{rejected, QName, Rejected}], + Actions = [{rejected, QName, maxlen, Rejected}], {ok, State#?STATE{unconfirmed = U}, Actions}; handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored, unconfirmed = U0} = State0) -> @@ -405,11 +405,19 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored, maps:filter(fun (_, #msg_status{pending = Pids}) -> lists:member(Pid, Pids) end, U0)), - {Unconfirmed, Settled, Rejected} = - settle_seq_nos(MsgSeqNos, Pid, U0, down), - Actions = settlement_action( - settled, QName, Settled, - settlement_action(rejected, QName, Rejected, Actions0)), + {Unconfirmed, Settled, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down), + Actions1 = case Rejected of + [] -> + Actions0; + _ -> + [{rejected, QName, down, Rejected} | Actions0] + end, + Actions = case Settled of + [] -> + Actions1; + _ -> + [{settled, QName, Settled} | Actions1] + end, {ok, State#?STATE{unconfirmed = Unconfirmed}, Actions}; true -> %% any abnormal exit should be considered a full reject of the @@ -425,17 +433,12 @@ handle_event(QName, {down, Pid, Info}, #?STATE{monitored = Monitored, end, [], U0), U = maps:without(MsgIds, U0), {ok, State#?STATE{unconfirmed = U}, - [{rejected, QName, MsgIds} | Actions0]} + [{rejected, QName, down, MsgIds} | Actions0]} end; handle_event(_QName, Action, State) when element(1, Action) =:= credit_reply -> {ok, State, [Action]}. -settlement_action(_Type, _QRef, [], Acc) -> - Acc; -settlement_action(Type, QRef, MsgSeqs, Acc) -> - [{Type, QRef, MsgSeqs} | Acc]. - supports_stateful_delivery() -> true. -spec deliver([{amqqueue:target(), state()}], diff --git a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl index 006431a38b4c..71003bfc3f63 100644 --- a/deps/rabbit/src/rabbit_fifo_dlx_worker.erl +++ b/deps/rabbit/src/rabbit_fifo_dlx_worker.erl @@ -232,7 +232,8 @@ wait_for_queue_deleted(QRef, N) -> end. -spec lookup_topology(state()) -> state(). -lookup_topology(#state{queue_ref = {resource, Vhost, queue, _} = QRef} = State) -> +lookup_topology(#state{queue_ref = #resource{virtual_host = Vhost, + kind = queue} = QRef} = State) -> {ok, Q} = rabbit_amqqueue:lookup(QRef), DLRKey = rabbit_queue_type_util:args_policy_lookup(<<"dead-letter-routing-key">>, fun(_Pol, QArg) -> QArg end, Q), @@ -253,7 +254,7 @@ handle_queue_actions(Actions, State0) -> S1 = handle_settled(QRef, MsgSeqs, S0), S2 = ack(S1), maybe_cancel_timer(S2); - ({rejected, QRef, MsgSeqs}, S0) -> + ({rejected, QRef, _Reason, MsgSeqs}, S0) -> handle_rejected(QRef, MsgSeqs, S0); ({queue_down, _QRef}, S0) -> %% target classic queue is down, but not deleted @@ -291,8 +292,8 @@ rejected(SeqNo, Qs, Pendings) Pendings); false -> ?LOG_DEBUG("Ignoring rejection for unknown sequence number ~b " - "from target dead letter queues ~tp", - [SeqNo, Qs]), + "from target dead letter queues ~tp", + [SeqNo, Qs]), Pendings end. diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index f1e421402c85..25598de91d6e 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -81,6 +81,7 @@ -type queue_name() :: rabbit_amqqueue:name(). -type queue_state() :: term(). +-type reject_reason() :: maxlen | down. %% sequence number typically -type correlation() :: term(). -type arguments() :: queue_arguments | consumer_arguments. @@ -101,6 +102,7 @@ %% indicate to the queue type module that a message has been delivered %% fully to the queue {settled, queue_name(), [correlation()]} | + {rejected, queue_name(), reject_reason(), [correlation()]} | {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]} | {block | unblock, QueueName :: term()} | credit_reply_action(). @@ -158,6 +160,7 @@ credit_reply_action/0, action/0, actions/0, + reject_reason/0, settle_op/0, queue_type/0, credit/0, diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 03b147cb8a17..90a13066257d 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1125,7 +1125,7 @@ deliver(QSs, Msg0, Options) -> case deliver0(QName, Correlation, Msg, S0) of {reject_publish, S} -> {[{Q, S} | Qs], - [{rejected, QName, [Correlation]} | Actions]}; + [{rejected, QName, maxlen, [Correlation]} | Actions]}; {ok, S, As} -> {[{Q, S} | Qs], As ++ Actions} end diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 0be929e63f3a..1f49a0796d36 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -63,7 +63,6 @@ groups() -> durable_field_quorum_queue, durable_field_stream, invalid_transfer_settled_flag, - quorum_queue_rejects, receiver_settle_mode_first, publishing_to_non_existing_queue_should_settle_with_released, attach_link_to_non_existing_destination, @@ -187,7 +186,8 @@ groups() -> dead_letter_into_stream, last_queue_confirms, target_queue_deleted, - target_classic_queue_down, + target_quorum_queue_rejects, + target_classic_queue_rejects, async_notify_settled_classic_queue, async_notify_settled_quorum_queue, async_notify_settled_stream, @@ -1051,50 +1051,6 @@ invalid_transfer_settled_flag(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). -quorum_queue_rejects(Config) -> - {_, Session, LinkPair} = Init = init(Config), - QName = atom_to_binary(?FUNCTION_NAME), - QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, - <<"x-max-length">> => {ulong, 1}, - <<"x-overflow">> => {utf8, <<"reject-publish">>}}}, - {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), - - Address = rabbitmq_amqp_address:queue(QName), - {ok, Sender} = amqp10_client:attach_sender_link( - Session, <<"test-sender">>, Address, mixed), - ok = wait_for_credit(Sender), - - %% Quorum queue's x-max-length limit is known to be off by 1. - %% Therefore, we expect the first 2 messages to be accepted. - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag a">>, <<>>, false)), - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag b">>, <<>>, false)), - [receive {amqp10_disposition, {accepted, DTag}} -> ok - after 30000 -> ct:fail({missing_accepted, DTag}) - end || DTag <- [<<"tag a">>, <<"tag b">>]], - - %% From now on the quorum queue should reject our publishes. - %% Send many messages aync. - NumMsgs = 20, - DTags = [begin - DTag = integer_to_binary(N), - Msg = amqp10_msg:new(DTag, <<"body">>, false), - ok = amqp10_client:send_msg(Sender, Msg), - DTag - end || N <- lists:seq(1, NumMsgs)], - %% Since our sender settle mode is mixed, let's also test sending one as settled. - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag c">>, <<>>, true)), - %% and the final one as unsettled again - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag d">>, <<>>, false)), - - [receive {amqp10_disposition, {rejected, DTag}} -> ok - after 30000 -> ct:fail({missing_rejected, DTag}) - end || DTag <- DTags ++ [<<"tag d">>]], - - ok = amqp10_client:detach_link(Sender), - ?assertMatch({ok, #{message_count := 2}}, - rabbitmq_amqp_client:delete_queue(LinkPair, QName)), - ok = close(Init). - receiver_settle_mode_first(Config) -> QName = atom_to_binary(?FUNCTION_NAME), {Connection, Session, LinkPair} = init(Config), @@ -3616,69 +3572,154 @@ target_queue_deleted(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = QuorumQ})), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). -target_classic_queue_down(Config) -> - ClassicQueueNode = 2, - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel( - Config, ClassicQueueNode), - QName = atom_to_binary(?FUNCTION_NAME), - Address = rabbitmq_amqp_address:queue(QName), - #'queue.declare_ok'{} = amqp_channel:call( - Ch, #'queue.declare'{ - queue = QName, - durable = true, - arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}), - ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), +target_quorum_queue_rejects(Config) -> + {_, Session, LinkPair} = Init = init(Config), + QName = <<"🎄"/utf8>>, + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}, + <<"x-max-length">> => {ulong, 1}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}}}, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address), - {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address, unsettled), + Address = rabbitmq_amqp_address:queue(QName), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, <<"test-sender">>, Address, mixed), ok = wait_for_credit(Sender), - DTag1 = <<"t1">>, - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), - ok = wait_for_accepted(DTag1), + %% Quorum queue's x-max-length limit is known to be off by 1. + %% Therefore, we expect the first 2 messages to be accepted. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag a">>, <<>>, false)), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag b">>, <<>>, false)), + ok = wait_for_accepted(<<"tag a">>), + ok = wait_for_accepted(<<"tag b">>), - {ok, Msg1} = amqp10_client:get_msg(Receiver1), - ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + %% From now on the quorum queue should reject our publishes. + %% Send many messages async. + NumMsgs = 20, + DTags = [begin + DTag = integer_to_binary(N), + Msg = amqp10_msg:new(DTag, <<"body">>, false), + ok = amqp10_client:send_msg(Sender, Msg), + DTag + end || N <- lists:seq(1, NumMsgs)], + %% Since our sender settle mode is mixed, let's also test sending one as settled. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag c">>, <<>>, true)), + %% and the final one as unsettled again + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag d">>, <<>>, false)), + + [receive {amqp10_disposition, {{rejected, Error}, DTag}} -> + ?assertMatch( + #'v1_0.error'{ + info = {map, [{{symbol, <<"queue">>}, {utf8, QName}}, + {{symbol, <<"reason">>}, {symbol, <<"maxlen">>}}]}}, + Error) + after 9000 -> ct:fail({missing_rejected, DTag}) + end || DTag <- DTags ++ [<<"tag d">>]], - %% Make classic queue down. + ok = amqp10_client:detach_link(Sender), + ?assertMatch({ok, #{message_count := 2}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = close(Init). + +%% Bind two classic queues to the fanout exchange. +%% 1st queue rejects due to `maxlen`. +%% 2nd queue rejects due to `unavailable`. +target_classic_queue_rejects(Config) -> + QType = <<"classic">>, + QName1 = <<"classic queue 1">>, + QName2 = <<"classic queue 2">>, + Address1 = rabbitmq_amqp_address:queue(QName1), + Address2 = rabbitmq_amqp_address:queue(QName2), + + %% Declare 2nd queue on the node-1. + {_, _, LinkPair2} = Init2 = init(1, Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair2, QName2, + #{arguments => #{<<"x-queue-type">> => {utf8, QType}, + <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}}}), + ok = close(Init2), + ok = rabbit_ct_broker_helpers:await_metadata_store_consistent(Config, 0), + %% Declare 1st queue on the node-0. + {_, Session, LinkPair1} = Init1 = init(0, Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair1, QName1, + #{arguments => #{<<"x-queue-type">> => {utf8, QType}, + <<"x-queue-leader-locator">> => {utf8, <<"client-local">>}, + <<"x-max-length">> => {ulong, 1}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}}}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair1, QName1, <<"amq.fanout">>, <<>>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair1, QName2, <<"amq.fanout">>, <<>>, #{}), + + {ok, Sender} = amqp10_client:attach_sender_link(Session, + <<"sender">>, + rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + unsettled), + ok = wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled), + {ok, Receiver2a} = amqp10_client:attach_receiver_link(Session, <<"receiver 2a">>, Address2, settled), + + %% This message should make it to both queues. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = wait_for_accepted(<<"t1">>), + {ok, R2Msg1} = amqp10_client:get_msg(Receiver2a), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2Msg1)), + + %% This message should make it to only the 2nd queue. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)), + ExpectedErr1 = #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + info = {map, [{{symbol, <<"queue">>}, {utf8, QName1}}, + {{symbol, <<"reason">>}, {symbol, <<"maxlen">>}}]}}, + ok = wait_for_settlement(<<"t2">>, {rejected, ExpectedErr1}), + {ok, R1Msg1} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1Msg1)), + {ok, RMsg2} = amqp10_client:get_msg(Receiver2a), + ?assertEqual([<<"m2">>], amqp10_msg:body(RMsg2)), + + %% Make 2nd classic queue down. flush("stopping node"), - ok = rabbit_ct_broker_helpers:stop_node(Config, ClassicQueueNode), + ok = rabbit_ct_broker_helpers:stop_node(Config, 1), %% We expect that the server closes links that receive from classic queues that are down. - ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_ILLEGAL_STATE}, - receive {amqp10_event, {link, Receiver1, {detached, ExpectedError}}} -> ok - after 30_000 -> ct:fail({missing_event, ?LINE}) + ExpectedErr2 = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_ILLEGAL_STATE}, + receive {amqp10_event, {link, Receiver2a, {detached, ExpectedErr2}}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) end, %% However the server should not close links that send to classic queues that are down. receive Unexpected -> ct:fail({unexpected, Unexpected}) after 20 -> ok end, %% Instead, the server should reject messages that are sent to classic queues that are down. - DTag2 = <<"t2">>, - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)), - ok = wait_for_settlement(DTag2, rejected), + %% This message should make it to only the 1st queue. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t3">>, <<"m3">>)), + ExpectedErr3 = #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, + info = {map, [{{symbol, <<"queue">>}, {utf8, QName2}}, + {{symbol, <<"reason">>}, {symbol, <<"unavailable">>}}]}}, + ok = wait_for_settlement(<<"t3">>, {rejected, ExpectedErr3}), + {ok, R1Msg3} = amqp10_client:get_msg(Receiver1), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1Msg3)), - ok = rabbit_ct_broker_helpers:start_node(Config, ClassicQueueNode), - %% Now that the classic queue is up again, we should be able to attach a new receiver - %% and be able to send to and receive from the classic queue. - {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address), - receive {amqp10_event, {link, Receiver2, attached}} -> ok - after 30000 -> ct:fail({missing_event, ?LINE}) + ok = rabbit_ct_broker_helpers:start_node(Config, 1), + %% Now that the 2nd classic queue is up again, we should be able to attach a new receiver + %% and be able to send to and receive again. + {ok, Receiver2b} = amqp10_client:attach_receiver_link(Session, <<"receiver 2b">>, Address2, settled), + receive {amqp10_event, {link, Receiver2b, attached}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) end, - DTag3 = <<"t3">>, - ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag3, <<"m3">>, false)), - ok = wait_for_accepted(DTag3), - {ok, Msg3} = amqp10_client:get_msg(Receiver2), - ?assertEqual([<<"m3">>], amqp10_msg:body(Msg3)), + + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t4">>, <<"m4">>)), + ok = wait_for_accepted(<<"t4">>), + {ok, R1Msg4} = amqp10_client:get_msg(Receiver1), + {ok, R2Msg4} = amqp10_client:get_msg(Receiver2b), + ?assertEqual([<<"m4">>], amqp10_msg:body(R1Msg4)), + ?assertEqual([<<"m4">>], amqp10_msg:body(R2Msg4)), ok = amqp10_client:detach_link(Sender), - ok = amqp10_client:detach_link(Receiver2), - ok = delete_queue(Session, QName), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection). + ok = amqp10_client:detach_link(Receiver1), + ok = amqp10_client:detach_link(Receiver2b), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair1, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair1, QName2), + ok = close(Init1). async_notify_settled_classic_queue(Config) -> async_notify(settled, <<"classic">>, Config). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index e20c0b3267c3..3481366c90a7 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -2053,15 +2053,20 @@ handle_queue_actions(Actions, #state{} = State0) -> {ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0), send_puback(ConfirmPktIds, ?RC_SUCCESS, S), S#state{unacked_client_pubs = U}; - ({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0, - cfg = #cfg{proto_ver = ProtoVer}}) -> + ({rejected, _QName, Reason, PktIds}, + #state{unacked_client_pubs = U0, + cfg = #cfg{proto_ver = ProtoVer}} = S0) -> {RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0), S = S0#state{unacked_client_pubs = U}, %% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore %% rejected messages since we can only (but must not) send a positive ack. case ProtoVer of ?MQTT_PROTO_V5 -> - send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S); + RC = case Reason of + maxlen -> ?RC_QUOTA_EXCEEDED; + _ -> ?RC_IMPLEMENTATION_SPECIFIC_ERROR + end, + send_puback(RejectPktIds, RC, S); _ -> ok end, diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index c0316a21689c..d300fa738989 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -384,11 +384,13 @@ quorum_queue_rejects(Config) -> %% The queue will reject m3. V = ?config(mqtt_version, Config), if V =:= v3 orelse V =:= v4 -> - %% v3 and v4 do not support NACKs. Therefore, the server should drop the message. + %% v3 and v4 do not support NACKs. + %% Therefore, the server should drop the message. ?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700)); V =:= v5 -> - %% v5 supports NACKs. Therefore, the server should send us a NACK. - ?assertMatch({ok, #{reason_code_name := implementation_specific_error}}, + %% v5 supports NACKs. + %% Therefore, the server should send us a NACK with the correct reason. + ?assertMatch({ok, #{reason_code_name := quota_exceeded}}, emqtt:publish(C, Name, <<"m3">>, qos1)) end, diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index ae44bc97d39c..3f25d7c5c407 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -656,7 +656,8 @@ handle_dest_queue_actions(Actions, State) -> lists:foldl( fun({settled, QName, MsgSeqNos}, S0) -> confirm(MsgSeqNos, QName, S0); - ({rejected, _QName, MsgSeqNos}, #{dest := Dst = #{unconfirmed := U0}} = S0) -> + ({rejected, _QName, _Reason, MsgSeqNos}, + #{dest := Dst = #{unconfirmed := U0}} = S0) -> {U, Rej} = lists:foldr( fun(SeqNo, {U1, Acc}) -> diff --git a/release-notes/4.3.0.md b/release-notes/4.3.0.md index 324c63fadb21..cf2ede9d4d17 100644 --- a/release-notes/4.3.0.md +++ b/release-notes/4.3.0.md @@ -72,6 +72,20 @@ compared to other versions. ### Core Server +#### Enhancements + + * When a message is rejected by a queue, RabbitMQ now provides the queue name and rejection reason to AMQP 1.0 publishers + in the `Rejected` outcome. This is particularly useful when multiple queues are bound to an exchange, as it allows + publishers to identify which specific queue out of several target queues rejected the message and why + (e.g., maximum queue length reached or queue unavailable). Previously, publishers had no way to determine which queue + rejected their message or the reason for rejection. + + The queue name and reason are included in the `info` field of the `Rejected` outcome's `error` field: + * `queue: ` + * `reason: maxlen | unavailable` + + GitHub issue: [#15075](https://github.com/rabbitmq/rabbitmq-server/pull/15075) + #### Bug Fixes * Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order. @@ -114,6 +128,17 @@ compared to other versions. GitHub issue: [#14923](https://github.com/rabbitmq/rabbitmq-server/discussions/14923) +### MQTT Plugin + +#### Enhancements + + * For MQTT 5.0 publishers, when a message is rejected because the target queue's maximum length is exceeded, + RabbitMQ now returns a `Quota exceeded` reason code in the PUBACK packet. This provides publishers with + actionable information about why their message was rejected. + + GitHub issue: [#15075](https://github.com/rabbitmq/rabbitmq-server/pull/15075) + + ### Shovel Plugin #### Bug Fixes