Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions backend/app/agent/factory/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,25 @@ def browser_agent(options: Chat):
else:
selected_port = env("browser_port", "9222")

enabled_browser_tools = [
"browser_click",
"browser_type",
"browser_back",
"browser_forward",
"browser_select",
"browser_console_exec",
"browser_console_view",
"browser_switch_tab",
"browser_enter",
"browser_visit_page",
"browser_scroll",
"browser_sheet_read",
"browser_sheet_input",
"browser_get_page_snapshot",
]
if selected_is_external:
enabled_browser_tools.append("browser_open")

web_toolkit_custom = HybridBrowserToolkit(
options.project_id,
cdp_keep_current_page=True,
Expand All @@ -197,23 +216,7 @@ def browser_agent(options: Chat):
stealth=True,
session_id=toolkit_session_id,
cdp_url=f"http://localhost:{selected_port}",
enabled_tools=[
"browser_click",
"browser_type",
"browser_back",
"browser_forward",
"browser_select",
"browser_console_exec",
"browser_console_view",
"browser_switch_tab",
"browser_enter",
"browser_visit_page",
"browser_scroll",
"browser_sheet_read",
"browser_sheet_input",
"browser_get_page_snapshot",
"browser_open",
],
enabled_tools=enabled_browser_tools,
)

# Save reference before registering for toolkits_to_register_agent
Expand Down Expand Up @@ -307,7 +310,7 @@ def browser_agent(options: Chat):
),
options,
tools,
prune_tool_calls_from_memory=True,
prune_tool_calls_from_memory=False,
tool_names=[
SearchToolkit.toolkit_name(),
HybridBrowserToolkit.toolkit_name(),
Expand Down
3 changes: 2 additions & 1 deletion backend/app/controller/chat_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,15 @@ def improve(id: str, data: SupplementChat):
data=ImprovePayload(
question=data.question,
attaches=data.attaches or [],
target=data.target,
),
new_task_id=data.task_id,
)
)
)
chat_logger.info(
"Improvement request queued with preserved context",
extra={"project_id": id},
extra={"project_id": id, "target": data.target},
)
return Response(status_code=201)

Expand Down
4 changes: 4 additions & 0 deletions backend/app/model/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class Chat(BaseModel):
search_config: dict[str, str] | None = None
# User identifier for user-specific skill configurations
user_id: str | None = None
# Target agent for @mention routing: "browser", "dev", "doc",
# "media", "social", "workforce", or None (default behavior)
target: str | None = None

@field_validator("model_type")
@classmethod
Expand Down Expand Up @@ -141,6 +144,7 @@ class SupplementChat(BaseModel):
question: str
task_id: str | None = None
attaches: list[str] = []
target: str | None = None


