From 7eb4afd99dbe06020f2d7bf13db3321cf38562b1 Mon Sep 17 00:00:00 2001 From: Pedrinhonitz Date: Tue, 5 May 2026 13:51:53 -0300 Subject: [PATCH 1/2] fix(scheduler): ignore stale executor success after defer reschedule When a trigger moves a deferred task back to scheduled before the scheduler processes the executor success from the worker defer exit, treat it as benign (same try_number, next_method set) instead of state mismatch failure. Closes #66374 Co-authored-by: Cursor --- airflow-core/newsfragments/66374.bugfix.rst | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 17 +++++- .../tests/unit/jobs/test_scheduler_job.py | 59 +++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 airflow-core/newsfragments/66374.bugfix.rst diff --git a/airflow-core/newsfragments/66374.bugfix.rst b/airflow-core/newsfragments/66374.bugfix.rst new file mode 100644 index 0000000000000..1cd2794aa1b28 --- /dev/null +++ b/airflow-core/newsfragments/66374.bugfix.rst @@ -0,0 +1 @@ +Ignore stale executor success after a deferred task is rescheduled to ``scheduled`` (before re-queue) so the scheduler does not mark the task failed on a race with the trigger. Fixes ``#66374``. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 80897213c18b5..c218fbf0da7ba 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1198,7 +1198,9 @@ def process_executor_events( The method handles several key scenarios: 1. **Normal task completion**: Updates task states for successful/failed tasks 2. **External termination**: Detects tasks killed outside Airflow and marks them as failed - 3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors + 3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors, + and tasks moved to ``scheduled`` after a trigger fired so a stale executor success from the + pre-deferral worker exit does not fail the task instance 4. **Callback processing**: Sends task callback requests to DAG Processor for execution 5. **Email notifications**: Sends email notification requests to DAG Processor @@ -1348,12 +1350,14 @@ def process_executor_events( ti.pid, ) - # There are two scenarios why the same TI with the same try_number is queued - # after executor is finished with it: + # There are multiple scenarios why the same TI with the same try_number looks queued or + # waiting after the executor is finished with it: # 1) the TI was killed externally and it had no time to mark itself failed # - in this case we should mark it as failed here. # 2) the TI has been requeued after getting deferred - in this case either our executor has it # or the TI is queued by another job. Either ways we should not fail it. + # 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success + # from the worker exit after defer() has not been processed yet - should not fail it. # All of this could also happen if the state is "running", # but that is handled by the scheduler detecting task instances without heartbeats. @@ -1367,6 +1371,13 @@ def process_executor_events( ti_requeued = ( ti.queued_by_job_id != job_id # Another scheduler has queued this task again or executor.has_task(ti) # This scheduler has this task already + or ( + # Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the + # executor success from the defer exit for the same try_number. + ti.state == TaskInstanceState.SCHEDULED + and state == TaskInstanceState.SUCCESS + and ti.next_method is not None + ) ) if ti_queued and not ti_requeued: diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 16c42e8acd029..13d8a4009a1c1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -816,6 +816,65 @@ def test_process_executor_events_ti_requeued( self.job_runner.executor.callback_sink.send.assert_not_called() mock_stats.incr.assert_not_called() + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") + @mock.patch("airflow._shared.observability.metrics.stats._get_backend") + def test_process_executor_events_stale_success_when_scheduled_after_defer( + self, mock_get_backend, mock_task_callback, dag_maker + ): + """ + Trigger moved TI to scheduled (resume after defer) before executor success from defer exit arrived. + + Regression for https://github.com/apache/airflow/issues/66374 — must not treat as state mismatch. + """ + mock_stats = mock.MagicMock(spec=StatsLogger) + mock_get_backend.return_value = mock_stats + dag_id = "test_process_executor_events_stale_success_scheduled_after_defer" + task_id_1 = "dummy_task" + + session = settings.Session() + with dag_maker(dag_id=dag_id, fileloc="/test_path1/"): + task1 = EmptyOperator(task_id=task_id_1) + ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id) + + executor = MockExecutor(do_update=False) + task_callback = mock.MagicMock() + mock_task_callback.return_value = task_callback + scheduler_job = Job() + session.add(scheduler_job) + session.flush() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + ti1.state = State.SCHEDULED + ti1.next_method = "execute_callback" + ti1.queued_by_job_id = scheduler_job.id + ti1.try_number = 1 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + executor.has_task = mock.MagicMock(return_value=False) + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.SCHEDULED + self.job_runner.executor.callback_sink.send.assert_not_called() + mock_stats.incr.assert_not_called() + + # Without next_method, scheduled + stale success is still a mismatch (e.g. external kill). + ti1.next_method = None + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + mock_stats.incr.assert_any_call( + "scheduler.tasks.killed_externally", + tags={"dag_id": dag_id, "task_id": ti1.task_id}, + ) + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") def test_process_executor_events_multiple_try_numbers_warns( From a61eeb40532d4ea97844eb101c7224a1ee32bc5b Mon Sep 17 00:00:00 2001 From: Pedrinhonitz Date: Tue, 5 May 2026 14:44:54 -0300 Subject: [PATCH 2/2] Remove newsfragment for bugfix (per review) --- airflow-core/newsfragments/66374.bugfix.rst | 1 - 1 file changed, 1 deletion(-) delete mode 100644 airflow-core/newsfragments/66374.bugfix.rst diff --git a/airflow-core/newsfragments/66374.bugfix.rst b/airflow-core/newsfragments/66374.bugfix.rst deleted file mode 100644 index 1cd2794aa1b28..0000000000000 --- a/airflow-core/newsfragments/66374.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Ignore stale executor success after a deferred task is rescheduled to ``scheduled`` (before re-queue) so the scheduler does not mark the task failed on a race with the trigger. Fixes ``#66374``.