Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from typing import TYPE_CHECKING, Any

import attrs
import structlog
import structlog.typing
Comment on lines +33 to +34

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import structlog
import structlog.typing

import watchtower

from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
Expand Down Expand Up @@ -125,18 +127,22 @@ def processors(self) -> tuple[structlog.typing.Processor, ...]:
import structlog.stdlib

logRecordFactory = getLogRecordFactory()
# The handler MUST be initted here, before the processor is actually used to log anything.
# Otherwise, logging that occurs during the creation of the handler can create infinite loops.
_handler = self.handler

# Eagerly initialize the handler before the closure is created.
# Without this, the first log emission triggers handler creation,
# which itself logs internally during client initialization,
# causing an infinite loop.
self.handler

from airflow.sdk.log import relative_path_from_logger

def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: structlog.typing.EventDict):
if not logger or not (stream_name := relative_path_from_logger(logger)):
return event
# We can't set the log stream name in the above init handler because
# the log path isn't known at that stage.
# Instead, we should always rely on the path (log stream name) provided by the logger.
_handler.log_stream_name = stream_name.as_posix().replace(":", "_")
# Access handler via self.handler each time rather than the
# closure-captured _handler, so the processor always uses the
# live instance rather than a potentially stale one.
self.handler.log_stream_name = stream_name.as_posix().replace(":", "_")
name = event.get("logger_name") or event.get("logger", "")
level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), logging.INFO)
msg = copy.copy(event)
Expand All @@ -151,7 +157,7 @@ def proc(logger: structlog.typing.WrappedLogger, method_name: str, event: struct
ct = created.timestamp()
record.created = ct
record.msecs = int((ct - int(ct)) * 1000) + 0.0 # Copied from stdlib logging
_handler.handle(record)
self.handler.handle(record)
return event

return (proc,)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,3 +908,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
# callback should get the Hit class if "from_es" is not defined
callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
return callback(hit)

def close(self) -> None:
"""No-op: logs are streamed in real time and require no explicit close."""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -1008,3 +1008,7 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:

callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
return callback(hit)

def close(self) -> None:
"""No-op: logs are streamed in real time and require no explicit close."""
pass
4 changes: 4 additions & 0 deletions shared/logging/src/airflow_shared/logging/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse:
"""Read logs from the given remote log path."""
...

def close(self) -> None:
"""Flush and close the remote log handler."""
...
Comment on lines +67 to +69

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to introduce the close method at RemoteLogIO protocol in this PR.
Since it's still nop for ES and OS remote IO implementation.



@runtime_checkable
class RemoteLogStreamIO(RemoteLogIO, Protocol):
Expand Down
6 changes: 6 additions & 0 deletions shared/logging/tests/logging/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def upload(self, path, ti):
def read(self, relative_path, ti):
return ([], [])

def close(self) -> None:
pass


class TestDiscoverRemoteLogHandler:
def test_discovers_handler_and_conn_id(self):
Expand Down Expand Up @@ -160,6 +163,9 @@ def read(self, relative_path, ti):
def stream(self, relative_path, ti):
return ([], [])

def close(self) -> None:
pass

handler = StreamHandler()
assert isinstance(handler, RemoteLogStreamIO)

Expand Down
87 changes: 60 additions & 27 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Comment thread
korex-f marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
from typing_extensions import Self

from airflow.executors.workloads import BundleInfo
from airflow.sdk._shared.logging.remote import RemoteLogIO
from airflow.sdk.bases.secrets_backend import BaseSecretsBackend
from airflow.sdk.definitions.connection import Connection
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
Expand Down Expand Up @@ -855,11 +856,7 @@ def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) -> di
return msg.model_dump(**dump_opts)

def send_msg(
self,
msg: BaseModel | None,
request_id: int,
error: ErrorResponse | None = None,
**dump_opts,
self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts
):
"""
Send the msg as a length-prefixed response frame.
Expand Down Expand Up @@ -1707,6 +1704,8 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, GetXCom):
resp, dump_opts = handle_get_xcom(self.client, msg)
elif isinstance(msg, GetXComCount):
resp, dump_opts = handle_get_xcom_count(self.client, msg)
elif isinstance(msg, GetXComSequenceItem):
resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg)
elif isinstance(msg, GetXComSequenceSlice):
Expand Down Expand Up @@ -1774,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
dump_opts = {"exclude_unset": True}
elif isinstance(msg, GetPrevSuccessfulDagRun):
resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id)
elif isinstance(msg, GetXComCount):
resp, dump_opts = handle_get_xcom_count(self.client, msg)
Comment on lines 858 to -1778

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we revert these non-related changes? (the parameters style of def send_msg( and the order of elif isinstance(msg, GetXComCount): ).

elif isinstance(msg, TriggerDagRun):
resp = self.client.dag_runs.trigger(
msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note
Expand Down Expand Up @@ -2385,11 +2382,31 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
return ensure_secrets_loaded(default_backends=fallback_backends)


def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO | TextIO]:
def _close_remote_log_handler(handler: RemoteLogIO) -> None:
"""
Close the remote log handler explicitly after all task log messages have been drained from the subprocess pipe.

