Skip to content

[analytics-engine] Wire dc/earliest/latest/nth_value to DataFusion built-ins#21823

Open
songkant-aws wants to merge 17 commits into
opensearch-project:mainfrom
songkant-aws:fix-fragment-rewire-helper-column
Open

[analytics-engine] Wire dc/earliest/latest/nth_value to DataFusion built-ins#21823
songkant-aws wants to merge 17 commits into
opensearch-project:mainfrom
songkant-aws:fix-fragment-rewire-helper-column

Conversation

@songkant-aws
Copy link
Copy Markdown
Contributor

@songkant-aws songkant-aws commented May 25, 2026

Description

Wires PPL dc/distinct_count/distinct_count_approx, earliest/latest, and nth_value to DataFusion built-ins on the analytics-engine route.

Backend rewrite via a new per-function WindowFunctionAdapter SPI (parallel to ScalarFunctionAdapter). DataFusion plugin registers three rewrites that BackendPlanAdapter applies before substrait emission:

  • ARG_MIN(value, ts)FIRST_VALUE(value) ORDER BY ts ASC (PPL earliest())
  • ARG_MAX(value, ts)LAST_VALUE(value) ORDER BY ts ASC (PPL latest())
  • DISTINCT_COUNT_APPROX(x)APPROX_COUNT_DISTINCT(x) (PPL dc() / distinct_count() / distinct_count_approx())

A small Rust wrapper UDAF aliases substrait's approx_count_distinct name to DataFusion's built-in approx_distinct so end-to-end execution works.

Unit-level coverage in WindowFunctionAdaptersTests pins the rewrite contracts. End-to-end IT in EventstatsCommandIT and StreamstatsCommandIT (8 dc methods, plus single-shard and multi-shard variants for dc / dc by) assert exact rows.

Why this PR depends on opensearch-project/sql#5466

PPLFuncImpTable.INSTANCE is a JVM-process-level singleton with a mutable aggExternalFunctionRegistry (ConcurrentHashMap). On current sql-plugin main, DISTINCT_COUNT_APPROX is registered into that map only as a side effect of constructing OpenSearchExecutionEngine — which is Guice-bound lazily and only ever instantiated when the V3 PPL path is exercised. Because the singleton registry is shared across paths once populated, the registration does become visible to the unified-query path eventually — but only if the running process has serviced at least one V3 request first.

Our IT environment only sends requests through /_analytics/ppl (RestUnifiedQueryAction), never through V3, so OpenSearchExecutionEngine is never constructed and DISTINCT_COUNT_APPROX is never registered. PPL parser then throws "Cannot resolve function: DISTINCT_COUNT_APPROX" before our backend rewrite gets a chance to run. opensearch-project/sql#5466 moves the registration into PPLFuncImpTable.populate() (class-init self-registration) using a logical marker UDAF, eliminating the V3-path precondition entirely.

Related Issues

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

…s via adapter rewrite

PPL lowers eventstats earliest/latest as RexOver(SqlStdOperatorTable.ARG_MIN/ARG_MAX,
[value, ts]) and eventstats dc/distinct_count/distinct_count_approx as
RexOver(SqlUserDefinedAggFunction("DISTINCT_COUNT_APPROX")). DataFusion has neither
name as a built-in. BackendPlanAdapter now rewrites these RexOvers before substrait
emission:

  ARG_MIN(value, ts)        →  FIRST_VALUE(value) ORDER BY ts ASC
  ARG_MAX(value, ts)        →  LAST_VALUE(value)  ORDER BY ts ASC
  DISTINCT_COUNT_APPROX(x)  →  APPROX_COUNT_DISTINCT(x)

first_value / last_value are DataFusion built-ins and Calcite/isthmus standard
window functions, so the substrait round-trip works out of the box. PPL's
default UNBOUNDED PRECEDING/FOLLOWING window frame combined with the SINGLETON
cost gate guarantees first_value / last_value see the whole partition, so the
ORDER-BY-driven pick is exact.

For the distinct count: isthmus emits Calcite's APPROX_COUNT_DISTINCT as the
substrait function name "approx_count_distinct", but DataFusion 53.1's built-in
is registered under "approx_distinct" with no alias. Rather than fork isthmus's
function mapping, register a thin wrapper UDAF in
analytics-backend-datafusion's udaf module whose name() is "approx_count_distinct"
and which delegates accumulator/state_fields/etc. to datafusion's
approx_distinct_udaf(). DataFusion's substrait consumer falls back to the UDAF
registry by name (window_function.rs:54-64), so it resolves the wrapper.

We don't rewrite to COUNT(DISTINCT x): DataFusion's substrait window_function
producer hardcodes invocation=0 (ALL) and the consumer at expr/window_function.rs:116
forces distinct=false, so the AggregationInvocation.DISTINCT flag is dropped
on round-trip. PPL's V3 path uses OpenSearch cardinality (HyperLogLog++) so
dc()/distinct_count() have always been approximate even when users wrote the
exact-sounding name; routing to approx_distinct matches that existing behavior
without semantic regression.

WindowFunction enum gains ARG_MIN / ARG_MAX / DISTINCT_COUNT_APPROX entries so
OpenSearchProjectRule's narrowByWindowCapability gate accepts the PPL form
before the adapter rewrite runs.

Signed-off-by: Songkan Tang <songkant@amazon.com>
…r unit tests

dc()/distinct_count()/distinct_count_approx() fail at PPL parser stage with
"Cannot resolve function: DISTINCT_COUNT_APPROX" on the unified-query /
analytics-engine path. Root cause is in sql-plugin: the UDAF is registered
to PPLFuncImpTable.aggExternalFunctionRegistry only inside
OpenSearchExecutionEngine's constructor (V2/V3 path side effect), which
the unified-query path never triggers.

Until upstream sql-plugin fixes the registration, the 8 dc IT methods
revert to assertErrorContains. The backend rewrite (ARG_MIN/ARG_MAX/
DISTINCT_COUNT_APPROX -> FIRST_VALUE/LAST_VALUE/APPROX_COUNT_DISTINCT)
in WindowFunctionAdapters stays — it's covered by the new
WindowFunctionAdaptersTests unit tests pinning the rewrite contracts.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Depends on sql-plugin PR opensearch-project#5466 which registers the DISTINCT_COUNT_APPROX
logical marker in PPLFuncImpTable. With that PR (built locally and
published to mavenLocal), all 8 dc IT methods now pass end-to-end:
PPL parser succeeds via the marker, BackendPlanAdapter rewrites
RexOver(DISTINCT_COUNT_APPROX) -> APPROX_COUNT_DISTINCT, DataFusion
substrait reader executes natively.

Note this commit lives on the fix-fragment-rewire-helper-column branch
which depends on the upstream sql-plugin marker registration. Until
that PR merges and the public snapshot picks it up, this branch can
only be verified locally with mavenLocal.

Signed-off-by: Songkan Tang <songkant@amazon.com>
Reduce-stage helper chains (e.g. Sort(Project(Project-with-window(ER(StageInputScan))))
that PPL emits for streamstats by) used to be split by FragmentConversionDriver into
a child convertFragment + a series of attachFragmentOnTop calls walking back up the
spine. Each attachFragmentOnTop ran a standalone isthmus conversion of the wrapper,
which derived fieldRef ordinals against the placeholder schema; after replaceInput
swapped the placeholder for the inner plan's actual root the wrapper-level fieldRefs
no longer matched the post-rewrite inner schema (partial-agg lowering, NULL-typed
CASE branches, etc.), producing DataFusion runtime panics like "index out of bounds:
the len is 6 but the index is 14".

Unify the dispatch: any OpenSearchRelNode in the reduce stage now goes through a
single convertor.convertFragment(strip(node)) call. strip() removes ExchangeReducers
on the way down; the convertor's rewriteStageInputScans turns StageInputScan leaves
into TableScan placeholders; isthmus visits the whole subtree once, deriving every
node's row type from its child up the spine. fieldRefs are bound in a single row-type
system, so no schema mismatch is possible regardless of helper chain depth.

This subsumes the multi-input case (Join/Union/Intersect/Minus, which already used
this path) and the final-aggregate-boundary special case. The single-input attach
branch and the boundary special case are removed; the recursion is gone too. The
attachPartialAggOnTop path on shard stages is unchanged — its wrapper is a single
Aggregate(PARTIAL) node with no helper chain, so the original single-layer swap
remains correct.

Add a regression assertion in testTwoStageSortOnAggregateOnFilteredScan that the
reduce stage subtree is no longer routed through attachFragmentOnTop. Existing 76
streamstats + eventstats IT all pass; previously failing _3shard cases and
streamstats-by helper-chain cases are now green.

Signed-off-by: Songkan Tang <songkant@amazon.com>
…test

Inline convertReduceNode into convertReduceFragment — the function reduces to
a single convertor.convertFragment(strip(node, delegationBytes)) call. The
ExchangeReducer if-branch is redundant because strip() already drops ER on
the way down to its StageInputScan child, and the OpenSearchRelNode if-branch
is the only non-ER path the caller dispatches into us via line 315
(`leaf instanceof OpenSearchStageInputScan`). The throw-on-unknown-RelNode
contract is preserved implicitly by the same caller's leaf-type switch.

