Skip to content

Make tokio concurrency settings dynamic#21817

Open
AjayRajNelapudi wants to merge 3 commits into
opensearch-project:mainfrom
AjayRajNelapudi:feature/concurrency-settings
Open

Make tokio concurrency settings dynamic#21817
AjayRajNelapudi wants to merge 3 commits into
opensearch-project:mainfrom
AjayRajNelapudi:feature/concurrency-settings

Conversation

@AjayRajNelapudi
Copy link
Copy Markdown
Contributor

@AjayRajNelapudi AjayRajNelapudi commented May 24, 2026

Description

Make tokio concurrency settings dynamic

% curl -s 'localhost:9200/_cluster/settings?include_defaults=true&flat_settings=true&pretty' | grep -A0 'concurrency'
    "datafusion.concurrency.fragment_executor_multiplier" : "1.5",
    "datafusion.concurrency.reduce_multiplier" : "1.5",
    "node.auto_force_merge.threads.concurrency_multiplier" : "2",

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'fragment_executor_gate\|reduce_gate'
  "fragment_executor_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  },
  "reduce_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 12
  }

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": {
    "datafusion.concurrency.fragment_executor_multiplier": 5.0,
    "datafusion.concurrency.reduce_multiplier": 5.0
  }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "reduce_multiplier" : "5.0",
        "fragment_executor_multiplier" : "5.0"
      }
    }
  },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'fragment_executor_gate\|reduce_gate'
  "fragment_executor_gate" : {
    "max_permits" : 40,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 40
  },
  "reduce_gate" : {
    "max_permits" : 40,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 0,
    "target_max_permits" : 40
  }

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": {
    "datafusion.concurrency.fragment_executor_multiplier": 0.5,
    "datafusion.concurrency.reduce_multiplier": 0.5
  }
}'
{
  "acknowledged" : true,
  "persistent" : {
    "datafusion" : {
      "concurrency" : {
        "reduce_multiplier" : "0.5",
        "fragment_executor_multiplier" : "0.5"
      }
    }
  },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'fragment_executor_gate\|reduce_gate'
  "fragment_executor_gate" : {
    "max_permits" : 4,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 36,
    "target_max_permits" : 4
  },
  "reduce_gate" : {
    "max_permits" : 4,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 36,
    "target_max_permits" : 4
  }

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": {
    "datafusion.concurrency.fragment_executor_multiplier": null,
    "datafusion.concurrency.reduce_multiplier": null
  }
}'
{
  "acknowledged" : true,
  "persistent" : { },
  "transient" : { }
}

% curl -s 'localhost:9200/_plugins/analytics_backend_datafusion/stats?pretty' | grep -A8 'fragment_executor_gate\|reduce_gate'
  "fragment_executor_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 28,
    "target_max_permits" : 12
  },
  "reduce_gate" : {
    "max_permits" : 12,
    "active_permits" : 0,
    "total_wait_duration_ms" : 0,
    "total_batches_started" : 0,
    "poison_permits" : 28,
    "target_max_permits" : 12
  }

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.fragment_executor_multiplier": 0.05 }
}'
{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "Failed to parse value [0.05] for setting [datafusion.concurrency.fragment_executor_multiplier] must be >= 0.1"
      }
    ],
    "type" : "illegal_argument_exception",
    "reason" : "Failed to parse value [0.05] for setting [datafusion.concurrency.fragment_executor_multiplier] must be >= 0.1"
  },
  "status" : 400
}

