33import asyncio
44import logging
55import socket
6- from asyncio import Lock
6+ from asyncio import Transport
77from typing import Callable , Mapping , Optional
88
99import async_timeout
1010
11- from .api import SPECIAL_COMMANDS , RoborockClient
11+ from .api import SPECIAL_COMMANDS , RoborockClient , QUEUE_TIMEOUT
1212from .containers import RoborockLocalDeviceInfo
13- from .exceptions import CommandVacuumError , RoborockConnectionException , RoborockException , RoborockTimeout
13+ from .exceptions import CommandVacuumError , RoborockConnectionException , RoborockException
1414from .roborock_message import RoborockMessage , RoborockParser
1515from .typing import CommandInfoMap , RoborockCommand
1616from .util import get_running_loop_or_create_one
@@ -30,15 +30,15 @@ def __init__(self, devices_info: Mapping[str, RoborockLocalDeviceInfo]):
3030 )
3131 for device_id , device_info in devices_info .items ()
3232 }
33- self ._mutex = Lock ()
3433 self ._batch_structs : list [RoborockMessage ] = []
3534 self ._executing = False
3635
37- async def async_connect (self ):
36+ async def async_connect (self ) -> None :
3837 await asyncio .gather (* [listener .connect () for listener in self .device_listener .values ()])
3938
4039 async def async_disconnect (self ) -> None :
41- await asyncio .gather (* [listener .disconnect () for listener in self .device_listener .values ()])
40+ for listener in self .device_listener .values ():
41+ listener .disconnect ()
4242
4343 def build_roborock_message (self , method : RoborockCommand , params : Optional [list ] = None ) -> RoborockMessage :
4444 secured = True if method in SPECIAL_COMMANDS else False
@@ -103,72 +103,56 @@ def is_closed(self):
103103 return self ._closed
104104
105105
106- class RoborockSocketListener :
106+ class RoborockSocketListener ( asyncio . Protocol ) :
107107 roborock_port = 58867
108108
109109 def __init__ (
110110 self ,
111111 ip : str ,
112112 local_key : str ,
113113 on_message : Callable [[list [RoborockMessage ]], None ],
114- timeout : float | int = 4 ,
114+ timeout : float | int = QUEUE_TIMEOUT ,
115115 ):
116116 self .ip = ip
117117 self .local_key = local_key
118- self .socket = RoborockSocket (socket .AF_INET , socket .SOCK_STREAM )
119- self .socket .setblocking (False )
120118 self .loop = get_running_loop_or_create_one ()
121119 self .on_message = on_message
122120 self .timeout = timeout
123- self .is_connected = False
124- self ._mutex = Lock ()
125121 self .remaining = b""
122+ self .transport : Transport | None = None
126123
127- async def _main_coro (self ):
128- while not self .socket .is_closed :
129- try :
130- message = await self .loop .sock_recv (self .socket , 4096 )
131- try :
132- if self .remaining :
133- message = self .remaining + message
134- self .remaining = b""
135- (parser_msg , remaining ) = RoborockParser .decode (message , self .local_key )
136- self .remaining = remaining
137- self .on_message (parser_msg )
138- except Exception as e :
139- _LOGGER .exception (e )
140- except BrokenPipeError as e :
141- _LOGGER .exception (e )
142- await self .disconnect ()
124+ def data_received (self , message ):
125+ if self .remaining :
126+ message = self .remaining + message
127+ self .remaining = b""
128+ (parser_msg , remaining ) = RoborockParser .decode (message , self .local_key )
129+ self .remaining = remaining
130+ self .on_message (parser_msg )
131+
132+ def connection_lost (self , exc ):
133+ print ("The server closed the connection" )
134+
135+ def is_connected (self ):
136+ return self .transport and self .transport .is_reading ()
143137
144138 async def connect (self ):
145- async with self ._mutex :
146- if not self .is_connected or self .socket .is_closed :
147- self .socket = RoborockSocket (socket .AF_INET , socket .SOCK_STREAM )
148- self .socket .setblocking (False )
149- try :
150- async with async_timeout .timeout (self .timeout ):
151- _LOGGER .info (f"Connecting to { self .ip } " )
152- await self .loop .sock_connect (self .socket , (self .ip , 58867 ))
153- self .is_connected = True
154- except Exception as e :
155- await self .disconnect ()
156- raise RoborockConnectionException (f"Failed connecting to { self .ip } " ) from e
157- self .loop .create_task (self ._main_coro ())
158-
159- async def disconnect (self ):
160- self .socket .close ()
161- self .is_connected = False
139+ try :
140+ if not self .is_connected ():
141+ async with async_timeout .timeout (self .timeout ):
142+ _LOGGER .info (f"Connecting to { self .ip } " )
143+ self .transport , _ = await self .loop .create_connection (lambda : self , self .ip , 58867 )
144+ except Exception as e :
145+ raise RoborockConnectionException (f"Failed connecting to { self .ip } " ) from e
146+
147+ def disconnect (self ):
148+ if self .transport :
149+ self .transport .close ()
162150
163151 async def send_message (self , data : bytes ) -> None :
164152 await self .connect ()
165153 try :
166- async with self ._mutex :
167- async with async_timeout .timeout (self .timeout ):
168- await self .loop .sock_sendall (self .socket , data )
169- except (asyncio .TimeoutError , asyncio .CancelledError ):
170- await self .disconnect ()
171- raise RoborockTimeout (f"Timeout after { self .timeout } seconds waiting for response" ) from None
172- except BrokenPipeError as e :
173- await self .disconnect ()
154+ if not self .transport :
155+ raise RoborockException ("Can not send message without connection" )
156+ self .transport .write (data )
157+ except Exception as e :
174158 raise RoborockException (e ) from e
0 commit comments