diff --git a/airflow-core/src/airflow/api_fastapi/logging/decorators.py b/airflow-core/src/airflow/api_fastapi/logging/decorators.py index a4734bb3e41aa..becc4654e9a70 100644 --- a/airflow-core/src/airflow/api_fastapi/logging/decorators.py +++ b/airflow-core/src/airflow/api_fastapi/logging/decorators.py @@ -28,6 +28,7 @@ from airflow._shared.secrets_masker import secrets_masker from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.core_api.security import GetUserDep +from airflow.listeners.listener import get_listener_manager from airflow.models import Log logger = logging.getLogger(__name__) @@ -170,5 +171,6 @@ async def log_action( # Explicit commit to persist the access log independently if the path operation fails or not. # Also it cannot be deferred to a 'function' scoped dependency because of the `request` parameter. session.commit() + get_listener_manager().hook.on_audit_log_created(log=log) return log_action diff --git a/airflow-core/src/airflow/listeners/listener.py b/airflow-core/src/airflow/listeners/listener.py index 06be7a1b9b908..6f543c1187170 100644 --- a/airflow-core/src/airflow/listeners/listener.py +++ b/airflow-core/src/airflow/listeners/listener.py @@ -21,7 +21,7 @@ from airflow._shared.listeners.listener import ListenerManager from airflow._shared.listeners.spec import lifecycle, taskinstance -from airflow.listeners.spec import asset, dagrun, importerrors +from airflow.listeners.spec import asset, audit_log, dagrun, importerrors from airflow.plugins_manager import integrate_listener_plugins @@ -40,6 +40,7 @@ def get_listener_manager() -> ListenerManager: _listener_manager = ListenerManager() _listener_manager.add_hookspecs(lifecycle) + _listener_manager.add_hookspecs(audit_log) _listener_manager.add_hookspecs(dagrun) _listener_manager.add_hookspecs(taskinstance) _listener_manager.add_hookspecs(asset) diff --git a/airflow-core/src/airflow/listeners/spec/audit_log.py b/airflow-core/src/airflow/listeners/spec/audit_log.py new file mode 100644 index 0000000000000..df1c8d23ad765 --- /dev/null +++ b/airflow-core/src/airflow/listeners/spec/audit_log.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from pluggy import HookspecMarker + +if TYPE_CHECKING: + from airflow.models.log import Log + +hookspec = HookspecMarker("airflow") + + +@hookspec +def on_audit_log_created(log: Log): + """Execute when an audit log record is created.""" diff --git a/airflow-core/tests/unit/listeners/audit_log_listener.py b/airflow-core/tests/unit/listeners/audit_log_listener.py new file mode 100644 index 0000000000000..74077ebe85e6e --- /dev/null +++ b/airflow-core/tests/unit/listeners/audit_log_listener.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.listeners import hookimpl + +if TYPE_CHECKING: + from airflow.models.log import Log + +audit_logs: list[Log] = [] + + +@hookimpl +def on_audit_log_created(log: Log): + audit_logs.append(log) + + +def clear(): + audit_logs.clear() diff --git a/airflow-core/tests/unit/listeners/test_audit_log_listener.py b/airflow-core/tests/unit/listeners/test_audit_log_listener.py new file mode 100644 index 0000000000000..20df308da12aa --- /dev/null +++ b/airflow-core/tests/unit/listeners/test_audit_log_listener.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.listeners.listener import get_listener_manager +from airflow.models.log import Log + +from unit.listeners import audit_log_listener + +pytestmark = pytest.mark.db_test + + +@pytest.fixture(autouse=True) +def clean_listener_state(): + yield + audit_log_listener.clear() + + +def test_audit_log_hookspec_registered(): + """Verify the on_audit_log_created hookspec is registered in the listener manager.""" + lm = get_listener_manager() + assert hasattr(lm.hook, "on_audit_log_created") + + +def test_audit_log_listener_receives_event(listener_manager): + """Verify a registered listener receives audit log events.""" + listener_manager(audit_log_listener) + + log = Log(event="test_event", owner="test_user") + get_listener_manager().hook.on_audit_log_created(log=log) + + assert len(audit_log_listener.audit_logs) == 1 + assert audit_log_listener.audit_logs[0].event == "test_event" + assert audit_log_listener.audit_logs[0].owner == "test_user" + + +def test_audit_log_listener_no_event_without_registration(): + """Verify no error when hook fires with no listeners registered.""" + log = Log(event="test_event", owner="test_user") + # Should not raise + get_listener_manager().hook.on_audit_log_created(log=log) + assert len(audit_log_listener.audit_logs) == 0