diff --git a/README.md b/README.md index a86dc38..87f663c 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,57 @@ Install this via pip (or your favourite package manager): `pip install aiousbwatcher` +## Usage + +```python +import asyncio + +from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError + + +async def main() -> None: + def _callback() -> None: + # A USB device was plugged in or unplugged; rescan as needed. + print("USB devices changed") + + watcher = AIOUSBWatcher() + unregister = watcher.async_register_callback(_callback) + + try: + stop = watcher.async_start() + except InotifyNotAvailableError: + # inotify is only available on Linux. + return + + # ... run your application ... + await asyncio.sleep(60) + + unregister() + stop() + + +asyncio.run(main()) +``` + +`async_register_callback` returns a callable that unregisters that callback, and +`async_start` returns a callable that stops the watcher. Callbacks take no +arguments — they signal _that_ something changed, not _what_; rescan your +devices to find the details. + +### Debouncing event bursts + +Plugging in a single USB device churns `/dev/bus/usb` with several events, so a +naive callback fires multiple times per physical change. If your callback does +expensive work (such as a full device rescan), pass `debounce` to coalesce a +burst into a single invocation that fires once events have been quiet for the +given number of seconds: + +```python +watcher = AIOUSBWatcher(debounce=0.5) +``` + +With `debounce=None` (the default) every event fires the callbacks immediately. + ## Contributors ✨ Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): diff --git a/src/aiousbwatcher/impl.py b/src/aiousbwatcher/impl.py index 6571bfa..6267fea 100644 --- a/src/aiousbwatcher/impl.py +++ b/src/aiousbwatcher/impl.py @@ -39,11 +39,23 @@ async def _async_get_directories_recursive( class AIOUSBWatcher: """A watcher for USB devices that uses asyncio.""" - def __init__(self) -> None: + def __init__(self, debounce: float | None = None) -> None: + """ + Initialize the watcher. + + ``debounce``, when set to a number of seconds, coalesces a burst of + filesystem events into a single callback invocation that fires once the + events have been quiet for that long. Plugging in a single USB device + churns ``/dev/bus/usb`` with several events, so consumers that perform + an expensive rescan per callback can use this to rescan only once. + When ``None`` (the default) every event fires the callbacks immediately. + """ self._path = Path(_PATH) self._loop = asyncio.get_running_loop() self._task: asyncio.Task[None] | None = None self._callbacks: set[Callable[[], None]] = set() + self._debounce = debounce + self._debounce_handle: asyncio.TimerHandle | None = None def async_start(self) -> Callable[[], None]: """Start the watcher.""" @@ -68,6 +80,10 @@ def _async_stop(self) -> None: assert self._task is not None # noqa self._task.cancel() self._task = None + if self._debounce_handle is not None: + # Drop any pending coalesced callback so it cannot fire after stop. + self._debounce_handle.cancel() + self._debounce_handle = None async def _watcher(self) -> None: mask = ( @@ -102,6 +118,19 @@ def _async_unregister_callback(self, callback: Callable[[], None]) -> None: self._callbacks.remove(callback) def _async_call_callbacks(self) -> None: + if self._debounce is None: + self._async_fire_callbacks() + return + # Coalesce a burst of events: (re)arm a single timer so the callbacks + # fire once the events have been quiet for ``self._debounce`` seconds. + if self._debounce_handle is not None: + self._debounce_handle.cancel() + self._debounce_handle = self._loop.call_later( + self._debounce, self._async_fire_callbacks + ) + + def _async_fire_callbacks(self) -> None: + self._debounce_handle = None for callback in self._callbacks: try: callback() diff --git a/tests/test_impl.py b/tests/test_impl.py index 8a0bc6c..d1eb55b 100644 --- a/tests/test_impl.py +++ b/tests/test_impl.py @@ -138,3 +138,63 @@ def callback() -> None: stop() await asyncio.sleep(_INOTIFY_WAIT_TIME) assert not called + + +_DEBOUNCE_TIME = 0.5 + + +@pytest.mark.asyncio +@pytest.mark.skipif( + platform != "linux", reason="Inotify not available on this platform" +) +async def test_aiousbwatcher_debounce_coalesces_bursts(tmp_path: Path) -> None: + count: int = 0 + + def callback() -> None: + nonlocal count + count += 1 + + with patch("aiousbwatcher.impl._PATH", str(tmp_path)): + watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME) + unregister = watcher.async_register_callback(callback) + stop = watcher.async_start() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + assert count == 0 + # A burst of events within the debounce window must not fire yet. + for i in range(3): + (tmp_path / f"test{i}").touch() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + assert count == 0 + # Once the events go quiet for the debounce window, fire exactly once. + await asyncio.sleep(_DEBOUNCE_TIME) + assert count == 1 + unregister() + stop() + await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME) + assert count == 1 + + +@pytest.mark.asyncio +@pytest.mark.skipif( + platform != "linux", reason="Inotify not available on this platform" +) +async def test_aiousbwatcher_debounce_cancelled_on_stop(tmp_path: Path) -> None: + count: int = 0 + + def callback() -> None: + nonlocal count + count += 1 + + with patch("aiousbwatcher.impl._PATH", str(tmp_path)): + watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME) + unregister = watcher.async_register_callback(callback) + stop = watcher.async_start() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + (tmp_path / "test").touch() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + # Stopping while a debounced callback is still pending must drop it. + assert count == 0 + unregister() + stop() + await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME) + assert count == 0