[AAASM-1654] ✨ (op_control): Subscribe to OpControlStream + cooperative pause / fast-fail terminate#50
Merged
Chisanan232 merged 6 commits intoMay 21, 2026
Conversation
PR-E of AAASM-1422 — the new OpControlSubscriber (lands in a follow-up commit) needs gRPC at runtime to subscribe to the gateway-side OpControlStream RPC introduced in AAASM-1653 (PR-D). Runtime: grpcio>=1.66,<2 plus protobuf>=5,<7 (transitive but pinned to the modern stable line). Dev: grpcio-tools>=1.66,<2 for the regen script in the next commit. Refs: AAASM-1654
…rotos
PR-E of AAASM-1422 — adds the gRPC client stubs the OpControlSubscriber
(landing in the next commit) needs to talk to the gateway's
OpControlStream RPC.
* `scripts/gen_proto.py` — regen script that reads protos from the
sibling agent-assembly checkout (overridable via $AA_PROTO_DIR per
the project's sibling-repo CI pattern). Post-processes the
generated files to rewrite `import xxx_pb2` → `from . import xxx_pb2`
so they work as a Python package.
* `agent_assembly/proto/{common,policy}_pb2{,_grpc}.py` — committed
generated stubs. Users get gRPC out of the box without needing
grpcio-tools installed.
A drift-check CI job that runs the script + asserts no diff is filed
as a follow-up hygiene sub-task; PR-E ships the generator + initial
output.
Refs: AAASM-1654
PR-E of AAASM-1422 — exception raised by OpControlSubscriber.await_op when an OP_CONTROL_SIGNAL_TERMINATE arrives for the awaited op. Carries the originating op_id so callers can correlate the failure to the operation they were tracking. Sibling of ToolExecutionBlockedError under AssemblyError — picks up the same `try / except AssemblyError` umbrella that existing SDK consumers already use. Refs: AAASM-1654
…inate PR-E of AAASM-1422 — the SDK-side consumer of OpControlStream introduced in AAASM-1653 (PR-D). Connects via grpcio, runs a daemon reader thread that drains the stream, dispatches each OpControlMessage to a per-op_id state slot, and exposes the slot via `await_op(op_id)`. State machine per op_id: - PAUSE → `await_op` blocks until RESUME (or terminate/timeout) - RESUME → `await_op` returns immediately - TERMINATE → `await_op` raises OpTerminatedError (latched, future calls also raise — once terminated, the op never becomes runnable again) - Buffered: a signal that arrives before anyone is awaiting is stored in the slot so the next `await_op` sees it Construct via `OpControlSubscriber.connect(gateway_url, org_id=..., team_id=..., agent_id=...)`. Tests inject a hand-rolled `_OpControlStub` Protocol implementor + call `_start()` directly to avoid touching the network. Out of scope (deferred per the AAASM-1654 starting comment): - Reconnection / heartbeat on stream close (caller observes via `stream_alive()` and re-instantiates if desired) - Auto-wiring into `init_assembly` / adapter `check_action` hooks Refs: AAASM-1654
PR-E of AAASM-1422 — 7 cases exercising the per-op state machine via a hand-rolled `_FakeStub` that returns a queue-backed iterator. No gRPC server needed. Coverage: - await_op returns immediately for an unknown op_id (no signal seen yet) - pause blocks await_op until resume arrives - terminate raises OpTerminatedError (op_id attached) - terminate while a waiter is blocked unblocks it AND raises - a signal that arrives before await_op is called is buffered and observed on the first await - the subscribe request carries the full composite agent_id triple - stream close wakes blocked waiters without raising (stream_alive() flips to False so callers can detect) 7/7 pass via `.venv/bin/python -m pytest test/unit/test_op_control.py`. op_control.py coverage 89%. Refs: AAASM-1654
PR-E of AAASM-1422 follow-up to keep the local gates green:
* `scripts/gen_proto.py` — add `--pyi_out` so grpcio-tools emits .pyi
type stubs alongside the runtime `_pb2.py` files. Lets mypy resolve
message attributes like `OpControlMessage`, `OpControlSubscribeRequest`,
`OP_CONTROL_SIGNAL_*` instead of failing with `attr-defined`.
* `agent_assembly/proto/{common,policy}_pb2.pyi` — committed generated stubs.
* `agent_assembly/op_control.py` — one `# type: ignore[no-untyped-call]`
on the `PolicyServiceStub(channel)` constructor (grpcio-tools doesn't
generate type info for the `_grpc.py` files); removed an unused
`# type: ignore[assignment]`.
* `test/unit/test_op_control.py` — ruff-autofix sweep (modern stdlib
imports, no functional change). Tests still 7/7.
Local gates after this: `ruff check` ✅ · `mypy --ignore-missing-imports` ✅
· `pytest test/` ✅ 373 passed.
Refs: AAASM-1654
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


Description
PR-E of AAASM-1422. Adds the Python SDK consumer for the gateway-side
OpControlStreamserver-streaming RPC introduced in AAASM-1653 (agent-assembly PR-D, merged 2026-05-21 at1b2b9283).What this PR ships
pyproject.tomlgrpcio>=1.66,<2,protobuf>=5,<7pyproject.tomlgrpcio-tools>=1.66,<2scripts/gen_proto.pyagent-assembly/proto/(overridable via$AA_PROTO_DIR); post-processes sibling-relative imports tofrom . import …; emits.pyitype stubs so mypy resolves message attributesagent_assembly/proto/{common,policy}_pb2{,.pyi,_grpc}.pygrpcio-toolsagent_assembly/exceptions/OpTerminatedErrorAssemblyErrorsibling carryingop_idagent_assembly/op_control.pyOpControlSubscriber.connect(gateway_url, org_id=..., team_id=..., agent_id=...)opens the stream on a daemon background thread; per-op_idstate machine:await_op(op_id)blocks while paused, unblocks on resume, raisesOpTerminatedErroron terminate; signals for unknown ops are buffered until the firstawait_optest/unit/test_op_control.py_FakeStub(no live gRPC server)Scope boundary
PR-E ships the subscriber + state machine only. It does not:
init_assembly/ adaptercheck_actionhooks — exposed as a standalone class users opt into. The hook into the adapter layer becomes a separate follow-up when that surface is stable.subscriber.stream_alive()and can re-instantiate.Path-A note
The original sub-ticket framing assumed extending an existing PyO3 gRPC pattern in
aa-ffi-python. Closer inspection showed (a) no FFI gRPC client exists — the FFI is a Unix-socket IPC client toaa-runtime, (b) theRuntimeClient/GovernanceEventPython imports are wrapped intry/except ImportErrorand fail silently today. Filed AAASM-1686 for the FFI binding option, then closed it as cancelled in favour of the pure-Pythongrpcioapproach in this PR — see that ticket for the full decision trail.Type of Change
Breaking Changes
Pure additive — new module, new exception class, new deps. Existing SDK consumers untouched.
Related Issues
OpControlStreamhandler)Testing
test/unit/test_op_control.pyLocal gates
.venv/bin/ruff check ..venv/bin/mypy --ignore-missing-imports agent_assembly/op_control.py.venv/bin/python -m pytest test/.venv/bin/python -m pytest test/unit/test_op_control.py -vfrom agent_assembly.proto import policy_pb2; policy_pb2.OpControlMessageChecklist
ruff,mypy)Commits (6 granular)
(deps)Add grpcio + grpcio-tools for OpControlStream consumer(agent_assembly/proto)Generate Python stubs for policy + common protos(exceptions)Add OpTerminatedError for OpControlStream consumer(op_control)Add OpControlSubscriber for gateway pause/resume/terminate(test)Add pytest cases for OpControlSubscriber(op_control)Generate .pyi stubs + clean up ruff/mypy noise