1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -354,6 +354,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -193,6 +193,7 @@ jobs:
org.apache.comet.exec.CometGenerateExecSuite
org.apache.comet.exec.CometWindowExecSuite
org.apache.comet.exec.CometJoinSuite
org.apache.comet.exec.CometPythonMapInArrowSuite
org.apache.comet.CometNativeSuite
org.apache.comet.CometSparkSessionExtensionsSuite
org.apache.spark.CometPluginsSuite
115 changes: 115 additions & 0 deletions .github/workflows/pyarrow_udf_test.yml
@@ -0,0 +1,115 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PyArrow UDF Tests

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

on:
  push:
    branches:
      - main
    paths-ignore:
      - "benchmarks/**"
      - "doc/**"
      - "docs/**"
      - "**.md"
      - "dev/changelog/*.md"
      - "native/core/benches/**"
      - "native/spark-expr/benches/**"
      - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
      - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  pull_request:
    paths-ignore:
      - "benchmarks/**"
      - "doc/**"
      - "docs/**"
      - "**.md"
      - "dev/changelog/*.md"
      - "native/core/benches/**"
      - "native/spark-expr/benches/**"
      - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
      - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
  workflow_dispatch:

permissions:
  contents: read

env:
  RUST_VERSION: stable
  RUST_BACKTRACE: 1
  RUSTFLAGS: "-Clink-arg=-fuse-ld=bfd"

jobs:
  pyarrow-udf:
    name: PyArrow UDF (Spark 3.5, JDK 17, Python 3.11)
Contributor: Should this be Spark 4 now?
I'm assuming that this is enabled for only one version of Spark because it is experimental?
    runs-on: ubuntu-latest
    container:
      # Pinned to the Debian 12 (bookworm) base so the system `python3` is 3.11. The default
      # `amd64/rust` image is Debian 13 (trixie) which ships Python 3.13 and no python3.11 apt
      # package, breaking `apt-get install python3.11`.
      image: rust:bookworm
      env:
        JAVA_TOOL_OPTIONS: "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"
    steps:
      - uses: actions/checkout@v6

      - name: Setup Rust & Java toolchain
        uses: ./.github/actions/setup-builder
        with:
          rust-version: ${{ env.RUST_VERSION }}
          jdk-version: 17

      - name: Cache Maven dependencies
        uses: actions/cache@v5
        with:
          path: |
            ~/.m2/repository
            /root/.m2/repository
          key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-pyarrow-udf
          restore-keys: |
            ${{ runner.os }}-java-maven-

      - name: Build Comet (release, Spark 3.5 / Scala 2.12)
        run: |
          cd native && cargo build --release
          cd .. && ./mvnw -B -Prelease install -DskipTests -Pspark-3.5 -Pscala-2.12

      - name: Install Python 3.11 and pip
        run: |
          apt-get update
          apt-get install -y --no-install-recommends python3 python3-venv python3-pip
          python3 -m venv /tmp/venv
          /tmp/venv/bin/pip install --upgrade pip
          /tmp/venv/bin/pip install "pyspark==3.5.8" "pyarrow>=14" pandas pytest

      - name: Run PyArrow UDF pytest
        env:
          # Spark launches Python workers in a fresh subprocess and looks up `python3`
          # on PATH unless PYSPARK_PYTHON is set. Without this, workers use the system
          # python which has no pyarrow installed and UDF execution fails with
          # ModuleNotFoundError.
          PYSPARK_PYTHON: /tmp/venv/bin/python
          PYSPARK_DRIVER_PYTHON: /tmp/venv/bin/python
        run: |
          jar=$(ls "$PWD"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar \
            | grep -v sources | grep -v tests | head -n1)
          echo "Using $jar"
          COMET_JAR="$jar" /tmp/venv/bin/python -m pytest -v \
            spark/src/test/resources/pyspark/test_pyarrow_udf.py
12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -314,6 +314,18 @@ object CometConf extends ShimCometConf {
    .booleanConf
    .createWithDefault(false)

  val COMET_PYTHON_MAP_IN_ARROW_ENABLED: ConfigEntry[Boolean] =
Contributor: MAP is confusing IMO, so many things related to map in Spark

Contributor: maybe COMET_PYARROW_SUPPORT or something like that?

conf("spark.comet.exec.pythonMapInArrow.enabled")
.category(CATEGORY_EXEC)
.doc(
"Experimental: whether to enable optimized execution of PyArrow UDFs " +
"(mapInArrow/mapInPandas). When enabled, Comet passes Arrow columnar data " +
"directly to Python UDFs without the intermediate Arrow-to-Row-to-Arrow " +
"conversion that Spark normally performs. Disabled by default while the " +
"feature stabilizes.")
.booleanConf
.createWithDefault(false)

val COMET_TRACING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.tracing.enabled")
.category(CATEGORY_TUNING)
.doc(s"Enable fine-grained tracing of events and memory usage. $TRACING_GUIDE.")
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/index.rst
@@ -38,5 +38,6 @@ Comet $COMET_VERSION User Guide
   Understanding Comet Plans <understanding-comet-plans>
   Tuning Guide <tuning>
   Metrics Guide <metrics>
   PyArrow UDF Acceleration <pyarrow-udfs>
   Iceberg Guide <iceberg>
   Kubernetes Guide <kubernetes>
160 changes: 160 additions & 0 deletions docs/source/user-guide/latest/pyarrow-udfs.md
@@ -0,0 +1,160 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# PyArrow UDF Acceleration

Comet can accelerate Python UDFs that use PyArrow-backed batch processing, such as `mapInArrow` and `mapInPandas`.
These APIs are commonly used for ML inference, feature engineering, and data transformation workloads.

## Background

Spark's `mapInArrow` and `mapInPandas` APIs allow users to apply Python functions that operate on Arrow
RecordBatches or Pandas DataFrames. Under the hood, Spark communicates with the Python worker process
using the Arrow IPC format.

Without Comet, the execution path for these UDFs involves unnecessary data conversions:

1. Comet reads data in Arrow columnar format (via CometScan)
2. Spark inserts a ColumnarToRow transition (converts Arrow to UnsafeRow)
3. The Python runner converts those rows back to Arrow to send to Python
4. Python executes the UDF on Arrow batches
5. Results are returned as Arrow and then converted back to rows

Steps 2 and 3 are redundant since the data starts and ends in Arrow format.

## How Comet Optimizes This

When enabled, Comet detects `PythonMapInArrowExec` and `MapInPandasExec` operators in the physical plan
and replaces them with `CometPythonMapInArrowExec`, which:

- Reads Arrow columnar batches directly from the upstream Comet operator
- Feeds them to the Python runner without the expensive UnsafeProjection copy
- Keeps the Python output in columnar format for downstream operators

This eliminates the ColumnarToRow transition and the output row conversion, reducing CPU overhead
and memory allocations.

## Configuration

The optimization is experimental and disabled by default. Enable it with:

```
spark.comet.exec.pythonMapInArrow.enabled=true
```

The default is `false` while the feature stabilizes.
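For example, assuming an existing `SparkSession` named `spark`, the flag can also be toggled at runtime, since it is a regular conf entry (a minimal sketch; the builder-time `.config(...)` form is shown in the Example below):

```python
# Enable the experimental path on a live session. The same key can instead be
# passed at startup via SparkSession.builder.config(...).
spark.conf.set("spark.comet.exec.pythonMapInArrow.enabled", "true")
```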

## Supported APIs

| PySpark API | Spark Plan Node | Supported |
| -------------------------------- | --------------------------- | --------- |
| `df.mapInArrow(func, schema)` | `PythonMapInArrowExec` | Yes |
| `df.mapInPandas(func, schema)` | `MapInPandasExec` | Yes |
| `@pandas_udf` (scalar) | `ArrowEvalPythonExec` | Not yet |
| `df.applyInPandas(func, schema)` | `FlatMapGroupsInPandasExec` | Not yet |
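Both supported APIs take a function over an *iterator* of batches and are invoked the same way. As a point of comparison with the `mapInArrow` example in the next section, here is a minimal `mapInPandas` sketch; the `df` DataFrame and `value` column are assumptions carried over from that example:

```python
from typing import Iterator

import pandas as pd

def double_pandas(frames: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # mapInPandas passes an iterator of pandas DataFrames and expects one back
    for frame in frames:
        frame["new_col"] = frame["value"] * 2
        yield frame

# The output schema may be given as a DDL string instead of a StructType
result = df.mapInPandas(double_pandas, "value double, new_col double")
```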

## Example

```python
from typing import Iterator

import pyarrow as pa
from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder \
    .config("spark.plugins", "org.apache.spark.CometPlugin") \
    .config("spark.comet.enabled", "true") \
    .config("spark.comet.exec.enabled", "true") \
    .config("spark.comet.exec.pythonMapInArrow.enabled", "true") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .getOrCreate()

df = spark.read.parquet("data.parquet")

def transform(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
    # mapInArrow passes an iterator of RecordBatches and expects an iterator back
    for batch in batches:
        table = batch.to_pandas()
        table["new_col"] = table["value"] * 2
        yield pa.RecordBatch.from_pandas(table)

output_schema = T.StructType([
    T.StructField("value", T.DoubleType()),
    T.StructField("new_col", T.DoubleType()),
])

result = df.mapInArrow(transform, output_schema)
```

## Verifying the Optimization

Use `explain()` to verify that `CometPythonMapInArrow` appears in your plan:

```python
result.explain(mode="extended")
```

You should see:

```
CometPythonMapInArrow ...
+- CometNativeExec ...
   +- CometScan ...
```

Instead of the unoptimized plan:

```
PythonMapInArrow ...
+- ColumnarToRow
   +- CometNativeExec ...
      +- CometScan ...
```

When AQE is enabled (the Spark default) and the query contains a shuffle, the
optimization is applied during stage materialization. Calling `explain()` before
running an action will show the unoptimized plan:

```
AdaptiveSparkPlan isFinalPlan=false
+- PythonMapInArrow ...
   +- CometExchange ...
```

To see the optimized plan, run an action first (for example `result.collect()` or
`result.cache(); result.count()`) and then call `explain()`. The post-execution
plan shows the materialized stages and includes `CometPythonMapInArrow` if the
optimization fired.
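For example:

```python
result.collect()                 # run an action so AQE materializes the stages
result.explain(mode="extended")  # final plan shows CometPythonMapInArrow if it fired
```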

## Limitations

- The optimization currently applies only to `mapInArrow` and `mapInPandas`. Scalar pandas UDFs
(`@pandas_udf`) and grouped operations (`applyInPandas`) are not yet supported.
- The internal row-to-Arrow conversion inside the Python runner is still present in this version.
Comet currently routes columnar input through `ColumnarBatch.rowIterator()` so that the existing
`ArrowPythonRunner` can re-encode the rows back to Arrow IPC. A future optimization will write
Arrow batches directly to the Python IPC stream, eliminating the remaining round-trip and
achieving near zero-copy data transfer.
- The optimization requires Arrow data on the input side. If a shuffle sits between the upstream
Comet operator and the Python UDF, you need Comet's native shuffle for the optimization to
apply: set `spark.shuffle.manager` to
`org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager` and enable
`spark.comet.exec.shuffle.enabled=true` at session startup (see the sketch after this list).
With a vanilla Spark `Exchange` in the plan, the data leaves the shuffle as rows and the
optimization cannot fire.
- Spark 3.4 lacks several APIs the optimization depends on (`MapInBatchExec.isBarrier`,
`arrowUseLargeVarTypes`, `JobArtifactSet`, the modern `ArrowPythonRunner` constructor). On
Spark 3.4 the feature is a no-op even when enabled. Spark 3.5+ is required.
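For the shuffle limitation above, here is a sketch of the session startup it describes, combining the Example's settings with Comet native shuffle; the exact combination is illustrative:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.comet.enabled", "true")
    .config("spark.comet.exec.enabled", "true")
    .config("spark.comet.exec.pythonMapInArrow.enabled", "true")
    # Comet native shuffle, required when a shuffle feeds the Python UDF:
    .config("spark.shuffle.manager",
            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.exec.shuffle.enabled", "true")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)
```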
spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -22,8 +22,9 @@ package org.apache.comet.rules
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometPythonMapInArrowExec, CometSparkToColumnarExec}
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
import org.apache.spark.sql.comet.shims.ShimCometPythonMapInArrow
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -51,7 +52,9 @@ import org.apache.comet.CometConf
// various reasons) or Spark requests row-based output such as for a `collect` call. Spark will add
// another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair can
// be removed.
case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] {
case class EliminateRedundantTransitions(session: SparkSession)
    extends Rule[SparkPlan]
    with ShimCometPythonMapInArrow {

  private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get()

@@ -98,6 +101,26 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
      case CometNativeColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
        sparkToColumnar.child
      case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child
      // Replace MapInBatchExec (PythonMapInArrowExec / MapInArrowExec / MapInPandasExec) that has
      // a ColumnarToRow child with CometPythonMapInArrowExec to avoid the unnecessary
      // Arrow->Row->Arrow round-trip. The matchers are version-shimmed: Spark 3.4 returns None
      // (it lacks the required APIs) and Spark 4.1+ matches the renamed `MapInArrowExec`.
      case p: SparkPlan
Contributor: Thinking: does it actually make sense to have python/udf support in a separate module, so users can build Comet with or without Python support?

          if CometConf.COMET_PYTHON_MAP_IN_ARROW_ENABLED.get() &&
            matchMapInArrow(p).orElse(matchMapInPandas(p)).isDefined =>
        val (mapFunc, mapOutput, mapChild, mapIsBarrier, mapEvalType) =
          matchMapInArrow(p).orElse(matchMapInPandas(p)).get
        extractColumnarChild(mapChild)
          .map { columnarChild =>
            CometPythonMapInArrowExec(
              mapFunc,
              mapOutput,
              columnarChild,
              mapIsBarrier,
              mapEvalType)
          }
          .getOrElse(p)

      // Spark adds `RowToColumnar` under Comet columnar shuffle. But it's redundant as the
      // shuffle takes row-based input.
      case s @ CometShuffleExchangeExec(
@@ -130,6 +153,18 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
}
}

  /**
   * If the given plan is a ColumnarToRow transition wrapping a columnar child, returns that
   * columnar child. Used to detect and eliminate unnecessary transitions before Python UDF
   * operators.
   */
  private def extractColumnarChild(plan: SparkPlan): Option[SparkPlan] = plan match {
    case ColumnarToRowExec(child) if child.supportsColumnar => Some(child)
    case CometColumnarToRowExec(child) => Some(child)
    case CometNativeColumnarToRowExec(child) => Some(child)
    case _ => None
  }

  /**
   * Creates an appropriate columnar to row transition operator.
   *