Skip to content

refactor: extract StorageInterface protocol and add InProcessStorageControllerHandle#1126

Open
vringar wants to merge 3 commits intorefactor/extract-command-contextfrom
refactor/storage-interface
Open

refactor: extract StorageInterface protocol and add InProcessStorageControllerHandle#1126
vringar wants to merge 3 commits intorefactor/extract-command-contextfrom
refactor/storage-interface

Conversation

@vringar
Copy link
Contributor

@vringar vringar commented Feb 19, 2026

Summary

  • Extract StorageInterface protocol from StorageControllerHandle
  • Add InProcessStorageControllerHandle that runs the StorageController asyncio loop in a daemon thread instead of a subprocess, eliminating process spawn overhead for tests

The extension connects to it identically (same TCP protocol). Production code is unchanged.

Stacked on #1125.

@codecov
Copy link

codecov bot commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 46.98795% with 44 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.78%. Comparing base (07e5291) to head (27999da).

Files with missing lines Patch % Lines
openwpm/storage/in_process_storage.py 49.36% 40 Missing ⚠️
openwpm/storage/storage_interface.py 0.00% 4 Missing ⚠️
Additional details and impacted files
@@                         Coverage Diff                          @@
##           refactor/extract-command-context    #1126      +/-   ##
====================================================================
- Coverage                             56.93%   56.78%   -0.16%     
====================================================================
  Files                                    42       44       +2     
  Lines                                  3957     4040      +83     
====================================================================
+ Hits                                   2253     2294      +41     
- Misses                                 1704     1746      +42     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@vringar vringar force-pushed the refactor/extract-command-context branch from 6b6daf0 to 536eb98 Compare February 20, 2026 12:17
@vringar vringar force-pushed the refactor/storage-interface branch from aa79ddd to d333dbf Compare February 20, 2026 12:17
@vringar vringar changed the base branch from refactor/extract-command-context to master February 20, 2026 12:21
@vringar vringar changed the base branch from master to refactor/extract-command-context February 20, 2026 12:24
@vringar vringar force-pushed the refactor/extract-command-context branch from 536eb98 to 1be210b Compare February 20, 2026 12:41
@vringar vringar force-pushed the refactor/storage-interface branch from d333dbf to f286084 Compare February 20, 2026 12:41
Copilot AI review requested due to automatic review settings February 26, 2026 23:47
@vringar vringar force-pushed the refactor/storage-interface branch from 971a1b8 to 27999da Compare February 26, 2026 23:47
@vringar vringar force-pushed the refactor/extract-command-context branch from 1be210b to 07e5291 Compare February 26, 2026 23:47
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors storage controller usage by extracting a StorageInterface protocol and adding an in-process storage controller handle to reduce test overhead while keeping the TCP protocol unchanged.

Changes:

  • Added StorageInterface protocol to decouple TaskManager from concrete controller handles
  • Introduced InProcessStorageControllerHandle that runs the storage asyncio loop in a daemon thread
  • Updated storage controller tests to parametrically run against subprocess and in-process handles

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.

File Description
test/storage/test_storage_controller.py Parametrizes tests to run with both subprocess and in-process controller handles
openwpm/storage/storage_interface.py Adds a protocol describing the storage controller handle surface used by upstream code
openwpm/storage/in_process_storage.py Adds an in-process (thread-based) storage controller handle for faster tests
Comments suppressed due to low confidence (1)

openwpm/storage/storage_interface.py:25

  • This returns List[Tuple[int, bool]] even though the interface otherwise uses strong ID types (VisitId, BrowserId). For consistency and better type safety, consider changing this to List[Tuple[VisitId, bool]] (and updating implementations accordingly).
    def get_new_completed_visits(self) -> List[Tuple[int, bool]]: ...

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +54 to +58
return VisitId(random.getrandbits(53))

def get_next_browser_id(self) -> BrowserId:
"""Generate crawl id as randomly generated positive 32bit integer."""
return BrowserId(random.getrandbits(32))
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

random.getrandbits() can return 0, which contradicts the docstring ('positive integer') and may collide with sentinel/invalid IDs if 0 is treated specially elsewhere. Generate IDs in a strictly-positive range (e.g., loop until non-zero, or add an offset while preserving the intended bit bounds).

Suggested change
return VisitId(random.getrandbits(53))
def get_next_browser_id(self) -> BrowserId:
"""Generate crawl id as randomly generated positive 32bit integer."""
return BrowserId(random.getrandbits(32))
visit_id = 0
while visit_id == 0:
visit_id = random.getrandbits(53)
return VisitId(visit_id)
def get_next_browser_id(self) -> BrowserId:
"""Generate crawl id as randomly generated positive 32bit integer."""
browser_id = 0
while browser_id == 0:
browser_id = random.getrandbits(32)
return BrowserId(browser_id)

Copilot uses AI. Check for mistakes.
import time
from typing import List, Optional, Tuple

from multiprocess import Queue
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handle runs entirely in-process (thread-based), but it uses multiprocess.Queue. multiprocessing/multiprocess queues have semantics optimized for IPC and their .empty()/.qsize() behavior can be unreliable; they can also add unnecessary overhead. Prefer queue.Queue/queue.SimpleQueue for thread communication here (and update the drain logic accordingly).

