Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSDB shipper + WAL #6049

Merged
merged 89 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
7231181
begins speccing out TSDB Head
owen-d Apr 8, 2022
5e20970
auto incrementing series ref + mempostings
owen-d Apr 8, 2022
a04a2fd
mintime/maxtime methods
owen-d Apr 8, 2022
0b67b76
tsdb head IndexReader impl
owen-d Apr 9, 2022
9019d10
head correctly populates ref lookup
owen-d Apr 11, 2022
35402a0
tsdb head tests
owen-d Apr 11, 2022
e69729e
adds prometheus license to tsdb head
owen-d Apr 11, 2022
8de2ee6
Merge remote-tracking branch 'upstream/main' into tsdb/head
owen-d Apr 11, 2022
968e617
linting
owen-d Apr 11, 2022
e051eee
[WIP] speccing out tsdb head wal
owen-d Apr 12, 2022
7f207c7
fix length check and adds tsdb wal encoding tests
owen-d Apr 12, 2022
3da9db8
exposes wal structs & removes closed semantics
owen-d Apr 12, 2022
734a106
logs start time in the tsdb wal
owen-d Apr 12, 2022
dbb3067
wal interface + testing
owen-d Apr 12, 2022
cab1020
exports walrecord + returns ref when appending
owen-d Apr 13, 2022
60bfe0b
specs out head manager
owen-d Apr 13, 2022
5a07e53
tsdb head manager wal initialization
owen-d Apr 14, 2022
d83b86e
tsdb wal rotation
owen-d Apr 14, 2022
c629a2b
wals dont use node name, but tsdb files do
owen-d Apr 14, 2022
c115627
cleans up fn signature
owen-d Apr 14, 2022
4eaa1f3
multi tsdb idx now just wraps Index interfaces
owen-d Apr 15, 2022
9dc7234
no longer sorts indices when creating multi-idx
owen-d Apr 15, 2022
bf4b125
tenantHeads & HeadManger index impls
owen-d Apr 15, 2022
5d9e66c
head mgr tests
owen-d Apr 15, 2022
3ccd7ee
bugfixes & head manager tests
owen-d Apr 15, 2022
cd39afb
tsdb dir selection now helper fns
owen-d Apr 15, 2022
b556d90
period utility
owen-d Apr 15, 2022
ce6a27c
pulls out more code to helpers, fixes some var races
owen-d Apr 15, 2022
2fd813e
head recovery is more generic
owen-d Apr 15, 2022
4750079
tsdb manager builds from wals
owen-d Apr 15, 2022
c5eda36
pulls more helpers out of headmanager
owen-d Apr 16, 2022
c6b0747
lockedIdx, Close() on idx, tsdbManager update
owen-d Apr 16, 2022
f6859d1
Merge remote-tracking branch 'upstream/main' into tsdb/head-wal
owen-d Apr 18, 2022
3ece142
removes mmap from index reader implementation
owen-d Apr 18, 2022
8b4c2b4
tsdb file
owen-d Apr 18, 2022
ee6eda9
adds tsdb shipper config and refactors initStore
owen-d Apr 18, 2022
b844d85
removes unused tsdbManager code
owen-d Apr 18, 2022
0be6483
Merge remote-tracking branch 'upstream/main' into tsdb/head-wal
owen-d Apr 19, 2022
eb31b32
implements stores.Index and stores.ChunkWriter for tsdb
owen-d Apr 19, 2022
32218b9
chunk.Data now supports an Entries() method
owen-d Apr 19, 2022
7d20b5e
moves walreader to new util/wal pkg to avoid circular dep + tsdb stor…
owen-d Apr 19, 2022
f6f557e
tsdb store
owen-d Apr 19, 2022
3fbf316
passes indexWriter to chunkWriter
owen-d Apr 21, 2022
5ad1cf5
build a tsdb per index bucket in according with shipper conventions
owen-d Apr 21, 2022
52c4ea3
dont open tsdb files until necessary for indexshipper
owen-d Apr 21, 2022
e28b941
tsdbManager Index impl
owen-d Apr 21, 2022
15667a7
tsdb defaults + initStore fix for invalid looping
owen-d Apr 22, 2022
4775798
fixes UsingTSDB helper
owen-d Apr 22, 2022
7a51eda
disables deleteRequestStore when using TSDB
owen-d Apr 22, 2022
5fa3b4a
pass limits to tsdb store
owen-d Apr 22, 2022
c88d742
always start headmanager for tsdb
owen-d Apr 25, 2022
ed60710
fixes copy bug
owen-d Apr 25, 2022
cacbea8
more logging
owen-d Apr 25, 2022
afecd21
fixes duplicate tenant label bug
owen-d Apr 25, 2022
1f84ce0
debug logs, uses label builder, removes __name__=logs for tsdb
owen-d Apr 26, 2022
3b734cf
tsdb fixes labels at earlier pt
owen-d Apr 26, 2022
b88df5c
account for setting tenant label in head manager test
owen-d Apr 27, 2022
cdb9cfd
changing tsdb dir names
owen-d Apr 27, 2022
2c2dbb7
identifier interface, builder to tsdb pkg
owen-d Apr 27, 2022
9024210
tsdb version path prefix
owen-d Apr 27, 2022
40cc48c
fixes buildfromwals identifier
owen-d Apr 27, 2022
8d2d033
fixes tsdb shipper paths
owen-d Apr 28, 2022
08500ed
split buckets once per user set
owen-d Apr 28, 2022
f84af6a
refactors combining single and multi tenant tsdb indices on shipper r…
owen-d Apr 28, 2022
aa46336
indexshipper ignores old gzip logic
owen-d Apr 28, 2022
f388e24
method name refactor
owen-d Apr 28, 2022
4690dff
remove unused record type
owen-d May 3, 2022
a31c203
removes v1 prefix in tsdb paths and refactores indices method
owen-d May 3, 2022
8e6956f
ignores double optimization in tsdb looking for multitenant idx, ship…
owen-d May 3, 2022
e928519
removes 5-ln requirement on shipper tablename regexp
owen-d May 3, 2022
bc0fe93
groups identifiers, begins removing multitenant prefix in shipped files
owen-d May 3, 2022
9fc3385
passses open fn to indexshipper
owen-d May 3, 2022
3ce9110
exposes RealByteSlice
owen-d May 3, 2022
725abf8
TSDBFile no longer needs a file descriptor, parses gzip extensions
owen-d May 3, 2022
8da79b8
method signature fixing
owen-d May 3, 2022
bce8c52
stop masquerading as compressed indices post-download in indexshipper
owen-d May 3, 2022
e229160
variable bucket regexp
owen-d May 3, 2022
0b2125e
removes accidental configs committed
owen-d May 4, 2022
d1ff6f7
Merge remote-tracking branch 'upstream/main' into tsdb/head-wal
owen-d May 4, 2022
cc34584
label matcher handling for multitenancy and metricname in tsdb
owen-d May 4, 2022
a850e42
explicitly require fingerprint when creating tsdb index
owen-d May 4, 2022
14dcb1d
only add tenant label when creating multitenant tsdb
owen-d May 4, 2022
a853035
linting + unused removal
owen-d May 4, 2022
4871228
more linting :(
owen-d May 4, 2022
0a0aed8
goimports
owen-d May 4, 2022
74ccc09
removes uploadername from indexshipper
owen-d May 4, 2022
65aac18
maxuint32 for arm32 builds
owen-d May 4, 2022
eafaf1a
tsdb chunk filterer support
owen-d May 4, 2022
c6e7cf6
always set ingester name when using object storage index
sandeepsukhani May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/chunkenc/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func (f Facade) Utilization() float64 {
return f.c.Utilization()
}

// Size implements encoding.Chunk.
// Size implements encoding.Chunk, which unfortunately uses
// the Size method to refer to the byte size and not the entry count
// like chunkenc.Chunk does.
func (f Facade) Size() int {
if f.c == nil {
return 0
Expand All @@ -82,6 +84,13 @@ func (f Facade) Size() int {
return f.c.CompressedSize()
}

func (f Facade) Entries() int {
if f.c == nil {
return 0
}
return f.c.Size()
}

// LokiChunk returns the chunkenc.Chunk.
func (f Facade) LokiChunk() Chunk {
return f.c
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/pkg/util"
errUtil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/wal"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -420,7 +421,7 @@ func (i *Ingester) starting(ctx context.Context) error {
)

level.Info(util_log.Logger).Log("msg", "recovering from WAL")
segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
segmentReader, segmentCloser, err := wal.NewWalReader(i.cfg.WAL.Dir, -1)
if err != nil {
return err
}
Expand Down
34 changes: 0 additions & 34 deletions pkg/ingester/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,6 @@ func (NoopWALReader) Err() error { return nil }
func (NoopWALReader) Record() []byte { return nil }
func (NoopWALReader) Close() error { return nil }

// If startSegment is <0, it means all the segments.
func newWalReader(dir string, startSegment int) (*wal.Reader, io.Closer, error) {
var (
segmentReader io.ReadCloser
err error
)
if startSegment < 0 {
segmentReader, err = wal.NewSegmentsReader(dir)
if err != nil {
return nil, nil, err
}
} else {
first, last, err := wal.Segments(dir)
if err != nil {
return nil, nil, err
}
if startSegment > last {
return nil, nil, errors.New("start segment is beyond the last WAL segment")
}
if first > startSegment {
startSegment = first
}
segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{
Dir: dir,
First: startSegment,
Last: -1, // Till the end.
})
if err != nil {
return nil, nil, err
}
}
return wal.NewReader(segmentReader), segmentReader, nil
}

func newCheckpointReader(dir string) (WALReader, io.Closer, error) {
lastCheckpointDir, idx, err := lastCheckpoint(dir)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
betterBoltdbShipperDefaults(r, &defaults)
}

if len(r.SchemaConfig.Configs) > 0 && config.UsingTSDB(r.SchemaConfig.Configs) {
betterTSDBShipperDefaults(r, &defaults)
}

applyFIFOCacheConfig(r)
applyIngesterFinalSleep(r)
applyIngesterReplicationFactor(r)
Expand Down Expand Up @@ -497,6 +501,31 @@ func betterBoltdbShipperDefaults(cfg, defaults *ConfigWrapper) {
}
}

func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper) {
currentSchemaIdx := config.ActivePeriodConfig(cfg.SchemaConfig.Configs)
currentSchema := cfg.SchemaConfig.Configs[currentSchemaIdx]

if cfg.StorageConfig.TSDBShipperConfig.SharedStoreType == defaults.StorageConfig.TSDBShipperConfig.SharedStoreType {
cfg.StorageConfig.TSDBShipperConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.CompactorConfig.SharedStoreType == defaults.CompactorConfig.SharedStoreType {
cfg.CompactorConfig.SharedStoreType = currentSchema.ObjectType
}

if cfg.Common.PathPrefix != "" {
prefix := strings.TrimSuffix(cfg.Common.PathPrefix, "/")

if cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory == "" {
cfg.StorageConfig.TSDBShipperConfig.ActiveIndexDirectory = fmt.Sprintf("%s/tsdb-shipper-active", prefix)
}

if cfg.StorageConfig.TSDBShipperConfig.CacheLocation == "" {
cfg.StorageConfig.TSDBShipperConfig.CacheLocation = fmt.Sprintf("%s/tsdb-shipper-cache", prefix)
}
}
}

// applyFIFOCacheConfig turns on FIFO cache for the chunk store and for the query range results,
// but only if no other cache storage is configured (redis or memcache).
//
Expand Down
111 changes: 79 additions & 32 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
Expand Down Expand Up @@ -382,19 +383,25 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
// Always set these configs
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}

if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
// Set configs pertaining to object storage based indices
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID

switch true {
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
Expand All @@ -412,22 +419,53 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)

// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)

t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.isModuleActive(IndexGateway):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
default:
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg)
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadWrite
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

}
}

t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing
if config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) {
var asyncStore bool

shipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
iTy := t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType
if iTy != config.BoltDBShipperType && iTy != config.TSDBType {
shipperConfigIdx++
}

// TODO(owen-d): make helper more agnostic between boltdb|tsdb
var resyncInterval time.Duration
switch t.Cfg.SchemaConfig.Configs[shipperConfigIdx].IndexType {
case config.BoltDBShipperType:
resyncInterval = t.Cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval
case config.TSDBType:
resyncInterval = t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval
}

minIngesterQueryStoreDuration := shipperMinIngesterQueryStoreDuration(
t.Cfg.Ingester.MaxChunkAge,
shipperQuerierIndexUpdateDelay(
t.Cfg.StorageConfig.IndexCacheValidity,
resyncInterval,
),
)

var asyncStore bool
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
Expand All @@ -439,30 +477,34 @@ func (t *Loki) initStore() (_ services.Service, err error) {
asyncStore = true
case t.Cfg.isModuleEnabled(IndexGateway):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
t.Cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := config.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != config.BoltDBShipperType {
boltdbShipperConfigIdx++
}
mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
boltdbShipperMinIngesterQueryStoreDuration)

mlb, err := calculateMaxLookBack(
t.Cfg.SchemaConfig.Configs[shipperConfigIdx],
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
minIngesterQueryStoreDuration,
)
if err != nil {
return nil, err
}
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}
}

