Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ case class ListAgg(
inputAggBufferOffset: Int = 0)
extends Collect[mutable.ArrayBuffer[Any]]
with SupportsOrderingWithinGroup
with ImplicitCastInputTypes {
with ImplicitCastInputTypes
with AliasHelper {

override def orderingFilled: Boolean = orderExpressions.nonEmpty

Expand Down Expand Up @@ -588,7 +589,8 @@ case class ListAgg(
if (someOrder.isEmpty) {
return true
}
if (someOrder.size == 1 && someOrder.head.child.semanticEquals(child)) {
if (someOrder.size == 1 &&
trimAliases(someOrder.head.child).semanticEquals(trimAliases(child))) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says "trim aliases from ListAgg expression subtree", but checkOrderValueDeterminism (line 684) still does orderExpressions.head.child.semanticEquals(castChild) without trimming. With spark.sql.listagg.allowDistinctCastWithOrder.enabled=true (default), a query like

SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a) FROM ...

reaches that path, and the two analyzers still produce different errors:

  • single-pass (aliases still present): the Alias-vs-Alias semanticEquals is false because the ExprIds differ, so we return NonDeterministicMismatch and throw functionAndOrderExpressionMismatchError.
  • fixed-point (post-CleanupAliases): semanticEquals is true, then isCastEqualityPreserving(VariantType) is false, so we throw functionAndOrderExpressionUnsafeCastError.

Should this comparison also wrap both sides in trimAliases? Otherwise the single-pass/fixed-point divergence is only partly closed.

return true
}
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,3 +702,84 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"inputType" : "\"TIMESTAMP\""
}
}


