Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,9 @@ def process_executor_events(
The method handles several key scenarios:
1. **Normal task completion**: Updates task states for successful/failed tasks
2. **External termination**: Detects tasks killed outside Airflow and marks them as failed
3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors
3. **Task requeuing**: Handles tasks that were requeued by other schedulers or executors,
and tasks moved to ``scheduled`` after a trigger fired so a stale executor success from the
pre-deferral worker exit does not fail the task instance
4. **Callback processing**: Sends task callback requests to DAG Processor for execution
5. **Email notifications**: Sends email notification requests to DAG Processor

Expand Down Expand Up @@ -1348,12 +1350,14 @@ def process_executor_events(
ti.pid,
)

# There are two scenarios why the same TI with the same try_number is queued
# after executor is finished with it:
# There are multiple scenarios why the same TI with the same try_number looks queued or
# waiting after the executor is finished with it:
# 1) the TI was killed externally and it had no time to mark itself failed
# - in this case we should mark it as failed here.
# 2) the TI has been requeued after getting deferred - in this case either our executor has it
# or the TI is queued by another job. Either ways we should not fail it.
# 3) the trigger already put the TI back to scheduled (resume after defer) but the executor success
# from the worker exit after defer() has not been processed yet - should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the scheduler detecting task instances without heartbeats.
Expand All @@ -1367,6 +1371,13 @@ def process_executor_events(
ti_requeued = (
ti.queued_by_job_id != job_id # Another scheduler has queued this task again
or executor.has_task(ti) # This scheduler has this task already
or (
# Resume-after-defer: trigger moved TI to scheduled (next_method set) before we saw the
# executor success from the defer exit for the same try_number.
ti.state == TaskInstanceState.SCHEDULED
and state == TaskInstanceState.SUCCESS
and ti.next_method is not None
)
)

if ti_queued and not ti_requeued:
Expand Down
59 changes: 59 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,65 @@ def test_process_executor_events_ti_requeued(
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats.incr.assert_not_called()

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_process_executor_events_stale_success_when_scheduled_after_defer(
    self, mock_get_backend, mock_task_callback, dag_maker
):
    """
    A stale SUCCESS event for a TI that the trigger has already moved back to
    ``scheduled`` (resume-after-defer, ``next_method`` set) must not be treated
    as a state mismatch / external kill.

    Regression for https://github.com/apache/airflow/issues/66374 — must not treat as state mismatch.
    """
    stats_mock = mock.MagicMock(spec=StatsLogger)
    mock_get_backend.return_value = stats_mock
    dag_id = "test_process_executor_events_stale_success_scheduled_after_defer"
    task_id_1 = "dummy_task"

    session = settings.Session()
    with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
        task = EmptyOperator(task_id=task_id_1)
    ti = dag_maker.create_dagrun().get_task_instance(task.task_id)

    executor = MockExecutor(do_update=False)
    mock_task_callback.return_value = mock.MagicMock()
    scheduler_job = Job()
    session.add(scheduler_job)
    session.flush()
    self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor])

    # Simulate the race: the trigger fired and the TI was put back to
    # SCHEDULED (with next_method set) before the executor's success event
    # from the defer-exit was consumed.
    ti.state = State.SCHEDULED
    ti.next_method = "execute_callback"
    ti.queued_by_job_id = scheduler_job.id
    ti.try_number = 1
    session.merge(ti)
    session.commit()

    executor.event_buffer[ti.key] = State.SUCCESS, None
    executor.has_task = mock.MagicMock(return_value=False)
    stats_mock.incr.reset_mock()

    self.job_runner._process_executor_events(executor=executor, session=session)
    ti.refresh_from_db(session=session)
    # The stale success must be ignored: TI stays scheduled, no failure
    # callback is sent, and no external-kill metric is emitted.
    assert ti.state == State.SCHEDULED
    self.job_runner.executor.callback_sink.send.assert_not_called()
    stats_mock.incr.assert_not_called()

    # Counter-case: with next_method cleared, a scheduled TI receiving a
    # stale success is still a genuine mismatch (e.g. external kill).
    ti.next_method = None
    session.merge(ti)
    session.commit()

    executor.event_buffer[ti.key] = State.SUCCESS, None
    stats_mock.incr.reset_mock()

    self.job_runner._process_executor_events(executor=executor, session=session)
    stats_mock.incr.assert_any_call(
        "scheduler.tasks.killed_externally",
        tags={"dag_id": dag_id, "task_id": ti.task_id},
    )

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_process_executor_events_multiple_try_numbers_warns(
Expand Down
Loading