diff --git a/CHANGELOG.md b/CHANGELOG.md index b628c8e3c86..df45076e883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements - [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI. - [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1 +- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits ### Bugfixes @@ -54,6 +55,8 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio - [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries. - [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries. - [#8045](https://github.com/influxdata/influxdb/issues/8045): Refactor the subquery code and fix outer condition queries. +- [#7425](https://github.com/influxdata/influxdb/issues/7425): Fix compaction aborted log messages +- [#8123](https://github.com/influxdata/influxdb/issues/8123): TSM compaction does not remove .tmp on error ## v1.2.3 [unreleased] diff --git a/etc/config.sample.toml b/etc/config.sample.toml index e380aaeacd7..25d14c239af 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -89,6 +89,11 @@ # write or delete # compact-full-write-cold-duration = "4h" + # The maximum number of concurrent full and level compactions that can run at one time. A + # value of 0 results in runtime.GOMAXPROCS(0) used at runtime. This setting does not apply + # to cache snapshotting. + # max-concurrent-compactions = 0 + # The maximum series allowed per database before writes are dropped. This limit can prevent # high cardinality issues at the database level. This limit can be disabled by setting it to # 0. diff --git a/tsdb/config.go b/tsdb/config.go index 8971f768430..d1596bb8a3b 100644 --- a/tsdb/config.go +++ b/tsdb/config.go @@ -47,6 +47,10 @@ const ( // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement. DefaultMaxValuesPerTag = 100000 + + // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions + // that can run at one time. A value of results in runtime.GOMAXPROCS(0) used at runtime. + DefaultMaxConcurrentCompactions = 0 ) // Config holds the configuration for the tsbd package. @@ -84,6 +88,12 @@ type Config struct { // A value of 0 disables the limit. MaxValuesPerTag int `toml:"max-values-per-tag"` + // MaxConcurrentCompactions is the maximum number of concurrent level and full compactions + // that can be running at one time across all shards. Compactions scheduled to run when the + // limit is reached are blocked until a running compaction completes. Snapshot compactions are + // not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0). 
+ MaxConcurrentCompactions int `toml:"max-concurrent-compactions"` + TraceLoggingEnabled bool `toml:"trace-logging-enabled"` } @@ -100,8 +110,9 @@ func NewConfig() Config { CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration), CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration), - MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, - MaxValuesPerTag: DefaultMaxValuesPerTag, + MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase, + MaxValuesPerTag: DefaultMaxValuesPerTag, + MaxConcurrentCompactions: DefaultMaxConcurrentCompactions, TraceLoggingEnabled: false, } @@ -115,6 +126,10 @@ func (c *Config) Validate() error { return errors.New("Data.WALDir must be specified") } + if c.MaxConcurrentCompactions < 0 { + return errors.New("max-concurrent-compactions must be greater than 0") + } + valid := false for _, e := range RegisteredEngines() { if e == c.Engine { @@ -152,5 +167,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { "compact-full-write-cold-duration": c.CompactFullWriteColdDuration, "max-series-per-database": c.MaxSeriesPerDatabase, "max-values-per-tag": c.MaxValuesPerTag, + "max-concurrent-compactions": c.MaxConcurrentCompactions, }), nil } diff --git a/tsdb/engine.go b/tsdb/engine.go index a17410c642a..a81144d07a2 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/uber-go/zap" ) @@ -30,6 +31,8 @@ type Engine interface { Open() error Close() error SetEnabled(enabled bool) + SetCompactionsEnabled(enabled bool) + WithLogger(zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error @@ -71,6 +74,8 @@ type Engine interface { // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time + DiskSize() int64 + IsIdle() bool io.WriterTo } @@ -136,10 +141,11 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp // EngineOptions represents the options used to initialize the engine. type EngineOptions struct { - EngineVersion string - IndexVersion string - ShardID uint64 - InmemIndex interface{} // shared in-memory index + EngineVersion string + IndexVersion string + ShardID uint64 + InmemIndex interface{} // shared in-memory index + CompactionLimiter limiter.Fixed Config Config } diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index d5c911bf826..8d8ababff6e 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -260,7 +260,7 @@ func (c *Cache) Write(key string, values []Value) error { // Enough room in the cache? limit := c.maxSize - n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize + n := c.Size() + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) @@ -293,7 +293,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { // Enough room in the cache? limit := c.maxSize // maxSize is safe for reading without a lock. - n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize + n := c.Size() + addedSize if limit > 0 && n > limit { atomic.AddInt64(&c.stats.WriteErr, 1) return ErrCacheMemorySizeLimitExceeded(n, limit) @@ -416,7 +416,7 @@ func (c *Cache) ClearSnapshot(success bool) { // Size returns the number of point-calcuated bytes the cache currently uses. 
func (c *Cache) Size() uint64 { - return atomic.LoadUint64(&c.size) + return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize) } // increaseSize increases size by delta. diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index a7637ea49e4..7376e0b5950 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -448,7 +448,7 @@ func TestCache_Snapshot_Stats(t *testing.T) { } // Store size should have been reset. - if got, exp := c.Size(), uint64(0); got != exp { + if got, exp := c.Size(), uint64(16); got != exp { t.Fatalf("got %v, expected %v", got, exp) } diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 562624a5a39..7aba38a7aff 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -53,6 +53,8 @@ type CompactionPlanner interface { Plan(lastWrite time.Time) []CompactionGroup PlanLevel(level int) []CompactionGroup PlanOptimize() []CompactionGroup + Release(group []CompactionGroup) + FullyCompacted() bool } // DefaultPlanner implements CompactionPlanner using a strategy to roll up @@ -60,17 +62,13 @@ type CompactionPlanner interface { // to minimize the number of TSM files on disk while rolling up a bounder number // of files. type DefaultPlanner struct { - FileStore interface { - Stats() []FileStat - LastModified() time.Time - BlockCount(path string, idx int) int - } + FileStore fileStore - // CompactFullWriteColdDuration specifies the length of time after + // compactFullWriteColdDuration specifies the length of time after // which if no writes have been committed to the WAL, the engine will // do a full compaction of the TSM files in this shard. This duration // should always be greater than the CacheFlushWriteColdDuraion - CompactFullWriteColdDuration time.Duration + compactFullWriteColdDuration time.Duration // lastPlanCheck is the last time Plan was called lastPlanCheck time.Time @@ -81,6 +79,24 @@ type DefaultPlanner struct { // lastGenerations is the last set of generations found by findGenerations lastGenerations tsmGenerations + + // filesInUse is the set of files that have been returned as part of a plan and might + // be being compacted. Two plans should not return the same file at any given time. + filesInUse map[string]struct{} +} + +type fileStore interface { + Stats() []FileStat + LastModified() time.Time + BlockCount(path string, idx int) int +} + +func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner { + return &DefaultPlanner{ + FileStore: fs, + compactFullWriteColdDuration: writeColdDuration, + filesInUse: make(map[string]struct{}), + } } // tsmGeneration represents the TSM files within a generation. @@ -129,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool { return false } +// FullyCompacted returns true if the shard is fully compacted. +func (c *DefaultPlanner) FullyCompacted() bool { + gens := c.findGenerations() + return len(gens) <= 1 && !gens.hasTombstones() +} + // PlanLevel returns a set of TSM files to rewrite for a specific level. func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { // Determine the generations from all files on disk. 
We need to treat @@ -205,6 +227,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup { } } + if !c.acquire(cGroups) { + return nil + } + return cGroups } @@ -270,6 +296,10 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup { cGroups = append(cGroups, cGroup) } + if !c.acquire(cGroups) { + return nil + } + return cGroups } @@ -279,7 +309,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { generations := c.findGenerations() // first check if we should be doing a full compaction because nothing has been written in a long time - if c.CompactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.CompactFullWriteColdDuration && len(generations) > 1 { + if c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 { var tsmFiles []string var genCount int for i, group := range generations { @@ -316,7 +346,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { return nil } - return []CompactionGroup{tsmFiles} + group := []CompactionGroup{tsmFiles} + if !c.acquire(group) { + return nil + } + return group } // don't plan if nothing has changed in the filestore @@ -449,6 +483,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { tsmFiles = append(tsmFiles, cGroup) } + if !c.acquire(tsmFiles) { + return nil + } return tsmFiles } @@ -496,6 +533,40 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { return orderedGenerations } +func (c *DefaultPlanner) acquire(groups []CompactionGroup) bool { + c.mu.Lock() + defer c.mu.Unlock() + + // See if the new files are already in use + for _, g := range groups { + for _, f := range g { + if _, ok := c.filesInUse[f]; ok { + return false + } + } + } + + // Mark all the new files in use + for _, g := range groups { + for _, f := range g { + c.filesInUse[f] = struct{}{} + } + } + return true +} + +// Release removes the files reference in each compaction group allowing new plans +// to be able to use them. +func (c *DefaultPlanner) Release(groups []CompactionGroup) { + c.mu.Lock() + defer c.mu.Unlock() + for _, g := range groups { + for _, f := range g { + delete(c.filesInUse, f) + } + } +} + // Compactor merges multiple TSM files into new files or // writes a Cache into 1 or more TSM files. 
type Compactor struct { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 893b61519e7..502e42efa51 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) @@ -1090,8 +1091,8 @@ func TestCacheKeyIterator_Chunked(t *testing.T) { } func TestDefaultPlanner_Plan_Min(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ @@ -1108,8 +1109,8 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -1151,13 +1152,13 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.Plan(time.Now()) @@ -1213,13 +1214,11 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, + cp := tsm1.NewDefaultPlanner(&fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data }, - } + }, tsdb.DefaultCompactFullWriteColdDuration) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]} @@ -1280,13 +1279,13 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[4], data[5]} tsm := cp.PlanLevel(1) @@ -1333,13 +1332,13 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4]} tsm := cp.PlanLevel(3) @@ -1382,13 +1381,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedLowLevel(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[2], data[3]} tsm := cp.PlanLevel(1) @@ -1435,13 +1434,13 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanLevel(3) @@ -1478,13 +1477,13 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, 
tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanLevel(3) @@ -1510,13 +1509,13 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanLevel(2) @@ -1554,13 +1553,13 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1]} tsm := cp.PlanLevel(3) @@ -1603,13 +1602,13 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[4], data[5]} @@ -1652,13 +1651,13 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanOptimize() @@ -1695,13 +1694,13 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.PlanOptimize() @@ -1760,13 +1759,13 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[5], data[6], data[7], data[8]} @@ -1813,13 +1812,13 @@ func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{} tsm := cp.PlanOptimize() @@ -1845,13 +1844,13 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2]} tsm := cp.PlanOptimize() @@ -1897,14 +1896,14 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, }, - CompactFullWriteColdDuration: time.Nanosecond, - } + time.Nanosecond, + ) tsm := 
cp.Plan(time.Now().Add(-time.Second)) if exp, got := len(data), len(tsm[0]); got != exp { @@ -1932,13 +1931,13 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -1975,15 +1974,13 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { blockCount: 1000, } - cp := &tsm1.DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Nanosecond, - } - + cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond) + plan := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files - if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { + if exp, got := 4, len(plan[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) // skip planning if all files are over the limit over := []tsm1.FileStat{ @@ -2017,14 +2014,18 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.FileStore = overFs - if exp, got := 0, len(cp.Plan(time.Now().Add(-time.Second))); got != exp { + plan = cp.Plan(time.Now().Add(-time.Second)) + if exp, got := 0, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) + plan = cp.PlanOptimize() // ensure the optimize planner would pick this up - if exp, got := 1, len(cp.PlanOptimize()); got != exp { + if exp, got := 1, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) cp.FileStore = fs // ensure that it will plan if last modified has changed @@ -2082,15 +2083,14 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ blockCount: 1000, PathsFn: func() []tsm1.FileStat { return data }, }, - CompactFullWriteColdDuration: time.Hour, - } + time.Hour) tsm := cp.Plan(time.Now().Add(-24 * time.Hour)) if exp, got := 1, len(tsm); got != exp { @@ -2127,15 +2127,17 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { blockCount: 100, } - cp := &tsm1.DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Nanosecond, - } + cp := tsm1.NewDefaultPlanner( + fs, + time.Nanosecond, + ) + plan := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files - if exp, got := 4, len(cp.Plan(time.Now().Add(-time.Second))[0]); got != exp { + if exp, got := 4, len(plan[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } + cp.Release(plan) // skip planning if all files are over the limit over := []tsm1.FileStat{ @@ -2188,13 +2190,13 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { }, } - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return data }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3]} tsm := cp.Plan(time.Now()) @@ -2210,8 +2212,8 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { } func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ 
PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ @@ -2236,8 +2238,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { @@ -2246,8 +2248,8 @@ func TestDefaultPlanner_Plan_LargeSets(t *testing.T) { } func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { - cp := &tsm1.DefaultPlanner{ - FileStore: &fakeFileStore{ + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return []tsm1.FileStat{ tsm1.FileStat{ @@ -2272,8 +2274,8 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { }, } }, - }, - } + }, tsdb.DefaultCompactFullWriteColdDuration, + ) tsm := cp.Plan(time.Now()) if exp, got := 0, len(tsm); got != exp { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 125a27e856b..b4572d7fd61 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/tsdb" _ "github.com/influxdata/influxdb/tsdb/index" "github.com/uber-go/zap" @@ -132,6 +133,9 @@ type Engine struct { enableCompactionsOnOpen bool stats *EngineStatistics + + // The limiter for concurrent compactions + compactionLimiter limiter.Fixed } // NewEngine returns a new instance of Engine. @@ -161,17 +165,15 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, opt tsdb. WAL: w, Cache: cache, - FileStore: fs, - Compactor: c, - CompactionPlan: &DefaultPlanner{ - FileStore: fs, - CompactFullWriteColdDuration: time.Duration(opt.Config.CompactFullWriteColdDuration), - }, + FileStore: fs, + Compactor: c, + CompactionPlan: NewDefaultPlanner(fs, time.Duration(opt.Config.CompactFullWriteColdDuration)), CacheFlushMemorySizeThreshold: opt.Config.CacheSnapshotMemorySize, CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), enableCompactionsOnOpen: true, - stats: &EngineStatistics{}, + stats: &EngineStatistics{}, + compactionLimiter: opt.CompactionLimiter, } // Attach fieldset to index. @@ -243,6 +245,7 @@ func (e *Engine) disableLevelCompactions(wait bool) { e.levelWorkers += 1 } + var cleanup bool if old == 0 && e.done != nil { // Prevent new compactions from starting e.Compactor.DisableCompactions() @@ -250,12 +253,13 @@ func (e *Engine) disableLevelCompactions(wait bool) { // Stop all background compaction goroutines close(e.done) e.done = nil + cleanup = true } e.mu.Unlock() e.wg.Wait() - if old == 0 { // first to disable should cleanup + if cleanup { // first to disable should cleanup if err := e.cleanup(); err != nil { e.logger.Info(fmt.Sprintf("error cleaning up temp file: %v", err)) } @@ -428,6 +432,11 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { return statistics } +// DiskSize returns the total size in bytes of all TSM and WAL segments on disk. +func (e *Engine) DiskSize() int64 { + return e.FileStore.DiskSizeBytes() + e.WAL.DiskSizeBytes() +} + // Open opens and initializes the engine. 
func (e *Engine) Open() error { if err := os.MkdirAll(e.path, 0777); err != nil { @@ -526,6 +535,21 @@ func (e *Engine) LoadMetadataIndex(shardID uint64, index tsdb.Index) error { return nil } +// IsIdle returns true if the cache is empty, there are no running compactions and the +// shard is fully compacted. +func (e *Engine) IsIdle() bool { + cacheEmpty := e.Cache.Size() == 0 + + runningCompactions := atomic.LoadInt64(&e.stats.CacheCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[0]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[1]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMCompactionsActive[2]) + runningCompactions += atomic.LoadInt64(&e.stats.TSMFullCompactionsActive) + runningCompactions += atomic.LoadInt64(&e.stats.TSMOptimizeCompactionsActive) + + return cacheEmpty && runningCompactions == 0 && e.CompactionPlan.FullyCompacted() +} + // Backup writes a tar archive of any TSM files modified since the passed // in time to the passed in writer. The basePath will be prepended to the names // of the files in the archive. It will force a snapshot of the WAL first @@ -1165,8 +1189,12 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { case <-t.C: s := e.levelCompactionStrategy(fast, level) if s != nil { + // Release the files in the compaction plan + defer e.CompactionPlan.Release(s.compactionGroups) + s.Apply() } + } } } @@ -1183,6 +1211,8 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { case <-t.C: s := e.fullCompactionStrategy() if s != nil { + // Release the files in the compaction plan + defer e.CompactionPlan.Release(s.compactionGroups) s.Apply() } @@ -1205,6 +1235,7 @@ type compactionStrategy struct { logger zap.Logger compactor *Compactor fileStore *FileStore + limiter limiter.Fixed } // Apply concurrently compacts all the groups in a compaction strategy. @@ -1226,6 +1257,12 @@ func (s *compactionStrategy) Apply() { // compactGroup executes the compaction strategy against a single CompactionGroup. 
func (s *compactionStrategy) compactGroup(groupNum int) { + // Limit concurrent compactions if we have a limiter + if cap(s.limiter) > 0 { + s.limiter.Take() + defer s.limiter.Release() + } + group := s.compactionGroups[groupNum] start := time.Now() s.logger.Info(fmt.Sprintf("beginning %s compaction of group %d, %d TSM files", s.description, groupNum, len(group))) @@ -1290,6 +1327,7 @@ func (e *Engine) levelCompactionStrategy(fast bool, level int) *compactionStrate fileStore: e.FileStore, compactor: e.Compactor, fast: fast, + limiter: e.compactionLimiter, description: fmt.Sprintf("level %d", level), activeStat: &e.stats.TSMCompactionsActive[level-1], @@ -1320,6 +1358,7 @@ func (e *Engine) fullCompactionStrategy() *compactionStrategy { fileStore: e.FileStore, compactor: e.Compactor, fast: optimize, + limiter: e.compactionLimiter, } if optimize { diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 63b385f1947..8254a37ac32 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1059,6 +1059,8 @@ type mockPlanner struct{} func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } +func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} +func (m *mockPlanner) FullyCompacted() bool { return false } // ParseTags returns an instance of Tags for a comma-delimited list of key/values. func ParseTags(s string) influxql.Tags { diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index decb8430830..e3905357702 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -317,13 +317,17 @@ func (f *FileStore) Delete(keys []string) error { // DeleteRange removes the values for keys between timestamps min and max. func (f *FileStore) DeleteRange(keys []string, min, max int64) error { + if err := f.walkFiles(func(tsm TSMFile) error { + return tsm.DeleteRange(keys, min, max) + }); err != nil { + return err + } + f.mu.Lock() f.lastModified = time.Now().UTC() + f.lastFileStats = nil f.mu.Unlock() - - return f.walkFiles(func(tsm TSMFile) error { - return tsm.DeleteRange(keys, min, max) - }) + return nil } // Open loads all the TSM files in the configured directory. 
@@ -382,15 +386,6 @@ func (f *FileStore) Open() error { return fmt.Errorf("error opening file %s: %v", fn, err) } - // Accumulate file store size stat - fi, err := file.Stat() - if err == nil { - atomic.AddInt64(&f.stats.DiskBytes, fi.Size()) - if fi.ModTime().UTC().After(f.lastModified) { - f.lastModified = fi.ModTime().UTC() - } - } - go func(idx int, file *os.File) { start := time.Now() df, err := NewTSMReader(file) @@ -404,6 +399,7 @@ func (f *FileStore) Open() error { }(i, file) } + var lm int64 for range files { res := <-readerC if res.err != nil { @@ -411,7 +407,19 @@ func (f *FileStore) Open() error { return res.err } f.files = append(f.files, res.r) + // Accumulate file store size stats + atomic.AddInt64(&f.stats.DiskBytes, int64(res.r.Size())) + for _, ts := range res.r.TombstoneFiles() { + atomic.AddInt64(&f.stats.DiskBytes, int64(ts.Size)) + } + + // Re-initialize the lastModified time for the file store + if res.r.LastModified() > lm { + lm = res.r.LastModified() + } + } + f.lastModified = time.Unix(0, lm) close(readerC) sort.Sort(tsmReaders(f.files)) @@ -434,6 +442,10 @@ func (f *FileStore) Close() error { return nil } +func (f *FileStore) DiskSizeBytes() int64 { + return atomic.LoadInt64(&f.stats.DiskBytes) +} + // Read returns the slice of values for the given key and the given timestamp, // if any file matches those constraints. func (f *FileStore) Read(key string, t int64) ([]Value, error) { @@ -623,6 +635,10 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error { var totalSize int64 for _, file := range f.files { totalSize += int64(file.Size()) + for _, ts := range file.TombstoneFiles() { + totalSize += int64(ts.Size) + } + } atomic.StoreInt64(&f.stats.DiskBytes, totalSize) diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 0c9d2779141..7e5abf3b992 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -465,6 +465,11 @@ func (t *TSMReader) Size() uint32 { func (t *TSMReader) LastModified() int64 { t.mu.RLock() lm := t.lastModified + for _, ts := range t.tombstoner.TombstoneFiles() { + if ts.LastModified > lm { + lm = ts.LastModified + } + } t.mu.RUnlock() return lm } diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index faff78c425f..28082629b5a 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -375,6 +375,10 @@ func (l *WAL) LastWriteTime() time.Time { return l.lastWriteTime } +func (l *WAL) DiskSizeBytes() int64 { + return atomic.LoadInt64(&l.stats.OldBytes) + atomic.LoadInt64(&l.stats.CurrentBytes) +} + func (l *WAL) writeToLog(entry WALEntry) (int, error) { // limit how many concurrent encodings can be in flight. Since we can only // write one at a time to disk, a slow disk can cause the allocations below diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 940faa8d4b7..ca40cd8ce34 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -263,10 +263,13 @@ func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[s // ForEachMeasurementTagKey iterates over all tag keys for a measurement. func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + // Ensure we do not hold a lock on the index while fn executes in case fn tries + // to acquire a lock on the index again. If another goroutine has Lock, this will + // deadlock. 
i.mu.RLock() - defer i.mu.RUnlock() - mm := i.measurements[string(name)] + i.mu.RUnlock() + if mm == nil { return nil } @@ -537,9 +540,9 @@ func (i *Index) DropSeries(key []byte) error { // ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression. func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error { i.mu.RLock() - defer i.mu.RUnlock() - mm := i.measurements[string(name)] + i.mu.RUnlock() + if mm == nil { return nil } @@ -731,7 +734,6 @@ type ShardIndex struct { // CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { - keys, names, tagsSlice = idx.assignExistingSeries(idx.id, keys, names, tagsSlice) if len(keys) == 0 { return nil diff --git a/tsdb/meta.go b/tsdb/meta.go index b127c33899f..8d8ffae6ada 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -287,12 +287,9 @@ func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags return err } - m.mu.RLock() - defer m.mu.RUnlock() - // Iterate over each series. for _, id := range ids { - s := m.seriesByID[id] + s := m.SeriesByID(id) if err := fn(s.Tags()); err != nil { return err } diff --git a/tsdb/shard.go b/tsdb/shard.go index 680245f997b..5682f2f7f49 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "os" "path/filepath" "regexp" "sort" @@ -110,6 +109,7 @@ type Shard struct { path string walPath string id uint64 + wg sync.WaitGroup database string retentionPolicy string @@ -206,14 +206,9 @@ func (s *Shard) Statistics(tags map[string]string) []models.Statistic { return nil } - // TODO(edd): Should statSeriesCreate be the current number of series in the - // shard, or the total number of series ever created? - sSketch, tSketch, err := s.engine.SeriesSketches() - seriesN := int64(sSketch.Count() - tSketch.Count()) - if err != nil { - s.logger.Error("cannot compute series sketch", zap.Error(err)) - seriesN = 0 - } + // Refresh our disk size stat + _, _ = s.DiskSize() + seriesN := s.engine.SeriesN() tags = s.defaultTags.Merge(tags) statistics := []models.Statistic{{ @@ -288,8 +283,6 @@ func (s *Shard) Open() error { } s.engine = e - go s.monitor() - return nil }(); err != nil { s.close(true) @@ -335,6 +328,7 @@ func (s *Shard) close(clean bool) error { default: close(s.closing) } + s.wg.Wait() if clean { // Don't leak our shard ID and series keys in the index @@ -352,6 +346,12 @@ func (s *Shard) close(clean bool) error { return err } +func (s *Shard) IndexType() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.index.Type() +} + // ready determines if the Shard is ready for queries or writes. // It returns nil if ready, otherwise ErrShardClosed or ErrShardDiabled func (s *Shard) ready() error { @@ -380,35 +380,28 @@ func (s *Shard) UnloadIndex() { s.index.RemoveShard(s.id) } -// DiskSize returns the size on disk of this shard -func (s *Shard) DiskSize() (int64, error) { - var size int64 - err := filepath.Walk(s.path, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if !fi.IsDir() { - size += fi.Size() - } - return err - }) - if err != nil { - return 0, err +// IsIdle return true if the shard is not receiving writes and is fully compacted. 
+func (s *Shard) IsIdle() bool { + if err := s.ready(); err != nil { + return true } - err = filepath.Walk(s.walPath, func(_ string, fi os.FileInfo, err error) error { - if err != nil { - return err - } + return s.engine.IsIdle() +} - if !fi.IsDir() { - size += fi.Size() - } - return err - }) +// SetCompactionsEnabled enables or disable shard background compactions. +func (s *Shard) SetCompactionsEnabled(enabled bool) { + if err := s.ready(); err != nil { + return + } + s.engine.SetCompactionsEnabled(enabled) +} - return size, err +// DiskSize returns the size on disk of this shard +func (s *Shard) DiskSize() (int64, error) { + size := s.engine.DiskSize() + atomic.StoreInt64(&s.stats.DiskBytes, size) + return size, nil } // FieldCreate holds information for a field to create on a measurement. @@ -964,62 +957,12 @@ func (s *Shard) CreateSnapshot() (string, error) { return s.engine.CreateSnapshot() } -func (s *Shard) monitor() { - t := time.NewTicker(monitorStatInterval) - defer t.Stop() - t2 := time.NewTicker(time.Minute) - defer t2.Stop() - var changed time.Time - - for { - select { - case <-s.closing: - return - case <-t.C: - - // Checking DiskSize can be expensive with a lot of shards and TSM files, only - // check if something has changed. - lm := s.LastModified() - if lm.Equal(changed) { - continue - } - - size, err := s.DiskSize() - if err != nil { - s.logger.Info(fmt.Sprintf("Error collecting shard size: %v", err)) - continue - } - atomic.StoreInt64(&s.stats.DiskBytes, size) - changed = lm - case <-t2.C: - if s.options.Config.MaxValuesPerTag == 0 { - continue - } - - names, err := s.MeasurementNamesByExpr(nil) - if err != nil { - s.logger.Warn("cannot retrieve measurement names", zap.Error(err)) - continue - } - - for _, name := range names { - s.engine.ForEachMeasurementTagKey(name, func(k []byte) error { - n := s.engine.TagKeyCardinality(name, k) - perc := int(float64(n) / float64(s.options.Config.MaxValuesPerTag) * 100) - if perc > 100 { - perc = 100 - } +func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error { + return s.engine.ForEachMeasurementTagKey(name, fn) +} - // Log at 80, 85, 90-100% levels - if perc == 80 || perc == 85 || perc >= 90 { - s.logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", - perc, n, s.options.Config.MaxValuesPerTag, s.database, s.id, name, k)) - } - return nil - }) - } - } - } +func (s *Shard) TagKeyCardinality(name, key []byte) int { + return s.engine.TagKeyCardinality(name, key) } type ShardGroup interface { diff --git a/tsdb/store.go b/tsdb/store.go index 443001df962..cb4aafe2725 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -145,6 +145,8 @@ func (s *Store) Open() error { } s.opened = true + s.wg.Add(1) + go s.monitorShards() return nil } @@ -158,6 +160,13 @@ func (s *Store) loadShards() error { t := limiter.NewFixed(runtime.GOMAXPROCS(0)) + // Setup a shared limiter for compactions + lim := s.EngineOptions.Config.MaxConcurrentCompactions + if lim == 0 { + lim = runtime.GOMAXPROCS(0) + } + s.EngineOptions.CompactionLimiter = limiter.NewFixed(lim) + resC := make(chan *res) var n int @@ -224,6 +233,9 @@ func (s *Store) loadShards() error { // Open engine. 
shard := NewShard(shardID, path, walPath, opt) + + // Disable compactions, writes and queries until all shards are loaded + shard.EnableOnOpen = false shard.WithLogger(s.baseLogger) err = shard.Open() @@ -251,6 +263,15 @@ func (s *Store) loadShards() error { s.databases[res.s.database] = struct{}{} } close(resC) + + // Enable all shards + for _, sh := range s.shards { + sh.SetEnabled(true) + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } + } + return nil } @@ -1030,6 +1051,69 @@ func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, err return tagValues, nil } +func (s *Store) monitorShards() { + defer s.wg.Done() + t := time.NewTicker(10 * time.Second) + defer t.Stop() + t2 := time.NewTicker(time.Minute) + defer t2.Stop() + for { + select { + case <-s.closing: + return + case <-t.C: + s.mu.RLock() + for _, sh := range s.shards { + if sh.IsIdle() { + sh.SetCompactionsEnabled(false) + } else { + sh.SetCompactionsEnabled(true) + } + } + s.mu.RUnlock() + case <-t2.C: + if s.EngineOptions.Config.MaxValuesPerTag == 0 { + continue + } + + s.mu.RLock() + shards := s.filterShards(func(sh *Shard) bool { + return sh.IndexType() == "inmem" + }) + s.mu.RUnlock() + + s.walkShards(shards, func(sh *Shard) error { + db := sh.database + id := sh.id + + names, err := sh.MeasurementNamesByExpr(nil) + if err != nil { + s.Logger.Warn("cannot retrieve measurement names", zap.Error(err)) + return nil + } + + for _, name := range names { + sh.ForEachMeasurementTagKey(name, func(k []byte) error { + n := sh.TagKeyCardinality(name, k) + perc := int(float64(n) / float64(s.EngineOptions.Config.MaxValuesPerTag) * 100) + if perc > 100 { + perc = 100 + } + + // Log at 80, 85, 90-100% levels + if perc == 80 || perc == 85 || perc >= 90 { + s.Logger.Info(fmt.Sprintf("WARN: %d%% of max-values-per-tag limit exceeded: (%d/%d), db=%s shard=%d measurement=%s tag=%s", + perc, n, s.EngineOptions.Config.MaxValuesPerTag, db, id, name, k)) + } + return nil + }) + } + return nil + }) + } + } +} + // KeyValue holds a string key and a string value. type KeyValue struct { Key, Value string
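The new max-concurrent-compactions setting is wired through a shared limiter.Fixed: Store.loadShards sizes the limiter (falling back to runtime.GOMAXPROCS(0) when the configured value is 0) and each compactionStrategy.compactGroup takes a slot before compacting and releases it when done, while cache snapshot compactions stay ungated. A minimal, self-contained sketch of that gating pattern follows; the fixed type below is a hypothetical stand-in for the channel-based semaphore in pkg/limiter, and the worker body is illustrative rather than the engine's actual compaction code.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// fixed is a stand-in for pkg/limiter.Fixed: a channel used as a counting
// semaphore whose capacity bounds how many holders can be active at once.
type fixed chan struct{}

func newFixed(limit int) fixed { return make(fixed, limit) }
func (f fixed) Take()          { f <- struct{}{} }
func (f fixed) Release()       { <-f }

func main() {
	// Mirrors the config handling in Store.loadShards: a value of 0 means
	// "use runtime.GOMAXPROCS(0)", not "unlimited".
	maxConcurrentCompactions := 0 // hypothetical configured value
	if maxConcurrentCompactions == 0 {
		maxConcurrentCompactions = runtime.GOMAXPROCS(0)
	}
	compactionLimiter := newFixed(maxConcurrentCompactions)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // pretend eight compaction groups were planned
		wg.Add(1)
		go func(group int) {
			defer wg.Done()
			// Same shape as compactionStrategy.compactGroup: block until a
			// slot frees up, release it once the compaction finishes. A nil
			// or zero-capacity limiter (cap == 0) would skip this and run
			// the compaction ungated.
			compactionLimiter.Take()
			defer compactionLimiter.Release()

			time.Sleep(10 * time.Millisecond) // stand-in for the compaction work
			fmt.Printf("compacted group %d\n", group)
		}(i)
	}
	wg.Wait()
}

Because the limiter is created once per Store, the cap applies across all shards rather than per shard; separately, the Store.monitorShards loop toggles SetCompactionsEnabled so fully compacted, idle shards stop scheduling compaction work at all.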