Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions shared/logging/src/airflow_shared/logging/structlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,44 @@
LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}


# ``_parse_path`` was introduced in Python 3.12; older versions use a different
# parsing path (``_flavour.parse_parts``) that does not call ``sys.intern``,
# so the patch is neither necessary nor applicable there. Python 3.14 removed
# the ``sys.intern`` call upstream, so the patch is unnecessary there too.
if sys.version_info < (3, 12) or sys.version_info >= (3, 14):
    # Outside the 3.12-3.13 window the stock ``Path`` is used unchanged.
    _PatchedPath = Path  # type: ignore[misc, assignment]
else:

    class _PatchedPath(Path):
        """
        ``Path`` variant whose ``_parse_path`` does not call ``sys.intern``.

        The stock ``_parse_path`` interns every path component, which makes
        memory grow without bound in long-running processes. Python 3.14
        dropped that call (https://github.com/python/cpython/issues/119518);
        this subclass carries the same parsing logic for earlier versions.
        """

        @classmethod
        def _parse_path(cls, path: str) -> tuple[str, str, list[str]]:
            """Split *path* into ``(drive, root, parts)`` without interning the parts."""
            if not path:
                return "", "", []

            sep = os.path.sep
            altsep = os.path.altsep
            # Normalise the alternative separator (if the platform has one)
            # so that only ``sep`` needs to be handled below.
            normalized = path.replace(altsep, sep) if altsep else path

            drive, root, tail = os.path.splitroot(normalized)

            # A drive that starts with a separator but has no root yet may be
            # a UNC-style drive; give it an explicit root in that case.
            if not root and drive.startswith(sep) and not drive.endswith(sep):
                pieces = drive.split(sep)
                # e.g. //server/share
                is_unc_share = len(pieces) == 4 and pieces[2] not in "?."
                # e.g. //?/unc/server/share
                is_device_unc = len(pieces) == 6
                if is_unc_share or is_device_unc:
                    root = sep

            # Drop empty and "." components; deliberately no ``sys.intern``.
            segments = [seg for seg in tail.split(sep) if seg and seg != "."]
            return drive, root, segments


def _make_airflow_structlogger(min_level):
# This uses https://github.com/hynek/structlog/blob/2f0cc42d/src/structlog/_native.py#L126
# as inspiration
Expand Down Expand Up @@ -717,8 +755,8 @@ def init_log_folder(directory: str | os.PathLike[str], new_folder_permissions: i
sure that the same group is set as default group for both - impersonated user and main airflow
user.
"""
directory = Path(directory)
for parent in reversed(Path(directory).parents):
directory = _PatchedPath(directory)
for parent in reversed(_PatchedPath(directory).parents):
parent.mkdir(mode=new_folder_permissions, exist_ok=True)
directory.mkdir(mode=new_folder_permissions, exist_ok=True)

Expand All @@ -737,7 +775,7 @@ def init_log_file(

See above ``init_log_folder`` method for more detailed explanation.
"""
full_path = Path(base_log_folder, local_relative_path)
full_path = _PatchedPath(base_log_folder, local_relative_path)
init_log_folder(full_path.parent, new_folder_permissions)

try:
Expand Down
Loading