Copilot uses AI. Check for mistakes.
Comment on lines +74 to +79
def get_new_completed_visits(self) -> List[Tuple[int, bool]]:
"""Return visit ids completed since last call."""
finished_visit_ids = list()
while not self.completion_queue.empty():
finished_visit_ids.append(self.completion_queue.get())
return finished_visit_ids
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Draining a queue using while not queue.empty(): queue.get() is race-prone (items can arrive between empty() and get()) and .empty() is not reliable for multiprocessing-style queues. Use non-blocking gets in a try/except loop (e.g., get_nowait) until empty is raised, which is correct for both thread queues and process queues.

Copilot uses AI. Check for mistakes.
Comment on lines +65 to +72
def launch(self) -> None:
"""Start the storage controller in a daemon thread."""
self._thread = threading.Thread(
target=self._run_loop, name="InProcessStorageController", daemon=True
)
self._thread.start()
# Wait for the listener address from the status queue
self.listener_address = self.status_queue.get()
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.status_queue.get() blocks indefinitely if the controller fails to start or dies before publishing the listener address, which can hang tests/CI. Use a bounded timeout here and raise a clear error if the address doesn't arrive (and consider surfacing thread exceptions via a shared variable/queue).

Copilot uses AI. Check for mistakes.
Comment on lines +60 to +63
def _run_loop(self) -> None:
"""Run the storage controller's asyncio loop in this thread."""
logging.getLogger("asyncio").setLevel(logging.WARNING)
asyncio.run(self._storage_controller._run(), debug=True)
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces global side effects for the whole process by changing the asyncio logger level, and it always enables asyncio debug mode (debug=True) which can significantly slow down tests and change behavior. Consider avoiding global logger mutation (or scoping it) and making debug configurable/defaulting to False.

Copilot uses AI. Check for mistakes.
Comment on lines +93 to +110
def get_most_recent_status(self) -> int:
"""Return the most recent queue size."""
if self._last_status is None:
return self.get_status()

while not self.status_queue.empty():
self._last_status = self.status_queue.get()
self._last_status_received = time.time()

if self._last_status_received is not None and (
time.time() - self._last_status_received
) > 120:
raise RuntimeError(
"No status update from the storage controller "
"for %d seconds." % (time.time() - self._last_status_received)
)

return self._last_status
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_most_recent_status() is annotated to return int, but it can return a non-int if any non-status message ends up on status_queue (there is no type check here, unlike get_status()). Also, the queue-drain loop uses .empty() which is race-prone/unreliable. Mirror get_status()'s validation (assert/raise if non-int) and use a get_nowait drain pattern.

Copilot uses AI. Check for mistakes.
Comment on lines +88 to +91
self.logger.debug(
"%s took %s seconds to close."
% (type(self).__name__, str(time.time() - start_time))
)
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After join(timeout=60), the code doesn’t check whether the thread actually stopped. This can silently leak a daemon thread and cause later tests to behave unpredictably. Check self._thread.is_alive() after join and raise/handle it (and consider a longer timeout or a deterministic shutdown acknowledgement from the controller).

Suggested change
self.logger.debug(
"%s took %s seconds to close."
% (type(self).__name__, str(time.time() - start_time))
)
elapsed = time.time() - start_time
self.logger.debug(
"%s took %s seconds to close."
% (type(self).__name__, str(elapsed))
)
if self._thread.is_alive():
msg = (
"InProcessStorageController thread failed to shut down "
"within the 60-second timeout."
)
self.logger.error(msg)
raise RuntimeError(msg)

Copilot uses AI. Check for mistakes.
Comment on lines +128 to +138
def save_configuration(self, *args, **kwargs) -> None:
"""Save configuration - delegates to a DataSocket like StorageControllerHandle."""
from .storage_controller import DataSocket, INVALID_VISIT_ID
from ..config import BrowserParamsInternal, ManagerParamsInternal
from .storage_providers import TableName

assert self.listener_address is not None
manager_params: ManagerParamsInternal = args[0]
browser_params: List[BrowserParamsInternal] = args[1]
openwpm_version: str = args[2]
browser_version: str = args[3]
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using *args/**kwargs and positional indexing makes this API easy to misuse and harder to type-check (e.g., callers passing keywords or refactoring argument order). Prefer an explicit signature matching the production handle’s save_configuration(...) parameters (including typing), and avoid relying on args[n].

Suggested change
def save_configuration(self, *args, **kwargs) -> None:
"""Save configuration - delegates to a DataSocket like StorageControllerHandle."""
from .storage_controller import DataSocket, INVALID_VISIT_ID
from ..config import BrowserParamsInternal, ManagerParamsInternal
from .storage_providers import TableName
assert self.listener_address is not None
manager_params: ManagerParamsInternal = args[0]
browser_params: List[BrowserParamsInternal] = args[1]
openwpm_version: str = args[2]
browser_version: str = args[3]
def save_configuration(
self,
manager_params: "ManagerParamsInternal",
browser_params: List["BrowserParamsInternal"],
openwpm_version: str,
browser_version: str,
) -> None:
"""Save configuration - delegates to a DataSocket like StorageControllerHandle."""
from .storage_controller import DataSocket, INVALID_VISIT_ID
from ..config import BrowserParamsInternal, ManagerParamsInternal
from .storage_providers import TableName
assert self.listener_address is not None

Copilot uses AI. Check for mistakes.
@vringar vringar added the backlog Not a priorty, but nice to have label Feb 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backlog Not a priorty, but nice to have

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants