From df220ae63668e3b0d9144829e441d62af10fdf7d Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 1 Apr 2026 10:05:56 +1100 Subject: [PATCH 1/3] test: add UTs for the refactored member agent CLI flags setup (#553) --- cmd/memberagent/options/options_test.go | 712 ++++++++++++++++++++++++ 1 file changed, 712 insertions(+) create mode 100644 cmd/memberagent/options/options_test.go diff --git a/cmd/memberagent/options/options_test.go b/cmd/memberagent/options/options_test.go new file mode 100644 index 000000000..dbc1f4c91 --- /dev/null +++ b/cmd/memberagent/options/options_test.go @@ -0,0 +1,712 @@ +/* +Copyright 2026 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "flag" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestHubConnectivityOptions tests the parsing of the hub connectivity options defined in HubConnectivityOptions. +func TestHubConnectivityOptions(t *testing.T) { + testCases := []struct { + name string + flagSetName string + args []string + wantHubConnectOpts HubConnectivityOptions + wantErred bool + wantErrMsgSubStr string + }{ + { + name: "all default", + flagSetName: "allDefault", + args: []string{}, + wantHubConnectOpts: HubConnectivityOptions{ + UseCertificateAuth: false, + UseInsecureTLSClient: false, + }, + }, + { + name: "all specified", + flagSetName: "allSpecified", + args: []string{ + "--use-ca-auth=true", + "--tls-insecure=true", + }, + wantHubConnectOpts: HubConnectivityOptions{ + UseCertificateAuth: true, + UseInsecureTLSClient: true, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + flags := flag.NewFlagSet(tc.flagSetName, flag.ContinueOnError) + hubConnectOpts := HubConnectivityOptions{} + hubConnectOpts.AddFlags(flags) + + err := flags.Parse(tc.args) + if tc.wantErred { + if err == nil { + t.Fatalf("flag Parse() = nil, want erred") + } + + if !strings.Contains(err.Error(), tc.wantErrMsgSubStr) { + t.Fatalf("flag Parse() error = %v, want error msg with sub-string %s", err, tc.wantErrMsgSubStr) + } + return + } + + if err != nil { + t.Fatalf("flag Parse() = %v, want nil", err) + } + + if diff := cmp.Diff(hubConnectOpts, tc.wantHubConnectOpts); diff != "" { + t.Errorf("hub connectivity options diff (-got, +want):\n%s", diff) + } + }) + } +} + +// TestCtrlManagerOptions tests the parsing and validation logic of the controller manager options +// defined in CtrlManagerOptions, including the hub and member per-cluster options and the leader +// election options. +func TestCtrlManagerOptions(t *testing.T) { + testCases := []struct { + name string + flagSetName string + args []string + wantCtrlMgrOpts CtrlManagerOptions + wantErred bool + wantErrMsgSubStr string + }{ + { + name: "all default", + flagSetName: "allDefault", + args: []string{}, + wantCtrlMgrOpts: CtrlManagerOptions{ + HubManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8081", + MetricsBindAddress: ":8080", + PprofPort: 6066, + QPS: 50.0, + Burst: 500, + }, + MemberManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8091", + MetricsBindAddress: ":8090", + PprofPort: 6065, + QPS: 250.0, + Burst: 1000, + }, + EnablePprof: false, + LeaderElectionOpts: LeaderElectionOptions{ + LeaderElect: false, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceNamespace: "kube-system", + }, + }, + }, + { + name: "all specified", + flagSetName: "allSpecified", + args: []string{ + "--hub-health-probe-bind-address=:18081", + "--hub-metrics-bind-address=:18080", + "--hub-pprof-port=16066", + "--hub-api-qps=100", + "--hub-api-burst=1000", + "--health-probe-bind-address=:18091", + "--metrics-bind-address=:18090", + "--pprof-port=16065", + "--member-api-qps=500", + "--member-api-burst=2000", + "--enable-pprof=true", + "--leader-elect=true", + "--leader-lease-duration=30s", + "--leader-renew-deadline=20s", + "--leader-retry-period=5s", + "--leader-election-namespace=test-namespace", + }, + wantCtrlMgrOpts: CtrlManagerOptions{ + HubManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":18081", + MetricsBindAddress: ":18080", + PprofPort: 16066, + QPS: 100.0, + Burst: 1000, + }, + MemberManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":18091", + MetricsBindAddress: ":18090", + PprofPort: 16065, + QPS: 500.0, + Burst: 2000, + }, + EnablePprof: true, + LeaderElectionOpts: LeaderElectionOptions{ + LeaderElect: true, + LeaseDuration: metav1.Duration{Duration: 30 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 20 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 5 * time.Second}, + ResourceNamespace: "test-namespace", + }, + }, + }, + { + name: "negative hub client QPS value", + flagSetName: "hubQPSNegative", + args: []string{"--hub-api-qps=-5"}, + wantCtrlMgrOpts: CtrlManagerOptions{ + HubManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8081", + MetricsBindAddress: ":8080", + PprofPort: 6066, + QPS: -1, + Burst: 500, + }, + MemberManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8091", + MetricsBindAddress: ":8090", + PprofPort: 6065, + QPS: 250.0, + Burst: 1000, + }, + EnablePprof: false, + LeaderElectionOpts: LeaderElectionOptions{ + LeaderElect: false, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceNamespace: "kube-system", + }, + }, + }, + { + name: "hub client QPS parse error", + flagSetName: "hubQPSParseError", + args: []string{"--hub-api-qps=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse float64 value", + }, + { + name: "hub client QPS out of range (too small)", + flagSetName: "hubQPSOutOfRangeTooSmall", + args: []string{"--hub-api-qps=9.9"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("QPS limit is set to an invalid value (%f), must be a value in the range [10.0, 1000.0]", 9.9), + }, + { + name: "hub client QPS out of range (too large)", + flagSetName: "hubQPSOutOfRangeTooLarge", + args: []string{"--hub-api-qps=1000.1"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("QPS limit is set to an invalid value (%f), must be a value in the range [10.0, 1000.0]", 1000.1), + }, + { + name: "hub client burst parse error", + flagSetName: "hubBurstParseError", + args: []string{"--hub-api-burst=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse int value", + }, + { + name: "hub client burst out of range (too small)", + flagSetName: "hubBurstOutOfRangeTooSmall", + args: []string{"--hub-api-burst=9"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("burst limit is set to an invalid value (%d), must be a value in the range [10, 2000]", 9), + }, + { + name: "hub client burst out of range (too large)", + flagSetName: "hubBurstOutOfRangeTooLarge", + args: []string{"--hub-api-burst=2001"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("burst limit is set to an invalid value (%d), must be a value in the range [10, 2000]", 2001), + }, + { + name: "negative member client QPS value", + flagSetName: "memberQPSNegative", + args: []string{"--member-api-qps=-5"}, + wantCtrlMgrOpts: CtrlManagerOptions{ + HubManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8081", + MetricsBindAddress: ":8080", + PprofPort: 6066, + QPS: 50.0, + Burst: 500, + }, + MemberManagerOpts: PerClusterCtrlManagerOptions{ + HealthProbeBindAddress: ":8091", + MetricsBindAddress: ":8090", + PprofPort: 6065, + QPS: -1, + Burst: 1000, + }, + EnablePprof: false, + LeaderElectionOpts: LeaderElectionOptions{ + LeaderElect: false, + LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, + RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, + RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, + ResourceNamespace: "kube-system", + }, + }, + }, + { + name: "member client QPS parse error", + flagSetName: "memberQPSParseError", + args: []string{"--member-api-qps=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse float64 value", + }, + { + name: "member client QPS out of range (too small)", + flagSetName: "memberQPSOutOfRangeTooSmall", + args: []string{"--member-api-qps=9.9"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("QPS limit is set to an invalid value (%f), must be a value in the range [10.0, 10000.0]", 9.9), + }, + { + name: "member client QPS out of range (too large)", + flagSetName: "memberQPSOutOfRangeTooLarge", + args: []string{"--member-api-qps=10000.1"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("QPS limit is set to an invalid value (%f), must be a value in the range [10.0, 10000.0]", 10000.1), + }, + { + name: "member client burst parse error", + flagSetName: "memberBurstParseError", + args: []string{"--member-api-burst=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse int value", + }, + { + name: "member client burst out of range (too small)", + flagSetName: "memberBurstOutOfRangeTooSmall", + args: []string{"--member-api-burst=9"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("burst limit is set to an invalid value (%d), must be a value in the range [10, 20000]", 9), + }, + { + name: "member client burst out of range (too large)", + flagSetName: "memberBurstOutOfRangeTooLarge", + args: []string{"--member-api-burst=20001"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("burst limit is set to an invalid value (%d), must be a value in the range [10, 20000]", 20001), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + flags := flag.NewFlagSet(tc.flagSetName, flag.ContinueOnError) + ctrlMgrOpts := CtrlManagerOptions{} + ctrlMgrOpts.AddFlags(flags) + + err := flags.Parse(tc.args) + if tc.wantErred { + if err == nil { + t.Fatalf("flag Parse() = nil, want erred") + } + + if !strings.Contains(err.Error(), tc.wantErrMsgSubStr) { + t.Fatalf("flag Parse() error = %v, want error msg with sub-string %s", err, tc.wantErrMsgSubStr) + } + return + } + + if err != nil { + t.Fatalf("flag Parse() = %v, want nil", err) + } + + if diff := cmp.Diff(ctrlMgrOpts, tc.wantCtrlMgrOpts); diff != "" { + t.Errorf("controller manager options diff (-got, +want):\n%s", diff) + } + }) + } +} + +// TestApplierOptions tests the parsing and validation logic of the applier options defined in ApplierOptions. +func TestApplierOptions(t *testing.T) { + testCases := []struct { + name string + flagSetName string + args []string + wantApplierOpts ApplierOptions + wantErred bool + wantErrMsgSubStr string + }{ + { + name: "all default", + flagSetName: "allDefault", + args: []string{}, + wantApplierOpts: ApplierOptions{ + ResourceForceDeletionWaitTimeMinutes: 5, + EnablePriorityQueue: false, + RequeueRateLimiterAttemptsWithFixedDelay: 1, + RequeueRateLimiterFixedDelaySeconds: 5, + RequeueRateLimiterExponentialBaseForSlowBackoff: 1.2, + RequeueRateLimiterInitialSlowBackoffDelaySeconds: 2, + RequeueRateLimiterMaxSlowBackoffDelaySeconds: 15, + RequeueRateLimiterExponentialBaseForFastBackoff: 1.5, + RequeueRateLimiterMaxFastBackoffDelaySeconds: 900, + RequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs: true, + PriorityLinearEquationCoEffA: -3, + PriorityLinearEquationCoEffB: 100, + }, + }, + { + name: "all specified", + flagSetName: "allSpecified", + args: []string{ + "--deletion-wait-time=10", + "--enable-work-applier-priority-queue=true", + "--work-applier-requeue-rate-limiter-attempts-with-fixed-delay=5", + "--work-applier-requeue-rate-limiter-fixed-delay-seconds=10", + "--work-applier-requeue-rate-limiter-exponential-base-for-slow-backoff=1.5", + "--work-applier-requeue-rate-limiter-initial-slow-backoff-delay-seconds=5", + "--work-applier-requeue-rate-limiter-max-slow-backoff-delay-seconds=30", + "--work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff=2.0", + "--work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds=1800", + "--work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs=false", + "--work-applier-priority-linear-equation-coeff-a=-10", + "--work-applier-priority-linear-equation-coeff-b=500", + }, + wantApplierOpts: ApplierOptions{ + ResourceForceDeletionWaitTimeMinutes: 10, + EnablePriorityQueue: true, + RequeueRateLimiterAttemptsWithFixedDelay: 5, + RequeueRateLimiterFixedDelaySeconds: 10, + RequeueRateLimiterExponentialBaseForSlowBackoff: 1.5, + RequeueRateLimiterInitialSlowBackoffDelaySeconds: 5, + RequeueRateLimiterMaxSlowBackoffDelaySeconds: 30, + RequeueRateLimiterExponentialBaseForFastBackoff: 2.0, + RequeueRateLimiterMaxFastBackoffDelaySeconds: 1800, + RequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs: false, + PriorityLinearEquationCoEffA: -10, + PriorityLinearEquationCoEffB: 500, + }, + }, + { + name: "deletion wait time parse error", + flagSetName: "deletionWaitTimeParseError", + args: []string{"--deletion-wait-time=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "deletion wait time out of range (too small)", + flagSetName: "deletionWaitTimeOutOfRangeTooSmall", + args: []string{"--deletion-wait-time=0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("resource force deletion wait time in minutes is set to an invalid value (%d), must be a value in the range [1, 60]", 0), + }, + { + name: "deletion wait time out of range (too large)", + flagSetName: "deletionWaitTimeOutOfRangeTooLarge", + args: []string{"--deletion-wait-time=61"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("resource force deletion wait time in minutes is set to an invalid value (%d), must be a value in the range [1, 60]", 61), + }, + { + name: "requeue attempts with fixed delay parse error", + flagSetName: "requeueAttemptsWithFixedDelayParseError", + args: []string{"--work-applier-requeue-rate-limiter-attempts-with-fixed-delay=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "requeue attempts with fixed delay out of range (too small)", + flagSetName: "requeueAttemptsWithFixedDelayOutOfRangeTooSmall", + args: []string{"--work-applier-requeue-rate-limiter-attempts-with-fixed-delay=0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter attempts with fixed delay is set to an invalid value (%d), must be a value in the range [1, 40]", 0), + }, + { + name: "requeue attempts with fixed delay out of range (too large)", + flagSetName: "requeueAttemptsWithFixedDelayOutOfRangeTooLarge", + args: []string{"--work-applier-requeue-rate-limiter-attempts-with-fixed-delay=41"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter attempts with fixed delay is set to an invalid value (%d), must be a value in the range [1, 40]", 41), + }, + { + name: "requeue fixed delay seconds parse error", + flagSetName: "requeueFixedDelaySecondsParseError", + args: []string{"--work-applier-requeue-rate-limiter-fixed-delay-seconds=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "requeue fixed delay seconds out of range (too small)", + flagSetName: "requeueFixedDelaySecondsOutOfRangeTooSmall", + args: []string{"--work-applier-requeue-rate-limiter-fixed-delay-seconds=1"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter fixed delay seconds is set to an invalid value (%d), must be a value in the range [2, 300]", 1), + }, + { + name: "requeue fixed delay seconds out of range (too large)", + flagSetName: "requeueFixedDelaySecondsOutOfRangeTooLarge", + args: []string{"--work-applier-requeue-rate-limiter-fixed-delay-seconds=301"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter fixed delay seconds is set to an invalid value (%d), must be a value in the range [2, 300]", 301), + }, + { + name: "requeue exponential base for slow backoff parse error", + flagSetName: "requeueExpBaseForSlowBackoffParseError", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-slow-backoff=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse float value", + }, + { + name: "requeue exponential base for slow backoff out of range (too small)", + flagSetName: "requeueExpBaseForSlowBackoffOutOfRangeTooSmall", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-slow-backoff=1.04"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter exponential base for slow backoff is set to an invalid value (%g), must be a value in the range [1.05, 2]", 1.04), + }, + { + name: "requeue exponential base for slow backoff out of range (too large)", + flagSetName: "requeueExpBaseForSlowBackoffOutOfRangeTooLarge", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-slow-backoff=2.01"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter exponential base for slow backoff is set to an invalid value (%g), must be a value in the range [1.05, 2]", 2.01), + }, + { + name: "requeue initial slow backoff delay seconds parse error", + flagSetName: "requeueInitSlowBackoffDelaySecondsParseError", + args: []string{"--work-applier-requeue-rate-limiter-initial-slow-backoff-delay-seconds=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "requeue initial slow backoff delay seconds out of range (too small)", + flagSetName: "requeueInitSlowBackoffDelaySecondsOutOfRangeTooSmall", + args: []string{"--work-applier-requeue-rate-limiter-initial-slow-backoff-delay-seconds=1"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter initial slow backoff delay seconds is set to an invalid value (%d), must be a value no less than 2", 1), + }, + { + name: "requeue max slow backoff delay seconds parse error", + flagSetName: "requeueMaxSlowBackoffDelaySecondsParseError", + args: []string{"--work-applier-requeue-rate-limiter-max-slow-backoff-delay-seconds=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "requeue max slow backoff delay seconds out of range (too small)", + flagSetName: "requeueMaxSlowBackoffDelaySecondsOutOfRangeTooSmall", + args: []string{"--work-applier-requeue-rate-limiter-max-slow-backoff-delay-seconds=1"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter max slow backoff delay seconds is set to an invalid value (%d), must be a value no less than 2", 1), + }, + { + name: "requeue exponential base for fast backoff parse error", + flagSetName: "requeueExpBaseForFastBackoffParseError", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse float value", + }, + { + name: "requeue exponential base for fast backoff out of range (at lower boundary)", + flagSetName: "requeueExpBaseForFastBackoffOutOfRangeAtLowerBoundary", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff=1.0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter exponential base for fast backoff is set to an invalid value (%g), must be a value in the range (1, 2]", 1.0), + }, + { + name: "requeue exponential base for fast backoff out of range (too large)", + flagSetName: "requeueExpBaseForFastBackoffOutOfRangeTooLarge", + args: []string{"--work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff=2.01"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter exponential base for fast backoff is set to an invalid value (%g), must be a value in the range (1, 2]", 2.01), + }, + { + name: "requeue max fast backoff delay seconds parse error", + flagSetName: "requeueMaxFastBackoffDelaySecondsParseError", + args: []string{"--work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "requeue max fast backoff delay seconds out of range (at lower boundary)", + flagSetName: "requeueMaxFastBackoffDelaySecondsOutOfRangeAtLowerBoundary", + args: []string{"--work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds=0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter max fast backoff delay seconds is set to an invalid value (%d), must be a value in the range (0, 3600]", 0), + }, + { + name: "requeue max fast backoff delay seconds out of range (too large)", + flagSetName: "requeueMaxFastBackoffDelaySecondsOutOfRangeTooLarge", + args: []string{"--work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds=3601"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("requeue rate limiter max fast backoff delay seconds is set to an invalid value (%d), must be a value in the range (0, 3600]", 3601), + }, + { + name: "priority linear equation coefficient A parse error", + flagSetName: "priCoEffAParseError", + args: []string{"--work-applier-priority-linear-equation-coeff-a=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "priority linear equation coefficient A out of range (non-negative)", + flagSetName: "priCoEffAOutOfRangeNonNegative", + args: []string{"--work-applier-priority-linear-equation-coeff-a=0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("priority linear equation coefficient A is set to an invalid value (%d), must be a negative integer no less than -100", 0), + }, + { + name: "priority linear equation coefficient A out of range (too small)", + flagSetName: "priCoEffAOutOfRangeTooSmall", + args: []string{"--work-applier-priority-linear-equation-coeff-a=-101"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("priority linear equation coefficient A is set to an invalid value (%d), must be a negative integer no less than -100", -101), + }, + { + name: "priority linear equation coefficient B parse error", + flagSetName: "priCoEffBParseError", + args: []string{"--work-applier-priority-linear-equation-coeff-b=abc"}, + wantErred: true, + wantErrMsgSubStr: "failed to parse integer value", + }, + { + name: "priority linear equation coefficient B out of range (too small)", + flagSetName: "priCoEffBOutOfRangeTooSmall", + args: []string{"--work-applier-priority-linear-equation-coeff-b=0"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("priority linear equation coefficient B is set to an invalid value (%d), must be a positive integer no greater than 1000", 0), + }, + { + name: "priority linear equation coefficient B out of range (too large)", + flagSetName: "priCoEffBOutOfRangeTooLarge", + args: []string{"--work-applier-priority-linear-equation-coeff-b=1001"}, + wantErred: true, + wantErrMsgSubStr: fmt.Sprintf("priority linear equation coefficient B is set to an invalid value (%d), must be a positive integer no greater than 1000", 1001), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + flags := flag.NewFlagSet(tc.flagSetName, flag.ContinueOnError) + applierOpts := ApplierOptions{} + applierOpts.AddFlags(flags) + + err := flags.Parse(tc.args) + if tc.wantErred { + if err == nil { + t.Fatalf("flag Parse() = nil, want erred") + } + + if !strings.Contains(err.Error(), tc.wantErrMsgSubStr) { + t.Fatalf("flag Parse() error = %v, want error msg with sub-string %s", err, tc.wantErrMsgSubStr) + } + return + } + + if err != nil { + t.Fatalf("flag Parse() = %v, want nil", err) + } + + if diff := cmp.Diff(applierOpts, tc.wantApplierOpts); diff != "" { + t.Errorf("applier options diff (-got, +want):\n%s", diff) + } + }) + } +} + +// TestPropertyProviderOptions tests the parsing of the property provider options defined in PropertyProviderOptions. +func TestPropertyProviderOptions(t *testing.T) { + testCases := []struct { + name string + flagSetName string + args []string + wantPropertyProvOpts PropertyProviderOptions + wantErred bool + wantErrMsgSubStr string + }{ + { + name: "all default", + flagSetName: "allDefault", + args: []string{}, + wantPropertyProvOpts: PropertyProviderOptions{ + Region: "", + Name: "none", + CloudConfigFilePath: "/etc/kubernetes/provider/config.json", + EnableAzProviderCostProperties: true, + EnableAzProviderAvailableResourceProperties: true, + EnableAzProviderNamespaceCollection: false, + }, + }, + { + name: "all specified", + flagSetName: "allSpecified", + args: []string{ + "--region=eastus", + "--property-provider=azure", + "--cloud-config=/custom/path/config.json", + "--use-cost-properties-in-azure-provider=false", + "--use-available-res-properties-in-azure-provider=false", + "--enable-namespace-collection-in-property-provider=true", + }, + wantPropertyProvOpts: PropertyProviderOptions{ + Region: "eastus", + Name: "azure", + CloudConfigFilePath: "/custom/path/config.json", + EnableAzProviderCostProperties: false, + EnableAzProviderAvailableResourceProperties: false, + EnableAzProviderNamespaceCollection: true, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + flags := flag.NewFlagSet(tc.flagSetName, flag.ContinueOnError) + propertyProvOpts := PropertyProviderOptions{} + propertyProvOpts.AddFlags(flags) + + err := flags.Parse(tc.args) + if tc.wantErred { + if err == nil { + t.Fatalf("flag Parse() = nil, want erred") + } + + if !strings.Contains(err.Error(), tc.wantErrMsgSubStr) { + t.Fatalf("flag Parse() error = %v, want error msg with sub-string %s", err, tc.wantErrMsgSubStr) + } + return + } + + if err != nil { + t.Fatalf("flag Parse() = %v, want nil", err) + } + + if diff := cmp.Diff(propertyProvOpts, tc.wantPropertyProvOpts); diff != "" { + t.Errorf("property provider options diff (-got, +want):\n%s", diff) + } + }) + } +} From 6936bb83a2f7c2081a998cf013e2dd41a0186503 Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 1 Apr 2026 16:53:09 +1100 Subject: [PATCH 2/3] feat: set the PDB webhook to be disabled by default temporarily for compatibility reasons (#556) --- cmd/hubagent/options/options_test.go | 4 ++++ cmd/hubagent/options/webhooks.go | 3 ++- test/e2e/fleet_guard_rail_test.go | 10 ++++++++++ test/e2e/webhook_test.go | 4 ++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/cmd/hubagent/options/options_test.go b/cmd/hubagent/options/options_test.go index 79205d5df..138dfcf19 100644 --- a/cmd/hubagent/options/options_test.go +++ b/cmd/hubagent/options/options_test.go @@ -728,6 +728,7 @@ func TestWebhookOptions(t *testing.T) { GuardRailWhitelistedUsers: "", GuardRailDenyModifyMemberClusterLabels: false, EnableWorkload: false, + EnablePDBs: true, UseCertManager: false, }, }, @@ -752,6 +753,7 @@ func TestWebhookOptions(t *testing.T) { GuardRailWhitelistedUsers: "user1,user2", GuardRailDenyModifyMemberClusterLabels: true, EnableWorkload: true, + EnablePDBs: true, UseCertManager: true, }, }, @@ -767,6 +769,7 @@ func TestWebhookOptions(t *testing.T) { GuardRailWhitelistedUsers: "", GuardRailDenyModifyMemberClusterLabels: false, EnableWorkload: false, + EnablePDBs: true, UseCertManager: false, }, }, @@ -782,6 +785,7 @@ func TestWebhookOptions(t *testing.T) { GuardRailWhitelistedUsers: "", GuardRailDenyModifyMemberClusterLabels: false, EnableWorkload: false, + EnablePDBs: true, UseCertManager: false, }, }, diff --git a/cmd/hubagent/options/webhooks.go b/cmd/hubagent/options/webhooks.go index 954c91a43..5882b5210 100644 --- a/cmd/hubagent/options/webhooks.go +++ b/cmd/hubagent/options/webhooks.go @@ -120,7 +120,8 @@ func (o *WebhookOptions) AddFlags(flags *flag.FlagSet) { flags.BoolVar( &o.EnablePDBs, "enable-pdbs", - false, + // TO-DO (chenyu1): use the true value for compatibility reasons; this will be set to false in a later release. + true, "Enable PodDisruptionBudgets to be created directly in the hub cluster or not. If set to true, the KubeFleet PodDisruptionBudget validating webhook, which blocks the creation of PodDisruptionBudgets outside KubeFleet reserved namespaces, will be disabled. This option only applies if webhooks are enabled.", ) diff --git a/test/e2e/fleet_guard_rail_test.go b/test/e2e/fleet_guard_rail_test.go index 086b2feca..322af5ff1 100644 --- a/test/e2e/fleet_guard_rail_test.go +++ b/test/e2e/fleet_guard_rail_test.go @@ -1168,6 +1168,8 @@ var _ = Describe("fleet guard rail webhook tests for PodDisruptionBudgets", Seri Context("deny PDB operations in fleet-system namespace", func() { It("should deny CREATE operation on PDB in fleet-system namespace for non-whitelisted users", func() { + Skip("PDB webhook is temporarily disabled.") + pdb := policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pdb-fleet-system", @@ -1191,6 +1193,8 @@ var _ = Describe("fleet guard rail webhook tests for PodDisruptionBudgets", Seri ) BeforeAll(func() { + Skip("PDB webhook is temporarily disabled.") + mcName = fmt.Sprintf(mcNameTemplate, GinkgoParallelProcess()) imcNamespace = fmt.Sprintf(utils.NamespaceNameFormat, mcName) createMemberCluster(mcName, testIdentity, nil, map[string]string{fleetClusterResourceIDAnnotationKey: clusterID1}) @@ -1202,6 +1206,8 @@ var _ = Describe("fleet guard rail webhook tests for PodDisruptionBudgets", Seri }) It("should deny CREATE operation on PDB in fleet-member namespace for user not in MC identity", func() { + Skip("PDB webhook is temporarily disabled.") + pdb := policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pdb-member", @@ -1218,6 +1224,8 @@ var _ = Describe("fleet guard rail webhook tests for PodDisruptionBudgets", Seri }) It("should deny UPDATE operation on PDB in fleet-member namespace for user not in MC identity", func() { + Skip("PDB webhook is temporarily disabled.") + // First create a PDB as admin. pdb := policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ @@ -1255,6 +1263,8 @@ var _ = Describe("fleet guard rail webhook tests for PodDisruptionBudgets", Seri Context("deny PDB operations in kube-system namespace", func() { It("should deny CREATE operation on PDB in kube-system namespace for non-whitelisted users", func() { + Skip("PDB webhook is temporarily disabled.") + pdb := policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pdb-kube", diff --git a/test/e2e/webhook_test.go b/test/e2e/webhook_test.go index b1da3ca96..b5d429441 100644 --- a/test/e2e/webhook_test.go +++ b/test/e2e/webhook_test.go @@ -1683,6 +1683,8 @@ var _ = Describe("webhook tests for ResourceOverride UPDATE operations", Ordered var _ = Describe("webhook tests for PodDisruptionBudget CREATE operations", func() { Context("deny PDB creation in non-reserved namespaces", func() { It("should deny CREATE operation on PDB in default namespace", func() { + Skip("PDB webhook is temporarily disabled.") + pdb := policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pdb", @@ -1707,6 +1709,8 @@ var _ = Describe("webhook tests for PodDisruptionBudget CREATE operations", func }) It("should allow CREATE operation on PDB in kube-system namespace for master users", func() { + Skip("PDB webhook is temporarily disabled.") + pdb = policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pdb-kube-system", From 382c2cfe2287b552de96c5f3468db05668e90a9f Mon Sep 17 00:00:00 2001 From: michaelawyu Date: Wed, 1 Apr 2026 16:54:25 +1100 Subject: [PATCH 3/3] feat: check in performance test related code/utilities/scripts (2 of N) (#543) --- hack/perftest/1000placements/README.md | 35 ++++ .../1000placements/example-placement.yaml | 24 +++ hack/perftest/1000placements/main.go | 116 ++++++++++++++ hack/perftest/1000placements/utils/cleanup.go | 130 +++++++++++++++ hack/perftest/1000placements/utils/latency.go | 51 ++++++ .../1000placements/utils/placement.go | 125 +++++++++++++++ hack/perftest/1000placements/utils/poll.go | 101 ++++++++++++ hack/perftest/1000placements/utils/res.go | 150 ++++++++++++++++++ hack/perftest/1000placements/utils/setup.go | 133 ++++++++++++++++ 9 files changed, 865 insertions(+) create mode 100644 hack/perftest/1000placements/README.md create mode 100644 hack/perftest/1000placements/example-placement.yaml create mode 100644 hack/perftest/1000placements/main.go create mode 100644 hack/perftest/1000placements/utils/cleanup.go create mode 100644 hack/perftest/1000placements/utils/latency.go create mode 100644 hack/perftest/1000placements/utils/placement.go create mode 100644 hack/perftest/1000placements/utils/poll.go create mode 100644 hack/perftest/1000placements/utils/res.go create mode 100644 hack/perftest/1000placements/utils/setup.go diff --git a/hack/perftest/1000placements/README.md b/hack/perftest/1000placements/README.md new file mode 100644 index 000000000..ffdfb79d8 --- /dev/null +++ b/hack/perftest/1000placements/README.md @@ -0,0 +1,35 @@ +# KubeFleet Performance/Scalability Test Utility: Creating 1K Placements Concurrently + +This directory contains a utility program that creates 1K placements that run concurrently and polls +until their full completion. + +The program is added for the purpose of testing the performance and scalability of KubeFleet. + +## Before you begin + +* Set up a KubeFleet deployment. +* Make sure that all member clusters are labelled with `placement-group=N`, where N ranges from 0 to 9. +Placements created by this utility program will be each assigned an index X, ranging from 0 to 999, +and a placement of index X will select all member clusters that are labelled with `placement-group=X%10`. +* The program requires the following tools (aside from the Go runtime) to be installed: + * `curl` (for retrieving pprof data) + +> If you have followed the instructions in `../../README.md` and have used the given scripts and utility programs +> to run the performance/scalability test, all the steps above should have been done for you already. + +## Running the utility program + +Run the commands below to run the utility program: + +```bash +RUN_NAME=example go run main.go +``` + +With the default setup, it might take ~1 hour before all 1K placements are fully completed. The program +reports the progress in the output. + +After the program completes, run the commands below to clean up the created 1K placements: + +```bash +CLEANUP=set go run main.go +``` \ No newline at end of file diff --git a/hack/perftest/1000placements/example-placement.yaml b/hack/perftest/1000placements/example-placement.yaml new file mode 100644 index 000000000..c141e76ad --- /dev/null +++ b/hack/perftest/1000placements/example-placement.yaml @@ -0,0 +1,24 @@ +apiVersion: placement.kubernetes-fleet.io/v1beta1 +kind: ClusterResourcePlacement +metadata: + name: crp-0 +spec: + policy: + placementType: PickAll + affinity: + clusterAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + clusterSelectorTerms: + - labelSelector: + matchLabels: + placement-group: "0" + resourceSelectors: + - group: "" + kind: Namespace + version: v1 + name: work-0 + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 100% + unavailablePeriodSeconds: 1 diff --git a/hack/perftest/1000placements/main.go b/hack/perftest/1000placements/main.go new file mode 100644 index 000000000..e02fbdec8 --- /dev/null +++ b/hack/perftest/1000placements/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kubefleet-dev/kubefleet/hack/perftest/1000placements/utils" +) + +const ( + resourceSetupWorkerCount = 15 + longPollingWorkerCount = 15 + resourceCreationCoolDownPeriod = time.Second * 1 + longPollingCoolDownPeriod = time.Second * 2 + betweenStageCoolDownPeriod = time.Second * 30 + + maxCRPToCreateCount = 1000 + + configMapDataByteCount = 1024 // 1 KB. +) + +var ( + // Add trigger points for dumping pprof profiles when a specific # of placement + // have been created. + triggerPtsForMemProfileDumping = map[int]bool{} +) + +var ( + retryOpsBackoff = wait.Backoff{ + Steps: 4, + Duration: 4 * time.Second, + Factor: 2.0, + Jitter: 0.1, + } +) + +func main() { + ctx := context.Background() + + // Read the arguments. + doCleanUp := false + cleanUpFlag := os.Getenv("CLEANUP") + if len(cleanUpFlag) != 0 { + doCleanUp = true + } + + runName := os.Getenv("RUN_NAME") + if len(runName) == 0 { + panic("RUN_NAME environment variable is not set") + } + + retrievePprofData := false + if s := os.Getenv("RETRIEVE_PPROF_DATA"); len(s) != 0 { + retrievePprofData = true + } + var pProfEndpoint string + if retrievePprofData { + if pProfEndpoint = os.Getenv("PPROF_ENDPOINT"); len(pProfEndpoint) == 0 { + panic("PPROF_ENDPOINT environment variable is not set") + } + } + + runner := utils.New( + runName, + resourceSetupWorkerCount, + longPollingWorkerCount, + betweenStageCoolDownPeriod, + resourceCreationCoolDownPeriod, + longPollingCoolDownPeriod, + configMapDataByteCount, + maxCRPToCreateCount, + triggerPtsForMemProfileDumping, + pProfEndpoint, + retryOpsBackoff, + ) + + if doCleanUp { + runner.CleanUp(ctx) + return + } + + fmt.Println("Preparing the resources...") + runner.CreateResources(ctx) + + // Cool down. + fmt.Println("Cooling down...") + runner.CoolDown() + + fmt.Println("Creating the placements...") + runner.CreatePlacements(ctx) + + // Cool down. + fmt.Println("Cooling down...") + runner.CoolDown() + + fmt.Println("Long polling the placements...") + runner.LongPollPlacements(ctx) + + fmt.Println("Tracking latency") + runner.TrackLatency(ctx) + + if retrievePprofData { + fmt.Println("retrieving final pprof data") + runner.RetrievePProfProfile(maxCRPToCreateCount + 1) + } + + // Tally the latency quantiles. + fmt.Println("Tallying latency quantiles...") + runner.TallyLatencyQuantiles() + + fmt.Println("All placements have been completed, exiting.") +} diff --git a/hack/perftest/1000placements/utils/cleanup.go b/hack/perftest/1000placements/utils/cleanup.go new file mode 100644 index 000000000..af27a37bd --- /dev/null +++ b/hack/perftest/1000placements/utils/cleanup.go @@ -0,0 +1,130 @@ +package utils + +import ( + "context" + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + + placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +func (r *Runner) CleanUp(ctx context.Context) { + wg := sync.WaitGroup{} + + // Run the producer. + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < r.maxCRPToCreateCount; i++ { + select { + case r.toDeleteChan <- i: + case <-ctx.Done(): + close(r.toDeleteChan) + return + } + } + + close(r.toDeleteChan) + }() + + // Run the workers. + for i := 0; i < r.resourceSetupWorkerCount; i++ { + wg.Add(1) + go func(workerIdx int) { + defer wg.Done() + + for { + // Read from the channel. + var resIdx int + var readOk bool + select { + case resIdx, readOk = <-r.toDeleteChan: + if !readOk { + fmt.Printf("worker %d exits\n", workerIdx) + return + } + case <-ctx.Done(): + return + } + + // Delete the CRPs. + crp := placementv1beta1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(placementNameFmt, resIdx), + }, + } + errAfterRetries := retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsNotFound(err) + }, func() error { + return r.hubClient.Delete(ctx, &crp) + }) + if errAfterRetries != nil && !errors.IsNotFound(errAfterRetries) { + fmt.Printf("worker %d: failed to delete CRP %s after retries: %v\n", workerIdx, fmt.Sprintf(placementNameFmt, resIdx), errAfterRetries) + continue + } + + // Wait until the CRP is deleted. + errAfterRetries = retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsNotFound(err) + }, func() error { + crp := placementv1beta1.ClusterResourcePlacement{} + err := r.hubClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(placementNameFmt, resIdx)}, &crp) + if err == nil { + return fmt.Errorf("CRP %s still exists", fmt.Sprintf(placementNameFmt, resIdx)) + } + return err + }) + if !errors.IsNotFound(errAfterRetries) { + fmt.Printf("worker %d: failed to wait for CRP %s to be deleted after retries: %v\n", workerIdx, fmt.Sprintf(placementNameFmt, resIdx), errAfterRetries) + } else { + fmt.Printf("worker %d: deleted CRP %s\n", workerIdx, fmt.Sprintf(placementNameFmt, resIdx)) + } + + // Delete the namespace if it exists. + namespace := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(nsNameFmt, resIdx), + }, + } + errAfterRetries = retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsNotFound(err) + }, func() error { + return r.hubClient.Delete(ctx, &namespace) + }) + if errAfterRetries != nil && !errors.IsNotFound(errAfterRetries) { + fmt.Printf("worker %d: failed to delete namespace %s after retries: %v\n", workerIdx, fmt.Sprintf(nsNameFmt, resIdx), errAfterRetries) + continue + } + + // Wait until the namespace is deleted. + errAfterRetries = retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsNotFound(err) + }, func() error { + namespace := corev1.Namespace{} + err := r.hubClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(nsNameFmt, resIdx)}, &namespace) + if err == nil { + return fmt.Errorf("namespace %s still exists", fmt.Sprintf(nsNameFmt, resIdx)) + } + return err + }) + if errAfterRetries == nil || !errors.IsNotFound(errAfterRetries) { + fmt.Printf("worker %d: failed to wait for namespace %s to be deleted after retries: %v\n", workerIdx, fmt.Sprintf(nsNameFmt, resIdx), errAfterRetries) + } else { + fmt.Printf("worker %d: deleted namespace %s\n", workerIdx, fmt.Sprintf(nsNameFmt, resIdx)) + } + } + }(i) + } + wg.Wait() + + // Cool down. + time.Sleep(r.betweenStageCoolDownPeriod) +} diff --git a/hack/perftest/1000placements/utils/latency.go b/hack/perftest/1000placements/utils/latency.go new file mode 100644 index 000000000..ca5ee0723 --- /dev/null +++ b/hack/perftest/1000placements/utils/latency.go @@ -0,0 +1,51 @@ +package utils + +import ( + "context" + "fmt" + "math" + "sort" + "sync" +) + +func (r *Runner) TrackLatency(ctx context.Context) { + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + + for { + var attempt latencyTrackAttempt + var readOK bool + select { + case attempt, readOK = <-r.toTrackLatencyChan: + if !readOK { + return + } + fmt.Printf("latency tracker: CRP %s has latency %v\n", fmt.Sprintf(placementNameFmt, attempt.resIdx), attempt.latency) + r.placementCompletionLatencyByName[fmt.Sprintf(placementNameFmt, attempt.resIdx)] = attempt.latency + case <-ctx.Done(): + return + } + } + }() + wg.Wait() +} + +func (r *Runner) TallyLatencyQuantiles() { + latencies := make([]float64, 0, len(r.placementCompletionLatencyByName)) + for _, latency := range r.placementCompletionLatencyByName { + latencies = append(latencies, float64(latency.Seconds())) + } + sort.Slice(latencies, func(i, j int) bool { + return latencies[i] < latencies[j] + }) + q25 := int(math.Floor(float64(len(latencies)) * 0.25)) + q50 := int(math.Floor(float64(len(latencies)) * 0.50)) + q75 := int(math.Floor(float64(len(latencies)) * 0.75)) + q90 := int(math.Floor(float64(len(latencies)) * 0.90)) + q99 := int(math.Floor(float64(len(latencies)) * 0.99)) + fmt.Printf("latencies: 25th=%v, 50th=%v, 75th=%v, 90th=%v, 99th=%v\n", + latencies[q25], latencies[q50], latencies[q75], latencies[q90], latencies[q99]) +} diff --git a/hack/perftest/1000placements/utils/placement.go b/hack/perftest/1000placements/utils/placement.go new file mode 100644 index 000000000..25a31cadf --- /dev/null +++ b/hack/perftest/1000placements/utils/placement.go @@ -0,0 +1,125 @@ +package utils + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" + + placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +func (r *Runner) CreatePlacements(ctx context.Context) { + wg := sync.WaitGroup{} + + // Run the producer. + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < r.maxCRPToCreateCount; i++ { + select { + case r.toCreatePlacementsChan <- i: + case <-ctx.Done(): + close(r.toCreatePlacementsChan) + return + } + } + + close(r.toCreatePlacementsChan) + }() + + // Run the workers to create the placements. + for i := 0; i < r.resourceSetupWorkerCount; i++ { + wg.Add(1) + + go func(workerIdx int) { + defer wg.Done() + + for { + // Read from the channel. + var resIdx int + var readOk bool + select { + case resIdx, readOk = <-r.toCreatePlacementsChan: + if !readOk { + fmt.Printf("worker %d exits\n", workerIdx) + return + } + case <-ctx.Done(): + return + } + + // Create the CRP. + crp := placementv1beta1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(placementNameFmt, resIdx), + }, + Spec: placementv1beta1.PlacementSpec{ + Policy: &placementv1beta1.PlacementPolicy{ + PlacementType: placementv1beta1.PickAllPlacementType, + Affinity: &placementv1beta1.Affinity{ + ClusterAffinity: &placementv1beta1.ClusterAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &placementv1beta1.ClusterSelector{ + ClusterSelectorTerms: []placementv1beta1.ClusterSelectorTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + placementGroupLabelKey: fmt.Sprintf("%d", resIdx%10), + }, + }, + }, + }, + }, + }, + }, + }, + ResourceSelectors: []placementv1beta1.ResourceSelectorTerm{ + { + Group: "", + Kind: "Namespace", + Version: "v1", + Name: fmt.Sprintf(nsNameFmt, resIdx), + }, + }, + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.RollingUpdateRolloutStrategyType, + RollingUpdate: &placementv1beta1.RollingUpdateConfig{ + MaxUnavailable: ptr.To(intstr.Parse("100%")), + UnavailablePeriodSeconds: ptr.To(1), + }, + }, + }, + } + errAfterRetries := retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsAlreadyExists(err) + }, func() error { + return r.hubClient.Create(ctx, &crp) + }) + if errAfterRetries != nil && !errors.IsAlreadyExists(errAfterRetries) { + fmt.Printf("worker %d: failed to create CRP %s after retries: %v\n", workerIdx, crp.Name, errAfterRetries) + continue + } + + // Set the CRP to the long pollers. + r.toLongPollPlacementsChan <- resIdx + r.longPollingPlacementsCount.Add(1) + + // Dump the memory profile if needed. + if _, ok := r.triggerPtsForMemProfileDumping[resIdx]; ok { + fmt.Printf("worker %d: retrieving pprof data after CRP %d is created\n", workerIdx, resIdx) + r.RetrievePProfProfile(resIdx) + } + // Cool down. + time.Sleep(r.resourceCreationCoolDownPeriod) + } + }(i) + } + wg.Wait() +} diff --git a/hack/perftest/1000placements/utils/poll.go b/hack/perftest/1000placements/utils/poll.go new file mode 100644 index 000000000..e53778414 --- /dev/null +++ b/hack/perftest/1000placements/utils/poll.go @@ -0,0 +1,101 @@ +package utils + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +func (r *Runner) LongPollPlacements(ctx context.Context) { + wg := sync.WaitGroup{} + + // Start waiting for all placements to be available. + wg.Add(1) + go func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + } + + c1 := r.longPollingPlacementsCount.Load() + c2 := r.completedPlacementCount.Load() + if c2 >= c1 { + fmt.Println("all CRPs are now available") + close(r.toLongPollPlacementsChan) + return + } + + fmt.Printf("waiting for %d CRPs to be available, %d are available\n", c1, c2) + time.Sleep(time.Second * 5) + } + }() + + // Run the long pollers. + for i := 0; i < r.longPollingWorkerCount; i++ { + wg.Add(1) + + go func(pollerIdx int) { + defer wg.Done() + + for { + // Read from the channel. + var resIdx int + var readOK bool + select { + case resIdx, readOK = <-r.toLongPollPlacementsChan: + if !readOK { + fmt.Printf("long poller %d exits\n", pollerIdx) + return + } + case <-ctx.Done(): + return + } + + // Read the CRP. + var crp placementv1beta1.ClusterResourcePlacement + if err := r.hubClient.Get(ctx, types.NamespacedName{Name: fmt.Sprintf(placementNameFmt, resIdx)}, &crp); err != nil { + fmt.Printf("poller %d: failed to get CRP %s: %v\n", pollerIdx, fmt.Sprintf(placementNameFmt, resIdx), err) + // Requeue this CRP. + time.Sleep(r.longPollingCoolDownPeriod) + r.toLongPollPlacementsChan <- resIdx + continue + } + + // Check the status of the CRP. + availableCond := meta.FindStatusCondition(crp.Status.Conditions, string(placementv1beta1.ClusterResourcePlacementAvailableConditionType)) + if availableCond == nil || availableCond.Status != metav1.ConditionTrue || availableCond.ObservedGeneration != crp.Generation { + fmt.Printf("poller %d: CRP %s is not available yet\n", pollerIdx, fmt.Sprintf(placementNameFmt, resIdx)) + // Requeue this CRP. + time.Sleep(r.longPollingCoolDownPeriod) + r.toLongPollPlacementsChan <- resIdx + } else { + // The CRP is available. + fmt.Printf("poller %d: CRP %s is available\n", pollerIdx, fmt.Sprintf(placementNameFmt, resIdx)) + createdTimestamp := crp.GetCreationTimestamp() + availablityLatency := availableCond.LastTransitionTime.Time.Sub(createdTimestamp.Time) + r.toTrackLatencyChan <- latencyTrackAttempt{ + latency: availablityLatency, + resIdx: resIdx, + } + + r.completedPlacementCount.Add(1) + } + } + }(i) + } + + wg.Wait() + fmt.Println("All long pollers have completed") + close(r.toTrackLatencyChan) +} diff --git a/hack/perftest/1000placements/utils/res.go b/hack/perftest/1000placements/utils/res.go new file mode 100644 index 000000000..5edb4ceb3 --- /dev/null +++ b/hack/perftest/1000placements/utils/res.go @@ -0,0 +1,150 @@ +package utils + +import ( + "context" + "crypto/rand" + "encoding/base64" + "fmt" + "sync" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" +) + +func (r *Runner) CreateResources(ctx context.Context) { + wg := sync.WaitGroup{} + + // Run the producer. + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < r.maxCRPToCreateCount; i++ { + select { + case r.toCreateResourcesChan <- i: + case <-ctx.Done(): + close(r.toCreateResourcesChan) + return + } + } + + close(r.toCreateResourcesChan) + }() + + // Run the workers to create namespaces and configMaps. + for i := 0; i < r.resourceSetupWorkerCount; i++ { + wg.Add(1) + go func(workerIdx int) { + defer wg.Done() + + for { + // Read from the channel. + var resIdx int + var readOk bool + select { + case resIdx, readOk = <-r.toCreateResourcesChan: + if !readOk { + fmt.Printf("worker %d exits\n", workerIdx) + return + } + case <-ctx.Done(): + return + } + + // Create the namespace. + namespace := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(nsNameFmt, resIdx), + }, + } + errAfterRetries := retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsAlreadyExists(err) + }, func() error { + return r.hubClient.Create(ctx, &namespace) + }) + if errAfterRetries != nil && !errors.IsAlreadyExists(errAfterRetries) { + fmt.Printf("worker %d: failed to create namespace %s after retries: %v\n", workerIdx, fmt.Sprintf(nsNameFmt, resIdx), errAfterRetries) + continue + } + + // Create a configMap in the namespace. + fooValBytes := make([]byte, r.configMapDataByteCount) + _, err := rand.Read(fooValBytes) + if err != nil { + // This should never run; Go documents that rand.Read will never fail unless it is run on legacy + // Linux platforms. + panic(fmt.Sprintf("failed to generate random bytes for configMap data: %v", err)) + } + fooValStr := base64.StdEncoding.EncodeToString(fooValBytes) + configMap := corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(configMapNameFmt, resIdx), + Namespace: fmt.Sprintf(nsNameFmt, resIdx), + }, + Data: map[string]string{ + "foo": fooValStr, + }, + } + errAfterRetries = retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsAlreadyExists(err) + }, func() error { + return r.hubClient.Create(ctx, &configMap) + }) + if errAfterRetries != nil && !errors.IsAlreadyExists(errAfterRetries) { + fmt.Printf("worker %d: failed to create configMap %s in namespace %s after retries: %v\n", workerIdx, fmt.Sprintf(configMapNameFmt, resIdx), fmt.Sprintf(nsNameFmt, resIdx), errAfterRetries) + continue + } + + // Create a deployment in the namespace. + deployment := appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(deployNameFmt, resIdx), + Namespace: fmt.Sprintf(nsNameFmt, resIdx), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: ptr.To(int32(1)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "pause", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "pause", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "pause", + Image: "registry.k8s.io/pause:3.9", + }, + }, + TerminationGracePeriodSeconds: ptr.To(int64(60)), + }, + }, + }, + } + errAfterRetries = retry.OnError(r.retryOpsBackoff, func(err error) bool { + return err != nil && !errors.IsAlreadyExists(err) + }, func() error { + return r.hubClient.Create(ctx, &deployment) + }) + if errAfterRetries != nil && !errors.IsAlreadyExists(errAfterRetries) { + fmt.Printf("worker %d: failed to create deployment %s in namespace %s after retries: %v\n", workerIdx, fmt.Sprintf(deployNameFmt, resIdx), fmt.Sprintf(nsNameFmt, resIdx), errAfterRetries) + continue + } + + fmt.Printf("worker %d: created namespace %s, configMap %s, and deployment %s\n", workerIdx, fmt.Sprintf(nsNameFmt, resIdx), fmt.Sprintf(configMapNameFmt, resIdx), fmt.Sprintf(deployNameFmt, resIdx)) + time.Sleep(r.resourceCreationCoolDownPeriod) + } + }(i) + } + wg.Wait() +} diff --git a/hack/perftest/1000placements/utils/setup.go b/hack/perftest/1000placements/utils/setup.go new file mode 100644 index 000000000..934b98ed2 --- /dev/null +++ b/hack/perftest/1000placements/utils/setup.go @@ -0,0 +1,133 @@ +package utils + +import ( + "fmt" + "os/exec" + "sync/atomic" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes/scheme" + + "k8s.io/apimachinery/pkg/util/wait" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +const ( + nsNameFmt = "work-%d" + configMapNameFmt = "data-%d" + deployNameFmt = "app-%d" + + placementNameFmt = "crp-%d" +) + +const ( + placementGroupLabelKey = "placement-group" +) + +type latencyTrackAttempt struct { + latency time.Duration + resIdx int +} + +type Runner struct { + runName string + hubClient client.Client + + resourceSetupWorkerCount int + longPollingWorkerCount int + betweenStageCoolDownPeriod time.Duration + resourceCreationCoolDownPeriod time.Duration + longPollingCoolDownPeriod time.Duration + + configMapDataByteCount int + + maxCRPToCreateCount int + + triggerPtsForMemProfileDumping map[int]bool + pProfEndpoint string + + retryOpsBackoff wait.Backoff + + toDeleteChan chan int + toCreateResourcesChan chan int + toCreatePlacementsChan chan int + toLongPollPlacementsChan chan int + toTrackLatencyChan chan latencyTrackAttempt + + longPollingPlacementsCount atomic.Int32 + completedPlacementCount atomic.Int32 + + placementCompletionLatencyByName map[string]time.Duration +} + +func New( + runName string, + resourceSetupWorkerCount, longPollingWorkerCount int, + betweenStageCoolDownPeriod, resourceCreationCoolDownPeriod, longPollingCoolDownPeriod time.Duration, + configMapDataByteCount, maxCRPToCreateCount int, + triggerPtsForMemProfileDumping map[int]bool, + pProfEndpoint string, + retryOpsBackoff wait.Backoff, +) *Runner { + // Set up the K8s client for the hub cluster. + hubClusterConfig := ctrl.GetConfigOrDie() + hubClusterConfig.QPS = 200 + hubClusterConfig.Burst = 400 + hubClient, err := client.New(hubClusterConfig, client.Options{ + Scheme: scheme.Scheme, + }) + if err != nil { + panic(fmt.Sprintf("Failed to create hub client: %v", err)) + } + + return &Runner{ + runName: runName, + hubClient: hubClient, + resourceSetupWorkerCount: resourceSetupWorkerCount, + longPollingWorkerCount: longPollingWorkerCount, + betweenStageCoolDownPeriod: betweenStageCoolDownPeriod, + resourceCreationCoolDownPeriod: resourceCreationCoolDownPeriod, + longPollingCoolDownPeriod: longPollingCoolDownPeriod, + configMapDataByteCount: configMapDataByteCount, + maxCRPToCreateCount: maxCRPToCreateCount, + triggerPtsForMemProfileDumping: triggerPtsForMemProfileDumping, + pProfEndpoint: pProfEndpoint, + retryOpsBackoff: retryOpsBackoff, + toDeleteChan: make(chan int, 20), + toCreateResourcesChan: make(chan int, maxCRPToCreateCount), + toCreatePlacementsChan: make(chan int, 20), + toLongPollPlacementsChan: make(chan int, maxCRPToCreateCount), + toTrackLatencyChan: make(chan latencyTrackAttempt, maxCRPToCreateCount+1), + placementCompletionLatencyByName: make(map[string]time.Duration, maxCRPToCreateCount), + } +} + +func init() { + // Set up the scheme. + if err := placementv1beta1.AddToScheme(scheme.Scheme); err != nil { + panic(fmt.Sprintf("Failed to add placement v1beta1 APIs to the scheme: %v", err)) + } + if err := corev1.AddToScheme(scheme.Scheme); err != nil { + panic(fmt.Sprintf("Failed to add core v1 APIs to the scheme: %v", err)) + } + if err := appsv1.AddToScheme(scheme.Scheme); err != nil { + panic(fmt.Sprintf("Failed to add apps v1 APIs to the scheme: %v", err)) + } +} + +func (r *Runner) CoolDown() { + time.Sleep(r.betweenStageCoolDownPeriod) +} + +func (r *Runner) RetrievePProfProfile(retrievalIdx int) { + cmd := exec.Command("curl", "-o", fmt.Sprintf("pprof_%s_%d.out", r.runName, retrievalIdx), r.pProfEndpoint) // #nosec G204 + if err := cmd.Run(); err != nil { + fmt.Printf("Failed to run curl command to retrieve pprof data: %v\n", err) + return + } +}