Skip to content

Add slow log support for analytics engine indexing and search paths#21884

Open
himshikhagupta wants to merge 1 commit into
opensearch-project:mainfrom
himshikhagupta:analytics-slow-log
Open

Add slow log support for analytics engine indexing and search paths#21884
himshikhagupta wants to merge 1 commit into
opensearch-project:mainfrom
himshikhagupta:analytics-slow-log

Conversation

@himshikhagupta
Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta commented May 29, 2026

Description

Wire up slow logs for the DataFusion/Parquet engine so operators can identify slow operations through the analytics path, matching the observability available on the standard Lucene search/indexing path.

Indexing path: verified that the existing IndexingSlowLog fires correctly for DFA-backed indices
Search path (:

  • AnalyticsSearchSlowLog (coordinator): logs query completions exceeding cluster.search.request.slowlog.* thresholds. Captures query source (PPL/SQL text), X-Opaque-ID,
    and request ID via a per-query wrapper closure.
  • AnalyticsFragmentSlowLog (data node): logs fragment executions exceeding index.search.slowlog.threshold.query.* per-index thresholds, reading IndexSettings at call
    time -- same settings as SearchSlowLog.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@himshikhagupta himshikhagupta requested a review from a team as a code owner May 29, 2026 12:42
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 29, 2026

PR Reviewer Guide 🔍

(Review updated until commit 6506853)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The rowsProduced counter is incremented inside the while loop but only the final count is passed to onFragmentSuccess. If responseHandler.onBatch(batch) throws an exception after incrementing rowsProduced, the counter will be incorrect because the exception handler does not reset it. This could lead to misleading slow log entries showing rows that were never successfully sent.

try (FragmentResources ctx = executeFragmentStreaming(request, shard, task)) {
    Iterator<EngineResultBatch> it = ctx.stream().iterator();
    while (it.hasNext()) {
        EngineResultBatch batch = it.next();
        rowsProduced += batch.getRowCount();
        responseHandler.onBatch(batch);
    }
    long fragmentTookNanos = System.nanoTime() - startNanos;
    responseHandler.onComplete();
    listener.onFragmentSuccess(
        request.getQueryId(),
        request.getStageId(),
        shard.shardId().toString(),
        fragmentTookNanos,
        rowsProduced,
        shard.indexSettings()
    );
Possible Issue

The totalRows calculation uses rows instanceof List to determine the count, but batchesToRows returns an Iterable<Object[]> which may not always be a List. If the iterable is not a List (e.g., a lazy iterator), totalRows will always be 0, causing the slow log to report incorrect row counts. This happens when the result set is large or streamed.

ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(ActionListener.wrap(batches -> {
    Iterable<Object[]> rows = batchesToRows(batches, outputColumnOrder);
    long totalRows = rows instanceof List ? ((List<?>) rows).size() : 0;
    queryListener.onQueryComplete(dag.queryId(), System.nanoTime() - queryStartNanos, totalRows);
    rowsListener.onResponse(rows);
}, rowsListener::onFailure), () -> taskManager.unregister(queryTask));

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 29, 2026

PR Code Suggestions ✨

Latest suggestions up to 6506853

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Synchronize concurrent StringBuilder access

The stageTook StringBuilder is accessed without synchronization in a multi-threaded
context. Since multiple stages can complete concurrently and call onStageSuccess,
this creates a race condition that can corrupt the StringBuilder's internal state or
produce garbled output. Synchronize access to stageTook or use a thread-safe
alternative like ConcurrentHashMap to accumulate stage timings.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [136-139]

-public void onStageSuccess(String queryId, int stageId, String stageType, long tookInNanos, long rowsProcessed) {
+public synchronized void onStageSuccess(String queryId, int stageId, String stageType, long tookInNanos, long rowsProcessed) {
     if (stageTook.length() > 0) stageTook.append(", ");
     stageTook.append(stageType).append("=").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
 }
Suggestion importance[1-10]: 9

__

Why: The stageTook StringBuilder is accessed without synchronization in onStageSuccess, which can be called concurrently by multiple stages. This is a critical thread-safety issue that can corrupt the StringBuilder's internal state or produce garbled output in the slow log.

High
Ensure memory visibility for planning time

The planningTimeMs field is declared as non-volatile but is written in
onPlanningComplete and read in onQueryComplete, which may execute on different
threads. Without proper memory visibility guarantees, the read in onQueryComplete
might see a stale value (0) instead of the updated planning time. Declare
planningTimeMs as volatile to ensure visibility across threads.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [118]

-public void onPlanningComplete(String queryId, long tookInNanos) {
-    this.planningTimeMs = TimeUnit.NANOSECONDS.toMillis(tookInNanos);
-}
+private volatile long planningTimeMs;
Suggestion importance[1-10]: 8

__

Why: The planningTimeMs field is written in onPlanningComplete and read in onQueryComplete on potentially different threads without proper memory visibility guarantees. Making it volatile ensures the correct value is visible across threads.

Medium
General
Fix row count for non-List iterables

The totalRows calculation assumes that batchesToRows returns a List when it needs to
count rows, but if it returns a different Iterable type (e.g., a lazy iterator), the
count will incorrectly be 0. This causes the slow log to report total_rows[0] even
when rows were produced. Either materialize the iterable into a list before
counting, or iterate through it to count rows accurately.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [255-260]

 ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(ActionListener.wrap(batches -> {
     Iterable<Object[]> rows = batchesToRows(batches, outputColumnOrder);
-    long totalRows = rows instanceof List ? ((List<?>) rows).size() : 0;
+    long totalRows = 0;
+    if (rows instanceof List) {
+        totalRows = ((List<?>) rows).size();
+    } else {
+        for (Object[] ignored : rows) totalRows++;
+    }
     queryListener.onQueryComplete(dag.queryId(), System.nanoTime() - queryStartNanos, totalRows);
     rowsListener.onResponse(rows);
 }, rowsListener::onFailure), () -> taskManager.unregister(queryTask));
Suggestion importance[1-10]: 7

__

Why: The totalRows calculation assumes batchesToRows returns a List, but if it returns a different Iterable type, the count will be 0. The suggestion correctly handles both cases, though iterating through a non-List iterable may have performance implications.

Medium

Previous suggestions

Suggestions up to commit 4691e08
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix thread-safety in stage timing accumulation

The QuerySlowLogListener accumulates stage timing in a non-thread-safe
StringBuilder. If multiple stages complete concurrently, this can lead to corrupted
log output or race conditions. Use a thread-safe collection or synchronize access to
stageTook.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [136-139]

-public void testSlowLogContainsQuerySourceViaPPLFrontend() throws Exception {
-    setSlowLogThreshold(TimeValue.timeValueMillis(0));
-    createAndSeedIndex();
+private final ConcurrentHashMap<Integer, Long> stageTiming = new ConcurrentHashMap<>();
 
-    Logger queryLogger = LogManager.getLogger(QUERY_LOGGER);
-    Loggers.setLevel(queryLogger, Level.WARN);
-
-    try (MockLogAppender appender = MockLogAppender.createForLoggers(queryLogger)) {
-        appender.addExpectation(
-            new MockLogAppender.PatternSeenWithLoggerPrefixExpectation(
-                "source field contains PPL text",
-                QUERY_LOGGER,
-                Level.WARN,
-                ".*source\\[source = " + INDEX + ".*\\].*"
-            )
-        );
-
-        PPLResponse response = client().execute(
-            UnifiedPPLExecuteAction.INSTANCE,
-            new PPLRequest("source = " + INDEX + " | fields val")
-        ).actionGet();
-        assertFalse("PPL query must return rows", response.getRows().isEmpty());
-
-        appender.assertAllExpectationsMatched();
-    }
+@Override
+public void onStageSuccess(String queryId, int stageId, String stageType, long tookInNanos, long rowsProcessed) {
+    stageTiming.put(stageId, TimeUnit.NANOSECONDS.toMillis(tookInNanos));
 }
 
+@Override
+public void onQueryComplete(String queryId, long totalTookInNanos, long totalRows) {
+    StringBuilder stageTook = new StringBuilder();
+    stageTiming.forEach((id, millis) -> {
+        if (stageTook.length() > 0) stageTook.append(", ");
+        stageTook.append("stage").append(id).append("=").append(millis);
+    });
+    // ... rest of method
+}
+
Suggestion importance[1-10]: 8

__

Why: The StringBuilder stageTook in QuerySlowLogListener is not thread-safe and could be accessed concurrently by multiple stage completion events, leading to corrupted log output. This is a legitimate concurrency issue that could affect production systems.

Medium
General
Ensure accurate row counting for slow logs

The totalRows calculation assumes batchesToRows returns a List, but if it returns a
different Iterable type, the count will be zero. This leads to incorrect row counts
in slow logs. Ensure accurate row counting regardless of the returned type.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [255-260]

 ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(ActionListener.wrap(batches -> {
     Iterable<Object[]> rows = batchesToRows(batches, outputColumnOrder);
-    long totalRows = rows instanceof List ? ((List<?>) rows).size() : 0;
+    long totalRows = 0;
+    for (Object[] ignored : rows) {
+        totalRows++;
+    }
     queryListener.onQueryComplete(dag.queryId(), System.nanoTime() - queryStartNanos, totalRows);
     rowsListener.onResponse(rows);
 }, rowsListener::onFailure), () -> taskManager.unregister(queryTask));
Suggestion importance[1-10]: 7

__

Why: The current implementation assumes batchesToRows returns a List for row counting, which may not always be true. If it returns a different Iterable, the row count will be zero, leading to inaccurate slow log metrics. However, iterating through all rows just to count them could have performance implications.

Medium
Sanitize stage type before logging

The stageType parameter is appended directly to the slow log without validation. If
stageType is null or contains special characters, it could corrupt the log format or
cause parsing issues. Validate and sanitize stageType before appending.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [136-139]

 @Override
 public void onStageSuccess(String queryId, int stageId, String stageType, long tookInNanos, long rowsProcessed) {
     if (stageTook.length() > 0) stageTook.append(", ");
-    stageTook.append(stageType).append("=").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
+    String sanitizedType = stageType != null ? stageType.replaceAll("[\\[\\],=]", "_") : "unknown";
+    stageTook.append(sanitizedType).append("=").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
 }
Suggestion importance[1-10]: 4

__

Why: While sanitizing stageType could prevent log format corruption, the suggestion assumes stageType might contain problematic characters without evidence from the codebase. The improvement is defensive but may be unnecessary if stageType values are controlled internally.

Low
Suggestions up to commit cd3bff4
CategorySuggestion                                                                                                                                    Impact
General
Fix row count calculation accuracy

The row count calculation assumes rows is a List, but batchesToRows may return other
Iterable types. If it's not a List, totalRows defaults to 0, which produces
incorrect metrics. Consider iterating through the results to count rows accurately,
or ensure batchesToRows always returns a List.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [255-260]

 ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(ActionListener.wrap(batches -> {
     Iterable<Object[]> rows = batchesToRows(batches, outputColumnOrder);
-    long totalRows = rows instanceof List ? ((List<?>) rows).size() : 0;
+    long totalRows = 0;
+    if (rows instanceof Collection) {
+        totalRows = ((Collection<?>) rows).size();
+    } else {
+        for (Object[] ignored : rows) {
+            totalRows++;
+        }
+    }
     queryListener.onQueryComplete(dag.queryId(), System.nanoTime() - queryStartNanos, totalRows);
     rowsListener.onResponse(rows);
 }, rowsListener::onFailure), () -> taskManager.unregister(queryTask));
Suggestion importance[1-10]: 6

__

Why: Valid concern about row counting accuracy when rows is not a List. The suggestion improves robustness by handling both Collection and general Iterable types. However, the impact is moderate since the current code already handles the common case, and the edge case may be rare in practice.

Low
Suggestions up to commit 1f6c58a
CategorySuggestion                                                                                                                                    Impact
General
Record slow log before response completion

The slow log callback listener.onFragmentSuccess is invoked after
responseHandler.onComplete(), but if onComplete() throws an exception, the slow log
will never fire. Move the slow log call before onComplete() to ensure timing is
always recorded, even if response handling fails.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [153-169]

 try (FragmentResources ctx = executeFragmentStreaming(request, shard, task)) {
     Iterator<EngineResultBatch> it = ctx.stream().iterator();
     while (it.hasNext()) {
         EngineResultBatch batch = it.next();
         rowsProduced += batch.getRowCount();
         responseHandler.onBatch(batch);
     }
     long fragmentTookNanos = System.nanoTime() - startNanos;
-    responseHandler.onComplete();
     listener.onFragmentSuccess(
         request.getQueryId(),
         request.getStageId(),
         shard.shardId().toString(),
         fragmentTookNanos,
         rowsProduced,
         shard.indexSettings()
     );
+    responseHandler.onComplete();
 } catch (Exception e) {
     responseHandler.onFailure(e);
 }
Suggestion importance[1-10]: 8

__

Why: Moving listener.onFragmentSuccess before responseHandler.onComplete() ensures the slow log is recorded even if onComplete() throws an exception. This improves reliability of the slow log mechanism and prevents loss of timing data in edge cases.

Medium
Possible issue
Ensure thread-safe stage timing collection

The stageTook StringBuilder is not thread-safe and could be accessed concurrently if
multiple stages complete simultaneously. Consider using a thread-safe alternative
like ConcurrentHashMap or synchronizing access to prevent data corruption in the
stage timing log.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [96-104]

 public AnalyticsOperationListener wrapWithQueryMetadata(String querySource, String opaqueId, String requestId) {
     return new AnalyticsOperationListener() {
-        private final StringBuilder stageTook = new StringBuilder();
+        private final ConcurrentHashMap<Integer, String> stageTiming = new ConcurrentHashMap<>();
 
         @Override
         public void onStageSuccess(String queryId, int stageId, String stageType, long tookInNanos, long rowsProcessed) {
-            if (stageTook.length() > 0) stageTook.append(", ");
-            stageTook.append(stageType).append("=").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos));
+            stageTiming.put(stageId, stageType + "=" + TimeUnit.NANOSECONDS.toMillis(tookInNanos));
         }
 
+        @Override
+        public void onQuerySuccess(String queryId, long tookInNanos, long totalRows) {
+            String stageTook = stageTiming.values().stream().collect(Collectors.joining(", "));
+            logQuery(queryId, tookInNanos, totalRows, querySource, opaqueId, requestId, stageTook);
+        }
+    };
+}
+
Suggestion importance[1-10]: 7

__

Why: The StringBuilder used for stageTook could face thread-safety issues if multiple stages complete concurrently. However, the AnalyticsOperationListener callbacks are typically invoked sequentially within the query execution flow, making this a lower-probability issue. The suggestion is valid but may not be critical in practice.

Medium
Suggestions up to commit d84b0ac
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent memory leak in stage maps

The per-query listener closure captures stage metadata in unbounded maps that are
never cleaned up. If a stage fails or the query is cancelled before onQuerySuccess
fires, these maps will leak memory indefinitely. Add cleanup in onQueryFailure or
use weak references.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java [100-101]

 private final Map<Integer, String> stageTypes = new ConcurrentHashMap<>();
 private final Map<String, Long> stageTiming = new ConcurrentHashMap<>();
 
+@Override
+public void onQueryFailure(String queryId, Exception e) {
+    stageTypes.clear();
+    stageTiming.clear();
+}
+
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a memory leak where stageTypes and stageTiming maps are never cleaned up if a query fails or is cancelled. This is a significant issue that could lead to unbounded memory growth in production systems.

Medium
General
Ensure slow log fires on completion

If responseHandler.onComplete() throws an exception, the slow log callback will
never fire, causing silent loss of timing data. Move the slow log call before
onComplete() or wrap it in a try-finally block to guarantee execution.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [160-169]

 long fragmentTookNanos = System.nanoTime() - startNanos;
-responseHandler.onComplete();
-listener.onFragmentSuccess(
-    request.getQueryId(),
-    request.getStageId(),
-    shard.shardId().toString(),
-    fragmentTookNanos,
-    rowsProduced,
-    shard.indexSettings()
-);
+try {
+    responseHandler.onComplete();
+} finally {
+    listener.onFragmentSuccess(
+        request.getQueryId(),
+        request.getStageId(),
+        shard.shardId().toString(),
+        fragmentTookNanos,
+        rowsProduced,
+        shard.indexSettings()
+    );
+}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that if responseHandler.onComplete() throws an exception, the slow log callback won't fire. Using a try-finally block ensures the slow log is always recorded, which is important for observability and debugging.

Medium
Cache per-index slow log thresholds

Reading five index settings on every fragment completion creates unnecessary
overhead. Cache these values per-index and update them via an IndexSettings listener
to avoid repeated lookups on the hot path.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java [48-52]

-long warnThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING).nanos();
-long infoThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING).nanos();
-long debugThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING).nanos();
-long traceThreshold = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING).nanos();
-SlowLogLevel level = indexSettings.getValue(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL);
+private final Map<String, CachedThresholds> thresholdCache = new ConcurrentHashMap<>();
 
+private CachedThresholds getThresholds(IndexSettings indexSettings) {
+    return thresholdCache.computeIfAbsent(indexSettings.getIndex().getName(), k -> {
+        CachedThresholds cached = new CachedThresholds(indexSettings);
+        indexSettings.getScopedSettings().addSettingsUpdateConsumer(..., cached::update);
+        return cached;
+    });
+}
+
Suggestion importance[1-10]: 6

__

Why: The suggestion addresses a performance optimization by caching per-index settings to avoid repeated lookups. While valid, the impact is moderate since the overhead of reading settings is relatively small compared to the fragment execution itself.

Low
Suggestions up to commit c81a5e1
CategorySuggestion                                                                                                                                    Impact
General
Reorder callback invocation for accuracy

The onFragmentSuccess callback is invoked after onComplete(), which may cause the
slow log to fire after the response has already been sent. If onComplete() triggers
downstream processing or closes resources, the timing measurement could be
inaccurate. Move the onFragmentSuccess call before onComplete() to ensure accurate
timing and proper ordering of lifecycle events.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [152-169]

 long rowsProduced = 0;
             try (FragmentResources ctx = executeFragmentStreaming(request, shard, task)) {
                 Iterator<EngineResultBatch> it = ctx.stream().iterator();
                 while (it.hasNext()) {
                     EngineResultBatch batch = it.next();
                     rowsProduced += batch.getRowCount();
                     responseHandler.onBatch(batch);
                 }
                 long fragmentTookNanos = System.nanoTime() - startNanos;
-                responseHandler.onComplete();
                 listener.onFragmentSuccess(
                     request.getQueryId(),
                     request.getStageId(),
                     shard.shardId().toString(),
                     fragmentTookNanos,
                     rowsProduced,
                     shard.indexSettings()
                 );
+                responseHandler.onComplete();
             } catch (Exception e) {
                 responseHandler.onFailure(e);
             }
Suggestion importance[1-10]: 7

__

Why: Moving onFragmentSuccess before onComplete() ensures the slow log captures timing before any downstream processing triggered by onComplete(). This improves accuracy and maintains proper lifecycle ordering, though the impact is moderate since onComplete() likely doesn't significantly affect timing in most cases.

Medium
Track timing for failed fragments

When an exception occurs during fragment execution, the onFragmentSuccess callback
is never invoked, meaning slow log entries are lost for failed fragments. Consider
adding an onFragmentFailure callback to the listener interface to capture timing
data even when fragments fail, ensuring complete observability of all fragment
executions regardless of outcome.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [170-172]

 } catch (Exception e) {
+                long fragmentTookNanos = System.nanoTime() - startNanos;
+                listener.onFragmentFailure(
+                    request.getQueryId(),
+                    request.getStageId(),
+                    shard.shardId().toString(),
+                    fragmentTookNanos,
+                    e
+                );
                 responseHandler.onFailure(e);
             }
Suggestion importance[1-10]: 6

__

Why: Adding failure tracking would improve observability by capturing timing data for failed fragments. However, this requires interface changes (onFragmentFailure method) not present in the current PR, making it more of an enhancement suggestion than a fix for existing code. The suggestion is valid but represents additional functionality beyond the PR's scope.

