diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 2e2dfe25cfd..9ab37139ee6 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -37,13 +37,24 @@ const ( ) var ( - errMaxFileExceeded = fmt.Errorf("max file exceeded") - errSnapshotsDisabled = fmt.Errorf("snapshots disabled") - errCompactionsDisabled = fmt.Errorf("compactions disabled") - errCompactionAborted = fmt.Errorf("compaction aborted") - errCompactionInProgress = fmt.Errorf("compaction in progress") + errMaxFileExceeded = fmt.Errorf("max file exceeded") + errSnapshotsDisabled = fmt.Errorf("snapshots disabled") + errCompactionsDisabled = fmt.Errorf("compactions disabled") + errCompactionAborted = fmt.Errorf("compaction aborted") ) +type errCompactionInProgress struct { + err error +} + +// Error returns the string representation of the error, to satisfy the error interface. +func (e errCompactionInProgress) Error() string { + if e.err != nil { + return fmt.Sprintf("compaction in progress: %s", e.err) + } + return "compaction in progress" +} + // CompactionGroup represents a list of files eligible to be compacted together. type CompactionGroup []string @@ -492,10 +503,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { // findGenerations groups all the TSM files by generation based // on their filename, then returns the generations in descending order (newest first). func (c *DefaultPlanner) findGenerations() tsmGenerations { - c.mu.RLock() + c.mu.Lock() + defer c.mu.Unlock() + last := c.lastFindGenerations lastGen := c.lastGenerations - c.mu.RUnlock() if !last.IsZero() && c.FileStore.LastModified().Equal(last) { return lastGen @@ -525,10 +537,8 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { sort.Sort(orderedGenerations) } - c.mu.Lock() c.lastFindGenerations = genTime c.lastGenerations = orderedGenerations - c.mu.Unlock() return orderedGenerations } @@ -726,7 +736,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { } if !c.add(tsmFiles) { - return nil, errCompactionInProgress + return nil, errCompactionInProgress{} } defer c.remove(tsmFiles) @@ -755,7 +765,7 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { } if !c.add(tsmFiles) { - return nil, errCompactionInProgress + return nil, errCompactionInProgress{} } defer c.remove(tsmFiles) @@ -800,7 +810,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ return nil, err } break - } else if err == errCompactionInProgress { + } else if _, ok := err.(errCompactionInProgress); ok { // Don't clean up the file as another compaction is using it. This should not happen as the // planner keeps track of which files are assigned to compaction plans now. return nil, err @@ -822,7 +832,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ func (c *Compactor) write(path string, iter KeyIterator) (err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { - return errCompactionInProgress + return errCompactionInProgress{err: err} } // Create the write for the new TSM file. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c2e894ac58d..3c152ec089c 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1181,10 +1181,9 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { case <-t.C: s := e.levelCompactionStrategy(fast, level) if s != nil { - // Release the files in the compaction plan - defer e.CompactionPlan.Release(s.compactionGroups) - s.Apply() + // Release the files in the compaction plan + e.CompactionPlan.Release(s.compactionGroups) } } @@ -1203,9 +1202,9 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { case <-t.C: s := e.fullCompactionStrategy() if s != nil { - // Release the files in the compaction plan - defer e.CompactionPlan.Release(s.compactionGroups) s.Apply() + // Release the files in the compaction plan + e.CompactionPlan.Release(s.compactionGroups) } } @@ -1291,10 +1290,11 @@ func (s *compactionStrategy) compactGroup(groupNum int) { }() if err != nil { - if err == errCompactionsDisabled || err == errCompactionInProgress { + _, inProgress := err.(errCompactionInProgress) + if err == errCompactionsDisabled || inProgress { s.logger.Info(fmt.Sprintf("aborted %s compaction group (%d). %v", s.description, groupNum, err)) - if err == errCompactionInProgress { + if _, ok := err.(errCompactionInProgress); ok { time.Sleep(time.Second) } return