diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 80897213c18b5..c218fbf0da7ba 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -1198,7 +1198,9 @@ def process_executor_events( The method handles several key scenarios: 1. **Normal task completion**: Updates task states for successful/failed tasks 2. **External termination**: Detects tasks killed outside Airflow and marks them as failed - 3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors + 3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors, + and tasks moved to ``scheduled`` after a trigger fired so a stale executor success from the + worker exit at deferral does not fail the task instance 4. **Callback processing**: Sends task callback requests to DAG Processor for execution 5. **Email notifications**: Sends email notification requests to DAG Processor @@ -1348,12 +1350,14 @@ def process_executor_events( ti.pid, ) - # There are two scenarios why the same TI with the same try_number is queued - # after executor is finished with it: + # There are multiple scenarios why the same TI with the same try_number looks queued or + # waiting after the executor is finished with it: # 1) the TI was killed externally and it had no time to mark itself failed # - in this case we should mark it as failed here. # 2) the TI has been requeued after getting deferred - in this case either our executor has it # or the TI is queued by another job. Either ways we should not fail it. + # 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success + # from the worker exit after defer() has not been processed yet - should not fail it. 
# All of this could also happen if the state is "running", # but that is handled by the scheduler detecting task instances without heartbeats. @@ -1367,6 +1371,13 @@ def process_executor_events( ti_requeued = ( ti.queued_by_job_id != job_id # Another scheduler has queued this task again or executor.has_task(ti) # This scheduler has this task already + or ( + # Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the + # executor success from the defer exit for the same try_number. + ti.state == TaskInstanceState.SCHEDULED + and state == TaskInstanceState.SUCCESS + and ti.next_method is not None + ) ) if ti_queued and not ti_requeued: diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 16c42e8acd029..13d8a4009a1c1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -816,6 +816,65 @@ def test_process_executor_events_ti_requeued( self.job_runner.executor.callback_sink.send.assert_not_called() mock_stats.incr.assert_not_called() + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") + @mock.patch("airflow._shared.observability.metrics.stats._get_backend") + def test_process_executor_events_stale_success_when_scheduled_after_defer( + self, mock_get_backend, mock_task_callback, dag_maker + ): + """ + Trigger moved TI to scheduled (resume after defer) before executor success from defer exit arrived. + + Regression for https://github.com/apache/airflow/issues/66374 — must not treat as state mismatch. 
+ """ + mock_stats = mock.MagicMock(spec=StatsLogger) + mock_get_backend.return_value = mock_stats + dag_id = "test_process_executor_events_stale_success_scheduled_after_defer" + task_id_1 = "dummy_task" + + session = settings.Session() + with dag_maker(dag_id=dag_id, fileloc="/test_path1/"): + task1 = EmptyOperator(task_id=task_id_1) + ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id) + + executor = MockExecutor(do_update=False) + task_callback = mock.MagicMock() + mock_task_callback.return_value = task_callback + scheduler_job = Job() + session.add(scheduler_job) + session.flush() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + ti1.state = State.SCHEDULED + ti1.next_method = "execute_callback" + ti1.queued_by_job_id = scheduler_job.id + ti1.try_number = 1 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + executor.has_task = mock.MagicMock(return_value=False) + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.SCHEDULED + self.job_runner.executor.callback_sink.send.assert_not_called() + mock_stats.incr.assert_not_called() + + # Without next_method, scheduled + stale success is still a mismatch (e.g. external kill). + ti1.next_method = None + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + mock_stats.incr.reset_mock() + + self.job_runner._process_executor_events(executor=executor, session=session) + mock_stats.incr.assert_any_call( + "scheduler.tasks.killed_externally", + tags={"dag_id": dag_id, "task_id": ti1.task_id}, + ) + @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest") @mock.patch("airflow._shared.observability.metrics.stats._get_backend") def test_process_executor_events_multiple_try_numbers_warns(