Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync prometheus #595

Merged
merged 15 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions cmd/prometheus/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,9 @@ func TestFailedStartupExitCode(t *testing.T) {
require.Error(t, err)

var exitError *exec.ExitError
if errors.As(err, &exitError) {
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
} else {
t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
}
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
}

type senderFunc func(alerts ...*notifier.Alert)
Expand Down Expand Up @@ -194,9 +191,7 @@ func TestSendAlerts(t *testing.T) {
tc := tc
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
senderFunc := senderFunc(func(alerts ...*notifier.Alert) {
if len(tc.in) == 0 {
t.Fatalf("sender called with 0 alert")
}
require.NotEmpty(t, tc.in, "sender called with 0 alert")
require.Equal(t, tc.exp, alerts)
})
rules.SendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...)
Expand Down Expand Up @@ -228,7 +223,7 @@ func TestWALSegmentSizeBounds(t *testing.T) {
go func() { done <- prom.Wait() }()
select {
case err := <-done:
t.Errorf("prometheus should be still running: %v", err)
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
Expand All @@ -239,12 +234,9 @@ func TestWALSegmentSizeBounds(t *testing.T) {
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
if errors.As(err, &exitError) {
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
} else {
t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
}
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
}
}

Expand Down Expand Up @@ -274,7 +266,7 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
go func() { done <- prom.Wait() }()
select {
case err := <-done:
t.Errorf("prometheus should be still running: %v", err)
require.Fail(t, "prometheus should be still running: %v", err)
case <-time.After(startupTime):
prom.Process.Kill()
<-done
Expand All @@ -285,12 +277,9 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
err = prom.Wait()
require.Error(t, err)
var exitError *exec.ExitError
if errors.As(err, &exitError) {
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
} else {
t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
}
require.ErrorAs(t, err, &exitError)
status := exitError.Sys().(syscall.WaitStatus)
require.Equal(t, expectedExitStatus, status.ExitStatus())
}
}

Expand Down Expand Up @@ -347,10 +336,8 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames
}

require.Len(t, g.GetMetric(), 1)
if _, ok := res[m]; ok {
t.Error("expected only one metric family for", m)
t.FailNow()
}
_, ok := res[m]
require.False(t, ok, "expected only one metric family for", m)
res[m] = *g.GetMetric()[0].GetGauge().Value
}
}
Expand Down
21 changes: 9 additions & 12 deletions cmd/prometheus/main_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/util/testutil"
)

Expand All @@ -37,9 +39,7 @@ func TestStartupInterrupt(t *testing.T) {

prom := exec.Command(promPath, "-test.main", "--config.file="+promConfig, "--storage.tsdb.path="+t.TempDir(), "--web.listen-address=0.0.0.0"+port)
err := prom.Start()
if err != nil {
t.Fatalf("execution error: %v", err)
}
require.NoError(t, err)

done := make(chan error, 1)
go func() {
Expand Down Expand Up @@ -68,14 +68,11 @@ Loop:
time.Sleep(500 * time.Millisecond)
}

if !startedOk {
t.Fatal("prometheus didn't start in the specified timeout")
}
switch err := prom.Process.Kill(); {
case err == nil:
t.Errorf("prometheus didn't shutdown gracefully after sending the Interrupt signal")
case stoppedErr != nil && stoppedErr.Error() != "signal: interrupt":
// TODO: find a better way to detect when the process didn't exit as expected!
t.Errorf("prometheus exited with an unexpected error: %v", stoppedErr)
require.True(t, startedOk, "prometheus didn't start in the specified timeout")
err = prom.Process.Kill()
require.Error(t, err, "prometheus didn't shutdown gracefully after sending the Interrupt signal")
// TODO - find a better way to detect when the process didn't exit as expected!
if stoppedErr != nil {
require.EqualError(t, stoppedErr, "signal: interrupt", "prometheus exit")
}
}
7 changes: 4 additions & 3 deletions cmd/promtool/unittest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package main
import (
"testing"

"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/promql"
)

Expand Down Expand Up @@ -178,9 +180,8 @@ func TestRulesUnitTestRun(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := RulesUnitTest(tt.queryOpts, tt.args.run, false, tt.args.files...); got != tt.want {
t.Errorf("RulesUnitTest() = %v, want %v", got, tt.want)
}
got := RulesUnitTest(tt.queryOpts, tt.args.run, false, tt.args.files...)
require.Equal(t, tt.want, got)
})
}
}
2 changes: 1 addition & 1 deletion documentation/prometheus-mixin/alerts.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
alert: 'PrometheusNotIngestingSamples',
expr: |||
(
rate(prometheus_tsdb_head_samples_appended_total{%(prometheusSelector)s}[5m]) <= 0
sum without(type) (rate(prometheus_tsdb_head_samples_appended_total{%(prometheusSelector)s}[5m])) <= 0
and
(
sum without(scrape_job) (prometheus_target_metadata_cache_entries{%(prometheusSelector)s}) > 0
Expand Down
10 changes: 2 additions & 8 deletions promql/parser/lex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,16 +815,10 @@ func TestLexer(t *testing.T) {
hasError = true
}
}
if !hasError {
t.Logf("%d: input %q", i, test.input)
require.Fail(t, "expected lexing error but did not fail")
}
require.True(t, hasError, "%d: input %q, expected lexing error but did not fail", i, test.input)
continue
}
if lastItem.Typ == ERROR {
t.Logf("%d: input %q", i, test.input)
require.Fail(t, "unexpected lexing error at position %d: %s", lastItem.Pos, lastItem)
}
require.NotEqual(t, ERROR, lastItem.Typ, "%d: input %q, unexpected lexing error at position %d: %s", i, test.input, lastItem.Pos, lastItem)

eofItem := Item{EOF, posrange.Pos(len(test.input)), ""}
require.Equal(t, lastItem, eofItem, "%d: input %q", i, test.input)
Expand Down
5 changes: 2 additions & 3 deletions rules/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func NewAlertingRule(
evaluationTimestamp: atomic.NewTime(time.Time{}),
evaluationDuration: atomic.NewDuration(0),
lastError: atomic.NewError(nil),

noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
}
}