if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)),
if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(
t.Cfg.Querier.QueryIngestersWithin,
minIngesterQueryStoreDuration,
),
}
}
}

Expand Down Expand Up @@ -908,6 +950,11 @@ func (t *Loki) initUsageReport() (services.Service, error) {
}

func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

filteringEnabled, err := deletion.FilteringEnabled(t.Cfg.CompactorConfig.DeletionMode)
if err != nil {
return nil, err
Expand Down Expand Up @@ -954,24 +1001,24 @@ func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDura
return queryIngestersWithinConfig
}

// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// shipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded.
// It considers upto 3 sync attempts for the indexgateway/queries to be successful in syncing the files to factor in worst case scenarios like
// failures in sync, low download throughput, various kinds of caches in between etc. which can delay the sync operation from getting all the updates from the storage.
// It also considers index cache validity because a querier could have cached index just before it was going to resync which means
// it would keep serving index until the cache entries expire.
func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration {
return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval*3
func shipperQuerierIndexUpdateDelay(cacheValidity, resyncInterval time.Duration) time.Duration {
return cacheValidity + resyncInterval*3
}

// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func boltdbShipperIngesterIndexUploadDelay() time.Duration {
// shipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed.
func shipperIngesterIndexUploadDelay() time.Duration {
return uploads.ShardDBsByDuration + shipper.UploadInterval
}

// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid queriers from missing any logs or chunk ids due to async nature of BoltDB Shipper.
func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration {
return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 5*time.Minute
// shipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to
// avoid missing any logs or chunk ids due to async nature of shipper.
func shipperMinIngesterQueryStoreDuration(maxChunkAge, querierUpdateDelay time.Duration) time.Duration {
return maxChunkAge + shipperIngesterIndexUploadDelay() + querierUpdateDelay + 5*time.Minute
}

// NewServerService constructs service from Server component.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func newBigchunk() *bigchunk {
return &bigchunk{}
}

// TODO(owen-d): remove bigchunk from our code, we don't use it.
// Hack an Entries() impl
func (b *bigchunk) Entries() int { return 0 }

func (b *bigchunk) Add(sample model.SamplePair) (Data, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/chunk/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Data interface {
Rebound(start, end model.Time, filter filter.Func) (Data, error)
// Size returns the approximate length of the chunk in bytes.
Size() int
// Entries returns the number of entries in a chunk
Entries() int
Utilization() float64
}

Expand Down
39 changes: 35 additions & 4 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
StorageTypeSwift = "swift"
// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
BoltDBShipperType = "boltdb-shipper"
TSDBType = "tsdb"
)

var (
Expand Down Expand Up @@ -184,17 +185,47 @@ func ActivePeriodConfig(configs []PeriodConfig) int {
return i
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
func usingForPeriodConfigs(configs []PeriodConfig, fn func(PeriodConfig) bool) bool {
activePCIndex := ActivePeriodConfig(configs)
if configs[activePCIndex].IndexType == BoltDBShipperType ||
(len(configs)-1 > activePCIndex && configs[activePCIndex+1].IndexType == BoltDBShipperType) {

if fn(configs[activePCIndex]) ||
(len(configs)-1 > activePCIndex && fn(configs[activePCIndex+1])) {
return true
}

return false
}

func UsingObjectStorageIndex(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
switch cfg.IndexType {
case BoltDBShipperType, TSDBType:
return true
default:
return false
}
}

return usingForPeriodConfigs(configs, fn)
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == BoltDBShipperType
}

return usingForPeriodConfigs(configs, fn)
}

func UsingTSDB(configs []PeriodConfig) bool {
fn := func(cfg PeriodConfig) bool {
return cfg.IndexType == TSDBType
}

return usingForPeriodConfigs(configs, fn)
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
Expand Down
Loading