From 7f7d50b4130fe38b2342e83d6dee58f782e1cf25 Mon Sep 17 00:00:00 2001 From: Eric Wei Date: Thu, 28 May 2026 09:44:15 -0700 Subject: [PATCH 1/2] [analytics-engine] Register Calcite ARRAY_AGG so PPL mvcombine can plan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PPL `mvcombine` lowers to Calcite SqlLibraryOperators.ARRAY_AGG (name "ARRAY_AGG", SqlKind.ARRAY_AGG) in CalciteRelNodeVisitor#performArrayAggAggregation. The analytics-engine HEP marking phase, OpenSearchAggregateRule#resolveViableBackendsForCall, calls AggregateFunction.fromSqlKind first and falls back to fromNameOrError on null. Neither path previously recognized ARRAY_AGG: no enum constant carried that SqlKind, and `valueOf("ARRAY_AGG")` threw IllegalArgumentException which was wrapped as IllegalStateException and surfaced through the PPL error renderer, taking down every mvcombine query on the analytics-engine path. Adds ARRAY_AGG as a STATE_EXPANDING aggregate with the same intermediate shape as LIST: per-shard `array_agg` produces ARRAY, cross-shard `list_merge` un-nests. Routes through the same PplAggregateCallRewriter LIST/VALUES branch (PARTIAL → LOCAL_ARRAY_AGG_OP, FINAL → LOCAL_LIST_MERGE_OP), distinguished by the arg0-is-list check that already exists. Also adds ARRAY_AGG to DataFusion's supported AGG_FUNCTIONS so the backend declares capability. Three-line edit: enum constant, capability set entry, switch case extension. Signed-off-by: Eric Wei --- .../org/opensearch/analytics/spi/AggregateFunction.java | 5 ++++- .../be/datafusion/DataFusionAnalyticsBackendPlugin.java | 3 ++- .../opensearch/be/datafusion/PplAggregateCallRewriter.java | 7 +++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java index 16336781f2989..b741ffecc5ecf 100644 --- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java @@ -44,7 +44,10 @@ public enum AggregateFunction { LAST(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("last_state", IntermediateTypeResolver.passThroughArg0(), null))), LIST(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("list_state", IntermediateTypeResolver.passThroughArg0(), null))), VALUES(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("values_state", IntermediateTypeResolver.passThroughArg0(), null))), - PATTERN(Type.STATE_EXPANDING, SqlKind.OTHER); + PATTERN(Type.STATE_EXPANDING, SqlKind.OTHER), + // PPL `mvcombine` lowers to Calcite SqlLibraryOperators.ARRAY_AGG (name "ARRAY_AGG"). Same + // PARTIAL/FINAL state shape as LIST: per-shard array_agg → cross-shard list_merge. + ARRAY_AGG(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("array_agg_state", IntermediateTypeResolver.passThroughArg0(), null))); /** Category of aggregate function; affects execution strategy (shuffle vs map-reduce). */ public enum Type { diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java index b59d04cdd2f80..92c11cb84f4c2 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java @@ -413,7 +413,8 @@ public class DataFusionAnalyticsBackendPlugin implements AnalyticsSearchBackendP AggregateFunction.LAST, AggregateFunction.LIST, AggregateFunction.VALUES, - AggregateFunction.PATTERN + AggregateFunction.PATTERN, + AggregateFunction.ARRAY_AGG ); private final DataFusionPlugin plugin; diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java index c4a019c5b23e7..019c55ef9e140 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java @@ -76,8 +76,11 @@ private static AggregateCall rewriteCall(Aggregate agg, AggregateCall call) { case "TAKE" -> targetOp = DataFusionFragmentConvertor.LOCAL_TAKE_OP; case "FIRST" -> targetOp = DataFusionFragmentConvertor.LOCAL_FIRST_OP; case "LAST" -> targetOp = DataFusionFragmentConvertor.LOCAL_LAST_OP; - case "LIST", "VALUES" -> { - // arg0 type distinguishes PARTIAL (raw element → array_agg) from FINAL (array → list_merge). + case "LIST", "VALUES", "ARRAY_AGG" -> { + // arg0 type tells us PARTIAL (raw element) vs FINAL (already array): + // PARTIAL → array_agg (with INVOCATION_DISTINCT for VALUES); rebuild + // return type as ARRAY to repair PPL's lossy STRING_ARRAY. + // FINAL → list_merge / list_merge_distinct un-nests per-shard arrays. if (call.getArgList().isEmpty()) { return call; } From 5e7037cdaf7725493916ae9e60aef4bba36676e7 Mon Sep 17 00:00:00 2001 From: Eric Wei Date: Thu, 28 May 2026 10:41:01 -0700 Subject: [PATCH 2/2] [analytics-engine] Recurse into nested-typed mappings in OpenSearchSchemaBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat OpenSearch `nested` fields the same as `object` fields: walk their `properties` sub-map and emit dotted leaves into the Calcite row type. The parquet storage path (ArrowSchemaBuilder) already iterates the document mapper's leaf mappers, so a nested field's children are written as flat top-level columns (e.g. `skills.name`, `skills.level`). The Calcite row type was the only side that hid them, leaving every PPL query referencing a nested leaf to fail validation with "column not found" — which the SQL plugin's PPL frontend renders as an error envelope without `datarows`, producing the "JSONObject[\"datarows\"] not found" failures across the mvexpand-edge-cases / graph-employees ITs on the force-routed AE path. The pre-existing `nested`-skip branch was a placeholder ("a different beast — deferred"); that deferral now bites every nested mapping. Sql plugin's own OpenSearchDataType collapses Object and Nested into the same recursion branch (data/type/OpenSearchDataType.java:147-157), and this change matches that contract. Renames the existing unit test that pinned the old skip-everything behavior to reflect what it actually covers (object/nested without properties is a no-op), and adds a positive test that asserts a nested field with sub-properties surfaces its leaves as dotted columns. Signed-off-by: Eric Wei --- .../schema/OpenSearchSchemaBuilder.java | 16 ++++---- .../engine/OpenSearchSchemaBuilderTests.java | 37 +++++++++++++++++-- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java b/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java index 78bca99446362..80e99b8e932a2 100644 --- a/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java +++ b/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java @@ -278,19 +278,21 @@ private static void addLeafFields( String fieldName = pathPrefix.isEmpty() ? fieldEntry.getKey() : pathPrefix + "." + fieldEntry.getKey(); Map fieldProps = (Map) fieldEntry.getValue(); String fieldType = (String) fieldProps.get("type"); - // Object types: implicit when "properties" is present without "type", or explicit "type: object". - // Recurse into sub-properties so dotted leaf paths ("city.location.latitude") appear as flat columns. - if (fieldType == null || "object".equals(fieldType)) { + // Object / nested types: implicit when "properties" is present without "type", + // explicit "type: object", or "type: nested". Recurse into sub-properties so + // dotted leaf paths ("city.location.latitude", "skills.name") appear as flat + // columns. Nested fields' leaves are written by ArrowSchemaBuilder as flat + // dotted columns at parquet time (the parent "nested" mapper itself has no + // ArrowFieldRegistry entry and is dropped); exposing the leaves in the Calcite + // row type matches that storage shape and matches sql-plugin OpenSearchDataType + // which also recurses Object and Nested through the same branch. + if (fieldType == null || "object".equals(fieldType) || "nested".equals(fieldType)) { Map nested = (Map) fieldProps.get("properties"); if (nested != null) { addLeafFields(builder, typeFactory, nested, fieldName); } continue; } - // Nested type (array-of-sub-docs) is a different beast — deferred. - if ("nested".equals(fieldType)) { - continue; - } RelDataType columnType = buildLeafType(fieldType, typeFactory); if (columnType == null) { // Unsupported (geo_point/shape/completion/…) or unknown plugin type. Drop the diff --git a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java index a444a10b743dd..050a23b0ca548 100644 --- a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java +++ b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java @@ -114,9 +114,11 @@ public void testMultipleIndicesProduceMultipleTables() throws Exception { } /** - * Test that nested/object fields are skipped. + * Object / nested fields without a {@code properties} sub-map contribute no leaves — + * the recursion enters but finds nothing to add. The sibling supported field still + * surfaces. Pins the no-op behavior for malformed object/nested entries. */ - public void testNestedAndObjectFieldsSkipped() throws Exception { + public void testNestedAndObjectFieldsWithoutPropertiesProduceNoLeaves() throws Exception { ClusterState clusterState = buildClusterState( Map.of("nested_index", Map.of("name", "keyword", "address", "object", "tags", "nested")) ); @@ -127,10 +129,39 @@ public void testNestedAndObjectFieldsSkipped() throws Exception { assertNotNull(table); RelDataType rowType = table.getRowType(new org.apache.calcite.jdbc.JavaTypeFactoryImpl()); - assertEquals("Should only have 'name' field, skipping object/nested", 1, rowType.getFieldCount()); + assertEquals("Only 'name' surfaces; object/nested with no properties add nothing", 1, rowType.getFieldCount()); assertFieldType(rowType, "name", SqlTypeName.VARCHAR); } + /** + * Nested fields with a {@code properties} sub-map (the array-of-sub-docs shape used by + * {@code mvexpand}-style tests) must surface their leaves as flat dotted columns — + * matching the parquet storage shape produced by {@code ArrowSchemaBuilder}, which + * iterates leaf mappers and writes {@code skills.name}/{@code skills.level} as flat + * top-level VARCHAR columns. Without this, every PPL query referencing a nested leaf + * fails Calcite validation with "column not found". + */ + public void testNestedFieldWithPropertiesExposesLeavesAsDottedColumns() throws Exception { + String mapping = "{\"properties\":{" + + "\"username\":{\"type\":\"keyword\"}," + + "\"skills\":{\"type\":\"nested\",\"properties\":{" + + "\"name\":{\"type\":\"keyword\"}," + + "\"level\":{\"type\":\"keyword\"}" + + "}}" + + "}}"; + ClusterState clusterState = buildClusterStateRaw("nested_skills", mapping); + + SchemaPlus schema = OpenSearchSchemaBuilder.buildSchema(clusterState); + Table table = schema.getTable("nested_skills"); + assertNotNull(table); + + RelDataType rowType = table.getRowType(new org.apache.calcite.jdbc.JavaTypeFactoryImpl()); + assertEquals("username + skills.name + skills.level", 3, rowType.getFieldCount()); + assertFieldType(rowType, "username", SqlTypeName.VARCHAR); + assertFieldType(rowType, "skills.name", SqlTypeName.VARCHAR); + assertFieldType(rowType, "skills.level", SqlTypeName.VARCHAR); + } + /** * Test that an empty ClusterState produces an empty schema. */