Skip to content

Commit

Permalink
compactor for compacting boltdb files uploaded by shipper
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani committed Aug 24, 2020
1 parent 97dfb29 commit 026cd41
Show file tree
Hide file tree
Showing 13 changed files with 850 additions and 45 deletions.
6 changes: 6 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"net/http"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/modules"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -61,6 +63,7 @@ type Config struct {
RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
Tracing tracing.Config `yaml:"tracing"`
CompactorConfig compactor.Config `yaml:"compactor,omitempty"`
}

// RegisterFlags registers flag.
Expand Down Expand Up @@ -135,6 +138,7 @@ type Loki struct {
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
memberlistKV *memberlist.KVInitService
compactor *compactor.Compactor

httpAuthMiddleware middleware.Interface
}
Expand Down Expand Up @@ -305,6 +309,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(All, nil)

// Add dependencies
Expand All @@ -317,6 +322,7 @@ func (t *Loki) setupModuleManager() error {
Querier: {Store, Ring, Server},
QueryFrontend: {Server, Overrides},
TableManager: {Server},
Compactor: {Server},
All: {Querier, Ingester, Distributor, TableManager},
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os"
"time"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/storage"
Expand Down Expand Up @@ -55,6 +57,7 @@ const (
Store string = "store"
TableManager string = "table-manager"
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
All string = "all"
)

Expand Down Expand Up @@ -360,6 +363,16 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
return t.memberlistKV, nil
}

func (t *Loki) initCompactor() (services.Service, error) {
var err error
t.compactor, err = compactor.NewCompactor(t.cfg.CompactorConfig, t.cfg.StorageConfig.Config)
if err != nil {
return nil, err
}

return t.compactor, nil
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
Expand Down
91 changes: 91 additions & 0 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package compactor

import (
"context"
"flag"
"path/filepath"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
pkg_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"

"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/util"
)

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.WorkingDirectory, "working-directory", "", "Directory where files can be downloaded for compaction.")
f.StringVar(&cfg.SharedStoreType, "shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
}

type Compactor struct {
services.Service

cfg Config
objectClient chunk.ObjectClient
}

func NewCompactor(cfg Config, storageConfig storage.Config) (*Compactor, error) {
objectClient, err := storage.NewObjectClient(cfg.SharedStoreType, storageConfig)
if err != nil {
return nil, err
}

err = chunk_util.EnsureDirectory(cfg.WorkingDirectory)
if err != nil {
return nil, err
}

compactor := Compactor{
cfg: cfg,
objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix),
}

compactor.Service = services.NewTimerService(4*time.Hour, nil, compactor.Run, nil)
return &compactor, nil
}

func (c *Compactor) Run(ctx context.Context) error {
_, dirs, err := c.objectClient.List(ctx, "")
if err != nil {
return err
}

tables := make([]string, len(dirs))
for i, dir := range dirs {
tables[i] = strings.TrimSuffix(string(dir), "/")
}

for _, tableName := range tables {
table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient)
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "failed to initialize table for compaction", "err", err)
continue
}

err = table.compact()
if err != nil {
level.Error(pkg_util.Logger).Log("msg", "failed to compact files", "err", err)
}

// check if context was cancelled before going for next table.
select {
case <-ctx.Done():
return nil
default:
}
}

return nil
}
Loading

0 comments on commit 026cd41

Please sign in to comment.