From ee4a5d8b682f2132512f78320325d2353eeb127f Mon Sep 17 00:00:00 2001 From: kesem Date: Thu, 19 Jun 2025 01:34:21 +0300 Subject: [PATCH 01/11] add xcom sidecar --- .../kubernetes/operators/spark_kubernetes.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 5d3ff3b51a34e..9d69c34f5ad2f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -32,6 +32,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.utils.helpers import prune_dict +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_xcom_sidecar if TYPE_CHECKING: import jinja2 @@ -165,6 +166,18 @@ def manage_template_specs(self): raise AirflowException("either application_file or template_spec should be passed") if "spark" not in template_body: template_body = {"spark": template_body} + if self.do_xcom_push: + try: + self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) + driver_template = template_body["spark"]["spec"]["driver"] + driver_with_xcom_template = add_xcom_sidecar( + driver_template, + sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), + sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + ) + template_body["spark"]["spec"]["driver"] = driver_with_xcom_template + except KeyError as e: + raise AirflowException("Driver spec missing in SparkApplication template") from e return template_body def create_job_name(self): @@ -259,6 +272,14 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8 driver_pod = self.find_spark_job(context) if driver_pod: return driver_pod + # + # if self.do_xcom_push: + # self.log.debug("Adding xcom sidecar to task %s", self.task_id) + # pod = add_xcom_sidecar( + # pod, + # sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), + # sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + # ) driver_pod, spark_obj_spec = launcher.start_spark_job( image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds From f63c0d23f4ea0e6fa3fe5e1474fe00ce23ad4ebd Mon Sep 17 00:00:00 2001 From: kesem Date: Mon, 23 Jun 2025 00:24:41 +0300 Subject: [PATCH 02/11] this version is working --- chart/values.yaml | 4 +- .../cncf/kubernetes/hooks/kubernetes.py | 2 + .../kubernetes/operators/spark_kubernetes.py | 60 +++++++++++-------- .../cncf/kubernetes/utils/xcom_sidecar.py | 50 +++++++++++++++- 4 files changed, 88 insertions(+), 28 deletions(-) diff --git a/chart/values.yaml b/chart/values.yaml index fc733631293d6..2b5d057a6fe3a 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -365,7 +365,7 @@ airflowLocalSettings: |- rbac: # Specifies whether RBAC resources should be created create: true - createSCCRoleBinding: false + createSCCRoleBinding: true # Airflow executor # One or multiple of: LocalExecutor, CeleryExecutor, KubernetesExecutor @@ -1968,7 +1968,7 @@ triggerer: # Airflow Dag Processor Config dagProcessor: - enabled: ~ + enabled: true # Number of airflow dag processors in the deployment replicas: 1 # Max number of old replicasets to retain diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index f90de71855f9d..d1eb1715d59cc 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -397,6 +397,8 @@ def get_namespace(self) -> str | None: def get_xcom_sidecar_container_image(self): """Return the xcom sidecar image that defined in the connection.""" + print("shlomit") + print(self._get_field("xcom_sidecar_container_image")) return self._get_field("xcom_sidecar_container_image") def get_xcom_sidecar_container_resources(self): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 9d69c34f5ad2f..96ffedc9681a2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -32,7 +32,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.utils.helpers import prune_dict -from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_xcom_sidecar +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_xcom_sidecar, add_sidecar if TYPE_CHECKING: import jinja2 @@ -166,18 +166,6 @@ def manage_template_specs(self): raise AirflowException("either application_file or template_spec should be passed") if "spark" not in template_body: template_body = {"spark": template_body} - if self.do_xcom_push: - try: - self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) - driver_template = template_body["spark"]["spec"]["driver"] - driver_with_xcom_template = add_xcom_sidecar( - driver_template, - sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), - sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), - ) - template_body["spark"]["spec"]["driver"] = driver_with_xcom_template - except KeyError as e: - raise AirflowException("Driver spec missing in SparkApplication template") from e return template_body def create_job_name(self): @@ -243,7 +231,7 @@ def pod_manager(self) -> PodManager: def _try_numbers_match(context, pod) -> bool: return pod.metadata.labels["try_number"] == context["ti"].try_number - @property + @cached_property def template_body(self): """Templated body for CustomObjectLauncher.""" return self.manage_template_specs() @@ -272,15 +260,6 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8 driver_pod = self.find_spark_job(context) if driver_pod: return driver_pod - # - # if self.do_xcom_push: - # self.log.debug("Adding xcom sidecar to task %s", self.task_id) - # pod = add_xcom_sidecar( - # pod, - # sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), - # sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), - # ) - driver_pod, spark_obj_spec = launcher.start_spark_job( image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds ) @@ -314,10 +293,43 @@ def client(self) -> CoreV1Api: def custom_obj_api(self) -> CustomObjectsApi: return CustomObjectsApi() + def update_pod_spec_add_xcom_sidecar(self): + if self.do_xcom_push: + try: + self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) + print("0") + print(self.template_body["spark"]) + print("1") + print(self.template_body["spark"]["spec"]) + print("2") + driver_template = self.template_body["spark"]["spec"] + print("3") + print(driver_template) + print(self.hook.get_xcom_sidecar_container_image()) + print(self.hook.get_xcom_sidecar_container_resources()) + print("roni") + driver_with_xcom_template = add_sidecar( + driver_template, + sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), + sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + ) + print("4") + self.template_body["spark"]["spec"]= driver_with_xcom_template + print("5") + print("kesem shaked") + print(driver_with_xcom_template) + print(self.template_body) + except KeyError as e: + raise AirflowException("Driver spec missing in SparkApplication template") from e + def execute(self, context: Context): self.name = self.create_job_name() - self.log.info("Creating sparkApplication.") + print("kesem") + print(self.template_body) + self.update_pod_spec_add_xcom_sidecar() + print("kesem shaked hello in here") + print(self.template_body) self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 6cdc9febb0252..79af07aba8c84 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -29,8 +29,9 @@ class PodDefaults: XCOM_MOUNT_PATH = "/airflow/xcom" SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' - VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) - VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) + VOLUME_MOUNT_NAME= "xcom" + VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) + VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, command=["sh", "-c", XCOM_CMD], @@ -64,3 +65,48 @@ def add_xcom_sidecar( pod_cp.spec.containers.append(sidecar) return pod_cp + +def remove_none(d): + if isinstance(d, dict): + return {k: remove_none(v) for k, v in d.items() if v is not None} + elif isinstance(d, list): + return [remove_none(v) for v in d if v is not None] + else: + return d + +def add_sidecar(spec,sidecar_container_image: str | None = None, sidecar_container_resources= None): + driver_template = copy.deepcopy(spec) + print("zmira") + # print(sidecar.to_dict()) + # driver_template["sidecars"] = sidecar.to_dict() + #TODO: i checked this but check again if adding the dict using the object v1 and so on will work + driver_template["volumes"]= [ + { + "name": "xcom", + "emptyDir": {} + } +] + driver_template["driver"]["volumeMounts"]= [ + { + "name": PodDefaults.VOLUME_MOUNT_NAME, + "mountPath": PodDefaults.XCOM_MOUNT_PATH + }] + driver_template["driver"]["sidecars"] = [ + { + "name": PodDefaults.SIDECAR_CONTAINER_NAME, + "command": ["sh", "-c", PodDefaults.XCOM_CMD], + "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, + "volumeMounts": [ + { + "name": PodDefaults.VOLUME_MOUNT_NAME, + "mountPath": PodDefaults.XCOM_MOUNT_PATH + } + ], + "env": [ + {"name": "MY_ENV_VAR", "value": "value"} + ] + } + ] + # driver_template["sidecars"] = remove_none(PodDefaultsfaults.SIDECAR_CONTAINER.to_dict()) + print(driver_template) + return driver_template From efb2bd77d5f3a5a713af5980c8a8df6d3075d76a Mon Sep 17 00:00:00 2001 From: kesem Date: Mon, 23 Jun 2025 02:51:21 +0300 Subject: [PATCH 03/11] final version without test changes --- .../kubernetes/operators/spark_kubernetes.py | 26 ++-------- .../cncf/kubernetes/utils/xcom_sidecar.py | 48 +++++++------------ 2 files changed, 20 insertions(+), 54 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 96ffedc9681a2..413912ed96804 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -32,7 +32,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import MAX_LABEL_LEN, PodGenerator from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.utils.helpers import prune_dict -from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_xcom_sidecar, add_sidecar +from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import add_sidecar_to_spark_operator_pod_spec if TYPE_CHECKING: import jinja2 @@ -297,39 +297,20 @@ def update_pod_spec_add_xcom_sidecar(self): if self.do_xcom_push: try: self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) - print("0") - print(self.template_body["spark"]) - print("1") - print(self.template_body["spark"]["spec"]) - print("2") driver_template = self.template_body["spark"]["spec"] - print("3") - print(driver_template) - print(self.hook.get_xcom_sidecar_container_image()) - print(self.hook.get_xcom_sidecar_container_resources()) - print("roni") - driver_with_xcom_template = add_sidecar( + driver_with_xcom_template = add_sidecar_to_spark_operator_pod_spec( driver_template, sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), ) - print("4") self.template_body["spark"]["spec"]= driver_with_xcom_template - print("5") - print("kesem shaked") - print(driver_with_xcom_template) - print(self.template_body) except KeyError as e: - raise AirflowException("Driver spec missing in SparkApplication template") from e + raise AirflowException("Spec missing in SparkApplication template") from e def execute(self, context: Context): self.name = self.create_job_name() self.log.info("Creating sparkApplication.") - print("kesem") - print(self.template_body) self.update_pod_spec_add_xcom_sidecar() - print("kesem shaked hello in here") - print(self.template_body) self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace, @@ -339,7 +320,6 @@ def execute(self, context: Context): ) self.pod = self.get_or_create_spark_crd(self.launcher, context) self.pod_request_obj = self.launcher.pod_spec - return super().execute(context=context) def on_kill(self) -> None: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 79af07aba8c84..576c225663317 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -21,6 +21,8 @@ import copy from kubernetes.client import models as k8s +from kubernetes import client + class PodDefaults: @@ -31,10 +33,11 @@ class PodDefaults: XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' VOLUME_MOUNT_NAME= "xcom" VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) + XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) SIDECAR_CONTAINER = k8s.V1Container( name=SIDECAR_CONTAINER_NAME, - command=["sh", "-c", XCOM_CMD], + command=XCOM_SIDECAR_COMMAND, image="alpine", volume_mounts=[VOLUME_MOUNT], resources=k8s.V1ResourceRequirements( @@ -47,7 +50,7 @@ class PodDefaults: def add_xcom_sidecar( - pod: k8s.V1Pod, + pod: Union[k8s.V1Pod, dict], *, sidecar_container_image: str | None = None, sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, @@ -66,47 +69,30 @@ def add_xcom_sidecar( return pod_cp -def remove_none(d): - if isinstance(d, dict): - return {k: remove_none(v) for k, v in d.items() if v is not None} - elif isinstance(d, list): - return [remove_none(v) for v in d if v is not None] - else: - return d -def add_sidecar(spec,sidecar_container_image: str | None = None, sidecar_container_resources= None): + +def add_sidecar_to_spark_operator_pod_spec(spec,sidecar_container_image: str | None = None, sidecar_container_resources= None): + #The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. driver_template = copy.deepcopy(spec) - print("zmira") - # print(sidecar.to_dict()) - # driver_template["sidecars"] = sidecar.to_dict() - #TODO: i checked this but check again if adding the dict using the object v1 and so on will work - driver_template["volumes"]= [ - { - "name": "xcom", - "emptyDir": {} - } -] - driver_template["driver"]["volumeMounts"]= [ - { - "name": PodDefaults.VOLUME_MOUNT_NAME, - "mountPath": PodDefaults.XCOM_MOUNT_PATH - }] + driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()] + driver_template["driver"]["volumeMounts"] = [ + { + "name": PodDefaults.VOLUME_MOUNT_NAME, + "mountPath": PodDefaults.XCOM_MOUNT_PATH + }] driver_template["driver"]["sidecars"] = [ { "name": PodDefaults.SIDECAR_CONTAINER_NAME, - "command": ["sh", "-c", PodDefaults.XCOM_CMD], + "command": PodDefaults.XCOM_SIDECAR_COMMAND, "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, "volumeMounts": [ { "name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH } - ], - "env": [ - {"name": "MY_ENV_VAR", "value": "value"} ] } ] - # driver_template["sidecars"] = remove_none(PodDefaultsfaults.SIDECAR_CONTAINER.to_dict()) - print(driver_template) + if sidecar_container_resources: + driver_template["driver"]["sidecars"]["resources"] = sidecar_container_resources return driver_template From 48b582645ed57090159c86287170588f67bc1bfa Mon Sep 17 00:00:00 2001 From: kesem Date: Mon, 23 Jun 2025 04:09:10 +0300 Subject: [PATCH 04/11] delete prints --- .../src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index d1eb1715d59cc..f90de71855f9d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -397,8 +397,6 @@ def get_namespace(self) -> str | None: def get_xcom_sidecar_container_image(self): """Return the xcom sidecar image that defined in the connection.""" - print("shlomit") - print(self._get_field("xcom_sidecar_container_image")) return self._get_field("xcom_sidecar_container_image") def get_xcom_sidecar_container_resources(self): From 4f172d6967e31e8ed0898cf3321a8d8f5007cf42 Mon Sep 17 00:00:00 2001 From: kesem0811 Date: Tue, 24 Jun 2025 00:19:28 +0300 Subject: [PATCH 05/11] your commit message --- chart/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chart/values.yaml b/chart/values.yaml index 2b5d057a6fe3a..fc733631293d6 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -365,7 +365,7 @@ airflowLocalSettings: |- rbac: # Specifies whether RBAC resources should be created create: true - createSCCRoleBinding: true + createSCCRoleBinding: false # Airflow executor # One or multiple of: LocalExecutor, CeleryExecutor, KubernetesExecutor @@ -1968,7 +1968,7 @@ triggerer: # Airflow Dag Processor Config dagProcessor: - enabled: true + enabled: ~ # Number of airflow dag processors in the deployment replicas: 1 # Max number of old replicasets to retain From bb747dddf11b1c20f954e37b2a5bc0ae972730a3 Mon Sep 17 00:00:00 2001 From: kesem0811 Date: Sun, 29 Jun 2025 21:42:01 +0300 Subject: [PATCH 06/11] cr fix --- .../providers/cncf/kubernetes/operators/spark_kubernetes.py | 2 +- .../src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 413912ed96804..e6573010ae123 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -293,7 +293,7 @@ def client(self) -> CoreV1Api: def custom_obj_api(self) -> CustomObjectsApi: return CustomObjectsApi() - def update_pod_spec_add_xcom_sidecar(self): + def update_pod_spec_add_xcom_sidecar(self)->None: if self.do_xcom_push: try: self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 576c225663317..8c17e49050fe5 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -50,7 +50,7 @@ class PodDefaults: def add_xcom_sidecar( - pod: Union[k8s.V1Pod, dict], + pod: k8s.V1Pod| dict, *, sidecar_container_image: str | None = None, sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, From 87085c8d28e54e57f9b46f9f466535662bc1d2f3 Mon Sep 17 00:00:00 2001 From: kesem0811 Date: Sun, 29 Jun 2025 23:35:03 +0300 Subject: [PATCH 07/11] cr fix --- .../providers/cncf/kubernetes/utils/xcom_sidecar.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 8c17e49050fe5..1cb24f14554a7 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -24,14 +24,13 @@ from kubernetes import client - class PodDefaults: """Static defaults for Pods.""" XCOM_MOUNT_PATH = "/airflow/xcom" SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' - VOLUME_MOUNT_NAME= "xcom" + VOLUME_MOUNT_NAME = "xcom" VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) @@ -50,7 +49,7 @@ class PodDefaults: def add_xcom_sidecar( - pod: k8s.V1Pod| dict, + pod: k8s.V1Pod | dict, *, sidecar_container_image: str | None = None, sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, @@ -70,9 +69,9 @@ def add_xcom_sidecar( return pod_cp - -def add_sidecar_to_spark_operator_pod_spec(spec,sidecar_container_image: str | None = None, sidecar_container_resources= None): - #The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. +def add_sidecar_to_spark_operator_pod_spec(spec: dict, sidecar_container_image: str | None = None, + sidecar_container_resources: dict | None = None): + # The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. driver_template = copy.deepcopy(spec) driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()] driver_template["driver"]["volumeMounts"] = [ From c4919ea51a0d5b5a9453e881b12a3db10cf59dc9 Mon Sep 17 00:00:00 2001 From: kesem0811 Date: Wed, 30 Jul 2025 13:17:03 +0300 Subject: [PATCH 08/11] your message --- .../providers/cncf/kubernetes/operators/spark_kubernetes.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index e6573010ae123..c9a024580dbc9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -295,7 +295,6 @@ def custom_obj_api(self) -> CustomObjectsApi: def update_pod_spec_add_xcom_sidecar(self)->None: if self.do_xcom_push: - try: self.log.debug("Adding xcom sidecar to driver pod spec in task %s", self.task_id) driver_template = self.template_body["spark"]["spec"] driver_with_xcom_template = add_sidecar_to_spark_operator_pod_spec( @@ -304,8 +303,6 @@ def update_pod_spec_add_xcom_sidecar(self)->None: sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), ) self.template_body["spark"]["spec"]= driver_with_xcom_template - except KeyError as e: - raise AirflowException("Spec missing in SparkApplication template") from e def execute(self, context: Context): self.name = self.create_job_name() From 6d892e94da6687d2bd4f4d5c95a36b1d41f2ab45 Mon Sep 17 00:00:00 2001 From: kesem0811 Date: Wed, 30 Jul 2025 14:48:48 +0300 Subject: [PATCH 09/11] run ruff format --- .../cncf/kubernetes/utils/xcom_sidecar.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 1cb24f14554a7..8c1ce12d46094 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -69,27 +69,23 @@ def add_xcom_sidecar( return pod_cp -def add_sidecar_to_spark_operator_pod_spec(spec: dict, sidecar_container_image: str | None = None, - sidecar_container_resources: dict | None = None): +def add_sidecar_to_spark_operator_pod_spec( + spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None +): # The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. driver_template = copy.deepcopy(spec) driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()] driver_template["driver"]["volumeMounts"] = [ - { - "name": PodDefaults.VOLUME_MOUNT_NAME, - "mountPath": PodDefaults.XCOM_MOUNT_PATH - }] + {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} + ] driver_template["driver"]["sidecars"] = [ { "name": PodDefaults.SIDECAR_CONTAINER_NAME, "command": PodDefaults.XCOM_SIDECAR_COMMAND, "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, "volumeMounts": [ - { - "name": PodDefaults.VOLUME_MOUNT_NAME, - "mountPath": PodDefaults.XCOM_MOUNT_PATH - } - ] + {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} + ], } ] if sidecar_container_resources: From 02574574bc6317e5068c5589c5ac210ec93f689f Mon Sep 17 00:00:00 2001 From: Kesem Date: Thu, 25 Sep 2025 14:33:25 +0300 Subject: [PATCH 10/11] fix: resolve mypy type error in xcom_sidecar.py - Add proper type checking for dict vs V1Pod inputs in add_xcom_sidecar() - Convert dict inputs to V1Pod objects to ensure .spec attribute access - Maintains backward compatibility while fixing type safety - Resolves: Item 'dict[Any, Any]' has no attribute 'spec' [union-attr] --- .../providers/cncf/kubernetes/utils/xcom_sidecar.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py index 8c1ce12d46094..3c360ef25a0e2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py @@ -56,7 +56,13 @@ def add_xcom_sidecar( ) -> k8s.V1Pod: """Add sidecar.""" pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] + + # Handle both V1Pod object and dict cases + if isinstance(pod_cp, dict): + # Convert dict to V1Pod object if needed + pod_cp = k8s.V1Pod(**pod_cp) + + pod_cp.spec.volumes = pod_cp.spec.volumes or [] pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) From 68a132479952422d7cb6dd96efcb43cd46fc11b8 Mon Sep 17 00:00:00 2001 From: Kesem Date: Sun, 5 Oct 2025 13:48:02 +0300 Subject: [PATCH 11/11] style: apply ruff formatting to spark_kubernetes.py --- .../providers/cncf/kubernetes/operators/spark_kubernetes.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 0bbcdfafc64b7..56ce902fce63d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -268,6 +268,7 @@ def get_or_create_spark_crd(self, launcher: CustomObjectLauncher, context) -> k8 image=self.image, code_path=self.code_path, startup_timeout=self.startup_timeout_seconds ) return driver_pod + def process_pod_deletion(self, pod, *, reraise=True): if pod is not None: if self.delete_on_termination: @@ -357,10 +358,10 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) self.log.info("Creating sparkApplication.") - + # Add xcom sidecar if needed self.update_pod_spec_add_xcom_sidecar() - + self.launcher = CustomObjectLauncher( name=self.name, namespace=self.namespace,