diff --git a/isaaclab_arena/assets/object_library.py b/isaaclab_arena/assets/object_library.py index 20f8f0e10..eb8b686d1 100644 --- a/isaaclab_arena/assets/object_library.py +++ b/isaaclab_arena/assets/object_library.py @@ -4,12 +4,9 @@ # SPDX-License-Identifier: Apache-2.0 -from typing import TYPE_CHECKING, Any +from typing import Any import isaaclab.sim as sim_utils - -if TYPE_CHECKING: - from isaaclab_arena.assets.hdr_image import HDRImage from isaaclab.assets import RigidObjectCfg from isaaclab.sim.spawners.from_files.from_files_cfg import GroundPlaneCfg from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR, ISAACLAB_NUCLEUS_DIR @@ -285,6 +282,88 @@ def __init__( super().__init__(instance_name=instance_name, prim_path=prim_path, initial_pose=initial_pose, scale=scale) +# NIST insertion assets use Arena's high-precision assembly preset to mirror +# Factory/Forge dense-contact settings for small-clearance gear insertion. +@register_asset +class GearsAndBase(LibraryObject): + """NIST gear base with SDF collision (gear_base as root prim).""" + + name = "gears_and_base" + tags = ["object"] + usd_path = f"{ISAACLAB_NUCLEUS_DIR}/Arena/assets/object_library/NIST/gearbase_and_gears_gearbase_root.usd" + + spawn_cfg_addon = { + "rigid_props": RIGID_BODY_PROPS_HIGH_PRECISION.replace(disable_gravity=False, kinematic_enabled=True), + "mass_props": sim_utils.MassPropertiesCfg(mass=0.012), + "collision_props": sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0), + } + + def __init__( + self, instance_name: str | None = None, prim_path: str | None = None, initial_pose: Pose | None = None + ): + super().__init__(instance_name=instance_name, prim_path=prim_path, initial_pose=initial_pose) + + +@register_asset +class MediumNistGear(LibraryObject): + """NIST medium gear (held asset for gear insertion).""" + + name = "medium_nist_gear" + tags = ["object"] + usd_path = f"{ISAACLAB_NUCLEUS_DIR}/Arena/assets/object_library/NIST/gear_medium.usd" + + spawn_cfg_addon = { + "rigid_props": RIGID_BODY_PROPS_HIGH_PRECISION.replace(disable_gravity=True), + "mass_props": sim_utils.MassPropertiesCfg(mass=0.012), + "collision_props": sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0), + } + + def __init__( + self, instance_name: str | None = None, prim_path: str | None = None, initial_pose: Pose | None = None + ): + super().__init__(instance_name=instance_name, prim_path=prim_path, initial_pose=initial_pose) + + +@register_asset +class NistBoardAssembled(LibraryObject): + """NIST fully assembled taskboard.""" + + name = "nist_board_assembled" + tags = ["nist", "object"] + usd_path = f"{ISAACLAB_NUCLEUS_DIR}/Arena/assets/object_library/NIST/nist_board_assembled.usd" + object_type = ObjectType.BASE + # The assembled board is visual context for this task; gears_and_base owns + # the physical peg and gear contacts used by observations, rewards, and success. + spawn_cfg_addon = { + "rigid_props": sim_utils.RigidBodyPropertiesCfg(rigid_body_enabled=False), + "collision_props": sim_utils.CollisionPropertiesCfg(collision_enabled=False), + } + + def __init__( + self, instance_name: str | None = None, prim_path: str | None = None, initial_pose: Pose | None = None + ): + super().__init__(instance_name=instance_name, prim_path=prim_path, initial_pose=initial_pose) + + +@register_asset +class NistGearBase(LibraryObject): + """NIST standalone gear base.""" + + name = "nist_gear_base" + tags = ["nist", "object"] + usd_path = f"{ISAACLAB_NUCLEUS_DIR}/Arena/assets/object_library/NIST/gear_base.usd" + spawn_cfg_addon = { + "rigid_props": RIGID_BODY_PROPS_HIGH_PRECISION.replace(disable_gravity=False, kinematic_enabled=True), + "mass_props": sim_utils.MassPropertiesCfg(mass=0.012), + "collision_props": sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0), + } + + def __init__( + self, instance_name: str | None = None, prim_path: str | None = None, initial_pose: Pose | None = None + ): + super().__init__(instance_name=instance_name, prim_path=prim_path, initial_pose=initial_pose) + + @register_asset class BlueSortingBin(LibraryObject): """ diff --git a/isaaclab_arena/embodiments/__init__.py b/isaaclab_arena/embodiments/__init__.py index d63721b26..2b5d70824 100644 --- a/isaaclab_arena/embodiments/__init__.py +++ b/isaaclab_arena/embodiments/__init__.py @@ -6,6 +6,7 @@ from .agibot.agibot import * from .droid.droid import * from .franka.franka import * +from .franka.nist_gear_insertion.franka_osc import * from .g1.g1 import * from .galbot.galbot import * from .gr1t2.gr1t2 import * diff --git a/isaaclab_arena/embodiments/franka/nist_gear_insertion/__init__.py b/isaaclab_arena/embodiments/franka/nist_gear_insertion/__init__.py new file mode 100644 index 000000000..4a5683985 --- /dev/null +++ b/isaaclab_arena/embodiments/franka/nist_gear_insertion/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Franka embodiment components for NIST gear insertion.""" + +from .franka_osc import FrankaNistGearInsertionOscEmbodiment + +__all__ = [ + "FrankaNistGearInsertionOscEmbodiment", +] diff --git a/isaaclab_arena/embodiments/franka/nist_gear_insertion/franka_osc.py b/isaaclab_arena/embodiments/franka/nist_gear_insertion/franka_osc.py new file mode 100644 index 000000000..341f66232 --- /dev/null +++ b/isaaclab_arena/embodiments/franka/nist_gear_insertion/franka_osc.py @@ -0,0 +1,193 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Franka OSC embodiment for the NIST gear insertion task.""" + +from __future__ import annotations + +import isaaclab.sim as sim_utils +from isaaclab.actuators import ImplicitActuatorCfg +from isaaclab.assets.articulation.articulation_cfg import ArticulationCfg +from isaaclab.sensors.frame_transformer.frame_transformer_cfg import FrameTransformerCfg, OffsetCfg +from isaaclab.utils.assets import ISAACLAB_NUCLEUS_DIR + +from isaaclab_arena.assets.register import register_asset +from isaaclab_arena.embodiments.common.arm_mode import ArmMode +from isaaclab_arena.embodiments.franka.franka import _DEFAULT_CAMERA_OFFSET, FrankaEmbodimentBase +from isaaclab_arena.embodiments.franka.nist_gear_insertion.gear_grasp import get_franka_nist_gear_insertion_grasp_config +from isaaclab_arena.tasks.nist_gear_insertion.events import GraspCfg +from isaaclab_arena.utils.pose import Pose + +__all__ = [ + "FrankaNistGearInsertionOscEmbodiment", +] + +_FRANKA_MIMIC_OSC_USD_PATH = f"{ISAACLAB_NUCLEUS_DIR}/Factory/franka_mimic.usd" + +# Mirrors Isaac Lab Factory's franka_mimic OSC setup in +# isaaclab_tasks.direct.factory.factory_env_cfg. +_FRANKA_MIMIC_OSC_RIGID_PROPS = sim_utils.RigidBodyPropertiesCfg( + disable_gravity=True, + max_depenetration_velocity=5.0, + linear_damping=0.0, + angular_damping=0.0, + max_linear_velocity=1000.0, + max_angular_velocity=3666.0, + enable_gyroscopic_forces=True, + solver_position_iteration_count=192, + solver_velocity_iteration_count=1, + max_contact_impulse=1e32, +) + +_FRANKA_MIMIC_OSC_ARTICULATION_PROPS = sim_utils.ArticulationRootPropertiesCfg( + enabled_self_collisions=False, + solver_position_iteration_count=192, + solver_velocity_iteration_count=1, +) + +_FRANKA_MIMIC_OSC_COLLISION_PROPS = sim_utils.CollisionPropertiesCfg(contact_offset=0.005, rest_offset=0.0) + +# Default pose from the Factory-style Franka mimic asset. +_FRANKA_MIMIC_OSC_JOINT_POS = { + "panda_joint1": 0.0, + "panda_joint2": -0.569, + "panda_joint3": 0.0, + "panda_joint4": -2.810, + "panda_joint5": 0.0, + "panda_joint6": 3.037, + "panda_joint7": 0.741, + "panda_finger_joint2": 0.04, +} + +# OSC writes joint torques directly for the arm; the hand remains position controlled. +_FRANKA_MIMIC_OSC_ACTUATORS = { + "panda_arm1": ImplicitActuatorCfg( + joint_names_expr=["panda_joint[1-4]"], + stiffness=0.0, + damping=0.0, + friction=0.0, + armature=0.0, + effort_limit_sim=87, + velocity_limit_sim=124.6, + ), + "panda_arm2": ImplicitActuatorCfg( + joint_names_expr=["panda_joint[5-7]"], + stiffness=0.0, + damping=0.0, + friction=0.0, + armature=0.0, + effort_limit_sim=12, + velocity_limit_sim=149.5, + ), + "panda_hand": ImplicitActuatorCfg( + joint_names_expr=["panda_finger_joint[1-2]"], + effort_limit_sim=40.0, + velocity_limit_sim=0.04, + stiffness=7500.0, + damping=173.0, + friction=0.1, + armature=0.0, + ), +} + +# Franka mimic USD with contact sensors enabled for the wrist force body. +_FRANKA_MIMIC_OSC_CFG = ArticulationCfg( + prim_path="{ENV_REGEX_NS}/Robot", + spawn=sim_utils.UsdFileCfg( + usd_path=_FRANKA_MIMIC_OSC_USD_PATH, + activate_contact_sensors=True, + rigid_props=_FRANKA_MIMIC_OSC_RIGID_PROPS, + articulation_props=_FRANKA_MIMIC_OSC_ARTICULATION_PROPS, + collision_props=_FRANKA_MIMIC_OSC_COLLISION_PROPS, + ), + init_state=ArticulationCfg.InitialStateCfg( + joint_pos=_FRANKA_MIMIC_OSC_JOINT_POS, + pos=(0.0, 0.0, 0.0), + rot=(0.0, 0.0, 0.0, 1.0), + ), + actuators=_FRANKA_MIMIC_OSC_ACTUATORS, +) + +_GEAR_INSERTION_INITIAL_JOINT_POSE = [ + 0.561824, + 0.287201, + -0.543103, + -2.410188, + 0.507908, + 2.847644, + 0.454298, + 0.04, + 0.04, +] + + +def _gear_insertion_ee_frame_cfg() -> FrameTransformerCfg: + """Return Franka frames used by the gear insertion OSC policy. + + The policy commands the centered fingertip frame, while the finger frames + are exposed for grasp/reset diagnostics. + """ + return FrameTransformerCfg( + prim_path="{ENV_REGEX_NS}/Robot/panda_link0", + debug_vis=False, + target_frames=[ + FrameTransformerCfg.FrameCfg( + prim_path="{ENV_REGEX_NS}/Robot/panda_fingertip_centered", + name="end_effector", + offset=OffsetCfg(pos=(0.0, 0.0, 0.0)), + ), + FrameTransformerCfg.FrameCfg( + prim_path="{ENV_REGEX_NS}/Robot/panda_rightfinger", + name="tool_rightfinger", + offset=OffsetCfg(pos=(0.0, 0.0, 0.046)), + ), + FrameTransformerCfg.FrameCfg( + prim_path="{ENV_REGEX_NS}/Robot/panda_leftfinger", + name="tool_leftfinger", + offset=OffsetCfg(pos=(0.0, 0.0, 0.046)), + ), + ], + ) + + +@register_asset +class FrankaNistGearInsertionOscEmbodiment(FrankaEmbodimentBase): + """Franka embodiment for NIST gear insertion with OSC torque control. + + The embodiment owns the Franka mimic robot setup, OSC command frame, and + grasp metadata. Environments wire scene-specific asset names and insertion + geometry into the action, observation, and reward configs. + """ + + name = "franka_nist_gear_insertion_osc" + + def __init__( + self, + enable_cameras: bool = False, + initial_pose: Pose | None = None, + initial_joint_pose: list[float] | None = None, + concatenate_observation_terms: bool = False, + arm_mode: ArmMode | None = None, + camera_offset: Pose | None = _DEFAULT_CAMERA_OFFSET, + is_tiled_camera: bool = False, + ): + super().__init__( + enable_cameras=enable_cameras, + initial_pose=initial_pose, + initial_joint_pose=initial_joint_pose, + concatenate_observation_terms=concatenate_observation_terms, + arm_mode=arm_mode, + camera_offset=camera_offset, + is_tiled_camera=is_tiled_camera, + ) + self.scene_config.robot = _FRANKA_MIMIC_OSC_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot") + self.scene_config.ee_frame = _gear_insertion_ee_frame_cfg() + self.set_initial_joint_pose(initial_joint_pose or _GEAR_INSERTION_INITIAL_JOINT_POSE) + + def get_command_body_name(self) -> str: + return "panda_fingertip_centered" + + def get_gear_insertion_grasp_config(self) -> GraspCfg: + return get_franka_nist_gear_insertion_grasp_config() diff --git a/isaaclab_arena/embodiments/franka/nist_gear_insertion/gear_grasp.py b/isaaclab_arena/embodiments/franka/nist_gear_insertion/gear_grasp.py new file mode 100644 index 000000000..9ad43b4d8 --- /dev/null +++ b/isaaclab_arena/embodiments/franka/nist_gear_insertion/gear_grasp.py @@ -0,0 +1,46 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Franka-specific grasp parameters for NIST gear insertion.""" + +from __future__ import annotations + +import torch +from collections.abc import Sequence + +from isaaclab_arena.tasks.nist_gear_insertion.events import GraspCfg + + +def franka_gripper_joint_setter( + joint_pos: torch.Tensor, + row_indices: Sequence[int], + finger_joint_indices: Sequence[int], + width: float, +) -> None: + """Set Franka Panda finger joints from a total gripper opening width.""" + for jid in finger_joint_indices: + joint_pos[row_indices, jid] = width / 2.0 + + +def get_franka_nist_gear_insertion_grasp_config() -> GraspCfg: + """Return reset grasp parameters for the Franka NIST gear insertion policy. + + The reset first opens the fingers to 3 cm while inverse kinematics moves + ``panda_hand`` to the gear, then closes the fingers to 0 m to hold the gear. + In Isaac Lab 3.0 xyzw convention, ``[1, 0, 0, 0]`` is an intentional + 180-degree rotation about X. The ``[0.02, 0.0, -0.128]`` translation is the + held-gear-root to ``panda_hand`` offset that centers the gear between the + fingertips. + """ + return GraspCfg( + hand_grasp_width=0.03, + hand_close_width=0.0, + gripper_joint_setter_func=franka_gripper_joint_setter, + end_effector_body_name="panda_hand", + finger_joint_names="panda_finger_joint[1-2]", + grasp_rot_offset=[1.0, 0.0, 0.0, 0.0], + grasp_offset=[0.02, 0.0, -0.128], + arm_joint_names="panda_joint[1-7]", + ) diff --git a/isaaclab_arena/policy/__init__.py b/isaaclab_arena/policy/__init__.py index 66caa51a1..9c3dbfb18 100644 --- a/isaaclab_arena/policy/__init__.py +++ b/isaaclab_arena/policy/__init__.py @@ -6,5 +6,6 @@ from .action_chunking import ActionChunkingState from .action_chunking_client import * from .replay_action_policy import * +from .rl_games_action_policy import * from .rsl_rl_action_policy import * from .zero_action_policy import * diff --git a/isaaclab_arena/policy/rl_games_action_policy.py b/isaaclab_arena/policy/rl_games_action_policy.py new file mode 100644 index 000000000..c5d1e3a37 --- /dev/null +++ b/isaaclab_arena/policy/rl_games_action_policy.py @@ -0,0 +1,229 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import argparse +import gymnasium as gym +import math +import torch +import yaml +from dataclasses import dataclass +from gymnasium.spaces.dict import Dict as GymSpacesDict +from pathlib import Path + +from isaaclab.utils.assets import retrieve_file_path +from isaaclab_rl.rl_games import RlGamesGpuEnv, RlGamesVecEnvWrapper +from rl_games.common import env_configurations, vecenv +from rl_games.common.player import BasePlayer +from rl_games.torch_runner import Runner + +from isaaclab_arena.assets.register import register_policy +from isaaclab_arena.policy.policy_base import PolicyBase + + +class _RlGamesInferenceEnvWrapper(RlGamesVecEnvWrapper): + """``RlGamesVecEnvWrapper`` that avoids extra simulator resets during inference. + + Mirrors :class:`_RslRlInferenceEnvWrapper`: ``policy_runner.py`` already + resets the environment before requesting the first action, so wrapper setup + should read the cached observation buffer instead of triggering a second + reset and recording a phantom episode. + """ + + def __init__( + self, + env: gym.Env, + rl_device: str, + clip_obs: float, + clip_actions: float, + obs_groups: dict[str, list[str]] | None = None, + concate_obs_group: bool = True, + ): + original_reset = env.reset + # Return cached obs during wrapper setup to avoid recording a phantom episode. + env.reset = lambda *args, **kwargs: (dict(env.unwrapped.obs_buf), {}) + try: + super().__init__(env, rl_device, clip_obs, clip_actions, obs_groups, concate_obs_group) + finally: + env.reset = original_reset + + def process_observations(self, observation: GymSpacesDict) -> dict[str, torch.Tensor]: + """Process Arena observations through the RL-Games wrapper.""" + return self._process_obs(dict(observation)) + + +@dataclass +class RlGamesActionPolicyConfig: + """Configuration for RL-Games action policy. + + Supports both dict-based (from JSON eval runner) and CLI-based configuration. + """ + + checkpoint_path: str + """Path to the RL-Games .pth checkpoint file.""" + + agent_cfg_path: Path | None = None + """Path to the RL-Games agent YAML configuration file. + + When using the CLI (``policy_runner.py``), this is set via ``--agent_cfg_path``. + """ + + device: str = "cuda:0" + """Device to run the policy on.""" + + deterministic: bool = True + """Use mean actions (no exploration noise) during evaluation.""" + + @classmethod + def from_cli_args(cls, args: argparse.Namespace) -> RlGamesActionPolicyConfig: + return cls( + checkpoint_path=args.checkpoint_path, + agent_cfg_path=args.agent_cfg_path, + device=args.device if hasattr(args, "device") else "cuda:0", + deterministic=getattr(args, "deterministic", True), + ) + + +@register_policy +class RlGamesActionPolicy(PolicyBase): + """Policy that uses a trained RL-Games model for inference. + + Wraps the RL-Games player for use with the Arena policy runner and eval + runner. Handles observation processing, clipping, and RNN state management. + """ + + name = "rl_games" + config_class = RlGamesActionPolicyConfig + + def __init__(self, config: RlGamesActionPolicyConfig): + super().__init__(config) + self.config: RlGamesActionPolicyConfig = config + self._player: BasePlayer | None = None + self._wrapper: _RlGamesInferenceEnvWrapper | None = None + self._rnn_initialized = False + + def _load_policy(self, env: gym.Env) -> None: + """Set up RL-Games infrastructure and load the checkpoint.""" + if self.config.agent_cfg_path is None: + raise ValueError("RL-Games policy requires --agent_cfg_path.") + + with open(self.config.agent_cfg_path) as f: + agent_cfg = yaml.safe_load(f) + + device = self.config.device + agent_cfg["params"]["config"]["device"] = device + agent_cfg["params"]["config"]["device_name"] = device + + resume_path = retrieve_file_path(self.config.checkpoint_path) + agent_cfg["params"]["load_checkpoint"] = True + agent_cfg["params"]["load_path"] = resume_path + + clip_obs = agent_cfg["params"]["env"].get("clip_observations", math.inf) + clip_actions = agent_cfg["params"]["env"].get("clip_actions", math.inf) + obs_groups = agent_cfg["params"]["env"].get("obs_groups") + concate_obs = agent_cfg["params"]["env"].get("concate_obs_groups", True) + + self._wrapper = _RlGamesInferenceEnvWrapper( + env, + device, + clip_obs, + clip_actions, + obs_groups, + concate_obs, + ) + + vecenv.register( + "IsaacRlgWrapper", + lambda config_name, num_actors, **kwargs: RlGamesGpuEnv(config_name, num_actors, **kwargs), + ) + env_configurations.register( + "rlgpu", + {"vecenv_type": "IsaacRlgWrapper", "env_creator": lambda **kwargs: self._wrapper}, + ) + + agent_cfg["params"]["config"]["num_actors"] = env.unwrapped.num_envs + + runner = Runner() + runner.load(agent_cfg) + self._player = runner.create_player() + self._player.restore(resume_path) + self._player.reset() + + print(f"[INFO] Loaded RL-Games checkpoint from: {resume_path}") + + def get_action(self, env: gym.Env, observation: GymSpacesDict) -> torch.Tensor: + if self._player is None: + self._load_policy(env) + + assert self._player is not None + assert self._wrapper is not None + + obs_rlg = self._wrapper.process_observations(observation) + obs_tensor = obs_rlg["obs"] + + if not self._rnn_initialized: + _ = self._player.get_batch_size(obs_tensor, 1) + if self._player.is_rnn: + self._player.init_rnn() + self._rnn_initialized = True + + obs_tensor = self._player.obs_to_torch(obs_tensor) + + with torch.inference_mode(): + action = self._player.get_action(obs_tensor, is_deterministic=self.config.deterministic) + + return action + + def reset(self, env_ids: torch.Tensor | None = None) -> None: + if self._player is None or not self._player.is_rnn: + return + if not hasattr(self._player, "states") or self._player.states is None: + return + if env_ids is None: + for state in self._player.states: + state.zero_() + else: + for state in self._player.states: + state[:, env_ids, :] = 0.0 + + @classmethod + def from_dict(cls, config_dict: dict) -> RlGamesActionPolicy: + config = RlGamesActionPolicyConfig(**config_dict) + return cls(config) + + @staticmethod + def add_args_to_parser(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: + group = parser.add_argument_group("RL-Games Action Policy", "Arguments for RL-Games action policy") + group.add_argument( + "--checkpoint_path", + type=str, + required=True, + help="Path to the .pth checkpoint file containing the RL-Games policy.", + ) + group.add_argument( + "--agent_cfg_path", + type=Path, + required=True, + help="Path to the RL-Games agent YAML configuration file.", + ) + group.add_argument( + "--deterministic", + action="store_true", + default=True, + help="Use mean actions without exploration noise (default: True).", + ) + group.add_argument( + "--stochastic", + dest="deterministic", + action="store_false", + help="Use stochastic actions with exploration noise.", + ) + return parser + + @staticmethod + def from_args(args: argparse.Namespace) -> RlGamesActionPolicy: + config = RlGamesActionPolicyConfig.from_cli_args(args) + return RlGamesActionPolicy(config) diff --git a/isaaclab_arena/tasks/nist_gear_insertion/__init__.py b/isaaclab_arena/tasks/nist_gear_insertion/__init__.py new file mode 100644 index 000000000..11d432141 --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Task modules for NIST gear insertion.""" + +from .events import GraspCfg, place_gear_in_gripper +from .task import GearInsertionGeometryCfg, NistGearInsertionRLTask + +__all__ = [ + "GearInsertionGeometryCfg", + "GraspCfg", + "NistGearInsertionRLTask", + "place_gear_in_gripper", +] diff --git a/isaaclab_arena/tasks/nist_gear_insertion/events.py b/isaaclab_arena/tasks/nist_gear_insertion/events.py new file mode 100644 index 000000000..45529df19 --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/events.py @@ -0,0 +1,318 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Reset event terms for NIST gear insertion tasks.""" + +from __future__ import annotations + +import torch +from collections.abc import Callable, Sequence +from dataclasses import field +from typing import TYPE_CHECKING + +import isaaclab.utils.math as math_utils +import warp as wp +from isaaclab.assets import Articulation +from isaaclab.managers import EventTermCfg, ManagerTermBase, SceneEntityCfg +from isaaclab.utils import configclass +from isaaclab_tasks.direct.automate import factory_control as fc + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + + +@configclass +class GraspCfg: + """Embodiment-specific reset grasp parameters. + + The task owns the reset event, but the embodiment supplies the hand body, + finger joints, offsets, and gripper-width setter because those details are + robot-specific. + """ + + hand_grasp_width: float = 0.03 + """Opening width used before flushing the reset grasp state.""" + + hand_close_width: float = 0.0 + """Final gripper target width after the gear is placed.""" + + gripper_joint_setter_func: Callable[..., None] | None = None + """Callable that maps a total opening width to embodiment-specific joints.""" + + end_effector_body_name: str = "panda_hand" + """Robot body used as the IK end-effector for reset grasp placement.""" + + finger_joint_names: str = "panda_finger_joint[1-2]" + """Joint-name expression for the gripper fingers.""" + + grasp_rot_offset: list[float] = field(default_factory=lambda: [0.0, 0.0, 0.0, 1.0]) + """Gear-frame to end-effector grasp rotation in Isaac Lab xyzw convention.""" + + grasp_offset: list[float] = field(default_factory=lambda: [0.0, 0.0, 0.0]) + """Gear-frame translation from the gear root to the end-effector grasp pose.""" + + arm_joint_names: str = "panda_joint.*" + """Arm joints affected by task reset randomization.""" + + max_ik_iterations: int = 10 + """Maximum DLS IK updates used to place the hand at the grasp frame.""" + + pos_threshold: float = 1e-6 + """Position-error threshold for stopping reset-time IK.""" + + rot_threshold: float = 1e-6 + """Axis-angle error threshold for stopping reset-time IK.""" + + +class place_gear_in_gripper(ManagerTermBase): + """Place the held gear in the robot gripper during reset. + + The event follows the Factory/Forge reset pattern: solve IK to move the hand + to a configured grasp frame on the gear, open to the grasp width, flush the + simulator state, then set the final closed target. The final target is not + written as a teleport so the controller maintains the grasp after reset. + """ + + def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv): + super().__init__(cfg, env) + gear_cfg = cfg.params["gear_cfg"] + grasp_cfg = cfg.params["grasp_cfg"] + robot_cfg = cfg.params.get("robot_cfg", SceneEntityCfg("robot")) + self.robot: Articulation = env.scene[robot_cfg.name] + + self.gear = env.scene[gear_cfg.name] + + self.hand_grasp_width = grasp_cfg.hand_grasp_width + self.hand_close_width = grasp_cfg.hand_close_width + assert grasp_cfg.gripper_joint_setter_func is not None, "A gripper joint setter is required for grasp reset." + self.gripper_joint_setter_func = grasp_cfg.gripper_joint_setter_func + + # Cache default grasp offsets as tensors because this event runs every + # reset and often across thousands of environments. + self.grasp_rot_offset_tensor = ( + torch.tensor(grasp_cfg.grasp_rot_offset, device=env.device, dtype=torch.float32) + .unsqueeze(0) + .repeat(env.num_envs, 1) + ) + + self.grasp_offset_tensor = torch.tensor(grasp_cfg.grasp_offset, device=env.device, dtype=torch.float32) + + self._resolve_robot_indices(grasp_cfg.end_effector_body_name, grasp_cfg.finger_joint_names) + + self._max_ik_iterations = grasp_cfg.max_ik_iterations + self._pos_threshold = grasp_cfg.pos_threshold + self._rot_threshold = grasp_cfg.rot_threshold + + def _resolve_robot_indices(self, end_effector_body_name: str, finger_joint_names: str) -> None: + """Resolve the body and joint indices needed by reset-time IK.""" + eef_indices, _ = self.robot.find_bodies([end_effector_body_name]) + if not eef_indices: + raise ValueError(f"End-effector body '{end_effector_body_name}' not found in robot") + self.eef_idx = eef_indices[0] + # Body index is shifted by one when indexing ``root_view.get_jacobians()``. + self.jacobi_body_idx = self.eef_idx - 1 + assert self.jacobi_body_idx >= 0, "End-effector body must not be the articulation root." + + self.all_joints, _ = self.robot.find_joints([".*"]) + self.finger_joints, _ = self.robot.find_joints([finger_joint_names]) + if not self.finger_joints: + raise ValueError(f"Finger joints '{finger_joint_names}' not found in robot") + + def _set_gripper_width( + self, + joint_pos: torch.Tensor, + width: float, + gripper_joint_setter_func: Callable[..., None], + ) -> None: + """Update the selected finger joints for the requested opening width.""" + row_indices = torch.arange(joint_pos.shape[0], device=joint_pos.device) + gripper_joint_setter_func(joint_pos, row_indices, self.finger_joints, width) + + def _sync_sim_state(self, env: ManagerBasedEnv) -> None: + """Flush written joint state before reading body poses again.""" + env.scene.write_data_to_sim() + env.sim.forward() + env.scene.update(dt=0.0) + + def _run_grasp_ik( + self, + env: ManagerBasedEnv, + env_ids: Sequence[int] | slice | torch.Tensor, + num_envs: int, + grasp_rot_offset: torch.Tensor, + grasp_offset: torch.Tensor, + ) -> torch.Tensor: + """Move the end-effector to the gear grasp pose using iterative DLS IK. + + Each iteration computes the world-frame grasp pose from the current gear pose, + applies one damped-least-squares joint update, and flushes the simulation state + so the next iteration sees updated body transforms. + """ + joint_vel = torch.zeros(num_envs, len(self.all_joints), device=env.device) + for _ in range(self._max_ik_iterations): + joint_pos, joint_vel = self._read_joint_state(env_ids) + target_pos, target_quat = self._compute_grasp_target_pose(env_ids, grasp_rot_offset, grasp_offset) + delta_hand_pose, pos_error, aa_error = self._compute_pose_error(env_ids, target_pos, target_quat) + + if self._has_converged(pos_error, aa_error): + break + + delta_dof_pos = self._compute_dls_joint_delta(env, env_ids, delta_hand_pose) + joint_pos = joint_pos + delta_dof_pos + joint_vel = torch.zeros_like(joint_pos) + self._write_ik_joint_state(env, env_ids, joint_pos, joint_vel) + + return joint_vel + + def _read_joint_state( + self, + env_ids: Sequence[int] | slice | torch.Tensor, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return current joint positions and velocities for the reset IK solve.""" + joint_pos = wp.to_torch(self.robot.data.joint_pos)[env_ids].clone() + joint_vel = wp.to_torch(self.robot.data.joint_vel)[env_ids].clone() + return joint_pos, joint_vel + + def _compute_grasp_target_pose( + self, + env_ids: Sequence[int] | slice | torch.Tensor, + grasp_rot_offset: torch.Tensor, + grasp_offset: torch.Tensor, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return the world-frame hand pose for the configured gear grasp.""" + gear_pos_w = wp.to_torch(self.gear.data.root_link_pos_w)[env_ids].clone() + gear_quat_w = wp.to_torch(self.gear.data.root_link_quat_w)[env_ids].clone() + target_quat = math_utils.quat_mul(gear_quat_w, grasp_rot_offset) + target_pos = gear_pos_w + math_utils.quat_apply(target_quat, grasp_offset) + return target_pos, target_quat + + def _compute_pose_error( + self, + env_ids: Sequence[int] | slice | torch.Tensor, + target_pos: torch.Tensor, + target_quat: torch.Tensor, + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Return Cartesian pose error used by the Factory DLS IK routine.""" + eef_pos = wp.to_torch(self.robot.data.body_pos_w)[env_ids, self.eef_idx] + eef_quat = wp.to_torch(self.robot.data.body_quat_w)[env_ids, self.eef_idx] + pos_error, aa_error = fc.get_pose_error( + fingertip_midpoint_pos=eef_pos, + fingertip_midpoint_quat=eef_quat, + ctrl_target_fingertip_midpoint_pos=target_pos, + ctrl_target_fingertip_midpoint_quat=target_quat, + jacobian_type="geometric", + rot_error_type="axis_angle", + ) + return torch.cat((pos_error, aa_error), dim=-1), pos_error, aa_error + + def _has_converged(self, pos_error: torch.Tensor, aa_error: torch.Tensor) -> bool: + """Return whether the reset IK solve is within configured tolerances.""" + return ( + torch.norm(pos_error, dim=-1).max() < self._pos_threshold + and torch.norm(aa_error, dim=-1).max() < self._rot_threshold + ) + + def _compute_dls_joint_delta( + self, + env: ManagerBasedEnv, + env_ids: Sequence[int] | slice | torch.Tensor, + delta_hand_pose: torch.Tensor, + ) -> torch.Tensor: + """Return one damped-least-squares joint update for the hand pose error.""" + # Isaac Lab articulation Jacobians are indexed by non-root body id; + # the shifted index is resolved once in ``_resolve_robot_indices``. + jacobians = wp.to_torch(self.robot.root_view.get_jacobians()).clone() + jacobian = jacobians[env_ids, self.jacobi_body_idx, :, :] + return fc._get_delta_dof_pos( + delta_pose=delta_hand_pose, + ik_method="dls", + jacobian=jacobian, + device=env.device, + ) + + def _write_ik_joint_state( + self, + env: ManagerBasedEnv, + env_ids: Sequence[int] | slice | torch.Tensor, + joint_pos: torch.Tensor, + joint_vel: torch.Tensor, + ) -> None: + """Write one IK update and refresh tensors before the next iteration.""" + self.robot.set_joint_position_target_index(target=joint_pos, env_ids=env_ids) + self.robot.set_joint_velocity_target_index(target=joint_vel, env_ids=env_ids) + self.robot.write_joint_position_to_sim_index(position=joint_pos, env_ids=env_ids) + self.robot.write_joint_velocity_to_sim_index(velocity=joint_vel, env_ids=env_ids) + self._sync_sim_state(env) + + def _write_gripper_width( + self, + env: ManagerBasedEnv, + env_ids: Sequence[int] | slice | torch.Tensor, + joint_pos: torch.Tensor, + joint_vel: torch.Tensor, + width: float, + gripper_joint_setter_func: Callable[..., None], + ) -> None: + """Write one gripper width and flush the reset state to the simulator.""" + self._set_gripper_width(joint_pos, width, gripper_joint_setter_func) + self.robot.set_joint_position_target_index( + target=joint_pos, + joint_ids=self.all_joints, + env_ids=env_ids, + ) + self.robot.write_joint_position_to_sim_index(position=joint_pos, env_ids=env_ids) + self.robot.write_joint_velocity_to_sim_index(velocity=joint_vel, env_ids=env_ids) + self._sync_sim_state(env) + + def _set_gripper_width_target( + self, + env_ids: Sequence[int] | slice | torch.Tensor, + joint_pos: torch.Tensor, + width: float, + gripper_joint_setter_func: Callable[..., None], + ) -> None: + """Set the final gripper target without teleporting the sim joint state.""" + self._set_gripper_width(joint_pos, width, gripper_joint_setter_func) + self.robot.set_joint_position_target_index( + target=joint_pos, + joint_ids=self.all_joints, + env_ids=env_ids, + ) + + def __call__( + self, + env: ManagerBasedEnv, + env_ids: Sequence[int] | slice | torch.Tensor, + gear_cfg: SceneEntityCfg, + grasp_cfg: GraspCfg, + robot_cfg: SceneEntityCfg | None = None, + ) -> None: + """Run the reset grasp sequence for ``env_ids``. + + The event manager passes ``gear_cfg``, ``grasp_cfg``, and ``robot_cfg`` + on every call; this term resolves and caches them in ``__init__``. + """ + grasp_rot_offset_tensor = self.grasp_rot_offset_tensor[env_ids] + num_envs = grasp_rot_offset_tensor.shape[0] + grasp_offset_batch = self.grasp_offset_tensor.unsqueeze(0).expand(num_envs, -1) + joint_vel = self._run_grasp_ik( + env=env, + env_ids=env_ids, + num_envs=num_envs, + grasp_rot_offset=grasp_rot_offset_tensor, + grasp_offset=grasp_offset_batch, + ) + + joint_pos = wp.to_torch(self.robot.data.joint_pos)[env_ids].clone() + + self._write_gripper_width( + env, + env_ids, + joint_pos, + joint_vel, + self.hand_grasp_width, + self.gripper_joint_setter_func, + ) + self._set_gripper_width_target(env_ids, joint_pos, self.hand_close_width, self.gripper_joint_setter_func) diff --git a/isaaclab_arena/tasks/nist_gear_insertion/geometry.py b/isaaclab_arena/tasks/nist_gear_insertion/geometry.py new file mode 100644 index 000000000..2ad2cd41f --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/geometry.py @@ -0,0 +1,150 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Shared geometry utilities for gear insertion. + +The frame math follows the Isaac Lab Factory/Forge insertion tasks: represent +the peg and held-gear insertion points as local offsets on their owning assets, +then compare those points in each environment frame. +""" + +from __future__ import annotations + +import torch + +import isaaclab.utils.math as math_utils +import warp as wp +from isaaclab.assets import RigidObject +from isaaclab.envs import ManagerBasedRLEnv +from isaaclab.managers import SceneEntityCfg + + +def resolve_offset_tensor( + values: list[float] | tuple[float, ...] | None, + cached_values: tuple[float, ...], + cached_tensor: torch.Tensor, + device: torch.device, +) -> torch.Tensor: + """Return an offset tensor on ``device``. + + Reward and termination terms cache their default offsets at construction. + Manager call-time parameters may still override those values, so this function + keeps the common "use cached tensor unless explicitly overridden" path in + one place. + """ + if values is None or tuple(values) == cached_values: + return cached_tensor + return torch.tensor(values, device=device, dtype=torch.float32) + + +def compute_asset_local_offset_pose( + env: ManagerBasedRLEnv, + asset: RigidObject, + offset: tuple[float, ...] | torch.Tensor, + num_envs: int | None = None, +) -> tuple[torch.Tensor, torch.Tensor]: + """Return an asset-local offset pose in each environment frame. + + Isaac Lab stores rigid-object root poses in world coordinates. Gear + insertion rewards and terminations compare geometry in the per-environment + frame, so this function subtracts ``env.scene.env_origins`` before rotating + the local offset into the asset root frame. + """ + num_envs = env.num_envs if num_envs is None else num_envs + root_pos = wp.to_torch(asset.data.root_pos_w)[:num_envs] - env.scene.env_origins[:num_envs] + root_quat = wp.to_torch(asset.data.root_quat_w)[:num_envs] + offset_tensor = torch.as_tensor(offset, device=env.device, dtype=torch.float32) + offset_tensor = offset_tensor.unsqueeze(0).expand(num_envs, 3) + return root_pos + math_utils.quat_apply(root_quat, offset_tensor), root_quat + + +def compute_asset_local_offset_pos( + env: ManagerBasedRLEnv, + asset: RigidObject, + offset: tuple[float, ...] | torch.Tensor, +) -> torch.Tensor: + """Return an asset-local offset position in each environment frame.""" + pos, _ = compute_asset_local_offset_pose(env, asset, offset) + return pos + + +def peg_pos_in_env_frame( + env: ManagerBasedRLEnv, + board_cfg: SceneEntityCfg, + peg_offset: tuple[float, ...] = (0.0, 0.0, 0.0), +) -> torch.Tensor: + """Return the target peg position in each environment frame. + + ``peg_offset`` is measured in the fixed board or gear-base asset frame. + """ + board: RigidObject = env.scene[board_cfg.name] + return compute_asset_local_offset_pos(env, board, peg_offset) + + +def held_gear_base_pos_in_env_frame( + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + held_gear_base_offset: tuple[float, ...] = (0.0, 0.0, 0.0), +) -> torch.Tensor: + """Return the held gear insertion point in each environment frame. + + The held gear's root is not necessarily the point that should align with + the peg, so insertion logic compares this offset point against the peg. + """ + gear: RigidObject = env.scene[gear_cfg.name] + return compute_asset_local_offset_pos(env, gear, held_gear_base_offset) + + +def peg_delta_from_held_gear_base( + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + peg_offset: tuple[float, ...] = (0.0, 0.0, 0.0), + held_gear_base_offset: tuple[float, ...] = (0.0, 0.0, 0.0), +) -> torch.Tensor: + """Return the vector from the held gear insertion point to the peg.""" + held_base = held_gear_base_pos_in_env_frame(env, gear_cfg, held_gear_base_offset) + peg_pos = peg_pos_in_env_frame(env, board_cfg, peg_offset) + return peg_pos - held_base + + +def check_gear_insertion_geometry( + held_base_pos: torch.Tensor, + peg_pos: torch.Tensor, + gear_peg_height: float, + z_fraction: float, + xy_threshold: float, +) -> torch.Tensor: + """Return whether the gear is centered and inserted on the peg. + + Success is geometric: the insertion point must be within the XY tolerance + and below the configured fraction of the peg height. Lower Z values mean the + gear has moved farther down the peg. + """ + xy_dist = torch.norm(held_base_pos[:, :2] - peg_pos[:, :2], dim=-1) + z_diff = held_base_pos[:, 2] - peg_pos[:, 2] + return (xy_dist < xy_threshold) & (z_diff < gear_peg_height * z_fraction) + + +def compute_gear_insertion_success( + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + peg_offset: tuple[float, ...] | torch.Tensor, + held_gear_base_offset: tuple[float, ...] | torch.Tensor, + gear_peg_height: float, + z_fraction: float, + xy_threshold: float, +) -> torch.Tensor: + """Return whether the held gear meets the insertion success geometry. + + This is the shared implementation used by rewards, terminations, and + OSC-specific auxiliary losses so they agree on the success label. + """ + gear: RigidObject = env.scene[gear_cfg.name] + board: RigidObject = env.scene[board_cfg.name] + held_base_pos, _ = compute_asset_local_offset_pose(env, gear, held_gear_base_offset) + peg_pos, _ = compute_asset_local_offset_pose(env, board, peg_offset) + return check_gear_insertion_geometry(held_base_pos, peg_pos, gear_peg_height, z_fraction, xy_threshold) diff --git a/isaaclab_arena/tasks/nist_gear_insertion/observations.py b/isaaclab_arena/tasks/nist_gear_insertion/observations.py new file mode 100644 index 000000000..1808cd331 --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/observations.py @@ -0,0 +1,59 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Generic observation terms for gear insertion tasks. + +This module intentionally contains reusable task observations. The packed +OSC policy observation lives in the environment MDP package because it depends +on OSC action-term state. +""" + +from __future__ import annotations + +import torch +from typing import TYPE_CHECKING + +import warp as wp +from isaaclab.assets import Articulation +from isaaclab.managers import SceneEntityCfg +from isaaclab.utils.math import quat_unique + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedRLEnv + + +def _resolve_body_id(robot: Articulation, body_name: str) -> int: + """Return the single body id matching ``body_name``.""" + body_ids, body_names = robot.find_bodies([body_name]) + if len(body_ids) != 1: + raise ValueError(f"Expected one body matching '{body_name}', found {len(body_ids)}: {body_names}.") + return body_ids[0] + + +def body_pos_in_env_frame( + env: ManagerBasedRLEnv, + robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + body_name: str = "panda_fingertip_centered", +) -> torch.Tensor: + """Return a robot body position in each environment frame. + + Isaac Lab body positions are world-frame values; subtracting environment + origins makes the observation invariant to scene cloning offsets. + """ + robot: Articulation = env.scene[robot_cfg.name] + body_id = _resolve_body_id(robot, body_name) + return wp.to_torch(robot.data.body_pos_w)[:, body_id] - env.scene.env_origins + + +def body_quat_canonical( + env: ManagerBasedRLEnv, + robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + body_name: str = "panda_fingertip_centered", +) -> torch.Tensor: + """Return a robot body quaternion with a unique sign convention.""" + robot: Articulation = env.scene[robot_cfg.name] + body_id = _resolve_body_id(robot, body_name) + quat = wp.to_torch(robot.data.body_quat_w)[:, body_id] + return quat_unique(quat) diff --git a/isaaclab_arena/tasks/nist_gear_insertion/rewards.py b/isaaclab_arena/tasks/nist_gear_insertion/rewards.py new file mode 100644 index 000000000..423dce20a --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/rewards.py @@ -0,0 +1,234 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Reward terms for gear insertion tasks. + +The generic rewards in this module only depend on insertion geometry: keypoint +alignment around the peg and sparse bonuses for engagement or completion. +Controller-specific penalties live in the environment MDP package. +""" + +from __future__ import annotations + +import torch +from typing import TYPE_CHECKING + +from isaaclab.assets import RigidObject +from isaaclab.managers import ManagerTermBase, RewardTermCfg, SceneEntityCfg +from isaaclab.utils.math import combine_frame_transforms +from isaaclab_tasks.direct.factory.factory_utils import get_keypoint_offsets, squashing_fn + +from isaaclab_arena.tasks.nist_gear_insertion import geometry as gear_geometry + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedRLEnv + + +class _KeypointDistanceComputer: + """Keypoint distance calculator with reusable buffers. + + Factory/Forge insertion rewards compare multiple keypoints around the + target and held-object frames instead of comparing only frame origins. The + keypoint offsets and identity quaternions are allocated once here to avoid + per-step tensor construction. + """ + + def __init__(self, num_envs: int, device: torch.device, num_keypoints: int = 4): + self.offsets_base = get_keypoint_offsets(num_keypoints, device) + self.n_kp = self.offsets_base.shape[0] + self.identity_quat = ( + torch.tensor([[0.0, 0.0, 0.0, 1.0]], device=device, dtype=torch.float32) + .repeat(num_envs * self.n_kp, 1) + .contiguous() + ) + self.offsets_buf = torch.zeros(num_envs, self.n_kp, 3, device=device, dtype=torch.float32) + + def compute( + self, + pos_a: torch.Tensor, + quat_a: torch.Tensor, + pos_b: torch.Tensor, + quat_b: torch.Tensor, + scale: float = 1.0, + ) -> torch.Tensor: + """Return the mean L2 distance between transformed keypoint sets.""" + n = pos_a.shape[0] + offsets = self.offsets_base * scale + self.offsets_buf[:n] = offsets.unsqueeze(0) + off_flat = self.offsets_buf[:n].reshape(-1, 3) + iq = self.identity_quat[: n * self.n_kp] + + def _expand(t: torch.Tensor) -> torch.Tensor: + return t.unsqueeze(1).expand(-1, self.n_kp, -1).reshape(-1, t.shape[-1]) + + kp_a, _ = combine_frame_transforms(_expand(pos_a), _expand(quat_a), off_flat, iq) + kp_b, _ = combine_frame_transforms(_expand(pos_b), _expand(quat_b), off_flat, iq) + per_kp_dist = torch.norm(kp_b.reshape(n, self.n_kp, 3) - kp_a.reshape(n, self.n_kp, 3), p=2, dim=-1) + return per_kp_dist.mean(-1) + + +class gear_peg_keypoint_squashing(ManagerTermBase): + """Factory-style keypoint reward for gear-to-peg alignment. + + The term aligns the held gear insertion frame to the target peg frame using + a squashed keypoint distance. The squashing parameters are configured by the + reward group to provide coarse-to-fine shaping with the same geometry. + """ + + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + super().__init__(cfg, env) + self._peg_offset_values = tuple(cfg.params["peg_offset"]) + self.peg_offset = torch.tensor(self._peg_offset_values, device=env.device, dtype=torch.float32) + self._held_gear_base_offset_values = tuple(cfg.params["held_gear_base_offset"]) + self.held_gear_base_offset = torch.tensor( + self._held_gear_base_offset_values, device=env.device, dtype=torch.float32 + ) + self._xy_noise_range = cfg.params.get("peg_offset_xy_noise", 0.0) + self._num_keypoints: int = cfg.params.get("num_keypoints", 4) + self.kp = _KeypointDistanceComputer(env.num_envs, env.device, num_keypoints=self._num_keypoints) + self._offset_noise = torch.zeros(env.num_envs, 3, device=env.device, dtype=torch.float32) + + def reset(self, env_ids: torch.Tensor | None = None) -> None: + """Sample the per-episode XY target perturbation.""" + if self._xy_noise_range <= 0.0: + return + if env_ids is None: + env_ids = torch.arange(self._offset_noise.shape[0], device=self._offset_noise.device) + if len(env_ids) == 0: + return + + n = len(env_ids) + noise_dev = self._offset_noise.device + self._offset_noise[env_ids, 0] = (torch.rand(n, device=noise_dev) * 2 - 1) * self._xy_noise_range + self._offset_noise[env_ids, 1] = (torch.rand(n, device=noise_dev) * 2 - 1) * self._xy_noise_range + + def __call__( + self, + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + peg_offset: list[float] | None = None, + held_gear_base_offset: list[float] | None = None, + keypoint_scale: float = 0.15, + num_keypoints: int = 4, + peg_offset_xy_noise: float = 0.0, + squash_a: float = 50.0, + squash_b: float = 2.0, + ) -> torch.Tensor: + """Return the squashed keypoint alignment reward.""" + self._validate_num_keypoints(num_keypoints) + held_base_pos, gear_quat = self._compute_held_base_pose(env, gear_cfg, held_gear_base_offset) + target_pos, target_quat = self._compute_target_pose(env, board_cfg, peg_offset) + target_pos = self._apply_target_noise(target_pos, peg_offset_xy_noise) + return self._compute_squashed_keypoint_reward( + target_pos, + target_quat, + held_base_pos, + gear_quat, + keypoint_scale, + squash_a, + squash_b, + ) + + def _validate_num_keypoints(self, num_keypoints: int) -> None: + """Validate that the reward uses the keypoint layout allocated at construction.""" + if num_keypoints != self._num_keypoints: + raise ValueError( + f"num_keypoints is fixed at term initialization. Expected {self._num_keypoints}, got {num_keypoints}." + ) + + def _compute_held_base_pose( + self, + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + held_gear_base_offset: list[float] | None, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return the held gear insertion-point pose in each environment frame.""" + gear: RigidObject = env.scene[gear_cfg.name] + held_gear_base_offset_tensor = gear_geometry.resolve_offset_tensor( + held_gear_base_offset, + self._held_gear_base_offset_values, + self.held_gear_base_offset, + env.device, + ) + return gear_geometry.compute_asset_local_offset_pose(env, gear, held_gear_base_offset_tensor) + + def _compute_target_pose( + self, + env: ManagerBasedRLEnv, + board_cfg: SceneEntityCfg, + peg_offset: list[float] | None, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return the target peg pose in each environment frame.""" + board: RigidObject = env.scene[board_cfg.name] + peg_offset_tensor = gear_geometry.resolve_offset_tensor( + peg_offset, self._peg_offset_values, self.peg_offset, env.device + ) + return gear_geometry.compute_asset_local_offset_pose(env, board, peg_offset_tensor) + + def _apply_target_noise(self, target_pos: torch.Tensor, peg_offset_xy_noise: float) -> torch.Tensor: + """Apply the per-reset XY target offset used for insertion robustness.""" + if peg_offset_xy_noise <= 0.0: + return target_pos + return target_pos + self._offset_noise[: target_pos.shape[0]] + + def _compute_squashed_keypoint_reward( + self, + target_pos: torch.Tensor, + target_quat: torch.Tensor, + held_base_pos: torch.Tensor, + gear_quat: torch.Tensor, + keypoint_scale: float, + squash_a: float, + squash_b: float, + ) -> torch.Tensor: + """Return the squashed mean distance between target and held-gear keypoints.""" + kp_dist = self.kp.compute(target_pos, target_quat, held_base_pos, gear_quat, scale=keypoint_scale) + return squashing_fn(kp_dist, squash_a, squash_b) + + +class gear_insertion_geometry_bonus(ManagerTermBase): + """Bonus when the gear satisfies the configured insertion geometry. + + Different reward terms reuse this class with different ``z_fraction`` values: + one for early peg engagement and one for the final success depth. + """ + + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + super().__init__(cfg, env) + self._peg_offset_values = tuple(cfg.params["peg_offset"]) + self._peg_offset = torch.tensor(self._peg_offset_values, device=env.device, dtype=torch.float32) + self._held_gear_base_offset_values = tuple(cfg.params["held_gear_base_offset"]) + self._held_gear_base_offset = torch.tensor( + self._held_gear_base_offset_values, device=env.device, dtype=torch.float32 + ) + + def __call__( + self, + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + gear_peg_height: float, + z_fraction: float, + xy_threshold: float, + peg_offset: list[float] | None = None, + held_gear_base_offset: list[float] | None = None, + ) -> torch.Tensor: + """Return a binary insertion-geometry bonus as a float tensor.""" + return gear_geometry.compute_gear_insertion_success( + env, + gear_cfg, + board_cfg, + gear_geometry.resolve_offset_tensor(peg_offset, self._peg_offset_values, self._peg_offset, env.device), + gear_geometry.resolve_offset_tensor( + held_gear_base_offset, + self._held_gear_base_offset_values, + self._held_gear_base_offset, + env.device, + ), + gear_peg_height, + z_fraction, + xy_threshold, + ).float() diff --git a/isaaclab_arena/tasks/nist_gear_insertion/task.py b/isaaclab_arena/tasks/nist_gear_insertion/task.py new file mode 100644 index 000000000..7fd684fbc --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/task.py @@ -0,0 +1,467 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""NIST gear insertion task. + +The task wiring follows the structure of Isaac Lab Factory/Forge insertion +tasks, with Arena assets and task registration. +""" + +from __future__ import annotations + +import math +import numpy as np +from dataclasses import MISSING, field + +import isaaclab.envs.mdp as mdp_isaac_lab +from isaaclab.envs.common import ViewerCfg +from isaaclab.managers import EventTermCfg +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.managers import RewardTermCfg, SceneEntityCfg, TerminationTermCfg +from isaaclab.utils import configclass + +from isaaclab_arena.assets.asset import Asset +from isaaclab_arena.embodiments.common.arm_mode import ArmMode +from isaaclab_arena.metrics.metric_base import MetricBase +from isaaclab_arena.metrics.success_rate import SuccessRateMetric +from isaaclab_arena.tasks.task_base import TaskBase +from isaaclab_arena.utils.cameras import get_viewer_cfg_look_at_object + +from . import geometry as gear_insertion_geometry +from . import observations as gear_insertion_observations +from . import rewards as gear_insertion_rewards +from .events import GraspCfg, place_gear_in_gripper +from .terminations import gear_dropped_from_gripper, gear_mesh_insertion_success + +_DEFAULT_PEG_OFFSET = (2.025e-2, 0.0, 0.0) + + +@configclass +class GearInsertionGeometryCfg: + """Geometry parameters shared by observations, rewards, and terminations. + + Offsets are expressed in the local frame of their asset. ``peg_offset_for_obs`` + may differ from ``peg_offset_from_board`` when the policy should target the + peg tip while success checks use the peg base. + """ + + peg_offset_from_board: list[float] = field(default_factory=lambda: list(_DEFAULT_PEG_OFFSET)) + peg_offset_for_obs: list[float] | None = None + held_gear_base_offset: list[float] = field(default_factory=lambda: list(_DEFAULT_PEG_OFFSET)) + gear_peg_height: float = 0.02 + success_z_fraction: float = 0.30 + xy_threshold: float = 0.0025 + peg_offset_xy_noise: float = 0.005 + + +class NistGearInsertionRLTask(TaskBase): + """RL task for inserting the held gear onto a fixed NIST peg. + + The task owns scene-level wiring: generic observations and geometry rewards, + reset events, success/drop terminations, and success-rate metrics. The OSC + policy observation and controller penalties are configured by the + environment MDP layer because they depend on a specific action term. + """ + + def __init__( + self, + assembled_board: Asset, + held_gear: Asset, + background_scene: Asset, + gear_base_asset: Asset | None = None, + geometry_cfg: GearInsertionGeometryCfg | None = None, + episode_length_s: float | None = None, + task_description: str | None = None, + grasp_cfg: GraspCfg | None = None, + fingertip_body_name: str = "panda_fingertip_centered", + enable_randomization: bool = False, + disable_drop_terminations: bool = True, + disable_success_termination: bool = False, + ): + super().__init__(episode_length_s=episode_length_s, task_description=task_description) + self.assembled_board = assembled_board + self.held_gear = held_gear + self.background_scene = background_scene + self.held_gear.disable_reset_pose() + self._gear_base_asset = gear_base_asset if gear_base_asset is not None else assembled_board + self.geometry_cfg = geometry_cfg if geometry_cfg is not None else GearInsertionGeometryCfg() + self.grasp_cfg = grasp_cfg + self.fingertip_body_name = fingertip_body_name + self.enable_randomization = enable_randomization + self.disable_drop_terminations = disable_drop_terminations + self.disable_success_termination = disable_success_termination + if self.task_description is None: + self.task_description = f"Insert the {held_gear.name} onto the gear base on the {assembled_board.name}" + + def get_scene_cfg(self): + """Return no additional scene config. + + Arena constructs the scene from assets supplied by the environment. + """ + + def get_observation_cfg(self): + """Return generic task observations for gear and peg geometry.""" + geometry_cfg = self.geometry_cfg + # Policies can observe the peg tip while geometry rewards and + # terminations evaluate insertion against the peg base. + peg_obs = ( + geometry_cfg.peg_offset_for_obs + if geometry_cfg.peg_offset_for_obs is not None + else geometry_cfg.peg_offset_from_board + ) + return GearInsertionObservationsCfg( + gear_name=self.held_gear.name, + board_name=self._gear_base_asset.name, + peg_offset=peg_obs, + held_gear_base_offset=geometry_cfg.held_gear_base_offset, + fingertip_body_name=self.fingertip_body_name, + ) + + def get_rewards_cfg(self): + """Return geometry-only reward terms for the task.""" + geometry_cfg = self.geometry_cfg + return GearInsertionRewardsCfg( + gear_name=self.held_gear.name, + board_name=self._gear_base_asset.name, + peg_offset=geometry_cfg.peg_offset_from_board, + held_gear_base_offset=geometry_cfg.held_gear_base_offset, + gear_peg_height=geometry_cfg.gear_peg_height, + success_z_fraction=geometry_cfg.success_z_fraction, + xy_threshold=geometry_cfg.xy_threshold, + peg_offset_xy_noise=geometry_cfg.peg_offset_xy_noise, + ) + + def get_termination_cfg(self): + """Return success and optional drop termination terms.""" + geometry_cfg = self.geometry_cfg + success = TerminationTermCfg( + func=gear_mesh_insertion_success, + params={ + "held_object_cfg": SceneEntityCfg(self.held_gear.name), + "fixed_object_cfg": SceneEntityCfg(self._gear_base_asset.name), + "gear_base_offset": geometry_cfg.peg_offset_from_board, + "held_gear_base_offset": geometry_cfg.held_gear_base_offset, + "gear_peg_height": geometry_cfg.gear_peg_height, + "success_z_fraction": geometry_cfg.success_z_fraction, + "xy_threshold": geometry_cfg.xy_threshold, + "disable_success_termination": self.disable_success_termination, + }, + ) + + cfg = GearInsertionTerminationsCfg(success=success, object_dropped=None) + # Drop checks are disabled during training to allow recovery. + if not self.disable_drop_terminations: + cfg.object_dropped = TerminationTermCfg( + func=mdp_isaac_lab.root_height_below_minimum, + params={ + "minimum_height": self.background_scene.object_min_z, + "asset_cfg": SceneEntityCfg(self.held_gear.name), + }, + ) + if not self.disable_drop_terminations and self.grasp_cfg is not None: + cfg.gear_dropped_from_gripper = TerminationTermCfg( + func=gear_dropped_from_gripper, + params={ + "gear_cfg": SceneEntityCfg(self.held_gear.name), + "robot_cfg": SceneEntityCfg("robot"), + "ee_body_name": self.grasp_cfg.end_effector_body_name, + "distance_threshold": 0.15, + }, + ) + return cfg + + def get_events_cfg(self): + """Return reset and randomization events for gear insertion.""" + cfg = GearInsertionEventsCfg() + self._add_grasp_reset_event(cfg) + if self.enable_randomization: + self._add_randomization_events(cfg) + return cfg + + def _add_grasp_reset_event(self, cfg: GearInsertionEventsCfg) -> None: + """Add the reset event that places the held gear in the gripper.""" + grasp_cfg = self.grasp_cfg + if grasp_cfg is not None and grasp_cfg.gripper_joint_setter_func is not None: + # The held gear does not use its default reset pose in this task; + # the embodiment-specific grasp event places it in the gripper. + cfg.place_gear = EventTermCfg( + func=place_gear_in_gripper, + mode="reset", + params={ + "gear_cfg": SceneEntityCfg(self.held_gear.name), + "grasp_cfg": grasp_cfg, + }, + ) + + def _add_randomization_events(self, cfg: GearInsertionEventsCfg) -> None: + """Add optional Factory/Forge-style domain randomization events.""" + grasp_cfg = self.grasp_cfg + assert grasp_cfg is not None, "NIST gear insertion randomization requires an embodiment grasp configuration." + arm_joints = grasp_cfg.arm_joint_names + + self._add_asset_randomization_events(cfg) + self._add_robot_randomization_events(cfg, arm_joints) + + def _add_asset_randomization_events(self, cfg: GearInsertionEventsCfg) -> None: + """Add reset randomization for the held gear and fixed gear base.""" + cfg.held_object_mass = EventTermCfg( + func=mdp_isaac_lab.randomize_rigid_body_mass, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg(self.held_gear.name), + "mass_distribution_params": (-0.005, 0.005), + "operation": "add", + "distribution": "uniform", + }, + ) + cfg.fixed_asset_pose = EventTermCfg( + func=mdp_isaac_lab.reset_root_state_uniform, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg(self._gear_base_asset.name), + # Keep translational variation fixed; only the board yaw is + # randomized as in the Forge insertion setup. + "pose_range": { + "yaw": (0.0, math.radians(15.0)), + }, + "velocity_range": {}, + }, + ) + + def _add_robot_randomization_events(self, cfg: GearInsertionEventsCfg, arm_joints: str) -> None: + """Add actuator and joint-parameter randomization for the arm.""" + cfg.robot_actuator_gains = EventTermCfg( + func=mdp_isaac_lab.randomize_actuator_gains, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=arm_joints), + "stiffness_distribution_params": (0.75, 1.5), + "damping_distribution_params": (0.3, 3.0), + "operation": "scale", + "distribution": "log_uniform", + }, + ) + cfg.robot_joint_friction = EventTermCfg( + func=mdp_isaac_lab.randomize_joint_parameters, + mode="reset", + params={ + "asset_cfg": SceneEntityCfg("robot", joint_names=arm_joints), + "friction_distribution_params": (0.3, 0.7), + "operation": "add", + "distribution": "uniform", + }, + ) + + def get_mimic_env_cfg(self, arm_mode: ArmMode): + """Raise because this task currently only defines the RL setup.""" + del arm_mode + raise NotImplementedError("NIST gear insertion does not define a Mimic configuration yet.") + + def get_metrics(self) -> list[MetricBase]: + """Return task metrics used during evaluation.""" + return [SuccessRateMetric()] + + def get_viewer_cfg(self) -> ViewerCfg: + """Return a camera view focused on the held gear and peg area.""" + return get_viewer_cfg_look_at_object( + lookat_object=self.held_gear, + offset=np.array([1.5, -0.5, 1.0]), + ) + + +@configclass +class GearInsertionTerminationsCfg: + """Termination terms for the gear insertion task. + + Success is always registered so metrics can read it. Drop checks are + optional and are disabled for recovery-focused RL training. + """ + + time_out: TerminationTermCfg = TerminationTermCfg(func=mdp_isaac_lab.time_out) + success: TerminationTermCfg = MISSING + object_dropped: TerminationTermCfg | None = None + gear_dropped_from_gripper: TerminationTermCfg | None = None + + +@configclass +class GearInsertionEventsCfg: + """Reset and randomization events for gear insertion. + + ``reset_all`` restores the base scene first; task-specific terms then place + the held gear in the gripper and apply optional domain randomization. + """ + + reset_all: EventTermCfg = EventTermCfg( + func=mdp_isaac_lab.reset_scene_to_default, + mode="reset", + params={"reset_joint_targets": True}, + ) + place_gear: EventTermCfg | None = None + fixed_asset_pose: EventTermCfg | None = None + held_object_mass: EventTermCfg | None = None + robot_actuator_gains: EventTermCfg | None = None + robot_joint_friction: EventTermCfg | None = None + + +@configclass +class GearInsertionTaskObsCfg(ObsGroup): + """Generic observation group for gear insertion task state. + + These terms expose object poses, peg geometry, robot joints, and noiseless + end-effector pose. Controller-specific policy packing remains outside this + generic task config. + """ + + gear_pos: ObsTerm = MISSING + gear_quat: ObsTerm = MISSING + peg_pos: ObsTerm = MISSING + board_quat: ObsTerm = MISSING + peg_delta: ObsTerm = MISSING + joint_pos: ObsTerm = MISSING + joint_vel: ObsTerm = MISSING + ee_pos_noiseless: ObsTerm = MISSING + ee_quat_noiseless: ObsTerm = MISSING + + def __post_init__(self): + self.enable_corruption = False + self.concatenate_terms = True + + +@configclass +class GearInsertionObservationsCfg: + """Observation config for the generic gear insertion task state.""" + + task_obs: GearInsertionTaskObsCfg = MISSING + + def __init__( + self, + gear_name: str, + board_name: str, + peg_offset: list[float], + held_gear_base_offset: list[float] | None = None, + fingertip_body_name: str = "panda_fingertip_centered", + ): + held_offset = held_gear_base_offset if held_gear_base_offset is not None else [2.025e-2, 0.0, 0.0] + self.task_obs = GearInsertionTaskObsCfg() + + self.task_obs.gear_pos = ObsTerm( + func=mdp_isaac_lab.root_pos_w, + params={"asset_cfg": SceneEntityCfg(gear_name)}, + ) + self.task_obs.gear_quat = ObsTerm( + func=mdp_isaac_lab.root_quat_w, + params={"make_quat_unique": True, "asset_cfg": SceneEntityCfg(gear_name)}, + ) + self.task_obs.board_quat = ObsTerm( + func=mdp_isaac_lab.root_quat_w, + params={"make_quat_unique": True, "asset_cfg": SceneEntityCfg(board_name)}, + ) + self.task_obs.peg_pos = ObsTerm( + func=gear_insertion_geometry.peg_pos_in_env_frame, + params={"board_cfg": SceneEntityCfg(board_name), "peg_offset": peg_offset}, + ) + self.task_obs.peg_delta = ObsTerm( + func=gear_insertion_geometry.peg_delta_from_held_gear_base, + params={ + "gear_cfg": SceneEntityCfg(gear_name), + "board_cfg": SceneEntityCfg(board_name), + "peg_offset": peg_offset, + "held_gear_base_offset": held_offset, + }, + ) + self.task_obs.joint_pos = ObsTerm( + func=mdp_isaac_lab.joint_pos_rel, + params={"asset_cfg": SceneEntityCfg("robot")}, + ) + self.task_obs.joint_vel = ObsTerm( + func=mdp_isaac_lab.joint_vel_rel, + params={"asset_cfg": SceneEntityCfg("robot")}, + ) + self.task_obs.ee_pos_noiseless = ObsTerm( + func=gear_insertion_observations.body_pos_in_env_frame, + params={"body_name": fingertip_body_name}, + ) + self.task_obs.ee_quat_noiseless = ObsTerm( + func=gear_insertion_observations.body_quat_canonical, + params={"body_name": fingertip_body_name}, + ) + + +@configclass +class GearInsertionRewardsCfg: + """Reward terms for gear insertion. + + Keypoint shaping and insertion-depth bonuses mirror the Factory/Forge + assembly reward structure used for peg and gear insertion. + """ + + kp_baseline: RewardTermCfg = MISSING + kp_coarse: RewardTermCfg = MISSING + kp_fine: RewardTermCfg = MISSING + engagement_bonus: RewardTermCfg = MISSING + success_bonus: RewardTermCfg = MISSING + + def __init__( + self, + gear_name: str, + board_name: str, + peg_offset: list[float], + held_gear_base_offset: list[float], + gear_peg_height: float, + success_z_fraction: float, + xy_threshold: float, + peg_offset_xy_noise: float = 0.005, + ): + gear_cfg = SceneEntityCfg(gear_name) + board_cfg = SceneEntityCfg(board_name) + keypoint_params = { + "gear_cfg": gear_cfg, + "board_cfg": board_cfg, + "peg_offset": peg_offset, + "held_gear_base_offset": held_gear_base_offset, + "keypoint_scale": 0.15, + "num_keypoints": 4, + "peg_offset_xy_noise": peg_offset_xy_noise, + } + geometry_params = { + "gear_cfg": gear_cfg, + "board_cfg": board_cfg, + "peg_offset": peg_offset, + "held_gear_base_offset": held_gear_base_offset, + "gear_peg_height": gear_peg_height, + "xy_threshold": xy_threshold, + } + + # The keypoint terms share geometry and differ only by squashing + # strength, giving coarse-to-fine shaping around the peg. + self.kp_baseline = RewardTermCfg( + func=gear_insertion_rewards.gear_peg_keypoint_squashing, + weight=1.0, + params={**keypoint_params, "squash_a": 5.0, "squash_b": 4.0}, + ) + self.kp_coarse = RewardTermCfg( + func=gear_insertion_rewards.gear_peg_keypoint_squashing, + weight=1.0, + params={**keypoint_params, "squash_a": 50.0, "squash_b": 2.0}, + ) + self.kp_fine = RewardTermCfg( + func=gear_insertion_rewards.gear_peg_keypoint_squashing, + weight=1.0, + params={**keypoint_params, "squash_a": 100.0, "squash_b": 0.0}, + ) + + # Sparse insertion bonuses use the same success geometry as the task + # termination with different insertion-depth thresholds. + self.engagement_bonus = RewardTermCfg( + func=gear_insertion_rewards.gear_insertion_geometry_bonus, + weight=1.0, + params={**geometry_params, "z_fraction": 0.90}, + ) + self.success_bonus = RewardTermCfg( + func=gear_insertion_rewards.gear_insertion_geometry_bonus, + weight=1.0, + params={**geometry_params, "z_fraction": success_z_fraction}, + ) diff --git a/isaaclab_arena/tasks/nist_gear_insertion/terminations.py b/isaaclab_arena/tasks/nist_gear_insertion/terminations.py new file mode 100644 index 000000000..b8b0d9a3d --- /dev/null +++ b/isaaclab_arena/tasks/nist_gear_insertion/terminations.py @@ -0,0 +1,70 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Termination terms for gear insertion tasks.""" + +from __future__ import annotations + +import torch + +import warp as wp +from isaaclab.assets import Articulation, RigidObject +from isaaclab.envs import ManagerBasedRLEnv +from isaaclab.managers import SceneEntityCfg + +from isaaclab_arena.tasks.nist_gear_insertion.geometry import compute_gear_insertion_success + + +def gear_mesh_insertion_success( + env: ManagerBasedRLEnv, + held_object_cfg: SceneEntityCfg, + fixed_object_cfg: SceneEntityCfg, + gear_base_offset: tuple[float, ...], + held_gear_base_offset: list[float] | None = None, + gear_peg_height: float = 0.02, + success_z_fraction: float = 0.30, + xy_threshold: float = 0.0025, + disable_success_termination: bool = False, +) -> torch.Tensor: + """Terminate when the held gear is centered on the peg and lowered to the success depth. + + When success termination is disabled, the same term remains registered but + always returns false. This keeps success-rate metrics available during RL + training while allowing policies to continue after first success. + """ + if disable_success_termination: + return torch.zeros(env.num_envs, dtype=torch.bool, device=env.device) + return compute_gear_insertion_success( + env=env, + gear_cfg=held_object_cfg, + board_cfg=fixed_object_cfg, + peg_offset=gear_base_offset, + held_gear_base_offset=held_gear_base_offset if held_gear_base_offset is not None else gear_base_offset, + gear_peg_height=gear_peg_height, + z_fraction=success_z_fraction, + xy_threshold=xy_threshold, + ) + + +def gear_dropped_from_gripper( + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + robot_cfg: SceneEntityCfg = SceneEntityCfg("robot"), + ee_body_name: str = "panda_hand", + distance_threshold: float = 0.15, +) -> torch.Tensor: + """Reset when the gear has fallen too far from the end-effector. + + This optional guard is useful for evaluation and non-training rollouts. It + is disabled during recovery-focused training so the policy can learn from + off-nominal states. + """ + gear: RigidObject = env.scene[gear_cfg.name] + robot: Articulation = env.scene[robot_cfg.name] + eef_indices, _ = robot.find_bodies([ee_body_name]) + ee_pos = wp.to_torch(robot.data.body_pos_w)[:, eef_indices[0]] + gear_pos = wp.to_torch(gear.data.root_pos_w) + distance = torch.norm(gear_pos - ee_pos, dim=-1) + return distance > distance_threshold diff --git a/isaaclab_arena/tests/test_nist_gear_insertion_task.py b/isaaclab_arena/tests/test_nist_gear_insertion_task.py new file mode 100644 index 000000000..b488870ca --- /dev/null +++ b/isaaclab_arena/tests/test_nist_gear_insertion_task.py @@ -0,0 +1,265 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import math +import torch +from dataclasses import dataclass + +import pytest + +_PEG_BASE_OFFSET = [0.02025, 0.0, 0.0] +_PEG_TIP_OFFSET = [0.02025, 0.0, 0.025] + + +@dataclass +class _DummyAsset: + name: str + object_min_z: float | None = None + reset_pose_disabled: bool = False + + def disable_reset_pose(self) -> None: + self.reset_pose_disabled = True + + +def _make_nist_task(**kwargs): + from isaaclab_arena.tasks.nist_gear_insertion.task import GearInsertionGeometryCfg, NistGearInsertionRLTask + + return NistGearInsertionRLTask( + assembled_board=_DummyAsset("nist_board_assembled"), + held_gear=_DummyAsset("medium_nist_gear"), + background_scene=_DummyAsset("table", object_min_z=0.0), + gear_base_asset=_DummyAsset("gears_and_base"), + geometry_cfg=GearInsertionGeometryCfg( + peg_offset_from_board=list(_PEG_BASE_OFFSET), + peg_offset_for_obs=list(_PEG_TIP_OFFSET), + held_gear_base_offset=list(_PEG_BASE_OFFSET), + success_z_fraction=0.20, + xy_threshold=0.0025, + ), + **kwargs, + ) + + +def test_task_geometry_config_feeds_observations_rewards_and_success(): + from isaaclab_arena.tasks.nist_gear_insertion import geometry, rewards + from isaaclab_arena.tasks.nist_gear_insertion.terminations import gear_mesh_insertion_success + + task = _make_nist_task(disable_success_termination=True) + + obs_cfg = task.get_observation_cfg() + reward_cfg = task.get_rewards_cfg() + termination_cfg = task.get_termination_cfg() + + assert task.held_gear.reset_pose_disabled + assert obs_cfg.task_obs.peg_pos.func is geometry.peg_pos_in_env_frame + assert obs_cfg.task_obs.peg_pos.params["board_cfg"].name == "gears_and_base" + assert obs_cfg.task_obs.peg_pos.params["peg_offset"] == _PEG_TIP_OFFSET + + assert obs_cfg.task_obs.peg_delta.func is geometry.peg_delta_from_held_gear_base + assert obs_cfg.task_obs.peg_delta.params["gear_cfg"].name == "medium_nist_gear" + assert obs_cfg.task_obs.peg_delta.params["held_gear_base_offset"] == _PEG_BASE_OFFSET + assert obs_cfg.task_obs.concatenate_terms + + for term in (reward_cfg.kp_baseline, reward_cfg.kp_coarse, reward_cfg.kp_fine): + assert term.func is rewards.gear_peg_keypoint_squashing + assert term.params["peg_offset"] == _PEG_BASE_OFFSET + assert term.params["held_gear_base_offset"] == _PEG_BASE_OFFSET + + assert reward_cfg.engagement_bonus.func is rewards.gear_insertion_geometry_bonus + assert reward_cfg.engagement_bonus.params["z_fraction"] == 0.90 + assert reward_cfg.success_bonus.func is rewards.gear_insertion_geometry_bonus + assert reward_cfg.success_bonus.params["z_fraction"] == 0.20 + + assert termination_cfg.success.func is gear_mesh_insertion_success + assert termination_cfg.success.params["gear_base_offset"] == _PEG_BASE_OFFSET + assert termination_cfg.success.params["held_gear_base_offset"] == _PEG_BASE_OFFSET + assert termination_cfg.success.params["disable_success_termination"] + assert termination_cfg.object_dropped is None + assert termination_cfg.gear_dropped_from_gripper is None + + +def test_task_events_add_grasp_reset_and_optional_randomization(): + from isaaclab_arena.embodiments.franka.nist_gear_insertion.gear_grasp import franka_gripper_joint_setter + from isaaclab_arena.tasks.nist_gear_insertion.events import GraspCfg, place_gear_in_gripper + + grasp_cfg = GraspCfg( + gripper_joint_setter_func=franka_gripper_joint_setter, + arm_joint_names="panda_joint[1-7]", + ) + + events_cfg = _make_nist_task(grasp_cfg=grasp_cfg).get_events_cfg() + + assert events_cfg.place_gear.func is place_gear_in_gripper + assert events_cfg.place_gear.params["gear_cfg"].name == "medium_nist_gear" + assert events_cfg.place_gear.params["grasp_cfg"].gripper_joint_setter_func is franka_gripper_joint_setter + assert events_cfg.place_gear.params["grasp_cfg"].arm_joint_names == "panda_joint[1-7]" + assert events_cfg.held_object_mass is None + assert events_cfg.fixed_asset_pose is None + assert events_cfg.robot_actuator_gains is None + assert events_cfg.robot_joint_friction is None + + randomized_cfg = _make_nist_task(grasp_cfg=grasp_cfg, enable_randomization=True).get_events_cfg() + + assert randomized_cfg.held_object_mass.params["asset_cfg"].name == "medium_nist_gear" + assert randomized_cfg.fixed_asset_pose.params["asset_cfg"].name == "gears_and_base" + assert randomized_cfg.fixed_asset_pose.params["pose_range"]["yaw"] == (0.0, math.radians(15.0)) + assert randomized_cfg.robot_actuator_gains.params["asset_cfg"].joint_names == "panda_joint[1-7]" + assert randomized_cfg.robot_joint_friction.params["asset_cfg"].joint_names == "panda_joint[1-7]" + + with pytest.raises(AssertionError): + _make_nist_task(enable_randomization=True).get_events_cfg() + + +def test_grasp_cfg_width_setter_maps_total_width_to_finger_joints(): + from isaaclab_arena.embodiments.franka.nist_gear_insertion.gear_grasp import ( + franka_gripper_joint_setter, + get_franka_nist_gear_insertion_grasp_config, + ) + + grasp_cfg = get_franka_nist_gear_insertion_grasp_config() + joint_pos = torch.zeros(2, 9) + + franka_gripper_joint_setter(joint_pos, torch.tensor([0, 1]), [7, 8], width=0.04) + + assert torch.allclose(joint_pos[:, 7], torch.full((2,), 0.02)) + assert torch.allclose(joint_pos[:, 8], torch.full((2,), 0.02)) + assert grasp_cfg.gripper_joint_setter_func is franka_gripper_joint_setter + assert grasp_cfg.hand_grasp_width == 0.03 + assert grasp_cfg.hand_close_width == 0.0 + assert grasp_cfg.grasp_rot_offset == [1.0, 0.0, 0.0, 0.0] + assert grasp_cfg.grasp_offset == [0.02, 0.0, -0.128] + + +def test_franka_osc_action_observation_and_reward_configs_use_scene_geometry(): + from isaaclab_arena_environments.mdp.nist_gear_insertion import osc_rewards + from isaaclab_arena_environments.mdp.nist_gear_insertion.franka_osc_cfg import ( + FrankaNistGearInsertionObservationsCfg, + FrankaNistGearInsertionOscActionsCfg, + ) + from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_action import ACTION_DIM, NistGearInsertionOscActionCfg + from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_observations import ( + POLICY_OBS_DIM, + POLICY_OBS_LAYOUT, + NistGearInsertionPolicyObservations, + ) + + action_cfg = FrankaNistGearInsertionOscActionsCfg( + fixed_asset_name="gears_and_base", + peg_offset=tuple(_PEG_TIP_OFFSET), + ) + obs_cfg = FrankaNistGearInsertionObservationsCfg( + fixed_asset_name="gears_and_base", + peg_offset=tuple(_PEG_TIP_OFFSET), + fingertip_body_name="panda_fingertip_centered", + concatenate_observation_terms=True, + ) + reward_cfg = osc_rewards.NistGearInsertionOscRewardsCfg( + gear_name="medium_nist_gear", + board_name="gears_and_base", + peg_offset=_PEG_BASE_OFFSET, + held_gear_base_offset=_PEG_BASE_OFFSET, + gear_peg_height=0.02, + success_z_fraction=0.20, + xy_threshold=0.0025, + ) + + assert isinstance(action_cfg.arm_action, NistGearInsertionOscActionCfg) + assert action_cfg.arm_action.fixed_asset_name == "gears_and_base" + assert action_cfg.arm_action.peg_offset == tuple(_PEG_TIP_OFFSET) + assert action_cfg.arm_action.body_name == "panda_fingertip_centered" + assert action_cfg.gripper_action is None + assert ACTION_DIM == 7 + + policy_obs = obs_cfg.policy.nist_gear_insertion_policy_obs + assert policy_obs.func is NistGearInsertionPolicyObservations + assert policy_obs.params["board_name"] == "gears_and_base" + assert policy_obs.params["peg_offset"] == _PEG_TIP_OFFSET + assert policy_obs.params["fingertip_body_name"] == "panda_fingertip_centered" + assert obs_cfg.policy.concatenate_terms + assert POLICY_OBS_DIM == sum(size for _, size in POLICY_OBS_LAYOUT) + + assert reward_cfg.action_magnitude_penalty.func is osc_rewards.osc_action_magnitude_penalty + assert reward_cfg.action_delta_penalty.func is osc_rewards.osc_action_delta_penalty + assert reward_cfg.contact_penalty.func is osc_rewards.wrist_contact_force_penalty + assert reward_cfg.success_prediction_error.func is osc_rewards.success_prediction_error + assert reward_cfg.success_prediction_error.params["peg_offset"] == _PEG_BASE_OFFSET + assert reward_cfg.success_prediction_error.params["success_z_fraction"] == 0.20 + + +def test_gear_insertion_geometry_success_thresholds(): + from isaaclab_arena.tasks.nist_gear_insertion.geometry import check_gear_insertion_geometry + + held_base_pos = torch.tensor([ + [0.0, 0.0, 0.001], + [0.01, 0.0, 0.001], + [0.0, 0.0, 0.01], + ]) + peg_pos = torch.zeros(3, 3) + + success = check_gear_insertion_geometry( + held_base_pos=held_base_pos, + peg_pos=peg_pos, + gear_peg_height=0.02, + z_fraction=0.2, + xy_threshold=0.0025, + ) + + assert success.tolist() == [True, False, False] + + +def test_osc_yaw_mapping_wraps_to_policy_interval(): + from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_action import ( + _target_yaw_to_action, + _wrap_yaw_to_action_range, + ) + + yaw = torch.tensor([-math.pi, -math.pi / 2.0, 0.0, math.pi / 2.0, 0.75 * math.pi, math.pi]) + wrapped_yaw = _wrap_yaw_to_action_range(yaw) + action = _target_yaw_to_action(wrapped_yaw) + + assert torch.allclose(wrapped_yaw, torch.tensor([-math.pi, -math.pi / 2.0, 0.0, math.pi / 2.0, -math.pi, -math.pi])) + assert torch.all(action <= 1.0) + assert torch.all(action >= -1.0) + + +def test_policy_observation_layout_masks_symmetric_quaternion_and_computes_velocity(): + from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_observations import ( + POLICY_OBS_LAYOUT, + PREV_ACTION_DIM, + _compute_pose_velocities, + _make_reported_quat, + ) + + assert POLICY_OBS_LAYOUT[-1] == ("prev_actions", PREV_ACTION_DIM) + + flip = torch.tensor([1.0, -1.0]) + noisy_quat = torch.tensor([[0.1, 0.2, 0.3, 0.4], [-0.2, 0.1, -0.4, 0.3]]) + reported_quat = _make_reported_quat(noisy_quat, flip) + + assert torch.allclose(reported_quat[:, 2:], torch.zeros(2, 2)) + assert torch.allclose(reported_quat[:, :2], noisy_quat[:, :2] * flip.unsqueeze(-1)) + + prev_pos = torch.zeros(2, 3) + noisy_pos = torch.tensor([[0.01, 0.0, 0.0], [0.0, 0.02, 0.0]]) + ee_linvel, ee_angvel = _compute_pose_velocities(prev_pos, reported_quat, noisy_pos, reported_quat, dt=0.05) + + assert torch.allclose(ee_linvel, noisy_pos / 0.05) + assert torch.allclose(ee_angvel, torch.zeros_like(ee_angvel), atol=1e-6) + + +def test_nist_osc_environment_cli_is_fixed_to_specialized_embodiment(): + from isaaclab_arena_environments.nist_assembled_gearmesh_osc_environment import NISTAssembledGearMeshOSCEnvironment + + parser = argparse.ArgumentParser() + NISTAssembledGearMeshOSCEnvironment.add_cli_args(parser) + + args_cli = parser.parse_args([]) + + assert not hasattr(args_cli, "embodiment") + assert not args_cli.disable_success_termination + assert parser.parse_args(["--rl_training_mode"]).disable_success_termination + with pytest.raises(SystemExit): + parser.parse_args(["--embodiment", "franka_ik"]) diff --git a/isaaclab_arena/tests/test_rl_games_action_policy.py b/isaaclab_arena/tests/test_rl_games_action_policy.py new file mode 100644 index 000000000..d92fe9495 --- /dev/null +++ b/isaaclab_arena/tests/test_rl_games_action_policy.py @@ -0,0 +1,43 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +import torch + + +class _DummyUnwrappedEnv: + def __init__(self): + self.obs_buf = {"policy": torch.ones(1, 3)} + + +class _DummyEnv: + def __init__(self): + self.unwrapped = _DummyUnwrappedEnv() + self.num_resets = 0 + + def reset(self, *args, **kwargs): + self.num_resets += 1 + return {"policy": torch.zeros(1, 3)}, {"reset": True} + + +def test_rl_games_wrapper_init_reuses_current_observation_buffer(monkeypatch): + from isaaclab_arena.policy import rl_games_action_policy + + env = _DummyEnv() + original_reset = env.reset + observed = {} + + def _mock_base_init(self, env, *args, **kwargs): + obs, info = env.reset() + observed["obs"] = obs + observed["info"] = info + self.env = env + + monkeypatch.setattr(rl_games_action_policy.RlGamesVecEnvWrapper, "__init__", _mock_base_init) + rl_games_action_policy._RlGamesInferenceEnvWrapper(env, "cuda:0", torch.inf, torch.inf) + + assert env.reset == original_reset + assert env.num_resets == 0 + assert observed["info"] == {} + assert torch.equal(observed["obs"]["policy"], env.unwrapped.obs_buf["policy"]) diff --git a/isaaclab_arena_environments/mdp/nist_gear_insertion/__init__.py b/isaaclab_arena_environments/mdp/nist_gear_insertion/__init__.py new file mode 100644 index 000000000..adf64306f --- /dev/null +++ b/isaaclab_arena_environments/mdp/nist_gear_insertion/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Controller-specific MDP terms for NIST gear insertion environments.""" + +from .franka_osc_cfg import FrankaNistGearInsertionObservationsCfg, FrankaNistGearInsertionOscActionsCfg diff --git a/isaaclab_arena_environments/mdp/nist_gear_insertion/franka_osc_cfg.py b/isaaclab_arena_environments/mdp/nist_gear_insertion/franka_osc_cfg.py new file mode 100644 index 000000000..431fc18d5 --- /dev/null +++ b/isaaclab_arena_environments/mdp/nist_gear_insertion/franka_osc_cfg.py @@ -0,0 +1,106 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Franka OSC action and observation configs for NIST gear insertion.""" + +from __future__ import annotations + +from dataclasses import MISSING + +from isaaclab.controllers import OperationalSpaceControllerCfg +from isaaclab.managers import ActionTermCfg +from isaaclab.managers import ObservationGroupCfg as ObsGroup +from isaaclab.managers import ObservationTermCfg as ObsTerm +from isaaclab.utils import configclass + +from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_action import NistGearInsertionOscActionCfg +from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_observations import NistGearInsertionPolicyObservations + + +@configclass +class FrankaNistGearInsertionObservationsCfg: + """Policy observations for a Franka gear-insertion OSC environment. + + The fixed asset name and peg offset are supplied by the environment because + they describe scene geometry. + """ + + @configclass + class PolicyCfg(ObsGroup): + """Gear-insertion policy observation group.""" + + nist_gear_insertion_policy_obs: ObsTerm = MISSING + + def __post_init__(self): + self.enable_corruption = False + self.concatenate_terms = False + + policy: PolicyCfg = MISSING + + def __init__( + self, + fixed_asset_name: str, + peg_offset: tuple[float, float, float], + fingertip_body_name: str = "panda_fingertip_centered", + concatenate_observation_terms: bool = False, + ): + self.policy = self.PolicyCfg() + self.policy.nist_gear_insertion_policy_obs = ObsTerm( + func=NistGearInsertionPolicyObservations, + params={ + "robot_name": "robot", + "board_name": fixed_asset_name, + "peg_offset": list(peg_offset), + "fingertip_body_name": fingertip_body_name, + "force_body_name": "force_sensor", + "pos_noise_level": 0.0, + "rot_noise_level_deg": 0.0, + "force_noise_level": 0.0, + }, + ) + self.policy.concatenate_terms = concatenate_observation_terms + + +@configclass +class FrankaNistGearInsertionOscActionsCfg: + """Action terms for the Franka gear-insertion OSC policy. + + The seven-dimensional arm policy controls the insertion OSC term. The + gripper is not exposed as a policy action; reset events place the held gear + in the hand and maintain the grasp target. + """ + + arm_action: ActionTermCfg = MISSING + gripper_action: ActionTermCfg | None = None + + def __init__( + self, + fixed_asset_name: str, + peg_offset: tuple[float, float, float], + ): + self.arm_action = NistGearInsertionOscActionCfg( + asset_name="robot", + joint_names=["panda_joint[1-7]"], + body_name="panda_fingertip_centered", + controller_cfg=OperationalSpaceControllerCfg( + target_types=["pose_abs"], + impedance_mode="fixed", + inertial_dynamics_decoupling=True, + partial_inertial_dynamics_decoupling=False, + gravity_compensation=False, + motion_stiffness_task=[565.0, 565.0, 565.0, 28.0, 28.0, 28.0], + motion_damping_ratio_task=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0], + nullspace_control="position", + nullspace_stiffness=10.0, + nullspace_damping_ratio=1.0, + ), + position_scale=1.0, + orientation_scale=1.0, + nullspace_joint_pos_target="default", + fixed_asset_name=fixed_asset_name, + peg_offset=peg_offset, + force_body_name="force_sensor", + ) + self.gripper_action = None diff --git a/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_action.py b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_action.py new file mode 100644 index 000000000..7b09013ac --- /dev/null +++ b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_action.py @@ -0,0 +1,473 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Operational-space action term for Franka gear insertion. + +This module converts normalized policy commands into end-effector pose targets +for the OSC controller. The policy targets the insertion peg directly, while +this term applies the filtering, command limits, force smoothing, and reset-time +randomization used by the Isaac Lab Forge gear-insertion setup. +""" + +from __future__ import annotations + +import math +import torch +from collections.abc import Sequence +from dataclasses import MISSING +from typing import TYPE_CHECKING + +import isaaclab.utils.math as math_utils +import warp as wp +from isaaclab.envs.mdp.actions.actions_cfg import OperationalSpaceControllerActionCfg +from isaaclab.envs.mdp.actions.task_space_actions import OperationalSpaceControllerAction +from isaaclab.managers.action_manager import ActionTerm +from isaaclab.utils import configclass +from isaaclab_tasks.direct.factory.factory_utils import wrap_yaw +from isaaclab_tasks.direct.forge.forge_utils import get_random_prop_gains + +from isaaclab_arena.tasks.nist_gear_insertion.geometry import compute_asset_local_offset_pos + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedEnv + + +ACTION_DIM = 7 +POS_SLICE = slice(0, 3) +QUAT_SLICE = slice(3, 7) +ROLL_IDX = 3 +YAW_IDX = 5 +SUCCESS_IDX = 6 +ROLL_PITCH_SLICE = slice(ROLL_IDX, YAW_IDX) +ROT_ROLL_IDX = 0 +ROT_PITCH_IDX = 1 +ROT_YAW_IDX = 2 +# The trained policy uses a gripper-down orientation and controls only yaw. +# The interval avoids the wrist-frame discontinuity behind the end effector. +YAW_RANGE_RAD = math.radians(270.0) +YAW_MIN_RAD = -math.pi +YAW_MAX_RAD = math.pi / 2.0 + + +def _action_to_target_yaw(action: torch.Tensor) -> torch.Tensor: + """Map normalized policy action to the commanded yaw interval.""" + return YAW_MIN_RAD + YAW_RANGE_RAD * (action + 1.0) / 2.0 + + +def _target_yaw_to_action(yaw: torch.Tensor) -> torch.Tensor: + """Map commanded yaw back to the normalized policy interval.""" + return (yaw - YAW_MIN_RAD) / YAW_RANGE_RAD * 2.0 - 1.0 + + +def _wrap_yaw_to_action_range(yaw: torch.Tensor) -> torch.Tensor: + """Wrap yaw and clamp the excluded wrist sector to the policy interval.""" + yaw = torch.where(yaw > YAW_MAX_RAD, yaw - 2.0 * math.pi, yaw) + return torch.clamp(yaw, YAW_MIN_RAD, YAW_MAX_RAD) + + +def _gripper_down_to_yaw_frame_quat(num_envs: int, device: torch.device) -> torch.Tensor: + """Return the fixed rotation from the gripper-down frame to the yaw frame.""" + return math_utils.quat_from_euler_xyz( + torch.full((num_envs,), -math.pi, device=device), + torch.zeros(num_envs, device=device), + torch.zeros(num_envs, device=device), + ) + + +def get_nist_gear_insertion_arm_action( + env: ManagerBasedEnv, + term_name: str = "arm_action", +) -> NistGearInsertionOscAction: + """Return the NIST gear insertion OSC action term from an environment.""" + try: + action_term = env.action_manager.get_term(term_name) + except KeyError as exc: + raise KeyError(f"Action term '{term_name}' is required for gear insertion.") from exc + if not isinstance(action_term, NistGearInsertionOscAction): + raise TypeError( + f"Action term '{term_name}' must be {NistGearInsertionOscAction.__name__}; " + f"got {type(action_term).__name__}." + ) + return action_term + + +class NistGearInsertionOscAction(OperationalSpaceControllerAction): + """Operational-space action term for the 7-D NIST gear insertion policy. + + The policy layout is ``xyz, roll, pitch, yaw, success``. Only ``xyz`` and + ``yaw`` are sent to the OSC controller; roll and pitch are intentionally + ignored for the gripper-down insertion strategy. The final channel is an + auxiliary success prediction consumed by the OSC reward terms. + """ + + cfg: NistGearInsertionOscActionCfg + + def __init__(self, cfg: NistGearInsertionOscActionCfg, env: ManagerBasedEnv): + super().__init__(cfg, env) + + self._smoothed_actions = torch.zeros(self.num_envs, ACTION_DIM, device=self.device) + self._ema_factor = torch.full((self.num_envs, 1), 0.05, device=self.device) + self._position_action_bounds = torch.tensor(cfg.pos_action_bounds, device=self.device) + + pos_threshold = torch.tensor(cfg.pos_action_threshold, device=self.device) + rot_threshold = torch.tensor(cfg.rot_action_threshold, device=self.device) + self._default_position_step_limits = pos_threshold.unsqueeze(0).expand(self.num_envs, -1).clone() + self._default_rotation_step_limits = rot_threshold.unsqueeze(0).expand(self.num_envs, -1).clone() + self._position_step_limits = self._default_position_step_limits.clone() + self._rotation_step_limits = self._default_rotation_step_limits.clone() + + self._peg_offset = torch.tensor(cfg.peg_offset, device=self.device) + self._fixed_pos_noise_levels = torch.tensor(cfg.fixed_pos_noise_levels, device=self.device) + self._fixed_pos_noise = torch.zeros(self.num_envs, 3, device=self.device) + self._contact_thresholds = torch.full((self.num_envs,), 7.5, device=self.device) + self._smoothed_force = torch.zeros(self.num_envs, 3, device=self.device) + self._prev_smoothed_actions = torch.zeros(self.num_envs, ACTION_DIM, device=self.device) + + self._delta_pos = torch.zeros(self.num_envs, 3, device=self.device) + self._delta_yaw = torch.zeros(self.num_envs, device=self.device) + self._success_prediction = torch.full((self.num_envs,), -1.0, device=self.device) + self._pos_dead_zone = torch.tensor(cfg.pos_dead_zone, device=self.device).unsqueeze(0) + self._rot_dead_zone = cfg.rot_dead_zone + self._force_body_idx: int | None = None + self._force_smoothing_factor = cfg.force_smoothing_factor + + def _get_peg_pos(self, env_ids: torch.Tensor | None = None) -> torch.Tensor: + """Return the peg target position in each environment frame.""" + board = self._env.scene[self.cfg.fixed_asset_name] + peg_pos = compute_asset_local_offset_pos(self._env, board, self._peg_offset) + if env_ids is not None: + return peg_pos[env_ids] + return peg_pos + + @property + def action_dim(self) -> int: + """Number of policy actions consumed by the OSC term.""" + return ACTION_DIM + + @property + def smoothed_actions(self) -> torch.Tensor: + """EMA-filtered policy actions used by observations and penalties.""" + return self._smoothed_actions + + @property + def previous_smoothed_actions(self) -> torch.Tensor: + """EMA-filtered policy actions from the previous environment step.""" + return self._prev_smoothed_actions + + @property + def position_thresholds(self) -> torch.Tensor: + """Per-environment position command limits after reset randomization.""" + return self._position_step_limits + + @property + def rotation_thresholds(self) -> torch.Tensor: + """Per-environment orientation command limits after reset randomization.""" + return self._rotation_step_limits + + @property + def fixed_pos_noise(self) -> torch.Tensor: + """Per-environment fixed-asset position noise sampled at reset.""" + return self._fixed_pos_noise + + @property + def contact_thresholds(self) -> torch.Tensor: + """Per-environment wrist-force thresholds sampled at reset.""" + return self._contact_thresholds + + @property + def smoothed_force(self) -> torch.Tensor: + """EMA-filtered wrist force owned by the action term.""" + return self._smoothed_force + + @property + def delta_pos(self) -> torch.Tensor: + """Last unclipped peg-relative position delta requested by the policy.""" + return self._delta_pos + + @property + def delta_yaw(self) -> torch.Tensor: + """Last unclipped yaw delta requested by the policy.""" + return self._delta_yaw + + @property + def success_prediction(self) -> torch.Tensor: + """Auxiliary success prediction from the final OSC action channel.""" + return self._success_prediction + + def process_actions(self, actions: torch.Tensor): + """Filter policy actions and write the corresponding OSC command.""" + self._update_smoothed_force() + self._update_smoothed_actions(actions) + self._compute_ee_pose() + + # Convert normalized policy output into a bounded OSC pose command. + ee_pos_b = self._ee_pose_b[:, POS_SLICE] + ee_quat_b = self._ee_pose_b[:, QUAT_SLICE] + self._processed_actions[:, POS_SLICE] = self._compute_bounded_target_position_from_policy_action(ee_pos_b) + self._processed_actions[:, QUAT_SLICE] = self._compute_bounded_target_orientation_from_policy_yaw(ee_quat_b) + + self._compute_task_frame_pose() + self._osc.set_command( + command=self._processed_actions, + current_ee_pose_b=self._ee_pose_b, + current_task_frame_pose_b=self._task_frame_pose_b, + ) + + def _update_smoothed_actions(self, actions: torch.Tensor) -> None: + actions = torch.nan_to_num(actions, nan=0.0, posinf=1.0, neginf=-1.0).clamp(-1.0, 1.0) + self._raw_actions[:] = actions + self._prev_smoothed_actions[:] = self._smoothed_actions + self._smoothed_actions[:] = self._ema_factor * actions + (1.0 - self._ema_factor) * self._smoothed_actions + self._success_prediction[:] = self._smoothed_actions[:, SUCCESS_IDX] + + def _ensure_force_body_idx(self) -> None: + """Resolve the wrist force-sensor body index.""" + if self._force_body_idx is not None: + return + body_ids, body_names = self._asset.find_bodies(self.cfg.force_body_name) + if len(body_ids) != 1: + raise ValueError( + f"Body '{self.cfg.force_body_name}' is required by {self.__class__.__name__} on asset " + f"'{self.cfg.asset_name}'. Use a USD with a wrist force-sensor body or override " + f"'force_body_name'. Found {len(body_ids)} matches: {body_names}." + ) + self._force_body_idx = body_ids[0] + + def _update_smoothed_force(self) -> None: + """Update the wrist-force EMA owned by the action term.""" + self._ensure_force_body_idx() + raw_force = wp.to_torch(self._asset.root_view.get_link_incoming_joint_force())[:, self._force_body_idx, :3] + raw_force = torch.nan_to_num(raw_force, nan=0.0, posinf=100.0, neginf=-100.0).clamp(-100.0, 100.0) + self._smoothed_force[:] = ( + self._force_smoothing_factor * raw_force + (1.0 - self._force_smoothing_factor) * self._smoothed_force + ) + + def _compute_bounded_target_position_from_policy_action(self, ee_pos_b: torch.Tensor) -> torch.Tensor: + """Return a bounded peg-relative position target for stable contact search. + + The policy selects a target around the noisy peg estimate, but the OSC + command is clipped to a small per-step delta so contact corrections do + not jump through the gear or peg geometry. + """ + peg_pos_b = self._get_peg_pos() + self._fixed_pos_noise + + pos_actions = self._smoothed_actions[:, POS_SLICE] + target_pos = peg_pos_b + pos_actions * self._position_action_bounds + + self._delta_pos[:] = target_pos - ee_pos_b + clipped_delta = self._clip_delta_with_dead_zone( + self._delta_pos, + self._position_step_limits, + self._pos_dead_zone, + ) + return ee_pos_b + clipped_delta + + def _compute_bounded_target_orientation_from_policy_yaw(self, ee_quat_b: torch.Tensor) -> torch.Tensor: + """Return a bounded gripper-down yaw target around the gear symmetry axis.""" + target_yaw = _action_to_target_yaw(self._smoothed_actions[:, YAW_IDX]) + target_quat = self._target_quat_from_yaw(target_yaw) + desired_xyz = self._clip_orientation_delta(ee_quat_b, target_quat) + return math_utils.quat_from_euler_xyz( + roll=desired_xyz[:, 0], + pitch=desired_xyz[:, 1], + yaw=desired_xyz[:, 2], + ) + + def _target_quat_from_yaw(self, target_yaw: torch.Tensor) -> torch.Tensor: + """Return the gripper-down target orientation for the commanded yaw.""" + zero = torch.zeros_like(target_yaw) + target_yaw_quat = math_utils.quat_from_euler_xyz(zero, zero, target_yaw) + gripper_down_quat = math_utils.quat_from_euler_xyz(torch.full_like(target_yaw, math.pi), zero, zero) + return math_utils.quat_mul(gripper_down_quat, target_yaw_quat) + + def _clip_orientation_delta(self, ee_quat_b: torch.Tensor, target_quat: torch.Tensor) -> torch.Tensor: + """Clip roll, pitch, and yaw deltas before sending the OSC command.""" + curr_roll, curr_pitch, curr_yaw = math_utils.euler_xyz_from_quat(ee_quat_b, wrap_to_2pi=True) + desired_roll, desired_pitch, desired_yaw = math_utils.euler_xyz_from_quat(target_quat, wrap_to_2pi=True) + desired_xyz = torch.stack([desired_roll, desired_pitch, desired_yaw], dim=1) + + desired_xyz[:, ROT_ROLL_IDX] = self._clip_roll_delta(curr_roll, desired_roll) + desired_xyz[:, ROT_PITCH_IDX] = self._clip_pitch_delta(curr_pitch, desired_pitch) + desired_xyz[:, ROT_YAW_IDX] = self._clip_yaw_delta(curr_yaw, desired_yaw) + return desired_xyz + + def _clip_roll_delta(self, curr_roll: torch.Tensor, desired_roll: torch.Tensor) -> torch.Tensor: + """Return roll target after applying the per-step rotation limit.""" + desired_roll = torch.where(desired_roll < 0.0, desired_roll + 2 * math.pi, desired_roll) + delta_roll = desired_roll - curr_roll + clipped_roll = torch.clamp( + delta_roll, + -self._rotation_step_limits[:, ROT_ROLL_IDX], + self._rotation_step_limits[:, ROT_ROLL_IDX], + ) + return curr_roll + clipped_roll + + def _clip_pitch_delta(self, curr_pitch: torch.Tensor, desired_pitch: torch.Tensor) -> torch.Tensor: + """Return pitch target after wrapping into the signed interval and clipping.""" + curr_pitch_w = torch.where(curr_pitch > math.pi, curr_pitch - 2 * math.pi, curr_pitch) + desired_pitch = torch.where(desired_pitch < 0.0, desired_pitch + 2 * math.pi, desired_pitch) + desired_pitch_w = torch.where(desired_pitch > math.pi, desired_pitch - 2 * math.pi, desired_pitch) + delta_pitch = desired_pitch_w - curr_pitch_w + clipped_pitch = torch.clamp( + delta_pitch, + -self._rotation_step_limits[:, ROT_PITCH_IDX], + self._rotation_step_limits[:, ROT_PITCH_IDX], + ) + return curr_pitch_w + clipped_pitch + + def _clip_yaw_delta(self, curr_yaw: torch.Tensor, desired_yaw: torch.Tensor) -> torch.Tensor: + """Return yaw target after wrapping, clipping, and applying the dead zone.""" + curr_yaw = wrap_yaw(curr_yaw) + desired_yaw = wrap_yaw(desired_yaw) + + self._delta_yaw[:] = desired_yaw - curr_yaw + clipped_yaw = self._clip_delta_with_dead_zone( + self._delta_yaw, + self._rotation_step_limits[:, ROT_YAW_IDX], + self._rot_dead_zone, + ) + return curr_yaw + clipped_yaw + + def _clip_delta_with_dead_zone( + self, + delta: torch.Tensor, + limits: torch.Tensor | float, + dead_zone: torch.Tensor | float, + ) -> torch.Tensor: + """Clamp a requested delta and zero small values.""" + clipped = torch.clamp(delta, -limits, limits) + return torch.where(torch.abs(clipped) > dead_zone, clipped, torch.zeros_like(clipped)) + + def _ee_quat_to_yaw_action(self, ee_quat: torch.Tensor) -> torch.Tensor: + """Convert the current EE orientation to the normalized policy yaw.""" + n = ee_quat.shape[0] + gripper_down_to_yaw_frame = _gripper_down_to_yaw_frame_quat(n, self.device) + yaw_frame_quat = math_utils.quat_mul(gripper_down_to_yaw_frame, ee_quat) + target_yaw = math_utils.euler_xyz_from_quat(yaw_frame_quat, wrap_to_2pi=True)[2] + target_yaw = _wrap_yaw_to_action_range(target_yaw) + return _target_yaw_to_action(target_yaw) + + def reset(self, env_ids: Sequence[int] | None = None) -> None: + env_ids_index, num_envs = self._normalize_env_ids(env_ids) + if num_envs == 0: + return + + super().reset(env_ids) + self._reset_action_filter(env_ids_index, num_envs) + self._reset_command_thresholds(env_ids_index, num_envs) + self._reset_contact_state(env_ids_index, num_envs) + self._reset_initial_smoothed_actions(env_ids_index) + self._reset_debug_state(env_ids_index) + + def _normalize_env_ids(self, env_ids: Sequence[int] | None) -> tuple[Sequence[int] | slice, int]: + """Return an index object and count for tensor resets.""" + if env_ids is None: + return slice(None), self.num_envs + return env_ids, len(env_ids) + + def _reset_action_filter(self, env_ids: Sequence[int] | slice, num_envs: int) -> None: + """Randomize the EMA factor used to smooth policy actions this episode.""" + lo, hi = self.cfg.ema_factor_range + self._ema_factor[env_ids] = lo + torch.rand(num_envs, 1, device=self.device) * (hi - lo) + + def _reset_command_thresholds(self, env_ids: Sequence[int] | slice, num_envs: int) -> None: + """Randomize per-step OSC command limits and peg-position noise.""" + self._position_step_limits[env_ids] = get_random_prop_gains( + self._default_position_step_limits[env_ids], + self.cfg.pos_threshold_noise_level, + num_envs, + self.device, + ) + self._rotation_step_limits[env_ids] = get_random_prop_gains( + self._default_rotation_step_limits[env_ids], + self.cfg.rot_threshold_noise_level, + num_envs, + self.device, + ) + self._fixed_pos_noise[env_ids] = torch.randn(num_envs, 3, device=self.device) * self._fixed_pos_noise_levels + + def _reset_contact_state(self, env_ids: Sequence[int] | slice, num_envs: int) -> None: + """Randomize contact threshold and clear the smoothed force state.""" + ct_lo, ct_hi = self.cfg.contact_threshold_range + self._contact_thresholds[env_ids] = ct_lo + torch.rand(num_envs, device=self.device) * (ct_hi - ct_lo) + self._smoothed_force[env_ids] = 0.0 + + def _reset_initial_smoothed_actions(self, env_ids: Sequence[int] | slice) -> None: + """Seed smoothed actions from the current EE pose to avoid a reset-time command jump.""" + self._compute_ee_pose() + ee_pos = self._ee_pose_b[env_ids, POS_SLICE] + ee_quat = self._ee_pose_b[env_ids, QUAT_SLICE] + + peg_pos = self._get_peg_pos(env_ids) + self._fixed_pos_noise[env_ids] + + pos_actions = (ee_pos - peg_pos) / self._position_action_bounds + self._smoothed_actions[env_ids, POS_SLICE] = pos_actions + + yaw_action = self._ee_quat_to_yaw_action(ee_quat) + + self._smoothed_actions[env_ids, ROLL_PITCH_SLICE] = 0.0 + self._smoothed_actions[env_ids, YAW_IDX] = yaw_action + self._smoothed_actions[env_ids, SUCCESS_IDX] = -1.0 + + self._prev_smoothed_actions[env_ids] = self._smoothed_actions[env_ids] + + def _reset_debug_state(self, env_ids: Sequence[int] | slice) -> None: + """Clear diagnostic buffers used by observations and rewards.""" + self._delta_pos[env_ids] = 0.0 + self._delta_yaw[env_ids] = 0.0 + self._success_prediction[env_ids] = -1.0 + + +@configclass +class NistGearInsertionOscActionCfg(OperationalSpaceControllerActionCfg): + """Config for :class:`NistGearInsertionOscAction`. + + Environments provide the fixed asset name and peg offset because those + values are scene geometry, not robot configuration. + """ + + class_type: type[ActionTerm] = NistGearInsertionOscAction + + fixed_asset_name: str = MISSING + """Name of the fixed asset that contains the insertion peg.""" + + peg_offset: tuple[float, float, float] = MISSING + """Local-frame peg offset on :attr:`fixed_asset_name`.""" + + fixed_pos_noise_levels: tuple[float, float, float] = (0.001, 0.001, 0.001) + """Per-axis standard deviation for reset-time target noise.""" + + pos_action_bounds: tuple[float, float, float] = (0.05, 0.05, 0.05) + """Position scale applied to normalized policy actions.""" + + pos_action_threshold: tuple[float, float, float] = (0.02, 0.02, 0.02) + """Maximum per-step position delta sent to the OSC controller.""" + + rot_action_threshold: tuple[float, float, float] = (0.097, 0.097, 0.097) + """Maximum per-step orientation delta sent to the OSC controller.""" + + pos_threshold_noise_level: tuple[float, float, float] = (0.25, 0.25, 0.25) + """Reset-time multiplicative noise for position thresholds.""" + + rot_threshold_noise_level: tuple[float, float, float] = (0.29, 0.29, 0.29) + """Reset-time multiplicative noise for orientation thresholds.""" + + ema_factor_range: tuple[float, float] = (0.05, 0.2) + """Reset-time range for the action EMA factor.""" + + contact_threshold_range: tuple[float, float] = (5.0, 10.0) + """Reset-time wrist-force threshold range.""" + + pos_dead_zone: tuple[float, float, float] = (0.0005, 0.0005, 0.0005) + """Position command dead zone.""" + + rot_dead_zone: float = 0.001 + """Orientation command dead zone.""" + + force_body_name: str = "force_sensor" + """Body that exposes the wrist force-sensor joint wrench.""" + + force_smoothing_factor: float = 0.25 + """EMA factor used to smooth wrist-force readings.""" diff --git a/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_observations.py b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_observations.py new file mode 100644 index 000000000..9739c49f5 --- /dev/null +++ b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_observations.py @@ -0,0 +1,436 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""OSC policy observations for Franka NIST gear insertion. + +The observation term is paired with :class:`NistGearInsertionOscAction`: it exposes +the peg-relative fingertip state, smoothed wrist force, force threshold, and +filtered previous action values expected by the trained 24-D policy observation. +""" + +from __future__ import annotations + +import math +import torch +import torch.nn.functional as F +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import isaaclab.utils.math as math_utils +import warp as wp +from isaaclab.assets import Articulation +from isaaclab.managers import ManagerTermBase +from isaaclab.utils.math import axis_angle_from_quat, quat_unique + +from isaaclab_arena.tasks.nist_gear_insertion.geometry import compute_asset_local_offset_pos +from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_action import get_nist_gear_insertion_arm_action + +if TYPE_CHECKING: + from isaaclab.envs import ManagerBasedRLEnv + from isaaclab.managers import ObservationTermCfg + + +FINGERTIP_POS_REL_DIM = 3 +FINGERTIP_QUAT_DIM = 4 +EE_LINVEL_DIM = 3 +EE_ANGVEL_DIM = 3 +FT_FORCE_DIM = 3 +FORCE_THRESHOLD_DIM = 1 +PREV_ACTION_DIM = 7 +POLICY_OBS_LAYOUT = ( + ("fingertip_pos_rel_fixed", FINGERTIP_POS_REL_DIM), + ("fingertip_quat", FINGERTIP_QUAT_DIM), + ("ee_linvel", EE_LINVEL_DIM), + ("ee_angvel", EE_ANGVEL_DIM), + ("ft_force", FT_FORCE_DIM), + ("force_threshold", FORCE_THRESHOLD_DIM), + ("prev_actions", PREV_ACTION_DIM), +) +POLICY_OBS_DIM = sum(dim for _, dim in POLICY_OBS_LAYOUT) +PREV_ACTION_ROLL_PITCH_SLICE = slice(3, 5) + + +@dataclass(frozen=True) +class NistGearInsertionPolicyObsParams: + """Resolved config values for :class:`NistGearInsertionPolicyObservations`.""" + + robot_name: str = "robot" + board_name: str = "fixed_asset" + peg_offset: tuple[float, float, float] = (0.0, 0.0, 0.0) + fingertip_body_name: str = "panda_fingertip_centered" + force_body_name: str = "force_sensor" + """Body whose presence is validated because the paired action term reads wrist force.""" + pos_noise_level: float = 0.00025 + rot_noise_level_deg: float = 0.1 + force_noise_level: float = 1.0 + + @classmethod + def from_dict(cls, params: dict) -> NistGearInsertionPolicyObsParams: + return cls( + robot_name=params.get("robot_name", cls.robot_name), + board_name=params.get("board_name", cls.board_name), + peg_offset=tuple(params.get("peg_offset", cls.peg_offset)), + fingertip_body_name=params.get("fingertip_body_name", cls.fingertip_body_name), + force_body_name=params.get("force_body_name", cls.force_body_name), + pos_noise_level=params.get("pos_noise_level", cls.pos_noise_level), + rot_noise_level_deg=params.get("rot_noise_level_deg", cls.rot_noise_level_deg), + force_noise_level=params.get("force_noise_level", cls.force_noise_level), + ) + + +def _make_reported_quat(quat: torch.Tensor, flip: torch.Tensor) -> torch.Tensor: + """Return the symmetry-reduced quaternion channels exposed to the policy. + + Gear rotation about the peg axis is handled through the yaw action. For + checkpoint parity, the packed observation keeps the original x/y channels + and zeros z/w instead of renormalizing the quaternion. + """ + reported_quat = quat.clone() + reported_quat[:, [2, 3]] = 0.0 + return reported_quat * flip.unsqueeze(-1) + + +def _compute_pose_velocities( + prev_pos: torch.Tensor, + prev_quat: torch.Tensor, + pos: torch.Tensor, + quat: torch.Tensor, + dt: float, +) -> tuple[torch.Tensor, torch.Tensor]: + """Estimate finite-difference linear velocity and yaw angular velocity.""" + safe_dt = max(dt, 1e-6) + linvel = (pos - prev_pos) / safe_dt + + rot_diff = math_utils.quat_mul(quat, math_utils.quat_conjugate(prev_quat)) + rot_diff = quat_unique(rot_diff) + angvel = axis_angle_from_quat(rot_diff) / safe_dt + angvel[:, 0:2] = 0.0 + return linvel, angvel + + +class NistGearInsertionPolicyObservations(ManagerTermBase): + """Policy observation term for OSC-based gear insertion. + + The term reads state owned by :class:`NistGearInsertionOscAction`, including + reset-time peg noise, smoothed force, per-episode force threshold, and + previous filtered actions. The action term owns the wrist-force EMA and + updates it at most once per simulation step. + + Output layout (per env):: + + fingertip_pos_rel_fixed (3) + fingertip_quat (4) + ee_linvel (3) + ee_angvel (3) + ft_force (3) + force_threshold (1) + prev_actions (7) + """ + + def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedRLEnv): + super().__init__(cfg, env) + + self._params = NistGearInsertionPolicyObsParams.from_dict(cfg.params) + self._robot_name = self._params.robot_name + self._board_name = self._params.board_name + self._peg_offset_values = self._params.peg_offset + self._peg_offset = torch.tensor(self._peg_offset_values, device=env.device) + self._fingertip_body = self._params.fingertip_body_name + self._force_body = self._params.force_body_name + + self._pos_noise = self._params.pos_noise_level + self._rot_noise_deg = self._params.rot_noise_level_deg + self._force_noise = self._params.force_noise_level + + n = env.num_envs + dev = env.device + + self._fingertip_idx: int | None = None + self._body_key = (self._robot_name, self._fingertip_body, self._force_body) + + self._flip_quats = torch.ones(n, device=dev) + self._prev_noisy_pos = torch.zeros(n, 3, device=dev) + self._prev_noisy_quat = torch.zeros(n, 4, device=dev) + self._prev_noisy_quat[:, 3] = 1.0 + + def _resolve_fingertip_idx( + self, + robot_name: str, + fingertip_body_name: str, + force_body_name: str, + ) -> int: + """Resolve the fingertip body index used by the observation term.""" + body_key = (robot_name, fingertip_body_name, force_body_name) + if self._fingertip_idx is not None and body_key == self._body_key: + return self._fingertip_idx + + robot: Articulation = self._env.scene[robot_name] + for body_name in (fingertip_body_name, force_body_name): + body_ids, body_names = robot.find_bodies([body_name]) + if not body_ids: + raise ValueError( + f"Body '{body_name}' is missing from robot '{robot_name}'. Use a USD that defines this " + "body or override the corresponding observation parameter. " + f"Matches: {body_names}." + ) + + fingertip_ids, _ = robot.find_bodies([fingertip_body_name]) + fingertip_idx = fingertip_ids[0] + if body_key == self._body_key: + self._fingertip_idx = fingertip_idx + return fingertip_idx + + def _resolve_peg_offset(self, peg_offset: list[float] | None, device: torch.device) -> torch.Tensor: + """Return the cached peg offset unless the manager supplies different values.""" + if peg_offset is None or tuple(peg_offset) == self._peg_offset_values: + return self._peg_offset + return torch.tensor(peg_offset, device=device, dtype=torch.float32) + + def _sample_noisy_pose( + self, + env: ManagerBasedRLEnv, + ft_pos: torch.Tensor, + ft_quat: torch.Tensor, + pos_noise_level: float, + rot_noise_level_deg: float, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return fingertip pose with configured sensor noise.""" + pos_noise = torch.randn(env.num_envs, 3, device=env.device) * pos_noise_level + noisy_pos = ft_pos + pos_noise + + rot_noise_axis = torch.randn(env.num_envs, 3, device=env.device) + rot_noise_axis = F.normalize(rot_noise_axis, dim=1, eps=1e-8) + rot_noise_angle = torch.randn(env.num_envs, device=env.device) * math.radians(rot_noise_level_deg) + noisy_quat = math_utils.quat_mul( + ft_quat, + math_utils.quat_from_angle_axis(rot_noise_angle, rot_noise_axis), + ) + return noisy_pos, quat_unique(noisy_quat) + + def _compute_fingertip_target_delta( + self, + env: ManagerBasedRLEnv, + noisy_pos: torch.Tensor, + board_name: str, + peg_offset: torch.Tensor, + arm_osc_action, + ) -> torch.Tensor: + """Return fingertip position relative to the noisy peg target.""" + board = env.scene[board_name] + peg_pos = compute_asset_local_offset_pos(env, board, peg_offset) + noisy_fixed_pos = peg_pos + arm_osc_action.fixed_pos_noise + return noisy_pos - noisy_fixed_pos + + def _compute_velocities( + self, + noisy_pos: torch.Tensor, + obs_quat: torch.Tensor, + dt: float, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Estimate fingertip linear and angular velocity from noisy poses.""" + ee_linvel, ee_angvel = _compute_pose_velocities( + self._prev_noisy_pos, + self._prev_noisy_quat, + noisy_pos, + obs_quat, + dt, + ) + + self._prev_noisy_pos[:] = noisy_pos + self._prev_noisy_quat[:] = obs_quat + return ee_linvel, ee_angvel + + def _read_smoothed_force( + self, + env: ManagerBasedRLEnv, + arm_osc_action, + force_noise_level: float, + ) -> torch.Tensor: + """Return the smoothed wrist force with configured observation noise.""" + force = arm_osc_action.smoothed_force + return force + torch.randn(env.num_envs, 3, device=env.device) * force_noise_level + + def _read_fingertip_pose( + self, + env: ManagerBasedRLEnv, + robot_name: str, + fingertip_idx: int, + ) -> tuple[torch.Tensor, torch.Tensor]: + """Return fingertip pose in the environment frame.""" + robot: Articulation = env.scene[robot_name] + origins = env.scene.env_origins + fingertip_pos = wp.to_torch(robot.data.body_pos_w)[:, fingertip_idx] - origins + fingertip_quat = wp.to_torch(robot.data.body_quat_w)[:, fingertip_idx] + return fingertip_pos, fingertip_quat + + def _compute_fingertip_observations( + self, + env: ManagerBasedRLEnv, + robot_name: str, + board_name: str, + peg_offset: torch.Tensor, + fingertip_idx: int, + pos_noise_level: float, + rot_noise_level_deg: float, + arm_osc_action, + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: + """Return target-relative pose and velocity observations for the fingertip.""" + ft_pos, ft_quat = self._read_fingertip_pose(env, robot_name, fingertip_idx) + noisy_pos, noisy_quat_full = self._sample_noisy_pose(env, ft_pos, ft_quat, pos_noise_level, rot_noise_level_deg) + fingertip_pos_rel = self._compute_fingertip_target_delta(env, noisy_pos, board_name, peg_offset, arm_osc_action) + obs_quat = _make_reported_quat(noisy_quat_full, self._flip_quats) + ee_linvel, ee_angvel = self._compute_velocities(noisy_pos, noisy_quat_full, env.step_dt) + return fingertip_pos_rel, obs_quat, ee_linvel, ee_angvel + + def _compute_force_action_observations( + self, + env: ManagerBasedRLEnv, + arm_osc_action, + force_noise_level: float, + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Return wrist-force, force-threshold, and previous-action observations.""" + noisy_force = self._read_smoothed_force(env, arm_osc_action, force_noise_level) + force_threshold = arm_osc_action.contact_thresholds.unsqueeze(-1) + + prev_actions = arm_osc_action.smoothed_actions.clone() + prev_actions[:, PREV_ACTION_ROLL_PITCH_SLICE] = 0.0 + return noisy_force, force_threshold, prev_actions + + def _pack_observation( + self, + fingertip_pos_rel: torch.Tensor, + obs_quat: torch.Tensor, + ee_linvel: torch.Tensor, + ee_angvel: torch.Tensor, + noisy_force: torch.Tensor, + force_threshold: torch.Tensor, + prev_actions: torch.Tensor, + ) -> torch.Tensor: + """Concatenate the policy observation and enforce the documented layout.""" + obs = torch.cat( + [ + fingertip_pos_rel, + obs_quat, + ee_linvel, + ee_angvel, + noisy_force, + force_threshold, + prev_actions, + ], + dim=-1, + ) + assert obs.shape[-1] == POLICY_OBS_DIM, f"Expected {POLICY_OBS_DIM}D gear insertion policy obs." + return torch.nan_to_num(obs, nan=0.0, posinf=100.0, neginf=-100.0).clamp(-100.0, 100.0) + + def _resolve_call_params( + self, + robot_name: str | None, + board_name: str | None, + fingertip_body_name: str | None, + force_body_name: str | None, + pos_noise_level: float | None, + rot_noise_level_deg: float | None, + force_noise_level: float | None, + ) -> tuple[str, str, str, str, float, float, float]: + """Resolve optional call-time overrides against the configured defaults.""" + return ( + self._robot_name if robot_name is None else robot_name, + self._board_name if board_name is None else board_name, + self._fingertip_body if fingertip_body_name is None else fingertip_body_name, + self._force_body if force_body_name is None else force_body_name, + self._pos_noise if pos_noise_level is None else pos_noise_level, + self._rot_noise_deg if rot_noise_level_deg is None else rot_noise_level_deg, + self._force_noise if force_noise_level is None else force_noise_level, + ) + + def reset(self, env_ids: list[int] | None = None): + """Reset noisy pose history and quaternion sign for selected environments.""" + env_ids, num_envs = self._normalize_env_ids(env_ids) + if num_envs == 0: + return + + dev = self._env.device + + flip = torch.ones(num_envs, device=dev) + flip[torch.rand(num_envs, device=dev) > 0.5] = -1.0 + self._flip_quats[env_ids] = flip + + fingertip_idx = self._resolve_fingertip_idx(self._robot_name, self._fingertip_body, self._force_body) + robot: Articulation = self._env.scene[self._robot_name] + origins = self._env.scene.env_origins + self._prev_noisy_pos[env_ids] = wp.to_torch(robot.data.body_pos_w)[env_ids, fingertip_idx] - origins[env_ids] + reset_quat = wp.to_torch(robot.data.body_quat_w)[env_ids, fingertip_idx] + self._prev_noisy_quat[env_ids] = quat_unique(reset_quat) + + def _normalize_env_ids( + self, + env_ids: list[int] | torch.Tensor | None, + ) -> tuple[torch.Tensor | slice, int]: + """Return an index object and count for observation reset buffers.""" + if env_ids is None: + return slice(None), self._env.num_envs + env_ids = torch.as_tensor(env_ids, device=self._env.device, dtype=torch.long) + return env_ids, env_ids.numel() + + def __call__( + self, + env: ManagerBasedRLEnv, + robot_name: str | None = None, + board_name: str | None = None, + peg_offset: list[float] | None = None, + fingertip_body_name: str | None = None, + force_body_name: str | None = None, + pos_noise_level: float | None = None, + rot_noise_level_deg: float | None = None, + force_noise_level: float | None = None, + ) -> torch.Tensor: + """Return the 24-D policy observation tensor.""" + ( + robot_name, + board_name, + fingertip_body_name, + force_body_name, + pos_noise_level, + rot_noise_level_deg, + force_noise_level, + ) = self._resolve_call_params( + robot_name, + board_name, + fingertip_body_name, + force_body_name, + pos_noise_level, + rot_noise_level_deg, + force_noise_level, + ) + + peg_offset_tensor = self._resolve_peg_offset(peg_offset, env.device) + fingertip_idx = self._resolve_fingertip_idx(robot_name, fingertip_body_name, force_body_name) + arm_osc_action = get_nist_gear_insertion_arm_action(env) + + fingertip_pos_rel, obs_quat, ee_linvel, ee_angvel = self._compute_fingertip_observations( + env, + robot_name, + board_name, + peg_offset_tensor, + fingertip_idx, + pos_noise_level, + rot_noise_level_deg, + arm_osc_action, + ) + noisy_force, force_threshold, prev_actions = self._compute_force_action_observations( + env, + arm_osc_action, + force_noise_level, + ) + + return self._pack_observation( + fingertip_pos_rel, + obs_quat, + ee_linvel, + ee_angvel, + noisy_force, + force_threshold, + prev_actions, + ) diff --git a/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_rewards.py b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_rewards.py new file mode 100644 index 000000000..e1bc01d7b --- /dev/null +++ b/isaaclab_arena_environments/mdp/nist_gear_insertion/osc_rewards.py @@ -0,0 +1,197 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""OSC-specific reward terms for NIST gear insertion. + +These terms consume public state from :class:`NistGearInsertionOscAction`, such as +filtered actions, command deltas, smoothed force, and the auxiliary success +prediction channel. +""" + +from __future__ import annotations + +import torch +from dataclasses import MISSING + +from isaaclab.envs import ManagerBasedRLEnv +from isaaclab.managers import ManagerTermBase, RewardTermCfg, SceneEntityCfg +from isaaclab.utils import configclass + +from isaaclab_arena.tasks.nist_gear_insertion import geometry as gear_geometry +from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_action import get_nist_gear_insertion_arm_action + + +@configclass +class NistGearInsertionOscRewardsCfg: + """Config group for OSC-specific gear insertion rewards. + + Environments pass scene-specific asset names and insertion geometry so this + OSC reward group remains reusable across gear-insertion scenes. + """ + + action_magnitude_penalty: RewardTermCfg = MISSING + action_delta_penalty: RewardTermCfg = MISSING + contact_penalty: RewardTermCfg = MISSING + success_prediction_error: RewardTermCfg = MISSING + + def __init__( + self, + gear_name: str, + board_name: str, + peg_offset: list[float] | tuple[float, ...], + held_gear_base_offset: list[float] | tuple[float, ...], + gear_peg_height: float, + success_z_fraction: float, + xy_threshold: float, + ): + """Initialize OSC reward terms with shared insertion geometry.""" + geometry_params = { + "gear_cfg": SceneEntityCfg(gear_name), + "board_cfg": SceneEntityCfg(board_name), + "peg_offset": list(peg_offset), + "held_gear_base_offset": list(held_gear_base_offset), + "gear_peg_height": gear_peg_height, + "xy_threshold": xy_threshold, + } + + self.action_magnitude_penalty = RewardTermCfg( + func=osc_action_magnitude_penalty, + weight=-0.0005, + params={}, + ) + self.action_delta_penalty = RewardTermCfg( + func=osc_action_delta_penalty, + weight=-0.01, + params={}, + ) + self.contact_penalty = RewardTermCfg( + func=wrist_contact_force_penalty, + weight=-0.001, + params={}, + ) + self.success_prediction_error = RewardTermCfg( + func=success_prediction_error, + weight=-1.0, + params={ + **geometry_params, + "success_z_fraction": success_z_fraction, + "delay_until_ratio": 0.25, + }, + ) + + +class osc_action_magnitude_penalty(ManagerTermBase): + """Penalize large asset-relative position and yaw commands.""" + + def __call__(self, env: ManagerBasedRLEnv) -> torch.Tensor: + action_term = get_nist_gear_insertion_arm_action(env) + position_thresholds = action_term.position_thresholds.clamp_min(1.0e-6) + yaw_thresholds = action_term.rotation_thresholds[:, 2].clamp_min(1.0e-6) + pos_error = torch.norm(action_term.delta_pos / position_thresholds, p=2, dim=-1) + rot_error = torch.abs(action_term.delta_yaw) / yaw_thresholds + return pos_error + rot_error + + +class osc_action_delta_penalty(ManagerTermBase): + """Penalize jerky actions using smoothed action deltas.""" + + def __call__(self, env: ManagerBasedRLEnv) -> torch.Tensor: + action_term = get_nist_gear_insertion_arm_action(env) + return torch.norm( + action_term.smoothed_actions - action_term.previous_smoothed_actions, + p=2, + dim=-1, + ) + + +class wrist_contact_force_penalty(ManagerTermBase): + """Penalize wrist/contact force magnitude above per-episode threshold.""" + + def __call__(self, env: ManagerBasedRLEnv) -> torch.Tensor: + action_term = get_nist_gear_insertion_arm_action(env) + force_mag = torch.norm(action_term.smoothed_force, p=2, dim=-1) + return torch.nn.functional.relu(force_mag - action_term.contact_thresholds) + + +class success_prediction_error(ManagerTermBase): + """Penalize incorrect success predictions from the seventh action channel.""" + + def __init__(self, cfg: RewardTermCfg, env: ManagerBasedRLEnv): + super().__init__(cfg, env) + self._prediction_loss_weight = 0.0 + self._held_gear_base_offset_values = tuple(cfg.params["held_gear_base_offset"]) + self._held_gear_base_offset = torch.tensor( + self._held_gear_base_offset_values, device=env.device, dtype=torch.float32 + ) + self._peg_offset_values = tuple(cfg.params["peg_offset"]) + self._peg_offset = torch.tensor(self._peg_offset_values, device=env.device, dtype=torch.float32) + + def __call__( + self, + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + gear_peg_height: float, + success_z_fraction: float, + xy_threshold: float, + peg_offset: list[float] | None = None, + held_gear_base_offset: list[float] | None = None, + delay_until_ratio: float = 0.25, + ) -> torch.Tensor: + true_success = self._compute_true_success( + env, + gear_cfg, + board_cfg, + peg_offset, + held_gear_base_offset, + gear_peg_height, + success_z_fraction, + xy_threshold, + ) + self._update_prediction_loss_weight(true_success, delay_until_ratio) + pred = self._read_success_prediction(env) + return self._compute_prediction_error(true_success, pred) + + def _compute_true_success( + self, + env: ManagerBasedRLEnv, + gear_cfg: SceneEntityCfg, + board_cfg: SceneEntityCfg, + peg_offset: list[float] | None, + held_gear_base_offset: list[float] | None, + gear_peg_height: float, + success_z_fraction: float, + xy_threshold: float, + ) -> torch.Tensor: + """Return the geometric success label used by the auxiliary prediction loss.""" + return gear_geometry.compute_gear_insertion_success( + env, + gear_cfg, + board_cfg, + gear_geometry.resolve_offset_tensor(peg_offset, self._peg_offset_values, self._peg_offset, env.device), + gear_geometry.resolve_offset_tensor( + held_gear_base_offset, + self._held_gear_base_offset_values, + self._held_gear_base_offset, + env.device, + ), + gear_peg_height, + success_z_fraction, + xy_threshold, + ) + + def _update_prediction_loss_weight(self, true_success: torch.Tensor, delay_until_ratio: float) -> None: + """Enable the auxiliary loss once enough environments have reached success.""" + if true_success.float().mean() >= delay_until_ratio: + self._prediction_loss_weight = 1.0 + + def _read_success_prediction(self, env: ManagerBasedRLEnv) -> torch.Tensor: + """Read the success-probability prediction from the OSC action term.""" + arm_osc_action = get_nist_gear_insertion_arm_action(env) + return (arm_osc_action.success_prediction + 1.0) / 2.0 + + def _compute_prediction_error(self, true_success: torch.Tensor, pred: torch.Tensor) -> torch.Tensor: + """Return the gated auxiliary success-prediction error.""" + return torch.abs(true_success.float() - pred) * self._prediction_loss_weight diff --git a/isaaclab_arena_environments/nist_assembled_gearmesh_osc_environment.py b/isaaclab_arena_environments/nist_assembled_gearmesh_osc_environment.py new file mode 100644 index 000000000..67f3db72a --- /dev/null +++ b/isaaclab_arena_environments/nist_assembled_gearmesh_osc_environment.py @@ -0,0 +1,137 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""NIST assembled board gear-mesh environment with operational-space torque control.""" + +from __future__ import annotations + +import argparse +from typing import TYPE_CHECKING + +from isaaclab_arena.assets.register import register_environment +from isaaclab_arena_environments.example_environment_base import ExampleEnvironmentBase + +if TYPE_CHECKING: + from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment + +_FRANKA_NIST_GEAR_INSERTION_OSC_EMBODIMENT = "franka_nist_gear_insertion_osc" +_PEG_TIP_OFFSET = (0.02025, 0.0, 0.025) +_PEG_BASE_OFFSET = (0.02025, 0.0, 0.0) +_GEAR_PEG_HEIGHT = 0.02 +_SUCCESS_Z_FRACTION = 0.20 +_XY_THRESHOLD = 0.0025 +_EPISODE_LENGTH_S = 15.0 + + +@register_environment +class NISTAssembledGearMeshOSCEnvironment(ExampleEnvironmentBase): + """NIST gear insertion using OSC torque control and assembly-style observations.""" + + name: str = "nist_assembled_gear_mesh_osc" + + def get_env(self, args_cli: argparse.Namespace) -> IsaacLabArenaEnvironment: + import isaaclab.sim as sim_utils + + import isaaclab_arena_environments.mdp as mdp + from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment + from isaaclab_arena.scene.scene import Scene + from isaaclab_arena.tasks.nist_gear_insertion.task import GearInsertionGeometryCfg, NistGearInsertionRLTask + from isaaclab_arena.utils.pose import Pose + from isaaclab_arena_environments.mdp.nist_gear_insertion.franka_osc_cfg import ( + FrankaNistGearInsertionObservationsCfg, + FrankaNistGearInsertionOscActionsCfg, + ) + from isaaclab_arena_environments.mdp.nist_gear_insertion.osc_rewards import NistGearInsertionOscRewardsCfg + + table = self.asset_registry.get_asset_by_name("table")() + assembled_board = self.asset_registry.get_asset_by_name("nist_board_assembled")() + gears_and_base = self.asset_registry.get_asset_by_name("gears_and_base")() + medium_gear = self.asset_registry.get_asset_by_name("medium_nist_gear")() + light_spawner_cfg = sim_utils.DomeLightCfg(color=(0.75, 0.75, 0.75), intensity=1500.0) + light = self.asset_registry.get_asset_by_name("light")(spawner_cfg=light_spawner_cfg) + + embodiment = self.asset_registry.get_asset_by_name(_FRANKA_NIST_GEAR_INSERTION_OSC_EMBODIMENT)( + enable_cameras=args_cli.enable_cameras, + concatenate_observation_terms=True, + ) + embodiment.action_config = FrankaNistGearInsertionOscActionsCfg( + fixed_asset_name=gears_and_base.name, + peg_offset=_PEG_TIP_OFFSET, + ) + embodiment.observation_config = FrankaNistGearInsertionObservationsCfg( + fixed_asset_name=gears_and_base.name, + peg_offset=_PEG_TIP_OFFSET, + fingertip_body_name=embodiment.get_command_body_name(), + concatenate_observation_terms=embodiment.concatenate_observation_terms, + ) + embodiment.reward_config = NistGearInsertionOscRewardsCfg( + gear_name=medium_gear.name, + board_name=gears_and_base.name, + peg_offset=_PEG_BASE_OFFSET, + held_gear_base_offset=_PEG_BASE_OFFSET, + gear_peg_height=_GEAR_PEG_HEIGHT, + success_z_fraction=_SUCCESS_Z_FRACTION, + xy_threshold=_XY_THRESHOLD, + ) + if args_cli.teleop_device is not None: + teleop_device = self.device_registry.get_device_by_name(args_cli.teleop_device)() + else: + teleop_device = None + + table.set_initial_pose(Pose(position_xyz=(0.55, 0.0, -0.009), rotation_xyzw=(0.0, 0.0, 0.707, 0.707))) + assembled_board.set_initial_pose( + Pose(position_xyz=(0.88, 0.15, -0.009), rotation_xyzw=(0.0, 0.0, -0.7071, 0.7071)) + ) + medium_gear.set_initial_pose(Pose(position_xyz=(0.5462, -0.02386, 0.12858), rotation_xyzw=(0.0, 0.0, 0.0, 1.0))) + gears_and_base.set_initial_pose( + Pose(position_xyz=(0.585, -0.074, 0.0), rotation_xyzw=(0.0, 0.0, 0.9239, 0.3827)) + ) + scene = Scene(assets=[table, assembled_board, medium_gear, gears_and_base, light]) + + geometry_cfg = GearInsertionGeometryCfg( + peg_offset_from_board=list(_PEG_BASE_OFFSET), + peg_offset_for_obs=list(_PEG_TIP_OFFSET), + success_z_fraction=_SUCCESS_Z_FRACTION, + xy_threshold=_XY_THRESHOLD, + ) + + task = NistGearInsertionRLTask( + assembled_board=assembled_board, + held_gear=medium_gear, + background_scene=table, + gear_base_asset=gears_and_base, + geometry_cfg=geometry_cfg, + episode_length_s=_EPISODE_LENGTH_S, + grasp_cfg=embodiment.get_gear_insertion_grasp_config(), + fingertip_body_name=embodiment.get_command_body_name(), + enable_randomization=True, + disable_success_termination=args_cli.disable_success_termination, + ) + + return IsaacLabArenaEnvironment( + name=self.name, + embodiment=embodiment, + scene=scene, + task=task, + teleop_device=teleop_device, + env_cfg_callback=mdp.assembly_env_cfg_callback, + ) + + @staticmethod + def add_cli_args(parser: argparse.ArgumentParser) -> None: + parser.add_argument( + "--teleop_device", type=str, default=None, help="Teleoperation device (e.g., keyboard, spacemouse)" + ) + parser.add_argument( + "--disable_success_termination", + action="store_true", + help="Disable success termination during training.", + ) + parser.add_argument( + "--rl_training_mode", + dest="disable_success_termination", + action="store_true", + help="Alias for --disable_success_termination.", + )