Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
npx --no-install lint-staged --cwd ./dashboard-ui --allow-empty
( cd dashboard-ui && npx --no-install lint-staged --allow-empty )
make lint
10 changes: 5 additions & 5 deletions nv/svc/farm/services/controller/facilities/bays/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ def __init__(self) -> None:
self._used = []

@abc.abstractmethod
async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
"""
Get list of tasks and the capacity available to currently run on the agent.

Args:
tasks (Dict[Tuple[str, str], List[int]]): List of currently active tasks and their types.
tasks (Dict[Tuple[str, str], List[str]]): List of currently active tasks and their types.
job_definitions (List[Dict]): A list of the available jobs an agent can run.
"""
pass
Expand Down Expand Up @@ -52,7 +52,7 @@ async def release(self, process, capacities):
class OneSlotBay(BaseBays):
"""One-slot bay."""

async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
"""
Return the capacity for the given tasks.

Expand Down Expand Up @@ -104,7 +104,7 @@ def _get_gpu_count(self, gpu_instance_type: Optional[str]) -> int:
return 1


async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict[str, Any]], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict[str, Any]], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
"""
Return the capacity for the given tasks.

Expand Down Expand Up @@ -164,6 +164,6 @@ def _get_capacity_limit(self, capacity_file) -> int:

return 0

async def get_capacity(self, tasks: Dict[Tuple[str, str], List[int]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
async def get_capacity(self, tasks: Dict[Tuple[str, str], List[str]], job_definitions: List[Dict], taskid_to_job_definition_map: Dict[int, str] = {}) -> Tuple[List, Dict]:
self._max_capacity = self._get_capacity_limit(self._capacity_file)
return await super().get_capacity(tasks, job_definitions, taskid_to_job_definition_map)
5 changes: 3 additions & 2 deletions nv/svc/farm/services/controller/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,13 +719,14 @@ async def _get_task(self) -> bool:
job_definitions: List[Dict[str, Any]] = await self.get_job_type_definitions()
task_types = await self.get_task_types()

active_tasks = {}
active_tasks: Dict[Tuple[str, str], List[str]] = {}
for task_type, task_function in task_types:
active_tasks[(task_type, task_function)] = await self._task_store.get_tasks(
tasks = await self._task_store.get_tasks(
task_type=task_type,
task_function=task_function,
statuses=["running", "starting"],
)
active_tasks[(task_type, task_function)] = [t["task_id"] for t in tasks]

possible_tasks, capacity = await self._bay_manager.get_capacity(
active_tasks,
Expand Down
34 changes: 17 additions & 17 deletions tests/services/controller/test_bays.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def test_available_capacity(self):
async def test_noavailable_capacity(self):
bay_manager = OneSlotBay()

task_data = {("foo", "bar"): [], ("baz", "qux"): [12345]}
task_data = {("foo", "bar"): [], ("baz", "qux"): ["12345"]}

tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[])
self.assertEqual(tasks, [])
Expand All @@ -53,7 +53,7 @@ async def test_available_capacity(self):
async def test_noavailable_capacity(self):
bay_manager = MultiSlotBay(max_capacity=2)

task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]}
task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]}

tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[])
self.assertEqual(tasks, [])
Expand Down Expand Up @@ -90,7 +90,7 @@ async def test_noavailable_capacity(self):

bay_manager = FileBasedMultiSlotBay(file.name)

task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]}
task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]}

tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[])
self.assertEqual(tasks, [])
Expand All @@ -107,7 +107,7 @@ async def test_update_capacity(self):

bay_manager = FileBasedMultiSlotBay(file.name)

task_data = {("foo", "bar"): [67890], ("baz", "qux"): [12345]}
task_data = {("foo", "bar"): ["67890"], ("baz", "qux"): ["12345"]}

tasks, capacity = await bay_manager.get_capacity(tasks=task_data, job_definitions=[])
self.assertEqual(tasks, [])
Expand Down Expand Up @@ -332,12 +332,12 @@ async def test_multislot_tasks_are_filtered_on_gpu_capacity_not_full(self):

bay_manager = MultiSlotBay(max_capacity=10)

