Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache usage statistics #6317

Merged
merged 11 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
#### Loki

##### Enhancements
* [6317](https://github.com/grafana/loki/pull/6317) **dannykopping**: General: add cache usage statistics

##### Fixes
* [6152](https://github.com/grafana/loki/pull/6152) **slim-bean**: Fixes unbounded ingester memory growth when live tailing under specific circumstances.
Expand Down
8 changes: 8 additions & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func RecordRangeAndInstantQueryMetrics(
"total_entries", stats.Summary.TotalEntriesReturned,
"queue_time", logql_stats.ConvertSecondsToNanoseconds(stats.Summary.QueueTime),
"subqueries", stats.Summary.Subqueries,
"cache_chunk_req", stats.Caches.Chunk.EntriesRequested,
"cache_chunk_hit", stats.Caches.Chunk.EntriesFound,
"cache_chunk_bytes_stored", stats.Caches.Chunk.BytesSent,
"cache_chunk_bytes_fetched", stats.Caches.Chunk.BytesReceived,
"cache_index_req", stats.Caches.Index.EntriesRequested,
"cache_index_hit", stats.Caches.Index.EntriesFound,
"cache_result_req", stats.Caches.Result.EntriesRequested,
"cache_result_hit", stats.Caches.Result.EntriesFound,
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestLogSlowQuery(t *testing.T) {
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB total_entries=10 queue_time=2ns subqueries=0 source=logvolhist feature=beta\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB total_entries=10 queue_time=2ns subqueries=0 cache_chunk_req=0 cache_chunk_hit=0 cache_chunk_bytes_stored=0 cache_chunk_bytes_fetched=0 cache_index_req=0 cache_index_hit=0 cache_result_req=0 cache_result_hit=0 source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
Expand Down
140 changes: 140 additions & 0 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
type Context struct {
querier Querier
ingester Ingester
caches Caches

// store is the store statistics collected across the query path
store Store
Expand All @@ -52,6 +53,15 @@ type Context struct {
mtx sync.Mutex
}

// CacheType identifies one of the cache subsystems for which usage
// statistics are collected.
type CacheType string

const (
	// ChunkCache is the cache holding chunk data.
	ChunkCache CacheType = "chunk"
	// IndexCache is the cache holding index lookups.
	IndexCache CacheType = "index"
	// ResultCache is the query results cache.
	ResultCache CacheType = "result"
	// WriteDedupeCache is included for completeness, but its statistics
	// are not yet displayed along with the others.
	WriteDedupeCache CacheType = "write-dedupe"
)

// NewContext creates a new statistics context
func NewContext(ctx context.Context) (*Context, context.Context) {
contextData := &Context{}
Expand Down Expand Up @@ -79,6 +89,15 @@ func (c *Context) Ingester() Ingester {
}
}

// Caches returns a snapshot of the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
	var snapshot Caches
	snapshot.Chunk = c.caches.Chunk
	snapshot.Index = c.caches.Index
	snapshot.Result = c.caches.Result
	return snapshot
}

// Reset clears the statistics.
func (c *Context) Reset() {
c.mtx.Lock()
Expand All @@ -88,6 +107,7 @@ func (c *Context) Reset() {
c.querier.Reset()
c.ingester.Reset()
c.result.Reset()
c.caches.Reset()
}

// Result calculates the summary based on store and ingester data.
Expand All @@ -99,6 +119,7 @@ func (c *Context) Result(execTime time.Duration, queueTime time.Duration, totalE
Store: c.store,
},
Ingester: c.ingester,
Caches: c.caches,
})

r.ComputeSummary(execTime, queueTime, totalEntriesReturned)
Expand Down Expand Up @@ -168,12 +189,28 @@ func (i *Ingester) Merge(m Ingester) {
i.TotalReached += m.TotalReached
}

// Merge accumulates the per-cache statistics from m into c.
// The three caches are independent, so merge order is irrelevant.
func (c *Caches) Merge(m Caches) {
	c.Result.Merge(m.Result)
	c.Index.Merge(m.Index)
	c.Chunk.Merge(m.Chunk)
}

// Merge adds every counter of m onto the corresponding counter of c.
func (c *Cache) Merge(m Cache) {
	c.Requests += m.Requests
	c.EntriesRequested += m.EntriesRequested
	c.EntriesFound += m.EntriesFound
	c.EntriesStored += m.EntriesStored
	c.BytesSent += m.BytesSent
	c.BytesReceived += m.BytesReceived
}

// Merge merges two results of statistics.
// This will increase the total number of Subqueries.
func (r *Result) Merge(m Result) {
	r.Summary.Subqueries++
	r.Querier.Merge(m.Querier)
	r.Ingester.Merge(m.Ingester)
	r.Caches.Merge(m.Caches)
	// Recompute the summary over the combined exec/queue times and entry
	// counts. Note: m's TotalEntriesReturned must be included here,
	// otherwise entries returned by the merged-in result are dropped.
	r.ComputeSummary(ConvertSecondsToNanoseconds(r.Summary.ExecTime+m.Summary.ExecTime),
		ConvertSecondsToNanoseconds(r.Summary.QueueTime+m.Summary.QueueTime),
		int(r.Summary.TotalEntriesReturned)+int(m.Summary.TotalEntriesReturned))
}
Expand Down Expand Up @@ -257,6 +294,85 @@ func (c *Context) AddChunksRef(i int64) {
atomic.AddInt64(&c.store.TotalChunksRef, i)
}

// AddCacheEntriesFound counts the number of cache entries requested and found.
// Unknown cache types are ignored.
func (c *Context) AddCacheEntriesFound(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesFound, int32(i))
	}
}

// AddCacheEntriesRequested counts the number of keys requested from the cache.
// Unknown cache types are ignored.
func (c *Context) AddCacheEntriesRequested(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesRequested, int32(i))
	}
}

