diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 16bef03..f157087 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -105,9 +105,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("process batched runner scale set job messages with id %d and batch size %d", message.MessageId, len(batchedMessages)) - requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners + inflight := int(p.provisioningInflight.Load()) + requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners - inflight provisionedRunners := 0 + p.logger.Infof("provisioning inflight: %d, required runners: %d", inflight, requiredRunners) + if message.MessageId == 0 && len(batchedMessages) == 0 { // The session-creation snapshot can lag: runners whose jobs are already // running may not be counted in TotalRegisteredRunners yet. A running job @@ -206,11 +209,13 @@ func (p *RunnerMessageProcessor) AdoptVM(vmName string) { } func (p *RunnerMessageProcessor) startRunner(job jobIdentity) { + p.provisioningInflight.Add(1) var executionErr error defer p.removeUpstreamCanceledJob(job) executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) + p.provisioningInflight.Add(-1) if provisioningErr != nil { if errors.Is(provisioningErr, context.Canceled) { p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index 5818ecf..9fddae2 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -7,6 +7,7 @@ package runners import ( "context" "sync" + "sync/atomic" "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" "github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue" @@ -47,4 +48,5 @@ type RunnerMessageProcessor struct { upstreamCanceledJobsMutex sync.RWMutex runnerContextCancels map[string]context.CancelFunc runnerContextCancelsMutex sync.Mutex + provisioningInflight atomic.Int32 }