Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ devel
DevOps
devtools
df
dictConfig
dicts
Dingding
dingding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101
log_stream_name: str = ""
log_group: str = attrs.field(init=False, repr=False)
region_name: str = attrs.field(init=False, repr=False)
_cached_handler: watchtower.CloudWatchLogHandler | None = attrs.field(
init=False, default=None, repr=False
)
_closed: bool = attrs.field(init=False, default=False, repr=False)

@log_group.default
def _(self):
Expand All @@ -107,8 +111,7 @@ def hook(self):
aws_conn_id=conf.get("logging", "remote_log_conn_id"), region_name=self.region_name
)

@cached_property
def handler(self) -> watchtower.CloudWatchLogHandler:
def _build_handler(self) -> watchtower.CloudWatchLogHandler:
_json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer", fallback=None)
return watchtower.CloudWatchLogHandler(
log_group_name=self.log_group,
Expand All @@ -118,6 +121,17 @@ def handler(self) -> watchtower.CloudWatchLogHandler:
json_serialize_default=_json_serialize or json_serialize_legacy,
)

@property
def handler(self) -> watchtower.CloudWatchLogHandler:
"""Return the streaming handler, rebuilding it if dictConfig closed it mid-task."""
# dictConfig's non-incremental reset closes every handler in logging._handlerList,
# leaving this one with shutting_down=True (it then silently drops every record).
# Rebuild only while the IO is live: once close() has run, keep the closed handler so a
# late record is dropped instead of spawning an orphan handler + background thread.
if self._cached_handler is None or (not self._closed and self._cached_handler.shutting_down):
self._cached_handler = self._build_handler()
return self._cached_handler

@cached_property
def processors(self) -> tuple[structlog.typing.Processor, ...]:
from logging import getLogRecordFactory
Expand All @@ -127,16 +141,19 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]:
logRecordFactory = getLogRecordFactory()
# The handler MUST be initted here, before the processor is actually used to log anything.
# Otherwise, logging that occurs during the creation of the handler can create infinite loops.
_handler = self.handler
_ = self.handler
from airflow.sdk.log import relative_path_from_logger

def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict):
if not logger or not (stream_name := relative_path_from_logger(logger)):
return event
# Resolve the handler on every record: configure_logging() may have
# closed the one built above, in which case ``handler`` rebuilds it.
handler = self.handler
# We can't set the log stream name in the above init handler because
# the log path isn't known at that stage.
# Instead, we should always rely on the path (log stream name) provided by the logger.
_handler.log_stream_name = stream_name.as_posix().replace(":", "_")
handler.log_stream_name = stream_name.as_posix().replace(":", "_")
name = event.get("logger_name") or event.get("logger", "")
level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO)
msg = copy.copy(event)
Expand All @@ -151,20 +168,21 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
ct = created.timestamp()
record.created = ct
record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging
_handler.handle(record)
handler.handle(record)
return event

return (proc,)

def close(self):
# Use the flush method to ensure all logs are sent to CloudWatch.
# Closing the handler sets `shutting_down` to True, which prevents any further logs from being sent.
# When `shutting_down` is True, means the logging system is in the process of shutting down,
# during which it attempts to flush the logs which are queued.
if self.handler is None or self.handler.shutting_down:
# Terminal flush — only ever called from upload(). Mark the IO closed first so `handler`
# stops rebuilding: a record arriving after teardown must be dropped, not revive a fresh
# handler. Read the cached handler directly so we never build one just to flush it.
self._closed = True
handler = self._cached_handler
if handler is None or handler.shutting_down:
return

self.handler.flush()
handler.flush()

def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None:
"""Upload the given log path to the remote storage."""
Expand Down Expand Up @@ -314,8 +332,12 @@ def close(self):
if self.closed:
return

if self.handler is not None:
self.handler.close()
# Close the handler the IO is actually using now, not the reference captured in
# set_context(): dictConfig may have closed that one and the IO rebuilt since, and
# closing the stale handler would leak the live handler's background thread.
live_handler = self.io._cached_handler
if live_handler is not None:
live_handler.close()
if hasattr(self, "ti"):
try:
self.io.upload(self.log_relative_path, self.ti)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,47 @@ def test_log_message(self):
'{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n'
]

@time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False)
def test_log_message_after_handler_closed_by_dictconfig(self):
# configure_logging() ends in logging.config.dictConfig(), whose
# _clearExistingHandlers closes every handler in logging._handlerList,
# including the streaming watchtower handler built moments earlier. The
# processor must rebuild it instead of feeding the closed one (which
# silently drops every record), otherwise no task log ever ships.
with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
import structlog

closed = self.subject.handler
closed.close()
assert closed.shutting_down is True

log = structlog.get_logger()
log.info("Hi", foo="bar")
self.subject.close()

# A fresh handler was built rather than reusing the closed one.
assert self.subject.handler is not closed
assert self.subject.handler.shutting_down is False

stream_name = self.task_log_path.replace(":", "_")
_, logs = self.subject.read(stream_name, self.ti)
assert logs == [
'{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n'
]

def test_handler_not_rebuilt_after_close(self):
# Once the IO has been closed, a closed handler must NOT be rebuilt: a record arriving
# after teardown should be dropped silently rather than spin up an orphan handler and its
# background queue thread. Only dictConfig closing it mid-task should trigger a rebuild.
with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}):
original = self.subject.handler
self.subject.close()
original.close()
assert original.shutting_down is True

assert self.subject.handler is original
assert self.subject.handler.shutting_down is True


@pytest.mark.db_test
class TestCloudwatchTaskHandler:
Expand Down Expand Up @@ -509,6 +550,23 @@ def test_close_calls_upload_once(self):
self.cloudwatch_task_handler.log_relative_path, self.ti
)

def test_close_closes_live_io_handler_after_rebuild(self):
"""close() closes the handler the IO is currently using, not a stale captured reference."""
handler = self.cloudwatch_task_handler
with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
with mock.patch.object(handler.io, "upload"):
handler.set_context(self.ti)
stale = handler.handler
# Simulate dictConfig closing the handler mid-task and the IO rebuilding it.
stale.close()
rebuilt = handler.io._cached_handler = handler.io._build_handler()
assert rebuilt is not stale

handler.close()

# The live (rebuilt) handler is the one that gets closed, not the stale reference.
assert rebuilt.shutting_down is True

def test_close_skips_upload_without_set_context(self):
"""close() without a prior set_context() should not call io.upload()."""
with mock.patch.object(self.cloudwatch_task_handler.io, "upload") as mock_upload:
Expand Down
Loading