Skip to content

feat: Implement v1 streaming data updates#799

Draft
allenporter wants to merge 2 commits intoPython-roborock:mainfrom
allenporter:v1-streaming-updates
Draft

feat: Implement v1 streaming data updates#799
allenporter wants to merge 2 commits intoPython-roborock:mainfrom
allenporter:v1-streaming-updates

Conversation

@allenporter
Copy link
Contributor

This implements a listener that will receive streaming updates to data protocol messages. This sets the mapping for each field using the dps metadata and adds corresponding update listeners to the associated traints.

This uses the same dps converter pattern used by q10, but does not share code explicitly.

…es using `dps` metadata and add corresponding update listeners.

This uses the same dps converter patern used by q10, but does not share code explicitly.
Copilot AI review requested due to automatic review settings March 23, 2026 00:32
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements streaming “push” updates for V1 devices by decoding incoming DPS (data protocol) payloads and applying them to relevant V1 traits, enabling near-real-time status/consumable updates without polling.

Changes:

  • Add decode_data_protocol_message() to parse V1 DPS push messages into RoborockDataProtocol -> value mappings.
  • Introduce a DPS-to-dataclass-field mapping mechanism (DpsDataConverter) plus trait update listeners to apply streaming updates.
  • Wire V1 channel MQTT subscriptions to emit DPS updates and have PropertiesApi.start() register a listener to update traits.

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
roborock/protocols/v1_protocol.py Adds DPS decoding helper and decode_data_protocol_message() for V1 push updates.
roborock/devices/rpc/v1_channel.py Always subscribes to MQTT and emits decoded DPS updates to registered listeners.
roborock/devices/traits/v1/common.py Adds TraitUpdateListener + DpsDataConverter to support streaming field updates.
roborock/devices/traits/v1/status.py Applies DPS updates to StatusTrait and notifies listeners.
roborock/devices/traits/v1/consumeable.py Adds DPS update support to ConsumableTrait.
roborock/devices/traits/v1/__init__.py Adds start() to register DPS listener and dispatch updates to traits.
roborock/devices/device.py Starts/stops V1 properties streaming lifecycle on connect/close.
roborock/devices/device_manager.py Passes add_dps_listener from channel into V1 trait creation.
roborock/data/v1/v1_containers.py Annotates selected dataclass fields with dps metadata for streaming updates.
roborock/roborock_message.py Adjusts RoborockEnum import location.
tests/protocols/test_v1_protocol.py Adds unit tests for decoding V1 data protocol push messages.
tests/devices/traits/v1/test_status.py Adds unit tests for status updates + update listeners.
tests/devices/traits/v1/fixtures.py / tests/devices/test_v1_device.py Updates fixtures to satisfy new add_dps_listener constructor parameter.
tests/devices/rpc/test_v1_channel.py Updates expectation: MQTT subscription is established even with local connection.
tests/e2e/__snapshots__/test_device_manager.ambr Snapshot updated for additional MQTT traffic.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +238 to +243
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)

def close(self) -> None:
if self._unsub:
self._unsub()
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PropertiesApi.close() calls the stored unsubscribe but doesn’t clear self._unsub. If close() is called more than once (or if start() is called again), this can lead to double-unsubscribe attempts or holding a stale callable. Consider setting self._unsub = None after invoking it (and possibly guarding against start() being called twice without closing).

Suggested change
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
if self._unsub:
self._unsub()
# Avoid registering multiple listeners if start is called more than once
if getattr(self, "_unsub", None):
return
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
unsub = getattr(self, "_unsub", None)
if unsub:
unsub()
self._unsub = None

Copilot uses AI. Check for mistakes.
Comment on lines 436 to +449
datetime.datetime.now(datetime.UTC) - self._last_network_info_refresh > NETWORK_INFO_REFRESH_INTERVAL
):
return False
return True

def _on_mqtt_message(self, message: RoborockMessage) -> None:
"""Handle incoming MQTT messages."""
self._logger.debug("V1Channel received MQTT message: %s", message)
if self._callback:
self._callback(message)
try:
if datapoints := decode_data_protocol_message(message):
self._dps_listeners(datapoints)
except RoborockException as e:
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New DPS listener plumbing (add_dps_listener + decoding/dispatch in _on_mqtt_message) doesn’t appear to have direct test coverage (no tests assert that a pushed MQTT message results in DPS callbacks firing). Adding a unit test around V1Channel._on_mqtt_message (or an integration-style test that PropertiesApi.start() updates StatusTrait via a simulated MQTT message) would help prevent regressions in the streaming update path.

