Skip to content

Commit 1ad0dc8

Browse files
committed
QQ: wip - improve message expiry
1 parent 5a56c37 commit 1ad0dc8

File tree

7 files changed

+368
-117
lines changed

7 files changed

+368
-117
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 171 additions & 104 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@
250250
% rabbit_fifo_index can be slow when calculating the smallest
251251
% index when there are large gaps but should be faster than gb_trees
252252
% for normal appending operations as it's backed by a map
253-
unused_0 = ?NIL,
253+
last_command_time = 0,
254254
unused_1 = ?NIL,
255255
% consumers need to reflect consumer state at time of snapshot
256256
consumers = #{} :: #{consumer_key() => consumer()},

deps/rabbit/src/rabbit_fifo_pq.erl

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
from_lqueue/1,
2121
indexes/1,
2222
get_lowest_index/1,
23-
overview/1
23+
overview/1,
24+
take_while/2,
25+
any_priority_next/2,
26+
fold_priorities_next/3
2427
]).
2528

2629
-define(STATE, pq).
@@ -156,8 +159,75 @@ overview(#?STATE{len = Len,
156159
num_active_priorities => map_size(Buckets),
157160
lowest_index => get_lowest_index(State)}.
158161

162+
-spec take_while(fun ((msg()) -> boolean()), state()) ->
163+
{[msg()], state()}.
164+
take_while(Fun, #?STATE{len = Len,
165+
buckets = Buckets0} = State)
166+
when is_function(Fun) ->
167+
{Buckets, Acc} = maps:fold(
168+
fun (P, Q0, {B0, Items0}) ->
169+
case take_while(Q0, Fun, Items0) of
170+
{?EMPTY, Items} ->
171+
{maps:remove(P, B0), Items};
172+
{Q, Items} ->
173+
{B0#{P => Q}, Items}
174+
end
175+
end,
176+
{Buckets0, []},
177+
maps:iterator(Buckets0, ordered)),
178+
179+
%% TODO: optimise updates
180+
%% update bitmap
181+
Bitmap = maps:fold(fun (P, _Q, B) -> B bor (1 bsl P) end, 0, Buckets),
182+
183+
{lists:reverse(Acc),
184+
State#?STATE{len = Len - length(Acc),
185+
buckets = Buckets,
186+
bitmap = Bitmap}}.
187+
188+
-spec any_priority_next(fun ((msg()) -> boolean()), state()) ->
189+
boolean().
190+
any_priority_next(Fun, #?STATE{buckets = Buckets0})
191+
when is_function(Fun) ->
192+
maps_any(Fun, maps:next(maps:iterator(Buckets0))).
193+
194+
-spec fold_priorities_next(fun ((msg(), Acc) -> Acc), Acc, state()) ->
195+
Acc when Acc :: term().
196+
fold_priorities_next(Fun, Acc, #?STATE{buckets = Buckets0})
197+
when is_function(Fun) ->
198+
maps:fold(fun (_P, Q, A) ->
199+
Fun(peek(Q), A)
200+
end, Acc, Buckets0).
201+
159202
%% INTERNAL
160203

204+
maps_any(_Fun, none) ->
205+
false;
206+
maps_any(Fun, {_, Q, I}) ->
207+
case Fun(peek(Q)) of
208+
true ->
209+
true;
210+
false ->
211+
maps_any(Fun, maps:next(I))
212+
end.
213+
214+
take_while(?EMPTY, _Fun, Acc) ->
215+
{?EMPTY, Acc};
216+
take_while(Q, Fun, Acc) ->
217+
case peek(Q) of
218+
empty ->
219+
{Q, Acc};
220+
Msg ->
221+
case Fun(Msg) of
222+
true ->
223+
take_while(drop(Q), Fun, [Msg | Acc]);
224+
false ->
225+
{Q, Acc}
226+
end
227+
end.
228+
229+
230+
161231
%% invariant, if the queue is non empty so is the Out (right) list.
162232
in(X, ?EMPTY) ->
163233
{1, [], [X]};

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1987,7 +1987,7 @@ make_ra_conf(Q, ServerId, Membership, MacVersion)
19871987
Membership, MacVersion).
19881988

19891989
make_ra_conf(Q, ServerId, TickTimeout,
1990-
_SnapshotInterval, CheckpointInterval,
1990+
_SnapshotInterval, _CheckpointInterval,
19911991
Membership, MacVersion) ->
19921992
QName = amqqueue:get_name(Q),
19931993
#resource{name = QNameBin} = QName,
@@ -1998,8 +1998,9 @@ make_ra_conf(Q, ServerId, TickTimeout,
19981998
Formatter = {?MODULE, format_ra_event, [QName]},
19991999
LogCfg = #{uid => UId,
20002000
min_snapshot_interval => 0,
2001-
min_checkpoint_interval => CheckpointInterval,
2002-
max_checkpoints => 3},
2001+
% min_checkpoint_interval => CheckpointInterval,
2002+
% max_checkpoints => 3,
2003+
major_compaction_strategy => {num_minors, 32}},
20032004
rabbit_misc:maps_put_truthy(membership, Membership,
20042005
#{cluster_name => ClusterName,
20052006
id => ServerId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2147,6 +2147,8 @@ dead_letter_policy(Config) ->
21472147
%% Test that messages are at most once dead letter in the correct order
21482148
%% for reason 'maxlen'.
21492149
at_most_once_dead_letter_order_maxlen(Config) ->
2150+
check_quorum_queues_v8_compat(Config),
2151+
21502152
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
21512153

21522154
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -2261,16 +2263,18 @@ at_most_once_dead_letter_order_delivery_limit(Config) ->
22612263
#'basic.publish'{routing_key = QQ},
22622264
#amqp_msg{payload = <<"m2">>}),
22632265

2264-
ok = subscribe(Ch, QQ, false),
2266+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
2267+
ok = subscribe(Ch2, QQ, false),
22652268
receive {_, #amqp_msg{payload = P1}} ->
22662269
?assertEqual(<<"m1">>, P1)
22672270
end,
22682271
receive {_, #amqp_msg{payload = P2}} ->
22692272
?assertEqual(<<"m2">>, P2)
22702273
end,
2271-
ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
2272-
multiple = true,
2273-
requeue = true}),
2274+
amqp_channel:close(Ch2),
2275+
% ok = amqp_channel:call(Ch, #'basic.nack'{delivery_tag = 0,
2276+
% multiple = true,
2277+
% requeue = true}),
22742278

22752279
wait_for_consensus(DLQ, Config),
22762280
wait_for_messages_ready(Servers, ra_name(DLQ), 2),
@@ -5337,7 +5341,7 @@ check_quorum_queues_v8_compat(Config) ->
53375341
true ->
53385342
ok;
53395343
false ->
5340-
throw({skip, "test will only work on QQ machine version > 8"})
5344+
throw({skip, "test will only work on QQ machine version >= 8"})
53415345
end.
53425346

53435347
lists_interleave([], _List) ->

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1620,8 +1620,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(Config)
16201620
],
16211621
{State1, _} = run_log(Config, State0, Entries),
16221622
Effects = rabbit_fifo:state_enter(leader, State1),
1623-
%% 2 effects for each consumer process (channel process), 1 effect for the node,
1624-
?assertEqual(2 * 3 + 1 + 1 + 1, length(Effects)).
1623+
ct:pal("Efx ~p", [Effects]),
1624+
%% 2 effects for each consumer process (channel process),
1625+
%% 1 effect for the node,
1626+
%% 1 for decorators
1627+
?assertEqual(2 * 3 + 1 + 1, length(Effects)).
16251628

16261629
single_active_consumer_state_enter_eol_include_waiting_consumers_test(Config) ->
16271630
Resource = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
@@ -3223,6 +3226,39 @@ modify_test(Config) ->
32233226

32243227
ok.
32253228

3229+
priorities_expire_test(Config) ->
3230+
State0 = init(#{name => ?FUNCTION_NAME,
3231+
queue_resource => rabbit_misc:r("/", queue,
3232+
?FUNCTION_NAME_B)}),
3233+
Pid1 = spawn(fun() -> ok end),
3234+
3235+
Entries =
3236+
[
3237+
{?LINE, make_enqueue(Pid1, 1,
3238+
mk_mc(<<"p1">>, #'P_basic'{priority = 9,
3239+
expiration = <<"100">>}))},
3240+
{?LINE, make_enqueue(Pid1, 2,
3241+
mk_mc(<<"p1">>, #'P_basic'{priority = 9,
3242+
expiration = <<"100000">>}))},
3243+
{?LINE, make_enqueue(Pid1, 3,
3244+
mk_mc(<<"p7">>, #'P_basic'{priority = 7,
3245+
expiration = <<"100">>}))},
3246+
{?LINE, make_enqueue(Pid1, 4,
3247+
mk_mc(<<"p7">>, #'P_basic'{priority = 7,
3248+
expiration = <<"100000">>}))},
3249+
{?LINE, make_enqueue(Pid1, 5,
3250+
mk_mc(<<"p7b">>, #'P_basic'{priority = 3}))},
3251+
3252+
{?LINE + 101, {timeout, {expire_msgs, shallow}}},
3253+
3254+
?ASSERT(_, fun(State) ->
3255+
?assertMatch(#{num_messages := 3},
3256+
rabbit_fifo:overview(State))
3257+
end)
3258+
],
3259+
{_State2, _} = run_log(Config, State0, Entries),
3260+
ok.
3261+
32263262
%% Utility
32273263
%%
32283264

@@ -3232,6 +3268,7 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
32323268
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
32333269
handle_aux(S, T, C, A, A2) -> rabbit_fifo:handle_aux(S, T, C, A, A2).
32343270
make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M).
3271+
make_enqueue(P, S, M) -> rabbit_fifo:make_enqueue(P, S, M).
32353272

32363273
cid(A) when is_atom(A) ->
32373274
atom_to_binary(A, utf8).
@@ -3242,10 +3279,13 @@ single_active_invariant( #rabbit_fifo{consumers = Cons}) ->
32423279
end, Cons)).
32433280

