-
Notifications
You must be signed in to change notification settings - Fork 329
refactor: extract CommandExecutionContext to decouple BrowserManagerHandle from TaskManager #1125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9bfb0c3
f205ebf
4e905ee
a8fac2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| """Protocol for the context needed by BrowserManagerHandle to execute commands. | ||
|
|
||
| This decouples BrowserManagerHandle from the concrete TaskManager class, | ||
| allowing tests to provide lightweight mock implementations. | ||
| """ | ||
|
|
||
| from typing import Any, Dict, Protocol | ||
|
|
||
| from .failure_tracker import FailureTracker | ||
| from .storage.storage_providers import TableName | ||
| from .types import VisitId | ||
|
|
||
|
|
||
| class CommandExecutionContext(Protocol): | ||
| """Interface that BrowserManagerHandle needs from the TaskManager. | ||
|
|
||
| TaskManager implements this protocol. Tests can provide a lightweight | ||
| mock with a real FailureTracker but no storage/browser infrastructure. | ||
| """ | ||
|
|
||
| closing: bool | ||
| failure_tracker: FailureTracker | ||
|
|
||
| def store_record( | ||
| self, table: TableName, visit_id: VisitId, data: Dict[str, Any] | ||
| ) -> None: | ||
| """Send a record to the StorageController.""" | ||
| ... | ||
|
|
||
| def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None: | ||
| """Signal that all data for a visit_id has been sent.""" | ||
| ... |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,114 @@ | ||||||||||||||||||||||||||||||||||||||||
| """Tracks command execution failures and determines when to halt crawling. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| There are two failure mechanisms: | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| 1. **Consecutive failure tracking** — Each non-ok command status is recorded | ||||||||||||||||||||||||||||||||||||||||
| via ``record_failure``. After a successful command sequence the list is | ||||||||||||||||||||||||||||||||||||||||
| cleared. When the number of recorded failures exceeds ``failure_limit`` | ||||||||||||||||||||||||||||||||||||||||
| the tracker escalates to a critical failure. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| 2. **Critical failure** — An immediate halt signal set by | ||||||||||||||||||||||||||||||||||||||||
| ``set_critical_failure``. Three situations trigger this: | ||||||||||||||||||||||||||||||||||||||||
| - A browser process sends a CRITICAL status (``CriticalChildException``). | ||||||||||||||||||||||||||||||||||||||||
| - The consecutive failure limit is exceeded (``ExceedCommandFailureLimit``). | ||||||||||||||||||||||||||||||||||||||||
| - A browser fails to relaunch (``ExceedLaunchFailureLimit``). | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||||||||||||
| from dataclasses import dataclass | ||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Dict, List, Optional | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from .command_sequence import CommandSequence | ||||||||||||||||||||||||||||||||||||||||
| from .types import BrowserId | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| @dataclass(frozen=True) | ||||||||||||||||||||||||||||||||||||||||
| class CommandFailure: | ||||||||||||||||||||||||||||||||||||||||
| """A single command execution failure. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Attributes | ||||||||||||||||||||||||||||||||||||||||
| ---------- | ||||||||||||||||||||||||||||||||||||||||
| browser_id | ||||||||||||||||||||||||||||||||||||||||
| The browser that experienced the failure. | ||||||||||||||||||||||||||||||||||||||||
| command | ||||||||||||||||||||||||||||||||||||||||
| Human-readable representation of the command that failed. | ||||||||||||||||||||||||||||||||||||||||
| command_status | ||||||||||||||||||||||||||||||||||||||||
| The status string reported by the browser process | ||||||||||||||||||||||||||||||||||||||||
| (e.g. ``"timeout"``, ``"error"``, ``"neterror"``). | ||||||||||||||||||||||||||||||||||||||||
| error | ||||||||||||||||||||||||||||||||||||||||
| Optional error message extracted from the browser process. | ||||||||||||||||||||||||||||||||||||||||
| traceback | ||||||||||||||||||||||||||||||||||||||||
| Optional JSON-encoded traceback from the browser process. | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| browser_id: BrowserId | ||||||||||||||||||||||||||||||||||||||||
| command: str | ||||||||||||||||||||||||||||||||||||||||
| command_status: str | ||||||||||||||||||||||||||||||||||||||||
| error: Optional[str] = None | ||||||||||||||||||||||||||||||||||||||||
| traceback: Optional[str] = None | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| class FailureTracker: | ||||||||||||||||||||||||||||||||||||||||
| """Thread-safe tracker for command execution failures. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Used by ``BrowserManagerHandle`` to report failures without needing a | ||||||||||||||||||||||||||||||||||||||||
| direct reference to ``TaskManager``. The ``TaskManager`` checks | ||||||||||||||||||||||||||||||||||||||||
| ``has_critical_failure`` periodically and shuts down if it is set. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| The recorded ``failures`` list preserves context across consecutive | ||||||||||||||||||||||||||||||||||||||||
| failures so that common causes can be identified (e.g. all failures | ||||||||||||||||||||||||||||||||||||||||
| from the same browser, or all the same error type). | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def __init__(self, failure_limit: int) -> None: | ||||||||||||||||||||||||||||||||||||||||
| self.failure_limit = failure_limit | ||||||||||||||||||||||||||||||||||||||||
| self.failures: List[CommandFailure] = [] | ||||||||||||||||||||||||||||||||||||||||
| self.critical_failure: Optional[Dict[str, Any]] = None | ||||||||||||||||||||||||||||||||||||||||
| self.lock = threading.Lock() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def record_failure(self, failure: CommandFailure) -> bool: | ||||||||||||||||||||||||||||||||||||||||
| """Record a command failure. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Returns ``True`` when the number of consecutive failures exceeds | ||||||||||||||||||||||||||||||||||||||||
| ``failure_limit``, indicating that the crawl should be halted. | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
| with self.lock: | ||||||||||||||||||||||||||||||||||||||||
| self.failures.append(failure) | ||||||||||||||||||||||||||||||||||||||||
| return len(self.failures) > self.failure_limit | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def reset(self) -> None: | ||||||||||||||||||||||||||||||||||||||||
| """Clear recorded failures after a successful command sequence.""" | ||||||||||||||||||||||||||||||||||||||||
| with self.lock: | ||||||||||||||||||||||||||||||||||||||||
| self.failures.clear() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def set_critical_failure( | ||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||
| error_type: str, | ||||||||||||||||||||||||||||||||||||||||
| command_sequence: CommandSequence, | ||||||||||||||||||||||||||||||||||||||||
| exception: Optional[bytes] = None, | ||||||||||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||||||||||
| """Record a critical failure that should halt crawling immediately. | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Parameters | ||||||||||||||||||||||||||||||||||||||||
| ---------- | ||||||||||||||||||||||||||||||||||||||||
| error_type | ||||||||||||||||||||||||||||||||||||||||
| One of ``"CriticalChildException"``, | ||||||||||||||||||||||||||||||||||||||||
| ``"ExceedCommandFailureLimit"``, or | ||||||||||||||||||||||||||||||||||||||||
| ``"ExceedLaunchFailureLimit"``. | ||||||||||||||||||||||||||||||||||||||||
| command_sequence | ||||||||||||||||||||||||||||||||||||||||
| The command sequence that triggered the failure. | ||||||||||||||||||||||||||||||||||||||||
| exception | ||||||||||||||||||||||||||||||||||||||||
| Pickled exception info for ``CriticalChildException``. | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
| status: Dict[str, Any] = { | ||||||||||||||||||||||||||||||||||||||||
| "ErrorType": error_type, | ||||||||||||||||||||||||||||||||||||||||
| "CommandSequence": command_sequence, | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| if exception is not None: | ||||||||||||||||||||||||||||||||||||||||
| status["Exception"] = exception | ||||||||||||||||||||||||||||||||||||||||
| self.critical_failure = status | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||||||||||
| def has_critical_failure(self) -> bool: | ||||||||||||||||||||||||||||||||||||||||
| """Check if a critical failure has been recorded.""" | ||||||||||||||||||||||||||||||||||||||||
| return self.critical_failure is not None | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+109
to
+114
|
||||||||||||||||||||||||||||||||||||||||
| self.critical_failure = status | |
| @property | |
| def has_critical_failure(self) -> bool: | |
| """Check if a critical failure has been recorded.""" | |
| return self.critical_failure is not None | |
| with self.lock: | |
| self.critical_failure = status | |
| @property | |
| def has_critical_failure(self) -> bool: | |
| """Check if a critical failure has been recorded.""" | |
| with self.lock: | |
| return self.critical_failure is not None | |
| def get_critical_failure(self) -> Optional[Dict[str, Any]]: | |
| """Return the current critical failure snapshot, if any, in a thread-safe manner.""" | |
| with self.lock: | |
| return self.critical_failure |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,13 +21,16 @@ | |||||||||||||||||||||||||||
| from .browser_manager import BrowserManagerHandle | ||||||||||||||||||||||||||||
| from .command_sequence import CommandSequence | ||||||||||||||||||||||||||||
| from .errors import CommandExecutionError | ||||||||||||||||||||||||||||
| from .failure_tracker import FailureTracker | ||||||||||||||||||||||||||||
| from .js_instrumentation import clean_js_instrumentation_settings | ||||||||||||||||||||||||||||
| from .mp_logger import MPLogger | ||||||||||||||||||||||||||||
| from .storage.storage_controller import DataSocket, StorageControllerHandle | ||||||||||||||||||||||||||||
| from .storage.storage_providers import ( | ||||||||||||||||||||||||||||
| StructuredStorageProvider, | ||||||||||||||||||||||||||||
| TableName, | ||||||||||||||||||||||||||||
| UnstructuredStorageProvider, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
| from .types import VisitId | ||||||||||||||||||||||||||||
| from .utilities.multiprocess_utils import kill_process_and_children | ||||||||||||||||||||||||||||
| from .utilities.platform_utils import get_configuration_string, get_version | ||||||||||||||||||||||||||||
| from .utilities.storage_watchdog import StorageLogger | ||||||||||||||||||||||||||||
|
|
@@ -101,11 +104,7 @@ def __init__( | |||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # Flow control | ||||||||||||||||||||||||||||
| self.closing = False | ||||||||||||||||||||||||||||
| self.failure_status: Optional[Dict[str, Any]] = None | ||||||||||||||||||||||||||||
| self.threadlock = threading.Lock() | ||||||||||||||||||||||||||||
| self.failure_count = 0 | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| self.failure_limit = manager_params.failure_limit | ||||||||||||||||||||||||||||
| self.failure_tracker = FailureTracker(manager_params.failure_limit) | ||||||||||||||||||||||||||||
| # Start logging server thread | ||||||||||||||||||||||||||||
| self.logging_server = MPLogger( | ||||||||||||||||||||||||||||
| self.manager_params.log_path, | ||||||||||||||||||||||||||||
|
|
@@ -331,6 +330,18 @@ def _shutdown_manager( | |||||||||||||||||||||||||||
| if hasattr(self, "callback_thread"): | ||||||||||||||||||||||||||||
| self.callback_thread.join() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| # CommandExecutionContext protocol implementation | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def store_record( | ||||||||||||||||||||||||||||
| self, table: TableName, visit_id: VisitId, data: Dict[str, Any] | ||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||
| """Send a record to the StorageController via DataSocket.""" | ||||||||||||||||||||||||||||
| self.sock.store_record(table, visit_id, data) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def finalize_visit_id(self, visit_id: VisitId, success: bool) -> None: | ||||||||||||||||||||||||||||
| """Signal that all data for a visit_id has been sent.""" | ||||||||||||||||||||||||||||
| self.sock.finalize_visit_id(visit_id, success) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| def _check_failure_status(self) -> None: | ||||||||||||||||||||||||||||
| """Check the status of command failures. Raise exceptions as necessary | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -340,25 +351,27 @@ def _check_failure_status(self) -> None: | |||||||||||||||||||||||||||
| appropriate steps are taken to gracefully close the infrastructure | ||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||
| self.logger.debug("Checking command failure status indicator...") | ||||||||||||||||||||||||||||
| if not self.failure_status: | ||||||||||||||||||||||||||||
| if not self.failure_tracker.has_critical_failure: | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| failure_status = self.failure_tracker.critical_failure | ||||||||||||||||||||||||||||
| assert failure_status is not None | ||||||||||||||||||||||||||||
| self.logger.debug("TaskManager failure status set, halting command execution.") | ||||||||||||||||||||||||||||
| self._shutdown_manager() | ||||||||||||||||||||||||||||
|
Comment on lines
+354
to
360
|
||||||||||||||||||||||||||||
| if not self.failure_tracker.has_critical_failure: | |
| return | |
| failure_status = self.failure_tracker.critical_failure | |
| assert failure_status is not None | |
| self.logger.debug("TaskManager failure status set, halting command execution.") | |
| self._shutdown_manager() | |
| failure_status = self.failure_tracker.critical_failure | |
| if failure_status is None: | |
| return | |
| self.logger.debug("TaskManager failure status set, halting command execution.") | |
| self._shutdown_manager() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Representing
critical_failureasDict[str, Any]makes the API easy to misuse (stringly-typed keys like\"ErrorType\", optional\"Exception\") and leaks internal structure to consumers. Since you already introduced theCommandFailuredataclass, consider introducing a small@dataclass/TypedDictfor the critical failure payload (and possibly anEnumfor the error type) soTaskManager._check_failure_status()can rely on typed fields instead of ad-hoc dict keys.