5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,8 @@
## 0.2.0 (2026-04-17)
- remove deprecate message handling
- Add `Group.broadcast/4` for fan-out to every peer node in a cluster
- Change cross-node `dispatch/4` delivery to have the receiver perform a fresh local lookup for the key
- Route remote dispatch/broadcast through source and receiver shard causal barriers before dispatcher fan-out
- Remove deprecated message handling

## 0.1.8 (2026-04-17)
- Use `send_nosuspend` for remote shard sends and add bounded reconnect retries after busy-link disconnects
6 changes: 5 additions & 1 deletion CLAUDE.md
@@ -165,7 +165,11 @@ Keys ending with `"/"` are rejected by `validate_key!/1` in register/unregister/

### Dispatch

`dispatch/4` sends to all members (registry + PG). Groups remote PG members by node, sends one `{:group_dispatch, pids, message}` per remote node (O(nodes) not O(members)). Target shard chosen by `phash2(self(), num_shards)` for per-sender ordering.
`dispatch/4` sends to members in the local replicated view (registry + PG) for the selected cluster/key. It routes through the source owning shard, which flushes pending replication before sending one `{:group_dispatch, cluster, key, message}` to each target node's owning shard. The receiver shard flushes pending replication before forwarding to its per-shard dispatcher for fresh local lookup and delivery.

`broadcast/4` uses the same causal shard path, but targets every peer node in the selected cluster rather than only nodes visible in the local member view. This is the Phoenix.PubSub-style fan-out path.

Delivery is asynchronous: both APIs return after enqueueing work on the local owning shard, not after recipients process the message. Per-{cluster, key} causality is favored over the old per-sender cross-key ordering because the source barrier must be the selected cluster/key's owning shard. The default cluster and named clusters are independent routing and causality scopes.

`dispatch_local/4` skips cross-node messaging.
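
A quick sketch of choosing between the three paths (the `MyApp.Group` name, key, and cluster are illustrative):

```elixir
# Only contacts remote nodes already visible in this node's member view for the key.
Group.dispatch(MyApp.Group, "room/1", {:msg, "hi"})

# Contacts every peer node in the selected cluster, even if no members are visible yet.
Group.broadcast(MyApp.Group, "room/1", {:msg, "hi"}, cluster: "blue")

# Local members only; no cross-node messaging.
Group.dispatch_local(MyApp.Group, "room/1", {:msg, "hi"})
```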

26 changes: 23 additions & 3 deletions README.md
@@ -111,15 +111,35 @@ Single operations (register, join) produce one event per tuple. Bulk operations

### Dispatch

Send a message to all members of a key:
Send a message to members of a key:

```elixir
:ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"})
:ok = Group.dispatch(:my_app, "chat/room/42", {:new_message, "hello"}, cluster: "servers_123")

:ok = Group.broadcast(:my_app, "chat/room/42", {:new_message, "hello"})
:ok = Group.broadcast(:my_app, "chat/room/42", {:new_message, "hello"}, cluster: "servers_123")
```

Compared to `Phoenix.PubSub`, `dispatch` only broadcasts to nodes with at least
one subscription and can also be tailored to a given cluster.
`dispatch` uses this node's replicated member view for the selected cluster/key
to decide which peer nodes to contact, then each receiver performs a fresh local
lookup before delivery.
`broadcast` behaves more like `Phoenix.PubSub.broadcast/3`: it sends a lookup
request to every peer node in the selected cluster, even when this node cannot
currently see members there.

Remote dispatch and broadcast route through the source and receiver owning
shards for the selected cluster/key as causal barriers before fan-out. The
source shard flushes pending replication before sending the app message; the
receiver shard flushes pending replication before forwarding to a per-shard
dispatcher for local lookup and delivery.

Those barriers are scoped to the same cluster and key. The default cluster and
named clusters are independent: a join in `cluster: "servers_123"` only orders
later dispatches or broadcasts using `cluster: "servers_123"` for the same key.

Delivery is asynchronous: `dispatch` and `broadcast` return after enqueueing
work on the local owning shard, not after recipients process the message.
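
Since a successful return only means the message was accepted locally, any
delivery confirmation has to be application-level. A minimal sketch, assuming
receivers reply to a `{:ping, from}` message (the `:ping`/`:pong` shapes are
not part of Group):

```elixir
:ok = Group.broadcast(:my_app, "chat/room/42", {:ping, self()})

# Each joined process that handles {:ping, from} replies with send(from, {:pong, node()}).
receive do
  {:pong, from_node} -> IO.puts("acknowledged from #{from_node}")
after
  1_000 -> IO.puts("no acknowledgement within 1s")
end
```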

### Named Clusters

222 changes: 184 additions & 38 deletions lib/group.ex
@@ -19,6 +19,106 @@ defmodule Group do
- When partitions heal, conflicts are resolved. The losing process is killed
with `{:group_registry_conflict, key, meta}`

## Dispatch and Broadcast Ordering

`dispatch/4` and `broadcast/4` are asynchronous message-delivery APIs.
They return after Group accepts the message for delivery, not after recipients
handle it.

### What Group Guarantees

Same cluster-and-key membership causality is preserved. The default cluster
(`nil`) and each named cluster are separate routing scopes. If a process
successfully `join`s or `register`s key `K` in cluster `C`, and then later
sends `dispatch` or `broadcast` for the same cluster `C` and key `K`, any
remote process that receives that message will see the earlier membership
update in `members/3` or `lookup/3`.

:ok = Group.join(MyGroup, "room/123", %{role: :player})
:ok = Group.broadcast(MyGroup, "room/123", {:hello, self()})

A remote receiver that handles `{:hello, pid}` can call:

Group.members(MyGroup, "room/123")

and expect the sender's joined process to be visible.

Receiver-local joins are included at delivery time. On each contacted node,
Group performs a fresh local lookup immediately before delivery, so a process
that joined locally before the dispatcher fan-out runs can receive the message
even if the sender had not learned about that member yet.

`broadcast/4` contacts every known peer node in the selected cluster. Use it
when you prefer an extra remote lookup over missing a recent subscriber.

`dispatch/4` contacts only remote nodes that are already visible in this
node's replicated member view for the selected cluster and key. Use it when
you want the optimized path and can tolerate eventual membership visibility.

Per-sender ordering is preserved for messages to the same cluster and key.
Because messages for the same cluster and key route through the same source
shard and same receiver shard, a single sender's consecutive `broadcast` or
`dispatch` calls for cluster `C` and key `K` are delivered to each contacted
node in send order.
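
For example, a single sender's consecutive calls for the same cluster and key
(sketch, reusing the `MyGroup` name from above):

    :ok = Group.broadcast(MyGroup, "room/123", {:turn, 1})
    :ok = Group.broadcast(MyGroup, "room/123", {:turn, 2})

Each contacted node handles `{:turn, 1}` before `{:turn, 2}` for the members
it resolves locally.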

### What Group Does Not Guarantee

There is no synchronous delivery guarantee. A successful `dispatch/4` or
`broadcast/4` return does not mean any recipient has received or handled the
message.

There is no global membership barrier. Same cluster-and-key causality does
not extend across unrelated keys or clusters. A message for `"room/456"` does
not act as a barrier for membership changes on `"room/123"`:

:ok = Group.join(MyGroup, "room/123", %{})
:ok = Group.broadcast(MyGroup, "room/456", :hello)

A receiver of the `"room/456"` message should not assume that
`Group.members(MyGroup, "room/123")` has already caught up.

Cluster scope matters the same way: a join in `cluster: "blue"` does not act
as a barrier for a dispatch or broadcast in the default cluster, or in any
other named cluster.

`dispatch/4` does not guarantee all-node delivery. If the sender has not yet
learned that a remote node has members for the selected cluster and key,
`dispatch/4` may not contact that node. Use `broadcast/4` when every known
peer node in the selected cluster should be contacted.

`broadcast/4` does not guarantee delivery to nodes outside the sender's
current cluster view. If cluster membership itself is still converging, a
newly connected node may be missed by a concurrent broadcast.

`monitor/3` is not an application-message subscription. Monitor-only
processes receive lifecycle events, not application messages. A process must
`join/4` the key or be registered at that exact key to receive `dispatch/4`
or `broadcast/4` messages.

`leave/3` and `unregister/3` are not unsubscribe barriers for messages that
were already selected, in flight, or already delivered to the process mailbox.
After leaving, a process may still receive an older application message. If
this matters, include an application-level epoch or session id in messages and
ignore stale messages after leaving.
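
One possible sketch of that pattern (the `Room.Member` module, the
`{:room_msg, epoch, payload}` shape, and the `handle_payload/2` helper are
illustrative, not part of Group):

    defmodule Room.Member do
      use GenServer

      def init(epoch), do: {:ok, %{epoch: epoch}}

      # Message stamped with the current epoch: handle it.
      def handle_info({:room_msg, epoch, payload}, %{epoch: epoch} = state) do
        {:noreply, handle_payload(payload, state)}
      end

      # Message stamped with an older epoch (sent before the leave): drop it.
      def handle_info({:room_msg, _stale_epoch, _payload}, state), do: {:noreply, state}

      defp handle_payload(_payload, state), do: state
    end

Senders include the current epoch in the message, e.g.
`Group.broadcast(MyGroup, "room/123", {:room_msg, epoch, payload})`.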

Monitor events are not ordered before application messages. Lifecycle events
from `monitor/3` and application messages from `dispatch/4` or `broadcast/4`
are separate delivery paths.

There is no cross-sender total ordering. If multiple processes concurrently
broadcast to the same cluster and key, recipients may observe those messages
in different interleavings.

There is no duplicate suppression across registry and group membership. If
the same process is both registered at key `K` and joined to key `K` in the
same cluster, it may receive the same application message through both
routes.

During partitions, dispatch, broadcast, lookup, and members reflect each
node's current local view. Group converges after connectivity returns, but it
does not provide linearizable reads or exactly-once message delivery across
partitions.

## Clusters

By default, all operations use the default cluster (`nil`). You can
@@ -161,9 +261,10 @@ defmodule Group do
# Monitor events in the named cluster
:ok = Group.monitor(MySup, :all, cluster: "game_servers")

# Members and dispatch also support cluster option
# Members, dispatch, and broadcast also support the `:cluster` option
Group.members(MySup, "room/123", cluster: "game_servers")
Group.dispatch(MySup, "room/123", {:msg, "hi"}, cluster: "game_servers")
Group.broadcast(MySup, "room/123", {:msg, "hi"}, cluster: "game_servers")

## Architecture Notes

@@ -396,6 +497,10 @@ defmodule Group do
Use `register/4` when you need exactly one process per key. Use `join/4`
when multiple processes should be able to share the same key.

See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
section for the same cluster-and-key causality guarantee between registration
and later app messages.

## Parameters

- `name` - The Group name
@@ -581,6 +686,10 @@ defmodule Group do
Note: Joining does NOT automatically monitor events.
Call `monitor/2` separately if you want to receive events.

See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
section for the same cluster-and-key causality guarantee between membership
and later app messages.

## Parameters

- `name` - The Group name
@@ -614,6 +723,12 @@ defmodule Group do
@doc """
Leave a group that was previously joined.

Leaving removes the process from future membership lookups once the leave is
observed, but it is not an unsubscribe barrier for messages that were already
selected, in flight, or already delivered to the process mailbox. If stale
messages matter, include an application-level epoch or session id in messages
and ignore old messages after leaving.

## Parameters

- `name` - The Group name
@@ -865,7 +980,7 @@ defmodule Group do
# ===========================================================================

@doc """
Dispatch a message to all members of a key.
Dispatch a message to all members of a key visible from this node.

Sends `message` to all processes that have joined the key via `join/3`, as well as
any DurableServer registered at that key. This is useful for application-level
@@ -887,8 +1002,20 @@

## Filtering by Metadata

`dispatch/3` sends to all members. If you need to filter by metadata (e.g., only
send to members with `%{type: :channel}`), use `members/2` directly:
`dispatch/3` sends to all members in this node's current replicated view.
Remote nodes perform a fresh local lookup before delivering the message, but
`dispatch/3` only contacts remote nodes that this node can already see for
the selected cluster and key. Use `broadcast/3` when you want to contact all
peer nodes in the selected cluster.

Delivery is asynchronous: the call returns after enqueueing work on the
local owning shard, not after recipients process the message.

See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
section for the same cluster-and-key causality guarantee.

If you need to filter by metadata (e.g., only send to members with
`%{type: :channel}`), use `members/2` directly:

for {pid, %{type: :channel}} <- Group.members(sup, key) do
send(pid, message)
@@ -909,40 +1036,54 @@
cluster = Keyword.get(opts, :cluster)
num_shards = get_config(name).num_shards
shard = Replica.shard_index_for(cluster, key, num_shards)
local = node()

# Registry entry (one or none) — always direct send, even cross-node
case Data.registry_lookup(name, shard, cluster, key) do
{pid, _meta, _time, _node} -> send(pid, message)
nil -> :ok
end
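
# Route through the cluster/key's owning shard, which flushes pending replication
# before fanning out one {:group_dispatch, cluster, key, message} per target node.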
send(Replica.shard_name(name, shard), {:group_dispatch_request, cluster, key, message})

# PG members — group remote pids by node for batched fan-out
case Data.pg_members_with_node(name, shard, cluster, key) do
[] ->
:ok

members ->
{local_pids, remote_by_node} =
Enum.reduce(members, {[], %{}}, fn {pid, _meta, node}, {locals, remotes} ->
if node == local do
{[pid | locals], remotes}
else
{locals, Map.update(remotes, node, [pid], &[pid | &1])}
end
end)

for pid <- local_pids, do: send(pid, message)

# Hash caller pid to pick a shard — same caller always hits the same
# shard, preserving per-sender message ordering across dispatches.
dispatch_shard = :erlang.phash2(self(), num_shards)
shard_name = Replica.shard_name(name, dispatch_shard)

for {target_node, pids} <- remote_by_node do
send({shard_name, target_node}, {:group_dispatch, pids, message})
end
end
:ok
end

@doc """
Broadcast a message to all members of a key on every peer node in the selected
cluster.

Like `dispatch/4`, this sends to the local registry entry and local PG
members for the key. Unlike `dispatch/4`, it does not use this node's
replicated member view to decide which remote nodes may have recipients.
It routes through the local owning shard, which flushes pending replication
before sending one request to the owning shard on every remote peer in the
cluster. Each receiver shard flushes pending replication before forwarding
to its local dispatcher for a fresh local lookup and delivery.

This is useful when missing a recent remote join is worse than sending an
extra remote shard message.

The `:cluster` option selects the cluster whose peer nodes are contacted and
is part of the causality scope. The default cluster and named clusters do not
order each other.

Delivery is asynchronous: the call returns after enqueueing work on the
local owning shard, not after recipients process the message.

See the [Dispatch and Broadcast Ordering](#module-dispatch-and-broadcast-ordering)
section for the same cluster-and-key causality guarantee.

## Options

- `:cluster` - Broadcast within a named cluster instead of the default cluster

## Returns

- `:ok` always
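
## Examples

A minimal sketch (the group name and key are illustrative):

    :ok = Group.broadcast(MyGroup, "room/123", {:announce, :maintenance})
    :ok = Group.broadcast(MyGroup, "room/123", {:announce, :maintenance}, cluster: "blue")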
"""
def broadcast(name, key, message, opts \\ [])

def broadcast(name, key, message, opts)
when is_atom(name) and is_binary(key) and is_list(opts) do
cluster = Keyword.get(opts, :cluster)
num_shards = get_config(name).num_shards
shard = Replica.shard_index_for(cluster, key, num_shards)

# The owning shard flushes pending replication, then sends one request to the
# owning shard on every peer node in the selected cluster.
send(Replica.shard_name(name, shard), {:group_broadcast_request, cluster, key, message})

:ok
end
@@ -968,15 +1109,20 @@
cluster = Keyword.get(opts, :cluster)
num_shards = get_config(name).num_shards
shard = Replica.shard_index_for(cluster, key, num_shards)

dispatch_local_members(name, shard, cluster, key, message)

:ok
end

defp dispatch_local_members(name, shard, cluster, key, message) do
local = node()

# Registry entry — only if local
case Data.registry_lookup(name, shard, cluster, key) do
{pid, _meta, _time, ^local} -> send(pid, message)
_ -> :ok
end

# PG members — local only, filtered in the match spec
for pid <- Data.pg_members_local(name, shard, cluster, key) do
send(pid, message)
end