Add non-indexed metadata to chunks (#9700)
**What this PR does / why we need it**:
In #9694 we added support for metadata labels on each entry in the push payload. In this PR we take that metadata and add it to the entries in the chunk, and add support for serializing and deserializing those metadata labels.

---------

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
Co-authored-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
salvacorts and vlad-diachenko authored Jul 18, 2023
1 parent c3da1ca commit ce91076
Showing 9 changed files with 930 additions and 366 deletions.
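To make the new entry layout easier to follow before the per-file diffs, here is a minimal, self-contained sketch of the per-entry encoding that `CheckpointTo` writes and `LoadBytes` reads back in `pkg/chunkenc/unordered.go` below: timestamp, line, then a label count followed by length-prefixed name/value pairs. The `label` type and the `encodeEntry`/`decoder` helpers are illustrative stand-ins, not Loki's actual types.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// label stands in for prometheus' labels.Label, only to keep the sketch self-contained.
type label struct{ Name, Value string }

// encodeEntry writes one head-block entry roughly as the checkpoint code in this PR does:
// timestamp varint, line length + line bytes, label count, then per label the
// name/value lengths followed by the name and value bytes.
func encodeEntry(buf *bytes.Buffer, ts int64, line string, metaLabels []label) {
	tmp := make([]byte, binary.MaxVarintLen64)
	buf.Write(tmp[:binary.PutVarint(tmp, ts)])
	buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(line)))])
	buf.WriteString(line)
	buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(metaLabels)))])
	for _, l := range metaLabels {
		buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(l.Name)))])
		buf.Write(tmp[:binary.PutUvarint(tmp, uint64(len(l.Value)))])
		buf.WriteString(l.Name)
		buf.WriteString(l.Value)
	}
}

// decoder is a tiny error-carrying reader, in the spirit of the decbuf helper the real code uses.
type decoder struct {
	r   *bytes.Reader
	err error
}

func (d *decoder) varint() int64 {
	if d.err != nil {
		return 0
	}
	v, err := binary.ReadVarint(d.r)
	d.err = err
	return v
}

func (d *decoder) uvarint() uint64 {
	if d.err != nil {
		return 0
	}
	v, err := binary.ReadUvarint(d.r)
	d.err = err
	return v
}

func (d *decoder) str(n uint64) string {
	if d.err != nil {
		return ""
	}
	b := make([]byte, n)
	_, d.err = io.ReadFull(d.r, b)
	return string(b)
}

func main() {
	var buf bytes.Buffer
	encodeEntry(&buf, 1689638400000000000, `level=info msg="hello"`, []label{{"traceID", "abc123"}})

	// Decode in the same order: timestamp, line, label count, then name/value pairs.
	d := &decoder{r: bytes.NewReader(buf.Bytes())}
	ts := d.varint()
	line := d.str(d.uvarint())
	lbls := make([]label, d.uvarint())
	for i := range lbls {
		nameLen, valueLen := d.uvarint(), d.uvarint()
		lbls[i] = label{Name: d.str(nameLen), Value: d.str(valueLen)}
	}
	fmt.Println(ts, line, lbls, d.err)
}
```

Length-prefixing every field keeps an entry self-describing, which is what lets older head-block formats simply omit the metadata section while readers gate on the head-block version.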
241 changes: 202 additions & 39 deletions pkg/chunkenc/memchunk.go

Large diffs are not rendered by default.

607 changes: 405 additions & 202 deletions pkg/chunkenc/memchunk_test.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/chunkenc/pool.go
@@ -48,6 +48,12 @@ var (
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })

// LabelsPool is a matrix of bytes buffers used to store label names and values.
// Buckets [8, 16, 32, 64, 128, 256].
// Since we store label names and values, the number of labels we can store is the half the bucket size.
// So we will be able to store from 0 to 128 labels.
LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) })

// SamplesPool pooling array of samples [512,1024,...,16k]
SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })

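The `LabelsPool` comment above counts slots rather than labels: each label takes two slots (one for its name, one for its value), so the [8, 16, 32, 64, 128, 256] buckets cover up to 128 labels. A tiny, hypothetical illustration of that sizing rule (the bucket walk below is a simplification, not the pool package's real lookup):

```go
package main

import "fmt"

// Buckets mirror the LabelsPool comment: [8, 16, 32, 64, 128, 256] slots.
var buckets = []int{8, 16, 32, 64, 128, 256}

// capacityForLabels returns the bucket capacity needed for n metadata labels,
// given that every label occupies two slots: one for its name, one for its value.
func capacityForLabels(n int) int {
	need := 2 * n
	for _, b := range buckets {
		if need <= b {
			return b
		}
	}
	// Beyond the largest bucket the pool would fall back to a plain allocation.
	return need
}

func main() {
	for _, n := range []int{3, 10, 128, 200} {
		fmt.Printf("%3d labels -> slice capacity %d\n", n, capacityForLabels(n))
	}
}
```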
156 changes: 127 additions & 29 deletions pkg/chunkenc/unordered.go
@@ -34,7 +34,7 @@ type HeadBlock interface {
Entries() int
UncompressedSize() int
Convert(HeadBlockFmt) (HeadBlock, error)
Append(int64, string) error
Append(int64, string, labels.Labels) error
Iterator(
ctx context.Context,
direction logproto.Direction,
@@ -52,6 +52,7 @@ type HeadBlock interface {
}

type unorderedHeadBlock struct {
format HeadBlockFmt
// Opted for range tree over skiplist for space reduction.
// Inserts: O(log(n))
// Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries
@@ -62,13 +63,14 @@ type unorderedHeadBlock struct {
mint, maxt int64 // upper and lower bounds
}

func newUnorderedHeadBlock() *unorderedHeadBlock {
func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt) *unorderedHeadBlock {
return &unorderedHeadBlock{
rt: rangetree.New(1),
format: headBlockFmt,
rt: rangetree.New(1),
}
}

func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return UnorderedHeadBlockFmt }
func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return hb.format }

func (hb *unorderedHeadBlock) IsEmpty() bool {
return hb.size == 0
@@ -87,21 +89,30 @@ func (hb *unorderedHeadBlock) UncompressedSize() int {
}

func (hb *unorderedHeadBlock) Reset() {
x := newUnorderedHeadBlock()
x := newUnorderedHeadBlock(hb.format)
*hb = *x
}

type nsEntry struct {
line string
metadataLabels labels.Labels
}

// collection of entries belonging to the same nanosecond
type nsEntries struct {
ts int64
entries []string
entries []nsEntry
}

func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
return e.ts
}

func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error {
if hb.format < UnorderedWithMetadataHeadBlockFmt {
// metaLabels must be ignored for the previous head block formats
metaLabels = nil
}
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
// and instead will displace any existing entry at the specified timestamp.
@@ -120,14 +131,14 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
// entries at the same time with the same content, iterate through any existing
// entries and ignore the line if we already have an entry with the same content
for _, et := range displaced[0].(*nsEntries).entries {
if et == line {
if et.line == line {
e.entries = displaced[0].(*nsEntries).entries
return nil
}
}
e.entries = append(displaced[0].(*nsEntries).entries, line)
e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, metaLabels})
} else {
e.entries = []string{line}
e.entries = []nsEntry{{line, metaLabels}}
}

// Update hb metdata
@@ -139,12 +150,20 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
hb.maxt = ts
}

hb.size += len(line)
hb.size += len(line) + metaLabelsLen(metaLabels)
hb.lines++

return nil
}

func metaLabelsLen(metaLabels labels.Labels) int {
length := 0
for _, label := range metaLabels {
length += len(label.Name) + len(label.Value)
}
return length
}

// Implements rangetree.Interval
type interval struct {
mint, maxt int64
@@ -162,7 +181,7 @@ func (hb *unorderedHeadBlock) forEntries(
direction logproto.Direction,
mint,
maxt int64,
entryFn func(int64, string) error, // returning an error exits early
entryFn func(int64, string, labels.Labels) error, // returning an error exits early
) (err error) {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return
@@ -191,9 +210,10 @@ }
}

for ; i < len(es.entries) && i >= 0; next() {
line := es.entries[i]
line := es.entries[i].line
metadataLabels := es.entries[i].metadataLabels
chunkStats.AddHeadChunkBytes(int64(len(line)))
err = entryFn(es.ts, line)
err = entryFn(es.ts, line, metadataLabels)

}
}
@@ -235,7 +255,7 @@ func (hb *unorderedHeadBlock) Iterator(
direction,
mint,
maxt,
func(ts int64, line string) error {
func(ts int64, line string, nonIndexedLabels labels.Labels) error {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line)
if !matches {
return nil
@@ -253,8 +273,9 @@ func (hb *unorderedHeadBlock) Iterator(
}

stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: newLine,
Timestamp: time.Unix(0, ts),
Line: newLine,
NonIndexedLabels: logproto.FromLabelsToLabelAdapters(nonIndexedLabels),
})
return nil
},
@@ -284,7 +305,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
logproto.FORWARD,
mint,
maxt,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
value, parsedLabels, ok := extractor.ProcessString(ts, line)
if !ok {
return nil
Expand All @@ -307,6 +328,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
Timestamp: ts,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(line)),
// TODO: add metadata labels to sample
})
return nil
},
@@ -346,14 +368,29 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
n := binary.PutVarint(encBuf, ts)
inBuf.Write(encBuf[:n])

n = binary.PutUvarint(encBuf, uint64(len(line)))
inBuf.Write(encBuf[:n])

inBuf.WriteString(line)

if hb.format >= UnorderedWithMetadataHeadBlockFmt {
// Serialize metadata labels
n = binary.PutUvarint(encBuf, uint64(len(metaLabels)))
inBuf.Write(encBuf[:n])
for _, l := range metaLabels {
n = binary.PutUvarint(encBuf, uint64(len(l.Name)))
inBuf.Write(encBuf[:n])
inBuf.WriteString(l.Name)

n = binary.PutUvarint(encBuf, uint64(len(l.Value)))
inBuf.Write(encBuf[:n])
inBuf.WriteString(l.Value)
}
}
return nil
},
)
@@ -369,7 +406,7 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
}

func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version > OrderedHeadBlockFmt {
if hb.format == version {
return hb, nil
}
out := version.NewBlock()
@@ -379,8 +416,8 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
return out.Append(ts, line)
func(ts int64, line string, metaLabels labels.Labels) error {
return out.Append(ts, line, metaLabels)
},
)
return out, err
@@ -392,7 +429,22 @@ func (hb *unorderedHeadBlock) CheckpointSize() int {
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line.
size += hb.size // uncompressed bytes of lines
if hb.format >= UnorderedWithMetadataHeadBlockFmt {
_ = hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string, metaLabels labels.Labels) error {
// len of meta labels
size += binary.MaxVarintLen32
// len of name and value of each meta label, the size of values is already included into hb.size
size += (binary.MaxVarintLen32 * 2) * len(metaLabels)
return nil
},
)
}
size += hb.size // uncompressed bytes of lines
return size
}

@@ -432,7 +484,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
eb.putVarint64(ts)
eb.putUvarint(len(line))
_, err = w.Write(eb.get())
Expand All @@ -445,6 +497,35 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
if err != nil {
return errors.Wrap(err, "write headblock entry line")
}

if hb.format >= UnorderedWithMetadataHeadBlockFmt {
// metadata
eb.putUvarint(len(metaLabels))
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock entry meta labels length")
}
eb.reset()
for _, l := range metaLabels {
eb.putUvarint(len(l.Name))
eb.putUvarint(len(l.Value))
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label name and value length")
}
eb.reset()

_, err = io.WriteString(w, l.Name)
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label name")
}
_, err = io.WriteString(w, l.Value)
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label value")
}
}
}

return nil
},
)
@@ -454,7 +535,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {

func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
// ensure it's empty
*hb = *newUnorderedHeadBlock()
*hb = *newUnorderedHeadBlock(hb.format)

if len(b) < 1 {
return nil
Expand All @@ -467,8 +548,8 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
return errors.Wrap(db.err(), "verifying headblock header")
}

if version != UnorderedHeadBlockFmt.Byte() {
return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version)
if version < UnorderedHeadBlockFmt.Byte() {
return errors.Errorf("incompatible headBlock version (%v), only V4 and the next versions are currently supported", version)
}

n := db.uvarint()
@@ -481,7 +562,24 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
ts := db.varint64()
lineLn := db.uvarint()
line := string(db.bytes(lineLn))
if err := hb.Append(ts, line); err != nil {

var metaLabels labels.Labels
if version >= UnorderedWithMetadataHeadBlockFmt.Byte() {
metaLn := db.uvarint()
if metaLn > 0 {
metaLabels = make(labels.Labels, metaLn)
for j := 0; j < metaLn && db.err() == nil; j++ {
nameLn := db.uvarint()
valueLn := db.uvarint()
metaLabels[j] = labels.Label{
Name: string(db.bytes(nameLn)),
Value: string(db.bytes(valueLn)),
}
}
}
}

if err := hb.Append(ts, line, metaLabels); err != nil {
return err
}
}
@@ -508,7 +606,7 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
return nil, errors.Wrap(db.err(), "verifying headblock header")
}
format := HeadBlockFmt(version)
if format > UnorderedHeadBlockFmt {
if format > UnorderedWithMetadataHeadBlockFmt {
return nil, fmt.Errorf("unexpected head block version: %v", format)
}

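One detail from the `CheckpointSize` change above: metadata adds, per entry, one varint for the label count plus two varints per label for the name/value length prefixes, while the label bytes themselves are already folded into `hb.size` by `metaLabelsLen` at `Append` time. A rough, hypothetical helper mirroring that worst-case accounting:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// metadataCheckpointOverhead estimates the worst-case extra bytes a checkpoint
// needs for one entry's metadata labels, mirroring the accounting in
// CheckpointSize above: a varint for the label count plus two varints
// (name length, value length) per label. The label bytes themselves are
// already part of the head block's uncompressed size.
func metadataCheckpointOverhead(numLabels int) int {
	return binary.MaxVarintLen32 + (binary.MaxVarintLen32*2)*numLabels
}

func main() {
	for _, n := range []int{0, 1, 5, 20} {
		fmt.Printf("%2d labels -> up to %d extra bytes per entry\n", n, metadataCheckpointOverhead(n))
	}
}
```

Using `binary.MaxVarintLen32` keeps the estimate an upper bound, since the varints actually written are usually shorter.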
