Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
25da489
✨ (tests): Create OpenAI Agents adapter unit and integration test pac…
Chisanan232 Apr 30, 2026
abef605
✅ (tests): Add initial failing coverage for FunctionTool call patchin…
Chisanan232 Apr 30, 2026
16fe816
✨ (openai_agents): Add patch state constants and process agent contex…
Chisanan232 Apr 30, 2026
8c0ec2f
✨ (openai_agents): Add FunctionTool class loader helper
Chisanan232 Apr 30, 2026
838d2c5
✨ (openai_agents): Add ctx-based agent_id resolver with process fallback
Chisanan232 Apr 30, 2026
74cc5b7
✨ (openai_agents): Add governance decision normalization helper
Chisanan232 Apr 30, 2026
a4134af
✨ (openai_agents): Add async governance pre-check helper for Function…
Chisanan232 Apr 30, 2026
f617b59
✨ (openai_agents): Add pending approval wait helper with timeout reso…
Chisanan232 Apr 30, 2026
95f56ce
✨ (openai_agents): Add ToolResult error-construction helper for deny …
Chisanan232 Apr 30, 2026
4c8cdd6
✨ (openai_agents): Add async audit result recording helper for tool c…
Chisanan232 Apr 30, 2026
9fd1696
✨ (openai_agents): Add governance error classifier helper for fail-op…
Chisanan232 Apr 30, 2026
f9d36d6
✨ (openai_agents): Implement FunctionTool __call__ patch apply and re…
Chisanan232 Apr 30, 2026
7c04c6f
✨ (assembly): Pass process_agent_id into OpenAIAgentsPatch wiring
Chisanan232 Apr 30, 2026
cd3320a
✅ (tests): Add comprehensive OpenAI Agents patch unit coverage and re…
Chisanan232 Apr 30, 2026
d94da17
✅ (integration): Add direct and MCP-coexistence coverage for OpenAI A…
Chisanan232 Apr 30, 2026
cdd0071
🩹 (openai_agents): Fix async result recording fallback block placement
Chisanan232 Apr 30, 2026
67dc91c
🩹 (integration): Use shared import dispatcher for OpenAI and MCP coex…
Chisanan232 Apr 30, 2026
c0b6784
✅ (tests): Cover OpenAI adapter fallback branches to resolve patch co…
Chisanan232 Apr 30, 2026
5c09230
✅ (tests): Cover async on_tool_end fallback await branch for OpenAI a…
Chisanan232 Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 290 additions & 3 deletions agent_assembly/adapters/openai_agents/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,310 @@
from __future__ import annotations

from dataclasses import dataclass
from functools import wraps
import importlib
import importlib.util
from typing import Any
import inspect
from typing import Any, Literal

from agent_assembly.adapters.crewai.patch import (
_get_pending_tool_approval_timeout_seconds as _resolve_pending_timeout_seconds,
)
from agent_assembly.adapters.crewai.patch import _normalize_decision as _normalize_governance_decision

_ORIGINAL_FUNCTION_TOOL_CALL = "_agent_assembly_original_openai_agents_function_tool_call"
_PATCHED_FLAG = "_agent_assembly_openai_agents_function_tool_patched"
_PROCESS_AGENT_ID: str | None = None
_MAX_AUDIT_RESULT_CHARS = 2000


@dataclass(slots=True)
class OpenAIAgentsPatch:
"""Patch placeholder for OpenAI Agents SDK interception."""

callback_handler: Any
process_agent_id: str | None = None

def apply(self) -> bool:
_ = self.callback_handler
return _is_openai_agents_available()
set_process_agent_id(self.process_agent_id)
function_tool_cls = _load_openai_agents_function_tool_class()
if function_tool_cls is None:
return False
_apply_function_tool_call_patch(function_tool_cls, self.callback_handler)
return True

def revert(self) -> None:
function_tool_cls = _load_openai_agents_function_tool_class()
if function_tool_cls is not None:
_revert_function_tool_call_patch(function_tool_cls)
set_process_agent_id(None)
return None


def _is_openai_agents_available() -> bool:
return importlib.util.find_spec("openai.agents") is not None

Check failure on line 47 in agent_assembly/adapters/openai_agents/patch.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "openai.agents" 3 times.

See more on https://sonarcloud.io/project/issues?id=AI-agent-assembly_python-sdk&issues=AZ3d6UBXtXZ2iDxjPtKw&open=AZ3d6UBXtXZ2iDxjPtKw&pullRequest=15


def set_process_agent_id(agent_id: str | None) -> None:
global _PROCESS_AGENT_ID
_PROCESS_AGENT_ID = agent_id


