
Add non-indexed metadata to chunks #9700

Merged
merged 15 commits on Jul 18, 2023
Changes from all commits
241 changes: 202 additions & 39 deletions pkg/chunkenc/memchunk.go

Large diffs are not rendered by default.

607 changes: 405 additions & 202 deletions pkg/chunkenc/memchunk_test.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/chunkenc/pool.go
@@ -48,6 +48,12 @@ var (
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })

// LabelsPool is a matrix of bytes buffers used to store label names and values.
// Buckets [8, 16, 32, 64, 128, 256].
// Since we store label names and values, the number of labels we can store is half the bucket size.
// So we will be able to store from 0 to 128 labels.
LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) })

// SamplesPool pooling array of samples [512,1024,...,16k]
SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })
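
For context, a minimal usage sketch of the new LabelsPool (not part of this diff): it assumes the same bucketed-pool API that BytesBufferPool uses above, i.e. Get(size) returning an interface{} to assert and Put to return the buffer; the helper name is hypothetical.

// labelsToByteSlices is a hypothetical illustration of how LabelsPool could be
// used: one pooled [][]byte holds the names and values of all labels, so a
// request for 2*len(ls) entries covers both.
func labelsToByteSlices(ls labels.Labels) [][]byte {
	buf := LabelsPool.Get(2 * len(ls)).([][]byte)
	for _, l := range ls {
		buf = append(buf, []byte(l.Name), []byte(l.Value))
	}
	return buf // the caller is expected to LabelsPool.Put(buf[:0]) when done
}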

156 changes: 127 additions & 29 deletions pkg/chunkenc/unordered.go
@@ -34,7 +34,7 @@ type HeadBlock interface {
Entries() int
UncompressedSize() int
Convert(HeadBlockFmt) (HeadBlock, error)
Append(int64, string) error
Append(int64, string, labels.Labels) error
Iterator(
ctx context.Context,
direction logproto.Direction,
@@ -52,6 +52,7 @@ type HeadBlock interface {
}

type unorderedHeadBlock struct {
format HeadBlockFmt
// Opted for range tree over skiplist for space reduction.
// Inserts: O(log(n))
// Scans: (O(k+log(n))) where k=num_scanned_entries & n=total_entries
@@ -62,13 +63,14 @@ type unorderedHeadBlock struct {
mint, maxt int64 // upper and lower bounds
}

func newUnorderedHeadBlock() *unorderedHeadBlock {
func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt) *unorderedHeadBlock {
return &unorderedHeadBlock{
rt: rangetree.New(1),
format: headBlockFmt,
rt: rangetree.New(1),
}
}

func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return UnorderedHeadBlockFmt }
func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return hb.format }

func (hb *unorderedHeadBlock) IsEmpty() bool {
return hb.size == 0
@@ -87,21 +89,30 @@ func (hb *unorderedHeadBlock) UncompressedSize() int {
}

func (hb *unorderedHeadBlock) Reset() {
x := newUnorderedHeadBlock()
x := newUnorderedHeadBlock(hb.format)
*hb = *x
}

type nsEntry struct {
line string
metadataLabels labels.Labels
}

// collection of entries belonging to the same nanosecond
type nsEntries struct {
ts int64
entries []string
entries []nsEntry
}

func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
return e.ts
}

func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error {
if hb.format < UnorderedWithMetadataHeadBlockFmt {
// metaLabels must be ignored for the previous head block formats
metaLabels = nil
}
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
// and instead will displace any existing entry at the specified timestamp.
@@ -120,14 +131,14 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
// entries at the same time with the same content, iterate through any existing
// entries and ignore the line if we already have an entry with the same content
for _, et := range displaced[0].(*nsEntries).entries {
if et == line {
if et.line == line {

Review comment thread:

sandeepsukhani (Contributor), Jul 14, 2023:
Should we also check for equality of metaLabels here, to allow entries with the same timestamp and log line but different non-indexed labels? However, this would mean that we would have to generate a hash over both the log line and the non-indexed labels in SampleIterator, which uses it for deduping duplicate data due to replication.
It would be rare for someone to have log entries with the same timestamp and log line but different non-indexed labels, so I think we should keep it as is to avoid paying the cost of supporting this rare case. Just putting it out to see if anyone wants to share their thoughts here. If we decide not to support it, we should make it clear in the docs.

PR author (Contributor):
> It would be rare that someone would have log entries with the same timestamp and logline but different non-indexed labels
Taking promtail as an example, I don't see any stage that can duplicate log lines. Therefore, this situation would only happen if the user is using a custom client that somehow runs into this scenario. IMO, to avoid having to compute a hash over the meta labels, and to keep the code simple, we should not check the non-indexed labels here.

Contributor:
> Taking promtail as an example, I don't see any stage that can duplicate log lines.
I was only considering the case where the same log line is being emitted by several jobs that, for whatever reason, all have the same stream labels.

Member:
I'm ok with just checking (indexed_labels, ts, line) for equality here without specifying a formal stance yet. We can choose to handle this differently later, as long as it doesn't seriously break backwards compatibility in the future; basically, before we include this in a release.

Contributor:
I am inclined towards keeping it as is rather than supporting it only partially, i.e. without taking care of the required changes in SampleIterator. If you strongly feel we should make this change, please let me know, and I will take care of it with my string interning change.

(A hedged sketch of the alternative equality check discussed here appears after the Append hunk below.)

e.entries = displaced[0].(*nsEntries).entries
return nil
}
}
e.entries = append(displaced[0].(*nsEntries).entries, line)
e.entries = append(displaced[0].(*nsEntries).entries, nsEntry{line, metaLabels})
} else {
e.entries = []string{line}
e.entries = []nsEntry{{line, metaLabels}}
}

// Update hb metdata
@@ -139,12 +150,20 @@ func (hb *unorderedHeadBlock) Append(ts int64, line string, metaLabels labels.Labels) error {
hb.maxt = ts
}

hb.size += len(line)
hb.size += len(line) + metaLabelsLen(metaLabels)
hb.lines++

return nil
}
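
The review thread above debates whether the duplicate check should also compare non-indexed labels. For illustration only, a hedged sketch of that alternative (not what the merged PR does; the helper name is hypothetical, and the merged code intentionally keeps the line-only comparison):

// entriesEqual would treat two entries at the same nanosecond as duplicates
// only when both the log line and the non-indexed labels match. The merged
// code compares et.line == line alone to keep dedup cheap.
func entriesEqual(a, b nsEntry) bool {
	return a.line == b.line && labels.Equal(a.metadataLabels, b.metadataLabels)
}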

func metaLabelsLen(metaLabels labels.Labels) int {
length := 0
for _, label := range metaLabels {
length += len(label.Name) + len(label.Value)
}
return length
}

// Implements rangetree.Interval
type interval struct {
mint, maxt int64
@@ -162,7 +181,7 @@ func (hb *unorderedHeadBlock) forEntries(
direction logproto.Direction,
mint,
maxt int64,
entryFn func(int64, string) error, // returning an error exits early
entryFn func(int64, string, labels.Labels) error, // returning an error exits early
) (err error) {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return
@@ -191,9 +210,10 @@ }
}

for ; i < len(es.entries) && i >= 0; next() {
line := es.entries[i]
line := es.entries[i].line
metadataLabels := es.entries[i].metadataLabels
chunkStats.AddHeadChunkBytes(int64(len(line)))
err = entryFn(es.ts, line)
err = entryFn(es.ts, line, metadataLabels)

}
}
@@ -235,7 +255,7 @@ func (hb *unorderedHeadBlock) Iterator(
direction,
mint,
maxt,
func(ts int64, line string) error {
func(ts int64, line string, nonIndexedLabels labels.Labels) error {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line)
if !matches {
return nil
@@ -253,8 +273,9 @@ }
}

stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: newLine,
Timestamp: time.Unix(0, ts),
Line: newLine,
NonIndexedLabels: logproto.FromLabelsToLabelAdapters(nonIndexedLabels),
})
return nil
},
@@ -284,7 +305,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
logproto.FORWARD,
mint,
maxt,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
value, parsedLabels, ok := extractor.ProcessString(ts, line)
if !ok {
return nil
@@ -307,6 +328,7 @@ func (hb *unorderedHeadBlock) SampleIterator(
Timestamp: ts,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(line)),
// TODO: add metadata labels to sample
})
return nil
},
@@ -346,14 +368,29 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
n := binary.PutVarint(encBuf, ts)
inBuf.Write(encBuf[:n])

n = binary.PutUvarint(encBuf, uint64(len(line)))
inBuf.Write(encBuf[:n])

inBuf.WriteString(line)

if hb.format >= UnorderedWithMetadataHeadBlockFmt {
// Serialize metadata labels
n = binary.PutUvarint(encBuf, uint64(len(metaLabels)))

Review comment thread:

Contributor:
We had decided to write the length of the whole nonIndexedLabels section first so that we can choose to skip it altogether and jump to the beginning of the next entry. I can take care of it while working on string interning.

PR author (Contributor):
Good point. We can do it here, but we'd need to iterate through all the labels and sum up their lengths. With string interning, IIUC, the label names and values would be refs to wherever the actual string is, so in that case we wouldn't need to iterate through all the name/value pairs, but rather multiply len(metaLabels) by twice the ref size, right?
In that case, I think it would make more sense to do it directly in the string interning PR, so as not to complicate this PR more than necessary given that this part would change. Wdyt?

Contributor:
Since we are writing data with variable-length encoding, the number of bytes can't be estimated beforehand. We will have to write the data to a temp buffer first, calculate and write its size, and then write the data from the temp buffer. I will take care of it while working on string interning.

Member:
Keep in mind, this will break compatibility between this PR and a future PR unless we add a newer format than UnorderedWithMetadataHeadBlockFmt. This is one of the benefits of not setting it as the default yet (via DefaultHeadBlockFmt = UnorderedHeadBlockFmt): we can change the implementation in a later PR before we start using it.

(A hedged sketch of the temp-buffer, length-prefixed approach discussed here appears after this hunk.)

inBuf.Write(encBuf[:n])
for _, l := range metaLabels {
n = binary.PutUvarint(encBuf, uint64(len(l.Name)))
inBuf.Write(encBuf[:n])
inBuf.WriteString(l.Name)

n = binary.PutUvarint(encBuf, uint64(len(l.Value)))
inBuf.Write(encBuf[:n])
inBuf.WriteString(l.Value)
}
}
return nil
},
)
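
The review thread above proposes prefixing the whole non-indexed-labels section with its byte length so a reader can skip it without decoding each label. Because varint encoding makes the section size unknown up front, the staging-buffer approach discussed there could look roughly like the hedged sketch below (not what this PR implements; the helper name is hypothetical):

// appendLengthPrefixedLabels encodes metaLabels into a scratch buffer first,
// then writes the section length followed by the section itself, so a reader
// can jump straight to the next entry.
func appendLengthPrefixedLabels(dst *bytes.Buffer, metaLabels labels.Labels) {
	encBuf := make([]byte, binary.MaxVarintLen64)
	var scratch bytes.Buffer
	for _, l := range metaLabels {
		n := binary.PutUvarint(encBuf, uint64(len(l.Name)))
		scratch.Write(encBuf[:n])
		scratch.WriteString(l.Name)

		n = binary.PutUvarint(encBuf, uint64(len(l.Value)))
		scratch.Write(encBuf[:n])
		scratch.WriteString(l.Value)
	}
	n := binary.PutUvarint(encBuf, uint64(scratch.Len()))
	dst.Write(encBuf[:n])
	dst.Write(scratch.Bytes())
}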
@@ -369,7 +406,7 @@ func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
}

func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version > OrderedHeadBlockFmt {
if hb.format == version {
return hb, nil
}
out := version.NewBlock()
@@ -379,8 +416,8 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
return out.Append(ts, line)
func(ts int64, line string, metaLabels labels.Labels) error {
return out.Append(ts, line, metaLabels)
},
)
return out, err
@@ -392,7 +429,22 @@ func (hb *unorderedHeadBlock) CheckpointSize() int {
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * hb.lines // ts + len of log line.
size += hb.size // uncompressed bytes of lines
if hb.format >= UnorderedWithMetadataHeadBlockFmt {
_ = hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string, metaLabels labels.Labels) error {
// len of meta labels
size += binary.MaxVarintLen32
// len of name and value of each meta label, the size of values is already included into hb.size
size += (binary.MaxVarintLen32 * 2) * len(metaLabels)
return nil
},
)
}
size += hb.size // uncompressed bytes of lines
return size
}

@@ -432,7 +484,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
func(ts int64, line string, metaLabels labels.Labels) error {
eb.putVarint64(ts)
eb.putUvarint(len(line))
_, err = w.Write(eb.get())
@@ -445,6 +497,35 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
if err != nil {
return errors.Wrap(err, "write headblock entry line")
}

if hb.format >= UnorderedWithMetadataHeadBlockFmt {
// metadata
eb.putUvarint(len(metaLabels))
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock entry meta labels length")
}
eb.reset()
for _, l := range metaLabels {
eb.putUvarint(len(l.Name))
eb.putUvarint(len(l.Value))
_, err = w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label name and value length")
}
eb.reset()

_, err = io.WriteString(w, l.Name)
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label name")
}
_, err = io.WriteString(w, l.Value)
if err != nil {
return errors.Wrap(err, "write headBlock entry meta label value")
}
}
}

return nil
},
)
@@ -454,7 +535,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {

func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
// ensure it's empty
*hb = *newUnorderedHeadBlock()
*hb = *newUnorderedHeadBlock(hb.format)

if len(b) < 1 {
return nil
@@ -467,8 +548,8 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
return errors.Wrap(db.err(), "verifying headblock header")
}

if version != UnorderedHeadBlockFmt.Byte() {
return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version)
if version < UnorderedHeadBlockFmt.Byte() {
return errors.Errorf("incompatible headBlock version (%v), only V4 and the next versions are currently supported", version)
}

n := db.uvarint()
@@ -481,7 +562,24 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
ts := db.varint64()
lineLn := db.uvarint()
line := string(db.bytes(lineLn))
if err := hb.Append(ts, line); err != nil {

var metaLabels labels.Labels
if version >= UnorderedWithMetadataHeadBlockFmt.Byte() {
metaLn := db.uvarint()
if metaLn > 0 {
metaLabels = make(labels.Labels, metaLn)
for j := 0; j < metaLn && db.err() == nil; j++ {
nameLn := db.uvarint()
valueLn := db.uvarint()
metaLabels[j] = labels.Label{
Name: string(db.bytes(nameLn)),
Value: string(db.bytes(valueLn)),
}
}
}
}

if err := hb.Append(ts, line, metaLabels); err != nil {
return err
}
}
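
Putting CheckpointTo and LoadBytes together, the per-entry checkpoint layout appears to be the following. This is a sketch inferred from the diff above, not an authoritative format specification:

// Per-entry checkpoint layout (format >= UnorderedWithMetadataHeadBlockFmt):
//
//   varint   ts                 entry timestamp in nanoseconds
//   uvarint  len(line)
//   bytes    line
//   uvarint  len(metaLabels)    omitted entirely for older formats
//   for each meta label:
//     uvarint len(name)
//     uvarint len(value)
//     bytes   name
//     bytes   value

Note that the block serialisation in Serialise above interleaves each length with its string (name length, name, value length, value), while the checkpoint writes both lengths before the two strings.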
@@ -508,7 +606,7 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
return nil, errors.Wrap(db.err(), "verifying headblock header")
}
format := HeadBlockFmt(version)
if format > UnorderedHeadBlockFmt {
if format > UnorderedWithMetadataHeadBlockFmt {
return nil, fmt.Errorf("unexpected head block version: %v", format)
}
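
These version checks rely on the relative ordering of the head block format constants. A hedged sketch of that ordering, inferred from the comparisons in this diff and from the "V4" wording in the LoadBytes error above (the numeric versions are an assumption; only the ordering is implied by the code shown):

// Assumed ordering of the head block formats used by the >= and > checks:
//
//   OrderedHeadBlockFmt < UnorderedHeadBlockFmt (the "V4" format) < UnorderedWithMetadataHeadBlockFmt
//
// HeadFromCheckpoint accepts anything up to and including
// UnorderedWithMetadataHeadBlockFmt, while unorderedHeadBlock.LoadBytes accepts
// anything from UnorderedHeadBlockFmt upwards.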
