Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions providers/cncf/kubernetes/docs/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,20 @@ spark_job_template.json
An alternative method, apart from using YAML or JSON files, is to directly pass the ``template_spec`` field instead of application_file
if you prefer not to employ a file for configuration.

How does XCom work?
^^^^^^^^^^^^^^^^^^^

When ``do_xcom_push=True``, :class:`~airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator`
injects an XCom ``emptyDir`` volume, a driver ``volumeMount`` at ``/airflow/xcom``, and an
``airflow-xcom-sidecar`` sidecar into the SparkApplication spec before submission.

.. important::

Do **not** define the XCom volume (for example ``spec.volumes`` with name ``xcom``),
a driver ``volumeMounts`` entry for ``/airflow/xcom``, or an ``airflow-xcom-sidecar``
entry under ``driver.sidecars`` in your SparkApplication YAML or ``template_spec``.
The operator adds these resources when ``do_xcom_push=True``. Defining them yourself
duplicates the mount path and Kubernetes rejects the driver pod.

Reference
^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec
from airflow.providers.common.compat.sdk import AirflowException
from airflow.utils.helpers import prune_dict

Expand Down Expand Up @@ -230,7 +231,7 @@ def _try_numbers_match(self, context, pod) -> bool:
pod_try_number = pod.metadata.labels.get(task_context_labels.get("try_number", ""), "")
return str(task_instance.try_number) == str(pod_try_number)

@property
@cached_property
def template_body(self):
"""Templated body for CustomObjectLauncher."""
return self.manage_template_specs()
Expand Down Expand Up @@ -349,6 +350,16 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8
)
return driver_pod

def _apply_xcom_sidecar_to_template(self, template_body: dict) -> None:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id)
driver_template = template_body["spark"]["spec"]
template_body["spark"]["spec"] = add_sidecar_to_spark_operator_pod_spec(
driver_template,
sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
)

def execute(self, context: Context):
self.name = self.create_job_name()

Expand Down Expand Up @@ -407,6 +418,8 @@ def _setup_spark_configuration(self, context: Context):
env_list.append({"name": "SPARK_APPLICATION_NAME", "value": self.name})

self.log.info("Creating sparkApplication.")
self._apply_xcom_sidecar_to_template(template_body)