Add testReduceStageWindowHelperChainConvertedAsSingleSubtree covering the
streamstats-shaped reduce subtree
Sort(Project(Project-with-window(ER(StageInputScan)))). The existing
testTwoStageSortOnAggregateOnFilteredScan test asserts the same
"no attachFragmentOnTop" contract on a Sort+Aggregate chain; the new test
adds independent coverage of the window helper-Project shape.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

PR Reviewer Guide 🔍

(Review updated until commit 25c7497)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Fallback Logic Gap

ArgFunctionAdapter.adapt() falls back to a passthrough rebuild when operands.size() != 2, but this silently produces incorrect output. If ARG_MIN/ARG_MAX arrive with 1 or 3+ operands due to a planner bug or future PPL syntax change, the rewritten RexOver will carry the wrong operator (ARG_MIN/ARG_MAX instead of FIRST_VALUE/LAST_VALUE) into substrait emission, likely causing a runtime error or wrong results. Consider logging a warning or throwing an exception instead of silent passthrough.

if (operands.size() != 2) {
    // Unexpected shape — preserve original behavior as best as possible.
    return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
}
Filter Predicate Fragility

The anonymous WindowFunctionConverter override filters out APPROX_COUNT_DISTINCT from the inherited sig list via a stream operation that reconstructs an ImmutableList. If getSigs() is called multiple times (e.g., during repeated conversions in a loop or retry scenario), the filter runs each time but the parent's sig list is already immutable, so the filter is safe. However, if a future refactor changes getSigs() to return a mutable list or caches the filtered result incorrectly, the filter could be bypassed. The current code is correct but the pattern is subtle. Consider caching the filtered list in a final field initialized in the constructor for clarity.

    @Override
    protected ImmutableList<FunctionMappings.Sig> getSigs() {
        return super.getSigs().stream()
            .filter(sig -> sig.operator != SqlStdOperatorTable.APPROX_COUNT_DISTINCT)
            .collect(ImmutableList.toImmutableList());
    }
};

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

PR Code Suggestions ✨

Latest suggestions up to 25c7497

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Log warning for unexpected operand count

When operands.size() != 2, the fallback path preserves the original operator but the
operands list may have been adapted by scalar recursion. This creates an
inconsistent state where adapted operands are paired with the original (potentially
incompatible) operator. Consider logging a warning about the unexpected shape to aid
debugging.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [66-88]

 private record ArgFunctionAdapter(SqlAggFunction target) implements WindowFunctionAdapter {
+    private static final Logger logger = LogManager.getLogger(ArgFunctionAdapter.class);
+    
     @Override
     public RexNode adapt(
         RexOver original,
         List<RexNode> operands,
         List<RexNode> partitions,
         List<RexFieldCollation> orderKeys,
         RelOptCluster cluster
     ) {
         if (operands.size() != 2) {
-            // Unexpected shape — preserve original behavior as best as possible.
+            logger.warn("Unexpected operand count {} for ARG_MIN/ARG_MAX, expected 2. Preserving original operator.", operands.size());
             return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
         }
         return rebuild(
             original,
             target,
             List.of(operands.get(0)),
             partitions,
             appendAsc(orderKeys, operands.get(1)),
             original.isDistinct(),
             cluster
         );
     }
 }
Suggestion importance[1-10]: 6

__

Why: Adding logging for the unexpected operand count fallback path would aid debugging and make the fallback behavior more observable. The suggestion correctly identifies that the fallback preserves the original operator with adapted operands, which could be inconsistent. However, this is a defensive code path for malformed input, and the logging addition is a minor observability improvement rather than a correctness fix.

Low
Return immutable list from appendAsc

The appendAsc method creates a mutable ArrayList but returns it as List, allowing
external modification. Since this list is used to construct immutable RexOver nodes
via ImmutableList.copyOf(orderKeys) in the rebuild method, return an immutable list
directly to prevent unintended mutations and clarify the contract.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [124-129]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
-    List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
-    out.addAll(existing);
-    out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return ImmutableList.<RexFieldCollation>builderWithExpectedSize(existing.size() + 1)
+        .addAll(existing)
+        .add(new RexFieldCollation(key, Collections.emptySet()))
+        .build();
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that appendAsc returns a mutable list that could be modified externally. However, the impact is limited since the list is immediately consumed by ImmutableList.copyOf() in rebuild(), preventing actual mutations from affecting the final RexOver. The improvement enhances code clarity and defensive programming but doesn't fix a critical bug.

Low

Previous suggestions

Suggestions up to commit 7b29502
CategorySuggestion                                                                                                                                    Impact
General
Log warning for unexpected operand count

When ARG_MIN/ARG_MAX receives an unexpected operand count (not 2), the fallback
silently preserves the original operator, which may cause downstream substrait
emission to fail with cryptic errors. Log a warning with the actual operand count
and operator name to aid debugging when this edge case occurs in production.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [75-78]

 if (operands.size() != 2) {
     // Unexpected shape — preserve original behavior as best as possible.
+    logger.warn("ARG_MIN/ARG_MAX adapter expected 2 operands but got {} for operator {}; falling back to passthrough",
+                operands.size(), original.getAggOperator().getName());
     return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
 }
Suggestion importance[1-10]: 6

__

Why: Adding a warning log for the unexpected operand count fallback case would help with debugging production issues. However, the suggestion assumes a logger exists but doesn't show its declaration, and logging alone doesn't change behavior or fix a bug.

Low
Return immutable list from appendAsc

The appendAsc method creates a mutable ArrayList but returns it as List, allowing
external modification. Since this list is used to build immutable RexOver nodes via
ImmutableList.copyOf(orderKeys) in the rebuild method, return an immutable list
directly to prevent accidental mutation and clarify the contract.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [124-129]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
-    List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
-    out.addAll(existing);
-    out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return ImmutableList.<RexFieldCollation>builder()
+        .addAll(existing)
+        .add(new RexFieldCollation(key, Collections.emptySet()))
+        .build();
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that appendAsc returns a mutable list, but the impact is limited since rebuild already wraps it with ImmutableList.copyOf(orderKeys). The change improves code clarity and prevents potential misuse, but doesn't fix a bug.

Low
Log when window operator is unrecognized

If WindowFunction.resolveFunction returns null for an unrecognized window operator,
the code silently skips adaptation. This may allow unsupported window functions to
reach substrait emission and fail later. Consider logging a debug message when fn is
null to trace which operators are not being adapted, aiding future debugging.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java [260-264]

 WindowFunction fn = WindowFunction.resolveFunction(over.getAggOperator());
+if (fn == null) {
+    LOGGER.debug("No WindowFunction mapping for operator {}, skipping adaptation", over.getAggOperator().getName());
+}
 WindowFunctionAdapter adapter = fn == null ? null : adapters.window().get(fn);
 if (adapter != null) {
     return adapter.adapt(over, adaptedOperands, adaptedPartitionKeys, adaptedOrderKeys, cluster);
 }
Suggestion importance[1-10]: 5

__

Why: Adding a debug log when WindowFunction.resolveFunction returns null would aid debugging, but this is a low-impact observability improvement. The code already handles the null case correctly by skipping adaptation, and unrecognized operators would fail later with clear errors during substrait emission.

Low
Suggestions up to commit 2c731ef
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate adapter returns non-null RexNode

The code retrieves a WindowFunctionAdapter from the map but does not validate that
the adapter's returned RexNode is non-null. If a backend's adapter implementation
returns null, the subsequent code will propagate this null reference, potentially
causing a NullPointerException downstream. Add a null check after the adapter
invocation to fail fast with a clear error message.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java [265-269]