% curl -s -X PUT 'localhost:9200/_cluster/settings?pretty' -H 'Content-Type: application/json' -d '{
  "persistent": { "datafusion.concurrency.fragment_executor_multiplier": 15.0 }
}'
{
  "error" : {
    "root_cause" : [
      {
        "type" : "illegal_argument_exception",
        "reason" : "Failed to parse value [15.0] for setting [datafusion.concurrency.fragment_executor_multiplier] must be <= 10.0"
      }
    ],
    "type" : "illegal_argument_exception",
    "reason" : "Failed to parse value [15.0] for setting [datafusion.concurrency.fragment_executor_multiplier] must be <= 10.0"
  },
  "status" : 400
}

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@AjayRajNelapudi AjayRajNelapudi requested a review from a team as a code owner May 24, 2026 10:41
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit c4e2de2.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs367highNew `use proptest::prelude::*;` import added in the test module implies the addition of the 'proptest' crate as a dependency. No corresponding Cargo.toml change is visible in this diff, which may mean the dependency change is in a separate commit or file not included here. Per mandatory policy, any dependency addition must be flagged regardless of apparent legitimacy — maintainers should verify proptest appears in Cargo.toml (dev-dependencies) and confirm no Cargo.lock changes introduce unexpected transitive dependencies.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@bharath-techie bharath-techie added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label May 24, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 24, 2026

PR Reviewer Guide 🔍

(Review updated until commit e5245ce)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In active_permits(), the subtraction max_permits.load(...).saturating_sub(semaphore.available_permits() as u32) can produce incorrect results during a resize-down. When poison permits are held, available_permits() reflects the reduced capacity, but max_permits still shows the old value until the resize completes. This causes active_permits() to report a value higher than the actual number of active queries. For example, if max_permits=8, 4 poison permits are held (available=4), and 2 queries are active, the formula returns 8 - 2 = 6 instead of the correct 2. This affects monitoring accuracy during resize operations.

    self.max_permits.load(Ordering::Acquire).saturating_sub(self.semaphore.available_permits() as u32)
}
Possible Issue

In resize() scale-down path, the code acquires poison permits one at a time in a loop (for _ in 0..additional_needed). If a query holds a permit and never releases it (e.g., due to a bug or infinite loop), this loop will block indefinitely waiting for acquire_owned() to succeed. The async mutex serializes resizes, so all subsequent resize attempts will also block. This can cause the entire concurrency control system to hang. A timeout or try_acquire fallback is needed to prevent indefinite blocking.

let additional_needed = total_poison_needed - existing_poison;
for _ in 0..additional_needed {
    let permit = self.semaphore.clone()
        .acquire_owned()
        .await
        .expect("semaphore closed during resize");
    state.poison_permits.push(permit);
Possible Issue

In resize() scale-up path, when releasing poison permits, the code pops permits from the Vec without checking if the Vec is empty. If permits_from_poison exceeds poison_permits.len(), the loop will panic on state.poison_permits.pop() when the Vec is empty. This can occur if concurrent resizes or a race condition causes poison_count to be stale. The code should verify state.poison_permits.len() >= permits_from_poison before the loop or use a checked pop.

let permits_from_poison = poison_count.min(new_max - current_max);
for _ in 0..permits_from_poison {
    state.poison_permits.pop();
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 24, 2026

PR Code Suggestions ✨

Latest suggestions up to e5245ce

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Update max permits before semaphore operations

The scale-up logic updates max_permits and target_max_permits after releasing poison
permits and adding new permits. If a thread reads max_permits() between the permit
operations and the atomic store, it will see an inconsistent state. Update
max_permits before modifying the semaphore to maintain consistency.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [149-165]

 if new_max > current_max {
-    // Scale-Up: release poison permits first, then add_permits for remaining delta
+    // Scale-Up: update max_permits first, then release poison permits and add new permits
+    self.max_permits.store(new_max, Ordering::Release);
+    state.target_max_permits = new_max;
+    
     let poison_count = state.poison_permits.len() as u32;
     let permits_from_poison = poison_count.min(new_max - current_max);
     for _ in 0..permits_from_poison {
         state.poison_permits.pop();
     }
     let still_needed = (new_max - current_max) - permits_from_poison;
     if still_needed > 0 {
         self.semaphore.add_permits(still_needed as usize);
     }
-    self.max_permits.store(new_max, Ordering::Release);
-    state.target_max_permits = new_max;
     ...
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential race condition where max_permits() could be read between semaphore operations and the atomic store, leading to temporary inconsistency. Updating max_permits before modifying the semaphore improves consistency. However, the impact is limited since the resize mutex serializes resize operations and the inconsistency window is brief.

Medium
Recalculate CPU threads inside lambda

The cpuThreads variable is captured from the outer scope and may become stale if the
number of available processors changes at runtime. Consider recalculating cpuThreads
inside each lambda to ensure the permit count reflects the current system state.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [360-368]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("fragment_executor", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("reduce", newMax);
 });
Suggestion importance[1-10]: 3

__

Why: While recalculating cpuThreads inside the lambda would reflect runtime changes in available processors, Runtime.getRuntime().availableProcessors() typically remains constant during JVM execution. The suggestion is technically correct but offers minimal practical benefit, as CPU count changes are rare and would require JVM restart to be properly handled.

Low

Previous suggestions

Suggestions up to commit 1b694e9
CategorySuggestion                                                                                                                                    Impact
General
Account for poison permits in calculation

The active_permits calculation subtracts available_permits from max_permits, but
during a resize-down with poison permits held, available_permits may exceed the
effective capacity. This can cause saturating_sub to return 0 even when permits are
genuinely active. Consider accounting for poison permits to accurately reflect the
number of permits held by active queries.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [95-96]

 pub fn active_permits(&self) -> u32 {
-    self.max_permits.load(Ordering::Acquire).saturating_sub(self.semaphore.available_permits() as u32)
+    let max = self.max_permits.load(Ordering::Acquire);
+    let available = self.semaphore.available_permits() as u32;
+    let poison = self.poison_permits_held();
+    max.saturating_sub(available.saturating_sub(poison))
 }
Suggestion importance[1-10]: 7

__

Why: The current active_permits calculation may return incorrect values during resize-down when poison permits are held. The suggested fix properly accounts for poison permits to accurately reflect active query permits, improving observability correctness.

Medium
Update atomic after permit adjustments

The scale-up logic releases poison permits and adds new permits, but the max_permits
atomic is updated before verifying that the semaphore state is consistent. If a
concurrent acquire() reads max_permits between the store and the actual permit
availability, it may observe an inconsistent state. Consider updating max_permits
only after all permit adjustments are complete to maintain atomicity.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [149-165]

 if new_max > current_max {
-    // Scale-Up: release poison permits first, then add_permits for remaining delta
     let poison_count = state.poison_permits.len() as u32;
     let permits_from_poison = poison_count.min(new_max - current_max);
     for _ in 0..permits_from_poison {
         state.poison_permits.pop();
     }
     let still_needed = (new_max - current_max) - permits_from_poison;
     if still_needed > 0 {
         self.semaphore.add_permits(still_needed as usize);
     }
+    state.target_max_permits = new_max;
     self.max_permits.store(new_max, Ordering::Release);
-    state.target_max_permits = new_max;
     ...
 }
Suggestion importance[1-10]: 5

__

Why: Moving the max_permits.store() after all permit adjustments reduces the window where concurrent acquire() calls might observe inconsistent state. However, the resize_mutex already serializes resize operations, limiting the practical impact of this race condition.

Low
Recompute CPU threads on each update

The cpuThreads variable is captured in lambda closures but is computed once at
plugin initialization. If the number of available processors changes at runtime
(e.g., container CPU limits adjusted), the gate permits will be computed using stale
values. Consider re-reading DataFusionService.cpuThreadCount() inside each lambda to
ensure dynamic updates reflect current CPU availability.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [360-368]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("fragment_executor", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("reduce", newMax);
 });
Suggestion importance[1-10]: 3

__

Why: While re-reading cpuThreadCount() on each update would handle dynamic CPU changes, Runtime.getRuntime().availableProcessors() typically remains constant during JVM lifetime. The suggestion addresses an edge case with minimal practical impact.

Low
Suggestions up to commit f2da88d
CategorySuggestion                                                                                                                                    Impact
General
Avoid stale CPU thread count

The cpuThreads variable is captured from the outer scope and may become stale if the
number of available processors changes at runtime. Consider fetching the CPU thread
count dynamically inside each consumer lambda to ensure the computation always uses
the current value.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [360-368]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
+    int cpuThreads = DataFusionService.cpuThreadCount();
     int newMax = Math.max(1, (int) (cpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("datanode", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
+    int cpuThreads = DataFusionService.cpuThreadCount();
     int newMax = Math.max(1, (int) (cpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("coordinator", newMax);
 });
Suggestion importance[1-10]: 3

__

Why: While the suggestion is technically correct that cpuThreads could become stale if processor count changes at runtime, this is an extremely rare scenario in production environments. The improvement is marginal and adds redundant calls to cpuThreadCount() on every settings update. The current implementation is acceptable.

Low
Suggestions up to commit f2da88d
CategorySuggestion                                                                                                                                    Impact
General
Avoid stale CPU thread count

The cpuThreads variable is captured from the outer scope and may become stale if the
number of available processors changes at runtime. Consider calling
DataFusionService.cpuThreadCount() inside each lambda to ensure the computation
always uses the current CPU thread count.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [360-368]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
+    int cpuThreads = DataFusionService.cpuThreadCount();
     int newMax = Math.max(1, (int) (cpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("datanode", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
+    int cpuThreads = DataFusionService.cpuThreadCount();
     int newMax = Math.max(1, (int) (cpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("coordinator", newMax);
 });
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that cpuThreads is captured from outer scope. However, Runtime.getRuntime().availableProcessors() typically returns a constant value for the JVM lifetime, making this a minor improvement rather than a critical fix.

Low
Handle async resize task errors

The FFI function spawns an async resize task but does not await or track its
completion. If the resize fails or panics, the error is silently dropped. Consider
logging errors from the spawned task or returning a handle to allow the caller to
detect failures.

sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs [127-132]

+let handle = io_runtime.spawn(async move {
+    gate.resize(new_max_permits, &gate_name_owned).await;
+});
+
+// Optionally: spawn a monitoring task to log errors
 io_runtime.spawn(async move {
-    gate.resize(new_max_permits, &gate_name_owned).await;
+    if let Err(e) = handle.await {
+        warn!("df_update_concurrency_gate task failed: {:?}", e);
+    }
 });
 
 Ok(0)
Suggestion importance[1-10]: 4

__

Why: Valid suggestion to improve error visibility. The spawned task's errors are currently silently dropped. However, the suggested code has a flaw (spawning another task to await the first), and the impact is moderate since resize failures would be rare in practice.

Low
Prevent active permits underflow

The active_permits calculation may underflow when poison permits are held during
scale-down, causing available_permits to exceed max_permits. Use saturating_sub to
prevent underflow, but verify that the metric accurately reflects active queries
when poison permits are present.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [95-96]

 pub fn active_permits(&self) -> u32 {
-    self.max_permits.load(Ordering::Acquire).saturating_sub(self.semaphore.available_permits() as u32)
+    let max = self.max_permits.load(Ordering::Acquire);
+    let available = self.semaphore.available_permits() as u32;
+    // During scale-down, available may exceed max due to poison permits.
+    // Return 0 if available >= max to avoid underflow.
+    max.saturating_sub(available)
 }
Suggestion importance[1-10]: 3

__

Why: The code already uses saturating_sub which prevents underflow. The suggestion's improved code is functionally identical to the existing code, just with an intermediate variable and a comment. The concern about poison permits is valid but already handled.

Low
Suggestions up to commit f2da88d
CategorySuggestion                                                                                                                                    Impact
General
Clamp active permits to max

The active_permits calculation may underflow when available_permits exceeds
max_permits during a scale-up-after-scale-down scenario (due to bulk poison permit
release). Use saturating_sub to prevent underflow, but verify this behavior matches
the intended semantics for active permit reporting.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [95-96]

 pub fn active_permits(&self) -> u32 {
-    self.max_permits.load(Ordering::Acquire).saturating_sub(self.semaphore.available_permits() as u32)
+    let max = self.max_permits.load(Ordering::Acquire);
+    let available = self.semaphore.available_permits() as u32;
+    max.saturating_sub(available).min(max)
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a potential underflow scenario during scale-up-after-scale-down. The improved code adds .min(max) to clamp the result, ensuring active_permits never exceeds max_permits, which improves correctness of the reported metric.

Low
Document bulk poison permit release

The scale-up logic releases poison permits one-by-one in a loop, but each
OwnedSemaphorePermit may represent multiple permits (from acquire_many_owned).
Dropping a single entry releases all permits it holds, potentially causing
available_permits to exceed new_max. Consider tracking the actual permit count per
entry or documenting this bulk-release behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [149-160]

 if new_max > current_max {
-    // Scale-Up: release poison permits first, then add_permits for remaining delta
+    // Scale-Up: release poison permits first (each entry may hold multiple permits)
+    // Note: dropping one OwnedSemaphorePermit releases all permits it holds
     let poison_count = state.poison_permits.len() as u32;
     let permits_from_poison = poison_count.min(new_max - current_max);
     for _ in 0..permits_from_poison {
-        state.poison_permits.pop();
+        state.poison_permits.pop(); // May release more than 1 permit per pop
     }
-    let still_needed = (new_max - current_max) - permits_from_poison;
+    let still_needed = (new_max - current_max).saturating_sub(permits_from_poison);
     if still_needed > 0 {
         self.semaphore.add_permits(still_needed as usize);
     }
     ...
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that OwnedSemaphorePermit entries may hold multiple permits, causing bulk release. The improved code adds a clarifying comment and uses saturating_sub to prevent underflow in the still_needed calculation, improving code clarity and robustness.

Low
Avoid stale CPU count capture

The cpuThreads variable is captured from the outer scope and may become stale if the
number of available processors changes at runtime. Consider fetching the current CPU
count inside each consumer lambda to ensure the computation always uses the latest
value.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [360-368]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("datanode", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("coordinator", newMax);
 });
Suggestion importance[1-10]: 3

__

Why: The suggestion addresses a theoretical concern about stale cpuThreads values, but Runtime.getRuntime().availableProcessors() typically remains constant during JVM execution. The added overhead of repeated calls provides minimal benefit for a rare edge case.

Low
Suggestions up to commit c4e2de2
CategorySuggestion                                                                                                                                    Impact
General
Refresh CPU thread count dynamically

The cpuThreads value is captured once at plugin initialization and never updated. If
the JVM's available processors change at runtime (e.g., container CPU quota
adjustment), the gate permits will be computed using stale values. Consider fetching
cpuThreads inside the lambda to reflect current processor availability.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionPlugin.java [317-325]

 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("datanode", newMax);
 });
 
 clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
-    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
+    int currentCpuThreads = DataFusionService.cpuThreadCount();
+    int newMax = Math.max(1, (int) (currentCpuThreads * multiplier));
     NativeBridge.updateConcurrencyGate("coordinator", newMax);
 });
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that cpuThreads is captured once at initialization. While Runtime.getRuntime().availableProcessors() can change in containerized environments, this is an edge case. The suggestion improves robustness by fetching the current value on each update, though the impact is moderate since CPU quota changes are rare.

Medium
Optimize no-op resize path

The resize operation acquires the mutex before validating the new value against the
current state. If new_max equals current_max, the mutex is held unnecessarily. Move
the no-op check before acquiring the mutex to avoid contention when settings are
repeatedly applied with the same value.

sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs [129-144]

 pub async fn resize(&self, new_max: u32, gate_name: &str) {
     // Validate bounds
     if new_max < 1 || new_max > Semaphore::MAX_PERMITS as u32 {
         warn!(
             "[{}] resize rejected: new_max {} out of bounds [1, {}]",
             gate_name, new_max, Semaphore::MAX_PERMITS
         );
         return;
     }
 
+    let current_max = self.max_permits.load(Ordering::Acquire);
+    if new_max == current_max {
+        return; // No-op: avoid acquiring mutex
+    }
+
     let mut state = self.resize_mutex.lock().await;
-    let current_max = self.max_permits.load(Ordering::Acquire);
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that the no-op check occurs after acquiring the mutex. Moving the check before the mutex acquisition would reduce contention when settings are repeatedly applied with the same value. However, the current code already has a no-op check at line 142-144 inside the mutex, so the optimization is valid but offers only marginal performance improvement.

Low

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c4e2de2: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 24, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.50%. Comparing base (dad63c0) to head (e5245ce).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21817      +/-   ##
============================================
- Coverage     73.51%   73.50%   -0.02%     
+ Complexity    75582    75562      -20     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251875      -43     
- Misses        70712    70748      +36     
- Partials      20031    20038       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Bukhtawar Bukhtawar self-requested a review May 25, 2026 05:23
Comment thread sandbox/plugins/analytics-backend-datafusion/rust/src/executor.rs Outdated
Comment on lines +314 to +326
// Wire dynamic concurrency gate multiplier settings
int cpuThreads = DataFusionService.cpuThreadCount();

clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
int newMax = Math.max(1, (int) (cpuThreads * multiplier));
NativeBridge.updateConcurrencyGate("datanode", newMax);
});

clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
int newMax = Math.max(1, (int) (cpuThreads * multiplier));
NativeBridge.updateConcurrencyGate("coordinator", newMax);
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no such thing as data node/coordinator, absence of role.data makes it coordinator.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, did not quite follow the comment. Are you talking about the naming or using different concurrency gates?

Background: We introduced different semaphores for data node and coordinator, else the coordinator was acquiring all permits and data node functions were blocked on coordinator, resulting in a deadlock.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Name it as fragment_executor(data node) and reduce(coordinator)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed the gates and settings to fragment_executor and reduce respectively.

@AjayRajNelapudi AjayRajNelapudi force-pushed the feature/concurrency-settings branch from c4e2de2 to f2da88d Compare May 28, 2026 04:20
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f2da88d

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for f2da88d: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f2da88d

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for f2da88d: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f2da88d

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for f2da88d: SUCCESS

// Validate bounds
if new_max < 1 || new_max > Semaphore::MAX_PERMITS as u32 {
warn!(
"[{}] resize rejected: new_max {} out of bounds [1, {}]",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

acquire_many_owned(N) returns a single OwnedSemaphorePermit holding N permits, but the scale-up logic counts Vec entries as individual permits. After 8→4 (1 entry holding 4 permits), then 4→6: popping 1 entry releases all 4 permits back, not just 2. The semaphore ends up with 9 available but max_permits says 6. Should we track actual permit count per entry or use individual acquire_owned() calls instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to individual acquire_owned() calls in a loop — each Vec entry now holds exactly 1 permit.

// the bulk release of the poison entry. This is an implementation detail of
// how acquire_many_owned stores multiple permits as one entry.
// The key contract is: max_permits == 6 and no poison held.
assert!(gate.semaphore.available_permits() >= 6);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we validating >=6 and not ==?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a consequence of above comment. Fixed now to be ==

@AjayRajNelapudi AjayRajNelapudi force-pushed the feature/concurrency-settings branch from f2da88d to 1b694e9 Compare May 30, 2026 12:31
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 1b694e9

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 1b694e9: SUCCESS

Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

ajnelapu added 2 commits May 31, 2026 15:44
Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
@AjayRajNelapudi AjayRajNelapudi force-pushed the feature/concurrency-settings branch from 1b694e9 to e5245ce Compare May 31, 2026 10:18
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e5245ce

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for e5245ce: SUCCESS

Comment on lines +125 to +126
private static final VarHandle CG_TOTAL_WAIT_DURATION_MS = handle("reduce_gate", "total_wait_duration_ms");
private static final VarHandle CG_TOTAL_BATCHES_STARTED = handle("reduce_gate", "total_batches_started");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduce_executor just for consistency

Comment on lines +94 to +95
this.poisonPermits = in.readVLong();
this.targetMaxPermits = in.readVLong();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If 3.7 is launched we might have to change this to the next minor version

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants