Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion src/agents/extensions/sandbox/blaxel/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
TRANSIENT_HTTP_STATUS_CODES,
exception_chain_contains_type,
exception_chain_has_status_code,
iter_exception_chain,
retry_async,
)
from ....sandbox.util.tar_utils import UnsafeTarMemberError, validate_tar_bytes
Expand All @@ -72,6 +73,85 @@
logger = logging.getLogger(__name__)


# Blaxel documents structured API error codes and retryability at:
# https://docs.blaxel.ai/troubleshooting/error-codes
_BLAXEL_ERROR_CODE_RETRYABLE: dict[str, bool] = {
"ROUTE_NOT_FOUND": False, # 404
"WORKLOAD_NOT_FOUND": False, # 404
"WORKSPACE_NOT_FOUND": False, # 404
"WORKLOAD_UNAVAILABLE": True, # 404
"AUTHENTICATION_REQUIRED": False, # 401
"AUTHENTICATION_FAILED": False, # 401
"FORBIDDEN": False, # 403
"BAD_REQUEST": False, # 400
"USAGE_LIMIT_EXCEEDED": False, # 402
"POLICY_VIOLATION": False, # varies
}


def _coerce_mapping(value: object) -> dict[str, object] | None:
if isinstance(value, dict):
return {str(key): item for key, item in value.items()}
if isinstance(value, str):
try:
decoded = json.loads(value)
except json.JSONDecodeError:
return None
if isinstance(decoded, dict):
return {str(key): item for key, item in decoded.items()}
return None


def _blaxel_error_payload(error: BaseException) -> dict[str, object] | None:
for candidate in iter_exception_chain(error):
for attr in ("body", "payload"):
payload = _coerce_mapping(getattr(candidate, attr, None))
if payload is not None:
return payload

response = getattr(candidate, "response", None)
response_json = getattr(response, "json", None)
if callable(response_json):
try:
payload = _coerce_mapping(response_json())
except Exception:
payload = None
if payload is not None:
return payload

response_text = getattr(response, "text", None)
payload = _coerce_mapping(response_text)
if payload is not None:
return payload

return None


def _blaxel_structured_error(error: BaseException) -> dict[str, object] | None:
payload = _blaxel_error_payload(error)
if payload is None:
return None
nested = payload.get("error")
if isinstance(nested, dict):
return {str(key): value for key, value in nested.items()}
return payload


def _blaxel_provider_retryability(error: BaseException) -> tuple[bool | None, str | None]:
structured_error = _blaxel_structured_error(error)
if structured_error is not None:
retryable = structured_error.get("retryable")
if isinstance(retryable, bool):
code = structured_error.get("code")
return retryable, str(code) if isinstance(code, str) and code else None

code = structured_error.get("code")
if isinstance(code, str):
return _BLAXEL_ERROR_CODE_RETRYABLE.get(code), code

return None, None


def _blaxel_provider_error_detail(error: BaseException) -> str | None:
message = str(error)
status = getattr(error, "status_code", None) or getattr(error, "status", None)
Expand All @@ -91,15 +171,26 @@ def _blaxel_exec_transport_error(
) -> ExecTransportError:
detail = _blaxel_provider_error_detail(cause)
context: dict[str, object] = {"backend": "blaxel"}
retryable, provider_error_code = _blaxel_provider_retryability(cause)
if provider_error_code is not None:
context["provider_error_code"] = provider_error_code
if detail:
context["provider_error"] = detail
status = getattr(cause, "status_code", None) or getattr(cause, "status", None)
if isinstance(status, int):
context["http_status"] = status
if retryable is None and status in TRANSIENT_HTTP_STATUS_CODES:
retryable = True
message = "Blaxel exec failed"
if detail:
message = f"{message}: {detail}"
return ExecTransportError(command=command, context=context, cause=cause, message=message)
return ExecTransportError(
command=command,
context=context,
cause=cause,
message=message,
retryable=retryable,
)


def _import_blaxel_sdk() -> Any:
Expand Down Expand Up @@ -583,6 +674,7 @@ async def persist_workspace(self) -> io.IOBase:
"reason": "tar_failed",
"output": result.stderr.decode("utf-8", errors="replace"),
},
retryable=False,
)
raw_data: Any = await self._sandbox.fs.read_binary(tar_path)
if isinstance(raw_data, str):
Expand Down
43 changes: 30 additions & 13 deletions src/agents/extensions/sandbox/cloudflare/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,20 @@
from ....sandbox.session.sandbox_client import BaseSandboxClient, BaseSandboxClientOptions
from ....sandbox.snapshot import SnapshotBase, SnapshotSpec, resolve_snapshot
from ....sandbox.types import ExecResult, ExposedPortEndpoint, User
from ....sandbox.util.retry import (
TRANSIENT_HTTP_STATUS_CODES,
exception_chain_has_status_code,
retry_async,
)
from ....sandbox.util.retry import retry_async
from ....sandbox.util.tar_utils import UnsafeTarMemberError, validate_tar_bytes
from ....sandbox.workspace_paths import coerce_posix_path, posix_path_as_path, sandbox_path_str

_DEFAULT_EXEC_TIMEOUT_S = 30.0
_DEFAULT_REQUEST_TIMEOUT_S = 120.0
_MAX_ERROR_BODY_CHARS = 2000
# Cloudflare documents sandbox HTTP status retry semantics at:
# https://cloudflare-sandbox-sdk.mintlify.app/advanced/error-handling#http-status-code-semantics
_CLOUDFLARE_HTTP_STATUS_RETRYABLE: dict[int, bool] = {
400: False,
500: False,
503: True,
}

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -141,6 +144,12 @@ def _cloudflare_error_context(
return context


def _cloudflare_retryability_for_status(status: int | None) -> bool | None:
if status is None:
return None
return _CLOUDFLARE_HTTP_STATUS_RETRYABLE.get(status)


def _cloudflare_exec_error_detail(error: ExecTransportError) -> str | None:
detail = error.context.get("provider_error")
if isinstance(detail, str) and detail:
Expand All @@ -164,15 +173,17 @@ def _cloudflare_transport_error(
) -> ExecTransportError:
detail = str(cause)
provider_error = f"{type(cause).__name__}: {detail}" if detail else type(cause).__name__
context: dict[str, object] = {
"backend": "cloudflare",
"operation": operation,
"provider_error": provider_error,
}
return ExecTransportError(
command=command,
context={
"backend": "cloudflare",
"operation": operation,
"provider_error": provider_error,
},
context=context,
cause=cause,
message=f"Cloudflare {operation} transport failed: {provider_error}",
retryable=None,
)


Expand All @@ -181,7 +192,7 @@ def _is_transient_workspace_error(exc: BaseException) -> bool:
if not isinstance(exc, WorkspaceArchiveReadError | WorkspaceArchiveWriteError):
return False
status = exc.context.get("http_status")
return isinstance(status, int) and status in TRANSIENT_HTTP_STATUS_CODES
return isinstance(status, int) and _cloudflare_retryability_for_status(status) is True


@dataclass
Expand Down Expand Up @@ -731,6 +742,7 @@ async def _exec_internal(
context=_cloudflare_error_context(status=resp.status, detail=detail),
cause=Exception(message),
message=message,
retryable=_cloudflare_retryability_for_status(resp.status),
)

stdout_parts: list[bytes] = []
Expand Down Expand Up @@ -802,6 +814,7 @@ async def _exec_internal(
),
cause=Exception(message),
message=message,
retryable=_cloudflare_retryability_for_status(resp.status),
)

except asyncio.TimeoutError as e:
Expand Down Expand Up @@ -1152,6 +1165,7 @@ async def read(self, path: Path | str, *, user: str | User | None = None) -> io.
"http_status": resp.status,
"message": body.get("error", "path escapes /workspace"),
},
retryable=False,
)
if resp.status != 200:
body = {}
Expand All @@ -1166,6 +1180,7 @@ async def read(self, path: Path | str, *, user: str | User | None = None) -> io.
"http_status": resp.status,
"message": body.get("error", f"HTTP {resp.status}"),
},
retryable=_cloudflare_retryability_for_status(resp.status),
)
return io.BytesIO(self._decode_streamed_payload(await resp.read()))
except (WorkspaceReadNotFoundError, WorkspaceArchiveReadError):
Expand Down Expand Up @@ -1219,6 +1234,7 @@ async def write(
"http_status": resp.status,
"message": body.get("error", "path escapes /workspace"),
},
retryable=False,
)
if resp.status != 200:
body = {}
Expand All @@ -1233,6 +1249,7 @@ async def write(
"http_status": resp.status,
"message": body.get("error", f"HTTP {resp.status}"),
},
retryable=_cloudflare_retryability_for_status(resp.status),
)
except WorkspaceArchiveWriteError:
raise
Expand All @@ -1255,7 +1272,6 @@ async def running(self) -> bool:

@retry_async(
retry_if=lambda exc, self: isinstance(exc, aiohttp.ClientError)
or exception_chain_has_status_code(exc, TRANSIENT_HTTP_STATUS_CODES)
or _is_transient_workspace_error(exc)
)
async def _persist_workspace_via_http(self) -> io.IOBase:
Expand Down Expand Up @@ -1286,6 +1302,7 @@ async def _persist_workspace_via_http(self) -> io.IOBase:
"http_status": resp.status,
"message": body.get("error", f"HTTP {resp.status}"),
},
retryable=_cloudflare_retryability_for_status(resp.status),
)
return io.BytesIO(self._decode_streamed_payload(await resp.read()))
except WorkspaceArchiveReadError:
Expand All @@ -1297,7 +1314,6 @@ async def _persist_workspace_via_http(self) -> io.IOBase:

@retry_async(
retry_if=lambda exc, self, data: isinstance(exc, aiohttp.ClientError)
or exception_chain_has_status_code(exc, TRANSIENT_HTTP_STATUS_CODES)
or _is_transient_workspace_error(exc)
)
async def _hydrate_workspace_via_http(self, data: io.IOBase) -> None:
Expand Down Expand Up @@ -1346,6 +1362,7 @@ async def _hydrate_workspace_via_http(self, data: io.IOBase) -> None:
"http_status": resp.status,
"message": body.get("error", f"HTTP {resp.status}"),
},
retryable=_cloudflare_retryability_for_status(resp.status),
)
except WorkspaceArchiveWriteError:
raise
Expand Down
Loading
Loading