diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index d82ac60526fe9..6677b3234667a 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -28,7 +28,7 @@ from functools import cache, cached_property, partial from pathlib import Path from types import ModuleType -from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast +from typing import IO, TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar, cast import pygtrie import structlog @@ -803,6 +803,16 @@ def reconfigure_logger( ) +def clear_structlog_shared_lock(log_file_descriptor: IO[Any]): + try: + from structlog._output import WRITE_LOCKS + except ImportError: + WRITE_LOCKS = None # type: ignore[assignment] + + if WRITE_LOCKS is not None: + WRITE_LOCKS.pop(log_file_descriptor, None) + + if __name__ == "__main__": configure_logging( # json_output=True, diff --git a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py index 94d84193192db..d698ddfbef317 100644 --- a/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py @@ -28,6 +28,7 @@ import structlog from pydantic import Field, TypeAdapter +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock from airflow.sdk._shared.module_loading import accepts_context, accepts_keyword_args from airflow.sdk.exceptions import ErrorType from airflow.sdk.execution_time.comms import ( @@ -378,3 +379,4 @@ def supervise_callback( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + clear_structlog_shared_lock(log_file_descriptor) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index cd25d9279571b..708f443ea980a 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -48,7 +48,7 @@ import structlog from pydantic import BaseModel, TypeAdapter -from airflow.sdk._shared.logging.structlog import reconfigure_logger +from airflow.sdk._shared.logging.structlog import clear_structlog_shared_lock, reconfigure_logger from airflow.sdk.api.client import Client, ServerResponseError from airflow.sdk.api.datamodels._generated import ( AssetResponse, @@ -2260,6 +2260,8 @@ def supervise_task( finally: if log_path and log_file_descriptor: log_file_descriptor.close() + clear_structlog_shared_lock(log_file_descriptor) + provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait