Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 134 additions & 30 deletions sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import atexit
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -388,29 +389,142 @@ def _stop_worker(self, *, graceful: bool, join_timeout: float) -> bool:
self._thread = None
return True

def _flush_all_without_worker(self, *, timeout: float) -> None:
"""Flush remaining events in a helper thread when no worker loop is available."""
flush_error: Exception | None = None
def _build_batch_request(
self,
events: list[ControlExecutionEvent],
) -> tuple[str, dict[str, str], dict[str, Any]]:
"""Build request components shared by async and sync send paths."""
url = f"{self.server_url}/api/v1/observability/events"
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key
payload = {"events": [event.model_dump(mode="json") for event in events]}
return url, headers, payload

def _send_batch_sync(
    self,
    events: "list[ControlExecutionEvent]",
    *,
    deadline: float | None = None,
) -> bool:
    """Send a batch synchronously for shutdown/atexit fallback.

    Runs when no worker event loop is available (e.g. interpreter
    shutdown), so it must not touch asyncio. Retries with linear
    backoff, bounded by ``deadline`` (a ``time.monotonic()`` timestamp)
    when one is given.

    Args:
        events: Events to send in a single request.
        deadline: Optional monotonic timestamp after which to give up.

    Returns:
        True if the server accepted the batch, False otherwise.
    """
    url, headers, payload = self._build_batch_request(events)

    for attempt in range(get_settings().max_retries):
        remaining: float | None = None
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                logger.warning("Fallback shutdown flush timed out before sending events")
                return False

        # Cap the per-request timeout so one slow request cannot blow
        # through the overall shutdown deadline.
        request_timeout = 30.0
        if remaining is not None:
            request_timeout = max(0.001, min(request_timeout, remaining))

        try:
            with httpx.Client(timeout=request_timeout) as client:
                response = client.post(url, json=payload, headers=headers)

            if response.status_code == 202:
                return True
            if response.status_code == 401:
                # Bad credentials will not recover on retry; stop early.
                logger.error("Authentication failed - check API key")
                return False

            logger.warning(
                "Server returned %s during shutdown flush: %s",
                response.status_code,
                response.text,
            )
        except httpx.TimeoutException:
            logger.warning("Timeout sending events during shutdown (attempt %d)", attempt + 1)
        except httpx.ConnectError:
            logger.warning(
                "Connection error sending events during shutdown (attempt %d)",
                attempt + 1,
            )
        except Exception as e:
            logger.error("Error sending events during shutdown: %s", e)

        if attempt >= get_settings().max_retries - 1:
            break

        # Linear backoff, clipped to whatever shutdown budget remains.
        retry_delay = get_settings().retry_delay * (attempt + 1)
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                logger.warning("Fallback shutdown flush timed out during retry backoff")
                return False
            retry_delay = min(retry_delay, remaining)

        if retry_delay > 0:
            time.sleep(retry_delay)

    return False

def _flush_sync(self, *, deadline: float | None = None) -> bool:
"""Flush a batch synchronously without relying on asyncio state."""
with self._lock:
if not self._events:
return True
events_to_send = self._events[:self.batch_size]
self._events = self._events[self.batch_size:]

success = self._send_batch_sync(events_to_send, deadline=deadline)

if success:
with self._lock:
self._events_sent += len(events_to_send)
self._flush_count += 1
total_sent = self._events_sent
logger.debug(
f"Flushed {len(events_to_send)} events "
f"(total sent: {total_sent})"
)
return True

with self._lock:
self._events = events_to_send + self._events
logger.warning(f"Failed to send batch, re-queued {len(events_to_send)} events")
return False

def _flush_all_without_worker(self, *, timeout: float) -> None:
"""Flush remaining events synchronously when no worker loop is available."""
deadline = time.monotonic() + max(timeout, 0.0)
consecutive_failures = 0

while True:
with self._lock:
if not self._events:
break

if time.monotonic() >= deadline:
logger.warning("Fallback shutdown flush timed out after %.1f seconds", timeout)
break

flushed = self._flush_sync(deadline=deadline)
if flushed:
consecutive_failures = 0
continue

consecutive_failures += 1
if consecutive_failures >= self.shutdown_max_failed_flushes:
with self._lock:
pending = len(self._events)
logger.warning(
"Stopping sync shutdown flush after %d consecutive failed flushes; "
"%d event(s) pending",
consecutive_failures,
pending,
)
break

# The worker thread normally closes the async client. If shutdown races or
# interpreter teardown prevented that path from completing, we intentionally
# do not try to aclose() here because this fallback may be running after
# asyncio teardown. Drop the reference so shutdown can finish deterministically.
self._client = None

def stop(self) -> None:
"""Stop the worker thread. Does not flush remaining events."""
Expand Down Expand Up @@ -506,9 +620,6 @@ async def _flush(self) -> bool:
events_to_send = self._events[:self.batch_size]
self._events = self._events[self.batch_size:]

if not events_to_send:
return True

success = await self._send_batch(events_to_send)

if success:
Expand Down Expand Up @@ -543,14 +654,7 @@ async def _send_batch(self, events: list[ControlExecutionEvent]) -> bool:
Returns:
True if sent successfully, False otherwise
"""
url = f"{self.server_url}/api/v1/observability/events"
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key

payload = {
"events": [event.model_dump(mode="json") for event in events]
}
url, headers, payload = self._build_batch_request(events)

client = await self._get_client()

Expand Down
Loading
Loading