diff --git a/.github/workflows/build-viewer.yml b/.github/workflows/build-viewer.yml new file mode 100644 index 000000000000..eafc427779f5 --- /dev/null +++ b/.github/workflows/build-viewer.yml @@ -0,0 +1,141 @@ +name: Build Interactive Viewer + +on: + push: + branches: [main, dev] + tags: ['v*'] + pull_request: + branches: [main, dev] + +jobs: + build-linux: + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.92.0 + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y \ + libgtk-3-dev \ + libxcb-render0-dev \ + libxcb-shape0-dev \ + libxcb-xfixes0-dev \ + libxkbcommon-dev \ + libvulkan-dev \ + mesa-vulkan-drivers \ + xvfb \ + libxkbcommon-x11-0 + + - name: Cache cargo + uses: actions/cache@v3 + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Build custom_callback viewer + run: | + cd rerun + cargo build --release -p custom_callback + + - name: Run tests + run: | + cd rerun + cargo test -p custom_callback + + # Python bridge tests + cd ../engineering/DIM-643/python_bridge + python3 test_bridge_bincode.py + + # DimOS integration tests + cd ../dimos_integration + python3 test_rerun_interaction_module.py + + - name: Package binary + run: | + cd rerun/target/release + strip custom_callback_viewer + tar -czf dimos-viewer-${{ github.ref_name }}-linux-x64.tar.gz \ + custom_callback_viewer + + - name: Upload artifact + uses: actions/upload-artifact@v3 + with: + name: dimos-viewer-linux-x64 + path: rerun/target/release/dimos-viewer-*.tar.gz + + build-macos: + runs-on: macos-14 # arm64 + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.92.0 + + - name: Cache cargo + uses: actions/cache@v3 + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + + - name: Build custom_callback viewer + run: | + cd rerun + cargo build --release -p custom_callback + + - name: Run tests + run: | + cd rerun + cargo test -p custom_callback + + - name: Package binary + run: | + cd rerun/target/release + strip custom_callback_viewer + tar -czf dimos-viewer-${{ github.ref_name }}-macos-arm64.tar.gz \ + custom_callback_viewer + + - name: Upload artifact + uses: actions/upload-artifact@v3 + with: + name: dimos-viewer-macos-arm64 + path: rerun/target/release/dimos-viewer-*.tar.gz + + release: + needs: [build-linux, build-macos] + runs-on: ubuntu-22.04 + if: startsWith(github.ref, 'refs/tags/v') + steps: + - name: Download all artifacts + uses: actions/download-artifact@v3 + + - name: Create GitHub Release + uses: softprops/action-gh-release@v1 + with: + files: | + dimos-viewer-linux-x64/dimos-viewer-*.tar.gz + dimos-viewer-macos-arm64/dimos-viewer-*.tar.gz + generate_release_notes: true + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/examples/rust/custom_callback/python_bridge/bincode_codec.py b/examples/rust/custom_callback/python_bridge/bincode_codec.py new file mode 100644 index 000000000000..e8f84dce4dc1 --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/bincode_codec.py @@ -0,0 +1,369 @@ +"""Minimal bincode codec for ViewerEvent and AppCommand messages. + +Implements the subset of bincode serialization needed to communicate with +the Rust viewer. Supports Rust enums, Option, String, Vec, arrays, and +primitives (u32, u64, f32, bool). + +Bincode format (default config): +- Integers: Little-endian +- Strings: Length (u64 LE) + UTF-8 bytes +- Option: 1 byte (0=None, 1=Some) + T if Some +- Vec: Length (u64 LE) + elements +- Enums: Variant index (u32 LE) + variant fields +""" + +import struct +from typing import Optional, Any +from dataclasses import dataclass + + +class BincodeError(Exception): + """Bincode encoding/decoding error.""" + pass + + +class BincodeEncoder: + """Encode Python objects to bincode format.""" + + def __init__(self): + self.buffer = bytearray() + + def write_u32(self, value: int): + """Write a u32 (4 bytes, little-endian).""" + self.buffer.extend(struct.pack(' (1 byte tag + String if Some).""" + if value is None: + self.buffer.append(0) + else: + self.buffer.append(1) + self.write_string(value) + + def write_vec_f32_array3(self, values: list[tuple[float, float, float]]): + """Write a Vec<[f32; 3]> (length u64 + array elements).""" + self.write_u64(len(values)) + for x, y, z in values: + self.write_f32(x) + self.write_f32(y) + self.write_f32(z) + + def write_f32_array3(self, x: float, y: float, z: float): + """Write a [f32; 3] array.""" + self.write_f32(x) + self.write_f32(y) + self.write_f32(z) + + def bytes(self) -> bytes: + """Get the encoded bytes.""" + return bytes(self.buffer) + + +class BincodeDecoder: + """Decode bincode format to Python objects.""" + + def __init__(self, data: bytes): + self.data = data + self.offset = 0 + + def read_u32(self) -> int: + """Read a u32 (4 bytes, little-endian).""" + if self.offset + 4 > len(self.data): + raise BincodeError(f"Not enough data for u32 at offset {self.offset}") + value = struct.unpack_from(' int: + """Read a u64 (8 bytes, little-endian).""" + if self.offset + 8 > len(self.data): + raise BincodeError(f"Not enough data for u64 at offset {self.offset}") + value = struct.unpack_from(' float: + """Read an f32 (4 bytes, little-endian).""" + if self.offset + 4 > len(self.data): + raise BincodeError(f"Not enough data for f32 at offset {self.offset}") + value = struct.unpack_from(' bool: + """Read a bool (1 byte, 0 or 1).""" + if self.offset + 1 > len(self.data): + raise BincodeError(f"Not enough data for bool at offset {self.offset}") + value = self.data[self.offset] + self.offset += 1 + if value > 1: + raise BincodeError(f"Invalid bool value: {value}") + return value == 1 + + def read_string(self) -> str: + """Read a String (length u64 + UTF-8 bytes).""" + length = self.read_u64() + if self.offset + length > len(self.data): + raise BincodeError(f"Not enough data for string of length {length}") + utf8 = self.data[self.offset:self.offset + length] + self.offset += length + try: + return utf8.decode('utf-8') + except UnicodeDecodeError as e: + raise BincodeError(f"Invalid UTF-8 in string: {e}") + + def read_option_string(self) -> Optional[str]: + """Read an Option (1 byte tag + String if Some).""" + tag = self.data[self.offset] + self.offset += 1 + + if tag == 0: + return None + elif tag == 1: + return self.read_string() + else: + raise BincodeError(f"Invalid Option tag: {tag}") + + def read_vec_f32_array3(self) -> list[tuple[float, float, float]]: + """Read a Vec<[f32; 3]> (length u64 + array elements).""" + length = self.read_u64() + waypoints = [] + for _ in range(length): + x = self.read_f32() + y = self.read_f32() + z = self.read_f32() + waypoints.append((x, y, z)) + return waypoints + + def read_f32_array3(self) -> tuple[float, float, float]: + """Read a [f32; 3] array.""" + x = self.read_f32() + y = self.read_f32() + z = self.read_f32() + return (x, y, z) + + +@dataclass +class ViewerEvent: + """Base class for viewer events.""" + pass + + +@dataclass +class ClickEvent(ViewerEvent): + """User clicked in a spatial view.""" + position: tuple[float, float, float] + entity_path: Optional[str] + view_id: str + timestamp_ms: int + is_2d: bool + + @property + def x(self) -> float: + return self.position[0] + + @property + def y(self) -> float: + return self.position[1] + + @property + def z(self) -> float: + return self.position[2] + + @property + def timestamp(self) -> float: + """Unix timestamp in seconds.""" + return self.timestamp_ms / 1000.0 + + +@dataclass +class WaypointCompleteEvent(ViewerEvent): + """Waypoint sequence completed.""" + waypoints: list[tuple[float, float, float]] + + +@dataclass +class ModeChangedEvent(ViewerEvent): + """Interaction mode changed.""" + mode: str + + +@dataclass +class DisconnectEvent(ViewerEvent): + """Viewer is disconnecting.""" + pass + + +def decode_viewer_event(data: bytes) -> ViewerEvent: + """Decode a ViewerEvent from bincode bytes. + + Rust enum ViewerEvent variants: + - 0: Click + - 1: WaypointComplete + - 2: ModeChanged + - 3: Disconnect + """ + decoder = BincodeDecoder(data) + variant_index = decoder.read_u32() + + if variant_index == 0: # Click + position = decoder.read_f32_array3() + entity_path = decoder.read_option_string() + view_id = decoder.read_string() + timestamp_ms = decoder.read_u64() + is_2d = decoder.read_bool() + + return ClickEvent( + position=position, + entity_path=entity_path, + view_id=view_id, + timestamp_ms=timestamp_ms, + is_2d=is_2d, + ) + + elif variant_index == 1: # WaypointComplete + waypoints = decoder.read_vec_f32_array3() + return WaypointCompleteEvent(waypoints=waypoints) + + elif variant_index == 2: # ModeChanged + mode = decoder.read_string() + return ModeChangedEvent(mode=mode) + + elif variant_index == 3: # Disconnect + return DisconnectEvent() + + else: + raise BincodeError(f"Unknown ViewerEvent variant: {variant_index}") + + +@dataclass +class AppCommand: + """Base class for app commands.""" + pass + + +@dataclass +class SetModeCommand(AppCommand): + """Change the interaction mode.""" + mode: str + + +@dataclass +class ClearWaypointsCommand(AppCommand): + """Clear all waypoint markers.""" + pass + + +@dataclass +class SetCursorCommand(AppCommand): + """Set the cursor style.""" + cursor: str + + +def encode_app_command(command: AppCommand) -> bytes: + """Encode an AppCommand to bincode bytes. + + Rust enum AppCommand variants: + - 0: SetMode + - 1: ClearWaypoints + - 2: SetCursor + """ + encoder = BincodeEncoder() + + if isinstance(command, SetModeCommand): + encoder.write_u32(0) # Variant index + encoder.write_string(command.mode) + + elif isinstance(command, ClearWaypointsCommand): + encoder.write_u32(1) # Variant index + # No fields + + elif isinstance(command, SetCursorCommand): + encoder.write_u32(2) # Variant index + encoder.write_string(command.cursor) + + else: + raise BincodeError(f"Unknown AppCommand type: {type(command)}") + + return encoder.bytes() + + +if __name__ == "__main__": + # Quick test + import sys + + print("Testing bincode codec...") + + # Test Click event roundtrip (encode in Rust, decode in Python) + # This would normally come from the Rust viewer + print("\n1. Testing Click event decode:") + encoder = BincodeEncoder() + encoder.write_u32(0) # Click variant + encoder.write_f32_array3(1.5, 2.5, 3.5) # position + encoder.buffer.append(1) # Option::Some + encoder.write_string("world/robot") # entity_path + encoder.write_string("view_123") # view_id + encoder.write_u64(1234567890) # timestamp_ms + encoder.write_bool(False) # is_2d + + event_bytes = encoder.bytes() + print(f"Encoded {len(event_bytes)} bytes: {event_bytes.hex()}") + + event = decode_viewer_event(event_bytes) + print(f"Decoded: {event}") + assert isinstance(event, ClickEvent) + assert event.position == (1.5, 2.5, 3.5) + assert event.entity_path == "world/robot" + assert event.view_id == "view_123" + assert event.timestamp_ms == 1234567890 + assert event.is_2d == False + print("āœ“ Click event OK") + + # Test SetMode command encode + print("\n2. Testing SetMode command encode:") + cmd = SetModeCommand(mode="waypoint") + cmd_bytes = encode_app_command(cmd) + print(f"Encoded {len(cmd_bytes)} bytes: {cmd_bytes.hex()}") + + # Verify structure + decoder = BincodeDecoder(cmd_bytes) + variant = decoder.read_u32() + mode_str = decoder.read_string() + assert variant == 0 + assert mode_str == "waypoint" + print("āœ“ SetMode command OK") + + # Test WaypointComplete + print("\n3. Testing WaypointComplete event:") + encoder = BincodeEncoder() + encoder.write_u32(1) # WaypointComplete variant + encoder.write_vec_f32_array3([(1.0, 2.0, 3.0), (4.0, 5.0, 6.0)]) + + event_bytes = encoder.bytes() + event = decode_viewer_event(event_bytes) + print(f"Decoded: {event}") + assert isinstance(event, WaypointCompleteEvent) + assert len(event.waypoints) == 2 + assert event.waypoints[0] == (1.0, 2.0, 3.0) + assert event.waypoints[1] == (4.0, 5.0, 6.0) + print("āœ“ WaypointComplete OK") + + print("\nāœ… All codec tests passed!") diff --git a/examples/rust/custom_callback/python_bridge/bridge.py b/examples/rust/custom_callback/python_bridge/bridge.py new file mode 100644 index 000000000000..a892963e9330 --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/bridge.py @@ -0,0 +1,359 @@ +"""ViewerBridge - Python client for interactive Rerun viewer. + +Connects to the custom Rerun viewer over TCP and provides a callback-based API +for handling click events, waypoint completions, and sending commands back to +the viewer. + +Example usage: + from viewer_bridge import ViewerBridge, ClickEvent + + bridge = ViewerBridge(port=8888) + + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at ({event.x}, {event.y}, {event.z})") + + bridge.start() # Blocks until viewer disconnects +""" + +import socket +import struct +import threading +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Optional +import time + + +@dataclass +class ClickEvent: + """Event emitted when the user clicks in a spatial view.""" + + x: float # World-space X coordinate + y: float # World-space Y coordinate + z: float # World-space Z coordinate + entity_path: Optional[str] # Rerun entity path that was clicked (if any) + view_id: str # Which spatial view the click occurred in + timestamp: float # Unix timestamp of the click (seconds) + is_2d: bool # Whether this was a 2D or 3D view click + + @property + def position(self) -> tuple[float, float, float]: + """Get position as a tuple.""" + return (self.x, self.y, self.z) + + +@dataclass +class WaypointRoute: + """Event emitted when a waypoint sequence is completed.""" + + waypoints: list[tuple[float, float, float]] # Ordered list of (x, y, z) + timestamp: float # Unix timestamp of completion + + @property + def total_distance(self) -> float: + """Compute total path distance (sum of segment lengths).""" + if len(self.waypoints) < 2: + return 0.0 + + distance = 0.0 + for i in range(len(self.waypoints) - 1): + p1 = self.waypoints[i] + p2 = self.waypoints[i + 1] + dx = p2[0] - p1[0] + dy = p2[1] - p1[1] + dz = p2[2] - p1[2] + distance += (dx**2 + dy**2 + dz**2) ** 0.5 + + return distance + + +class InteractionMode(Enum): + """Viewer interaction modes.""" + NORMAL = "normal" # Standard rerun behavior (no events sent) + CLICK = "click" # Single click reports position + WAYPOINT = "waypoint" # Sequential clicks build a route + + +class ViewerBridge: + """TCP client for bidirectional communication with the interactive Rerun viewer. + + Connects to the custom viewer, receives click/waypoint events, and can send + commands back to the viewer (mode changes, cursor updates, etc.). + + Thread-safe: callbacks are invoked on a background thread. + """ + + def __init__(self, host: str = "127.0.0.1", port: int = 8888): + """Initialize the bridge. + + Args: + host: Viewer TCP server hostname/IP + port: Viewer TCP server port + """ + self.host = host + self.port = port + self.socket: Optional[socket.socket] = None + self.running = False + self.thread: Optional[threading.Thread] = None + + # Callback registrations + self._click_handlers: list[Callable[[ClickEvent], None]] = [] + self._waypoint_handlers: list[Callable[[WaypointRoute], None]] = [] + self._mode_changed_handlers: list[Callable[[str], None]] = [] + self._disconnect_handlers: list[Callable[[], None]] = [] + + def on_click(self, handler: Callable[[ClickEvent], None]): + """Register a click event handler (decorator). + + Example: + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at {event.position}") + """ + self._click_handlers.append(handler) + return handler + + def on_waypoint_complete(self, handler: Callable[[WaypointRoute], None]): + """Register a waypoint completion handler (decorator). + + Example: + @bridge.on_waypoint_complete + def handle_route(route: WaypointRoute): + print(f"Route: {len(route.waypoints)} waypoints") + """ + self._waypoint_handlers.append(handler) + return handler + + def on_mode_changed(self, handler: Callable[[str], None]): + """Register a mode changed handler (decorator).""" + self._mode_changed_handlers.append(handler) + return handler + + def on_disconnect(self, handler: Callable[[], None]): + """Register a disconnect handler (decorator).""" + self._disconnect_handlers.append(handler) + return handler + + def send_command(self, command: dict): + """Send a command to the viewer. + + Args: + command: Dict with "type" key and command-specific fields + e.g. {"type": "SetMode", "mode": "click"} + + Raises: + RuntimeError: If not connected + """ + if not self.socket: + raise RuntimeError("Not connected to viewer") + + # For now, we'll implement a simple JSON protocol + # In production, this should use the same bincode format as Rust + import json + data = json.dumps(command).encode('utf-8') + length = struct.pack('>I', len(data)) + + try: + self.socket.sendall(length + data) + except (socket.error, BrokenPipeError) as e: + print(f"Failed to send command: {e}") + self.running = False + + def set_mode(self, mode: InteractionMode): + """Change the viewer interaction mode. + + Args: + mode: Target interaction mode + """ + self.send_command({ + "type": "SetMode", + "mode": mode.value, + }) + + def clear_waypoints(self): + """Clear all waypoint markers in the viewer.""" + self.send_command({"type": "ClearWaypoints"}) + + def set_cursor(self, cursor: str): + """Set the viewer cursor style. + + Args: + cursor: Cursor name ("default", "crosshair", "pointer") + """ + self.send_command({ + "type": "SetCursor", + "cursor": cursor, + }) + + def start(self, blocking: bool = True): + """Start the bridge (connect and listen for events). + + Args: + blocking: If True, blocks until viewer disconnects. + If False, runs in background thread. + + Raises: + ConnectionError: If cannot connect to viewer + """ + # Connect to viewer + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + self.socket.connect((self.host, self.port)) + print(f"Connected to viewer at {self.host}:{self.port}") + except socket.error as e: + raise ConnectionError(f"Failed to connect to viewer: {e}") + + self.running = True + + if blocking: + self._run_loop() + else: + self.thread = threading.Thread(target=self._run_loop, daemon=True) + self.thread.start() + + def stop(self): + """Stop the bridge and disconnect.""" + self.running = False + if self.socket: + try: + self.socket.close() + except: + pass + self.socket = None + + def _run_loop(self): + """Main event loop (runs on background thread if non-blocking).""" + try: + while self.running: + # Read message length (4 bytes, big-endian u32) + length_data = self._recv_exact(4) + if not length_data: + break + + length = struct.unpack('>I', length_data)[0] + + # Read message body + message_data = self._recv_exact(length) + if not message_data: + break + + # Parse and dispatch + self._handle_message(message_data) + + except Exception as e: + print(f"Bridge error: {e}") + + finally: + self.running = False + if self.socket: + try: + self.socket.close() + except: + pass + + # Notify disconnect handlers + for handler in self._disconnect_handlers: + try: + handler() + except Exception as e: + print(f"Disconnect handler error: {e}") + + def _recv_exact(self, n: int) -> Optional[bytes]: + """Receive exactly n bytes or return None on disconnect.""" + data = b'' + while len(data) < n: + chunk = self.socket.recv(n - len(data)) + if not chunk: + return None + data += chunk + return data + + def _handle_message(self, data: bytes): + """Parse and dispatch a viewer event message. + + For now, uses JSON for simplicity. In production, should use bincode + to match the Rust side exactly. + """ + import json + + try: + event = json.loads(data.decode('utf-8')) + event_type = event.get("type") + + if event_type == "Click": + click_event = ClickEvent( + x=event["position"][0], + y=event["position"][1], + z=event["position"][2], + entity_path=event.get("entity_path"), + view_id=event["view_id"], + timestamp=event["timestamp_ms"] / 1000.0, + is_2d=event["is_2d"], + ) + + for handler in self._click_handlers: + try: + handler(click_event) + except Exception as e: + print(f"Click handler error: {e}") + + elif event_type == "WaypointComplete": + route = WaypointRoute( + waypoints=[tuple(wp) for wp in event["waypoints"]], + timestamp=time.time(), + ) + + for handler in self._waypoint_handlers: + try: + handler(route) + except Exception as e: + print(f"Waypoint handler error: {e}") + + elif event_type == "ModeChanged": + mode = event["mode"] + + for handler in self._mode_changed_handlers: + try: + handler(mode) + except Exception as e: + print(f"Mode handler error: {e}") + + elif event_type == "Disconnect": + self.running = False + + else: + print(f"Unknown event type: {event_type}") + + except Exception as e: + print(f"Failed to parse message: {e}") + + +if __name__ == "__main__": + # Example usage + bridge = ViewerBridge(port=8888) + + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at ({event.x:.2f}, {event.y:.2f}, {event.z:.2f})") + if event.entity_path: + print(f" Entity: {event.entity_path}") + print(f" View: {event.view_id}") + + @bridge.on_waypoint_complete + def handle_route(route: WaypointRoute): + print(f"Route completed: {len(route.waypoints)} waypoints") + print(f"Total distance: {route.total_distance:.2f} meters") + for i, (x, y, z) in enumerate(route.waypoints): + print(f" {i+1}. ({x:.2f}, {y:.2f}, {z:.2f})") + + @bridge.on_disconnect + def handle_disconnect(): + print("Viewer disconnected") + + print("Waiting for viewer connection...") + try: + bridge.start(blocking=True) + except KeyboardInterrupt: + print("\nShutting down...") + bridge.stop() diff --git a/examples/rust/custom_callback/python_bridge/bridge_bincode.py b/examples/rust/custom_callback/python_bridge/bridge_bincode.py new file mode 100644 index 000000000000..1d1b50586bd8 --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/bridge_bincode.py @@ -0,0 +1,350 @@ +"""ViewerBridge - Python client for interactive Rerun viewer (bincode protocol). + +Connects to the custom Rerun viewer over TCP and provides a callback-based API +for handling click events, waypoint completions, and sending commands back to +the viewer. + +Uses bincode serialization to match the Rust viewer protocol exactly. + +Example usage: + from bridge_bincode import ViewerBridge + from bincode_codec import ClickEvent + + bridge = ViewerBridge(port=8888) + + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at {event.position}") + + bridge.start() # Blocks until viewer disconnects +""" + +import socket +import struct +import threading +from typing import Callable, Optional +from dataclasses import dataclass +from enum import Enum + +from bincode_codec import ( + ViewerEvent, ClickEvent, WaypointCompleteEvent, ModeChangedEvent, DisconnectEvent, + AppCommand, SetModeCommand, ClearWaypointsCommand, SetCursorCommand, + decode_viewer_event, encode_app_command, +) + + +class InteractionMode(Enum): + """Viewer interaction modes.""" + NORMAL = "normal" # Standard rerun behavior (no events sent) + CLICK = "click" # Single click reports position + WAYPOINT = "waypoint" # Sequential clicks build a route + + +@dataclass +class WaypointRoute: + """Convenience wrapper for waypoint completion events.""" + waypoints: list[tuple[float, float, float]] + + @property + def total_distance(self) -> float: + """Compute total path distance (sum of segment lengths).""" + if len(self.waypoints) < 2: + return 0.0 + + distance = 0.0 + for i in range(len(self.waypoints) - 1): + p1 = self.waypoints[i] + p2 = self.waypoints[i + 1] + dx = p2[0] - p1[0] + dy = p2[1] - p1[1] + dz = p2[2] - p1[2] + distance += (dx**2 + dy**2 + dz**2) ** 0.5 + + return distance + + +class ViewerBridge: + """TCP client for bidirectional communication with the interactive Rerun viewer. + + Connects to the custom viewer, receives click/waypoint events (bincode), and can + send commands back to the viewer (bincode). + + Thread-safe: callbacks are invoked on a background thread. + """ + + def __init__(self, host: str = "127.0.0.1", port: int = 8888): + """Initialize the bridge. + + Args: + host: Viewer TCP server hostname/IP + port: Viewer TCP server port + """ + self.host = host + self.port = port + self.socket: Optional[socket.socket] = None + self.running = False + self.thread: Optional[threading.Thread] = None + + # Callback registrations + self._click_handlers: list[Callable[[ClickEvent], None]] = [] + self._waypoint_handlers: list[Callable[[WaypointRoute], None]] = [] + self._mode_changed_handlers: list[Callable[[str], None]] = [] + self._disconnect_handlers: list[Callable[[], None]] = [] + + def on_click(self, handler: Callable[[ClickEvent], None]): + """Register a click event handler (decorator). + + Example: + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at {event.position}") + """ + self._click_handlers.append(handler) + return handler + + def on_waypoint_complete(self, handler: Callable[[WaypointRoute], None]): + """Register a waypoint completion handler (decorator). + + Example: + @bridge.on_waypoint_complete + def handle_route(route: WaypointRoute): + print(f"Route: {len(route.waypoints)} waypoints") + """ + self._waypoint_handlers.append(handler) + return handler + + def on_mode_changed(self, handler: Callable[[str], None]): + """Register a mode changed handler (decorator).""" + self._mode_changed_handlers.append(handler) + return handler + + def on_disconnect(self, handler: Callable[[], None]): + """Register a disconnect handler (decorator).""" + self._disconnect_handlers.append(handler) + return handler + + def send_command(self, command: AppCommand): + """Send a command to the viewer (bincode-encoded). + + Args: + command: AppCommand instance (SetModeCommand, etc.) + + Raises: + RuntimeError: If not connected + """ + if not self.socket: + raise RuntimeError("Not connected to viewer") + + try: + data = encode_app_command(command) + length = struct.pack('>I', len(data)) # Big-endian u32 for length prefix + self.socket.sendall(length + data) + except (socket.error, BrokenPipeError) as e: + print(f"Failed to send command: {e}") + self.running = False + + def set_mode(self, mode: InteractionMode): + """Change the viewer interaction mode. + + Args: + mode: Target interaction mode + """ + self.send_command(SetModeCommand(mode=mode.value)) + + def clear_waypoints(self): + """Clear all waypoint markers in the viewer.""" + self.send_command(ClearWaypointsCommand()) + + def set_cursor(self, cursor: str): + """Set the viewer cursor style. + + Args: + cursor: Cursor name ("default", "crosshair", "pointer") + """ + self.send_command(SetCursorCommand(cursor=cursor)) + + def start(self, blocking: bool = True): + """Start the bridge (listen for viewer connections and handle events). + + Args: + blocking: If True, blocks until viewer disconnects. + If False, runs in background thread. + + Raises: + ConnectionError: If cannot bind the server socket + """ + # Create and bind the server socket + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + self._server_socket.bind((self.host, self.port)) + self._server_socket.listen(1) + print(f"Waiting for viewer connection on {self.host}:{self.port}...") + except socket.error as e: + raise ConnectionError(f"Failed to start server: {e}") + + self.running = True + + if blocking: + self._accept_and_run() + else: + self.thread = threading.Thread(target=self._accept_and_run, daemon=True) + self.thread.start() + + def _accept_and_run(self): + """Accept a viewer connection, then enter the event loop.""" + try: + self._server_socket.settimeout(30.0) # Don't block forever + self.socket, addr = self._server_socket.accept() + print(f"Viewer connected from {addr}") + self._run_loop() + except socket.timeout: + print("No viewer connected within timeout") + except OSError: + pass # Socket closed during shutdown + finally: + if self._server_socket: + try: + self._server_socket.close() + except Exception: + pass + + def stop(self): + """Stop the bridge and disconnect.""" + self.running = False + if hasattr(self, '_server_socket') and self._server_socket: + try: + self._server_socket.close() + except Exception: + pass + self._server_socket = None + if self.socket: + try: + self.socket.close() + except Exception: + pass + self.socket = None + + def _run_loop(self): + """Main event loop (runs on background thread if non-blocking).""" + try: + while self.running: + # Read message length (4 bytes, big-endian u32) + length_data = self._recv_exact(4) + if not length_data: + break + + length = struct.unpack('>I', length_data)[0] + + # Read message body (bincode-encoded ViewerEvent) + message_data = self._recv_exact(length) + if not message_data: + break + + # Decode and dispatch + self._handle_message(message_data) + + except Exception as e: + print(f"Bridge error: {e}") + import traceback + traceback.print_exc() + + finally: + self.running = False + if self.socket: + try: + self.socket.close() + except: + pass + + # Notify disconnect handlers + for handler in self._disconnect_handlers: + try: + handler() + except Exception as e: + print(f"Disconnect handler error: {e}") + + def _recv_exact(self, n: int) -> Optional[bytes]: + """Receive exactly n bytes or return None on disconnect.""" + data = b'' + while len(data) < n: + chunk = self.socket.recv(n - len(data)) + if not chunk: + return None + data += chunk + return data + + def _handle_message(self, data: bytes): + """Parse and dispatch a viewer event message (bincode-encoded).""" + try: + event = decode_viewer_event(data) + + if isinstance(event, ClickEvent): + for handler in self._click_handlers: + try: + handler(event) + except Exception as e: + print(f"Click handler error: {e}") + import traceback + traceback.print_exc() + + elif isinstance(event, WaypointCompleteEvent): + route = WaypointRoute(waypoints=event.waypoints) + + for handler in self._waypoint_handlers: + try: + handler(route) + except Exception as e: + print(f"Waypoint handler error: {e}") + import traceback + traceback.print_exc() + + elif isinstance(event, ModeChangedEvent): + for handler in self._mode_changed_handlers: + try: + handler(event.mode) + except Exception as e: + print(f"Mode handler error: {e}") + import traceback + traceback.print_exc() + + elif isinstance(event, DisconnectEvent): + self.running = False + + else: + print(f"Unknown event type: {type(event)}") + + except Exception as e: + print(f"Failed to parse message: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + # Example usage + bridge = ViewerBridge(port=8888) + + @bridge.on_click + def handle_click(event: ClickEvent): + print(f"Clicked at ({event.x:.2f}, {event.y:.2f}, {event.z:.2f})") + if event.entity_path: + print(f" Entity: {event.entity_path}") + print(f" View: {event.view_id}") + + @bridge.on_waypoint_complete + def handle_route(route: WaypointRoute): + print(f"Route completed: {len(route.waypoints)} waypoints") + print(f"Total distance: {route.total_distance:.2f} meters") + for i, (x, y, z) in enumerate(route.waypoints): + print(f" {i+1}. ({x:.2f}, {y:.2f}, {z:.2f})") + + @bridge.on_disconnect + def handle_disconnect(): + print("Viewer disconnected") + + print("Waiting for viewer connection...") + try: + bridge.start(blocking=True) + except KeyboardInterrupt: + print("\nShutting down...") + bridge.stop() diff --git a/examples/rust/custom_callback/python_bridge/ground_plane.py b/examples/rust/custom_callback/python_bridge/ground_plane.py new file mode 100644 index 000000000000..3dfbfddb80b4 --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/ground_plane.py @@ -0,0 +1,72 @@ +"""Invisible ground plane for click-anywhere support. + +Rerun's picking system only reports coordinates when clicking on an entity. +This module logs a large, nearly-invisible ground plane mesh so that clicks +on "empty" floor space still produce world-space coordinates. + +Usage: + from ground_plane import log_ground_plane + + # After rr.init() and rr.connect_grpc(): + log_ground_plane() # logs at z=0, 100m x 100m + + # Custom size/height: + log_ground_plane(size=200.0, z=0.5, entity_path="world/floor") +""" + +import rerun as rr +import numpy as np + + +def log_ground_plane( + size: float = 100.0, + z: float = 0.0, + entity_path: str = "world/ground_plane", + opacity: int = 1, + color: tuple = (40, 40, 40), + subdivisions: int = 10, +) -> None: + """Log an invisible ground plane mesh for click-anywhere support. + + Args: + size: Half-extent of the plane in meters (default 100 = 200m x 200m total) + z: Height of the ground plane (default 0.0) + entity_path: Rerun entity path for the plane + opacity: Alpha value 0-255 (default 1 = nearly invisible) + color: RGB color tuple (default dark gray) + subdivisions: Grid subdivisions (more = better picking accuracy) + """ + # Generate a subdivided grid mesh for better picking resolution. + # A single quad has poor pick accuracy at large scales because the + # GPU interpolation produces imprecise barycentric coords. + steps = subdivisions + 1 + xs = np.linspace(-size, size, steps) + ys = np.linspace(-size, size, steps) + xx, yy = np.meshgrid(xs, ys) + zz = np.full_like(xx, z) + + # Vertices: (steps * steps) points + vertices = np.stack([xx.flatten(), yy.flatten(), zz.flatten()], axis=-1).astype(np.float32) + + # Triangles: 2 per grid cell + triangles = [] + for row in range(subdivisions): + for col in range(subdivisions): + i = row * steps + col + triangles.append([i, i + 1, i + steps]) + triangles.append([i + 1, i + steps + 1, i + steps]) + + triangle_indices = np.array(triangles, dtype=np.uint32) + + # RGBA color with near-zero alpha + rgba = [color[0], color[1], color[2], opacity] + vertex_colors = np.tile(rgba, (len(vertices), 1)).astype(np.uint8) + + rr.log( + entity_path, + rr.Mesh3D( + vertex_positions=vertices, + triangle_indices=triangle_indices, + vertex_colors=vertex_colors, + ), + ) diff --git a/examples/rust/custom_callback/python_bridge/test_bridge.py b/examples/rust/custom_callback/python_bridge/test_bridge.py new file mode 100644 index 000000000000..1b01421b8540 --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/test_bridge.py @@ -0,0 +1,215 @@ +"""Unit tests for ViewerBridge.""" + +import unittest +import socket +import struct +import json +import threading +import time +from bridge import ViewerBridge, ClickEvent, WaypointRoute, InteractionMode + + +class MockViewerServer: + """Mock TCP server simulating the viewer side.""" + + def __init__(self, port: int = 8889): + self.port = port + self.server = None + self.client = None + self.thread = None + self.running = False + self.received_commands = [] + + def start(self): + """Start the mock server in a background thread.""" + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind(("127.0.0.1", self.port)) + self.server.listen(1) + + self.running = True + self.thread = threading.Thread(target=self._accept_loop, daemon=True) + self.thread.start() + time.sleep(0.1) # Give server time to start + + def stop(self): + """Stop the mock server.""" + self.running = False + if self.client: + try: + self.client.close() + except: + pass + if self.server: + try: + self.server.close() + except: + pass + + def _accept_loop(self): + """Accept client connection.""" + try: + self.client, _ = self.server.accept() + except: + return + + def send_click_event(self, x: float, y: float, z: float, entity_path: str = None): + """Send a click event to the connected client.""" + event = { + "type": "Click", + "position": [x, y, z], + "entity_path": entity_path, + "view_id": "test_view", + "timestamp_ms": int(time.time() * 1000), + "is_2d": False, + } + self._send_json(event) + + def send_waypoint_complete(self, waypoints: list): + """Send a waypoint complete event.""" + event = { + "type": "WaypointComplete", + "waypoints": waypoints, + } + self._send_json(event) + + def _send_json(self, data: dict): + """Send JSON message with length prefix.""" + if not self.client: + raise RuntimeError("No client connected") + + payload = json.dumps(data).encode('utf-8') + length = struct.pack('>I', len(payload)) + self.client.sendall(length + payload) + + def read_command(self, timeout: float = 1.0) -> dict: + """Read a command sent by the client.""" + if not self.client: + raise RuntimeError("No client connected") + + self.client.settimeout(timeout) + try: + # Read length + length_data = self.client.recv(4) + if not length_data: + return None + + length = struct.unpack('>I', length_data)[0] + + # Read payload + payload = b'' + while len(payload) < length: + chunk = self.client.recv(length - len(payload)) + if not chunk: + return None + payload += chunk + + return json.loads(payload.decode('utf-8')) + + except socket.timeout: + return None + + +class TestViewerBridge(unittest.TestCase): + """Test cases for ViewerBridge.""" + + def setUp(self): + """Set up mock server and bridge for each test.""" + self.server = MockViewerServer(port=8889) + self.server.start() + + self.bridge = ViewerBridge(port=8889) + + self.click_events = [] + self.waypoint_events = [] + + def tearDown(self): + """Clean up after each test.""" + self.bridge.stop() + self.server.stop() + + def test_click_event_reception(self): + """Test receiving a click event from the viewer.""" + @self.bridge.on_click + def handle_click(event: ClickEvent): + self.click_events.append(event) + + # Start bridge in non-blocking mode + self.bridge.start(blocking=False) + time.sleep(0.2) # Wait for connection + + # Send click event from mock viewer + self.server.send_click_event(1.0, 2.0, 3.0, "world/robot") + + # Wait for event to be processed + time.sleep(0.2) + + # Verify event was received + self.assertEqual(len(self.click_events), 1) + event = self.click_events[0] + self.assertAlmostEqual(event.x, 1.0) + self.assertAlmostEqual(event.y, 2.0) + self.assertAlmostEqual(event.z, 3.0) + self.assertEqual(event.entity_path, "world/robot") + self.assertEqual(event.position, (1.0, 2.0, 3.0)) + + def test_waypoint_event_reception(self): + """Test receiving a waypoint complete event.""" + @self.bridge.on_waypoint_complete + def handle_route(route: WaypointRoute): + self.waypoint_events.append(route) + + self.bridge.start(blocking=False) + time.sleep(0.2) + + waypoints = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]] + self.server.send_waypoint_complete(waypoints) + + time.sleep(0.2) + + self.assertEqual(len(self.waypoint_events), 1) + route = self.waypoint_events[0] + self.assertEqual(len(route.waypoints), 3) + self.assertEqual(route.waypoints[0], (1.0, 2.0, 3.0)) + self.assertTrue(route.total_distance > 0) + + def test_send_command(self): + """Test sending commands to the viewer.""" + self.bridge.start(blocking=False) + time.sleep(0.2) + + # Send mode change command + self.bridge.set_mode(InteractionMode.CLICK) + + # Read command from mock server + cmd = self.server.read_command(timeout=1.0) + + self.assertIsNotNone(cmd) + self.assertEqual(cmd["type"], "SetMode") + self.assertEqual(cmd["mode"], "click") + + def test_multiple_handlers(self): + """Test that multiple handlers can be registered for the same event.""" + calls_1 = [] + calls_2 = [] + + @self.bridge.on_click + def handler1(event): + calls_1.append(event) + + @self.bridge.on_click + def handler2(event): + calls_2.append(event) + + self.bridge.start(blocking=False) + time.sleep(0.2) + + self.server.send_click_event(1.0, 2.0, 3.0) + time.sleep(0.2) + + self.assertEqual(len(calls_1), 1) + self.assertEqual(len(calls_2), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/examples/rust/custom_callback/python_bridge/test_bridge_bincode.py b/examples/rust/custom_callback/python_bridge/test_bridge_bincode.py new file mode 100644 index 000000000000..6632237aeadc --- /dev/null +++ b/examples/rust/custom_callback/python_bridge/test_bridge_bincode.py @@ -0,0 +1,286 @@ +"""Unit tests for ViewerBridge with bincode protocol.""" + +import unittest +import socket +import struct +import threading +import time +import os +from bincode_codec import ( + BincodeEncoder, BincodeDecoder, ClickEvent, WaypointCompleteEvent, + SetModeCommand, ClearWaypointsCommand, SetCursorCommand, + encode_app_command, decode_viewer_event, +) +from bridge_bincode import ViewerBridge, WaypointRoute, InteractionMode + + +def _free_port(): + """Get a free port from the OS.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 0)) + return s.getsockname()[1] + + +class MockViewerClient: + """Mock TCP client that simulates the Rust viewer connecting to the Python bridge. + + In the real architecture: + - Python ViewerBridge is the SERVER (listens on port) + - Rust viewer is the CLIENT (connects to bridge) + + So the mock simulates the Rust side: connect, send events, receive commands. + """ + + def __init__(self): + self.socket = None + self.received_commands = [] + + def connect(self, host='127.0.0.1', port=8888, timeout=5.0): + """Connect to the bridge server.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((host, port)) + return + except ConnectionRefusedError: + self.socket.close() + time.sleep(0.05) + raise ConnectionError(f"Could not connect to bridge at {host}:{port}") + + def send_click_event(self, x, y, z, entity_path=None, view_id="view_123", is_2d=False): + """Send a Click event to the bridge.""" + encoder = BincodeEncoder() + encoder.write_u32(0) # Click variant + encoder.write_f32_array3(x, y, z) + + if entity_path is None: + encoder.buffer.append(0) # Option::None + else: + encoder.buffer.append(1) # Option::Some + encoder.write_string(entity_path) + + encoder.write_string(view_id) + encoder.write_u64(int(time.time() * 1000)) + encoder.write_bool(is_2d) + + data = encoder.bytes() + length = struct.pack('>I', len(data)) + self.socket.sendall(length + data) + + def send_waypoint_complete_event(self, waypoints): + """Send a WaypointComplete event to the bridge.""" + encoder = BincodeEncoder() + encoder.write_u32(1) # WaypointComplete variant + encoder.write_vec_f32_array3(waypoints) + + data = encoder.bytes() + length = struct.pack('>I', len(data)) + self.socket.sendall(length + data) + + def recv_command(self, timeout=2.0): + """Receive a command from the bridge.""" + self.socket.settimeout(timeout) + + length_data = self._recv_exact(4) + if not length_data: + return None + + length = struct.unpack('>I', length_data)[0] + + data = self._recv_exact(length) + self.received_commands.append(data) + return data + + def _recv_exact(self, n): + data = b'' + while len(data) < n: + chunk = self.socket.recv(n - len(data)) + if not chunk: + return None + data += chunk + return data + + def close(self): + if self.socket: + try: + self.socket.close() + except Exception: + pass + self.socket = None + + +class TestViewerBridgeBincode(unittest.TestCase): + """Test suite for ViewerBridge with bincode protocol.""" + + def setUp(self): + self.port = _free_port() + self.bridge = None + self.mock = None + + def tearDown(self): + if self.mock: + self.mock.close() + if self.bridge: + self.bridge.stop() + # Give sockets time to release + time.sleep(0.05) + + def _start_bridge_and_connect(self): + """Helper: start bridge in background, connect mock client.""" + self.bridge = ViewerBridge(port=self.port) + # Start bridge (server) in background + self.bridge.start(blocking=False) + time.sleep(0.1) + + # Connect mock viewer client + self.mock = MockViewerClient() + self.mock.connect(port=self.port) + time.sleep(0.1) + + def test_click_event_reception(self): + """Test receiving and handling Click events.""" + click_received = threading.Event() + received_event = None + + self.bridge = ViewerBridge(port=self.port) + + def handle_click(event): + nonlocal received_event + received_event = event + click_received.set() + + self.bridge.on_click(handle_click) + self.bridge.start(blocking=False) + time.sleep(0.1) + + self.mock = MockViewerClient() + self.mock.connect(port=self.port) + time.sleep(0.1) + + # Send click event from mock viewer + self.mock.send_click_event(1.5, 2.5, 3.5, entity_path="world/robot", view_id="3d_view") + + self.assertTrue(click_received.wait(timeout=2.0), "Click event not received") + + self.assertIsNotNone(received_event) + self.assertAlmostEqual(received_event.position[0], 1.5, places=1) + self.assertAlmostEqual(received_event.position[1], 2.5, places=1) + self.assertAlmostEqual(received_event.position[2], 3.5, places=1) + self.assertEqual(received_event.entity_path, "world/robot") + self.assertEqual(received_event.view_id, "3d_view") + self.assertFalse(received_event.is_2d) + + def test_waypoint_complete_reception(self): + """Test receiving and handling WaypointComplete events.""" + waypoint_received = threading.Event() + received_route = None + + self.bridge = ViewerBridge(port=self.port) + + def handle_waypoints(route): + nonlocal received_route + received_route = route + waypoint_received.set() + + self.bridge.on_waypoint_complete(handle_waypoints) + self.bridge.start(blocking=False) + time.sleep(0.1) + + self.mock = MockViewerClient() + self.mock.connect(port=self.port) + time.sleep(0.1) + + waypoints = [(1.0, 2.0, 3.0), (4.0, 5.0, 6.0), (7.0, 8.0, 9.0)] + self.mock.send_waypoint_complete_event(waypoints) + + self.assertTrue(waypoint_received.wait(timeout=2.0), "Waypoint event not received") + + self.assertIsNotNone(received_route) + self.assertEqual(len(received_route.waypoints), 3) + self.assertAlmostEqual(received_route.waypoints[0][0], 1.0, places=1) + self.assertAlmostEqual(received_route.waypoints[1][0], 4.0, places=1) + self.assertAlmostEqual(received_route.waypoints[2][0], 7.0, places=1) + self.assertGreater(received_route.total_distance, 0) + + def test_send_set_mode_command(self): + """Test sending SetMode command.""" + self._start_bridge_and_connect() + + self.bridge.set_mode(InteractionMode.WAYPOINT) + + cmd_data = self.mock.recv_command(timeout=2.0) + self.assertIsNotNone(cmd_data, "Command not received") + + decoder = BincodeDecoder(cmd_data) + variant = decoder.read_u32() + mode = decoder.read_string() + + self.assertEqual(variant, 0) # SetMode variant + self.assertEqual(mode, "waypoint") + + def test_send_clear_waypoints_command(self): + """Test sending ClearWaypoints command.""" + self._start_bridge_and_connect() + + self.bridge.clear_waypoints() + + cmd_data = self.mock.recv_command(timeout=2.0) + self.assertIsNotNone(cmd_data) + + decoder = BincodeDecoder(cmd_data) + variant = decoder.read_u32() + self.assertEqual(variant, 1) # ClearWaypoints variant + + def test_send_set_cursor_command(self): + """Test sending SetCursor command.""" + self._start_bridge_and_connect() + + self.bridge.set_cursor("crosshair") + + cmd_data = self.mock.recv_command(timeout=2.0) + self.assertIsNotNone(cmd_data) + + decoder = BincodeDecoder(cmd_data) + variant = decoder.read_u32() + cursor = decoder.read_string() + + self.assertEqual(variant, 2) # SetCursor variant + self.assertEqual(cursor, "crosshair") + + def test_multiple_click_handlers(self): + """Test registering multiple click handlers.""" + clicks = [] + done = threading.Event() + + self.bridge = ViewerBridge(port=self.port) + + @self.bridge.on_click + def handler1(event): + clicks.append(("handler1", event.position)) + if len(clicks) >= 2: + done.set() + + @self.bridge.on_click + def handler2(event): + clicks.append(("handler2", event.position)) + if len(clicks) >= 2: + done.set() + + self.bridge.start(blocking=False) + time.sleep(0.1) + + self.mock = MockViewerClient() + self.mock.connect(port=self.port) + time.sleep(0.1) + + self.mock.send_click_event(1.0, 2.0, 3.0) + + self.assertTrue(done.wait(timeout=2.0), "Handlers not called") + self.assertEqual(len(clicks), 2) + self.assertEqual(clicks[0][0], "handler1") + self.assertEqual(clicks[1][0], "handler2") + + +if __name__ == '__main__': + unittest.main() diff --git a/examples/rust/custom_callback/src/interaction/handle.rs b/examples/rust/custom_callback/src/interaction/handle.rs new file mode 100644 index 000000000000..929c3ff12e3c --- /dev/null +++ b/examples/rust/custom_callback/src/interaction/handle.rs @@ -0,0 +1,131 @@ +use tokio::sync::mpsc; +use super::protocol::ViewerEvent; + +/// Handle for sending interaction events from the viewer to the application. +/// +/// This is designed to be cheap to clone and thread-safe, so it can be embedded +/// in ViewerContext and shared across all views and UI components. +#[derive(Clone)] +pub struct InteractionHandle { + tx: mpsc::UnboundedSender, +} + +impl InteractionHandle { + /// Create a new handle from a channel sender. + pub fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Send a click event to the application. + pub fn send_click( + &self, + position: [f32; 3], + entity_path: Option, + view_id: String, + is_2d: bool, + ) { + let event = ViewerEvent::Click { + position, + entity_path, + view_id, + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + is_2d, + }; + + // Log if send fails, but don't panic + if let Err(e) = self.tx.send(event) { + eprintln!("Failed to send click event: {}", e); + } + } + + /// Send a waypoint completion event. + pub fn send_waypoint_complete(&self, waypoints: Vec<[f32; 3]>) { + let event = ViewerEvent::WaypointComplete { waypoints }; + + if let Err(e) = self.tx.send(event) { + eprintln!("Failed to send waypoint complete event: {}", e); + } + } + + /// Send a mode changed event. + pub fn send_mode_changed(&self, mode: String) { + let event = ViewerEvent::ModeChanged { mode }; + + if let Err(e) = self.tx.send(event) { + eprintln!("Failed to send mode changed event: {}", e); + } + } + + /// Send a disconnect event. + pub fn send_disconnect(&self) { + let event = ViewerEvent::Disconnect; + + let _ = self.tx.send(event); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_handle_send_click() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let handle = InteractionHandle::new(tx); + + handle.send_click( + [1.0, 2.0, 3.0], + Some("world/robot".to_string()), + "view_123".to_string(), + false, + ); + + let event = rx.try_recv().unwrap(); + match event { + ViewerEvent::Click { position, entity_path, view_id, is_2d, .. } => { + assert_eq!(position, [1.0, 2.0, 3.0]); + assert_eq!(entity_path, Some("world/robot".to_string())); + assert_eq!(view_id, "view_123"); + assert!(!is_2d); + } + _ => panic!("Expected Click event"), + } + } + + #[test] + fn test_handle_send_waypoint_complete() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let handle = InteractionHandle::new(tx); + + let waypoints = vec![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]; + handle.send_waypoint_complete(waypoints.clone()); + + let event = rx.try_recv().unwrap(); + match event { + ViewerEvent::WaypointComplete { waypoints: w } => { + assert_eq!(w, waypoints); + } + _ => panic!("Expected WaypointComplete event"), + } + } + + #[test] + fn test_handle_is_cloneable() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let handle1 = InteractionHandle::new(tx); + let handle2 = handle1.clone(); + + // Both handles should work + handle1.send_mode_changed("click".to_string()); + handle2.send_mode_changed("waypoint".to_string()); + + let event1 = rx.try_recv().unwrap(); + let event2 = rx.try_recv().unwrap(); + + assert!(matches!(event1, ViewerEvent::ModeChanged { .. })); + assert!(matches!(event2, ViewerEvent::ModeChanged { .. })); + } +} diff --git a/examples/rust/custom_callback/src/interaction/mod.rs b/examples/rust/custom_callback/src/interaction/mod.rs new file mode 100644 index 000000000000..c6f1bef0c479 --- /dev/null +++ b/examples/rust/custom_callback/src/interaction/mod.rs @@ -0,0 +1,7 @@ +pub mod handle; +pub mod protocol; +pub mod sender; + +pub use handle::InteractionHandle; +pub use protocol::{ViewerEvent, AppCommand}; +pub use sender::{ViewerEventSender, ViewerEventSenderHandle}; diff --git a/examples/rust/custom_callback/src/interaction/protocol.rs b/examples/rust/custom_callback/src/interaction/protocol.rs new file mode 100644 index 000000000000..32efad37325a --- /dev/null +++ b/examples/rust/custom_callback/src/interaction/protocol.rs @@ -0,0 +1,86 @@ +use std::io::{self, ErrorKind}; +use serde::{Deserialize, Serialize}; + +/// Events sent from the viewer to the application. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum ViewerEvent { + /// User clicked in a spatial view. + Click { + position: [f32; 3], + entity_path: Option, + view_id: String, + timestamp_ms: u64, + is_2d: bool, + }, + + /// Waypoint sequence completed. + WaypointComplete { + waypoints: Vec<[f32; 3]>, + }, + + /// Interaction mode changed. + ModeChanged { + mode: String, + }, + + /// Viewer is disconnecting. + Disconnect, +} + +/// Commands sent from the application to the viewer. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum AppCommand { + /// Change the interaction mode. + SetMode { + mode: String, + }, + + /// Clear all waypoint markers. + ClearWaypoints, + + /// Set the cursor style. + SetCursor { + cursor: String, + }, +} + +impl ViewerEvent { + pub fn encode(&self) -> io::Result> { + bincode::serialize(self).map_err(|err| io::Error::new(ErrorKind::InvalidData, err)) + } + + pub fn decode(data: &[u8]) -> io::Result { + bincode::deserialize(data).map_err(|err| io::Error::new(ErrorKind::InvalidData, err)) + } +} + +impl AppCommand { + pub fn encode(&self) -> io::Result> { + bincode::serialize(self).map_err(|err| io::Error::new(ErrorKind::InvalidData, err)) + } + + pub fn decode(data: &[u8]) -> io::Result { + bincode::deserialize(data).map_err(|err| io::Error::new(ErrorKind::InvalidData, err)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_viewer_event_click_roundtrip() { + let event = ViewerEvent::Click { + position: [1.0, 2.0, 3.0], + entity_path: Some("world/robot".to_string()), + view_id: "view_123".to_string(), + timestamp_ms: 1234567890, + is_2d: false, + }; + + let encoded = event.encode().unwrap(); + let decoded = ViewerEvent::decode(&encoded).unwrap(); + + assert_eq!(event, decoded); + } +} diff --git a/examples/rust/custom_callback/src/interaction/sender.rs b/examples/rust/custom_callback/src/interaction/sender.rs new file mode 100644 index 000000000000..5157ccbfe9ad --- /dev/null +++ b/examples/rust/custom_callback/src/interaction/sender.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; +use std::time::Duration; + +use rerun::external::re_log; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; +use tokio::sync::Mutex; + +use super::protocol::ViewerEvent; + +/// Sends ViewerEvents to a Python bridge over TCP. +/// +/// This sender connects to the Python ViewerBridge (which acts as a server) +/// and sends length-prefixed bincode messages. +#[derive(Debug)] +pub struct ViewerEventSender { + address: String, + tx: UnboundedSender, + rx: Arc>>, +} + +/// A cloneable handle for sending events. +#[derive(Clone)] +pub struct ViewerEventSenderHandle { + tx: UnboundedSender, +} + +impl ViewerEventSenderHandle { + /// Send a ViewerEvent (non-blocking, queued). + pub fn send(&self, event: ViewerEvent) -> Result<(), tokio::sync::mpsc::error::SendError> { + self.tx.send(event) + } +} + +impl ViewerEventSender { + /// Create a new sender (not yet connected). + pub fn new(address: String) -> Self { + #[expect(clippy::disallowed_methods)] + let (tx, rx) = unbounded_channel(); + Self { + address, + tx, + rx: Arc::new(Mutex::new(rx)), + } + } + + /// Get a cloneable handle for sending events. + pub fn handle(&self) -> ViewerEventSenderHandle { + ViewerEventSenderHandle { + tx: self.tx.clone(), + } + } + + /// Run the sender (connect and send events in a loop). + /// This should be spawned in a tokio task. + pub async fn run(self) { + re_log::info!("ViewerEventSender: Starting"); + + loop { + match TcpStream::connect(&self.address).await { + Ok(mut socket) => { + re_log::info!("ViewerEventSender: Connected to {}", self.address); + + // Send events until connection fails + let mut rx = self.rx.lock().await; + while let Some(event) = rx.recv().await { + match Self::send_event(&mut socket, &event).await { + Ok(()) => { + re_log::debug!("ViewerEventSender: Sent event: {:?}", event); + } + Err(err) => { + re_log::error!("ViewerEventSender: Failed to send event: {:?}", err); + break; // Connection lost, reconnect + } + } + } + drop(rx); // Release lock before reconnecting + } + Err(err) => { + re_log::error!("ViewerEventSender: Failed to connect to {}: {:?}", self.address, err); + } + } + + // Wait before reconnecting + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + + /// Send a single event (length-prefixed bincode). + async fn send_event(socket: &mut TcpStream, event: &ViewerEvent) -> tokio::io::Result<()> { + let encoded = event.encode()?; + let len = encoded.len() as u32; + + // Send length prefix (4 bytes, big-endian) + socket.write_all(&len.to_be_bytes()).await?; + + // Send payload + socket.write_all(&encoded).await?; + + Ok(()) + } +} diff --git a/examples/rust/custom_callback/src/lib.rs b/examples/rust/custom_callback/src/lib.rs index 78e7ab722a40..a4f218c74871 100644 --- a/examples/rust/custom_callback/src/lib.rs +++ b/examples/rust/custom_callback/src/lib.rs @@ -1,2 +1,4 @@ pub mod comms; pub mod panel; + +pub mod interaction; diff --git a/examples/rust/custom_callback/src/viewer.rs b/examples/rust/custom_callback/src/viewer.rs index 8fcd083199cf..cf1cf8030a7e 100644 --- a/examples/rust/custom_callback/src/viewer.rs +++ b/examples/rust/custom_callback/src/viewer.rs @@ -1,4 +1,9 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + use custom_callback::comms::viewer::ControlViewer; +use custom_callback::interaction::{ViewerEvent, ViewerEventSender}; use custom_callback::panel::Control; use rerun::external::{eframe, re_crash_handler, re_grpc_server, re_log, re_memory, re_viewer}; @@ -9,8 +14,14 @@ use rerun::external::{eframe, re_crash_handler, re_grpc_server, re_log, re_memor static GLOBAL: re_memory::AccountingAllocator = re_memory::AccountingAllocator::new(mimalloc::MiMalloc); -/// Port used for control messages -const CONTROL_PORT: u16 = 8888; +/// Port used for control messages (old protocol) +const CONTROL_PORT: u16 = 8889; +/// Port used for sending click events to Python bridge (new protocol) +const BRIDGE_PORT: u16 = 8888; +/// Minimum time between click events (debouncing) +const CLICK_DEBOUNCE_MS: u64 = 100; +/// Maximum rapid clicks to log as warning +const RAPID_CLICK_THRESHOLD: usize = 5; #[tokio::main] async fn main() -> Result<(), Box> { @@ -30,7 +41,7 @@ async fn main() -> Result<(), Box> { re_grpc_server::shutdown::never(), ); - // First we attempt to connect to the external application + // Connect to the external application (old demo protocol on port 8889) let viewer = ControlViewer::connect(format!("127.0.0.1:{CONTROL_PORT}")).await?; let handle = viewer.handle(); @@ -39,6 +50,19 @@ async fn main() -> Result<(), Box> { viewer.run().await; }); + // Create ViewerEventSender for sending click events to Python bridge (port 8888) + let event_sender = ViewerEventSender::new(format!("127.0.0.1:{BRIDGE_PORT}")); + let event_sender_handle = event_sender.handle(); + + // Spawn the event sender + tokio::spawn(async move { + event_sender.run().await; + }); + + // State for debouncing and rapid click detection + let last_click_time = Rc::new(RefCell::new(Instant::now())); + let rapid_click_count = Rc::new(RefCell::new(0usize)); + // Then we start the Rerun viewer let mut native_options = re_viewer::native::eframe_options(None); native_options.viewport = native_options @@ -48,8 +72,106 @@ async fn main() -> Result<(), Box> { // This is used for analytics, if the `analytics` feature is on in `Cargo.toml` let app_env = re_viewer::AppEnvironment::Custom("My Custom Callback".to_owned()); - let startup_options = re_viewer::StartupOptions::default(); - let window_title = "Rerun Control Panel"; + let startup_options = re_viewer::StartupOptions { + on_event: Some(Rc::new({ + let last_click_time = last_click_time.clone(); + let rapid_click_count = rapid_click_count.clone(); + + move |event: re_viewer::ViewerEvent| { + // Handle selection changes with position data + if let re_viewer::ViewerEventKind::SelectionChange { items } = event.kind { + let mut has_position = false; + let mut no_position_count = 0; + + for item in items { + match item { + re_viewer::SelectionChangeItem::Entity { + entity_path, + view_name, + position: Some(pos), + .. + } => { + has_position = true; + + // Debouncing: check time since last click + let now = Instant::now(); + let elapsed = now.duration_since(*last_click_time.borrow()); + + if elapsed < Duration::from_millis(CLICK_DEBOUNCE_MS) { + // Rapid click detected + let mut count = rapid_click_count.borrow_mut(); + *count += 1; + + if *count == RAPID_CLICK_THRESHOLD { + re_log::warn!( + "Rapid click detected ({} clicks within {}ms). Events may be dropped.", + RAPID_CLICK_THRESHOLD, + CLICK_DEBOUNCE_MS + ); + } + + // Skip this click event (debounced) + continue; + } else { + // Reset rapid click counter + *rapid_click_count.borrow_mut() = 0; + } + + // Update last click time + *last_click_time.borrow_mut() = now; + + // Get current timestamp + let timestamp_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + // Convert to ViewerEvent::Click + let click_event = ViewerEvent::Click { + position: [pos.x, pos.y, pos.z], + entity_path: Some(entity_path.to_string()), + view_id: view_name.unwrap_or_else(|| "unknown_view".to_string()), + timestamp_ms, + is_2d: pos.z.abs() < 0.001, // Heuristic: if z is near 0, it's 2D + }; + + // Send to Python bridge + if let Err(err) = event_sender_handle.send(click_event) { + re_log::error!("Failed to send click event: {:?}", err); + } else { + re_log::debug!( + "Click event sent: entity={}, pos=({:.2}, {:.2}, {:.2})", + entity_path, + pos.x, + pos.y, + pos.z + ); + } + } + re_viewer::SelectionChangeItem::Entity { position: None, .. } => { + // Entity selection without position data (hover, keyboard nav, etc.) + no_position_count += 1; + } + _ => { + // Other selection types (space view, data result, etc.) + } + } + } + + // Log edge cases for debugging + if !has_position && no_position_count > 0 { + re_log::trace!( + "Selection change without position data ({} items). This is normal for hover/keyboard navigation.", + no_position_count + ); + } + } + } + })), + ..Default::default() + }; + + let window_title = "Rerun Interactive Viewer"; eframe::run_native( window_title, native_options,