From be3fcc203f0f4f5d5b080b6d7befa8651b66ad6f Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:10:50 -0700 Subject: [PATCH 1/7] Extend listener hook-name log to remaining call sites Follow-up to #66395, which introduced the hook-name-in-log change for the task instance listener call sites. This PR applies the identical mechanical edit to the remaining listener-call surfaces so suppressed-listener-error logs identify which hook raised across the entire codebase: - DagRun state changes: on_dag_run_success, on_dag_run_failed - DAG import errors: on_existing_dag_import_error, on_new_dag_import_error - Asset events: on_asset_created, on_asset_alias_created, on_asset_changed, on_asset_event_emitted - Component lifecycle: on_starting, before_stopping (Job + Task SDK runner) After this lands, every listener-call site in airflow-core and the Task SDK follows the same log format. Listener-exception suppression behavior is preserved. --- .../listener-hook-name-rest.misc.rst | 10 ++++++++++ .../core_api/routes/public/dag_run.py | 4 ++-- airflow-core/src/airflow/assets/manager.py | 8 ++++---- .../src/airflow/dag_processing/collection.py | 4 ++-- airflow-core/src/airflow/jobs/job.py | 4 ++-- .../tests/unit/listeners/test_listeners.py | 18 ++++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 4 ++-- 7 files changed, 40 insertions(+), 12 deletions(-) create mode 100644 airflow-core/newsfragments/listener-hook-name-rest.misc.rst diff --git a/airflow-core/newsfragments/listener-hook-name-rest.misc.rst b/airflow-core/newsfragments/listener-hook-name-rest.misc.rst new file mode 100644 index 0000000000000..747b1bfee69ad --- /dev/null +++ b/airflow-core/newsfragments/listener-hook-name-rest.misc.rst @@ -0,0 +1,10 @@ +Extends the listener-error-log change introduced for task instance hooks to +the remaining listener call sites: DagRun state changes (``on_dag_run_success``, +``on_dag_run_failed``), DAG import errors (``on_existing_dag_import_error``, +``on_new_dag_import_error``), asset events (``on_asset_created``, +``on_asset_alias_created``, ``on_asset_changed``, ``on_asset_event_emitted``), +and component lifecycle (``on_starting``, ``before_stopping``). + +Suppressed-listener-exception logs now identify which hook raised across every +listener-call surface in airflow-core and the Task SDK, completing the +observability change. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index a40676f8e186f..091dd31eb30c1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -224,7 +224,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_dag_run_success") elif attr_value == DAGRunPatchStates.QUEUED: set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) # Not notifying on queued - only notifying on RUNNING, this is happening in scheduler @@ -233,7 +233,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_dag_run_failed") elif attr_name == "note": updated_dag_run = session.get(DagRun, dag_run.id) if updated_dag_run and updated_dag_run.dag_run_note is None: diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index f1730fb2e8130..f6c09025bf990 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -396,7 +396,7 @@ def notify_asset_created(asset: SerializedAsset): try: get_listener_manager().hook.on_asset_created(asset=asset) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_asset_created") @staticmethod def notify_asset_alias_created(asset_assets: SerializedAssetAlias): @@ -404,7 +404,7 @@ def notify_asset_alias_created(asset_assets: SerializedAssetAlias): try: get_listener_manager().hook.on_asset_alias_created(asset_alias=asset_assets) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_asset_alias_created") @staticmethod def notify_asset_changed(asset: SerializedAsset) -> None: @@ -412,7 +412,7 @@ def notify_asset_changed(asset: SerializedAsset) -> None: try: get_listener_manager().hook.on_asset_changed(asset=asset) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_asset_changed") @staticmethod def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: @@ -420,7 +420,7 @@ def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: try: get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_asset_event_emitted") @classmethod def _queue_dagruns( diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 4df0b589c1fd9..d40c1111231c5 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -396,7 +396,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_existing_dag_import_error") else: import_error = ParseImportError( filename=relative_fileloc, @@ -411,7 +411,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_new_dag_import_error") session.execute( update(DagModel) .where( diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py index 4ab2defd81b75..4ac2c04e7b741 100644 --- a/airflow-core/src/airflow/jobs/job.py +++ b/airflow-core/src/airflow/jobs/job.py @@ -147,7 +147,7 @@ def __init__(self, heartrate=None, **kwargs): try: get_listener_manager().hook.on_starting(component=self) except Exception: - self.log.exception("error calling listener") + self.log.exception("error calling listener for hook %r", "on_starting") super().__init__(**kwargs) @cached_property @@ -280,7 +280,7 @@ def complete_execution(self, session: Session = NEW_SESSION): try: get_listener_manager().hook.before_stopping(component=self) except Exception: - self.log.exception("error calling listener") + self.log.exception("error calling listener for hook %r", "before_stopping") self.end_date = timezone.utcnow() session.merge(self) session.commit() diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index aad2ea7b6e863..fd5ad19a1415b 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -121,6 +121,24 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc assert "error calling listener" in cap_structlog +def test_listener_lifecycle_error_log_includes_hook_name(cap_structlog, listener_manager): + """When a lifecycle listener raises, the log identifies the hook so plugin + authors can debug across multiple registered listeners.""" + from airflow.listeners import hookimpl + + class ThrowingLifecycleListener: + @hookimpl + def on_starting(self, component): + raise RuntimeError("listener boom") + + listener_manager(ThrowingLifecycleListener()) + + Job() # instantiating a Job fires the on_starting hook + + assert "error calling listener" in cap_structlog + assert "on_starting" in cap_structlog + + @provide_session def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session, listener_manager): listener_manager(full_listener) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..8f36c3c462157 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -959,7 +959,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]: try: get_listener_manager().hook.on_starting(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "on_starting") with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): ti = parse(msg, log) @@ -1936,7 +1936,7 @@ def finalize( try: get_listener_manager().hook.before_stopping(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener") + log.exception("error calling listener for hook %r", "before_stopping") log.info("::endgroup::") From b0b6b762eabbe372df5d4fa1f605a62455224b45 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:11:49 -0700 Subject: [PATCH 2/7] Rename newsfragment to match PR number --- .../{listener-hook-name-rest.misc.rst => 66397.misc.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{listener-hook-name-rest.misc.rst => 66397.misc.rst} (100%) diff --git a/airflow-core/newsfragments/listener-hook-name-rest.misc.rst b/airflow-core/newsfragments/66397.misc.rst similarity index 100% rename from airflow-core/newsfragments/listener-hook-name-rest.misc.rst rename to airflow-core/newsfragments/66397.misc.rst From 04d3ec687b67ab747ef8ae31fd21fdc1aea66949 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:27 -0700 Subject: [PATCH 3/7] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66397.misc.rst | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow-core/newsfragments/66397.misc.rst b/airflow-core/newsfragments/66397.misc.rst index 747b1bfee69ad..fcae18431653d 100644 --- a/airflow-core/newsfragments/66397.misc.rst +++ b/airflow-core/newsfragments/66397.misc.rst @@ -1,9 +1,4 @@ -Extends the listener-error-log change introduced for task instance hooks to -the remaining listener call sites: DagRun state changes (``on_dag_run_success``, -``on_dag_run_failed``), DAG import errors (``on_existing_dag_import_error``, -``on_new_dag_import_error``), asset events (``on_asset_created``, -``on_asset_alias_created``, ``on_asset_changed``, ``on_asset_event_emitted``), -and component lifecycle (``on_starting``, ``before_stopping``). +Extend the listener-error-log change introduced for task instance hooks to the remaining listener call sites (DagRun state changes, DAG import errors, asset events, component lifecycle). Suppressed-listener-exception logs now identify which hook raised across every listener-call surface in airflow-core and the Task SDK, completing the From b11ed803dea6643afc871a5d54c7a6a4b0065911 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:51:28 -0700 Subject: [PATCH 4/7] Use structlog kwarg for hook field instead of %r format --- .../airflow/api_fastapi/core_api/routes/public/dag_run.py | 4 ++-- airflow-core/src/airflow/assets/manager.py | 8 ++++---- airflow-core/src/airflow/dag_processing/collection.py | 4 ++-- airflow-core/src/airflow/jobs/job.py | 4 ++-- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 4 ++-- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 091dd31eb30c1..18d0af45fbdb3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -224,7 +224,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener for hook %r", "on_dag_run_success") + log.exception("error calling listener", hook="on_dag_run_success") elif attr_value == DAGRunPatchStates.QUEUED: set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) # Not notifying on queued - only notifying on RUNNING, this is happening in scheduler @@ -233,7 +233,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener for hook %r", "on_dag_run_failed") + log.exception("error calling listener", hook="on_dag_run_failed") elif attr_name == "note": updated_dag_run = session.get(DagRun, dag_run.id) if updated_dag_run and updated_dag_run.dag_run_note is None: diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index f6c09025bf990..e9f2ac6b40ca2 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -396,7 +396,7 @@ def notify_asset_created(asset: SerializedAsset): try: get_listener_manager().hook.on_asset_created(asset=asset) except Exception: - log.exception("error calling listener for hook %r", "on_asset_created") + log.exception("error calling listener", hook="on_asset_created") @staticmethod def notify_asset_alias_created(asset_assets: SerializedAssetAlias): @@ -404,7 +404,7 @@ def notify_asset_alias_created(asset_assets: SerializedAssetAlias): try: get_listener_manager().hook.on_asset_alias_created(asset_alias=asset_assets) except Exception: - log.exception("error calling listener for hook %r", "on_asset_alias_created") + log.exception("error calling listener", hook="on_asset_alias_created") @staticmethod def notify_asset_changed(asset: SerializedAsset) -> None: @@ -412,7 +412,7 @@ def notify_asset_changed(asset: SerializedAsset) -> None: try: get_listener_manager().hook.on_asset_changed(asset=asset) except Exception: - log.exception("error calling listener for hook %r", "on_asset_changed") + log.exception("error calling listener", hook="on_asset_changed") @staticmethod def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: @@ -420,7 +420,7 @@ def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: try: get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event) except Exception: - log.exception("error calling listener for hook %r", "on_asset_event_emitted") + log.exception("error calling listener", hook="on_asset_event_emitted") @classmethod def _queue_dagruns( diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index d40c1111231c5..4b525a545ce7f 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -396,7 +396,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener for hook %r", "on_existing_dag_import_error") + log.exception("error calling listener", hook="on_existing_dag_import_error") else: import_error = ParseImportError( filename=relative_fileloc, @@ -411,7 +411,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener for hook %r", "on_new_dag_import_error") + log.exception("error calling listener", hook="on_new_dag_import_error") session.execute( update(DagModel) .where( diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py index 4ac2c04e7b741..49ad7862d391a 100644 --- a/airflow-core/src/airflow/jobs/job.py +++ b/airflow-core/src/airflow/jobs/job.py @@ -147,7 +147,7 @@ def __init__(self, heartrate=None, **kwargs): try: get_listener_manager().hook.on_starting(component=self) except Exception: - self.log.exception("error calling listener for hook %r", "on_starting") + self.log.exception("error calling listener", hook="on_starting") super().__init__(**kwargs) @cached_property @@ -280,7 +280,7 @@ def complete_execution(self, session: Session = NEW_SESSION): try: get_listener_manager().hook.before_stopping(component=self) except Exception: - self.log.exception("error calling listener for hook %r", "before_stopping") + self.log.exception("error calling listener", hook="before_stopping") self.end_date = timezone.utcnow() session.merge(self) session.commit() diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 8f36c3c462157..53b4024284fd3 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -959,7 +959,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]: try: get_listener_manager().hook.on_starting(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener for hook %r", "on_starting") + log.exception("error calling listener", hook="on_starting") with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): ti = parse(msg, log) @@ -1936,7 +1936,7 @@ def finalize( try: get_listener_manager().hook.before_stopping(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener for hook %r", "before_stopping") + log.exception("error calling listener", hook="before_stopping") log.info("::endgroup::") From 51558a77159a04886bb0c5ba6ec0f48733c99d00 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:30:12 -0700 Subject: [PATCH 5/7] fix newsfragment: misc type must be single-line --- airflow-core/newsfragments/66397.misc.rst | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/airflow-core/newsfragments/66397.misc.rst b/airflow-core/newsfragments/66397.misc.rst index fcae18431653d..d54f15a2e0ded 100644 --- a/airflow-core/newsfragments/66397.misc.rst +++ b/airflow-core/newsfragments/66397.misc.rst @@ -1,5 +1 @@ -Extend the listener-error-log change introduced for task instance hooks to the remaining listener call sites (DagRun state changes, DAG import errors, asset events, component lifecycle). - -Suppressed-listener-exception logs now identify which hook raised across every -listener-call surface in airflow-core and the Task SDK, completing the -observability change. +Extend listener-error structured-log change to remaining call sites (DagRun, asset events, DAG import errors, component lifecycle). From 67c73a30294f9102518fea681626e5219060a4a1 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:41:12 -0700 Subject: [PATCH 6/7] fix listener test: structlog kwarg requires dict-form match --- airflow-core/tests/unit/listeners/test_listeners.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index fd5ad19a1415b..29b247be9dcef 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -135,8 +135,7 @@ def on_starting(self, component): Job() # instantiating a Job fires the on_starting hook - assert "error calling listener" in cap_structlog - assert "on_starting" in cap_structlog + assert {"event": "error calling listener", "hook": "on_starting"} in cap_structlog @provide_session From d245c94301565975c467621d3fd40762d6fc212f Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:40:45 -0700 Subject: [PATCH 7/7] fix listener-error log: use format string for stdlib Logger compat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the same fix in PR-B (#66395) — extends to lifecycle and asset listener call sites. --- .../airflow/api_fastapi/core_api/routes/public/dag_run.py | 4 ++-- airflow-core/src/airflow/assets/manager.py | 8 ++++---- airflow-core/src/airflow/dag_processing/collection.py | 4 ++-- airflow-core/src/airflow/jobs/job.py | 4 ++-- airflow-core/tests/unit/listeners/test_listeners.py | 2 +- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 18d0af45fbdb3..091dd31eb30c1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -224,7 +224,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener", hook="on_dag_run_success") + log.exception("error calling listener for hook %r", "on_dag_run_success") elif attr_value == DAGRunPatchStates.QUEUED: set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session) # Not notifying on queued - only notifying on RUNNING, this is happening in scheduler @@ -233,7 +233,7 @@ def patch_dag_run( try: get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="") except Exception: - log.exception("error calling listener", hook="on_dag_run_failed") + log.exception("error calling listener for hook %r", "on_dag_run_failed") elif attr_name == "note": updated_dag_run = session.get(DagRun, dag_run.id) if updated_dag_run and updated_dag_run.dag_run_note is None: diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py index e9f2ac6b40ca2..f6c09025bf990 100644 --- a/airflow-core/src/airflow/assets/manager.py +++ b/airflow-core/src/airflow/assets/manager.py @@ -396,7 +396,7 @@ def notify_asset_created(asset: SerializedAsset): try: get_listener_manager().hook.on_asset_created(asset=asset) except Exception: - log.exception("error calling listener", hook="on_asset_created") + log.exception("error calling listener for hook %r", "on_asset_created") @staticmethod def notify_asset_alias_created(asset_assets: SerializedAssetAlias): @@ -404,7 +404,7 @@ def notify_asset_alias_created(asset_assets: SerializedAssetAlias): try: get_listener_manager().hook.on_asset_alias_created(asset_alias=asset_assets) except Exception: - log.exception("error calling listener", hook="on_asset_alias_created") + log.exception("error calling listener for hook %r", "on_asset_alias_created") @staticmethod def notify_asset_changed(asset: SerializedAsset) -> None: @@ -412,7 +412,7 @@ def notify_asset_changed(asset: SerializedAsset) -> None: try: get_listener_manager().hook.on_asset_changed(asset=asset) except Exception: - log.exception("error calling listener", hook="on_asset_changed") + log.exception("error calling listener for hook %r", "on_asset_changed") @staticmethod def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: @@ -420,7 +420,7 @@ def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None: try: get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event) except Exception: - log.exception("error calling listener", hook="on_asset_event_emitted") + log.exception("error calling listener for hook %r", "on_asset_event_emitted") @classmethod def _queue_dagruns( diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 4b525a545ce7f..d40c1111231c5 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -396,7 +396,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener", hook="on_existing_dag_import_error") + log.exception("error calling listener for hook %r", "on_existing_dag_import_error") else: import_error = ParseImportError( filename=relative_fileloc, @@ -411,7 +411,7 @@ def _update_import_errors( filename=import_error.full_file_path(), stacktrace=stacktrace ) except Exception: - log.exception("error calling listener", hook="on_new_dag_import_error") + log.exception("error calling listener for hook %r", "on_new_dag_import_error") session.execute( update(DagModel) .where( diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py index 49ad7862d391a..4ac2c04e7b741 100644 --- a/airflow-core/src/airflow/jobs/job.py +++ b/airflow-core/src/airflow/jobs/job.py @@ -147,7 +147,7 @@ def __init__(self, heartrate=None, **kwargs): try: get_listener_manager().hook.on_starting(component=self) except Exception: - self.log.exception("error calling listener", hook="on_starting") + self.log.exception("error calling listener for hook %r", "on_starting") super().__init__(**kwargs) @cached_property @@ -280,7 +280,7 @@ def complete_execution(self, session: Session = NEW_SESSION): try: get_listener_manager().hook.before_stopping(component=self) except Exception: - self.log.exception("error calling listener", hook="before_stopping") + self.log.exception("error calling listener for hook %r", "before_stopping") self.end_date = timezone.utcnow() session.merge(self) session.commit() diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py index 29b247be9dcef..bcbf696c94baf 100644 --- a/airflow-core/tests/unit/listeners/test_listeners.py +++ b/airflow-core/tests/unit/listeners/test_listeners.py @@ -135,7 +135,7 @@ def on_starting(self, component): Job() # instantiating a Job fires the on_starting hook - assert {"event": "error calling listener", "hook": "on_starting"} in cap_structlog + assert "error calling listener for hook 'on_starting'" in cap_structlog @provide_session diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 53b4024284fd3..8f36c3c462157 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -959,7 +959,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]: try: get_listener_manager().hook.on_starting(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener", hook="on_starting") + log.exception("error calling listener for hook %r", "on_starting") with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id): ti = parse(msg, log) @@ -1936,7 +1936,7 @@ def finalize( try: get_listener_manager().hook.before_stopping(component=TaskRunnerMarker()) except Exception: - log.exception("error calling listener", hook="before_stopping") + log.exception("error calling listener for hook %r", "before_stopping") log.info("::endgroup::")