-
Notifications
You must be signed in to change notification settings - Fork 52
Drop empty demos by default in merge_demos.py #728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,11 +18,15 @@ | |
| - Recomputes ``data.attrs["total"]`` from the per-demo ``num_samples`` sum. | ||
| - Validates structural compatibility across inputs (``format_version``, dataset paths, per-key | ||
| shapes/dtypes) and warns on ``env_args`` mismatches. | ||
| - Drops empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) automatically, | ||
| since they break replay and post-training. Dropped demos are listed in the merge summary. | ||
| - Logs an operator-friendly per-file summary and aggregate report. | ||
| - Uses a recursive ``h5py.Group.copy`` so new recorder terms added by future Isaac Lab versions | ||
| (new ``obs/*`` keys, new sensor groups, new metadata attrs) round-trip unchanged. | ||
|
|
||
| The script has zero simulation dependency and only requires ``h5py``. | ||
| The script has zero simulation dependency and only requires ``h5py``. A single input file is | ||
| supported and equivalent to a copy with empty demos dropped and per-demo ``num_samples`` totals | ||
| recomputed. | ||
|
|
||
| Example | ||
| ------- | ||
|
|
@@ -60,7 +64,12 @@ class _FileInfo: | |
| success_count: int = 0 | ||
| no_success_attr_count: int = 0 | ||
| failed_count: int = 0 | ||
| untracked_step_demos: list[str] = field(default_factory=list) | ||
| # Demos that won't replay or train: either no `actions` dataset or `actions.shape[0] == 0`. | ||
| empty_demo_names: list[str] = field(default_factory=list) | ||
| # Demos within this file whose dataset key/shape/dtype set differs from demo_0's. This | ||
| # surfaces single-recording corruption that the cross-file schema check (which samples | ||
| # one demo per input) cannot see. | ||
| intra_file_schema_mismatches: list[str] = field(default_factory=list) | ||
|
|
||
|
|
||
| def _format_bytes(num_bytes: float) -> str: | ||
|
|
@@ -140,25 +149,39 @@ def _inspect_file(path: str) -> _FileInfo: | |
| if num_demos == 0: | ||
| raise ValueError(f"{path}: contains no 'demo_*' groups; nothing to merge") | ||
|
|
||
| # Sample the schema from the first demo only. record_demos.py writes a uniform | ||
| # recorder stack within a single file, so this is the fast path. Intra-file | ||
| # inconsistency (e.g. a file that was manually edited mid-session) is not detected | ||
| # here; cross-file consistency is what _validate_compatibility checks. | ||
| # Sample the file-level reference schema from the first demo. record_demos.py writes | ||
| # a uniform recorder stack within a single file, so demo_0 is a reasonable reference; | ||
| # the per-demo loop below catches the rare case where a single demo in the file | ||
| # diverges (e.g. partial recording, manual edit). | ||
| schema_fingerprint = _build_schema_fingerprint(data_group[demo_names[0]]) | ||
|
|
||
| total_steps = 0 | ||
| success_count = 0 | ||
| no_success_attr_count = 0 | ||
| failed_count = 0 | ||
| untracked_step_demos: list[str] = [] | ||
| empty_demo_names: list[str] = [] | ||
| intra_file_schema_mismatches: list[str] = [] | ||
| for name in demo_names: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: When |
||
| demo = data_group[name] | ||
| actions_len = int(demo["actions"].shape[0]) if "actions" in demo else 0 | ||
|
|
||
| if "num_samples" in demo.attrs: | ||
| total_steps += int(demo.attrs["num_samples"]) | ||
| elif "actions" in demo: | ||
| total_steps += int(demo["actions"].shape[0]) | ||
| else: | ||
| untracked_step_demos.append(name) | ||
| total_steps += actions_len | ||
|
|
||
| # "Empty" = unusable for replay / training. Either there is no actions dataset at | ||
| # all, or it has zero timesteps. record_demos.py shouldn't normally export these, | ||
| # but partial recordings and interrupted sessions can produce them. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion (minor): The intra-file schema check builds a fingerprint for every non-first demo, which is O(keys × demos). For files with many demos (>100) this could add noticeable overhead. Consider short-circuiting after the first mismatch is found since one is enough to raise an error: if name != demo_names[0] and not intra_file_schema_mismatches:
demo_fp = _build_schema_fingerprint(demo)
if demo_fp != schema_fingerprint:
intra_file_schema_mismatches.append(name)Alternatively, if you want to report all mismatches, this is fine as-is, but perhaps add a comment noting the intentional choice. |
||
| if "actions" not in demo or actions_len == 0: | ||
| empty_demo_names.append(name) | ||
|
|
||
| # Intra-file schema mismatch: any demo whose key set / shape / dtype differs from | ||
| # the file's reference (demo_0). Skip demo_0 itself. | ||
| if name != demo_names[0]: | ||
| demo_fp = _build_schema_fingerprint(demo) | ||
| if demo_fp != schema_fingerprint: | ||
| intra_file_schema_mismatches.append(name) | ||
|
|
||
| if "success" in demo.attrs: | ||
| if bool(demo.attrs["success"]): | ||
|
|
@@ -179,7 +202,8 @@ def _inspect_file(path: str) -> _FileInfo: | |
| success_count=success_count, | ||
| no_success_attr_count=no_success_attr_count, | ||
| failed_count=failed_count, | ||
| untracked_step_demos=untracked_step_demos, | ||
| empty_demo_names=empty_demo_names, | ||
| intra_file_schema_mismatches=intra_file_schema_mismatches, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -214,7 +238,12 @@ class _ValidationReport: | |
|
|
||
|
|
||
| def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport: | ||
| """Compare input files for compatibility and produce a :class:`_ValidationReport`.""" | ||
| """Compare input files for compatibility and produce a :class:`_ValidationReport`. | ||
|
|
||
| Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always dropped from | ||
| the merged output, so intra-file schema mismatches whose only divergence is the empty | ||
| demo itself are not surfaced as errors here. | ||
| """ | ||
| report = _ValidationReport() | ||
|
|
||
| versions = {i.format_version for i in infos} | ||
|
|
@@ -263,13 +292,31 @@ def _validate_compatibility(infos: list[_FileInfo]) -> _ValidationReport: | |
| report.info.append( | ||
| f"{i.path}: {i.no_success_attr_count} demo(s) without @success attribute (legacy format)." | ||
| ) | ||
| if i.untracked_step_demos: | ||
| shown = ", ".join(i.untracked_step_demos[:3]) | ||
| suffix = ", ..." if len(i.untracked_step_demos) > 3 else "" | ||
| if i.empty_demo_names: | ||
| shown = ", ".join(i.empty_demo_names[:3]) | ||
| suffix = ", ..." if len(i.empty_demo_names) > 3 else "" | ||
| report.warnings.append( | ||
| f"{i.path}: {len(i.untracked_step_demos)} demo(s) without 'num_samples' attribute" | ||
| f" or 'actions' dataset ({shown}{suffix}); contributed 0 to the reported step total." | ||
| f"{i.path}: {len(i.empty_demo_names)} empty demo(s) ({shown}{suffix}) — missing" | ||
| " 'actions' or actions.shape[0] == 0. These break replay and post-training and" | ||
| " will be dropped from the merged output." | ||
| ) | ||
| if i.intra_file_schema_mismatches: | ||
| empty_set = set(i.empty_demo_names) | ||
| # Demos that diverge from the file's reference schema solely because they are | ||
| # empty will be dropped before being written, so they cannot break downstream | ||
| # consumers. Real intra-file inconsistency (e.g. a missing camera obs on an | ||
| # otherwise valid demo) still surfaces as a hard error. | ||
| actionable = [name for name in i.intra_file_schema_mismatches if name not in empty_set] | ||
| if actionable: | ||
| shown = ", ".join(actionable[:3]) | ||
| suffix = ", ..." if len(actionable) > 3 else "" | ||
| report.schema_status = "MISMATCH" | ||
| report.errors.append( | ||
| f"{i.path}: {len(actionable)} demo(s) ({shown}{suffix}) have a key/shape/dtype" | ||
| " set that differs from demo_0 in the same file (e.g. a missing camera" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Observation: The error message suggests "drop them upstream or re-record," but this PR now auto-drops empty demos. Consider clarifying that non-empty demos with schema mismatches require manual intervention, while empty mismatches are auto-handled. |
||
| " observation). Inspect the offending demos and either drop them upstream" | ||
| " or re-record the session." | ||
| ) | ||
|
|
||
| return report | ||
|
|
||
|
|
@@ -283,6 +330,7 @@ def _print_summary( | |
| output_size_bytes: int | None = None, | ||
| report: _ValidationReport | None = None, | ||
| dry_run: bool = False, | ||
| dropped: list[str] | None = None, | ||
| ) -> None: | ||
| """Print an operator-friendly per-file table, aggregate row, and validation summary.""" | ||
| n = len(infos) | ||
|
|
@@ -295,6 +343,7 @@ def _print_summary( | |
| print() | ||
| for idx, i in enumerate(infos, start=1): | ||
| env_name = i.env_args.get("env_name", "") | ||
| empty_str = f" empty={len(i.empty_demo_names)}" if i.empty_demo_names else "" | ||
| print( | ||
| f"[{idx}/{n}] {os.path.basename(i.path):<{max_name_len}} " | ||
| f"demos={i.num_demos:>4d} " | ||
|
|
@@ -303,6 +352,7 @@ def _print_summary( | |
| f'env="{env_name}" ' | ||
| f"v={i.format_version} " | ||
| f"keys={len(i.schema_fingerprint)}" | ||
| f"{empty_str}" | ||
| ) | ||
|
|
||
| print(_TABLE_SEP_CHAR * (max_name_len + 65)) | ||
|
|
@@ -336,16 +386,34 @@ def _print_summary( | |
| for msg in report.errors: | ||
| print(f"ERROR: {msg}") | ||
|
|
||
| if dropped: | ||
| shown = ", ".join(dropped[:5]) | ||
| suffix = ", ..." if len(dropped) > 5 else "" | ||
| print(f"Dropped {len(dropped)} empty demo(s): {shown}{suffix}") | ||
|
|
||
| if total_demos > 0: | ||
| print(f"Demo numbering: demo_0..demo_{total_demos - 1} (input order preserved)") | ||
| numbering_note = "input order preserved" | ||
| if dropped: | ||
| numbering_note += f"; {len(dropped)} empty demo(s) skipped" | ||
| print(f"Demo numbering: demo_0..demo_{total_demos - 1} ({numbering_note})") | ||
| print() | ||
|
|
||
|
|
||
| def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: | ||
| def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int, list[str]]: | ||
| """Write a merged HDF5 dataset from validated inputs. | ||
|
|
||
| Empty demos (no ``actions`` dataset or ``actions.shape[0] == 0``) are always skipped: | ||
| they cannot be replayed or used for training, so including them would only break | ||
| downstream consumers. | ||
|
|
||
| Args: | ||
| infos: Per-input file summaries produced by :func:`_inspect_file`. | ||
| output_path: Destination HDF5 path. | ||
|
|
||
| Returns: | ||
| A tuple ``(output_size_bytes, total_steps_written, total_demos_written)``. | ||
| ``(output_size_bytes, total_steps_written, total_demos_written, dropped_log)`` where | ||
| ``dropped_log`` is a list of ``"{source_path}::{demo_name}"`` strings naming every | ||
| empty demo that was skipped. Empty when no input demo was empty. | ||
| """ | ||
| format_version = infos[0].format_version | ||
| merged_env_args = dict(infos[0].env_args) | ||
|
|
@@ -356,16 +424,21 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: | |
|
|
||
| total_demos_written = 0 | ||
| total_steps_written = 0 | ||
| dropped: list[str] = [] | ||
|
|
||
| with h5py.File(output_path, "w") as out: | ||
| out.attrs["format_version"] = format_version | ||
| data_out = out.create_group("data") | ||
| data_out.attrs["env_args"] = json.dumps(merged_env_args) | ||
|
|
||
| for info in infos: | ||
| skip_set: set[str] = set(info.empty_demo_names) | ||
| with h5py.File(info.path, "r") as src: | ||
| src_data = src["data"] | ||
| for src_demo_name in _sorted_demo_names(src_data): | ||
| if src_demo_name in skip_set: | ||
| dropped.append(f"{info.path}::{src_demo_name}") | ||
| continue | ||
| dst_demo_name = f"demo_{total_demos_written}" | ||
| try: | ||
| src.copy(src_data[src_demo_name], data_out, name=dst_demo_name) | ||
|
|
@@ -387,7 +460,7 @@ def _merge(infos: list[_FileInfo], output_path: str) -> tuple[int, int, int]: | |
| data_out.attrs["total"] = total_steps_written | ||
|
|
||
| output_size = os.path.getsize(output_path) | ||
| return output_size, total_steps_written, total_demos_written | ||
| return output_size, total_steps_written, total_demos_written, dropped | ||
|
|
||
|
|
||
| def _build_parser() -> argparse.ArgumentParser: | ||
|
|
@@ -466,6 +539,19 @@ def main(argv: list[str] | None = None) -> int: | |
|
|
||
| report = _validate_compatibility(infos) | ||
|
|
||
| # Pre-flight: if every input demo is empty, the merged output would have a useless | ||
| # zero-demo /data group that downstream loaders can't consume. Refuse rather than | ||
| # write a structurally empty file. | ||
| total_input_demos = sum(i.num_demos for i in infos) | ||
| total_empty_demos = sum(len(i.empty_demo_names) for i in infos) | ||
| if total_input_demos > 0 and total_empty_demos == total_input_demos: | ||
| _print_summary(infos, args.output_file, report=report, dry_run=True) | ||
| print( | ||
| "ERROR: every input demo is empty (no 'actions' or actions.shape[0] == 0). Nothing to merge.", | ||
| file=sys.stderr, | ||
| ) | ||
| return 1 | ||
|
|
||
| if args.dry_run: | ||
| _print_summary(infos, args.output_file, report=report, dry_run=True) | ||
| return 1 if report.errors else 0 | ||
|
|
@@ -487,7 +573,7 @@ def main(argv: list[str] | None = None) -> int: | |
| tmp_path = args.output_file + ".tmp" | ||
| success = False | ||
| try: | ||
| output_size, total_steps, total_demos = _merge(infos, tmp_path) | ||
| output_size, total_steps, total_demos, dropped = _merge(infos, tmp_path) | ||
| os.replace(tmp_path, args.output_file) | ||
| success = True | ||
| except Exception as e: | ||
|
|
@@ -506,6 +592,7 @@ def main(argv: list[str] | None = None) -> int: | |
| output_size_bytes=output_size, | ||
| report=report, | ||
| dry_run=False, | ||
| dropped=dropped, | ||
| ) | ||
| print(f"Merged dataset written to: {args.output_file}") | ||
| return 0 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
demo_0is the empty demo (missingactionskey)The reference fingerprint is computed from
demo_names[0](line 156) before the loop that classifies empty demos. Ifdemo_0has noactionskey at all (not merely a zero-length actions array, but the key is entirely absent — e.g. from a very early interrupted recording),schema_fingerprintwill not containactions. Every subsequent valid demo does haveactions, so it is flagged as a schema mismatch. Those valid demos land inintra_file_schema_mismatchesbut are not inempty_set, making them "actionable". The merge aborts with a false error naming the good demos as the problem while the actual culprit (demo_0) is never called out.The existing test suite only exercises the zero-length-
actionspath via_truncate_demo_actions(which keeps theactionskey with shape(0, action_dim)soshape[1:]matches), so this scenario has no coverage. A recording session where the very first demo never wrote a single step is a realistic edge case for interrupted teleop sessions.