IT: assert lastCommitFileName invariant under concurrent flush/refresh #21868

Draft
ask-kamal-nayan wants to merge 2 commits into opensearch-project:main from ask-kamal-nayan:raceBugIT

@ask-kamal-nayan
Contributor

ask-kamal-nayan commented May 28, 2026

Description

What

This PR adds two changes:

  1. A regression IT (CompositeConcurrentIndexingIT#testConcurrentFlushAndRefreshHoldsRefreshLock) for the refreshLock fix in DataFormatAwareEngine.flush().
  2. A bug fix + regression IT for an indexSort mismatch in LuceneCommitter.discoverAndTrimUnsafeCommits that prevents engine reinit when the safe commit has many Lucene segments.

Change 1 — IT for refreshLock fix in DataFormatAwareEngine.flush()

Why

Without holding refreshLock across the full flush body, a refresh can fire between committer.commit() and updateLastCommitInfo(). That refresh advances latestCatalogSnapshot, so the just-committed snapshot keeps a stale lastCommitFileName. The stale name eventually points to a segments_<N> file that the deletion sweep has already removed, and the problem surfaces as a NoSuchFileException during peer recovery.
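
The interleaving described above can be modeled with plain JDK classes. This is a minimal sketch, not the engine code: `Snapshot`, `refresh()`, `flush(...)` and `run(...)` are hypothetical stand-ins, and it assumes that the update step writes the commit file name to whichever snapshot is current at that moment. The 500 ms wait only simulates the window in which a concurrent refresh could fire.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class FlushRefreshRaceSketch {
    /** Hypothetical stand-in for a catalog snapshot. */
    static final class Snapshot {
        final long generation;
        volatile String lastCommitFileName;

        Snapshot(long generation, String lastCommitFileName) {
            this.generation = generation;
            this.lastCommitFileName = lastCommitFileName;
        }
    }

    private final ReentrantLock refreshLock = new ReentrantLock();
    private volatile Snapshot latest = new Snapshot(0, null);
    private long commitGeneration = 0;

    /** Models a refresh: publishes a new snapshot while holding refreshLock. */
    void refresh() {
        refreshLock.lock();
        try {
            latest = new Snapshot(latest.generation + 1, latest.lastCommitFileName);
        } finally {
            refreshLock.unlock();
        }
    }

    /** Models a flush; the lock is optional so both variants can be compared. */
    Snapshot flush(boolean holdRefreshLock, Runnable betweenCommitAndUpdate) {
        if (holdRefreshLock) refreshLock.lock();
        try {
            Snapshot committed = latest;                             // snapshot being committed
            String commitFile = "segments_" + (++commitGeneration);  // models committer.commit()
            betweenCommitAndUpdate.run();                            // window for a concurrent refresh
            latest.lastCommitFileName = commitFile;                  // models updateLastCommitInfo()
            return committed;
        } finally {
            if (holdRefreshLock) refreshLock.unlock();
        }
    }

    /** Runs one flush with a refresh attempted mid-flush; returns the committed snapshot's file name. */
    static String run(boolean holdRefreshLock) {
        FlushRefreshRaceSketch engine = new FlushRefreshRaceSketch();
        CountDownLatch refreshed = new CountDownLatch(1);
        Thread refresher = new Thread(() -> {
            engine.refresh();
            refreshed.countDown();
        });
        try {
            Snapshot committed = engine.flush(holdRefreshLock, () -> {
                refresher.start();
                try {
                    // Give the refresher time to run; it stays blocked if the lock is held.
                    refreshed.await(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            refresher.join();
            return committed.lastCommitFileName;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("lock held:     " + run(true));
        System.out.println("lock not held: " + run(false));
    }
}
```

With the lock held, the refresher cannot publish a new snapshot until the flush has finished, so the committed snapshot receives its file name. Without the lock, the refresher normally wins the window and the committed snapshot is left with the old (here: null) name.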

The existing IT covered concurrent index/refresh/flush correctness but did not assert the on-disk invariant that the fix protects.

What the test does

  • 4 indexer + 3 flusher + 3 refresher threads, started in lockstep via a CyclicBarrier
  • After every flush, the flusher reads back the safe catalog snapshot and asserts
    • getLastCommitFileName() is non-null and starts with segments_
    • the referenced file is still listed in store().directory().listAll()
  • Final verifyIndex confirms doc counts and segment-generation invariants
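
As a rough illustration of the two mechanics in the list above, the sketch below starts a group of threads in lockstep with a CyclicBarrier and expresses the per-flush check as a predicate over plain strings. The class and method names are hypothetical, and the set of file names stands in for the result of a directory listing; the real test works against the shard's store.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class LockstepStartSketch {
    /** The per-flush invariant: a non-null segments_ name that is still listed on disk. */
    static boolean invariantHolds(String lastCommitFileName, Set<String> diskFiles) {
        return lastCommitFileName != null
            && lastCommitFileName.startsWith("segments_")
            && diskFiles.contains(lastCommitFileName);
    }

    /** Starts the given number of workers in lockstep; returns how many passed the barrier. */
    static int startInLockstep(int workers) {
        CyclicBarrier barrier = new CyclicBarrier(workers);
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();             // blocks until every worker has arrived
                    released.incrementAndGet();  // a real worker would index, flush, or refresh here
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return released.get();
    }

    public static void main(String[] args) {
        // 4 indexers + 3 flushers + 3 refreshers = 10 workers
        System.out.println(startInLockstep(10));
        System.out.println(invariantHolds("segments_3", Set.of("segments_3", "_0.cfs")));
    }
}
```

Releasing all workers at the same instant maximizes the overlap between flushes and refreshes, which is what makes a narrow race window more likely to be hit.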

Validation

| Configuration | Result |
|---|---|
| With refreshLock fix | PASS in ~15s |
| refreshLock removed | FAIL: Committed snapshot's lastCommitFileName must always match an existing segments_<N> on disk (also surfaces "afterRefresh called but not beforeRefresh") |

The test fails deterministically without the fix and passes with it.


Change 2 — Fix indexSort mismatch in LuceneCommitter.discoverAndTrimUnsafeCommits

Why

LuceneCommitter.discoverAndTrimUnsafeCommits opens a temporary IndexWriter in APPEND mode at the safe-commit point without calling setMergePolicy (so it defaults to TieredMergePolicy) or setIndexSort. When the safe commit references roughly 10 or more Lucene segments, the temporary writer fires a merge during open/close and produces a segment with source=merge and indexSort=null. The new (sorted) MergeIndexWriter then fails to open at the polluted commit:

IllegalArgumentException: cannot change previous indexSort=null
(from segment=_X(...source=merge,...))
to new indexSort=<sortednumeric: "row_id"> ...
at IndexWriter.validateIndexSort:1225
at LuceneCommitter.:126

The shard cannot start; the index goes RED.
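
The failure mode can be modeled without Lucene. The sketch below is a simplified stand-in, not Lucene's implementation: `Segment`, `openTempWriter`, `validateIndexSort` and `reopens` are hypothetical names, the threshold of 10 is illustrative (taken from the "≥ ~10" observation above), and `allowMerges` stands in for whether the temporary writer has a merging policy.

```java
import java.util.ArrayList;
import java.util.List;

public class IndexSortMismatchSketch {
    /** Hypothetical stand-in for a segment; indexSort == null means unsorted. */
    static final class Segment {
        final String name;
        final String source;
        final String indexSort;

        Segment(String name, String source, String indexSort) {
            this.name = name;
            this.source = source;
            this.indexSort = indexSort;
        }
    }

    static final int MERGE_THRESHOLD = 10; // illustrative only

    /** Models the temporary writer: if merges are allowed and enough segments exist, it merges without a sort. */
    static List<Segment> openTempWriter(List<Segment> commit, boolean allowMerges) {
        if (!allowMerges || commit.size() < MERGE_THRESHOLD) {
            return new ArrayList<>(commit);
        }
        List<Segment> merged = new ArrayList<>();
        merged.add(new Segment("_merged", "merge", null)); // the temp writer has no index sort configured
        return merged;
    }

    /** Models the check a sorted writer performs against every segment in the commit. */
    static void validateIndexSort(List<Segment> commit, String newSort) {
        for (Segment s : commit) {
            if (!newSort.equals(s.indexSort)) {
                throw new IllegalArgumentException(
                    "cannot change previous indexSort=" + s.indexSort
                        + " (from segment=" + s.name + ", source=" + s.source + ")"
                        + " to new indexSort=" + newSort);
            }
        }
    }

    /** Returns true if a sorted writer can open the commit left behind by the temp writer. */
    static boolean reopens(int segmentCount, boolean allowMerges) {
        String sort = "<sortednumeric: \"row_id\">";
        List<Segment> commit = new ArrayList<>();
        for (int i = 0; i < segmentCount; i++) {
            commit.add(new Segment("_" + i, "flush", sort));
        }
        try {
            validateIndexSort(openTempWriter(commit, allowMerges), sort);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(reopens(12, true));   // merging temp writer, many segments
        System.out.println(reopens(12, false));  // non-merging temp writer
    }
}
```

In this model the reopen only fails when both conditions hold: the commit is large enough to trigger a merge and the temporary writer is allowed to merge.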

Fix

Pin NoMergePolicy.INSTANCE on the temp writer. This matches the guard that the vanilla Store.trimUnsafeCommits already uses (Store.java:2255). The temp writer's only job is to re-anchor the commit point; it must never merge.

Test

DataFormatAwareReplicaResilienceIT#testEngineReinitDoesNotFailWithIndexSortMismatch fails deterministically without the fix (RED) and passes with it (~60s GREEN).


### Related Issues
Resolves #[Issue number to be closed when this PR is merged]
<!-- List any other related issues here -->

### Check List
- [ ] Functionality includes testing.
- [ ] API changes companion pull request [created](https://github.com/opensearch-project/opensearch-api-specification/blob/main/DEVELOPER_GUIDE.md), if applicable.
- [ ] Public documentation issue/PR [created](https://github.com/opensearch-project/documentation-website/issues/new/choose), if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

@github-actions
Contributor

github-actions Bot commented May 28, 2026

PR Reviewer Guide 🔍

(Review updated until commit 9bf6bf4)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The test resolves primaryShard once before starting concurrent threads but does not account for shard relocation or failover during the test. If the primary moves to another node mid-test, flushers will read stale state from the old shard instance, causing false negatives (missed race detections) or exceptions when accessing a closed shard. This is realistic in a multi-node cluster test with concurrent operations.

ShardId shardId = new ShardId(resolveIndex(indexName), 0);
IndexShard primaryShard = null;
for (String node : internalCluster().getDataNodeNames()) {
    try {
        IndexShard s = getIndexShard(node, shardId, indexName);
        if (s != null && s.routingEntry().primary()) {
            primaryShard = s;
            break;
        }
    } catch (Exception ignore) {}
}
assertNotNull("primary shard for [" + indexName + "][0] must be available", primaryShard);
final IndexShard primary = primaryShard;

Resource Leak

If primary.store().directory().listAll() throws an exception after primary.store().incRef(), the decRef() in the finally block is never reached, leaking the store reference. This can prevent shard closure and cause resource exhaustion under repeated test runs or when exceptions occur during directory listing.

primary.store().incRef();
try {
    Set<String> diskFiles = new HashSet<>(Arrays.asList(primary.store().directory().listAll()));
    if (!diskFiles.contains(reported)) {
        raceDetected.incrementAndGet();
    }
} finally {
    primary.store().decRef();
}
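
The incRef/try/finally shape in the snippet above can be checked in isolation. The following is a small sketch with a hypothetical `RefCounter` class standing in for the store's reference counting; the thrown exception simulates a failing directory listing. In Java, a finally block runs whether the try body completes normally or throws, so the count returns to its starting value here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountFinallySketch {
    /** Hypothetical stand-in for a reference-counted store. */
    static final class RefCounter {
        private final AtomicInteger refs = new AtomicInteger(1);

        void incRef() { refs.incrementAndGet(); }
        void decRef() { refs.decrementAndGet(); }
        int refCount() { return refs.get(); }
    }

    /** Takes a reference, fails inside the try body, and reports the count afterwards. */
    static int refCountAfterFailedListing() {
        RefCounter store = new RefCounter();
        try {
            store.incRef();
            try {
                // Simulates the directory listing throwing after the reference was taken.
                throw new UncheckedIOException(new IOException("listAll failed"));
            } finally {
                store.decRef(); // still runs while the exception propagates
            }
        } catch (UncheckedIOException expected) {
            // swallowed only so the sketch can report the final count
        }
        return store.refCount();
    }

    public static void main(String[] args) {
        System.out.println(refCountAfterFailedListing());
    }
}
```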

@github-actions
Contributor

github-actions Bot commented May 28, 2026

PR Code Suggestions ✨

Latest suggestions up to 64673c8

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Detect thread timeout failures

Add timeout handling for thread joins to detect and report threads that fail to
complete within the 60-second timeout. Currently, if a thread hangs, the test will
silently continue without detecting the issue, potentially masking deadlocks or
infinite loops.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.java [394-399]

-for (Thread t : indexers)
+for (Thread t : indexers) {
     t.join(60_000);
-for (Thread t : flushers)
+    if (t.isAlive()) {
+        fail("Indexer thread " + t.getName() + " did not complete within timeout");
+    }
+}
+for (Thread t : flushers) {
     t.join(60_000);
-for (Thread t : refreshers)
+    if (t.isAlive()) {
+        fail("Flusher thread " + t.getName() + " did not complete within timeout");
+    }
+}
+for (Thread t : refreshers) {
     t.join(60_000);
+    if (t.isAlive()) {
+        fail("Refresher thread " + t.getName() + " did not complete within timeout");
+    }
+}
Suggestion importance[1-10]: 7


Why: Valid suggestion that improves test reliability by detecting hung threads. However, the impact is moderate since the test already has a failures counter and timeout detection would primarily help diagnose test infrastructure issues rather than the actual race condition being tested.

Medium
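
The reason the extra isAlive() check matters is that Thread.join(long) returns silently when the timeout elapses. A self-contained sketch of the idea follows; `joinAll` and `demo` are hypothetical helpers, not part of the test under review, and the short timeout is chosen only to keep the example fast.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinTimeoutSketch {
    /** Joins each thread with a timeout and returns the names of threads that are still running. */
    static List<String> joinAll(List<Thread> threads, long timeoutMillis) {
        List<String> stillAlive = new ArrayList<>();
        for (Thread t : threads) {
            try {
                t.join(timeoutMillis); // returns normally on timeout, without any signal
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (t.isAlive()) {
                stillAlive.add(t.getName());
            }
        }
        return stillAlive;
    }

    /** One thread that finishes at once and one that outlives the timeout. */
    static List<String> demo() {
        Thread quick = new Thread(() -> {}, "quick");
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException ignored) {
                // interrupted by the cleanup below
            }
        }, "stuck");
        stuck.setDaemon(true);
        quick.start();
        stuck.start();
        List<String> alive = joinAll(List.of(quick, stuck), 200);
        stuck.interrupt(); // clean up the sleeping thread
        return alive;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A test could fail with the returned names in the message, which identifies the hung thread rather than only reporting that something timed out.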

Previous suggestions

Suggestions up to commit 00094d0
Category | Suggestion | Impact
General
Log exceptions before counting failures

Log the exception details when incrementing the failures counter. Without logging,
diagnosing test failures becomes difficult as the root cause is silently swallowed,
making it hard to understand which operation failed and why.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.java [366-368]

 } catch (Exception e) {
+    logger.error("Worker thread failed", e);
     failures.incrementAndGet();
 }
Suggestion importance[1-10]: 6


Why: Valid suggestion that would improve debuggability by logging exceptions before incrementing the failures counter. This helps diagnose test failures, though it's a moderate improvement rather than fixing a critical bug. The same pattern appears in multiple catch blocks (lines 330-332, 366-368, 381-383).

Low
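
A sketch of the log-then-count pattern, kept self-contained by using java.util.logging instead of the test's own logger. `countFailures` is a hypothetical helper that runs each body on the calling thread; in the actual test the catch blocks live inside the worker threads.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggedFailureSketch {
    private static final Logger LOGGER = Logger.getLogger(LoggedFailureSketch.class.getName());

    /** Runs each body, logging any exception with its stack trace before counting it. */
    static int countFailures(Runnable... bodies) {
        AtomicInteger failures = new AtomicInteger();
        for (Runnable body : bodies) {
            try {
                body.run();
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Worker failed", e); // keep the root cause visible
                failures.incrementAndGet();
            }
        }
        return failures.get();
    }

    public static void main(String[] args) {
        int failures = countFailures(
            () -> {},
            () -> { throw new IllegalStateException("boom"); });
        System.out.println(failures);
    }
}
```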
Possible issue
Fix resource leak in store reference

The store().incRef() call occurs outside the try-with-resources scope that manages
the catalog snapshot reference. If an exception occurs between acquiring the
snapshot and calling incRef(), the store reference won't be properly released,
causing a resource leak.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.java [349-364]

+primary.store().incRef();
 try (GatedCloseable<CatalogSnapshot> ref = primary.acquireSafeCatalogSnapshot()) {
     String reported = ref.get().getLastCommitFileName();
     if (reported == null || !reported.startsWith("segments_")) {
         raceDetected.incrementAndGet();
-        continue;
+        return;
     }
-    primary.store().incRef();
-    try {
-        Set<String> diskFiles = new HashSet<>(Arrays.asList(primary.store().directory().listAll()));
-        if (!diskFiles.contains(reported)) {
-            raceDetected.incrementAndGet();
-        }
-    } finally {
-        primary.store().decRef();
+    Set<String> diskFiles = new HashSet<>(Arrays.asList(primary.store().directory().listAll()));
+    if (!diskFiles.contains(reported)) {
+        raceDetected.incrementAndGet();
     }
+} finally {
+    primary.store().decRef();
 }
Suggestion importance[1-10]: 3


Why: The suggestion identifies a potential resource management issue, but the improved code changes the control flow by replacing continue with return, which would exit the entire loop prematurely rather than just skipping one iteration. The restructuring may not preserve the original test logic correctly.

Low
Suggestions up to commit 48a1a59
Category | Suggestion | Impact
Possible issue
Fix potential store reference leak

The primary.store().incRef() call occurs outside the try-with-resources scope that
manages the catalog snapshot. If an exception occurs between acquiring the snapshot
and incrementing the store reference, the store reference won't be properly
decremented, causing a resource leak.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.java [349-364]

 try (GatedCloseable<CatalogSnapshot> ref = primary.acquireSafeCatalogSnapshot()) {
     String reported = ref.get().getLastCommitFileName();
     if (reported == null || !reported.startsWith("segments_")) {
         raceDetected.incrementAndGet();
         continue;
     }
-    primary.store().incRef();
     try {
+        primary.store().incRef();
         Set<String> diskFiles = new HashSet<>(Arrays.asList(primary.store().directory().listAll()));
         if (!diskFiles.contains(reported)) {
             raceDetected.incrementAndGet();
         }
     } finally {
         primary.store().decRef();
     }
 }
Suggestion importance[1-10]: 9


Why: Critical fix for a resource leak. Moving primary.store().incRef() inside the inner try block ensures decRef() is always called even if an exception occurs during listAll(), preventing potential resource exhaustion in the test.

High
General
Detect and report hung threads

Add timeout handling for thread joins to detect and report hung threads. If any
thread fails to complete within the timeout, the test should fail with a clear
message indicating which thread type timed out, rather than silently continuing.

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeConcurrentIndexingIT.java [394-399]

-for (Thread t : indexers)
+for (Thread t : indexers) {
     t.join(60_000);
-for (Thread t : flushers)
+    if (t.isAlive()) {
+        fail("Indexer thread " + t.getName() + " did not complete within timeout");
+    }
+}
+for (Thread t : flushers) {
     t.join(60_000);
-for (Thread t : refreshers)
+    if (t.isAlive()) {
+        fail("Flusher thread " + t.getName() + " did not complete within timeout");
+    }
+}
+for (Thread t : refreshers) {
     t.join(60_000);
+    if (t.isAlive()) {
+        fail("Refresher thread " + t.getName() + " did not complete within timeout");
+    }
+}
Suggestion importance[1-10]: 7


Why: Valid suggestion that improves test diagnostics by explicitly detecting and reporting hung threads. This helps identify which thread type failed to complete, making debugging easier. However, it's a test improvement rather than a critical functional fix.

Medium

@github-actions
Contributor

❌ Gradle check result for 48a1a59: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 00094d0

@github-actions
Contributor

✅ Gradle check result for 00094d0: SUCCESS

@codecov

codecov Bot commented May 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.54%. Comparing base (700213c) to head (64673c8).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21868      +/-   ##
============================================
+ Coverage     73.49%   73.54%   +0.05%     
- Complexity    75557    75602      +45     
============================================
  Files          6034     6034              
  Lines        342604   342604              
  Branches      49279    49279              
============================================
+ Hits         251805   251980     +175     
+ Misses        70739    70593     -146     
+ Partials      20060    20031      -29     


Signed-off-by: Kamal Nayan <askkamal@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit 64673c8

@github-actions
Contributor

✅ Gradle check result for 64673c8: SUCCESS

Signed-off-by: Kamal Nayan <askkamal@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit 9bf6bf4

@github-actions
Contributor

❌ Gradle check result for 9bf6bf4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
