Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/github/runners/message-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale

p.logger.Infof("process batched runner scale set job messages with id %d and batch size %d", message.MessageId, len(batchedMessages))

requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners
inflight := int(p.provisioningInflight.Load())
requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners - inflight
provisionedRunners := 0

p.logger.Infof("provisioning inflight: %d, required runners: %d", inflight, requiredRunners)

if message.MessageId == 0 && len(batchedMessages) == 0 {
// The session-creation snapshot can lag: runners whose jobs are already
// running may not be counted in TotalRegisteredRunners yet. A running job
Expand Down Expand Up @@ -206,11 +209,13 @@ func (p *RunnerMessageProcessor) AdoptVM(vmName string) {
}

func (p *RunnerMessageProcessor) startRunner(job jobIdentity) {
p.provisioningInflight.Add(1)
var executionErr error

defer p.removeUpstreamCanceledJob(job)

executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job)
p.provisioningInflight.Add(-1)
if provisioningErr != nil {
if errors.Is(provisioningErr, context.Canceled) {
p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName)
Expand Down
2 changes: 2 additions & 0 deletions pkg/github/runners/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package runners
import (
"context"
"sync"
"sync/atomic"

"github.com/macstadium/orka-github-actions-integration/pkg/github/actions"
"github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue"
Expand Down Expand Up @@ -47,4 +48,5 @@ type RunnerMessageProcessor struct {
upstreamCanceledJobsMutex sync.RWMutex
runnerContextCancels map[string]context.CancelFunc
runnerContextCancelsMutex sync.Mutex
provisioningInflight atomic.Int32
}
Loading