feat: Implement v1 streaming data updates#799
feat: Implement v1 streaming data updates#799allenporter wants to merge 2 commits intoPython-roborock:mainfrom
Conversation
…es using `dps` metadata and add corresponding update listeners. This uses the same dps converter patern used by q10, but does not share code explicitly.
There was a problem hiding this comment.
Pull request overview
Implements streaming “push” updates for V1 devices by decoding incoming DPS (data protocol) payloads and applying them to relevant V1 traits, enabling near-real-time status/consumable updates without polling.
Changes:
- Add
decode_data_protocol_message()to parse V1 DPS push messages intoRoborockDataProtocol -> valuemappings. - Introduce a DPS-to-dataclass-field mapping mechanism (
DpsDataConverter) plus trait update listeners to apply streaming updates. - Wire V1 channel MQTT subscriptions to emit DPS updates and have
PropertiesApi.start()register a listener to update traits.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
roborock/protocols/v1_protocol.py |
Adds DPS decoding helper and decode_data_protocol_message() for V1 push updates. |
roborock/devices/rpc/v1_channel.py |
Always subscribes to MQTT and emits decoded DPS updates to registered listeners. |
roborock/devices/traits/v1/common.py |
Adds TraitUpdateListener + DpsDataConverter to support streaming field updates. |
roborock/devices/traits/v1/status.py |
Applies DPS updates to StatusTrait and notifies listeners. |
roborock/devices/traits/v1/consumeable.py |
Adds DPS update support to ConsumableTrait. |
roborock/devices/traits/v1/__init__.py |
Adds start() to register DPS listener and dispatch updates to traits. |
roborock/devices/device.py |
Starts/stops V1 properties streaming lifecycle on connect/close. |
roborock/devices/device_manager.py |
Passes add_dps_listener from channel into V1 trait creation. |
roborock/data/v1/v1_containers.py |
Annotates selected dataclass fields with dps metadata for streaming updates. |
roborock/roborock_message.py |
Adjusts RoborockEnum import location. |
tests/protocols/test_v1_protocol.py |
Adds unit tests for decoding V1 data protocol push messages. |
tests/devices/traits/v1/test_status.py |
Adds unit tests for status updates + update listeners. |
tests/devices/traits/v1/fixtures.py / tests/devices/test_v1_device.py |
Updates fixtures to satisfy new add_dps_listener constructor parameter. |
tests/devices/rpc/test_v1_channel.py |
Updates expectation: MQTT subscription is established even with local connection. |
tests/e2e/__snapshots__/test_device_manager.ambr |
Snapshot updated for additional MQTT traffic. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| await self.discover_features() | ||
| self._unsub = self._add_dps_listener(self._on_dps_update) | ||
|
|
||
| def close(self) -> None: | ||
| if self._unsub: | ||
| self._unsub() |
There was a problem hiding this comment.
PropertiesApi.close() calls the stored unsubscribe but doesn’t clear self._unsub. If close() is called more than once (or if start() is called again), this can lead to double-unsubscribe attempts or holding a stale callable. Consider setting self._unsub = None after invoking it (and possibly guarding against start() being called twice without closing).
| await self.discover_features() | |
| self._unsub = self._add_dps_listener(self._on_dps_update) | |
| def close(self) -> None: | |
| if self._unsub: | |
| self._unsub() | |
| # Avoid registering multiple listeners if start is called more than once | |
| if getattr(self, "_unsub", None): | |
| return | |
| await self.discover_features() | |
| self._unsub = self._add_dps_listener(self._on_dps_update) | |
| def close(self) -> None: | |
| unsub = getattr(self, "_unsub", None) | |
| if unsub: | |
| unsub() | |
| self._unsub = None |
| datetime.datetime.now(datetime.UTC) - self._last_network_info_refresh > NETWORK_INFO_REFRESH_INTERVAL | ||
| ): | ||
| return False | ||
| return True | ||
|
|
||
| def _on_mqtt_message(self, message: RoborockMessage) -> None: | ||
| """Handle incoming MQTT messages.""" | ||
| self._logger.debug("V1Channel received MQTT message: %s", message) | ||
| if self._callback: | ||
| self._callback(message) | ||
| try: | ||
| if datapoints := decode_data_protocol_message(message): | ||
| self._dps_listeners(datapoints) | ||
| except RoborockException as e: |
There was a problem hiding this comment.
New DPS listener plumbing (add_dps_listener + decoding/dispatch in _on_mqtt_message) doesn’t appear to have direct test coverage (no tests assert that a pushed MQTT message results in DPS callbacks firing). Adding a unit test around V1Channel._on_mqtt_message (or an integration-style test that PropertiesApi.start() updates StatusTrait via a simulated MQTT message) would help prevent regressions in the streaming update path.
| @@ -47,3 +52,12 @@ async def reset_consumable(self, consumable: ConsumableAttribute) -> None: | |||
| """Reset a specific consumable attribute on the device.""" | |||
| await self.rpc_channel.send_command(RoborockCommand.RESET_CONSUMABLE, params=[consumable.value]) | |||
| await self.refresh() | |||
|
|
|||
| def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None: | |||
| """Update the trait from data protocol push message data. | |||
|
|
|||
| This handles unsolicited status updates pushed by the device | |||
| via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122). | |||
| """ | |||
| if _DPS_CONVERTER.update_from_dps(self, decoded_dps): | |||
| self._notify_update() | |||
There was a problem hiding this comment.
ConsumableTrait now mixes in TraitUpdateListener, but it never calls TraitUpdateListener.__init__(). As a result, _update_callbacks is never initialized and update_from_dps() will raise AttributeError when it tries to call _notify_update(). Add an __init__ that calls super().__init__() (to init the Consumable dataclass) and then TraitUpdateListener.__init__(..., logger=...) (and add a module logger similar to StatusTrait).
| channel.rpc_channel, | ||
| channel.mqtt_rpc_channel, | ||
| channel.map_rpc_channel, | ||
| channel.add_dps_listener, |
There was a problem hiding this comment.
device_creator types channel as the Channel protocol, but then passes channel.add_dps_listener into v1.create(...). Channel doesn’t define add_dps_listener, so this will fail mypy (which is enabled via pre-commit). Consider either (a) extending the Channel protocol to include add_dps_listener (possibly as an optional/no-op for non-V1 channels), or (b) using a more specific protocol/type for V1 channels in this match arm (e.g., a V1Channel protocol with add_dps_listener).
roborock/devices/rpc/v1_channel.py
Outdated
| # Always subscribe to MQTT to receive protocol updates (data points) | ||
| # even if we have a local connection. Protocol updates only come via cloud/MQTT. | ||
| # Local connection is used for RPC commands, but push notifications come via MQTT. | ||
| self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) | ||
| if self.is_local_connected: | ||
| self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)") | ||
| else: | ||
| self._logger.debug("V1Channel connected via MQTT only") |
There was a problem hiding this comment.
subscribe() now always awaits self._mqtt_channel.subscribe(...). This changes behavior from “fallback to MQTT if local fails” to “require MQTT even when local is connected”, and the docstring above still describes fallback semantics. If local-only operation should still be possible, consider making the MQTT subscription best-effort when is_local_connected is true (log + continue) and only raise when neither transport is available; otherwise, update the docstring and error behavior to reflect the new requirement.
| # Always subscribe to MQTT to receive protocol updates (data points) | |
| # even if we have a local connection. Protocol updates only come via cloud/MQTT. | |
| # Local connection is used for RPC commands, but push notifications come via MQTT. | |
| self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) | |
| if self.is_local_connected: | |
| self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)") | |
| else: | |
| self._logger.debug("V1Channel connected via MQTT only") | |
| # Always attempt to subscribe to MQTT to receive protocol updates (data points) | |
| # even if we have a local connection. Protocol updates only come via cloud/MQTT. | |
| # Local connection is used for RPC commands, but push notifications come via MQTT. | |
| try: | |
| self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) | |
| if self.is_local_connected: | |
| self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)") | |
| else: | |
| self._logger.debug("V1Channel connected via MQTT only") | |
| except Exception as err: | |
| # If MQTT subscription fails but local is connected, continue in local-only mode. | |
| if self.is_local_connected: | |
| self._logger.warning( | |
| "MQTT subscription failed, continuing with local-only connection: %s", | |
| err, | |
| ) | |
| self._mqtt_unsub = None | |
| else: | |
| # No MQTT and no local connection available: propagate the error. | |
| raise |
roborock/protocols/v1_protocol.py
Outdated
| associate an API response with a request even if there was an error. | ||
| """ | ||
| if not message.payload: | ||
| if not (datapoints := _decode_dps_message(message)): |
There was a problem hiding this comment.
decode_rpc_response() now calls _decode_dps_message() and returns an empty ResponseMessage when _decode_dps_message() returns None. _decode_dps_message() returns None not only for missing payload, but also when 'dps' is present but not a dict. That silently treats malformed RPC responses as “no payload”, whereas the previous logic raised RoborockException. Consider keeping strict validation for RPC responses (e.g., have _decode_dps_message(..., strict=True) raise when 'dps' is missing/wrong type, or add an explicit check in decode_rpc_response).
| if not (datapoints := _decode_dps_message(message)): | |
| datapoints = _decode_dps_message(message) | |
| if datapoints is None: | |
| # For RPC responses, a missing or malformed 'dps' section is considered | |
| # a protocol error rather than "no payload". | |
| raise RoborockException( | |
| f"Invalid V1 message format: missing or invalid 'dps' in payload for {message.payload!r}" | |
| ) | |
| if not datapoints: |
This implements a listener that will receive streaming updates to data protocol messages. This sets the mapping for each field using the
dpsmetadata and adds corresponding update listeners to the associated traints.This uses the same dps converter pattern used by q10, but does not share code explicitly.