store: add streamed postings reading

`readIndexRange` dominates the profiles here, so let's stream postings reads
into `index.Postings` instead of allocating everything at once.

Work in progress.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
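
The idea, roughly: rather than `readIndexRange` copying a whole postings range into a byte slice and wrapping it with `newBigEndianPostings`, the postings can be decoded lazily from the object-storage range reader. Below is a minimal sketch of that shape against the Prometheus `index.Postings` interface; the names `streamedBigEndianPostings` and `newStreamedBigEndianPostings` are illustrative only and are not the `streamedPostingsReader` this commit adds (its implementation lives in one of the other changed files not shown on this page).

package store

import (
	"bufio"
	"encoding/binary"
	"io"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

// streamedBigEndianPostings is a hypothetical sketch: it decodes big-endian
// uint32 series refs on demand from a reader instead of holding the whole
// postings range in memory.
type streamedBigEndianPostings struct {
	r   *bufio.Reader
	cur storage.SeriesRef
	err error
}

var _ index.Postings = (*streamedBigEndianPostings)(nil)

func newStreamedBigEndianPostings(r io.Reader) *streamedBigEndianPostings {
	return &streamedBigEndianPostings{r: bufio.NewReader(r)}
}

func (p *streamedBigEndianPostings) Next() bool {
	if p.err != nil {
		return false
	}
	var buf [4]byte
	if _, err := io.ReadFull(p.r, buf[:]); err != nil {
		if err != io.EOF {
			// A partial read means the postings data is truncated or corrupted.
			p.err = err
		}
		return false
	}
	p.cur = storage.SeriesRef(binary.BigEndian.Uint32(buf[:]))
	return true
}

// Seek cannot binary search a stream, so it advances linearly until At() >= v.
func (p *streamedBigEndianPostings) Seek(v storage.SeriesRef) bool {
	for p.cur < v {
		if !p.Next() {
			return false
		}
	}
	return true
}

func (p *streamedBigEndianPostings) At() storage.SeriesRef { return p.cur }

func (p *streamedBigEndianPostings) Err() error { return p.err }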
GiedriusS committed May 10, 2023
1 parent a3ae856 commit 2d10d0f
Showing 6 changed files with 352 additions and 142 deletions.
143 changes: 20 additions & 123 deletions pkg/store/bucket.go
@@ -1326,7 +1326,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum))
@@ -2416,142 +2415,42 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// Fetch from object storage concurrently and update stats and posting list.
g.Go(func() error {
begin := time.Now()
for _, p := range ptrs[i:j] {
ir, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), p.ptr.Start, p.ptr.End-p.ptr.Start)
if err != nil {
return errors.Wrap(err, "get range reader")
}

b, err := r.block.readIndexRange(ctx, start, length)
if err != nil {
return errors.Wrap(err, "read postings range")
pr, err := newStreamedPostingsReader(
ir,
r.block.meta.ULID,
keys[p.keyID],
r.block.indexCache,
r.stats,
&r.mtx,
)
if err != nil {
return errors.Wrap(err, "creating postings reader")
}
closeFns = append(closeFns, func() {
runutil.CloseWithLogOnErr(r.block.logger, pr, "fetchPostings close streamed reader")
})
output[p.keyID] = pr
}
fetchTime := time.Since(begin)

r.mtx.Lock()
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
// index-header can estimate endings, which means we need to resize the endings.
pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
if err != nil {
return err
}

dataToCache := pBytes

compressionTime := time.Duration(0)
compressions, compressionErrors, compressedSize := 0, 0, 0

// Reencode postings before storing to cache. If that fails, we store original bytes.
// This can only fail, if postings data was somehow corrupted,
// and there is nothing we can do about it.
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
compressedSize = len(data)
} else {
compressionErrors = 1
}

r.mtx.Lock()
// Return postings and fill LRU cache.
// Truncate first 4 bytes which are length of posting.
output[p.keyID] = newBigEndianPostings(pBytes[4:])

r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
return nil
})
}

return output, closeFns, g.Wait()
}
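
In the hunk above, each key now gets its own `GetRange` reader wrapped by `newStreamedPostingsReader`, and every reader is registered in `closeFns` so the caller can release the underlying range readers once the postings have been consumed. The intent is that memory scales with how far the caller iterates rather than with the full size of the fetched postings range. Purely for illustration, a drain helper over the hypothetical sketch type from earlier (`expandStreamed` is not part of the commit):

// expandStreamed drains a streamed postings reader into series refs.
// Hypothetical helper over the streamedBigEndianPostings sketch above.
func expandStreamed(r io.Reader) ([]storage.SeriesRef, error) {
	p := newStreamedBigEndianPostings(r)
	var refs []storage.SeriesRef
	for p.Next() {
		refs = append(refs, p.At())
	}
	return refs, p.Err()
}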

func resizePostings(b []byte) ([]byte, error) {
d := encoding.Decbuf{B: b}
n := d.Be32int()
if d.Err() != nil {
return nil, errors.Wrap(d.Err(), "read postings list")
}

// 4 bytes for the number of entries, then 4 bytes for each big-endian posting.
size := 4 + n*4
if len(b) < size {
return nil, encoding.ErrInvalidSize
}
return b[:size], nil
}

// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
list []byte
cur uint32
}

// TODO(bwplotka): Expose those inside Prometheus.
func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
}

func (it *bigEndianPostings) At() storage.SeriesRef {
return storage.SeriesRef(it.cur)
}

func (it *bigEndianPostings) Next() bool {
if len(it.list) >= 4 {
it.cur = binary.BigEndian.Uint32(it.list)
it.list = it.list[4:]
return true
}
return false
}

func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
if storage.SeriesRef(it.cur) >= x {
return true
}

num := len(it.list) / 4
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
})
if i < num {
j := i * 4
it.cur = binary.BigEndian.Uint32(it.list[j:])
it.list = it.list[j+4:]
return true
}
it.list = nil
return false
}

func (it *bigEndianPostings) Err() error {
return nil
}

// Returns number of remaining postings values.
func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()
@@ -3062,7 +2961,6 @@ type queryStats struct {
cachedPostingsCompressionErrors int
CachedPostingsOriginalSizeSum units.Base2Bytes
CachedPostingsCompressedSizeSum units.Base2Bytes
CachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
CachedPostingsDecompressionTimeSum time.Duration
@@ -3101,7 +2999,6 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.CachedPostingsOriginalSizeSum += o.CachedPostingsOriginalSizeSum
s.CachedPostingsCompressedSizeSum += o.CachedPostingsCompressedSizeSum
s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum
19 changes: 0 additions & 19 deletions pkg/store/bucket_test.go
@@ -6,7 +6,6 @@ package store
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
@@ -1889,24 +1888,6 @@ func mustMarshalAny(pb proto.Message) *types.Any {
return out
}

func TestBigEndianPostingsCount(t *testing.T) {
const count = 1000
raw := make([]byte, count*4)

for ix := 0; ix < count; ix++ {
binary.BigEndian.PutUint32(raw[4*ix:], rand.Uint32())
}

p := newBigEndianPostings(raw)
testutil.Equals(t, count, p.length())

c := 0
for p.Next() {
c++
}
testutil.Equals(t, count, c)
}

func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex, totalSamples int, random *rand.Rand, step int64) ulid.ULID {
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = dir
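
The removed `TestBigEndianPostingsCount` checked that iterating with `Next` yields as many postings as were encoded. A hedged sketch of an equivalent check against the hypothetical streamed reader from earlier; this is illustrative, not the commit's test, and it would need the `bytes` and `encoding/binary` imports that the commit drops:

func TestStreamedBigEndianPostingsCount(t *testing.T) {
	const count = 1000
	raw := make([]byte, count*4)
	for ix := 0; ix < count; ix++ {
		binary.BigEndian.PutUint32(raw[4*ix:], rand.Uint32())
	}

	// Count postings decoded from a stream instead of an in-memory slice.
	p := newStreamedBigEndianPostings(bytes.NewReader(raw))
	c := 0
	for p.Next() {
		c++
	}
	testutil.Ok(t, p.Err())
	testutil.Equals(t, count, c)
}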