feat: Add support for accelerated PyArrow UDFs [experimental]#4234
Draft
andygrove wants to merge 11 commits into apache:main from
Conversation
When Comet operators produce Arrow columnar data and the next operator is a Python UDF (mapInArrow/mapInPandas), Spark currently inserts an unnecessary ColumnarToRow transition. The Python runner then converts those rows back to Arrow to send to Python, creating a wasteful Arrow->Row->Arrow round-trip. This adds CometPythonMapInArrowExec, which:

- Accepts columnar input directly from Comet operators
- Uses lightweight batch.rowIterator() instead of UnsafeProjection
- Keeps the Python output as ColumnarBatch (no output row conversion)

The optimization is detected in EliminateRedundantTransitions and controlled by spark.comet.exec.pythonMapInArrow.enabled (default: true).
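For context (this code is not from the PR), a minimal sketch of a query that exercises this path. The session wiring is an assumption; only the config key comes from the commit message above:

```python
import pyarrow as pa
import pyarrow.compute as pc
from pyspark.sql import SparkSession

# Illustrative session setup; jar/config wiring will vary by deployment.
spark = (
    SparkSession.builder
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.comet.exec.pythonMapInArrow.enabled", "true")
    .getOrCreate()
)

def double_x(batches):
    # mapInArrow receives an iterator of pyarrow.RecordBatch and must yield batches.
    for batch in batches:
        yield pa.RecordBatch.from_arrays(
            [pc.multiply(batch.column(0), 2)], names=["x"]
        )

df = spark.range(10).selectExpr("id AS x")
df.mapInArrow(double_x, schema="x long").show()
```

When the scan upstream of `mapInArrow` is a Comet operator, the rewrite keeps the data in Arrow end to end instead of bouncing through rows.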
Documents the CometPythonMapInArrowExec optimization, including supported APIs, configuration, usage example, and how to verify the optimization is active in query plans.
…ions

Fix three issues that prevented test_pyarrow_udf.py from running:

1. mapInArrow callbacks must accept Iterator[pa.RecordBatch] and yield batches. The previous single-batch signatures crashed with "'map' object has no attribute 'to_pandas'".
2. PySpark DataFrame has no `queryExecution` attribute. Use `_jdf.queryExecution().executedPlan().toString()` instead.
3. Replace soft plan-string heuristics with assertions that fail loudly if the optimization regresses. Match on `CometPythonMapInArrow` (no `Exec` suffix in the plan toString) and assert no `ColumnarToRow` transition is present.
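A minimal sketch of the corrected shape, assuming a Comet-enabled `spark` session built elsewhere; the column name and DataFrame are illustrative:

```python
from typing import Iterator

import pyarrow as pa
import pyarrow.compute as pc

def add_one(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # Correct signature: consume an iterator of batches and yield batches.
    for batch in batches:
        yield pa.RecordBatch.from_arrays([pc.add(batch.column(0), 1)], names=["x"])

df = spark.range(10).selectExpr("id AS x")  # `spark` is a Comet-enabled session
result = df.mapInArrow(add_one, schema="x long")

# PySpark exposes query execution through the JVM handle, not a
# `queryExecution` attribute on the Python DataFrame.
plan = result._jdf.queryExecution().executedPlan().toString()
assert "CometPythonMapInArrow" in plan  # plan toString carries no "Exec" suffix
assert "ColumnarToRow" not in plan      # the redundant transition must be gone
```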
- Rewrite test_pyarrow_udf.py as a pytest module. A session-scoped SparkSession fixture builds the Comet-enabled session once, and a parametrized `accelerated` fixture toggles spark.comet.exec.pythonMapInArrow.enabled per test, so each case runs under both the optimized and fallback paths and asserts the expected plan operator (`CometPythonMapInArrow` vs vanilla `PythonMapInArrow`). The jar is auto-discovered from spark/target by matching the installed pyspark version, or taken from the COMET_JAR env var. A sketch of this fixture layout follows the list.
- Add a dedicated `PyArrow UDF Tests` workflow that builds Comet against Spark 3.5 / Scala 2.12, installs pyspark/pyarrow/pandas/pytest, and runs the new pytest module.
- Add CometPythonMapInArrowSuite to the `exec` suite list in both pr_build_linux.yml and pr_build_macos.yml so the JVM-side suite is exercised on every PR.
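The sketch below shows one plausible shape for those fixtures. The glob pattern and plugin class are assumptions; only the COMET_JAR fallback and the config key come from the commit message:

```python
import glob
import os

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # COMET_JAR wins; otherwise fall back to a glob over spark/target
    # (the real module also matches against the installed pyspark version).
    jar = os.environ.get("COMET_JAR") or glob.glob("spark/target/comet-spark-*.jar")[0]
    session = (
        SparkSession.builder
        .config("spark.plugins", "org.apache.spark.CometPlugin")
        .config("spark.jars", jar)
        .getOrCreate()
    )
    yield session
    session.stop()

@pytest.fixture(params=[True, False], ids=["accelerated", "fallback"])
def accelerated(spark, request):
    # Toggle the optimization per test so every case runs under both paths.
    spark.conf.set("spark.comet.exec.pythonMapInArrow.enabled", str(request.param).lower())
    return request.param
```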
Replace the narrow paths allowlist with the same paths-ignore list used by pr_build_linux.yml so the workflow runs on any source change that could affect Comet's PyArrow UDF execution path, not just the few files explicitly named.
The PR's `CometPythonMapInArrowExec` and `EliminateRedundantTransitions` rule directly reference Spark 3.5 APIs that differ across supported Spark versions: the `ArrowPythonRunner` constructor (4 distinct signatures across 3.4/3.5/4.0/4.1+/4.2), `arrowUseLargeVarTypes`, `JobArtifactSet`, `MapInBatchExec.isBarrier`, and the `PythonMapInArrowExec` type itself (renamed to `MapInArrowExec` in 4.0+). This breaks compilation on every profile other than 3.5.

Introduce a per-version `ShimCometPythonMapInArrow` trait under `org.apache.spark.sql.comet.shims` (placed in the spark namespace so it can reach `private[spark]` members) that:

* matches the Spark-version-specific MapInArrow / MapInPandas exec types and exposes their `(func, output, child, isBarrier, evalType)` tuple,
* constructs the right `ArrowPythonRunner` for the version,
* hides `arrowUseLargeVarTypes` / `JobArtifactSet` / `getPythonRunnerConfMap` behind helper methods.

Spark 3.4 lacks the prerequisite APIs (no `isBarrier`, no `JobArtifactSet`, no `arrowUseLargeVarTypes`), so its shim returns `None` from the matchers and the optimization is a no-op there.
The default `amd64/rust` image is Debian 13 (trixie), where the system `python3` is 3.13 and there is no `python3.11` apt package. The workflow installed `python3.11` explicitly, which fails on trixie with `Unable to locate package python3.11`. Switching to `rust:bookworm` gives a Debian 12 base where `python3` is 3.11, matching the job name and pyspark 3.5.x's supported runtime.
Spark launches Python workers in fresh subprocesses that look up python3 on PATH. Without PYSPARK_PYTHON, workers use the system python (no pyarrow installed) and UDF execution fails with ModuleNotFoundError. Point both PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at /tmp/venv/bin/python so workers inherit the same interpreter that pytest uses.
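A sketch of the idea in driver-side Python, assuming the `/tmp/venv` path from the workflow; the variables must be set before the SparkSession (and therefore any worker) starts:

```python
import os
import sys

# Workers are fresh subprocesses, so point them (and the driver) at the
# interpreter that has pyarrow installed. When pytest itself runs inside
# /tmp/venv, sys.executable resolves to /tmp/venv/bin/python.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
os.environ.setdefault("PYSPARK_DRIVER_PYTHON", sys.executable)
```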
Which issue does this PR close?
Closes #957
Rationale for this change
Spark already supports Arrow-optimized Python UDFs. However, this involves an expensive row-to-column transition to convert the input to Arrow, and then an expensive column-to-row transition to convert back to rows for the next operator. Comet can avoid both transitions since its data is already in Arrow format.
What changes are included in this PR?
CometPythonMapInArrowExec operator + plumbing + tests

How are these changes tested?
A Python test is added.