From 2c1028cf6186449da534c958f7c67190b4f4ab48 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Thu, 28 May 2026 16:21:51 +0200 Subject: [PATCH 1/6] Disable RTX scene ambient in Arena env config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The carb setting /rtx/sceneDb/ambientLightIntensity defaults to 1.0 with color [0.1, 0.1, 0.1] in the IsaacLab kit experience. This adds a hidden ~10%-gray ambient floor to every rendered frame, completely independent of any USD UsdLuxLight prim — so policy cameras see lit scenes even when all USD lights are zeroed. The leak silently confounds vision-policy evals: a "low intensity" sweep on the dome light shows no change in scene brightness until the dome exceeds the renderer's ambient term. Override the carb settings (ambient intensity and ambient color) on every Arena eval via RenderCfg.carb_settings, so USD lights are the sole source of illumination. Envs that previously relied on the ambient floor should add an explicit DomeLight asset. Signed-off-by: Clemens Volk --- .../isaaclab_arena_manager_based_env.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/isaaclab_arena/environments/isaaclab_arena_manager_based_env.py b/isaaclab_arena/environments/isaaclab_arena_manager_based_env.py index f6058015c..98cd752fd 100644 --- a/isaaclab_arena/environments/isaaclab_arena_manager_based_env.py +++ b/isaaclab_arena/environments/isaaclab_arena_manager_based_env.py @@ -7,7 +7,7 @@ from isaaclab.envs import ManagerBasedRLEnvCfg from isaaclab.envs.mimic_env_cfg import MimicEnvCfg -from isaaclab.sim import SimulationCfg +from isaaclab.sim import RenderCfg, SimulationCfg from isaaclab.utils import configclass from isaaclab_newton.physics.newton_manager_cfg import MJWarpSolverCfg, NewtonCfg from isaaclab_physx.physics import PhysxCfg @@ -72,7 +72,19 @@ class IsaacLabArenaManagerBasedRLEnvCfg(ManagerBasedRLEnvCfg): isaaclab_arena_env: IsaacLabArenaEnvironment | None = None # Overriding defaults from base class 
- sim: SimulationCfg = SimulationCfg(dt=1 / 200, render_interval=2) + # Override the RTX renderer's built-in scene ambient (carb /rtx/sceneDb/ambientLightIntensity, default 1.0 with + # color [0.1, 0.1, 0.1]) so that USD light prims fully control scene illumination. Without this, Arena scenes + # carry a ~10%-gray ambient floor independent of any UsdLuxLight, which silently confounds vision-policy evals. + sim: SimulationCfg = SimulationCfg( + dt=1 / 200, + render_interval=2, + render=RenderCfg( + carb_settings={ + "/rtx/sceneDb/ambientLightIntensity": 0.0, + "/rtx/sceneDb/ambientLightColor": [0.0, 0.0, 0.0], + }, + ), + ) decimation: int = 4 episode_length_s: float = 50.0 wait_for_textures: bool = False From d0c305f085b476e0403076d5a94f0657ea4305d3 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Wed, 27 May 2026 15:00:53 +0200 Subject: [PATCH 2/6] Add per-episode summary writer for sensitivity analysis Opt-in writer (--factor_keys + --episode_summary) records the values of the listed arena_env_args keys plus per-episode outcomes (from registered task metrics) to a JSONL file during eval_runner. Existing behavior is unchanged when either flag is absent. - Job.arena_env_args_dict preserves the original dict form alongside the existing CLI-args list so the writer can look up factor values by name without re-parsing the args. - The writer's import is deferred inside the per-job try block, matching the policy_runner.py:107 pattern for pxr-touching modules (the writer pulls isaaclab_arena.metrics.metrics, which loads pxr at module top). - Hand-authored factors.yaml + jobs configs are checked in alongside; --factor_keys on the CLI must match the factors.yaml the analyzer consumes (the analyzer validates the pairing on load). 
Signed-off-by: Clemens Volk --- isaaclab_arena/analysis/__init__.py | 4 + .../analysis/sensitivity/__init__.py | 4 + .../analysis/sensitivity/episode_writer.py | 99 ++++++++++ isaaclab_arena/evaluation/eval_runner.py | 25 +++ isaaclab_arena/evaluation/eval_runner_cli.py | 22 +++ isaaclab_arena/evaluation/job_manager.py | 6 + .../light_intensity_sweep_factors.yaml | 31 +++ .../light_intensity_sweep_jobs_config.json | 184 ++++++++++++++++++ ...t_intensity_sweep_minimal_jobs_config.json | 64 ++++++ 9 files changed, 439 insertions(+) create mode 100644 isaaclab_arena/analysis/__init__.py create mode 100644 isaaclab_arena/analysis/sensitivity/__init__.py create mode 100644 isaaclab_arena/analysis/sensitivity/episode_writer.py create mode 100644 isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml create mode 100644 isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_jobs_config.json create mode 100644 isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_minimal_jobs_config.json diff --git a/isaaclab_arena/analysis/__init__.py b/isaaclab_arena/analysis/__init__.py new file mode 100644 index 000000000..fee3a6a9f --- /dev/null +++ b/isaaclab_arena/analysis/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 diff --git a/isaaclab_arena/analysis/sensitivity/__init__.py b/isaaclab_arena/analysis/sensitivity/__init__.py new file mode 100644 index 000000000..fee3a6a9f --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. 
+# +# SPDX-License-Identifier: Apache-2.0 diff --git a/isaaclab_arena/analysis/sensitivity/episode_writer.py b/isaaclab_arena/analysis/sensitivity/episode_writer.py new file mode 100644 index 000000000..f2b4b6d19 --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/episode_writer.py @@ -0,0 +1,99 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Per-episode summary writer for sensitivity analysis. + +``write_episode_summaries`` appends one JSONL row per recorded demo for a just-completed +job. Each row pairs the factor values (read from ``job.arena_env_args_dict`` for the keys +the user listed via ``--factor_keys``) with the per-episode outcome values, extracted from +the recorded hdf5 demos via each metric's ``compute_metric_from_recording``. + +The matching ``factors.yaml`` (consumed by the analyzer) is the user's responsibility — it +is hand-authored alongside the experiment's jobs config and must list the same factor keys. + +Import-order note: this module legitimately touches pxr at import time via +``isaaclab_arena.metrics.metrics`` (which imports ``isaaclab.envs.manager_based_rl_env``). +Like ``metrics`` itself, callers must defer importing this module until *after* +``SimulationAppContext`` is active — see ``policy_runner.py`` (which uses the same pattern +for ``compute_metrics``) and ``eval_runner.py``'s per-job try block for examples. 
+""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import TYPE_CHECKING + +import h5py + +from isaaclab_arena.metrics.metrics import get_metric_recorder_dataset_path +from isaaclab_arena.metrics.metrics_logger import metrics_to_plain_python_types + +if TYPE_CHECKING: + from isaaclab_arena.evaluation.job_manager import Job + + +def write_episode_summaries( + env, + job: "Job", + factor_keys: list[str], + output_path: str | Path, +) -> int: + """Append one JSONL row per recorded demo for the just-completed job. + + Each row has shape ``{"job_name", "episode_idx", "factors", "outcomes"}``. ``factors`` + is the same dict for every row in a job (factors don't vary within a job in MVP-1); + ``outcomes`` is per-demo, computed by calling each registered metric's + ``compute_metric_from_recording([demo_data])`` with a single-demo list. + + Args: + env: The (possibly gym-wrapped) Arena env that just finished its rollout. The + hdf5 path and registered metrics are read from ``env.unwrapped.cfg``. + job: The Job that ran. ``job.arena_env_args_dict`` must contain every key in + ``factor_keys``; missing keys raise AssertionError. + factor_keys: Names of factors to record (typically passed via --factor_keys). + output_path: JSONL file to append to. Created (with parent dirs) if absent. + + Returns: + Number of rows written. + """ + unwrapped = env.unwrapped + if not hasattr(unwrapped.cfg, "metrics") or unwrapped.cfg.metrics is None: + return 0 + + factors = {} + for key in factor_keys: + assert key in job.arena_env_args_dict, ( + f"Job '{job.name}' is missing factor '{key}' in its arena_env_args. 
" + f"Available keys: {sorted(job.arena_env_args_dict.keys())}" + ) + factors[key] = job.arena_env_args_dict[key] + + dataset_path = get_metric_recorder_dataset_path(unwrapped) + metrics_cfg = unwrapped.cfg.metrics + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + + rows_written = 0 + with h5py.File(dataset_path, "r") as f: + demos = f["data"] + with open(output_path, "a", encoding="utf-8") as out: + for demo_idx, demo_name in enumerate(demos): + demo = demos[demo_name] + raw_outcomes = {} + for metric in metrics_cfg: + demo_data = demo[metric.recorder_term_name][:] + raw_outcomes[metric.name] = metric.compute_metric_from_recording([demo_data]) + outcomes = metrics_to_plain_python_types(raw_outcomes) + row = { + "job_name": job.name, + "episode_idx": demo_idx, + "factors": factors, + "outcomes": outcomes, + } + out.write(json.dumps(row) + "\n") + rows_written += 1 + + return rows_written diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index cd6f845d4..680633abc 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -144,6 +144,22 @@ def main(): # Check if any job requires cameras and enable them if needed before starting simulation enable_cameras_if_required(eval_jobs_config, args_cli) + # Sensitivity recording is opt-in via --factor_keys + --episode_summary. The hand-authored + # factors.yaml the analyzer consumes is the user's responsibility — keep it in sync with + # what's recorded here. + sensitivity_enabled = args_cli.factor_keys is not None and args_cli.episode_summary is not None + factor_keys: list[str] = list(args_cli.factor_keys) if args_cli.factor_keys else [] + if sensitivity_enabled: + print( + f"[INFO] Sensitivity recording enabled. 
Recording factors {factor_keys}" + f" per episode to: {args_cli.episode_summary}" + ) + elif args_cli.factor_keys or args_cli.episode_summary: + print( + "[WARN] --factor_keys and --episode_summary must both be set to enable sensitivity" + " recording; got only one. Skipping recording." + ) + with SimulationAppContext(args_cli): job_manager = JobManager(eval_jobs_config["jobs"]) metrics_logger = MetricsLogger() @@ -194,6 +210,15 @@ def main(): language_instruction=job.language_instruction, ) + if sensitivity_enabled: + # Deferred import — episode_writer transitively touches pxr via + # isaaclab_arena.metrics.metrics. Matches the policy_runner.py:107 + # pattern for compute_metrics. + from isaaclab_arena.analysis.sensitivity.episode_writer import write_episode_summaries + + rows = write_episode_summaries(env, job, factor_keys, args_cli.episode_summary) + print(f"[INFO] Wrote {rows} episode summaries for job '{job.name}'") + job_manager.complete_job(job, metrics=metrics, status=Status.COMPLETED) # users may not specify metrics for a task, although it's not recommended diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index b39187b04..dec7bfca9 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -27,3 +27,25 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: default=False, help="Continue evaluation with remaining jobs when a job fails instead of stopping immediately.", ) + parser.add_argument( + "--factor_keys", + type=str, + nargs="*", + default=None, + help=( + "Names of arena_env_args keys to record per episode for sensitivity analysis." + " When set together with --episode_summary, eval_runner writes one JSONL row per" + " demo with the listed factor values + the task's registered outcomes. The schema" + " (factors.yaml) is the user's responsibility — hand-author it to match this list" + " and the analyzer reads it. 
Example: --factor_keys light_intensity" + ), + ) + parser.add_argument( + "--episode_summary", + type=str, + default=None, + help=( + "Output JSONL file for per-episode sensitivity summaries. Only used when" + " --factor_keys is also set. Absent means no recording, unchanged behavior." + ), + ) diff --git a/isaaclab_arena/evaluation/job_manager.py b/isaaclab_arena/evaluation/job_manager.py index 8c4d09c46..43bbe1ffb 100644 --- a/isaaclab_arena/evaluation/job_manager.py +++ b/isaaclab_arena/evaluation/job_manager.py @@ -28,6 +28,7 @@ def __init__( policy_config_dict: dict = None, status: Status = None, language_instruction: str = None, + arena_env_args_dict: dict | None = None, ): """Initialize a Job instance. @@ -42,9 +43,13 @@ def __init__( status: Job status (defaults to PENDING) language_instruction: Optional language instruction override for the policy. When set, takes precedence over the task's own description. + arena_env_args_dict: The original dict form of arena_env_args before conversion to + CLI args list. Preserves typed values (e.g. floats stay floats) for downstream + consumers that need to index by key. 
""" self.name = name self.arena_env_args = arena_env_args + self.arena_env_args_dict = arena_env_args_dict if arena_env_args_dict is not None else {} assert num_envs > 0, "num_envs must be greater than 0" assert not ( num_steps is not None and num_episodes is not None @@ -102,6 +107,7 @@ def from_dict(cls, data: dict) -> "Job": return cls( name=data["name"], arena_env_args=cls.convert_args_dict_to_cli_args_list(data["arena_env_args"]), + arena_env_args_dict=data["arena_env_args"], policy_type=data["policy_type"], num_envs=num_envs, num_steps=num_steps, diff --git a/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml new file mode 100644 index 000000000..1153585ef --- /dev/null +++ b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml @@ -0,0 +1,31 @@ +# Sensitivity-analysis schema for the light_intensity sweep on droid + pi0. +# Paired with: light_intensity_sweep_jobs_config.json (and the minimal variant). +# Hand-authored — must stay in sync with --factor_keys passed to eval_runner. +# +# - slice identifies the (policy, task, embodiment) the dataset comes from; MNPE/NPE +# assumes a single data-generating source per analysis. +# - factors declares what the eval varies; eval_runner is told which arena_env_args +# keys to record via --factor_keys (must match the names here). +# - outcomes declares what the eval measures; the writer pulls these from the +# registered task metrics (compute_metric_from_recording on each demo). + +slice: + policy: pi0_remote + task: pick_and_place_maple_table + embodiment: droid_abs_joint_pos + +factors: + light_intensity: + type: continuous + dim: 1 + # Mirrors the robolab evaluated endpoints [10, 5000] for direct comparison; spans the + # dark / normal / bright regimes around the policy's trained operating point (~500). 
+ range: [[10, 5000]] + +outcomes: + success_rate: + # Per-episode value of SuccessRateMetric. Returns 0.0 or 1.0 for a single demo. + type: float + object_moved_rate: + # Per-episode value of ObjectMovedRateMetric. Same shape as success_rate. + type: float diff --git a/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_jobs_config.json b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_jobs_config.json new file mode 100644 index 000000000..6da57f719 --- /dev/null +++ b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_jobs_config.json @@ -0,0 +1,184 @@ +{ + "jobs": [ + { + "name": "light_intensity_sweep_10", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 10 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_25", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 25 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_60", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": 
"droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 60 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_150", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 150 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_350", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 350 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_800", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 800 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place 
it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_1800", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 1800 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_4000", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 4000 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_sweep_5000", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 5000 + }, + "num_episodes": 20, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + 
"policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + } + ] +} diff --git a/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_minimal_jobs_config.json b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_minimal_jobs_config.json new file mode 100644 index 000000000..0e6f1d3ce --- /dev/null +++ b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_minimal_jobs_config.json @@ -0,0 +1,64 @@ +{ + "jobs": [ + { + "name": "light_intensity_minimal_100", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 100 + }, + "num_episodes": 2, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_minimal_500", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 500 + }, + "num_episodes": 2, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "light_intensity_minimal_5000", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 5000 + }, + 
"num_episodes": 2, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + } + ] +} From d3239082206bbe5ae8331a17f5841dfc46f3e098 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Wed, 27 May 2026 15:01:09 +0200 Subject: [PATCH 3/6] Add NPE-based sensitivity analyzer Reads paired factors.yaml + episode_summary.jsonl into the (theta, x, prior, factor_columns) quadruple sbi consumes, trains NPE on a chosen outcome, plots the 1D posterior marginal for a continuous factor. CLI driver at isaaclab_arena/scripts/analyze_sensitivity.py. - MVP-1 scope: one continuous 1D factor; categorical and vector (dim > 1) branches raise NotImplementedError so the extension point is reserved. - Runtime [WARN] when fitting on a binary outcome surfaces sbi's 1D-Gaussian fallback caveat: the recovered peak reflects the empirical mean of successful theta values, not the true mode of the success curve. - synthetic_data.py generates a paired JSONL + factors.yaml from a known competence band, letting the analyzer smoke-test end-to-end without sim. - sbi added to DEV_DEPS so the docker dev install picks it up on rebuild. 
Signed-off-by: Clemens Volk --- .../analysis/sensitivity/analyzer.py | 164 ++++++++++++ .../analysis/sensitivity/dataset.py | 233 ++++++++++++++++++ .../analysis/sensitivity/synthetic_data.py | 166 +++++++++++++ isaaclab_arena/scripts/analyze_sensitivity.py | 105 ++++++++ setup.py | 1 + 5 files changed, 669 insertions(+) create mode 100644 isaaclab_arena/analysis/sensitivity/analyzer.py create mode 100644 isaaclab_arena/analysis/sensitivity/dataset.py create mode 100644 isaaclab_arena/analysis/sensitivity/synthetic_data.py create mode 100644 isaaclab_arena/scripts/analyze_sensitivity.py diff --git a/isaaclab_arena/analysis/sensitivity/analyzer.py b/isaaclab_arena/analysis/sensitivity/analyzer.py new file mode 100644 index 000000000..afe1fc46a --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/analyzer.py @@ -0,0 +1,164 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""NPE analyzer for 1D continuous sensitivity analysis (MVP-1 path). + +Trains an ``sbi.inference.NPE`` density estimator on a ``SensitivityDataset`` and plots +the posterior over a single 1D continuous factor conditional on a chosen outcome value. + +Under the uniform prior used by v0.3, the posterior shape ``P(theta | outcome=success)`` +is proportional to the sensitivity curve ``P(success | theta)`` — the conditioning is +just inverted via Bayes with a constant prior. The plot shows the posterior density +overlaid with the empirical data, colored by outcome, so a human can sanity-check that +high-density regions correspond to clusters of successful episodes. 
+""" + +from __future__ import annotations + +import numpy as np +import torch +from pathlib import Path + +from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset + + +class NPEAnalyzer: + """Trains sbi NPE on (theta, x[outcome]) and plots the 1D posterior marginal. + + For MVP-1 the analyzer is restricted to: + - one continuous 1D factor (raises NotImplementedError otherwise), + - a single outcome column at a time (selected by name at fit time). + + Multi-factor / categorical / vector extensions go in subclasses or follow-up modules; + this one stays minimal so the smoke test is a clear, debuggable signal. + """ + + def __init__(self, dataset: SensitivityDataset, outcome_name: str): + self.dataset = dataset + self.outcome_name = outcome_name + assert outcome_name in dataset.outcome_columns, ( + f"Outcome {outcome_name!r} not found in schema; available: {list(dataset.outcome_columns)}" + ) + # MVP-1 guards — keep them loud so anyone extending the analyzer notices. + assert dataset.theta.shape[1] == 1, ( + f"NPEAnalyzer (MVP-1) supports 1D theta only; got shape {tuple(dataset.theta.shape)}." + " Multi-factor or vector factors need the MNPE/multi-dim analyzer." + ) + self.posterior = None + + def fit(self, training_batch_size: int = 50) -> None: + """Train NPE on (theta, x_selected). Stores the posterior on the instance.""" + from sbi.inference import NPE + + outcome_idx = self.dataset.outcome_columns[self.outcome_name] + x_selected = self.dataset.x[:, outcome_idx : outcome_idx + 1] + + # Surface the sbi 1D-binary caveat in the user's output instead of letting it slip by as a + # deeply nested UserWarning. Detection: x has a single column whose values are all in {0, 1}. + unique_vals = set(x_selected.flatten().tolist()) + if unique_vals.issubset({0.0, 1.0}): + print( + "[WARN] Outcome " + f"'{self.outcome_name}' is binary (values in {{0, 1}}). 
sbi NPE falls back to a" + " Gaussian density in 1D output space, so the recovered posterior peak reflects the" + " *mean* of successful theta values rather than the true *mode* of the success" + " curve. The peak location may be shifted; the qualitative shape is still" + " informative. Mitigations (future): fit on multiple outcomes simultaneously, use" + " a log-prior, or swap to a binary-appropriate fitter (e.g. logistic regression)." + ) + + inference = NPE(prior=self.dataset.prior) + inference.append_simulations(self.dataset.theta, x_selected) + density_estimator = inference.train(training_batch_size=training_batch_size) + self.posterior = inference.build_posterior(density_estimator) + + def plot_marginal( + self, + factor_name: str, + output_path: str | Path, + outcome_value: float = 1.0, + num_grid_points: int = 200, + ) -> None: + """Plot ``P(factor | outcome=outcome_value)`` as a density curve. + + Args: + factor_name: Which factor's marginal to plot. For MVP-1 this is the only factor. + output_path: Where to save the figure (PNG inferred from extension). + outcome_value: Conditioning value; 1.0 for "given success". + num_grid_points: Resolution of the posterior density curve. + """ + import matplotlib.pyplot as plt + + assert self.posterior is not None, "Call fit() before plot_marginal()" + assert factor_name in self.dataset.factor_columns, ( + f"Factor {factor_name!r} not in schema; available: {list(self.dataset.factor_columns)}" + ) + + # Build a grid over the factor's declared/inferred range. + factor_spec = next(f for f in self.dataset.schema.factors if f.name == factor_name) + assert factor_spec.range is not None and len(factor_spec.range) == 1, ( + "plot_marginal (MVP-1) expects a single 1D continuous factor with a populated range" + ) + lo, hi = factor_spec.range[0] + grid = torch.linspace(lo, hi, num_grid_points, dtype=torch.float32).unsqueeze(1) + + # Evaluate posterior density at each grid point conditional on the observed outcome. 
+ x_obs = torch.tensor([outcome_value], dtype=torch.float32) + with torch.no_grad(): + log_probs = self.posterior.log_prob(grid, x=x_obs) + density = torch.exp(log_probs).cpu().numpy() + grid_np = grid.squeeze(-1).cpu().numpy() + + # Empirical data: theta values colored by whether they hit the conditioning outcome. + outcome_idx = self.dataset.outcome_columns[self.outcome_name] + emp_theta = self.dataset.theta[:, 0].cpu().numpy() + emp_outcome = self.dataset.x[:, outcome_idx].cpu().numpy() + hit_mask = emp_outcome >= 0.5 + + fig, ax = plt.subplots(figsize=(8, 5)) + ax.plot( + grid_np, + density, + color="steelblue", + linewidth=2, + label=f"P({factor_name} | {self.outcome_name}={outcome_value:g})", + ) + ax.fill_between(grid_np, 0, density, color="steelblue", alpha=0.2) + + # Rug plot of empirical data, two rows for the two outcome levels. + y_rug_hit = np.full(hit_mask.sum(), -0.05 * density.max()) + y_rug_miss = np.full((~hit_mask).sum(), -0.1 * density.max()) + ax.scatter( + emp_theta[hit_mask], + y_rug_hit, + marker="|", + color="seagreen", + s=80, + label=f"{self.outcome_name} ≥ 0.5 (n={hit_mask.sum()})", + ) + ax.scatter( + emp_theta[~hit_mask], + y_rug_miss, + marker="|", + color="firebrick", + s=80, + label=f"{self.outcome_name} < 0.5 (n={(~hit_mask).sum()})", + ) + + ax.set_xlabel(factor_name) + ax.set_ylabel("posterior density") + ax.set_title( + f"Sensitivity of {self.outcome_name} to {factor_name}\n" + f"slice: {self.dataset.schema.slice.policy} / " + f"{self.dataset.schema.slice.task} / {self.dataset.schema.slice.embodiment}" + ) + ax.legend(loc="best", fontsize=9) + ax.grid(alpha=0.3) + fig.tight_layout() + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + fig.savefig(output_path, dpi=150) + plt.close(fig) diff --git a/isaaclab_arena/analysis/sensitivity/dataset.py b/isaaclab_arena/analysis/sensitivity/dataset.py new file mode 100644 index 000000000..63fd4ab0f --- /dev/null +++ 
b/isaaclab_arena/analysis/sensitivity/dataset.py @@ -0,0 +1,233 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Schema parser and dataset loader for sensitivity analysis. + +Combines a hand-authored ``factors.yaml`` (the declared schema + priors) with an +``episode_summary.jsonl`` (per-episode factor draws + outcome values, written by +``episode_writer``) into the tensors that ``sbi`` consumes: + + theta: (N, total_factor_dim) factor values per episode, continuous-first column order + x: (N, n_outcomes) outcome values per episode + prior: sbi.utils.BoxUniform built from declared (or data-inferred) range per factor + +``factor_columns[name]`` returns the slice that factor occupies in ``theta`` — the +stable interface across MVP-1/2/3 that lets the analyzer extract marginals by name. + +MVP-1 implements the continuous-scalar branch only; categorical and vector (dim > 1) +branches raise NotImplementedError so adding them later is a fill-in, not a rewrite. 
+""" + +from __future__ import annotations + +import json +import yaml +import torch +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + + +@dataclass +class FactorSpec: + name: str + type: Literal["continuous", "categorical"] + dim: int = 1 + range: list[list[float]] | None = None # one [low, high] pair per dim, continuous only + choices: list[str] | None = None # categorical only + + +@dataclass +class OutcomeSpec: + name: str + type: str # "bool", "float", "int" — informational; loader treats all as float + + +@dataclass +class SliceSpec: + policy: str + task: str + embodiment: str + + +@dataclass +class FactorSchema: + slice: SliceSpec + factors: list[FactorSpec] + outcomes: list[OutcomeSpec] + + @classmethod + def from_yaml(cls, path: str | Path) -> FactorSchema: + with open(path, encoding="utf-8") as f: + data = yaml.safe_load(f) + assert isinstance(data, dict), f"factors.yaml at {path} must be a mapping at top level" + for required in ("slice", "factors", "outcomes"): + assert required in data, f"factors.yaml at {path} is missing top-level `{required}:` block" + + slice_data = data["slice"] + for required in ("policy", "task", "embodiment"): + assert required in slice_data, ( + f"factors.yaml at {path} `slice:` block is missing `{required}` (need policy/task/embodiment)" + ) + slice_spec = SliceSpec( + policy=slice_data["policy"], + task=slice_data["task"], + embodiment=slice_data["embodiment"], + ) + + factors = [] + for name, spec in data["factors"].items(): + assert "type" in spec, ( + f"factors.yaml at {path} factor {name!r} is missing required `type:` field" + " (expected 'continuous' or 'categorical')" + ) + ftype = spec["type"] + assert ftype in ("continuous", "categorical"), ( + f"factors.yaml at {path} factor {name!r} has unknown type {ftype!r};" + " expected 'continuous' or 'categorical'" + ) + factors.append( + FactorSpec( + name=name, + type=ftype, + dim=spec.get("dim", 1), + range=spec.get("range"), + 
choices=spec.get("choices"), + ) + ) + + outcomes = [OutcomeSpec(name=name, type=spec.get("type", "float")) for name, spec in data["outcomes"].items()] + + return cls(slice=slice_spec, factors=factors, outcomes=outcomes) + + @property + def total_factor_dim(self) -> int: + return sum(f.dim if f.type == "continuous" else 1 for f in self.factors) + + @property + def factor_columns(self) -> dict[str, slice]: + """Map factor name → column slice in theta. Continuous factors come first, categoricals after.""" + cont = [f for f in self.factors if f.type == "continuous"] + cat = [f for f in self.factors if f.type == "categorical"] + cols: dict[str, slice] = {} + i = 0 + for f in cont + cat: + width = f.dim if f.type == "continuous" else 1 + cols[f.name] = slice(i, i + width) + i += width + return cols + + +class SensitivityDataset: + """Combines factors.yaml + episode_summary.jsonl into (theta, x, prior, factor_columns). + + Validates that every JSONL row contains all declared factors and outcomes; fills in any + missing continuous ranges by inferring from observed min/max so downstream code can always + trust ``schema.factors[i].range`` to be populated. 
+ """ + + def __init__(self, factors_yaml: str | Path, jsonl_path: str | Path): + self.schema = FactorSchema.from_yaml(factors_yaml) + + text = Path(jsonl_path).read_text(encoding="utf-8") + self.rows = [json.loads(line) for line in text.splitlines() if line.strip()] + assert len(self.rows) > 0, f"Empty episode_summary.jsonl at {jsonl_path}" + + self._validate_rows(jsonl_path) + self._fill_inferred_ranges() + + self._theta = self._build_theta() + self._x = self._build_x() + + def _validate_rows(self, jsonl_path: str | Path) -> None: + expected_factors = {f.name for f in self.schema.factors} + expected_outcomes = {o.name for o in self.schema.outcomes} + for i, row in enumerate(self.rows): + assert "factors" in row and "outcomes" in row, f"Row {i} of {jsonl_path} missing factors/outcomes block" + got_factors = set(row["factors"].keys()) + assert got_factors == expected_factors, ( + f"Row {i} of {jsonl_path} declares factors {sorted(got_factors)}; " + f"schema expects {sorted(expected_factors)}" + ) + missing_outcomes = expected_outcomes - set(row["outcomes"].keys()) + assert not missing_outcomes, ( + f"Row {i} of {jsonl_path} missing outcomes {sorted(missing_outcomes)}" + ) + + def _fill_inferred_ranges(self) -> None: + for f in self.schema.factors: + if f.type != "continuous" or f.range is not None: + continue + if f.dim != 1: + raise NotImplementedError( + f"Range inference for vector factors (dim > 1) is not implemented; factor {f.name!r} has dim={f.dim}" + ) + values = [float(row["factors"][f.name]) for row in self.rows] + f.range = [[min(values), max(values)]] + + def _build_theta(self) -> torch.Tensor: + cont_factors = [f for f in self.schema.factors if f.type == "continuous"] + cat_factors = [f for f in self.schema.factors if f.type == "categorical"] + if cat_factors: + raise NotImplementedError( + "Categorical factors are not yet supported by SensitivityDataset (MVP-1 covers continuous only)." 
+ ) + + cols = [] + for f in cont_factors: + if f.dim != 1: + raise NotImplementedError( + f"Vector continuous factors (dim > 1) are not yet supported; factor {f.name!r} has dim={f.dim}" + ) + col = torch.tensor( + [float(row["factors"][f.name]) for row in self.rows], dtype=torch.float32 + ).unsqueeze(1) + cols.append(col) + return torch.cat(cols, dim=1) if cols else torch.zeros((len(self.rows), 0), dtype=torch.float32) + + def _build_x(self) -> torch.Tensor: + outcome_cols = [ + torch.tensor([float(row["outcomes"][o.name]) for row in self.rows], dtype=torch.float32).unsqueeze(1) + for o in self.schema.outcomes + ] + return torch.cat(outcome_cols, dim=1) + + @property + def theta(self) -> torch.Tensor: + return self._theta + + @property + def x(self) -> torch.Tensor: + return self._x + + @property + def factor_columns(self) -> dict[str, slice]: + return self.schema.factor_columns + + @property + def outcome_columns(self) -> dict[str, int]: + return {o.name: i for i, o in enumerate(self.schema.outcomes)} + + @property + def prior(self): + """sbi BoxUniform over all continuous factor dims, built from declared/inferred ranges. + + Imported lazily so loading the module doesn't require ``sbi`` for non-analysis use. 
+ """ + from sbi.utils import BoxUniform + + low: list[float] = [] + high: list[float] = [] + for f in self.schema.factors: + if f.type != "continuous": + continue + assert f.range is not None, f"Factor {f.name!r} has no range and was not inferred" + for lo, hi in f.range: + low.append(float(lo)) + high.append(float(hi)) + return BoxUniform( + low=torch.tensor(low, dtype=torch.float32), + high=torch.tensor(high, dtype=torch.float32), + ) diff --git a/isaaclab_arena/analysis/sensitivity/synthetic_data.py b/isaaclab_arena/analysis/sensitivity/synthetic_data.py new file mode 100644 index 000000000..ab46b06bf --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/synthetic_data.py @@ -0,0 +1,166 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Synthetic JSONL generator for smoke-testing the sensitivity analysis pipeline. + +Produces a fake ``episode_summary.jsonl`` with a known linear-Gaussian competence band: + + P(success | intensity) = exp(-(intensity - center)^2 / (2 * sigma^2)) + +i.e. a Gaussian directly in linear intensity space centered on a trained operating point. + +Sampling is **linear-uniform** over ``[10, 5000]`` (one intensity drawn independently per +episode). This matches the semantics of ``Uniform(10, 5000)`` in Alex's variation system +and matches the uniform prior declared in factors.yaml. With these choices the smoke +test should recover the posterior peak exactly at ``center``, because: + + 1. linear uniform sampling matches the declared uniform prior (no sampling bias), + 2. a linear-Gaussian likelihood is symmetric in linear theta-space, so its mode + equals its mean — and the NPE Gaussian fallback for 1D binary outcomes fits + the mean, recovering the true center. 
+ +A more realistic competence band would be log-Gaussian (asymmetric: cameras blind fast +at low intensity, saturate gradually at high), but that introduces a peak-bias artifact +that masks pipeline-correctness signal. This smoke test deliberately matches the +structural assumptions the analyzer can recover exactly, so any mismatch in the output +points to a real bug rather than a known statistical limitation. + +Pair with the hand-authored ``light_intensity_sweep_factors.yaml`` so the analyzer +script can be smoke-tested end-to-end without running Isaac Sim: + + /isaac-sim/python.sh -m isaaclab_arena.analysis.sensitivity.synthetic_data \\ + --output /tmp/syn.jsonl + /isaac-sim/python.sh -m isaaclab_arena.scripts.analyze_sensitivity \\ + --factors_yaml isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml \\ + --episode_summary /tmp/syn.jsonl \\ + --figure_path /tmp/syn_plot.png + +Expected output: a posterior-density curve peaking at ``center`` (default 500), with +empirical rug markers showing successes clustered around the center and failures at +both extremes. +""" + +from __future__ import annotations + +import argparse +import json +import math +import random +from pathlib import Path + +INTENSITY_LOW = 10.0 +INTENSITY_HIGH = 5000.0 + +# A self-contained factors.yaml template for the synthetic dataset. Kept inline (rather +# than imported from episode_writer.py) so this module stays a pure-python dev tool — +# importing episode_writer would transitively load pxr via isaaclab_arena.metrics. +_SYNTHETIC_FACTORS_YAML = """\ +# factors.yaml — synthetic dataset for analyzer smoke-testing. +# Auto-emitted by isaaclab_arena.analysis.sensitivity.synthetic_data alongside the JSONL. 
+ +slice: + policy: synthetic_linear_uniform + task: synthetic_pick_and_place + embodiment: synthetic + +factors: + light_intensity: + type: continuous + dim: 1 + +outcomes: + success_rate: + type: float + object_moved_rate: + type: float +""" + + +def p_success(intensity: float, center: float, sigma: float) -> float: + """Linear-Gaussian competence band: peaks at `center`, falls off symmetrically in linear space.""" + z = (intensity - center) / sigma + return math.exp(-0.5 * z * z) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument( + "--output", type=str, default="/tmp/synthetic_episode_summary.jsonl", help="Output JSONL path." + ) + parser.add_argument( + "--factors-yaml-out", + type=str, + default=None, + help="Output factors.yaml path. Default: same directory as --output, named factors.yaml.", + ) + parser.add_argument( + "--num-episodes", + type=int, + default=180, + help="Total number of episodes to generate. Each draws an intensity from Uniform(10, 5000).", + ) + parser.add_argument( + "--center", type=float, default=500.0, help="Intensity where success rate peaks. Default: 500." + ) + parser.add_argument( + "--sigma", + type=float, + default=400.0, + help="Linear-space width of the competence band (1 sigma in intensity units). 
Default: 400," + " which gives ~95%% success in [100, 900] and near-zero success beyond ~1700.", + ) + parser.add_argument("--seed", type=int, default=42, help="RNG seed for reproducibility.") + args = parser.parse_args() + + rng = random.Random(args.seed) + + rows = [] + for ep_idx in range(args.num_episodes): + intensity = rng.uniform(INTENSITY_LOW, INTENSITY_HIGH) + p = p_success(intensity, args.center, args.sigma) + success = 1.0 if rng.random() < p else 0.0 + rows.append( + { + "job_name": "synth_linear_uniform", + "episode_idx": ep_idx, + "factors": {"light_intensity": intensity}, + "outcomes": {"success_rate": success, "object_moved_rate": success}, + } + ) + + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row) + "\n") + + # Emit a matching factors.yaml so the analyzer can be pointed at this synthetic dataset + # without any hand-authored schema. Inline string template — see _SYNTHETIC_FACTORS_YAML. 
+ factors_yaml_out = ( + Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / "factors.yaml" + ) + factors_yaml_out.parent.mkdir(parents=True, exist_ok=True) + factors_yaml_out.write_text(_SYNTHETIC_FACTORS_YAML, encoding="utf-8") + + print(f"[INFO] Wrote {len(rows)} rows to {output_path}") + print(f"[INFO] Wrote factors schema → {factors_yaml_out}") + print(f"[INFO] Linear-Gaussian competence band: center={args.center:g}, sigma={args.sigma:g}") + print("[INFO] Per-bin success rates (10 equal bins across the prior range):") + num_bins = 10 + bin_width = (INTENSITY_HIGH - INTENSITY_LOW) / num_bins + for bin_idx in range(num_bins): + lo = INTENSITY_LOW + bin_idx * bin_width + hi = lo + bin_width + bin_rows = [r for r in rows if lo <= r["factors"]["light_intensity"] < hi] + if not bin_rows: + continue + succ = sum(int(r["outcomes"]["success_rate"]) for r in bin_rows) + pct = 100 * succ / len(bin_rows) + bar = "█" * int(round(pct / 5)) + print(f" [{lo:>5g}, {hi:>5g}): {succ:>3d}/{len(bin_rows):<3d} ({pct:>5.1f}%) {bar}") + + +if __name__ == "__main__": + main() diff --git a/isaaclab_arena/scripts/analyze_sensitivity.py b/isaaclab_arena/scripts/analyze_sensitivity.py new file mode 100644 index 000000000..4148aa6b6 --- /dev/null +++ b/isaaclab_arena/scripts/analyze_sensitivity.py @@ -0,0 +1,105 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""CLI driver for 1D continuous sensitivity analysis (MVP-1). + +Loads a SensitivityDataset from a paired (factors.yaml, episode_summary.jsonl), trains +NPE on the selected outcome column, and saves a posterior-marginal plot for the chosen +factor. 
+ +Example: + python -m isaaclab_arena.scripts.analyze_sensitivity \\ + --factors_yaml isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml \\ + --episode_summary ./episode_summary.jsonl \\ + --figure_path ./light_intensity_sensitivity.png + +This script runs entirely offline — no Isaac Sim, no policy server. +""" + +from __future__ import annotations + +import argparse + +from isaaclab_arena.analysis.sensitivity.analyzer import NPEAnalyzer +from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--factors_yaml", type=str, required=True, help="Path to factors.yaml.") + parser.add_argument( + "--episode_summary", type=str, required=True, help="Path to episode_summary.jsonl produced by eval_runner." + ) + parser.add_argument( + "--input_factor", + type=str, + default=None, + help="Name of the factor to plot. Defaults to the only factor declared in factors.yaml.", + ) + parser.add_argument( + "--output_metric", + type=str, + default=None, + help="Outcome name to condition on. Defaults to the first outcome listed in factors.yaml.", + ) + parser.add_argument( + "--outcome_value", + type=float, + default=1.0, + help="Outcome value to condition on (1.0 = success). Default: 1.0.", + ) + parser.add_argument( + "--figure_path", + type=str, + default="./sensitivity.png", + help="Output figure path. 
Default: ./sensitivity.png.", + ) + args = parser.parse_args() + + print(f"[INFO] Loading dataset: factors={args.factors_yaml} jsonl={args.episode_summary}") + dataset = SensitivityDataset(args.factors_yaml, args.episode_summary) + + available_factors = list(dataset.factor_columns) + available_outcomes = [o.name for o in dataset.schema.outcomes] + + if args.input_factor is None: + factor_name = available_factors[0] + else: + if args.input_factor not in available_factors: + parser.error( + f"--input_factor {args.input_factor!r} not found in factors.yaml. " + f"Available factors: {available_factors}" + ) + factor_name = args.input_factor + + if args.output_metric is None: + outcome_name = available_outcomes[0] + else: + if args.output_metric not in available_outcomes: + parser.error( + f"--output_metric {args.output_metric!r} not found in factors.yaml. " + f"Available outcomes: {available_outcomes}" + ) + outcome_name = args.output_metric + + print( + f"[INFO] Analyzing factor '{factor_name}' against outcome '{outcome_name}'" + f" (conditioning on outcome={args.outcome_value:g})" + ) + print( + f"[INFO] N={len(dataset.rows)} episodes; theta shape={tuple(dataset.theta.shape)};" + f" x shape={tuple(dataset.x.shape)}" + ) + + analyzer = NPEAnalyzer(dataset, outcome_name=outcome_name) + print("[INFO] Fitting NPE...") + analyzer.fit() + print(f"[INFO] Plotting marginal -> {args.figure_path}") + analyzer.plot_marginal(factor_name=factor_name, output_path=args.figure_path, outcome_value=args.outcome_value) + print("[INFO] Done.") + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 82cd92b56..2c1c5944f 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ "jupyter", "debugpy", "tenacity", + "sbi", ] setup( From 2e33a5e3baa5f989cf66b2f3dc276a2a818b8c94 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Thu, 28 May 2026 13:47:13 +0200 Subject: [PATCH 4/6] Add categorical factor support + schema cleanup for sensitivity analysis Builds on the MVP-1 
foundation (#729) with categorical factor support, a cleaner analyzer/plotting separation, and a tighter eval-side / analysis-side contract that drops a class of drift bugs. - Analyzer hierarchy (BaseAnalyzer / PosteriorAnalyzer / NPEAnalyzer / MNPEAnalyzer / EmpiricalAnalyzer) dispatched via make_analyzer. Pure- categorical schemas use empirical frequency analysis directly (under uniform prior the posterior is exactly the normalized per-category success rate); sbi MNPE 0.26 also requires at least one continuous theta column, which this dispatch handles automatically. - Split inference (analyzer.py) from rendering (plotting.py). Analyzers expose continuous_marginal_density and categorical_marginal_probs queries; plotting consumes them via plot_marginal. New plot types become additive (free functions) without touching the analyzer. - Drop --factor_keys CLI flag on eval_runner. The writer now logs the full arena_env_args per episode; the analyzer-side factors.yaml picks what to study. Removes the drift bug class where --factor_keys and factors.yaml could disagree. - Rename JSONL field "factors" -> "arena_env_args". Honest about provenance and leaves room for sibling source fields (future "sim_state" for MVP-3 reset-time snapshots, "variation_draws" for the variation system) without further wire-format changes. - Add synthetic_data_categorical.py smoke-test generator and rename synthetic_data.py -> synthetic_data_continuous.py for symmetry. 
Signed-off-by: Clemens Volk --- .../analysis/sensitivity/analyzer.py | 453 +++++++++++++----- .../analysis/sensitivity/dataset.py | 365 +++++++++----- .../analysis/sensitivity/episode_writer.py | 92 ++-- .../analysis/sensitivity/plotting.py | 217 +++++++++ .../sensitivity/synthetic_data_categorical.py | 155 ++++++ ...c_data.py => synthetic_data_continuous.py} | 91 ++-- isaaclab_arena/evaluation/eval_runner.py | 24 +- isaaclab_arena/evaluation/eval_runner_cli.py | 21 +- isaaclab_arena/scripts/analyze_sensitivity.py | 13 +- .../light_intensity_sweep_factors.yaml | 5 + .../pick_up_object_sweep_factors.yaml | 21 + ...k_up_object_sweep_minimal_jobs_config.json | 70 +++ 12 files changed, 1155 insertions(+), 372 deletions(-) create mode 100644 isaaclab_arena/analysis/sensitivity/plotting.py create mode 100644 isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py rename isaaclab_arena/analysis/sensitivity/{synthetic_data.py => synthetic_data_continuous.py} (64%) create mode 100644 isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml create mode 100644 isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_minimal_jobs_config.json diff --git a/isaaclab_arena/analysis/sensitivity/analyzer.py b/isaaclab_arena/analysis/sensitivity/analyzer.py index afe1fc46a..04e729a3b 100644 --- a/isaaclab_arena/analysis/sensitivity/analyzer.py +++ b/isaaclab_arena/analysis/sensitivity/analyzer.py @@ -3,162 +3,361 @@ # # SPDX-License-Identifier: Apache-2.0 -"""NPE analyzer for 1D continuous sensitivity analysis (MVP-1 path). +"""Inference-only analyzers for v0.3 sensitivity analysis. -Trains an ``sbi.inference.NPE`` density estimator on a ``SensitivityDataset`` and plots -the posterior over a single 1D continuous factor conditional on a chosen outcome value. 
+What this module does in plain English +-------------------------------------- +Given a dataset of (factor values, outcome values) pairs from a policy evaluation, the +analyzer learns the *conditional* distribution of factor values given a chosen outcome +value (e.g. "given the episode succeeded, which factor values were most consistent?"). +This is the **posterior** ``P(theta | outcome=success)``. Under v0.3's uniform prior, +this posterior's peak is also the operating point ``argmax P(success | theta)`` — so +plotting the marginal posterior over one factor identifies the values that maximize +success rate. -Under the uniform prior used by v0.3, the posterior shape ``P(theta | outcome=success)`` -is proportional to the sensitivity curve ``P(success | theta)`` — the conditioning is -just inverted via Bayes with a constant prior. The plot shows the posterior density -overlaid with the empirical data, colored by outcome, so a human can sanity-check that -high-density regions correspond to clusters of successful episodes. +The three concrete analyzers cover the three relevant factor-mix cases: + + - ``NPEAnalyzer`` — **N**eural **P**osterior **E**stimation. Used when *all* + declared factors are continuous. Trains a normalizing-flow density estimator on + ``(theta, x)`` pairs and exposes ``posterior.sample`` / ``posterior.log_prob``. + Limitation: with a binary outcome and a 1D theta, sbi falls back to a Gaussian + density and the recovered peak reflects the *mean* of successful theta values + rather than the true *mode* — a known caveat we surface as a [WARN] at fit time. + - ``MNPEAnalyzer`` — **M**ixed **N**eural **P**osterior **E**stimation. Used when + the schema has *both* continuous and categorical factors. sbi's MixedDensityEstimator + routes continuous columns through the same kind of flow NPE uses while routing + discrete columns through a categorical mass estimator. + - ``EmpiricalAnalyzer`` — Pure-categorical schemas. 
Skip the neural fit entirely: under + a uniform prior the posterior ``P(category | success)`` is *exactly* the normalized + per-category empirical success rate. No smoothing improves on that, and sbi MNPE + in version 0.26 also refuses to train without at least one continuous theta column. + +``make_analyzer(dataset, outcome_name)`` is the factory: callers don't need to know about +the hierarchy, they just hand it a dataset and outcome name. + +How rendering fits in +--------------------- +This module is *inference-only*. The sibling ``plotting`` module reads the analyzer's +public queries (``continuous_marginal_density``, ``categorical_marginal_probs``) and +renders matplotlib figures. Decoupling the two means new plot types don't require +analyzer changes, and analyzer changes don't risk breaking the plot. + +Public posterior-query surface used by ``plotting.py``: + - ``BaseAnalyzer.categorical_marginal_probs(factor_name, outcome_value, num_samples)`` + - ``PosteriorAnalyzer.continuous_marginal_density(factor_name, outcome_value, num_grid_points)`` + (NOT defined on ``EmpiricalAnalyzer`` — that analyzer rejects continuous factors at init time) """ from __future__ import annotations import numpy as np import torch -from pathlib import Path - -from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset +from abc import ABC, abstractmethod +from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec, SensitivityDataset -class NPEAnalyzer: - """Trains sbi NPE on (theta, x[outcome]) and plots the 1D posterior marginal. - For MVP-1 the analyzer is restricted to: - - one continuous 1D factor (raises NotImplementedError otherwise), - - a single outcome column at a time (selected by name at fit time). +class BaseAnalyzer(ABC): + """Abstract base — owns state validation and the abstract posterior-query surface. 
- Multi-factor / categorical / vector extensions go in subclasses or follow-up modules; - this one stays minimal so the smoke test is a clear, debuggable signal. + Subclasses must implement: + - ``fit`` — train (or no-op) so queries can be called afterwards. + - ``categorical_marginal_probs`` — return ``P(category | outcome)`` for a categorical factor. + Continuous-factor queries (``continuous_marginal_density``) live on ``PosteriorAnalyzer`` + only — the empirical analyzer never needs them by construction. """ def __init__(self, dataset: SensitivityDataset, outcome_name: str): self.dataset = dataset self.outcome_name = outcome_name - assert outcome_name in dataset.outcome_columns, ( - f"Outcome {outcome_name!r} not found in schema; available: {list(dataset.outcome_columns)}" - ) - # MVP-1 guards — keep them loud so anyone extending the analyzer notices. - assert dataset.theta.shape[1] == 1, ( - f"NPEAnalyzer (MVP-1) supports 1D theta only; got shape {tuple(dataset.theta.shape)}." - " Multi-factor or vector factors need the MNPE/multi-dim analyzer." - ) - self.posterior = None + assert ( + outcome_name in dataset.outcome_columns + ), f"Outcome {outcome_name!r} not found in schema; available: {list(dataset.outcome_columns)}" + assert len(dataset.schema.factors) > 0, "Schema declares no factors" + @abstractmethod def fit(self, training_batch_size: int = 50) -> None: - """Train NPE on (theta, x_selected). Stores the posterior on the instance.""" - from sbi.inference import NPE + """Train the posterior (or no-op for empirical) so queries can be called afterwards. - outcome_idx = self.dataset.outcome_columns[self.outcome_name] - x_selected = self.dataset.x[:, outcome_idx : outcome_idx + 1] + For NPE/MNPE this trains a neural density estimator on ``(theta, x_selected)``, + where ``x_selected`` is the single outcome column named by ``outcome_name``. For + the empirical analyzer this is a no-op — the categorical posterior is computed + directly from the data at query time. 
+ """ + + @abstractmethod + def categorical_marginal_probs(self, factor_name: str, outcome_value: float, num_samples: int) -> np.ndarray: + """Return ``P(category | outcome=outcome_value)`` for one categorical factor. + + Output is a 1D numpy array of length ``len(factor.choices)`` whose entries sum to 1. + For posterior analyzers this is computed by sampling the trained posterior and + counting category frequencies; for the empirical analyzer it's the normalized + per-category empirical success rate. + """ + + def _factor_spec(self, factor_name: str) -> FactorSpec: + """Return the ``FactorSpec`` for ``factor_name``, asserting it exists in the schema.""" + assert ( + factor_name in self.dataset.factor_columns + ), f"Factor {factor_name!r} not in schema; available: {list(self.dataset.factor_columns)}" + return next(factor for factor in self.dataset.schema.factors if factor.name == factor_name) - # Surface the sbi 1D-binary caveat in the user's output instead of letting it slip by as a - # deeply nested UserWarning. Detection: x has a single column whose values are all in {0, 1}. - unique_vals = set(x_selected.flatten().tolist()) - if unique_vals.issubset({0.0, 1.0}): - print( - "[WARN] Outcome " - f"'{self.outcome_name}' is binary (values in {{0, 1}}). sbi NPE falls back to a" - " Gaussian density in 1D output space, so the recovered posterior peak reflects the" - " *mean* of successful theta values rather than the true *mode* of the success" - " curve. The peak location may be shifted; the qualitative shape is still" - " informative. Mitigations (future): fit on multiple outcomes simultaneously, use" - " a log-prior, or swap to a binary-appropriate fitter (e.g. logistic regression)." - ) - inference = NPE(prior=self.dataset.prior) - inference.append_simulations(self.dataset.theta, x_selected) +class PosteriorAnalyzer(BaseAnalyzer): + """Common base for the sbi-driven analyzers (NPE and MNPE). 
+ + NPE and MNPE differ only in *which* sbi inference class they instantiate; everything + else (training loop, posterior storage, density and sample queries) is identical. + Subclasses override ``_make_inference`` to choose the class, and the + binary-outcome WARN hook to surface any method-specific caveats. + + After ``fit()`` returns, ``self.posterior`` is an sbi posterior object that supports + ``posterior.sample(shape, x=...)`` and (for NPE) ``posterior.log_prob(theta, x=...)``. + """ + + def __init__(self, dataset: SensitivityDataset, outcome_name: str): + super().__init__(dataset, outcome_name) + self.posterior = None + + def _make_inference(self): + """Return the sbi inference object to train with. + + Subclass-specific: ``NPEAnalyzer`` returns ``sbi.inference.NPE(...)``, + ``MNPEAnalyzer`` returns ``sbi.inference.MNPE(...)``. The lazy import of sbi + lives in the subclass so callers don't pay the (heavy) sbi import cost until + they actually fit. + """ + raise NotImplementedError("PosteriorAnalyzer subclasses must implement _make_inference") + + def fit(self, training_batch_size: int = 50) -> None: + """Train the chosen sbi estimator on ``(theta, x_selected)`` and stash the posterior. + + Steps: + 1. Slice ``self.dataset.x`` to the single outcome column named by ``outcome_name``. + 2. Surface any method-specific caveats about the outcome (e.g. NPE's + 1D-binary Gaussian fallback) via ``_maybe_warn_binary_outcome``. + 3. Instantiate the sbi inference object (NPE or MNPE) via ``_make_inference``. + 4. Append the simulations and train. + 5. Build a posterior object from the trained estimator and store it on ``self``. 
+ """ + outcome_column_index = self.dataset.outcome_columns[self.outcome_name] + selected_outcome_column = self.dataset.x[:, outcome_column_index : outcome_column_index + 1] + self._maybe_warn_binary_outcome(selected_outcome_column) + + print( + f"[INFO] {type(self).__name__}: fitting on {self.dataset.theta.shape[0]} samples" + f" (theta dim={self.dataset.theta.shape[1]}," + f" x dim={selected_outcome_column.shape[1]})." + ) + inference = self._make_inference() + inference.append_simulations(self.dataset.theta, selected_outcome_column) density_estimator = inference.train(training_batch_size=training_batch_size) self.posterior = inference.build_posterior(density_estimator) - def plot_marginal( - self, - factor_name: str, - output_path: str | Path, - outcome_value: float = 1.0, - num_grid_points: int = 200, - ) -> None: - """Plot ``P(factor | outcome=outcome_value)`` as a density curve. - - Args: - factor_name: Which factor's marginal to plot. For MVP-1 this is the only factor. - output_path: Where to save the figure (PNG inferred from extension). - outcome_value: Conditioning value; 1.0 for "given success". - num_grid_points: Resolution of the posterior density curve. + def _maybe_warn_binary_outcome(self, selected_outcome_column: torch.Tensor) -> None: + """Optional hook for subclass-specific caveats about binary outcomes. Default: no-op. + + ``NPEAnalyzer`` overrides this to warn that with a single binary outcome column + sbi falls back to a Gaussian density, biasing the recovered peak toward the + mean of successful theta values rather than the true mode. 
""" - import matplotlib.pyplot as plt - assert self.posterior is not None, "Call fit() before plot_marginal()" - assert factor_name in self.dataset.factor_columns, ( - f"Factor {factor_name!r} not in schema; available: {list(self.dataset.factor_columns)}" - ) + def continuous_marginal_density( + self, factor_name: str, outcome_value: float, num_grid_points: int + ) -> tuple[np.ndarray, np.ndarray]: + """Evaluate ``P(factor_value | outcome=outcome_value)`` over the factor's prior range. - # Build a grid over the factor's declared/inferred range. - factor_spec = next(f for f in self.dataset.schema.factors if f.name == factor_name) - assert factor_spec.range is not None and len(factor_spec.range) == 1, ( - "plot_marginal (MVP-1) expects a single 1D continuous factor with a populated range" - ) - lo, hi = factor_spec.range[0] - grid = torch.linspace(lo, hi, num_grid_points, dtype=torch.float32).unsqueeze(1) + Returns ``(grid, density)`` as numpy arrays of length ``num_grid_points``, suitable + for plotting as a smooth curve. + + Two evaluation paths depending on whether other factors are present: + - **1D theta** (the only declared factor is this one): evaluate + ``posterior.log_prob`` directly on a regular grid — exact, no sampling. + - **Multi-dim theta**: sample the posterior at the given outcome value, extract + this factor's column, and histogram-then-interpolate to a grid. This + marginalizes over the other factor dims implicitly. 
+ """ + assert self.posterior is not None, "Call fit() before querying the posterior" + factor_spec = self._factor_spec(factor_name) + assert ( + factor_spec.type == "continuous" + ), f"continuous_marginal_density expects a continuous factor; {factor_name!r} is {factor_spec.type!r}" + assert ( + factor_spec.range is not None and len(factor_spec.range) == 1 + ), "Continuous-factor marginal expects a populated 1D range" + + factor_column_slice = self.dataset.factor_columns[factor_name] + observed_outcome = torch.tensor([outcome_value], dtype=torch.float32) + range_low, range_high = factor_spec.range[0] + + if self.dataset.theta.shape[1] == 1: + grid_tensor = torch.linspace(range_low, range_high, num_grid_points, dtype=torch.float32).unsqueeze(1) + with torch.no_grad(): + log_probabilities = self.posterior.log_prob(grid_tensor, x=observed_outcome) + density_numpy = torch.exp(log_probabilities).cpu().numpy() + grid_numpy = grid_tensor.squeeze(-1).cpu().numpy() + else: + with torch.no_grad(): + posterior_samples = self.posterior.sample((10_000,), x=observed_outcome) + factor_column_samples = posterior_samples[:, factor_column_slice].squeeze(-1).cpu().numpy() + grid_numpy = np.linspace(range_low, range_high, num_grid_points) + histogram_density, bin_edges = np.histogram( + factor_column_samples, bins=40, range=(range_low, range_high), density=True + ) + density_numpy = np.interp(grid_numpy, 0.5 * (bin_edges[:-1] + bin_edges[1:]), histogram_density) + + return grid_numpy, density_numpy + + def categorical_marginal_probs(self, factor_name: str, outcome_value: float, num_samples: int) -> np.ndarray: + """Estimate ``P(category | outcome)`` by sampling the trained posterior. - # Evaluate posterior density at each grid point conditional on the observed outcome. 
- x_obs = torch.tensor([outcome_value], dtype=torch.float32) + Draws ``num_samples`` from ``posterior(theta | x=outcome_value)``, extracts the + factor's column (which sbi returns as floats over the BoxUniform support), rounds + to the nearest integer in ``[0, num_choices - 1]``, and tallies frequencies. + Result is a length-``num_choices`` numpy array that sums to 1. + """ + assert self.posterior is not None, "Call fit() before querying the posterior" + factor_spec = self._factor_spec(factor_name) + assert factor_spec.type == "categorical" + assert factor_spec.choices is not None + factor_column_slice = self.dataset.factor_columns[factor_name] + num_choices = len(factor_spec.choices) + + observed_outcome = torch.tensor([outcome_value], dtype=torch.float32) with torch.no_grad(): - log_probs = self.posterior.log_prob(grid, x=x_obs) - density = torch.exp(log_probs).cpu().numpy() - grid_np = grid.squeeze(-1).cpu().numpy() - - # Empirical data: theta values colored by whether they hit the conditioning outcome. - outcome_idx = self.dataset.outcome_columns[self.outcome_name] - emp_theta = self.dataset.theta[:, 0].cpu().numpy() - emp_outcome = self.dataset.x[:, outcome_idx].cpu().numpy() - hit_mask = emp_outcome >= 0.5 - - fig, ax = plt.subplots(figsize=(8, 5)) - ax.plot( - grid_np, - density, - color="steelblue", - linewidth=2, - label=f"P({factor_name} | {self.outcome_name}={outcome_value:g})", - ) - ax.fill_between(grid_np, 0, density, color="steelblue", alpha=0.2) - - # Rug plot of empirical data, two rows for the two outcome levels. 
- y_rug_hit = np.full(hit_mask.sum(), -0.05 * density.max()) - y_rug_miss = np.full((~hit_mask).sum(), -0.1 * density.max()) - ax.scatter( - emp_theta[hit_mask], - y_rug_hit, - marker="|", - color="seagreen", - s=80, - label=f"{self.outcome_name} ≥ 0.5 (n={hit_mask.sum()})", - ) - ax.scatter( - emp_theta[~hit_mask], - y_rug_miss, - marker="|", - color="firebrick", - s=80, - label=f"{self.outcome_name} < 0.5 (n={(~hit_mask).sum()})", - ) + posterior_samples = self.posterior.sample((num_samples,), x=observed_outcome) + factor_column_samples = posterior_samples[:, factor_column_slice].squeeze(-1).cpu().numpy() + clipped_codes = np.clip(np.round(factor_column_samples), 0, num_choices - 1).astype(int) + return np.bincount(clipped_codes, minlength=num_choices) / num_samples + + +class NPEAnalyzer(PosteriorAnalyzer): + """Neural Posterior Estimation analyzer for continuous-only factor schemas. + + Use this when every declared factor is continuous (no categoricals). Internally + trains ``sbi.inference.NPE``, which fits a normalizing-flow density over + ``(theta, x_selected)`` and exposes both ``sample`` and ``log_prob`` on the result. + + **Caveat for binary outcomes (1D x):** sbi's flow code falls back to a Gaussian + density when the output space is 1D, which biases the recovered posterior peak + toward the *mean* of successful theta values rather than the true *mode* of the + success curve. We surface a [WARN] at fit time so users see this in plain text + rather than buried in sbi's UserWarning stream. 
+ """ + + def _make_inference(self): + """Construct ``sbi.inference.NPE`` configured with the dataset's uniform prior.""" + from sbi.inference import NPE + + return NPE(prior=self.dataset.prior) + + def _maybe_warn_binary_outcome(self, selected_outcome_column: torch.Tensor) -> None: + """Warn if the selected outcome is binary — see class docstring for the caveat.""" + unique_values = set(selected_outcome_column.flatten().tolist()) + if unique_values.issubset({0.0, 1.0}): + print( + f"[WARN] Outcome {self.outcome_name!r} is binary (values in {{0, 1}}) and the" + " analyzer is using NPE (no categorical factors). sbi NPE falls back to a" + " Gaussian density in 1D output space, so the recovered posterior peak" + " reflects the *mean* of successful theta values rather than the true *mode*" + " of the success curve. Qualitative shape is still informative." + ) + + +class MNPEAnalyzer(PosteriorAnalyzer): + """Mixed Neural Posterior Estimation analyzer for schemas with at least one of each type. + + Use this when the schema mixes continuous and categorical factors. Internally trains + ``sbi.inference.MNPE``, whose mixed density estimator routes continuous theta columns + through a normalizing flow while routing categorical columns through a categorical + mass estimator. The continuous-first / categorical-after column ordering in + ``factor_columns`` matches MNPE's expected layout exactly. + + sbi MNPE 0.26 requires at least one continuous theta column. For pure-categorical + schemas use ``EmpiricalAnalyzer`` instead — ``make_analyzer`` dispatches correctly. 
+ """ - ax.set_xlabel(factor_name) - ax.set_ylabel("posterior density") - ax.set_title( - f"Sensitivity of {self.outcome_name} to {factor_name}\n" - f"slice: {self.dataset.schema.slice.policy} / " - f"{self.dataset.schema.slice.task} / {self.dataset.schema.slice.embodiment}" + def _make_inference(self): + """Construct ``sbi.inference.MNPE`` configured with the dataset's uniform prior.""" + from sbi.inference import MNPE + + return MNPE(prior=self.dataset.prior) + + +class EmpiricalAnalyzer(BaseAnalyzer): + """Frequency-table analyzer for pure-categorical factor schemas — no neural fit. + + Use this when every declared factor is categorical. Under v0.3's uniform prior, + Bayes' rule simplifies ``P(category | success) ∝ P(success | category) · P(category)`` + to ``P(category | success) ∝ P(success | category)`` — i.e. the posterior is *exactly* + the per-category empirical success rate, normalized to sum to 1. No neural network + can do better than this with a uniform prior; smoothing only hurts. + + Also covers a sbi limitation: MNPE 0.26 refuses to train if theta has zero continuous + columns. The empirical path sidesteps that entirely. + + Rejects continuous factors at construction time — ``make_analyzer`` shouldn't even + dispatch here for mixed schemas, but the explicit guard makes the constraint clear. + """ + + def __init__(self, dataset: SensitivityDataset, outcome_name: str): + super().__init__(dataset, outcome_name) + has_continuous_factor = any(factor.type == "continuous" for factor in dataset.schema.factors) + assert not has_continuous_factor, ( + "EmpiricalAnalyzer is only valid for all-categorical schemas. For mixed" + " continuous + categorical factors, use MNPEAnalyzer." 
) - ax.legend(loc="best", fontsize=9) - ax.grid(alpha=0.3) - fig.tight_layout() - - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - fig.savefig(output_path, dpi=150) - plt.close(fig) + + def fit(self, training_batch_size: int = 50) -> None: + """No-op — the posterior is computed directly from the data at query time.""" + print(f"[INFO] {type(self).__name__}: no neural fit needed for pure-categorical schema.") + + def categorical_marginal_probs(self, factor_name: str, outcome_value: float, num_samples: int) -> np.ndarray: + """Return ``P(category | outcome) = per_category_success_rate / sum(per_category_success_rate)``. + + For each category, computes the fraction of rows assigned to it whose outcome + column is ``>= 0.5`` (treating outcome as binary). Then normalizes across + categories so the result sums to 1. ``outcome_value`` and ``num_samples`` are + accepted for interface compatibility with ``PosteriorAnalyzer`` but not used — + empirical analysis treats outcome as binary (success vs not-success). 
+ """ + factor_spec = self._factor_spec(factor_name) + assert factor_spec.type == "categorical" + assert factor_spec.choices is not None + factor_column_slice = self.dataset.factor_columns[factor_name] + num_choices = len(factor_spec.choices) + outcome_column_index = self.dataset.outcome_columns[self.outcome_name] + + empirical_theta_codes = self.dataset.theta[:, factor_column_slice].squeeze(-1).long().cpu().numpy() + empirical_outcomes = self.dataset.x[:, outcome_column_index].cpu().numpy() + empirical_rates = np.zeros(num_choices) + for code in range(num_choices): + category_mask = empirical_theta_codes == code + if category_mask.any(): + empirical_rates[code] = float((empirical_outcomes[category_mask] >= 0.5).mean()) + total_rate = float(empirical_rates.sum()) + if total_rate > 0: + return empirical_rates / total_rate + return np.full(num_choices, 1.0 / num_choices) + + +def make_analyzer(dataset: SensitivityDataset, outcome_name: str) -> BaseAnalyzer: + """Construct the right analyzer for the dataset's factor mix. + + Dispatch table: + - any continuous + any categorical → :class:`MNPEAnalyzer` + - all categorical (zero continuous) → :class:`EmpiricalAnalyzer` + - all continuous (zero categorical) → :class:`NPEAnalyzer` + + Callers should always go through this factory rather than instantiating a specific + subclass — the dispatch encodes invariants (e.g. sbi MNPE 0.26 not supporting + pure-categorical theta) that aren't enforced elsewhere. 
+ """ + num_continuous_factors = sum(1 for factor in dataset.schema.factors if factor.type == "continuous") + num_categorical_factors = sum(1 for factor in dataset.schema.factors if factor.type == "categorical") + assert num_continuous_factors + num_categorical_factors > 0, "Schema declares no factors" + if num_continuous_factors > 0 and num_categorical_factors > 0: + return MNPEAnalyzer(dataset, outcome_name) + if num_categorical_factors > 0: + return EmpiricalAnalyzer(dataset, outcome_name) + return NPEAnalyzer(dataset, outcome_name) diff --git a/isaaclab_arena/analysis/sensitivity/dataset.py b/isaaclab_arena/analysis/sensitivity/dataset.py index 63fd4ab0f..1a176bc23 100644 --- a/isaaclab_arena/analysis/sensitivity/dataset.py +++ b/isaaclab_arena/analysis/sensitivity/dataset.py @@ -7,24 +7,28 @@ Combines a hand-authored ``factors.yaml`` (the declared schema + priors) with an ``episode_summary.jsonl`` (per-episode factor draws + outcome values, written by -``episode_writer``) into the tensors that ``sbi`` consumes: - - theta: (N, total_factor_dim) factor values per episode, continuous-first column order - x: (N, n_outcomes) outcome values per episode - prior: sbi.utils.BoxUniform built from declared (or data-inferred) range per factor - -``factor_columns[name]`` returns the slice that factor occupies in ``theta`` — the -stable interface across MVP-1/2/3 that lets the analyzer extract marginals by name. - -MVP-1 implements the continuous-scalar branch only; categorical and vector (dim > 1) -branches raise NotImplementedError so adding them later is a fill-in, not a rewrite. +``episode_writer``) into the tensors that ``sbi`` consumes for posterior inference. + +Vocabulary refresher (for readers new to simulation-based inference / SBI): + - **theta** — the *factor* values per episode. The "inputs" we vary in the eval (e.g. + ``light_intensity``, ``pick_up_object``). Shape ``(num_episodes, total_factor_dim)``, + continuous factors come first then categoricals. 
+ - **x** — the *outcome* values per episode. The "outputs" the policy produced (e.g. + ``success_rate``, ``object_moved_rate``). Shape ``(num_episodes, num_outcomes)``. + - **prior** — the assumed distribution over theta *before* seeing data. v0.3 ships + uniform priors only, encoded as ``sbi.utils.BoxUniform``. + - **factor_columns** — map from factor name to its column slice in theta, so + downstream code can extract a marginal by name without knowing the layout. + +MVP-2 supports continuous-1D and categorical factors. Vector continuous (``dim > 1``) +factors still raise ``NotImplementedError`` so adding them later is a fill-in. """ from __future__ import annotations import json -import yaml import torch +import yaml from dataclasses import dataclass from pathlib import Path from typing import Literal @@ -32,6 +36,12 @@ @dataclass class FactorSpec: + """One factor's schema as declared in ``factors.yaml``. + + Continuous factors carry a ``range`` (one ``[low, high]`` pair per dim); categorical + factors carry ``choices`` (a list of string labels, integer-encoded by index in theta). + """ + name: str type: Literal["continuous", "categorical"] dim: int = 1 @@ -41,12 +51,20 @@ class FactorSpec: @dataclass class OutcomeSpec: + """One outcome's schema (just a name and a type hint; the loader treats all as float).""" + name: str type: str # "bool", "float", "int" — informational; loader treats all as float @dataclass class SliceSpec: + """The ``(policy, task, embodiment)`` tuple a dataset comes from. + + MNPE/NPE assume a single data-generating source per analysis, so all rows in a + dataset must belong to the same slice — enforced by the loader. 
+ """ + policy: str task: str embodiment: str @@ -54,180 +72,299 @@ class SliceSpec: @dataclass class FactorSchema: + """Parsed ``factors.yaml`` — slice + factor list + outcome list.""" + slice: SliceSpec factors: list[FactorSpec] outcomes: list[OutcomeSpec] @classmethod def from_yaml(cls, path: str | Path) -> FactorSchema: - with open(path, encoding="utf-8") as f: - data = yaml.safe_load(f) - assert isinstance(data, dict), f"factors.yaml at {path} must be a mapping at top level" - for required in ("slice", "factors", "outcomes"): - assert required in data, f"factors.yaml at {path} is missing top-level `{required}:` block" - - slice_data = data["slice"] - for required in ("policy", "task", "embodiment"): - assert required in slice_data, ( - f"factors.yaml at {path} `slice:` block is missing `{required}` (need policy/task/embodiment)" - ) + """Load a ``factors.yaml`` from disk into a typed ``FactorSchema``. + + The YAML must have three top-level blocks: ``slice`` (policy/task/embodiment), + ``factors`` (one entry per varied input), and ``outcomes`` (one entry per + measured output). Each factor's ``type`` must be ``continuous`` or ``categorical``. 
+ """ + with open(path, encoding="utf-8") as yaml_file: + yaml_data = yaml.safe_load(yaml_file) + assert isinstance(yaml_data, dict), f"factors.yaml at {path} must be a mapping at top level" + for required_key in ("slice", "factors", "outcomes"): + assert required_key in yaml_data, f"factors.yaml at {path} is missing top-level `{required_key}:` block" + + slice_block = yaml_data["slice"] + for required_key in ("policy", "task", "embodiment"): + assert ( + required_key in slice_block + ), f"factors.yaml at {path} `slice:` block is missing `{required_key}` (need policy/task/embodiment)" slice_spec = SliceSpec( - policy=slice_data["policy"], - task=slice_data["task"], - embodiment=slice_data["embodiment"], + policy=slice_block["policy"], + task=slice_block["task"], + embodiment=slice_block["embodiment"], ) - factors = [] - for name, spec in data["factors"].items(): - assert "type" in spec, ( - f"factors.yaml at {path} factor {name!r} is missing required `type:` field" + factors: list[FactorSpec] = [] + for factor_name, factor_block in yaml_data["factors"].items(): + assert "type" in factor_block, ( + f"factors.yaml at {path} factor {factor_name!r} is missing required `type:` field" " (expected 'continuous' or 'categorical')" ) - ftype = spec["type"] - assert ftype in ("continuous", "categorical"), ( - f"factors.yaml at {path} factor {name!r} has unknown type {ftype!r};" + factor_type = factor_block["type"] + assert factor_type in ("continuous", "categorical"), ( + f"factors.yaml at {path} factor {factor_name!r} has unknown type {factor_type!r};" " expected 'continuous' or 'categorical'" ) factors.append( FactorSpec( - name=name, - type=ftype, - dim=spec.get("dim", 1), - range=spec.get("range"), - choices=spec.get("choices"), + name=factor_name, + type=factor_type, + dim=factor_block.get("dim", 1), + range=factor_block.get("range"), + choices=factor_block.get("choices"), ) ) - outcomes = [OutcomeSpec(name=name, type=spec.get("type", "float")) for name, spec in 
data["outcomes"].items()] + outcomes = [ + OutcomeSpec(name=outcome_name, type=outcome_block.get("type", "float")) + for outcome_name, outcome_block in yaml_data["outcomes"].items() + ] return cls(slice=slice_spec, factors=factors, outcomes=outcomes) @property def total_factor_dim(self) -> int: - return sum(f.dim if f.type == "continuous" else 1 for f in self.factors) + """Total width of theta — sum of ``dim`` over continuous factors plus 1 per categorical.""" + return sum(factor.dim if factor.type == "continuous" else 1 for factor in self.factors) @property def factor_columns(self) -> dict[str, slice]: - """Map factor name → column slice in theta. Continuous factors come first, categoricals after.""" - cont = [f for f in self.factors if f.type == "continuous"] - cat = [f for f in self.factors if f.type == "categorical"] - cols: dict[str, slice] = {} - i = 0 - for f in cont + cat: - width = f.dim if f.type == "continuous" else 1 - cols[f.name] = slice(i, i + width) - i += width - return cols + """Map factor name → column slice in theta. + Continuous factors occupy the leading columns (their ``dim`` columns each), then + each categorical factor occupies one trailing column. This continuous-first + ordering matches sbi's MNPE convention so the same theta layout works for both + NPE (all-continuous) and MNPE (mixed). + """ + continuous_factors = [factor for factor in self.factors if factor.type == "continuous"] + categorical_factors = [factor for factor in self.factors if factor.type == "categorical"] + column_slices: dict[str, slice] = {} + column_index = 0 + for factor in continuous_factors + categorical_factors: + column_width = factor.dim if factor.type == "continuous" else 1 + column_slices[factor.name] = slice(column_index, column_index + column_width) + column_index += column_width + return column_slices -class SensitivityDataset: - """Combines factors.yaml + episode_summary.jsonl into (theta, x, prior, factor_columns). 
- Validates that every JSONL row contains all declared factors and outcomes; fills in any - missing continuous ranges by inferring from observed min/max so downstream code can always - trust ``schema.factors[i].range`` to be populated. +class SensitivityDataset: + """Combines a ``factors.yaml`` schema with an ``episode_summary.jsonl`` data file. + + On construction: + 1. Parses the schema (factors + outcomes + slice metadata). + 2. Loads the JSONL rows (one row per episode). + 3. Validates that every row contains all declared factor and outcome keys. + 4. Fills any missing continuous ranges by inferring from observed min/max so the + analyzer can always trust ``schema.factors[i].range`` to be populated. + 5. Builds the ``theta`` and ``x`` tensors that sbi (or the empirical analyzer) + will consume. + + The four public attributes used by the analyzer (``theta``, ``x``, ``prior``, + ``factor_columns``) are properties — recomputed lazily where appropriate. """ def __init__(self, factors_yaml: str | Path, jsonl_path: str | Path): self.schema = FactorSchema.from_yaml(factors_yaml) - text = Path(jsonl_path).read_text(encoding="utf-8") - self.rows = [json.loads(line) for line in text.splitlines() if line.strip()] + jsonl_text = Path(jsonl_path).read_text(encoding="utf-8") + self.rows = [json.loads(line) for line in jsonl_text.splitlines() if line.strip()] assert len(self.rows) > 0, f"Empty episode_summary.jsonl at {jsonl_path}" self._validate_rows(jsonl_path) - self._fill_inferred_ranges() + self._infer_missing_factor_ranges() - self._theta = self._build_theta() - self._x = self._build_x() + self._theta = self._build_factor_tensor() + self._x = self._build_outcome_tensor() def _validate_rows(self, jsonl_path: str | Path) -> None: - expected_factors = {f.name for f in self.schema.factors} - expected_outcomes = {o.name for o in self.schema.outcomes} - for i, row in enumerate(self.rows): - assert "factors" in row and "outcomes" in row, f"Row {i} of {jsonl_path} missing 
factors/outcomes block" - got_factors = set(row["factors"].keys()) - assert got_factors == expected_factors, ( - f"Row {i} of {jsonl_path} declares factors {sorted(got_factors)}; " - f"schema expects {sorted(expected_factors)}" - ) - missing_outcomes = expected_outcomes - set(row["outcomes"].keys()) - assert not missing_outcomes, ( - f"Row {i} of {jsonl_path} missing outcomes {sorted(missing_outcomes)}" - ) + """Assert every JSONL row carries the keys declared in the schema. + + The writer logs the *entire* arena_env_args dict per row, so the loader only + requires that the schema's declared factor names are a *subset* of what's in + ``row["arena_env_args"]`` — extra keys (other arena_env_args we don't analyze) + are fine and ignored. Same superset-not-equality check for outcomes. - def _fill_inferred_ranges(self) -> None: - for f in self.schema.factors: - if f.type != "continuous" or f.range is not None: + Catches the most common authoring mistake: a factor declared in factors.yaml + that the eval didn't actually vary or log. Surfaces a clear error pointing at + the first offending row. 
+ """ + expected_factor_names = {factor.name for factor in self.schema.factors} + expected_outcome_names = {outcome.name for outcome in self.schema.outcomes} + for row_index, row in enumerate(self.rows): + assert ( + "arena_env_args" in row and "outcomes" in row + ), f"Row {row_index} of {jsonl_path} missing arena_env_args/outcomes block" + missing_factor_names = expected_factor_names - set(row["arena_env_args"].keys()) + assert not missing_factor_names, ( + f"Row {row_index} of {jsonl_path} is missing factor(s) " + f"{sorted(missing_factor_names)} from its arena_env_args block; " + f"factors.yaml declares: {sorted(expected_factor_names)}" + ) + missing_outcome_names = expected_outcome_names - set(row["outcomes"].keys()) + assert ( + not missing_outcome_names + ), f"Row {row_index} of {jsonl_path} missing outcomes {sorted(missing_outcome_names)}" + + def _infer_missing_factor_ranges(self) -> None: + """For any continuous factor without a declared ``range``, fill it from observed data. + + The prior bounds default to ``[min(values), max(values)]`` over the JSONL. Users + who want a principled prior (e.g. matching the variation system's declared + ``Uniform(low, high)``) should hand-author ``range`` in factors.yaml; that value + takes precedence and this method skips them. 
+ """ + for factor in self.schema.factors: + if factor.type != "continuous" or factor.range is not None: continue - if f.dim != 1: + if factor.dim != 1: raise NotImplementedError( - f"Range inference for vector factors (dim > 1) is not implemented; factor {f.name!r} has dim={f.dim}" + "Range inference for vector factors (dim > 1) is not implemented;" + f" factor {factor.name!r} has dim={factor.dim}" ) - values = [float(row["factors"][f.name]) for row in self.rows] - f.range = [[min(values), max(values)]] - - def _build_theta(self) -> torch.Tensor: - cont_factors = [f for f in self.schema.factors if f.type == "continuous"] - cat_factors = [f for f in self.schema.factors if f.type == "categorical"] - if cat_factors: - raise NotImplementedError( - "Categorical factors are not yet supported by SensitivityDataset (MVP-1 covers continuous only)." - ) + observed_values = [float(row["arena_env_args"][factor.name]) for row in self.rows] + factor.range = [[min(observed_values), max(observed_values)]] + + def _build_factor_tensor(self) -> torch.Tensor: + """Assemble the per-episode factor matrix ``theta``. - cols = [] - for f in cont_factors: - if f.dim != 1: + Layout: continuous factors fill the leading columns (one column per dim), then + each categorical factor fills one trailing column. Categorical values are + encoded as ``float32`` integers ``0..num_choices-1`` per the index in + ``FactorSpec.choices`` — sbi's MNPE expects exactly this layout (continuous-first, + discrete columns as floats, the density estimator handles them as discrete). + """ + continuous_factors = [factor for factor in self.schema.factors if factor.type == "continuous"] + categorical_factors = [factor for factor in self.schema.factors if factor.type == "categorical"] + + factor_columns: list[torch.Tensor] = [] + + # Continuous columns come first (sbi MNPE convention). 
+ for factor in continuous_factors: + if factor.dim != 1: raise NotImplementedError( - f"Vector continuous factors (dim > 1) are not yet supported; factor {f.name!r} has dim={f.dim}" + "Vector continuous factors (dim > 1) are not yet supported;" + f" factor {factor.name!r} has dim={factor.dim}" ) - col = torch.tensor( - [float(row["factors"][f.name]) for row in self.rows], dtype=torch.float32 + factor_column = torch.tensor( + [float(row["arena_env_args"][factor.name]) for row in self.rows], dtype=torch.float32 ).unsqueeze(1) - cols.append(col) - return torch.cat(cols, dim=1) if cols else torch.zeros((len(self.rows), 0), dtype=torch.float32) + factor_columns.append(factor_column) + + # Categorical columns: integer-code each string value as its index in FactorSpec.choices. + for factor in categorical_factors: + assert ( + factor.choices is not None and len(factor.choices) > 0 + ), f"Categorical factor {factor.name!r} has no `choices:` block in factors.yaml" + choice_to_code = {choice: code for code, choice in enumerate(factor.choices)} + category_codes: list[int] = [] + for row_index, row in enumerate(self.rows): + value = row["arena_env_args"][factor.name] + assert value in choice_to_code, ( + f"Row {row_index} factor {factor.name!r} has value {value!r}" + f" not in declared choices {factor.choices}" + ) + category_codes.append(choice_to_code[value]) + factor_column = torch.tensor(category_codes, dtype=torch.float32).unsqueeze(1) + factor_columns.append(factor_column) + + if factor_columns: + return torch.cat(factor_columns, dim=1) + return torch.zeros((len(self.rows), 0), dtype=torch.float32) + + def _build_outcome_tensor(self) -> torch.Tensor: + """Assemble the per-episode outcome matrix ``x`` (one column per declared outcome). 
- def _build_x(self) -> torch.Tensor: - outcome_cols = [ - torch.tensor([float(row["outcomes"][o.name]) for row in self.rows], dtype=torch.float32).unsqueeze(1) - for o in self.schema.outcomes + Each outcome value is cast to float; bool outcomes become 0.0/1.0. The analyzer + usually selects a single outcome column at fit time and conditions queries on it. + """ + outcome_column_tensors = [ + torch.tensor([float(row["outcomes"][outcome.name]) for row in self.rows], dtype=torch.float32).unsqueeze(1) + for outcome in self.schema.outcomes ] - return torch.cat(outcome_cols, dim=1) + return torch.cat(outcome_column_tensors, dim=1) @property def theta(self) -> torch.Tensor: + """``(num_episodes, total_factor_dim)`` matrix of factor values, one row per episode. + + This is the "input" sbi infers a posterior over. Column layout is given by + ``factor_columns`` — continuous factors first, then categoricals (integer-coded). + """ return self._theta @property def x(self) -> torch.Tensor: + """``(num_episodes, num_outcomes)`` matrix of outcome values, one row per episode. + + This is what the analyzer conditions queries on. The analyzer typically selects a + single outcome column at fit time (e.g. ``success_rate``) and asks + "what theta values were consistent with observing this outcome?" + """ return self._x @property def factor_columns(self) -> dict[str, slice]: + """Map factor name → its column slice in theta. 
Same as ``schema.factor_columns``.""" return self.schema.factor_columns @property def outcome_columns(self) -> dict[str, int]: - return {o.name: i for i, o in enumerate(self.schema.outcomes)} + """Map outcome name → its column index in x.""" + return {outcome.name: index for index, outcome in enumerate(self.schema.outcomes)} + + @property + def has_categorical_factors(self) -> bool: + """True iff the schema declares at least one categorical factor.""" + return any(factor.type == "categorical" for factor in self.schema.factors) @property def prior(self): - """sbi BoxUniform over all continuous factor dims, built from declared/inferred ranges. + """The uniform prior over all factor dims that the analyzer assumes. + + Built as a single ``sbi.utils.BoxUniform`` over the concatenated bounds in + continuous-first / categorical-after order: + - Continuous factor → uses the declared (or inferred) ``[low, high]`` per dim. + - Categorical factor → uses ``[0, num_choices - 1]`` (the integer codes from + ``_build_factor_tensor``); sbi MNPE's mixed density estimator treats them as + discrete from there. - Imported lazily so loading the module doesn't require ``sbi`` for non-analysis use. + sbi is imported lazily so loading the dataset doesn't pay the sbi import cost + unless the analyzer actually runs. """ from sbi.utils import BoxUniform - low: list[float] = [] - high: list[float] = [] - for f in self.schema.factors: - if f.type != "continuous": + low_bounds: list[float] = [] + high_bounds: list[float] = [] + + # Continuous factor bounds (one [low, high] pair per dim). 
+ for factor in self.schema.factors: + if factor.type != "continuous": continue - assert f.range is not None, f"Factor {f.name!r} has no range and was not inferred" - for lo, hi in f.range: - low.append(float(lo)) - high.append(float(hi)) + assert factor.range is not None, f"Factor {factor.name!r} has no range and was not inferred" + for dim_low, dim_high in factor.range: + low_bounds.append(float(dim_low)) + high_bounds.append(float(dim_high)) + + # Categorical factor bounds: [0, num_choices - 1] per factor (one column). + for factor in self.schema.factors: + if factor.type != "categorical": + continue + assert ( + factor.choices is not None and len(factor.choices) > 0 + ), f"Categorical factor {factor.name!r} has no `choices:` block" + low_bounds.append(0.0) + high_bounds.append(float(len(factor.choices) - 1)) + return BoxUniform( - low=torch.tensor(low, dtype=torch.float32), - high=torch.tensor(high, dtype=torch.float32), + low=torch.tensor(low_bounds, dtype=torch.float32), + high=torch.tensor(high_bounds, dtype=torch.float32), ) diff --git a/isaaclab_arena/analysis/sensitivity/episode_writer.py b/isaaclab_arena/analysis/sensitivity/episode_writer.py index f2b4b6d19..54624456b 100644 --- a/isaaclab_arena/analysis/sensitivity/episode_writer.py +++ b/isaaclab_arena/analysis/sensitivity/episode_writer.py @@ -6,12 +6,17 @@ """Per-episode summary writer for sensitivity analysis. ``write_episode_summaries`` appends one JSONL row per recorded demo for a just-completed -job. Each row pairs the factor values (read from ``job.arena_env_args_dict`` for the keys -the user listed via ``--factor_keys``) with the per-episode outcome values, extracted from -the recorded hdf5 demos via each metric's ``compute_metric_from_recording``. +job. Each row carries: -The matching ``factors.yaml`` (consumed by the analyzer) is the user's responsibility — it -is hand-authored alongside the experiment's jobs config and must list the same factor keys. 
+ - ``job_name`` and ``episode_idx`` for traceability, + - ``arena_env_args`` — the *entire* job.arena_env_args_dict, i.e. every value that + parameterized the env for this episode, + - ``outcomes`` — per-episode outcome values from the task's registered metrics, extracted + from the recorded hdf5 demos via each metric's ``compute_metric_from_recording``. + +The eval-side writer is intentionally analysis-agnostic: it logs all env state, and the +analyzer's ``factors.yaml`` decides which subset of those keys to treat as factors. This +keeps the writer free of any "what counts as a factor?" knowledge. Import-order note: this module legitimately touches pxr at import time via ``isaaclab_arena.metrics.metrics`` (which imports ``isaaclab.envs.manager_based_rl_env``). @@ -22,12 +27,11 @@ from __future__ import annotations +import h5py import json from pathlib import Path from typing import TYPE_CHECKING -import h5py - from isaaclab_arena.metrics.metrics import get_metric_recorder_dataset_path from isaaclab_arena.metrics.metrics_logger import metrics_to_plain_python_types @@ -35,65 +39,57 @@ from isaaclab_arena.evaluation.job_manager import Job -def write_episode_summaries( - env, - job: "Job", - factor_keys: list[str], - output_path: str | Path, -) -> int: +def write_episode_summaries(env, job: Job, output_path: str | Path) -> int: """Append one JSONL row per recorded demo for the just-completed job. - Each row has shape ``{"job_name", "episode_idx", "factors", "outcomes"}``. ``factors`` - is the same dict for every row in a job (factors don't vary within a job in MVP-1); - ``outcomes`` is per-demo, computed by calling each registered metric's - ``compute_metric_from_recording([demo_data])`` with a single-demo list. + Each row has shape:: + + { + "job_name": "...", + "episode_idx": 0, + "arena_env_args": {...}, + "outcomes": {...} + } Args: - env: The (possibly gym-wrapped) Arena env that just finished its rollout. 
The - hdf5 path and registered metrics are read from ``env.unwrapped.cfg``. - job: The Job that ran. ``job.arena_env_args_dict`` must contain every key in - ``factor_keys``; missing keys raise AssertionError. - factor_keys: Names of factors to record (typically passed via --factor_keys). + env: The (possibly gym-wrapped) Arena env that just finished its rollout. The hdf5 + path and registered metrics are read from ``env.unwrapped.cfg``. + job: The Job that ran. Its ``arena_env_args_dict`` is logged verbatim under + ``arena_env_args``. output_path: JSONL file to append to. Created (with parent dirs) if absent. Returns: Number of rows written. """ - unwrapped = env.unwrapped - if not hasattr(unwrapped.cfg, "metrics") or unwrapped.cfg.metrics is None: + unwrapped_env = env.unwrapped + if not hasattr(unwrapped_env.cfg, "metrics") or unwrapped_env.cfg.metrics is None: return 0 - factors = {} - for key in factor_keys: - assert key in job.arena_env_args_dict, ( - f"Job '{job.name}' is missing factor '{key}' in its arena_env_args. 
" - f"Available keys: {sorted(job.arena_env_args_dict.keys())}" - ) - factors[key] = job.arena_env_args_dict[key] + arena_env_args_snapshot = dict(job.arena_env_args_dict) - dataset_path = get_metric_recorder_dataset_path(unwrapped) - metrics_cfg = unwrapped.cfg.metrics + hdf5_dataset_path = get_metric_recorder_dataset_path(unwrapped_env) + registered_metrics = unwrapped_env.cfg.metrics output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) rows_written = 0 - with h5py.File(dataset_path, "r") as f: - demos = f["data"] - with open(output_path, "a", encoding="utf-8") as out: - for demo_idx, demo_name in enumerate(demos): - demo = demos[demo_name] - raw_outcomes = {} - for metric in metrics_cfg: - demo_data = demo[metric.recorder_term_name][:] - raw_outcomes[metric.name] = metric.compute_metric_from_recording([demo_data]) - outcomes = metrics_to_plain_python_types(raw_outcomes) - row = { + with h5py.File(hdf5_dataset_path, "r") as hdf5_file: + recorded_demos = hdf5_file["data"] + with open(output_path, "a", encoding="utf-8") as jsonl_output: + for demo_index, demo_name in enumerate(recorded_demos): + demo_group = recorded_demos[demo_name] + raw_outcome_values = {} + for metric in registered_metrics: + recorded_metric_data = demo_group[metric.recorder_term_name][:] + raw_outcome_values[metric.name] = metric.compute_metric_from_recording([recorded_metric_data]) + outcome_values = metrics_to_plain_python_types(raw_outcome_values) + summary_row = { "job_name": job.name, - "episode_idx": demo_idx, - "factors": factors, - "outcomes": outcomes, + "episode_idx": demo_index, + "arena_env_args": arena_env_args_snapshot, + "outcomes": outcome_values, } - out.write(json.dumps(row) + "\n") + jsonl_output.write(json.dumps(summary_row) + "\n") rows_written += 1 return rows_written diff --git a/isaaclab_arena/analysis/sensitivity/plotting.py b/isaaclab_arena/analysis/sensitivity/plotting.py new file mode 100644 index 000000000..5a5df1bf7 --- /dev/null 
+++ b/isaaclab_arena/analysis/sensitivity/plotting.py @@ -0,0 +1,217 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Plot renderers for sensitivity analysis. + +Pure-visualization module. Calls into the analyzer's public posterior queries +(``continuous_marginal_density`` and ``categorical_marginal_probs``) and renders matplotlib +figures. Decoupled from the analyzer hierarchy so new plot types can be added without +touching inference code, and so existing plot code can be tested with mock posteriors. + +The single entry point is ``plot_marginal(analyzer, factor_name, output_path, ...)``, +which dispatches by factor type to the right renderer. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from isaaclab_arena.analysis.sensitivity.analyzer import BaseAnalyzer + from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec + + +def plot_marginal( + analyzer: "BaseAnalyzer", + factor_name: str, + output_path: str | Path, + outcome_value: float = 1.0, + num_samples: int = 10_000, + num_grid_points: int = 200, +) -> None: + """Render the marginal posterior for ``factor_name``, dispatching by factor type. + + For continuous factors, the analyzer must expose ``continuous_marginal_density`` + (only ``PosteriorAnalyzer`` does — ``EmpiricalAnalyzer`` rejects continuous factors at + construction time, so this branch isn't reachable through ``make_analyzer``). + """ + factor_spec = analyzer._factor_spec(factor_name) + if factor_spec.type == "continuous": + if not hasattr(analyzer, "continuous_marginal_density"): + raise NotImplementedError( + f"{type(analyzer).__name__} cannot plot continuous factors; expected a" + " PosteriorAnalyzer (NPE/MNPE)." 
+ ) + _plot_continuous_marginal(analyzer, factor_spec, output_path, outcome_value, num_grid_points) + elif factor_spec.type == "categorical": + _plot_categorical_marginal(analyzer, factor_spec, output_path, outcome_value, num_samples) + else: + raise NotImplementedError(f"Unsupported factor type {factor_spec.type!r}") + + +def _plot_continuous_marginal( + analyzer: "BaseAnalyzer", + factor_spec: "FactorSpec", + output_path: str | Path, + outcome_value: float, + num_grid_points: int, +) -> None: + """Render a continuous factor's marginal posterior as a density curve. + + The blue curve shows ``P(factor_value | outcome=outcome_value)`` from the analyzer. + Below the x-axis is an empirical "rug" — small vertical ticks at the actual recorded + theta values, coloured green for episodes where the outcome was achieved (``≥ 0.5``) + and red for episodes where it was not. The rug lets a human eyeball whether the + smooth posterior actually agrees with where the successful episodes lived. + """ + import matplotlib.pyplot as plt + + grid, density = analyzer.continuous_marginal_density( + factor_spec.name, outcome_value, num_grid_points + ) + # Empirical rug, coloured by outcome — gives the human a sanity-check on the curve. 
+ factor_column_slice = analyzer.dataset.factor_columns[factor_spec.name] + outcome_column_index = analyzer.dataset.outcome_columns[analyzer.outcome_name] + empirical_theta_values = analyzer.dataset.theta[:, factor_column_slice].squeeze(-1).cpu().numpy() + empirical_outcomes = analyzer.dataset.x[:, outcome_column_index].cpu().numpy() + success_mask = empirical_outcomes >= 0.5 + + figure, axes = plt.subplots(figsize=(8, 5)) + axes.plot( + grid, + density, + color="steelblue", + linewidth=2, + label=f"P({factor_spec.name} | {analyzer.outcome_name}={outcome_value:g})", + ) + axes.fill_between(grid, 0, density, color="steelblue", alpha=0.2) + axes.scatter( + empirical_theta_values[success_mask], + np.full(success_mask.sum(), -0.05 * density.max()), + marker="|", + color="seagreen", + s=80, + label=f"{analyzer.outcome_name} ≥ 0.5 (n={success_mask.sum()})", + ) + axes.scatter( + empirical_theta_values[~success_mask], + np.full((~success_mask).sum(), -0.1 * density.max()), + marker="|", + color="firebrick", + s=80, + label=f"{analyzer.outcome_name} < 0.5 (n={(~success_mask).sum()})", + ) + axes.set_xlabel(factor_spec.name) + axes.set_ylabel("posterior density") + axes.set_title(_plot_title(analyzer, factor_spec.name)) + axes.legend(loc="best", fontsize=9) + axes.grid(alpha=0.3) + figure.tight_layout() + _save_figure(figure, output_path) + + +def _plot_categorical_marginal( + analyzer: "BaseAnalyzer", + factor_spec: "FactorSpec", + output_path: str | Path, + outcome_value: float, + num_samples: int, +) -> None: + """Render a categorical factor's marginal as side-by-side bars per category. + + The blue bar (left of each category) is the analyzer's ``P(category | outcome)``. + The green bar (right of each category) is the *empirical* per-category outcome rate + — independent of the analyzer's posterior, computed directly from the raw data. 
+ For the ``EmpiricalAnalyzer`` the two will agree exactly (up to normalization); for + a posterior-based analyzer they may differ slightly if the model smooths. + + Each green bar is annotated with the sample count ``n`` for that category, so the + user can see how trustworthy each bar is. + """ + import matplotlib.pyplot as plt + + assert factor_spec.choices is not None + choices = factor_spec.choices + num_choices = len(choices) + factor_column_slice = analyzer.dataset.factor_columns[factor_spec.name] + outcome_column_index = analyzer.dataset.outcome_columns[analyzer.outcome_name] + + # Posterior probs come from the analyzer; empirical rate and counts are raw data, + # rendered alongside as a sanity reference. + posterior_probabilities = analyzer.categorical_marginal_probs( + factor_spec.name, outcome_value, num_samples + ) + + empirical_theta_codes = ( + analyzer.dataset.theta[:, factor_column_slice].squeeze(-1).long().cpu().numpy() + ) + empirical_outcomes = analyzer.dataset.x[:, outcome_column_index].cpu().numpy() + empirical_rates = np.zeros(num_choices) + empirical_counts = np.zeros(num_choices, dtype=int) + for code in range(num_choices): + category_mask = empirical_theta_codes == code + empirical_counts[code] = int(category_mask.sum()) + if category_mask.any(): + empirical_rates[code] = float((empirical_outcomes[category_mask] >= 0.5).mean()) + + figure, axes = plt.subplots(figsize=(max(8, 1.0 * num_choices), 5)) + bar_x_positions = np.arange(num_choices) + bar_width = 0.4 + axes.bar( + bar_x_positions - bar_width / 2, + posterior_probabilities, + bar_width, + color="steelblue", + alpha=0.8, + label=f"P(category | {analyzer.outcome_name}={outcome_value:g})", + ) + axes.bar( + bar_x_positions + bar_width / 2, + empirical_rates, + bar_width, + color="seagreen", + alpha=0.7, + label=f"empirical {analyzer.outcome_name} rate per category", + ) + for category_index, count in enumerate(empirical_counts): + axes.text( + category_index + bar_width / 2, + 
empirical_rates[category_index] + 0.02, + f"n={count}", + ha="center", + fontsize=8, + ) + + axes.set_xticks(bar_x_positions) + axes.set_xticklabels(choices, rotation=30, ha="right") + axes.set_ylabel("probability") + axes.set_ylim(0, 1.05) + axes.set_title(_plot_title(analyzer, factor_spec.name)) + axes.legend(loc="best", fontsize=9) + axes.grid(alpha=0.3, axis="y") + figure.tight_layout() + _save_figure(figure, output_path) + + +def _plot_title(analyzer: "BaseAnalyzer", factor_name: str) -> str: + """Format the plot title as ``"Sensitivity of {outcome} to {factor}" / slice block``.""" + return ( + f"Sensitivity of {analyzer.outcome_name} to {factor_name}\n" + f"slice: {analyzer.dataset.schema.slice.policy} / " + f"{analyzer.dataset.schema.slice.task} / {analyzer.dataset.schema.slice.embodiment}" + ) + + +def _save_figure(figure, output_path: str | Path) -> None: + """Save a matplotlib figure to disk (creating parent dirs) and close it.""" + import matplotlib.pyplot as plt + + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + figure.savefig(output_path, dpi=150) + plt.close(figure) diff --git a/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py b/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py new file mode 100644 index 000000000..23a16640e --- /dev/null +++ b/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py @@ -0,0 +1,155 @@ +# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Synthetic JSONL generator for the MVP-2 categorical-factor analyzer smoke test. + +Generates a fake ``episode_summary.jsonl`` where a single categorical factor +``pick_up_object`` drives the success probability. Half of the choices are "easy" +(high success rate), the other half are "hard" (low success rate). 
With enough samples +the analyzer's recovered ``P(category | success=1)`` should concentrate on the easy +choices, and the empirical per-category bar should match the configured rates within +binomial noise. + +Sampling is **uniform over the categorical choices** (matches the semantics of +``Choose(...)`` in Alex's variation system and the uniform prior the analyzer assumes). + +Pair with the auto-emitted factors.yaml. End-to-end smoke test: + + /isaac-sim/python.sh -m isaaclab_arena.analysis.sensitivity.synthetic_data_categorical \\ + --output /tmp/syn_cat.jsonl + /isaac-sim/python.sh -m isaaclab_arena.scripts.analyze_sensitivity \\ + --factors_yaml /tmp/factors.yaml \\ + --episode_summary /tmp/syn_cat.jsonl \\ + --figure_path /tmp/syn_cat_plot.png + +Expected output: a bar chart where the "easy" choices have ~3x more posterior mass and +empirical success rate than the "hard" choices. +""" + +from __future__ import annotations + +import argparse +import json +import random +from pathlib import Path + +# Five distinct objects, like the maple-table droid sweep. The first three are "easy" +# (high success), the last two are "hard" (low success) — a known signal the analyzer +# should recover. 
+DEFAULT_CHOICES = [ + "rubiks_cube_hot3d_robolab", + "wooden_bowl_hot3d_robolab", + "alphabet_soup_can_hope_robolab", + "mug_ycb_robolab", + "sugar_box_ycb_robolab", +] +DEFAULT_SUCCESS_PROBABILITIES = [0.90, 0.85, 0.75, 0.25, 0.15] + + +def _factors_yaml_text(choices: list[str]) -> str: + """Build the factors.yaml content matching the synthetic data.""" + choices_string = ", ".join(choices) + return ( + "# factors.yaml — synthetic categorical dataset for analyzer smoke-testing.\n" + "# Auto-emitted by synthetic_data_categorical alongside the JSONL.\n" + "\n" + "slice:\n" + " policy: synthetic_categorical\n" + " task: synthetic_pick_and_place\n" + " embodiment: synthetic\n" + "\n" + "factors:\n" + " pick_up_object:\n" + " type: categorical\n" + f" choices: [{choices_string}]\n" + "\n" + "outcomes:\n" + " success_rate:\n" + " type: float\n" + " object_moved_rate:\n" + " type: float\n" + ) + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument( + "--output", + type=str, + default="/tmp/synthetic_categorical_episode_summary.jsonl", + help="Output JSONL path.", + ) + parser.add_argument( + "--factors-yaml-out", + type=str, + default=None, + help="Output factors.yaml path. Default: same directory as --output, named factors.yaml.", + ) + parser.add_argument( + "--num-episodes", + type=int, + default=200, + help="Total episodes (uniform draws across all choices). 
Default 200 → ~40 per category" + " for 5 choices.", + ) + parser.add_argument("--seed", type=int, default=42, help="RNG seed for reproducibility.") + args = parser.parse_args() + + random_generator = random.Random(args.seed) + choices = DEFAULT_CHOICES + success_probabilities = DEFAULT_SUCCESS_PROBABILITIES + assert len(choices) == len(success_probabilities), ( + "DEFAULT_CHOICES and DEFAULT_SUCCESS_PROBABILITIES lengths must match" + ) + num_choices = len(choices) + + summary_rows = [] + per_category_stats: dict[str, list[int]] = { + choice: [0, 0] for choice in choices + } # category → [successes, total] + for episode_index in range(args.num_episodes): + category_index = random_generator.randrange(num_choices) + chosen_category = choices[category_index] + was_success = 1.0 if random_generator.random() < success_probabilities[category_index] else 0.0 + per_category_stats[chosen_category][0] += int(was_success) + per_category_stats[chosen_category][1] += 1 + summary_rows.append( + { + "job_name": "synth_categorical", + "episode_idx": episode_index, + "arena_env_args": {"pick_up_object": chosen_category}, + "outcomes": {"success_rate": was_success, "object_moved_rate": was_success}, + } + ) + + output_path = Path(args.output) + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as jsonl_file: + for summary_row in summary_rows: + jsonl_file.write(json.dumps(summary_row) + "\n") + + factors_yaml_path = ( + Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / "factors.yaml" + ) + factors_yaml_path.parent.mkdir(parents=True, exist_ok=True) + factors_yaml_path.write_text(_factors_yaml_text(choices), encoding="utf-8") + + print(f"[INFO] Wrote {len(summary_rows)} rows to {output_path}") + print(f"[INFO] Wrote factors schema → {factors_yaml_path}") + print( + "[INFO] Per-category success counts (analyzer should pull posterior mass toward easy cats):" + ) + for choice, target_probability in 
zip(choices, success_probabilities): + successes, total = per_category_stats[choice] + empirical_percentage = 100 * successes / total if total else 0.0 + bar_string = "█" * int(round(empirical_percentage / 5)) + print( + f" {choice:<35s} target={target_probability:>4.0%}" + f" empirical={successes:>3d}/{total:<3d} ({empirical_percentage:>5.1f}%) {bar_string}" + ) + + +if __name__ == "__main__": + main() diff --git a/isaaclab_arena/analysis/sensitivity/synthetic_data.py b/isaaclab_arena/analysis/sensitivity/synthetic_data_continuous.py similarity index 64% rename from isaaclab_arena/analysis/sensitivity/synthetic_data.py rename to isaaclab_arena/analysis/sensitivity/synthetic_data_continuous.py index ab46b06bf..24b1fba87 100644 --- a/isaaclab_arena/analysis/sensitivity/synthetic_data.py +++ b/isaaclab_arena/analysis/sensitivity/synthetic_data_continuous.py @@ -30,7 +30,7 @@ Pair with the hand-authored ``light_intensity_sweep_factors.yaml`` so the analyzer script can be smoke-tested end-to-end without running Isaac Sim: - /isaac-sim/python.sh -m isaaclab_arena.analysis.sensitivity.synthetic_data \\ + /isaac-sim/python.sh -m isaaclab_arena.analysis.sensitivity.synthetic_data_continuous \\ --output /tmp/syn.jsonl /isaac-sim/python.sh -m isaaclab_arena.scripts.analyze_sensitivity \\ --factors_yaml isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml \\ @@ -58,7 +58,7 @@ # importing episode_writer would transitively load pxr via isaaclab_arena.metrics. _SYNTHETIC_FACTORS_YAML = """\ # factors.yaml — synthetic dataset for analyzer smoke-testing. -# Auto-emitted by isaaclab_arena.analysis.sensitivity.synthetic_data alongside the JSONL. +# Auto-emitted by isaaclab_arena.analysis.sensitivity.synthetic_data_continuous alongside the JSONL. 
slice: policy: synthetic_linear_uniform @@ -78,17 +78,15 @@ """ -def p_success(intensity: float, center: float, sigma: float) -> float: +def success_probability(intensity: float, center: float, sigma: float) -> float: """Linear-Gaussian competence band: peaks at `center`, falls off symmetrically in linear space.""" - z = (intensity - center) / sigma - return math.exp(-0.5 * z * z) + z_score = (intensity - center) / sigma + return math.exp(-0.5 * z_score * z_score) def main(): parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument( - "--output", type=str, default="/tmp/synthetic_episode_summary.jsonl", help="Output JSONL path." - ) + parser.add_argument("--output", type=str, default="/tmp/synthetic_episode_summary.jsonl", help="Output JSONL path.") parser.add_argument( "--factors-yaml-out", type=str, @@ -101,65 +99,64 @@ def main(): default=180, help="Total number of episodes to generate. Each draws an intensity from Uniform(10, 5000).", ) - parser.add_argument( - "--center", type=float, default=500.0, help="Intensity where success rate peaks. Default: 500." - ) + parser.add_argument("--center", type=float, default=500.0, help="Intensity where success rate peaks. Default: 500.") parser.add_argument( "--sigma", type=float, default=400.0, - help="Linear-space width of the competence band (1 sigma in intensity units). Default: 400," - " which gives ~95%% success in [100, 900] and near-zero success beyond ~1700.", + help=( + "Linear-space width of the competence band (1 sigma in intensity units). Default: 400," + " which gives ~95%% success in [100, 900] and near-zero success beyond ~1700." 
+ ), ) parser.add_argument("--seed", type=int, default=42, help="RNG seed for reproducibility.") args = parser.parse_args() - rng = random.Random(args.seed) - - rows = [] - for ep_idx in range(args.num_episodes): - intensity = rng.uniform(INTENSITY_LOW, INTENSITY_HIGH) - p = p_success(intensity, args.center, args.sigma) - success = 1.0 if rng.random() < p else 0.0 - rows.append( - { - "job_name": "synth_linear_uniform", - "episode_idx": ep_idx, - "factors": {"light_intensity": intensity}, - "outcomes": {"success_rate": success, "object_moved_rate": success}, - } - ) + random_generator = random.Random(args.seed) + + summary_rows = [] + for episode_index in range(args.num_episodes): + intensity = random_generator.uniform(INTENSITY_LOW, INTENSITY_HIGH) + probability_of_success = success_probability(intensity, args.center, args.sigma) + was_success = 1.0 if random_generator.random() < probability_of_success else 0.0 + summary_rows.append({ + "job_name": "synth_linear_uniform", + "episode_idx": episode_index, + "arena_env_args": {"light_intensity": intensity}, + "outcomes": {"success_rate": was_success, "object_moved_rate": was_success}, + }) output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, "w", encoding="utf-8") as f: - for row in rows: - f.write(json.dumps(row) + "\n") + with open(output_path, "w", encoding="utf-8") as jsonl_file: + for summary_row in summary_rows: + jsonl_file.write(json.dumps(summary_row) + "\n") # Emit a matching factors.yaml so the analyzer can be pointed at this synthetic dataset # without any hand-authored schema. Inline string template — see _SYNTHETIC_FACTORS_YAML. 
- factors_yaml_out = ( - Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / "factors.yaml" - ) - factors_yaml_out.parent.mkdir(parents=True, exist_ok=True) - factors_yaml_out.write_text(_SYNTHETIC_FACTORS_YAML, encoding="utf-8") + factors_yaml_path = Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / "factors.yaml" + factors_yaml_path.parent.mkdir(parents=True, exist_ok=True) + factors_yaml_path.write_text(_SYNTHETIC_FACTORS_YAML, encoding="utf-8") - print(f"[INFO] Wrote {len(rows)} rows to {output_path}") - print(f"[INFO] Wrote factors schema → {factors_yaml_out}") + print(f"[INFO] Wrote {len(summary_rows)} rows to {output_path}") + print(f"[INFO] Wrote factors schema → {factors_yaml_path}") print(f"[INFO] Linear-Gaussian competence band: center={args.center:g}, sigma={args.sigma:g}") print("[INFO] Per-bin success rates (10 equal bins across the prior range):") num_bins = 10 bin_width = (INTENSITY_HIGH - INTENSITY_LOW) / num_bins - for bin_idx in range(num_bins): - lo = INTENSITY_LOW + bin_idx * bin_width - hi = lo + bin_width - bin_rows = [r for r in rows if lo <= r["factors"]["light_intensity"] < hi] - if not bin_rows: + for bin_index in range(num_bins): + bin_low = INTENSITY_LOW + bin_index * bin_width + bin_high = bin_low + bin_width + rows_in_bin = [row for row in summary_rows if bin_low <= row["arena_env_args"]["light_intensity"] < bin_high] + if not rows_in_bin: continue - succ = sum(int(r["outcomes"]["success_rate"]) for r in bin_rows) - pct = 100 * succ / len(bin_rows) - bar = "█" * int(round(pct / 5)) - print(f" [{lo:>5g}, {hi:>5g}): {succ:>3d}/{len(bin_rows):<3d} ({pct:>5.1f}%) {bar}") + successes_in_bin = sum(int(row["outcomes"]["success_rate"]) for row in rows_in_bin) + percentage = 100 * successes_in_bin / len(rows_in_bin) + bar_string = "█" * int(round(percentage / 5)) + print( + f" [{bin_low:>5g}, {bin_high:>5g}): {successes_in_bin:>3d}/{len(rows_in_bin):<3d}" + f" ({percentage:>5.1f}%) 
{bar_string}" + ) if __name__ == "__main__": diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 680633abc..bb8eeed3c 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -144,20 +144,14 @@ def main(): # Check if any job requires cameras and enable them if needed before starting simulation enable_cameras_if_required(eval_jobs_config, args_cli) - # Sensitivity recording is opt-in via --factor_keys + --episode_summary. The hand-authored - # factors.yaml the analyzer consumes is the user's responsibility — keep it in sync with - # what's recorded here. - sensitivity_enabled = args_cli.factor_keys is not None and args_cli.episode_summary is not None - factor_keys: list[str] = list(args_cli.factor_keys) if args_cli.factor_keys else [] - if sensitivity_enabled: + # Per-episode summary recording is opt-in via --episode_summary. The writer logs the + # full arena_env_args dict per episode; the analyzer side decides which keys to treat + # as factors via factors.yaml. No eval-side knowledge of "factors" required. + episode_summary_enabled = args_cli.episode_summary is not None + if episode_summary_enabled: print( - f"[INFO] Sensitivity recording enabled. Recording factors {factor_keys}" - f" per episode to: {args_cli.episode_summary}" - ) - elif args_cli.factor_keys or args_cli.episode_summary: - print( - "[WARN] --factor_keys and --episode_summary must both be set to enable sensitivity" - " recording; got only one. Skipping recording." + "[INFO] Episode summary recording enabled. Per-episode arena_env_args + outcomes" + f" → {args_cli.episode_summary}" ) with SimulationAppContext(args_cli): @@ -210,13 +204,13 @@ def main(): language_instruction=job.language_instruction, ) - if sensitivity_enabled: + if episode_summary_enabled: # Deferred import — episode_writer transitively touches pxr via # isaaclab_arena.metrics.metrics. 
Matches the policy_runner.py:107 # pattern for compute_metrics. from isaaclab_arena.analysis.sensitivity.episode_writer import write_episode_summaries - rows = write_episode_summaries(env, job, factor_keys, args_cli.episode_summary) + rows = write_episode_summaries(env, job, args_cli.episode_summary) print(f"[INFO] Wrote {rows} episode summaries for job '{job.name}'") job_manager.complete_job(job, metrics=metrics, status=Status.COMPLETED) diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index dec7bfca9..81343e61b 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -27,25 +27,16 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: default=False, help="Continue evaluation with remaining jobs when a job fails instead of stopping immediately.", ) - parser.add_argument( - "--factor_keys", - type=str, - nargs="*", - default=None, - help=( - "Names of arena_env_args keys to record per episode for sensitivity analysis." - " When set together with --episode_summary, eval_runner writes one JSONL row per" - " demo with the listed factor values + the task's registered outcomes. The schema" - " (factors.yaml) is the user's responsibility — hand-author it to match this list" - " and the analyzer reads it. Example: --factor_keys light_intensity" - ), - ) parser.add_argument( "--episode_summary", type=str, default=None, help=( - "Output JSONL file for per-episode sensitivity summaries. Only used when" - " --factor_keys is also set. Absent means no recording, unchanged behavior." + "Output JSONL file for per-episode summaries. When set, eval_runner writes one" + " JSONL row per recorded demo containing the full arena_env_args dict (what" + " parameterized the env for that episode) and the task's registered outcomes." + " The analyzer side picks which arena_env_args keys to treat as factors via" + " factors.yaml — no eval-side flag needed. 
Absent here means no recording and" + " unchanged behavior for non-sensitivity workflows." ), ) diff --git a/isaaclab_arena/scripts/analyze_sensitivity.py b/isaaclab_arena/scripts/analyze_sensitivity.py index 4148aa6b6..052948b2f 100644 --- a/isaaclab_arena/scripts/analyze_sensitivity.py +++ b/isaaclab_arena/scripts/analyze_sensitivity.py @@ -22,8 +22,9 @@ import argparse -from isaaclab_arena.analysis.sensitivity.analyzer import NPEAnalyzer +from isaaclab_arena.analysis.sensitivity.analyzer import make_analyzer from isaaclab_arena.analysis.sensitivity.dataset import SensitivityDataset +from isaaclab_arena.analysis.sensitivity.plotting import plot_marginal def main(): @@ -62,7 +63,7 @@ def main(): dataset = SensitivityDataset(args.factors_yaml, args.episode_summary) available_factors = list(dataset.factor_columns) - available_outcomes = [o.name for o in dataset.schema.outcomes] + available_outcomes = [outcome.name for outcome in dataset.schema.outcomes] if args.input_factor is None: factor_name = available_factors[0] @@ -89,15 +90,15 @@ def main(): f" (conditioning on outcome={args.outcome_value:g})" ) print( - f"[INFO] N={len(dataset.rows)} episodes; theta shape={tuple(dataset.theta.shape)};" + f"[INFO] num_episodes={len(dataset.rows)}; theta shape={tuple(dataset.theta.shape)};" f" x shape={tuple(dataset.x.shape)}" ) - analyzer = NPEAnalyzer(dataset, outcome_name=outcome_name) - print("[INFO] Fitting NPE...") + analyzer = make_analyzer(dataset, outcome_name=outcome_name) + print(f"[INFO] Dispatched analyzer: {type(analyzer).__name__}") analyzer.fit() print(f"[INFO] Plotting marginal -> {args.figure_path}") - analyzer.plot_marginal(factor_name=factor_name, output_path=args.figure_path, outcome_value=args.outcome_value) + plot_marginal(analyzer, factor_name, output_path=args.figure_path, outcome_value=args.outcome_value) print("[INFO] Done.") diff --git a/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml 
b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml index 1153585ef..4a4c82200 100644 --- a/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml +++ b/isaaclab_arena_environments/eval_jobs_configs/light_intensity_sweep_factors.yaml @@ -1,3 +1,8 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + # Sensitivity-analysis schema for the light_intensity sweep on droid + pi0. # Paired with: light_intensity_sweep_jobs_config.json (and the minimal variant). # Hand-authored — must stay in sync with --factor_keys passed to eval_runner. diff --git a/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml new file mode 100644 index 000000000..0649a2350 --- /dev/null +++ b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml @@ -0,0 +1,21 @@ +# Sensitivity-analysis schema for the pick_up_object sweep on droid + pi0. +# Paired with: pick_up_object_sweep_minimal_jobs_config.json +# Hand-authored — factor names must match arena_env_args keys logged via --episode_summary. + +slice: + policy: pi0_remote + task: pick_and_place_maple_table + embodiment: droid_abs_joint_pos + +factors: + pick_up_object: + type: categorical + # Three objects with distinct visual / shape characteristics. List them in the order + # the analyzer should use as integer codes (0=rubiks_cube, 1=alphabet_soup_can, 2=sugar_box). 
+ choices: [rubiks_cube_hot3d_robolab, alphabet_soup_can_hope_robolab, sugar_box_ycb_robolab] + +outcomes: + success_rate: + type: float + object_moved_rate: + type: float diff --git a/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_minimal_jobs_config.json b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_minimal_jobs_config.json new file mode 100644 index 000000000..fc2b3950c --- /dev/null +++ b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_minimal_jobs_config.json @@ -0,0 +1,70 @@ +{ + "jobs": [ + { + "name": "pick_up_object_minimal_rubiks_cube", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 500, + "pick_up_object": "rubiks_cube_hot3d_robolab", + "destination_location": "wooden_bowl_hot3d_robolab" + }, + "num_episodes": 2, + "language_instruction": "Pick up the Rubik's cube and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + }, + { + "name": "pick_up_object_minimal_alphabet_soup_can", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 500, + "pick_up_object": "alphabet_soup_can_hope_robolab", + "destination_location": "wooden_bowl_hot3d_robolab" + }, + "num_episodes": 2, + "language_instruction": "Pick up the soup can and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": 
"droid" + } + }, + { + "name": "pick_up_object_minimal_sugar_box", + "arena_env_args": { + "enable_cameras": true, + "environment": "pick_and_place_maple_table", + "embodiment": "droid_abs_joint_pos", + "hdr": "billiard_hall_robolab", + "light_intensity": 500, + "pick_up_object": "sugar_box_ycb_robolab", + "destination_location": "wooden_bowl_hot3d_robolab" + }, + "num_episodes": 2, + "language_instruction": "Pick up the sugar box and place it in the bowl.", + "policy_type": "isaaclab_arena_openpi.policy.pi0_remote_policy.Pi0RemotePolicy", + "policy_config_dict": { + "policy_variant": "pi05", + "policy_device": "cuda:0", + "remote_host": "127.0.0.1", + "remote_port": 8000, + "openpi_embodiment_adapter": "droid" + } + } + ] +} From 104314a92aa3fff91bf0a78e5574ac04eca2348a Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Thu, 28 May 2026 15:03:42 +0200 Subject: [PATCH 5/6] Fix pre-commit issues caught by CI CI's pre-commit was stricter than the local run had been: - isort + pyupgrade reformat tweaks in plotting.py and synthetic_data_categorical.py. - insert-license added the standard Apache-2.0 header to pick_up_object_sweep_factors.yaml. Local pre-commit run now passes cleanly across all files. 
Signed-off-by: Clemens Volk --- .../analysis/sensitivity/plotting.py | 30 ++++++---------- .../sensitivity/synthetic_data_categorical.py | 35 +++++++------------ .../pick_up_object_sweep_factors.yaml | 5 +++ 3 files changed, 29 insertions(+), 41 deletions(-) diff --git a/isaaclab_arena/analysis/sensitivity/plotting.py b/isaaclab_arena/analysis/sensitivity/plotting.py index 5a5df1bf7..2d2394da5 100644 --- a/isaaclab_arena/analysis/sensitivity/plotting.py +++ b/isaaclab_arena/analysis/sensitivity/plotting.py @@ -16,18 +16,17 @@ from __future__ import annotations +import numpy as np from pathlib import Path from typing import TYPE_CHECKING -import numpy as np - if TYPE_CHECKING: from isaaclab_arena.analysis.sensitivity.analyzer import BaseAnalyzer from isaaclab_arena.analysis.sensitivity.dataset import FactorSpec def plot_marginal( - analyzer: "BaseAnalyzer", + analyzer: BaseAnalyzer, factor_name: str, output_path: str | Path, outcome_value: float = 1.0, @@ -44,8 +43,7 @@ def plot_marginal( if factor_spec.type == "continuous": if not hasattr(analyzer, "continuous_marginal_density"): raise NotImplementedError( - f"{type(analyzer).__name__} cannot plot continuous factors; expected a" - " PosteriorAnalyzer (NPE/MNPE)." + f"{type(analyzer).__name__} cannot plot continuous factors; expected a PosteriorAnalyzer (NPE/MNPE)." 
) _plot_continuous_marginal(analyzer, factor_spec, output_path, outcome_value, num_grid_points) elif factor_spec.type == "categorical": @@ -55,8 +53,8 @@ def plot_marginal( def _plot_continuous_marginal( - analyzer: "BaseAnalyzer", - factor_spec: "FactorSpec", + analyzer: BaseAnalyzer, + factor_spec: FactorSpec, output_path: str | Path, outcome_value: float, num_grid_points: int, @@ -71,9 +69,7 @@ def _plot_continuous_marginal( """ import matplotlib.pyplot as plt - grid, density = analyzer.continuous_marginal_density( - factor_spec.name, outcome_value, num_grid_points - ) + grid, density = analyzer.continuous_marginal_density(factor_spec.name, outcome_value, num_grid_points) # Empirical rug, coloured by outcome — gives the human a sanity-check on the curve. factor_column_slice = analyzer.dataset.factor_columns[factor_spec.name] outcome_column_index = analyzer.dataset.outcome_columns[analyzer.outcome_name] @@ -116,8 +112,8 @@ def _plot_continuous_marginal( def _plot_categorical_marginal( - analyzer: "BaseAnalyzer", - factor_spec: "FactorSpec", + analyzer: BaseAnalyzer, + factor_spec: FactorSpec, output_path: str | Path, outcome_value: float, num_samples: int, @@ -143,13 +139,9 @@ def _plot_categorical_marginal( # Posterior probs come from the analyzer; empirical rate and counts are raw data, # rendered alongside as a sanity reference. 
- posterior_probabilities = analyzer.categorical_marginal_probs( - factor_spec.name, outcome_value, num_samples - ) + posterior_probabilities = analyzer.categorical_marginal_probs(factor_spec.name, outcome_value, num_samples) - empirical_theta_codes = ( - analyzer.dataset.theta[:, factor_column_slice].squeeze(-1).long().cpu().numpy() - ) + empirical_theta_codes = analyzer.dataset.theta[:, factor_column_slice].squeeze(-1).long().cpu().numpy() empirical_outcomes = analyzer.dataset.x[:, outcome_column_index].cpu().numpy() empirical_rates = np.zeros(num_choices) empirical_counts = np.zeros(num_choices, dtype=int) @@ -198,7 +190,7 @@ def _plot_categorical_marginal( _save_figure(figure, output_path) -def _plot_title(analyzer: "BaseAnalyzer", factor_name: str) -> str: +def _plot_title(analyzer: BaseAnalyzer, factor_name: str) -> str: """Format the plot title as ``"Sensitivity of <outcome> to <factor>" / slice block``.""" return ( f"Sensitivity of {analyzer.outcome_name} to {factor_name}\n" diff --git a/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py b/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py index 23a16640e..550048b03 100644 --- a/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py +++ b/isaaclab_arena/analysis/sensitivity/synthetic_data_categorical.py @@ -91,8 +91,7 @@ def main(): "--num-episodes", type=int, default=200, - help="Total episodes (uniform draws across all choices). Default 200 → ~40 per category" - " for 5 choices.", + help="Total episodes (uniform draws across all choices). 
Default 200 → ~40 per category for 5 choices.", ) parser.add_argument("--seed", type=int, default=42, help="RNG seed for reproducibility.") args = parser.parse_args() @@ -100,29 +99,25 @@ def main(): random_generator = random.Random(args.seed) choices = DEFAULT_CHOICES success_probabilities = DEFAULT_SUCCESS_PROBABILITIES - assert len(choices) == len(success_probabilities), ( - "DEFAULT_CHOICES and DEFAULT_SUCCESS_PROBABILITIES lengths must match" - ) + assert len(choices) == len( + success_probabilities + ), "DEFAULT_CHOICES and DEFAULT_SUCCESS_PROBABILITIES lengths must match" num_choices = len(choices) summary_rows = [] - per_category_stats: dict[str, list[int]] = { - choice: [0, 0] for choice in choices - } # category → [successes, total] + per_category_stats: dict[str, list[int]] = {choice: [0, 0] for choice in choices} # category → [successes, total] for episode_index in range(args.num_episodes): category_index = random_generator.randrange(num_choices) chosen_category = choices[category_index] was_success = 1.0 if random_generator.random() < success_probabilities[category_index] else 0.0 per_category_stats[chosen_category][0] += int(was_success) per_category_stats[chosen_category][1] += 1 - summary_rows.append( - { - "job_name": "synth_categorical", - "episode_idx": episode_index, - "arena_env_args": {"pick_up_object": chosen_category}, - "outcomes": {"success_rate": was_success, "object_moved_rate": was_success}, - } - ) + summary_rows.append({ + "job_name": "synth_categorical", + "episode_idx": episode_index, + "arena_env_args": {"pick_up_object": chosen_category}, + "outcomes": {"success_rate": was_success, "object_moved_rate": was_success}, + }) output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) @@ -130,17 +125,13 @@ def main(): for summary_row in summary_rows: jsonl_file.write(json.dumps(summary_row) + "\n") - factors_yaml_path = ( - Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / 
"factors.yaml" - ) + factors_yaml_path = Path(args.factors_yaml_out) if args.factors_yaml_out else output_path.parent / "factors.yaml" factors_yaml_path.parent.mkdir(parents=True, exist_ok=True) factors_yaml_path.write_text(_factors_yaml_text(choices), encoding="utf-8") print(f"[INFO] Wrote {len(summary_rows)} rows to {output_path}") print(f"[INFO] Wrote factors schema → {factors_yaml_path}") - print( - "[INFO] Per-category success counts (analyzer should pull posterior mass toward easy cats):" - ) + print("[INFO] Per-category success counts (analyzer should pull posterior mass toward easy cats):") for choice, target_probability in zip(choices, success_probabilities): successes, total = per_category_stats[choice] empirical_percentage = 100 * successes / total if total else 0.0 diff --git a/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml index 0649a2350..ab5eb24cd 100644 --- a/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml +++ b/isaaclab_arena_environments/eval_jobs_configs/pick_up_object_sweep_factors.yaml @@ -1,3 +1,8 @@ +# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + # Sensitivity-analysis schema for the pick_up_object sweep on droid + pi0. # Paired with: pick_up_object_sweep_minimal_jobs_config.json # Hand-authored — must stay in sync with --factor_keys passed to eval_runner. From 74585f1bda826d80a536aceaff704e37f4211c20 Mon Sep 17 00:00:00 2001 From: Clemens Volk Date: Thu, 28 May 2026 16:20:43 +0200 Subject: [PATCH 6/6] Add --camera_video flag to eval_runner Mirrors the existing flag on policy_runner: when set, eval_runner wraps each job's env with CameraObsVideoRecorder so one mp4 per camera in obs["camera_obs"] is written into <video_dir>/<job_name>/.
Independent of --video (which records the kit viewport). Useful for diagnosing what a policy actually sees during an eval sweep. Signed-off-by: Clemens Volk --- isaaclab_arena/evaluation/eval_runner.py | 21 ++++++++++++++++---- isaaclab_arena/evaluation/eval_runner_cli.py | 10 ++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index bb8eeed3c..d9902f638 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -14,6 +14,7 @@ from typing import TYPE_CHECKING from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser +from isaaclab_arena.evaluation.camera_video import CameraObsVideoRecorder from isaaclab_arena.evaluation.eval_runner_cli import add_eval_runner_arguments from isaaclab_arena.evaluation.job_manager import Job, JobManager, Status from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy @@ -160,7 +161,7 @@ def main(): job_manager.print_jobs_info() - if args_cli.video: + if args_cli.video or args_cli.camera_video: os.makedirs(args_cli.video_dir, exist_ok=True) print(f"[INFO] Video recording enabled. 
Videos will be saved to: {args_cli.video_dir}") @@ -182,20 +183,32 @@ def main(): else: job.num_steps = args_cli.num_steps - if args_cli.video: + if args_cli.video or args_cli.camera_video: if job.num_steps is not None: video_length = job.num_steps else: video_length = job.num_episodes * env.unwrapped.max_episode_length + job_video_folder = os.path.join(args_cli.video_dir, job.name) + + if args_cli.video: video_kwargs = { - "video_folder": os.path.join(args_cli.video_dir, job.name), + "video_folder": job_video_folder, "step_trigger": lambda step: step == 0, "video_length": video_length, "disable_logger": True, } - print(f"[INFO] Recording video for job '{job.name}' -> {video_kwargs['video_folder']}") + print(f"[INFO] Recording viewport video for job '{job.name}' -> {job_video_folder}") env = RecordVideo(env, **video_kwargs) + if args_cli.camera_video: + print(f"[INFO] Recording per-camera videos for job '{job.name}' -> {job_video_folder}") + env = CameraObsVideoRecorder( + env, + video_folder=job_video_folder, + step_trigger=lambda step: step == 0, + video_length=video_length, + ) + metrics = rollout_policy( env, policy, diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index 81343e61b..d776e180b 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -21,6 +21,16 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: default="/eval/videos", help="Root directory for recorded videos. Each job gets a subdirectory.", ) + parser.add_argument( + "--camera_video", + "--camera-video", + action="store_true", + default=False, + help=( + "For each job, record one mp4 per camera in obs['camera_obs'] (what the policy actually sees)." + " Independent of --video; use either or both." + ), + ) parser.add_argument( "--continue_on_error", action="store_true",