Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions nemo_run/run/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,37 @@ def __post_init__(self):

self._dryrun_infos: list[AppDryRunInfo] = []

def _aggregate_state(self) -> AppState:
if not self.states:
return AppState.UNKNOWN

if any(state == AppState.FAILED for state in self.states):
return AppState.FAILED
if any(state == AppState.RUNNING for state in self.states):
return AppState.RUNNING
if any(state == AppState.PENDING for state in self.states):
return AppState.PENDING
if any(state == AppState.SUBMITTED for state in self.states):
return AppState.SUBMITTED
if any(state == AppState.UNKNOWN for state in self.states):
return AppState.UNKNOWN
if any(state == AppState.UNSUBMITTED for state in self.states):
return AppState.UNSUBMITTED
if all(state == AppState.SUCCEEDED for state in self.states):
return AppState.SUCCEEDED
if all(is_terminal(state) for state in self.states) and any(
state == AppState.CANCELLED for state in self.states
):
return AppState.CANCELLED

return AppState.UNKNOWN

@property
def state(self) -> AppState:
if not self.launched or not self.handles:
return AppState.UNSUBMITTED

return self.states[0]
return self._aggregate_state()

@property
def handle(self) -> str:
Expand Down Expand Up @@ -425,7 +450,7 @@ def cancel(self, runner: Runner):
runner.cancel(handle)

def cleanup(self):
if not self.handles or not is_terminal(self.state):
if not self.handles or not self.states or not all(is_terminal(state) for state in self.states):
return

executors: list[Executor] = []
Expand Down
28 changes: 18 additions & 10 deletions test/run/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ def test_job_group_properties(simple_task, docker_executor):
)

# Set properties explicitly for test
job_group.handles = ["handle1"]
job_group.states = [AppState.RUNNING]
job_group.handles = ["handle1", "handle2"]
job_group.states = [AppState.SUCCEEDED, AppState.CANCELLED]
job_group.launched = True

assert job_group.state == AppState.RUNNING
assert job_group.state == AppState.CANCELLED
assert job_group.handle == "handle1"
assert job_group.executor == docker_executor

Expand Down Expand Up @@ -429,12 +429,20 @@ def test_job_group_status_launched(simple_task, docker_executor, mock_runner):
tasks=[simple_task, simple_task],
executors=docker_executor,
launched=True,
handles=["handle1"],
states=[AppState.RUNNING],
handles=["handle1", "handle2"],
states=[AppState.RUNNING, AppState.RUNNING],
)

assert job_group.status(mock_runner) == AppState.SUCCEEDED
mock_runner.status.assert_called_once_with("handle1")
mock_runner.status.side_effect = [
MagicMock(state=AppState.SUCCEEDED),
MagicMock(state=AppState.FAILED),
]

assert job_group.status(mock_runner) == AppState.FAILED
assert job_group.states == [AppState.SUCCEEDED, AppState.FAILED]
assert mock_runner.status.call_count == 2
mock_runner.status.assert_any_call("handle1")
mock_runner.status.assert_any_call("handle2")


def test_job_group_status_exception(simple_task, docker_executor, mock_runner):
Expand Down Expand Up @@ -652,7 +660,7 @@ def test_job_group_cleanup(simple_task, docker_executor):
executors=docker_executor,
launched=True,
handles=["handle1", "handle2"],
states=[AppState.SUCCEEDED],
states=[AppState.SUCCEEDED, AppState.SUCCEEDED],
)

with patch.object(docker_executor, "cleanup") as mock_cleanup:
Expand All @@ -669,7 +677,7 @@ def test_job_group_cleanup_not_terminal(simple_task, docker_executor):
executors=docker_executor,
launched=True,
handles=["handle1", "handle2"],
states=[AppState.RUNNING],
states=[AppState.FAILED, AppState.RUNNING],
)

with patch.object(docker_executor, "cleanup") as mock_cleanup:
Expand All @@ -684,7 +692,7 @@ def test_job_group_cleanup_exception(simple_task, docker_executor):
executors=docker_executor,
launched=True,
handles=["handle1", "handle2"],
states=[AppState.SUCCEEDED],
states=[AppState.SUCCEEDED, AppState.SUCCEEDED],
)

with patch.object(docker_executor, "cleanup") as mock_cleanup:
Expand Down