From 38bd0b19436ff67a380e4aa4ce7ff03f6e7687bf Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 4 Jun 2025 02:17:18 +0200 Subject: [PATCH] Break poll loop out into subroutine guaranteed to quit before producer stop This one's aimed at fixing the race detector failure as observed in [1]. Honestly, I couldn't repro this locally even at 1,000 iterations, but reading the code, I'm pretty sure I know what the problem is. The `fetchAndRunLoop` would start a separate goroutine that'd use a timer to trigger a poll at `FetchPollInterval` so that fetches are occasionally performed even if listen/notify missed something. This inner goroutine would stop on context cancellation, but it wasn't guaranteed to return before a stop finished. Therefore, as the stress test called `Start` again, `fetchLimiter` could be modified while the goroutine was still accessing it from the last run of the producer. Fix this by making the fetch poll loop a "subroutine", which is a convention used already for other goroutines in `Start`. `Start` waits on all these goroutines to finish before completing a stop, therefore guaranteeing that the next iteration of `Start` can proceed safely. 
[1] https://github.com/riverqueue/river/actions/runs/15419505042/job/43390545084 --- producer.go | 50 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/producer.go b/producer.go index 93db03e6..1fba620a 100644 --- a/producer.go +++ b/producer.go @@ -392,10 +392,18 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { } var subroutineWG sync.WaitGroup - subroutineWG.Add(3) subroutineCtx, cancelSubroutines := context.WithCancelCause(context.WithoutCancel(fetchCtx)) + + subroutineWG.Add(1) + go p.fetchPollLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.heartbeatLogLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.reportQueueStatusLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.reportProducerStatusLoop(subroutineCtx, &subroutineWG) if p.config.Notifier == nil { @@ -513,23 +521,6 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { // an insert notification or a fetch poll. p.fetchLimiter.Call() - fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) - go func() { - for { - select { - case <-fetchCtx.Done(): - // Stop fetch timer so no more fetches are triggered. - if !fetchPollTimer.Stop() { - <-fetchPollTimer.C - } - return - case <-fetchPollTimer.C: - p.fetchLimiter.Call() - fetchPollTimer.Reset(p.config.FetchPollInterval) - } - } - }() - fetchResultCh := make(chan producerFetchResult) for { select { @@ -598,6 +589,29 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { } } +// Loops every FetchPollInterval to check for jobs. This is meant as a back up +// in case something with listen/notify didn't work, or the fetch limiter was +// limited so there's still jobs to pick up, and it's also important in +// poll-only mode. 
+func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) + for { + select { + case <-ctx.Done(): + // Stop fetch timer so no more fetches are triggered. + if !fetchPollTimer.Stop() { + <-fetchPollTimer.C + } + return + case <-fetchPollTimer.C: + p.fetchLimiter.Call() + fetchPollTimer.Reset(p.config.FetchPollInterval) + } + } +} + func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { var limit int if p.paused {