From 9e9ba67c22577b1b989b63f94a50f8c19a75b4cf Mon Sep 17 00:00:00 2001 From: xyuzh Date: Mon, 18 May 2026 12:08:47 -0700 Subject: [PATCH 1/2] [workers] Clean teardown on SIGTERM: drain CUDA streams + destroy process group When a k8s pod is evicted (preemption, scale-down, node drain) the container gets SIGTERM with a 25s grace period before SIGKILL. Without a handler, in-flight NCCL collectives leak communicators and the next run may hit stale process group state. Add a SIGTERM handler inside DistributedTorchRayActor.init_worker_process_group() that: - calls torch.cuda.synchronize() to drain any in-flight CUDA work - calls torch.distributed.destroy_process_group() to release NCCL - exits cleanly with sys.exit(0) Both calls are wrapped in try/except so a partial-state worker still tears down the half that's healthy. The whole sequence is well under the 25s grace window. Each call is guarded (`torch.distributed.is_available() and is_initialized()`) so it does nothing when distributed isn't set up yet. --- skyrl/backends/skyrl_train/workers/worker.py | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/skyrl/backends/skyrl_train/workers/worker.py b/skyrl/backends/skyrl_train/workers/worker.py index 43a73fc28c..4aa02f3842 100644 --- a/skyrl/backends/skyrl_train/workers/worker.py +++ b/skyrl/backends/skyrl_train/workers/worker.py @@ -1,7 +1,9 @@ import asyncio import logging import os +import signal import socket +import sys from collections import defaultdict from ctypes import CDLL, POINTER, Structure, c_char_p, c_int, c_ulong, c_void_p from datetime import timedelta @@ -121,6 +123,28 @@ def init_worker_process_group(self): backend="cpu:gloo,cuda:nccl", timeout=timedelta(seconds=SKYRL_WORKER_NCCL_TIMEOUT_IN_S) ) + # Clean teardown on k8s SIGTERM: drain CUDA streams + release NCCL + # communicators before the 25s grace period elapses. + rank = self._rank + + def _sigterm_cleanup(signum, frame): + logger.warning(f"SIGTERM received in worker rank={rank}, cleaning up...") + + try: + torch.cuda.synchronize() + except Exception as e: + logger.warning(f"cuda.synchronize() failed: {e}") + + try: + if torch.distributed.is_available() and torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + except Exception as e: + logger.warning(f"destroy_process_group() failed: {e}") + + sys.exit(0) + + signal.signal(signal.SIGTERM, _sigterm_cleanup) + # setup device mesh # TODO: Support TP / PP for additional backends # NOTE (sumanthrh): Device mesh and mesh rank are rank specific attributes. For the current way the strategy is defined, From eb8d6126d034180fc280c1c64d3b672ef76513f1 Mon Sep 17 00:00:00 2001 From: xyuzh Date: Mon, 18 May 2026 12:14:24 -0700 Subject: [PATCH 2/2] Guard cuda.synchronize() with torch.cuda.is_available() Skip the call entirely on CPU-only environments so we don't generate a noisy warning every time a non-CUDA worker is terminated. Only emit a warning if synchronize() actually fails on a CUDA-capable system. --- skyrl/backends/skyrl_train/workers/worker.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/skyrl/backends/skyrl_train/workers/worker.py b/skyrl/backends/skyrl_train/workers/worker.py index 4aa02f3842..d0ae5af0cc 100644 --- a/skyrl/backends/skyrl_train/workers/worker.py +++ b/skyrl/backends/skyrl_train/workers/worker.py @@ -130,10 +130,11 @@ def init_worker_process_group(self): def _sigterm_cleanup(signum, frame): logger.warning(f"SIGTERM received in worker rank={rank}, cleaning up...") - try: - torch.cuda.synchronize() - except Exception as e: - logger.warning(f"cuda.synchronize() failed: {e}") + if torch.cuda.is_available(): + try: + torch.cuda.synchronize() + except Exception as e: + logger.warning(f"cuda.synchronize() failed: {e}") try: if torch.distributed.is_available() and torch.distributed.is_initialized():