This guide walks through building a peer application in Python that runs under The Mule test orchestrator.
- Python 3.10+
- Docker (for building the container image)
- A working
tmsetup (see QUICKSTART.md)
The client library lives in lib/python/ of The Mule repository:
pip install -e path/to/the-mule/lib/python/Or copy the the_mule/ package into your project.
The client library handles Redis connection, log forwarding via a background thread, and async command iteration. Your code needs to:
- Build the client
- Send a
"started"status (with optional VLAD and multiaddr) - Iterate over incoming commands
- Send a
"stopped"status before exiting
#!/usr/bin/env python3
import asyncio
import logging
import sys
from the_mule import MuleClientBuilder
async def main() -> None:
client = await MuleClientBuilder().build()
# Tell the orchestrator we are ready.
# Use "started|<vlad>|<multiaddr>" if your app has identity/network info.
await client.send_status("started")
# Process commands from the orchestrator.
# MuleClient is an async iterator that yields raw command strings.
async for command in client:
logging.info(f"received: {command}")
if command == "connect":
logging.info("connecting to network...")
# ... your connect logic ...
await client.send_status("connected")
elif command == "disconnect":
logging.info("disconnecting...")
# ... your disconnect logic ...
await client.send_status("disconnected")
elif command.startswith("push|"):
parts = command.split("|", 2)
peer, message = parts[1], parts[2]
logging.info(f"push to {peer}: {message}")
# ... your push logic ...
elif command == "pull":
logging.info("pulling messages...")
# ... your pull logic ...
elif command == "rotate-key":
logging.info("rotating key...")
# ... your key rotation logic ...
elif command.startswith("track|"):
peer = command.split("|", 1)[1]
logging.info(f"tracking {peer}")
# ... your tracking logic ...
elif command.startswith("peer|"):
parts = command.split("|", 2)
vlad, multiaddr = parts[1], parts[2]
logging.info(f"adding bootstrap peer: {vlad} at {multiaddr}")
# ... add peer to your routing table ...
elif command.startswith("restart|"):
delay = command.split("|", 1)[1]
await client.send_status("restarting")
with open("/tmp/delay", "w") as f:
f.write(delay)
sys.exit(42)
elif command == "shutdown":
await client.send_status("stopped")
break
await client.close()
asyncio.run(main())The MuleClient async iterator calls BLPOP {peer}_command 0 on each
iteration — this blocks on the Redis server until a command is available, then
returns it as a string. Because it uses redis.asyncio, the await yields
control to the asyncio event loop so your other coroutines (status updates, log
handling, etc.) continue to run. There is no polling.
The client installs a logging.Handler that captures log records and forwards
them to Redis via LPUSH {peer}_log "level|message" in a background thread.
Use logging.info(), logging.error(), etc. as normal and they appear in the
orchestrator's log output and TUI automatically.
FROM python:3.12-slim
WORKDIR /app
COPY the_mule/ ./the_mule/
COPY my_peer.py .
RUN pip install redis
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]The entrypoint script supports the restart protocol (exit code 42):
#!/bin/bash
set -e
while true; do
python3 /app/my_peer.py "$@" && break
EXIT_CODE=$?
if [ "$EXIT_CODE" -eq 42 ] && [ -f /tmp/delay ]; then
DELAY=$(cat /tmp/delay)
rm -f /tmp/delay
echo "restarting in ${DELAY}s..."
sleep "$DELAY"
else
exit $EXIT_CODE
fi
donedocker build -t my-python-peer:latest .# my-test.yaml
name: "my-python-peer-test"
timeout:
startup: 60
shutdown: 30
redis:
port: 6399
image: "redis:7-alpine"
hosts:
- address: localhost
ssh_user: user
ssh_auth: agent
base_port: 11984
peers:
- name: alice
image: "my-python-peer:latest"
bootstrap: [bob]
environment:
- LOG_LEVEL=INFO
- name: bob
image: "my-python-peer:latest"
bootstrap: [alice]
environment:
- LOG_LEVEL=INFO
commands:
- { time: 0, peer: alice, command: "connect" }
- { time: 0, peer: bob, command: "connect" }
- { time: 10, peer: alice, command: "push|bob|hello" }
- { time: 15, peer: bob, command: "pull" }RUST_LOG=info tm my-test.yaml --verboseThe orchestrator sets these environment variables on your container:
| Variable | Description |
|---|---|
REDIS_URL |
Redis connection URL (e.g., redis://192.168.1.10:6399) |
PEER_NAME |
This peer's name (e.g., alice) |
LISTEN_ADDR |
Multiaddr to listen on (e.g., /ip4/0.0.0.0/udp/11984/quic-v1) |
LOG_LEVEL |
Python log level (from peer config environment) |
| Item | Description |
|---|---|
MuleClientBuilder() |
Create builder, reads REDIS_URL, PEER_NAME, LOG_LEVEL from env |
.redis_url(url) / .peer_name(name) |
Override env var values |
await .build() |
Connect to Redis, install log handler |
await client.send_status(status) |
SET the status key (triggers orchestrator notification) |
async for command in client: |
Yields raw command strings via BLPOP |
await client.close() |
Close Redis connection and log handler |