From bbe767b43cc51a1bd39acdaa0bd60aff26993044 Mon Sep 17 00:00:00 2001 From: Bluetooth Devices Bot Date: Fri, 5 Jun 2026 06:13:03 +0000 Subject: [PATCH 1/2] feat: add opt-in debounce to coalesce USB event bursts Plugging in a single USB device churns /dev/bus/usb with several inotify events, firing registered callbacks multiple times per physical change. Consumers that rescan on each callback pay that cost repeatedly. AIOUSBWatcher now accepts an optional debounce (seconds) that coalesces a burst of events into a single callback once events go quiet. Default is None, preserving the existing fire-immediately behavior. The pending timer is cancelled on stop so no callback fires after the watcher is stopped. Also documents the public API with a Usage section in the README. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 51 +++++++++++++++++++++++++++++++++ src/aiousbwatcher/impl.py | 31 +++++++++++++++++++- tests/test_impl.py | 60 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 141 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a86dc38..360aca3 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,57 @@ Install this via pip (or your favourite package manager): `pip install aiousbwatcher` +## Usage + +```python +import asyncio + +from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError + + +async def main() -> None: + def _callback() -> None: + # A USB device was plugged in or unplugged; rescan as needed. + print("USB devices changed") + + watcher = AIOUSBWatcher() + unregister = watcher.async_register_callback(_callback) + + try: + stop = watcher.async_start() + except InotifyNotAvailableError: + # inotify is only available on Linux. + return + + # ... run your application ... + await asyncio.sleep(60) + + unregister() + stop() + + +asyncio.run(main()) +``` + +`async_register_callback` returns a callable that unregisters that callback, and +`async_start` returns a callable that stops the watcher. Callbacks take no +arguments — they signal *that* something changed, not *what*; rescan your +devices to find the details. + +### Debouncing event bursts + +Plugging in a single USB device churns `/dev/bus/usb` with several events, so a +naive callback fires multiple times per physical change. If your callback does +expensive work (such as a full device rescan), pass `debounce` to coalesce a +burst into a single invocation that fires once events have been quiet for the +given number of seconds: + +```python +watcher = AIOUSBWatcher(debounce=0.5) +``` + +With `debounce=None` (the default) every event fires the callbacks immediately. + ## Contributors ✨ Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): diff --git a/src/aiousbwatcher/impl.py b/src/aiousbwatcher/impl.py index 6571bfa..6267fea 100644 --- a/src/aiousbwatcher/impl.py +++ b/src/aiousbwatcher/impl.py @@ -39,11 +39,23 @@ async def _async_get_directories_recursive( class AIOUSBWatcher: """A watcher for USB devices that uses asyncio.""" - def __init__(self) -> None: + def __init__(self, debounce: float | None = None) -> None: + """ + Initialize the watcher. + + ``debounce``, when set to a number of seconds, coalesces a burst of + filesystem events into a single callback invocation that fires once the + events have been quiet for that long. Plugging in a single USB device + churns ``/dev/bus/usb`` with several events, so consumers that perform + an expensive rescan per callback can use this to rescan only once. + When ``None`` (the default) every event fires the callbacks immediately. + """ self._path = Path(_PATH) self._loop = asyncio.get_running_loop() self._task: asyncio.Task[None] | None = None self._callbacks: set[Callable[[], None]] = set() + self._debounce = debounce + self._debounce_handle: asyncio.TimerHandle | None = None def async_start(self) -> Callable[[], None]: """Start the watcher.""" @@ -68,6 +80,10 @@ def _async_stop(self) -> None: assert self._task is not None # noqa self._task.cancel() self._task = None + if self._debounce_handle is not None: + # Drop any pending coalesced callback so it cannot fire after stop. + self._debounce_handle.cancel() + self._debounce_handle = None async def _watcher(self) -> None: mask = ( @@ -102,6 +118,19 @@ def _async_unregister_callback(self, callback: Callable[[], None]) -> None: self._callbacks.remove(callback) def _async_call_callbacks(self) -> None: + if self._debounce is None: + self._async_fire_callbacks() + return + # Coalesce a burst of events: (re)arm a single timer so the callbacks + # fire once the events have been quiet for ``self._debounce`` seconds. + if self._debounce_handle is not None: + self._debounce_handle.cancel() + self._debounce_handle = self._loop.call_later( + self._debounce, self._async_fire_callbacks + ) + + def _async_fire_callbacks(self) -> None: + self._debounce_handle = None for callback in self._callbacks: try: callback() diff --git a/tests/test_impl.py b/tests/test_impl.py index 8a0bc6c..d1eb55b 100644 --- a/tests/test_impl.py +++ b/tests/test_impl.py @@ -138,3 +138,63 @@ def callback() -> None: stop() await asyncio.sleep(_INOTIFY_WAIT_TIME) assert not called + + +_DEBOUNCE_TIME = 0.5 + + +@pytest.mark.asyncio +@pytest.mark.skipif( + platform != "linux", reason="Inotify not available on this platform" +) +async def test_aiousbwatcher_debounce_coalesces_bursts(tmp_path: Path) -> None: + count: int = 0 + + def callback() -> None: + nonlocal count + count += 1 + + with patch("aiousbwatcher.impl._PATH", str(tmp_path)): + watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME) + unregister = watcher.async_register_callback(callback) + stop = watcher.async_start() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + assert count == 0 + # A burst of events within the debounce window must not fire yet. + for i in range(3): + (tmp_path / f"test{i}").touch() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + assert count == 0 + # Once the events go quiet for the debounce window, fire exactly once. + await asyncio.sleep(_DEBOUNCE_TIME) + assert count == 1 + unregister() + stop() + await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME) + assert count == 1 + + +@pytest.mark.asyncio +@pytest.mark.skipif( + platform != "linux", reason="Inotify not available on this platform" +) +async def test_aiousbwatcher_debounce_cancelled_on_stop(tmp_path: Path) -> None: + count: int = 0 + + def callback() -> None: + nonlocal count + count += 1 + + with patch("aiousbwatcher.impl._PATH", str(tmp_path)): + watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME) + unregister = watcher.async_register_callback(callback) + stop = watcher.async_start() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + (tmp_path / "test").touch() + await asyncio.sleep(_INOTIFY_WAIT_TIME) + # Stopping while a debounced callback is still pending must drop it. + assert count == 0 + unregister() + stop() + await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME) + assert count == 0 From 698e3f30bb262c74809dbcf2cb99038b0f4de725 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 5 Jun 2026 06:13:36 +0000 Subject: [PATCH 2/2] chore(pre-commit.ci): auto fixes --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 360aca3..87f663c 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ asyncio.run(main()) `async_register_callback` returns a callable that unregisters that callback, and `async_start` returns a callable that stops the watcher. Callbacks take no -arguments — they signal *that* something changed, not *what*; rescan your +arguments — they signal _that_ something changed, not _what_; rescan your devices to find the details. ### Debouncing event bursts