Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 2024-06-18 - Sliding Window withTaskGroup over Static Chunking
**Learning:** In Swift structured concurrency, processing high-volume tasks using `withTaskGroup` with static chunking (e.g. creating chunks of array elements) limits throughput due to tail latency. The execution pauses while waiting for the slowest task in the current chunk before starting the next chunk.
**Action:** Use a sliding window approach with an iterator instead of static chunking. Populate the `withTaskGroup` with the initial `maxConcurrency` tasks, then continuously loop over `group` results with `for await`, adding a new task from the iterator each time one completes to maintain maximum parallel execution.
38 changes: 22 additions & 16 deletions Sources/Cacheout/Memory/ProcessMemoryScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,35 @@ actor ProcessMemoryScanner {
///
/// Returns the collected entries and the count of EPERM failures.
private func scanPIDs(_ pids: [pid_t]) async -> (entries: [ProcessEntryDTO], epermCount: Int) {
// Chunk PIDs to cap concurrency at maxConcurrency.
let chunks = stride(from: 0, to: pids.count, by: maxConcurrency).map {
Array(pids[$0..<min($0 + maxConcurrency, pids.count)])
}

var allEntries: [ProcessEntryDTO] = []
var totalEperm = 0

for chunk in chunks {
await withTaskGroup(of: ScanPIDResult.self) { group in
for pid in chunk {
await withTaskGroup(of: ScanPIDResult.self) { group in
var iterator = pids.makeIterator()

// Add initial tasks up to maxConcurrency
for _ in 0..<maxConcurrency {
if let pid = iterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
}
for await result in group {
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}

// As each task completes, add a new one until iterator is exhausted
for await result in group {
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}

if let pid = iterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
}
}
Expand Down