Skip to content

[AAASM-1654] ✨ (op_control): Subscribe to OpControlStream + cooperative pause / fast-fail terminate#50

Merged
Chisanan232 merged 6 commits into
masterfrom
v0.0.1/AAASM-1654/feat/op_control_subscriber
May 21, 2026
Merged

[AAASM-1654] ✨ (op_control): Subscribe to OpControlStream + cooperative pause / fast-fail terminate#50
Chisanan232 merged 6 commits into
masterfrom
v0.0.1/AAASM-1654/feat/op_control_subscriber

Conversation

@Chisanan232
Copy link
Copy Markdown
Contributor

@Chisanan232 Chisanan232 commented May 21, 2026

Description

PR-E of AAASM-1422. Adds the Python SDK consumer for the gateway-side OpControlStream server-streaming RPC introduced in AAASM-1653 (agent-assembly PR-D, merged 2026-05-21 at 1b2b9283).

What this PR ships

Piece Path What
Runtime deps pyproject.toml grpcio>=1.66,<2, protobuf>=5,<7
Dev deps pyproject.toml grpcio-tools>=1.66,<2
Proto regen script scripts/gen_proto.py Reads protos from sibling agent-assembly/proto/ (overridable via $AA_PROTO_DIR); post-processes sibling-relative imports to from . import …; emits .pyi type stubs so mypy resolves message attributes
Generated stubs agent_assembly/proto/{common,policy}_pb2{,.pyi,_grpc}.py Committed so users don't need grpcio-tools
Exception agent_assembly/exceptions/OpTerminatedError New AssemblyError sibling carrying op_id
Subscriber agent_assembly/op_control.py OpControlSubscriber.connect(gateway_url, org_id=..., team_id=..., agent_id=...) opens the stream on a daemon background thread; per-op_id state machine: await_op(op_id) blocks while paused, unblocks on resume, raises OpTerminatedError on terminate; signals for unknown ops are buffered until the first await_op
Tests test/unit/test_op_control.py 7 pytest cases driving the state machine via a hand-rolled _FakeStub (no live gRPC server)

Scope boundary

PR-E ships the subscriber + state machine only. It does not:

  • Auto-wire into init_assembly / adapter check_action hooks — exposed as a standalone class users opt into. The hook into the adapter layer becomes a separate follow-up when that surface is stable.
  • Add reconnection / heartbeat logic — caller observes subscriber.stream_alive() and can re-instantiate.
  • Add a drift-check CI job for the generated stubs (regen + diff) — separate hygiene sub-task.

Path-A note

The original sub-ticket framing assumed extending an existing PyO3 gRPC pattern in aa-ffi-python. Closer inspection showed (a) no FFI gRPC client exists — the FFI is a Unix-socket IPC client to aa-runtime, (b) the RuntimeClient / GovernanceEvent Python imports are wrapped in try/except ImportError and fail silently today. Filed AAASM-1686 for the FFI binding option, then closed it as cancelled in favour of the pure-Python grpcio approach in this PR — see that ticket for the full decision trail.

Type of Change

  • ✨ New feature
  • 🐛 Bug fix
  • ♻️ Refactoring
  • 🍀 Performance improvement
  • 📝 Documentation update
  • 🔧 Configuration / CI change
  • ⬆️ Dependency upgrade (grpcio, protobuf, grpcio-tools)
  • 🚀 Release

Breaking Changes

  • No
  • Yes (describe below)

Pure additive — new module, new exception class, new deps. Existing SDK consumers untouched.

Related Issues

Testing

  • Unit tests added / updated — 7 new cases in test/unit/test_op_control.py
  • Integration tests added / updated (deferred — needs a live gateway)
  • Manual testing performed — full local gates

Local gates

Gate Result
.venv/bin/ruff check .
.venv/bin/mypy --ignore-missing-imports agent_assembly/op_control.py
.venv/bin/python -m pytest test/ 373 passed, 11 skipped (1 rerun)
.venv/bin/python -m pytest test/unit/test_op_control.py -v 7 / 7
Proto stubs import + expose new types ✅ verified via from agent_assembly.proto import policy_pb2; policy_pb2.OpControlMessage

Checklist

  • Code follows project style guidelines (ruff, mypy)
  • Self-review of the diff completed
  • Documentation updated if behaviour changed (module docstrings explain the cooperative-pause semantics)
  • All CI checks passing (pending)
  • Commits are small and follow the Gitmoji convention

Commits (6 granular)

  1. ⬆️ (deps) Add grpcio + grpcio-tools for OpControlStream consumer
  2. (agent_assembly/proto) Generate Python stubs for policy + common protos
  3. (exceptions) Add OpTerminatedError for OpControlStream consumer
  4. (op_control) Add OpControlSubscriber for gateway pause/resume/terminate
  5. (test) Add pytest cases for OpControlSubscriber
  6. 🚨 (op_control) Generate .pyi stubs + clean up ruff/mypy noise

PR-E of AAASM-1422 — the new OpControlSubscriber (lands in a follow-up
commit) needs gRPC at runtime to subscribe to the gateway-side
OpControlStream RPC introduced in AAASM-1653 (PR-D).

Runtime: grpcio>=1.66,<2 plus protobuf>=5,<7 (transitive but pinned to
the modern stable line).
Dev: grpcio-tools>=1.66,<2 for the regen script in the next commit.

Refs: AAASM-1654
…rotos

PR-E of AAASM-1422 — adds the gRPC client stubs the OpControlSubscriber
(landing in the next commit) needs to talk to the gateway's
OpControlStream RPC.

* `scripts/gen_proto.py` — regen script that reads protos from the
  sibling agent-assembly checkout (overridable via $AA_PROTO_DIR per
  the project's sibling-repo CI pattern). Post-processes the
  generated files to rewrite `import xxx_pb2` → `from . import xxx_pb2`
  so they work as a Python package.
* `agent_assembly/proto/{common,policy}_pb2{,_grpc}.py` — committed
  generated stubs. Users get gRPC out of the box without needing
  grpcio-tools installed.

A drift-check CI job that runs the script + asserts no diff is filed
as a follow-up hygiene sub-task; PR-E ships the generator + initial
output.

Refs: AAASM-1654
PR-E of AAASM-1422 — exception raised by OpControlSubscriber.await_op
when an OP_CONTROL_SIGNAL_TERMINATE arrives for the awaited op. Carries
the originating op_id so callers can correlate the failure to the
operation they were tracking.

Sibling of ToolExecutionBlockedError under AssemblyError — picks up the
same `try / except AssemblyError` umbrella that existing SDK consumers
already use.

Refs: AAASM-1654
…inate

PR-E of AAASM-1422 — the SDK-side consumer of OpControlStream introduced
in AAASM-1653 (PR-D). Connects via grpcio, runs a daemon reader thread
that drains the stream, dispatches each OpControlMessage to a per-op_id
state slot, and exposes the slot via `await_op(op_id)`.

State machine per op_id:
- PAUSE  → `await_op` blocks until RESUME (or terminate/timeout)
- RESUME → `await_op` returns immediately
- TERMINATE → `await_op` raises OpTerminatedError (latched, future calls
  also raise — once terminated, the op never becomes runnable again)
- Buffered: a signal that arrives before anyone is awaiting is stored
  in the slot so the next `await_op` sees it

Construct via `OpControlSubscriber.connect(gateway_url, org_id=..., team_id=...,
agent_id=...)`. Tests inject a hand-rolled `_OpControlStub` Protocol
implementor + call `_start()` directly to avoid touching the network.

Out of scope (deferred per the AAASM-1654 starting comment):
- Reconnection / heartbeat on stream close (caller observes via
  `stream_alive()` and re-instantiates if desired)
- Auto-wiring into `init_assembly` / adapter `check_action` hooks

Refs: AAASM-1654
PR-E of AAASM-1422 — 7 cases exercising the per-op state machine via a
hand-rolled `_FakeStub` that returns a queue-backed iterator. No gRPC
server needed.

Coverage:
- await_op returns immediately for an unknown op_id (no signal seen yet)
- pause blocks await_op until resume arrives
- terminate raises OpTerminatedError (op_id attached)
- terminate while a waiter is blocked unblocks it AND raises
- a signal that arrives before await_op is called is buffered and
  observed on the first await
- the subscribe request carries the full composite agent_id triple
- stream close wakes blocked waiters without raising (stream_alive()
  flips to False so callers can detect)

7/7 pass via `.venv/bin/python -m pytest test/unit/test_op_control.py`.
op_control.py coverage 89%.

Refs: AAASM-1654
PR-E of AAASM-1422 follow-up to keep the local gates green:

* `scripts/gen_proto.py` — add `--pyi_out` so grpcio-tools emits .pyi
  type stubs alongside the runtime `_pb2.py` files. Lets mypy resolve
  message attributes like `OpControlMessage`, `OpControlSubscribeRequest`,
  `OP_CONTROL_SIGNAL_*` instead of failing with `attr-defined`.
* `agent_assembly/proto/{common,policy}_pb2.pyi` — committed generated stubs.
* `agent_assembly/op_control.py` — one `# type: ignore[no-untyped-call]`
  on the `PolicyServiceStub(channel)` constructor (grpcio-tools doesn't
  generate type info for the `_grpc.py` files); removed an unused
  `# type: ignore[assignment]`.
* `test/unit/test_op_control.py` — ruff-autofix sweep (modern stdlib
  imports, no functional change). Tests still 7/7.

Local gates after this: `ruff check` ✅ · `mypy --ignore-missing-imports` ✅
· `pytest test/` ✅ 373 passed.

Refs: AAASM-1654
@codecov
Copy link
Copy Markdown

codecov Bot commented May 21, 2026

@sonarqubecloud
Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
60.5% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

@Chisanan232 Chisanan232 merged commit b1c1b68 into master May 21, 2026
22 of 24 checks passed
@Chisanan232 Chisanan232 deleted the v0.0.1/AAASM-1654/feat/op_control_subscriber branch May 21, 2026 06:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant