# Net state of the ``_PatchedPath`` helper after the three-commit series by
# wjddn279 (2392d5ef "Fix memory growth from pathlib sys.intern in long-running
# processes", 0e4fefbb "fix supporting python version", 26eb9452 "fix logic")
# against shared/logging/src/airflow_shared/logging/structlog.py.  The same
# series makes ``init_log_folder`` and ``init_log_file`` build their paths with
# ``_PatchedPath`` instead of ``Path``.
#
# ``PurePath._parse_path`` was introduced in Python 3.12, and the
# ``os.path.splitroot`` helper used below is new in 3.12 as well; older
# versions parse through ``_flavour.parse_parts``, which this override cannot
# hook, so the patch is not applicable there.  Python 3.14 removed the
# ``sys.intern`` call upstream, so the patch is unnecessary there too.
#
# The condition is spelled as two plain ``sys.version_info`` comparisons with
# the alias branch first; keep the branches in this order when editing.
if sys.version_info < (3, 12) or sys.version_info >= (3, 14):
    _PatchedPath = Path  # type: ignore[misc, assignment]
else:

    class _PatchedPath(Path):
        """
        Backport of Python 3.14's ``PurePath._parse_path`` without ``sys.intern``.

        The ``sys.intern`` call in the stock ``_parse_path`` causes memory
        to grow unboundedly in long-running processes. Upstream removed it
        in Python 3.14 (https://github.com/python/cpython/issues/119518);
        this class applies the same fix for earlier versions.

        Only path parsing is overridden; every other behaviour is inherited
        from ``Path`` unchanged.
        """

        # Like the stock ``PosixPath`` / ``WindowsPath`` subclasses, declare empty
        # slots so instances do not grow a per-instance ``__dict__``.
        __slots__ = ()

        @classmethod
        def _parse_path(cls, path: str) -> tuple[str, str, list[str]]:
            """
            Split *path* into ``(drive, root, parts)`` without interning the parts.

            :param path: the already-joined path string to parse.
            :return: the drive, the root, and the list of remaining components
                with empty and ``"."`` components dropped.
            """
            if not path:
                return "", "", []
            sep = os.path.sep
            altsep = os.path.altsep
            if altsep:
                # Normalise the alternative separator so only ``sep`` has to
                # be handled below.
                path = path.replace(altsep, sep)
            drv, root, rel = os.path.splitroot(path)
            if not root and drv.startswith(sep) and not drv.endswith(sep):
                # A drive that starts with a separator but has no root gets
                # ``sep`` as its root in the two shapes below.
                drv_parts = drv.split(sep)
                if len(drv_parts) == 4 and drv_parts[2] not in "?.":
                    # e.g. //server/share
                    root = sep
                elif len(drv_parts) == 6:
                    # e.g. //?/unc/server/share
                    root = sep
            # Plain list of components: this is where the stock implementation
            # interned every part.
            return drv, root, [x for x in rel.split(sep) if x and x != "."]