Parallelize updates across variable groups, unifying updates and backfills#529
Conversation
Pull request overview
This PR updates the reformatter orchestration layer to support parallelizing operational updates the same way backfills are parallelized, with shared worker coordination logic that preserves reader safety for both Zarr v3 and Icechunk-backed datasets.
Changes:
- Unifies update + backfill processing into `DynamicalDataset._process_region_jobs(...)`, adding worker coordination via `_internal/{job_name}/` files.
- Replaces `max_vars_per_backfill_job` with `max_vars_per_job` and removes `kind`/worker partitioning from `RegionJob.get_jobs()` (partitioning now done by `get_worker_jobs`).
- Adds Icechunk temp-branch strategy and Zarr v3 "defer metadata write until last worker" behavior; adds docs + tests for these workflows.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/noaa/gfs/analysis/region_job_test.py | Adjusts expectations for job splitting by max_vars_per_job. |
| tests/noaa/gefs/forecast_35_day/region_job_test.py | Renames test to max_vars_per_job and updates get_jobs() call expectations. |
| tests/noaa/gefs/forecast_35_day/dynamical_dataset_test.py | Updates integration assertion to max_vars_per_job. |
| tests/noaa/gefs/analysis/region_job_test.py | Renames test to max_vars_per_job and updates get_jobs() call expectations. |
| tests/noaa/gefs/analysis/dynamical_dataset_test.py | Updates integration assertion to max_vars_per_job. |
| tests/ecmwf/ifs_ens/forecast_15_day_0_25_degree/region_job_test.py | Updates operational update job count/vars assertions due to per-var job splitting. |
| tests/datasets_test.py | Updates CLI command expectation from process-backfill-region-jobs to backfill. |
| tests/common/test_storage.py | Adds tests for coordination file IO, branch support, and icechunk repo enumeration. |
| tests/common/test_parallel_writes.py | New integration tests for multi-worker coordination and reader safety (Zarr v3 + Icechunk + replicas). |
| tests/common/region_job_test.py | Updates tests for get_jobs() API change and shifts worker partitioning to get_worker_jobs. |
| tests/common/dynamical_dataset_test.py | Updates dataset API tests for new backfill() name and changed orchestration behavior. |
| src/reformatters/noaa/mrms/conus_analysis_hourly/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/hrrr/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/gfs/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/noaa/gfs/analysis/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job. |
| src/reformatters/noaa/gefs/forecast_35_day/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/noaa/gefs/analysis/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/example/region_job.py | Updates commented example removing kind= usage. |
| src/reformatters/ecmwf/ifs_ens/forecast_15_day_0_25_degree/region_job.py | Renames max_vars_per_backfill_job to max_vars_per_job; removes kind=. |
| src/reformatters/ecmwf/aifs_deterministic/forecast/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/dwd/icon_eu/forecast_5_day/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/uarizona/swann/analysis/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/noaa/ndvi_cdr/analysis/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/contrib/nasa/smap/level3_36km_v9/region_job.py | Removes deprecated kind= parameter from get_jobs() call. |
| src/reformatters/common/zarr.py | Adds zarr3_only flag and consolidates metadata-copy store filtering logic. |
| src/reformatters/common/storage.py | Adds branch support for Icechunk stores, icechunk_repos(), and coordination file read/write/cleanup. |
| src/reformatters/common/region_job.py | Updates get_jobs() API (no kind/worker partitioning) and unifies variable splitting via max_vars_per_job. |
| src/reformatters/common/dynamical_dataset.py | Introduces unified parallel orchestration with setup/results/finalize phases, Icechunk temp branch flow, and deferred Zarr v3 metadata write. |
| docs/parallel_processing.md | New documentation describing the worker coordination protocol, reader safety, and failure modes. |
| AGENTS.md | Updates contributor docs to reflect parallel updates and adds docs directory listing. |
Force-pushed from 8fdae7c to e8a75b4
…418)

Unify update and backfill codepaths into a shared `_process_region_jobs` method that coordinates parallel writes across Kubernetes indexed jobs.

- Rename `max_vars_per_backfill_job` to `max_vars_per_job`, applying variable group splitting to both updates and backfills
- Remove `kind` parameter from `get_jobs()` and `worker_index`/`workers_total` (partitioning now handled by caller via `get_worker_jobs`)
- Add `backfill()` method, rename CLI command from `process-backfill-region-jobs`
- Add `worker_index`/`workers_total` parameters to `update()` for parallel updates
- Icechunk: use temp branch strategy so readers on main never see partial data
- Zarr v3: defer metadata write until last worker to prevent exposing empty holes
- Worker coordination via `_internal/{job_name}/` files in object store
- Deterministic branch names from job name for safe worker restarts
- Two-pass finalize (reset all repos, then delete branches) with skip-if-already-reset
- Replicas updated before primary so primary drives retry behavior
- StoreFactory gains branch support, `icechunk_repos()`, and coordination file I/O
- Add `zarr3_only` parameter to `copy_zarr_metadata`

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
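Two of the bullets above — deterministic branch names and per-worker coordination files — combine to make worker restarts safe. A sketch of the idea (the prefix, path layout, and helper names here are illustrative, not the PR's actual implementation):

```python
def temp_branch_name(reformat_job_name: str) -> str:
    # Deterministic mapping: a restarted worker recomputes the same branch
    # name and resumes on its existing temp branch instead of creating a
    # duplicate, so readers on main never see partial data.
    return f"update-{reformat_job_name}"

def coordination_file_path(reformat_job_name: str, phase: str, worker_index: int) -> str:
    # One file per worker per phase, all rooted under _internal/{job_name}/
    # in the object store so finalize can enumerate and clean them up.
    return f"_internal/{reformat_job_name}/{phase}/{worker_index}.json"

assert temp_branch_name("job-a") == temp_branch_name("job-a")
assert coordination_file_path("job-a", "results", 3) == "_internal/job-a/results/3.json"
```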
- Split `_finalize` icechunk loop into two passes (reset all, then delete branches) with skip-if-already-reset for safe retries
- Guard icechunk finalization on `branch_name != "main"` to skip for backfills
- Add required `sort` kwarg to `icechunk_repos()` for explicit ordering
- Remove `worker_index`/`workers_total` from `get_jobs()` (caller handles partitioning)
- Add replica parallel write integration test
- Add K8s `pod_active_deadline` timeout comments on polling loops
…tup_info
…setup_info

- Fix dict overwrite bug in results merging: use setdefault+extend to properly accumulate source file coords across multiple jobs for the same variable (different regions)
- Split `_collect_results` into `_wait_for_workers` + `_collect_results` so backfills (`update_template_with_results=False`) only wait for completion without loading/deserializing result files
- Serialize setup_info as JSON instead of pickle to avoid code-execution risk from untrusted data in `_internal/`
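The overwrite bug and its setdefault+extend fix can be shown in isolation (variable and coordinate names here are illustrative):

```python
worker_results = [
    {"temperature": ["coord_a"], "wind": ["coord_b"]},  # worker 0, region 1
    {"temperature": ["coord_c"]},                        # worker 1, region 2
]

# Buggy merge: plain assignment keeps only the last worker's coords per variable.
buggy: dict[str, list[str]] = {}
for result in worker_results:
    for var_name, coords in result.items():
        buggy[var_name] = coords
assert buggy["temperature"] == ["coord_c"]  # coord_a from worker 0 is lost

# Fixed merge: setdefault + extend accumulates coords across all workers.
merged: dict[str, list[str]] = {}
for result in worker_results:
    for var_name, coords in result.items():
        merged.setdefault(var_name, []).extend(coords)
assert merged["temperature"] == ["coord_a", "coord_c"]
```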
Force-pushed from e8a75b4 to 92f95ca
Skip store creation and icechunk commit for workers with zero assigned jobs to avoid committing empty sessions. Split Sentry monitor checkins so worker 0 sends in_progress and the last worker sends ok/error. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Add `update_num_variable_groups()` on `DynamicalDataset` that computes the number of variable groups from `source_groups` and `max_vars_per_job`. Use it in the 4 datasets that set `max_vars_per_job` to auto-set `workers_total` and `parallelism` on their `ReformatCronJob`. Extract `split_groups` into common/iterating.py as a generic function.
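The computation described is a ceiling split of each source group into chunks of at most `max_vars_per_job` items. A sketch of what a generic `split_groups` could look like (the PR's actual implementation in common/iterating.py is not shown on this page):

```python
import math
from typing import Sequence, TypeVar

T = TypeVar("T")

def split_groups(groups: Sequence[Sequence[T]], max_per_group: int) -> list[list[T]]:
    """Split each source group into chunks of at most max_per_group items."""
    out: list[list[T]] = []
    for group in groups:
        n_chunks = math.ceil(len(group) / max_per_group)
        for i in range(n_chunks):
            out.append(list(group[i * max_per_group : (i + 1) * max_per_group]))
    return out

# Two source groups of 5 and 3 variables with max 2 vars per job
# -> ceil(5/2) + ceil(3/2) = 3 + 2 = 5 variable groups total.
groups = split_groups([["a", "b", "c", "d", "e"], ["f", "g", "h"]], 2)
assert len(groups) == 5
```

The length of that result is what `update_num_variable_groups()` would feed into `workers_total` and `parallelism`.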
…tes"

This reverts commit 444f0e6.
S3FileSystem caches directory listings, so the polling loops in `_wait_for_workers` and `_parallel_setup` never saw new files after the first `ls` call.
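The failure mode is easy to reproduce without s3fs. The toy filesystem below mimics a cached `ls()`, and the fix is to invalidate the listing cache on each polling iteration (fsspec filesystems expose `invalidate_cache()` for exactly this purpose):

```python
class CachedListingFS:
    """Toy filesystem whose ls() caches its first result, mimicking s3fs."""

    def __init__(self) -> None:
        self._files: set[str] = set()
        self._ls_cache: list[str] | None = None

    def touch(self, path: str) -> None:
        self._files.add(path)

    def ls(self, path: str) -> list[str]:
        # First call populates the cache; later calls return it unchanged.
        if self._ls_cache is None:
            self._ls_cache = sorted(self._files)
        return self._ls_cache

    def invalidate_cache(self) -> None:
        self._ls_cache = None

fs = CachedListingFS()
fs.touch("_internal/job/results/0.json")
assert len(fs.ls("_internal/job")) == 1

fs.touch("_internal/job/results/1.json")
assert len(fs.ls("_internal/job")) == 1  # the bug: cached listing never grows

fs.invalidate_cache()  # what each polling iteration must do before ls()
assert len(fs.ls("_internal/job")) == 2
```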
`primary_store()`/`replica_stores()` return ALL stores regardless of format. For datasets where the primary is zarr v3 (e.g. ecmwf-ifs-ens), `_parallel_setup` was calling `write_metadata(mode="w")` on the production zarr v3 store, wiping all chunk data. Get stores from `icechunk_repos()` instead, which only contains icechunk stores by construction.
Take the most descriptive summary from old and new versions for each doc entry, preferring specificity over brevity.
Use `copy_zarr_metadata` to expand icechunk dimensions by copying zarr.json and coordinate arrays from the local `tmp_store`, instead of `to_zarr(mode="w")` which deletes existing data first. Add a guard in `write_metadata` that raises `ValueError` if `mode="w"` is used on a non-Path store, preventing accidental remote data deletion.
Add a `log.warning` in `_process_region_jobs` when `workers_total` doesn't match the number of jobs, guiding operators to the optimal value. Set GEFS forecast-35-day and GEFS analysis to 2x variable groups since their operational updates reprocess the most recent time slice.
Add parallelism guidance to the dataset integration guide. Update the example dynamical_dataset.py to show setting `workers_total` via `update_num_variable_groups`. Fix setup/ready.pkl -> setup/ready.json in parallel_processing.md.
…alysis

Forecast datasets reprocess the most recent init_time + process a new one, always producing 2 region jobs (or 2 * num_variable_groups). Analysis datasets almost always stay within one shard, so 1x is right.
…railing dashes

Pod names for indexed jobs include job timestamp + index + random suffix. Reduce max cronjob name from 52 to 42 characters to fit within the 63-char DNS label limit. Strip trailing dashes after trimming dataset_id to avoid double dashes in names.
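The trimming described above fits in a few lines; the helper name below is hypothetical, but the 42-character budget and the trailing-dash strip match the commit:

```python
MAX_CRONJOB_NAME_LEN = 42  # leaves 21 chars for "-<timestamp>-<index>-<suffix>" within the 63-char DNS label limit

def cronjob_name(dataset_id: str) -> str:
    # Trim first, then strip trailing dashes so a cut that lands on a
    # separator never produces a name like "my-dataset--1234".
    return dataset_id[:MAX_CRONJOB_NAME_LEN].rstrip("-")

# A cut landing exactly on a dash would otherwise leave "xxx…x-":
name = "x" * 41 + "-suffix"
assert cronjob_name(name) == "x" * 41
```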
…leanups

- Cache poll results in `_parallel_setup` and `_wait_for_workers` to avoid double `read_all_coordination_files` calls against object store
- Return result data from `_wait_for_workers` so `_collect_results` reuses it
- Use `urlparse` instead of manual `split("://")` in `_coordination_fs`
- Use truthiness check on `icechunk_repos` list instead of `len() > 0`
…n retry

- Replace broad try/except `IcechunkError` around `create_branch`/`delete_branch` with `list_branches()` membership checks.
- On worker 0 retry, reuse snapshots from prior ready.json so `original_snapshot` stays stable. Without this, a retry after an external main write would refresh `original_snapshot` to match the moved main, bypassing finalize's `from_snapshot_id` skip and silently overwriting the external change.
- Replace the `workers_total != len(all_jobs)` warning with a plain info log including total job count, since backfills intentionally run fewer workers than jobs.
…parallel test coverage

- Replace hardcoded workers_total=2 on 4 forecast datasets (gfs/forecast, aifs_single, icon_eu, hrrr) with `2 * self.update_num_variable_groups()`. Same numeric result today, scales automatically if `max_vars_per_job` is added later.
- Remove obsolete "warning logged at runtime" note from the integration guide.
- Add failure-mode tests in test_parallel_writes.py:
- Worker restart idempotence (zarr3 and icechunk).
- Worker 0 retry preserves snapshot in ready.json via setdefault.
- Last worker retry after partial finalize skips already-reset repos
and completes remaining ones.
- Replica-last sort ordering invariant.
Backfills can have tens of thousands of workers, making the per-file `cat_file` loop in `read_all_coordination_files` prohibitive (N serial GETs).

- Add `StoreFactory.count_coordination_files` for lightweight polling via `ls` only.
- `_wait_for_workers` now counts instead of reading, and returns None. Backfills never load the pickled result bodies.
- `_collect_results` (updates only) waits then reads once.
- Switch `read_all_coordination_files` from serial `cat_file` to `fs.cat`, which runs concurrently on async backends like s3fs.
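The serial-vs-batched distinction matters because every GET pays a full round trip. A sketch of the same idea using a thread pool (fsspec's `fs.cat(paths)` achieves this natively on async backends; `fetch_one` below is a stand-in for a single object-store GET):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_one(path: str) -> bytes:
    # Stand-in for one object-store GET request.
    return f"contents of {path}".encode()

paths = [f"_internal/job/results/{i}.json" for i in range(1000)]

# Serial: 1000 sequential round trips -- prohibitive at backfill scale.
serial = [fetch_one(p) for p in paths]

# Batched: round trips overlap, wall time bounded by pool size, not N.
with ThreadPoolExecutor(max_workers=32) as pool:
    batched = list(pool.map(fetch_one, paths))

assert serial == batched  # pool.map preserves input order
```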
- Rename `update_num_variable_groups` -> `num_variable_groups` across all call sites (8 datasets, docs, example).
- Move the impl onto `RegionJob.num_variable_groups` as a classmethod; `DynamicalDataset` keeps a thin wrapper that forwards `data_vars`.
- Rename `my_jobs` -> `worker_jobs` and `all_results` -> `worker_results` in `_process_region_jobs`.
- Inline the `workers_total == 1` case at the `_collect_results` call site (nested ifs) so `_collect_results` only handles the multi-worker path.
- Add `test_coordination_base_path_ends_in_internal` (parametrized across envs and formats) and `test_clear_coordination_files_rm_path_rooted_in_internal` to lock the "_internal/" safety boundary.
Move `_parallel_setup`, `_wait_for_workers`, `_collect_results`, and `_finalize` out of `DynamicalDataset` into a new module as plain module-level functions that take `StoreFactory` as their first argument. `DynamicalDataset` is now ~160 lines shorter.

Also improve the finalize commit message: capture a single timestamp at the start of finalize so all replicas share the same commit message, formatted "Update at YYYY-MM-DDTHH:MM:SS UTC".
Matches `StoreFactory.icechunk_repos`' real return type instead of `Any`.
…urrent jobs

- Replace `test_results_contain_all_variables` (was bypassing the merge path via `update_template_with_results=False`) with a test that captures the merged dict via a monkey-patch and asserts keys from every worker
- Add `test_partial_read_failure_trims_template_to_last_successful_time` covering the default `update_template_with_results` trim behavior
- Narrow `TestReplicaOrdering` to what it verifies; add `test_finalize_resets_replicas_before_primary` which observes `reset_branch` call order end-to-end with a 3-store setup
- Add `test_retry_before_any_reset_completes_all` covering the "last worker died before any reset_branch" partial-finalize state
- Add `TestConcurrentJobs::test_second_job_finalize_skips_reset_when_main_already_moved` documenting the skip behavior when two jobs race to finalize
- Worker commit: "Update worker {i} at <ts>" (was "Worker {i} at <ts>").
- Finalize commit: "Update at <ts>" with Z suffix (was " UTC" suffix).
- Both use seconds precision and the literal Z suffix.
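The timestamp format described above (seconds precision, literal Z suffix) can be produced as follows; the helper name is hypothetical, only the output format comes from the commit:

```python
from datetime import datetime, timezone

def commit_timestamp(now: datetime) -> str:
    # Seconds precision with a literal "Z" suffix rather than "+00:00".
    return now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

ts = commit_timestamp(datetime(2024, 6, 1, 12, 30, 45, 999999, tzinfo=timezone.utc))
assert ts == "2024-06-01T12:30:45Z"  # microseconds dropped, Z suffix

# Capturing one timestamp up front keeps all replica commit messages identical:
message = f"Update at {ts}"
```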
… tests

- Substitute `ThreadPoolExecutor` for `ProcessPoolExecutor` in region_job via an autouse fixture. The per-job `ProcessPoolExecutor` was spawning fresh workers (conftest forces the "spawn" start method) and dominated test time. Shard writes use SharedMemory (works in-process) and zarr writes are I/O bound, so threads are correct here. File runtime 62s → 8s.
- Mark the whole module with `pytest.mark.slow` so the fast suite (`-m "not slow"`) stays within budget. Each test runs `_process_region_jobs` end-to-end, which is integration-test territory.
Pull request overview
Copilot reviewed 45 out of 45 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
src/reformatters/common/dynamical_dataset.py:323
- Same issue for `backfill()`: `worker_index`/`workers_total` are `typer.Argument(envvar=...)` even though they're meant to come from env vars on Kubernetes indexed jobs. Use `typer.Option(envvar=...)` so the values are actually populated from `WORKER_INDEX`/`WORKERS_TOTAL` at runtime; otherwise all pods may run as worker 0 / total 1.
```python
def backfill(
    self,
    append_dim_end: datetime,
    reformat_job_name: Annotated[str, typer.Argument(envvar="JOB_NAME")],
    *,
    worker_index: Annotated[int, typer.Argument(envvar="WORKER_INDEX")],
    workers_total: Annotated[int, typer.Argument(envvar="WORKERS_TOTAL")],
    filter_start: datetime | None = None,
```
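The reviewer's suggested change, sketched as a standalone Typer app (command body and names are illustrative; only the `typer.Option(envvar=...)` swap comes from the comment):

```python
from typing import Annotated

import typer

app = typer.Typer()

@app.command()
def backfill(
    reformat_job_name: Annotated[str, typer.Argument(envvar="JOB_NAME")],
    # Options (unlike the positional arguments above) need no place on the
    # command line at all: with envvar set, each pod picks up its identity
    # from the indexed job's environment.
    worker_index: Annotated[int, typer.Option(envvar="WORKER_INDEX")] = 0,
    workers_total: Annotated[int, typer.Option(envvar="WORKERS_TOTAL")] = 1,
) -> None:
    typer.echo(f"worker {worker_index} of {workers_total}")
```

Invoked with only `JOB_NAME`-style arguments on the command line, each pod's `WORKER_INDEX`/`WORKERS_TOTAL` environment variables then populate the parameters at runtime.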
```python
def collect_results(
    store_factory: StoreFactory, reformat_job_name: str, workers_total: int
) -> Mapping[str, Sequence[Any]]:
    wait_for_workers(store_factory, reformat_job_name, workers_total)
    result_files = store_factory.read_all_coordination_files(
        reformat_job_name, "results"
    )

    merged: dict[str, list[SourceFileCoord]] = {}
    for data in result_files:
        for var_name, coords in pickle.loads(data).items():  # noqa: S301
            merged.setdefault(var_name, []).extend(coords)
    return merged
```
Covers parallel_setup, wait_for_workers, collect_results, and finalize with in-memory fakes for StoreFactory + icechunk Repository so real I/O is stubbed out. Suite runs in ~0.3s, keeping the not-slow test runtime effectively unchanged.
Towards #418
Unify update and backfill codepaths into a shared `_process_region_jobs` method that coordinates parallel writes across Kubernetes indexed jobs.

- Rename `max_vars_per_backfill_job` to `max_vars_per_job`, applying variable group splitting to both updates and backfills
- Remove `kind` parameter from `get_jobs()` and `worker_index`/`workers_total` (partitioning now handled by caller via `get_worker_jobs`)
- Add `backfill()` method, rename CLI command from `process-backfill-region-jobs`
- Add `worker_index`/`workers_total` parameters to `update()` for parallel updates
- Worker coordination via `_internal/{job_name}/` files in object store
- Add `zarr3_only` parameter to `copy_zarr_metadata`