diff --git a/docs/pages/example_workflows/static_apple/step_2_teleoperation.rst b/docs/pages/example_workflows/static_apple/step_2_teleoperation.rst index 402b2ead5..b151bd33c 100644 --- a/docs/pages/example_workflows/static_apple/step_2_teleoperation.rst +++ b/docs/pages/example_workflows/static_apple/step_2_teleoperation.rst @@ -311,10 +311,22 @@ geometry, and prints a per-file summary with the demo and step counts: input files are listed, so list the sessions chronologically if you want the merged file to reflect the order of collection. +.. note:: + + Empty demos (no ``actions`` dataset or zero-length actions) are filtered out automatically + — they can appear after an interrupted recording session and would silently break + ``replay_demos.py`` and the LeRobot conversion downstream. Dropped demos are listed by + source file in the merge summary, and the surviving demos are renumbered contiguously. + The script also works with a single input file (it copies the file, drops empties, and + recomputes ``data.attrs["total"]``), which is a quick way to sanitize a single + half-finished session before training. + If a session was recorded against a slightly different environment (e.g. a different physics -timestep) the merge will warn but still proceed. Schema-level differences (different action -dimensions, missing observation keys, different camera resolutions) are hard errors: re-record -the offending session against the canonical environment instead. +timestep) the merge will warn but still proceed. Cross-file schema-level differences (different +action dimensions, missing observation keys, different camera resolutions) and intra-file +inconsistencies on non-empty demos (one valid demo lacks a key the others have, e.g. a +missing camera observation) are hard errors — re-record the offending session against the +canonical environment. Step 5: Replay Recorded Demos (Optional) diff --git a/isaaclab_arena/scripts/imitation_learning/merge_demos.py b/isaaclab_arena/scripts/imitation_learning/merge_demos.py index 13eec64ac..6e29f2a71 100644 --- a/isaaclab_arena/scripts/imitation_learning/merge_demos.py +++ b/isaaclab_arena/scripts/imitation_learning/merge_demos.py @@ -18,11 +18,15 @@ - Recomputes ``data.attrs["total"]`` from the per-demo ``num_samples`` sum. - Validates structural compatibility across inputs (``format_version``, dataset paths, per-key shapes/dtypes) and warns on ``env_args`` mismatches. +- Drops empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) automatically, + since they break replay and post-training. Dropped demos are listed in the merge summary. - Logs an operator-friendly per-file summary and aggregate report. - Uses a recursive ``h5py.Group.copy`` so new recorder terms added by future Isaac Lab versions (new ``obs/*`` keys, new sensor groups, new metadata attrs) round-trip unchanged. -The script has zero simulation dependency and only requires ``h5py``. +The script has zero simulation dependency and only requires ``h5py``. A single input file is +supported and equivalent to a copy with empty demos dropped and per-demo ``num_samples`` totals +recomputed. Example ------- @@ -60,7 +64,12 @@ class _FileInfo: success_count: int = 0 no_success_attr_count: int = 0 failed_count: int = 0 - untracked_step_demos: list[str] = field(default_factory=list) + # Demos that won't replay or train: either no `actions` dataset or `actions.shape[0] == 0`. + empty_demo_names: list[str] = field(default_factory=list) + # Demos within this file whose dataset key/shape/dtype set differs from demo_0's. This + # surfaces single-recording corruption that the cross-file schema check (which samples + # one demo per input) cannot see. + intra_file_schema_mismatches: list[str] = field(default_factory=list) def _format_bytes(num_bytes: float) -> str: @@ -140,25 +149,39 @@ def _inspect_file(path: str) -> _FileInfo: if num_demos == 0: raise ValueError(f"{path}: contains no 'demo_*' groups; nothing to merge") - # Sample the schema from the first demo only. record_demos.py writes a uniform - # recorder stack within a single file, so this is the fast path. Intra-file - # inconsistency (e.g. a file that was manually edited mid-session) is not detected - # here; cross-file consistency is what _validate_compatibility checks. + # Sample the file-level reference schema from the first demo. record_demos.py writes + # a uniform recorder stack within a single file, so demo_0 is a reasonable reference; + # the per-demo loop below catches the rare case where a single demo in the file + # diverges (e.g. partial recording, manual edit). schema_fingerprint = _build_schema_fingerprint(data_group[demo_names[0]]) total_steps = 0 success_count = 0 no_success_attr_count = 0 failed_count = 0 - untracked_step_demos: list[str] = [] + empty_demo_names: list[str] = [] + intra_file_schema_mismatches: list[str] = [] for name in demo_names: demo = data_group[name] + actions_len = int(demo["actions"].shape[0]) if "actions" in demo else 0 + if "num_samples" in demo.attrs: total_steps += int(demo.attrs["num_samples"]) - elif "actions" in demo: - total_steps += int(demo["actions"].shape[0]) else: - untracked_step_demos.append(name) + total_steps += actions_len + + # "Empty" = unusable for replay / training. Either there is no actions dataset at + # all, or it has zero timesteps. record_demos.py shouldn't normally export these, + # but partial recordings and interrupted sessions can produce them. + if "actions" not in demo or actions_len == 0: + empty_demo_names.append(name) + + # Intra-file schema mismatch: any demo whose key set / shape / dtype differs from + # the file's reference (demo_0). Skip demo_0 itself. + if name != demo_names[0]: + demo_fp = _build_schema_fingerprint(demo) + if demo_fp != schema_fingerprint: + intra_file_schema_mismatches.append(name) if "success" in demo.attrs: if bool(demo.attrs["success"]): @@ -179,7 +202,8 @@ def _inspect_file(path: str) -> _FileInfo: success_count=success_count, no_success_attr_count=no_success_attr_count, failed_count=failed_count, - untracked_step_demos=untracked_step_demos, + empty_demo_names=empty_demo_names, + intra_file_schema_mismatches=intra_file_schema_mismatches, ) @@ -214,7 +238,12 @@ class _ValidationReport: def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport: - """Compare input files for compatibility and produce a :class:`_ValidationReport`.""" + """Compare input files for compatibility and produce a :class:`_ValidationReport`. + + Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always dropped from + the merged output, so intra-file schema mismatches whose only divergence is the empty + demo itself are not surfaced as errors here. + """ report = _ValidationReport() versions = {i.format_version for i in infos} @@ -263,13 +292,31 @@ def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport: report.info.append( f"{i.path}: {i.no_success_attr_count} demo(s) without @success attribute (legacy format)." ) - if i.untracked_step_demos: - shown = ", ".join(i.untracked_step_demos[:3]) - suffix = ", ..." if len(i.untracked_step_demos) > 3 else "" + if i.empty_demo_names: + shown = ", ".join(i.empty_demo_names[:3]) + suffix = ", ..." if len(i.empty_demo_names) > 3 else "" report.warnings.append( - f"{i.path}: {len(i.untracked_step_demos)} demo(s) without 'num_samples' attribute" - f" or 'actions' dataset ({shown}{suffix}); contributed 0 to the reported step total." + f"{i.path}: {len(i.empty_demo_names)} empty demo(s) ({shown}{suffix}) — missing" + " 'actions' or actions.shape[0] == 0. These break replay and post-training and" + " will be dropped from the merged output." ) + if i.intra_file_schema_mismatches: + empty_set = set(i.empty_demo_names) + # Demos that diverge from the file's reference schema solely because they are + # empty will be dropped before being written, so they cannot break downstream + # consumers. Real intra-file inconsistency (e.g. a missing camera obs on an + # otherwise valid demo) still surfaces as a hard error. + actionable = [name for name in i.intra_file_schema_mismatches if name not in empty_set] + if actionable: + shown = ", ".join(actionable[:3]) + suffix = ", ..." if len(actionable) > 3 else "" + report.schema_status = "MISMATCH" + report.errors.append( + f"{i.path}: {len(actionable)} demo(s) ({shown}{suffix}) have a key/shape/dtype" + " set that differs from demo_0 in the same file (e.g. a missing camera" + " observation). Inspect the offending demos and either drop them upstream" + " or re-record the session." + ) return report @@ -283,6 +330,7 @@ def _print_summary( output_size_bytes: int | None = None, report: _ValidationReport | None = None, dry_run: bool = False, + dropped: list[str] | None = None, ) -> None: """Print an operator-friendly per-file table, aggregate row, and validation summary.""" n = len(infos) @@ -295,6 +343,7 @@ def _print_summary( print() for idx, i in enumerate(infos, start=1): env_name = i.env_args.get("env_name", "") + empty_str = f" empty={len(i.empty_demo_names)}" if i.empty_demo_names else "" print( f"[{idx}/{n}] {os.path.basename(i.path):<{max_name_len}} " f"demos={i.num_demos:>4d} " @@ -303,6 +352,7 @@ def _print_summary( f'env="{env_name}" ' f"v={i.format_version} " f"keys={len(i.schema_fingerprint)}" + f"{empty_str}" ) print(_TABLE_SEP_CHAR * (max_name_len + 65)) @@ -336,16 +386,34 @@ def _print_summary( for msg in report.errors: print(f"ERROR: {msg}") + if dropped: + shown = ", ".join(dropped[:5]) + suffix = ", ..." if len(dropped) > 5 else "" + print(f"Dropped {len(dropped)} empty demo(s): {shown}{suffix}") + if total_demos > 0: - print(f"Demo numbering: demo_0..demo_{total_demos - 1} (input order preserved)") + numbering_note = "input order preserved" + if dropped: + numbering_note += f"; {len(dropped)} empty demo(s) skipped" + print(f"Demo numbering: demo_0..demo_{total_demos - 1} ({numbering_note})") print() -def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: +def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int, list[str]]: """Write a merged HDF5 dataset from validated inputs. + Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always skipped: + they cannot be replayed or used for training, so including them would only break + downstream consumers. + + Args: + infos: Per-input file summaries produced by :func:`_inspect_file`. + output_path: Destination HDF5 path. + Returns: - A tuple ``(output_size_bytes, total_steps_written, total_demos_written)``. + ``(output_size_bytes, total_steps_written, total_demos_written, dropped_log)`` where + ``dropped_log`` is a list of ``"{source_path}::{demo_name}"`` strings naming every + empty demo that was skipped. Empty when no input demo was empty. """ format_version = infos[0].format_version merged_env_args = dict(infos[0].env_args) @@ -356,6 +424,7 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: total_demos_written = 0 total_steps_written = 0 + dropped: list[str] = [] with h5py.File(output_path, "w") as out: out.attrs["format_version"] = format_version @@ -363,9 +432,13 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: data_out.attrs["env_args"] = json.dumps(merged_env_args) for info in infos: + skip_set: set[str] = set(info.empty_demo_names) with h5py.File(info.path, "r") as src: src_data = src["data"] for src_demo_name in _sorted_demo_names(src_data): + if src_demo_name in skip_set: + dropped.append(f"{info.path}::{src_demo_name}") + continue dst_demo_name = f"demo_{total_demos_written}" try: src.copy(src_data[src_demo_name], data_out, name=dst_demo_name) @@ -387,7 +460,7 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: data_out.attrs["total"] = total_steps_written output_size = os.path.getsize(output_path) - return output_size, total_steps_written, total_demos_written + return output_size, total_steps_written, total_demos_written, dropped def _build_parser() -> argparse.ArgumentParser: @@ -466,6 +539,19 @@ def main(argv: list[str] | None = None) -> int: report = _validate_compatibility(infos) + # Pre-flight: if every input demo is empty, the merged output would have a useless + # zero-demo /data group that downstream loaders can't consume. Refuse rather than + # write a structurally empty file. + total_input_demos = sum(i.num_demos for i in infos) + total_empty_demos = sum(len(i.empty_demo_names) for i in infos) + if total_input_demos > 0 and total_empty_demos == total_input_demos: + _print_summary(infos, args.output_file, report=report, dry_run=True) + print( + "ERROR: every input demo is empty (no 'actions' or actions.shape[0] == 0). Nothing to merge.", + file=sys.stderr, + ) + return 1 + if args.dry_run: _print_summary(infos, args.output_file, report=report, dry_run=True) return 1 if report.errors else 0 @@ -487,7 +573,7 @@ def main(argv: list[str] | None = None) -> int: tmp_path = args.output_file + ".tmp" success = False try: - output_size, total_steps, total_demos = _merge(infos, tmp_path) + output_size, total_steps, total_demos, dropped = _merge(infos, tmp_path) os.replace(tmp_path, args.output_file) success = True except Exception as e: @@ -506,6 +592,7 @@ def main(argv: list[str] | None = None) -> int: output_size_bytes=output_size, report=report, dry_run=False, + dropped=dropped, ) print(f"Merged dataset written to: {args.output_file}") return 0 diff --git a/isaaclab_arena/tests/test_merge_demos.py b/isaaclab_arena/tests/test_merge_demos.py index 862645cdd..67f2a6089 100644 --- a/isaaclab_arena/tests/test_merge_demos.py +++ b/isaaclab_arena/tests/test_merge_demos.py @@ -366,24 +366,6 @@ def test_output_parent_directory_created(tmp_path): assert nested_out.exists() -def test_demo_without_step_info_warns(tmp_path, capsys): - """A demo with no num_samples attr and no actions dataset should produce a warning.""" - a = tmp_path / "a.hdf5" - _make_dataset(str(a), num_demos=2) - # Strip num_samples and remove the actions dataset from demo_1 to simulate a broken demo - with h5py.File(a, "r+") as f: - demo = f["data/demo_1"] - del demo.attrs["num_samples"] - del demo["actions"] - - out = tmp_path / "merged.hdf5" - assert _run_merge([str(a), "-o", str(out)]) == 0 - captured = capsys.readouterr() - combined = captured.out + captured.err - assert "demo_1" in combined - assert "num_samples" in combined or "actions" in combined - - def test_non_hdf5_input_produces_clean_error(tmp_path, capsys): """A plain-text file with .hdf5 extension should produce a clean ERROR, not a traceback.""" bogus = tmp_path / "not_actually_hdf5.hdf5" @@ -405,7 +387,7 @@ def test_merge_failure_cleans_up_partial_output(tmp_path, monkeypatch, capsys): out = tmp_path / "merged.hdf5" _make_dataset(str(a)) - def _failing_merge(infos, output_path): + def _failing_merge(infos, output_path, **kwargs): with open(output_path, "wb") as f: f.write(b"partial garbage") raise OSError("simulated disk full") @@ -429,7 +411,7 @@ def test_overwrite_failure_preserves_existing_output(tmp_path, monkeypatch): # Sentinel episode length proves the original file is untouched after the failure. _make_dataset(str(out), num_demos=1, base_episode_len=4242) - def _failing_merge(infos, output_path): + def _failing_merge(infos, output_path, **kwargs): with open(output_path, "wb") as f: f.write(b"partial garbage") raise OSError("simulated mid-merge failure") @@ -442,6 +424,124 @@ def _failing_merge(infos, output_path): assert not (tmp_path / "merged.hdf5.tmp").exists() +def _truncate_demo_actions(path, demo_name, action_dim=8): + """Replace a demo's actions with a (0, action_dim) zero-length dataset in-place.""" + with h5py.File(path, "r+") as f: + demo = f["data"][demo_name] + del demo["actions"] + demo.create_dataset( + "actions", + data=np.empty((0, action_dim), dtype=np.float32), + compression="gzip", + ) + demo.attrs["num_samples"] = 0 + + +def test_empty_demo_dropped_by_default(tmp_path, capsys): + """Default behavior: zero-length actions demos are dropped and survivors renumber.""" + a = tmp_path / "a.hdf5" + b = tmp_path / "b.hdf5" + out = tmp_path / "merged.hdf5" + _make_dataset(str(a), num_demos=3, base_episode_len=100) + _make_dataset(str(b), num_demos=2, base_episode_len=200) + _truncate_demo_actions(str(a), "demo_1") + _truncate_demo_actions(str(b), "demo_0") + + assert _run_merge([str(a), str(b), "-o", str(out)]) == 0 + + with h5py.File(out, "r") as f: + names = _h5_demo_names(out) + # 5 inputs - 2 empty = 3 survivors, renumbered demo_0..demo_2 + assert names == ["demo_0", "demo_1", "demo_2"] + for n in names: + assert f[f"data/{n}/actions"].shape[0] > 0 + # data.attrs["total"] excludes the dropped demos' contribution + # a: 100 + 102 = 202 (demo_1 dropped); b: 201 (demo_0 dropped) + assert int(f["data"].attrs["total"]) == 202 + 201 + + combined = capsys.readouterr().out + assert "Dropped 2 empty demo(s)" in combined + assert "demo_1" in combined + assert "demo_0" in combined + + +def test_single_input_with_empty_demo_drops_and_succeeds(tmp_path): + """A single-file merge must still drop empty demos and produce a valid output.""" + a = tmp_path / "a.hdf5" + out = tmp_path / "merged.hdf5" + _make_dataset(str(a), num_demos=4, base_episode_len=50) + _truncate_demo_actions(str(a), "demo_2") + + assert _run_merge([str(a), "-o", str(out)]) == 0 + with h5py.File(out, "r") as f: + names = _h5_demo_names(out) + assert names == ["demo_0", "demo_1", "demo_2"] + for n in names: + assert f[f"data/{n}/actions"].shape[0] > 0 + # 50 + 51 + 53 = 154 (original demo_2 with num_samples=52 was dropped) + assert int(f["data"].attrs["total"]) == 50 + 51 + 53 + + +def test_all_empty_input_rejected(tmp_path, capsys): + """If every input demo is empty, the script must abort instead of writing a useless file.""" + a = tmp_path / "a.hdf5" + out = tmp_path / "merged.hdf5" + _make_dataset(str(a), num_demos=2) + _truncate_demo_actions(str(a), "demo_0") + _truncate_demo_actions(str(a), "demo_1") + + assert _run_merge([str(a), "-o", str(out)]) != 0 + assert not out.exists() + # Single readouterr() call so both stdout and stderr are captured. The error message is + # written to stderr (and the summary table to stdout). + captured = capsys.readouterr() + combined = captured.out + captured.err + assert "every input demo is empty" in combined + + +def test_intra_file_schema_mismatch_detected(tmp_path, capsys): + """If a single demo within a file lacks an obs key the others have, the merge must error.""" + a = tmp_path / "a.hdf5" + out = tmp_path / "merged.hdf5" + _make_dataset( + str(a), + num_demos=3, + camera_shapes={"robot_head_cam_rgb": (12, 16)}, + base_episode_len=4, + ) + # demo_2 loses its camera observation entirely — the kind of half-recorded session that + # silently breaks LeRobot conversion downstream. + with h5py.File(a, "r+") as f: + del f["data/demo_2/camera_obs/robot_head_cam_rgb"] + + assert _run_merge([str(a), "-o", str(out)]) != 0 + assert not out.exists() + captured = capsys.readouterr() + combined = captured.out + captured.err + assert "demo_2" in combined + # The intra-file mismatch must be surfaced (either as an explicit message or via the + # validation status row), not silently swallowed. + assert "differs" in combined or "MISMATCH" in combined + + +def test_intra_file_mismatch_dropped_when_empty(tmp_path): + """A demo that diverges from the file schema solely because it is empty is auto-dropped.""" + a = tmp_path / "a.hdf5" + out = tmp_path / "merged.hdf5" + _make_dataset(str(a), num_demos=3) + # demo_1 becomes empty by losing actions; stripping its obs/ further makes its key set + # diverge from demo_0's, exercising the "empty AND intra-file mismatch" code path. + with h5py.File(a, "r+") as f: + demo = f["data/demo_1"] + del demo.attrs["num_samples"] + del demo["actions"] + del demo["obs"] + + # The empty demo is dropped before its schema divergence can poison validation. + assert _run_merge([str(a), "-o", str(out)]) == 0 + assert _h5_demo_names(out) == ["demo_0", "demo_1"] + + def test_merged_file_loads_via_isaaclab_handler(tmp_path): """Property test: merged file must be readable by isaaclab's HDF5DatasetFileHandler.""" try: