diff --git a/CHANGELOG.md b/CHANGELOG.md index 2dbf1c41f2b31..03e4540b2b08f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [4904](https://github.com/grafana/loki/pull/4904) **bboreham**: Fixes rare race condition that could crash an ingester. * [4942](https://github.com/grafana/loki/pull/4942) **cyriltovena**: Allow to disable HTTP/2 for GCS. * [4876](https://github.com/grafana/loki/pull/4876) **trevorwhitney**: Docs: add simple, scalable example using docker-compose +* [4857](https://github.com/grafana/loki/pull/4857) **jordanrushing**: New schema v12 changes chunk key structure # 2.4.1 (2021/11/07) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 6d93c09f8da31..09f9c57ebf241 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -171,7 +171,8 @@ func main() { syncRanges := calcSyncRanges(parsedFrom.UnixNano(), parsedTo.UnixNano(), shardByNs.Nanoseconds()) log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges)) - cm := newChunkMover(ctx, s, d, *source, *dest, matchers, *batch) + // Pass dest schema config, the destination determines the new chunk external keys using potentially a different schema config. + cm := newChunkMover(ctx, destConfig.SchemaConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch) syncChan := make(chan *syncRange) errorChan := make(chan error) statsChan := make(chan stats) @@ -264,6 +265,7 @@ type stats struct { type chunkMover struct { ctx context.Context + schema chunk.SchemaConfig source storage.Store dest storage.Store sourceUser string @@ -272,9 +274,10 @@ type chunkMover struct { batch int } -func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover { +func newChunkMover(ctx context.Context, s chunk.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover { cm := &chunkMover{ ctx: ctx, + schema: s, source: source, dest: dest, sourceUser: sourceUser, @@ -319,9 +322,11 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh < chks := make([]chunk.Chunk, 0, len(chunks)) // FetchChunks requires chunks to be ordered by external key. - sort.Slice(chunks, func(l, m int) bool { return chunks[l].ExternalKey() < chunks[m].ExternalKey() }) + sort.Slice(chunks, func(x, y int) bool { + return m.schema.ExternalKey(chunks[x]) < m.schema.ExternalKey(chunks[y]) + }) for _, chk := range chunks { - key := chk.ExternalKey() + key := m.schema.ExternalKey(chk) keys = append(keys, key) chks = append(chks, chk) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8d164113385c8..a5fa6f3ff1752 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -668,11 +668,16 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq return nil, err } + // todo (Callum) ingester should maybe store the whole schema config? 
+ s := chunk.SchemaConfig{ + Configs: i.periodicConfigs, + } + // build the response resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}} for _, chunks := range chunksGroups { for _, chk := range chunks { - resp.ChunkIDs = append(resp.ChunkIDs, chk.ExternalKey()) + resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk)) } } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b2b6bf2d69216..c21711df98031 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -387,7 +387,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { } // Use AsyncStore to query both ingesters local store and chunk store for store queries. // Only queriers should use the AsyncStore, it should never be used in ingesters. - chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier, + chunkStore = loki_storage.NewAsyncStore(chunkStore, t.Cfg.SchemaConfig.SchemaConfig, t.ingesterQuerier, calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration), ) case t.Cfg.isModuleEnabled(All): diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index 48f1e23f6f079..125dfda9522a5 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -25,13 +25,15 @@ type IngesterQuerier interface { // It should never be used in ingesters otherwise it would start spiraling around doing queries over and over again to other ingesters. type AsyncStore struct { chunk.Store + scfg chunk.SchemaConfig ingesterQuerier IngesterQuerier queryIngestersWithin time.Duration } -func NewAsyncStore(store chunk.Store, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore { +func NewAsyncStore(store chunk.Store, scfg chunk.SchemaConfig, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore { return &AsyncStore{ Store: store, + scfg: scfg, ingesterQuerier: querier, queryIngestersWithin: queryIngestersWithin, } @@ -87,7 +89,7 @@ func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, thro } func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]chunk.Chunk, fetchers []*chunk.Fetcher, ingesterChunkIDs []string) ([][]chunk.Chunk, []*chunk.Fetcher, error) { - ingesterChunkIDs = filterDuplicateChunks(storeChunks, ingesterChunkIDs) + ingesterChunkIDs = filterDuplicateChunks(a.scfg, storeChunks, ingesterChunkIDs) level.Debug(util_log.Logger).Log("msg", "post-filtering ingester chunks", "count", len(ingesterChunkIDs)) fetcherToChunksGroupIdx := make(map[*chunk.Fetcher]int, len(fetchers)) @@ -105,7 +107,7 @@ func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][] // ToDo(Sandeep) possible optimization: Keep the chunk fetcher reference handy after first call since it is expected to stay the same. 
fetcher := a.Store.GetChunkFetcher(chk.Through) if fetcher == nil { - return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", chk.ExternalKey()) + return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", a.scfg.ExternalKey(chk)) } if _, ok := fetcherToChunksGroupIdx[fetcher]; !ok { @@ -121,13 +123,13 @@ func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][] return storeChunks, fetchers, nil } -func filterDuplicateChunks(storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string { +func filterDuplicateChunks(scfg chunk.SchemaConfig, storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string { filteredChunkIDs := make([]string, 0, len(ingesterChunkIDs)) seen := make(map[string]struct{}, len(storeChunks)) for i := range storeChunks { for j := range storeChunks[i] { - seen[storeChunks[i][j].ExternalKey()] = struct{}{} + seen[scfg.ExternalKey(storeChunks[i][j])] = struct{}{} } } diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go index 6d0a5d567c622..669220ced5c1b 100644 --- a/pkg/storage/async_store_test.go +++ b/pkg/storage/async_store_test.go @@ -53,13 +53,23 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk { now := time.Now() var chunks []chunk.Chunk + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + for i := 0; i < num; i++ { chk := newChunk(buildTestStreams(fooLabelsWithName, timeRange{ from: now.Add(time.Duration(i) * time.Minute), to: now.Add(time.Duration(i+1) * time.Minute), })) - chunkRef, err := chunk.ParseExternalKey(chk.UserID, chk.ExternalKey()) + chunkRef, err := chunk.ParseExternalKey(chk.UserID, s.ExternalKey(chk)) require.NoError(t, err) chunks = append(chunks, chunkRef) @@ -80,6 +90,17 @@ func buildMockFetchers(num int) []*chunk.Fetcher { func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks := buildMockChunkRef(t, 10) fetchers := buildMockFetchers(3) + + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + for _, tc := range []struct { name string storeChunks [][]chunk.Chunk @@ -101,7 +122,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { }, { name: "no chunks from querier", - ingesterChunkIDs: convertChunksToChunkIDs(testChunks), + ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks), ingesterFetcher: fetchers[0], expectedChunks: [][]chunk.Chunk{testChunks}, expectedFetchers: fetchers[0:1], @@ -112,7 +133,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks[0:5], }, storeFetcher: fetchers[0:1], - ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]), + ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]), ingesterFetcher: fetchers[1], expectedChunks: [][]chunk.Chunk{ testChunks[0:5], @@ -127,7 +148,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks[5:], }, storeFetcher: fetchers[0:2], - ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]), + ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]), ingesterFetcher: fetchers[2], expectedChunks: [][]chunk.Chunk{ testChunks[0:5], @@ -142,7 +163,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks[2:5], }, storeFetcher: fetchers[0:2], - ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]), + ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]), ingesterFetcher: fetchers[1], expectedChunks: 
[][]chunk.Chunk{ testChunks[0:2], @@ -157,7 +178,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks[2:5], }, storeFetcher: fetchers[0:2], - ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]), + ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]), ingesterFetcher: fetchers[2], expectedChunks: [][]chunk.Chunk{ testChunks[0:2], @@ -172,7 +193,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { testChunks[0:5], }, storeFetcher: fetchers[0:1], - ingesterChunkIDs: convertChunksToChunkIDs(append(testChunks[5:], testChunks[5:]...)), + ingesterChunkIDs: convertChunksToChunkIDs(s, append(testChunks[5:], testChunks[5:]...)), ingesterFetcher: fetchers[0], expectedChunks: [][]chunk.Chunk{ testChunks, @@ -188,7 +209,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { ingesterQuerier := newIngesterQuerierMock() ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.ingesterChunkIDs, nil) - asyncStore := NewAsyncStore(store, ingesterQuerier, 0) + asyncStore := NewAsyncStore(store, chunk.SchemaConfig{}, ingesterQuerier, 0) chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil) require.NoError(t, err) @@ -245,7 +266,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) { ingesterQuerier := newIngesterQuerierMock() ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - asyncStore := NewAsyncStore(store, ingesterQuerier, tc.queryIngestersWithin) + asyncStore := NewAsyncStore(store, chunk.SchemaConfig{}, ingesterQuerier, tc.queryIngestersWithin) _, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil) require.NoError(t, err) @@ -259,10 +280,10 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) { } } -func convertChunksToChunkIDs(chunks []chunk.Chunk) []string { +func convertChunksToChunkIDs(s chunk.SchemaConfig, chunks []chunk.Chunk) []string { var chunkIDs []string for _, chk := range chunks { - chunkIDs = append(chunkIDs, chk.ExternalKey()) + chunkIDs = append(chunkIDs, s.ExternalKey(chk)) } return chunkIDs diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 557a9a650e9c3..50545ba228a27 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -78,6 +78,7 @@ func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics { // chunks with the next chunk from the next batch and added it to the next iteration. In this case the boundaries of the batch // is reduced to non-overlapping chunks boundaries. type batchChunkIterator struct { + schemas chunk.SchemaConfig chunks lazyChunks batchSize int lastOverlapping []*LazyChunk @@ -95,6 +96,7 @@ type batchChunkIterator struct { // newBatchChunkIterator creates a new batch iterator with the given batchSize. func newBatchChunkIterator( ctx context.Context, + s chunk.SchemaConfig, chunks []*LazyChunk, batchSize int, direction logproto.Direction, @@ -109,6 +111,7 @@ func newBatchChunkIterator( matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel) res := &batchChunkIterator{ batchSize: batchSize, + schemas: s, metrics: metrics, matchers: matchers, start: start, @@ -279,7 +282,7 @@ func (it *batchChunkIterator) nextBatch() (res *chunkBatch) { } } // download chunk for this batch. 
- chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, batch, it.matchers, it.chunkFilterer) + chksBySeries, err := fetchChunkBySeries(it.ctx, it.schemas, it.metrics, batch, it.matchers, it.chunkFilterer) if err != nil { return &chunkBatch{err: err} } @@ -312,6 +315,7 @@ type logBatchIterator struct { func newLogBatchIterator( ctx context.Context, + schemas chunk.SchemaConfig, metrics *ChunkMetrics, chunks []*LazyChunk, batchSize int, @@ -326,7 +330,7 @@ func newLogBatchIterator( pipeline: pipeline, ctx: ctx, cancel: cancel, - batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, metrics, matchers, chunkFilterer), + batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, direction, start, end, metrics, matchers, chunkFilterer), }, nil } @@ -451,6 +455,7 @@ type sampleBatchIterator struct { func newSampleBatchIterator( ctx context.Context, + schemas chunk.SchemaConfig, metrics *ChunkMetrics, chunks []*LazyChunk, batchSize int, @@ -464,7 +469,7 @@ func newSampleBatchIterator( extractor: extractor, ctx: ctx, cancel: cancel, - batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer), + batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer), }, nil } @@ -585,6 +590,7 @@ func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels func fetchChunkBySeries( ctx context.Context, + s chunk.SchemaConfig, metrics *ChunkMetrics, chunks []*LazyChunk, matchers []*labels.Matcher, @@ -594,7 +600,7 @@ func fetchChunkBySeries( // Make sure the initial chunks are loaded. This is not one chunk // per series, but rather a chunk per non-overlapping iterator. - if err := loadFirstChunks(ctx, chksBySeries); err != nil { + if err := loadFirstChunks(ctx, s, chksBySeries); err != nil { return nil, err } @@ -610,7 +616,7 @@ func fetchChunkBySeries( } // Finally we load all chunks not already loaded - if err := fetchLazyChunks(ctx, allChunks); err != nil { + if err := fetchLazyChunks(ctx, s, allChunks); err != nil { return nil, err } metrics.chunks.WithLabelValues(statusMatched).Add(float64(len(allChunks))) @@ -655,7 +661,7 @@ outer: return chks } -func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error { +func fetchLazyChunks(ctx context.Context, s chunk.SchemaConfig, chunks []*LazyChunk) error { var ( totalChunks int64 start = time.Now() @@ -687,9 +693,9 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error { index := make(map[string]*LazyChunk, len(chunks)) // FetchChunks requires chunks to be ordered by external key. - sort.Slice(chunks, func(i, j int) bool { return chunks[i].Chunk.ExternalKey() < chunks[j].Chunk.ExternalKey() }) + sort.Slice(chunks, func(i, j int) bool { return s.ExternalKey(chunks[i].Chunk) < s.ExternalKey(chunks[j].Chunk) }) for _, chk := range chunks { - key := chk.Chunk.ExternalKey() + key := s.ExternalKey(chk.Chunk) keys = append(keys, key) chks = append(chks, chk.Chunk) index[key] = chk @@ -708,7 +714,7 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error { } // assign fetched chunk by key as FetchChunks doesn't guarantee the order. 
for _, chk := range chks { - index[chk.ExternalKey()].Chunk = chk + index[s.ExternalKey(chk)].Chunk = chk } errChan <- nil @@ -742,7 +748,7 @@ func isInvalidChunkError(err error) bool { return false } -func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*LazyChunk) error { +func loadFirstChunks(ctx context.Context, s chunk.SchemaConfig, chks map[model.Fingerprint][][]*LazyChunk) error { var toLoad []*LazyChunk for _, lchks := range chks { for _, lchk := range lchks { @@ -752,7 +758,7 @@ func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*LazyCh toLoad = append(toLoad, lchk[0]) } } - return fetchLazyChunks(ctx, toLoad) + return fetchLazyChunks(ctx, s, toLoad) } func partitionBySeriesChunks(chunks []*LazyChunk) map[model.Fingerprint][][]*LazyChunk { diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 98fea8fdf4d1d..faeb4d1b2624f 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -42,7 +42,17 @@ func Test_batchIterSafeStart(t *testing.T) { newLazyChunk(stream), } - batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), NilMetrics, []*labels.Matcher{}, nil) + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + + batch := newBatchChunkIterator(context.Background(), s, chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), NilMetrics, []*labels.Matcher{}, nil) // if it was started already, we should see a panic before this time.Sleep(time.Millisecond) @@ -941,10 +951,20 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, } + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.NewNoopPipeline(), tt.direction, tt.start, tt.end, nil) + it, err := newLogBatchIterator(context.Background(), s, NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.NewNoopPipeline(), tt.direction, tt.start, tt.end, nil) require.NoError(t, err) streams, _, err := iter.ReadBatch(it, 1000) _ = it.Close() @@ -1339,13 +1359,23 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, } + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) require.NoError(t, err) - it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end, nil) + it, err := newSampleBatchIterator(context.Background(), s, NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end, nil) require.NoError(t, err) series, _, err := iter.ReadSampleBatch(it, 1000) _ = it.Close() @@ -1604,7 +1634,7 @@ func Test_IsInvalidChunkError(t *testing.T) { } func TestBatchCancel(t *testing.T) { - chunk := func(from time.Time) *LazyChunk { + createChunk := func(from time.Time) *LazyChunk { return newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -1620,11 +1650,22 @@ func TestBatchCancel(t *testing.T) { }) } chunks := []*LazyChunk{ - chunk(from), 
chunk(from.Add(10 * time.Millisecond)), chunk(from.Add(30 * time.Millisecond)), + createChunk(from), createChunk(from.Add(10 * time.Millisecond)), createChunk(from.Add(30 * time.Millisecond)), } ctx, cancel := context.WithCancel(context.Background()) cancel() - it, err := newLogBatchIterator(ctx, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now(), nil) + + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + + it, err := newLogBatchIterator(ctx, s, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now(), nil) require.NoError(t, err) defer require.NoError(t, it.Close()) for it.Next() { diff --git a/pkg/storage/chunk/aws/dynamodb_storage_client.go b/pkg/storage/chunk/aws/dynamodb_storage_client.go index 5b3c6b17c8b8d..4e851f71c2d60 100644 --- a/pkg/storage/chunk/aws/dynamodb_storage_client.go +++ b/pkg/storage/chunk/aws/dynamodb_storage_client.go @@ -423,7 +423,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c outstanding := dynamoDBReadRequest{} chunksByKey := map[string]chunk.Chunk{} for _, chunk := range chunks { - key := chunk.ExternalKey() + key := a.schemaCfg.ExternalKey(chunk) chunksByKey[key] = chunk tableName, err := a.schemaCfg.ChunkTableFor(chunk.From) if err != nil { @@ -582,7 +582,7 @@ func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWr if err != nil { return nil, err } - key := chunks[i].ExternalKey() + key := a.schemaCfg.ExternalKey(chunks[i]) table, err := a.schemaCfg.ChunkTableFor(chunks[i].From) if err != nil { diff --git a/pkg/storage/chunk/aws/dynamodb_storage_client_test.go b/pkg/storage/chunk/aws/dynamodb_storage_client_test.go index c221fcd10a013..277af75de35cb 100644 --- a/pkg/storage/chunk/aws/dynamodb_storage_client_test.go +++ b/pkg/storage/chunk/aws/dynamodb_storage_client_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/testutils" ) @@ -29,7 +30,16 @@ func TestChunksPartialError(t *testing.T) { } ctx := context.Background() // Create more chunks than we can read in one batch - _, chunks, err := testutils.CreateChunks(0, dynamoDBMaxReadBatchSize+50, model.Now().Add(-time.Hour), model.Now()) + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + _, chunks, err := testutils.CreateChunks(s, 0, dynamoDBMaxReadBatchSize+50, model.Now().Add(-time.Hour), model.Now()) require.NoError(t, err) err = client.PutChunks(ctx, chunks) require.NoError(t, err) diff --git a/pkg/storage/chunk/aws/fixtures.go b/pkg/storage/chunk/aws/fixtures.go index 7d0bf783d41ee..c546cf6cb3ab6 100644 --- a/pkg/storage/chunk/aws/fixtures.go +++ b/pkg/storage/chunk/aws/fixtures.go @@ -45,7 +45,7 @@ var Fixtures = []testutils.Fixture{ metrics: newMetrics(nil), } mock := newMockS3() - object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil) + object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil, schemaConfig) return index, object, table, schemaConfig, testutils.CloserFunc(func() error { table.Stop() index.Stop() diff --git a/pkg/storage/chunk/bench_test.go b/pkg/storage/chunk/bench_test.go new file mode 100644 index 0000000000000..be167989755a1 --- /dev/null +++ b/pkg/storage/chunk/bench_test.go @@ 
-0,0 +1,30 @@ +package chunk + +import ( + "testing" + + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" +) + +func BenchmarkExternalKey(b *testing.B) { + b.ReportAllocs() + var cfg SchemaConfig + require.Nil(b, yaml.Unmarshal([]byte(` +configs: + - index: + period: 24h + prefix: loki_dev_004_index_ + object_store: gcs + schema: v12 + store: boltdb-shipper +`), &cfg)) + require.Nil(b, cfg.Validate()) + key := "fake/57f628c7f6d57aad/162c699f000:162c69a07eb:eb242d99" + chunk, err := ParseExternalKey("fake", key) + require.Nil(b, err) + + for i := 0; i < b.N; i++ { + _ = cfg.ExternalKey(chunk) + } +} diff --git a/pkg/storage/chunk/cache/background_test.go b/pkg/storage/chunk/cache/background_test.go index f1a906f47ab82..4852b03c88039 100644 --- a/pkg/storage/chunk/cache/background_test.go +++ b/pkg/storage/chunk/cache/background_test.go @@ -3,6 +3,7 @@ package cache_test import ( "testing" + "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/cache" ) @@ -12,7 +13,17 @@ func TestBackground(t *testing.T) { WriteBackBuffer: 100, }, cache.NewMockCache(), nil) - keys, chunks := fillCache(t, c) + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + + keys, chunks := fillCache(t, s, c) cache.Flush(c) testCacheSingle(t, c, keys, chunks) diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index aa3d0d3d094d9..5267a66dce4f9 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -20,7 +20,7 @@ import ( const userID = "1" -func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { +func fillCache(t *testing.T, scfg chunk.SchemaConfig, cache cache.Cache) ([]string, []chunk.Chunk) { const chunkLen = 13 * 3600 // in seconds // put a set of chunks, larger than background batch size, with varying timestamps and values @@ -68,7 +68,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { err = cleanChunk.Decode(chunk.NewDecodeContext(), buf) require.NoError(t, err) - keys = append(keys, c.ExternalKey()) + keys = append(keys, scfg.ExternalKey(c)) bufs = append(bufs, buf) chunks = append(chunks, cleanChunk) } @@ -114,22 +114,37 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks [] } func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) { - fetcher, err := chunk.NewChunkFetcher(c, false, nil) + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + + fetcher, err := chunk.NewChunkFetcher(c, false, s, nil) require.NoError(t, err) defer fetcher.Stop() found, err := fetcher.FetchChunks(context.Background(), chunks, keys) require.NoError(t, err) - sort.Sort(byExternalKey(found)) - sort.Sort(byExternalKey(chunks)) + sort.Sort(byExternalKey{found, s}) + sort.Sort(byExternalKey{chunks, s}) require.Equal(t, chunks, found) } -type byExternalKey []chunk.Chunk +type byExternalKey struct { + chunks []chunk.Chunk + scfg chunk.SchemaConfig +} -func (a byExternalKey) Len() int { return len(a) } -func (a byExternalKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byExternalKey) Less(i, j int) bool { return a[i].ExternalKey() < a[j].ExternalKey() } +func (a byExternalKey) Len() int { return len(a.chunks) } +func (a byExternalKey) Swap(i, j int) { a.chunks[i], a.chunks[j] = a.chunks[j], a.chunks[i] } +func (a 
byExternalKey) Less(i, j int) bool { + return a.scfg.ExternalKey(a.chunks[i]) < a.scfg.ExternalKey(a.chunks[j]) +} func testCacheMiss(t *testing.T, cache cache.Cache) { for i := 0; i < 100; i++ { @@ -142,7 +157,16 @@ func testCacheMiss(t *testing.T, cache cache.Cache) { } func testCache(t *testing.T, cache cache.Cache) { - keys, chunks := fillCache(t, cache) + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + keys, chunks := fillCache(t, s, cache) t.Run("Single", func(t *testing.T) { testCacheSingle(t, cache, keys, chunks) }) diff --git a/pkg/storage/chunk/cassandra/storage_client.go b/pkg/storage/chunk/cassandra/storage_client.go index 30952895dc2b1..65c8a0280c614 100644 --- a/pkg/storage/chunk/cassandra/storage_client.go +++ b/pkg/storage/chunk/cassandra/storage_client.go @@ -484,7 +484,7 @@ func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) erro if err != nil { return errors.WithStack(err) } - key := chunks[i].ExternalKey() + key := s.schemaCfg.ExternalKey(chunks[i]) tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) if err != nil { return err @@ -520,7 +520,7 @@ func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.Decode } var buf []byte - if err := s.readSession.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()). + if err := s.readSession.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), s.schemaCfg.ExternalKey(input)). WithContext(ctx).Scan(&buf); err != nil { return input, errors.WithStack(err) } diff --git a/pkg/storage/chunk/chunk.go b/pkg/storage/chunk/chunk.go index aca45cce2a299..b3349a6fcacac 100644 --- a/pkg/storage/chunk/chunk.go +++ b/pkg/storage/chunk/chunk.go @@ -3,7 +3,6 @@ package chunk import ( "bytes" "encoding/binary" - "fmt" "hash/crc32" "reflect" "strconv" @@ -89,18 +88,20 @@ func NewChunk(userID string, fp model.Fingerprint, metric labels.Labels, c prom_ // Post-checksums, externals keys become the same across DynamoDB, Memcache // and S3. Numbers become hex encoded. Keys look like: // `/:::`. 
+// +// v12+, fingerprint is now a prefix to support better read and write request parallelization: +// `//::` func ParseExternalKey(userID, externalKey string) (Chunk, error) { - if !strings.Contains(externalKey, "/") { + if !strings.Contains(externalKey, "/") { // pre-checksum return parseLegacyChunkID(userID, externalKey) + } else if strings.Count(externalKey, "/") == 2 { // v12+ + return parseNewerExternalKey(userID, externalKey) + } else { // post-checksum + return parseNewExternalKey(userID, externalKey) } - chunk, err := parseNewExternalKey(userID, externalKey) - if err != nil { - return Chunk{}, err - } - - return chunk, nil } +// pre-checksum func parseLegacyChunkID(userID, key string) (Chunk, error) { parts := strings.Split(key, ":") if len(parts) != 3 { @@ -126,6 +127,7 @@ func parseLegacyChunkID(userID, key string) (Chunk, error) { }, nil } +// post-checksum func parseNewExternalKey(userID, key string) (Chunk, error) { userIdx := strings.Index(key, "/") if userIdx == -1 || userIdx+1 >= len(key) { @@ -176,9 +178,66 @@ func parseNewExternalKey(userID, key string) (Chunk, error) { }, nil } +// v12+ +func parseNewerExternalKey(userID, key string) (Chunk, error) { + // Parse user + userIdx := strings.Index(key, "/") + if userIdx == -1 || userIdx+1 >= len(key) { + return Chunk{}, errInvalidChunkID(key) + } + if userID != key[:userIdx] { + return Chunk{}, errors.WithStack(ErrWrongMetadata) + } + hexParts := key[userIdx+1:] + partsBytes := unsafeGetBytes(hexParts) + // Parse fingerprint + h, i := readOneHexPart(partsBytes) + if i == 0 || i+1 >= len(partsBytes) { + return Chunk{}, errors.Wrap(errInvalidChunkID(key), "decoding fingerprint") + } + fingerprint, err := strconv.ParseUint(unsafeGetString(h), 16, 64) + if err != nil { + return Chunk{}, errors.Wrap(err, "parsing fingerprint") + } + partsBytes = partsBytes[i+1:] + // Parse start + h, i = readOneHexPart(partsBytes) + if i == 0 || i+1 >= len(partsBytes) { + return Chunk{}, errors.Wrap(errInvalidChunkID(key), "decoding start") + } + from, err := strconv.ParseInt(unsafeGetString(h), 16, 64) + if err != nil { + return Chunk{}, errors.Wrap(err, "parsing start") + } + partsBytes = partsBytes[i+1:] + // Parse through + h, i = readOneHexPart(partsBytes) + if i == 0 || i+1 >= len(partsBytes) { + return Chunk{}, errors.Wrap(errInvalidChunkID(key), "decoding through") + } + through, err := strconv.ParseInt(unsafeGetString(h), 16, 64) + if err != nil { + return Chunk{}, errors.Wrap(err, "parsing through") + } + partsBytes = partsBytes[i+1:] + // Parse checksum + checksum, err := strconv.ParseUint(unsafeGetString(partsBytes), 16, 64) + if err != nil { + return Chunk{}, errors.Wrap(err, "parsing checksum") + } + return Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(fingerprint), + From: model.Time(from), + Through: model.Time(through), + Checksum: uint32(checksum), + ChecksumSet: true, + }, nil +} + func readOneHexPart(hex []byte) (part []byte, i int) { for i < len(hex) { - if hex[i] != ':' { + if hex[i] != ':' && hex[i] != '/' { i++ continue } @@ -199,21 +258,6 @@ func unsafeGetString(buf []byte) string { return *((*string)(unsafe.Pointer(&buf))) } -// ExternalKey returns the key you can use to fetch this chunk from external -// storage. For newer chunks, this key includes a checksum. -func (c *Chunk) ExternalKey() string { - // Some chunks have a checksum stored in dynamodb, some do not. We must - // generate keys appropriately. - if c.ChecksumSet { - // This is the inverse of parseNewExternalKey. 
- return fmt.Sprintf("%s/%x:%x:%x:%x", c.UserID, uint64(c.Fingerprint), int64(c.From), int64(c.Through), c.Checksum) - } - // This is the inverse of parseLegacyExternalKey, with "/" prepended. - // Legacy chunks had the user ID prefix on s3/memcache, but not in DynamoDB. - // See comment on parseExternalKey. - return fmt.Sprintf("%s/%d:%d:%d", c.UserID, uint64(c.Fingerprint), int64(c.From), int64(c.Through)) -} - var writerPool = sync.Pool{ New: func() interface{} { return snappy.NewBufferedWriter(nil) }, } diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go index 35f1cb9a36cd0..24c27d0546898 100644 --- a/pkg/storage/chunk/chunk_store.go +++ b/pkg/storage/chunk/chunk_store.go @@ -88,6 +88,9 @@ func (cfg *StoreConfig) Validate(logger log.Logger) error { type baseStore struct { cfg StoreConfig + // todo (callum) it looks like baseStore is created off a specific schema struct implementation, so perhaps we can store something else here + // other than the entire set of schema period configs + schemaCfg SchemaConfig index IndexClient chunks Client @@ -96,19 +99,20 @@ type baseStore struct { fetcher *Fetcher } -func newBaseStore(cfg StoreConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) { - fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks) +func newBaseStore(cfg StoreConfig, scfg SchemaConfig, schema BaseSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (baseStore, error) { + fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, scfg, chunks) if err != nil { return baseStore{}, err } return baseStore{ - cfg: cfg, - index: index, - chunks: chunks, - schema: schema, - limits: limits, - fetcher: fetcher, + cfg: cfg, + schemaCfg: scfg, + index: index, + chunks: chunks, + schema: schema, + limits: limits, + fetcher: fetcher, }, nil } @@ -125,8 +129,8 @@ type store struct { schema StoreSchema } -func newStore(cfg StoreConfig, schema StoreSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) { - rs, err := newBaseStore(cfg, schema, index, chunks, limits, chunksCache) +func newStore(cfg StoreConfig, scfg SchemaConfig, schema StoreSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) { + rs, err := newBaseStore(cfg, scfg, schema, index, chunks, limits, chunksCache) if err != nil { return nil, err } @@ -179,7 +183,7 @@ func (c *store) calculateIndexEntries(userID string, from, through model.Time, c return nil, ErrMetricNameLabelMissing } - entries, err := c.schema.GetWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + entries, err := c.schema.GetWriteEntries(from, through, userID, metricName, chunk.Metric, c.baseStore.schemaCfg.ExternalKey(chunk)) if err != nil { return nil, err } @@ -279,7 +283,7 @@ func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint filtered := filterChunksByTime(from, through, chunks) - filtered, keys := filterChunksByUniqueFingerprint(filtered) + filtered, keys := filterChunksByUniqueFingerprint(c.baseStore.schemaCfg, filtered) level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks)) // Now fetch the actual chunk data from Memcache / S3 @@ -362,7 +366,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th } 
// Now fetch the actual chunk data from Memcache / S3 - keys := keysFromChunks(filtered) + keys := keysFromChunks(c.baseStore.schemaCfg, filtered) allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys) if err != nil { return nil, err diff --git a/pkg/storage/chunk/chunk_store_test.go b/pkg/storage/chunk/chunk_store_test.go index fdccda26e7607..6d4a606de2723 100644 --- a/pkg/storage/chunk/chunk_store_test.go +++ b/pkg/storage/chunk/chunk_store_test.go @@ -57,19 +57,19 @@ var stores = []struct { } // newTestStore creates a new Store for testing. -func newTestChunkStore(t require.TestingT, schemaName string) Store { +func newTestChunkStore(t require.TestingT, schemaName string) (Store, SchemaConfig) { var storeCfg StoreConfig flagext.DefaultValues(&storeCfg) return newTestChunkStoreConfig(t, schemaName, storeCfg) } -func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg StoreConfig) Store { +func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg StoreConfig) (Store, SchemaConfig) { schemaCfg := DefaultSchemaConfig("", schemaName, 0) schema, err := schemaCfg.Configs[0].CreateSchema() require.NoError(t, err) - return newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schema, storeCfg) + return newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schema, storeCfg), schemaCfg } func newTestChunkStoreConfigWithMockStorage(t require.TestingT, schemaCfg SchemaConfig, schema BaseSchema, storeCfg StoreConfig) Store { @@ -98,7 +98,7 @@ func newTestChunkStoreConfigWithMockStorage(t require.TestingT, schemaCfg Schema require.NoError(t, err) store := NewCompositeStore(nil) - err = store.addSchema(storeCfg, schema, schemaCfg.Configs[0].From.Time, storage, storage, overrides, chunksCache, writeDedupeCache) + err = store.addSchema(storeCfg, schemaCfg, schema, schemaCfg.Configs[0].From.Time, storage, storage, overrides, chunksCache, writeDedupeCache) require.NoError(t, err) return store } @@ -190,7 +190,7 @@ func TestChunkStore_Get(t *testing.T) { for _, schema := range schemas { for _, storeCase := range stores { storeCfg := storeCase.configFn() - store := newTestChunkStoreConfig(t, schema, storeCfg) + store, _ := newTestChunkStoreConfig(t, schema, storeCfg) defer store.Stop() if err := store.Put(ctx, []Chunk{ @@ -312,7 +312,7 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) { t.Run(fmt.Sprintf("%s / %s / %s / %s", tc.metricName, tc.labelName, schema, storeCase.name), func(t *testing.T) { t.Log("========= Running labelValues with metricName", tc.metricName, "with labelName", tc.labelName, "with schema", schema) storeCfg := storeCase.configFn() - store := newTestChunkStoreConfig(t, schema, storeCfg) + store, _ := newTestChunkStoreConfig(t, schema, storeCfg) defer store.Stop() if err := store.Put(ctx, []Chunk{ @@ -412,7 +412,7 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) { t.Run(fmt.Sprintf("%s / %s / %s ", tc.metricName, schema, storeCase.name), func(t *testing.T) { t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema) storeCfg := storeCase.configFn() - store := newTestChunkStoreConfig(t, schema, storeCfg) + store, _ := newTestChunkStoreConfig(t, schema, storeCfg) defer store.Stop() if err := store.Put(ctx, []Chunk{ @@ -519,7 +519,7 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { for _, schema := range schemas { for _, storeCase := range stores { storeCfg := storeCase.configFn() - store := newTestChunkStoreConfig(t, schema, storeCfg) + store, _ := newTestChunkStoreConfig(t, schema, 
storeCfg) defer store.Stop() if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { @@ -556,7 +556,7 @@ func TestChunkStoreRandom(t *testing.T) { for _, schema := range schemas { t.Run(schema, func(t *testing.T) { - store := newTestChunkStore(t, schema) + store, _ := newTestChunkStore(t, schema) defer store.Stop() // put 100 chunks from 0 to 99 @@ -624,7 +624,7 @@ func TestChunkStoreRandom(t *testing.T) { func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := context.Background() - store := newTestChunkStore(t, "v6") + store, _ := newTestChunkStore(t, "v6") defer store.Stop() // Put 24 chunks 1hr chunks in the store @@ -696,7 +696,7 @@ func TestIndexCachingWorks(t *testing.T) { storeMaker := stores[1] storeCfg := storeMaker.configFn() - store := newTestChunkStoreConfig(t, "v9", storeCfg) + store, _ := newTestChunkStoreConfig(t, "v9", storeCfg) defer store.Stop() storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage) @@ -722,7 +722,7 @@ func BenchmarkIndexCaching(b *testing.B) { storeMaker := stores[1] storeCfg := storeMaker.configFn() - store := newTestChunkStoreConfig(b, "v9", storeCfg) + store, _ := newTestChunkStoreConfig(b, "v9", storeCfg) defer store.Stop() fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), BenchmarkLabels) @@ -769,7 +769,7 @@ func TestChunkStoreError(t *testing.T) { } { for _, schema := range schemas { t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) { - store := newTestChunkStore(t, schema) + store, _ := newTestChunkStore(t, schema) defer store.Stop() matchers, err := parser.ParseMetricSelector(tc.query) @@ -892,60 +892,60 @@ func TestStore_DeleteChunk(t *testing.T) { fooMetricNameMatcher, err := parser.ParseMetricSelector(`foo`) require.NoError(t, err) - - for _, tc := range []struct { - name string - chunks []Chunk - chunkToDelete Chunk - partialDeleteInterval *model.Interval - err error - numChunksToExpectAfterDeletion int - }{ - { - name: "delete whole chunk", - chunkToDelete: fooChunk1, - numChunksToExpectAfterDeletion: 1, - }, - { - name: "delete chunk partially at start", - chunkToDelete: fooChunk1, - partialDeleteInterval: &model.Interval{Start: fooChunk1.From, End: fooChunk1.From.Add(30 * time.Minute)}, - numChunksToExpectAfterDeletion: 2, - }, - { - name: "delete chunk partially at end", - chunkToDelete: fooChunk1, - partialDeleteInterval: &model.Interval{Start: fooChunk1.Through.Add(-30 * time.Minute), End: fooChunk1.Through}, - numChunksToExpectAfterDeletion: 2, - }, - { - name: "delete chunk partially in the middle", - chunkToDelete: fooChunk1, - partialDeleteInterval: &model.Interval{Start: fooChunk1.From.Add(15 * time.Minute), End: fooChunk1.Through.Add(-15 * time.Minute)}, - numChunksToExpectAfterDeletion: 3, - }, - { - name: "delete non-existent chunk", - chunkToDelete: nonExistentChunk, - numChunksToExpectAfterDeletion: 2, - }, - { - name: "delete first second", - chunkToDelete: fooChunk1, - partialDeleteInterval: &model.Interval{Start: fooChunk1.From, End: fooChunk1.From}, - numChunksToExpectAfterDeletion: 2, - }, - { - name: "delete chunk out of range", - chunkToDelete: fooChunk1, - partialDeleteInterval: &model.Interval{Start: fooChunk1.Through.Add(time.Minute), End: fooChunk1.Through.Add(10 * time.Minute)}, - numChunksToExpectAfterDeletion: 2, - err: errors.Wrapf(ErrParialDeleteChunkNoOverlap, "chunkID=%s", fooChunk1.ExternalKey()), - }, - } { - for _, schema := range schemas { + for _, schema := range schemas { + scfg := 
DefaultSchemaConfig("", schema, 0) + for _, tc := range []struct { + name string + chunks []Chunk + chunkToDelete Chunk + partialDeleteInterval *model.Interval + err error + numChunksToExpectAfterDeletion int + }{ + { + name: "delete whole chunk", + chunkToDelete: fooChunk1, + numChunksToExpectAfterDeletion: 1, + }, + { + name: "delete chunk partially at start", + chunkToDelete: fooChunk1, + partialDeleteInterval: &model.Interval{Start: fooChunk1.From, End: fooChunk1.From.Add(30 * time.Minute)}, + numChunksToExpectAfterDeletion: 2, + }, + { + name: "delete chunk partially at end", + chunkToDelete: fooChunk1, + partialDeleteInterval: &model.Interval{Start: fooChunk1.Through.Add(-30 * time.Minute), End: fooChunk1.Through}, + numChunksToExpectAfterDeletion: 2, + }, + { + name: "delete chunk partially in the middle", + chunkToDelete: fooChunk1, + partialDeleteInterval: &model.Interval{Start: fooChunk1.From.Add(15 * time.Minute), End: fooChunk1.Through.Add(-15 * time.Minute)}, + numChunksToExpectAfterDeletion: 3, + }, + { + name: "delete non-existent chunk", + chunkToDelete: nonExistentChunk, + numChunksToExpectAfterDeletion: 2, + }, + { + name: "delete first second", + chunkToDelete: fooChunk1, + partialDeleteInterval: &model.Interval{Start: fooChunk1.From, End: fooChunk1.From}, + numChunksToExpectAfterDeletion: 2, + }, + { + name: "delete chunk out of range", + chunkToDelete: fooChunk1, + partialDeleteInterval: &model.Interval{Start: fooChunk1.Through.Add(time.Minute), End: fooChunk1.Through.Add(10 * time.Minute)}, + numChunksToExpectAfterDeletion: 2, + err: errors.Wrapf(ErrParialDeleteChunkNoOverlap, "chunkID=%s", scfg.ExternalKey(fooChunk1)), + }, + } { t.Run(fmt.Sprintf("%s / %s", schema, tc.name), func(t *testing.T) { - store := newTestChunkStore(t, schema) + store, scfg := newTestChunkStore(t, schema) defer store.Stop() // inserting 2 chunks with different labels but same metric name @@ -958,7 +958,7 @@ func TestStore_DeleteChunk(t *testing.T) { require.Equal(t, 2, len(chunks)) err = store.DeleteChunk(ctx, tc.chunkToDelete.From, tc.chunkToDelete.Through, userID, - tc.chunkToDelete.ExternalKey(), tc.chunkToDelete.Metric, tc.partialDeleteInterval) + scfg.ExternalKey(tc.chunkToDelete), tc.chunkToDelete.Metric, tc.partialDeleteInterval) if tc.err != nil { require.Error(t, err) @@ -1024,7 +1024,7 @@ func TestStore_DeleteSeriesIDs(t *testing.T) { for _, schema := range seriesStoreSchemas { t.Run(schema, func(t *testing.T) { - store := newTestChunkStore(t, schema) + store, scfg := newTestChunkStore(t, schema) defer store.Stop() seriesStore := store.(CompositeStore).stores[0].Store.(*seriesStore) @@ -1062,7 +1062,7 @@ func TestStore_DeleteSeriesIDs(t *testing.T) { require.Equal(t, 2, len(seriesIDs)) // lets delete a chunk and then delete its series ID - err = store.DeleteChunk(ctx, fooChunk1.From, fooChunk1.Through, userID, fooChunk1.ExternalKey(), metric1, nil) + err = store.DeleteChunk(ctx, fooChunk1.From, fooChunk1.Through, userID, scfg.ExternalKey(fooChunk1), metric1, nil) require.NoError(t, err) err = store.DeleteSeriesIDs(ctx, fooChunk1.From, fooChunk1.Through, userID, fooChunk1.Metric) @@ -1080,7 +1080,7 @@ func TestStore_DeleteSeriesIDs(t *testing.T) { require.Equal(t, string(labelsSeriesID(fooChunk2.Metric)), seriesIDs[0]) // lets delete the other chunk partially and try deleting the series ID - err = store.DeleteChunk(ctx, fooChunk2.From, fooChunk2.Through, userID, fooChunk2.ExternalKey(), metric2, + err = store.DeleteChunk(ctx, fooChunk2.From, fooChunk2.Through, userID, 
scfg.ExternalKey(fooChunk2), metric2, &model.Interval{Start: fooChunk2.From, End: fooChunk2.From.Add(30 * time.Minute)}) require.NoError(t, err) @@ -1118,7 +1118,7 @@ func TestDisableIndexDeduplication(t *testing.T) { }, prometheus.NewRegistry(), log.NewNopLogger()) storeCfg.DisableIndexDeduplication = disableIndexDeduplication - store := newTestChunkStoreConfig(t, "v9", storeCfg) + store, _ := newTestChunkStoreConfig(t, "v9", storeCfg) defer store.Stop() storage := store.(CompositeStore).stores[0].Store.(*seriesStore).fetcher.storage.(*MockStorage) diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index a86edd33493fa..392941d78e53e 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -29,10 +29,10 @@ func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk { return filtered } -func keysFromChunks(chunks []Chunk) []string { +func keysFromChunks(s SchemaConfig, chunks []Chunk) []string { keys := make([]string, 0, len(chunks)) for _, chk := range chunks { - keys = append(keys, chk.ExternalKey()) + keys = append(keys, s.ExternalKey(chk)) } return keys @@ -48,7 +48,7 @@ func labelNamesFromChunks(chunks []Chunk) []string { return result.Strings() } -func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) { +func filterChunksByUniqueFingerprint(s SchemaConfig, chunks []Chunk) ([]Chunk, []string) { filtered := make([]Chunk, 0, len(chunks)) keys := make([]string, 0, len(chunks)) uniqueFp := map[model.Fingerprint]struct{}{} @@ -58,7 +58,7 @@ func filterChunksByUniqueFingerprint(chunks []Chunk) ([]Chunk, []string) { continue } filtered = append(filtered, chunk) - keys = append(keys, chunk.ExternalKey()) + keys = append(keys, s.ExternalKey(chunk)) uniqueFp[chunk.Fingerprint] = struct{}{} } return filtered, keys @@ -82,6 +82,7 @@ outer: // and writing back any misses to the cache. Also responsible for decoding // chunks from the cache, in parallel. type Fetcher struct { + schema SchemaConfig storage Client cache cache.Cache cacheStubs bool @@ -102,8 +103,9 @@ type decodeResponse struct { } // NewChunkFetcher makes a new ChunkFetcher. 
-func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error) { +func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client) (*Fetcher, error) { c := &Fetcher{ + schema: schema, storage: storage, cache: cacher, cacheStubs: cacheStubs, @@ -187,7 +189,7 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { } } - keys = append(keys, chunks[i].ExternalKey()) + keys = append(keys, c.schema.ExternalKey(chunks[i])) bufs = append(bufs, encoded) } @@ -207,7 +209,7 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys i, j := 0, 0 for i < len(chunks) && j < len(keys) { - chunkKey := chunks[i].ExternalKey() + chunkKey := c.schema.ExternalKey(chunks[i]) if chunkKey < keys[j] { missing = append(missing, chunks[i]) diff --git a/pkg/storage/chunk/chunk_test.go b/pkg/storage/chunk/chunk_test.go index aa83f4d4520a5..349eeb3f9033f 100644 --- a/pkg/storage/chunk/chunk_test.go +++ b/pkg/storage/chunk/chunk_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "testing" "time" @@ -114,7 +115,17 @@ func TestChunkCodec(t *testing.T) { encoded, err := c.chunk.Encoded() require.NoError(t, err) - have, err := ParseExternalKey(userID, c.chunk.ExternalKey()) + s := SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + + have, err := ParseExternalKey(userID, s.ExternalKey(c.chunk)) require.NoError(t, err) buf := make([]byte, len(encoded)) @@ -167,7 +178,17 @@ func TestChunkDecodeBackwardsCompatibility(t *testing.T) { haveEncoded, _ := have.Encoded() wantEncoded, _ := want.Encoded() require.Equal(t, haveEncoded, wantEncoded) - require.Equal(t, have.ExternalKey(), want.ExternalKey()) + + s := SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } + require.Equal(t, s.ExternalKey(have), s.ExternalKey(want)) } func TestParseExternalKey(t *testing.T) { @@ -192,6 +213,15 @@ func TestParseExternalKey(t *testing.T) { Checksum: 4165752645, }}, + {key: userID + "/2/270d8f00:270d8f00:f84c5745", chunk: Chunk{ + UserID: userID, + Fingerprint: model.Fingerprint(2), + From: model.Time(655200000), + Through: model.Time(655200000), + ChecksumSet: true, + Checksum: 4165752645, + }}, + {key: "invalidUserID/2:270d8f00:270d8f00:f84c5745", chunk: Chunk{}, err: ErrWrongMetadata}, } { chunk, err := ParseExternalKey(userID, c.key) @@ -379,9 +409,136 @@ func TestChunk_Slice(t *testing.T) { } } -func Benchmark_ParseExternalKey(b *testing.B) { +func TestChunkKeys(t *testing.T) { + for _, tc := range []struct { + name string + chunk Chunk + schemaCfg SchemaConfig + }{ + { + name: "Legacy key (pre-checksum)", + chunk: Chunk{ + Fingerprint: 100, + UserID: "fake", + From: model.TimeFromUnix(1000), + Through: model.TimeFromUnix(5000), + }, + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v9", + }, + }, + }, + }, + { + name: "New key (post-checksum)", + chunk: Chunk{ + Fingerprint: 100, + UserID: "fake", + From: model.TimeFromUnix(1000), + Through: model.TimeFromUnix(5000), + ChecksumSet: true, + Checksum: 12345, + }, + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + }, + }, + { + name: "Newer key (post-v12)", + chunk: Chunk{ + Fingerprint: 100, + UserID: "fake", + From: model.TimeFromUnix(1000), + Through: model.TimeFromUnix(5000), + ChecksumSet: true, + Checksum: 
12345, + }, + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v12", + RowShards: 16, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + key := tc.schemaCfg.ExternalKey(tc.chunk) + newChunk, err := ParseExternalKey("fake", key) + require.NoError(t, err) + require.Equal(t, tc.chunk, newChunk) + require.Equal(t, key, tc.schemaCfg.ExternalKey(newChunk)) + }) + } +} + +func BenchmarkParseNewerExternalKey(b *testing.B) { + benchmarkParseExternalKey(b, "fake/57f628c7f6d57aad/162c699f000:162c69a07eb:eb242d99") +} +func BenchmarkParseNewExternalKey(b *testing.B) { + benchmarkParseExternalKey(b, "fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99") +} +func BenchmarkParseLegacyExternalKey(b *testing.B) { + benchmarkParseExternalKey(b, "2:1484661279394:1484664879394") +} + +func BenchmarkParseOldLegacyExternalKey(b *testing.B) { + benchmarkOldParseExternalKey(b, "2:1484661279394:1484664879394") +} + +func BenchmarkParseOldNewExternalKey(b *testing.B) { + benchmarkOldParseExternalKey(b, "fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99") +} + +func BenchmarkRootParseLegacyExternalKey(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := parseLegacyChunkID("fake", "2:1484661279394:1484664879394") + require.NoError(b, err) + } +} + +func BenchmarkRootParseNewExternalKey(b *testing.B) { for i := 0; i < b.N; i++ { - _, err := ParseExternalKey("fake", "fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99") + _, err := parseNewExternalKey("fake", "fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99") require.NoError(b, err) } } + +func BenchmarkRootParseNewerExternalKey(b *testing.B) { + for i := 0; i < b.N; i++ { + _, err := parseNewerExternalKey("fake", "fake/57f628c7f6d57aad/162c699f000:162c69a07eb:eb242d99") + require.NoError(b, err) + } +} + +func benchmarkParseExternalKey(b *testing.B, key string) { + for i := 0; i < b.N; i++ { + _, err := ParseExternalKey("fake", key) + require.NoError(b, err) + } +} + +func benchmarkOldParseExternalKey(b *testing.B, key string) { + for i := 0; i < b.N; i++ { + _, err := OldParseExternalKey("fake", key) + require.NoError(b, err) + } +} + +func OldParseExternalKey(userID, externalKey string) (Chunk, error) { + if !strings.Contains(externalKey, "/") { // pre-checksum + return parseLegacyChunkID(userID, externalKey) + } + return parseNewExternalKey(userID, externalKey) +} diff --git a/pkg/storage/chunk/composite_store.go b/pkg/storage/chunk/composite_store.go index b02fda176c285..a668589a442fb 100644 --- a/pkg/storage/chunk/composite_store.go +++ b/pkg/storage/chunk/composite_store.go @@ -71,10 +71,10 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index return err } - return c.addSchema(storeCfg, schema, cfg.From.Time, index, chunks, limits, chunksCache, writeDedupeCache) + return c.addSchema(storeCfg, SchemaConfig{Configs: []PeriodConfig{cfg}}, schema, cfg.From.Time, index, chunks, limits, chunksCache, writeDedupeCache) } -func (c *CompositeStore) addSchema(storeCfg StoreConfig, schema BaseSchema, start model.Time, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error { +func (c *CompositeStore) addSchema(storeCfg StoreConfig, schemaCfg SchemaConfig, schema BaseSchema, start model.Time, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error { var ( err error store Store @@ -82,9 +82,9 @@ func (c *CompositeStore) addSchema(storeCfg StoreConfig, schema BaseSchema, star 
switch s := schema.(type) { case SeriesStoreSchema: - store, err = newSeriesStore(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache) + store, err = newSeriesStore(storeCfg, schemaCfg, s, index, chunks, limits, chunksCache, writeDedupeCache) case StoreSchema: - store, err = newStore(storeCfg, s, index, chunks, limits, chunksCache) + store, err = newStore(storeCfg, schemaCfg, s, index, chunks, limits, chunksCache) default: err = errors.New("invalid schema type") } diff --git a/pkg/storage/chunk/gcp/bigtable_object_client.go b/pkg/storage/chunk/gcp/bigtable_object_client.go index fa590059291b8..00e1a74f7a5b5 100644 --- a/pkg/storage/chunk/gcp/bigtable_object_client.go +++ b/pkg/storage/chunk/gcp/bigtable_object_client.go @@ -55,7 +55,7 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu if err != nil { return err } - key := chunks[i].ExternalKey() + key := s.schemaCfg.ExternalKey(chunks[i]) tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) if err != nil { return err @@ -94,7 +94,7 @@ func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chun if err != nil { return nil, err } - key := c.ExternalKey() + key := s.schemaCfg.ExternalKey(c) keys[tableName] = append(keys[tableName], key) if _, ok := chunks[tableName]; !ok { chunks[tableName] = map[string]chunk.Chunk{} diff --git a/pkg/storage/chunk/gcp/fixtures.go b/pkg/storage/chunk/gcp/fixtures.go index 088d118554150..f37cc06aa04ea 100644 --- a/pkg/storage/chunk/gcp/fixtures.go +++ b/pkg/storage/chunk/gcp/fixtures.go @@ -87,7 +87,7 @@ func (f *fixture) Clients() ( if err != nil { return } - cClient = objectclient.NewClient(c, nil) + cClient = objectclient.NewClient(c, nil, chunk.SchemaConfig{}) } else { cClient = newBigtableObjectClient(Config{}, schemaConfig, client) } diff --git a/pkg/storage/chunk/grpc/grpc_server_mock_test.go b/pkg/storage/chunk/grpc/grpc_server_mock_test.go index a58f8b23cd98e..b67665bbb61dd 100644 --- a/pkg/storage/chunk/grpc/grpc_server_mock_test.go +++ b/pkg/storage/chunk/grpc/grpc_server_mock_test.go @@ -49,9 +49,11 @@ func (s server) DeleteIndex(ctx context.Context, deletes *DeleteIndexRequest) (* } // storageClient RPCs +// +// Support new and old chunk key formats func (s server) PutChunks(ctx context.Context, request *PutChunksRequest) (*empty.Empty, error) { - // encoded := - if request.Chunks[0].TableName == "" && request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:fc8fd207" { + if request.Chunks[0].TableName == "" && (request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:fc8fd207") { + return &empty.Empty{}, nil } err := errors.New("putChunks from storageClient request doesn't match with test from gRPC client") @@ -59,7 +61,7 @@ func (s server) PutChunks(ctx context.Context, request *PutChunksRequest) (*empt } func (s server) GetChunks(request *GetChunksRequest, chunksServer GrpcStore_GetChunksServer) error { - if request.Chunks[0].TableName == "" && request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:d9a103b5" && + if request.Chunks[0].TableName == "" && (request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:d9a103b5") && request.Chunks[0].Encoded == nil { return nil } diff --git a/pkg/storage/chunk/grpc/storage_client.go b/pkg/storage/chunk/grpc/storage_client.go index 66a7a5731bc47..ebe1725ea0067 100644 --- a/pkg/storage/chunk/grpc/storage_client.go +++ b/pkg/storage/chunk/grpc/storage_client.go @@ -43,7 +43,7 @@ func (s *StorageClient) PutChunks(ctx 
context.Context, chunks []chunk.Chunk) err return errors.WithStack(err) } - key := chunks[i].ExternalKey() + key := s.schemaCfg.ExternalKey(chunks[i]) tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) if err != nil { return errors.WithStack(err) @@ -89,7 +89,7 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c if err != nil { return nil, errors.WithStack(err) } - chunkInfo.Key = inputInfo.ExternalKey() + chunkInfo.Key = s.schemaCfg.ExternalKey(inputInfo) req.Chunks = append(req.Chunks, chunkInfo) } streamer, err := s.client.GetChunks(ctx, req) diff --git a/pkg/storage/chunk/inmemory_storage_client.go b/pkg/storage/chunk/inmemory_storage_client.go index 94c57b35a9b37..5c894ce90cddc 100644 --- a/pkg/storage/chunk/inmemory_storage_client.go +++ b/pkg/storage/chunk/inmemory_storage_client.go @@ -31,9 +31,10 @@ const ( // MockStorage is a fake in-memory StorageClient. type MockStorage struct { - mtx sync.RWMutex - tables map[string]*mockTable - objects map[string][]byte + mtx sync.RWMutex + tables map[string]*mockTable + objects map[string][]byte + schemaCfg SchemaConfig numIndexWrites int numChunkWrites int @@ -53,6 +54,15 @@ type mockItem struct { // NewMockStorage creates a new MockStorage. func NewMockStorage() *MockStorage { return &MockStorage{ + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + }, tables: map[string]*mockTable{}, objects: map[string][]byte{}, } @@ -360,7 +370,7 @@ func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error { if err != nil { return err } - m.objects[chunks[i].ExternalKey()] = buf + m.objects[m.schemaCfg.ExternalKey(chunks[i])] = buf } return nil } @@ -377,7 +387,7 @@ func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, decodeContext := NewDecodeContext() result := []Chunk{} for _, chunk := range chunkSet { - key := chunk.ExternalKey() + key := m.schemaCfg.ExternalKey(chunk) buf, ok := m.objects[key] if !ok { return nil, errStorageObjectNotFound diff --git a/pkg/storage/chunk/local/fixtures.go b/pkg/storage/chunk/local/fixtures.go index d2703808c2797..32839eb8b09b5 100644 --- a/pkg/storage/chunk/local/fixtures.go +++ b/pkg/storage/chunk/local/fixtures.go @@ -43,7 +43,7 @@ func (f *fixture) Clients() ( return } - chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder) + chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder, chunk.SchemaConfig{}) tableClient, err = NewTableClient(f.dirname) if err != nil { diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index 7e9ebe519d84e..b0d37f9bae5ce 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -29,18 +29,20 @@ type Client struct { store chunk.ObjectClient keyEncoder KeyEncoder getChunkMaxParallel int + schema chunk.SchemaConfig } // NewClient wraps the provided ObjectClient with a chunk.Client implementation -func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client { - return NewClientWithMaxParallel(store, encoder, defaultMaxParallel) +func NewClient(store chunk.ObjectClient, encoder KeyEncoder, schema chunk.SchemaConfig) *Client { + return NewClientWithMaxParallel(store, encoder, defaultMaxParallel, schema) } -func NewClientWithMaxParallel(store chunk.ObjectClient, encoder KeyEncoder, maxParallel int) *Client { +func NewClientWithMaxParallel(store chunk.ObjectClient, encoder KeyEncoder, maxParallel int, 
schema chunk.SchemaConfig) *Client { return &Client{ store: store, keyEncoder: encoder, getChunkMaxParallel: maxParallel, + schema: schema, } } @@ -55,6 +57,7 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { var ( chunkKeys []string chunkBufs [][]byte + key string ) for i := range chunks { @@ -62,7 +65,9 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { if err != nil { return err } - key := chunks[i].ExternalKey() + + key = o.schema.ExternalKey(chunks[i]) + if o.keyEncoder != nil { key = o.keyEncoder(key) } @@ -98,7 +103,7 @@ func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C } func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) { - key := c.ExternalKey() + key := o.schema.ExternalKey(c) if o.keyEncoder != nil { key = o.keyEncoder(key) } diff --git a/pkg/storage/chunk/schema.go b/pkg/storage/chunk/schema.go index 5f75a475df484..f61f299cbdae5 100644 --- a/pkg/storage/chunk/schema.go +++ b/pkg/storage/chunk/schema.go @@ -966,3 +966,7 @@ func (v11Entries) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]Inde }, }, nil } + +type v12Entries struct { + v11Entries +} diff --git a/pkg/storage/chunk/schema_config.go b/pkg/storage/chunk/schema_config.go index b23ee87f49f54..309fcd3907fa7 100644 --- a/pkg/storage/chunk/schema_config.go +++ b/pkg/storage/chunk/schema_config.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "strconv" + "strings" "time" "github.com/go-kit/log/level" @@ -22,6 +23,7 @@ const ( secondsInDay = int64(24 * time.Hour / time.Second) millisecondsInHour = int64(time.Hour / time.Millisecond) millisecondsInDay = int64(24 * time.Hour / time.Millisecond) + v12 = "v12" ) var ( @@ -41,6 +43,22 @@ type PeriodConfig struct { IndexTables PeriodicTableConfig `yaml:"index"` ChunkTables PeriodicTableConfig `yaml:"chunks"` RowShards uint32 `yaml:"row_shards"` + + // Integer representation of schema used for hot path calculation. Populated on unmarshaling. + schemaInt *int `yaml:"-"` +} + +// UnmarshalYAML implements yaml.Unmarshaller. 
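+// It also validates the configured schema version via VersionAsInt, memoizing the parsed integer in schemaInt.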
+func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + type plain PeriodConfig + err := unmarshal((*plain)(cfg)) + if err != nil { + return err + } + + // call VersionAsInt after unmarshaling to errcheck schema version and populate PeriodConfig.schemaInt + _, err = cfg.VersionAsInt() + return err } // DayTime is a model.Time what holds day-aligned values, and marshals to/from @@ -188,17 +206,19 @@ func (cfg PeriodConfig) CreateSchema() (BaseSchema, error) { return newStoreSchema(buckets, v6Entries{}), nil case "v9": return newSeriesStoreSchema(buckets, v9Entries{}), nil - case "v10", "v11": + case "v10", "v11", v12: if cfg.RowShards == 0 { - return nil, fmt.Errorf("Must have row_shards > 0 (current: %d) for schema (%s)", cfg.RowShards, cfg.Schema) + return nil, fmt.Errorf("must have row_shards > 0 (current: %d) for schema (%s)", cfg.RowShards, cfg.Schema) } v10 := v10Entries{rowShards: cfg.RowShards} if cfg.Schema == "v10" { return newSeriesStoreSchema(buckets, v10), nil + } else if cfg.Schema == "v11" { + return newSeriesStoreSchema(buckets, v11Entries{v10}), nil + } else { // v12 + return newSeriesStoreSchema(buckets, v12Entries{v11Entries{v10}}), nil } - - return newSeriesStoreSchema(buckets, v11Entries{v10}), nil default: return nil, errInvalidSchemaVersion } @@ -305,6 +325,19 @@ func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) [ return result } +func (cfg *PeriodConfig) VersionAsInt() (int, error) { + // Read memoized schema version. This is called during unmarshaling, + // but may be nil in the case of testware. + if cfg.schemaInt != nil { + return *cfg.schemaInt, nil + } + + v := strings.Trim(cfg.Schema, "v") + n, err := strconv.Atoi(v) + cfg.schemaInt = &n + return n, err +} + // PeriodicTableConfig is configuration for a set of time-sharded tables. type PeriodicTableConfig struct { Prefix string @@ -434,6 +467,17 @@ func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) { return "", fmt.Errorf("no chunk table found for time %v", t) } +// SchemaForTime returns the Schema PeriodConfig to use for a given point in time. +func (cfg SchemaConfig) SchemaForTime(t model.Time) (PeriodConfig, error) { + for i := range cfg.Configs { + // TODO: callum, confirm we can rely on the schema configs being sorted in this order. + if t >= cfg.Configs[i].From.Time && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From.Time) { + return cfg.Configs[i], nil + } + } + return PeriodConfig{}, fmt.Errorf("no schema config found for time %v", t) +} + // TableFor calculates the table shard for a given point in time. func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { if cfg.Period == 0 { // non-periodic @@ -446,3 +490,34 @@ func (cfg *PeriodicTableConfig) TableFor(t model.Time) string { func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string { return cfg.Prefix + strconv.Itoa(int(i)) } + +// Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From +func (cfg SchemaConfig) ExternalKey(chunk Chunk) string { + p, err := cfg.SchemaForTime(chunk.From) + v, _ := p.VersionAsInt() + if err == nil && v >= 12 { + return cfg.newerExternalKey(chunk) + } else if chunk.ChecksumSet { + return cfg.newExternalKey(chunk) + } else { + return cfg.legacyExternalKey(chunk) + } +} + +// pre-checksum +func (cfg SchemaConfig) legacyExternalKey(chunk Chunk) string { + // This is the inverse of chunk.parseLegacyExternalKey, with "/" prepended. 
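+	// (Layout: "<fingerprint>:<from>:<through>" in decimal, e.g. "2:1484661279394:1484664879394".)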
+ // Legacy chunks had the user ID prefix on s3/memcache, but not in DynamoDB. + return fmt.Sprintf("%d:%d:%d", (chunk.Fingerprint), int64(chunk.From), int64(chunk.Through)) +} + +// post-checksum +func (cfg SchemaConfig) newExternalKey(chunk Chunk) string { + // This is the inverse of chunk.parseNewExternalKey. + return fmt.Sprintf("%s/%x:%x:%x:%x", chunk.UserID, uint64(chunk.Fingerprint), int64(chunk.From), int64(chunk.Through), chunk.Checksum) +} + +// v12+ +func (cfg SchemaConfig) newerExternalKey(chunk Chunk) string { + return fmt.Sprintf("%s/%x/%x:%x:%x", chunk.UserID, uint64(chunk.Fingerprint), int64(chunk.From), int64(chunk.Through), chunk.Checksum) +} diff --git a/pkg/storage/chunk/schema_config_test.go b/pkg/storage/chunk/schema_config_test.go index 814d3446321d0..c3d25548a3d29 100644 --- a/pkg/storage/chunk/schema_config_test.go +++ b/pkg/storage/chunk/schema_config_test.go @@ -665,7 +665,16 @@ func TestPeriodConfig_Validate(t *testing.T) { IndexTables: PeriodicTableConfig{Period: 0}, ChunkTables: PeriodicTableConfig{Period: 0}, }, - err: "Must have row_shards > 0 (current: 0) for schema (v10)", + err: "must have row_shards > 0 (current: 0) for schema (v10)", + }, + { + desc: "v12", + in: PeriodConfig{ + Schema: "v12", + RowShards: 16, + IndexTables: PeriodicTableConfig{Period: 0}, + ChunkTables: PeriodicTableConfig{Period: 0}, + }, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -712,3 +721,129 @@ tags: require.Equal(t, yamlFile, string(yamlGenerated)) } + +func TestSchemaForTime(t *testing.T) { + schemaCfg := SchemaConfig{Configs: []PeriodConfig{ + { + From: DayTime{Time: 1564358400000}, + IndexType: "grpc-store", + ObjectType: "grpc-store", + Schema: "v10", + IndexTables: PeriodicTableConfig{ + Prefix: "index_", + Period: 604800000000000, + Tags: nil, + }, + RowShards: 16, + }, + { + From: DayTime{Time: 1564444800000}, + IndexType: "grpc-store", + ObjectType: "grpc-store", + Schema: "v10", + IndexTables: PeriodicTableConfig{ + Prefix: "index_", + Period: 604800000000000, + Tags: nil, + }, + RowShards: 32, + }, + }} + + first, err := schemaCfg.SchemaForTime(model.TimeFromUnix(1564444800 + 100)) + require.NoError(t, err) + require.Equal(t, schemaCfg.Configs[1], first) + + second, err := schemaCfg.SchemaForTime(model.TimeFromUnix(1564358400 + 100)) + require.NoError(t, err) + require.Equal(t, schemaCfg.Configs[0], second) +} + +func TestVersionAsInt(t *testing.T) { + for _, tc := range []struct { + name string + schemaCfg SchemaConfig + expected int + err bool + }{ + { + name: "v9", + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v9", + }, + }, + }, + expected: int(9), + }, + { + name: "malformed", + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v", + }, + }, + }, + expected: int(0), + err: true, + }, + { + name: "v12", + schemaCfg: SchemaConfig{ + Configs: []PeriodConfig{ + { + From: DayTime{Time: 0}, + Schema: "v12", + RowShards: 16, + }, + }, + }, + expected: int(12), + }, + } { + t.Run(tc.name, func(t *testing.T) { + version, err := tc.schemaCfg.Configs[0].VersionAsInt() + require.Equal(t, tc.expected, version) + if tc.err { + require.NotNil(t, err) + } else { + + require.NoError(t, err) + } + }) + } +} + +func TestUnmarshalPeriodConfig(t *testing.T) { + input := ` +from: "2020-07-31" +index: + period: 24h + prefix: loki_index_ +object_store: gcs +schema: v11 +store: boltdb-shipper +` + + var cfg PeriodConfig + require.Nil(t, yaml.Unmarshal([]byte(input), &cfg)) + var n = 11 + + 
expected := PeriodConfig{ + From: DayTime{model.Time(1596153600000)}, + IndexType: "boltdb-shipper", + ObjectType: "gcs", + Schema: "v11", + IndexTables: PeriodicTableConfig{ + Prefix: "loki_index_", + Period: 24 * time.Hour, + }, + schemaInt: &n, + } + + require.Equal(t, expected, cfg) +} diff --git a/pkg/storage/chunk/series_store.go b/pkg/storage/chunk/series_store.go index 77b693fc09070..8a214d1e4dfd1 100644 --- a/pkg/storage/chunk/series_store.go +++ b/pkg/storage/chunk/series_store.go @@ -76,8 +76,8 @@ type seriesStore struct { writeDedupeCache cache.Cache } -func newSeriesStore(cfg StoreConfig, schema SeriesStoreSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) { - rs, err := newBaseStore(cfg, schema, index, chunks, limits, chunksCache) +func newSeriesStore(cfg StoreConfig, scfg SchemaConfig, schema SeriesStoreSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) { + rs, err := newBaseStore(cfg, scfg, schema, index, chunks, limits, chunksCache) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode } // Now fetch the actual chunk data from Memcache / S3 - keys := keysFromChunks(chunks) + keys := keysFromChunks(c.baseStore.schemaCfg, chunks) allChunks, err := fetcher.FetchChunks(ctx, chunks, keys) if err != nil { level.Error(log).Log("msg", "FetchChunks", "err", err) @@ -301,7 +301,7 @@ func (c *seriesStore) lookupLabelNamesByChunks(ctx context.Context, from, throug // Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint filtered := filterChunksByTime(from, through, chunks) - filtered, keys := filterChunksByUniqueFingerprint(filtered) + filtered, keys := filterChunksByUniqueFingerprint(c.baseStore.schemaCfg, filtered) level.Debug(log).Log("Chunks post filtering", len(chunks)) chunksPerQuery.Observe(float64(len(filtered))) @@ -478,7 +478,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun writeChunk := true // If this chunk is in cache it must already be in the database so we don't need to write it again - found, _, _ := c.fetcher.cache.Fetch(ctx, []string{chunk.ExternalKey()}) + found, _, _ := c.fetcher.cache.Fetch(ctx, []string{c.baseStore.schemaCfg.ExternalKey(chunk)}) if len(found) > 0 { writeChunk = false dedupedChunksTotal.Inc() @@ -541,7 +541,7 @@ func (c *seriesStore) calculateIndexEntries(ctx context.Context, from, through m return nil, nil, fmt.Errorf("no MetricNameLabel for chunk") } - keys, labelEntries, err := c.schema.GetCacheKeysAndLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) + keys, labelEntries, err := c.schema.GetCacheKeysAndLabelWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, c.baseStore.schemaCfg.ExternalKey(chunk)) if err != nil { return nil, nil, err } @@ -556,7 +556,7 @@ func (c *seriesStore) calculateIndexEntries(ctx context.Context, from, through m } } - chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, chunk.ExternalKey()) + chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, chunk.UserID, metricName, chunk.Metric, c.baseStore.schemaCfg.ExternalKey(chunk)) if err != nil { return nil, nil, err } diff --git a/pkg/storage/chunk/series_store_test.go b/pkg/storage/chunk/series_store_test.go index 700c4d617011e..b7f25367ee94c 100644 --- 
a/pkg/storage/chunk/series_store_test.go +++ b/pkg/storage/chunk/series_store_test.go @@ -58,7 +58,7 @@ func TestSeriesStore_LabelValuesForMetricName(t *testing.T) { t.Run(fmt.Sprintf("%s / %s / %s / %s", tc.metricName, tc.labelName, schema, storeCase.name), func(t *testing.T) { t.Log("========= Running labelValues with metricName", tc.metricName, "with labelName", tc.labelName, "with schema", schema) storeCfg := storeCase.configFn() - store := newTestChunkStoreConfig(t, schema, storeCfg) + store, _ := newTestChunkStoreConfig(t, schema, storeCfg) defer store.Stop() if err := store.Put(ctx, []Chunk{ diff --git a/pkg/storage/chunk/storage/by_key_test.go b/pkg/storage/chunk/storage/by_key_test.go index 59141a864837b..837482fb49012 100644 --- a/pkg/storage/chunk/storage/by_key_test.go +++ b/pkg/storage/chunk/storage/by_key_test.go @@ -5,8 +5,13 @@ import ( ) // ByKey allow you to sort chunks by ID -type ByKey []chunk.Chunk +type ByKey struct { + chunks []chunk.Chunk + scfg chunk.SchemaConfig +} -func (cs ByKey) Len() int { return len(cs) } -func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } -func (cs ByKey) Less(i, j int) bool { return cs[i].ExternalKey() < cs[j].ExternalKey() } +func (a ByKey) Len() int { return len(a.chunks) } +func (a ByKey) Swap(i, j int) { a.chunks[i], a.chunks[j] = a.chunks[j], a.chunks[i] } +func (a ByKey) Less(i, j int) bool { + return a.scfg.ExternalKey(a.chunks[i]) < a.scfg.ExternalKey(a.chunks[j]) +} diff --git a/pkg/storage/chunk/storage/chunk_client_test.go b/pkg/storage/chunk/storage/chunk_client_test.go index bafa4b72afced..31d20664fcabd 100644 --- a/pkg/storage/chunk/storage/chunk_client_test.go +++ b/pkg/storage/chunk/storage/chunk_client_test.go @@ -18,6 +18,15 @@ import ( func TestChunksBasic(t *testing.T) { forAllFixtures(t, func(t *testing.T, _ chunk.IndexClient, client chunk.Client) { + s := chunk.SchemaConfig{ + Configs: []chunk.PeriodConfig{ + { + From: chunk.DayTime{Time: 0}, + Schema: "v11", + RowShards: 16, + }, + }, + } const batchSize = 5 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() @@ -25,7 +34,7 @@ func TestChunksBasic(t *testing.T) { // Write a few batches of chunks. written := []string{} for i := 0; i < 5; i++ { - keys, chunks, err := testutils.CreateChunks(i, batchSize, model.Now().Add(-time.Hour), model.Now()) + keys, chunks, err := testutils.CreateChunks(s, i, batchSize, model.Now().Add(-time.Hour), model.Now()) require.NoError(t, err) written = append(written, keys...) 
err = client.PutChunks(ctx, chunks) @@ -51,10 +60,10 @@ func TestChunksBasic(t *testing.T) { require.NoError(t, err) require.Equal(t, len(chunksToGet), len(chunksWeGot)) - sort.Sort(ByKey(chunksToGet)) - sort.Sort(ByKey(chunksWeGot)) + sort.Sort(ByKey{chunksToGet, s}) + sort.Sort(ByKey{chunksWeGot, s}) for i := 0; i < len(chunksWeGot); i++ { - require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) + require.Equal(t, s.ExternalKey(chunksToGet[i]), s.ExternalKey(chunksWeGot[i]), strconv.Itoa(i)) } } }) diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go index f80cb210f40a7..427b932ea3248 100644 --- a/pkg/storage/chunk/storage/factory.go +++ b/pkg/storage/chunk/storage/factory.go @@ -277,7 +277,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if err != nil { return nil, err } - return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil + return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil case StorageTypeAWSDynamo: if cfg.AWSStorageConfig.DynamoDB.URL == nil { return nil, fmt.Errorf("Must set -dynamodb.url in aws mode") @@ -292,7 +292,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if err != nil { return nil, err } - return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil + return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil case StorageTypeGCP: return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case StorageTypeGCPColumnKey, StorageTypeBigTable, StorageTypeBigTableHashed: @@ -302,13 +302,13 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if err != nil { return nil, err } - return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil + return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil case StorageTypeSwift: c, err := openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging) if err != nil { return nil, err } - return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil + return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil case StorageTypeCassandra: return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer, cfg.MaxParallelGetChunk) case StorageTypeFileSystem: @@ -316,7 +316,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if err != nil { return nil, err } - return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk), nil + return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk, schemaCfg), nil case StorageTypeGrpc: return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg) default: diff --git a/pkg/storage/chunk/testutils/testutils.go b/pkg/storage/chunk/testutils/testutils.go index 2b18ce537ad29..f292795e93e03 100644 --- a/pkg/storage/chunk/testutils/testutils.go +++ b/pkg/storage/chunk/testutils/testutils.go @@ -68,7 +68,7 @@ func Setup(fixture Fixture, tableName string) (chunk.IndexClient, chunk.Client, } // CreateChunks creates some chunks for testing -func CreateChunks(startIndex, batchSize int, from model.Time, through model.Time) ([]string, []chunk.Chunk, error) { +func CreateChunks(scfg chunk.SchemaConfig, startIndex, batchSize int, from model.Time, through 
model.Time) ([]string, []chunk.Chunk, error) { keys := []string{} chunks := []chunk.Chunk{} for j := 0; j < batchSize; j++ { @@ -77,7 +77,7 @@ func CreateChunks(startIndex, batchSize int, from model.Time, through model.Time {Name: "index", Value: strconv.Itoa(startIndex*batchSize + j)}, }) chunks = append(chunks, chunk) - keys = append(keys, chunk.ExternalKey()) + keys = append(keys, scfg.ExternalKey(chunk)) } return keys, chunks, nil } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 474514c7dfdc6..796b0bb51570d 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -295,7 +295,7 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectLogParams) ([]log } for _, group := range groups { - err = fetchLazyChunks(ctx, group) + err = fetchLazyChunks(ctx, s.schemaCfg.SchemaConfig, group) if err != nil { return nil, err } @@ -357,7 +357,7 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter chunkFilterer = s.chunkFilterer.ForRequest(ctx) } - return newLogBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer) + return newLogBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, pipeline, req.Direction, req.Start, req.End, chunkFilterer) } func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { @@ -389,7 +389,7 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) chunkFilterer = s.chunkFilterer.ForRequest(ctx) } - return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer) + return newSampleBatchIterator(ctx, s.schemaCfg.SchemaConfig, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer) } func (s *store) GetSchemaConfigs() []chunk.PeriodConfig { diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index ac9c65395fb5a..ad0f5d6234bff 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -885,7 +885,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { err := store.PutOne(ctx, chk.From, chk.Through, chk) require.NoError(t, err) - addedChunkIDs[chk.ExternalKey()] = struct{}{} + addedChunkIDs[schemaConfig.ExternalKey(chk)] = struct{}{} } // recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup. 
@@ -916,7 +916,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { // check whether we got back all the chunks which were added for i := range chunks { - _, ok := addedChunkIDs[chunks[i].ExternalKey()] + _, ok := addedChunkIDs[schemaConfig.ExternalKey(chunks[i])] require.True(t, ok) } } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index a9f90cf7d76dd..ace1bcd6a1922 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -194,7 +194,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage encoder = objectclient.Base64Encoder } - chunkClient := objectclient.NewClient(objectClient, encoder) + chunkClient := objectclient.NewClient(objectClient, encoder, schemaConfig.SchemaConfig) retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention") c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r) diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go index 36fe3b11897ad..0f193a2a6b616 100644 --- a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go @@ -46,8 +46,8 @@ func Test_ChunkIterator(t *testing.T) { }) require.NoError(t, err) require.Equal(t, []ChunkEntry{ - entryFromChunk(c1), - entryFromChunk(c2), + entryFromChunk(store.schemaCfg.SchemaConfig, c1), + entryFromChunk(store.schemaCfg.SchemaConfig, c2), }, actual) // second pass we delete c2 @@ -62,7 +62,7 @@ func Test_ChunkIterator(t *testing.T) { }) require.NoError(t, err) require.Equal(t, []ChunkEntry{ - entryFromChunk(c1), + entryFromChunk(store.schemaCfg.SchemaConfig, c1), }, actual) }) } @@ -72,6 +72,7 @@ func Test_SeriesCleaner(t *testing.T) { for _, tt := range allSchemas { tt := tt t.Run(tt.schema, func(t *testing.T) { + testSchema := chunk.SchemaConfig{Configs: []chunk.PeriodConfig{tt.config}} store := newTestStore(t) c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, tt.from, tt.from.Add(1*time.Hour)) c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, tt.from, tt.from.Add(1*time.Hour)) @@ -101,19 +102,19 @@ func Test_SeriesCleaner(t *testing.T) { err = tables[0].DB.Update(func(tx *bbolt.Tx) error { cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config, tables[0].name) - if err := cleaner.Cleanup(entryFromChunk(c2).UserID, c2.Metric); err != nil { + if err := cleaner.Cleanup(entryFromChunk(testSchema, c2).UserID, c2.Metric); err != nil { return err } // remove series for c1 without __name__ label, which should work just fine - return cleaner.Cleanup(entryFromChunk(c1).UserID, c1.Metric.WithoutLabels(labels.MetricName)) + return cleaner.Cleanup(entryFromChunk(testSchema, c1).UserID, c1.Metric.WithoutLabels(labels.MetricName)) }) require.NoError(t, err) err = tables[0].DB.View(func(tx *bbolt.Tx) error { return tx.Bucket(bucketName).ForEach(func(k, _ []byte) error { - c1SeriesID := entryFromChunk(c1).SeriesID - c2SeriesID := entryFromChunk(c2).SeriesID + c1SeriesID := entryFromChunk(testSchema, c1).SeriesID + c2SeriesID := entryFromChunk(testSchema, c2).SeriesID series, ok, err := parseLabelIndexSeriesID(decodeKey(k)) if !ok { return nil @@ -136,12 +137,12 @@ func Test_SeriesCleaner(t *testing.T) { 
} } -func entryFromChunk(c chunk.Chunk) ChunkEntry { +func entryFromChunk(s chunk.SchemaConfig, c chunk.Chunk) ChunkEntry { return ChunkEntry{ ChunkRef: ChunkRef{ UserID: []byte(c.UserID), SeriesID: labelsSeriesID(c.Metric), - ChunkID: []byte(c.ExternalKey()), + ChunkID: []byte(s.ExternalKey(c)), From: c.From, Through: c.Through, }, diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 1f4d53bba2cc3..fbc7a372f7ae2 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -276,6 +276,7 @@ type chunkRewriter struct { chunkClient chunk.Client tableName string bucket *bbolt.Bucket + scfg chunk.SchemaConfig seriesStoreSchema chunk.SeriesStoreSchema } @@ -296,6 +297,7 @@ func newChunkRewriter(chunkClient chunk.Client, schemaCfg chunk.PeriodConfig, chunkClient: chunkClient, tableName: tableName, bucket: bucket, + scfg: chunk.SchemaConfig{Configs: []chunk.PeriodConfig{schemaCfg}}, seriesStoreSchema: seriesStoreSchema, }, nil } @@ -343,7 +345,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, interva return false, err } - entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, newChunk.ExternalKey()) + entries, err := c.seriesStoreSchema.GetChunkWriteEntries(interval.Start, interval.End, userID, "logs", newChunk.Metric, c.scfg.ExternalKey(newChunk)) if err != nil { return false, err } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 6a55069db92fd..62d47f8067501 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -162,7 +162,7 @@ func Test_Retention(t *testing.T) { for i, e := range tt.alive { require.Equal(t, e, store.HasChunk(tt.chunks[i]), "chunk %d should be %t", i, e) if !e { - expectDeleted = append(expectDeleted, tt.chunks[i].ExternalKey()) + expectDeleted = append(expectDeleted, store.schemaCfg.ExternalKey(tt.chunks[i])) } } sort.Strings(expectDeleted) @@ -363,7 +363,7 @@ func TestChunkRewriter(t *testing.T) { require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk})) store.Stop() - chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder) + chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder, schemaCfg.SchemaConfig) for _, indexTable := range store.indexTables() { err := indexTable.DB.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) @@ -374,7 +374,7 @@ func TestChunkRewriter(t *testing.T) { cr, err := newChunkRewriter(chunkClient, store.schemaCfg.SchemaConfig.Configs[0], indexTable.name, bucket) require.NoError(t, err) - wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(tt.chunk), tt.rewriteIntervals) + wroteChunks, err := cr.rewriteChunk(context.Background(), entryFromChunk(store.schemaCfg.SchemaConfig, tt.chunk), tt.rewriteIntervals) require.NoError(t, err) if len(tt.rewriteIntervals) == 0 { require.False(t, wroteChunks) @@ -393,7 +393,7 @@ func TestChunkRewriter(t *testing.T) { for _, interval := range tt.rewriteIntervals { expectedChk := createChunk(t, tt.chunk.UserID, labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, interval.Start, interval.End) for i, chk := range chunks { - if 
chk.ExternalKey() == expectedChk.ExternalKey() { + if store.schemaCfg.ExternalKey(chk) == store.schemaCfg.ExternalKey(expectedChk) { chunks = append(chunks[:i], chunks[i+1:]...) break } @@ -402,7 +402,7 @@ func TestChunkRewriter(t *testing.T) { // the source chunk should still be there in the store require.Len(t, chunks, 1) - require.Equal(t, tt.chunk.ExternalKey(), chunks[0].ExternalKey()) + require.Equal(t, store.schemaCfg.ExternalKey(tt.chunk), store.schemaCfg.ExternalKey(chunks[0])) store.Stop() }) } @@ -630,7 +630,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { require.NoError(t, store.Put(context.TODO(), tc.chunks)) chunksExpiry := map[string]chunkExpiry{} for i, chunk := range tc.chunks { - chunksExpiry[chunk.ExternalKey()] = tc.expiry[i] + chunksExpiry[store.schemaCfg.ExternalKey(chunk)] = tc.expiry[i] } expirationChecker := newMockExpirationChecker(chunksExpiry) @@ -640,7 +640,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { tables := store.indexTables() require.Len(t, tables, len(tc.expectedDeletedSeries)) - chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder) + chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder, schemaCfg.SchemaConfig) for i, table := range tables { seriesCleanRecorder := newSeriesCleanRecorder() diff --git a/pkg/storage/stores/shipper/compactor/retention/util_test.go b/pkg/storage/stores/shipper/compactor/retention/util_test.go index e54c9e46e5634..f4b77c0ae0975 100644 --- a/pkg/storage/stores/shipper/compactor/retention/util_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/util_test.go @@ -163,9 +163,9 @@ func (t *testStore) HasChunk(c chunk.Chunk) bool { chunkIDs := make(map[string]struct{}) for _, chk := range chunks { - chunkIDs[chk.ExternalKey()] = struct{}{} + chunkIDs[t.schemaCfg.ExternalKey(chk)] = struct{}{} } - return len(chunkIDs) == 1 && c.ExternalKey() == chunks[0].ExternalKey() + return len(chunkIDs) == 1 && t.schemaCfg.ExternalKey(c) == t.schemaCfg.ExternalKey(chunks[0]) } func (t *testStore) GetChunks(userID string, from, through model.Time, metric labels.Labels) []chunk.Chunk { diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index cfd9a04613fe8..50a4e34e86c9a 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -145,8 +145,9 @@ func newSampleQuery(query string, start, end time.Time) *logproto.SampleQueryReq } type mockChunkStore struct { - chunks []chunk.Chunk - client *mockChunkStoreClient + schemas chunk.SchemaConfig + chunks []chunk.Chunk + client *mockChunkStoreClient } // mockChunkStore cannot implement both chunk.Store and chunk.Client, @@ -161,7 +162,7 @@ func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore { for _, s := range streams { chunks = append(chunks, newChunk(*s)) } - return &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}} + return &mockChunkStore{schemas: chunk.SchemaConfig{}, chunks: chunks, client: &mockChunkStoreClient{chunks: chunks, scfg: chunk.SchemaConfig{}}} } func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } @@ -197,7 +198,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, refs := make([]chunk.Chunk, 0, len(m.chunks)) // transform real chunks into ref chunks. 
for _, c := range m.chunks { - r, err := chunk.ParseExternalKey("fake", c.ExternalKey()) + r, err := chunk.ParseExternalKey("fake", m.schemas.ExternalKey(c)) if err != nil { panic(err) } @@ -209,7 +210,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, panic(err) } - f, err := chunk.NewChunkFetcher(cache, false, m.client) + f, err := chunk.NewChunkFetcher(cache, false, m.schemas, m.client) if err != nil { panic(err) } @@ -218,6 +219,7 @@ func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, type mockChunkStoreClient struct { chunks []chunk.Chunk + scfg chunk.SchemaConfig } func (m mockChunkStoreClient) Stop() { @@ -233,7 +235,7 @@ func (m mockChunkStoreClient) GetChunks(ctx context.Context, chunks []chunk.Chun for _, c := range chunks { for _, sc := range m.chunks { // only returns chunks requested using the external key - if c.ExternalKey() == sc.ExternalKey() { + if m.scfg.ExternalKey(c) == m.scfg.ExternalKey((sc)) { res = append(res, sc) } }
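
Note for reviewers: below is a minimal, self-contained Go sketch of the three chunk key layouts exercised in this change. It only mirrors the format strings added to SchemaConfig.ExternalKey in this diff; the user ID, fingerprint, timestamps, and checksum are the illustrative values the benchmarks above already use, not production data.

// keyformats_sketch.go — illustrative only; mirrors the three format strings
// used by SchemaConfig.ExternalKey in this change.
package main

import "fmt"

func main() {
	userID := "fake"

	// Legacy (pre-checksum) keys are plain decimal: "<fingerprint>:<from>:<through>".
	legacy := fmt.Sprintf("%d:%d:%d", 2, int64(1484661279394), int64(1484664879394))
	fmt.Println(legacy) // 2:1484661279394:1484664879394

	var (
		fingerprint uint64 = 0x57f628c7f6d57aad
		from        int64  = 0x162c699f000
		through     int64  = 0x162c69a07eb
		checksum    uint32 = 0xeb242d99
	)

	// v11 and earlier (checksum set): "<user>/<fprint>:<from>:<through>:<checksum>", hex after the user ID.
	v11Key := fmt.Sprintf("%s/%x:%x:%x:%x", userID, fingerprint, from, through, checksum)
	fmt.Println(v11Key) // fake/57f628c7f6d57aad:162c699f000:162c69a07eb:eb242d99

	// v12 and later: the fingerprint gets its own path segment, "<user>/<fprint>/<from>:<through>:<checksum>".
	v12Key := fmt.Sprintf("%s/%x/%x:%x:%x", userID, fingerprint, from, through, checksum)
	fmt.Println(v12Key) // fake/57f628c7f6d57aad/162c699f000:162c69a07eb:eb242d99
}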