diff --git a/README.org b/README.org
index 08acc1a..0d49637 100644
--- a/README.org
+++ b/README.org
@@ -137,18 +137,25 @@ examples are valid:
120000 %% plain integer — milliseconds, consistent with Erlang timeout conventions
#+END_SRC
-The =cull_interval= determines the schedule when a check will be made
-for stale members. Checks are scheduled using =erlang:send_after/3=
-which provides a light-weight timing mechanism. The next check is
-scheduled after the prior check completes.
-
-During a check, pool members that have not been used in more than
-=max_age= minutes will be removed until the pool size reaches
-=init_count=.
-
-The default value for =cull_interval= is ={1, min}=. You can disable
-culling by specifying a value os ={0, min}=. The =max_age= parameter
-defaults to ={30, sec}=.
+The =cull_interval= sets how often a cull check runs. Checks are
+scheduled using =erlang:send_after/3=; the next check is scheduled
+after the prior one completes. Set to ={0, min}= to disable culling
+entirely. Default: ={15, sec}=.
+
+=max_age= is the demand memory window. Pooler tracks the peak
+concurrent in-use count over the last =max_age= period. At each cull
+tick the pool is sized to that peak (floored at =init_count=); workers
+beyond the peak are removed oldest-idle-first. Default: ={30, sec}=.
+
+In practice this means: after a traffic burst the pool holds its
+grown size for up to =max_age= time, then shrinks back to =init_count=
+once no burst is recorded in the window.
+
+The most natural configuration is =cull_interval ≤ max_age= (check
+more often than you forget) — the defaults satisfy this. Setting
+=cull_interval > max_age= is valid: the pool still shrinks correctly,
+but the memory window is shorter than the check interval, which is an
+unusual combination that rarely makes sense in practice.
**** Connection lifetime recycling (max_lifetime)
@@ -216,9 +223,11 @@ The value is a =time_spec()= tuple; the default is ={1, min}=.
**** Request queue limit (queue_max)
-When all pool members are in use, =take_member= calls are queued up to
-=queue_max= entries (default =50=). Callers beyond this limit receive
-=error_no_members= immediately rather than waiting. Set it to =0= to disable
+When all pool members are in use, =take_member/2= calls (those with an explicit
+timeout) are queued up to =queue_max= entries (default =50=) and wait for a
+member to become available. =take_member/1= always returns =error_no_members=
+immediately and is never queued. Callers beyond the =queue_max= limit also
+receive =error_no_members= immediately. Set =queue_max= to =0= to disable
queuing entirely.
#+BEGIN_SRC erlang
@@ -404,6 +413,32 @@ members stay on their original shards and new starts fill the added shards
round-robin until churn rebalances things. /Decreasing/ is not supported and
returns ={error, num_member_sups_cannot_be_decreased}=.
+**** Worker dispatch order (member_order)
+
+Controls which free worker is handed out on each =take_member= call.
+
+- =lifo= (default, stack) — the most recently returned worker is taken first.
+ Under steady load a small working set stays warm; idle workers accumulate at
+ the back and are culled. Good for connection pools where keeping a
+ connection recently used matters (e.g. avoids idle-timeout disconnects on
+ hot connections).
+
+- =fifo= (queue) — the worker that has been idle the longest is taken first,
+ producing round-robin rotation across the whole pool. Load is spread evenly;
+ no single worker becomes a hot spot.
+
+#+BEGIN_SRC erlang
+#{name => pg_pool,
+ init_count => 10,
+ max_count => 50,
+ start_mfa => {epgsql, connect, [...]},
+ member_order => fifo} %% round-robin; default is lifo (stack)
+#+END_SRC
+
+Both modes produce identical cull behaviour for the same =cull_interval= /
+=max_age= configuration — only which worker is handed out next differs.
+=member_order= can be changed at runtime via =pooler:pool_reconfigure/2=.
+
*** Pool Configuration via =pooler:new_pool=
You can create pools using =pooler:new_pool/1= when accepts a
map of pool configuration. Here's an example:
diff --git a/rebar.config b/rebar.config
index 988d38c..2f47ffa 100644
--- a/rebar.config
+++ b/rebar.config
@@ -78,3 +78,5 @@
{warnings, [unknown]},
{plt_apps, all_deps}
]}.
+
+{hex, [{doc, edoc}]}.
diff --git a/src/pooler.erl b/src/pooler.erl
index 63be676..1406a7c 100644
--- a/src/pooler.erl
+++ b/src/pooler.erl
@@ -61,7 +61,7 @@
code_change/3
]).
--vsn(5).
+-vsn(6).
%% Bump this value and add a new clause to `code_change', if the format of `#pool{}' record changed
%% ------------------------------------------------------------------
@@ -83,12 +83,13 @@
]).
-define(DEFAULT_ADD_RETRY, 1).
--define(DEFAULT_CULL_INTERVAL, {1, min}).
+-define(DEFAULT_CULL_INTERVAL, {15, sec}).
-define(DEFAULT_MAX_AGE, {30, sec}).
-define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}).
-define(DEFAULT_AUTO_GROW_THRESHOLD, undefined).
-define(POOLER_GROUP_TABLE, pooler_group_table).
-define(DEFAULT_POOLER_QUEUE_MAX, 50).
+-define(MAX_DEMAND_HISTORY_BUCKETS, 256).
%% Per-pool TTL configuration and timer state.
%% `undefined' in #pool.ttl means TTL is disabled — all TTL functions short-circuit immediately.
@@ -121,8 +122,16 @@
max_count = 100 :: non_neg_integer(),
init_count = 10 :: non_neg_integer(),
start_mfa :: {atom(), atom(), [term()]},
- free_pids = [] :: [pid()],
+ %% LIFO (default): list, used as a stack. FIFO: OTP queue.
+ %% Write only via free_push/free_pop/free_delete/free_convert — never update directly.
+ free_pids = [] :: [pid()] | queue:queue(pid()),
+ member_order = lifo :: lifo | fifo,
+ %% Demand ring-buffer for pool-level cull. Head = current bucket {PeakInUse, BucketStartMs}.
+ %% Queue is always at capacity (pre-filled with {0,0} dummies). Write via update_demand_buf.
+ demand_buf = undefined :: undefined | queue:queue({non_neg_integer(), integer()}),
in_use_count = 0 :: non_neg_integer(),
+ %% Kept in sync with queue:len(free_pids) / length(free_pids).
+ %% Write only via free_push/free_pop/free_delete — never update directly.
free_count = 0 :: non_neg_integer(),
%% The number times to attempt adding a pool member if the
%% pool size is below max_count and there are no free
@@ -253,7 +262,8 @@
max_lifetime => time_spec(),
max_lifetime_jitter => time_spec(),
num_member_sups => pos_integer(),
- member_shutdown => brutal_kill | pos_integer()
+ member_shutdown => brutal_kill | pos_integer(),
+ member_order => lifo | fifo
}.
%% See {@link pooler:new_pool/1}
@@ -286,6 +296,8 @@
'$pooler_pid' | '$pooler_member_sup' | '$pooler_pool' | '$pooler_pool_name' | any(), ...
]}}
| {auto_grow_threshold, non_neg_integer()}}
+ | {set_member_order, lifo | fifo}
+ | {rebuild_demand_buf}
| {update_ttl, undefined | #ttl{}}.
-type member_expiry() :: integer() | infinity.
@@ -394,21 +406,19 @@ create_group_table() ->
%% to. Pools sharing a common `group' value can be accessed using
%% {@link take_group_member/1}, {@link return_group_member/2} and {@link group_pools/1}.
%%
`cull_interval'
-%% Default: `{1, min}'. Time between checks for stale pool members. Specified as
-%% a `{Time, Unit}' tuple where `Unit' is one of `hour', `min', `sec', `ms', `mu',
+%% Default: `{15, sec}'. How often to run a cull check. Specified as a
+%% `{Time, Unit}' tuple where `Unit' is one of `hour', `min', `sec', `ms', `mu',
%% or as a plain non-negative integer (milliseconds).
-%% Triggers a once per `cull_interval' check to remove members that have not
-%% been accessed in `max_age' time units. Culling can be disabled by
-%% specifying a zero time vaule (e.g. `{0, min}'). Culling will also be
-%% disabled if `init_count' is the same as `max_count'.
+%% Culling can be disabled by specifying a zero value (e.g. `{0, min}').
+%% Culling is also disabled if `init_count' equals `max_count'.
%% `max_age'
-%% Default: `{30, sec}'. Members idle longer than `max_age' time units are removed from
-%% the pool when stale checking is enabled via
-%% `cull_interval'. Culling of idle members will never reduce the pool
-%% below `init_count'. The value is specified as `{Time, Unit}'. Note
-%% that timers are not set on individual pool members and may remain
-%% in the pool beyond the configured `max_age' value since members are
-%% only removed on the interval configured via `cull_interval'.
+%% Default: `{30, sec}'. Demand memory window for pool sizing. Pooler tracks the
+%% peak concurrent in-use count over the last `max_age' period; at each cull tick
+%% the pool is sized to that peak (floored at `init_count'). Workers beyond the
+%% peak are removed oldest-idle-first. Setting to zero disables demand tracking
+%% and always targets `init_count'.
+%% Typical configuration: `cull_interval' <= `max_age' (check more often
+%% than you forget). `cull_interval' > `max_age' is valid but unusual.
%% `member_start_timeout'
%% Default: `{1, min}'. Time limit for member starts. Specified as `{Time, Unit}'.
%% `queue_max'
@@ -446,6 +456,14 @@ create_group_table() ->
%% from `[-jitter, +jitter]', so members started together do not all expire simultaneously.
%% Only meaningful when `max_lifetime' is set. Must be strictly less than `max_lifetime';
%% pool start and reconfigure will fail with `jitter_must_be_less_than_max_lifetime' otherwise.
+%% `member_order'
+%% Default: `lifo' (stack). Controls which free worker is handed out next.
+%% `lifo' — most recently returned worker first; keeps a hot working set warm under
+%% steady load and lets idle workers accumulate for culling.
+%% `fifo' (queue) — longest-idle worker first; rotates all workers evenly,
+%% spreading load across the pool.
+%% Both modes produce identical cull behaviour for the same `cull_interval' / `max_age'.
+%% Reconfigurable via {@link pool_reconfigure/2}.
%%
-spec new_pool(pool_config() | pool_config_legacy()) -> {ok, pid()} | {error, {already_started, pid()}}.
new_pool(PoolConfig) ->
@@ -757,6 +775,15 @@ init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa :
{error, Err} -> exit({error, Err});
{ok, T} -> T
end,
+ case
+ validate_demand_buf_size(
+ maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
+ maps:get(max_age, P, ?DEFAULT_MAX_AGE)
+ )
+ of
+ ok -> ok;
+ {error, _} = DemandBufErr -> exit(DemandBufErr)
+ end,
NumMemberSups = maps:get(num_member_sups, P, 1),
MemberSups = pooler_pool_sup:member_sup_names(Name, NumMemberSups),
Pool = #pool{
@@ -776,7 +803,20 @@ init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa :
metrics_api = maps:get(metrics_api, P, folsom),
queue_max = maps:get(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX),
ttl = TTL,
- member_sups = MemberSups
+ member_sups = MemberSups,
+ member_order = maps:get(member_order, P, lifo),
+ free_pids = free_new(maps:get(member_order, P, lifo)),
+ demand_buf =
+ case InitCount =:= MaxCount of
+ %% fixed-size pool: cull never fires, no tracking needed
+ true ->
+ undefined;
+ false ->
+ init_demand_buf(
+ maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL),
+ maps:get(max_age, P, ?DEFAULT_MAX_AGE)
+ )
+ end
},
%% This schedules the next cull when the pool is configured for
%% such and is otherwise a no-op.
@@ -811,8 +851,9 @@ handle_call(pool_utilization, _From, Pool) ->
{reply, compute_utilization(Pool), Pool};
handle_call(dump_pool, _From, Pool) ->
{reply, to_map(Pool), Pool};
-handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) ->
- {reply, do_call_free_members(Fun, Pids), Pool};
+handle_call({call_free_members, Fun}, _From, Pool) ->
+ Results = free_fold(fun(Pid, Acc) -> [do_call_free_member(Fun, Pid) | Acc] end, [], Pool),
+ {reply, Results, Pool};
handle_call({reconfigure, NewConfig}, _From, Pool) ->
case calculate_reconfigure_actions(NewConfig, Pool) of
{ok, Actions} = Res ->
@@ -911,7 +952,12 @@ code_change(3, OldState, Extra) when tuple_size(OldState) =:= 27 ->
%% 2-tuple starting_members) → v5 (`member_sups' tuple + `next_shard',
%% `#member{}' record entries, 3-tuple starting_members).
code_change(4, OldState, _Extra) when is_tuple(OldState), tuple_size(OldState) =:= 28, element(1, OldState) =:= pool ->
- {ok, do_upgrade_to_v5(OldState)};
+ code_change(5, do_upgrade_to_v5(OldState), _Extra);
+%% v5 #pool{} (29-tuple, no member_order/demand_buf) → v6.
+code_change(5, OldState, _Extra) when
+ is_tuple(OldState), tuple_size(OldState) =:= 29, element(1, OldState) =:= pool
+->
+ {ok, do_upgrade_to_v6(OldState)};
code_change(_, State, _Extra) ->
{ok, State}.
@@ -976,6 +1022,19 @@ do_upgrade_to_v5(
AllMembers
),
NewStartingMembers = [{P, T, 1} || {P, T} <- StartingMembers],
+ {pool, Name, Group, MaxCount, InitCount, StartMFA, FreePids, InUseCount, FreeCount, AddMemberRetry, CullInterval,
+ MaxAge, CullTimer, TTL, {MemberSup}, 1, StarterSup, NewAllMembers, ConsumerToPid, NewStartingMembers,
+ StoppingCount, MemberStartTimeout, AutoGrowThreshold, StopMFA, InitializeMFA, MetricsMod, MetricsAPI,
+ QueuedRequestors, QueueMax}.
+
+%% v5 #pool{} → v6: positional destructure (v5 has no member_order/demand_buf),
+%% then construct a proper v6 record and warm the demand_buf from member timestamps.
+do_upgrade_to_v6(
+ {pool, Name, Group, MaxCount, InitCount, StartMFA, FreePids, InUseCount, FreeCount, AddMemberRetry, CullInterval,
+ MaxAge, CullTimer, TTL, MemberSups, NextShard, StarterSup, AllMembers, ConsumerToPid, StartingMembers,
+ StoppingCount, MemberStartTimeout, AutoGrowThreshold, StopMFA, InitializeMFA, MetricsMod, MetricsAPI,
+ QueuedRequestors, QueueMax}
+) ->
#pool{
name = Name,
group = Group,
@@ -983,6 +1042,8 @@ do_upgrade_to_v5(
init_count = InitCount,
start_mfa = StartMFA,
free_pids = FreePids,
+ member_order = lifo,
+ demand_buf = rebuild_demand_buf_from_members(CullInterval, MaxAge, AllMembers, InUseCount),
in_use_count = InUseCount,
free_count = FreeCount,
add_member_retry = AddMemberRetry,
@@ -990,12 +1051,12 @@ do_upgrade_to_v5(
max_age = MaxAge,
cull_timer = CullTimer,
ttl = TTL,
- member_sups = {MemberSup},
- next_shard = 1,
+ member_sups = MemberSups,
+ next_shard = NextShard,
starter_sup = StarterSup,
- all_members = NewAllMembers,
+ all_members = AllMembers,
consumer_to_pid = ConsumerToPid,
- starting_members = NewStartingMembers,
+ starting_members = StartingMembers,
stopping_count = StoppingCount,
member_start_timeout = MemberStartTimeout,
auto_grow_threshold = AutoGrowThreshold,
@@ -1007,6 +1068,59 @@ do_upgrade_to_v5(
queue_max = QueueMax
}.
+%% Returns an initialised demand_buf estimated from member last-return timestamps.
+%% Lower bound: members that rotated through a bucket multiple times only appear once.
+rebuild_demand_buf_from_members(CI, MA, AllMembers, InUseCount) ->
+ rebuild_demand_buf_from_members(
+ CI,
+ MA,
+ AllMembers,
+ InUseCount,
+ erlang:monotonic_time(millisecond)
+ ).
+
+rebuild_demand_buf_from_members(CI, MA, AllMembers, InUseCount, Now) ->
+ case cull_bucket_size_ms(CI, MA) of
+ 0 ->
+ undefined;
+ BucketMs ->
+ N = min(ceil(time_as_millis(MA) / BucketMs), ?MAX_DEMAND_HISTORY_BUCKETS),
+ MAms = time_as_millis(MA),
+ Buckets = [
+ begin
+ BucketEnd = Now - I * BucketMs,
+ BucketStart = BucketEnd - BucketMs,
+ Count = maps:fold(
+ fun(_, #member{time = T}, Acc) ->
+ Tms = member_time_to_monotonic_ms(T),
+ case Tms >= BucketStart andalso Tms < BucketEnd of
+ true -> Acc + 1;
+ false -> Acc
+ end
+ end,
+ 0,
+ AllMembers
+ ),
+ Peak =
+ Count +
+ (if
+ I =:= 0 -> InUseCount;
+ true -> 0
+ end),
+ case Now - BucketStart =< MAms of
+ true -> {Peak, BucketStart};
+ false -> {0, 0}
+ end
+ end
+ || I <- lists:seq(0, N - 1)
+ ],
+ queue:from_list(Buckets)
+ end.
+
+member_time_to_monotonic_ms({Mega, Sec, Micro}) ->
+ WallMs = (Mega * 1_000_000 + Sec) * 1000 + Micro div 1000,
+ WallMs - erlang:time_offset(millisecond).
+
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
@@ -1074,17 +1188,12 @@ do_accept_member(
maybe_reply_with_pid(
Pid,
Pool = #pool{
- queued_requestors = QueuedRequestors,
- free_pids = Free,
- free_count = NumFree
+ queued_requestors = QueuedRequestors
}
) when is_pid(Pid) ->
case queue:out(QueuedRequestors) of
{empty, _} ->
- Pool#pool{
- free_pids = [Pid | Free],
- free_count = NumFree + 1
- };
+ free_push(Pid, Pool);
{{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool)
end.
@@ -1098,34 +1207,39 @@ reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool
gen_server:reply(From, Pid),
Pool1.
+%% Clause 1: normal take from free list. Pool already has free_pids updated by free_pop.
-spec take_member_bookkeeping(
pid(),
gen_server_from(),
- [pid()] | requestor_queue(),
#pool{}
) -> #pool{}.
take_member_bookkeeping(
MemberPid,
{CPid, _},
- Rest,
Pool = #pool{
in_use_count = NumInUse,
- free_count = NumFree,
consumer_to_pid = CPMap,
all_members = AllMembers
}
) when
is_pid(MemberPid),
- is_pid(CPid),
- is_list(Rest)
+ is_pid(CPid)
->
- Pool#pool{
- free_pids = Rest,
- in_use_count = NumInUse + 1,
- free_count = NumFree - 1,
+ NewInUse = NumInUse + 1,
+ Pool1 = Pool#pool{
+ in_use_count = NewInUse,
consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap),
all_members = set_cpid_for_member(MemberPid, CPid, AllMembers)
- };
+ },
+ update_demand_buf(NewInUse, Pool1).
+
+%% Clause 2: queued requestor take. Called from reply_to_queued_requestor.
+-spec take_member_bookkeeping(
+ pid(),
+ gen_server_from(),
+ requestor_queue(),
+ #pool{}
+) -> #pool{}.
take_member_bookkeeping(
MemberPid,
{ReplyPid, _Tag},
@@ -1136,12 +1250,14 @@ take_member_bookkeeping(
consumer_to_pid = CPMap
}
) ->
- Pool#pool{
- in_use_count = NumInUse + 1,
+ NewInUse = NumInUse + 1,
+ Pool1 = Pool#pool{
+ in_use_count = NewInUse,
all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers),
consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap),
queued_requestors = NewQueuedRequestors
- }.
+ },
+ update_demand_buf(NewInUse, Pool1).
-spec remove_stale_starting_members(
#pool{},
@@ -1237,12 +1353,12 @@ take_member_from_pool(
NumFree = Pool1#pool.free_count,
StoppingCount = Pool1#pool.stopping_count,
NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount + StoppingCount),
- case Pool1#pool.free_pids of
- [] when NumCanAdd =< 0 ->
+ case free_pop(Pool1) of
+ empty when NumCanAdd =< 0 ->
send_metric(Pool, error_no_members_count, {inc, 1}, counter, #{reason => at_capacity}),
send_metric(Pool, events, error_no_members, history),
{error_no_members, Pool1};
- [] when NumCanAdd > 0 ->
+ empty ->
%% Limit concurrently starting members to init_count. Add
%% up to init_count members. Starting members here means
%% we always return an error_no_members for a take request
@@ -1255,23 +1371,23 @@ take_member_from_pool(
send_metric(Pool, error_no_members_count, {inc, 1}, counter, #{reason => all_in_use}),
send_metric(Pool, events, error_no_members, history),
{error_no_members, Pool2};
- [Pid | Rest] ->
- Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1),
- Pool3 =
- case Pool2#pool.auto_grow_threshold of
+ {Pid, Pool2} ->
+ Pool3 = take_member_bookkeeping(Pid, From, Pool2),
+ Pool4 =
+ case Pool3#pool.auto_grow_threshold of
N when
is_integer(N) andalso
- Pool2#pool.free_count =< N andalso
+ Pool3#pool.free_count =< N andalso
NumCanAdd > 0
->
NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 0),
- add_members_async(NumToAdd, Pool2);
+ add_members_async(NumToAdd, Pool3);
_ ->
- Pool2
+ Pool3
end,
- send_metric(Pool, in_use_count, Pool3#pool.in_use_count, gauge),
- send_metric(Pool, free_count, Pool3#pool.free_count, gauge),
- {Pid, Pool3}
+ send_metric(Pool, in_use_count, Pool4#pool.in_use_count, gauge),
+ send_metric(Pool, free_count, Pool4#pool.free_count, gauge),
+ {Pid, Pool4}
end.
-spec take_member_from_pool_queued(
@@ -1370,9 +1486,7 @@ do_return_member(
Pool;
#member{status = CPid, expires_at = ExpTs} = Member ->
#pool{
- free_pids = Free,
- in_use_count = NumInUse,
- free_count = NumFree
+ in_use_count = NumInUse
} = Pool,
Pool1 = Pool#pool{
in_use_count = NumInUse - 1,
@@ -1406,7 +1520,7 @@ do_return_member(
},
case queue:out(QueuedRequestors) of
{empty, _} ->
- Pool3 = Pool2#pool{free_pids = [Pid | Free], free_count = NumFree + 1},
+ Pool3 = free_push(Pid, Pool2),
maybe_advance_ttl_timer(Pid, ExpTs, Pool3);
{{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) ->
reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2)
@@ -1470,11 +1584,9 @@ handle_unexpected_member_down(Pid, Pool) ->
AllMembers = Pool#pool.all_members,
case maps:get(Pid, AllMembers, undefined) of
#member{status = free} ->
- Pool1 = Pool#pool{
- free_pids = lists:delete(Pid, Pool#pool.free_pids),
- free_count = Pool#pool.free_count - 1,
+ Pool1 = free_delete(Pid, Pool#pool{
all_members = maps:remove(Pid, AllMembers)
- },
+ }),
send_metric(Pool1, killed_free_count, {inc, 1}, counter, #{reason => crashed}),
Pool2 =
case Pool1#pool.ttl of
@@ -1513,12 +1625,10 @@ remove_pid(Pid, Pool, Flag, Reason) ->
} = Pool,
case maps:get(Pid, AllMembers, undefined) of
#member{status = free, shard_idx = ShardIdx} = Member ->
- Pool1 = Pool#pool{
- free_pids = lists:delete(Pid, Pool#pool.free_pids),
- free_count = Pool#pool.free_count - 1,
+ Pool1 = free_delete(Pid, Pool#pool{
stopping_count = Pool#pool.stopping_count + 1,
all_members = AllMembers#{Pid => Member#member{status = {stopping, Flag}}}
- },
+ }),
pooler_starter_sup:new_stopper(
pooler_starter:stop_spec(PoolName, member_sup_for(ShardIdx, Pool1), Pid, StopMFA)
),
@@ -1593,7 +1703,8 @@ add_member_to_consumer(MemberPid, CPid, CPMap) ->
-spec cull_members_from_pool(#pool{}) -> #pool{}.
cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) ->
- %% 0 cull_interval means do not cull
+ Pool;
+cull_members_from_pool(#pool{cull_interval = 0} = Pool) ->
Pool;
cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) ->
%% if init_count matches max_count, then we will not dynamically
@@ -1602,31 +1713,25 @@ cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) ->
Pool;
cull_members_from_pool(
#pool{
- free_count = FreeCount,
- init_count = InitCount,
- in_use_count = InUseCount,
cull_interval = Delay,
- cull_timer = CullTRef,
- max_age = MaxAge,
- all_members = AllMembers
+ cull_timer = CullTRef
} = Pool
) ->
case is_reference(CullTRef) of
true -> erlang:cancel_timer(CullTRef);
false -> noop
end,
- MaxCull = FreeCount - (InitCount - InUseCount),
+ WinPeak = demand_window_peak(Pool),
+ TargetSize = max(Pool#pool.init_count, WinPeak),
+ MaxCull = Pool#pool.free_count - max(0, TargetSize - Pool#pool.in_use_count),
Pool1 =
case MaxCull > 0 of
true ->
- MemberInfo = member_info(Pool#pool.free_pids, AllMembers),
- ExpiredMembers =
- expired_free_members(MemberInfo, os:timestamp(), MaxAge),
- CullList = lists:sublist(ExpiredMembers, MaxCull),
+ Victims = oldest_idle_victims(MaxCull, Pool),
lists:foldl(
fun({CullMe, _}, S) -> remove_pid(CullMe, S, no_replace, max_age) end,
Pool,
- CullList
+ Victims
);
false ->
Pool
@@ -1645,23 +1750,6 @@ schedule_cull(Pool, Delay) ->
DelayMillis = time_as_millis(Delay),
erlang:send_after(DelayMillis, Pool, cull_pool).
--spec member_info([pid()], members_map()) -> [{pid(), #member{}}].
-member_info(Pids, AllMembers) ->
- maps:to_list(maps:with(Pids, AllMembers)).
-
--spec expired_free_members(
- Members :: [{pid(), #member{}}],
- Now :: {_, _, _},
- MaxAge :: time_spec()
-) -> [{pid(), #member{}}].
-expired_free_members(Members, Now, MaxAge) ->
- MaxMicros = time_as_micros(MaxAge),
- [
- MI
- || MI = {_, #member{status = free, time = LastReturn}} <- Members,
- timer:now_diff(Now, LastReturn) >= MaxMicros
- ].
-
-spec calculate_reconfigure_actions(pool_config(), #pool{}) -> {ok, [reconfigure_action()]} | {error, any()}.
calculate_reconfigure_actions(
#{name := Name, start_mfa := MFA} = NewConfig,
@@ -1681,10 +1769,17 @@ calculate_reconfigure_actions(
metrics_mod => pooler_no_metrics,
metrics_api => folsom,
queue_max => ?DEFAULT_POOLER_QUEUE_MAX,
- num_member_sups => 1
+ num_member_sups => 1,
+ member_order => lifo
},
+ NewCI = maps:get(cull_interval, NewConfig, ?DEFAULT_CULL_INTERVAL),
+ NewMA = maps:get(max_age, NewConfig, ?DEFAULT_MAX_AGE),
NewWithDefaults0 = maps:merge(Defaults, NewConfig),
try
+ case validate_demand_buf_size(NewCI, NewMA) of
+ {error, _} = DemandSizeErr -> throw(DemandSizeErr);
+ ok -> ok
+ end,
NewWithDefaults =
case ttl_from_config(NewConfig) of
{error, _} = Err -> throw(Err);
@@ -1708,7 +1803,8 @@ calculate_reconfigure_actions(
initialize_mfa,
auto_grow_threshold,
ttl,
- num_member_sups
+ num_member_sups,
+ member_order
]
)
of
@@ -1736,7 +1832,8 @@ mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount, in_u
->
AliveCount = InUse + Free,
[
- {set_parameter, {init_count, NewInitCount}}
+ {set_parameter, {init_count, NewInitCount}},
+ {rebuild_demand_buf}
| case AliveCount < NewInitCount of
true ->
[{start_workers, NewInitCount - AliveCount}];
@@ -1745,29 +1842,29 @@ mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount, in_u
end
];
mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount}) when NewInitCount < OldInitCount ->
- [{set_parameter, {init_count, NewInitCount}}];
+ [{set_parameter, {init_count, NewInitCount}}, {rebuild_demand_buf}];
mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount, in_use_count = InUse, free_count = Free}) when
NewMaxCount < OldMaxCount
->
AliveCount = InUse + Free,
[
- {set_parameter, {max_count, NewMaxCount}}
+ {set_parameter, {max_count, NewMaxCount}},
+ {rebuild_demand_buf}
| case AliveCount > NewMaxCount of
true when Free >= (AliveCount - NewMaxCount) ->
- %% We have enough free workers to shut down
[{stop_free_workers, AliveCount - NewMaxCount}];
true ->
- %% We don't have enough free workers to shutdown
throw({error, {max_count, not_enough_free_workers_to_shutdown}});
false ->
[]
end
];
mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount}) when NewMaxCount > OldMaxCount ->
- [{set_parameter, {max_count, NewMaxCount}}];
+ [{set_parameter, {max_count, NewMaxCount}}, {rebuild_demand_buf}];
mk_rec_action(cull_interval, New, _, #pool{cull_interval = Old, cull_timer = _Timer}) when New =/= Old ->
[
- {set_parameter, {cull_interval, New}}
+ {set_parameter, {cull_interval, New}},
+ {rebuild_demand_buf}
| case time_as_millis(New) < time_as_millis(Old) of
true ->
[{reset_cull_timer, New}];
@@ -1777,7 +1874,8 @@ mk_rec_action(cull_interval, New, _, #pool{cull_interval = Old, cull_timer = _Ti
];
mk_rec_action(max_age, New, _, #pool{max_age = Old}) when New =/= Old ->
[
- {set_parameter, {max_age, New}}
+ {set_parameter, {max_age, New}},
+ {rebuild_demand_buf}
| case time_as_millis(New) < time_as_millis(Old) of
true ->
[{cull, []}];
@@ -1822,6 +1920,8 @@ mk_rec_action(num_member_sups, New, _, #pool{member_sups = Sups}) ->
New < OldN -> throw({error, num_member_sups_cannot_be_decreased});
true -> []
end;
+mk_rec_action(member_order, New, _, #pool{member_order = Old}) when New =/= Old ->
+ [{set_member_order, New}];
mk_rec_action(_Param, _NewVal, _, _Pool) ->
%% not changed
[].
@@ -1831,8 +1931,8 @@ apply_reconfigure_action({set_parameter, {Name, Value}}, Pool) ->
set_parameter(Name, Value, Pool);
apply_reconfigure_action({start_workers, Count}, Pool) ->
add_members_async(Count, Pool);
-apply_reconfigure_action({stop_free_workers, Count}, #pool{free_pids = Free} = Pool) ->
- lists:foldl(fun(P, S) -> remove_pid(P, S, no_replace, reconfigured) end, Pool, lists:sublist(Free, Count));
+apply_reconfigure_action({stop_free_workers, Count}, Pool) ->
+ lists:foldl(fun(P, S) -> remove_pid(P, S, no_replace, reconfigured) end, Pool, free_take_n(Count, Pool));
apply_reconfigure_action({shrink_queue, Count}, #pool{queued_requestors = Q} = Pool) ->
{ToShrink, ToKeep} = lists:split(Count, queue:to_list(Q)),
[gen_server:reply(From, error_no_members) || {From, _TRef} <- ToShrink],
@@ -1859,6 +1959,10 @@ apply_reconfigure_action(
NewNames = pooler_pool_sup:add_member_sups(PoolName, StartMFA, OldN, NewN),
NewSups = list_to_tuple(tuple_to_list(OldSups) ++ NewNames),
Pool#pool{member_sups = NewSups};
+apply_reconfigure_action({set_member_order, Order}, Pool) ->
+ free_convert(Pool, Order);
+apply_reconfigure_action({rebuild_demand_buf}, Pool) ->
+ rebuild_demand_buf(Pool);
apply_reconfigure_action({update_ttl, NewTTL}, Pool) ->
case Pool#pool.ttl of
#ttl{timer = TRef} when is_reference(TRef) -> erlang:cancel_timer(TRef);
@@ -2020,7 +2124,7 @@ time_as_millis({Time, Unit}) ->
time_as_millis(Time) when is_integer(Time) ->
Time.
--spec time_as_micros(time_spec()) -> non_neg_integer().
+-spec time_as_micros({non_neg_integer(), time_unit()}) -> non_neg_integer().
%% @doc Convert time unit into microseconds
time_as_micros({Time, hour}) ->
60 * 60 * 1000 * 1000 * Time;
@@ -2031,9 +2135,7 @@ time_as_micros({Time, sec}) ->
time_as_micros({Time, ms}) ->
1000 * Time;
time_as_micros({Time, mu}) ->
- Time;
-time_as_micros(Time) when is_integer(Time) ->
- Time * 1000.
+ Time.
secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) ->
(Mega2 - Mega1) * 1000000 + (Secs2 - Secs1).
@@ -2078,12 +2180,10 @@ is_member_expired(Pid, #pool{all_members = AllMembers}) ->
undefined -> false
end.
-%% @doc Evict an expired member that was just dequeued from the head of free_pids.
-%% Unlike remove_pid/3, does NOT call lists:delete (we already have Rest).
--spec remove_free_head(pid(), [pid()], #pool{}) -> #pool{}.
+%% @doc Evict an expired member. Pool already has free_pids updated by free_pop.
+-spec remove_free_head(pid(), #pool{}) -> #pool{}.
remove_free_head(
Pid,
- Rest,
#pool{
all_members = AllMembers,
ttl = TTL,
@@ -2093,8 +2193,6 @@ remove_free_head(
) ->
#member{status = free, shard_idx = ShardIdx} = Member = maps:get(Pid, AllMembers),
Pool1 = Pool#pool{
- free_pids = Rest,
- free_count = Pool#pool.free_count - 1,
stopping_count = Pool#pool.stopping_count + 1,
all_members = AllMembers#{Pid => Member#member{status = {stopping, replace}}}
},
@@ -2129,12 +2227,15 @@ evict_expired_heads(#pool{ttl = #ttl{timer_target = BeforeTarget}} = Pool) ->
end.
-spec evict_expired_heads_loop(#pool{}) -> #pool{}.
-evict_expired_heads_loop(#pool{free_pids = []} = Pool) ->
- Pool;
-evict_expired_heads_loop(#pool{free_pids = [Pid | Rest]} = Pool) ->
- case is_member_expired(Pid, Pool) of
- false -> Pool;
- true -> evict_expired_heads_loop(remove_free_head(Pid, Rest, Pool))
+evict_expired_heads_loop(Pool) ->
+ case free_pop(Pool) of
+ empty ->
+ Pool;
+ {Pid, Pool1} ->
+ case is_member_expired(Pid, Pool) of
+ false -> Pool;
+ true -> evict_expired_heads_loop(remove_free_head(Pid, Pool1))
+ end
end.
%% @doc Schedule or advance the TTL timer when a member joins free_pids.
@@ -2176,22 +2277,14 @@ schedule_ttl_timer_for(Pid, ExpTs, TTL, Pool) ->
-spec reschedule_ttl_timer(#pool{}) -> #pool{}.
reschedule_ttl_timer(#pool{ttl = undefined} = Pool) ->
Pool;
-reschedule_ttl_timer(#pool{ttl = TTL, free_pids = FreePids, all_members = AllMembers} = Pool) ->
+reschedule_ttl_timer(#pool{ttl = TTL, all_members = AllMembers} = Pool) ->
case TTL#ttl.timer of
TRef when is_reference(TRef) -> erlang:cancel_timer(TRef);
undefined -> ok
end,
TTL1 = TTL#ttl{timer = undefined, timer_target = undefined},
Pool1 = Pool#pool{ttl = TTL1},
- case find_earliest_expiry(FreePids, AllMembers) of
- none -> Pool1;
- {Pid, ExpTs} -> schedule_ttl_timer_for(Pid, ExpTs, TTL1, Pool1)
- end.
-
-%% @doc Scan free_pids for the member with the smallest finite ExpiresAt. O(n).
--spec find_earliest_expiry([pid()], members_map()) -> {pid(), integer()} | none.
-find_earliest_expiry(Pids, AllMembers) ->
- lists:foldl(
+ Earliest = free_fold(
fun(Pid, Acc) ->
case maps:get(Pid, AllMembers, undefined) of
#member{expires_at = infinity} ->
@@ -2207,8 +2300,12 @@ find_earliest_expiry(Pids, AllMembers) ->
end
end,
none,
- Pids
- ).
+ Pool1
+ ),
+ case Earliest of
+ none -> Pool1;
+ {Pid, ExpTs} -> schedule_ttl_timer_for(Pid, ExpTs, TTL1, Pool1)
+ end.
%% @doc Extract only the static config fields of a #ttl{} for change-detection in
%% reconfigure. Must not include timer or timer_target (those change at runtime).
@@ -2275,9 +2372,6 @@ compute_utilization(#pool{
{queue_max, QueueMax}
].
-do_call_free_members(Fun, Pids) ->
- [do_call_free_member(Fun, P) || P <- Pids].
-
do_call_free_member(Fun, Pid) ->
try
{ok, Fun(Pid)}
@@ -2286,6 +2380,185 @@ do_call_free_member(Fun, Pid) ->
{error, Reason}
end.
+%% ===== Free-collection abstraction =====
+
+free_new(lifo) -> [];
+free_new(fifo) -> queue:new().
+
+free_push(Pid, #pool{member_order = lifo, free_pids = L, free_count = N} = Pool) ->
+ Pool#pool{free_pids = [Pid | L], free_count = N + 1};
+free_push(Pid, #pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) ->
+ Pool#pool{free_pids = queue:in(Pid, Q), free_count = N + 1}.
+
+free_pop(#pool{member_order = lifo, free_pids = [Pid | Rest], free_count = N} = Pool) ->
+ {Pid, Pool#pool{free_pids = Rest, free_count = N - 1}};
+free_pop(#pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) ->
+ case queue:out(Q) of
+ {empty, _} -> empty;
+ {{value, Pid}, Rest} -> {Pid, Pool#pool{free_pids = Rest, free_count = N - 1}}
+ end;
+free_pop(#pool{member_order = lifo, free_pids = []}) ->
+ empty.
+
+free_delete(Pid, #pool{member_order = lifo, free_pids = L, free_count = N} = Pool) ->
+ Pool#pool{free_pids = lists:delete(Pid, L), free_count = N - 1};
+free_delete(Pid, #pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) ->
+ Pool#pool{free_pids = queue:filter(fun(P) -> P =/= Pid end, Q), free_count = N - 1}.
+
+free_fold(Fun, Acc0, #pool{member_order = lifo, free_pids = L}) ->
+ lists:foldl(Fun, Acc0, L);
+free_fold(Fun, Acc0, #pool{member_order = fifo, free_pids = Q}) ->
+ queue:fold(Fun, Acc0, Q).
+
+free_take_n(N, #pool{member_order = lifo, free_pids = L}) ->
+ lists:sublist(L, N);
+free_take_n(N, #pool{member_order = fifo, free_pids = Q}) ->
+ lists:sublist(queue:to_list(Q), N).
+
+free_convert(#pool{member_order = O} = Pool, O) ->
+ Pool;
+free_convert(#pool{member_order = lifo, free_pids = L} = Pool, fifo) ->
+ Pool#pool{free_pids = queue:from_list(L), member_order = fifo};
+free_convert(#pool{member_order = fifo, free_pids = Q} = Pool, lifo) ->
+ Pool#pool{free_pids = queue:to_list(Q), member_order = lifo}.
+
+%% ===== Demand buffer =====
+
+cull_bucket_size_ms(CullInterval, MaxAge) ->
+ CIms = time_as_millis(CullInterval),
+ MAms = time_as_millis(MaxAge),
+ case CIms =:= 0 orelse MAms =:= 0 of
+ true -> 0;
+ false -> min(CIms, MAms)
+ end.
+
+validate_demand_buf_size(CullInterval, MaxAge) ->
+ case cull_bucket_size_ms(CullInterval, MaxAge) of
+ 0 ->
+ ok;
+ BS ->
+ N = ceil(time_as_millis(MaxAge) / BS),
+ case N > ?MAX_DEMAND_HISTORY_BUCKETS of
+ true ->
+ {error,
+ {demand_history_too_large, #{
+ cull_interval => CullInterval,
+ max_age => MaxAge,
+ computed_buckets => N,
+ max_buckets => ?MAX_DEMAND_HISTORY_BUCKETS
+ }}};
+ false ->
+ ok
+ end
+ end.
+
+init_demand_buf(CullInterval, MaxAge) ->
+ case cull_bucket_size_ms(CullInterval, MaxAge) of
+ 0 ->
+ undefined;
+ BS ->
+ N = min(ceil(time_as_millis(MaxAge) / BS), ?MAX_DEMAND_HISTORY_BUCKETS),
+ Now = erlang:monotonic_time(millisecond),
+ queue:from_list([{0, 0} || _ <- lists:seq(1, N - 1)] ++ [{0, Now}])
+ end.
+
+rebuild_demand_buf(#pool{init_count = C, max_count = C} = Pool) ->
+ Pool#pool{demand_buf = undefined};
+rebuild_demand_buf(#pool{cull_interval = CI, max_age = MA, demand_buf = OldBuf} = Pool) ->
+ case cull_bucket_size_ms(CI, MA) of
+ 0 ->
+ Pool#pool{demand_buf = undefined};
+ NewBS ->
+ NewN = min(ceil(time_as_millis(MA) / NewBS), ?MAX_DEMAND_HISTORY_BUCKETS),
+ Now = erlang:monotonic_time(millisecond),
+ NewBuf =
+ case OldBuf of
+ undefined ->
+ init_demand_buf(CI, MA);
+ OldQ ->
+ remap_demand_buf(OldQ, NewN, NewBS, Now)
+ end,
+ Pool#pool{demand_buf = NewBuf}
+ end.
+
+remap_demand_buf(OldQ, NewN, NewBucketMs, Now) ->
+ OldEntries = queue:to_list(OldQ),
+ OldBucketMs =
+ case OldEntries of
+ [{_, T0}, {_, T1} | _] -> max(T0 - T1, 1);
+ _ -> NewBucketMs
+ end,
+ Buckets = [
+ begin
+ NewEnd = Now - I * NewBucketMs,
+ NewStart = NewEnd - NewBucketMs,
+ Peak = lists:foldl(
+ fun({P, S}, Acc) ->
+ case S + OldBucketMs > NewStart andalso S < NewEnd of
+ true -> max(P, Acc);
+ false -> Acc
+ end
+ end,
+ 0,
+ OldEntries
+ ),
+ {Peak, NewStart}
+ end
+ || I <- lists:seq(0, NewN - 1)
+ ],
+ queue:from_list(Buckets).
+
+update_demand_buf(_, #pool{demand_buf = undefined} = Pool) ->
+ Pool;
+update_demand_buf(NewInUse, Pool) ->
+ Now = erlang:monotonic_time(millisecond),
+ Q = Pool#pool.demand_buf,
+ {value, {CurMax, CurStart}} = queue:peek(Q),
+ BucketMs = cull_bucket_size_ms(Pool#pool.cull_interval, Pool#pool.max_age),
+ case BucketMs > 0 andalso Now - CurStart >= BucketMs of
+ false ->
+ {{value, _}, Q1} = queue:out(Q),
+ Pool#pool{demand_buf = queue:in_r({max(NewInUse, CurMax), CurStart}, Q1)};
+ true ->
+ {_, Q1} = queue:out_r(Q),
+ Pool#pool{demand_buf = queue:in_r({NewInUse, Now}, Q1)}
+ end.
+
+demand_window_peak(#pool{demand_buf = undefined}) ->
+ 0;
+demand_window_peak(#pool{demand_buf = Q, max_age = MaxAge}) ->
+ MAms = time_as_millis(MaxAge),
+ case MAms of
+ 0 ->
+ 0;
+ _ ->
+ Now = erlang:monotonic_time(millisecond),
+ queue:fold(
+ fun
+ ({P, S}, Acc) when Now - S =< MAms -> max(P, Acc);
+ (_, Acc) -> Acc
+ end,
+ 0,
+ Q
+ )
+ end.
+
+oldest_idle_victims(MaxCull, #pool{all_members = AllMembers}) ->
+ Free = maps:to_list(
+ maps:filter(
+ fun
+ (_, #member{status = free}) -> true;
+ (_, _) -> false
+ end,
+ AllMembers
+ )
+ ),
+ Sorted = lists:sort(
+ fun({_, #member{time = A}}, {_, #member{time = B}}) -> A =< B end,
+ Free
+ ),
+ lists:sublist(Sorted, MaxCull).
+
%% @private
to_map(#pool{} = Pool) ->
[Name | Values] = tuple_to_list(Pool),
@@ -2314,3 +2587,74 @@ pg_join(Group, Pid) ->
pg_leave(Group, Pid) ->
pg:leave(Group, Pid).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+remap_demand_buf_test() ->
+ Now = 100_000,
+
+ %% Narrow→wide: 2 old 10s buckets merged into 1 new 20s bucket; max peak taken
+ OldQ1 = queue:from_list([{5, 90_000}, {3, 80_000}]),
+ ?assertEqual(
+ [{5, 80_000}],
+ queue:to_list(remap_demand_buf(OldQ1, 1, 20_000, Now))
+ ),
+
+ %% Wide→narrow: 1 old 20s bucket split into 2 new 10s buckets; peak propagated
+ OldQ2 = queue:from_list([{8, 80_000}, {2, 60_000}]),
+ [{B0, _}, {B1, _}] = queue:to_list(remap_demand_buf(OldQ2, 2, 10_000, Now)),
+ %% [90_000,100_000) overlaps old [80_000,100_000)
+ ?assertEqual(8, B0),
+ %% [80_000, 90_000) overlaps old [80_000,100_000)
+ ?assertEqual(8, B1),
+
+ %% Old entries fully outside new window → zero peaks
+ OldQ3 = queue:from_list([{10, 50_000}, {10, 40_000}]),
+ ?assertEqual(
+ [{0, 90_000}, {0, 80_000}],
+ queue:to_list(remap_demand_buf(OldQ3, 2, 10_000, Now))
+ ).
+
+rebuild_demand_buf_from_members_test() ->
+ %% Disabled when cull_interval is 0
+ ?assertEqual(
+ undefined,
+ rebuild_demand_buf_from_members({0, min}, {30, sec}, #{}, 0)
+ ),
+
+ %% CI=10s, MA=20s → 2 buckets of 10s each
+ Now = erlang:monotonic_time(millisecond),
+
+ %% Build a fake os:timestamp() for a member last-returned OffsetMs ago.
+ %% Inverse of member_time_to_monotonic_ms/1: monotonic → wall → {Mega,Sec,Micro}.
+ MkTs = fun(OffsetMs) ->
+ WallMs = (Now - OffsetMs) + erlang:time_offset(millisecond),
+ Mega = WallMs div 1_000_000_000,
+ RemMs = WallMs rem 1_000_000_000,
+ {Mega, RemMs div 1_000, (RemMs rem 1_000) * 1_000}
+ end,
+ FakeMember = fun(OffsetMs) ->
+ #member{
+ status = free,
+ time = MkTs(OffsetMs),
+ mref = make_ref(),
+ expires_at = infinity,
+ shard_idx = 1
+ }
+ end,
+ %% 2 members returned 5 s ago → bucket 0 (current)
+ %% 1 member returned 15 s ago → bucket 1 (older)
+ AllMembers = #{
+ a => FakeMember(5_000),
+ b => FakeMember(5_000),
+ c => FakeMember(15_000)
+ },
+ Q = rebuild_demand_buf_from_members({10, sec}, {20, sec}, AllMembers, _InUseCount = 1, Now),
+ [{B0Peak, _}, {B1Peak, _}] = queue:to_list(Q),
+ %% bucket 0: 2 members + in_use_count → 3
+ ?assertEqual(3, B0Peak),
+ %% bucket 1: 1 member, no in_use bonus → 1
+ ?assertEqual(1, B1Peak).
+
+-endif.
diff --git a/test/bench_take_return.erl b/test/bench_take_return.erl
index 24aa117..a06bf74 100644
--- a/test/bench_take_return.erl
+++ b/test/bench_take_return.erl
@@ -23,7 +23,16 @@
size_20_ttl_5ms_take_return_one/1,
bench_size_20_ttl_5ms_take_return_one/2,
size_500_ttl_5ms_take_return_one/1,
- bench_size_500_ttl_5ms_take_return_one/2
+ bench_size_500_ttl_5ms_take_return_one/2,
+ %% FIFO (queue) dispatch order — compare against lifo equivalents above
+ fifo_size_5_take_return_one/1,
+ bench_fifo_size_5_take_return_one/2,
+ fifo_size_1000_take_return_one/1,
+ bench_fifo_size_1000_take_return_one/2,
+ fifo_size_500_take_return_all/1,
+ bench_fifo_size_500_take_return_all/2,
+ fifo_size_500_clients_50_take_return_all/1,
+ bench_fifo_size_500_clients_50_take_return_all/2
]).
%% @doc Pool of fixed size 5 - try to take just one member and instantly return
@@ -304,15 +313,93 @@ bench_size_500_ttl_5ms_take_return_one(_Input, PoolName) ->
pooler:return_member(PoolName, Member)
end.
+%% FIFO benchmarks — member_order => fifo (queue dispatch, round-robin)
+
+%% @doc FIFO pool of fixed size 5 — single take/return; measures queue:out overhead vs list head.
+fifo_size_5_take_return_one(init) ->
+ start_fixed_fifo(?FUNCTION_NAME, 5),
+ ?FUNCTION_NAME;
+fifo_size_5_take_return_one({input, _}) ->
+ [];
+fifo_size_5_take_return_one({stop, PoolName}) ->
+ stop(PoolName).
+
+bench_fifo_size_5_take_return_one(_Input, PoolName) ->
+ Member = pooler:take_member(PoolName),
+ true = is_pid(Member),
+ pooler:return_member(PoolName, Member).
+
+%% @doc FIFO pool of fixed size 1000 — larger queue, measures queue:out + queue:in at scale.
+fifo_size_1000_take_return_one(init) ->
+ start_fixed_fifo(?FUNCTION_NAME, 1000),
+ ?FUNCTION_NAME;
+fifo_size_1000_take_return_one({input, _}) ->
+ [];
+fifo_size_1000_take_return_one({stop, PoolName}) ->
+ stop(PoolName).
+
+bench_fifo_size_1000_take_return_one(_Input, PoolName) ->
+ Member = pooler:take_member(PoolName),
+ true = is_pid(Member),
+ pooler:return_member(PoolName, Member).
+
+%% @doc FIFO pool of fixed size 500 — drain all members then return them; exercises full rotation.
+fifo_size_500_take_return_all(init) ->
+ start_fixed_fifo(?FUNCTION_NAME, 500),
+ {?FUNCTION_NAME, 500};
+fifo_size_500_take_return_all({input, {_Pool, Size}}) ->
+ lists:seq(1, Size);
+fifo_size_500_take_return_all({stop, {PoolName, _}}) ->
+ stop(PoolName).
+
+bench_fifo_size_500_take_return_all(_Input, {PoolName, Size}) ->
+ Members = take_n(PoolName, Size),
+ [pooler:return_member(PoolName, Member) || Member <- Members].
+
+%% @doc FIFO pool, 500 members, 50 concurrent clients — measures contention on the queue.
+fifo_size_500_clients_50_take_return_all(init) ->
+ PoolSize = 500,
+ NumClients = 50,
+ PerClient = PoolSize div NumClients,
+ start_fixed_fifo(?FUNCTION_NAME, PoolSize),
+ Clients = [
+ erlang:spawn_link(fun() -> client(?FUNCTION_NAME, 0, PerClient) end)
+ || _ <- lists:seq(1, NumClients)
+ ],
+ {?FUNCTION_NAME, PoolSize, Clients};
+fifo_size_500_clients_50_take_return_all({input, _}) ->
+ [];
+fifo_size_500_clients_50_take_return_all({stop, {PoolName, _Size, Clients}}) ->
+ [
+ begin
+ unlink(Pid),
+ exit(Pid, shutdown)
+ end
+ || Pid <- Clients
+ ],
+ stop(PoolName).
+
+bench_fifo_size_500_clients_50_take_return_all(_Input, {_PoolName, _Size, Clients}) ->
+ Ref = erlang:make_ref(),
+ Self = self(),
+ lists:foreach(fun(C) -> C ! {do, Self, Ref} end, Clients),
+ lists:foreach(
+ fun(_) ->
+ receive
+ {done, RecRef} -> RecRef = Ref
+ after 5000 -> error(timeout)
+ end
+ end,
+ Clients
+ ).
+
%% Internal
start_fixed(Name, Size) ->
- Conf = [
- {name, Name},
- {init_count, Size},
- {max_count, Size}
- ],
- start(Conf).
+ start([{name, Name}, {init_count, Size}, {max_count, Size}]).
+
+start_fixed_fifo(Name, Size) ->
+ start([{name, Name}, {init_count, Size}, {max_count, Size}, {member_order, fifo}]).
start(Conf0) ->
Conf = [{start_mfa, {pooled_gs, start_link, [{"test"}]}} | Conf0],
diff --git a/test/pooler_tests.erl b/test/pooler_tests.erl
index 92a03d0..91723bc 100644
--- a/test/pooler_tests.erl
+++ b/test/pooler_tests.erl
@@ -1313,6 +1313,7 @@ reconfigure_test_() ->
?assertEqual(
{ok, [
{set_parameter, {init_count, 3}},
+ {rebuild_demand_buf},
{start_workers, 1}
]},
pooler:pool_reconfigure(Name, Config1)
@@ -1331,7 +1332,9 @@ reconfigure_test_() ->
?assertEqual(
{ok, [
{set_parameter, {init_count, 1}},
+ {rebuild_demand_buf},
{set_parameter, {max_count, 1}},
+ {rebuild_demand_buf},
{stop_free_workers, 1}
]},
pooler:pool_reconfigure(Name, Config1)
@@ -1409,6 +1412,7 @@ reconfigure_test_() ->
?assertEqual(
{ok, [
{set_parameter, {cull_interval, {10, sec}}},
+ {rebuild_demand_buf},
{reset_cull_timer, {10, sec}}
]},
pooler:pool_reconfigure(Name, NewConfig)
@@ -1421,6 +1425,7 @@ reconfigure_test_() ->
?assertEqual(
{ok, [
{set_parameter, {max_age, {100, ms}}},
+ {rebuild_demand_buf},
{cull, []}
]},
pooler:pool_reconfigure(Name, NewConfig)
@@ -1480,6 +1485,7 @@ reconfigure_test_() ->
?assertEqual(
{ok, [
{set_parameter, {max_count, NewMaxCount}},
+ {rebuild_demand_buf},
{set_parameter, {member_start_timeout, {10, sec}}},
{set_parameter, {queue_max, 100}},
{set_parameter, {metrics_mod, fake_metrics}},
@@ -1500,6 +1506,27 @@ reconfigure_test_() ->
gen_server:call(Name, dump_pool)
)
end},
+ {"Change member_order lifo→fifo: action emitted, pool functional, round-robin observable", fun() ->
+ %% Take both workers, return in known order so we know the free list state
+ [P1, P2] = get_n_pids(Name, 2, []),
+ pooler:return_member(Name, P1),
+ pooler:return_member(Name, P2),
+ %% LIFO: P2 (most recent) should come out first
+ ?assertEqual(P2, pooler:take_member(Name)),
+ pooler:return_member(Name, P2),
+ %% Reconfigure to FIFO — verify action list
+ ?assertEqual(
+ {ok, [{set_member_order, fifo}]},
+ pooler:pool_reconfigure(Name, StartConfig#{member_order => fifo})
+ ),
+ ?assertMatch(#{member_order := fifo}, gen_server:call(Name, dump_pool)),
+ %% Under FIFO: taking P2 (front) and returning it should rotate
+ ?assertEqual(P2, pooler:take_member(Name)),
+ pooler:return_member(Name, P2),
+ %% P2 was just returned to the back; P1 (idle longest) should come next
+ ?assertEqual(P1, pooler:take_member(Name)),
+ pooler:return_member(Name, P1)
+ end},
{"Update failed", fun() ->
?assertEqual(
{error, changed_unsupported_parameter},
@@ -1513,6 +1540,14 @@ reconfigure_test_() ->
Name, StartConfig#{name := not_a_pool_name}
)
)
+ end},
+ {"reconfigure rejected when cull/max_age combo exceeds demand buf limit", fun() ->
+ %% cull_interval=10s, max_age=1h → 360 buckets > 256 cap
+ BadConfig = StartConfig#{cull_interval => {10, sec}, max_age => {1, hour}},
+ ?assertMatch(
+ {error, {demand_history_too_large, #{computed_buckets := 360}}},
+ pooler:pool_reconfigure(Name, BadConfig)
+ )
end}
]}.
@@ -2369,3 +2404,134 @@ pg_stop() ->
pg_leave(Group, Pid) ->
pg:leave(Group, Pid).
+
+%% ===== FIFO member_order tests =====
+
+pooler_fifo_member_order_test_() ->
+ {setup,
+ fun() ->
+ application:set_env(pooler, pools, [
+ #{
+ name => test_pool_fifo,
+ max_count => 5,
+ init_count => 3,
+ start_mfa => {pooled_gs, start_link, [{"fifo-type"}]},
+ member_order => fifo
+ }
+ ]),
+ application:start(pooler)
+ end,
+ fun(_) -> application:stop(pooler) end, [
+ {"fifo: members served in round-robin order", fun() ->
+ P1 = pooler:take_member(test_pool_fifo),
+ P2 = pooler:take_member(test_pool_fifo),
+ P3 = pooler:take_member(test_pool_fifo),
+ ?assert(is_pid(P1)),
+ ?assertNotEqual(P1, P2),
+ ?assertNotEqual(P2, P3),
+ pooler:return_member(test_pool_fifo, P1),
+ pooler:return_member(test_pool_fifo, P2),
+ pooler:return_member(test_pool_fifo, P3),
+ %% FIFO: oldest idle (P1) comes out first
+ ?assertEqual(P1, pooler:take_member(test_pool_fifo)),
+ ?assertEqual(P2, pooler:take_member(test_pool_fifo)),
+ ?assertEqual(P3, pooler:take_member(test_pool_fifo)),
+ pooler:return_member(test_pool_fifo, P1),
+ pooler:return_member(test_pool_fifo, P2),
+ pooler:return_member(test_pool_fifo, P3)
+ end}
+ ]}.
+
+pooler_fifo_cull_test_() ->
+ {setup,
+ fun() ->
+ application:set_env(pooler, pools, [
+ #{
+ name => test_pool_fifo_cull,
+ max_count => 5,
+ init_count => 2,
+ start_mfa => {pooled_gs, start_link, [{"fifo-cull"}]},
+ member_order => fifo,
+ cull_interval => {200, ms},
+ max_age => {0, min}
+ }
+ ]),
+ application:start(pooler)
+ end,
+ fun(_) -> application:stop(pooler) end, [
+ {"fifo: excess members culled to init_count", fun() ->
+ Pids = get_n_pids(test_pool_fifo_cull, 5, []),
+ [pooler:return_member(test_pool_fifo_cull, P) || P <- Pids],
+ wait_for_pool_state(
+ test_pool_fifo_cull,
+ 2000,
+ fun(U) -> maps:get(free_count, U) =:= 2 end
+ )
+ end}
+ ]}.
+
+pooler_cull_size_invariant_test_() ->
+ {setup,
+ fun() ->
+ Base = #{
+ max_count => 5,
+ init_count => 2,
+ start_mfa => {pooled_gs, start_link, [{"inv"}]},
+ cull_interval => {200, ms},
+ max_age => {0, min}
+ },
+ application:set_env(pooler, pools, [
+ Base#{name => test_pool_lifo_inv},
+ Base#{name => test_pool_fifo_inv, member_order => fifo}
+ ]),
+ application:start(pooler)
+ end,
+ fun(_) -> application:stop(pooler) end, [
+ {"lifo and fifo pools with identical params cull to same size", fun() ->
+ LPids = get_n_pids(test_pool_lifo_inv, 5, []),
+ FPids = get_n_pids(test_pool_fifo_inv, 5, []),
+ [pooler:return_member(test_pool_lifo_inv, P) || P <- LPids],
+ [pooler:return_member(test_pool_fifo_inv, P) || P <- FPids],
+ wait_for_pool_state(
+ test_pool_lifo_inv,
+ 2000,
+ fun(U) -> maps:get(free_count, U) =:= 2 end
+ ),
+ wait_for_pool_state(
+ test_pool_fifo_inv,
+ 2000,
+ fun(U) -> maps:get(free_count, U) =:= 2 end
+ )
+ end}
+ ]}.
+
+pooler_demand_buf_validation_test_() ->
+ {setup,
+ fun() ->
+ application:start(pooler)
+ end,
+ fun(_) ->
+ application:stop(pooler)
+ end,
+ [
+ {"new_pool rejected when cull/max_age combo exceeds demand buf limit", fun() ->
+ %% cull_interval=10s, max_age=1h → 360 buckets > 256 cap
+ %% The error is wrapped by the pool_sup → pooler_sup supervisor chain.
+ ?assertMatch(
+ {error, {
+ {shutdown,
+ {failed_to_start_child, pooler,
+ {error, {demand_history_too_large, #{computed_buckets := 360}}}}},
+ _
+ }},
+ pooler:new_pool(#{
+ name => demand_buf_test_pool,
+ init_count => 1,
+ max_count => 5,
+ start_mfa => {pooled_gs, start_link, [{"test"}]},
+ cull_interval => {10, sec},
+ max_age => {1, hour}
+ })
+ )
+ end}
+ ]}.