diff --git a/examples/.env b/examples/.env index e0c2e81..357965e 100644 --- a/examples/.env +++ b/examples/.env @@ -88,3 +88,10 @@ METRICS_POLL_INTERVAL=30s # When false, reuses an existing scale set if found and skips deletion on exit. This allows for the runner to pick up where it left off if the previous runner was terminated abnormally. # Defaults to false. MANAGE_RUNNER_SCALE_SETS=false + +# Startup VM reconciliation (optional) +# When true, on startup lists existing Orka VMs for the scale set, checks each VM's state over SSH +# (sentinel files + run.sh process), and either adopts (active), cleans up (run finished), or deletes +# (no GitHub runner / setup incomplete / run.sh crashed) so the controller recovers from prior process exits. +# Defaults to true. +ENABLE_RECONCILIATION=true diff --git a/main.go b/main.go index e629edb..30d1c19 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/macstadium/orka-github-actions-integration/pkg/logging" "github.com/macstadium/orka-github-actions-integration/pkg/metrics" "github.com/macstadium/orka-github-actions-integration/pkg/orka" + "github.com/macstadium/orka-github-actions-integration/pkg/reconciler" provisioner "github.com/macstadium/orka-github-actions-integration/pkg/runner-provisioner" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/validation" @@ -83,6 +84,20 @@ func main() { panic(fmt.Sprintf("unable to access Orka cluster. More info: %s", err.Error())) } + vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger) + go vmTracker.Start(ctx, envData.VMTrackerInterval) + + runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) + + var existingVMs []*orka.OrkaVMInfo + if envData.EnableReconciliation { + var listErr error + existingVMs, listErr = orkaClient.ListVMs(ctx, runnerScaleSet.Name) + if listErr != nil { + logger.Warnf("failed to list existing VMs for reconciliation: %v", listErr) + } + } + runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) if errors.Is(err, runners.ErrActiveSession) { logger.Infof("scale set %s (id=%d) has a stale active session, deleting and recreating", runnerScaleSet.Name, runnerScaleSet.Id) @@ -94,6 +109,7 @@ func main() { panic(fmt.Sprintf("error recreating scale set after active session conflict: %s", err.Error())) } logger.Infof("recreated scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id) + runnerProvisioner = provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) runnerManager, err = runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) } if err != nil { @@ -124,7 +140,14 @@ func main() { } }() - run(ctx, actionsClient, orkaClient, runnerScaleSet, runnerManager, envData, logger) + runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet) + + if envData.EnableReconciliation { + vmReconciler := reconciler.NewVMReconciler(actionsClient, runnerProvisioner, runnerMessageProcessor.AdoptVM, envData) + go vmReconciler.ReconcileVMs(ctx, existingVMs) + } + + run(ctx, runnerMessageProcessor, runnerScaleSet, logger) } func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, runnerName string, groupId int) (*types.RunnerScaleSet, error) { @@ -144,15 +167,8 @@ func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, r }) } -func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, runnerManager *runners.RunnerManager, envData *env.Data, logger *zap.SugaredLogger) { - runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) - - vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger) - go vmTracker.Start(ctx, envData.VMTrackerInterval) - - runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet) - - if err := runnerMessageProcessor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) { +func run(ctx context.Context, processor *runners.RunnerMessageProcessor, runnerScaleSet *types.RunnerScaleSet, logger *zap.SugaredLogger) { + if err := processor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) { logger.Errorf("failed to start processing messages for runnerScaleSet %s: %v", runnerScaleSet.Name, err) } } diff --git a/pkg/env/constants.go b/pkg/env/constants.go index 7b3b8b5..5c1f771 100644 --- a/pkg/env/constants.go +++ b/pkg/env/constants.go @@ -37,4 +37,6 @@ const ( MetricsPollIntervalEnvName = "METRICS_POLL_INTERVAL" ManageRunnerScaleSetsEnvName = "MANAGE_RUNNER_SCALE_SETS" + + EnableReconciliationEnvName = "ENABLE_RECONCILIATION" ) diff --git a/pkg/env/env.go b/pkg/env/env.go index e77aefb..7b60941 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -57,6 +57,8 @@ type Data struct { MetricsPollInterval time.Duration ManageRunnerScaleSets bool + + EnableReconciliation bool } func ParseEnv() *Data { @@ -94,6 +96,8 @@ func ParseEnv() *Data { MetricsPollInterval: getDurationEnv(MetricsPollIntervalEnvName, 30*time.Second), ManageRunnerScaleSets: getBoolEnv(ManageRunnerScaleSetsEnvName, false), + + EnableReconciliation: getBoolEnv(EnableReconciliationEnvName, true), } envData.OrkaURL = strings.TrimSuffix(envData.OrkaURL, "/") diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 9f8be76..16bef03 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/macstadium/orka-github-actions-integration/pkg/github/types" "github.com/macstadium/orka-github-actions-integration/pkg/logging" "github.com/macstadium/orka-github-actions-integration/pkg/orka" @@ -34,6 +35,10 @@ func newJobIdentity(jobId string, runnerRequestId int64) jobIdentity { return jobIdentity{jobId: jobId, runnerRequestId: runnerRequestId} } +func newRecoveryJobIdentity() jobIdentity { + return jobIdentity{jobId: "recovery-" + uuid.NewString()} +} + func (k jobIdentity) String() string { return fmt.Sprintf("jobId=%s runnerRequestId=%d", k.jobId, k.runnerRequestId) } @@ -91,13 +96,11 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } - if message.MessageId == 0 && message.Body == "" { // initial message with statistics only - return nil - } - var batchedMessages []json.RawMessage - if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil { - return fmt.Errorf("could not decode job messages. %w", err) + if message.Body != "" { + if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil { + return fmt.Errorf("could not decode job messages. %w", err) + } } p.logger.Infof("process batched runner scale set job messages with id %d and batch size %d", message.MessageId, len(batchedMessages)) @@ -105,6 +108,23 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners provisionedRunners := 0 + if message.MessageId == 0 && len(batchedMessages) == 0 { + // The session-creation snapshot can lag: runners whose jobs are already + // running may not be counted in TotalRegisteredRunners yet. A running job + // implies a live runner picked it up (unlike TotalBusyRunners, which only + // means a job is pinned to a runner record - possibly one that never + // connected), so trust whichever count is larger. + effectiveRegisteredRunners := max(message.Statistics.TotalRegisteredRunners, message.Statistics.TotalRunningJobs) + requiredRunners = message.Statistics.TotalAssignedJobs - effectiveRegisteredRunners + p.logger.Infof("initial message received, provision runners to cover assigned-job gap (effective registered runners: %d, required runners: %d)", effectiveRegisteredRunners, requiredRunners) + for provisionedRunners < requiredRunners { + provisionedRunners++ + p.logger.Infof("provisioning runner %d/%d to cover assigned-job gap", provisionedRunners, requiredRunners) + go p.startRunner(newRecoveryJobIdentity()) + } + return nil + } + var availableJobs []int64 for _, message := range batchedMessages { var messageType types.JobMessageType @@ -131,69 +151,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale if provisionedRunners < requiredRunners { provisionedRunners++ p.logger.Infof("number of runners provisioning started: %d. Max required runners: %d", provisionedRunners, requiredRunners) - - job := newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId) - - go func() { - var executionErr error - - defer p.removeUpstreamCanceledJob(job) - - executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) - if provisioningErr != nil { - if errors.Is(provisioningErr, context.Canceled) { - p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) - } else { - p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) - } - return - } - - if executor == nil { - p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) - return - } - - runnerContext, cancel := context.WithCancel(p.ctx) - p.storeRunnerContextCancel(executor.VMName, cancel) - - context.AfterFunc(runnerContext, func() { - p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) - p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) - p.vmTracker.Untrack(executor.VMName) - }) - - defer func() { - if isNetworkingFailure(executionErr) { - p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) - return - } - - var cancelReason string - var exitErr *ssh.ExitError - - if errors.Is(executionErr, context.Canceled) { - cancelReason = "runner context was canceled" - p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) - } else if executionErr != nil { - if errors.As(executionErr, &exitErr) { - cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) - p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) - } else { - cancelReason = fmt.Sprintf("execution failed: %v", executionErr) - p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) - } - } else { - cancelReason = "execution completed successfully" - p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) - } - - p.cancelRunnerContext(executor.VMName, cancelReason) - }() - - p.vmTracker.Track(executor.VMName) - executionErr = p.executeJobCommands(runnerContext, job, executor, commands) - }() + go p.startRunner(newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId)) } case "JobStarted": var jobStarted types.JobStarted @@ -234,6 +192,80 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale return nil } +func (p *RunnerMessageProcessor) AdoptVM(vmName string) { + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(vmName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", vmName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), vmName) + p.vmTracker.Untrack(vmName) + }) + + p.vmTracker.Track(vmName) +} + +func (p *RunnerMessageProcessor) startRunner(job jobIdentity) { + var executionErr error + + defer p.removeUpstreamCanceledJob(job) + + executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job) + if provisioningErr != nil { + if errors.Is(provisioningErr, context.Canceled) { + p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) + } else { + p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) + } + return + } + + if executor == nil { + p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) + return + } + + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(executor.VMName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) + p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) + p.vmTracker.Untrack(executor.VMName) + }) + + defer func() { + if isNetworkingFailure(executionErr) { + p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr) + return + } + + var cancelReason string + var exitErr *ssh.ExitError + + if errors.Is(executionErr, context.Canceled) { + cancelReason = "runner context was canceled" + p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) + } else if executionErr != nil { + if errors.As(executionErr, &exitErr) { + cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) + p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job) + } else { + cancelReason = fmt.Sprintf("execution failed: %v", executionErr) + p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr) + } + } else { + cancelReason = "execution completed successfully" + p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job) + } + + p.cancelRunnerContext(executor.VMName, cancelReason) + }() + + p.vmTracker.Track(executor.VMName) + executionErr = p.executeJobCommands(runnerContext, job, executor, commands) +} + func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, job jobIdentity) (*orka.VMCommandExecutor, []string, error) { for attempt := 1; !p.isUpstreamCanceled(job); attempt++ { executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx) diff --git a/pkg/github/runners/vm_tracker_test.go b/pkg/github/runners/vm_tracker_test.go index 8f837e1..ddf39a3 100644 --- a/pkg/github/runners/vm_tracker_test.go +++ b/pkg/github/runners/vm_tracker_test.go @@ -37,6 +37,10 @@ func (m *MockOrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig stri return nil, nil } +func (m *MockOrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*orka.OrkaVMInfo, error) { + return nil, nil +} + type MockActionsClient struct { GetRunnerFunc func(ctx context.Context, runnerName string) (*types.RunnerReference, error) } diff --git a/pkg/orka/client.go b/pkg/orka/client.go index 77d8de9..0a9b764 100644 --- a/pkg/orka/client.go +++ b/pkg/orka/client.go @@ -15,6 +15,7 @@ import ( type OrkaService interface { DeployVM(ctx context.Context, namePrefix, vmConfig string) (*OrkaVMDeployResponseModel, error) DeleteVM(ctx context.Context, name string) error + ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error) } type OrkaClient struct { @@ -35,6 +36,21 @@ func (client *OrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig str return (*res)[0], nil } +func (client *OrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error) { + res, err := exec.ExecJSONCommand[[]*OrkaVMInfo]("orka3", []string{"vm", "list", "--namespace", client.envData.OrkaNamespace, "-o", "json"}) + if err != nil { + return nil, err + } + + var filtered []*OrkaVMInfo + for _, vm := range *res { + if strings.HasPrefix(vm.Name, namePrefix) { + filtered = append(filtered, vm) + } + } + return filtered, nil +} + func (client *OrkaClient) DeleteVM(ctx context.Context, name string) error { out, err := exec.ExecStringCommand("orka3", []string{"vm", "delete", name, "--namespace", client.envData.OrkaNamespace}) if out == fmt.Sprintf("Successfully deleted vm %s", name) { diff --git a/pkg/orka/types.go b/pkg/orka/types.go index 3f962af..ea141b6 100644 --- a/pkg/orka/types.go +++ b/pkg/orka/types.go @@ -41,3 +41,10 @@ type OrkaClusterInfoResponseModel struct { AppClientId string `json:"appClientId"` BaseOauthEndpoint string `json:"baseOauthEndpoint"` } + +type OrkaVMInfo struct { + Name string `json:"name"` + IP string `json:"ip"` + SSH *int `json:"ssh,omitempty"` + Status VMPhase `json:"status"` +} diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go new file mode 100644 index 0000000..7ed06ca --- /dev/null +++ b/pkg/reconciler/reconciler.go @@ -0,0 +1,191 @@ +package reconciler + +import ( + "context" + "errors" + "fmt" + "net" + "time" + + "github.com/macstadium/orka-github-actions-integration/pkg/env" + "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" + "github.com/macstadium/orka-github-actions-integration/pkg/logging" + "github.com/macstadium/orka-github-actions-integration/pkg/orka" + provisioner "github.com/macstadium/orka-github-actions-integration/pkg/runner-provisioner" + "go.uber.org/zap" + "golang.org/x/crypto/ssh" +) + +type vmState int + +const ( + vmStateActive vmState = iota // run.sh running → adopt + vmStateRunComplete // run.sh exited cleanly → cleanup + vmStateNeedsDeletion // setup incomplete or run.sh crashed → delete +) + +type VMReconciler struct { + actionsClient actions.ActionsService + provisioner *provisioner.RunnerProvisioner + adopt func(vmName string) + envData *env.Data + logger *zap.SugaredLogger +} + +func NewVMReconciler(actionsClient actions.ActionsService, p *provisioner.RunnerProvisioner, adopt func(string), envData *env.Data) *VMReconciler { + return &VMReconciler{ + actionsClient: actionsClient, + provisioner: p, + adopt: adopt, + envData: envData, + logger: logging.Logger.Named("vm-reconciler"), + } +} + +// VMs must be captured before message processing starts to avoid reconciling +// VMs provisioned by the current process. +func (r *VMReconciler) ReconcileVMs(ctx context.Context, vms []*orka.OrkaVMInfo) { + if len(vms) == 0 { + return + } + r.logger.Infof("reconciliation: found %d existing VM(s) to reconcile", len(vms)) + + for _, vm := range vms { + r.reconcileVM(ctx, vm) + } + + r.logger.Infof("reconciliation: completed") +} + +func (r *VMReconciler) reconcileVM(ctx context.Context, vm *orka.OrkaVMInfo) { + runner, err := r.actionsClient.GetRunner(ctx, vm.Name) + if err != nil { + r.logger.Warnf("reconciliation: VM %s GitHub check failed, adopting as potential orphan: %v", vm.Name, err) + r.adopt(vm.Name) + return + } + + if runner == nil { + r.logger.Infof("reconciliation: VM %s has no GitHub runner, deleting", vm.Name) + go r.deleteVM(ctx, vm.Name) + return + } + + state, err := r.checkVMState(vm) + if err != nil { + r.logger.Warnf("reconciliation: VM %s state check failed, adopting as potential orphan: %v", vm.Name, err) + r.adopt(vm.Name) + return + } + + switch state { + case vmStateRunComplete: + r.logger.Infof("reconciliation: VM %s run completed without cleanup, cleaning up", vm.Name) + go r.provisioner.CleanupResources(context.WithoutCancel(ctx), vm.Name) + case vmStateNeedsDeletion: + r.logger.Infof("reconciliation: VM %s setup incomplete or run.sh crashed, deleting", vm.Name) + go r.deleteVM(ctx, vm.Name) + case vmStateActive: + r.logger.Infof("reconciliation: VM %s is active, adopting for cleanup", vm.Name) + r.adopt(vm.Name) + } +} + +func (r *VMReconciler) checkVMState(vm *orka.OrkaVMInfo) (vmState, error) { + if vm.SSH == nil { + return vmStateNeedsDeletion, fmt.Errorf("VM %s has no SSH port", vm.Name) + } + + vmIP, err := r.resolveVMIP(vm.IP) + if err != nil { + return vmStateNeedsDeletion, err + } + + client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", vmIP, *vm.SSH), &ssh.ClientConfig{ + User: r.envData.OrkaVMUsername, + Auth: []ssh.AuthMethod{ssh.Password(r.envData.OrkaVMPassword)}, + HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error { + return nil + }, + Timeout: 10 * time.Second, + }) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("SSH connect failed: %v", err) + } + defer client.Close() + + setupComplete, err := r.fileExists(client, provisioner.SentinelSetupComplete) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("setup sentinel check failed: %w", err) + } + if !setupComplete { + r.logger.Infof("reconciliation: VM %s setup is not complete, deleting", vm.Name) + return vmStateNeedsDeletion, nil + } + + runComplete, err := r.fileExists(client, provisioner.SentinelRunComplete) + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("run sentinel check failed: %w", err) + } + if runComplete { + r.logger.Infof("reconciliation: VM %s run.sh completed, cleaning up", vm.Name) + return vmStateRunComplete, nil + } + + processRunning, err := r.isProcessRunning(client, "actions-runner/run.sh") + if err != nil { + return vmStateNeedsDeletion, fmt.Errorf("run.sh process check failed: %w", err) + } + if !processRunning { + r.logger.Infof("reconciliation: VM %s run.sh is not running, deleting", vm.Name) + return vmStateNeedsDeletion, nil + } + + return vmStateActive, nil +} + +func (r *VMReconciler) fileExists(client *ssh.Client, path string) (bool, error) { + return r.runBoolCheck(client, fmt.Sprintf("test -f %s", path)) +} + +func (r *VMReconciler) isProcessRunning(client *ssh.Client, pattern string) (bool, error) { + return r.runBoolCheck(client, fmt.Sprintf("pgrep -f %q", pattern)) +} + +// runBoolCheck runs a remote command whose exit status encodes a boolean: +// 0 means true, 1 means a clean negative (file missing / no process matched), +// anything else (SSH failure, command error) is returned as an error so a +// failed check is never mistaken for a negative result. +func (r *VMReconciler) runBoolCheck(client *ssh.Client, cmd string) (bool, error) { + session, err := client.NewSession() + if err != nil { + return false, fmt.Errorf("ssh session failed: %w", err) + } + defer session.Close() + + err = session.Run(cmd) + if err == nil { + return true, nil + } + + var exitErr *ssh.ExitError + if errors.As(err, &exitErr) && exitErr.ExitStatus() == 1 { + return false, nil + } + + return false, fmt.Errorf("%q failed: %w", cmd, err) +} + +func (r *VMReconciler) resolveVMIP(vmIP string) (string, error) { + if !r.envData.OrkaEnableNodeIPMapping { + return vmIP, nil + } + if r.envData.OrkaNodeIPMapping[vmIP] == "" { + return "", fmt.Errorf("unable to retrieve VM IP from node IP mapping") + } + return r.envData.OrkaNodeIPMapping[vmIP], nil +} + +func (r *VMReconciler) deleteVM(ctx context.Context, vmName string) { + r.provisioner.CleanupResources(context.WithoutCancel(ctx), vmName) +} diff --git a/pkg/runner-provisioner/provisioner.go b/pkg/runner-provisioner/provisioner.go index 040432d..17daed3 100644 --- a/pkg/runner-provisioner/provisioner.go +++ b/pkg/runner-provisioner/provisioner.go @@ -28,6 +28,11 @@ type RunnerProvisioner struct { mu sync.Mutex } +const ( + SentinelSetupComplete = "/tmp/orka-runner-setup-complete" + SentinelRunComplete = "/tmp/orka-runner-run-complete" +) + var commands_template = []string{ "set -e", "echo \"Downloading Git Action Runner from https://github.com/actions/runner/releases/download/v$VERSION/actions-runner-osx-$(uname -m | sed 's/86_//')-$VERSION.tar.gz\"", @@ -38,8 +43,10 @@ var commands_template = []string{ "cd /Users/$USERNAME/actions-runner", "tar xzf /Users/$USERNAME/actions-runner/actions-runner.tar.gz", "echo 'Git Action Runner unarchive completed'", + "touch " + SentinelSetupComplete, "echo 'Starting Git Action Runner'", "/Users/$USERNAME/actions-runner/run.sh --jitconfig $JITCONFIG", + "touch " + SentinelRunComplete, "echo 'Git Action Runner exited'", }