From e58c8cf75dff079abdb44cdb63613cfca8207555 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Thu, 26 Mar 2026 15:01:31 +0530 Subject: [PATCH 1/4] fix(sdk): use sync shutdown flush fallback --- .../python/src/agent_control/observability.py | 163 +++++++++++++++--- sdks/python/tests/test_observability.py | 17 ++ sdks/python/tests/test_shutdown.py | 131 ++++++++++++++ 3 files changed, 284 insertions(+), 27 deletions(-) diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index a84499c2..1d95632a 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -48,6 +48,7 @@ import atexit import logging import threading +import time from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any @@ -388,29 +389,144 @@ def _stop_worker(self, *, graceful: bool, join_timeout: float) -> bool: self._thread = None return True - def _flush_all_without_worker(self, *, timeout: float) -> None: - """Flush remaining events in a helper thread when no worker loop is available.""" - flush_error: Exception | None = None + def _build_batch_request( + self, + events: list[ControlExecutionEvent], + ) -> tuple[str, dict[str, str], dict[str, Any]]: + """Build request components shared by async and sync send paths.""" + url = f"{self.server_url}/api/v1/observability/events" + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["X-API-Key"] = self.api_key + payload = {"events": [event.model_dump(mode="json") for event in events]} + return url, headers, payload + + def _send_batch_sync( + self, + events: list[ControlExecutionEvent], + *, + deadline: float | None = None, + ) -> bool: + """Send a batch synchronously for shutdown/atexit fallback.""" + url, headers, payload = self._build_batch_request(events) + + for attempt in range(get_settings().max_retries): + remaining: float | None = None + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0: + logger.warning("Fallback shutdown flush timed out before sending events") + return False + + request_timeout = 30.0 + if remaining is not None: + request_timeout = max(0.001, min(request_timeout, remaining)) - def run_flush() -> None: - nonlocal flush_error try: - asyncio.run(self.flush_all()) + with httpx.Client(timeout=request_timeout) as client: + response = client.post(url, json=payload, headers=headers) + + if response.status_code == 202: + return True + if response.status_code == 401: + logger.error("Authentication failed - check API key") + return False + + logger.warning( + "Server returned %s during shutdown flush: %s", + response.status_code, + response.text, + ) + except httpx.TimeoutException: + logger.warning("Timeout sending events during shutdown (attempt %d)", attempt + 1) + except httpx.ConnectError: + logger.warning( + "Connection error sending events during shutdown (attempt %d)", + attempt + 1, + ) except Exception as e: - flush_error = e + logger.error("Error sending events during shutdown: %s", e) - helper_thread = threading.Thread( - target=run_flush, - name="agent-control-event-batcher-shutdown-flush", - daemon=True, - ) - helper_thread.start() - helper_thread.join(timeout=timeout) - if helper_thread.is_alive(): - logger.warning("Fallback shutdown flush timed out after %.1f seconds", timeout) - return - if flush_error is not None: - logger.error("Error during fallback shutdown flush: %s", flush_error) + if attempt >= get_settings().max_retries - 1: + continue + + retry_delay = get_settings().retry_delay * (attempt + 1) + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0: + logger.warning("Fallback shutdown flush timed out during retry backoff") + return False + retry_delay = min(retry_delay, remaining) + + if retry_delay > 0: + time.sleep(retry_delay) + + return False + + def _flush_sync(self, *, deadline: float | None = None) -> bool: + """Flush a batch synchronously without relying on asyncio state.""" + with self._lock: + if not self._events: + return True + events_to_send = self._events[:self.batch_size] + self._events = self._events[self.batch_size:] + + if not events_to_send: + return True + + success = self._send_batch_sync(events_to_send, deadline=deadline) + + if success: + with self._lock: + self._events_sent += len(events_to_send) + self._flush_count += 1 + total_sent = self._events_sent + logger.debug( + f"Flushed {len(events_to_send)} events " + f"(total sent: {total_sent})" + ) + return True + + with self._lock: + self._events = events_to_send + self._events + logger.warning(f"Failed to send batch, re-queued {len(events_to_send)} events") + return False + + def _flush_all_without_worker(self, *, timeout: float) -> None: + """Flush remaining events synchronously when no worker loop is available.""" + deadline = time.monotonic() + max(timeout, 0.0) + consecutive_failures = 0 + + while True: + with self._lock: + if not self._events: + break + + if time.monotonic() >= deadline: + logger.warning("Fallback shutdown flush timed out after %.1f seconds", timeout) + break + + flushed = self._flush_sync(deadline=deadline) + if flushed: + consecutive_failures = 0 + continue + + consecutive_failures += 1 + if consecutive_failures >= self.shutdown_max_failed_flushes: + with self._lock: + pending = len(self._events) + logger.warning( + "Stopping sync shutdown flush after %d consecutive failed flushes; " + "%d event(s) pending", + consecutive_failures, + pending, + ) + break + + # The worker thread normally closes the async client. If shutdown races or + # interpreter teardown prevented that path from completing, drop the reference + # so the SDK does not retain a half-closed async client after shutdown. + self._client = None def stop(self) -> None: """Stop the worker thread. Does not flush remaining events.""" @@ -543,14 +659,7 @@ async def _send_batch(self, events: list[ControlExecutionEvent]) -> bool: Returns: True if sent successfully, False otherwise """ - url = f"{self.server_url}/api/v1/observability/events" - headers = {"Content-Type": "application/json"} - if self.api_key: - headers["X-API-Key"] = self.api_key - - payload = { - "events": [event.model_dump(mode="json") for event in events] - } + url, headers, payload = self._build_batch_request(events) client = await self._get_client() diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 16961eed..6c9d8c4e 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -219,6 +219,23 @@ def test_shutdown_flushes_when_worker_not_running(self): assert batcher._events_dropped == 0 assert batcher._thread is None + def test_shutdown_uses_sync_fallback_when_worker_not_running(self): + """Shutdown should use the sync fallback path without relying on asyncio.""" + batcher = EventBatcher(batch_size=100, flush_interval=60.0) + + for _ in range(5): + batcher.add_event(create_mock_event()) + + batcher._client = AsyncMock() + + with patch.object(batcher, "_send_batch_sync", return_value=True) as send_batch_sync: + batcher.shutdown() + + send_batch_sync.assert_called_once() + assert batcher._events_sent == 5 + assert len(batcher._events) == 0 + assert batcher._client is None + def test_shutdown_drains_inflight_flush_without_data_loss(self): """Test that shutdown waits for in-flight flushes and sends all events.""" import time diff --git a/sdks/python/tests/test_shutdown.py b/sdks/python/tests/test_shutdown.py index 77a2e3ed..073745ed 100644 --- a/sdks/python/tests/test_shutdown.py +++ b/sdks/python/tests/test_shutdown.py @@ -1,6 +1,10 @@ """Tests for agent_control.shutdown() and agent_control.ashutdown().""" import asyncio +import os +import subprocess +import sys +import textwrap import threading from typing import Any from unittest.mock import AsyncMock, MagicMock, patch @@ -259,3 +263,130 @@ async def slow_list_agent_controls(*args: Any, **kwargs: Any) -> dict[str, Any]: assert refreshed_snapshot is None assert state.server_controls is None + + +class TestAtexitShutdownFallback: + """Regression tests for short-lived processes relying on atexit shutdown.""" + + @pytest.mark.asyncio + async def test_short_lived_process_flushes_sdk_events_on_exit( + self, + client: agent_control.AgentControlClient, + test_agent: dict[str, Any], + unique_name: str, + server_url: str, + api_key: str | None, + tmp_path, + ) -> None: + """SDK-evaluated observability events should survive process exit.""" + control_name = f"sdk-shutdown-{unique_name}" + control = await agent_control.controls.create_control( + client, + control_name, + { + "description": "Flush observability on short-lived script shutdown", + "enabled": True, + "execution": "sdk", + "scope": {"stages": ["post"]}, + "condition": { + "selector": {"path": "output"}, + "evaluator": { + "name": "regex", + "config": {"pattern": r"\b123-45-6789\b"}, + }, + }, + "action": {"decision": "deny"}, + }, + ) + + await agent_control.agents.add_agent_control( + client, + agent_name=test_agent["agent_name"], + control_id=control["control_id"], + ) + + script_path = tmp_path / "short_lived_agent.py" + script_path.write_text( + textwrap.dedent( + f""" + import asyncio + import agent_control + from agent_control import ControlViolationError, control + + + @control() + async def chat(message: str) -> str: + return "SSN 123-45-6789" + + + agent_control.init( + agent_name={test_agent["agent_name"]!r}, + agent_description="atexit shutdown flush regression", + observability_enabled=True, + policy_refresh_interval_seconds=0, + ) + + + async def main() -> None: + try: + await chat("test") + except ControlViolationError: + pass + + + if __name__ == "__main__": + asyncio.run(main()) + """ + ).strip() + + "\n", + encoding="utf-8", + ) + + env = os.environ.copy() + env["AGENT_CONTROL_URL"] = server_url + env["AGENT_CONTROL_OBSERVABILITY_ENABLED"] = "true" + if api_key: + env["AGENT_CONTROL_API_KEY"] = api_key + else: + env.pop("AGENT_CONTROL_API_KEY", None) + + result = subprocess.run( + [sys.executable, str(script_path)], + cwd=tmp_path, + env=env, + capture_output=True, + text=True, + timeout=30, + check=False, + ) + + assert result.returncode == 0, ( + f"stdout:\n{result.stdout}\n\nstderr:\n{result.stderr}" + ) + + events: list[dict[str, Any]] = [] + for _ in range(20): + response = await client.http_client.post( + "/api/v1/observability/events/query", + json={ + "agent_name": test_agent["agent_name"], + "control_ids": [control["control_id"]], + "limit": 10, + "offset": 0, + }, + ) + response.raise_for_status() + events = response.json()["events"] + if events: + break + await asyncio.sleep(0.1) + + assert events, ( + "Expected at least one observability event from the short-lived process.\n" + f"stdout:\n{result.stdout}\n\nstderr:\n{result.stderr}" + ) + assert events[0]["control_id"] == control["control_id"] + assert events[0]["control_name"] == control_name + assert events[0]["check_stage"] == "post" + assert events[0]["matched"] is True + assert events[0]["action"] == "deny" From cbd06a2cef8afd8723f737aa6d56b40608125d30 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Thu, 26 Mar 2026 15:38:36 +0530 Subject: [PATCH 2/4] chore: clarify shutdown fallback intent --- sdks/python/src/agent_control/observability.py | 11 +++-------- sdks/python/tests/test_observability.py | 5 +++-- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 1d95632a..f436fc8b 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -471,9 +471,6 @@ def _flush_sync(self, *, deadline: float | None = None) -> bool: events_to_send = self._events[:self.batch_size] self._events = self._events[self.batch_size:] - if not events_to_send: - return True - success = self._send_batch_sync(events_to_send, deadline=deadline) if success: @@ -524,8 +521,9 @@ def _flush_all_without_worker(self, *, timeout: float) -> None: break # The worker thread normally closes the async client. If shutdown races or - # interpreter teardown prevented that path from completing, drop the reference - # so the SDK does not retain a half-closed async client after shutdown. + # interpreter teardown prevented that path from completing, we intentionally + # do not try to aclose() here because this fallback may be running after + # asyncio teardown. Drop the reference so shutdown can finish deterministically. self._client = None def stop(self) -> None: @@ -622,9 +620,6 @@ async def _flush(self) -> bool: events_to_send = self._events[:self.batch_size] self._events = self._events[self.batch_size:] - if not events_to_send: - return True - success = await self._send_batch(events_to_send) if success: diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 6c9d8c4e..60794217 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -207,12 +207,12 @@ def test_shutdown_flushes_and_joins_thread(self): def test_shutdown_flushes_when_worker_not_running(self): """Test that shutdown() still flushes when the worker thread is not running.""" batcher = EventBatcher(batch_size=100, flush_interval=60.0) - batcher._send_batch = AsyncMock(return_value=True) for _ in range(5): batcher.add_event(create_mock_event()) - batcher.shutdown() + with patch.object(batcher, "_send_batch_sync", return_value=True): + batcher.shutdown() assert batcher._events_sent == 5 assert len(batcher._events) == 0 @@ -234,6 +234,7 @@ def test_shutdown_uses_sync_fallback_when_worker_not_running(self): send_batch_sync.assert_called_once() assert batcher._events_sent == 5 assert len(batcher._events) == 0 + # The sync fallback only promises to drop the stale AsyncClient reference. assert batcher._client is None def test_shutdown_drains_inflight_flush_without_data_loss(self): From fc144800a9a95a63c6a60dc351a368f4df010576 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Thu, 26 Mar 2026 17:50:33 +0530 Subject: [PATCH 3/4] test: cover sync shutdown flush branches --- sdks/python/tests/test_observability.py | 183 ++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 60794217..961e83b6 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch from uuid import uuid4 +import httpx import pytest from agent_control.observability import ( @@ -448,6 +449,154 @@ async def test_send_batch_without_httpx(self): assert isinstance(result, bool) +class TestEventBatcherSendBatchSync: + """Tests for sync HTTP sending used during shutdown fallback.""" + + def test_send_batch_sync_returns_true_on_202(self): + batcher = EventBatcher(server_url="http://test:8000", api_key="test-key") + response = MagicMock(status_code=202, text="accepted") + client = MagicMock() + client.post.return_value = response + client_context = MagicMock() + client_context.__enter__.return_value = client + + with patch("agent_control.observability.httpx.Client", return_value=client_context) as client_ctor: + result = batcher._send_batch_sync([create_mock_event()]) + + assert result is True + client_ctor.assert_called_once_with(timeout=30.0) + client.post.assert_called_once() + + def test_send_batch_sync_returns_false_on_401_without_retry(self): + batcher = EventBatcher() + response = MagicMock(status_code=401, text="unauthorized") + client = MagicMock() + client.post.return_value = response + client_context = MagicMock() + client_context.__enter__.return_value = client + + with patch("agent_control.observability.httpx.Client", return_value=client_context) as client_ctor: + result = batcher._send_batch_sync([create_mock_event()]) + + assert result is False + assert client_ctor.call_count == 1 + client.post.assert_called_once() + + def test_send_batch_sync_retries_after_server_error_then_succeeds(self): + from agent_control.settings import configure_settings + + original = get_settings().model_dump() + configure_settings(max_retries=2, retry_delay=0.25) + batcher = EventBatcher() + + first = MagicMock(status_code=500, text="server error") + second = MagicMock(status_code=202, text="accepted") + client = MagicMock() + client.post.side_effect = [first, second] + client_context = MagicMock() + client_context.__enter__.return_value = client + + try: + with ( + patch( + "agent_control.observability.httpx.Client", + return_value=client_context, + ) as client_ctor, + patch("agent_control.observability.time.sleep") as sleep_mock, + ): + result = batcher._send_batch_sync([create_mock_event()]) + + assert result is True + assert client_ctor.call_count == 2 + sleep_mock.assert_called_once_with(0.25) + finally: + configure_settings(**original) + + def test_send_batch_sync_returns_false_when_deadline_already_expired(self): + batcher = EventBatcher() + + with ( + patch("agent_control.observability.httpx.Client") as client_ctor, + patch("agent_control.observability.time.monotonic", return_value=2.0), + ): + result = batcher._send_batch_sync([create_mock_event()], deadline=1.0) + + assert result is False + client_ctor.assert_not_called() + + def test_send_batch_sync_returns_false_when_retry_backoff_exceeds_deadline(self): + from agent_control.settings import configure_settings + + original = get_settings().model_dump() + configure_settings(max_retries=3, retry_delay=0.25) + batcher = EventBatcher() + + client = MagicMock() + client.post.side_effect = httpx.ConnectError("boom") + client_context = MagicMock() + client_context.__enter__.return_value = client + + try: + with ( + patch( + "agent_control.observability.httpx.Client", + return_value=client_context, + ) as client_ctor, + patch( + "agent_control.observability.time.monotonic", + side_effect=[0.0, 1.1], + ), + patch("agent_control.observability.time.sleep") as sleep_mock, + ): + result = batcher._send_batch_sync([create_mock_event()], deadline=1.0) + + assert result is False + assert client_ctor.call_count == 1 + sleep_mock.assert_not_called() + finally: + configure_settings(**original) + + def test_send_batch_sync_handles_timeout_exception(self): + from agent_control.settings import configure_settings + + original = get_settings().model_dump() + configure_settings(max_retries=1) + batcher = EventBatcher() + + client = MagicMock() + client.post.side_effect = httpx.TimeoutException("boom") + client_context = MagicMock() + client_context.__enter__.return_value = client + + try: + with patch("agent_control.observability.httpx.Client", return_value=client_context): + result = batcher._send_batch_sync([create_mock_event()]) + + assert result is False + finally: + configure_settings(**original) + + def test_send_batch_sync_handles_unexpected_exception(self): + from agent_control.settings import configure_settings + + original = get_settings().model_dump() + configure_settings(max_retries=1) + batcher = EventBatcher() + + client = MagicMock() + client.post.side_effect = RuntimeError("boom") + client_context = MagicMock() + client_context.__enter__.return_value = client + + try: + with patch("agent_control.observability.httpx.Client", return_value=client_context): + result = batcher._send_batch_sync([create_mock_event()]) + + assert result is False + finally: + configure_settings(**original) + + class TestGlobalBatcher: """Tests for global batcher functions.""" @@ -602,6 +751,40 @@ def test_shutdown_uses_settings_timeouts(self): finally: configure_settings(**original) + def test_sync_shutdown_flush_stops_after_failed_flush_limit(self): + """Test that sync shutdown fallback exits after configured failed flushes.""" + batcher = EventBatcher(batch_size=2) + batcher.shutdown_max_failed_flushes = 2 + batcher._client = AsyncMock() + for _ in range(3): + batcher.add_event(create_mock_event()) + + with patch.object(batcher, "_send_batch_sync", return_value=False) as send_batch_sync: + batcher._flush_all_without_worker(timeout=1.0) + + assert send_batch_sync.call_count == 2 + assert len(batcher._events) == 3 + assert batcher._client is None + + def test_sync_shutdown_flush_honors_timeout_before_first_attempt(self): + """Test that sync shutdown fallback exits if its timeout is already exhausted.""" + batcher = EventBatcher() + batcher._client = AsyncMock() + batcher.add_event(create_mock_event()) + + with ( + patch.object(batcher, "_send_batch_sync") as send_batch_sync, + patch( + "agent_control.observability.time.monotonic", + side_effect=[0.0, 0.0], + ), + ): + batcher._flush_all_without_worker(timeout=0.0) + + send_batch_sync.assert_not_called() + assert len(batcher._events) == 1 + assert batcher._client is None + class TestSpanLogging: """Tests for span start/end logging functions.""" From 30cd7b9aebe2a3a363d43998ae618505db442acc Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Fri, 27 Mar 2026 15:19:42 +0530 Subject: [PATCH 4/4] chore: use break in final sync retry --- sdks/python/src/agent_control/observability.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index f436fc8b..26a7d553 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -448,7 +448,7 @@ def _send_batch_sync( logger.error("Error sending events during shutdown: %s", e) if attempt >= get_settings().max_retries - 1: - continue + break retry_delay = get_settings().retry_delay * (attempt + 1) if deadline is not None: