From ad9c3aa42ee2bee9267e272f7bf843178842d442 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Sat, 9 May 2026 19:53:17 +0100 Subject: [PATCH 01/11] Fix CloudWatch/Watchtower logs dropped in Task SDK due to handler lifetime bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three related bugs all produce WatchtowerWarning: "Received message after logging system shutdown", causing CloudWatch log streams to be truncated or never created. Bug 1 (task-sdk/src/airflow/sdk/log.py): In configure_logging(), remote.processors was accessed before the inner configure_logging() call ran dictConfig(). dictConfig() calls _clearExistingHandlers() -> logging.shutdown() on all existing handlers, killing the just-created watchtower handler (shutting_down=True) before any task log is emitted. Fix: move getattr(remote, "processors") to after dictConfig runs, injecting via a second structlog.configure() call. Bug 2 (providers/amazon/.../cloudwatch_task_handler.py): CloudWatchRemoteLogIO.handler was a cached_property — once killed by logging.shutdown() the dead instance was never replaced. The processors cached_property also captured _handler in a closure, pinning the dead instance. Fix: convert handler to a regular property that recreates when shutting_down=True; access self.handler dynamically in the processors closure. Bug 3 (task-sdk/src/airflow/sdk/execution_time/supervisor.py): _configure_logging() returned a plain tuple with no cleanup path. The only code that ever closed the remote handler was logging.shutdown() at process exit, which fired after supervise_task() returned while final task messages were still in flight. Fix: convert _configure_logging() to a contextmanager that explicitly flushes and closes the handler after process.wait() drains all task log messages. Fixes: #66475 --- .../amazon/aws/log/cloudwatch_task_handler.py | 30 ++++++-- .../src/airflow_shared/logging/remote.py | 4 + .../airflow/sdk/execution_time/supervisor.py | 76 ++++++++++++++----- task-sdk/src/airflow/sdk/log.py | 19 ++++- 4 files changed, 100 insertions(+), 29 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 055cfd4493306..eb062783bcb32 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -107,8 +107,11 @@ def hook(self): aws_conn_id=conf.get("logging", "remote_log_conn_id"), region_name=self.region_name ) - @cached_property - def handler(self) -> watchtower.CloudWatchLogHandler: + _handler_instance: watchtower.CloudWatchLogHandler | None = attrs.field( + default=None, init=False, repr=False + ) + + def _create_handler(self) -> watchtower.CloudWatchLogHandler: _json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer", fallback=None) return watchtower.CloudWatchLogHandler( log_group_name=self.log_group, @@ -118,6 +121,16 @@ def handler(self) -> watchtower.CloudWatchLogHandler: json_serialize_default=_json_serialize or json_serialize_legacy, ) + @property + def handler(self) -> watchtower.CloudWatchLogHandler: + # Defensive self-healing: if the handler was killed by logging.shutdown() + # (shutting_down=True), recreate it. This can happen if dictConfig() is called + # after the handler was first created, since dictConfig calls + # _clearExistingHandlers() -> logging.shutdown() on all existing handlers. + if self._handler_instance is None or self._handler_instance.shutting_down: + self._handler_instance = self._create_handler() + return self._handler + @cached_property def processors(self) -> tuple[structlog.typing.Processor, ...]: from logging import getLogRecordFactory @@ -125,9 +138,12 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: import structlog.stdlib logRecordFactory = getLogRecordFactory() - # The handler MUST be initted here, before the processor is actually used to log anything. - # Otherwise, logging that occurs during the creation of the handler can create infinite loops. - _handler = self.handler + # Eagerly init the handler to avoid infinite loops from logging during handler creation. + # We do NOT capture it in a closure variable — instead we access self.handler each time + # so that if the handler is killed by logging.shutdown() and recreated, the processor + # always uses the live instance rather than a dead one.# + _ = self.handler + _self = self from airflow.sdk.log import relative_path_from_logger def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict): @@ -136,7 +152,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct # We can't set the log stream name in the above init handler because # the log path isn't known at that stage. # Instead, we should always rely on the path (log stream name) provided by the logger. - _handler.log_stream_name = stream_name.as_posix().replace(":", "_") + _self.handler.log_stream_name = stream_name.as_posix().replace(":", "_") name = event.get("logger_name") or event.get("logger", "") level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO) msg = copy.copy(event) @@ -151,7 +167,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct ct = created.timestamp() record.created = ct record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging - _handler.handle(record) + _self.handler.handle(record) return event return (proc,) diff --git a/shared/logging/src/airflow_shared/logging/remote.py b/shared/logging/src/airflow_shared/logging/remote.py index d8d76ef6a078c..cea20c41477f2 100644 --- a/shared/logging/src/airflow_shared/logging/remote.py +++ b/shared/logging/src/airflow_shared/logging/remote.py @@ -64,6 +64,10 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: """Read logs from the given remote log path.""" ... + def close(self) -> None: + """Flush and close the remote log handler.""" + ... + @runtime_checkable class RemoteLogStreamIO(RemoteLogIO, Protocol): diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 528215797595a..9eb8c2fd49345 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -170,6 +170,7 @@ from typing_extensions import Self from airflow.executors.workloads import BundleInfo + from airflow.sdk._shared.logging.remote import RemoteLogIO from airflow.sdk.bases.secrets_backend import BaseSecretsBackend from airflow.sdk.definitions.connection import Connection from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO @@ -2385,11 +2386,31 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: return ensure_secrets_loaded(default_backends=fallback_backends) -def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO | TextIO]: +def _close_remote_log_handler(handler: RemoteLogIO) -> None: + """ + Close the remote log handler explicitly after all task log messages have been drained from the subprocess pipe. + + Called after process.wait() returns, before process exit triggers + logging.shutdown(). This ensures the remote handler's internal batch + queue is flushed before the process tears down. For example, the AWS + CloudWatch logger will emit: + + WatchtowerWarning: "Received message after logging system shutdown" + + if this is not done before process exit. + """ + try: + handler.close() + except Exception: + log.warning("Failed to close remote log handler", exc_info=True) + + +@contextlib.contextmanager +def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoundLogger, None, None]: # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it # lands on the same node as before. - from airflow.sdk.log import init_log_file, logging_processors + from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors log_file_descriptor: BinaryIO | TextIO | None = None @@ -2405,9 +2426,29 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog with _remote_logging_conn(client): processors = logging_processors(json_output=json_logs) + logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() - return logger, log_file_descriptor + try: + yield logger + finally: + # Flush and close the remote handler now — AFTER the supervisor has + # drained all task log messages from the subprocess pipe (i.e. after + # process.wait() has returned). + # + # Without this, the only thing that ever closes the handler is + # Python's logging.shutdown() at process exit, which fires after + # supervise_task() returns. Any messages still queued in the handler + # at that point are silently dropped, producing: + # + # WatchtowerWarning: "Received message after logging system shutdown" + remote_handler = load_remote_log_handler() + if remote_handler is not None: + _close_remote_log_handler(remote_handler) + + if log_file_descriptor is not None: + with contextlib.suppress(Exception): + log_file_descriptor.close() def supervise_task( @@ -2495,12 +2536,6 @@ def supervise_task( with _ensure_client(server, token, client=client, dry_run=dry_run) as client: start = time.monotonic() - # TODO: Use logging providers to handle the chunked upload for us etc. - logger: FilteringBoundLogger | None = None - log_file_descriptor: BinaryIO | TextIO | None = None - if log_path: - logger, log_file_descriptor = _configure_logging(log_path, client) - backends = ensure_secrets_backend_loaded() log.info( "Secrets backends loaded for worker", @@ -2511,15 +2546,18 @@ def supervise_task( reset_secrets_masker() try: - result = coordinator.execute_task( - what=ti, - dag_rel_path=dag_rel_path, - bundle_info=bundle_info, - client=client, - logger=logger, - sentry_integration=sentry_integration, - subprocess_logs_to_stdout=subprocess_logs_to_stdout, - ) + with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger: + result = coordinator.execute_task( + what=ti, + dag_rel_path=dag_rel_path, + bundle_info=bundle_info, + client=client, + logger=logger, + sentry_integration=sentry_integration, + subprocess_logs_to_stdout=subprocess_logs_to_stdout, + ) + # _configure_logging.__exit__ fires here, remote handler is + # flushed and closed after all task log messages are drained. end = time.monotonic() log.info( "Workload finished", @@ -2531,8 +2569,6 @@ def supervise_task( ) return result.exit_code finally: - if log_path and log_file_descriptor: - log_file_descriptor.close() provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 7391335b7f50b..4c0a50ee0b422 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -118,8 +118,14 @@ def configure_logging( if mask_secrets: extra_processors += (mask_logs,) - if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")): - extra_processors += remote_processors + # NOTE: Do NOT call getattr(remote, "processors") here. + # Accessing remote.processors triggers creation of the watchtower CloudWatchLogHandler + # via a cached_property. The configure_logging() call below runs dictConfig() internally, + # which calls _clearExistingHandlers() -> logging.shutdown() on ALL existing handlers — + # including the watchtower handler we would have just built. The handler ends up dead + # (shutting_down=True) before a single task log is emitted. + # See: https://github.com/apache/airflow/issues/66475 + # Remote processors are injected AFTER dictConfig via a second structlog.configure() call below. configure_logging( json_output=json_output, @@ -133,6 +139,15 @@ def configure_logging( callsite_parameters=callsite_params, ) + # dictConfig has now run — safe to build the watchtower handler. + # We append remote processors into the already-configured structlog chain, + # inserting before the final renderer (last processor in the chain). + if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")): + current_processors = list(structlog.get_config()["processors"]) + # Insert before the final renderer + updated_processors = current_processors[:-1] + list(remote_processors) + [current_processors[-1]] + structlog.configure(processors=updated_processors) + def logger_at_level(name: str, level: int) -> Logger: """Create a new logger at the given level.""" From dd641f4a5a9310a6775bbbb508441938198273b0 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Sat, 9 May 2026 20:40:37 +0100 Subject: [PATCH 02/11] Fix test stubs to implement updated RemoteLogIO protocol with close() --- shared/logging/tests/logging/test_remote.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/shared/logging/tests/logging/test_remote.py b/shared/logging/tests/logging/test_remote.py index a8f1d7ab29091..8bb3ec6281b42 100644 --- a/shared/logging/tests/logging/test_remote.py +++ b/shared/logging/tests/logging/test_remote.py @@ -33,6 +33,9 @@ def upload(self, path, ti): def read(self, relative_path, ti): return ([], []) + def close(self) -> None: + pass + class TestDiscoverRemoteLogHandler: def test_discovers_handler_and_conn_id(self): @@ -160,6 +163,9 @@ def read(self, relative_path, ti): def stream(self, relative_path, ti): return ([], []) + def close(self) -> None: + pass + handler = StreamHandler() assert isinstance(handler, RemoteLogStreamIO) From 5f89059228ce4ba5aea79a5434e59f4d0d4a4547 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Sun, 10 May 2026 09:02:17 +0100 Subject: [PATCH 03/11] Fix attrs field declaration for self-healing handler in CloudWatchRemoteLogIO --- .../providers/amazon/aws/log/cloudwatch_task_handler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index eb062783bcb32..7f32a06a6b0df 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -107,9 +107,7 @@ def hook(self): aws_conn_id=conf.get("logging", "remote_log_conn_id"), region_name=self.region_name ) - _handler_instance: watchtower.CloudWatchLogHandler | None = attrs.field( - default=None, init=False, repr=False - ) + _handler: watchtower.CloudWatchLogHandler | None = attrs.field(default=None, init=False, repr=False) def _create_handler(self) -> watchtower.CloudWatchLogHandler: _json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer", fallback=None) @@ -127,8 +125,8 @@ def handler(self) -> watchtower.CloudWatchLogHandler: # (shutting_down=True), recreate it. This can happen if dictConfig() is called # after the handler was first created, since dictConfig calls # _clearExistingHandlers() -> logging.shutdown() on all existing handlers. - if self._handler_instance is None or self._handler_instance.shutting_down: - self._handler_instance = self._create_handler() + if self._handler is None or self._handler.shutting_down: + self._handler = self._create_handler() return self._handler @cached_property From 2e504ff4570f4275600ff05a09950e5873cff531 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Sun, 10 May 2026 09:47:46 +0100 Subject: [PATCH 04/11] Add close() to ElAdd close() to ElasticsearchRemoteLogIO and OpensearchRemoteLogIO to satisfy updated RemoteLogIO protocol --- .../airflow/providers/elasticsearch/log/es_task_handler.py | 4 ++++ .../src/airflow/providers/opensearch/log/os_task_handler.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 8aad101b44d86..fc8fedb758807 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -908,3 +908,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: # callback should get the Hit class if "from_es" is not defined callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) return callback(hit) + + def close(self) -> None: + """No-op: logs are streamed in real time and require no explicit close.""" + pass diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index 728c9c809868d..bc748224ad45c 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -1008,3 +1008,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) return callback(hit) + + def close(self) -> None: + """No-op: logs are streamed in real time and require no explicit close.""" + pass From 142192e1a787b8ff4b6797a2e6743345ce840541 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Wed, 13 May 2026 00:18:07 +0100 Subject: [PATCH 05/11] Add regression tests for self-healing handler and dynamic processor access --- .../aws/log/test_cloudwatch_task_handler.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index c13515e044039..98db9348be9ad 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -242,6 +242,62 @@ def test_log_message(self): '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n' ] + def test_handler_recreates_after_shutdown(self): + """ + Handler was a cached_property, once killed by logging.shutdown() + the dead instance was never replaced. Verify that accessing handler + after shutting_down=True returns a new live instance. + """ + with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): + # Access handler to initialize it + first_handler = self.subject.handler + assert first_handler.shutting_down is False + + # Simulate what dictConfig() does, calls logging.shutdown() on + # all registered handlers, setting shutting_down=True + first_handler.shutting_down = True + + # Accessing handler now must return a new live instance, not the dead one + second_handler = self.subject.handler + assert second_handler is not first_handler + assert second_handler.shutting_down is False + + second_handler.close() + + def test_processor_uses_live_handler_after_shutdown(self): + """ + Processors captured _handler in a closure, pinning the dead instance + even after the property recreated a new one. Verify that the processor + emits to the live handler after a shutdown/recreate cycle. + """ + with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): + # Get the processors tuple, this eagerly initializes the handler + procs = self.subject.processors + assert len(procs) == 1 + + # Simulate dictConfig() killing the handler + first_handler = self.subject.handler + first_handler.shutting_down = True + + # Access handler to trigger recreation + second_handler = self.subject.handler + assert second_handler is not first_handler + + # The processor must now emit to the live second_handler, not the dead first_handler + with mock.patch.object(second_handler, "handle") as mock_handle: + with mock.patch.object(first_handler, "handle") as mock_dead_handle: + import structlog + + log = structlog.get_logger() + log.info("test after recreate", foo="baz") + + # Live handler should receive the record + mock_handle.assert_called_once() + # Dead handler must not receive anything + mock_dead_handle.assert_not_called() + + second_handler.close() + @pytest.mark.db_test class TestCloudwatchTaskHandler: From 0b60d296106d3d3e411da63e9cfa60e4e8ab7fa9 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Wed, 13 May 2026 09:58:59 +0100 Subject: [PATCH 06/11] Remove dead isinstance guard in _close_remote_log_handler --- .../airflow/sdk/execution_time/supervisor.py | 107 ++++++++++++++---- 1 file changed, 87 insertions(+), 20 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 9eb8c2fd49345..e47079dbcc6f9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -145,7 +145,6 @@ handle_get_ti_count, handle_get_variable, handle_get_variable_keys, - handle_get_xcom, handle_get_xcom_count, handle_get_xcom_sequence_item, handle_get_xcom_sequence_slice, @@ -484,7 +483,8 @@ def exit(n: int) -> NoReturn: except Exception as e: with suppress(Exception): print( - f"--- Last chance exception handler failed --- {repr(str(e))}\n", file=last_chance_stderr + f"--- Last chance exception handler failed --- {repr(str(e))}\n", + file=last_chance_stderr, ) exit(125) @@ -722,13 +722,22 @@ def start( # execv replaces the process -- unreachable on success else: # Run the child entrypoint - _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target) + _fork_main( + child_requests, + child_stdout, + child_stderr, + child_logs.fileno(), + target, + ) except BaseException as e: import traceback with suppress(BaseException): # We can't use log here, as if we except out of the child something _weird_ went on. - print("Exception in child process, exiting with code 124", file=sys.stderr) + print( + "Exception in child process, exiting with code 124", + file=sys.stderr, + ) traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr) # It's really super super important we never exit this block. We are in the forked child, and if we @@ -805,7 +814,8 @@ def _register_pipe_readers( logs, selectors.EVENT_READ, make_buffered_socket_reader( - process_log_messages_from_subprocess(target_loggers), on_close=self._on_socket_closed + process_log_messages_from_subprocess(target_loggers), + on_close=self._on_socket_closed, ), ) self.selector.register( @@ -999,7 +1009,11 @@ def kill( return # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL - escalation_path: list[signal.Signals] = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] + escalation_path: list[signal.Signals] = [ + signal.SIGINT, + signal.SIGTERM, + signal.SIGKILL, + ] if force and signal_to_send in escalation_path: # Start from `signal_to_send` and escalate to the end of the escalation path @@ -1020,10 +1034,17 @@ def kill( # read from any of the sockets, so we need to re-run it if the process is still alive if ( exit_code := self._service_subprocess( - max_wait_time=end - now, raise_on_timeout=False, expect_signal=sig + max_wait_time=end - now, + raise_on_timeout=False, + expect_signal=sig, ) ) is not None: - log.info("Process exited", pid=self.pid, exit_code=exit_code, signal_sent=sig.name) + log.info( + "Process exited", + pid=self.pid, + exit_code=exit_code, + signal_sent=sig.name, + ) return now = time.monotonic() @@ -1057,7 +1078,10 @@ def __repr__(self) -> str: return rep + " >" def _service_subprocess( - self, max_wait_time: float, raise_on_timeout: bool = False, expect_signal: None | int = None + self, + max_wait_time: float, + raise_on_timeout: bool = False, + expect_signal: None | int = None, ): """ Service subprocess events by processing socket activity and checking for process exit. @@ -1333,7 +1357,12 @@ def start( # type: ignore[override] # infrastructure; keep bare fork for those. use_exec = target is _subprocess_main and sys.platform in _FORK_EXEC_PLATFORMS proc: Self = super().start( - id=what.id, client=client, target=target, logger=logger, use_exec=use_exec, **kwargs + id=what.id, + client=client, + target=target, + logger=logger, + use_exec=use_exec, + **kwargs, ) # Tell the task process what it needs to do! proc._on_child_started( @@ -1388,7 +1417,10 @@ def _on_child_started( self.send_msg(msg, request_id=0) except (BrokenPipeError, ConnectionResetError): # Debug is fine, the process will have shown _something_ in it's last_chance exception handler - log.debug("Couldn't send startup message to Subprocess - it died very early", pid=self.pid) + log.debug( + "Couldn't send startup message to Subprocess - it died very early", + pid=self.pid, + ) def wait(self) -> int: if self._exit_code is not None: @@ -1615,7 +1647,11 @@ def _send_heartbeat_if_needed(self): # Reset the counter on success self.failed_heartbeats = 0 except ServerResponseError as e: - if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.GONE, HTTPStatus.CONFLICT}: + if e.response.status_code in { + HTTPStatus.NOT_FOUND, + HTTPStatus.GONE, + HTTPStatus.CONFLICT, + }: log.error( "Server indicated the task shouldn't be running anymore", detail=e.detail, @@ -1648,7 +1684,8 @@ def _handle_heartbeat_failures(self, exc: Exception): # If we've failed to heartbeat too many times, kill the process if self.failed_heartbeats >= MAX_FAILED_HEARTBEATS: log.error( - "Too many failed heartbeats; terminating process", failed_heartbeats=self.failed_heartbeats + "Too many failed heartbeats; terminating process", + failed_heartbeats=self.failed_heartbeats, ) self.kill(signal.SIGTERM, force=True) @@ -1707,7 +1744,18 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: elif isinstance(msg, GetVariableKeys): resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, GetXCom): - resp, dump_opts = handle_get_xcom(self.client, msg) + xcom = self.client.xcoms.get( + msg.dag_id, + msg.run_id, + msg.task_id, + msg.key, + msg.map_index, + msg.include_prior_dates, + ) + xcom_result = XComResult.from_xcom_response(xcom) + resp = xcom_result + elif isinstance(msg, GetXComCount): + resp = self.client.xcoms.head(msg.dag_id, msg.run_id, msg.task_id, msg.key) elif isinstance(msg, GetXComSequenceItem): resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg) elif isinstance(msg, GetXComSequenceSlice): @@ -1779,7 +1827,13 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( - msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note + msg.dag_id, + msg.run_id, + msg.conf, + msg.logical_date, + msg.run_after, + msg.reset_dag_run, + msg.note, ) elif isinstance(msg, GetDagRun): dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id) @@ -1908,7 +1962,8 @@ def _send_new_log_fd(self, req_id: int) -> None: read_logs, selectors.EVENT_READ, make_buffered_socket_reader( - process_log_messages_from_subprocess(target_loggers), on_close=self._on_socket_closed + process_log_messages_from_subprocess(target_loggers), + on_close=self._on_socket_closed, ), ) # We don't explicitly close the old log socket, that will get handled for us if/when the other end is @@ -2041,7 +2096,11 @@ def start( # type: ignore[override] **kwargs, ) - from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, finalize, run + from airflow.sdk.execution_time.task_runner import ( + RuntimeTaskInstance, + finalize, + run, + ) supervisor.comms = InProcessSupervisorComms(supervisor=supervisor) with set_supervisor_comms(supervisor.comms): @@ -2105,7 +2164,11 @@ def _api_client(dag=None): return client def send_msg( - self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts + self, + msg: BaseModel | None, + request_id: int, + error: ErrorResponse | None = None, + **dump_opts, ): """Override to use in-process comms.""" self.comms.messages.append(msg) @@ -2410,7 +2473,11 @@ def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoun # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it # lands on the same node as before. - from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors + from airflow.sdk.log import ( + init_log_file, + load_remote_log_handler, + logging_processors, + ) log_file_descriptor: BinaryIO | TextIO | None = None @@ -2439,7 +2506,7 @@ def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoun # Without this, the only thing that ever closes the handler is # Python's logging.shutdown() at process exit, which fires after # supervise_task() returns. Any messages still queued in the handler - # at that point are silently dropped, producing: + # at that point are silently dropped. The AWS CloudWatch logger for example will emit # # WatchtowerWarning: "Received message after logging system shutdown" remote_handler = load_remote_log_handler() From 84597f229ecc29e6b94c74871b7f1ed275150293 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Wed, 13 May 2026 10:02:22 +0100 Subject: [PATCH 07/11] Address review comments from o-nikolas and 1fanwang --- .../amazon/aws/log/cloudwatch_task_handler.py | 10 +++++----- task-sdk/src/airflow/sdk/log.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 7f32a06a6b0df..ab2421486a3f3 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -136,12 +136,12 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: import structlog.stdlib logRecordFactory = getLogRecordFactory() - # Eagerly init the handler to avoid infinite loops from logging during handler creation. - # We do NOT capture it in a closure variable — instead we access self.handler each time - # so that if the handler is killed by logging.shutdown() and recreated, the processor - # always uses the live instance rather than a dead one.# - _ = self.handler + # Eagerly initialize the handler before the closure is created. + # Without this, the first log emission would trigger handler creation, + # which itself logs, causing an infinite loop. + self._handler _self = self + from airflow.sdk.log import relative_path_from_logger def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict): diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py index 4c0a50ee0b422..411588dd6d06f 100644 --- a/task-sdk/src/airflow/sdk/log.py +++ b/task-sdk/src/airflow/sdk/log.py @@ -119,7 +119,7 @@ def configure_logging( extra_processors += (mask_logs,) # NOTE: Do NOT call getattr(remote, "processors") here. - # Accessing remote.processors triggers creation of the watchtower CloudWatchLogHandler + # Accessing remote.processors triggers creation of, for example, the watchtower CloudWatchLogHandler # via a cached_property. The configure_logging() call below runs dictConfig() internally, # which calls _clearExistingHandlers() -> logging.shutdown() on ALL existing handlers — # including the watchtower handler we would have just built. The handler ends up dead From 2506df08890ff590b78065559f93f070944b04b7 Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Thu, 14 May 2026 08:39:22 +0100 Subject: [PATCH 08/11] Fix re-entrant handler creation recursion in CloudWatchRemoteLogIO.handler --- .../amazon/aws/log/cloudwatch_task_handler.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index ab2421486a3f3..33af1a873a822 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -30,6 +30,8 @@ from typing import TYPE_CHECKING, Any import attrs +import structlog +import structlog.typing import watchtower from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook @@ -39,8 +41,6 @@ from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: - import structlog.typing - from airflow.models.taskinstance import TaskInstance from airflow.providers.amazon.aws.hooks.logs import CloudWatchLogEvent from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI @@ -119,14 +119,23 @@ def _create_handler(self) -> watchtower.CloudWatchLogHandler: json_serialize_default=_json_serialize or json_serialize_legacy, ) + _handler_creating: bool = attrs.field(default=False, init=False, repr=False) + @property def handler(self) -> watchtower.CloudWatchLogHandler: - # Defensive self-healing: if the handler was killed by logging.shutdown() - # (shutting_down=True), recreate it. This can happen if dictConfig() is called - # after the handler was first created, since dictConfig calls - # _clearExistingHandlers() -> logging.shutdown() on all existing handlers. + if self._handler_creating: + # Re-entrant call during handler creation, some libraries log internally + # during initialization, which triggers this process again before + # handler is fully constructed. + if self._handler is not None and isinstance(self._handler, watchtower.CloudWatchLogHandler): + return self._handler + raise structlog.DropEvent() if self._handler is None or self._handler.shutting_down: - self._handler = self._create_handler() + self._handler_creating = True + try: + self._handler = self._create_handler() + finally: + self._handler_creating = False return self._handler @cached_property From c2e6283fbaff1fc9b24504c2064cf437e646b0ee Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Fri, 5 Jun 2026 17:06:41 +0100 Subject: [PATCH 09/11] Address ashb review: fix ordering, simplify context manager, log close errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Restore original _configure_logging call site ordering so secrets masker reset happens after logging is configured, not before - Simplify _configure_logging to yield only logger, not a tuple — file descriptor cleanup is internal to the context manager - Replace contextlib.suppress(Exception) in _close_remote_log_handler with a logged warning so close errors are visible for debugging - Revert f-string formatting change in _fork_main - Move import structlog.typing back into TYPE_CHECKING block - Revert handler to cached_property in CloudWatchRemoteLogIO, removing the self-healing property complexity introduced earlier - Keep only the processors closure fix: access self.handler dynamically instead of captured _handler variable - Remove tests for the self-healing property that is being reverted --- .../amazon/aws/log/cloudwatch_task_handler.py | 45 ++----- .../aws/log/test_cloudwatch_task_handler.py | 56 -------- .../airflow/sdk/execution_time/supervisor.py | 122 +++++------------- 3 files changed, 49 insertions(+), 174 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 33af1a873a822..7d0f1805c4e3d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -41,6 +41,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin if TYPE_CHECKING: + import structlog.typing + from airflow.models.taskinstance import TaskInstance from airflow.providers.amazon.aws.hooks.logs import CloudWatchLogEvent from airflow.sdk.types import RuntimeTaskInstanceProtocol as RuntimeTI @@ -107,9 +109,8 @@ def hook(self): aws_conn_id=conf.get("logging", "remote_log_conn_id"), region_name=self.region_name ) - _handler: watchtower.CloudWatchLogHandler | None = attrs.field(default=None, init=False, repr=False) - - def _create_handler(self) -> watchtower.CloudWatchLogHandler: + @cached_property + def handler(self) -> watchtower.CloudWatchLogHandler: _json_serialize = conf.getimport("aws", "cloudwatch_task_handler_json_serializer", fallback=None) return watchtower.CloudWatchLogHandler( log_group_name=self.log_group, @@ -119,25 +120,6 @@ def _create_handler(self) -> watchtower.CloudWatchLogHandler: json_serialize_default=_json_serialize or json_serialize_legacy, ) - _handler_creating: bool = attrs.field(default=False, init=False, repr=False) - - @property - def handler(self) -> watchtower.CloudWatchLogHandler: - if self._handler_creating: - # Re-entrant call during handler creation, some libraries log internally - # during initialization, which triggers this process again before - # handler is fully constructed. - if self._handler is not None and isinstance(self._handler, watchtower.CloudWatchLogHandler): - return self._handler - raise structlog.DropEvent() - if self._handler is None or self._handler.shutting_down: - self._handler_creating = True - try: - self._handler = self._create_handler() - finally: - self._handler_creating = False - return self._handler - @cached_property def processors(self) -> tuple[structlog.typing.Processor, ...]: from logging import getLogRecordFactory @@ -145,21 +127,22 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]: import structlog.stdlib logRecordFactory = getLogRecordFactory() + # Eagerly initialize the handler before the closure is created. - # Without this, the first log emission would trigger handler creation, - # which itself logs, causing an infinite loop. - self._handler - _self = self + # Without this, the first log emission triggers handler creation, + # which itself logs internally during client initialization, + # causing an infinite loop. + self.handler from airflow.sdk.log import relative_path_from_logger def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict): if not logger or not (stream_name := relative_path_from_logger(logger)): return event - # We can't set the log stream name in the above init handler because - # the log path isn't known at that stage. - # Instead, we should always rely on the path (log stream name) provided by the logger. - _self.handler.log_stream_name = stream_name.as_posix().replace(":", "_") + # Access handler via self.handler each time rather than the + # closure-captured _handler, so the processor always uses the + # live instance rather than a potentially stale one. + self.handler.log_stream_name = stream_name.as_posix().replace(":", "_") name = event.get("logger_name") or event.get("logger", "") level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO) msg = copy.copy(event) @@ -174,7 +157,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct ct = created.timestamp() record.created = ct record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging - _self.handler.handle(record) + self.handler.handle(record) return event return (proc,) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 98db9348be9ad..c13515e044039 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -242,62 +242,6 @@ def test_log_message(self): '{"foo": "bar", "event": "Hi", "level": "info", "timestamp": "2025-03-27T21:58:01.002000+00:00"}\n' ] - def test_handler_recreates_after_shutdown(self): - """ - Handler was a cached_property, once killed by logging.shutdown() - the dead instance was never replaced. Verify that accessing handler - after shutting_down=True returns a new live instance. - """ - with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): - # Access handler to initialize it - first_handler = self.subject.handler - assert first_handler.shutting_down is False - - # Simulate what dictConfig() does, calls logging.shutdown() on - # all registered handlers, setting shutting_down=True - first_handler.shutting_down = True - - # Accessing handler now must return a new live instance, not the dead one - second_handler = self.subject.handler - assert second_handler is not first_handler - assert second_handler.shutting_down is False - - second_handler.close() - - def test_processor_uses_live_handler_after_shutdown(self): - """ - Processors captured _handler in a closure, pinning the dead instance - even after the property recreated a new one. Verify that the processor - emits to the live handler after a shutdown/recreate cycle. - """ - with conf_vars({("logging", "base_log_folder"): self.local_log_location.as_posix()}): - # Get the processors tuple, this eagerly initializes the handler - procs = self.subject.processors - assert len(procs) == 1 - - # Simulate dictConfig() killing the handler - first_handler = self.subject.handler - first_handler.shutting_down = True - - # Access handler to trigger recreation - second_handler = self.subject.handler - assert second_handler is not first_handler - - # The processor must now emit to the live second_handler, not the dead first_handler - with mock.patch.object(second_handler, "handle") as mock_handle: - with mock.patch.object(first_handler, "handle") as mock_dead_handle: - import structlog - - log = structlog.get_logger() - log.info("test after recreate", foo="baz") - - # Live handler should receive the record - mock_handle.assert_called_once() - # Dead handler must not receive anything - mock_dead_handle.assert_not_called() - - second_handler.close() - @pytest.mark.db_test class TestCloudwatchTaskHandler: diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index e47079dbcc6f9..8b0fb6ad5c15d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -483,8 +483,7 @@ def exit(n: int) -> NoReturn: except Exception as e: with suppress(Exception): print( - f"--- Last chance exception handler failed --- {repr(str(e))}\n", - file=last_chance_stderr, + f"--- Last chance exception handler failed --- {repr(str(e))}\n", file=last_chance_stderr ) exit(125) @@ -722,22 +721,13 @@ def start( # execv replaces the process -- unreachable on success else: # Run the child entrypoint - _fork_main( - child_requests, - child_stdout, - child_stderr, - child_logs.fileno(), - target, - ) + _fork_main(child_requests, child_stdout, child_stderr, child_logs.fileno(), target) except BaseException as e: import traceback with suppress(BaseException): # We can't use log here, as if we except out of the child something _weird_ went on. - print( - "Exception in child process, exiting with code 124", - file=sys.stderr, - ) + print("Exception in child process, exiting with code 124", file=sys.stderr) traceback.print_exception(type(e), e, e.__traceback__, file=sys.stderr) # It's really super super important we never exit this block. We are in the forked child, and if we @@ -796,9 +786,13 @@ def _register_pipe_readers( target_loggers = self._get_target_loggers() self.selector.register( +<<<<<<< HEAD stdout, selectors.EVENT_READ, self._create_log_forwarder(target_loggers, "task.stdout", data=data.get(stdout, b"")), +======= + stdout, selectors.EVENT_READ, self._create_log_forwarder(target_loggers, "task.stdout") +>>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) ) self.selector.register( stderr, @@ -814,8 +808,7 @@ def _register_pipe_readers( logs, selectors.EVENT_READ, make_buffered_socket_reader( - process_log_messages_from_subprocess(target_loggers), - on_close=self._on_socket_closed, + process_log_messages_from_subprocess(target_loggers), on_close=self._on_socket_closed ), ) self.selector.register( @@ -848,9 +841,13 @@ def _create_log_forwarder( for log in loggers ) return make_buffered_socket_reader( +<<<<<<< HEAD forward_to_log(loggers, logger=name, level=log_level), data=data, on_close=self._on_socket_closed, +======= + forward_to_log(loggers, logger=name, level=log_level), on_close=self._on_socket_closed +>>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) ) def _on_socket_closed(self, sock: socket): @@ -866,11 +863,7 @@ def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) -> di return msg.model_dump(**dump_opts) def send_msg( - self, - msg: BaseModel | None, - request_id: int, - error: ErrorResponse | None = None, - **dump_opts, + self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts ): """ Send the msg as a length-prefixed response frame. @@ -1009,11 +1002,7 @@ def kill( return # Escalation sequence: SIGINT -> SIGTERM -> SIGKILL - escalation_path: list[signal.Signals] = [ - signal.SIGINT, - signal.SIGTERM, - signal.SIGKILL, - ] + escalation_path: list[signal.Signals] = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL] if force and signal_to_send in escalation_path: # Start from `signal_to_send` and escalate to the end of the escalation path @@ -1034,17 +1023,10 @@ def kill( # read from any of the sockets, so we need to re-run it if the process is still alive if ( exit_code := self._service_subprocess( - max_wait_time=end - now, - raise_on_timeout=False, - expect_signal=sig, + max_wait_time=end - now, raise_on_timeout=False, expect_signal=sig ) ) is not None: - log.info( - "Process exited", - pid=self.pid, - exit_code=exit_code, - signal_sent=sig.name, - ) + log.info("Process exited", pid=self.pid, exit_code=exit_code, signal_sent=sig.name) return now = time.monotonic() @@ -1078,10 +1060,7 @@ def __repr__(self) -> str: return rep + " >" def _service_subprocess( - self, - max_wait_time: float, - raise_on_timeout: bool = False, - expect_signal: None | int = None, + self, max_wait_time: float, raise_on_timeout: bool = False, expect_signal: None | int = None ): """ Service subprocess events by processing socket activity and checking for process exit. @@ -1357,12 +1336,7 @@ def start( # type: ignore[override] # infrastructure; keep bare fork for those. use_exec = target is _subprocess_main and sys.platform in _FORK_EXEC_PLATFORMS proc: Self = super().start( - id=what.id, - client=client, - target=target, - logger=logger, - use_exec=use_exec, - **kwargs, + id=what.id, client=client, target=target, logger=logger, use_exec=use_exec, **kwargs ) # Tell the task process what it needs to do! proc._on_child_started( @@ -1417,10 +1391,7 @@ def _on_child_started( self.send_msg(msg, request_id=0) except (BrokenPipeError, ConnectionResetError): # Debug is fine, the process will have shown _something_ in it's last_chance exception handler - log.debug( - "Couldn't send startup message to Subprocess - it died very early", - pid=self.pid, - ) + log.debug("Couldn't send startup message to Subprocess - it died very early", pid=self.pid) def wait(self) -> int: if self._exit_code is not None: @@ -1647,11 +1618,7 @@ def _send_heartbeat_if_needed(self): # Reset the counter on success self.failed_heartbeats = 0 except ServerResponseError as e: - if e.response.status_code in { - HTTPStatus.NOT_FOUND, - HTTPStatus.GONE, - HTTPStatus.CONFLICT, - }: + if e.response.status_code in {HTTPStatus.NOT_FOUND, HTTPStatus.GONE, HTTPStatus.CONFLICT}: log.error( "Server indicated the task shouldn't be running anymore", detail=e.detail, @@ -1684,8 +1651,7 @@ def _handle_heartbeat_failures(self, exc: Exception): # If we've failed to heartbeat too many times, kill the process if self.failed_heartbeats >= MAX_FAILED_HEARTBEATS: log.error( - "Too many failed heartbeats; terminating process", - failed_heartbeats=self.failed_heartbeats, + "Too many failed heartbeats; terminating process", failed_heartbeats=self.failed_heartbeats ) self.kill(signal.SIGTERM, force=True) @@ -1745,12 +1711,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, GetXCom): xcom = self.client.xcoms.get( - msg.dag_id, - msg.run_id, - msg.task_id, - msg.key, - msg.map_index, - msg.include_prior_dates, + msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates ) xcom_result = XComResult.from_xcom_response(xcom) resp = xcom_result @@ -1827,13 +1788,7 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( - msg.dag_id, - msg.run_id, - msg.conf, - msg.logical_date, - msg.run_after, - msg.reset_dag_run, - msg.note, + msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note ) elif isinstance(msg, GetDagRun): dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id) @@ -1962,8 +1917,7 @@ def _send_new_log_fd(self, req_id: int) -> None: read_logs, selectors.EVENT_READ, make_buffered_socket_reader( - process_log_messages_from_subprocess(target_loggers), - on_close=self._on_socket_closed, + process_log_messages_from_subprocess(target_loggers), on_close=self._on_socket_closed ), ) # We don't explicitly close the old log socket, that will get handled for us if/when the other end is @@ -2096,11 +2050,7 @@ def start( # type: ignore[override] **kwargs, ) - from airflow.sdk.execution_time.task_runner import ( - RuntimeTaskInstance, - finalize, - run, - ) + from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, finalize, run supervisor.comms = InProcessSupervisorComms(supervisor=supervisor) with set_supervisor_comms(supervisor.comms): @@ -2164,11 +2114,7 @@ def _api_client(dag=None): return client def send_msg( - self, - msg: BaseModel | None, - request_id: int, - error: ErrorResponse | None = None, - **dump_opts, + self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts ): """Override to use in-process comms.""" self.comms.messages.append(msg) @@ -2473,11 +2419,7 @@ def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoun # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it # lands on the same node as before. - from airflow.sdk.log import ( - init_log_file, - load_remote_log_handler, - logging_processors, - ) + from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors log_file_descriptor: BinaryIO | TextIO | None = None @@ -2506,8 +2448,9 @@ def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoun # Without this, the only thing that ever closes the handler is # Python's logging.shutdown() at process exit, which fires after # supervise_task() returns. Any messages still queued in the handler - # at that point are silently dropped. The AWS CloudWatch logger for example will emit - # + # at that point are silently dropped. For example, the AWS CloudWatch + # logger will emit: + # WatchtowerWarning: "Received message after logging system shutdown" remote_handler = load_remote_log_handler() if remote_handler is not None: @@ -2614,7 +2557,12 @@ def supervise_task( try: with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger: +<<<<<<< HEAD result = coordinator.execute_task( +======= + process = ActivitySubprocess.start( + dag_rel_path=dag_rel_path, +>>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) what=ti, dag_rel_path=dag_rel_path, bundle_info=bundle_info, From 93a67a78e9c59d95e9e60eb634e5a6bb1451332e Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Sat, 6 Jun 2026 12:02:08 +0100 Subject: [PATCH 10/11] Resolve conflict: use coordinator.execute_task with _configure_logging wrapper --- .../airflow/sdk/execution_time/supervisor.py | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 8b0fb6ad5c15d..d4b961de8db0f 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -145,6 +145,7 @@ handle_get_ti_count, handle_get_variable, handle_get_variable_keys, + handle_get_xcom, handle_get_xcom_count, handle_get_xcom_sequence_item, handle_get_xcom_sequence_slice, @@ -786,13 +787,9 @@ def _register_pipe_readers( target_loggers = self._get_target_loggers() self.selector.register( -<<<<<<< HEAD stdout, selectors.EVENT_READ, self._create_log_forwarder(target_loggers, "task.stdout", data=data.get(stdout, b"")), -======= - stdout, selectors.EVENT_READ, self._create_log_forwarder(target_loggers, "task.stdout") ->>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) ) self.selector.register( stderr, @@ -841,13 +838,9 @@ def _create_log_forwarder( for log in loggers ) return make_buffered_socket_reader( -<<<<<<< HEAD forward_to_log(loggers, logger=name, level=log_level), data=data, on_close=self._on_socket_closed, -======= - forward_to_log(loggers, logger=name, level=log_level), on_close=self._on_socket_closed ->>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) ) def _on_socket_closed(self, sock: socket): @@ -1710,13 +1703,9 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: elif isinstance(msg, GetVariableKeys): resp, dump_opts = handle_get_variable_keys(self.client, msg) elif isinstance(msg, GetXCom): - xcom = self.client.xcoms.get( - msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates - ) - xcom_result = XComResult.from_xcom_response(xcom) - resp = xcom_result + resp, dump_opts = handle_get_xcom(self.client, msg) elif isinstance(msg, GetXComCount): - resp = self.client.xcoms.head(msg.dag_id, msg.run_id, msg.task_id, msg.key) + resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, GetXComSequenceItem): resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg) elif isinstance(msg, GetXComSequenceSlice): @@ -2557,12 +2546,7 @@ def supervise_task( try: with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger: -<<<<<<< HEAD result = coordinator.execute_task( -======= - process = ActivitySubprocess.start( - dag_rel_path=dag_rel_path, ->>>>>>> 904f1ea7a9 (Address ashb review: fix ordering, simplify context manager, log close errors) what=ti, dag_rel_path=dag_rel_path, bundle_info=bundle_info, From e4262d2924dd7146bccbad14d9835e6a8ff289ab Mon Sep 17 00:00:00 2001 From: Idris Akorede Ibrahim Date: Wed, 10 Jun 2026 09:22:34 +0100 Subject: [PATCH 11/11] Clean up merge artifacts in task execution handler - Remove duplicate GetXComCount handler in _handler_request - Fix with-statement syntax by wrapping conditional context manager - Eliminate leftover merge-conflict artifacts --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index d4b961de8db0f..5f3a3bf4693ef 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1773,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: dump_opts = {"exclude_unset": True} elif isinstance(msg, GetPrevSuccessfulDagRun): resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id) - elif isinstance(msg, GetXComCount): - resp, dump_opts = handle_get_xcom_count(self.client, msg) elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note