diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index c1f92af00374a..56ce902fce63d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -32,6 +32,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.utils.helpers import prune_dict +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec if TYPE_CHECKING: import jinja2 @@ -233,7 +234,7 @@ def _try_numbers_match(self, context, pod) -> bool: pod_try_number = pod.metadata.labels.get(task_context_labels.get("try_number", ""), "") return str(task_instance.try_number) == str(pod_try_number) - @property + @cached_property def template_body(self): """Templated body for CustomObjectLauncher.""" return self.manage_template_specs() @@ -257,6 +258,17 @@ def find_spark_job(self, context, exclude_checked: bool = True): self.log.info("`try_number` of pod: %s", pod.metadata.labels.get("try_number", "unknown")) return pod + def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8s.V1Pod: + if self.reattach_on_restart: + driver_pod = self.find_spark_job(context) + if driver_pod: + return driver_pod + + driver_pod, spark_obj_spec = launcher.start_spark_job( + image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds + ) + return driver_pod + def process_pod_deletion(self, pod, *, reraise=True): if pod is not None: if self.delete_on_termination: @@ -286,16 +298,16 @@ def client(self) -> CoreV1Api: def custom_obj_api(self) -> CustomObjectsApi: return CustomObjectsApi() - def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8s.V1Pod: - if self.reattach_on_restart: - driver_pod = self.find_spark_job(context) - if driver_pod: - return driver_pod - - driver_pod, spark_obj_spec = launcher.start_spark_job( - image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds - ) - return driver_pod + def update_pod_spec_add_xcom_sidecar(self) -> None: + if self.do_xcom_push: + self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) + driver_template = self.template_body["spark"]["spec"] + driver_with_xcom_template = add_sidecar_to_spark_operator_pod_spec( + driver_template, + sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), + sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + ) + self.template_body["spark"]["spec"] = driver_with_xcom_template def execute(self, context: Context): self.name = self.create_job_name() @@ -346,6 +358,10 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) self.log.info("Creating sparkApplication.") + + # Add xcom sidecar if needed + self.update_pod_spec_add_xcom_sidecar() + self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 6cdc9febb0252..3c360ef25a0e2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -21,6 +21,7 @@ import copy from kubernetes.client import models as k8s +from kubernetes import client class PodDefaults: @@ -29,11 +30,13 @@ class PodDefaults: XCOM_MOUNT_PATH = "/airflow/xcom" SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' - VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) - VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) + VOLUME_MOUNT_NAME = "xcom" + VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) + XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] + VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, - command=["sh", "-c", XCOM_CMD], + command=XCOM_SIDECAR_COMMAND, image="alpine", volume_mounts=[VOLUME_MOUNT], resources=k8s.V1ResourceRequirements( @@ -46,14 +49,20 @@ class PodDefaults: def add_xcom_sidecar( - pod: k8s.V1Pod, + pod: k8s.V1Pod | dict, *, sidecar_container_image: str | None = None, sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, ) -> k8s.V1Pod: """Add sidecar.""" pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] + + # Handle both V1Pod object and dict cases + if isinstance(pod_cp, dict): + # Convert dict to V1Pod object if needed + pod_cp = k8s.V1Pod(**pod_cp) + + pod_cp.spec.volumes = pod_cp.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) @@ -64,3 +73,27 @@ def add_xcom_sidecar( pod_cp.spec.containers.append(sidecar) return pod_cp + + +def add_sidecar_to_spark_operator_pod_spec( + spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None +): + # The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. + driver_template = copy.deepcopy(spec) + driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()] + driver_template["driver"]["volumeMounts"] = [ + {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} + ] + driver_template["driver"]["sidecars"] = [ + { + "name": PodDefaults.SIDECAR_CONTAINER_NAME, + "command": PodDefaults.XCOM_SIDECAR_COMMAND, + "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, + "volumeMounts": [ + {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} + ], + } + ] + if sidecar_container_resources: + driver_template["driver"]["sidecars"]["resources"] = sidecar_container_resources + return driver_template