Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,97 @@ def _upload_k8s_image(python: str, kubernetes_version: str, output: Output | Non
return kind_load_result.returncode, f"Uploaded K8S image to {cluster_name}"


# Test-suite container images that Airflow's K8s system tests pull from Docker
# Hub. Tagged (not `:latest`) so kubelet's default imagePullPolicy is
# IfNotPresent — combined with `kind load` below, this means kubelet uses the
# already-loaded image and never reaches out to Docker Hub. The pin protects
# CI runs from Docker Hub anonymous-pull rate limits, which intermittently
# turn the scheduled K8s test job red. Auto-bumped by
# scripts/ci/prek/upgrade_important_versions.py.
K8S_TEST_IMAGES_TO_PRELOAD: tuple[str, ...] = (
"alpine:3.23", # xcom_sidecar default in providers/cncf/kubernetes
"busybox:1.37", # busybox-based system tests in kubernetes-tests/
"ubuntu:24.04", # ubuntu-based system tests in kubernetes-tests/
)


def _docker_pull_with_429_retry(image: str, output: Output | None, max_attempts: int = 5) -> int:
"""Run `docker pull <image>` retrying with exponential backoff on Docker Hub 429s.

Returns the final docker exit code (0 on success). Non-429 failures fail
fast — only the rate-limit pattern is retried, since for everything else
retrying would just amplify a real error.
"""
import time

delay = 5
for attempt in range(1, max_attempts + 1):
result = run_command(
["docker", "pull", image],
check=False,
output=output,
capture_output=True,
text=True,
)
if result.returncode == 0:
return 0
stderr = (result.stderr or "") + (result.stdout or "")
rate_limited = "429" in stderr or "Too Many Requests" in stderr or "toomanyrequests" in stderr
if not rate_limited:
get_console(output=output).print(
f"[error]docker pull {image} failed (non-rate-limit): {stderr.strip()[:500]}"
)
return result.returncode
if attempt == max_attempts:
get_console(output=output).print(
f"[error]docker pull {image} hit Docker Hub 429 on every {max_attempts} attempts; giving up."
)
return result.returncode
get_console(output=output).print(
f"[warning]docker pull {image} hit Docker Hub 429 "
f"(attempt {attempt}/{max_attempts}); sleeping {delay}s before retry."
)
time.sleep(delay)
delay *= 2
return 1


def _preload_test_images_to_kind(
python: str,
kubernetes_version: str,
output: Output | None,
) -> tuple[int, str]:
"""Pre-pull and `kind load` the pinned test-suite images.

See K8S_TEST_IMAGES_TO_PRELOAD for the list and rationale. Each image is
pulled once on the host (with retry-on-429), then loaded into every kind
node. Pods that reference these images then start without kubelet ever
reaching out to Docker Hub.
"""
cluster_name = get_kind_cluster_name(python=python, kubernetes_version=kubernetes_version)
for image in K8S_TEST_IMAGES_TO_PRELOAD:
get_console(output=output).print(
f"[info]Pre-pulling test image {image} for kind cluster {cluster_name}"
)
pull_rc = _docker_pull_with_429_retry(image, output=output)
if pull_rc != 0:
return pull_rc, f"docker pull {image} failed"
get_console(output=output).print(f"[info]Loading {image} into kind cluster {cluster_name}")
kind_load_result = run_command_with_k8s_env(
["kind", "load", "docker-image", "--name", cluster_name, image],
python=python,
output=output,
kubernetes_version=kubernetes_version,
check=False,
)
if kind_load_result.returncode != 0:
get_console(output=output).print(
f"[error]kind load docker-image {image} into {cluster_name} failed."
)
return kind_load_result.returncode, f"kind load {image} failed"
return 0, f"Pre-loaded {len(K8S_TEST_IMAGES_TO_PRELOAD)} test images into {cluster_name}"


@kubernetes_group.command(
name="build-k8s-image",
help="Build k8s-ready airflow image (optionally all images in parallel).",
Expand Down Expand Up @@ -2043,6 +2134,16 @@ def _run_complete_tests(
returncode, message = _upload_k8s_image(
python=python, kubernetes_version=kubernetes_version, output=output
)
if returncode != 0:
_logs(python=python, kubernetes_version=kubernetes_version)
return returncode, message
get_console(output=output).print(
f"\n[info]Pre-loading pinned test images into kind cluster for "
f"Python {python}, Kubernetes {kubernetes_version}\n"
)
returncode, message = _preload_test_images_to_kind(
python=python, kubernetes_version=kubernetes_version, output=output
)
if returncode != 0:
_logs(python=python, kubernetes_version=kubernetes_version)
return returncode, message
Expand Down
2 changes: 1 addition & 1 deletion dev/registry/extract_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
try:
import tomllib # Python 3.11+ stdlib
except ModuleNotFoundError: # pragma: no cover -- Python 3.10 fallback
import tomli as tomllib
import tomli as tomllib # type: ignore[no-redef]

import yaml
from registry_contract_models import validate_providers_catalog
Expand Down
2 changes: 1 addition & 1 deletion dev/registry/extract_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
try:
import tomllib # Python 3.11+ stdlib
except ModuleNotFoundError: # pragma: no cover -- Python 3.10 fallback
import tomli as tomllib
import tomli as tomllib # type: ignore[no-redef]
from registry_contract_models import validate_provider_version_metadata

try:
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ krb
Kube
kube
kubeconfig
kubelet
Kubernetes
kubernetes
KubernetesPodOperator
Expand Down
Loading
Loading