diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65e8521..952d1af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 ## 0.2.0 (2026-04-17)
-- remove deprecate message handling
+- Add `Group.broadcast/4` for fan-out to every peer node in a cluster
+- Change cross-node `dispatch/4` delivery so the receiver performs a fresh local lookup for the key
+- Route remote dispatch/broadcast through source and receiver shard causal barriers before dispatcher fan-out
+- Remove deprecated message handling
 
 ## 0.1.8 (2026-04-17)
 - Use `send_nosuspend` for remote shard sends and add bounded reconnect retries after busy-link disconnects
diff --git a/CLAUDE.md b/CLAUDE.md
index 5a6412f..9289507 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -165,7 +165,11 @@ Keys ending with `"/"` are rejected by `validate_key!/1` in register/unregister/
 
 ### Dispatch
 
-`dispatch/4` sends to all members (registry + PG). Groups remote PG members by node, sends one `{:group_dispatch, pids, message}` per remote node (O(nodes) not O(members)). Target shard chosen by `phash2(self(), num_shards)` for per-sender ordering.
+`dispatch/4` sends to members in the local replicated view (registry + PG) for the selected cluster/key. It routes through the source owning shard, which flushes pending replication before sending one `{:group_dispatch, cluster, key, message}` to each target node's owning shard. The receiver shard flushes pending replication before forwarding to its per-shard dispatcher for a fresh local lookup and delivery.
+
+`broadcast/4` uses the same causal shard path, but targets every peer node in the selected cluster rather than only nodes visible in the local member view. This is the Phoenix.PubSub-style fan-out path.
+
+Delivery is asynchronous: both APIs return after enqueueing work on the local owning shard, not after recipients process the message. Per-{cluster, key} causality replaces the old per-sender cross-key ordering because the source barrier must be the selected cluster/key's owning shard. The default cluster and named clusters are independent routing and causality scopes.
 
 `dispatch_local/4` skips cross-node messaging.
diff --git a/README.md b/README.md
index c766aeb..25e5f48 100644
--- a/README.md
+++ b/README.md
@@ -111,15 +111,35 @@ Single operations (register, join) produce one event per tuple. Bulk operations
 
 ### Dispatch
 
-Send a message to all members of a key:
+Send a message to members of a key:
 
 ```elixir
 :ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"})
 :ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"}, cluster: "servers_123")
+
+:ok = Group.broadcast(:my_app, "chat/room/42", {:new_message, "hello"})
+:ok = Group.broadcast(:my_app, "chat/room/42", {:new_message, "hello"}, cluster: "servers_123")
 ```
 
-Compared to `Phoenix.PubSub`, `dispatch` only broadcasts to nodes with at least
-one subscription and can also be tailored to a given cluster.
+`dispatch` uses this node's replicated member view for the selected cluster/key
+to decide which peer nodes to contact, then each receiver performs a fresh local
+lookup before delivery.
+`broadcast` behaves more like `Phoenix.PubSub.broadcast/3`: it sends a lookup
+request to every peer node in the selected cluster, even when this node cannot
+currently see members there.
+
+Remote dispatch and broadcast route through the source and receiver owning
+shards for the selected cluster/key as causal barriers before fan-out. The
+source shard flushes pending replication before sending the app message; the
+receiver shard flushes pending replication before forwarding to a per-shard
+dispatcher for local lookup and delivery.
+
+Those barriers are scoped to the same cluster and key. The default cluster and
+named clusters are independent: a join in `cluster: "servers_123"` only orders
+later dispatches or broadcasts using `cluster: "servers_123"` for the same key.
+
+Delivery is asynchronous: `dispatch` and `broadcast` return after enqueueing
+work on the local owning shard, not after recipients process the message.
 
 ### Named Clusters
diff --git a/lib/group.ex b/lib/group.ex
index 6f6b538..71efaef 100644
--- a/lib/group.ex
+++ b/lib/group.ex
@@ -19,6 +19,106 @@ defmodule Group do
   - When partitions heal, conflicts are resolved. The losing process is killed
     with `{:group_registry_conflict, key, meta}`
 
+  ## Dispatch and Broadcast Ordering
+
+  `dispatch/4` and `broadcast/4` are asynchronous message-delivery APIs.
+  They return after Group accepts the message for delivery, not after
+  recipients handle it.
+
+  ### What Group Guarantees
+
+  Membership causality is preserved within the same cluster and key. The
+  default cluster (`nil`) and each named cluster are separate routing scopes.
+  If a process successfully `join`s or `register`s key `K` in cluster `C`, and
+  then later sends `dispatch` or `broadcast` for the same cluster `C` and key
+  `K`, any remote process that receives that message will see the earlier
+  membership update in `members/3` or `lookup/3`.
+
+      :ok = Group.join(MyGroup, "room/123", %{role: :player})
+      :ok = Group.broadcast(MyGroup, "room/123", {:hello, self()})
+
+  A remote receiver that handles `{:hello, pid}` can call:
+
+      Group.members(MyGroup, "room/123")
+
+  and expect the sender's joined process to be visible.
+
+  Receiver-local joins are included at delivery time. On each contacted node,
+  Group performs a fresh local lookup immediately before delivery, so a process
+  that joined locally before the dispatcher fan-out runs can receive the message
+  even if the sender had not learned about that member yet.
+
+  `broadcast/4` contacts every known peer node in the selected cluster. Use it
+  when you prefer an extra remote lookup over missing a recent subscriber.
+
+  `dispatch/4` contacts only remote nodes that are already visible in this
+  node's replicated member view for the selected cluster and key. Use it when
+  you want the optimized path and can tolerate eventual membership visibility.
+
+  Per-sender ordering is preserved for messages to the same cluster and key.
+  Because messages for the same cluster and key route through the same source
+  shard and same receiver shard, a single sender's consecutive `broadcast` or
+  `dispatch` calls for cluster `C` and key `K` are delivered to each contacted
+  node in send order.
+
+  ### What Group Does Not Guarantee
+
+  There is no synchronous delivery guarantee. A successful `dispatch/4` or
+  `broadcast/4` return does not mean any recipient has received or handled the
+  message.
+
+  There is no global membership barrier. Cluster-and-key causality does not
+  extend across unrelated keys or clusters. A message for `"room/456"` does
+  not act as a barrier for membership changes on `"room/123"`:
+
+      :ok = Group.join(MyGroup, "room/123", %{})
+      :ok = Group.broadcast(MyGroup, "room/456", :hello)
+
+  A receiver of the `"room/456"` message should not assume that
+  `Group.members(MyGroup, "room/123")` has already caught up.
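+
+  If a handler does need membership for an unrelated key, poll for it rather
+  than assuming the barrier covered it. A minimal sketch (the `await_member`
+  helper, its retry bound, and the sleep interval are illustrative, not part
+  of Group):
+
+      defp await_member(name, key, pid, retries \\ 100) do
+        cond do
+          Enum.any?(Group.members(name, key), fn {p, _meta} -> p == pid end) ->
+            :ok
+
+          retries == 0 ->
+            {:error, :timeout}
+
+          true ->
+            Process.sleep(10)
+            await_member(name, key, pid, retries - 1)
+        end
+      end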
+
+  Cluster scope matters the same way: a join in `cluster: "blue"` does not act
+  as a barrier for a dispatch or broadcast in the default cluster, or in any
+  other named cluster.
+
+  `dispatch/4` does not guarantee all-node delivery. If the sender has not yet
+  learned that a remote node has members for the selected cluster and key,
+  `dispatch/4` may not contact that node. Use `broadcast/4` when every known
+  peer node in the selected cluster should be contacted.
+
+  `broadcast/4` does not guarantee delivery to nodes outside the sender's
+  current cluster view. If cluster membership itself is still converging, a
+  newly connected node may be missed by a concurrent broadcast.
+
+  `monitor/3` is not an application-message subscription. Monitor-only
+  processes receive lifecycle events, not application messages. A process must
+  `join/4` the key or be registered at that exact key to receive `dispatch/4`
+  or `broadcast/4` messages.
+
+  `leave/3` and `unregister/3` are not unsubscribe barriers for messages that
+  were already selected, in flight, or already delivered to the process mailbox.
+  After leaving, a process may still receive an older application message. If
+  this matters, include an application-level epoch or session id in messages and
+  ignore stale messages after leaving.
+
+  Monitor events are not ordered before application messages. Lifecycle events
+  from `monitor/3` and application messages from `dispatch/4` or `broadcast/4`
+  are separate delivery paths.
+
+  There is no cross-sender total ordering. If multiple processes concurrently
+  broadcast to the same cluster and key, recipients may observe those messages
+  in different interleavings.
+
+  There is no duplicate suppression across registry and group membership. If
+  the same process is both registered at key `K` and joined to key `K` in the
+  same cluster, it may receive the same application message through both
+  routes.
+
+  During partitions, dispatch, broadcast, lookup, and members reflect each
+  node's current local view. Group converges after connectivity returns, but it
+  does not provide linearizable reads or exactly-once message delivery across
+  partitions.
+
   ## Clusters
 
   By default, all operations use the default cluster (`nil`). You can
@@ -161,9 +261,10 @@ defmodule Group do
       # Monitor events in the named cluster
       :ok = Group.monitor(MySup, :all, cluster: "game_servers")
 
-      # Members and dispatch also support cluster option
+      # Members, dispatch, and broadcast also support cluster option
       Group.members(MySup, "room/123", cluster: "game_servers")
       Group.dispatch(MySup, "room/123", {:msg, "hi"}, cluster: "game_servers")
+      Group.broadcast(MySup, "room/123", {:msg, "hi"}, cluster: "game_servers")
 
   ## Architecture Notes
 
@@ -396,6 +497,10 @@ defmodule Group do
   Use `register/4` when you need exactly one process per key. Use `join/4` when
   multiple processes should be able to share the same key.
 
+  See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
+  section for the same cluster-and-key causality guarantee between registration
+  and later app messages.
+
   ## Parameters
 
   - `name` - The Group name
@@ -581,6 +686,10 @@ defmodule Group do
   Note: Joining does NOT automatically monitor events. Call `monitor/2`
   separately if you want to receive events.
 
+  See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
+  section for the same cluster-and-key causality guarantee between membership
+  and later app messages.
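+
+  For example, a process that wants both app messages and lifecycle events for
+  a key (a minimal sketch; the two calls are independent):
+
+      :ok = Group.join(MyGroup, "room/123", %{role: :player})
+      :ok = Group.monitor(MyGroup, "room/123")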
+
   ## Parameters
 
   - `name` - The Group name
@@ -614,6 +723,12 @@ defmodule Group do
   @doc """
   Leave a group that was previously joined.
 
+  Leaving removes the process from future membership lookups once the leave is
+  observed, but it is not an unsubscribe barrier for messages that were already
+  selected, in flight, or already delivered to the process mailbox. If stale
+  messages matter, include an application-level epoch or session id in messages
+  and ignore old messages after leaving.
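+
+  A minimal sketch of that pattern (the message shape and the receiver's
+  handler are illustrative, not part of Group):
+
+      Group.dispatch(name, key, {:chat, epoch, payload})
+
+      # Receiver: drop any message tagged with an epoch older than the
+      # current session.
+      def handle_info({:chat, epoch, _payload}, %{epoch: current} = state)
+          when epoch < current do
+        {:noreply, state}
+      end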
+
   ## Parameters
 
   - `name` - The Group name
@@ -865,7 +980,7 @@ defmodule Group do
   # ===========================================================================
 
   @doc """
-  Dispatch a message to all members of a key.
+  Dispatch a message to all members of a key visible from this node.
 
   Sends `message` to all processes that have joined the key via `join/3`, as well
   as any DurableServer registered at that key. This is useful for application-level
@@ -887,8 +1002,20 @@ defmodule Group do
 
   ## Filtering by Metadata
 
-  `dispatch/3` sends to all members. If you need to filter by metadata (e.g., only
-  send to members with `%{type: :channel}`), use `members/2` directly:
+  `dispatch/3` sends to all members in this node's current replicated view.
+  Remote nodes perform a fresh local lookup before delivering the message, but
+  `dispatch/3` only contacts remote nodes that this node can already see for
+  the selected cluster and key. Use `broadcast/3` when you want to contact all
+  peer nodes in the selected cluster.
+
+  Delivery is asynchronous: the call returns after enqueueing work on the
+  local owning shard, not after recipients process the message.
+
+  See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
+  section for the same cluster-and-key causality guarantee.
+
+  If you need to filter by metadata (e.g., only send to members with
+  `%{type: :channel}`), use `members/2` directly:
 
       for {pid, %{type: :channel}} <- Group.members(sup, key) do
         send(pid, message)
@@ -909,40 +1036,54 @@ defmodule Group do
     cluster = Keyword.get(opts, :cluster)
     num_shards = get_config(name).num_shards
     shard = Replica.shard_index_for(cluster, key, num_shards)
-    local = node()
 
-    # Registry entry (one or none) — always direct send, even cross-node
-    case Data.registry_lookup(name, shard, cluster, key) do
-      {pid, _meta, _time, _node} -> send(pid, message)
-      nil -> :ok
-    end
+    send(Replica.shard_name(name, shard), {:group_dispatch_request, cluster, key, message})
 
-    # PG members — group remote pids by node for batched fan-out
-    case Data.pg_members_with_node(name, shard, cluster, key) do
-      [] ->
-        :ok
-
-      members ->
-        {local_pids, remote_by_node} =
-          Enum.reduce(members, {[], %{}}, fn {pid, _meta, node}, {locals, remotes} ->
-            if node == local do
-              {[pid | locals], remotes}
-            else
-              {locals, Map.update(remotes, node, [pid], &[pid | &1])}
-            end
-          end)
-
-        for pid <- local_pids, do: send(pid, message)
-
-        # Hash caller pid to pick a shard — same caller always hits the same
-        # shard, preserving per-sender message ordering across dispatches.
-        dispatch_shard = :erlang.phash2(self(), num_shards)
-        shard_name = Replica.shard_name(name, dispatch_shard)
-
-        for {target_node, pids} <- remote_by_node do
-          send({shard_name, target_node}, {:group_dispatch, pids, message})
-        end
-    end
+    :ok
+  end
+
+  @doc """
+  Broadcast a message to all members of a key on every peer node in the
+  selected cluster.
+
+  Like `dispatch/4`, this sends to the local registry entry and local PG
+  members for the key. Unlike `dispatch/4`, it does not use this node's
+  replicated member view to decide which remote nodes may have recipients.
+  It routes through the local owning shard, which flushes pending replication
+  before sending one request to the owning shard on every remote peer in the
+  cluster. Each receiver shard flushes pending replication before forwarding
+  to its local dispatcher for a fresh local lookup and delivery.
+
+  This is useful when missing a recent remote join is worse than sending an
+  extra remote shard message.
+
+  The `:cluster` option selects the cluster whose peer nodes are contacted and
+  is part of the causality scope. The default cluster and named clusters do not
+  order each other.
+
+  Delivery is asynchronous: the call returns after enqueueing work on the
+  local owning shard, not after recipients process the message.
+
+  See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
+  section for the same cluster-and-key causality guarantee.
+
+  ## Options
+
+  - `:cluster` - Broadcast within a named cluster instead of the default cluster
+
+  ## Returns
+
+  - `:ok` always
+  """
+  def broadcast(name, key, message, opts \\ [])
+
+  def broadcast(name, key, message, opts)
+      when is_atom(name) and is_binary(key) and is_list(opts) do
+    cluster = Keyword.get(opts, :cluster)
+    num_shards = get_config(name).num_shards
+    shard = Replica.shard_index_for(cluster, key, num_shards)
+
+    send(Replica.shard_name(name, shard), {:group_broadcast_request, cluster, key, message})
 
     :ok
   end
@@ -968,15 +1109,20 @@ defmodule Group do
     cluster = Keyword.get(opts, :cluster)
     num_shards = get_config(name).num_shards
     shard = Replica.shard_index_for(cluster, key, num_shards)
+
+    dispatch_local_members(name, shard, cluster, key, message)
+
+    :ok
+  end
+
+  defp dispatch_local_members(name, shard, cluster, key, message) do
     local = node()
 
-    # Registry entry — only if local
     case Data.registry_lookup(name, shard, cluster, key) do
       {pid, _meta, _time, ^local} -> send(pid, message)
       _ -> :ok
     end
 
-    # PG members — local only, filtered in the match spec
     for pid <- Data.pg_members_local(name, shard, cluster, key) do
       send(pid, message)
     end
diff --git a/lib/group/dispatcher.ex b/lib/group/dispatcher.ex
new file mode 100644
index 0000000..71639de
--- /dev/null
+++ b/lib/group/dispatcher.ex
@@ -0,0 +1,49 @@
+defmodule Group.Dispatcher do
+  @moduledoc false
+
+  use GenServer
+
+  alias Group.Replica.Data
+
+  def start_link(opts) do
+    name = Keyword.fetch!(opts, :name)
+    shard_index = Keyword.fetch!(opts, :shard_index)
+
+    GenServer.start_link(__MODULE__, opts, name: dispatcher_name(name, shard_index))
+  end
+
+  def dispatcher_name(name, shard_index), do: :"#{name}_dispatcher_#{shard_index}"
+
+  @impl true
+  def init(opts) do
+    state = %{
+      name: Keyword.fetch!(opts, :name),
+      shard_index: Keyword.fetch!(opts, :shard_index)
+    }
+
+    {:ok, state}
+  end
+
+  @impl true
+  def handle_info({:group_dispatch, cluster, key, message}, state) when is_binary(key) do
+    dispatch_local_members(state.name, state.shard_index, cluster, key, message)
+    {:noreply, state}
+  end
+
+  def handle_info(_msg, state), do: {:noreply, state}
+
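+  # Delivery does a fresh local lookup against the replica's tables; the
+  # owning shard has already flushed pending replication before forwarding
+  # the message here.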
+  defp dispatch_local_members(name, shard, cluster, key, message) do
+    local = node()
+
+    case Data.registry_lookup(name, shard, cluster, key) do
+      {pid, _meta, _time, ^local} -> send(pid, message)
+      _ -> :ok
+    end
+
+    for pid <- Data.pg_members_local(name, shard, cluster, key) do
+      send(pid, message)
+    end
+
+    :ok
+  end
+end
diff --git a/lib/group/replica.ex b/lib/group/replica.ex
index f8392c5..c97dc53 100644
--- a/lib/group/replica.ex
+++ b/lib/group/replica.ex
@@ -29,7 +29,9 @@ defmodule Group.Replica do
   | `{:cluster_connect_ack, clusters, pid, cluster_data}` | S→remote S | Ack + bundled shard data |
   | `{:cluster_disconnect, clusters, pid}` | shard 0→remote | Node leaving named clusters |
   | `{:send_cluster_data, clusters, target_node}` | local fan-out | Notify siblings: send shard data |
-  | `{:group_dispatch, pids, message}` | caller→remote | Per-node fan-out for dispatch |
+  | `{:group_dispatch_request, cluster, key, message}` | local caller | Causal dispatch via source shard |
+  | `{:group_broadcast_request, cluster, key, message}` | local caller | Causal broadcast via source shard |
+  | `{:group_dispatch, cluster, key, message}` | shard→shard | Causal remote dispatch barrier |
 
   ## Protocol Flows
@@ -109,22 +111,30 @@ defmodule Group.Replica do
       │                                          │   (no overwrite conflict
       │                                          │    for PG)
       │                                          │
-  Group.dispatch(name, group, msg)
-  │── send directly to local pids
-  │── group remote pids by node
-  │── hash self() to pick shard j
-  │
-  │  {:group_dispatch, pids, msg}            Node B shard j
-  │────────────────────────────────────────>│
-  │                                          │── send msg to each local pid
-  │                                          │
-
-  Dispatch groups remote PG members by node and sends one
-  `:group_dispatch` message per remote node, reducing cross-node
-  messages from O(members) to O(nodes). The target shard is chosen
-  by hashing the caller's pid (`phash2(self(), num_shards)`), so
-  back-to-back dispatches from the same caller always route through
-  the same shard, preserving per-sender message ordering.
+  Group.dispatch(name, group, msg)
+  │── send request to local owning shard
+  │
+  Node A owning shard                         Node B owning shard
+  │── flush outbound/inbound replication       │
+  │── local dispatcher lookup/fan-out          │
+  │── group target remote nodes                │
+  │                                            │
+  │  {:group_dispatch, cluster, group, msg}    │
+  │───────────────────────────────────────────>│
+  │                                            │── flush inbound replication
+  │                                            │── forward to local dispatcher
+  │                                            │── lookup local pids for group
+  │                                            │── send msg to each local pid
+  │                                            │
+
+  Dispatch and broadcast use the owning shard as a causal barrier before
+  handing fan-out to `Group.Dispatcher`. The source shard flushes pending
+  outbound replication before sending the app message. The receiver shard
+  flushes pending inbound replication before forwarding to its dispatcher.
+  This keeps application delivery from doing ETS lookup/send work in the
+  replica shard while preserving the least-surprise guarantee that a remote
+  handler reading `members/2` can observe membership updates that happened
+  before the source dispatch/broadcast.
 
   ### 3. Named Cluster Connect (random shard S + fan-out)
@@ -427,6 +437,7 @@ defmodule Group.Replica do
 
   require Logger
 
+  alias Group.Dispatcher
   alias Group.Replica.Data
 
   defstruct [
@@ -995,7 +1006,36 @@ defmodule Group.Replica do
     {:noreply, state}
   end
 
-  def handle_info({:group_dispatch, pids, message}, state) do
+  def handle_info({:group_dispatch_request, cluster, key, message}, state) when is_binary(key) do
+    state =
+      state
+      |> flush_pending_replicated_message_barrier()
+      |> dispatch_group_locally(cluster, key, message)
+      |> dispatch_group_to_visible_remote_nodes(cluster, key, message)
+
+    {:noreply, state}
+  end
+
+  def handle_info({:group_broadcast_request, cluster, key, message}, state) when is_binary(key) do
+    state =
+      state
+      |> flush_pending_replicated_message_barrier()
+      |> dispatch_group_locally(cluster, key, message)
+      |> broadcast_group_to_remote_nodes(cluster, key, message)
+
+    {:noreply, state}
+  end
+
+  def handle_info({:group_dispatch, cluster, key, message}, state) when is_binary(key) do
+    state =
+      state
+      |> flush_pending_replicated_message_barrier()
+      |> dispatch_group_locally(cluster, key, message)
+
+    {:noreply, state}
+  end
+
+  def handle_info({:group_flush_barrier, pids, message}, state) when is_list(pids) do
     state = flush_pending_replicated_message_barrier(state)
     for pid <- pids, do: send(pid, message)
     {:noreply, state}
@@ -1058,6 +1098,54 @@ defmodule Group.Replica do
   # =====================================================================
   # Internal helpers
   # =====================================================================
 
+  defp dispatch_group_locally(state, cluster, key, message) do
+    send(
+      Dispatcher.dispatcher_name(state.name, state.shard_index),
+      {:group_dispatch, cluster, key, message}
+    )
+
+    state
+  end
+
+  defp dispatch_group_to_visible_remote_nodes(state, cluster, key, message) do
+    state
+    |> visible_group_remote_nodes(cluster, key)
+    |> Enum.each(fn target_node ->
+      send_remote_shard_message(state, target_node, {:group_dispatch, cluster, key, message})
+    end)
+
+    state
+  end
+
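+  # Broadcast targets every peer node in the cluster rather than only nodes
+  # visible in the local member view, so a recently joined remote member can
+  # still be found by the receiver's fresh local lookup.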
+  defp broadcast_group_to_remote_nodes(state, cluster, key, message) do
+    {target_nodes, _cache} = broadcast_targets(state, cluster, %{})
+
+    Enum.each(target_nodes, fn target_node ->
+      send_remote_shard_message(state, target_node, {:group_dispatch, cluster, key, message})
+    end)
+
+    state
+  end
+
+  defp visible_group_remote_nodes(state, cluster, key) do
+    %{name: name, shard_index: shard} = state
+    local = node()
+
+    remote_nodes =
+      case Data.registry_lookup(name, shard, cluster, key) do
+        {_pid, _meta, _time, ^local} -> MapSet.new()
+        {_pid, _meta, _time, remote_node} -> MapSet.new([remote_node])
+        nil -> MapSet.new()
+      end
+
+    Data.pg_members_with_node(name, shard, cluster, key)
+    |> Enum.reduce(remote_nodes, fn
+      {_pid, _meta, ^local}, acc -> acc
+      {_pid, _meta, member_node}, acc -> MapSet.put(acc, member_node)
+    end)
+    |> MapSet.to_list()
+  end
+
   defp do_local_request(pid, shard_name, request, :infinity) when is_pid(pid) do
     alias_ref = Process.alias()
     mref = Process.monitor(pid)
diff --git a/lib/group/replica/supervisor.ex b/lib/group/replica/supervisor.ex
index 56f45ec..621ca86 100644
--- a/lib/group/replica/supervisor.ex
+++ b/lib/group/replica/supervisor.ex
@@ -14,13 +14,19 @@ defmodule Group.Replica.Supervisor do
     num_shards = Keyword.fetch!(opts, :num_shards)
 
     children =
-      for i <- 0..(num_shards - 1) do
-        %{
-          id: {Group.Replica, i},
-          start:
-            {Group.Replica, :start_link, [[name: name, shard_index: i, num_shards: num_shards]]}
-        }
-      end
+      Enum.flat_map(0..(num_shards - 1), fn i ->
+        [
+          %{
+            id: {Group.Dispatcher, i},
+            start: {Group.Dispatcher, :start_link, [[name: name, shard_index: i]]}
+          },
+          %{
+            id: {Group.Replica, i},
+            start:
+              {Group.Replica, :start_link, [[name: name, shard_index: i, num_shards: num_shards]]}
+          }
+        ]
+      end)
 
     Supervisor.init(children, strategy: :one_for_one)
   end
diff --git a/priv/bench/README.md b/priv/bench/README.md
index a51e3a8..c30fbed 100644
--- a/priv/bench/README.md
+++ b/priv/bench/README.md
@@ -27,6 +27,13 @@ Uses 3 separate BEAM VMs (coordinator + 2 replicas) as OS processes:
 ./run_distributed.sh
 ```
 
+Focused single-shard pubsub fan-out benchmark:
+
+```bash
+ERL_AFLAGS='+zdbbl 49152' ./run_distributed.sh --shards 1 \
+  --coordinator-expr 'GroupBench.Distributed.run_pubsub_single_shard_only()'
+```
+
 The script compiles once, starts both replicas in the background, then
 launches the coordinator. Replicas are killed automatically on exit.
@@ -136,6 +143,24 @@
 convergence on replica2 via the `replicate_leave` path. All members hash to
 the same shard (single key), making this the worst case for shard contention
 during bulk cleanup.
 
+### 11. PubSub dispatch/broadcast single-shard fan-out
+
+Measures theoretical max throughput for one hot key on one shard under
+multi-caller publisher load. Replica1 runs concurrent publishers calling
+`Group.broadcast/4` or `Group.dispatch/4`; replica2 owns all subscribed
+members for that key.
+
+Subscriber processes count deliveries locally in process state. The coordinator
+only polls aggregate counts after publishing, so results are not bottlenecked
+by per-message test acknowledgements.
+
+Reports:
+
+- logical message enqueue throughput on replica1
+- end-to-end logical messages/sec after all subscriber deliveries are observed
+- raw fan-out deliveries/sec on replica2
+- per-member delivery count range and remaining subscriber mailbox pressure
+
 ## Architecture
 
 ```
diff --git a/priv/bench/lib/group_bench.ex b/priv/bench/lib/group_bench.ex
index b819017..df87027 100644
--- a/priv/bench/lib/group_bench.ex
+++ b/priv/bench/lib/group_bench.ex
@@ -18,12 +18,16 @@ defmodule GroupBench do
       ["distributed"] ->
         GroupBench.Distributed.run()
 
+      ["pubsub_single_shard"] ->
+        GroupBench.Distributed.run_pubsub_single_shard_only()
+
       _ ->
         IO.puts("""
-        Usage: GroupBench.main(["local" | "distributed"])
+        Usage: GroupBench.main(["local" | "distributed" | "pubsub_single_shard"])
 
-          local        — Run single-node benchmarks
-          distributed  — Coordinator: connects to replicas, drives benchmarks
+          local               — Run single-node benchmarks
+          distributed         — Coordinator: connects to replicas, drives benchmarks
+          pubsub_single_shard — Focused dispatch/broadcast fan-out benchmark
         """)
     end
   end
diff --git a/priv/bench/lib/group_bench/distributed.ex b/priv/bench/lib/group_bench/distributed.ex
index 68369d2..3ed3b88 100644
--- a/priv/bench/lib/group_bench/distributed.ex
+++ b/priv/bench/lib/group_bench/distributed.ex
@@ -33,6 +33,7 @@ defmodule GroupBench.Distributed do
     bench_many_clusters(@replicas)
     bench_busy_app(@replicas)
     bench_local_requests_under_replicated_pg_pressure(@replicas)
+    bench_pubsub_single_shard(@replicas)
 
     IO.puts("\n  Done.\n")
   end
@@ -52,6 +53,20 @@ defmodule GroupBench.Distributed do
     IO.puts("\n  Done.\n")
   end
 
+  def run_pubsub_single_shard_only(_opts \\ []) do
+    Process.put(:bench_shards, 1)
+
+    header("Distributed PubSub Single-Shard Benchmark")
+    IO.puts("  coordinator: #{node()}")
+    IO.puts("  shards: 1")
+    IO.puts("  schedulers: #{System.schedulers_online()}")
+
+    connect_replicas()
+    bench_pubsub_single_shard(@replicas)
+
+    IO.puts("\n  Done.\n")
+  end
+
   # ── Connection ──────────────────────────────────────────────────────────
 
   defp connect_replicas do
@@ -872,6 +887,157 @@ defmodule GroupBench.Distributed do
     end
   end
 
+  # ── 11. PubSub single-shard max fan-out ─────────────────────────────
+
+  defp bench_pubsub_single_shard([r1, r2] = replicas) do
+    header("11. PubSub Dispatch/Broadcast Single-Shard Fan-out")
+
+    member_counts = [1, 10, 100, 1_000]
+    publisher_counts = [1, 4, 16, 64]
+
+    IO.puts("  setup: 1 shard, one hot key, publishers on replica1, members on replica2")
+    IO.puts("  members: #{inspect(member_counts)}")
+    IO.puts("  publishers: #{inspect(publisher_counts)}")
+    IO.puts("  note: subscriber delivery counts stay local on replica2")
+
+    for member_count <- member_counts do
+      subheader("#{format_number(member_count)} remote members")
+
+      for kind <- [:broadcast, :dispatch],
+          publisher_count <- publisher_counts do
+        run_pubsub_single_shard_case(replicas, r1, r2, kind, member_count, publisher_count)
+      end
+    end
+  end
+
+  defp run_pubsub_single_shard_case(
+         replicas,
+         r1,
+         r2,
+         kind,
+         member_count,
+         publisher_count
+       ) do
+    total_messages = pubsub_message_count(member_count)
+    expected_deliveries = total_messages * member_count
+    key = "pubsub/hot/#{kind}/m#{member_count}/p#{publisher_count}"
+
+    start_group_on(r1, shards: 1)
+    start_group_on(r2, shards: 1)
+    wait_for_peer_discovery(replicas)
+
+    member_pids =
+      :erpc.call(
+        r2,
+        GroupBench.Replica,
+        :start_counting_members,
+        [@name, key, member_count],
+        120_000
+      )
+
+    poll_until(
+      fn ->
+        length(:erpc.call(r1, Group, :members, [@name, key])) == member_count
+      end,
+      60_000
+    )
+
+    started_at = System.monotonic_time(:microsecond)
+
+    publish_result =
+      :erpc.call(
+        r1,
+        GroupBench.Replica,
+        :pubsub_publish_load,
+        [@name, kind, key, total_messages, publisher_count],
+        120_000
+      )
+
+    snapshot =
+      poll_pubsub_deliveries(r2, member_pids, expected_deliveries, 120_000)
+
+    total_wall_us = System.monotonic_time(:microsecond) - started_at
+
+    report_pubsub_single_shard_result(
+      kind,
+      member_count,
+      publisher_count,
+      total_messages,
+      expected_deliveries,
+      publish_result.publish_wall_us,
+      total_wall_us,
+      snapshot
+    )
+
+    :erpc.call(r2, GroupBench.Replica, :kill_processes, [member_pids], 60_000)
+    stop_groups(replicas)
+  end
+
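+  # Scale message count down as member count grows so the total delivery
+  # volume (total_messages * member_count) stays bounded per case.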
+  defp pubsub_message_count(member_count) do
+    cond do
+      member_count <= 1 -> 50_000
+      member_count <= 10 -> 20_000
+      member_count <= 100 -> 5_000
+      true -> 1_000
+    end
+  end
+
+  defp poll_pubsub_deliveries(node, member_pids, expected_deliveries, timeout_ms) do
+    deadline = System.monotonic_time(:millisecond) + timeout_ms
+    do_poll_pubsub_deliveries(node, member_pids, expected_deliveries, deadline, nil)
+  end
+
+  defp do_poll_pubsub_deliveries(node, member_pids, expected_deliveries, deadline, last_snapshot) do
+    snapshot =
+      :erpc.call(node, GroupBench.Replica, :counting_member_snapshot, [member_pids], 120_000)
+
+    cond do
+      snapshot.total_deliveries >= expected_deliveries ->
+        snapshot
+
+      System.monotonic_time(:millisecond) > deadline ->
+        raise """
+        timed out waiting for pubsub deliveries:
+          expected=#{expected_deliveries}
+          last=#{inspect(snapshot)}
+          previous=#{inspect(last_snapshot)}
+        """
+
+      true ->
+        Process.sleep(25)
+        do_poll_pubsub_deliveries(node, member_pids, expected_deliveries, deadline, snapshot)
+    end
+  end
+
+  defp report_pubsub_single_shard_result(
+         kind,
+         member_count,
+         publisher_count,
+         total_messages,
+         expected_deliveries,
+         publish_wall_us,
+         total_wall_us,
+         snapshot
+       ) do
+    publish_ops_sec = round(total_messages * 1_000_000 / max(publish_wall_us, 1))
+    end_to_end_msg_sec = round(total_messages * 1_000_000 / max(total_wall_us, 1))
+    delivery_sec = round(snapshot.total_deliveries * 1_000_000 / max(total_wall_us, 1))
+
+    IO.puts(
+      "    #{kind} publishers=#{publisher_count}: " <>
+        "members=#{format_number(member_count)}, " <>
+        "messages=#{format_number(total_messages)}, " <>
+        "deliveries=#{format_number(snapshot.total_deliveries)}/#{format_number(expected_deliveries)}, " <>
+        "enqueue=#{format_number(publish_ops_sec)} msg/s, " <>
+        "e2e=#{format_number(end_to_end_msg_sec)} msg/s, " <>
+        "fanout=#{format_number(delivery_sec)} deliveries/s, " <>
+        "wall=#{format_number(div(total_wall_us, 1000))} ms, " <>
+        "publish=#{format_number(div(publish_wall_us, 1000))} ms, " <>
+        "per_member=#{snapshot.min_count}..#{snapshot.max_count}, " <>
+        "member_queue=max #{snapshot.max_member_queue_len}, total #{snapshot.total_member_queue_len}"
+    )
+  end
+
   defp bench_local_load_sweep(
          node,
          replica_fun,
diff --git a/priv/bench/lib/group_bench/replica.ex b/priv/bench/lib/group_bench/replica.ex
index 22df3c0..e84b391 100644
--- a/priv/bench/lib/group_bench/replica.ex
+++ b/priv/bench/lib/group_bench/replica.ex
@@ -701,4 +701,136 @@ defmodule GroupBench.Replica do
 
     final_user_pids ++ room_pids
   end
+
+  @doc """
+  Starts `count` local members for one hot key. Each member counts deliveries
+  locally in process state so the benchmark does not add per-message coordinator
+  ack fan-in.
+  """
+  def start_counting_members(name, key, count, opts \\ []) do
+    parent = self()
+
+    pids =
+      Enum.map(1..count, fn _ ->
+        spawn(fn ->
+          :ok = Group.join(name, key, %{}, opts)
+          send(parent, {:counting_member_ready, self()})
+          counting_member_loop(0)
+        end)
+      end)
+
+    Enum.each(pids, fn pid ->
+      receive do
+        {:counting_member_ready, ^pid} -> :ok
+      after
+        30_000 -> raise "Timed out waiting for counting member"
+      end
+    end)
+
+    pids
+  end
+
+  @doc """
+  Starts `publisher_count` local publishers that concurrently call
+  `Group.dispatch/4` or `Group.broadcast/4` for one hot key until
+  `total_messages` logical publishes have been enqueued.
+  """
+  def pubsub_publish_load(name, kind, key, total_messages, publisher_count, opts \\ [])
+      when kind in [:broadcast, :dispatch] and total_messages > 0 and publisher_count > 0 do
+    parent = self()
+    ranges = publish_ranges(total_messages, publisher_count)
+    started_at = System.monotonic_time(:microsecond)
+
+    publishers =
+      Enum.map(ranges, fn range ->
+        spawn(fn ->
+          Enum.each(range, fn seq ->
+            :ok = apply(Group, kind, [name, key, {:bench_pubsub_delivery, kind, seq}, opts])
+          end)
+
+          send(parent, {:pubsub_publisher_done, self()})
+        end)
+      end)
+
+    Enum.each(publishers, fn pid ->
+      receive do
+        {:pubsub_publisher_done, ^pid} -> :ok
+      after
+        120_000 -> raise "Timed out waiting for pubsub publisher"
+      end
+    end)
+
+    %{publish_wall_us: System.monotonic_time(:microsecond) - started_at}
+  end
+
+  @doc """
+  Returns aggregate local delivery counts and subscriber mailbox pressure.
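+
+  For example, a fully drained run with 100 members and 1_000 deliveries each
+  returns a map shaped like this (values are illustrative):
+
+      %{
+        total_deliveries: 100_000,
+        min_count: 1_000,
+        max_count: 1_000,
+        max_member_queue_len: 0,
+        total_member_queue_len: 0
+      }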
+  """
+  def counting_member_snapshot(pids) do
+    ref = make_ref()
+
+    Enum.each(pids, fn pid ->
+      send(pid, {:bench_pubsub_get_count, self(), ref})
+    end)
+
+    counts =
+      Enum.map(pids, fn pid ->
+        receive do
+          {:bench_pubsub_count, ^ref, ^pid, count} -> count
+        after
+          120_000 -> raise "Timed out waiting for counting member snapshot"
+        end
+      end)
+
+    queue_lens =
+      Enum.map(pids, fn pid ->
+        case Process.info(pid, :message_queue_len) do
+          {:message_queue_len, len} -> len
+          nil -> 0
+        end
+      end)
+
+    %{
+      total_deliveries: Enum.sum(counts),
+      min_count: Enum.min(counts, fn -> 0 end),
+      max_count: Enum.max(counts, fn -> 0 end),
+      max_member_queue_len: Enum.max(queue_lens, fn -> 0 end),
+      total_member_queue_len: Enum.sum(queue_lens)
+    }
+  end
+
+  defp counting_member_loop(count) do
+    receive do
+      {:bench_pubsub_delivery, _kind, _seq} ->
+        counting_member_loop(count + 1)
+
+      {:bench_pubsub_get_count, caller, ref} ->
+        send(caller, {:bench_pubsub_count, ref, self(), count})
+        counting_member_loop(count)
+
+      _ ->
+        counting_member_loop(count)
+    end
+  end
+
+  defp publish_ranges(total_messages, publisher_count) do
+    per_publisher = div(total_messages, publisher_count)
+    extra = rem(total_messages, publisher_count)
+
+    {ranges, _next_seq} =
+      Enum.map_reduce(1..publisher_count, 1, fn publisher_index, next_seq ->
+        count = per_publisher + if(publisher_index <= extra, do: 1, else: 0)
+
+        cond do
+          count <= 0 ->
+            {(next_seq - 1)..(next_seq - 2)//1, next_seq}
+
+          true ->
+            range = next_seq..(next_seq + count - 1)
+            {range, next_seq + count}
+        end
+      end)
+
+    Enum.reject(ranges, fn range -> Enum.empty?(range) end)
+  end
 end
diff --git a/test/distributed_test.exs b/test/distributed_test.exs
index df34035..9e3169c 100644
--- a/test/distributed_test.exs
+++ b/test/distributed_test.exs
@@ -41,6 +41,14 @@ defmodule Group.DistributedTest do
     TestCluster.rpc!(node, :sys, :get_state, [Group.PeerReconnect.reconnect_name(name)])
   end
 
+  defp spawn_join_message_forwarder(node, name, key, meta, target_pid, opts \\ []) do
+    TestCluster.spawn_join_message_forwarder(node, name, key, meta, target_pid, opts)
+  end
+
+  defp spawn_join_members_forwarder(node, name, key, meta, target_pid, opts \\ []) do
+    TestCluster.spawn_join_members_forwarder(node, name, key, meta, target_pid, opts)
+  end
+
   describe "registration replication" do
     test "register replicates to other nodes" do
       peers = TestCluster.start_peers(2)
@@ -107,6 +115,175 @@ defmodule Group.DistributedTest do
     end
   end
 
+  describe "dispatch and broadcast" do
+    test "broadcast reaches remote local member before join replication reaches sender" do
+      peers = TestCluster.start_peers(2)
+      on_exit(fn -> TestCluster.stop_peers(peers) end)
+
+      [{_, node_a}, {_, node_b}] = peers
+      name = :"dist_broadcast_#{System.unique_integer([:positive])}"
+      num_shards = 2
+      opts = [name: name, shards: num_shards]
+      key = "broadcast/race/#{System.unique_integer([:positive])}"
+      shard = :erlang.phash2({nil, key}, num_shards)
+      shard_name = shard_name(name, shard)
+
+      start_group_on_peers(peers, opts)
+
+      TestCluster.assert_eventually(fn ->
+        TestCluster.rpc!(node_a, Group, :nodes, [name]) == [node_b]
+      end)
+
+      :ok = TestCluster.rpc!(node_a, :sys, :suspend, [shard_name])
+
+      try do
+        member =
+          spawn_join_message_forwarder(node_b, name, key, %{role: :listener}, self())
+
+        assert_receive {:join_forwarder_ready, ^member}, 5_000
+        assert TestCluster.rpc!(node_a, Group, :members, [name, key]) == []
+
+        :ok = TestCluster.rpc!(node_a, Group, :broadcast, [name, key, {:broadcast_race, key}])
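+        # node_a's owning shard is suspended, so the broadcast request is
+        # queued behind it and cannot reach node_b yet.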
+        refute_receive {:group_message, ^member, {:broadcast_race, ^key}}, 100
+        TestCluster.rpc!(node_a, :sys, :resume, [shard_name])
+        assert_receive {:group_message, ^member, {:broadcast_race, ^key}}, 5_000
+      after
+        TestCluster.rpc!(node_a, :sys, :resume, [shard_name])
+      end
+    end
+
+    test "dispatch remote lookup catches member that raced sender replication" do
+      peers = TestCluster.start_peers(2)
+      on_exit(fn -> TestCluster.stop_peers(peers) end)
+
+      [{_, node_a}, {_, node_b}] = peers
+      name = :"dist_dispatch_lookup_#{System.unique_integer([:positive])}"
+      num_shards = 2
+      opts = [name: name, shards: num_shards]
+      key = "dispatch/race/#{System.unique_integer([:positive])}"
+      shard = :erlang.phash2({nil, key}, num_shards)
+      shard_name = shard_name(name, shard)
+
+      start_group_on_peers(peers, opts)
+
+      first =
+        spawn_join_message_forwarder(node_b, name, key, %{order: 1}, self())
+
+      assert_receive {:join_forwarder_ready, ^first}, 5_000
+
+      TestCluster.assert_eventually(fn ->
+        TestCluster.rpc!(node_a, Group, :members, [name, key]) == [{first, %{order: 1}}]
+      end)
+
+      :ok = TestCluster.rpc!(node_a, :sys, :suspend, [shard_name])
+
+      try do
+        second =
+          spawn_join_message_forwarder(node_b, name, key, %{order: 2}, self())
+
+        assert_receive {:join_forwarder_ready, ^second}, 5_000
+        assert TestCluster.rpc!(node_a, Group, :members, [name, key]) == [{first, %{order: 1}}]
+
+        :ok = TestCluster.rpc!(node_a, Group, :dispatch, [name, key, {:dispatch_race, key}])
+        refute_receive {:group_message, ^first, {:dispatch_race, ^key}}, 100
+        refute_receive {:group_message, ^second, {:dispatch_race, ^key}}, 100
+        TestCluster.rpc!(node_a, :sys, :resume, [shard_name])
+        assert_receive {:group_message, ^first, {:dispatch_race, ^key}}, 5_000
+        assert_receive {:group_message, ^second, {:dispatch_race, ^key}}, 5_000
+      after
+        TestCluster.rpc!(node_a, :sys, :resume, [shard_name])
+      end
+    end
+
+    test "broadcast delivery waits for receiver replica shard causal barrier" do
+      peers = TestCluster.start_peers(2)
+      on_exit(fn -> TestCluster.stop_peers(peers) end)
+
+      [{_, node_a}, {_, node_b}] = peers
+      name = :"dist_broadcast_dispatcher_#{System.unique_integer([:positive])}"
+      num_shards = 2
+      opts = [name: name, shards: num_shards]
+      key = "broadcast/dispatcher/#{System.unique_integer([:positive])}"
+      shard = :erlang.phash2({nil, key}, num_shards)
+      receiver_shard_name = shard_name(name, shard)
+
+      start_group_on_peers(peers, opts)
+
+      TestCluster.assert_eventually(fn ->
+        TestCluster.rpc!(node_a, Group, :nodes, [name]) == [node_b]
+      end)
+
+      member =
+        spawn_join_message_forwarder(node_b, name, key, %{role: :listener}, self())
+
+      assert_receive {:join_forwarder_ready, ^member}, 5_000
+
+      :ok = TestCluster.rpc!(node_b, :sys, :suspend, [receiver_shard_name])
+
+      try do
+        :ok =
+          TestCluster.rpc!(node_a, Group, :broadcast, [name, key, {:broadcast_dispatcher, key}])
+
+        refute_receive {:group_message, ^member, {:broadcast_dispatcher, ^key}}, 100
+        TestCluster.rpc!(node_b, :sys, :resume, [receiver_shard_name])
+        assert_receive {:group_message, ^member, {:broadcast_dispatcher, ^key}}, 5_000
+      after
+        TestCluster.rpc!(node_b, :sys, :resume, [receiver_shard_name])
+      end
+    end
+
+    test "broadcast handler sees source membership after source join returned" do
+      peers = TestCluster.start_peers(2)
+      on_exit(fn -> TestCluster.stop_peers(peers) end)
+
+      [{_, node_a}, {_, node_b}] = peers
+      name = :"dist_broadcast_causal_#{System.unique_integer([:positive])}"
+      num_shards = 2
+      opts = [name: name, shards: num_shards]
+      key = "broadcast/causal/#{System.unique_integer([:positive])}"
+      shard = :erlang.phash2({nil, key}, num_shards)
+      receiver_shard_name = shard_name(name, shard)
+
+      start_group_on_peers(peers, opts)
+
+      TestCluster.assert_eventually(fn ->
+        TestCluster.rpc!(node_a, Group, :nodes, [name]) == [node_b]
+      end)
+
+      receiver =
+        spawn_join_members_forwarder(node_b, name, key, %{role: :receiver}, self())
+
+      assert_receive {:join_forwarder_ready, ^receiver}, 5_000
+
+      TestCluster.assert_eventually(fn ->
+        TestCluster.rpc!(node_a, Group, :members, [name, key]) == [
+          {receiver, %{role: :receiver}}
+        ]
+      end)
+
+      :ok = TestCluster.rpc!(node_b, :sys, :suspend, [receiver_shard_name])
+
+      try do
+        source_member = TestCluster.spawn_join(node_a, name, key, %{role: :source})
+
+        assert TestCluster.rpc!(node_b, Group, :members, [name, key]) == [
+                 {receiver, %{role: :receiver}}
+               ]
+
+        :ok = TestCluster.rpc!(node_a, Group, :broadcast, [name, key, {:broadcast_causal, key}])
+        refute_receive {:group_message_members, ^receiver, {:broadcast_causal, ^key}, _}, 100
+        TestCluster.rpc!(node_b, :sys, :resume, [receiver_shard_name])
+
+        assert_receive {:group_message_members, ^receiver, {:broadcast_causal, ^key}, members},
+                       5_000
+
+        assert {source_member, %{role: :source}} in members
+      after
+        TestCluster.rpc!(node_b, :sys, :resume, [receiver_shard_name])
+      end
+    end
+  end
+
   describe "named cluster ttl" do
     test "expired inactive ttl lease disconnects a node and stops further replication" do
       peers = TestCluster.start_peers(2)
diff --git a/test/group_test.exs b/test/group_test.exs
index 854cabc..33c7b70 100644
--- a/test/group_test.exs
+++ b/test/group_test.exs
@@ -881,6 +881,20 @@ defmodule GroupTest do
       refute_receive {:test_message, :from_default}, 200
     end
 
+    test "broadcast works with cluster: option", %{name: name} do
+      cluster = "broadcast_all_cluster"
+      key = "broadcast_all/#{System.unique_integer([:positive])}"
+
+      :ok = Group.connect(name, cluster)
+      :ok = Group.join(name, key, %{}, cluster: cluster)
+
+      :ok = Group.broadcast(name, key, {:broadcast_message, :from_cluster}, cluster: cluster)
+      assert_receive {:broadcast_message, :from_cluster}, 1000
+
+      :ok = Group.broadcast(name, key, {:broadcast_message, :from_default})
+      refute_receive {:broadcast_message, :from_default}, 200
+    end
+
     test "dispatch_local only sends to local members", %{name: name} do
       key = "dispatch_local/#{System.unique_integer([:positive])}"
 
@@ -1303,7 +1317,7 @@ defmodule GroupTest do
     send(shard, replicated_pg_join(nil, remote_key1, remote_pid, %{remote: 1}, :join))
     send(shard, replicated_pg_join(nil, remote_key2, remote_pid, %{remote: 2}, :join))
 
-    send(shard, {:group_dispatch, [self()], {:quota_marker, shard}})
+    send(shard, {:group_flush_barrier, [self()], {:quota_marker, shard}})
 
     caller1 =
       spawn_requester(
@@ -2361,11 +2375,11 @@ defmodule GroupTest do
   end
 
   defp flush_replicated_pg_barrier(shard) do
-    send(shard, {:group_dispatch, [self()], {:replicated_pg_buffer_flushed, shard}})
+    send(shard, {:group_flush_barrier, [self()], {:replicated_pg_buffer_flushed, shard}})
   end
 
   defp flush_replicated_registry_barrier(shard) do
-    send(shard, {:group_dispatch, [self()], {:replicated_registry_buffer_flushed, shard}})
+    send(shard, {:group_flush_barrier, [self()], {:replicated_registry_buffer_flushed, shard}})
   end
 
   defp force_cluster_lease_sweep(name) do
diff --git a/test/support/test_cluster.ex b/test/support/test_cluster.ex
index 0a42bac..76c9479 100644
--- a/test/support/test_cluster.ex
+++ b/test/support/test_cluster.ex
@@ -124,6 +124,60 @@ defmodule Group.TestCluster do
     end)
   end
 
+  def spawn_join_message_forwarder(node, name, key, meta, target_pid, opts \\ []) do
+    :erpc.call(node, __MODULE__, :do_spawn_join_message_forwarder, [
+      name,
+      key,
+      meta,
+      target_pid,
+      opts
+    ])
+  end
+
+  @doc false
+  def do_spawn_join_message_forwarder(name, key, meta, target_pid, opts) do
+    spawn(fn ->
+      :ok = Group.join(name, key, meta, opts)
+      send(target_pid, {:join_forwarder_ready, self()})
+      forward_group_messages(target_pid)
+    end)
+  end
+
+  def spawn_join_members_forwarder(node, name, key, meta, target_pid, opts \\ []) do
+    :erpc.call(node, __MODULE__, :do_spawn_join_members_forwarder, [
+      name,
+      key,
+      meta,
+      target_pid,
+      opts
+    ])
+  end
+
+  @doc false
+  def do_spawn_join_members_forwarder(name, key, meta, target_pid, opts) do
+    spawn(fn ->
+      :ok = Group.join(name, key, meta, opts)
+      send(target_pid, {:join_forwarder_ready, self()})
+      forward_group_messages_with_members(name, key, target_pid)
+    end)
+  end
+
+  defp forward_group_messages_with_members(name, key, target_pid) do
+    receive do
+      message ->
+        send(target_pid, {:group_message_members, self(), message, Group.members(name, key)})
+        forward_group_messages_with_members(name, key, target_pid)
+    end
+  end
+
+  defp forward_group_messages(target_pid) do
+    receive do
+      message ->
+        send(target_pid, {:group_message, self(), message})
+        forward_group_messages(target_pid)
+    end
+  end
+
   @doc "Spawn a process on a remote node that registers, joins, and sleeps forever. Waits for both operations to complete before returning."
   def spawn_register_and_join(node, name, reg_key, reg_meta, join_key, join_meta) do
 
@@ -381,7 +435,7 @@ defmodule Group.TestCluster do
     for shard <- 0..(num_shards - 1) do
       shard_name = :"#{name}_replica_#{shard}"
       ref = make_ref()
-      send(shard_name, {:group_dispatch, [self()], {:test_cluster_flush_ack, ref}})
+      send(shard_name, {:group_flush_barrier, [self()], {:test_cluster_flush_ack, ref}})
 
       receive do
        {:test_cluster_flush_ack, ^ref} -> :ok