From e94ade80d38312a6ed8abc820b9dc37cb64073ca Mon Sep 17 00:00:00 2001 From: ispasov Date: Fri, 29 May 2026 10:56:02 +0300 Subject: [PATCH 1/5] Reconcile VMs on startup VMs spun up from the previous process were left orphaned. They were either not fully provisioned or were never cleaned up upon job completion. In addition, the integration ignored the initial resource request message, so the runner had to wait for another job to start before the needed capacity was provisioned. This PR introduces the following: - Reconciliation: - List existing Orka VMs by scale set name prefix on startup. - Provisioning writes sentinel files (/tmp/orka-runner-setup-complete, /tmp/orka-runner-run-complete) so the reconciler can probe state. - For each existing VM, check the GitHub runner and SSH into the VM: - No GH runner OR setup never completed OR run.sh crashed -> delete. - run.sh finished but cleanup was missed -> clean up. - Setup complete and run.sh running -> adopt. - Adoption: - Track existing running VMs as if they were created by the current process. - Recovery provisioning: - Start respecting the first message received from GitHub. --- examples/.env | 7 + main.go | 36 +++-- pkg/env/constants.go | 2 + pkg/env/env.go | 4 + pkg/github/runners/message-processor.go | 167 ++++++++++++++---------- pkg/github/runners/vm_tracker_test.go | 4 + pkg/orka/client.go | 16 +++ pkg/orka/types.go | 7 + pkg/reconciler/reconciler.go | 161 +++++++++++++++++++++++ pkg/runner-provisioner/provisioner.go | 7 + 10 files changed, 332 insertions(+), 79 deletions(-) create mode 100644 pkg/reconciler/reconciler.go diff --git a/examples/.env b/examples/.env index e0c2e81..357965e 100644 --- a/examples/.env +++ b/examples/.env @@ -88,3 +88,10 @@ METRICS_POLL_INTERVAL=30s # When false, reuses an existing scale set if found and skips deletion on exit. This allows for the runner to pick up where it left off if the previous runner was terminated abnormally. # Defaults to false. 
MANAGE_RUNNER_SCALE_SETS=false + +# Startup VM reconciliation (optional) +# When true, on startup lists existing Orka VMs for the scale set, checks each VM's state over SSH +# (sentinel files + run.sh process), and either adopts (active), cleans up (run finished), or deletes +# (no GitHub runner / setup incomplete / run.sh crashed) so the controller recovers from prior process exits. +# Defaults to true. +ENABLE_RECONCILIATION=true diff --git a/main.go b/main.go index e629edb..30d1c19 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/logging" "github.com/macstadium/orka-github-actions-integration/pkg/metrics" "github.com/macstadium/orka-github-actions-integration/pkg/orka" + "github.com/macstadium/orka-github-actions-integration/pkg/reconciler" provisioner "github.com/macstadium/orka-github-actions-integration/pkg/runner-provisioner" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/validation" @@ -83,6 +84,20 @@ func main() { panic(fmt.Sprintf("unable to access Orka cluster. 
More info: %s", err.Error())) } + vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger) + go vmTracker.Start(ctx, envData.VMTrackerInterval) + + runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) + + var existingVMs []*orka.OrkaVMInfo + if envData.EnableReconciliation { + var listErr error + existingVMs, listErr = orkaClient.ListVMs(ctx, runnerScaleSet.Name) + if listErr != nil { + logger.Warnf("failed to list existing VMs for reconciliation: %v", listErr) + } + } + runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) if errors.Is(err, runners.ErrActiveSession) { logger.Infof("scale set %s (id=%d) has a stale active session, deleting and recreating", runnerScaleSet.Name, runnerScaleSet.Id) @@ -94,6 +109,7 @@ func main() { panic(fmt.Sprintf("error recreating scale set after active session conflict: %s", err.Error())) } logger.Infof("recreated scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id) + runnerProvisioner = provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) runnerManager, err = runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) } if err != nil { @@ -124,7 +140,14 @@ func main() { } }() - run(ctx, actionsClient, orkaClient, runnerScaleSet, runnerManager, envData, logger) + runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet) + + if envData.EnableReconciliation { + vmReconciler := reconciler.NewVMReconciler(actionsClient, runnerProvisioner, runnerMessageProcessor.AdoptVM, envData) + go vmReconciler.ReconcileVMs(ctx, existingVMs) + } + + run(ctx, runnerMessageProcessor, runnerScaleSet, logger) } func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, runnerName string, groupId int) (*types.RunnerScaleSet, error) { @@ -144,15 +167,8 @@ func createScaleSet(ctx context.Context, actionsClient 
*actions.ActionsClient, r }) } -func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, runnerManager *runners.RunnerManager, envData *env.Data, logger *zap.SugaredLogger) { - runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) - - vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger) - go vmTracker.Start(ctx, envData.VMTrackerInterval) - - runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet) - - if err := runnerMessageProcessor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) { +func run(ctx context.Context, processor *runners.RunnerMessageProcessor, runnerScaleSet *types.RunnerScaleSet, logger *zap.SugaredLogger) { + if err := processor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) { logger.Errorf("failed to start processing messages for runnerScaleSet %s: %v", runnerScaleSet.Name, err) } } diff --git a/pkg/env/constants.go b/pkg/env/constants.go index 7b3b8b5..5c1f771 100644 --- a/pkg/env/constants.go +++ b/pkg/env/constants.go @@ -37,4 +37,6 @@ const ( MetricsPollIntervalEnvName = "METRICS_POLL_INTERVAL" ManageRunnerScaleSetsEnvName = "MANAGE_RUNNER_SCALE_SETS" + + EnableReconciliationEnvName = "ENABLE_RECONCILIATION" ) diff --git a/pkg/env/env.go b/pkg/env/env.go index e77aefb..7b60941 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -57,6 +57,8 @@ type Data struct { MetricsPollInterval time.Duration ManageRunnerScaleSets bool + + EnableReconciliation bool } func ParseEnv() *Data { @@ -94,6 +96,8 @@ func ParseEnv() *Data { MetricsPollInterval: getDurationEnv(MetricsPollIntervalEnvName, 30*time.Second), ManageRunnerScaleSets: getBoolEnv(ManageRunnerScaleSetsEnvName, false), + + EnableReconciliation: getBoolEnv(EnableReconciliationEnvName, true), } envData.OrkaURL = 
strings.TrimSuffix(envData.OrkaURL, "/") diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 9f8be76..fefe31d 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/macstadium/orka-github-actions-integration/pkg/github/types" "github.com/macstadium/orka-github-actions-integration/pkg/logging" "github.com/macstadium/orka-github-actions-integration/pkg/orka" @@ -34,6 +35,10 @@ func newJobIdentity(jobId string, runnerRequestId int64) jobIdentity { return jobIdentity{jobId: jobId, runnerRequestId: runnerRequestId} } +func newRecoveryJobIdentity() jobIdentity { + return jobIdentity{jobId: "recovery-" + uuid.NewString()} +} + func (k jobIdentity) String() string { return fmt.Sprintf("jobId=%s runnerRequestId=%d", k.jobId, k.runnerRequestId) } @@ -91,13 +96,11 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } - if message.MessageId == 0 && message.Body == "" { // initial message with statistics only - return nil - } - var batchedMessages []json.RawMessage - if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil { - return fmt.Errorf("could not decode job messages. %w", err) + if message.Body != "" { + if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil { + return fmt.Errorf("could not decode job messages. 
%w", err) + } } p.logger.Infof("process batched runner scale set job messages with id %d and batch size %d", message.MessageId, len(batchedMessages)) @@ -105,6 +108,18 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners provisionedRunners := 0 + if message.MessageId == 0 && len(batchedMessages) == 0 { + p.logger.Infof("initial message received, provision runners to cover assigned-job gap") + requiredRunners = requiredRunners - message.Statistics.TotalRunningJobs + p.logger.Infof("required runners after subtracting running jobs: %d", requiredRunners) + for provisionedRunners < requiredRunners { + provisionedRunners++ + p.logger.Infof("provisioning runner %d/%d to cover assigned-job gap", provisionedRunners, requiredRunners) + p.startRunner(newRecoveryJobIdentity()) + } + return nil + } + var availableJobs []int64 for _, message := range batchedMessages { var messageType types.JobMessageType @@ -131,69 +146,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale if provisionedRunners < requiredRunners { provisionedRunners++ p.logger.Infof("number of runners provisioning started: %d. 
Max required runners: %d", provisionedRunners, requiredRunners) - - job := newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId) - - go func() { - var executionErr error - - defer p.removeUpstreamCanceledJob(job) - - executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) - if provisioningErr != nil { - if errors.Is(provisioningErr, context.Canceled) { - p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) - } else { - p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) - } - return - } - - if executor == nil { - p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) - return - } - - runnerContext, cancel := context.WithCancel(p.ctx) - p.storeRunnerContextCancel(executor.VMName, cancel) - - context.AfterFunc(runnerContext, func() { - p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) - p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) - p.vmTracker.Untrack(executor.VMName) - }) - - defer func() { - if isNetworkingFailure(executionErr) { - p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) - return - } - - var cancelReason string - var exitErr *ssh.ExitError - - if errors.Is(executionErr, context.Canceled) { - cancelReason = "runner context was canceled" - p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) - } else if executionErr != nil { - if errors.As(executionErr, &exitErr) { - cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) - p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. 
Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) - } else { - cancelReason = fmt.Sprintf("execution failed: %v", executionErr) - p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) - } - } else { - cancelReason = "execution completed successfully" - p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) - } - - p.cancelRunnerContext(executor.VMName, cancelReason) - }() - - p.vmTracker.Track(executor.VMName) - executionErr = p.executeJobCommands(runnerContext, job, executor, commands) - }() + p.startRunner(newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId)) } case "JobStarted": var jobStarted types.JobStarted @@ -234,6 +187,82 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } +func (p *RunnerMessageProcessor) AdoptVM(vmName string) { + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(vmName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", vmName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), vmName) + p.vmTracker.Untrack(vmName) + }) + + p.vmTracker.Track(vmName) +} + +func (p *RunnerMessageProcessor) startRunner(job jobIdentity) { + go func() { + var executionErr error + + defer p.removeUpstreamCanceledJob(job) + + executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) + if provisioningErr != nil { + if errors.Is(provisioningErr, context.Canceled) { + p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) + } else { + p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) + } + return + } + + if executor == nil { + p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) + return + } + + runnerContext, 
cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(executor.VMName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) + p.vmTracker.Untrack(executor.VMName) + }) + + defer func() { + if isNetworkingFailure(executionErr) { + p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) + return + } + + var cancelReason string + var exitErr *ssh.ExitError + + if errors.Is(executionErr, context.Canceled) { + cancelReason = "runner context was canceled" + p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) + } else if executionErr != nil { + if errors.As(executionErr, &exitErr) { + cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) + p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) + } else { + cancelReason = fmt.Sprintf("execution failed: %v", executionErr) + p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) + } + } else { + cancelReason = "execution completed successfully" + p.logger.Infof("execution completed successfully for RunnerName %s with %s. 
Cleaning up resources.", executor.VMName, job) + } + + p.cancelRunnerContext(executor.VMName, cancelReason) + }() + + p.vmTracker.Track(executor.VMName) + executionErr = p.executeJobCommands(runnerContext, job, executor, commands) + }() +} + func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, job jobIdentity) (*orka.VMCommandExecutor, []string, error) { for attempt := 1; !p.isUpstreamCanceled(job); attempt++ { executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx) diff --git a/pkg/github/runners/vm_tracker_test.go b/pkg/github/runners/vm_tracker_test.go index 8f837e1..ddf39a3 100644 --- a/pkg/github/runners/vm_tracker_test.go +++ b/pkg/github/runners/vm_tracker_test.go @@ -37,6 +37,10 @@ func (m *MockOrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig stri return nil, nil } +func (m *MockOrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*orka.OrkaVMInfo, error) { + return nil, nil +} + type MockActionsClient struct { GetRunnerFunc func(ctx context.Context, runnerName string) (*types.RunnerReference, error) } diff --git a/pkg/orka/client.go b/pkg/orka/client.go index 77d8de9..0a9b764 100644 --- a/pkg/orka/client.go +++ b/pkg/orka/client.go @@ -15,6 +15,7 @@ import ( type OrkaService interface { DeployVM(ctx context.Context, namePrefix, vmConfig string) (*OrkaVMDeployResponseModel, error) DeleteVM(ctx context.Context, name string) error + ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error) } type OrkaClient struct { @@ -35,6 +36,21 @@ func (client *OrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig str return (*res)[0], nil } +func (client *OrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error) { + res, err := exec.ExecJSONCommand[[]*OrkaVMInfo]("orka3", []string{"vm", "list", "--namespace", client.envData.OrkaNamespace, "-o", "json"}) + if err != nil { + return nil, err + } + + var filtered []*OrkaVMInfo + for _, vm := range *res { + if 
strings.HasPrefix(vm.Name, namePrefix) { + filtered = append(filtered, vm) + } + } + return filtered, nil +} + func (client *OrkaClient) DeleteVM(ctx context.Context, name string) error { out, err := exec.ExecStringCommand("orka3", []string{"vm", "delete", name, "--namespace", client.envData.OrkaNamespace}) if out == fmt.Sprintf("Successfully deleted vm %s", name) { diff --git a/pkg/orka/types.go b/pkg/orka/types.go index 3f962af..ea141b6 100644 --- a/pkg/orka/types.go +++ b/pkg/orka/types.go @@ -41,3 +41,10 @@ type OrkaClusterInfoResponseModel struct { AppClientId string `json:"appClientId"` BaseOauthEndpoint string `json:"baseOauthEndpoint"` } + +type OrkaVMInfo struct { + Name string `json:"name"` + IP string `json:"ip"` + SSH *int `json:"ssh,omitempty"` + Status VMPhase `json:"status"` +} diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go new file mode 100644 index 0000000..62e0ba3 --- /dev/null +++ b/pkg/reconciler/reconciler.go @@ -0,0 +1,161 @@ +package reconciler + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/macstadium/orka-github-actions-integration/pkg/env" + "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" + "github.com/macstadium/orka-github-actions-integration/pkg/logging" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" + provisioner "github.com/macstadium/orka-github-actions-integration/pkg/runner-provisioner" + "go.uber.org/zap" + "golang.org/x/crypto/ssh" +) + +type vmState int + +const ( + vmStateActive vmState = iota // run.sh running → adopt + vmStateRunComplete // run.sh exited cleanly → cleanup + vmStateNeedsDeletion // setup incomplete or run.sh crashed → delete +) + +type VMReconciler struct { + actionsClient actions.ActionsService + provisioner *provisioner.RunnerProvisioner + adopt func(vmName string) + envData *env.Data + logger *zap.SugaredLogger +} + +func NewVMReconciler(actionsClient actions.ActionsService, p *provisioner.RunnerProvisioner, adopt 
func(string), envData *env.Data) *VMReconciler { + return &VMReconciler{ + actionsClient: actionsClient, + provisioner: p, + adopt: adopt, + envData: envData, + logger: logging.Logger.Named("vm-reconciler"), + } +} + +// VMs must be captured before message processing starts to avoid reconciling +// VMs provisioned by the current process. +func (r *VMReconciler) ReconcileVMs(ctx context.Context, vms []*orka.OrkaVMInfo) { + if len(vms) == 0 { + return + } + r.logger.Infof("reconciliation: found %d existing VM(s) to reconcile", len(vms)) + + for _, vm := range vms { + r.reconcileVM(ctx, vm) + } + + r.logger.Infof("reconciliation: completed") +} + +func (r *VMReconciler) reconcileVM(ctx context.Context, vm *orka.OrkaVMInfo) { + runner, err := r.actionsClient.GetRunner(ctx, vm.Name) + if err != nil { + r.logger.Warnf("reconciliation: VM %s GitHub check failed, adopting as potential orphan: %v", vm.Name, err) + r.adopt(vm.Name) + return + } + + if runner == nil { + r.logger.Infof("reconciliation: VM %s has no GitHub runner, deleting", vm.Name) + go r.deleteVM(ctx, vm.Name) + return + } + + state, err := r.checkVMState(vm) + if err != nil { + r.logger.Warnf("reconciliation: VM %s state check failed, adopting as potential orphan: %v", vm.Name, err) + r.adopt(vm.Name) + return + } + + switch state { + case vmStateRunComplete: + r.logger.Infof("reconciliation: VM %s run completed without cleanup, cleaning up", vm.Name) + go r.provisioner.CleanupResources(context.WithoutCancel(ctx), vm.Name) + case vmStateNeedsDeletion: + r.logger.Infof("reconciliation: VM %s setup incomplete or run.sh crashed, deleting", vm.Name) + go r.deleteVM(ctx, vm.Name) + case vmStateActive: + r.logger.Infof("reconciliation: VM %s is active, adopting for cleanup", vm.Name) + r.adopt(vm.Name) + } +} + +func (r *VMReconciler) checkVMState(vm *orka.OrkaVMInfo) (vmState, error) { + if vm.SSH == nil { + return vmStateNeedsDeletion, fmt.Errorf("VM %s has no SSH port", vm.Name) + } + + vmIP, err := 
r.resolveVMIP(vm.IP) + if err != nil { + return vmStateNeedsDeletion, err + } + + client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", vmIP, *vm.SSH), &ssh.ClientConfig{ + User: r.envData.OrkaVMUsername, + Auth: []ssh.AuthMethod{ssh.Password(r.envData.OrkaVMPassword)}, + HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error { + return nil + }, + Timeout: 10 * time.Second, + }) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("SSH connect failed: %v", err) + } + defer client.Close() + + if !r.fileExists(client, provisioner.SentinelSetupComplete) { + return vmStateNeedsDeletion, nil + } + + if r.fileExists(client, provisioner.SentinelRunComplete) { + return vmStateRunComplete, nil + } + + if !r.isProcessRunning(client, "actions-runner/run.sh") { + return vmStateNeedsDeletion, nil + } + + return vmStateActive, nil +} + +func (r *VMReconciler) fileExists(client *ssh.Client, path string) bool { + session, err := client.NewSession() + if err != nil { + return false + } + defer session.Close() + return session.Run(fmt.Sprintf("test -f %s", path)) == nil +} + +func (r *VMReconciler) isProcessRunning(client *ssh.Client, pattern string) bool { + session, err := client.NewSession() + if err != nil { + return false + } + defer session.Close() + return session.Run(fmt.Sprintf("pgrep -f %q", pattern)) == nil +} + +func (r *VMReconciler) resolveVMIP(vmIP string) (string, error) { + if !r.envData.OrkaEnableNodeIPMapping { + return vmIP, nil + } + if r.envData.OrkaNodeIPMapping[vmIP] == "" { + return "", fmt.Errorf("unable to retrieve VM IP from node IP mapping") + } + return r.envData.OrkaNodeIPMapping[vmIP], nil +} + +func (r *VMReconciler) deleteVM(ctx context.Context, vmName string) { + r.provisioner.CleanupResources(context.WithoutCancel(ctx), vmName) +} diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index 040432d..17daed3 100644 --- a/pkg/runner-provisioner/provisioner.go +++ 
b/pkg/runner-provisioner/provisioner.go @@ -28,6 +28,11 @@ type RunnerProvisioner struct { mu sync.Mutex } +const ( + SentinelSetupComplete = "/tmp/orka-runner-setup-complete" + SentinelRunComplete = "/tmp/orka-runner-run-complete" +) + var commands_template = []string{ "set -e", "echo \"Downloading Git Action Runner from https://github.com/actions/runner/releases/download/v$VERSION/actions-runner-osx-$(uname -m | sed 's/86_//')-$VERSION.tar.gz\"", @@ -38,8 +43,10 @@ var commands_template = []string{ "cd /Users/$USERNAME/actions-runner", "tar xzf /Users/$USERNAME/actions-runner/actions-runner.tar.gz", "echo 'Git Action Runner unarchive completed'", + "touch " + SentinelSetupComplete, "echo 'Starting Git Action Runner'", "/Users/$USERNAME/actions-runner/run.sh --jitconfig $JITCONFIG", + "touch " + SentinelRunComplete, "echo 'Git Action Runner exited'", } From ee748b9521ca1eed3a41b8124698a283dd73cfc0 Mon Sep 17 00:00:00 2001 From: ispasov Date: Wed, 3 Jun 2026 12:17:35 +0300 Subject: [PATCH 2/5] Move goroutine to caller --- pkg/github/runners/message-processor.go | 98 ++++++++++++------------- 1 file changed, 48 insertions(+), 50 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index fefe31d..c071083 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -115,7 +115,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale for provisionedRunners < requiredRunners { provisionedRunners++ p.logger.Infof("provisioning runner %d/%d to cover assigned-job gap", provisionedRunners, requiredRunners) - p.startRunner(newRecoveryJobIdentity()) + go p.startRunner(newRecoveryJobIdentity()) } return nil } @@ -146,7 +146,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale if provisionedRunners < requiredRunners { provisionedRunners++ p.logger.Infof("number of runners provisioning started: %d. 
Max required runners: %d", provisionedRunners, requiredRunners) - p.startRunner(newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId)) + go p.startRunner(newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId)) } case "JobStarted": var jobStarted types.JobStarted @@ -201,66 +201,64 @@ func (p *RunnerMessageProcessor) AdoptVM(vmName string) { } func (p *RunnerMessageProcessor) startRunner(job jobIdentity) { - go func() { - var executionErr error + var executionErr error - defer p.removeUpstreamCanceledJob(job) + defer p.removeUpstreamCanceledJob(job) - executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) - if provisioningErr != nil { - if errors.Is(provisioningErr, context.Canceled) { - p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) - } else { - p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) - } - return + executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) + if provisioningErr != nil { + if errors.Is(provisioningErr, context.Canceled) { + p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) + } else { + p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) } + return + } - if executor == nil { - p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) - return - } + if executor == nil { + p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) + return + } - runnerContext, cancel := context.WithCancel(p.ctx) - p.storeRunnerContextCancel(executor.VMName, cancel) + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(executor.VMName, cancel) - context.AfterFunc(runnerContext, func() { - p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) - p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) - 
p.vmTracker.Untrack(executor.VMName) - }) + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) + p.vmTracker.Untrack(executor.VMName) + }) - defer func() { - if isNetworkingFailure(executionErr) { - p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) - return - } + defer func() { + if isNetworkingFailure(executionErr) { + p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) + return + } - var cancelReason string - var exitErr *ssh.ExitError - - if errors.Is(executionErr, context.Canceled) { - cancelReason = "runner context was canceled" - p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) - } else if executionErr != nil { - if errors.As(executionErr, &exitErr) { - cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) - p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) - } else { - cancelReason = fmt.Sprintf("execution failed: %v", executionErr) - p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) - } + var cancelReason string + var exitErr *ssh.ExitError + + if errors.Is(executionErr, context.Canceled) { + cancelReason = "runner context was canceled" + p.logger.Infof("runner context canceled for RunnerName %s with %s. 
Cleaning up resources.", executor.VMName, job) + } else if executionErr != nil { + if errors.As(executionErr, &exitErr) { + cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) + p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) } else { - cancelReason = "execution completed successfully" - p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) + cancelReason = fmt.Sprintf("execution failed: %v", executionErr) + p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) } + } else { + cancelReason = "execution completed successfully" + p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) + } - p.cancelRunnerContext(executor.VMName, cancelReason) - }() - - p.vmTracker.Track(executor.VMName) - executionErr = p.executeJobCommands(runnerContext, job, executor, commands) + p.cancelRunnerContext(executor.VMName, cancelReason) }() + + p.vmTracker.Track(executor.VMName) + executionErr = p.executeJobCommands(runnerContext, job, executor, commands) } func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, job jobIdentity) (*orka.VMCommandExecutor, []string, error) { From c9f098c2bbdf75c9a776069ae8a07bb62b9192ca Mon Sep 17 00:00:00 2001 From: ispasov Date: Wed, 3 Jun 2026 14:48:10 +0300 Subject: [PATCH 3/5] Better reporting during reconcile --- pkg/reconciler/reconciler.go | 56 +++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 62e0ba3..7ed06ca 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -2,6 +2,7 @@ package reconciler import ( "context" + "errors" "fmt" "net" "time" @@ 
-113,37 +114,66 @@ func (r *VMReconciler) checkVMState(vm *orka.OrkaVMInfo) (vmState, error) { } defer client.Close() - if !r.fileExists(client, provisioner.SentinelSetupComplete) { + setupComplete, err := r.fileExists(client, provisioner.SentinelSetupComplete) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("setup sentinel check failed: %w", err) + } + if !setupComplete { + r.logger.Infof("reconciliation: VM %s setup is not complete, deleting", vm.Name) return vmStateNeedsDeletion, nil } - if r.fileExists(client, provisioner.SentinelRunComplete) { + runComplete, err := r.fileExists(client, provisioner.SentinelRunComplete) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("run sentinel check failed: %w", err) + } + if runComplete { + r.logger.Infof("reconciliation: VM %s run.sh completed, cleaning up", vm.Name) return vmStateRunComplete, nil } - if !r.isProcessRunning(client, "actions-runner/run.sh") { + processRunning, err := r.isProcessRunning(client, "actions-runner/run.sh") + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("run.sh process check failed: %w", err) + } + if !processRunning { + r.logger.Infof("reconciliation: VM %s run.sh is not running, deleting", vm.Name) return vmStateNeedsDeletion, nil } return vmStateActive, nil } -func (r *VMReconciler) fileExists(client *ssh.Client, path string) bool { - session, err := client.NewSession() - if err != nil { - return false - } - defer session.Close() - return session.Run(fmt.Sprintf("test -f %s", path)) == nil +func (r *VMReconciler) fileExists(client *ssh.Client, path string) (bool, error) { + return r.runBoolCheck(client, fmt.Sprintf("test -f %s", path)) +} + +func (r *VMReconciler) isProcessRunning(client *ssh.Client, pattern string) (bool, error) { + return r.runBoolCheck(client, fmt.Sprintf("pgrep -f %q", pattern)) } -func (r *VMReconciler) isProcessRunning(client *ssh.Client, pattern string) bool { +// runBoolCheck runs a remote command whose exit status encodes a 
boolean: +// 0 means true, 1 means a clean negative (file missing / no process matched), +// anything else (SSH failure, command error) is returned as an error so a +// failed check is never mistaken for a negative result. +func (r *VMReconciler) runBoolCheck(client *ssh.Client, cmd string) (bool, error) { session, err := client.NewSession() if err != nil { - return false + return false, fmt.Errorf("ssh session failed: %w", err) } defer session.Close() - return session.Run(fmt.Sprintf("pgrep -f %q", pattern)) == nil + + err = session.Run(cmd) + if err == nil { + return true, nil + } + + var exitErr *ssh.ExitError + if errors.As(err, &exitErr) && exitErr.ExitStatus() == 1 { + return false, nil + } + + return false, fmt.Errorf("%q failed: %w", cmd, err) } func (r *VMReconciler) resolveVMIP(vmIP string) (string, error) { From 1ca2fe77aac1822b89e66f7e137b9c03215fe067 Mon Sep 17 00:00:00 2001 From: ispasov Date: Wed, 3 Jun 2026 18:36:48 +0300 Subject: [PATCH 4/5] Handle initial messages better --- pkg/github/runners/message-processor.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index c071083..653b08d 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -109,9 +109,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale provisionedRunners := 0 if message.MessageId == 0 && len(batchedMessages) == 0 { - p.logger.Infof("initial message received, provision runners to cover assigned-job gap") - requiredRunners = requiredRunners - message.Statistics.TotalRunningJobs - p.logger.Infof("required runners after subtracting running jobs: %d", requiredRunners) + // The session-creation snapshot can lag: runners that are already busy may + // not be counted in TotalRegisteredRunners yet. A busy or idle runner must + // be registered, so trust whichever count is larger. 
+ effectiveRegisteredRunners := max(message.Statistics.TotalRegisteredRunners, message.Statistics.TotalBusyRunners+message.Statistics.TotalIdleRunners) + requiredRunners = message.Statistics.TotalAssignedJobs - effectiveRegisteredRunners + p.logger.Infof("initial message received, provision runners to cover assigned-job gap (effective registered runners: %d, required runners: %d)", effectiveRegisteredRunners, requiredRunners) for provisionedRunners < requiredRunners { provisionedRunners++ p.logger.Infof("provisioning runner %d/%d to cover assigned-job gap", provisionedRunners, requiredRunners) From 69ebcdfa0c0f4c133ec63775cdde08fd38e93185 Mon Sep 17 00:00:00 2001 From: ispasov Date: Wed, 3 Jun 2026 18:54:09 +0300 Subject: [PATCH 5/5] Use running job instead of busy --- pkg/github/runners/message-processor.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 653b08d..16bef03 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -109,10 +109,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale provisionedRunners := 0 if message.MessageId == 0 && len(batchedMessages) == 0 { - // The session-creation snapshot can lag: runners that are already busy may - // not be counted in TotalRegisteredRunners yet. A busy or idle runner must - // be registered, so trust whichever count is larger. - effectiveRegisteredRunners := max(message.Statistics.TotalRegisteredRunners, message.Statistics.TotalBusyRunners+message.Statistics.TotalIdleRunners) + // The session-creation snapshot can lag: runners whose jobs are already + // running may not be counted in TotalRegisteredRunners yet. 
A running job + // implies a live runner picked it up (unlike TotalBusyRunners, which only + // means a job is pinned to a runner record - possibly one that never + // connected), so trust whichever count is larger. + effectiveRegisteredRunners := max(message.Statistics.TotalRegisteredRunners, message.Statistics.TotalRunningJobs) requiredRunners = message.Statistics.TotalAssignedJobs - effectiveRegisteredRunners p.logger.Infof("initial message received, provision runners to cover assigned-job gap (effective registered runners: %d, required runners: %d)", effectiveRegisteredRunners, requiredRunners) for provisionedRunners < requiredRunners {