934 changes: 809 additions & 125 deletions agent_core/core/impl/llm/errors.py

Large diffs are not rendered by default.
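Since the errors.py diff is collapsed, the classifier API has to be inferred from its call sites in interface.py below. A minimal sketch of the assumed surface follows; the call signatures, the LLMErrorInfo name, and the message field are visible in this PR, while the remaining field names and the placeholder bodies are assumptions for illustration only.

# Hedged sketch of the errors.py surface used by interface.py below. Only the
# call signatures and the `message` attribute are confirmed by this diff; the
# dataclass layout and extra fields are assumed.
from dataclasses import dataclass
from typing import Optional


@dataclass
class LLMErrorInfo:
    message: str                 # rich user-facing string (provider + upstream + action hint)
    provider: str = ""           # assumed field: provider config that produced the error
    model: Optional[str] = None  # assumed field: model/slug the failed request targeted


def classify_llm_error(exc: Exception, provider: str, model: Optional[str] = None) -> LLMErrorInfo:
    """Map a raw SDK exception to a structured LLMErrorInfo (placeholder body)."""
    return LLMErrorInfo(
        message=f"{provider} request failed: {type(exc).__name__}: {exc}",
        provider=provider,
        model=model,
    )


class LLMConsecutiveFailureError(Exception):
    def __init__(self, count: int, last_error: Optional[Exception] = None,
                 last_error_info: Optional[LLMErrorInfo] = None):
        super().__init__(f"LLM failed {count} consecutive times")
        self.count = count
        self.last_error = last_error
        self.last_error_info = last_error_info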

136 changes: 120 additions & 16 deletions agent_core/core/impl/llm/interface.py
@@ -29,7 +29,7 @@
get_cache_config,
get_cache_metrics,
)
from agent_core.core.impl.llm.errors import LLMConsecutiveFailureError
from agent_core.core.impl.llm.errors import LLMConsecutiveFailureError, classify_llm_error
from agent_core.core.hooks import (
GetTokenCountHook,
SetTokenCountHook,
@@ -367,7 +367,7 @@ def _generate_response_sync(
logger.info(f"[LLM SEND] system={system_prompt} | user={user_prompt}")

try:
if self.provider in ("openai", "minimax", "deepseek", "moonshot", "grok"):
if self.provider in ("openai", "minimax", "deepseek", "moonshot", "grok", "openrouter"):
response = self._generate_openai(system_prompt, user_prompt)
elif self.provider == "remote":
response = self._generate_ollama(system_prompt, user_prompt)
@@ -384,8 +384,14 @@

# Check if response is empty and provide diagnostics
if not content:
# Prefer the classified rich message (provider + upstream +
# raw + action hint inline) over the bare exception string.
# This is what the user actually sees in the chat bubble.
error_info = response.get("error_info_obj")
error_msg = response.get("error", "")
if error_msg:
if error_info is not None:
error_detail = error_info.message
elif error_msg:
error_detail = f"LLM provider returned error: {error_msg}"
else:
error_detail = (
@@ -402,7 +408,14 @@
f"[LLM CONSECUTIVE FAILURE] Count: {self._consecutive_failures}/{self._max_consecutive_failures}"
)
if self._consecutive_failures >= self._max_consecutive_failures:
raise LLMConsecutiveFailureError(self._consecutive_failures)
# Attach the underlying classified info so the agent_base
# error handler can show the *cause* of the 5 failures
# (e.g. "rate-limited on Google AI Studio") instead of a
# meta-message about retry counts.
raise LLMConsecutiveFailureError(
self._consecutive_failures,
last_error_info=error_info,
)
raise RuntimeError(error_detail)

# Success - reset consecutive failure counter
@@ -428,7 +441,17 @@ def _generate_response_sync(
f"[LLM CONSECUTIVE FAILURE] Count: {self._consecutive_failures}/{self._max_consecutive_failures} | Error: {e}"
)
if self._consecutive_failures >= self._max_consecutive_failures:
raise LLMConsecutiveFailureError(self._consecutive_failures, last_error=e) from e
# Classify on the way out so the fatal-failure handler can
# surface the cause, not just the count.
try:
info = classify_llm_error(e, provider=self.provider, model=self.model)
except Exception:
info = None
raise LLMConsecutiveFailureError(
self._consecutive_failures,
last_error=e,
last_error_info=info,
) from e
raise

@profile("llm_generate_response", OperationCategory.LLM)
@@ -502,7 +525,7 @@ def create_session_cache(
supports_caching = (
(self.provider == "byteplus" and self._byteplus_cache_manager) or
(self.provider == "gemini" and self._gemini_cache_manager) or
(self.provider in ("openai", "deepseek", "grok") and self.client) or # OpenAI/DeepSeek/Grok use automatic caching with prompt_cache_key
(self.provider in ("openai", "deepseek", "grok", "openrouter") and self.client) or # OpenAI/DeepSeek/Grok/OpenRouter use automatic caching with prompt_cache_key (and cache_control for Anthropic-routed OpenRouter models)
(self.provider == "anthropic" and self._anthropic_client) # Anthropic uses ephemeral caching with extended TTL
)

@@ -605,7 +628,7 @@ def has_session_cache(self, task_id: str, call_type: str) -> bool:
return True
if self.provider == "gemini" and self._gemini_cache_manager:
return True
if self.provider in ("openai", "deepseek", "grok") and self.client:
if self.provider in ("openai", "deepseek", "grok", "openrouter") and self.client:
return True
if self.provider == "anthropic" and self._anthropic_client:
return True
@@ -687,8 +710,8 @@ def _generate_response_with_session_sync(
logger.info(f"[LLM RECV] {cleaned}")
return cleaned

# Handle OpenAI/DeepSeek/Grok with call_type-based cache routing
if self.provider in ("openai", "deepseek", "grok"):
# Handle OpenAI/DeepSeek/Grok/OpenRouter with call_type-based cache routing
if self.provider in ("openai", "deepseek", "grok", "openrouter"):
# Get stored system prompt or use provided one
session_key = f"{task_id}:{call_type}"
stored_system_prompt = self._session_system_prompts.get(session_key)
@@ -1184,15 +1207,46 @@ def _generate_openai(
# Always enforce JSON output format
request_kwargs["response_format"] = {"type": "json_object"}

# Add prompt_cache_key for OpenAI/DeepSeek cache routing.
# Grok (xAI) does not support prompt_cache_key — it uses automatic
# prefix caching and ignores this parameter, so skip it for Grok.
if self.provider != "grok" and call_type and system_prompt and len(system_prompt) >= config.min_cache_tokens:
# Build provider-specific cache hints in extra_body.
# - prompt_cache_key (OpenAI/DeepSeek/OpenRouter): improves prefix-cache routing
# stickiness across alternating call types. Grok ignores it; we skip there
# to avoid noise.
# - cache_control (OpenRouter routing to Anthropic Claude only): Anthropic
# prompt caching is opt-in. OpenRouter accepts a top-level cache_control
# field and applies it to the last cacheable block automatically. For
# OpenAI/DeepSeek/Gemini upstreams via OpenRouter, caching is automatic
# on the upstream side, so cache_control would be ignored — we only set
# it when the slug is Anthropic-routed.
extra_body: Dict[str, Any] = {}

long_enough = system_prompt and len(system_prompt) >= config.min_cache_tokens

if self.provider != "grok" and call_type and long_enough:
prompt_hash = hashlib.sha256(system_prompt.encode()).hexdigest()[:16]
cache_key = f"{call_type}_{prompt_hash}"
request_kwargs["extra_body"] = {"prompt_cache_key": cache_key}
extra_body["prompt_cache_key"] = cache_key
logger.debug(f"[OPENAI] Using prompt_cache_key: {cache_key}")

if self.provider == "openrouter" and long_enough:
model_lower_for_cache = (self.model or "").lower()
# OpenRouter slugs are "<provider>/<model>". Anthropic Claude routes
# are the only ones requiring opt-in cache_control. Detect by either
# the slug prefix or the "claude" substring (some aliases like
# "anthropic/claude-3.5-sonnet:beta" still match).
if model_lower_for_cache.startswith("anthropic/") or "claude" in model_lower_for_cache:
cache_control: Dict[str, Any] = {"type": "ephemeral"}
if call_type:
# 1-hour TTL keeps caches alive across alternating call types
# (mirrors the Anthropic-direct path).
cache_control["ttl"] = "1h"
extra_body["cache_control"] = cache_control
logger.debug(
f"[OPENROUTER] Anthropic cache_control: {cache_control} (model={self.model})"
)

if extra_body:
request_kwargs["extra_body"] = extra_body

response = self.client.chat.completions.create(**request_kwargs)
content = response.choices[0].message.content.strip()
token_count_input = response.usage.prompt_tokens
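To make the branch above concrete (the call_type and hash suffix are illustrative values, not taken from this PR), a long-prompt request would produce these extra_body payloads for a plain OpenAI provider versus an Anthropic-routed OpenRouter slug:

# Illustrative extra_body payloads from the logic above; "planner" and the
# 16-char hash are made-up examples.
extra_body_openai = {                      # provider="openai" (or "deepseek")
    "prompt_cache_key": "planner_3f9c2ab817d04e55",
}

extra_body_openrouter = {                  # provider="openrouter", model="anthropic/claude-3.5-sonnet"
    "prompt_cache_key": "planner_3f9c2ab817d04e55",
    "cache_control": {"type": "ephemeral", "ttl": "1h"},
}

# Grok requests send no prompt_cache_key (xAI prefix caching is automatic),
# and non-Anthropic OpenRouter slugs omit cache_control.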
@@ -1235,9 +1289,11 @@ def _generate_openai(
token_count_output,
)

# Report usage
# Report usage. service_type stays "llm_openai" (the request shape) but
# provider attributes to the actual upstream so dashboards split out
# OpenRouter / DeepSeek / Grok separately.
self._report_usage_async(
"llm_openai", "openai", self.model,
"llm_openai", self.provider, self.model,
token_count_input, token_count_output, cached_tokens
)

@@ -1310,6 +1366,18 @@ def _generate_ollama(self, system_prompt: str | None, user_prompt: str) -> Dict[
if exc_obj:
error_str = f"{type(exc_obj).__name__}: {str(exc_obj)}"
result["error"] = error_str
# Classify once and stash the LLMErrorInfo object so the
# outer `_generate_response_sync` can put `info.message`
# (the rich detailed string) into the RuntimeError it raises,
# and attach the info to LLMConsecutiveFailureError at the
# 5-failure threshold. The classifier is wrapped in try/except
# so it can never break the error path itself.
try:
result["error_info_obj"] = classify_llm_error(
exc_obj, provider=self.provider, model=self.model
)
except Exception:
pass
result["content"] = ""
logger.error(f"[OLLAMA_ERROR] {error_str}")
else:
@@ -1431,6 +1499,18 @@ def _generate_gemini(
if exc_obj:
error_str = f"{type(exc_obj).__name__}: {str(exc_obj)}"
result["error"] = error_str
# Classify once and stash the LLMErrorInfo object so the
# outer `_generate_response_sync` can put `info.message`
# (the rich detailed string) into the RuntimeError it raises,
# and attach the info to LLMConsecutiveFailureError at the
# 5-failure threshold. The classifier is wrapped in try/except
# so it can never break the error path itself.
try:
result["error_info_obj"] = classify_llm_error(
exc_obj, provider=self.provider, model=self.model
)
except Exception:
pass
result["content"] = ""
logger.error(f"[GEMINI_ERROR] {error_str}")
else:
@@ -1668,6 +1748,18 @@ def _generate_byteplus_standard(
if exc_obj:
error_str = f"{type(exc_obj).__name__}: {str(exc_obj)}"
result["error"] = error_str
# Classify once and stash the LLMErrorInfo object so the
# outer `_generate_response_sync` can put `info.message`
# (the rich detailed string) into the RuntimeError it raises,
# and attach the info to LLMConsecutiveFailureError at the
# 5-failure threshold. The classifier is wrapped in try/except
# so it can never break the error path itself.
try:
result["error_info_obj"] = classify_llm_error(
exc_obj, provider=self.provider, model=self.model
)
except Exception:
pass
result["content"] = ""
logger.error(f"[BYTEPLUS_ERROR] {error_str}")
else:
@@ -1815,6 +1907,18 @@ def _generate_anthropic(
if exc_obj:
error_str = f"{type(exc_obj).__name__}: {str(exc_obj)}"
result["error"] = error_str
# Classify once and stash the LLMErrorInfo object so the
# outer `_generate_response_sync` can put `info.message`
# (the rich detailed string) into the RuntimeError it raises,
# and attach the info to LLMConsecutiveFailureError at the
# 5-failure threshold. The classifier is wrapped in try/except
# so it can never break the error path itself.
try:
result["error_info_obj"] = classify_llm_error(
exc_obj, provider=self.provider, model=self.model
)
except Exception:
pass
result["content"] = ""
logger.error(f"[ANTHROPIC_ERROR] {error_str}")
else: