diff --git a/README.md b/README.md index 8300e5b..4f99f66 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ The Orka GitHub runner requires the following environment variabales to be confi * `ENABLE_METRICS`: (Optional) Enables Prometheus metrics exposure. When set to `true`, the service will expose metrics at the `/metrics` endpoint. Defaults to `false`. * `METRICS_ADDR`: (Optional) The address where the Prometheus metrics endpoint will be exposed (e.g., `:8080`). Defaults to `:8080`. * `METRICS_POLL_INTERVAL`: (Optional) Interval at which runner scale set statistics are polled and metrics are updated (e.g., `30s`, `1m`). Defaults to `30s`. +* `MANAGE_RUNNER_SCALE_SETS`: (Optional) When set to `true`, deletes any existing runner scale set with the same name on startup and deletes the scale set on exit. When set to `false`, reuses an existing scale set if found and skips deletion on exit. If a message session cannot be created because the scale set already has an active session, the scale set is deleted and recreated regardless of this setting. Defaults to `false`. For a complete example of the required format, refer to the `.env` file located in the examples directory [here](./examples/.env). diff --git a/examples/.env b/examples/.env index f2a90ae..e0c2e81 100644 --- a/examples/.env +++ b/examples/.env @@ -82,3 +82,9 @@ VM_TRACKER_INTERVAL="300s" ENABLE_METRICS=true METRICS_ADDR=:8080 METRICS_POLL_INTERVAL=30s + +# Runner scale set lifecycle management (optional) +# When true, deletes any existing scale set with the same name on startup, and deletes the scale set on exit. +# When false, reuses an existing scale set if found and skips deletion on exit. This allows the runner to pick up where it left off if the previous instance was terminated abnormally. +# Defaults to false. 
+MANAGE_RUNNER_SCALE_SETS=false diff --git a/main.go b/main.go index b5c1133..e629edb 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,11 @@ package main import ( "context" + "errors" "fmt" "os" "os/signal" + "sync" "syscall" "github.com/macstadium/orka-github-actions-integration/pkg/constants" @@ -21,8 +23,6 @@ import ( "k8s.io/apimachinery/pkg/util/validation" ) -var runnerScaleSetIDs = []int{} - func main() { envData := env.ParseEnv() @@ -52,17 +52,71 @@ func main() { panic(err) } + existing, err := actionsClient.GetRunnerScaleSet(ctx, groupId, runnerName) + if err != nil { + panic(fmt.Sprintf("error checking for existing runner scale set: %s", err.Error())) + } + + var runnerScaleSet *types.RunnerScaleSet + if existing != nil && !envData.ManageRunnerScaleSets { + logger.Infof("reusing existing runner scale set %s (id=%d)", existing.Name, existing.Id) + runnerScaleSet = existing + } else { + if existing != nil { + if err = actionsClient.DeleteRunnerScaleSet(ctx, existing.Id); err != nil { + panic(fmt.Sprintf("error deleting existing runner scale set: %s", err.Error())) + } + } + runnerScaleSet, err = createScaleSet(ctx, actionsClient, runnerName, groupId) + if err != nil { + panic(fmt.Sprintf("unable to create runner %s, err: %s", runnerName, err.Error())) + } + logger.Infof("created runner scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id) + } + + if envData.EnableMetrics { + metrics.Start(ctx, logger, envData, actionsClient, runnerName, groupId) + } + + orkaClient, err := orka.NewOrkaClient(envData, ctx) + if err != nil { + panic(fmt.Sprintf("unable to access Orka cluster. 
More info: %s", err.Error())) + } + + runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) + if errors.Is(err, runners.ErrActiveSession) { + logger.Infof("scale set %s (id=%d) has a stale active session, deleting and recreating", runnerScaleSet.Name, runnerScaleSet.Id) + if err = actionsClient.DeleteRunnerScaleSet(ctx, runnerScaleSet.Id); err != nil { + panic(fmt.Sprintf("error deleting scale set with active session: %s", err.Error())) + } + runnerScaleSet, err = createScaleSet(ctx, actionsClient, runnerName, groupId) + if err != nil { + panic(fmt.Sprintf("error recreating scale set after active session conflict: %s", err.Error())) + } + logger.Infof("recreated scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id) + runnerManager, err = runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) + } + if err != nil { + panic(err) + } + + var closeOnce sync.Once + closeManager := func() { + closeOnce.Do(func() { runnerManager.Close() }) + } + defer closeManager() + go func() { - // Wait for termination signal <-ctx.Done() if ctx.Err() == context.Canceled { - fmt.Println("Received termination signal. 
Performing cleanup...") + logger.Info("received termination signal, performing cleanup") - for _, runnerScaleSetID := range runnerScaleSetIDs { - err = actionsClient.DeleteRunnerScaleSet(context.TODO(), runnerScaleSetID) - if err != nil { - fmt.Printf("error while deleting runnerScaleSet %s", err.Error()) + closeManager() + + if envData.ManageRunnerScaleSets { + if err := actionsClient.DeleteRunnerScaleSet(context.TODO(), runnerScaleSet.Id); err != nil { + logger.Errorf("error deleting runner scale set on exit: %s", err.Error()) } } @@ -70,7 +124,11 @@ func main() { } }() - runnerScaleSet, err := actionsClient.CreateRunnerScaleSet(ctx, &types.RunnerScaleSet{ + run(ctx, actionsClient, orkaClient, runnerScaleSet, runnerManager, envData, logger) +} + +func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, runnerName string, groupId int) (*types.RunnerScaleSet, error) { + return actionsClient.CreateRunnerScaleSet(ctx, &types.RunnerScaleSet{ Name: runnerName, RunnerGroupId: groupId, Labels: []types.RunnerScaleSetLabel{ @@ -84,31 +142,9 @@ func main() { DisableUpdate: true, }, }) - if err != nil { - panic(fmt.Sprintf("unable to create runner %s, err: %s", runnerName, err.Error())) - } - - runnerScaleSetIDs = append(runnerScaleSetIDs, runnerScaleSet.Id) - - if envData.EnableMetrics { - metrics.Start(ctx, logger, envData, actionsClient, runnerName, groupId) - } - - orkaClient, err := orka.NewOrkaClient(envData, ctx) - if err != nil { - panic(fmt.Sprintf("unable to access Orka cluster. 
More info: %s", err.Error())) - } - - run(ctx, actionsClient, orkaClient, runnerScaleSet, envData, logger) } -func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, envData *env.Data, logger *zap.SugaredLogger) { - runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id) - if err != nil { - panic(err) - } - defer runnerManager.Close() - +func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, runnerManager *runners.RunnerManager, envData *env.Data, logger *zap.SugaredLogger) { runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData) vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger) @@ -116,7 +152,7 @@ func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient * runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet) - if err = runnerMessageProcessor.StartProcessingMessages(); err != nil { - logger.Errorf("failed to start processing messages for runnerScaleSet %s: %w", runnerScaleSet.Name, err.Error()) + if err := runnerMessageProcessor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) { + logger.Errorf("failed to start processing messages for runnerScaleSet %s: %v", runnerScaleSet.Name, err) } } diff --git a/pkg/env/constants.go b/pkg/env/constants.go index 64571a5..7b3b8b5 100644 --- a/pkg/env/constants.go +++ b/pkg/env/constants.go @@ -35,4 +35,6 @@ const ( EnableMetricsEnvName = "ENABLE_METRICS" MetricsAddrEnvName = "METRICS_ADDR" MetricsPollIntervalEnvName = "METRICS_POLL_INTERVAL" + + ManageRunnerScaleSetsEnvName = "MANAGE_RUNNER_SCALE_SETS" ) diff --git a/pkg/env/env.go b/pkg/env/env.go index f930255..e77aefb 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -55,6 +55,8 @@ type Data struct { 
EnableMetrics bool MetricsAddr string MetricsPollInterval time.Duration + + ManageRunnerScaleSets bool } func ParseEnv() *Data { @@ -90,6 +92,8 @@ func ParseEnv() *Data { EnableMetrics: getBoolEnv(EnableMetricsEnvName, false), MetricsAddr: getEnvWithDefault(MetricsAddrEnvName, ":8080"), MetricsPollInterval: getDurationEnv(MetricsPollIntervalEnvName, 30*time.Second), + + ManageRunnerScaleSets: getBoolEnv(ManageRunnerScaleSetsEnvName, false), } envData.OrkaURL = strings.TrimSuffix(envData.OrkaURL, "/") diff --git a/pkg/github/runners/manager.go b/pkg/github/runners/manager.go index 95b1ea7..7b20078 100644 --- a/pkg/github/runners/manager.go +++ b/pkg/github/runners/manager.go @@ -17,7 +17,6 @@ import ( "go.uber.org/zap" "github.com/macstadium/orka-github-actions-integration/pkg/github/actions" - ghErrors "github.com/macstadium/orka-github-actions-integration/pkg/github/errors" "github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue" "github.com/macstadium/orka-github-actions-integration/pkg/github/types" "github.com/macstadium/orka-github-actions-integration/pkg/logging" @@ -28,6 +27,8 @@ const ( runnerScaleSetJobMessagesType = "RunnerScaleSetJobMessages" ) +var ErrActiveSession = errors.New("runner scale set already has an active session") + func NewRunnerManager(ctx context.Context, client actions.ActionsService, runnerScaleSetId int) (*RunnerManager, error) { logger := logging.Logger.Named(fmt.Sprintf("runner-manager-%d", runnerScaleSetId)) @@ -69,9 +70,12 @@ func createSessionWithRetry(ctx context.Context, logger *zap.SugaredLogger, clie break } - clientSideError := &ghErrors.HttpClientSideError{} - if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict { - logger.Info("unable to create message session. 
The error indicates something is wrong on the client side, won't make any retry.") + actionsErr := &actions.ActionsError{} + if errors.As(err, &actionsErr) { + if actionsErr.StatusCode == http.StatusConflict { + return nil, fmt.Errorf("%w: %s", ErrActiveSession, err) + } + logger.Infof("unable to create message session, client-side error (status %d), won't retry: %s", actionsErr.StatusCode, err.Error()) return nil, fmt.Errorf("create message session http request failed. %w", err) } diff --git a/pkg/http/logger.go b/pkg/http/logger.go index 8749104..13b819c 100644 --- a/pkg/http/logger.go +++ b/pkg/http/logger.go @@ -1,6 +1,9 @@ package retryablehttp import ( + "context" + "errors" + "go.uber.org/zap" ) @@ -9,6 +12,16 @@ type LeveledLogger struct { } func (l *LeveledLogger) Error(msg string, keysAndValues ...interface{}) { + // go-retryablehttp logs every failed request at error level, including ones + // that fail purely because the caller's context was cancelled (e.g. during + // graceful shutdown). Downgrade those to debug to avoid alarming noise. + for i := 1; i < len(keysAndValues); i += 2 { + if err, ok := keysAndValues[i].(error); ok && errors.Is(err, context.Canceled) { + l.logger.Debugw(msg, keysAndValues...) + return + } + } + l.logger.Errorw(msg, keysAndValues...) }