Skip to content

Commit 67a3b13

Browse files
ansdkjnilsson
authored andcommitted
Redeliver in correct order
delivery_effects/2 expects the messages in reversed order (and will from now on return the effects in the correct order).
1 parent 417a2d9 commit 67a3b13

File tree

1 file changed

+16
-17
lines changed

1 file changed

+16
-17
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -744,14 +744,13 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{},
744744
Acc) ->
745745
case node(Pid) == node() of
746746
true ->
747-
Iter = maps:iterator(Checked, ordered),
747+
Iter = maps:iterator(Checked, reversed),
748748
Acc#{{Tag, Pid} => maps:to_list(Iter)};
749749
false ->
750750
Acc
751751
end
752752
end, #{}, Consumers),
753-
Effs = add_delivery_effects([], SendAcc, State),
754-
Effs.
753+
delivery_effects(SendAcc, State).
755754

756755
convert_v7_to_v8(#{} = _Meta, StateV7) ->
757756
%% the structure is intact for now
@@ -2063,9 +2062,8 @@ checkout0(Meta, {success, ConsumerKey, MsgId,
20632062
SendAcc0#{ConsumerKey => [DelMsg | LogMsgs]}
20642063
end,
20652064
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
2066-
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
2067-
Effects = add_delivery_effects([], SendAcc, State0),
2068-
{State0, ExpiredMsg, Effects0 ++ lists:reverse(Effects)}.
2065+
checkout0(_Meta, {_Activity, ExpiredMsg, State, Effects}, SendAcc) ->
2066+
{State, ExpiredMsg, Effects ++ delivery_effects(SendAcc, State)}.
20692067

20702068
evaluate_limit(Idx, State1, State2, OuterEffects) ->
20712069
case evaluate_limit0(Idx, State1, State2, []) of
@@ -2154,18 +2152,19 @@ chunk_disk_msgs([{_MsgId, Msg} = ConsumerMsg | Rem], Bytes,
21542152
Size = get_header(size, get_msg_header(Msg)),
21552153
chunk_disk_msgs(Rem, Bytes + Size, [[ConsumerMsg | CurChunk] | Chunks]).
21562154

2157-
add_delivery_effects(Effects0, AccMap, _State)
2158-
when map_size(AccMap) == 0 ->
2155+
delivery_effects(AccMap, _State)
2156+
when map_size(AccMap) =:= 0 ->
21592157
%% does this ever happen?
2160-
Effects0;
2161-
add_delivery_effects(Effects0, AccMap, State) ->
2162-
maps:fold(fun (C, DiskMsgs, Efs)
2163-
when is_list(DiskMsgs) ->
2164-
lists:foldl(
2165-
fun (Msgs, E) ->
2166-
[delivery_effect(C, Msgs, State) | E]
2167-
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
2168-
end, Effects0, AccMap).
2158+
[];
2159+
delivery_effects(AccMap, State) ->
2160+
Effs = maps:fold(fun(C, DiskMsgs, Efs)
2161+
when is_list(DiskMsgs) ->
2162+
lists:foldl(
2163+
fun (Msgs, E) ->
2164+
[delivery_effect(C, Msgs, State) | E]
2165+
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
2166+
end, [], AccMap),
2167+
lists:reverse(Effs).
21692168

21702169
take_next_msg(#?STATE{returns = Returns0,
21712170
messages = Messages0} = State) ->

0 commit comments

Comments
 (0)