Skip to content

Fix: Prevent concurrent subgroup processing across peers#563

Merged
lucksus merged 2 commits intodevfrom
fix/concurrent-subgroup-processing
Apr 1, 2026
Merged

Fix: Prevent concurrent subgroup processing across peers#563
lucksus merged 2 commits intodevfrom
fix/concurrent-subgroup-processing

Conversation

@jhweir
Copy link
Copy Markdown
Contributor

@jhweir jhweir commented Mar 19, 2026

Fix: Prevent concurrent subgroup processing across peers

Problem

When multiple peers are online with AI enabled, they can simultaneously process the same unprocessed messages in a channel, resulting in overlapping subgroups — the same messages end up assigned to subgroups from both peers.

The existing deterministic responsibility check (checkItemsForResponsibility) works correctly when all peers see the same unprocessed items list. However, sync lag between peers causes divergent lists — different first items lead to different walk orders, and both peers independently elect themselves as responsible.

Solution

Use the existing signalling service to prevent concurrent processing. The ProcessingState (including channelId) is already broadcast to all peers — it just wasn't being consulted before starting processing.

Changes

  • isAnotherPeerProcessingChannel() — New helper that checks if any other peer's signalling state shows them actively processing the same channel. Includes a 5-minute staleness timeout so crashed/disconnected peers don't block processing indefinitely.

  • checkIfWeShouldProcessTask() — Added channelId parameter and signalling guard as the first check, before the existing responsibility logic.

  • processesNextTask() — Added a re-check of the signalling guard right before starting LLM work, covering the window between a task being queued and actually starting.

  • Call sites updatedTimelineColumn.vue and findProcessingTasksInCommunity() now pass the channel ID.

Files changed

File Change
app/src/stores/aiStore.ts Signalling guard + channelId param
app/src/components/conversation/timeline/TimelineColumn.vue Pass channelUrl to guard

Limitations

  • Fully offline peers — If two peers can't reach each other via signalling, they can't see each other's processing state. This is inherent to the signalling approach.
  • Existing duplicates — This fix prevents future concurrent processing but doesn't clean up items already in multiple subgroups.

Summary by CodeRabbit

  • Bug Fixes

    • Improved coordination of AI task processing across peers to prevent duplicate or conflicting work, including handling of stale processing claims so tasks are not blocked indefinitely.
  • Refactor

    • Refined timestamp formatting for more consistent, robust display of message times.

Add a signalling guard that checks if another peer is already processing
the same channel before starting LLM work. Uses the existing
ProcessingState broadcast via the signalling service.

- Add isAnotherPeerProcessingChannel() helper with 5-min staleness timeout
- Add channelId param to checkIfWeShouldProcessTask() with guard check
- Add re-check in processesNextTask() before starting LLM processing
- Update call sites in TimelineColumn.vue and findProcessingTasksInCommunity()
@netlify
Copy link
Copy Markdown

netlify Bot commented Mar 19, 2026

Deploy Preview for fluxsocial-dev ready!

Name Link
🔨 Latest commit 1d0b542
🔍 Latest deploy log https://app.netlify.com/projects/fluxsocial-dev/deploys/69bc0f57785d470008fcad90
😎 Deploy Preview https://deploy-preview-563--fluxsocial-dev.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 19, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 749e8039-74e8-4230-9059-c52f888f8430

📥 Commits

Reviewing files that changed from the base of the PR and between 0a8be9f and 1d0b542.

📒 Files selected for processing (1)
  • app/src/stores/aiStore.ts

📝 Walkthrough

Walkthrough

Adds peer-aware AI task coordination by passing channel identifiers into processing checks and introducing a stale-timeout guard; refines timestamp formatting in TimelineColumn without changing output semantics.

Changes

Cohort / File(s) Summary
Timeline UI & Timestamp
app/src/components/conversation/timeline/TimelineColumn.vue
Refined formatTimestamp() implementation (uses explicit date.toISOString() call) preserving existing output and fallback behavior; updated call to aiStore.checkIfWeShouldProcessTask to pass channelUrl/channel identifier.
AI Store: Peer-aware processing
app/src/stores/aiStore.ts
Added exported PROCESSING_STALE_TIMEOUT; introduced isAnotherPeerProcessingChannel(signallingService, channelId); changed checkIfWeShouldProcessTask signature to accept channelId; extended task discovery to guard missing channel IDs; added runtime signalling re-check in processesNextTask to drop queued tasks if another peer is detected.

Sequence Diagram

sequenceDiagram
    participant TC as TimelineColumn
    participant Store as aiStore
    participant Signal as SignallingService

    TC->>Store: checkIfWeShouldProcessTask(unprocessedItems, signallingService, channelId)
    activate Store
    Store->>Signal: query active agents for channelId
    Signal-->>Store: agents + lastUpdate
    Store->>Store: filter out stale entries (PROCESSING_STALE_TIMEOUT)
    alt Another peer currently processing
        Store-->>TC: return false
    else No active peer processing
        Store-->>TC: return true
    end
    deactivate Store

    alt TC queues task and Store begins processing
        TC->>Store: enqueue task
        Store->>Store: processesNextTask()
        activate Store
        Store->>Signal: re-check agent status for channelId before LLM work
        Signal-->>Store: current agents
        alt Peer detected while queued
            Store->>Store: shift queued task and abort processing
        else Still safe
            Store->>LLM: start processing task
        end
        deactivate Store
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • lucksus

Poem

🐇 I hop and check the channel bright,

"Who's processing now? Is it day or night?"
Stale clocks fade, a timeout keeps the peace,
Two rabbits step aside — the queued work finds release. 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly summarizes the main change: preventing concurrent subgroup processing across peers, which is the core problem being solved according to the PR objectives.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/concurrent-subgroup-processing
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@app/src/stores/aiStore.ts`:
- Around line 187-194: The code uses a non-null assertion rawChannel.id! when
calling checkIfWeShouldProcessTask which can hide undefined ids; update the
logic in the block that builds shouldProcess so you first validate rawChannel.id
(from rawChannel or conversationData.channel) is present and bail out (return
null or skip calling checkIfWeShouldProcessTask) if it's undefined, then only
call checkIfWeShouldProcessTask(unprocessedItems,
communityService.signallingService, rawChannel.id) when the id exists; reference
rawChannel, unprocessedItems, checkIfWeShouldProcessTask,
conversationData.channel and communityId to locate and modify the guard.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6f624a83-8d80-43c6-ab36-8eda0398fd32

📥 Commits

Reviewing files that changed from the base of the PR and between ce8dee7 and 0a8be9f.

📒 Files selected for processing (2)
  • app/src/components/conversation/timeline/TimelineColumn.vue
  • app/src/stores/aiStore.ts

Comment thread app/src/stores/aiStore.ts
…nity

Bail out early if rawChannel.id is undefined instead of using a non-null
assertion that could hide the issue.
@lucksus lucksus merged commit c9130b0 into dev Apr 1, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants