Support metric_relabel_configs in distributor #3329

Merged · 6 commits · Oct 26, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -51,6 +51,7 @@
* [FEATURE] Shuffle sharding: added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
* [FEATURE] Shuffle sharding: added support for shuffle-sharding ingesters on the read path. When ingesters shuffle-sharding is enabled and `-querier.shuffle-sharding-ingesters-lookback-period` is set, queriers will fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. #3252
* [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
* [FEATURE] Added support for applying Prometheus relabel configs on series received by the distributor. A `metric_relabel_configs` field has been added to the per-tenant limits configuration. #3329
* [ENHANCEMENT] Ruler: Introduces two new limits `-ruler.max-rules-per-rule-group` and `-ruler.max-rule-groups-per-tenant` to control the number of rules per rule group and the total number of rule groups for a given user. They are disabled by default. #3366
* [ENHANCEMENT] Allow specifying multiple comma-separated Cortex services in the `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with the compactor as well. #3275
* [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flags are listed below: #3244
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -24,6 +24,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
* `<string>`: a regular string
* `<url>`: a URL
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
* `<relabel_config>`: a [Prometheus relabeling configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config).
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)

### Use environment variables in the configuration
@@ -2799,6 +2800,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex services
# CLI flag: -distributor.ingestion-tenant-shard-size
[ingestion_tenant_shard_size: <int> | default = 0]

# List of metric relabel configurations. Note that in most situations, it is
# more effective to use metrics relabeling directly in the Prometheus server,
# e.g. remote_write.write_relabel_configs.
[metric_relabel_configs: <relabel_config...> | default = ]
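
For illustration, a per-tenant overrides file using this new field might look like the following sketch (the tenant name `tenant-a` and the drop rule are hypothetical; the rule fields follow the standard Prometheus `relabel_config` schema):

```yaml
overrides:
  tenant-a:
    metric_relabel_configs:
      # Hypothetical rule: drop any series that carries an `le` label.
      - source_labels: [le]
        regex: ".+"
        action: drop
```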

# The maximum number of series for which a query can fetch samples from each
# ingester. This limit is enforced only in the ingesters (when querying samples
# not flushed to the storage yet) and it's a per-instance limit. This limit is
1 change: 1 addition & 0 deletions docs/configuration/config-file-reference.template
@@ -24,6 +24,7 @@ To specify which configuration file to load, pass the `-config.file` flag at the
* `<string>`: a regular string
* `<url>`: a URL
* `<prefix>`: a CLI flag prefix based on the context (look at the parent configuration block to see which CLI flags prefix should be used)
* `<relabel_config>`: a [Prometheus relabeling configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config).
* `<time>`: a timestamp, with available formats: `2006-01-20` (midnight, local timezone), `2006-01-20T15:04` (local timezone), and RFC 3339 formats: `2006-01-20T15:04:05Z` (UTC) or `2006-01-20T15:04:05+07:00` (explicit timezone)

### Use environment variables in the configuration
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -51,3 +51,4 @@ Currently experimental features are:
- TLS configuration in gRPC and HTTP clients.
- TLS configuration in Etcd client.
- Blocksconvert tools
- Metric relabeling in the distributor.
6 changes: 6 additions & 0 deletions pkg/distributor/distributor.go
@@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
@@ -446,6 +447,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)
ts.Labels = client.FromLabelsToLabelAdapters(l)
}

// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
75 changes: 75 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -18,6 +18,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
@@ -1720,6 +1721,80 @@ func TestSortLabels(t *testing.T) {
})
}

func TestDistributor_Push_Relabel(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")

type testcase struct {
inputSeries labels.Labels
expectedSeries labels.Labels
metricRelabelConfigs []*relabel.Config
}

cases := []testcase{
// No relabel config.
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "two"},
},
metricRelabelConfigs: []*relabel.Config{
{
SourceLabels: []model.LabelName{"cluster"},
Action: relabel.DefaultRelabelConfig.Action,
Regex: relabel.DefaultRelabelConfig.Regex,
TargetLabel: "cluster",
Replacement: "two",
},
},
},
}

for _, tc := range cases {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs

ds, ingesters, r := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})
defer stopAll(ds, r)

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, client.FromLabelAdaptersToLabels(v.Labels))
}
}
}
}

func countMockIngestersCalls(ingesters []mockIngester, name string) int {
count := 0
for i := 0; i < len(ingesters); i++ {
8 changes: 8 additions & 0 deletions pkg/util/validation/limits.go
@@ -5,6 +5,8 @@ import (
"flag"
"time"

"github.com/prometheus/prometheus/pkg/relabel"

"github.com/cortexproject/cortex/pkg/util/flagext"
)

@@ -46,6 +48,7 @@ type Limits struct {
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name"`
EnforceMetricName bool `yaml:"enforce_metric_name"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."`

// Ingester enforced limits.
// Series
@@ -376,6 +379,11 @@ func (o *Overrides) EvaluationDelay(userID string) time.Duration {
return o.getOverridesForUser(userID).RulerEvaluationDelay
}

// MetricRelabelConfigs returns the metric relabel configs for a given user.
func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config {
return o.getOverridesForUser(userID).MetricRelabelConfigs
}

// RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
func (o *Overrides) RulerTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).RulerTenantShardSize
27 changes: 26 additions & 1 deletion pkg/util/validation/limits_test.go
@@ -3,6 +3,8 @@ package validation
import (
"testing"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
@@ -82,9 +84,32 @@ func TestLimitsLoadingFromYaml(t *testing.T) {
inp := `ingestion_rate: 0.5`

l := Limits{}
err := yaml.Unmarshal([]byte(inp), &l)
err := yaml.UnmarshalStrict([]byte(inp), &l)
require.NoError(t, err)

assert.Equal(t, 0.5, l.IngestionRate, "from yaml")
assert.Equal(t, 100, l.MaxLabelNameLength, "from defaults")
}

func TestMetricRelabelConfigLimitsLoadingFromYaml(t *testing.T) {
SetDefaultLimitsForYAMLUnmarshalling(Limits{})

inp := `
metric_relabel_configs:
- action: drop
source_labels: [le]
regex: .+
`
exp := relabel.DefaultRelabelConfig
exp.Action = relabel.Drop
regex, err := relabel.NewRegexp(".+")
require.NoError(t, err)
exp.Regex = regex
exp.SourceLabels = model.LabelNames([]model.LabelName{"le"})

l := Limits{}
err = yaml.UnmarshalStrict([]byte(inp), &l)
require.NoError(t, err)

assert.Equal(t, []*relabel.Config{&exp}, l.MetricRelabelConfigs)
}
19 changes: 19 additions & 0 deletions tools/doc-generator/parser.go
@@ -193,6 +193,16 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl
if err != nil {
return nil, errors.Wrapf(err, "config=%s.%s", t.PkgPath(), t.Name())
}
if fieldFlag == nil {
block.Add(&configEntry{
kind: "field",
name: fieldName,
required: isFieldRequired(field),
fieldDesc: getFieldDescription(field, ""),
fieldType: fieldType,
})
continue
}

block.Add(&configEntry{
kind: "field",
@@ -243,6 +253,8 @@ func getFieldType(t reflect.Type) (string, error) {
return "string", nil
case "flagext.StringSliceCSV":
return "string", nil
case "[]*relabel.Config":
return "relabel_config...", nil
}

// Fallback to auto-detection of built-in data types
@@ -297,6 +309,9 @@
}

func getFieldFlag(field reflect.StructField, fieldValue reflect.Value, flags map[uintptr]*flag.Flag) (*flag.Flag, error) {
if isAbsentInCLI(field) {
return nil, nil
}
fieldPtr := fieldValue.Addr().Pointer()
fieldFlag, ok := flags[fieldPtr]
if !ok {
@@ -395,6 +410,10 @@ func isFieldHidden(f reflect.StructField) bool {
return getDocTagFlag(f, "hidden")
}

func isAbsentInCLI(f reflect.StructField) bool {
return getDocTagFlag(f, "nocli")
}

func isFieldRequired(f reflect.StructField) bool {
return getDocTagFlag(f, "required")
}