From 1ba1671701ec19a081b89eda7429898161c88239 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 29 Apr 2022 15:07:38 +0530 Subject: [PATCH 1/2] bypass index list cache when doing query time downloading of index --- .../indexshipper/downloads/index_set.go | 2 +- .../indexshipper/downloads/table_manager.go | 2 +- .../downloads/table_manager_test.go | 2 +- .../indexshipper/downloads/table_test.go | 21 ++++++++++++--- .../stores/shipper/compactor/index_set.go | 2 +- pkg/storage/stores/shipper/compactor/table.go | 4 +-- .../stores/shipper/downloads/index_set.go | 16 +++++------ .../shipper/downloads/index_set_test.go | 2 +- pkg/storage/stores/shipper/downloads/table.go | 6 ++--- .../stores/shipper/downloads/table_manager.go | 2 +- .../shipper/downloads/table_manager_test.go | 2 +- .../stores/shipper/downloads/table_test.go | 21 ++++++++++++--- .../stores/shipper/storage/cached_client.go | 21 ++++++++++----- .../shipper/storage/cached_client_test.go | 22 +++++++-------- pkg/storage/stores/shipper/storage/client.go | 27 ++++++++++--------- .../stores/shipper/storage/client_test.go | 3 ++- .../stores/shipper/storage/index_set.go | 13 ++++++--- pkg/storage/stores/shipper/table_client.go | 3 ++- 18 files changed, 109 insertions(+), 62 deletions(-) diff --git a/pkg/storage/stores/indexshipper/downloads/index_set.go b/pkg/storage/stores/indexshipper/downloads/index_set.go index 65363447b067f..d9fa6dcda2fe1 100644 --- a/pkg/storage/stores/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/indexshipper/downloads/index_set.go @@ -284,7 +284,7 @@ func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDow // listing tables from store var files []storage.IndexFile - files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID) + files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, false) if err != nil { return } diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager.go b/pkg/storage/stores/indexshipper/downloads/table_manager.go index 423a71baab1ec..caf1a7b358f7a 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager.go @@ -263,7 +263,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error { } // list the users that have dedicated index files for this table - _, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName) + _, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false) if err != nil { return err } diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go index 9aa20c91f4a37..b62a95ac87a79 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go @@ -322,6 +322,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro return m.tablesInStorage, nil } -func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) { +func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) { return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil } diff --git a/pkg/storage/stores/indexshipper/downloads/table_test.go b/pkg/storage/stores/indexshipper/downloads/table_test.go index 46189de49c135..14fb9874c6397 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_test.go @@ -32,8 +32,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage return storageClientWithFakeObjectsInList{storageClient} } -func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) { - files, userIDs, err := o.Client.ListFiles(ctx, tableName) +func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) { + files, userIDs, err := o.Client.ListFiles(ctx, tableName, true) if err != nil { return nil, nil, err } @@ -46,6 +46,20 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table return files, userIDs, nil } +func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) { + files, err := o.Client.ListUserFiles(ctx, tableName, userID, true) + if err != nil { + return nil, err + } + + files = append(files, storage.IndexFile{ + Name: "fake-object", + ModifiedAt: time.Now(), + }) + + return files, nil +} + func buildTestTable(t *testing.T, path string) (*table, stopFunc) { storageClient := buildTestStorageClient(t, path) cachePath := filepath.Join(path, cacheDirName) @@ -53,7 +67,7 @@ func buildTestTable(t *testing.T, path string) (*table, stopFunc) { table := NewTable(tableName, cachePath, storageClient, func(path string) (index.Index, error) { return openMockIndexFile(t, path), nil }).(*table) - _, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName) + _, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false) require.NoError(t, err) require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex)) @@ -301,6 +315,7 @@ func TestTable_Sync(t *testing.T) { require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, newDB), []byte(newDB), 0755)) // sync the table + table.storageClient.RefreshIndexListCache(context.Background()) require.NoError(t, table.Sync(context.Background())) // check that table got the new index and dropped the deleted index diff --git a/pkg/storage/stores/shipper/compactor/index_set.go b/pkg/storage/stores/shipper/compactor/index_set.go index af1e27de1d5f4..212ce6818c52b 100644 --- a/pkg/storage/stores/shipper/compactor/index_set.go +++ b/pkg/storage/stores/shipper/compactor/index_set.go @@ -94,7 +94,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) { ctx, cancelFunc := context.WithTimeout(is.ctx, userIndexReadinessTimeout) defer cancelFunc() - is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID) + is.sourceObjects, is.err = is.baseIndexSet.ListFiles(is.ctx, is.tableName, is.userID, false) if is.err != nil { return } diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index cac8e89f4aefa..c73b36eb2196e 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -123,7 +123,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s } func (t *table) compact(applyRetention bool) error { - indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name) + indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false) if err != nil { return err } @@ -209,7 +209,7 @@ func (t *table) done() error { continue } - indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID) + indexFiles, err := t.baseUserIndexSet.ListFiles(t.ctx, t.name, userID, false) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index f0f8692d19c80..faa5a4f93400c 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -26,7 +26,7 @@ import ( ) type IndexSet interface { - Init() error + Init(forQuerying bool) error Close() MultiQueries(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error DropAllDBs() error @@ -87,7 +87,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I } // Init downloads all the db files for the table from object storage. -func (t *indexSet) Init() (err error) { +func (t *indexSet) Init(forQuerying bool) (err error) { // Using background context to avoid cancellation of download when request times out. // We would anyways need the files for serving next requests. ctx, cancelFunc := context.WithTimeout(context.Background(), downloadTimeout) @@ -142,7 +142,7 @@ func (t *indexSet) Init() (err error) { level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.dbs))) // sync the table to get new files and remove the deleted ones from storage. - err = t.sync(ctx, false) + err = t.sync(ctx, false, forQuerying) if err != nil { return } @@ -275,11 +275,11 @@ func (t *indexSet) cleanupDB(fileName string) error { } func (t *indexSet) Sync(ctx context.Context) (err error) { - return t.sync(ctx, true) + return t.sync(ctx, true, false) } // sync downloads updated and new files from the storage relevant for the table and removes the deleted ones -func (t *indexSet) sync(ctx context.Context, lock bool) (err error) { +func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err error) { level.Debug(t.logger).Log("msg", "syncing index files") defer func() { @@ -290,7 +290,7 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) { t.metrics.tablesSyncOperationTotal.WithLabelValues(status).Inc() }() - toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock) + toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache) if err != nil { return err } @@ -331,11 +331,11 @@ func (t *indexSet) sync(ctx context.Context, lock bool) (err error) { } // checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache -func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock bool) (toDownload []storage.IndexFile, toDelete []string, err error) { +func (t *indexSet) checkStorageForUpdates(ctx context.Context, lock, bypassListCache bool) (toDownload []storage.IndexFile, toDelete []string, err error) { // listing tables from store var files []storage.IndexFile - files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID) + files, err = t.baseIndexSet.ListFiles(ctx, t.tableName, t.userID, bypassListCache) if err != nil { return } diff --git a/pkg/storage/stores/shipper/downloads/index_set_test.go b/pkg/storage/stores/shipper/downloads/index_set_test.go index b583b731ab789..0a7ddeb6ed2ad 100644 --- a/pkg/storage/stores/shipper/downloads/index_set_test.go +++ b/pkg/storage/stores/shipper/downloads/index_set_test.go @@ -26,7 +26,7 @@ func buildTestIndexSet(t *testing.T, userID, path string) (*indexSet, stopFunc) boltDBIndexClient, util_log.Logger, newMetrics(nil)) require.NoError(t, err) - require.NoError(t, idxSet.Init()) + require.NoError(t, idxSet.Init(false)) return idxSet.(*indexSet), func() { idxSet.Close() diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index ee82877eb1a6c..c75aae5160984 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -114,7 +114,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI return nil, err } - err = userIndexSet.Init() + err = userIndexSet.Init(false) if err != nil { return nil, err } @@ -128,7 +128,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI return nil, err } - err = commonIndexSet.Init() + err = commonIndexSet.Init(false) if err != nil { return nil, err } @@ -309,7 +309,7 @@ func (t *table) getOrCreateIndexSet(ctx context.Context, id string, forQuerying }() } - err := indexSet.Init() + err := indexSet.Init(forQuerying) if err != nil { level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err) } diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index 04d61d2784c28..85574b6fca6fb 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -297,7 +297,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error { } // list the users that have dedicated index files for this table - _, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName) + _, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName, false) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/table_manager_test.go b/pkg/storage/stores/shipper/downloads/table_manager_test.go index bac73d5b8c4cf..8f7003890eb1f 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/shipper/downloads/table_manager_test.go @@ -328,6 +328,6 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro return m.tablesInStorage, nil } -func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) { +func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) { return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil } diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index b91ef62d53aab..355357b6250f1 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -38,8 +38,8 @@ func newStorageClientWithFakeObjectsInList(storageClient storage.Client) storage return storageClientWithFakeObjectsInList{storageClient} } -func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, []string, error) { - files, userIDs, err := o.Client.ListFiles(ctx, tableName) +func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string, _ bool) ([]storage.IndexFile, []string, error) { + files, userIDs, err := o.Client.ListFiles(ctx, tableName, true) if err != nil { return nil, nil, err } @@ -52,6 +52,20 @@ func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, table return files, userIDs, nil } +func (o storageClientWithFakeObjectsInList) ListUserFiles(ctx context.Context, tableName, userID string, _ bool) ([]storage.IndexFile, error) { + files, err := o.Client.ListUserFiles(ctx, tableName, userID, true) + if err != nil { + return nil, err + } + + files = append(files, storage.IndexFile{ + Name: "fake-object", + ModifiedAt: time.Now(), + }) + + return files, nil +} + type stopFunc func() func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, storage.Client) { @@ -72,7 +86,7 @@ func buildTestTable(t *testing.T, path string) (*table, *local.BoltIndexClient, cachePath := filepath.Join(path, cacheDirName) table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)).(*table) - _, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName) + _, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName, false) require.NoError(t, err) require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex)) @@ -397,6 +411,7 @@ func TestTable_Sync(t *testing.T) { testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, newDB), boltdbClient, 20, 10, nil) // sync the table + table.storageClient.RefreshIndexListCache(context.Background()) require.NoError(t, table.Sync(context.Background())) // query and verify table has expected records from new db and the records from deleted db are gone diff --git a/pkg/storage/stores/shipper/storage/cached_client.go b/pkg/storage/stores/shipper/storage/cached_client.go index d497d167fc381..af725a00cb598 100644 --- a/pkg/storage/stores/shipper/storage/cached_client.go +++ b/pkg/storage/stores/shipper/storage/cached_client.go @@ -50,7 +50,7 @@ func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectCl // We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through. // We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since // we are doing read-through cache, and we do not want to serve stale results. -func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) { +func (c *cachedObjectClient) buildCacheOnce(ctx context.Context, forceRefresh bool) { c.buildCacheWg.Add(1) defer c.buildCacheWg.Done() @@ -59,7 +59,7 @@ func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) { select { case c.buildCacheChan <- struct{}{}: c.err = nil - c.err = c.buildCache(ctx) + c.err = c.buildCache(ctx, forceRefresh) <-c.buildCacheChan if c.err != nil { level.Error(util_log.Logger).Log("msg", "failed to build cache", "err", c.err) @@ -68,7 +68,16 @@ func (c *cachedObjectClient) buildCacheOnce(ctx context.Context) { } } -func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { +func (c *cachedObjectClient) RefreshIndexListCache(ctx context.Context) { + c.buildCacheOnce(ctx, true) + c.buildCacheWg.Wait() +} + +func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + if bypassCache { + return c.ObjectClient.List(ctx, prefix, objectDelimiter) + } + prefix = strings.TrimSuffix(prefix, delimiter) ss := strings.Split(prefix, delimiter) if len(ss) > 2 { @@ -76,7 +85,7 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]clie } if time.Since(c.cacheBuiltAt) >= cacheTimeout { - c.buildCacheOnce(ctx) + c.buildCacheOnce(ctx, false) } // wait for cache build operation to finish, if running @@ -120,8 +129,8 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, _ string) ([]clie } // buildCache builds the cache if expired -func (c *cachedObjectClient) buildCache(ctx context.Context) error { - if time.Since(c.cacheBuiltAt) < cacheTimeout { +func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) error { + if !forceRefresh && time.Since(c.cacheBuiltAt) < cacheTimeout { return nil } diff --git a/pkg/storage/stores/shipper/storage/cached_client_test.go b/pkg/storage/stores/shipper/storage/cached_client_test.go index 6a89667160df2..f7835dd0cddf6 100644 --- a/pkg/storage/stores/shipper/storage/cached_client_test.go +++ b/pkg/storage/stores/shipper/storage/cached_client_test.go @@ -65,14 +65,14 @@ func TestCachedObjectClient(t *testing.T) { cachedObjectClient := newCachedObjectClient(objectClient) // list tables - objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), "", "") + objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), "", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{"table1", "table2", "table3"}, commonPrefixes) // list objects in all 3 tables - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table1/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table1/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ @@ -81,7 +81,7 @@ func TestCachedObjectClient(t *testing.T) { }, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ @@ -89,14 +89,14 @@ func TestCachedObjectClient(t *testing.T) { }, objects) require.Equal(t, []client.StorageCommonPrefix{"table2/user1"}, commonPrefixes) - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{"table3/user1"}, commonPrefixes) // list user objects from table2 and table3 - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/user1/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/user1/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ @@ -106,7 +106,7 @@ func TestCachedObjectClient(t *testing.T) { }, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user1/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user1/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ @@ -116,14 +116,14 @@ func TestCachedObjectClient(t *testing.T) { require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) // list non-existent table - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table4/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table4/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) // list non-existent user - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user2/", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user2/", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) @@ -141,7 +141,7 @@ func TestCachedObjectClient_errors(t *testing.T) { cachedObjectClient := newCachedObjectClient(objectClient) // do the initial listing - objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), "", "") + objects, commonPrefixes, err := cachedObjectClient.List(context.Background(), "", "", false) require.NoError(t, err) require.Equal(t, 1, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) @@ -157,7 +157,7 @@ func TestCachedObjectClient_errors(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, _, err := cachedObjectClient.List(context.Background(), "", "") + _, _, err := cachedObjectClient.List(context.Background(), "", "", false) require.Error(t, err) require.Equal(t, 2, objectClient.listCallsCount) }() @@ -172,7 +172,7 @@ func TestCachedObjectClient_errors(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "", "") + objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "", "", false) require.NoError(t, err) require.Equal(t, 3, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) diff --git a/pkg/storage/stores/shipper/storage/client.go b/pkg/storage/stores/shipper/storage/client.go index 2e699550bf842..1663891da43b0 100644 --- a/pkg/storage/stores/shipper/storage/client.go +++ b/pkg/storage/stores/shipper/storage/client.go @@ -8,14 +8,13 @@ import ( "time" "github.com/grafana/loki/pkg/storage/chunk/client" - "github.com/grafana/loki/pkg/storage/chunk/client/local" ) const delimiter = "/" // UserIndexClient allows doing operations on the object store for user specific index. type UserIndexClient interface { - ListUserFiles(ctx context.Context, tableName, userID string) ([]IndexFile, error) + ListUserFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) PutUserFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error DeleteUserFile(ctx context.Context, tableName, userID, fileName string) error @@ -23,7 +22,7 @@ type UserIndexClient interface { // CommonIndexClient allows doing operations on the object store for common index. type CommonIndexClient interface { - ListFiles(ctx context.Context, tableName string) ([]IndexFile, []string, error) + ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]IndexFile, []string, error) GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error DeleteFile(ctx context.Context, tableName, fileName string) error @@ -34,13 +33,14 @@ type Client interface { CommonIndexClient UserIndexClient + RefreshIndexListCache(ctx context.Context) ListTables(ctx context.Context) ([]string, error) IsFileNotFoundErr(err error) bool Stop() } type indexStorageClient struct { - objectClient client.ObjectClient + objectClient *cachedObjectClient } type IndexFile struct { @@ -49,15 +49,16 @@ type IndexFile struct { } func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix string) Client { - objectClient := newPrefixedObjectClient(origObjectClient, storagePrefix) - if _, ok := origObjectClient.(*local.FSObjectClient); !ok { - objectClient = newCachedObjectClient(objectClient) - } + objectClient := newCachedObjectClient(newPrefixedObjectClient(origObjectClient, storagePrefix)) return &indexStorageClient{objectClient: objectClient} } +func (s *indexStorageClient) RefreshIndexListCache(ctx context.Context) { + s.objectClient.RefreshIndexListCache(ctx) +} + func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) { - _, tables, err := s.objectClient.List(ctx, "", delimiter) + _, tables, err := s.objectClient.List(ctx, "", delimiter, false) if err != nil { return nil, err } @@ -70,11 +71,11 @@ func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) { return tableNames, nil } -func (s *indexStorageClient) ListFiles(ctx context.Context, tableName string) ([]IndexFile, []string, error) { +func (s *indexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]IndexFile, []string, error) { // The forward slash here needs to stay because we are trying to list contents of a directory without which // we will get the name of the same directory back with hosted object stores. // This is due to the object stores not having a concept of directories. - objects, users, err := s.objectClient.List(ctx, tableName+delimiter, delimiter) + objects, users, err := s.objectClient.List(ctx, tableName+delimiter, delimiter, bypassCache) if err != nil { return nil, nil, err } @@ -99,11 +100,11 @@ func (s *indexStorageClient) ListFiles(ctx context.Context, tableName string) ([ return files, userIDs, nil } -func (s *indexStorageClient) ListUserFiles(ctx context.Context, tableName, userID string) ([]IndexFile, error) { +func (s *indexStorageClient) ListUserFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) { // The forward slash here needs to stay because we are trying to list contents of a directory without which // we will get the name of the same directory back with hosted object stores. // This is due to the object stores not having a concept of directories. - objects, _, err := s.objectClient.List(ctx, path.Join(tableName, userID)+delimiter, delimiter) + objects, _, err := s.objectClient.List(ctx, path.Join(tableName, userID)+delimiter, delimiter, bypassCache) if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/storage/client_test.go b/pkg/storage/stores/shipper/storage/client_test.go index 13c869b4e6f95..991aebce5c290 100644 --- a/pkg/storage/stores/shipper/storage/client_test.go +++ b/pkg/storage/stores/shipper/storage/client_test.go @@ -36,6 +36,7 @@ func TestIndexStorageClient(t *testing.T) { indexStorageClient := NewIndexStorageClient(objectClient, storageKeyPrefix) verifyFiles := func() { + indexStorageClient.RefreshIndexListCache(context.Background()) tables, err := indexStorageClient.ListTables(context.Background()) require.NoError(t, err) require.Len(t, tables, len(tablesToSetup)) @@ -43,7 +44,7 @@ func TestIndexStorageClient(t *testing.T) { expectedFiles, ok := tablesToSetup[table] require.True(t, ok) - filesInStorage, _, err := indexStorageClient.ListFiles(context.Background(), table) + filesInStorage, _, err := indexStorageClient.ListFiles(context.Background(), table, false) require.NoError(t, err) require.Len(t, filesInStorage, len(expectedFiles)) diff --git a/pkg/storage/stores/shipper/storage/index_set.go b/pkg/storage/stores/shipper/storage/index_set.go index 306fbad9c12a4..2f05f88c1dd2f 100644 --- a/pkg/storage/stores/shipper/storage/index_set.go +++ b/pkg/storage/stores/shipper/storage/index_set.go @@ -13,7 +13,8 @@ var ( // IndexSet provides storage operations for user or common index tables. type IndexSet interface { - ListFiles(ctx context.Context, tableName, userID string) ([]IndexFile, error) + RefreshIndexListCache(ctx context.Context) + ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) GetFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) PutFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error DeleteFile(ctx context.Context, tableName, userID, fileName string) error @@ -44,17 +45,21 @@ func (i indexSet) validateUserID(userID string) error { return nil } -func (i indexSet) ListFiles(ctx context.Context, tableName, userID string) ([]IndexFile, error) { +func (i indexSet) RefreshIndexListCache(ctx context.Context) { + i.client.RefreshIndexListCache(ctx) +} + +func (i indexSet) ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) { err := i.validateUserID(userID) if err != nil { return nil, err } if i.userBasedIndex { - return i.client.ListUserFiles(ctx, tableName, userID) + return i.client.ListUserFiles(ctx, tableName, userID, bypassCache) } - files, _, err := i.client.ListFiles(ctx, tableName) + files, _, err := i.client.ListFiles(ctx, tableName, bypassCache) return files, err } diff --git a/pkg/storage/stores/shipper/table_client.go b/pkg/storage/stores/shipper/table_client.go index cd9d767b7a9b5..5759e9242a581 100644 --- a/pkg/storage/stores/shipper/table_client.go +++ b/pkg/storage/stores/shipper/table_client.go @@ -18,6 +18,7 @@ func NewBoltDBShipperTableClient(objectClient client.ObjectClient, storageKeyPre } func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) { + b.indexStorageClient.RefreshIndexListCache(ctx) return b.indexStorageClient.ListTables(ctx) } @@ -30,7 +31,7 @@ func (b *boltDBShipperTableClient) Stop() { } func (b *boltDBShipperTableClient) DeleteTable(ctx context.Context, tableName string) error { - files, _, err := b.indexStorageClient.ListFiles(ctx, tableName) + files, _, err := b.indexStorageClient.ListFiles(ctx, tableName, true) if err != nil { return err } From f00c35f402e1617c2d0e47060a6b7030386c7069 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 29 Apr 2022 15:08:22 +0530 Subject: [PATCH 2/2] detect and refresh stale index list cache during sync --- .../stores/shipper/downloads/index_set.go | 10 +++++++++ pkg/storage/stores/shipper/downloads/table.go | 9 ++++++++ .../stores/shipper/downloads/table_test.go | 21 +++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index faa5a4f93400c..a6c9347e89ecf 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -25,6 +25,8 @@ import ( "github.com/grafana/loki/pkg/util/spanlogger" ) +var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale") + type IndexSet interface { Init(forQuerying bool) error Close() @@ -302,6 +304,14 @@ func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err er return err } + // if we did not bypass list cache and skipped downloading all the new files due to them being removed by compaction, + // it means the cache is not valid anymore since compaction would have happened after last index list cache refresh. + // Let us return error to ask the caller to re-run the sync after the list cache refresh. + if !bypassListCache && len(downloadedFiles) == 0 && len(toDownload) > 0 { + level.Error(t.logger).Log("msg", "we skipped downloading all the new files, possibly removed by compaction", "files", toDownload) + return errIndexListCacheTooStale + } + if lock { err = t.dbsMtx.lock(ctx) if err != nil { diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index c75aae5160984..8153423475a72 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -254,6 +254,15 @@ func (t *table) Sync(ctx context.Context) error { for userID, indexSet := range t.indexSets { if err := indexSet.Sync(ctx); err != nil { + if errors.Is(err, errIndexListCacheTooStale) { + level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it and running sync again") + t.storageClient.RefreshIndexListCache(ctx) + + err = indexSet.Sync(ctx) + if err == nil { + continue + } + } return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name)) } } diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index 355357b6250f1..cc3430f8e5450 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -431,6 +431,27 @@ func TestTable_Sync(t *testing.T) { _, ok := expectedFilesInDir[fileInfo.Name()] require.True(t, ok) } + + // let us simulate a compaction to test stale index list cache handling + + // first, let us add a new file and refresh the index list cache + oneMoreDB := "one-more-db" + testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, oneMoreDB), boltdbClient, 30, 10, nil) + table.storageClient.RefreshIndexListCache(context.Background()) + + // now, without syncing the table, let us compact the index in storage + compactedDBName := "compacted-db" + testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, compactedDBName), boltdbClient, 10, 30, nil) + require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, noUpdatesDB))) + require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, newDB))) + require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, oneMoreDB))) + + // let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache + require.NoError(t, table.Sync(context.Background())) + // query and verify table has expected records + testutil.TestSingleTableQuery(t, userID, []index.Query{{}}, table, 10, 30) + + require.Len(t, table.indexSets[""].(*indexSet).dbs, 1) } func TestTable_QueryResponse(t *testing.T) {