not_full_task_job_definition_map: Dict[int, str] = {
123: "single-gpu",
124: "multi-gpu",
125: "multi-gpu"
not_full_task_job_definition_map: Dict[str, str] = {
"123": "single-gpu",
"124": "multi-gpu",
"125": "multi-gpu"
}
task_data: Dict[Tuple[str, str], List[int]] = {("kit-service", "render.run"): list(not_full_task_job_definition_map.keys())}
task_data: Dict[Tuple[str, str], List[str]] = {("kit-service", "render.run"): list(not_full_task_job_definition_map.keys())}
job_definitions: List[Dict[str, Any]] = [
{"name": "single-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_1x"}}},
{"name": "multi-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_4x"}}}
Expand All @@ -353,15 +353,15 @@ async def test_multislot_tasks_are_filtered_on_gpu_capacity_full(self):
bay_manager = MultiSlotBay(max_capacity=10)

# this list of tasks should exceed the gpu capacity, but if we're not calculating gpus properly, it'll fit when tasks:gpus are 1:1.
full_task_job_definition_map: Dict[int, str] = {
123: "single-gpu",
124: "multi-gpu",
125: "multi-gpu",
126: "multi-gpu",
127: "multi-gpu",
128: "multi-gpu"
full_task_job_definition_map: Dict[str, str] = {
"123": "single-gpu",
"124": "multi-gpu",
"125": "multi-gpu",
"126": "multi-gpu",
"127": "multi-gpu",
"128": "multi-gpu"
}
task_data: Dict[Tuple[str, str], List[int]] = {("kit-service", "render.run"): list(full_task_job_definition_map.keys())}
task_data: Dict[Tuple[str, str], List[str]] = {("kit-service", "render.run"): list(full_task_job_definition_map.keys())}
job_definitions: List[Dict[str, Any]] = [
{"name": "single-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_1x"}}},
{"name": "multi-gpu", "job_type": "kit-service", "task_function": "render.run", "capacity_requirements": {"gpuSpecification": {"instanceType": "nvidia.com/gpu_4x"}}}
Expand Down
46 changes: 45 additions & 1 deletion tests/services/controller/test_task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from nv.svc.farm.services.tasks.facilities.tasks import backends, store

from nv.svc.farm.services.controller.config import FarmControllerConfig
from nv.svc.farm.services.controller.facilities.bays import OneSlotBay
from nv.svc.farm.services.controller.facilities.bays import MultiSlotBay, OneSlotBay
from nv.svc.farm.services.controller.task_manager import TaskManager, get_utc_unixtime

from ._mocks import MockJobStore, MockProcessManager
Expand Down Expand Up @@ -488,6 +488,50 @@ async def test_running_tasks_update_donot_affect_dependants(self):

self.assertIsNone(updated_task)

async def _stub_post_launch_transitions(self):
"""Replace post-launch status transitions with no-ops so tests can focus on the fetch+capacity path."""
async def noop(*args, **kwargs):
return None
self._manager._set_task_status = noop
self._manager.set_task_errored = noop

async def test_get_task_with_real_multislot_bay_and_no_active_tasks(self):
"""MultiSlotBay reports capacity when nothing is running, and _get_task fetches."""
self._manager._bay_manager = MultiSlotBay(max_capacity=4)
self.TEST_FETCH_TASK_STATUS_ID = "fetched-when-empty"
await self._stub_post_launch_transitions()

submitted = await self._manager._get_task()

self.assertTrue(submitted, "_get_task should claim a task when the bay has capacity")

async def test_get_task_with_real_multislot_bay_and_multiple_active_tasks(self):
"""MultiSlotBay reports capacity for more work when active tasks remain under the cap."""
self._manager._bay_manager = MultiSlotBay(max_capacity=10)
await self._generate_tasks(running_count=3)
self.TEST_FETCH_TASK_STATUS_ID = "fetched-when-three-running"
await self._stub_post_launch_transitions()

submitted = await self._manager._get_task()

self.assertTrue(
submitted,
"_get_task should claim a task when bay has capacity for more, even with active tasks present",
)

async def test_get_task_with_real_multislot_bay_at_max_capacity(self):
"""MultiSlotBay refuses to advertise capacity once active tasks hit max_capacity."""
self._manager._bay_manager = MultiSlotBay(max_capacity=2)
await self._generate_tasks(running_count=2)
await self._stub_post_launch_transitions()

submitted = await self._manager._get_task()

self.assertFalse(
submitted,
"_get_task should not claim a task when active GPU usage already meets max_capacity",
)

async def test_fetched_task_from_queue_is_in_pending(self):
"""When we fetch a task from the queue we put in a pending state while the operator hasn't looked at it"""
_, running, _, _ = await self._generate_tasks(running_count=1)
Expand Down
Loading