diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py index d5b0b9a8bfe43..d82ac60526fe9 100644 --- a/shared/logging/src/airflow_shared/logging/structlog.py +++ b/shared/logging/src/airflow_shared/logging/structlog.py @@ -60,6 +60,44 @@ LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {} +# ``_parse_path`` was introduced in Python 3.12; older versions use a different +# parsing path (``_flavour.parse_parts``) that does not call ``sys.intern``, +# so the patch is neither necessary nor applicable there. Python 3.14 removed +# the ``sys.intern`` call upstream, so the patch is unnecessary there too. +if sys.version_info < (3, 12) or sys.version_info >= (3, 14): + _PatchedPath = Path # type: ignore[misc, assignment] +else: + + class _PatchedPath(Path): + """ + Backport of Python 3.14's ``PurePath._parse_path`` without ``sys.intern``. + + The ``sys.intern`` call in the stock ``_parse_path`` causes memory + to grow unboundedly in long-running processes. Upstream removed it + in Python 3.14 (https://github.com/python/cpython/issues/119518); + this class applies the same fix for earlier versions. + """ + + @classmethod + def _parse_path(cls, path: str) -> tuple[str, str, list[str]]: + if not path: + return "", "", [] + sep = os.path.sep + altsep = os.path.altsep + if altsep: + path = path.replace(altsep, sep) + drv, root, rel = os.path.splitroot(path) + if not root and drv.startswith(sep) and not drv.endswith(sep): + drv_parts = drv.split(sep) + if len(drv_parts) == 4 and drv_parts[2] not in "?.": + # e.g. //server/share + root = sep + elif len(drv_parts) == 6: + # e.g. //?/unc/server/share + root = sep + return drv, root, [x for x in rel.split(sep) if x and x != "."] + + def _make_airflow_structlogger(min_level): # This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126 # as inspiration @@ -717,8 +755,8 @@ def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: i sure that the same group is set as default group for both - impersonated user and main airflow user. """ - directory = Path(directory) - for parent in reversed(Path(directory).parents): + directory = _PatchedPath(directory) + for parent in reversed(_PatchedPath(directory).parents): parent.mkdir(mode=new_folder_permissions, exist_ok=True) directory.mkdir(mode=new_folder_permissions, exist_ok=True) @@ -737,7 +775,7 @@ def init_log_file( See above ``init_log_folder`` method for more detailed explanation. """ - full_path = Path(base_log_folder, local_relative_path) + full_path = _PatchedPath(base_log_folder, local_relative_path) init_log_folder(full_path.parent, new_folder_permissions) try: