From 2c394a7216056d54546e7411c64041be7a15956b Mon Sep 17 00:00:00 2001 From: Anatoli <2006065+ASk1@users.noreply.github.com> Date: Wed, 1 Apr 2026 12:39:41 +0300 Subject: [PATCH 1/5] add error as context["exception"] in InProcessTestSupervisor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index a4386927fee0c..7c0f38d87b3e1 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1680,6 +1680,7 @@ def start( # type: ignore[override] log = structlog.get_logger(logger_name="task") state, msg, error = run(ti, context, log) + context["exception"] = error finalize(ti, state, context, log, error) # In the normal subprocess model, the task runner calls this before exiting. From 2109a65140b8435bbb72682ffadc896243c604a9 Mon Sep 17 00:00:00 2001 From: Anatoli <2006065+ASk1@users.noreply.github.com> Date: Thu, 2 Apr 2026 19:26:06 +0300 Subject: [PATCH 2/5] add test_inprocess_failure_callback_receives_exception --- .../execution_time/test_supervisor.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index e88b5c794cfda..a517d8bb12ec3 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2805,6 +2805,56 @@ def _handle_request(self, msg, log, req_id): assert isinstance(response, VariableResult) assert response.value == "value" + def test_inprocess_failure_callback_receives_exception( + self, + monkeypatch, + make_ti_context, + ): + """Run a failing task via InProcessTestSupervisor and ensure the + `on_failure_callback` receives `context['exception']`. + """ + collected: list[BaseException | None] = [None] + + class _Failure(Exception): + pass + + def failure_callback(context): + collected[0] = context.get("exception") + + class FailingOperator(BaseOperator): + def execute(self, context=None): + raise _Failure("boom") + + task = FailingOperator(task_id="failing", on_failure_callback=failure_callback) + + # Assign a minimal DAG to the operator so `task.dag` access succeeds + from airflow.sdk import DAG + + task.dag = DAG(dag_id="test_dag") + + # Create a simple TaskInstance datamodel to pass to the supervisor + ti = TaskInstance( + id=uuid7(), + task_id=task.task_id, + dag_id="test_dag", + run_id="r", + try_number=1, + dag_version_id=uuid7(), + ) + + # Patch the API client used by InProcessTestSupervisor to return a predictable TI context + fake_client = MagicMock() + fake_client.task_instances.start.return_value = make_ti_context() + monkeypatch.setattr( + InProcessTestSupervisor, "_api_client", staticmethod(lambda dag=None: fake_client) + ) + + result = InProcessTestSupervisor.start(what=ti, task=task) + + # Ensure the task failed and the callback saw the exception + assert isinstance(result.error, _Failure) + assert isinstance(collected[0], _Failure) + class TestInProcessClient: def test_no_retries(self): From c07eb32319cf8bf18fbe7181c722ba128e2f0c72 Mon Sep 17 00:00:00 2001 From: Anatoli <2006065+ASk1@users.noreply.github.com> Date: Tue, 14 Apr 2026 12:23:08 +0300 Subject: [PATCH 3/5] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- task-sdk/tests/task_sdk/execution_time/test_supervisor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index a517d8bb12ec3..0c69061e95324 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2843,8 +2843,10 @@ def execute(self, context=None): ) # Patch the API client used by InProcessTestSupervisor to return a predictable TI context - fake_client = MagicMock() - fake_client.task_instances.start.return_value = make_ti_context() + fake_task_instances = mock.MagicMock(spec_set=["start"]) + fake_task_instances.start.return_value = make_ti_context() + fake_client = mock.MagicMock(spec_set=["task_instances"]) + fake_client.task_instances = fake_task_instances monkeypatch.setattr( InProcessTestSupervisor, "_api_client", staticmethod(lambda dag=None: fake_client) ) From 0466b5dcfad06f6d7d20d01dceb6579e5b4c6ef8 Mon Sep 17 00:00:00 2001 From: Anatoli <2006065+ASk1@users.noreply.github.com> Date: Tue, 14 Apr 2026 14:31:11 +0300 Subject: [PATCH 4/5] adjust the strict mocked task_instances API to include finish --- task-sdk/tests/task_sdk/execution_time/test_supervisor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 0c69061e95324..c7f398b19b3dd 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2843,7 +2843,7 @@ def execute(self, context=None): ) # Patch the API client used by InProcessTestSupervisor to return a predictable TI context - fake_task_instances = mock.MagicMock(spec_set=["start"]) + fake_task_instances = mock.MagicMock(spec_set=["start", "finish"]) fake_task_instances.start.return_value = make_ti_context() fake_client = mock.MagicMock(spec_set=["task_instances"]) fake_client.task_instances = fake_task_instances From 3d66cd581f134af585508304c770e551d6fc87eb Mon Sep 17 00:00:00 2001 From: Anatoli <2006065+ASk1@users.noreply.github.com> Date: Tue, 14 Apr 2026 15:18:53 +0300 Subject: [PATCH 5/5] move DAG import to module level --- task-sdk/tests/task_sdk/execution_time/test_supervisor.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index c7f398b19b3dd..6a5f69a1b4838 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -49,7 +49,7 @@ from uuid6 import uuid7 from airflow.executors.workloads import BundleInfo -from airflow.sdk import BaseOperator, timezone +from airflow.sdk import DAG, BaseOperator, timezone from airflow.sdk.api import client as sdk_client from airflow.sdk.api.client import ServerResponseError from airflow.sdk.api.datamodels._generated import ( @@ -2828,8 +2828,6 @@ def execute(self, context=None): task = FailingOperator(task_id="failing", on_failure_callback=failure_callback) # Assign a minimal DAG to the operator so `task.dag` access succeeds - from airflow.sdk import DAG - task.dag = DAG(dag_id="test_dag") # Create a simple TaskInstance datamodel to pass to the supervisor