Add data-format-aware stats APIs (Plugin.nodeStats + dataformat_stats endpoint)#21860

Draft
ask-kamal-nayan wants to merge 3 commits into opensearch-project:main from ask-kamal-nayan:pr1-stats

ask-kamal-nayan commented May 27, 2026

Description

Summary

Adds runtime visibility into the composite data-format engine via per-format stats APIs. Each registered data format exposes its own pair of REST endpoints, discovered automatically
via a small SPI. The design is open for extension (new format = new provider) and closed for modification (composite-engine never references concrete formats).

API mirrors OpenSearch's standard _stats conventions — same parameter names, same JSON shape, same IndicesOptions semantics — so existing tooling works unchanged.

API surface

| Endpoint | Purpose |
|---|---|
| GET /_plugins/{format}/{index}/_stats | Per-index stats (use ?level=shards for per-shard breakdown) |
| GET /_plugins/{format}/_nodes/_stats | Aggregate across all nodes |
| GET /_plugins/{format}/_nodes/{nodeId}/_stats | Aggregate for a specific node |

{format} is parquet or lucene today.

Query params: level=index|shards, shards=0,2,5, nodes=node1,node2 or _local, ignore_unavailable, allow_no_indices, expand_wildcards. Standard ?filter_path works on
all responses.
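As a rough illustration of how a client might assemble these paths, here is a minimal sketch. `DataFormatStatsPaths` and its methods are hypothetical helpers, not part of this PR, and parameter values are appended without URL encoding for brevity.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of the PR) that assembles the stats paths listed above. */
final class DataFormatStatsPaths {
    private DataFormatStatsPaths() {}

    /** Per-index stats: /_plugins/{format}/{index}/_stats plus optional query parameters. */
    static String indexStats(String format, String index, Map<String, String> params) {
        return "/_plugins/" + format + "/" + index + "/_stats" + query(params);
    }

    /** Node-level stats; a null nodeId means "aggregate across all nodes". */
    static String nodeStats(String format, String nodeId) {
        String node = nodeId == null ? "" : "/" + nodeId;
        return "/_plugins/" + format + "/_nodes" + node + "/_stats";
    }

    /** Joins parameters as ?k1=v1&k2=v2 in map iteration order (no URL encoding). */
    private static String query(Map<String, String> params) {
        if (params.isEmpty()) {
            return "";
        }
        StringJoiner joiner = new StringJoiner("&", "?", "");
        params.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) keeps the query string stable across runs.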

Sample — parquet

GET /_plugins/parquet/myindex/_stats

{
  "_shards": { "total": 1, "successful": 1, "failed": 0 },
  "format": "parquet",
  "indices": {
    "myindex": {
      "indexing":     { "docs_indexed_total": 12345, "index_time_millis": 234 },
      "vsr":          { "vsr_rotations_total": 4 },
      "native_write": { "native_write_total": 4, "native_finalize_total": 4, "native_write_failures": 0 },
      "merge":        { "merge_total": 1, "merge_input_files_total": 4, "merge_output_rows_total": 12345, "merge_failures": 0 }
    }
  }
}

Sample — lucene

GET /_plugins/lucene/myindex/_stats

{
  "_shards": { "total": 1, "successful": 1, "failed": 0 },
  "format": "lucene",
  "indices": {
    "myindex": {
      "indexing": { "docs_indexed_total": 12345, "docs_indexed_failures": 0, "index_time_millis": 198 },
      "refresh":  { "refresh_total": 8, "refresh_segments_incorporated_total": 12 },
      "flush":    { "flush_total": 3, "flush_time_millis": 510 },
      "commit":   { "commit_total": 3, "commit_time_millis": 412 },
      "merge":    { "merge_total": 2, "merge_failures": 0 }
    }
  }
}

Shard-level (?level=shards) wraps per-shard routing in a routing sub-object (state, primary, node, relocating_node) matching the standard _stats shape.

Architecture

  • plugin-stats-spi (sandbox/libs/plugin-stats-spi/) — defines DataFormatStatsProvider and a JVM-wide DataFormatStatsProviderRegistry. Generic over each format's typed shard-stats
    class so aggregation is type-safe end-to-end.
  • Per-format providers — ParquetStatsProvider, LuceneStatsProvider register with the SPI on construction. Engines self-register per-shard LongAdder-backed trackers; the registry is the
    single source of truth.
  • Auto-discovery — CompositeDataFormatPlugin.getRestHandlers() iterates registry.all() and registers REST + transport actions per provider. Zero hardcoded format names.
  • Format-owned aggregation — coordinator merges via T.add(other); each format defines per-counter merge rules (sum vs. max vs. min) instead of blind JSON deep-merge.
  • Primary-only filter — DFA replicas have no indexing engine; broadcast restricts to primaries so _shards.total matches index.number_of_shards.
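The tracker and merge-rule bullets above can be sketched roughly as follows. All names here (`DemoShardStats`, `DemoShardTracker`, `DemoCoordinator`, and the three fields) are hypothetical stand-ins chosen for illustration; the PR's actual stats classes and counters may differ.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical immutable snapshot; field names are illustrative only. */
record DemoShardStats(long docsIndexedTotal, long indexTimeMillis, long maxFlushTimeMillis) {
    static final DemoShardStats EMPTY = new DemoShardStats(0, 0, 0);

    /** Format-owned merge rule: plain counters are summed, the high-water mark takes the max. */
    DemoShardStats add(DemoShardStats other) {
        return new DemoShardStats(
            docsIndexedTotal + other.docsIndexedTotal,
            indexTimeMillis + other.indexTimeMillis,
            Math.max(maxFlushTimeMillis, other.maxFlushTimeMillis)
        );
    }
}

/** Hypothetical per-shard tracker; LongAdder keeps hot-path updates cheap under contention. */
final class DemoShardTracker {
    private final LongAdder docsIndexed = new LongAdder();
    private final LongAdder indexTimeMillis = new LongAdder();
    private final LongAccumulator maxFlushTimeMillis = new LongAccumulator(Long::max, 0L);

    void addDocsIndexed(long n) { docsIndexed.add(n); }
    void addIndexTimeMillis(long ms) { indexTimeMillis.add(ms); }
    void recordFlushMillis(long ms) { maxFlushTimeMillis.accumulate(ms); }

    /** Point-in-time copy of the counters; not atomic across fields. */
    DemoShardStats snapshot() {
        return new DemoShardStats(docsIndexed.sum(), indexTimeMillis.sum(), maxFlushTimeMillis.get());
    }
}

/** Hypothetical coordinator step: fold shard snapshots with the format's own add(). */
final class DemoCoordinator {
    static DemoShardStats aggregate(List<DemoShardStats> shards) {
        return shards.stream().reduce(DemoShardStats.EMPTY, DemoShardStats::add);
    }
}
```

Because the merge rule lives on the stats type, the coordinator never needs to know which fields are sums and which are maxima.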

For example, once a provider for a new format such as orc registers with the SPI, GET /_plugins/orc/{index}/_stats and GET /_plugins/orc/_nodes/_stats work automatically, with no edits to composite-engine.
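A minimal sketch of the discovery idea, assuming a registry keyed by format name. `DemoStatsProvider`, `DemoProviderRegistry`, and `DemoRouteDiscovery` are hypothetical; the real `DataFormatStatsProvider` signature is not shown in this description.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the SPI interface. */
interface DemoStatsProvider {
    String formatName();
}

/** Hypothetical registry keyed by format name. */
final class DemoProviderRegistry {
    private final Map<String, DemoStatsProvider> providers = new ConcurrentHashMap<>();

    /** Registers a provider; a second provider for the same format is rejected. */
    void register(DemoStatsProvider provider) {
        DemoStatsProvider previous = providers.putIfAbsent(provider.formatName(), provider);
        if (previous != null) {
            throw new IllegalStateException("duplicate provider for " + provider.formatName());
        }
    }

    Collection<DemoStatsProvider> all() {
        return providers.values();
    }
}

final class DemoRouteDiscovery {
    /** Builds route templates for every registered provider without naming any format. */
    static List<String> routes(DemoProviderRegistry registry) {
        List<String> routes = new ArrayList<>();
        for (DemoStatsProvider provider : registry.all()) {
            String base = "/_plugins/" + provider.formatName();
            routes.add("GET " + base + "/{index}/_stats");
            routes.add("GET " + base + "/_nodes/_stats");
            routes.add("GET " + base + "/_nodes/{nodeId}/_stats");
        }
        Collections.sort(routes); // deterministic order for display and testing
        return routes;
    }
}
```

Registering an `orc` provider in this sketch adds three routes without touching `DemoRouteDiscovery`, which is the property the description claims for composite-engine.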

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions Bot commented May 27, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 8effa56.

| Path | Line | Severity | Description |
|---|---|---|---|
| sandbox/plugins/analytics-backend-lucene/build.gradle | 13 | high | Mandatory flag: extendedPlugins modified to add 'composite-engine', changing the plugin classloader dependency chain. Maintainers must verify the intended plugin extension. |
| sandbox/plugins/analytics-backend-lucene/build.gradle | 31 | high | Mandatory flag: New dependency added: compileOnly project(':sandbox:libs:plugin-stats-spi'). Artifact authenticity cannot be verified from the diff alone. |
| sandbox/plugins/composite-engine/build.gradle | 42 | high | Mandatory flag: New runtime dependency added: implementation project(':sandbox:libs:plugin-stats-spi'). Runtime scope broadens attack surface compared to compileOnly. |
| sandbox/plugins/parquet-data-format/build.gradle | 12 | high | Mandatory flag: extendedPlugins modified to add 'composite-engine'. Changes classloader hierarchy for the parquet plugin. |
| sandbox/plugins/parquet-data-format/build.gradle | 22 | high | Mandatory flag: New dependency added: compileOnly project(':sandbox:libs:plugin-stats-spi'). Maintainers must verify artifact integrity. |
| sandbox/plugins/composite-engine/build.gradle | 62 | high | Mandatory flag: New dependency added: internalClusterTestImplementation project(':modules:transport-netty4'). Introduces a network transport module into the test classpath. |
| sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/stats/ParquetStatsProvider.java | 107 | medium | aggregateNodeStats() catches broad Exception and silently swallows it after logging a warning. If RustBridge.collectRuntimeMetrics() throws due to a compromised or malfunctioning native library, the failure is suppressed and callers receive incomplete stats with no error signal. |
| sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/stats/ParquetStatsProvider.java | 45 | medium | Singleton INSTANCE set with a non-atomic check-then-set pattern (if INSTANCE == null). In concurrent plugin initialization scenarios this could result in two providers being active, with one silently discarded, causing tracker registrations to go to the wrong instance. |
| sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/LuceneMerger.java | 75 | low | Uses reflection to access the private segmentInfos field of IndexWriter. While this is an existing pattern in the codebase, reflective private-field access bypasses encapsulation and could be leveraged to manipulate internal Lucene state if the calling path is not carefully controlled. |

The table above displays the top 10 most important findings.

Total: 9 | Critical: 0 | High: 6 | Medium: 2 | Low: 1
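For the check-then-set finding on the singleton, one common remedy is an atomic compare-and-set. The sketch below is an assumption about how that could look; `OnceHolder` is a hypothetical class, not the code in ParquetStatsProvider.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder that publishes at most one instance, even under concurrent callers. */
final class OnceHolder<T> {
    private final AtomicReference<T> instance = new AtomicReference<>();

    /**
     * Installs the candidate only if nothing is set yet and returns the winning instance,
     * so every caller ends up registering against the same object.
     */
    T setIfAbsent(T candidate) {
        return instance.compareAndSet(null, candidate) ? candidate : instance.get();
    }

    /** Returns the published instance, or null if none has been set. */
    T get() {
        return instance.get();
    }
}
```

Callers that lose the race receive the already-published instance instead of silently keeping their own.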


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

ask-kamal-nayan force-pushed the pr1-stats branch 11 times, most recently from f82afe3 to 37d72ac on May 28, 2026 09:57

github-actions Bot commented May 28, 2026

PR Reviewer Guide 🔍

(Review updated until commit afd9e13)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The addDoc method increments docCount after a failure (line 244) and sets state to PENDING_ROLLBACK, but then returns a WriteResult.Failure. If the caller retries the same rowId, the check at line 236 (input.getRowId() != docCount) will fail because docCount was already incremented. This breaks the rowId-docCount invariant for retries after a failure, causing subsequent writes to be rejected even though the failed doc was never successfully written.

public WriteResult addDoc(LuceneDocumentInput input) throws IOException {
    long start = System.nanoTime();
    try {
        if (state != WriterState.ACTIVE) {
            throw new IllegalStateException("addDoc requires ACTIVE state but was " + state);
        }
        // Defense-in-depth: CompositeWriter enforces rowId == docCount at the multiplexer
        // layer, but we re-check here so a single-format Lucene path is also protected.
        if (input.getRowId() != docCount) {
            throw new IllegalStateException("rowId [" + input.getRowId() + "] does not match doc count [" + docCount + "]");
        }
        try {
            indexWriter.addDocument(input.getFinalInput());
        } catch (IOException | IllegalArgumentException e) {
            // Lucene's IndexWriter may have consumed a docId before throwing; advance our
            // counter to match so rollbackTo can tombstone the partial slot, and
            // retire to preserve the docId == rowId invariant for subsequent writes.
            docCount++;
            state = WriterState.PENDING_ROLLBACK;
            stats.incDocsIndexedFailures();
            return new WriteResult.Failure(e, -1L, -1L, -1L);
        }
        long currentDocId = docCount;
        docCount++;
        stats.addDocsIndexed(1);
        return new WriteResult.Success(1L, 1L, currentDocId);
    } finally {
        stats.addIndexTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    }
}
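To see what a retry after a failure does under the quoted control flow, here is a simplified model with no Lucene involved. `DemoWriter` and the `simulateFailure` flag are hypothetical. In this model the retry is rejected by the state check, which runs before the rowId check; whether callers are expected to roll back before retrying is not visible in the excerpt.

```java
/** Simplified, hypothetical model of the quoted addDoc control flow. */
final class DemoWriter {
    enum State { ACTIVE, PENDING_ROLLBACK }

    private State state = State.ACTIVE;
    private long docCount = 0;

    /** Returns true on success, false on a simulated write failure. */
    boolean addDoc(long rowId, boolean simulateFailure) {
        if (state != State.ACTIVE) {
            throw new IllegalStateException("addDoc requires ACTIVE state but was " + state);
        }
        if (rowId != docCount) {
            throw new IllegalStateException("rowId [" + rowId + "] does not match doc count [" + docCount + "]");
        }
        docCount++; // advanced on both the success and the failure path, as in the quoted code
        if (simulateFailure) {
            state = State.PENDING_ROLLBACK;
            return false;
        }
        return true;
    }

    long docCount() { return docCount; }

    State state() { return state; }
}
```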
Possible Issue

The flush method wraps the entire flush logic in a try-finally that calls stats.incFlushTotal() and stats.addFlushTimeMillis(...) (lines 445-447). If an exception is thrown before the flush completes (e.g., during indexWriter.flush() at line 318 or indexWriter.forceMerge(...) at line 361), the flush counter is still incremented even though the flush did not succeed. This inflates the flush-total metric and misrepresents actual successful flushes.

        return FileInfos.empty();
    }

    long flushStart = System.nanoTime();
    try {
        long flushStartNanos = System.nanoTime();
        logger.info(
            "flush: START generation={}, docCount={}, hasRowIdMapping={}",
            writerGeneration,
            docCount,
            flushInput.hasRowIdMapping()
        );
        indexWriter.flush();

        // If sort permutation is provided, configure the reorder merge policy
        if (flushInput.hasRowIdMapping()) {
            // RowIdMapping shouldn't be available if index has sort configurations.
            Sort configuredIndexSort = indexWriter.getConfig().getIndexSort();
            if (configuredIndexSort != null) {
                throw new IllegalStateException(
                    "RowIdMapping should not be available when child IndexWriter is configured with IndexSort ["
                        + configuredIndexSort
                        + "] for writer generation ["
                        + writerGeneration
                        + "]"
                );
            }
            RowIdMapping mapping = flushInput.rowIdMapping();
            if (mapping.size() != docCount) {
                throw new IllegalStateException(
                    "RowIdMapping size ["
                        + mapping.size()
                        + "] does not match document count ["
                        + docCount
                        + "] for writer generation ["
                        + writerGeneration
                        + "]"
                );
            }
            configureSortedMerge(mapping, state == WriterState.RETIRED_FLUSHABLE);
        } else if (indexWriter.getConfig().getIndexSort() != null) {
            // Lucene is primary with IndexSort: Lucene natively reorders docs during
            // forceMerge, but __row_id__ values were assigned at insertion time and will
            // be scrambled after sort. Force a merge so the codec rewrites row IDs to
            // sequential 0..N-1 in the final doc order.
            if (flushInput.hasRowIdMapping()) {
                throw new IllegalStateException(
                    "RowIdMapping must not be provided when IndexSort is configured for writer generation [" + writerGeneration + "]"
                );
            }
        }

        // Common path: forceMerge to 1 segment, commit, build FileInfos
        long forceMergeStartNanos = System.nanoTime();
        try {
            indexWriter.forceMerge(1, true);
        } finally {
            stats.addFlushForceMergeTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - forceMergeStartNanos));
        }
        long forceMergeDurationMs = TimeValue.nsecToMSec(System.nanoTime() - forceMergeStartNanos);
        logger.info(
            "flush: forceMerge complete: generation={}, docCount={}, duration={}ms",
            writerGeneration,
            docCount,
            forceMergeDurationMs
        );

        long commitStartNanos = System.nanoTime();
        indexWriter.commit();
        long commitDurationMs = TimeValue.nsecToMSec(System.nanoTime() - commitStartNanos);
        logger.info("flush: commit complete: generation={}, duration={}ms", writerGeneration, commitDurationMs);

        // Close the IndexWriter before rewriting segment metadata.
        // This prevents IndexFileDeleter from removing our rewritten segments_N
        // file (which it wouldn't recognize as its own commit).
        indexWriter.close();

        // Verify the invariant: exactly 1 segment with docCount documents
        SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
        assert segmentInfos.size() == 1 : "Expected exactly 1 segment after force merge, got " + segmentInfos.size();

        SegmentCommitInfo segmentInfo = segmentInfos.info(0);
        // After flush the segment must contain exactly docCount live docs. Any tombstones
        // from rollbackTo are expunged by the reordering forceMerge (secondary path)
        // or by LogByteSizeMergePolicy's forceMerge (primary-with-indexSort path).
        // TODO: this assertion will trip if Lucene is configured as the primary format
        // without an IndexSort and a rollbackTo has run — the no-sort/no-mapping
        // branch uses NoMergePolicy, so forceMerge is a no-op and the tombstone remains.
        // Production never wires Lucene as primary without IndexSort, but tests that do
        // need to either configure index.sort.field or avoid the rollback path.
        assert segmentInfo.info.maxDoc() == docCount : "Expected " + docCount + " docs in segment, got " + segmentInfo.info.maxDoc();

        // Invariant: ___row_id__ doc values must be sequential 0..maxDoc-1 after forceMerge.
        // This holds in all cases:
        // - Lucene secondary: docs reordered via OneMerge.reorder() + row ID rewrite
        // - Lucene primary with IndexSort: Lucene sorts natively + row ID rewrite
        // - No sort: docs added sequentially, row IDs naturally sequential
        // Wrapped in `assert` so the I/O cost is paid only when assertions are enabled.
        assert assertRowIdsSequential(directory) : "___row_id__ doc values not sequential after forceMerge for writer generation ["
            + writerGeneration
            + "]";

        // Stamp the IndexSort on the segment metadata post-commit so that
        // addIndexes(Directory...) on the shared writer sees matching sort.
        // The segment is always sorted by __row_id__ — either naturally (docs
        // written sequentially) or via OneMerge.reorder() + row ID rewrite.
        if (flushInput.hasRowIdMapping()) {
            logger.debug("Overriding segment info manually");
            rewriteSegmentInfoWithSort(segmentInfos, segmentInfo);
        }

        // Build the WriterFileSet pointing to the temp directory
        WriterFileSet.Builder wfsBuilder = WriterFileSet.builder()
            .directory(tempDirectory)
            .writerGeneration(writerGeneration)
            .addNumRows(docCount);

        // Add all files in the segment
        for (String file : directory.listAll()) {
            if (file.startsWith("segments") == false && file.equals("write.lock") == false) {
                wfsBuilder.addFile(file);
            }
        }

        directory.close();
        flushed = true;

        long totalFlushDurationMs = TimeValue.nsecToMSec(System.nanoTime() - flushStartNanos);
        logger.info(
            "flush: DONE generation={}, totalRows={}, forceMerge={}ms, commit={}ms, total={}ms",
            writerGeneration,
            docCount,
            forceMergeDurationMs,
            commitDurationMs,
            totalFlushDurationMs
        );

        return FileInfos.builder().putWriterFileSet(dataFormat, wfsBuilder.build()).build();
    } finally {
        stats.incFlushTotal();
        stats.addFlushTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - flushStart));
    }
}
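One way to count only completed flushes while still recording elapsed time on every path is a success flag checked in the finally block. This is a sketch under assumptions: `DemoFlushStats`, `IoAction`, and the `flushFailures` counter are hypothetical and do not appear in the PR.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stats holder illustrating success-only counting. */
final class DemoFlushStats {
    final LongAdder flushTotal = new LongAdder();
    final LongAdder flushFailures = new LongAdder(); // hypothetical counter, not in the PR
    final LongAdder flushTimeMillis = new LongAdder();

    /** Hypothetical functional interface for a flush body that may throw IOException. */
    interface IoAction {
        void run() throws IOException;
    }

    /** Runs the flush body; the total moves only if the body returned normally. */
    void timedFlush(IoAction flush) throws IOException {
        long start = System.nanoTime();
        boolean success = false;
        try {
            flush.run();
            success = true;
        } finally {
            if (success) {
                flushTotal.increment();
            } else {
                flushFailures.increment();
            }
            // Elapsed time is recorded on both paths.
            flushTimeMillis.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }
}
```

The same shape would apply to any counter that is meant to reflect successful operations rather than attempts.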
Possible Issue

The merge method increments stats.incMergeTotal() at line 165 inside the try block, before the catch that increments stats.incMergeFailures() at line 168. If the merge throws an exception, both counters are incremented. This double-counts failed merges in the total, making the total-merge counter unreliable (it should represent successful merges only, or be documented to include failures).

public MergeResult merge(MergeInput mergeInput) throws IOException {
    long start = System.nanoTime();
    try {
        RowIdMapping rowIdMapping = mergeInput.rowIdMapping();
        List<Segment> segments = mergeInput.segments();

        if (segments.isEmpty()) {
            return new MergeResult(Map.of());
        }

        Set<Long> generationsToMerge = new HashSet<>();
        for (Segment segment : segments) {
            generationsToMerge.add(segment.generation());
        }

        SegmentInfos segmentInfos;
        try {
            segmentInfos = (SegmentInfos) SEGMENT_INFOS_FIELD.get(indexWriter);
        } catch (IllegalAccessException e) {
            throw new IOException("Failed to access IndexWriter segmentInfos via reflection", e);
        }

        if (segmentInfos.size() == 0) {
            logger.warn("No segments in IndexWriter — skipping merge");
            return new MergeResult(Map.of());
        }

        List<SegmentCommitInfo> matchingSegments = findMatchingSegments(segmentInfos, generationsToMerge);

        if (matchingSegments.isEmpty()) {
            throw new IOException(
                "No Lucene segments found matching writer generations "
                    + generationsToMerge
                    + " — segments may have been consumed by a concurrent merge"
            );
        }

        logger.debug(
            "LuceneMerger: merging {} segments (generations {}) using merge(OneMerge) + IndexSort",
            matchingSegments.size(),
            generationsToMerge
        );

        // Delegate OneMerge creation to the strategy (primary vs secondary behavior).
        // For the secondary path, the returned RowIdRemappingOneMerge stamps the
        // writer_generation attribute onto the merged SegmentInfo via setMergeInfo, which
        // Lucene invokes immediately before codec.segmentInfoFormat().write(...) — so the
        // attribute is persisted to the .si file and survives a writer reopen.
        MergePolicy.OneMerge oneMerge = strategy.createOneMerge(matchingSegments, rowIdMapping, mergeInput.newWriterGeneration());
        indexWriter.executeMerge(oneMerge, mergeInput.newWriterGeneration());

        // Build the merged WriterFileSet from the output segment info
        SegmentCommitInfo mergedInfo = oneMerge.getMergeInfo();
        WriterFileSet mergedFileSet = buildMergedFileSet(mergedInfo, mergeInput.newWriterGeneration());

        // Delegate RowIdMapping production to the strategy
        RowIdMapping outputMapping = strategy.buildRowIdMapping(oneMerge, mergeInput);

        logger.debug(
            "LuceneMerger: completed merge of {} segments at generation {} ({} docs, {} files)",
            matchingSegments.size(),
            mergeInput.newWriterGeneration(),
            oneMerge.getMergeInfo().info.maxDoc(),
            oneMerge.getMergeInfo().files().size()
        );

        stats.incMergeTotal();
        return new MergeResult(Map.of(dataFormat, mergedFileSet), outputMapping);
    } catch (IOException e) {
        stats.incMergeFailures();
        throw e;
    } finally {
        stats.addMergeTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    }
}
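Which counters move on each path can be checked with a small model of the try/catch shape quoted above. `DemoMergeCounters` and `MergeBody` are hypothetical. In this model an IOException increments only the failure counter, because the total is the last statement before the return, and an unchecked exception increments neither counter.

```java
import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical model of the counter placement in the quoted merge method. */
final class DemoMergeCounters {
    final LongAdder mergeTotal = new LongAdder();
    final LongAdder mergeFailures = new LongAdder();

    /** Hypothetical functional interface standing in for the merge body. */
    interface MergeBody {
        void run() throws IOException;
    }

    void merge(MergeBody body) throws IOException {
        try {
            body.run();
            mergeTotal.increment(); // reached only if body.run() returned normally
        } catch (IOException e) {
            mergeFailures.increment(); // unchecked exceptions bypass this branch
            throw e;
        }
    }
}
```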
Possible Issue

The awaitPendingWrite method increments stats.incBackgroundWriteTotal() at line 395 inside the try block, before the catch clauses that increment stats.incBackgroundWriteTimeouts() or stats.incBackgroundWriteFailures(). If the write times out or fails, both the total counter and the failure/timeout counter are incremented. This double-counts failed writes in the total, making the total counter misleading (it should represent successful writes only, or be documented to include failures).

    if (pendingWrite == null) {
        return;
    }
    long startNanos = System.nanoTime();
    try {
        if (timeoutSeconds > 0) {
            pendingWrite.get(timeoutSeconds, TimeUnit.SECONDS);
        } else {
            pendingWrite.get();
        }
        stats.incBackgroundWriteTotal();
    } catch (TimeoutException e) {
        stats.incBackgroundWriteTimeouts();
        if (ignoreTimeout) {
            logger.warn("Timed out waiting for background VSR write for {}", fileName);
        } else {
            throw new IOException("Timed out waiting for background VSR write for " + fileName, e);
        }
    } catch (Exception e) {
        stats.incBackgroundWriteFailures();
        throw new IOException("Background VSR write failed for " + fileName, e.getCause());
    } finally {
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        stats.addBackgroundWriteWaitMillis(elapsed);
        pendingWrite = null;
    }
}
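The timeout, failure, and success paths can be exercised with a small model built on `CompletableFuture`. `DemoPendingWrite` is hypothetical and differs from the quoted method in two labeled ways: it takes milliseconds rather than seconds, and it catches specific exception types instead of a broad `Exception`.

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical model of waiting on a background write with separate outcome counters. */
final class DemoPendingWrite {
    final LongAdder total = new LongAdder();
    final LongAdder timeouts = new LongAdder();
    final LongAdder failures = new LongAdder();

    void await(Future<?> pending, long timeoutMillis, boolean ignoreTimeout) throws IOException {
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            total.increment(); // only reached when the write completed normally
        } catch (TimeoutException e) {
            timeouts.increment();
            if (!ignoreTimeout) {
                throw new IOException("Timed out waiting for background write", e);
            }
        } catch (ExecutionException | CancellationException e) {
            failures.increment();
            // CancellationException carries no cause, so fall back to the exception itself.
            throw new IOException("Background write failed", e.getCause() != null ? e.getCause() : e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            failures.increment();
            throw new IOException("Interrupted while waiting for background write", e);
        }
    }
}
```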
Possible Issue

The refresh method increments stats.incRefreshTotal() and stats.addRefreshTimeMillis(...) in the finally block (lines 372-374), even if the refresh throws an exception before completing. If sharedWriter.addIndexes(...) at line 313 or DirectoryReader.open(sharedWriter) at line 335 throws, the refresh counter is still incremented despite the refresh not succeeding. This inflates the refresh-total metric and misrepresents actual successful refreshes.

        return new RefreshResult(List.of());
    }

    long refreshStart = System.nanoTime();
    try {
        List<Segment> resultSegments = new ArrayList<>(refreshInput.existingSegments());

        // Collect all source directories and their paths for a single batched addIndexes call
        List<Directory> sourceDirectories = new ArrayList<>();
        Set<Long> writerGenerations = new HashSet<>();

        for (Segment segment : refreshInput.writerFiles()) {
            WriterFileSet wfs = segment.dfGroupedSearchableFiles().get(LuceneDataFormat.LUCENE_FORMAT_NAME);
            if (wfs == null) {
                continue;
            }

            Path dirPath = Path.of(wfs.directory());
            if (Files.isDirectory(dirPath) == false) {
                logger.warn("Lucene writer directory does not exist: {}", dirPath);
                continue;
            }

            sourceDirectories.add(new HardlinkCopyDirectoryWrapper(new MMapDirectory(dirPath)));
            writerGenerations.add(wfs.writerGeneration());
        }

        // Single batched addIndexes call for all source directories
        if (sourceDirectories.isEmpty() == false) {
            long addIndexesStart = System.nanoTime();
            try {
                sharedWriter.addIndexes(sourceDirectories.toArray(new Directory[0]));
                logger.debug(
                    "Incorporated {} Lucene segments into shared writer in a single addIndexes call",
                    sourceDirectories.size()
                );
            } finally {
                stats.addRefreshAddIndexesTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - addIndexesStart));
                // Close all source directories
                for (Directory dir : sourceDirectories) {
                    try {
                        dir.close();
                    } catch (IOException e) {
                        logger.warn("Failed to close source directory after addIndexes", e);
                    }
                }
            }

            // After addIndexes, open an NRT reader to discover the actual file names
            // for the newly added segments. Lucene renames files during addIndexes,
            // so the original temp directory file names are no longer valid.
            Path sharedDir = store.shardPath().resolveIndex();

            try (DirectoryReader reader = DirectoryReader.open(sharedWriter)) {
                List<LeafReaderContext> leaves = reader.leaves();

                for (int i = 0; i < leaves.size(); i++) {
                    LeafReaderContext ctx = leaves.get(i);
                    if (ctx.reader() instanceof SegmentReader segReader) {
                        SegmentCommitInfo segInfo = segReader.getSegmentInfo();
                        String genAttr = segInfo.info.getAttribute(LuceneWriter.WRITER_GENERATION_ATTRIBUTE);
                        if (genAttr == null) {
                            continue;
                        }

                        long writerGen = Long.parseLong(genAttr);
                        if (!writerGenerations.contains(writerGen)) {
                            continue;
                        }
                        long numDocs = segReader.maxDoc();

                        WriterFileSet.Builder wfsBuilder = WriterFileSet.builder()
                            .directory(sharedDir)
                            .writerGeneration(writerGen)
                            .addNumRows(numDocs);

                        for (String file : segInfo.files()) {
                            wfsBuilder.addFile(file);
                        }

                        resultSegments.add(Segment.builder(writerGen).addSearchableFiles(dataFormat, wfsBuilder.build()).build());
                        writerGenerations.remove(writerGen);
                        stats.incRefreshSegmentsIncorporatedTotal();
                    }
                }
            }
            assert writerGenerations.isEmpty() : "Could not get segments from all writers";
        }

        return new RefreshResult(List.copyOf(resultSegments));
    } finally {
        stats.incRefreshTotal();
        stats.addRefreshTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - refreshStart));
    }
}

github-actions Bot commented May 28, 2026

PR Code Suggestions ✨

Latest suggestions up to afd9e13

Explore these optional code suggestions:

Category | Suggestion | Impact
Category: General
Fix counter increment timing

The success counter incNativeWriteTotal() is incremented before verifying the write
succeeded. If an exception occurs after incrementing but before the method returns,
the counter will be incorrect. Move the success increment to after the write
completes successfully.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/NativeParquetWriter.java [104-118]

 public void write(long arrayAddress, long schemaAddress) throws IOException {
     if (initialized == false) {
         throw new IllegalStateException("Writer not initialized: " + filePath);
     }
     long startNanos = System.nanoTime();
     try {
         RustBridge.write(filePath, arrayAddress, schemaAddress);
-        stats.incNativeWriteTotal();
     } catch (IOException e) {
         stats.incNativeWriteFailures();
         throw e;
     } finally {
         long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         stats.addNativeWriteTimeMillis(elapsed);
     }
+    stats.incNativeWriteTotal();
 }
Suggestion importance[1-10]: 9

Why: This is a critical correctness issue. The success counter incNativeWriteTotal() is incremented inside the try block before the write completes, meaning it will be incremented even if an exception occurs. This leads to incorrect statistics. Moving the increment outside the try-catch ensures it only counts actual successes.

Impact: High
Move failure counter increment first

The failure counter is incremented after docCount++ and state transition, but if an
exception occurs during these operations, the counter may not be incremented. Move
stats.incDocsIndexedFailures() to the first line of the catch block to ensure it
always executes before any other failure-handling logic.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [240-247]

 } catch (IOException | IllegalArgumentException e) {
+    stats.incDocsIndexedFailures();
     // Lucene's IndexWriter may have consumed a docId before throwing; advance our
     // counter to match so rollbackTo can tombstone the partial slot, and
     // retire to preserve the docId == rowId invariant for subsequent writes.
     docCount++;
     state = WriterState.PENDING_ROLLBACK;
-    stats.incDocsIndexedFailures();
     return new WriteResult.Failure(e, -1L, -1L, -1L);
 }
Suggestion importance[1-10]: 7

Why: Moving stats.incDocsIndexedFailures() to the first line of the catch block ensures the failure counter is always incremented before any other operations that might themselves throw exceptions. This improves the robustness of failure tracking.

Impact: Medium
Track cancelled writes as failures

When a background write is cancelled, the failure counter is not incremented,
creating an inconsistency in failure tracking. Increment
stats.incBackgroundWriteFailures() in the CANCELLED branch to ensure all background
write failures are counted.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/vsr/VSRManager.java [191-198]

 if (pendingWrite != null && pendingWrite.isDone()) {
     Future.State state = pendingWrite.state();
     if (state == Future.State.FAILED) {
         stats.incBackgroundWriteFailures();
         throw new IllegalStateException(pendingWrite.exceptionNow());
     } else if (state == Future.State.CANCELLED) {
+        stats.incBackgroundWriteFailures();
         throw new IllegalStateException("Background write was cancelled");
     }
 }
Suggestion importance[1-10]: 6

Why: Incrementing the failure counter for cancelled background writes ensures consistency in failure tracking. A cancelled write is effectively a failure from the stats perspective, so this change improves the completeness of the failure metrics.

Impact: Low
Add null check in aggregation

The add method does not validate that other is non-null before accessing its fields.
Add a null check at the start of the method to prevent NullPointerException when
aggregating stats across shards.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/stats/ParquetShardStats.java [237-261]

 public ParquetShardStats add(ParquetShardStats other) {
+    if (other == null) {
+        return this;
+    }
     return new ParquetShardStats(
         this.docsIndexedTotal + other.docsIndexedTotal,
         this.indexTimeMillis + other.indexTimeMillis,
         ...
     );
 }
Suggestion importance[1-10]: 6

Why: Adding a null check in the add method prevents potential NullPointerException when aggregating stats. This is a defensive programming practice that improves robustness, though the current usage may not pass null values.

Low
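
A minimal sketch of the null-tolerant aggregation described above, assuming an immutable stats object. `ShardStatsSketch` is a hypothetical two-field stand-in for `ParquetShardStats`; the real class carries many more counters.

```java
import java.util.List;

/** Hypothetical two-field stand-in for ParquetShardStats. */
final class ShardStatsSketch {
    final long docsIndexedTotal;
    final long indexTimeMillis;

    ShardStatsSketch(long docsIndexedTotal, long indexTimeMillis) {
        this.docsIndexedTotal = docsIndexedTotal;
        this.indexTimeMillis = indexTimeMillis;
    }

    /** Returns a new instance holding the field-wise sum; a null argument adds nothing. */
    ShardStatsSketch add(ShardStatsSketch other) {
        if (other == null) {
            return this;
        }
        return new ShardStatsSketch(
            this.docsIndexedTotal + other.docsIndexedTotal,
            this.indexTimeMillis + other.indexTimeMillis
        );
    }

    /** Folds per-shard stats into one total, skipping shards that reported nothing. */
    static ShardStatsSketch sum(List<ShardStatsSketch> shards) {
        ShardStatsSketch total = new ShardStatsSketch(0L, 0L);
        for (ShardStatsSketch shard : shards) {
            total = total.add(shard);
        }
        return total;
    }
}
```
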
Add logging for missing provider

If the provider is null during engine construction, stats tracking silently fails
for the entire shard lifecycle. This could happen if plugin initialization order is
non-deterministic. Log a warning to aid debugging when the provider is unexpectedly
absent.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/engine/ParquetIndexingEngine.java [221-227]

 ParquetStatsProvider provider = (ParquetStatsProvider) DataFormatStatsProviderRegistry.INSTANCE.get(
     ParquetStatsProvider.FORMAT_NAME
 );
 if (provider != null) {
     provider.register(shardPath.getShardId(), statsTracker);
+} else {
+    logger.warn("ParquetStatsProvider not found during engine construction for shard {}", shardPath.getShardId());
 }
Suggestion importance[1-10]: 6

Why: The suggestion correctly identifies that silent failure when the provider is null could make debugging difficult. Adding a warning log would help diagnose initialization order issues. However, the impact is moderate since this is primarily a debugging aid rather than a functional fix.

Low
Log warning when provider unavailable

If the provider is null during engine construction, the tracker is never registered
and stats collection silently fails. Log a warning when provider is null to make
this initialization failure visible for debugging.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [139-143]

 LuceneStatsProvider provider = (LuceneStatsProvider) DataFormatStatsProviderRegistry.INSTANCE.get(LuceneStatsProvider.FORMAT_NAME);
 if (provider != null) {
     provider.register(store.shardId(), stats);
+} else {
+    logger.warn("LuceneStatsProvider not found; stats will not be collected for shard {}", store.shardId());
 }
Suggestion importance[1-10]: 5

Why: Adding a warning log when the LuceneStatsProvider is null makes silent initialization failures visible, which aids in debugging. However, this is a minor improvement since the code already handles the null case gracefully by not registering the tracker.

Low
Possible issue
Add thread-safety to singleton initialization

The singleton initialization is not thread-safe. Multiple concurrent plugin
constructions could result in race conditions where INSTANCE is set to different
objects or registry registration happens multiple times.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/stats/ParquetStatsProvider.java [44-51]

 public ParquetStatsProvider() {
-    if (INSTANCE == null) {
-        INSTANCE = this;
+    synchronized (ParquetStatsProvider.class) {
+        if (INSTANCE == null) {
+            INSTANCE = this;
+        }
     }
     DataFormatStatsProviderRegistry.INSTANCE.register(this);
 }
Suggestion importance[1-10]: 8

Why: The suggestion identifies a thread-safety issue in the singleton pattern. Without synchronization, concurrent plugin constructions can race on the INSTANCE assignment. The fix is straightforward and guards that assignment; note that the register(this) call still runs on every construction, outside the synchronized block.

Medium
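
One lock-free alternative to the `synchronized` block above is an atomic compare-and-set, sketched below. This is a swapped-in technique, not what the suggestion proposes, and `ProviderSketch` is a hypothetical stand-in for `ParquetStatsProvider`.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a provider that records its first-constructed instance. */
final class ProviderSketch {
    private static final AtomicReference<ProviderSketch> INSTANCE = new AtomicReference<>();

    ProviderSketch() {
        // Only the first constructor call wins; later calls leave INSTANCE untouched.
        // Caveat: this publishes `this` before the constructor returns, as the
        // original code does, so it should stay the last step of construction.
        INSTANCE.compareAndSet(null, this);
    }

    /** Returns the first instance ever constructed, or null if none exists yet. */
    static ProviderSketch instance() {
        return INSTANCE.get();
    }
}
```

`AtomicReference` also gives readers a visibility guarantee, which a plain static field written inside a `synchronized` block does not provide unless the reads are synchronized too or the field is `volatile`.
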
Detect duplicate format registrations

The putIfAbsent silently ignores duplicate registrations, which could mask plugin
conflicts. Add validation to throw an exception or log a warning when a format name
is already registered by a different provider instance.

sandbox/libs/plugin-stats-spi/src/main/java/org/opensearch/plugin/stats/DataFormatStatsProviderRegistry.java [41-43]

 public void register(DataFormatStatsProvider<?> provider) {
-    providers.putIfAbsent(provider.formatName(), provider);
+    DataFormatStatsProvider<?> existing = providers.putIfAbsent(provider.formatName(), provider);
+    if (existing != null && existing != provider) {
+        throw new IllegalStateException("Format '" + provider.formatName() + "' is already registered");
+    }
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that putIfAbsent silently ignores duplicate registrations. Adding validation to detect conflicts between different provider instances is valuable for debugging plugin issues, though the impact is moderate since the first registration wins consistently.

Medium
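
The duplicate-detection idea above can be sketched in isolation as follows. `RegistrySketch` and its nested `Provider` interface are hypothetical simplifications of `DataFormatStatsProviderRegistry` and `DataFormatStatsProvider`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified registry keyed by format name. */
final class RegistrySketch {
    /** Hypothetical minimal provider contract. */
    interface Provider {
        String formatName();
    }

    private final ConcurrentMap<String, Provider> providers = new ConcurrentHashMap<>();

    void register(Provider provider) {
        // putIfAbsent returns the previous mapping, or null if this call inserted.
        Provider existing = providers.putIfAbsent(provider.formatName(), provider);
        if (existing != null && existing != provider) {
            throw new IllegalStateException("Format '" + provider.formatName() + "' is already registered");
        }
    }

    Provider get(String formatName) {
        return providers.get(formatName);
    }
}
```

Re-registering the same instance stays a no-op, so only a second, different provider claiming the same format name is rejected.
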

Previous suggestions

Suggestions up to commit fbfcd89
Category | Suggestion | Impact
General
Make constructor protected for inheritance

The constructor is public, allowing external instantiation of this base class.
Consider making the constructor protected to enforce that only subclasses can
instantiate it, preventing misuse and ensuring proper inheritance patterns.

sandbox/libs/plugin-stats-spi/src/main/java/org/opensearch/plugin/stats/transport/FormatNodeStatsActionType.java [24-26]

 public class FormatNodeStatsActionType<T extends DataFormatShardStats<T>> extends ActionType<FormatNodeStatsResponse<T>> {
 
-    public FormatNodeStatsActionType(String formatName, Writeable.Reader<T> reader) {
+    protected FormatNodeStatsActionType(String formatName, Writeable.Reader<T> reader) {
         super("cluster:monitor/dfa/" + formatName + "/nodes/stats", in -> new FormatNodeStatsResponse<>(in, reader));
     }
 }
Suggestion importance[1-10]: 7

Why: Making the constructor protected is a good design practice for base classes intended for inheritance, as it prevents direct instantiation while allowing subclasses to use it. This aligns with the pattern seen in FormatStatsActionType and the documented purpose of these classes as "base classes" for subclassing.

Medium
Track cancelled background writes in stats

When a background write is cancelled, the code throws an exception but does not
increment any failure or cancellation counter. This creates an inconsistency where
cancelled operations are not tracked in stats, making it difficult to diagnose
issues. Consider adding a dedicated counter for cancelled writes or incrementing the
failure counter.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/vsr/VSRManager.java [191-197]

 if (pendingWrite != null && pendingWrite.isDone()) {
     Future.State state = pendingWrite.state();
     if (state == Future.State.FAILED) {
         stats.incBackgroundWriteFailures();
         throw new IllegalStateException(pendingWrite.exceptionNow());
     } else if (state == Future.State.CANCELLED) {
+        stats.incBackgroundWriteFailures();
         throw new IllegalStateException("Background write was cancelled");
     }
 }
Suggestion importance[1-10]: 6

Why: Valid observation that cancelled writes are not tracked in stats, creating an inconsistency. Adding stats.incBackgroundWriteFailures() for cancelled operations would improve observability and make the stats more complete for diagnosing issues.

Low
Log warning when stats provider missing

The registration silently fails if the provider is null, which could happen if
plugin initialization order is incorrect. This makes debugging difficult because the
stats tracker is created but never registered, leading to missing stats without any
error indication. Consider logging a warning or throwing an exception if the
provider is unexpectedly null.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java [137-141]

 LuceneStatsProvider provider = (LuceneStatsProvider) DataFormatStatsProviderRegistry.INSTANCE.get(LuceneStatsProvider.FORMAT_NAME);
-if (provider != null) {
+if (provider == null) {
+    logger.warn("LuceneStatsProvider not found in registry during engine initialization for shard {}", store.shardId());
+} else {
     provider.register(store.shardId(), stats);
 }
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies that silent failure when the provider is null could make debugging difficult. Adding a warning log would improve observability, though the system continues to function (just without stats collection).

Low
Report all invalid shard filters together

The validation loop checks shardFilter against every index's shard count, but throws
an exception on the first mismatch. If multiple indices have different shard counts
and the filter is valid for some but not others, the error message only reports the
first failing index. This can be confusing when querying multiple indices. Consider
validating all indices first and reporting all mismatches, or clarifying that the
filter must be valid for all indices.

sandbox/libs/plugin-stats-spi/src/main/java/org/opensearch/plugin/stats/transport/BaseTransportFormatStatsAction.java [119-128]

 if (shardFilter != null) {
+    List<String> invalidIndices = new ArrayList<>();
     for (String index : concreteIndices) {
         int numShards = clusterState.routingTable().index(index).getShards().size();
         if (shardFilter < 0 || shardFilter >= numShards) {
-            throw new IllegalArgumentException(
-                "shard [" + shardFilter + "] is out of range for index [" + index + "] which has [" + numShards + "] shard(s)"
-            );
+            invalidIndices.add(index + " (has " + numShards + " shards)");
         }
+    }
+    if (!invalidIndices.isEmpty()) {
+        throw new IllegalArgumentException(
+            "shard [" + shardFilter + "] is out of range for indices: " + String.join(", ", invalidIndices)
+        );
     }
 }
Suggestion importance[1-10]: 5

Why: The suggestion improves error reporting by collecting all invalid indices before throwing an exception, rather than failing on the first mismatch. This provides better user experience when querying multiple indices with different shard counts.

Low
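
A self-contained sketch of the collect-then-throw validation suggested above. `ShardFilterValidation` is hypothetical, and a plain map of index name to shard count stands in for the cluster-state routing table lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reports every index the shard filter does not fit. */
final class ShardFilterValidation {
    static void validate(int shardFilter, Map<String, Integer> shardCountsByIndex) {
        List<String> invalidIndices = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : shardCountsByIndex.entrySet()) {
            int numShards = entry.getValue();
            if (shardFilter < 0 || shardFilter >= numShards) {
                invalidIndices.add(entry.getKey() + " (has " + numShards + " shards)");
            }
        }
        // Throw once, after all indices have been checked.
        if (!invalidIndices.isEmpty()) {
            throw new IllegalArgumentException(
                "shard [" + shardFilter + "] is out of range for indices: " + String.join(", ", invalidIndices)
            );
        }
    }
}
```

The order of indices in the message follows the map's iteration order, so an insertion-ordered map such as `LinkedHashMap` keeps the output deterministic.
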
Clarify timing semantics for failed operations

The stats.incDocsIndexedFailures() call is inside the catch block but outside the
finally block's timing measurement. If an exception occurs, the failure counter
increments but the elapsed time is still recorded in the finally block. Consider
whether failed operations should contribute to indexTimeMillis, or if timing should
only track successful operations.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [233-240]

 } catch (IOException | IllegalArgumentException e) {
-    // Lucene's IndexWriter may have consumed a docId before throwing; advance our
-    // counter to match so rollbackTo can tombstone the partial slot, and
-    // retire to preserve the docId == rowId invariant for subsequent writes.
     docCount++;
     state = WriterState.PENDING_ROLLBACK;
     stats.incDocsIndexedFailures();
+    // Timing is recorded in finally block regardless of success/failure
     return new WriteResult.Failure(e, -1L, -1L, -1L);
 }
Suggestion importance[1-10]: 4

Why: The suggestion raises a valid design question about whether failed operations should contribute to indexTimeMillis. However, the current implementation is consistent (timing is always recorded in the finally block), and the comment in the improved code doesn't change behavior. This is more of a design consideration than a bug.

Low
Suggestions up to commit 37d72ac
Category | Suggestion | Impact
General
Fix inconsistent background write metrics

The stats.incBackgroundWriteTotal() is only incremented on success, but
stats.addBackgroundWriteWaitMillis() is always recorded in the finally block. This
creates inconsistent metrics where wait time can accumulate without a corresponding
increment in total writes. Move the total increment to the finally block or adjust
the wait time recording to match the success condition.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/vsr/VSRManager.java [326-347]

 long startNanos = System.nanoTime();
+boolean completed = false;
 try {
     if (timeoutSeconds > 0) {
         pendingWrite.get(timeoutSeconds, TimeUnit.SECONDS);
     } else {
         pendingWrite.get();
     }
-    stats.incBackgroundWriteTotal();
+    completed = true;
 } catch (TimeoutException e) {
     stats.incBackgroundWriteTimeouts();
     ...
 } finally {
     long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-    stats.addBackgroundWriteWaitMillis(elapsed);
+    if (completed) {
+        stats.incBackgroundWriteTotal();
+        stats.addBackgroundWriteWaitMillis(elapsed);
+    }
     pendingWrite = null;
 }
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies an inconsistency where stats.incBackgroundWriteTotal() is only incremented on success, but stats.addBackgroundWriteWaitMillis() is always recorded. This creates misleading metrics where wait time accumulates without corresponding total writes. The proposed fix ensures both metrics are recorded consistently only on successful completion.

Medium
Move stats increment outside timing

The timing measurement includes the stats.addDocsIndexed(1) call inside the try
block, which inflates the reported index time. Move the stats increment to the
finally block after computing elapsed time to measure only the actual indexing
operation.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneWriter.java [207-228]

 long start = System.nanoTime();
+boolean success = false;
 try {
     Document doc = input.getFinalInput();
     ...
-    stats.addDocsIndexed(1);
+    long currentDocId = docCount;
+    docCount++;
+    success = true;
     return new WriteResult.Success(1L, 1L, currentDocId);
 } catch (IOException e) {
     stats.incDocsIndexedFailures();
     throw e;
 } finally {
-    stats.addIndexTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+    long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+    stats.addIndexTimeMillis(elapsed);
+    if (success) {
+        stats.addDocsIndexed(1);
+    }
 }
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies that stats.addDocsIndexed(1) is called inside the try block before computing elapsed time, which slightly inflates the reported index time. However, the impact is minimal since the stats increment is a simple counter operation. The suggested refactoring adds complexity with a success flag without significant benefit.

Medium
Log parsing failures for stats

Silently returning null on any exception hides parsing errors and makes debugging
difficult. Log the exception at debug or warn level before returning null so
operators can diagnose malformed stats payloads.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/action/FormatStatsResponse.java [160-166]

 private static Map<String, Object> bytesToMap(BytesReference bytes) {
     try {
         return XContentHelper.convertToMap(bytes, true, XContentType.JSON).v2();
     } catch (Exception e) {
+        logger.debug("Failed to parse stats JSON bytes", e);
         return null;
     }
 }
Suggestion importance[1-10]: 6

Why: The suggestion to log parsing exceptions before returning null is reasonable for debugging, but the method is a private helper that silently handles malformed stats payloads. Adding logging would help diagnose issues, though the impact is moderate since the method already handles errors gracefully by returning null.

Low
Log when replicas are filtered

The filter predicate silently drops replicas without logging. When a user queries a
specific shard that only has replicas on the target node, they receive an empty
result with no indication why. Add a debug log when replicas are filtered to aid
troubleshooting.

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/action/TransportFormatStatsAction.java [154-170]

 List<ShardRouting> filtered = shards.stream().filter(sr -> {
     if (sr.primary() == false) {
+        logger.debug("Skipping replica shard [{}] for format stats", sr.shardId());
         return false;
     }
     if (shardFilter != null && sr.shardId().id() != shardFilter) {
         return false;
     }
     if (finalNodeFilter != null && !finalNodeFilter.equals(sr.currentNodeId())) {
         return false;
     }
     return true;
 }).collect(Collectors.toList());
Suggestion importance[1-10]: 5

Why: Adding debug logging when replicas are filtered could aid troubleshooting, but the filtering behavior is intentional and documented in the code comment. The suggestion is valid for debugging purposes but not critical, as the filtering logic is expected behavior for DFA replicas.

Low

@github-actions
Contributor

✅ Gradle check result for 37d72ac: SUCCESS

@codecov

codecov Bot commented May 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.42%. Comparing base (e0404f4) to head (afd9e13).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21860      +/-   ##
============================================
+ Coverage     73.36%   73.42%   +0.05%     
- Complexity    75430    75527      +97     
============================================
  Files          6034     6034              
  Lines        342604   342604              
  Branches      49279    49279              
============================================
+ Hits         251357   251552     +195     
+ Misses        71220    71083     -137     
+ Partials      20027    19969      -58     

@ask-kamal-nayan force-pushed the pr1-stats branch 4 times, most recently from 3c76162 to fbfcd89, on May 28, 2026 14:53
@github-actions
Contributor

Persistent review updated to latest commit fbfcd89

@github-actions
Contributor

✅ Gradle check result for fbfcd89: SUCCESS

Signed-off-by: Kamal Nayan <askkamal@amazon.com>
Signed-off-by: Kamal Nayan <askkamal@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit afd9e13

@github-actions
Contributor

✅ Gradle check result for afd9e13: SUCCESS

Comment on lines +73 to +96
 public ParquetShardStats(
     long docsIndexedTotal,
     long indexTimeMillis,
     long vsrRotationsTotal,
     long nativeWriteTotal,
     long nativeWriteTimeMillis,
     long nativeWriteFailures,
     long nativeFinalizeTotal,
     long nativeFinalizeTimeMillis,
     long nativeFinalizeFailures,
     long nativeSyncTotal,
     long nativeSyncTimeMillis,
     long nativeSyncFailures,
     long mergeTotal,
     long mergeTimeMillis,
     long mergeFailures,
     long mergeInputFilesTotal,
     long mergeOutputRowsTotal,
     long backgroundWriteTotal,
     long backgroundWriteWaitMillis,
     long backgroundWriteTimeouts,
     long backgroundWriteFailures
 ) {
     this.docsIndexedTotal = docsIndexedTotal;
Member

Can we add rayon/tokio stats as well?

Signed-off-by: Kamal Nayan <askkamal@amazon.com>