From f17ef78d64d8005355c4d6295734cedc638651a0 Mon Sep 17 00:00:00 2001 From: Tom Zeng Date: Sat, 2 May 2026 13:25:17 -0700 Subject: [PATCH 1/3] fix(spark-expr): preserve scalar tag in WideDecimalBinaryExpr when both inputs are scalars WideDecimalBinaryExpr::evaluate always returned ColumnarValue::Array, even when both inputs were Scalar. The resulting length-1 array lost its scalar tag, so a downstream comparison against a full batch hit arrow-ord's "Cannot compare arrays of different lengths, got 8192 vs 1". This is the root cause of the TPC-DS q23 BroadcastHashJoin crash (issue #1615): the filter '0.95 * scalar_subquery > ssales' produces a Scalar x Scalar decimal multiply whose length-1 result was then compared against the 8192-row ssales column. Detect the both-scalar case and unwrap the length-1 result back into a ColumnarValue::Scalar so downstream kernels take the scalar fast-path. Adds two unit tests: - test_scalar_scalar_returns_scalar: regression for the crash - test_array_input_returns_array: guards the array path Closes #1615 --- .../math_funcs/wide_decimal_binary_expr.rs | 83 ++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs index 644252b46b..98e8211865 100644 --- a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs +++ b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs @@ -184,6 +184,16 @@ impl PhysicalExpr for WideDecimalBinaryExpr { let left_val = self.left.evaluate(batch)?; let right_val = self.right.evaluate(batch)?; + // Track scalar-ness so we can return a Scalar when both inputs are scalars. + // Without this, a (Scalar op Scalar) result would be returned as a length-1 + // Array, and downstream comparisons against full batches would incorrectly + // see two Array operands with mismatched lengths instead of (Array, Scalar). 
+ // See https://github.com/apache/datafusion-comet/issues/1615 (the q23 BHJ + // join-filter scalar-subquery crash). + let both_scalar = matches!( + (&left_val, &right_val), + (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_)) + ); let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (&left_val, &right_val) { (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)), (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => { @@ -280,7 +290,16 @@ impl PhysicalExpr for WideDecimalBinaryExpr { result }; let result = result.with_data_type(DataType::Decimal128(p_out, s_out)); - Ok(ColumnarValue::Array(Arc::new(result))) + if both_scalar { + // Convert the length-1 result back into a Scalar so downstream + // expressions (binary ops, comparisons) can take the scalar fast-path + // and propagate the scalar tag (Datum::is_scalar) through arrow-rs + // kernels. + let scalar = datafusion::common::ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(scalar)) + } else { + Ok(ColumnarValue::Array(Arc::new(result))) + } } fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> { @@ -557,4 +576,66 @@ mod tests { let arr = result.as_primitive::<Decimal128Type>(); assert_eq!(arr.value(0), 20000); // 2.0000 } + + /// Regression test for the TPC-DS q23 BroadcastHashJoin crash (issue #1615). + /// + /// When both inputs are `ColumnarValue::Scalar`, `evaluate` must return a + /// `ColumnarValue::Scalar` -- not a length-1 `ColumnarValue::Array`. Otherwise + /// downstream comparisons against full batches see two `Array` operands with + /// mismatched lengths and arrow-ord's `compare_op` rejects them with + /// "Cannot compare arrays of different lengths, got N vs 1". + #[test] + fn test_scalar_scalar_returns_scalar() { + use datafusion::common::ScalarValue; + use datafusion::physical_expr::expressions::Literal; + + // 0.95 * 100.00 -- the same Scalar x Scalar decimal multiply pattern that + // appears in the q23 filter `0.95 * scalar_subquery > ssales`. 
+ let left: Arc<dyn PhysicalExpr> = + Arc::new(Literal::new(ScalarValue::Decimal128(Some(95), 38, 2))); + let right: Arc<dyn PhysicalExpr> = + Arc::new(Literal::new(ScalarValue::Decimal128(Some(10000), 38, 2))); + + let expr = WideDecimalBinaryExpr::new( + left, + right, + WideDecimalOp::Multiply, + 38, + 2, + EvalMode::Legacy, + ); + + // Empty schema -- both inputs are Literals so no columns are needed. + let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + match expr.evaluate(&batch).unwrap() { + ColumnarValue::Scalar(ScalarValue::Decimal128(Some(v), 38, 2)) => { + // 0.95 * 100.00 = 95.00 -> at scale 2, integer 9500 + assert_eq!(v, 9500); + } + ColumnarValue::Scalar(other) => { + panic!("expected Decimal128(Some(_), 38, 2), got {other:?}"); + } + ColumnarValue::Array(_) => { + panic!( + "Scalar x Scalar must return ColumnarValue::Scalar, not Array. This is the q23 BHJ crash regression (issue #1615)." + ); + } + } + } + + /// Companion test: when at least one input is an Array, the result must remain an Array. + /// Guards against over-eager scalar-unwrapping in the fix. + #[test] + fn test_array_input_returns_array() { + let batch = make_batch( + vec![Some(150), Some(250)], + 38, + 2, + vec![Some(100), Some(200)], + 38, + 2, + ); + let result = eval_expr(&batch, WideDecimalOp::Add, 38, 2, EvalMode::Legacy).unwrap(); + assert_eq!(result.len(), 2); + } } From fd3fcef6fff7e715badaf1e47661314e8da46b34 Mon Sep 17 00:00:00 2001 From: Tom Zeng Date: Tue, 5 May 2026 07:54:51 -0700 Subject: [PATCH 2/3] address review: drop #1615 link, fix panic message spacing - Remove #1615 references from comments and test docs (per review, #1615 is an unrelated Arrow C-Data offset-buffer bug). Keep the TPC-DS q23 BHJ pattern as a concrete example. - Apply andygrove's suggestion: collapse the broken multi-line panic string into a single clean message. 
--- .../src/math_funcs/wide_decimal_binary_expr.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs index 98e8211865..430aff7deb 100644 --- a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs +++ b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs @@ -187,9 +187,10 @@ impl PhysicalExpr for WideDecimalBinaryExpr { // Track scalar-ness so we can return a Scalar when both inputs are scalars. // Without this, a (Scalar op Scalar) result would be returned as a length-1 // Array, and downstream comparisons against full batches would incorrectly - // see two Array operands with mismatched lengths instead of (Array, Scalar). - // See https://github.com/apache/datafusion-comet/issues/1615 (the q23 BHJ - // join-filter scalar-subquery crash). + // see two Array operands with mismatched lengths instead of (Array, Scalar), + // crashing arrow-ord's compare_op with "Cannot compare arrays of different + // lengths". This pattern appears, for example, in TPC-DS q23's BHJ filter + // `0.95 * scalar_subquery > ssales`. let both_scalar = matches!( (&left_val, &right_val), (ColumnarValue::Scalar(_), ColumnarValue::Scalar(_)) @@ -577,20 +578,22 @@ mod tests { assert_eq!(arr.value(0), 20000); // 2.0000 } - /// Regression test for the TPC-DS q23 BroadcastHashJoin crash (issue #1615). + /// Regression test for the Scalar x Scalar wide-decimal evaluation path. /// /// When both inputs are `ColumnarValue::Scalar`, `evaluate` must return a /// `ColumnarValue::Scalar` -- not a length-1 `ColumnarValue::Array`. Otherwise /// downstream comparisons against full batches see two `Array` operands with /// mismatched lengths and arrow-ord's `compare_op` rejects them with - /// "Cannot compare arrays of different lengths, got N vs 1". + /// "Cannot compare arrays of different lengths, got N vs 1". 
This pattern + /// appears, for example, in TPC-DS q23's BHJ filter + /// `0.95 * scalar_subquery > ssales`. #[test] fn test_scalar_scalar_returns_scalar() { use datafusion::common::ScalarValue; use datafusion::physical_expr::expressions::Literal; // 0.95 * 100.00 -- the same Scalar x Scalar decimal multiply pattern that - // appears in the q23 filter `0.95 * scalar_subquery > ssales`. + // appears in TPC-DS q23's filter `0.95 * scalar_subquery > ssales`. let left: Arc<dyn PhysicalExpr> = Arc::new(Literal::new(ScalarValue::Decimal128(Some(95), 38, 2))); let right: Arc<dyn PhysicalExpr> = @@ -617,7 +620,7 @@ mod tests { } ColumnarValue::Array(_) => { panic!( - "Scalar x Scalar must return ColumnarValue::Scalar, not Array. This is the q23 BHJ crash regression (issue #1615)." + "Scalar x Scalar must return ColumnarValue::Scalar, not Array" ); } } From fedcb3510ea6d392ae9b5e6a02b34669589c0da9 Mon Sep 17 00:00:00 2001 From: Tom Zeng Date: Tue, 5 May 2026 09:08:08 -0700 Subject: [PATCH 3/3] =?UTF-8?q?fix:=20cargo=20fmt=20=E2=80=94=20collapse?= =?UTF-8?q?=20panic=20onto=20single=20line?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI lint job (cargo fmt --all -- --check) wanted the short panic message inlined. --- native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs index 430aff7deb..7ff78713be 100644 --- a/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs +++ b/native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs @@ -619,9 +619,7 @@ mod tests { panic!("expected Decimal128(Some(_), 38, 2), got {other:?}"); } ColumnarValue::Array(_) => { - panic!( - "Scalar x Scalar must return ColumnarValue::Scalar, not Array" - ); + panic!("Scalar x Scalar must return ColumnarValue::Scalar, not Array"); } } }