Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions Sources/Cacheout/Memory/ProcessMemoryScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,37 @@ actor ProcessMemoryScanner {
///
/// Returns the collected entries and the count of EPERM failures.
private func scanPIDs(_ pids: [pid_t]) async -> (entries: [ProcessEntryDTO], epermCount: Int) {
// Chunk PIDs to cap concurrency at maxConcurrency.
let chunks = stride(from: 0, to: pids.count, by: maxConcurrency).map {
Array(pids[$0..<min($0 + maxConcurrency, pids.count)])
}

var allEntries: [ProcessEntryDTO] = []
var totalEperm = 0

for chunk in chunks {
await withTaskGroup(of: ScanPIDResult.self) { group in
for pid in chunk {
// Use a sliding window approach with an iterator instead of static chunking
// to maintain maximum concurrent execution limits continuously and improve throughput.
await withTaskGroup(of: ScanPIDResult.self) { group in
var iterator = pids.makeIterator()

// Initial batch
for _ in 0..<maxConcurrency {
if let pid = iterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
}
for await result in group {
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}

// Add new tasks as existing ones complete
for await result in group {
switch result {
case .success(let entry):
allEntries.append(entry)
case .eperm:
totalEperm += 1
case .otherError:
break
}

if let pid = iterator.next() {
group.addTask { [self] in
self.scanSinglePID(pid)
}
}
}
Expand Down