Skip to content

Commit

Permalink
Create custom struct that eases json/yaml unmarshaling of complex rel…
Browse files Browse the repository at this point in the history
…abel.Config struct (#4364)

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
  • Loading branch information
Danny Kopping committed Sep 27, 2021
1 parent 5b2b3fa commit 58dc90c
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 22 deletions.
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
Expand All @@ -27,6 +26,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
)

// RulesLimits is the one function we need from limits.Overrides, and
Expand All @@ -38,7 +38,7 @@ type RulesLimits interface {
RulerRemoteWriteURL(userID string) string
RulerRemoteWriteTimeout(userID string) time.Duration
RulerRemoteWriteHeaders(userID string) map[string]string
RulerRemoteWriteRelabelConfigs(userID string) []*relabel.Config
RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig
RulerRemoteWriteQueueCapacity(userID string) int
RulerRemoteWriteQueueMinShards(userID string) int
RulerRemoteWriteQueueMaxShards(userID string) int
Expand Down
32 changes: 31 additions & 1 deletion pkg/ruler/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/ruler/storage/cleaner"
"github.com/grafana/loki/pkg/ruler/storage/instance"
Expand Down Expand Up @@ -228,12 +230,17 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite
return nil, fmt.Errorf("error parsing given remote-write URL: %w", err)
}

relabelConfigs, err := r.createRelabelConfigs(tenant)
if err != nil {
return nil, fmt.Errorf("failed to parse relabel configs: %w", err)
}

overrides := RemoteWriteConfig{
Client: config.RemoteWriteConfig{
URL: &promConfig.URL{u},
RemoteTimeout: model.Duration(r.overrides.RulerRemoteWriteTimeout(tenant)),
Headers: r.overrides.RulerRemoteWriteHeaders(tenant),
WriteRelabelConfigs: r.overrides.RulerRemoteWriteRelabelConfigs(tenant),
WriteRelabelConfigs: relabelConfigs,
Name: fmt.Sprintf("%s-rw", tenant),
SendExemplars: false,
// TODO(dannyk): configure HTTP client overrides
Expand Down Expand Up @@ -270,6 +277,29 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite
return copy, nil
}

// createRelabelConfigs converts the util.RelabelConfig into relabel.Config to allow for
// more control over json/yaml unmarshaling
func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, error) {
configs := r.overrides.RulerRemoteWriteRelabelConfigs(tenant)

var relabelConfigs []*relabel.Config
for _, config := range configs {
out, err := yaml.Marshal(config)
if err != nil {
return nil, err
}

var rc relabel.Config
if err = yaml.Unmarshal(out, &rc); err != nil {
return nil, err
}

relabelConfigs = append(relabelConfigs, &rc)
}

return relabelConfigs, nil
}

var errNotReady = errors.New("appender not ready")

type notReadyAppender struct{}
Expand Down
54 changes: 50 additions & 4 deletions pkg/ruler/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ import (
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/ruler/storage/instance"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/validation"
)

const enabledRWTenant = "12345"
const disabledRWTenant = "54321"
const additionalHeadersRWTenant = "55443"
const customRelabelsTenant = "98765"
const badRelabelsTenant = "45677"

const defaultCapacity = 1000

func newFakeLimits() fakeLimits {
return fakeLimits{
limits: map[string]*validation.Limits{
enabledRWTenant: {
RulerRemoteWriteQueueCapacity: 987,
RulerRemoteWriteRelabelConfigs: []*relabel.Config{},
RulerRemoteWriteQueueCapacity: 987,
},
disabledRWTenant: {
RulerRemoteWriteDisabled: true,
Expand All @@ -48,6 +50,27 @@ func newFakeLimits() fakeLimits {
"Additional": "Header",
},
},
customRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
{
Regex: ".+:.+",
SourceLabels: []string{"__name__"},
Action: "drop",
},
{
Regex: "__cluster__",
Action: "labeldrop",
},
},
},
badRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
{
SourceLabels: []string{"__cluster__"},
Action: "labeldrop",
},
},
},
},
}
}
Expand Down Expand Up @@ -104,8 +127,6 @@ func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
assert.Len(t, tenantCfg.RemoteWrite, 1)
// but the tenant has an override for the queue capacity
assert.Equal(t, tenantCfg.RemoteWrite[0].QueueConfig.Capacity, 987)
// it should also override the default label configs (in this case by clearing them)
assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 0)
}

func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
Expand Down Expand Up @@ -159,6 +180,31 @@ func TestTenantRemoteWriteHeaderOverride(t *testing.T) {
assert.Equal(t, tenantCfg.RemoteWrite[0].Headers[user.OrgIDHeaderName], enabledRWTenant)
}

func TestRelabelConfigOverrides(t *testing.T) {
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

tenantCfg, err := reg.getTenantConfig(customRelabelsTenant)
require.NoError(t, err)

// it should also override the default label configs
assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 2)
}

func TestRelabelConfigOverridesWithErrors(t *testing.T) {
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

_, err = reg.getTenantConfig(badRelabelsTenant)

// ensure that relabel validation is being applied
require.EqualError(t, err, "failed to parse relabel configs: labeldrop action requires only 'regex', and no other fields")
}

func TestWALRegistryCreation(t *testing.T) {
overrides, err := validation.NewOverrides(validation.Limits{}, nil)
require.NoError(t, err)
Expand Down
22 changes: 22 additions & 0 deletions pkg/ruler/util/relabel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package util

// copy and modification of github.com/prometheus/prometheus/pkg/relabel/relabel.go
// reason: the custom types in github.com/prometheus/prometheus/pkg/relabel/relabel.go are difficult to unmarshal
type RelabelConfig struct {
// A list of labels from which values are taken and concatenated
// with the configured separator in order.
SourceLabels []string `yaml:"source_labels,flow,omitempty" json:"source_labels,omitempty"`
// Separator is the string between concatenated values from the source labels.
Separator string `yaml:"separator,omitempty" json:"separator,omitempty"`
// Regex against which the concatenation is matched.
Regex string `yaml:"regex,omitempty" json:"regex,omitempty"`
// Modulus to take of the hash of concatenated values from the source labels.
Modulus uint64 `yaml:"modulus,omitempty" json:"modulus,omitempty"`
// TargetLabel is the label to which the resulting string is written in a replacement.
// Regexp interpolation is allowed for the replace action.
TargetLabel string `yaml:"target_label,omitempty" json:"target_label,omitempty"`
// Replacement is the regex replacement pattern to be used.
Replacement string `yaml:"replacement,omitempty" json:"replacement,omitempty"`
// Action is the action to be performed for the relabeling.
Action string `yaml:"action,omitempty" json:"action,omitempty"`
}
30 changes: 15 additions & 15 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util/flagext"
)

Expand Down Expand Up @@ -83,19 +83,19 @@ type Limits struct {

// this field is the inversion of the general remote_write.enabled because the zero value of a boolean is false,
// and if it were ruler_remote_write_enabled, it would be impossible to know if the value was explicitly set or default
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*relabel.Config `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`

// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
Expand Down Expand Up @@ -446,7 +446,7 @@ func (o *Overrides) RulerRemoteWriteHeaders(userID string) map[string]string {
}

// RulerRemoteWriteRelabelConfigs returns the write relabel configs to use in a remote-write for a given user.
func (o *Overrides) RulerRemoteWriteRelabelConfigs(userID string) []*relabel.Config {
func (o *Overrides) RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig {
return o.getOverridesForUser(userID).RulerRemoteWriteRelabelConfigs
}

Expand Down

0 comments on commit 58dc90c

Please sign in to comment.