32443281
mk_mc(Body) ->
3282+
mk_mc(Body, #'P_basic'{}).
3283+
3284+
mk_mc(Body, BasicProps) ->
32453285
mc_amqpl:from_basic_message(
32463286
#basic_message{routing_keys = [<<"">>],
32473287
exchange_name = #resource{name = <<"x">>,
32483288
kind = exchange,
32493289
virtual_host = <<"v">>},
3250-
content = #content{properties = #'P_basic'{},
3290+
content = #content{properties = BasicProps,
32513291
payload_fragments_rev = [Body]}}).

deps/rabbit/test/rabbit_fifo_pq_SUITE.erl

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ all() ->
1717
all_tests() ->
1818
[
1919
basics,
20+
take_while,
21+
any_priority_next,
2022
property
2123
].
2224

@@ -80,6 +82,73 @@ basics(_Config) ->
8082
empty = rabbit_fifo_pq:out(Q6),
8183
ok.
8284

85+
take_while(_Config) ->
86+
Q1 = lists:foldl(
87+
fun ({P, I}, Q) ->
88+
rabbit_fifo_pq:in(P, I, Q)
89+
end, rabbit_fifo_pq:new(),
90+
[
91+
{1, ?MSG(1)}, {1, ?MSG(2)}, {1, ?MSG(3)},
92+
{2, ?MSG(1)}, {2, ?MSG(2)}, {2, ?MSG(3)},
93+
{3, ?MSG(1)}, {3, ?MSG(2)}, {3, ?MSG(3)},
94+
{4, ?MSG(1)}, {4, ?MSG(2)}, {4, ?MSG(3)},
95+
{5, ?MSG(1, 10)}, {5, ?MSG(2, 20)}, {5, ?MSG(3, 30)}
96+
]),
97+
98+
{Taken, Q2} = rabbit_fifo_pq:take_while(fun (?MSG(I, _)) ->
99+
I < 3
100+
end, Q1),
101+
?assertMatch([
102+
?MSG(1, 10), ?MSG(2, 20),
103+
?MSG(1, 1), ?MSG(2, 2),
104+
?MSG(1, 1), ?MSG(2, 2),
105+
?MSG(1, 1), ?MSG(2, 2),
106+
?MSG(1, 1), ?MSG(2, 2)
107+
], Taken),
108+
109+
110+
?assertEqual(5, rabbit_fifo_pq:len(Q2)),
111+
?assertEqual(10, length(Taken)),
112+
{?MSG(3, 30), Q3} = rabbit_fifo_pq:out(Q2),
113+
{?MSG(3), Q4} = rabbit_fifo_pq:out(Q3),
114+
{?MSG(3), Q5} = rabbit_fifo_pq:out(Q4),
115+
{?MSG(3), Q6} = rabbit_fifo_pq:out(Q5),
116+
{?MSG(3), _Q7} = rabbit_fifo_pq:out(Q6),
117+
118+
119+
{_Taken2, Q} = rabbit_fifo_pq:take_while(fun (?MSG(_, _)) ->
120+
true
121+
end, Q2),
122+
123+
ct:pal("Q ~p", [Q]),
124+
125+
ok.
126+
127+
any_priority_next(_Config) ->
128+
Q0 = rabbit_fifo_pq:new(),
129+
130+
?assertNot(rabbit_fifo_pq:any_priority_next(fun (_) -> true end, Q0)),
131+
132+
Q1 = lists:foldl(fun ({P, I}, Q) ->
133+
rabbit_fifo_pq:in(P, I, Q)
134+
end, Q0,
135+
[
136+
{1, ?MSG(1)}, {1, ?MSG(2)}, {1, ?MSG(3)},
137+
{2, ?MSG(1)}, {2, ?MSG(2)}, {2, ?MSG(3)},
138+
{3, ?MSG(2)}, {3, ?MSG(3)},
139+
{4, ?MSG(1)}, {4, ?MSG(2)}, {4, ?MSG(3)},
140+
{5, ?MSG(1)}, {5, ?MSG(2)}, {5, ?MSG(3)}
141+
]),
142+
143+
?assert(rabbit_fifo_pq:any_priority_next(fun (?MSG(I, _)) ->
144+
I > 1
145+
end, Q1)),
146+
?assertNot(rabbit_fifo_pq:any_priority_next(fun (?MSG(I, _)) ->
147+
I > 6
148+
end, Q1)),
149+
150+
ok.
151+
83152
hi_is_prioritised(_Config) ->
84153
Q0 = rabbit_fifo_q:new(),
85154
%% when `hi' has a lower index than the next 'no' then it is still

0 commit comments

Comments
 (0)