Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions openwpm/browser_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import traceback
from pathlib import Path
from queue import Empty as EmptyQueue
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, Union
from typing import Any, Dict, Optional, Tuple, Type, Union

import psutil
from multiprocess import Queue
from selenium.common.exceptions import WebDriverException
from tblib import Traceback, pickling_support

from .command_execution_context import CommandExecutionContext
from .command_sequence import CommandSequence
from .commands.browser_commands import FinalizeCommand
from .commands.profile_commands import dump_profile
Expand All @@ -27,6 +28,7 @@
from .config import BrowserParamsInternal, ManagerParamsInternal
from .deploy_browsers import deploy_firefox
from .errors import BrowserConfigError, BrowserCrashError, ProfileLoadError
from .failure_tracker import CommandFailure
from .socket_interface import ClientSocket
from .storage.storage_providers import TableName
from .types import BrowserId, VisitId
Expand All @@ -39,9 +41,6 @@

pickling_support.install()

if TYPE_CHECKING:
from .task_manager import TaskManager


class BrowserManagerHandle:
"""The BrowserManagerHandle class is responsible for holding all the
Expand Down Expand Up @@ -343,16 +342,23 @@ def close_browser_manager(self, force: bool = False) -> None:

def execute_command_sequence(
self,
# Quoting to break cyclic import, see https://stackoverflow.com/a/39757388
task_manager: "TaskManager",
context: CommandExecutionContext,
command_sequence: CommandSequence,
) -> None:
"""
Sends CommandSequence to the BrowserManager one command at a time
Sends CommandSequence to the BrowserManager one command at a time.

Parameters
----------
context : CommandExecutionContext
Provides storage and failure tracking without requiring a
direct reference to TaskManager.
command_sequence : CommandSequence
The sequence of commands to execute.
"""
assert self.browser_id is not None
assert self.curr_visit_id is not None
task_manager.sock.store_record(
context.store_record(
TableName("site_visits"),
self.curr_visit_id,
{
Expand Down Expand Up @@ -412,11 +418,9 @@ def execute_command_sequence(
"process while executing command %s. Setting failure "
"status." % (self.browser_id, str(command))
)
task_manager.failure_status = {
"ErrorType": "CriticalChildException",
"CommandSequence": command_sequence,
"Exception": status[1],
}
context.failure_tracker.set_critical_failure(
"CriticalChildException", command_sequence, exception=status[1]
)
error_text, tb = self._unpack_pickled_error(status[1])
elif status[0] == "FAILED":
command_status = "error"
Expand All @@ -436,7 +440,7 @@ def execute_command_sequence(
else:
raise ValueError("Unknown browser status message %s" % status)

task_manager.sock.store_record(
context.store_record(
TableName("crawl_history"),
self.curr_visit_id,
{
Expand All @@ -455,39 +459,42 @@ def execute_command_sequence(
)

if command_status == "critical":
task_manager.sock.finalize_visit_id(
success=False,
context.finalize_visit_id(
visit_id=self.curr_visit_id,
success=False,
)
return

if command_status != "ok":
with task_manager.threadlock:
task_manager.failure_count += 1
if task_manager.failure_count > task_manager.failure_limit:
over_limit = context.failure_tracker.record_failure(
CommandFailure(
browser_id=self.browser_id,
command=repr(command),
command_status=command_status,
error=error_text,
traceback=tb,
)
)
if over_limit:
self.logger.critical(
"BROWSER %i: Command execution failure pushes failure "
"count above the allowable limit. Setting "
"failure_status." % self.browser_id
)
task_manager.failure_status = {
"ErrorType": "ExceedCommandFailureLimit",
"CommandSequence": command_sequence,
}
context.failure_tracker.set_critical_failure(
"ExceedCommandFailureLimit", command_sequence
)
return
self.restart_required = True
self.logger.debug(
"BROWSER %i: Browser restart required" % self.browser_id
)
# Reset failure_count at the end of each successful command sequence
elif type(command) is FinalizeCommand:
with task_manager.threadlock:
task_manager.failure_count = 0
context.failure_tracker.reset()

if self.restart_required:
task_manager.sock.finalize_visit_id(
success=False, visit_id=self.curr_visit_id
)
context.finalize_visit_id(visit_id=self.curr_visit_id, success=False)
break

self.logger.info(
Expand All @@ -500,7 +507,7 @@ def execute_command_sequence(
# internal buffers to drain. Stopgap in support of #135
time.sleep(2)

if task_manager.closing:
if context.closing:
return

# Allow StorageWatchdog to utilize built-in browser reset functionality
Expand All @@ -520,10 +527,9 @@ def execute_command_sequence(
"BROWSER %i: Exceeded the maximum allowable consecutive "
"browser launch failures. Setting failure_status." % self.browser_id
)
task_manager.failure_status = {
"ErrorType": "ExceedLaunchFailureLimit",
"CommandSequence": command_sequence,
}
context.failure_tracker.set_critical_failure(
"ExceedLaunchFailureLimit", command_sequence
)
return
self.restart_required = False

Expand Down
32 changes: 32 additions & 0 deletions openwpm/command_execution_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Protocol for the context needed by BrowserManagerHandle to execute commands.

This decouples BrowserManagerHandle from the concrete TaskManager class,
allowing tests to provide lightweight mock implementations.
"""

from typing import Any, Dict, Protocol

from .failure_tracker import FailureTracker
from .storage.storage_providers import TableName
from .types import VisitId


class CommandExecutionContext(Protocol):
    """Interface that BrowserManagerHandle needs from the TaskManager.

    TaskManager implements this protocol. Tests can provide a lightweight
    mock with a real FailureTracker but no storage/browser infrastructure.
    """

    # Set when the owning manager is shutting down; BrowserManagerHandle
    # checks this between commands to stop issuing further work.
    closing: bool
    # Shared tracker used to record command failures and critical
    # (halt-the-crawl) conditions without a direct TaskManager reference.
    failure_tracker: FailureTracker

    def store_record(
        self, table: TableName, visit_id: VisitId, data: Dict[str, Any]
    ) -> None:
        """Send a record to the StorageController.

        Parameters
        ----------
        table
            Destination table name (e.g. ``site_visits``, ``crawl_history``).
        visit_id
            The visit the record belongs to.
        data
            Mapping of column names to values for the record.
        """
        ...

    def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None:
        """Signal that all data for a visit_id has been sent.

        Parameters
        ----------
        visit_id
            The visit being finalized.
        success
            ``False`` when the visit ended in a critical failure or a
            required browser restart.
        """
        ...
114 changes: 114 additions & 0 deletions openwpm/failure_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""Tracks command execution failures and determines when to halt crawling.

There are two failure mechanisms:

1. **Consecutive failure tracking** — Each non-ok command status is recorded
via ``record_failure``. After a successful command sequence the list is
cleared. When the number of recorded failures exceeds ``failure_limit``
the tracker escalates to a critical failure.

2. **Critical failure** — An immediate halt signal set by
``set_critical_failure``. Three situations trigger this:
- A browser process sends a CRITICAL status (``CriticalChildException``).
- The consecutive failure limit is exceeded (``ExceedCommandFailureLimit``).
- A browser fails to relaunch (``ExceedLaunchFailureLimit``).
"""

import threading
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from .command_sequence import CommandSequence
from .types import BrowserId


@dataclass(frozen=True)
class CommandFailure:
    """A single command execution failure.

    Frozen (immutable), so recorded instances can be shared safely after
    being appended to the FailureTracker's failure list.

    Attributes
    ----------
    browser_id
        The browser that experienced the failure.
    command
        Human-readable representation of the command that failed.
    command_status
        The status string reported by the browser process
        (e.g. ``"timeout"``, ``"error"``, ``"neterror"``).
    error
        Optional error message extracted from the browser process.
    traceback
        Optional JSON-encoded traceback from the browser process.
    """

    browser_id: BrowserId
    command: str
    command_status: str
    # Not every failure carries error details, hence Optional defaults.
    error: Optional[str] = None
    traceback: Optional[str] = None


class FailureTracker:
    """Thread-safe tracker for command execution failures.

    Used by ``BrowserManagerHandle`` to report failures without needing a
    direct reference to ``TaskManager``. The ``TaskManager`` checks
    ``has_critical_failure`` periodically and shuts down if it is set.

    The recorded ``failures`` list preserves context across consecutive
    failures so that common causes can be identified (e.g. all failures
    from the same browser, or all the same error type).
    """

    def __init__(self, failure_limit: int) -> None:
        # Number of consecutive failures tolerated before record_failure
        # reports that the crawl should halt.
        self.failure_limit = failure_limit
        # Failures recorded since the last successful command sequence.
        self.failures: List["CommandFailure"] = []
        # Halt-the-crawl signal; None until a critical failure occurs.
        self.critical_failure: Optional[Dict[str, Any]] = None
        # Guards all mutable state above. Methods may be called from
        # multiple browser-manager threads concurrently.
        self.lock = threading.Lock()

    def record_failure(self, failure: "CommandFailure") -> bool:
        """Record a command failure.

        Returns ``True`` when the number of consecutive failures exceeds
        ``failure_limit``, indicating that the crawl should be halted.
        """
        with self.lock:
            self.failures.append(failure)
            return len(self.failures) > self.failure_limit

    def reset(self) -> None:
        """Clear recorded failures after a successful command sequence."""
        with self.lock:
            self.failures.clear()

    def set_critical_failure(
        self,
        error_type: str,
        command_sequence: "CommandSequence",
        exception: Optional[bytes] = None,
    ) -> None:
        """Record a critical failure that should halt crawling immediately.

        Parameters
        ----------
        error_type
            One of ``"CriticalChildException"``,
            ``"ExceedCommandFailureLimit"``, or
            ``"ExceedLaunchFailureLimit"``.
        command_sequence
            The command sequence that triggered the failure.
        exception
            Pickled exception info for ``CriticalChildException``.
        """
        status: Dict[str, Any] = {
            "ErrorType": error_type,
            "CommandSequence": command_sequence,
        }
        if exception is not None:
            status["Exception"] = exception
        # Publish under the lock so readers never observe the write out
        # of order with other state changes (class is documented as
        # thread-safe).
        with self.lock:
            self.critical_failure = status

    def get_critical_failure(self) -> Optional[Dict[str, Any]]:
        """Return a snapshot of the critical failure payload, if any.

        Taken under the lock so callers get an atomic view instead of the
        racy ``has_critical_failure`` check followed by a separate read.
        """
        with self.lock:
            return self.critical_failure

    @property
    def has_critical_failure(self) -> bool:
        """Check if a critical failure has been recorded."""
        with self.lock:
            return self.critical_failure is not None
Comment on lines +109 to +114
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FailureTracker is documented as thread-safe, but set_critical_failure() writes self.critical_failure without holding self.lock, and has_critical_failure reads it without a lock. This can lead to data races where TaskManager._check_failure_status() observes partial or stale state. Guard both the write and the read with with self.lock:, and consider adding a get_critical_failure() method that returns a snapshot under the lock (so callers don’t access critical_failure directly).

Suggested change
self.critical_failure = status
@property
def has_critical_failure(self) -> bool:
"""Check if a critical failure has been recorded."""
return self.critical_failure is not None
with self.lock:
self.critical_failure = status
@property
def has_critical_failure(self) -> bool:
"""Check if a critical failure has been recorded."""
with self.lock:
return self.critical_failure is not None
def get_critical_failure(self) -> Optional[Dict[str, Any]]:
"""Return the current critical failure snapshot, if any, in a thread-safe manner."""
with self.lock:
return self.critical_failure

Copilot uses AI. Check for mistakes.
37 changes: 25 additions & 12 deletions openwpm/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
from .browser_manager import BrowserManagerHandle
from .command_sequence import CommandSequence
from .errors import CommandExecutionError
from .failure_tracker import FailureTracker
from .js_instrumentation import clean_js_instrumentation_settings
from .mp_logger import MPLogger
from .storage.storage_controller import DataSocket, StorageControllerHandle
from .storage.storage_providers import (
StructuredStorageProvider,
TableName,
UnstructuredStorageProvider,
)
from .types import VisitId
from .utilities.multiprocess_utils import kill_process_and_children
from .utilities.platform_utils import get_configuration_string, get_version
from .utilities.storage_watchdog import StorageLogger
Expand Down Expand Up @@ -101,11 +104,7 @@ def __init__(

# Flow control
self.closing = False
self.failure_status: Optional[Dict[str, Any]] = None
self.threadlock = threading.Lock()
self.failure_count = 0

self.failure_limit = manager_params.failure_limit
self.failure_tracker = FailureTracker(manager_params.failure_limit)
# Start logging server thread
self.logging_server = MPLogger(
self.manager_params.log_path,
Expand Down Expand Up @@ -331,6 +330,18 @@ def _shutdown_manager(
if hasattr(self, "callback_thread"):
self.callback_thread.join()

# CommandExecutionContext protocol implementation

def store_record(
self, table: TableName, visit_id: VisitId, data: Dict[str, Any]
) -> None:
"""Send a record to the StorageController via DataSocket."""
self.sock.store_record(table, visit_id, data)

def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None:
"""Signal that all data for a visit_id has been sent."""
self.sock.finalize_visit_id(visit_id, success)

def _check_failure_status(self) -> None:
"""Check the status of command failures. Raise exceptions as necessary

Expand All @@ -340,25 +351,27 @@ def _check_failure_status(self) -> None:
appropriate steps are taken to gracefully close the infrastructure
"""
self.logger.debug("Checking command failure status indicator...")
if not self.failure_status:
if not self.failure_tracker.has_critical_failure:
return

failure_status = self.failure_tracker.critical_failure
assert failure_status is not None
self.logger.debug("TaskManager failure status set, halting command execution.")
self._shutdown_manager()
Comment on lines +354 to 360
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_check_failure_status() reads has_critical_failure and then immediately reads failure_tracker.critical_failure without synchronization. Even if FailureTracker is fixed to lock internally, this two-step pattern can still race (e.g., critical failure cleared/set between calls). Prefer a single locked accessor on FailureTracker (e.g., get_critical_failure() returning Optional[...]) and branch on that returned value to make the check atomic.

Suggested change
if not self.failure_tracker.has_critical_failure:
return
failure_status = self.failure_tracker.critical_failure
assert failure_status is not None
self.logger.debug("TaskManager failure status set, halting command execution.")
self._shutdown_manager()
failure_status = self.failure_tracker.critical_failure
if failure_status is None:
return
self.logger.debug("TaskManager failure status set, halting command execution.")
self._shutdown_manager()

Copilot uses AI. Check for mistakes.
if self.failure_status["ErrorType"] == "ExceedCommandFailureLimit":
if failure_status["ErrorType"] == "ExceedCommandFailureLimit":
raise CommandExecutionError(
"TaskManager exceeded maximum consecutive command "
"execution failures.",
self.failure_status["CommandSequence"],
failure_status["CommandSequence"],
)
elif self.failure_status["ErrorType"] == "ExceedLaunchFailureLimit":
elif failure_status["ErrorType"] == "ExceedLaunchFailureLimit":
raise CommandExecutionError(
"TaskManager failed to launch browser within allowable "
"failure limit.",
self.failure_status["CommandSequence"],
failure_status["CommandSequence"],
)
if self.failure_status["ErrorType"] == "CriticalChildException":
_, exc, tb = pickle.loads(self.failure_status["Exception"])
if failure_status["ErrorType"] == "CriticalChildException":
_, exc, tb = pickle.loads(failure_status["Exception"])
raise exc.with_traceback(tb)

# CRAWLER COMMAND CODE
Expand Down
Loading