[analytics-engine] Wire dc/earliest/latest/nth_value to DataFusion built-ins #21823
songkant-aws wants to merge 17 commits into
…s via adapter rewrite
PPL lowers eventstats earliest/latest as RexOver(SqlStdOperatorTable.ARG_MIN/ARG_MAX,
[value, ts]) and eventstats dc/distinct_count/distinct_count_approx as
RexOver(SqlUserDefinedAggFunction("DISTINCT_COUNT_APPROX")). DataFusion has neither
name as a built-in. BackendPlanAdapter now rewrites these RexOvers before substrait
emission:
ARG_MIN(value, ts) → FIRST_VALUE(value) ORDER BY ts ASC
ARG_MAX(value, ts) → LAST_VALUE(value) ORDER BY ts ASC
DISTINCT_COUNT_APPROX(x) → APPROX_COUNT_DISTINCT(x)
first_value / last_value are DataFusion built-ins and Calcite/isthmus standard
window functions, so the substrait round-trip works out of the box. PPL's
default UNBOUNDED PRECEDING/FOLLOWING window frame combined with the SINGLETON
cost gate guarantees first_value / last_value see the whole partition, so the
ORDER-BY-driven pick is exact.
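To make the equivalence claimed above concrete, here is a minimal Java sketch. It is a toy model, not Calcite or DataFusion code: `Row`, `argMin`, `firstValueOrderByTsAsc` and the other names are hypothetical, timestamps are assumed distinct, and null handling is ignored. It only illustrates that, when the frame covers the whole partition, picking the first (or last) row after sorting by `ts` yields the same value as ARG_MIN (or ARG_MAX).

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of the ARG_MIN/ARG_MAX to FIRST_VALUE/LAST_VALUE rewrite (illustration only). */
final class ArgMinRewriteSketch {
    /** One row of a window partition: a value column and a timestamp column. */
    static final class Row {
        final String value;
        final long ts;
        Row(String value, long ts) { this.value = value; this.ts = ts; }
    }

    static final Comparator<Row> BY_TS = Comparator.comparingLong(r -> r.ts);

    /** ARG_MIN(value, ts): the value on the row with the smallest ts. */
    static String argMin(List<Row> partition) {
        return partition.stream().min(BY_TS).orElseThrow().value;
    }

    /** ARG_MAX(value, ts): the value on the row with the largest ts. */
    static String argMax(List<Row> partition) {
        return partition.stream().max(BY_TS).orElseThrow().value;
    }

    /** FIRST_VALUE(value) ORDER BY ts ASC over an unbounded frame: first row after sorting. */
    static String firstValueOrderByTsAsc(List<Row> partition) {
        List<Row> sorted = partition.stream().sorted(BY_TS).collect(Collectors.toList());
        return sorted.get(0).value;
    }

    /** LAST_VALUE(value) ORDER BY ts ASC over an unbounded frame: last row after sorting. */
    static String lastValueOrderByTsAsc(List<Row> partition) {
        List<Row> sorted = partition.stream().sorted(BY_TS).collect(Collectors.toList());
        return sorted.get(sorted.size() - 1).value;
    }

    public static void main(String[] args) {
        List<Row> partition = List.of(new Row("b", 20), new Row("a", 10), new Row("c", 30));
        // Both forms pick the value at ts=10 for earliest and ts=30 for latest.
        System.out.println(argMin(partition) + " " + firstValueOrderByTsAsc(partition));
        System.out.println(argMax(partition) + " " + lastValueOrderByTsAsc(partition));
    }
}
```

With a frame narrower than the partition, the sorted pick would only see part of the rows, which is why the whole-partition frame matters for exactness.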
For the distinct count: isthmus emits Calcite's APPROX_COUNT_DISTINCT as the
substrait function name "approx_count_distinct", but DataFusion 53.1's built-in
is registered under "approx_distinct" with no alias. Rather than fork isthmus's
function mapping, register a thin wrapper UDAF in
analytics-backend-datafusion's udaf module whose name() is "approx_count_distinct"
and which delegates accumulator/state_fields/etc. to datafusion's
approx_distinct_udaf(). DataFusion's substrait consumer falls back to the UDAF
registry by name (window_function.rs:54-64), so it resolves the wrapper.
We don't rewrite to COUNT(DISTINCT x): DataFusion's substrait window_function
producer hardcodes invocation=0 (ALL) and the consumer at expr/window_function.rs:116
forces distinct=false, so the AggregationInvocation.DISTINCT flag is dropped
on round-trip. PPL's V3 path uses OpenSearch cardinality (HyperLogLog++) so
dc()/distinct_count() have always been approximate even when users wrote the
exact-sounding name; routing to approx_distinct matches that existing behavior
without semantic regression.
WindowFunction enum gains ARG_MIN / ARG_MAX / DISTINCT_COUNT_APPROX entries so
OpenSearchProjectRule's narrowByWindowCapability gate accepts the PPL form
before the adapter rewrite runs.
Signed-off-by: Songkan Tang <songkant@amazon.com>
…r unit tests

dc()/distinct_count()/distinct_count_approx() fail at PPL parser stage with "Cannot resolve function: DISTINCT_COUNT_APPROX" on the unified-query / analytics-engine path. Root cause is in sql-plugin: the UDAF is registered to PPLFuncImpTable.aggExternalFunctionRegistry only inside OpenSearchExecutionEngine's constructor (V2/V3 path side effect), which the unified-query path never triggers. Until upstream sql-plugin fixes the registration, the 8 dc IT methods revert to assertErrorContains.

The backend rewrite (ARG_MIN/ARG_MAX/DISTINCT_COUNT_APPROX -> FIRST_VALUE/LAST_VALUE/APPROX_COUNT_DISTINCT) in WindowFunctionAdapters stays; it's covered by the new WindowFunctionAdaptersTests unit tests pinning the rewrite contracts.

Signed-off-by: Songkan Tang <songkant@amazon.com>
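The registration-order problem described in this commit message can be sketched in isolation. This is a hypothetical model, not sql-plugin code: `Registry`, `LegacyEngine`, and `selfRegistering` are invented names standing in for a shared function registry, an engine whose constructor registers a function as a side effect, and a registry that registers the function itself when it is created.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of "registered only as a constructor side effect" (illustration only). */
final class RegistrationOrderSketch {
    static final String DC = "DISTINCT_COUNT_APPROX";

    /** Hypothetical stand-in for a process-wide aggregate function registry. */
    static final class Registry {
        private final Map<String, String> aggFunctions = new ConcurrentHashMap<>();

        void register(String name, String impl) { aggFunctions.put(name, impl); }

        String resolve(String name) {
            String impl = aggFunctions.get(name);
            if (impl == null) {
                throw new IllegalStateException("Cannot resolve function: " + name);
            }
            return impl;
        }
    }

    /** Hypothetical engine: registration happens only if this constructor ever runs. */
    static final class LegacyEngine {
        LegacyEngine(Registry registry) { registry.register(DC, "engine-provided"); }
    }

    /** Alternative: the registry populates the function itself when it is created. */
    static Registry selfRegistering() {
        Registry registry = new Registry();
        registry.register(DC, "marker");
        return registry;
    }

    /** Returns the resolved implementation name, or the error message on failure. */
    static String tryResolve(Registry registry) {
        try {
            return registry.resolve(DC);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Registry lazy = new Registry();
        System.out.println(tryResolve(lazy));              // fails: the engine was never built
        new LegacyEngine(lazy);
        System.out.println(tryResolve(lazy));              // resolves after the side effect ran
        System.out.println(tryResolve(selfRegistering())); // resolves with no engine at all
    }
}
```

A code path that never constructs the engine sees the first outcome, which matches the failure mode described above.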
Depends on sql-plugin PR opensearch-project#5466 which registers the DISTINCT_COUNT_APPROX logical marker in PPLFuncImpTable. With that PR (built locally and published to mavenLocal), all 8 dc IT methods now pass end-to-end: PPL parser succeeds via the marker, BackendPlanAdapter rewrites RexOver(DISTINCT_COUNT_APPROX) -> APPROX_COUNT_DISTINCT, DataFusion substrait reader executes natively.

Note this commit lives on the fix-fragment-rewire-helper-column branch which depends on the upstream sql-plugin marker registration. Until that PR merges and the public snapshot picks it up, this branch can only be verified locally with mavenLocal.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Reduce-stage helper chains (e.g. Sort(Project(Project-with-window(ER(StageInputScan)))) that PPL emits for streamstats by) used to be split by FragmentConversionDriver into a child convertFragment + a series of attachFragmentOnTop calls walking back up the spine. Each attachFragmentOnTop ran a standalone isthmus conversion of the wrapper, which derived fieldRef ordinals against the placeholder schema; after replaceInput swapped the placeholder for the inner plan's actual root the wrapper-level fieldRefs no longer matched the post-rewrite inner schema (partial-agg lowering, NULL-typed CASE branches, etc.), producing DataFusion runtime panics like "index out of bounds: the len is 6 but the index is 14".

Unify the dispatch: any OpenSearchRelNode in the reduce stage now goes through a single convertor.convertFragment(strip(node)) call. strip() removes ExchangeReducers on the way down; the convertor's rewriteStageInputScans turns StageInputScan leaves into TableScan placeholders; isthmus visits the whole subtree once, deriving every node's row type from its child up the spine. fieldRefs are bound in a single row-type system, so no schema mismatch is possible regardless of helper chain depth.

This subsumes the multi-input case (Join/Union/Intersect/Minus, which already used this path) and the final-aggregate-boundary special case. The single-input attach branch and the boundary special case are removed; the recursion is gone too.

The attachPartialAggOnTop path on shard stages is unchanged: its wrapper is a single Aggregate(PARTIAL) node with no helper chain, so the original single-layer swap remains correct.

Add a regression assertion in testTwoStageSortOnAggregateOnFilteredScan that the reduce stage subtree is no longer routed through attachFragmentOnTop. Existing 76 streamstats + eventstats IT all pass; previously failing _3shard cases and streamstats-by helper-chain cases are now green.

Signed-off-by: Songkan Tang <songkant@amazon.com>
…test

Inline convertReduceNode into convertReduceFragment: the function reduces to a single convertor.convertFragment(strip(node, delegationBytes)) call. The ExchangeReducer if-branch is redundant because strip() already drops ER on the way down to its StageInputScan child, and the OpenSearchRelNode if-branch is the only non-ER path the caller dispatches into us via line 315 (`leaf instanceof OpenSearchStageInputScan`). The throw-on-unknown-RelNode contract is preserved implicitly by the same caller's leaf-type switch.

Add testReduceStageWindowHelperChainConvertedAsSingleSubtree covering the streamstats-shaped reduce subtree Sort(Project(Project-with-window(ER(StageInputScan)))). The existing testTwoStageSortOnAggregateOnFilteredScan test asserts the same "no attachFragmentOnTop" contract on a Sort+Aggregate chain; the new test adds independent coverage of the window helper-Project shape.

Signed-off-by: Songkan Tang <songkant@amazon.com>
❌ Gradle check result for 5f49795: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Persistent review updated to latest commit a14b109
PPL trendline wma() lowers to NTH_VALUE; previously the analytics planner rejected it with "Window function [NTH_VALUE] is not supported" because the WindowFunction enum had no entry. DataFusion 53.x has nth_value as a substrait-recognized built-in, so registration is purely a Java-side capability declaration: no adapter, no Rust UDAF.

Verified with CalcitePPLTrendlineIT against runTask cluster: testTrendlineWma and testTrendlineMultipleFields go from failing to passing.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit 691717e
❌ Gradle check result for 691717e: FAILURE
Extends testAggregateOverKinds to assert NTH_VALUE maps correctly, matching the wiring added in 691717e.

Signed-off-by: Songkan Tang <songkant@amazon.com>
fd2d105 to 3d43c88
Persistent review updated to latest commit fd2d105
Persistent review updated to latest commit 3d43c88
❌ Gradle check result for 3d43c88: null
Persistent review updated to latest commit fdb5ba1
❌ Gradle check result for fdb5ba1: FAILURE
 * fails before reaching the window-function gate with
 * {@code "Cannot resolve function: DISTINCT_COUNT_APPROX"}. */
/** sql IT: testEventstatsDistinctCount. */
public void testEventstatsDistinctCount() throws IOException {
Can we test these with the multi-shard case? calcs/mapping.json: I have tried setting this up for the clickbench tests in #21848 to add randomization between 1 and 2 shards; we can do something similar here.
We'll likely have variance with dc() being approximate, but this would at least ensure the queries succeed.
Added multi-shard variants in 31702ea:
- `testEventstatsDistinctCount_3shard` and `testEventstatsDistinctCountByCountry_3shard` reuse the existing `DATASET_MULTI` (3 shards). eventstats dc is a broadcast pattern, so the per-row expectations are identical to single-shard, and HLL is exact at calcs's 17-row scale.
- `testStreamstatsDistinctCountByCountry_3shard` collapses to per-partition finals via `| stats max(dc_str3) by str0 | sort str0`, since streamstats running values aren't deterministic when shard parallelism reorders input rows. This is the same shape as `testStreamstatsBy_3shard`.

Didn't go with DatasetProvisioner.provision-level random shard count for this PR since #21848's randomization helper isn't merged yet; sticking with the existing per-test 3-shard pattern this file already uses.
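As an illustration of why the streamstats test collapses to per-partition finals, here is a small Java sketch. It uses an exact `HashSet` count in place of HLL and invented names (`runningDc`, `partitionFinal`), and it ignores nulls, so it is a model of the reasoning rather than of the engine: the per-row running values depend on arrival order, while the maximum over the partition does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a running distinct count under row reordering (illustration only). */
final class RunningDistinctCountSketch {
    /** Running distinct count emitted per row, in arrival order (streamstats-style). */
    static List<Integer> runningDc(List<String> values) {
        Set<String> seen = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (String v : values) {
            seen.add(v);
            out.add(seen.size());
        }
        return out;
    }

    /** Per-partition final, analogous to `stats max(dc)` over the running values. */
    static int partitionFinal(List<String> values) {
        return Collections.max(runningDc(values));
    }

    public static void main(String[] args) {
        List<String> oneOrder = List.of("a", "a", "b");
        List<String> otherOrder = List.of("a", "b", "a"); // same rows, different arrival order
        System.out.println(runningDc(oneOrder));    // [1, 1, 2]
        System.out.println(runningDc(otherOrder));  // [1, 2, 2]
        System.out.println(partitionFinal(oneOrder) + " " + partitionFinal(otherOrder)); // 2 2
    }
}
```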
Sanity-checked datafusion 53.1's HLL with a standalone Rust harness running the real algorithm over 3 shards × 100 iterations — dc(str3) always returns 1 and dc(str0) always returns 3, with zero variance. The merge step is just register-wise max with a fixed seed, so multi-shard and single-shard paths produce bit-identical state, and for these two specific values the count output sits well clear of any rounding boundary.
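The "register-wise max" argument can be shown with a toy HyperLogLog-style register array in Java. This is not DataFusion's implementation: the register count, the hash mixer, and all names (`HllMergeSketch`, `sketch`, `merge`) are assumptions chosen for brevity. The point is only that merging per-shard register arrays with an element-wise max gives exactly the registers a single pass over all rows would produce, whatever the shard split.

```java
import java.util.Arrays;
import java.util.List;

/** Toy HyperLogLog-style registers showing that merge = element-wise max (illustration only). */
final class HllMergeSketch {
    static final int P = 4;       // index bits (assumed for the sketch)
    static final int M = 1 << P;  // 16 registers

    /** Fixed 64-bit mixer (SplitMix64 finalizer) over String.hashCode(); deterministic. */
    static long hash(String s) {
        long z = s.hashCode() + 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /** Updates the register selected by the top P bits with the rank of the remaining bits. */
    static void add(byte[] registers, String value) {
        long h = hash(value);
        int idx = (int) (h >>> (64 - P));
        int rank = Math.min(Long.numberOfLeadingZeros(h << P), 64 - P) + 1;
        registers[idx] = (byte) Math.max(registers[idx], rank);
    }

    /** Builds the register array for one shard (or for the whole input). */
    static byte[] sketch(List<String> values) {
        byte[] registers = new byte[M];
        for (String v : values) {
            add(registers, v);
        }
        return registers;
    }

    /** Merge step: register-wise max of two sketches. */
    static byte[] merge(byte[] a, byte[] b) {
        byte[] out = new byte[M];
        for (int i = 0; i < M; i++) {
            out[i] = (byte) Math.max(a[i], b[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] single = sketch(List.of("a", "b", "c", "d", "e", "f"));
        byte[] merged = merge(merge(sketch(List.of("a", "d")), sketch(List.of("b", "e"))),
                              sketch(List.of("c", "f", "a")));
        System.out.println(Arrays.equals(single, merged)); // true: identical register state
    }
}
```

Because max is associative, commutative, and idempotent, neither the shard assignment nor duplicate values across shards change the merged state.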
Address review feedback to exercise the dc()/distinct_count() path on the multi-shard exchange + PARTIAL/FINAL agg split, not just the single-shard inline-agg path.

EventstatsCommandIT:
* testEventstatsDistinctCount_3shard
* testEventstatsDistinctCountByCountry_3shard
Use the existing DATASET_MULTI (3 shards). dc is broadcast: every row in a partition sees the same dc value, so the per-row expectations are identical to the single-shard variants. HLL is exact at calcs's 17-row scale.

StreamstatsCommandIT:
* testStreamstatsDistinctCountByCountry_3shard
streamstats running aggregates depend on input row order; multi-shard parallelism reorders rows at the coordinator, so per-row running values are not deterministic. Collapse to per-partition finals via stats max(dc) by str0, mirroring testStreamstatsBy_3shard. Expected partition finals all equal 1 (each str0 group has only "e" or null in str3).

Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit 31702ea
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21823      +/-   ##
============================================
- Coverage     73.50%   73.46%   -0.05%
- Complexity    75546    75569      +23
============================================
  Files          6034     6034
  Lines        342661   342661
  Branches      49294    49294
============================================
- Hits         251879   251719     -160
- Misses        70790    70958     +168
+ Partials      19992    19984       -8
I see you are doing 2 things:
Part 1: Wire dc/earliest/latest/nth_value: no comments for now, good to go!
Part 2: Unify reduce-stage subtree conversion: This part should simplify after my changes: #21775 (aiming to get this in today). I have introduced state-shipping which unifies all agg state as Binary IPC, so the schema never shifts between SINGLE and FINAL - this should simplify your code a lot.
…-helper-column

Signed-off-by: Songkan Tang <songkant@amazon.com>

# Conflicts:
# sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/WindowFunction.java
# sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/mod.rs
# sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java
# sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java
# sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java
# sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/MockBackend.java
# sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/FragmentConversionDriverTests.java
# sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/EventstatsCommandIT.java
# sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamstatsCommandIT.java
…base

Two compile errors surfaced by upstream/main:
1. AnalyticsSearchBackendPlugin SPI default delegatedPredicateSerializers() was added on main (PR 21760). MockBackend's protected hook collided with the public SPI default; promote it to a public @Override to match main's pattern.
2. OpenSearchStageInputScan's constructor gained a List<FieldStorageInfo> parameter on main. Pass List.of() in the test fixture; nothing in the reduce-stage shape under test depends on output field storage.

Signed-off-by: Songkan Tang <songkant@amazon.com>
…ow-helper test

OpenSearchProject.getOutputFieldStorage now resolves each output column's storage by walking back through input field storage. After main, OpenSearchStageInputScan exposes a List<FieldStorageInfo> alongside its rowType, and the resolver enforces that input.fieldCount == fieldStorage.size(). The test previously passed an empty List for outputFieldStorage; the resolver now fails when the helper Project's input ref points at column 0 of a 1-column input that declares no storage. Pass a single derivedColumn entry matching the synthetic "a" column.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit b851d9e
@sandeshkr419 thanks for the review. Rebased on main — both #21775's Could you take another look? |
Removes the FragmentConversionDriver convertReduceFragment unification that
this PR introduced, restoring main's recursive convertReduceNode form. Also
removes the two regression assertions we added on top:
- testReduceStageWindowHelperChainConvertedAsSingleSubtree (deleted)
- the attachFragmentCalled assertFalse line in testTwoStageSortOnAggregateOnFilteredScan
- RecordingConvertor.attachFragmentCalled field
Verified empirically against the full analytics-engine-rest IT suite with
the recursive form restored: 770 / 770 IT pass (42 pre-existing @AwaitsFix
skips). The DataFusion runtime panic that originally motivated Part 2 ("index
out of bounds: len=6 idx=14" on streamstats by helper chains) is no longer
reachable on top of merged main — likely covered by upstream rel-layer fixes
since the original investigation (opensearch-project#21875's DistributedAggregateRewriter
overrideExchangeType transformer + opensearch-project#21690's string/utf8 view & timestamp
coercion elimination).
Keep this PR focused on Part 1 (wire dc / earliest / latest / nth_value).
The reduce-stage simplification, if revisited, should be its own PR with
fresh justification.
Signed-off-by: Songkan Tang <songkant@amazon.com>
Persistent review updated to latest commit 2c731ef
❌ Gradle check result for 2c731ef: FAILURE
sandeshkr419 left a comment
1 major comment on support of dc; the rest looks fine.
While you are addressing this, can you also please tighten the comments in your changes so they are not overloaded with intermediate design decisions?
…a window FunctionMappings

Rename APPROX_COUNT_DISTINCT to substrait `approx_distinct` on the window emission path the same way we already do for aggregates: add the rename to ADDITIONAL_WINDOW_SIGS and override WindowFunctionConverter.getSigs() to filter isthmus's default `approx_count_distinct` binding so ours wins. DataFusion's substrait consumer then resolves the call directly to its built-in approx_distinct UDAF, so no wrapper is needed.

Removes the 113-line ApproxCountDistinctUdaf wrapper that aliased `approx_count_distinct` to DataFusion's built-in `approx_distinct`.

Also tighten the javadoc / comments around the window adapter changes (WindowFunction, WindowFunctionAdapter SPI, WindowFunctionAdapters, DataFusionAnalyticsBackendPlugin.windowCapabilities, BackendPlanAdapter.adaptOver, OpenSearchProjectRule.collectWindowFunctions): drop intermediate design decisions and stale comments (e.g. the COUNT(DISTINCT) rewrite path that's no longer used, the obsolete "PARTITION BY rejected" claim).

Verified: analytics-framework + analytics-engine + analytics-backend-datafusion unit tests green; analytics-engine-rest IT 770/770 pass (42 pre-existing @AwaitsFix skips, 0 failures, 0 errors).

Signed-off-by: Songkan Tang <songkant@amazon.com>
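The "filter the default binding so ours wins" idea in this commit can be sketched generically. The code below does not use isthmus's real types or signatures; `Sig`, `effectiveSigs`, and `resolve` are hypothetical stand-ins for an operator-to-substrait-name table in which additional bindings replace defaults for the same operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of overriding a default function-name binding (illustration only). */
final class SignatureOverrideSketch {
    /** Hypothetical binding from a planner operator name to a substrait function name. */
    static final class Sig {
        final String operator;
        final String substraitName;
        Sig(String operator, String substraitName) {
            this.operator = operator;
            this.substraitName = substraitName;
        }
    }

    /** Drops every default whose operator is re-bound, then appends the additional bindings. */
    static List<Sig> effectiveSigs(List<Sig> defaults, List<Sig> additional) {
        Set<String> overridden = new HashSet<>();
        for (Sig s : additional) {
            overridden.add(s.operator);
        }
        List<Sig> out = new ArrayList<>();
        for (Sig s : defaults) {
            if (!overridden.contains(s.operator)) {
                out.add(s);
            }
        }
        out.addAll(additional);
        return out;
    }

    /** Returns the substrait name bound to the operator, or null when nothing is bound. */
    static String resolve(List<Sig> sigs, String operator) {
        for (Sig s : sigs) {
            if (s.operator.equals(operator)) {
                return s.substraitName;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Sig> defaults = List.of(
            new Sig("APPROX_COUNT_DISTINCT", "approx_count_distinct"),
            new Sig("FIRST_VALUE", "first_value"));
        List<Sig> additional = List.of(new Sig("APPROX_COUNT_DISTINCT", "approx_distinct"));
        List<Sig> sigs = effectiveSigs(defaults, additional);
        System.out.println(resolve(sigs, "APPROX_COUNT_DISTINCT")); // approx_distinct
        System.out.println(resolve(sigs, "FIRST_VALUE"));           // first_value
    }
}
```

Filtering rather than merely appending avoids depending on lookup order when two bindings exist for the same operator.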
Persistent review updated to latest commit 7b29502
Persistent review updated to latest commit 25c7497
Description
Wires PPL `dc`/`distinct_count`/`distinct_count_approx`, `earliest`/`latest`, and `nth_value` to DataFusion built-ins on the analytics-engine route.

Backend rewrite via a new per-function `WindowFunctionAdapter` SPI (parallel to `ScalarFunctionAdapter`). DataFusion plugin registers three rewrites that `BackendPlanAdapter` applies before substrait emission:
- `ARG_MIN(value, ts)` → `FIRST_VALUE(value) ORDER BY ts ASC` (PPL `earliest()`)
- `ARG_MAX(value, ts)` → `LAST_VALUE(value) ORDER BY ts ASC` (PPL `latest()`)
- `DISTINCT_COUNT_APPROX(x)` → `APPROX_COUNT_DISTINCT(x)` (PPL `dc()`/`distinct_count()`/`distinct_count_approx()`)

A small Rust wrapper UDAF aliases substrait's `approx_count_distinct` name to DataFusion's built-in `approx_distinct` so end-to-end execution works.

Unit-level coverage in `WindowFunctionAdaptersTests` pins the rewrite contracts. End-to-end IT in `EventstatsCommandIT` and `StreamstatsCommandIT` (8 dc methods, plus single-shard and multi-shard variants for dc / dc by) assert exact rows.

Why this PR depends on opensearch-project/sql#5466
`PPLFuncImpTable.INSTANCE` is a JVM-process-level singleton with a mutable `aggExternalFunctionRegistry` (`ConcurrentHashMap`). On current sql-plugin main, `DISTINCT_COUNT_APPROX` is registered into that map only as a side effect of constructing `OpenSearchExecutionEngine`, which is Guice-bound lazily and only ever instantiated when the V3 PPL path is exercised. Because the singleton registry is shared across paths once populated, the registration does become visible to the unified-query path eventually, but only if the running process has serviced at least one V3 request first.

Our IT environment only sends requests through `/_analytics/ppl` (`RestUnifiedQueryAction`), never through V3, so `OpenSearchExecutionEngine` is never constructed and `DISTINCT_COUNT_APPROX` is never registered. PPL parser then throws "Cannot resolve function: DISTINCT_COUNT_APPROX" before our backend rewrite gets a chance to run. opensearch-project/sql#5466 moves the registration into `PPLFuncImpTable.populate()` (class-init self-registration) using a logical marker UDAF, eliminating the V3-path precondition entirely.

Related Issues
`assertRowsEqual` instead of negative `assertErrorContains`).

Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.