self.launcher = CustomObjectLauncher(
name=self.name,
namespace=self.namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from __future__ import annotations

import copy
from typing import Any

from kubernetes.client import models as k8s
from kubernetes.client import ApiClient, models as k8s

# Pinned alpine version for the xcom sidecar default. Pinning (rather than
# using the implicit `:latest`) makes kubelet's default imagePullPolicy
Expand All @@ -37,8 +38,10 @@ class PodDefaults:
XCOM_MOUNT_PATH = "/airflow/xcom"
SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;'
VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH)
VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource())
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())
SIDECAR_CONTAINER = k8s.V1Container(
name=SIDECAR_CONTAINER_NAME,
command=["sh", "-c", XCOM_CMD],
Expand All @@ -57,11 +60,11 @@ def add_xcom_sidecar(
pod: k8s.V1Pod,
*,
sidecar_container_image: str | None = None,
sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None,
sidecar_container_resources: k8s.V1ResourceRequirements | dict[str, Any] | None = None,
) -> k8s.V1Pod:
"""Add sidecar."""
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []
pod_cp.spec.volumes = pod_cp.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
Expand All @@ -72,3 +75,29 @@ def add_xcom_sidecar(
pod_cp.spec.containers.append(sidecar)

return pod_cp


def add_sidecar_to_spark_operator_pod_spec(
spec: dict[str, Any],
sidecar_container_image: str | None = None,
sidecar_container_resources: k8s.V1ResourceRequirements | dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Add xcom sidecar to a SparkApplication driver spec dict."""
# The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model
driver_template = copy.deepcopy(spec)
xcom_volume_mount = ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)
driver_template["volumes"] = driver_template.get("volumes") or []
driver_template["volumes"].insert(0, PodDefaults.VOLUME.to_dict())
driver_template["driver"]["volumeMounts"] = driver_template["driver"].get("volumeMounts") or []
driver_template["driver"]["volumeMounts"].insert(0, xcom_volume_mount)
sidecar = {
"name": PodDefaults.SIDECAR_CONTAINER_NAME,
"command": PodDefaults.XCOM_SIDECAR_COMMAND,
"image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image,
"volumeMounts": [xcom_volume_mount],
}
if sidecar_container_resources:
sidecar["resources"] = sidecar_container_resources
driver_template["driver"]["sidecars"] = driver_template["driver"].get("sidecars") or []
driver_template["driver"]["sidecars"].append(sidecar)
return driver_template
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from airflow.providers.common.compat.sdk import TaskDeferred
from airflow.utils import timezone
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -1652,3 +1653,80 @@ def test_spark_application_name_env_not_duplicated(self, mock_client):
app_name_envs = [e for e in env if e["name"] == "SPARK_APPLICATION_NAME"]
assert len(app_name_envs) == 1 # not duplicated
assert app_name_envs[0]["value"] == "user-defined"

@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client")
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
def test_setup_spark_configuration_injects_xcom_sidecar_when_do_xcom_push(
self, mock_hook_cls, mock_client
):
xcom_sidecar_resources = {
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {"cpu": "10m", "memory": "50Mi"},
}
mock_hook_cls.return_value.get_xcom_sidecar_container_image.return_value = "custom.repo/alpine:3.16"
mock_hook_cls.return_value.get_xcom_sidecar_container_resources.return_value = xcom_sidecar_resources
op = SparkKubernetesOperator(
task_id="test_task",
namespace="default",
template_spec={
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"kind": "SparkApplication",
"spec": {
"driver": {},
"executor": {},
},
},
do_xcom_push=True,
reattach_on_restart=False,
)
op.name = "my-spark-app"

with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()):
op._setup_spark_configuration(mock.MagicMock())

spec = op.launcher.body["spec"]
assert spec["volumes"] == [PodDefaults.VOLUME.to_dict()]
assert spec["driver"]["volumeMounts"] == [
{"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH}
]
sidecars = spec["driver"]["sidecars"]
assert len(sidecars) == 1
assert sidecars[0]["name"] == PodDefaults.SIDECAR_CONTAINER_NAME
assert sidecars[0]["image"] == "custom.repo/alpine:3.16"
assert sidecars[0]["resources"] == xcom_sidecar_resources
mock_hook_cls.return_value.get_xcom_sidecar_container_image.assert_called_once()
mock_hook_cls.return_value.get_xcom_sidecar_container_resources.assert_called_once()

@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client")
@patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.KubernetesHook")
def test_setup_spark_configuration_skips_xcom_sidecar_when_do_xcom_push_false(
self, mock_hook_cls, mock_client
):
original_volumes = [{"name": "config", "emptyDir": {}}]
original_mounts = [{"name": "config", "mountPath": "/config"}]
op = SparkKubernetesOperator(
task_id="test_task",
namespace="default",
template_spec={
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"kind": "SparkApplication",
"spec": {
"volumes": copy.deepcopy(original_volumes),
"driver": {"volumeMounts": copy.deepcopy(original_mounts)},
"executor": {},
},
},
do_xcom_push=False,
reattach_on_restart=False,
)
op.name = "my-spark-app"

with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()):
op._setup_spark_configuration(mock.MagicMock())

spec = op.launcher.body["spec"]
assert spec["volumes"] == original_volumes
assert spec["driver"]["volumeMounts"] == original_mounts
assert "sidecars" not in spec["driver"]
mock_hook_cls.return_value.get_xcom_sidecar_container_image.assert_not_called()
mock_hook_cls.return_value.get_xcom_sidecar_container_resources.assert_not_called()
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Any

import pytest
from kubernetes.client import ApiClient, models as k8s

from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import (
PodDefaults,
add_sidecar_to_spark_operator_pod_spec,
)

EXISTING_VOLUME_NAME = "config"
EXISTING_MOUNT_PATH = "/config"
XCOM_MOUNT = ApiClient().sanitize_for_serialization(PodDefaults.VOLUME_MOUNT)


def _make_spark_spec(
*,
volumes: list[dict[str, Any]] | None = None,
driver_volume_mounts: list[dict[str, Any]] | None = None,
driver_sidecars: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
spec: dict[str, Any] = {
"type": "Python",
"driver": {"cores": 1, "memory": "512m"},
"executor": {"instances": 1, "cores": 1, "memory": "512m"},
}
if volumes is not None:
spec["volumes"] = volumes
if driver_volume_mounts is not None:
spec["driver"]["volumeMounts"] = driver_volume_mounts
if driver_sidecars is not None:
spec["driver"]["sidecars"] = driver_sidecars
return spec


def test_add_sidecar_to_spark_operator_pod_spec_prepends_xcom_before_existing_volumes_and_mounts():
existing_sidecar = {"name": "other-sidecar", "image": "busybox"}
existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}}
existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH}
spec = _make_spark_spec(
volumes=[existing_volume],
driver_volume_mounts=[existing_mount],
driver_sidecars=[existing_sidecar],
)

result = add_sidecar_to_spark_operator_pod_spec(spec)

assert len(result["volumes"]) == len(spec["volumes"]) + 1
assert result["volumes"][0] == PodDefaults.VOLUME.to_dict()
assert result["volumes"][1:] == spec["volumes"]
assert len(result["driver"]["volumeMounts"]) == len(spec["driver"]["volumeMounts"]) + 1
assert result["driver"]["volumeMounts"][0] == XCOM_MOUNT
assert result["driver"]["volumeMounts"][1:] == spec["driver"]["volumeMounts"]
assert len(result["driver"]["sidecars"]) == len(spec["driver"]["sidecars"]) + 1
assert result["driver"]["sidecars"][:-1] == spec["driver"]["sidecars"]
assert result["driver"]["sidecars"][-1]["name"] == PodDefaults.SIDECAR_CONTAINER_NAME


def test_add_sidecar_to_spark_operator_pod_spec_wires_shared_volume_between_driver_and_sidecar():
existing_volume = {"name": EXISTING_VOLUME_NAME, "emptyDir": {}}
existing_mount = {"name": EXISTING_VOLUME_NAME, "mountPath": EXISTING_MOUNT_PATH}
spec = _make_spark_spec(
volumes=[existing_volume],
driver_volume_mounts=[existing_mount],
)

result = add_sidecar_to_spark_operator_pod_spec(spec)

xcom_volume_name = result["volumes"][0]["name"]
assert xcom_volume_name == PodDefaults.VOLUME_MOUNT_NAME
assert result["driver"]["volumeMounts"][0]["name"] == xcom_volume_name
assert result["driver"]["sidecars"][-1]["volumeMounts"][0]["name"] == xcom_volume_name
assert result["driver"]["volumeMounts"][0]["mountPath"] == PodDefaults.XCOM_MOUNT_PATH
assert result["driver"]["sidecars"][-1]["volumeMounts"][0]["mountPath"] == PodDefaults.XCOM_MOUNT_PATH


@pytest.mark.parametrize(
("sidecar_container_image", "expected_image"),
[
("private.repo/alpine:3.16", "private.repo/alpine:3.16"),
(None, PodDefaults.SIDECAR_CONTAINER.image),
],
ids=["custom", "default_when_none"],
)
def test_add_sidecar_to_spark_operator_pod_spec_uses_sidecar_container_image(
sidecar_container_image, expected_image
):
spec = _make_spark_spec()

result = add_sidecar_to_spark_operator_pod_spec(spec, sidecar_container_image=sidecar_container_image)

assert result["driver"]["sidecars"][-1]["image"] == expected_image


@pytest.mark.parametrize(
"resources",
[
{
"requests": {"cpu": "1m", "memory": "10Mi"},
"limits": {"cpu": "10m", "memory": "50Mi"},
},
k8s.V1ResourceRequirements(
requests={"cpu": "2m", "memory": "20Mi"},
limits={"cpu": "20m", "memory": "100Mi"},
),
],
ids=["dict", "v1_resource_requirements"],
)
def test_add_sidecar_to_spark_operator_pod_spec_uses_custom_resources(resources):
spec = _make_spark_spec()

result = add_sidecar_to_spark_operator_pod_spec(spec, sidecar_container_resources=resources)

assert result["driver"]["sidecars"][-1]["resources"] == resources
Loading