Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions roborock/data/v1/v1_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
ROBOROCK_G20S_Ultra,
)
from roborock.exceptions import RoborockException
from roborock.roborock_message import RoborockDataProtocol

from ..containers import NamedRoomMapping, RoborockBase, RoborockBaseTimer, _attr_repr
from .v1_clean_modes import WashTowelModes
Expand Down Expand Up @@ -293,11 +294,11 @@ class StatusV2(RoborockBase):

msg_ver: int | None = None
msg_seq: int | None = None
state: RoborockStateCode | None = None
battery: int | None = None
state: RoborockStateCode | None = field(default=None, metadata={"dps": RoborockDataProtocol.STATE})
battery: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.BATTERY})
clean_time: int | None = None
clean_area: int | None = None
error_code: RoborockErrorCode | None = None
error_code: RoborockErrorCode | None = field(default=None, metadata={"dps": RoborockDataProtocol.ERROR_CODE})
map_present: int | None = None
in_cleaning: RoborockInCleaning | None = None
in_returning: int | None = None
Expand All @@ -307,12 +308,12 @@ class StatusV2(RoborockBase):
back_type: int | None = None
wash_phase: int | None = None
wash_ready: int | None = None
fan_power: int | None = None
fan_power: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.FAN_POWER})
dnd_enabled: int | None = None
map_status: int | None = None
is_locating: int | None = None
lock_status: int | None = None
water_box_mode: int | None = None
water_box_mode: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.WATER_BOX_MODE})
water_box_carriage_status: int | None = None
mop_forbidden_enable: int | None = None
camera_status: int | None = None
Expand All @@ -330,13 +331,13 @@ class StatusV2(RoborockBase):
collision_avoid_status: int | None = None
switch_map_mode: int | None = None
dock_error_status: RoborockDockErrorCode | None = None
charge_status: int | None = None
charge_status: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.CHARGE_STATUS})
unsave_map_reason: int | None = None
unsave_map_flag: int | None = None
wash_status: int | None = None
distance_off: int | None = None
in_warmup: int | None = None
dry_status: int | None = None
dry_status: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.DRYING_STATUS})
rdt: int | None = None
clean_percent: int | None = None
rss: int | None = None
Expand Down Expand Up @@ -626,9 +627,9 @@ class CleanSummaryWithDetail(CleanSummary):

@dataclass
class Consumable(RoborockBase):
main_brush_work_time: int | None = None
side_brush_work_time: int | None = None
filter_work_time: int | None = None
main_brush_work_time: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.MAIN_BRUSH_WORK_TIME})
side_brush_work_time: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.SIDE_BRUSH_WORK_TIME})
filter_work_time: int | None = field(default=None, metadata={"dps": RoborockDataProtocol.FILTER_WORK_TIME})
filter_element_work_time: int | None = None
sensor_dirty_time: int | None = None
strainer_work_times: int | None = None
Expand Down
4 changes: 3 additions & 1 deletion roborock/devices/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async def connect(self) -> None:
unsub = await self._channel.subscribe(self._on_message)
try:
if self.v1_properties is not None:
await self.v1_properties.discover_features()
await self.v1_properties.start()
elif self.b01_q10_properties is not None:
await self.b01_q10_properties.start()
except RoborockException:
Expand All @@ -216,6 +216,8 @@ async def close(self) -> None:
await self._connect_task
except asyncio.CancelledError:
pass
if self.v1_properties is not None:
self.v1_properties.close()
if self.b01_q10_properties is not None:
await self.b01_q10_properties.close()
if self._unsub:
Expand Down
1 change: 1 addition & 0 deletions roborock/devices/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDat
channel.rpc_channel,
channel.mqtt_rpc_channel,
channel.map_rpc_channel,
channel.add_dps_listener,
Comment on lines 236 to +239
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

device_creator types channel as the Channel protocol, but then passes channel.add_dps_listener into v1.create(...). Channel doesn’t define add_dps_listener, so this will fail mypy (which is enabled via pre-commit). Consider either (a) extending the Channel protocol to include add_dps_listener (possibly as an optional/no-op for non-V1 channels), or (b) using a more specific protocol/type for V1 channels in this match arm (e.g., a V1Channel protocol with add_dps_listener).

Copilot uses AI. Check for mistakes.
web_api,
device_cache=device_cache,
map_parser_config=map_parser_config,
Expand Down
35 changes: 29 additions & 6 deletions roborock/devices/rpc/v1_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dataclasses import dataclass
from typing import Any, TypeVar

from roborock.callbacks import CallbackList
from roborock.data import HomeDataDevice, NetworkInfo, RoborockBase, UserData
from roborock.devices.cache import DeviceCache
from roborock.devices.transport.channel import Channel
Expand All @@ -30,9 +31,10 @@
V1RpcChannel,
create_map_response_decoder,
create_security_data,
decode_data_protocol_message,
decode_rpc_response,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage, RoborockMessageProtocol
from roborock.roborock_typing import RoborockCommand
from roborock.util import RoborockLoggerAdapter

Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(
self._device_cache = device_cache
self._reconnect_task: asyncio.Task[None] | None = None
self._last_network_info_refresh: datetime.datetime | None = None
self._dps_listeners = CallbackList[dict[RoborockDataProtocol, Any]]()

@property
def is_connected(self) -> bool:
Expand Down Expand Up @@ -305,12 +308,17 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
loop = asyncio.get_running_loop()
self._reconnect_task = loop.create_task(self._background_reconnect())

if not self.is_local_connected:
# We were not able to connect locally, so fallback to MQTT and at least
# establish that connection explicitly. If this fails then raise an
# error and let the caller know we failed to subscribe.
# Always attempt to subscribe to MQTT to receive protocol updates (data points)
# even if we have a local connection. Protocol updates only come via cloud/MQTT.
# Local connection is used for RPC commands, but push notifications come via MQTT.
try:
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
self._logger.debug("V1Channel connected to device via MQTT")
except RoborockException as err:
if not self.is_local_connected:
# Propagate error if both local and MQTT failed
self._logger.debug("MQTT connection also failed: %s", err)
raise
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)

def unsub() -> None:
"""Unsubscribe from all messages."""
Expand All @@ -328,6 +336,16 @@ def unsub() -> None:
self._callback = callback
return unsub

def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
"""Add a listener for DPS updates.

This will attach a listener to the existing subscription, invoking
the listener whenever new DPS values arrive from the subscription.
This will only work if a subscription has already been setup, which is
handled by the device setup.
"""
return self._dps_listeners.add_callback(listener)

async def _get_networking_info(self, *, prefer_cache: bool = True) -> NetworkInfo:
"""Retrieve networking information for the device.

Expand Down Expand Up @@ -428,6 +446,11 @@ def _on_mqtt_message(self, message: RoborockMessage) -> None:
self._logger.debug("V1Channel received MQTT message: %s", message)
if self._callback:
self._callback(message)
try:
if datapoints := decode_data_protocol_message(message):
self._dps_listeners(datapoints)
except RoborockException as e:
self._logger.debug("Error decoding data protocol message: %s", e)

def _on_local_message(self, message: RoborockMessage) -> None:
"""Handle incoming local messages."""
Expand Down
31 changes: 30 additions & 1 deletion roborock/devices/traits/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@
"""

import logging
from collections.abc import Callable
from dataclasses import dataclass, field, fields
from typing import Any, get_args

from roborock.data.containers import HomeData, HomeDataProduct, RoborockBase
from roborock.data.v1.v1_code_mappings import RoborockDockTypeCode
from roborock.devices.cache import DeviceCache
from roborock.devices.traits import Trait
from roborock.exceptions import RoborockException
from roborock.map.map_parser import MapParserConfig
from roborock.protocols.v1_protocol import V1RpcChannel
from roborock.protocols.v1_protocol import V1RpcChannel, decode_data_protocol_message
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage
from roborock.web_api import UserWebApiClient

