Commit
Merge pull request prometheus#12254 from zenador/histogram-bucket-limit
Implement bucket limit for native histograms
beorn7 authored May 10, 2023
2 parents 37fe9b8 + 40240c9 commit bd98fc8
Showing 7 changed files with 346 additions and 45 deletions.
3 changes: 3 additions & 0 deletions config/config.go
@@ -489,6 +489,9 @@ type ScrapeConfig struct {
	// More than this label value length post metric-relabeling will cause the
	// scrape to fail.
	LabelValueLengthLimit uint `yaml:"label_value_length_limit,omitempty"`
	// More than this many buckets in a native histogram will cause the scrape to
	// fail.
	NativeHistogramBucketLimit uint `yaml:"native_histogram_bucket_limit,omitempty"`

	// We cannot do proper Go type embedding below as the parser will then parse
	// values arbitrarily into the overflow maps of further-down types.
5 changes: 5 additions & 0 deletions docs/configuration/configuration.md
@@ -376,6 +376,11 @@ metric_relabel_configs:
# 0 means no limit. This is an experimental feature, this behaviour could
# change in the future.
[ target_limit: <int> | default = 0 ]

# Limit on total number of positive and negative buckets allowed in a single
# native histogram. If this is exceeded, the entire scrape will be treated as
# failed. 0 means no limit.
[ native_histogram_bucket_limit: <int> | default = 0 ]
```
Where `<job_name>` must be unique across all scrape configurations.
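
For illustration, a scrape config using the new setting might look like the following sketch (job name and target are hypothetical):

```yaml
scrape_configs:
  - job_name: "app"  # hypothetical job
    static_configs:
      - targets: ["app.example.com:8080"]  # hypothetical target
    # Fail the whole scrape if any native histogram carries more than
    # 160 buckets (positive plus negative) after scraping.
    native_histogram_bucket_limit: 160
```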
54 changes: 54 additions & 0 deletions scrape/clientprotobuf.go
@@ -0,0 +1,54 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scrape

import (
	"bytes"
	"encoding/binary"

	"github.com/gogo/protobuf/proto"

	// Intentionally using client model to simulate client in tests.
	dto "github.com/prometheus/client_model/go"
)

// Write a MetricFamily into a protobuf.
// This function is intended for testing scraping by providing protobuf serialized input.
func MetricFamilyToProtobuf(metricFamily *dto.MetricFamily) ([]byte, error) {
	buffer := &bytes.Buffer{}
	err := AddMetricFamilyToProtobuf(buffer, metricFamily)
	if err != nil {
		return nil, err
	}
	return buffer.Bytes(), nil
}

// Append a MetricFamily protobuf representation to a buffer.
// This function is intended for testing scraping by providing protobuf serialized input.
func AddMetricFamilyToProtobuf(buffer *bytes.Buffer, metricFamily *dto.MetricFamily) error {
	protoBuf, err := proto.Marshal(metricFamily)
	if err != nil {
		return err
	}

	varintBuf := make([]byte, binary.MaxVarintLen32)
	varintLength := binary.PutUvarint(varintBuf, uint64(len(protoBuf)))

	_, err = buffer.Write(varintBuf[:varintLength])
	if err != nil {
		return err
	}
	_, err = buffer.Write(protoBuf)
	return err
}
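
A hedged usage sketch for these helpers (the test function and the MetricFamily literal are invented for illustration): they emit the uvarint length-delimited protobuf framing that Prometheus accepts from protobuf-exposing targets.

```go
package scrape

import (
	"testing"

	"github.com/gogo/protobuf/proto"
	dto "github.com/prometheus/client_model/go"
)

// TestMetricFamilyToProtobufSketch is a hypothetical example, not part of this commit.
func TestMetricFamilyToProtobufSketch(t *testing.T) {
	mf := &dto.MetricFamily{
		Name: proto.String("test_histogram"),
		Type: dto.MetricType_HISTOGRAM.Enum(),
		Metric: []*dto.Metric{{Histogram: &dto.Histogram{
			SampleCount: proto.Uint64(1),
			SampleSum:   proto.Float64(2.5),
		}}},
	}
	buf, err := MetricFamilyToProtobuf(mf)
	if err != nil {
		t.Fatal(err)
	}
	// buf holds a uvarint length prefix followed by the marshaled message,
	// i.e. the delimited format used for protobuf scrape payloads.
	if len(buf) == 0 {
		t.Fatal("expected non-empty payload")
	}
}
```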
54 changes: 45 additions & 9 deletions scrape/scrape.go
@@ -191,6 +191,12 @@ var (
		},
		[]string{"scrape_job"},
	)
	targetScrapeNativeHistogramBucketLimit = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "prometheus_target_scrapes_exceeded_native_histogram_bucket_limit_total",
			Help: "Total number of scrapes that hit the native histogram bucket limit and were rejected.",
		},
	)
)

func init() {
@@ -216,6 +222,7 @@ func init() {
		targetScrapeExemplarOutOfOrder,
		targetScrapePoolExceededLabelLimits,
		targetSyncFailed,
		targetScrapeNativeHistogramBucketLimit,
	)
}
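
Since the counter is registered with the default registry, operators can watch for rejections; a hypothetical alerting rule built on it (group name, alert name, and threshold are illustrative):

```yaml
groups:
  - name: scrape-limits  # illustrative group name
    rules:
      - alert: NativeHistogramBucketLimitHit  # illustrative alert name
        expr: increase(prometheus_target_scrapes_exceeded_native_histogram_bucket_limit_total[10m]) > 0
        annotations:
          summary: "A scrape was rejected because a native histogram exceeded native_histogram_bucket_limit."
```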

@@ -256,6 +263,7 @@ type scrapeLoopOptions struct {
	target          *Target
	scraper         scraper
	sampleLimit     int
	bucketLimit     int
	labelLimits     *labelLimits
	honorLabels     bool
	honorTimestamps bool
@@ -319,6 +327,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
			jitterSeed,
			opts.honorTimestamps,
			opts.sampleLimit,
			opts.bucketLimit,
			opts.labelLimits,
			opts.interval,
			opts.timeout,
@@ -412,6 +421,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
		timeout       = time.Duration(sp.config.ScrapeTimeout)
		bodySizeLimit = int64(sp.config.BodySizeLimit)
		sampleLimit   = int(sp.config.SampleLimit)
		bucketLimit   = int(sp.config.NativeHistogramBucketLimit)
		labelLimits   = &labelLimits{
			labelLimit:           int(sp.config.LabelLimit),
			labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@@ -446,6 +456,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
				target:          t,
				scraper:         s,
				sampleLimit:     sampleLimit,
				bucketLimit:     bucketLimit,
				labelLimits:     labelLimits,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
@@ -530,6 +541,7 @@ func (sp *scrapePool) sync(targets []*Target) {
		timeout       = time.Duration(sp.config.ScrapeTimeout)
		bodySizeLimit = int64(sp.config.BodySizeLimit)
		sampleLimit   = int(sp.config.SampleLimit)
		bucketLimit   = int(sp.config.NativeHistogramBucketLimit)
		labelLimits   = &labelLimits{
			labelLimit:           int(sp.config.LabelLimit),
			labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
@@ -559,6 +571,7 @@ func (sp *scrapePool) sync(targets []*Target) {
				target:          t,
				scraper:         s,
				sampleLimit:     sampleLimit,
				bucketLimit:     bucketLimit,
				labelLimits:     labelLimits,
				honorLabels:     honorLabels,
				honorTimestamps: honorTimestamps,
@@ -731,17 +744,24 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
}

// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, limit int) storage.Appender {
func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Appender {
	app = &timeLimitAppender{
		Appender: app,
		maxTime:  timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}

	// The limit is applied after metrics are potentially dropped via relabeling.
	if limit > 0 {
	// The sampleLimit is applied after metrics are potentially dropped via relabeling.
	if sampleLimit > 0 {
		app = &limitAppender{
			Appender: app,
			limit:    limit,
			limit:    sampleLimit,
		}
	}

	if bucketLimit > 0 {
		app = &bucketLimitAppender{
			Appender: app,
			limit:    bucketLimit,
		}
	}
	return app
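
The appender chain above delegates enforcement to a bucketLimitAppender, whose definition is not part of the hunks shown here. A minimal sketch of its likely shape, assuming the storage.Appender interface of this Prometheus version (the error message is an assumption; errBucketLimit matches the sentinel consumed by checkAddError below):

```go
package scrape

import (
	"errors"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// errBucketLimit is the sentinel error matched in checkAddError;
// the message text here is an assumption.
var errBucketLimit = errors.New("histogram bucket limit exceeded")

// bucketLimitAppender rejects native histograms whose total bucket count
// (positive plus negative) exceeds the configured limit.
type bucketLimitAppender struct {
	storage.Appender
	limit int
}

func (app *bucketLimitAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if h != nil && len(h.PositiveBuckets)+len(h.NegativeBuckets) > app.limit {
		return 0, errBucketLimit
	}
	if fh != nil && len(fh.PositiveBuckets)+len(fh.NegativeBuckets) > app.limit {
		return 0, errBucketLimit
	}
	return app.Appender.AppendHistogram(ref, lset, t, h, fh)
}
```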
@@ -872,6 +892,7 @@ type scrapeLoop struct {
	forcedErr    error
	forcedErrMtx sync.Mutex
	sampleLimit  int
	bucketLimit  int
	labelLimits  *labelLimits
	interval     time.Duration
	timeout      time.Duration
@@ -1152,6 +1173,7 @@ func newScrapeLoop(ctx context.Context,
	jitterSeed uint64,
	honorTimestamps bool,
	sampleLimit int,
	bucketLimit int,
	labelLimits *labelLimits,
	interval time.Duration,
	timeout time.Duration,
@@ -1195,6 +1217,7 @@ func newScrapeLoop(ctx context.Context,
		appenderCtx:     appenderCtx,
		honorTimestamps: honorTimestamps,
		sampleLimit:     sampleLimit,
		bucketLimit:     bucketLimit,
		labelLimits:     labelLimits,
		interval:        interval,
		timeout:         timeout,
@@ -1482,6 +1505,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
		defTime         = timestamp.FromTime(ts)
		appErrs         = appendErrors{}
		sampleLimitErr  error
		bucketLimitErr  error
		e               exemplar.Exemplar // escapes to heap so hoisted out of loop
		meta            metadata.Metadata
		metadataChanged bool
@@ -1510,7 +1534,7 @@
	}

	// Take an appender with limits.
	app = appender(app, sl.sampleLimit)
	app = appender(app, sl.sampleLimit, sl.bucketLimit)

	defer func() {
		if err != nil {
@@ -1631,7 +1655,7 @@ loop:
			} else {
				ref, err = app.Append(ref, lset, t, val)
			}
			sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &appErrs)
			sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
			if err != nil {
				if err != storage.ErrNotFound {
					level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err)
@@ -1645,7 +1669,7 @@ loop:
				sl.cache.trackStaleness(hash, lset)
			}
			sl.cache.addRef(met, ref, lset, hash)
			if sampleAdded && sampleLimitErr == nil {
			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
				seriesAdded++
			}
		}
@@ -1681,6 +1705,13 @@ loop:
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		targetScrapeSampleLimit.Inc()
	}
	if bucketLimitErr != nil {
		if err == nil {
			err = bucketLimitErr // If sample limit is hit, that error takes precedence.
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		targetScrapeNativeHistogramBucketLimit.Inc()
	}
	if appErrs.numOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
	}
@@ -1710,8 +1741,8 @@ loop:
}

// Adds samples to the appender, checking the error, and then returns the # of samples added,
// whether the caller should continue to process more samples, and any sample limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr *error, appErrs *appendErrors) (bool, error) {
// whether the caller should continue to process more samples, and any sample or bucket limit errors.
func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
	switch errors.Cause(err) {
	case nil:
		if tp == nil && ce != nil {
@@ -1740,6 +1771,11 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
		// total number of samples scraped.
		*sampleLimitErr = err
		return false, nil
	case errBucketLimit:
		// Keep on parsing output if we hit the limit, so we report the correct
		// total number of samples scraped.
		*bucketLimitErr = err
		return false, nil
	default:
		return false, err
	}