def _get_process_agent_id() -> str | None:
if isinstance(_PROCESS_AGENT_ID, str) and _PROCESS_AGENT_ID:
return _PROCESS_AGENT_ID
return None


def _load_openai_agents_function_tool_class() -> type[Any] | None:
try:
module = importlib.import_module("openai.agents")
except ImportError:
return None

function_tool_cls = getattr(module, "FunctionTool", None)
if isinstance(function_tool_cls, type):
return function_tool_cls
return None


def _resolve_agent_id(ctx: Any) -> str | None:
candidate = getattr(ctx, "agent_id", None)
if isinstance(candidate, str) and candidate:
return candidate
return _get_process_agent_id()


def _normalize_decision(
decision: object,
) -> tuple[Literal["allow", "deny", "pending"], str | None]:
return _normalize_governance_decision(decision)


def _resolve_governance_target(callback_handler: Any) -> Any:
target = getattr(callback_handler, "_interceptor", None)
if target is not None:
return target
return callback_handler


async def _invoke_async_tool_check(
callback_handler: Any,
*,
tool_name: str,
tool_input: Any,
agent_id: str | None,
ctx: Any,
) -> object:
target = _resolve_governance_target(callback_handler)
method = getattr(target, "check_tool_start", None)
if not callable(method):
return {"status": "allow"}

result = method(
serialized={"name": tool_name},
input_str=str(tool_input),
tool_name=tool_name,
args=tool_input,
agent_id=agent_id,
run_context=ctx,
)
if inspect.isawaitable(result):
return await result
return result


def _get_pending_tool_approval_timeout_seconds(callback_handler: Any) -> int:
return _resolve_pending_timeout_seconds(callback_handler)


async def _wait_for_async_tool_approval(
callback_handler: Any,
*,
tool_name: str,
timeout_seconds: int,
tool_input: Any,
agent_id: str | None,
ctx: Any,
) -> object:
target = _resolve_governance_target(callback_handler)
method = getattr(target, "wait_for_tool_approval", None)
if not callable(method):
return {"status": "deny", "reason": "Approval handler is unavailable."}

result = method(
serialized={"name": tool_name},
input_str=str(tool_input),
tool_name=tool_name,
timeout_seconds=timeout_seconds,
args=tool_input,
agent_id=agent_id,
run_context=ctx,
)
if inspect.isawaitable(result):
return await result
return result


def _build_tool_result_error(
*,
tool_name: str,
reason: str | None,
is_pending_rejection: bool,
) -> object:
try:
module = importlib.import_module("openai.agents")
except ImportError:
module = None

tool_result_cls = getattr(module, "ToolResult", None) if module is not None else None
reason_text = reason or "No reason provided."
if is_pending_rejection:
error_message = f"Approval denied for tool '{tool_name}': {reason_text}"
else:
error_message = f"Action blocked by governance policy for tool '{tool_name}': {reason_text}"

if isinstance(tool_result_cls, type):
try:
return tool_result_cls(error=error_message)
except Exception:
pass

return {"error": error_message}


def _truncate_result_for_audit(result: object) -> str:
return str(result)[:_MAX_AUDIT_RESULT_CHARS]


async def _record_async_tool_result(
callback_handler: Any,
*,
tool_name: str,
tool_input: Any,
result: object,
agent_id: str | None,
ctx: Any,
) -> None:
target = _resolve_governance_target(callback_handler)

record_method = getattr(target, "record_result", None)
if callable(record_method):
recorded = record_method(
tool_name=tool_name,
args=tool_input,
result=_truncate_result_for_audit(result),
agent_id=agent_id,
run_context=ctx,
)
if inspect.isawaitable(recorded):
await recorded
return None

tool_end_method = getattr(target, "on_tool_end", None)
if callable(tool_end_method):
recorded = tool_end_method(
output=_truncate_result_for_audit(result),
tool_name=tool_name,
agent_id=agent_id,
run_context=ctx,
)
if inspect.isawaitable(recorded):
await recorded


def _is_governance_error(error: Exception) -> bool:
del error
return True


def _apply_function_tool_call_patch(function_tool_cls: type[Any], callback_handler: Any) -> None:
if getattr(function_tool_cls, _PATCHED_FLAG, False):
return None

original_call = getattr(function_tool_cls, "__call__", None)
if not callable(original_call):
return None

@wraps(original_call)
async def patched_call(self: Any, ctx: Any, tool_input: Any, *args: Any, **kwargs: Any) -> Any:
tool_name = str(getattr(self, "name", self.__class__.__name__))
agent_id = _resolve_agent_id(ctx)

