From 0b35b353a8398976be91f14b3a70ad2fd00f1d68 Mon Sep 17 00:00:00 2001 From: Sergey Prokhorov Date: Fri, 29 May 2026 19:10:12 +0200 Subject: [PATCH] Add member_order (fifo/lifo) + demand-history cull Introduces two related changes: 1. `member_order => fifo | lifo` pool config option (default `lifo`, preserving existing stack behaviour). `fifo` uses an OTP `queue()` as the free-member structure, producing round-robin dispatch where the longest-idle worker is always handed out next. The free-member abstraction (`free_push/pop/delete/fold/take_n/convert`) hides the list-vs-queue difference from all call sites; `free_count` is now exclusively managed by these helpers. 2. Pool-level demand-history cull replaces the per-worker `max_age` idle-time check. A fixed-capacity OTP-queue ring buffer (head = current bucket, capacity = ceil(max_age/bucket_size)) tracks peak concurrent in-use count; at each cull tick the pool sizes to that peak (floored at `init_count`). This gives LIFO and FIFO pools identical shrinkage behaviour for the same config, and correctly handles FIFO rotation where workers never accumulate idle time. `max_age` now means "demand memory window"; `cull_interval` default changed from {1,min} to {15,sec} so defaults satisfy CI <= MA. Hot-upgrade (vsn 5->6) reconstructs the demand buffer from member `time` timestamps. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- README.org | 65 +++- rebar.config | 2 + src/pooler.erl | 630 ++++++++++++++++++++++++++++--------- test/bench_take_return.erl | 101 +++++- test/pooler_tests.erl | 166 ++++++++++ 5 files changed, 799 insertions(+), 165 deletions(-) diff --git a/README.org b/README.org index 08acc1a..0d49637 100644 --- a/README.org +++ b/README.org @@ -137,18 +137,25 @@ examples are valid: 120000 %% plain integer — milliseconds, consistent with Erlang timeout conventions #+END_SRC -The =cull_interval= determines the schedule when a check will be made -for stale members. Checks are scheduled using =erlang:send_after/3= -which provides a light-weight timing mechanism. The next check is -scheduled after the prior check completes. - -During a check, pool members that have not been used in more than -=max_age= minutes will be removed until the pool size reaches -=init_count=. - -The default value for =cull_interval= is ={1, min}=. You can disable -culling by specifying a value os ={0, min}=. The =max_age= parameter -defaults to ={30, sec}=. +The =cull_interval= sets how often a cull check runs. Checks are +scheduled using =erlang:send_after/3=; the next check is scheduled +after the prior one completes. Set to ={0, min}= to disable culling +entirely. Default: ={15, sec}=. + +=max_age= is the demand memory window. Pooler tracks the peak +concurrent in-use count over the last =max_age= period. At each cull +tick the pool is sized to that peak (floored at =init_count=); workers +beyond the peak are removed oldest-idle-first. Default: ={30, sec}=. + +In practice this means: after a traffic burst the pool holds its +grown size for up to =max_age= time, then shrinks back to =init_count= +once no burst is recorded in the window. + +The most natural configuration is =cull_interval ≤ max_age= (check +more often than you forget) — the defaults satisfy this. Setting +=cull_interval > max_age= is valid: the pool still shrinks correctly, +but the memory window is shorter than the check interval, which is an +unusual combination that rarely makes sense in practice. **** Connection lifetime recycling (max_lifetime) @@ -216,9 +223,11 @@ The value is a =time_spec()= tuple; the default is ={1, min}=. **** Request queue limit (queue_max) -When all pool members are in use, =take_member= calls are queued up to -=queue_max= entries (default =50=). Callers beyond this limit receive -=error_no_members= immediately rather than waiting. Set it to =0= to disable +When all pool members are in use, =take_member/2= calls (those with an explicit +timeout) are queued up to =queue_max= entries (default =50=) and wait for a +member to become available. =take_member/1= always returns =error_no_members= +immediately and is never queued. Callers beyond the =queue_max= limit also +receive =error_no_members= immediately. Set =queue_max= to =0= to disable queuing entirely. #+BEGIN_SRC erlang @@ -404,6 +413,32 @@ members stay on their original shards and new starts fill the added shards round-robin until churn rebalances things. /Decreasing/ is not supported and returns ={error, num_member_sups_cannot_be_decreased}=. +**** Worker dispatch order (member_order) + +Controls which free worker is handed out on each =take_member= call. + +- =lifo= (default, stack) — the most recently returned worker is taken first. + Under steady load a small working set stays warm; idle workers accumulate at + the back and are culled. Good for connection pools where keeping a + connection recently used matters (e.g. avoids idle-timeout disconnects on + hot connections). + +- =fifo= (queue) — the worker that has been idle the longest is taken first, + producing round-robin rotation across the whole pool. Load is spread evenly; + no single worker becomes a hot spot. + +#+BEGIN_SRC erlang +#{name => pg_pool, + init_count => 10, + max_count => 50, + start_mfa => {epgsql, connect, [...]}, + member_order => fifo} %% round-robin; default is lifo (stack) +#+END_SRC + +Both modes produce identical cull behaviour for the same =cull_interval= / +=max_age= configuration — only which worker is handed out next differs. +=member_order= can be changed at runtime via =pooler:pool_reconfigure/2=. + *** Pool Configuration via =pooler:new_pool= You can create pools using =pooler:new_pool/1= when accepts a map of pool configuration. Here's an example: diff --git a/rebar.config b/rebar.config index 988d38c..2f47ffa 100644 --- a/rebar.config +++ b/rebar.config @@ -78,3 +78,5 @@ {warnings, [unknown]}, {plt_apps, all_deps} ]}. + +{hex, [{doc, edoc}]}. diff --git a/src/pooler.erl b/src/pooler.erl index 63be676..1406a7c 100644 --- a/src/pooler.erl +++ b/src/pooler.erl @@ -61,7 +61,7 @@ code_change/3 ]). --vsn(5). +-vsn(6). %% Bump this value and add a new clause to `code_change', if the format of `#pool{}' record changed %% ------------------------------------------------------------------ @@ -83,12 +83,13 @@ ]). -define(DEFAULT_ADD_RETRY, 1). --define(DEFAULT_CULL_INTERVAL, {1, min}). +-define(DEFAULT_CULL_INTERVAL, {15, sec}). -define(DEFAULT_MAX_AGE, {30, sec}). -define(DEFAULT_MEMBER_START_TIMEOUT, {1, min}). -define(DEFAULT_AUTO_GROW_THRESHOLD, undefined). -define(POOLER_GROUP_TABLE, pooler_group_table). -define(DEFAULT_POOLER_QUEUE_MAX, 50). +-define(MAX_DEMAND_HISTORY_BUCKETS, 256). %% Per-pool TTL configuration and timer state. %% `undefined' in #pool.ttl means TTL is disabled — all TTL functions short-circuit immediately. @@ -121,8 +122,16 @@ max_count = 100 :: non_neg_integer(), init_count = 10 :: non_neg_integer(), start_mfa :: {atom(), atom(), [term()]}, - free_pids = [] :: [pid()], + %% LIFO (default): list, used as a stack. FIFO: OTP queue. + %% Write only via free_push/free_pop/free_delete/free_convert — never update directly. + free_pids = [] :: [pid()] | queue:queue(pid()), + member_order = lifo :: lifo | fifo, + %% Demand ring-buffer for pool-level cull. Head = current bucket {PeakInUse, BucketStartMs}. + %% Queue is always at capacity (pre-filled with {0,0} dummies). Write via update_demand_buf. + demand_buf = undefined :: undefined | queue:queue({non_neg_integer(), integer()}), in_use_count = 0 :: non_neg_integer(), + %% Kept in sync with queue:len(free_pids) / length(free_pids). + %% Write only via free_push/free_pop/free_delete — never update directly. free_count = 0 :: non_neg_integer(), %% The number times to attempt adding a pool member if the %% pool size is below max_count and there are no free @@ -253,7 +262,8 @@ max_lifetime => time_spec(), max_lifetime_jitter => time_spec(), num_member_sups => pos_integer(), - member_shutdown => brutal_kill | pos_integer() + member_shutdown => brutal_kill | pos_integer(), + member_order => lifo | fifo }. %% See {@link pooler:new_pool/1} @@ -286,6 +296,8 @@ '$pooler_pid' | '$pooler_member_sup' | '$pooler_pool' | '$pooler_pool_name' | any(), ... ]}} | {auto_grow_threshold, non_neg_integer()}} + | {set_member_order, lifo | fifo} + | {rebuild_demand_buf} | {update_ttl, undefined | #ttl{}}. -type member_expiry() :: integer() | infinity. @@ -394,21 +406,19 @@ create_group_table() -> %% to. Pools sharing a common `group' value can be accessed using %% {@link take_group_member/1}, {@link return_group_member/2} and {@link group_pools/1}. %%
`cull_interval'
-%%
Default: `{1, min}'. Time between checks for stale pool members. Specified as -%% a `{Time, Unit}' tuple where `Unit' is one of `hour', `min', `sec', `ms', `mu', +%%
Default: `{15, sec}'. How often to run a cull check. Specified as a +%% `{Time, Unit}' tuple where `Unit' is one of `hour', `min', `sec', `ms', `mu', %% or as a plain non-negative integer (milliseconds). -%% Triggers a once per `cull_interval' check to remove members that have not -%% been accessed in `max_age' time units. Culling can be disabled by -%% specifying a zero time vaule (e.g. `{0, min}'). Culling will also be -%% disabled if `init_count' is the same as `max_count'.
+%% Culling can be disabled by specifying a zero value (e.g. `{0, min}'). +%% Culling is also disabled if `init_count' equals `max_count'. %%
`max_age'
-%%
Default: `{30, sec}'. Members idle longer than `max_age' time units are removed from -%% the pool when stale checking is enabled via -%% `cull_interval'. Culling of idle members will never reduce the pool -%% below `init_count'. The value is specified as `{Time, Unit}'. Note -%% that timers are not set on individual pool members and may remain -%% in the pool beyond the configured `max_age' value since members are -%% only removed on the interval configured via `cull_interval'.
+%%
Default: `{30, sec}'. Demand memory window for pool sizing. Pooler tracks the +%% peak concurrent in-use count over the last `max_age' period; at each cull tick +%% the pool is sized to that peak (floored at `init_count'). Workers beyond the +%% peak are removed oldest-idle-first. Setting to zero disables demand tracking +%% and always targets `init_count'. +%% Typical configuration: `cull_interval' <= `max_age' (check more often +%% than you forget). `cull_interval' > `max_age' is valid but unusual.
%%
`member_start_timeout'
%%
Default: `{1, min}'. Time limit for member starts. Specified as `{Time, Unit}'.
%%
`queue_max'
@@ -446,6 +456,14 @@ create_group_table() -> %% from `[-jitter, +jitter]', so members started together do not all expire simultaneously. %% Only meaningful when `max_lifetime' is set. Must be strictly less than `max_lifetime'; %% pool start and reconfigure will fail with `jitter_must_be_less_than_max_lifetime' otherwise. +%%
`member_order'
+%%
Default: `lifo' (stack). Controls which free worker is handed out next. +%% `lifo' — most recently returned worker first; keeps a hot working set warm under +%% steady load and lets idle workers accumulate for culling. +%% `fifo' (queue) — longest-idle worker first; rotates all workers evenly, +%% spreading load across the pool. +%% Both modes produce identical cull behaviour for the same `cull_interval' / `max_age'. +%% Reconfigurable via {@link pool_reconfigure/2}.
%% -spec new_pool(pool_config() | pool_config_legacy()) -> {ok, pid()} | {error, {already_started, pid()}}. new_pool(PoolConfig) -> @@ -757,6 +775,15 @@ init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa : {error, Err} -> exit({error, Err}); {ok, T} -> T end, + case + validate_demand_buf_size( + maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL), + maps:get(max_age, P, ?DEFAULT_MAX_AGE) + ) + of + ok -> ok; + {error, _} = DemandBufErr -> exit(DemandBufErr) + end, NumMemberSups = maps:get(num_member_sups, P, 1), MemberSups = pooler_pool_sup:member_sup_names(Name, NumMemberSups), Pool = #pool{ @@ -776,7 +803,20 @@ init(#{name := Name, max_count := MaxCount, init_count := InitCount, start_mfa : metrics_api = maps:get(metrics_api, P, folsom), queue_max = maps:get(queue_max, P, ?DEFAULT_POOLER_QUEUE_MAX), ttl = TTL, - member_sups = MemberSups + member_sups = MemberSups, + member_order = maps:get(member_order, P, lifo), + free_pids = free_new(maps:get(member_order, P, lifo)), + demand_buf = + case InitCount =:= MaxCount of + %% fixed-size pool: cull never fires, no tracking needed + true -> + undefined; + false -> + init_demand_buf( + maps:get(cull_interval, P, ?DEFAULT_CULL_INTERVAL), + maps:get(max_age, P, ?DEFAULT_MAX_AGE) + ) + end }, %% This schedules the next cull when the pool is configured for %% such and is otherwise a no-op. @@ -811,8 +851,9 @@ handle_call(pool_utilization, _From, Pool) -> {reply, compute_utilization(Pool), Pool}; handle_call(dump_pool, _From, Pool) -> {reply, to_map(Pool), Pool}; -handle_call({call_free_members, Fun}, _From, #pool{free_pids = Pids} = Pool) -> - {reply, do_call_free_members(Fun, Pids), Pool}; +handle_call({call_free_members, Fun}, _From, Pool) -> + Results = free_fold(fun(Pid, Acc) -> [do_call_free_member(Fun, Pid) | Acc] end, [], Pool), + {reply, Results, Pool}; handle_call({reconfigure, NewConfig}, _From, Pool) -> case calculate_reconfigure_actions(NewConfig, Pool) of {ok, Actions} = Res -> @@ -911,7 +952,12 @@ code_change(3, OldState, Extra) when tuple_size(OldState) =:= 27 -> %% 2-tuple starting_members) → v5 (`member_sups' tuple + `next_shard', %% `#member{}' record entries, 3-tuple starting_members). code_change(4, OldState, _Extra) when is_tuple(OldState), tuple_size(OldState) =:= 28, element(1, OldState) =:= pool -> - {ok, do_upgrade_to_v5(OldState)}; + code_change(5, do_upgrade_to_v5(OldState), _Extra); +%% v5 #pool{} (29-tuple, no member_order/demand_buf) → v6. +code_change(5, OldState, _Extra) when + is_tuple(OldState), tuple_size(OldState) =:= 29, element(1, OldState) =:= pool +-> + {ok, do_upgrade_to_v6(OldState)}; code_change(_, State, _Extra) -> {ok, State}. @@ -976,6 +1022,19 @@ do_upgrade_to_v5( AllMembers ), NewStartingMembers = [{P, T, 1} || {P, T} <- StartingMembers], + {pool, Name, Group, MaxCount, InitCount, StartMFA, FreePids, InUseCount, FreeCount, AddMemberRetry, CullInterval, + MaxAge, CullTimer, TTL, {MemberSup}, 1, StarterSup, NewAllMembers, ConsumerToPid, NewStartingMembers, + StoppingCount, MemberStartTimeout, AutoGrowThreshold, StopMFA, InitializeMFA, MetricsMod, MetricsAPI, + QueuedRequestors, QueueMax}. + +%% v5 #pool{} → v6: positional destructure (v5 has no member_order/demand_buf), +%% then construct a proper v6 record and warm the demand_buf from member timestamps. +do_upgrade_to_v6( + {pool, Name, Group, MaxCount, InitCount, StartMFA, FreePids, InUseCount, FreeCount, AddMemberRetry, CullInterval, + MaxAge, CullTimer, TTL, MemberSups, NextShard, StarterSup, AllMembers, ConsumerToPid, StartingMembers, + StoppingCount, MemberStartTimeout, AutoGrowThreshold, StopMFA, InitializeMFA, MetricsMod, MetricsAPI, + QueuedRequestors, QueueMax} +) -> #pool{ name = Name, group = Group, @@ -983,6 +1042,8 @@ do_upgrade_to_v5( init_count = InitCount, start_mfa = StartMFA, free_pids = FreePids, + member_order = lifo, + demand_buf = rebuild_demand_buf_from_members(CullInterval, MaxAge, AllMembers, InUseCount), in_use_count = InUseCount, free_count = FreeCount, add_member_retry = AddMemberRetry, @@ -990,12 +1051,12 @@ do_upgrade_to_v5( max_age = MaxAge, cull_timer = CullTimer, ttl = TTL, - member_sups = {MemberSup}, - next_shard = 1, + member_sups = MemberSups, + next_shard = NextShard, starter_sup = StarterSup, - all_members = NewAllMembers, + all_members = AllMembers, consumer_to_pid = ConsumerToPid, - starting_members = NewStartingMembers, + starting_members = StartingMembers, stopping_count = StoppingCount, member_start_timeout = MemberStartTimeout, auto_grow_threshold = AutoGrowThreshold, @@ -1007,6 +1068,59 @@ do_upgrade_to_v5( queue_max = QueueMax }. +%% Returns an initialised demand_buf estimated from member last-return timestamps. +%% Lower bound: members that rotated through a bucket multiple times only appear once. +rebuild_demand_buf_from_members(CI, MA, AllMembers, InUseCount) -> + rebuild_demand_buf_from_members( + CI, + MA, + AllMembers, + InUseCount, + erlang:monotonic_time(millisecond) + ). + +rebuild_demand_buf_from_members(CI, MA, AllMembers, InUseCount, Now) -> + case cull_bucket_size_ms(CI, MA) of + 0 -> + undefined; + BucketMs -> + N = min(ceil(time_as_millis(MA) / BucketMs), ?MAX_DEMAND_HISTORY_BUCKETS), + MAms = time_as_millis(MA), + Buckets = [ + begin + BucketEnd = Now - I * BucketMs, + BucketStart = BucketEnd - BucketMs, + Count = maps:fold( + fun(_, #member{time = T}, Acc) -> + Tms = member_time_to_monotonic_ms(T), + case Tms >= BucketStart andalso Tms < BucketEnd of + true -> Acc + 1; + false -> Acc + end + end, + 0, + AllMembers + ), + Peak = + Count + + (if + I =:= 0 -> InUseCount; + true -> 0 + end), + case Now - BucketStart =< MAms of + true -> {Peak, BucketStart}; + false -> {0, 0} + end + end + || I <- lists:seq(0, N - 1) + ], + queue:from_list(Buckets) + end. + +member_time_to_monotonic_ms({Mega, Sec, Micro}) -> + WallMs = (Mega * 1_000_000 + Sec) * 1000 + Micro div 1000, + WallMs - erlang:time_offset(millisecond). + %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ @@ -1074,17 +1188,12 @@ do_accept_member( maybe_reply_with_pid( Pid, Pool = #pool{ - queued_requestors = QueuedRequestors, - free_pids = Free, - free_count = NumFree + queued_requestors = QueuedRequestors } ) when is_pid(Pid) -> case queue:out(QueuedRequestors) of {empty, _} -> - Pool#pool{ - free_pids = [Pid | Free], - free_count = NumFree + 1 - }; + free_push(Pid, Pool); {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) -> reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool) end. @@ -1098,34 +1207,39 @@ reply_to_queued_requestor(TRef, Pid, From = {APid, _}, NewQueuedRequestors, Pool gen_server:reply(From, Pid), Pool1. +%% Clause 1: normal take from free list. Pool already has free_pids updated by free_pop. -spec take_member_bookkeeping( pid(), gen_server_from(), - [pid()] | requestor_queue(), #pool{} ) -> #pool{}. take_member_bookkeeping( MemberPid, {CPid, _}, - Rest, Pool = #pool{ in_use_count = NumInUse, - free_count = NumFree, consumer_to_pid = CPMap, all_members = AllMembers } ) when is_pid(MemberPid), - is_pid(CPid), - is_list(Rest) + is_pid(CPid) -> - Pool#pool{ - free_pids = Rest, - in_use_count = NumInUse + 1, - free_count = NumFree - 1, + NewInUse = NumInUse + 1, + Pool1 = Pool#pool{ + in_use_count = NewInUse, consumer_to_pid = add_member_to_consumer(MemberPid, CPid, CPMap), all_members = set_cpid_for_member(MemberPid, CPid, AllMembers) - }; + }, + update_demand_buf(NewInUse, Pool1). + +%% Clause 2: queued requestor take. Called from reply_to_queued_requestor. +-spec take_member_bookkeeping( + pid(), + gen_server_from(), + requestor_queue(), + #pool{} +) -> #pool{}. take_member_bookkeeping( MemberPid, {ReplyPid, _Tag}, @@ -1136,12 +1250,14 @@ take_member_bookkeeping( consumer_to_pid = CPMap } ) -> - Pool#pool{ - in_use_count = NumInUse + 1, + NewInUse = NumInUse + 1, + Pool1 = Pool#pool{ + in_use_count = NewInUse, all_members = set_cpid_for_member(MemberPid, ReplyPid, AllMembers), consumer_to_pid = add_member_to_consumer(MemberPid, ReplyPid, CPMap), queued_requestors = NewQueuedRequestors - }. + }, + update_demand_buf(NewInUse, Pool1). -spec remove_stale_starting_members( #pool{}, @@ -1237,12 +1353,12 @@ take_member_from_pool( NumFree = Pool1#pool.free_count, StoppingCount = Pool1#pool.stopping_count, NumCanAdd = Max - (NumInUse + NumFree + NonStaleStartingMemberCount + StoppingCount), - case Pool1#pool.free_pids of - [] when NumCanAdd =< 0 -> + case free_pop(Pool1) of + empty when NumCanAdd =< 0 -> send_metric(Pool, error_no_members_count, {inc, 1}, counter, #{reason => at_capacity}), send_metric(Pool, events, error_no_members, history), {error_no_members, Pool1}; - [] when NumCanAdd > 0 -> + empty -> %% Limit concurrently starting members to init_count. Add %% up to init_count members. Starting members here means %% we always return an error_no_members for a take request @@ -1255,23 +1371,23 @@ take_member_from_pool( send_metric(Pool, error_no_members_count, {inc, 1}, counter, #{reason => all_in_use}), send_metric(Pool, events, error_no_members, history), {error_no_members, Pool2}; - [Pid | Rest] -> - Pool2 = take_member_bookkeeping(Pid, From, Rest, Pool1), - Pool3 = - case Pool2#pool.auto_grow_threshold of + {Pid, Pool2} -> + Pool3 = take_member_bookkeeping(Pid, From, Pool2), + Pool4 = + case Pool3#pool.auto_grow_threshold of N when is_integer(N) andalso - Pool2#pool.free_count =< N andalso + Pool3#pool.free_count =< N andalso NumCanAdd > 0 -> NumToAdd = max(min(InitCount - NonStaleStartingMemberCount, NumCanAdd), 0), - add_members_async(NumToAdd, Pool2); + add_members_async(NumToAdd, Pool3); _ -> - Pool2 + Pool3 end, - send_metric(Pool, in_use_count, Pool3#pool.in_use_count, gauge), - send_metric(Pool, free_count, Pool3#pool.free_count, gauge), - {Pid, Pool3} + send_metric(Pool, in_use_count, Pool4#pool.in_use_count, gauge), + send_metric(Pool, free_count, Pool4#pool.free_count, gauge), + {Pid, Pool4} end. -spec take_member_from_pool_queued( @@ -1370,9 +1486,7 @@ do_return_member( Pool; #member{status = CPid, expires_at = ExpTs} = Member -> #pool{ - free_pids = Free, - in_use_count = NumInUse, - free_count = NumFree + in_use_count = NumInUse } = Pool, Pool1 = Pool#pool{ in_use_count = NumInUse - 1, @@ -1406,7 +1520,7 @@ do_return_member( }, case queue:out(QueuedRequestors) of {empty, _} -> - Pool3 = Pool2#pool{free_pids = [Pid | Free], free_count = NumFree + 1}, + Pool3 = free_push(Pid, Pool2), maybe_advance_ttl_timer(Pid, ExpTs, Pool3); {{value, {From = {APid, _}, TRef}}, NewQueuedRequestors} when is_pid(APid) -> reply_to_queued_requestor(TRef, Pid, From, NewQueuedRequestors, Pool2) @@ -1470,11 +1584,9 @@ handle_unexpected_member_down(Pid, Pool) -> AllMembers = Pool#pool.all_members, case maps:get(Pid, AllMembers, undefined) of #member{status = free} -> - Pool1 = Pool#pool{ - free_pids = lists:delete(Pid, Pool#pool.free_pids), - free_count = Pool#pool.free_count - 1, + Pool1 = free_delete(Pid, Pool#pool{ all_members = maps:remove(Pid, AllMembers) - }, + }), send_metric(Pool1, killed_free_count, {inc, 1}, counter, #{reason => crashed}), Pool2 = case Pool1#pool.ttl of @@ -1513,12 +1625,10 @@ remove_pid(Pid, Pool, Flag, Reason) -> } = Pool, case maps:get(Pid, AllMembers, undefined) of #member{status = free, shard_idx = ShardIdx} = Member -> - Pool1 = Pool#pool{ - free_pids = lists:delete(Pid, Pool#pool.free_pids), - free_count = Pool#pool.free_count - 1, + Pool1 = free_delete(Pid, Pool#pool{ stopping_count = Pool#pool.stopping_count + 1, all_members = AllMembers#{Pid => Member#member{status = {stopping, Flag}}} - }, + }), pooler_starter_sup:new_stopper( pooler_starter:stop_spec(PoolName, member_sup_for(ShardIdx, Pool1), Pid, StopMFA) ), @@ -1593,7 +1703,8 @@ add_member_to_consumer(MemberPid, CPid, CPMap) -> -spec cull_members_from_pool(#pool{}) -> #pool{}. cull_members_from_pool(#pool{cull_interval = {0, _}} = Pool) -> - %% 0 cull_interval means do not cull + Pool; +cull_members_from_pool(#pool{cull_interval = 0} = Pool) -> Pool; cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) -> %% if init_count matches max_count, then we will not dynamically @@ -1602,31 +1713,25 @@ cull_members_from_pool(#pool{init_count = C, max_count = C} = Pool) -> Pool; cull_members_from_pool( #pool{ - free_count = FreeCount, - init_count = InitCount, - in_use_count = InUseCount, cull_interval = Delay, - cull_timer = CullTRef, - max_age = MaxAge, - all_members = AllMembers + cull_timer = CullTRef } = Pool ) -> case is_reference(CullTRef) of true -> erlang:cancel_timer(CullTRef); false -> noop end, - MaxCull = FreeCount - (InitCount - InUseCount), + WinPeak = demand_window_peak(Pool), + TargetSize = max(Pool#pool.init_count, WinPeak), + MaxCull = Pool#pool.free_count - max(0, TargetSize - Pool#pool.in_use_count), Pool1 = case MaxCull > 0 of true -> - MemberInfo = member_info(Pool#pool.free_pids, AllMembers), - ExpiredMembers = - expired_free_members(MemberInfo, os:timestamp(), MaxAge), - CullList = lists:sublist(ExpiredMembers, MaxCull), + Victims = oldest_idle_victims(MaxCull, Pool), lists:foldl( fun({CullMe, _}, S) -> remove_pid(CullMe, S, no_replace, max_age) end, Pool, - CullList + Victims ); false -> Pool @@ -1645,23 +1750,6 @@ schedule_cull(Pool, Delay) -> DelayMillis = time_as_millis(Delay), erlang:send_after(DelayMillis, Pool, cull_pool). --spec member_info([pid()], members_map()) -> [{pid(), #member{}}]. -member_info(Pids, AllMembers) -> - maps:to_list(maps:with(Pids, AllMembers)). - --spec expired_free_members( - Members :: [{pid(), #member{}}], - Now :: {_, _, _}, - MaxAge :: time_spec() -) -> [{pid(), #member{}}]. -expired_free_members(Members, Now, MaxAge) -> - MaxMicros = time_as_micros(MaxAge), - [ - MI - || MI = {_, #member{status = free, time = LastReturn}} <- Members, - timer:now_diff(Now, LastReturn) >= MaxMicros - ]. - -spec calculate_reconfigure_actions(pool_config(), #pool{}) -> {ok, [reconfigure_action()]} | {error, any()}. calculate_reconfigure_actions( #{name := Name, start_mfa := MFA} = NewConfig, @@ -1681,10 +1769,17 @@ calculate_reconfigure_actions( metrics_mod => pooler_no_metrics, metrics_api => folsom, queue_max => ?DEFAULT_POOLER_QUEUE_MAX, - num_member_sups => 1 + num_member_sups => 1, + member_order => lifo }, + NewCI = maps:get(cull_interval, NewConfig, ?DEFAULT_CULL_INTERVAL), + NewMA = maps:get(max_age, NewConfig, ?DEFAULT_MAX_AGE), NewWithDefaults0 = maps:merge(Defaults, NewConfig), try + case validate_demand_buf_size(NewCI, NewMA) of + {error, _} = DemandSizeErr -> throw(DemandSizeErr); + ok -> ok + end, NewWithDefaults = case ttl_from_config(NewConfig) of {error, _} = Err -> throw(Err); @@ -1708,7 +1803,8 @@ calculate_reconfigure_actions( initialize_mfa, auto_grow_threshold, ttl, - num_member_sups + num_member_sups, + member_order ] ) of @@ -1736,7 +1832,8 @@ mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount, in_u -> AliveCount = InUse + Free, [ - {set_parameter, {init_count, NewInitCount}} + {set_parameter, {init_count, NewInitCount}}, + {rebuild_demand_buf} | case AliveCount < NewInitCount of true -> [{start_workers, NewInitCount - AliveCount}]; @@ -1745,29 +1842,29 @@ mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount, in_u end ]; mk_rec_action(init_count, NewInitCount, _, #pool{init_count = OldInitCount}) when NewInitCount < OldInitCount -> - [{set_parameter, {init_count, NewInitCount}}]; + [{set_parameter, {init_count, NewInitCount}}, {rebuild_demand_buf}]; mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount, in_use_count = InUse, free_count = Free}) when NewMaxCount < OldMaxCount -> AliveCount = InUse + Free, [ - {set_parameter, {max_count, NewMaxCount}} + {set_parameter, {max_count, NewMaxCount}}, + {rebuild_demand_buf} | case AliveCount > NewMaxCount of true when Free >= (AliveCount - NewMaxCount) -> - %% We have enough free workers to shut down [{stop_free_workers, AliveCount - NewMaxCount}]; true -> - %% We don't have enough free workers to shutdown throw({error, {max_count, not_enough_free_workers_to_shutdown}}); false -> [] end ]; mk_rec_action(max_count, NewMaxCount, _, #pool{max_count = OldMaxCount}) when NewMaxCount > OldMaxCount -> - [{set_parameter, {max_count, NewMaxCount}}]; + [{set_parameter, {max_count, NewMaxCount}}, {rebuild_demand_buf}]; mk_rec_action(cull_interval, New, _, #pool{cull_interval = Old, cull_timer = _Timer}) when New =/= Old -> [ - {set_parameter, {cull_interval, New}} + {set_parameter, {cull_interval, New}}, + {rebuild_demand_buf} | case time_as_millis(New) < time_as_millis(Old) of true -> [{reset_cull_timer, New}]; @@ -1777,7 +1874,8 @@ mk_rec_action(cull_interval, New, _, #pool{cull_interval = Old, cull_timer = _Ti ]; mk_rec_action(max_age, New, _, #pool{max_age = Old}) when New =/= Old -> [ - {set_parameter, {max_age, New}} + {set_parameter, {max_age, New}}, + {rebuild_demand_buf} | case time_as_millis(New) < time_as_millis(Old) of true -> [{cull, []}]; @@ -1822,6 +1920,8 @@ mk_rec_action(num_member_sups, New, _, #pool{member_sups = Sups}) -> New < OldN -> throw({error, num_member_sups_cannot_be_decreased}); true -> [] end; +mk_rec_action(member_order, New, _, #pool{member_order = Old}) when New =/= Old -> + [{set_member_order, New}]; mk_rec_action(_Param, _NewVal, _, _Pool) -> %% not changed []. @@ -1831,8 +1931,8 @@ apply_reconfigure_action({set_parameter, {Name, Value}}, Pool) -> set_parameter(Name, Value, Pool); apply_reconfigure_action({start_workers, Count}, Pool) -> add_members_async(Count, Pool); -apply_reconfigure_action({stop_free_workers, Count}, #pool{free_pids = Free} = Pool) -> - lists:foldl(fun(P, S) -> remove_pid(P, S, no_replace, reconfigured) end, Pool, lists:sublist(Free, Count)); +apply_reconfigure_action({stop_free_workers, Count}, Pool) -> + lists:foldl(fun(P, S) -> remove_pid(P, S, no_replace, reconfigured) end, Pool, free_take_n(Count, Pool)); apply_reconfigure_action({shrink_queue, Count}, #pool{queued_requestors = Q} = Pool) -> {ToShrink, ToKeep} = lists:split(Count, queue:to_list(Q)), [gen_server:reply(From, error_no_members) || {From, _TRef} <- ToShrink], @@ -1859,6 +1959,10 @@ apply_reconfigure_action( NewNames = pooler_pool_sup:add_member_sups(PoolName, StartMFA, OldN, NewN), NewSups = list_to_tuple(tuple_to_list(OldSups) ++ NewNames), Pool#pool{member_sups = NewSups}; +apply_reconfigure_action({set_member_order, Order}, Pool) -> + free_convert(Pool, Order); +apply_reconfigure_action({rebuild_demand_buf}, Pool) -> + rebuild_demand_buf(Pool); apply_reconfigure_action({update_ttl, NewTTL}, Pool) -> case Pool#pool.ttl of #ttl{timer = TRef} when is_reference(TRef) -> erlang:cancel_timer(TRef); @@ -2020,7 +2124,7 @@ time_as_millis({Time, Unit}) -> time_as_millis(Time) when is_integer(Time) -> Time. --spec time_as_micros(time_spec()) -> non_neg_integer(). +-spec time_as_micros({non_neg_integer(), time_unit()}) -> non_neg_integer(). %% @doc Convert time unit into microseconds time_as_micros({Time, hour}) -> 60 * 60 * 1000 * 1000 * Time; @@ -2031,9 +2135,7 @@ time_as_micros({Time, sec}) -> time_as_micros({Time, ms}) -> 1000 * Time; time_as_micros({Time, mu}) -> - Time; -time_as_micros(Time) when is_integer(Time) -> - Time * 1000. + Time. secs_between({Mega1, Secs1, _}, {Mega2, Secs2, _}) -> (Mega2 - Mega1) * 1000000 + (Secs2 - Secs1). @@ -2078,12 +2180,10 @@ is_member_expired(Pid, #pool{all_members = AllMembers}) -> undefined -> false end. -%% @doc Evict an expired member that was just dequeued from the head of free_pids. -%% Unlike remove_pid/3, does NOT call lists:delete (we already have Rest). --spec remove_free_head(pid(), [pid()], #pool{}) -> #pool{}. +%% @doc Evict an expired member. Pool already has free_pids updated by free_pop. +-spec remove_free_head(pid(), #pool{}) -> #pool{}. remove_free_head( Pid, - Rest, #pool{ all_members = AllMembers, ttl = TTL, @@ -2093,8 +2193,6 @@ remove_free_head( ) -> #member{status = free, shard_idx = ShardIdx} = Member = maps:get(Pid, AllMembers), Pool1 = Pool#pool{ - free_pids = Rest, - free_count = Pool#pool.free_count - 1, stopping_count = Pool#pool.stopping_count + 1, all_members = AllMembers#{Pid => Member#member{status = {stopping, replace}}} }, @@ -2129,12 +2227,15 @@ evict_expired_heads(#pool{ttl = #ttl{timer_target = BeforeTarget}} = Pool) -> end. -spec evict_expired_heads_loop(#pool{}) -> #pool{}. -evict_expired_heads_loop(#pool{free_pids = []} = Pool) -> - Pool; -evict_expired_heads_loop(#pool{free_pids = [Pid | Rest]} = Pool) -> - case is_member_expired(Pid, Pool) of - false -> Pool; - true -> evict_expired_heads_loop(remove_free_head(Pid, Rest, Pool)) +evict_expired_heads_loop(Pool) -> + case free_pop(Pool) of + empty -> + Pool; + {Pid, Pool1} -> + case is_member_expired(Pid, Pool) of + false -> Pool; + true -> evict_expired_heads_loop(remove_free_head(Pid, Pool1)) + end end. %% @doc Schedule or advance the TTL timer when a member joins free_pids. @@ -2176,22 +2277,14 @@ schedule_ttl_timer_for(Pid, ExpTs, TTL, Pool) -> -spec reschedule_ttl_timer(#pool{}) -> #pool{}. reschedule_ttl_timer(#pool{ttl = undefined} = Pool) -> Pool; -reschedule_ttl_timer(#pool{ttl = TTL, free_pids = FreePids, all_members = AllMembers} = Pool) -> +reschedule_ttl_timer(#pool{ttl = TTL, all_members = AllMembers} = Pool) -> case TTL#ttl.timer of TRef when is_reference(TRef) -> erlang:cancel_timer(TRef); undefined -> ok end, TTL1 = TTL#ttl{timer = undefined, timer_target = undefined}, Pool1 = Pool#pool{ttl = TTL1}, - case find_earliest_expiry(FreePids, AllMembers) of - none -> Pool1; - {Pid, ExpTs} -> schedule_ttl_timer_for(Pid, ExpTs, TTL1, Pool1) - end. - -%% @doc Scan free_pids for the member with the smallest finite ExpiresAt. O(n). --spec find_earliest_expiry([pid()], members_map()) -> {pid(), integer()} | none. -find_earliest_expiry(Pids, AllMembers) -> - lists:foldl( + Earliest = free_fold( fun(Pid, Acc) -> case maps:get(Pid, AllMembers, undefined) of #member{expires_at = infinity} -> @@ -2207,8 +2300,12 @@ find_earliest_expiry(Pids, AllMembers) -> end end, none, - Pids - ). + Pool1 + ), + case Earliest of + none -> Pool1; + {Pid, ExpTs} -> schedule_ttl_timer_for(Pid, ExpTs, TTL1, Pool1) + end. %% @doc Extract only the static config fields of a #ttl{} for change-detection in %% reconfigure. Must not include timer or timer_target (those change at runtime). @@ -2275,9 +2372,6 @@ compute_utilization(#pool{ {queue_max, QueueMax} ]. -do_call_free_members(Fun, Pids) -> - [do_call_free_member(Fun, P) || P <- Pids]. - do_call_free_member(Fun, Pid) -> try {ok, Fun(Pid)} @@ -2286,6 +2380,185 @@ do_call_free_member(Fun, Pid) -> {error, Reason} end. +%% ===== Free-collection abstraction ===== + +free_new(lifo) -> []; +free_new(fifo) -> queue:new(). + +free_push(Pid, #pool{member_order = lifo, free_pids = L, free_count = N} = Pool) -> + Pool#pool{free_pids = [Pid | L], free_count = N + 1}; +free_push(Pid, #pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) -> + Pool#pool{free_pids = queue:in(Pid, Q), free_count = N + 1}. + +free_pop(#pool{member_order = lifo, free_pids = [Pid | Rest], free_count = N} = Pool) -> + {Pid, Pool#pool{free_pids = Rest, free_count = N - 1}}; +free_pop(#pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) -> + case queue:out(Q) of + {empty, _} -> empty; + {{value, Pid}, Rest} -> {Pid, Pool#pool{free_pids = Rest, free_count = N - 1}} + end; +free_pop(#pool{member_order = lifo, free_pids = []}) -> + empty. + +free_delete(Pid, #pool{member_order = lifo, free_pids = L, free_count = N} = Pool) -> + Pool#pool{free_pids = lists:delete(Pid, L), free_count = N - 1}; +free_delete(Pid, #pool{member_order = fifo, free_pids = Q, free_count = N} = Pool) -> + Pool#pool{free_pids = queue:filter(fun(P) -> P =/= Pid end, Q), free_count = N - 1}. + +free_fold(Fun, Acc0, #pool{member_order = lifo, free_pids = L}) -> + lists:foldl(Fun, Acc0, L); +free_fold(Fun, Acc0, #pool{member_order = fifo, free_pids = Q}) -> + queue:fold(Fun, Acc0, Q). + +free_take_n(N, #pool{member_order = lifo, free_pids = L}) -> + lists:sublist(L, N); +free_take_n(N, #pool{member_order = fifo, free_pids = Q}) -> + lists:sublist(queue:to_list(Q), N). + +free_convert(#pool{member_order = O} = Pool, O) -> + Pool; +free_convert(#pool{member_order = lifo, free_pids = L} = Pool, fifo) -> + Pool#pool{free_pids = queue:from_list(L), member_order = fifo}; +free_convert(#pool{member_order = fifo, free_pids = Q} = Pool, lifo) -> + Pool#pool{free_pids = queue:to_list(Q), member_order = lifo}. + +%% ===== Demand buffer ===== + +cull_bucket_size_ms(CullInterval, MaxAge) -> + CIms = time_as_millis(CullInterval), + MAms = time_as_millis(MaxAge), + case CIms =:= 0 orelse MAms =:= 0 of + true -> 0; + false -> min(CIms, MAms) + end. + +validate_demand_buf_size(CullInterval, MaxAge) -> + case cull_bucket_size_ms(CullInterval, MaxAge) of + 0 -> + ok; + BS -> + N = ceil(time_as_millis(MaxAge) / BS), + case N > ?MAX_DEMAND_HISTORY_BUCKETS of + true -> + {error, + {demand_history_too_large, #{ + cull_interval => CullInterval, + max_age => MaxAge, + computed_buckets => N, + max_buckets => ?MAX_DEMAND_HISTORY_BUCKETS + }}}; + false -> + ok + end + end. + +init_demand_buf(CullInterval, MaxAge) -> + case cull_bucket_size_ms(CullInterval, MaxAge) of + 0 -> + undefined; + BS -> + N = min(ceil(time_as_millis(MaxAge) / BS), ?MAX_DEMAND_HISTORY_BUCKETS), + Now = erlang:monotonic_time(millisecond), + queue:from_list([{0, 0} || _ <- lists:seq(1, N - 1)] ++ [{0, Now}]) + end. + +rebuild_demand_buf(#pool{init_count = C, max_count = C} = Pool) -> + Pool#pool{demand_buf = undefined}; +rebuild_demand_buf(#pool{cull_interval = CI, max_age = MA, demand_buf = OldBuf} = Pool) -> + case cull_bucket_size_ms(CI, MA) of + 0 -> + Pool#pool{demand_buf = undefined}; + NewBS -> + NewN = min(ceil(time_as_millis(MA) / NewBS), ?MAX_DEMAND_HISTORY_BUCKETS), + Now = erlang:monotonic_time(millisecond), + NewBuf = + case OldBuf of + undefined -> + init_demand_buf(CI, MA); + OldQ -> + remap_demand_buf(OldQ, NewN, NewBS, Now) + end, + Pool#pool{demand_buf = NewBuf} + end. + +remap_demand_buf(OldQ, NewN, NewBucketMs, Now) -> + OldEntries = queue:to_list(OldQ), + OldBucketMs = + case OldEntries of + [{_, T0}, {_, T1} | _] -> max(T0 - T1, 1); + _ -> NewBucketMs + end, + Buckets = [ + begin + NewEnd = Now - I * NewBucketMs, + NewStart = NewEnd - NewBucketMs, + Peak = lists:foldl( + fun({P, S}, Acc) -> + case S + OldBucketMs > NewStart andalso S < NewEnd of + true -> max(P, Acc); + false -> Acc + end + end, + 0, + OldEntries + ), + {Peak, NewStart} + end + || I <- lists:seq(0, NewN - 1) + ], + queue:from_list(Buckets). + +update_demand_buf(_, #pool{demand_buf = undefined} = Pool) -> + Pool; +update_demand_buf(NewInUse, Pool) -> + Now = erlang:monotonic_time(millisecond), + Q = Pool#pool.demand_buf, + {value, {CurMax, CurStart}} = queue:peek(Q), + BucketMs = cull_bucket_size_ms(Pool#pool.cull_interval, Pool#pool.max_age), + case BucketMs > 0 andalso Now - CurStart >= BucketMs of + false -> + {{value, _}, Q1} = queue:out(Q), + Pool#pool{demand_buf = queue:in_r({max(NewInUse, CurMax), CurStart}, Q1)}; + true -> + {_, Q1} = queue:out_r(Q), + Pool#pool{demand_buf = queue:in_r({NewInUse, Now}, Q1)} + end. + +demand_window_peak(#pool{demand_buf = undefined}) -> + 0; +demand_window_peak(#pool{demand_buf = Q, max_age = MaxAge}) -> + MAms = time_as_millis(MaxAge), + case MAms of + 0 -> + 0; + _ -> + Now = erlang:monotonic_time(millisecond), + queue:fold( + fun + ({P, S}, Acc) when Now - S =< MAms -> max(P, Acc); + (_, Acc) -> Acc + end, + 0, + Q + ) + end. + +oldest_idle_victims(MaxCull, #pool{all_members = AllMembers}) -> + Free = maps:to_list( + maps:filter( + fun + (_, #member{status = free}) -> true; + (_, _) -> false + end, + AllMembers + ) + ), + Sorted = lists:sort( + fun({_, #member{time = A}}, {_, #member{time = B}}) -> A =< B end, + Free + ), + lists:sublist(Sorted, MaxCull). + %% @private to_map(#pool{} = Pool) -> [Name | Values] = tuple_to_list(Pool), @@ -2314,3 +2587,74 @@ pg_join(Group, Pid) -> pg_leave(Group, Pid) -> pg:leave(Group, Pid). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +remap_demand_buf_test() -> + Now = 100_000, + + %% Narrow→wide: 2 old 10s buckets merged into 1 new 20s bucket; max peak taken + OldQ1 = queue:from_list([{5, 90_000}, {3, 80_000}]), + ?assertEqual( + [{5, 80_000}], + queue:to_list(remap_demand_buf(OldQ1, 1, 20_000, Now)) + ), + + %% Wide→narrow: 1 old 20s bucket split into 2 new 10s buckets; peak propagated + OldQ2 = queue:from_list([{8, 80_000}, {2, 60_000}]), + [{B0, _}, {B1, _}] = queue:to_list(remap_demand_buf(OldQ2, 2, 10_000, Now)), + %% [90_000,100_000) overlaps old [80_000,100_000) + ?assertEqual(8, B0), + %% [80_000, 90_000) overlaps old [80_000,100_000) + ?assertEqual(8, B1), + + %% Old entries fully outside new window → zero peaks + OldQ3 = queue:from_list([{10, 50_000}, {10, 40_000}]), + ?assertEqual( + [{0, 90_000}, {0, 80_000}], + queue:to_list(remap_demand_buf(OldQ3, 2, 10_000, Now)) + ). + +rebuild_demand_buf_from_members_test() -> + %% Disabled when cull_interval is 0 + ?assertEqual( + undefined, + rebuild_demand_buf_from_members({0, min}, {30, sec}, #{}, 0) + ), + + %% CI=10s, MA=20s → 2 buckets of 10s each + Now = erlang:monotonic_time(millisecond), + + %% Build a fake os:timestamp() for a member last-returned OffsetMs ago. + %% Inverse of member_time_to_monotonic_ms/1: monotonic → wall → {Mega,Sec,Micro}. + MkTs = fun(OffsetMs) -> + WallMs = (Now - OffsetMs) + erlang:time_offset(millisecond), + Mega = WallMs div 1_000_000_000, + RemMs = WallMs rem 1_000_000_000, + {Mega, RemMs div 1_000, (RemMs rem 1_000) * 1_000} + end, + FakeMember = fun(OffsetMs) -> + #member{ + status = free, + time = MkTs(OffsetMs), + mref = make_ref(), + expires_at = infinity, + shard_idx = 1 + } + end, + %% 2 members returned 5 s ago → bucket 0 (current) + %% 1 member returned 15 s ago → bucket 1 (older) + AllMembers = #{ + a => FakeMember(5_000), + b => FakeMember(5_000), + c => FakeMember(15_000) + }, + Q = rebuild_demand_buf_from_members({10, sec}, {20, sec}, AllMembers, _InUseCount = 1, Now), + [{B0Peak, _}, {B1Peak, _}] = queue:to_list(Q), + %% bucket 0: 2 members + in_use_count → 3 + ?assertEqual(3, B0Peak), + %% bucket 1: 1 member, no in_use bonus → 1 + ?assertEqual(1, B1Peak). + +-endif. diff --git a/test/bench_take_return.erl b/test/bench_take_return.erl index 24aa117..a06bf74 100644 --- a/test/bench_take_return.erl +++ b/test/bench_take_return.erl @@ -23,7 +23,16 @@ size_20_ttl_5ms_take_return_one/1, bench_size_20_ttl_5ms_take_return_one/2, size_500_ttl_5ms_take_return_one/1, - bench_size_500_ttl_5ms_take_return_one/2 + bench_size_500_ttl_5ms_take_return_one/2, + %% FIFO (queue) dispatch order — compare against lifo equivalents above + fifo_size_5_take_return_one/1, + bench_fifo_size_5_take_return_one/2, + fifo_size_1000_take_return_one/1, + bench_fifo_size_1000_take_return_one/2, + fifo_size_500_take_return_all/1, + bench_fifo_size_500_take_return_all/2, + fifo_size_500_clients_50_take_return_all/1, + bench_fifo_size_500_clients_50_take_return_all/2 ]). %% @doc Pool of fixed size 5 - try to take just one member and instantly return @@ -304,15 +313,93 @@ bench_size_500_ttl_5ms_take_return_one(_Input, PoolName) -> pooler:return_member(PoolName, Member) end. +%% FIFO benchmarks — member_order => fifo (queue dispatch, round-robin) + +%% @doc FIFO pool of fixed size 5 — single take/return; measures queue:out overhead vs list head. +fifo_size_5_take_return_one(init) -> + start_fixed_fifo(?FUNCTION_NAME, 5), + ?FUNCTION_NAME; +fifo_size_5_take_return_one({input, _}) -> + []; +fifo_size_5_take_return_one({stop, PoolName}) -> + stop(PoolName). + +bench_fifo_size_5_take_return_one(_Input, PoolName) -> + Member = pooler:take_member(PoolName), + true = is_pid(Member), + pooler:return_member(PoolName, Member). + +%% @doc FIFO pool of fixed size 1000 — larger queue, measures queue:out + queue:in at scale. +fifo_size_1000_take_return_one(init) -> + start_fixed_fifo(?FUNCTION_NAME, 1000), + ?FUNCTION_NAME; +fifo_size_1000_take_return_one({input, _}) -> + []; +fifo_size_1000_take_return_one({stop, PoolName}) -> + stop(PoolName). + +bench_fifo_size_1000_take_return_one(_Input, PoolName) -> + Member = pooler:take_member(PoolName), + true = is_pid(Member), + pooler:return_member(PoolName, Member). + +%% @doc FIFO pool of fixed size 500 — drain all members then return them; exercises full rotation. +fifo_size_500_take_return_all(init) -> + start_fixed_fifo(?FUNCTION_NAME, 500), + {?FUNCTION_NAME, 500}; +fifo_size_500_take_return_all({input, {_Pool, Size}}) -> + lists:seq(1, Size); +fifo_size_500_take_return_all({stop, {PoolName, _}}) -> + stop(PoolName). + +bench_fifo_size_500_take_return_all(_Input, {PoolName, Size}) -> + Members = take_n(PoolName, Size), + [pooler:return_member(PoolName, Member) || Member <- Members]. + +%% @doc FIFO pool, 500 members, 50 concurrent clients — measures contention on the queue. +fifo_size_500_clients_50_take_return_all(init) -> + PoolSize = 500, + NumClients = 50, + PerClient = PoolSize div NumClients, + start_fixed_fifo(?FUNCTION_NAME, PoolSize), + Clients = [ + erlang:spawn_link(fun() -> client(?FUNCTION_NAME, 0, PerClient) end) + || _ <- lists:seq(1, NumClients) + ], + {?FUNCTION_NAME, PoolSize, Clients}; +fifo_size_500_clients_50_take_return_all({input, _}) -> + []; +fifo_size_500_clients_50_take_return_all({stop, {PoolName, _Size, Clients}}) -> + [ + begin + unlink(Pid), + exit(Pid, shutdown) + end + || Pid <- Clients + ], + stop(PoolName). + +bench_fifo_size_500_clients_50_take_return_all(_Input, {_PoolName, _Size, Clients}) -> + Ref = erlang:make_ref(), + Self = self(), + lists:foreach(fun(C) -> C ! {do, Self, Ref} end, Clients), + lists:foreach( + fun(_) -> + receive + {done, RecRef} -> RecRef = Ref + after 5000 -> error(timeout) + end + end, + Clients + ). + %% Internal start_fixed(Name, Size) -> - Conf = [ - {name, Name}, - {init_count, Size}, - {max_count, Size} - ], - start(Conf). + start([{name, Name}, {init_count, Size}, {max_count, Size}]). + +start_fixed_fifo(Name, Size) -> + start([{name, Name}, {init_count, Size}, {max_count, Size}, {member_order, fifo}]). start(Conf0) -> Conf = [{start_mfa, {pooled_gs, start_link, [{"test"}]}} | Conf0], diff --git a/test/pooler_tests.erl b/test/pooler_tests.erl index 92a03d0..91723bc 100644 --- a/test/pooler_tests.erl +++ b/test/pooler_tests.erl @@ -1313,6 +1313,7 @@ reconfigure_test_() -> ?assertEqual( {ok, [ {set_parameter, {init_count, 3}}, + {rebuild_demand_buf}, {start_workers, 1} ]}, pooler:pool_reconfigure(Name, Config1) @@ -1331,7 +1332,9 @@ reconfigure_test_() -> ?assertEqual( {ok, [ {set_parameter, {init_count, 1}}, + {rebuild_demand_buf}, {set_parameter, {max_count, 1}}, + {rebuild_demand_buf}, {stop_free_workers, 1} ]}, pooler:pool_reconfigure(Name, Config1) @@ -1409,6 +1412,7 @@ reconfigure_test_() -> ?assertEqual( {ok, [ {set_parameter, {cull_interval, {10, sec}}}, + {rebuild_demand_buf}, {reset_cull_timer, {10, sec}} ]}, pooler:pool_reconfigure(Name, NewConfig) @@ -1421,6 +1425,7 @@ reconfigure_test_() -> ?assertEqual( {ok, [ {set_parameter, {max_age, {100, ms}}}, + {rebuild_demand_buf}, {cull, []} ]}, pooler:pool_reconfigure(Name, NewConfig) @@ -1480,6 +1485,7 @@ reconfigure_test_() -> ?assertEqual( {ok, [ {set_parameter, {max_count, NewMaxCount}}, + {rebuild_demand_buf}, {set_parameter, {member_start_timeout, {10, sec}}}, {set_parameter, {queue_max, 100}}, {set_parameter, {metrics_mod, fake_metrics}}, @@ -1500,6 +1506,27 @@ reconfigure_test_() -> gen_server:call(Name, dump_pool) ) end}, + {"Change member_order lifo→fifo: action emitted, pool functional, round-robin observable", fun() -> + %% Take both workers, return in known order so we know the free list state + [P1, P2] = get_n_pids(Name, 2, []), + pooler:return_member(Name, P1), + pooler:return_member(Name, P2), + %% LIFO: P2 (most recent) should come out first + ?assertEqual(P2, pooler:take_member(Name)), + pooler:return_member(Name, P2), + %% Reconfigure to FIFO — verify action list + ?assertEqual( + {ok, [{set_member_order, fifo}]}, + pooler:pool_reconfigure(Name, StartConfig#{member_order => fifo}) + ), + ?assertMatch(#{member_order := fifo}, gen_server:call(Name, dump_pool)), + %% Under FIFO: taking P2 (front) and returning it should rotate + ?assertEqual(P2, pooler:take_member(Name)), + pooler:return_member(Name, P2), + %% P2 was just returned to the back; P1 (idle longest) should come next + ?assertEqual(P1, pooler:take_member(Name)), + pooler:return_member(Name, P1) + end}, {"Update failed", fun() -> ?assertEqual( {error, changed_unsupported_parameter}, @@ -1513,6 +1540,14 @@ reconfigure_test_() -> Name, StartConfig#{name := not_a_pool_name} ) ) + end}, + {"reconfigure rejected when cull/max_age combo exceeds demand buf limit", fun() -> + %% cull_interval=10s, max_age=1h → 360 buckets > 256 cap + BadConfig = StartConfig#{cull_interval => {10, sec}, max_age => {1, hour}}, + ?assertMatch( + {error, {demand_history_too_large, #{computed_buckets := 360}}}, + pooler:pool_reconfigure(Name, BadConfig) + ) end} ]}. @@ -2369,3 +2404,134 @@ pg_stop() -> pg_leave(Group, Pid) -> pg:leave(Group, Pid). + +%% ===== FIFO member_order tests ===== + +pooler_fifo_member_order_test_() -> + {setup, + fun() -> + application:set_env(pooler, pools, [ + #{ + name => test_pool_fifo, + max_count => 5, + init_count => 3, + start_mfa => {pooled_gs, start_link, [{"fifo-type"}]}, + member_order => fifo + } + ]), + application:start(pooler) + end, + fun(_) -> application:stop(pooler) end, [ + {"fifo: members served in round-robin order", fun() -> + P1 = pooler:take_member(test_pool_fifo), + P2 = pooler:take_member(test_pool_fifo), + P3 = pooler:take_member(test_pool_fifo), + ?assert(is_pid(P1)), + ?assertNotEqual(P1, P2), + ?assertNotEqual(P2, P3), + pooler:return_member(test_pool_fifo, P1), + pooler:return_member(test_pool_fifo, P2), + pooler:return_member(test_pool_fifo, P3), + %% FIFO: oldest idle (P1) comes out first + ?assertEqual(P1, pooler:take_member(test_pool_fifo)), + ?assertEqual(P2, pooler:take_member(test_pool_fifo)), + ?assertEqual(P3, pooler:take_member(test_pool_fifo)), + pooler:return_member(test_pool_fifo, P1), + pooler:return_member(test_pool_fifo, P2), + pooler:return_member(test_pool_fifo, P3) + end} + ]}. + +pooler_fifo_cull_test_() -> + {setup, + fun() -> + application:set_env(pooler, pools, [ + #{ + name => test_pool_fifo_cull, + max_count => 5, + init_count => 2, + start_mfa => {pooled_gs, start_link, [{"fifo-cull"}]}, + member_order => fifo, + cull_interval => {200, ms}, + max_age => {0, min} + } + ]), + application:start(pooler) + end, + fun(_) -> application:stop(pooler) end, [ + {"fifo: excess members culled to init_count", fun() -> + Pids = get_n_pids(test_pool_fifo_cull, 5, []), + [pooler:return_member(test_pool_fifo_cull, P) || P <- Pids], + wait_for_pool_state( + test_pool_fifo_cull, + 2000, + fun(U) -> maps:get(free_count, U) =:= 2 end + ) + end} + ]}. + +pooler_cull_size_invariant_test_() -> + {setup, + fun() -> + Base = #{ + max_count => 5, + init_count => 2, + start_mfa => {pooled_gs, start_link, [{"inv"}]}, + cull_interval => {200, ms}, + max_age => {0, min} + }, + application:set_env(pooler, pools, [ + Base#{name => test_pool_lifo_inv}, + Base#{name => test_pool_fifo_inv, member_order => fifo} + ]), + application:start(pooler) + end, + fun(_) -> application:stop(pooler) end, [ + {"lifo and fifo pools with identical params cull to same size", fun() -> + LPids = get_n_pids(test_pool_lifo_inv, 5, []), + FPids = get_n_pids(test_pool_fifo_inv, 5, []), + [pooler:return_member(test_pool_lifo_inv, P) || P <- LPids], + [pooler:return_member(test_pool_fifo_inv, P) || P <- FPids], + wait_for_pool_state( + test_pool_lifo_inv, + 2000, + fun(U) -> maps:get(free_count, U) =:= 2 end + ), + wait_for_pool_state( + test_pool_fifo_inv, + 2000, + fun(U) -> maps:get(free_count, U) =:= 2 end + ) + end} + ]}. + +pooler_demand_buf_validation_test_() -> + {setup, + fun() -> + application:start(pooler) + end, + fun(_) -> + application:stop(pooler) + end, + [ + {"new_pool rejected when cull/max_age combo exceeds demand buf limit", fun() -> + %% cull_interval=10s, max_age=1h → 360 buckets > 256 cap + %% The error is wrapped by the pool_sup → pooler_sup supervisor chain. + ?assertMatch( + {error, { + {shutdown, + {failed_to_start_child, pooler, + {error, {demand_history_too_large, #{computed_buckets := 360}}}}}, + _ + }}, + pooler:new_pool(#{ + name => demand_buf_test_pool, + init_count => 1, + max_count => 5, + start_mfa => {pooled_gs, start_link, [{"test"}]}, + cull_interval => {10, sec}, + max_age => {1, hour} + }) + ) + end} + ]}.