-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ruler: Recording Rules #3766
Merged
owen-d
merged 41 commits into
grafana:main
from
dannykopping:dannykopping/recording_rules
Jun 2, 2021
Merged
Ruler: Recording Rules #3766
Changes from 33 commits
Commits
Show all changes
41 commits
Select commit
Hold shift + click to select a range
7d5942c
WIP: hack to get recording rules working and pushing to Cortex/Promet…
2215f7b
Refactoring
1e2d782
Merge remote-tracking branch 'upstream/master' into dannykopping/reco…
fc48da7
Minor refactorings
870aa51
Moving manager subpackage into ruler package to avoid dependency cycles
5565a78
Minor refactorings
23356a3
Skipping commit if remote-write client is not defined
d857417
Merge remote-tracking branch 'upstream/master' into dannykopping/reco…
a202c1a
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
8f07114
Updating use of cortex client
56ab4eb
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
f816193
Memoizing appenders, using queue for samples & labels
d0be7fa
Adding buffer size configurability
524bbf7
Adding metric to show current buffer size
0339fbf
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
df1f8d2
Refactoring for better responsibility separation & testability
df72b2f
Adding per-tenant overrides of remote-write queue capacity
2eb9042
Adding tests for evicting queue
19874d9
Adding more tests and refactoring
0200090
Adding queue benchmark
1760bd5
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
5800010
Reducing redundancy in metric names
02a0943
Testing that only metric queries can be run
0d188f7
Minor fixes pre-review
858934a
Appeasing the linter
67c78fa
Guarding against unprotected nil pointer dereference in Prometheus re…
469ce9b
Appeasing the linter
52489cd
Setting tenant ID header on remote-write client
4f218cf
Updating benchmark to use complex struct rather than int to be more r…
4d3bebd
Registering flags
cc736d7
Adding metric to track remote-write commit errors
7e95a8c
Refactoring based on review
a7a4186
Performance improvements based on review
1e8b8af
Return error on invalid queue capacity
9dee0f4
Removing global queue capacity config - using limits
bf473c8
Reusing memory in request preparation
082f54e
Moving remote-write metrics into struct
14a2d74
Applying review suggestions
8424008
Merge remote-tracking branch 'upstream/main' into dannykopping/record…
22f628c
Allowing for runtime changing of per-tenant remote-write queue capacity
a320a46
Appeasing the linter
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
package ruler | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/cortexproject/cortex/pkg/cortexpb" | ||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"github.com/prometheus/prometheus/pkg/exemplar" | ||
"github.com/prometheus/prometheus/pkg/labels" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/rules" | ||
"github.com/prometheus/prometheus/storage" | ||
|
||
"github.com/grafana/loki/pkg/util" | ||
) | ||
|
||
type RemoteWriteAppendable struct { | ||
groupAppender map[string]*RemoteWriteAppender | ||
|
||
userID string | ||
cfg Config | ||
overrides RulesLimits | ||
logger log.Logger | ||
} | ||
|
||
func newRemoteWriteAppendable(cfg Config, overrides RulesLimits, logger log.Logger, userID string) *RemoteWriteAppendable { | ||
return &RemoteWriteAppendable{ | ||
logger: logger, | ||
userID: userID, | ||
cfg: cfg, | ||
overrides: overrides, | ||
groupAppender: make(map[string]*RemoteWriteAppender), | ||
} | ||
} | ||
|
||
type RemoteWriteAppender struct { | ||
logger log.Logger | ||
ctx context.Context | ||
remoteWriter remoteWriter | ||
userID string | ||
groupKey string | ||
|
||
queue *util.EvictingQueue | ||
} | ||
|
||
func (a *RemoteWriteAppendable) Appender(ctx context.Context) storage.Appender { | ||
groupKey := retrieveGroupKeyFromContext(ctx) | ||
|
||
// create or retrieve an appender associated with this groupKey (unique ID for rule group) | ||
appender, found := a.groupAppender[groupKey] | ||
if found { | ||
return appender | ||
} | ||
|
||
client, err := newRemoteWriter(a.cfg, a.userID) | ||
if err != nil { | ||
level.Error(a.logger).Log("msg", "error creating remote-write client; setting appender as noop", "err", err, "tenant", a.userID) | ||
return &NoopAppender{} | ||
} | ||
|
||
capacity := a.queueCapacityForTenant() | ||
appender = &RemoteWriteAppender{ | ||
ctx: ctx, | ||
logger: a.logger, | ||
remoteWriter: client, | ||
groupKey: groupKey, | ||
userID: a.userID, | ||
|
||
queue: util.NewEvictingQueue(capacity, onEvict(a.userID, groupKey)), | ||
} | ||
|
||
samplesQueueCapacity.WithLabelValues(a.userID, groupKey).Set(float64(capacity)) | ||
|
||
// only track reference if groupKey was retrieved | ||
if groupKey == "" { | ||
level.Warn(a.logger).Log("msg", "blank group key passed via context; creating new appender") | ||
return appender | ||
} | ||
|
||
a.groupAppender[groupKey] = appender | ||
return appender | ||
} | ||
|
||
func (a *RemoteWriteAppendable) queueCapacityForTenant() int { | ||
capacity := a.cfg.RemoteWrite.QueueCapacity | ||
if tenantCapacity := a.overrides.RulerRemoteWriteQueueCapacity(a.userID); tenantCapacity > 0 { | ||
capacity = tenantCapacity | ||
} | ||
|
||
return capacity | ||
} | ||
|
||
func onEvict(userID, groupKey string) func() { | ||
return func() { | ||
samplesEvicted.WithLabelValues(userID, groupKey).Inc() | ||
} | ||
} | ||
|
||
func (a *RemoteWriteAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) { | ||
a.queue.Append(queueEntry{ | ||
labels: l, | ||
sample: cortexpb.Sample{ | ||
Value: v, | ||
TimestampMs: t, | ||
}, | ||
}) | ||
|
||
samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(float64(a.queue.Length())) | ||
samplesQueuedTotal.WithLabelValues(a.userID, a.groupKey).Inc() | ||
|
||
return 0, nil | ||
} | ||
|
||
func (a *RemoteWriteAppender) AppendExemplar(_ uint64, _ labels.Labels, _ exemplar.Exemplar) (uint64, error) { | ||
return 0, errors.New("exemplars are unsupported") | ||
} | ||
|
||
func (a *RemoteWriteAppender) Commit() error { | ||
if a.queue.Length() <= 0 { | ||
return nil | ||
} | ||
|
||
if a.remoteWriter == nil { | ||
level.Warn(a.logger).Log("msg", "no remote_write client defined, skipping commit") | ||
return nil | ||
} | ||
|
||
level.Debug(a.logger).Log("msg", "writing samples to remote_write target", "target", a.remoteWriter.Endpoint(), "count", a.queue.Length()) | ||
|
||
req, err := a.remoteWriter.PrepareRequest(a.queue) | ||
if err != nil { | ||
level.Error(a.logger).Log("msg", "could not prepare remote-write request", "err", err) | ||
remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc() | ||
return err | ||
} | ||
|
||
err = a.remoteWriter.Store(a.ctx, req) | ||
if err != nil { | ||
level.Error(a.logger).Log("msg", "could not store recording rule samples", "err", err) | ||
remoteWriteErrors.WithLabelValues(a.userID, a.groupKey).Inc() | ||
return err | ||
} | ||
|
||
// Clear the queue on a successful response | ||
a.queue.Clear() | ||
|
||
samplesQueued.WithLabelValues(a.userID, a.groupKey).Set(0) | ||
|
||
return nil | ||
} | ||
|
||
func (a *RemoteWriteAppender) Rollback() error { | ||
a.queue.Clear() | ||
|
||
return nil | ||
} | ||
|
||
func retrieveGroupKeyFromContext(ctx context.Context) string { | ||
data, found := ctx.Value(promql.QueryOrigin{}).(map[string]interface{}) | ||
if !found { | ||
return "" | ||
} | ||
|
||
ruleGroup, found := data["ruleGroup"].(map[string]string) | ||
if !found { | ||
return "" | ||
} | ||
|
||
file, found := ruleGroup["file"] | ||
if !found { | ||
return "" | ||
} | ||
|
||
name, found := ruleGroup["name"] | ||
if !found { | ||
return "" | ||
} | ||
|
||
return rules.GroupKey(file, name) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is ultimately called on every evaluation cycle in the Ruler, we can probably add a method
WithCapacity
which can mutate an appender's capacity, so that it is kept up to date with the overrides (would need a test)