From 7eebdb818d393e34b1e1018cf16a2744593b0498 Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 4 May 2026 11:42:19 +0200 Subject: [PATCH 1/4] statistics: collect singleton sketches in row sampler Signed-off-by: 0xPoe --- pkg/statistics/BUILD.bazel | 2 +- pkg/statistics/constants.go | 2 + pkg/statistics/row_sampler.go | 274 +++++++++++++++--- pkg/statistics/sample_test.go | 170 +++++++++++ .../mockstore/unistore/cophandler/analyze.go | 1 + 5 files changed, 414 insertions(+), 35 deletions(-) diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel index c67cdb90b630d..18ce1e8d21371 100644 --- a/pkg/statistics/BUILD.bazel +++ b/pkg/statistics/BUILD.bazel @@ -81,7 +81,7 @@ go_test( data = glob(["testdata/**"]), embed = [":statistics"], flaky = True, - shard_count = 44, + shard_count = 45, deps = [ "//pkg/config", "//pkg/meta/model", diff --git a/pkg/statistics/constants.go b/pkg/statistics/constants.go index 059d6d5a65879..323f03c556f97 100644 --- a/pkg/statistics/constants.go +++ b/pkg/statistics/constants.go @@ -20,4 +20,6 @@ const ( DefaultTopNValue = 100 // DefaultHistogramBuckets is the default number of histogram buckets DefaultHistogramBuckets = 256 + // NDVSampleSkipRate is the sample rate at which sampling is skipped, so NDV sketches are built from every row. + NDVSampleSkipRate = 1 ) diff --git a/pkg/statistics/row_sampler.go b/pkg/statistics/row_sampler.go index 53411cd3d54b7..a585cecedea21 100644 --- a/pkg/statistics/row_sampler.go +++ b/pkg/statistics/row_sampler.go @@ -17,6 +17,7 @@ package statistics import ( "container/heap" "context" + "maps" "math/rand" "unsafe" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/collate" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tipb/go-tipb" @@ -41,12 +43,14 @@ type RowSampleCollector interface { } type baseCollector struct { - Samples WeightedRowSampleHeap - NullCount []int64 - FMSketches []*FMSketch - TotalSizes []int64 - Count int64 - MemSize int64 + Samples WeightedRowSampleHeap + NullCount []int64 + FMSketches []*FMSketch + SingletonSketches []*FMSketch + TotalSizes []int64 + Count int64 + SketchSampleCount int64 + MemSize int64 } // ReservoirRowSampleCollector collects the samples from the source and organize the samples by row. @@ -64,6 +68,73 @@ type ReservoirRowSampleCollector struct { MaxSampleSize int } +// singletonSketchBuilder partitions hash values into singletons (seen exactly +// once) and the rest. Only singletons feed the final FM sketch, which is the +// basis for population NDV estimation under sub-sampling. +type singletonSketchBuilder struct { + once map[uint64]struct{} + multiple map[uint64]struct{} +} + +func newSingletonSketchBuilder() *singletonSketchBuilder { + return &singletonSketchBuilder{ + once: make(map[uint64]struct{}), + multiple: make(map[uint64]struct{}), + } +} + +func (s *singletonSketchBuilder) insertHashValue(hashVal uint64) { + if _, ok := s.multiple[hashVal]; ok { + return + } + if _, ok := s.once[hashVal]; ok { + delete(s.once, hashVal) + s.multiple[hashVal] = struct{}{} + } else { + s.once[hashVal] = struct{}{} + } +} + +func (s *singletonSketchBuilder) insertValue(sc *stmtctx.StatementContext, value types.Datum) error { + hashVal, err := hashDatum(sc, value) + if err != nil { + return err + } + s.insertHashValue(hashVal) + return nil +} + +func (s *singletonSketchBuilder) insertRowValue(sc *stmtctx.StatementContext, values []types.Datum) error { + hashVal, err := hashRow(sc, values) + if err != nil { + return err + } + s.insertHashValue(hashVal) + return nil +} + +func (s *singletonSketchBuilder) clone() *singletonSketchBuilder { + if s == nil { + return nil + } + return &singletonSketchBuilder{ + once: maps.Clone(s.once), + multiple: maps.Clone(s.multiple), + } +} + +func (s *singletonSketchBuilder) build(maxSketchSize int) *FMSketch { + if s == nil { + return nil + } + intest.Assert(maxSketchSize > 0, "maxSketchSize should be greater than 0") + sketch := NewFMSketch(maxSketchSize) + for val := range s.once { + sketch.insertHashValue(val) + } + return sketch +} + // ReservoirRowSampleItem is the item for the ReservoirRowSampleCollector. The weight is needed for the sampling algorithm. type ReservoirRowSampleItem struct { Handle kv.Handle @@ -74,6 +145,14 @@ type ReservoirRowSampleItem struct { // EmptyReservoirSampleItemSize = (24 + 16 + 8) now. const EmptyReservoirSampleItemSize = int64(unsafe.Sizeof(ReservoirRowSampleItem{})) +// ShouldBuildSingletonSketches reports whether the configured NDV sample rate +// requires building per-node singleton sketches (rate < NDVSampleSkipRate means sketches are +// collected from a subset of rows and a global NDV estimate is derived from +// the singletons). +func ShouldBuildSingletonSketches(rate float64) bool { + return rate < NDVSampleSkipRate +} + // MemUsage returns the memory usage of sample item. func (i ReservoirRowSampleItem) MemUsage() (sum int64) { sum = EmptyReservoirSampleItemSize @@ -128,6 +207,7 @@ type RowSampleBuilder struct { ColGroups [][]int64 MaxSampleSize int SampleRate float64 + NDVSampleRate float64 MaxFMSketchSize int } @@ -145,10 +225,11 @@ func NewRowSampleCollector(maxSampleSize int, sampleRate float64, totalLen int) // NewReservoirRowSampleCollector creates the new collector by the given inputs. func NewReservoirRowSampleCollector(maxSampleSize int, totalLen int) *ReservoirRowSampleCollector { base := &baseCollector{ - Samples: make(WeightedRowSampleHeap, 0, maxSampleSize), - NullCount: make([]int64, totalLen), - FMSketches: make([]*FMSketch, 0, totalLen), - TotalSizes: make([]int64, totalLen), + Samples: make(WeightedRowSampleHeap, 0, maxSampleSize), + NullCount: make([]int64, totalLen), + FMSketches: make([]*FMSketch, 0, totalLen), + SingletonSketches: make([]*FMSketch, 0, totalLen), + TotalSizes: make([]int64, totalLen), } return &ReservoirRowSampleCollector{ baseCollector: base, @@ -160,9 +241,20 @@ func NewReservoirRowSampleCollector(maxSampleSize int, totalLen int) *ReservoirR // column group. // Then use the weighted reservoir sampling to collect the samples. func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) { - collector := NewRowSampleCollector(s.MaxSampleSize, s.SampleRate, len(s.ColsFieldType)+len(s.ColGroups)) - for range len(s.ColsFieldType) + len(s.ColGroups) { + totalLen := len(s.ColsFieldType) + len(s.ColGroups) + collector := NewRowSampleCollector(s.MaxSampleSize, s.SampleRate, totalLen) + ndvSampleRate := s.NDVSampleRate + intest.Assert(ndvSampleRate > 0 && ndvSampleRate <= NDVSampleSkipRate, "NDVSampleRate must be in (0, 1]") + buildSingletons := ShouldBuildSingletonSketches(ndvSampleRate) + var singletonBuilders []*singletonSketchBuilder + if buildSingletons { + singletonBuilders = make([]*singletonSketchBuilder, 0, totalLen) + } + for range totalLen { collector.Base().FMSketches = append(collector.Base().FMSketches, NewFMSketch(s.MaxFMSketchSize)) + if buildSingletons { + singletonBuilders = append(singletonBuilders, newSingletonSketchBuilder()) + } } ctx := context.TODO() chk := s.RecordSet.NewChunk(nil) @@ -203,17 +295,25 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) { datums[i].SetBytes(encodedKey) } } - err := collector.Base().collectColumns(s.Sc, datums, sizes) - if err != nil { - return nil, err - } - err = collector.Base().collectColumnGroups(s.Sc, datums, s.ColGroups, sizes) - if err != nil { - return nil, err + collectSketch := !buildSingletons || s.Rng.Float64() < ndvSampleRate + if collectSketch { + if err := collector.Base().collectColumns(s.Sc, datums, sizes, singletonBuilders); err != nil { + return nil, err + } + if err := collector.Base().collectColumnGroups(s.Sc, datums, s.ColGroups, sizes, singletonBuilders); err != nil { + return nil, err + } + if buildSingletons { + collector.Base().SketchSampleCount++ + } } collector.sampleRow(newCols, s.Rng) } } + // NDV sub-sampling only tallies NullCount/TotalSizes for rows that passed + // the rate check. Rescale them back to per-population estimates before the + // single-column-group copy below picks them up. + collector.Base().rescaleNullCountAndTotalSizes() for i, group := range s.ColGroups { if len(group) != 1 { continue @@ -224,17 +324,30 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) { colIdx := group[0] colGroupIdx := len(s.ColsFieldType) + i collector.Base().FMSketches[colGroupIdx] = collector.Base().FMSketches[colIdx].Copy() + if buildSingletons { + singletonBuilders[colGroupIdx] = singletonBuilders[colIdx].clone() + } collector.Base().NullCount[colGroupIdx] = collector.Base().NullCount[colIdx] collector.Base().TotalSizes[colGroupIdx] = collector.Base().TotalSizes[colIdx] } + if buildSingletons { + collector.Base().buildSingletonSketches(singletonBuilders, s.MaxFMSketchSize) + } return collector, nil } func (s *baseCollector) destroyAndPutToPool() { s.FMSketches = nil // Release for GC. + s.SingletonSketches = nil } -func (s *baseCollector) collectColumns(sc *stmtctx.StatementContext, cols []types.Datum, sizes []int64) error { +func (s *baseCollector) collectColumns( + sc *stmtctx.StatementContext, + cols []types.Datum, + sizes []int64, + singletonBuilders []*singletonSketchBuilder, +) error { + collectSingletonSketch := singletonBuilders != nil for i, col := range cols { if col.IsNull() { s.NullCount[i]++ @@ -242,16 +355,27 @@ func (s *baseCollector) collectColumns(sc *stmtctx.StatementContext, cols []type } // Minus one is to remove the flag byte. s.TotalSizes[i] += sizes[i] - 1 - err := s.FMSketches[i].InsertValue(sc, col) - if err != nil { + if err := s.FMSketches[i].InsertValue(sc, col); err != nil { return err } + if collectSingletonSketch { + if err := singletonBuilders[i].insertValue(sc, col); err != nil { + return err + } + } } return nil } -func (s *baseCollector) collectColumnGroups(sc *stmtctx.StatementContext, cols []types.Datum, colGroups [][]int64, sizes []int64) error { +func (s *baseCollector) collectColumnGroups( + sc *stmtctx.StatementContext, + cols []types.Datum, + colGroups [][]int64, + sizes []int64, + singletonBuilders []*singletonSketchBuilder, +) error { colLen := len(cols) + collectSingletonSketch := singletonBuilders != nil datumBuffer := make([]types.Datum, 0, len(cols)) for i, group := range colGroups { if len(group) == 1 { @@ -268,26 +392,98 @@ func (s *baseCollector) collectColumnGroups(sc *stmtctx.StatementContext, cols [ s.TotalSizes[colLen+i] += sizes[c] - 1 } } - err := s.FMSketches[colLen+i].InsertRowValue(sc, datumBuffer) - if err != nil { + if err := s.FMSketches[colLen+i].InsertRowValue(sc, datumBuffer); err != nil { return err } + if collectSingletonSketch { + if err := singletonBuilders[colLen+i].insertRowValue(sc, datumBuffer); err != nil { + return err + } + } } return nil } +// rescaleSampledValue scales `sampled` (accumulated over `sampleCount` rows) +// into an estimate over `totalRowCount` rows using round-half-up division. +// Only used in the mock store path (unistore cophandler), so int64 is wide +// enough for any realistic test fixture. Non-positive inputs return 0. +func rescaleSampledValue(sampled, totalRowCount, sampleCount int64) int64 { + intest.Assert(sampleCount > 0, "sampleCount must be positive") + if sampled <= 0 { + return 0 + } + // Round-half-up integer division: floor(a*b/c + 0.5). + return (sampled*totalRowCount + sampleCount/2) / sampleCount +} + +// rescaleNullCountAndTotalSizes converts per-column null counts and total sizes +// gathered from the NDV sub-sample into estimates over the full row population. +// Count itself is exact (TiKV reports it from scanned_rows_per_range; the +// mock store has no scan-level sampling) and is only read here as the +// scaling divisor — it is never modified. No-op when no sub-sampling occurred +// (sampleCount == 0 or every row was sampled). +func (s *baseCollector) rescaleNullCountAndTotalSizes() { + sampleCount := s.SketchSampleCount + totalRowCount := s.Count + // sampleCount > totalRowCount would scale stats *down* and corrupt them. + intest.Assert(totalRowCount >= sampleCount, "totalRowCount must be bigger than or equal to sampleCount") + // No sub-sampling: values are already exact. + if sampleCount <= 0 { + return + } + // Scaling factor is 1.0; nothing to do. + if totalRowCount == sampleCount { + return + } + for i, nc := range s.NullCount { + s.NullCount[i] = rescaleSampledValue(nc, totalRowCount, sampleCount) + } + for i, ts := range s.TotalSizes { + s.TotalSizes[i] = rescaleSampledValue(ts, totalRowCount, sampleCount) + } +} + +func (s *baseCollector) buildSingletonSketches(singletonBuilders []*singletonSketchBuilder, maxSketchSize int) { + s.SingletonSketches = make([]*FMSketch, len(singletonBuilders)) + for i, builder := range singletonBuilders { + s.SingletonSketches[i] = builder.build(maxSketchSize) + } +} + +func (s *baseCollector) mergeSingletonSketches(singletonSketches []*FMSketch) { + // Initialize on the first merge; later merges use the same sketch layout. + if len(s.SingletonSketches) == 0 { + s.SingletonSketches = make([]*FMSketch, len(singletonSketches)) + for i, singletonSketch := range singletonSketches { + s.SingletonSketches[i] = singletonSketch.Copy() + } + return + } + intest.Assert(len(s.SingletonSketches) == len(singletonSketches), "singleton sketch count should match") + for i, singletonSketch := range singletonSketches { + s.SingletonSketches[i].MergeFMSketch(singletonSketch) + } +} + // ToProto converts the collector to pb struct. func (s *baseCollector) ToProto() *tipb.RowSampleCollector { pbFMSketches := make([]*tipb.FMSketch, 0, len(s.FMSketches)) for _, sketch := range s.FMSketches { pbFMSketches = append(pbFMSketches, FMSketchToProto(sketch)) } + pbSingletonSketches := make([]*tipb.FMSketch, 0, len(s.SingletonSketches)) + for _, sketch := range s.SingletonSketches { + pbSingletonSketches = append(pbSingletonSketches, FMSketchToProto(sketch)) + } collector := &tipb.RowSampleCollector{ - Samples: RowSamplesToProto(s.Samples), - NullCounts: s.NullCount, - Count: s.Count, - FmSketch: pbFMSketches, - TotalSize: s.TotalSizes, + Samples: RowSamplesToProto(s.Samples), + NullCounts: s.NullCount, + Count: s.Count, + FmSketch: pbFMSketches, + TotalSize: s.TotalSizes, + SingletonSketch: pbSingletonSketches, + SketchSampleCount: s.SketchSampleCount, } return collector } @@ -296,9 +492,14 @@ func (s *baseCollector) FromProto(pbCollector *tipb.RowSampleCollector, memTrack s.Count = pbCollector.Count s.NullCount = pbCollector.NullCounts s.FMSketches = make([]*FMSketch, 0, len(pbCollector.FmSketch)) + s.SketchSampleCount = pbCollector.GetSketchSampleCount() for _, pbSketch := range pbCollector.FmSketch { s.FMSketches = append(s.FMSketches, FMSketchFromProto(pbSketch)) } + s.SingletonSketches = make([]*FMSketch, 0, len(pbCollector.GetSingletonSketch())) + for _, pbSketch := range pbCollector.GetSingletonSketch() { + s.SingletonSketches = append(s.SingletonSketches, FMSketchFromProto(pbSketch)) + } s.TotalSizes = pbCollector.TotalSize sampleNum := len(pbCollector.Samples) s.Samples = make(WeightedRowSampleHeap, 0, sampleNum) @@ -370,9 +571,11 @@ func (s *ReservoirRowSampleCollector) sampleRow(row []types.Datum, rng *rand.Ran // MergeCollector merges the collectors to a final one. func (s *ReservoirRowSampleCollector) MergeCollector(subCollector RowSampleCollector) { s.Count += subCollector.Base().Count + s.SketchSampleCount += subCollector.Base().SketchSampleCount for i, fms := range subCollector.Base().FMSketches { s.FMSketches[i].MergeFMSketch(fms) } + s.mergeSingletonSketches(subCollector.Base().SingletonSketches) for i, nullCount := range subCollector.Base().NullCount { s.NullCount[i] += nullCount } @@ -440,10 +643,11 @@ type BernoulliRowSampleCollector struct { // NewBernoulliRowSampleCollector creates the new collector by the given inputs. func NewBernoulliRowSampleCollector(sampleRate float64, totalLen int) *BernoulliRowSampleCollector { base := &baseCollector{ - Samples: make(WeightedRowSampleHeap, 0, 8), - NullCount: make([]int64, totalLen), - FMSketches: make([]*FMSketch, 0, totalLen), - TotalSizes: make([]int64, totalLen), + Samples: make(WeightedRowSampleHeap, 0, 8), + NullCount: make([]int64, totalLen), + FMSketches: make([]*FMSketch, 0, totalLen), + SingletonSketches: make([]*FMSketch, 0, totalLen), + TotalSizes: make([]int64, totalLen), } return &BernoulliRowSampleCollector{ baseCollector: base, @@ -464,9 +668,11 @@ func (s *BernoulliRowSampleCollector) sampleRow(row []types.Datum, rng *rand.Ran // MergeCollector merges the collectors to a final one. func (s *BernoulliRowSampleCollector) MergeCollector(subCollector RowSampleCollector) { s.Count += subCollector.Base().Count + s.SketchSampleCount += subCollector.Base().SketchSampleCount for i := range subCollector.Base().FMSketches { s.FMSketches[i].MergeFMSketch(subCollector.Base().FMSketches[i]) } + s.mergeSingletonSketches(subCollector.Base().SingletonSketches) for i := range subCollector.Base().NullCount { s.NullCount[i] += subCollector.Base().NullCount[i] } diff --git a/pkg/statistics/sample_test.go b/pkg/statistics/sample_test.go index 9a5fca3f9cbdb..9c94f8a4c8779 100644 --- a/pkg/statistics/sample_test.go +++ b/pkg/statistics/sample_test.go @@ -19,10 +19,13 @@ import ( "testing" "time" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/collate" + "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/require" @@ -75,6 +78,7 @@ func TestWeightedSampling(t *testing.T) { Collators: make([]collate.Collator, 1), ColGroups: nil, MaxSampleSize: int(sampleNum), + NDVSampleRate: NDVSampleSkipRate, MaxFMSketchSize: 1000, Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } @@ -117,6 +121,7 @@ func TestDistributedWeightedSampling(t *testing.T) { Collators: make([]collate.Collator, 1), ColGroups: nil, MaxSampleSize: int(sampleNum), + NDVSampleRate: NDVSampleSkipRate, MaxFMSketchSize: 1000, Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } @@ -258,6 +263,28 @@ func TestBuildSampleFullNDV(t *testing.T) { require.Equal(t, 2, len(topN.TopN), "TopN should be trimmed to sampleNDV-1 items when ndv > sampleNDV") } +func TestRescaleSampledValue(t *testing.T) { + cases := []struct { + name string + sampled int64 + population int64 + sample int64 + expected int64 + }{ + {"identity when sampled == sample", 50, 100, 50, 100}, + {"scales up linearly", 10, 100, 50, 20}, + {"rounds half up", 1, 3, 2, 2}, + {"rounds down below half", 1, 4, 3, 1}, + {"zero sampled stays zero", 0, 100, 50, 0}, + {"negative sampled clamps to zero", -5, 100, 50, 0}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, rescaleSampledValue(c.sampled, c.population, c.sample)) + }) + } +} + type testSampleSuite struct { count int rs sqlexec.RecordSet @@ -268,6 +295,149 @@ func TestSampleSerial(t *testing.T) { t.Run("SubTestCollectColumnStats", SubTestCollectColumnStats(s)) t.Run("SubTestMergeSampleCollector", SubTestMergeSampleCollector(s)) t.Run("SubTestCollectorProtoConversion", SubTestCollectorProtoConversion(s)) + t.Run("SubTestRowSampleDefaultNDVRate", SubTestRowSampleDefaultNDVRate()) + t.Run("SubTestRowSampleSingletonSketches", SubTestRowSampleSingletonSketches()) + t.Run("SubTestRowSampleRescalesNullCountUnderSubSampling", SubTestRowSampleRescalesNullCountUnderSubSampling()) + t.Run("SubTestSingletonSketchBuildRespectsMaxSize", SubTestSingletonSketchBuildRespectsMaxSize()) +} + +func SubTestRowSampleDefaultNDVRate() func(*testing.T) { + return func(t *testing.T) { + rs := recordSetForWeightSamplingTest(100) + builder := &RowSampleBuilder{ + Sc: mock.NewContext().GetSessionVars().StmtCtx, + RecordSet: rs, + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + Collators: make([]collate.Collator, 1), + MaxSampleSize: 10, + NDVSampleRate: NDVSampleSkipRate, + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(1)), + } + collector, err := builder.Collect() + require.NoError(t, err) + + require.Equal(t, int64(100), collector.Base().FMSketches[0].NDV()) + require.Zero(t, collector.Base().SketchSampleCount) + require.Empty(t, collector.Base().SingletonSketches) + + pbCollector := collector.Base().ToProto() + require.Empty(t, pbCollector.GetSingletonSketch()) + require.Zero(t, pbCollector.GetSketchSampleCount()) + } +} + +func SubTestRowSampleSingletonSketches() func(*testing.T) { + return func(t *testing.T) { + intType := types.NewFieldType(mysql.TypeLonglong) + rs := &sqlexec.SimpleRecordSet{ + ResultFields: []*resolve.ResultField{ + {Column: &model.ColumnInfo{FieldType: *intType}}, + {Column: &model.ColumnInfo{FieldType: *intType}}, + }, + Rows: [][]any{ + {int64(1), int64(10)}, + {int64(2), int64(10)}, + {int64(2), int64(20)}, + {int64(3), int64(20)}, + }, + MaxChunkSize: 32, + } + + builder := &RowSampleBuilder{ + Sc: mock.NewContext().GetSessionVars().StmtCtx, + RecordSet: rs, + ColsFieldType: []*types.FieldType{intType, intType}, + Collators: make([]collate.Collator, 2), + ColGroups: [][]int64{{0}, {0, 1}}, + MaxSampleSize: 4, + NDVSampleRate: 0.5, + MaxFMSketchSize: 1000, + // Seed 1 deterministically samples rows [2, 3] under rate=0.5, exercising the + // "sketch sampling filtered some rows" path while keeping NDVs deterministic. + Rng: rand.New(rand.NewSource(1)), + } + collector, err := builder.Collect() + require.NoError(t, err) + base := collector.Base() + + // Only rows (2, 20) and (3, 20) pass the rate=0.5 sketch sampling check. + require.Equal(t, int64(2), base.SketchSampleCount) + require.Len(t, base.SingletonSketches, 4) + require.Equal(t, int64(2), base.SingletonSketches[0].NDV()) // col 0: 2 and 3 are singletons. + require.Equal(t, int64(0), base.SingletonSketches[1].NDV()) // col 1: 20 repeats. + // Single-column group [0] is cloned from col 0, so its singletons match. + require.Equal(t, base.SingletonSketches[0].NDV(), base.SingletonSketches[2].NDV()) + // Multi-column group [0,1]: (2,20) and (3,20) are singleton row pairs. + require.Equal(t, int64(2), base.SingletonSketches[3].NDV()) + + pbCollector := base.ToProto() + require.Equal(t, int64(2), pbCollector.GetSketchSampleCount()) + require.Len(t, pbCollector.GetSingletonSketch(), 4) + + restored := NewReservoirRowSampleCollector(4, 4) + restored.FromProto(pbCollector, memory.NewTracker(0, -1)) + require.Equal(t, base.SketchSampleCount, restored.SketchSampleCount) + require.Len(t, restored.SingletonSketches, 4) + require.Equal(t, base.SingletonSketches[0].NDV(), restored.SingletonSketches[0].NDV()) + require.Equal(t, base.SingletonSketches[3].NDV(), restored.SingletonSketches[3].NDV()) + } +} + +func SubTestRowSampleRescalesNullCountUnderSubSampling() func(*testing.T) { + return func(t *testing.T) { + intType := types.NewFieldType(mysql.TypeLonglong) + rs := &sqlexec.SimpleRecordSet{ + ResultFields: []*resolve.ResultField{ + {Column: &model.ColumnInfo{FieldType: *intType}}, + {Column: &model.ColumnInfo{FieldType: *intType}}, + }, + // Seed 1 + rate 0.5 deterministically samples rows [2, 3]. + // Nulls outside the sampled window are intentionally not "seen" so the + // rescaled estimate diverges from the true full-table null count — that + // is the entire point of post-sampling rescaling. + Rows: [][]any{ + {nil, int64(10)}, // row 0 — not sampled. + {int64(2), nil}, // row 1 — not sampled. + {int64(3), int64(20)}, // row 2 — sampled. no nulls. + {nil, int64(40)}, // row 3 — sampled. col 0 null. + }, + MaxChunkSize: 32, + } + + builder := &RowSampleBuilder{ + Sc: mock.NewContext().GetSessionVars().StmtCtx, + RecordSet: rs, + ColsFieldType: []*types.FieldType{intType, intType}, + Collators: make([]collate.Collator, 2), + MaxSampleSize: 4, + NDVSampleRate: 0.5, + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(1)), + } + collector, err := builder.Collect() + require.NoError(t, err) + base := collector.Base() + + // Count is exact by construction (mock store doesn't drop rows), so we + // don't assert on it here — we only check that NullCount got rescaled. + // Rows 2 and 3 are sampled (rate=0.5, seed=1), and within that sample + // col 0 has 1 null and col 1 has 0 nulls. Rescale factor is 4/2 = 2. + require.Equal(t, int64(2), base.SketchSampleCount) + require.Equal(t, int64(2), base.NullCount[0]) + require.Equal(t, int64(0), base.NullCount[1]) + } +} + +func SubTestSingletonSketchBuildRespectsMaxSize() func(*testing.T) { + return func(t *testing.T) { + builder := newSingletonSketchBuilder() + for i := range 100 { + builder.insertHashValue(uint64(i)) + } + + require.LessOrEqual(t, len(builder.build(10).hashset), 10) + } } func createTestSampleSuite() *testSampleSuite { diff --git a/pkg/store/mockstore/unistore/cophandler/analyze.go b/pkg/store/mockstore/unistore/cophandler/analyze.go index f4d283dbbcf7d..5adf5673285bf 100644 --- a/pkg/store/mockstore/unistore/cophandler/analyze.go +++ b/pkg/store/mockstore/unistore/cophandler/analyze.go @@ -457,6 +457,7 @@ func handleAnalyzeFullSamplingReq( MaxSampleSize: int(colReq.SampleSize), MaxFMSketchSize: int(colReq.SketchSize), SampleRate: colReq.GetSampleRate(), + NDVSampleRate: statistics.NDVSampleSkipRate, Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } collector, err := builder.Collect() From 49f24593fc13ecfd6935b9df1534a9a20095b580 Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 25 May 2026 11:09:27 +0200 Subject: [PATCH 2/4] docs: add comments for two sample rates Signed-off-by: 0xPoe --- pkg/statistics/row_sampler.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/pkg/statistics/row_sampler.go b/pkg/statistics/row_sampler.go index a585cecedea21..8f4823e7830e0 100644 --- a/pkg/statistics/row_sampler.go +++ b/pkg/statistics/row_sampler.go @@ -199,14 +199,23 @@ func (h *WeightedRowSampleHeap) Pop() any { // RowSampleBuilder is used to construct the ReservoirRowSampleCollector to get the samples. type RowSampleBuilder struct { - RecordSet sqlexec.RecordSet - Sc *stmtctx.StatementContext - Rng *rand.Rand - ColsFieldType []*types.FieldType - Collators []collate.Collator - ColGroups [][]int64 - MaxSampleSize int - SampleRate float64 + RecordSet sqlexec.RecordSet + Sc *stmtctx.StatementContext + Rng *rand.Rand + ColsFieldType []*types.FieldType + Collators []collate.Collator + ColGroups [][]int64 + MaxSampleSize int + // SampleRate is the per-row keep probability for the row sample (the rows that + // build histograms and TopN). It drives Bernoulli sampling only when + // MaxSampleSize is 0; otherwise reservoir sampling is used and this is ignored. + SampleRate float64 + // NDVSampleRate is the per-row keep probability for the per-column aggregates: + // FMsketches, NULL counts, and TotalSizes. NULL counts and TotalSizes are + // rescaled back to the full population afterwards. It is in (0, 1] and uses a + // separate per-row draw from SampleRate. Below NDVSampleSkipRate (1) the + // sketches see only a sub-sample, so singleton sketches are built to estimate + // population NDV from it. NDVSampleRate float64 MaxFMSketchSize int } From ef312c723b8c1f888061714889eeb0f340920c0c Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 25 May 2026 11:30:56 +0200 Subject: [PATCH 3/4] docs: add comments for Sketeches Signed-off-by: 0xPoe --- pkg/statistics/row_sampler.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/statistics/row_sampler.go b/pkg/statistics/row_sampler.go index 8f4823e7830e0..da27e0e17b26d 100644 --- a/pkg/statistics/row_sampler.go +++ b/pkg/statistics/row_sampler.go @@ -43,12 +43,20 @@ type RowSampleCollector interface { } type baseCollector struct { - Samples WeightedRowSampleHeap - NullCount []int64 - FMSketches []*FMSketch + Samples WeightedRowSampleHeap + NullCount []int64 + // FMSketches holds the per-column FM sketch used to estimate NDV. + FMSketches []*FMSketch + // SingletonSketches holds the per-column sketch of values seen exactly once in + // the sketch sub-sample; it recovers population NDV when NDVSampleRate < 1 and + // is empty otherwise. SingletonSketches []*FMSketch TotalSizes []int64 Count int64 + // SketchSampleCount is the number of rows fed into FMSketches and + // SingletonSketches. + // It rescales NullCount and TotalSizes to the full population; + // 0 means no sub-sampling. SketchSampleCount int64 MemSize int64 } From 60e55140bc5e0c639bff1f0fc7798fa1f0eaf80a Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 25 May 2026 13:16:51 +0200 Subject: [PATCH 4/4] statistics: keep per-region singleton sketches when merging MergeCollector unioned the per-region singleton FM sketches. Union conflates them: a value that is a singleton in two regions occurs twice overall, yet stays in the union and was miscounted as a global singleton, inflating the NDV estimate. Keep each region's NDV and singleton sketches separate in a new RegionSketchSummary so EstimateGlobalSingletonBySketches can discount such values across regions. The merge collects one summary per region instead of unioning, preserving per-region granularity through every merge level. ref #67449 Signed-off-by: 0xPoe --- pkg/statistics/row_sampler.go | 65 +++++++++++++++++++++++++--------- pkg/statistics/sample_test.go | 66 +++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 17 deletions(-) diff --git a/pkg/statistics/row_sampler.go b/pkg/statistics/row_sampler.go index da27e0e17b26d..ee974e4b19afc 100644 --- a/pkg/statistics/row_sampler.go +++ b/pkg/statistics/row_sampler.go @@ -19,6 +19,7 @@ import ( "context" "maps" "math/rand" + "slices" "unsafe" "github.com/pingcap/tidb/pkg/kv" @@ -48,11 +49,19 @@ type baseCollector struct { // FMSketches holds the per-column FM sketch used to estimate NDV. FMSketches []*FMSketch // SingletonSketches holds the per-column sketch of values seen exactly once in - // the sketch sub-sample; it recovers population NDV when NDVSampleRate < 1 and - // is empty otherwise. + // the sketch sub-sample; it recovers population NDV when + // NDVSampleRate < NDVSampleSkipRate and is empty otherwise. A merged collector + // keeps per-region sketches in RegionSketchSummaries instead. SingletonSketches []*FMSketch - TotalSizes []int64 - Count int64 + // RegionSketchSummaries holds one summary per region merged into this collector + // (see RegionSketchSummary); it is nil on a leaf collector or when singleton + // sketches are not collected. + // + // It grows with the region count and is not counted in MemSize, so the consumer + // must bound and account for it. + RegionSketchSummaries []RegionSketchSummary + TotalSizes []int64 + Count int64 // SketchSampleCount is the number of rows fed into FMSketches and // SingletonSketches. // It rescales NullCount and TotalSizes to the full population; @@ -61,6 +70,21 @@ type baseCollector struct { MemSize int64 } +// RegionSketchSummary holds one region's NDV and singleton sketches and the row +// count that fed them. Regions are kept separate and never unioned: unioning the +// singleton sketches would break "seen exactly once" — a value that is a singleton +// in two regions occurs twice overall — and inflate the global singleton count. +// EstimateGlobalSingletonBySketches consumes the regions separately to avoid that. +type RegionSketchSummary struct { + // NDVSketches is the region's per-column distinct-value sketch. + NDVSketches []*FMSketch + // SingletonSketches is the region's per-column sketch of values seen exactly + // once in the region. + SingletonSketches []*FMSketch + // SketchSampleCount is the number of rows that fed these sketches. + SketchSampleCount int64 +} + // ReservoirRowSampleCollector collects the samples from the source and organize the samples by row. // It will maintain the following things: // @@ -154,7 +178,7 @@ type ReservoirRowSampleItem struct { const EmptyReservoirSampleItemSize = int64(unsafe.Sizeof(ReservoirRowSampleItem{})) // ShouldBuildSingletonSketches reports whether the configured NDV sample rate -// requires building per-node singleton sketches (rate < NDVSampleSkipRate means sketches are +// requires building per-region singleton sketches (rate < NDVSampleSkipRate means sketches are // collected from a subset of rows and a global NDV estimate is derived from // the singletons). func ShouldBuildSingletonSketches(rate float64) bool { @@ -356,6 +380,7 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) { func (s *baseCollector) destroyAndPutToPool() { s.FMSketches = nil // Release for GC. s.SingletonSketches = nil + s.RegionSketchSummaries = nil } func (s *baseCollector) collectColumns( @@ -468,19 +493,25 @@ func (s *baseCollector) buildSingletonSketches(singletonBuilders []*singletonSke } } -func (s *baseCollector) mergeSingletonSketches(singletonSketches []*FMSketch) { - // Initialize on the first merge; later merges use the same sketch layout. - if len(s.SingletonSketches) == 0 { - s.SingletonSketches = make([]*FMSketch, len(singletonSketches)) - for i, singletonSketch := range singletonSketches { - s.SingletonSketches[i] = singletonSketch.Copy() - } +// collectRegionSketchSummaries records the sub-collector's per-region sketches on +// this collector instead of unioning them; see RegionSketchSummary for why. +func (s *baseCollector) collectRegionSketchSummaries(sub *baseCollector) { + // An already-merged sub-collector carries its own summaries; concatenate them. + if len(sub.RegionSketchSummaries) > 0 { + s.RegionSketchSummaries = append(s.RegionSketchSummaries, sub.RegionSketchSummaries...) return } - intest.Assert(len(s.SingletonSketches) == len(singletonSketches), "singleton sketch count should match") - for i, singletonSketch := range singletonSketches { - s.SingletonSketches[i].MergeFMSketch(singletonSketch) + // A leaf sub-collector is one region; skip it when it has no singleton sketches. + if len(sub.SingletonSketches) == 0 { + return } + // Reference the region's sketches instead of copying them — they are not mutated + // after the merge. slices.Clone duplicates only the pointer slice, not the maps. + s.RegionSketchSummaries = append(s.RegionSketchSummaries, RegionSketchSummary{ + NDVSketches: slices.Clone(sub.FMSketches), + SingletonSketches: slices.Clone(sub.SingletonSketches), + SketchSampleCount: sub.SketchSampleCount, + }) } // ToProto converts the collector to pb struct. @@ -592,7 +623,7 @@ func (s *ReservoirRowSampleCollector) MergeCollector(subCollector RowSampleColle for i, fms := range subCollector.Base().FMSketches { s.FMSketches[i].MergeFMSketch(fms) } - s.mergeSingletonSketches(subCollector.Base().SingletonSketches) + s.collectRegionSketchSummaries(subCollector.Base()) for i, nullCount := range subCollector.Base().NullCount { s.NullCount[i] += nullCount } @@ -689,7 +720,7 @@ func (s *BernoulliRowSampleCollector) MergeCollector(subCollector RowSampleColle for i := range subCollector.Base().FMSketches { s.FMSketches[i].MergeFMSketch(subCollector.Base().FMSketches[i]) } - s.mergeSingletonSketches(subCollector.Base().SingletonSketches) + s.collectRegionSketchSummaries(subCollector.Base()) for i := range subCollector.Base().NullCount { s.NullCount[i] += subCollector.Base().NullCount[i] } diff --git a/pkg/statistics/sample_test.go b/pkg/statistics/sample_test.go index 9c94f8a4c8779..7621ed89f7318 100644 --- a/pkg/statistics/sample_test.go +++ b/pkg/statistics/sample_test.go @@ -299,6 +299,7 @@ func TestSampleSerial(t *testing.T) { t.Run("SubTestRowSampleSingletonSketches", SubTestRowSampleSingletonSketches()) t.Run("SubTestRowSampleRescalesNullCountUnderSubSampling", SubTestRowSampleRescalesNullCountUnderSubSampling()) t.Run("SubTestSingletonSketchBuildRespectsMaxSize", SubTestSingletonSketchBuildRespectsMaxSize()) + t.Run("SubTestMergePreservesPerRegionSingletonSketches", SubTestMergePreservesPerRegionSingletonSketches()) } func SubTestRowSampleDefaultNDVRate() func(*testing.T) { @@ -552,3 +553,68 @@ func SubTestCollectorProtoConversion(s *testSampleSuite) func(*testing.T) { } } } + +// SubTestMergePreservesPerRegionSingletonSketches checks that merging keeps each +// region's sketches separate (in RegionSketchSummaries) rather than unioning the +// singleton sketches, which would miscount a value that is a singleton in two +// regions as a global singleton. +func SubTestMergePreservesPerRegionSingletonSketches() func(*testing.T) { + return func(t *testing.T) { + // Distinct hash values stand in for distinct data values; with maxSize=1000 + // the FM sketch mask stays 0 so NDV is exact. + const ( + a = uint64(100) + b = uint64(200) + c = uint64(300) + ) + // makeRegion builds a single-column leaf collector that mimics one analyze + // response: its own NDV sketch and singleton sketch over that region's + // sub-sample. + makeRegion := func(ndv, singleton []uint64, sketchSampleCount int64) *ReservoirRowSampleCollector { + coll := NewReservoirRowSampleCollector(1, 1) + coll.Base().FMSketches = []*FMSketch{newFMSketchFromHashValues(ndv...)} + coll.Base().SingletonSketches = []*FMSketch{newFMSketchFromHashValues(singleton...)} + coll.Base().SketchSampleCount = sketchSampleCount + return coll + } + + // `a` is a local singleton in both regions, so it occurs twice globally and + // is not a global singleton. `b` and `c` are the only global singletons. + region1 := makeRegion([]uint64{a, b}, []uint64{a, b}, 2) + region2 := makeRegion([]uint64{a, c}, []uint64{a, c}, 2) + + root := NewReservoirRowSampleCollector(1, 1) + root.Base().FMSketches = []*FMSketch{NewFMSketch(1000)} + root.MergeCollector(region1) + root.MergeCollector(region2) + + // The two regions stay separate instead of collapsing into one sketch. + require.Len(t, root.Base().RegionSketchSummaries, 2) + require.Equal(t, int64(4), root.Base().SketchSampleCount) + + ndvSketches := make([]*FMSketch, 0, 2) + singletonSketches := make([]*FMSketch, 0, 2) + for _, region := range root.Base().RegionSketchSummaries { + ndvSketches = append(ndvSketches, region.NDVSketches[0]) + singletonSketches = append(singletonSketches, region.SingletonSketches[0]) + } + // Per-region keeps `a` out of the global singleton count (it appears in both + // regions' NDV sketches); only `b` and `c` remain. + require.Equal(t, uint64(2), EstimateGlobalSingletonBySketches(ndvSketches, singletonSketches)) + + // Contrast with the previous behavior: unioning the two regions into a single + // sketch over-counts the cross-region duplicate `a` as a global singleton. + unionNDV := newFMSketchFromHashValues(a, b, c) + unionSingleton := newFMSketchFromHashValues(a, b, c) + require.Equal(t, uint64(3), + EstimateGlobalSingletonBySketches([]*FMSketch{unionNDV}, []*FMSketch{unionSingleton}), + "a union of the regions over-counts the cross-region duplicate") + + // A merged collector contributes its region summaries when merged again, so + // per-region granularity survives the second (worker -> root) merge level. + top := NewReservoirRowSampleCollector(1, 1) + top.Base().FMSketches = []*FMSketch{NewFMSketch(1000)} + top.MergeCollector(root) + require.Len(t, top.Base().RegionSketchSummaries, 2) + } +}