Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66397.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Extend listener-error structured-log change to remaining call sites (DagRun, asset events, DAG import errors, component lifecycle).
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def patch_dag_run(
try:
get_listener_manager().hook.on_dag_run_success(dag_run=dag_run, msg="")
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_dag_run_success")
elif attr_value == DAGRunPatchStates.QUEUED:
set_dag_run_state_to_queued(dag=dag, run_id=dag_run.run_id, commit=True, session=session)
# Not notifying on queued - only notifying on RUNNING, this is happening in scheduler
Expand All @@ -233,7 +233,7 @@ def patch_dag_run(
try:
get_listener_manager().hook.on_dag_run_failed(dag_run=dag_run, msg="")
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_dag_run_failed")
elif attr_name == "note":
updated_dag_run = session.get(DagRun, dag_run.id)
if updated_dag_run and updated_dag_run.dag_run_note is None:
Expand Down
8 changes: 4 additions & 4 deletions airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,31 +396,31 @@ def notify_asset_created(asset: SerializedAsset):
try:
get_listener_manager().hook.on_asset_created(asset=asset)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_asset_created")

@staticmethod
def notify_asset_alias_created(asset_assets: SerializedAssetAlias):
    """Run applicable notification actions when an asset alias is created.

    :param asset_assets: the serialized asset alias that was just created;
        it is forwarded unchanged to the ``on_asset_alias_created`` listener hook.
    """
    try:
        get_listener_manager().hook.on_asset_alias_created(asset_alias=asset_assets)
    except Exception:
        # Listener notification is best-effort: a broken plugin hook must never
        # fail the asset operation itself, so swallow and log with the hook
        # name to make it debuggable when several listeners are registered.
        log.exception("error calling listener for hook %r", "on_asset_alias_created")

@staticmethod
def notify_asset_changed(asset: SerializedAsset) -> None:
    """Run applicable notification actions when an asset is changed.

    :param asset: the serialized asset that changed; forwarded unchanged to the
        ``on_asset_changed`` listener hook.
    """
    try:
        get_listener_manager().hook.on_asset_changed(asset=asset)
    except Exception:
        # Best-effort notification: never let a faulty listener break the
        # asset update. The hook name in the message identifies which of the
        # registered hooks raised.
        log.exception("error calling listener for hook %r", "on_asset_changed")

@staticmethod
def nofity_asset_event_emitted(asset_event: ListenerAssetEvent) -> None:
    """Run applicable notification actions when an asset event is emitted.

    NOTE(review): the method name has a typo ("nofity" for "notify"), but it is
    part of the public surface of this class; renaming would break callers, so
    it is intentionally left as-is here. A rename should go through a separate
    deprecation cycle.

    :param asset_event: the listener-facing asset event; forwarded unchanged to
        the ``on_asset_event_emitted`` listener hook.
    """
    try:
        get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event)
    except Exception:
        # Best-effort notification: listener failures are logged (with the
        # hook name for debuggability) and suppressed so event emission
        # is never broken by a plugin.
        log.exception("error calling listener for hook %r", "on_asset_event_emitted")

@classmethod
def _queue_dagruns(
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def _update_import_errors(
filename=import_error.full_file_path(), stacktrace=stacktrace
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_existing_dag_import_error")
else:
import_error = ParseImportError(
filename=relative_fileloc,
Expand All @@ -411,7 +411,7 @@ def _update_import_errors(
filename=import_error.full_file_path(), stacktrace=stacktrace
)
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_new_dag_import_error")
session.execute(
update(DagModel)
.where(
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def __init__(self, heartrate=None, **kwargs):
try:
get_listener_manager().hook.on_starting(component=self)
except Exception:
self.log.exception("error calling listener")
self.log.exception("error calling listener for hook %r", "on_starting")
super().__init__(**kwargs)

@cached_property
Expand Down Expand Up @@ -280,7 +280,7 @@ def complete_execution(self, session: Session = NEW_SESSION):
try:
get_listener_manager().hook.before_stopping(component=self)
except Exception:
self.log.exception("error calling listener")
self.log.exception("error calling listener for hook %r", "before_stopping")
self.end_date = timezone.utcnow()
session.merge(self)
session.commit()
Expand Down
17 changes: 17 additions & 0 deletions airflow-core/tests/unit/listeners/test_listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,23 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc
assert "error calling listener" in cap_structlog


def test_listener_lifecycle_error_log_includes_hook_name(cap_structlog, listener_manager):
    """When a lifecycle listener raises, the log identifies the hook so plugin
    authors can debug across multiple registered listeners."""
    # Local import keeps the hookimpl marker scoped to this test.
    from airflow.listeners import hookimpl

    class ThrowingLifecycleListener:
        @hookimpl
        def on_starting(self, component):
            # Deliberately fail so the caller's except/log path is exercised.
            raise RuntimeError("listener boom")

    # listener_manager fixture registers the listener for the test's duration.
    listener_manager(ThrowingLifecycleListener())

    Job()  # instantiating a Job fires the on_starting hook

    # The error log must name the hook ('on_starting'), not just say a
    # listener failed — that is the behavior under test.
    # NOTE(review): Job and cap_structlog come from module-level imports/fixtures
    # not visible in this hunk — confirm Job needs no arguments/session here.
    assert "error calling listener for hook 'on_starting'" in cap_structlog


@provide_session
def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session, listener_manager):
listener_manager(full_listener)
Expand Down
4 changes: 2 additions & 2 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]:
try:
get_listener_manager().hook.on_starting(component=TaskRunnerMarker())
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "on_starting")

with _airflow_parsing_context_manager(dag_id=msg.ti.dag_id, task_id=msg.ti.task_id):
ti = parse(msg, log)
Expand Down Expand Up @@ -1936,7 +1936,7 @@ def finalize(
try:
get_listener_manager().hook.before_stopping(component=TaskRunnerMarker())
except Exception:
log.exception("error calling listener")
log.exception("error calling listener for hook %r", "before_stopping")

log.info("::endgroup::")

Expand Down
Loading