Merge pull request #8348 from influxdata/jw-tsm-compaction-limit
Compaction limits
jwilder authored May 4, 2017
2 parents 72df1fe + fc34d30 commit 23af70a
Showing 17 changed files with 438 additions and 243 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements
- [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI.
- [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1
- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits

### Bugfixes

@@ -54,6 +55,8 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries.
- [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries.
- [#8045](https://github.com/influxdata/influxdb/issues/8045): Refactor the subquery code and fix outer condition queries.
- [#7425](https://github.com/influxdata/influxdb/issues/7425): Fix compaction aborted log messages
- [#8123](https://github.com/influxdata/influxdb/issues/8123): TSM compaction does not remove .tmp on error

## v1.2.3 [unreleased]

5 changes: 5 additions & 0 deletions etc/config.sample.toml
@@ -89,6 +89,11 @@
# write or delete
# compact-full-write-cold-duration = "4h"

# The maximum number of concurrent full and level compactions that can run at one time. A
# value of 0 results in runtime.GOMAXPROCS(0) being used at runtime. This setting does not apply
# to cache snapshotting.
# max-concurrent-compactions = 0

# The maximum series allowed per database before writes are dropped. This limit can prevent
# high cardinality issues at the database level. This limit can be disabled by setting it to
# 0.
20 changes: 18 additions & 2 deletions tsdb/config.go
@@ -47,6 +47,10 @@ const (

// DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement.
DefaultMaxValuesPerTag = 100000

// DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions
// that can run at one time. A value of 0 results in runtime.GOMAXPROCS(0) being used at runtime.
DefaultMaxConcurrentCompactions = 0
)

// Config holds the configuration for the tsdb package.
@@ -84,6 +88,12 @@ type Config struct {
// A value of 0 disables the limit.
MaxValuesPerTag int `toml:"max-values-per-tag"`

// MaxConcurrentCompactions is the maximum number of concurrent level and full compactions
// that can be running at one time across all shards. Compactions scheduled to run when the
// limit is reached are blocked until a running compaction completes. Snapshot compactions are
// not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0).
MaxConcurrentCompactions int `toml:"max-concurrent-compactions"`

TraceLoggingEnabled bool `toml:"trace-logging-enabled"`
}

@@ -100,8 +110,9 @@ func NewConfig() Config {
CacheSnapshotWriteColdDuration: toml.Duration(DefaultCacheSnapshotWriteColdDuration),
CompactFullWriteColdDuration: toml.Duration(DefaultCompactFullWriteColdDuration),

MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxSeriesPerDatabase: DefaultMaxSeriesPerDatabase,
MaxValuesPerTag: DefaultMaxValuesPerTag,
MaxConcurrentCompactions: DefaultMaxConcurrentCompactions,

TraceLoggingEnabled: false,
}
@@ -115,6 +126,10 @@ func (c *Config) Validate() error {
return errors.New("Data.WALDir must be specified")
}

if c.MaxConcurrentCompactions < 0 {
return errors.New("max-concurrent-compactions must be greater than 0")
}

valid := false
for _, e := range RegisteredEngines() {
if e == c.Engine {
@@ -152,5 +167,6 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
"compact-full-write-cold-duration": c.CompactFullWriteColdDuration,
"max-series-per-database": c.MaxSeriesPerDatabase,
"max-values-per-tag": c.MaxValuesPerTag,
"max-concurrent-compactions": c.MaxConcurrentCompactions,
}), nil
}
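For context, here is a minimal sketch of the intended semantics of the new setting, assuming a hypothetical helper name (this is not code from the PR): a value of 0 falls back to runtime.GOMAXPROCS(0), and negative values are rejected, mirroring Validate and the default above.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// resolveMaxCompactions is a hypothetical helper illustrating how
// max-concurrent-compactions is interpreted: 0 means "use the number of
// logical CPUs", while negative values are rejected as in Config.Validate.
func resolveMaxCompactions(configured int) (int, error) {
	if configured < 0 {
		return 0, errors.New("max-concurrent-compactions must be non-negative")
	}
	if configured == 0 {
		return runtime.GOMAXPROCS(0), nil
	}
	return configured, nil
}

func main() {
	n, _ := resolveMaxCompactions(0)
	fmt.Println("effective compaction limit:", n) // number of logical CPUs
}
```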
14 changes: 10 additions & 4 deletions tsdb/engine.go
@@ -12,6 +12,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/limiter"
"github.com/uber-go/zap"
)

@@ -30,6 +31,8 @@ type Engine interface {
Open() error
Close() error
SetEnabled(enabled bool)
SetCompactionsEnabled(enabled bool)

WithLogger(zap.Logger)

LoadMetadataIndex(shardID uint64, index Index) error
@@ -71,6 +74,8 @@ type Engine interface {
// Statistics will return statistics relevant to this engine.
Statistics(tags map[string]string) []models.Statistic
LastModified() time.Time
DiskSize() int64
IsIdle() bool

io.WriterTo
}
@@ -136,10 +141,11 @@ func NewEngine(id uint64, i Index, path string, walPath string, options EngineOp

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
EngineVersion string
IndexVersion string
ShardID uint64
InmemIndex interface{} // shared in-memory index
CompactionLimiter limiter.Fixed

Config Config
}
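EngineOptions now carries a CompactionLimiter shared by every shard, which is how the global limit is enforced. The sketch below is a self-contained, channel-based counting semaphore that approximates the idea; it is not the pkg/limiter implementation, and all names are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// fixedLimiter approximates the idea behind a shared compaction limiter: a
// counting semaphore sized to the maximum number of concurrent compactions.
type fixedLimiter chan struct{}

func newFixedLimiter(n int) fixedLimiter { return make(fixedLimiter, n) }

// Take blocks until a compaction slot is free.
func (l fixedLimiter) Take() { l <- struct{}{} }

// Release frees a slot for the next waiting compaction.
func (l fixedLimiter) Release() { <-l }

// compactShard stands in for a full or level compaction on one shard.
func compactShard(id int, lim fixedLimiter) {
	lim.Take()
	defer lim.Release()
	fmt.Println("compacting shard", id)
}

func main() {
	lim := newFixedLimiter(2) // e.g. max-concurrent-compactions = 2
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			compactShard(id, lim)
		}(i)
	}
	wg.Wait()
}
```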
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache.go
@@ -260,7 +260,7 @@ func (c *Cache) Write(key string, values []Value) error {

// Enough room in the cache?
limit := c.maxSize
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
n := c.Size() + addedSize

if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
@@ -293,7 +293,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {

// Enough room in the cache?
limit := c.maxSize // maxSize is safe for reading without a lock.
n := c.Size() + atomic.LoadUint64(&c.snapshotSize) + addedSize
n := c.Size() + addedSize
if limit > 0 && n > limit {
atomic.AddInt64(&c.stats.WriteErr, 1)
return ErrCacheMemorySizeLimitExceeded(n, limit)
@@ -416,7 +416,7 @@ func (c *Cache) ClearSnapshot(success bool) {

// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
return atomic.LoadUint64(&c.size)
return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize)
}

// increaseSize increases size by delta.
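With snapshot bytes folded into Size, the write path only needs to add the incoming bytes before checking the limit. A hedged illustration with assumed field names (not the cache's real struct) of the accounting this change produces:

```go
package main

import "fmt"

// cacheSizes is a simplified stand-in for the cache's counters; the field
// names are assumptions for illustration only.
type cacheSizes struct {
	store    uint64 // bytes held by the live store
	snapshot uint64 // bytes held by a snapshot being written to TSM
}

// Size reports live plus snapshot bytes, matching the change above.
func (c cacheSizes) Size() uint64 { return c.store + c.snapshot }

// canWrite mirrors the simplified limit check: the caller no longer adds the
// snapshot size itself because Size already includes it.
func canWrite(c cacheSizes, added, limit uint64) bool {
	n := c.Size() + added
	return limit == 0 || n <= limit
}

func main() {
	c := cacheSizes{store: 900, snapshot: 200}
	fmt.Println(canWrite(c, 50, 1000)) // false: 1150 exceeds the limit
	fmt.Println(canWrite(c, 50, 0))    // true: a limit of 0 disables the check
}
```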
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/cache_test.go
@@ -448,7 +448,7 @@ func TestCache_Snapshot_Stats(t *testing.T) {
}

// Store size should have been reset.
if got, exp := c.Size(), uint64(0); got != exp {
if got, exp := c.Size(), uint64(16); got != exp {
t.Fatalf("got %v, expected %v", got, exp)
}

89 changes: 80 additions & 9 deletions tsdb/engine/tsm1/compact.go
@@ -53,24 +53,22 @@ type CompactionPlanner interface {
Plan(lastWrite time.Time) []CompactionGroup
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
FullyCompacted() bool
}

// DefaultPlanner implements CompactionPlanner using a strategy to roll up
// multiple generations of TSM files into larger files in stages. It attempts
// to minimize the number of TSM files on disk while rolling up a bounded number
// of files.
type DefaultPlanner struct {
FileStore interface {
Stats() []FileStat
LastModified() time.Time
BlockCount(path string, idx int) int
}
FileStore fileStore

// CompactFullWriteColdDuration specifies the length of time after
// compactFullWriteColdDuration specifies the length of time after
// which if no writes have been committed to the WAL, the engine will
// do a full compaction of the TSM files in this shard. This duration
// should always be greater than the CacheFlushWriteColdDuration
CompactFullWriteColdDuration time.Duration
compactFullWriteColdDuration time.Duration

// lastPlanCheck is the last time Plan was called
lastPlanCheck time.Time
@@ -81,6 +79,24 @@ type DefaultPlanner struct {

// lastGenerations is the last set of generations found by findGenerations
lastGenerations tsmGenerations

// filesInUse is the set of files that have been returned as part of a plan and may
// currently be undergoing compaction. Two plans should not return the same file at any given time.
filesInUse map[string]struct{}
}

type fileStore interface {
Stats() []FileStat
LastModified() time.Time
BlockCount(path string, idx int) int
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
return &DefaultPlanner{
FileStore: fs,
compactFullWriteColdDuration: writeColdDuration,
filesInUse: make(map[string]struct{}),
}
}

// tsmGeneration represents the TSM files within a generation.
@@ -129,6 +145,12 @@ func (t *tsmGeneration) hasTombstones() bool {
return false
}

// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
gens := c.findGenerations()
return len(gens) <= 1 && !gens.hasTombstones()
}

// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// Determine the generations from all files on disk. We need to treat
@@ -205,6 +227,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}
}

if !c.acquire(cGroups) {
return nil
}

return cGroups
}

@@ -270,6 +296,10 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
cGroups = append(cGroups, cGroup)
}

if !c.acquire(cGroups) {
return nil
}

return cGroups
}

@@ -279,7 +309,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
generations := c.findGenerations()

// first check if we should be doing a full compaction because nothing has been written in a long time
if c.CompactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.CompactFullWriteColdDuration && len(generations) > 1 {
if c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {
var tsmFiles []string
var genCount int
for i, group := range generations {
@@ -316,7 +346,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
return nil
}

return []CompactionGroup{tsmFiles}
group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
}
return group
}

// don't plan if nothing has changed in the filestore
@@ -449,6 +483,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
tsmFiles = append(tsmFiles, cGroup)
}

if !c.acquire(tsmFiles) {
return nil
}
return tsmFiles
}

@@ -496,6 +533,40 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
return orderedGenerations
}

func (c *DefaultPlanner) acquire(groups []CompactionGroup) bool {
c.mu.Lock()
defer c.mu.Unlock()

// See if the new files are already in use
for _, g := range groups {
for _, f := range g {
if _, ok := c.filesInUse[f]; ok {
return false
}
}
}

// Mark all the new files in use
for _, g := range groups {
for _, f := range g {
c.filesInUse[f] = struct{}{}
}
}
return true
}

// Release removes the file references in each compaction group, allowing new plans
// to use them.
func (c *DefaultPlanner) Release(groups []CompactionGroup) {
c.mu.Lock()
defer c.mu.Unlock()
for _, g := range groups {
for _, f := range g {
delete(c.filesInUse, f)
}
}
}
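A hedged usage sketch, not code from this PR, of how a caller would pair each plan with Release so that files claimed by acquire become plannable again; the toy planner, function names, and file names are placeholders.

```go
package main

import "fmt"

// compactionGroup mirrors the idea of tsm1.CompactionGroup: a set of TSM file
// paths that are compacted together.
type compactionGroup []string

// planner is the small slice of the CompactionPlanner interface this sketch
// needs; the real planner refuses to hand out files that are already in use.
type planner interface {
	PlanLevel(level int) []compactionGroup
	Release(groups []compactionGroup)
}

// onePlanAtATime is a toy planner that hands out one hard-coded group and,
// like DefaultPlanner.acquire, returns nil while that group is still claimed.
type onePlanAtATime struct{ inUse bool }

func (p *onePlanAtATime) PlanLevel(level int) []compactionGroup {
	if p.inUse {
		return nil
	}
	p.inUse = true
	return []compactionGroup{{"000001-01.tsm", "000002-01.tsm"}} // placeholder names
}

func (p *onePlanAtATime) Release(groups []compactionGroup) { p.inUse = false }

// runLevelCompactions shows the intended pairing: every non-nil plan is
// eventually Released so its files become plannable again.
func runLevelCompactions(p planner, level int, compact func(files []string) error) {
	groups := p.PlanLevel(level)
	if len(groups) == 0 {
		return
	}
	defer p.Release(groups)
	for _, g := range groups {
		if err := compact(g); err != nil {
			return
		}
	}
}

func main() {
	p := &onePlanAtATime{}
	runLevelCompactions(p, 3, func(files []string) error {
		fmt.Println("compacting", files)
		return nil
	})
}
```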

// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files.
type Compactor struct {
