From 697dafa7e25243f1c15d6b4c29b37fae0742147f Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Sun, 31 May 2026 22:11:31 +0530 Subject: [PATCH] Add execution stats to data-node fragment slow log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces FragmentExecutionStats record with execution shape fields derived from the resolved plan alternative — the actual plan selected for execution on this node. Fields (all cheap — read or derived from objects the execution path already holds): - rows_produced: output row count (already counted) - used_secondary_index: from resolved plan's DelegationDescriptor - delegated_predicate_count: number of predicates delegated to secondary index (logged as delegated_predicates, only when used_secondary_index=true) — explains cost when used_secondary_index=true - filter_tree_shape: CONJUNCTIVE vs INTERLEAVED_BOOLEAN_EXPRESSION (logged only when used_secondary_index=true) — explains why delegation is expensive (complex boolean evaluation) - has_partial_aggregate: whether the fragment performs partial aggregation, found by scanning the resolved plan's instructions for SETUP_PARTIAL_AGGREGATE (logged as partial_aggregate) — explains high compute with low output rows - task_id: from the existing task object - id (X-Opaque-Id): from task headers Also refactors executeFragmentStreamingAsync to surface the resolved plan via ResolvedExecution wrapper, ensuring stats reflect the exact plan alternative that was executed. 
Signed-off-by: Bukhtawar Khan --- .../backend/AnalyticsOperationListener.java | 9 ++-- .../backend/FragmentExecutionStats.java | 36 +++++++++++++++ .../exec/AnalyticsFragmentSlowLog.java | 45 ++++++++++++------- .../exec/AnalyticsSearchService.java | 42 +++++++++++++++-- .../exec/AnalyticsFragmentSlowLogTests.java | 12 +++-- 5 files changed, 117 insertions(+), 27 deletions(-) create mode 100644 sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/FragmentExecutionStats.java diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/AnalyticsOperationListener.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/AnalyticsOperationListener.java index 4569f9029841f..ff3fe563d7be1 100644 --- a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/AnalyticsOperationListener.java +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/AnalyticsOperationListener.java @@ -140,6 +140,7 @@ default void onPreFragmentExecution(String queryId, int stageId, String shardId) * @param tookInNanos wall-clock time for the fragment execution * @param rowsProduced number of rows produced by this fragment * @param indexSettings the index settings for per-index slow log threshold lookup + * @param stats execution-shape stats for the fragment (secondary-index usage, delegated predicate count, filter tree shape, partial aggregation, task ID and X-Opaque-Id) 
*/ default void onFragmentSuccess( String queryId, @@ -147,7 +148,8 @@ default void onFragmentSuccess( String shardId, long tookInNanos, long rowsProduced, - IndexSettings indexSettings + IndexSettings indexSettings, + FragmentExecutionStats stats ) {} /** @@ -291,11 +293,12 @@ public void onFragmentSuccess( String shardId, long tookInNanos, long rowsProduced, - IndexSettings indexSettings + IndexSettings indexSettings, + FragmentExecutionStats stats ) { for (AnalyticsOperationListener l : delegates) { try { - l.onFragmentSuccess(queryId, stageId, shardId, tookInNanos, rowsProduced, indexSettings); + l.onFragmentSuccess(queryId, stageId, shardId, tookInNanos, rowsProduced, indexSettings, stats); } catch (Exception e) { warn("onFragmentSuccess", e); } diff --git a/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/FragmentExecutionStats.java b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/FragmentExecutionStats.java new file mode 100644 index 0000000000000..ad30249547676 --- /dev/null +++ b/sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/FragmentExecutionStats.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.backend; + +/** + * Execution stats for a completed fragment, carried alongside the slow log + * callback to provide insight into the work performed. 
+ * + * @param rowsProduced rows returned by this fragment + * @param usedSecondaryIndex whether a secondary/inverted index was consulted for filtering + * @param delegatedPredicateCount number of predicates delegated to the secondary index (0 if none) + * @param filterTreeShape boolean tree shape for delegation (null if no delegation) + * @param hasPartialAggregate whether this fragment performs partial aggregation + * @param taskId the task ID for correlation with _tasks API + * @param opaqueId the X-Opaque-Id header for correlation with the client request (nullable) + * + * @opensearch.internal + */ +public record FragmentExecutionStats( + long rowsProduced, + boolean usedSecondaryIndex, + int delegatedPredicateCount, + String filterTreeShape, + boolean hasPartialAggregate, + long taskId, + String opaqueId +) { + + public static final FragmentExecutionStats EMPTY = new FragmentExecutionStats(0, false, 0, null, false, -1, null); +} diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java index 61c087a15996a..f0045b50f5e20 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.analytics.backend.AnalyticsOperationListener; +import org.opensearch.analytics.backend.FragmentExecutionStats; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.common.unit.TimeValue; @@ -43,7 +44,8 @@ public void onFragmentSuccess( String shardId, long tookInNanos, long rowsProduced, - IndexSettings indexSettings + IndexSettings indexSettings, + 
FragmentExecutionStats stats ) { long warnThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING).nanos(); long infoThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING).nanos(); @@ -51,7 +53,7 @@ public void onFragmentSuccess( long traceThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING).nanos(); SlowLogLevel level = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL); - String message = formatMessage(queryId, stageId, shardId, tookInNanos, rowsProduced); + String message = formatMessage(queryId, stageId, shardId, tookInNanos, rowsProduced, stats); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { logger.warn(message); } else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) { @@ -63,19 +65,30 @@ public void onFragmentSuccess( } } - private static String formatMessage(String queryId, int stageId, String shardId, long tookInNanos, long rowsProduced) { - return "took[" - + TimeValue.timeValueNanos(tookInNanos) - + "], took_millis[" - + TimeUnit.NANOSECONDS.toMillis(tookInNanos) - + "], query_id[" - + queryId - + "], stage_id[" - + stageId - + "], shard[" - + shardId - + "], rows_produced[" - + rowsProduced - + "]"; + private static String formatMessage( + String queryId, + int stageId, + String shardId, + long tookInNanos, + long rowsProduced, + FragmentExecutionStats stats + ) { + StringBuilder sb = new StringBuilder(); + sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)); + sb.append("], took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + sb.append("], query_id[").append(queryId); + sb.append("], stage_id[").append(stageId); + sb.append("], shard[").append(shardId); + sb.append("], rows_produced[").append(rowsProduced); + sb.append("], 
used_secondary_index[").append(stats.usedSecondaryIndex()); + if (stats.usedSecondaryIndex()) { + sb.append("], delegated_predicates[").append(stats.delegatedPredicateCount()); + sb.append("], filter_tree_shape[").append(stats.filterTreeShape()); + } + sb.append("], partial_aggregate[").append(stats.hasPartialAggregate()); + sb.append("], task_id[").append(stats.taskId()); + sb.append("], id[").append(stats.opaqueId() != null ? stats.opaqueId() : ""); + sb.append("]"); + return sb.toString(); } } diff --git a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java index b51d926bee1b3..f1502af7b4416 100644 --- a/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java +++ b/sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java @@ -15,6 +15,7 @@ import org.opensearch.analytics.backend.AnalyticsOperationListener; import org.opensearch.analytics.backend.EngineResultBatch; import org.opensearch.analytics.backend.EngineResultStream; +import org.opensearch.analytics.backend.FragmentExecutionStats; import org.opensearch.analytics.backend.SearchExecEngine; import org.opensearch.analytics.backend.ShardScanExecutionContext; import org.opensearch.analytics.exec.action.FetchByRowIdsRequest; @@ -123,9 +124,18 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService service) } public FragmentResources executeFragmentStreaming(FragmentExecutionRequest request, IndexShard shard, AnalyticsShardTask task) { + return executeFragmentStreamingResolved(request, shard, task).resources; + } + + private ResolvedExecution executeFragmentStreamingResolved( + FragmentExecutionRequest request, + IndexShard shard, + AnalyticsShardTask task + ) { ResolvedFragment resolved = resolveFragment(request, shard); try { - return 
startFragment(request, resolved, shard, task); + FragmentResources resources = startFragment(request, resolved, shard, task); + return new ResolvedExecution(resources, resolved); } catch (TaskCancelledException | IllegalStateException | IllegalArgumentException e) { listener.onFragmentFailure(resolved.queryId, resolved.stageId, resolved.shardIdStr, e); throw e; @@ -135,6 +145,13 @@ public FragmentResources executeFragmentStreaming(FragmentExecutionRequest reque } } + private record ResolvedExecution(FragmentResources resources, ResolvedFragment resolved) implements AutoCloseable { + @Override + public void close() throws Exception { + resources.close(); + } + } + /** * Async variant that forks fragment execution onto the given executor and streams * batches back through the channel. The transport thread returns immediately. @@ -151,8 +168,8 @@ public void executeFragmentStreamingAsync( LOGGER.debug("[FragmentExecution] shard={} task={}", shard.shardId(), task.getId()); final long startNanos = System.nanoTime(); long rowsProduced = 0; - try (FragmentResources ctx = executeFragmentStreaming(request, shard, task)) { - Iterator it = ctx.stream().iterator(); + try (ResolvedExecution exec = executeFragmentStreamingResolved(request, shard, task)) { + Iterator it = exec.resources().stream().iterator(); while (it.hasNext()) { EngineResultBatch batch = it.next(); rowsProduced += batch.getRowCount(); @@ -160,13 +177,30 @@ public void executeFragmentStreamingAsync( } long fragmentTookNanos = System.nanoTime() - startNanos; responseHandler.onComplete(); + ResolvedFragment resolved = exec.resolved(); + DelegationDescriptor delegation = resolved.plan().getDelegationDescriptor(); + boolean usedSecondaryIndex = delegation != null; + int delegatedPredicateCount = delegation != null ? delegation.delegatedPredicateCount() : 0; + String filterTreeShape = delegation != null ? 
delegation.treeShape().name() : null; + boolean hasPartialAggregate = resolved.plan().getInstructions().stream() + .anyMatch(n -> n.type() == org.opensearch.analytics.spi.InstructionType.SETUP_PARTIAL_AGGREGATE); + FragmentExecutionStats stats = new FragmentExecutionStats( + rowsProduced, + usedSecondaryIndex, + delegatedPredicateCount, + filterTreeShape, + hasPartialAggregate, + task.getId(), + task.getHeader(Task.X_OPAQUE_ID) + ); listener.onFragmentSuccess( request.getQueryId(), request.getStageId(), shard.shardId().toString(), fragmentTookNanos, rowsProduced, - shard.indexSettings() + shard.indexSettings(), + stats ); } catch (Exception e) { responseHandler.onFailure(e); diff --git a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLogTests.java b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLogTests.java index 572077761ef75..8dc64dadbe3dd 100644 --- a/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLogTests.java +++ b/sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLogTests.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.analytics.backend.FragmentExecutionStats; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.logging.Loggers; import org.opensearch.common.settings.Settings; @@ -52,7 +53,8 @@ public void testFragmentSlowLogFiresWhenAboveThreshold() throws Exception { ) ); - fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings); + fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings, + new FragmentExecutionStats(42, true, 2, "CONJUNCTIVE", false, 99, "opaque-1")); 
appender.assertAllExpectationsMatched(); } } @@ -74,7 +76,8 @@ public void testFragmentSlowLogDoesNotFireWhenBelowThreshold() throws Exception ) ); - fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings); + fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings, + new FragmentExecutionStats(42, true, 2, "CONJUNCTIVE", false, 99, "opaque-1")); appender.assertAllExpectationsMatched(); } } @@ -92,11 +95,12 @@ public void testFragmentSlowLogContainsAllExpectedFields() throws Exception { "all fields present", AnalyticsFragmentSlowLog.LOGGER_NAME, Level.WARN, - ".*took\\[.*\\].*took_millis\\[\\d+\\].*query_id\\[q-fields\\].*stage_id\\[2\\].*shard\\[\\[my-idx\\]\\[0\\]\\].*rows_produced\\[100\\].*" + ".*took\\[.*\\].*took_millis\\[\\d+\\].*query_id\\[q-fields\\].*stage_id\\[2\\].*shard\\[\\[my-idx\\]\\[0\\]\\].*rows_produced\\[100\\].*used_secondary_index\\[false\\].*partial_aggregate\\[true\\].*task_id\\[101\\].*id\\[opaque-2\\].*" ) ); - fragmentSlowLog.onFragmentSuccess("q-fields", 2, "[my-idx][0]", TimeValue.timeValueMillis(50).nanos(), 100, indexSettings); + fragmentSlowLog.onFragmentSuccess("q-fields", 2, "[my-idx][0]", TimeValue.timeValueMillis(50).nanos(), 100, indexSettings, + new FragmentExecutionStats(100, false, 0, null, true, 101, "opaque-2")); appender.assertAllExpectationsMatched(); } }