From 4f72680f4cd51b34dc8e92cc446c1cb1ffc99211 Mon Sep 17 00:00:00 2001 From: rpowers Date: Wed, 6 May 2026 15:38:28 -0400 Subject: [PATCH] fix(controller): pass task ids to bay get_capacity --- .husky/pre-commit | 2 +- .../controller/facilities/bays/__init__.py | 10 ++-- .../farm/services/controller/task_manager.py | 5 +- tests/services/controller/test_bays.py | 34 +++++++------- .../services/controller/test_task_manager.py | 46 ++++++++++++++++++- 5 files changed, 71 insertions(+), 26 deletions(-) diff --git a/.husky/pre-commit b/.husky/pre-commit index 955ee52..248f363 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,2 +1,2 @@ -npx --no-install lint-staged --cwd ./dashboard-ui --allow-empty +( cd dashboard-ui && npx --no-install lint-staged --allow-empty ) make lint diff --git a/nv/svc/farm/services/controller/facilities/bays/__init__.py b/nv/svc/farm/services/controller/facilities/bays/__init__.py index dd97076..ad30cda 100644 --- a/nv/svc/farm/services/controller/facilities/bays/__init__.py +++ b/nv/svc/farm/services/controller/facilities/bays/__init__.py @@ -18,12 +18,12 @@ def __init__(self) -> None: self._used = [] @abc.abstractmethod - async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: + async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: """ Get list of tasks and the capacity available to currently run on the agent. Args: - tasks (Dict[Tuple[str, str], List[int]]): List of currently active tasks and their types. + tasks (Dict[Tuple[str, str], List[str]]): List of currently active tasks and their types. job_definitions (List[Dict]): A list of the available jobs an agent can run. """ pass @@ -52,7 +52,7 @@ async def release(self, process, capacities): class OneSlotBay(BaseBays): """One-slot bay.""" - async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: + async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: """ Return the capacity for the given tasks. @@ -104,7 +104,7 @@ def _get_gpu_count(self, gpu_instance_type: Optional[str]) -> int: return 1 - async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict[str, Any]], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: + async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict[str, Any]], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: """ Return the capacity for the given tasks. @@ -164,6 +164,6 @@ def _get_capacity_limit(self, capacity_file) -> int: return 0 - async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: + async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]: self._max_capacity = self._get_capacity_limit(self._capacity_file) return await super().get_capacity(tasks, job_definitions, taskid_to_job_definition_map) diff --git a/nv/svc/farm/services/controller/task_manager.py b/nv/svc/farm/services/controller/task_manager.py index 6db6811..c6a0502 100644 --- a/nv/svc/farm/services/controller/task_manager.py +++ b/nv/svc/farm/services/controller/task_manager.py @@ -719,13 +719,14 @@ async def _get_task(self) -> bool: job_definitions: List[Dict[str, Any]] = await self.get_job_type_definitions() task_types = await self.get_task_types() - active_tasks = {} + active_tasks: Dict[Tuple[str, str], List[str]] = {} for task_type, task_function in task_types: - active_tasks[(task_type, task_function)] = await self._task_store.get_tasks( + tasks = await self._task_store.get_tasks( task_type=task_type, task_function=task_function, statuses=["running", "starting"], ) + active_tasks[(task_type, task_function)] = [t["task_id"] for t in tasks] possible_tasks, capacity = await self._bay_manager.get_capacity( active_tasks, diff --git a/tests/services/controller/test_bays.py b/tests/services/controller/test_bays.py index 7703991..9926a80 100644 --- a/tests/services/controller/test_bays.py +++ b/tests/services/controller/test_bays.py @@ -32,7 +32,7 @@ async def test_available_capacity(self): async def test_noavailable_capacity(self): bay_manager = OneSlotBay() - task_data = {("foo", "bar"): [], ("baz", "qux"): [12345]} + task_data = {("foo", "bar"): [], ("baz", "qux"): ["12345"]} tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[]) self.assertEqual(tasks, []) @@ -53,7 +53,7 @@ async def test_available_capacity(self): async def test_noavailable_capacity(self): bay_manager = MultiSlotBay(max_capacity=2) - task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]} + task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]} tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[]) self.assertEqual(tasks, []) @@ -90,7 +90,7 @@ async def test_noavailable_capacity(self): bay_manager = FileBasedMultiSlotBay(file.name) - task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]} + task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]} tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[]) self.assertEqual(tasks, []) @@ -107,7 +107,7 @@ async def test_update_capacity(self): bay_manager = FileBasedMultiSlotBay(file.name) - task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]} + task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]} tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[]) self.assertEqual(tasks, []) @@ -332,12 +332,12 @@ async def test_multislot_tasks_are_filtered_on_gpu_capacity_not_full(self): bay_manager = MultiSlotBay(max_capacity=10) - not_full_task_job_definition_map: Dict[int, str] = { - 123: "single-gpu", - 124: "multi-gpu", - 125: "multi-gpu" + not_full_task_job_definition_map: Dict[str, str] = { + "123": "single-gpu", + "124": "multi-gpu", + "125": "multi-gpu" } - task_data: Dict[Tuple[str, str], List[int]] = {("kit-service", "render.run"): list(not_full_task_job_definition_map.keys())} + task_data: Dict[Tuple[str, str], List[str]] = {("kit-service", "render.run"): list(not_full_task_job_definition_map.keys())} job_definitions: List[Dict[str, Any]] = [ {"name": "single-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_1x"}}}, {"name": "multi-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_4x"}}} @@ -353,15 +353,15 @@ async def test_multislot_tasks_are_filtered_on_gpu_capacity_full(self): bay_manager = MultiSlotBay(max_capacity=10) # this list of tasks should exceed the gpu capacity, but if we're not calculating gpus properly, it'll fit when tasks:gpus are 1:1. - full_task_job_definition_map: Dict[int, str] = { - 123: "single-gpu", - 124: "multi-gpu", - 125: "multi-gpu", - 126: "multi-gpu", - 127: "multi-gpu", - 128: "multi-gpu" + full_task_job_definition_map: Dict[str, str] = { + "123": "single-gpu", + "124": "multi-gpu", + "125": "multi-gpu", + "126": "multi-gpu", + "127": "multi-gpu", + "128": "multi-gpu" } - task_data: Dict[Tuple[str, str], List[int]] = {("kit-service", "render.run"): list(full_task_job_definition_map.keys())} + task_data: Dict[Tuple[str, str], List[str]] = {("kit-service", "render.run"): list(full_task_job_definition_map.keys())} job_definitions: List[Dict[str, Any]] = [ {"name": "single-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_1x"}}}, {"name": "multi-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_4x"}}} diff --git a/tests/services/controller/test_task_manager.py b/tests/services/controller/test_task_manager.py index 04aab79..4af398b 100644 --- a/tests/services/controller/test_task_manager.py +++ b/tests/services/controller/test_task_manager.py @@ -17,7 +17,7 @@ from nv.svc.farm.services.tasks.facilities.tasks import backends, store from nv.svc.farm.services.controller.config import FarmControllerConfig -from nv.svc.farm.services.controller.facilities.bays import OneSlotBay +from nv.svc.farm.services.controller.facilities.bays import MultiSlotBay, OneSlotBay from nv.svc.farm.services.controller.task_manager import TaskManager, get_utc_unixtime from ._mocks import MockJobStore, MockProcessManager @@ -488,6 +488,50 @@ async def test_running_tasks_update_donot_affect_dependants(self): self.assertIsNone(updated_task) + async def _stub_post_launch_transitions(self): + """Replace post-launch status transitions with no-ops so tests can focus on the fetch+capacity path.""" + async def noop(*args, **kwargs): + return None + self._manager._set_task_status = noop + self._manager.set_task_errored = noop + + async def test_get_task_with_real_multislot_bay_and_no_active_tasks(self): + """MultiSlotBay reports capacity when nothing is running, and _get_task fetches.""" + self._manager._bay_manager = MultiSlotBay(max_capacity=4) + self.TEST_FETCH_TASK_STATUS_ID = "fetched-when-empty" + await self._stub_post_launch_transitions() + + submitted = await self._manager._get_task() + + self.assertTrue(submitted, "_get_task should claim a task when the bay has capacity") + + async def test_get_task_with_real_multislot_bay_and_multiple_active_tasks(self): + """MultiSlotBay reports capacity for more work when active tasks remain under the cap.""" + self._manager._bay_manager = MultiSlotBay(max_capacity=10) + await self._generate_tasks(running_count=3) + self.TEST_FETCH_TASK_STATUS_ID = "fetched-when-three-running" + await self._stub_post_launch_transitions() + + submitted = await self._manager._get_task() + + self.assertTrue( + submitted, + "_get_task should claim a task when bay has capacity for more, even with active tasks present", + ) + + async def test_get_task_with_real_multislot_bay_at_max_capacity(self): + """MultiSlotBay refuses to advertise capacity once active tasks hit max_capacity.""" + self._manager._bay_manager = MultiSlotBay(max_capacity=2) + await self._generate_tasks(running_count=2) + await self._stub_post_launch_transitions() + + submitted = await self._manager._get_task() + + self.assertFalse( + submitted, + "_get_task should not claim a task when active GPU usage already meets max_capacity", + ) + async def test_fetched_task_from_queue_is_in_pending(self): """When we fetch a task from the queue we put in a pending state while the operator hasn't looked at it""" _, running, _, _ = await self._generate_tasks(running_count=1)