diff --git a/airflow-core/src/airflow/utils/log/log_stream_accumulator.py b/airflow-core/src/airflow/utils/log/log_stream_accumulator.py
index 953b47dd9719b..343a9f3513997 100644
--- a/airflow-core/src/airflow/utils/log/log_stream_accumulator.py
+++ b/airflow-core/src/airflow/utils/log/log_stream_accumulator.py
@@ -18,8 +18,10 @@
 
 from __future__ import annotations
 
+import logging
 import os
 import tempfile
+import weakref
 from itertools import islice
 from typing import IO, TYPE_CHECKING
 
@@ -31,6 +33,24 @@
         StructuredLogStream,
     )
 
+logger = logging.getLogger(__name__)
+
+
+def _safe_remove(path: str) -> None:
+    """
+    Remove path, swallowing missing-file errors and logging others.
+
+    Module-level rather than a method so weakref.finalize can hold it
+    without retaining a strong reference to the owning accumulator (which
+    would defeat the backstop).
+    """
+    try:
+        os.remove(path)
+    except FileNotFoundError:
+        pass
+    except OSError as exc:
+        logger.debug("LogStreamAccumulator: could not remove temp file %s: %s", path, exc)
+
 
 class LogStreamAccumulator:
     """
@@ -69,11 +89,17 @@ def __init__(
         self._buffer: list[StructuredLogMessage] = []
         self._disk_lines: int = 0
         self._tmpfile: IO[str] | None = None
+        self._stream_accessed: bool = False
+        self._finalizer: weakref.finalize | None = None
 
     def _flush_buffer_to_disk(self) -> None:
         """Flush the buffer contents to a temporary file on disk."""
         if self._tmpfile is None:
             self._tmpfile = tempfile.NamedTemporaryFile(delete=False, mode="w+", encoding="utf-8")
+            # Backstop: guarantee the spill file is removed when this accumulator
+            # is garbage-collected, even if the caller abandons the stream
+            # generator (e.g. client disconnect on a StreamingResponse).
+            self._finalizer = weakref.finalize(self, _safe_remove, self._tmpfile.name)
 
         self._disk_lines += len(self._buffer)
         self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in self._buffer)
@@ -91,12 +117,25 @@ def _capture(self) -> None:
             self._flush_buffer_to_disk()
 
     def _cleanup(self) -> None:
-        """Clean up the temporary file if it exists."""
+        """
+        Release the temp file and detach the finalizer; safe to call repeatedly.
+
+        State is reset before any I/O so that a failing ``close()`` (e.g. the
+        final flush hitting a full disk) still removes the spill file and
+        leaves the accumulator clean for later calls; the error propagates.
+        """
         self._buffer.clear()
-        if self._tmpfile:
-            self._tmpfile.close()
-            os.remove(self._tmpfile.name)
-            self._tmpfile = None
+        tmpfile, self._tmpfile = self._tmpfile, None
+        finalizer, self._finalizer = self._finalizer, None
+        if finalizer is not None:
+            # Removal is guaranteed below, so the GC backstop is no longer needed.
+            finalizer.detach()
+        if tmpfile is not None:
+            path = tmpfile.name
+            try:
+                tmpfile.close()
+            finally:
+                _safe_remove(path)
 
     @property
     def total_lines(self) -> int:
@@ -119,6 +158,11 @@ def stream(self) -> StructuredLogStream:
         Returns:
             A stream of the captured log messages.
         """
+        self._stream_accessed = True
+        return self._iter_stream()
+
+    def _iter_stream(self) -> StructuredLogStream:
+        """Yield the captured logs; the generator's ``finally`` releases the spill file."""
         try:
             if not self._tmpfile:
                 # if no temporary file was created, return from the buffer
@@ -147,9 +191,13 @@ def __enter__(self) -> Self:
 
     def __exit__(self, exc_type, exc_val, exc_tb) -> None:
         """
-        Context manager exit that doesn't perform resource cleanup.
+        Context manager exit.
 
-        Note: Resources are not cleaned up here. Cleanup is deferred until
-        get_stream() is called and fully consumed, ensuring all logs are properly
-        yielded before cleanup occurs.
+        Cleans up immediately when an exception is propagating, or when the
+        caller never accessed .stream (so no generator exists to run the
+        deferred finally block). When the caller did access .stream,
+        cleanup is left to the generator's finally (happy path) with
+        weakref.finalize as a backstop if the generator is abandoned.
         """
+        if exc_type is not None or not self._stream_accessed:
+            self._cleanup()
diff --git a/airflow-core/tests/unit/utils/log/test_stream_accumulator.py b/airflow-core/tests/unit/utils/log/test_stream_accumulator.py
index b1d2df09c884c..e813c5f19cd24 100644
--- a/airflow-core/tests/unit/utils/log/test_stream_accumulator.py
+++ b/airflow-core/tests/unit/utils/log/test_stream_accumulator.py
@@ -17,7 +17,9 @@
 
 from __future__ import annotations
 
+import gc
 import os
+import weakref
 from typing import TYPE_CHECKING
 from unittest import mock
 
@@ -74,15 +76,18 @@ def test__capture(self, structured_logs):
             with accumulator:
                 mock_setup.assert_called_once()
 
-    def test__flush_buffer_to_disk(self, structured_logs):
+    def test__flush_buffer_to_disk(self, structured_logs, tmp_path):
         """Test flush-to-disk behavior with a small threshold."""
         threshold = 6
 
-        # Mock the temporary file to verify it's being written to
-        with (
-            mock.patch("tempfile.NamedTemporaryFile") as mock_tmpfile,
-        ):
+        # Use a real path on tmp_path so __exit__ cleanup (which now runs when
+        # the stream is never accessed) can remove the file without raising.
+        fake_spill = tmp_path / "fake_spill.json"
+        fake_spill.touch()
+
+        with mock.patch("tempfile.NamedTemporaryFile") as mock_tmpfile:
             mock_file = mock.MagicMock()
+            mock_file.name = str(fake_spill)
             mock_tmpfile.return_value = mock_file
 
             with LogStreamAccumulator(structured_logs, threshold) as accumulator:
@@ -163,3 +168,84 @@ def test__cleanup(self, structured_logs):
 
             # After fully consuming the stream, cleanup should be called
             mock_cleanup.assert_called_once()
+
+    def test_cleanup_when_stream_never_accessed(self, structured_logs):
+        """Temp file is removed on __exit__ when caller never reads .stream (e.g. uses only total_lines)."""
+
+        with LogStreamAccumulator(structured_logs, 5) as accumulator:
+            assert accumulator._tmpfile is not None
+            tmpfile_name = accumulator._tmpfile.name
+            assert os.path.exists(tmpfile_name)
+            assert accumulator.total_lines == LOG_COUNT
+
+        assert accumulator._tmpfile is None
+        assert not os.path.exists(tmpfile_name)
+
+    def test_cleanup_on_exception_in_with_block(self, structured_logs):
+        """An exception propagating through __exit__ overrides the deferred-cleanup path."""
+
+        accumulator = LogStreamAccumulator(structured_logs, 5)
+        accumulator.__enter__()
+        _ = accumulator.stream
+        assert accumulator._tmpfile is not None
+        tmpfile_name = accumulator._tmpfile.name
+        assert os.path.exists(tmpfile_name)
+
+        accumulator.__exit__(RuntimeError, RuntimeError("boom"), None)
+
+        assert not os.path.exists(tmpfile_name)
+        assert accumulator._tmpfile is None
+
+    def test_cleanup_deferred_when_stream_returned_then_iterated(self, structured_logs):
+        """Stream returned from inside the with block must remain readable after __exit__."""
+
+        with LogStreamAccumulator(structured_logs, 5) as accumulator:
+            returned_stream = accumulator.stream
+            assert accumulator._tmpfile is not None
+            tmpfile_name = accumulator._tmpfile.name
+            assert os.path.exists(tmpfile_name)
+
+        assert os.path.exists(tmpfile_name)
+
+        self.validate_log_stream(returned_stream)
+
+        assert not os.path.exists(tmpfile_name)
+
+    def test_cleanup_via_finalizer_on_abandoned_generator(self, structured_logs):
+        """When _cleanup is suppressed, the weakref backstop still removes the spill on GC."""
+
+        def make_and_abandon() -> tuple[str, weakref.ref]:
+            acc = LogStreamAccumulator(structured_logs, 5)
+            # Disable explicit cleanup so only the weakref backstop can remove the file.
+            acc._cleanup = lambda: None  # type: ignore[method-assign]
+            acc.__enter__()
+            _gen = acc.stream
+            assert acc._tmpfile is not None
+            path = acc._tmpfile.name
+            acc.__exit__(None, None, None)
+            # _gen, acc go out of scope on return — finalizer should fire.
+            return path, weakref.ref(acc)
+
+        tmpfile_name, acc_ref = make_and_abandon()
+
+        gc.collect()
+
+        assert acc_ref() is None, "accumulator should have been garbage collected"
+        assert not os.path.exists(tmpfile_name), "weakref.finalize backstop should have removed the file"
+
+    def test_cleanup_idempotent(self, structured_logs):
+        """Repeated _cleanup calls do not raise."""
+
+        accumulator = LogStreamAccumulator(structured_logs, 5)
+        accumulator._capture()
+        assert accumulator._tmpfile is not None
+        tmpfile_name = accumulator._tmpfile.name
+        assert os.path.exists(tmpfile_name)
+
+        accumulator._cleanup()
+        assert not os.path.exists(tmpfile_name)
+        assert accumulator._tmpfile is None
+        assert accumulator._finalizer is None
+
+        accumulator._cleanup()
+        accumulator._cleanup()