diff --git a/airflow-core/newsfragments/66397.misc.rst b/airflow-core/newsfragments/66397.misc.rst
new file mode 100644
index 0000000000000..d54f15a2e0ded
--- /dev/null
+++ b/airflow-core/newsfragments/66397.misc.rst
@@ -0,0 +1 @@
+Extend listener-error structured-log change to remaining call sites (DagRun, asset events, DAG import errors, component lifecycle).
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index a40676f8e186f..091dd31eb30c1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -224,7 +224,7 @@ def patch_dag_run(
                 try:
                     get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
                 except Exception:
-                    log.exception("error calling listener")
+                    log.exception("error calling listener for hook %r", "on_dag_run_success")
            elif attr_value == DAGRunPatchStates.QUEUED:
                set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session)
                # Not notifying on queued - only notifying on RUNNING, this is happening in scheduler
@@ -233,7 +233,7 @@
                try:
                    get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
                except Exception:
-                    log.exception("error calling listener")
+                    log.exception("error calling listener for hook %r", "on_dag_run_failed")
        elif attr_name == "note":
            updated_dag_run = session.get(DagRun, dag_run.id)
            if updated_dag_run and updated_dag_run.dag_run_note is None:
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index f1730fb2e8130..f6c09025bf990 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -396,7 +396,7 @@ def notify_asset_created(asset: SerializedAsset):
        try:
            get_listener_manager().hook.on_asset_created(asset=asset)
        except Exception:
-            log.exception("error calling listener")
+            log.exception("error calling listener for hook %r", "on_asset_created")

    @staticmethod
    def notify_asset_alias_created(asset_assets: SerializedAssetAlias):
@@ -404,7 +404,7 @@ def notify_asset_alias_created(asset_assets: SerializedAssetAlias):
        try:
            get_listener_manager().hook.on_asset_alias_created(asset_alias=asset_assets)
        except Exception:
-            log.exception("error calling listener")
+            log.exception("error calling listener for hook %r", "on_asset_alias_created")

    @staticmethod
    def notify_asset_changed(asset: SerializedAsset) -> None:
@@ -412,7 +412,7 @@ def notify_asset_changed(asset: SerializedAsset) -> None:
        try:
            get_listener_manager().hook.on_asset_changed(asset=asset)
        except Exception:
-            log.exception("error calling listener")
+            log.exception("error calling listener for hook %r", "on_asset_changed")

    @staticmethod
    def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None:
@@ -420,7 +420,7 @@ def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None:
        try:
            get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event)
        except Exception:
-            log.exception("error calling listener")
+            log.exception("error calling listener for hook %r", "on_asset_event_emitted")

    @classmethod
    def _queue_dagruns(
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py
index 4df0b589c1fd9..d40c1111231c5 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -396,7 +396,7 @@ def _update_import_errors(
                    filename=import_error.full_file_path(), stacktrace=stacktrace
                )
            except Exception:
-                log.exception("error calling listener")
+                log.exception("error calling listener for hook %r", "on_existing_dag_import_error")
        else:
            import_error = ParseImportError(
                filename=relative_fileloc,
@@ -411,7 +411,7 @@ def _update_import_errors(
                    filename=import_error.full_file_path(), stacktrace=stacktrace
                )
            except Exception:
-                log.exception("error calling listener")
+                log.exception("error calling listener for hook %r", "on_new_dag_import_error")
        session.execute(
            update(DagModel)
            .where(
diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py
index 4ab2defd81b75..4ac2c04e7b741 100644
--- a/airflow-core/src/airflow/jobs/job.py
+++ b/airflow-core/src/airflow/jobs/job.py
@@ -147,7 +147,7 @@ def __init__(self, heartrate=None, **kwargs):
        try:
            get_listener_manager().hook.on_starting(component=self)
        except Exception:
-            self.log.exception("error calling listener")
+            self.log.exception("error calling listener for hook %r", "on_starting")
        super().__init__(**kwargs)

    @cached_property
@@ -280,7 +280,7 @@ def complete_execution(self, session: Session = NEW_SESSION):
        try:
            get_listener_manager().hook.before_stopping(component=self)
        except Exception:
-            self.log.exception("error calling listener")
+            self.log.exception("error calling listener for hook %r", "before_stopping")
        self.end_date = timezone.utcnow()
        session.merge(self)
        session.commit()
diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py
index aad2ea7b6e863..bcbf696c94baf 100644
--- a/airflow-core/tests/unit/listeners/test_listeners.py
+++ b/airflow-core/tests/unit/listeners/test_listeners.py
@@ -121,6 +121,23 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc
    assert "error calling listener" in cap_structlog


+def test_listener_lifecycle_error_log_includes_hook_name(cap_structlog, listener_manager):
+    """When a lifecycle listener raises, the log identifies the hook so plugin
+    authors can debug across multiple registered listeners."""
+    from airflow.listeners import hookimpl
+
+    class ThrowingLifecycleListener:
+        @hookimpl
+        def on_starting(self, component):
+            raise RuntimeError("listener boom")
+
+    listener_manager(ThrowingLifecycleListener())
+
+    Job()  # instantiating a Job fires the on_starting hook
+
+    assert "error calling listener for hook 'on_starting'" in cap_structlog
+
+
@provide_session
def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session, listener_manager):
    listener_manager(full_listener)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 56ba8343c648b..8f36c3c462157 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -959,7 +959,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]:
    try:
        get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
    except Exception:
-        log.exception("error calling listener")
+        log.exception("error calling listener for hook %r", "on_starting")

    with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
        ti = parse(msg, log)
@@ -1936,7 +1936,7 @@ def finalize(
    try:
        get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())
    except Exception:
-        log.exception("error calling listener")
+        log.exception("error calling listener for hook %r", "before_stopping")

    log.info("::endgroup::")