Skip to content

Commit

Permalink
make index shipper read path handle range of tables by type of index (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Jun 6, 2022
1 parent 9a2df5a commit 82da01a
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 51 deletions.
29 changes: 29 additions & 0 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ var (
errZeroLengthConfig = errors.New("must specify at least one schema configuration")
)

// TableRange is an inclusive [Start, End] span of table numbers, built from the
// configured schema start/end date and the table period.
type TableRange struct {
	Start int64
	End   int64
}

// TableRanges is a collection of table ranges covering multiple schemas.
type TableRanges []TableRange

// TableNumberInRange reports whether tableNumber lies within at least one of
// the ranges, treating both bounds of every range as inclusive.
func (t TableRanges) TableNumberInRange(tableNumber int64) bool {
	for i := range t {
		// Skip ranges that end before or start after the requested table.
		if tableNumber < t[i].Start || tableNumber > t[i].End {
			continue
		}
		return true
	}

	return false
}

// PeriodConfig defines the schema and tables to use for a period of time
type PeriodConfig struct {
From DayTime `yaml:"from"` // used when working with config
Expand Down Expand Up @@ -87,6 +107,15 @@ func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return err
}

// GetIndexTableNumberRange returns the inclusive range of index table numbers
// for this period. Start is computed from the configured schema start date
// (cfg.From) and End from the given schemaEndDate, each divided by the index
// table period expressed in whole seconds.
//
// If the index table period truncates to zero whole seconds (for example when
// no period is configured), there is no table number to compute, so the zero
// TableRange is returned instead of panicking on an integer division by zero.
// Note that the zero TableRange still spans table number 0.
func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRange {
	periodSecs := int64(cfg.IndexTables.Period / time.Second)
	if periodSecs == 0 {
		return TableRange{}
	}

	return TableRange{
		Start: cfg.From.Unix() / periodSecs,
		End:   schemaEndDate.Unix() / periodSecs,
	}
}

// DayTime is a model.Time that holds day-aligned values, and marshals to/from
// YAML in YYYY-MM-DD format.
type DayTime struct {
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,10 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi
return nil, err
}

boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, ownsTenantFn, registerer)
tableRanges := getIndexStoreTableRanges(config.BoltDBShipperType, schemaCfg.Configs)

boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits,
ownsTenantFn, tableRanges, registerer)

return boltDBIndexClientWithShipper, err
default:
Expand Down
22 changes: 21 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"context"
"math"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -225,7 +226,8 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
return nil, nil, nil, err
}

writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, indexClientReg)
writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits,
getIndexStoreTableRanges(config.TSDBType, s.schemaCfg.Configs), indexClientReg)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -508,3 +510,21 @@ func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error {
// PutOne ignores all of its arguments and always returns errWritingChunkUnsupported.
func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return errWritingChunkUnsupported
}

// getIndexStoreTableRanges collects the index table number range of every
// period config whose IndexType equals indexType, in config order.
//
// A period ends one millisecond before the next period config starts; the last
// period config has no successor and is treated as running until math.MaxInt64.
func getIndexStoreTableRanges(indexType string, periodicConfigs []config.PeriodConfig) config.TableRanges {
	var tableRanges config.TableRanges

	lastIdx := len(periodicConfigs) - 1
	for idx, periodCfg := range periodicConfigs {
		if periodCfg.IndexType == indexType {
			// Open-ended unless another period config follows this one.
			endTime := config.DayTime{Time: math.MaxInt64}
			if idx < lastIdx {
				nextStart := periodicConfigs[idx+1].From.Time
				endTime = config.DayTime{Time: nextStart.Add(-time.Millisecond)}
			}

			tableRanges = append(tableRanges, periodCfg.GetIndexTableNumberRange(endTime))
		}
	}

	return tableRanges
}
92 changes: 92 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"log"
"math"
"net/http"
_ "net/http/pprof"
"path"
Expand Down Expand Up @@ -1278,3 +1279,94 @@ func Test_GetSeries(t *testing.T) {
})
}
}

// TestGetIndexStoreTableRanges verifies that getIndexStoreTableRanges returns,
// per index type, one table range for each matching period config: every range
// ends one millisecond before the following period config starts, and the
// range of the last period config extends to math.MaxInt64.
func TestGetIndexStoreTableRanges(t *testing.T) {
	const day = 24 * time.Hour

	now := model.Now()
	dailyTables := config.PeriodicTableConfig{
		Prefix: "index_",
		Period: day,
	}

	periodConfigs := []config.PeriodConfig{
		{
			From:        config.DayTime{Time: now.Add(30 * day)},
			IndexType:   config.BoltDBShipperType,
			ObjectType:  "filesystem",
			Schema:      "v9",
			IndexTables: dailyTables,
		},
		{
			From:        config.DayTime{Time: now.Add(20 * day)},
			IndexType:   config.BoltDBShipperType,
			ObjectType:  "filesystem",
			Schema:      "v11",
			IndexTables: dailyTables,
			RowShards:   2,
		},
		{
			From:        config.DayTime{Time: now.Add(15 * day)},
			IndexType:   config.TSDBType,
			ObjectType:  "filesystem",
			Schema:      "v11",
			IndexTables: dailyTables,
			RowShards:   2,
		},
		{
			From:        config.DayTime{Time: now.Add(10 * day)},
			IndexType:   config.StorageTypeBigTable,
			ObjectType:  "filesystem",
			Schema:      "v11",
			IndexTables: dailyTables,
			RowShards:   2,
		},
		{
			From:        config.DayTime{Time: now.Add(5 * day)},
			IndexType:   config.TSDBType,
			ObjectType:  "filesystem",
			Schema:      "v11",
			IndexTables: dailyTables,
			RowShards:   2,
		},
	}

	// All period configs share the same table period, so one divisor serves every expectation.
	periodSecs := int64(dailyTables.Period / time.Second)

	// firstTable is the table number in which the period config at cfgIdx starts.
	firstTable := func(cfgIdx int) int64 {
		return periodConfigs[cfgIdx].From.Unix() / periodSecs
	}
	// lastTableBefore is the table number holding the final millisecond before
	// the period config at nextCfgIdx starts.
	lastTableBefore := func(nextCfgIdx int) int64 {
		return periodConfigs[nextCfgIdx].From.Add(-time.Millisecond).Unix() / periodSecs
	}
	openEndedTable := model.Time(math.MaxInt64).Unix() / periodSecs

	for _, tc := range []struct {
		indexType string
		expected  config.TableRanges
	}{
		{
			indexType: config.BoltDBShipperType,
			expected: config.TableRanges{
				{Start: firstTable(0), End: lastTableBefore(1)},
				{Start: firstTable(1), End: lastTableBefore(2)},
			},
		},
		{
			indexType: config.StorageTypeBigTable,
			expected: config.TableRanges{
				{Start: firstTable(3), End: lastTableBefore(4)},
			},
		},
		{
			indexType: config.TSDBType,
			expected: config.TableRanges{
				{Start: firstTable(2), End: lastTableBefore(3)},
				{Start: firstTable(4), End: openEndedTable},
			},
		},
	} {
		require.Equal(t, tc.expected, getIndexStoreTableRanges(tc.indexType, periodConfigs))
	}
}
79 changes: 52 additions & 27 deletions pkg/storage/stores/indexshipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
Expand All @@ -22,9 +24,12 @@ import (

const (
cacheCleanupInterval = time.Hour
durationDay = 24 * time.Hour
daySeconds = int64(24 * time.Hour / time.Second)
)

// regexp for finding the trailing index bucket number at the end of table name
var extractTableNumberRegex = regexp.MustCompile(`[0-9]+$`)

type Limits interface {
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
Expand All @@ -49,9 +54,10 @@ type Config struct {
}

type tableManager struct {
cfg Config
openIndexFileFunc index.OpenIndexFileFunc
indexStorageClient storage.Client
cfg Config
openIndexFileFunc index.OpenIndexFileFunc
indexStorageClient storage.Client
tableRangesToHandle config.TableRanges

tables map[string]Table
tablesMtx sync.RWMutex
Expand All @@ -65,21 +71,22 @@ type tableManager struct {
}

func NewTableManager(cfg Config, openIndexFileFunc index.OpenIndexFileFunc, indexStorageClient storage.Client,
ownsTenantFn IndexGatewayOwnsTenant, reg prometheus.Registerer) (TableManager, error) {
ownsTenantFn IndexGatewayOwnsTenant, tableRangesToHandle config.TableRanges, reg prometheus.Registerer) (TableManager, error) {
if err := util.EnsureDirectory(cfg.CacheDir); err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
tm := &tableManager{
cfg: cfg,
openIndexFileFunc: openIndexFileFunc,
indexStorageClient: indexStorageClient,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(reg),
ctx: ctx,
cancel: cancel,
cfg: cfg,
openIndexFileFunc: openIndexFileFunc,
indexStorageClient: indexStorageClient,
tableRangesToHandle: tableRangesToHandle,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(reg),
ctx: ctx,
cancel: cancel,
}

// load the existing tables first.
Expand Down Expand Up @@ -271,23 +278,16 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
return err
}

// regexp for finding the trailing index bucket number at the end
re, err := regexp.Compile(`[0-9]+$`)
if err != nil {
return err
}

for _, tableName := range tables {
match := re.Find([]byte(tableName))
if match == nil {
continue
}

tableNumber, err := strconv.ParseInt(string(match), 10, 64)
tableNumber, err := extractTableNumberFromName(tableName)
if err != nil {
return err
}

if tableNumber == -1 || !tm.tableRangesToHandle.TableNumberInRange(tableNumber) {
continue
}

// continue if the table is not within query readiness
if activeTableNumber-tableNumber > int64(largestQueryReadinessNum) {
continue
Expand Down Expand Up @@ -364,6 +364,14 @@ func (tm *tableManager) loadLocalTables() error {
continue
}

tableNumber, err := extractTableNumberFromName(fileInfo.Name())
if err != nil {
return err
}
if tableNumber == -1 || !tm.tableRangesToHandle.TableNumberInRange(tableNumber) {
continue
}

level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading local table %s", fileInfo.Name()))

table, err := LoadTable(fileInfo.Name(), filepath.Join(tm.cfg.CacheDir, fileInfo.Name()),
Expand All @@ -378,8 +386,25 @@ func (tm *tableManager) loadLocalTables() error {
return nil
}

// extractTableNumberFromName parses the trailing run of digits in tableName as
// the table number. A name that does not end in digits yields -1 with a nil
// error; if the digits cannot be parsed as an int64, -1 is returned together
// with the parse error.
func extractTableNumberFromName(tableName string) (int64, error) {
	// The pattern needs at least one digit, so an empty result means "no match".
	digits := extractTableNumberRegex.FindString(tableName)
	if digits == "" {
		return -1, nil
	}

	number, err := strconv.ParseInt(digits, 10, 64)
	if err != nil {
		return -1, err
	}

	return number, nil
}
func getActiveTableNumber() int64 {
periodSecs := int64(durationDay / time.Second)
return getTableNumberForTime(model.Now())
}

return time.Now().Unix() / periodSecs
// getTableNumberForTime returns the table number for t, computed as t.Unix()
// divided (integer division) by daySeconds, the number of seconds in 24 hours.
func getTableNumberForTime(t model.Time) int64 {
return t.Unix() / daySeconds
}
Loading

0 comments on commit 82da01a

Please sign in to comment.