From 5ac0f837d47a416d5699d8b782a7f2e633f2b31b Mon Sep 17 00:00:00 2001 From: no0k Date: Tue, 2 Jun 2026 13:44:58 -0400 Subject: [PATCH] fix(defer): propagate CancelledError into background event loop DeferredTask.result() awaits on loop.run_in_executor(None, _get_result), where _get_result is a blocking call to self._future.result(timeout). When the outer asyncio task is cancelled (for example a wall-clock timeout in the caller fires), only the outer task receives the CancelledError. The underlying concurrent.futures.Future continues running on the singleton background event loop, and the thread-pool worker stays parked in _get_result waiting for it. Every cancelled result() leaks one worker thread until the default ThreadPoolExecutor saturates and subsequent result() calls block indefinitely. Fix: catch asyncio.CancelledError in result(), call self.kill() to cancel the underlying future and drain the singleton background event loop, then re-raise. The worker thread is released as the future resolves to CancelledError; any in-flight coroutine tasks on the background loop are cancelled too. Adds tests/test_defer_cancellation.py asserting that after a cancelled result(), the underlying future reaches done() within 500ms - a thread-leak regression guard. Reproduces in upstream main and earlier releases - same code shape. --- helpers/defer.py | 14 ++++++- tests/test_defer_cancellation.py | 69 ++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 tests/test_defer_cancellation.py diff --git a/helpers/defer.py b/helpers/defer.py index 0be8c0a5e6..7a3effb7a2 100644 --- a/helpers/defer.py +++ b/helpers/defer.py @@ -142,7 +142,19 @@ def _get_result(): "The task did not complete within the specified timeout." ) - return await loop.run_in_executor(None, _get_result) + try: + return await loop.run_in_executor(None, _get_result) + except asyncio.CancelledError: + # When this coroutine is cancelled by the outer asyncio task + # (for example a wall-clock timeout in the caller), + # cancellation does NOT propagate into _get_result's blocking + # call to self._future.result(timeout). The thread-pool + # worker stays parked waiting for the underlying future to + # complete, leaking one thread per cancellation. self.kill() + # cancels the future and drains the singleton background + # event loop so the worker is released. + self.kill(terminate_thread=False) + raise def kill(self, terminate_thread: bool = False) -> None: """Kill the task and optionally terminate its thread.""" diff --git a/tests/test_defer_cancellation.py b/tests/test_defer_cancellation.py new file mode 100644 index 0000000000..4bd927787a --- /dev/null +++ b/tests/test_defer_cancellation.py @@ -0,0 +1,69 @@ +"""Regression test for DeferredTask.result() cancellation propagation. + +Without the CancelledError handling in result(), cancelling the outer +asyncio task does NOT propagate into the thread-pool worker running +inside _get_result. The worker stays parked in +self._future.result(timeout) until the underlying future finishes +naturally, leaking one worker thread per cancellation. After enough +cancelled dispatches the default ThreadPoolExecutor saturates and new +result() calls block indefinitely. + +This test asserts that after the outer awaiter is cancelled, the +underlying concurrent.futures.Future reaches `done()` within a short +bound — meaning the worker has been released, not parked. +""" + +from __future__ import annotations + +import asyncio +import os +import sys + +import pytest + +# Make `helpers/` importable when tests are run from the repo root. +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from helpers.defer import DeferredTask + + +async def _slow_work() -> str: + # Long enough that the test would never reach this naturally. + await asyncio.sleep(60) + return "should-never-return-in-test" + + +@pytest.mark.asyncio +async def test_result_cancellation_releases_future() -> None: + """Cancelling result() must cancel the underlying future too. + + Bound: 500 ms after the awaiter is cancelled, the underlying + self._future should be `done()` (cancelled). Without the fix this + never happens and the worker thread is leaked. + """ + task = DeferredTask().start_task(_slow_work) + + # Give the background event loop a moment to schedule _slow_work + # and the executor a moment to start _get_result. + await asyncio.sleep(0.05) + + # Start awaiting the result, then cancel that awaiter. + waiter = asyncio.create_task(task.result(timeout=60.0)) + await asyncio.sleep(0.05) + waiter.cancel() + + with pytest.raises(asyncio.CancelledError): + await waiter + + # Poll for the underlying future to become done, up to 500 ms. + # Without the fix this polling exhausts and the assertion below fails. + for _ in range(50): + if task._future is not None and task._future.done(): + break + await asyncio.sleep(0.01) + + assert task._future is not None, "task._future is None after start_task" + assert task._future.done(), ( + "Underlying future did not reach done() within 500 ms of " + "result() cancellation. The thread-pool worker is leaked." + )