diff --git a/producer.go b/producer.go index 93db03e6..1fba620a 100644 --- a/producer.go +++ b/producer.go @@ -392,10 +392,18 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { } var subroutineWG sync.WaitGroup - subroutineWG.Add(3) subroutineCtx, cancelSubroutines := context.WithCancelCause(context.WithoutCancel(fetchCtx)) + + subroutineWG.Add(1) + go p.fetchPollLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.heartbeatLogLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.reportQueueStatusLoop(subroutineCtx, &subroutineWG) + + subroutineWG.Add(1) go p.reportProducerStatusLoop(subroutineCtx, &subroutineWG) if p.config.Notifier == nil { @@ -513,23 +521,6 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { // an insert notification or a fetch poll. p.fetchLimiter.Call() - fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) - go func() { - for { - select { - case <-fetchCtx.Done(): - // Stop fetch timer so no more fetches are triggered. - if !fetchPollTimer.Stop() { - <-fetchPollTimer.C - } - return - case <-fetchPollTimer.C: - p.fetchLimiter.Call() - fetchPollTimer.Reset(p.config.FetchPollInterval) - } - } - }() - fetchResultCh := make(chan producerFetchResult) for { select { @@ -598,6 +589,29 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { } } +// Loops every FetchPollInterval to check for jobs. This is meant as a back up +// in case something with listen/notify didn't work, or the fetch limiter was +// limited so there's still jobs to pick up, and it's also important in +// poll-only mode. +func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + + fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) + for { + select { + case <-ctx.Done(): + // Stop fetch timer so no more fetches are triggered. + if !fetchPollTimer.Stop() { + <-fetchPollTimer.C + } + return + case <-fetchPollTimer.C: + p.fetchLimiter.Call() + fetchPollTimer.Reset(p.config.FetchPollInterval) + } + } +} + func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { var limit int if p.paused {