Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
90e358c
📦 (adapters): Create empty CrewAI adapter package scaffold
Chisanan232 Apr 28, 2026
6d8c5ab
📦 (adapters): Add empty CrewAI patch module scaffold
Chisanan232 Apr 28, 2026
698c9fb
✨ (crewai): Add CrewAIPatch class skeleton
Chisanan232 Apr 28, 2026
13fe05d
✨ (crewai): Add apply shell for CrewAI patch lifecycle
Chisanan232 Apr 28, 2026
4c188a7
✨ (crewai): Add idempotent patch guard flags
Chisanan232 Apr 28, 2026
3b412ea
✨ (crewai): Add CrewAI class loader helpers
Chisanan232 Apr 28, 2026
9077f2c
✨ (crewai): Add thread-local agent context storage helper
Chisanan232 Apr 28, 2026
109ab25
✨ (crewai): Add thread-local agent ID getter
Chisanan232 Apr 28, 2026
f8f922a
✨ (crewai): Add blocked policy response string formatter
Chisanan232 Apr 28, 2026
2e04a7f
✨ (crewai): Add approval rejected response string formatter
Chisanan232 Apr 28, 2026
47f67fb
✨ (crewai): Add governance decision normalization helper
Chisanan232 Apr 28, 2026
1a71261
✨ (crewai): Add sync governance tool-check invocation helper
Chisanan232 Apr 28, 2026
03bd6b5
✨ (crewai): Add sync pending-approval wait helper
Chisanan232 Apr 28, 2026
8821e22
✨ (crewai): Patch BaseTool.run with governance pre-check flow
Chisanan232 Apr 28, 2026
6dba345
✨ (crewai): Return denied governance decisions as CrewAI strings
Chisanan232 Apr 28, 2026
ba4ea18
✨ (crewai): Record allowed tool results after run passthrough
Chisanan232 Apr 28, 2026
25dfc63
✨ (crewai): Patch Task.execute_sync with task-start hook
Chisanan232 Apr 28, 2026
e909518
✨ (crewai): Add task-complete hook recording for execute_sync
Chisanan232 Apr 28, 2026
2a28dc7
♻️ (crewai): Preserve original callable metadata in patched methods
Chisanan232 Apr 28, 2026
b11386f
♻️ (runtime): Apply CrewAIPatch during assembly runtime injection
Chisanan232 Apr 28, 2026
f61dc36
✅ (crewai): Add unit test for patch apply idempotency
Chisanan232 Apr 28, 2026
e42b308
✅ (crewai): Add unit test for blocked tool return-string contract
Chisanan232 Apr 28, 2026
559d4f0
✅ (crewai): Add unit test for allowed tool result recording
Chisanan232 Apr 28, 2026
ec36b32
✅ (crewai): Add unit test for pending approval allow path
Chisanan232 Apr 28, 2026
aefe158
✅ (crewai): Add unit test for pending approval timeout denial
Chisanan232 Apr 28, 2026
8a8ae51
✅ (crewai): Add unit test for task lifecycle audit events
Chisanan232 Apr 28, 2026
bd2e4af
✨ (crewai): Propagate task agent ID into thread-local context
Chisanan232 Apr 28, 2026
25d3ca8
✅ (crewai): Add unit test for thread-local agent ID concurrency
Chisanan232 Apr 28, 2026
8e10441
✅ (integration): Add CrewAI blocked-tool continuation scenario test
Chisanan232 Apr 28, 2026
8d0381f
♻️ (adapters): Export CrewAIPatch from crewai package
Chisanan232 Apr 28, 2026
2d8e9ea
📝 (docs): Add CrewAI runtime interception behavior notes
Chisanan232 Apr 28, 2026
e0f91ad
✅ (crewai): Add unit test for loader edge cases and apply false path
Chisanan232 Apr 28, 2026
e1a4238
✅ (crewai): Add unit test for decision and agent-ID helper branches
Chisanan232 Apr 28, 2026
6baa20c
✅ (crewai): Add unit test for result and task fallback callback paths
Chisanan232 Apr 28, 2026
3289efa
✨ (crewai): Make pending approval timeout configurable in runtime patch
Chisanan232 Apr 29, 2026
6f9798f
✅ (integration): Add real CrewAI class-path blocked tool flow test
Chisanan232 Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent_assembly/adapters/crewai/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""CrewAI adapter package."""

from agent_assembly.adapters.crewai.patch import CrewAIPatch

__all__ = ["CrewAIPatch"]
315 changes: 315 additions & 0 deletions agent_assembly/adapters/crewai/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
"""CrewAI patch module."""

from __future__ import annotations

from dataclasses import dataclass
from functools import wraps
import importlib
from threading import local
from typing import Any, Literal, Mapping

_TOOLS_PATCHED_FLAG = "_agent_assembly_crewai_tools_patched"
_TASK_PATCHED_FLAG = "_agent_assembly_crewai_task_patched"
_ORIGINAL_TOOL_RUN = "_agent_assembly_original_crewai_tool_run"
_ORIGINAL_TASK_EXECUTE_SYNC = "_agent_assembly_original_crewai_task_execute_sync"
_AGENT_CONTEXT = local()
_DEFAULT_PENDING_APPROVAL_TIMEOUT_SECONDS = 300


@dataclass(slots=True)
class CrewAIPatch:
"""Applies CrewAI runtime monkey-patching hooks."""

callback_handler: Any

def apply(self) -> bool:
"""Apply patch wiring and return whether CrewAI is available."""
base_tool_cls = _load_crewai_basetool_class()
if base_tool_cls is None:
return False

_apply_basetool_run_patch(base_tool_cls, self.callback_handler)
task_cls = _load_crewai_task_class()
if task_cls is not None:
_apply_task_execute_sync_patch(task_cls, self.callback_handler)
return True


def _load_crewai_basetool_class() -> type[Any] | None:
try:
module = importlib.import_module("crewai.tools")
except ImportError:
return None

base_tool_cls = getattr(module, "BaseTool", None)
if isinstance(base_tool_cls, type):
return base_tool_cls
return None


def _load_crewai_task_class() -> type[Any] | None:
try:
module = importlib.import_module("crewai")
except ImportError:
return None

task_cls = getattr(module, "Task", None)
if isinstance(task_cls, type):
return task_cls
return None


def _set_thread_local_agent_id(agent_id: str | None) -> None:
_AGENT_CONTEXT.agent_id = agent_id


def _get_thread_local_agent_id() -> str | None:
agent_id = getattr(_AGENT_CONTEXT, "agent_id", None)
if isinstance(agent_id, str) and agent_id:
return agent_id
return None


def _extract_agent_id_from_inputs(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str | None:

Check failure on line 73 in agent_assembly/adapters/crewai/patch.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=AI-agent-assembly_python-sdk&issues=AZ3WS6kiz0DJLEl1vsH1&open=AZ3WS6kiz0DJLEl1vsH1&pullRequest=9
direct_agent_id = kwargs.get("agent_id")
if isinstance(direct_agent_id, str) and direct_agent_id:
return direct_agent_id

config = kwargs.get("config")
if isinstance(config, dict):
configurable = config.get("configurable")
if isinstance(configurable, dict):
configurable_agent_id = configurable.get("agent_id")
if isinstance(configurable_agent_id, str) and configurable_agent_id:
return configurable_agent_id

metadata = config.get("metadata")
if isinstance(metadata, dict):
metadata_agent_id = metadata.get("agent_id")
if isinstance(metadata_agent_id, str) and metadata_agent_id:
return metadata_agent_id

if args and isinstance(args[0], dict):
state_agent_id = args[0].get("agent_id")
if isinstance(state_agent_id, str) and state_agent_id:
return state_agent_id

return None


def _format_blocked_message(reason: str | None) -> str:
reason_text = reason or "No reason provided."
return (
f"[BLOCKED by governance policy] {reason_text}. "
"Please choose a different approach to accomplish this task."
)


def _format_approval_rejected_message(reason: str | None) -> str:
reason_text = reason or "No reason provided."
return f"[APPROVAL REJECTED] Action was reviewed and denied: {reason_text}"


def _normalize_decision(
decision: object,
) -> tuple[Literal["allow", "deny", "pending"], str | None]:
if isinstance(decision, str):
normalized = decision.strip().lower()
if normalized == "deny":
return "deny", None
if normalized == "pending":
return "pending", None
return "allow", None

if isinstance(decision, Mapping):
raw_status = str(decision.get("status", "allow")).strip().lower()
if raw_status == "deny":
status: Literal["allow", "deny", "pending"] = "deny"
elif raw_status == "pending":
status = "pending"
else:
status = "allow"

reason_value = decision.get("reason")
reason = str(reason_value) if reason_value is not None else None
return status, reason

return "allow", None


def _invoke_sync_tool_check(
callback_handler: Any,
*,
tool_name: str,
tool_args: dict[str, Any],
agent_id: str | None,
) -> object:
method = getattr(callback_handler, "check_tool_start", None)
if callable(method):
return method(
serialized={"name": tool_name},
input_str=str(tool_args),
tool_name=tool_name,
args=tool_args,
agent_id=agent_id,
)

return {"status": "allow"}


def _wait_for_sync_tool_approval(
callback_handler: Any,
*,
tool_name: str,
timeout_seconds: int,
tool_args: dict[str, Any],
agent_id: str | None,
) -> object:
method = getattr(callback_handler, "wait_for_tool_approval", None)
if callable(method):
return method(
tool_name=tool_name,
timeout_seconds=timeout_seconds,
args=tool_args,
agent_id=agent_id,
)

return {"status": "deny", "reason": "Approval handler is unavailable."}


def _get_pending_tool_approval_timeout_seconds(callback_handler: Any) -> int:
provider = getattr(callback_handler, "get_pending_tool_approval_timeout_seconds", None)
if callable(provider):
configured = provider()
else:
configured = getattr(callback_handler, "pending_tool_approval_timeout_seconds", None)

if isinstance(configured, str):
stripped = configured.strip()
if stripped.isdigit():
parsed = int(stripped)
if parsed > 0:
return parsed
return _DEFAULT_PENDING_APPROVAL_TIMEOUT_SECONDS

if isinstance(configured, bool):
return _DEFAULT_PENDING_APPROVAL_TIMEOUT_SECONDS

if isinstance(configured, int) and configured > 0:
return configured

return _DEFAULT_PENDING_APPROVAL_TIMEOUT_SECONDS


def _record_sync_tool_result(
callback_handler: Any,
*,
tool_name: str,
result: object,
) -> None:
record_method = getattr(callback_handler, "record_result", None)
if callable(record_method):
record_method(tool_name=tool_name, result=result)
return None

tool_end_method = getattr(callback_handler, "on_tool_end", None)
if callable(tool_end_method):
tool_end_method(output=result, tool_name=tool_name)
return None


def _apply_basetool_run_patch(base_tool_cls: type[Any], callback_handler: Any) -> None:
if getattr(base_tool_cls, _TOOLS_PATCHED_FLAG, False):
return None

original_run = base_tool_cls.run

@wraps(original_run)
def patched_run(self: Any, *args: Any, **kwargs: Any) -> Any:
tool_name = getattr(self, "name", self.__class__.__name__)
tool_args = dict(kwargs)
agent_id = _get_thread_local_agent_id()
decision = _invoke_sync_tool_check(
callback_handler,
tool_name=str(tool_name),
tool_args=tool_args,
agent_id=agent_id,
)
status, reason = _normalize_decision(decision)
is_pending_flow = False
if status == "pending":
is_pending_flow = True
timeout_seconds = _get_pending_tool_approval_timeout_seconds(callback_handler)
final_decision = _wait_for_sync_tool_approval(
callback_handler,
tool_name=str(tool_name),
timeout_seconds=timeout_seconds,
tool_args=tool_args,
agent_id=agent_id,
)
status, reason = _normalize_decision(final_decision)

if status == "deny":
if is_pending_flow:
return _format_approval_rejected_message(reason)
return _format_blocked_message(reason)

result = original_run(self, *args, **kwargs)
_record_sync_tool_result(callback_handler, tool_name=str(tool_name), result=result)
return result

setattr(base_tool_cls, _ORIGINAL_TOOL_RUN, original_run)
setattr(base_tool_cls, "run", patched_run)
setattr(base_tool_cls, _TOOLS_PATCHED_FLAG, True)


def _record_task_start(callback_handler: Any, task: Any) -> None:
method = getattr(callback_handler, "record", None)
if callable(method):
method(
action="task_start",
task_description=str(getattr(task, "description", ""))[:200],
expected_output=getattr(task, "expected_output", None),
)
return None

fallback = getattr(callback_handler, "on_task_start", None)
if callable(fallback):
fallback(task=task)
return None


def _record_task_complete(callback_handler: Any, result: object) -> None:
method = getattr(callback_handler, "record", None)
if callable(method):
method(action="task_complete", output_preview=str(result)[:500])
return None

fallback = getattr(callback_handler, "on_task_complete", None)
if callable(fallback):
fallback(result=result)
return None


def _apply_task_execute_sync_patch(task_cls: type[Any], callback_handler: Any) -> None:
if getattr(task_cls, _TASK_PATCHED_FLAG, False):
return None

original_execute_sync = task_cls.execute_sync

@wraps(original_execute_sync)
def patched_execute_sync(self: Any, *args: Any, **kwargs: Any) -> Any:
previous_agent_id = _get_thread_local_agent_id()
_set_thread_local_agent_id(_extract_agent_id_from_inputs(args, kwargs))
_record_task_start(callback_handler, self)
try:
result = original_execute_sync(self, *args, **kwargs)
finally:
_set_thread_local_agent_id(previous_agent_id)

_record_task_complete(callback_handler, result)
return result

setattr(task_cls, _ORIGINAL_TASK_EXECUTE_SYNC, original_execute_sync)
setattr(task_cls, "execute_sync", patched_execute_sync)
setattr(task_cls, _TASK_PATCHED_FLAG, True)
3 changes: 3 additions & 0 deletions agent_assembly/adapters/langchain/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from threading import Lock
from typing import Any

from agent_assembly.adapters.crewai.patch import CrewAIPatch
from agent_assembly.adapters.langchain.callback_handler import AssemblyCallbackHandler
from agent_assembly.adapters.langgraph import LangGraphPatch

Expand All @@ -19,11 +20,13 @@ def auto_inject_callback_handler(interceptor: Any) -> AssemblyCallbackHandler:
with _RUNTIME_LOCK:
if _ACTIVE_CALLBACK_HANDLER is not None:
LangGraphPatch(_ACTIVE_CALLBACK_HANDLER).apply()
CrewAIPatch(interceptor).apply()
return _ACTIVE_CALLBACK_HANDLER

handler = AssemblyCallbackHandler(interceptor)
_ACTIVE_CALLBACK_HANDLER = handler
LangGraphPatch(handler).apply()
CrewAIPatch(interceptor).apply()
return handler


Expand Down
9 changes: 9 additions & 0 deletions docs/contents/document/api-references/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ from agent_assembly.exceptions import (
- `deny` (or unresolved `pending`) raises `ToolExecutionBlockedError`.
- LLM start interception is scan-only and does not mutate prompt content.
- LangGraph `StateGraph.compile()` is patched to add pre/post invocation governance hooks.

## CrewAI runtime interception

`init_assembly(...)` also applies CrewAI runtime patches when CrewAI is installed.

- `BaseTool.run()` is patched with synchronous governance checks.
- Blocked tool calls return policy message strings (instead of raising exceptions).
- `pending` approval decisions block synchronously and return denial strings if not approved.
- `Task.execute_sync()` emits task start/complete audit events.
Loading