[SPARK-56903][SQL] Spread NULL outer join keys across shuffle partitions #55927

Open
sunchao wants to merge 10 commits into apache:master from sunchao:dev/chao/codex/null-aware-outer-join-apache

Member

@sunchao sunchao commented May 17, 2026

What changes were proposed in this pull request?

This PR reduces shuffle skew for null-heavy shuffled outer equi-joins.

For LEFT OUTER, RIGHT OUTER, and FULL OUTER joins, preserved rows with a NULL
shuffle key may not need to stay concentrated on one reducer. Today those rows can all
collapse into the same shuffle partition, which creates avoidable skew on NULL-heavy inputs.

This change adds a feature-flagged null-aware shuffle partitioning mode for shuffled outer
joins:

  • Non-NULL shuffle keys keep the existing hash partitioning behavior.
  • Rows with any NULL shuffle key are spread across reducers instead of collapsing into one
    partition.
  • The behavior is disabled by default behind
    spark.sql.shuffle.spreadNullJoinKeys.enabled.

Spreading remains result-safe for null-safe equality (<=>) outer joins:

  • For ordinary extracted <=> join keys, Spark rewrites them into non-null shuffle-key
    expressions using coalesce(...) and isnull(...), so there are no NULL shuffle keys for
    this feature to redistribute.
  • The only remaining corner is NullType, where the shuffle key can still be NULL. In that
    case, shuffled join execution already treats the row as unmatched, so redistributing those
    rows does not change query results.
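
The rewrite in the first bullet can be illustrated with a small model. This is only a sketch, not Spark code: a nullable INT key is modeled as `Option[Int]`, the object name `NullSafeKeyRewrite` and its members are hypothetical, and `0` is assumed as the default value for INT.

```scala
// Simplified model, not Spark code: a nullable INT join key is an Option[Int].
object NullSafeKeyRewrite {
  // Assumed stand-in for the default literal of an INT column.
  val defaultInt: Int = 0

  // Mirrors the (coalesce(k, default), isnull(k)) key pair: never NULL.
  def shuffleKey(k: Option[Int]): (Int, Boolean) =
    (k.getOrElse(defaultInt), k.isEmpty)

  // Null-safe equality: NULL <=> NULL is true, NULL <=> 0 is false.
  def nullSafeEq(a: Option[Int], b: Option[Int]): Boolean = a == b
}
```

In this model two keys are null-safe equal exactly when their rewritten pairs are equal, and the rewritten pair never contains a NULL, which is why the spreading path has nothing to redistribute for concrete types.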

The implementation wires this through the planner and runtime pieces that need to understand
the new partitioning contract:

  • ClusteredDistribution can opt into null-aware spreading.
  • New null-aware partitioning and shuffle-spec variants preserve compatibility checks without
    pretending to satisfy ordinary clustered distributions.
  • Shuffle execution spreads unmatched NULL keys while preserving retry safety.
  • AQE/coalesced shuffle reads preserve the new partitioning shape.

This PR intentionally stays scoped to outer joins. Left anti joins may also have skewed
preserved-side NULL rows for ordinary = predicates and are worth evaluating separately, but
they need their own correctness and planning review rather than being folded into this patch.

Why are the changes needed?

Outer joins can preserve large numbers of unmatched rows from the outer side. When many of those
rows have NULL shuffle keys, sending them all to one reducer creates skew even though they do
not require one shared reducer for correctness.

Example:

SELECT *
FROM fact f
LEFT OUTER JOIN dim d
  ON f.k = d.k

If fact.k contains many NULL values, those rows must remain in the result as unmatched
left-side rows, but they do not need to be grouped together for correctness. Spreading them
reduces needless reducer concentration while leaving normal key matching unchanged.
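
To make the skew concrete, here is a minimal sketch in plain Scala with no Spark dependency. The names (`NullSkewSketch`, `hashPartition`, `nullAwarePartitions`) are hypothetical, and the hash is a simple stand-in rather than Murmur3; the only property it shares with the real partitioner is that every NULL key maps to one fixed partition.

```scala
object NullSkewSketch {
  val numPartitions = 4

  // Stand-in for hash partitioning: NULL always maps to the same fixed value (0 here).
  def hashPartition(key: Option[Int]): Int =
    Math.floorMod(key.getOrElse(0), numPartitions)

  // Null-aware variant: non-NULL keys hash as before, NULL keys go round-robin.
  def nullAwarePartitions(keys: Seq[Option[Int]]): Seq[Int] = {
    var next = 0
    keys.map {
      case None =>
        val p = next
        next = (next + 1) % numPartitions
        p
      case some => hashPartition(some)
    }
  }

  // Rows per partition, indexed by partition id.
  def sizes(partitions: Seq[Int]): Seq[Int] =
    (0 until numPartitions).map(p => partitions.count(_ == p))

  // 8 non-NULL keys (0..7) followed by 8 NULL keys.
  val keys: Seq[Option[Int]] = (0 until 8).map(i => Option(i)) ++ Seq.fill(8)(None)

  val hashed: Seq[Int] = sizes(keys.map(hashPartition))      // 10, 2, 2, 2
  val spread: Seq[Int] = sizes(nullAwarePartitions(keys))    // 4, 4, 4, 4
}
```

With plain hashing, partition 0 receives all eight NULL rows on top of its two regular rows; with spreading, every partition ends up with four rows while each non-NULL key stays on its original partition.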

Does this PR introduce any user-facing change?

Yes, in execution behavior only. Query results are unchanged, but when the feature flag is
enabled, shuffle partitioning for eligible NULL-heavy outer equi-joins becomes less skewed.

How was this patch tested?

  • Added and updated unit tests covering outer-join planning, FULL OUTER JOIN result correctness
    with NULL keys, null-safe outer-join behavior, shuffle-level NULL spreading, retry
    determinism, shuffle-spec compatibility, and AQE preservation of null-aware coalesced reads.
  • Regenerated the affected TPC-DS plan-stability outputs after the expected physical-plan change.
  • Ran focused plan-stability verification for the affected TPC-DS cases locally.
  • Ran git diff --check.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Codex GPT-5

Member

@dongjoon-hyun dongjoon-hyun left a comment

Please make the CI happy, @sunchao .

[info] *** 18 TESTS FAILED ***
[error] Failed: Total 6146, Failed 18, Errors 0, Passed 6128, Ignored 4
[error] Failed tests:
[error] 	org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite
[error] 	org.apache.spark.sql.TPCDSV2_7_PlanStabilityWithStatsSuite
[error] 	org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite
[error] 	org.apache.spark.sql.TPCDSV2_7_PlanStabilitySuite
[error] (sql / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 3764 s (01:02:44.0), completed May 17, 2026, 4:30:13 AM

Member Author

sunchao commented May 17, 2026

Thanks @dongjoon-hyun , fixed.

Member Author

sunchao commented May 17, 2026

Also cc @cloud-fan @viirya @peter-toth to review

keys.exists(_.exists(_.isInstanceOf[IsNull]))
}

private lazy val canSpreadNullJoinKeys: Boolean = {
Contributor

@peter-toth peter-toth May 18, 2026

Is this robust enough? What if someone crafts a null handling join condition by hand?

Actually, this looks good.

Actually, why is this needed at all, and when can't we spread nulls?
<=> is translated to two key pairs, (Coalesce(a.k, default), Coalesce(b.k, default)) and (IsNull(a.k), IsNull(b.k)), so null never shows up in shuffle keys. The join type check seems fair enough.

Member Author

For most types the coalesce key is non-null, but Literal.default(NullType) is itself null, so it seems the extracted shuffle key can still contain nulls even though those rows remain matchable under <=>.

Contributor

Without spreading, NullType <=> keys all hash to the same value (Murmur3Hash(null) is deterministic) → all NULL rows collocate on one reducer. The executor then runs:

  • SortMergeJoinExec.scala:1116: while (advancedStreamed() && streamedRowKey.anyNull) — skip every NULL-keyed streamed row.
  • SortMergeJoinExec.scala:1529: in full-outer, leftRowKey.anyNull triggers padding emission, never a match.

So even with NULL rows co-located, the executor's anyNull guard prevents NULL=NULL from matching. The <=> semantics the user wanted (NULL matches NULL) are never delivered for NullType: the rewrite was supposed to convert NULLs to non-null sentinels so the executor's guard wouldn't fire, but for NullType the sentinel itself is NULL, so the guard fires anyway and the join produces only padding (full outer) or nothing (inner).

With spreading, NULL rows scatter across reducers. Each reducer's executor sees some NULL rows from both sides. The anyNull guard fires the same way. Same padding emission, same lack of matching.

Output is identical with or without spreading — both produce the broken-but-self-consistent "NULL=NULL doesn't match" behavior for NullType.
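
The argument can be checked on a toy model. The sketch below is not the SortMergeJoinExec logic; it is a simplified per-reducer full outer join in plain Scala, with hypothetical names, in which a NULL key is never matched and is only padded. It then compares co-locating NULL rows on one reducer against scattering them round-robin.

```scala
object NullPlacementSketch {
  type Row = (Option[Int], String)               // (join key, payload)
  type Out = (Option[String], Option[String])    // (left payload, right payload)

  // Full outer join inside one reducer. NULL-keyed rows never match (the
  // anyNull-style guard); they are emitted padded with None for the other side.
  def fullOuter(left: Seq[Row], right: Seq[Row]): Seq[Out] = {
    val matched: Seq[Out] = for {
      (lk, lv) <- left
      (rk, rv) <- right
      if lk.isDefined && lk == rk
    } yield (Option(lv), Option(rv))
    val leftOnly: Seq[Out] = left.collect {
      case (k, v) if k.isEmpty || !right.exists(_._1 == k) => (Option(v), Option.empty[String])
    }
    val rightOnly: Seq[Out] = right.collect {
      case (k, v) if k.isEmpty || !left.exists(_._1 == k) => (Option.empty[String], Option(v))
    }
    matched ++ leftOnly ++ rightOnly
  }

  // Partition both sides into n reducers, join each reducer, concatenate results.
  def run(left: Seq[Row], right: Seq[Row], n: Int, spreadNulls: Boolean): Seq[Out] = {
    def assign(rows: Seq[Row]): Map[Int, Seq[Row]] = {
      var next = 0
      rows.map { row =>
        val p = row._1 match {
          case Some(k) => Math.floorMod(k, n)
          case None if spreadNulls =>
            val q = next
            next = (next + 1) % n
            q
          case None => 0 // all NULL keys collapse onto one reducer
        }
        p -> row
      }.groupBy(_._1).map { case (p, rs) => p -> rs.map(_._2) }
    }
    val l = assign(left)
    val r = assign(right)
    (0 until n).flatMap(p => fullOuter(l.getOrElse(p, Nil), r.getOrElse(p, Nil)))
  }

  val left: Seq[Row] = Seq((Some(1), "l1"), (None, "l2"), (None, "l3"), (Some(2), "l4"))
  val right: Seq[Row] = Seq((Some(1), "r1"), (None, "r2"), (Some(3), "r3"))

  val colocated: Seq[Out] = run(left, right, 3, spreadNulls = false)
  val spread: Seq[Out] = run(left, right, 3, spreadNulls = true)
}
```

Both runs produce the same six output rows (one match plus five padded rows); only the reducer that emits each padded row differs.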

Member Author

Hmm that is a good point. It seems the check is indeed unnecessary then, let me remove it.

Contributor

Please don't forget to update the PR description, and let's leave a comment here explaining why spreading nulls is safe in <=> outer joins.
I wonder if left anti join could also benefit from the feature.

Member Author

@sunchao sunchao May 19, 2026

Updated the PR description.

As for left anti join, yes, ordinary shuffled left anti equi-joins with = could likely benefit for the same reason as outer joins: preserved left-side rows with NULL join keys are guaranteed not to match, so concentrating them on one reducer is unnecessary. I kept this PR scoped to outer joins for now to avoid broadening the change.

Contributor

Thanks. Sure, handling outer joins in this PR is a nice improvement.

Contributor

@peter-toth peter-toth left a comment

Thank you @sunchao, let me do another round:

Summary

Adds an opt-in null-aware shuffle partitioning for outer equi-joins to break up the skew that occurs when many preserved-side rows share a NULL join key. With the flag on, only NULL-keyed rows are redistributed; non-NULL keys keep the existing hash contract.

Prior state. Outer-join inputs ship through HashPartitioning. For null-heavy keys, Murmur3Hash(null) is fixed, so all NULL-keyed rows from one side land on a single reducer; the unmatched-NULLs become a hot partition that pre-shuffle skew handling cannot help — the imbalance is in the hash distribution itself, not in any data sketch the skew detector measures.

Design approach. Limit the new behavior to LEFT/RIGHT/FULL OUTER (where preserved-NULL rows can never match under = semantics) and gate it behind spark.sql.shuffle.spreadNullJoinKeys.enabled. When opted in, the join asks for ClusteredDistribution(_, allowNullKeySpreading = true), the planner satisfies that via a new NullAwareHashPartitioning, and the partitioner spreads any anyNull() row round-robin within a map task while keeping non-NULL rows on their normal hash partition. <=> correctness is preserved because the existing coalesce(...)/isnull(...) rewrite eliminates NULLs from the shuffle keys before this code path sees them; the only residual case (NullType) is unaffected because the executor's anyNull skip in SortMergeJoinExec already blocks NULL=NULL matches independent of where the rows physically land.

Key design decisions.

  • NullAwareHashPartitioning does not satisfy an ordinary ClusteredDistribution — only one with allowNullKeySpreading = true. This is the load-bearing invariant: downstream operators that need NULL co-location (aggregates, non-spreading joins) correctly trigger a re-shuffle. The AdaptiveQueryExecSuite diff confirms the cost — optimizeOutRepartition no longer fires for repartitions sitting above a null-aware-shuffled join.
  • HashShuffleSpec and NullAwareHashShuffleSpec are made mutually compatible only when both distributions opt in. Without this, the asymmetric case where one input already arrives as HashPartitioning(k) would force a redundant left-side shuffle.
  • Stateful per-row partition assignment reuses the SPARK-23207 mechanism: sortBeforeRepartition + isOrderSensitive. The path is structurally parallel to RoundRobinPartitioning, so retry semantics are inherited rather than re-derived.
  • The NullType shuffle-key guard added in earlier revisions was deliberately removed in commit 7c760ec after analysis showed SortMergeJoinExec's anyNull skip already makes spreading result-equivalent in that corner.
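
The first invariant above can be sketched with a toy model. These case classes are simplified stand-ins that only borrow the Catalyst names; key matching is reduced to plain equality, and `requireAllClusterKeys`, partition counts, and the other distribution types are deliberately left out.

```scala
object SatisfiesSketch {
  // Simplified stand-in for the distribution a consumer requires.
  final case class ClusteredDistribution(
      keys: Seq[String],
      allowNullKeySpreading: Boolean = false)

  sealed trait Partitioning {
    def satisfies(required: ClusteredDistribution): Boolean
  }

  final case class HashPartitioning(keys: Seq[String]) extends Partitioning {
    // Strict co-location is good enough for both the strict and the relaxed requirement.
    def satisfies(required: ClusteredDistribution): Boolean = keys == required.keys
  }

  final case class NullAwareHashPartitioning(keys: Seq[String]) extends Partitioning {
    // NULL keys may be scattered, so only a consumer that opted in is satisfied.
    def satisfies(required: ClusteredDistribution): Boolean =
      required.allowNullKeySpreading && keys == required.keys
  }
}
```

In this model a downstream aggregate that asks for `ClusteredDistribution(Seq("k"))` is not satisfied by `NullAwareHashPartitioning(Seq("k"))`, which is the condition under which a planner would insert another shuffle.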

Implementation sketch.

  • Partitioning hierarchy (partitioning.scala): new NullAwareHashPartitioning, CoalescedNullAwareHashPartitioning, and NullAwareHashShuffleSpec, plus a HashShuffleSpecCompatibility helper to deduplicate the now-shared compatibility check.
  • Shuffle path (ShuffleExchangeExec): a new partitioner case spreads NULLs round-robin per map task; the deterministic-local-sort guard is widened to cover the new partitioning.
  • Join trait (ShuffledJoin): gate-controlled opt-in via the feature flag.
  • AQE path (AQEShuffleReadExec): wraps coalesced specs into CoalescedNullAwareHashPartitioning.
  • Config: SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED with ConfigBindingPolicy.SESSION.


override def expressions: Seq[Expression] = from.expressions

override def satisfies0(required: Distribution): Boolean = {
Contributor

This body is identical to NullAwareHashPartitioning.satisfies0 at line 340 — same outer UnspecifiedDistribution/AllTuples/_ => false match, same ClusteredDistribution inner match guarded on allowNullKeySpreading, same requireAllClusterKeys branching. This is the same kind of duplication addressed elsewhere in this PR by extracting HashShuffleSpecCompatibility.isCompatible (lines 944-955).

Two cleaner shapes:

  • Lift the inner block to a private helper, e.g. private def nullAwareSatisfies0(exprs, n, required) shared by both classes.
  • Or just delegate: since boundaries don't change satisfaction semantics for the allowNullKeySpreading contract, CoalescedNullAwareHashPartitioning.satisfies0(required) is essentially from.satisfies0(required) except for the AllTuples case where numPartitions differs — that single divergence is easy to handle inline.

Side note: both overrides skip the StatefulOpClusteredDistribution case that HashPartitioningLike.satisfies0 handles. Currently unreachable (streaming joins use StatefulOpClusteredDistribution, not ClusteredDistribution, so they never opt into allowNullKeySpreading), but a one-line comment that the omission is deliberate would help the next reader.

trait ShuffledJoin extends JoinCodegenSupport {
def isSkewJoin: Boolean

private lazy val canSpreadNullJoinKeys: Boolean = {
Contributor

The gate opts in based on join type alone, ignoring whether the shuffle keys are actually nullable. For an outer join on non-nullable keys (e.g. f.k = d.k where both k are NOT NULL — common after a NOT NULL filter or on schema-non-null columns), the new path:

  1. Adds a per-row joinKeys.anyNull() check in ShuffleExchangeExec.getPartitionKeyExtractor that always returns false.
  2. Produces NullAwareHashPartitioning as the join's output partitioning, which doesn't satisfy ordinary ClusteredDistribution. The AdaptiveQueryExecSuite diff in this PR (optimizeOutRepartition = false cases around lines 2079-2127) shows the cost — a downstream df.repartition($"b") is no longer collapsed even though the underlying NULL-skew problem can never have existed.

Two options worth considering:

  • Gate also on leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable) so a non-nullable-key outer join falls back to plain HashPartitioning.
  • If the simpler shape is preferred, add a sentence to the lazy val's comment explicitly calling out the trade-off (skew reduction vs. potentially unnecessary downstream re-shuffle / lost optimizeOutRepartition) so future readers don't read it as an oversight.

Member Author

Good point. Updated

}

test("null-aware hash shuffle preserves retry determinism with local sorting") {
withSQLConf(SQLConf.SORT_BEFORE_REPARTITION.key -> "true") {
Contributor

Retry-determinism for the new partitioning has two paths and only one is covered: with SORT_BEFORE_REPARTITION = true the local sort makes row order deterministic; with false, the code in ShuffleExchangeExec (line ~497-498) instead relies on isOrderSensitive = true to propagate the parent's determinism level. Only the sorted half is exercised here.

A second test running the same ShuffleExchangeExec(NullAwareHashPartitioning, ...) with SORT_BEFORE_REPARTITION = false and asserting that outputDeterministicLevel inherits the parent's level would catch a future regression that drops isNullAwareHashPartitioning from the isOrderSensitive clause (which would silently make retries unsafe for the unsorted path). Mirroring the structure of the existing test is enough — no new infrastructure required.
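
Why row order matters for retries can be seen in a few lines. This is a plain-Scala sketch with hypothetical names, not the ShuffleExchangeExec code: it assigns rows round-robin and shows that the same rows arriving in a different order land on different partitions unless they are sorted first.

```scala
object RetryOrderSketch {
  // Round-robin assignment: the partition of a row depends on its arrival position.
  def assign(rows: Seq[String], numPartitions: Int, start: Int): Map[String, Int] =
    rows.zipWithIndex.map { case (row, i) => row -> ((start + i) % numPartitions) }.toMap

  val firstAttempt: Seq[String] = Seq("a", "b", "c")
  val retry: Seq[String] = Seq("c", "a", "b") // same rows, different arrival order

  // Unsorted: the retry places rows differently from the first attempt.
  val unsortedAgree: Boolean = assign(firstAttempt, 3, 0) == assign(retry, 3, 0)

  // Locally sorted before assignment: both attempts agree.
  val sortedAgree: Boolean = assign(firstAttempt.sorted, 3, 0) == assign(retry.sorted, 3, 0)
}
```

Here `unsortedAgree` is false and `sortedAgree` is true. The unsorted path therefore has to be treated as order-sensitive, which is the half of the contract the comment above asks a second test to pin down.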

Contributor

@cloud-fan cloud-fan left a comment

Summary

Prior state and problem. For shuffled outer equi-joins on ordinary =, NULL-keyed rows on the preserved side must appear in the output (that's the outer-join contract) but can never satisfy a.k = b.k (NULL = anything is unknown, not true). So those rows need to be emitted, but the join machinery doesn't need to match them with anything on the other side — they're carry-through rows.

This contrasts with what HashPartitioning actually does today: it sends every row with the same hash to the same reducer, including NULL keys. Murmur3Hash(null) is deterministic, so on NULL-heavy inputs the entire NULL cohort collapses onto one reducer, producing a textbook skewed-reducer pattern in the join's output stage even though no correctness contract requires those rows to be co-located.

Worth noting why this is uniquely an outer-join problem rather than a general equi-join one: for inner joins, InferFiltersFromConstraints pushes IsNotNull(k) to both sides, so NULL-keyed rows are filtered out before the shuffle and the skew never materializes. For outer joins, that pushdown is blocked on the preserved side(s) — left outer can push IsNotNull to the right but not the left, right outer is symmetric, full outer can push to neither. The skew lives exactly where the pushdown can't reach. (Same structural reason applies to left anti, which @peter-toth raised — worth framing the follow-up that way.)

Design approach. Recognize that what equi-joins actually need from their input distribution is strictly weaker than what aggregate / window / stateful joins need:

  • Aggregate / window / StatefulOpClusteredDistribution / bucketed writes — need "same-key co-location," NULLs included, because they consume the NULL group as a coherent unit.
  • Equi-join — only needs "same-non-NULL-key co-location," because the NULL cohort never participates in matching.

The PR introduces a layout that delivers the weaker contract: non-NULL keys hash as before; NULL keys are scattered round-robin within each map task. Implementation is gated behind spark.sql.shuffle.spreadNullJoinKeys.enabled (default off) and engaged by ShuffledJoin only for outer joins.

Result-safety for <=> (the concern @peter-toth dug into at length, worth restating). ExtractEquiJoinKeys rewrites a <=> b to (coalesce(a, default(T)), isNull(a)) shuffle-key pairs, so for any concrete type the shuffle keys are non-null and the new spreading path never triggers. The only residual is NullType, where Literal.default(NullType) is itself null — but for that case the executor's anyNull guard in SortMergeJoinExec / ShuffledHashJoinExec already classifies the rows as unmatched, so spreading them does not change the result. The earlier-revision <=> gate on canSpreadNullJoinKeys was correctly removed once that argument was nailed down.

Key design decisions made by this PR.

  • How to mark "this layout has weaker NULL co-location" so downstream operators see it. The PR's choice: a parallel type hierarchy — NullAwareHashPartitioning, CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec — paralleling the existing HashPartitioning family. Downstream operators that require strict ClusteredDistribution correctly fail satisfies against the new type and re-shuffle.
  • Layout-marking is required either way (a downstream GROUP BY k looking at the join's outputPartitioning has to be able to tell the difference) — but a flag on HashPartitioning would deliver the same property with substantially less code. The PR currently duplicates hashKeyPositions, canCreatePartitioning, createPartitioning, and numPartitions between HashShuffleSpec and NullAwareHashShuffleSpec, and CoalescedNullAwareHashPartitioning is structurally identical to CoalescedHashPartitioning modulo the type of from. The compatibility helper HashShuffleSpecCompatibility shows the duplication was felt; a spreadNullKeys: Boolean = false field on HashPartitioning (and on HashShuffleSpec) would let the existing classes carry the property and eliminate the duplication entirely. EXPLAIN visibility — the one argument for distinct types — costs one line in toString. (Detailed inline below.)
  • Scope to outer joins. Correct, and for the structural reason above (where preserved-side IsNotNull pushdown is blocked) — though that reason isn't in the code or PR description today; the comment in canSpreadNullJoinKeys only addresses the <=> corner. Worth saying it: NULL keys on the preserved side must be emitted but can never satisfy =, so their reducer placement is purely a layout choice.
  • Round-robin within each map task, seeded by XORShiftRandom(partitionId). Mirrors RoundRobinPartitioning exactly, including the sortBeforeRepartition / isOrderSensitive retry-determinism plumbing. The XOR-shift seed avoids the correlated-start hazard a plain partitionId % numPartitions would have.

Implementation sketch.

  • Catalyst (partitioning.scala): ClusteredDistribution gains opt-in allowNullKeySpreading; NullAwareHashPartitioning / CoalescedNullAwareHashPartitioning parallel their non-null-aware peers; NullAwareHashShuffleSpec participates in compatibility checks (a regular HashShuffleSpec and a NullAwareHashShuffleSpec are mutually compatible iff both distributions have the opt-in flag set).
  • Planner (ShuffledJoin.scala): when canSpreadNullJoinKeys holds, both sides request the opted-in ClusteredDistribution.
  • Shuffle write (ShuffleExchangeExec.scala): the partition extractor for NullAwareHashPartitioning checks anyNull per row and round-robins NULL-keyed rows via a per-task counter; the existing round-robin retry-determinism plumbing is extended to cover the new stateful extractor.
  • AQE (AQEShuffleReadExec.scala): coalesced reads preserve the partitioning shape by producing CoalescedNullAwareHashPartitioning.

Behavioral changes worth calling out (and worth adding to the PR description so users enabling the flag aren't surprised):

  • df.repartition($"k") JOIN ... ON k = ... on an outer join no longer optimizes out the user-requested top shuffle. The inner join produces NullAwareHashPartitioning, which is intentionally not equivalent to the user-requested HashPartitioning. This is what the AQE test diffs reflect (optimizeOutRepartition: true → false).
  • When one side of an outer join is already shuffled as HashPartitioning, only the other side gets re-shuffled as NullAwareHashPartitioning. The already-shuffled side keeps its NULL skew; the optimization is one-sided in that case.
  • An outer join's downstream GROUP BY k / OVER (PARTITION BY k) / JOIN ... ON k = ... will now re-shuffle (NullAwareHashPartitioning does not satisfy strict ClusteredDistribution). Correct, but an extra exchange relative to today's plan.

Suggested improvements (most as inline comments below):

  1. Static nullability gate. canSpreadNullJoinKeys should also check leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable). Outer joins on PK/FK / NOT-NULL / post-IsNotNull keys gain nothing from the null-aware path and pay both runtime per-row cost and the downstream re-shuffle cost above. The analyzer already tracks nullability — using it here makes the whole mechanism a no-op when there's no NULL to spread.
  2. Collapse the parallel type hierarchy into a flag on HashPartitioning (see inline) — removes ~85 lines of duplication, keeps every existing case h: HashPartitioning => pattern match working.
  3. Reframe canSpreadNullJoinKeys comment and PR-description "Why" around the structural reason — preserved-side rows must be emitted but can never satisfy =, and on outer joins IsNotNull pushdown can't reach the preserved side. The <=> and NullType notes become a corollary rather than the lead.
  4. Single key evaluation in getPartitionKeyExtractor — today the join key expressions are evaluated twice for non-NULL rows.
  5. Test coverage. (a) ShuffledHashJoinExec also extends ShuffledJoin and supports outer joins, but the new tests only cover SortMergeJoinExec. (b) The NullType <=> corner — the basis of the safety argument for <=> — isn't directly exercised. (c) ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } } silently passes if unapply returns None; six new tests use this pattern.
  6. Documentation. NullAwareHashPartitioning Scaladoc should call out that it does NOT satisfy a strict ClusteredDistribution; CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec, and ClusteredDistribution.allowNullKeySpreading have no Scaladoc.
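
Point 5(c) is a general Scala pitfall and can be shown without Spark. In the sketch below, `extracted` stands in for the result of an extractor such as `unapply`; the object and method names are hypothetical.

```scala
object SilentPassSketch {
  // Counts how many times the "test body" actually ran.
  def bodyRunsWithForeach(extracted: Option[Int]): Int = {
    var runs = 0
    extracted.foreach { _ => runs += 1 } // skipped entirely when extracted is None
    runs
  }

  // Stricter shape: a missing extraction is itself a failure.
  def bodyRunsStrict(extracted: Option[Int]): Int = extracted match {
    case Some(_) => 1
    case None => sys.error("expected the extractor to match")
  }
}
```

With `foreach`, a `None` result runs zero assertions and the surrounding test still passes; the pattern-match form fails loudly instead.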

      SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
-     requiredNumPartitions: Option[Int] = None) extends Distribution {
+     requiredNumPartitions: Option[Int] = None,
+     allowNullKeySpreading: Boolean = false) extends Distribution {
Contributor

Worth a Scaladoc on this field describing the contract: it's a permission, not a requirement (an ordinary HashPartitioning still satisfies this distribution when the flag is true; the flag only weakens what the default partitioning produced by createPartitioning looks like). And it's the consumer-side knob — the partitioning-side marker (NullAwareHashPartitioning today, or a flag on HashPartitioning per the comment below) is what tells downstream operators they need to re-shuffle for strict ClusteredDistribution.

Member Author

Added

* to be co-located. Non-NULL join keys preserve the same partitioning contract as
* [[HashPartitioning]], while rows with any NULL join key may be spread across partitions.
*/
case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
Contributor

Design alternative worth considering: a spreadNullKeys: Boolean = false field on HashPartitioning instead of a parallel type hierarchy.

The marker this carries is one bit ("NULL keys may be spread, so I don't deliver strict same-key co-location"). Encoding it as a parallel type means duplicating hashKeyPositions, canCreatePartitioning, createPartitioning, numPartitions, and (modulo the helper just extracted) isCompatibleWith in NullAwareHashShuffleSpec, plus reproducing CoalescedHashPartitioning as CoalescedNullAwareHashPartitioning, plus a new arm in every dispatcher (ShuffleExchangeExec.prepareShuffleDependency's part and getPartitionKeyExtractor, AQEShuffleReadExec.outputPartitioning).

With a flag:

  • HashPartitioning.satisfies0 only matches strict ClusteredDistribution when !spreadNullKeys, only matches allowNullKeySpreading=true distributions when spreadNullKeys.
  • HashShuffleSpec carries the flag; one extra clause in isCompatibleWith.
  • CoalescedHashPartitioning already wraps a HashPartitioning — it inherits the flag transparently. No new coalesced class.
  • Dispatchers branch on h.spreadNullKeys instead of branching on type, so every existing case h: HashPartitioning => site (BucketingUtils, V1Writes, EnsureRequirements, AQEUtils, basicPhysicalOperators, etc.) keeps working unchanged.

The one argument for distinct types is EXPLAIN-string visibility — a one-line toString fix on the flagged variant.

Separately on this class's Scaladoc: worth calling out that NullAwareHashPartitioning intentionally does NOT satisfy a strict ClusteredDistribution (NULL clustering keys aren't co-located). That's the non-obvious downstream contract — it's what forces downstream GROUP BY / window / strict equi-join to re-shuffle.
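
For comparison, the flag-based alternative could look roughly like the following toy model. It is hypothetical: the case classes only borrow the Catalyst names, and the `spreadNullKeys` field is the proposal from this comment, not an existing field.

```scala
object FlagDesignSketch {
  final case class ClusteredDistribution(
      keys: Seq[String],
      allowNullKeySpreading: Boolean = false)

  // Hypothetical: one class carries the marker as a field instead of a parallel type.
  final case class HashPartitioning(keys: Seq[String], spreadNullKeys: Boolean = false) {
    // A spreading layout only satisfies consumers that opted in;
    // a strict layout satisfies both kinds of consumer.
    def satisfies(required: ClusteredDistribution): Boolean =
      keys == required.keys && (!spreadNullKeys || required.allowNullKeySpreading)
  }
}
```

The whole contract collapses into one boolean clause, which is the appeal of this design; the cost is that any code matching on `HashPartitioning` must remember to check the field before assuming strict co-location.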

Member Author

Yea this is an alternative design. The pros and cons are:

Pros:

  • Much less duplicated code.
  • Existing HashPartitioning plumbing can often be reused directly.
  • CoalescedHashPartitioning and HashShuffleSpec can carry the flag instead of requiring parallel classes.
  • Fewer pattern-match branches across the codebase.

Cons:

  • The semantic distinction becomes easier to overlook.
  • A HashPartitioning with spreadNullKeys = true is no longer “ordinary hash partitioning” in the old sense.
  • Every place that reasons about HashPartitioning now has to remember to inspect the flag before assuming strict same-key co-location.
  • That is subtle and potentially error-prone because HashPartitioning is already widely used.
  • The class name no longer advertises the weaker contract; you would need careful toString, docs, and audits to preserve the same clarity.

I'm a bit concerned about the cons since HashPartitioning is widely used in the codebase and the change could have a bigger blast radius than just adding another NullAwareHashPartitioning.

copy(from = from.copy(expressions = newChildren))
}

case class CoalescedNullAwareHashPartitioning(
Contributor

Missing Scaladoc here (and on NullAwareHashShuffleSpec below). CoalescedHashPartitioning documents what it represents — worth matching that here.

Member Author

Added

Comment on lines +32 to +39
  private lazy val canSpreadNullJoinKeys: Boolean = {
    // Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still
    // produce NULL shuffle keys, but shuffled join execution already treats those rows as
    // unmatched, so spreading them does not change the result.
    val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
    conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
      isOuterJoin
  }
Contributor

Two improvements on this gate:

(1) Static nullability check. Outer joins on non-nullable keys (PK/FK / NOT-NULL columns / post-IsNotNull filtered keys) gain nothing from the null-aware path but still pay both the runtime per-row anyNull check and the downstream re-shuffle cost from outputPartitioning no longer satisfying strict ClusteredDistribution. The analyzer already tracks Expression.nullable — use it here to make the mechanism a no-op when there's no NULL to spread.

(2) Reframe the comment around the structural reason. The current comment only addresses the <=> corner. The real "why this PR exists" story is the preserved-side / pushdown-asymmetry argument — worth leading with that, with the <=> and NullType notes as a corollary.

Suggested change
private lazy val canSpreadNullJoinKeys: Boolean = {
  // NULL keys on the preserved side of an outer join must be emitted but can never
  // satisfy `a.k = b.k` under three-valued logic, so their reducer placement is a
  // pure layout choice. Inner joins don't have this problem because
  // InferFiltersFromConstraints pushes IsNotNull(key) to both sides; for outer joins
  // that pushdown is blocked on the preserved side(s) -- which is exactly where
  // NULL-key skew can land.
  //
  // For null-safe equality (`<=>`), ExtractEquiJoinKeys rewrites to (coalesce, isNull)
  // shuffle keys, which are non-null for any concrete type. The NullType corner can
  // still produce NULL shuffle keys, but shuffled join execution already treats those
  // rows as unmatched, so spreading them does not change the result.
  val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
  conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
    isOuterJoin &&
    (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable))
}
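
To make the effect of the extra nullability clause concrete, here is a minimal standalone sketch of the same gate. It does not use Spark: `JoinKind`, `KeyExpr`, and `NullSpreadGate` are hypothetical stand-ins for `JoinType`, `Expression`, and `ShuffledJoin`, and the `flagEnabled` parameter stands in for the `SQLConf` lookup.

```scala
// Hypothetical stand-ins for Catalyst types; these are not Spark's actual classes.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

// Models only the one property the gate needs: static nullability of a key.
final case class KeyExpr(name: String, nullable: Boolean)

object NullSpreadGate {
  // True only when the flag is on, the join is an outer join, and at least one
  // join key on either side is statically nullable.
  def canSpreadNullJoinKeys(
      flagEnabled: Boolean,
      joinKind: JoinKind,
      leftKeys: Seq[KeyExpr],
      rightKeys: Seq[KeyExpr]): Boolean = {
    val isOuterJoin = joinKind match {
      case LeftOuter | RightOuter | FullOuter => true
      case Inner => false
    }
    flagEnabled && isOuterJoin &&
      (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable))
  }
}
```

With this shape, an outer join whose keys are all non-nullable falls back to ordinary hash partitioning even when the flag is enabled, which is the no-op behavior point (1) asks for.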

Member Author

Updated

Comment on lines +408 to +425
case h: NullAwareHashPartitioning =>
  val partitionIdProjection =
    UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
  val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes)
  var nullKeyPartition =
    new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)
  row => {
    val joinKeys = joinKeyProjection(row)
    if (joinKeys.anyNull()) {
      // NULL join keys cannot match under ordinary equi-join semantics. Spread them
      // round-robin within each map task so identical rows do not collapse to one reducer.
      val partition = nullKeyPartition
      nullKeyPartition = (nullKeyPartition + 1) % h.numPartitions
      partition
    } else {
      partitionIdProjection(row).getInt(0)
    }
  }

Contributor

Join keys are evaluated twice for non-NULL rows on this path: once via joinKeyProjection(row) to call anyNull(), again via partitionIdProjection(row).getInt(0) which re-evaluates the same expressions to compute the hash. For most expression shapes that's a tight loop, but redundant.

Could evaluate the keys once, check anyNull on the materialized row, then hash directly from that row.

Combined with the static-nullability gate at ShuffledJoin.canSpreadNullJoinKeys (which skips this path entirely when keys are statically non-nullable), the residual overhead becomes "check the null bitset once per row when at least one key is nullable" — about as low as this gets without adaptive observation of actual NULL frequency.
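
A rough standalone sketch of the "evaluate once, then branch" shape, assuming the keys have already been materialized for the row. It does not use Spark's `UnsafeProjection` or its Murmur3 hash: `NullAwarePartitioner` is a hypothetical name, keys are modeled as `Option` values with `None` standing for NULL, and `hashCode` is only a placeholder for the real partition hash.

```scala
// Hypothetical model, not Spark code: each row's join keys arrive already
// evaluated, as Option values where None represents a NULL key.
final class NullAwarePartitioner(numPartitions: Int, startPartition: Int) {
  require(numPartitions > 0, "numPartitions must be positive")

  // Per-task round-robin cursor used only for rows with a NULL key.
  private var nullKeyPartition: Int = Math.floorMod(startPartition, numPartitions)

  def partitionFor(keys: Seq[Option[Any]]): Int = {
    if (keys.exists(_.isEmpty)) {
      // At least one NULL key: hand out partitions round-robin.
      val partition = nullKeyPartition
      nullKeyPartition = (nullKeyPartition + 1) % numPartitions
      partition
    } else {
      // No NULL keys: hash the already-materialized values instead of
      // re-evaluating the key expressions. hashCode is a placeholder hash.
      Math.floorMod(keys.map(_.get).hashCode(), numPartitions)
    }
  }
}
```

The point of the sketch is only the control flow: the null check and the hash both read the same materialized keys, so the key expressions run once per row.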

Member Author

I updated it to evaluate the join keys only once, but the logic is now more complicated. Please take another look!

val join = Join(nullableLeft.logicalPlan, nullableRight.logicalPlan,
  LeftOuter, Some(joinCondition), JoinHint.NONE)

ExtractEquiJoinKeys.unapply(join).foreach {

Contributor

ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } } silently passes if unapply returns None — all assertions live inside the foreach body, so a regression in ExtractEquiJoinKeys would make these tests report success without exercising anything. Prefer:

val (_, leftKeys, rightKeys, boundCondition, _, _, _, _) =
  ExtractEquiJoinKeys.unapply(join).getOrElse(fail("Failed to extract equi-join keys"))
withSQLConf(...) {
  ...
}

Applies to all six new tests in this file using this pattern.

Separately: all six new tests use SortMergeJoinExec. ShuffledHashJoinExec also extends ShuffledJoin and supports LeftOuter / RightOuter / FullOuter (with the matching build side), so it picks up the same canSpreadNullJoinKeys behavior — worth at least one end-to-end test on that path too. And the NullType <=> corner case (which the safety argument for <=> rests on) isn't directly exercised by any of the new tests.
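
The silent-pass hazard with `Option.foreach` can be reproduced without Spark. In this sketch `ExtractionGuard` and `extractKeys` are hypothetical stand-ins for the test suite and `ExtractEquiJoinKeys.unapply`, and a plain exception stands in for ScalaTest's `fail(...)`.

```scala
object ExtractionGuard {
  // Hypothetical extractor: returns None when the condition is not a simple
  // equality, which is how a regression in the real extractor would look.
  def extractKeys(condition: String): Option[(String, String)] =
    condition.split("=").map(_.trim) match {
      case Array(l, r) if l.nonEmpty && r.nonEmpty => Some((l, r))
      case _ => None
    }

  // foreach style: on None the body, including its assertions, never runs,
  // so a test written this way would still report success.
  def assertionsRanWithForeach(condition: String): Boolean = {
    var ran = false
    extractKeys(condition).foreach { case (l, r) =>
      assert(l.nonEmpty && r.nonEmpty)
      ran = true
    }
    ran
  }

  // getOrElse style: on None the failure is raised instead of skipped.
  def keysOrFail(condition: String): (String, String) =
    extractKeys(condition).getOrElse(
      throw new IllegalStateException(
        s"Failed to extract equi-join keys from: $condition"))
}
```

Calling `assertionsRanWithForeach` with a condition the extractor rejects returns `false` without any error, while `keysOrFail` throws for the same input.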

Member Author

Updated
