Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,57 @@ Install this via pip (or your favourite package manager):

`pip install aiousbwatcher`

## Usage

```python
import asyncio

from aiousbwatcher import AIOUSBWatcher, InotifyNotAvailableError


async def main() -> None:
def _callback() -> None:
# A USB device was plugged in or unplugged; rescan as needed.
print("USB devices changed")

watcher = AIOUSBWatcher()
unregister = watcher.async_register_callback(_callback)

try:
stop = watcher.async_start()
except InotifyNotAvailableError:
# inotify is only available on Linux.
return

# ... run your application ...
await asyncio.sleep(60)

unregister()
stop()


asyncio.run(main())
```

`async_register_callback` returns a callable that unregisters that callback, and
`async_start` returns a callable that stops the watcher. Callbacks take no
arguments — they signal _that_ something changed, not _what_; rescan your
devices to find the details.

### Debouncing event bursts

Plugging in a single USB device churns `/dev/bus/usb` with several events, so a
naive callback fires multiple times per physical change. If your callback does
expensive work (such as a full device rescan), pass `debounce` to coalesce a
burst into a single invocation that fires once events have been quiet for the
given number of seconds:

```python
watcher = AIOUSBWatcher(debounce=0.5)
```

With `debounce=None` (the default) every event fires the callbacks immediately.

## Contributors ✨

Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):
Expand Down
31 changes: 30 additions & 1 deletion src/aiousbwatcher/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,23 @@ async def _async_get_directories_recursive(
class AIOUSBWatcher:
"""A watcher for USB devices that uses asyncio."""

def __init__(self) -> None:
def __init__(self, debounce: float | None = None) -> None:
"""
Initialize the watcher.

``debounce``, when set to a number of seconds, coalesces a burst of
filesystem events into a single callback invocation that fires once the
events have been quiet for that long. Plugging in a single USB device
churns ``/dev/bus/usb`` with several events, so consumers that perform
an expensive rescan per callback can use this to rescan only once.
When ``None`` (the default) every event fires the callbacks immediately.
"""
self._path = Path(_PATH)
self._loop = asyncio.get_running_loop()
self._task: asyncio.Task[None] | None = None
self._callbacks: set[Callable[[], None]] = set()
self._debounce = debounce
self._debounce_handle: asyncio.TimerHandle | None = None

def async_start(self) -> Callable[[], None]:
"""Start the watcher."""
Expand All @@ -68,6 +80,10 @@ def _async_stop(self) -> None:
assert self._task is not None # noqa
self._task.cancel()
self._task = None
if self._debounce_handle is not None:
# Drop any pending coalesced callback so it cannot fire after stop.
self._debounce_handle.cancel()
self._debounce_handle = None

async def _watcher(self) -> None:
mask = (
Expand Down Expand Up @@ -102,6 +118,19 @@ def _async_unregister_callback(self, callback: Callable[[], None]) -> None:
self._callbacks.remove(callback)

def _async_call_callbacks(self) -> None:
if self._debounce is None:
self._async_fire_callbacks()
return
# Coalesce a burst of events: (re)arm a single timer so the callbacks
# fire once the events have been quiet for ``self._debounce`` seconds.
if self._debounce_handle is not None:
self._debounce_handle.cancel()
self._debounce_handle = self._loop.call_later(
self._debounce, self._async_fire_callbacks
)

def _async_fire_callbacks(self) -> None:
self._debounce_handle = None
for callback in self._callbacks:
try:
callback()
Expand Down
60 changes: 60 additions & 0 deletions tests/test_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,63 @@ def callback() -> None:
stop()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert not called


_DEBOUNCE_TIME = 0.5


@pytest.mark.asyncio
@pytest.mark.skipif(
platform != "linux", reason="Inotify not available on this platform"
)
async def test_aiousbwatcher_debounce_coalesces_bursts(tmp_path: Path) -> None:
count: int = 0

def callback() -> None:
nonlocal count
count += 1

with patch("aiousbwatcher.impl._PATH", str(tmp_path)):
watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME)
unregister = watcher.async_register_callback(callback)
stop = watcher.async_start()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert count == 0
# A burst of events within the debounce window must not fire yet.
for i in range(3):
(tmp_path / f"test{i}").touch()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
assert count == 0
# Once the events go quiet for the debounce window, fire exactly once.
await asyncio.sleep(_DEBOUNCE_TIME)
assert count == 1
unregister()
stop()
await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME)
assert count == 1


@pytest.mark.asyncio
@pytest.mark.skipif(
platform != "linux", reason="Inotify not available on this platform"
)
async def test_aiousbwatcher_debounce_cancelled_on_stop(tmp_path: Path) -> None:
count: int = 0

def callback() -> None:
nonlocal count
count += 1

with patch("aiousbwatcher.impl._PATH", str(tmp_path)):
watcher = AIOUSBWatcher(debounce=_DEBOUNCE_TIME)
unregister = watcher.async_register_callback(callback)
stop = watcher.async_start()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
(tmp_path / "test").touch()
await asyncio.sleep(_INOTIFY_WAIT_TIME)
# Stopping while a debounced callback is still pending must drop it.
assert count == 0
unregister()
stop()
await asyncio.sleep(_DEBOUNCE_TIME + _INOTIFY_WAIT_TIME)
assert count == 0
Loading