Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.utils.helpers import prune_dict
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec

if TYPE_CHECKING:
import jinja2
Expand Down Expand Up @@ -233,7 +234,7 @@ def _try_numbers_match(self, context, pod) -> bool:
pod_try_number = pod.metadata.labels.get(task_context_labels.get("try_number", ""), "")
return str(task_instance.try_number) == str(pod_try_number)

@property
@cached_property
def template_body(self):
"""Templated body for CustomObjectLauncher."""
return self.manage_template_specs()
Expand All @@ -257,6 +258,17 @@ def find_spark_job(self, context, exclude_checked: bool = True):
self.log.info("`try_number` of pod: %s", pod.metadata.labels.get("try_number", "unknown"))
return pod

def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8s.V1Pod:
if self.reattach_on_restart:
driver_pod = self.find_spark_job(context)
if driver_pod:
return driver_pod

driver_pod, spark_obj_spec = launcher.start_spark_job(
image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds
)
return driver_pod

def process_pod_deletion(self, pod, *, reraise=True):
if pod is not None:
if self.delete_on_termination:
Expand Down Expand Up @@ -286,16 +298,16 @@ def client(self) -> CoreV1Api:
def custom_obj_api(self) -> CustomObjectsApi:
return CustomObjectsApi()

def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8s.V1Pod:
if self.reattach_on_restart:
driver_pod = self.find_spark_job(context)
if driver_pod:
return driver_pod

driver_pod, spark_obj_spec = launcher.start_spark_job(
image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds
)
return driver_pod
def update_pod_spec_add_xcom_sidecar(self) -> None:
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id)
driver_template = self.template_body["spark"]["spec"]
driver_with_xcom_template = add_sidecar_to_spark_operator_pod_spec(
driver_template,
sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
)
self.template_body["spark"]["spec"] = driver_with_xcom_template

def execute(self, context: Context):
self.name = self.create_job_name()
Expand Down Expand Up @@ -346,6 +358,10 @@ def _setup_spark_configuration(self, context: Context):
spec_dict[component]["labels"].update(task_context_labels)

self.log.info("Creating sparkApplication.")

# Add xcom sidecar if needed
self.update_pod_spec_add_xcom_sidecar()

self.launcher = CustomObjectLauncher(
name=self.name,
namespace=self.namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import copy

from kubernetes.client import models as k8s
from kubernetes import client


class PodDefaults:
Expand All @@ -29,11 +30,13 @@ class PodDefaults:
XCOM_MOUNT_PATH = "/airflow/xcom"
SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar"
XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;'
VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH)
VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource())
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())
Comment on lines +33 to +36

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())
XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD]
VOLUME_MOUNT_NAME = "xcom"
VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH)
VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource())

let's group similar concepts together

SIDECAR_CONTAINER = k8s.V1Container(
name=SIDECAR_CONTAINER_NAME,
command=["sh", "-c", XCOM_CMD],
command=XCOM_SIDECAR_COMMAND,
image="alpine",
volume_mounts=[VOLUME_MOUNT],
resources=k8s.V1ResourceRequirements(
Expand All @@ -46,14 +49,20 @@ class PodDefaults:


def add_xcom_sidecar(
pod: k8s.V1Pod,
pod: k8s.V1Pod | dict,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pod: k8s.V1Pod | dict,
pod: k8s.V1Pod | dict[str, Any],

*,
sidecar_container_image: str | None = None,
sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None,
) -> k8s.V1Pod:
"""Add sidecar."""
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []

# Handle both V1Pod object and dict cases
if isinstance(pod_cp, dict):
# Convert dict to V1Pod object if needed
pod_cp = k8s.V1Pod(**pod_cp)

pod_cp.spec.volumes = pod_cp.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or []
pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
Expand All @@ -64,3 +73,27 @@ def add_xcom_sidecar(
pod_cp.spec.containers.append(sidecar)

return pod_cp


def add_sidecar_to_spark_operator_pod_spec(
spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None
):
Comment on lines +78 to +80

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def add_sidecar_to_spark_operator_pod_spec(
spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None
):
def add_sidecar_to_spark_operator_pod_spec(
spec: dict[str, Any], sidecar_container_image: str | None = None, sidecar_container_resources: dict[str, Any] | None = None
) -> dict[str, Any]:

please confirm whether these type are correct. Thanks!

# The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model.
driver_template = copy.deepcopy(spec)
driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()]
driver_template["driver"]["volumeMounts"] = [
{"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH}
]
driver_template["driver"]["sidecars"] = [
{
"name": PodDefaults.SIDECAR_CONTAINER_NAME,
"command": PodDefaults.XCOM_SIDECAR_COMMAND,
"image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image,
"volumeMounts": [
{"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH}
],
}
]
if sidecar_container_resources:
driver_template["driver"]["sidecars"]["resources"] = sidecar_container_resources
return driver_template
Loading