Skip to content

Commit 235bd37

Browse files
committed
QQ: record lengths of each priority queue
And emit the number of messages for each priority in the overview map.
1 parent 74e06db commit 235bd37

File tree

3 files changed

+98
-76
lines changed

3 files changed

+98
-76
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ apply_(#{index := Idx} = Meta,
449449
{Life, _Credit, credited} ->
450450
{Life, credited}
451451
end,
452-
Priority = get_priority(ConsumerMeta),
452+
Priority = get_consumer_priority(ConsumerMeta),
453453
ConsumerKey = case consumer_key_from_id(ConsumerId, State0) of
454454
{ok, K} ->
455455
K;
@@ -1747,10 +1747,10 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
17471747
Header0 = maybe_set_msg_ttl(RawMsg, Ts, Size, State0),
17481748
Header = maybe_set_msg_delivery_count(RawMsg, Header0),
17491749
Msg = make_msg(RaftIdx, Header),
1750-
PTag = priority_tag(RawMsg),
1750+
Priority = msg_priority(RawMsg),
17511751
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
17521752
messages_total = Total + 1,
1753-
messages = rabbit_fifo_pq:in(PTag, Msg, Messages)
1753+
messages = rabbit_fifo_pq:in(Priority, Msg, Messages)
17541754
},
17551755
{ok, State, Effects};
17561756
maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
@@ -1780,10 +1780,10 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
17801780
false ->
17811781
undefined
17821782
end,
1783-
PTag = priority_tag(RawMsg),
1783+
Priority = msg_priority(RawMsg),
17841784
State = State0#?STATE{msg_bytes_enqueue = BytesEnqueued + Size,
17851785
messages_total = Total + 1,
1786-
messages = rabbit_fifo_pq:in(PTag, Msg, Messages),
1786+
messages = rabbit_fifo_pq:in(Priority, Msg, Messages),
17871787
enqueuers = Enqueuers0#{From => Enq},
17881788
msg_cache = MsgCache
17891789
},
@@ -2835,17 +2835,17 @@ is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
28352835
is_expired(_Ts, _State) ->
28362836
false.
28372837

2838-
get_priority(#{priority := Priority}) ->
2838+
get_consumer_priority(#{priority := Priority}) ->
28392839
Priority;
2840-
get_priority(#{args := Args}) ->
2840+
get_consumer_priority(#{args := Args}) ->
28412841
%% fallback, v3 option
28422842
case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
28432843
{_Type, Value} ->
28442844
Value;
28452845
_ ->
28462846
0
28472847
end;
2848-
get_priority(_) ->
2848+
get_consumer_priority(_) ->
28492849
0.
28502850

28512851
notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
@@ -3037,21 +3037,23 @@ maps_search(Pred, {K, V, I}) ->
30373037
maps_search(Pred, Map) when is_map(Map) ->
30383038
maps_search(Pred, maps:next(maps:iterator(Map))).
30393039

3040-
priority_tag(Msg) ->
3040+
-define(DEFAULT_PRIORITY, 4).
3041+
-define(MAX_PRIORITY, 31).
3042+
3043+
msg_priority(Msg) ->
30413044
case mc:is(Msg) of
30423045
true ->
30433046
case mc:priority(Msg) of
30443047
P when is_integer(P) ->
3045-
min(P, 31);
3048+
min(P, ?MAX_PRIORITY);
30463049
_ ->
3047-
4
3050+
?DEFAULT_PRIORITY
30483051
end;
30493052
false ->
3050-
4
3053+
?DEFAULT_PRIORITY
30513054
end.
30523055

3053-
do_snapshot(MacVer, Ts, Ch,
3054-
RaAux, DiscardedBytes, Force)
3056+
do_snapshot(MacVer, Ts, Ch, RaAux, DiscardedBytes, Force)
30553057
when element(1, Ch) == checkpoint andalso
30563058
is_integer(MacVer) andalso
30573059
MacVer >= 8 ->

deps/rabbit/src/rabbit_fifo_pq.erl

Lines changed: 68 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(rabbit_fifo_pq).
99

1010
-include("rabbit_fifo.hrl").
11+
1112
-export([
1213
new/0,
1314
in/3,
@@ -22,68 +23,56 @@
2223
overview/1
2324
]).
2425

25-
% -define(NON_EMPTY, {_, [_|_]}).
26-
-define(EMPTY, {[], []}).
26+
-define(STATE, pq).
27+
-define(EMPTY, {0, [], []}).
2728

28-
%% a weighted priority queue with only two priorities
29+
%% supports 32 priorities, needs to be a power of 2 to support the De Bruijn
30+
%% lookup method. 64 would push the bitmap into an erlang big number so we
31+
%% have to settle for 32
2932
-type priority() :: 0..31.
30-
-type queue() :: {list(msg()), list(msg())}.
33+
-type queue() :: {non_neg_integer(), list(msg()), list(msg())}.
3134

32-
-record(?MODULE, {buckets = #{} :: #{priority() => queue()},
33-
len = 0 :: non_neg_integer(),
34-
bitmap = 0 :: integer()}).
35+
-record(?STATE, {buckets = #{} :: #{priority() => queue()},
36+
len = 0 :: non_neg_integer(),
37+
bitmap = 0 :: integer()}).
3538

36-
-opaque state() :: #?MODULE{}.
39+
-opaque state() :: #?STATE{}.
3740

3841
-export_type([state/0,
3942
priority/0]).
4043

4144
-spec new() -> state().
4245
new() ->
43-
#?MODULE{}.
46+
#?STATE{}.
4447

4548
-spec in(priority(), msg(), state()) -> state().
46-
in(Priority0, Item, #?MODULE{buckets = Buckets0,
47-
bitmap = Bitmap0,
48-
len = Len} = State)
49+
in(Priority0, Item, #?STATE{buckets = Buckets0,
50+
bitmap = Bitmap0,
51+
len = Len} = State)
4952
when Priority0 >= 0 andalso
5053
Priority0 =< 31 ->
5154
%% invert priority
5255
Priority = 31 - Priority0,
5356
case Buckets0 of
5457
#{Priority := Queue0} ->
5558
%% there are messages for the priority already
56-
State#?MODULE{buckets = Buckets0#{Priority => in(Item, Queue0)},
57-
len = Len + 1};
59+
State#?STATE{buckets = Buckets0#{Priority => in(Item, Queue0)},
60+
len = Len + 1};
5861
_ ->
5962
Bitmap = Bitmap0 bor (1 bsl Priority),
6063
%% there are no messages for the priority
61-
State#?MODULE{buckets = Buckets0#{Priority => in(Item, ?EMPTY)},
62-
bitmap = Bitmap,
63-
len = Len + 1}
64+
State#?STATE{buckets = Buckets0#{Priority => in(Item, ?EMPTY)},
65+
bitmap = Bitmap,
66+
len = Len + 1}
6467
end.
6568

66-
first_set_bit(0) ->
67-
32;
68-
first_set_bit(Bitmap) ->
69-
count_trailing(Bitmap band -Bitmap).
70-
71-
-define(DEBRUIJN_SEQ, 16#077CB531).
72-
-define(DEBRUIJN_LOOKUP,
73-
{0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
74-
31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9}).
75-
76-
count_trailing(N) ->
77-
Lookup = ((N * ?DEBRUIJN_SEQ) bsr 27) band 31,
78-
element(Lookup + 1, ?DEBRUIJN_LOOKUP).
79-
8069
-spec out(state()) ->
8170
empty | {msg(), state()}.
82-
out(#?MODULE{len = 0}) ->
71+
out(#?STATE{len = 0}) ->
8372
empty;
84-
out(#?MODULE{buckets = Buckets,
85-
len = Len,
86-
bitmap = Bitmap0} = State) ->
73+
out(#?STATE{buckets = Buckets,
74+
len = Len,
75+
bitmap = Bitmap0} = State) ->
8776
Priority = first_set_bit(Bitmap0),
8877
#{Priority := Q0} = Buckets,
8978
Msg = peek(Q0),
@@ -93,25 +82,25 @@ out(#?MODULE{buckets = Buckets,
9382
%% as we know the bit is set we just need to xor rather than
9483
%% create a mask then xor
9584
Bitmap = Bitmap0 bxor (1 bsl Priority),
96-
{Msg, State#?MODULE{buckets = maps:remove(Priority, Buckets),
97-
len = Len - 1,
98-
bitmap = Bitmap}};
85+
{Msg, State#?STATE{buckets = maps:remove(Priority, Buckets),
86+
len = Len - 1,
87+
bitmap = Bitmap}};
9988
Q ->
100-
{Msg, State#?MODULE{buckets = maps:put(Priority, Q, Buckets),
101-
len = Len - 1}}
89+
{Msg, State#?STATE{buckets = maps:put(Priority, Q, Buckets),
90+
len = Len - 1}}
10291
end.
10392

10493
-spec get(state()) -> empty | msg().
105-
get(#?MODULE{len = 0}) ->
94+
get(#?STATE{len = 0}) ->
10695
empty;
107-
get(#?MODULE{buckets = Buckets,
108-
bitmap = Bitmap}) ->
96+
get(#?STATE{buckets = Buckets,
97+
bitmap = Bitmap}) ->
10998
Priority = first_set_bit(Bitmap),
11099
#{Priority := Q0} = Buckets,
111100
peek(Q0).
112101

113102
-spec len(state()) -> non_neg_integer().
114-
len(#?MODULE{len = Len}) ->
103+
len(#?STATE{len = Len}) ->
115104
Len.
116105

117106
-spec from_list([{priority(), term()}]) -> state().
@@ -131,17 +120,17 @@ from_lqueue(LQ) ->
131120
end, new(), LQ).
132121

133122
-spec indexes(state()) -> [ra:index()].
134-
indexes(#?MODULE{buckets = Buckets}) ->
123+
indexes(#?STATE{buckets = Buckets}) ->
135124
maps:fold(
136-
fun (_P, {L1, L2}, Acc0) ->
125+
fun (_P, {_, L1, L2}, Acc0) ->
137126
Acc = lists:foldl(fun msg_idx_fld/2, Acc0, L1),
138127
lists:foldl(fun msg_idx_fld/2, Acc, L2)
139128
end, [], Buckets).
140129

141130
-spec get_lowest_index(state()) -> undefined | ra:index().
142-
get_lowest_index(#?MODULE{len = 0}) ->
131+
get_lowest_index(#?STATE{len = 0}) ->
143132
undefined;
144-
get_lowest_index(#?MODULE{buckets = Buckets}) ->
133+
get_lowest_index(#?STATE{buckets = Buckets}) ->
145134
lists:min(
146135
maps:fold(fun (_, Q, Acc) ->
147136
case peek(Q) of
@@ -154,32 +143,38 @@ get_lowest_index(#?MODULE{buckets = Buckets}) ->
154143

155144
-spec overview(state()) ->
156145
#{len := non_neg_integer(),
146+
detail := #{priority() => pos_integer()},
157147
num_active_priorities := 0..32,
158148
lowest_index := ra:index()}.
159-
overview(#?MODULE{len = Len,
160-
buckets = Buckets} = State) ->
149+
overview(#?STATE{len = Len,
150+
buckets = Buckets} = State) ->
151+
Detail = maps:fold(fun (P0, {C, _, _}, Acc) ->
152+
P = 31-P0,
153+
Acc#{P => C}
154+
end, #{}, Buckets),
161155
#{len => Len,
156+
detail => Detail,
162157
num_active_priorities => map_size(Buckets),
163158
lowest_index => get_lowest_index(State)}.
164159

165-
%% internals
160+
%% INTERNAL
166161

167162
%% invariant, if the queue is non empty so is the Out (right) list.
168163
in(X, ?EMPTY) ->
169-
{[], [X]};
170-
in(X, {In, Out}) ->
171-
{[X | In], Out}.
164+
{1, [], [X]};
165+
in(X, {C, In, Out}) ->
166+
{C+1, [X | In], Out}.
172167

173168
peek(?EMPTY) ->
174169
empty;
175-
peek({_, [H | _]}) ->
170+
peek({_, _, [H | _]}) ->
176171
H.
177172

178-
drop({In, [_]}) ->
173+
drop({C, In, [_]}) ->
179174
%% the last Out one
180-
{[], lists:reverse(In)};
181-
drop({In, [_ | Out]}) ->
182-
{In, Out}.
175+
{C-1, [], lists:reverse(In)};
176+
drop({C, In, [_ | Out]}) ->
177+
{C-1, In, Out}.
183178

184179
msg_idx_fld(Msg, Acc) when is_list(Acc) ->
185180
[msg_idx(Msg) | Acc].
@@ -194,3 +189,17 @@ to_list(empty, Acc) ->
194189
to_list({Item, State}, Acc) ->
195190
to_list(out(State), [Item | Acc]).
196191

192+
first_set_bit(0) ->
193+
32;
194+
first_set_bit(Bitmap) ->
195+
count_trailing(Bitmap band -Bitmap).
196+
197+
-define(DEBRUIJN_SEQ, 16#077CB531).
198+
-define(DEBRUIJN_LOOKUP,
199+
{0, 1, 28, 2, 29, 14, 24, 3, 30, 22, 20, 15, 25, 17, 4, 8,
200+
31, 27, 13, 23, 21, 19, 16, 7, 26, 12, 18, 6, 11, 5, 10, 9}).
201+
202+
count_trailing(N) ->
203+
Lookup = ((N * ?DEBRUIJN_SEQ) bsr 27) band 31,
204+
element(Lookup + 1, ?DEBRUIJN_LOOKUP).
205+

deps/rabbit/test/rabbit_fifo_pq_SUITE.erl

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ basics(_Config) ->
6565

6666
?assertEqual([1,2,3,4,5], lists:sort(rabbit_fifo_pq:indexes(Q1))),
6767
?assertMatch(#{len := 5,
68+
detail := #{1 := 1,
69+
2 := 1,
70+
3 := 1,
71+
4 := 1,
72+
5 := 1},
6873
num_active_priorities := 5,
6974
lowest_index := 1}, rabbit_fifo_pq:overview(Q1)),
7075
{?MSG(5), Q2} = rabbit_fifo_pq:out(Q1),
@@ -112,14 +117,16 @@ get_lowest_index(_Config) ->
112117
?assertEqual(1, rabbit_fifo_q:get_lowest_index(Q3)),
113118
?assertEqual(2, rabbit_fifo_q:get_lowest_index(Q4)),
114119
?assertEqual(3, rabbit_fifo_q:get_lowest_index(Q5)),
115-
?assertEqual(undefined, rabbit_fifo_q:get_lowest_index(Q6)).
120+
?assertEqual(undefined, rabbit_fifo_q:get_lowest_index(Q6)),
121+
ok.
122+
116123

117124
property(_Config) ->
118125
run_proper(
119126
fun () ->
120127
?FORALL(Ops, op_gen(256),
121128
queue_prop(Ops))
122-
end, [], 25),
129+
end, [], 100),
123130
ok.
124131

125132
queue_prop(Ops) ->
@@ -130,11 +137,15 @@ queue_prop(Ops) ->
130137

131138
Sut0 = rabbit_fifo_pq:from_list(Ops),
132139
Out = rabbit_fifo_pq:to_list(Sut0),
140+
#{detail := Detail,
141+
len := Len} = rabbit_fifo_pq:overview(Sut0),
142+
DetailSum = maps:fold(fun (_, C, Acc) -> Acc + C end, 0, Detail),
143+
DetailSum == Len andalso
133144
[element(2, O) || O <- SortedOps] == Out.
134145

135146
%%% helpers
136147

137-
-type item() :: {rabbit_fifo_pq:priority(), integer()}.
148+
-type item() :: {rabbit_fifo_pq:priority(), non_neg_integer()}.
138149
op_gen(Size) ->
139150
?LET(Ops, resize(Size, list(item())), Ops).
140151

0 commit comments

Comments
 (0)