Make tokio concurrency settings dynamic #21817
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@             Coverage Diff              @@
##               main   #21817      +/-   ##
============================================
- Coverage     73.51%   73.50%   -0.02%
+ Complexity    75582    75562      -20
============================================
  Files          6034     6034
  Lines        342661   342661
  Branches      49294    49294
============================================
- Hits         251918   251875      -43
- Misses        70712    70748      +36
- Partials      20031    20038       +7
// Wire dynamic concurrency gate multiplier settings
int cpuThreads = DataFusionService.cpuThreadCount();

clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_DATANODE_MULTIPLIER, multiplier -> {
    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
    NativeBridge.updateConcurrencyGate("datanode", newMax);
});

clusterService.getClusterSettings().addSettingsUpdateConsumer(DatafusionSettings.CONCURRENCY_COORDINATOR_MULTIPLIER, multiplier -> {
    int newMax = Math.max(1, (int) (cpuThreads * multiplier));
    NativeBridge.updateConcurrencyGate("coordinator", newMax);
});
There is no such thing as a separate data node or coordinator here; a node without role.data simply acts as a coordinator.
Sorry, did not quite follow the comment. Are you talking about the naming or using different concurrency gates?
Background: We introduced separate semaphores for the data node and the coordinator. Otherwise the coordinator acquired all the permits and data node functions blocked behind it, resulting in a deadlock.
Name them fragment_executor (data node) and reduce (coordinator).
Renamed the gates and settings to fragment_executor and reduce respectively.
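For reference, the sizing rule in the snippet under review, Math.max(1, (int) (cpuThreads * multiplier)), can be sketched on its own. This is a minimal Rust transliteration, not code from the PR; the function name gate_max is hypothetical and only illustrates how a fractional multiplier truncates and how the floor of 1 applies.

```rust
/// Hypothetical helper (not from the PR): turns a CPU thread count and a
/// multiplier into a gate size, mirroring
/// `Math.max(1, (int) (cpuThreads * multiplier))` from the Java snippet.
fn gate_max(cpu_threads: u32, multiplier: f64) -> u32 {
    // `as u32` truncates toward zero and saturates, so a negative or NaN
    // product becomes 0 here.
    let scaled = (f64::from(cpu_threads) * multiplier) as u32;
    // Never let the gate drop to zero permits.
    scaled.max(1)
}
```

With 8 CPU threads, a multiplier of 1.5 gives 12 permits, while a very small multiplier such as 0.05 truncates to 0 and is raised to the floor of 1.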
Force-pushed from c4e2de2 to f2da88d
Persistent review updated to latest commit f2da88d
❌ Gradle check result for f2da88d: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
❌ Gradle check result for f2da88d: null.
// Validate bounds
if new_max < 1 || new_max > Semaphore::MAX_PERMITS as u32 {
    warn!(
        "[{}] resize rejected: new_max {} out of bounds [1, {}]",
acquire_many_owned(N) returns a single OwnedSemaphorePermit holding N permits, but the scale-up logic counts Vec entries as individual permits. After 8→4 (1 entry holding 4 permits), then 4→6: popping 1 entry releases all 4 permits back, not just 2. The semaphore ends up with 9 available but max_permits says 6. Should we track actual permit count per entry or use individual acquire_owned() calls instead?
Switched to individual acquire_owned() calls in a loop; each Vec entry now holds exactly 1 permit.
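The accounting issue in this thread can be reproduced with a small std-only model. This is a sketch under stated assumptions, not the PR's code and not tokio's Semaphore: GateModel, shrink, grow, and resize_8_4_6 are hypothetical names, and the scale-up logic (count each popped entry as one permit, then add fresh permits for the remainder) is one plausible reading that yields the 9 available permits mentioned in the review comment.

```rust
/// Std-only model of a resizable gate. This is NOT tokio's Semaphore;
/// it only tracks the numbers discussed in the review thread.
struct GateModel {
    available: u32,   // permits currently free
    max_permits: u32, // limit the gate reports
    poison: Vec<u32>, // each entry records how many permits it withholds
}

impl GateModel {
    fn new(max: u32) -> Self {
        GateModel { available: max, max_permits: max, poison: Vec::new() }
    }

    /// Scale down by withholding permits. Assumes no permits are in flight
    /// and `new_max <= max_permits`.
    /// `bulk = true` mimics one acquire_many_owned(n) entry holding n permits;
    /// `bulk = false` mimics n acquire_owned() entries holding 1 permit each.
    fn shrink(&mut self, new_max: u32, bulk: bool) {
        let delta = self.max_permits - new_max;
        self.available -= delta;
        if bulk {
            self.poison.push(delta);
        } else {
            for _ in 0..delta {
                self.poison.push(1);
            }
        }
        self.max_permits = new_max;
    }

    /// Scale up, counting every popped entry as ONE permit (the assumption
    /// questioned in the review), then adding fresh permits for the rest.
    fn grow(&mut self, new_max: u32) {
        let mut needed = new_max - self.max_permits;
        while needed > 0 {
            match self.poison.pop() {
                Some(held) => {
                    self.available += held; // dropping the entry frees all it holds
                    needed -= 1;            // but it is counted as a single permit
                }
                None => break,
            }
        }
        self.available += needed; // stands in for add_permits(needed)
        self.max_permits = new_max;
    }
}

/// Runs the 8 -> 4 -> 6 sequence and returns (available, max_permits).
fn resize_8_4_6(bulk: bool) -> (u32, u32) {
    let mut gate = GateModel::new(8);
    gate.shrink(4, bulk);
    gate.grow(6);
    (gate.available, gate.max_permits)
}
```

In this model, resize_8_4_6(true) ends with 9 available permits against a maximum of 6, while resize_8_4_6(false) ends with 6 and 6, which is why one permit per Vec entry keeps the entry count and the permit count in agreement.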
// the bulk release of the poison entry. This is an implementation detail of
// how acquire_many_owned stores multiple permits as one entry.
// The key contract is: max_permits == 6 and no poison held.
assert!(gate.semaphore.available_permits() >= 6);
Why are we validating >=6 and not ==?
This was a consequence of the comment above. Fixed now to use ==.
Force-pushed from f2da88d to 1b694e9
Persistent review updated to latest commit 1b694e9
Signed-off-by: Ajay Raj Nelapudi <ajnelapu@amazon.com>
Force-pushed from 1b694e9 to e5245ce
Persistent review updated to latest commit e5245ce
private static final VarHandle CG_TOTAL_WAIT_DURATION_MS = handle("reduce_gate", "total_wait_duration_ms");
private static final VarHandle CG_TOTAL_BATCHES_STARTED = handle("reduce_gate", "total_batches_started");
Use reduce_executor here, just for consistency with fragment_executor.
this.poisonPermits = in.readVLong();
this.targetMaxPermits = in.readVLong();
If 3.7 is released first, we might have to change this to the next minor version.
Description
Make tokio concurrency settings dynamic
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.