Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The Orka GitHub runner requires the following environment variabales to be confi
* `ENABLE_METRICS`: (Optional) Enables Prometheus metrics exposure. When set to `true`, the service will expose metrics at the `/metrics` endpoint. Defaults to `false`.
* `METRICS_ADDR`: (Optional) The address where the Prometheus metrics endpoint will be exposed (e.g., `:8080`). Defaults to `:8080`.
* `METRICS_POLL_INTERVAL`: (Optional) Interval at which runner scale set statistics are polled and metrics are updated (e.g., `30s`, `1m`). Defaults to `30s`.
* `MANAGE_RUNNER_SCALE_SETS`: (Optional) When set to `true`, deletes any existing runner scale set with the same name on startup and deletes the scale set on exit. When set to `false`, reuses an existing scale set if found and skips deletion on exit. Defaults to `false`.

For a complete example of the required format, refer to the `.env` file located in the examples directory [here](./examples/.env).

Expand Down
6 changes: 6 additions & 0 deletions examples/.env
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,9 @@ VM_TRACKER_INTERVAL="300s"
ENABLE_METRICS=true
METRICS_ADDR=:8080
METRICS_POLL_INTERVAL=30s

# Runner scale set lifecycle management (optional)
# When true, deletes any existing scale set with the same name on startup, and deletes the scale set on exit.
# When false, reuses an existing scale set if found and skips deletion on exit. This allows for the runner to pick up where it left off if the previous runner was terminated abnormally.
# Defaults to false.
MANAGE_RUNNER_SCALE_SETS=false
104 changes: 70 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/macstadium/orka-github-actions-integration/pkg/constants"
Expand All @@ -21,8 +23,6 @@ import (
"k8s.io/apimachinery/pkg/util/validation"
)

var runnerScaleSetIDs = []int{}

func main() {
envData := env.ParseEnv()

Expand Down Expand Up @@ -52,25 +52,83 @@ func main() {
panic(err)
}

existing, err := actionsClient.GetRunnerScaleSet(ctx, groupId, runnerName)
if err != nil {
panic(fmt.Sprintf("error checking for existing runner scale set: %s", err.Error()))
}

var runnerScaleSet *types.RunnerScaleSet
if existing != nil && !envData.ManageRunnerScaleSets {
logger.Infof("reusing existing runner scale set %s (id=%d)", existing.Name, existing.Id)
runnerScaleSet = existing
} else {
if existing != nil {
if err = actionsClient.DeleteRunnerScaleSet(ctx, existing.Id); err != nil {
panic(fmt.Sprintf("error deleting existing runner scale set: %s", err.Error()))
}
}
runnerScaleSet, err = createScaleSet(ctx, actionsClient, runnerName, groupId)
if err != nil {
panic(fmt.Sprintf("unable to create runner %s, err: %s", runnerName, err.Error()))
}
logger.Infof("created runner scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id)
}

if envData.EnableMetrics {
metrics.Start(ctx, logger, envData, actionsClient, runnerName, groupId)
}

orkaClient, err := orka.NewOrkaClient(envData, ctx)
if err != nil {
panic(fmt.Sprintf("unable to access Orka cluster. More info: %s", err.Error()))
}

runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id)
if errors.Is(err, runners.ErrActiveSession) {
logger.Infof("scale set %s (id=%d) has a stale active session, deleting and recreating", runnerScaleSet.Name, runnerScaleSet.Id)
if err = actionsClient.DeleteRunnerScaleSet(ctx, runnerScaleSet.Id); err != nil {
panic(fmt.Sprintf("error deleting scale set with active session: %s", err.Error()))
}
runnerScaleSet, err = createScaleSet(ctx, actionsClient, runnerName, groupId)
if err != nil {
panic(fmt.Sprintf("error recreating scale set after active session conflict: %s", err.Error()))
}
logger.Infof("recreated scale set %s (id=%d)", runnerScaleSet.Name, runnerScaleSet.Id)
runnerManager, err = runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id)
}
if err != nil {
panic(err)
}

var closeOnce sync.Once
closeManager := func() {
closeOnce.Do(func() { runnerManager.Close() })
}
defer closeManager()

go func() {
// Wait for termination signal
<-ctx.Done()

if ctx.Err() == context.Canceled {
fmt.Println("Received termination signal. Performing cleanup...")
logger.Info("received termination signal, performing cleanup")

for _, runnerScaleSetID := range runnerScaleSetIDs {
err = actionsClient.DeleteRunnerScaleSet(context.TODO(), runnerScaleSetID)
if err != nil {
fmt.Printf("error while deleting runnerScaleSet %s", err.Error())
closeManager()

if envData.ManageRunnerScaleSets {
if err := actionsClient.DeleteRunnerScaleSet(context.TODO(), runnerScaleSet.Id); err != nil {
logger.Errorf("error deleting runner scale set on exit: %s", err.Error())
}
}

os.Exit(0)
}
}()

runnerScaleSet, err := actionsClient.CreateRunnerScaleSet(ctx, &types.RunnerScaleSet{
run(ctx, actionsClient, orkaClient, runnerScaleSet, runnerManager, envData, logger)
}

func createScaleSet(ctx context.Context, actionsClient *actions.ActionsClient, runnerName string, groupId int) (*types.RunnerScaleSet, error) {
return actionsClient.CreateRunnerScaleSet(ctx, &types.RunnerScaleSet{
Name: runnerName,
RunnerGroupId: groupId,
Labels: []types.RunnerScaleSetLabel{
Expand All @@ -84,39 +142,17 @@ func main() {
DisableUpdate: true,
},
})
if err != nil {
panic(fmt.Sprintf("unable to create runner %s, err: %s", runnerName, err.Error()))
}

runnerScaleSetIDs = append(runnerScaleSetIDs, runnerScaleSet.Id)

if envData.EnableMetrics {
metrics.Start(ctx, logger, envData, actionsClient, runnerName, groupId)
}

orkaClient, err := orka.NewOrkaClient(envData, ctx)
if err != nil {
panic(fmt.Sprintf("unable to access Orka cluster. More info: %s", err.Error()))
}

run(ctx, actionsClient, orkaClient, runnerScaleSet, envData, logger)
}

func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, envData *env.Data, logger *zap.SugaredLogger) {
runnerManager, err := runners.NewRunnerManager(ctx, actionsClient, runnerScaleSet.Id)
if err != nil {
panic(err)
}
defer runnerManager.Close()

func run(ctx context.Context, actionsClient *actions.ActionsClient, orkaClient *orka.OrkaClient, runnerScaleSet *types.RunnerScaleSet, runnerManager *runners.RunnerManager, envData *env.Data, logger *zap.SugaredLogger) {
runnerProvisioner := provisioner.NewRunnerProvisioner(runnerScaleSet, actionsClient, orkaClient, envData)

vmTracker := runners.NewVMTracker(orkaClient, actionsClient, logger)
go vmTracker.Start(ctx, envData.VMTrackerInterval)

runnerMessageProcessor := runners.NewRunnerMessageProcessor(ctx, runnerManager, runnerProvisioner, vmTracker, runnerScaleSet)

if err = runnerMessageProcessor.StartProcessingMessages(); err != nil {
logger.Errorf("failed to start processing messages for runnerScaleSet %s: %w", runnerScaleSet.Name, err.Error())
if err := runnerMessageProcessor.StartProcessingMessages(); err != nil && !errors.Is(err, context.Canceled) {
logger.Errorf("failed to start processing messages for runnerScaleSet %s: %v", runnerScaleSet.Name, err)
}
}
2 changes: 2 additions & 0 deletions pkg/env/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ const (
EnableMetricsEnvName = "ENABLE_METRICS"
MetricsAddrEnvName = "METRICS_ADDR"
MetricsPollIntervalEnvName = "METRICS_POLL_INTERVAL"

ManageRunnerScaleSetsEnvName = "MANAGE_RUNNER_SCALE_SETS"
)
4 changes: 4 additions & 0 deletions pkg/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Data struct {
EnableMetrics bool
MetricsAddr string
MetricsPollInterval time.Duration

ManageRunnerScaleSets bool
}

func ParseEnv() *Data {
Expand Down Expand Up @@ -90,6 +92,8 @@ func ParseEnv() *Data {
EnableMetrics: getBoolEnv(EnableMetricsEnvName, false),
MetricsAddr: getEnvWithDefault(MetricsAddrEnvName, ":8080"),
MetricsPollInterval: getDurationEnv(MetricsPollIntervalEnvName, 30*time.Second),

ManageRunnerScaleSets: getBoolEnv(ManageRunnerScaleSetsEnvName, false),
}

envData.OrkaURL = strings.TrimSuffix(envData.OrkaURL, "/")
Expand Down
12 changes: 8 additions & 4 deletions pkg/github/runners/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.uber.org/zap"

"github.com/macstadium/orka-github-actions-integration/pkg/github/actions"
ghErrors "github.com/macstadium/orka-github-actions-integration/pkg/github/errors"
"github.com/macstadium/orka-github-actions-integration/pkg/github/messagequeue"
"github.com/macstadium/orka-github-actions-integration/pkg/github/types"
"github.com/macstadium/orka-github-actions-integration/pkg/logging"
Expand All @@ -28,6 +27,8 @@ const (
runnerScaleSetJobMessagesType = "RunnerScaleSetJobMessages"
)

var ErrActiveSession = errors.New("runner scale set already has an active session")

func NewRunnerManager(ctx context.Context, client actions.ActionsService, runnerScaleSetId int) (*RunnerManager, error) {
logger := logging.Logger.Named(fmt.Sprintf("runner-manager-%d", runnerScaleSetId))

Expand Down Expand Up @@ -69,9 +70,12 @@ func createSessionWithRetry(ctx context.Context, logger *zap.SugaredLogger, clie
break
}

clientSideError := &ghErrors.HttpClientSideError{}
if errors.As(err, &clientSideError) && clientSideError.Code != http.StatusConflict {
logger.Info("unable to create message session. The error indicates something is wrong on the client side, won't make any retry.")
actionsErr := &actions.ActionsError{}
if errors.As(err, &actionsErr) {
if actionsErr.StatusCode == http.StatusConflict {
return nil, fmt.Errorf("%w: %s", ErrActiveSession, err)
}
logger.Infof("unable to create message session, client-side error (status %d), won't retry: %s", actionsErr.StatusCode, err.Error())
return nil, fmt.Errorf("create message session http request failed. %w", err)
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/http/logger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package retryablehttp

import (
"context"
"errors"

"go.uber.org/zap"
)

Expand All @@ -9,6 +12,16 @@ type LeveledLogger struct {
}

func (l *LeveledLogger) Error(msg string, keysAndValues ...interface{}) {
// go-retryablehttp logs every failed request at error level, including ones
// that fail purely because the caller's context was cancelled (e.g. during
// graceful shutdown). Downgrade those to debug to avoid alarming noise.
for i := 1; i < len(keysAndValues); i += 2 {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of confusing, and it might be nice to have a comment that we're iterating through until we find the error we want to get more info on.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have added an explanation. Let me know what you think

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good -- thanks!

if err, ok := keysAndValues[i].(error); ok && errors.Is(err, context.Canceled) {
l.logger.Debugw(msg, keysAndValues...)
return
}
}

l.logger.Errorw(msg, keysAndValues...)
}

Expand Down
Loading