From 9036a2f5d6f5bb5485dbb07943656a86019cc7ee Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:04:35 -0700 Subject: [PATCH 1/7] Include hook name in suppressed listener-exception log When a listener hookimpl raises, the suppressed-exception log line now identifies which hook raised. The previous generic "error calling listener" made it impossible to tell which of the registered hooks failed without re-reading the stack trace, especially painful when several listeners are registered and one of them sporadically misbehaves. The new format is "error calling listener for hook 'on_task_instance_'". Existing listener-error suppression behavior is preserved unchanged. Scope: task instance listener call sites (task_runner.py, taskinstance.py on the API server retry path, and the manual-set-state path in the fastapi service). DagRun, asset, and dag-processing listener call sites follow the same pattern and can be migrated incrementally. --- .../listener-hook-name-in-error-log.misc.rst | 9 +++++ .../services/public/task_instances.py | 18 ++++++--- .../src/airflow/models/taskinstance.py | 2 +- .../tests/unit/listeners/test_listeners.py | 3 ++ .../airflow/sdk/execution_time/task_runner.py | 10 ++--- .../execution_time/test_task_runner.py | 39 +++++++++++++++++++ 6 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst diff --git a/airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst b/airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst new file mode 100644 index 0000000000000..564510e5a5329 --- /dev/null +++ b/airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst @@ -0,0 +1,9 @@ +When a task instance listener hook raises an exception, the suppressed-error +log line now identifies which hook raised. Previously every listener failure +across the codebase logged the same generic ``"error calling listener"`` +message; the new format ``"error calling listener for hook 'on_task_instance_success'"`` +lets plugin authors identify the failing hook directly from the log without +re-reading the stack frame. + +Behavior is otherwise unchanged — listener exceptions are still suppressed and +do not affect task execution. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..8ad91dfc6e82c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -76,19 +76,25 @@ def _validate_patch_task_instance_body( def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInstanceState) -> None: """Fire listener hooks for the given TIs based on their new state. Listener errors are logged.""" for ti in updated_tis: - try: - if new_state == TaskInstanceState.SUCCESS: + if new_state == TaskInstanceState.SUCCESS: + try: get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti) - elif new_state == TaskInstanceState.FAILED: + except Exception: + log.exception("error calling listener for hook %r", "on_task_instance_success") + elif new_state == TaskInstanceState.FAILED: + try: get_listener_manager().hook.on_task_instance_failed( previous_state=None, task_instance=ti, error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", ) - elif new_state == TaskInstanceState.SKIPPED: + except Exception: + log.exception("error calling listener for hook %r", "on_task_instance_failed") + elif new_state == TaskInstanceState.SKIPPED: + try: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) - except Exception: - log.exception("error calling listener") + except Exception: + log.exception("error calling listener for hook %r", "on_task_instance_skipped") def _reload_tis_with_rendered_fields(tis: list[TI], session: Session) -> list[TI]: diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index e5b19f2768da9..f88994eebe446 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1776,7 +1776,7 @@ def fetch_handle_failure_context( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") return ti diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index aad2ea7b6e863..e609929928b04 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -119,6 +119,9 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED) ti.run() assert "error calling listener" in cap_structlog + # The hook name must appear in the log so plugin authors can identify which + # listener hook raised when multiple hooks are registered. + assert "on_task_instance_success" in cap_structlog @provide_session diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..84cd07ef92c99 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1178,7 +1178,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv previous_state=TaskInstanceState.QUEUED, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_running") # No error, carry on and execute the task return None @@ -1903,7 +1903,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_success") elif state == TaskInstanceState.SKIPPED: _run_task_state_change_callbacks(task, "on_skipped_callback", context, log) try: @@ -1911,7 +1911,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_skipped") elif state == TaskInstanceState.UP_FOR_RETRY: _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: @@ -1919,7 +1919,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_retry and task.email: _send_error_email_notification(task, ti, context, error, log) elif state == TaskInstanceState.FAILED: @@ -1929,7 +1929,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_failure and task.email: _send_error_email_notification(task, ti, context, error, log) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..1a60d2d4040a3 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -3941,6 +3941,45 @@ def execute(self, context): assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS] + def test_listener_error_log_includes_hook_name( + self, mocked_parse, mock_supervisor_comms, listener_manager + ): + """When a listener hook raises, the exception log must identify which hook + raised so plugin authors can debug across multiple registered listeners.""" + + class ThrowingListener: + @hookimpl + def on_task_instance_success(self, previous_state, task_instance): + raise RuntimeError("listener boom") + + listener_manager(ThrowingListener()) + + class CustomOperator(BaseOperator): + def execute(self, context): + pass + + task = CustomOperator(task_id="test_listener_error_log_includes_hook_name") + dag = get_inline_dag(dag_id="test_dag", task=task) + ti = TaskInstance( + id=uuid7(), + task_id=task.task_id, + dag_id=dag.dag_id, + run_id="test_run", + try_number=1, + dag_version_id=uuid7(), + ) + runtime_ti = RuntimeTaskInstance.model_construct( + **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow() + ) + log = mock.MagicMock() + context = runtime_ti.get_template_context() + state, _, _ = run(runtime_ti, context, log) + finalize(runtime_ti, state, context, log) + + log.exception.assert_any_call( + "error calling listener for hook %r", "on_task_instance_success" + ) + @pytest.mark.parametrize( "exception", [ From e24ec4f4a0cf655c6d5c0aaae7fc1c19f0ff3ea7 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:05:15 -0700 Subject: [PATCH 2/7] Rename newsfragment to match PR number --- .../{listener-hook-name-in-error-log.misc.rst => 66395.misc.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{listener-hook-name-in-error-log.misc.rst => 66395.misc.rst} (100%) diff --git a/airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst b/airflow-core/newsfragments/66395.misc.rst similarity index 100% rename from airflow-core/newsfragments/listener-hook-name-in-error-log.misc.rst rename to airflow-core/newsfragments/66395.misc.rst From f64f1d89cea19c10c2c1c3c04b3c8feda376b1cd Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:30 -0700 Subject: [PATCH 3/7] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66395.misc.rst | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/airflow-core/newsfragments/66395.misc.rst b/airflow-core/newsfragments/66395.misc.rst index 564510e5a5329..edc3c6ec55dc4 100644 --- a/airflow-core/newsfragments/66395.misc.rst +++ b/airflow-core/newsfragments/66395.misc.rst @@ -1,9 +1,8 @@ -When a task instance listener hook raises an exception, the suppressed-error -log line now identifies which hook raised. Previously every listener failure -across the codebase logged the same generic ``"error calling listener"`` -message; the new format ``"error calling listener for hook 'on_task_instance_success'"`` -lets plugin authors identify the failing hook directly from the log without -re-reading the stack frame. +When a task instance listener hook raises an exception, the suppressed-error log line now identifies which hook raised. -Behavior is otherwise unchanged — listener exceptions are still suppressed and -do not affect task execution. +Previously every listener failure across the codebase logged the same generic +``"error calling listener"`` message; the new format +``"error calling listener for hook 'on_task_instance_success'"`` lets plugin +authors identify the failing hook directly from the log without re-reading +the stack frame. Behavior is otherwise unchanged — listener exceptions are +still suppressed and do not affect task execution. From 0423d0b875f00f61df361aba7b08781e1de58a4a Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:51:25 -0700 Subject: [PATCH 4/7] Use structlog kwarg for hook field instead of %r format --- .../core_api/services/public/task_instances.py | 6 +++--- airflow-core/src/airflow/models/taskinstance.py | 2 +- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 10 +++++----- .../tests/task_sdk/execution_time/test_task_runner.py | 4 +--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 8ad91dfc6e82c..a48a84ac0038d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -80,7 +80,7 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta try: get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_success") + log.exception("error calling listener", hook="on_task_instance_success") elif new_state == TaskInstanceState.FAILED: try: get_listener_manager().hook.on_task_instance_failed( @@ -89,12 +89,12 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_failed") + log.exception("error calling listener", hook="on_task_instance_failed") elif new_state == TaskInstanceState.SKIPPED: try: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_skipped") + log.exception("error calling listener", hook="on_task_instance_skipped") def _reload_tis_with_rendered_fields(tis: list[TI], session: Session) -> list[TI]: diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index f88994eebe446..598cf078e5ec3 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1776,7 +1776,7 @@ def fetch_handle_failure_context( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_failed") + log.exception("error calling listener", hook="on_task_instance_failed") return ti diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 84cd07ef92c99..f6c778ff3b83c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1178,7 +1178,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv previous_state=TaskInstanceState.QUEUED, task_instance=ti ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_running") + log.exception("error calling listener", hook="on_task_instance_running") # No error, carry on and execute the task return None @@ -1903,7 +1903,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_success") + log.exception("error calling listener", hook="on_task_instance_success") elif state == TaskInstanceState.SKIPPED: _run_task_state_change_callbacks(task, "on_skipped_callback", context, log) try: @@ -1911,7 +1911,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_skipped") + log.exception("error calling listener", hook="on_task_instance_skipped") elif state == TaskInstanceState.UP_FOR_RETRY: _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: @@ -1919,7 +1919,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_failed") + log.exception("error calling listener", hook="on_task_instance_failed") if error and task.email_on_retry and task.email: _send_error_email_notification(task, ti, context, error, log) elif state == TaskInstanceState.FAILED: @@ -1929,7 +1929,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener for hook %r", "on_task_instance_failed") + log.exception("error calling listener", hook="on_task_instance_failed") if error and task.email_on_failure and task.email: _send_error_email_notification(task, ti, context, error, log) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 1a60d2d4040a3..7afb6763491da 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -3976,9 +3976,7 @@ def execute(self, context): state, _, _ = run(runtime_ti, context, log) finalize(runtime_ti, state, context, log) - log.exception.assert_any_call( - "error calling listener for hook %r", "on_task_instance_success" - ) + log.exception.assert_any_call("error calling listener", hook="on_task_instance_success") @pytest.mark.parametrize( "exception", From 1cfca54a176028ef2d310dbfca6a448229571c12 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:30:12 -0700 Subject: [PATCH 5/7] fix newsfragment: misc type must be single-line --- airflow-core/newsfragments/66395.misc.rst | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/airflow-core/newsfragments/66395.misc.rst b/airflow-core/newsfragments/66395.misc.rst index edc3c6ec55dc4..65dc94f8e5b06 100644 --- a/airflow-core/newsfragments/66395.misc.rst +++ b/airflow-core/newsfragments/66395.misc.rst @@ -1,8 +1 @@ -When a task instance listener hook raises an exception, the suppressed-error log line now identifies which hook raised. - -Previously every listener failure across the codebase logged the same generic -``"error calling listener"`` message; the new format -``"error calling listener for hook 'on_task_instance_success'"`` lets plugin -authors identify the failing hook directly from the log without re-reading -the stack frame. Behavior is otherwise unchanged — listener exceptions are -still suppressed and do not affect task execution. +Suppressed task instance listener exceptions now log the failing hook name as a structured field. From ecb1ffcb2c13491566b866ce31e3c15aa2b24532 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:37:36 -0700 Subject: [PATCH 6/7] fix listener test assertion: structlog kwarg requires dict-form match --- airflow-core/tests/unit/listeners/test_listeners.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index e609929928b04..cf4bff27e88ce 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -118,10 +118,7 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED) ti.run() - assert "error calling listener" in cap_structlog - # The hook name must appear in the log so plugin authors can identify which - # listener hook raised when multiple hooks are registered. - assert "on_task_instance_success" in cap_structlog + assert {"event": "error calling listener", "hook": "on_task_instance_success"} in cap_structlog @provide_session From 53d0cb6e74f85672e986aab8a0116cfa456da14c Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:40:40 -0700 Subject: [PATCH 7/7] fix listener-error log: use format string for stdlib Logger compat stdlib Logger.exception accepts (msg, *args) but not arbitrary kwargs; mypy flagged log.exception('msg', hook=name) as call-arg error in taskinstance.py and other stdlib-Logger sites. Reverting to format-string form which works for both stdlib and structlog loggers. The structlog adapter interpolates the format args into the event field, so cap_structlog still captures the rendered hook name. Tests updated to match the rendered event field. --- .../core_api/services/public/task_instances.py | 6 +++--- airflow-core/src/airflow/models/taskinstance.py | 2 +- airflow-core/tests/unit/listeners/test_listeners.py | 2 +- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 10 +++++----- .../tests/task_sdk/execution_time/test_task_runner.py | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index a48a84ac0038d..8ad91dfc6e82c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -80,7 +80,7 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta try: get_listener_manager().hook.on_task_instance_success(previous_state=None, task_instance=ti) except Exception: - log.exception("error calling listener", hook="on_task_instance_success") + log.exception("error calling listener for hook %r", "on_task_instance_success") elif new_state == TaskInstanceState.FAILED: try: get_listener_manager().hook.on_task_instance_failed( @@ -89,12 +89,12 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", ) except Exception: - log.exception("error calling listener", hook="on_task_instance_failed") + log.exception("error calling listener for hook %r", "on_task_instance_failed") elif new_state == TaskInstanceState.SKIPPED: try: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) except Exception: - log.exception("error calling listener", hook="on_task_instance_skipped") + log.exception("error calling listener for hook %r", "on_task_instance_skipped") def _reload_tis_with_rendered_fields(tis: list[TI], session: Session) -> list[TI]: diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 598cf078e5ec3..f88994eebe446 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1776,7 +1776,7 @@ def fetch_handle_failure_context( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener", hook="on_task_instance_failed") + log.exception("error calling listener for hook %r", "on_task_instance_failed") return ti diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index cf4bff27e88ce..294b0b91842bc 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -118,7 +118,7 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED) ti.run() - assert {"event": "error calling listener", "hook": "on_task_instance_success"} in cap_structlog + assert "error calling listener for hook 'on_task_instance_success'" in cap_structlog @provide_session diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index f6c778ff3b83c..84cd07ef92c99 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1178,7 +1178,7 @@ def _prepare(ti: RuntimeTaskInstance, log: Logger, context: Context) -> ToSuperv previous_state=TaskInstanceState.QUEUED, task_instance=ti ) except Exception: - log.exception("error calling listener", hook="on_task_instance_running") + log.exception("error calling listener for hook %r", "on_task_instance_running") # No error, carry on and execute the task return None @@ -1903,7 +1903,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener", hook="on_task_instance_success") + log.exception("error calling listener for hook %r", "on_task_instance_success") elif state == TaskInstanceState.SKIPPED: _run_task_state_change_callbacks(task, "on_skipped_callback", context, log) try: @@ -1911,7 +1911,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti ) except Exception: - log.exception("error calling listener", hook="on_task_instance_skipped") + log.exception("error calling listener for hook %r", "on_task_instance_skipped") elif state == TaskInstanceState.UP_FOR_RETRY: _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: @@ -1919,7 +1919,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener", hook="on_task_instance_failed") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_retry and task.email: _send_error_email_notification(task, ti, context, error, log) elif state == TaskInstanceState.FAILED: @@ -1929,7 +1929,7 @@ def finalize( previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error ) except Exception: - log.exception("error calling listener", hook="on_task_instance_failed") + log.exception("error calling listener for hook %r", "on_task_instance_failed") if error and task.email_on_failure and task.email: _send_error_email_notification(task, ti, context, error, log) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 7afb6763491da..0df7236b38fec 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -3976,7 +3976,7 @@ def execute(self, context): state, _, _ = run(runtime_ti, context, log) finalize(runtime_ti, state, context, log) - log.exception.assert_any_call("error calling listener", hook="on_task_instance_success") + log.exception.assert_any_call("error calling listener for hook %r", "on_task_instance_success") @pytest.mark.parametrize( "exception",