Called after process.wait() returns, before process exit triggers
logging.shutdown(). This ensures the remote handler's internal batch
queue is flushed before the process tears down. For example, the AWS
CloudWatch logger will emit:

WatchtowerWarning: "Received message after logging system shutdown"

if this is not done before process exit.
"""
try:
handler.close()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to have more defensive compatible guard against the handler.close call.
Since not all remote logging handler support close method when using the Task-SDK version including this patch.

Suggested change
handler.close()
if hasattr(handler, "close", None):
handler.close()

except Exception:
log.warning("Failed to close remote log handler", exc_info=True)


@contextlib.contextmanager
def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoundLogger, None, None]:
# If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the
# file though, otherwise when we resume we would lose the logs from the start->deferral segment if it
# lands on the same node as before.
from airflow.sdk.log import init_log_file, logging_processors
from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors

log_file_descriptor: BinaryIO | TextIO | None = None

Expand All @@ -2405,9 +2422,30 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog

with _remote_logging_conn(client):
processors = logging_processors(json_output=json_logs)

logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind()

return logger, log_file_descriptor
try:
yield logger
finally:
# Flush and close the remote handler now — AFTER the supervisor has
# drained all task log messages from the subprocess pipe (i.e. after
# process.wait() has returned).
#
# Without this, the only thing that ever closes the handler is
# Python's logging.shutdown() at process exit, which fires after
# supervise_task() returns. Any messages still queued in the handler
# at that point are silently dropped. For example, the AWS CloudWatch
# logger will emit:

# WatchtowerWarning: "Received message after logging system shutdown"
remote_handler = load_remote_log_handler()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the TODO instead of dropping it silently.

Suggested change
remote_handler = load_remote_log_handler()
# TODO: Use logging providers to handle the chunked upload for us etc.
remote_handler = load_remote_log_handler()

if remote_handler is not None:
Comment on lines +2442 to +2443

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
remote_handler = load_remote_log_handler()
if remote_handler is not None:
if (remote_handler := load_remote_log_handler()) is not None:

_close_remote_log_handler(remote_handler)

if log_file_descriptor is not None:
with contextlib.suppress(Exception):
log_file_descriptor.close()


def supervise_task(
Expand Down Expand Up @@ -2495,12 +2533,6 @@ def supervise_task(
with _ensure_client(server, token, client=client, dry_run=dry_run) as client:
start = time.monotonic()

# TODO: Use logging providers to handle the chunked upload for us etc.
logger: FilteringBoundLogger | None = None
log_file_descriptor: BinaryIO | TextIO | None = None
if log_path:
logger, log_file_descriptor = _configure_logging(log_path, client)

backends = ensure_secrets_backend_loaded()
log.info(
"Secrets backends loaded for worker",
Expand All @@ -2511,15 +2543,18 @@ def supervise_task(
reset_secrets_masker()

try:
result = coordinator.execute_task(
what=ti,
dag_rel_path=dag_rel_path,
bundle_info=bundle_info,
client=client,
logger=logger,
sentry_integration=sentry_integration,
subprocess_logs_to_stdout=subprocess_logs_to_stdout,
)
with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger:
result = coordinator.execute_task(
what=ti,
dag_rel_path=dag_rel_path,
bundle_info=bundle_info,
client=client,
logger=logger,
sentry_integration=sentry_integration,
subprocess_logs_to_stdout=subprocess_logs_to_stdout,
)
# _configure_logging.__exit__ fires here, remote handler is
# flushed and closed after all task log messages are drained.
end = time.monotonic()
log.info(
"Workload finished",
Expand All @@ -2531,8 +2566,6 @@ def supervise_task(
)
return result.exit_code
finally:
if log_path and log_file_descriptor:
log_file_descriptor.close()
provider = trace.get_tracer_provider()
if hasattr(provider, "force_flush"):
provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait
Expand Down
19 changes: 17 additions & 2 deletions task-sdk/src/airflow/sdk/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,14 @@ def configure_logging(
if mask_secrets:
extra_processors += (mask_logs,)

if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
extra_processors += remote_processors
# NOTE: Do NOT call getattr(remote, "processors") here.
# Accessing remote.processors triggers creation of, for example, the watchtower CloudWatchLogHandler
# via a cached_property. The configure_logging() call below runs dictConfig() internally,
# which calls _clearExistingHandlers() -> logging.shutdown() on ALL existing handlers —
# including the watchtower handler we would have just built. The handler ends up dead
# (shutting_down=True) before a single task log is emitted.
# See: https://github.com/apache/airflow/issues/66475
# Remote processors are injected AFTER dictConfig via a second structlog.configure() call below.

configure_logging(
json_output=json_output,
Expand All @@ -133,6 +139,15 @@ def configure_logging(
callsite_parameters=callsite_params,
)

# dictConfig has now run — safe to build the watchtower handler.
# We append remote processors into the already-configured structlog chain,
# inserting before the final renderer (last processor in the chain).
if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
current_processors = list(structlog.get_config()["processors"])
# Insert before the final renderer
updated_processors = current_processors[:-1] + list(remote_processors) + [current_processors[-1]]
structlog.configure(processors=updated_processors)


def logger_at_level(name: str, level: int) -> Logger:
"""Create a new logger at the given level."""
Expand Down
Loading