From f042cac0b0adc91ad7ff5d97895a7d838e7ee05c Mon Sep 17 00:00:00 2001
From: Minh Vu
Date: Sun, 28 Jun 2026 16:03:25 +0200
Subject: [PATCH] Fix JobGroup state aggregation

---
 nemo_run/run/job.py  | 29 +++++++++++++++++++++++++++--
 test/run/test_job.py | 28 ++++++++++++++++++----------
 2 files changed, 45 insertions(+), 12 deletions(-)

diff --git a/nemo_run/run/job.py b/nemo_run/run/job.py
index 12243fbc..c915b021 100644
--- a/nemo_run/run/job.py
+++ b/nemo_run/run/job.py
@@ -280,12 +280,37 @@ def __post_init__(self):
 
         self._dryrun_infos: list[AppDryRunInfo] = []
 
+    def _aggregate_state(self) -> AppState:
+        if not self.states:
+            return AppState.UNKNOWN
+
+        if any(state == AppState.FAILED for state in self.states):
+            return AppState.FAILED
+        if any(state == AppState.RUNNING for state in self.states):
+            return AppState.RUNNING
+        if any(state == AppState.PENDING for state in self.states):
+            return AppState.PENDING
+        if any(state == AppState.SUBMITTED for state in self.states):
+            return AppState.SUBMITTED
+        if any(state == AppState.UNKNOWN for state in self.states):
+            return AppState.UNKNOWN
+        if any(state == AppState.UNSUBMITTED for state in self.states):
+            return AppState.UNSUBMITTED
+        if all(state == AppState.SUCCEEDED for state in self.states):
+            return AppState.SUCCEEDED
+        if all(is_terminal(state) for state in self.states) and any(
+            state == AppState.CANCELLED for state in self.states
+        ):
+            return AppState.CANCELLED
+
+        return AppState.UNKNOWN
+
     @property
     def state(self) -> AppState:
         if not self.launched or not self.handles:
             return AppState.UNSUBMITTED
 
-        return self.states[0]
+        return self._aggregate_state()
 
     @property
     def handle(self) -> str:
@@ -425,7 +450,7 @@ def cancel(self, runner: Runner):
             runner.cancel(handle)
 
     def cleanup(self):
-        if not self.handles or not is_terminal(self.state):
+        if not self.handles or not self.states or not all(is_terminal(state) for state in self.states):
             return
 
         executors: list[Executor] = []
diff --git a/test/run/test_job.py b/test/run/test_job.py
index 896e4841..ec8e136d 100644
--- a/test/run/test_job.py
+++ b/test/run/test_job.py
@@ -390,11 +390,11 @@ def test_job_group_properties(simple_task, docker_executor):
     )
 
     # Set properties explicitly for test
-    job_group.handles = ["handle1"]
-    job_group.states = [AppState.RUNNING]
+    job_group.handles = ["handle1", "handle2"]
+    job_group.states = [AppState.SUCCEEDED, AppState.CANCELLED]
     job_group.launched = True
 
-    assert job_group.state == AppState.RUNNING
+    assert job_group.state == AppState.CANCELLED
     assert job_group.handle == "handle1"
     assert job_group.executor == docker_executor
 
@@ -429,12 +429,20 @@ def test_job_group_status_launched(simple_task, docker_executor, mock_runner):
         tasks=[simple_task, simple_task],
         executors=docker_executor,
         launched=True,
-        handles=["handle1"],
-        states=[AppState.RUNNING],
+        handles=["handle1", "handle2"],
+        states=[AppState.RUNNING, AppState.RUNNING],
     )
 
-    assert job_group.status(mock_runner) == AppState.SUCCEEDED
-    mock_runner.status.assert_called_once_with("handle1")
+    mock_runner.status.side_effect = [
+        MagicMock(state=AppState.SUCCEEDED),
+        MagicMock(state=AppState.FAILED),
+    ]
+
+    assert job_group.status(mock_runner) == AppState.FAILED
+    assert job_group.states == [AppState.SUCCEEDED, AppState.FAILED]
+    assert mock_runner.status.call_count == 2
+    mock_runner.status.assert_any_call("handle1")
+    mock_runner.status.assert_any_call("handle2")
 
 
 def test_job_group_status_exception(simple_task, docker_executor, mock_runner):
@@ -652,7 +660,7 @@ def test_job_group_cleanup(simple_task, docker_executor):
         executors=docker_executor,
         launched=True,
         handles=["handle1", "handle2"],
-        states=[AppState.SUCCEEDED],
+        states=[AppState.SUCCEEDED, AppState.SUCCEEDED],
     )
 
     with patch.object(docker_executor, "cleanup") as mock_cleanup:
@@ -669,7 +677,7 @@ def test_job_group_cleanup_not_terminal(simple_task, docker_executor):
         executors=docker_executor,
         launched=True,
         handles=["handle1", "handle2"],
-        states=[AppState.RUNNING],
+        states=[AppState.FAILED, AppState.RUNNING],
     )
 
     with patch.object(docker_executor, "cleanup") as mock_cleanup:
@@ -684,7 +692,7 @@ def test_job_group_cleanup_exception(simple_task, docker_executor):
         executors=docker_executor,
         launched=True,
         handles=["handle1", "handle2"],
-        states=[AppState.SUCCEEDED],
+        states=[AppState.SUCCEEDED, AppState.SUCCEEDED],
     )
 
     with patch.object(docker_executor, "cleanup") as mock_cleanup: