diff --git a/embodichain/lab/sim/cfg.py b/embodichain/lab/sim/cfg.py index 0b10a725..fd653d35 100644 --- a/embodichain/lab/sim/cfg.py +++ b/embodichain/lab/sim/cfg.py @@ -181,6 +181,23 @@ class WindowRecordCfg: """Video file prefix used when no explicit save path is provided.""" +@configclass +class WindowCaptureCfg: + """Configuration for interactive viewer window capture.""" + + enable_hotkey: bool = True + """Whether to register the ``p`` hotkey for single-frame capture when the window opens.""" + + save_path: str | None = None + """Optional output image path. If None, use the default outputs directory with a timestamp.""" + + image_prefix: str = "window_capture" + """Image file prefix used when no explicit save path is provided.""" + + use_window_pose: bool = True + """Whether the hidden capture camera follows the current viewer pose.""" + + @configclass class GPUMemoryCfg: """A gpu memory configuration dataclass that neatly holds all parameters that configure physics GPU memory for simulation""" diff --git a/embodichain/lab/sim/sim_manager.py b/embodichain/lab/sim/sim_manager.py index 8f2e257f..fbfb13c2 100644 --- a/embodichain/lab/sim/sim_manager.py +++ b/embodichain/lab/sim/sim_manager.py @@ -53,7 +53,7 @@ from dexsim.core import TASK_RETURN from dexsim.engine import CudaArray, Material from dexsim.models import MeshObject -from dexsim.render import Light as _Light, LightType, Windows +from dexsim.render import Light as _Light, LightType, Windows, ViewFlags from dexsim.engine import GizmoController, ObjectManipulator from embodichain.lab.sim.objects import ( @@ -79,6 +79,7 @@ MarkerCfg, GPUMemoryCfg, WindowRecordCfg, + WindowCaptureCfg, LightCfg, RigidObjectCfg, SoftObjectCfg, @@ -88,6 +89,7 @@ RobotCfg, ) from embodichain.lab.sim import VisualMaterial, VisualMaterialCfg +from embodichain.utils.math import look_at_to_pose from embodichain.utils import configclass, logger __all__ = [ @@ -152,6 +154,32 @@ class SimulationManagerCfg: window_record: WindowRecordCfg = field(default_factory=WindowRecordCfg) """Viewer window recording settings (hotkey, paths, FPS, memory budget).""" + window_capture: WindowCaptureCfg = field(default_factory=WindowCaptureCfg) + """Viewer window single-frame capture settings (hotkey, paths).""" + + +@dataclass +class _WindowCaptureRequest: + """Pending single-frame window capture parameters.""" + + save_path: str | None + width: int | None + height: int | None + camera_pose: np.ndarray | torch.Tensor | None + eye: Sequence[float] | None + target: Sequence[float] | None + up: Sequence[float] | None + use_window_pose: bool + + +@dataclass +class _WindowCaptureAsyncState: + """Internal state for async viewer-window capture.""" + + pending: _WindowCaptureRequest | None = None + task_status: int = TASK_RETURN.TASK_LOOP + loop_handle: object | None = None + @dataclass class _WindowRecordState: @@ -241,6 +269,8 @@ def __init__( self._is_registered_window_control = False self._window_record_state: _WindowRecordState | None = None self._window_record_camera: object | None = None + self._window_capture_camera: object | None = None + self._window_capture_camera_size: tuple[int, int] | None = None wr = sim_config.window_record self._window_record_hotkey_cfg: dict[str, object] | None = ( { @@ -254,6 +284,19 @@ def __init__( ) self._window_record_input_control: ObjectManipulator | None = None self._window_record_save_threads: list[threading.Thread] = [] + self._window_capture_async_state: _WindowCaptureAsyncState | None = None + self._window_capture_input_control: ObjectManipulator | None = None + self._window_capture_save_threads: list[threading.Thread] = [] + wc = sim_config.window_capture + self._window_capture_hotkey_cfg: dict[str, object] | None = ( + { + "save_path": wc.save_path, + "image_prefix": wc.image_prefix, + "use_window_pose": wc.use_window_pose, + } + if wc.enable_hotkey + else None + ) self._world.set_delta_time(sim_config.physics_dt) self._world.show_coordinate_axis(False) @@ -552,15 +595,22 @@ def open_window(self) -> None: # and self._window_record_input_control is None # ): # self.enable_window_record_hotkey(**self._window_record_hotkey_cfg) + if ( + self._window_capture_hotkey_cfg is not None + and self._window_capture_input_control is None + ): + self.enable_window_capture_hotkey(**self._window_capture_hotkey_cfg) self.is_window_opened = True def close_window(self) -> None: """Close the simulation window.""" if self.is_window_recording(): self.stop_window_record() + self._stop_window_capture_async_loop() self._world.close_window() self._window = None self._window_record_input_control = None + self._window_capture_input_control = None self.is_window_opened = False def _build_multiple_arenas(self, num: int, space: float | None = None) -> None: @@ -1695,10 +1745,331 @@ def _build_window_record_output( video_name = Path(os.path.basename(save_path)).stem return output_dir, video_name + def _build_window_capture_output( + self, save_path: str | None, image_prefix: str + ) -> str: + """Resolve the output image path for a single-frame window capture.""" + if save_path is not None: + return save_path + output_dir = os.path.join(os.getcwd(), "outputs", "window_capture") + timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + return os.path.join(output_dir, f"{image_prefix}_{timestamp}.png") + def is_window_recording(self) -> bool: """Check whether the viewer window is currently recording.""" return self._window_record_state is not None + def _get_window_capture_camera(self, width: int, height: int) -> object: + """Get or create the hidden camera used for window capture.""" + camera_size = (width, height) + if ( + self._window_capture_camera is None + or self._window_capture_camera_size != camera_size + ): + # TODO: Will change ViewFlags to ViewFlags.COLOR in the future. + camera_name = f"window_capture_camera_{self.instance_id}" + self._window_capture_camera = self._env.create_camera( + camera_name, width, height, True, ViewFlags.ALL + ) + self._window_capture_camera_size = camera_size + return self._window_capture_camera + + def _capture_rgb_from_camera( + self, record_camera: object, camera_pose: np.ndarray + ) -> np.ndarray | None: + """Render an RGB frame from a hidden DexSim camera.""" + if hasattr(record_camera, "is_open") and record_camera.is_open() is False: + record_camera.open_camera() + + record_camera.set_world_pose(camera_pose) + record_camera.render() + rgb = np.asarray(record_camera.get_rgb_map()) + if rgb.size == 0: + return None + return np.ascontiguousarray(rgb[..., :3]) + + def _resolve_window_capture_pose( + self, + camera_pose: np.ndarray | torch.Tensor | None, + eye: Sequence[float] | None, + target: Sequence[float] | None, + up: Sequence[float] | None, + use_window_pose: bool, + ) -> np.ndarray: + """Resolve the camera pose for single-frame window capture.""" + if camera_pose is not None: + if isinstance(camera_pose, torch.Tensor): + return camera_pose.detach().cpu().numpy().astype(np.float32) + return np.asarray(camera_pose, dtype=np.float32) + + if use_window_pose and self._window is not None: + return np.asarray(self._window.get_pose_matrix(), dtype=np.float32) + + if eye is None: + eye = (2.5, -3.0, 2.0) + if target is None: + target = (0.0, 0.0, 0.5) + if up is None: + up = (0.0, 0.0, 1.0) + + pose = look_at_to_pose(eye, target, up) + pose[:, :3, 1] = -pose[:, :3, 1] + pose[:, :3, 2] = -pose[:, :3, 2] + return pose.squeeze(0).cpu().numpy().astype(np.float32) + + def _render_window_capture_frame( + self, + width: int | None = None, + height: int | None = None, + camera_pose: np.ndarray | torch.Tensor | None = None, + eye: Sequence[float] | None = None, + target: Sequence[float] | None = None, + up: Sequence[float] | None = None, + use_window_pose: bool = True, + ) -> np.ndarray | None: + """Render a single RGB frame from the hidden window capture camera.""" + width = self.sim_config.width if width is None else width + height = self.sim_config.height if height is None else height + record_camera = self._get_window_capture_camera(width, height) + resolved_pose = self._resolve_window_capture_pose( + camera_pose=camera_pose, + eye=eye, + target=target, + up=up, + use_window_pose=use_window_pose, + ) + return self._capture_rgb_from_camera(record_camera, resolved_pose) + + @staticmethod + def _save_window_capture_frame(frame: np.ndarray, save_path: str) -> None: + """Save a captured RGB frame to disk.""" + from PIL import Image + + output_dir = os.path.dirname(save_path) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + Image.fromarray(frame).save(save_path) + + def _save_window_capture_worker(self, frame: np.ndarray, save_path: str) -> None: + """Save a captured frame in a background thread.""" + try: + self._save_window_capture_frame(frame, save_path) + logger.log_info(f"Window capture saved to {save_path}") + except Exception as exc: + logger.log_error(f"Failed to save window capture: {exc}") + + def capture_window( + self, + save_path: str | None = None, + width: int | None = None, + height: int | None = None, + camera_pose: np.ndarray | torch.Tensor | None = None, + eye: Sequence[float] | None = None, + target: Sequence[float] | None = None, + up: Sequence[float] | None = None, + use_window_pose: bool = True, + ) -> np.ndarray | None: + """Capture a single RGB frame using a hidden render camera. + + When a viewer window is open, the hidden camera follows the current window + pose by default. In headless mode, pass ``camera_pose`` or use the + ``eye``/``target``/``up`` look-at parameters to render without opening a + window. + + Args: + save_path: Optional image path. Parent directories are created. + width: Capture width. Defaults to the simulation window width. + height: Capture height. Defaults to the simulation window height. + camera_pose: Optional 4x4 world pose for the hidden camera. + eye: Optional look-at camera position used when no window pose exists. + target: Optional look-at target used when no window pose exists. + up: Optional look-at up vector used when no window pose exists. + use_window_pose: Whether to follow the current viewer pose when a + window is open and ``camera_pose`` is not provided. + + Returns: + Captured RGB frame with shape ``(height, width, 3)``, or ``None`` if + rendering did not produce an image. + """ + frame = self._render_window_capture_frame( + width=width, + height=height, + camera_pose=camera_pose, + eye=eye, + target=target, + up=up, + use_window_pose=use_window_pose, + ) + if frame is None: + logger.log_warning("Window capture did not produce an RGB frame.") + return None + + if save_path is not None: + self._save_window_capture_frame(frame, save_path) + logger.log_info(f"Window capture saved to {save_path}") + + return frame + + def _ensure_window_capture_async_loop(self) -> None: + """Start the render-thread loop that processes pending capture requests.""" + if self._window_capture_async_state is not None: + return + + state = _WindowCaptureAsyncState() + + def _window_capture_loop(_: float) -> int: + return self._step_window_capture_async(state) + + state.loop_handle = self._world.thread_rt().add_loop( + _window_capture_loop, 1.0 / 60.0 + ) + self._window_capture_async_state = state + + def _stop_window_capture_async_loop(self) -> None: + """Stop the async window capture loop and clear pending requests.""" + if self._window_capture_async_state is None: + return + state = self._window_capture_async_state + state.task_status = TASK_RETURN.TASK_EXIT + state.pending = None + self._window_capture_async_state = None + + def _step_window_capture_async(self, state: _WindowCaptureAsyncState) -> int: + """Process a pending capture request on the render thread.""" + if state.task_status != TASK_RETURN.TASK_LOOP: + return state.task_status + if state.pending is None: + return state.task_status + + request = state.pending + state.pending = None + frame = self._render_window_capture_frame( + width=request.width, + height=request.height, + camera_pose=request.camera_pose, + eye=request.eye, + target=request.target, + up=request.up, + use_window_pose=request.use_window_pose, + ) + if frame is None: + logger.log_warning("Window capture did not produce an RGB frame.") + return state.task_status + + save_path = request.save_path + if save_path is None: + return state.task_status + + self._window_capture_save_threads = [ + thread for thread in self._window_capture_save_threads if thread.is_alive() + ] + save_thread = threading.Thread( + target=self._save_window_capture_worker, + args=(frame.copy(), save_path), + daemon=False, + ) + save_thread.start() + self._window_capture_save_threads.append(save_thread) + return state.task_status + + def request_window_capture_async( + self, + save_path: str | None = None, + width: int | None = None, + height: int | None = None, + camera_pose: np.ndarray | torch.Tensor | None = None, + eye: Sequence[float] | None = None, + target: Sequence[float] | None = None, + up: Sequence[float] | None = None, + use_window_pose: bool = True, + image_prefix: str = "window_capture", + ) -> bool: + """Queue a single-frame window capture on the render thread. + + The capture uses the current viewer pose when a window is open. Saving to + disk is performed in a background thread so the UI loop is not blocked. + + Args: + save_path: Optional image path. If None, a timestamped path is used. + width: Capture width. Defaults to the simulation window width. + height: Capture height. Defaults to the simulation window height. + camera_pose: Optional 4x4 world pose for the hidden camera. + eye: Optional look-at camera position used when no window pose exists. + target: Optional look-at target used when no window pose exists. + up: Optional look-at up vector used when no window pose exists. + use_window_pose: Whether to follow the current viewer pose when a + window is open and ``camera_pose`` is not provided. + image_prefix: Image file prefix when ``save_path`` is None. + + Returns: + True if the capture request was queued, False otherwise. + """ + if self._window is None: + logger.log_warning( + "No simulation window available for async window capture." + ) + return False + + resolved_save_path = self._build_window_capture_output(save_path, image_prefix) + self._ensure_window_capture_async_loop() + if self._window_capture_async_state is None: + return False + + self._window_capture_async_state.pending = _WindowCaptureRequest( + save_path=resolved_save_path, + width=width, + height=height, + camera_pose=camera_pose, + eye=eye, + target=target, + up=up, + use_window_pose=use_window_pose, + ) + logger.log_info( + f"Window capture queued. It will be saved to {resolved_save_path}." + ) + return True + + def enable_window_capture_hotkey( + self, + save_path: str | None = None, + image_prefix: str = "window_capture", + use_window_pose: bool = True, + ) -> bool: + """Register the ``p`` key to queue a single-frame window capture.""" + self._window_capture_hotkey_cfg = { + "save_path": save_path, + "image_prefix": image_prefix, + "use_window_pose": use_window_pose, + } + if self._window is None: + logger.log_warning( + "No simulation window available yet. The window capture hotkey " + "will be registered after `open_window()`." + ) + return False + if self._window_capture_input_control is not None: + self._ensure_window_capture_async_loop() + return True + + from dexsim.types import InputKey + + sim = self + hotkey_cfg = dict(self._window_capture_hotkey_cfg) + + class WindowCaptureEvent(ObjectManipulator): + def on_key_down(self, key): + if key == InputKey.SCANCODE_P.value: + sim.request_window_capture_async(**hotkey_cfg) + + self._window_capture_input_control = WindowCaptureEvent() + self._window.add_input_control(self._window_capture_input_control) + self._ensure_window_capture_async_loop() + logger.log_info( + "Window capture hotkey registered. Press 'p' to capture the current view." + ) + return True + def _step_window_record(self, state: _WindowRecordState) -> int: """Capture frames in the render thread without blocking the UI loop.""" if state.task_status != TASK_RETURN.TASK_LOOP: @@ -1712,11 +2083,7 @@ def _step_window_record(self, state: _WindowRecordState) -> int: frame: np.ndarray | None = None if self._window is not None and state.record_camera is not None: pose = np.asarray(self._window.get_pose_matrix(), dtype=np.float32) - state.record_camera.set_world_pose(pose) - state.record_camera.render() - rgb = np.asarray(state.record_camera.get_rgb_map()) - if rgb.size != 0: - frame = np.ascontiguousarray(rgb[..., :3]) + frame = self._capture_rgb_from_camera(state.record_camera, pose) if frame is None: return state.task_status @@ -1773,7 +2140,7 @@ def start_window_record( if self._window_record_camera is None: camera_name = f"viewer_record_camera_{self.instance_id}" self._window_record_camera = self._env.create_camera( - camera_name, width, height + camera_name, width, height, True, ViewFlags.ALL ) record_camera = self._window_record_camera if hasattr(record_camera, "is_open") and record_camera.is_open() is False: diff --git a/scripts/tutorials/sim/create_scene.py b/scripts/tutorials/sim/create_scene.py index b8f6c727..b7507f7b 100644 --- a/scripts/tutorials/sim/create_scene.py +++ b/scripts/tutorials/sim/create_scene.py @@ -29,6 +29,8 @@ from embodichain.lab.gym.utils.gym_utils import add_env_launcher_args_to_parser from embodichain.data import get_data_path +DEFAULT_CAPTURE_PATH = "./outputs/window_capture/create_scene.png" + def main(): """Main function to create and run the simulation scene.""" @@ -38,6 +40,12 @@ def main(): description="Create a simulation scene with SimulationManager" ) add_env_launcher_args_to_parser(parser) + parser.add_argument( + "--max-steps", + type=int, + default=10000, + help="Optional number of simulation steps to run before exiting.", + ) args = parser.parse_args() # Configure the simulation @@ -92,20 +100,40 @@ def main(): print("[INFO]: Scene setup complete!") print(f"[INFO]: Running simulation with {args.num_envs} environment(s)") print("[INFO]: Press Ctrl+C to stop the simulation") + if not args.headless: + print( + "[INFO]: Press 'p' in the viewer to capture a frame " + "(saved under ./outputs/window_capture/)" + ) # Open window when the scene has been set up if not args.headless: sim.open_window() # Run the simulation - run_simulation(sim) + run_simulation( + sim, + capture_window=args.headless, + capture_path=DEFAULT_CAPTURE_PATH, + max_steps=args.max_steps, + ) -def run_simulation(sim: SimulationManager): +def run_simulation( + sim: SimulationManager, + capture_window: bool = False, + capture_path: str = DEFAULT_CAPTURE_PATH, + max_steps: int | None = None, +) -> None: """Run the simulation loop. Args: sim: The SimulationManager instance to run + capture_window: Whether to capture a single frame with the hidden window + capture camera. Enabled by default when ``--headless`` is passed. + When a viewer window is open, press ``p`` to capture asynchronously instead. + capture_path: Path where the captured image is saved. + max_steps: Optional number of steps to run before exiting. """ # Initialize GPU physics if using CUDA @@ -113,6 +141,10 @@ def run_simulation(sim: SimulationManager): sim.init_gpu_physics() step_count = 0 + capture_done = False + capture_step = 10 + if max_steps is not None: + capture_step = max(1, min(capture_step, max_steps)) try: last_time = time.time() @@ -135,6 +167,20 @@ def run_simulation(sim: SimulationManager): last_time = current_time last_step = step_count + if capture_window and not capture_done and step_count >= capture_step: + frame = sim.capture_window(save_path=capture_path) + if frame is None: + raise RuntimeError("Window capture failed to produce a frame.") + print( + f"[INFO]: Captured window frame at {capture_path} " + f"with shape {frame.shape}" + ) + capture_done = True + + if max_steps is not None and step_count >= max_steps: + print(f"[INFO]: Reached max steps: {max_steps}") + break + except KeyboardInterrupt: print("\n[INFO]: Stopping simulation...") finally: