Skip to content

fix(runtime): give event publishers unique discovery identities#11014

Open
zhongdaor-nv wants to merge 2 commits into
mainfrom
fix/event-publisher-identity
Open

fix(runtime): give event publishers unique discovery identities#11014
zhongdaor-nv wants to merge 2 commits into
mainfrom
fix/event-publisher-identity

Conversation

@zhongdaor-nv

@zhongdaor-nv zhongdaor-nv commented Jun 27, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Give every EventPublisher a fork-safe, per-incarnation publisher ID and use the same ID in both EventEnvelope and event-channel discovery.
  • Keep the existing four-part discovery key and serialized DiscoveryInstance::EventChannel shape while allowing multiple publishers for the same process/component/topic.
  • Track ZMQ publisher connections by full DiscoveryInstanceId and ignore unrelated topics.
  • Keep asynchronous discovery unregister work alive through graceful-shutdown Phase 2.
  • Add a direct ZMQ regression test covering two same-topic publishers, event delivery from both, and continued delivery after one publisher is dropped.
  • Remove the mocker E2E autouse fixture that forced NATS, so nondurable mocker coverage exercises the default ZMQ event plane.

The root cause was that each publisher bound its own ZMQ endpoint and owned its own sequence counter, but all publishers in one process registered with the process-level discovery ID. Their discovery keys therefore collided, subscribers only connected to one endpoint, and dropping any colliding publisher could remove the shared registration.

This also makes (publisher_id, sequence) valid for broker deduplication across multiple publishers. The wire schema and discovery path structure are unchanged. The public Rust DiscoverySpec::EventChannel variant now requires publisher_id.

Validation

  • cargo fmt --all -- --check
  • git diff --check
  • cargo test -p dynamo-runtime transports::event_plane:: --lib — 17 passed after rebasing onto latest main
  • cargo clippy -p dynamo-runtime --lib --tests -- -D warnings
  • cargo test -p dynamo-runtime --lib — 410 passed, 2 ignored
  • cargo check -p dynamo-llm -p dynamo-backend-common
  • New direct-ZMQ regression repeated 20 times without failure
  • maturin develop --uv
  • pytest -q -s 'tests/router/test_router_e2e_with_mockers.py::test_router_decisions[nats_core-tcp]' — 1 passed in 40.73s with the default ZMQ event plane, two workers, and dp_size=4
  • pytest -q -s 'tests/router/test_router_e2e_with_mockers.py::test_router_decisions_disagg_round_robin_prefill_dp_rank[no_bootstrap-prefill_first]' — 1 passed in 80.00s with the default ZMQ event plane
  • pytest -q -s 'tests/router/test_router_e2e_with_mockers.py::test_router_decisions[jetstream-tcp]' — 1 passed in 40.64s, confirming durable coverage still opts into NATS
  • pre-commit run --files tests/router/test_router_e2e_with_mockers.py

Summary by CodeRabbit

  • New Features

    • Event channel registrations now use a distinct publisher identity, improving how multiple publishers on the same topic are tracked.
    • Subscribers now connect only to matching ZMQ topics, reducing unrelated connections.
  • Bug Fixes

    • Fixed unregister and logging behavior so the correct instance is reported and removed.
    • Improved publisher shutdown handling so one publisher can be stopped without affecting others.

Signed-off-by: zhongdaor <zhongdaor@nvidia.com>
@zhongdaor-nv zhongdaor-nv temporarily deployed to external_collaborator June 27, 2026 04:49 — with GitHub Actions Inactive
@github-actions github-actions Bot added the fix label Jun 27, 2026
@datadog-official

datadog-official Bot commented Jun 27, 2026

Copy link
Copy Markdown

Pipelines

⚠️ Warnings

🚦 1 Pipeline job failed

Docs link check | lychee   View in Datadog   GitHub Actions

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 21dfb28 | Docs | Give us feedback!

Signed-off-by: zhongdaor <zhongdaor@nvidia.com>
@zhongdaor-nv zhongdaor-nv temporarily deployed to external_collaborator June 27, 2026 05:22 — with GitHub Actions Inactive
@zhongdaor-nv zhongdaor-nv marked this pull request as ready for review June 27, 2026 09:22
@zhongdaor-nv zhongdaor-nv requested review from a team as code owners June 27, 2026 09:22

@devin-ai-integration devin-ai-integration Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

Open in Devin Review

COUNTER_WORKER_SCRIPT = os.path.join(os.path.dirname(__file__), "counter_worker.py")


@pytest.fixture(autouse=True)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Removed NATS event plane fixture may rely on default transport being NATS for etcd backends

The removed _pin_nats_event_plane_for_mocker fixture (test_router_e2e_with_mockers.py:63-79) explicitly set DYN_EVENT_PLANE=nats for non-file store backends to work around the ZMQ slow-joiner problem where mock engines publish events before ZMQ subscriptions are established. The fixture's own comment states: 'on the (now default) ZMQ event plane the router observes zero events and the routing assertions fail.' Removing it is safe only if the runtime's default_event_transport_kind() already returns NATS for distributed backends (etcd/kubernetes). If so, the fixture was redundant. If any test parameterization uses etcd but the default event plane is still ZMQ, these tests could become flaky. Worth confirming the default transport for etcd-backed tests.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@coderabbitai

coderabbitai Bot commented Jun 27, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 06439b4c-ba56-4f17-a8a2-3b7198527730

📥 Commits

Reviewing files that changed from the base of the PR and between 333b156 and 21dfb28.

📒 Files selected for processing (6)
  • lib/runtime/src/discovery/kube.rs
  • lib/runtime/src/discovery/kv_store.rs
  • lib/runtime/src/discovery/mod.rs
  • lib/runtime/src/transports/event_plane/dynamic_subscriber.rs
  • lib/runtime/src/transports/event_plane/mod.rs
  • tests/router/test_router_e2e_with_mockers.py
💤 Files with no reviewable changes (1)
  • tests/router/test_router_e2e_with_mockers.py

Walkthrough

The PR splits discovery ownership from event-channel publisher identity, threads the new publisher id through event publisher registration and ZMQ subscriber matching, updates discovery logging, adds shutdown-safe unregistration, expands ZMQ integration coverage, and removes a router mocker environment override.

Changes

Discovery and event-plane identity flow

Layer / File(s) Summary
Discovery identity contract
lib/runtime/src/discovery/mod.rs
DiscoverySpec::EventChannel now carries publisher_id, and with_instance_id distinguishes owner ids from publisher ids when building DiscoveryInstance values.
Discovery registration ids and logs
lib/runtime/src/discovery/kube.rs, lib/runtime/src/discovery/kv_store.rs
Kube and KV-store discovery registrations now capture owner and registered instance ids separately, and unregister logging uses the instance being removed.
EventPublisher identity and shutdown
lib/runtime/src/transports/event_plane/mod.rs
EventPublisher generates a random publisher_id, includes it in NATS and ZMQ direct discovery specs, and registers a shutdown guard during async unregistration.
ZMQ subscriber matching
lib/runtime/src/transports/event_plane/dynamic_subscriber.rs
DynamicSubscriber::start_zmq tracks active endpoints by DiscoveryInstanceId, filters add/remove events by topic, and the extractor tests cover topic and transport matching.
ZMQ multi-publisher test
lib/runtime/src/transports/event_plane/mod.rs
A Tokio test covers two ZMQ publishers on the same topic, independent discovery and delivery, and unregistering one publisher while the other remains active.

Router mocker harness

Layer / File(s) Summary
Remove NATS pinning fixture
tests/router/test_router_e2e_with_mockers.py
The mocker e2e router fixture that forced DYN_EVENT_PLANE to nats for etcd-backed tests was removed.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Description check ⚠️ Warning The description is detailed, but it does not follow the required template and is missing the Overview, Details, reviewer start, and Related Issues sections. Reformat the PR description to the repo template, adding Overview, Details, Where should reviewer start?, and a completed Related Issues section.
Docstring Coverage ⚠️ Warning Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly captures the main runtime change: assigning unique discovery identities to event publishers.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

Comment @coderabbitai help to get the list of available commands.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant