Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions airflow-core/src/airflow/utils/log/log_stream_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

from __future__ import annotations

import logging
import os
import tempfile
import weakref
from itertools import islice
from typing import IO, TYPE_CHECKING

Expand All @@ -31,6 +33,24 @@
StructuredLogStream,
)

logger = logging.getLogger(__name__)


def _safe_remove(path: str) -> None:
"""
Remove path, swallowing missing-file errors and logging others.

Module-level rather than a method so weakref.finalize can hold it
without retaining a strong reference to the owning accumulator (which
would defeat the backstop).
"""
try:
os.remove(path)
except FileNotFoundError:
pass
except OSError as exc:
logger.debug("LogStreamAccumulator: could not remove temp file %s: %s", path, exc)


class LogStreamAccumulator:
"""
Expand Down Expand Up @@ -69,11 +89,17 @@ def __init__(
self._buffer: list[StructuredLogMessage] = []
self._disk_lines: int = 0
self._tmpfile: IO[str] | None = None
self._stream_accessed: bool = False
self._finalizer: weakref.finalize | None = None

def _flush_buffer_to_disk(self) -> None:
"""Flush the buffer contents to a temporary file on disk."""
if self._tmpfile is None:
self._tmpfile = tempfile.NamedTemporaryFile(delete=False, mode="w+", encoding="utf-8")
# Backstop: guarantee the spill file is removed when this accumulator
# is garbage-collected, even if the caller abandons the stream
# generator (e.g. client disconnect on a StreamingResponse).
self._finalizer = weakref.finalize(self, _safe_remove, self._tmpfile.name)

self._disk_lines += len(self._buffer)
self._tmpfile.writelines(f"{log.model_dump_json()}\n" for log in self._buffer)
Expand All @@ -91,12 +117,16 @@ def _capture(self) -> None:
self._flush_buffer_to_disk()

def _cleanup(self) -> None:
"""Clean up the temporary file if it exists."""
"""Release the temp file and detach the finalizer; safe to call repeatedly."""
self._buffer.clear()
if self._tmpfile:
if self._tmpfile is not None:
path = self._tmpfile.name
self._tmpfile.close()
os.remove(self._tmpfile.name)
_safe_remove(path)
self._tmpfile = None
if self._finalizer is not None:
self._finalizer.detach()
self._finalizer = None

@property
def total_lines(self) -> int:
Expand All @@ -119,6 +149,10 @@ def stream(self) -> StructuredLogStream:
Returns:
A stream of the captured log messages.
"""
self._stream_accessed = True
return self._iter_stream()

def _iter_stream(self) -> StructuredLogStream:
try:
if not self._tmpfile:
# if no temporary file was created, return from the buffer
Expand Down Expand Up @@ -147,9 +181,13 @@ def __enter__(self) -> Self:

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""
Context manager exit that doesn't perform resource cleanup.
Context manager exit.

Note: Resources are not cleaned up here. Cleanup is deferred until
get_stream() is called and fully consumed, ensuring all logs are properly
yielded before cleanup occurs.
Cleans up immediately when an exception is propagating, or when the
caller never accessed .stream (so no generator exists to run the
deferred finally block). When the caller did access .stream,
cleanup is left to the generator's finally (happy path) with
weakref.finalize as a backstop if the generator is abandoned.
"""
if exc_type is not None or not self._stream_accessed:
self._cleanup()
96 changes: 91 additions & 5 deletions airflow-core/tests/unit/utils/log/test_stream_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

from __future__ import annotations

import gc
import os
import weakref
from typing import TYPE_CHECKING
from unittest import mock

Expand Down Expand Up @@ -74,15 +76,18 @@ def test__capture(self, structured_logs):
with accumulator:
mock_setup.assert_called_once()

def test__flush_buffer_to_disk(self, structured_logs):
def test__flush_buffer_to_disk(self, structured_logs, tmp_path):
"""Test flush-to-disk behavior with a small threshold."""
threshold = 6

# Mock the temporary file to verify it's being written to
with (
mock.patch("tempfile.NamedTemporaryFile") as mock_tmpfile,
):
# Use a real path on tmp_path so __exit__ cleanup (which now runs when
# the stream is never accessed) can remove the file without raising.
fake_spill = tmp_path / "fake_spill.json"
fake_spill.touch()

with mock.patch("tempfile.NamedTemporaryFile") as mock_tmpfile:
mock_file = mock.MagicMock()
mock_file.name = str(fake_spill)
mock_tmpfile.return_value = mock_file

with LogStreamAccumulator(structured_logs, threshold) as accumulator:
Expand Down Expand Up @@ -163,3 +168,84 @@ def test__cleanup(self, structured_logs):

# After fully consuming the stream, cleanup should be called
mock_cleanup.assert_called_once()

def test_cleanup_when_stream_never_accessed(self, structured_logs):
    """__exit__ must delete the spill file when .stream was never read (caller only used total_lines)."""

    with LogStreamAccumulator(structured_logs, 5) as acc:
        # Small threshold forces a spill to disk, so a temp file must exist.
        assert acc._tmpfile is not None
        spill_path = acc._tmpfile.name
        assert os.path.exists(spill_path)
        assert acc.total_lines == LOG_COUNT

    # No stream generator was handed out, so exit cleans up eagerly.
    assert acc._tmpfile is None
    assert not os.path.exists(spill_path)

def test_cleanup_on_exception_in_with_block(self, structured_logs):
    """A propagating exception forces immediate cleanup even though .stream was accessed."""

    acc = LogStreamAccumulator(structured_logs, 5)
    acc.__enter__()
    # Accessing .stream would normally defer cleanup to the generator.
    _ = acc.stream
    assert acc._tmpfile is not None
    spill_path = acc._tmpfile.name
    assert os.path.exists(spill_path)

    # Simulate an exception escaping the with-block.
    acc.__exit__(RuntimeError, RuntimeError("boom"), None)

    assert acc._tmpfile is None
    assert not os.path.exists(spill_path)

def test_cleanup_deferred_when_stream_returned_then_iterated(self, structured_logs):
    """A stream obtained inside the with-block stays readable after __exit__; cleanup waits for full consumption."""

    with LogStreamAccumulator(structured_logs, 5) as acc:
        handed_out = acc.stream
        assert acc._tmpfile is not None
        spill_path = acc._tmpfile.name
        assert os.path.exists(spill_path)

    # Exiting the context must NOT remove the file — the generator still needs it.
    assert os.path.exists(spill_path)

    # Draining the generator runs its finally-block cleanup.
    self.validate_log_stream(handed_out)
    assert not os.path.exists(spill_path)

def test_cleanup_via_finalizer_on_abandoned_generator(self, structured_logs):
    """With explicit cleanup disabled, garbage collection alone must remove the spill file."""

    def build_then_drop() -> tuple[str, weakref.ref]:
        acc = LogStreamAccumulator(structured_logs, 5)
        # Neuter _cleanup so only the weakref.finalize backstop can act.
        acc._cleanup = lambda: None  # type: ignore[method-assign]
        acc.__enter__()
        _gen = acc.stream
        assert acc._tmpfile is not None
        spill_path = acc._tmpfile.name
        acc.__exit__(None, None, None)
        # Both acc and _gen die when this frame returns.
        return spill_path, weakref.ref(acc)

    spill_path, acc_ref = build_then_drop()
    gc.collect()

    assert acc_ref() is None, "accumulator should have been garbage collected"
    assert not os.path.exists(spill_path), "weakref.finalize backstop should have removed the file"

def test_cleanup_idempotent(self, structured_logs):
    """_cleanup can be invoked any number of times without raising."""

    acc = LogStreamAccumulator(structured_logs, 5)
    acc._capture()
    assert acc._tmpfile is not None
    spill_path = acc._tmpfile.name
    assert os.path.exists(spill_path)

    # First call does the real work and resets all handles.
    acc._cleanup()
    assert not os.path.exists(spill_path)
    assert acc._tmpfile is None
    assert acc._finalizer is None

    # Subsequent calls are no-ops.
    for _ in range(2):
        acc._cleanup()
Loading