From 8558837000548495bf065a0ba92aa035c20c262f Mon Sep 17 00:00:00 2001 From: Vu Dinh Date: Tue, 19 May 2026 20:23:02 -0400 Subject: [PATCH] fix: prevent process hang when vLLM engine dies during async generation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The exception handlers in _run_generate_for_a_group_loop had two bugs: 1. `raise RuntimeError(...)` inside a coroutine started with asyncio.create_task() is silently swallowed — an exception raised in a task is stored on the task and only re-raised when the task is awaited. In the generic handler this raise sat before sys.exit(1), so whenever a slot was held sys.exit(1) was never reached; in the CancelledError handler the raise was additionally discarded by the enclosing `finally: return`. 2. sys.exit(1) itself raises SystemExit, which is also captured by the asyncio task machinery. Use os._exit(1) to force-terminate. Note that os._exit() exits immediately without running cleanup handlers or flushing stdio buffers, which is why the error and traceback are logged before the call. Additionally: - Release the capacity slot on CancelledError via on_rollout_rejected() to prevent assertion failures in validate_state_at_epoch_end. - Reset slot_acquired after on_rollout_accepted() to avoid stale state across loop iterations. Without this fix, a single EngineDeadError causes all GPUs to idle indefinitely while the process hangs in the asyncio event loop. 
--- skyrl/train/fully_async_trainer.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/skyrl/train/fully_async_trainer.py b/skyrl/train/fully_async_trainer.py index a04bfd6cf4..47ec230088 100644 --- a/skyrl/train/fully_async_trainer.py +++ b/skyrl/train/fully_async_trainer.py @@ -14,7 +14,6 @@ import asyncio import inspect import os -import sys import traceback from dataclasses import dataclass from typing import Iterable, List, Set, Tuple @@ -593,19 +592,16 @@ async def _run_generate_for_a_group_loop(self, generation_output_group_buffer: a except asyncio.QueueFull: raise AssertionError("Generation buffer should never be full given staleness control.") await self._staleness_manager.on_rollout_accepted() + slot_acquired = False except asyncio.CancelledError: - # If a slot was acquired but we exit early, release running count - try: - if "slot_acquired" in locals() and slot_acquired: - raise RuntimeError("Generation workers should only be cancelled when they finish running.") - finally: - return + if "slot_acquired" in locals() and slot_acquired: + logger.warning("Generation worker cancelled while slot was acquired. Releasing slot.") + await self._staleness_manager.on_rollout_rejected() + return except Exception as e: logger.error(f"Generator worker errored out with exception: {e}") logger.error(f"Traceback: \n{traceback.format_exc()}") - if "slot_acquired" in locals() and slot_acquired: - raise RuntimeError("Generation workers should only run into error when they finish running.") - sys.exit(1) + os._exit(1) async def async_sync_policy_weights_to_inference_engines(self): return await self.policy_model.async_run_method(