Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions src/aiousbwatcher/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

_PATH = "/dev/bus/usb"

# How long to wait before restarting the watcher after an unexpected
# OSError. USB hotplug churns /dev/bus/usb constantly, so a transient
# failure should self-heal rather than permanently stop the watcher.
_AUTO_RECOVER_TIME = 5

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -69,30 +74,59 @@ def _async_stop(self) -> None:
self._task.cancel()
self._task = None

def _add_watches(
self, inotify: Inotify, mask: Mask, directories: list[Path]
) -> None:
"""Add a watch for each directory, skipping any that have vanished."""
# USB hotplug races mean a directory discovered by the recursive walk
# can disappear before we get to watch it. Skip those rather than
# letting the whole watcher crash.
for directory in directories:
try:
inotify.add_watch(directory, mask)
except OSError as ex:
_LOGGER.debug("Could not watch %s: %s", directory, ex)

async def _watcher(self) -> None:
"""Run the watcher, auto-recovering from transient OS errors."""
while True:
try:
await self._run_watcher()
except asyncio.CancelledError:
raise
except OSError as ex:
_LOGGER.warning(
"USB watcher stopped unexpectedly (%s); restarting in %s seconds",
ex,
_AUTO_RECOVER_TIME,
)
await asyncio.sleep(_AUTO_RECOVER_TIME)

async def _run_watcher(self) -> None:
mask = (
Mask.CREATE
| Mask.MOVED_FROM
| Mask.MOVED_TO
| Mask.CREATE
| Mask.DELETE_SELF
| Mask.DELETE
| Mask.IGNORED
)

with Inotify() as inotify:
for directory in await _async_get_directories_recursive(
self._loop, self._path
):
inotify.add_watch(directory, mask)
self._add_watches(
inotify,
mask,
await _async_get_directories_recursive(self._loop, self._path),
)

async for event in inotify:
# Add subdirectories to watch if a new directory is added.
if Mask.CREATE in event.mask and event.path is not None:
for directory in await _async_get_directories_recursive(
self._loop, event.path
):
inotify.add_watch(directory, mask)
self._add_watches(
inotify,
mask,
await _async_get_directories_recursive(self._loop, event.path),
)

# If there is at least some overlap, assume the user wants this event.
if event.mask & mask:
Expand Down
86 changes: 85 additions & 1 deletion tests/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError
from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError, impl

_INOTIFY_WAIT_TIME = 0.2

Expand Down Expand Up @@ -80,6 +80,90 @@ def broken_callback() -> None:
assert not called


@pytest.mark.asyncio
@pytest.mark.skipif(
platform != "linux", reason="Inotify not available on this platform"
)
async def test_aiousbwatcher_recovers_from_oserror(tmp_path: Path) -> None:
"""A transient OSError in the watch loop must not kill the watcher."""
called: bool = False

def callback() -> None:
nonlocal called
called = True

attempts: int = 0
real_run_watcher = AIOUSBWatcher._run_watcher

async def flaky_run_watcher(self: AIOUSBWatcher) -> None:
nonlocal attempts
attempts += 1
if attempts == 1:
raise OSError("transient inotify failure")
await real_run_watcher(self)

with (
patch("aiousbwatcher.impl._PATH", str(tmp_path)),
patch.object(impl, "_AUTO_RECOVER_TIME", 0),
patch.object(AIOUSBWatcher, "_run_watcher", flaky_run_watcher),
):
watcher = AIOUSBWatcher()
watcher.async_register_callback(callback)
stop = watcher.async_start()
# First run raises OSError, watcher sleeps (0s) then restarts.
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert attempts >= 2
assert not called
(tmp_path / "test").touch()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert called
stop()


@pytest.mark.asyncio
@pytest.mark.skipif(
platform != "linux", reason="Inotify not available on this platform"
)
async def test_aiousbwatcher_skips_unwatchable_directory(tmp_path: Path) -> None:
"""A directory that vanishes before add_watch must not crash the watcher."""
called: bool = False

def callback() -> None:
nonlocal called
called = True

from asyncinotify import Inotify

real_add_watch = Inotify.add_watch

def flaky_add_watch(self, directory, mask): # type: ignore[no-untyped-def]
# Simulate a hotplug race: a freshly created subdir has already vanished.
if Path(directory).name == "ghost":
raise FileNotFoundError("directory vanished")
return real_add_watch(self, directory, mask)

with (
patch("aiousbwatcher.impl._PATH", str(tmp_path)),
patch.object(Inotify, "add_watch", flaky_add_watch),
):
watcher = AIOUSBWatcher()
watcher.async_register_callback(callback)
stop = watcher.async_start()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert not called
# Creating "ghost" triggers a CREATE on the watched root (callback
# fires) and then a failing add_watch on the new subdir (swallowed).
(tmp_path / "ghost").mkdir()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert called
called = False # type: ignore[unreachable]
# The watcher must still be alive and processing root-level events.
(tmp_path / "after").touch()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert called
stop()


@pytest.mark.asyncio
@pytest.mark.skipif(
platform != "linux", reason="Inotify not available on this platform"
Expand Down
Loading