Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Containerfile.py36
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ COPY tests/ ./tests/
# Install test dependencies
RUN pip3 install "pytest < 7.1" "pytest-cov < 3"

# Install runtime dependencies
RUN pip3 install "defusedxml<0.8"

# Default command: run tests
CMD ["python3.6", "-m", "pytest", "--cov", "--ignore=tests/integration", "tests/"]
7 changes: 7 additions & 0 deletions productmd/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ class MetadataType(str, Enum):
"path",
"location",
"set_location",
"field_name",
],
)
# INFO: Using __new__.__defaults__ instead of namedtuple(defaults=...)
# because the defaults parameter requires Python 3.7+.
LocationEntry.__new__.__defaults__ = (None,)
"""
A single artifact location from compose metadata.

Expand All @@ -80,6 +84,8 @@ class MetadataType(str, Enum):
:param path: Relative path to the artifact
:param location: :class:`~productmd.location.Location` object, or ``None`` for v1.x data
:param set_location: Callable that sets a new Location on the source object
:param field_name: For variant paths, the field name (e.g., ``"repository"``).
``None`` for non-variant-path entries.
"""


Expand Down Expand Up @@ -236,6 +242,7 @@ def _setter(loc, _paths=paths, _field=field_name, _arch=arch):
path,
loc,
_setter,
field_name,
)
# Recurse into child variants
for child_variant in variant.variants.values():
Expand Down
221 changes: 213 additions & 8 deletions productmd/localize.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.parse import urljoin, urlparse

import defusedxml.ElementTree as ET

from productmd.common import _get_default_headers
from productmd.convert import downgrade_to_v1, iter_all_locations
Expand Down Expand Up @@ -223,6 +225,185 @@ def redirect_request(self, req, fp, code, msg, headers, newurl):
#: Default chunk size for streaming downloads (8 KB)
_CHUNK_SIZE = 8192

#: Variant path fields that are YUM repository roots containing repodata/
_REPO_FIELDS = frozenset({"repository", "debug_repository", "source_repository"})

#: XML namespace used in repomd.xml
_REPOMD_NS = "http://linux.duke.edu/metadata/repo"


def _parse_repomd_xml(xml_bytes: bytes) -> List[dict]:
    """
    Parse a ``repomd.xml`` and return metadata about each referenced file.

    ``<data>`` elements that lack a ``<location>`` child or whose ``href``
    attribute is missing/empty are skipped.  Checksum and size text is
    stripped of surrounding whitespace before use.  A ``<size>`` value that
    is not an integer is logged and omitted instead of raising, so a single
    malformed element does not abort processing of the whole repository.

    Parse errors raised by ``ET.fromstring`` propagate to the caller.

    :param xml_bytes: Raw XML content of ``repomd.xml``
    :return: List of dicts.  Every dict has ``href``; ``checksum`` and
        ``checksum_type`` are present together when the checksum element
        has non-blank text (the type defaults to ``"sha256"``); ``size``
        is present when the size element holds a valid integer.
    """
    ns = f"{{{_REPOMD_NS}}}"
    root = ET.fromstring(xml_bytes)
    entries = []

    for data_elem in root.findall(f"{ns}data"):
        location = data_elem.find(f"{ns}location")
        if location is None:
            continue
        href = location.get("href")
        if not href:
            continue

        entry = {"href": href}

        checksum_elem = data_elem.find(f"{ns}checksum")
        if checksum_elem is not None:
            # Pretty-printed XML may pad the digest with whitespace/newlines;
            # an unstripped value would never match the computed checksum.
            checksum_text = (checksum_elem.text or "").strip()
            if checksum_text:
                entry["checksum_type"] = checksum_elem.get("type", "sha256")
                entry["checksum"] = checksum_text

        size_elem = data_elem.find(f"{ns}size")
        if size_elem is not None:
            size_text = (size_elem.text or "").strip()
            if size_text:
                try:
                    entry["size"] = int(size_text)
                except ValueError:
                    # Size is optional metadata: drop it rather than fail.
                    logger.warning(
                        "Ignoring invalid size %r for repodata file %s",
                        size_text,
                        href,
                    )

        entries.append(entry)

    return entries


def _fetch_repomd_bytes(repomd_url: str, headers: dict, retries: int) -> Optional[bytes]:
    """
    Download ``repomd.xml`` with retries and return its raw bytes.

    Retries use exponential back-off (``2**attempt`` seconds).  HTTP 401/403
    responses stop the retry loop immediately.

    :param repomd_url: Absolute URL of the ``repomd.xml`` file
    :param headers: HTTP request headers to send
    :param retries: Number of retry attempts after the first try
    :return: Response body, or ``None`` if every attempt failed
    """
    last_error = None

    for attempt in range(retries + 1):
        try:
            req = urllib.request.Request(repomd_url, headers=headers)
            # Use the response as a context manager so the underlying
            # connection is released even if read() fails.
            with _opener.open(req) as response:
                return response.read()
        except (HTTPError, URLError, OSError) as e:
            last_error = e
            logger.warning(
                "Fetch attempt %d/%d failed for %s: %s",
                attempt + 1,
                retries + 1,
                repomd_url,
                e,
            )
            if isinstance(e, HTTPError) and e.code in (401, 403):
                break
            if attempt < retries:
                time.sleep(2**attempt)

    logger.error("Failed to fetch repomd.xml from %s: %s", repomd_url, last_error)
    return None


def _is_safe_repodata_href(href: str) -> bool:
    """
    Return ``True`` if *href* is a plain relative path inside the repository.

    Rejects backslashes, hrefs carrying a URL scheme or network location
    (``urljoin`` would resolve those to a different host), absolute paths,
    and paths that normalise to something starting with ``..``.
    """
    if "\\" in href:
        return False
    parsed = urlparse(href)
    if parsed.scheme or parsed.netloc:
        return False
    normalized = os.path.normpath(href)
    return not normalized.startswith(("..", "/"))


def _discover_repodata_tasks(
    repo_entries: List[Tuple[str, str, object]],
    compose_root: str,
    retries: int = 3,
    netrc_file: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    token: Optional[str] = None,
) -> List:
    """
    Fetch ``repomd.xml`` for each repository and generate download tasks.

    For each repository variant path, downloads ``repodata/repomd.xml``,
    writes it below *compose_root*, parses it to discover referenced files,
    and creates :class:`HttpTask` entries for each referenced file.

    Repositories are deduplicated by URL (compared after a trailing slash
    has been appended) to avoid fetching the same ``repomd.xml`` multiple
    times (e.g., source repos shared across arches).  A repository whose
    ``repomd.xml`` cannot be fetched, verified, or parsed is logged and
    skipped; the remaining repositories are still processed.

    :param repo_entries: List of ``(url, local_path, location)`` tuples
        for each repository root
    :param compose_root: Local compose root directory
    :param retries: Number of retry attempts for fetching ``repomd.xml``
    :param netrc_file: Path to a netrc file for credential lookup
    :param username: Username for HTTP Basic authentication
    :param password: Password for HTTP Basic authentication
    :param token: Bearer token for HTTP authentication
    :return: List of :class:`HttpTask` for all repodata files
    """
    from productmd.location import Location as Loc

    tasks = []
    seen_urls = set()

    for repo_url, repo_local_path, location in repo_entries:
        # Ensure trailing slash for proper URL joining.  Done before the
        # duplicate check so "…/os" and "…/os/" count as the same repository.
        if not repo_url.endswith("/"):
            repo_url += "/"

        if repo_url in seen_urls:
            continue
        seen_urls.add(repo_url)

        repomd_url = urljoin(repo_url, "repodata/repomd.xml")
        repomd_local = os.path.join(repo_local_path, "repodata", "repomd.xml")
        repomd_dest = os.path.join(compose_root, repomd_local)

        # Fetch repomd.xml
        logger.info("Fetching repomd.xml from %s", repomd_url)

        headers = _get_default_headers()
        auth_header = _build_auth_header(repomd_url, username, password, token, netrc_file)
        if auth_header:
            headers["Authorization"] = auth_header

        xml_bytes = _fetch_repomd_bytes(repomd_url, headers, retries)
        if xml_bytes is None:
            continue

        # Save repomd.xml itself
        os.makedirs(os.path.dirname(repomd_dest), exist_ok=True)
        with open(repomd_dest, "wb") as f:
            f.write(xml_bytes)

        # Verify checksum if available from composeinfo Location
        if location is not None and getattr(location, "checksum", None) is not None:
            try:
                if not location.verify(repomd_dest):
                    logger.error("Checksum mismatch for repomd.xml from %s", repomd_url)
                    continue
            except (OSError, ValueError) as e:
                logger.error("Failed to verify repomd.xml from %s: %s", repomd_url, e)
                continue

        # Parse and generate tasks for referenced files
        try:
            repodata_entries = _parse_repomd_xml(xml_bytes)
        except ET.ParseError as e:
            logger.error("Failed to parse repomd.xml from %s: %s", repomd_url, e)
            continue

        for entry in repodata_entries:
            href = entry["href"]

            # Guard against path traversal and off-host hrefs
            if not _is_safe_repodata_href(href):
                logger.warning("Skipping suspicious repodata href: %s", href)
                continue

            file_url = urljoin(repo_url, href)
            file_local = os.path.join(repo_local_path, href)
            file_dest = os.path.join(compose_root, file_local)

            # Build a Location with checksum/size from repomd.xml
            # so the download pipeline can verify the file after download.
            file_loc = None
            checksum_type = entry.get("checksum_type")
            checksum_value = entry.get("checksum")
            if checksum_type and checksum_value:
                file_loc = Loc(
                    url=file_url,
                    size=entry.get("size"),
                    checksum=f"{checksum_type}:{checksum_value}",
                    local_path=file_local,
                )

            tasks.append(
                HttpTask(
                    url=file_url,
                    dest_path=file_dest,
                    location=file_loc,
                    rel_path=file_local,
                )
            )

    return tasks


