diff --git a/pycyphal/application/file.py b/pycyphal/application/file.py index ac5b1e9f..4ca5790a 100644 --- a/pycyphal/application/file.py +++ b/pycyphal/application/file.py @@ -43,9 +43,7 @@ class FileServer: The lifetime of this instance matches the lifetime of its node. """ - def __init__( - self, node: pycyphal.application.Node, roots: typing.Iterable[typing.Union[str, pathlib.Path]] - ) -> None: + def __init__(self, node: pycyphal.application.Node, roots: typing.Iterable[str | pathlib.Path]) -> None: """ :param node: The node instance to initialize the file server on. @@ -85,7 +83,7 @@ def close() -> None: node.add_lifetime_hooks(start, close) @property - def roots(self) -> typing.List[pathlib.Path]: + def roots(self) -> list[pathlib.Path]: """ File operations will be performed within these root directories. The first directory to match takes precedence. @@ -94,7 +92,7 @@ def roots(self) -> typing.List[pathlib.Path]: """ return self._roots - def locate(self, p: typing.Union[pathlib.Path, str, Path]) -> typing.Tuple[pathlib.Path, pathlib.Path]: + def locate(self, p: pathlib.Path | str | Path) -> tuple[pathlib.Path, pathlib.Path]: """ Iterate through :attr:`roots` until a root r is found such that ``r/p`` exists and return ``(r, p)``. Otherwise, return nonexistent ``(roots[0], p)``. @@ -413,7 +411,7 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> int: assert isinstance(res, Modify.Response) return int(res.error.value) - async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> typing.Union[int, bytes]: + async def read(self, path: str, offset: int = 0, size: int | None = None) -> int | bytes: """ Proxy for ``uavcan.file.Read``. @@ -434,7 +432,7 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No data on success (empty if the offset is out of bounds or the file is empty). """ - async def once() -> typing.Union[int, bytes]: + async def once() -> int | bytes: res = await self._call(Read, Read.Request(offset=offset, path=Path(path))) assert isinstance(res, Read.Response) if res.error.value != 0: @@ -455,9 +453,7 @@ async def once() -> typing.Union[int, bytes]: offset += len(out) return data - async def write( - self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True - ) -> int: + async def write(self, path: str, data: memoryview | bytes, offset: int = 0, *, truncate: bool = True) -> int: """ Proxy for ``uavcan.file.Write``. @@ -479,7 +475,7 @@ async def write( :returns: See ``uavcan.file.Error`` """ - async def once(d: typing.Union[memoryview, bytes]) -> int: + async def once(d: memoryview | bytes) -> int: res = await self._call( Write, Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))), @@ -658,7 +654,13 @@ async def move(self, src: str, dst: str, overwrite: bool = False) -> None: assert isinstance(res, Modify.Response) _raise_on_error(res.error, f"{src}->{dst}") - async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = None) -> bytes: + async def read( + self, + path: str, + offset: int = 0, + size: int | None = None, + progress: typing.Callable[[int, int | None], None] | None = None, + ) -> bytes: """ Proxy for ``uavcan.file.Read``. @@ -674,6 +676,10 @@ async def read(self, path: str, offset: int = 0, size: typing.Optional[int] = No If None (default), the entire file will be read (this may exhaust local memory). If zero, this call is a no-op. + :param progress: + Optional callback function that receives (bytes_read, total_size) + total_size will be None if size parameter is None + :raises OSError: If the read operation failed; see ``uavcan.file.Error`` :returns: @@ -686,20 +692,26 @@ async def once() -> bytes: _raise_on_error(res.error, path) return bytes(res.data.value.tobytes()) - if size is None: - size = 2**64 data = b"" - while len(data) < size: + while len(data) < (size or 2**64): out = await once() assert isinstance(out, bytes) if not out: break data += out offset += len(out) + if progress: + progress(len(data), size) return data async def write( - self, path: str, data: typing.Union[memoryview, bytes], offset: int = 0, *, truncate: bool = True + self, + path: str, + data: memoryview | bytes, + offset: int = 0, + *, + truncate: bool = True, + progress: typing.Callable[[int, int], None] | None = None, ) -> None: """ Proxy for ``uavcan.file.Write``. @@ -719,10 +731,13 @@ async def write( If True, the rest of the file after ``offset + len(data)`` will be truncated. This is done by sending an empty write request, as prescribed by the Specification. + :param progress: + Optional callback function that receives (bytes_written, total_size) + :raises OSError: If the write operation failed; see ``uavcan.file.Error`` """ - async def once(d: typing.Union[memoryview, bytes]) -> None: + async def once(d: memoryview | bytes) -> None: res = await self._call( Write, Write.Request(offset, path=Path(path), data=Unstructured(np.frombuffer(d, np.uint8))), @@ -730,11 +745,16 @@ async def once(d: typing.Union[memoryview, bytes]) -> None: assert isinstance(res, Write.Response) _raise_on_error(res.error, path) + total_size = len(data) + bytes_written = 0 limit = self.data_transfer_capacity while len(data) > 0: frag, data = data[:limit], data[limit:] await once(frag) offset += len(frag) + bytes_written += len(frag) + if progress: + progress(bytes_written, total_size) if truncate: await once(b"") diff --git a/tests/application/file.py b/tests/application/file.py index 0cd5aad6..bb23588c 100644 --- a/tests/application/file.py +++ b/tests/application/file.py @@ -2,6 +2,7 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +import math import sys import shutil import typing @@ -13,8 +14,13 @@ import pycyphal +class ProgressTracker: + def __init__(self) -> None: + self.counter = 0 + + @pytest.mark.asyncio -async def _unittest_file(compiled: typing.List[pycyphal.dsdl.GeneratedPackageInfo]) -> None: +async def _unittest_file(compiled: list[pycyphal.dsdl.GeneratedPackageInfo]) -> None: from pycyphal.application import make_node, NodeInfo from pycyphal.transport.udp import UDPTransport from pycyphal.application.file import FileClient, FileServer, Error @@ -254,8 +260,28 @@ async def ls(path: str) -> typing.List[str]: assert e.value.errno == errno.ENOENT # Write into empty file - await cln.write("a/foo/x", bytes(range(200)) * 3) - assert await cln.read("a/foo/x") == bytes(range(200)) * 3 + data = bytes(range(200)) * 3 + data_chunks = math.ceil(len(data) / cln.data_transfer_capacity) + write_tracker = ProgressTracker() + + def write_progress_cb(bytes_written: int, bytes_total: int) -> None: + write_tracker.counter += 1 + assert bytes_total == len(data) + assert bytes_written == min(write_tracker.counter * cln.data_transfer_capacity, len(data)) + + await cln.write("a/foo/x", data, progress=write_progress_cb) + assert write_tracker.counter == data_chunks + + read_tracker = ProgressTracker() + + def read_progress_cb(bytes_read: int, bytes_total: int | None) -> None: + read_tracker.counter += 1 + assert bytes_total is None + assert bytes_read == min(read_tracker.counter * cln.data_transfer_capacity, len(data)) + + assert await cln.read("a/foo/x", progress=read_progress_cb) == data + assert read_tracker.counter == data_chunks + assert (await cln.get_info("a/foo/x")).size == 600 # Truncation -- this write is shorter