Copilot uses AI. Check for mistakes.
Comment on lines 41 to +63
@@ -47,3 +52,12 @@ async def reset_consumable(self, consumable: ConsumableAttribute) -> None:
"""Reset a specific consumable attribute on the device."""
await self.rpc_channel.send_command(RoborockCommand.RESET_CONSUMABLE, params=[consumable.value])
await self.refresh()

def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
"""Update the trait from data protocol push message data.

This handles unsolicited status updates pushed by the device
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
"""
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
self._notify_update()
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumableTrait now mixes in TraitUpdateListener, but it never calls TraitUpdateListener.__init__(). As a result, _update_callbacks is never initialized and update_from_dps() will raise AttributeError when it tries to call _notify_update(). Add an __init__ that calls super().__init__() (to init the Consumable dataclass) and then TraitUpdateListener.__init__(..., logger=...) (and add a module logger similar to StatusTrait).

Copilot uses AI. Check for mistakes.
Comment on lines 236 to +239
channel.rpc_channel,
channel.mqtt_rpc_channel,
channel.map_rpc_channel,
channel.add_dps_listener,
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

device_creator types channel as the Channel protocol, but then passes channel.add_dps_listener into v1.create(...). Channel doesn’t define add_dps_listener, so this will fail mypy (which is enabled via pre-commit). Consider either (a) extending the Channel protocol to include add_dps_listener (possibly as an optional/no-op for non-V1 channels), or (b) using a more specific protocol/type for V1 channels in this match arm (e.g., a V1Channel protocol with add_dps_listener).

Copilot uses AI. Check for mistakes.
Comment on lines +311 to +318
# Always subscribe to MQTT to receive protocol updates (data points)
# even if we have a local connection. Protocol updates only come via cloud/MQTT.
# Local connection is used for RPC commands, but push notifications come via MQTT.
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
if self.is_local_connected:
self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)")
else:
self._logger.debug("V1Channel connected via MQTT only")
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe() now always awaits self._mqtt_channel.subscribe(...). This changes behavior from “fallback to MQTT if local fails” to “require MQTT even when local is connected”, and the docstring above still describes fallback semantics. If local-only operation should still be possible, consider making the MQTT subscription best-effort when is_local_connected is true (log + continue) and only raise when neither transport is available; otherwise, update the docstring and error behavior to reflect the new requirement.

Suggested change
# Always subscribe to MQTT to receive protocol updates (data points)
# even if we have a local connection. Protocol updates only come via cloud/MQTT.
# Local connection is used for RPC commands, but push notifications come via MQTT.
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
if self.is_local_connected:
self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)")
else:
self._logger.debug("V1Channel connected via MQTT only")
# Always attempt to subscribe to MQTT to receive protocol updates (data points)
# even if we have a local connection. Protocol updates only come via cloud/MQTT.
# Local connection is used for RPC commands, but push notifications come via MQTT.
try:
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
if self.is_local_connected:
self._logger.debug("V1Channel connected via local and MQTT (for protocol updates)")
else:
self._logger.debug("V1Channel connected via MQTT only")
except Exception as err:
# If MQTT subscription fails but local is connected, continue in local-only mode.
if self.is_local_connected:
self._logger.warning(
"MQTT subscription failed, continuing with local-only connection: %s",
err,
)
self._mqtt_unsub = None
else:
# No MQTT and no local connection available: propagate the error.
raise

Copilot uses AI. Check for mistakes.
associate an API response with a request even if there was an error.
"""
if not message.payload:
if not (datapoints := _decode_dps_message(message)):
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decode_rpc_response() now calls _decode_dps_message() and returns an empty ResponseMessage when _decode_dps_message() returns None. _decode_dps_message() returns None not only for missing payload, but also when 'dps' is present but not a dict. That silently treats malformed RPC responses as “no payload”, whereas the previous logic raised RoborockException. Consider keeping strict validation for RPC responses (e.g., have _decode_dps_message(..., strict=True) raise when 'dps' is missing/wrong type, or add an explicit check in decode_rpc_response).

Suggested change
if not (datapoints := _decode_dps_message(message)):
datapoints = _decode_dps_message(message)
if datapoints is None:
# For RPC responses, a missing or malformed 'dps' section is considered
# a protocol error rather than "no payload".
raise RoborockException(
f"Invalid V1 message format: missing or invalid 'dps' in payload for {message.payload!r}"
)
if not datapoints:

Copilot uses AI. Check for mistakes.
@allenporter allenporter marked this pull request as draft March 23, 2026 00:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants