From bffdd173699e1b8454ff7b3f33e652b2c47f0220 Mon Sep 17 00:00:00 2001
From: Emerson Knapp

Before
@@ -49,8 +47,7 @@
Thermal Pad Placement
Top to bottom: UPS, GPIO header, e-ink display
UPS lowbat soldering
@@ -148,16 +145,16 @@ We will go through the wire soldering connections in the diagram above one by on
ESP32 pin D34 soldering
Wired and connected system.
Raspberry Pi Config Menu
@@ -190,7 +190,7 @@ source install/setup.bash
In a terminal on your local machine where you have built and sourced the custom message type above:
1. Launch a rosbridge_server with `ros2 launch rosbridge_server rosbridge_websocket_launch.xml`.
- If you are running this in a Docker container, make sure that port 9090 is exposed by doing `docker run -p 9090:9090 ...`
-2. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
+2. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
- You can set a default ip address by altering line 12 of [`roslibpy_client.py`](../roslibpy_client.py):
```
parser.add_argument('target', type=str, help='Target IP address', default='')
@@ -222,4 +222,4 @@ WantedBy=multi-user.target
4. Enable the service: `sudo systemctl enable pstop-autostart.service`.
5. This service should now start on boot!
- You can reboot either by power cycling or with `sudo reboot`.
-- You can also test it without rebooting with `sudo systemctl start pstop-autostart.service`.
\ No newline at end of file
+- You can also test it without rebooting with `sudo systemctl start pstop-autostart.service`.
diff --git a/archive/protective_stop_remote/protective_stop_remote/app.py b/archive/protective_stop_remote/protective_stop_remote/app.py
index 5d3bda1..34f9597 100644
--- a/archive/protective_stop_remote/protective_stop_remote/app.py
+++ b/archive/protective_stop_remote/protective_stop_remote/app.py
@@ -1,13 +1,10 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
# Handles configuration via webpage + saving config to json file
import json
-import subprocess
import re
-from flask import Flask, request, redirect, render_template, jsonify
-import logging
+import subprocess
+
+from flask import Flask, jsonify, redirect, render_template, request
app = Flask(__name__)
CONFIG_FILE = 'config.json'
@@ -17,121 +14,118 @@
def list_wifi_connections():
try:
- result = subprocess.run(['nmcli', '-t', '-f', 'NAME,DEVICE,STATE',
- 'connection', 'show'], capture_output=True, text=True)
+ result = subprocess.run(
+ ['nmcli', '-t', '-f', 'NAME,DEVICE,STATE', 'connection', 'show'], capture_output=True, text=True
+ )
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
connections = []
- for line in result.stdout.strip().split("\n"):
+ for line in result.stdout.strip().split('\n'):
name, device, state = line.split(':')
- connections.append(
- {"name": name, "device": device, "state": state})
+ connections.append({'name': name, 'device': device, 'state': state})
return connections
except Exception as e:
- print(f"Error listing Wi-Fi connections: {e}")
+ print(f'Error listing Wi-Fi connections: {e}')
return []
def add_wifi_connection(ssid, password):
try:
- result = subprocess.run(['nmcli', 'dev', 'wifi', 'connect',
- ssid, 'password', password], capture_output=True, text=True)
+ result = subprocess.run(
+ ['nmcli', 'dev', 'wifi', 'connect', ssid, 'password', password], capture_output=True, text=True
+ )
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error adding Wi-Fi connection: {e}")
+ print(f'Error adding Wi-Fi connection: {e}')
return False
def remove_wifi_connection(name):
try:
- result = subprocess.run(
- ['nmcli', 'connection', 'delete', name], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', 'connection', 'delete', name], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error removing Wi-Fi connection: {e}")
+ print(f'Error removing Wi-Fi connection: {e}')
return False
def activate_wifi_connection(name):
"""Activate a specific Wi-Fi connection."""
try:
- result = subprocess.run(
- ['nmcli', 'connection', 'up', name], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', 'connection', 'up', name], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error activating Wi-Fi connection: {e}")
+ print(f'Error activating Wi-Fi connection: {e}')
return False
+
# Tailscale and signal strength functions
def get_wifi_signal_strength():
try:
- result = subprocess.run(
- ['nmcli', '-t', '-f', 'IN-USE,SIGNAL', 'dev', 'wifi'], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', '-t', '-f', 'IN-USE,SIGNAL', 'dev', 'wifi'], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
- wifi_lines = result.stdout.strip().split("\n")
+ wifi_lines = result.stdout.strip().split('\n')
for line in wifi_lines:
if line.startswith('*'):
_, signal = line.split(':')
return int(signal)
except Exception as e:
- print(f"Error getting Wi-Fi signal strength: {e}")
+ print(f'Error getting Wi-Fi signal strength: {e}')
return None
def get_cellular_signal_strength():
try:
- result = subprocess.run(
- ['mmcli', '-L'], capture_output=True, text=True)
+ result = subprocess.run(['mmcli', '-L'], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"mmcli error: {result.stderr}")
+ raise Exception(f'mmcli error: {result.stderr}')
output = result.stdout.strip()
match = re.search(r'/Modem/(\d+)', output)
if not match:
- raise Exception("Could not find a modem index in mmcli output.")
+ raise Exception('Could not find a modem index in mmcli output.')
modem_index = match.group(1)
- result = subprocess.run(
- ['mmcli', '-m', modem_index], capture_output=True, text=True)
+ result = subprocess.run(['mmcli', '-m', modem_index], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"mmcli error: {result.stderr}")
+ raise Exception(f'mmcli error: {result.stderr}')
output = result.stdout
signal_match = re.search(r'signal quality: (\d+)', output)
if signal_match:
return int(signal_match.group(1))
else:
- raise Exception("Signal quality not found in modem details.")
+ raise Exception('Signal quality not found in modem details.')
except Exception as e:
- print(f"Error getting cellular signal strength: {e}")
+ print(f'Error getting cellular signal strength: {e}')
return None
def fetch_tailscale_info(shared):
try:
- result = subprocess.run(
- ["tailscale", "status", "--json"], capture_output=True, text=True, check=True)
+ result = subprocess.run(['tailscale', 'status', '--json'], capture_output=True, text=True, check=True)
status = json.loads(result.stdout)
- tailscale_ips = status.get("Self", {}).get("TailscaleIPs", [])
- dns_name = status.get("Self", {}).get("DNSName", "")
- shared["TailscaleIP"] = tailscale_ips[0] if tailscale_ips else "N/A"
- shared["DNSName"] = dns_name if dns_name else "N/A"
+ tailscale_ips = status.get('Self', {}).get('TailscaleIPs', [])
+ dns_name = status.get('Self', {}).get('DNSName', '')
+ shared['TailscaleIP'] = tailscale_ips[0] if tailscale_ips else 'N/A'
+ shared['DNSName'] = dns_name if dns_name else 'N/A'
except Exception as e:
- print(f"Error fetching Tailscale info: {e}")
- shared["TailscaleIP"] = "N/A"
- shared["DNSName"] = "N/A"
+ print(f'Error fetching Tailscale info: {e}')
+ shared['TailscaleIP'] = 'N/A'
+ shared['DNSName'] = 'N/A'
+
# Load and save configuration
@@ -150,31 +144,32 @@ def save_config(cfg):
with open(CONFIG_FILE, 'w') as f:
json.dump(cfg, f, indent=2)
except Exception as e:
- print(f"Error saving config: {e}")
+ print(f'Error saving config: {e}')
+
# Shared dictionary initialization
def initialize_shared_dict(shared):
"""Ensure all required keys exist in the shared dictionary with default values."""
- if "status_summary" not in shared:
- shared["status_summary"] = "Not Connected"
- if "uuid" not in shared:
- shared["uuid"] = "Unassigned!"
- if "TailscaleIP" not in shared:
- shared["TailscaleIP"] = "N/A"
- if "DNSName" not in shared:
- shared["DNSName"] = "N/A"
- if "battery_level" not in shared:
- shared["battery_level"] = "Unknown"
- if "WifiSignal" not in shared:
- shared["WifiSignal"] = "N/A"
- if "CellularSignal" not in shared:
- shared["CellularSignal"] = "N/A"
- if "target_pairs" not in shared:
- shared["target_pairs"] = []
- if "WifiConnections" not in shared:
- shared["WifiConnections"] = []
+ if 'status_summary' not in shared:
+ shared['status_summary'] = 'Not Connected'
+ if 'uuid' not in shared:
+ shared['uuid'] = 'Unassigned!'
+ if 'TailscaleIP' not in shared:
+ shared['TailscaleIP'] = 'N/A'
+ if 'DNSName' not in shared:
+ shared['DNSName'] = 'N/A'
+ if 'battery_level' not in shared:
+ shared['battery_level'] = 'Unknown'
+ if 'WifiSignal' not in shared:
+ shared['WifiSignal'] = 'N/A'
+ if 'CellularSignal' not in shared:
+ shared['CellularSignal'] = 'N/A'
+ if 'target_pairs' not in shared:
+ shared['target_pairs'] = []
+ if 'WifiConnections' not in shared:
+ shared['WifiConnections'] = []
def load_shared_from_config(shared):
@@ -194,19 +189,19 @@ def config_app(shared={}):
def get_data():
"""API endpoint for live data updates."""
fetch_tailscale_info(shared)
- shared["WifiSignal"] = get_wifi_signal_strength() or "N/A"
- shared["CellularSignal"] = get_cellular_signal_strength() or "N/A"
- shared["WifiConnections"] = list_wifi_connections()
+ shared['WifiSignal'] = get_wifi_signal_strength() or 'N/A'
+ shared['CellularSignal'] = get_cellular_signal_strength() or 'N/A'
+ shared['WifiConnections'] = list_wifi_connections()
return jsonify({
- "status_summary": shared.get("status_summary", "Not Connected"),
- "uuid": shared.get("uuid", "Unassigned"),
- "TailscaleIP": shared.get("TailscaleIP", "N/A"),
- "DNSName": shared.get("DNSName", "N/A"),
- "BatteryLevel": shared.get("battery_level", "Unknown"),
- "WifiSignal": shared.get("WifiSignal", "N/A"),
- "CellularSignal": shared.get("CellularSignal", "N/A"),
- "TargetPairs": shared.get("target_pairs", []),
- "WifiConnections": shared["WifiConnections"]
+ 'status_summary': shared.get('status_summary', 'Not Connected'),
+ 'uuid': shared.get('uuid', 'Unassigned'),
+ 'TailscaleIP': shared.get('TailscaleIP', 'N/A'),
+ 'DNSName': shared.get('DNSName', 'N/A'),
+ 'BatteryLevel': shared.get('battery_level', 'Unknown'),
+ 'WifiSignal': shared.get('WifiSignal', 'N/A'),
+ 'CellularSignal': shared.get('CellularSignal', 'N/A'),
+ 'TargetPairs': shared.get('target_pairs', []),
+ 'WifiConnections': shared['WifiConnections'],
})
@app.route('/wifi', methods=['POST'])
@@ -237,8 +232,7 @@ def index():
new_uuid = request.form.get('new_uuid', '')
if new_ip and new_uuid:
pairs = list(shared['target_pairs'])
- pairs.append({'ip': new_ip, 'uuid': new_uuid,
- 'status': "Not Connected"})
+ pairs.append({'ip': new_ip, 'uuid': new_uuid, 'status': 'Not Connected'})
shared['target_pairs'] = pairs
save_config(dict(shared))
elif action.startswith('remove_'):
diff --git a/archive/protective_stop_remote/protective_stop_remote/button.py b/archive/protective_stop_remote/protective_stop_remote/button.py
index f7d4f2c..b68ecab 100644
--- a/archive/protective_stop_remote/protective_stop_remote/button.py
+++ b/archive/protective_stop_remote/protective_stop_remote/button.py
@@ -1,31 +1,30 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
# Handles pstop button physical interaction
-import RPi.GPIO as GPIO
-import time
import logging
+import time
+
+import RPi.GPIO as GPIO
# GPIO setup
# 15 connected to 23 when button pressed
# 14 connected to 23 when button released
+
def button_node(shared):
logging.basicConfig(level=logging.DEBUG)
OUTPUT_PIN = 23
INPUT_PINS = [14, 15]
-
- if "button_pressed" not in shared:
- shared["button_pressed"] = None
-
+
+ if 'button_pressed' not in shared:
+ shared['button_pressed'] = None
+
try:
GPIO.setmode(GPIO.BCM) # Use BCM pin numbering
GPIO.setup(OUTPUT_PIN, GPIO.OUT)
-
- logging.info(f"Button Node starting at {time.time()}")
+
+ logging.info(f'Button Node starting at {time.time()}')
# The following code toggles output high and low, and checks for correct return
- # If the value is not correct, one or both of the return lines is broken
+ # If the value is not correct, one or both of the return lines is broken
while True:
GPIO.output(OUTPUT_PIN, GPIO.HIGH)
for pin in INPUT_PINS:
@@ -55,23 +54,24 @@ def button_node(shared):
new_state = None
# Log only when the state changes
- if new_state != shared["button_pressed"]:
- shared["button_pressed"] = new_state
+ if new_state != shared['button_pressed']:
+ shared['button_pressed'] = new_state
if new_state is True:
- logging.info(f"Button pressed at {time.time()}")
+ logging.info(f'Button pressed at {time.time()}')
elif new_state is False:
- logging.info(f"Button released at {time.time()}")
+ logging.info(f'Button released at {time.time()}')
else:
- logging.info(f"Button state unknown (failure) at {time.time()}")
+ logging.info(f'Button state unknown (failure) at {time.time()}')
time.sleep(0.05) # 20 Hz
-
+
except KeyboardInterrupt:
- logging.info(f"Button Node exiting at {time.time()}")
-
- finally:
+ logging.info(f'Button Node exiting at {time.time()}')
+
+ finally:
GPIO.cleanup()
-
+
+
# Running standalone if needed
if __name__ == '__main__':
button_node(shared={})
diff --git a/archive/protective_stop_remote/protective_stop_remote/main.py b/archive/protective_stop_remote/protective_stop_remote/main.py
index 6e0de92..26b91d9 100644
--- a/archive/protective_stop_remote/protective_stop_remote/main.py
+++ b/archive/protective_stop_remote/protective_stop_remote/main.py
@@ -1,19 +1,15 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
# Automatically started on boot, combines data from other nodes
-from multiprocessing import Process, Manager
+from multiprocessing import Manager, Process
-from power import power_node # Handles power button, UPS management, shutdown command
-from ui import ui_node # Controls the LED ring, E paper display
# Handles configuration via webpage + saving config to json file
from app import run_app
-# Main pstop node, communicates with machine node on robot
-from pstop import pstop_node
from button import button_node # Handles pstop button
+from power import power_node # Handles power button, UPS management, shutdown command
-import logging
+# Main pstop node, communicates with machine node on robot
+from pstop import pstop_node
+from ui import ui_node # Controls the LED ring, E paper display
if __name__ == '__main__':
with Manager() as manager:
@@ -22,7 +18,7 @@
# Create two processes, one for ROS node and one for UI node
power_process = Process(target=power_node, args=(shared_data,))
- app_process = Process(target=run_app, args=(shared_data,))
+ app_process = Process(target=run_app, args=(shared_data,))
ui_process = Process(target=ui_node, args=(shared_data,))
pstop_process = Process(target=pstop_node, args=(shared_data,))
button_process = Process(target=button_node, args=(shared_data,))
diff --git a/archive/protective_stop_remote/protective_stop_remote/power.py b/archive/protective_stop_remote/protective_stop_remote/power.py
index 952a520..99f8e14 100644
--- a/archive/protective_stop_remote/protective_stop_remote/power.py
+++ b/archive/protective_stop_remote/protective_stop_remote/power.py
@@ -1,28 +1,25 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-
# Handles power button, UPS management, shutdown command
+import logging
+import subprocess
import time
+
import smbus
-import subprocess
-import logging
-import traceback
# ========== I²C Setup (PiSugar) ==========
I2C_BUS = 1
DEVICE_ADDR = 0x57
-REG_BUTTON = 0x02 # Register to read button state (LSB) & modify bits
-REG_ACTION = 0x09 # Register to write an action/delay if needed
+REG_BUTTON = 0x02 # Register to read button state (LSB) & modify bits
+REG_ACTION = 0x09 # Register to write an action/delay if needed
-PRESS_THRESHOLD = 0.2 # Number of seconds to hold button
+PRESS_THRESHOLD = 0.2 # Number of seconds to hold button
bus = smbus.SMBus(I2C_BUS)
button_pressed_start = None
+
def shutdown_process(shared):
- logging.info("==> Shut Down Sequence Initiated...")
+ logging.info('==> Shut Down Sequence Initiated...')
shared['shutdown'] = True
# Sets shutdown delay to 20 seconds
@@ -33,7 +30,7 @@ def shutdown_process(shared):
try:
value = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
except OSError as e:
- logging.warning(f"I2C Read Error: {e}. Retrying...")
+ logging.warning(f'I2C Read Error: {e}. Retrying...')
continue
# Clear the 5th bit (bit index 5) of 0x02
@@ -43,26 +40,25 @@ def shutdown_process(shared):
try:
bus.write_byte_data(DEVICE_ADDR, REG_BUTTON, new_value)
verify_val = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
- logging.info(f"Verified 0x02 after write: 0x{verify_val:02X}")
+ logging.info(f'Verified 0x02 after write: 0x{verify_val:02X}')
except OSError as e:
- logging.warning(f"I2C Write/Verify Error (REG_BUTTON): {e}")
-
+ logging.warning(f'I2C Write/Verify Error (REG_BUTTON): {e}')
# ========== Issue System Shutdown ==========
- logging.info("Issuing system shutdown command...")
- subprocess.run(["sudo", "shutdown", "-h", "now"])
+ logging.info('Issuing system shutdown command...')
+ subprocess.run(['sudo', 'shutdown', '-h', 'now'])
-def power_node(shared):
+def power_node(shared):
logging.basicConfig(level=logging.DEBUG)
# Optional: Make certain registers writable (per PiSugar docs, if needed)
try:
bus.write_byte_data(DEVICE_ADDR, 0x0B, 0x29)
- logging.info("Setting 0x0B = 0x29 to enable writable registers.")
+ logging.info('Setting 0x0B = 0x29 to enable writable registers.')
except OSError as e:
- logging.warning(f"I2C Write Error (0x0B): {e}")
-
+ logging.warning(f'I2C Write Error (0x0B): {e}')
+
shared['shutdown'] = False
# ========== Main Loop ==========
@@ -70,30 +66,31 @@ def power_node(shared):
# ========== Battery Check ==========
try:
# Use shell=True so we can pipe directly in one command
- battery_output = subprocess.check_output(
- "echo 'get battery' | nc -q 0 127.0.0.1 8423",
- shell=True
- ).decode('utf-8').strip()
+ battery_output = (
+ subprocess.check_output("echo 'get battery' | nc -q 0 127.0.0.1 8423", shell=True)
+ .decode('utf-8')
+ .strip()
+ )
# If battery_output is something like "37%" or "37", parse it
# Remove any trailing '%' if present:
battery_value = battery_output.replace('%', '').strip()[9:]
- battery_percentage = round(float(battery_value),1)
+ battery_percentage = round(float(battery_value), 1)
shared['battery_level'] = battery_percentage
- #logging.warning(f"Battery level is ({battery_percentage}%).")
+ # logging.warning(f"Battery level is ({battery_percentage}%).")
if battery_percentage < 4:
- logging.warning(f"Battery level is below 4% ({battery_percentage}%).")
+ logging.warning(f'Battery level is below 4% ({battery_percentage}%).')
shutdown_process(shared)
except Exception as e:
- logging.warning(f"Battery check failed: {e}")
+ logging.warning(f'Battery check failed: {e}')
# ========== Button Handling ==========
try:
value = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
except OSError as e:
- logging.warning(f"I2C Read Error: {e}. Retrying...")
+ logging.warning(f'I2C Read Error: {e}. Retrying...')
time.sleep(0.1)
continue
@@ -108,7 +105,6 @@ def power_node(shared):
else:
# Check how long it's been held
if (time.time() - button_pressed_start) >= PRESS_THRESHOLD:
-
shutdown_process(shared)
# Reset press timer so we only do this once per press
button_pressed_start = None
@@ -117,7 +113,7 @@ def power_node(shared):
button_pressed_start = None
time.sleep(0.1)
-
-
+
+
if __name__ == '__main__':
power_node(shared={}) # Normal dict here for a simple test
diff --git a/archive/protective_stop_remote/protective_stop_remote/pstop.py b/archive/protective_stop_remote/protective_stop_remote/pstop.py
index 6ab5a6a..8616a49 100644
--- a/archive/protective_stop_remote/protective_stop_remote/pstop.py
+++ b/archive/protective_stop_remote/protective_stop_remote/pstop.py
@@ -1,10 +1,10 @@
-import time
+import base64
import logging
-import roslibpy
import threading
+import time
import uuid
-import base64
+import roslibpy
# TODO:
# - Handle connect/disconnect/connect more reliably
@@ -46,8 +46,15 @@ def compute_crc16(data: bytes, poly: int = 0x1021, init: int = 0xFFFF) -> int:
def build_pstop_message(
- message_id, state_id, timestamp, sender_uuid, receiver_uuid,
- counter, heartbeat_timeout, received_stamp, received_counter
+ message_id,
+ state_id,
+ timestamp,
+ sender_uuid,
+ receiver_uuid,
+ counter,
+ heartbeat_timeout,
+ received_stamp,
+ received_counter,
):
"""
Constructs a PStopMsg dictionary using the correct lifecycle state definitions.
@@ -69,22 +76,22 @@ def build_pstop_message(
# Lifecycle state definitions, from https://docs.ros2.org/foxy/api/lifecycle_msgs/msg/State.html
lifecycle_states = {
- 0: "UNKNOWN",
- 1: "UNCONFIGURED",
- 2: "INACTIVE",
- 3: "ACTIVE",
- 4: "FINALIZED",
- 10: "CONFIGURING",
- 11: "CLEANINGUP",
- 12: "SHUTTINGDOWN",
- 13: "ACTIVATING",
- 14: "DEACTIVATING",
- 15: "ERRORPROCESSING"
+ 0: 'UNKNOWN',
+ 1: 'UNCONFIGURED',
+ 2: 'INACTIVE',
+ 3: 'ACTIVE',
+ 4: 'FINALIZED',
+ 10: 'CONFIGURING',
+ 11: 'CLEANINGUP',
+ 12: 'SHUTTINGDOWN',
+ 13: 'ACTIVATING',
+ 14: 'DEACTIVATING',
+ 15: 'ERRORPROCESSING',
}
# Function to get state label dynamically
def get_state_label(state_id):
- return lifecycle_states.get(state_id, "UNKNOWN")
+ return lifecycle_states.get(state_id, 'UNKNOWN')
return {
'message': message_id,
@@ -95,9 +102,9 @@ def get_state_label(state_id):
'counter': counter,
'heartbeat_timeout': heartbeat_timeout,
'checksum_type': 'CRC-16',
- 'checksum_value': f"{compute_crc16(str(counter).encode()):04X}",
+ 'checksum_value': f'{compute_crc16(str(counter).encode()):04X}',
'received_stamp': received_stamp,
- 'received_counter': received_counter
+ 'received_counter': received_counter,
}
@@ -108,160 +115,161 @@ def start_client(ip, port, shared):
while not client.is_connected and attempt_count < max_attempts:
try:
- logging.info(
- f"Attempting to connect to {ip}:{port} (Attempt {attempt_count + 1}/{max_attempts})")
+ logging.info(f'Attempting to connect to {ip}:{port} (Attempt {attempt_count + 1}/{max_attempts})')
client.run()
client.connect()
time.sleep(0.5)
if client.is_connected:
- logging.info(f"Connected to {ip}:{port}")
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ logging.info(f'Connected to {ip}:{port}')
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = "Connected"
+ machine['status'] = 'Connected'
else:
- logging.error(f"Connection to {ip}:{port} failed")
+ logging.error(f'Connection to {ip}:{port} failed')
client.close()
except Exception as e:
- logging.error(f"Connection to {ip}:{port} failed: {e}")
+ logging.error(f'Connection to {ip}:{port} failed: {e}')
attempt_count += 1
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = f"Retrying ({attempt_count}/{max_attempts})"
+ machine['status'] = f'Retrying ({attempt_count}/{max_attempts})'
if attempt_count < max_attempts:
- logging.info("Retrying in 5 seconds...")
+ logging.info('Retrying in 5 seconds...')
time.sleep(5)
if not client.is_connected:
- logging.error(
- f"Failed to connect to {ip}:{port} after {max_attempts} attempts.")
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ logging.error(f'Failed to connect to {ip}:{port} after {max_attempts} attempts.')
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = "Failed"
+ machine['status'] = 'Failed'
return client
def setup_client(client, ip, port, shared):
- publisher = roslibpy.Topic(
- client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
+ publisher = roslibpy.Topic(client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
publisher.advertise()
- subscriber = roslibpy.Topic(
- client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
+ subscriber = roslibpy.Topic(client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
bonding_status = {} # Track bonding state per machine UUID
try:
- self_uuid = uuid.UUID(shared.get("uuid"))
+ self_uuid = uuid.UUID(shared.get('uuid'))
except ValueError:
- logging.error(f"Invalid self UUID: {shared.get('uuid')}")
+ logging.error(f'Invalid self UUID: {shared.get("uuid")}')
return
def callback(message):
- sender_uuid_raw = message.get("id", {}).get("uuid")
- receiver_uuid_raw = message.get("receiver_uuid", {}).get("uuid")
+ sender_uuid_raw = message.get('id', {}).get('uuid')
+ receiver_uuid_raw = message.get('receiver_uuid', {}).get('uuid')
sender_uuid = None
receiver_uuid = None
- logging.info(f"⚠️ Received message: {message}")
+ logging.info(f'⚠️ Received message: {message}')
# ✅ Ensure sender UUID is extracted correctly
if isinstance(sender_uuid_raw, list) and len(sender_uuid_raw) == 16:
sender_uuid = uuid.UUID(bytes=bytes(sender_uuid_raw))
elif isinstance(sender_uuid_raw, str):
try:
- sender_uuid = uuid.UUID(
- bytes=base64.b64decode(sender_uuid_raw))
+ sender_uuid = uuid.UUID(bytes=base64.b64decode(sender_uuid_raw))
except Exception:
- logging.warning(
- f"⚠️ Failed to decode sender UUID from Base64: {sender_uuid_raw}")
+ logging.warning(f'⚠️ Failed to decode sender UUID from Base64: {sender_uuid_raw}')
# ✅ Ensure receiver UUID is extracted correctly
if isinstance(receiver_uuid_raw, list) and len(receiver_uuid_raw) == 16:
receiver_uuid = uuid.UUID(bytes=bytes(receiver_uuid_raw))
elif isinstance(receiver_uuid_raw, str):
try:
- receiver_uuid = uuid.UUID(
- bytes=base64.b64decode(receiver_uuid_raw))
+ receiver_uuid = uuid.UUID(bytes=base64.b64decode(receiver_uuid_raw))
except Exception:
- logging.warning(
- f"⚠️ Failed to decode receiver UUID from Base64: {receiver_uuid_raw}")
+ logging.warning(f'⚠️ Failed to decode receiver UUID from Base64: {receiver_uuid_raw}')
# ✅ Convert self UUID for consistent comparison
try:
- self_uuid = uuid.UUID(shared.get("uuid"))
+ self_uuid = uuid.UUID(shared.get('uuid'))
except ValueError:
- logging.error(f"❌ Invalid self UUID: {shared.get('uuid')}")
+ logging.error(f'❌ Invalid self UUID: {shared.get("uuid")}')
return
- logging.info(
- f"Sender UUID: {sender_uuid}, Receiver UUID: {receiver_uuid}, Self UUID: {self_uuid}")
+ logging.info(f'Sender UUID: {sender_uuid}, Receiver UUID: {receiver_uuid}, Self UUID: {self_uuid}')
if sender_uuid and sender_uuid == self_uuid:
- logging.info(
- f"❌ Ignoring self-generated message from UUID {self_uuid}")
+ logging.info(f'❌ Ignoring self-generated message from UUID {self_uuid}')
return
sender_str = str(sender_uuid)
- state_id = message["state"]["id"]
+ state_id = message['state']['id']
if state_id == 13: # TRANSITION_STATE_ACTIVATING
- logging.info(f"🔄 Bonding started with {sender_uuid}")
- bonding_status[sender_str] = "ACTIVATING"
+ logging.info(f'🔄 Bonding started with {sender_uuid}')
+ bonding_status[sender_str] = 'ACTIVATING'
response = build_pstop_message(
- PSTOP_OK, 13, message["stamp"], message["id"], message["receiver_uuid"],
- message["counter"], message["heartbeat_timeout"],
- message["received_stamp"], message["received_counter"]
+ PSTOP_OK,
+ 13,
+ message['stamp'],
+ message['id'],
+ message['receiver_uuid'],
+ message['counter'],
+ message['heartbeat_timeout'],
+ message['received_stamp'],
+ message['received_counter'],
)
publisher.publish(roslibpy.Message(response))
elif state_id == 3: # PRIMARY_STATE_ACTIVE
- if bonding_status.get(sender_str) == "ACTIVATING":
- logging.info(f"✅ Bonded successfully with {sender_uuid}")
- bonding_status[sender_str] = "ACTIVE"
+ if bonding_status.get(sender_str) == 'ACTIVATING':
+ logging.info(f'✅ Bonded successfully with {sender_uuid}')
+ bonding_status[sender_str] = 'ACTIVE'
elif state_id == 14: # TRANSITION_STATE_DEACTIVATING
- logging.info(f"❌ Unbonding from {sender_uuid}")
- bonding_status[sender_str] = "DEACTIVATING"
+ logging.info(f'❌ Unbonding from {sender_uuid}')
+ bonding_status[sender_str] = 'DEACTIVATING'
response = build_pstop_message(
- PSTOP_OK, 14, message["stamp"], message["id"], message["receiver_uuid"],
- message["counter"], message["heartbeat_timeout"],
- message["received_stamp"], message["received_counter"]
+ PSTOP_OK,
+ 14,
+ message['stamp'],
+ message['id'],
+ message['receiver_uuid'],
+ message['counter'],
+ message['heartbeat_timeout'],
+ message['received_stamp'],
+ message['received_counter'],
)
publisher.publish(roslibpy.Message(response))
elif state_id == 2: # PRIMARY_STATE_INACTIVE
- if bonding_status.get(sender_str) == "DEACTIVATING":
- logging.info(f"✅ Successfully unbonded from {sender_uuid}")
+ if bonding_status.get(sender_str) == 'DEACTIVATING':
+ logging.info(f'✅ Successfully unbonded from {sender_uuid}')
bonding_status.pop(sender_str, None)
subscriber.subscribe(callback)
- logging.info(f"Publisher and subscriber set up for {ip}:{port}")
+ logging.info(f'Publisher and subscriber set up for {ip}:{port}')
def bonding_thread():
while True:
- for target in shared.get("target_pairs", []):
- target_uuid = target.get("uuid")
- if target_uuid and target["ip"] == ip and target_uuid not in bonding_status:
- logging.info(f"🔄 Initiating bonding with {target_uuid}")
+ for target in shared.get('target_pairs', []):
+ target_uuid = target.get('uuid')
+ if target_uuid and target['ip'] == ip and target_uuid not in bonding_status:
+ logging.info(f'🔄 Initiating bonding with {target_uuid}')
bonding_msg = build_pstop_message(
- PSTOP_OK, 13, # ✅ Uses TRANSITION_STATE_ACTIVATING from build_pstop_message()
- {"sec": int(time.time()), "nanosec": int(
- (time.time() % 1) * 1e9)},
- {"uuid": str(self_uuid)},
- {"uuid": target_uuid},
- 0, {"sec": 1, "nanosec": 500000000},
- {"sec": 0, "nanosec": 0}, 0
+ PSTOP_OK,
+ 13, # ✅ Uses TRANSITION_STATE_ACTIVATING from build_pstop_message()
+ {'sec': int(time.time()), 'nanosec': int((time.time() % 1) * 1e9)},
+ {'uuid': str(self_uuid)},
+ {'uuid': target_uuid},
+ 0,
+ {'sec': 1, 'nanosec': 500000000},
+ {'sec': 0, 'nanosec': 0},
+ 0,
)
publisher.publish(roslibpy.Message(bonding_msg))
- bonding_status[target_uuid] = "ACTIVATING"
+ bonding_status[target_uuid] = 'ACTIVATING'
time.sleep(1)
@@ -273,55 +281,57 @@ def client_thread(ip, rosbridge_port, shared, connected_clients):
client = start_client(ip, rosbridge_port, shared)
publisher, subscriber = setup_client(client, ip, rosbridge_port, shared)
connected_clients[ip] = {
- "client": client,
- "publisher": publisher,
- "subscriber": subscriber,
- "thread": threading.current_thread()
+ 'client': client,
+ 'publisher': publisher,
+ 'subscriber': subscriber,
+ 'thread': threading.current_thread(),
}
counter = 0
# ✅ Ensure receiver UUID is extracted correctly
- receiver_uuid_str = next(
- (m["uuid"] for m in shared["target_pairs"] if m["ip"] == ip), None)
- receiver_uuid = uuid.UUID(receiver_uuid_str.strip(
- )) if receiver_uuid_str else uuid.UUID(int=0)
+ receiver_uuid_str = next((m['uuid'] for m in shared['target_pairs'] if m['ip'] == ip), None)
+ receiver_uuid = uuid.UUID(receiver_uuid_str.strip()) if receiver_uuid_str else uuid.UUID(int=0)
while ip in connected_clients:
if not client.is_connected:
- logging.info(f"Client {ip} disconnected, attempting reconnection.")
+ logging.info(f'Client {ip} disconnected, attempting reconnection.')
client = start_client(ip, rosbridge_port, shared)
- publisher, subscriber = setup_client(
- client, ip, rosbridge_port, shared)
+ publisher, subscriber = setup_client(client, ip, rosbridge_port, shared)
connected_clients[ip] = {
- "client": client,
- "publisher": publisher,
- "subscriber": subscriber,
- "thread": threading.current_thread()
+ 'client': client,
+ 'publisher': publisher,
+ 'subscriber': subscriber,
+ 'thread': threading.current_thread(),
}
# ✅ Convert self UUID properly
- self_uuid = uuid.UUID(shared["uuid"].strip())
+ self_uuid = uuid.UUID(shared['uuid'].strip())
# ✅ Convert both UUIDs to `uint8[16]` lists before sending
- id = {"uuid": list(self_uuid.bytes)}
- receiver_uuid = {"uuid": list(receiver_uuid.bytes)}
+ id = {'uuid': list(self_uuid.bytes)}
+ receiver_uuid = {'uuid': list(receiver_uuid.bytes)}
# ✅ Build and send a protective stop message
- timestamp = {'sec': int(time.time()), 'nanosec': int(
- (time.time() % 1) * 1e9)}
+ timestamp = {'sec': int(time.time()), 'nanosec': int((time.time() % 1) * 1e9)}
state = {'id': 1, 'label': 'ACTIVE'}
heartbeat_timeout = {'sec': 1, 'nanosec': 500000000}
- received_stamp = {
- 'sec': timestamp['sec'], 'nanosec': timestamp['nanosec']}
+ received_stamp = {'sec': timestamp['sec'], 'nanosec': timestamp['nanosec']}
received_counter = counter
button = PSTOP_STOP
- if shared.get("button_pressed") is False:
+ if shared.get('button_pressed') is False:
button = PSTOP_OK # ✅ Only set to OK if the button is NOT pressed
message = build_pstop_message(
- button, state["id"], timestamp, id, receiver_uuid, counter,
- heartbeat_timeout, received_stamp, received_counter
+ button,
+ state['id'],
+ timestamp,
+ id,
+ receiver_uuid,
+ counter,
+ heartbeat_timeout,
+ received_stamp,
+ received_counter,
)
publisher.publish(roslibpy.Message(message))
counter += 1
@@ -331,62 +341,56 @@ def client_thread(ip, rosbridge_port, shared, connected_clients):
def pstop_node(shared):
logging.basicConfig(level=logging.DEBUG)
- logging.info(f"PStop Node started at {time.time()}")
+ logging.info(f'PStop Node started at {time.time()}')
- while "target_pairs" not in shared:
+ while 'target_pairs' not in shared:
time.sleep(0.1)
connected_clients = {}
try:
while True:
- current_machines = {machine['ip'] for machine in shared.get(
- "target_pairs", {}) if 'ip' in machine}
+ current_machines = {machine['ip'] for machine in shared.get('target_pairs', {}) if 'ip' in machine}
current_ips = set(connected_clients.keys())
ips_to_add = current_machines - current_ips
ips_to_remove = current_ips - current_machines
# Add new connections
for ip in ips_to_add:
- logging.info(f"Starting client thread for {ip}")
- thread = threading.Thread(target=client_thread, args=(
- ip, rosbridge_port, shared, connected_clients))
+ logging.info(f'Starting client thread for {ip}')
+ thread = threading.Thread(target=client_thread, args=(ip, rosbridge_port, shared, connected_clients))
thread.daemon = True
thread.start()
# Remove old connections
for ip in ips_to_remove:
- logging.info(f"Stopping client thread for {ip}")
+ logging.info(f'Stopping client thread for {ip}')
connection = connected_clients.pop(ip, None)
if connection:
- connection["publisher"].unadvertise()
- connection["subscriber"].unsubscribe()
- connection["client"].close()
+ connection['publisher'].unadvertise()
+ connection['subscriber'].unsubscribe()
+ connection['client'].close()
time.sleep(1)
except KeyboardInterrupt:
- logging.info("PStop Node exiting.")
+ logging.info('PStop Node exiting.')
finally:
for ip, connection in connected_clients.items():
- logging.info(f"Cleaning up connection for {ip}")
- connection["publisher"].unadvertise()
- connection["subscriber"].unsubscribe()
- connection["client"].terminate()
+ logging.info(f'Cleaning up connection for {ip}')
+ connection['publisher'].unadvertise()
+ connection['subscriber'].unsubscribe()
+ connection['client'].terminate()
# Running standalone for testing, with some default values
if __name__ == '__main__':
shared = {
- "target_pairs": [
- {
- "ip": "100.125.44.33",
- "uuid": "00000000-0000-0000-0000-000000000000",
- "status": "Not Connected"
- }
+ 'target_pairs': [
+ {'ip': '100.125.44.33', 'uuid': '00000000-0000-0000-0000-000000000000', 'status': 'Not Connected'}
],
- "button_pressed": False,
- "uuid": "00000000-0000-0000-0000-000000000001",
+ 'button_pressed': False,
+ 'uuid': '00000000-0000-0000-0000-000000000001',
}
pstop_node(shared)
diff --git a/archive/protective_stop_remote/protective_stop_remote/templates/index.html b/archive/protective_stop_remote/protective_stop_remote/templates/index.html
index a51812a..6ff341b 100644
--- a/archive/protective_stop_remote/protective_stop_remote/templates/index.html
+++ b/archive/protective_stop_remote/protective_stop_remote/templates/index.html
@@ -9,7 +9,7 @@
.then(response => response.json())
.then(data => {
document.getElementById('status_summary').textContent = data.status_summary;
-
+
// Update Tailscale information
document.getElementById('tailscale-ip').textContent = data.TailscaleIP;
document.getElementById('dns-name').textContent = data.DNSName;
@@ -61,7 +61,7 @@