Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,16 @@ default void onPreFragmentExecution(String queryId, int stageId, String shardId)
* @param tookInNanos wall-clock time for the fragment execution
* @param rowsProduced number of rows produced by this fragment
* @param indexSettings the index settings for per-index slow log threshold lookup
* @param stats quantitative execution stats (rows scanned, bytes read, etc.)
*/
default void onFragmentSuccess(
String queryId,
int stageId,
String shardId,
long tookInNanos,
long rowsProduced,
IndexSettings indexSettings
IndexSettings indexSettings,
FragmentExecutionStats stats
) {}

/**
Expand Down Expand Up @@ -291,11 +293,12 @@ public void onFragmentSuccess(
String shardId,
long tookInNanos,
long rowsProduced,
IndexSettings indexSettings
IndexSettings indexSettings,
FragmentExecutionStats stats
) {
for (AnalyticsOperationListener l : delegates) {
try {
l.onFragmentSuccess(queryId, stageId, shardId, tookInNanos, rowsProduced, indexSettings);
l.onFragmentSuccess(queryId, stageId, shardId, tookInNanos, rowsProduced, indexSettings, stats);
} catch (Exception e) {
warn("onFragmentSuccess", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analytics.backend;

/**
* Execution stats for a completed fragment, carried alongside the slow log
* callback to provide insight into the work performed.
*
* @param rowsProduced rows returned by this fragment
* @param usedSecondaryIndex whether a secondary/inverted index was consulted for filtering
* @param delegatedPredicateCount number of predicates delegated to the secondary index (0 if none)
* @param filterTreeShape boolean tree shape for delegation (null if no delegation)
* @param hasPartialAggregate whether this fragment performs partial aggregation
* @param taskId the task ID for correlation with _tasks API
* @param opaqueId the X-Opaque-Id header for correlation with the client request (nullable)
*
* @opensearch.internal
*/
public record FragmentExecutionStats(
long rowsProduced,
boolean usedSecondaryIndex,
int delegatedPredicateCount,
String filterTreeShape,
boolean hasPartialAggregate,
long taskId,
String opaqueId
) {

public static final FragmentExecutionStats EMPTY = new FragmentExecutionStats(0, false, 0, null, false, -1, null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.analytics.backend.AnalyticsOperationListener;
import org.opensearch.analytics.backend.FragmentExecutionStats;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.SlowLogLevel;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -43,15 +44,16 @@ public void onFragmentSuccess(
String shardId,
long tookInNanos,
long rowsProduced,
IndexSettings indexSettings
IndexSettings indexSettings,
FragmentExecutionStats stats
) {
long warnThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING).nanos();
long infoThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING).nanos();
long debugThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING).nanos();
long traceThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING).nanos();
SlowLogLevel level = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL);

String message = formatMessage(queryId, stageId, shardId, tookInNanos, rowsProduced);
String message = formatMessage(queryId, stageId, shardId, tookInNanos, rowsProduced, stats);
if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) {
logger.warn(message);
} else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) {
Expand All @@ -63,19 +65,30 @@ public void onFragmentSuccess(
}
}

private static String formatMessage(String queryId, int stageId, String shardId, long tookInNanos, long rowsProduced) {
return "took["
+ TimeValue.timeValueNanos(tookInNanos)
+ "], took_millis["
+ TimeUnit.NANOSECONDS.toMillis(tookInNanos)
+ "], query_id["
+ queryId
+ "], stage_id["
+ stageId
+ "], shard["
+ shardId
+ "], rows_produced["
+ rowsProduced
+ "]";
private static String formatMessage(
String queryId,
int stageId,
String shardId,
long tookInNanos,
long rowsProduced,
FragmentExecutionStats stats
) {
StringBuilder sb = new StringBuilder();
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos));
sb.append("], took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
sb.append("], query_id[").append(queryId);
sb.append("], stage_id[").append(stageId);
sb.append("], shard[").append(shardId);
sb.append("], rows_produced[").append(rowsProduced);
sb.append("], used_secondary_index[").append(stats.usedSecondaryIndex());
if (stats.usedSecondaryIndex()) {
sb.append("], delegated_predicates[").append(stats.delegatedPredicateCount());
sb.append("], filter_tree_shape[").append(stats.filterTreeShape());
}
sb.append("], partial_aggregate[").append(stats.hasPartialAggregate());
sb.append("], task_id[").append(stats.taskId());
sb.append("], id[").append(stats.opaqueId() != null ? stats.opaqueId() : "");
sb.append("]");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.analytics.backend.AnalyticsOperationListener;
import org.opensearch.analytics.backend.EngineResultBatch;
import org.opensearch.analytics.backend.EngineResultStream;
import org.opensearch.analytics.backend.FragmentExecutionStats;
import org.opensearch.analytics.backend.SearchExecEngine;
import org.opensearch.analytics.backend.ShardScanExecutionContext;
import org.opensearch.analytics.exec.action.FetchByRowIdsRequest;
Expand Down Expand Up @@ -123,9 +124,18 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService service)
}

public FragmentResources executeFragmentStreaming(FragmentExecutionRequest request, IndexShard shard, AnalyticsShardTask task) {
return executeFragmentStreamingResolved(request, shard, task).resources;
}

private ResolvedExecution executeFragmentStreamingResolved(
FragmentExecutionRequest request,
IndexShard shard,
AnalyticsShardTask task
) {
ResolvedFragment resolved = resolveFragment(request, shard);
try {
return startFragment(request, resolved, shard, task);
FragmentResources resources = startFragment(request, resolved, shard, task);
return new ResolvedExecution(resources, resolved);
} catch (TaskCancelledException | IllegalStateException | IllegalArgumentException e) {
listener.onFragmentFailure(resolved.queryId, resolved.stageId, resolved.shardIdStr, e);
throw e;
Expand All @@ -135,6 +145,13 @@ public FragmentResources executeFragmentStreaming(FragmentExecutionRequest reque
}
}

private record ResolvedExecution(FragmentResources resources, ResolvedFragment resolved) implements AutoCloseable {
@Override
public void close() throws Exception {
resources.close();
}
}

/**
* Async variant that forks fragment execution onto the given executor and streams
* batches back through the channel. The transport thread returns immediately.
Expand All @@ -151,22 +168,39 @@ public void executeFragmentStreamingAsync(
LOGGER.debug("[FragmentExecution] shard={} task={}", shard.shardId(), task.getId());
final long startNanos = System.nanoTime();
long rowsProduced = 0;
try (FragmentResources ctx = executeFragmentStreaming(request, shard, task)) {
Iterator<EngineResultBatch> it = ctx.stream().iterator();
try (ResolvedExecution exec = executeFragmentStreamingResolved(request, shard, task)) {
Iterator<EngineResultBatch> it = exec.resources().stream().iterator();
while (it.hasNext()) {
EngineResultBatch batch = it.next();
rowsProduced += batch.getRowCount();
responseHandler.onBatch(batch);
}
long fragmentTookNanos = System.nanoTime() - startNanos;
responseHandler.onComplete();
ResolvedFragment resolved = exec.resolved();
DelegationDescriptor delegation = resolved.plan().getDelegationDescriptor();
boolean usedSecondaryIndex = delegation != null;
int delegatedPredicateCount = delegation != null ? delegation.delegatedPredicateCount() : 0;
String filterTreeShape = delegation != null ? delegation.treeShape().name() : null;
boolean hasPartialAggregate = resolved.plan().getInstructions().stream()
.anyMatch(n -> n.type() == org.opensearch.analytics.spi.InstructionType.SETUP_PARTIAL_AGGREGATE);
FragmentExecutionStats stats = new FragmentExecutionStats(
rowsProduced,
usedSecondaryIndex,
delegatedPredicateCount,
filterTreeShape,
hasPartialAggregate,
task.getId(),
task.getHeader(Task.X_OPAQUE_ID)
);
listener.onFragmentSuccess(
request.getQueryId(),
request.getStageId(),
shard.shardId().toString(),
fragmentTookNanos,
rowsProduced,
shard.indexSettings()
shard.indexSettings(),
stats
);
} catch (Exception e) {
responseHandler.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.analytics.backend.FragmentExecutionStats;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -52,7 +53,8 @@ public void testFragmentSlowLogFiresWhenAboveThreshold() throws Exception {
)
);

fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings);
fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings,
new FragmentExecutionStats(42, true, 2, "CONJUNCTIVE", false, 99, "opaque-1"));
appender.assertAllExpectationsMatched();
}
}
Expand All @@ -74,7 +76,8 @@ public void testFragmentSlowLogDoesNotFireWhenBelowThreshold() throws Exception
)
);

fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings);
fragmentSlowLog.onFragmentSuccess("q1", 0, "test-shard", TimeValue.timeValueMillis(5).nanos(), 42, indexSettings,
new FragmentExecutionStats(42, true, 2, "CONJUNCTIVE", false, 99, "opaque-1"));
appender.assertAllExpectationsMatched();
}
}
Expand All @@ -92,11 +95,12 @@ public void testFragmentSlowLogContainsAllExpectedFields() throws Exception {
"all fields present",
AnalyticsFragmentSlowLog.LOGGER_NAME,
Level.WARN,
".*took\\[.*\\].*took_millis\\[\\d+\\].*query_id\\[q-fields\\].*stage_id\\[2\\].*shard\\[\\[my-idx\\]\\[0\\]\\].*rows_produced\\[100\\].*"
".*took\\[.*\\].*took_millis\\[\\d+\\].*query_id\\[q-fields\\].*stage_id\\[2\\].*shard\\[\\[my-idx\\]\\[0\\]\\].*rows_produced\\[100\\].*used_secondary_index\\[false\\].*partial_aggregate\\[true\\].*task_id\\[101\\].*id\\[opaque-2\\].*"
)
);

fragmentSlowLog.onFragmentSuccess("q-fields", 2, "[my-idx][0]", TimeValue.timeValueMillis(50).nanos(), 100, indexSettings);
fragmentSlowLog.onFragmentSuccess("q-fields", 2, "[my-idx][0]", TimeValue.timeValueMillis(50).nanos(), 100, indexSettings,
new FragmentExecutionStats(100, false, 0, null, true, 101, "opaque-2"));
appender.assertAllExpectationsMatched();
}
}
Expand Down
Loading