class HumanReply(BaseModel):
Expand Down
264 changes: 264 additions & 0 deletions backend/app/service/chat_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
mcp_agent,
multi_modal_agent,
question_confirm_agent,
social_media_agent,
task_summary_agent,
)
from app.agent.listen_chat_agent import ListenChatAgent
Expand All @@ -46,8 +47,10 @@
from app.model.chat import Chat, NewAgent, Status, TaskContent, sse_json
from app.service.task import (
Action,
ActionAgentEndData,
ActionDecomposeProgressData,
ActionDecomposeTextData,
ActionEndData,
ActionImproveData,
ActionInstallMcpData,
ActionNewAgent,
Expand Down Expand Up @@ -284,6 +287,104 @@ def build_context_for_workforce(
)


# ================================================================
# @mention direct agent routing
# ================================================================

# Maps @mention target names to (factory_fn, is_async) pairs.
#
# The boolean tells _create_persistent_agent how to invoke the factory:
# True  -> the factory is awaited directly (`await factory_fn(options)`);
# False -> the factory is run through `asyncio.to_thread(factory_fn, options)`.
#
# "workforce" is not listed here; step_solve checks for that target
# separately instead of creating a direct agent from this table.
_AGENT_TARGET_MAP: dict[str, tuple] = {
"browser": (browser_agent, False),
"dev": (developer_agent, True),
"doc": (document_agent, True),
"media": (multi_modal_agent, False),
"social": (social_media_agent, True),
}


async def _create_persistent_agent(
    target: str, options: Chat
) -> ListenChatAgent:
    """Build the agent registered under ``target`` in ``_AGENT_TARGET_MAP``.

    Args:
        target: @mention name used as the key into ``_AGENT_TARGET_MAP``.
        options: Chat options forwarded unchanged to the agent factory.

    Returns:
        The agent object produced by the matching factory.

    Raises:
        ValueError: If ``target`` has no entry in ``_AGENT_TARGET_MAP``.
    """
    entry = _AGENT_TARGET_MAP.get(target)
    if entry is None:
        known_targets = list(_AGENT_TARGET_MAP)
        raise ValueError(
            f"Unknown agent target: {target}. "
            f"Valid targets: {known_targets}"
        )

    factory_fn, is_async = entry
    # Coroutine factories are awaited in place; synchronous ones are
    # pushed to a worker thread so the event loop is not blocked while
    # the agent is being constructed.
    agent = (
        await factory_fn(options)
        if is_async
        else await asyncio.to_thread(factory_fn, options)
    )

    log_extra = {
        "project_id": options.project_id,
        # Fall back to the target name / empty id when the agent object
        # does not expose these attributes.
        "agent_name": getattr(agent, "agent_name", target),
        "agent_id": getattr(agent, "agent_id", ""),
    }
    logger.info(
        f"[DIRECT-AGENT] Created persistent agent: {target}",
        extra=log_extra,
    )
    return agent


async def _run_direct_agent(
    agent,
    prompt: str,
    question: str,
    task_lock: TaskLock,
):
    """Run one direct-agent step in the background and report completion.

    The agent is stepped with ``prompt``. A streaming response is fully
    consumed and its chunk contents concatenated; a non-streaming
    response contributes its single message content. Any exception
    raised while stepping is logged and turned into an error string that
    is used as the reply.

    Afterwards the user ``question`` and the reply are appended to the
    conversation history on ``task_lock``, and an ``ActionAgentEndData``
    carrying the reply is put on the task queue.

    Args:
        agent: Agent object exposing ``astep``; ``agent_id`` and
            ``agent_name`` are read if present.
        prompt: Full prompt passed to ``agent.astep``.
        question: Original user question recorded in the history.
        task_lock: Task lock providing ``add_conversation`` and
            ``put_queue``.
    """
    from camel.agents.chat_agent import (
        AsyncStreamingChatAgentResponse,
    )

    try:
        response = await agent.astep(prompt)
        if isinstance(response, AsyncStreamingChatAgentResponse):
            # The stream has to be drained completely: per the original
            # author's note, deactivation is triggered from the finally
            # block of _astream_chunks once iteration ends.
            # NOTE(review): this assumes each chunk carries only new
            # text (deltas), not the accumulated message — confirm
            # against the agent's streaming configuration.
            pieces = []
            async for chunk in response:
                if chunk.msg and chunk.msg.content:
                    pieces.append(chunk.msg.content)
            reply = "".join(pieces)
        else:
            reply = response.msg.content if response.msg else ""
    except Exception as e:
        logger.error(
            f"[DIRECT-AGENT] Error executing agent: {e}",
            exc_info=True,
        )
        # The error text replaces any partial output collected so far.
        reply = f"Error executing agent: {e}"

    # Record both sides of the exchange in the conversation history.
    task_lock.add_conversation("user", question)
    task_lock.add_conversation("assistant", reply)

    # Give the event loop a moment before signalling completion so the
    # agent's deactivate_agent event (scheduled from the streaming
    # finally block) is emitted ahead of the end event; otherwise the
    # frontend would already consider the task FINISHED and ignore it.
    await asyncio.sleep(0.1)

    # Per-agent completion signal; step_solve's agent_end handler
    # decides when the overall "end" is emitted.
    completion = ActionAgentEndData(
        data=reply,
        agent_id=getattr(agent, "agent_id", ""),
        agent_name=getattr(agent, "agent_name", ""),
    )
    await task_lock.put_queue(completion)


@sync_step
async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
"""Main task execution loop. Called when POST /chat endpoint
Expand Down Expand Up @@ -469,6 +570,139 @@ async def step_solve(options: Chat, request: Request, task_lock: TaskLock):
f"'{question[:100]}...'"
)

# --- @mention direct agent routing ---
target: str | None = None
if (
hasattr(options, "target")
and options.target
and loop_iteration == 1
):
target = options.target
elif isinstance(item, ActionImproveData) and item.data.target:
target = item.data.target

if (
target
and target != "workforce"
and target in _AGENT_TARGET_MAP
):
# Direct agent mode: keep the SAME task_id
# across turns (continuous chat in one chatStore).
# Do NOT update options.task_id here — the
# frontend reuses the existing chatStore.

logger.info(
f"[DIRECT-AGENT] Routing to @{target}",
extra={
"project_id": options.project_id,
"target": target,
"task_id": options.task_id,
},
)

# Send confirmed event so frontend transitions
# from pending/splitting state.
# direct=True tells frontend to skip task
# splitting and go straight to RUNNING.
yield sse_json(
"confirmed",
{"question": question, "direct": True},
)

# Ensure event loop is set for agent internals
set_main_event_loop(asyncio.get_running_loop())

# Get or create persistent agent
agent = task_lock.persistent_agents.get(target)
is_new_agent = agent is None

logger.info(
f"[DIRECT-AGENT] persistent_agents"
f" keys: {list(task_lock.persistent_agents.keys())},"
f" is_new={is_new_agent},"
f" target={target}",
)

if is_new_agent:
# Factory internally sends create_agent
# via ActionCreateAgentData in the queue
agent = await _create_persistent_agent(target, options)
task_lock.persistent_agents[target] = agent
logger.info(
f"[DIRECT-AGENT] Created NEW "
f"agent: {agent.agent_name} "
f"(id={agent.agent_id})",
)
else:
logger.info(
f"[DIRECT-AGENT] REUSING "
f"agent: {agent.agent_name} "
f"(id={agent.agent_id})",
)
# Reused agent: factory won't send
# create_agent, so we must send it
# explicitly for the new chatStore
tool_names = []
for t in getattr(agent, "tools", []):
fn_name = getattr(t, "get_function_name", None)
if fn_name:
tool_names.append(fn_name())
yield sse_json(
"create_agent",
{
"agent_name": agent.agent_name,
"agent_id": agent.agent_id,
"tools": tool_names,
},
)

# Each direct agent needs a unique process_task_id
# so toolkit events map to the correct agent panel.
agent.process_task_id = (
f"{options.task_id}_{agent.agent_id}"
)

# New agents need prior conversation context
# injected into the prompt; reused agents
# already have it in CAMEL memory.
if is_new_agent:
conv_ctx = build_conversation_context(
task_lock,
header="=== Previous Conversation ===",
)
prompt = f"{conv_ctx}\nUser: {question}"
else:
prompt = question
if attaches_to_use:
prompt += f"\n\nAttached files: {attaches_to_use}"

# Launch agent in background task so step_solve's
# while loop can process queue events (activate,
# toolkit, deactivate) in real-time.
# agent.astep() internally sends activate_agent
# and deactivate_agent via the queue.
task = asyncio.create_task(
_run_direct_agent(
agent,
prompt,
question,
task_lock,
)
)
task_lock.add_background_task(task)
continue

if target == "workforce":
logger.info(
"[DIRECT-AGENT] @workforce: "
"cleaning up persistent agents",
extra={
"project_id": options.project_id,
},
)
await task_lock.cleanup_persistent_agents()
# --- end @mention routing ---

is_exceeded, total_length = check_conversation_history_length(
task_lock
)
Expand Down Expand Up @@ -1598,6 +1832,36 @@ def on_stream_text(chunk):
},
)

elif item.action == Action.agent_end:
# Per-agent completion for @mention direct chat.
# Only emit the real "end" when ALL agents finish.
agent_name = item.agent_name
agent_id = item.agent_id
remaining = len(task_lock.background_tasks)
logger.info(
f"[AGENT-END] Agent {agent_name} "
f"({agent_id}) finished. "
f"Remaining background tasks: {remaining}",
extra={
"project_id": options.project_id,
},
)

yield sse_json(
"agent_end",
{
"agent_id": agent_id,
"agent_name": agent_name,
"data": item.data or "",
},
)

if remaining == 0:
# All agents done — trigger real end.
# Don't pass agent content as data; per-agent
# results are already sent via agent_end events.
await task_lock.put_queue(ActionEndData())

elif item.action == Action.end:
logger.info("=" * 80)
logger.info(
Expand Down
Loading