diff --git a/cri_lib/__init__.py b/cri_lib/__init__.py index eec1630..5aaf958 100644 --- a/cri_lib/__init__.py +++ b/cri_lib/__init__.py @@ -2,8 +2,13 @@ .. include:: ../README.md """ -from .cri_controller import CRIController, MotionType -from .cri_errors import CRICommandTimeOutError, CRIConnectionError, CRIError +from .cri_controller import CRIClient, CRIConnector, CRIController, MotionType +from .cri_errors import ( + CRICommandError, + CRICommandTimeOutError, + CRIConnectionError, + CRIError, +) from .cri_protocol_parser import CRIProtocolParser from .robot_state import ( ErrorStates, diff --git a/cri_lib/cri_controller.py b/cri_lib/cri_controller.py index 3f8f135..2dbb407 100644 --- a/cri_lib/cri_controller.py +++ b/cri_lib/cri_controller.py @@ -1,14 +1,17 @@ +import asyncio +import contextlib import logging import socket import threading +from collections.abc import AsyncIterator from datetime import datetime, timezone from enum import Enum from pathlib import Path from queue import Empty, Queue from time import sleep, time -from typing import Any, Callable +from typing import Any, Callable, Literal -from .cri_errors import CRICommandTimeOutError, CRIConnectionError +from .cri_errors import CRICommandError, CRICommandTimeOutError, CRIConnectionError from .cri_protocol_parser import CRIProtocolParser from .robot_state import KinematicsState, RobotState @@ -17,6 +20,8 @@ DEFAULT = object() """Placeholder for defaulting a parameter to runtime-configurable default values.""" +REQUIRED_STATUS_CATEGORIES = {"STATUS", "RUNSTATE"} +"""Robot state message categories that must must be received before confirming fully connected.""" class MotionType(Enum): @@ -28,30 +33,32 @@ class MotionType(Enum): Platform = "Platform" -class CRIController: - """ - Class implementing the CRI network protocol for igus Robot Control. - """ +class CRIClient: + """Client with implementations for read-only communication.""" ALIVE_JOG_INTERVAL_SEC = 0.2 - ACTIVE_JOG_INTERVAL_SEC = 0.02 RECEIVE_TIMEOUT_SEC = 5 DEFAULT_ANSWER_TIMEOUT = 10.0 def __init__(self) -> None: + """Create a ``CRIClient`` without connecting it yet. + + Call ``connect`` to connect and start receiving data. + """ self.robot_state: RobotState = RobotState() self.robot_state_lock = threading.Lock() self.parser = CRIProtocolParser(self.robot_state, self.robot_state_lock) self.connected = False - self.sock: socket.socket | None = None + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket_write_lock = threading.Lock() self.can_mode: bool = False self.can_queue: Queue = Queue() self.jog_thread = threading.Thread(target=self._bg_alivejog_thread, daemon=True) + self.jog_intervall = self.ALIVE_JOG_INTERVAL_SEC self.receive_thread = threading.Thread( target=self._bg_receive_thread, daemon=True ) @@ -64,28 +71,13 @@ def __init__(self) -> None: self.status_callback: Callable | None = None - self.live_jog_active: bool = False - self.jog_intervall = self.ALIVE_JOG_INTERVAL_SEC - self.jog_speeds: dict[str, float] = { - "A1": 0.0, - "A2": 0.0, - "A3": 0.0, - "A4": 0.0, - "A5": 0.0, - "A6": 0.0, - "E1": 0.0, - "E2": 0.0, - "E3": 0.0, - } - self.jog_speeds_lock = threading.Lock() - def connect( self, host: str, port: int = 3920, application_name: str = "CRI-Python-Lib", application_version: str = "0-0-0-0", - ) -> bool: + ) -> Literal[True]: """ Connect to iRC. @@ -103,16 +95,21 @@ def connect( Returns ------- bool - True if connected - False if not connected + True if connected. + Otherwise an exception is raised. + Raises + ------ + CRIConnectionError + When already connected or connection fails. """ - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if self.connected: + raise CRIConnectionError("Already connected.") self.sock.settimeout(0.1) # Set a timeout of 0.1 seconds try: ip = socket.gethostbyname(host) self.sock.connect((ip, port)) - logger.debug("\t Robot connected: %s:%d", host, port) + # Mark as connected before starting threads self.connected = True # Start receiving commands @@ -124,19 +121,15 @@ def connect( hello_msg = f'INFO Hello "{application_name}" {application_version} {datetime.now(timezone.utc).strftime(format="%Y-%m-%dT%H:%M:%S")}' self._send_command(hello_msg) - + logger.debug("Connected to %s:%d", host, port) return True except ConnectionRefusedError: - logger.error( - "Connection refused: Unable to connect to %s:%i", - host, - port, + raise CRIConnectionError( + f"Connection refused: Unable to connect to {host}:{port}" ) - return False except Exception as e: - logger.exception("An error occurred while attempting to connect.") - return False + raise CRIConnectionError("Failed to connect to iRC.") from e def close(self) -> None: """ @@ -170,6 +163,8 @@ def _send_command( ) -> int: """Sends the given command to iRC. + The method is marked private because technically it can be used to send control commands as well. + Parameters ---------- command : str @@ -231,13 +226,7 @@ def _bg_alivejog_thread(self) -> None: Background Thread sending alivejog messages to keep connection alive. """ while self.connected: - if self.live_jog_active: - with self.jog_speeds_lock: - command = f"ALIVEJOG {self.jog_speeds['A1']} {self.jog_speeds['A2']} {self.jog_speeds['A3']} {self.jog_speeds['A4']} {self.jog_speeds['A5']} {self.jog_speeds['A6']} {self.jog_speeds['E1']} {self.jog_speeds['E2']} {self.jog_speeds['E3']}" - else: - command = "ALIVEJOG 0 0 0 0 0 0 0 0 0" - - if self._send_command(command) is None: + if self._send_command("ALIVEJOG 0 0 0 0 0 0 0 0 0") is None: logger.error("AliveJog Thread: Connection lost.") self.connected = False return @@ -392,8 +381,125 @@ def callback(state: RobotState) """ self.status_callback = callback + def wait_for_kinematics_ready(self, timeout: float = 30) -> bool: + """Wait until drive state is indicated as ready. + + Parameters + ---------- + timeout : float + maximum time to wait in seconds + + Returns + ------- + bool + `True`if drives are ready, `False` if not ready or timeout + """ + start_time = time() + new_timeout = timeout + while new_timeout > 0.0: + self.wait_for_status_update(timeout=new_timeout) + if (self.robot_state.kinematics_state == KinematicsState.NO_ERROR) and ( + self.robot_state.combined_axes_error == "NoError" + ): + return True + + new_timeout = timeout - (time() - start_time) + + return False + + def get_board_temperatures( + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore + ) -> bool: + """Receive motor controller PCB temperatures and save in robot state + + Parameters + ---------- + blocking: bool + wait for response, always returns True if not waiting + + timeout: float | None + timeout for waiting in seconds or None for infinite waiting + """ + self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") + if ( + error_msg := self._wait_for_answer("info_boardtemp", timeout=timeout) + ) is not None: + logger.debug("Error in GetBoardTemp command: %s", error_msg) + return False + else: + return True + + def get_motor_temperatures( + self, + blocking: bool = True, + timeout: float | None = DEFAULT, # type: ignore + ) -> bool: + """Receive motor temperatures and save in robot state + + Parameters + ---------- + blocking: bool + wait for response, always returns True if not waiting + + timeout: float | None + timeout for waiting in seconds or None for infinite waiting + """ + self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") + if ( + error_msg := self._wait_for_answer("info_motortemp", timeout=timeout) + ) is not None: + logger.debug("Error in GetMotorTemp command: %s", error_msg) + return False + else: + return True + + +class CRIController(CRIClient): + """A connected ``CRIClient`` with control capabilities.""" + + ACTIVE_JOG_INTERVAL_SEC = 0.02 + + def __init__(self) -> None: + self.live_jog_active: bool = False + self.jog_intervall = self.ALIVE_JOG_INTERVAL_SEC + self.jog_speeds: dict[str, float] = { + "A1": 0.0, + "A2": 0.0, + "A3": 0.0, + "A4": 0.0, + "A5": 0.0, + "A6": 0.0, + "E1": 0.0, + "E2": 0.0, + "E3": 0.0, + } + self.jog_speeds_lock = threading.Lock() + super().__init__() + + def _bg_alivejog_thread(self) -> None: + """Overrides the ``CRIClient._bg_alivejog_thread`` to send alivejog messages with possibly nonzero jog speeds.""" + while self.connected: + if self.live_jog_active: + with self.jog_speeds_lock: + command = f"ALIVEJOG {self.jog_speeds['A1']} {self.jog_speeds['A2']} {self.jog_speeds['A3']} {self.jog_speeds['A4']} {self.jog_speeds['A5']} {self.jog_speeds['A6']} {self.jog_speeds['E1']} {self.jog_speeds['E2']} {self.jog_speeds['E3']}" + else: + command = "ALIVEJOG 0 0 0 0 0 0 0 0 0" + + if self._send_command(command) is None: + logger.error("AliveJog Thread: Connection lost.") + self.connected = False + return + + sleep(self.jog_intervall) + + def send_command(self, command, register_answer=False, fixed_answer_name=None): + """Wraps the superclass method to make it public.""" + return super()._send_command(command, register_answer, fixed_answer_name) + def reset(self) -> bool: - """Reset robot + """Reset robot clears errors and fetches current axis positions from the modules. Returns ------- @@ -409,8 +515,9 @@ def reset(self) -> bool: return True def enable(self) -> bool: - """Enable robot - An potential error message received from the robot will be logged with priority DEBUG + """Enable robot activates the motors. + + An potential error message received from the robot will be logged with priority DEBUG Returns ------- @@ -426,7 +533,7 @@ def enable(self) -> bool: return True def disable(self) -> bool: - """Disable robot + """Disable robot stops currently running programs, movements and deactivates the motors. Returns ------- @@ -540,32 +647,6 @@ def get_referencing_info(self): else: return True - def wait_for_kinematics_ready(self, timeout: float = 30) -> bool: - """Wait until drive state is indicated as ready. - - Parameters - ---------- - timeout : float - maximum time to wait in seconds - - Returns - ------- - bool - `True`if drives are ready, `False` if not ready or timeout - """ - start_time = time() - new_timeout = timeout - while new_timeout > 0.0: - self.wait_for_status_update(timeout=new_timeout) - if (self.robot_state.kinematics_state == KinematicsState.NO_ERROR) and ( - self.robot_state.combined_axes_error == "NoError" - ): - return True - - new_timeout = timeout - (time() - start_time) - - return False - def move_joints( self, A1: float, @@ -1322,54 +1403,99 @@ def can_receive( return item - def get_board_temperatures( - self, - blocking: bool = True, - timeout: float | None = DEFAULT, # type: ignore - ) -> bool: - """Receive motor controller PCB temperatures and save in robot state - Parameters - ---------- - blocking: bool - wait for response, always returns True if not waiting +# Monkey patch to maintain backward compatibility +CRIController.MotionType = MotionType # type: ignore - timeout: float | None - timeout for waiting in seconds or None for infinite waiting - """ - self._send_command("SYSTEM GetBoardTemp", True, "info_boardtemp") - if ( - error_msg := self._wait_for_answer("info_boardtemp", timeout=timeout) - ) is not None: - logger.debug("Error in GetBoardTemp command: %s", error_msg) - return False - else: - return True - def get_motor_temperatures( +class CRIConnector: + """Factory providing context managers for connecting with clean resource lifecycle management. + + The context managers will yield ``CRIClient`` or ``CRIController`` instances + and ensure that the connection is properly closed and resources disposed when the context is exited. + """ + + def __init__( self, - blocking: bool = True, - timeout: float | None = DEFAULT, # type: ignore - ) -> bool: - """Receive motor temperatures and save in robot state + host: str, + port: int = 3920, + application_name: str = "CRI-Python-Lib", + application_version: str = "0-0-0-0", + ) -> None: + """Create a factory for active or passive connection to the robot controller. Parameters ---------- - blocking: bool - wait for response, always returns True if not waiting - - timeout: float | None - timeout for waiting in seconds or None for infinite waiting + host : str + IP address or hostname of iRC + port : int + port of iRC + application_name : str + optional name of your application sent to controller + application_version: str + optional version of your application sent to controller """ - self._send_command("SYSTEM GetMotorTemp", True, "info_motortemp") - if ( - error_msg := self._wait_for_answer("info_motortemp", timeout=timeout) - ) is not None: - logger.debug("Error in GetMotorTemp command: %s", error_msg) - return False - else: - return True + # Remember connection parameters so that context entry methods don't need them. + self.host = host + self.port = port + self.application_name = application_name + self.application_version = application_version + super().__init__() + + @contextlib.asynccontextmanager + async def observe(self) -> AsyncIterator[CRIClient]: + """Establish connection for reading robot state. + + ⚠️ Do not connect/disconnect at high frequency - use long-lived connection for monitoring ⚠️ + """ + client = CRIClient() + try: + client.connect( + self.host, self.port, self.application_name, self.application_version + ) + while REQUIRED_STATUS_CATEGORIES.difference( + client.robot_state.category_time_ns + ): + await asyncio.sleep(0.05) + yield client + # Graceful context exit; nothing to do other than disconnect in finally block. + finally: + client.close() + return -# Monkey patch to maintain backward compatibility -CRIController.MotionType = MotionType # type: ignore + @contextlib.asynccontextmanager + async def control(self, *, auto_disable: bool) -> AsyncIterator[CRIController]: + """Establish connection for controlling robot state. + + Parameters + ---------- + auto_disable + If ``True`` a graceful exit will call :meth:`CRIConnector.disable` + to stop movements and turn off motors. + """ + controller = CRIController() + try: + controller.connect( + self.host, self.port, self.application_name, self.application_version + ) + while REQUIRED_STATUS_CATEGORIES.difference( + controller.robot_state.category_time_ns + ): + await asyncio.sleep(0.05) + + # Take active control + if not controller.set_active_control(True): + raise CRICommandError("Failed to acquire active control.") + if not controller.enable(): + raise CRICommandError("Failed to enable robot.") + if not controller.wait_for_kinematics_ready(10): + raise CRICommandError("Kinematics not ready.") + yield controller + # Graceful context exit: give up control (maybe disable robot) then disconnect in finally block. + if auto_disable: + controller.disable() + controller.set_active_control(False) + finally: + controller.close() + return diff --git a/cri_lib/cri_errors.py b/cri_lib/cri_errors.py index 4069dc5..5c665fd 100644 --- a/cri_lib/cri_errors.py +++ b/cri_lib/cri_errors.py @@ -10,6 +10,14 @@ def __init__(self, message="Not connected to iRC or connection lost."): super().__init__(self.message) +class CRICommandError(CRIError): + """Raised when a command fails to execute properly.""" + + def __init__(self, message="Command execution failed."): + self.message = message + super().__init__(self.message) + + class CRICommandTimeOutError(CRIError): """Raised when a command times out.""" diff --git a/cri_lib/cri_protocol_parser.py b/cri_lib/cri_protocol_parser.py index bdd0a4c..b383b4d 100644 --- a/cri_lib/cri_protocol_parser.py +++ b/cri_lib/cri_protocol_parser.py @@ -1,4 +1,5 @@ import logging +import time from threading import Lock from typing import Any, Sequence @@ -46,69 +47,66 @@ def parse_message( a dict indicating which answer event to notify (key: "answer") and optionally an error message (key: "error") """ parts = message.split() - - match parts[2]: + cmd_category = parts[2] + result: dict[str, str] | dict[str, str | None] | None = None + match cmd_category: case "STATUS": self._parse_status(parts[3:-1]) - return {"answer": "status"} + result = {"answer": "status"} case "RUNSTATE": self._parse_runstate(parts[3:-1]) - return None case "CYCLESTAT": self._parse_cyclestat(parts[3:-1]) - return None case "GRIPPERSTATE": self._parse_gripperstate(parts[3:-1]) - return None case "VARIABLES": self._parse_variables(parts[3:-1]) - return None case "OPINFO": self._parse_opinfo(parts[3:-1]) - return None case "CMD": - return {"answer": self._parse_cmd(parts[3:-1])} + result = {"answer": self._parse_cmd(parts[3:-1])} case "MESSAGE": self._parse_message_message(parts[3:-1]) - return None case "CONFIG": self._parse_config(parts[3:-1]) - return None case "CANBridge": - return self._parse_can_bridge(parts[3:-1]) + result = self._parse_can_bridge(parts[3:-1]) case "CMDACK": - return {"answer": parts[3]} + result = {"answer": parts[3]} case "CMDERROR": - return self._parse_cmderror(parts[3:-1]) + result = self._parse_cmderror(parts[3:-1]) case "INFO": if (answer := self._parse_info(parts[3:-1])) is not None: - return {"answer": answer} - else: - return None + result = {"answer": answer} case "EXECEND": - return {"answer": "EXECEND"} + result = {"answer": "EXECEND"} case "EXECERROR": - return self._parse_execerror(parts[3:-1]) + result = self._parse_execerror(parts[3:-1]) case _: logger.debug( - "Unknown message type %s received:\n%s", parts[2], " ".join(parts) + "Unknown message type %s received:\n%s", + cmd_category, + " ".join(parts), ) - return None + # Remember per-category timestamps to facilitate age-checks + with self.robot_state_lock: + self.robot_state.category_time_ns[cmd_category] = time.time_ns() + return result def _parse_status(self, parameters: list[str]) -> None: """ diff --git a/cri_lib/robot_state.py b/cri_lib/robot_state.py index 3a31cca..6d9052b 100644 --- a/cri_lib/robot_state.py +++ b/cri_lib/robot_state.py @@ -345,3 +345,6 @@ class RobotState: motor_temps: list[float] = field(default_factory=lambda: [0.0] * 16) """Temperatures of motors""" + + category_time_ns: dict[str, int] = field(default_factory=lambda: {}) + """Per `CMD_CATEGORY ` nanosecond epoch timestamps when the most recent update was received.""" diff --git a/examples/asynchronous_control.py b/examples/asynchronous_control.py new file mode 100644 index 0000000..6ce167a --- /dev/null +++ b/examples/asynchronous_control.py @@ -0,0 +1,73 @@ +"""Example of asynchronous control of an iRC robot. + +This is relevant for applications that need to perform other tasks concurrently while controlling the robot, +for example monitoring other inputs, controlling other actuators, or coordinating with other systems. +""" + +import asyncio +import logging + +from cri_lib import CRIConnector + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +async def main(): + # The connector creates passive or active control sessions with proper resource management. + connector = CRIConnector( + host="127.0.0.1", + port=3921, + ) + + # connect asynchronously + async with connector.observe() as client: + logger.info("Current state is: %s", client.robot_state) + # disconnect automatically when exiting the context + + # connect and take control + async with connector.control(auto_disable=False) as controller: + controller.set_override(100.0) + + # Perform relative movement + logger.info("Moving base relative: +20mm in X, Y, Z...") + controller.move_base_relative( + 20.0, + 20.0, + 20.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 10.0, + wait_move_finished=True, + move_finished_timeout=1000, + ) + + logger.info("Moving back: -20mm in X, Y, Z...") + controller.move_base_relative( + -20.0, + -20.0, + -20.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 10.0, + wait_move_finished=True, + move_finished_timeout=1000, + ) + # disconnect and release control automatically by exiting the context + + logger.info("Script execution completed successfully.") + return + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_cri_parser.py b/tests/test_cri_parser.py index 706493e..4e15f92 100644 --- a/tests/test_cri_parser.py +++ b/tests/test_cri_parser.py @@ -1,3 +1,4 @@ +import copy import threading import pytest @@ -22,6 +23,16 @@ ) +def robot_state_equal(actual: RobotState, expected: RobotState) -> bool: + """Compare robot state without requiring update nanosecond values to match.""" + if set(actual.category_time_ns) != set(expected.category_time_ns): + return False + # copy the timestamp values to facilitate equals comparison of the dataclasses + expected_copy = copy.deepcopy(expected) + expected_copy.category_time_ns.update(actual.category_time_ns) + return actual == expected_copy + + def test_parse_state(): test_message = """ CRISTART 1234 STATUS MODE joint @@ -44,6 +55,7 @@ def test_parse_state(): """ robot_state_correct = RobotState() + robot_state_correct.category_time_ns["STATUS"] = 0 robot_state_correct.mode = RobotMode.JOINT robot_state_correct.joints_set_point = JointsState( 1.00, @@ -135,7 +147,7 @@ def test_parse_state(): controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_runstate_main(): @@ -144,6 +156,7 @@ def test_parse_runstate_main(): ) robot_state_correct = RobotState() + robot_state_correct.category_time_ns["RUNSTATE"] = 0 robot_state_correct.main_main_program = "testmotion.xml" robot_state_correct.main_current_program = "pickpart.xml" robot_state_correct.main_commands_count = 12 @@ -153,7 +166,7 @@ def test_parse_runstate_main(): controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_runstate_logic(): @@ -162,6 +175,7 @@ def test_parse_runstate_logic(): ) robot_state_correct = RobotState() + robot_state_correct.category_time_ns["RUNSTATE"] = 0 robot_state_correct.logic_main_program = "testlogic.xml" robot_state_correct.logic_current_program = "testlogic.xml" robot_state_correct.logic_commands_count = 11 @@ -171,7 +185,7 @@ def test_parse_runstate_logic(): controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_regr_parse_status_plattform(): @@ -180,11 +194,12 @@ def test_regr_parse_status_plattform(): test_message = "CRISTART 1234 STATUS POSCARTPLATTFORM 10.0 20.0 180.00 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["STATUS"] = 0 robot_state_correct.position_platform = PlatformCartesianPosition(10.0, 20.0, 180.0) controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_cyclestat(): @@ -192,12 +207,13 @@ def test_parse_cyclestat(): test_message = "CRISTART 1234 CYCLESTAT 9.5 12.3 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["CYCLESTAT"] = 0 robot_state_correct.cycle_time = 9.5 robot_state_correct.workload = 12.3 controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_gripperstate(): @@ -205,21 +221,23 @@ def test_parse_gripperstate(): test_message = "CRISTART 1234 GRIPPERSTATE 0.7 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["GRIPPERSTATE"] = 0 robot_state_correct.gripper_state = 0.7 controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_unknown_message_tpye(): """Test for unknown message type""" test_message = "CRISTART 1234 UNKNOWN 1 2 3 4 5 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["UNKNOWN"] = 0 controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_variables(): @@ -314,53 +332,57 @@ def test_parse_variables(): ), } robot_state_correct = RobotState() + robot_state_correct.category_time_ns["VARIABLES"] = 0 robot_state_correct.variabels = variables controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_opinfo(): test_message = "CRISTART 6 OPINFO 0 235 235 114 5 0 0 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["OPINFO"] = 0 robot_state_correct.operation_info = OperationInfo(0, 235, 235, 114, 5, 0, 0) controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_cmd_active(): test_message = "CRISTART 4 CMD Active false CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["CMD"] = 0 robot_state_correct.active_control = False controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) test_message = "CRISTART 4 CMD Active true CRIEND" robot_state_correct.active_control = True controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) test_message = "CRISTART 4 CMD Active foo CRIEND" controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_message_robotcontrol(): test_message = "CRISTART 1 MESSAGE RobotControl Version V980-14-002-3 CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["MESSAGE"] = 0 robot_state_correct.robot_control_version = "V980-14-002-3" controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_splits_quotes_aware(): @@ -384,31 +406,33 @@ def test_parse_message_configuration(): test_message = 'CRISTART 2 MESSAGE Configuration: "igus REBEL-6DOF" Type: "igus-REBEL/REBEL-6DOF-02" Gripper: "Multigrip" CRIEND' robot_state_correct = RobotState() + robot_state_correct.category_time_ns["MESSAGE"] = 0 robot_state_correct.robot_configuration = "igus REBEL-6DOF" robot_state_correct.robot_type = "igus-REBEL/REBEL-6DOF-02" robot_state_correct.gripper_type = "Multigrip" controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) test_message = 'CRISTART 2 MESSAGE Type: "igus-REBEL/REBEL-6DOF-02test1" Gripper: "Multigriptest1" CRIEND' robot_state_correct.robot_type = "igus-REBEL/REBEL-6DOF-02test1" robot_state_correct.gripper_type = "Multigriptest1" controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_config(): test_message = "CRISTART 1234 CONFIG ProjectFile robotprj.xml CRIEND" robot_state_correct = RobotState() + robot_state_correct.category_time_ns["CONFIG"] = 0 robot_state_correct.project_file = "robotprj.xml" controller = CRIController() controller._parse_message(test_message) - assert controller.robot_state == robot_state_correct + assert robot_state_equal(controller.robot_state, robot_state_correct) def test_parse_cmdack():