[Analytics Engine] Make PPL chart work on the analytics-engine route (partition-boundary coercion + non-prefix agg-split type fix)#21878
Conversation
PR Reviewer Guide 🔍(Review updated until commit 946d240)Here are some key observations to aid the review process:
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #21878 +/- ##
============================================
+ Coverage 73.36% 73.44% +0.07%
- Complexity 75430 75507 +77
============================================
Files 6034 6034
Lines 342604 342604
Branches 49279 49279
============================================
+ Hits 251357 251625 +268
+ Misses 71220 70997 -223
+ Partials 20027 19982 -45 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
This coercion should not be required once #21775 lands cc @sandeshkr419 |
…ce-stage boundary
A producer fragment emits batches in DataFusion's raw physical types — UInt64
for `row_number()`, BinaryView for view-typed columns, Float16 for half_float —
but the consumer fragment's Substrait plan declares the coerced, Substrait-
compatible forms (Int64 / Binary / Float32) that `coerce_inferred_schema`
already applies to scan-inferred schemas. `LocalSession::register_partition`
registered the StreamingTable under the raw producer schema, so when a column
of one of those types crosses the reduce exchange the consumer plan fails to
bind:
Substrait error: Field '<col>' in Substrait schema has a different type
(Int64) than the corresponding field in the table schema (UInt64).
Register the StreamingTable under the coerced schema and coerce each incoming
batch (new `coerce_record_batch` helper + `CoercingReceiverPartition`) so the
table the consumer binds against matches what flows out of it. The channel and
the returned sender keep the un-coerced schema, so the Java-side schema
tripwire still validates producer batches against what the producer actually
emits. `coerce_inferred_schema` returns the same `Arc` when nothing needs
rewriting, so the common case keeps the cheaper passthrough partition.
Surfaced by PPL `chart` on the analytics-engine route, whose ranked
`ROW_NUMBER() OVER (...)` subplan crosses a join exchange; also covers any
`unsigned_long` / `binary` / `ip` / `half_float` column that survives across a
reduce stage.
Signed-off-by: Kai Huang <ahkcs@amazon.com>
…pe mismatch (exact SqlTypeName)
OpenSearchAggregateSplitRule.shouldSkipPartialFinalSplit() skips the
PARTIAL/FINAL split for non-prefix group sets whose group-key ordinal would
land on a PARTIAL agg-output slot — the structural FINAL reuses the original
ordinals over the keys-first PARTIAL output, so a non-prefix group key would
take the agg-output column's type, which must equal the original input field
type or Volcano's Aggregate.typeMatchesInferred / row-type equivalence check
throws.
The collision check compared SqlTypeFamily, which is too coarse: BIGINT and
INTEGER both belong to the NUMERIC family, so a span group key (INTEGER) over
a measure declared BIGINT slipped through and the split was attempted,
crashing with either:
AssertionError: type mismatch: aggCall type: BIGINT inferred type: INTEGER
or
IllegalArgumentException: Type mismatch: rel rowtype ... $f2: BIGINT -> INTEGER
Compare the exact SqlTypeName instead, so these cases fall back to a SINGLE
coordinator aggregate (always correct) rather than an unsound split.
Surfaced by PPL `chart ... by <field> span=...` over an INTEGER measure on the
analytics-engine route (e.g. `chart max(balance) by age span=10`); also covers
the equivalent `stats ... by <field> span=...` shape.
Signed-off-by: Kai Huang <ahkcs@amazon.com>
2ef4acb to
f4559d6
Compare
|
Persistent review updated to latest commit f4559d6 |
PR Code Suggestions ✨Latest suggestions up to 946d240
Previous suggestionsSuggestions up to commit f4559d6
|
|
Persistent review updated to latest commit 946d240 |
|
❌ Gradle check result for 946d240: null Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Description
Two type-handling fixes on the analytics-engine route, both surfaced by PPL
chart(force-routed through the analytics-engine path). No SQL-plugin change — chart's lowering is already merged and unchanged.1. Coerce streaming-partition schema & batches at the reduce-stage boundary (DataFusion backend)
A producer fragment emits batches in DataFusion's raw physical Arrow types —
UInt64(e.g.row_number()),BinaryView(view-typed columns),Float16(half_float) — but the consumer fragment's Substrait plan declares the coerced forms (Int64/Binary/Float32) thatcoerce_inferred_schemaalready applies to scan-inferred schemas.LocalSession::register_partitionregistered the reduce-stageStreamingTableunder the raw producer schema, so when a column of one of those types crosses the reduce exchange the consumer plan can fail to bind:Register the
StreamingTableunder the coerced schema and coerce each incoming batch (newcoerce_record_batch+CoercingReceiverPartition). The channel/sender keep the un-coerced schema so the Java-side tripwire still validates producer batches;coerce_inferred_schemareturns the sameArcwhen nothing needs rewriting, so plans without these types keep the cheaper passthrough partition (no behavior change).2. Skip partial/final split on non-prefix groupSet type mismatch — exact
SqlTypeName(analytics-engine planner)OpenSearchAggregateSplitRule.shouldSkipPartialFinalSplit()skips the split for non-prefix group sets, where a group-key ordinalk >= groupCountlands on a PARTIAL agg-output slot — the structural FINAL reuses original ordinals over the keys-first PARTIAL output, so the group key takes the agg-output type, which must equal the original input-field type or Volcano'stypeMatchesInferred/ row-type-equivalence check throws. The collision check comparedSqlTypeFamily, which is too coarse:BIGINTandINTEGERshare theNUMERICfamily, so a span group key (INTEGER) over a measure declaredBIGINTcould slip through and planning crash with:Compare the exact
SqlTypeName, so such cases fall back to a SINGLE coordinator aggregate (always correct). Prefix group sets (the common case) are unaffected — they early-return before this check.Both are surfaced by
chart ... by <field> span=...over anINTEGERmeasure and by chart's rankedROW_NUMBER() OVER (...)subplan crossing a join exchange; the equivalentstats ... by <field> span=...and anyunsigned_long/binary/float16column crossing a reduce stage hit the same paths.Tests
New Rust unit tests:
coerce_record_batch_rewrites_uint64_column,coerce_record_batch_passthrough_when_already_matching.Check List
--signoff.