From 99d57b949878cb00f53a23108536c0fad8ed8e7c Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 02:49:41 +0300 Subject: [PATCH 1/6] fix spark push xcom issue --- providers/cncf/kubernetes/docs/operators.rst | 17 ++ .../kubernetes/operators/spark_kubernetes.py | 15 +- .../cncf/kubernetes/utils/xcom_sidecar.py | 39 +++- .../operators/test_spark_kubernetes.py | 78 ++++++++ .../kubernetes/utils/test_xcom_sidecar.py | 169 ++++++++++++++++++ 5 files changed, 312 insertions(+), 6 deletions(-) create mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index a3f8e417237e8..7d7d2640ab94c 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -362,6 +362,9 @@ For further information, look at: SparkKubernetesOperator ========================== + +.. _howto/operator:SparkKubernetesOperator: + The :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` allows you to create and run spark job on a Kubernetes cluster. It is based on `spark-on-k8s-operator `__ project. @@ -653,6 +656,20 @@ spark_job_template.json An alternative method, apart from using YAML or JSON files, is to directly pass the ``template_spec`` field instead of application_file if you prefer not to employ a file for configuration. +How does XCom work? +^^^^^^^^^^^^^^^^^^^ + +When ``do_xcom_push=True``, :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` +injects an XCom ``emptyDir`` volume, a driver ``volumeMount`` at ``/airflow/xcom``, and an +``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. + +.. 
important:: + + Do **not** define the XCom volume (for example ``spec.volumes`` with name ``xcom``), + a driver ``volumeMounts`` entry for ``/airflow/xcom``, or an ``airflow-xcom-sidecar`` + entry under ``driver.sidecars`` in your SparkApplication YAML or ``template_spec``. + The operator adds these resources when ``do_xcom_push=True``. Defining them yourself + duplicates the mount path and Kubernetes rejects the driver pod. Reference ^^^^^^^^^ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index d10cee73e5c1b..0b7993427c764 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -33,6 +33,7 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase from airflow.providers.common.compat.sdk import AirflowException from airflow.utils.helpers import prune_dict +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec if TYPE_CHECKING: import jinja2 @@ -230,7 +231,7 @@ def _try_numbers_match(self, context, pod) -> bool: pod_try_number = pod.metadata.labels.get(task_context_labels.get("try_number", ""), "") return str(task_instance.try_number) == str(pod_try_number) - @property + @cached_property def template_body(self): """Templated body for CustomObjectLauncher.""" return self.manage_template_specs() @@ -349,6 +350,16 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8 ) return driver_pod + def _apply_xcom_sidecar_to_template(self, template_body: dict) -> None: + if self.do_xcom_push: + self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) + driver_template = template_body["spark"]["spec"] + template_body["spark"]["spec"] = 
add_sidecar_to_spark_operator_pod_spec( + driver_template, + sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), + sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + ) + def execute(self, context: Context): self.name = self.create_job_name() @@ -407,6 +418,8 @@ def _setup_spark_configuration(self, context: Context): env_list.append({"name": "SPARK_APPLICATION_NAME", "value": self.name}) self.log.info("Creating sparkApplication.") + self._apply_xcom_sidecar_to_template(template_body) + self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index aab28673ad147..522893b8640b5 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -19,8 +19,9 @@ from __future__ import annotations import copy +from typing import Any -from kubernetes.client import models as k8s +from kubernetes.client import ApiClient, models as k8s # Pinned alpine version for the xcom sidecar default. 
Pinning (rather than # using the implicit `:latest`) makes kubelet's default imagePullPolicy @@ -37,8 +38,10 @@ class PodDefaults: XCOM_MOUNT_PATH = "/airflow/xcom" SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' - VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) - VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) + VOLUME_MOUNT_NAME = "xcom" + VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) + XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] + VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, command=["sh", "-c", XCOM_CMD], @@ -57,11 +60,11 @@ def add_xcom_sidecar( pod: k8s.V1Pod, *, sidecar_container_image: str | None = None, - sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, + sidecar_container_resources: k8s.V1ResourceRequirements | dict[str, Any] | None = None, ) -> k8s.V1Pod: """Add sidecar.""" pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] + pod_cp.spec.volumes = pod_cp.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) @@ -72,3 +75,29 @@ def add_xcom_sidecar( pod_cp.spec.containers.append(sidecar) return pod_cp + + +def add_sidecar_to_spark_operator_pod_spec( + spec: dict[str, Any], + sidecar_container_image: str | None = None, + sidecar_container_resources: k8s.V1ResourceRequirements | dict[str, Any] | None = None, +) -> dict[str, Any]: + """Add xcom sidecar to a SparkApplication driver spec dict.""" + # The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model + driver_template = copy.deepcopy(spec) + xcom_volume_mount 
= ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT) + driver_template["volumes"] = driver_template.get("volumes") or [] + driver_template["volumes"].insert(0, PodDefaults.VOLUME.to_dict()) + driver_template["driver"]["volumeMounts"] = driver_template["driver"].get("volumeMounts") or [] + driver_template["driver"]["volumeMounts"].insert(0, xcom_volume_mount) + sidecar = { + "name": PodDefaults.SIDECAR_CONTAINER_NAME, + "command": PodDefaults.XCOM_SIDECAR_COMMAND, + "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, + "volumeMounts": [xcom_volume_mount], + } + if sidecar_container_resources: + sidecar["resources"] = sidecar_container_resources + driver_template["driver"]["sidecars"] = driver_template["driver"].get("sidecars") or [] + driver_template["driver"]["sidecars"].append(sidecar) + return driver_template diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py index 9a2f739266d46..eb7516032d5b1 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -38,6 +38,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.providers.common.compat.sdk import TaskDeferred from airflow.utils import timezone from airflow.utils.types import DagRunType @@ -1652,3 +1653,80 @@ def test_spark_application_name_env_not_duplicated(self, mock_client): app_name_envs = [e for e in env if e["name"] == "SPARK_APPLICATION_NAME"] assert len(app_name_envs) == 1 # not duplicated assert app_name_envs[0]["value"] == 
"user-defined" + + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client") + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook") + def test_setup_spark_configuration_injects_xcom_sidecar_when_do_xcom_push( + self, mock_hook_cls, mock_client + ): + xcom_sidecar_resources = { + "requests": {"cpu": "1m", "memory": "10Mi"}, + "limits": {"cpu": "10m", "memory": "50Mi"}, + } + mock_hook_cls.return_value.get_xcom_sidecar_container_image.return_value = "custom.repo/alpine:3.16" + mock_hook_cls.return_value.get_xcom_sidecar_container_resources.return_value = xcom_sidecar_resources + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "driver": {}, + "executor": {}, + }, + }, + do_xcom_push=True, + reattach_on_restart=False, + ) + op.name = "my-spark-app" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + op._setup_spark_configuration(mock.MagicMock()) + + spec = op.launcher.body["spec"] + assert spec["volumes"] == [PodDefaults.VOLUME.to_dict()] + assert spec["driver"]["volumeMounts"] == [ + {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} + ] + sidecars = spec["driver"]["sidecars"] + assert len(sidecars) == 1 + assert sidecars[0]["name"] == PodDefaults.SIDECAR_CONTAINER_NAME + assert sidecars[0]["image"] == "custom.repo/alpine:3.16" + assert sidecars[0]["resources"] == xcom_sidecar_resources + mock_hook_cls.return_value.get_xcom_sidecar_container_image.assert_called_once() + mock_hook_cls.return_value.get_xcom_sidecar_container_resources.assert_called_once() + + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client") + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook") + def 
test_setup_spark_configuration_skips_xcom_sidecar_when_do_xcom_push_false( + self, mock_hook_cls, mock_client + ): + original_volumes = [{"name": "config", "emptyDir": {}}] + original_mounts = [{"name": "config", "mountPath": "/config"}] + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "volumes": copy.deepcopy(original_volumes), + "driver": {"volumeMounts": copy.deepcopy(original_mounts)}, + "executor": {}, + }, + }, + do_xcom_push=False, + reattach_on_restart=False, + ) + op.name = "my-spark-app" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + op._setup_spark_configuration(mock.MagicMock()) + + spec = op.launcher.body["spec"] + assert spec["volumes"] == original_volumes + assert spec["driver"]["volumeMounts"] == original_mounts + assert "sidecars" not in spec["driver"] + mock_hook_cls.return_value.get_xcom_sidecar_container_image.assert_not_called() + mock_hook_cls.return_value.get_xcom_sidecar_container_resources.assert_not_called() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py new file mode 100644 index 0000000000000..b005e28cb926c --- /dev/null +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest +from kubernetes.client import ApiClient, models as k8s + +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import ( + PodDefaults, + add_sidecar_to_spark_operator_pod_spec, +) + +EXISTING_VOLUME_NAME = "config" +EXISTING_MOUNT_PATH = "/config" +XCOM_MOUNT = ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT) + + +def _make_spark_spec( + *, + volumes: list[dict] | None = None, + driver_volume_mounts: list[dict] | None = None, + driver_sidecars: list[dict] | None = None, +) -> dict: + spec = { + "type": "Python", + "driver": {"cores": 1, "memory": "512m"}, + "executor": {"instances": 1, "cores": 1, "memory": "512m"}, + } + if volumes is not None: + spec["volumes"] = volumes + if driver_volume_mounts is not None: + spec["driver"]["volumeMounts"] = driver_volume_mounts + if driver_sidecars is not None: + spec["driver"]["sidecars"] = driver_sidecars + return spec + + +def test_add_sidecar_to_spark_operator_pod_spec_does_not_mutate_input_spec(): + existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}} + existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH} + existing_sidecar = {"name": "other-sidecar", "image": "busybox"} + spec = _make_spark_spec( + volumes=[existing_volume], + driver_volume_mounts=[existing_mount], + driver_sidecars=[existing_sidecar], + ) + original_volumes = spec["volumes"] + original_mounts = spec["driver"]["volumeMounts"] + original_sidecars = spec["driver"]["sidecars"] + + result = add_sidecar_to_spark_operator_pod_spec(spec) + 
+ assert spec["volumes"] is original_volumes + assert spec["volumes"] == [existing_volume] + assert spec["driver"]["volumeMounts"] is original_mounts + assert spec["driver"]["volumeMounts"] == [existing_mount] + assert spec["driver"]["sidecars"] is original_sidecars + assert spec["driver"]["sidecars"] == [existing_sidecar] + assert result is not spec + + +def test_add_sidecar_to_spark_operator_pod_spec_sidecar_defaults(): + spec = _make_spark_spec() + + result = add_sidecar_to_spark_operator_pod_spec(spec) + + assert len(result["volumes"]) == 1 + assert result["volumes"][0] == PodDefaults.VOLUME.to_dict() + assert result["driver"]["volumeMounts"] == [XCOM_MOUNT] + sidecar = result["driver"]["sidecars"][0] + assert sidecar["name"] == PodDefaults.SIDECAR_CONTAINER_NAME + assert sidecar["command"] == PodDefaults.XCOM_SIDECAR_COMMAND + assert sidecar["image"] == PodDefaults.SIDECAR_CONTAINER.image + assert sidecar["volumeMounts"] == [XCOM_MOUNT] + + +def test_add_sidecar_to_spark_operator_pod_spec_prepends_xcom_before_existing_volumes_and_mounts(): + existing_sidecar = {"name": "other-sidecar", "image": "busybox"} + existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}} + existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH} + spec = _make_spark_spec( + volumes=[existing_volume], + driver_volume_mounts=[existing_mount], + driver_sidecars=[existing_sidecar], + ) + + result = add_sidecar_to_spark_operator_pod_spec(spec) + + assert len(result["volumes"]) == len(spec["volumes"]) + 1 + assert result["volumes"][0] == PodDefaults.VOLUME.to_dict() + assert result["volumes"][1:] == spec["volumes"] + assert len(result["driver"]["volumeMounts"]) == len(spec["driver"]["volumeMounts"]) + 1 + assert result["driver"]["volumeMounts"][0] == XCOM_MOUNT + assert result["driver"]["volumeMounts"][1:] == spec["driver"]["volumeMounts"] + assert len(result["driver"]["sidecars"]) == len(spec["driver"]["sidecars"]) + 1 + assert 
result["driver"]["sidecars"][:-1] == spec["driver"]["sidecars"] + assert result["driver"]["sidecars"][-1]["name"] == PodDefaults.SIDECAR_CONTAINER_NAME + + +def test_add_sidecar_to_spark_operator_pod_spec_wires_shared_volume_between_driver_and_sidecar(): + existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}} + existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH} + spec = _make_spark_spec( + volumes=[existing_volume], + driver_volume_mounts=[existing_mount], + ) + + result = add_sidecar_to_spark_operator_pod_spec(spec) + + xcom_volume_name = result["volumes"][0]["name"] + assert xcom_volume_name == PodDefaults.VOLUME_MOUNT_NAME + assert result["driver"]["volumeMounts"][0]["name"] == xcom_volume_name + assert result["driver"]["sidecars"][-1]["volumeMounts"][0]["name"] == xcom_volume_name + assert result["driver"]["volumeMounts"][0]["mountPath"] == PodDefaults.XCOM_MOUNT_PATH + assert result["driver"]["sidecars"][-1]["volumeMounts"][0]["mountPath"] == PodDefaults.XCOM_MOUNT_PATH + + +@pytest.mark.parametrize( + ("sidecar_container_image", "expected_image"), + [ + ("private.repo/alpine:3.16", "private.repo/alpine:3.16"), + (None, PodDefaults.SIDECAR_CONTAINER.image), + ], + ids=["custom", "default_when_none"], +) +def test_add_sidecar_to_spark_operator_pod_spec_uses_sidecar_container_image( + sidecar_container_image, expected_image +): + spec = _make_spark_spec() + + result = add_sidecar_to_spark_operator_pod_spec(spec, sidecar_container_image=sidecar_container_image) + + assert result["driver"]["sidecars"][-1]["image"] == expected_image + + +@pytest.mark.parametrize( + "resources", + [ + { + "requests": {"cpu": "1m", "memory": "10Mi"}, + "limits": {"cpu": "10m", "memory": "50Mi"}, + }, + k8s.V1ResourceRequirements( + requests={"cpu": "2m", "memory": "20Mi"}, + limits={"cpu": "20m", "memory": "100Mi"}, + ), + ], + ids=["dict", "v1_resource_requirements"], +) +def 
test_add_sidecar_to_spark_operator_pod_spec_uses_custom_resources(resources): + spec = _make_spark_spec() + + result = add_sidecar_to_spark_operator_pod_spec(spec, sidecar_container_resources=resources) + + assert result["driver"]["sidecars"][-1]["resources"] == resources From e2af1c95327b9114904af34714e43704fcb16cf5 Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 03:08:22 +0300 Subject: [PATCH 2/6] remove unneeded line in doc --- providers/cncf/kubernetes/docs/operators.rst | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index 7d7d2640ab94c..6b4e6504fa668 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -362,6 +362,9 @@ For further information, look at: SparkKubernetesOperator ========================== - -.. _howto/operator:SparkKubernetesOperator: - The :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` allows you to create and run spark job on a Kubernetes cluster. It is based on `spark-on-k8s-operator `__ project. @@ -461,7 +458,7 @@ spark_job_template.yaml # karpenter.sh/provisioner-name: spark node_selector: {} - # example: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ + # example: https://kubernetes.io/docs/concepts/schedulingF-eviction/assign-pod-node/ # affinity: # nodeAffinity: # requiredDuringSchedulingIgnoredDuringExecution: @@ -661,7 +658,7 @@ How does XCom work? When ``do_xcom_push=True``, :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` injects an XCom ``emptyDir`` volume, a driver ``volumeMount`` at ``/airflow/xcom``, and an -``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. +``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. .. 
important:: From 46aee9646d742acf95d096b9c31daf4f5236278d Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 03:09:47 +0300 Subject: [PATCH 3/6] remove unneeded line in doc --- providers/cncf/kubernetes/docs/operators.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index 6b4e6504fa668..caaad63c22f45 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -458,7 +458,7 @@ spark_job_template.yaml # karpenter.sh/provisioner-name: spark node_selector: {} - # example: https://kubernetes.io/docs/concepts/schedulingF-eviction/assign-pod-node/ + # example: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/ # affinity: # nodeAffinity: # requiredDuringSchedulingIgnoredDuringExecution: From ae1cfe9d1269c9ba1c65b444c3174ce2018fd249 Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 15:42:16 +0300 Subject: [PATCH 4/6] fix spaces --- providers/cncf/kubernetes/docs/operators.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/docs/operators.rst b/providers/cncf/kubernetes/docs/operators.rst index caaad63c22f45..2d8cb827e4345 100644 --- a/providers/cncf/kubernetes/docs/operators.rst +++ b/providers/cncf/kubernetes/docs/operators.rst @@ -658,7 +658,7 @@ How does XCom work? When ``do_xcom_push=True``, :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator` injects an XCom ``emptyDir`` volume, a driver ``volumeMount`` at ``/airflow/xcom``, and an -``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. +``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission. .. 
important:: From 6420ce132a811e824e26b46a45147ff08ff3fcf3 Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 17:48:23 +0300 Subject: [PATCH 5/6] Fix ruff import order --- .../providers/cncf/kubernetes/operators/spark_kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 0b7993427c764..08a007858a989 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -31,9 +31,9 @@ from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec from airflow.providers.common.compat.sdk import AirflowException from airflow.utils.helpers import prune_dict -from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec if TYPE_CHECKING: import jinja2 From 16ccbba7deb5a576a7ad441f499b5b9e27bebfe7 Mon Sep 17 00:00:00 2001 From: kesem shaked Date: Sun, 21 Jun 2026 17:54:38 +0300 Subject: [PATCH 6/6] Fix mypy and ruff issues in Spark XCom sidecar changes --- .../kubernetes/utils/test_xcom_sidecar.py | 51 +++---------------- 1 file changed, 7 insertions(+), 44 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py index b005e28cb926c..1df33bf1bac98 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py 
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_xcom_sidecar.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from typing import Any + import pytest from kubernetes.client import ApiClient, models as k8s @@ -31,11 +33,11 @@ def _make_spark_spec( *, - volumes: list[dict] | None = None, - driver_volume_mounts: list[dict] | None = None, - driver_sidecars: list[dict] | None = None, -) -> dict: - spec = { + volumes: list[dict[str, Any]] | None = None, + driver_volume_mounts: list[dict[str, Any]] | None = None, + driver_sidecars: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + spec: dict[str, Any] = { "type": "Python", "driver": {"cores": 1, "memory": "512m"}, "executor": {"instances": 1, "cores": 1, "memory": "512m"}, @@ -49,45 +51,6 @@ def _make_spark_spec( return spec -def test_add_sidecar_to_spark_operator_pod_spec_does_not_mutate_input_spec(): - existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}} - existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH} - existing_sidecar = {"name": "other-sidecar", "image": "busybox"} - spec = _make_spark_spec( - volumes=[existing_volume], - driver_volume_mounts=[existing_mount], - driver_sidecars=[existing_sidecar], - ) - original_volumes = spec["volumes"] - original_mounts = spec["driver"]["volumeMounts"] - original_sidecars = spec["driver"]["sidecars"] - - result = add_sidecar_to_spark_operator_pod_spec(spec) - - assert spec["volumes"] is original_volumes - assert spec["volumes"] == [existing_volume] - assert spec["driver"]["volumeMounts"] is original_mounts - assert spec["driver"]["volumeMounts"] == [existing_mount] - assert spec["driver"]["sidecars"] is original_sidecars - assert spec["driver"]["sidecars"] == [existing_sidecar] - assert result is not spec - - -def test_add_sidecar_to_spark_operator_pod_spec_sidecar_defaults(): - spec = _make_spark_spec() - - result = add_sidecar_to_spark_operator_pod_spec(spec) - - assert 
len(result["volumes"]) == 1 - assert result["volumes"][0] == PodDefaults.VOLUME.to_dict() - assert result["driver"]["volumeMounts"] == [XCOM_MOUNT] - sidecar = result["driver"]["sidecars"][0] - assert sidecar["name"] == PodDefaults.SIDECAR_CONTAINER_NAME - assert sidecar["command"] == PodDefaults.XCOM_SIDECAR_COMMAND - assert sidecar["image"] == PodDefaults.SIDECAR_CONTAINER.image - assert sidecar["volumeMounts"] == [XCOM_MOUNT] - - def test_add_sidecar_to_spark_operator_pod_spec_prepends_xcom_before_existing_volumes_and_mounts(): existing_sidecar = {"name": "other-sidecar", "image": "busybox"} existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}}