diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 145c091e60054..1df1e4bfbb631 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -472,6 +472,7 @@ devel DevOps devtools df +dictConfig dicts Dingding dingding diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 5bc286cb9631d..d620bd335571f 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -91,6 +91,10 @@ class CloudWatchRemoteLogIO(LoggingMixin): # noqa: D101 log_stream_name: str = "" log_group: str = attrs.field(init=False, repr=False) region_name: str = attrs.field(init=False, repr=False) + _cached_handler: watchtower.CloudWatchLogHandler | None = attrs.field( + init=False, default=None, repr=False + ) + _closed: bool = attrs.field(init=False, default=False, repr=False) @log_group.default def _(self): @@ -107,8 +111,7 @@ def hook(self): aws_conn_id=conf.get("logging", "remote_log_conn_id"), region_name=self.region_name ) - @cached_property - def handler(self) -> watchtower.CloudWatchLogHandler: + def _build_handler(self) -> watchtower.CloudWatchLogHandler: _json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer", fallback=None) return watchtower.CloudWatchLogHandler( log_group_name=self.log_group, @@ -118,6 +121,17 @@ def handler(self) -> watchtower.CloudWatchLogHandler: json_serialize_default=_json_serialize or json_serialize_legacy, ) + @property + def handler(self) -> watchtower.CloudWatchLogHandler: + """Return the streaming handler, rebuilding it if dictConfig closed it mid-task.""" + # dictConfig's non-incremental reset closes every handler in logging._handlerList, + # leaving this one with shutting_down=True (it then silently drops every record). + # Rebuild only while the IO is live: once close() has run, keep the closed handler so a + # late record is dropped instead of spawning an orphan handler + background thread. + if self._cached_handler is None or (not self._closed and self._cached_handler.shutting_down): + self._cached_handler = self._build_handler() + return self._cached_handler + @cached_property def processors(self) -> tuple[structlog.typing.Processor, ...]: from logging import getLogRecordFactory @@ -127,16 +141,19 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: logRecordFactory = getLogRecordFactory() # The handler MUST be initted here, before the processor is actually used to log anything. # Otherwise, logging that occurs during the creation of the handler can create infinite loops. - _handler = self.handler + _ = self.handler from airflow.sdk.log import relative_path_from_logger def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict): if not logger or not (stream_name := relative_path_from_logger(logger)): return event + # Resolve the handler on every record: configure_logging() may have + # closed the one built above, in which case ``handler`` rebuilds it. + handler = self.handler # We can't set the log stream name in the above init handler because # the log path isn't known at that stage. # Instead, we should always rely on the path (log stream name) provided by the logger. - _handler.log_stream_name = stream_name.as_posix().replace(":", "_") + handler.log_stream_name = stream_name.as_posix().replace(":", "_") name = event.get("logger_name") or event.get("logger", "") level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO) msg = copy.copy(event) @@ -151,20 +168,21 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct ct = created.timestamp() record.created = ct record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging - _handler.handle(record) + handler.handle(record) return event return (proc,) def close(self): - # Use the flush method to ensure all logs are sent to CloudWatch. - # Closing the handler sets `shutting_down` to True, which prevents any further logs from being sent. - # When `shutting_down` is True, means the logging system is in the process of shutting down, - # during which it attempts to flush the logs which are queued. - if self.handler is None or self.handler.shutting_down: + # Terminal flush — only ever called from upload(). Mark the IO closed first so `handler` + # stops rebuilding: a record arriving after teardown must be dropped, not revive a fresh + # handler. Read the cached handler directly so we never build one just to flush it. + self._closed = True + handler = self._cached_handler + if handler is None or handler.shutting_down: return - self.handler.flush() + handler.flush() def upload(self, path: os.PathLike | str, ti: RuntimeTI | None = None) -> None: """Upload the given log path to the remote storage.""" @@ -314,8 +332,12 @@ def close(self): if self.closed: return - if self.handler is not None: - self.handler.close() + # Close the handler the IO is actually using now, not the reference captured in + # set_context(): dictConfig may have closed that one and the IO rebuilt since, and + # closing the stale handler would leak the live handler's background thread. + live_handler = self.io._cached_handler + if live_handler is not None: + live_handler.close() if hasattr(self, "ti"): try: self.io.upload(self.log_relative_path, self.ti) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index c13515e044039..1786e7c06045a 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -242,6 +242,47 @@ def test_log_message(self): '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n' ] + @time_machine.travel(datetime(2025, 3, 27, 21, 58, 1, 2345), tick=False) + def test_log_message_after_handler_closed_by_dictconfig(self): + # configure_logging() ends in logging.config.dictConfig(), whose + # _clearExistingHandlers closes every handler in logging._handlerList, + # including the streaming watchtower handler built moments earlier. The + # processor must rebuild it instead of feeding the closed one (which + # silently drops every record), otherwise no task log ever ships. + with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): + import structlog + + closed = self.subject.handler + closed.close() + assert closed.shutting_down is True + + log = structlog.get_logger() + log.info("Hi", foo="bar") + self.subject.close() + + # A fresh handler was built rather than reusing the closed one. + assert self.subject.handler is not closed + assert self.subject.handler.shutting_down is False + + stream_name = self.task_log_path.replace(":", "_") + _, logs = self.subject.read(stream_name, self.ti) + assert logs == [ + '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n' + ] + + def test_handler_not_rebuilt_after_close(self): + # Once the IO has been closed, a closed handler must NOT be rebuilt: a record arriving + # after teardown should be dropped silently rather than spin up an orphan handler and its + # background queue thread. Only dictConfig closing it mid-task should trigger a rebuild. + with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): + original = self.subject.handler + self.subject.close() + original.close() + assert original.shutting_down is True + + assert self.subject.handler is original + assert self.subject.handler.shutting_down is True + @pytest.mark.db_test class TestCloudwatchTaskHandler: @@ -509,6 +550,23 @@ def test_close_calls_upload_once(self): self.cloudwatch_task_handler.log_relative_path, self.ti ) + def test_close_closes_live_io_handler_after_rebuild(self): + """close() closes the handler the IO is currently using, not a stale captured reference.""" + handler = self.cloudwatch_task_handler + with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"): + with mock.patch.object(handler.io, "upload"): + handler.set_context(self.ti) + stale = handler.handler + # Simulate dictConfig closing the handler mid-task and the IO rebuilding it. + stale.close() + rebuilt = handler.io._cached_handler = handler.io._build_handler() + assert rebuilt is not stale + + handler.close() + + # The live (rebuilt) handler is the one that gets closed, not the stale reference. + assert rebuilt.shutting_down is True + def test_close_skips_upload_without_set_context(self): """close() without a prior set_context() should not call io.upload().""" with mock.patch.object(self.cloudwatch_task_handler.io, "upload") as mock_upload: