[SPARK-56903][SQL] Spread NULL outer join keys across shuffle partitions#55927
[SPARK-56903][SQL] Spread NULL outer join keys across shuffle partitions#55927sunchao wants to merge 10 commits into
Conversation
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Please make the CI happy, @sunchao .
[info] *** 18 TESTS FAILED ***
[error] Failed: Total 6146, Failed 18, Errors 0, Passed 6128, Ignored 4
[error] Failed tests:
[error] org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite
[error] org.apache.spark.sql.TPCDSV2_7_PlanStabilityWithStatsSuite
[error] org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite
[error] org.apache.spark.sql.TPCDSV2_7_PlanStabilitySuite
[error] (sql / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 3764 s (01:02:44.0), completed May 17, 2026, 4:30:13 AM
|
Thanks @dongjoon-hyun , fixed. |
|
Also cc @cloud-fan @viirya @peter-toth to review |
| keys.exists(_.exists(_.isInstanceOf[IsNull])) | ||
| } | ||
|
|
||
| private lazy val canSpreadNullJoinKeys: Boolean = { |
There was a problem hiding this comment.
Is this robust enough? What if someone crafts a null handling join condition by hand?
Actually, this looks good.
Actually, why this is needed at all and when can't we spread nulls?
<=> is translated to 2 key pairs Coalesce(a.k, default), Coalesce(b.k, default)) and (IsNull(a.k), IsNull(b.k)), so null never shows up in shuffle keys. The join type check seems fair enough.
There was a problem hiding this comment.
For most types the coalesce key is non-null, but Literal.default(NullType) is itself null, so it seems the extracted shuffle key can still contain nulls even though those rows remain matchable under <=>.
There was a problem hiding this comment.
Without spreading, NullType <=> keys all hash to the same value (Murmur3Hash(null) is deterministic) → all NULL rows collocate on one reducer. The executor then runs:
- SortMergeJoinExec.scala:1116: while (advancedStreamed() && streamedRowKey.anyNull) — skip every NULL-keyed streamed row.
- SortMergeJoinExec.scala:1529: in full-outer, leftRowKey.anyNull triggers padding emission, never a match.
So even with NULL rows colocated, the executor's anyNull guard prevents NULL=NULL from matching. The <=> semantics the user wanted (NULL matches NULL) is never delivered for NullType — the rewrite was supposed to convert NULLs
to non-null sentinels so the executor's guard wouldn't fire, but for NullType the sentinel itself is NULL, so the guard fires anyway and the join produces only padding (full outer) or nothing (inner).
With spreading, NULL rows scatter across reducers. Each reducer's executor sees some NULL rows from both sides. The anyNull guard fires the same way. Same padding emission, same lack of matching.
Output is identical with or without spreading — both produce the broken-but-self-consistent "NULL=NULL doesn't match" behavior for NullType.
There was a problem hiding this comment.
Hmm that is a good point. It seems the check is indeed unnecessary then, let me remove it.
There was a problem hiding this comment.
Please don't forget to update the PR description and let's leave some comments here why spreading nulls is safe in <=> outer joins.
I wonder if left anti join could also benefit from the feature.
There was a problem hiding this comment.
Updated the PR description.
As for left anti join, yes, ordinary shuffled left anti equi-joins with = could likely benefit for the same reason as outer joins: preserved left-side rows with NULL join keys are guaranteed not to match, so concentrating them on one reducer is unnecessary. I kept this PR scoped to outer joins for now to avoid broadening the change.
There was a problem hiding this comment.
Thanks. Sure, handling outer joins in this PR is a nice improvement.
There was a problem hiding this comment.
Thank you @sunchao, let me do another round:
Summary
Adds an opt-in null-aware shuffle partitioning for outer equi-joins to break up the skew that occurs when many preserved-side rows share a NULL join key. With the flag on, only NULL-keyed rows are redistributed; non-NULL keys keep the existing hash contract.
Prior state. Outer-join inputs ship through HashPartitioning. For null-heavy keys, Murmur3Hash(null) is fixed, so all NULL-keyed rows from one side land on a single reducer; the unmatched-NULLs become a hot partition that pre-shuffle skew handling cannot help — the imbalance is in the hash distribution itself, not in any data sketch the skew detector measures.
Design approach. Limit the new behavior to LEFT/RIGHT/FULL OUTER (where preserved-NULL rows can never match under = semantics) and gate it behind spark.sql.shuffle.spreadNullJoinKeys.enabled. When opted in, the join asks for ClusteredDistribution(_, allowNullKeySpreading = true), the planner satisfies that via a new NullAwareHashPartitioning, and the partitioner spreads any anyNull() row round-robin within a map task while keeping non-NULL rows on their normal hash partition. <=> correctness is preserved because the existing coalesce(...)/isnull(...) rewrite eliminates NULLs from the shuffle keys before this code path sees them; the only residual case (NullType) is unaffected because the executor's anyNull skip in SortMergeJoinExec already blocks NULL=NULL matches independent of where the rows physically land.
Key design decisions.
NullAwareHashPartitioningdoes not satisfy an ordinaryClusteredDistribution— only one withallowNullKeySpreading = true. This is the load-bearing invariant: downstream operators that need NULL co-location (aggregates, non-spreading joins) correctly trigger a re-shuffle. TheAdaptiveQueryExecSuitediff confirms the cost —optimizeOutRepartitionno longer fires for repartitions sitting above a null-aware-shuffled join.HashShuffleSpecandNullAwareHashShuffleSpecare made mutually compatible only when both distributions opt in. Without this, the asymmetric case where one input already arrives asHashPartitioning(k)would force a redundant left-side shuffle.- Stateful per-row partition assignment reuses the SPARK-23207 mechanism:
sortBeforeRepartition+isOrderSensitive. The path is structurally parallel toRoundRobinPartitioning, so retry semantics are inherited rather than re-derived. - The
NullTypeshuffle-key guard added in earlier revisions was deliberately removed in commit 7c760ec after analysis showedSortMergeJoinExec'sanyNullskip already makes spreading result-equivalent in that corner.
Implementation sketch. Partitioning hierarchy (partitioning.scala: new NullAwareHashPartitioning, CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec, plus a HashShuffleSpecCompatibility helper to deduplicate the now-shared compatibility check). Shuffle path (ShuffleExchangeExec: new partitioner case spreads NULLs round-robin per map task; deterministic-local-sort guard widened to cover the new partitioning). Join trait (ShuffledJoin: gate-controlled opt-in via the feature flag). AQE path (AQEShuffleReadExec wraps coalesced specs into CoalescedNullAwareHashPartitioning). Config: SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED with ConfigBindingPolicy.SESSION.
|
|
||
| override def expressions: Seq[Expression] = from.expressions | ||
|
|
||
| override def satisfies0(required: Distribution): Boolean = { |
There was a problem hiding this comment.
This body is identical to NullAwareHashPartitioning.satisfies0 at line 340 — same outer UnspecifiedDistribution/AllTuples/_ => false match, same ClusteredDistribution inner match guarded on allowNullKeySpreading, same requireAllClusterKeys branching. This is the same kind of duplication addressed elsewhere in this PR by extracting HashShuffleSpecCompatibility.isCompatible (lines 944-955).
Two cleaner shapes:
- Lift the inner block to a private helper, e.g.
private def nullAwareSatisfies0(exprs, n, required)shared by both classes. - Or just delegate: since boundaries don't change satisfaction semantics for the
allowNullKeySpreadingcontract,CoalescedNullAwareHashPartitioning.satisfies0(required)is essentiallyfrom.satisfies0(required)except for theAllTuplescase wherenumPartitionsdiffers — that single divergence is easy to handle inline.
Side note: both overrides skip the StatefulOpClusteredDistribution case that HashPartitioningLike.satisfies0 handles. Currently unreachable (streaming joins use StatefulOpClusteredDistribution, not ClusteredDistribution, so they never opt into allowNullKeySpreading), but a one-line comment that the omission is deliberate would help the next reader.
| trait ShuffledJoin extends JoinCodegenSupport { | ||
| def isSkewJoin: Boolean | ||
|
|
||
| private lazy val canSpreadNullJoinKeys: Boolean = { |
There was a problem hiding this comment.
The gate opts in based on join type alone, ignoring whether the shuffle keys are actually nullable. For an outer join on non-nullable keys (e.g. f.k = d.k where both k are NOT NULL — common after a NOT NULL filter or on schema-non-null columns), the new path:
- Adds a per-row
joinKeys.anyNull()check inShuffleExchangeExec.getPartitionKeyExtractorthat always returns false. - Produces
NullAwareHashPartitioningas the join's output partitioning, which doesn't satisfy ordinaryClusteredDistribution. TheAdaptiveQueryExecSuitediff in this PR (optimizeOutRepartition = falsecases around lines 2079-2127) shows the cost — a downstreamdf.repartition($"b")is no longer collapsed even though the underlying NULL-skew problem can never have existed.
Two options worth considering:
- Gate also on
leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)so a non-nullable-key outer join falls back to plainHashPartitioning. - If the simpler shape is preferred, add a sentence to the lazy val's comment explicitly calling out the trade-off (skew reduction vs. potentially unnecessary downstream re-shuffle / lost
optimizeOutRepartition) so future readers don't read it as an oversight.
| } | ||
|
|
||
| test("null-aware hash shuffle preserves retry determinism with local sorting") { | ||
| withSQLConf(SQLConf.SORT_BEFORE_REPARTITION.key -> "true") { |
There was a problem hiding this comment.
Retry-determinism for the new partitioning has two paths and only one is covered: with SORT_BEFORE_REPARTITION = true the local sort makes row order deterministic; with false, the code in ShuffleExchangeExec (line ~497-498) instead relies on isOrderSensitive = true to propagate the parent's determinism level. Only the sorted half is exercised here.
A second test running the same ShuffleExchangeExec(NullAwareHashPartitioning, ...) with SORT_BEFORE_REPARTITION = false and asserting that outputDeterministicLevel inherits the parent's level would catch a future regression that drops isNullAwareHashPartitioning from the isOrderSensitive clause (which would silently make retries unsafe for the unsorted path). Mirroring the structure of the existing test is enough — no new infrastructure required.
cloud-fan
left a comment
There was a problem hiding this comment.
Summary
Prior state and problem. For shuffled outer equi-joins on ordinary =, NULL-keyed rows on the preserved side must appear in the output (that's the outer-join contract) but can never satisfy a.k = b.k (NULL = anything is unknown, not true). So those rows need to be emitted, but the join machinery doesn't need to match them with anything on the other side — they're carry-through rows.
This contrasts with what HashPartitioning actually does today: it sends every row with the same hash to the same reducer, including NULL keys. Murmur3Hash(null) is deterministic, so on NULL-heavy inputs the entire NULL cohort collapses onto one reducer, producing a textbook skewed-reducer pattern in the join's output stage even though no correctness contract requires those rows to be co-located.
Worth noting why this is uniquely an outer-join problem rather than a general equi-join one: for inner joins, InferFiltersFromConstraints pushes IsNotNull(k) to both sides, so NULL-keyed rows are filtered out before the shuffle and the skew never materializes. For outer joins, that pushdown is blocked on the preserved side(s) — left outer can push IsNotNull to the right but not the left, right outer is symmetric, full outer can push to neither. The skew lives exactly where the pushdown can't reach. (Same structural reason applies to left anti, which @peter-toth raised — worth framing the follow-up that way.)
Design approach. Recognize that what equi-joins actually need from their input distribution is strictly weaker than what aggregate / window / stateful joins need:
- Aggregate / window /
StatefulOpClusteredDistribution/ bucketed writes — need "same-key co-location," NULLs included, because they consume the NULL group as a coherent unit. - Equi-join — only needs "same-non-NULL-key co-location," because the NULL cohort never participates in matching.
The PR introduces a layout that delivers the weaker contract: non-NULL keys hash as before; NULL keys are scattered round-robin within each map task. Implementation is gated behind spark.sql.shuffle.spreadNullJoinKeys.enabled (default off) and engaged by ShuffledJoin only for outer joins.
Result-safety for <=> (the concern @peter-toth dug into at length, worth restating). ExtractEquiJoinKeys rewrites a <=> b to (coalesce(a, default(T)), isNull(a)) shuffle-key pairs, so for any concrete type the shuffle keys are non-null and the new spreading path never triggers. The only residual is NullType, where Literal.default(NullType) is itself null — but for that case the executor's anyNull guard in SortMergeJoinExec / ShuffledHashJoinExec already classifies the rows as unmatched, so spreading them does not change the result. The earlier-revision <=> gate on canSpreadNullJoinKeys was correctly removed once that argument was nailed down.
Key design decisions made by this PR.
- How to mark "this layout has weaker NULL co-location" so downstream operators see it. The PR's choice: a parallel type hierarchy —
NullAwareHashPartitioning,CoalescedNullAwareHashPartitioning,NullAwareHashShuffleSpec— paralleling the existingHashPartitioningfamily. Downstream operators that require strictClusteredDistributioncorrectly failsatisfiesagainst the new type and re-shuffle. - Layout-marking is required either way (a downstream
GROUP BY klooking at the join'soutputPartitioninghas to be able to tell the difference) — but a flag onHashPartitioningwould deliver the same property with substantially less code. The PR currently duplicateshashKeyPositions,canCreatePartitioning,createPartitioning, andnumPartitionsbetweenHashShuffleSpecandNullAwareHashShuffleSpec, andCoalescedNullAwareHashPartitioningis structurally identical toCoalescedHashPartitioningmodulo the type offrom. The compatibility helperHashShuffleSpecCompatibilityshows the duplication was felt; aspreadNullKeys: Boolean = falsefield onHashPartitioning(and onHashShuffleSpec) would let the existing classes carry the property and eliminate the duplication entirely. EXPLAIN visibility — the one argument for distinct types — costs one line intoString. (Detailed inline below.) - Scope to outer joins. Correct, and for the structural reason above (where preserved-side
IsNotNullpushdown is blocked) — though that reason isn't in the code or PR description today; the comment incanSpreadNullJoinKeysonly addresses the<=>corner. Worth saying it: NULL keys on the preserved side must be emitted but can never satisfy=, so their reducer placement is purely a layout choice. - Round-robin within each map task, seeded by
XORShiftRandom(partitionId). MirrorsRoundRobinPartitioningexactly, including thesortBeforeRepartition/isOrderSensitiveretry-determinism plumbing. The XOR-shift seed avoids the correlated-start hazard a plainpartitionId % numPartitionswould have.
Implementation sketch.
- Catalyst (
partitioning.scala):ClusteredDistributiongains opt-inallowNullKeySpreading;NullAwareHashPartitioning/CoalescedNullAwareHashPartitioningparallel their non-null-aware peers;NullAwareHashShuffleSpecparticipates in compatibility checks (a regularHashShuffleSpecand aNullAwareHashShuffleSpecare mutually compatible iff both distributions have the opt-in flag set). - Planner (
ShuffledJoin.scala): whencanSpreadNullJoinKeysholds, both sides request the opted-inClusteredDistribution. - Shuffle write (
ShuffleExchangeExec.scala): the partition extractor forNullAwareHashPartitioningchecksanyNullper row and round-robins NULL-keyed rows via a per-task counter; the existing round-robin retry-determinism plumbing is extended to cover the new stateful extractor. - AQE (
AQEShuffleReadExec.scala): coalesced reads preserve the partitioning shape by producingCoalescedNullAwareHashPartitioning.
Behavioral changes worth calling out (and worth adding to the PR description so users enabling the flag aren't surprised):
df.repartition($"k") JOIN ... ON k = ...on an outer join no longer optimizes out the user-requested top shuffle. The inner join producesNullAwareHashPartitioning, which is intentionally not equivalent to the user-requestedHashPartitioning. This is what the AQE test diffs reflect (optimizeOutRepartition: true → false).- When one side of an outer join is already shuffled as
HashPartitioning, only the other side gets re-shuffled asNullAwareHashPartitioning. The already-shuffled side keeps its NULL skew; the optimization is one-sided in that case. - An outer join's downstream
GROUP BY k/OVER (PARTITION BY k)/JOIN ... ON k = ...will now re-shuffle (NullAwareHashPartitioningdoes not satisfy strictClusteredDistribution). Correct, but an extra exchange relative to today's plan.
Suggested improvements (most as inline comments below):
- Static nullability gate.
canSpreadNullJoinKeysshould also checkleftKeys.exists(_.nullable) || rightKeys.exists(_.nullable). Outer joins on PK/FK / NOT-NULL / post-IsNotNullkeys gain nothing from the null-aware path and pay both runtime per-row cost and the downstream re-shuffle cost above. The analyzer already tracks nullability — using it here makes the whole mechanism a no-op when there's no NULL to spread. - Collapse the parallel type hierarchy into a flag on
HashPartitioning(see inline) — removes ~85 lines of duplication, keeps every existingcase h: HashPartitioning =>pattern match working. - Reframe
canSpreadNullJoinKeyscomment and PR-description "Why" around the structural reason — preserved-side rows must be emitted but can never satisfy=, and on outer joinsIsNotNullpushdown can't reach the preserved side. The<=>and NullType notes become a corollary rather than the lead. - Single key evaluation in
getPartitionKeyExtractor— today the join key expressions are evaluated twice for non-NULL rows. - Test coverage. (a)
ShuffledHashJoinExecalso extendsShuffledJoinand supports outer joins, but the new tests only coverSortMergeJoinExec. (b) TheNullType<=>corner — the basis of the safety argument for<=>— isn't directly exercised. (c)ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } }silently passes ifunapplyreturnsNone; six new tests use this pattern. - Documentation.
NullAwareHashPartitioningScaladoc should call out that it does NOT satisfy a strictClusteredDistribution;CoalescedNullAwareHashPartitioning,NullAwareHashShuffleSpec, andClusteredDistribution.allowNullKeySpreadinghave no Scaladoc.
| SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION), | ||
| requiredNumPartitions: Option[Int] = None) extends Distribution { | ||
| requiredNumPartitions: Option[Int] = None, | ||
| allowNullKeySpreading: Boolean = false) extends Distribution { |
There was a problem hiding this comment.
Worth a Scaladoc on this field describing the contract: it's a permission, not a requirement (an ordinary HashPartitioning still satisfies this distribution when the flag is true; the flag only weakens what the default partitioning produced by createPartitioning looks like). And it's the consumer-side knob — the partitioning-side marker (NullAwareHashPartitioning today, or a flag on HashPartitioning per the comment below) is what tells downstream operators they need to re-shuffle for strict ClusteredDistribution.
| * to be co-located. Non-NULL join keys preserve the same partitioning contract as | ||
| * [[HashPartitioning]], while rows with any NULL join key may be spread across partitions. | ||
| */ | ||
| case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int) |
There was a problem hiding this comment.
Design alternative worth considering: a spreadNullKeys: Boolean = false field on HashPartitioning instead of a parallel type hierarchy.
The marker this carries is one bit ("NULL keys may be spread, so I don't deliver strict same-key co-location"). Encoding it as a parallel type means duplicating hashKeyPositions, canCreatePartitioning, createPartitioning, numPartitions, and (modulo the helper just extracted) isCompatibleWith in NullAwareHashShuffleSpec, plus reproducing CoalescedHashPartitioning as CoalescedNullAwareHashPartitioning, plus a new arm in every dispatcher (ShuffleExchangeExec.prepareShuffleDependency's part and getPartitionKeyExtractor, AQEShuffleReadExec.outputPartitioning).
With a flag:
HashPartitioning.satisfies0only matches strictClusteredDistributionwhen!spreadNullKeys, only matchesallowNullKeySpreading=truedistributions whenspreadNullKeys.HashShuffleSpeccarries the flag; one extra clause inisCompatibleWith.CoalescedHashPartitioningalready wraps aHashPartitioning— it inherits the flag transparently. No new coalesced class.- Dispatchers branch on
h.spreadNullKeysinstead of branching on type, so every existingcase h: HashPartitioning =>site (BucketingUtils,V1Writes,EnsureRequirements,AQEUtils,basicPhysicalOperators, etc.) keeps working unchanged.
The one argument for distinct types is EXPLAIN-string visibility — a one-line toString fix on the flagged variant.
Separately on this class's Scaladoc: worth calling out that NullAwareHashPartitioning intentionally does NOT satisfy a strict ClusteredDistribution (NULL clustering keys aren't co-located). That's the non-obvious downstream contract — it's what forces downstream GROUP BY / window / strict equi-join to re-shuffle.
There was a problem hiding this comment.
Yea this is an alternative design. The pros and cons are:
Pros:
- Much less duplicated code.
- Existing
HashPartitioningplumbing can often be reused directly. CoalescedHashPartitioningandHashShuffleSpeccan carry the flag instead of requiring parallel classes.- Fewer pattern-match branches across the codebase.
Cons:
- The semantic distinction becomes easier to overlook.
- A
HashPartitioningwithspreadNullKeys = trueis no longer “ordinary hash partitioning” in the old sense. - Every place that reasons about
HashPartitioningnow has to remember to inspect the flag before assuming strict same-key co-location. - That is subtle and potentially error-prone because
HashPartitioningis already widely used. - The class name no longer advertises the weaker contract; you would need careful
toString, docs, and audits to preserve the same clarity.
I'm a bit concerned about the cons since HashPartitioning is widely used in the codebase and the change could have a bigger blast radius than just adding another NullAwareHashPartitioning.
| copy(from = from.copy(expressions = newChildren)) | ||
| } | ||
|
|
||
| case class CoalescedNullAwareHashPartitioning( |
There was a problem hiding this comment.
Missing Scaladoc here (and on NullAwareHashShuffleSpec below). CoalescedHashPartitioning documents what it represents — worth matching that here.
| private lazy val canSpreadNullJoinKeys: Boolean = { | ||
| // Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still | ||
| // produce NULL shuffle keys, but shuffled join execution already treats those rows as | ||
| // unmatched, so spreading them does not change the result. | ||
| val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter | ||
| conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) && | ||
| isOuterJoin | ||
| } |
There was a problem hiding this comment.
Two improvements on this gate:
(1) Static nullability check. Outer joins on non-nullable keys (PK/FK / NOT-NULL columns / post-IsNotNull filtered keys) gain nothing from the null-aware path but still pay both the runtime per-row anyNull check and the downstream re-shuffle cost from outputPartitioning no longer satisfying strict ClusteredDistribution. The analyzer already tracks Expression.nullable — use it here to make the mechanism a no-op when there's no NULL to spread.
(2) Reframe the comment around the structural reason. The current comment only addresses the <=> corner. The real "why this PR exists" story is the preserved-side / pushdown-asymmetry argument — worth leading with that, with the <=> and NullType notes as a corollary.
| private lazy val canSpreadNullJoinKeys: Boolean = { | |
| // Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still | |
| // produce NULL shuffle keys, but shuffled join execution already treats those rows as | |
| // unmatched, so spreading them does not change the result. | |
| val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter | |
| conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) && | |
| isOuterJoin | |
| } | |
| private lazy val canSpreadNullJoinKeys: Boolean = { | |
| // NULL keys on the preserved side of an outer join must be emitted but can never | |
| // satisfy `a.k = b.k` under three-valued logic, so their reducer placement is a | |
| // pure layout choice. Inner joins don't have this problem because | |
| // InferFiltersFromConstraints pushes IsNotNull(key) to both sides; for outer joins | |
| // that pushdown is blocked on the preserved side(s) -- which is exactly where | |
| // NULL-key skew can land. | |
| // | |
| // For null-safe equality (`<=>`), ExtractEquiJoinKeys rewrites to (coalesce, isNull) | |
| // shuffle keys, which are non-null for any concrete type. The NullType corner can | |
| // still produce NULL shuffle keys, but shuffled join execution already treats those | |
| // rows as unmatched, so spreading them does not change the result. | |
| val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter | |
| conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) && | |
| isOuterJoin && | |
| (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)) | |
| } |
| case h: NullAwareHashPartitioning => | ||
| val partitionIdProjection = | ||
| UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) | ||
| val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes) | ||
| var nullKeyPartition = | ||
| new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions) | ||
| row => { | ||
| val joinKeys = joinKeyProjection(row) | ||
| if (joinKeys.anyNull()) { | ||
| // NULL join keys cannot match under ordinary equi-join semantics. Spread them | ||
| // round-robin within each map task so identical rows do not collapse to one reducer. | ||
| val partition = nullKeyPartition | ||
| nullKeyPartition = (nullKeyPartition + 1) % h.numPartitions | ||
| partition | ||
| } else { | ||
| partitionIdProjection(row).getInt(0) | ||
| } | ||
| } |
There was a problem hiding this comment.
Join keys are evaluated twice for non-NULL rows on this path: once via joinKeyProjection(row) to call anyNull(), again via partitionIdProjection(row).getInt(0) which re-evaluates the same expressions to compute the hash. For most expression shapes that's a tight loop, but redundant.
Could evaluate the keys once, check anyNull on the materialized row, then hash directly from that row.
Combined with the static-nullability gate at ShuffledJoin.canSpreadNullJoinKeys (which skips this path entirely when keys are statically non-nullable), the residual overhead becomes "check the null bitset once per row when at least one key is nullable" — about as low as this gets without adaptive observation of actual NULL frequency.
There was a problem hiding this comment.
I updated it to only evaluate the join keys once but the logic becomes more complicated. Please take another look!
| val join = Join(nullableLeft.logicalPlan, nullableRight.logicalPlan, | ||
| LeftOuter, Some(joinCondition), JoinHint.NONE) | ||
|
|
||
| ExtractEquiJoinKeys.unapply(join).foreach { |
There was a problem hiding this comment.
ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } } silently passes if unapply returns None — all assertions live inside the foreach body, so a regression in ExtractEquiJoinKeys would make these tests report success without exercising anything. Prefer:
val (_, leftKeys, rightKeys, boundCondition, _, _, _, _) =
ExtractEquiJoinKeys.unapply(join).getOrElse(fail("Failed to extract equi-join keys"))
withSQLConf(...) {
...
}Applies to all six new tests in this file using this pattern.
Separately: all six new tests use SortMergeJoinExec. ShuffledHashJoinExec also extends ShuffledJoin and supports LeftOuter / RightOuter / FullOuter (with the matching build side), so it picks up the same canSpreadNullJoinKeys behavior — worth at least one end-to-end test on that path too. And the NullType <=> corner case (which the safety argument for <=> rests on) isn't directly exercised by any of the new tests.
What changes were proposed in this pull request?
This PR reduces shuffle skew for null-heavy shuffled outer equi-joins.
For
LEFT OUTER,RIGHT OUTER, andFULL OUTERjoins, preserved rows with aNULLshuffle key may not need to stay concentrated on one reducer. Today those rows can all
collapse into the same shuffle partition, which creates avoidable skew on NULL-heavy inputs.
This change adds a feature-flagged null-aware shuffle partitioning mode for shuffled outer
joins:
NULLshuffle key are spread across reducers instead of collapsing into onepartition.
spark.sql.shuffle.spreadNullJoinKeys.enabled.Spreading remains result-safe for null-safe equality (
<=>) outer joins:<=>join keys, Spark rewrites them into non-null shuffle-keyexpressions using
coalesce(...)andisnull(...), so there are noNULLshuffle keys forthis feature to redistribute.
NullType, where the shuffle key can still beNULL. In thatcase, shuffled join execution already treats the row as unmatched, so redistributing those
rows does not change query results.
The implementation wires this through the planner and runtime pieces that need to understand
the new partitioning contract:
ClusteredDistributioncan opt into null-aware spreading.pretending to satisfy ordinary clustered distributions.
NULLkeys while preserving retry safety.This PR intentionally stays scoped to outer joins. Left anti joins may also have skewed
preserved-side
NULLrows for ordinary=predicates and are worth evaluating separately, butthey need their own correctness and planning review rather than being folded into this patch.
Why are the changes needed?
Outer joins can preserve large numbers of unmatched rows from the outer side. When many of those
rows have
NULLshuffle keys, sending them all to one reducer creates skew even though they donot require one shared reducer for correctness.
Example:
If
fact.kcontains manyNULLvalues, those rows must remain in the result as unmatchedleft-side rows, but they do not need to be grouped together for correctness. Spreading them
reduces needless reducer concentration while leaving normal key matching unchanged.
Does this PR introduce any user-facing change?
Yes, in execution behavior only. Query results are unchanged, but when the feature flag is
enabled, shuffle partitioning for eligible NULL-heavy outer equi-joins becomes less skewed.
How was this patch tested?
with
NULLkeys, null-safe outer-join behavior, shuffle-levelNULLspreading, retrydeterminism, shuffle-spec compatibility, and AQE preservation of null-aware coalesced reads.
git diff --check.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Codex GPT-5