From 022862715cb289d8c480368b426bdbcdf95a7842 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 17:11:27 +0100 Subject: [PATCH 01/15] feat: add netrc-based HTTP Basic auth support Assisted-by: Claude Opus --- productmd/localize.py | 61 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/productmd/localize.py b/productmd/localize.py index 0ac0705..5935932 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -6,6 +6,10 @@ with parallel execution, checksum verification, and configurable error handling. +HTTP downloads support authentication via ``~/.netrc`` (or a custom +netrc file). Credentials matching the download URL hostname are sent +as HTTP Basic authentication headers. + OCI registry downloads require the ``oras-py`` package (``pip install productmd[oci]``). Authentication supports both Docker and Podman credential stores (``docker login`` / ``podman login``). @@ -29,13 +33,16 @@ """ import logging +import netrc import os import time import urllib.request +from base64 import b64encode from collections import namedtuple from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Callable, List, Optional, Tuple from urllib.error import HTTPError, URLError +from urllib.parse import urlparse from productmd.common import _get_default_headers from productmd.convert import downgrade_to_v1, iter_all_locations @@ -92,6 +99,37 @@ """ +def _get_netrc_auth_header( + url: str, + netrc_file: Optional[str] = None, +) -> Optional[str]: + """ + Look up HTTP Basic auth credentials from a netrc file. + + Searches the netrc file for credentials matching the hostname in + *url*. If found, returns a ``Basic`` Authorization header value. + + :param url: URL whose hostname is used for the netrc lookup + :param netrc_file: Path to a netrc file. When ``None``, the + standard ``~/.netrc`` (or ``~/_netrc`` on Windows) is used. + :return: ``"Basic "`` string, or ``None`` if no credentials + are found or the netrc file is missing / malformed + """ + try: + rc = netrc.netrc(netrc_file) + host = urlparse(url).hostname + if not host: + return None + auth = rc.authenticators(host) + if auth: + login, _, password = auth + credentials = b64encode(f"{login}:{password}".encode()).decode() + return f"Basic {credentials}" + except (FileNotFoundError, netrc.NetrcParseError, OSError) as e: + logger.debug("netrc lookup failed: %s", e) + return None + + #: Default chunk size for streaming downloads (8 KB) _CHUNK_SIZE = 8192 @@ -115,6 +153,7 @@ def _download_https( retries: int = 3, progress_callback: Optional[Callable] = None, filename: str = "", + netrc_file: Optional[str] = None, ) -> None: """ Download a file from an HTTP(S) URL to a local path. @@ -123,11 +162,16 @@ def _download_https( atomically to avoid partial files. Retries on failure with exponential backoff. + When *netrc_file* is provided (or ``~/.netrc`` exists), credentials + matching the URL hostname are used for HTTP Basic authentication. + :param url: HTTP(S) URL to download from :param dest_path: Local file path to save to :param retries: Number of retry attempts (default: 3) :param progress_callback: Optional callback for progress events :param filename: Relative path for progress event reporting + :param netrc_file: Path to a netrc file for credential lookup. + When ``None``, the standard ``~/.netrc`` is used. :raises urllib.error.URLError: If all retry attempts fail """ parent_dir = os.path.dirname(dest_path) @@ -137,9 +181,14 @@ def _download_https( tmp_path = dest_path + ".tmp" last_error = None + headers = _get_default_headers() + auth_header = _get_netrc_auth_header(url, netrc_file) + if auth_header: + headers["Authorization"] = auth_header + for attempt in range(retries + 1): try: - req = urllib.request.Request(url, headers=_get_default_headers()) + req = urllib.request.Request(url, headers=headers) response = urllib.request.urlopen(req) content_length = response.headers.get("Content-Length") total = int(content_length) if content_length else None @@ -579,6 +628,7 @@ def localize_compose( fail_fast: bool = True, retries: int = 3, progress_callback: Optional[Callable] = None, + netrc_file: Optional[str] = None, ) -> LocalizeResult: """ Localize a distributed v2.0 compose to local storage. @@ -591,6 +641,10 @@ def localize_compose( require ``oras-py`` (``pip install productmd[oci]``). Authentication supports Docker and Podman credential stores. + HTTP downloads support authentication via ``~/.netrc`` (or a custom + netrc file). Credentials matching the download URL hostname are + sent as HTTP Basic authentication headers. + :param output_dir: Local directory to create the compose layout :param images: :class:`~productmd.images.Images` instance :param rpms: :class:`~productmd.rpms.Rpms` instance @@ -605,6 +659,8 @@ def localize_compose( :param retries: Number of retry attempts per download (default: 3) :param progress_callback: Optional callable receiving :class:`DownloadEvent` instances for progress tracking + :param netrc_file: Path to a netrc file for HTTP credential lookup. + When ``None``, the standard ``~/.netrc`` is used. :return: :class:`LocalizeResult` with download statistics :raises RuntimeError: If OCI URLs are present but oras-py is not installed :raises urllib.error.URLError: If a download fails and fail_fast is True @@ -644,7 +700,7 @@ def localize_compose( # Sequential downloads for task in download_tasks: try: - _download_https(task.url, task.dest_path, retries, progress_callback, task.rel_path) + _download_https(task.url, task.dest_path, retries, progress_callback, task.rel_path, netrc_file) # Verify checksum after download if verify_checksums and task.location is not None and task.location.checksum is not None: if not task.location.verify(task.dest_path): @@ -669,6 +725,7 @@ def localize_compose( retries, progress_callback, task.rel_path, + netrc_file, ) future_to_task[future] = task From 1f951320969ea244180787c2effa8566369a1f15 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 17:13:43 +0100 Subject: [PATCH 02/15] feat: add `--netrc-file` flag to localize subcommand Assisted-by: Claude Opus --- productmd/cli/localize.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/productmd/cli/localize.py b/productmd/cli/localize.py index 54a74c3..9c7daf5 100644 --- a/productmd/cli/localize.py +++ b/productmd/cli/localize.py @@ -1,10 +1,12 @@ """``productmd localize`` subcommand — download distributed v2.0 compose. -Supports both HTTPS/HTTP and OCI registry downloads. OCI downloads -require the ``oras-py`` package (``pip install productmd[oci]``). -Authentication supports Docker and Podman credential stores. +Supports both HTTPS/HTTP and OCI registry downloads. HTTP downloads +support authentication via ``~/.netrc``. OCI downloads require the +``oras-py`` package (``pip install productmd[oci]``) and support +Docker and Podman credential stores. """ +import os import sys from productmd.cli import add_input_args, load_metadata, print_error @@ -65,6 +67,11 @@ def register(subparsers: object) -> None: default=True, help="Continue downloading after failures (default: stop on first)", ) + parser.add_argument( + "--netrc-file", + default=os.environ.get("PRODUCTMD_NETRC_FILE"), + help=("Path to a netrc file for HTTP credential lookup (default: ~/.netrc). Can also be set via PRODUCTMD_NETRC_FILE env var."), + ) add_input_args(parser) parser.set_defaults(func=run) @@ -91,6 +98,7 @@ def run(args: object) -> None: retries=args.retries, fail_fast=args.fail_fast, progress_callback=progress_callback, + netrc_file=args.netrc_file, **metadata, ) finally: From ed181e99a5380c35ee029f625eef2df7eb97716c Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 17:43:51 +0100 Subject: [PATCH 03/15] feat: add explicit HTTP Basic auth credentials Assisted-by: Claude Opus --- productmd/localize.py | 45 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/productmd/localize.py b/productmd/localize.py index 5935932..08f0ae8 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -154,6 +154,8 @@ def _download_https( progress_callback: Optional[Callable] = None, filename: str = "", netrc_file: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, ) -> None: """ Download a file from an HTTP(S) URL to a local path. @@ -162,8 +164,10 @@ def _download_https( atomically to avoid partial files. Retries on failure with exponential backoff. - When *netrc_file* is provided (or ``~/.netrc`` exists), credentials - matching the URL hostname are used for HTTP Basic authentication. + When *username* and *password* are provided, they are used for HTTP + Basic authentication and take precedence over netrc credentials. + Otherwise, when ``~/.netrc`` (or *netrc_file*) contains credentials + matching the URL hostname, those are used instead. :param url: HTTP(S) URL to download from :param dest_path: Local file path to save to @@ -172,6 +176,9 @@ def _download_https( :param filename: Relative path for progress event reporting :param netrc_file: Path to a netrc file for credential lookup. When ``None``, the standard ``~/.netrc`` is used. + :param username: Username for HTTP Basic authentication. + Takes precedence over netrc credentials. + :param password: Password for HTTP Basic authentication. :raises urllib.error.URLError: If all retry attempts fail """ parent_dir = os.path.dirname(dest_path) @@ -182,9 +189,13 @@ def _download_https( last_error = None headers = _get_default_headers() - auth_header = _get_netrc_auth_header(url, netrc_file) - if auth_header: - headers["Authorization"] = auth_header + if username is not None and password is not None: + credentials = b64encode(f"{username}:{password}".encode()).decode() + headers["Authorization"] = f"Basic {credentials}" + else: + auth_header = _get_netrc_auth_header(url, netrc_file) + if auth_header: + headers["Authorization"] = auth_header for attempt in range(retries + 1): try: @@ -629,6 +640,8 @@ def localize_compose( retries: int = 3, progress_callback: Optional[Callable] = None, netrc_file: Optional[str] = None, + http_username: Optional[str] = None, + http_password: Optional[str] = None, ) -> LocalizeResult: """ Localize a distributed v2.0 compose to local storage. @@ -641,9 +654,9 @@ def localize_compose( require ``oras-py`` (``pip install productmd[oci]``). Authentication supports Docker and Podman credential stores. - HTTP downloads support authentication via ``~/.netrc`` (or a custom - netrc file). Credentials matching the download URL hostname are - sent as HTTP Basic authentication headers. + HTTP downloads support authentication via explicit credentials or + ``~/.netrc`` (or a custom netrc file). Explicit credentials take + precedence over netrc. :param output_dir: Local directory to create the compose layout :param images: :class:`~productmd.images.Images` instance @@ -661,6 +674,9 @@ def localize_compose( :class:`DownloadEvent` instances for progress tracking :param netrc_file: Path to a netrc file for HTTP credential lookup. When ``None``, the standard ``~/.netrc`` is used. + :param http_username: Username for HTTP Basic authentication. + Takes precedence over netrc credentials. + :param http_password: Password for HTTP Basic authentication. :return: :class:`LocalizeResult` with download statistics :raises RuntimeError: If OCI URLs are present but oras-py is not installed :raises urllib.error.URLError: If a download fails and fail_fast is True @@ -700,7 +716,16 @@ def localize_compose( # Sequential downloads for task in download_tasks: try: - _download_https(task.url, task.dest_path, retries, progress_callback, task.rel_path, netrc_file) + _download_https( + task.url, + task.dest_path, + retries, + progress_callback, + task.rel_path, + netrc_file, + http_username, + http_password, + ) # Verify checksum after download if verify_checksums and task.location is not None and task.location.checksum is not None: if not task.location.verify(task.dest_path): @@ -726,6 +751,8 @@ def localize_compose( progress_callback, task.rel_path, netrc_file, + http_username, + http_password, ) future_to_task[future] = task From 25ef840566f64e52a1a7c8cae4aa750808a891d3 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 18:47:44 +0100 Subject: [PATCH 04/15] feat: add `--http-username` and `--http-password` flags --- productmd/cli/localize.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/productmd/cli/localize.py b/productmd/cli/localize.py index 9c7daf5..a976c97 100644 --- a/productmd/cli/localize.py +++ b/productmd/cli/localize.py @@ -72,6 +72,16 @@ def register(subparsers: object) -> None: default=os.environ.get("PRODUCTMD_NETRC_FILE"), help=("Path to a netrc file for HTTP credential lookup (default: ~/.netrc). Can also be set via PRODUCTMD_NETRC_FILE env var."), ) + parser.add_argument( + "--http-username", + default=None, + help=("Username for HTTP Basic authentication. Takes precedence over netrc credentials."), + ) + parser.add_argument( + "--http-password", + default=os.environ.get("PRODUCTMD_HTTP_PASSWORD"), + help=("Password for HTTP Basic authentication. Can also be set via PRODUCTMD_HTTP_PASSWORD env var."), + ) add_input_args(parser) parser.set_defaults(func=run) @@ -99,6 +109,8 @@ def run(args: object) -> None: fail_fast=args.fail_fast, progress_callback=progress_callback, netrc_file=args.netrc_file, + http_username=args.http_username, + http_password=args.http_password, **metadata, ) finally: From a87734188541c1659c130f4224f8147be73c3db6 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 19:50:31 +0100 Subject: [PATCH 05/15] feat: add Bearer token auth Assisted-by: Claude Opus --- productmd/cli/localize.py | 23 ++++++++++++-- productmd/localize.py | 66 +++++++++++++++++++++++++++++---------- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/productmd/cli/localize.py b/productmd/cli/localize.py index a976c97..b7b2944 100644 --- a/productmd/cli/localize.py +++ b/productmd/cli/localize.py @@ -1,9 +1,10 @@ """``productmd localize`` subcommand — download distributed v2.0 compose. Supports both HTTPS/HTTP and OCI registry downloads. HTTP downloads -support authentication via ``~/.netrc``. OCI downloads require the -``oras-py`` package (``pip install productmd[oci]``) and support -Docker and Podman credential stores. +support authentication via Bearer token, Basic credentials, or +``~/.netrc``. OCI downloads require the ``oras-py`` package +(``pip install productmd[oci]``) and support Docker and Podman +credential stores. """ import os @@ -27,6 +28,7 @@ def register(subparsers: object) -> None: "Download all remote artifacts from a v2.0 compose, " "recreating the standard v1.2 filesystem layout. " "Supports HTTPS/HTTP and OCI registry downloads. " + "HTTP auth: Bearer token, Basic credentials, or ~/.netrc. " "OCI requires oras-py (pip install productmd[oci]). " "Writes v1.2 metadata after download." ), @@ -82,6 +84,16 @@ def register(subparsers: object) -> None: default=os.environ.get("PRODUCTMD_HTTP_PASSWORD"), help=("Password for HTTP Basic authentication. Can also be set via PRODUCTMD_HTTP_PASSWORD env var."), ) + parser.add_argument( + "--http-token", + default=os.environ.get("PRODUCTMD_HTTP_TOKEN"), + help=( + "Bearer token for HTTP authentication. " + "Takes precedence over Basic credentials and netrc. " + "Mutually exclusive with --http-username/--http-password. " + "Can also be set via PRODUCTMD_HTTP_TOKEN env var." + ), + ) add_input_args(parser) parser.set_defaults(func=run) @@ -97,6 +109,10 @@ def run(args: object) -> None: print_error(f"No metadata found at {args.input}") sys.exit(1) + if args.http_token and (args.http_username or args.http_password): + print_error("--http-token is mutually exclusive with --http-username/--http-password") + sys.exit(2) + progress_callback, cleanup = make_progress_callback(parallel=args.parallel_downloads) try: @@ -111,6 +127,7 @@ def run(args: object) -> None: netrc_file=args.netrc_file, http_username=args.http_username, http_password=args.http_password, + http_token=args.http_token, **metadata, ) finally: diff --git a/productmd/localize.py b/productmd/localize.py index 08f0ae8..873c72a 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -130,6 +130,39 @@ def _get_netrc_auth_header( return None +def _build_auth_header( + url: str, + username: Optional[str] = None, + password: Optional[str] = None, + token: Optional[str] = None, + netrc_file: Optional[str] = None, +) -> Optional[str]: + """ + Resolve an HTTP Authorization header value. + + Determines the appropriate authorization header using the following + precedence (highest first): + + 1. Bearer token (``token``) + 2. Explicit Basic credentials (``username`` + ``password``) + 3. Netrc lookup by URL hostname + + :param url: URL whose hostname is used for netrc lookup + :param username: Username for HTTP Basic authentication + :param password: Password for HTTP Basic authentication + :param token: Bearer token for HTTP authentication + :param netrc_file: Path to a netrc file (default: ``~/.netrc``) + :return: Authorization header value, or ``None`` if no credentials + are available + """ + if token is not None: + return f"Bearer {token}" + if username is not None and password is not None: + credentials = b64encode(f"{username}:{password}".encode()).decode() + return f"Basic {credentials}" + return _get_netrc_auth_header(url, netrc_file) + + #: Default chunk size for streaming downloads (8 KB) _CHUNK_SIZE = 8192 @@ -156,6 +189,7 @@ def _download_https( netrc_file: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, + token: Optional[str] = None, ) -> None: """ Download a file from an HTTP(S) URL to a local path. @@ -164,10 +198,9 @@ def _download_https( atomically to avoid partial files. Retries on failure with exponential backoff. - When *username* and *password* are provided, they are used for HTTP - Basic authentication and take precedence over netrc credentials. - Otherwise, when ``~/.netrc`` (or *netrc_file*) contains credentials - matching the URL hostname, those are used instead. + Authentication is resolved via :func:`_build_auth_header` with the + following precedence: Bearer *token* > explicit *username*/*password* + (Basic) > netrc lookup. :param url: HTTP(S) URL to download from :param dest_path: Local file path to save to @@ -177,8 +210,9 @@ def _download_https( :param netrc_file: Path to a netrc file for credential lookup. When ``None``, the standard ``~/.netrc`` is used. :param username: Username for HTTP Basic authentication. - Takes precedence over netrc credentials. :param password: Password for HTTP Basic authentication. + :param token: Bearer token for HTTP authentication. + Takes precedence over Basic credentials and netrc. :raises urllib.error.URLError: If all retry attempts fail """ parent_dir = os.path.dirname(dest_path) @@ -189,13 +223,9 @@ def _download_https( last_error = None headers = _get_default_headers() - if username is not None and password is not None: - credentials = b64encode(f"{username}:{password}".encode()).decode() - headers["Authorization"] = f"Basic {credentials}" - else: - auth_header = _get_netrc_auth_header(url, netrc_file) - if auth_header: - headers["Authorization"] = auth_header + auth_header = _build_auth_header(url, username, password, token, netrc_file) + if auth_header: + headers["Authorization"] = auth_header for attempt in range(retries + 1): try: @@ -642,6 +672,7 @@ def localize_compose( netrc_file: Optional[str] = None, http_username: Optional[str] = None, http_password: Optional[str] = None, + http_token: Optional[str] = None, ) -> LocalizeResult: """ Localize a distributed v2.0 compose to local storage. @@ -654,9 +685,8 @@ def localize_compose( require ``oras-py`` (``pip install productmd[oci]``). Authentication supports Docker and Podman credential stores. - HTTP downloads support authentication via explicit credentials or - ``~/.netrc`` (or a custom netrc file). Explicit credentials take - precedence over netrc. + HTTP downloads support authentication with the following precedence: + Bearer token > explicit Basic credentials > netrc lookup. :param output_dir: Local directory to create the compose layout :param images: :class:`~productmd.images.Images` instance @@ -675,8 +705,10 @@ def localize_compose( :param netrc_file: Path to a netrc file for HTTP credential lookup. When ``None``, the standard ``~/.netrc`` is used. :param http_username: Username for HTTP Basic authentication. - Takes precedence over netrc credentials. :param http_password: Password for HTTP Basic authentication. + :param http_token: Bearer token for HTTP authentication. + Takes precedence over Basic credentials and netrc. + Mutually exclusive with ``http_username``/``http_password``. :return: :class:`LocalizeResult` with download statistics :raises RuntimeError: If OCI URLs are present but oras-py is not installed :raises urllib.error.URLError: If a download fails and fail_fast is True @@ -725,6 +757,7 @@ def localize_compose( netrc_file, http_username, http_password, + http_token, ) # Verify checksum after download if verify_checksums and task.location is not None and task.location.checksum is not None: @@ -753,6 +786,7 @@ def localize_compose( netrc_file, http_username, http_password, + http_token, ) future_to_task[future] = task From 558e376352c44f300fd0b78da6a64de0e07c1d22 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 21:36:53 +0100 Subject: [PATCH 06/15] test: add tests for HTTP authentication Assisted-by: Claude Opus --- tests/test_localize.py | 260 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 260 insertions(+) diff --git a/tests/test_localize.py b/tests/test_localize.py index df25d7f..257a8b8 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -11,9 +11,11 @@ HttpTask, LocalizeResult, OciTask, + _build_auth_header, _deduplicate_http_tasks, _download_https, _download_single_oci, + _get_netrc_auth_header, _should_skip, _should_skip_oci, localize_compose, @@ -1117,3 +1119,261 @@ def test_same_url_different_dest_keeps_both(self): ) result = _deduplicate_http_tasks([task1, task2]) assert len(result) == 2 + + +# --------------------------------------------------------------------------- +# Tests: HTTP Authentication +# --------------------------------------------------------------------------- + + +class TestGetNetrcAuthHeader: + """Tests for _get_netrc_auth_header.""" + + def test_netrc_auth_found(self, tmp_path): + """Test that matching netrc credentials produce a Basic auth header.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret123\n") + + result = _get_netrc_auth_header("https://pulp.example.com/repo/file.rpm", str(netrc_file)) + + assert result is not None + assert result.startswith("Basic ") + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "admin:secret123" + + def test_netrc_host_not_found(self, tmp_path): + """Test that no match returns None.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine other.example.com\nlogin user\npassword pass\n") + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is None + + def test_netrc_file_missing(self): + """Test that a missing netrc file returns None without error.""" + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", "/nonexistent/.netrc") + + assert result is None + + def test_netrc_parse_error(self, tmp_path): + """Test that a malformed netrc file returns None without error.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("this is not valid netrc content!!!\n~~~") + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is None + + def test_netrc_custom_file(self, tmp_path): + """Test that a custom netrc file path is used.""" + custom_netrc = tmp_path / "custom-netrc" + custom_netrc.write_text("machine cdn.example.com\nlogin deploy\npassword s3cret\n") + + result = _get_netrc_auth_header("https://cdn.example.com/boot.iso", str(custom_netrc)) + + assert result is not None + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "deploy:s3cret" + + def test_netrc_no_hostname_in_url(self): + """Test that a URL with no hostname returns None.""" + result = _get_netrc_auth_header("file:///local/path", None) + + assert result is None + + +class TestBuildAuthHeader: + """Tests for _build_auth_header precedence logic.""" + + def test_explicit_basic_auth(self): + """Test that explicit username/password produce a Basic header.""" + result = _build_auth_header("https://example.com/file", username="user", password="pass") + + assert result is not None + assert result.startswith("Basic ") + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "user:pass" + + def test_bearer_token(self): + """Test that a token produces a Bearer header.""" + result = _build_auth_header("https://example.com/file", token="my-jwt-token") + + assert result == "Bearer my-jwt-token" + + def test_token_overrides_basic(self): + """Test that token takes precedence over username/password.""" + result = _build_auth_header( + "https://example.com/file", + username="user", + password="pass", + token="my-token", + ) + + assert result == "Bearer my-token" + + def test_token_overrides_netrc(self, tmp_path): + """Test that token takes precedence over netrc credentials.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin admin\npassword secret\n") + + result = _build_auth_header( + "https://example.com/file", + token="my-token", + netrc_file=str(netrc_file), + ) + + assert result == "Bearer my-token" + + def test_explicit_basic_overrides_netrc(self, tmp_path): + """Test that explicit credentials override netrc.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") + + result = _build_auth_header( + "https://example.com/file", + username="explicit-user", + password="explicit-pass", + netrc_file=str(netrc_file), + ) + + assert result is not None + assert result.startswith("Basic ") + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "explicit-user:explicit-pass" + + def test_falls_back_to_netrc(self, tmp_path): + """Test that netrc is used when no explicit credentials are given.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") + + result = _build_auth_header("https://example.com/file", netrc_file=str(netrc_file)) + + assert result is not None + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "netrc-user:netrc-pass" + + def test_no_credentials_returns_none(self): + """Test that None is returned when no auth is available.""" + result = _build_auth_header( + "https://example.com/file", + netrc_file="/nonexistent/.netrc", + ) + + assert result is None + + +class TestDownloadHttpsAuth: + """Tests for authentication in _download_https.""" + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_basic_auth_header(self, mock_urlopen, tmp_path): + """Test that explicit credentials add Authorization header to request.""" + mock_urlopen.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https( + "https://pulp.example.com/file.rpm", + dest, + retries=0, + username="admin", + password="secret", + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_bearer_token_header(self, mock_urlopen, tmp_path): + """Test that a token adds a Bearer Authorization header.""" + mock_urlopen.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https( + "https://pulp.example.com/file.rpm", + dest, + retries=0, + token="jwt-token-here", + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization") == "Bearer jwt-token-here" + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_netrc_auth_header(self, mock_urlopen, tmp_path): + """Test that netrc credentials add Authorization header.""" + mock_urlopen.return_value = _mock_urlopen(b"content") + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret\n") + dest = str(tmp_path / "file.rpm") + + _download_https( + "https://pulp.example.com/file.rpm", + dest, + retries=0, + netrc_file=str(netrc_file), + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_no_auth_by_default(self, mock_urlopen, tmp_path): + """Test that no Authorization header is set when no auth is configured.""" + mock_urlopen.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https( + "https://pulp.example.com/file.rpm", + dest, + retries=0, + netrc_file="/nonexistent/.netrc", + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization") is None + + @patch("productmd.localize.urllib.request.urlopen") + def test_localize_compose_passes_auth_credentials(self, mock_urlopen, tmp_path): + """Test that localize_compose passes auth params to _download_https.""" + mock_urlopen.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), + images=images, + parallel_downloads=1, + verify_checksums=False, + http_username="user", + http_password="pass", + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_localize_compose_passes_token(self, mock_urlopen, tmp_path): + """Test that localize_compose passes token to _download_https.""" + mock_urlopen.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), + images=images, + parallel_downloads=1, + verify_checksums=False, + http_token="my-bearer-token", + ) + + req = mock_urlopen.call_args[0][0] + assert req.get_header("Authorization") == "Bearer my-bearer-token" From a8a6e6f39bf6a3cf23f46c71626d59c7bc65c966 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Mon, 23 Mar 2026 21:43:25 +0100 Subject: [PATCH 07/15] docs: document HTTP authentication options Assisted-by: Claude Opus --- doc/cli-localize.rst | 75 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/doc/cli-localize.rst b/doc/cli-localize.rst index 0ecb9dc..c6e1f1d 100644 --- a/doc/cli-localize.rst +++ b/doc/cli-localize.rst @@ -4,7 +4,7 @@ productmd-localize Synopsis -------- -**productmd localize** **--output** *DIR* [**--parallel-downloads** *N*] [**--no-verify-checksums**] [**--skip-existing**] [**--retries** *N*] [**--no-fail-fast**] *input* +**productmd localize** **--output** *DIR* [**--parallel-downloads** *N*] [**--no-verify-checksums**] [**--skip-existing**] [**--retries** *N*] [**--no-fail-fast**] [**--http-token** *TOKEN*] [**--http-username** *USER* **--http-password** *PASS*] [**--netrc-file** *PATH*] *input* Description ----------- @@ -56,6 +56,57 @@ Options *input* Path to a v2.0 metadata file or compose directory. Auto-detected. +HTTP Authentication +------------------- + +HTTP downloads support authentication for accessing protected content +servers (e.g. Pulp). Three mechanisms are available, resolved in the +following precedence order (highest first): + +1. **Bearer token** — ``--http-token`` or ``$PRODUCTMD_HTTP_TOKEN`` +2. **Basic credentials** — ``--http-username`` + ``--http-password`` + (or ``$PRODUCTMD_HTTP_PASSWORD``) +3. **Netrc** — automatic lookup from ``~/.netrc`` (or ``--netrc-file``) + +Only one of Bearer token or Basic credentials may be specified. +Providing both ``--http-token`` and ``--http-username``/``--http-password`` +is an error. + +When no explicit credentials are given, the tool automatically checks +``~/.netrc`` for entries matching the download URL hostname. This is +transparent and requires no CLI flags. + +**--http-token** *TOKEN* + Bearer token for HTTP authentication. Sent as + ``Authorization: Bearer ``. Useful for Keycloak/OAuth2-based + Pulp deployments. Can also be set via the ``PRODUCTMD_HTTP_TOKEN`` + environment variable. Mutually exclusive with + ``--http-username``/``--http-password``. + +**--http-username** *USER* + Username for HTTP Basic authentication. Must be combined with + ``--http-password``. Takes precedence over netrc credentials. + +**--http-password** *PASS* + Password for HTTP Basic authentication. Can also be set via the + ``PRODUCTMD_HTTP_PASSWORD`` environment variable to avoid exposing + the password in shell history. + +**--netrc-file** *PATH* + Path to a netrc file for credential lookup. Default: ``~/.netrc``. + Can also be set via the ``PRODUCTMD_NETRC_FILE`` environment variable. + Useful in CI/automation or containerized environments where + ``~/.netrc`` is not available. + +The netrc file uses the standard format:: + + machine pulp.example.com + login admin + password secret123 + +Credentials are matched by hostname. Different hosts can have +different credentials in the same netrc file. + OCI Support ----------- @@ -109,6 +160,28 @@ Continue after failures:: --no-fail-fast \ images.json +Download with a Bearer token:: + + productmd localize \ + --output /mnt/local \ + --http-token eyJhbGciOiJSUzI1NiIs... \ + images.json + +Download with Basic auth (password via env var):: + + export PRODUCTMD_HTTP_PASSWORD=secret123 + productmd localize \ + --output /mnt/local \ + --http-username admin \ + images.json + +Download using a custom netrc file:: + + productmd localize \ + --output /mnt/local \ + --netrc-file /run/secrets/netrc \ + images.json + See Also -------- From 56e7fbda66379f3292e75ffbb46083f2091f3f91 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Tue, 24 Mar 2026 12:17:40 +0100 Subject: [PATCH 08/15] fix: strip auth header on cross-origin HTTP redirects Assisted-by: Claude Opus --- doc/cli-localize.rst | 7 ++++ productmd/localize.py | 33 +++++++++++++++- tests/test_localize.py | 90 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) diff --git a/doc/cli-localize.rst b/doc/cli-localize.rst index c6e1f1d..1b40019 100644 --- a/doc/cli-localize.rst +++ b/doc/cli-localize.rst @@ -107,6 +107,13 @@ The netrc file uses the standard format:: Credentials are matched by hostname. Different hosts can have different credentials in the same netrc file. +.. note:: + + Authorization headers are automatically stripped when an HTTP + redirect points to a different origin (scheme, host, or port), + preventing credential leakage to third-party servers such as CDNs + or S3 presigned URLs. This matches curl's default behavior. + OCI Support ----------- diff --git a/productmd/localize.py b/productmd/localize.py index 873c72a..a02f301 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -163,6 +163,36 @@ def _build_auth_header( return _get_netrc_auth_header(url, netrc_file) +class _SafeRedirectHandler(urllib.request.HTTPRedirectHandler): + """Redirect handler that strips Authorization on cross-origin redirects. + + Prevents credential leakage when a server (e.g. Pulp) redirects to + a different origin (e.g. CDN or S3 presigned URL). Matches curl's + default behavior of comparing scheme + host + port. + """ + + def redirect_request(self, req, fp, code, msg, headers, newurl): + new_req = super().redirect_request(req, fp, code, msg, headers, newurl) + if new_req is not None: + original = urlparse(req.full_url) + redirect = urlparse(new_req.full_url) + original_origin = (original.scheme, original.hostname, original.port) + redirect_origin = (redirect.scheme, redirect.hostname, redirect.port) + if original_origin != redirect_origin: + new_req.remove_header("Authorization") + logger.debug( + "Stripped Authorization header on redirect from %s to %s", + req.full_url, + newurl, + ) + return new_req + + +# Install the safe redirect handler globally so that all urlopen() calls +# in this module automatically strip Authorization on cross-origin redirects. +urllib.request.install_opener(urllib.request.build_opener(_SafeRedirectHandler)) + + #: Default chunk size for streaming downloads (8 KB) _CHUNK_SIZE = 8192 @@ -200,7 +230,8 @@ def _download_https( Authentication is resolved via :func:`_build_auth_header` with the following precedence: Bearer *token* > explicit *username*/*password* - (Basic) > netrc lookup. + (Basic) > netrc lookup. Authorization headers are automatically + stripped on cross-origin redirects to prevent credential leakage. :param url: HTTP(S) URL to download from :param dest_path: Local file path to save to diff --git a/tests/test_localize.py b/tests/test_localize.py index 257a8b8..f62df01 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -1,7 +1,9 @@ """Tests for the localization tool (localize_compose, _download_https).""" +import http.client import io import os +import urllib.request from unittest.mock import MagicMock, patch import pytest @@ -11,6 +13,7 @@ HttpTask, LocalizeResult, OciTask, + _SafeRedirectHandler, _build_auth_header, _deduplicate_http_tasks, _download_https, @@ -1377,3 +1380,90 @@ def test_localize_compose_passes_token(self, mock_urlopen, tmp_path): req = mock_urlopen.call_args[0][0] assert req.get_header("Authorization") == "Bearer my-bearer-token" + + +# --------------------------------------------------------------------------- +# Tests: Safe Redirect Handler +# --------------------------------------------------------------------------- + + +class TestSafeRedirectHandler: + """Tests for _SafeRedirectHandler cross-origin auth stripping.""" + + def _redirect(self, handler, orig_url, new_url, auth_header=None): + """Helper to invoke redirect_request with minimal boilerplate.""" + req = urllib.request.Request(orig_url) + if auth_header: + req.add_header("Authorization", auth_header) + headers = http.client.HTTPMessage() + fp = io.BytesIO(b"") + return handler.redirect_request(req, fp, 302, "Found", headers, new_url) + + def test_strips_auth_on_cross_origin_redirect(self): + """Test that Authorization is removed when redirecting to a different host.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com/repo/file.rpm", + "https://cdn.example.com/cached-file.rpm", + auth_header="Basic abc123", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_keeps_auth_on_same_origin_redirect(self): + """Test that Authorization is preserved when redirecting to the same origin.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com/repo/file.rpm", + "https://pulp.example.com/different/path.rpm", + auth_header="Basic abc123", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") == "Basic abc123" + + def test_strips_auth_on_scheme_downgrade(self): + """Test that Authorization is removed on HTTPS -> HTTP downgrade.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com/file.rpm", + "http://pulp.example.com/file.rpm", + auth_header="Bearer my-token", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_strips_auth_on_port_change(self): + """Test that Authorization is removed when the port changes.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com:443/file.rpm", + "https://pulp.example.com:8443/file.rpm", + auth_header="Basic abc123", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_no_auth_header_no_error(self): + """Test that redirects without Authorization header work fine.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com/file.rpm", + "https://cdn.example.com/file.rpm", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") is None From eb12fb88e879b8f1610b4f418f977aced0988532 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Tue, 24 Mar 2026 13:22:36 +0100 Subject: [PATCH 09/15] fix: require both --http-username and --http-password together --- productmd/cli/localize.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/productmd/cli/localize.py b/productmd/cli/localize.py index b7b2944..4ef6a57 100644 --- a/productmd/cli/localize.py +++ b/productmd/cli/localize.py @@ -113,6 +113,14 @@ def run(args: object) -> None: print_error("--http-token is mutually exclusive with --http-username/--http-password") sys.exit(2) + if args.http_username and not args.http_password: + print_error("--http-username requires --http-password") + sys.exit(2) + + if args.http_password and not args.http_username: + print_error("--http-password requires --http-username") + sys.exit(2) + progress_callback, cleanup = make_progress_callback(parallel=args.parallel_downloads) try: From a8bda7fa866004bd077b2a60a06b7f94612bfb7c Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Tue, 24 Mar 2026 15:00:44 +0100 Subject: [PATCH 10/15] fix: harden auth security and add missing validation Assisted-by: Claude Opus --- doc/cli-localize.rst | 11 ++- productmd/localize.py | 42 +++++++++- tests/test_localize.py | 172 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 3 deletions(-) diff --git a/doc/cli-localize.rst b/doc/cli-localize.rst index 1b40019..55daa60 100644 --- a/doc/cli-localize.rst +++ b/doc/cli-localize.rst @@ -70,7 +70,16 @@ following precedence order (highest first): Only one of Bearer token or Basic credentials may be specified. Providing both ``--http-token`` and ``--http-username``/``--http-password`` -is an error. +is an error. Both ``--http-username`` and ``--http-password`` must be +provided together. + +.. note:: + + When using ``--http-token`` or ``--http-username``/``--http-password``, + the credentials are sent to **all** HTTP download hosts referenced in + the compose metadata. If your compose references multiple hosts, use + ``~/.netrc`` instead — it resolves credentials per hostname + automatically. When no explicit credentials are given, the tool automatically checks ``~/.netrc`` for entries matching the download URL hostname. This is diff --git a/productmd/localize.py b/productmd/localize.py index a02f301..cdb1941 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -130,6 +130,19 @@ def _get_netrc_auth_header( return None +def _validate_credential(value: str, name: str) -> str: + """Reject credential values containing CR or LF to prevent header injection. + + :param value: The credential value to validate + :param name: Human-readable name for error messages (e.g., "token") + :return: The value unchanged if valid + :raises ValueError: If the value contains CR or LF characters + """ + if "\r" in value or "\n" in value: + raise ValueError(f"{name} contains illegal newline characters") + return value + + def _build_auth_header( url: str, username: Optional[str] = None, @@ -154,15 +167,32 @@ def _build_auth_header( :param netrc_file: Path to a netrc file (default: ``~/.netrc``) :return: Authorization header value, or ``None`` if no credentials are available + :raises ValueError: If any credential contains CR or LF characters """ if token is not None: + _validate_credential(token, "token") return f"Bearer {token}" if username is not None and password is not None: + _validate_credential(username, "username") + _validate_credential(password, "password") credentials = b64encode(f"{username}:{password}".encode()).decode() return f"Basic {credentials}" return _get_netrc_auth_header(url, netrc_file) +def _effective_port(parsed): + """Return the effective port for a parsed URL. + + Resolves ``None`` (no explicit port) to the default port for the + scheme: 443 for HTTPS, 80 for HTTP. This prevents false mismatches + when comparing ``https://host/path`` (port=None) with + ``https://host:443/path`` (port=443). + """ + if parsed.port is not None: + return parsed.port + return 443 if parsed.scheme == "https" else 80 + + class _SafeRedirectHandler(urllib.request.HTTPRedirectHandler): """Redirect handler that strips Authorization on cross-origin redirects. @@ -176,8 +206,8 @@ def redirect_request(self, req, fp, code, msg, headers, newurl): if new_req is not None: original = urlparse(req.full_url) redirect = urlparse(new_req.full_url) - original_origin = (original.scheme, original.hostname, original.port) - redirect_origin = (redirect.scheme, redirect.hostname, redirect.port) + original_origin = (original.scheme, original.hostname, _effective_port(original)) + redirect_origin = (redirect.scheme, redirect.hostname, _effective_port(redirect)) if original_origin != redirect_origin: new_req.remove_header("Authorization") logger.debug( @@ -743,7 +773,15 @@ def localize_compose( :return: :class:`LocalizeResult` with download statistics :raises RuntimeError: If OCI URLs are present but oras-py is not installed :raises urllib.error.URLError: If a download fails and fail_fast is True + :raises ValueError: If auth parameters are invalid (e.g., username + without password, or token with username/password) """ + # Validate auth parameters + if (http_username is None) != (http_password is None): + raise ValueError("http_username and http_password must be provided together") + if http_token and (http_username or http_password): + raise ValueError("http_token is mutually exclusive with http_username/http_password") + # Collect all remote download tasks http_tasks, oci_tasks = _collect_download_tasks( output_dir, diff --git a/tests/test_localize.py b/tests/test_localize.py index f62df01..3087592 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -21,6 +21,7 @@ _get_netrc_auth_header, _should_skip, _should_skip_oci, + _validate_credential, localize_compose, ) from productmd.location import FileEntry, Location @@ -1467,3 +1468,174 @@ def test_no_auth_header_no_error(self): assert new_req is not None assert new_req.get_header("Authorization") is None + + def test_keeps_auth_on_default_port_redirect(self): + """Test that https://host → https://host:443 is treated as same origin.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "https://pulp.example.com/file.rpm", + "https://pulp.example.com:443/file.rpm", + auth_header="Basic abc123", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") == "Basic abc123" + + def test_keeps_auth_on_default_http_port_redirect(self): + """Test that http://host → http://host:80 is treated as same origin.""" + handler = _SafeRedirectHandler() + + new_req = self._redirect( + handler, + "http://pulp.example.com/file.rpm", + "http://pulp.example.com:80/file.rpm", + auth_header="Bearer my-token", + ) + + assert new_req is not None + assert new_req.get_header("Authorization") == "Bearer my-token" + + +# --------------------------------------------------------------------------- +# Tests: Credential validation +# --------------------------------------------------------------------------- + + +class TestValidateCredential: + """Tests for _validate_credential header injection prevention.""" + + def test_valid_credential(self): + """Normal credential values pass through unchanged.""" + assert _validate_credential("admin", "username") == "admin" + assert _validate_credential("secret123", "password") == "secret123" + assert _validate_credential("eyJhbGciOiJSUzI1NiIs", "token") == "eyJhbGciOiJSUzI1NiIs" + + @pytest.mark.parametrize( + "value", + [ + "token\nevil", + "token\revil", + "token\r\nevil-header: injected", + ], + ) + def test_rejects_newline_characters(self, value): + """Credentials with CR, LF, or CRLF are rejected.""" + with pytest.raises(ValueError, match="newline"): + _validate_credential(value, "token") + + @pytest.mark.parametrize( + "kwargs", + [ + {"token": "bad\ntoken"}, + {"username": "bad\nuser", "password": "pass"}, + {"username": "user", "password": "bad\npass"}, + ], + ) + def test_build_auth_header_rejects_bad_credentials(self, kwargs): + """_build_auth_header raises on any credential with newlines.""" + with pytest.raises(ValueError, match="newline"): + _build_auth_header("https://example.com", **kwargs) + + +# --------------------------------------------------------------------------- +# Tests: Library-level auth validation +# --------------------------------------------------------------------------- + + +class TestLocalizeComposeAuthValidation: + """Tests for auth parameter validation in localize_compose.""" + + def test_username_without_password_raises(self): + """localize_compose raises ValueError when username is given without password.""" + with pytest.raises(ValueError, match="must be provided together"): + localize_compose( + output_dir="/tmp/test", + http_username="admin", + ) + + def test_password_without_username_raises(self): + """localize_compose raises ValueError when password is given without username.""" + with pytest.raises(ValueError, match="must be provided together"): + localize_compose( + output_dir="/tmp/test", + http_password="secret", + ) + + def test_token_with_username_raises(self): + """localize_compose raises ValueError when token and username are both given.""" + with pytest.raises(ValueError, match="mutually exclusive"): + localize_compose( + output_dir="/tmp/test", + http_token="my-token", + http_username="admin", + http_password="secret", + ) + + +# --------------------------------------------------------------------------- +# Tests: Netrc default entry +# --------------------------------------------------------------------------- + + +class TestNetrcDefaultEntry: + """Tests for netrc 'default' machine entry.""" + + def test_netrc_default_entry_matches(self, tmp_path): + """Test that the netrc 'default' entry matches any hostname.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("default\nlogin fallback-user\npassword fallback-pass\n") + + result = _get_netrc_auth_header("https://any-host.example.com/file.rpm", str(netrc_file)) + + assert result is not None + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "fallback-user:fallback-pass" + + def test_netrc_specific_overrides_default(self, tmp_path): + """Test that a specific machine entry takes precedence over default.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text( + "machine pulp.example.com\nlogin specific-user\npassword specific-pass\n\n" + "default\nlogin fallback-user\npassword fallback-pass\n" + ) + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is not None + import base64 + + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "specific-user:specific-pass" + + +# --------------------------------------------------------------------------- +# Tests: Parallel downloads with auth +# --------------------------------------------------------------------------- + + +class TestParallelDownloadsAuth: + """Tests for auth in parallel download path.""" + + @patch("productmd.localize.urllib.request.urlopen") + def test_parallel_downloads_send_auth(self, mock_urlopen, tmp_path): + """Test that auth headers are sent when using parallel downloads.""" + mock_urlopen.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), + images=images, + parallel_downloads=2, + verify_checksums=False, + http_username="admin", + http_password="secret", + ) + + # All calls should have the auth header + for call in mock_urlopen.call_args_list: + req = call[0][0] + assert req.get_header("Authorization").startswith("Basic ") From 817d12dda891baf158266ce97efcb9b9a520d3a9 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Tue, 24 Mar 2026 15:30:50 +0100 Subject: [PATCH 11/15] refactor: split test_localize.py into three focused files Assisted-by: Claude Opus --- tests/test_localize.py | 983 +----------------------------------- tests/test_localize_auth.py | 494 ++++++++++++++++++ tests/test_localize_oci.py | 497 ++++++++++++++++++ 3 files changed, 992 insertions(+), 982 deletions(-) create mode 100644 tests/test_localize_auth.py create mode 100644 tests/test_localize_oci.py diff --git a/tests/test_localize.py b/tests/test_localize.py index 3087592..0f4fbfb 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -1,9 +1,7 @@ -"""Tests for the localization tool (localize_compose, _download_https).""" +"""Tests for the core localization tool (download, skip, compose, dedup).""" -import http.client import io import os -import urllib.request from unittest.mock import MagicMock, patch import pytest @@ -12,16 +10,9 @@ from productmd.localize import ( HttpTask, LocalizeResult, - OciTask, - _SafeRedirectHandler, - _build_auth_header, _deduplicate_http_tasks, _download_https, - _download_single_oci, - _get_netrc_auth_header, _should_skip, - _should_skip_oci, - _validate_credential, localize_compose, ) from productmd.location import FileEntry, Location @@ -614,462 +605,6 @@ def test_metadata_downgraded_to_v12(self, mock_urlopen, tmp_path): assert "location" not in img -# --------------------------------------------------------------------------- -# Tests: OCI-specific skip and download logic -# --------------------------------------------------------------------------- - - -class TestShouldSkipOci: - """Tests for the _should_skip_oci function.""" - - def test_skip_oci_contents_all_exist(self, tmp_path): - """Test skip when all content files exist with valid checksums.""" - from productmd.location import compute_checksum as cc - - dest_dir = str(tmp_path / "images") - os.makedirs(os.path.join(dest_dir, "pxeboot")) - - # Create files - vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") - with open(vmlinuz, "wb") as f: - f.write(b"kernel content") - checksum = cc(vmlinuz, "sha256") - - contents = [ - FileEntry(file="pxeboot/vmlinuz", size=14, checksum=checksum, layer_digest="sha256:" + "a" * 64), - ] - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "b" * 64, - dest_dir=dest_dir, - contents=contents, - location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), - rel_path="images/pxeboot/vmlinuz", - ) - - assert _should_skip_oci(task, verify_checksums=True) - - def test_no_skip_oci_contents_missing_file(self, tmp_path): - """Test no skip when a content file is missing.""" - dest_dir = str(tmp_path / "images") - os.makedirs(dest_dir) - - contents = [ - FileEntry(file="pxeboot/vmlinuz", size=14, checksum=None, layer_digest="sha256:" + "a" * 64), - ] - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "b" * 64, - dest_dir=dest_dir, - contents=contents, - location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), - rel_path="images/pxeboot/vmlinuz", - ) - - assert not _should_skip_oci(task, verify_checksums=True) - - def test_no_skip_oci_contents_bad_checksum(self, tmp_path): - """Test no skip when a content file has wrong checksum.""" - dest_dir = str(tmp_path / "images") - os.makedirs(os.path.join(dest_dir, "pxeboot")) - - vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") - with open(vmlinuz, "wb") as f: - f.write(b"kernel content") - - contents = [ - FileEntry( - file="pxeboot/vmlinuz", - size=14, - checksum="sha256:" + "f" * 64, # wrong - layer_digest="sha256:" + "a" * 64, - ), - ] - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "b" * 64, - dest_dir=dest_dir, - contents=contents, - location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), - rel_path="images/pxeboot/vmlinuz", - ) - - assert not _should_skip_oci(task, verify_checksums=True) - - def test_skip_oci_contents_no_verify(self, tmp_path): - """Test skip when all content files exist and verify_checksums=False.""" - dest_dir = str(tmp_path / "images") - os.makedirs(os.path.join(dest_dir, "pxeboot")) - - vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") - with open(vmlinuz, "wb") as f: - f.write(b"kernel content") - - contents = [ - FileEntry( - file="pxeboot/vmlinuz", - size=14, - checksum="sha256:" + "f" * 64, # wrong but not checked - layer_digest="sha256:" + "a" * 64, - ), - ] - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "b" * 64, - dest_dir=dest_dir, - contents=contents, - location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), - rel_path="images/pxeboot/vmlinuz", - ) - - # Not verified, so existence alone is enough - assert _should_skip_oci(task, verify_checksums=False) - - def test_skip_oci_simple_existing_file(self, tmp_path): - """Test skip for simple OCI (no contents) when file exists.""" - # For simple OCI, dest_dir is the compose root and the file - # is at dest_dir/local_path - dest_dir = str(tmp_path / "compose") - os.makedirs(dest_dir) - file_path = os.path.join(dest_dir, "output.iso") - with open(file_path, "wb") as f: - f.write(b"iso content") - - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "b" * 64, - dest_dir=dest_dir, - contents=[], - location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="output.iso"), - rel_path="output.iso", - ) - - assert _should_skip_oci(task, verify_checksums=False) - - -class TestOciLocalizeIntegration: - """Integration-style tests for OCI localization within localize_compose.""" - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_oci_download_error_fail_fast(self, mock_get_downloader, tmp_path): - """Test that OCI download errors respect fail_fast.""" - mock_downloader = MagicMock() - mock_downloader.download_and_extract.side_effect = RuntimeError("registry unreachable") - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - img = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) - img.location = Location( - url="oci://quay.io/fedora/server:41-x86_64@sha256:" + "a" * 64, - size=512, - checksum="sha256:" + "a" * 64, - local_path="Server/x86_64/iso/boot.iso", - ) - im.add("Server", "x86_64", img) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - fail_fast=True, - verify_checksums=False, - ) - - assert result.failed == 1 - assert result.downloaded == 0 - assert len(result.errors) == 1 - assert "registry unreachable" in str(result.errors[0][1]) - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_oci_progress_events(self, mock_get_downloader, tmp_path): - """Test that OCI downloads emit progress events.""" - mock_downloader = MagicMock() - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - img = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) - img.location = Location( - url="oci://quay.io/fedora/server:41-x86_64@sha256:" + "a" * 64, - size=512, - checksum="sha256:" + "a" * 64, - local_path="Server/x86_64/iso/boot.iso", - ) - im.add("Server", "x86_64", img) - - events = [] - localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - verify_checksums=False, - progress_callback=events.append, - ) - - event_types = [e.event_type for e in events] - assert "start" in event_types - assert "complete" in event_types - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - @patch("productmd.localize.urllib.request.urlopen") - def test_mixed_http_and_oci_downloads(self, mock_urlopen, mock_get_downloader, tmp_path): - """Test that a compose with both HTTP and OCI URLs works.""" - mock_urlopen.return_value = _mock_urlopen(b"http content") - mock_downloader = MagicMock() - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - # HTTP image - img_http = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) - img_http.location = Location( - url="https://cdn.example.com/Server/x86_64/iso/boot.iso", - size=512, - checksum="sha256:" + "a" * 64, - local_path="Server/x86_64/iso/boot.iso", - ) - im.add("Server", "x86_64", img_http) - - # OCI image - img_oci = _make_image(im, "Server/aarch64/iso/boot.iso", 1024, "b" * 64) - img_oci.arch = "aarch64" - img_oci.location = Location( - url="oci://quay.io/fedora/server:41-aarch64@sha256:" + "b" * 64, - size=1024, - checksum="sha256:" + "b" * 64, - local_path="Server/aarch64/iso/boot.iso", - ) - im.add("Server", "aarch64", img_oci) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - parallel_downloads=1, - verify_checksums=False, - retries=0, - ) - - # 1 HTTP + 1 OCI - assert result.downloaded == 2 - assert result.failed == 0 - mock_urlopen.assert_called_once() - mock_downloader.download_and_extract.assert_called_once() - - -# --------------------------------------------------------------------------- -# Tests: Parallel OCI downloads -# --------------------------------------------------------------------------- - - -class TestOciParallelDownloads: - """Tests for parallel OCI download execution.""" - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_parallel_multiple_oci_tasks(self, mock_get_downloader, tmp_path): - """Test that multiple OCI tasks are all downloaded in parallel mode.""" - mock_downloader = MagicMock() - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - # Create 3 OCI images on different arches - for arch, hex_char in [("x86_64", "a"), ("aarch64", "b"), ("s390x", "c")]: - img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) - img.arch = arch - img.location = Location( - url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, - size=512, - checksum="sha256:" + hex_char * 64, - local_path=f"Server/{arch}/iso/boot.iso", - ) - im.add("Server", arch, img) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - parallel_downloads=3, - verify_checksums=False, - ) - - assert result.downloaded == 3 - assert result.failed == 0 - # Each task creates its own downloader (thread-safety) - assert mock_get_downloader.call_count == 3 - assert mock_downloader.download_and_extract.call_count == 3 - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_parallel_oci_fail_fast_stops(self, mock_get_downloader, tmp_path): - """Test that fail_fast cancels remaining parallel OCI tasks on error.""" - mock_downloader = MagicMock() - mock_downloader.download_and_extract.side_effect = RuntimeError("pull failed") - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - for arch, hex_char in [("x86_64", "a"), ("aarch64", "b"), ("s390x", "c")]: - img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) - img.arch = arch - img.location = Location( - url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, - size=512, - checksum="sha256:" + hex_char * 64, - local_path=f"Server/{arch}/iso/boot.iso", - ) - im.add("Server", arch, img) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - parallel_downloads=2, - fail_fast=True, - verify_checksums=False, - ) - - assert result.downloaded == 0 - assert result.failed >= 1 - assert len(result.errors) >= 1 - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_parallel_oci_no_fail_fast_continues(self, mock_get_downloader, tmp_path): - """Test that fail_fast=False continues after OCI errors in parallel mode.""" - call_count = 0 - - def side_effect(**kwargs): - nonlocal call_count - call_count += 1 - if call_count == 1: - raise RuntimeError("first pull failed") - # Subsequent calls succeed - - mock_downloader = MagicMock() - mock_downloader.download_and_extract.side_effect = side_effect - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - for arch, hex_char in [("x86_64", "a"), ("aarch64", "b")]: - img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) - img.arch = arch - img.location = Location( - url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, - size=512, - checksum="sha256:" + hex_char * 64, - local_path=f"Server/{arch}/iso/boot.iso", - ) - im.add("Server", arch, img) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - parallel_downloads=2, - fail_fast=False, - verify_checksums=False, - ) - - assert result.downloaded == 1 - assert result.failed == 1 - assert len(result.errors) == 1 - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_sequential_oci_with_parallel_1(self, mock_get_downloader, tmp_path): - """Test that parallel_downloads=1 uses sequential OCI path.""" - mock_downloader = MagicMock() - mock_get_downloader.return_value = mock_downloader - - im = Images() - im.header.version = "2.0" - im.compose.id = "Test-1.0-20260204.0" - im.compose.type = "production" - im.compose.date = "20260204" - im.compose.respin = 0 - im.output_version = VERSION_2_0 - - for arch, hex_char in [("x86_64", "a"), ("aarch64", "b")]: - img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) - img.arch = arch - img.location = Location( - url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, - size=512, - checksum="sha256:" + hex_char * 64, - local_path=f"Server/{arch}/iso/boot.iso", - ) - im.add("Server", arch, img) - - result = localize_compose( - output_dir=str(tmp_path / "output"), - images=im, - parallel_downloads=1, - verify_checksums=False, - ) - - assert result.downloaded == 2 - assert result.failed == 0 - # Each task still creates its own downloader - assert mock_get_downloader.call_count == 2 - - @patch("productmd.oci.get_downloader") - @patch("productmd.oci.HAS_ORAS", True) - def test_download_single_oci_creates_own_downloader(self, mock_get_downloader, tmp_path): - """Test that _download_single_oci creates a fresh downloader per call.""" - mock_downloader = MagicMock() - mock_get_downloader.return_value = mock_downloader - - task = OciTask( - oci_url="oci://registry/repo@sha256:" + "a" * 64, - dest_dir=str(tmp_path / "out"), - contents=[], - location=Location( - url="oci://registry/repo@sha256:" + "a" * 64, - size=100, - local_path="out", - ), - rel_path="out/file", - ) - - _download_single_oci(task, progress_callback=None) - _download_single_oci(task, progress_callback=None) - - # get_downloader called once per invocation (fresh downloader each time) - assert mock_get_downloader.call_count == 2 - - # --------------------------------------------------------------------------- # Tests: HTTP task deduplication # --------------------------------------------------------------------------- @@ -1123,519 +658,3 @@ def test_same_url_different_dest_keeps_both(self): ) result = _deduplicate_http_tasks([task1, task2]) assert len(result) == 2 - - -# --------------------------------------------------------------------------- -# Tests: HTTP Authentication -# --------------------------------------------------------------------------- - - -class TestGetNetrcAuthHeader: - """Tests for _get_netrc_auth_header.""" - - def test_netrc_auth_found(self, tmp_path): - """Test that matching netrc credentials produce a Basic auth header.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret123\n") - - result = _get_netrc_auth_header("https://pulp.example.com/repo/file.rpm", str(netrc_file)) - - assert result is not None - assert result.startswith("Basic ") - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "admin:secret123" - - def test_netrc_host_not_found(self, tmp_path): - """Test that no match returns None.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine other.example.com\nlogin user\npassword pass\n") - - result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) - - assert result is None - - def test_netrc_file_missing(self): - """Test that a missing netrc file returns None without error.""" - result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", "/nonexistent/.netrc") - - assert result is None - - def test_netrc_parse_error(self, tmp_path): - """Test that a malformed netrc file returns None without error.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("this is not valid netrc content!!!\n~~~") - - result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) - - assert result is None - - def test_netrc_custom_file(self, tmp_path): - """Test that a custom netrc file path is used.""" - custom_netrc = tmp_path / "custom-netrc" - custom_netrc.write_text("machine cdn.example.com\nlogin deploy\npassword s3cret\n") - - result = _get_netrc_auth_header("https://cdn.example.com/boot.iso", str(custom_netrc)) - - assert result is not None - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "deploy:s3cret" - - def test_netrc_no_hostname_in_url(self): - """Test that a URL with no hostname returns None.""" - result = _get_netrc_auth_header("file:///local/path", None) - - assert result is None - - -class TestBuildAuthHeader: - """Tests for _build_auth_header precedence logic.""" - - def test_explicit_basic_auth(self): - """Test that explicit username/password produce a Basic header.""" - result = _build_auth_header("https://example.com/file", username="user", password="pass") - - assert result is not None - assert result.startswith("Basic ") - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "user:pass" - - def test_bearer_token(self): - """Test that a token produces a Bearer header.""" - result = _build_auth_header("https://example.com/file", token="my-jwt-token") - - assert result == "Bearer my-jwt-token" - - def test_token_overrides_basic(self): - """Test that token takes precedence over username/password.""" - result = _build_auth_header( - "https://example.com/file", - username="user", - password="pass", - token="my-token", - ) - - assert result == "Bearer my-token" - - def test_token_overrides_netrc(self, tmp_path): - """Test that token takes precedence over netrc credentials.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine example.com\nlogin admin\npassword secret\n") - - result = _build_auth_header( - "https://example.com/file", - token="my-token", - netrc_file=str(netrc_file), - ) - - assert result == "Bearer my-token" - - def test_explicit_basic_overrides_netrc(self, tmp_path): - """Test that explicit credentials override netrc.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") - - result = _build_auth_header( - "https://example.com/file", - username="explicit-user", - password="explicit-pass", - netrc_file=str(netrc_file), - ) - - assert result is not None - assert result.startswith("Basic ") - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "explicit-user:explicit-pass" - - def test_falls_back_to_netrc(self, tmp_path): - """Test that netrc is used when no explicit credentials are given.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") - - result = _build_auth_header("https://example.com/file", netrc_file=str(netrc_file)) - - assert result is not None - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "netrc-user:netrc-pass" - - def test_no_credentials_returns_none(self): - """Test that None is returned when no auth is available.""" - result = _build_auth_header( - "https://example.com/file", - netrc_file="/nonexistent/.netrc", - ) - - assert result is None - - -class TestDownloadHttpsAuth: - """Tests for authentication in _download_https.""" - - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_basic_auth_header(self, mock_urlopen, tmp_path): - """Test that explicit credentials add Authorization header to request.""" - mock_urlopen.return_value = _mock_urlopen(b"content") - dest = str(tmp_path / "file.rpm") - - _download_https( - "https://pulp.example.com/file.rpm", - dest, - retries=0, - username="admin", - password="secret", - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization").startswith("Basic ") - - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_bearer_token_header(self, mock_urlopen, tmp_path): - """Test that a token adds a Bearer Authorization header.""" - mock_urlopen.return_value = _mock_urlopen(b"content") - dest = str(tmp_path / "file.rpm") - - _download_https( - "https://pulp.example.com/file.rpm", - dest, - retries=0, - token="jwt-token-here", - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization") == "Bearer jwt-token-here" - - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_netrc_auth_header(self, mock_urlopen, tmp_path): - """Test that netrc credentials add Authorization header.""" - mock_urlopen.return_value = _mock_urlopen(b"content") - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret\n") - dest = str(tmp_path / "file.rpm") - - _download_https( - "https://pulp.example.com/file.rpm", - dest, - retries=0, - netrc_file=str(netrc_file), - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization").startswith("Basic ") - - @patch("productmd.localize.urllib.request.urlopen") - def test_download_no_auth_by_default(self, mock_urlopen, tmp_path): - """Test that no Authorization header is set when no auth is configured.""" - mock_urlopen.return_value = _mock_urlopen(b"content") - dest = str(tmp_path / "file.rpm") - - _download_https( - "https://pulp.example.com/file.rpm", - dest, - retries=0, - netrc_file="/nonexistent/.netrc", - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization") is None - - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_compose_passes_auth_credentials(self, mock_urlopen, tmp_path): - """Test that localize_compose passes auth params to _download_https.""" - mock_urlopen.return_value = _mock_urlopen(b"x" * 512) - images = _create_images_v2() - - localize_compose( - output_dir=str(tmp_path), - images=images, - parallel_downloads=1, - verify_checksums=False, - http_username="user", - http_password="pass", - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization").startswith("Basic ") - - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_compose_passes_token(self, mock_urlopen, tmp_path): - """Test that localize_compose passes token to _download_https.""" - mock_urlopen.return_value = _mock_urlopen(b"x" * 512) - images = _create_images_v2() - - localize_compose( - output_dir=str(tmp_path), - images=images, - parallel_downloads=1, - verify_checksums=False, - http_token="my-bearer-token", - ) - - req = mock_urlopen.call_args[0][0] - assert req.get_header("Authorization") == "Bearer my-bearer-token" - - -# --------------------------------------------------------------------------- -# Tests: Safe Redirect Handler -# --------------------------------------------------------------------------- - - -class TestSafeRedirectHandler: - """Tests for _SafeRedirectHandler cross-origin auth stripping.""" - - def _redirect(self, handler, orig_url, new_url, auth_header=None): - """Helper to invoke redirect_request with minimal boilerplate.""" - req = urllib.request.Request(orig_url) - if auth_header: - req.add_header("Authorization", auth_header) - headers = http.client.HTTPMessage() - fp = io.BytesIO(b"") - return handler.redirect_request(req, fp, 302, "Found", headers, new_url) - - def test_strips_auth_on_cross_origin_redirect(self): - """Test that Authorization is removed when redirecting to a different host.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com/repo/file.rpm", - "https://cdn.example.com/cached-file.rpm", - auth_header="Basic abc123", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") is None - - def test_keeps_auth_on_same_origin_redirect(self): - """Test that Authorization is preserved when redirecting to the same origin.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com/repo/file.rpm", - "https://pulp.example.com/different/path.rpm", - auth_header="Basic abc123", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") == "Basic abc123" - - def test_strips_auth_on_scheme_downgrade(self): - """Test that Authorization is removed on HTTPS -> HTTP downgrade.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com/file.rpm", - "http://pulp.example.com/file.rpm", - auth_header="Bearer my-token", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") is None - - def test_strips_auth_on_port_change(self): - """Test that Authorization is removed when the port changes.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com:443/file.rpm", - "https://pulp.example.com:8443/file.rpm", - auth_header="Basic abc123", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") is None - - def test_no_auth_header_no_error(self): - """Test that redirects without Authorization header work fine.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com/file.rpm", - "https://cdn.example.com/file.rpm", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") is None - - def test_keeps_auth_on_default_port_redirect(self): - """Test that https://host → https://host:443 is treated as same origin.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "https://pulp.example.com/file.rpm", - "https://pulp.example.com:443/file.rpm", - auth_header="Basic abc123", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") == "Basic abc123" - - def test_keeps_auth_on_default_http_port_redirect(self): - """Test that http://host → http://host:80 is treated as same origin.""" - handler = _SafeRedirectHandler() - - new_req = self._redirect( - handler, - "http://pulp.example.com/file.rpm", - "http://pulp.example.com:80/file.rpm", - auth_header="Bearer my-token", - ) - - assert new_req is not None - assert new_req.get_header("Authorization") == "Bearer my-token" - - -# --------------------------------------------------------------------------- -# Tests: Credential validation -# --------------------------------------------------------------------------- - - -class TestValidateCredential: - """Tests for _validate_credential header injection prevention.""" - - def test_valid_credential(self): - """Normal credential values pass through unchanged.""" - assert _validate_credential("admin", "username") == "admin" - assert _validate_credential("secret123", "password") == "secret123" - assert _validate_credential("eyJhbGciOiJSUzI1NiIs", "token") == "eyJhbGciOiJSUzI1NiIs" - - @pytest.mark.parametrize( - "value", - [ - "token\nevil", - "token\revil", - "token\r\nevil-header: injected", - ], - ) - def test_rejects_newline_characters(self, value): - """Credentials with CR, LF, or CRLF are rejected.""" - with pytest.raises(ValueError, match="newline"): - _validate_credential(value, "token") - - @pytest.mark.parametrize( - "kwargs", - [ - {"token": "bad\ntoken"}, - {"username": "bad\nuser", "password": "pass"}, - {"username": "user", "password": "bad\npass"}, - ], - ) - def test_build_auth_header_rejects_bad_credentials(self, kwargs): - """_build_auth_header raises on any credential with newlines.""" - with pytest.raises(ValueError, match="newline"): - _build_auth_header("https://example.com", **kwargs) - - -# --------------------------------------------------------------------------- -# Tests: Library-level auth validation -# --------------------------------------------------------------------------- - - -class TestLocalizeComposeAuthValidation: - """Tests for auth parameter validation in localize_compose.""" - - def test_username_without_password_raises(self): - """localize_compose raises ValueError when username is given without password.""" - with pytest.raises(ValueError, match="must be provided together"): - localize_compose( - output_dir="/tmp/test", - http_username="admin", - ) - - def test_password_without_username_raises(self): - """localize_compose raises ValueError when password is given without username.""" - with pytest.raises(ValueError, match="must be provided together"): - localize_compose( - output_dir="/tmp/test", - http_password="secret", - ) - - def test_token_with_username_raises(self): - """localize_compose raises ValueError when token and username are both given.""" - with pytest.raises(ValueError, match="mutually exclusive"): - localize_compose( - output_dir="/tmp/test", - http_token="my-token", - http_username="admin", - http_password="secret", - ) - - -# --------------------------------------------------------------------------- -# Tests: Netrc default entry -# --------------------------------------------------------------------------- - - -class TestNetrcDefaultEntry: - """Tests for netrc 'default' machine entry.""" - - def test_netrc_default_entry_matches(self, tmp_path): - """Test that the netrc 'default' entry matches any hostname.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text("default\nlogin fallback-user\npassword fallback-pass\n") - - result = _get_netrc_auth_header("https://any-host.example.com/file.rpm", str(netrc_file)) - - assert result is not None - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "fallback-user:fallback-pass" - - def test_netrc_specific_overrides_default(self, tmp_path): - """Test that a specific machine entry takes precedence over default.""" - netrc_file = tmp_path / ".netrc" - netrc_file.write_text( - "machine pulp.example.com\nlogin specific-user\npassword specific-pass\n\n" - "default\nlogin fallback-user\npassword fallback-pass\n" - ) - - result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) - - assert result is not None - import base64 - - decoded = base64.b64decode(result.split(" ", 1)[1]).decode() - assert decoded == "specific-user:specific-pass" - - -# --------------------------------------------------------------------------- -# Tests: Parallel downloads with auth -# --------------------------------------------------------------------------- - - -class TestParallelDownloadsAuth: - """Tests for auth in parallel download path.""" - - @patch("productmd.localize.urllib.request.urlopen") - def test_parallel_downloads_send_auth(self, mock_urlopen, tmp_path): - """Test that auth headers are sent when using parallel downloads.""" - mock_urlopen.return_value = _mock_urlopen(b"x" * 512) - images = _create_images_v2() - - localize_compose( - output_dir=str(tmp_path), - images=images, - parallel_downloads=2, - verify_checksums=False, - http_username="admin", - http_password="secret", - ) - - # All calls should have the auth header - for call in mock_urlopen.call_args_list: - req = call[0][0] - assert req.get_header("Authorization").startswith("Basic ") diff --git a/tests/test_localize_auth.py b/tests/test_localize_auth.py new file mode 100644 index 0000000..ebbe7e1 --- /dev/null +++ b/tests/test_localize_auth.py @@ -0,0 +1,494 @@ +"""Tests for HTTP authentication in the localization tool.""" + +import base64 +import http.client +import io +import urllib.request +from unittest.mock import MagicMock, patch + +import pytest + +from productmd.images import Images +from productmd.localize import ( + _SafeRedirectHandler, + _build_auth_header, + _download_https, + _get_netrc_auth_header, + _validate_credential, + localize_compose, +) +from productmd.location import Location +from productmd.version import VERSION_2_0 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _create_images_v2(): + """Create Images with explicit remote Location objects.""" + from productmd.images import Image + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + img = Image(im) + img.path = "Server/x86_64/iso/boot.iso" + img.mtime = 1738627200 + img.size = 512 + img.volume_id = "Test-1.0" + img.type = "dvd" + img.format = "iso" + img.arch = "x86_64" + img.disc_number = 1 + img.disc_count = 1 + img.checksums = {"sha256": "a" * 64} + img.subvariant = "Server" + img.location = Location( + url="https://cdn.example.com/Server/x86_64/iso/boot.iso", + size=512, + checksum="sha256:" + "a" * 64, + local_path="Server/x86_64/iso/boot.iso", + ) + im.add("Server", "x86_64", img) + + return im + + +def _mock_urlopen(content=b"fake file content", status=200, content_length=None): + """Create a mock response for urllib.request.urlopen.""" + response = MagicMock() + data = io.BytesIO(content) + response.read = data.read + if content_length is None: + content_length = str(len(content)) + response.headers = {"Content-Length": content_length} + response.status = status + return response + + +# --------------------------------------------------------------------------- +# Tests: Netrc auth header lookup +# --------------------------------------------------------------------------- + + +class TestGetNetrcAuthHeader: + """Tests for _get_netrc_auth_header.""" + + def test_netrc_auth_found(self, tmp_path): + """Test that matching netrc credentials produce a Basic auth header.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret123\n") + + result = _get_netrc_auth_header("https://pulp.example.com/repo/file.rpm", str(netrc_file)) + + assert result is not None + assert result.startswith("Basic ") + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "admin:secret123" + + def test_netrc_host_not_found(self, tmp_path): + """Test that no match returns None.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine other.example.com\nlogin user\npassword pass\n") + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is None + + def test_netrc_file_missing(self): + """Test that a missing netrc file returns None without error.""" + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", "/nonexistent/.netrc") + + assert result is None + + def test_netrc_parse_error(self, tmp_path): + """Test that a malformed netrc file returns None without error.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("this is not valid netrc content!!!\n~~~") + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is None + + def test_netrc_custom_file(self, tmp_path): + """Test that a custom netrc file path is used.""" + custom_netrc = tmp_path / "custom-netrc" + custom_netrc.write_text("machine cdn.example.com\nlogin deploy\npassword s3cret\n") + + result = _get_netrc_auth_header("https://cdn.example.com/boot.iso", str(custom_netrc)) + + assert result is not None + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "deploy:s3cret" + + def test_netrc_no_hostname_in_url(self): + """Test that a URL with no hostname returns None.""" + result = _get_netrc_auth_header("file:///local/path", None) + + assert result is None + + def test_netrc_default_entry_matches(self, tmp_path): + """Test that the netrc 'default' entry matches any hostname.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("default\nlogin fallback-user\npassword fallback-pass\n") + + result = _get_netrc_auth_header("https://any-host.example.com/file.rpm", str(netrc_file)) + + assert result is not None + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "fallback-user:fallback-pass" + + def test_netrc_specific_overrides_default(self, tmp_path): + """Test that a specific machine entry takes precedence over default.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text( + "machine pulp.example.com\nlogin specific-user\npassword specific-pass\n\n" + "default\nlogin fallback-user\npassword fallback-pass\n" + ) + + result = _get_netrc_auth_header("https://pulp.example.com/file.rpm", str(netrc_file)) + + assert result is not None + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "specific-user:specific-pass" + + +# --------------------------------------------------------------------------- +# Tests: Auth header precedence +# --------------------------------------------------------------------------- + + +class TestBuildAuthHeader: + """Tests for _build_auth_header precedence logic.""" + + def test_explicit_basic_auth(self): + """Test that explicit username/password produce a Basic header.""" + result = _build_auth_header("https://example.com/file", username="user", password="pass") + + assert result is not None + assert result.startswith("Basic ") + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "user:pass" + + def test_bearer_token(self): + """Test that a token produces a Bearer header.""" + result = _build_auth_header("https://example.com/file", token="my-jwt-token") + + assert result == "Bearer my-jwt-token" + + def test_token_overrides_basic(self): + """Test that token takes precedence over username/password.""" + result = _build_auth_header( + "https://example.com/file", + username="user", + password="pass", + token="my-token", + ) + + assert result == "Bearer my-token" + + def test_token_overrides_netrc(self, tmp_path): + """Test that token takes precedence over netrc credentials.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin admin\npassword secret\n") + + result = _build_auth_header( + "https://example.com/file", + token="my-token", + netrc_file=str(netrc_file), + ) + + assert result == "Bearer my-token" + + def test_explicit_basic_overrides_netrc(self, tmp_path): + """Test that explicit credentials override netrc.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") + + result = _build_auth_header( + "https://example.com/file", + username="explicit-user", + password="explicit-pass", + netrc_file=str(netrc_file), + ) + + assert result is not None + assert result.startswith("Basic ") + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "explicit-user:explicit-pass" + + def test_falls_back_to_netrc(self, tmp_path): + """Test that netrc is used when no explicit credentials are given.""" + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine example.com\nlogin netrc-user\npassword netrc-pass\n") + + result = _build_auth_header("https://example.com/file", netrc_file=str(netrc_file)) + + assert result is not None + decoded = base64.b64decode(result.split(" ", 1)[1]).decode() + assert decoded == "netrc-user:netrc-pass" + + def test_no_credentials_returns_none(self): + """Test that None is returned when no auth is available.""" + result = _build_auth_header( + "https://example.com/file", + netrc_file="/nonexistent/.netrc", + ) + + assert result is None + + +# --------------------------------------------------------------------------- +# Tests: Auth in _download_https +# --------------------------------------------------------------------------- + + +class TestDownloadHttpsAuth: + """Tests for authentication in _download_https.""" + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_basic_auth_header(self, mock_urlopen_fn, tmp_path): + """Test that explicit credentials add Authorization header to request.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https("https://pulp.example.com/file.rpm", dest, retries=0, username="admin", password="secret") + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_bearer_token_header(self, mock_urlopen_fn, tmp_path): + """Test that a token adds a Bearer Authorization header.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https("https://pulp.example.com/file.rpm", dest, retries=0, token="jwt-token-here") + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization") == "Bearer jwt-token-here" + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_sends_netrc_auth_header(self, mock_urlopen_fn, tmp_path): + """Test that netrc credentials add Authorization header.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"content") + netrc_file = tmp_path / ".netrc" + netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret\n") + dest = str(tmp_path / "file.rpm") + + _download_https("https://pulp.example.com/file.rpm", dest, retries=0, netrc_file=str(netrc_file)) + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_download_no_auth_by_default(self, mock_urlopen_fn, tmp_path): + """Test that no Authorization header is set when no auth is configured.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"content") + dest = str(tmp_path / "file.rpm") + + _download_https("https://pulp.example.com/file.rpm", dest, retries=0, netrc_file="/nonexistent/.netrc") + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization") is None + + @patch("productmd.localize.urllib.request.urlopen") + def test_localize_compose_passes_auth_credentials(self, mock_urlopen_fn, tmp_path): + """Test that localize_compose passes auth params to _download_https.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), + images=images, + parallel_downloads=1, + verify_checksums=False, + http_username="user", + http_password="pass", + ) + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + @patch("productmd.localize.urllib.request.urlopen") + def test_localize_compose_passes_token(self, mock_urlopen_fn, tmp_path): + """Test that localize_compose passes token to _download_https.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), images=images, parallel_downloads=1, verify_checksums=False, http_token="my-bearer-token" + ) + + req = mock_urlopen_fn.call_args[0][0] + assert req.get_header("Authorization") == "Bearer my-bearer-token" + + @patch("productmd.localize.urllib.request.urlopen") + def test_parallel_downloads_send_auth(self, mock_urlopen_fn, tmp_path): + """Test that auth headers are sent when using parallel downloads.""" + mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + images = _create_images_v2() + + localize_compose( + output_dir=str(tmp_path), + images=images, + parallel_downloads=2, + verify_checksums=False, + http_username="admin", + http_password="secret", + ) + + for call in mock_urlopen_fn.call_args_list: + req = call[0][0] + assert req.get_header("Authorization").startswith("Basic ") + + +# --------------------------------------------------------------------------- +# Tests: Safe redirect handler +# --------------------------------------------------------------------------- + + +class TestSafeRedirectHandler: + """Tests for _SafeRedirectHandler cross-origin auth stripping.""" + + def _redirect(self, handler, orig_url, new_url, auth_header=None): + """Helper to invoke redirect_request with minimal boilerplate.""" + req = urllib.request.Request(orig_url) + if auth_header: + req.add_header("Authorization", auth_header) + headers = http.client.HTTPMessage() + fp = io.BytesIO(b"") + return handler.redirect_request(req, fp, 302, "Found", headers, new_url) + + def test_strips_auth_on_cross_origin_redirect(self): + """Test that Authorization is removed when redirecting to a different host.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "https://pulp.example.com/repo/file.rpm", "https://cdn.example.com/cached-file.rpm", auth_header="Basic abc123" + ) + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_keeps_auth_on_same_origin_redirect(self): + """Test that Authorization is preserved when redirecting to the same origin.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "https://pulp.example.com/repo/file.rpm", "https://pulp.example.com/different/path.rpm", auth_header="Basic abc123" + ) + assert new_req is not None + assert new_req.get_header("Authorization") == "Basic abc123" + + def test_strips_auth_on_scheme_downgrade(self): + """Test that Authorization is removed on HTTPS -> HTTP downgrade.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "https://pulp.example.com/file.rpm", "http://pulp.example.com/file.rpm", auth_header="Bearer my-token" + ) + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_strips_auth_on_port_change(self): + """Test that Authorization is removed when the port changes.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "https://pulp.example.com:443/file.rpm", "https://pulp.example.com:8443/file.rpm", auth_header="Basic abc123" + ) + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_no_auth_header_no_error(self): + """Test that redirects without Authorization header work fine.""" + handler = _SafeRedirectHandler() + new_req = self._redirect(handler, "https://pulp.example.com/file.rpm", "https://cdn.example.com/file.rpm") + assert new_req is not None + assert new_req.get_header("Authorization") is None + + def test_keeps_auth_on_default_port_redirect(self): + """Test that https://host -> https://host:443 is treated as same origin.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "https://pulp.example.com/file.rpm", "https://pulp.example.com:443/file.rpm", auth_header="Basic abc123" + ) + assert new_req is not None + assert new_req.get_header("Authorization") == "Basic abc123" + + def test_keeps_auth_on_default_http_port_redirect(self): + """Test that http://host -> http://host:80 is treated as same origin.""" + handler = _SafeRedirectHandler() + new_req = self._redirect( + handler, "http://pulp.example.com/file.rpm", "http://pulp.example.com:80/file.rpm", auth_header="Bearer my-token" + ) + assert new_req is not None + assert new_req.get_header("Authorization") == "Bearer my-token" + + +# --------------------------------------------------------------------------- +# Tests: Credential validation +# --------------------------------------------------------------------------- + + +class TestValidateCredential: + """Tests for _validate_credential header injection prevention.""" + + def test_valid_credential(self): + """Normal credential values pass through unchanged.""" + assert _validate_credential("admin", "username") == "admin" + assert _validate_credential("secret123", "password") == "secret123" + assert _validate_credential("eyJhbGciOiJSUzI1NiIs", "token") == "eyJhbGciOiJSUzI1NiIs" + + @pytest.mark.parametrize( + "value", + [ + "token\nevil", + "token\revil", + "token\r\nevil-header: injected", + ], + ) + def test_rejects_newline_characters(self, value): + """Credentials with CR, LF, or CRLF are rejected.""" + with pytest.raises(ValueError, match="newline"): + _validate_credential(value, "token") + + @pytest.mark.parametrize( + "kwargs", + [ + {"token": "bad\ntoken"}, + {"username": "bad\nuser", "password": "pass"}, + {"username": "user", "password": "bad\npass"}, + ], + ) + def test_build_auth_header_rejects_bad_credentials(self, kwargs): + """_build_auth_header raises on any credential with newlines.""" + with pytest.raises(ValueError, match="newline"): + _build_auth_header("https://example.com", **kwargs) + + +# --------------------------------------------------------------------------- +# Tests: Library-level auth validation +# --------------------------------------------------------------------------- + + +class TestLocalizeComposeAuthValidation: + """Tests for auth parameter validation in localize_compose.""" + + def test_username_without_password_raises(self): + """localize_compose raises ValueError when username is given without password.""" + with pytest.raises(ValueError, match="must be provided together"): + localize_compose(output_dir="/tmp/test", http_username="admin") + + def test_password_without_username_raises(self): + """localize_compose raises ValueError when password is given without username.""" + with pytest.raises(ValueError, match="must be provided together"): + localize_compose(output_dir="/tmp/test", http_password="secret") + + def test_token_with_username_raises(self): + """localize_compose raises ValueError when token and username are both given.""" + with pytest.raises(ValueError, match="mutually exclusive"): + localize_compose(output_dir="/tmp/test", http_token="my-token", http_username="admin", http_password="secret") diff --git a/tests/test_localize_oci.py b/tests/test_localize_oci.py new file mode 100644 index 0000000..8b7f767 --- /dev/null +++ b/tests/test_localize_oci.py @@ -0,0 +1,497 @@ +"""Tests for OCI-specific localization logic (skip, download, parallel).""" + +import io +import os +from unittest.mock import MagicMock, patch + +from productmd.images import Image, Images +from productmd.localize import ( + OciTask, + _download_single_oci, + _should_skip_oci, + localize_compose, +) +from productmd.location import FileEntry, Location +from productmd.version import VERSION_2_0 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_image(parent, path, size, checksum_hex): + img = Image(parent) + img.path = path + img.mtime = 1738627200 + img.size = size + img.volume_id = "Test-1.0" + img.type = "dvd" + img.format = "iso" + img.arch = "x86_64" + img.disc_number = 1 + img.disc_count = 1 + img.checksums = {"sha256": checksum_hex} + img.subvariant = "Server" + return img + + +def _mock_urlopen(content=b"fake file content", status=200, content_length=None): + """Create a mock response for urllib.request.urlopen.""" + response = MagicMock() + data = io.BytesIO(content) + response.read = data.read + if content_length is None: + content_length = str(len(content)) + response.headers = {"Content-Length": content_length} + response.status = status + return response + + +# --------------------------------------------------------------------------- +# Tests: OCI-specific skip logic +# --------------------------------------------------------------------------- + + +class TestShouldSkipOci: + """Tests for the _should_skip_oci function.""" + + def test_skip_oci_contents_all_exist(self, tmp_path): + """Test skip when all content files exist with valid checksums.""" + from productmd.location import compute_checksum as cc + + dest_dir = str(tmp_path / "images") + os.makedirs(os.path.join(dest_dir, "pxeboot")) + + vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") + with open(vmlinuz, "wb") as f: + f.write(b"kernel content") + checksum = cc(vmlinuz, "sha256") + + contents = [ + FileEntry(file="pxeboot/vmlinuz", size=14, checksum=checksum, layer_digest="sha256:" + "a" * 64), + ] + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "b" * 64, + dest_dir=dest_dir, + contents=contents, + location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), + rel_path="images/pxeboot/vmlinuz", + ) + + assert _should_skip_oci(task, verify_checksums=True) + + def test_no_skip_oci_contents_missing_file(self, tmp_path): + """Test no skip when a content file is missing.""" + dest_dir = str(tmp_path / "images") + os.makedirs(dest_dir) + + contents = [ + FileEntry(file="pxeboot/vmlinuz", size=14, checksum=None, layer_digest="sha256:" + "a" * 64), + ] + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "b" * 64, + dest_dir=dest_dir, + contents=contents, + location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), + rel_path="images/pxeboot/vmlinuz", + ) + + assert not _should_skip_oci(task, verify_checksums=True) + + def test_no_skip_oci_contents_bad_checksum(self, tmp_path): + """Test no skip when a content file has wrong checksum.""" + dest_dir = str(tmp_path / "images") + os.makedirs(os.path.join(dest_dir, "pxeboot")) + + vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") + with open(vmlinuz, "wb") as f: + f.write(b"kernel content") + + contents = [ + FileEntry( + file="pxeboot/vmlinuz", + size=14, + checksum="sha256:" + "f" * 64, # wrong + layer_digest="sha256:" + "a" * 64, + ), + ] + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "b" * 64, + dest_dir=dest_dir, + contents=contents, + location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), + rel_path="images/pxeboot/vmlinuz", + ) + + assert not _should_skip_oci(task, verify_checksums=True) + + def test_skip_oci_contents_no_verify(self, tmp_path): + """Test skip when all content files exist and verify_checksums=False.""" + dest_dir = str(tmp_path / "images") + os.makedirs(os.path.join(dest_dir, "pxeboot")) + + vmlinuz = os.path.join(dest_dir, "pxeboot", "vmlinuz") + with open(vmlinuz, "wb") as f: + f.write(b"kernel content") + + contents = [ + FileEntry( + file="pxeboot/vmlinuz", + size=14, + checksum="sha256:" + "f" * 64, # wrong but not checked + layer_digest="sha256:" + "a" * 64, + ), + ] + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "b" * 64, + dest_dir=dest_dir, + contents=contents, + location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="images"), + rel_path="images/pxeboot/vmlinuz", + ) + + assert _should_skip_oci(task, verify_checksums=False) + + def test_skip_oci_simple_existing_file(self, tmp_path): + """Test skip for simple OCI (no contents) when file exists.""" + dest_dir = str(tmp_path / "compose") + os.makedirs(dest_dir) + file_path = os.path.join(dest_dir, "output.iso") + with open(file_path, "wb") as f: + f.write(b"iso content") + + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "b" * 64, + dest_dir=dest_dir, + contents=[], + location=Location(url="oci://registry/repo@sha256:" + "b" * 64, local_path="output.iso"), + rel_path="output.iso", + ) + + assert _should_skip_oci(task, verify_checksums=False) + + +# --------------------------------------------------------------------------- +# Tests: OCI localize integration +# --------------------------------------------------------------------------- + + +class TestOciLocalizeIntegration: + """Integration-style tests for OCI localization within localize_compose.""" + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_oci_download_error_fail_fast(self, mock_get_downloader, tmp_path): + """Test that OCI download errors respect fail_fast.""" + mock_downloader = MagicMock() + mock_downloader.download_and_extract.side_effect = RuntimeError("registry unreachable") + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + img = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) + img.location = Location( + url="oci://quay.io/fedora/server:41-x86_64@sha256:" + "a" * 64, + size=512, + checksum="sha256:" + "a" * 64, + local_path="Server/x86_64/iso/boot.iso", + ) + im.add("Server", "x86_64", img) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + fail_fast=True, + verify_checksums=False, + ) + + assert result.failed == 1 + assert result.downloaded == 0 + assert len(result.errors) == 1 + assert "registry unreachable" in str(result.errors[0][1]) + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_oci_progress_events(self, mock_get_downloader, tmp_path): + """Test that OCI downloads emit progress events.""" + mock_downloader = MagicMock() + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + img = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) + img.location = Location( + url="oci://quay.io/fedora/server:41-x86_64@sha256:" + "a" * 64, + size=512, + checksum="sha256:" + "a" * 64, + local_path="Server/x86_64/iso/boot.iso", + ) + im.add("Server", "x86_64", img) + + events = [] + localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + verify_checksums=False, + progress_callback=events.append, + ) + + event_types = [e.event_type for e in events] + assert "start" in event_types + assert "complete" in event_types + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + @patch("productmd.localize.urllib.request.urlopen") + def test_mixed_http_and_oci_downloads(self, mock_urlopen, mock_get_downloader, tmp_path): + """Test that a compose with both HTTP and OCI URLs works.""" + mock_urlopen.return_value = _mock_urlopen(b"http content") + mock_downloader = MagicMock() + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + img_http = _make_image(im, "Server/x86_64/iso/boot.iso", 512, "a" * 64) + img_http.location = Location( + url="https://cdn.example.com/Server/x86_64/iso/boot.iso", + size=512, + checksum="sha256:" + "a" * 64, + local_path="Server/x86_64/iso/boot.iso", + ) + im.add("Server", "x86_64", img_http) + + img_oci = _make_image(im, "Server/aarch64/iso/boot.iso", 1024, "b" * 64) + img_oci.arch = "aarch64" + img_oci.location = Location( + url="oci://quay.io/fedora/server:41-aarch64@sha256:" + "b" * 64, + size=1024, + checksum="sha256:" + "b" * 64, + local_path="Server/aarch64/iso/boot.iso", + ) + im.add("Server", "aarch64", img_oci) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + parallel_downloads=1, + verify_checksums=False, + retries=0, + ) + + assert result.downloaded == 2 + assert result.failed == 0 + mock_urlopen.assert_called_once() + mock_downloader.download_and_extract.assert_called_once() + + +# --------------------------------------------------------------------------- +# Tests: Parallel OCI downloads +# --------------------------------------------------------------------------- + + +class TestOciParallelDownloads: + """Tests for parallel OCI download execution.""" + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_parallel_multiple_oci_tasks(self, mock_get_downloader, tmp_path): + """Test that multiple OCI tasks are all downloaded in parallel mode.""" + mock_downloader = MagicMock() + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + for arch, hex_char in [("x86_64", "a"), ("aarch64", "b"), ("s390x", "c")]: + img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) + img.arch = arch + img.location = Location( + url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, + size=512, + checksum="sha256:" + hex_char * 64, + local_path=f"Server/{arch}/iso/boot.iso", + ) + im.add("Server", arch, img) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + parallel_downloads=3, + verify_checksums=False, + ) + + assert result.downloaded == 3 + assert result.failed == 0 + assert mock_get_downloader.call_count == 3 + assert mock_downloader.download_and_extract.call_count == 3 + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_parallel_oci_fail_fast_stops(self, mock_get_downloader, tmp_path): + """Test that fail_fast cancels remaining parallel OCI tasks on error.""" + mock_downloader = MagicMock() + mock_downloader.download_and_extract.side_effect = RuntimeError("pull failed") + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + for arch, hex_char in [("x86_64", "a"), ("aarch64", "b"), ("s390x", "c")]: + img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) + img.arch = arch + img.location = Location( + url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, + size=512, + checksum="sha256:" + hex_char * 64, + local_path=f"Server/{arch}/iso/boot.iso", + ) + im.add("Server", arch, img) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + parallel_downloads=2, + fail_fast=True, + verify_checksums=False, + ) + + assert result.downloaded == 0 + assert result.failed >= 1 + assert len(result.errors) >= 1 + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_parallel_oci_no_fail_fast_continues(self, mock_get_downloader, tmp_path): + """Test that fail_fast=False continues after OCI errors in parallel mode.""" + call_count = 0 + + def side_effect(**kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("first pull failed") + + mock_downloader = MagicMock() + mock_downloader.download_and_extract.side_effect = side_effect + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + for arch, hex_char in [("x86_64", "a"), ("aarch64", "b")]: + img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) + img.arch = arch + img.location = Location( + url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, + size=512, + checksum="sha256:" + hex_char * 64, + local_path=f"Server/{arch}/iso/boot.iso", + ) + im.add("Server", arch, img) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + parallel_downloads=2, + fail_fast=False, + verify_checksums=False, + ) + + assert result.downloaded == 1 + assert result.failed == 1 + assert len(result.errors) == 1 + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_sequential_oci_with_parallel_1(self, mock_get_downloader, tmp_path): + """Test that parallel_downloads=1 uses sequential OCI path.""" + mock_downloader = MagicMock() + mock_get_downloader.return_value = mock_downloader + + im = Images() + im.header.version = "2.0" + im.compose.id = "Test-1.0-20260204.0" + im.compose.type = "production" + im.compose.date = "20260204" + im.compose.respin = 0 + im.output_version = VERSION_2_0 + + for arch, hex_char in [("x86_64", "a"), ("aarch64", "b")]: + img = _make_image(im, f"Server/{arch}/iso/boot.iso", 512, hex_char * 64) + img.arch = arch + img.location = Location( + url=f"oci://quay.io/fedora/server:41-{arch}@sha256:" + hex_char * 64, + size=512, + checksum="sha256:" + hex_char * 64, + local_path=f"Server/{arch}/iso/boot.iso", + ) + im.add("Server", arch, img) + + result = localize_compose( + output_dir=str(tmp_path / "output"), + images=im, + parallel_downloads=1, + verify_checksums=False, + ) + + assert result.downloaded == 2 + assert result.failed == 0 + assert mock_get_downloader.call_count == 2 + + @patch("productmd.oci.get_downloader") + @patch("productmd.oci.HAS_ORAS", True) + def test_download_single_oci_creates_own_downloader(self, mock_get_downloader, tmp_path): + """Test that _download_single_oci creates a fresh downloader per call.""" + mock_downloader = MagicMock() + mock_get_downloader.return_value = mock_downloader + + task = OciTask( + oci_url="oci://registry/repo@sha256:" + "a" * 64, + dest_dir=str(tmp_path / "out"), + contents=[], + location=Location( + url="oci://registry/repo@sha256:" + "a" * 64, + size=100, + local_path="out", + ), + rel_path="out/file", + ) + + _download_single_oci(task, progress_callback=None) + _download_single_oci(task, progress_callback=None) + + assert mock_get_downloader.call_count == 2 From 186d40ee47c90cb368a7d8303be02a78c4e28cee Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Tue, 24 Mar 2026 15:40:34 +0100 Subject: [PATCH 12/15] docs: update README and docstring for three-file split --- tests/README.md | 4 +++- tests/test_localize.py | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/README.md b/tests/README.md index 3705abd..33341cb 100644 --- a/tests/README.md +++ b/tests/README.md @@ -30,7 +30,9 @@ tests/ │ ├── test_version.py # Version constants and utilities │ ├── test_location.py # Location, FileEntry, checksum utilities │ ├── test_convert.py # Conversion utilities (upgrade/downgrade/iter) -│ ├── test_localize.py # Localization tool (download, verify, skip) +│ ├── test_localize.py # Localization: core download, skip, compose, dedup +│ ├── test_localize_oci.py # Localization: OCI skip, integration, parallel +│ ├── test_localize_auth.py # Localization: HTTP auth (netrc, Basic, Bearer, redirect) │ ├── test_oci.py # OCI download utilities (requires oras-py) │ └── test_cli.py # CLI subcommands and progress display │ diff --git a/tests/test_localize.py b/tests/test_localize.py index 0f4fbfb..671ac85 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -1,4 +1,9 @@ -"""Tests for the core localization tool (download, skip, compose, dedup).""" +"""Tests for the core localization tool (download, skip, compose, dedup). + +See also: +- test_localize_oci.py -- OCI-specific skip, integration, and parallel tests +- test_localize_auth.py -- HTTP authentication (netrc, Basic, Bearer, redirect) +""" import io import os From 6a5c8885ad9fed59b0f854ea8bde0c7f3622ba0c Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Wed, 25 Mar 2026 11:14:31 +0100 Subject: [PATCH 13/15] fix: scope _SafeRedirectHandler to localize downloads only Assisted-by: Claude Opus --- productmd/localize.py | 7 +-- tests/test_cli.py | 12 ++--- tests/test_localize.py | 92 ++++++++++++++++++------------------- tests/test_localize_auth.py | 60 ++++++++++++------------ tests/test_localize_oci.py | 12 ++--- 5 files changed, 90 insertions(+), 93 deletions(-) diff --git a/productmd/localize.py b/productmd/localize.py index cdb1941..0317e8e 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -218,10 +218,7 @@ def redirect_request(self, req, fp, code, msg, headers, newurl): return new_req -# Install the safe redirect handler globally so that all urlopen() calls -# in this module automatically strip Authorization on cross-origin redirects. -urllib.request.install_opener(urllib.request.build_opener(_SafeRedirectHandler)) - +_opener = urllib.request.build_opener(_SafeRedirectHandler) #: Default chunk size for streaming downloads (8 KB) _CHUNK_SIZE = 8192 @@ -291,7 +288,7 @@ def _download_https( for attempt in range(retries + 1): try: req = urllib.request.Request(url, headers=headers) - response = urllib.request.urlopen(req) + response = _opener.open(req) content_length = response.headers.get("Content-Length") total = int(content_length) if content_length else None diff --git a/tests/test_cli.py b/tests/test_cli.py index 3eed32c..ea1802c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -402,8 +402,8 @@ def test_downgrade_url_rejected(self): class TestLocalizeCommand: """Tests for the productmd localize subcommand.""" - @patch("productmd.localize.urllib.request.urlopen") - def test_basic_localize(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_basic_localize(self, mock_open, tmp_path): """Test basic localization downloads files.""" import io from unittest.mock import MagicMock @@ -411,7 +411,7 @@ def test_basic_localize(self, mock_urlopen, tmp_path): response = MagicMock() response.read = io.BytesIO(b"iso content").read response.headers = {"Content-Length": "11"} - mock_urlopen.return_value = response + mock_open.return_value = response compose_dir = str(tmp_path / "compose") _create_v20_compose(compose_dir) @@ -431,8 +431,8 @@ def test_basic_localize(self, mock_urlopen, tmp_path): assert exit_code == 0 - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_with_compose_dir(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_localize_with_compose_dir(self, mock_open, tmp_path): """Test localization using a compose directory as input.""" import io from unittest.mock import MagicMock @@ -440,7 +440,7 @@ def test_localize_with_compose_dir(self, mock_urlopen, tmp_path): response = MagicMock() response.read = io.BytesIO(b"iso content").read response.headers = {"Content-Length": "11"} - mock_urlopen.return_value = response + mock_open.return_value = response compose_dir = str(tmp_path / "compose") _create_v20_compose(compose_dir) diff --git a/tests/test_localize.py b/tests/test_localize.py index 671ac85..d793b2d 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -112,8 +112,8 @@ def _create_images_v1(): return im -def _mock_urlopen(content=b"fake file content", status=200, content_length=None): - """Create a mock response for urllib.request.urlopen.""" +def _mock_response(content=b"fake file content", status=200, content_length=None): + """Create a mock HTTP response for _opener.open().""" response = MagicMock() data = io.BytesIO(content) response.read = data.read @@ -132,10 +132,10 @@ def _mock_urlopen(content=b"fake file content", status=200, content_length=None) class TestDownloadHttps: """Tests for the _download_https function.""" - @patch("productmd.localize.urllib.request.urlopen") - def test_download_creates_file(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_creates_file(self, mock_open, tmp_path): """Test that download creates the file at the correct path.""" - mock_urlopen.return_value = _mock_urlopen(b"hello world") + mock_open.return_value = _mock_response(b"hello world") dest = str(tmp_path / "output" / "test.txt") _download_https("https://example.com/test.txt", dest, retries=0) @@ -144,20 +144,20 @@ def test_download_creates_file(self, mock_urlopen, tmp_path): with open(dest, "rb") as f: assert f.read() == b"hello world" - @patch("productmd.localize.urllib.request.urlopen") - def test_download_creates_parent_dirs(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_creates_parent_dirs(self, mock_open, tmp_path): """Test that parent directories are created automatically.""" - mock_urlopen.return_value = _mock_urlopen(b"data") + mock_open.return_value = _mock_response(b"data") dest = str(tmp_path / "a" / "b" / "c" / "file.rpm") _download_https("https://example.com/file.rpm", dest, retries=0) assert os.path.isfile(dest) - @patch("productmd.localize.urllib.request.urlopen") - def test_download_atomic_rename(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_atomic_rename(self, mock_open, tmp_path): """Test that .tmp file is renamed to final name (no partial files).""" - mock_urlopen.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") dest = str(tmp_path / "final.iso") _download_https("https://example.com/final.iso", dest, retries=0) @@ -165,16 +165,16 @@ def test_download_atomic_rename(self, mock_urlopen, tmp_path): assert os.path.isfile(dest) assert not os.path.exists(dest + ".tmp") - @patch("productmd.localize.urllib.request.urlopen") + @patch("productmd.localize._opener.open") @patch("productmd.localize.time.sleep") - def test_download_retry_on_failure(self, mock_sleep, mock_urlopen, tmp_path): + def test_download_retry_on_failure(self, mock_sleep, mock_open, tmp_path): """Test that download retries on failure with exponential backoff.""" from urllib.error import URLError - mock_urlopen.side_effect = [ + mock_open.side_effect = [ URLError("connection refused"), URLError("timeout"), - _mock_urlopen(b"success"), + _mock_response(b"success"), ] dest = str(tmp_path / "retried.txt") @@ -185,11 +185,11 @@ def test_download_retry_on_failure(self, mock_sleep, mock_urlopen, tmp_path): assert f.read() == b"success" assert mock_sleep.call_count == 2 - @patch("productmd.localize.urllib.request.urlopen") - def test_download_calls_progress_callback(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_calls_progress_callback(self, mock_open, tmp_path): """Test that progress callback receives correct events.""" content = b"x" * 100 - mock_urlopen.return_value = _mock_urlopen(content) + mock_open.return_value = _mock_response(content) dest = str(tmp_path / "progress.bin") events = [] @@ -215,12 +215,12 @@ def test_download_calls_progress_callback(self, mock_urlopen, tmp_path): complete = events[-1] assert complete.bytes_downloaded == 100 - @patch("productmd.localize.urllib.request.urlopen") - def test_download_all_retries_exhausted(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_all_retries_exhausted(self, mock_open, tmp_path): """Test that exception is raised when all retries are exhausted.""" from urllib.error import URLError - mock_urlopen.side_effect = URLError("permanent failure") + mock_open.side_effect = URLError("permanent failure") dest = str(tmp_path / "fail.txt") with pytest.raises(URLError): @@ -283,10 +283,10 @@ def test_file_exists_no_checksum_available(self, tmp_path): class TestLocalizeCompose: """Tests for the localize_compose function.""" - @patch("productmd.localize.urllib.request.urlopen") - def test_basic_localize(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_basic_localize(self, mock_open, tmp_path): """Test basic localization downloads files and writes v1.2 metadata.""" - mock_urlopen.return_value = _mock_urlopen(b"iso content") + mock_open.return_value = _mock_response(b"iso content") output_dir = str(tmp_path / "compose-output") im = _create_images_v2() @@ -309,8 +309,8 @@ def test_basic_localize(self, mock_urlopen, tmp_path): metadata_dir = os.path.join(output_dir, "compose", "metadata") assert os.path.isfile(os.path.join(metadata_dir, "images.json")) - @patch("productmd.localize.urllib.request.urlopen") - def test_skip_existing_with_valid_checksum(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_skip_existing_with_valid_checksum(self, mock_open, tmp_path): """Test that existing files with valid checksum are skipped.""" output_dir = str(tmp_path / "compose-output") @@ -345,14 +345,14 @@ def test_skip_existing_with_valid_checksum(self, mock_urlopen, tmp_path): assert result.skipped == 1 assert result.downloaded == 0 # urlopen should not have been called - mock_urlopen.assert_not_called() + mock_open.assert_not_called() - @patch("productmd.localize.urllib.request.urlopen") + @patch("productmd.localize._opener.open") @patch("productmd.localize._should_skip") - def test_skip_existing_wrong_checksum_redownloads(self, mock_skip, mock_urlopen, tmp_path): + def test_skip_existing_wrong_checksum_redownloads(self, mock_skip, mock_open, tmp_path): """Test that files with wrong checksum are re-downloaded when skip returns False.""" mock_skip.return_value = False # Simulate checksum mismatch - mock_urlopen.return_value = _mock_urlopen(b"correct content") + mock_open.return_value = _mock_response(b"correct content") im = _create_images_v2() result = localize_compose( @@ -475,12 +475,12 @@ def test_oci_url_without_oras_raises_runtime_error(self, tmp_path): images=im, ) - @patch("productmd.localize.urllib.request.urlopen") - def test_fail_fast_stops_on_error(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_fail_fast_stops_on_error(self, mock_open, tmp_path): """Test that fail_fast=True stops on first download failure.""" from urllib.error import URLError - mock_urlopen.side_effect = URLError("connection refused") + mock_open.side_effect = URLError("connection refused") im = _create_images_v2() rpms = _create_rpms_v2() @@ -498,15 +498,15 @@ def test_fail_fast_stops_on_error(self, mock_urlopen, tmp_path): # Should not have attempted all downloads assert result.downloaded == 0 - @patch("productmd.localize.urllib.request.urlopen") - def test_fail_fast_false_continues(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_fail_fast_false_continues(self, mock_open, tmp_path): """Test that fail_fast=False continues after errors.""" from urllib.error import URLError # First call fails, second succeeds - mock_urlopen.side_effect = [ + mock_open.side_effect = [ URLError("connection refused"), - _mock_urlopen(b"rpm content"), + _mock_response(b"rpm content"), ] im = _create_images_v2() @@ -540,10 +540,10 @@ def test_skip_non_remote_locations(self, tmp_path): assert result.skipped == 0 assert result.failed == 0 - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_result_counts(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_localize_result_counts(self, mock_open, tmp_path): """Test that LocalizeResult has correct counts.""" - mock_urlopen.return_value = _mock_urlopen(b"data") + mock_open.return_value = _mock_response(b"data") im = _create_images_v2() result = localize_compose( @@ -560,10 +560,10 @@ def test_localize_result_counts(self, mock_urlopen, tmp_path): assert result.failed == 0 assert result.errors == [] - @patch("productmd.localize.urllib.request.urlopen") - def test_progress_callback_receives_events(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_progress_callback_receives_events(self, mock_open, tmp_path): """Test that progress callback receives download events.""" - mock_urlopen.return_value = _mock_urlopen(b"file data") + mock_open.return_value = _mock_response(b"file data") im = _create_images_v2() events = [] @@ -581,12 +581,12 @@ def test_progress_callback_receives_events(self, mock_urlopen, tmp_path): assert "start" in event_types assert "complete" in event_types - @patch("productmd.localize.urllib.request.urlopen") - def test_metadata_downgraded_to_v12(self, mock_urlopen, tmp_path): + @patch("productmd.localize._opener.open") + def test_metadata_downgraded_to_v12(self, mock_open, tmp_path): """Test that output metadata is written in v1.2 format.""" import json - mock_urlopen.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") im = _create_images_v2() output_dir = str(tmp_path / "output") diff --git a/tests/test_localize_auth.py b/tests/test_localize_auth.py index ebbe7e1..672100d 100644 --- a/tests/test_localize_auth.py +++ b/tests/test_localize_auth.py @@ -61,8 +61,8 @@ def _create_images_v2(): return im -def _mock_urlopen(content=b"fake file content", status=200, content_length=None): - """Create a mock response for urllib.request.urlopen.""" +def _mock_response(content=b"fake file content", status=200, content_length=None): + """Create a mock HTTP response for _opener.open().""" response = MagicMock() data = io.BytesIO(content) response.read = data.read @@ -253,56 +253,56 @@ def test_no_credentials_returns_none(self): class TestDownloadHttpsAuth: """Tests for authentication in _download_https.""" - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_basic_auth_header(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_sends_basic_auth_header(self, mock_open, tmp_path): """Test that explicit credentials add Authorization header to request.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") dest = str(tmp_path / "file.rpm") _download_https("https://pulp.example.com/file.rpm", dest, retries=0, username="admin", password="secret") - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization").startswith("Basic ") - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_bearer_token_header(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_sends_bearer_token_header(self, mock_open, tmp_path): """Test that a token adds a Bearer Authorization header.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") dest = str(tmp_path / "file.rpm") _download_https("https://pulp.example.com/file.rpm", dest, retries=0, token="jwt-token-here") - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer jwt-token-here" - @patch("productmd.localize.urllib.request.urlopen") - def test_download_sends_netrc_auth_header(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_sends_netrc_auth_header(self, mock_open, tmp_path): """Test that netrc credentials add Authorization header.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") netrc_file = tmp_path / ".netrc" netrc_file.write_text("machine pulp.example.com\nlogin admin\npassword secret\n") dest = str(tmp_path / "file.rpm") _download_https("https://pulp.example.com/file.rpm", dest, retries=0, netrc_file=str(netrc_file)) - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization").startswith("Basic ") - @patch("productmd.localize.urllib.request.urlopen") - def test_download_no_auth_by_default(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_download_no_auth_by_default(self, mock_open, tmp_path): """Test that no Authorization header is set when no auth is configured.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"content") + mock_open.return_value = _mock_response(b"content") dest = str(tmp_path / "file.rpm") _download_https("https://pulp.example.com/file.rpm", dest, retries=0, netrc_file="/nonexistent/.netrc") - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization") is None - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_compose_passes_auth_credentials(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_localize_compose_passes_auth_credentials(self, mock_open, tmp_path): """Test that localize_compose passes auth params to _download_https.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + mock_open.return_value = _mock_response(b"x" * 512) images = _create_images_v2() localize_compose( @@ -314,26 +314,26 @@ def test_localize_compose_passes_auth_credentials(self, mock_urlopen_fn, tmp_pat http_password="pass", ) - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization").startswith("Basic ") - @patch("productmd.localize.urllib.request.urlopen") - def test_localize_compose_passes_token(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_localize_compose_passes_token(self, mock_open, tmp_path): """Test that localize_compose passes token to _download_https.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + mock_open.return_value = _mock_response(b"x" * 512) images = _create_images_v2() localize_compose( output_dir=str(tmp_path), images=images, parallel_downloads=1, verify_checksums=False, http_token="my-bearer-token" ) - req = mock_urlopen_fn.call_args[0][0] + req = mock_open.call_args[0][0] assert req.get_header("Authorization") == "Bearer my-bearer-token" - @patch("productmd.localize.urllib.request.urlopen") - def test_parallel_downloads_send_auth(self, mock_urlopen_fn, tmp_path): + @patch("productmd.localize._opener.open") + def test_parallel_downloads_send_auth(self, mock_open, tmp_path): """Test that auth headers are sent when using parallel downloads.""" - mock_urlopen_fn.return_value = _mock_urlopen(b"x" * 512) + mock_open.return_value = _mock_response(b"x" * 512) images = _create_images_v2() localize_compose( @@ -345,7 +345,7 @@ def test_parallel_downloads_send_auth(self, mock_urlopen_fn, tmp_path): http_password="secret", ) - for call in mock_urlopen_fn.call_args_list: + for call in mock_open.call_args_list: req = call[0][0] assert req.get_header("Authorization").startswith("Basic ") diff --git a/tests/test_localize_oci.py b/tests/test_localize_oci.py index 8b7f767..a92a85f 100644 --- a/tests/test_localize_oci.py +++ b/tests/test_localize_oci.py @@ -36,8 +36,8 @@ def _make_image(parent, path, size, checksum_hex): return img -def _mock_urlopen(content=b"fake file content", status=200, content_length=None): - """Create a mock response for urllib.request.urlopen.""" +def _mock_response(content=b"fake file content", status=200, content_length=None): + """Create a mock HTTP response for _opener.open().""" response = MagicMock() data = io.BytesIO(content) response.read = data.read @@ -255,10 +255,10 @@ def test_oci_progress_events(self, mock_get_downloader, tmp_path): @patch("productmd.oci.get_downloader") @patch("productmd.oci.HAS_ORAS", True) - @patch("productmd.localize.urllib.request.urlopen") - def test_mixed_http_and_oci_downloads(self, mock_urlopen, mock_get_downloader, tmp_path): + @patch("productmd.localize._opener.open") + def test_mixed_http_and_oci_downloads(self, mock_open, mock_get_downloader, tmp_path): """Test that a compose with both HTTP and OCI URLs works.""" - mock_urlopen.return_value = _mock_urlopen(b"http content") + mock_open.return_value = _mock_response(b"http content") mock_downloader = MagicMock() mock_get_downloader.return_value = mock_downloader @@ -299,7 +299,7 @@ def test_mixed_http_and_oci_downloads(self, mock_urlopen, mock_get_downloader, t assert result.downloaded == 2 assert result.failed == 0 - mock_urlopen.assert_called_once() + mock_open.assert_called_once() mock_downloader.download_and_extract.assert_called_once() From 823b1691acc64709cf31374615411c0db51d15d3 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Wed, 25 Mar 2026 11:27:33 +0100 Subject: [PATCH 14/15] fix: skip retries on 401/403 auth errors --- productmd/localize.py | 3 +++ tests/test_localize.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/productmd/localize.py b/productmd/localize.py index 0317e8e..62a5adb 100644 --- a/productmd/localize.py +++ b/productmd/localize.py @@ -313,6 +313,9 @@ def _download_https( except (HTTPError, URLError, OSError) as e: last_error = e logger.warning("Download attempt %d/%d failed for %s: %s", attempt + 1, retries + 1, url, e) + # Auth errors won't be fixed by retrying + if isinstance(e, HTTPError) and e.code in (401, 403): + raise # Clean up partial temp file if os.path.exists(tmp_path): try: diff --git a/tests/test_localize.py b/tests/test_localize.py index d793b2d..1728b6d 100644 --- a/tests/test_localize.py +++ b/tests/test_localize.py @@ -226,6 +226,34 @@ def test_download_all_retries_exhausted(self, mock_open, tmp_path): with pytest.raises(URLError): _download_https("https://example.com/fail.txt", dest, retries=1) + @patch("productmd.localize._opener.open") + def test_download_no_retry_on_401(self, mock_open, tmp_path): + """Test that 401 errors are raised immediately without retrying.""" + from urllib.error import HTTPError + + mock_open.side_effect = HTTPError("https://example.com/file.rpm", 401, "Unauthorized", {}, None) + dest = str(tmp_path / "auth_fail.txt") + + with pytest.raises(HTTPError) as exc_info: + _download_https("https://example.com/file.rpm", dest, retries=3) + + assert exc_info.value.code == 401 + mock_open.assert_called_once() + + @patch("productmd.localize._opener.open") + def test_download_no_retry_on_403(self, mock_open, tmp_path): + """Test that 403 errors are raised immediately without retrying.""" + from urllib.error import HTTPError + + mock_open.side_effect = HTTPError("https://example.com/file.rpm", 403, "Forbidden", {}, None) + dest = str(tmp_path / "forbidden.txt") + + with pytest.raises(HTTPError) as exc_info: + _download_https("https://example.com/file.rpm", dest, retries=3) + + assert exc_info.value.code == 403 + mock_open.assert_called_once() + # --------------------------------------------------------------------------- # Tests: _should_skip From c1e3ba883dac865d4c202cbe690e13f400dd28b2 Mon Sep 17 00:00:00 2001 From: guillermodotn Date: Wed, 25 Mar 2026 13:12:50 +0100 Subject: [PATCH 15/15] fix: use env vars for password and token instead of CLI flags --- doc/cli-localize.rst | 45 +++++++++++++++++---------------------- productmd/cli/localize.py | 37 +++++++++++++------------------- 2 files changed, 35 insertions(+), 47 deletions(-) diff --git a/doc/cli-localize.rst b/doc/cli-localize.rst index 55daa60..3c8cf4f 100644 --- a/doc/cli-localize.rst +++ b/doc/cli-localize.rst @@ -4,7 +4,7 @@ productmd-localize Synopsis -------- -**productmd localize** **--output** *DIR* [**--parallel-downloads** *N*] [**--no-verify-checksums**] [**--skip-existing**] [**--retries** *N*] [**--no-fail-fast**] [**--http-token** *TOKEN*] [**--http-username** *USER* **--http-password** *PASS*] [**--netrc-file** *PATH*] *input* +**productmd localize** **--output** *DIR* [**--parallel-downloads** *N*] [**--no-verify-checksums**] [**--skip-existing**] [**--retries** *N*] [**--no-fail-fast**] [**--http-username** *USER*] [**--netrc-file** *PATH*] *input* Description ----------- @@ -63,21 +63,26 @@ HTTP downloads support authentication for accessing protected content servers (e.g. Pulp). Three mechanisms are available, resolved in the following precedence order (highest first): -1. **Bearer token** — ``--http-token`` or ``$PRODUCTMD_HTTP_TOKEN`` -2. **Basic credentials** — ``--http-username`` + ``--http-password`` - (or ``$PRODUCTMD_HTTP_PASSWORD``) +1. **Bearer token** — ``$PRODUCTMD_HTTP_TOKEN`` +2. **Basic credentials** — ``--http-username`` + ``$PRODUCTMD_HTTP_PASSWORD`` 3. **Netrc** — automatic lookup from ``~/.netrc`` (or ``--netrc-file``) Only one of Bearer token or Basic credentials may be specified. -Providing both ``--http-token`` and ``--http-username``/``--http-password`` -is an error. Both ``--http-username`` and ``--http-password`` must be -provided together. +Setting ``$PRODUCTMD_HTTP_TOKEN`` together with +``--http-username``/``$PRODUCTMD_HTTP_PASSWORD`` is an error. .. note:: - When using ``--http-token`` or ``--http-username``/``--http-password``, - the credentials are sent to **all** HTTP download hosts referenced in - the compose metadata. If your compose references multiple hosts, use + Sensitive credentials (password and token) are provided exclusively + via environment variables to avoid leaking them in shell history or + process listings. + +.. note:: + + When using ``$PRODUCTMD_HTTP_TOKEN`` or + ``--http-username``/``$PRODUCTMD_HTTP_PASSWORD``, the credentials + are sent to **all** HTTP download hosts referenced in the compose + metadata. If your compose references multiple hosts, use ``~/.netrc`` instead — it resolves credentials per hostname automatically. @@ -85,21 +90,11 @@ When no explicit credentials are given, the tool automatically checks ``~/.netrc`` for entries matching the download URL hostname. This is transparent and requires no CLI flags. -**--http-token** *TOKEN* - Bearer token for HTTP authentication. Sent as - ``Authorization: Bearer ``. Useful for Keycloak/OAuth2-based - Pulp deployments. Can also be set via the ``PRODUCTMD_HTTP_TOKEN`` - environment variable. Mutually exclusive with - ``--http-username``/``--http-password``. - **--http-username** *USER* - Username for HTTP Basic authentication. Must be combined with - ``--http-password``. Takes precedence over netrc credentials. - -**--http-password** *PASS* - Password for HTTP Basic authentication. Can also be set via the - ``PRODUCTMD_HTTP_PASSWORD`` environment variable to avoid exposing - the password in shell history. + Username for HTTP Basic authentication. Password must be set via + the ``PRODUCTMD_HTTP_PASSWORD`` environment variable. Takes + precedence over netrc credentials. For Bearer token auth, set + ``PRODUCTMD_HTTP_TOKEN`` instead. **--netrc-file** *PATH* Path to a netrc file for credential lookup. Default: ``~/.netrc``. @@ -178,9 +173,9 @@ Continue after failures:: Download with a Bearer token:: + export PRODUCTMD_HTTP_TOKEN=eyJhbGciOiJSUzI1NiIs... productmd localize \ --output /mnt/local \ - --http-token eyJhbGciOiJSUzI1NiIs... \ images.json Download with Basic auth (password via env var):: diff --git a/productmd/cli/localize.py b/productmd/cli/localize.py index 4ef6a57..c854bb6 100644 --- a/productmd/cli/localize.py +++ b/productmd/cli/localize.py @@ -77,21 +77,11 @@ def register(subparsers: object) -> None: parser.add_argument( "--http-username", default=None, - help=("Username for HTTP Basic authentication. Takes precedence over netrc credentials."), - ) - parser.add_argument( - "--http-password", - default=os.environ.get("PRODUCTMD_HTTP_PASSWORD"), - help=("Password for HTTP Basic authentication. Can also be set via PRODUCTMD_HTTP_PASSWORD env var."), - ) - parser.add_argument( - "--http-token", - default=os.environ.get("PRODUCTMD_HTTP_TOKEN"), help=( - "Bearer token for HTTP authentication. " - "Takes precedence over Basic credentials and netrc. " - "Mutually exclusive with --http-username/--http-password. " - "Can also be set via PRODUCTMD_HTTP_TOKEN env var." + "Username for HTTP Basic authentication. " + "Password must be set via PRODUCTMD_HTTP_PASSWORD env var. " + "Takes precedence over netrc credentials. " + "For Bearer token auth, set PRODUCTMD_HTTP_TOKEN env var instead." ), ) add_input_args(parser) @@ -109,16 +99,19 @@ def run(args: object) -> None: print_error(f"No metadata found at {args.input}") sys.exit(1) - if args.http_token and (args.http_username or args.http_password): - print_error("--http-token is mutually exclusive with --http-username/--http-password") + http_password = os.environ.get("PRODUCTMD_HTTP_PASSWORD") + http_token = os.environ.get("PRODUCTMD_HTTP_TOKEN") + + if http_token and (args.http_username or http_password): + print_error("PRODUCTMD_HTTP_TOKEN is mutually exclusive with --http-username/PRODUCTMD_HTTP_PASSWORD") sys.exit(2) - if args.http_username and not args.http_password: - print_error("--http-username requires --http-password") + if args.http_username and not http_password: + print_error("--http-username requires PRODUCTMD_HTTP_PASSWORD env var") sys.exit(2) - if args.http_password and not args.http_username: - print_error("--http-password requires --http-username") + if http_password and not args.http_username: + print_error("PRODUCTMD_HTTP_PASSWORD requires --http-username") sys.exit(2) progress_callback, cleanup = make_progress_callback(parallel=args.parallel_downloads) @@ -134,8 +127,8 @@ def run(args: object) -> None: progress_callback=progress_callback, netrc_file=args.netrc_file, http_username=args.http_username, - http_password=args.http_password, - http_token=args.http_token, + http_password=http_password, + http_token=http_token, **metadata, ) finally: