Skip to content

Commit

Permalink
reduce number of list calls on shared object store when using boltdb-…
Browse files Browse the repository at this point in the history
…shipper (#3283)
  • Loading branch information
sandeepsukhani authored Feb 12, 2021
1 parent 1cd37a3 commit 76e713f
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 5 deletions.
8 changes: 7 additions & 1 deletion pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)

Expand Down Expand Up @@ -66,9 +67,14 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe
return nil, err
}

objectClient = util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix)
if cfg.SharedStoreType != "filesystem" {
objectClient = shipper_util.NewCachedObjectClient(objectClient)
}

compactor := Compactor{
cfg: cfg,
objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix),
objectClient: objectClient,
metrics: newMetrics(r),
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/stores/shipper/shipper_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)

Expand Down Expand Up @@ -101,7 +102,7 @@ func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometh
return &shipper, nil
}

func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error {
func (s *Shipper) init(objectClient chunk.ObjectClient, registerer prometheus.Registerer) error {
// When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead.
// Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path.
boltdbIndexClientDir := s.cfg.ActiveIndexDirectory
Expand All @@ -115,7 +116,11 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
return err
}

prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix)
objectClient = util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)

if s.cfg.SharedStoreType != "filesystem" {
objectClient = shipper_util.NewCachedObjectClient(objectClient)
}

if s.cfg.Mode != ModeReadOnly {
uploader, err := s.getUploaderName()
Expand All @@ -129,7 +134,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.ResyncInterval + 2*time.Minute,
}
uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, objectClient, registerer)
if err != nil {
return err
}
Expand All @@ -144,7 +149,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
CacheTTL: s.cfg.CacheTTL,
QueryReadyNumDays: s.cfg.QueryReadyNumDays,
}
downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, objectClient, registerer)
if err != nil {
return err
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/storage/stores/shipper/util/cached_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package util

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
)

const (
delimiter = "/"
cacheTimeout = time.Minute
)

// CachedObjectClient is meant for reducing number of LIST calls on hosted object stores(S3, GCS, Azure Blob Storage and Swift).
// We as of now do a LIST call per table when we need to find its objects.
// CachedObjectClient does flat listing of objects which is only supported by hosted object stores mentioned above.
// In case of boltdb files stored by shipper, the listed objects would have keys like <table-name>/<filename>.
// For each List call without a prefix(which is actually done to get list of tables),
// CachedObjectClient would build a map of TableName -> chunk.StorageObject which would be used as a cache for subsequent List calls for getting list of objects for tables.
// Cache items are evicted after first read or a timeout. The cache is rebuilt during List call with empty prefix or we encounter a cache miss.
type CachedObjectClient struct {
chunk.ObjectClient
tables map[string][]chunk.StorageObject
tablesMtx sync.Mutex
cacheBuiltAt time.Time
}

func NewCachedObjectClient(downstreamClient chunk.ObjectClient) *CachedObjectClient {
return &CachedObjectClient{
ObjectClient: downstreamClient,
tables: map[string][]chunk.StorageObject{},
}
}

func (c *CachedObjectClient) List(ctx context.Context, prefix, _ string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
c.tablesMtx.Lock()
defer c.tablesMtx.Unlock()

if prefix == "" {
tables, err := c.listTables(ctx)
if err != nil {
return nil, nil, err
}

return []chunk.StorageObject{}, tables, nil
}

// While listing objects in a table, prefix is set to <table-name>+delimiter so trim the delimiter first.
tableName := strings.TrimSuffix(prefix, delimiter)
if strings.Contains(tableName, delimiter) {
return nil, nil, fmt.Errorf("invalid prefix %s for listing table objects", prefix)
}
tableObjects, err := c.listTableObjects(ctx, tableName)
if err != nil {
return nil, nil, err
}

return tableObjects, []chunk.StorageCommonPrefix{}, nil
}

// listTables assumes that tablesMtx is already locked by the caller
func (c *CachedObjectClient) listTables(ctx context.Context) ([]chunk.StorageCommonPrefix, error) {
// do a flat listing by setting delimiter to empty string
objects, _, err := c.ObjectClient.List(ctx, "", "")
if err != nil {
return nil, err
}

// build the cache and response containing just table names as chunk.StorageCommonPrefix
var tableNames []chunk.StorageCommonPrefix
for _, object := range objects {
ss := strings.Split(object.Key, delimiter)
if len(ss) != 2 {
return nil, fmt.Errorf("invalid object key found %s", object.Key)
}

if _, ok := c.tables[ss[0]]; !ok {
tableNames = append(tableNames, chunk.StorageCommonPrefix(ss[0]))
}
c.tables[ss[0]] = append(c.tables[ss[0]], object)
}

c.cacheBuiltAt = time.Now()

return tableNames, nil
}

// listTableObjects assumes that tablesMtx is already locked by the caller
func (c *CachedObjectClient) listTableObjects(ctx context.Context, tableName string) ([]chunk.StorageObject, error) {
objects, ok := c.tables[tableName]
if ok && c.cacheBuiltAt.Add(cacheTimeout).After(time.Now()) {
// evict the element read from cache
delete(c.tables, tableName)
return objects, nil
}

// requested element not found in the cache, rebuild the cache.
_, err := c.listTables(ctx)
if err != nil {
return nil, err
}

objects = c.tables[tableName]
// evict the element read from cache
delete(c.tables, tableName)
return objects, nil
}
80 changes: 80 additions & 0 deletions pkg/storage/stores/shipper/util/cached_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package util

import (
"context"
"testing"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/stretchr/testify/require"
)

type mockHostedObjectClient struct {
chunk.ObjectClient
objects []chunk.StorageObject
}

func (m mockHostedObjectClient) List(_ context.Context, _, _ string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
return m.objects, []chunk.StorageCommonPrefix{}, nil
}

func TestCachedObjectClient_List(t *testing.T) {
objectClient := mockHostedObjectClient{
objects: []chunk.StorageObject{
{
Key: "table1/obj1",
},
{
Key: "table1/obj2",
},
{
Key: "table2/obj1",
},
{
Key: "table2/obj2",
},
},
}

cachedObjectClient := NewCachedObjectClient(objectClient)

// list tables which should build the cache
_, tables, err := cachedObjectClient.List(context.Background(), "", "")
require.NoError(t, err)
require.Equal(t, []chunk.StorageCommonPrefix{"table1", "table2"}, tables)

// verify whether cache has right items
require.Len(t, cachedObjectClient.tables, 2)
require.Equal(t, objectClient.objects[:2], cachedObjectClient.tables["table1"])
require.Equal(t, objectClient.objects[2:], cachedObjectClient.tables["table2"])

// list table1 objects
objects, _, err := cachedObjectClient.List(context.Background(), "table1/", "")
require.NoError(t, err)
require.Equal(t, objectClient.objects[:2], objects)

// verify whether table1 got evicted
require.Len(t, cachedObjectClient.tables, 1)
require.Contains(t, cachedObjectClient.tables, "table2")

// list table2 objects
objects, _, err = cachedObjectClient.List(context.Background(), "table2/", "")
require.NoError(t, err)
require.Equal(t, objectClient.objects[2:], objects)

// verify whether table2 got evicted as well
require.Len(t, cachedObjectClient.tables, 0)

// list table1 again which should rebuild the cache
objects, _, err = cachedObjectClient.List(context.Background(), "table1/", "")
require.NoError(t, err)
require.Equal(t, objectClient.objects[:2], objects)

// verify whether cache was rebuilt and table1 got evicted already
require.Len(t, cachedObjectClient.tables, 1)
require.Contains(t, cachedObjectClient.tables, "table2")

// verify whether listing non-existing table should not error
objects, _, err = cachedObjectClient.List(context.Background(), "table3/", "")
require.NoError(t, err)
require.Len(t, objects, 0)
}

0 comments on commit 76e713f

Please sign in to comment.