Fix: Prevent concurrent subgroup processing across peers#563
Conversation
Add a signalling guard that checks if another peer is already processing the same channel before starting LLM work. Uses the existing ProcessingState broadcast via the signalling service. - Add isAnotherPeerProcessingChannel() helper with 5-min staleness timeout - Add channelId param to checkIfWeShouldProcessTask() with guard check - Add re-check in processesNextTask() before starting LLM processing - Update call sites in TimelineColumn.vue and findProcessingTasksInCommunity()
✅ Deploy Preview for fluxsocial-dev ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughAdds peer-aware AI task coordination by passing channel identifiers into processing checks and introducing a stale-timeout guard; refines timestamp formatting in TimelineColumn without changing output semantics. Changes
Sequence DiagramsequenceDiagram
participant TC as TimelineColumn
participant Store as aiStore
participant Signal as SignallingService
TC->>Store: checkIfWeShouldProcessTask(unprocessedItems, signallingService, channelId)
activate Store
Store->>Signal: query active agents for channelId
Signal-->>Store: agents + lastUpdate
Store->>Store: filter out stale entries (PROCESSING_STALE_TIMEOUT)
alt Another peer currently processing
Store-->>TC: return false
else No active peer processing
Store-->>TC: return true
end
deactivate Store
alt TC queues task and Store begins processing
TC->>Store: enqueue task
Store->>Store: processesNextTask()
activate Store
Store->>Signal: re-check agent status for channelId before LLM work
Signal-->>Store: current agents
alt Peer detected while queued
Store->>Store: shift queued task and abort processing
else Still safe
Store->>LLM: start processing task
end
deactivate Store
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@app/src/stores/aiStore.ts`:
- Around line 187-194: The code uses a non-null assertion rawChannel.id! when
calling checkIfWeShouldProcessTask which can hide undefined ids; update the
logic in the block that builds shouldProcess so you first validate rawChannel.id
(from rawChannel or conversationData.channel) is present and bail out (return
null or skip calling checkIfWeShouldProcessTask) if it's undefined, then only
call checkIfWeShouldProcessTask(unprocessedItems,
communityService.signallingService, rawChannel.id) when the id exists; reference
rawChannel, unprocessedItems, checkIfWeShouldProcessTask,
conversationData.channel and communityId to locate and modify the guard.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6f624a83-8d80-43c6-ab36-8eda0398fd32
📒 Files selected for processing (2)
app/src/components/conversation/timeline/TimelineColumn.vueapp/src/stores/aiStore.ts
…nity Bail out early if rawChannel.id is undefined instead of using a non-null assertion that could hide the issue.
Fix: Prevent concurrent subgroup processing across peers
Problem
When multiple peers are online with AI enabled, they can simultaneously process the same unprocessed messages in a channel, resulting in overlapping subgroups — the same messages end up assigned to subgroups from both peers.
The existing deterministic responsibility check (
checkItemsForResponsibility) works correctly when all peers see the same unprocessed items list. However, sync lag between peers causes divergent lists — different first items lead to different walk orders, and both peers independently elect themselves as responsible.Solution
Use the existing signalling service to prevent concurrent processing. The
ProcessingState(includingchannelId) is already broadcast to all peers — it just wasn't being consulted before starting processing.Changes
isAnotherPeerProcessingChannel()— New helper that checks if any other peer's signalling state shows them actively processing the same channel. Includes a 5-minute staleness timeout so crashed/disconnected peers don't block processing indefinitely.checkIfWeShouldProcessTask()— AddedchannelIdparameter and signalling guard as the first check, before the existing responsibility logic.processesNextTask()— Added a re-check of the signalling guard right before starting LLM work, covering the window between a task being queued and actually starting.Call sites updated —
TimelineColumn.vueandfindProcessingTasksInCommunity()now pass the channel ID.Files changed
app/src/stores/aiStore.tschannelIdparamapp/src/components/conversation/timeline/TimelineColumn.vuechannelUrlto guardLimitations
Summary by CodeRabbit
Bug Fixes
Refactor