diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 055cfd4493306..7d0f1805c4e3d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -30,6 +30,8 @@ from typing import TYPE_CHECKING, Any import attrs +import structlog +import structlog.typing import watchtower from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook @@ -125,18 +127,22 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: import structlog.stdlib logRecordFactory = getLogRecordFactory() - # The handler MUST be initted here, before the processor is actually used to log anything. - # Otherwise, logging that occurs during the creation of the handler can create infinite loops. - _handler = self.handler + + # Eagerly initialize the handler before the closure is created. + # Without this, the first log emission triggers handler creation, + # which itself logs internally during client initialization, + # causing an infinite loop. + self.handler + from airflow.sdk.log import relative_path_from_logger def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict): if not logger or not (stream_name := relative_path_from_logger(logger)): return event - # We can't set the log stream name in the above init handler because - # the log path isn't known at that stage. - # Instead, we should always rely on the path (log stream name) provided by the logger. - _handler.log_stream_name = stream_name.as_posix().replace(":", "_") + # Access handler via self.handler each time rather than the + # closure-captured _handler, so the processor always uses the + # live instance rather than a potentially stale one. + self.handler.log_stream_name = stream_name.as_posix().replace(":", "_") name = event.get("logger_name") or event.get("logger", "") level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO) msg = copy.copy(event) @@ -151,7 +157,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct ct = created.timestamp() record.created = ct record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging - _handler.handle(record) + self.handler.handle(record) return event return (proc,) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 8aad101b44d86..fc8fedb758807 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -908,3 +908,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: # callback should get the Hit class if "from_es" is not defined callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) return callback(hit) + + def close(self) -> None: + """No-op: logs are streamed in real time and require no explicit close.""" + pass diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 728c9c809868d..bc748224ad45c 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -1008,3 +1008,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) return callback(hit) + + def close(self) -> None: + """No-op: logs are streamed in real time and require no explicit close.""" + pass diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d8d76ef6a078c..cea20c41477f2 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -64,6 +64,10 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: """Read logs from the given remote log path.""" ... + def close(self) -> None: + """Flush and close the remote log handler.""" + ... + @runtime_checkable class RemoteLogStreamIO(RemoteLogIO, Protocol): diff --git a/shared/logging/tests/logging/test_remote.py b/shared/logging/tests/logging/test_remote.py index a8f1d7ab29091..8bb3ec6281b42 100644 --- a/shared/logging/tests/logging/test_remote.py +++ b/shared/logging/tests/logging/test_remote.py @@ -33,6 +33,9 @@ def upload(self, path, ti): def read(self, relative_path, ti): return ([], []) + def close(self) -> None: + pass + class TestDiscoverRemoteLogHandler: def test_discovers_handler_and_conn_id(self): @@ -160,6 +163,9 @@ def read(self, relative_path, ti): def stream(self, relative_path, ti): return ([], []) + def close(self) -> None: + pass + handler = StreamHandler() assert isinstance(handler, RemoteLogStreamIO) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 528215797595a..5f3a3bf4693ef 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -170,6 +170,7 @@ from typing_extensions import Self from airflow.executors.workloads import BundleInfo + from airflow.sdk._shared.logging.remote import RemoteLogIO from airflow.sdk.bases.secrets_backend import BaseSecretsBackend from airflow.sdk.definitions.connection import Connection from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO @@ -855,11 +856,7 @@ def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) -> di return msg.model_dump(**dump_opts) def send_msg( - self, - msg: BaseModel | None, - request_id: int, - error: ErrorResponse | None = None, - **dump_opts, + self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts ): """ Send the msg as a length-prefixed response frame. @@ -1707,6 +1704,8 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, GetXCom): resp, dump_opts = handle_get_xcom(self.client, msg) + elif isinstance(msg, GetXComCount): + resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, GetXComSequenceItem): resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg) elif isinstance(msg, GetXComSequenceSlice): @@ -1774,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: dump_opts = {"exclude_unset": True} elif isinstance(msg, GetPrevSuccessfulDagRun): resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id) - elif isinstance(msg, GetXComCount): - resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note @@ -2385,11 +2382,31 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: return ensure_secrets_loaded(default_backends=fallback_backends) -def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO | TextIO]: +def _close_remote_log_handler(handler: RemoteLogIO) -> None: + """ + Close the remote log handler explicitly after all task log messages have been drained from the subprocess pipe. + + Called after process.wait() returns, before process exit triggers + logging.shutdown(). This ensures the remote handler's internal batch + queue is flushed before the process tears down. For example, the AWS + CloudWatch logger will emit: + + WatchtowerWarning: "Received message after logging system shutdown" + + if this is not done before process exit. + """ + try: + handler.close() + except Exception: + log.warning("Failed to close remote log handler", exc_info=True) + + +@contextlib.contextmanager +def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoundLogger, None, None]: # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it # lands on the same node as before. - from airflow.sdk.log import init_log_file, logging_processors + from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors log_file_descriptor: BinaryIO | TextIO | None = None @@ -2405,9 +2422,30 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog with _remote_logging_conn(client): processors = logging_processors(json_output=json_logs) + logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() - return logger, log_file_descriptor + try: + yield logger + finally: + # Flush and close the remote handler now — AFTER the supervisor has + # drained all task log messages from the subprocess pipe (i.e. after + # process.wait() has returned). + # + # Without this, the only thing that ever closes the handler is + # Python's logging.shutdown() at process exit, which fires after + # supervise_task() returns. Any messages still queued in the handler + # at that point are silently dropped. For example, the AWS CloudWatch + # logger will emit: + + # WatchtowerWarning: "Received message after logging system shutdown" + remote_handler = load_remote_log_handler() + if remote_handler is not None: + _close_remote_log_handler(remote_handler) + + if log_file_descriptor is not None: + with contextlib.suppress(Exception): + log_file_descriptor.close() def supervise_task( @@ -2495,12 +2533,6 @@ def supervise_task( with _ensure_client(server, token, client=client, dry_run=dry_run) as client: start = time.monotonic() - # TODO: Use logging providers to handle the chunked upload for us etc. - logger: FilteringBoundLogger | None = None - log_file_descriptor: BinaryIO | TextIO | None = None - if log_path: - logger, log_file_descriptor = _configure_logging(log_path, client) - backends = ensure_secrets_backend_loaded() log.info( "Secrets backends loaded for worker", @@ -2511,15 +2543,18 @@ def supervise_task( reset_secrets_masker() try: - result = coordinator.execute_task( - what=ti, - dag_rel_path=dag_rel_path, - bundle_info=bundle_info, - client=client, - logger=logger, - sentry_integration=sentry_integration, - subprocess_logs_to_stdout=subprocess_logs_to_stdout, - ) + with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger: + result = coordinator.execute_task( + what=ti, + dag_rel_path=dag_rel_path, + bundle_info=bundle_info, + client=client, + logger=logger, + sentry_integration=sentry_integration, + subprocess_logs_to_stdout=subprocess_logs_to_stdout, + ) + # _configure_logging.__exit__ fires here, remote handler is + # flushed and closed after all task log messages are drained. end = time.monotonic() log.info( "Workload finished", @@ -2531,8 +2566,6 @@ def supervise_task( ) return result.exit_code finally: - if log_path and log_file_descriptor: - log_file_descriptor.close() provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 7391335b7f50b..411588dd6d06f 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -118,8 +118,14 @@ def configure_logging( if mask_secrets: extra_processors += (mask_logs,) - if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")): - extra_processors += remote_processors + # NOTE: Do NOT call getattr(remote, "processors") here. + # Accessing remote.processors triggers creation of, for example, the watchtower CloudWatchLogHandler + # via a cached_property. The configure_logging() call below runs dictConfig() internally, + # which calls _clearExistingHandlers() -> logging.shutdown() on ALL existing handlers — + # including the watchtower handler we would have just built. The handler ends up dead + # (shutting_down=True) before a single task log is emitted. + # See: https://github.com/apache/airflow/issues/66475 + # Remote processors are injected AFTER dictConfig via a second structlog.configure() call below. configure_logging( json_output=json_output, @@ -133,6 +139,15 @@ def configure_logging( callsite_parameters=callsite_params, ) + # dictConfig has now run — safe to build the watchtower handler. + # We append remote processors into the already-configured structlog chain, + # inserting before the final renderer (last processor in the chain). + if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")): + current_processors = list(structlog.get_config()["processors"]) + # Insert before the final renderer + updated_processors = current_processors[:-1] + list(remote_processors) + [current_processors[-1]] + structlog.configure(processors=updated_processors) + def logger_at_level(name: str, level: int) -> Logger: """Create a new logger at the given level."""