Skip to content

Commit

Permalink
copy boltdb-shipper cache changes from PR #6054 to generic index ship…
Browse files Browse the repository at this point in the history
…per (#6316)
  • Loading branch information
sandeepsukhani authored Jun 6, 2022
1 parent de418bf commit 9a2df5a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 21 deletions.
26 changes: 18 additions & 8 deletions pkg/storage/stores/indexshipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ const (
gzipExtension = ".gz"
)

var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")

type IndexSet interface {
Init() error
Init(forQuerying bool) error
Close()
ForEach(ctx context.Context, callback index.ForEachIndexCallback) error
DropAllDBs() error
Expand Down Expand Up @@ -87,7 +89,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
}

// Init downloads all the db files for the table from object storage.
func (t *indexSet) Init() (err error) {
func (t *indexSet) Init(forQuerying bool) (err error) {
// Using background context to avoid cancellation of download when request times out.
// We would anyways need the files for serving next requests.
ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout)
Expand Down Expand Up @@ -142,7 +144,7 @@ func (t *indexSet) Init() (err error) {
level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.index)))

// sync the table to get new files and remove the deleted ones from storage.
err = t.sync(ctx, false)
err = t.sync(ctx, false, forQuerying)
if err != nil {
return
}
Expand Down Expand Up @@ -241,14 +243,14 @@ func (t *indexSet) cleanupDB(fileName string) error {
}

func (t *indexSet) Sync(ctx context.Context) (err error) {
return t.sync(ctx, true)
return t.sync(ctx, true, false)
}

// sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
func (t *indexSet) sync(ctx context.Context, lock bool, bypassListCache bool) (err error) {
level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))

toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock)
toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)
if err != nil {
return err
}
Expand All @@ -260,6 +262,14 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
return err
}

// if we did not bypass list cache and skipped downloading all the new files due to them being removed by compaction,
// it means the cache is not valid anymore since compaction would have happened after last index list cache refresh.
// Let us return error to ask the caller to re-run the sync after the list cache refresh.
if !bypassListCache && len(downloadedFiles) == 0 && len(toDownload) > 0 {
level.Error(t.logger).Log("msg", "we skipped downloading all the new files, possibly removed by compaction", "files", toDownload)
return errIndexListCacheTooStale
}

if lock {
err = t.indexMtx.lock(ctx)
if err != nil {
Expand Down Expand Up @@ -289,11 +299,11 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) {
}

// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListCache bool) (toDownload []storage.IndexFile, toDelete []string, err error) {
// listing tables from store
var files []storage.IndexFile

files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, false)
files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache)
if err != nil {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc)
}, util_log.Logger)
require.NoError(t, err)

require.NoError(t, idxSet.Init())
require.NoError(t, idxSet.Init(false))

return idxSet.(*indexSet), idxSet.Close
}
Expand Down
44 changes: 32 additions & 12 deletions pkg/storage/stores/indexshipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = userIndexSet.Init()
err = userIndexSet.Init(false)
if err != nil {
return nil, err
}
Expand All @@ -122,7 +122,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}

err = commonIndexSet.Init()
err = commonIndexSet.Init(false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,15 +153,7 @@ func (t *table) ForEach(ctx context.Context, userID string, callback index.ForEa
}

if indexSet.Err() != nil {
level.Error(util_log.WithContext(ctx, t.logger)).Log("msg", fmt.Sprintf("index set %s has some problem, cleaning it up", uid), "err", indexSet.Err())
if err := indexSet.DropAllDBs(); err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to cleanup broken index set %s", uid), "err", err)
}

t.indexSetsMtx.Lock()
delete(t.indexSets, userID)
t.indexSetsMtx.Unlock()

t.cleanupBrokenIndexSet(ctx, uid)
return indexSet.Err()
}

Expand Down Expand Up @@ -243,6 +235,15 @@ func (t *table) Sync(ctx context.Context) error {

for userID, indexSet := range t.indexSets {
if err := indexSet.Sync(ctx); err != nil {
if errors.Is(err, errIndexListCacheTooStale) {
level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it and running sync again")
t.storageClient.RefreshIndexListCache(ctx)

err = indexSet.Sync(ctx)
if err == nil {
continue
}
}
return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
}
}
Expand Down Expand Up @@ -297,15 +298,34 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying
}()
}

err := indexSet.Init()
err := indexSet.Init(forQuerying)
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
t.cleanupBrokenIndexSet(ctx, id)
}
}()

return indexSet, nil
}

// cleanupBrokenIndexSet if an indexSet with given id exists and is really broken i.e Err() returns a non-nil error
func (t *table) cleanupBrokenIndexSet(ctx context.Context, id string) {
t.indexSetsMtx.Lock()
defer t.indexSetsMtx.Unlock()

indexSet, ok := t.indexSets[id]
if !ok || indexSet.Err() == nil {
return
}

level.Error(util_log.WithContext(ctx, t.logger)).Log("msg", fmt.Sprintf("index set %s has some problem, cleaning it up", id), "err", indexSet.Err())
if err := indexSet.DropAllDBs(); err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to cleanup broken index set %s", id), "err", err)
}

delete(t.indexSets, id)
}

// EnsureQueryReadiness ensures that we have downloaded the common index as well as user index for the provided userIDs.
// When ensuring query readiness for a table, we will always download common index set because it can include index for one of the provided user ids.
func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error {
Expand Down
27 changes: 27 additions & 0 deletions pkg/storage/stores/indexshipper/downloads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ func TestTable_Sync(t *testing.T) {
_, ok := expectedFilesInDir[fileInfo.Name()]
require.True(t, ok)
}

// let us simulate a compaction to test stale index list cache handling

// first, let us add a new file and refresh the index list cache
oneMoreDB := "one-more-db"
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
table.storageClient.RefreshIndexListCache(context.Background())

// now, without syncing the table, let us compact the index in storage
compactedDBName := "compacted-db"
require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, compactedDBName), []byte(compactedDBName), 0755))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, noUpdatesDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, newDB)))
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, oneMoreDB)))

// let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache
require.NoError(t, table.Sync(context.Background()))

// verify that table has got only compacted db
indexesFound = []string{}
err = table.ForEach(context.Background(), userID, func(idx index.Index) error {
indexesFound = append(indexesFound, idx.Name())
return nil
})
require.NoError(t, err)
sort.Strings(indexesFound)
require.Equal(t, []string{compactedDBName}, indexesFound)
}

func TestLoadTable(t *testing.T) {
Expand Down

0 comments on commit 9a2df5a

Please sign in to comment.