
feat: Add support for accelerated PyArrow UDFs [experimental] #4234

Draft

andygrove wants to merge 11 commits into apache:main from andygrove:pyarrow-udf

Conversation


@andygrove (Member) commented May 6, 2026

Which issue does this PR close?

Closes #957

Rationale for this change

Spark already supports optimized Python UDFs that use the Arrow format. However, this involves an expensive row-to-column transition to convert the input to Arrow, and then an expensive column-to-row transition to convert the output back to rows for the next operator. Comet can avoid both transitions because its data is already in Arrow format.
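For context, this is the PySpark mapInArrow API that the optimization targets. A minimal, hypothetical sketch (the column name and session config are illustrative; a real setup also needs the Comet jar on the classpath and the rest of the Comet configuration):

```python
import pyarrow as pa
import pyarrow.compute as pc
from pyspark.sql import SparkSession

# Assumed minimal Comet setup; not a complete configuration.
spark = (
    SparkSession.builder
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.comet.exec.pythonMapInArrow.enabled", "true")
    .getOrCreate()
)

def double_x(batches):
    # mapInArrow hands the UDF an iterator of pyarrow.RecordBatch and
    # expects an iterator of RecordBatch back.
    for batch in batches:
        doubled = pc.multiply(batch.column(0), 2)
        yield pa.RecordBatch.from_arrays([doubled], names=["x"])

df = spark.range(10).withColumnRenamed("id", "x")
df.mapInArrow(double_x, schema="x long").show()
```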

What changes are included in this PR?

  • New CometPythonMapInArrowExec operator + plumbing + tests
  • Documentation

How are these changes tested?

A Python test is added.

andygrove added 5 commits May 5, 2026 18:06
When Comet operators produce Arrow columnar data and the next operator
is a Python UDF (mapInArrow/mapInPandas), Spark currently inserts an
unnecessary ColumnarToRow transition. The Python runner then converts
those rows back to Arrow to send to Python, creating a wasteful
Arrow->Row->Arrow round-trip.

This adds CometPythonMapInArrowExec which:
- Accepts columnar input directly from Comet operators
- Uses lightweight batch.rowIterator() instead of UnsafeProjection
- Keeps the Python output as ColumnarBatch (no output row conversion)

The optimization is detected in EliminateRedundantTransitions and
controlled by spark.comet.exec.pythonMapInArrow.enabled (default: true).

Documents the CometPythonMapInArrowExec optimization, including
supported APIs, configuration, usage example, and how to verify
the optimization is active in query plans.

Fix three issues that prevented test_pyarrow_udf.py from running:

1. mapInArrow callbacks must accept Iterator[pa.RecordBatch] and yield
   batches (see the sketch after this list). The previous single-batch
   signatures crashed with "'map' object has no attribute 'to_pandas'".
2. PySpark DataFrame has no `queryExecution` attribute. Use
   `_jdf.queryExecution().executedPlan().toString()` instead.
3. Replace soft plan-string heuristics with assertions that fail loudly
   if the optimization regresses. Match on `CometPythonMapInArrow` (no
   `Exec` suffix in the plan toString) and assert no `ColumnarToRow`
   transition is present.
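A minimal sketch of the corrected callback shape and the plan assertions described above, assuming `spark` is a Comet-enabled session and an input that Comet actually executes (the parquet path is hypothetical):

```python
from typing import Iterator

import pyarrow as pa

# Corrected callback shape: consume an iterator of RecordBatch, yield batches.
def identity_udf(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    for batch in batches:
        yield batch

# The rewrite applies when the UDF's child is a Comet operator producing
# Arrow data, e.g. a Comet parquet scan.
df = spark.read.parquet("/path/to/data.parquet")
result = df.mapInArrow(identity_udf, schema=df.schema)

# PySpark DataFrames expose the executed plan only through the JVM handle.
plan = result._jdf.queryExecution().executedPlan().toString()

# Fail loudly if the optimization regresses: the operator appears as
# "CometPythonMapInArrow" (no "Exec" suffix in the plan toString) and no
# ColumnarToRow transition should be present.
assert "CometPythonMapInArrow" in plan
assert "ColumnarToRow" not in plan
```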
- Rewrite test_pyarrow_udf.py as a pytest module. A session-scoped
  SparkSession fixture builds the Comet-enabled session once and a
  parametrized `accelerated` fixture toggles
  spark.comet.exec.pythonMapInArrow.enabled per test, so each case runs
  under both the optimized and fallback paths and asserts the expected
  plan operator (`CometPythonMapInArrow` vs vanilla `PythonMapInArrow`);
  a rough sketch of these fixtures follows this list.
  The jar is auto-discovered from spark/target by matching the installed
  pyspark version, or taken from the COMET_JAR env var.
- Add a dedicated `PyArrow UDF Tests` workflow that builds Comet against
  Spark 3.5 / Scala 2.12, installs pyspark/pyarrow/pandas/pytest, and
  runs the new pytest module.
- Add CometPythonMapInArrowSuite to the `exec` suite list in both
  pr_build_linux.yml and pr_build_macos.yml so the JVM-side suite is
  exercised on every PR.
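A rough sketch of the fixture shape described above (hypothetical; test_pyarrow_udf.py in this PR is the source of truth):

```python
import os

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Jar auto-discovery from spark/target is omitted here for brevity;
    # COMET_JAR is the env var named above.
    comet_jar = os.environ["COMET_JAR"]
    session = (
        SparkSession.builder
        .config("spark.jars", comet_jar)
        .config("spark.plugins", "org.apache.spark.CometPlugin")
        .getOrCreate()
    )
    yield session
    session.stop()

@pytest.fixture(params=[True, False], ids=["accelerated", "fallback"])
def accelerated(request, spark):
    # Toggle the optimization per test so each case runs under both paths.
    spark.conf.set(
        "spark.comet.exec.pythonMapInArrow.enabled", str(request.param).lower()
    )
    return request.param

def test_map_in_arrow_plan(spark, accelerated, tmp_path):
    # Read through parquet so the UDF's child is a scan Comet can execute.
    path = str(tmp_path / "data.parquet")
    spark.range(10).write.parquet(path)
    df = spark.read.parquet(path).mapInArrow(lambda it: it, schema="id long")
    plan = df._jdf.queryExecution().executedPlan().toString()
    expected = "CometPythonMapInArrow" if accelerated else "PythonMapInArrow"
    assert expected in plan
```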
andygrove added 6 commits May 5, 2026 18:46
Replace the narrow paths allowlist with the same paths-ignore list used
by pr_build_linux.yml so the workflow runs on any source change that
could affect Comet's PyArrow UDF execution path, not just the few files
explicitly named.

The PR's `CometPythonMapInArrowExec` and `EliminateRedundantTransitions`
rule directly reference Spark 3.5 APIs that differ across supported
Spark versions: the `ArrowPythonRunner` constructor (4 distinct
signatures across 3.4/3.5/4.0/4.1+/4.2), `arrowUseLargeVarTypes`,
`JobArtifactSet`, `MapInBatchExec.isBarrier`, and the `PythonMapInArrowExec`
type itself (renamed to `MapInArrowExec` in 4.0+). This breaks compile
on every profile other than 3.5.

Introduce a per-version `ShimCometPythonMapInArrow` trait under
`org.apache.spark.sql.comet.shims` (placed in the spark namespace so
it can reach `private[spark]` members) that:

* matches the Spark-version-specific MapInArrow / MapInPandas exec types
  and exposes their `(func, output, child, isBarrier, evalType)` tuple,
* constructs the right `ArrowPythonRunner` for the version,
* hides `arrowUseLargeVarTypes` / `JobArtifactSet` / `getPythonRunnerConfMap`
  behind helper methods.

Spark 3.4 lacks the prerequisite APIs (no `isBarrier`, no `JobArtifactSet`,
no `arrowUseLargeVarTypes`), so its shim returns `None` from the matchers
and the optimization is a no-op there.

The default `amd64/rust` image is Debian 13 (trixie), where the system
`python3` is 3.13 and there is no `python3.11` apt package. The workflow
installed `python3.11` explicitly, which fails on trixie with `Unable to
locate package python3.11`.

Switching to `rust:bookworm` gives a Debian 12 base where `python3` is
3.11, matching the job name and pyspark 3.5.x's supported runtime.

Spark launches Python workers in fresh subprocesses that look up python3
on PATH. Without PYSPARK_PYTHON, workers use the system python (no pyarrow
installed) and UDF execution fails with ModuleNotFoundError. Point both
PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at /tmp/venv/bin/python so workers
inherit the same interpreter that pytest uses.
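As a sketch, a conftest.py could achieve the same thing locally by pointing the workers at whatever interpreter runs pytest (the CI workflow instead hard-codes /tmp/venv/bin/python):

```python
# conftest.py (sketch): make Spark's Python workers use the interpreter
# running pytest, so pyarrow is importable inside the workers.
import os
import sys

os.environ.setdefault("PYSPARK_PYTHON", sys.executable)
os.environ.setdefault("PYSPARK_DRIVER_PYTHON", sys.executable)
```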


Development

Successfully merging this pull request may close these issues.

Is it possible to support PyArrow backed UDFs in Comet natively?
