-
Notifications
You must be signed in to change notification settings - Fork 17.3k
fix xcom push not working in SparkKubernetesOperator
#52051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
kesem0811
wants to merge
17
commits into
apache:main
from
kesem0811:feature/fix_xcom_push_in_SparkKubernetesOperator
+65
−16
Closed
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
ee4a5d8
add xcom sidecar
kesem0811 f63c0d2
this version is working
kesem0811 efb2bd7
final version without test changes
kesem0811 48b5826
delete prints
kesem0811 4f172d6
your commit message
kesem0811 bb747dd
cr fix
kesem0811 87085c8
cr fix
kesem0811 c4919ea
your message
kesem0811 4c022ca
Merge branch 'main' of https://github.com/kesem0811/airflow into feat…
kesem0811 95054f2
resolve conflicts
kesem0811 6d892e9
run ruff format
kesem0811 df0b081
Merge upstream main into feature/fix_xcom_push_in_SparkKubernetesOper…
kesem0811 0257457
fix: resolve mypy type error in xcom_sidecar.py
kesem0811 92125a6
Merge branch 'main' into feature/fix_xcom_push_in_SparkKubernetesOper…
romsharon98 68a1324
style: apply ruff formatting to spark_kubernetes.py
kesem0811 2bc61bc
ruff formating
kesem0811 fe841e3
Merge branch 'main' into feature/fix_xcom_push_in_SparkKubernetesOper…
kesem0811 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |||||||||||||
| import copy | ||||||||||||||
|
|
||||||||||||||
| from kubernetes.client import models as k8s | ||||||||||||||
| from kubernetes import client | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class PodDefaults: | ||||||||||||||
|
|
@@ -29,11 +30,13 @@ class PodDefaults: | |||||||||||||
| XCOM_MOUNT_PATH = "/airflow/xcom" | ||||||||||||||
| SIDECAR_CONTAINER_NAME = "airflow-xcom-sidecar" | ||||||||||||||
| XCOM_CMD = 'trap "exit 0" INT; while true; do sleep 1; done;' | ||||||||||||||
| VOLUME_MOUNT = k8s.V1VolumeMount(name="xcom", mount_path=XCOM_MOUNT_PATH) | ||||||||||||||
| VOLUME = k8s.V1Volume(name="xcom", empty_dir=k8s.V1EmptyDirVolumeSource()) | ||||||||||||||
| VOLUME_MOUNT_NAME = "xcom" | ||||||||||||||
| VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) | ||||||||||||||
| XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] | ||||||||||||||
| VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) | ||||||||||||||
| SIDECAR_CONTAINER = k8s.V1Container( | ||||||||||||||
| name=SIDECAR_CONTAINER_NAME, | ||||||||||||||
| command=["sh", "-c", XCOM_CMD], | ||||||||||||||
| command=XCOM_SIDECAR_COMMAND, | ||||||||||||||
| image="alpine", | ||||||||||||||
| volume_mounts=[VOLUME_MOUNT], | ||||||||||||||
| resources=k8s.V1ResourceRequirements( | ||||||||||||||
|
|
@@ -46,14 +49,20 @@ class PodDefaults: | |||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def add_xcom_sidecar( | ||||||||||||||
| pod: k8s.V1Pod, | ||||||||||||||
| pod: k8s.V1Pod | dict, | ||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| *, | ||||||||||||||
| sidecar_container_image: str | None = None, | ||||||||||||||
| sidecar_container_resources: k8s.V1ResourceRequirements | dict | None = None, | ||||||||||||||
| ) -> k8s.V1Pod: | ||||||||||||||
| """Add sidecar.""" | ||||||||||||||
| pod_cp = copy.deepcopy(pod) | ||||||||||||||
| pod_cp.spec.volumes = pod.spec.volumes or [] | ||||||||||||||
|
|
||||||||||||||
| # Handle both V1Pod object and dict cases | ||||||||||||||
| if isinstance(pod_cp, dict): | ||||||||||||||
| # Convert dict to V1Pod object if needed | ||||||||||||||
| pod_cp = k8s.V1Pod(**pod_cp) | ||||||||||||||
|
|
||||||||||||||
| pod_cp.spec.volumes = pod_cp.spec.volumes or [] | ||||||||||||||
| pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) | ||||||||||||||
| pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] | ||||||||||||||
| pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT) | ||||||||||||||
|
|
@@ -64,3 +73,27 @@ def add_xcom_sidecar( | |||||||||||||
| pod_cp.spec.containers.append(sidecar) | ||||||||||||||
|
|
||||||||||||||
| return pod_cp | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def add_sidecar_to_spark_operator_pod_spec( | ||||||||||||||
| spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None | ||||||||||||||
| ): | ||||||||||||||
|
Comment on lines
+78
to
+80
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
please confirm whether these type are correct. Thanks! |
||||||||||||||
| # The Spark Operator expects a custom SparkApplication object, which is different from the standard Kubernetes Pod model. | ||||||||||||||
| driver_template = copy.deepcopy(spec) | ||||||||||||||
| driver_template["volumes"] = [PodDefaults.VOLUME.to_dict()] | ||||||||||||||
| driver_template["driver"]["volumeMounts"] = [ | ||||||||||||||
| {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} | ||||||||||||||
| ] | ||||||||||||||
| driver_template["driver"]["sidecars"] = [ | ||||||||||||||
| { | ||||||||||||||
| "name": PodDefaults.SIDECAR_CONTAINER_NAME, | ||||||||||||||
| "command": PodDefaults.XCOM_SIDECAR_COMMAND, | ||||||||||||||
| "image": sidecar_container_image or PodDefaults.SIDECAR_CONTAINER.image, | ||||||||||||||
| "volumeMounts": [ | ||||||||||||||
| {"name": PodDefaults.VOLUME_MOUNT_NAME, "mountPath": PodDefaults.XCOM_MOUNT_PATH} | ||||||||||||||
| ], | ||||||||||||||
| } | ||||||||||||||
| ] | ||||||||||||||
| if sidecar_container_resources: | ||||||||||||||
| driver_template["driver"]["sidecars"]["resources"] = sidecar_container_resources | ||||||||||||||
| return driver_template | ||||||||||||||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's group similar concepts together