// AddCacheEntriesStored counts the number of keys *attempted* to be stored in the cache.
// It should be noted that if a background writeback (https://grafana.com/docs/loki/latest/configuration/#cache_config)
// is configured we cannot know if these store attempts succeeded or not as this happens asynchronously.
func (c *Context) AddCacheEntriesStored(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.EntriesStored, int32(i))
	}
}

// AddCacheBytesRetrieved counts the amount of bytes retrieved from the cache.
// Unknown cache types are ignored.
func (c *Context) AddCacheBytesRetrieved(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt64(&stats.BytesReceived, int64(i))
	}
}

// AddCacheBytesSent counts the amount of bytes sent to the cache.
// It should be noted that if a background writeback (https://grafana.com/docs/loki/latest/configuration/#cache_config)
// is configured we cannot know if these bytes actually got stored or not as this happens asynchronously.
func (c *Context) AddCacheBytesSent(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt64(&stats.BytesSent, int64(i))
	}
}

// AddCacheRequest counts the number of fetch/store requests to the cache.
// Unknown cache types are ignored.
func (c *Context) AddCacheRequest(t CacheType, i int) {
	if stats := c.getCacheStatsByType(t); stats != nil {
		atomic.AddInt32(&stats.Requests, int32(i))
	}
}

// getCacheStatsByType maps a cache type to the mutable stats struct it is
// accumulated into, or nil for an unrecognized type.
func (c *Context) getCacheStatsByType(t CacheType) *Cache {
	switch t {
	case ChunkCache:
		return &c.caches.Chunk
	case IndexCache:
		return &c.caches.Index
	case ResultCache:
		return &c.caches.Result
	default:
		return nil
	}
}

// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
Expand Down Expand Up @@ -284,6 +400,7 @@ func (r Result) Log(log log.Logger) {
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
)
r.Caches.Log(log)
r.Summary.Log(log)
}

Expand All @@ -297,3 +414,26 @@ func (s Summary) Log(log log.Logger) {
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}

