Skip to content

[Analytics Engine] Make PPL chart work on the analytics-engine route (partition-boundary coercion + non-prefix agg-split type fix)#21878

Open
ahkcs wants to merge 3 commits into
opensearch-project:mainfrom
ahkcs:pr/analytics-partition-coerce
Open

[Analytics Engine] Make PPL chart work on the analytics-engine route (partition-boundary coercion + non-prefix agg-split type fix)#21878
ahkcs wants to merge 3 commits into
opensearch-project:mainfrom
ahkcs:pr/analytics-partition-coerce

Conversation

@ahkcs
Copy link
Copy Markdown
Contributor

@ahkcs ahkcs commented May 28, 2026

Description

Two type-handling fixes on the analytics-engine route, both surfaced by PPL chart (force-routed through the analytics-engine path). No SQL-plugin change — chart's lowering is already merged and unchanged.

1. Coerce streaming-partition schema & batches at the reduce-stage boundary (DataFusion backend)

A producer fragment emits batches in DataFusion's raw physical Arrow types — UInt64 (e.g. row_number()), BinaryView (view-typed columns), Float16 (half_float) — but the consumer fragment's Substrait plan declares the coerced forms (Int64 / Binary / Float32) that coerce_inferred_schema already applies to scan-inferred schemas. LocalSession::register_partition registered the reduce-stage StreamingTable under the raw producer schema, so when a column of one of those types crosses the reduce exchange the consumer plan can fail to bind:

Substrait error: Field 'X' in Substrait schema has a different type (Int64) than the corresponding field in the table schema (UInt64).

Register the StreamingTable under the coerced schema and coerce each incoming batch (new coerce_record_batch + CoercingReceiverPartition). The channel/sender keep the un-coerced schema so the Java-side tripwire still validates producer batches; coerce_inferred_schema returns the same Arc when nothing needs rewriting, so plans without these types keep the cheaper passthrough partition (no behavior change).

2. Skip partial/final split on non-prefix groupSet type mismatch — exact SqlTypeName (analytics-engine planner)

OpenSearchAggregateSplitRule.shouldSkipPartialFinalSplit() skips the split for non-prefix group sets, where a group-key ordinal k >= groupCount lands on a PARTIAL agg-output slot — the structural FINAL reuses original ordinals over the keys-first PARTIAL output, so the group key takes the agg-output type, which must equal the original input-field type or Volcano's typeMatchesInferred / row-type-equivalence check throws. The collision check compared SqlTypeFamily, which is too coarse: BIGINT and INTEGER share the NUMERIC family, so a span group key (INTEGER) over a measure declared BIGINT could slip through and planning crash with:

AssertionError: type mismatch: aggCall type: BIGINT inferred type: INTEGER
IllegalArgumentException: Type mismatch: rel rowtype ... $f2: BIGINT -> INTEGER

Compare the exact SqlTypeName, so such cases fall back to a SINGLE coordinator aggregate (always correct). Prefix group sets (the common case) are unaffected — they early-return before this check.

Both are surfaced by chart ... by <field> span=... over an INTEGER measure and by chart's ranked ROW_NUMBER() OVER (...) subplan crossing a join exchange; the equivalent stats ... by <field> span=... and any unsigned_long / binary / float16 column crossing a reduce stage hit the same paths.

Tests

New Rust unit tests: coerce_record_batch_rewrites_uint64_column, coerce_record_batch_passthrough_when_already_matching.

Check List

  • New functionality includes testing.
  • Commits are signed per the DCO using --signoff.

@ahkcs ahkcs requested a review from a team as a code owner May 28, 2026 23:31
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Reviewer Guide 🔍

(Review updated until commit 946d240)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Fix non-prefix agg-split type check (SqlTypeName)

Relevant files:

  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchAggregateSplitRule.java

Sub-PR theme: Add partition-boundary schema coercion (DataFusion)

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/rust/src/local_executor.rs
  • sandbox/plugins/analytics-backend-datafusion/rust/src/partition_stream.rs
  • sandbox/plugins/analytics-backend-datafusion/rust/src/schema_coerce.rs

⚡ No major issues detected

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 2ef4acb: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.44%. Comparing base (e0404f4) to head (f4559d6).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21878      +/-   ##
============================================
+ Coverage     73.36%   73.44%   +0.07%     
- Complexity    75430    75507      +77     
============================================
  Files          6034     6034              
  Lines        342604   342604              
  Branches      49279    49279              
============================================
+ Hits         251357   251625     +268     
+ Misses        71220    70997     -223     
+ Partials      20027    19982      -45     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@mch2
Copy link
Copy Markdown
Member

mch2 commented May 29, 2026

This coercion should not be required once #21775 lands cc @sandeshkr419

ahkcs added 2 commits May 29, 2026 11:41
…ce-stage boundary

A producer fragment emits batches in DataFusion's raw physical types — UInt64
for `row_number()`, BinaryView for view-typed columns, Float16 for half_float —
but the consumer fragment's Substrait plan declares the coerced, Substrait-
compatible forms (Int64 / Binary / Float32) that `coerce_inferred_schema`
already applies to scan-inferred schemas. `LocalSession::register_partition`
registered the StreamingTable under the raw producer schema, so when a column
of one of those types crosses the reduce exchange the consumer plan fails to
bind:

    Substrait error: Field '<col>' in Substrait schema has a different type
    (Int64) than the corresponding field in the table schema (UInt64).

Register the StreamingTable under the coerced schema and coerce each incoming
batch (new `coerce_record_batch` helper + `CoercingReceiverPartition`) so the
table the consumer binds against matches what flows out of it. The channel and
the returned sender keep the un-coerced schema, so the Java-side schema
tripwire still validates producer batches against what the producer actually
emits. `coerce_inferred_schema` returns the same `Arc` when nothing needs
rewriting, so the common case keeps the cheaper passthrough partition.

Surfaced by PPL `chart` on the analytics-engine route, whose ranked
`ROW_NUMBER() OVER (...)` subplan crosses a join exchange; also covers any
`unsigned_long` / `binary` / `ip` / `half_float` column that survives across a
reduce stage.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
…pe mismatch (exact SqlTypeName)

OpenSearchAggregateSplitRule.shouldSkipPartialFinalSplit() skips the
PARTIAL/FINAL split for non-prefix group sets whose group-key ordinal would
land on a PARTIAL agg-output slot — the structural FINAL reuses the original
ordinals over the keys-first PARTIAL output, so a non-prefix group key would
take the agg-output column's type, which must equal the original input field
type or Volcano's Aggregate.typeMatchesInferred / row-type equivalence check
throws.

The collision check compared SqlTypeFamily, which is too coarse: BIGINT and
INTEGER both belong to the NUMERIC family, so a span group key (INTEGER) over
a measure declared BIGINT slipped through and the split was attempted,
crashing with either:

    AssertionError: type mismatch: aggCall type: BIGINT inferred type: INTEGER
or
    IllegalArgumentException: Type mismatch: rel rowtype ... $f2: BIGINT -> INTEGER

Compare the exact SqlTypeName instead, so these cases fall back to a SINGLE
coordinator aggregate (always correct) rather than an unsound split.

Surfaced by PPL `chart ... by <field> span=...` over an INTEGER measure on the
analytics-engine route (e.g. `chart max(balance) by age span=10`); also covers
the equivalent `stats ... by <field> span=...` shape.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@ahkcs ahkcs force-pushed the pr/analytics-partition-coerce branch from 2ef4acb to f4559d6 Compare May 29, 2026 18:42
@ahkcs ahkcs changed the title [Analytics] Coerce streaming-partition schema/batches at the reduce-stage boundary [Analytics Engine] Make PPL chart work on the analytics-engine route (partition-boundary coercion + non-prefix agg-split type fix) May 29, 2026
@github-actions github-actions Bot added :sanitize Removing elastic specific artifacts :xpack-removal Related to removal of x-pack >FORK Related to the fork process Meta Meta issue, not directly linked to a PR labels May 29, 2026
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f4559d6

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 29, 2026

PR Code Suggestions ✨

Latest suggestions up to 946d240
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate column count before indexing

Accessing target.field(i) without bounds checking could panic if batch has more
columns than target. Add validation to ensure the schemas have matching column
counts before iterating.

sandbox/plugins/analytics-backend-datafusion/rust/src/schema_coerce.rs [143-144]

+if batch.num_columns() != target.fields().len() {
+    return Err(DataFusionError::Execution(format!(
+        "coerce_record_batch: column count mismatch: batch has {} columns, target has {}",
+        batch.num_columns(),
+        target.fields().len()
+    )));
+}
 for (i, column) in batch.columns().iter().enumerate() {
     let want = target.field(i).data_type();
Suggestion importance[1-10]: 7

__

Why: Valid defensive programming to prevent potential panics from out-of-bounds access when batch and target have mismatched column counts, though RecordBatch::try_new at line 159 would catch this later.

Medium

Previous suggestions

Suggestions up to commit f4559d6
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate column count before indexing

Accessing target.field(i) without bounds checking could panic if batch has more
columns than target. Add validation to ensure the schemas have matching column
counts before iterating.

sandbox/plugins/analytics-backend-datafusion/rust/src/schema_coerce.rs [143-144]

+if batch.num_columns() != target.fields().len() {
+    return Err(DataFusionError::Execution(format!(
+        "coerce_record_batch: column count mismatch: batch has {} columns, target has {}",
+        batch.num_columns(),
+        target.fields().len()
+    )));
+}
 for (i, column) in batch.columns().iter().enumerate() {
     let want = target.field(i).data_type();
Suggestion importance[1-10]: 7

__

Why: Valid defensive programming to prevent potential panics from out-of-bounds access when batch and target have mismatched column counts. However, this scenario should not occur in normal operation given the function's contract.

Medium

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for f4559d6: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 946d240

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 946d240: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

>FORK Related to the fork process Meta Meta issue, not directly linked to a PR :sanitize Removing elastic specific artifacts :xpack-removal Related to removal of x-pack

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants