Skip to content

[SPARK-56999][PYSPARK] Fix mapInArrow opaque getInt error by coercing output to declared schema#56049

Open
Yicong-Huang wants to merge 1 commit into
apache:masterfrom
Yicong-Huang:mapinarrow-int-column-fix
Open

[SPARK-56999][PYSPARK] Fix mapInArrow opaque getInt error by coercing output to declared schema#56049
Yicong-Huang wants to merge 1 commit into
apache:masterfrom
Yicong-Huang:mapinarrow-int-column-fix

Conversation

@Yicong-Huang
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Make the SQL_MAP_ARROW_ITER_UDF branch in python/pyspark/worker.py coerce each
output pa.RecordBatch against the UDF's declared output schema via
ArrowBatchTransformer.enforce_schema, mirroring every sibling Arrow eval-type
branch. reorder_by_name is wired to the existing
spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName config, matching
SQL_GROUPED_MAP_ARROW_UDF / applyInArrow.

Why are the changes needed?

Without coercion, an Arrow type mismatch between the UDF output and the declared
output schema surfaces in the JVM as an opaque accessor error:

[UNSUPPORTED_CALL.WITHOUT_SUGGESTION] Cannot call the method "getInt" of the
class "org.apache.spark.sql.vectorized.ArrowColumnVector\$ArrowVectorAccessor".
SQLSTATE: 0A000

Repro:

```python
import pyarrow as pa
from pyspark.sql.types import StructType, StructField, IntegerType

def double_x(iter_batches):
for batch in iter_batches:
df = batch.to_pandas()
df["x"] = df["x"] * 2
yield pa.RecordBatch.from_pandas(df[["x"]])

src = spark.createDataFrame([(1,), (2,), (3,)], ["x"]) # x is long (int64)
out = src.mapInArrow(double_x, schema=StructType([StructField("x", IntegerType())]))
out.show()
```

`createDataFrame` infers `x` as `LongType`; the pandas roundtrip preserves int64;
the declared schema is int32; the JVM picks `LongAccessor` for a `BigIntVector`
and the outer scan calls `getInt` on it, hitting the no-op base accessor.

Does this PR introduce any user-facing change?

Yes. `mapInArrow` output batches that previously caused an opaque JVM accessor
failure are now coerced in Python:

  • Compatible Arrow type mismatches (e.g. int64 -> int32 from a pandas roundtrip)
    are cast with `safecheck=True`, so overflows still raise.
  • Column ordering is reconciled by name under
    `spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName` (default
    true), consistent with `applyInArrow`.
  • Incompatible mismatches raise a Python `RESULT_COLUMN_NAMES_MISMATCH` /
    `RESULT_COLUMN_TYPES_MISMATCH` error instead of failing in the JVM.

How was this patch tested?

`python/run-tests --testnames pyspark.sql.tests.arrow.test_arrow_map`
(81 tests pass). Added regression test `test_coerce_output_type_to_declared_schema`
covering the int64 -> int32 case; updated `test_top_level_wrong_order` and
`test_nested_extraneous_field` to reflect the new coerce-by-name behavior.

Was this patch authored or co-authored using generative AI tooling?

No.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant