diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index f7856c72af2c1..c331eaae19cb1 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -60,6 +60,26 @@ var ( errZeroLengthConfig = errors.New("must specify at least one schema configuration") ) +// TableRange represents a range of table numbers built based on the configured schema start/end date and the table period. +// Both Start and End are inclusive. +type TableRange struct { + Start, End int64 +} + +// TableRanges represents a list of table ranges for multiple schemas. +type TableRanges []TableRange + +// TableNumberInRange tells whether given table number falls in any of the ranges. +func (t TableRanges) TableNumberInRange(tableNumber int64) bool { + for _, r := range t { + if r.Start <= tableNumber && tableNumber <= r.End { + return true + } + } + + return false +} + // PeriodConfig defines the schema and tables to use for a period of time type PeriodConfig struct { From DayTime `yaml:"from"` // used when working with config @@ -87,6 +107,15 @@ func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error return err } +// GetIndexTableNumberRange returns the table number range calculated based on +// the configured schema start date, index table period and the given schemaEndDate +func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRange { + return TableRange{ + Start: cfg.From.Unix() / int64(cfg.IndexTables.Period/time.Second), + End: schemaEndDate.Unix() / int64(cfg.IndexTables.Period/time.Second), + } +} + // DayTime is a model.Time what holds day-aligned values, and marshals to/from // YAML in YYYY-MM-DD format. type DayTime struct { diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 052b26ebfc2df..b52c26385ba32 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -175,7 +175,10 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi return nil, err } - boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, ownsTenantFn, registerer) + tableRanges := getIndexStoreTableRanges(config.BoltDBShipperType, schemaCfg.Configs) + + boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, + ownsTenantFn, tableRanges, registerer) return boltDBIndexClientWithShipper, err default: diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a1801cd415cb5..9a2540b646a9c 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2,6 +2,7 @@ package storage import ( "context" + "math" "time" "github.com/go-kit/log" @@ -225,7 +226,8 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, return nil, nil, nil, err } - writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, indexClientReg) + writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, + getIndexStoreTableRanges(config.TSDBType, s.schemaCfg.Configs), indexClientReg) if err != nil { return nil, nil, nil, err } @@ -508,3 +510,21 @@ func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error { func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { return errWritingChunkUnsupported } + +func getIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodConfig) config.TableRanges { + var ranges config.TableRanges + for i, periodicConfig := range periodicConfigs { + if periodicConfig.IndexType != indexType { + continue + } + + periodEndTime := config.DayTime{Time: math.MaxInt64} + if i < len(periodicConfigs)-1 { + periodEndTime = config.DayTime{Time: periodicConfigs[i+1].From.Time.Add(-time.Millisecond)} + } + + ranges = append(ranges, periodicConfig.GetIndexTableNumberRange(periodEndTime)) + } + + return ranges +} diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index bddb734151b9d..173176bf34e20 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -3,6 +3,7 @@ package storage import ( "context" "log" + "math" "net/http" _ "net/http/pprof" "path" @@ -1278,3 +1279,94 @@ func Test_GetSeries(t *testing.T) { }) } } + +func TestGetIndexStoreTableRanges(t *testing.T) { + now := model.Now() + schemaConfig := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + From: config.DayTime{Time: now.Add(30 * 24 * time.Hour)}, + IndexType: config.BoltDBShipperType, + ObjectType: "filesystem", + Schema: "v9", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + }, + { + From: config.DayTime{Time: now.Add(20 * 24 * time.Hour)}, + IndexType: config.BoltDBShipperType, + ObjectType: "filesystem", + Schema: "v11", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 2, + }, + { + From: config.DayTime{Time: now.Add(15 * 24 * time.Hour)}, + IndexType: config.TSDBType, + ObjectType: "filesystem", + Schema: "v11", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 2, + }, + { + From: config.DayTime{Time: now.Add(10 * 24 * time.Hour)}, + IndexType: config.StorageTypeBigTable, + ObjectType: "filesystem", + Schema: "v11", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 2, + }, + { + From: config.DayTime{Time: now.Add(5 * 24 * time.Hour)}, + IndexType: config.TSDBType, + ObjectType: "filesystem", + Schema: "v11", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 2, + }, + }, + } + + require.Equal(t, config.TableRanges{ + { + Start: schemaConfig.Configs[0].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + End: schemaConfig.Configs[1].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + }, + { + Start: schemaConfig.Configs[1].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + End: schemaConfig.Configs[2].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + }, + }, getIndexStoreTableRanges(config.BoltDBShipperType, schemaConfig.Configs)) + + require.Equal(t, config.TableRanges{ + { + Start: schemaConfig.Configs[3].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + End: schemaConfig.Configs[4].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + }, + }, getIndexStoreTableRanges(config.StorageTypeBigTable, schemaConfig.Configs)) + + require.Equal(t, config.TableRanges{ + { + Start: schemaConfig.Configs[2].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + End: schemaConfig.Configs[3].From.Add(-time.Millisecond).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + }, + { + Start: schemaConfig.Configs[4].From.Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + End: model.Time(math.MaxInt64).Unix() / int64(schemaConfig.Configs[0].IndexTables.Period/time.Second), + }, + }, getIndexStoreTableRanges(config.TSDBType, schemaConfig.Configs)) +} diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager.go b/pkg/storage/stores/indexshipper/downloads/table_manager.go index c425e26f87a2e..ca95fd5b8a472 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager.go @@ -12,8 +12,10 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage/chunk/client/util" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" util_log "github.com/grafana/loki/pkg/util/log" @@ -22,9 +24,12 @@ import ( const ( cacheCleanupInterval = time.Hour - durationDay = 24 * time.Hour + daySeconds = int64(24 * time.Hour / time.Second) ) +// regexp for finding the trailing index bucket number at the end of table name +var extractTableNumberRegex = regexp.MustCompile(`[0-9]+$`) + type Limits interface { AllByUserID() map[string]*validation.Limits DefaultLimits() *validation.Limits @@ -49,9 +54,10 @@ type Config struct { } type tableManager struct { - cfg Config - openIndexFileFunc index.OpenIndexFileFunc - indexStorageClient storage.Client + cfg Config + openIndexFileFunc index.OpenIndexFileFunc + indexStorageClient storage.Client + tableRangesToHandle config.TableRanges tables map[string]Table tablesMtx sync.RWMutex @@ -65,21 +71,22 @@ type tableManager struct { } func NewTableManager(cfg Config, openIndexFileFunc index.OpenIndexFileFunc, indexStorageClient storage.Client, - ownsTenantFn IndexGatewayOwnsTenant, reg prometheus.Registerer) (TableManager, error) { + ownsTenantFn IndexGatewayOwnsTenant, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) (TableManager, error) { if err := util.EnsureDirectory(cfg.CacheDir); err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) tm := &tableManager{ - cfg: cfg, - openIndexFileFunc: openIndexFileFunc, - indexStorageClient: indexStorageClient, - ownsTenant: ownsTenantFn, - tables: make(map[string]Table), - metrics: newMetrics(reg), - ctx: ctx, - cancel: cancel, + cfg: cfg, + openIndexFileFunc: openIndexFileFunc, + indexStorageClient: indexStorageClient, + tableRangesToHandle: tableRangesToHandle, + ownsTenant: ownsTenantFn, + tables: make(map[string]Table), + metrics: newMetrics(reg), + ctx: ctx, + cancel: cancel, } // load the existing tables first. @@ -271,23 +278,16 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error { return err } - // regexp for finding the trailing index bucket number at the end - re, err := regexp.Compile(`[0-9]+$`) - if err != nil { - return err - } - for _, tableName := range tables { - match := re.Find([]byte(tableName)) - if match == nil { - continue - } - - tableNumber, err := strconv.ParseInt(string(match), 10, 64) + tableNumber, err := extractTableNumberFromName(tableName) if err != nil { return err } + if tableNumber == -1 || !tm.tableRangesToHandle.TableNumberInRange(tableNumber) { + continue + } + // continue if the table is not within query readiness if activeTableNumber-tableNumber > int64(largestQueryReadinessNum) { continue @@ -364,6 +364,14 @@ func (tm *tableManager) loadLocalTables() error { continue } + tableNumber, err := extractTableNumberFromName(fileInfo.Name()) + if err != nil { + return err + } + if tableNumber == -1 || !tm.tableRangesToHandle.TableNumberInRange(tableNumber) { + continue + } + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading local table %s", fileInfo.Name())) table, err := LoadTable(fileInfo.Name(), filepath.Join(tm.cfg.CacheDir, fileInfo.Name()), @@ -378,8 +386,25 @@ func (tm *tableManager) loadLocalTables() error { return nil } +// extractTableNumberFromName extract the table number from a given tableName. +// if the tableName doesn't match the regex, it would return -1 as table number. +func extractTableNumberFromName(tableName string) (int64, error) { + match := extractTableNumberRegex.Find([]byte(tableName)) + if match == nil { + return -1, nil + } + + tableNumber, err := strconv.ParseInt(string(match), 10, 64) + if err != nil { + return -1, err + } + + return tableNumber, nil +} func getActiveTableNumber() int64 { - periodSecs := int64(durationDay / time.Second) + return getTableNumberForTime(model.Now()) +} - return time.Now().Unix() / periodSecs +func getTableNumberForTime(t model.Time) int64 { + return t.Unix() / daySeconds } diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go index 9913703af2150..37eaa148798bb 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go @@ -3,6 +3,7 @@ package downloads import ( "context" "fmt" + "math" "path/filepath" "testing" "time" @@ -10,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/validation" @@ -30,7 +32,7 @@ func buildTestStorageClient(t *testing.T, path string) storage.Client { type stopFunc func() -func buildTestTableManager(t *testing.T, path string) (*tableManager, stopFunc) { +func buildTestTableManager(t *testing.T, path string, tableRangesToHandle config.TableRanges) (*tableManager, stopFunc) { indexStorageClient := buildTestStorageClient(t, path) cachePath := filepath.Join(path, cacheDirName) @@ -40,9 +42,18 @@ func buildTestTableManager(t *testing.T, path string) (*tableManager, stopFunc) CacheTTL: time.Hour, Limits: &mockLimits{}, } + + if tableRangesToHandle == nil { + tableRangesToHandle = config.TableRanges{ + { + Start: 0, + End: math.MaxInt64, + }, + } + } tblManager, err := NewTableManager(cfg, func(s string) (index.Index, error) { return openMockIndexFile(t, s), nil - }, indexStorageClient, nil, nil) + }, indexStorageClient, nil, tableRangesToHandle, nil) require.NoError(t, err) return tblManager.(*tableManager), func() { @@ -62,7 +73,7 @@ func TestTableManager_ForEach(t *testing.T) { } } - tableManager, stopFunc := buildTestTableManager(t, tempDir) + tableManager, stopFunc := buildTestTableManager(t, tempDir, nil) defer stopFunc() for _, tableName := range tables { @@ -81,7 +92,7 @@ func TestTableManager_ForEach(t *testing.T) { func TestTableManager_cleanupCache(t *testing.T) { tempDir := t.TempDir() - tableManager, stopFunc := buildTestTableManager(t, tempDir) + tableManager, stopFunc := buildTestTableManager(t, tempDir, nil) defer stopFunc() // one table that would expire and other one won't @@ -110,7 +121,6 @@ func TestTableManager_cleanupCache(t *testing.T) { } func TestTableManager_ensureQueryReadiness(t *testing.T) { - activeTableNumber := getActiveTableNumber() mockIndexStorageClient := &mockIndexStorageClient{ userIndexesInTables: map[string][]string{}, } @@ -124,12 +134,11 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { cfg: cfg, indexStorageClient: mockIndexStorageClient, tables: make(map[string]Table), - ctx: context.Background(), - cancel: func() {}, - } - - buildTableName := func(idx int) string { - return fmt.Sprintf("table_%d", activeTableNumber-int64(idx)) + tableRangesToHandle: config.TableRanges{{ + Start: 0, End: math.MaxInt64, + }}, + ctx: context.Background(), + cancel: func() {}, } // setup 10 tables with 5 latest tables having user index for user1 and user2 @@ -153,9 +162,11 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { name string queryReadyNumDaysCfg int queryReadinessLimits mockLimits + tableRangesToHandle config.TableRanges expectedQueryReadinessDoneForUsers map[string][]string }{ + // includes whole table range { name: "no query readiness configured", queryReadinessLimits: mockLimits{}, @@ -251,12 +262,68 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { buildTableName(3): {"user2"}, }, }, + // includes limited table range + { + name: "common index: 20 days", + queryReadyNumDaysCfg: 20, + tableRangesToHandle: config.TableRanges{ + { + End: buildTableNumber(0), + Start: buildTableNumber(4), + }, + { + End: buildTableNumber(7), + Start: buildTableNumber(9), + }, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {}, + buildTableName(1): {}, + buildTableName(2): {}, + buildTableName(3): {}, + buildTableName(4): {}, + + buildTableName(7): {}, + buildTableName(8): {}, + buildTableName(9): {}, + }, + }, + { + name: "common index: 5 days, user index default: 2 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysDefault: 2, + }, + queryReadyNumDaysCfg: 5, + tableRangesToHandle: config.TableRanges{ + { + End: buildTableNumber(0), + Start: buildTableNumber(1), + }, + { + End: buildTableNumber(4), + Start: buildTableNumber(5), + }, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1", "user2"}, + buildTableName(1): {"user1", "user2"}, + buildTableName(4): {}, + buildTableName(5): {}, + }, + }, } { t.Run(tc.name, func(t *testing.T) { tc := tc // just to make the linter happy resetTables() tableManager.cfg.QueryReadyNumDays = tc.queryReadyNumDaysCfg tableManager.cfg.Limits = &tc.queryReadinessLimits + if tc.tableRangesToHandle == nil { + tableManager.tableRangesToHandle = config.TableRanges{{ + Start: 0, End: math.MaxInt64, + }} + } else { + tableManager.tableRangesToHandle = tc.tableRangesToHandle + } require.NoError(t, tableManager.ensureQueryReadiness(context.Background())) for name, table := range tableManager.tables { @@ -266,6 +333,67 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { } } +func TestTableManager_loadTables(t *testing.T) { + tempDir := t.TempDir() + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + cachePath := filepath.Join(tempDir, cacheDirName) + + var tables []string + for i := 0; i < 10; i++ { + tables = append(tables, buildTableName(i)) + } + users := []string{"", "user1"} + for _, tableName := range tables { + for _, userID := range users { + setupIndexesAtPath(t, userID, filepath.Join(objectStoragePath, tableName, userID), 1, 5) + setupIndexesAtPath(t, userID, filepath.Join(cachePath, tableName, userID), 1, 5) + } + } + + verifyTables := func(tableManager *tableManager, tables []string) { + for _, tableName := range tables { + for i, userID := range []string{"user1", "common-index-user"} { + expectedIndexes := buildListOfExpectedIndexes("", 1, 5) + if i == 0 { + expectedIndexes = append(expectedIndexes, buildListOfExpectedIndexes(userID, 1, 5)...) + } + verifyIndexForEach(t, expectedIndexes, func(callbackFunc func(index.Index) error) error { + return tableManager.ForEach(context.Background(), tableName, userID, callbackFunc) + }) + } + } + } + + tableManager, stopFunc := buildTestTableManager(t, tempDir, nil) + require.Equal(t, len(tables), len(tableManager.tables)) + verifyTables(tableManager, tables) + + stopFunc() + + tableManager, stopFunc = buildTestTableManager(t, tempDir, config.TableRanges{ + { + End: buildTableNumber(0), + Start: buildTableNumber(1), + }, + { + End: buildTableNumber(5), + Start: buildTableNumber(8), + }, + }) + defer stopFunc() + require.Equal(t, 6, len(tableManager.tables)) + + tables = []string{ + buildTableName(0), + buildTableName(1), + buildTableName(5), + buildTableName(6), + buildTableName(7), + buildTableName(8), + } + verifyTables(tableManager, tables) +} + type mockLimits struct { queryReadyIndexNumDaysDefault int queryReadyIndexNumDaysByUser map[string]int @@ -325,3 +453,11 @@ func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, erro func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]storage.IndexFile, []string, error) { return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil } + +func buildTableNumber(idx int) int64 { + return getActiveTableNumber() - int64(idx) +} + +func buildTableName(idx int) string { + return fmt.Sprintf("table_%d", buildTableNumber(idx)) +} diff --git a/pkg/storage/stores/indexshipper/shipper.go b/pkg/storage/stores/indexshipper/shipper.go index 36dea7cebff73..5c154360de0d3 100644 --- a/pkg/storage/stores/indexshipper/shipper.go +++ b/pkg/storage/stores/indexshipper/shipper.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads" "github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient" "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" @@ -99,8 +100,12 @@ type indexShipper struct { // NewIndexShipper creates a shipper for providing index store functionality using index files and object storage. // It manages the whole life cycle of uploading the index and downloading the index at query time. +// +// Since IndexShipper is generic, which means it can be used to manage various index types under the same object storage and/or local disk path, +// it accepts ranges of table numbers(config.TableRanges) to be managed by the shipper. +// This is mostly useful on the read path to sync and manage specific index tables within the given table number ranges. func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, - ownsTenantFn downloads.IndexGatewayOwnsTenant, open index.OpenIndexFileFunc, reg prometheus.Registerer) (IndexShipper, error) { + ownsTenantFn downloads.IndexGatewayOwnsTenant, open index.OpenIndexFileFunc, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) (IndexShipper, error) { switch cfg.Mode { case ModeReadOnly, ModeWriteOnly, ModeReadWrite: default: @@ -111,7 +116,7 @@ func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downl openIndexFileFunc: open, } - err := shipper.init(storageClient, limits, ownsTenantFn, reg) + err := shipper.init(storageClient, limits, ownsTenantFn, tableRangesToHandle, reg) if err != nil { return nil, err } @@ -122,7 +127,7 @@ func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downl } func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.Limits, - ownsTenantFn downloads.IndexGatewayOwnsTenant, reg prometheus.Registerer) error { + ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) error { indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix) if s.cfg.Mode != ModeReadOnly { @@ -146,7 +151,7 @@ func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads. QueryReadyNumDays: s.cfg.QueryReadyNumDays, Limits: limits, } - downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, ownsTenantFn, reg) + downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, ownsTenantFn, tableRangesToHandle, reg) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index c5885b16f14a5..c2b62b278f5ad 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/chunk/client/util" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads" series_index "github.com/grafana/loki/pkg/storage/stores/series/index" @@ -63,13 +64,14 @@ type indexClient struct { } // NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (series_index.Client, error) { +func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, + ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRanges config.TableRanges, registerer prometheus.Registerer) (series_index.Client, error) { i := indexClient{ cfg: cfg, metrics: newMetrics(registerer), } - err := i.init(storageClient, limits, ownsTenantFn, registerer) + err := i.init(storageClient, limits, ownsTenantFn, tableRanges, registerer) if err != nil { return nil, err } @@ -79,10 +81,11 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads. return &i, nil } -func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) error { +func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits, + ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRanges config.TableRanges, registerer prometheus.Registerer) error { var err error i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, ownsTenantFn, - indexfile.OpenIndexFile, prometheus.WrapRegistererWithPrefix("loki_boltdb_shipper_", registerer)) + indexfile.OpenIndexFile, tableRanges, prometheus.WrapRegistererWithPrefix("loki_boltdb_shipper_", registerer)) if err != nil { return err } diff --git a/pkg/storage/stores/tsdb/store.go b/pkg/storage/stores/tsdb/store.go index a353f9863cd63..a219d104006de 100644 --- a/pkg/storage/stores/tsdb/store.go +++ b/pkg/storage/stores/tsdb/store.go @@ -32,10 +32,10 @@ type store struct { // fetcher.Fetcher instances could be different due to periodic configs having different types of object storage configured // for storing chunks. func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fetcher.Fetcher, - objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) (stores.ChunkWriter, series.IndexStore, error) { + objectClient client.ObjectClient, limits downloads.Limits, tableRanges config.TableRanges, reg prometheus.Registerer) (stores.ChunkWriter, series.IndexStore, error) { if storeInstance == nil { storeInstance = &store{} - err := storeInstance.init(indexShipperCfg, p, objectClient, limits, reg) + err := storeInstance.init(indexShipperCfg, p, objectClient, limits, tableRanges, reg) if err != nil { return nil, nil, err } @@ -45,7 +45,7 @@ func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fet } func (s *store) init(indexShipperCfg indexshipper.Config, p config.PeriodConfig, - objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) error { + objectClient client.ObjectClient, limits downloads.Limits, tableRanges config.TableRanges, reg prometheus.Registerer) error { shpr, err := indexshipper.NewIndexShipper( indexShipperCfg, @@ -53,6 +53,7 @@ func (s *store) init(indexShipperCfg indexshipper.Config, p config.PeriodConfig, limits, nil, OpenShippableTSDB, + tableRanges, prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg), ) if err != nil {