Expand Down
6 changes: 5 additions & 1 deletion rules/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {

// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
// Try to run it concurrently if there are slots available.
if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() {
if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() {
wg.Add(1)

go eval(i, rule, func() {
Expand Down Expand Up @@ -1065,3 +1065,7 @@ func buildDependencyMap(rules []Rule) dependencyMap {

return dependencies
}

// isRuleEligibleForConcurrentExecution reports whether rule is flagged as having
// both no dependent rules and no dependency rules. The dependency flag is only
// consulted when the rule has no dependents.
func isRuleEligibleForConcurrentExecution(rule Rule) bool {
	if !rule.NoDependentRules() {
		return false
	}
	return rule.NoDependencyRules()
}
46 changes: 28 additions & 18 deletions rules/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type ManagerOptions struct {
MaxConcurrentEvals int64
ConcurrentEvalsEnabled bool
RuleConcurrencyController RuleConcurrencyController
RuleDependencyController RuleDependencyController

DefaultEvaluationDelay func() time.Duration

Expand Down Expand Up @@ -154,6 +155,10 @@ func NewManager(o *ManagerOptions) *Manager {
}
}

if o.RuleDependencyController == nil {
o.RuleDependencyController = ruleDependencyController{}
}

m := &Manager{
groups: map[string]*Group{},
opts: o,
Expand Down Expand Up @@ -200,8 +205,6 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
m.mtx.Lock()
defer m.mtx.Unlock()

m.opts.RuleConcurrencyController.Invalidate()

groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)

if errs != nil {
Expand Down Expand Up @@ -340,11 +343,7 @@ func (m *Manager) LoadGroups(
}

// Check dependencies between rules and store the result on each Rule itself.
depMap := buildDependencyMap(rules)
for _, r := range rules {
r.SetNoDependentRules(depMap.dependents(r) == 0)
r.SetNoDependencyRules(depMap.dependencies(r) == 0)
}
m.opts.RuleDependencyController.AnalyseRules(rules)

groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
Name: rg.Name,
Expand Down Expand Up @@ -445,24 +444,35 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
}
}

// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure
// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
// RuleEligible determines if the rule can guarantee correct results while running concurrently.
RuleEligible(g *Group, r Rule) bool
// RuleDependencyController analyses a set of rules to determine whether they have dependencies on each other.
type RuleDependencyController interface {
// AnalyseRules analyses the dependencies between the input rules. For each rule that is guaranteed
// to have no dependents and/or no dependencies, the implementation should call
// Rule.SetNoDependentRules(true) and/or Rule.SetNoDependencyRules(true) respectively.
AnalyseRules(rules []Rule)
}

// ruleDependencyController is a stateless RuleDependencyController that derives
// each rule's flags from the map returned by buildDependencyMap.
type ruleDependencyController struct{}

// AnalyseRules implements RuleDependencyController. It builds one dependency map
// for the whole input and then, rule by rule, records whether the rule has zero
// dependents and whether it has zero dependencies.
func (ruleDependencyController) AnalyseRules(rules []Rule) {
	deps := buildDependencyMap(rules)
	for i := range rules {
		rule := rules[i]

		hasNoDependents := deps.dependents(rule) == 0
		rule.SetNoDependentRules(hasNoDependents)

		hasNoDependencies := deps.dependencies(rule) == 0
		rule.SetNoDependencyRules(hasNoDependencies)
	}
}

// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
type RuleConcurrencyController interface {
// Allow determines whether any concurrent evaluation slots are available.
// If Allow() returns true, then Done() must be called to release the acquired slot.
Allow() bool

// Done releases a concurrent evaluation slot.
Done()

// Invalidate instructs the controller to invalidate its state.
// This should be called when groups are modified (during a reload, for instance), because the controller may
// store some state about each group in order to more efficiently determine rule eligibility.
Invalidate()
}

// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
Expand Down
5 changes: 2 additions & 3 deletions rules/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,9 +1780,8 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
evaluationTimestamp: atomic.NewTime(time.Time{}),
evaluationDuration: atomic.NewDuration(0),
lastError: atomic.NewError(nil),

noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
}

group := NewGroup(GroupOptions{
Expand Down
9 changes: 4 additions & 5 deletions rules/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ func NewRuleDetail(r Rule) RuleDetail {
}

return RuleDetail{
Name: r.Name(),
Query: r.Query().String(),
Labels: r.Labels(),
Kind: kind,

Name: r.Name(),
Query: r.Query().String(),
Labels: r.Labels(),
Kind: kind,
NoDependentRules: r.NoDependentRules(),
NoDependencyRules: r.NoDependencyRules(),
}
Expand Down
9 changes: 4 additions & 5 deletions rules/origin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ func (u unknownRule) SetEvaluationDuration(time.Duration) {}
func (u unknownRule) GetEvaluationDuration() time.Duration { return 0 }
func (u unknownRule) SetEvaluationTimestamp(time.Time) {}
func (u unknownRule) GetEvaluationTimestamp() time.Time { return time.Time{} }

func (u unknownRule) SetNoDependentRules(bool) {}
func (u unknownRule) NoDependentRules() bool { return false }
func (u unknownRule) SetNoDependencyRules(bool) {}
func (u unknownRule) NoDependencyRules() bool { return false }
func (u unknownRule) SetNoDependentRules(bool) {}
func (u unknownRule) NoDependentRules() bool { return false }
func (u unknownRule) SetNoDependencyRules(bool) {}
func (u unknownRule) NoDependencyRules() bool { return false }

func TestNewRuleDetailPanics(t *testing.T) {
require.PanicsWithValue(t, `unknown rule type "rules.unknownRule"`, func() {
Expand Down
5 changes: 2 additions & 3 deletions rules/recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *Reco
evaluationTimestamp: atomic.NewTime(time.Time{}),
evaluationDuration: atomic.NewDuration(0),
lastError: atomic.NewError(nil),

noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
noDependentRules: atomic.NewBool(false),
noDependencyRules: atomic.NewBool(false),
}
}

Expand Down
Loading
Loading