feat: Add experimental support for accelerated PyArrow UDFs #4234
| @@ -0,0 +1,115 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| name: PyArrow UDF Tests | ||
|
|
||
| concurrency: | ||
| group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} | ||
| cancel-in-progress: true | ||
|
|
||
| on: | ||
| push: | ||
| branches: | ||
| - main | ||
| paths-ignore: | ||
| - "benchmarks/**" | ||
| - "doc/**" | ||
| - "docs/**" | ||
| - "**.md" | ||
| - "dev/changelog/*.md" | ||
| - "native/core/benches/**" | ||
| - "native/spark-expr/benches/**" | ||
| - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" | ||
| - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" | ||
| pull_request: | ||
| paths-ignore: | ||
| - "benchmarks/**" | ||
| - "doc/**" | ||
| - "docs/**" | ||
| - "**.md" | ||
| - "dev/changelog/*.md" | ||
| - "native/core/benches/**" | ||
| - "native/spark-expr/benches/**" | ||
| - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" | ||
| - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" | ||
| workflow_dispatch: | ||
|
|
||
| permissions: | ||
| contents: read | ||
|
|
||
| env: | ||
| RUST_VERSION: stable | ||
| RUST_BACKTRACE: 1 | ||
| RUSTFLAGS: "-Clink-arg=-fuse-ld=bfd" | ||
|
|
||
| jobs: | ||
| pyarrow-udf: | ||
| name: PyArrow UDF (Spark 3.5, JDK 17, Python 3.11) | ||
| runs-on: ubuntu-latest | ||
| container: | ||
| # Pinned to the Debian 12 (bookworm) base so the system `python3` is 3.11. The default | ||
| # `amd64/rust` image is Debian 13 (trixie) which ships Python 3.13 and no python3.11 apt | ||
| # package, breaking `apt-get install python3.11`. | ||
| image: rust:bookworm | ||
| env: | ||
| JAVA_TOOL_OPTIONS: "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-exports=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED" | ||
| steps: | ||
| - uses: actions/checkout@v6 | ||
|
|
||
| - name: Setup Rust & Java toolchain | ||
| uses: ./.github/actions/setup-builder | ||
| with: | ||
| rust-version: ${{ env.RUST_VERSION }} | ||
| jdk-version: 17 | ||
|
|
||
| - name: Cache Maven dependencies | ||
| uses: actions/cache@v5 | ||
| with: | ||
| path: | | ||
| ~/.m2/repository | ||
| /root/.m2/repository | ||
| key: ${{ runner.os }}-java-maven-${{ hashFiles('**/pom.xml') }}-pyarrow-udf | ||
| restore-keys: | | ||
| ${{ runner.os }}-java-maven- | ||
|
|
||
| - name: Build Comet (release, Spark 3.5 / Scala 2.12) | ||
| run: | | ||
| cd native && cargo build --release | ||
| cd .. && ./mvnw -B -Prelease install -DskipTests -Pspark-3.5 -Pscala-2.12 | ||
|
|
||
| - name: Install Python 3.11 and pip | ||
| run: | | ||
| apt-get update | ||
| apt-get install -y --no-install-recommends python3 python3-venv python3-pip | ||
| python3 -m venv /tmp/venv | ||
| /tmp/venv/bin/pip install --upgrade pip | ||
| /tmp/venv/bin/pip install "pyspark==3.5.8" "pyarrow>=14" pandas pytest | ||
|
|
||
| - name: Run PyArrow UDF pytest | ||
| env: | ||
| # Spark launches Python workers in a fresh subprocess and looks up `python3` | ||
| # on PATH unless PYSPARK_PYTHON is set. Without this, workers use the system | ||
| # python which has no pyarrow installed and UDF execution fails with | ||
| # ModuleNotFoundError. | ||
| PYSPARK_PYTHON: /tmp/venv/bin/python | ||
| PYSPARK_DRIVER_PYTHON: /tmp/venv/bin/python | ||
| run: | | ||
| jar=$(ls "$PWD"/spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar \ | ||
| | grep -v sources | grep -v tests | head -n1) | ||
| echo "Using $jar" | ||
| COMET_JAR="$jar" /tmp/venv/bin/python -m pytest -v \ | ||
| spark/src/test/resources/pyspark/test_pyarrow_udf.py | ||
|
|
||
|
|
@@ -314,6 +314,18 @@ object CometConf extends ShimCometConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_PYTHON_MAP_IN_ARROW_ENABLED: ConfigEntry[Boolean] = | ||
|
Contributor: MAP is confusing IMO, so many things related to map in Spark.
Contributor: maybe
||
| conf("spark.comet.exec.pythonMapInArrow.enabled") | ||
| .category(CATEGORY_EXEC) | ||
| .doc( | ||
| "Experimental: whether to enable optimized execution of PyArrow UDFs " + | ||
| "(mapInArrow/mapInPandas). When enabled, Comet passes Arrow columnar data " + | ||
| "directly to Python UDFs without the intermediate Arrow-to-Row-to-Arrow " + | ||
| "conversion that Spark normally performs. Disabled by default while the " + | ||
| "feature stabilizes.") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val COMET_TRACING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.tracing.enabled") | ||
| .category(CATEGORY_TUNING) | ||
| .doc(s"Enable fine-grained tracing of events and memory usage. $TRACING_GUIDE.") | ||
|
|
||
| @@ -0,0 +1,160 @@ | ||
| <!--- | ||
| Licensed to the Apache Software Foundation (ASF) under one | ||
| or more contributor license agreements. See the NOTICE file | ||
| distributed with this work for additional information | ||
| regarding copyright ownership. The ASF licenses this file | ||
| to you under the Apache License, Version 2.0 (the | ||
| "License"); you may not use this file except in compliance | ||
| with the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, | ||
| software distributed under the License is distributed on an | ||
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| KIND, either express or implied. See the License for the | ||
| specific language governing permissions and limitations | ||
| under the License. | ||
| --> | ||
|
|
||
| # PyArrow UDF Acceleration | ||
|
|
||
| Comet can accelerate Python UDFs that use PyArrow-backed batch processing, such as `mapInArrow` and `mapInPandas`. | ||
| These APIs are commonly used for ML inference, feature engineering, and data transformation workloads. | ||
|
|
||
| ## Background | ||
|
|
||
| Spark's `mapInArrow` and `mapInPandas` APIs allow users to apply Python functions that operate on Arrow | ||
| RecordBatches or Pandas DataFrames. Under the hood, Spark communicates with the Python worker process | ||
| using the Arrow IPC format. | ||
|
|
||
| Without this optimization, the execution path for these UDFs involves unnecessary data conversions: | ||
|
|
||
| 1. Comet reads data in Arrow columnar format (via CometScan) | ||
| 2. Spark inserts a ColumnarToRow transition (converts Arrow to UnsafeRow) | ||
| 3. The Python runner converts those rows back to Arrow to send to Python | ||
| 4. Python executes the UDF on Arrow batches | ||
| 5. Results are returned as Arrow and then converted back to rows | ||
|
|
||
| Steps 2 and 3 are redundant since the data starts and ends in Arrow format. | ||
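As a concrete illustration of why the round-trip is pure overhead, here is a minimal sketch (the DataFrame `df` and the `passthrough` function are illustrative, not part of Comet): the UDF already receives and returns Arrow `RecordBatch`es, so the row detour in steps 2 and 3 adds conversions without changing the data the UDF sees.

```python
import pyarrow as pa

def passthrough(batches):
    # mapInArrow hands the UDF an iterator of pyarrow.RecordBatch and expects
    # an iterator of pyarrow.RecordBatch back -- Arrow in, Arrow out.
    for batch in batches:
        assert isinstance(batch, pa.RecordBatch)
        yield batch

# df is an existing DataFrame; the schema is unchanged by passthrough.
unchanged = df.mapInArrow(passthrough, df.schema)
```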
|
|
||
| ## How Comet Optimizes This | ||
|
|
||
| When enabled, Comet detects `PythonMapInArrowExec` and `MapInPandasExec` operators in the physical plan | ||
| and replaces them with `CometPythonMapInArrowExec`, which: | ||
|
|
||
| - Reads Arrow columnar batches directly from the upstream Comet operator | ||
| - Feeds them to the Python runner without the expensive UnsafeProjection copy | ||
| - Keeps the Python output in columnar format for downstream operators | ||
|
|
||
| This eliminates the ColumnarToRow transition and the output row conversion, reducing CPU overhead | ||
| and memory allocations. | ||
|
|
||
| ## Configuration | ||
|
|
||
| The optimization is experimental and disabled by default. Enable it with: | ||
|
|
||
| ``` | ||
| spark.comet.exec.pythonMapInArrow.enabled=true | ||
| ``` | ||
|
|
||
| The default is `false` while the feature stabilizes. | ||
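The flag is read at query planning time, so it can be set either at launch or per session. A sketch, assuming a running `SparkSession` named `spark` with the Comet plugin already loaded:

```python
# Enable the experimental PyArrow UDF path for this session.
spark.conf.set("spark.comet.exec.pythonMapInArrow.enabled", "true")

# Equivalent launch-time form:
#   spark-submit --conf spark.comet.exec.pythonMapInArrow.enabled=true ...
```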
|
|
||
| ## Supported APIs | ||
|
|
||
| | PySpark API | Spark Plan Node | Supported | | ||
| | -------------------------------- | --------------------------- | --------- | | ||
| | `df.mapInArrow(func, schema)` | `PythonMapInArrowExec` | Yes | | ||
| | `df.mapInPandas(func, schema)` | `MapInPandasExec` | Yes | | ||
| | `@pandas_udf` (scalar) | `ArrowEvalPythonExec` | Not yet | | ||
| | `df.applyInPandas(func, schema)` | `FlatMapGroupsInPandasExec` | Not yet | | ||
|
|
||
| ## Example | ||
|
|
||
| ```python | ||
| from typing import Iterator | ||
| import pyarrow as pa | ||
| from pyspark.sql import SparkSession, types as T | ||
|
|
||
| spark = SparkSession.builder \ | ||
| .config("spark.plugins", "org.apache.spark.CometPlugin") \ | ||
| .config("spark.comet.enabled", "true") \ | ||
| .config("spark.comet.exec.enabled", "true") \ | ||
| .config("spark.comet.exec.pythonMapInArrow.enabled", "true") \ | ||
| .config("spark.memory.offHeap.enabled", "true") \ | ||
| .config("spark.memory.offHeap.size", "2g") \ | ||
| .getOrCreate() | ||
|
|
||
| df = spark.read.parquet("data.parquet") | ||
|
|
||
| def transform(batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]: | ||
|     # mapInArrow passes an iterator of pyarrow.RecordBatch and expects an | ||
|     # iterator back; the yielded columns must match the output schema below. | ||
|     for batch in batches: | ||
|         table = batch.to_pandas() | ||
|         table["new_col"] = table["value"] * 2 | ||
|         yield pa.RecordBatch.from_pandas(table) | ||
|
|
||
| output_schema = T.StructType([ | ||
| T.StructField("value", T.DoubleType()), | ||
| T.StructField("new_col", T.DoubleType()), | ||
| ]) | ||
|
|
||
| result = df.mapInArrow(transform, output_schema) | ||
| ``` | ||
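`mapInPandas` follows the same pattern but exchanges pandas DataFrames instead of Arrow batches. A sketch reusing the `df` and `output_schema` defined above (it assumes the same `value` column):

```python
import pandas as pd
from typing import Iterator

def transform_pandas(frames: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # mapInPandas passes an iterator of pandas DataFrames and expects one back.
    for pdf in frames:
        pdf["new_col"] = pdf["value"] * 2
        yield pdf

result_pandas = df.mapInPandas(transform_pandas, output_schema)
```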
|
|
||
| ## Verifying the Optimization | ||
|
|
||
| Use `explain()` to verify that `CometPythonMapInArrow` appears in your plan: | ||
|
|
||
| ```python | ||
| result.explain(mode="extended") | ||
| ``` | ||
|
|
||
| You should see: | ||
|
|
||
| ``` | ||
| CometPythonMapInArrow ... | ||
| +- CometNativeExec ... | ||
| +- CometScan ... | ||
| ``` | ||
|
|
||
| Instead of the unoptimized plan: | ||
|
|
||
| ``` | ||
| PythonMapInArrow ... | ||
| +- ColumnarToRow | ||
| +- CometNativeExec ... | ||
| +- CometScan ... | ||
| ``` | ||
|
|
||
| When AQE is enabled (the Spark default) and the query contains a shuffle, the | ||
| optimization is applied during stage materialization. Calling `explain()` before | ||
| running an action will show the unoptimized plan: | ||
|
|
||
| ``` | ||
| AdaptiveSparkPlan isFinalPlan=false | ||
| +- PythonMapInArrow ... | ||
| +- CometExchange ... | ||
| ``` | ||
|
|
||
| To see the optimized plan, run an action first (for example `result.collect()` or | ||
| `result.cache(); result.count()`) and then call `explain()`. The post-execution | ||
| plan shows the materialized stages and includes `CometPythonMapInArrow` if the | ||
| optimization fired. | ||
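For example, with the `result` DataFrame from the earlier example:

```python
result.collect()   # run an action so AQE materializes the query stages
result.explain()   # the re-printed plan should now include CometPythonMapInArrow
```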
|
|
||
| ## Limitations | ||
|
|
||
| - The optimization currently applies only to `mapInArrow` and `mapInPandas`. Scalar pandas UDFs | ||
| (`@pandas_udf`) and grouped operations (`applyInPandas`) are not yet supported. | ||
| - The internal row-to-Arrow conversion inside the Python runner is still present in this version. | ||
| Comet currently routes columnar input through `ColumnarBatch.rowIterator()` so that the existing | ||
| `ArrowPythonRunner` can re-encode the rows back to Arrow IPC. A future optimization will write | ||
| Arrow batches directly to the Python IPC stream, eliminating the remaining round-trip and | ||
| achieving near zero-copy data transfer. | ||
| - The optimization requires Arrow data on the input side. If a shuffle sits between the upstream | ||
| Comet operator and the Python UDF, you need Comet's native shuffle for the optimization to | ||
| apply. Set `spark.shuffle.manager` to | ||
| `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager` and enable | ||
| `spark.comet.exec.shuffle.enabled=true` at session startup. With a vanilla Spark `Exchange` | ||
| in the plan the data leaves the shuffle as rows and the optimization cannot fire (see the | ||
| configuration sketch after this list). | ||
| - Spark 3.4 lacks several APIs the optimization depends on (`MapInBatchExec.isBarrier`, | ||
| `arrowUseLargeVarTypes`, `JobArtifactSet`, the modern `ArrowPythonRunner` constructor). On | ||
| Spark 3.4 the feature is a no-op even when enabled. Spark 3.5+ is required. |
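A sketch of a session configured with Comet's native shuffle so the optimization can apply across a shuffle boundary, mirroring the settings from the example above:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.plugins", "org.apache.spark.CometPlugin")
    .config("spark.shuffle.manager",
            "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.exec.shuffle.enabled", "true")
    .config("spark.comet.exec.pythonMapInArrow.enabled", "true")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)
```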
|
|
@@ -22,8 +22,9 @@ package org.apache.comet.rules | |
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.catalyst.util.sideBySide | ||
| import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec} | ||
| import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometPythonMapInArrowExec, CometSparkToColumnarExec} | ||
| import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec} | ||
| import org.apache.spark.sql.comet.shims.ShimCometPythonMapInArrow | ||
| import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} | ||
| import org.apache.spark.sql.execution.adaptive.QueryStageExec | ||
| import org.apache.spark.sql.execution.exchange.ReusedExchangeExec | ||
|
|
@@ -51,7 +52,9 @@ import org.apache.comet.CometConf | |
| // various reasons) or Spark requests row-based output such as a `collect` call. Spark will add | ||
| // another `ColumnarToRowExec` on top of `CometSparkToColumnarExec`. In this case, the pair could | ||
| // be removed. | ||
| case class EliminateRedundantTransitions(session: SparkSession) extends Rule[SparkPlan] { | ||
| case class EliminateRedundantTransitions(session: SparkSession) | ||
| extends Rule[SparkPlan] | ||
| with ShimCometPythonMapInArrow { | ||
|
|
||
| private lazy val showTransformations = CometConf.COMET_EXPLAIN_TRANSFORMATIONS.get() | ||
|
|
||
|
|
@@ -98,6 +101,26 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa | |
| case CometNativeColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => | ||
| sparkToColumnar.child | ||
| case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child | ||
| // Replace MapInBatchExec (PythonMapInArrowExec / MapInArrowExec / MapInPandasExec) that has | ||
| // a ColumnarToRow child with CometPythonMapInArrowExec to avoid the unnecessary | ||
| // Arrow->Row->Arrow round-trip. The matchers are version-shimmed: Spark 3.4 returns None | ||
| // (it lacks the required APIs) and Spark 4.1+ matches the renamed `MapInArrowExec`. | ||
| case p: SparkPlan | ||
|
Contributor: thinking if it actually makes sense to have python/udf support in a separate module, so users can build Comet with or without Python support?
||
| if CometConf.COMET_PYTHON_MAP_IN_ARROW_ENABLED.get() && | ||
| matchMapInArrow(p).orElse(matchMapInPandas(p)).isDefined => | ||
| val (mapFunc, mapOutput, mapChild, mapIsBarrier, mapEvalType) = | ||
| matchMapInArrow(p).orElse(matchMapInPandas(p)).get | ||
| extractColumnarChild(mapChild) | ||
| .map { columnarChild => | ||
| CometPythonMapInArrowExec( | ||
| mapFunc, | ||
| mapOutput, | ||
| columnarChild, | ||
| mapIsBarrier, | ||
| mapEvalType) | ||
| } | ||
| .getOrElse(p) | ||
|
|
||
| // Spark adds `RowToColumnar` under Comet columnar shuffle. But it's redundant as the | ||
| // shuffle takes row-based input. | ||
| case s @ CometShuffleExchangeExec( | ||
|
|
@@ -130,6 +153,18 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * If the given plan is a ColumnarToRow transition wrapping a columnar child, returns that | ||
| * columnar child. Used to detect and eliminate unnecessary transitions before Python UDF | ||
| * operators. | ||
| */ | ||
| private def extractColumnarChild(plan: SparkPlan): Option[SparkPlan] = plan match { | ||
| case ColumnarToRowExec(child) if child.supportsColumnar => Some(child) | ||
| case CometColumnarToRowExec(child) => Some(child) | ||
| case CometNativeColumnarToRowExec(child) => Some(child) | ||
| case _ => None | ||
| } | ||
|
|
||
| /** | ||
| * Creates an appropriate columnar to row transition operator. | ||
| * | ||
|
|
||
Should this be Spark 4 now?
I'm assuming that this is enabled for only one version of Spark because it is experimental?