-private static RexNode adaptOver(
-    RexOver over,
-    Adapters adapters,
-    List<FieldStorageInfo> fieldStorage,
-    RelOptCluster cluster,
-    List<RexNode> adaptedOperands,
-    boolean operandsChanged
-) {
-    ...
-    WindowFunction fn = WindowFunction.resolveFunction(over.getAggOperator());
-    WindowFunctionAdapter adapter = fn == null ? null : adapters.window().get(fn);
-    if (adapter != null) {
-        return adapter.adapt(over, adaptedOperands, adaptedPartitionKeys, adaptedOrderKeys, cluster);
+WindowFunction fn = WindowFunction.resolveFunction(over.getAggOperator());
+WindowFunctionAdapter adapter = fn == null ? null : adapters.window().get(fn);
+if (adapter != null) {
+    RexNode adapted = adapter.adapt(over, adaptedOperands, adaptedPartitionKeys, adaptedOrderKeys, cluster);
+    if (adapted == null) {
+        throw new IllegalStateException("WindowFunctionAdapter for " + fn + " returned null");
     }
-    ...
+    return adapted;
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that if a WindowFunctionAdapter returns null, it could cause a NullPointerException downstream. Adding a null check with a clear error message would improve robustness and fail-fast behavior. However, the contract of WindowFunctionAdapter.adapt (line 52 in WindowFunctionAdapter.java) returns RexNode, and well-behaved implementations should not return null. The check is a defensive measure that improves error handling but addresses a scenario that should not occur under correct implementation.

Medium
General
Log warning on unexpected operand count

When operands.size() != 2, the adapter falls back to rebuilding with the original
operator, but this silently accepts malformed input without logging or signaling the
unexpected condition. Consider logging a warning when the fallback path is taken so
operators can detect and investigate queries that produce unexpected operand counts
during development or debugging.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [92-95]

 private record ArgFunctionAdapter(SqlAggFunction target) implements WindowFunctionAdapter {
     @Override
     public RexNode adapt(
         RexOver original,
         List<RexNode> operands,
         List<RexNode> partitions,
         List<RexFieldCollation> orderKeys,
         RelOptCluster cluster
     ) {
         if (operands.size() != 2) {
             // Unexpected shape — preserve original behavior as best as possible.
+            LOGGER.warn("ARG_MIN/ARG_MAX adapter expected 2 operands but got {}, falling back to passthrough", operands.size());
             return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
         }
         ...
     }
 }
Suggestion importance[1-10]: 5

__

Why: Adding a warning log when the fallback path is taken would aid debugging and help detect malformed queries during development. However, the fallback is intentionally designed to preserve original behavior gracefully, and logging every unexpected operand count could add noise. The suggestion is valid but offers moderate value for maintainability and debugging.

Low
Return immutable list from helper method

The appendAsc method creates a mutable ArrayList and returns it as List, allowing
external callers to modify the returned list. This could lead to unintended side
effects if the caller mutates the list. Consider returning an immutable list using
Collections.unmodifiableList() or List.copyOf() to prevent external modifications.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [141-146]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
     List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
     out.addAll(existing);
     out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return Collections.unmodifiableList(out);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion to return an immutable list is a defensive programming practice that prevents external modification. However, the appendAsc helper is private static and only called internally within ArgFunctionAdapter.adapt, where the returned list is immediately passed to rebuild and not stored or exposed. The risk of unintended mutation is minimal in this controlled context, so the improvement is marginal.

Low
Suggestions up to commit b851d9e
CategorySuggestion                                                                                                                                    Impact
General
Log warning for unexpected operand count

The fallback path for unexpected operand counts silently preserves the original
operator without logging or signaling the anomaly. Consider adding a warning log to
alert operators when ARG_MIN/ARG_MAX receives an unexpected operand shape, as this
may indicate a planner bug or unsupported query pattern that should be investigated.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [92-95]

 if (operands.size() != 2) {
     // Unexpected shape — preserve original behavior as best as possible.
+    // Log warning: ARG_MIN/ARG_MAX expects exactly 2 operands (value, ts)
     return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
 }
Suggestion importance[1-10]: 5

__

Why: Adding a warning log when ARG_MIN/ARG_MAX receives unexpected operand counts would help diagnose planner bugs or unsupported query patterns. This is a reasonable observability improvement, though the suggestion only mentions adding a comment rather than actual logging code, which limits its immediate value.

Low
Return immutable list from appendAsc

The appendAsc method creates a mutable ArrayList and returns it, but callers may
expect an immutable list since the method signature doesn't indicate mutability.
Consider returning an immutable list using List.copyOf() or ImmutableList.copyOf()
to prevent unintended modifications and align with the immutable pattern used
elsewhere in the codebase (e.g., ImmutableList.copyOf(orderKeys) in rebuild).

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [141-146]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
     List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
     out.addAll(existing);
     out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return List.copyOf(out);
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion to return an immutable list is valid for defensive programming, but the impact is low since appendAsc is a private helper method whose return value is immediately consumed by rebuild (which wraps it in ImmutableList.copyOf(orderKeys) anyway). The extra copy has minimal performance impact but adds a layer of safety.

Low
Guard against null WindowFunction lookup

When WindowFunction.resolveFunction returns null but an adapter exists in the map
for a null key (theoretically possible if the map allows null keys), the code would
skip adaptation. Add an explicit null-check guard or document that adapters.window()
must never contain null keys to prevent silent failures in adapter dispatch.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java [265-269]

 WindowFunction fn = WindowFunction.resolveFunction(over.getAggOperator());
-WindowFunctionAdapter adapter = fn == null ? null : adapters.window().get(fn);
+if (fn == null) {
+    // No recognized window function; skip adapter lookup
+    if (operandsChanged || windowChanged) {
+        return cluster.getRexBuilder().makeOver(...);
+    }
+    return over;
+}
+WindowFunctionAdapter adapter = adapters.window().get(fn);
 if (adapter != null) {
     return adapter.adapt(over, adaptedOperands, adaptedPartitionKeys, adaptedOrderKeys, cluster);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion addresses a theoretical edge case where resolveFunction returns null. However, the existing code already handles this correctly by checking adapter != null before calling adapt. The suggested refactoring adds clarity but doesn't fix an actual bug, and the concern about null keys in the map is unlikely given the Map.of() construction pattern used throughout the codebase.

Low
Suggestions up to commit 31702ea
CategorySuggestion                                                                                                                                    Impact
General
Fail fast on unexpected operand count

When ARG_MIN/ARG_MAX receive unexpected operand counts, the fallback silently
preserves the original operator instead of failing fast. This masks configuration
errors and may produce incorrect substrait. Consider logging a warning or throwing
an exception to surface the unexpected shape.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [92-95]

 if (operands.size() != 2) {
-    // Unexpected shape — preserve original behavior as best as possible.
-    return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
+    throw new IllegalStateException(
+        "ARG_MIN/ARG_MAX expects exactly 2 operands (value, ts), got " + operands.size()
+    );
 }
Suggestion importance[1-10]: 7

__

Why: The current fallback silently preserves incorrect shapes, which could mask configuration errors. Failing fast with an exception would surface issues earlier, improving debuggability. However, the comment suggests the fallback is intentional for robustness.

Medium
Fail fast on unsupported window function

When WindowFunction.fromRexOver returns null (unsupported window function), the code
silently falls through to the default rebuild path. This may allow unsupported
functions to reach substrait emission. Consider throwing an exception when fn is
null to fail fast on unsupported window functions.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/BackendPlanAdapter.java [265-269]

 WindowFunction fn = WindowFunction.fromRexOver(over);
-WindowFunctionAdapter adapter = fn == null ? null : adapters.window().get(fn);
+if (fn == null) {
+    throw new IllegalStateException(
+        "Unsupported window function: " + over.getAggOperator().getName()
+    );
+}
+WindowFunctionAdapter adapter = adapters.window().get(fn);
 if (adapter != null) {
     return adapter.adapt(over, adaptedOperands, adaptedPartitionKeys, adaptedOrderKeys, cluster);
 }
Suggestion importance[1-10]: 6

__

Why: When fromRexOver returns null, the code falls through to a default rebuild. Throwing an exception would catch unsupported functions earlier. However, the existing code may intentionally allow pass-through for functions without adapters, so this change could break valid use cases.

Low
Return immutable list from helper

The appendAsc method creates a mutable ArrayList but returns it as List, allowing
external modification. Since this is a helper that builds order-key lists for window
functions, return an immutable list to prevent accidental mutation after
construction.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [141-146]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
     List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
     out.addAll(existing);
     out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return Collections.unmodifiableList(out);
 }
Suggestion importance[1-10]: 5

__

Why: Returning an immutable list prevents accidental mutation, which is a good defensive practice. However, the current code works correctly and the risk of external mutation is low in this context. The improvement is minor.

Low
Suggestions up to commit fdb5ba1
CategorySuggestion                                                                                                                                    Impact
General
Log warning on unexpected operand count

When ARG_MIN/ARG_MAX receive unexpected operand counts, the fallback silently
preserves the original operator instead of the target (FIRST_VALUE/LAST_VALUE). This
may produce incorrect substrait emission. Consider logging a warning or throwing an
exception to surface the unexpected shape.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [92-95]

 if (operands.size() != 2) {
-    // Unexpected shape — preserve original behavior as best as possible.
+    // Log unexpected shape before fallback
+    logger.warn("ARG_MIN/ARG_MAX adapter expected 2 operands, got {}. Falling back to original operator.", operands.size());
     return rebuild(original, original.getAggOperator(), operands, partitions, orderKeys, original.isDistinct(), cluster);
 }
Suggestion importance[1-10]: 6

__

Why: Adding logging for unexpected operand counts would help diagnose issues during development and debugging. However, the fallback behavior is already documented as "preserve original behavior as best as possible," and the suggestion introduces a dependency on a logger that isn't shown in the code.

Low
Return immutable list from helper

The appendAsc method creates a mutable ArrayList but returns it as List, allowing
external modification. Consider returning an immutable list to prevent unintended
mutations after the adapter completes.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/WindowFunctionAdapters.java [141-146]

 private static List<RexFieldCollation> appendAsc(List<RexFieldCollation> existing, RexNode key) {
     List<RexFieldCollation> out = new ArrayList<>(existing.size() + 1);
     out.addAll(existing);
     out.add(new RexFieldCollation(key, Collections.emptySet()));
-    return out;
+    return Collections.unmodifiableList(out);
 }
Suggestion importance[1-10]: 4

__

Why: Returning an immutable list prevents external modification, which is a good defensive practice. However, the impact is low since the list is only used internally within the adapter and there's no evidence of mutation issues in the current codebase.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5f49795: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit a14b109

PPL trendline wma() lowers to NTH_VALUE; previously the analytics planner
rejected it with "Window function [NTH_VALUE] is not supported" because
the WindowFunction enum had no entry. DataFusion 53.x has nth_value as a
substrait-recognized built-in, so registration is purely a Java-side
capability declaration — no adapter, no Rust UDAF.

Verified with CalcitePPLTrendlineIT against runTask cluster: testTrendlineWma
and testTrendlineMultipleFields go from failing to passing.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@songkant-aws songkant-aws changed the title [analytics-engine] Wire dc/earliest/latest + unify reduce-stage subtree conversion [analytics-engine] Wire dc/earliest/latest/nth_value + unify reduce-stage subtree conversion May 26, 2026
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 691717e

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 691717e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Extends testAggregateOverKinds to assert NTH_VALUE maps correctly,
matching the wiring added in 691717e.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@songkant-aws songkant-aws force-pushed the fix-fragment-rewire-helper-column branch from fd2d105 to 3d43c88 Compare May 26, 2026 08:52
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit fd2d105

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3d43c88

@songkant-aws songkant-aws marked this pull request as ready for review May 26, 2026 09:50
@songkant-aws songkant-aws requested a review from a team as a code owner May 26, 2026 09:50
@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 3d43c88: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit fdb5ba1

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for fdb5ba1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

* fails before reaching the window-function gate with
* {@code "Cannot resolve function: DISTINCT_COUNT_APPROX"}. */
/** sql IT: testEventstatsDistinctCount. */
public void testEventstatsDistinctCount() throws IOException {
Copy link
Copy Markdown
Member

@mch2 mch2 May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test these with multi shard case? calcs/mapping.json - i have tried setting for clickbench tests here #21848 to add randomization 1-2 shard, we can do somethign similar here.

We'll likely have variance with dc() being approximate, but to ensure the queries succeed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added multi-shard variants in 31702ea:

  • testEventstatsDistinctCount_3shard and testEventstatsDistinctCountByCountry_3shard reuse the existing DATASET_MULTI (3 shards) — eventstats dc is broadcast pattern so the per-row expectations are identical to single-shard, and HLL is exact at calcs's 17-row scale.
  • testStreamstatsDistinctCountByCountry_3shard collapses to per-partition finals via | stats max(dc_str3) by str0 | sort str0 since streamstats running values aren't deterministic when shard parallelism reorders input rows — same shape as testStreamstatsBy_3shard.

Didn't go with DatasetProvisioner.provision-level random shard count for this PR since #21848's randomization helper isn't merged yet; sticking with the existing per-test 3-shard pattern this file already uses.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sanity-checked datafusion 53.1's HLL with a standalone Rust harness running the real algorithm over 3 shards × 100 iterations — dc(str3) always returns 1 and dc(str0) always returns 3, with zero variance. The merge step is just register-wise max with a fixed seed, so multi-shard and single-shard paths produce bit-identical state, and for these two specific values the count output sits well clear of any rounding boundary.

Address review feedback to exercise the dc()/distinct_count() path on the
multi-shard exchange + PARTIAL/FINAL agg split, not just the single-shard
inline-agg path.

EventstatsCommandIT:
  * testEventstatsDistinctCount_3shard
  * testEventstatsDistinctCountByCountry_3shard
  Use the existing DATASET_MULTI (3 shards). dc is broadcast: every row in a
  partition sees the same dc value, so the per-row expectations are identical
  to the single-shard variants. HLL is exact at calcs's 17-row scale.

StreamstatsCommandIT:
  * testStreamstatsDistinctCountByCountry_3shard
  streamstats running aggregates depend on input row order; multi-shard
  parallelism reorders rows at the coordinator, so per-row running values are
  not deterministic. Collapse to per-partition finals via stats max(dc) by
  str0, mirroring testStreamstatsBy_3shard. Expected partition finals all
  equal 1 (each str0 group has only "e" or null in str3).
Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 31702ea

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 31702ea: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.46%. Comparing base (b186488) to head (7b29502).
⚠️ Report is 9 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21823      +/-   ##
============================================
- Coverage     73.50%   73.46%   -0.05%     
- Complexity    75546    75569      +23     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251879   251719     -160     
- Misses        70790    70958     +168     
+ Partials      19992    19984       -8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Member

@sandeshkr419 sandeshkr419 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you are doing 2 things:

Part 1: Wire dc/earliest/latest/nth_value: no comments for now, good to go!
Part 2: Unify reduce-stage subtree conversion: This part should simplify after my changes: #21775 (aiming to get this in today). I have introduced state-shipping which unifies all agg state as Binary IPC, so the schema never shifts between SINGLE and FINAL - this should simplify your code a lot.

…-helper-column

Signed-off-by: Songkan Tang <songkant@amazon.com>

# Conflicts:
#	sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/WindowFunction.java
#	sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/mod.rs
#	sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java
#	sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java
#	sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/rules/OpenSearchProjectRule.java
#	sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/MockBackend.java
#	sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/FragmentConversionDriverTests.java
#	sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/EventstatsCommandIT.java
#	sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamstatsCommandIT.java
…base

Two compile errors surfaced by upstream/main:

1. AnalyticsSearchBackendPlugin SPI default delegatedPredicateSerializers()
   added on main (PR 21760). MockBackend's protected hook collided with the
   public SPI default — promote to public @OverRide to match main's pattern.

2. OpenSearchStageInputScan constructor gained a List<FieldStorageInfo>
   parameter on main. Pass List.of() in the test fixture; nothing in the
   reduce-stage shape under test depends on output field storage.

Signed-off-by: Songkan Tang <songkant@amazon.com>
…ow-helper test

OpenSearchProject.getOutputFieldStorage now resolves each output column's storage
by walking back through input field storage. After main, OpenSearchStageInputScan
exposes a List<FieldStorageInfo> alongside its rowType, and the resolver enforces
that input.fieldCount == fieldStorage.size().

The test previously passed an empty List for outputFieldStorage; the resolver now
fails when the helper Project's input ref points at column 0 of a 1-column input
that declares no storage. Pass a single derivedColumn entry matching the synthetic
"a" column.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit b851d9e

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for b851d9e: SUCCESS

@songkant-aws
Copy link
Copy Markdown
Contributor Author

songkant-aws commented May 30, 2026

@sandeshkr419 thanks for the review. Rebased on main — both #21775's reduce_eval Binary-IPC path and #21875's overrideExchangeType transformer are on this branch now. Those address PARTIAL/FINAL aggregate row-type drift at the rel layer for STATE_EXPANDING / opaque-state aggregates so the schema doesn't shift between SINGLE and FINAL.

Could you take another look?

Removes the FragmentConversionDriver convertReduceFragment unification that
this PR introduced, restoring main's recursive convertReduceNode form. Also
removes the two regression assertions we added on top:

- testReduceStageWindowHelperChainConvertedAsSingleSubtree (deleted)
- the attachFragmentCalled assertFalse line in testTwoStageSortOnAggregateOnFilteredScan
- RecordingConvertor.attachFragmentCalled field

Verified empirically against the full analytics-engine-rest IT suite with
the recursive form restored: 770 / 770 IT pass (42 pre-existing @AwaitsFix
skips). The DataFusion runtime panic that originally motivated Part 2 ("index
out of bounds: len=6 idx=14" on streamstats by helper chains) is no longer
reachable on top of merged main — likely covered by upstream rel-layer fixes
since the original investigation (opensearch-project#21875's DistributedAggregateRewriter
overrideExchangeType transformer + opensearch-project#21690's string/utf8 view & timestamp
coercion elimination).

Keep this PR focused on Part 1 (wire dc / earliest / latest / nth_value).
The reduce-stage simplification, if revisited, should be its own PR with
fresh justification.

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 2c731ef

@songkant-aws songkant-aws changed the title [analytics-engine] Wire dc/earliest/latest/nth_value + unify reduce-stage subtree conversion [analytics-engine] Wire dc/earliest/latest/nth_value to DataFusion built-ins May 30, 2026
@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 2c731ef: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Copy Markdown
Member

@sandeshkr419 sandeshkr419 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 major comment on support of dc, rest looks fine.

While you are addressing this, can you also please concise up the comments in your changes to avoid overflowing intermittent design decisions.

…a window FunctionMappings

Rename APPROX_COUNT_DISTINCT to substrait `approx_distinct` on the window emission
path the same way we already do for aggregates: add the rename to
ADDITIONAL_WINDOW_SIGS and override WindowFunctionConverter.getSigs() to filter
isthmus's default `approx_count_distinct` binding so ours wins. DataFusion's
substrait consumer then resolves the call directly to its built-in
approx_distinct UDAF — no wrapper needed.

Removes the 113-line ApproxCountDistinctUdaf wrapper that aliased
`approx_count_distinct` to DataFusion's built-in `approx_distinct`.

Also concise up javadoc / comments around the window adapter changes
(WindowFunction, WindowFunctionAdapter SPI, WindowFunctionAdapters,
DataFusionAnalyticsBackendPlugin.windowCapabilities, BackendPlanAdapter.adaptOver,
OpenSearchProjectRule.collectWindowFunctions) — drop intermittent design
decisions and stale comments (e.g. the COUNT(DISTINCT) rewrite path that's no
longer used, the obsolete "PARTITION BY rejected" claim).

Verified: analytics-framework + analytics-engine + analytics-backend-datafusion
unit tests green; analytics-engine-rest IT 770/770 pass (42 pre-existing
@AwaitsFix skips, 0 failures, 0 errors).

Signed-off-by: Songkan Tang <songkant@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 7b29502

@songkant-aws songkant-aws requested a review from sandeshkr419 May 31, 2026 03:17
@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 7b29502: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 25c7497

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants