Skip to content

Refactor datafusion plugin stats to node level metrics#21846

Open
AjayRajNelapudi wants to merge 3 commits into
opensearch-project:mainfrom
AjayRajNelapudi:feature/df-node-level-stats
Open

Refactor datafusion plugin stats to node level metrics#21846
AjayRajNelapudi wants to merge 3 commits into
opensearch-project:mainfrom
AjayRajNelapudi:feature/df-node-level-stats

Conversation

@AjayRajNelapudi
Copy link
Copy Markdown
Contributor

Description

Refactor datafusion plugin stats to node level metrics

AI generated test report

DataFusion Cluster Stats — Endpoint Test Report

Test 1: All stats from all nodes

Expected: Returns _nodes, cluster_name, nodes with all 8 stat sections. No name/host/transport_address.

Actual:

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "runTask",
  "nodes": {
    "iXuve5W0TZqx77qY_CY9pQ": {
      "io_runtime": {
        "workers_count": 16,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      },
      "cpu_runtime": {
        "workers_count": 8,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      },
      "coordinator_reduce": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "query_execution": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "stream_next": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "plan_setup": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "datanode_gate": {
        "max_permits": 12,
        "active_permits": 0,
        "total_wait_duration_ms": 0,
        "total_batches_started": 0
      },
      "coordinator_gate": {
        "max_permits": 12,
        "active_permits": 0,
        "total_wait_duration_ms": 0,
        "total_batches_started": 0
      }
    }
  }
}

Result: ✅ PASS

Test 2: Local node only (_local)

Expected: Returns stats for local node only.

Actual:

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "runTask",
  "nodes": {
    "iXuve5W0TZqx77qY_CY9pQ": {
      "io_runtime": {
        "workers_count": 16,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      },
      "cpu_runtime": {
        "workers_count": 8,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      },
      "coordinator_reduce": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "query_execution": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "stream_next": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "plan_setup": {
        "total_poll_duration_ms": 0,
        "total_scheduled_duration_ms": 0,
        "total_idle_duration_ms": 0
      },
      "datanode_gate": {
        "max_permits": 12,
        "active_permits": 0,
        "total_wait_duration_ms": 0,
        "total_batches_started": 0
      },
      "coordinator_gate": {
        "max_permits": 12,
        "active_permits": 0,
        "total_wait_duration_ms": 0,
        "total_batches_started": 0
      }
    }
  }
}

Result: ✅ PASS

Test 3: Single stat filter (io_runtime)

Expected: Only io_runtime section per node.

Actual:

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "runTask",
  "nodes": {
    "iXuve5W0TZqx77qY_CY9pQ": {
      "io_runtime": {
        "workers_count": 16,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      }
    }
  }
}

Result: ✅ PASS

Test 4: Multiple stat filter (io_runtime,cpu_runtime)

Expected: Only io_runtime and cpu_runtime.

Actual:

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "runTask",
  "nodes": {
    "iXuve5W0TZqx77qY_CY9pQ": {
      "io_runtime": {
        "workers_count": 16,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      },
      "cpu_runtime": {
        "workers_count": 8,
        "total_polls_count": 0,
        "total_busy_duration_ms": 0,
        "total_overflow_count": 0,
        "global_queue_depth": 0,
        "blocking_queue_depth": 0,
        "num_alive_tasks": 0,
        "spawned_tasks_count": 0,
        "total_local_queue_depth": 0
      }
    }
  }
}

Result: ✅ PASS

Test 5: Node + stat filter (_local/datanode_gate)

Expected: Only datanode_gate for local node.

Actual:

{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "runTask",
  "nodes": {
    "iXuve5W0TZqx77qY_CY9pQ": {
      "datanode_gate": {
        "max_permits": 12,
        "active_permits": 0,
        "total_wait_duration_ms": 0,
        "total_batches_started": 0
      }
    }
  }
}

Result: ✅ PASS

Test 6: Invalid stat name → HTTP 400

Expected: HTTP 400 with error listing valid names.

Actual: HTTP 400
Result: ✅ PASS

Test 7: Mixed valid+invalid → HTTP 400

Expected: HTTP 400 even with one invalid stat.

Actual: HTTP 400
Result: ✅ PASS

Test 8: Deprecated legacy route

Expected: HTTP 200 with same data via legacy prefix.

Actual: HTTP 200
Result: ✅ PASS

Test 9: No IP leakage

Expected: No name/host/transport_address in per-node entries.

Actual: Node keys = ['io_runtime', 'cpu_runtime', 'coordinator_reduce', 'query_execution', 'stream_next', 'plan_setup', 'datanode_gate', 'coordinator_gate']
IP Leak: False
Result: ✅ PASS

Summary

# Test Result
1 All stats from all nodes ✅ PASS
2 Local node only (_local) ✅ PASS
3 Single stat filter (io_runtime) ✅ PASS
4 Multiple stat filter (io_runtime,cpu_runtime) ✅ PASS
5 Node + stat filter (_local/datanode_gate) ✅ PASS
6 Invalid stat name → HTTP 400 ✅ PASS
7 Mixed valid+invalid → HTTP 400 ✅ PASS
8 Deprecated legacy route ✅ PASS
9 No IP leakage ✅ PASS

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@AjayRajNelapudi AjayRajNelapudi requested a review from a team as a code owner May 27, 2026 07:13
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 2a99a20.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-datafusion/build.gradle15highBuild plugin added: 'apply plugin: opensearch.internal-cluster-test'. Mandatory flag: any build plugin change must be flagged for maintainer verification regardless of apparent internal origin.
sandbox/plugins/analytics-backend-datafusion/build.gradle107highSix new internalClusterTestImplementation module dependencies added (arrow-flight-rpc, analytics-engine, analytics-backend-lucene, composite-engine, parquet-data-format, test-ppl-frontend). Mandatory flag: any dependency or module addition must be flagged for maintainer verification. These appear to be same-repository project references, but policy requires explicit review.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 2 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
@AjayRajNelapudi AjayRajNelapudi force-pushed the feature/df-node-level-stats branch from 0c6b8d0 to c650d10 Compare May 27, 2026 07:44
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

PR Reviewer Guide 🔍

(Review updated until commit 2a99a20)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The constructor no longer validates that ioRuntime is non-null, but the deserialization constructor reads it as optional (readOptionalWriteable). If existing serialized data on the wire contains a non-null ioRuntime, but new code constructs a NativeExecutorsStats with ioRuntime=null and serializes it, the recipient may encounter unexpected null values. This breaks wire compatibility if any code path still expects ioRuntime to always be present.

public NativeExecutorsStats(RuntimeMetrics ioRuntime, RuntimeMetrics cpuRuntime, Map<String, TaskMonitorStats> taskMonitors) {
    this.ioRuntime = ioRuntime;
    this.cpuRuntime = cpuRuntime;
    this.taskMonitors = Objects.requireNonNull(taskMonitors);
}
Serialization Mismatch

The serialization format for taskMonitors changed from a fixed-order iteration over OperationType.values() to a dynamic map iteration. If a node running the old code (which writes 4 monitors in enum order) sends data to a node running the new code (which reads a variable-length map), or vice versa, deserialization will fail or produce incorrect data. This is a breaking wire format change that requires a version guard or migration strategy.

public NativeExecutorsStats(StreamInput in) throws IOException {
    this.ioRuntime = in.readOptionalWriteable(RuntimeMetrics::new);
    this.cpuRuntime = in.readBoolean() ? new RuntimeMetrics(in) : null;

    int monitorCount = in.readVInt();
    this.taskMonitors = new LinkedHashMap<>(monitorCount);
    for (int i = 0; i < monitorCount; i++) {
        String key = in.readString();
        this.taskMonitors.put(key, new TaskMonitorStats(in));
    }
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeOptionalWriteable(ioRuntime);
    if (cpuRuntime != null) {
        out.writeBoolean(true);
        cpuRuntime.writeTo(out);
    } else {
        out.writeBoolean(false);
    }
    // Write the number of task monitors followed by each entry
    out.writeVInt(taskMonitors.size());
    for (Map.Entry<String, TaskMonitorStats> entry : taskMonitors.entrySet()) {
        out.writeString(entry.getKey());
        entry.getValue().writeTo(out);
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

PR Code Suggestions ✨

Latest suggestions up to 2a99a20

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Replace Java 16 toList() with Java 8 compatible Collectors.toList()

The toList() method was introduced in Java 16, but the plugin targets Java 1.8 (as
seen in PluginInfo construction). Use Collectors.toList() instead to maintain
compatibility with Java 8.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [120-122]

 List<String> invalidStats = Arrays.stream(requestedStats)
     .filter(s -> !VALID_STAT_NAMES.contains(s))
-    .toList();
+    .collect(Collectors.toList());
Suggestion importance[1-10]: 9

__

Why: The toList() method is a Java 16 feature, but the plugin targets Java 1.8 (as evidenced by the PluginInfo construction in the test files). Using Collectors.toList() is essential for Java 8 compatibility and prevents compilation/runtime failures.

High
General
Add logging for service initialization failures

The catch block silently swallows the IllegalStateException without logging. This
makes debugging difficult when the service fails to start. Add logging at WARN or
DEBUG level to track when nodes return null stats.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/TransportDataFusionStatsAction.java [92-100]

 public DataFusionStatsNodeResponse nodeOperation(DataFusionStatsNodeRequest request) {
     DataFusionStats stats;
     try {
         stats = dataFusionService.getStats();
     } catch (IllegalStateException e) {
-        // DataFusionService not started on this node — return null stats
+        logger.debug("DataFusionService not started on node {}: {}", clusterService.localNode().getId(), e.getMessage());
         stats = null;
     }
     return new DataFusionStatsNodeResponse(clusterService.localNode(), filteredStats(stats, request.getStatsToRetrieve()));
 }
Suggestion importance[1-10]: 6

__

Why: Adding logging for the caught IllegalStateException improves debuggability when DataFusionService is not started. However, this is a minor enhancement since returning null stats is an expected behavior for nodes where the service isn't initialized.

Low
Replace null NodeClient with NoOpNodeClient

Passing null as the NodeClient parameter is risky even though validation happens
before client usage. If the handler implementation changes to use the client
earlier, this will cause a NullPointerException. Use a NoOpNodeClient instead for
safer testing.

sandbox/plugins/analytics-backend-datafusion/src/propertyTest/java/org/opensearch/be/datafusion/action/stats/InvalidStatNameRejectionPropertyTests.java [94-109]

 @Property(tries = 150)
 void invalidStatNameReturnsHttp400WithValidNamesList(
     @ForAll("invalidStatNames") String invalidStat
 ) throws Exception {
     ...
-    handler.handleRequest(request, channel, null);
+    try (NodeClient client = new NoOpNodeClient("property-test")) {
+        handler.handleRequest(request, channel, client);
+    }
Suggestion importance[1-10]: 5

__

Why: Using NoOpNodeClient instead of null is a safer testing practice that prevents potential NullPointerException if the handler implementation changes. However, the current code is correct since validation occurs before client usage, making this a defensive improvement rather than a bug fix.

Low

Previous suggestions

Suggestions up to commit 2a99a20
CategorySuggestion                                                                                                                                    Impact
Possible issue
Replace Java 16 toList() method

The toList() method was introduced in Java 16, but OpenSearch targets Java 11
compatibility. Use Collectors.toList() instead to ensure compatibility with the
project's Java version requirements.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [120-122]

 List<String> invalidStats = Arrays.stream(requestedStats)
     .filter(s -> !VALID_STAT_NAMES.contains(s))
-    .toList();
+    .collect(Collectors.toList());
Suggestion importance[1-10]: 9

__

Why: This is a critical compatibility issue. Using .toList() (Java 16+) breaks Java 11 compatibility, which is OpenSearch's target version. This will cause compilation failures on Java 11 environments. The fix is straightforward: replace with .collect(Collectors.toList()).

High
Suggestions up to commit 79f5fd2
CategorySuggestion                                                                                                                                    Impact
General
Ensure consistent immutability across constructors

The deserialization constructor creates an unmodifiable set wrapper, but the primary
constructor stores the input set directly without wrapping. This inconsistency could
lead to unexpected mutability behavior. Ensure both constructors produce the same
immutability guarantees.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/DataFusionStatsNodeRequest.java [37-39]

-public DataFusionStatsNodeRequest(StreamInput in) throws IOException {
-    super(in);
-    int size = in.readVInt();
-    if (size == 0) {
-        this.statsToRetrieve = Collections.emptySet();
-    } else {
-        Set<String> stats = new HashSet<>(size);
-        for (int i = 0; i < size; i++) {
-            stats.add(in.readString());
-        }
-        this.statsToRetrieve = Collections.unmodifiableSet(stats);
-    }
+public DataFusionStatsNodeRequest(DataFusionStatsNodesRequest nodesRequest) {
+    this.statsToRetrieve = nodesRequest.getStatsToRetrieve() != null 
+        ? Collections.unmodifiableSet(new HashSet<>(nodesRequest.getStatsToRetrieve()))
+        : Collections.emptySet();
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies an immutability inconsistency between constructors. The deserialization constructor wraps the set in Collections.unmodifiableSet(), while the primary constructor stores the input directly. However, the impact is moderate since getStatsToRetrieve() returns the set directly without defensive copying anyway.

Low
Add defensive copy for map parameter

The constructor now accepts nullable ioRuntime for filtered stats but stores the raw
taskMonitors map reference without defensive copying. If the caller mutates the map
after construction, internal state becomes inconsistent. Make a defensive copy.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/stats/NativeExecutorsStats.java [68-72]

 public NativeExecutorsStats(RuntimeMetrics ioRuntime, RuntimeMetrics cpuRuntime, Map<String, TaskMonitorStats> taskMonitors) {
     this.ioRuntime = ioRuntime;
     this.cpuRuntime = cpuRuntime;
-    this.taskMonitors = Objects.requireNonNull(taskMonitors);
+    this.taskMonitors = Collections.unmodifiableMap(new LinkedHashMap<>(Objects.requireNonNull(taskMonitors)));
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that the taskMonitors map is stored without defensive copying, which could allow external mutation. However, the getTaskMonitors() method returns the map directly without protection, so the defensive copy alone doesn't fully solve the issue. The impact is moderate.

Low
Use structured error response format

The error response uses string concatenation to build the error message, which
produces a raw list format like [stat1, stat2]. This is inconsistent with standard
OpenSearch error responses. Use structured JSON fields for better client parsing.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [120-131]

-List<String> invalidStats = Arrays.stream(requestedStats)
-    .filter(s -> !VALID_STAT_NAMES.contains(s))
-    .toList();
 if (!invalidStats.isEmpty()) {
     return channel -> {
         XContentBuilder builder = channel.newBuilder();
         builder.startObject();
-        builder.field("error", "Invalid stat sections: " + invalidStats + ". Valid values are: " + VALID_STATS);
+        builder.field("error", "Invalid stat sections");
+        builder.field("invalid_sections", invalidStats);
+        builder.field("valid_sections", VALID_STAT_NAMES);
         builder.endObject();
         channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder));
     };
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion improves error response structure by using separate JSON fields instead of string concatenation. This enhances client parsing and consistency with OpenSearch patterns. However, the current implementation is functional and the improvement is primarily stylistic.

Low
Return null when filter matches nothing

When all three components (filteredNativeStats, datanodeGate, coordinatorGate) are
null after filtering, the method returns a DataFusionStats with all null fields.
This could cause issues downstream. Consider returning null instead when no stats
match the filter.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/TransportDataFusionStatsAction.java [126-170]

-static DataFusionStats filteredStats(DataFusionStats stats, Set<String> filter) {
-    if (stats == null) {
-        return null;
-    }
-    if (filter == null || filter.isEmpty()) {
-        return stats;
-    }
-    ...
-    return new DataFusionStats(filteredNativeStats, datanodeGate, coordinatorGate);
+if (filteredNativeStats == null && datanodeGate == null && coordinatorGate == null) {
+    return null;
 }
+return new DataFusionStats(filteredNativeStats, datanodeGate, coordinatorGate);
Suggestion importance[1-10]: 4

__

Why: The suggestion proposes returning null when all filtered components are null, but this is a design choice rather than a bug. The current behavior of returning a DataFusionStats with all null fields is valid and allows downstream code to distinguish between "no stats available" and "stats filtered to nothing". The impact is low.

Low
Suggestions up to commit c650d10
CategorySuggestion                                                                                                                                    Impact
General
Report all invalid stats together

The validation loop returns immediately on the first invalid stat, but when multiple
invalid stats are provided, only the first is reported. Collect all invalid stats
and report them together to provide complete feedback in a single request-response
cycle.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [119-130]

+List<String> invalidStats = new ArrayList<>();
 for (String stat : requestedStats) {
     if (!VALID_STAT_NAMES.contains(stat)) {
-        return channel -> {
-            XContentBuilder builder = channel.newBuilder();
-            builder.startObject();
-            builder.field("error", "Invalid stat sections: [" + stat + "]. Valid values are: " + VALID_STATS);
-            builder.endObject();
-            channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder));
-        };
+        invalidStats.add(stat);
     }
 }
+if (!invalidStats.isEmpty()) {
+    return channel -> {
+        XContentBuilder builder = channel.newBuilder();
+        builder.startObject();
+        builder.field("error", "Invalid stat sections: " + invalidStats + ". Valid values are: " + VALID_STATS);
+        builder.endObject();
+        channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder));
+    };
+}
Suggestion importance[1-10]: 7

__

Why: Collecting and reporting all invalid stat names in a single response improves user experience by providing complete feedback. This is a meaningful usability improvement that reduces the number of request-response cycles needed to identify all validation errors.

Medium
Add logging for service unavailability

The catch block silently swallows the IllegalStateException without logging. When a
service fails to start on a node, operators need visibility into which nodes have
issues. Add logging at WARN or DEBUG level to track service availability problems
across the cluster.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/TransportDataFusionStatsAction.java [93-101]

 DataFusionStats stats;
 try {
     stats = dataFusionService.getStats();
 } catch (IllegalStateException e) {
-    // DataFusionService not started on this node — return null stats
+    logger.debug("DataFusionService not started on node {}: {}", clusterService.localNode().getId(), e.getMessage());
     stats = null;
 }
 return new DataFusionStatsNodeResponse(clusterService.localNode(), filteredStats(stats, request.getStatsToRetrieve()));
Suggestion importance[1-10]: 6

__

Why: Adding logging for IllegalStateException when DataFusionService is not started improves observability and helps operators diagnose cluster issues. However, this is a minor enhancement for debugging rather than a critical fix.

Low
Simplify null check logic

The nodeIdParam null check is redundant because RestRequest.param() returns null
when the parameter is absent, and Strings.splitStringByCommaToArray() handles null
input by returning an empty array. Simplify by removing the explicit null/empty
check and directly passing the parameter to the split function.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [104-110]

-String[] nodeIds;
 String nodeIdParam = request.param("nodeId");
-if (nodeIdParam != null && !nodeIdParam.isEmpty()) {
-    nodeIds = Strings.splitStringByCommaToArray(nodeIdParam);
-} else {
-    nodeIds = new String[0];
-}
+String[] nodeIds = nodeIdParam != null && !nodeIdParam.isEmpty()
+    ? Strings.splitStringByCommaToArray(nodeIdParam)
+    : new String[0];
Suggestion importance[1-10]: 3

__

Why: The suggestion proposes a minor refactoring to simplify null check logic, but the 'improved_code' is essentially equivalent to the 'existing_code' and doesn't provide meaningful improvement. The change is cosmetic and doesn't impact correctness or maintainability significantly.

Low
Suggestions up to commit c650d10
CategorySuggestion                                                                                                                                    Impact
General
Omit nodes with null stats

When nodeResponse.getStats() is null, the node entry becomes an empty object {}.
This may confuse API consumers who expect either stats fields or an explicit
indicator of unavailability. Consider adding a status field or omitting the node
entry entirely when stats are unavailable.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/DataFusionStatsNodesResponse.java [88-98]

 builder.startObject("nodes");
 for (DataFusionStatsNodeResponse nodeResponse : getNodes()) {
-    builder.startObject(nodeResponse.getNode().getId());
-
     if (nodeResponse.getStats() != null) {
+        builder.startObject(nodeResponse.getNode().getId());
         nodeResponse.getStats().toXContent(builder, params);
+        builder.endObject();
     }
-
-    builder.endObject();
 }
 builder.endObject();
Suggestion importance[1-10]: 5

__

Why: This is a valid design consideration. However, the current behavior (empty object for unavailable stats) is a deliberate choice that maintains consistent node presence in the response. The suggestion changes API semantics without clear justification that it's superior.

Low
Simplify nodeId parameter parsing logic

The nodeIdParam null check is redundant because RestRequest.param() returns null
when the parameter is absent, and Strings.splitStringByCommaToArray() handles null
input by returning an empty array. Simplify the logic to avoid unnecessary
branching.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [104-110]

-String[] nodeIds;
 String nodeIdParam = request.param("nodeId");
-if (nodeIdParam != null && !nodeIdParam.isEmpty()) {
-    nodeIds = Strings.splitStringByCommaToArray(nodeIdParam);
-} else {
-    nodeIds = new String[0];
-}
+String[] nodeIds = (nodeIdParam != null && !nodeIdParam.isEmpty())
+    ? Strings.splitStringByCommaToArray(nodeIdParam)
+    : new String[0];
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies redundant null checking, but the improvement is marginal. The existing code is clear and explicit, while the suggested ternary operator doesn't significantly improve readability or performance.

Low
Broaden exception handling for robustness

Catching only IllegalStateException may miss other runtime exceptions from
getStats(). If the service throws unexpected exceptions, they will propagate and
fail the entire node response. Consider catching broader exception types or logging
the exception before returning null stats.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/TransportDataFusionStatsAction.java [93-100]

 DataFusionStats stats;
 try {
     stats = dataFusionService.getStats();
-} catch (IllegalStateException e) {
-    // DataFusionService not started on this node — return null stats
+} catch (Exception e) {
+    // DataFusionService not started or encountered error — return null stats
     stats = null;
 }
 return new DataFusionStatsNodeResponse(clusterService.localNode(), filteredStats(stats, request.getStatsToRetrieve()));
Suggestion importance[1-10]: 3

__

Why: While catching broader exceptions could improve robustness, the suggestion lacks specificity about which exceptions to handle. The current IllegalStateException catch is intentional for the "service not started" case. Catching all exceptions without logging could hide real errors.

Low
Suggestions up to commit c650d10
CategorySuggestion                                                                                                                                    Impact
General
Report all invalid stat names together

The error message only reports the first invalid stat name encountered, which can be
confusing when multiple invalid names are provided. Collect all invalid stat names
and report them together in a single error message for better user experience.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [119-129]

+List<String> invalidStats = new ArrayList<>();
 for (String stat : requestedStats) {
     if (!VALID_STAT_NAMES.contains(stat)) {
-        return channel -> {
-            XContentBuilder builder = channel.newBuilder();
-            builder.startObject();
-            builder.field("error", "Invalid stat sections: [" + stat + "]. Valid values are: " + VALID_STATS);
-            builder.endObject();
-            channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder));
-        };
+        invalidStats.add(stat);
     }
 }
+if (!invalidStats.isEmpty()) {
+    return channel -> {
+        XContentBuilder builder = channel.newBuilder();
+        builder.startObject();
+        builder.field("error", "Invalid stat sections: " + invalidStats + ". Valid values are: " + VALID_STATS);
+        builder.endObject();
+        channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, builder));
+    };
+}
Suggestion importance[1-10]: 7

__

Why: This is a valuable UX improvement that would help users identify all invalid stat names at once rather than requiring multiple round-trips. The suggestion correctly identifies a usability issue and provides a reasonable solution.

Medium
Simplify nodeId parameter parsing logic

The nodeIdParam null check is redundant because RestRequest.param() returns null
when the parameter is absent, and Strings.splitStringByCommaToArray() already
handles empty strings correctly by returning an empty array. Simplify the logic to
reduce unnecessary branching.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/RestDataFusionStatsAction.java [104-110]

-String[] nodeIds;
 String nodeIdParam = request.param("nodeId");
-if (nodeIdParam != null && !nodeIdParam.isEmpty()) {
-    nodeIds = Strings.splitStringByCommaToArray(nodeIdParam);
-} else {
-    nodeIds = new String[0];
-}
+String[] nodeIds = (nodeIdParam != null) 
+    ? Strings.splitStringByCommaToArray(nodeIdParam) 
+    : new String[0];
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies redundant null checking logic. However, the improvement is minor since the original code is clear and readable. The ternary operator doesn't significantly improve maintainability.

Low
Broaden exception handling for robustness

Catching only IllegalStateException may miss other runtime exceptions from
getStats(). If the service throws unexpected exceptions, they will propagate and
potentially crash the node operation. Consider catching broader exception types or
logging the exception before returning null stats.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/action/stats/TransportDataFusionStatsAction.java [93-100]

 DataFusionStats stats;
 try {
     stats = dataFusionService.getStats();
-} catch (IllegalStateException e) {
-    // DataFusionService not started on this node — return null stats
+} catch (Exception e) {
+    // DataFusionService not available or failed — return null stats
     stats = null;
 }
 return new DataFusionStatsNodeResponse(clusterService.localNode(), filteredStats(stats, request.getStatsToRetrieve()));
Suggestion importance[1-10]: 3

__

Why: While broader exception handling could improve robustness, catching all Exception types may mask programming errors. The current IllegalStateException catch is intentional for the specific case when DataFusionService is not started. The suggestion lacks logging which would be important if implemented.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for c650d10: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines +58 to +61
public static final String VALID_STATS = "io_runtime, cpu_runtime, coordinator_reduce, "
+ "query_execution, stream_next, plan_setup, datanode_gate, coordinator_gate";

private static final Set<String> VALID_STAT_NAMES = Set.of(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we derive VALID_STATS from VALID_STAT_NAMES so there is single place where edits need to done for adding stats?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Derived in next commit on same PR

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c650d10

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for c650d10: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c650d10

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c650d10: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.36%. Comparing base (0c60e50) to head (79f5fd2).
⚠️ Report is 31 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21846      +/-   ##
============================================
+ Coverage     73.34%   73.36%   +0.01%     
+ Complexity    75417    75392      -25     
============================================
  Files          6032     6030       -2     
  Lines        342404   342308      -96     
  Branches      49235    49231       -4     
============================================
- Hits         251142   251139       -3     
+ Misses        71272    71234      -38     
+ Partials      19990    19935      -55     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add ITs for node and cluster level?

Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 79f5fd2

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 79f5fd2: SUCCESS

Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
@AjayRajNelapudi AjayRajNelapudi force-pushed the feature/df-node-level-stats branch from 76cca8e to 2a99a20 Compare May 31, 2026 10:01
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 2a99a20

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 2a99a20: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 2a99a20

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 2a99a20: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants