Skip to content

Commit ffd51af

Browse files
committed
rebaseable wip
1 parent 2efe652 commit ffd51af

2 files changed

Lines changed: 113 additions & 26 deletions

File tree

src/poolboy.erl

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
{gen_fsm, sync_send_all_state_event, 2}]}).
1313

1414
-export([checkout/1, checkout/2, checkout/3, checkin/2, transaction/2,
15-
get_pool_size/1, set_pool_size/2,
15+
get_pool_size/1, set_pool_size/2, set_pool_size/3,
1616
child_spec/2, child_spec/3, start/1, start/2, start_link/1,
1717
start_link/2, stop/1, status/1]).
1818
-export([init/1, ready/2, ready/3, overflow/2, overflow/3, full/2, full/3,
@@ -71,13 +71,16 @@ transaction(Pool, Fun) ->
7171
ok = poolboy:checkin(Pool, Worker)
7272
end.
7373

74-
-spec get_pool_size(pid()) -> {ok, non_neg_integer()} | {error, notfound}.
74+
-spec get_pool_size(pid()) -> {non_neg_integer(), non_neg_integer()}.
7575
get_pool_size(Pid) ->
7676
gen_fsm:sync_send_all_state_event(Pid, get_pool_size).
7777

78-
-spec set_pool_size(pid(), non_neg_integer()) -> ok | {error, notfound}.
78+
-spec set_pool_size(pid(), non_neg_integer()) -> ok.
7979
set_pool_size(Pid, NewSize) ->
8080
gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize}).
81+
-spec set_pool_size(pid(), non_neg_integer(), non_neg_integer()) -> ok.
82+
set_pool_size(Pid, NewSize, NewMaxOverflow) ->
83+
gen_fsm:sync_send_all_state_event(Pid, {set_pool_size, NewSize, NewMaxOverflow}).
8184

8285
-spec child_spec(Pool :: node(), PoolArgs :: proplists:proplist())
8386
-> supervisor:child_spec().
@@ -148,12 +151,20 @@ init([], _WorkerArgs, #state{size=Size, supervisor=Sup, max_overflow=MaxOverflow
148151
{ok, StartState, State#state{workers=Workers}}.
149152

150153
ready({checkin, Pid}, State) ->
154+
#state{supervisor = Sup} = State,
151155
Monitors = State#state.monitors,
152156
case ets:lookup(Monitors, Pid) of
153157
[{Pid, Ref}] ->
154158
true = erlang:demonitor(Ref),
155159
true = ets:delete(Monitors, Pid),
156-
Workers = queue:in(Pid, State#state.workers),
160+
Workers =
161+
case State#state.size < length(supervisor:which_children(Sup)) of
162+
true ->
163+
ok = dismiss_worker(Sup, Pid),
164+
State#state.workers;
165+
false ->
166+
queue:in(Pid, State#state.workers)
167+
end,
157168
{next_state, ready, State#state{workers=Workers}};
158169
[] ->
159170
{next_state, ready, State}
@@ -163,28 +174,37 @@ ready(_Event, State) ->
163174

164175
ready({checkout, Block, Timeout}, {FromPid, _}=From, State) ->
165176
#state{supervisor = Sup,
177+
size = Size,
166178
workers = Workers,
167179
monitors = Monitors,
168180
max_overflow = MaxOverflow} = State,
169-
case queue:out(Workers) of
170-
{{value, Pid}, Left} ->
171-
Ref = erlang:monitor(process, FromPid),
172-
true = ets:insert(Monitors, {Pid, Ref}),
173-
NextState = case queue:is_empty(Left) of
174-
true when MaxOverflow < 1 -> full;
175-
true -> overflow;
176-
false -> ready
177-
end,
178-
{reply, Pid, NextState, State#state{workers=Left}};
179-
{empty, Empty} when MaxOverflow > 0 ->
181+
case Size > length(supervisor:which_children(Sup)) of
182+
true ->
183+
%% we are here after a set_pool_size with Size > OldSize:
180184
{Pid, Ref} = new_worker(Sup, FromPid),
181185
true = ets:insert(Monitors, {Pid, Ref}),
182-
{reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
183-
{empty, Empty} when Block =:= false ->
184-
{reply, full, full, State#state{workers=Empty}};
185-
{empty, Empty} ->
186-
Waiting = add_waiting(From, Timeout, State#state.waiting),
187-
{next_state, full, State#state{workers=Empty, waiting=Waiting}}
186+
{reply, Pid, ready, State};
187+
false ->
188+
case queue:out(Workers) of
189+
{{value, Pid}, Left} ->
190+
Ref = erlang:monitor(process, FromPid),
191+
true = ets:insert(Monitors, {Pid, Ref}),
192+
NextState = case queue:is_empty(Left) of
193+
true when MaxOverflow < 1 -> full;
194+
true -> overflow;
195+
false -> ready
196+
end,
197+
{reply, Pid, NextState, State#state{workers=Left}};
198+
{empty, Empty} when MaxOverflow > 0 ->
199+
{Pid, Ref} = new_worker(Sup, FromPid),
200+
true = ets:insert(Monitors, {Pid, Ref}),
201+
{reply, Pid, overflow, State#state{workers=Empty, overflow=1}};
202+
{empty, Empty} when Block =:= false ->
203+
{reply, full, full, State#state{workers=Empty}};
204+
{empty, Empty} ->
205+
Waiting = add_waiting(From, Timeout, State#state.waiting),
206+
{next_state, full, State#state{workers=Empty, waiting=Waiting}}
207+
end
188208
end;
189209
ready(_Event, _From, State) ->
190210
{reply, ok, ready, State}.
@@ -259,8 +279,24 @@ full(_Event, State) ->
259279
full({checkout, true, Timeout}, From, State) ->
260280
Waiting = add_waiting(From, Timeout, State#state.waiting),
261281
{next_state, full, State#state{waiting=Waiting}};
262-
full({checkout, false, _Timeout}, _From, State) ->
263-
{reply, full, full, State};
282+
full({checkout, false, _Timeout}, {FromPid, _}, State) ->
283+
#state{size = Size,
284+
monitors = Monitors,
285+
supervisor = Sup} = State,
286+
CurrentAllWorkersSize = length(supervisor:which_children(Sup)),
287+
if Size > CurrentAllWorkersSize ->
288+
{Pid, Ref} = new_worker(Sup, FromPid),
289+
true = ets:insert(Monitors, {Pid, Ref}),
290+
NextState =
291+
if Size > CurrentAllWorkersSize + 1 ->
292+
ready;
293+
el/=se ->
294+
full
295+
end,
296+
{reply, Pid, NextState, State};
297+
el/=se ->
298+
{reply, full, full, State}
299+
end;
264300
full(_Event, _From, State) ->
265301
{reply, ok, full, State}.
266302

@@ -283,9 +319,12 @@ handle_sync_event(get_all_monitors, _From, StateName, State) ->
283319
Monitors = ets:tab2list(State#state.monitors),
284320
{reply, Monitors, StateName, State};
285321
handle_sync_event(get_pool_size, _From, StateName, State) ->
286-
{reply, State#state.size, StateName, State};
322+
{reply, {State#state.size, State#state.max_overflow}, StateName, State};
287323
handle_sync_event({set_pool_size, NewSize}, _From, StateName, State) ->
288324
{reply, ok, StateName, State#state{size = NewSize}};
325+
handle_sync_event({set_pool_size, NewSize, NewMaxOverflow}, _From, StateName, State) ->
326+
{reply, ok, StateName, State#state{size = NewSize,
327+
max_overflow = NewMaxOverflow}};
289328
handle_sync_event(stop, _From, _StateName, State) ->
290329
Sup = State#state.supervisor,
291330
true = exit(Sup, shutdown),
@@ -382,7 +421,9 @@ checkin_while_full(Pid, State) ->
382421
waiting = Waiting,
383422
monitors = Monitors,
384423
max_overflow = MaxOverflow,
385-
overflow = Overflow} = State,
424+
overflow = Overflow,
425+
size = Size} = State,
426+
EffectieSize = length(supervisor:which_children(Sup)),
386427
case queue:out(Waiting) of
387428
{{value, {{FromPid, _}=From, Timeout, StartTime}}, Left} ->
388429
case wait_valid(StartTime, Timeout) of
@@ -394,7 +435,8 @@ checkin_while_full(Pid, State) ->
394435
false ->
395436
checkin_while_full(Pid, State#state{waiting=Left})
396437
end;
397-
{empty, Empty} when MaxOverflow < 1 ->
438+
{empty, Empty} when MaxOverflow < 1,
439+
Size >= EffectieSize ->
398440
Workers = queue:in(Pid, State#state.workers),
399441
{next_state, ready, State#state{workers=Workers,
400442
waiting=Empty}};

test/poolboy_tests.erl

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ pool_test_() ->
5454
{<<"Worker checked-in after an exception in a transaction">>,
5555
fun checkin_after_exception_in_transaction/0
5656
},
57+
{<<"Pool size adjustments (no overflow)">>,
58+
fun pool_resize_without_overflow/0
59+
},
5760
{<<"Pool returns status">>,
5861
fun pool_returns_status/0
5962
}
@@ -369,6 +372,48 @@ checkin_after_exception_in_transaction() ->
369372
?assertEqual(2, length(?sync(Pool, get_avail_workers))),
370373
ok = ?sync(Pool, stop).
371374

375+
pool_resize_without_overflow() ->
376+
{ok, Pool} = new_pool(4, 0),
377+
?assertEqual({4, 0}, ?sync(Pool, get_pool_size)),
378+
379+
Workers0 = [poolboy:checkout(Pool) || _ <- lists:seq(1, 4)],
380+
?assertEqual(full, poolboy:checkout(Pool, false)),
381+
382+
ok = ?sync(Pool, {set_pool_size, 6}),
383+
?assertEqual({6, 0}, ?sync(Pool, get_pool_size)),
384+
%% workers queue is still empty, but two more checkouts should succeed
385+
?assertEqual(0, length(?sync(Pool, get_avail_workers))),
386+
?assertEqual(4, length(?sync(Pool, get_all_workers))),
387+
388+
ExtraWorker0 = poolboy:checkout(Pool, false),
389+
?assert(is_pid(ExtraWorker0)),
390+
?assertEqual(0, length(?sync(Pool, get_avail_workers))),
391+
?assertEqual(5, length(?sync(Pool, get_all_workers))), %% will be 6 after another checkout
392+
393+
ExtraWorker1 = poolboy:checkout(Pool, false),
394+
?assert(is_pid(ExtraWorker0)),
395+
?assertEqual(0, length(?sync(Pool, get_avail_workers))),
396+
?assertEqual(6, length(?sync(Pool, get_all_workers))),
397+
?assertEqual(full, poolboy:checkout(Pool, false)),
398+
399+
ok = ?sync(Pool, {set_pool_size, 4}),
400+
?assertEqual(0, length(?sync(Pool, get_avail_workers))),
401+
?assertEqual(6, length(?sync(Pool, get_all_workers))), %% no change until checkins
402+
ok = poolboy:checkin(Pool, ExtraWorker1),
403+
?assertEqual(0, length(?sync(Pool, get_avail_workers))), %% worker is dismissed, not placed on queue
404+
?assertEqual(5, length(?sync(Pool, get_all_workers))),
405+
ok = poolboy:checkin(Pool, ExtraWorker0),
406+
?assertEqual(0, length(?sync(Pool, get_avail_workers))), %% worker is dismissed, not placed on queue
407+
?assertEqual(4, length(?sync(Pool, get_all_workers))),
408+
409+
[Worker2 | Workers2] = Workers0,
410+
ok = poolboy:checkin(Pool, Worker2),
411+
?assertEqual(1, length(?sync(Pool, get_avail_workers))),
412+
?assertEqual(4, length(?sync(Pool, get_all_workers))),
413+
414+
[poolboy:checkin(Pool, W) || W <- Workers2],
415+
ok = ?sync(Pool, stop).
416+
372417
pool_returns_status() ->
373418
{ok, Pool} = new_pool(2, 0),
374419
?assertEqual({ready, 2, 0, 0}, poolboy:status(Pool)),

0 commit comments

Comments
 (0)