Improve observability for non-indexed labels usage #9993

Merged
merged 6 commits on Jul 21, 2023
Changes from 5 commits
24 changes: 21 additions & 3 deletions pkg/chunkenc/memchunk.go
@@ -1182,9 +1182,6 @@ func (si *bufferedIterator) Next() bool {
si.Close()
return false
}
// we decode always the line length and ts as varint
si.stats.AddDecompressedBytes(int64(len(line)) + 2*binary.MaxVarintLen64)
si.stats.AddDecompressedLines(1)

si.currTs = ts
si.currLine = line
@@ -1194,6 +1191,8 @@ func (si *bufferedIterator) Next() bool {

// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
var decompressedBytes int64
var decompressedMetadataBytes int64
var ts int64
var tWidth, lWidth, lineSize, lastAttempt int
for lWidth == 0 { // Read until both varints have enough bytes.
@@ -1219,6 +1218,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// TS and line length
decompressedBytes += 2 * binary.MaxVarintLen64

if lineSize >= maxLineLength {
si.err = fmt.Errorf("line too long %d, maximum %d", lineSize, maxLineLength)
return 0, nil, nil, false
@@ -1256,7 +1258,11 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
}
}

decompressedBytes += int64(lineSize)

if si.format < chunkFormatV4 {
si.stats.AddDecompressedBytes(decompressedBytes)
si.stats.AddDecompressedLines(1)
return ts, si.buf[:lineSize], nil, true
}

@@ -1286,6 +1292,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// Number of labels
decompressedMetadataBytes += binary.MaxVarintLen64

// Shift down what is still left in the fixed-size read buffer, if any.
si.readBufValid = copy(si.readBuf[:], si.readBuf[labelsWidth:si.readBufValid])

@@ -1336,6 +1345,9 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
lastAttempt = si.readBufValid
}

// Label size
decompressedMetadataBytes += binary.MaxVarintLen64

// If the buffer is not yet initialized or too small, we get a new one.
if si.metaLabelsBuf[i] == nil || labelSize > cap(si.metaLabelsBuf[i]) {
// in case of a replacement we replace back the buffer in the pool
@@ -1369,8 +1381,14 @@ func (si *bufferedIterator) moveNext() (int64, []byte, [][]byte, bool) {
return 0, nil, nil, false
}
}

decompressedMetadataBytes += int64(labelSize)
}

si.stats.AddDecompressedLines(1)
si.stats.AddDecompressedMetadataBytes(decompressedMetadataBytes)
si.stats.AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes)

return ts, si.buf[:lineSize], si.metaLabelsBuf[:metaLabelsBufLen], true
}
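
For readers following the accounting: the hunks above charge each entry's timestamp and line-length varints at binary.MaxVarintLen64 apiece (10 bytes, the upper bound on an encoded varint), add the raw line bytes, and track the label-count and per-label length varints plus the label bytes separately as metadata before folding them into the decompressed total. A minimal standalone sketch of that arithmetic; the entry type and helper below are illustrative, not from the PR:

package main

import (
	"encoding/binary"
	"fmt"
)

// entry is an illustrative stand-in for what moveNext decodes.
type entry struct {
	line   []byte
	labels [][]byte // flattened name/value pairs, as in metaLabelsBuf
}

// countBytes mirrors the accounting above: ts and line-length varints are
// charged at binary.MaxVarintLen64 each, as are the label-count and
// per-label length varints; label bytes are tracked separately as metadata.
func countBytes(e entry) (decompressed, metadata int64) {
	decompressed = 2*binary.MaxVarintLen64 + int64(len(e.line)) // ts + line length + line
	metadata = binary.MaxVarintLen64                            // number of labels
	for _, l := range e.labels {
		metadata += binary.MaxVarintLen64 + int64(len(l)) // label length + label
	}
	return decompressed, metadata
}

func main() {
	d, m := countBytes(entry{
		line:   []byte("lineA"),
		labels: [][]byte{[]byte("traceID"), []byte("123"), []byte("user"), []byte("a")},
	})
	// The total includes the metadata bytes, matching
	// AddDecompressedBytes(decompressedBytes + decompressedMetadataBytes).
	fmt.Println(d+m, m) // 90 65 (line side 25, metadata 65)
}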

22 changes: 21 additions & 1 deletion pkg/chunkenc/memchunk_test.go
@@ -1574,10 +1574,26 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
labels.FromStrings("traceID", "123", "user", "d").String(),
}

// The expected number of bytes is the sum of the bytes decompressed from the store and the bytes read from the head chunk.
// First we add the bytes read from the store (aka decompressed). That's
// metadataBytes = n. lines * (n. labels <int> + (2 * n. labels) * (label length <int> + label))
// lineBytes = n. lines * (ts <int> + line length <int> + line)
expectedMetadataBytes := 2 * (binary.MaxVarintLen64 + (binary.MaxVarintLen64 + len("traceID") + binary.MaxVarintLen64 + len("123") + binary.MaxVarintLen64 + len("user") + binary.MaxVarintLen64 + len("a")))
lineBytes := 2 * (2*binary.MaxVarintLen64 + len("lineA"))
// Now we add the bytes read from the head chunk. That's
// metadataBytes = n. lines * (n. labels * (label name + label value))
// lineBytes = n. lines * (line)
expectedMetadataBytes += 2 * (len("traceID") + len("789") + len("user") + len("c"))
lineBytes += 2 * (len("lineC"))
// Finally, the expected total bytes are the line bytes plus the metadata bytes
expectedBytes := lineBytes + expectedMetadataBytes

// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
sts, ctx := stats.NewContext(context.Background())

it, err := chk.Iterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)

var lines []string
@@ -1590,6 +1606,10 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
}
assert.ElementsMatch(t, expectedLines, lines)
assert.ElementsMatch(t, expectedStreams, streams)

resultStats := sts.Result(0, 0, len(lines))
require.Equal(t, int64(expectedBytes), resultStats.Summary.TotalBytesProcessed)
require.Equal(t, int64(expectedMetadataBytes), resultStats.Summary.TotalMetadataBytesProcessed)
}
})
}
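
For concreteness, with binary.MaxVarintLen64 = 10 and the two store (cut-block) entries plus two head-chunk entries the test reads, the formulas above evaluate as follows; this is a worked reading of the test's expectations, not part of the diff:

expectedMetadataBytes (store) = 2 * (10 + (10+7) + (10+3) + (10+4) + (10+1)) = 2 * 65 = 130
lineBytes (store)             = 2 * (2*10 + 5)                               = 50
expectedMetadataBytes (head)  = 2 * (7 + 3 + 4 + 1)                          = 30
lineBytes (head)              = 2 * 5                                        = 10
expectedBytes                 = (50 + 10) + (130 + 30)                       = 220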
9 changes: 8 additions & 1 deletion pkg/chunkenc/unordered.go
@@ -212,7 +212,14 @@ func (hb *unorderedHeadBlock) forEntries(
for ; i < len(es.entries) && i >= 0; next() {
line := es.entries[i].line
metadataLabels := es.entries[i].metadataLabels
chunkStats.AddHeadChunkBytes(int64(len(line)))

var metadataBytes int64
for _, label := range metadataLabels {
metadataBytes += int64(len(label.Name) + len(label.Value))
}
chunkStats.AddHeadChunkMetadataBytes(metadataBytes)
chunkStats.AddHeadChunkBytes(int64(len(line)) + metadataBytes)

err = entryFn(chunkStats, es.ts, line, metadataLabels)

}
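
The head-block path now mirrors the store path: metadata bytes are the summed lengths of label names and values, reported both on their own and inside the head-chunk byte total. A self-contained sketch, assuming the prometheus model/labels package used elsewhere in this PR; the helper itself is illustrative:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// headChunkBytes shows the accounting in forEntries: metadata bytes are the
// summed label name/value lengths, and the byte total now includes them.
func headChunkBytes(line string, metadataLabels labels.Labels) (total, metadata int64) {
	for _, l := range metadataLabels {
		metadata += int64(len(l.Name) + len(l.Value))
	}
	return int64(len(line)) + metadata, metadata
}

func main() {
	total, meta := headChunkBytes("lineC", labels.FromStrings("traceID", "789", "user", "c"))
	fmt.Println(total, meta) // 20 15
}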
14 changes: 11 additions & 3 deletions pkg/loghttp/push/push.go
@@ -33,16 +33,22 @@ var (
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
Help: "The total number of uncompressed bytes received per tenant. Includes metadata.",
}, []string{"tenant", "retention_hours"})
metadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_metadata_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant for entries metadata (non-indexed labels)",
}, []string{"tenant", "retention_hours"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})

bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
bytesReceivedStats = analytics.NewCounter("distributor_bytes_received")
metadataBytesReceivedStats = analytics.NewCounter("distributor_metadata_bytes_received")
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
)

const applicationJSON = "application/json"
@@ -141,7 +147,9 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
entriesSize += entrySize
nonIndexedLabelsSize += entryLabelsSize
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entrySize))
metadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(entryLabelsSize))
bytesReceivedStats.Inc(entrySize)
metadataBytesReceivedStats.Inc(entryLabelsSize)
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
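
A runnable sketch of the new distributor metric's shape, mirroring the CounterOpts in this hunk; the tenant and retention label values are illustrative, and prometheus.NewCounterVec stands in for promauto so the snippet does not register with the default registry:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the metric added above.
	metadataBytesIngested := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "distributor_metadata_bytes_received_total",
		Help:      "The total number of uncompressed bytes received per tenant for entries metadata (non-indexed labels)",
	}, []string{"tenant", "retention_hours"})

	// For an entry carrying {a="a", b="b"}, the metadata size is
	// len("a")+len("a")+len("b")+len("b") = 4 bytes; "fake" and "744h" are
	// illustrative label values.
	metadataBytesIngested.WithLabelValues("fake", "744h").Add(4)

	fmt.Println(testutil.ToFloat64(metadataBytesIngested.WithLabelValues("fake", "744h"))) // 4
}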
39 changes: 24 additions & 15 deletions pkg/loghttp/push/push_test.go
@@ -44,15 +44,16 @@ func deflateString(source string) string {
}

func TestParseRequest(t *testing.T) {
var previousBytesReceived, previousLinesReceived int
var previousBytesReceived, previousMetadataBytesReceived, previousLinesReceived int
for index, test := range []struct {
path string
body string
contentType string
contentEncoding string
valid bool
expectedBytes int
expectedLines int
path string
body string
contentType string
contentEncoding string
valid bool
expectedMetadataBytes int
expectedBytes int
expectedLines int
}{
{
path: `/loki/api/v1/push`,
@@ -176,16 +177,18 @@
valid: false,
},
{
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
path: `/loki/api/v1/push`,
body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz", {"a": "a", "b": "b"} ] ] }]}`),
contentType: `application/json; charset=utf-8`,
contentEncoding: `deflate`,
valid: true,
expectedMetadataBytes: 2*len("a") + 2*len("b"),
expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"),
expectedLines: 1,
},
} {
t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) {
metadataBytesIngested.Reset()
bytesIngested.Reset()
linesIngested.Reset()

@@ -199,6 +202,8 @@

data, err := ParseRequest(util_log.Logger, "fake", request, nil)

metadataBytesReceived := int(metadataBytesReceivedStats.Value()["total"].(int64)) - previousMetadataBytesReceived
previousMetadataBytesReceived += metadataBytesReceived
bytesReceived := int(bytesReceivedStats.Value()["total"].(int64)) - previousBytesReceived
previousBytesReceived += bytesReceived
linesReceived := int(linesReceivedStats.Value()["total"].(int64)) - previousLinesReceived
@@ -207,15 +212,19 @@
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
require.Equal(t, test.expectedMetadataBytes, metadataBytesReceived)
require.Equal(t, test.expectedBytes, bytesReceived)
require.Equal(t, test.expectedLines, linesReceived)
require.Equal(t, float64(test.expectedMetadataBytes), testutil.ToFloat64(metadataBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
} else {
assert.NotNil(t, err, "Should give error for %d", index)
assert.Nil(t, data, "Should not give data for %d", index)
require.Equal(t, 0, metadataBytesReceived)
require.Equal(t, 0, bytesReceived)
require.Equal(t, 0, linesReceived)
require.Equal(t, float64(0), testutil.ToFloat64(metadataBytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "")))
require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake")))
}
12 changes: 12 additions & 0 deletions pkg/logqlmodel/stats/context.go
@@ -150,6 +150,8 @@ func JoinIngesters(ctx context.Context, inc Ingester) {
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalMetadataBytesProcessed = r.Querier.Store.Chunk.DecompressedMetadataBytes + r.Querier.Store.Chunk.HeadChunkMetadataBytes +
r.Ingester.Store.Chunk.DecompressedMetadataBytes + r.Ingester.Store.Chunk.HeadChunkMetadataBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines
r.Summary.TotalPostFilterLines = r.Querier.Store.Chunk.PostFilterLines + r.Ingester.Store.Chunk.PostFilterLines
@@ -172,8 +174,10 @@ func (s *Store) Merge(m Store) {
s.TotalChunksDownloaded += m.TotalChunksDownloaded
s.ChunksDownloadTime += m.ChunksDownloadTime
s.Chunk.HeadChunkBytes += m.Chunk.HeadChunkBytes
s.Chunk.HeadChunkMetadataBytes += m.Chunk.HeadChunkMetadataBytes
s.Chunk.HeadChunkLines += m.Chunk.HeadChunkLines
s.Chunk.DecompressedBytes += m.Chunk.DecompressedBytes
s.Chunk.DecompressedMetadataBytes += m.Chunk.DecompressedMetadataBytes
s.Chunk.DecompressedLines += m.Chunk.DecompressedLines
s.Chunk.CompressedBytes += m.Chunk.CompressedBytes
s.Chunk.TotalDuplicates += m.Chunk.TotalDuplicates
@@ -284,6 +288,10 @@ func (c *Context) AddHeadChunkBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkBytes, i)
}

func (c *Context) AddHeadChunkMetadataBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkMetadataBytes, i)
}

func (c *Context) AddCompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.CompressedBytes, i)
}
@@ -292,6 +300,10 @@ func (c *Context) AddDecompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedBytes, i)
}

func (c *Context) AddDecompressedMetadataBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedMetadataBytes, i)
}

func (c *Context) AddDecompressedLines(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedLines, i)
}
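
Putting the stats plumbing together, a hedged sketch of the end-to-end flow using the API in this file, with Result's execTime/queueTime arguments zeroed as in the test above; the byte values are arbitrary:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/logqlmodel/stats"
)

func main() {
	statsCtx, _ := stats.NewContext(context.Background())

	// What a chunk iterator would record while decoding one entry:
	// 25 line-side bytes plus 65 metadata bytes.
	statsCtx.AddDecompressedBytes(25 + 65)
	statsCtx.AddDecompressedMetadataBytes(65)
	statsCtx.AddDecompressedLines(1)

	res := statsCtx.Result(0, 0, 1)
	fmt.Println(res.Summary.TotalBytesProcessed)         // 90
	fmt.Println(res.Summary.TotalMetadataBytesProcessed) // 65
}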