Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,9 @@ def _emit_event_from_thread(self, event: Event) -> None:
from callbacks that may run in different threads. Events are emitted through
the conversation's normal event flow to ensure they are persisted.
"""
if self._main_loop and self._main_loop.is_running() and self._conversation:
# Capture conversation reference for closure
conversation = self._conversation

main_loop = self._main_loop
conversation = self._conversation
if main_loop and main_loop.is_running() and conversation:
# Wrap _on_event with lock acquisition to ensure thread-safe access
# to conversation state and event log during concurrent operations
def locked_on_event():
Expand All @@ -484,7 +483,7 @@ def locked_on_event():

# Run the locked callback in an executor to ensure the event is
# both persisted and sent to WebSocket subscribers
self._main_loop.run_in_executor(None, locked_on_event)
main_loop.run_in_executor(None, locked_on_event)

def _setup_llm_log_streaming(self, agent: AgentBase) -> None:
"""Configure LLM log callbacks to stream logs via events."""
Expand All @@ -500,13 +499,16 @@ def log_callback(
filename: str, log_data: str, uid=usage_id, model=model_name
) -> None:
"""Callback to emit LLM completion logs as events."""
event = LLMCompletionLogEvent(
filename=filename,
log_data=log_data,
model_name=model,
usage_id=uid,
)
self._emit_event_from_thread(event)
try:
event = LLMCompletionLogEvent(
filename=filename,
log_data=log_data,
model_name=model,
usage_id=uid,
)
self._emit_event_from_thread(event)
except Exception:
logger.exception("Failed to emit LLM completion log event")

llm.telemetry.set_log_completions_callback(log_callback)

Expand Down
50 changes: 39 additions & 11 deletions openhands-tools/openhands/tools/apply_patch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def assemble_changes(
old_content=old_content,
)
else:
assert False
raise DiffError(
f"Unexpected state: path '{path}' exists in neither orig nor dest"
)
return commit


Expand Down Expand Up @@ -95,13 +97,15 @@ def is_done(self, prefixes: tuple[str, ...] | None = None) -> bool:
return False

def startswith(self, prefix: str | tuple[str, ...]) -> bool:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.index >= len(self.lines):
raise DiffError(f"Unexpected end of patch at index {self.index}")
if self.lines[self.index].startswith(prefix):
return True
return False

def read_str(self, prefix: str = "", return_everything: bool = False) -> str:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.index >= len(self.lines):
raise DiffError(f"Unexpected end of patch at index {self.index}")
line = self.lines[self.index]
if line.startswith(prefix):
text = line if return_everything else line[len(prefix) :]
Expand All @@ -116,11 +120,15 @@ def parse(self):
if path in self.patch.actions:
raise DiffError(f"Update File Error: Duplicate Path: {path}")
move_to = self.read_str("*** Move to: ")
if move_to and (".." in move_to.split("/") or move_to.startswith("/")):
raise DiffError(
f"Update File Error: Invalid move path '{move_to}': "
"must be a relative path without '..' components"
)
if path not in self.current_files:
raise DiffError(f"Update File Error: Missing File: {path}")
text = self.current_files[path]
action = self.parse_update_file(text)
# TODO: Check move_to is valid
action.move_path = move_to
self.patch.actions[path] = action
continue
Expand Down Expand Up @@ -359,7 +367,10 @@ def identify_files_needed(text: str) -> list[str]:


def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
assert action.type == ActionType.UPDATE
if action.type != ActionType.UPDATE:
raise DiffError(
f"_get_updated_file: expected UPDATE action for '{path}', got {action.type}"
)
orig_lines = text.split("\n")
dest_lines = []
orig_index = 0
Expand All @@ -375,7 +386,11 @@ def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
f"_get_updated_file: {path}: orig_index {orig_index} > "
f"chunk.orig_index {chunk.orig_index}"
)
assert orig_index <= chunk.orig_index
if orig_index > chunk.orig_index:
raise DiffError(
f"_get_updated_file: {path}: orig_index {orig_index} advanced past "
f"chunk.orig_index {chunk.orig_index}"
)
dest_lines.extend(orig_lines[orig_index : chunk.orig_index])
delta = chunk.orig_index - orig_index
orig_index += delta
Expand All @@ -389,8 +404,16 @@ def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
delta = len(orig_lines) - orig_index
orig_index += delta
dest_index += delta
assert orig_index == len(orig_lines)
assert dest_index == len(dest_lines)
if orig_index != len(orig_lines):
raise DiffError(
f"_get_updated_file: {path}: did not consume all original lines "
f"(orig_index={orig_index}, len={len(orig_lines)})"
)
if dest_index != len(dest_lines):
raise DiffError(
f"_get_updated_file: {path}: dest line count mismatch "
f"(dest_index={dest_index}, len={len(dest_lines)})"
)
return "\n".join(dest_lines)


Expand Down Expand Up @@ -449,10 +472,14 @@ def apply_commit(
if change.type == ActionType.DELETE:
remove_fn(path)
elif change.type == ActionType.ADD:
assert change.new_content is not None
if change.new_content is None:
raise DiffError(f"apply_commit: ADD change for '{path}' has no content")
write_fn(path, change.new_content)
elif change.type == ActionType.UPDATE:
assert change.new_content is not None
if change.new_content is None:
raise DiffError(
f"apply_commit: UPDATE change for '{path}' has no content"
)
if change.move_path:
write_fn(change.move_path, change.new_content)
remove_fn(path)
Expand All @@ -470,7 +497,8 @@ def process_patch(

Returns (message, fuzz, commit)
"""
assert text.startswith("*** Begin Patch")
if not text.startswith("*** Begin Patch"):
raise DiffError("Invalid patch: must start with '*** Begin Patch'")
paths = identify_files_needed(text)
orig = load_files(paths, open_fn)
patch, fuzz = text_to_patch(text, orig)
Expand Down
7 changes: 1 addition & 6 deletions openhands-tools/openhands/tools/gemini/edit/impl.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Edit tool executor implementation."""

import os
from pathlib import Path
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -43,11 +42,7 @@ def __call__(
new_string = action.new_string
expected_replacements = action.expected_replacements

# Resolve path relative to workspace
if not os.path.isabs(file_path):
resolved_path = self.workspace_root / file_path
else:
resolved_path = Path(file_path)
resolved_path = (self.workspace_root / file_path).resolve()

# Handle file creation (old_string is empty)
if old_string == "":
Expand Down
30 changes: 30 additions & 0 deletions tests/agent_server/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2619,3 +2619,33 @@ def _blocking_wait(timeout: float) -> None:
f"(call_count={parent_llm._call_count})"
)
assert es._run_task is None


def test_emit_event_from_thread_uses_captured_loop(event_service: EventService) -> None:
"""_emit_event_from_thread must use the captured main_loop, not self._main_loop.

Before this fix, the method captured _main_loop into a local variable for
the if-check but then called self._main_loop.run_in_executor(...) in the
body. A concurrent close() setting self._main_loop = None between the
check and the call would cause AttributeError. The fix uses main_loop.
"""
captured_calls: list = []

mock_loop = MagicMock()
mock_loop.is_running.return_value = True

def record_and_null(*args, **kwargs):
# Simulate concurrent close() nulling self._main_loop mid-call
object.__setattr__(event_service, "_main_loop", None)
captured_calls.append(args)

mock_loop.run_in_executor.side_effect = record_and_null

event_service._main_loop = mock_loop # type: ignore[assignment]
event_service._conversation = MagicMock() # type: ignore[assignment]

event = MagicMock()

# Should not raise AttributeError even though self._main_loop is cleared
event_service._emit_event_from_thread(event)
assert len(captured_calls) == 1, "run_in_executor should have been called once"
35 changes: 35 additions & 0 deletions tests/tools/apply_patch/test_apply_patch_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,38 @@ def test_path_escape_with_parent_directory(tmp_ws: Path):
obs = run_exec(tmp_ws, patch)
assert obs.is_error
assert "Absolute or escaping paths" in obs.text


def test_malformed_patch_header_returns_differror(tmp_ws: Path):
"""A patch missing '*** Begin Patch' must return a structured error, not crash.

Before the assert→DiffError fix, process_patch() used assert to validate
the header. Python disables assert with -O (optimized mode, used in Docker
production images), so a bad header would silently pass and corrupt state.
Now it raises DiffError which is caught and returned as is_error=True.
"""
obs = run_exec(tmp_ws, "INVALID HEADER\n*** End Patch")
assert obs.is_error
assert "Begin Patch" in obs.text


def test_invalid_move_to_path_returns_differror(tmp_ws: Path):
"""A '*** Move to:' with '..' components must be rejected as a structured error.

The original Parser.parse() had a TODO acknowledging this check was missing.
An unvalidated move_to path could allow the agent to move files outside the
workspace.
"""
fp = tmp_ws / "source.txt"
fp.write_text("hello\n")
patch = (
"*** Begin Patch\n"
"*** Update File: source.txt\n"
"*** Move to: ../outside.txt\n"
"@@\n"
" hello\n"
"*** End Patch"
)
obs = run_exec(tmp_ws, patch)
assert obs.is_error
assert "Invalid move path" in obs.text