Skip to content

Commit 28faaf0

Browse files
author
Wei
authored
Merge pull request #8 from cloudpilot-ai/vacant2333-dev-general
feat: add arm check/patch/rollback
2 parents 9f1064c + e683ea6 commit 28faaf0

1,056 files changed

Lines changed: 215246 additions & 2822 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea
2+
migrate

README.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,62 @@ go build migrate
1414
docker run --rm -v $(pwd):/app -w /app golang:latest \
1515
bash -c 'GOOS=linux GOARCH=amd64 go build -o migrate_linux_amd64'
1616
```
17+
18+
## ARM NodePool Example
19+
20+
```yaml
21+
template:
22+
metadata:
23+
labels:
24+
node.cloudpilot.ai/managed: "true"
25+
spec:
26+
taints:
27+
- key: node.cloudpilot.ai/arch-arm64
28+
effect: NoSchedule
29+
requirements:
30+
- key: karpenter.k8s.aws/instance-category
31+
operator: NotIn
32+
values:
33+
- p
34+
- g
35+
- gr
36+
- inf
37+
- a
38+
- key: kubernetes.io/arch
39+
operator: In
40+
values:
41+
- arm64
42+
- key: kubernetes.io/os
43+
operator: In
44+
values:
45+
- linux
46+
- key: karpenter.sh/capacity-type
47+
operator: In
48+
values:
49+
- spot
50+
- on-demand
51+
- key: karpenter.k8s.aws/instance-memory
52+
operator: Lt
53+
values:
54+
- "32769"
55+
- key: karpenter.k8s.aws/instance-cpu
56+
operator: Lt
57+
values:
58+
- "17"
59+
- key: beta.kubernetes.io/instance-type
60+
operator: NotIn
61+
values:
62+
- c1.medium
63+
- m1.small
64+
nodeClassRef:
65+
kind: EC2NodeClass
66+
name: cloudpilot
67+
group: karpenter.k8s.aws
68+
expireAfter: Never
69+
disruption:
70+
consolidateAfter: 60m
71+
consolidationPolicy: WhenEmptyOrUnderutilized
72+
budgets:
73+
- nodes: "2"
74+
weight: 2
75+
```

arm_check.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"math/rand"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/awslabs/amazon-ecr-credential-helper/ecr-login"
13+
"github.com/chrismellard/docker-credential-acr-env/pkg/credhelper"
14+
"github.com/google/go-containerregistry/pkg/authn"
15+
"github.com/google/go-containerregistry/pkg/authn/github"
16+
"github.com/google/go-containerregistry/pkg/name"
17+
"github.com/google/go-containerregistry/pkg/v1/google"
18+
"github.com/google/go-containerregistry/pkg/v1/remote"
19+
"github.com/google/go-containerregistry/pkg/v1/types"
20+
corev1 "k8s.io/api/core/v1"
21+
)
22+
23+
type ArmResult struct {
24+
Supported bool
25+
Err error
26+
}
27+
28+
const MaxConcurrent = 7
29+
30+
var armResultCache = make(map[string]ArmResult)
31+
var armResultCacheMutex sync.RWMutex
32+
33+
func CheckAllWorkloadsArm(workloads []Workload) []ArmResult {
34+
results := make([]ArmResult, len(workloads))
35+
36+
var wg sync.WaitGroup
37+
sem := make(chan struct{}, MaxConcurrent)
38+
39+
for idx := range workloads {
40+
wg.Add(1)
41+
42+
go func(i int) {
43+
defer wg.Done()
44+
45+
sem <- struct{}{}
46+
defer func() { <-sem }()
47+
48+
cacheKey := fmt.Sprintf("%s:%s:%s", workloads[i].Kind, workloads[i].Name, workloads[i].Namespace)
49+
50+
armResultCacheMutex.RLock()
51+
cachedResult, found := armResultCache[cacheKey]
52+
armResultCacheMutex.RUnlock()
53+
54+
if found {
55+
results[i] = cachedResult
56+
return
57+
}
58+
59+
var supported bool
60+
var err error
61+
62+
for {
63+
supported, err = CheckWorkloadSupportsArm(&workloads[i])
64+
if err != nil && strings.Contains(err.Error(), "TOOMANYREQUESTS") {
65+
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(800)))
66+
} else {
67+
break
68+
}
69+
}
70+
result := ArmResult{Supported: supported, Err: err}
71+
results[i] = result
72+
73+
armResultCacheMutex.Lock()
74+
armResultCache[cacheKey] = result
75+
armResultCacheMutex.Unlock()
76+
}(idx)
77+
}
78+
wg.Wait()
79+
return results
80+
}
81+
82+
func CheckWorkloadSupportsArm(w *Workload) (bool, error) {
83+
var podSpec *corev1.PodSpec
84+
switch w.Kind {
85+
case WorkloadDeployment:
86+
podSpec = &w.deployment.Spec.Template.Spec
87+
case WorkloadStatefulSet:
88+
podSpec = &w.statefulSet.Spec.Template.Spec
89+
default:
90+
return false, fmt.Errorf("unsupported workload kind: %s", w.Kind)
91+
}
92+
images := getPodTemplateImages(*podSpec)
93+
94+
supportArm := true
95+
for _, image := range images {
96+
ret, err := imageSupportsArm64(image)
97+
if err != nil {
98+
return false, fmt.Errorf("failed to check image %s for arm64 support: %w", image, err)
99+
}
100+
if ret == false {
101+
supportArm = false
102+
break
103+
}
104+
}
105+
return supportArm, nil
106+
}
107+
108+
func getPodTemplateImages(podSpec corev1.PodSpec) []string {
109+
var images []string
110+
for _, c := range podSpec.Containers {
111+
images = append(images, c.Image)
112+
}
113+
for _, c := range podSpec.InitContainers {
114+
images = append(images, c.Image)
115+
}
116+
return images
117+
}
118+
119+
var keychain = authn.NewMultiKeychain(
120+
authn.NewKeychainFromHelper(ecr.NewECRHelper(ecr.WithLogger(io.Discard))), // ECR
121+
authn.NewKeychainFromHelper(credhelper.NewACRCredentialsHelper()), // ACR
122+
google.Keychain, // GCR & Artifact Registry
123+
github.Keychain, // GHCR
124+
authn.DefaultKeychain, // local ~/.docker/config.json
125+
)
126+
127+
// imageSupportsArm64 checks whether the given container image supports the linux/arm64
128+
// platform. It returns true if the image manifest list (index) contains an arm64
129+
// variant, or if the single-arch image itself is built for arm64.
130+
func imageSupportsArm64(imageRef string) (bool, error) {
131+
// Parse an arbitrary image reference (registry/name:tag or digest).
132+
ref, err := name.ParseReference(imageRef, name.WeakValidation)
133+
if err != nil {
134+
return false, fmt.Errorf("failed to parse image reference: %w", err)
135+
}
136+
137+
// Pull the descriptor (manifest or index) from the remote registry.
138+
remoteOpts := []remote.Option{
139+
remote.WithAuthFromKeychain(keychain),
140+
remote.WithContext(context.Background()),
141+
}
142+
desc, err := remote.Get(ref, remoteOpts...)
143+
if err != nil {
144+
return false, fmt.Errorf("failed to fetch image descriptor: %w", err)
145+
}
146+
147+
mt := desc.Descriptor.MediaType
148+
// Handle multi-arch images (OCI index / Docker manifest list).
149+
if mt == types.OCIImageIndex || mt == types.DockerManifestList {
150+
idx, err := desc.ImageIndex()
151+
if err != nil {
152+
return false, fmt.Errorf("failed to load image index: %w", err)
153+
}
154+
indexManifest, err := idx.IndexManifest()
155+
if err != nil {
156+
return false, fmt.Errorf("failed to read index manifest: %w", err)
157+
}
158+
for _, manifest := range indexManifest.Manifests {
159+
plat := manifest.Platform
160+
if plat != nil && plat.Architecture == "arm64" && strings.EqualFold(plat.OS, "linux") {
161+
return true, nil // linux/arm64 variant found
162+
}
163+
}
164+
return false, nil // no arm64 variant in the index
165+
}
166+
167+
// Handle single-arch images.
168+
img, err := desc.Image()
169+
if err != nil {
170+
return false, fmt.Errorf("failed to load image: %w", err)
171+
}
172+
cfg, err := img.ConfigFile()
173+
if err != nil {
174+
return false, fmt.Errorf("failed to read image config: %w", err)
175+
}
176+
return cfg.Architecture == "arm64", nil
177+
}

arm_patch.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
)
9+
10+
func patchWorkloadARMAffinity(selectedWorkloads []Workload) {
11+
for _, workload := range selectedWorkloads {
12+
var err error
13+
switch workload.Kind {
14+
case WorkloadDeployment:
15+
err = patchDeploymentARMAffinity(&workload)
16+
case WorkloadStatefulSet:
17+
err = patchStatefulSetARMAffinity(&workload)
18+
}
19+
if err != nil {
20+
fmt.Printf("Failed to patch %s workload %s/%s: %v\n", workload.Kind,
21+
workload.Namespace, workload.Name, err)
22+
}
23+
}
24+
}
25+
26+
func patchDeploymentARMAffinity(workload *Workload) error {
27+
ctx := context.Background()
28+
deployment, err := kubeClient.AppsV1().Deployments(workload.Namespace).
29+
Get(ctx, workload.Name, metav1.GetOptions{})
30+
if err != nil {
31+
return fmt.Errorf("get deployment: %w", err)
32+
}
33+
34+
newDeployment := deployment.DeepCopy()
35+
newDeployment.Spec.Template.Spec.Affinity = ensurePreferAffinity(newDeployment.Spec.Template.Spec.Affinity)
36+
37+
if HasArm64Preference(newDeployment.Spec.Template.Spec.Affinity) {
38+
fmt.Printf("workload %s %s/%s already has arm preference, skip the prefer affinity\n",
39+
workload.Kind, workload.Namespace, workload.Name)
40+
} else {
41+
newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
42+
AddArm64Preference(newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
43+
}
44+
if CheckWorkloadHasARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations) {
45+
fmt.Printf("workload %s %s/%s already has arm64 toleration, skip it\n",
46+
workload.Kind, workload.Namespace, workload.Name)
47+
} else {
48+
newDeployment.Spec.Template.Spec.Tolerations = AddARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations)
49+
}
50+
51+
return patchResource(ctx, deployment, newDeployment, workload.Namespace, workload.Name, workload.Kind)
52+
}
53+
54+
func patchStatefulSetARMAffinity(workload *Workload) error {
55+
ctx := context.Background()
56+
57+
ss, err := kubeClient.AppsV1().
58+
StatefulSets(workload.Namespace).
59+
Get(ctx, workload.Name, metav1.GetOptions{})
60+
if err != nil {
61+
return fmt.Errorf("get statefulset: %w", err)
62+
}
63+
64+
newSS := ss.DeepCopy()
65+
newSS.Spec.Template.Spec.Affinity = ensurePreferAffinity(newSS.Spec.Template.Spec.Affinity)
66+
67+
if HasArm64Preference(newSS.Spec.Template.Spec.Affinity) {
68+
fmt.Printf("workload %s %s/%s already has arm preference, skip the prefer affinity\n",
69+
workload.Kind, workload.Namespace, workload.Name)
70+
} else {
71+
newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
72+
AddArm64Preference(newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
73+
}
74+
75+
if CheckWorkloadHasARM64Toleration(newSS.Spec.Template.Spec.Tolerations) {
76+
fmt.Printf("workload %s %s/%s already has arm64 toleration, skip it\n",
77+
workload.Kind, workload.Namespace, workload.Name)
78+
} else {
79+
newSS.Spec.Template.Spec.Tolerations = AddARM64Toleration(newSS.Spec.Template.Spec.Tolerations)
80+
}
81+
82+
return patchResource(ctx, ss, newSS, workload.Namespace, workload.Name, workload.Kind)
83+
}

arm_rollback.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
)
9+
10+
func rollbackWorkloadARMAffinity(selectedWorkloads []Workload) {
11+
for _, workload := range selectedWorkloads {
12+
var err error
13+
switch workload.Kind {
14+
case WorkloadDeployment:
15+
err = rollbackDeploymentARMAffinity(&workload)
16+
case WorkloadStatefulSet:
17+
err = rollbackStatefulSetARMAffinity(&workload)
18+
}
19+
if err != nil {
20+
fmt.Printf("Failed to rollback %s workload %s/%s, err: %v\n", workload.Kind,
21+
workload.Namespace, workload.Name, err)
22+
}
23+
}
24+
}
25+
26+
func rollbackDeploymentARMAffinity(workload *Workload) error {
27+
ctx := context.Background()
28+
deployment, err := kubeClient.AppsV1().Deployments(workload.Namespace).
29+
Get(ctx, workload.Name, metav1.GetOptions{})
30+
if err != nil {
31+
return fmt.Errorf("get deployment: %w", err)
32+
}
33+
34+
newDeployment := deployment.DeepCopy()
35+
newDeployment.Spec.Template.Spec.Affinity = ensurePreferAffinity(newDeployment.Spec.Template.Spec.Affinity)
36+
37+
newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
38+
RemoveArm64Preference(newDeployment.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
39+
newDeployment.Spec.Template.Spec.Tolerations = RemoveARM64Toleration(newDeployment.Spec.Template.Spec.Tolerations)
40+
41+
return patchResource(ctx, deployment, newDeployment, workload.Namespace, workload.Name, workload.Kind)
42+
}
43+
44+
func rollbackStatefulSetARMAffinity(workload *Workload) error {
45+
ctx := context.Background()
46+
47+
ss, err := kubeClient.AppsV1().
48+
StatefulSets(workload.Namespace).
49+
Get(ctx, workload.Name, metav1.GetOptions{})
50+
if err != nil {
51+
return fmt.Errorf("get statefulset: %w", err)
52+
}
53+
54+
newSS := ss.DeepCopy()
55+
newSS.Spec.Template.Spec.Affinity = ensurePreferAffinity(newSS.Spec.Template.Spec.Affinity)
56+
57+
newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
58+
RemoveArm64Preference(newSS.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
59+
newSS.Spec.Template.Spec.Tolerations = RemoveARM64Toleration(newSS.Spec.Template.Spec.Tolerations)
60+
61+
return patchResource(ctx, ss, newSS, workload.Namespace, workload.Name, workload.Kind)
62+
}

0 commit comments

Comments
 (0)