decision: object = {"status": "allow"}
governance_failed = False
try:
decision = await _invoke_async_tool_check(
callback_handler,
tool_name=tool_name,
tool_input=tool_input,
agent_id=agent_id,
ctx=ctx,
)
status, reason = _normalize_decision(decision)
is_pending_flow = False
if status == "pending":
is_pending_flow = True
timeout_seconds = _get_pending_tool_approval_timeout_seconds(callback_handler)
final_decision = await _wait_for_async_tool_approval(
callback_handler,
tool_name=tool_name,
timeout_seconds=timeout_seconds,
tool_input=tool_input,
agent_id=agent_id,
ctx=ctx,
)
status, reason = _normalize_decision(final_decision)

if status == "deny":
blocked_result = _build_tool_result_error(
tool_name=tool_name,
reason=reason,
is_pending_rejection=is_pending_flow,
)
await _record_async_tool_result(
callback_handler,
tool_name=tool_name,
tool_input=tool_input,
result=blocked_result,
agent_id=agent_id,
ctx=ctx,
)
return blocked_result
except Exception as error:
governance_failed = _is_governance_error(error)
if not governance_failed:
raise

result = original_call(self, ctx, tool_input, *args, **kwargs)
if inspect.isawaitable(result):
result = await result

if not governance_failed:
await _record_async_tool_result(
callback_handler,
tool_name=tool_name,
tool_input=tool_input,
result=result,
agent_id=agent_id,
ctx=ctx,
)
return result

setattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL, original_call)
setattr(function_tool_cls, "__call__", patched_call)
setattr(function_tool_cls, _PATCHED_FLAG, True)


def _revert_function_tool_call_patch(function_tool_cls: type[Any]) -> None:
if not getattr(function_tool_cls, _PATCHED_FLAG, False):
return None

original_call = getattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL, None)
if callable(original_call):
setattr(function_tool_cls, "__call__", original_call)

if hasattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL):
delattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL)
if hasattr(function_tool_cls, _PATCHED_FLAG):
delattr(function_tool_cls, _PATCHED_FLAG)
7 changes: 6 additions & 1 deletion agent_assembly/core/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ def _build_patch_plan(client: GatewayClient, process_agent_id: str) -> list[Runt
if _is_installed("pydantic_ai"):
patch_plan.append(PydanticAIPatch(callback_target))
if _is_installed("openai") and _has_agents_sdk():
patch_plan.append(OpenAIAgentsPatch(callback_target))
patch_plan.append(
OpenAIAgentsPatch(
callback_handler=callback_target,
process_agent_id=process_agent_id,
)
)
if _is_installed("mcp"):
# Keep MCP patch last as fallback for remaining tool dispatch paths.
patch_plan.append(
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from __future__ import annotations

from types import SimpleNamespace
from typing import Any

import pytest

from agent_assembly.adapters.openai_agents import patch as openai_patch


@pytest.mark.asyncio
@pytest.mark.integration
async def test_direct_openai_agents_functiontool_blocks_then_allows_followup(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class FakeToolResult:
def __init__(self, *, error: str | None = None, output: Any = None) -> None:
self.error = error
self.output = output

class FakeFunctionTool:
def __init__(self, name: str) -> None:
self.name = name

async def __call__(self, ctx: Any, tool_input: Any) -> dict[str, Any]:
return {"name": self.name, "ctx": ctx, "tool_input": tool_input}

fake_openai_agents_module = SimpleNamespace(
FunctionTool=FakeFunctionTool,
ToolResult=FakeToolResult,
)
monkeypatch.setattr(
openai_patch.importlib,
"import_module",
lambda name: fake_openai_agents_module
if name == "openai.agents"
else (_ for _ in ()).throw(ImportError(name)),
)

class Interceptor:
async def check_tool_start(self, **kwargs: object) -> dict[str, str]:
if kwargs.get("tool_name") == "blocked_tool":
return {"status": "deny", "reason": "blocked by policy"}
return {"status": "allow"}

patcher = openai_patch.OpenAIAgentsPatch(callback_handler=Interceptor(), process_agent_id="agent-oa")
assert patcher.apply() is True

blocked = await FakeFunctionTool("blocked_tool")(SimpleNamespace(agent_id="agent-oa"), {"step": 1})
safe = await FakeFunctionTool("safe_tool")(SimpleNamespace(agent_id="agent-oa"), {"step": 2})

assert isinstance(blocked, FakeToolResult)
assert isinstance(blocked.error, str)
assert "blocked by policy" in blocked.error
assert safe["name"] == "safe_tool"
Loading