def _emit(
callback: Optional[Callable],
Expand Down Expand Up @@ -408,17 +589,20 @@ def _collect_download_tasks(
extra_files: Optional[object] = None,
modules: Optional[object] = None,
composeinfo: Optional[object] = None,
) -> Tuple[List[HttpTask], List[OciTask]]:
) -> Tuple[List[HttpTask], List[OciTask], List[Tuple[str, str, object]]]:
"""
Collect all remote artifacts that need downloading.

:return: Tuple of (http_tasks, oci_tasks) where http_tasks is a list
of :class:`HttpTask` namedtuples and oci_tasks is a list of
:class:`OciTask` namedtuples
:return: Tuple of (http_tasks, oci_tasks, repo_entries) where
http_tasks is a list of :class:`HttpTask` namedtuples,
oci_tasks is a list of :class:`OciTask` namedtuples, and
repo_entries is a list of ``(url, local_path, location)`` tuples for
YUM repository roots whose repodata needs downloading.
"""
compose_root = os.path.join(output_dir, "compose")
http_tasks = []
oci_tasks = []
repo_entries = []

for entry in iter_all_locations(
images=images,
Expand All @@ -431,8 +615,11 @@ def _collect_download_tasks(
continue
if not entry.location.is_remote:
continue
# Variant paths are directory references, not downloadable files
# Variant paths: repository fields need repodata downloading,
# all other fields are directory references (not downloadable).
if entry.metadata_type == "variant_path":
if entry.field_name in _REPO_FIELDS:
repo_entries.append((entry.location.url, entry.location.local_path, entry.location))
continue

if entry.location.is_oci:
Expand Down Expand Up @@ -466,7 +653,7 @@ def _collect_download_tasks(
)
)

return http_tasks, oci_tasks
return http_tasks, oci_tasks, repo_entries


def _deduplicate_http_tasks(
Expand Down Expand Up @@ -783,14 +970,32 @@ def localize_compose(
raise ValueError("http_token is mutually exclusive with http_username/http_password")

# Collect all remote download tasks
http_tasks, oci_tasks = _collect_download_tasks(
http_tasks, oci_tasks, repo_entries = _collect_download_tasks(
output_dir,
images,
rpms,
extra_files,
modules,
composeinfo,
)

# --- Phase 0: Repodata discovery ---
# Fetch repomd.xml for each repository and generate download tasks
# for the referenced metadata files (primary, filelists, comps, etc.).
compose_root = os.path.join(output_dir, "compose")
if repo_entries:
repodata_tasks = _discover_repodata_tasks(
repo_entries,
compose_root,
retries,
netrc_file=netrc_file,
username=http_username,
password=http_password,
token=http_token,
)
http_tasks.extend(repodata_tasks)
logger.info("Discovered %d repodata files from %d repositories", len(repodata_tasks), len(repo_entries))

http_tasks = _deduplicate_http_tasks(http_tasks)
oci_tasks = _deduplicate_oci_tasks(oci_tasks)

Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
dependencies = []
dependencies = ["defusedxml<0.8"]
name = "productmd"
version = "1.50"
description = "Product, compose and installation media metadata library"
Expand Down Expand Up @@ -120,7 +120,7 @@ env_list = ["py38", "py39", "py310", "py311", "py312", "py313", "format", "lint"

[tool.tox.env_run_base]
description = "Run tests with pytest"
deps = ["pytest", "oras"]
deps = ["pytest", "oras", "defusedxml"]
commands = [["pytest", { replace = "posargs", default = [], extend = true }]]

[tool.tox.env.lint]
Expand Down
17 changes: 17 additions & 0 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,23 @@ def test_composeinfo_yields_variant_paths(self):
assert "Server/x86_64/os" in paths
assert "Server/x86_64/os/Packages" in paths

def test_variant_path_has_field_name(self):
    """Entries yielded for composeinfo expose the field they came from."""
    composeinfo = _create_composeinfo()

    # Collect every distinct field_name seen across the yielded entries.
    seen_fields = set()
    for location_entry in iter_all_locations(composeinfo=composeinfo):
        seen_fields.add(location_entry.field_name)

    for expected in ("os_tree", "packages"):
        assert expected in seen_fields

def test_non_variant_path_field_name_is_none(self):
    """Entries yielded for images metadata leave field_name as None."""
    images = _create_images()
    location_entries = list(iter_all_locations(images=images))

    assert all(entry.field_name is None for entry in location_entries)

def test_skips_none_modules(self):
"""Test that None modules are skipped."""
im = _create_images()
Expand Down
Loading
Loading