diff --git a/pyproject.toml b/pyproject.toml index 196d0d09..942f2d43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ fmsr-mcp-server = "servers.fmsr.main:main" tsfm-mcp-server = "servers.tsfm.main:main" wo-mcp-server = "servers.wo.main:main" vibration-mcp-server = "servers.vibration.main:main" +robot-mcp-server = "servers.robot.main:main" openai-agent = "agent.openai_agent.cli:main" deep-agent = "agent.deep_agent.cli:main" stirrup-agent = "agent.stirrup_agent.cli:main" diff --git a/src/agent/runner.py b/src/agent/runner.py index 1c06ec60..334ad7a9 100644 --- a/src/agent/runner.py +++ b/src/agent/runner.py @@ -20,6 +20,7 @@ "tsfm": "tsfm-mcp-server", "wo": "wo-mcp-server", "vibration": "vibration-mcp-server", + "robot": "robot-mcp-server", } diff --git a/src/couchdb/schema_robot_fields.json b/src/couchdb/schema_robot_fields.json new file mode 100644 index 00000000..0ebc3be9 --- /dev/null +++ b/src/couchdb/schema_robot_fields.json @@ -0,0 +1,141 @@ +{ + "_comment": "CouchDB document types for the robot inspection MCP server (12 Layer 1 tools). All documents live in the iot DB alongside timestamped IoT sensor readings. None contain an asset_id field, so existing IoT server queries ({asset_id: {$exists: true}}) are unaffected.", + + "documents": { + + "asset_robot_profile": { + "doc_type": "asset_robot_profile", + "_id_pattern": "profile:{normalized_asset_id}", + "_id_examples": ["profile:chiller_6", "profile:motor_01"], + "description": "Per-asset physical layout and panel state. Seeded by seed_robot_profiles.py. Scenario-specific state (e.g. panel_stuck=true for R001) is loaded by the eval harness via the shared robot JSON files.", + "fields": { + "physical_location": { + "type": "object or null", + "schema": {"x": "float (m)", "y": "float (m)", "z": "float (m)", "room_id": "string"}, + "default": null, + "purpose": "Facility floor-plan coordinates used by navigate_to() as the GraphNav navigation target. Null until floor-plan data is collected for the asset." + }, + "gauge_path": { + "type": "string or null", + "default": null, + "purpose": "File path or URL of the physical gauge image captured by capture_image(). Null until field data has been collected. When non-null and panel_stuck=false, the VLM perception agent can read the gauge value from this image." + }, + "gauge_range": { + "type": "array [low, high]", + "default": [0, 100], + "purpose": "Physical measurement range of the gauge (e.g. [0, 400] for a 0–400 bar pressure gauge). Included in the vlm_hint string returned by capture_image() so the VLM agent knows the scale." + }, + "gauge_description": { + "type": "string", + "default": "", + "purpose": "Plain English description of the gauge and its units (e.g. 'Compressor outlet pressure gauge, analog dial, 0–400 bar'). Included in the vlm_hint string returned by capture_image()." + }, + "panel_stuck": { + "type": "bool", + "default": false, + "purpose": "True when the physical inspection panel cannot be opened by the robot arm. open_panel() returns access_granted=false and capture_image() returns occlusion_flag=true when panel_stuck=true. Represents a physical kinematic failure (panel seized, corroded, or obstructed) that requires human maintenance escalation." + } + } + }, + + "robot_state": { + "doc_type": "robot_state", + "_id_pattern": "robot_state:{robot_id}", + "_id_example": "robot_state:spot_1", + "description": "Per-robot battery, pose, power and stance state. Robot-level, not asset-level. Seeded by seed_robot_profiles.py. power_state and stance_state added to support the 6 motion tools (power_on, power_off, stand, sit, dock, undock).", + "fields": { + "battery_charge_pct": { + "type": "float", + "range": [0.0, 100.0], + "default": 85.0, + "purpose": "Current battery charge percentage. get_battery() reads this. When battery_charge_pct < battery_low_threshold, get_battery() returns low_battery=true and the agent must abort the inspection and return to the charging dock immediately." + }, + "battery_low_threshold": { + "type": "float", + "default": 20.0, + "purpose": "Threshold percentage below which the mission must be aborted. get_battery() computes low_battery = (battery_charge_pct < battery_low_threshold)." + }, + "battery_estimated_runtime_s": { + "type": "float", + "default": 5400.0, + "purpose": "Estimated remaining runtime in seconds. Informational — included in the get_battery() message to help the agent judge whether the battery will last the inspection." + }, + "at_charge_station": { + "type": "bool", + "default": false, + "purpose": "True if the robot is currently docked at a charging station. Included in the get_battery() response for context." + }, + "pose": { + "type": "object", + "schema": {"x": "float (m)", "y": "float (m)", "theta": "float (rad, yaw from east)", "frame": "string — always 'map'"}, + "default": {"x": 0.0, "y": 0.0, "theta": 0.0, "frame": "map"}, + "purpose": "Current robot pose in the facility map frame. get_pose() reads this. The agent should call get_pose() after navigate_to() to confirm the robot arrived at the intended location." + }, + "localization_ok": { + "type": "bool", + "default": true, + "purpose": "False when the robot cannot localise itself in the facility map (e.g. map data stale, fiducial missing, encoder drift too large). When false, get_pose() returns localization_ok=false and the agent must abort the inspection and raise a work order for robot remapping — taking a gauge reading from an unverified position risks reading the wrong asset." + }, + "pose_drift_m": { + "type": "float", + "default": 0.0, + "purpose": "Estimated pose error in metres. get_pose() issues a warning when drift > 0.5 m. When localization_ok=false this field typically exceeds 0.5 m (e.g. 1.8 m in the localisation failure scenario)." + }, + "fault_state": { + "type": "string or null", + "values": [null, "ESTOP_CUT", "COMMS_LOST", "JOINT_FAULT", "BATTERY_CRITICAL"], + "default": null, + "purpose": "Active robot fault, if any. Null means nominal. get_pose() surfaces this in the response. A non-null fault_state requires human intervention before the mission can continue." + }, + "power_state": { + "type": "string", + "values": ["POWERED_ON", "POWERED_OFF"], + "default": "POWERED_ON", + "purpose": "Motor power state. power_on() and power_off() read this field. Maps to bosdyn.client.power.PowerClient.power_on_motors() / power_off_motors() in the real Spot SDK. The robot must be POWERED_ON before stand(), sit(), navigate_to(), or open_panel() can succeed." + }, + "stance_state": { + "type": "string", + "values": ["STANDING", "SITTING"], + "default": "STANDING", + "purpose": "Robot stance. stand() and sit() read this field. Maps to bosdyn.client.robot_command.RobotCommandBuilder.synchro_stand_command() / synchro_sit_command(). The robot must be SITTING before power_off() is allowed. Call sit() before dock() at mission end." + } + } + }, + + "waypoints": { + "doc_type": "waypoints", + "_id": "waypoints", + "description": "Singleton document mapping assets to Spot GraphNav waypoint IDs. list_waypoints() returns the waypoints array. The agent must call list_waypoints() before navigate_to() to confirm the target waypoint exists and is active.", + "fields": { + "waypoints": { + "type": "array of objects", + "item_schema": { + "waypoint_id": "string — Spot GraphNav waypoint UUID or short name", + "waypoint_name": "string — human-readable name shown in the Spot app", + "asset_id": "string or null — normalized asset ID this waypoint serves (null for the dock)", + "location_description": "string — plain English location for logging", + "x": "float (m)", "y": "float (m)", + "active": "bool — false if the waypoint has been deleted or the plant reconfigured since last mapping. When false, navigate_to() for that asset must not be called — the agent must escalate and raise a work order for waypoint remapping." + }, + "purpose": "Authoritative list of inspection waypoints loaded from the Spot GraphNav map. list_waypoints() filters by asset_id when provided. A missing or inactive waypoint for the target asset represents a plant reconfiguration event that requires human re-mapping of the facility." + } + } + } + + }, + + "isolation_guarantee": "No robot document contains an asset_id field. IoT server queries {asset_id: {$exists: true}} cannot match any robot document.", + + "indexes": [ + { + "name": "idx_robot_state_by_robot", + "fields": ["doc_type", "robot_id"], + "purpose": "get_battery() and get_pose() lookup by robot_id" + }, + { + "name": "idx_waypoints_singleton", + "fields": ["doc_type"], + "purpose": "list_waypoints() fetch of the singleton waypoints document" + } + ] +} diff --git a/src/couchdb/seed_robot_profiles.py b/src/couchdb/seed_robot_profiles.py new file mode 100644 index 00000000..f0ec7bb7 --- /dev/null +++ b/src/couchdb/seed_robot_profiles.py @@ -0,0 +1,370 @@ +"""Seed robot documents into the iot CouchDB database. + +Creates three document types per robot deployment: + + profile:{asset_id} — per-asset physical layout and panel state (5 fields) + robot_state:{robot_id} — per-robot battery and pose state + waypoints — singleton inspection waypoint map + +All documents deliberately omit the ``asset_id`` field so existing IoT server +queries (``{"asset_id": {"$exists": true}}``) are completely unaffected. + +Scenario-specific state (e.g. low battery, localisation failure) is loaded by the +eval harness via init_data.py + the shared robot JSON files in +AssetOpsBenchScenarioGeneration/RobotInspection/shared/robot/. + +Usage: + python src/couchdb/seed_robot_profiles.py # seed all (nominal state) + python src/couchdb/seed_robot_profiles.py --dry-run # preview + python src/couchdb/seed_robot_profiles.py --verify # check seeded state + python src/couchdb/seed_robot_profiles.py --reset # restore nominal state +""" + +import argparse +import json +import os +import sys +from datetime import datetime, timezone + +import couchdb3 +import requests +from dotenv import load_dotenv + +load_dotenv() + +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_DBNAME = os.environ.get("IOT_DBNAME") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") +ROBOT_ID = os.environ.get("ROBOT_ID", "spot_1") + +# --------------------------------------------------------------------------- +# PART 1 — Asset profile documents (5 fields per asset) +# --------------------------------------------------------------------------- + +PROFILE_DEFAULTS: dict = { + "physical_location": None, # navigate_to target; None until floor-plan loaded + "gauge_path": None, # capture_image returns this; None until field data collected + "gauge_range": [0, 100], + "gauge_description": "", + "panel_stuck": False, # open_panel + capture_image occlusion +} +PROFILE_FIELDS = list(PROFILE_DEFAULTS.keys()) + +ASSETS = [ + { + "display_name": "Chiller 6", + "profile_id": "profile:chiller_6", + "waypoint_id": "wp_chiller_6_panel", + "physical_location": {"x": 52.3, "y": 18.1, "z": 0.0, "room_id": "cooling_3B"}, + "gauge_range": [0, 400], + "gauge_description": "Compressor outlet pressure gauge, analog dial, 0–400 bar", + }, + { + "display_name": "Metro Pump 1", + "profile_id": "profile:metro_pump_1", + "waypoint_id": "wp_metro_pump_1_panel", + "physical_location": {"x": 14.0, "y": 32.5, "z": 0.0, "room_id": "pump_room_A"}, + "gauge_range": [0.0, 1.5], + "gauge_description": "Flow rate gauge, analog dial, 0–1.5 m³/s", + }, + { + "display_name": "Hydraulic Pump 1", + "profile_id": "profile:hydraulic_pump_1", + "waypoint_id": "wp_hydraulic_pump_1_panel", + "physical_location": {"x": 28.7, "y": 11.0, "z": 0.0, "room_id": "pump_room_B"}, + "gauge_range": [0, 350], + "gauge_description": "Hydraulic pressure gauge, analog dial, 0–350 bar", + }, + { + "display_name": "Motor 01", + "profile_id": "profile:motor_01", + "waypoint_id": "wp_motor_01_panel", + "physical_location": {"x": 7.2, "y": 44.8, "z": 0.0, "room_id": "motor_bay_1"}, + "gauge_range": [0, 200], + "gauge_description": "Motor temperature gauge, analog dial, 0–200°C", + }, +] + +# --------------------------------------------------------------------------- +# PART 2 — Robot state document +# --------------------------------------------------------------------------- + +def _nominal_robot_state() -> dict: + return { + "_id": f"robot_state:{ROBOT_ID}", + "doc_type": "robot_state", + "robot_id": ROBOT_ID, + "battery_charge_pct": 85.0, + "battery_low_threshold": 20.0, + "battery_estimated_runtime_s": 5400.0, + "at_charge_station": False, + "pose": {"x": 0.0, "y": 0.0, "theta": 0.0, "frame": "map"}, + "localization_ok": True, + "pose_drift_m": 0.0, + "fault_state": None, + "last_updated": datetime.now(timezone.utc).isoformat(), + } + +ROBOT_STATE_FIELDS = [ + "battery_charge_pct", "battery_low_threshold", "battery_estimated_runtime_s", + "at_charge_station", "pose", "localization_ok", "pose_drift_m", "fault_state", +] + +# --------------------------------------------------------------------------- +# PART 3 — Waypoints singleton document +# --------------------------------------------------------------------------- + +NOMINAL_WAYPOINTS: dict = { + "_id": "waypoints", + "doc_type": "waypoints", + "waypoints": [ + { + "waypoint_id": "wp_chiller_6_panel", + "waypoint_name": "Chiller 6 — Panel C-06", + "asset_id": "chiller_6", + "location_description": "Plant B / Mechanical Room 2 / Panel C-06", + "x": 52.3, "y": 18.1, + "active": True, + }, + { + "waypoint_id": "wp_metro_pump_1_panel", + "waypoint_name": "Metro Pump 1 — Panel MP-01", + "asset_id": "metro_pump_1", + "location_description": "Plant D / Pump Hall / Panel MP-01", + "x": 14.0, "y": 32.5, + "active": True, + }, + { + "waypoint_id": "wp_hydraulic_pump_1_panel", + "waypoint_name": "Hydraulic Pump 1 — Panel HP-01", + "asset_id": "hydraulic_pump_1", + "location_description": "Plant C / Hydraulic Room / Panel HP-01", + "x": 28.7, "y": 11.0, + "active": True, + }, + { + "waypoint_id": "wp_motor_01_panel", + "waypoint_name": "Motor 01 — Panel M-01", + "asset_id": "motor_01", + "location_description": "Plant A / Drive Bay 1 / Panel M-01", + "x": 7.2, "y": 44.8, + "active": True, + }, + { + "waypoint_id": "wp_dock_main", + "waypoint_name": "Main Charging Dock", + "asset_id": None, + "location_description": "Building A / Charging Bay 1", + "x": 0.0, "y": 0.0, + "active": True, + }, + ], +} + +# --------------------------------------------------------------------------- +# CouchDB helpers +# --------------------------------------------------------------------------- + +ALL_INDEXES = [ + {"name": "idx_robot_state_by_robot", "fields": ["doc_type", "robot_id"]}, + {"name": "idx_waypoints_singleton", "fields": ["doc_type"]}, +] + + +def _connect() -> couchdb3.Database: + if not COUCHDB_URL or not COUCHDB_DBNAME: + sys.exit("ERROR: COUCHDB_URL and IOT_DBNAME must be set.") + return couchdb3.Database( + COUCHDB_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + + +def _upsert(db, new_doc: dict, doc_id: str, required_fields: list, dry_run: bool) -> str: + try: + existing = db.get(doc_id) + except Exception: + existing = None + if existing is None: + action, final_doc = "CREATE", new_doc + else: + patched = False + final_doc = dict(existing) + for field in required_fields: + if field not in final_doc: + final_doc[field] = new_doc.get(field) + patched = True + action = "PATCH" if patched else "SKIP" + if not dry_run and action != "SKIP": + db.save(final_doc) + return action + + +def _ensure_indexes(db_name: str) -> None: + auth = (COUCHDB_USERNAME, COUCHDB_PASSWORD) + base = (COUCHDB_URL or "").rstrip("/") + print("\nCreating indexes...") + for idx in ALL_INDEXES: + url = f"{base}/{db_name}/_index" + payload = {"index": {"fields": idx["fields"]}, "name": idx["name"], "type": "json"} + try: + resp = requests.post(url, json=payload, auth=auth, timeout=10) + resp.raise_for_status() + result = resp.json().get("result", "ok") + print(f" [{result.upper()}] {idx['name']}") + except Exception as e: + print(f" [WARN] {idx['name']}: {e}") + + +# --------------------------------------------------------------------------- +# Main operations +# --------------------------------------------------------------------------- + +def run(dry_run: bool = False) -> None: + db = _connect() + label = "[DRY RUN] " if dry_run else "" + print(f"{label}Seeding robot documents into '{COUCHDB_DBNAME}'...\n") + + print("── Asset profiles ──") + for asset in ASSETS: + doc_id = asset["profile_id"] + new_doc = { + "_id": doc_id, "doc_type": "asset_robot_profile", + "display_name": asset["display_name"], + "waypoint_id": asset["waypoint_id"], + **PROFILE_DEFAULTS, + "physical_location": asset["physical_location"], + "gauge_range": asset["gauge_range"], + "gauge_description": asset["gauge_description"], + } + action = _upsert(db, new_doc, doc_id, PROFILE_FIELDS, dry_run) + print(f" [{action}] {doc_id}") + if dry_run and action != "SKIP": + print(f" {json.dumps({k: v for k, v in new_doc.items() if k not in ('_id', '_rev')}, indent=10)}\n") + + print("\n── Robot state ──") + state_doc = _nominal_robot_state() + action = _upsert(db, state_doc, state_doc["_id"], ROBOT_STATE_FIELDS, dry_run) + print(f" [{action}] robot_state:{ROBOT_ID}") + + print("\n── Waypoints ──") + wps_action = _upsert(db, NOMINAL_WAYPOINTS, "waypoints", ["waypoints"], dry_run) + print(f" [{wps_action}] waypoints") + + if not dry_run: + _ensure_indexes(COUCHDB_DBNAME) + print("\nDone." if not dry_run else "\n[Dry run complete — no writes performed.]") + + + +def reset(dry_run: bool = False) -> None: + db = _connect() + label = "[DRY RUN] " if dry_run else "" + print(f"{label}Restoring nominal state...\n") + state_id = f"robot_state:{ROBOT_ID}" + try: + existing = db.get(state_id) + except Exception: + existing = None + if existing: + nominal = {**_nominal_robot_state(), "_rev": existing["_rev"]} + if not dry_run: + db.save(nominal) + print(f" [{'WOULD RESET' if dry_run else 'RESET'}] {state_id}") + try: + wps_doc = db.get("waypoints") + except Exception: + wps_doc = None + if wps_doc: + reset_wps = {**NOMINAL_WAYPOINTS, "_rev": wps_doc["_rev"]} + if not dry_run: + db.save(reset_wps) + print(f" [{'WOULD RESET' if dry_run else 'RESET'}] waypoints") + print("\nNominal state restored." if not dry_run else "\n[Dry run — no writes.]") + + +def verify() -> bool: + db = _connect() + print(f"Verifying robot documents in '{COUCHDB_DBNAME}'...\n") + all_ok = True + + print("── Asset profiles ──") + for asset in ASSETS: + doc_id = asset["profile_id"] + try: + doc = db.get(doc_id) + except Exception: + doc = None + if doc is None: + print(f" [MISSING] {doc_id}"); all_ok = False; continue + missing = [f for f in PROFILE_FIELDS if f not in doc] + if missing: + print(f" [INCOMPLETE] {doc_id} — missing: {missing}"); all_ok = False + else: + print(f" [OK] {doc_id}") + for k in PROFILE_FIELDS: + print(f" {k}: {doc[k]!r}") + print() + + print("── Robot state ──") + state_id = f"robot_state:{ROBOT_ID}" + try: + state = db.get(state_id) + except Exception: + state = None + if state is None: + print(f" [MISSING] {state_id}"); all_ok = False + else: + missing = [f for f in ROBOT_STATE_FIELDS if f not in state] + if missing: + print(f" [INCOMPLETE] {state_id} — missing: {missing}"); all_ok = False + else: + print(f" [OK] {state_id}") + print(f" battery={state['battery_charge_pct']}% | " + f"localization_ok={state['localization_ok']} | " + f"drift={state['pose_drift_m']} m") + print() + + print("── Waypoints ──") + try: + wps_doc = db.get("waypoints") + except Exception: + wps_doc = None + if wps_doc is None: + print(" [MISSING] waypoints"); all_ok = False + else: + wps = wps_doc.get("waypoints", []) + active = sum(1 for w in wps if w.get("active")) + print(f" [OK] waypoints — {len(wps)} total, {active} active") + for wp in wps: + status = "ACTIVE" if wp.get("active") else "INACTIVE (FM-11)" + print(f" {wp['waypoint_id']} → {wp.get('asset_id', 'dock')} [{status}]") + + if all_ok: + print("\nAll documents verified.") + else: + print("\nVERIFICATION FAILED — re-run seed_robot_profiles.py.") + return all_ok + + +def main() -> None: + parser = argparse.ArgumentParser(description="Seed robot documents into CouchDB iot DB.") + group = parser.add_mutually_exclusive_group() + group.add_argument("--dry-run", action="store_true") + group.add_argument("--verify", action="store_true") + group.add_argument("--reset", action="store_true") + args = parser.parse_args() + + if args.verify: + sys.exit(0 if verify() else 1) + elif args.reset: + reset(dry_run=args.dry_run) + else: + run(dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/src/servers/robot/__init__.py b/src/servers/robot/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/servers/robot/main.py b/src/servers/robot/main.py new file mode 100644 index 00000000..96aebf24 --- /dev/null +++ b/src/servers/robot/main.py @@ -0,0 +1,902 @@ +"""Robot MCP Server — 12 Layer 1 intrinsic tools (Spot SDK wrappers). + +Each tool maps 1-to-1 to a real Boston Dynamics Spot SDK service call. +For the benchmark (no live robot), each tool reads from a CouchDB document +that simulates the Spot SDK response. Scenario state is injected externally +by the eval harness via init_data.py before each run. +The tools never know which scenario is active — they just read CouchDB. + +CouchDB document types (all in the iot DB): + profile:{asset_id} — per-asset physical layout and panel state + robot_state:{robot_id} — per-robot battery, pose, power and stance state + waypoints — singleton inspection waypoint map + +SDK service mapping: + navigate_to → bosdyn.client.graph_nav.GraphNavClient.navigate_to() + open_panel → bosdyn.client.robot_command.RobotCommandClient (arm manipulation) + get_battery → bosdyn.client.robot_state.RobotStateClient (battery_states) + get_pose → bosdyn.client.robot_state.RobotStateClient (kinematic_state) + list_waypoints → bosdyn.client.graph_nav.GraphNavClient.download_graph() + capture_image → bosdyn.client.image.ImageClient.get_image_from_sources() + power_on → bosdyn.client.power.PowerClient.power_on_motors() + power_off → bosdyn.client.power.PowerClient.power_off_motors() + stand → bosdyn.client.robot_command.RobotCommandBuilder.synchro_stand_command() + sit → bosdyn.client.robot_command.RobotCommandBuilder.synchro_sit_command() + dock → bosdyn.client.docking.DockingClient.blocking_dock_robot() + undock → bosdyn.client.docking.DockingClient.blocking_undock() + +Live-robot connection env vars (not used in benchmark mode): + SPOT_HOSTNAME — robot IP, e.g. 192.168.80.3 + SPOT_USERNAME — SDK auth username (default: admin) + SPOT_PASSWORD — SDK auth password + MAXIMO_URL — IBM Maximo base URL + MAXIMO_APIKEY — Maximo API key +""" + +import logging +import math +import os +from typing import Dict, List, Optional, Union + +import couchdb3 +from dotenv import load_dotenv +from mcp.server.fastmcp import FastMCP +from pydantic import BaseModel + +load_dotenv() + +_log_level = getattr( + logging, os.environ.get("LOG_LEVEL", "WARNING").upper(), logging.WARNING +) +logging.basicConfig(level=_log_level) +logger = logging.getLogger("robot-mcp-server") + +# --------------------------------------------------------------------------- +# Environment — CouchDB (benchmark) + Spot SDK (live robot, not used in bench) +# --------------------------------------------------------------------------- + +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") +IOT_DBNAME = os.environ.get("IOT_DBNAME", "iot") +ROBOT_ID = os.environ.get("ROBOT_ID", "spot_1") + +# Live-robot connection vars — read but not consumed in benchmark mode +SPOT_HOSTNAME = os.environ.get("SPOT_HOSTNAME") +SPOT_USERNAME = os.environ.get("SPOT_USERNAME", "admin") +SPOT_PASSWORD = os.environ.get("SPOT_PASSWORD") +MAXIMO_URL = os.environ.get("MAXIMO_URL") +MAXIMO_APIKEY = os.environ.get("MAXIMO_APIKEY") + +try: + db = couchdb3.Database( + IOT_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + logger.info("Connected to IoT CouchDB: %s", IOT_DBNAME) +except Exception as exc: + logger.error("Failed to connect to IoT CouchDB: %s", exc) + db = None + +# --------------------------------------------------------------------------- +# Asset ID normalisation +# --------------------------------------------------------------------------- + +_DISPLAY_TO_KEY: Dict[str, str] = { + "Chiller 6": "chiller_6", + "Metro Pump 1": "metro_pump_1", + "Hydraulic Pump 1": "hydraulic_pump_1", + "Motor 01": "motor_01", + "chiller_6": "chiller_6", + "metro_pump_1": "metro_pump_1", + "hydraulic_pump_1": "hydraulic_pump_1", + "motor_01": "motor_01", +} + + +def _profile_key(asset_id: str) -> str: + return _DISPLAY_TO_KEY.get(asset_id, asset_id.lower().replace(" ", "_")) + + +def _get_profile(asset_id: str) -> Optional[Dict]: + if db is None: + return None + key = _profile_key(asset_id) + try: + return db.get(f"profile:{key}") + except Exception as exc: + logger.error("Profile lookup failed for %s: %s", asset_id, exc) + return None + + +def _get_robot_state() -> Optional[Dict]: + if db is None: + return None + try: + return db.get(f"robot_state:{ROBOT_ID}") + except Exception as exc: + logger.error("robot_state lookup failed for %s: %s", ROBOT_ID, exc) + return None + + +# --------------------------------------------------------------------------- +# FastMCP server +# --------------------------------------------------------------------------- + +mcp = FastMCP( + "robot", + instructions=( + "Spot robot inspection tools — 12 Layer 1 intrinsic tools. " + "Typical inspection sequence: power_on → undock → stand → list_waypoints " + "→ get_battery → navigate_to → get_pose → open_panel → capture_image " + "→ sit → dock → power_off. " + "Abort if get_battery returns low_battery=True. " + "Abort if get_pose returns localization_ok=False. " + "Abort if list_waypoints shows the target waypoint as active=False. " + "After capture_image, pass gauge_path to the VLM agent for reading. " + "Call dock before power_off to return the robot to the charging station." + ), +) + +# --------------------------------------------------------------------------- +# Pydantic result models +# --------------------------------------------------------------------------- + + +class ErrorResult(BaseModel): + error: str + + +class NavigateResult(BaseModel): + asset_id: str + success: bool + waypoint_id: Optional[str] + distance_m: float + steps_taken: int + blocked_reason: Optional[str] = None + message: str + + +class OpenPanelResult(BaseModel): + asset_id: str + success: bool + access_granted: bool + stuck_reason: Optional[str] = None + message: str + + +class BatteryResult(BaseModel): + robot_id: str + battery_charge_pct: float + battery_low_threshold: float + battery_estimated_runtime_s: float + at_charge_station: bool + low_battery: bool + message: str + + +class PoseResult(BaseModel): + robot_id: str + x: float + y: float + theta: float + frame: str + localization_ok: bool + pose_drift_m: float + fault_state: Optional[str] + message: str + + +class WaypointEntry(BaseModel): + waypoint_id: str + waypoint_name: str + asset_id: Optional[str] + location_description: str + x: float + y: float + active: bool + + +class ListWaypointsResult(BaseModel): + total: int + active_count: int + waypoints: List[WaypointEntry] + message: str + + +class CaptureImageResult(BaseModel): + asset_id: str + gauge_path: Optional[str] + gauge_range: List[float] + gauge_description: str + occlusion_flag: bool + image_available: bool + vlm_hint: str + message: str + + +class PowerResult(BaseModel): + robot_id: str + power_state: str # "POWERED_ON" | "POWERED_OFF" + transition: str # "power_on" | "power_off" + success: bool + message: str + + +class StanceResult(BaseModel): + robot_id: str + stance_state: str # "STANDING" | "SITTING" + transition: str # "stand" | "sit" + power_state: str + success: bool + message: str + + +class DockResult(BaseModel): + robot_id: str + docked: bool + at_charge_station: bool + dock_waypoint_id: Optional[str] + battery_charge_pct: float + message: str + + +# --------------------------------------------------------------------------- +# Tool 1: navigate_to +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Navigate To Asset") +def navigate_to(asset_id: str) -> Union[NavigateResult, ErrorResult]: + """Navigate the robot to an asset's physical location. + + Simulates a Spot GraphNav NavigateTo service call. + Returns blocked if physical_location is not set in the profile. + + Call list_waypoints first to confirm the waypoint exists and is active. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + loc = profile.get("physical_location") + if loc is None: + return NavigateResult( + asset_id=asset_id, + success=False, + waypoint_id=None, + distance_m=0.0, + steps_taken=0, + blocked_reason="physical_location not set in profile", + message=f"Navigation blocked: no floor-plan coordinates for '{asset_id}'", + ) + + x, y, z = float(loc.get("x", 0)), float(loc.get("y", 0)), float(loc.get("z", 0)) + distance_m = round(math.sqrt(x**2 + y**2 + z**2), 2) + steps = max(1, int(distance_m / 0.5)) + room = loc.get("room_id", "unknown") + wp_id = profile.get("waypoint_id") + + return NavigateResult( + asset_id=asset_id, + success=True, + waypoint_id=wp_id, + distance_m=distance_m, + steps_taken=steps, + message=f"Navigated to '{asset_id}' in {room} ({distance_m} m, {steps} steps)", + ) + + +# --------------------------------------------------------------------------- +# Tool 2: open_panel +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Open Inspection Panel") +def open_panel(asset_id: str) -> Union[OpenPanelResult, ErrorResult]: + """Open the asset's physical inspection panel using the Spot Arm. + + Simulates a Spot robot-command manipulation request. + panel_stuck=True in the profile means the panel cannot be opened (FM-1). + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + panel_stuck = bool(profile.get("panel_stuck", False)) + + if panel_stuck: + return OpenPanelResult( + asset_id=asset_id, + success=False, + access_granted=False, + stuck_reason="panel_stuck=True in profile (FM-1)", + message=( + f"Panel failed to open for '{asset_id}'. " + "Escalate and raise work order — do not retry more than 3 times." + ), + ) + + return OpenPanelResult( + asset_id=asset_id, + success=True, + access_granted=True, + message=f"Panel opened successfully for '{asset_id}' — access granted", + ) + + +# --------------------------------------------------------------------------- +# Tool 3: get_battery +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Get Robot Battery State") +def get_battery() -> Union[BatteryResult, ErrorResult]: + """Return the robot's battery charge and estimated runtime. + + Simulates a Spot robot-state battery_states query. + Reads robot_state:{ROBOT_ID} from CouchDB (seeded by seed_robot_profiles.py). + + when low_battery=True (charge_pct < threshold), abort the mission + and dock immediately. Do not proceed to navigate_to or open_panel. + + Call at mission start and after long navigate_to calls. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + charge = float(state.get("battery_charge_pct", 100.0)) + threshold = float(state.get("battery_low_threshold", 20.0)) + runtime_s = float(state.get("battery_estimated_runtime_s", 0.0)) + at_dock = bool(state.get("at_charge_station", False)) + low = charge < threshold + + if low: + msg = ( + f"LOW BATTERY: {charge:.1f}% (threshold {threshold:.0f}%). " + f"~{runtime_s:.0f}s remaining. Abort and dock immediately." + ) + else: + msg = ( + f"Battery OK: {charge:.1f}%, ~{round(runtime_s / 60)} min remaining " + f"({'docked' if at_dock else 'on mission'})" + ) + + return BatteryResult( + robot_id=ROBOT_ID, + battery_charge_pct=charge, + battery_low_threshold=threshold, + battery_estimated_runtime_s=runtime_s, + at_charge_station=at_dock, + low_battery=low, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 4: get_pose +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Get Robot Pose") +def get_pose() -> Union[PoseResult, ErrorResult]: + """Return the robot's current pose and localisation status. + + Simulates a Spot robot-state kinematic_state query. + Reads robot_state:{ROBOT_ID} from CouchDB. + + when localization_ok=False (pose_drift_m > 0.5 m), abort the + inspection and raise a work order for robot remapping. Readings taken + at an unverified pose come from the wrong asset location. + + Call after navigate_to to confirm the robot arrived correctly. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + pose = state.get("pose", {"x": 0.0, "y": 0.0, "theta": 0.0, "frame": "map"}) + loc_ok = bool(state.get("localization_ok", True)) + drift = float(state.get("pose_drift_m", 0.0)) + fault = state.get("fault_state") + + if not loc_ok: + msg = ( + f"LOCALIZATION FAILURE: drift={drift:.2f} m. " + "Cannot confirm position in facility map. " + "Abort and raise work order for robot remapping." + ) + elif drift > 0.5: + msg = ( + f"WARNING: pose drift {drift:.2f} m (>0.5 m). " + "Consider re-localising before opening panel." + ) + else: + msg = ( + f"Pose OK: x={float(pose.get('x', 0)):.2f} m, " + f"y={float(pose.get('y', 0)):.2f} m, " + f"θ={float(pose.get('theta', 0)):.3f} rad " + f"(frame={pose.get('frame', 'map')})" + ) + + return PoseResult( + robot_id=ROBOT_ID, + x=float(pose.get("x", 0.0)), + y=float(pose.get("y", 0.0)), + theta=float(pose.get("theta", 0.0)), + frame=str(pose.get("frame", "map")), + localization_ok=loc_ok, + pose_drift_m=drift, + fault_state=fault, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 5: list_waypoints +# --------------------------------------------------------------------------- + + +@mcp.tool(title="List Inspection Waypoints") +def list_waypoints( + asset_id: Optional[str] = None, +) -> Union[ListWaypointsResult, ErrorResult]: + """List all known GraphNav waypoints from the facility map. + + Simulates a Spot graph-nav download_graph response. + Reads the singleton 'waypoints' document from CouchDB. + + when the target asset's waypoint is missing or active=False + (deleted after plant reconfiguration), raise a work order for waypoint + remapping. Do NOT call navigate_to for an inactive waypoint. + + Pass asset_id to filter to that asset's waypoints. Omit to list all. + Call before the first navigate_to in a session. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + + try: + wps_doc = db.get("waypoints") + except Exception as exc: + logger.error("Waypoints doc lookup failed: %s", exc) + wps_doc = None + + if wps_doc is None: + return ErrorResult( + error="'waypoints' document not found — run seed_robot_profiles.py to initialise" + ) + + raw = wps_doc.get("waypoints", []) + if asset_id is not None: + key = _profile_key(asset_id) + raw = [w for w in raw if w.get("asset_id") in (key, asset_id)] + + entries = [ + WaypointEntry( + waypoint_id=w.get("waypoint_id", ""), + waypoint_name=w.get("waypoint_name", ""), + asset_id=w.get("asset_id"), + location_description=w.get("location_description", ""), + x=float(w.get("x", 0.0)), + y=float(w.get("y", 0.0)), + active=bool(w.get("active", True)), + ) + for w in raw + ] + + active_count = sum(1 for e in entries if e.active) + inactive = [e.waypoint_id for e in entries if not e.active] + + if inactive: + msg = ( + f"{len(entries)} waypoint(s), {active_count} active. " + f"INACTIVE (FM-11): {inactive}. " + "Raise work order for waypoint remapping before navigating." + ) + else: + msg = f"{len(entries)} waypoint(s), all active." + + return ListWaypointsResult( + total=len(entries), + active_count=active_count, + waypoints=entries, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 6: capture_image +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Capture Asset Gauge Image") +def capture_image(asset_id: str) -> Union[CaptureImageResult, ErrorResult]: + """Capture an image of the asset's gauge panel. + + Simulates a Spot image-service get_image_from_sources call. + Returns gauge_path from the CouchDB profile (None until field data collected). + + image_available=False when gauge_path is None or the panel is occluded. + When image_available=True, pass gauge_path and vlm_hint to the VLM + perception agent for gauge reading. This tool does NOT run the VLM. + + occlusion_flag=True when panel_stuck=True in the profile (panel blocks view). + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + profile = _get_profile(asset_id) + if profile is None: + return ErrorResult(error=f"No robot profile found for asset '{asset_id}'") + + gauge_path = profile.get("gauge_path") + gauge_range = profile.get("gauge_range", [0, 100]) + gauge_desc = profile.get("gauge_description", "") + panel_stuck = bool(profile.get("panel_stuck", False)) + + occlusion = panel_stuck + image_available = gauge_path is not None and not occlusion + + vlm_hint = ( + f"Gauge range: {gauge_range[0]}–{gauge_range[1]}. " + f"Description: {gauge_desc}. " + "Read the needle position and return a float value. " + "If the image is occluded or unreadable return null." + ) + + if occlusion: + msg = ( + f"Image capture for '{asset_id}' BLOCKED — panel is stuck/occluded. " + "Cannot capture gauge image. Escalate FM-2." + ) + elif not image_available: + msg = ( + f"Image for '{asset_id}': gauge_path not yet set in profile " + "(field data pending). VLM path unavailable." + ) + else: + msg = f"Image ready for '{asset_id}': {gauge_path}. Pass to VLM agent with vlm_hint." + + return CaptureImageResult( + asset_id=asset_id, + gauge_path=gauge_path, + gauge_range=gauge_range, + gauge_description=gauge_desc, + occlusion_flag=occlusion, + image_available=image_available, + vlm_hint=vlm_hint, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 7: power_on +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Power On Robot Motors") +def power_on() -> Union[PowerResult, ErrorResult]: + """Power on the Spot robot's drive motors. + + Simulates a Spot PowerClient.power_on_motors() call. + Reads power_state from robot_state:{ROBOT_ID} in CouchDB. + + Call at mission start before stand or navigate_to. Safe to call + when already powered on — returns success with the current state. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + current = state.get("power_state", "POWERED_ON") + if current == "POWERED_ON": + msg = f"Motors already powered on (robot '{ROBOT_ID}')" + else: + msg = f"Motors powered on successfully (robot '{ROBOT_ID}')" + + return PowerResult( + robot_id=ROBOT_ID, + power_state="POWERED_ON", + transition="power_on", + success=True, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 8: power_off +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Power Off Robot Motors") +def power_off() -> Union[PowerResult, ErrorResult]: + """Power off the Spot robot's drive motors. + + Simulates a Spot PowerClient.power_off_motors() call. + The robot must be sitting before powering off — call sit() first. + + After power_off the robot cannot move until power_on() is called again. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + current = state.get("power_state", "POWERED_ON") + stance = state.get("stance_state", "STANDING") + + if stance == "STANDING": + return PowerResult( + robot_id=ROBOT_ID, + power_state=current, + transition="power_off", + success=False, + message=( + f"Cannot power off: robot '{ROBOT_ID}' is STANDING. " + "Call sit() before power_off()." + ), + ) + + if current == "POWERED_OFF": + msg = f"Motors already powered off (robot '{ROBOT_ID}')" + else: + msg = f"Motors powered off successfully (robot '{ROBOT_ID}')" + + return PowerResult( + robot_id=ROBOT_ID, + power_state="POWERED_OFF", + transition="power_off", + success=True, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 9: stand +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Command Robot to Stand") +def stand() -> Union[StanceResult, ErrorResult]: + """Command the Spot robot to stand upright. + + Simulates a Spot RobotCommandBuilder.synchro_stand_command() call via + RobotCommandClient. + + Robot must be powered on before standing. Call power_on() first if needed. + Required before navigate_to or open_panel. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + power = state.get("power_state", "POWERED_ON") + if power != "POWERED_ON": + return StanceResult( + robot_id=ROBOT_ID, + stance_state=state.get("stance_state", "SITTING"), + transition="stand", + power_state=power, + success=False, + message=f"Cannot stand: robot '{ROBOT_ID}' is not powered on. Call power_on() first.", + ) + + current_stance = state.get("stance_state", "SITTING") + if current_stance == "STANDING": + msg = f"Robot '{ROBOT_ID}' is already standing" + else: + msg = f"Robot '{ROBOT_ID}' stood up successfully" + + return StanceResult( + robot_id=ROBOT_ID, + stance_state="STANDING", + transition="stand", + power_state=power, + success=True, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 10: sit +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Command Robot to Sit") +def sit() -> Union[StanceResult, ErrorResult]: + """Command the Spot robot to sit down. + + Simulates a Spot RobotCommandBuilder.synchro_sit_command() call via + RobotCommandClient. + + Call before dock() at the end of a mission, or in an emergency battery abort. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + power = state.get("power_state", "POWERED_ON") + if power != "POWERED_ON": + return StanceResult( + robot_id=ROBOT_ID, + stance_state=state.get("stance_state", "SITTING"), + transition="sit", + power_state=power, + success=False, + message=f"Cannot sit: robot '{ROBOT_ID}' is not powered on.", + ) + + current_stance = state.get("stance_state", "STANDING") + if current_stance == "SITTING": + msg = f"Robot '{ROBOT_ID}' is already sitting" + else: + msg = f"Robot '{ROBOT_ID}' sat down successfully" + + return StanceResult( + robot_id=ROBOT_ID, + stance_state="SITTING", + transition="sit", + power_state=power, + success=True, + message=msg, + ) + + +# --------------------------------------------------------------------------- +# Tool 11: dock +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Dock Robot at Charging Station") +def dock() -> Union[DockResult, ErrorResult]: + """Return the robot to its charging dock. + + Simulates a Spot DockingClient.blocking_dock_robot() call. + Navigates to the dock waypoint (asset_id=None) in the waypoints document. + + Call at mission end or when get_battery() returns low_battery=True. + The dock waypoint must be active in the waypoints document. + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + try: + wps_doc = db.get("waypoints") + except Exception as exc: + logger.error("Waypoints doc lookup failed: %s", exc) + wps_doc = None + + dock_wp = None + if wps_doc: + for wp in wps_doc.get("waypoints", []): + if wp.get("asset_id") is None and "dock" in wp.get("waypoint_id", "").lower(): + dock_wp = wp + break + + if dock_wp is None or not dock_wp.get("active", True): + return DockResult( + robot_id=ROBOT_ID, + docked=False, + at_charge_station=False, + dock_waypoint_id=dock_wp.get("waypoint_id") if dock_wp else None, + battery_charge_pct=float(state.get("battery_charge_pct", 0.0)), + message=f"Dock waypoint not found or inactive — cannot dock robot '{ROBOT_ID}'", + ) + + charge = float(state.get("battery_charge_pct", 0.0)) + return DockResult( + robot_id=ROBOT_ID, + docked=True, + at_charge_station=True, + dock_waypoint_id=dock_wp.get("waypoint_id"), + battery_charge_pct=charge, + message=( + f"Robot '{ROBOT_ID}' docked at '{dock_wp.get('waypoint_name', 'dock')}'. " + f"Battery: {charge:.1f}%." + ), + ) + + +# --------------------------------------------------------------------------- +# Tool 12: undock +# --------------------------------------------------------------------------- + + +@mcp.tool(title="Undock Robot from Charging Station") +def undock() -> Union[DockResult, ErrorResult]: + """Release the robot from the charging dock. + + Simulates a Spot DockingClient.blocking_undock() call. + Checks at_charge_station and battery_charge_pct before undocking. + + Call at mission start, after power_on(). Abort if battery is too low + to complete the mission (check get_battery() before proceeding). + """ + if db is None: + return ErrorResult(error="IoT database unavailable") + state = _get_robot_state() + if state is None: + return ErrorResult( + error=f"No robot_state document for robot '{ROBOT_ID}' — " + "run seed_robot_profiles.py to initialise" + ) + + charge = float(state.get("battery_charge_pct", 100.0)) + threshold = float(state.get("battery_low_threshold", 20.0)) + + if charge < threshold: + return DockResult( + robot_id=ROBOT_ID, + docked=True, + at_charge_station=True, + dock_waypoint_id=None, + battery_charge_pct=charge, + message=( + f"UNDOCK REFUSED: battery at {charge:.1f}% (threshold {threshold:.0f}%). " + "Charge before undocking." + ), + ) + + return DockResult( + robot_id=ROBOT_ID, + docked=False, + at_charge_station=False, + dock_waypoint_id=None, + battery_charge_pct=charge, + message=f"Robot '{ROBOT_ID}' undocked successfully. Battery: {charge:.1f}%.", + ) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +def main() -> None: + mcp.run(transport="stdio") + + +if __name__ == "__main__": + main() diff --git a/src/servers/robot/tests/__init__.py b/src/servers/robot/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/servers/robot/tests/conftest.py b/src/servers/robot/tests/conftest.py new file mode 100644 index 00000000..3fede36e --- /dev/null +++ b/src/servers/robot/tests/conftest.py @@ -0,0 +1,218 @@ +"""Shared fixtures and helpers for robot MCP server tests.""" + +import json +import os + +import pytest +from dotenv import load_dotenv +from unittest.mock import MagicMock, patch + +load_dotenv() + + +# --------------------------------------------------------------------------- +# CouchDB availability gate +# --------------------------------------------------------------------------- + + +def _couchdb_reachable() -> bool: + url = os.environ.get("COUCHDB_URL") + if not url: + return False + try: + import requests + requests.get(url, timeout=2) + return True + except Exception: + return False + + +requires_couchdb = pytest.mark.skipif( + not _couchdb_reachable(), + reason="CouchDB not reachable (set COUCHDB_URL and ensure CouchDB is running)", +) + + +# --------------------------------------------------------------------------- +# CouchDB document fixtures — simulate the 3 doc types in the iot DB +# --------------------------------------------------------------------------- + +ROBOT_ID = "spot_1" + +NOMINAL_PROFILE = { + "_id": "profile:motor_01", + "doc_type": "asset_robot_profile", + "display_name": "Motor 01", + "waypoint_id": "wp_motor_01_panel", + "physical_location": {"x": 7.2, "y": 44.8, "z": 0.0, "room_id": "motor_bay_1"}, + "gauge_path": None, + "gauge_range": [0, 200], + "gauge_description": "Motor temperature gauge, analog dial, 0–200°C", + "panel_stuck": False, +} + +NOMINAL_ROBOT_STATE = { + "_id": f"robot_state:{ROBOT_ID}", + "doc_type": "robot_state", + "robot_id": ROBOT_ID, + "battery_charge_pct": 85.0, + "battery_low_threshold": 20.0, + "battery_estimated_runtime_s": 5400.0, + "at_charge_station": False, + "pose": {"x": 0.0, "y": 0.0, "theta": 0.0, "frame": "map"}, + "localization_ok": True, + "pose_drift_m": 0.0, + "fault_state": None, + "power_state": "POWERED_ON", + "stance_state": "STANDING", +} + +NOMINAL_WAYPOINTS = { + "_id": "waypoints", + "doc_type": "waypoints", + "waypoints": [ + { + "waypoint_id": "wp_motor_01_panel", "waypoint_name": "Motor 01 — Panel M-01", + "asset_id": "motor_01", "location_description": "Plant A / Drive Bay 1", + "x": 7.2, "y": 44.8, "active": True, + }, + { + "waypoint_id": "wp_chiller_6_panel", "waypoint_name": "Chiller 6 — Panel C-06", + "asset_id": "chiller_6", "location_description": "Plant B / Mech Room 2", + "x": 52.3, "y": 18.1, "active": True, + }, + { + "waypoint_id": "wp_dock_main", "waypoint_name": "Main Charging Dock", + "asset_id": None, "location_description": "Building A / Charging Bay 1", + "x": 0.0, "y": 0.0, "active": True, + }, + ], +} + + +def _make_mock_db( + profile: dict = None, + robot_state: dict = None, + waypoints: dict = None, +) -> MagicMock: + """Return a MagicMock CouchDB that returns the specified docs on .get().""" + import copy + profile_doc = copy.deepcopy(profile or NOMINAL_PROFILE) + robot_state_doc = copy.deepcopy(robot_state or NOMINAL_ROBOT_STATE) + waypoints_doc = copy.deepcopy(waypoints or NOMINAL_WAYPOINTS) + + doc_map = { + profile_doc["_id"]: profile_doc, + robot_state_doc["_id"]: robot_state_doc, + waypoints_doc["_id"]: waypoints_doc, + } + + mock = MagicMock() + mock.get.side_effect = lambda doc_id: doc_map.get(doc_id) + return mock + + +@pytest.fixture +def mock_db(): + """Nominal CouchDB mock — motor_01 profile, nominal robot state, all waypoints active.""" + with patch("servers.robot.main.db", _make_mock_db()): + yield + + +@pytest.fixture +def no_db(): + """Simulate disconnected IoT CouchDB (db=None).""" + with patch("servers.robot.main.db", None): + yield + + +# --------------------------------------------------------------------------- +# Scenario-specific CouchDB mocks +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_db_panel_stuck(): + """FM-1: panel_stuck=True in profile.""" + import copy + profile = copy.deepcopy(NOMINAL_PROFILE) + profile["panel_stuck"] = True + with patch("servers.robot.main.db", _make_mock_db(profile=profile)): + yield + + +@pytest.fixture +def mock_db_low_battery(): + """FM-9: battery_charge_pct below threshold.""" + import copy + state = copy.deepcopy(NOMINAL_ROBOT_STATE) + state["battery_charge_pct"] = 18.0 + state["battery_estimated_runtime_s"] = 360.0 + with patch("servers.robot.main.db", _make_mock_db(robot_state=state)): + yield + + +@pytest.fixture +def mock_db_loc_failure(): + """FM-10: localization_ok=False, pose_drift_m=1.8.""" + import copy + state = copy.deepcopy(NOMINAL_ROBOT_STATE) + state["localization_ok"] = False + state["pose_drift_m"] = 1.8 + state["pose"] = {"x": 13.1, "y": 31.7, "theta": 0.3, "frame": "map"} + with patch("servers.robot.main.db", _make_mock_db(robot_state=state)): + yield + + +@pytest.fixture +def mock_db_waypoint_missing(): + """FM-11: motor_01 waypoint active=False.""" + import copy + wps = copy.deepcopy(NOMINAL_WAYPOINTS) + for wp in wps["waypoints"]: + if wp["asset_id"] == "motor_01": + wp["active"] = False + with patch("servers.robot.main.db", _make_mock_db(waypoints=wps)): + yield + + +@pytest.fixture +def mock_db_with_gauge_path(): + """Profile with gauge_path set (for capture_image image_available=True test).""" + import copy + profile = copy.deepcopy(NOMINAL_PROFILE) + profile["gauge_path"] = "/data/gauges/motor_01/gauge_001.jpg" + with patch("servers.robot.main.db", _make_mock_db(profile=profile)): + yield + + +@pytest.fixture +def mock_db_powered_off(): + """Robot power_state=POWERED_OFF, stance_state=SITTING.""" + import copy + state = copy.deepcopy(NOMINAL_ROBOT_STATE) + state["power_state"] = "POWERED_OFF" + state["stance_state"] = "SITTING" + with patch("servers.robot.main.db", _make_mock_db(robot_state=state)): + yield + + +@pytest.fixture +def mock_db_sitting(): + """Robot powered on but stance_state=SITTING (before stand or after sit).""" + import copy + state = copy.deepcopy(NOMINAL_ROBOT_STATE) + state["stance_state"] = "SITTING" + with patch("servers.robot.main.db", _make_mock_db(robot_state=state)): + yield + + +# --------------------------------------------------------------------------- +# MCP tool call helper +# --------------------------------------------------------------------------- + + +async def call_tool(mcp_instance, tool_name: str, args: dict) -> dict: + """Call an MCP tool and return the parsed JSON response.""" + contents, _ = await mcp_instance.call_tool(tool_name, args) + return json.loads(contents[0].text) diff --git a/src/servers/robot/tests/test_robot_tools.py b/src/servers/robot/tests/test_robot_tools.py new file mode 100644 index 00000000..86906b8f --- /dev/null +++ b/src/servers/robot/tests/test_robot_tools.py @@ -0,0 +1,541 @@ +"""Unit tests for the 12 Layer 1 Spot SDK intrinsic tools. + +All tests mock the CouchDB layer — no live database required. +CouchDB integration tests are guarded by @requires_couchdb. + +Tools under test: + navigate_to open_panel get_battery + get_pose list_waypoints capture_image + power_on power_off stand + sit dock undock +""" + +import pytest + +from .conftest import call_tool, requires_couchdb +from servers.robot.main import mcp + + +# =========================================================================== +# navigate_to +# =========================================================================== + + +@pytest.mark.asyncio +async def test_navigate_to_nominal(mock_db): + result = await call_tool(mcp, "navigate_to", {"asset_id": "Motor 01"}) + assert result["success"] is True + assert result["asset_id"] == "Motor 01" + assert result["distance_m"] > 0 + assert result["steps_taken"] >= 1 + assert result["blocked_reason"] is None + assert "Motor 01" in result["message"] + + +@pytest.mark.asyncio +async def test_navigate_to_no_db(no_db): + result = await call_tool(mcp, "navigate_to", {"asset_id": "Motor 01"}) + assert "error" in result + assert "unavailable" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_navigate_to_unknown_asset(mock_db): + result = await call_tool(mcp, "navigate_to", {"asset_id": "NonExistentPump"}) + assert "error" in result + assert "profile" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_navigate_to_no_location(mock_db): + """Profile exists but physical_location is None.""" + import copy + from unittest.mock import patch + from .conftest import NOMINAL_PROFILE, _make_mock_db + profile = copy.deepcopy(NOMINAL_PROFILE) + profile["physical_location"] = None + with patch("servers.robot.main.db", _make_mock_db(profile=profile)): + result = await call_tool(mcp, "navigate_to", {"asset_id": "Motor 01"}) + assert result["success"] is False + assert result["blocked_reason"] is not None + assert "physical_location" in result["blocked_reason"] + + +# =========================================================================== +# open_panel +# =========================================================================== + + +@pytest.mark.asyncio +async def test_open_panel_nominal(mock_db): + result = await call_tool(mcp, "open_panel", {"asset_id": "Motor 01"}) + assert result["success"] is True + assert result["access_granted"] is True + assert result["stuck_reason"] is None + + +@pytest.mark.asyncio +async def test_open_panel_stuck(mock_db_panel_stuck): + """FM-1: panel_stuck=True → access_granted=False, escalation required.""" + result = await call_tool(mcp, "open_panel", {"asset_id": "Motor 01"}) + assert result["success"] is False + assert result["access_granted"] is False + assert result["stuck_reason"] is not None + assert "FM-1" in result["stuck_reason"] + assert "Escalate" in result["message"] + + +@pytest.mark.asyncio +async def test_open_panel_no_db(no_db): + result = await call_tool(mcp, "open_panel", {"asset_id": "Motor 01"}) + assert "error" in result + + +# =========================================================================== +# get_battery +# =========================================================================== + + +@pytest.mark.asyncio +async def test_get_battery_nominal(mock_db): + result = await call_tool(mcp, "get_battery", {}) + assert result["low_battery"] is False + assert result["battery_charge_pct"] == 85.0 + assert result["battery_low_threshold"] == 20.0 + assert result["battery_estimated_runtime_s"] == 5400.0 + assert "OK" in result["message"] + + +@pytest.mark.asyncio +async def test_get_battery_low(mock_db_low_battery): + """FM-9: battery below threshold → low_battery=True, abort instruction.""" + result = await call_tool(mcp, "get_battery", {}) + assert result["low_battery"] is True + assert result["battery_charge_pct"] == 18.0 + assert "LOW BATTERY" in result["message"] + assert "Abort" in result["message"] + + +@pytest.mark.asyncio +async def test_get_battery_no_db(no_db): + result = await call_tool(mcp, "get_battery", {}) + assert "error" in result + + +@pytest.mark.asyncio +async def test_get_battery_missing_state_doc(mock_db): + """robot_state doc not found → error with helpful message.""" + from unittest.mock import MagicMock, patch + mock = MagicMock() + mock.get.return_value = None + with patch("servers.robot.main.db", mock): + result = await call_tool(mcp, "get_battery", {}) + assert "error" in result + assert "robot_state" in result["error"] + + +# =========================================================================== +# get_pose +# =========================================================================== + + +@pytest.mark.asyncio +async def test_get_pose_nominal(mock_db): + result = await call_tool(mcp, "get_pose", {}) + assert result["localization_ok"] is True + assert result["pose_drift_m"] == 0.0 + assert result["fault_state"] is None + assert "OK" in result["message"] + + +@pytest.mark.asyncio +async def test_get_pose_localization_failure(mock_db_loc_failure): + """FM-10: localization_ok=False, drift=1.8 m → abort instruction.""" + result = await call_tool(mcp, "get_pose", {}) + assert result["localization_ok"] is False + assert result["pose_drift_m"] == 1.8 + assert "LOCALIZATION FAILURE" in result["message"] + assert "Abort" in result["message"] + + +@pytest.mark.asyncio +async def test_get_pose_no_db(no_db): + result = await call_tool(mcp, "get_pose", {}) + assert "error" in result + + +# =========================================================================== +# list_waypoints +# =========================================================================== + + +@pytest.mark.asyncio +async def test_list_waypoints_all(mock_db): + result = await call_tool(mcp, "list_waypoints", {}) + assert result["total"] == 3 + assert result["active_count"] == 3 + assert all(wp["active"] for wp in result["waypoints"]) + assert "all active" in result["message"] + + +@pytest.mark.asyncio +async def test_list_waypoints_filtered(mock_db): + result = await call_tool(mcp, "list_waypoints", {"asset_id": "motor_01"}) + assert result["total"] == 1 + assert result["waypoints"][0]["waypoint_id"] == "wp_motor_01_panel" + + +@pytest.mark.asyncio +async def test_list_waypoints_missing(mock_db_waypoint_missing): + """FM-11: motor_01 waypoint is inactive → escalation instruction.""" + result = await call_tool(mcp, "list_waypoints", {"asset_id": "motor_01"}) + assert result["active_count"] == 0 + inactive = result["waypoints"][0] + assert inactive["active"] is False + assert "INACTIVE" in result["message"] + assert "FM-11" in result["message"] + + +@pytest.mark.asyncio +async def test_list_waypoints_no_db(no_db): + result = await call_tool(mcp, "list_waypoints", {}) + assert "error" in result + + +@pytest.mark.asyncio +async def test_list_waypoints_no_doc(mock_db): + """waypoints doc returns None → error with helpful message.""" + from unittest.mock import MagicMock, patch + mock = MagicMock() + mock.get.return_value = None + with patch("servers.robot.main.db", mock): + result = await call_tool(mcp, "list_waypoints", {}) + assert "error" in result + assert "waypoints" in result["error"] + + +# =========================================================================== +# capture_image +# =========================================================================== + + +@pytest.mark.asyncio +async def test_capture_image_no_gauge_path(mock_db): + """Nominal profile: gauge_path=None → image_available=False.""" + result = await call_tool(mcp, "capture_image", {"asset_id": "Motor 01"}) + assert result["image_available"] is False + assert result["occlusion_flag"] is False + assert result["gauge_path"] is None + assert result["gauge_range"] == [0, 200] + assert "pending" in result["message"] + + +@pytest.mark.asyncio +async def test_capture_image_with_gauge_path(mock_db_with_gauge_path): + """gauge_path set → image_available=True, vlm_hint populated.""" + result = await call_tool(mcp, "capture_image", {"asset_id": "Motor 01"}) + assert result["image_available"] is True + assert result["occlusion_flag"] is False + assert result["gauge_path"] == "/data/gauges/motor_01/gauge_001.jpg" + assert "vlm_hint" in result + assert "0" in result["vlm_hint"] # range low + assert "200" in result["vlm_hint"] # range high + assert "Pass to VLM" in result["message"] + + +@pytest.mark.asyncio +async def test_capture_image_panel_stuck(mock_db_panel_stuck): + """panel_stuck=True → occlusion_flag=True, image_available=False.""" + result = await call_tool(mcp, "capture_image", {"asset_id": "Motor 01"}) + assert result["occlusion_flag"] is True + assert result["image_available"] is False + assert "BLOCKED" in result["message"] + assert "FM-2" in result["message"] + + +@pytest.mark.asyncio +async def test_capture_image_no_db(no_db): + result = await call_tool(mcp, "capture_image", {"asset_id": "Motor 01"}) + assert "error" in result + + +@pytest.mark.asyncio +async def test_capture_image_unknown_asset(mock_db): + result = await call_tool(mcp, "capture_image", {"asset_id": "GhostPump"}) + assert "error" in result + + +# =========================================================================== +# gauge_value isolation — no tool must return gauge_value +# =========================================================================== + + +@pytest.mark.asyncio +async def test_no_tool_returns_gauge_value(mock_db_with_gauge_path): + """None of the 12 tools should return a gauge_value field.""" + tools_and_args = [ + ("navigate_to", {"asset_id": "Motor 01"}), + ("open_panel", {"asset_id": "Motor 01"}), + ("get_battery", {}), + ("get_pose", {}), + ("list_waypoints", {}), + ("capture_image", {"asset_id": "Motor 01"}), + ("power_on", {}), + ("power_off", {}), + ("stand", {}), + ("sit", {}), + ("dock", {}), + ("undock", {}), + ] + for tool_name, args in tools_and_args: + result = await call_tool(mcp, tool_name, args) + assert "gauge_value" not in result, ( + f"Tool '{tool_name}' leaked gauge_value field: {result}" + ) + + +# =========================================================================== +# power_on +# =========================================================================== + + +@pytest.mark.asyncio +async def test_power_on_nominal(mock_db): + result = await call_tool(mcp, "power_on", {}) + assert result["success"] is True + assert result["power_state"] == "POWERED_ON" + assert result["transition"] == "power_on" + assert result["robot_id"] == "spot_1" + + +@pytest.mark.asyncio +async def test_power_on_already_on(mock_db): + """Calling power_on when already powered on returns success (idempotent).""" + result = await call_tool(mcp, "power_on", {}) + assert result["success"] is True + assert "already" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_power_on_from_off(mock_db_powered_off): + """power_on transitions from POWERED_OFF to POWERED_ON.""" + result = await call_tool(mcp, "power_on", {}) + assert result["success"] is True + assert result["power_state"] == "POWERED_ON" + assert "powered on" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_power_on_no_db(no_db): + result = await call_tool(mcp, "power_on", {}) + assert "error" in result + + +# =========================================================================== +# power_off +# =========================================================================== + + +@pytest.mark.asyncio +async def test_power_off_sitting(mock_db_sitting): + """Sitting + powered on → power_off succeeds.""" + result = await call_tool(mcp, "power_off", {}) + assert result["success"] is True + assert result["power_state"] == "POWERED_OFF" + assert result["transition"] == "power_off" + + +@pytest.mark.asyncio +async def test_power_off_while_standing(mock_db): + """Robot is STANDING → power_off must refuse.""" + result = await call_tool(mcp, "power_off", {}) + assert result["success"] is False + assert "STANDING" in result["message"] + assert "sit()" in result["message"] + + +@pytest.mark.asyncio +async def test_power_off_already_off(mock_db_powered_off): + """Already powered off → idempotent success.""" + result = await call_tool(mcp, "power_off", {}) + assert result["success"] is True + assert "already" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_power_off_no_db(no_db): + result = await call_tool(mcp, "power_off", {}) + assert "error" in result + + +# =========================================================================== +# stand +# =========================================================================== + + +@pytest.mark.asyncio +async def test_stand_nominal(mock_db_sitting): + """Powered on and sitting → stand succeeds.""" + result = await call_tool(mcp, "stand", {}) + assert result["success"] is True + assert result["stance_state"] == "STANDING" + assert result["power_state"] == "POWERED_ON" + assert result["transition"] == "stand" + + +@pytest.mark.asyncio +async def test_stand_already_standing(mock_db): + """Already standing → idempotent success.""" + result = await call_tool(mcp, "stand", {}) + assert result["success"] is True + assert "already" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_stand_powered_off(mock_db_powered_off): + """Power off → stand must refuse.""" + result = await call_tool(mcp, "stand", {}) + assert result["success"] is False + assert "not powered on" in result["message"].lower() + assert "power_on()" in result["message"] + + +@pytest.mark.asyncio +async def test_stand_no_db(no_db): + result = await call_tool(mcp, "stand", {}) + assert "error" in result + + +# =========================================================================== +# sit +# =========================================================================== + + +@pytest.mark.asyncio +async def test_sit_nominal(mock_db): + """Powered on and standing → sit succeeds.""" + result = await call_tool(mcp, "sit", {}) + assert result["success"] is True + assert result["stance_state"] == "SITTING" + assert result["transition"] == "sit" + + +@pytest.mark.asyncio +async def test_sit_already_sitting(mock_db_sitting): + """Already sitting → idempotent success.""" + result = await call_tool(mcp, "sit", {}) + assert result["success"] is True + assert "already" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_sit_powered_off(mock_db_powered_off): + """Power off → sit must refuse.""" + result = await call_tool(mcp, "sit", {}) + assert result["success"] is False + assert "not powered on" in result["message"].lower() + + +@pytest.mark.asyncio +async def test_sit_no_db(no_db): + result = await call_tool(mcp, "sit", {}) + assert "error" in result + + +# =========================================================================== +# dock +# =========================================================================== + + +@pytest.mark.asyncio +async def test_dock_nominal(mock_db): + """Nominal state with dock waypoint active → dock succeeds.""" + result = await call_tool(mcp, "dock", {}) + assert result["docked"] is True + assert result["at_charge_station"] is True + assert result["dock_waypoint_id"] == "wp_dock_main" + assert result["battery_charge_pct"] == 85.0 + + +@pytest.mark.asyncio +async def test_dock_no_db(no_db): + result = await call_tool(mcp, "dock", {}) + assert "error" in result + + +@pytest.mark.asyncio +async def test_dock_no_dock_waypoint(mock_db): + """Waypoints doc has no dock waypoint → dock fails gracefully.""" + import copy + from unittest.mock import patch + from .conftest import NOMINAL_ROBOT_STATE, NOMINAL_PROFILE, _make_mock_db + wps_no_dock = { + "_id": "waypoints", + "doc_type": "waypoints", + "waypoints": [ + {"waypoint_id": "wp_motor_01_panel", "waypoint_name": "Motor 01", + "asset_id": "motor_01", "location_description": "Plant A", + "x": 7.2, "y": 44.8, "active": True}, + ], + } + with patch("servers.robot.main.db", _make_mock_db(waypoints=wps_no_dock)): + result = await call_tool(mcp, "dock", {}) + assert result["docked"] is False + assert result["at_charge_station"] is False + assert "not found" in result["message"].lower() + + +# =========================================================================== +# undock +# =========================================================================== + + +@pytest.mark.asyncio +async def test_undock_nominal(mock_db): + """Nominal battery 85% → undock succeeds.""" + result = await call_tool(mcp, "undock", {}) + assert result["docked"] is False + assert result["at_charge_station"] is False + assert result["battery_charge_pct"] == 85.0 + + +@pytest.mark.asyncio +async def test_undock_low_battery(mock_db_low_battery): + """Battery 18% (below threshold 20%) → undock refused.""" + result = await call_tool(mcp, "undock", {}) + assert result["docked"] is True + assert result["at_charge_station"] is True + assert "UNDOCK REFUSED" in result["message"] + + +@pytest.mark.asyncio +async def test_undock_no_db(no_db): + result = await call_tool(mcp, "undock", {}) + assert "error" in result + + +# =========================================================================== +# Integration (live CouchDB required) +# =========================================================================== + + +@requires_couchdb +def test_get_battery_live(): + """Live CouchDB: get_battery returns a valid result or a clear 'not seeded' error.""" + from servers.robot.main import get_battery + result = get_battery() + if hasattr(result, "error"): + assert "robot_state" in result.error + else: + assert result.battery_charge_pct >= 0 + + +@requires_couchdb +def test_list_waypoints_live(): + """Live CouchDB: list_waypoints returns waypoints or a clear 'not seeded' error.""" + from servers.robot.main import list_waypoints + result = list_waypoints() + if hasattr(result, "error"): + assert "waypoints" in result.error + else: + assert result.total >= 0