diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 876f8220c6..21b766d1dd 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1326,7 +1326,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 	s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
 	s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
 	s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
-	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
 	s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
 	s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum))
 	s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum))
@@ -2416,66 +2415,35 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		// Fetch from object storage concurrently and update stats and posting list.
 		g.Go(func() error {
-			begin := time.Now()
+			for _, p := range ptrs[i:j] {
+				ir, err := r.block.bkt.GetRange(ctx, r.block.indexFilename(), p.ptr.Start, p.ptr.End-p.ptr.Start)
+				if err != nil {
+					return errors.Wrap(err, "get range reader")
+				}
 
-			b, err := r.block.readIndexRange(ctx, start, length)
-			if err != nil {
-				return errors.Wrap(err, "read postings range")
+				pr, err := newStreamedPostingsReader(
+					ir,
+					r.block.meta.ULID,
+					keys[p.keyID],
+					r.block.indexCache,
+					r.stats,
+					&r.mtx,
+				)
+				if err != nil {
+					return errors.Wrap(err, "creating postings reader")
+				}
+				closeFns = append(closeFns, func() {
+					runutil.CloseWithLogOnErr(r.block.logger, pr, "fetchPostings close streamed reader")
+				})
+				output[p.keyID] = pr
 			}
-			fetchTime := time.Since(begin)
 
 			r.mtx.Lock()
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
-			r.stats.PostingsFetchDurationSum += fetchTime
 			r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
 			r.mtx.Unlock()
 
-			for _, p := range ptrs[i:j] {
-				// index-header can estimate endings, which means we need to resize the endings.
-				pBytes, err := resizePostings(b[p.ptr.Start-start : p.ptr.End-start])
-				if err != nil {
-					return err
-				}
-
-				dataToCache := pBytes
-
-				compressionTime := time.Duration(0)
-				compressions, compressionErrors, compressedSize := 0, 0, 0
-
-				// Reencode postings before storing to cache. If that fails, we store original bytes.
-				// This can only fail, if postings data was somehow corrupted,
-				// and there is nothing we can do about it.
-				// Errors from corrupted postings will be reported when postings are used.
-				compressions++
-				s := time.Now()
-				bep := newBigEndianPostings(pBytes[4:])
-				data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
-				compressionTime = time.Since(s)
-				if err == nil {
-					dataToCache = data
-					compressedSize = len(data)
-				} else {
-					compressionErrors = 1
-				}
-
-				r.mtx.Lock()
-				// Return postings and fill LRU cache.
-				// Truncate first 4 bytes which are length of posting.
-				output[p.keyID] = newBigEndianPostings(pBytes[4:])
-
-				r.block.indexCache.StorePostings(r.block.meta.ULID, keys[p.keyID], dataToCache)
-
-				// If we just fetched it we still have to update the stats for touched postings.
-				r.stats.postingsTouched++
-				r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.cachedPostingsCompressions += compressions
-				r.stats.cachedPostingsCompressionErrors += compressionErrors
-				r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
-				r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
-				r.stats.CachedPostingsCompressionTimeSum += compressionTime
-				r.mtx.Unlock()
-			}
 			return nil
 		})
 	}
@@ -2483,75 +2451,6 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 	return output, closeFns, g.Wait()
 }
 
-func resizePostings(b []byte) ([]byte, error) {
-	d := encoding.Decbuf{B: b}
-	n := d.Be32int()
-	if d.Err() != nil {
-		return nil, errors.Wrap(d.Err(), "read postings list")
-	}
-
-	// 4 for postings number of entries, then 4, foreach each big endian posting.
-	size := 4 + n*4
-	if len(b) < size {
-		return nil, encoding.ErrInvalidSize
-	}
-	return b[:size], nil
-}
-
-// bigEndianPostings implements the Postings interface over a byte stream of
-// big endian numbers.
-type bigEndianPostings struct {
-	list []byte
-	cur  uint32
-}
-
-// TODO(bwplotka): Expose those inside Prometheus.
-func newBigEndianPostings(list []byte) *bigEndianPostings {
-	return &bigEndianPostings{list: list}
-}
-
-func (it *bigEndianPostings) At() storage.SeriesRef {
-	return storage.SeriesRef(it.cur)
-}
-
-func (it *bigEndianPostings) Next() bool {
-	if len(it.list) >= 4 {
-		it.cur = binary.BigEndian.Uint32(it.list)
-		it.list = it.list[4:]
-		return true
-	}
-	return false
-}
-
-func (it *bigEndianPostings) Seek(x storage.SeriesRef) bool {
-	if storage.SeriesRef(it.cur) >= x {
-		return true
-	}
-
-	num := len(it.list) / 4
-	// Do binary search between current position and end.
-	i := sort.Search(num, func(i int) bool {
-		return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
-	})
-	if i < num {
-		j := i * 4
-		it.cur = binary.BigEndian.Uint32(it.list[j:])
-		it.list = it.list[j+4:]
-		return true
-	}
-	it.list = nil
-	return false
-}
-
-func (it *bigEndianPostings) Err() error {
-	return nil
-}
-
-// Returns number of remaining postings values.
-func (it *bigEndianPostings) length() int {
-	return len(it.list) / 4
-}
-
 func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
 	timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
 	defer timer.ObserveDuration()
@@ -3062,7 +2961,6 @@ type queryStats struct {
 	cachedPostingsCompressionErrors    int
 	CachedPostingsOriginalSizeSum      units.Base2Bytes
 	CachedPostingsCompressedSizeSum    units.Base2Bytes
-	CachedPostingsCompressionTimeSum   time.Duration
 	cachedPostingsDecompressions       int
 	cachedPostingsDecompressionErrors  int
 	CachedPostingsDecompressionTimeSum time.Duration
@@ -3101,7 +2999,6 @@ func (s queryStats) merge(o *queryStats) *queryStats {
 	s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
 	s.CachedPostingsOriginalSizeSum += o.CachedPostingsOriginalSizeSum
 	s.CachedPostingsCompressedSizeSum += o.CachedPostingsCompressedSizeSum
-	s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
 	s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
 	s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
 	s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 28b96025db..0122e64546 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -6,7 +6,6 @@ package store
 import (
 	"bytes"
 	"context"
-	"encoding/binary"
 	"fmt"
 	"io"
 	"math"
@@ -1889,24 +1888,6 @@ func mustMarshalAny(pb proto.Message) *types.Any {
 	return out
 }
 
-func TestBigEndianPostingsCount(t *testing.T) {
-	const count = 1000
-	raw := make([]byte, count*4)
-
-	for ix := 0; ix < count; ix++ {
-		binary.BigEndian.PutUint32(raw[4*ix:], rand.Uint32())
-	}
-
-	p := newBigEndianPostings(raw)
-	testutil.Equals(t, count, p.length())
-
-	c := 0
-	for p.Next() {
-		c++
-	}
-	testutil.Equals(t, count, c)
-}
-
 func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex, totalSamples int, random *rand.Rand, step int64) ulid.ULID {
 	headOpts := tsdb.DefaultHeadOptions()
 	headOpts.ChunkDirRoot = dir
diff --git a/pkg/store/streamed_postings.go b/pkg/store/streamed_postings.go
new file mode 100644
index 0000000000..3330636b64
--- /dev/null
+++ b/pkg/store/streamed_postings.go
@@ -0,0 +1,192 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/alecthomas/units"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+
+	"github.com/thanos-io/thanos/pkg/errutil"
+	extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
+	storecache "github.com/thanos-io/thanos/pkg/store/cache"
+)
+
+// Read postings directly from a reader.
+// While reading, encode them.
+// At the end of reading, store them!
+// Note that this depends on the caller completely
+// exhausting the postings.
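+//
+// The cached payload starts with the codecHeaderStreamedSnappy marker,
+// followed by a snappy-compressed stream of uvarint-encoded deltas between
+// consecutive posting values. It is written to the index cache on Close,
+// and only if the postings were read through to a clean io.EOF.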
+type streamedPostingsReader struct {
+	bktReader      io.ReadCloser
+	bufferedReader io.Reader
+
+	blockID    ulid.ULID
+	l          labels.Label
+	indexCache storecache.IndexCache
+	cur        uint32
+	err        error
+
+	uvarintEncodeBuf []byte
+	readBuf          []byte
+
+	sw            io.WriteCloser
+	compressedBuf *bytes.Buffer
+
+	stats         *queryStats
+	statsMtx      *sync.Mutex
+	postingsCount uint32
+}
+
+func newStreamedPostingsReader(
+	bktReader io.ReadCloser,
+	blockID ulid.ULID,
+	l labels.Label,
+	indexCache storecache.IndexCache,
+	stats *queryStats,
+	statsMtx *sync.Mutex,
+) (*streamedPostingsReader, error) {
+	r := &streamedPostingsReader{
+		bktReader:        bktReader,
+		blockID:          blockID,
+		l:                l,
+		indexCache:       indexCache,
+		uvarintEncodeBuf: make([]byte, binary.MaxVarintLen64),
+		readBuf:          make([]byte, 4),
+
+		stats:    stats,
+		statsMtx: statsMtx,
+	}
+
+	postingsCount, err := getInt32(bktReader, r.readBuf)
+	if err != nil {
+		return nil, errors.Wrap(err, "getting postings count")
+	}
+	r.postingsCount = postingsCount
+	r.bufferedReader = bufio.NewReader(io.LimitReader(bktReader, int64(postingsCount)*4))
+
+	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(int(postingsCount))))
+	if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
+		return nil, fmt.Errorf("writing streamed snappy header: %w", err)
+	} else if n != len(codecHeaderStreamedSnappy) {
+		return nil, fmt.Errorf("short-write streamed snappy header")
+	}
+
+	sw, err := extsnappy.Compressor.Compress(compressedBuf)
+	if err != nil {
+		return nil, fmt.Errorf("creating snappy compressor: %w", err)
+	}
+	r.sw = sw
+	r.compressedBuf = compressedBuf
+
+	return r, nil
+}
+
+// getInt32 reads one big-endian uint32 from r using buf, which must hold at least 4 bytes.
+func getInt32(r io.Reader, buf []byte) (uint32, error) {
+	if _, err := io.ReadFull(r, buf[:4]); err != nil {
+		return 0, errors.Wrap(err, "reading")
+	}
+	return binary.BigEndian.Uint32(buf), nil
+}
+
+func (r *streamedPostingsReader) Close() error {
+	var errs errutil.MultiError
+
+	closeSnappyErr := r.sw.Close()
+	errs.Add(closeSnappyErr)
+
+	// Store the compressed postings only if the stream was fully consumed
+	// and the compressor closed cleanly.
+	if errors.Is(r.err, io.EOF) && closeSnappyErr == nil {
+		r.indexCache.StorePostings(r.blockID, r.l, r.compressedBuf.Bytes())
+	}
+
+	if r.stats != nil {
+		r.statsMtx.Lock()
+		r.stats.cachedPostingsCompressions++
+		r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(4 + 4*r.postingsCount)
+		r.stats.PostingsTouchedSizeSum += units.Base2Bytes(4 + 4*r.postingsCount)
+		r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(r.compressedBuf.Len())
+		r.statsMtx.Unlock()
+	}
+
+	errs.Add(r.bktReader.Close())
+	return errs.Err()
+}
+
+func (r *streamedPostingsReader) At() storage.SeriesRef {
+	return storage.SeriesRef(r.cur)
+}
+
+func (r *streamedPostingsReader) Err() error {
+	if errors.Is(r.err, io.EOF) {
+		return nil
+	}
+	return r.err
+}
+
+func (r *streamedPostingsReader) bumpCompressionErrors() {
+	if r.stats == nil {
+		return
+	}
+	r.statsMtx.Lock()
+	r.stats.cachedPostingsCompressionErrors++
+	r.statsMtx.Unlock()
+}
+
+func (r *streamedPostingsReader) Next() bool {
+	n, err := getInt32(r.bufferedReader, r.readBuf)
+	if err != nil {
+		r.err = err
+		if !errors.Is(err, io.EOF) {
+			r.bumpCompressionErrors()
+		}
+		return false
+	}
+	if n < r.cur {
+		r.bumpCompressionErrors()
+		r.err = fmt.Errorf("postings are not sorted: got %v after %v", n, r.cur)
+		return false
+	}
+
+	// Stream the delta to the snappy compressor as a uvarint.
+	uvarintSize := binary.PutUvarint(r.uvarintEncodeBuf, uint64(n-r.cur))
+	if written, err := r.sw.Write(r.uvarintEncodeBuf[:uvarintSize]); err != nil {
+		r.bumpCompressionErrors()
+		r.err = errors.Wrap(err, "writing uvarint encoded byte")
+		return false
+	} else if written != uvarintSize {
+		r.bumpCompressionErrors()
+		r.err = errors.Errorf("short write for uvarint encoded byte: wrote %d bytes instead of %d", written, uvarintSize)
+		return false
+	}
+
+	r.cur = n
+	return true
+}
+
+func (r *streamedPostingsReader) Seek(x storage.SeriesRef) bool {
+	if r.At() >= x {
+		return true
+	}
+
+	// We cannot do any search due to how values are stored,
+	// so we simply advance until we find the right value.
+	for r.Next() {
+		if r.At() >= x {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/pkg/store/streamed_postings_test.go b/pkg/store/streamed_postings_test.go
new file mode 100644
index 0000000000..6bd5edac65
--- /dev/null
+++ b/pkg/store/streamed_postings_test.go
@@ -0,0 +1,140 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"testing"
+
+	"github.com/efficientgo/core/testutil"
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb/encoding"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+type noopIndexCache struct {
+	postingsData []byte
+}
+
+func (n *noopIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
+	n.postingsData = v
+}
+
+func (n *noopIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
+	return nil, nil
+}
+
+func (n *noopIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {}
+
+func (n *noopIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
+	return nil, nil
+}
+
+type readerCloser struct {
+	*bytes.Buffer
+}
+
+func (r *readerCloser) Close() error {
+	return nil
+}
+
+func TestStreamedPostingsReader(t *testing.T) {
+	postingsData := []byte{}
+	eb := encoding.Encbuf{B: postingsData}
+	eb.PutBE32(3)
+
+	eb.PutBE32(0)
+	eb.PutBE32(1)
+	eb.PutBE32(2)
+
+	ic := &noopIndexCache{}
+	srr, err := newStreamedPostingsReader(&readerCloser{Buffer: bytes.NewBuffer(eb.Get())}, ulid.MustNew(1, nil), labels.Label{}, ic, nil, nil)
+	testutil.Ok(t, err)
+
+	refs := []storage.SeriesRef{}
+
+	for srr.Next() {
+		refs = append(refs, srr.At())
+	}
+
+	testutil.Equals(t, []storage.SeriesRef{0, 1, 2}, refs)
+
+	testutil.Ok(t, srr.Close())
+	testutil.Assert(t, len(ic.postingsData) == 24, fmt.Sprintf("postings data length is not 24 (%d)", len(ic.postingsData)))
+
+	cp, err := diffVarintSnappyStreamedDecode(ic.postingsData)
+	testutil.Ok(t, err)
+
+	decompressedRefs := []storage.SeriesRef{}
+
+	for cp.Next() {
+		decompressedRefs = append(decompressedRefs, cp.At())
+	}
+
+	testutil.Equals(t, refs, decompressedRefs)
+}
+
+func TestStreamedPostingsReaderEdge(t *testing.T) {
+	fis, err := os.ReadDir("./testdata")
+	testutil.Ok(t, err)
+
+	for _, fi := range fis {
+		fi := fi
+		t.Run(fi.Name(), func(t *testing.T) {
+			postingsData, err := os.ReadFile(fmt.Sprintf("./testdata/%s", fi.Name()))
+			testutil.Ok(t, err)
+
+			ic := &noopIndexCache{}
+			srr, err := newStreamedPostingsReader(&readerCloser{Buffer: bytes.NewBuffer(postingsData)}, ulid.MustNew(1, nil), labels.Label{}, ic, nil, nil)
+			testutil.Ok(t, err)
+
+			refs := []storage.SeriesRef{}
+
+			for srr.Next() {
+				refs = append(refs, srr.At())
+			}
+
+			t.Log(len(refs))
+
+			testutil.Ok(t, srr.Close())
+			testutil.Ok(t, srr.Err())
+			testutil.Equals(t, (len(postingsData)/4)-1, len(refs))
+
+			cp, err := diffVarintSnappyStreamedDecode(ic.postingsData)
+			testutil.Ok(t, err)
+
+			decompressedRefs := []storage.SeriesRef{}
+
+			for cp.Next() {
+				decompressedRefs = append(decompressedRefs, cp.At())
+			}
+
+			testutil.Equals(t, refs, decompressedRefs)
+		})
+	}
+
+	streamPostingsReaders := []index.Postings{}
+	for _, fi := range fis {
+		fi := fi
+		postingsData, err := os.ReadFile(fmt.Sprintf("./testdata/%s", fi.Name()))
+		testutil.Ok(t, err)
+
+		ic := &noopIndexCache{}
+		srr, err := newStreamedPostingsReader(&readerCloser{Buffer: bytes.NewBuffer(postingsData)}, ulid.MustNew(1, nil), labels.Label{}, ic, nil, nil)
+		testutil.Ok(t, err)
+
+		streamPostingsReaders = append(streamPostingsReaders, srr)
+	}
+
+	intersected := index.Without(index.Intersect(streamPostingsReaders...), index.EmptyPostings())
+	refs := []storage.SeriesRef{}
+
+	for intersected.Next() {
+		refs = append(refs, intersected.At())
+	}
+
+	t.Log(len(refs))
+}
diff --git a/pkg/store/testdata/postings689800435609769302 b/pkg/store/testdata/postings689800435609769302
new file mode 100755
index 0000000000..0a500774e3
Binary files /dev/null and b/pkg/store/testdata/postings689800435609769302 differ
diff --git a/pkg/store/testdata/postings7688315166188217905 b/pkg/store/testdata/postings7688315166188217905
new file mode 100755
index 0000000000..dceb6c975d
Binary files /dev/null and b/pkg/store/testdata/postings7688315166188217905 differ
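
For orientation, the following is a minimal, self-contained sketch of the byte layout that streamedPostingsReader.Next feeds into the compressor: one uvarint-encoded delta per posting, taken against the previous value (the first delta is against 0). It deliberately omits the codecHeaderStreamedSnappy prefix and the extsnappy.Compressor framing applied in the patch, and encodeDeltas/decodeDeltas are illustrative helpers, not part of the change.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeDeltas writes each posting as the uvarint-encoded difference from the
// previous posting, mirroring the pre-compression stream produced by Next.
func encodeDeltas(postings []uint32) []byte {
	out := make([]byte, 0, len(postings))
	buf := make([]byte, binary.MaxVarintLen64)
	prev := uint32(0)
	for _, p := range postings {
		n := binary.PutUvarint(buf, uint64(p-prev))
		out = append(out, buf[:n]...)
		prev = p
	}
	return out
}

// decodeDeltas reverses encodeDeltas, recovering the original posting values.
func decodeDeltas(b []byte) []uint32 {
	var out []uint32
	cur := uint64(0)
	for len(b) > 0 {
		d, n := binary.Uvarint(b)
		if n <= 0 {
			break // malformed input; a real decoder would surface an error.
		}
		cur += d
		out = append(out, uint32(cur))
		b = b[n:]
	}
	return out
}

func main() {
	refs := []uint32{10, 12, 40, 1000}
	enc := encodeDeltas(refs)
	fmt.Printf("encoded %d postings into %d bytes: %v\n", len(refs), len(enc), enc)
	fmt.Println("decoded:", decodeDeltas(enc))
}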