diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index a3f8e417237e8..2d8cb827e4345 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -653,6 +653,20 @@ spark_job_template.json An alternative method, apart from using YAML or JSON files, is to directly pass the ``template_spec`` field instead of application_file if you prefer not to employ a file for configuration. +How does XCom work? +^^^^^^^^^^^^^^^^^^^ + +When ``do_xcom_push=True``, :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` +injects an XCom ``emptyDir`` volume, a driver ``volumeMount`` at ``/airflow/xcom``, and an +``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. + +.. important:: + + Do **not** define the XCom volume (for example ``spec.volumes`` with name ``xcom``), + a driver ``volumeMounts`` entry for ``/airflow/xcom``, or an ``airflow-xcom-sidecar`` + entry under ``driver.sidecars`` in your SparkApplication YAML or ``template_spec``. + The operator adds these resources when ``do_xcom_push=True``. Defining them yourself + duplicates the mount path and Kubernetes rejects the driver pod. 
def _apply_xcom_sidecar_to_template(self, template_body: dict) -> None:
    """
    Inject the XCom sidecar wiring into the SparkApplication spec when XCom push is enabled.

    Does nothing unless ``self.do_xcom_push`` is truthy. Otherwise the spec stored at
    ``template_body["spark"]["spec"]`` is replaced, in place on ``template_body``, by the
    result of ``add_sidecar_to_spark_operator_pod_spec``. The sidecar image and resources
    are read from the operator's hook.

    :param template_body: Template body dict; its ``["spark"]["spec"]`` entry is overwritten.
    """
    if not self.do_xcom_push:
        return

    self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id)
    spark_section = template_body["spark"]
    original_spec = spark_section["spec"]
    image = self.hook.get_xcom_sidecar_container_image()
    resources = self.hook.get_xcom_sidecar_container_resources()
    spark_section["spec"] = add_sidecar_to_spark_operator_pod_spec(
        original_spec,
        sidecar_container_image=image,
        sidecar_container_resources=resources,
    )
-407,6 +418,8 @@ def _setup_spark_configuration(self, context: Context): env_list.append({"name": "SPARK_APPLICATION_NAME", "value": self.name}) self.log.info("Creating sparkApplication.") + self._apply_xcom_sidecar_to_template(template_body) + self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index aab28673ad147..522893b8640b5 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -19,8 +19,9 @@ from __future__ import annotations import copy +from typing import Any -from kubernetes.client import models as k8s +from kubernetes.client import ApiClient, models as k8s # Pinned alpine version for the xcom sidecar default. Pinning (rather than # using the implicit `:latest`) makes kubelet's default imagePullPolicy @@ -37,8 +38,10 @@ class PodDefaults: XCOM_MOUNT_PATH = "/airflow/xcom" SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' - VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) - VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) + VOLUME_MOUNT_NAME = "xcom" + VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) + XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] + VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, command=["sh", "-c", XCOM_CMD], @@ -57,11 +60,11 @@ def add_xcom_sidecar( pod: k8s.V1Pod, *, sidecar_container_image: str | None = None, - sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, + sidecar_container_resources: 
def add_sidecar_to_spark_operator_pod_spec(
    spec: dict[str, Any],
    sidecar_container_image: str | None = None,
    sidecar_container_resources: k8s.V1ResourceRequirements | dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Return a copy of a SparkApplication ``spec`` dict with the XCom sidecar wiring added.

    The input is deep-copied and never mutated. On the copy:

    * the XCom volume is inserted first in ``spec["volumes"]``;
    * the XCom volume mount is inserted first in ``spec["driver"]["volumeMounts"]``;
    * a sidecar container entry mounting the same volume is appended to
      ``spec["driver"]["sidecars"]``.

    Missing or ``None`` ``volumes`` / ``volumeMounts`` / ``sidecars`` entries are treated
    as empty lists.

    :param spec: SparkApplication ``spec`` dict; must contain a ``"driver"`` mapping.
    :param sidecar_container_image: Image for the sidecar; falls back to
        ``PodDefaults.SIDECAR_CONTAINER.image`` when falsy.
    :param sidecar_container_resources: Optional resources stored as-is under the sidecar's
        ``"resources"`` key; omitted when falsy.
    :return: The modified deep copy of ``spec``.
    :raises KeyError: If ``spec`` has no ``"driver"`` key.
    """
    # The Spark Operator consumes a SparkApplication custom object (plain dicts), which is
    # different from the standard Kubernetes Pod model, so the additions are built as dicts.
    driver_template = copy.deepcopy(spec)
    xcom_volume_mount = ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)

    # NOTE(review): the volume is rendered with ``to_dict()`` while the mount goes through
    # ``sanitize_for_serialization``. Per the kubernetes client, ``to_dict()`` keeps the
    # model's snake_case attribute names and None-valued fields - confirm the
    # SparkApplication CRD accepts that shape. The existing unit tests pin it, so it is
    # left unchanged here.
    driver_template["volumes"] = driver_template.get("volumes") or []
    driver_template["volumes"].insert(0, PodDefaults.VOLUME.to_dict())

    driver = driver_template["driver"]
    driver["volumeMounts"] = driver.get("volumeMounts") or []
    # Give the driver its own mount dict so the driver and sidecar entries do not alias
    # one object (a later in-place edit of one must not silently change the other).
    driver["volumeMounts"].insert(0, dict(xcom_volume_mount))

    sidecar: dict[str, Any] = {
        "name": PodDefaults.SIDECAR_CONTAINER_NAME,
        # Copy the class-level list: handing out the shared constant would let any caller
        # that mutates the returned spec corrupt the default command for later calls.
        "command": list(PodDefaults.XCOM_SIDECAR_COMMAND),
        "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image,
        "volumeMounts": [xcom_volume_mount],
    }
    if sidecar_container_resources:
        sidecar["resources"] = sidecar_container_resources

    driver["sidecars"] = driver.get("sidecars") or []
    driver["sidecars"].append(sidecar)
    return driver_template
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client")
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
def test_setup_spark_configuration_injects_xcom_sidecar_when_do_xcom_push(
    self, mock_hook_cls, mock_client
):
    """With ``do_xcom_push=True`` the submitted spec carries the XCom volume, mount and sidecar."""
    hook = mock_hook_cls.return_value
    sidecar_image = "custom.repo/alpine:3.16"
    sidecar_resources = {
        "requests": {"cpu": "1m", "memory": "10Mi"},
        "limits": {"cpu": "10m", "memory": "50Mi"},
    }
    hook.get_xcom_sidecar_container_image.return_value = sidecar_image
    hook.get_xcom_sidecar_container_resources.return_value = sidecar_resources

    template_spec = {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "spec": {
            "driver": {},
            "executor": {},
        },
    }
    operator = SparkKubernetesOperator(
        task_id="test_task",
        namespace="default",
        template_spec=template_spec,
        do_xcom_push=True,
        reattach_on_restart=False,
    )
    operator.name = "my-spark-app"

    # Stub out CRD creation so only the spec assembly is exercised.
    with mock.patch.object(operator, "get_or_create_spark_crd", return_value=mock.MagicMock()):
        operator._setup_spark_configuration(mock.MagicMock())

    submitted_spec = operator.launcher.body["spec"]
    assert submitted_spec["volumes"] == [PodDefaults.VOLUME.to_dict()]
    assert submitted_spec["driver"]["volumeMounts"] == [
        {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH}
    ]

    injected_sidecars = submitted_spec["driver"]["sidecars"]
    assert len(injected_sidecars) == 1
    xcom_sidecar = injected_sidecars[0]
    assert xcom_sidecar["name"] == PodDefaults.SIDECAR_CONTAINER_NAME
    assert xcom_sidecar["image"] == sidecar_image
    assert xcom_sidecar["resources"] == sidecar_resources

    # Image and resources must each be looked up on the hook exactly once.
    hook.get_xcom_sidecar_container_image.assert_called_once()
    hook.get_xcom_sidecar_container_resources.assert_called_once()
mock_hook_cls.return_value.get_xcom_sidecar_container_image.assert_not_called() + mock_hook_cls.return_value.get_xcom_sidecar_container_resources.assert_not_called() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py new file mode 100644 index 0000000000000..1df33bf1bac98 --- /dev/null +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any + +import pytest +from kubernetes.client import ApiClient, models as k8s + +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import ( + PodDefaults, + add_sidecar_to_spark_operator_pod_spec, +) + +EXISTING_VOLUME_NAME = "config" +EXISTING_MOUNT_PATH = "/config" +XCOM_MOUNT = ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT) + + +def _make_spark_spec( + *, + volumes: list[dict[str, Any]] | None = None, + driver_volume_mounts: list[dict[str, Any]] | None = None, + driver_sidecars: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + spec: dict[str, Any] = { + "type": "Python", + "driver": {"cores": 1, "memory": "512m"}, + "executor": {"instances": 1, "cores": 1, "memory": "512m"}, + } + if volumes is not None: + spec["volumes"] = volumes + if driver_volume_mounts is not None: + spec["driver"]["volumeMounts"] = driver_volume_mounts + if driver_sidecars is not None: + spec["driver"]["sidecars"] = driver_sidecars + return spec + + +def test_add_sidecar_to_spark_operator_pod_spec_prepends_xcom_before_existing_volumes_and_mounts(): + existing_sidecar = {"name": "other-sidecar", "image": "busybox"} + existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}} + existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH} + spec = _make_spark_spec( + volumes=[existing_volume], + driver_volume_mounts=[existing_mount], + driver_sidecars=[existing_sidecar], + ) + + result = add_sidecar_to_spark_operator_pod_spec(spec) + + assert len(result["volumes"]) == len(spec["volumes"]) + 1 + assert result["volumes"][0] == PodDefaults.VOLUME.to_dict() + assert result["volumes"][1:] == spec["volumes"] + assert len(result["driver"]["volumeMounts"]) == len(spec["driver"]["volumeMounts"]) + 1 + assert result["driver"]["volumeMounts"][0] == XCOM_MOUNT + assert result["driver"]["volumeMounts"][1:] == spec["driver"]["volumeMounts"] + assert len(result["driver"]["sidecars"]) == 
def test_add_sidecar_to_spark_operator_pod_spec_wires_shared_volume_between_driver_and_sidecar():
    """The driver and the sidecar mount the same XCom volume at the XCom mount path."""
    spec = _make_spark_spec(
        volumes=[{"name": EXISTING_VOLUME_NAME, "emptyDir": {}}],
        driver_volume_mounts=[{"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH}],
    )

    result = add_sidecar_to_spark_operator_pod_spec(spec)

    # The injected volume is expected first; both mounts must reference it by name.
    shared_name = result["volumes"][0]["name"]
    assert shared_name == PodDefaults.VOLUME_MOUNT_NAME

    driver_section = result["driver"]
    assert driver_section["volumeMounts"][0]["name"] == shared_name
    assert driver_section["sidecars"][-1]["volumeMounts"][0]["name"] == shared_name

    expected_path = PodDefaults.XCOM_MOUNT_PATH
    assert driver_section["volumeMounts"][0]["mountPath"] == expected_path
    assert driver_section["sidecars"][-1]["volumeMounts"][0]["mountPath"] == expected_path
test_add_sidecar_to_spark_operator_pod_spec_uses_custom_resources(resources): + spec = _make_spark_spec() + + result = add_sidecar_to_spark_operator_pod_spec(spec, sidecar_container_resources=resources) + + assert result["driver"]["sidecars"][-1]["resources"] == resources