-- !query
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query analysis
Aggregate [listagg(distinct cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string), ,, cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST)#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Union false, false
:- Union false, false
: :- Project [parse_json({"a": "x"}, true) AS v#x]
: : +- OneRowRelation
: +- Project [parse_json({"a": "y"}, true) AS parse_json({"a": "y"})#x]
: +- OneRowRelation
+- Project [parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
+- OneRowRelation


-- !query
SELECT listagg(v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query analysis
Aggregate [listagg(cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string), ,, cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string) ASC NULLS FIRST, 0, 0) AS listagg(CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST)#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Union false, false
:- Union false, false
: :- Project [parse_json({"a": "x"}, true) AS v#x]
: : +- OneRowRelation
: +- Project [parse_json({"a": "y"}, true) AS parse_json({"a": "y"})#x]
: +- OneRowRelation
+- Project [parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
+- OneRowRelation


-- !query
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string DESC) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query analysis
Aggregate [listagg(distinct cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string), ,, cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string) DESC NULLS LAST, 0, 0) AS listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) DESC NULLS LAST)#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Union false, false
:- Union false, false
: :- Project [parse_json({"a": "x"}, true) AS v#x]
: : +- OneRowRelation
: +- Project [parse_json({"a": "y"}, true) AS parse_json({"a": "y"})#x]
: +- OneRowRelation
+- Project [parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
+- OneRowRelation


-- !query
SELECT listagg(DISTINCT v:a.b::string, ',') WITHIN GROUP (ORDER BY v:a.b::string) FROM (SELECT parse_json('{"a": {"b": "x"}}') v UNION ALL SELECT parse_json('{"a": {"b": "y"}}') UNION ALL SELECT parse_json('{"a": {"b": "x"}}'))
-- !query analysis
Aggregate [listagg(distinct cast(variant_get(v#x, $.a.b, VariantType, true, Some(America/Los_Angeles)) as string), ,, cast(variant_get(v#x, $.a.b, VariantType, true, Some(America/Los_Angeles)) as string) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT CAST(variant_get(v, $.a.b) AS b AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a.b) AS b AS STRING) ASC NULLS FIRST)#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Union false, false
:- Union false, false
: :- Project [parse_json({"a": {"b": "x"}}, true) AS v#x]
: : +- OneRowRelation
: +- Project [parse_json({"a": {"b": "y"}}, true) AS parse_json({"a": {"b": "y"}})#x]
: +- OneRowRelation
+- Project [parse_json({"a": {"b": "x"}}, true) AS parse_json({"a": {"b": "x"}})#x]
+- OneRowRelation


-- !query
SELECT grp, listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT 1 grp, parse_json('{"a": "x"}') v UNION ALL SELECT 1, parse_json('{"a": "y"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 1, parse_json('{"a": "x"}')) GROUP BY grp
-- !query analysis
Aggregate [grp#x], [grp#x, listagg(distinct cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string), ,, cast(variant_get(v#x, $.a, VariantType, true, Some(America/Los_Angeles)) as string) ASC NULLS FIRST, 0, 0) AS listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST)#x]
+- SubqueryAlias __auto_generated_subquery_name
+- Union false, false
:- Union false, false
: :- Union false, false
: : :- Union false, false
: : : :- Project [1 AS grp#x, parse_json({"a": "x"}, true) AS v#x]
: : : : +- OneRowRelation
: : : +- Project [1 AS 1#x, parse_json({"a": "y"}, true) AS parse_json({"a": "y"})#x]
: : : +- OneRowRelation
: : +- Project [2 AS 2#x, parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
: : +- OneRowRelation
: +- Project [2 AS 2#x, parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
: +- OneRowRelation
+- Project [1 AS 1#x, parse_json({"a": "x"}, true) AS parse_json({"a": "x"})#x]
+- OneRowRelation
12 changes: 12 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/listagg.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,15 @@ SELECT listagg(DISTINCT col1) WITHIN GROUP (ORDER BY col1, col2) FROM df;
SELECT listagg(DISTINCT col, ',') WITHIN GROUP (ORDER BY col) FROM VALUES (cast(1.1 as double)), (cast(2.2 as double)), (cast(2.2 as double)), (cast(3.3 as double)) AS t(col);
SELECT listagg(DISTINCT col, ',') WITHIN GROUP (ORDER BY col) FROM VALUES (cast(1.0 as float)), (cast(2.0 as float)), (cast(2.0 as float)) AS t(col);
SELECT listagg(DISTINCT col, ',') WITHIN GROUP (ORDER BY col) FROM VALUES (TIMESTAMP'2024-01-01 10:00:00'), (TIMESTAMP'2024-01-02 12:00:00'), (TIMESTAMP'2024-01-01 10:00:00') AS t(col);

-- LISTAGG with a semi-structured extract (the parser wraps v:a in an Alias with a fresh ExprId).
-- Tests that isOrderCompatible strips Alias wrappers before comparing via semanticEquals.
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQLQueryTestSuite only runs the legacy analyzer here, where CleanupAliases has already stripped the parser-introduced Alias before CheckAnalysis calls isOrderCompatible. So these golden files pass on master without the fix too — they don't actually exercise the single-pass path the PR is fixing.

Consider adding a targeted single-pass unit test (e.g. in AggregateExpressionResolverSuite or alongside ResolverRunner) that constructs LISTAGG(DISTINCT v:a::string) WITHIN GROUP (ORDER BY v:a::string) and goes through Resolver directly, so the regression is pinned at the layer where it actually manifested.

-- Semi-structured extract without DISTINCT
SELECT listagg(v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'));
-- Semi-structured extract with DESC ordering
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string DESC) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'));
-- Semi-structured extract with nested path
SELECT listagg(DISTINCT v:a.b::string, ',') WITHIN GROUP (ORDER BY v:a.b::string) FROM (SELECT parse_json('{"a": {"b": "x"}}') v UNION ALL SELECT parse_json('{"a": {"b": "y"}}') UNION ALL SELECT parse_json('{"a": {"b": "x"}}'));
-- Semi-structured extract with GROUP BY
SELECT grp, listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT 1 grp, parse_json('{"a": "x"}') v UNION ALL SELECT 1, parse_json('{"a": "y"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 1, parse_json('{"a": "x"}')) GROUP BY grp;
41 changes: 41 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/listagg.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -563,3 +563,44 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
"inputType" : "\"TIMESTAMP\""
}
}


-- !query
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query schema
struct<listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST):string>
-- !query output
x,y


-- !query
SELECT listagg(v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query schema
struct<listagg(CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST):string>
-- !query output
x,x,y


-- !query
SELECT listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string DESC) FROM (SELECT parse_json('{"a": "x"}') v UNION ALL SELECT parse_json('{"a": "y"}') UNION ALL SELECT parse_json('{"a": "x"}'))
-- !query schema
struct<listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) DESC NULLS LAST):string>
-- !query output
y,x


-- !query
SELECT listagg(DISTINCT v:a.b::string, ',') WITHIN GROUP (ORDER BY v:a.b::string) FROM (SELECT parse_json('{"a": {"b": "x"}}') v UNION ALL SELECT parse_json('{"a": {"b": "y"}}') UNION ALL SELECT parse_json('{"a": {"b": "x"}}'))
-- !query schema
struct<listagg(DISTINCT CAST(variant_get(v, $.a.b) AS b AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a.b) AS b AS STRING) ASC NULLS FIRST):string>
-- !query output
x,y


-- !query
SELECT grp, listagg(DISTINCT v:a::string, ',') WITHIN GROUP (ORDER BY v:a::string) FROM (SELECT 1 grp, parse_json('{"a": "x"}') v UNION ALL SELECT 1, parse_json('{"a": "y"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 2, parse_json('{"a": "x"}') UNION ALL SELECT 1, parse_json('{"a": "x"}')) GROUP BY grp
-- !query schema
struct<grp:int,listagg(DISTINCT CAST(variant_get(v, $.a) AS a AS STRING), ,) WITHIN GROUP (ORDER BY CAST(variant_get(v, $.a) AS a AS STRING) ASC NULLS FIRST):string>
-- !query output
1 x,y
2 x