diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go
index f4fe3855c6..03f3a9bc39 100644
--- a/cmd/prometheus/main_test.go
+++ b/cmd/prometheus/main_test.go
@@ -126,12 +126,9 @@ func TestFailedStartupExitCode(t *testing.T) {
 	require.Error(t, err)
 
 	var exitError *exec.ExitError
-	if errors.As(err, &exitError) {
-		status := exitError.Sys().(syscall.WaitStatus)
-		require.Equal(t, expectedExitStatus, status.ExitStatus())
-	} else {
-		t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
-	}
+	require.ErrorAs(t, err, &exitError)
+	status := exitError.Sys().(syscall.WaitStatus)
+	require.Equal(t, expectedExitStatus, status.ExitStatus())
 }
 
 type senderFunc func(alerts ...*notifier.Alert)
@@ -194,9 +191,7 @@ func TestSendAlerts(t *testing.T) {
 		tc := tc
 		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
 			senderFunc := senderFunc(func(alerts ...*notifier.Alert) {
-				if len(tc.in) == 0 {
-					t.Fatalf("sender called with 0 alert")
-				}
+				require.NotEmpty(t, tc.in, "sender called with 0 alert")
 				require.Equal(t, tc.exp, alerts)
 			})
 			rules.SendAlerts(senderFunc, "http://localhost:9090")(context.TODO(), "up", tc.in...)
@@ -228,7 +223,7 @@ func TestWALSegmentSizeBounds(t *testing.T) {
 		go func() { done <- prom.Wait() }()
 		select {
 		case err := <-done:
-			t.Errorf("prometheus should be still running: %v", err)
+			require.Fail(t, "prometheus should be still running: %v", err)
 		case <-time.After(startupTime):
 			prom.Process.Kill()
 			<-done
@@ -239,12 +234,9 @@ func TestWALSegmentSizeBounds(t *testing.T) {
 		err = prom.Wait()
 		require.Error(t, err)
 		var exitError *exec.ExitError
-		if errors.As(err, &exitError) {
-			status := exitError.Sys().(syscall.WaitStatus)
-			require.Equal(t, expectedExitStatus, status.ExitStatus())
-		} else {
-			t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
-		}
+		require.ErrorAs(t, err, &exitError)
+		status := exitError.Sys().(syscall.WaitStatus)
+		require.Equal(t, expectedExitStatus, status.ExitStatus())
 	}
 }
 
@@ -274,7 +266,7 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
 		go func() { done <- prom.Wait() }()
 		select {
 		case err := <-done:
-			t.Errorf("prometheus should be still running: %v", err)
+			require.Fail(t, "prometheus should be still running: %v", err)
 		case <-time.After(startupTime):
 			prom.Process.Kill()
 			<-done
@@ -285,12 +277,9 @@ func TestMaxBlockChunkSegmentSizeBounds(t *testing.T) {
 		err = prom.Wait()
 		require.Error(t, err)
 		var exitError *exec.ExitError
-		if errors.As(err, &exitError) {
-			status := exitError.Sys().(syscall.WaitStatus)
-			require.Equal(t, expectedExitStatus, status.ExitStatus())
-		} else {
-			t.Errorf("unable to retrieve the exit status for prometheus: %v", err)
-		}
+		require.ErrorAs(t, err, &exitError)
+		status := exitError.Sys().(syscall.WaitStatus)
+		require.Equal(t, expectedExitStatus, status.ExitStatus())
 	}
 }
 
@@ -347,10 +336,8 @@ func getCurrentGaugeValuesFor(t *testing.T, reg prometheus.Gatherer, metricNames
 		}
 
 		require.Len(t, g.GetMetric(), 1)
-		if _, ok := res[m]; ok {
-			t.Error("expected only one metric family for", m)
-			t.FailNow()
-		}
+		_, ok := res[m]
+		require.False(t, ok, "expected only one metric family for", m)
 		res[m] = *g.GetMetric()[0].GetGauge().Value
 	}
 }
diff --git a/cmd/prometheus/main_unix_test.go b/cmd/prometheus/main_unix_test.go
index 417d062d66..2011fb123f 100644
--- a/cmd/prometheus/main_unix_test.go
+++ b/cmd/prometheus/main_unix_test.go
@@ -23,6 +23,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/prometheus/prometheus/util/testutil"
 )
 
@@ -37,9 +39,7 @@ func TestStartupInterrupt(t *testing.T) {
 	prom := exec.Command(promPath, "-test.main", "--config.file="+promConfig, "--storage.tsdb.path="+t.TempDir(), "--web.listen-address=0.0.0.0"+port)
 
 	err := prom.Start()
-	if err != nil {
-		t.Fatalf("execution error: %v", err)
-	}
+	require.NoError(t, err)
 
 	done := make(chan error, 1)
 	go func() {
@@ -68,14 +68,11 @@ Loop:
 		time.Sleep(500 * time.Millisecond)
 	}
 
-	if !startedOk {
-		t.Fatal("prometheus didn't start in the specified timeout")
-	}
-	switch err := prom.Process.Kill(); {
-	case err == nil:
-		t.Errorf("prometheus didn't shutdown gracefully after sending the Interrupt signal")
-	case stoppedErr != nil && stoppedErr.Error() != "signal: interrupt":
-		// TODO: find a better way to detect when the process didn't exit as expected!
-		t.Errorf("prometheus exited with an unexpected error: %v", stoppedErr)
+	require.True(t, startedOk, "prometheus didn't start in the specified timeout")
+	err = prom.Process.Kill()
+	require.Error(t, err, "prometheus didn't shutdown gracefully after sending the Interrupt signal")
+	// TODO - find a better way to detect when the process didn't exit as expected!
+	if stoppedErr != nil {
+		require.EqualError(t, stoppedErr, "signal: interrupt", "prometheus exit")
 	}
 }
diff --git a/cmd/promtool/unittest_test.go b/cmd/promtool/unittest_test.go
index b8170d784e..971ddb40c5 100644
--- a/cmd/promtool/unittest_test.go
+++ b/cmd/promtool/unittest_test.go
@@ -16,6 +16,8 @@ package main
 import (
 	"testing"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/prometheus/prometheus/promql"
 )
 
@@ -178,9 +180,8 @@ func TestRulesUnitTestRun(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := RulesUnitTest(tt.queryOpts, tt.args.run, false, tt.args.files...); got != tt.want {
-				t.Errorf("RulesUnitTest() = %v, want %v", got, tt.want)
-			}
+			got := RulesUnitTest(tt.queryOpts, tt.args.run, false, tt.args.files...)
+			require.Equal(t, tt.want, got)
 		})
 	}
 }
diff --git a/documentation/prometheus-mixin/alerts.libsonnet b/documentation/prometheus-mixin/alerts.libsonnet
index 3efb0f27d1..508d89c244 100644
--- a/documentation/prometheus-mixin/alerts.libsonnet
+++ b/documentation/prometheus-mixin/alerts.libsonnet
@@ -122,7 +122,7 @@
         alert: 'PrometheusNotIngestingSamples',
         expr: |||
           (
-            rate(prometheus_tsdb_head_samples_appended_total{%(prometheusSelector)s}[5m]) <= 0
+            sum without(type) (rate(prometheus_tsdb_head_samples_appended_total{%(prometheusSelector)s}[5m])) <= 0
           and
             (
               sum without(scrape_job) (prometheus_target_metadata_cache_entries{%(prometheusSelector)s}) > 0
diff --git a/promql/parser/lex_test.go b/promql/parser/lex_test.go
index 23c9dfbee0..4a29351b06 100644
--- a/promql/parser/lex_test.go
+++ b/promql/parser/lex_test.go
@@ -815,16 +815,10 @@ func TestLexer(t *testing.T) {
 					hasError = true
 				}
 			}
-			if !hasError {
-				t.Logf("%d: input %q", i, test.input)
-				require.Fail(t, "expected lexing error but did not fail")
-			}
+			require.True(t, hasError, "%d: input %q, expected lexing error but did not fail", i, test.input)
 			continue
 		}
-		if lastItem.Typ == ERROR {
-			t.Logf("%d: input %q", i, test.input)
-			require.Fail(t, "unexpected lexing error at position %d: %s", lastItem.Pos, lastItem)
-		}
+		require.NotEqual(t, ERROR, lastItem.Typ, "%d: input %q, unexpected lexing error at position %d: %s", i, test.input, lastItem.Pos, lastItem)
 
 		eofItem := Item{EOF, posrange.Pos(len(test.input)), ""}
 		require.Equal(t, lastItem, eofItem, "%d: input %q", i, test.input)
diff --git a/rules/alerting.go b/rules/alerting.go
index eed1cc3fd1..5c82844ea1 100644
--- a/rules/alerting.go
+++ b/rules/alerting.go
@@ -171,9 +171,8 @@ func NewAlertingRule(
 		evaluationTimestamp: atomic.NewTime(time.Time{}),
 		evaluationDuration:  atomic.NewDuration(0),
 		lastError:           atomic.NewError(nil),
-
-		noDependentRules:  atomic.NewBool(false),
-		noDependencyRules: atomic.NewBool(false),
+		noDependentRules:    atomic.NewBool(false),
+		noDependencyRules:   atomic.NewBool(false),
 	}
 }
diff --git a/rules/group.go b/rules/group.go
index b56be4c456..751c27aeea 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -599,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
 
 		// If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
 		// Try run concurrently if there are slots available.
-		if ctrl := g.concurrencyController; ctrl.RuleEligible(g, rule) && ctrl.Allow() {
+		if ctrl := g.concurrencyController; isRuleEligibleForConcurrentExecution(rule) && ctrl.Allow() {
 			wg.Add(1)
 
 			go eval(i, rule, func() {
@@ -1065,3 +1065,7 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 
 	return dependencies
 }
+
+func isRuleEligibleForConcurrentExecution(rule Rule) bool {
+	return rule.NoDependentRules() && rule.NoDependencyRules()
+}
diff --git a/rules/manager.go b/rules/manager.go
index 2989ccfdb4..791510d558 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -121,6 +121,7 @@ type ManagerOptions struct {
 	MaxConcurrentEvals        int64
 	ConcurrentEvalsEnabled    bool
 	RuleConcurrencyController RuleConcurrencyController
+	RuleDependencyController  RuleDependencyController
 
 	DefaultEvaluationDelay func() time.Duration
 
@@ -154,6 +155,10 @@ func NewManager(o *ManagerOptions) *Manager {
 		}
 	}
 
+	if o.RuleDependencyController == nil {
+		o.RuleDependencyController = ruleDependencyController{}
+	}
+
 	m := &Manager{
 		groups: map[string]*Group{},
 		opts:   o,
@@ -200,8 +205,6 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
 
-	m.opts.RuleConcurrencyController.Invalidate()
-
 	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
 
 	if errs != nil {
@@ -340,11 +343,7 @@ func (m *Manager) LoadGroups(
 		}
 
 		// Check dependencies between rules and store it on the Rule itself.
-		depMap := buildDependencyMap(rules)
-		for _, r := range rules {
-			r.SetNoDependentRules(depMap.dependents(r) == 0)
-			r.SetNoDependencyRules(depMap.dependencies(r) == 0)
-		}
+		m.opts.RuleDependencyController.AnalyseRules(rules)
 
 		groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
 			Name:     rg.Name,
@@ -445,24 +444,35 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
 	}
 }
 
-// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
-// of concurrency in rule evaluations to avoid overwhelming the Prometheus server with additional query load and ensure
-// the correctness of rules running concurrently. Concurrency is controlled globally, not on a per-group basis.
-type RuleConcurrencyController interface {
-	// RuleEligible determines if the rule can guarantee correct results while running concurrently.
-	RuleEligible(g *Group, r Rule) bool
+// RuleDependencyController controls whether a set of rules has dependencies between each other.
+type RuleDependencyController interface {
+	// AnalyseRules analyses dependencies between the input rules. For each rule that is guaranteed
+	// not to have any dependents and/or dependencies, this function should call Rule.SetNoDependentRules(true)
+	// and/or Rule.SetNoDependencyRules(true).
+	AnalyseRules(rules []Rule)
+}
+
+type ruleDependencyController struct{}
+
+// AnalyseRules implements RuleDependencyController.
+func (c ruleDependencyController) AnalyseRules(rules []Rule) {
+	depMap := buildDependencyMap(rules)
+	for _, r := range rules {
+		r.SetNoDependentRules(depMap.dependents(r) == 0)
+		r.SetNoDependencyRules(depMap.dependencies(r) == 0)
+	}
+}
+
+// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently.
+// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus
+// server with additional query load. Concurrency is controlled globally, not on a per-group basis.
+type RuleConcurrencyController interface {
 	// Allow determines whether any concurrent evaluation slots are available.
 	// If Allow() returns true, then Done() must be called to release the acquired slot.
 	Allow() bool
 
 	// Done releases a concurrent evaluation slot.
 	Done()
-
-	// Invalidate instructs the controller to invalidate its state.
-	// This should be called when groups are modified (during a reload, for instance), because the controller may
-	// store some state about each group in order to more efficiently determine rule eligibility.
-	Invalidate()
 }
 
 // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
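Reviewer note: the refactor above separates two concerns that the old `RuleConcurrencyController` mixed together — dependency analysis, now done once at rule-load time via `AnalyseRules`, and concurrency slot accounting, done at evaluation time via `Allow`/`Done`. Since the dependency flags now live on the rules themselves, there is presumably no controller state left to invalidate on reload, which is why `Invalidate` can be dropped. A minimal sketch of a custom dependency controller, using only the interfaces introduced in this diff (the type name is hypothetical, not part of the patch):

```go
// allowAllDependencyController is a hypothetical RuleDependencyController that
// unconditionally marks every rule as independent, making all rules eligible
// for concurrent evaluation. A real implementation should only do this when it
// can actually prove that no rule reads another rule's output.
type allowAllDependencyController struct{}

func (allowAllDependencyController) AnalyseRules(rules []Rule) {
	for _, r := range rules {
		r.SetNoDependentRules(true)
		r.SetNoDependencyRules(true)
	}
}
```

It would be plugged in via `ManagerOptions{RuleDependencyController: allowAllDependencyController{}}`; when the field is left nil, `NewManager` falls back to the built-in `ruleDependencyController` shown above.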
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 5a97d1515b..0cd8cc02d2 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -1780,9 +1780,8 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
 		evaluationTimestamp: atomic.NewTime(time.Time{}),
 		evaluationDuration:  atomic.NewDuration(0),
 		lastError:           atomic.NewError(nil),
-
-		noDependentRules:  atomic.NewBool(false),
-		noDependencyRules: atomic.NewBool(false),
+		noDependentRules:    atomic.NewBool(false),
+		noDependencyRules:   atomic.NewBool(false),
 	}
 
 	group := NewGroup(GroupOptions{
diff --git a/rules/origin.go b/rules/origin.go
index 53db5a27d2..695fc5f838 100644
--- a/rules/origin.go
+++ b/rules/origin.go
@@ -56,11 +56,10 @@ func NewRuleDetail(r Rule) RuleDetail {
 	}
 
 	return RuleDetail{
-		Name:   r.Name(),
-		Query:  r.Query().String(),
-		Labels: r.Labels(),
-		Kind:   kind,
-
+		Name:              r.Name(),
+		Query:             r.Query().String(),
+		Labels:            r.Labels(),
+		Kind:              kind,
 		NoDependentRules:  r.NoDependentRules(),
 		NoDependencyRules: r.NoDependencyRules(),
 	}
diff --git a/rules/origin_test.go b/rules/origin_test.go
index 53aeb16ae9..75c83f9a4e 100644
--- a/rules/origin_test.go
+++ b/rules/origin_test.go
@@ -44,11 +44,10 @@ func (u unknownRule) SetEvaluationDuration(time.Duration) {}
 func (u unknownRule) GetEvaluationDuration() time.Duration { return 0 }
 func (u unknownRule) SetEvaluationTimestamp(time.Time)     {}
 func (u unknownRule) GetEvaluationTimestamp() time.Time    { return time.Time{} }
-
-func (u unknownRule) SetNoDependentRules(bool)  {}
-func (u unknownRule) NoDependentRules() bool    { return false }
-func (u unknownRule) SetNoDependencyRules(bool) {}
-func (u unknownRule) NoDependencyRules() bool   { return false }
+func (u unknownRule) SetNoDependentRules(bool)             {}
+func (u unknownRule) NoDependentRules() bool               { return false }
+func (u unknownRule) SetNoDependencyRules(bool)            {}
+func (u unknownRule) NoDependencyRules() bool              { return false }
 
 func TestNewRuleDetailPanics(t *testing.T) {
 	require.PanicsWithValue(t, `unknown rule type "rules.unknownRule"`, func() {
diff --git a/rules/recording.go b/rules/recording.go
index 5f535c1727..243b6ebc4b 100644
--- a/rules/recording.go
+++ b/rules/recording.go
@@ -56,9 +56,8 @@ func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels) *Reco
 		evaluationTimestamp: atomic.NewTime(time.Time{}),
 		evaluationDuration:  atomic.NewDuration(0),
 		lastError:           atomic.NewError(nil),
-
-		noDependentRules:  atomic.NewBool(false),
-		noDependencyRules: atomic.NewBool(false),
+		noDependentRules:    atomic.NewBool(false),
+		noDependencyRules:   atomic.NewBool(false),
 	}
 }
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index 524424269e..a73b730786 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -458,9 +458,9 @@ func loadConfiguration(t testing.TB, c string) *config.Config {
 	t.Helper()
 
 	cfg := &config.Config{}
-	if err := yaml.UnmarshalStrict([]byte(c), cfg); err != nil {
-		t.Fatalf("Unable to load YAML config: %s", err)
-	}
+	err := yaml.UnmarshalStrict([]byte(c), cfg)
+	require.NoError(t, err, "Unable to load YAML config.")
+
 	return cfg
 }
 
@@ -533,42 +533,38 @@ scrape_configs:
 	}
 
 	// Apply the initial configuration.
-	if err := scrapeManager.ApplyConfig(cfg1); err != nil {
-		t.Fatalf("unable to apply configuration: %s", err)
-	}
+	err = scrapeManager.ApplyConfig(cfg1)
+	require.NoError(t, err, "Unable to apply configuration.")
 	select {
 	case <-ch:
-		t.Fatal("reload happened")
+		require.FailNow(t, "Reload happened.")
 	default:
 	}
 
 	// Apply a configuration for which the reload fails.
-	if err := scrapeManager.ApplyConfig(cfg2); err == nil {
-		t.Fatalf("expecting error but got none")
-	}
+	err = scrapeManager.ApplyConfig(cfg2)
+	require.Error(t, err, "Expecting error but got none.")
 	select {
 	case <-ch:
-		t.Fatal("reload happened")
+		require.FailNow(t, "Reload happened.")
 	default:
 	}
 
 	// Apply a configuration for which the reload succeeds.
-	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
-		t.Fatalf("unable to apply configuration: %s", err)
-	}
+	err = scrapeManager.ApplyConfig(cfg3)
+	require.NoError(t, err, "Unable to apply configuration.")
 	select {
 	case <-ch:
 	default:
-		t.Fatal("reload didn't happen")
+		require.FailNow(t, "Reload didn't happen.")
 	}
 
 	// Re-applying the same configuration shouldn't trigger a reload.
-	if err := scrapeManager.ApplyConfig(cfg3); err != nil {
-		t.Fatalf("unable to apply configuration: %s", err)
-	}
+	err = scrapeManager.ApplyConfig(cfg3)
+	require.NoError(t, err, "Unable to apply configuration.")
 	select {
 	case <-ch:
-		t.Fatal("reload happened")
+		require.FailNow(t, "Reload happened.")
 	default:
 	}
 }
@@ -595,7 +591,7 @@ func TestManagerTargetsUpdates(t *testing.T) {
 		select {
 		case ts <- tgSent:
 		case <-time.After(10 * time.Millisecond):
-			t.Error("Scrape manager's channel remained blocked after the set threshold.")
+			require.Fail(t, "Scrape manager's channel remained blocked after the set threshold.")
 		}
 	}
 
@@ -609,7 +605,7 @@ func TestManagerTargetsUpdates(t *testing.T) {
 	select {
 	case <-m.triggerReload:
 	default:
-		t.Error("No scrape loops reload was triggered after targets update.")
+		require.Fail(t, "No scrape loops reload was triggered after targets update.")
 	}
 }
 
@@ -622,9 +618,8 @@ global:
 `
 		cfg := &config.Config{}
-		if err := yaml.UnmarshalStrict([]byte(cfgText), cfg); err != nil {
-			t.Fatalf("Unable to load YAML config cfgYaml: %s", err)
-		}
+		err := yaml.UnmarshalStrict([]byte(cfgText), cfg)
+		require.NoError(t, err, "Unable to load YAML config cfgYaml.")
 
 		return cfg
 	}
@@ -636,25 +631,18 @@ global:
 
 	// Load the first config.
 	cfg1 := getConfig("ha1")
-	if err := scrapeManager.setOffsetSeed(cfg1.GlobalConfig.ExternalLabels); err != nil {
-		t.Error(err)
-	}
+	err = scrapeManager.setOffsetSeed(cfg1.GlobalConfig.ExternalLabels)
+	require.NoError(t, err)
 	offsetSeed1 := scrapeManager.offsetSeed
 
-	if offsetSeed1 == 0 {
-		t.Error("Offset seed has to be a hash of uint64")
-	}
+	require.NotZero(t, offsetSeed1, "Offset seed has to be a hash of uint64.")
 
 	// Load the first config.
cfg2 := getConfig("ha2") - if err := scrapeManager.setOffsetSeed(cfg2.GlobalConfig.ExternalLabels); err != nil { - t.Error(err) - } + require.NoError(t, scrapeManager.setOffsetSeed(cfg2.GlobalConfig.ExternalLabels)) offsetSeed2 := scrapeManager.offsetSeed - if offsetSeed1 == offsetSeed2 { - t.Error("Offset seed should not be the same on different set of external labels") - } + require.NotEqual(t, offsetSeed1, offsetSeed2, "Offset seed should not be the same on different set of external labels.") } func TestManagerScrapePools(t *testing.T) { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 95e4e182a0..f827ffc8da 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -72,15 +72,11 @@ func TestNewScrapePool(t *testing.T) { sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t)) ) - if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { - t.Fatalf("Wrong sample appender") - } - if sp.config != cfg { - t.Fatalf("Wrong scrape config") - } - if sp.newLoop == nil { - t.Fatalf("newLoop function not initialized") - } + a, ok := sp.appendable.(*nopAppendable) + require.True(t, ok, "Failure to append.") + require.Equal(t, app, a, "Wrong sample appender.") + require.Equal(t, cfg, sp.config, "Wrong scrape config.") + require.NotNil(t, sp.newLoop, "newLoop function not initialized.") } func TestDroppedTargetsList(t *testing.T) { @@ -233,12 +229,10 @@ func TestScrapePoolStop(t *testing.T) { select { case <-time.After(5 * time.Second): - t.Fatalf("scrapeLoop.stop() did not return as expected") + require.Fail(t, "scrapeLoop.stop() did not return as expected") case <-done: // This should have taken at least as long as the last target slept. - if time.Since(stopTime) < time.Duration(numTargets*20)*time.Millisecond { - t.Fatalf("scrapeLoop.stop() exited before all targets stopped") - } + require.GreaterOrEqual(t, time.Since(stopTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped") } mtx.Lock() @@ -324,12 +318,10 @@ func TestScrapePoolReload(t *testing.T) { select { case <-time.After(5 * time.Second): - t.Fatalf("scrapeLoop.reload() did not return as expected") + require.FailNow(t, "scrapeLoop.reload() did not return as expected") case <-done: // This should have taken at least as long as the last target slept. - if time.Since(reloadTime) < time.Duration(numTargets*20)*time.Millisecond { - t.Fatalf("scrapeLoop.stop() exited before all targets stopped") - } + require.GreaterOrEqual(t, time.Since(reloadTime), time.Duration(numTargets*20)*time.Millisecond, "scrapeLoop.stop() exited before all targets stopped") } mtx.Lock() @@ -703,13 +695,13 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { select { case <-stopDone: - t.Fatalf("Stopping terminated before run exited successfully") + require.FailNow(t, "Stopping terminated before run exited successfully.") case <-time.After(500 * time.Millisecond): } // Running the scrape loop must exit before calling the scraper even once. 
 	scraper.scrapeFunc = func(context.Context, io.Writer) error {
-		t.Fatalf("scraper was called for terminated scrape loop")
+		require.FailNow(t, "Scraper was called for terminated scrape loop.")
 		return nil
 	}
 
@@ -722,13 +714,13 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
 	select {
 	case <-runDone:
 	case <-time.After(1 * time.Second):
-		t.Fatalf("Running terminated scrape loop did not exit")
+		require.FailNow(t, "Running terminated scrape loop did not exit.")
 	}
 
 	select {
 	case <-stopDone:
 	case <-time.After(1 * time.Second):
-		t.Fatalf("Stopping did not terminate after running exited")
+		require.FailNow(t, "Stopping did not terminate after running exited.")
 	}
 }
 
@@ -765,14 +757,13 @@ func TestScrapeLoopStop(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape wasn't stopped.")
+		require.FailNow(t, "Scrape wasn't stopped.")
 	}
 
 	// We expected 1 actual sample for each scrape plus 5 for report samples.
 	// At least 2 scrapes were made, plus the final stale markers.
-	if len(appender.resultFloats) < 6*3 || len(appender.resultFloats)%6 != 0 {
-		t.Fatalf("Expected at least 3 scrapes with 6 samples each, got %d samples", len(appender.resultFloats))
-	}
+	require.GreaterOrEqual(t, len(appender.resultFloats), 6*3, "Expected at least 3 scrapes with 6 samples each.")
+	require.Zero(t, len(appender.resultFloats)%6, "There is a scrape with missing samples.")
 	// All samples in a scrape must have the same timestamp.
 	var ts int64
 	for i, s := range appender.resultFloats {
@@ -785,9 +776,7 @@ func TestScrapeLoopStop(t *testing.T) {
 	}
 	// All samples from the last scrape must be stale markers.
 	for _, s := range appender.resultFloats[len(appender.resultFloats)-5:] {
-		if !value.IsStaleNaN(s.f) {
-			t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.f))
-		}
+		require.True(t, value.IsStaleNaN(s.f), "Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(s.f))
 	}
 }
 
@@ -843,9 +832,9 @@ func TestScrapeLoopRun(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Cancellation during initial offset failed")
+		require.FailNow(t, "Cancellation during initial offset failed.")
 	case err := <-errc:
-		t.Fatalf("Unexpected error: %s", err)
+		require.FailNow(t, "Unexpected error: %s", err)
 	}
 
 	// The provided timeout must cause cancellation of the context passed down to the
@@ -873,11 +862,9 @@ func TestScrapeLoopRun(t *testing.T) {
 
 	select {
 	case err := <-errc:
-		if !errors.Is(err, context.DeadlineExceeded) {
-			t.Fatalf("Expected timeout error but got: %s", err)
-		}
+		require.ErrorIs(t, err, context.DeadlineExceeded)
 	case <-time.After(3 * time.Second):
-		t.Fatalf("Expected timeout error but got none")
+		require.FailNow(t, "Expected timeout error but got none.")
 	}
 
 	// We already caught the timeout error and are certainly in the loop.
@@ -890,9 +877,9 @@ func TestScrapeLoopRun(t *testing.T) {
 	case <-signal:
 		// Loop terminated as expected.
 	case err := <-errc:
-		t.Fatalf("Unexpected error: %s", err)
+		require.FailNow(t, "Unexpected error: %s", err)
 	case <-time.After(3 * time.Second):
-		t.Fatalf("Loop did not terminate on context cancellation")
+		require.FailNow(t, "Loop did not terminate on context cancellation")
 	}
 }
 
@@ -912,7 +899,7 @@ func TestScrapeLoopForcedErr(t *testing.T) {
 	sl.setForcedError(forcedErr)
 
 	scraper.scrapeFunc = func(context.Context, io.Writer) error {
-		t.Fatalf("should not be scraped")
+		require.FailNow(t, "Should not be scraped.")
 		return nil
 	}
 
@@ -923,18 +910,16 @@ func TestScrapeLoopForcedErr(t *testing.T) {
 
 	select {
 	case err := <-errc:
-		if !errors.Is(err, forcedErr) {
-			t.Fatalf("Expected forced error but got: %s", err)
-		}
+		require.ErrorIs(t, err, forcedErr)
 	case <-time.After(3 * time.Second):
-		t.Fatalf("Expected forced error but got none")
+		require.FailNow(t, "Expected forced error but got none.")
 	}
 
 	cancel()
 
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape not stopped")
+		require.FailNow(t, "Scrape not stopped.")
 	}
 }
 
@@ -1141,7 +1126,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape wasn't stopped.")
+		require.FailNow(t, "Scrape wasn't stopped.")
 	}
 
 	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
@@ -1188,7 +1173,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape wasn't stopped.")
+		require.FailNow(t, "Scrape wasn't stopped.")
 	}
 
 	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
@@ -1220,19 +1205,15 @@ func TestScrapeLoopCache(t *testing.T) {
 	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
 		switch numScrapes {
 		case 1, 2:
-			if _, ok := sl.cache.series["metric_a"]; !ok {
-				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
-			}
-			if _, ok := sl.cache.series["metric_b"]; !ok {
-				t.Errorf("metric_b missing from cache after scrape %d", numScrapes)
-			}
+			_, ok := sl.cache.series["metric_a"]
+			require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
+			_, ok = sl.cache.series["metric_b"]
+			require.True(t, ok, "metric_b missing from cache after scrape %d", numScrapes)
 		case 3:
-			if _, ok := sl.cache.series["metric_a"]; !ok {
-				t.Errorf("metric_a missing from cache after scrape %d", numScrapes)
-			}
-			if _, ok := sl.cache.series["metric_b"]; ok {
-				t.Errorf("metric_b present in cache after scrape %d", numScrapes)
-			}
+			_, ok := sl.cache.series["metric_a"]
+			require.True(t, ok, "metric_a missing from cache after scrape %d", numScrapes)
+			_, ok = sl.cache.series["metric_b"]
+			require.False(t, ok, "metric_b present in cache after scrape %d", numScrapes)
 		}
 
 		numScrapes++
@@ -1257,7 +1238,7 @@ func TestScrapeLoopCache(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape wasn't stopped.")
+		require.FailNow(t, "Scrape wasn't stopped.")
 	}
 
 	// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
@@ -1305,12 +1286,10 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
 	select {
 	case <-signal:
 	case <-time.After(5 * time.Second):
-		t.Fatalf("Scrape wasn't stopped.")
+		require.FailNow(t, "Scrape wasn't stopped.")
 	}
 
-	if len(sl.cache.series) > 2000 {
Got: %d", len(sl.cache.series)) - } + require.LessOrEqual(t, len(sl.cache.series), 2000, "More than 2000 series cached.") } func TestScrapeLoopAppend(t *testing.T) { @@ -1541,9 +1520,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { now := time.Now() slApp := sl.appender(context.Background()) total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now) - if !errors.Is(err, errSampleLimit) { - t.Fatalf("Did not see expected sample limit error: %s", err) - } + require.ErrorIs(t, err, errSampleLimit) require.NoError(t, slApp.Rollback()) require.Equal(t, 3, total) require.Equal(t, 3, added) @@ -1572,9 +1549,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { now = time.Now() slApp = sl.appender(context.Background()) total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now) - if !errors.Is(err, errSampleLimit) { - t.Fatalf("Did not see expected sample limit error: %s", err) - } + require.ErrorIs(t, err, errSampleLimit) require.NoError(t, slApp.Rollback()) require.Equal(t, 9, total) require.Equal(t, 6, added) @@ -2357,15 +2332,12 @@ func TestTargetScraperScrapeOK(t *testing.T) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if protobufParsing { accept := r.Header.Get("Accept") - if !strings.HasPrefix(accept, "application/vnd.google.protobuf;") { - t.Errorf("Expected Accept header to prefer application/vnd.google.protobuf, got %q", accept) - } + require.True(t, strings.HasPrefix(accept, "application/vnd.google.protobuf;"), + "Expected Accept header to prefer application/vnd.google.protobuf.") } timeout := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds") - if timeout != expectedTimeout { - t.Errorf("Expected scrape timeout header %q, got %q", expectedTimeout, timeout) - } + require.Equal(t, expectedTimeout, timeout, "Expected scrape timeout header.") w.Header().Set("Content-Type", `text/plain; version=0.0.4`) w.Write([]byte("metric_a 1\nmetric_b 2\n")) @@ -2453,7 +2425,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) { select { case <-time.After(5 * time.Second): - t.Fatalf("Scrape function did not return unexpectedly") + require.FailNow(t, "Scrape function did not return unexpectedly.") case err := <-errc: require.NoError(t, err) } @@ -3053,7 +3025,7 @@ func TestScrapeReportSingleAppender(t *testing.T) { select { case <-signal: case <-time.After(5 * time.Second): - t.Fatalf("Scrape wasn't stopped.") + require.FailNow(t, "Scrape wasn't stopped.") } } diff --git a/scrape/target_test.go b/scrape/target_test.go index dac502a80e..6e87ce71d9 100644 --- a/scrape/target_test.go +++ b/scrape/target_test.go @@ -77,9 +77,7 @@ func TestTargetOffset(t *testing.T) { buckets := make([]int, interval/bucketSize) for _, offset := range offsets { - if offset < 0 || offset >= interval { - t.Fatalf("Offset %v out of bounds", offset) - } + require.InDelta(t, time.Duration(0), offset, float64(interval), "Offset %v out of bounds.", offset) bucket := offset / bucketSize buckets[bucket]++ @@ -98,9 +96,7 @@ func TestTargetOffset(t *testing.T) { diff = -diff } - if float64(diff)/float64(avg) > tolerance { - t.Fatalf("Bucket out of tolerance bounds") - } + require.LessOrEqual(t, float64(diff)/float64(avg), tolerance, "Bucket out of tolerance bounds.") } } @@ -150,9 +146,7 @@ func TestNewHTTPBearerToken(t *testing.T) { func(w http.ResponseWriter, r *http.Request) { 
expected := "Bearer 1234" received := r.Header.Get("Authorization") - if expected != received { - t.Fatalf("Authorization header was not set correctly: expected '%v', got '%v'", expected, received) - } + require.Equal(t, expected, received, "Authorization header was not set correctly.") }, ), ) @@ -162,13 +156,9 @@ func TestNewHTTPBearerToken(t *testing.T) { BearerToken: "1234", } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPBearerTokenFile(t *testing.T) { @@ -177,9 +167,7 @@ func TestNewHTTPBearerTokenFile(t *testing.T) { func(w http.ResponseWriter, r *http.Request) { expected := "Bearer 12345" received := r.Header.Get("Authorization") - if expected != received { - t.Fatalf("Authorization header was not set correctly: expected '%v', got '%v'", expected, received) - } + require.Equal(t, expected, received, "Authorization header was not set correctly.") }, ), ) @@ -189,13 +177,9 @@ func TestNewHTTPBearerTokenFile(t *testing.T) { BearerTokenFile: "testdata/bearertoken.txt", } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPBasicAuth(t *testing.T) { @@ -203,9 +187,9 @@ func TestNewHTTPBasicAuth(t *testing.T) { http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { username, password, ok := r.BasicAuth() - if !(ok && username == "user" && password == "password123") { - t.Fatalf("Basic authorization header was not set correctly: expected '%v:%v', got '%v:%v'", "user", "password123", username, password) - } + require.True(t, ok, "Basic authorization header was not set correctly.") + require.Equal(t, "user", username) + require.Equal(t, "password123", password) }, ), ) @@ -218,13 +202,9 @@ func TestNewHTTPBasicAuth(t *testing.T) { }, } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPCACert(t *testing.T) { @@ -246,13 +226,9 @@ func TestNewHTTPCACert(t *testing.T) { }, } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPClientCert(t *testing.T) { @@ -279,13 +255,9 @@ func TestNewHTTPClientCert(t *testing.T) { }, } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPWithServerName(t *testing.T) { @@ -308,13 +280,9 @@ func TestNewHTTPWithServerName(t *testing.T) { }, } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestNewHTTPWithBadServerName(t *testing.T) { @@ -337,31 +305,23 @@ func TestNewHTTPWithBadServerName(t *testing.T) { }, } c, err := config_util.NewClientFromConfig(cfg, "test") - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) _, err = c.Get(server.URL) - if err == nil { - t.Fatal("Expected error, got nil.") - } + require.Error(t, 
+	require.Error(t, err)
 }
 
 func newTLSConfig(certName string, t *testing.T) *tls.Config {
 	tlsConfig := &tls.Config{}
 	caCertPool := x509.NewCertPool()
 	caCert, err := os.ReadFile(caCertPath)
-	if err != nil {
-		t.Fatalf("Couldn't set up TLS server: %v", err)
-	}
+	require.NoError(t, err, "Couldn't read CA cert.")
 	caCertPool.AppendCertsFromPEM(caCert)
 	tlsConfig.RootCAs = caCertPool
 	tlsConfig.ServerName = "127.0.0.1"
 	certPath := fmt.Sprintf("testdata/%s.cer", certName)
 	keyPath := fmt.Sprintf("testdata/%s.key", certName)
 	cert, err := tls.LoadX509KeyPair(certPath, keyPath)
-	if err != nil {
-		t.Errorf("Unable to use specified server cert (%s) & key (%v): %s", certPath, keyPath, err)
-	}
+	require.NoError(t, err, "Unable to use specified server cert (%s) & key (%v).", certPath, keyPath)
 	tlsConfig.Certificates = []tls.Certificate{cert}
 	return tlsConfig
 }
@@ -375,9 +335,7 @@ func TestNewClientWithBadTLSConfig(t *testing.T) {
 		},
 	}
 	_, err := config_util.NewClientFromConfig(cfg, "test")
-	if err == nil {
-		t.Fatalf("Expected error, got nil.")
-	}
+	require.Error(t, err)
 }
 
 func TestTargetsFromGroup(t *testing.T) {
@@ -389,15 +347,9 @@ func TestTargetsFromGroup(t *testing.T) {
 	}
 	lb := labels.NewBuilder(labels.EmptyLabels())
 	targets, failures := TargetsFromGroup(&targetgroup.Group{Targets: []model.LabelSet{{}, {model.AddressLabel: "localhost:9090"}}}, &cfg, false, nil, lb)
-	if len(targets) != 1 {
-		t.Fatalf("Expected 1 target, got %v", len(targets))
-	}
-	if len(failures) != 1 {
-		t.Fatalf("Expected 1 failure, got %v", len(failures))
-	}
-	if failures[0].Error() != expectedError {
-		t.Fatalf("Expected error %s, got %s", expectedError, failures[0])
-	}
+	require.Len(t, targets, 1)
+	require.Len(t, failures, 1)
+	require.EqualError(t, failures[0], expectedError)
 }
 
 func BenchmarkTargetsFromGroup(b *testing.B) {
diff --git a/storage/remote/client.go b/storage/remote/client.go
index fbb6804983..e765b47c3e 100644
--- a/storage/remote/client.go
+++ b/storage/remote/client.go
@@ -141,24 +141,24 @@ func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) {
 	}
 
 	t := httpClient.Transport
 
+	if len(conf.Headers) > 0 {
+		t = newInjectHeadersRoundTripper(conf.Headers, t)
+	}
+
 	if conf.SigV4Config != nil {
-		t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, httpClient.Transport)
+		t, err = sigv4.NewSigV4RoundTripper(conf.SigV4Config, t)
 		if err != nil {
 			return nil, err
 		}
 	}
 
 	if conf.AzureADConfig != nil {
-		t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, httpClient.Transport)
+		t, err = azuread.NewAzureADRoundTripper(conf.AzureADConfig, t)
 		if err != nil {
 			return nil, err
 		}
 	}
 
-	if len(conf.Headers) > 0 {
-		t = newInjectHeadersRoundTripper(conf.Headers, t)
-	}
-
 	httpClient.Transport = otelhttp.NewTransport(t)
 
 	return &Client{
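Reviewer note: besides making the SigV4/AzureAD round trippers wrap `t` (the already-built chain) instead of the bare `httpClient.Transport`, this change moves header injection from the outside of the chain to the inside. With `http.RoundTripper` wrapping, the transport wrapped last is invoked first, so the auth round trippers now run before header injection; for SigV4 that presumably keeps the injected custom headers out of the signed request. A self-contained sketch of just the wrapping mechanics (all names here are made up for illustration):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// taggedRT is a hypothetical round tripper that records when it runs.
type taggedRT struct {
	name string
	next http.RoundTripper
}

func (rt taggedRT) RoundTrip(req *http.Request) (*http.Response, error) {
	fmt.Println("entering:", rt.name)
	return rt.next.RoundTrip(req)
}

func main() {
	srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
	defer srv.Close()

	var t http.RoundTripper = http.DefaultTransport
	t = taggedRT{name: "inject headers", next: t} // wrapped first -> innermost -> runs last
	t = taggedRT{name: "sign request", next: t}   // wrapped last  -> outermost -> runs first

	resp, err := (&http.Client{Transport: t}).Get(srv.URL)
	if err == nil {
		resp.Body.Close()
	}
	// Prints "entering: sign request", then "entering: inject headers".
}
```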
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index f2293e8277..5924b8f919 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -27,7 +27,6 @@ import (
 	"path/filepath"
 	"sort"
 	"strconv"
-	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -2334,9 +2333,7 @@ func TestBlockRanges(t *testing.T) {
 	app := db.Appender(ctx)
 	lbl := labels.FromStrings("a", "b")
 	_, err = app.Append(0, lbl, firstBlockMaxT-1, rand.Float64())
-	if err == nil {
-		t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
-	}
+	require.Error(t, err, "appending a sample with a timestamp covered by a previous block shouldn't be possible")
 	_, err = app.Append(0, lbl, firstBlockMaxT+1, rand.Float64())
 	require.NoError(t, err)
 	_, err = app.Append(0, lbl, firstBlockMaxT+2, rand.Float64())
@@ -2354,9 +2351,8 @@ func TestBlockRanges(t *testing.T) {
 	}
 
 	require.Len(t, db.Blocks(), 2, "no new block created after the set timeout")
 
-	if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
-		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
-	}
+	require.GreaterOrEqual(t, db.Blocks()[1].Meta().MinTime, db.Blocks()[0].Meta().MaxTime,
+		"new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
 
 	// Test that wal records are skipped when an existing block covers the same time ranges
 	// and compaction doesn't create an overlapping block.
@@ -2396,9 +2392,8 @@ func TestBlockRanges(t *testing.T) {
 
 	require.Len(t, db.Blocks(), 4, "no new block created after the set timeout")
 
-	if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
-		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
-	}
+	require.GreaterOrEqual(t, db.Blocks()[3].Meta().MinTime, db.Blocks()[2].Meta().MaxTime,
+		"new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
 }
 
 // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.
@@ -3187,9 +3182,8 @@ func TestOpen_VariousBlockStates(t *testing.T) {
 
 	var loaded int
 	for _, l := range loadedBlocks {
-		if _, ok := expectedLoadedDirs[filepath.Join(tmpDir, l.meta.ULID.String())]; !ok {
-			t.Fatal("unexpected block", l.meta.ULID, "was loaded")
-		}
+		_, ok := expectedLoadedDirs[filepath.Join(tmpDir, l.meta.ULID.String())]
+		require.True(t, ok, "unexpected block", l.meta.ULID, "was loaded")
 		loaded++
 	}
 	require.Len(t, expectedLoadedDirs, loaded)
@@ -3200,9 +3194,8 @@ func TestOpen_VariousBlockStates(t *testing.T) {
 
 	var ignored int
 	for _, f := range files {
-		if _, ok := expectedRemovedDirs[filepath.Join(tmpDir, f.Name())]; ok {
-			t.Fatal("expected", filepath.Join(tmpDir, f.Name()), "to be removed, but still exists")
-		}
+		_, ok := expectedRemovedDirs[filepath.Join(tmpDir, f.Name())]
+		require.False(t, ok, "expected", filepath.Join(tmpDir, f.Name()), "to be removed, but still exists")
 		if _, ok := expectedIgnoredDirs[filepath.Join(tmpDir, f.Name())]; ok {
 			ignored++
 		}
@@ -3493,8 +3486,8 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
 	// the "cannot populate chunk XXX: not found" error occurred. This error can occur
 	// when the iterator tries to fetch an head chunk which has been offloaded because
 	// of the head compaction in the meanwhile.
-	if firstErr != nil && !strings.Contains(firstErr.Error(), "cannot populate chunk") {
-		t.Fatalf("unexpected error: %s", firstErr.Error())
+	if firstErr != nil {
+		require.ErrorContains(t, firstErr, "cannot populate chunk")
 	}
 }
diff --git a/tsdb/docs/format/index.md b/tsdb/docs/format/index.md
index 6279d34f68..53b77d9abe 100644
--- a/tsdb/docs/format/index.md
+++ b/tsdb/docs/format/index.md
@@ -82,6 +82,10 @@ Each series section is aligned to 16 bytes. The ID for a series is the `offset/1
 
 Every series entry first holds its number of labels, followed by tuples of symbol table references that contain the label name and value. The label pairs are lexicographically sorted.
 After the labels, the number of indexed chunks is encoded, followed by a sequence of metadata entries containing the chunks minimum (`mint`) and maximum (`maxt`) timestamp and a reference to its position in the chunk file. The `mint` is the time of the first sample and `maxt` is the time of the last sample in the chunk.
 Holding the time range data in the index allows dropping chunks irrelevant to queried time ranges without accessing them directly.
 
+Chunk references within a single series must be increasing, and chunk references for `series_(N+1)` must be higher than chunk references for `series_N`.
+This property guarantees that chunks that belong to the same series are grouped together in the segment files.
+Furthermore, a chunk's `mint` must be less than or equal to its `maxt`, and subsequent chunks within a single series must have increasing `mint` and `maxt` and must not overlap.
+
 `mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one.
 
 ```
diff --git a/tsdb/fileutil/flock_test.go b/tsdb/fileutil/flock_test.go
index f9cbbcf2b3..7aff789a26 100644
--- a/tsdb/fileutil/flock_test.go
+++ b/tsdb/fileutil/flock_test.go
@@ -18,6 +18,8 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/prometheus/prometheus/util/testutil"
 )
 
@@ -27,54 +29,35 @@ func TestLocking(t *testing.T) {
 
 	fileName := filepath.Join(dir.Path(), "LOCK")
 
-	if _, err := os.Stat(fileName); err == nil {
-		t.Fatalf("File %q unexpectedly exists.", fileName)
-	}
+	_, err := os.Stat(fileName)
+	require.Error(t, err, "File %q unexpectedly exists.", fileName)
 
 	lock, existed, err := Flock(fileName)
-	if err != nil {
-		t.Fatalf("Error locking file %q: %s", fileName, err)
-	}
-	if existed {
-		t.Errorf("File %q reported as existing during locking.", fileName)
-	}
+	require.NoError(t, err, "Error locking file %q", fileName)
+	require.False(t, existed, "File %q reported as existing during locking.", fileName)
 
 	// File must now exist.
-	if _, err = os.Stat(fileName); err != nil {
-		t.Errorf("Could not stat file %q expected to exist: %s", fileName, err)
-	}
+	_, err = os.Stat(fileName)
+	require.NoError(t, err, "Could not stat file %q expected to exist", fileName)
 
 	// Try to lock again.
 	lockedAgain, existed, err := Flock(fileName)
-	if err == nil {
-		t.Fatalf("File %q locked twice.", fileName)
-	}
-	if lockedAgain != nil {
-		t.Error("Unsuccessful locking did not return nil.")
-	}
-	if !existed {
-		t.Errorf("Existing file %q not recognized.", fileName)
-	}
+	require.Error(t, err, "File %q locked twice.", fileName)
+	require.Nil(t, lockedAgain, "Unsuccessful locking did not return nil.")
+	require.True(t, existed, "Existing file %q not recognized.", fileName)
 
-	if err := lock.Release(); err != nil {
-		t.Errorf("Error releasing lock for file %q: %s", fileName, err)
-	}
+	err = lock.Release()
+	require.NoError(t, err, "Error releasing lock for file %q", fileName)
 
 	// File must still exist.
-	if _, err = os.Stat(fileName); err != nil {
-		t.Errorf("Could not stat file %q expected to exist: %s", fileName, err)
-	}
+	_, err = os.Stat(fileName)
+	require.NoError(t, err, "Could not stat file %q expected to exist", fileName)
 
 	// Lock existing file.
 	lock, existed, err = Flock(fileName)
-	if err != nil {
-		t.Fatalf("Error locking file %q: %s", fileName, err)
-	}
-	if !existed {
-		t.Errorf("Existing file %q not recognized.", fileName)
-	}
+	require.NoError(t, err, "Error locking file %q", fileName)
+	require.True(t, existed, "Existing file %q not recognized.", fileName)
 
-	if err := lock.Release(); err != nil {
-		t.Errorf("Error releasing lock for file %q: %s", fileName, err)
-	}
+	err = lock.Release()
+	require.NoError(t, err, "Error releasing lock for file %q", fileName)
 }
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 0b71718525..9e0ead4ad4 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -206,7 +206,7 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
 			require.NoError(t, err)
 			recs = append(recs, exemplars)
 		default:
-			t.Fatalf("unknown record type")
+			require.Fail(t, "unknown record type")
 		}
 	}
 	require.NoError(t, r.Err())
@@ -1371,7 +1371,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
 		case []record.RefMetadata:
 			metadata++
 		default:
-			t.Fatalf("unknown record type")
+			require.Fail(t, "unknown record type")
 		}
 	}
 	require.Equal(t, 1, series)
@@ -1620,9 +1620,7 @@ func TestComputeChunkEndTime(t *testing.T) {
 	for testName, tc := range cases {
 		t.Run(testName, func(t *testing.T) {
 			got := computeChunkEndTime(tc.start, tc.cur, tc.max, tc.ratioToFull)
-			if got != tc.res {
-				t.Errorf("expected %d for (start: %d, cur: %d, max: %d, ratioToFull: %f), got %d", tc.res, tc.start, tc.cur, tc.max, tc.ratioToFull, got)
-			}
+			require.Equal(t, tc.res, got, "(start: %d, cur: %d, max: %d)", tc.start, tc.cur, tc.max)
 		})
 	}
 }
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 471c0f62b2..750836a850 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -147,8 +147,11 @@ type Writer struct {
 	labelNames map[string]uint64 // Label names, and their usage.
 
 	// Hold last series to validate that clients insert new series in order.
-	lastSeries labels.Labels
-	lastRef    storage.SeriesRef
+	lastSeries    labels.Labels
+	lastSeriesRef storage.SeriesRef
+
+	// Hold last added chunk reference to make sure that chunks are ordered properly.
+	lastChunkRef chunks.ChunkRef
 
 	crc32 hash.Hash
 
@@ -434,9 +437,27 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
 		return fmt.Errorf("out-of-order series added with label set %q", lset)
 	}
 
-	if ref < w.lastRef && !w.lastSeries.IsEmpty() {
+	if ref < w.lastSeriesRef && !w.lastSeries.IsEmpty() {
 		return fmt.Errorf("series with reference greater than %d already added", ref)
 	}
+
+	lastChunkRef := w.lastChunkRef
+	lastMaxT := int64(0)
+	for ix, c := range chunks {
+		if c.Ref < lastChunkRef {
+			return fmt.Errorf("unsorted chunk reference: %d, previous: %d", c.Ref, lastChunkRef)
+		}
+		lastChunkRef = c.Ref
+
+		if ix > 0 && c.MinTime <= lastMaxT {
+			return fmt.Errorf("chunk minT %d is not higher than previous chunk maxT %d", c.MinTime, lastMaxT)
+		}
+		if c.MaxTime < c.MinTime {
+			return fmt.Errorf("chunk maxT %d is less than minT %d", c.MaxTime, c.MinTime)
+		}
+		lastMaxT = c.MaxTime
+	}
+
 	// We add padding to 16 bytes to increase the addressable space we get through 4 byte
 	// series references.
 	if err := w.addPadding(seriesByteAlign); err != nil {
@@ -511,7 +532,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
 	}
 
 	w.lastSeries.CopyFrom(lset)
-	w.lastRef = ref
+	w.lastSeriesRef = ref
+	w.lastChunkRef = lastChunkRef
 
 	return nil
 }
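Reviewer note: the loop added to `AddSeries` enforces exactly the invariants now documented in `tsdb/docs/format/index.md` above. A distilled, self-contained sketch of the same checks over plain values (names here are illustrative; the real code operates on `chunks.Meta` and `chunks.ChunkRef`). Note how `lastChunkRef` carries over across calls via the writer, enforcing ordering across series, while `lastMaxT` restarts at zero for each series, and how the `ix > 0` guard lets the first chunk of a series start at `MinTime: 0`:

```go
package main

import "fmt"

type metaLite struct {
	ref        uint64 // position of the chunk in the segment files
	mint, maxt int64  // inclusive time range covered by the chunk
}

// validateChunks mirrors the validation added to Writer.AddSeries: refs must
// be globally non-decreasing across series, and within one series the time
// ranges must be well-formed, increasing, and non-overlapping.
func validateChunks(lastChunkRef uint64, metas []metaLite) (uint64, error) {
	lastMaxT := int64(0)
	for ix, c := range metas {
		if c.ref < lastChunkRef {
			return 0, fmt.Errorf("unsorted chunk reference: %d, previous: %d", c.ref, lastChunkRef)
		}
		lastChunkRef = c.ref

		// ix > 0 lets the first chunk start at mint == 0 even though
		// lastMaxT is initialized to 0.
		if ix > 0 && c.mint <= lastMaxT {
			return 0, fmt.Errorf("chunk minT %d is not higher than previous chunk maxT %d", c.mint, lastMaxT)
		}
		if c.maxt < c.mint {
			return 0, fmt.Errorf("chunk maxT %d is less than minT %d", c.maxt, c.mint)
		}
		lastMaxT = c.maxt
	}
	return lastChunkRef, nil
}

func main() {
	ref, err := validateChunks(0, []metaLite{{ref: 1, mint: 0, maxt: 10}, {ref: 2, mint: 11, maxt: 20}})
	fmt.Println(ref, err) // 2 <nil>

	_, err = validateChunks(ref, []metaLite{{ref: 10, mint: 0, maxt: 10}, {ref: 20, mint: 10, maxt: 20}})
	fmt.Println(err) // chunk minT 10 is not higher than previous chunk maxT 10
}
```

The new `TestChunksRefOrdering` and `TestChunksTimeOrdering` tests below exercise the same paths through the real `Writer`.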
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index f68657637e..4a9df7811d 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -18,7 +18,6 @@ import (
 	"errors"
 	"fmt"
 	"hash/crc32"
-	"math/rand"
 	"os"
 	"path/filepath"
 	"sort"
@@ -412,15 +411,17 @@ func TestPersistence_index_e2e(t *testing.T) {
 
 	var input indexWriterSeriesSlice
 
+	ref := uint64(0)
+
 	// Generate ChunkMetas for every label set.
 	for i, lset := range lbls {
 		var metas []chunks.Meta
 
 		for j := 0; j <= (i % 20); j++ {
+			ref++
 			metas = append(metas, chunks.Meta{
 				MinTime: int64(j * 10000),
-				MaxTime: int64((j + 1) * 10000),
-				Ref:     chunks.ChunkRef(rand.Uint64()),
+				MaxTime: int64((j+1)*10000) - 1,
+				Ref:     chunks.ChunkRef(ref),
 				Chunk:   chunkenc.NewXORChunk(),
 			})
 		}
@@ -682,3 +683,51 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 	_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
 	require.Error(t, err)
 }
+
+func TestChunksRefOrdering(t *testing.T) {
+	dir := t.TempDir()
+
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	require.NoError(t, err)
+
+	require.NoError(t, idx.AddSymbol("1"))
+	require.NoError(t, idx.AddSymbol("2"))
+	require.NoError(t, idx.AddSymbol("__name__"))
+
+	c50 := chunks.Meta{Ref: 50}
+	c100 := chunks.Meta{Ref: 100}
+	c200 := chunks.Meta{Ref: 200}
+
+	require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"), c100))
+	require.EqualError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c50), "unsorted chunk reference: 50, previous: 100")
+	require.NoError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c200))
+	require.NoError(t, idx.Close())
+}
+
+func TestChunksTimeOrdering(t *testing.T) {
+	dir := t.TempDir()
+
+	idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
+	require.NoError(t, err)
+
+	require.NoError(t, idx.AddSymbol("1"))
+	require.NoError(t, idx.AddSymbol("2"))
+	require.NoError(t, idx.AddSymbol("__name__"))
+
+	require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"),
+		chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10}, // Also checks that first chunk can have MinTime: 0.
+		chunks.Meta{Ref: 2, MinTime: 11, MaxTime: 20},
+		chunks.Meta{Ref: 3, MinTime: 21, MaxTime: 30},
+	))
+
+	require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
+		chunks.Meta{Ref: 10, MinTime: 0, MaxTime: 10},
+		chunks.Meta{Ref: 20, MinTime: 10, MaxTime: 20},
+	), "chunk minT 10 is not higher than previous chunk maxT 10")
+
+	require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
+		chunks.Meta{Ref: 10, MinTime: 100, MaxTime: 30},
+	), "chunk maxT 30 is less than minT 100")
+
+	require.NoError(t, idx.Close())
+}
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 95f3aa2c1e..a730c372cf 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -61,9 +61,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
 			ok := sort.SliceIsSorted(l, func(i, j int) bool { return l[i] < l[j] })
-			if !ok {
-				t.Fatalf("postings list %v is not sorted", l)
-			}
+			require.True(t, ok, "postings list %v is not sorted", l)
 		}
 	}
 }
@@ -214,9 +212,7 @@ func TestIntersect(t *testing.T) {
 
 	for _, c := range cases {
 		t.Run("", func(t *testing.T) {
-			if c.res == nil {
-				t.Fatal("intersect result expectancy cannot be nil")
-			}
+			require.NotNil(t, c.res, "intersect result expectancy cannot be nil")
 
 			expected, err := ExpandPostings(c.res)
 			require.NoError(t, err)
@@ -228,9 +224,7 @@ func TestIntersect(t *testing.T) {
 				return
 			}
 
-			if i == EmptyPostings() {
-				t.Fatal("intersect unexpected result: EmptyPostings sentinel")
-			}
+			require.NotEqual(t, EmptyPostings(), i, "intersect unexpected result: EmptyPostings sentinel")
 
 			res, err := ExpandPostings(i)
 			require.NoError(t, err)
@@ -501,9 +495,7 @@ func TestMergedPostings(t *testing.T) {
 
 	for _, c := range cases {
 		t.Run("", func(t *testing.T) {
-			if c.res == nil {
-				t.Fatal("merge result expectancy cannot be nil")
-			}
+			require.NotNil(t, c.res, "merge result expectancy cannot be nil")
 
 			ctx := context.Background()
 
@@ -517,9 +509,7 @@ func TestMergedPostings(t *testing.T) {
 				return
 			}
 
-			if m == EmptyPostings() {
-				t.Fatal("merge unexpected result: EmptyPostings sentinel")
-			}
+			require.NotEqual(t, EmptyPostings(), m, "merge unexpected result: EmptyPostings sentinel")
 
 			res, err := ExpandPostings(m)
 			require.NoError(t, err)
@@ -897,9 +887,7 @@ func TestWithoutPostings(t *testing.T) {
 
 	for _, c := range cases {
 		t.Run("", func(t *testing.T) {
-			if c.res == nil {
-				t.Fatal("without result expectancy cannot be nil")
-			}
+			require.NotNil(t, c.res, "without result expectancy cannot be nil")
 
 			expected, err := ExpandPostings(c.res)
 			require.NoError(t, err)
@@ -911,9 +899,7 @@ func TestWithoutPostings(t *testing.T) {
 				return
 			}
 
-			if w == EmptyPostings() {
-				t.Fatal("without unexpected result: EmptyPostings sentinel")
-			}
+			require.NotEqual(t, EmptyPostings(), w, "without unexpected result: EmptyPostings sentinel")
 
 			res, err := ExpandPostings(w)
 			require.NoError(t, err)
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 49b3be36d0..08a053d89b 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -3022,9 +3022,7 @@ func TestPostingsForMatchers(t *testing.T) {
 				}
 			}
 			require.NoError(t, p.Err())
-			if len(exp) != 0 {
-				t.Errorf("Evaluating %v, missing results %+v", c.matchers, exp)
-			}
+			require.Empty(t, exp, "Evaluating %v", c.matchers)
 		})
 	}
 }
@@ -3107,9 +3105,7 @@ func TestClose(t *testing.T) {
 	createBlock(t, dir, genSeries(1, 1, 10, 20))
 
 	db, err := Open(dir, nil, nil, DefaultOptions(), nil)
-	if err != nil {
-		t.Fatalf("Opening test storage failed: %s", err)
-	}
failed: %s") defer func() { require.NoError(t, db.Close()) }() diff --git a/tsdb/wal_test.go b/tsdb/wal_test.go index 9004f5093f..8700a70754 100644 --- a/tsdb/wal_test.go +++ b/tsdb/wal_test.go @@ -528,7 +528,7 @@ func TestMigrateWAL_Fuzz(t *testing.T) { require.NoError(t, err) res = append(res, s) default: - t.Fatalf("unknown record type %d", dec.Type(rec)) + require.Fail(t, "unknown record type %d", dec.Type(rec)) } } require.NoError(t, r.Err()) diff --git a/tsdb/wlog/reader_test.go b/tsdb/wlog/reader_test.go index 0f510e0c1e..484eff3664 100644 --- a/tsdb/wlog/reader_test.go +++ b/tsdb/wlog/reader_test.go @@ -182,16 +182,13 @@ func TestReader(t *testing.T) { t.Logf("record %d", j) rec := r.Record() - if j >= len(c.exp) { - t.Fatal("received more records than expected") - } + require.Less(t, j, len(c.exp), "received more records than expected") require.Equal(t, c.exp[j], rec, "Bytes within record did not match expected Bytes") } - if !c.fail && r.Err() != nil { - t.Fatalf("unexpected error: %s", r.Err()) - } - if c.fail && r.Err() == nil { - t.Fatalf("expected error but got none") + if !c.fail { + require.NoError(t, r.Err()) + } else { + require.Error(t, r.Err()) } }) } diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index 3aeebd099e..f9ab773e13 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -191,9 +191,7 @@ func TestWALRepair_ReadingError(t *testing.T) { require.Len(t, result, test.intactRecs, "Wrong number of intact records") for i, r := range result { - if !bytes.Equal(records[i], r) { - t.Fatalf("record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) - } + require.True(t, bytes.Equal(records[i], r), "record %d diverges: want %x, got %x", i, records[i][:10], r[:10]) } // Make sure there is a new 0 size Segment after the corrupted Segment. 
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index 3dc83548de..eceadb20ae 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -216,18 +216,11 @@ type rulesRetrieverMock struct {
 
 func (m *rulesRetrieverMock) CreateAlertingRules() {
 	expr1, err := parser.ParseExpr(`absent(test_metric3) != 1`)
-	if err != nil {
-		m.testing.Fatalf("unable to parse alert expression: %s", err)
-	}
+	require.NoError(m.testing, err)
 	expr2, err := parser.ParseExpr(`up == 1`)
-	if err != nil {
-		m.testing.Fatalf("Unable to parse alert expression: %s", err)
-	}
-
+	require.NoError(m.testing, err)
 	expr3, err := parser.ParseExpr(`vector(1)`)
-	if err != nil {
-		m.testing.Fatalf("Unable to parse alert expression: %s", err)
-	}
+	require.NoError(m.testing, err)
 
 	rule1 := rules.NewAlertingRule(
 		"test_metric3",
@@ -302,9 +295,7 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
 	}
 
 	recordingExpr, err := parser.ParseExpr(`vector(1)`)
-	if err != nil {
-		m.testing.Fatalf("unable to parse alert expression: %s", err)
-	}
+	require.NoError(m.testing, err, "unable to parse alert expression")
 	recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{})
 	r = append(r, recordingRule)
 
@@ -714,9 +705,7 @@ func TestQueryExemplars(t *testing.T) {
 			for _, te := range tc.exemplars {
 				for _, e := range te.Exemplars {
 					_, err := es.AppendExemplar(0, te.SeriesLabels, e)
-					if err != nil {
-						t.Fatal(err)
-					}
+					require.NoError(t, err)
 				}
 			}
 
@@ -2832,9 +2821,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
 			}
 
 			req, err := request(method, test.query)
-			if err != nil {
-				t.Fatal(err)
-			}
+			require.NoError(t, err)
 
 			tr.ResetMetadataStore()
 			for _, tm := range test.metadata {
@@ -2844,9 +2831,7 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
 			for _, te := range test.exemplars {
 				for _, e := range te.Exemplars {
 					_, err := es.AppendExemplar(0, te.SeriesLabels, e)
-					if err != nil {
-						t.Fatal(err)
-					}
+					require.NoError(t, err)
 				}
 			}
 
@@ -2882,17 +2867,11 @@ func describeAPIFunc(f apiFunc) string {
 func assertAPIError(t *testing.T, got *apiError, exp errorType) {
 	t.Helper()
 
-	if got != nil {
-		if exp == errorNone {
-			t.Fatalf("Unexpected error: %s", got)
-		}
-		if exp != got.typ {
-			t.Fatalf("Expected error of type %q but got type %q (%q)", exp, got.typ, got)
-		}
-		return
-	}
-	if exp != errorNone {
-		t.Fatalf("Expected error of type %q but got none", exp)
+	if exp == errorNone {
+		require.Nil(t, got)
+	} else {
+		require.NotNil(t, got)
+		require.Equal(t, exp, got.typ, "(%q)", got)
 	}
 }
 
@@ -2906,13 +2885,7 @@ func assertAPIResponseLength(t *testing.T, got interface{}, expLen int) {
 	t.Helper()
 
 	gotLen := reflect.ValueOf(got).Len()
-	if gotLen != expLen {
-		t.Fatalf(
-			"Response length does not match, expected:\n%d\ngot:\n%d",
-			expLen,
-			gotLen,
-		)
-	}
+	require.Equal(t, expLen, gotLen, "Response length does not match")
 }
 
 func assertAPIResponseMetadataLen(t *testing.T, got interface{}, expLen int) {
@@ -2924,13 +2897,7 @@ func assertAPIResponseMetadataLen(t *testing.T, got interface{}, expLen int) {
 		gotLen += len(m)
 	}
 
-	if gotLen != expLen {
-		t.Fatalf(
-			"Amount of metadata in the response does not match, expected:\n%d\ngot:\n%d",
-			expLen,
-			gotLen,
-		)
-	}
+	require.Equal(t, expLen, gotLen, "Amount of metadata in the response does not match")
 }
 
 type fakeDB struct {
@@ -3271,26 +3238,18 @@ func TestRespondError(t *testing.T) {
 	defer s.Close()
 
 	resp, err := http.Get(s.URL)
-	if err != nil {
-		t.Fatalf("Error on test request: %s", err)
-	}
require.NoError(t, err, "Error on test request") body, err := io.ReadAll(resp.Body) defer resp.Body.Close() - if err != nil { - t.Fatalf("Error reading response body: %s", err) - } - - if want, have := http.StatusServiceUnavailable, resp.StatusCode; want != have { - t.Fatalf("Return code %d expected in error response but got %d", want, have) - } - if h := resp.Header.Get("Content-Type"); h != "application/json" { - t.Fatalf("Expected Content-Type %q but got %q", "application/json", h) - } + require.NoError(t, err, "Error reading response body") + want, have := http.StatusServiceUnavailable, resp.StatusCode + require.Equal(t, want, have, "Return code %d expected in error response but got %d", want, have) + h := resp.Header.Get("Content-Type") + require.Equal(t, "application/json", h, "Expected Content-Type %q but got %q", "application/json", h) var res Response - if err = json.Unmarshal(body, &res); err != nil { - t.Fatalf("Error unmarshaling JSON body: %s", err) - } + err = json.Unmarshal(body, &res) + require.NoError(t, err, "Error unmarshaling JSON body") exp := &Response{ Status: statusError, @@ -3419,17 +3378,13 @@ func TestParseTime(t *testing.T) { for _, test := range tests { ts, err := parseTime(test.input) - if err != nil && !test.fail { - t.Errorf("Unexpected error for %q: %s", test.input, err) + if !test.fail { + require.NoError(t, err, "Unexpected error for %q", test.input) + require.NotNil(t, ts) + require.True(t, ts.Equal(test.result), "Expected time %v for input %q but got %v", test.result, test.input, ts) continue } - if err == nil && test.fail { - t.Errorf("Expected error for %q but got none", test.input) - continue - } - if !test.fail && !ts.Equal(test.result) { - t.Errorf("Expected time %v for input %q but got %v", test.result, test.input, ts) - } + require.Error(t, err, "Expected error for %q but got none", test.input) } } @@ -3473,17 +3428,12 @@ func TestParseDuration(t *testing.T) { for _, test := range tests { d, err := parseDuration(test.input) - if err != nil && !test.fail { - t.Errorf("Unexpected error for %q: %s", test.input, err) - continue - } - if err == nil && test.fail { - t.Errorf("Expected error for %q but got none", test.input) + if !test.fail { + require.NoError(t, err, "Unexpected error for %q", test.input) + require.Equal(t, test.result, d, "Expected duration %v for input %q but got %v", test.result, test.input, d) continue } - if !test.fail && d != test.result { - t.Errorf("Expected duration %v for input %q but got %v", test.result, test.input, d) - } + require.Error(t, err, "Expected error for %q but got none", test.input) } } @@ -3496,18 +3446,11 @@ func TestOptionsMethod(t *testing.T) { defer s.Close() req, err := http.NewRequest("OPTIONS", s.URL+"/any_path", nil) - if err != nil { - t.Fatalf("Error creating OPTIONS request: %s", err) - } + require.NoError(t, err, "Error creating OPTIONS request") client := &http.Client{} resp, err := client.Do(req) - if err != nil { - t.Fatalf("Error executing OPTIONS request: %s", err) - } - - if resp.StatusCode != http.StatusNoContent { - t.Fatalf("Expected status %d, got %d", http.StatusNoContent, resp.StatusCode) - } + require.NoError(t, err, "Error executing OPTIONS request") + require.Equal(t, http.StatusNoContent, resp.StatusCode) } func TestTSDBStatus(t *testing.T) { @@ -3546,9 +3489,7 @@ func TestTSDBStatus(t *testing.T) { api := &API{db: tc.db, gatherer: prometheus.DefaultGatherer} endpoint := tc.endpoint(api) req, err := http.NewRequest(tc.method, fmt.Sprintf("?%s", tc.values.Encode()), nil) - if err != 
nil { - t.Fatalf("Error when creating test request: %s", err) - } + require.NoError(t, err, "Error when creating test request") res := endpoint(req) assertAPIError(t, res.err, tc.errType) }) diff --git a/web/web_test.go b/web/web_test.go index 8832c28390..62bdb2ae31 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -440,7 +440,7 @@ func TestShutdownWithStaleConnection(t *testing.T) { select { case <-closed: case <-time.After(timeout + 5*time.Second): - t.Fatalf("Server still running after read timeout.") + require.FailNow(t, "Server still running after read timeout.") } } @@ -502,7 +502,7 @@ func TestHandleMultipleQuitRequests(t *testing.T) { select { case <-closed: case <-time.After(5 * time.Second): - t.Fatalf("Server still running after 5 seconds.") + require.FailNow(t, "Server still running after 5 seconds.") } }