diff --git a/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java b/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java index 78bca99446362..80e99b8e932a2 100644 --- a/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java +++ b/sandbox/libs/analytics-api/src/main/java/org/opensearch/analytics/schema/OpenSearchSchemaBuilder.java @@ -278,19 +278,21 @@ private static void addLeafFields( String fieldName = pathPrefix.isEmpty() ? fieldEntry.getKey() : pathPrefix + "." + fieldEntry.getKey(); Map fieldProps = (Map) fieldEntry.getValue(); String fieldType = (String) fieldProps.get("type"); - // Object types: implicit when "properties" is present without "type", or explicit "type: object". - // Recurse into sub-properties so dotted leaf paths ("city.location.latitude") appear as flat columns. - if (fieldType == null || "object".equals(fieldType)) { + // Object / nested types: implicit when "properties" is present without "type", + // explicit "type: object", or "type: nested". Recurse into sub-properties so + // dotted leaf paths ("city.location.latitude", "skills.name") appear as flat + // columns. Nested fields' leaves are written by ArrowSchemaBuilder as flat + // dotted columns at parquet time (the parent "nested" mapper itself has no + // ArrowFieldRegistry entry and is dropped); exposing the leaves in the Calcite + // row type matches that storage shape and matches sql-plugin OpenSearchDataType + // which also recurses Object and Nested through the same branch. + if (fieldType == null || "object".equals(fieldType) || "nested".equals(fieldType)) { Map nested = (Map) fieldProps.get("properties"); if (nested != null) { addLeafFields(builder, typeFactory, nested, fieldName); } continue; } - // Nested type (array-of-sub-docs) is a different beast — deferred. - if ("nested".equals(fieldType)) { - continue; - } RelDataType columnType = buildLeafType(fieldType, typeFactory); if (columnType == null) { // Unsupported (geo_point/shape/completion/…) or unknown plugin type. Drop the diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java index 16336781f2989..b741ffecc5ecf 100644 --- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/AggregateFunction.java @@ -44,7 +44,10 @@ public enum AggregateFunction { LAST(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("last_state", IntermediateTypeResolver.passThroughArg0(), null))), LIST(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("list_state", IntermediateTypeResolver.passThroughArg0(), null))), VALUES(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("values_state", IntermediateTypeResolver.passThroughArg0(), null))), - PATTERN(Type.STATE_EXPANDING, SqlKind.OTHER); + PATTERN(Type.STATE_EXPANDING, SqlKind.OTHER), + // PPL `mvcombine` lowers to Calcite SqlLibraryOperators.ARRAY_AGG (name "ARRAY_AGG"). Same + // PARTIAL/FINAL state shape as LIST: per-shard array_agg → cross-shard list_merge. + ARRAY_AGG(Type.STATE_EXPANDING, SqlKind.OTHER, fields(IF("array_agg_state", IntermediateTypeResolver.passThroughArg0(), null))); /** Category of aggregate function; affects execution strategy (shuffle vs map-reduce). */ public enum Type { diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java index b59d04cdd2f80..92c11cb84f4c2 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java @@ -413,7 +413,8 @@ public class DataFusionAnalyticsBackendPlugin implements AnalyticsSearchBackendP AggregateFunction.LAST, AggregateFunction.LIST, AggregateFunction.VALUES, - AggregateFunction.PATTERN + AggregateFunction.PATTERN, + AggregateFunction.ARRAY_AGG ); private final DataFusionPlugin plugin; diff --git a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java index c4a019c5b23e7..019c55ef9e140 100644 --- a/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java +++ b/sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/PplAggregateCallRewriter.java @@ -76,8 +76,11 @@ private static AggregateCall rewriteCall(Aggregate agg, AggregateCall call) { case "TAKE" -> targetOp = DataFusionFragmentConvertor.LOCAL_TAKE_OP; case "FIRST" -> targetOp = DataFusionFragmentConvertor.LOCAL_FIRST_OP; case "LAST" -> targetOp = DataFusionFragmentConvertor.LOCAL_LAST_OP; - case "LIST", "VALUES" -> { - // arg0 type distinguishes PARTIAL (raw element → array_agg) from FINAL (array → list_merge). + case "LIST", "VALUES", "ARRAY_AGG" -> { + // arg0 type tells us PARTIAL (raw element) vs FINAL (already array): + // PARTIAL → array_agg (with INVOCATION_DISTINCT for VALUES); rebuild + // return type as ARRAY to repair PPL's lossy STRING_ARRAY. + // FINAL → list_merge / list_merge_distinct un-nests per-shard arrays. if (call.getArgList().isEmpty()) { return call; } diff --git a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java index a444a10b743dd..050a23b0ca548 100644 --- a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java +++ b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/engine/OpenSearchSchemaBuilderTests.java @@ -114,9 +114,11 @@ public void testMultipleIndicesProduceMultipleTables() throws Exception { } /** - * Test that nested/object fields are skipped. + * Object / nested fields without a {@code properties} sub-map contribute no leaves — + * the recursion enters but finds nothing to add. The sibling supported field still + * surfaces. Pins the no-op behavior for malformed object/nested entries. */ - public void testNestedAndObjectFieldsSkipped() throws Exception { + public void testNestedAndObjectFieldsWithoutPropertiesProduceNoLeaves() throws Exception { ClusterState clusterState = buildClusterState( Map.of("nested_index", Map.of("name", "keyword", "address", "object", "tags", "nested")) ); @@ -127,10 +129,39 @@ public void testNestedAndObjectFieldsSkipped() throws Exception { assertNotNull(table); RelDataType rowType = table.getRowType(new org.apache.calcite.jdbc.JavaTypeFactoryImpl()); - assertEquals("Should only have 'name' field, skipping object/nested", 1, rowType.getFieldCount()); + assertEquals("Only 'name' surfaces; object/nested with no properties add nothing", 1, rowType.getFieldCount()); assertFieldType(rowType, "name", SqlTypeName.VARCHAR); } + /** + * Nested fields with a {@code properties} sub-map (the array-of-sub-docs shape used by + * {@code mvexpand}-style tests) must surface their leaves as flat dotted columns — + * matching the parquet storage shape produced by {@code ArrowSchemaBuilder}, which + * iterates leaf mappers and writes {@code skills.name}/{@code skills.level} as flat + * top-level VARCHAR columns. Without this, every PPL query referencing a nested leaf + * fails Calcite validation with "column not found". + */ + public void testNestedFieldWithPropertiesExposesLeavesAsDottedColumns() throws Exception { + String mapping = "{\"properties\":{" + + "\"username\":{\"type\":\"keyword\"}," + + "\"skills\":{\"type\":\"nested\",\"properties\":{" + + "\"name\":{\"type\":\"keyword\"}," + + "\"level\":{\"type\":\"keyword\"}" + + "}}" + + "}}"; + ClusterState clusterState = buildClusterStateRaw("nested_skills", mapping); + + SchemaPlus schema = OpenSearchSchemaBuilder.buildSchema(clusterState); + Table table = schema.getTable("nested_skills"); + assertNotNull(table); + + RelDataType rowType = table.getRowType(new org.apache.calcite.jdbc.JavaTypeFactoryImpl()); + assertEquals("username + skills.name + skills.level", 3, rowType.getFieldCount()); + assertFieldType(rowType, "username", SqlTypeName.VARCHAR); + assertFieldType(rowType, "skills.name", SqlTypeName.VARCHAR); + assertFieldType(rowType, "skills.level", SqlTypeName.VARCHAR); + } + /** * Test that an empty ClusterState produces an empty schema. */