From d9a58e2b00af7af8a7d9a995550e16592a4ceaa3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 6 May 2022 13:12:45 -0400 Subject: [PATCH 1/3] loads previously created TSDBs into shipper on startup --- .../stores/indexshipper/uploads/table.go | 18 +++-- pkg/storage/stores/tsdb/head_manager.go | 6 ++ pkg/storage/stores/tsdb/manager.go | 65 +++++++++++++++++++ 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/pkg/storage/stores/indexshipper/uploads/table.go b/pkg/storage/stores/indexshipper/uploads/table.go index f88283581eb9d..e782df18a47e8 100644 --- a/pkg/storage/stores/indexshipper/uploads/table.go +++ b/pkg/storage/stores/indexshipper/uploads/table.go @@ -82,12 +82,20 @@ func (lt *table) ForEach(userID string, callback index.ForEachIndexCallback) err lt.indexSetMtx.RLock() defer lt.indexSetMtx.RUnlock() - idxSet, ok := lt.indexSet[userID] - if !ok { - return nil - } + // TODO(owen-d): refactor? Uploads mgr never has user indices, + // only common (multitenant) ones. + // iterate through both user and common index + for _, uid := range []string{userID, ""} { + idxSet, ok := lt.indexSet[uid] + if !ok { + continue + } - return idxSet.ForEach(callback) + if err := idxSet.ForEach(callback); err != nil { + return err + } + } + return nil } // Upload uploads the index to object storage. diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 936a6f3556e70..2026d59721558 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -185,6 +185,12 @@ func (m *HeadManager) Start() error { m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log) + // Load the shipper with any previously built TSDBs + if err := m.tsdbManager.Start(); err != nil { + return errors.Wrap(err, "failed to start tsdb manager") + } + + // Build any old WALs into TSDBs for the shipper for _, group := range walsByPeriod { if group.period < curPeriod { if err := m.tsdbManager.BuildFromWALs( diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index 1ee6e04ccfc66..96ea491e55625 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -3,8 +3,10 @@ package tsdb import ( "context" "fmt" + "io/ioutil" "math" "path/filepath" + "strconv" "sync" "time" @@ -24,6 +26,7 @@ import ( // TSDBManager wraps the index shipper and writes/manages // TSDB files on disk type TSDBManager interface { + Start() error Index // Builds a new TSDB file from a set of WALs BuildFromWALs(time.Time, []WALIdentifier) error @@ -68,6 +71,68 @@ func NewTSDBManager( } } +func (m *tsdbManager) Start() error { + // load list of multitenant tsdbs + mulitenantDir := managerMultitenantDir(m.dir) + files, err := ioutil.ReadDir(mulitenantDir) + if err != nil { + return err + } + + for _, f := range files { + if !f.IsDir() { + continue + } + + bucket, err := strconv.Atoi(f.Name()) + if err != nil { + level.Warn(m.log).Log( + "msg", "failed to parse bucket in multitenant dir ", + "err", err.Error(), + ) + continue + } + + tsdbs, err := ioutil.ReadDir(filepath.Join(mulitenantDir, f.Name())) + if err != nil { + level.Warn(m.log).Log( + "msg", "failed to open period bucket dir", + "bucket", bucket, + "err", err.Error(), + ) + continue + } + + for _, db := range tsdbs { + id, ok := parseMultitenantTSDBPath(db.Name()) + if !ok { + continue + } + + prefixed := newPrefixedIdentifier(id, filepath.Join(mulitenantDir, f.Name()), "") + loaded, err := NewShippableTSDBFile( + prefixed, + false, + ) + + if err != nil { + level.Warn(m.log).Log( + "msg", "", + "tsdbPath", prefixed.Path(), + "err", err.Error(), + ) + } + + if err := m.shipper.AddIndex(f.Name(), "", loaded); err != nil { + return err + } + } + + } + + return nil +} + func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error) { level.Debug(m.log).Log("msg", "building WALs", "n", len(ids), "ts", t) // get relevant wals From 342529ed42dfc53be3c2ea2f4b880208a85f410f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 6 May 2022 13:23:23 -0400 Subject: [PATCH 2/3] noopTSDBManager impl --- pkg/storage/stores/tsdb/head_manager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go index a0d41c2a351e1..0aca98d562743 100644 --- a/pkg/storage/stores/tsdb/head_manager_test.go +++ b/pkg/storage/stores/tsdb/head_manager_test.go @@ -22,6 +22,7 @@ import ( type noopTSDBManager struct{ NoopIndex } func (noopTSDBManager) BuildFromWALs(_ time.Time, _ []WALIdentifier) error { return nil } +func (noopTSDBManager) Start() error { return nil } func chunkMetasToChunkRefs(user string, fp uint64, xs index.ChunkMetas) (res []ChunkRef) { for _, x := range xs { From c93276544b905148f0424d0cf634430b227ac48e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 6 May 2022 13:46:14 -0400 Subject: [PATCH 3/3] info logs for initial tsdb loading --- pkg/storage/stores/tsdb/head_manager.go | 1 + pkg/storage/stores/tsdb/manager.go | 21 ++++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 2026d59721558..dc841012e3d95 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -182,6 +182,7 @@ func (m *HeadManager) Start() error { if err != nil { return err } + level.Info(m.log).Log("msg", "loaded wals by period", "groups", len(walsByPeriod)) m.activeHeads = newTenantHeads(now, m.shards, m.metrics, m.log) diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index 96ea491e55625..5b9cffc4abe96 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -71,7 +71,22 @@ func NewTSDBManager( } } -func (m *tsdbManager) Start() error { +func (m *tsdbManager) Start() (err error) { + var ( + buckets, indices, loadingErrors int + ) + + defer func() { + level.Info(m.log).Log( + "msg", "loaded leftover local indices", + "err", err, + "successful", err == nil, + "buckets", buckets, + "indices", indices, + "failures", loadingErrors, + ) + }() + // load list of multitenant tsdbs mulitenantDir := managerMultitenantDir(m.dir) files, err := ioutil.ReadDir(mulitenantDir) @@ -92,6 +107,7 @@ func (m *tsdbManager) Start() error { ) continue } + buckets++ tsdbs, err := ioutil.ReadDir(filepath.Join(mulitenantDir, f.Name())) if err != nil { @@ -108,6 +124,7 @@ func (m *tsdbManager) Start() error { if !ok { continue } + indices++ prefixed := newPrefixedIdentifier(id, filepath.Join(mulitenantDir, f.Name()), "") loaded, err := NewShippableTSDBFile( @@ -121,9 +138,11 @@ func (m *tsdbManager) Start() error { "tsdbPath", prefixed.Path(), "err", err.Error(), ) + loadingErrors++ } if err := m.shipper.AddIndex(f.Name(), "", loaded); err != nil { + loadingErrors++ return err } }