Low

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 6b53f82: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.43%. Comparing base (dad63c0) to head (6506853).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21884      +/-   ##
============================================
- Coverage     73.51%   73.43%   -0.09%     
+ Complexity    75582    75521      -61     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251627     -291     
- Misses        70712    71038     +326     
+ Partials      20031    19996      -35     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 9b26f6f

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit c81a5e1.

PathLineSeverityDescription
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchSlowLog.java133lowThe slow log records the full query source text (SQL/PPL), opaque ID, and request ID in log messages. While this mirrors existing OpenSearch SearchRequestSlowLog behavior, queries may contain sensitive data (e.g., filter values, PII in WHERE clauses) that would be written to disk in log files. No exfiltration to external endpoints is present — this is an operational data-exposure concern, not malicious.
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsFragmentSlowLog.java36lowConstructor unconditionally sets the logger level to TRACE via Loggers.setLevel(). This mirrors the existing OpenSearch SearchSlowLog pattern, but hard-coding log level in a constructor could suppress or override operator-configured levels for the shared 'index.search.slowlog' logger namespace at startup, potentially increasing log verbosity beyond what operators expect.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c81a5e1

String requestId
) {
StringBuilder sb = new StringBuilder();
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], ");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend this to add critical metrics

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c81a5e1: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit d84b0ac

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 1f6c58a

Copy link
Copy Markdown
Contributor

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if slow logs can we wired up with AnalyticsOperationListener to logs end to end breakdown per query @mch2 thought?

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 1f6c58a: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit cd3bff4

@himshikhagupta
Copy link
Copy Markdown
Contributor Author

himshikhagupta commented May 30, 2026

I was wondering if slow logs can we wired up with AnalyticsOperationListener to logs end to end breakdown per query @mch2 thought?

@Bukhtawar this already implements AnalyticsOperationListener and covers complete query lifecycle. All wired through AnalyticsOperationListener events: onPlanningComplete → onStageSuccess → onQueryComplete for coordinator, onFragmentSuccess for data node.

    ┌──────────────────────┬────────────────────────────────────────────────────────────────┬──────────┐
    │        Field         │                          Description                           │ Covered? │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ took / took_millis   │ Total end-to-end time (planning + execution + materialization) │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ planning_time_millis │ Calcite CBO + DAG build + fragment conversion                  │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ stage_took_millis    │ Per-stage breakdown (ShardFragment, Reduce, etc.)              │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ query_id             │ Unique query identifier for correlation                        │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ total_rows           │ Actual row count after materialization                         │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ source               │ Original PPL/SQL query text                                    │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ id                   │ X-Opaque-ID (client correlation)                               │    ✅    │
    ├──────────────────────┼────────────────────────────────────────────────────────────────┼──────────┤
    │ request_id           │ Internal request ID                                            │    ✅    │
    └──────────────────────┴────────────────────────────────────────────────────────────────┴──────────┘
    
    Fragment-level (per-shard):
    
    ┌──────────────────────┬─────────────────────────────────────────────┬──────────┐
    │        Field         │                 Description                 │ Covered? │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ took / took_millis   │ Fragment execution time                     │    ✅    │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ query_id             │ Query correlation                           │    ✅    │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ stage_id             │ Stage identifier                            │    ✅    │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ shard                │ Index + shard number                        │    ✅    │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ rows_produced        │ Rows produced by this shard                 │    ✅    │
    ├──────────────────────┼─────────────────────────────────────────────┼──────────┤
    │ Per-index thresholds │ Uses index.search.slowlog.threshold.query.* │    ✅    │
    └──────────────────────┴─────────────────────────────────────────────┴──────────┘
    

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 4691e08

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 4691e08: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Wire up slow logs for the DataFusion/Parquet engine so operators can
identify slow operations through the analytics path, matching the
observability available on the standard Lucene search/indexing path.

Indexing path: verified that the existing IndexingSlowLog fires correctly
for DFA-backed indices (same IndexShard.index() listener sandwich).

Search path:
- AnalyticsSearchSlowLog (coordinator): logs query completions exceeding
  cluster.search.request.slowlog.* thresholds. Captures query source
  (PPL/SQL text), X-Opaque-ID, and request ID via a per-query wrapper
  closure pattern.
- AnalyticsFragmentSlowLog (data node): logs fragment executions exceeding
  index.search.slowlog.threshold.query.* per-index thresholds, reading
  IndexSettings at call time (same settings as SearchSlowLog).

Both log to the existing slow log files — no new log4j2 configuration
required.

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6506853

sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(totalTookInNanos)).append("], ");
sb.append("planning_time_millis[").append(planningTimeMs).append("], ");
sb.append("stage_took_millis[{").append(stageTook).append("}], ");
sb.append("query_id[").append(queryId).append("], ");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would query id be useful?

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 6506853: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants