From 5b0d57e48db0cd5b8db4e9c0a6d8a6553c43c0b8 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 3 Apr 2025 17:53:37 +0800 Subject: [PATCH 1/6] *: concurrency handle a single metric --- cmd/diag/command/collect.go | 1 + collector/collect.go | 50 +++--- collector/prometheus.go | 302 +++++++++++++++++++++++++----------- collector/util.go | 34 ++++ pkg/utils/tokenlimiter.go | 3 +- 5 files changed, 277 insertions(+), 113 deletions(-) create mode 100644 collector/util.go diff --git a/cmd/diag/command/collect.go b/cmd/diag/command/collect.go index a5cbffd6..6bc07300 100644 --- a/cmd/diag/command/collect.go +++ b/cmd/diag/command/collect.go @@ -158,6 +158,7 @@ func newCollectCmd() *cobra.Command { cmd.Flags().StringSliceVar(&cOpt.MetricsFilter, "metricsfilter", nil, "prefix of metrics to collect") cmd.Flags().StringSliceVar(&cOpt.MetricsExclude, "metricsexclude", []string{"node_interrupts_total"}, "prefix of metrics to exclude") cmd.Flags().IntVar(&cOpt.MetricsLimit, "metricslimit", 10000, "metric size limit of single request, specified in series*hour per request") + cmd.Flags().IntVar(&cOpt.MetricMinInterval, "metric-min-interval", 2, "the minimum interval of a single request in minutes") cmd.Flags().StringVar(&metricsConf, "metricsconfig", "", "config file of metricsfilter") cmd.Flags().StringSliceVar(&labels, "metricslabel", nil, "only collect metrics that match labels") cmd.Flags().StringVar(&promEndpoint, "overwrite-prometheus-endpoint", "", "Prometheus endpoint") diff --git a/collector/collect.go b/collector/collect.go index fc834150..f1f56637 100644 --- a/collector/collect.go +++ b/collector/collect.go @@ -108,28 +108,29 @@ type BaseOptions struct { // CollectOptions contains the options defining which type of data to collect type CollectOptions struct { - RawRequest interface{} // raw collect command or request - Mode string // the cluster is deployed with what type of tool - DiagMode string // run diag collect at command line mode or server mode - ProfileName string // the name of a pre-defined collecting profile - Collectors CollectTree // struct to show which collector is enabled - MetricsFilter []string // prefix of metrics to collect - MetricsExclude []string //prefix of metrics to exclude - MetricsLabel map[string]string // label to filte metrics - Dir string // target directory to store collected data - Limit int // rate limit of SCP - MetricsLimit int // query limit of one request - PerfDuration int //seconds: profile time(s), default is 30s. - CompressScp bool // compress of files during collecting - CompressMetrics bool // compress of files during collecting - RawMonitor bool // collect raw data for metrics - ExitOnError bool // break the process and exit when an error occur - ExtendedAttrs map[string]string // extended attributes used for manual collecting mode - ExplainSQLPath string // File path for explain sql - ExplainSqls []string // explain sqls - CurrDB string - Header []string - UsePortForward bool // use portforward when call api inside k8s cluster + RawRequest interface{} // raw collect command or request + Mode string // the cluster is deployed with what type of tool + DiagMode string // run diag collect at command line mode or server mode + ProfileName string // the name of a pre-defined collecting profile + Collectors CollectTree // struct to show which collector is enabled + MetricsFilter []string // prefix of metrics to collect + MetricsExclude []string //prefix of metrics to exclude + MetricsLabel map[string]string // label to filte metrics + Dir string // target directory to store collected data + Limit int // rate limit of SCP + MetricsLimit int // query limit of one request + MetricMinInterval int // query minimum interval of one request, default is 1min. + PerfDuration int //seconds: profile time(s), default is 30s. + CompressScp bool // compress of files during collecting + CompressMetrics bool // compress of files during collecting + RawMonitor bool // collect raw data for metrics + ExitOnError bool // break the process and exit when an error occur + ExtendedAttrs map[string]string // extended attributes used for manual collecting mode + ExplainSQLPath string // File path for explain sql + ExplainSqls []string // explain sqls + CurrDB string + Header []string + UsePortForward bool // use portforward when call api inside k8s cluster } // CollectStat is estimated size stats of data to be collected @@ -302,6 +303,7 @@ func (m *Manager) CollectClusterInfo( filter: cOpt.MetricsFilter, exclude: cOpt.MetricsExclude, limit: cOpt.MetricsLimit, + minInterval: cOpt.MetricMinInterval, compress: cOpt.CompressMetrics, customHeader: cOpt.Header, portForward: cOpt.UsePortForward, @@ -534,7 +536,7 @@ func (m *Manager) CollectClusterInfo( // run collectors collectErrs := make(map[string]error) for _, c := range collectors { - m.logger.Infof("Collecting %s...\n", c.Desc()) + m.logger.Infof("Collecting %s..., time:%v\n", c.Desc(), time.Now()) if err := c.Collect(m, cls); err != nil { if cOpt.ExitOnError { return "", err @@ -563,7 +565,7 @@ func (m *Manager) CollectClusterInfo( if m.logger.GetDisplayMode() == logprinter.DisplayModeDefault { dir = color.CyanString(resultDir) } - m.logger.Infof("Collected data are stored in %s\n", dir) + m.logger.Infof("Collected data are stored in %s, now:%v\n", dir, time.Now()) return resultDir, nil } diff --git a/collector/prometheus.go b/collector/prometheus.go index 08cf20fd..ce07abfd 100644 --- a/collector/prometheus.go +++ b/collector/prometheus.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "maps" + "net" "net/http" "net/url" "os" @@ -49,12 +50,12 @@ import ( ) const ( - subdirMonitor = "monitor" - subdirAlerts = "alerts" - subdirMetrics = "metrics" - subdirRaw = "raw" - maxQueryRange = 120 * 60 // 120min - minQueryRange = 5 * 60 // 5min + subdirMonitor = "monitor" + subdirAlerts = "alerts" + subdirMetrics = "metrics" + subdirRaw = "raw" + maxQueryRange = 120 * 60 // 120min + smallQueryRange = 5 * 60 // 5min ) type collectMonitor struct { @@ -177,6 +178,7 @@ type MetricCollectOptions struct { filter []string exclude []string limit int // series*min per query + minInterval int // the minimum interval of a single request in minutes compress bool customHeader []string endpoint string @@ -281,20 +283,21 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err if c.endpoint == "" { return nil } - mb := progress.NewMultiBar("+ Dumping metrics") + startTime := time.Now() + // mb := progress.NewMultiBar("+ Dumping metrics") bars := make(map[string]*progress.MultiBarItem) total := len(c.metrics) mu := sync.Mutex{} key := c.endpoint - if _, ok := bars[key]; !ok { - bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s", key)) - } + // if _, ok := bars[key]; !ok { + // bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s", key)) + // } - if m.diagMode == DiagModeCmd { - mb.StartRenderLoop() - defer mb.StopRenderLoop() - } + // if m.diagMode == DiagModeCmd { + // mb.StartRenderLoop() + // defer mb.StopRenderLoop() + // } qLimit := c.opt.Concurrency cpuCnt := runtime.NumCPU() @@ -312,33 +315,50 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err return err } - client := &http.Client{Timeout: time.Second * time.Duration(c.opt.APITimeout)} + client := &http.Client{ + Transport: &http.Transport{ + MaxIdleConns: 10, + MaxIdleConnsPerHost: 5, + IdleConnTimeout: 30 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DialContext: (&net.Dialer{ + Timeout: 5 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + }, + Timeout: time.Second * time.Duration(c.opt.APITimeout), + } + tokMap := make(map[uint]struct{}, 10) for _, mtc := range c.metrics { go func(tok *utils.Token, mtc string) { - bars[key].UpdateDisplay(&progress.DisplayProps{ - Prefix: fmt.Sprintf(" - Querying server %s", key), - Suffix: fmt.Sprintf("%d/%d querying %s ...", done, total, mtc), - }) + // bars[key].UpdateDisplay(&progress.DisplayProps{ + // Prefix: fmt.Sprintf(" - Querying server %s", key), + // Suffix: fmt.Sprintf("%d/%d querying %s ...", done, total, mtc), + // }) tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd) tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin) - collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, "") + collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, c.minInterval, "") mu.Lock() done++ if done >= total { - bars[key].UpdateDisplay(&progress.DisplayProps{ - Prefix: fmt.Sprintf(" - Query server %s", key), - Mode: progress.ModeDone, - }) + // bars[key].UpdateDisplay(&progress.DisplayProps{ + // Prefix: fmt.Sprintf(" - Query server %s", key), + // Mode: progress.ModeDone, + // }) } mu.Unlock() + if _, ok := tokMap[tok.ID]; !ok { + tokMap[tok.ID] = struct{}{} + } tl.Put(tok) }(tl.Get(), mtc) } tl.Wait() + m.logger.Infof("Dumping metric end .......................................... take time:%v, tokMap:%v", time.Since(startTime), tokMap) return nil } @@ -409,6 +429,7 @@ func collectMetric( speedlimit int, compress bool, customHeader []string, + minInterval int, instance string, ) { nameSuffix := "" @@ -466,7 +487,7 @@ func collectMetric( newLabel := make(map[string]string) maps.Copy(newLabel, label) newLabel["instance"] = instance - collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, compress, customHeader, instance) + collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, compress, customHeader, minInterval, instance) } } return @@ -485,11 +506,35 @@ func collectMetric( if block > maxQueryRange { block = maxQueryRange } - if block < minQueryRange { - block = minQueryRange + if block < smallQueryRange { + block = smallQueryRange } - l.Debugf("Dumping metric %s-%s-%s%s...", mtc, beginTime.Format(time.RFC3339), endTime.Format(time.RFC3339), nameSuffix) + concurrency := 1 + isBlackList := mtc+nameSuffix == "tidb_session_parse_duration_seconds_bucket" || mtc+nameSuffix == "tidb_session_compile_duration_seconds_bucket" + if isBlackList { + block = minInterval * 60 + concurrency = 5 + } + if series > 1000 || block < maxQueryRange || isBlackList { + l.Infof("Dumping metric %s, speedlimit:%d, series:%d, block:%d min, req timeout:%v, start time:%v ...", + mtc+nameSuffix, speedlimit, series, block/60, c.Timeout, time.Now()) + } + retryOption := tiuputils.RetryOption{ + Attempts: 3, + Delay: time.Microsecond * 300, + Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout + } + num := 1 + qInfo := queryInfo{ + query: query, + promAddr: promAddr, + customHeader: customHeader, + compress: compress, + retryOption: retryOption, + } + errCh := make(chan error, concurrency) + queryInfoCh := make(chan queryInfo, concurrency) for queryEnd := endTime; queryEnd.After(beginTime); queryEnd = queryEnd.Add(time.Duration(-block) * time.Second) { querySec := block queryBegin := queryEnd.Add(time.Duration(-block) * time.Second) @@ -497,71 +542,152 @@ func collectMetric( querySec = int(queryEnd.Sub(beginTime).Seconds()) queryBegin = beginTime } - if err := tiuputils.Retry( - func() error { - req, err := http.NewRequest( - http.MethodGet, - fmt.Sprintf("http://%s/api/v1/query?%s", promAddr, url.Values{ - "query": {fmt.Sprintf("%s[%ds]", query, querySec)}, - "time": {queryEnd.Format(time.RFC3339)}, - }.Encode()), - nil) - if err != nil { - return err - } - utils.AddHeaders(req.Header, customHeader) - resp, err := c.Do(req) - if err != nil { - l.Errorf("failed query metric %s: %s, retry...", mtc+nameSuffix, err) - return err - } - // Prometheus API response format is JSON. Every successful API request returns a 2xx status code. - if resp.StatusCode/100 != 2 { - l.Errorf("failed query metric %s: Status Code %d, retry...", mtc+nameSuffix, resp.StatusCode) - } - defer resp.Body.Close() + startTime0 := time.Now() - dst, err := os.Create( - filepath.Join( - resultDir, subdirMonitor, subdirMetrics, strings.ReplaceAll(promAddr, ":", "-"), - fmt.Sprintf("%s-%s-%s%s.json", mtc, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339), nameSuffix), - ), - ) - if err != nil { - l.Errorf("collect metric %s: %s, retry...", mtc+nameSuffix, err) - } - defer dst.Close() + qInfo.queryRangeInfo = queryRangeInfo{ + queryBegin: queryBegin, + queryEnd: queryEnd, + intervalSec: querySec, + } + wg := sync.WaitGroup{} + if concurrency == 1 { + if err := collectSingleQuery(l, c, resultDir, mtc, nameSuffix, qInfo); err != nil { + l.Errorf("Error quering metrics %s: %s... timeout:%v, take time:%v", + mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) + } + } else { + queryInfoCh <- qInfo + if num >= concurrency { + wg.Add(1) + go func() { + defer wg.Done() + collectQueries(l, c, resultDir, mtc, nameSuffix, queryInfoCh, errCh) + }() + } + } + wg.Wait() + num++ + } + close(queryInfoCh) +} - var enc io.WriteCloser - var n int64 - if compress { - // compress the metric - enc, err = zstd.NewWriter(dst) - if err != nil { - l.Errorf("failed compressing metric %s: %s, retry...\n", mtc+nameSuffix, err) - return err - } - defer enc.Close() - } else { - enc = dst - } - n, err = io.Copy(enc, resp.Body) +type queryInfo struct { + query string + promAddr string + queryRangeInfo + customHeader []string + compress bool + retryOption tiuputils.RetryOption +} + +type queryRangeInfo struct { + queryBegin time.Time + queryEnd time.Time + intervalSec int +} + +func collectQueries(l *logprinter.Logger, c *http.Client, resultDir, mtc, nameSuffix string, + queryInfoCh chan queryInfo, errCh chan error) { + for { + qInfo, ok := <-queryInfoCh + if !ok { + return + } + + err := collectSingleQuery(l, c, resultDir, mtc, nameSuffix, qInfo) + if err != nil { + l.Errorf("Error quering metrics %s: %s... timeout:%v, take time:%v", + mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) + } + // if err != nil && strings.Contains(err.Error(), "connection refused") { + // querySec := 30 + // beginTime := qInfo.queryBegin + // for queryEnd := qInfo.queryEnd; queryEnd.After(beginTime); queryEnd = queryEnd.Add(time.Duration(-querySec) * time.Second) { + // queryBegin := queryEnd.Add(time.Duration(-querySec) * time.Second) + // if queryBegin.Before(beginTime) { + // querySec = int(queryEnd.Sub(beginTime).Seconds()) + // queryBegin = beginTime + // } + + // qInfo.queryRangeInfo = queryRangeInfo{ + // queryBegin: queryBegin, + // queryEnd: qInfo.queryBegin, + // intervalSec: querySec, + // } + // } + // } + // errCh <- err + } +} + +func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, nameSuffix string, qInfo queryInfo) error { + i := 0 + return tiuputils.Retry( + func() error { + startTime := time.Now() + req, err := http.NewRequest( + http.MethodGet, + fmt.Sprintf("http://%s/api/v1/query?%s", qInfo.promAddr, url.Values{ + "query": {fmt.Sprintf("%s[%ds]", qInfo.query, qInfo.intervalSec)}, + "time": {qInfo.queryEnd.Format(time.RFC3339)}, + }.Encode()), + nil) + if err != nil { + return err + } + getTime := time.Since(startTime) + utils.AddHeaders(req.Header, qInfo.customHeader) + resp, err := c.Do(req) + i++ + if err != nil { + l.Errorf("failed query metric no.%d: %s, retry... block:%v min, get time:%v", i, err, qInfo.intervalSec/60, getTime) + // l.Errorf("failed query metric %s: %s, retry... block:%v min, get time:%v", mtc+nameSuffix, err, qInfo.intervalSec/60, getTime) + return err + } + // Prometheus API response format is JSON. Every successful API request returns a 2xx status code. + if resp.StatusCode/100 != 2 { + l.Errorf("failed query metric no.%d: %s: Status Code %d, retry... get time:%v", i, qInfo.intervalSec/60, resp.StatusCode, getTime) + // l.Errorf("failed query metric %s: Status Code %d, retry... get time:%v", mtc+nameSuffix, resp.StatusCode, getTime) + } + defer resp.Body.Close() + + dst, err := os.Create( + filepath.Join( + resultDir, subdirMonitor, subdirMetrics, strings.ReplaceAll(qInfo.promAddr, ":", "-"), + fmt.Sprintf("%s-%s-%s%s.json", mtc, qInfo.queryBegin.Format(time.RFC3339), qInfo.queryEnd.Format(time.RFC3339), nameSuffix), + ), + ) + if err != nil { + l.Errorf("collect metric %s: %s, retry...", mtc+nameSuffix, err) + } + defer dst.Close() + + var enc io.WriteCloser + var n int64 + if qInfo.compress { + // compress the metric + enc, err = zstd.NewWriter(dst) if err != nil { - l.Errorf("failed writing metric %s to file: %s, retry...\n", mtc+nameSuffix, err) + l.Errorf("failed compressing metric %s: %s, retry...\n", mtc+nameSuffix, err) return err } - l.Debugf(" Dumped metric %s from %s to %s (%d bytes)", mtc+nameSuffix, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339), n) - return nil - }, - tiuputils.RetryOption{ - Attempts: 3, - Delay: time.Microsecond * 300, - Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout - }, - ); err != nil { - l.Errorf("Error quering metrics %s: %s", mtc+nameSuffix, err) - } - } + defer enc.Close() + } else { + enc = dst + } + n, err = io.Copy(enc, resp.Body) + if err != nil { + l.Errorf("failed writing metric %s to file: %s, retry...get time:%v \n", mtc+nameSuffix, err, time.Since(startTime)) + return err + } + if time.Since(startTime) > time.Second { + l.Infof(" Dumped metric %s from %s to %s (%d bytes), no.%d take time get:%v", + mtc+nameSuffix, qInfo.queryBegin.Format(time.RFC3339), qInfo.queryEnd.Format(time.RFC3339), n, i, time.Since(startTime)) + } + return nil + }, + qInfo.retryOption, + ) } func ensureMonitorDir(base string, sub ...string) error { diff --git a/collector/util.go b/collector/util.go new file mode 100644 index 00000000..8e951491 --- /dev/null +++ b/collector/util.go @@ -0,0 +1,34 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +// WithRecovery wraps goroutine startup call with force recovery. +// it will dump current goroutine stack into log if catch any recover result. +// +// exec: execute logic function. +// recoverFn: handler will be called after recover and before dump stack, passing `nil` means noop. +func WithRecovery(exec func(), recoverFn func(r any)) { + defer func() { + r := recover() + if recoverFn != nil { + recoverFn(r) + } + if r != nil { + logutil.BgLogger().Error("panic in the recoverable goroutine", + zap.Any("r", r), + zap.Stack("stack trace")) + } + }() + exec() +} diff --git a/pkg/utils/tokenlimiter.go b/pkg/utils/tokenlimiter.go index 03cc7ee4..8e3c7179 100644 --- a/pkg/utils/tokenlimiter.go +++ b/pkg/utils/tokenlimiter.go @@ -19,6 +19,7 @@ import ( // Token is used as a permission to keep on running. type Token struct { + ID uint } // TokenLimiter is used to limit the number of concurrent tasks. @@ -48,7 +49,7 @@ func (tl *TokenLimiter) Wait() { func NewTokenLimiter(count uint) *TokenLimiter { tl := &TokenLimiter{count: count, ch: make(chan *Token, count)} for i := uint(0); i < count; i++ { - tl.ch <- &Token{} + tl.ch <- &Token{ID: i} } return tl From 9ac08d06639dcccf9d1e220a2b30ef9e088e3199 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 17 Apr 2025 10:50:29 +0800 Subject: [PATCH 2/6] *: add two flags and add concurrently handle a single metric --- cmd/diag/command/collect.go | 4 +- collector/collect.go | 50 +++---- collector/prom2influx.go | 2 +- collector/prometheus.go | 253 ++++++++++++++++++++++-------------- collector/util.go | 45 ++++--- k8s/server/collectors.go | 1 - pkg/utils/tokenlimiter.go | 15 ++- 7 files changed, 223 insertions(+), 147 deletions(-) diff --git a/cmd/diag/command/collect.go b/cmd/diag/command/collect.go index 6bc07300..0360fe42 100644 --- a/cmd/diag/command/collect.go +++ b/cmd/diag/command/collect.go @@ -157,8 +157,10 @@ func newCollectCmd() *cobra.Command { cmd.Flags().StringSliceVar(&ext, "exclude", nil, "types of data not to collect") cmd.Flags().StringSliceVar(&cOpt.MetricsFilter, "metricsfilter", nil, "prefix of metrics to collect") cmd.Flags().StringSliceVar(&cOpt.MetricsExclude, "metricsexclude", []string{"node_interrupts_total"}, "prefix of metrics to exclude") + cmd.Flags().StringSliceVar(&cOpt.MetricsLowPriority, "metrics-low-priority", []string{"tidb_tikvclient_request_seconds_bucket"}, + "prefix of metrics to collect with low priority") + cmd.Flags().IntVar(&cOpt.MetricsMinInterval, "metrics-min-interval", 60, "the minimum interval of a single request in seconds") cmd.Flags().IntVar(&cOpt.MetricsLimit, "metricslimit", 10000, "metric size limit of single request, specified in series*hour per request") - cmd.Flags().IntVar(&cOpt.MetricMinInterval, "metric-min-interval", 2, "the minimum interval of a single request in minutes") cmd.Flags().StringVar(&metricsConf, "metricsconfig", "", "config file of metricsfilter") cmd.Flags().StringSliceVar(&labels, "metricslabel", nil, "only collect metrics that match labels") cmd.Flags().StringVar(&promEndpoint, "overwrite-prometheus-endpoint", "", "Prometheus endpoint") diff --git a/collector/collect.go b/collector/collect.go index f1f56637..204c2c9d 100644 --- a/collector/collect.go +++ b/collector/collect.go @@ -108,29 +108,30 @@ type BaseOptions struct { // CollectOptions contains the options defining which type of data to collect type CollectOptions struct { - RawRequest interface{} // raw collect command or request - Mode string // the cluster is deployed with what type of tool - DiagMode string // run diag collect at command line mode or server mode - ProfileName string // the name of a pre-defined collecting profile - Collectors CollectTree // struct to show which collector is enabled - MetricsFilter []string // prefix of metrics to collect - MetricsExclude []string //prefix of metrics to exclude - MetricsLabel map[string]string // label to filte metrics - Dir string // target directory to store collected data - Limit int // rate limit of SCP - MetricsLimit int // query limit of one request - MetricMinInterval int // query minimum interval of one request, default is 1min. - PerfDuration int //seconds: profile time(s), default is 30s. - CompressScp bool // compress of files during collecting - CompressMetrics bool // compress of files during collecting - RawMonitor bool // collect raw data for metrics - ExitOnError bool // break the process and exit when an error occur - ExtendedAttrs map[string]string // extended attributes used for manual collecting mode - ExplainSQLPath string // File path for explain sql - ExplainSqls []string // explain sqls - CurrDB string - Header []string - UsePortForward bool // use portforward when call api inside k8s cluster + RawRequest interface{} // raw collect command or request + Mode string // the cluster is deployed with what type of tool + DiagMode string // run diag collect at command line mode or server mode + ProfileName string // the name of a pre-defined collecting profile + Collectors CollectTree // struct to show which collector is enabled + MetricsFilter []string // prefix of metrics to collect + MetricsExclude []string // prefix of metrics to exclude + MetricsLowPriority []string // prefix of metrics to collect with low priority + MetricsLabel map[string]string // label to filte metrics + Dir string // target directory to store collected data + Limit int // rate limit of SCP + MetricsLimit int // query limit of one request + MetricsMinInterval int // query minimum interval of one request, default is 1min. + PerfDuration int // seconds: profile time(s), default is 30s. + CompressScp bool // compress of files during collecting + CompressMetrics bool // compress of files during collecting + RawMonitor bool // collect raw data for metrics + ExitOnError bool // break the process and exit when an error occur + ExtendedAttrs map[string]string // extended attributes used for manual collecting mode + ExplainSQLPath string // File path for explain sql + ExplainSqls []string // explain sqls + CurrDB string + Header []string + UsePortForward bool // use portforward when call api inside k8s cluster } // CollectStat is estimated size stats of data to be collected @@ -302,8 +303,9 @@ func (m *Manager) CollectClusterInfo( label: cOpt.MetricsLabel, filter: cOpt.MetricsFilter, exclude: cOpt.MetricsExclude, + lowPriority: cOpt.MetricsLowPriority, limit: cOpt.MetricsLimit, - minInterval: cOpt.MetricMinInterval, + minInterval: cOpt.MetricsMinInterval, compress: cOpt.CompressMetrics, customHeader: cOpt.Header, portForward: cOpt.UsePortForward, diff --git a/collector/prom2influx.go b/collector/prom2influx.go index c07cf9ba..f12b82df 100644 --- a/collector/prom2influx.go +++ b/collector/prom2influx.go @@ -269,7 +269,7 @@ func buildPoints( func writeBatchPoints(client influx.Client, data promDump, opts *RebuildOptions) error { // build and write points var errr error - tl := utils.NewTokenLimiter(uint(opts.Concurrency)) + tl := utils.NewTokenLimiter(opts.Concurrency) wg := sync.WaitGroup{} for _, series := range data.Data.Result { ptChan := buildPoints(series, opts) diff --git a/collector/prometheus.go b/collector/prometheus.go index ce07abfd..3f151b36 100644 --- a/collector/prometheus.go +++ b/collector/prometheus.go @@ -55,7 +55,7 @@ const ( subdirMetrics = "metrics" subdirRaw = "raw" maxQueryRange = 120 * 60 // 120min - smallQueryRange = 5 * 60 // 5min + smallQueryRange = 15 // 15s. ) type collectMonitor struct { @@ -177,8 +177,9 @@ type MetricCollectOptions struct { metrics []string // metric list filter []string exclude []string + lowPriority []string limit int // series*min per query - minInterval int // the minimum interval of a single request in minutes + minInterval int // the minimum interval of a single request in seconds compress bool customHeader []string endpoint string @@ -284,29 +285,31 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err return nil } startTime := time.Now() - // mb := progress.NewMultiBar("+ Dumping metrics") + mb := progress.NewMultiBar("+ Dumping metrics") bars := make(map[string]*progress.MultiBarItem) - total := len(c.metrics) - mu := sync.Mutex{} key := c.endpoint - // if _, ok := bars[key]; !ok { - // bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s", key)) - // } + if _, ok := bars[key]; !ok { + bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s", key)) + } - // if m.diagMode == DiagModeCmd { - // mb.StartRenderLoop() - // defer mb.StopRenderLoop() - // } + if m.diagMode == DiagModeCmd { + mb.StartRenderLoop() + defer mb.StopRenderLoop() + } qLimit := c.opt.Concurrency cpuCnt := runtime.NumCPU() if cpuCnt < qLimit { qLimit = cpuCnt } - tl := utils.NewTokenLimiter(uint(qLimit)) + // Prometheus default query.max-concurrency is 20, so here set the max qLimit to 20. + defaultQueryMaxConcurrency := 20 + if qLimit > defaultQueryMaxConcurrency { + qLimit = defaultQueryMaxConcurrency + } + tl := utils.NewTokenLimiter(qLimit) - done := 1 if err := ensureMonitorDir(c.resultDir, subdirMetrics, strings.ReplaceAll(c.endpoint, ":", "-")); err != nil { bars[key].UpdateDisplay(&progress.DisplayProps{ Prefix: fmt.Sprintf(" - Query server %s: %s", key, err), @@ -317,8 +320,8 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err client := &http.Client{ Transport: &http.Transport{ - MaxIdleConns: 10, - MaxIdleConnsPerHost: 5, + MaxIdleConns: defaultQueryMaxConcurrency, + MaxIdleConnsPerHost: 10, IdleConnTimeout: 30 * time.Second, ExpectContinueTimeout: 1 * time.Second, DialContext: (&net.Dialer{ @@ -328,39 +331,89 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err }, Timeout: time.Second * time.Duration(c.opt.APITimeout), } - tokMap := make(map[uint]struct{}, 10) - for _, mtc := range c.metrics { + tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd) + tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin) + if len(c.lowPriority) == 0 { + c.collectMetrics(m.logger, client, c.metrics, tsStart, tsEnd, midPriority, tl, bars) + m.logger.Infof("Dumping metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", + qLimit, c.opt.Concurrency, time.Since(startTime)) + } else { + c.collectMetrics(m.logger, client, c.metrics, tsStart, tsEnd, highPriority, tl, bars) + m.logger.Infof("Dumping high priority metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", + qLimit, c.opt.Concurrency, time.Since(startTime)) + startTime = time.Now() + c.collectMetrics(m.logger, client, c.lowPriority, tsStart, tsEnd, lowPriority, tl, bars) + m.logger.Infof("Dumping low priority metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", + qLimit, c.opt.Concurrency, time.Since(startTime)) + } + + return nil +} + +const ( + lowPriority = -1 + midPriority = 0 + highPriority = 1 +) + +func (c *MetricCollectOptions) collectMetrics( + l *logprinter.Logger, + client *http.Client, + metrics []string, + tsStart, tsEnd time.Time, + priority int, + tl *utils.TokenLimiter, + bars map[string]*progress.MultiBarItem, +) { + done := 1 + key := c.endpoint + mu := sync.Mutex{} + minInterval := c.minInterval + if minInterval < smallQueryRange { + minInterval = smallQueryRange + } + concurrency := 1 + if priority > lowPriority { + concurrency = c.opt.Concurrency + } + total := len(c.metrics) + if priority == highPriority { + total = len(c.metrics) - len(c.lowPriority) + } else if priority == lowPriority { + total = len(c.lowPriority) + } + originInfo := queryRangeInfo{ + queryBegin: tsStart, + queryEnd: tsEnd, + intervalSec: minInterval, + } + for _, mtc := range metrics { + if priority == highPriority && utils.MatchPrefixs(mtc, c.lowPriority) { + continue + } + go func(tok *utils.Token, mtc string) { - // bars[key].UpdateDisplay(&progress.DisplayProps{ - // Prefix: fmt.Sprintf(" - Querying server %s", key), - // Suffix: fmt.Sprintf("%d/%d querying %s ...", done, total, mtc), - // }) + bars[key].UpdateDisplay(&progress.DisplayProps{ + Prefix: fmt.Sprintf(" - Querying server %s", key), + Suffix: fmt.Sprintf("%d/%d querying %s ...", done, total, mtc), + }) - tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd) - tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin) - collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, c.minInterval, "") + collectSingleMetric(l, client, key, originInfo, concurrency, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, "", tok.ID, tl) mu.Lock() done++ if done >= total { - // bars[key].UpdateDisplay(&progress.DisplayProps{ - // Prefix: fmt.Sprintf(" - Query server %s", key), - // Mode: progress.ModeDone, - // }) + bars[key].UpdateDisplay(&progress.DisplayProps{ + Prefix: fmt.Sprintf(" - Query server %s", key), + Mode: progress.ModeDone, + }) } mu.Unlock() - if _, ok := tokMap[tok.ID]; !ok { - tokMap[tok.ID] = struct{}{} - } tl.Put(tok) }(tl.Get(), mtc) } - tl.Wait() - m.logger.Infof("Dumping metric end .......................................... take time:%v, tokMap:%v", time.Since(startTime), tokMap) - - return nil } func getMetricList(c *http.Client, addr string, customHeader []string) ([]string, error) { @@ -418,25 +471,28 @@ func makeURL(addr string, path string, queries map[string]string) string { return link + "?" + vals.Encode() } -func collectMetric( +func collectSingleMetric( l *logprinter.Logger, c *http.Client, promAddr string, - beginTime, endTime time.Time, + originInfo queryRangeInfo, + concurrency int, mtc string, label map[string]string, resultDir string, speedlimit int, compress bool, customHeader []string, - minInterval int, instance string, + curTokenID int, + tl *utils.TokenLimiter, ) { nameSuffix := "" if len(instance) > 0 { nameSuffix = "." + strings.ReplaceAll(instance, ":", "-") } query := generateQueryWitLabel(mtc, label) + beginTime, endTime := originInfo.queryBegin, originInfo.queryEnd queries := map[string]string{ "match[]": query, "start": beginTime.Format(time.RFC3339), @@ -487,7 +543,7 @@ func collectMetric( newLabel := make(map[string]string) maps.Copy(newLabel, label) newLabel["instance"] = instance - collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, compress, customHeader, minInterval, instance) + collectSingleMetric(l, c, promAddr, originInfo, concurrency, mtc, newLabel, resultDir, speedlimit, compress, customHeader, instance, curTokenID, tl) } } return @@ -506,26 +562,21 @@ func collectMetric( if block > maxQueryRange { block = maxQueryRange } - if block < smallQueryRange { - block = smallQueryRange + if block < originInfo.intervalSec { + block = originInfo.intervalSec } - concurrency := 1 isBlackList := mtc+nameSuffix == "tidb_session_parse_duration_seconds_bucket" || mtc+nameSuffix == "tidb_session_compile_duration_seconds_bucket" - if isBlackList { - block = minInterval * 60 - concurrency = 5 - } if series > 1000 || block < maxQueryRange || isBlackList { - l.Infof("Dumping metric %s, speedlimit:%d, series:%d, block:%d min, req timeout:%v, start time:%v ...", - mtc+nameSuffix, speedlimit, series, block/60, c.Timeout, time.Now()) + l.Infof("Dumping metric %s, concurrency:%d, speedlimit:%d, series:%d, interval:%d s, req timeout:%v, start time:%v ...", + mtc+nameSuffix, concurrency, speedlimit, series, block, c.Timeout, time.Now().Format(time.RFC3339)) } retryOption := tiuputils.RetryOption{ Attempts: 3, Delay: time.Microsecond * 300, Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout } - num := 1 + goCnt := 0 qInfo := queryInfo{ query: query, promAddr: promAddr, @@ -533,8 +584,9 @@ func collectMetric( compress: compress, retryOption: retryOption, } - errCh := make(chan error, concurrency) queryInfoCh := make(chan queryInfo, concurrency) + wg := WaitGroupWrapper{} + startTime := time.Now() for queryEnd := endTime; queryEnd.After(beginTime); queryEnd = queryEnd.Add(time.Duration(-block) * time.Second) { querySec := block queryBegin := queryEnd.Add(time.Duration(-block) * time.Second) @@ -549,26 +601,44 @@ func collectMetric( queryEnd: queryEnd, intervalSec: querySec, } - wg := sync.WaitGroup{} if concurrency == 1 { - if err := collectSingleQuery(l, c, resultDir, mtc, nameSuffix, qInfo); err != nil { + if err := collectSingleQuery(l, c, curTokenID, resultDir, mtc, nameSuffix, qInfo); err != nil { l.Errorf("Error quering metrics %s: %s... timeout:%v, take time:%v", mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) } } else { queryInfoCh <- qInfo - if num >= concurrency { - wg.Add(1) - go func() { - defer wg.Done() - collectQueries(l, c, resultDir, mtc, nameSuffix, queryInfoCh, errCh) - }() + if goCnt == 0 { + wg.RunWithRecover(func() { collectQueries(l, c, curTokenID, resultDir, mtc, nameSuffix, queryInfoCh) }, nil) + } else if goCnt < concurrency { + token := tl.Get() + wg.RunWithRecover(func() { + collectQueries(l, c, int(token.ID), resultDir, mtc, nameSuffix, queryInfoCh) + tl.Put(token) + }, + nil) } } - wg.Wait() - num++ + goCnt++ + } + if concurrency == 1 { + return + } + + startTime1 := time.Now() + for { + if len(queryInfoCh) == 0 { + close(queryInfoCh) + break + } + if wg.PanicCnt == concurrency { + break + } + time.Sleep(5 * time.Millisecond) } - close(queryInfoCh) + wg.Wait() + l.Infof("Dumped metric %s from %s to %s, concurrency:%d, wait take time:%v, total take time:%v", + mtc+nameSuffix, endTime.Format(time.RFC3339), beginTime.Format(time.RFC3339), goCnt, time.Since(startTime1), time.Since(startTime)) } type queryInfo struct { @@ -586,41 +656,25 @@ type queryRangeInfo struct { intervalSec int } -func collectQueries(l *logprinter.Logger, c *http.Client, resultDir, mtc, nameSuffix string, - queryInfoCh chan queryInfo, errCh chan error) { +func collectQueries(l *logprinter.Logger, c *http.Client, tokenID int, resultDir, mtc, nameSuffix string, + queryInfoCh chan queryInfo) { for { qInfo, ok := <-queryInfoCh if !ok { + l.Infof("[ID:%d] collect metric %s finished", tokenID, mtc+nameSuffix) return } - err := collectSingleQuery(l, c, resultDir, mtc, nameSuffix, qInfo) + startTime0 := time.Now() + err := collectSingleQuery(l, c, tokenID, resultDir, mtc, nameSuffix, qInfo) if err != nil { - l.Errorf("Error quering metrics %s: %s... timeout:%v, take time:%v", - mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) + l.Errorf("[ID:%d] failed query metric %s: %s... client timeout:%v, take time:%v", + tokenID, mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) } - // if err != nil && strings.Contains(err.Error(), "connection refused") { - // querySec := 30 - // beginTime := qInfo.queryBegin - // for queryEnd := qInfo.queryEnd; queryEnd.After(beginTime); queryEnd = queryEnd.Add(time.Duration(-querySec) * time.Second) { - // queryBegin := queryEnd.Add(time.Duration(-querySec) * time.Second) - // if queryBegin.Before(beginTime) { - // querySec = int(queryEnd.Sub(beginTime).Seconds()) - // queryBegin = beginTime - // } - - // qInfo.queryRangeInfo = queryRangeInfo{ - // queryBegin: queryBegin, - // queryEnd: qInfo.queryBegin, - // intervalSec: querySec, - // } - // } - // } - // errCh <- err - } -} - -func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, nameSuffix string, qInfo queryInfo) error { + } +} + +func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resultDir, mtc, nameSuffix string, qInfo queryInfo) error { i := 0 return tiuputils.Retry( func() error { @@ -640,14 +694,16 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, na resp, err := c.Do(req) i++ if err != nil { - l.Errorf("failed query metric no.%d: %s, retry... block:%v min, get time:%v", i, err, qInfo.intervalSec/60, getTime) - // l.Errorf("failed query metric %s: %s, retry... block:%v min, get time:%v", mtc+nameSuffix, err, qInfo.intervalSec/60, getTime) + l.Errorf("[ID:%d-no.%d] failed query metric retry... interval:%v s, take time:%v. If prometheus OOM is the cause, consider reducing concurrency and metrics-min-interval", + tokenID, i, err, qInfo.intervalSec, getTime) + time.Sleep(200 * time.Millisecond) return err } // Prometheus API response format is JSON. Every successful API request returns a 2xx status code. if resp.StatusCode/100 != 2 { - l.Errorf("failed query metric no.%d: %s: Status Code %d, retry... get time:%v", i, qInfo.intervalSec/60, resp.StatusCode, getTime) - // l.Errorf("failed query metric %s: Status Code %d, retry... get time:%v", mtc+nameSuffix, resp.StatusCode, getTime) + l.Errorf("[ID:%d-no.%d] failed query metric Status Code %d, retry... interval:%d s, take time:%v", + tokenID, i, resp.StatusCode, qInfo.intervalSec, getTime) + time.Sleep(200 * time.Millisecond) } defer resp.Body.Close() @@ -658,7 +714,7 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, na ), ) if err != nil { - l.Errorf("collect metric %s: %s, retry...", mtc+nameSuffix, err) + l.Errorf("[ID:%d-no.%d] collect metric %s: %s, retry...", tokenID, i, mtc+nameSuffix, err) } defer dst.Close() @@ -668,7 +724,7 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, na // compress the metric enc, err = zstd.NewWriter(dst) if err != nil { - l.Errorf("failed compressing metric %s: %s, retry...\n", mtc+nameSuffix, err) + l.Errorf("[ID:%d-no.%d] failed compressing metric %s: %s, retry...\n", tokenID, i, mtc+nameSuffix, err) return err } defer enc.Close() @@ -677,12 +733,13 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, resultDir, mtc, na } n, err = io.Copy(enc, resp.Body) if err != nil { - l.Errorf("failed writing metric %s to file: %s, retry...get time:%v \n", mtc+nameSuffix, err, time.Since(startTime)) + l.Errorf("[ID:%d-no.%d] failed writing metric err %s to file: %s, retry...take time:%v \n", + tokenID, i, mtc+nameSuffix, err, time.Since(startTime)) return err } if time.Since(startTime) > time.Second { - l.Infof(" Dumped metric %s from %s to %s (%d bytes), no.%d take time get:%v", - mtc+nameSuffix, qInfo.queryBegin.Format(time.RFC3339), qInfo.queryEnd.Format(time.RFC3339), n, i, time.Since(startTime)) + l.Infof("[ID:%d-no.%d] Dumped metric %s from %s to %s (%d bytes) take time:%v", + tokenID, i, mtc+nameSuffix, qInfo.queryBegin.Format(time.RFC3339), qInfo.queryEnd.Format(time.RFC3339), n, time.Since(startTime)) } return nil }, diff --git a/collector/util.go b/collector/util.go index 8e951491..3188cf2f 100644 --- a/collector/util.go +++ b/collector/util.go @@ -13,22 +13,33 @@ package collector -// WithRecovery wraps goroutine startup call with force recovery. -// it will dump current goroutine stack into log if catch any recover result. -// -// exec: execute logic function. -// recoverFn: handler will be called after recover and before dump stack, passing `nil` means noop. -func WithRecovery(exec func(), recoverFn func(r any)) { - defer func() { - r := recover() - if recoverFn != nil { - recoverFn(r) - } - if r != nil { - logutil.BgLogger().Error("panic in the recoverable goroutine", - zap.Any("r", r), - zap.Stack("stack trace")) - } +import ( + "sync" +) + +// WaitGroupWrapper is a wrapper for sync.WaitGroup +type WaitGroupWrapper struct { + sync.WaitGroup + PanicCnt int +} + +// RunWithRecover wraps goroutine startup call with force recovery, add 1 to WaitGroup +// and call done when function return. it will dump current goroutine stack into log if catch any recover result. +// exec is that execute logic function. recoverFn is that handler will be called after recover and before dump stack, +// passing `nil` means noop. +func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r any)) { + w.Add(1) + go func() { + defer func() { + r := recover() + if recoverFn != nil { + recoverFn(r) + } + if r != nil { + w.PanicCnt++ + } + w.Done() + }() + exec() }() - exec() } diff --git a/k8s/server/collectors.go b/k8s/server/collectors.go index 48f1c512..34df0fea 100644 --- a/k8s/server/collectors.go +++ b/k8s/server/collectors.go @@ -32,7 +32,6 @@ import ( operator "github.com/pingcap/tiup/pkg/cluster/operation" "github.com/pingcap/tiup/pkg/crypto/rand" logprinter "github.com/pingcap/tiup/pkg/logger/printer" - "k8s.io/klog/v2" ) // collect job status diff --git a/pkg/utils/tokenlimiter.go b/pkg/utils/tokenlimiter.go index 8e3c7179..61bdb623 100644 --- a/pkg/utils/tokenlimiter.go +++ b/pkg/utils/tokenlimiter.go @@ -19,15 +19,20 @@ import ( // Token is used as a permission to keep on running. type Token struct { - ID uint + ID int } // TokenLimiter is used to limit the number of concurrent tasks. type TokenLimiter struct { - count uint + count int ch chan *Token } +// Cap obtains the cap. +func (tl *TokenLimiter) Cap() int { + return tl.count +} + // Put releases the token. func (tl *TokenLimiter) Put(tk *Token) { tl.ch <- tk @@ -40,15 +45,15 @@ func (tl *TokenLimiter) Get() *Token { // Wait all token put back func (tl *TokenLimiter) Wait() { - for len(tl.ch) < int(tl.count) { + for len(tl.ch) < tl.count { runtime.Gosched() } } // NewTokenLimiter creates a TokenLimiter with count tokens. -func NewTokenLimiter(count uint) *TokenLimiter { +func NewTokenLimiter(count int) *TokenLimiter { tl := &TokenLimiter{count: count, ch: make(chan *Token, count)} - for i := uint(0); i < count; i++ { + for i := 0; i < count; i++ { tl.ch <- &Token{ID: i} } From 4e78028de1a89b1af8f5e5d19dba3b473e98086d Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 17 Apr 2025 11:09:05 +0800 Subject: [PATCH 3/6] collector: update errors --- collector/prometheus.go | 8 ++++---- k8s/server/collectors.go | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/collector/prometheus.go b/collector/prometheus.go index 3f151b36..ff6a86d2 100644 --- a/collector/prometheus.go +++ b/collector/prometheus.go @@ -694,15 +694,15 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resul resp, err := c.Do(req) i++ if err != nil { - l.Errorf("[ID:%d-no.%d] failed query metric retry... interval:%v s, take time:%v. If prometheus OOM is the cause, consider reducing concurrency and metrics-min-interval", - tokenID, i, err, qInfo.intervalSec, getTime) + l.Errorf("[ID:%d-no.%d] failed query metric %s: %s retry... interval:%v s, take time:%v. If prometheus OOM is the cause, consider reducing concurrency and metrics-min-interval", + tokenID, i, mtc+nameSuffix, err, qInfo.intervalSec, getTime) time.Sleep(200 * time.Millisecond) return err } // Prometheus API response format is JSON. Every successful API request returns a 2xx status code. if resp.StatusCode/100 != 2 { - l.Errorf("[ID:%d-no.%d] failed query metric Status Code %d, retry... interval:%d s, take time:%v", - tokenID, i, resp.StatusCode, qInfo.intervalSec, getTime) + l.Errorf("[ID:%d-no.%d] failed query metric %s Status Code %d, retry... interval:%d s, take time:%v", + tokenID, i, mtc+nameSuffix, resp.StatusCode, qInfo.intervalSec, getTime) time.Sleep(200 * time.Millisecond) } defer resp.Body.Close() diff --git a/k8s/server/collectors.go b/k8s/server/collectors.go index 34df0fea..48f1c512 100644 --- a/k8s/server/collectors.go +++ b/k8s/server/collectors.go @@ -32,6 +32,7 @@ import ( operator "github.com/pingcap/tiup/pkg/cluster/operation" "github.com/pingcap/tiup/pkg/crypto/rand" logprinter "github.com/pingcap/tiup/pkg/logger/printer" + "k8s.io/klog/v2" ) // collect job status From a4e3a0c3387ad9a2d92d440e5449573c78a08ce9 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 22 Apr 2025 16:12:14 +0800 Subject: [PATCH 4/6] *: tiny update logs --- collector/prometheus.go | 84 +++++++++++++++++++++------------------ pkg/utils/tokenlimiter.go | 10 +++++ 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/collector/prometheus.go b/collector/prometheus.go index ff6a86d2..b2797a43 100644 --- a/collector/prometheus.go +++ b/collector/prometheus.go @@ -55,7 +55,8 @@ const ( subdirMetrics = "metrics" subdirRaw = "raw" maxQueryRange = 120 * 60 // 120min - smallQueryRange = 15 // 15s. + smallQueryRange = 15 // 15s + logQuerySeries = 120000 // The value is equal to the result of 3600*speedLimit/300(s), where the default value of speedLimit is 10000. ) type collectMonitor struct { @@ -331,20 +332,19 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err }, Timeout: time.Second * time.Duration(c.opt.APITimeout), } - tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd) - tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin) + if len(c.lowPriority) == 0 { - c.collectMetrics(m.logger, client, c.metrics, tsStart, tsEnd, midPriority, tl, bars) - m.logger.Infof("Dumping metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", - qLimit, c.opt.Concurrency, time.Since(startTime)) + c.collectMetrics(m.logger, client, c.metrics, midPriority, tl, bars) + m.logger.Infof("Dumping metrics finish .......................................... token limit:%d, take time:%v", + qLimit, time.Since(startTime)) } else { - c.collectMetrics(m.logger, client, c.metrics, tsStart, tsEnd, highPriority, tl, bars) + c.collectMetrics(m.logger, client, c.metrics, highPriority, tl, bars) m.logger.Infof("Dumping high priority metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", qLimit, c.opt.Concurrency, time.Since(startTime)) startTime = time.Now() - c.collectMetrics(m.logger, client, c.lowPriority, tsStart, tsEnd, lowPriority, tl, bars) - m.logger.Infof("Dumping low priority metrics finish .......................................... token limit:%d, concurrency:%d, take time:%v", - qLimit, c.opt.Concurrency, time.Since(startTime)) + c.collectMetrics(m.logger, client, c.lowPriority, lowPriority, tl, bars) + m.logger.Infof("Dumping low priority metrics finish .......................................... token limit:%d, take time:%v", + qLimit, time.Since(startTime)) } return nil @@ -360,7 +360,6 @@ func (c *MetricCollectOptions) collectMetrics( l *logprinter.Logger, client *http.Client, metrics []string, - tsStart, tsEnd time.Time, priority int, tl *utils.TokenLimiter, bars map[string]*progress.MultiBarItem, @@ -373,15 +372,15 @@ func (c *MetricCollectOptions) collectMetrics( minInterval = smallQueryRange } concurrency := 1 - if priority > lowPriority { - concurrency = c.opt.Concurrency - } total := len(c.metrics) if priority == highPriority { total = len(c.metrics) - len(c.lowPriority) } else if priority == lowPriority { total = len(c.lowPriority) + concurrency = tl.Cap() } + tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd) + tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin) originInfo := queryRangeInfo{ queryBegin: tsStart, queryEnd: tsEnd, @@ -480,7 +479,7 @@ func collectSingleMetric( mtc string, label map[string]string, resultDir string, - speedlimit int, + speedLimit int, compress bool, customHeader []string, instance string, @@ -543,7 +542,7 @@ func collectSingleMetric( newLabel := make(map[string]string) maps.Copy(newLabel, label) newLabel["instance"] = instance - collectSingleMetric(l, c, promAddr, originInfo, concurrency, mtc, newLabel, resultDir, speedlimit, compress, customHeader, instance, curTokenID, tl) + collectSingleMetric(l, c, promAddr, originInfo, concurrency, mtc, newLabel, resultDir, speedLimit, compress, customHeader, instance, curTokenID, tl) } } return @@ -555,10 +554,10 @@ func collectSingleMetric( } // split time into smaller ranges to avoid querying too many data in one request - if speedlimit == 0 { - speedlimit = 10000 + if speedLimit == 0 { + speedLimit = 10000 } - block := 3600 * speedlimit / series + block := 3600 * speedLimit / series if block > maxQueryRange { block = maxQueryRange } @@ -566,10 +565,10 @@ func collectSingleMetric( block = originInfo.intervalSec } - isBlackList := mtc+nameSuffix == "tidb_session_parse_duration_seconds_bucket" || mtc+nameSuffix == "tidb_session_compile_duration_seconds_bucket" - if series > 1000 || block < maxQueryRange || isBlackList { - l.Infof("Dumping metric %s, concurrency:%d, speedlimit:%d, series:%d, interval:%d s, req timeout:%v, start time:%v ...", - mtc+nameSuffix, concurrency, speedlimit, series, block, c.Timeout, time.Now().Format(time.RFC3339)) + if block == originInfo.intervalSec || series >= logQuerySeries { + l.Infof("Collecting single metric %s series %d too large or interval %ds too small, so update concurrency from %d to %d, speedLimit:%d, req timeout:%v ...", + mtc+nameSuffix, series, block, concurrency, tl.Cap(), speedLimit, c.Timeout) + concurrency = tl.Cap() } retryOption := tiuputils.RetryOption{ Attempts: 3, @@ -601,6 +600,7 @@ func collectSingleMetric( queryEnd: queryEnd, intervalSec: querySec, } + logInfo := "" if concurrency == 1 { if err := collectSingleQuery(l, c, curTokenID, resultDir, mtc, nameSuffix, qInfo); err != nil { l.Errorf("Error quering metrics %s: %s... timeout:%v, take time:%v", @@ -611,13 +611,19 @@ func collectSingleMetric( if goCnt == 0 { wg.RunWithRecover(func() { collectQueries(l, c, curTokenID, resultDir, mtc, nameSuffix, queryInfoCh) }, nil) } else if goCnt < concurrency { - token := tl.Get() - wg.RunWithRecover(func() { - collectQueries(l, c, int(token.ID), resultDir, mtc, nameSuffix, queryInfoCh) - tl.Put(token) - }, - nil) + token := tl.TryGet() + if token != nil { + logInfo = fmt.Sprintf(" with a new goroutine ID:%v", token.ID) + wg.RunWithRecover(func() { + collectQueries(l, c, token.ID, resultDir, mtc, nameSuffix, queryInfoCh) + tl.Put(token) + }, nil) + } else { + logInfo = " try get failed" + } } + l.Infof("Collecting single metric %s%s, interval:%d s, put task no.%d range[%v:%v] to chan ...", + mtc+nameSuffix, logInfo, qInfo.intervalSec, goCnt, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339)) } goCnt++ } @@ -637,8 +643,8 @@ func collectSingleMetric( time.Sleep(5 * time.Millisecond) } wg.Wait() - l.Infof("Dumped metric %s from %s to %s, concurrency:%d, wait take time:%v, total take time:%v", - mtc+nameSuffix, endTime.Format(time.RFC3339), beginTime.Format(time.RFC3339), goCnt, time.Since(startTime1), time.Since(startTime)) + l.Infof("Collected single metric %s from %s to %s take time:%v, total task:%v, concurrency:%d, wait take time:%v", + mtc+nameSuffix, endTime.Format(time.RFC3339), beginTime.Format(time.RFC3339), time.Since(startTime), goCnt, concurrency, time.Since(startTime1)) } type queryInfo struct { @@ -661,14 +667,14 @@ func collectQueries(l *logprinter.Logger, c *http.Client, tokenID int, resultDir for { qInfo, ok := <-queryInfoCh if !ok { - l.Infof("[ID:%d] collect metric %s finished", tokenID, mtc+nameSuffix) + l.Infof("[ID:%d] collect metric %s goroutine finished", tokenID, mtc+nameSuffix) return } startTime0 := time.Now() err := collectSingleQuery(l, c, tokenID, resultDir, mtc, nameSuffix, qInfo) if err != nil { - l.Errorf("[ID:%d] failed query metric %s: %s... client timeout:%v, take time:%v", + l.Errorf("[ID:%d] failed retry collecting a query metric %s: %s... client timeout:%v, take time:%v", tokenID, mtc+nameSuffix, err, c.Timeout*3+5*time.Second, time.Since(startTime0)) } } @@ -694,14 +700,14 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resul resp, err := c.Do(req) i++ if err != nil { - l.Errorf("[ID:%d-no.%d] failed query metric %s: %s retry... interval:%v s, take time:%v. If prometheus OOM is the cause, consider reducing concurrency and metrics-min-interval", + l.Errorf("[ID:%d-try:%d] failed query metric %s: %s retry... interval:%v s, take time:%v. If prometheus OOM is the cause, consider reducing concurrency and metrics-min-interval", tokenID, i, mtc+nameSuffix, err, qInfo.intervalSec, getTime) time.Sleep(200 * time.Millisecond) return err } // Prometheus API response format is JSON. Every successful API request returns a 2xx status code. if resp.StatusCode/100 != 2 { - l.Errorf("[ID:%d-no.%d] failed query metric %s Status Code %d, retry... interval:%d s, take time:%v", + l.Errorf("[ID:%d-try:%d] failed query metric %s Status Code %d, retry... interval:%d s, take time:%v", tokenID, i, mtc+nameSuffix, resp.StatusCode, qInfo.intervalSec, getTime) time.Sleep(200 * time.Millisecond) } @@ -714,7 +720,7 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resul ), ) if err != nil { - l.Errorf("[ID:%d-no.%d] collect metric %s: %s, retry...", tokenID, i, mtc+nameSuffix, err) + l.Errorf("[ID:%d-try:%d] failed query metric %s: %s, retry...", tokenID, i, mtc+nameSuffix, err) } defer dst.Close() @@ -724,7 +730,7 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resul // compress the metric enc, err = zstd.NewWriter(dst) if err != nil { - l.Errorf("[ID:%d-no.%d] failed compressing metric %s: %s, retry...\n", tokenID, i, mtc+nameSuffix, err) + l.Errorf("[ID:%d-try:%d] failed compressing metric %s: %s, retry...\n", tokenID, i, mtc+nameSuffix, err) return err } defer enc.Close() @@ -733,12 +739,12 @@ func collectSingleQuery(l *logprinter.Logger, c *http.Client, tokenID int, resul } n, err = io.Copy(enc, resp.Body) if err != nil { - l.Errorf("[ID:%d-no.%d] failed writing metric err %s to file: %s, retry...take time:%v \n", + l.Errorf("[ID:%d-try:%d] failed writing metric err %s to file: %s, retry...take time:%v \n", tokenID, i, mtc+nameSuffix, err, time.Since(startTime)) return err } if time.Since(startTime) > time.Second { - l.Infof("[ID:%d-no.%d] Dumped metric %s from %s to %s (%d bytes) take time:%v", + l.Infof("[ID:%d-try:%d] Collected a query metric %s from %s to %s (%d bytes) take a long time:%v", tokenID, i, mtc+nameSuffix, qInfo.queryBegin.Format(time.RFC3339), qInfo.queryEnd.Format(time.RFC3339), n, time.Since(startTime)) } return nil diff --git a/pkg/utils/tokenlimiter.go b/pkg/utils/tokenlimiter.go index 61bdb623..3725ef2b 100644 --- a/pkg/utils/tokenlimiter.go +++ b/pkg/utils/tokenlimiter.go @@ -43,6 +43,16 @@ func (tl *TokenLimiter) Get() *Token { return <-tl.ch } +// TryGet trys to obtain a token. +func (tl *TokenLimiter) TryGet() *Token { + select { + case token := <-tl.ch: + return token + default: + return nil + } +} + // Wait all token put back func (tl *TokenLimiter) Wait() { for len(tl.ch) < tl.count { From e18990bcc0c755afdda9da28cae035ae28680ebc Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 7 May 2025 18:50:45 +0800 Subject: [PATCH 5/6] *: tiny update log --- collector/prometheus.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/collector/prometheus.go b/collector/prometheus.go index b2797a43..d971af13 100644 --- a/collector/prometheus.go +++ b/collector/prometheus.go @@ -566,9 +566,8 @@ func collectSingleMetric( } if block == originInfo.intervalSec || series >= logQuerySeries { - l.Infof("Collecting single metric %s series %d too large or interval %ds too small, so update concurrency from %d to %d, speedLimit:%d, req timeout:%v ...", - mtc+nameSuffix, series, block, concurrency, tl.Cap(), speedLimit, c.Timeout) - concurrency = tl.Cap() + l.Infof("Collecting single metric %s series %d too large and the interval is %ds, concurrency: %d, speedLimit:%d, req timeout:%v ...", + mtc+nameSuffix, series, block, tl.Cap(), speedLimit, c.Timeout) } retryOption := tiuputils.RetryOption{ Attempts: 3, @@ -576,6 +575,7 @@ func collectSingleMetric( Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout } goCnt := 0 + taskCnt := 0 qInfo := queryInfo{ query: query, promAddr: promAddr, @@ -609,7 +609,9 @@ func collectSingleMetric( } else { queryInfoCh <- qInfo if goCnt == 0 { + logInfo = fmt.Sprintf(" with a new goroutine ID:%v", curTokenID) wg.RunWithRecover(func() { collectQueries(l, c, curTokenID, resultDir, mtc, nameSuffix, queryInfoCh) }, nil) + goCnt++ } else if goCnt < concurrency { token := tl.TryGet() if token != nil { @@ -618,14 +620,13 @@ func collectSingleMetric( collectQueries(l, c, token.ID, resultDir, mtc, nameSuffix, queryInfoCh) tl.Put(token) }, nil) - } else { - logInfo = " try get failed" + goCnt++ } } - l.Infof("Collecting single metric %s%s, interval:%d s, put task no.%d range[%v:%v] to chan ...", - mtc+nameSuffix, logInfo, qInfo.intervalSec, goCnt, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339)) + l.Infof("Collecting single metric %s%s, go:%d, interval:%d s, put task no.%d range[%v:%v] to chan ...", + mtc+nameSuffix, logInfo, goCnt, qInfo.intervalSec, taskCnt, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339)) } - goCnt++ + taskCnt++ } if concurrency == 1 { return From ed6f196224aebc06aff4b6a543b96f380cb78c83 Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 13 Oct 2025 10:44:20 +0800 Subject: [PATCH 6/6] *: make vet happy --- collector/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/collect.go b/collector/collect.go index cf2ee371..d13c88f6 100644 --- a/collector/collect.go +++ b/collector/collect.go @@ -574,7 +574,7 @@ func (m *Manager) CollectClusterInfo( } logStr := fmt.Sprintf("The collected data has been stored in %s. For more details, please refer to the log at %s/diag.log.", dir, dir) fmt.Println(logStr) - m.logger.Infof(logStr) + m.logger.Infof("%s", logStr) return resultDir, nil }