Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/.env
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,10 @@ METRICS_POLL_INTERVAL=30s
# When false, reuses an existing scale set if found and skips deletion on exit. This allows for the runner to pick up where it left off if the previous runner was terminated abnormally.
# Defaults to false.
MANAGE_RUNNER_SCALE_SETS=false

# Startup VM reconciliation (optional)
# When true, on startup lists existing Orka VMs for the scale set, checks each VM's state over SSH
# (sentinel files + run.sh process), and either adopts (active), cleans up (run finished), or deletes
# (no GitHub runner / setup incomplete / run.sh crashed) so the controller recovers from prior process exits.
# Defaults to true.
ENABLE_RECONCILIATION=true

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this to be enabled by default now? As it changes the behavior, it may cause unexpected issues for existing users if the controller is restarted.

Should we make it opt-in for now?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about htis.
But given the fact the previous behavior introduces stuck jobs and orphaned VM, this seems like a better default

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. At the least, when we release a new version we should highlight the change in behavior

36 changes: 26 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/macstadium/orka-github-actions-integration/pkg/logging"
"github.com/macstadium/orka-github-actions-integration/pkg/metrics"
"github.com/macstadium/orka-github-actions-integration/pkg/orka"
"github.com/macstadium/orka-github-actions-integration/pkg/reconciler"
provisioner "github.com/macstadium/orka-github-actions-integration/pkg/runner-provisioner"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -83,6 +84,20 @@ func main() {
panic(fmt.Sprintf("unable to access Orka cluster. More info: %s", err.Error()))
}

vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger)
go vmTracker.Start(ctx, envData.VMTrackerInterval)

runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData)

var existingVMs []*orka.OrkaVMInfo
if envData.EnableReconciliation {
var listErr error
existingVMs, listErr = orkaClient.ListVMs(ctx, runnerScaleSet.Name)
if listErr != nil {
logger.Warnf("failed to list existing VMs for reconciliation: %v", listErr)
}
}

runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id)
if errors.Is(err, runners.ErrActiveSession) {
logger.Infof("scale set %s (id=%d) has a stale active session, deleting and recreating", runnerScaleSet.Name, runnerScaleSet.Id)
Expand All @@ -94,6 +109,7 @@ func main() {
panic(fmt.Sprintf("error recreating scale set after active session conflict: %s", err.Error()))
}
logger.Infof("recreated scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id)
runnerProvisioner = provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData)
runnerManager, err = runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id)
}
if err != nil {
Expand Down Expand Up @@ -124,7 +140,14 @@ func main() {
}
}()

run(ctx, actionsClient, orkaClient, runnerScaleSet, runnerManager, envData, logger)
runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet)

if envData.EnableReconciliation {
vmReconciler := reconciler.NewVMReconciler(actionsClient, runnerProvisioner, runnerMessageProcessor.AdoptVM, envData)
go vmReconciler.ReconcileVMs(ctx, existingVMs)
}

run(ctx, runnerMessageProcessor, runnerScaleSet, logger)
}

func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, runnerName string, groupId int) (*types.RunnerScaleSet, error) {
Expand All @@ -144,15 +167,8 @@ func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, r
})
}

func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, runnerManager *runners.RunnerManager, envData *env.Data, logger *zap.SugaredLogger) {
runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData)

vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger)
go vmTracker.Start(ctx, envData.VMTrackerInterval)

runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet)

if err := runnerMessageProcessor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) {
func run(ctx context.Context, processor *runners.RunnerMessageProcessor, runnerScaleSet *types.RunnerScaleSet, logger *zap.SugaredLogger) {
if err := processor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf("failed to start processing messages for runnerScaleSet %s: %v", runnerScaleSet.Name, err)
}
}
2 changes: 2 additions & 0 deletions pkg/env/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ const (
MetricsPollIntervalEnvName = "METRICS_POLL_INTERVAL"

ManageRunnerScaleSetsEnvName = "MANAGE_RUNNER_SCALE_SETS"

EnableReconciliationEnvName = "ENABLE_RECONCILIATION"
)
4 changes: 4 additions & 0 deletions pkg/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Data struct {
MetricsPollInterval time.Duration

ManageRunnerScaleSets bool

EnableReconciliation bool
}

func ParseEnv() *Data {
Expand Down Expand Up @@ -94,6 +96,8 @@ func ParseEnv() *Data {
MetricsPollInterval: getDurationEnv(MetricsPollIntervalEnvName, 30*time.Second),

ManageRunnerScaleSets: getBoolEnv(ManageRunnerScaleSetsEnvName, false),

EnableReconciliation: getBoolEnv(EnableReconciliationEnvName, true),
}

envData.OrkaURL = strings.TrimSuffix(envData.OrkaURL, "/")
Expand Down
170 changes: 101 additions & 69 deletions pkg/github/runners/message-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/macstadium/orka-github-actions-integration/pkg/github/types"
"github.com/macstadium/orka-github-actions-integration/pkg/logging"
"github.com/macstadium/orka-github-actions-integration/pkg/orka"
Expand All @@ -34,6 +35,10 @@ func newJobIdentity(jobId string, runnerRequestId int64) jobIdentity {
return jobIdentity{jobId: jobId, runnerRequestId: runnerRequestId}
}

func newRecoveryJobIdentity() jobIdentity {
return jobIdentity{jobId: "recovery-" + uuid.NewString()}
}

func (k jobIdentity) String() string {
return fmt.Sprintf("jobId=%s runnerRequestId=%d", k.jobId, k.runnerRequestId)
}
Expand Down Expand Up @@ -91,20 +96,35 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
return nil
}

if message.MessageId == 0 && message.Body == "" { // initial message with statistics only
return nil
}

var batchedMessages []json.RawMessage
if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil {
return fmt.Errorf("could not decode job messages. %w", err)
if message.Body != "" {
if err := json.NewDecoder(strings.NewReader(message.Body)).Decode(&batchedMessages); err != nil {
return fmt.Errorf("could not decode job messages. %w", err)
}
}

p.logger.Infof("process batched runner scale set job messages with id %d and batch size %d", message.MessageId, len(batchedMessages))

requiredRunners := message.Statistics.TotalAssignedJobs - message.Statistics.TotalRegisteredRunners
provisionedRunners := 0

if message.MessageId == 0 && len(batchedMessages) == 0 {
// The session-creation snapshot can lag: runners whose jobs are already
// running may not be counted in TotalRegisteredRunners yet. A running job
// implies a live runner picked it up (unlike TotalBusyRunners, which only
// means a job is pinned to a runner record - possibly one that never
// connected), so trust whichever count is larger.
effectiveRegisteredRunners := max(message.Statistics.TotalRegisteredRunners, message.Statistics.TotalRunningJobs)
requiredRunners = message.Statistics.TotalAssignedJobs - effectiveRegisteredRunners
p.logger.Infof("initial message received, provision runners to cover assigned-job gap (effective registered runners: %d, required runners: %d)", effectiveRegisteredRunners, requiredRunners)
for provisionedRunners < requiredRunners {
provisionedRunners++
p.logger.Infof("provisioning runner %d/%d to cover assigned-job gap", provisionedRunners, requiredRunners)
go p.startRunner(newRecoveryJobIdentity())
}
return nil
}

var availableJobs []int64
for _, message := range batchedMessages {
var messageType types.JobMessageType
Expand All @@ -131,69 +151,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
if provisionedRunners < requiredRunners {
provisionedRunners++
p.logger.Infof("number of runners provisioning started: %d. Max required runners: %d", provisionedRunners, requiredRunners)

job := newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId)

go func() {
var executionErr error

defer p.removeUpstreamCanceledJob(job)

executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job)
if provisioningErr != nil {
if errors.Is(provisioningErr, context.Canceled) {
p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName)
} else {
p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr)
}
return
}

if executor == nil {
p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName)
return
}

runnerContext, cancel := context.WithCancel(p.ctx)
p.storeRunnerContextCancel(executor.VMName, cancel)

context.AfterFunc(runnerContext, func() {
p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName)
p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName)
p.vmTracker.Untrack(executor.VMName)
})

defer func() {
if isNetworkingFailure(executionErr) {
p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr)
return
}

var cancelReason string
var exitErr *ssh.ExitError

if errors.Is(executionErr, context.Canceled) {
cancelReason = "runner context was canceled"
p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job)
} else if executionErr != nil {
if errors.As(executionErr, &exitErr) {
cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus())
p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job)
} else {
cancelReason = fmt.Sprintf("execution failed: %v", executionErr)
p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr)
}
} else {
cancelReason = "execution completed successfully"
p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job)
}

p.cancelRunnerContext(executor.VMName, cancelReason)
}()

p.vmTracker.Track(executor.VMName)
executionErr = p.executeJobCommands(runnerContext, job, executor, commands)
}()
go p.startRunner(newJobIdentity(jobAssigned.JobId, jobAssigned.RunnerRequestId))
}
case "JobStarted":
var jobStarted types.JobStarted
Expand Down Expand Up @@ -234,6 +192,80 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale
return nil
}

func (p *RunnerMessageProcessor) AdoptVM(vmName string) {
runnerContext, cancel := context.WithCancel(p.ctx)
p.storeRunnerContextCancel(vmName, cancel)

context.AfterFunc(runnerContext, func() {
p.logger.Infof("cleaning up resources for %s after runner context is canceled", vmName)
p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), vmName)
p.vmTracker.Untrack(vmName)
})

p.vmTracker.Track(vmName)
}

func (p *RunnerMessageProcessor) startRunner(job jobIdentity) {
var executionErr error

defer p.removeUpstreamCanceledJob(job)

executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, job)
if provisioningErr != nil {
if errors.Is(provisioningErr, context.Canceled) {
p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName)
} else {
p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr)
}
return
}

if executor == nil {
p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName)
return
}

runnerContext, cancel := context.WithCancel(p.ctx)
p.storeRunnerContextCancel(executor.VMName, cancel)

context.AfterFunc(runnerContext, func() {
p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName)
p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName)
p.vmTracker.Untrack(executor.VMName)
})

defer func() {
if isNetworkingFailure(executionErr) {
p.logger.Warnf("SSH connection dropped for %s (%v). Skipping cleanup, relying on JobCompleted webhook.", job, executionErr)
return
}

var cancelReason string
var exitErr *ssh.ExitError

if errors.Is(executionErr, context.Canceled) {
cancelReason = "runner context was canceled"
p.logger.Infof("runner context canceled for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job)
} else if executionErr != nil {
if errors.As(executionErr, &exitErr) {
cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus())
p.logger.Errorf("execution failed with exit code %d for RunnerName %s with %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, job)
} else {
cancelReason = fmt.Sprintf("execution failed: %v", executionErr)
p.logger.Errorf("execution failed for RunnerName %s with %s. Cleaning up resources: %v", executor.VMName, job, executionErr)
}
} else {
cancelReason = "execution completed successfully"
p.logger.Infof("execution completed successfully for RunnerName %s with %s. Cleaning up resources.", executor.VMName, job)
}

p.cancelRunnerContext(executor.VMName, cancelReason)
}()

p.vmTracker.Track(executor.VMName)
executionErr = p.executeJobCommands(runnerContext, job, executor, commands)
}

func (p *RunnerMessageProcessor) provisionRunnerWithRetry(ctx context.Context, job jobIdentity) (*orka.VMCommandExecutor, []string, error) {
for attempt := 1; !p.isUpstreamCanceled(job); attempt++ {
executor, commands, err := p.runnerProvisioner.ProvisionRunner(ctx)
Expand Down
4 changes: 4 additions & 0 deletions pkg/github/runners/vm_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (m *MockOrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig stri
return nil, nil
}

func (m *MockOrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*orka.OrkaVMInfo, error) {
return nil, nil
}

type MockActionsClient struct {
GetRunnerFunc func(ctx context.Context, runnerName string) (*types.RunnerReference, error)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/orka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type OrkaService interface {
DeployVM(ctx context.Context, namePrefix, vmConfig string) (*OrkaVMDeployResponseModel, error)
DeleteVM(ctx context.Context, name string) error
ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error)
}

type OrkaClient struct {
Expand All @@ -35,6 +36,21 @@ func (client *OrkaClient) DeployVM(ctx context.Context, namePrefix, vmConfig str
return (*res)[0], nil
}

func (client *OrkaClient) ListVMs(ctx context.Context, namePrefix string) ([]*OrkaVMInfo, error) {
res, err := exec.ExecJSONCommand[[]*OrkaVMInfo]("orka3", []string{"vm", "list", "--namespace", client.envData.OrkaNamespace, "-o", "json"})
if err != nil {
return nil, err
}

var filtered []*OrkaVMInfo
for _, vm := range *res {
if strings.HasPrefix(vm.Name, namePrefix) {
filtered = append(filtered, vm)
}
}
return filtered, nil
}

func (client *OrkaClient) DeleteVM(ctx context.Context, name string) error {
out, err := exec.ExecStringCommand("orka3", []string{"vm", "delete", name, "--namespace", client.envData.OrkaNamespace})
if out == fmt.Sprintf("Successfully deleted vm %s", name) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/orka/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@ type OrkaClusterInfoResponseModel struct {
AppClientId string `json:"appClientId"`
BaseOauthEndpoint string `json:"baseOauthEndpoint"`
}

type OrkaVMInfo struct {
Name string `json:"name"`
IP string `json:"ip"`
SSH *int `json:"ssh,omitempty"`
Status VMPhase `json:"status"`
}
Loading
Loading