From 040df2f44c6272e3ebc7f0034a1abd47d21ff3cb Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:03:17 +0800 Subject: [PATCH 01/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20aa-ffi-python?= =?UTF-8?q?=20crate=20scaffold=20and=20workspace=20wiring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/Cargo.toml | 3 +++ rust/aa-ffi-python/Cargo.toml | 10 ++++++++++ rust/aa-ffi-python/src/lib.rs | 3 +++ 3 files changed, 16 insertions(+) create mode 100644 rust/Cargo.toml create mode 100644 rust/aa-ffi-python/Cargo.toml create mode 100644 rust/aa-ffi-python/src/lib.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml new file mode 100644 index 0000000..8839457 --- /dev/null +++ b/rust/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +members = ["aa-ffi-python"] +resolver = "2" diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml new file mode 100644 index 0000000..07994cf --- /dev/null +++ b/rust/aa-ffi-python/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "aa-ffi-python" +version = "0.0.0" +edition = "2021" +description = "PyO3 bridge crate for the Agent Assembly Python SDK" +license = "MIT" + +[lib] +name = "aa_ffi_python" +crate-type = ["cdylib"] diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs new file mode 100644 index 0000000..b78f0cf --- /dev/null +++ b/rust/aa-ffi-python/src/lib.rs @@ -0,0 +1,3 @@ +//! aa-ffi-python crate bootstrap. + +// Implementation will be introduced incrementally in AAASM-55 commits. From 82d2df97da0b107642a9f0dbf783352ddb5b9a99 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:03:29 +0800 Subject: [PATCH 02/28] =?UTF-8?q?=F0=9F=94=A7=20(ffi):=20add=20pyo3,=20pyo?= =?UTF-8?q?3-asyncio,=20tokio,=20and=20serde=20dependencies?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/Cargo.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index 07994cf..6472248 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -8,3 +8,11 @@ license = "MIT" [lib] name = "aa_ffi_python" crate-type = ["cdylib"] + +[dependencies] +once_cell = "1.20" +pyo3 = { version = "0.23", features = ["extension-module"] } +pyo3-asyncio = { version = "0.23", features = ["tokio-runtime"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.41", features = ["rt-multi-thread", "sync", "time"] } From 1cd36471e045335ed6bb66ce0d10c2f96f615193 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:03:40 +0800 Subject: [PATCH 03/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20=5Fcore=20pymo?= =?UTF-8?q?dule=20bootstrap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index b78f0cf..50ac1e4 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,3 +1,8 @@ //! aa-ffi-python crate bootstrap. -// Implementation will be introduced incrementally in AAASM-55 commits. +use pyo3::prelude::*; + +#[pymodule] +fn _core(_py: Python<'_>, _module: &Bound<'_, PyModule>) -> PyResult<()> { + Ok(()) +} From 08bdd6a1a7e2ffc14057d0c7c2b3e727e1c374c2 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:03:52 +0800 Subject: [PATCH 04/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20GovernanceEven?= =?UTF-8?q?t=20pyclass=20skeleton?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 50ac1e4..7b2ffd0 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -2,7 +2,23 @@ use pyo3::prelude::*; +#[pyclass(module = "agent_assembly._core")] +#[derive(Clone)] +struct GovernanceEvent { + #[pyo3(get)] + payload_json: String, +} + +#[pymethods] +impl GovernanceEvent { + #[new] + fn new(payload_json: String) -> Self { + Self { payload_json } + } +} + #[pymodule] -fn _core(_py: Python<'_>, _module: &Bound<'_, PyModule>) -> PyResult<()> { +fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { + module.add_class::()?; Ok(()) } From 2a949b9d2b612b451efff8545c8f00bae1dec14c Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:04:04 +0800 Subject: [PATCH 05/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20PolicyResult?= =?UTF-8?q?=20pyclass=20skeleton?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 7b2ffd0..ca9d05c 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -17,8 +17,29 @@ impl GovernanceEvent { } } +#[pyclass(module = "agent_assembly._core")] +#[derive(Clone)] +struct PolicyResult { + #[pyo3(get)] + allowed: bool, + #[pyo3(get)] + reason: String, +} + +#[pymethods] +impl PolicyResult { + #[new] + fn new(allowed: bool, reason: Option) -> Self { + Self { + allowed, + reason: reason.unwrap_or_default(), + } + } +} + #[pymodule] fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { module.add_class::()?; + module.add_class::()?; Ok(()) } From 524d2c2adbe58eff07980e5cf9a60d4f0f9e39ca Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:04:14 +0800 Subject: [PATCH 06/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20RuntimeClient?= =?UTF-8?q?=20pyclass=20skeleton?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index ca9d05c..9b9b244 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -37,9 +37,24 @@ impl PolicyResult { } } +#[pyclass(module = "agent_assembly._core")] +struct RuntimeClient { + #[pyo3(get)] + socket_path: String, +} + +#[pymethods] +impl RuntimeClient { + #[new] + fn new(socket_path: String) -> Self { + Self { socket_path } + } +} + #[pymodule] fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { module.add_class::()?; module.add_class::()?; + module.add_class::()?; Ok(()) } From 2ac3390d340415185b22c66ac23b0b3638265260 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:04:26 +0800 Subject: [PATCH 07/28] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ffi):=20add=20share?= =?UTF-8?q?d=20Lazy=20tokio=20runtime=20bootstrap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 9b9b244..799ad76 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,6 +1,16 @@ //! aa-ffi-python crate bootstrap. +use once_cell::sync::Lazy; use pyo3::prelude::*; +use tokio::runtime::Runtime; + +static TOKIO_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("aa-ffi-python") + .build() + .expect("failed to build aa-ffi-python tokio runtime") +}); #[pyclass(module = "agent_assembly._core")] #[derive(Clone)] From 03d68e0379484ba73bb7eb5e944103b5590db2ef Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:04:38 +0800 Subject: [PATCH 08/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20RuntimeClient.?= =?UTF-8?q?connect=20socket=20constructor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 799ad76..dd62834 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -59,6 +59,12 @@ impl RuntimeClient { fn new(socket_path: String) -> Self { Self { socket_path } } + + #[staticmethod] + fn connect(socket_path: String) -> Self { + let _ = &*TOKIO_RUNTIME; + Self { socket_path } + } } #[pymodule] From 12fddf1f1b400571fa3916b715b9a02d6eebce61 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:05:03 +0800 Subject: [PATCH 09/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20internal=20tok?= =?UTF-8?q?io=20channel=20worker=20for=20runtime=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index dd62834..39947d7 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -2,7 +2,10 @@ use once_cell::sync::Lazy; use pyo3::prelude::*; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::runtime::Runtime; +use tokio::sync::mpsc; static TOKIO_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -51,19 +54,47 @@ impl PolicyResult { struct RuntimeClient { #[pyo3(get)] socket_path: String, + sender: Option>, + closed: Arc, +} + +#[derive(Clone)] +enum WorkerMessage { + Event(GovernanceEvent), + Close, } #[pymethods] impl RuntimeClient { #[new] fn new(socket_path: String) -> Self { - Self { socket_path } + Self { + socket_path, + sender: None, + closed: Arc::new(AtomicBool::new(true)), + } } #[staticmethod] fn connect(socket_path: String) -> Self { let _ = &*TOKIO_RUNTIME; - Self { socket_path } + let (sender, mut receiver) = mpsc::unbounded_channel::(); + let closed = Arc::new(AtomicBool::new(false)); + let closed_for_task = Arc::clone(&closed); + TOKIO_RUNTIME.spawn(async move { + while let Some(message) = receiver.recv().await { + match message { + WorkerMessage::Event(_event) => {} + WorkerMessage::Close => break, + } + } + closed_for_task.store(true, Ordering::SeqCst); + }); + Self { + socket_path, + sender: Some(sender), + closed, + } } } From 2a311907678597757e62e5797fdabb1d463cc59f Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:05:21 +0800 Subject: [PATCH 10/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20implement=20non-bloc?= =?UTF-8?q?king=20RuntimeClient.send=5Fevent=20enqueue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 39947d7..07463ef 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,6 +1,7 @@ //! aa-ffi-python crate bootstrap. use once_cell::sync::Lazy; +use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -96,6 +97,20 @@ impl RuntimeClient { closed, } } + + fn send_event(&self, event: GovernanceEvent) -> PyResult<()> { + if self.closed.load(Ordering::SeqCst) { + return Err(PyRuntimeError::new_err("runtime client is closed")); + } + let sender = self + .sender + .as_ref() + .ok_or_else(|| PyRuntimeError::new_err("runtime event queue is unavailable"))?; + sender + .send(WorkerMessage::Event(event)) + .map_err(|_| PyRuntimeError::new_err("failed to enqueue governance event"))?; + Ok(()) + } } #[pymodule] From 6d1f42e1aa48bcd8fd69f31d953866d29f8ab3df Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:05:56 +0800 Subject: [PATCH 11/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20policy=20query?= =?UTF-8?q?=20serialization=20and=20worker=20response=20channel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 67 ++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 07463ef..cda78fa 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -3,10 +3,11 @@ use once_cell::sync::Lazy; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; +use serde_json::Value; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::runtime::Runtime; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; static TOKIO_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -62,9 +63,19 @@ struct RuntimeClient { #[derive(Clone)] enum WorkerMessage { Event(GovernanceEvent), + PolicyQuery { + action_json: String, + response_tx: oneshot::Sender, + }, Close, } +#[derive(Clone)] +struct PolicyResultPayload { + allowed: bool, + reason: String, +} + #[pymethods] impl RuntimeClient { #[new] @@ -86,6 +97,13 @@ impl RuntimeClient { while let Some(message) = receiver.recv().await { match message { WorkerMessage::Event(_event) => {} + WorkerMessage::PolicyQuery { + action_json, + response_tx, + } => { + let policy_result = evaluate_policy_action(&action_json); + let _ = response_tx.send(policy_result); + } WorkerMessage::Close => break, } } @@ -111,6 +129,53 @@ impl RuntimeClient { .map_err(|_| PyRuntimeError::new_err("failed to enqueue governance event"))?; Ok(()) } + + fn query_policy(&self, py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { + let action_json = serialize_action_to_json(py, action)?; + let sender = self + .sender + .as_ref() + .ok_or_else(|| PyRuntimeError::new_err("runtime event queue is unavailable"))?; + let (response_tx, response_rx) = oneshot::channel::(); + sender + .send(WorkerMessage::PolicyQuery { + action_json, + response_tx, + }) + .map_err(|_| PyRuntimeError::new_err("failed to enqueue policy query"))?; + let payload = response_rx + .blocking_recv() + .map_err(|_| PyRuntimeError::new_err("failed to resolve policy query"))?; + Ok(PolicyResult { + allowed: payload.allowed, + reason: payload.reason, + }) + } +} + +fn serialize_action_to_json(py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { + let json_module = PyModule::import(py, "json")?; + let dumped = json_module.call_method1("dumps", (action,))?; + dumped.extract::() +} + +fn evaluate_policy_action(action_json: &str) -> PolicyResultPayload { + let parsed: Value = serde_json::from_str(action_json).unwrap_or(Value::Null); + let deny_flag = parsed + .as_object() + .and_then(|obj| obj.get("deny")) + .and_then(Value::as_bool) + .unwrap_or(false); + if deny_flag { + return PolicyResultPayload { + allowed: false, + reason: "Denied by local policy rule.".to_string(), + }; + } + PolicyResultPayload { + allowed: true, + reason: String::new(), + } } #[pymodule] From 06eeeabcd198f07f6009ab95716fc1fd10f391b1 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:06:21 +0800 Subject: [PATCH 12/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20add=20query=5Fpolicy?= =?UTF-8?q?=20timeout=20handling=20and=20PolicyTimeoutError?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index cda78fa..0bccea9 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -3,11 +3,16 @@ use once_cell::sync::Lazy; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; +use pyo3::types::PyDict; use serde_json::Value; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; +use tokio::time; + +pyo3::create_exception!(_core, PolicyTimeoutError, pyo3::exceptions::PyTimeoutError); static TOKIO_RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -132,6 +137,7 @@ impl RuntimeClient { fn query_policy(&self, py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { let action_json = serialize_action_to_json(py, action)?; + let timeout_ms = extract_timeout_ms(action); let sender = self .sender .as_ref() @@ -143,8 +149,9 @@ impl RuntimeClient { response_tx, }) .map_err(|_| PyRuntimeError::new_err("failed to enqueue policy query"))?; - let payload = response_rx - .blocking_recv() + let payload = TOKIO_RUNTIME + .block_on(async move { time::timeout(Duration::from_millis(timeout_ms), response_rx).await }) + .map_err(|_| PolicyTimeoutError::new_err("policy query timed out"))? .map_err(|_| PyRuntimeError::new_err("failed to resolve policy query"))?; Ok(PolicyResult { allowed: payload.allowed, @@ -153,6 +160,15 @@ impl RuntimeClient { } } +fn extract_timeout_ms(action: &Bound<'_, PyAny>) -> u64 { + action + .downcast::() + .ok() + .and_then(|dict| dict.get_item("timeout_ms").ok().flatten()) + .and_then(|value| value.extract::().ok()) + .unwrap_or(50) +} + fn serialize_action_to_json(py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { let json_module = PyModule::import(py, "json")?; let dumped = json_module.call_method1("dumps", (action,))?; @@ -180,6 +196,7 @@ fn evaluate_policy_action(action_json: &str) -> PolicyResultPayload { #[pymodule] fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { + module.add("PolicyTimeoutError", _py.get_type::())?; module.add_class::()?; module.add_class::()?; module.add_class::()?; From 9b22d4f41e6c277bcfacb64dd2651db091f2360f Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:06:38 +0800 Subject: [PATCH 13/28] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ffi):=20release=20G?= =?UTF-8?q?IL=20while=20waiting=20for=20query=5Fpolicy=20response?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 0bccea9..2b09399 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -81,6 +81,11 @@ struct PolicyResultPayload { reason: String, } +enum PolicyWaitError { + Timeout, + Disconnected, +} + #[pymethods] impl RuntimeClient { #[new] @@ -149,10 +154,13 @@ impl RuntimeClient { response_tx, }) .map_err(|_| PyRuntimeError::new_err("failed to enqueue policy query"))?; - let payload = TOKIO_RUNTIME - .block_on(async move { time::timeout(Duration::from_millis(timeout_ms), response_rx).await }) - .map_err(|_| PolicyTimeoutError::new_err("policy query timed out"))? - .map_err(|_| PyRuntimeError::new_err("failed to resolve policy query"))?; + let payload = py.allow_threads(|| wait_for_policy_response(timeout_ms, response_rx)); + let payload = payload.map_err(|error| match error { + PolicyWaitError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), + PolicyWaitError::Disconnected => { + PyRuntimeError::new_err("failed to resolve policy query") + } + })?; Ok(PolicyResult { allowed: payload.allowed, reason: payload.reason, @@ -194,6 +202,16 @@ fn evaluate_policy_action(action_json: &str) -> PolicyResultPayload { } } +fn wait_for_policy_response( + timeout_ms: u64, + response_rx: oneshot::Receiver, +) -> Result { + TOKIO_RUNTIME + .block_on(async move { time::timeout(Duration::from_millis(timeout_ms), response_rx).await }) + .map_err(|_| PolicyWaitError::Timeout)? + .map_err(|_| PolicyWaitError::Disconnected) +} + #[pymodule] fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { module.add("PolicyTimeoutError", _py.get_type::())?; From d7634e261ee418ccc7a6e5f4be6dcd7bbcb59f72 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:06:47 +0800 Subject: [PATCH 14/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20implement=20RuntimeC?= =?UTF-8?q?lient.close=20graceful=20shutdown?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 2b09399..527286e 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -166,6 +166,15 @@ impl RuntimeClient { reason: payload.reason, }) } + + fn close(&mut self) { + if self.closed.swap(true, Ordering::SeqCst) { + return; + } + if let Some(sender) = self.sender.take() { + let _ = sender.send(WorkerMessage::Close); + } + } } fn extract_timeout_ms(action: &Bound<'_, PyAny>) -> u64 { From c38008afe8ec399e4248672062faea57c653d466 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:07:14 +0800 Subject: [PATCH 15/28] =?UTF-8?q?=F0=9F=94=A7=20(python):=20wire=20maturin?= =?UTF-8?q?=20module-name=20to=20agent=5Fassembly.=5Fcore=20export=20path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent_assembly/__init__.py | 20 ++++++++++++++++++++ rust/aa-ffi-python/pyproject.toml | 14 ++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 rust/aa-ffi-python/pyproject.toml diff --git a/agent_assembly/__init__.py b/agent_assembly/__init__.py index f5aaab8..fcdf256 100644 --- a/agent_assembly/__init__.py +++ b/agent_assembly/__init__.py @@ -12,6 +12,16 @@ ToolExecutionBlockedError, ) +try: + from agent_assembly._core import ( # type: ignore[attr-defined] + GovernanceEvent, + PolicyResult, + PolicyTimeoutError, + RuntimeClient, + ) +except ImportError: + pass + __version__ = "0.0.0" __all__ = [ @@ -28,3 +38,13 @@ "AdapterValidationError", "ToolExecutionBlockedError", ] + +if "RuntimeClient" in globals(): + __all__.extend( + [ + "RuntimeClient", + "GovernanceEvent", + "PolicyResult", + "PolicyTimeoutError", + ] + ) diff --git a/rust/aa-ffi-python/pyproject.toml b/rust/aa-ffi-python/pyproject.toml new file mode 100644 index 0000000..b277e05 --- /dev/null +++ b/rust/aa-ffi-python/pyproject.toml @@ -0,0 +1,14 @@ +[build-system] +requires = ["maturin>=1.7,<2.0"] +build-backend = "maturin" + +[project] +name = "agent-assembly-core" +version = "0.0.0" +description = "Native core extension for Agent Assembly Python SDK" +requires-python = ">=3.12,<4.0" + +[tool.maturin] +python-source = "../.." +module-name = "agent_assembly._core" +features = ["pyo3/extension-module"] From 1fb90886f692b5bb8619bca26912f16ca1d9b36e Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:09:26 +0800 Subject: [PATCH 16/28] =?UTF-8?q?=F0=9F=94=A7=20(ffi):=20align=20PyO3=20AP?= =?UTF-8?q?I=20compatibility=20with=20available=20pyo3-asyncio=20versions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/Cargo.toml | 4 ++-- rust/aa-ffi-python/src/lib.rs | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index 6472248..c72b7af 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -11,8 +11,8 @@ crate-type = ["cdylib"] [dependencies] once_cell = "1.20" -pyo3 = { version = "0.23", features = ["extension-module"] } -pyo3-asyncio = { version = "0.23", features = ["tokio-runtime"] } +pyo3 = { version = "0.20", features = ["extension-module"] } +pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.41", features = ["rt-multi-thread", "sync", "time"] } diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 527286e..2abc66b 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -65,7 +65,6 @@ struct RuntimeClient { closed: Arc, } -#[derive(Clone)] enum WorkerMessage { Event(GovernanceEvent), PolicyQuery { @@ -140,7 +139,7 @@ impl RuntimeClient { Ok(()) } - fn query_policy(&self, py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { + fn query_policy(&self, py: Python<'_>, action: &PyAny) -> PyResult { let action_json = serialize_action_to_json(py, action)?; let timeout_ms = extract_timeout_ms(action); let sender = self @@ -177,7 +176,7 @@ impl RuntimeClient { } } -fn extract_timeout_ms(action: &Bound<'_, PyAny>) -> u64 { +fn extract_timeout_ms(action: &PyAny) -> u64 { action .downcast::() .ok() @@ -186,7 +185,7 @@ fn extract_timeout_ms(action: &Bound<'_, PyAny>) -> u64 { .unwrap_or(50) } -fn serialize_action_to_json(py: Python<'_>, action: &Bound<'_, PyAny>) -> PyResult { +fn serialize_action_to_json(py: Python<'_>, action: &PyAny) -> PyResult { let json_module = PyModule::import(py, "json")?; let dumped = json_module.call_method1("dumps", (action,))?; dumped.extract::() @@ -222,8 +221,8 @@ fn wait_for_policy_response( } #[pymodule] -fn _core(_py: Python<'_>, module: &Bound<'_, PyModule>) -> PyResult<()> { - module.add("PolicyTimeoutError", _py.get_type::())?; +fn _core(py: Python<'_>, module: &PyModule) -> PyResult<()> { + module.add("PolicyTimeoutError", py.get_type::())?; module.add_class::()?; module.add_class::()?; module.add_class::()?; From 6437dee84755a7533ff7e882ab532a86a2016e48 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:09:50 +0800 Subject: [PATCH 17/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20add=20maturin?= =?UTF-8?q?=20develop=20smoke=20test=20for=20RuntimeClient=20import?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/integration/test_native_core_maturin.py | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 test/integration/test_native_core_maturin.py diff --git a/test/integration/test_native_core_maturin.py b/test/integration/test_native_core_maturin.py new file mode 100644 index 0000000..a39eaf8 --- /dev/null +++ b/test/integration/test_native_core_maturin.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import os +import subprocess +import sys + +import pytest + + +@pytest.mark.integration +def test_maturin_develop_exposes_runtime_client() -> None: + if os.getenv("AAASM_RUN_MATURIN_TESTS") != "1": + pytest.skip("Set AAASM_RUN_MATURIN_TESTS=1 to run maturin integration smoke tests.") + + command = [ + "uv", + "tool", + "run", + "maturin", + "develop", + "--manifest-path", + "rust/aa-ffi-python/Cargo.toml", + "--release", + ] + subprocess.run(command, check=True) + + from agent_assembly._core import RuntimeClient + + assert RuntimeClient is not None + assert hasattr(RuntimeClient, "connect") + assert sys.modules.get("agent_assembly._core") is not None From b103275a73ea86439790048a4cf1ff7f47b7684b Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:10:40 +0800 Subject: [PATCH 18/28] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ffi):=20add=20delay?= =?UTF-8?q?-aware=20local=20policy=20evaluation=20for=20timeout=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/src/lib.rs | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 2abc66b..ad59a4d 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -111,7 +111,10 @@ impl RuntimeClient { response_tx, } => { let policy_result = evaluate_policy_action(&action_json); - let _ = response_tx.send(policy_result); + if policy_result.delay_ms > 0 { + time::sleep(Duration::from_millis(policy_result.delay_ms)).await; + } + let _ = response_tx.send(policy_result.result); } WorkerMessage::Close => break, } @@ -191,22 +194,38 @@ fn serialize_action_to_json(py: Python<'_>, action: &PyAny) -> PyResult dumped.extract::() } -fn evaluate_policy_action(action_json: &str) -> PolicyResultPayload { +struct PolicyEvaluation { + result: PolicyResultPayload, + delay_ms: u64, +} + +fn evaluate_policy_action(action_json: &str) -> PolicyEvaluation { let parsed: Value = serde_json::from_str(action_json).unwrap_or(Value::Null); + let delay_ms = parsed + .as_object() + .and_then(|obj| obj.get("delay_ms")) + .and_then(Value::as_u64) + .unwrap_or(0); let deny_flag = parsed .as_object() .and_then(|obj| obj.get("deny")) .and_then(Value::as_bool) .unwrap_or(false); if deny_flag { - return PolicyResultPayload { - allowed: false, - reason: "Denied by local policy rule.".to_string(), + return PolicyEvaluation { + result: PolicyResultPayload { + allowed: false, + reason: "Denied by local policy rule.".to_string(), + }, + delay_ms, }; } - PolicyResultPayload { - allowed: true, - reason: String::new(), + PolicyEvaluation { + result: PolicyResultPayload { + allowed: true, + reason: String::new(), + }, + delay_ms, } } From 1c5e14e4969b9043b68db3a71a5f211ede628c34 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:11:02 +0800 Subject: [PATCH 19/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20add=20non-block?= =?UTF-8?q?ing=20send=5Fevent=20runtime=20acceptance=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/integration/test_native_core_runtime.py | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 test/integration/test_native_core_runtime.py diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py new file mode 100644 index 0000000..ad3a9d0 --- /dev/null +++ b/test/integration/test_native_core_runtime.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import os +import time + +import pytest + + +@pytest.fixture() +def native_core(): + if os.getenv("AAASM_RUN_NATIVE_CORE_TESTS") != "1": + pytest.skip("Set AAASM_RUN_NATIVE_CORE_TESTS=1 to run native core runtime tests.") + return pytest.importorskip("agent_assembly._core") + + +@pytest.mark.integration +def test_send_event_is_non_blocking(native_core) -> None: + client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + try: + start = time.perf_counter() + for index in range(500): + client.send_event(native_core.GovernanceEvent(f'{{"index": {index}}}')) + elapsed_ms = (time.perf_counter() - start) * 1000.0 + assert elapsed_ms < 50.0 + finally: + client.close() From 76411a915186ce8b164b18bdbdfff2f2f6f90e67 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:11:12 +0800 Subject: [PATCH 20/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20add=20query=5Fp?= =?UTF-8?q?olicy=20latency=20and=20timeout=20acceptance=20checks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/integration/test_native_core_runtime.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py index ad3a9d0..c8b1123 100644 --- a/test/integration/test_native_core_runtime.py +++ b/test/integration/test_native_core_runtime.py @@ -24,3 +24,19 @@ def test_send_event_is_non_blocking(native_core) -> None: assert elapsed_ms < 50.0 finally: client.close() + + +@pytest.mark.integration +def test_query_policy_returns_quickly_and_times_out(native_core) -> None: + client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + try: + start = time.perf_counter() + result = client.query_policy({"action": "tool.call", "timeout_ms": 50}) + elapsed_ms = (time.perf_counter() - start) * 1000.0 + assert elapsed_ms < 50.0 + assert result.allowed is True + + with pytest.raises(native_core.PolicyTimeoutError): + client.query_policy({"action": "slow.call", "delay_ms": 200, "timeout_ms": 10}) + finally: + client.close() From 8e9e2a577380288da39d523f88f22fe2cfb53d53 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:11:26 +0800 Subject: [PATCH 21/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20add=20concurren?= =?UTF-8?q?t=20thread=20deadlock=20regression=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/integration/test_native_core_runtime.py | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py index c8b1123..d4acc42 100644 --- a/test/integration/test_native_core_runtime.py +++ b/test/integration/test_native_core_runtime.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import threading import time import pytest @@ -40,3 +41,29 @@ def test_query_policy_returns_quickly_and_times_out(native_core) -> None: client.query_policy({"action": "slow.call", "delay_ms": 200, "timeout_ms": 10}) finally: client.close() + + +@pytest.mark.integration +def test_runtime_client_has_no_thread_deadlock(native_core) -> None: + client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + errors: list[Exception] = [] + + def worker(worker_id: int) -> None: + try: + for index in range(100): + client.send_event(native_core.GovernanceEvent(f'{{"worker": {worker_id}, "idx": {index}}}')) + client.query_policy({"action": "tool.call", "timeout_ms": 50}) + except Exception as error: # pragma: no cover - runtime guard + errors.append(error) + + threads = [threading.Thread(target=worker, args=(worker_id,)) for worker_id in range(8)] + for thread in threads: + thread.start() + for thread in threads: + thread.join(timeout=5) + + try: + assert all(not thread.is_alive() for thread in threads) + assert errors == [] + finally: + client.close() From 06fe0c451d7968f15ce720dbc0c9baa81c91c75d Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:11:41 +0800 Subject: [PATCH 22/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20add=20tracemall?= =?UTF-8?q?oc=20leak=20guard=20for=2010k=20event=20throughput?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/integration/test_native_core_runtime.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py index d4acc42..dcd5384 100644 --- a/test/integration/test_native_core_runtime.py +++ b/test/integration/test_native_core_runtime.py @@ -1,8 +1,10 @@ from __future__ import annotations +import gc import os import threading import time +import tracemalloc import pytest @@ -67,3 +69,19 @@ def worker(worker_id: int) -> None: assert errors == [] finally: client.close() + + +@pytest.mark.integration +def test_runtime_client_tracemalloc_leak_guard(native_core) -> None: + client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + tracemalloc.start() + baseline_current, _ = tracemalloc.get_traced_memory() + try: + for index in range(10_000): + client.send_event(native_core.GovernanceEvent(f'{{"index": {index}}}')) + gc.collect() + current, _ = tracemalloc.get_traced_memory() + assert current - baseline_current < 1_000_000 + finally: + tracemalloc.stop() + client.close() From 944f3b066dbcbe7d78847a219cc0481d8d0a048c Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:11:53 +0800 Subject: [PATCH 23/28] =?UTF-8?q?=F0=9F=93=9D=20(docs):=20add=20native=20c?= =?UTF-8?q?ore=20build=20and=20runtime=20test=20usage=20notes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 716cc25..cb9c6bb 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,27 @@ uv run ruff check . uv run mypy agent_assembly ``` +## Native Core Extension (AAASM-55) + +Build and install the PyO3 extension locally: + +```bash +uv tool run maturin develop --manifest-path rust/aa-ffi-python/Cargo.toml --release +``` + +Validate native module import: + +```python +from agent_assembly._core import RuntimeClient, GovernanceEvent, PolicyResult +``` + +Run opt-in native integration tests: + +```bash +AAASM_RUN_NATIVE_CORE_TESTS=1 uv run pytest test/integration/test_native_core_runtime.py +AAASM_RUN_MATURIN_TESTS=1 uv run pytest test/integration/test_native_core_maturin.py +``` + ## Documentation - Project docs source: `docs/` From 8435365ee1347189a0ba91c7f26abab85289d646 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:12:15 +0800 Subject: [PATCH 24/28] =?UTF-8?q?=F0=9F=94=A7=20(ci):=20add=20native=20cor?= =?UTF-8?q?e=20maturin=20build=20and=20import=20check=20workflow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/native-core-build.yml | 34 +++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 .github/workflows/native-core-build.yml diff --git a/.github/workflows/native-core-build.yml b/.github/workflows/native-core-build.yml new file mode 100644 index 0000000..c4923b2 --- /dev/null +++ b/.github/workflows/native-core-build.yml @@ -0,0 +1,34 @@ +name: Native Core Build Check + +on: + pull_request: + paths: + - "rust/**" + - "agent_assembly/**" + - ".github/workflows/native-core-build.yml" + workflow_dispatch: + +jobs: + build-native-core: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.13" + + - name: Setup uv + uses: astral-sh/setup-uv@v6 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + + - name: Build native module with maturin + run: uv tool run maturin develop --manifest-path rust/aa-ffi-python/Cargo.toml --release + + - name: Verify native module import + run: | + uv run python -c "from agent_assembly._core import RuntimeClient, GovernanceEvent, PolicyResult" From feaaeb9593ca9c210b51e432c3eafb7b174b54ad Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Wed, 29 Apr 2026 23:12:51 +0800 Subject: [PATCH 25/28] =?UTF-8?q?=F0=9F=A7=B9=20(build):=20ignore=20Rust?= =?UTF-8?q?=20target=20and=20Cargo=20lock=20artifacts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index f9c5eee..cff69bd 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,10 @@ build/ dist/ *.egg-info/ +## Rust build artifacts +rust/target/ +rust/Cargo.lock + ## Script scripts/ci/get-all-tests.sh From 1d9a4fc8b6c8af8032622b76ea230599101a9f70 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Thu, 30 Apr 2026 09:09:55 +0800 Subject: [PATCH 26/28] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(ffi):=20replace=20l?= =?UTF-8?q?ocal=20evaluator=20with=20socket=20IPC=20runtime=20bridge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/native-core-build.yml | 2 + rust/aa-ffi-python/Cargo.toml | 4 +- rust/aa-ffi-python/src/lib.rs | 418 +++++++++++++++---- test/integration/test_native_core_maturin.py | 4 +- test/integration/test_native_core_runtime.py | 162 ++++++- 5 files changed, 507 insertions(+), 83 deletions(-) diff --git a/.github/workflows/native-core-build.yml b/.github/workflows/native-core-build.yml index c4923b2..71dc467 100644 --- a/.github/workflows/native-core-build.yml +++ b/.github/workflows/native-core-build.yml @@ -27,6 +27,8 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Build native module with maturin + env: + PYO3_USE_ABI3_FORWARD_COMPATIBILITY: "1" run: uv tool run maturin develop --manifest-path rust/aa-ffi-python/Cargo.toml --release - name: Verify native module import diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index c72b7af..948d89c 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -10,9 +10,11 @@ name = "aa_ffi_python" crate-type = ["cdylib"] [dependencies] +aa-proto = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", package = "aa-proto" } once_cell = "1.20" +prost = "0.13" pyo3 = { version = "0.20", features = ["extension-module"] } pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.41", features = ["rt-multi-thread", "sync", "time"] } +tokio = { version = "1.41", features = ["io-util", "net", "rt-multi-thread", "sync", "time"] } diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index ad59a4d..5eb6ada 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,13 +1,20 @@ //! aa-ffi-python crate bootstrap. +use aa_proto::assembly::audit::v1::AuditEvent; +use aa_proto::assembly::common::v1::Decision; +use aa_proto::assembly::policy::v1::CheckActionRequest; +use aa_proto::assembly::policy::v1::CheckActionResponse; use once_cell::sync::Lazy; +use prost::Message; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use pyo3::types::PyDict; -use serde_json::Value; use std::sync::Arc; +use std::sync::Mutex; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::UnixStream; use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; use tokio::time; @@ -22,6 +29,13 @@ static TOKIO_RUNTIME: Lazy = Lazy::new(|| { .expect("failed to build aa-ffi-python tokio runtime") }); +const TAG_POLICY_QUERY: u8 = 1; +const TAG_EVENT_REPORT: u8 = 2; +const TAG_HEARTBEAT: u8 = 4; + +const TAG_POLICY_RESPONSE: u8 = 1; +const TAG_ACK: u8 = 3; + #[pyclass(module = "agent_assembly._core")] #[derive(Clone)] struct GovernanceEvent { @@ -63,13 +77,15 @@ struct RuntimeClient { socket_path: String, sender: Option>, closed: Arc, + last_error: Arc>>, } enum WorkerMessage { Event(GovernanceEvent), PolicyQuery { action_json: String, - response_tx: oneshot::Sender, + timeout_ms: u64, + response_tx: oneshot::Sender>, }, Close, } @@ -80,7 +96,15 @@ struct PolicyResultPayload { reason: String, } -enum PolicyWaitError { +#[derive(Debug)] +enum WorkerError { + Timeout, + Disconnected, + Transport(String), + Decode(String), +} + +enum WorkerWaitError { Timeout, Disconnected, } @@ -93,45 +117,35 @@ impl RuntimeClient { socket_path, sender: None, closed: Arc::new(AtomicBool::new(true)), + last_error: Arc::new(Mutex::new(None)), } } #[staticmethod] fn connect(socket_path: String) -> Self { let _ = &*TOKIO_RUNTIME; - let (sender, mut receiver) = mpsc::unbounded_channel::(); + + let (sender, receiver) = mpsc::unbounded_channel::(); let closed = Arc::new(AtomicBool::new(false)); - let closed_for_task = Arc::clone(&closed); - TOKIO_RUNTIME.spawn(async move { - while let Some(message) = receiver.recv().await { - match message { - WorkerMessage::Event(_event) => {} - WorkerMessage::PolicyQuery { - action_json, - response_tx, - } => { - let policy_result = evaluate_policy_action(&action_json); - if policy_result.delay_ms > 0 { - time::sleep(Duration::from_millis(policy_result.delay_ms)).await; - } - let _ = response_tx.send(policy_result.result); - } - WorkerMessage::Close => break, - } - } - closed_for_task.store(true, Ordering::SeqCst); - }); + let last_error = Arc::new(Mutex::new(None)); + + TOKIO_RUNTIME.spawn(worker_loop( + socket_path.clone(), + receiver, + Arc::clone(&closed), + Arc::clone(&last_error), + )); + Self { socket_path, sender: Some(sender), closed, + last_error, } } fn send_event(&self, event: GovernanceEvent) -> PyResult<()> { - if self.closed.load(Ordering::SeqCst) { - return Err(PyRuntimeError::new_err("runtime client is closed")); - } + ensure_client_open(self.closed.as_ref(), self.last_error.as_ref())?; let sender = self .sender .as_ref() @@ -143,26 +157,30 @@ impl RuntimeClient { } fn query_policy(&self, py: Python<'_>, action: &PyAny) -> PyResult { + ensure_client_open(self.closed.as_ref(), self.last_error.as_ref())?; let action_json = serialize_action_to_json(py, action)?; let timeout_ms = extract_timeout_ms(action); let sender = self .sender .as_ref() .ok_or_else(|| PyRuntimeError::new_err("runtime event queue is unavailable"))?; - let (response_tx, response_rx) = oneshot::channel::(); + + let (response_tx, response_rx) = oneshot::channel::>(); sender .send(WorkerMessage::PolicyQuery { action_json, + timeout_ms, response_tx, }) .map_err(|_| PyRuntimeError::new_err("failed to enqueue policy query"))?; - let payload = py.allow_threads(|| wait_for_policy_response(timeout_ms, response_rx)); - let payload = payload.map_err(|error| match error { - PolicyWaitError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), - PolicyWaitError::Disconnected => { - PyRuntimeError::new_err("failed to resolve policy query") - } + + let worker_result = py.allow_threads(|| wait_for_worker_response(timeout_ms + 100, response_rx)); + let worker_result = worker_result.map_err(|error| match error { + WorkerWaitError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), + WorkerWaitError::Disconnected => PyRuntimeError::new_err("policy worker disconnected"), })?; + + let payload = worker_result.map_err(map_worker_error_to_py)?; Ok(PolicyResult { allowed: payload.allowed, reason: payload.reason, @@ -179,6 +197,161 @@ impl RuntimeClient { } } +async fn worker_loop( + socket_path: String, + mut receiver: mpsc::UnboundedReceiver, + closed: Arc, + last_error: Arc>>, +) { + let stream = match UnixStream::connect(&socket_path).await { + Ok(stream) => stream, + Err(error) => { + set_worker_error(last_error.as_ref(), format!("failed to connect runtime socket: {error}")); + closed.store(true, Ordering::SeqCst); + return; + } + }; + + let (mut reader, mut writer) = stream.into_split(); + if let Err(error) = write_heartbeat(&mut writer).await { + set_worker_error(last_error.as_ref(), format!("failed to send heartbeat: {error:?}")); + closed.store(true, Ordering::SeqCst); + return; + } + + match read_runtime_response(&mut reader).await { + Ok(RuntimeResponse::Ack) => {} + Ok(_) => { + set_worker_error(last_error.as_ref(), "unexpected heartbeat response from runtime".to_string()); + closed.store(true, Ordering::SeqCst); + return; + } + Err(error) => { + set_worker_error(last_error.as_ref(), format!("failed to read heartbeat ack: {error}")); + closed.store(true, Ordering::SeqCst); + return; + } + } + + while let Some(message) = receiver.recv().await { + match message { + WorkerMessage::Event(event) => { + let send_result = send_event_frame(&mut writer, &event).await; + if let Err(error) = send_result { + set_worker_error(last_error.as_ref(), format!("failed to send event: {error:?}")); + break; + } + + match read_runtime_response(&mut reader).await { + Ok(RuntimeResponse::Ack) => {} + Ok(_) => { + set_worker_error(last_error.as_ref(), "unexpected event ack response from runtime".to_string()); + break; + } + Err(error) => { + set_worker_error(last_error.as_ref(), format!("failed to read event ack: {error}")); + break; + } + } + } + WorkerMessage::PolicyQuery { + action_json, + timeout_ms, + response_tx, + } => { + let response = process_policy_query(&mut reader, &mut writer, action_json, timeout_ms).await; + let _ = response_tx.send(response); + } + WorkerMessage::Close => break, + } + } + + closed.store(true, Ordering::SeqCst); +} + +async fn send_event_frame(writer: &mut W, event: &GovernanceEvent) -> Result<(), WorkerError> +where + W: AsyncWrite + Unpin, +{ + let audit_event = AuditEvent { + event_id: make_event_id(), + trace_id: "python-sdk".to_string(), + span_id: "ffi-send-event".to_string(), + decision: Decision::Allow as i32, + labels: std::collections::HashMap::from([(String::from("payload_json"), event.payload_json.clone())]), + ..Default::default() + }; + let payload = audit_event.encode_to_vec(); + write_frame(writer, TAG_EVENT_REPORT, &payload).await +} + +async fn process_policy_query( + reader: &mut R, + writer: &mut W, + action_json: String, + timeout_ms: u64, +) -> Result +where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, +{ + let request = CheckActionRequest { + trace_id: action_json, + span_id: "ffi-query-policy".to_string(), + ..Default::default() + }; + let payload = request.encode_to_vec(); + write_frame(writer, TAG_POLICY_QUERY, &payload).await?; + + let response = time::timeout( + Duration::from_millis(timeout_ms), + read_runtime_response(reader), + ) + .await + .map_err(|_| WorkerError::Timeout)? + .map_err(|error| WorkerError::Transport(error))?; + + match response { + RuntimeResponse::PolicyResponse(bytes) => { + let policy = CheckActionResponse::decode(bytes.as_slice()) + .map_err(|error| WorkerError::Decode(error.to_string()))?; + let allowed = matches!(policy.decision, x if x == Decision::Allow as i32 || x == Decision::Redact as i32); + Ok(PolicyResultPayload { + allowed, + reason: policy.reason, + }) + } + RuntimeResponse::Ack => Err(WorkerError::Transport( + "runtime returned ACK instead of policy response".to_string(), + )), + RuntimeResponse::Unknown(tag, _) => Err(WorkerError::Transport(format!( + "runtime returned unexpected tag {tag} for policy query" + ))), + } +} + +fn map_worker_error_to_py(error: WorkerError) -> PyErr { + match error { + WorkerError::Timeout => PolicyTimeoutError::new_err("policy query timed out"), + WorkerError::Disconnected => PyRuntimeError::new_err("policy worker disconnected"), + WorkerError::Transport(message) | WorkerError::Decode(message) => PyRuntimeError::new_err(message), + } +} + +fn ensure_client_open(closed: &AtomicBool, last_error: &Mutex>) -> PyResult<()> { + if !closed.load(Ordering::SeqCst) { + return Ok(()); + } + + if let Ok(guard) = last_error.lock() { + if let Some(message) = guard.as_ref() { + return Err(PyRuntimeError::new_err(message.clone())); + } + } + + Err(PyRuntimeError::new_err("runtime client is closed")) +} + fn extract_timeout_ms(action: &PyAny) -> u64 { action .downcast::() @@ -194,49 +367,146 @@ fn serialize_action_to_json(py: Python<'_>, action: &PyAny) -> PyResult dumped.extract::() } -struct PolicyEvaluation { - result: PolicyResultPayload, - delay_ms: u64, -} - -fn evaluate_policy_action(action_json: &str) -> PolicyEvaluation { - let parsed: Value = serde_json::from_str(action_json).unwrap_or(Value::Null); - let delay_ms = parsed - .as_object() - .and_then(|obj| obj.get("delay_ms")) - .and_then(Value::as_u64) - .unwrap_or(0); - let deny_flag = parsed - .as_object() - .and_then(|obj| obj.get("deny")) - .and_then(Value::as_bool) - .unwrap_or(false); - if deny_flag { - return PolicyEvaluation { - result: PolicyResultPayload { - allowed: false, - reason: "Denied by local policy rule.".to_string(), - }, - delay_ms, - }; - } - PolicyEvaluation { - result: PolicyResultPayload { - allowed: true, - reason: String::new(), - }, - delay_ms, - } -} - -fn wait_for_policy_response( +fn set_worker_error(last_error: &Mutex>, message: String) { + if let Ok(mut guard) = last_error.lock() { + *guard = Some(message); + } +} + +fn make_event_id() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + format!("py-{}-{}", now.as_secs(), now.subsec_nanos()) +} + +enum RuntimeResponse { + Ack, + PolicyResponse(Vec), + Unknown(u8, Vec), +} + +async fn write_heartbeat(writer: &mut W) -> Result<(), WorkerError> +where + W: AsyncWrite + Unpin, +{ + writer + .write_u8(TAG_HEARTBEAT) + .await + .map_err(|error| WorkerError::Transport(error.to_string()))?; + writer + .flush() + .await + .map_err(|error| WorkerError::Transport(error.to_string())) +} + +async fn write_frame(writer: &mut W, tag: u8, payload: &[u8]) -> Result<(), WorkerError> +where + W: AsyncWrite + Unpin, +{ + writer + .write_u8(tag) + .await + .map_err(|error| WorkerError::Transport(error.to_string()))?; + write_varint(writer, payload.len() as u64).await?; + writer + .write_all(payload) + .await + .map_err(|error| WorkerError::Transport(error.to_string()))?; + writer + .flush() + .await + .map_err(|error| WorkerError::Transport(error.to_string())) +} + +async fn read_runtime_response(reader: &mut R) -> Result +where + R: AsyncRead + Unpin, +{ + let tag = reader.read_u8().await.map_err(|error| error.to_string())?; + match tag { + TAG_ACK => { + let _ = read_length_delimited(reader).await?; + Ok(RuntimeResponse::Ack) + } + TAG_POLICY_RESPONSE => { + let payload = read_length_delimited(reader).await?; + Ok(RuntimeResponse::PolicyResponse(payload)) + } + other => { + let payload = read_length_delimited(reader).await?; + Ok(RuntimeResponse::Unknown(other, payload)) + } + } +} + +async fn read_length_delimited(reader: &mut R) -> Result, String> +where + R: AsyncRead + Unpin, +{ + let len = read_varint(reader).await? as usize; + let mut payload = vec![0u8; len]; + reader + .read_exact(&mut payload) + .await + .map_err(|error| error.to_string())?; + Ok(payload) +} + +async fn read_varint(reader: &mut R) -> Result +where + R: AsyncRead + Unpin, +{ + let mut result: u64 = 0; + let mut shift = 0u32; + loop { + let byte = reader.read_u8().await.map_err(|error| error.to_string())?; + result |= ((byte & 0x7F) as u64) << shift; + if byte & 0x80 == 0 { + break; + } + shift += 7; + if shift >= 64 { + return Err("varint too long".to_string()); + } + } + Ok(result) +} + +async fn write_varint(writer: &mut W, mut value: u64) -> Result<(), WorkerError> +where + W: AsyncWrite + Unpin, +{ + loop { + let byte = (value & 0x7F) as u8; + value >>= 7; + if value == 0 { + writer + .write_u8(byte) + .await + .map_err(|error| WorkerError::Transport(error.to_string()))?; + break; + } + + writer + .write_u8(byte | 0x80) + .await + .map_err(|error| WorkerError::Transport(error.to_string()))?; + } + + Ok(()) +} + +fn wait_for_worker_response( timeout_ms: u64, - response_rx: oneshot::Receiver, -) -> Result { + response_rx: oneshot::Receiver>, +) -> Result, WorkerWaitError> { TOKIO_RUNTIME .block_on(async move { time::timeout(Duration::from_millis(timeout_ms), response_rx).await }) - .map_err(|_| PolicyWaitError::Timeout)? - .map_err(|_| PolicyWaitError::Disconnected) + .map_err(|_| WorkerWaitError::Timeout)? + .map_err(|_| WorkerWaitError::Disconnected) } #[pymodule] diff --git a/test/integration/test_native_core_maturin.py b/test/integration/test_native_core_maturin.py index a39eaf8..ee09135 100644 --- a/test/integration/test_native_core_maturin.py +++ b/test/integration/test_native_core_maturin.py @@ -22,7 +22,9 @@ def test_maturin_develop_exposes_runtime_client() -> None: "rust/aa-ffi-python/Cargo.toml", "--release", ] - subprocess.run(command, check=True) + env = os.environ.copy() + env.setdefault("PYO3_USE_ABI3_FORWARD_COMPATIBILITY", "1") + subprocess.run(command, check=True, env=env) from agent_assembly._core import RuntimeClient diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py index dcd5384..589f502 100644 --- a/test/integration/test_native_core_runtime.py +++ b/test/integration/test_native_core_runtime.py @@ -2,13 +2,137 @@ import gc import os +import socket +import tempfile import threading import time import tracemalloc +from pathlib import Path import pytest +class MockRuntimeServer: + def __init__(self, *, policy_delay_ms: int = 0) -> None: + self._policy_delay_ms = policy_delay_ms + self._stop = threading.Event() + self._ready = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + self._socket_dir = tempfile.TemporaryDirectory(prefix="aaasm55-") + self.socket_path = str(Path(self._socket_dir.name) / "runtime.sock") + + def start(self) -> None: + self._thread.start() + if not self._ready.wait(timeout=5): + raise RuntimeError("mock runtime server did not start") + + def close(self) -> None: + self._stop.set() + self._thread.join(timeout=5) + self._socket_dir.cleanup() + + def _run(self) -> None: + if os.path.exists(self.socket_path): + os.remove(self.socket_path) + + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server: + server.bind(self.socket_path) + server.listen(1) + self._ready.set() + + conn, _ = server.accept() + with conn: + conn.settimeout(0.2) + + # Heartbeat frame from client is a single tag byte. + tag = self._read_u8(conn) + if tag != 4: + return + self._write_frame(conn, 3, b"") + + while not self._stop.is_set(): + try: + tag = self._read_u8(conn) + except (TimeoutError, OSError): + continue + + if tag in (1, 2, 3): + _ = self._read_length_delimited(conn) + + if tag == 1: + if self._policy_delay_ms > 0: + time.sleep(self._policy_delay_ms / 1000.0) + # CheckActionResponse { decision: ALLOW } protobuf payload + try: + self._write_frame(conn, 1, b"\x08\x01") + except OSError: + return + elif tag == 2: + try: + self._write_frame(conn, 3, b"") + except OSError: + return + elif tag == 3: + try: + self._write_frame(conn, 3, b"") + except OSError: + return + else: + return + + @staticmethod + def _read_u8(conn: socket.socket) -> int: + chunk = conn.recv(1) + if not chunk: + raise OSError("socket closed") + return chunk[0] + + @staticmethod + def _read_varint(conn: socket.socket) -> int: + result = 0 + shift = 0 + while True: + byte = MockRuntimeServer._read_u8(conn) + result |= (byte & 0x7F) << shift + if byte & 0x80 == 0: + return result + shift += 7 + if shift >= 64: + raise ValueError("varint too long") + + @staticmethod + def _read_exact(conn: socket.socket, size: int) -> bytes: + data = b"" + while len(data) < size: + chunk = conn.recv(size - len(data)) + if not chunk: + raise OSError("socket closed") + data += chunk + return data + + @classmethod + def _read_length_delimited(cls, conn: socket.socket) -> bytes: + size = cls._read_varint(conn) + return cls._read_exact(conn, size) + + @staticmethod + def _write_varint(conn: socket.socket, value: int) -> None: + while True: + byte = value & 0x7F + value >>= 7 + if value == 0: + conn.sendall(bytes([byte])) + return + conn.sendall(bytes([byte | 0x80])) + + @classmethod + def _write_frame(cls, conn: socket.socket, tag: int, payload: bytes) -> None: + conn.sendall(bytes([tag])) + cls._write_varint(conn, len(payload)) + if payload: + conn.sendall(payload) + + @pytest.fixture() def native_core(): if os.getenv("AAASM_RUN_NATIVE_CORE_TESTS") != "1": @@ -18,7 +142,10 @@ def native_core(): @pytest.mark.integration def test_send_event_is_non_blocking(native_core) -> None: - client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + server = MockRuntimeServer() + server.start() + + client = native_core.RuntimeClient.connect(server.socket_path) try: start = time.perf_counter() for index in range(500): @@ -27,27 +154,43 @@ def test_send_event_is_non_blocking(native_core) -> None: assert elapsed_ms < 50.0 finally: client.close() + server.close() @pytest.mark.integration def test_query_policy_returns_quickly_and_times_out(native_core) -> None: - client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + fast_server = MockRuntimeServer(policy_delay_ms=0) + fast_server.start() + + fast_client = native_core.RuntimeClient.connect(fast_server.socket_path) try: start = time.perf_counter() - result = client.query_policy({"action": "tool.call", "timeout_ms": 50}) + result = fast_client.query_policy({"action": "tool.call", "timeout_ms": 50}) elapsed_ms = (time.perf_counter() - start) * 1000.0 assert elapsed_ms < 50.0 assert result.allowed is True + finally: + fast_client.close() + fast_server.close() + slow_server = MockRuntimeServer(policy_delay_ms=200) + slow_server.start() + + slow_client = native_core.RuntimeClient.connect(slow_server.socket_path) + try: with pytest.raises(native_core.PolicyTimeoutError): - client.query_policy({"action": "slow.call", "delay_ms": 200, "timeout_ms": 10}) + slow_client.query_policy({"action": "slow.call", "timeout_ms": 10}) finally: - client.close() + slow_client.close() + slow_server.close() @pytest.mark.integration def test_runtime_client_has_no_thread_deadlock(native_core) -> None: - client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + server = MockRuntimeServer() + server.start() + + client = native_core.RuntimeClient.connect(server.socket_path) errors: list[Exception] = [] def worker(worker_id: int) -> None: @@ -69,11 +212,15 @@ def worker(worker_id: int) -> None: assert errors == [] finally: client.close() + server.close() @pytest.mark.integration def test_runtime_client_tracemalloc_leak_guard(native_core) -> None: - client = native_core.RuntimeClient.connect("/tmp/aaasm55.sock") + server = MockRuntimeServer() + server.start() + + client = native_core.RuntimeClient.connect(server.socket_path) tracemalloc.start() baseline_current, _ = tracemalloc.get_traced_memory() try: @@ -85,3 +232,4 @@ def test_runtime_client_tracemalloc_leak_guard(native_core) -> None: finally: tracemalloc.stop() client.close() + server.close() From 27c542ebecf42146c000e078661f60ca6e6a09c9 Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Thu, 30 Apr 2026 09:13:57 +0800 Subject: [PATCH 27/28] =?UTF-8?q?=E2=9C=A8=20(ffi):=20validate=20Governanc?= =?UTF-8?q?eEvent=20as=20aa-core=20AuditEntry=20payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust/aa-ffi-python/Cargo.toml | 1 + rust/aa-ffi-python/src/lib.rs | 37 ++++++++++++++++++-- test/integration/test_native_core_runtime.py | 27 +++++++++++--- 3 files changed, 58 insertions(+), 7 deletions(-) diff --git a/rust/aa-ffi-python/Cargo.toml b/rust/aa-ffi-python/Cargo.toml index 948d89c..fe93a98 100644 --- a/rust/aa-ffi-python/Cargo.toml +++ b/rust/aa-ffi-python/Cargo.toml @@ -10,6 +10,7 @@ name = "aa_ffi_python" crate-type = ["cdylib"] [dependencies] +aa-core = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", package = "aa-core", features = ["serde"] } aa-proto = { git = "https://github.com/AI-agent-assembly/agent-assembly.git", package = "aa-proto" } once_cell = "1.20" prost = "0.13" diff --git a/rust/aa-ffi-python/src/lib.rs b/rust/aa-ffi-python/src/lib.rs index 5eb6ada..5ae2616 100644 --- a/rust/aa-ffi-python/src/lib.rs +++ b/rust/aa-ffi-python/src/lib.rs @@ -1,11 +1,13 @@ //! aa-ffi-python crate bootstrap. +use aa_core::AuditEntry; use aa_proto::assembly::audit::v1::AuditEvent; use aa_proto::assembly::common::v1::Decision; use aa_proto::assembly::policy::v1::CheckActionRequest; use aa_proto::assembly::policy::v1::CheckActionResponse; use once_cell::sync::Lazy; use prost::Message; +use pyo3::exceptions::PyValueError; use pyo3::exceptions::PyRuntimeError; use pyo3::prelude::*; use pyo3::types::PyDict; @@ -41,13 +43,22 @@ const TAG_ACK: u8 = 3; struct GovernanceEvent { #[pyo3(get)] payload_json: String, + audit_entry: AuditEntry, } #[pymethods] impl GovernanceEvent { #[new] - fn new(payload_json: String) -> Self { - Self { payload_json } + fn new(payload_json: String) -> PyResult { + let audit_entry = serde_json::from_str::(&payload_json).map_err(|error| { + PyValueError::new_err(format!( + "GovernanceEvent payload must be serialized aa_core::AuditEntry JSON: {error}" + )) + })?; + Ok(Self { + payload_json, + audit_entry, + }) } } @@ -273,12 +284,22 @@ async fn send_event_frame(writer: &mut W, event: &GovernanceEvent) -> Result< where W: AsyncWrite + Unpin, { + let entry = &event.audit_entry; + let event_type = format!("{:?}", entry.event_type()); + let agent_id_hex = bytes_to_hex(entry.agent_id().as_bytes()); + let session_id_hex = bytes_to_hex(entry.session_id().as_bytes()); let audit_event = AuditEvent { event_id: make_event_id(), trace_id: "python-sdk".to_string(), span_id: "ffi-send-event".to_string(), decision: Decision::Allow as i32, - labels: std::collections::HashMap::from([(String::from("payload_json"), event.payload_json.clone())]), + labels: std::collections::HashMap::from([ + (String::from("payload_json"), event.payload_json.clone()), + (String::from("event_type"), event_type), + (String::from("agent_id_hex"), agent_id_hex), + (String::from("session_id_hex"), session_id_hex), + (String::from("payload"), entry.payload().to_string()), + ]), ..Default::default() }; let payload = audit_event.encode_to_vec(); @@ -382,6 +403,16 @@ fn make_event_id() -> String { format!("py-{}-{}", now.as_secs(), now.subsec_nanos()) } +fn bytes_to_hex(bytes: &[u8; 16]) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let mut result = String::with_capacity(bytes.len() * 2); + for byte in bytes { + result.push(HEX[(byte >> 4) as usize] as char); + result.push(HEX[(byte & 0x0F) as usize] as char); + } + result +} + enum RuntimeResponse { Ack, PolicyResponse(Vec), diff --git a/test/integration/test_native_core_runtime.py b/test/integration/test_native_core_runtime.py index 589f502..114e683 100644 --- a/test/integration/test_native_core_runtime.py +++ b/test/integration/test_native_core_runtime.py @@ -1,6 +1,7 @@ from __future__ import annotations import gc +import json import os import socket import tempfile @@ -133,6 +134,21 @@ def _write_frame(cls, conn: socket.socket, tag: int, payload: bytes) -> None: conn.sendall(payload) +def make_audit_entry_payload(index: int, *, worker_id: int = 0) -> str: + return json.dumps( + { + "seq": index, + "timestamp_ns": 1_700_000_000_000_000_000 + index, + "event_type": "ToolCallIntercepted", + "agent_id": [worker_id % 255] * 16, + "session_id": [index % 255] * 16, + "payload": json.dumps({"index": index, "worker": worker_id}), + "previous_hash": [0] * 32, + "entry_hash": [0] * 32, + } + ) + + @pytest.fixture() def native_core(): if os.getenv("AAASM_RUN_NATIVE_CORE_TESTS") != "1": @@ -147,9 +163,10 @@ def test_send_event_is_non_blocking(native_core) -> None: client = native_core.RuntimeClient.connect(server.socket_path) try: + events = [native_core.GovernanceEvent(make_audit_entry_payload(index)) for index in range(500)] start = time.perf_counter() - for index in range(500): - client.send_event(native_core.GovernanceEvent(f'{{"index": {index}}}')) + for event in events: + client.send_event(event) elapsed_ms = (time.perf_counter() - start) * 1000.0 assert elapsed_ms < 50.0 finally: @@ -196,7 +213,9 @@ def test_runtime_client_has_no_thread_deadlock(native_core) -> None: def worker(worker_id: int) -> None: try: for index in range(100): - client.send_event(native_core.GovernanceEvent(f'{{"worker": {worker_id}, "idx": {index}}}')) + client.send_event( + native_core.GovernanceEvent(make_audit_entry_payload(index, worker_id=worker_id)) + ) client.query_policy({"action": "tool.call", "timeout_ms": 50}) except Exception as error: # pragma: no cover - runtime guard errors.append(error) @@ -225,7 +244,7 @@ def test_runtime_client_tracemalloc_leak_guard(native_core) -> None: baseline_current, _ = tracemalloc.get_traced_memory() try: for index in range(10_000): - client.send_event(native_core.GovernanceEvent(f'{{"index": {index}}}')) + client.send_event(native_core.GovernanceEvent(make_audit_entry_payload(index))) gc.collect() current, _ = tracemalloc.get_traced_memory() assert current - baseline_current < 1_000_000 From ba5f0c536faeb92851f8e5587996e620673790ce Mon Sep 17 00:00:00 2001 From: Chisanan232 Date: Thu, 30 Apr 2026 09:14:27 +0800 Subject: [PATCH 28/28] =?UTF-8?q?=F0=9F=A7=AA=20(tests):=20cover=20optiona?= =?UTF-8?q?l=20native=20exports=20in=20package=20init?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/unit/test_init_exports.py | 43 ++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 test/unit/test_init_exports.py diff --git a/test/unit/test_init_exports.py b/test/unit/test_init_exports.py new file mode 100644 index 0000000..81aeb3a --- /dev/null +++ b/test/unit/test_init_exports.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import importlib +import sys +import types + + +def test_all_includes_native_core_symbols_when_extension_is_available() -> None: + fake_core = types.ModuleType("agent_assembly._core") + + class RuntimeClient: ... + + class GovernanceEvent: ... + + class PolicyResult: ... + + class PolicyTimeoutError(Exception): ... + + fake_core.RuntimeClient = RuntimeClient + fake_core.GovernanceEvent = GovernanceEvent + fake_core.PolicyResult = PolicyResult + fake_core.PolicyTimeoutError = PolicyTimeoutError + + original_package = sys.modules.pop("agent_assembly", None) + original_core = sys.modules.get("agent_assembly._core") + + try: + sys.modules["agent_assembly._core"] = fake_core + module = importlib.import_module("agent_assembly") + + assert "RuntimeClient" in module.__all__ + assert "GovernanceEvent" in module.__all__ + assert "PolicyResult" in module.__all__ + assert "PolicyTimeoutError" in module.__all__ + finally: + sys.modules.pop("agent_assembly", None) + if original_package is not None: + sys.modules["agent_assembly"] = original_package + + if original_core is None: + sys.modules.pop("agent_assembly._core", None) + else: + sys.modules["agent_assembly._core"] = original_core