func (c Caches) Log(log log.Logger) {
_ = log.Log(
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
"Cache.Chunk.EntriesStored", c.Chunk.EntriesStored,
"Cache.Chunk.BytesSent", humanize.Bytes(uint64(c.Chunk.BytesSent)),
"Cache.Chunk.BytesReceived", humanize.Bytes(uint64(c.Chunk.BytesReceived)),
"Cache.Index.Requests", c.Index.Requests,
"Cache.Index.EntriesRequested", c.Index.EntriesRequested,
"Cache.Index.EntriesFound", c.Index.EntriesFound,
"Cache.Index.EntriesStored", c.Index.EntriesStored,
"Cache.Index.BytesSent", humanize.Bytes(uint64(c.Index.BytesSent)),
"Cache.Index.BytesReceived", humanize.Bytes(uint64(c.Index.BytesReceived)),
"Cache.Result.Requests", c.Result.Requests,
"Cache.Result.EntriesRequested", c.Result.EntriesRequested,
"Cache.Result.EntriesFound", c.Result.EntriesFound,
"Cache.Result.EntriesStored", c.Result.EntriesStored,
"Cache.Result.BytesSent", humanize.Bytes(uint64(c.Result.BytesSent)),
"Cache.Result.BytesReceived", humanize.Bytes(uint64(c.Result.BytesReceived)),
)
}
68 changes: 68 additions & 0 deletions pkg/logqlmodel/stats/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func TestResult(t *testing.T) {
stats.AddChunksRef(50)
stats.AddChunksDownloaded(60)
stats.AddChunksDownloadTime(time.Second)
stats.AddCacheRequest(ChunkCache, 3)
stats.AddCacheRequest(IndexCache, 4)
stats.AddCacheRequest(ResultCache, 1)

fakeIngesterQuery(ctx)
fakeIngesterQuery(ctx)
Expand Down Expand Up @@ -60,6 +63,17 @@ func TestResult(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 3,
},
Index: Cache{
Requests: 4,
},
Result: Cache{
Requests: 1,
},
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -182,6 +196,20 @@ func TestResult_Merge(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 5,
BytesReceived: 1024,
BytesSent: 512,
},
Index: Cache{
EntriesRequested: 22,
EntriesFound: 2,
},
Result: Cache{
EntriesStored: 3,
},
},
Summary: Summary{
ExecTime: 2 * time.Second.Seconds(),
QueueTime: 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -230,6 +258,20 @@ func TestResult_Merge(t *testing.T) {
},
},
},
Caches: Caches{
Chunk: Cache{
Requests: 2 * 5,
BytesReceived: 2 * 1024,
BytesSent: 2 * 512,
},
Index: Cache{
EntriesRequested: 2 * 22,
EntriesFound: 2 * 2,
},
Result: Cache{
EntriesStored: 2 * 3,
},
},
Summary: Summary{
ExecTime: 2 * 2 * time.Second.Seconds(),
QueueTime: 2 * 2 * time.Nanosecond.Seconds(),
Expand Down Expand Up @@ -273,3 +315,29 @@ func TestIngester(t *testing.T) {
},
}, statsCtx.Ingester())
}

// TestCaches verifies that per-cache counters recorded via the Add* helpers
// are routed to the right cache and reported by Context.Caches().
func TestCaches(t *testing.T) {
	statsCtx, _ := NewContext(context.Background())

	// Chunk cache activity.
	statsCtx.AddCacheRequest(ChunkCache, 5)
	statsCtx.AddCacheBytesRetrieved(ChunkCache, 1024)
	statsCtx.AddCacheBytesSent(ChunkCache, 512)

	// Index cache activity.
	statsCtx.AddCacheEntriesRequested(IndexCache, 22)
	statsCtx.AddCacheEntriesFound(IndexCache, 2)

	// Result cache activity.
	statsCtx.AddCacheEntriesStored(ResultCache, 3)

	expected := Caches{
		Chunk: Cache{
			Requests:      5,
			BytesReceived: 1024,
			BytesSent:     512,
		},
		Index: Cache{
			EntriesRequested: 22,
			EntriesFound:     2,
		},
		Result: Cache{
			EntriesStored: 3,
		},
	}
	require.Equal(t, expected, statsCtx.Caches())
}
Loading