From 8680148b0605491d7a93bb19130d305bc56292f4 Mon Sep 17 00:00:00 2001
From: Sandeep Sukhani
Date: Tue, 10 Nov 2020 22:10:20 +0530
Subject: [PATCH] add support for slicing of chunk in encodings (#3472)

Signed-off-by: Sandeep Sukhani
---
Review notes (text in this region is not part of the commit message):
 * Chunk.Slice keeps its range check and now delegates the sample copy to
   the new encoding.Chunk.Rebound method. The copy loop moves into
   encoding.reboundChunk with only the receiver and parameter names changed.
 * ErrSliceNoDataInRange and ErrSliceChunkOverflow move from the chunk
   package to the encoding package; comparisons in chunk_store.go and
   chunk_test.go are updated to the new location.
 * Rebound is added to the encoding.Chunk interface and implemented for
   bigchunk, doubleDeltaEncodedChunk and varbitChunk, all via reboundChunk.
   Any other implementation of the interface has to add the method too.

 chunk.go                | 43 ++++--------------------
 chunk_store.go          |  5 +--
 chunk_test.go           |  2 +-
 encoding/bigchunk.go    |  4 +++
 encoding/chunk.go       | 54 +++++++++++++++++++++++++++++--
 encoding/chunk_test.go  | 72 +++++++++++++++++++++++++++++++++++++++++
 encoding/doubledelta.go |  4 +++
 encoding/varbit.go      |  4 +++
 8 files changed, 145 insertions(+), 43 deletions(-)

diff --git a/chunk.go b/chunk.go
index d52acf4bc4dee..2879a8050ae1f 100644
--- a/chunk.go
+++ b/chunk.go
@@ -20,15 +20,12 @@ import (
 	"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
 )
 
-// Errors that decode can return
 const (
-	ErrInvalidChecksum    = errs.Error("invalid chunk checksum")
-	ErrWrongMetadata      = errs.Error("wrong chunk metadata")
-	ErrMetadataLength     = errs.Error("chunk metadata wrong length")
-	ErrDataLength         = errs.Error("chunk data wrong length")
-	ErrSliceOutOfRange    = errs.Error("chunk can't be sliced out of its data range")
-	ErrSliceNoDataInRange = errs.Error("chunk has no data for given range to slice")
-	ErrSliceChunkOverflow = errs.Error("slicing should not overflow a chunk")
+	ErrInvalidChecksum = errs.Error("invalid chunk checksum")
+	ErrWrongMetadata   = errs.Error("wrong chunk metadata")
+	ErrMetadataLength  = errs.Error("chunk metadata wrong length")
+	ErrDataLength      = errs.Error("chunk data wrong length")
+	ErrSliceOutOfRange = errs.Error("chunk can't be sliced out of its data range")
 )
 
 var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
@@ -338,39 +335,11 @@ func (c *Chunk) Slice(from, through model.Time) (*Chunk, error) {
 		return nil, ErrSliceOutOfRange
 	}
 
-	itr := c.Data.NewIterator(nil)
-	if !itr.FindAtOrAfter(from) {
-		return nil, ErrSliceNoDataInRange
-	}
-
-	pc, err := prom_chunk.NewForEncoding(c.Data.Encoding())
+	pc, err := c.Data.Rebound(from, through)
 	if err != nil {
 		return nil, err
 	}
 
-	for !itr.Value().Timestamp.After(through) {
-		oc, err := pc.Add(itr.Value())
-		if err != nil {
-			return nil, err
-		}
-
-		if oc != nil {
-			return nil, ErrSliceChunkOverflow
-		}
-		if !itr.Scan() {
-			break
-		}
-	}
-
-	err = itr.Err()
-	if err != nil {
-		return nil, err
-	}
-
-	if pc.Len() == 0 {
-		return nil, ErrSliceNoDataInRange
-	}
-
 	nc := NewChunk(c.UserID, c.Fingerprint, c.Metric, pc, from, through)
 	return &nc, nil
 }
diff --git a/chunk_store.go b/chunk_store.go
index f0869ef3853af..97670b10fa138 100644
--- a/chunk_store.go
+++ b/chunk_store.go
@@ -17,6 +17,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 
 	"github.com/cortexproject/cortex/pkg/chunk/cache"
+	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/extract"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -684,7 +685,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa
 	var newChunks []*Chunk
 	if partiallyDeletedInterval.Start > chunk.From {
 		newChunk, err := chunk.Slice(chunk.From, partiallyDeletedInterval.Start-1)
-		if err != nil && err != ErrSliceNoDataInRange {
+		if err != nil && err != encoding.ErrSliceNoDataInRange {
 			return errors.Wrapf(err, "when slicing chunk for interval %d - %d", chunk.From, partiallyDeletedInterval.Start-1)
 		}
 
@@ -695,7 +696,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa
 
 	if partiallyDeletedInterval.End < chunk.Through {
 		newChunk, err := chunk.Slice(partiallyDeletedInterval.End+1, chunk.Through)
-		if err != nil && err != ErrSliceNoDataInRange {
+		if err != nil && err != encoding.ErrSliceNoDataInRange {
 			return errors.Wrapf(err, "when slicing chunk for interval %d - %d", partiallyDeletedInterval.End+1, chunk.Through)
 		}
 
diff --git a/chunk_test.go b/chunk_test.go
index d7b5570c6ab2d..e0c78549941aa 100644
--- a/chunk_test.go
+++ b/chunk_test.go
@@ -333,7 +333,7 @@ func TestChunk_Slice(t *testing.T) {
 		{
 			name:       "slice no data in range",
 			sliceRange: model.Interval{Start: chunkStartTime.Add(time.Second), End: chunkStartTime.Add(10 * time.Second)},
-			err:        ErrSliceNoDataInRange,
+			err:        encoding.ErrSliceNoDataInRange,
 		},
 		{
 			name:       "slice interval not aligned with sample intervals",
diff --git a/encoding/bigchunk.go b/encoding/bigchunk.go
index 8683ebc5a00bc..c05defedb172c 100644
--- a/encoding/bigchunk.go
+++ b/encoding/bigchunk.go
@@ -210,6 +210,10 @@ func (b *bigchunk) Slice(start, end model.Time) Chunk {
 	}
 }
 
+func (b *bigchunk) Rebound(start, end model.Time) (Chunk, error) {
+	return reboundChunk(b, start, end)
+}
+
 type writer struct {
 	io.Writer
 }
diff --git a/encoding/chunk.go b/encoding/chunk.go
index b31304714d1b7..97c95e41a7736 100644
--- a/encoding/chunk.go
+++ b/encoding/chunk.go
@@ -22,12 +22,18 @@ import (
 	"sort"
 
 	"github.com/prometheus/common/model"
+	errs "github.com/weaveworks/common/errors"
 
 	"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
 )
 
-// ChunkLen is the length of a chunk in bytes.
-const ChunkLen = 1024
+const (
+	// ChunkLen is the length of a chunk in bytes.
+	ChunkLen = 1024
+
+	ErrSliceNoDataInRange = errs.Error("chunk has no data for given range to slice")
+	ErrSliceChunkOverflow = errs.Error("slicing should not overflow a chunk")
+)
 
 var (
 	errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
@@ -50,10 +56,15 @@ type Chunk interface {
 	Encoding() Encoding
 	Utilization() float64
 
-	// Slice returns a smaller chunk the includes all samples between start and end
+	// Slice returns a smaller chunk that includes all samples between start and end
 	// (inclusive). Its may over estimate. On some encodings it is a noop.
 	Slice(start, end model.Time) Chunk
 
+	// Rebound returns a smaller chunk that includes all samples between start and end (inclusive).
+	// We do not want to change existing Slice implementations because
+	// it is built specifically for query optimization and is a noop for some of the encodings.
+	Rebound(start, end model.Time) (Chunk, error)
+
 	// Len returns the number of samples in the chunk. Implementations may be
 	// expensive.
 	Len() int
@@ -246,3 +257,40 @@ func (it *indexAccessingChunkIterator) Batch(size int) Batch {
 func (it *indexAccessingChunkIterator) Err() error {
 	return it.acc.err()
 }
+
+func reboundChunk(c Chunk, start, end model.Time) (Chunk, error) {
+	itr := c.NewIterator(nil)
+	if !itr.FindAtOrAfter(start) {
+		return nil, ErrSliceNoDataInRange
+	}
+
+	pc, err := NewForEncoding(c.Encoding())
+	if err != nil {
+		return nil, err
+	}
+
+	for !itr.Value().Timestamp.After(end) {
+		oc, err := pc.Add(itr.Value())
+		if err != nil {
+			return nil, err
+		}
+
+		if oc != nil {
+			return nil, ErrSliceChunkOverflow
+		}
+		if !itr.Scan() {
+			break
+		}
+	}
+
+	err = itr.Err()
+	if err != nil {
+		return nil, err
+	}
+
+	if pc.Len() == 0 {
+		return nil, ErrSliceNoDataInRange
+	}
+
+	return pc, nil
+}
diff --git a/encoding/chunk_test.go b/encoding/chunk_test.go
index c78e06381b010..5b06cd3e0e311 100644
--- a/encoding/chunk_test.go
+++ b/encoding/chunk_test.go
@@ -86,6 +86,10 @@ func TestChunk(t *testing.T) {
 			t.Run(fmt.Sprintf("testChunkBatch/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
 				testChunkBatch(t, tc.encoding, samples)
 			})
+
+			t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+				testChunkRebound(t, tc.encoding, samples)
+			})
 		}
 	}
 }
@@ -220,3 +224,71 @@ func testChunkBatch(t *testing.T, encoding Encoding, samples int) {
 	require.False(t, iter.Scan())
 	require.NoError(t, iter.Err())
 }
+
+func testChunkRebound(t *testing.T, encoding Encoding, samples int) {
+	for _, tc := range []struct {
+		name               string
+		sliceFrom, sliceTo model.Time
+		err                error
+	}{
+		{
+			name:      "slice first half",
+			sliceFrom: 0,
+			sliceTo:   model.Time((samples / 2) * step),
+		},
+		{
+			name:      "slice second half",
+			sliceFrom: model.Time((samples / 2) * step),
+			sliceTo:   model.Time((samples - 1) * step),
+		},
+		{
+			name:      "slice in the middle",
+			sliceFrom: model.Time(int(float64(samples)*0.25) * step),
+			sliceTo:   model.Time(int(float64(samples)*0.75) * step),
+		},
+		{
+			name:      "slice no data in range",
+			err:       ErrSliceNoDataInRange,
+			sliceFrom: model.Time((samples + 1) * step),
+			sliceTo:   model.Time(samples * 2 * step),
+		},
+		{
+			name:      "slice interval not aligned with sample intervals",
+			sliceFrom: model.Time(0 + step/2),
+			sliceTo:   model.Time(samples * step).Add(time.Duration(-step / 2)),
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			originalChunk := mkChunk(t, encoding, samples)
+
+			newChunk, err := originalChunk.Rebound(tc.sliceFrom, tc.sliceTo)
+			if tc.err != nil {
+				require.Equal(t, tc.err, err)
+				return
+			}
+			require.NoError(t, err)
+
+			chunkItr := originalChunk.NewIterator(nil)
+			chunkItr.FindAtOrAfter(tc.sliceFrom)
+
+			newChunkItr := newChunk.NewIterator(nil)
+			newChunkItr.Scan()
+
+			for {
+				require.Equal(t, chunkItr.Value(), newChunkItr.Value())
+
+				originalChunksHasMoreSamples := chunkItr.Scan()
+				newChunkHasMoreSamples := newChunkItr.Scan()
+
+				// originalChunk and newChunk both should end at same time or newChunk should end before or at slice end time
+				if !originalChunksHasMoreSamples || chunkItr.Value().Timestamp > tc.sliceTo {
+					require.False(t, newChunkHasMoreSamples)
+					break
+				}
+
+				require.True(t, newChunkHasMoreSamples)
+			}
+
+		})
+	}
+}
diff --git a/encoding/doubledelta.go b/encoding/doubledelta.go
index 683ce844eef6a..e0e43e7d63bdd 100644
--- a/encoding/doubledelta.go
+++ b/encoding/doubledelta.go
@@ -233,6 +233,10 @@ func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk {
 	return c
 }
 
+func (c *doubleDeltaEncodedChunk) Rebound(start, end model.Time) (Chunk, error) {
+	return reboundChunk(c, start, end)
+}
+
 // Marshal implements chunk.
 func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
 	if len(c) > math.MaxUint16 {
diff --git a/encoding/varbit.go b/encoding/varbit.go
index a9d1c2f28771d..fe67337ecadff 100644
--- a/encoding/varbit.go
+++ b/encoding/varbit.go
@@ -287,6 +287,10 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk {
 	return c
 }
 
+func (c *varbitChunk) Rebound(start, end model.Time) (Chunk, error) {
+	return reboundChunk(c, start, end)
+}
+
 // Marshal implements chunk.
 func (c varbitChunk) Marshal(w io.Writer) error {
 	size := c.Size()