From fb804eb5e7e7d828ad9fb2eea92770a5e7525838 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Mon, 13 Apr 2026 10:01:29 +0900 Subject: [PATCH 1/3] fix memory leak in worker caused by log_file_descriptor --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index cd25d9279571b..7bba6ae7dc53d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -47,6 +47,7 @@ import psutil import structlog from pydantic import BaseModel, TypeAdapter +from structlog._output import WRITE_LOCKS from airflow.sdk._shared.logging.structlog import reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError @@ -2260,6 +2261,7 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + WRITE_LOCKS.pop(log_file_descriptor) provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait From 978852351ac6e2939ef0c1c55df22f58f75ea35b Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 23 Apr 2026 13:18:37 +0900 Subject: [PATCH 2/3] fix logic --- task-sdk/src/airflow/sdk/execution_time/supervisor.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 7bba6ae7dc53d..1c20479432c11 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -47,7 +47,6 @@ import psutil import structlog from pydantic import BaseModel, TypeAdapter -from structlog._output import WRITE_LOCKS from airflow.sdk._shared.logging.structlog import reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError @@ -133,10 +132,17 @@ except ImportError: send_fds = None # type: ignore[assignment] +<<<<<<< HEAD from opentelemetry import context as otel_context, trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator _trace_propagator = TraceContextTextMapPropagator() +======= +try: + from structlog._output import WRITE_LOCKS +except ImportError: + WRITE_LOCKS = None # type: ignore[assignment] +>>>>>>> d26179d9e0 (fix logic) if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger, WrappedLogger @@ -2261,7 +2267,8 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - WRITE_LOCKS.pop(log_file_descriptor) + if WRITE_LOCKS is not None: + WRITE_LOCKS.pop(log_file_descriptor, None) provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait From 0a08921db4426d73a6d5969d6b758420a5574c69 Mon Sep 17 00:00:00 2001 From: wjddn279 Date: Thu, 30 Apr 2026 15:23:22 +0900 Subject: [PATCH 3/3] fix logic --- .../logging/src/airflow_shared/logging/structlog.py | 12 +++++++++++- .../sdk/execution_time/callback_supervisor.py | 2 ++ .../src/airflow/sdk/execution_time/supervisor.py | 13 +++---------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index d82ac60526fe9..6677b3234667a 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -28,7 +28,7 @@ from functools import cache, cached_property, partial from pathlib import Path from types import ModuleType -from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast import pygtrie import structlog @@ -803,6 +803,16 @@ def reconfigure_logger( ) +def clear_structlog_shared_lock(log_file_descriptor: IO[Any]): + try: + from structlog._output import WRITE_LOCKS + except ImportError: + WRITE_LOCKS = None # type: ignore[assignment] + + if WRITE_LOCKS is not None: + WRITE_LOCKS.pop(log_file_descriptor, None) + + if __name__ == "__main__": configure_logging( # json_output=True, diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py index 94d84193192db..d698ddfbef317 100644 --- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py @@ -28,6 +28,7 @@ import structlog from pydantic import Field, TypeAdapter +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock from airflow.sdk._shared.module_loading import accepts_context, accepts_keyword_args from airflow.sdk.exceptions import ErrorType from airflow.sdk.execution_time.comms import ( @@ -378,3 +379,4 @@ def supervise_callback( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + clear_structlog_shared_lock(log_file_descriptor) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 1c20479432c11..708f443ea980a 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -48,7 +48,7 @@ import structlog from pydantic import BaseModel, TypeAdapter -from airflow.sdk._shared.logging.structlog import reconfigure_logger +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock, reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError from airflow.sdk.api.datamodels._generated import ( AssetResponse, @@ -132,17 +132,10 @@ except ImportError: send_fds = None # type: ignore[assignment] -<<<<<<< HEAD from opentelemetry import context as otel_context, trace from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator _trace_propagator = TraceContextTextMapPropagator() -======= -try: - from structlog._output import WRITE_LOCKS -except ImportError: - WRITE_LOCKS = None # type: ignore[assignment] ->>>>>>> d26179d9e0 (fix logic) if TYPE_CHECKING: from structlog.typing import FilteringBoundLogger, WrappedLogger @@ -2267,8 +2260,8 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() - if WRITE_LOCKS is not None: - WRITE_LOCKS.pop(log_file_descriptor, None) + clear_structlog_shared_lock(log_file_descriptor) + provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait