refactor: extract StorageInterface protocol and add InProcessStorageControllerHandle #1126
vringar wants to merge 3 commits into refactor/extract-command-context from
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@                 Coverage Diff                  @@
##    refactor/extract-command-context    #1126    +/-   ##
====================================================================
- Coverage    56.93%    56.78%    -0.16%
====================================================================
  Files           42        44        +2
  Lines         3957      4040       +83
====================================================================
+ Hits          2253      2294       +41
- Misses        1704      1746       +42

☔ View full report in Codecov by Sentry.
Force-pushed: 6b6daf0 → 536eb98, aa79ddd → d333dbf, 536eb98 → 1be210b, d333dbf → f286084, 971a1b8 → 27999da, 1be210b → 07e5291
Pull request overview
Refactors storage controller usage by extracting a StorageInterface protocol and adding an in-process storage controller handle to reduce test overhead while keeping the TCP protocol unchanged.
Changes:
- Added StorageInterface protocol to decouple TaskManager from concrete controller handles
- Introduced InProcessStorageControllerHandle that runs the storage asyncio loop in a daemon thread
- Updated storage controller tests to parametrically run against subprocess and in-process handles
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| test/storage/test_storage_controller.py | Parametrizes tests to run with both subprocess and in-process controller handles |
| openwpm/storage/storage_interface.py | Adds a protocol describing the storage controller handle surface used by upstream code |
| openwpm/storage/in_process_storage.py | Adds an in-process (thread-based) storage controller handle for faster tests |
Comments suppressed due to low confidence (1)
openwpm/storage/storage_interface.py:25
- This returns List[Tuple[int, bool]] even though the interface otherwise uses strong ID types (VisitId, BrowserId). For consistency and better type safety, consider changing this to List[Tuple[VisitId, bool]] (and updating implementations accordingly).
def get_new_completed_visits(self) -> List[Tuple[int, bool]]: ...
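The strongly-typed signature the reviewer suggests can be sketched as follows. `StorageInterface`, `VisitId`, and `BrowserId` mirror the PR's names, but their definitions here and the `FakeHandle` class are hypothetical stand-ins used only to show that a structural `Protocol` check passes:

```python
from typing import List, NewType, Protocol, Tuple

# NewType wrappers mirroring the PR's strong ID types (assumed definitions).
VisitId = NewType("VisitId", int)
BrowserId = NewType("BrowserId", int)

class StorageInterface(Protocol):
    # Returning VisitId instead of bare int keeps the ID type end-to-end.
    def get_new_completed_visits(self) -> List[Tuple[VisitId, bool]]: ...

class FakeHandle:
    """Hypothetical implementation used only to illustrate the protocol."""

    def get_new_completed_visits(self) -> List[Tuple[VisitId, bool]]:
        return [(VisitId(7), True)]

# Structural typing: FakeHandle satisfies the protocol without inheriting it.
handle: StorageInterface = FakeHandle()
print(handle.get_new_completed_visits())  # → [(7, True)]
```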
    return VisitId(random.getrandbits(53))

def get_next_browser_id(self) -> BrowserId:
    """Generate crawl id as randomly generated positive 32bit integer."""
    return BrowserId(random.getrandbits(32))
random.getrandbits() can return 0, which contradicts the docstring ('positive integer') and may collide with sentinel/invalid IDs if 0 is treated specially elsewhere. Generate IDs in a strictly-positive range (e.g., loop until non-zero, or add an offset while preserving the intended bit bounds).
Suggested change:

    visit_id = 0
    while visit_id == 0:
        visit_id = random.getrandbits(53)
    return VisitId(visit_id)

def get_next_browser_id(self) -> BrowserId:
    """Generate crawl id as randomly generated positive 32bit integer."""
    browser_id = 0
    while browser_id == 0:
        browser_id = random.getrandbits(32)
    return BrowserId(browser_id)
import time
from typing import List, Optional, Tuple

from multiprocess import Queue
This handle runs entirely in-process (thread-based), but it uses multiprocess.Queue. multiprocessing/multiprocess queues have semantics optimized for IPC and their .empty()/.qsize() behavior can be unreliable; they can also add unnecessary overhead. Prefer queue.Queue/queue.SimpleQueue for thread communication here (and update the drain logic accordingly).
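A minimal sketch of the swap the reviewer proposes, assuming the handle only ever communicates between threads of one process; the `completion_queue` name and the payload shape are illustrative:

```python
import queue
import threading

# queue.SimpleQueue is enough for same-process thread communication:
# no pickling, no feeder thread, no IPC overhead.
completion_queue: queue.SimpleQueue = queue.SimpleQueue()

def storage_thread() -> None:
    # Stand-in for the controller thread reporting completed visits.
    completion_queue.put((1, True))
    completion_queue.put((2, False))

t = threading.Thread(target=storage_thread, daemon=True)
t.start()
t.join()
print(completion_queue.get())  # → (1, True)
```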
def get_new_completed_visits(self) -> List[Tuple[int, bool]]:
    """Return visit ids completed since last call."""
    finished_visit_ids = list()
    while not self.completion_queue.empty():
        finished_visit_ids.append(self.completion_queue.get())
    return finished_visit_ids
Draining a queue using while not queue.empty(): queue.get() is race-prone (items can arrive between empty() and get()) and .empty() is not reliable for multiprocessing-style queues. Use non-blocking gets in a try/except loop (e.g., get_nowait) until empty is raised, which is correct for both thread queues and process queues.
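The `get_nowait` drain pattern the reviewer describes might look like this; `drain` is a hypothetical helper, but `queue.Empty` is indeed the exception raised by both `queue.Queue.get_nowait` and multiprocessing-style queues:

```python
import queue

def drain(q: "queue.Queue") -> list:
    """Race-free drain: rely on the Empty exception, not .empty()."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

q: "queue.Queue" = queue.Queue()
q.put((1, True))
q.put((2, False))
print(drain(q))  # → [(1, True), (2, False)]
print(drain(q))  # → []
```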
def launch(self) -> None:
    """Start the storage controller in a daemon thread."""
    self._thread = threading.Thread(
        target=self._run_loop, name="InProcessStorageController", daemon=True
    )
    self._thread.start()
    # Wait for the listener address from the status queue
    self.listener_address = self.status_queue.get()
self.status_queue.get() blocks indefinitely if the controller fails to start or dies before publishing the listener address, which can hang tests/CI. Use a bounded timeout here and raise a clear error if the address doesn't arrive (and consider surfacing thread exceptions via a shared variable/queue).
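A hedged sketch of the bounded-wait launch the reviewer suggests; `STARTUP_TIMEOUT` and the helper's shape are assumptions for illustration, not the PR's code:

```python
import queue
import threading

STARTUP_TIMEOUT = 30  # seconds; illustrative bound, not from the PR

def launch_and_wait(status_queue: queue.Queue, target) -> object:
    """Start the controller thread and wait (bounded) for its address."""
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    try:
        # Bounded wait avoids hanging tests/CI if the controller dies early.
        return status_queue.get(timeout=STARTUP_TIMEOUT)
    except queue.Empty:
        raise RuntimeError(
            "Storage controller did not publish a listener address "
            "within %d seconds" % STARTUP_TIMEOUT
        )

sq: queue.Queue = queue.Queue()
addr = launch_and_wait(sq, lambda: sq.put(("127.0.0.1", 5555)))
print(addr)  # → ('127.0.0.1', 5555)
```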
def _run_loop(self) -> None:
    """Run the storage controller's asyncio loop in this thread."""
    logging.getLogger("asyncio").setLevel(logging.WARNING)
    asyncio.run(self._storage_controller._run(), debug=True)
This introduces global side effects for the whole process by changing the asyncio logger level, and it always enables asyncio debug mode (debug=True) which can significantly slow down tests and change behavior. Consider avoiding global logger mutation (or scoping it) and making debug configurable/defaulting to False.
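Scoping the logger change and making debug opt-in, as the comment suggests, could look like this; `_noop` is a stand-in for the controller's `_run()` coroutine:

```python
import asyncio
import logging

async def _noop() -> None:
    # Stand-in for the storage controller's _run() coroutine.
    await asyncio.sleep(0)

def run_loop(debug: bool = False) -> None:
    # Raise the asyncio logger level only for the duration of the loop,
    # then restore it, instead of mutating it for the whole process.
    asyncio_logger = logging.getLogger("asyncio")
    previous_level = asyncio_logger.level
    asyncio_logger.setLevel(logging.WARNING)
    try:
        asyncio.run(_noop(), debug=debug)  # debug is opt-in, off by default
    finally:
        asyncio_logger.setLevel(previous_level)

before = logging.getLogger("asyncio").level
run_loop()
print(logging.getLogger("asyncio").level == before)  # → True (level restored)
```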
def get_most_recent_status(self) -> int:
    """Return the most recent queue size."""
    if self._last_status is None:
        return self.get_status()

    while not self.status_queue.empty():
        self._last_status = self.status_queue.get()
        self._last_status_received = time.time()

    if self._last_status_received is not None and (
        time.time() - self._last_status_received
    ) > 120:
        raise RuntimeError(
            "No status update from the storage controller "
            "for %d seconds." % (time.time() - self._last_status_received)
        )

    return self._last_status
get_most_recent_status() is annotated to return int, but it can return a non-int if any non-status message ends up on status_queue (there is no type check here, unlike get_status()). Also, the queue-drain loop uses .empty() which is race-prone/unreliable. Mirror get_status()'s validation (assert/raise if non-int) and use a get_nowait drain pattern.
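Combining both fixes from the comment above, a hardened drain with the type validation of `get_status()` might look like this; the free-standing function is illustrative, not the PR's method:

```python
import queue

def most_recent_status(status_queue: queue.Queue, last_status=None) -> int:
    """Drain with get_nowait and reject non-int messages (sketch)."""
    while True:
        try:
            msg = status_queue.get_nowait()
        except queue.Empty:
            break
        if not isinstance(msg, int):
            raise TypeError("unexpected message on status queue: %r" % (msg,))
        last_status = msg
    if last_status is None:
        raise RuntimeError("no status received from the storage controller yet")
    return last_status

q: queue.Queue = queue.Queue()
q.put(3)
q.put(7)
print(most_recent_status(q))  # → 7
```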
self.logger.debug(
    "%s took %s seconds to close."
    % (type(self).__name__, str(time.time() - start_time))
)
After join(timeout=60), the code doesn’t check whether the thread actually stopped. This can silently leak a daemon thread and cause later tests to behave unpredictably. Check self._thread.is_alive() after join and raise/handle it (and consider a longer timeout or a deterministic shutdown acknowledgement from the controller).
Suggested change:

elapsed = time.time() - start_time
self.logger.debug(
    "%s took %s seconds to close."
    % (type(self).__name__, str(elapsed))
)
if self._thread.is_alive():
    msg = (
        "InProcessStorageController thread failed to shut down "
        "within the 60-second timeout."
    )
    self.logger.error(msg)
    raise RuntimeError(msg)
def save_configuration(self, *args, **kwargs) -> None:
    """Save configuration - delegates to a DataSocket like StorageControllerHandle."""
    from .storage_controller import DataSocket, INVALID_VISIT_ID
    from ..config import BrowserParamsInternal, ManagerParamsInternal
    from .storage_providers import TableName

    assert self.listener_address is not None
    manager_params: ManagerParamsInternal = args[0]
    browser_params: List[BrowserParamsInternal] = args[1]
    openwpm_version: str = args[2]
    browser_version: str = args[3]
Using *args/**kwargs and positional indexing makes this API easy to misuse and harder to type-check (e.g., callers passing keywords or refactoring argument order). Prefer an explicit signature matching the production handle’s save_configuration(...) parameters (including typing), and avoid relying on args[n].
Suggested change:

def save_configuration(
    self,
    manager_params: "ManagerParamsInternal",
    browser_params: List["BrowserParamsInternal"],
    openwpm_version: str,
    browser_version: str,
) -> None:
    """Save configuration - delegates to a DataSocket like StorageControllerHandle."""
    from .storage_controller import DataSocket, INVALID_VISIT_ID
    from ..config import BrowserParamsInternal, ManagerParamsInternal
    from .storage_providers import TableName

    assert self.listener_address is not None
Summary
- Extracted a StorageInterface protocol from StorageControllerHandle
- Added InProcessStorageControllerHandle, which runs the StorageController asyncio loop in a daemon thread instead of a subprocess, eliminating process spawn overhead for tests

The extension connects to it identically (same TCP protocol). Production code is unchanged.
Stacked on #1125.