diff --git a/airflow-core/newsfragments/66395.misc.rst b/airflow-core/newsfragments/66395.misc.rst new file mode 100644 index 0000000000000..65dc94f8e5b06 --- /dev/null +++ b/airflow-core/newsfragments/66395.misc.rst @@ -0,0 +1 @@ +Suppressed task instance listener exceptions now include the failing hook name in the log message. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..8ad91dfc6e82c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -76,19 +76,25 @@ def _validate_patch_task_instance_body( def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInstanceState) -> None: """Fire listener hooks for the given TIs based on their new state. Listener errors are logged.""" for ti in updated_tis: - try: - if new_state == TaskInstanceState.SUCCESS: + if new_state == TaskInstanceState.SUCCESS: + try: get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti) - elif new_state == TaskInstanceState.FAILED: + except Exception: + log.exception("error calling listener for hook %r", "on_task_instance_success") + elif new_state == TaskInstanceState.FAILED: + try: get_listener_manager().hook.on_task_instance_failed( previous_state=None, task_instance=ti, error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", ) - elif new_state == TaskInstanceState.SKIPPED: + except Exception: + log.exception("error calling listener for hook %r", "on_task_instance_failed") + elif new_state == TaskInstanceState.SKIPPED: + try: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) - except Exception: - log.exception("error calling listener") + except Exception: + log.exception("error calling listener for hook 
%r", "on_task_instance_skipped") def _reload_tis_with_rendered_fields(tis: list[TI], session: Session) -> list[TI]: diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index e5b19f2768da9..f88994eebe446 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1776,7 +1776,7 @@ def fetch_handle_failure_context( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") return ti diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index aad2ea7b6e863..294b0b91842bc 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -118,7 +118,7 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED) ti.run() - assert "error calling listener" in cap_structlog + assert "error calling listener for hook 'on_task_instance_success'" in cap_structlog @provide_session diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..84cd07ef92c99 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1178,7 +1178,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv previous_state=TaskInstanceState.QUEUED, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_running") # No error, carry on and execute the task return None @@ -1903,7 +1903,7 @@ def finalize( 
previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_success") elif state == TaskInstanceState.SKIPPED: _run_task_state_change_callbacks(task, "on_skipped_callback", context, log) try: @@ -1911,7 +1911,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_skipped") elif state == TaskInstanceState.UP_FOR_RETRY: _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: @@ -1919,7 +1919,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_retry and task.email: _send_error_email_notification(task, ti, context, error, log) elif state == TaskInstanceState.FAILED: @@ -1929,7 +1929,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_failure and task.email: _send_error_email_notification(task, ti, context, error, log) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..0df7236b38fec 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -3941,6 +3941,43 @@ def execute(self, context): assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS] + def test_listener_error_log_includes_hook_name( + self, mocked_parse, mock_supervisor_comms, 
listener_manager + ): + """When a listener hook raises, the exception log must identify which hook + raised so plugin authors can debug across multiple registered listeners.""" + + class ThrowingListener: + @hookimpl + def on_task_instance_success(self, previous_state, task_instance): + raise RuntimeError("listener boom") + + listener_manager(ThrowingListener()) + + class CustomOperator(BaseOperator): + def execute(self, context): + pass + + task = CustomOperator(task_id="test_listener_error_log_includes_hook_name") + dag = get_inline_dag(dag_id="test_dag", task=task) + ti = TaskInstance( + id=uuid7(), + task_id=task.task_id, + dag_id=dag.dag_id, + run_id="test_run", + try_number=1, + dag_version_id=uuid7(), + ) + runtime_ti = RuntimeTaskInstance.model_construct( + **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow() + ) + log = mock.MagicMock() + context = runtime_ti.get_template_context() + state, _, _ = run(runtime_ti, context, log) + finalize(runtime_ti, state, context, log) + + log.exception.assert_any_call("error calling listener for hook %r", "on_task_instance_success") + @pytest.mark.parametrize( "exception", [