Improve observability for non-indexed labels usage (grafana#9993)
**What this PR does / why we need it**:
In grafana#9700, we added support for encoding and decoding metadata for each entry into the chunks.
In this PR we:
- Update the bytes-processed stats to account for the bytes from those non-indexed labels
- Add new stats for the bytes processed for those non-indexed labels
- Add new ingestion metrics to track ingested non-indexed-label bytes
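Below is a minimal, illustrative sketch of the two accounting rules involved (names here are hypothetical; the real changes live in `pkg/chunkenc`, `pkg/loghttp/push`, and `pkg/logqlmodel/stats`): ingestion metrics and head-chunk stats count the raw label bytes, while decoding a chunk back additionally charges the varint overhead for the label count and lengths.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type label struct{ Name, Value string }

// rawNonIndexedBytes: ingestion metrics and head-chunk stats count the
// raw name+value bytes of each non-indexed label.
func rawNonIndexedBytes(labels []label) int64 {
	var n int64
	for _, l := range labels {
		n += int64(len(l.Name) + len(l.Value))
	}
	return n
}

// decodedNonIndexedBytes: when reading a chunk back, the label count and
// every name/value length are varints, so the decompressed stats also
// charge binary.MaxVarintLen64 for each of those integers.
func decodedNonIndexedBytes(labels []label) int64 {
	n := int64(binary.MaxVarintLen64) // label count
	for _, l := range labels {
		n += binary.MaxVarintLen64 + int64(len(l.Name))
		n += binary.MaxVarintLen64 + int64(len(l.Value))
	}
	return n
}

func main() {
	ls := []label{{Name: "traceID", Value: "123"}, {Name: "user", Value: "a"}}
	fmt.Println(rawNonIndexedBytes(ls), decodedNonIndexedBytes(ls)) // 15 65
}
```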

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
salvacorts authored and LinTechSo committed Jul 21, 2023
1 parent 2cadd0b commit 3fa0d57
Showing 13 changed files with 338 additions and 96 deletions.
24 changes: 21 additions & 3 deletions pkg/chunkenc/memchunk.go
@@ -1182,9 +1182,6 @@ func (si *bufferedIterator) Next() bool {
si.Close()
return false
}
// we always decode the line length and ts as varints
si.stats.AddDecompressedBytes(int64(len(line)) + 2*binary.MaxVarintLen64)
si.stats.AddDecompressedLines(1)

si.currTs = ts
si.currLine = line
@@ -1194,6 +1191,8 @@ func (si *bufferedIterator) Next() bool {

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
var decompressedBytes int64
var decompressedMetadataBytes int64
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
@@ -1219,6 +1218,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// TS and line length
decompressedBytes += 2 * binary.MaxVarintLen64

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
return 0, nil, nil, false
@@ -1256,7 +1258,11 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}
}

decompressedBytes += int64(lineSize)

if si.format < chunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
}

@@ -1286,6 +1292,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// Number of labels
decompressedMetadataBytes += binary.MaxVarintLen64

// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid])

@@ -1336,6 +1345,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// Label size
decompressedMetadataBytes += binary.MaxVarintLen64

// If the buffer is not yet initialized or too small, we get a new one.
if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) {
// in case of a replacement we put the previous buffer back into the pool
@@ -1369,8 +1381,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
return 0, nil, nil, false
}
}

decompressedMetadataBytes += int64(labelSize)
}

si.stats.AddDecompressedLines(1)
si.stats.AddDecompressedNonIndexedLabelsBytes(decompressedMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes)

return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true
}

22 changes: 21 additions & 1 deletion pkg/chunkenc/memchunk_test.go
@@ -1574,10 +1574,26 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
labels.FromStrings("traceID", "123", "user", "d").String(),
}

// The expected bytes are the sum of the bytes decompressed and the bytes read from the head chunk.
// First we add the bytes read from the store (aka decompressed). That's
// nonIndexedLabelsBytes = n. lines * (n. labels <int> + (2 * n. labels) * (label length <int> + label))
// lineBytes = n. lines * (ts <int> + line length <int> + line)
expectedNonIndexedLabelsBytes := 2 * (binary.MaxVarintLen64 + (binary.MaxVarintLen64 + len("traceID") + binary.MaxVarintLen64 + len("123") + binary.MaxVarintLen64 + len("user") + binary.MaxVarintLen64 + len("a")))
lineBytes := 2 * (2*binary.MaxVarintLen64 + len("lineA"))
// Now we add the bytes read from the head chunk. That's
// nonIndexedLabelsBytes = n. lines * (n. labels * (label name + label value))
// lineBytes = n. lines * (line)
expectedNonIndexedLabelsBytes += 2 * (len("traceID") + len("789") + len("user") + len("c"))
lineBytes += 2 * (len("lineC"))
// Finally, the expected total bytes is the line bytes + non-indexed labels bytes
expectedBytes := lineBytes + expectedNonIndexedLabelsBytes

// We run the test twice, so the iterator is created twice.
// This ensures the iterator is correctly closed.
for i := 0; i < 2; i++ {
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
sts, ctx := stats.NewContext(context.Background())

it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)

var lines []string
@@ -1590,6 +1606,10 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
}
assert.ElementsMatch(t, expectedLines, lines)
assert.ElementsMatch(t, expectedStreams, streams)

resultStats := sts.Result(0, 0, len(lines))
require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
require.Equal(t, int64(expectedNonIndexedLabelsBytes), resultStats.Summary.TotalNonIndexedLabelsBytesProcessed)
}
})
}
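As a standalone cross-check of the expected-bytes arithmetic in the test above, the same expression can be evaluated outside the diff (a sketch; it simply re-runs the computation with `binary.MaxVarintLen64 == 10` and the fixture strings):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Store (decompressed) side: per line, a varint label count plus a
	// varint length for every label name and value, plus their raw bytes.
	nonIndexed := 2 * (binary.MaxVarintLen64 +
		(binary.MaxVarintLen64 + len("traceID")) + (binary.MaxVarintLen64 + len("123")) +
		(binary.MaxVarintLen64 + len("user")) + (binary.MaxVarintLen64 + len("a")))
	lineBytes := 2 * (2*binary.MaxVarintLen64 + len("lineA"))

	// Head-chunk side: raw bytes only, no varint overhead.
	nonIndexed += 2 * (len("traceID") + len("789") + len("user") + len("c"))
	lineBytes += 2 * len("lineC")

	fmt.Println(nonIndexed, lineBytes+nonIndexed) // 160 220
}
```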
9 changes: 8 additions & 1 deletion pkg/chunkenc/unordered.go
@@ -212,7 +212,14 @@ func (hb *unorderedHeadBlock) forEntries(
for ; i < len(es.entries) && i >= 0; next() {
line := es.entries[i].line
metadataLabels := es.entries[i].metadataLabels
chunkStats.AddHeadChunkBytes(int64(len(line)))

var nonIndexedLabelsBytes int64
for _, label := range metadataLabels {
nonIndexedLabelsBytes += int64(len(label.Name) + len(label.Value))
}
chunkStats.AddHeadChunkNonIndexedLabelsBytes(nonIndexedLabelsBytes)
chunkStats.AddHeadChunkBytes(int64(len(line)) + nonIndexedLabelsBytes)

err = entryFn(chunkStats, es.ts, line, metadataLabels)

}
14 changes: 11 additions & 3 deletions pkg/loghttp/push/push.go
@@ -33,16 +33,22 @@ var (
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
Help: "The total number of uncompressed bytes received per tenant. Includes non-indexed labels bytes.",
}, []string{"tenant", "retention_hours"})
nonIndexedLabelsBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_non_indexed_labels_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant for entries metadata (non-indexed labels)",
}, []string{"tenant", "retention_hours"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})

bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
nonIndexedLabelsBytesReceivedStats = analytics.NewCounter("distributor_non_indexed_labels_bytes_received")
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
)

const applicationJSON = "application/json"
@@ -141,7 +147,9 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
entriesSize += entrySize
nonIndexedLabelsSize += entryLabelsSize
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entrySize))
nonIndexedLabelsBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entryLabelsSize))
bytesReceivedStats.Inc(entrySize)
nonIndexedLabelsBytesReceivedStats.Inc(entryLabelsSize)
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
39 changes: 24 additions & 15 deletions pkg/loghttp/push/push_test.go
@@ -44,15 +44,16 @@ func deflateString(source string) string {
}

func TestParseRequest(t *testing.T) {
var previousBytesReceived, previousLinesReceived int
var previousBytesReceived, previousNonIndexedLabelsBytesReceived, previousLinesReceived int
for index, test := range []struct {
path string
body string
contentType string
contentEncoding string
valid bool
expectedBytes int
expectedLines int
path string
body string
contentType string
contentEncoding string
valid bool
expectedNonIndexedLabelsBytes int
expectedBytes int
expectedLines int
}{
{
path: `/loki/api/v1/push`,
@@ -176,16 +177,18 @@ func TestParseRequest(t *testing.T) {
valid: false,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedNonIndexedLabelsBytes: 2*len("a") + 2*len("b"),
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
},
} {
t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) {
nonIndexedLabelsBytesIngested.Reset()
bytesIngested.Reset()
linesIngested.Reset()

@@ -199,6 +202,8 @@ func TestParseRequest(t *testing.T) {

data, err := ParseRequest(util_log.Logger, "fake", request, nil)

nonIndexedLabelsBytesReceived := int(nonIndexedLabelsBytesReceivedStats.Value()["total"].(int64)) - previousNonIndexedLabelsBytesReceived
previousNonIndexedLabelsBytesReceived += nonIndexedLabelsBytesReceived
bytesReceived := int(bytesReceivedStats.Value()["total"].(int64)) - previousBytesReceived
previousBytesReceived += bytesReceived
linesReceived := int(linesReceivedStats.Value()["total"].(int64)) - previousLinesReceived
@@ -207,15 +212,19 @@ func TestParseRequest(t *testing.T) {
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
require.Equal(t, test.expectedNonIndexedLabelsBytes, nonIndexedLabelsBytesReceived)
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedNonIndexedLabelsBytes), testutil.ToFloat64(nonIndexedLabelsBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
require.Equal(t, 0, nonIndexedLabelsBytesReceived)
require.Equal(t, 0, bytesReceived)
require.Equal(t, 0, linesReceived)
require.Equal(t, float64(0), testutil.ToFloat64(nonIndexedLabelsBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
}
12 changes: 12 additions & 0 deletions pkg/logqlmodel/stats/context.go
@@ -150,6 +150,8 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalNonIndexedLabelsBytesProcessed = r.Querier.Store.Chunk.DecompressedNonIndexedLabelsBytes + r.Querier.Store.Chunk.HeadChunkNonIndexedLabelsBytes +
r.Ingester.Store.Chunk.DecompressedNonIndexedLabelsBytes + r.Ingester.Store.Chunk.HeadChunkNonIndexedLabelsBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines
r.Summary.TotalPostFilterLines = r.Querier.Store.Chunk.PostFilterLines + r.Ingester.Store.Chunk.PostFilterLines
@@ -172,8 +174,10 @@ func (s *Store) Merge(m Store) {
s.TotalChunksDownloaded += m.TotalChunksDownloaded
s.ChunksDownloadTime += m.ChunksDownloadTime
s.Chunk.HeadChunkBytes += m.Chunk.HeadChunkBytes
s.Chunk.HeadChunkNonIndexedLabelsBytes += m.Chunk.HeadChunkNonIndexedLabelsBytes
s.Chunk.HeadChunkLines += m.Chunk.HeadChunkLines
s.Chunk.DecompressedBytes += m.Chunk.DecompressedBytes
s.Chunk.DecompressedNonIndexedLabelsBytes += m.Chunk.DecompressedNonIndexedLabelsBytes
s.Chunk.DecompressedLines += m.Chunk.DecompressedLines
s.Chunk.CompressedBytes += m.Chunk.CompressedBytes
s.Chunk.TotalDuplicates += m.Chunk.TotalDuplicates
@@ -284,6 +288,10 @@ func (c *Context) AddHeadChunkBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkBytes, i)
}

func (c *Context) AddHeadChunkNonIndexedLabelsBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkNonIndexedLabelsBytes, i)
}

func (c *Context) AddCompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.CompressedBytes, i)
}
@@ -292,6 +300,10 @@ func (c *Context) AddDecompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedBytes, i)
}

func (c *Context) AddDecompressedNonIndexedLabelsBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedNonIndexedLabelsBytes, i)
}

func (c *Context) AddDecompressedLines(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedLines, i)
}