fix xcom push not working in SparkKubernetesOperator#52051
Conversation
| ) | ||
| self.template_body["spark"]["spec"]= driver_with_xcom_template | ||
| except KeyError as e: | ||
| raise AirflowException("Spec missing in SparkApplication template") from e |
There was a problem hiding this comment.
let's not use AirflowException, we can create a customized exception or just use KeyError
There was a problem hiding this comment.
Why?
In all the file, when something doesn't work as expected, an AirflowException is raised
There was a problem hiding this comment.
AirflowException is broad and not infomative
|
|
||
| def add_xcom_sidecar( | ||
| pod: k8s.V1Pod, | ||
| pod: Union[k8s.V1Pod, dict], |
There was a problem hiding this comment.
| pod: Union[k8s.V1Pod, dict], | |
| pod: k8s.V1Pod | dict, |
| ) | ||
| self.template_body["spark"]["spec"]= driver_with_xcom_template | ||
| except KeyError as e: | ||
| raise AirflowException("Spec missing in SparkApplication template") from e |
There was a problem hiding this comment.
AirflowException is broad and not infomative
…ure/fix_xcom_push_in_SparkKubernetesOperator
SparkKubernetesOperator
|
@kesem0811 can you fix the static checks? |
Lee-W
left a comment
There was a problem hiding this comment.
I think we'll fix static checks
…ator - Resolved merge conflict in spark_kubernetes.py - Preserved xcom sidecar functionality from local changes - Integrated upstream improvements including deferrable support and better reattach logic - Combined both approaches: upstream refactoring + local xcom features
- Add proper type checking for dict vs V1Pod inputs in add_xcom_sidecar() - Convert dict inputs to V1Pod objects to ensure .spec attribute access - Maintains backward compatibility while fixing type safety - Resolves: Item 'dict[Any, Any]' has no attribute 'spec' [union-attr]
|
the static checks failed with the error: |
Lee-W
left a comment
There was a problem hiding this comment.
in general looks good, but we'll need unit tests for this
| VOLUME_MOUNT_NAME = "xcom" | ||
| VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) | ||
| XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] | ||
| VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) |
There was a problem hiding this comment.
| VOLUME_MOUNT_NAME = "xcom" | |
| VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) | |
| XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] | |
| VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) | |
| XCOM_SIDECAR_COMMAND = ["sh", "-c", XCOM_CMD] | |
| VOLUME_MOUNT_NAME = "xcom" | |
| VOLUME_MOUNT = k8s.V1VolumeMount(name=VOLUME_MOUNT_NAME, mount_path=XCOM_MOUNT_PATH) | |
| VOLUME = k8s.V1Volume(name=VOLUME_MOUNT_NAME, empty_dir=k8s.V1EmptyDirVolumeSource()) |
let's group similar concepts together
|
|
||
| def add_xcom_sidecar( | ||
| pod: k8s.V1Pod, | ||
| pod: k8s.V1Pod | dict, |
There was a problem hiding this comment.
| pod: k8s.V1Pod | dict, | |
| pod: k8s.V1Pod | dict[str, Any], |
| def add_sidecar_to_spark_operator_pod_spec( | ||
| spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None | ||
| ): |
There was a problem hiding this comment.
| def add_sidecar_to_spark_operator_pod_spec( | |
| spec: dict, sidecar_container_image: str | None = None, sidecar_container_resources: dict | None = None | |
| ): | |
| def add_sidecar_to_spark_operator_pod_spec( | |
| spec: dict[str, Any], sidecar_container_image: str | None = None, sidecar_container_resources: dict[str, Any] | None = None | |
| ) -> dict[str, Any]: |
please confirm whether these type are correct. Thanks!
|
@kesem0811 Can you also fix the static checks CI error? Should be autofixed by your prek pre-commit hooks, assuming you have them installed. |
|
will also need your help resolving the conflict. Thanks! |
|
This pull request has had no activity from the author for over 4 weeks. We are converting it to draft to keep the review queue manageable. @kesem0811, please mark this PR as ready for review when you are ready to continue working on it. Thank you for your contribution! |
|
@kesem0811 This draft PR has had no activity for 3 weeks. Closing to keep the queue clean. You are welcome to reopen and continue when you're ready. If you'd like to pick it back up, please rebase onto the current Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
|
Hi everyone - sorry for not responding earlier. That’s on me, and I understand why the PR was closed. I’ve opened a new PR that addresses the review feedback and completes the fix: #68788 |
Fixes: #39184 where SparkKubernetesOperator tasks hang indefinitely because no sidecar is injected to read /airflow/xcom/return.json.