from . import (
Expand Down Expand Up @@ -176,6 +179,7 @@ def __init__(
rpc_channel: V1RpcChannel,
mqtt_rpc_channel: V1RpcChannel,
map_rpc_channel: V1RpcChannel,
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
web_api: UserWebApiClient,
device_cache: DeviceCache,
map_parser_config: MapParserConfig | None = None,
Expand All @@ -189,6 +193,8 @@ def __init__(
self._web_api = web_api
self._device_cache = device_cache
self._region = region
self._unsub: Callable[[], None] | None = None
self._add_dps_listener = add_dps_listener

self.device_features = DeviceFeaturesTrait(product, self._device_cache)
self.status = StatusTrait(self.device_features, region=self._region)
Expand Down Expand Up @@ -227,6 +233,27 @@ def _get_rpc_channel(self, trait: V1TraitMixin) -> V1RpcChannel:
else:
return self._rpc_channel

async def start(self) -> None:
"""Start the properties API and discover features."""
if self._unsub:
return
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)

def close(self) -> None:
if self._unsub:
self._unsub()
Comment on lines +240 to +245
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PropertiesApi.close() calls the stored unsubscribe but doesn’t clear self._unsub. If close() is called more than once (or if start() is called again), this can lead to double-unsubscribe attempts or holding a stale callable. Consider setting self._unsub = None after invoking it (and possibly guarding against start() being called twice without closing).

Suggested change
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
if self._unsub:
self._unsub()
# Avoid registering multiple listeners if start is called more than once
if getattr(self, "_unsub", None):
return
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
unsub = getattr(self, "_unsub", None)
if unsub:
unsub()
self._unsub = None

Copilot uses AI. Check for mistakes.
self._unsub = None

def _on_dps_update(self, dps: dict[RoborockDataProtocol, Any]) -> None:
"""Handle incoming messages from the device.

This will notify all traits of the new values.
"""
_LOGGER.debug("Received message from device: %s", dps)
self.status.update_from_dps(dps)
self.consumables.update_from_dps(dps)

async def discover_features(self) -> None:
"""Populate any supported traits that were not initialized in __init__."""
_LOGGER.debug("Starting optional trait discovery")
Expand Down Expand Up @@ -330,6 +357,7 @@ def create(
rpc_channel: V1RpcChannel,
mqtt_rpc_channel: V1RpcChannel,
map_rpc_channel: V1RpcChannel,
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
web_api: UserWebApiClient,
device_cache: DeviceCache,
map_parser_config: MapParserConfig | None = None,
Expand All @@ -343,6 +371,7 @@ def create(
rpc_channel,
mqtt_rpc_channel,
map_rpc_channel,
add_dps_listener,
web_api,
device_cache,
map_parser_config,
Expand Down
76 changes: 75 additions & 1 deletion roborock/devices/traits/v1/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

import logging
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import fields
from typing import ClassVar
from typing import Any, ClassVar

from roborock.callbacks import CallbackList
from roborock.data import RoborockBase
from roborock.protocols.v1_protocol import V1RpcChannel
from roborock.roborock_message import RoborockDataProtocol
from roborock.roborock_typing import RoborockCommand

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -173,3 +176,74 @@ def wrapper(*args, **kwargs):

cls.map_rpc_channel = True # type: ignore[attr-defined]
return wrapper


# TODO(allenporter): Merge with roborock.devices.traits.b01.q10.common.TraitUpdateListener
class TraitUpdateListener(ABC):
"""Trait update listener.

This is a base class for traits to support notifying listeners when they
have been updated. Clients may register callbacks to be notified when the
trait has been updated. When the listener callback is invoked, the client
should read the trait's properties to get the updated values.
"""

def __init__(self, logger: logging.Logger) -> None:
"""Initialize the trait update listener."""
self._update_callbacks: CallbackList[None] = CallbackList(logger=logger)

def add_update_listener(self, callback: Callable[[], None]) -> Callable[[], None]:
"""Register a callback when the trait has been updated.

Returns a callable to remove the listener.
"""
# We wrap the callback to ignore the value passed to it.
return self._update_callbacks.add_callback(lambda _: callback())

def _notify_update(self) -> None:
"""Notify all update listeners."""
self._update_callbacks(None)


class DpsDataConverter:
"""Utility to handle the transformation and merging of DPS data into models.

This class pre-calculates the mapping between Data Point IDs and dataclass fields
to optimize repeated updates from device streams.
"""

def __init__(self, dps_type_map: dict[RoborockDataProtocol, type], dps_field_map: dict[RoborockDataProtocol, str]):
"""Initialize the converter for a specific RoborockBase-derived class."""
self._dps_type_map = dps_type_map
self._dps_field_map = dps_field_map

@classmethod
def from_dataclass(cls, dataclass_type: type[RoborockBase]):
"""Initialize the converter for a specific RoborockBase-derived class."""
dps_type_map: dict[RoborockDataProtocol, type] = {}
dps_field_map: dict[RoborockDataProtocol, str] = {}
for field_obj in fields(dataclass_type):
if field_obj.metadata and "dps" in field_obj.metadata:
dps_id = field_obj.metadata["dps"]
dps_type_map[dps_id] = field_obj.type
dps_field_map[dps_id] = field_obj.name
return cls(dps_type_map, dps_field_map)

def update_from_dps(self, target: RoborockBase, decoded_dps: dict[RoborockDataProtocol, Any]) -> bool:
"""Convert and merge raw DPS data into the target object.

Uses the pre-calculated type mapping to ensure values are converted to the
correct Python types before being updated on the target.

Args:
target: The target object to update.
decoded_dps: The decoded DPS data to convert.

Returns:
True if any values were updated, False otherwise.
"""
conversions = RoborockBase.convert_dict(self._dps_type_map, decoded_dps)
for dps_id, value in conversions.items():
field_name = self._dps_field_map[dps_id]
setattr(target, field_name, value)
return bool(conversions)
Loading
Loading