Skip to content

Commit

Permalink
feat(policies): Per policy enforced labels (#16182)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Feb 13, 2025
1 parent 14e2c87 commit 5fda84b
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 23 deletions.
10 changes: 10 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3618,6 +3618,16 @@ otlp_config:
# CLI flag: -validation.enforced-labels
[enforced_labels: <list of strings> | default = []]

# Map of policies to enforced labels. Example:
# policy_enforced_labels:
# policy1:
# - label1
# - label2
# policy2:
# - label3
# - label4
[policy_enforced_labels: <map of string to list of strings>]

# Map of policies to stream selectors with a priority. Experimental. Example:
# policy_stream_mapping:
# finance:
Expand Down
17 changes: 14 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID, policy); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
Expand Down Expand Up @@ -768,16 +768,27 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// missingEnforcedLabels returns true if the stream is missing any of the required labels.
//
// It also returns the first label that is missing if any (for the case of multiple labels missing).
func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (bool, []string) {
requiredLbs := d.validator.Limits.EnforcedLabels(tenantID)
func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string, policy string) (bool, []string) {
perPolicyEnforcedLabels := d.validator.Limits.PolicyEnforcedLabels(tenantID, policy)
globalEnforcedLabels := d.validator.Limits.EnforcedLabels(tenantID)

requiredLbs := append(globalEnforcedLabels, perPolicyEnforcedLabels...)
if len(requiredLbs) == 0 {
// no enforced labels configured.
return false, []string{}
}

// Use a map to deduplicate the required labels. Duplicates may happen if the same label is configured
// in both global and per-policy enforced labels.
seen := make(map[string]struct{})
missingLbs := []string{}

for _, lb := range requiredLbs {
if _, ok := seen[lb]; ok {
continue
}

seen[lb] = struct{}{}
if !lbs.Has(lb) {
missingLbs = append(missingLbs, lb)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,26 +431,30 @@ func Test_MissingEnforcedLabels(t *testing.T) {
flagext.DefaultValues(limits)

limits.EnforcedLabels = []string{"app", "env"}
limits.PolicyEnforcedLabels = map[string][]string{
"policy1": {"cluster", "namespace"},
"policy2": {"namespace"},
}

distributors, _ := prepare(t, 1, 5, limits, nil)

// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test")
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")
assert.False(t, missing)
assert.Empty(t, missingLabels)

// request missing the `app` label.
lbs = labels.FromMap(map[string]string{"env": "prod"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test")
// request missing the `app` label from global enforced labels and `cluster` label from policy enforced labels.
lbs = labels.FromMap(map[string]string{"env": "prod", "namespace": "ns1"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy1")
assert.True(t, missing)
assert.EqualValues(t, []string{"app"}, missingLabels)
assert.EqualValues(t, []string{"app", "cluster"}, missingLabels)

// request missing all required labels.
lbs = labels.FromMap(map[string]string{"pod": "distributor-abc"})
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test")
missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test", "policy2")
assert.True(t, missing)
assert.EqualValues(t, []string{"app", "env"}, missingLabels)
assert.EqualValues(t, []string{"app", "env", "namespace"}, missingLabels)
}

func Test_PushWithEnforcedLabels(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Limits interface {
BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

IngestionPartitionsTenantShardSize(userID string) int
}
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ type Limits struct {
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
Expand Down Expand Up @@ -446,6 +447,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")
f.Var((*dskit_flagext.StringSlice)(&l.EnforcedLabels), "validation.enforced-labels", "List of labels that must be present in the stream. If any of the labels are missing, the stream will be discarded. This flag configures it globally for all tenants. Experimental.")
l.PolicyEnforcedLabels = make(map[string][]string)

f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.")

Expand Down Expand Up @@ -1124,6 +1126,10 @@ func (o *Overrides) EnforcedLabels(userID string) []string {
return o.getOverridesForUser(userID).EnforcedLabels
}

func (o *Overrides) PolicyEnforcedLabels(userID string, policy string) []string {
return o.getOverridesForUser(userID).PolicyEnforcedLabels[policy]
}

func (o *Overrides) PoliciesStreamMapping(userID string) PolicyStreamMapping {
return o.getOverridesForUser(userID).PolicyStreamMapping
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,10 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
},
},
{
Expand All @@ -247,9 +248,10 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
},
},
{
Expand All @@ -274,6 +276,7 @@ retention_stream:
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
},
},
Expand All @@ -296,9 +299,10 @@ reject_old_samples: true
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
},
},
{
Expand All @@ -321,9 +325,10 @@ query_timeout: 5m
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyEnforcedLabels: map[string][]string{},
PolicyStreamMapping: PolicyStreamMapping{},
},
},
} {
Expand Down

0 comments on commit 5fda84b

Please sign in to comment.