Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,22 @@ geometry, and prints a per-file summary with the demo and step counts:
input files are listed, so list the sessions chronologically if you want the merged file to
reflect the order of collection.

.. note::

Empty demos (no ``actions`` dataset or zero-length actions) are filtered out automatically
— they can appear after an interrupted recording session and would silently break
``replay_demos.py`` and the LeRobot conversion downstream. Dropped demos are listed by
source file in the merge summary, and the surviving demos are renumbered contiguously.
The script also works with a single input file (it copies the file, drops empties, and
recomputes ``data.attrs["total"]``), which is a quick way to sanitize a single
half-finished session before training.

If a session was recorded against a slightly different environment (e.g. a different physics
timestep) the merge will warn but still proceed. Schema-level differences (different action
dimensions, missing observation keys, different camera resolutions) are hard errors: re-record
the offending session against the canonical environment instead.
timestep) the merge will warn but still proceed. Cross-file schema-level differences (different
action dimensions, missing observation keys, different camera resolutions) and intra-file
inconsistencies on non-empty demos (one valid demo lacks a key the others have, e.g. a
missing camera observation) are hard errors — re-record the offending session against the
canonical environment.


Step 5: Replay Recorded Demos (Optional)
Expand Down
131 changes: 109 additions & 22 deletions isaaclab_arena/scripts/imitation_learning/merge_demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
- Recomputes ``data.attrs["total"]`` from the per-demo ``num_samples`` sum.
- Validates structural compatibility across inputs (``format_version``, dataset paths, per-key
shapes/dtypes) and warns on ``env_args`` mismatches.
- Drops empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) automatically,
since they break replay and post-training. Dropped demos are listed in the merge summary.
- Logs an operator-friendly per-file summary and aggregate report.
- Uses a recursive ``h5py.Group.copy`` so new recorder terms added by future Isaac Lab versions
(new ``obs/*`` keys, new sensor groups, new metadata attrs) round-trip unchanged.

The script has zero simulation dependency and only requires ``h5py``.
The script has zero simulation dependency and only requires ``h5py``. A single input file is
supported and equivalent to a copy with empty demos dropped and per-demo ``num_samples`` totals
recomputed.

Example
-------
Expand Down Expand Up @@ -60,7 +64,12 @@ class _FileInfo:
success_count: int = 0
no_success_attr_count: int = 0
failed_count: int = 0
untracked_step_demos: list[str] = field(default_factory=list)
# Demos that won't replay or train: either no `actions` dataset or `actions.shape[0] == 0`.
empty_demo_names: list[str] = field(default_factory=list)
# Demos within this file whose dataset key/shape/dtype set differs from demo_0's. This
# surfaces single-recording corruption that the cross-file schema check (which samples
# one demo per input) cannot see.
intra_file_schema_mismatches: list[str] = field(default_factory=list)


def _format_bytes(num_bytes: float) -> str:
Expand Down Expand Up @@ -140,25 +149,39 @@ def _inspect_file(path: str) -> _FileInfo:
if num_demos == 0:
raise ValueError(f"{path}: contains no 'demo_*' groups; nothing to merge")

# Sample the schema from the first demo only. record_demos.py writes a uniform
# recorder stack within a single file, so this is the fast path. Intra-file
# inconsistency (e.g. a file that was manually edited mid-session) is not detected
# here; cross-file consistency is what _validate_compatibility checks.
# Sample the file-level reference schema from the first demo. record_demos.py writes
# a uniform recorder stack within a single file, so demo_0 is a reasonable reference;
# the per-demo loop below catches the rare case where a single demo in the file
# diverges (e.g. partial recording, manual edit).
schema_fingerprint = _build_schema_fingerprint(data_group[demo_names[0]])
Comment on lines +152 to 156
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 False intra-file schema error when demo_0 is the empty demo (missing actions key)

The reference fingerprint is computed from demo_names[0] (line 156) before the loop that classifies empty demos. If demo_0 has no actions key at all (not merely a zero-length actions array, but the key is entirely absent — e.g. from a very early interrupted recording), schema_fingerprint will not contain actions. Every subsequent valid demo does have actions, so it is flagged as a schema mismatch. Those valid demos land in intra_file_schema_mismatches but are not in empty_set, making them "actionable". The merge aborts with a false error naming the good demos as the problem while the actual culprit (demo_0) is never called out.

The existing test suite only exercises the zero-length-actions path via _truncate_demo_actions (which keeps the actions key with shape (0, action_dim) so shape[1:] matches), so this scenario has no coverage. A recording session where the very first demo never wrote a single step is a realistic edge case for interrupted teleop sessions.


total_steps = 0
success_count = 0
no_success_attr_count = 0
failed_count = 0
untracked_step_demos: list[str] = []
empty_demo_names: list[str] = []
intra_file_schema_mismatches: list[str] = []
for name in demo_names:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: When "actions" is missing entirely, actions_len becomes 0, which is correct. However, the empty-demo detection below distinguishes between "no actions dataset" and "zero-length actions." Consider logging which case was hit for debugging purposes (e.g., missing actions dataset vs actions.shape[0] == 0), since these can indicate different recorder failure modes.

demo = data_group[name]
actions_len = int(demo["actions"].shape[0]) if "actions" in demo else 0

if "num_samples" in demo.attrs:
total_steps += int(demo.attrs["num_samples"])
elif "actions" in demo:
total_steps += int(demo["actions"].shape[0])
else:
untracked_step_demos.append(name)
total_steps += actions_len

# "Empty" = unusable for replay / training. Either there is no actions dataset at
# all, or it has zero timesteps. record_demos.py shouldn't normally export these,
# but partial recordings and interrupted sessions can produce them.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion (minor): The intra-file schema check builds a fingerprint for every non-first demo, which is O(keys × demos). For files with many demos (>100) this could add noticeable overhead. Consider short-circuiting after the first mismatch is found since one is enough to raise an error:

if name != demo_names[0] and not intra_file_schema_mismatches:
    demo_fp = _build_schema_fingerprint(demo)
    if demo_fp != schema_fingerprint:
        intra_file_schema_mismatches.append(name)

Alternatively, if you want to report all mismatches, this is fine as-is, but perhaps add a comment noting the intentional choice.

if "actions" not in demo or actions_len == 0:
empty_demo_names.append(name)

# Intra-file schema mismatch: any demo whose key set / shape / dtype differs from
# the file's reference (demo_0). Skip demo_0 itself.
if name != demo_names[0]:
demo_fp = _build_schema_fingerprint(demo)
if demo_fp != schema_fingerprint:
intra_file_schema_mismatches.append(name)

if "success" in demo.attrs:
if bool(demo.attrs["success"]):
Expand All @@ -179,7 +202,8 @@ def _inspect_file(path: str) -> _FileInfo:
success_count=success_count,
no_success_attr_count=no_success_attr_count,
failed_count=failed_count,
untracked_step_demos=untracked_step_demos,
empty_demo_names=empty_demo_names,
intra_file_schema_mismatches=intra_file_schema_mismatches,
)


Expand Down Expand Up @@ -214,7 +238,12 @@ class _ValidationReport:


def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport:
"""Compare input files for compatibility and produce a :class:`_ValidationReport`."""
"""Compare input files for compatibility and produce a :class:`_ValidationReport`.

Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always dropped from
the merged output, so intra-file schema mismatches whose only divergence is the empty
demo itself are not surfaced as errors here.
"""
report = _ValidationReport()

versions = {i.format_version for i in infos}
Expand Down Expand Up @@ -263,13 +292,31 @@ def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport:
report.info.append(
f"{i.path}: {i.no_success_attr_count} demo(s) without @success attribute (legacy format)."
)
if i.untracked_step_demos:
shown = ", ".join(i.untracked_step_demos[:3])
suffix = ", ..." if len(i.untracked_step_demos) > 3 else ""
if i.empty_demo_names:
shown = ", ".join(i.empty_demo_names[:3])
suffix = ", ..." if len(i.empty_demo_names) > 3 else ""
report.warnings.append(
f"{i.path}: {len(i.untracked_step_demos)} demo(s) without 'num_samples' attribute"
f" or 'actions' dataset ({shown}{suffix}); contributed 0 to the reported step total."
f"{i.path}: {len(i.empty_demo_names)} empty demo(s) ({shown}{suffix}) — missing"
" 'actions' or actions.shape[0] == 0. These break replay and post-training and"
" will be dropped from the merged output."
)
if i.intra_file_schema_mismatches:
empty_set = set(i.empty_demo_names)
# Demos that diverge from the file's reference schema solely because they are
# empty will be dropped before being written, so they cannot break downstream
# consumers. Real intra-file inconsistency (e.g. a missing camera obs on an
# otherwise valid demo) still surfaces as a hard error.
actionable = [name for name in i.intra_file_schema_mismatches if name not in empty_set]
if actionable:
shown = ", ".join(actionable[:3])
suffix = ", ..." if len(actionable) > 3 else ""
report.schema_status = "MISMATCH"
report.errors.append(
f"{i.path}: {len(actionable)} demo(s) ({shown}{suffix}) have a key/shape/dtype"
" set that differs from demo_0 in the same file (e.g. a missing camera"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observation: The error message suggests "drop them upstream or re-record," but this PR now auto-drops empty demos. Consider clarifying that non-empty demos with schema mismatches require manual intervention, while empty mismatches are auto-handled.

" observation). Inspect the offending demos and either drop them upstream"
" or re-record the session."
)

return report

Expand All @@ -283,6 +330,7 @@ def _print_summary(
output_size_bytes: int | None = None,
report: _ValidationReport | None = None,
dry_run: bool = False,
dropped: list[str] | None = None,
) -> None:
"""Print an operator-friendly per-file table, aggregate row, and validation summary."""
n = len(infos)
Expand All @@ -295,6 +343,7 @@ def _print_summary(
print()
for idx, i in enumerate(infos, start=1):
env_name = i.env_args.get("env_name", "")
empty_str = f" empty={len(i.empty_demo_names)}" if i.empty_demo_names else ""
print(
f"[{idx}/{n}] {os.path.basename(i.path):<{max_name_len}} "
f"demos={i.num_demos:>4d} "
Expand All @@ -303,6 +352,7 @@ def _print_summary(
f'env="{env_name}" '
f"v={i.format_version} "
f"keys={len(i.schema_fingerprint)}"
f"{empty_str}"
)

print(_TABLE_SEP_CHAR * (max_name_len + 65))
Expand Down Expand Up @@ -336,16 +386,34 @@ def _print_summary(
for msg in report.errors:
print(f"ERROR: {msg}")

if dropped:
shown = ", ".join(dropped[:5])
suffix = ", ..." if len(dropped) > 5 else ""
print(f"Dropped {len(dropped)} empty demo(s): {shown}{suffix}")

if total_demos > 0:
print(f"Demo numbering: demo_0..demo_{total_demos - 1} (input order preserved)")
numbering_note = "input order preserved"
if dropped:
numbering_note += f"; {len(dropped)} empty demo(s) skipped"
print(f"Demo numbering: demo_0..demo_{total_demos - 1} ({numbering_note})")
print()


def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]:
def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int, list[str]]:
"""Write a merged HDF5 dataset from validated inputs.

Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always skipped:
they cannot be replayed or used for training, so including them would only break
downstream consumers.

Args:
infos: Per-input file summaries produced by :func:`_inspect_file`.
output_path: Destination HDF5 path.

Returns:
A tuple ``(output_size_bytes, total_steps_written, total_demos_written)``.
``(output_size_bytes, total_steps_written, total_demos_written, dropped_log)`` where
``dropped_log`` is a list of ``"{source_path}::{demo_name}"`` strings naming every
empty demo that was skipped. Empty when no input demo was empty.
"""
format_version = infos[0].format_version
merged_env_args = dict(infos[0].env_args)
Expand All @@ -356,16 +424,21 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]:

total_demos_written = 0
total_steps_written = 0
dropped: list[str] = []

with h5py.File(output_path, "w") as out:
out.attrs["format_version"] = format_version
data_out = out.create_group("data")
data_out.attrs["env_args"] = json.dumps(merged_env_args)

for info in infos:
skip_set: set[str] = set(info.empty_demo_names)
with h5py.File(info.path, "r") as src:
src_data = src["data"]
for src_demo_name in _sorted_demo_names(src_data):
if src_demo_name in skip_set:
dropped.append(f"{info.path}::{src_demo_name}")
continue
dst_demo_name = f"demo_{total_demos_written}"
try:
src.copy(src_data[src_demo_name], data_out, name=dst_demo_name)
Expand All @@ -387,7 +460,7 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]:
data_out.attrs["total"] = total_steps_written

output_size = os.path.getsize(output_path)
return output_size, total_steps_written, total_demos_written
return output_size, total_steps_written, total_demos_written, dropped


def _build_parser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -466,6 +539,19 @@ def main(argv: list[str] | None = None) -> int:

report = _validate_compatibility(infos)

# Pre-flight: if every input demo is empty, the merged output would have a useless
# zero-demo /data group that downstream loaders can't consume. Refuse rather than
# write a structurally empty file.
total_input_demos = sum(i.num_demos for i in infos)
total_empty_demos = sum(len(i.empty_demo_names) for i in infos)
if total_input_demos > 0 and total_empty_demos == total_input_demos:
_print_summary(infos, args.output_file, report=report, dry_run=True)
print(
"ERROR: every input demo is empty (no 'actions' or actions.shape[0] == 0). Nothing to merge.",
file=sys.stderr,
)
return 1

if args.dry_run:
_print_summary(infos, args.output_file, report=report, dry_run=True)
return 1 if report.errors else 0
Expand All @@ -487,7 +573,7 @@ def main(argv: list[str] | None = None) -> int:
tmp_path = args.output_file + ".tmp"
success = False
try:
output_size, total_steps, total_demos = _merge(infos, tmp_path)
output_size, total_steps, total_demos, dropped = _merge(infos, tmp_path)
os.replace(tmp_path, args.output_file)
success = True
except Exception as e:
Expand All @@ -506,6 +592,7 @@ def main(argv: list[str] | None = None) -> int:
output_size_bytes=output_size,
report=report,
dry_run=False,
dropped=dropped,
)
print(f"Merged dataset written to: {args.output_file}")
return 0
Expand Down
Loading
Loading