Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction planning fixes #8420

Merged
merged 3 commits into from
May 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,24 @@ const (
)

var (
errMaxFileExceeded = fmt.Errorf("max file exceeded")
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
errCompactionsDisabled = fmt.Errorf("compactions disabled")
errCompactionAborted = fmt.Errorf("compaction aborted")
errCompactionInProgress = fmt.Errorf("compaction in progress")
errMaxFileExceeded = fmt.Errorf("max file exceeded")
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
errCompactionsDisabled = fmt.Errorf("compactions disabled")
errCompactionAborted = fmt.Errorf("compaction aborted")
)

type errCompactionInProgress struct {
err error
}

// Error returns the string representation of the error, to satisfy the error interface.
func (e errCompactionInProgress) Error() string {
if e.err != nil {
return fmt.Sprintf("compaction in progress: %s", e.err)
}
return "compaction in progress"
}

// CompactionGroup represents a list of files eligible to be compacted together.
type CompactionGroup []string

Expand Down Expand Up @@ -492,10 +503,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
// findGenerations groups all the TSM files by generation based
// on their filename, then returns the generations in descending order (newest first).
func (c *DefaultPlanner) findGenerations() tsmGenerations {
c.mu.RLock()
c.mu.Lock()
defer c.mu.Unlock()

last := c.lastFindGenerations
lastGen := c.lastGenerations
c.mu.RUnlock()

if !last.IsZero() && c.FileStore.LastModified().Equal(last) {
return lastGen
Expand Down Expand Up @@ -525,10 +537,8 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
sort.Sort(orderedGenerations)
}

c.mu.Lock()
c.lastFindGenerations = genTime
c.lastGenerations = orderedGenerations
c.mu.Unlock()

return orderedGenerations
}
Expand Down Expand Up @@ -726,7 +736,7 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
}

if !c.add(tsmFiles) {
return nil, errCompactionInProgress
return nil, errCompactionInProgress{}
}
defer c.remove(tsmFiles)

Expand Down Expand Up @@ -755,7 +765,7 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
}

if !c.add(tsmFiles) {
return nil, errCompactionInProgress
return nil, errCompactionInProgress{}
}
defer c.remove(tsmFiles)

Expand Down Expand Up @@ -800,7 +810,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
return nil, err
}
break
} else if err == errCompactionInProgress {
} else if _, ok := err.(errCompactionInProgress); ok {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
return nil, err
Expand All @@ -822,7 +832,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
func (c *Compactor) write(path string, iter KeyIterator) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress
return errCompactionInProgress{err: err}
}

// Create the write for the new TSM file.
Expand Down
14 changes: 7 additions & 7 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,10 +1181,9 @@ func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) {
case <-t.C:
s := e.levelCompactionStrategy(fast, level)
if s != nil {
// Release the files in the compaction plan
defer e.CompactionPlan.Release(s.compactionGroups)

s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}

}
Expand All @@ -1203,9 +1202,9 @@ func (e *Engine) compactTSMFull(quit <-chan struct{}) {
case <-t.C:
s := e.fullCompactionStrategy()
if s != nil {
// Release the files in the compaction plan
defer e.CompactionPlan.Release(s.compactionGroups)
s.Apply()
// Release the files in the compaction plan
e.CompactionPlan.Release(s.compactionGroups)
}

}
Expand Down Expand Up @@ -1291,10 +1290,11 @@ func (s *compactionStrategy) compactGroup(groupNum int) {
}()

if err != nil {
if err == errCompactionsDisabled || err == errCompactionInProgress {
_, inProgress := err.(errCompactionInProgress)
if err == errCompactionsDisabled || inProgress {
s.logger.Info(fmt.Sprintf("aborted %s compaction group (%d). %v", s.description, groupNum, err))

if err == errCompactionInProgress {
if _, ok := err.(errCompactionInProgress); ok {
time.Sleep(time.Second)
}
return
Expand Down