From 4e582f297a0d093babf53ea82f8835a69115ab10 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 23 May 2017 12:05:47 -0600 Subject: [PATCH 1/3] Fix race in findGenerations It was possible that the findGenerations could get stuck returning no files even when generations existed on disk. --- tsdb/engine/tsm1/compact.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 2e2dfe25cfd..1afd460b1fe 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -492,10 +492,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup { // findGenerations groups all the TSM files by generation based // on their filename, then returns the generations in descending order (newest first). func (c *DefaultPlanner) findGenerations() tsmGenerations { - c.mu.RLock() + c.mu.Lock() + defer c.mu.Unlock() + last := c.lastFindGenerations lastGen := c.lastGenerations - c.mu.RUnlock() if !last.IsZero() && c.FileStore.LastModified().Equal(last) { return lastGen @@ -525,10 +526,8 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations { sort.Sort(orderedGenerations) } - c.mu.Lock() c.lastFindGenerations = genTime c.lastGenerations = orderedGenerations - c.mu.Unlock() return orderedGenerations } From bd6d0681e942d59e95f600aa8e402ef36cf6dab5 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 23 May 2017 12:08:25 -0600 Subject: [PATCH 2/3] Ensure planned files are released The defer was never executed because the planning happens in a long running goroutine that loops. The plans need to be released immediately after applying them. --- tsdb/engine/tsm1/engine.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c2e894ac58d..0e32362e2ce 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1181,10 +1181,9 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { case <-t.C: s := e.levelCompactionStrategy(fast, level) if s != nil { - // Release the files in the compaction plan - defer e.CompactionPlan.Release(s.compactionGroups) - s.Apply() + // Release the files in the compaction plan + e.CompactionPlan.Release(s.compactionGroups) } } @@ -1203,9 +1202,9 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) { case <-t.C: s := e.fullCompactionStrategy() if s != nil { - // Release the files in the compaction plan - defer e.CompactionPlan.Release(s.compactionGroups) s.Apply() + // Release the files in the compaction plan + e.CompactionPlan.Release(s.compactionGroups) } } From 29e4287fd29e7bfb2d0c1c51322432af701225fb Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 23 May 2017 12:09:36 -0600 Subject: [PATCH 3/3] Preven masking root errors when compactions are in progress The root error when creating a tmp file when writing a snapshot was hidden making it difficult to determine why snapshots were failing. --- tsdb/engine/tsm1/compact.go | 29 ++++++++++++++++++++--------- tsdb/engine/tsm1/engine.go | 5 +++-- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 1afd460b1fe..9ab37139ee6 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -37,13 +37,24 @@ const ( ) var ( - errMaxFileExceeded = fmt.Errorf("max file exceeded") - errSnapshotsDisabled = fmt.Errorf("snapshots disabled") - errCompactionsDisabled = fmt.Errorf("compactions disabled") - errCompactionAborted = fmt.Errorf("compaction aborted") - errCompactionInProgress = fmt.Errorf("compaction in progress") + errMaxFileExceeded = fmt.Errorf("max file exceeded") + errSnapshotsDisabled = fmt.Errorf("snapshots disabled") + errCompactionsDisabled = fmt.Errorf("compactions disabled") + errCompactionAborted = fmt.Errorf("compaction aborted") ) +type errCompactionInProgress struct { + err error +} + +// Error returns the string representation of the error, to satisfy the error interface. +func (e errCompactionInProgress) Error() string { + if e.err != nil { + return fmt.Sprintf("compaction in progress: %s", e.err) + } + return "compaction in progress" +} + // CompactionGroup represents a list of files eligible to be compacted together. type CompactionGroup []string @@ -725,7 +736,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) { } if !c.add(tsmFiles) { - return nil, errCompactionInProgress + return nil, errCompactionInProgress{} } defer c.remove(tsmFiles) @@ -754,7 +765,7 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) { } if !c.add(tsmFiles) { - return nil, errCompactionInProgress + return nil, errCompactionInProgress{} } defer c.remove(tsmFiles) @@ -799,7 +810,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ return nil, err } break - } else if err == errCompactionInProgress { + } else if _, ok := err.(errCompactionInProgress); ok { // Don't clean up the file as another compaction is using it. This should not happen as the // planner keeps track of which files are assigned to compaction plans now. return nil, err @@ -821,7 +832,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([ func (c *Compactor) write(path string, iter KeyIterator) (err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { - return errCompactionInProgress + return errCompactionInProgress{err: err} } // Create the write for the new TSM file. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 0e32362e2ce..3c152ec089c 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1290,10 +1290,11 @@ func (s *compactionStrategy) compactGroup(groupNum int) { }() if err != nil { - if err == errCompactionsDisabled || err == errCompactionInProgress { + _, inProgress := err.(errCompactionInProgress) + if err == errCompactionsDisabled || inProgress { s.logger.Info(fmt.Sprintf("aborted %s compaction group (%d). %v", s.description, groupNum, err)) - if err == errCompactionInProgress { + if _, ok := err.(errCompactionInProgress); ok { time.Sleep(time.Second) } return