diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 0000000..cf27180
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,30 @@
+---
+name: ROS2 Build and Test
+
+on:
+ push:
+ branches: [main]
+ pull_request:
+
+jobs:
+ build_and_test_ros2:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - rosdistro: humble
+ ubuntu: jammy
+ - rosdistro: jazzy
+ ubuntu: noble
+ - rosdistro: kilted
+ ubuntu: noble
+ - rosdistro: rolling
+ ubuntu: noble
+ container:
+ image: rostooling/setup-ros-docker:ubuntu-${{ matrix.ubuntu }}-latest
+ steps:
+ - name: Build and run tests
+ uses: ros-tooling/action-ros-ci@v0.4
+ with:
+ target-ros2-distro: ${{ matrix.rosdistro }}
diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
new file mode 100644
index 0000000..3ec904e
--- /dev/null
+++ b/.github/workflows/pre-commit.yml
@@ -0,0 +1,17 @@
+---
+name: Pre-commit
+
+on:
+ push:
+ branches: [main]
+ pull_request:
+
+jobs:
+ pre-commit:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v6
+ - uses: actions/setup-python@v6
+ with:
+ python-version: '3.10'
+ - uses: pre-commit/action@v3.0.1
diff --git a/.gitignore b/.gitignore
index d213fd2..3cf700d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,4 @@
__pycache__
.pytest_cache
node_modules
-/.gitlab-ci-local/
+/.ruff.toml
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
deleted file mode 100644
index de3be01..0000000
--- a/.gitlab-ci.yml
+++ /dev/null
@@ -1,29 +0,0 @@
----
-
-stages:
- - build
- - test
- - evaluate
-
-workflow:
- rules:
- - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
- variables:
- DEFAULT_VCS_CHECKOUT: ""
- - if: $CI_COMMIT_TAG
- variables:
- DEFAULT_VCS_CHECKOUT: ""
- - if: $CI_PIPELINE_SOURCE == 'merge_request_event'
- variables:
- DEFAULT_VCS_CHECKOUT: $CI_COMMIT_REF_NAME
-
-include:
- - component: gitlab.com/polymathrobotics/polymath_core/ros2-package@ci-1.3
- inputs:
- base_image: docker.io/polymathrobotics/ros:humble-builder-ubuntu
-
-# Build all packages in the workspace and run their test suites
-ros2_build_and_test:
- extends: .ros2_build_and_test
- variables:
- PACKAGES: ""
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..3923d8a
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,21 @@
+---
+# Apply Polymath Code Standard formatters and linters
+repos:
+ - repo: https://github.com/polymathrobotics/polymath_code_standard
+ rev: v2.1.1
+ hooks:
+ # Everything
+ - id: polymath-general
+ - id: polymath-copyright
+ args: [--license, Apache-2.0, --wildcard-copyright-org, --reuse-style]
+ # Implementation
+ - id: polymath-cmake
+ - id: polymath-cpp
+ exclude: ^pstop_c/
+ - id: polymath-json
+ - id: polymath-python
+ # Mark(up/down)
+ - id: polymath-markdown
+ - id: polymath-shell
+ - id: polymath-xml
+ - id: polymath-yaml
diff --git a/LICENSE b/LICENSE
index 2f80464..ea2db4f 100644
--- a/LICENSE
+++ b/LICENSE
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright [2024] [Polymath Robotics, Inc.]
+ Copyright 2024 Polymath Robotics, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/README.md b/README.md
index d5ae582..bee4fb3 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,6 @@ To register a remote protective stop, the node also exposes two services:
In order to integrate with the node, you have to activate it first before starting to send heartbeat messages.
-
#### User Monitor Mode
**USE WITH EXTREME CAUTION**
@@ -53,14 +52,12 @@ In the case you have a test driver or other onsite personnel to operate a physic
ros2 param set /protective_stop_node is_user_monitored False
```
-
### Protective Stop Remote
The remote is run next to the operator of the P-Stop. It exchanges heartbeat messages with the node on the robot, the latter of which will signal if it detects that the heartbeats are coming in at the expected rate.
In this repo currently, we've implemented it as a web client, but it can also be instantiated as a hardware interface.
-
### Foxglove Websocket Bridge
The bridge acts as the proxy between the remote and the node, so the two can talk to one-another. If it is not online, the other components will fail gracefully.
@@ -73,4 +70,3 @@ cd build
cmake ..
make
```
-
diff --git a/archive/docs/Case.md b/archive/docs/Case.md
index 926304b..84dd5c0 100644
--- a/archive/docs/Case.md
+++ b/archive/docs/Case.md
@@ -1 +1 @@
-Instructions to be added
\ No newline at end of file
+Instructions to be added
diff --git a/archive/docs/Hardware.md b/archive/docs/Hardware.md
index 4941e89..61a7705 100644
--- a/archive/docs/Hardware.md
+++ b/archive/docs/Hardware.md
@@ -24,8 +24,7 @@
-
-2. Solder Raspberry Pi GPIO pin extender
+1. Solder Raspberry Pi GPIO pin extender
- Use flux to make soldering easier
- Secure a few pins on each end first for better stability
- May want to clean singe marks from flux after soldering; we used a toothbrush and water for this.
@@ -33,8 +32,7 @@
Left = before, right = after
-
-3. Apply Raspberry Pi heat sink.
+1. Apply Raspberry Pi heat sink.
- The Geekworm heatsink we used came with two thermal pads, one is 0.5mm and the other is 1mm. Make sure each pad goes in the right place! It comes with a diagram.
Before
@@ -49,8 +47,7 @@
Thermal Pad Placement
-
-4. Add e-ink display to UPS using GPIO header.
+1. Add e-ink display to UPS using GPIO header.
Top to bottom: UPS, GPIO header, e-ink display
@@ -61,16 +58,16 @@
Install e-ink to UPS by plugging in GPIO header to pins
-
## Wiring
Now it’s time to starting connecting stuff together. The connections at a glance are shown below.
+
```mermaid
graph TD
USB-C -->|Power Out| UPS(UPS)
UPS(UPS) -->|Power Out| R(Raspi)
R -->|Control| ES(Eink Screen)
R <-->|USB Power and Data| C(Cell Modem)
- C -->|USB Power and Data| E(ESP32)
+ C -->|USB Power and Data| E(ESP32)
E -->|D2 to DI, + and GND| L(LED Ring)
E -->|D22 to switch, + and GNDx2| PB(Power Button)
PB -->|switch to input| UPS
@@ -99,7 +96,7 @@ We will go through the wire soldering connections in the diagram above one by on
Corresponding ESP32 wiring for stop button
-2. LED Ring to ESP32
+1. LED Ring to ESP32
- LED Ring DI → ESP32 D2 (Green wire)
- LED Ring PWR5V → ESP32 VIN (Red wire)
- LED Ring GND → ESP32 GND (Black wire)
@@ -111,7 +108,7 @@ We will go through the wire soldering connections in the diagram above one by on
Corresponding ESP32 wiring for LED ring
-3. Power Button to ESP32 and UPS
+1. Power Button to ESP32 and UPS
- Power Button LED power terminal → ESP32 D22 and UPS pin 3 (White wires)
- Power Button LED ground and switch contact A → ESP32 GND (Black wire)
- Power Button switch contact B→ ESP32 D23 (Red wire)
@@ -134,12 +131,12 @@ We will go through the wire soldering connections in the diagram above one by on
- Two white wires should be connected to the same terminal on the button (power button LED power terminal)
- - Tip: Solder these wires together first, and then solder them to the terminal
+ - Tip: Solder these wires together first, and then solder them to the terminal
- The black wire is soldered across two power button terminals (power button LED ground and switch contact A)
- - Tip: Solder the end of the wire to the further terminal first, then the closer one
+ - Tip: Solder the end of the wire to the further terminal first, then the closer one
- Red wire is soldered normally
-4. UPS to ESP32 (battery indicator)
+1. UPS to ESP32 (battery indicator)
- Lowbat to D34 (yellow wire)
UPS lowbat soldering
@@ -148,16 +145,16 @@ We will go through the wire soldering connections in the diagram above one by on
ESP32 pin D34 soldering
-5. Connect cell modem to ESP32 and RPi UPS.
+1. Connect cell modem to ESP32 and RPi UPS.
- Modem USB → ESP32 microUSB (solid black cable)
- Modem microUSB → UPS microUSB (black/white cable)
-6. Add battery to USP.
+1. Add battery to USP.
Wired and connected system.
-7. Add antennas - if you want to use cellular data instead of WiFi
+1. Add antennas - if you want to use cellular data instead of WiFi
- Screw on antennas to AUX and MAIN pins on the underside of the modem
-Now you're ready to move onto [Software](/docs/Software.md)
\ No newline at end of file
+Now you're ready to move onto [Software](/docs/Software.md)
diff --git a/archive/docs/Robot-Quickstart.md b/archive/docs/Robot-Quickstart.md
index 141560f..d6912cb 100644
--- a/archive/docs/Robot-Quickstart.md
+++ b/archive/docs/Robot-Quickstart.md
@@ -1,45 +1,59 @@
# Robot Quick-Start
+
Have a protective stop and want to set up your robot to work with it? Follow these instructions.
## Install Tailscale
+
1. On your robot, install Tailscale on it with the following commands. You will be provided a link to log in to your Tailscale account after the last step.
-```bash
-curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.noarmor.gpg | sudo tee /usr/share/keyrings/tailscale-archive-keyring.gpg >/dev/null
-curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.tailscale-keyring.list | sudo tee /etc/apt/sources.list.d/tailscale.list
-sudo apt-get update
-sudo apt-get install tailscale
-sudo tailscale up
-```
-2. We can then check the Tailscale address of the robot with `tailscale ip -4`.
+ ```bash
+ curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.noarmor.gpg | sudo tee /usr/share/keyrings/tailscale-archive-keyring.gpg >/dev/null
+ curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.tailscale-keyring.list | sudo tee /etc/apt/sources.list.d/tailscale.list
+ sudo apt-get update
+ sudo apt-get install tailscale
+ sudo tailscale up
+ ```
+
+1. We can then check the Tailscale address of the robot with `tailscale ip -4`.
## roslibpy Custom Message Set-up
+
On your local machine (or robot that you want the p-stop to control):
1. Install rosbridge-server with `sudo apt-get install ros-$ROS-DISTRO-rosbridge-server`.
-2. Create a colcon workspace if you don't already have one:
-```
-mkdir -p colcon_ws/src
-cd colcon_ws/src
-```
-3. Put the [`pstop_msg`](../pstop_msg) folder in this repo inside the `src` directory. It contains a custom message definition for our protective stop.
-4. Go back to the `colcon_ws` directory and run:
-```
-colcon build
-source install/setup.bash
-```
-5. To verify that our custom message set-up was successful, you can run `ros2 interface show pstop_msg/msg/EStopMsg`, which should print out the message definition.
+1. Create a colcon workspace if you don't already have one:
+
+ ```
+ mkdir -p colcon_ws/src
+ cd colcon_ws/src
+ ```
+
+1. Put the [`pstop_msg`](../pstop_msg) folder in this repo inside the `src` directory. It contains a custom message definition for our protective stop.
+1. Go back to the `colcon_ws` directory and run:
+
+ ```
+ colcon build
+ source install/setup.bash
+ ```
+
+1. To verify that our custom message set-up was successful, you can run `ros2 interface show pstop_msg/msg/EStopMsg`, which should print out the message definition.
## roslibpy Client Set-up
+
In a terminal on your local machine where you have built and sourced the custom message type above:
+
1. Launch a rosbridge_server with `ros2 launch rosbridge_server rosbridge_websocket_launch.xml`.
-- If you are running this in a Docker container, make sure that port 9090 is exposed by doing `docker run -p 9090:9090 ...`
-2. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
-- You can set a default ip address by altering line 12 of [`roslibpy_client.py`](../roslibpy_client.py):
-```
-parser.add_argument('target', type=str, help='Target IP address', default='')
-```
-3. To use the flask interface, connect to the Raspberry Pi through SSH with portforwarding:
-```
-ssh -L 8000:localhost:5000 [username]@[tailscale ip]
-```
-- Now, you should be able to see the flask interface by going to `http://localhost:8000/config`.
\ No newline at end of file
+ - If you are running this in a Docker container, make sure that port 9090 is exposed by doing `docker run -p 9090:9090 ...`
+1. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
+ - You can set a default ip address by altering line 12 of [`roslibpy_client.py`](../roslibpy_client.py):
+
+ ```
+ parser.add_argument('target', type=str, help='Target IP address', default='')
+ ```
+
+1. To use the flask interface, connect to the Raspberry Pi through SSH with portforwarding:
+
+ ```
+ ssh -L 8000:localhost:5000 [username]@[tailscale ip]
+ ```
+
+ - Now, you should be able to see the flask interface by going to `http://localhost:8000/config`.
diff --git a/archive/docs/Software.md b/archive/docs/Software.md
index e2b2bfb..d99143f 100644
--- a/archive/docs/Software.md
+++ b/archive/docs/Software.md
@@ -1,225 +1,257 @@
# Software/Firmware
## Imaging Raspberry Pi
+
1. Basic Set-up
-- Download [Raspberry Pi OS image](https://downloads.raspberrypi.com/raspios_lite_arm64/images/raspios_lite_arm64-2024-03-15/2024-03-15-raspios-bookworm-arm64-lite.img.xz?_gl=1*13v8f9s*_ga*MTc2NDY2NjEzLjE3MTIzNTk2OTQ.*_ga_22FD70LWDS*MTcxMzgyMDE2OC4zLjEuMTcxMzgyMDE3My4wLjAuMA..).
-- Take a microSD card and insert it into a computer with [Raspberry Pi Imager](https://www.raspberrypi.com/software/). Use a microSD adapter if necessary.
-- In the imager:
- - For device, elect “Raspberry Pi Zero 2W”.
- - For operating system, use “Custom” and use the image you just downloaded, which should be called `2024-03-15-raspios-bookworm-arm64-lite.img`.
- - For storage, choose the microSD card you just inserted.
-
- Raspberry Pi Imager
-
-
-2. Custom settings — “General”
-- After clicking next, there is a pop-up for OS customization. Click “Edit Settings”.
-- Set a hostname, as well as a username and password
-- Input WiFi credentials - note that you likely can't use a 5G network.
-
- Custom Settings - General
-
-
-3. Enable SSH
-- Go to the “Services” tab and click Enable SSH.
-- Use password authentication so other people can communicate with the Raspberry Pi other than you.
-
- Custom Settings - General
-
-
-4. Proceed with flashing the Pi — there will be a “write” and “verify” step.
-
-5. Remove microSD from your computer and insert it in the Raspberry Pi. Turn on power and wait a couple minutes for the Raspberry Pi to boot.
-
-6. You can use find the IP address of the Raspberry Pi by going on your computer and running `nmap -p 22 --open [your network range]`.
-- Your network range will be something like `192.168.93.0/24`, and can be found by running `ifconfig` or `ip addr`.
-- Alternatively, you can use the more aggressive `nmap -sV -p 22 192.168.93.0/24`, and identify the Raspberry Pi from its operating system.
-
-7. Once you’ve found the IP address of the Raspberry Pi, connect to it over SSH by using: `ssh [username]@[ip address]`
-- Here `[username]` is the username you set during the configuration step before, and the `[ip address]` is the IP address of the Raspberry Pi you just found.
-- You will be prompted for the password — this is the password set earlier during the OS customization step.
+ - Download [Raspberry Pi OS image](https://downloads.raspberrypi.com/raspios_lite_arm64/images/raspios_lite_arm64-2024-03-15/2024-03-15-raspios-bookworm-arm64-lite.img.xz?_gl=1*13v8f9s*_ga*MTc2NDY2NjEzLjE3MTIzNTk2OTQ.*_ga_22FD70LWDS*MTcxMzgyMDE2OC4zLjEuMTcxMzgyMDE3My4wLjAuMA..).
+ - Take a microSD card and insert it into a computer with [Raspberry Pi Imager](https://www.raspberrypi.com/software/). Use a microSD adapter if necessary.
+ - In the imager:
+ - For device, elect “Raspberry Pi Zero 2W”.
+ - For operating system, use “Custom” and use the image you just downloaded, which should be called `2024-03-15-raspios-bookworm-arm64-lite.img`.
+ - For storage, choose the microSD card you just inserted.
+
+ Raspberry Pi Imager
+
+
+1. Custom settings — “General”
+ - After clicking next, there is a pop-up for OS customization. Click “Edit Settings”.
+ - Set a hostname, as well as a username and password
+ - Input WiFi credentials - note that you likely can't use a 5G network.
+
+ Custom Settings - General
+
+
+1. Enable SSH
+ - Go to the “Services” tab and click Enable SSH.
+ - Use password authentication so other people can communicate with the Raspberry Pi other than you.
+
+ Custom Settings - General
+
+
+1. Proceed with flashing the Pi — there will be a “write” and “verify” step.
+
+1. Remove microSD from your computer and insert it in the Raspberry Pi. Turn on power and wait a couple minutes for the Raspberry Pi to boot.
+
+1. You can use find the IP address of the Raspberry Pi by going on your computer and running `nmap -p 22 --open [your network range]`.
+ - Your network range will be something like `192.168.93.0/24`, and can be found by running `ifconfig` or `ip addr`.
+ - Alternatively, you can use the more aggressive `nmap -sV -p 22 192.168.93.0/24`, and identify the Raspberry Pi from its operating system.
+
+1. Once you’ve found the IP address of the Raspberry Pi, connect to it over SSH by using: `ssh [username]@[ip address]`
+ - Here `[username]` is the username you set during the configuration step before, and the `[ip address]` is the IP address of the Raspberry Pi you just found.
+ - You will be prompted for the password — this is the password set earlier during the OS customization step.
## Raspberry Pi Software Installation
+
1. After connecting to the Raspberry Pi through SSH, install Tailscale on it with the following commands. You will be provided a link to log in to your Tailscale account after the last step.
-```bash
-curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.noarmor.gpg | sudo tee /usr/share/keyrings/tailscale-archive-keyring.gpg >/dev/null
-curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.tailscale-keyring.list | sudo tee /etc/apt/sources.list.d/tailscale.list
-sudo apt-get update
-sudo apt-get install tailscale
-sudo tailscale up
-```
-2. We can then check the Tailscale address of the Rasperry Pi with `tailscale ip -4`. Now, we can access the Raspberry Pi over VPN anywhere through Tailscale by using `ssh [username]@[tailscale ip]`
-
-3. Install the following packages on the Raspberry Pi:
-- Install chrony with `sudo apt install chrony`
-- Install pip with `sudo apt install python3-pip -y`
-- Install PySerial with `sudo pip install pyserial --break-system-packages`
-- Install the roslibpy with:
-```
-sudo apt install git-all
-git clone https://github.com/gramaziokohler/roslibpy.git
-cd roslibpy
-sudo pip install -r requirements-dev.txt --break-system-packages
-```
-- For more information on roslibpy, refer to the [roslibpy repo](https://github.com/gramaziokohler/roslibpy).
+
+ ```bash
+ curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.noarmor.gpg | sudo tee /usr/share/keyrings/tailscale-archive-keyring.gpg >/dev/null
+ curl -fsSL https://pkgs.tailscale.com/stable/ubuntu/focal.tailscale-keyring.list | sudo tee /etc/apt/sources.list.d/tailscale.list
+ sudo apt-get update
+ sudo apt-get install tailscale
+ sudo tailscale up
+ ```
+
+1. We can then check the Tailscale address of the Rasperry Pi with `tailscale ip -4`. Now, we can access the Raspberry Pi over VPN anywhere through Tailscale by using `ssh [username]@[tailscale ip]`
+1. Install the following packages on the Raspberry Pi:
+ - Install chrony with `sudo apt install chrony`
+ - Install pip with `sudo apt install python3-pip -y`
+ - Install PySerial with `sudo pip install pyserial --break-system-packages`
+ - Install the roslibpy with:
+
+ ```
+ sudo apt install git-all
+ git clone https://github.com/gramaziokohler/roslibpy.git
+ cd roslibpy
+ sudo pip install -r requirements-dev.txt --break-system-packages
+ ```
+
+ - For more information on roslibpy, refer to the [roslibpy repo](https://github.com/gramaziokohler/roslibpy).
## ESP32 Set-up
+
On the Raspberry Pi (over SSH):
1. Install esptool with `pip install esptool --break-system-packages`
-2. Make sure these four files on Raspberry Pi, which are in the [pstop_MCU build folder](../pstop_MCU/build/esp32.esp32.esp32/) folder:
-- `boot_app0.bin`
-- `pstop_MCU.ino.bin`
-- `pstop_MCU.ino.bootloader.bin`
-- `pstop_MCU.ino.partitions.bin`
-- If you are moving these files from your local computer, this can be done by going into the directory with these files and doing:
-```
-scp esp32_new.ino.bin esp32_new.ino.bootloader.bin esp32_new.ino.partitions.bin [username]@[tailscale ip]:/path/to/home/dir
-```
-- The original code [`pstop_MCU.ino`](../pstop_MCU/pstop_MCU.ino) file is also included, so you can make changes and re-compile if desired.
-3. Now, run the [esp32_flash.sh](../esp32_flash.sh) bash script with `bash esp32_flash.sh`. This should output something like this:
-```
-polymath@polymath-estop-001:~ $ bash esp32_flash.sh
-esptool.py v4.7.0
-Serial port /dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0
-Connecting.....
-Chip is ESP32-D0WD-V3 (revision v3.1)
-Features: WiFi, BT, Dual Core, 240MHz, VRef calibration in efuse, Coding Scheme None
-Crystal is 40MHz
-MAC: e4:65:b8:0f:49:f4
-Uploading stub...
-Running stub...
-Stub running...
-Changing baud rate to 921600
-Changed.
-Configuring flash size...
-Flash will be erased from 0x00001000 to 0x00005fff...
-Flash will be erased from 0x00008000 to 0x00008fff...
-Flash will be erased from 0x0000e000 to 0x0000ffff...
-Flash will be erased from 0x00010000 to 0x000e4fff...
-Compressed 18992 bytes to 13112...
-Wrote 18992 bytes (13112 compressed) at 0x00001000 in 0.3 seconds (effective 583.1 kbit/s)...
-Hash of data verified.
-Compressed 3072 bytes to 146...
-Wrote 3072 bytes (146 compressed) at 0x00008000 in 0.0 seconds (effective 1368.9 kbit/s)...
-Hash of data verified.
-Compressed 8192 bytes to 47...
-Wrote 8192 bytes (47 compressed) at 0x0000e000 in 0.0 seconds (effective 2240.4 kbit/s)...
-Hash of data verified.
-Compressed 870384 bytes to 563721...
-Wrote 870384 bytes (563721 compressed) at 0x00010000 in 7.2 seconds (effective 968.5 kbit/s)...
-Hash of data verified.
-
-Leaving...
-Hard resetting via RTS pin...
-Flashing complete.
-Resetting USB-to-UART bridge...
-Bridge reset complete.
-Configuring serial port settings...
-Reading from serial port...
-```
-4. After seeing "Reading from serial port", you can stop the script with Ctrl + C.
-5. At this point, the LED ring should be lighting up and be responsive to the red stop button.
+1. Make sure these four files on Raspberry Pi, which are in the [pstop_MCU build folder](../pstop_MCU/build/esp32.esp32.esp32/) folder:
+ - `boot_app0.bin`
+ - `pstop_MCU.ino.bin`
+ - `pstop_MCU.ino.bootloader.bin`
+ - `pstop_MCU.ino.partitions.bin`
+ - If you are moving these files from your local computer, this can be done by going into the directory with these files and doing:
+
+ ```
+ scp esp32_new.ino.bin esp32_new.ino.bootloader.bin esp32_new.ino.partitions.bin [username]@[tailscale ip]:/path/to/home/dir
+ ```
+
+ - The original code [`pstop_MCU.ino`](../pstop_MCU/pstop_MCU.ino) file is also included, so you can make changes and re-compile if desired.
+
+1. Now, run the [esp32_flash.sh](../esp32_flash.sh) bash script with `bash esp32_flash.sh`. This should output something like this:
+
+ ```
+ polymath@polymath-estop-001:~ $ bash esp32_flash.sh
+ esptool.py v4.7.0
+ Serial port /dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0
+ Connecting.....
+ Chip is ESP32-D0WD-V3 (revision v3.1)
+ Features: WiFi, BT, Dual Core, 240MHz, VRef calibration in efuse, Coding Scheme None
+ Crystal is 40MHz
+ MAC: e4:65:b8:0f:49:f4
+ Uploading stub...
+ Running stub...
+ Stub running...
+ Changing baud rate to 921600
+ Changed.
+ Configuring flash size...
+ Flash will be erased from 0x00001000 to 0x00005fff...
+ Flash will be erased from 0x00008000 to 0x00008fff...
+ Flash will be erased from 0x0000e000 to 0x0000ffff...
+ Flash will be erased from 0x00010000 to 0x000e4fff...
+ Compressed 18992 bytes to 13112...
+ Wrote 18992 bytes (13112 compressed) at 0x00001000 in 0.3 seconds (effective 583.1 kbit/s)...
+ Hash of data verified.
+ Compressed 3072 bytes to 146...
+ Wrote 3072 bytes (146 compressed) at 0x00008000 in 0.0 seconds (effective 1368.9 kbit/s)...
+ Hash of data verified.
+ Compressed 8192 bytes to 47...
+ Wrote 8192 bytes (47 compressed) at 0x0000e000 in 0.0 seconds (effective 2240.4 kbit/s)...
+ Hash of data verified.
+ Compressed 870384 bytes to 563721...
+ Wrote 870384 bytes (563721 compressed) at 0x00010000 in 7.2 seconds (effective 968.5 kbit/s)...
+ Hash of data verified.
+
+ Leaving...
+ Hard resetting via RTS pin...
+ Flashing complete.
+ Resetting USB-to-UART bridge...
+ Bridge reset complete.
+ Configuring serial port settings...
+ Reading from serial port...
+ ```
+
+1. After seeing "Reading from serial port", you can stop the script with Ctrl + C.
+1. At this point, the LED ring should be lighting up and be responsive to the red stop button.
For more information on esptool, refer to their [repo](https://github.com/espressif/esptool) or [docs](https://docs.espressif.com/projects/esptool/en/latest/esp32/).
## SIM Module Set-up
+
On the Raspberry Pi:
1. Install minicom with `sudo apt install minicom`
-2. Start minimcom with `sudo systemctl stop ModemManager.service && minicom -D /dev/ttyUSB2`
-- This may not work properly if `/dev/ttyUSB2` is not the right address.
-- You can tell whether it's working properly by typing `ATE` and then Enter in the minicom terminal and seeing if there is an `OK` response.
-- You may have to try some different addresses, which can be found with:
-```
-cd /dev/serial/by-id
-ls
-```
-3. In the minicom terminal, issue the following commands:
-```
-ATE
-AT&F
-ATI
-AT&V
-AT+CGDCONT?
-AT+CUSBPIDSWITCH?
-AT+CGPSAUTO?
-AT+CUSBPIDSWITCH=9001,1,1
-AT+CGDCONT=1,"IPV4V6","h2g2"
-AT+CGDCONT=6,"IPV4V6","h2g2"
-AT+CGPSAUTO=1
-```
-4. Exit minicom (`Ctrl+A, Z, X`), and then power cycle the system.
-5. Now, configure the Raspian Bookworm Network Manager with: `sudo nmcli connection add type gsm ifname '*' con-name '1-gsm' apn 'h2g2' connection.autoconnect yes`
-6. If you need more detailed instructions, refer to https://wimsworld.wordpress.com/2023/12/
+1. Start minimcom with `sudo systemctl stop ModemManager.service && minicom -D /dev/ttyUSB2`
+ - This may not work properly if `/dev/ttyUSB2` is not the right address.
+ - You can tell whether it's working properly by typing `ATE` and then Enter in the minicom terminal and seeing if there is an `OK` response.
+ - You may have to try some different addresses, which can be found with:
+
+ ```
+ cd /dev/serial/by-id
+ ls
+ ```
+
+1. In the minicom terminal, issue the following commands:
+
+ ```
+ ATE
+ AT&F
+ ATI
+ AT&V
+ AT+CGDCONT?
+ AT+CUSBPIDSWITCH?
+ AT+CGPSAUTO?
+ AT+CUSBPIDSWITCH=9001,1,1
+ AT+CGDCONT=1,"IPV4V6","h2g2"
+ AT+CGDCONT=6,"IPV4V6","h2g2"
+ AT+CGPSAUTO=1
+ ```
+
+1. Exit minicom (`Ctrl+A, Z, X`), and then power cycle the system.
+1. Now, configure the Raspian Bookworm Network Manager with: `sudo nmcli connection add type gsm ifname '*' con-name '1-gsm' apn 'h2g2' connection.autoconnect yes`
+1. If you need more detailed instructions, refer to https://wimsworld.wordpress.com/2023/12/
## E-ink Paper Display
+
On the Raspberry Pi terminal:
1. Install the following:
-```
-sudo pip3 install RPi.GPIO
-sudo pip3 install spidev
-```
-2. Git clone the e-Paper repo with `git clone https://github.com/waveshare/e-Paper.git`
-3. Go in the Raspberry Pi directory with: `cd e-Paper/RaspberryPi_JetsonNano/`
-4. Set up the libraries with `sudo python3 [setup.py](http://setup.py/) install`
-5. Install flask with `pip install flask --break-system-packages`
-6. Configure the Raspberry Pi by typing `sudo raspi-config`.
-- Go to `Choose Interfacing Options -> SPI -> Yes Enable SPI interface`
-
- Raspberry Pi Config Menu
-
-
-7. Reboot your Raspberry Pi with either `sudo reboot` or power cycle.
+
+ ```
+ sudo pip3 install RPi.GPIO
+ sudo pip3 install spidev
+ ```
+
+1. Git clone the e-Paper repo with `git clone https://github.com/waveshare/e-Paper.git`
+1. Go in the Raspberry Pi directory with: `cd e-Paper/RaspberryPi_JetsonNano/`
+1. Set up the libraries with `sudo python3 [setup.py](http://setup.py/) install`
+1. Install flask with `pip install flask --break-system-packages`
+1. Configure the Raspberry Pi by typing `sudo raspi-config`.
+ - Go to `Choose Interfacing Options -> SPI -> Yes Enable SPI interface`
+
+ Raspberry Pi Config Menu
+
+
+1. Reboot your Raspberry Pi with either `sudo reboot` or power cycle.
For more info on this display, please refer to Waveshare's [manual](https://www.waveshare.com/wiki/2.13inch_e-Paper_HAT_Manual).
## Custom Message Set-up
+
On your local machine your local machine or (robot that you want the p-stop to control):
1. Install rosbridge-server with `sudo apt-get install ros-$ROS-DISTRO-rosbridge-server`.
-2. Create a colcon workspace if you don't already have one:
-```
-mkdir -p colcon_ws/src
-cd colcon_ws/src
-```
-3. Put the [`pstop_msg`](../pstop_msg) folder in this repo inside the `src` directory. It contains a custom message definition for our protective stop.
-4. Go back to the `colcon_ws` directory and run:
-```
-colcon build
-source install/setup.bash
-```
-5. To verify that our custom message set-up was successful, you can run `ros2 interface show estop_interface/msg/EStopMsg`, which should print out the message definition.
+1. Create a colcon workspace if you don't already have one:
+
+ ```
+ mkdir -p colcon_ws/src
+ cd colcon_ws/src
+ ```
+
+1. Put the [`pstop_msg`](../pstop_msg) folder in this repo inside the `src` directory. It contains a custom message definition for our protective stop.
+1. Go back to the `colcon_ws` directory and run:
+
+ ```
+ colcon build
+ source install/setup.bash
+ ```
+
+1. To verify that our custom message set-up was successful, you can run `ros2 interface show estop_interface/msg/EStopMsg`, which should print out the message definition.
## roslibpy Client Set-up
+
In a terminal on your local machine where you have built and sourced the custom message type above:
1. Launch a rosbridge_server with `ros2 launch rosbridge_server rosbridge_websocket_launch.xml`.
-- If you are running this in a Docker container, make sure that port 9090 is exposed by doing `docker run -p 9090:9090 ...`
-2. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
-- You can set a default ip address by altering line 12 of [`roslibpy_client.py`](../roslibpy_client.py):
-```
-parser.add_argument('target', type=str, help='Target IP address', default='')
-```
-3. To use the flask interface, connect to the Raspberry Pi through SSH with portforwarding:
-```
-ssh -L 8000:localhost:5000 [username]@[tailscale ip]
-```
-- Now, you should be able to see the flask interface by going to `http://localhost:8000/config`.
-- For more information about roslibpy and rosbridge_server, see [Robot Web Tools](https://robotwebtools.github.io/).
+ - If you are running this in a Docker container, make sure that port 9090 is exposed by doing `docker run -p 9090:9090 ...`
+1. To run the roslibpy client, run `python roslibpy_client.py [ip address]`.
+ - You can set a default ip address by altering line 12 of [`roslibpy_client.py`](../roslibpy_client.py):
+
+ ```
+ parser.add_argument('target', type=str, help='Target IP address', default='')
+ ```
+
+1. To use the flask interface, connect to the Raspberry Pi through SSH with portforwarding:
+
+ ```
+ ssh -L 8000:localhost:5000 [username]@[tailscale ip]
+ ```
+
+ - Now, you should be able to see the flask interface by going to `http://localhost:8000/config`.
+ - For more information about roslibpy and rosbridge_server, see [Robot Web Tools](https://robotwebtools.github.io/).
## Starting on Boot
+
If you want to start the roslibpy client above on boot, we can do so with a systemctl service.
1. Create a service file with `sudo nano /etc/systemd/system/pstop-autostart.service`.
-2. Edit the service file `nano` or your text editor of choice and add:
-```
-[Unit]
-Description=Start protective stop on boot
-After=multi-user.target
-
-[Service]
-Type=idle
-ExecStart=/usr/bin/python /path/roslibpy_client.py
-
-[Install]
-WantedBy=multi-user.target
-```
-3. Reload the systemd manager configuration: `sudo systemctl daemon-reload`.
-4. Enable the service: `sudo systemctl enable pstop-autostart.service`.
-5. This service should now start on boot!
-- You can reboot either by power cycling or with `sudo reboot`.
-- You can also test it without rebooting with `sudo systemctl start pstop-autostart.service`.
\ No newline at end of file
+1. Edit the service file `nano` or your text editor of choice and add:
+
+ ```
+ [Unit]
+ Description=Start protective stop on boot
+ After=multi-user.target
+
+ [Service]
+ Type=idle
+ ExecStart=/usr/bin/python /path/roslibpy_client.py
+
+ [Install]
+ WantedBy=multi-user.target
+ ```
+
+1. Reload the systemd manager configuration: `sudo systemctl daemon-reload`.
+1. Enable the service: `sudo systemctl enable pstop-autostart.service`.
+1. This service should now start on boot!
+ - You can reboot either by power cycling or with `sudo reboot`.
+ - You can also test it without rebooting with `sudo systemctl start pstop-autostart.service`.
diff --git a/archive/protective_stop_remote/protective_stop_remote/__init__.py b/archive/protective_stop_remote/protective_stop_remote/__init__.py
index e69de29..18a2699 100644
--- a/archive/protective_stop_remote/protective_stop_remote/__init__.py
+++ b/archive/protective_stop_remote/protective_stop_remote/__init__.py
@@ -0,0 +1,2 @@
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
diff --git a/archive/protective_stop_remote/protective_stop_remote/app.py b/archive/protective_stop_remote/protective_stop_remote/app.py
index 5d3bda1..22d6b09 100644
--- a/archive/protective_stop_remote/protective_stop_remote/app.py
+++ b/archive/protective_stop_remote/protective_stop_remote/app.py
@@ -1,13 +1,12 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
# Handles configuration via webpage + saving config to json file
import json
-import subprocess
import re
-from flask import Flask, request, redirect, render_template, jsonify
-import logging
+import subprocess
+
+from flask import Flask, jsonify, redirect, render_template, request
app = Flask(__name__)
CONFIG_FILE = 'config.json'
@@ -17,121 +16,118 @@
def list_wifi_connections():
try:
- result = subprocess.run(['nmcli', '-t', '-f', 'NAME,DEVICE,STATE',
- 'connection', 'show'], capture_output=True, text=True)
+ result = subprocess.run(
+ ['nmcli', '-t', '-f', 'NAME,DEVICE,STATE', 'connection', 'show'], capture_output=True, text=True
+ )
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
connections = []
- for line in result.stdout.strip().split("\n"):
+ for line in result.stdout.strip().split('\n'):
name, device, state = line.split(':')
- connections.append(
- {"name": name, "device": device, "state": state})
+ connections.append({'name': name, 'device': device, 'state': state})
return connections
except Exception as e:
- print(f"Error listing Wi-Fi connections: {e}")
+ print(f'Error listing Wi-Fi connections: {e}')
return []
def add_wifi_connection(ssid, password):
try:
- result = subprocess.run(['nmcli', 'dev', 'wifi', 'connect',
- ssid, 'password', password], capture_output=True, text=True)
+ result = subprocess.run(
+ ['nmcli', 'dev', 'wifi', 'connect', ssid, 'password', password], capture_output=True, text=True
+ )
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error adding Wi-Fi connection: {e}")
+ print(f'Error adding Wi-Fi connection: {e}')
return False
def remove_wifi_connection(name):
try:
- result = subprocess.run(
- ['nmcli', 'connection', 'delete', name], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', 'connection', 'delete', name], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error removing Wi-Fi connection: {e}")
+ print(f'Error removing Wi-Fi connection: {e}')
return False
def activate_wifi_connection(name):
"""Activate a specific Wi-Fi connection."""
try:
- result = subprocess.run(
- ['nmcli', 'connection', 'up', name], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', 'connection', 'up', name], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
return True
except Exception as e:
- print(f"Error activating Wi-Fi connection: {e}")
+ print(f'Error activating Wi-Fi connection: {e}')
return False
+
# Tailscale and signal strength functions
def get_wifi_signal_strength():
try:
- result = subprocess.run(
- ['nmcli', '-t', '-f', 'IN-USE,SIGNAL', 'dev', 'wifi'], capture_output=True, text=True)
+ result = subprocess.run(['nmcli', '-t', '-f', 'IN-USE,SIGNAL', 'dev', 'wifi'], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"nmcli error: {result.stderr}")
+ raise Exception(f'nmcli error: {result.stderr}')
- wifi_lines = result.stdout.strip().split("\n")
+ wifi_lines = result.stdout.strip().split('\n')
for line in wifi_lines:
if line.startswith('*'):
_, signal = line.split(':')
return int(signal)
except Exception as e:
- print(f"Error getting Wi-Fi signal strength: {e}")
+ print(f'Error getting Wi-Fi signal strength: {e}')
return None
def get_cellular_signal_strength():
try:
- result = subprocess.run(
- ['mmcli', '-L'], capture_output=True, text=True)
+ result = subprocess.run(['mmcli', '-L'], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"mmcli error: {result.stderr}")
+ raise Exception(f'mmcli error: {result.stderr}')
output = result.stdout.strip()
match = re.search(r'/Modem/(\d+)', output)
if not match:
- raise Exception("Could not find a modem index in mmcli output.")
+ raise Exception('Could not find a modem index in mmcli output.')
modem_index = match.group(1)
- result = subprocess.run(
- ['mmcli', '-m', modem_index], capture_output=True, text=True)
+ result = subprocess.run(['mmcli', '-m', modem_index], capture_output=True, text=True)
if result.returncode != 0:
- raise Exception(f"mmcli error: {result.stderr}")
+ raise Exception(f'mmcli error: {result.stderr}')
output = result.stdout
signal_match = re.search(r'signal quality: (\d+)', output)
if signal_match:
return int(signal_match.group(1))
else:
- raise Exception("Signal quality not found in modem details.")
+ raise Exception('Signal quality not found in modem details.')
except Exception as e:
- print(f"Error getting cellular signal strength: {e}")
+ print(f'Error getting cellular signal strength: {e}')
return None
def fetch_tailscale_info(shared):
try:
- result = subprocess.run(
- ["tailscale", "status", "--json"], capture_output=True, text=True, check=True)
+ result = subprocess.run(['tailscale', 'status', '--json'], capture_output=True, text=True, check=True)
status = json.loads(result.stdout)
- tailscale_ips = status.get("Self", {}).get("TailscaleIPs", [])
- dns_name = status.get("Self", {}).get("DNSName", "")
- shared["TailscaleIP"] = tailscale_ips[0] if tailscale_ips else "N/A"
- shared["DNSName"] = dns_name if dns_name else "N/A"
+ tailscale_ips = status.get('Self', {}).get('TailscaleIPs', [])
+ dns_name = status.get('Self', {}).get('DNSName', '')
+ shared['TailscaleIP'] = tailscale_ips[0] if tailscale_ips else 'N/A'
+ shared['DNSName'] = dns_name if dns_name else 'N/A'
except Exception as e:
- print(f"Error fetching Tailscale info: {e}")
- shared["TailscaleIP"] = "N/A"
- shared["DNSName"] = "N/A"
+ print(f'Error fetching Tailscale info: {e}')
+ shared['TailscaleIP'] = 'N/A'
+ shared['DNSName'] = 'N/A'
+
# Load and save configuration
@@ -150,31 +146,32 @@ def save_config(cfg):
with open(CONFIG_FILE, 'w') as f:
json.dump(cfg, f, indent=2)
except Exception as e:
- print(f"Error saving config: {e}")
+ print(f'Error saving config: {e}')
+
# Shared dictionary initialization
def initialize_shared_dict(shared):
"""Ensure all required keys exist in the shared dictionary with default values."""
- if "status_summary" not in shared:
- shared["status_summary"] = "Not Connected"
- if "uuid" not in shared:
- shared["uuid"] = "Unassigned!"
- if "TailscaleIP" not in shared:
- shared["TailscaleIP"] = "N/A"
- if "DNSName" not in shared:
- shared["DNSName"] = "N/A"
- if "battery_level" not in shared:
- shared["battery_level"] = "Unknown"
- if "WifiSignal" not in shared:
- shared["WifiSignal"] = "N/A"
- if "CellularSignal" not in shared:
- shared["CellularSignal"] = "N/A"
- if "target_pairs" not in shared:
- shared["target_pairs"] = []
- if "WifiConnections" not in shared:
- shared["WifiConnections"] = []
+ if 'status_summary' not in shared:
+ shared['status_summary'] = 'Not Connected'
+ if 'uuid' not in shared:
+ shared['uuid'] = 'Unassigned!'
+ if 'TailscaleIP' not in shared:
+ shared['TailscaleIP'] = 'N/A'
+ if 'DNSName' not in shared:
+ shared['DNSName'] = 'N/A'
+ if 'battery_level' not in shared:
+ shared['battery_level'] = 'Unknown'
+ if 'WifiSignal' not in shared:
+ shared['WifiSignal'] = 'N/A'
+ if 'CellularSignal' not in shared:
+ shared['CellularSignal'] = 'N/A'
+ if 'target_pairs' not in shared:
+ shared['target_pairs'] = []
+ if 'WifiConnections' not in shared:
+ shared['WifiConnections'] = []
def load_shared_from_config(shared):
@@ -194,19 +191,19 @@ def config_app(shared={}):
def get_data():
"""API endpoint for live data updates."""
fetch_tailscale_info(shared)
- shared["WifiSignal"] = get_wifi_signal_strength() or "N/A"
- shared["CellularSignal"] = get_cellular_signal_strength() or "N/A"
- shared["WifiConnections"] = list_wifi_connections()
+ shared['WifiSignal'] = get_wifi_signal_strength() or 'N/A'
+ shared['CellularSignal'] = get_cellular_signal_strength() or 'N/A'
+ shared['WifiConnections'] = list_wifi_connections()
return jsonify({
- "status_summary": shared.get("status_summary", "Not Connected"),
- "uuid": shared.get("uuid", "Unassigned"),
- "TailscaleIP": shared.get("TailscaleIP", "N/A"),
- "DNSName": shared.get("DNSName", "N/A"),
- "BatteryLevel": shared.get("battery_level", "Unknown"),
- "WifiSignal": shared.get("WifiSignal", "N/A"),
- "CellularSignal": shared.get("CellularSignal", "N/A"),
- "TargetPairs": shared.get("target_pairs", []),
- "WifiConnections": shared["WifiConnections"]
+ 'status_summary': shared.get('status_summary', 'Not Connected'),
+ 'uuid': shared.get('uuid', 'Unassigned'),
+ 'TailscaleIP': shared.get('TailscaleIP', 'N/A'),
+ 'DNSName': shared.get('DNSName', 'N/A'),
+ 'BatteryLevel': shared.get('battery_level', 'Unknown'),
+ 'WifiSignal': shared.get('WifiSignal', 'N/A'),
+ 'CellularSignal': shared.get('CellularSignal', 'N/A'),
+ 'TargetPairs': shared.get('target_pairs', []),
+ 'WifiConnections': shared['WifiConnections'],
})
@app.route('/wifi', methods=['POST'])
@@ -237,8 +234,7 @@ def index():
new_uuid = request.form.get('new_uuid', '')
if new_ip and new_uuid:
pairs = list(shared['target_pairs'])
- pairs.append({'ip': new_ip, 'uuid': new_uuid,
- 'status': "Not Connected"})
+ pairs.append({'ip': new_ip, 'uuid': new_uuid, 'status': 'Not Connected'})
shared['target_pairs'] = pairs
save_config(dict(shared))
elif action.startswith('remove_'):
diff --git a/archive/protective_stop_remote/protective_stop_remote/button.py b/archive/protective_stop_remote/protective_stop_remote/button.py
index f7d4f2c..769b3d5 100644
--- a/archive/protective_stop_remote/protective_stop_remote/button.py
+++ b/archive/protective_stop_remote/protective_stop_remote/button.py
@@ -1,31 +1,32 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
# Handles pstop button physical interaction
-import RPi.GPIO as GPIO
-import time
import logging
+import time
+
+import RPi.GPIO as GPIO
# GPIO setup
# 15 connected to 23 when button pressed
# 14 connected to 23 when button released
+
def button_node(shared):
logging.basicConfig(level=logging.DEBUG)
OUTPUT_PIN = 23
INPUT_PINS = [14, 15]
-
- if "button_pressed" not in shared:
- shared["button_pressed"] = None
-
+
+ if 'button_pressed' not in shared:
+ shared['button_pressed'] = None
+
try:
GPIO.setmode(GPIO.BCM) # Use BCM pin numbering
GPIO.setup(OUTPUT_PIN, GPIO.OUT)
-
- logging.info(f"Button Node starting at {time.time()}")
+
+ logging.info(f'Button Node starting at {time.time()}')
# The following code toggles output high and low, and checks for correct return
- # If the value is not correct, one or both of the return lines is broken
+ # If the value is not correct, one or both of the return lines is broken
while True:
GPIO.output(OUTPUT_PIN, GPIO.HIGH)
for pin in INPUT_PINS:
@@ -55,23 +56,24 @@ def button_node(shared):
new_state = None
# Log only when the state changes
- if new_state != shared["button_pressed"]:
- shared["button_pressed"] = new_state
+ if new_state != shared['button_pressed']:
+ shared['button_pressed'] = new_state
if new_state is True:
- logging.info(f"Button pressed at {time.time()}")
+ logging.info(f'Button pressed at {time.time()}')
elif new_state is False:
- logging.info(f"Button released at {time.time()}")
+ logging.info(f'Button released at {time.time()}')
else:
- logging.info(f"Button state unknown (failure) at {time.time()}")
+ logging.info(f'Button state unknown (failure) at {time.time()}')
time.sleep(0.05) # 20 Hz
-
+
except KeyboardInterrupt:
- logging.info(f"Button Node exiting at {time.time()}")
-
- finally:
+ logging.info(f'Button Node exiting at {time.time()}')
+
+ finally:
GPIO.cleanup()
-
+
+
# Running standalone if needed
if __name__ == '__main__':
button_node(shared={})
diff --git a/archive/protective_stop_remote/protective_stop_remote/main.py b/archive/protective_stop_remote/protective_stop_remote/main.py
index 6e0de92..940355a 100644
--- a/archive/protective_stop_remote/protective_stop_remote/main.py
+++ b/archive/protective_stop_remote/protective_stop_remote/main.py
@@ -1,19 +1,17 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
# Automatically started on boot, combines data from other nodes
-from multiprocessing import Process, Manager
+from multiprocessing import Manager, Process
-from power import power_node # Handles power button, UPS management, shutdown command
-from ui import ui_node # Controls the LED ring, E paper display
# Handles configuration via webpage + saving config to json file
from app import run_app
-# Main pstop node, communicates with machine node on robot
-from pstop import pstop_node
from button import button_node # Handles pstop button
+from power import power_node # Handles power button, UPS management, shutdown command
-import logging
+# Main pstop node, communicates with machine node on robot
+from pstop import pstop_node
+from ui import ui_node # Controls the LED ring, E paper display
if __name__ == '__main__':
with Manager() as manager:
@@ -22,7 +20,7 @@
# Create two processes, one for ROS node and one for UI node
power_process = Process(target=power_node, args=(shared_data,))
- app_process = Process(target=run_app, args=(shared_data,))
+ app_process = Process(target=run_app, args=(shared_data,))
ui_process = Process(target=ui_node, args=(shared_data,))
pstop_process = Process(target=pstop_node, args=(shared_data,))
button_process = Process(target=button_node, args=(shared_data,))
diff --git a/archive/protective_stop_remote/protective_stop_remote/power.py b/archive/protective_stop_remote/protective_stop_remote/power.py
index 952a520..4e50efa 100644
--- a/archive/protective_stop_remote/protective_stop_remote/power.py
+++ b/archive/protective_stop_remote/protective_stop_remote/power.py
@@ -1,28 +1,27 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
# Handles power button, UPS management, shutdown command
+import logging
+import subprocess
import time
+
import smbus
-import subprocess
-import logging
-import traceback
# ========== I²C Setup (PiSugar) ==========
I2C_BUS = 1
DEVICE_ADDR = 0x57
-REG_BUTTON = 0x02 # Register to read button state (LSB) & modify bits
-REG_ACTION = 0x09 # Register to write an action/delay if needed
+REG_BUTTON = 0x02 # Register to read button state (LSB) & modify bits
+REG_ACTION = 0x09 # Register to write an action/delay if needed
-PRESS_THRESHOLD = 0.2 # Number of seconds to hold button
+PRESS_THRESHOLD = 0.2 # Number of seconds to hold button
bus = smbus.SMBus(I2C_BUS)
button_pressed_start = None
+
def shutdown_process(shared):
- logging.info("==> Shut Down Sequence Initiated...")
+ logging.info('==> Shut Down Sequence Initiated...')
shared['shutdown'] = True
# Sets shutdown delay to 20 seconds
@@ -33,7 +32,7 @@ def shutdown_process(shared):
try:
value = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
except OSError as e:
- logging.warning(f"I2C Read Error: {e}. Retrying...")
+ logging.warning(f'I2C Read Error: {e}. Retrying...')
continue
# Clear the 5th bit (bit index 5) of 0x02
@@ -43,26 +42,25 @@ def shutdown_process(shared):
try:
bus.write_byte_data(DEVICE_ADDR, REG_BUTTON, new_value)
verify_val = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
- logging.info(f"Verified 0x02 after write: 0x{verify_val:02X}")
+ logging.info(f'Verified 0x02 after write: 0x{verify_val:02X}')
except OSError as e:
- logging.warning(f"I2C Write/Verify Error (REG_BUTTON): {e}")
-
+ logging.warning(f'I2C Write/Verify Error (REG_BUTTON): {e}')
# ========== Issue System Shutdown ==========
- logging.info("Issuing system shutdown command...")
- subprocess.run(["sudo", "shutdown", "-h", "now"])
+ logging.info('Issuing system shutdown command...')
+ subprocess.run(['sudo', 'shutdown', '-h', 'now'])
-def power_node(shared):
+def power_node(shared):
logging.basicConfig(level=logging.DEBUG)
# Optional: Make certain registers writable (per PiSugar docs, if needed)
try:
bus.write_byte_data(DEVICE_ADDR, 0x0B, 0x29)
- logging.info("Setting 0x0B = 0x29 to enable writable registers.")
+ logging.info('Setting 0x0B = 0x29 to enable writable registers.')
except OSError as e:
- logging.warning(f"I2C Write Error (0x0B): {e}")
-
+ logging.warning(f'I2C Write Error (0x0B): {e}')
+
shared['shutdown'] = False
# ========== Main Loop ==========
@@ -70,30 +68,31 @@ def power_node(shared):
# ========== Battery Check ==========
try:
# Use shell=True so we can pipe directly in one command
- battery_output = subprocess.check_output(
- "echo 'get battery' | nc -q 0 127.0.0.1 8423",
- shell=True
- ).decode('utf-8').strip()
+ battery_output = (
+ subprocess.check_output("echo 'get battery' | nc -q 0 127.0.0.1 8423", shell=True)
+ .decode('utf-8')
+ .strip()
+ )
# If battery_output is something like "37%" or "37", parse it
# Remove any trailing '%' if present:
battery_value = battery_output.replace('%', '').strip()[9:]
- battery_percentage = round(float(battery_value),1)
+ battery_percentage = round(float(battery_value), 1)
shared['battery_level'] = battery_percentage
- #logging.warning(f"Battery level is ({battery_percentage}%).")
+ # logging.warning(f"Battery level is ({battery_percentage}%).")
if battery_percentage < 4:
- logging.warning(f"Battery level is below 4% ({battery_percentage}%).")
+ logging.warning(f'Battery level is below 4% ({battery_percentage}%).')
shutdown_process(shared)
except Exception as e:
- logging.warning(f"Battery check failed: {e}")
+ logging.warning(f'Battery check failed: {e}')
# ========== Button Handling ==========
try:
value = bus.read_byte_data(DEVICE_ADDR, REG_BUTTON)
except OSError as e:
- logging.warning(f"I2C Read Error: {e}. Retrying...")
+ logging.warning(f'I2C Read Error: {e}. Retrying...')
time.sleep(0.1)
continue
@@ -108,7 +107,6 @@ def power_node(shared):
else:
# Check how long it's been held
if (time.time() - button_pressed_start) >= PRESS_THRESHOLD:
-
shutdown_process(shared)
# Reset press timer so we only do this once per press
button_pressed_start = None
@@ -117,7 +115,7 @@ def power_node(shared):
button_pressed_start = None
time.sleep(0.1)
-
-
+
+
if __name__ == '__main__':
power_node(shared={}) # Normal dict here for a simple test
diff --git a/archive/protective_stop_remote/protective_stop_remote/pstop.py b/archive/protective_stop_remote/protective_stop_remote/pstop.py
index 6ab5a6a..999150c 100644
--- a/archive/protective_stop_remote/protective_stop_remote/pstop.py
+++ b/archive/protective_stop_remote/protective_stop_remote/pstop.py
@@ -1,10 +1,12 @@
-import time
+# SPDX-FileCopyrightText: 2026 Polymath Robotics, Inc.
+# SPDX-License-Identifier: Apache-2.0
+import base64
import logging
-import roslibpy
import threading
+import time
import uuid
-import base64
+import roslibpy
# TODO:
# - Handle connect/disconnect/connect more reliably
@@ -46,8 +48,15 @@ def compute_crc16(data: bytes, poly: int = 0x1021, init: int = 0xFFFF) -> int:
def build_pstop_message(
- message_id, state_id, timestamp, sender_uuid, receiver_uuid,
- counter, heartbeat_timeout, received_stamp, received_counter
+ message_id,
+ state_id,
+ timestamp,
+ sender_uuid,
+ receiver_uuid,
+ counter,
+ heartbeat_timeout,
+ received_stamp,
+ received_counter,
):
"""
Constructs a PStopMsg dictionary using the correct lifecycle state definitions.
@@ -69,22 +78,22 @@ def build_pstop_message(
# Lifecycle state definitions, from https://docs.ros2.org/foxy/api/lifecycle_msgs/msg/State.html
lifecycle_states = {
- 0: "UNKNOWN",
- 1: "UNCONFIGURED",
- 2: "INACTIVE",
- 3: "ACTIVE",
- 4: "FINALIZED",
- 10: "CONFIGURING",
- 11: "CLEANINGUP",
- 12: "SHUTTINGDOWN",
- 13: "ACTIVATING",
- 14: "DEACTIVATING",
- 15: "ERRORPROCESSING"
+ 0: 'UNKNOWN',
+ 1: 'UNCONFIGURED',
+ 2: 'INACTIVE',
+ 3: 'ACTIVE',
+ 4: 'FINALIZED',
+ 10: 'CONFIGURING',
+ 11: 'CLEANINGUP',
+ 12: 'SHUTTINGDOWN',
+ 13: 'ACTIVATING',
+ 14: 'DEACTIVATING',
+ 15: 'ERRORPROCESSING',
}
# Function to get state label dynamically
def get_state_label(state_id):
- return lifecycle_states.get(state_id, "UNKNOWN")
+ return lifecycle_states.get(state_id, 'UNKNOWN')
return {
'message': message_id,
@@ -95,9 +104,9 @@ def get_state_label(state_id):
'counter': counter,
'heartbeat_timeout': heartbeat_timeout,
'checksum_type': 'CRC-16',
- 'checksum_value': f"{compute_crc16(str(counter).encode()):04X}",
+ 'checksum_value': f'{compute_crc16(str(counter).encode()):04X}',
'received_stamp': received_stamp,
- 'received_counter': received_counter
+ 'received_counter': received_counter,
}
@@ -108,160 +117,161 @@ def start_client(ip, port, shared):
while not client.is_connected and attempt_count < max_attempts:
try:
- logging.info(
- f"Attempting to connect to {ip}:{port} (Attempt {attempt_count + 1}/{max_attempts})")
+ logging.info(f'Attempting to connect to {ip}:{port} (Attempt {attempt_count + 1}/{max_attempts})')
client.run()
client.connect()
time.sleep(0.5)
if client.is_connected:
- logging.info(f"Connected to {ip}:{port}")
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ logging.info(f'Connected to {ip}:{port}')
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = "Connected"
+ machine['status'] = 'Connected'
else:
- logging.error(f"Connection to {ip}:{port} failed")
+ logging.error(f'Connection to {ip}:{port} failed')
client.close()
except Exception as e:
- logging.error(f"Connection to {ip}:{port} failed: {e}")
+ logging.error(f'Connection to {ip}:{port} failed: {e}')
attempt_count += 1
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = f"Retrying ({attempt_count}/{max_attempts})"
+ machine['status'] = f'Retrying ({attempt_count}/{max_attempts})'
if attempt_count < max_attempts:
- logging.info("Retrying in 5 seconds...")
+ logging.info('Retrying in 5 seconds...')
time.sleep(5)
if not client.is_connected:
- logging.error(
- f"Failed to connect to {ip}:{port} after {max_attempts} attempts.")
- machine = next(
- (m for m in shared["target_pairs"] if m.get("ip") == ip), None)
+ logging.error(f'Failed to connect to {ip}:{port} after {max_attempts} attempts.')
+ machine = next((m for m in shared['target_pairs'] if m.get('ip') == ip), None)
if machine:
- machine['status'] = "Failed"
+ machine['status'] = 'Failed'
return client
def setup_client(client, ip, port, shared):
- publisher = roslibpy.Topic(
- client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
+ publisher = roslibpy.Topic(client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
publisher.advertise()
- subscriber = roslibpy.Topic(
- client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
+ subscriber = roslibpy.Topic(client, '/protective_stop', 'pstop_msg/msg/PStopMsg')
bonding_status = {} # Track bonding state per machine UUID
try:
- self_uuid = uuid.UUID(shared.get("uuid"))
+ self_uuid = uuid.UUID(shared.get('uuid'))
except ValueError:
- logging.error(f"Invalid self UUID: {shared.get('uuid')}")
+ logging.error(f'Invalid self UUID: {shared.get("uuid")}')
return
def callback(message):
- sender_uuid_raw = message.get("id", {}).get("uuid")
- receiver_uuid_raw = message.get("receiver_uuid", {}).get("uuid")
+ sender_uuid_raw = message.get('id', {}).get('uuid')
+ receiver_uuid_raw = message.get('receiver_uuid', {}).get('uuid')
sender_uuid = None
receiver_uuid = None
- logging.info(f"⚠️ Received message: {message}")
+ logging.info(f'⚠️ Received message: {message}')
# ✅ Ensure sender UUID is extracted correctly
if isinstance(sender_uuid_raw, list) and len(sender_uuid_raw) == 16:
sender_uuid = uuid.UUID(bytes=bytes(sender_uuid_raw))
elif isinstance(sender_uuid_raw, str):
try:
- sender_uuid = uuid.UUID(
- bytes=base64.b64decode(sender_uuid_raw))
+ sender_uuid = uuid.UUID(bytes=base64.b64decode(sender_uuid_raw))
except Exception:
- logging.warning(
- f"⚠️ Failed to decode sender UUID from Base64: {sender_uuid_raw}")
+ logging.warning(f'⚠️ Failed to decode sender UUID from Base64: {sender_uuid_raw}')
# ✅ Ensure receiver UUID is extracted correctly
if isinstance(receiver_uuid_raw, list) and len(receiver_uuid_raw) == 16:
receiver_uuid = uuid.UUID(bytes=bytes(receiver_uuid_raw))
elif isinstance(receiver_uuid_raw, str):
try:
- receiver_uuid = uuid.UUID(
- bytes=base64.b64decode(receiver_uuid_raw))
+ receiver_uuid = uuid.UUID(bytes=base64.b64decode(receiver_uuid_raw))
except Exception:
- logging.warning(
- f"⚠️ Failed to decode receiver UUID from Base64: {receiver_uuid_raw}")
+ logging.warning(f'⚠️ Failed to decode receiver UUID from Base64: {receiver_uuid_raw}')
# ✅ Convert self UUID for consistent comparison
try:
- self_uuid = uuid.UUID(shared.get("uuid"))
+ self_uuid = uuid.UUID(shared.get('uuid'))
except ValueError:
- logging.error(f"❌ Invalid self UUID: {shared.get('uuid')}")
+ logging.error(f'❌ Invalid self UUID: {shared.get("uuid")}')
return
- logging.info(
- f"Sender UUID: {sender_uuid}, Receiver UUID: {receiver_uuid}, Self UUID: {self_uuid}")
+ logging.info(f'Sender UUID: {sender_uuid}, Receiver UUID: {receiver_uuid}, Self UUID: {self_uuid}')
if sender_uuid and sender_uuid == self_uuid:
- logging.info(
- f"❌ Ignoring self-generated message from UUID {self_uuid}")
+ logging.info(f'❌ Ignoring self-generated message from UUID {self_uuid}')
return
sender_str = str(sender_uuid)
- state_id = message["state"]["id"]
+ state_id = message['state']['id']
if state_id == 13: # TRANSITION_STATE_ACTIVATING
- logging.info(f"🔄 Bonding started with {sender_uuid}")
- bonding_status[sender_str] = "ACTIVATING"
+ logging.info(f'🔄 Bonding started with {sender_uuid}')
+ bonding_status[sender_str] = 'ACTIVATING'
response = build_pstop_message(
- PSTOP_OK, 13, message["stamp"], message["id"], message["receiver_uuid"],
- message["counter"], message["heartbeat_timeout"],
- message["received_stamp"], message["received_counter"]
+ PSTOP_OK,
+ 13,
+ message['stamp'],
+ message['id'],
+ message['receiver_uuid'],
+ message['counter'],
+ message['heartbeat_timeout'],
+ message['received_stamp'],
+ message['received_counter'],
)
publisher.publish(roslibpy.Message(response))
elif state_id == 3: # PRIMARY_STATE_ACTIVE
- if bonding_status.get(sender_str) == "ACTIVATING":
- logging.info(f"✅ Bonded successfully with {sender_uuid}")
- bonding_status[sender_str] = "ACTIVE"
+ if bonding_status.get(sender_str) == 'ACTIVATING':
+ logging.info(f'✅ Bonded successfully with {sender_uuid}')
+ bonding_status[sender_str] = 'ACTIVE'
elif state_id == 14: # TRANSITION_STATE_DEACTIVATING
- logging.info(f"❌ Unbonding from {sender_uuid}")
- bonding_status[sender_str] = "DEACTIVATING"
+ logging.info(f'❌ Unbonding from {sender_uuid}')
+ bonding_status[sender_str] = 'DEACTIVATING'
response = build_pstop_message(
- PSTOP_OK, 14, message["stamp"], message["id"], message["receiver_uuid"],
- message["counter"], message["heartbeat_timeout"],
- message["received_stamp"], message["received_counter"]
+ PSTOP_OK,
+ 14,
+ message['stamp'],
+ message['id'],
+ message['receiver_uuid'],
+ message['counter'],
+ message['heartbeat_timeout'],
+ message['received_stamp'],
+ message['received_counter'],
)
publisher.publish(roslibpy.Message(response))
elif state_id == 2: # PRIMARY_STATE_INACTIVE
- if bonding_status.get(sender_str) == "DEACTIVATING":
- logging.info(f"✅ Successfully unbonded from {sender_uuid}")
+ if bonding_status.get(sender_str) == 'DEACTIVATING':
+ logging.info(f'✅ Successfully unbonded from {sender_uuid}')
bonding_status.pop(sender_str, None)
subscriber.subscribe(callback)
- logging.info(f"Publisher and subscriber set up for {ip}:{port}")
+ logging.info(f'Publisher and subscriber set up for {ip}:{port}')
def bonding_thread():
while True:
- for target in shared.get("target_pairs", []):
- target_uuid = target.get("uuid")
- if target_uuid and target["ip"] == ip and target_uuid not in bonding_status:
- logging.info(f"🔄 Initiating bonding with {target_uuid}")
+ for target in shared.get('target_pairs', []):
+ target_uuid = target.get('uuid')
+ if target_uuid and target['ip'] == ip and target_uuid not in bonding_status:
+ logging.info(f'🔄 Initiating bonding with {target_uuid}')
bonding_msg = build_pstop_message(
- PSTOP_OK, 13, # ✅ Uses TRANSITION_STATE_ACTIVATING from build_pstop_message()
- {"sec": int(time.time()), "nanosec": int(
- (time.time() % 1) * 1e9)},
- {"uuid": str(self_uuid)},
- {"uuid": target_uuid},
- 0, {"sec": 1, "nanosec": 500000000},
- {"sec": 0, "nanosec": 0}, 0
+ PSTOP_OK,
+ 13, # ✅ Uses TRANSITION_STATE_ACTIVATING from build_pstop_message()
+ {'sec': int(time.time()), 'nanosec': int((time.time() % 1) * 1e9)},
+ {'uuid': str(self_uuid)},
+ {'uuid': target_uuid},
+ 0,
+ {'sec': 1, 'nanosec': 500000000},
+ {'sec': 0, 'nanosec': 0},
+ 0,
)
publisher.publish(roslibpy.Message(bonding_msg))
- bonding_status[target_uuid] = "ACTIVATING"
+ bonding_status[target_uuid] = 'ACTIVATING'
time.sleep(1)
@@ -273,55 +283,57 @@ def client_thread(ip, rosbridge_port, shared, connected_clients):
client = start_client(ip, rosbridge_port, shared)
publisher, subscriber = setup_client(client, ip, rosbridge_port, shared)
connected_clients[ip] = {
- "client": client,
- "publisher": publisher,
- "subscriber": subscriber,
- "thread": threading.current_thread()
+ 'client': client,
+ 'publisher': publisher,
+ 'subscriber': subscriber,
+ 'thread': threading.current_thread(),
}
counter = 0
# ✅ Ensure receiver UUID is extracted correctly
- receiver_uuid_str = next(
- (m["uuid"] for m in shared["target_pairs"] if m["ip"] == ip), None)
- receiver_uuid = uuid.UUID(receiver_uuid_str.strip(
- )) if receiver_uuid_str else uuid.UUID(int=0)
+ receiver_uuid_str = next((m['uuid'] for m in shared['target_pairs'] if m['ip'] == ip), None)
+ receiver_uuid = uuid.UUID(receiver_uuid_str.strip()) if receiver_uuid_str else uuid.UUID(int=0)
while ip in connected_clients:
if not client.is_connected:
- logging.info(f"Client {ip} disconnected, attempting reconnection.")
+ logging.info(f'Client {ip} disconnected, attempting reconnection.')
client = start_client(ip, rosbridge_port, shared)
- publisher, subscriber = setup_client(
- client, ip, rosbridge_port, shared)
+ publisher, subscriber = setup_client(client, ip, rosbridge_port, shared)
connected_clients[ip] = {
- "client": client,
- "publisher": publisher,
- "subscriber": subscriber,
- "thread": threading.current_thread()
+ 'client': client,
+ 'publisher': publisher,
+ 'subscriber': subscriber,
+ 'thread': threading.current_thread(),
}
# ✅ Convert self UUID properly
- self_uuid = uuid.UUID(shared["uuid"].strip())
+ self_uuid = uuid.UUID(shared['uuid'].strip())
# ✅ Convert both UUIDs to `uint8[16]` lists before sending
- id = {"uuid": list(self_uuid.bytes)}
- receiver_uuid = {"uuid": list(receiver_uuid.bytes)}
+ id = {'uuid': list(self_uuid.bytes)}
+ receiver_uuid = {'uuid': list(receiver_uuid.bytes)}
# ✅ Build and send a protective stop message
- timestamp = {'sec': int(time.time()), 'nanosec': int(
- (time.time() % 1) * 1e9)}
+ timestamp = {'sec': int(time.time()), 'nanosec': int((time.time() % 1) * 1e9)}
state = {'id': 1, 'label': 'ACTIVE'}
heartbeat_timeout = {'sec': 1, 'nanosec': 500000000}
- received_stamp = {
- 'sec': timestamp['sec'], 'nanosec': timestamp['nanosec']}
+ received_stamp = {'sec': timestamp['sec'], 'nanosec': timestamp['nanosec']}
received_counter = counter
button = PSTOP_STOP
- if shared.get("button_pressed") is False:
+ if shared.get('button_pressed') is False:
button = PSTOP_OK # ✅ Only set to OK if the button is NOT pressed
message = build_pstop_message(
- button, state["id"], timestamp, id, receiver_uuid, counter,
- heartbeat_timeout, received_stamp, received_counter
+ button,
+ state['id'],
+ timestamp,
+ id,
+ receiver_uuid,
+ counter,
+ heartbeat_timeout,
+ received_stamp,
+ received_counter,
)
publisher.publish(roslibpy.Message(message))
counter += 1
@@ -331,62 +343,56 @@ def client_thread(ip, rosbridge_port, shared, connected_clients):
def pstop_node(shared):
logging.basicConfig(level=logging.DEBUG)
- logging.info(f"PStop Node started at {time.time()}")
+ logging.info(f'PStop Node started at {time.time()}')
- while "target_pairs" not in shared:
+ while 'target_pairs' not in shared:
time.sleep(0.1)
connected_clients = {}
try:
while True:
- current_machines = {machine['ip'] for machine in shared.get(
- "target_pairs", {}) if 'ip' in machine}
+ current_machines = {machine['ip'] for machine in shared.get('target_pairs', {}) if 'ip' in machine}
current_ips = set(connected_clients.keys())
ips_to_add = current_machines - current_ips
ips_to_remove = current_ips - current_machines
# Add new connections
for ip in ips_to_add:
- logging.info(f"Starting client thread for {ip}")
- thread = threading.Thread(target=client_thread, args=(
- ip, rosbridge_port, shared, connected_clients))
+ logging.info(f'Starting client thread for {ip}')
+ thread = threading.Thread(target=client_thread, args=(ip, rosbridge_port, shared, connected_clients))
thread.daemon = True
thread.start()
# Remove old connections
for ip in ips_to_remove:
- logging.info(f"Stopping client thread for {ip}")
+ logging.info(f'Stopping client thread for {ip}')
connection = connected_clients.pop(ip, None)
if connection:
- connection["publisher"].unadvertise()
- connection["subscriber"].unsubscribe()
- connection["client"].close()
+ connection['publisher'].unadvertise()
+ connection['subscriber'].unsubscribe()
+ connection['client'].close()
time.sleep(1)
except KeyboardInterrupt:
- logging.info("PStop Node exiting.")
+ logging.info('PStop Node exiting.')
finally:
for ip, connection in connected_clients.items():
- logging.info(f"Cleaning up connection for {ip}")
- connection["publisher"].unadvertise()
- connection["subscriber"].unsubscribe()
- connection["client"].terminate()
+ logging.info(f'Cleaning up connection for {ip}')
+ connection['publisher'].unadvertise()
+ connection['subscriber'].unsubscribe()
+ connection['client'].terminate()
# Running standalone for testing, with some default values
if __name__ == '__main__':
shared = {
- "target_pairs": [
- {
- "ip": "100.125.44.33",
- "uuid": "00000000-0000-0000-0000-000000000000",
- "status": "Not Connected"
- }
+ 'target_pairs': [
+ {'ip': '100.125.44.33', 'uuid': '00000000-0000-0000-0000-000000000000', 'status': 'Not Connected'}
],
- "button_pressed": False,
- "uuid": "00000000-0000-0000-0000-000000000001",
+ 'button_pressed': False,
+ 'uuid': '00000000-0000-0000-0000-000000000001',
}
pstop_node(shared)
diff --git a/archive/protective_stop_remote/protective_stop_remote/templates/index.html b/archive/protective_stop_remote/protective_stop_remote/templates/index.html
index a51812a..6ff341b 100644
--- a/archive/protective_stop_remote/protective_stop_remote/templates/index.html
+++ b/archive/protective_stop_remote/protective_stop_remote/templates/index.html
@@ -9,7 +9,7 @@
.then(response => response.json())
.then(data => {
document.getElementById('status_summary').textContent = data.status_summary;
-
+
// Update Tailscale information
document.getElementById('tailscale-ip').textContent = data.TailscaleIP;
document.getElementById('dns-name').textContent = data.DNSName;
@@ -61,7 +61,7 @@