diff --git a/utu/agents/orchestrator_agent.py b/utu/agents/orchestrator_agent.py index ae4ed93f..731366b0 100644 --- a/utu/agents/orchestrator_agent.py +++ b/utu/agents/orchestrator_agent.py @@ -106,7 +106,7 @@ async def _run_task(self, recorder: Recorder, task: Task): input = recorder.history_messages + [{"role": "user", "content": task_with_context}] # run the task recorder._event_queue.put_nowait(OrchestratorStreamEvent(name="task.start", item=task)) - result = worker.run_streamed(input) + result = await worker.run_streamed(input) async for event in result.stream_events(): recorder._event_queue.put_nowait(event) task.result = result.final_output # set result diff --git a/utu/agents/workforce/assigner.py b/utu/agents/workforce/assigner.py index b7ba281a..e55dc950 100644 --- a/utu/agents/workforce/assigner.py +++ b/utu/agents/workforce/assigner.py @@ -23,9 +23,11 @@ def __init__(self, config: AgentConfig): self.config = config self.llm = LLMAgent(model_config=config.workforce_planner_model) - async def assign_task(self, recorder: WorkspaceTaskRecorder) -> Subtask: + async def assign_task(self, recorder: WorkspaceTaskRecorder) -> Subtask | None: """Assigns a task to a worker node with the best capability.""" next_task = recorder.get_next_task() + if next_task is None: + return None sp = PROMPTS["TASK_ASSIGN_SYS_PROMPT"].format( overall_task=recorder.overall_task, diff --git a/utu/agents/workforce/data.py b/utu/agents/workforce/data.py index f78b4301..ddbe1dec 100644 --- a/utu/agents/workforce/data.py +++ b/utu/agents/workforce/data.py @@ -78,9 +78,9 @@ def has_uncompleted_tasks(self) -> bool: return True return False - def get_next_task(self) -> Subtask: + def get_next_task(self) -> Subtask | None: assert self.task_plan is not None, "No task plan available." for task in self.task_plan: if task.task_status == "not started": return task - return "No uncompleted tasks." + return None diff --git a/utu/eval/benchmarks/base_benchmark.py b/utu/eval/benchmarks/base_benchmark.py index d2a268e4..64309b04 100644 --- a/utu/eval/benchmarks/base_benchmark.py +++ b/utu/eval/benchmarks/base_benchmark.py @@ -69,8 +69,8 @@ def preprocess_one(self, sample: EvaluationSample) -> EvaluationSample: processed_sample = processer.preprocess_one(sample) if processed_sample is None: return None - self.dataset.save(sample) - return sample + self.dataset.save(processed_sample) + return processed_sample async def rollout(self, max_retries: int = 3) -> None: """Rollout the datapoints.""" diff --git a/utu/tools/local_env/file_edit.py b/utu/tools/local_env/file_edit.py index 7b1f462d..d88af87a 100644 --- a/utu/tools/local_env/file_edit.py +++ b/utu/tools/local_env/file_edit.py @@ -34,6 +34,13 @@ def _resolve_filepath(self, file_path: str) -> Path: sanitized_filename = self._sanitize_filename(path_obj.name) path_obj = path_obj.parent / sanitized_filename resolved_path = path_obj.resolve() + + # Security check: ensure the resolved path is within work_dir + try: + resolved_path.relative_to(self.work_dir) + except ValueError: + raise ValueError(f"Path {resolved_path} is outside the allowed workspace {self.work_dir}") + self._create_backup(resolved_path) return resolved_path diff --git a/utu/tracing/phoenix_utils.py b/utu/tracing/phoenix_utils.py index b0e05f22..70a81380 100644 --- a/utu/tracing/phoenix_utils.py +++ b/utu/tracing/phoenix_utils.py @@ -37,9 +37,10 @@ def get_project(self) -> dict: return self.client.projects.get(project_name=self.project_name) def get_trace_url_by_id(self, trace_id: str) -> str | None: - # get trace by trace_id in @openai-agents, see the trick in OpenInferenceTracingProcessor + # Escape single quotes to prevent injection into the filter expression. + safe_trace_id = trace_id.replace("'", "''") spans_df = self.get_spans( - condition=f"metadata['trace_id'] == '{trace_id}'", select=["context.trace_id"], limit=1 + condition=f"metadata['trace_id'] == '{safe_trace_id}'", select=["context.trace_id"], limit=1 ) if spans_df.empty: return None