Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for blocking a policy to be ingested #16203

Merged
merged 23 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

if ok, until := d.ShouldBlockPolicy(lbs, tenantID); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to how we enforce labels, I think this one should also replace the d.validator.ShouldBlockIngestion down below which enforces the per-tenant block.

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we should pass the already resolved policy here to avoid resolving it twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, tenantID, until.Format(time.RFC3339), d.validator.Limits.BlockIngestionPolicyStatusCode(tenantID, policy))
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.BlockedIngestionPolicy, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.BlockedIngestionPolicy, tenantID, retentionHours, policy).Add(float64(discardedBytes))
continue
}

n := 0
pushSize := 0
prevTs := stream.Entries[0].Timestamp
Expand Down Expand Up @@ -1293,3 +1303,33 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
	// Load the current counter value first, then convert it to the int the signature promises.
	healthy := d.healthyInstancesCount.Load()
	return int(healthy)
}

// ShouldBlockPolicy reports whether ingestion must currently be rejected for a
// stream with the given labels because the policy those labels resolve to is
// blocked for the tenant.
//
// When the first result is true, the second result is the time at which the
// block ends. In every other case (no policy mapping for the tenant, no policy
// matching the labels, no block configured, or a block that already expired)
// it returns false and the zero time.
func (d *Distributor) ShouldBlockPolicy(lbs labels.Labels, tenantID string) (bool, time.Time) {
	var notBlocked time.Time

	// Without a policy mapping for the tenant there is nothing to resolve.
	policies := d.validator.Limits.PoliciesStreamMapping(tenantID)
	if policies == nil {
		return false, notBlocked
	}

	// An empty name means the labels do not fall under any specific policy.
	name := policies.PolicyFor(lbs)
	if name == "" {
		return false, notBlocked
	}

	// A zero deadline means no block is configured; a deadline that is not
	// in the future means the block is over. The zero check comes first so
	// the clock is only consulted when a deadline actually exists.
	until := d.validator.Limits.BlockIngestionPolicyUntil(tenantID, name)
	if until.IsZero() || !time.Now().Before(until) {
		return false, notBlocked
	}

	return true, until
}
114 changes: 112 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand All @@ -38,6 +37,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -1677,6 +1677,116 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
}
}

// TestDistributor_PushIngestionBlockedByPolicy pushes a single stream whose
// labels match a configured policy selector and verifies that Push returns an
// error only while that policy has a block deadline in the future.
func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
	const (
		policyName = "test-policy"
		selector   = `{foo="bar"}`
	)

	now := time.Now()
	inOneHour := now.Add(1 * time.Hour)
	oneHourAgo := now.Add(-1 * time.Hour)

	type testCase struct {
		name             string
		blockUntil       map[string]time.Time
		blockStatusCode  map[string]int
		policy           string
		labels           string
		expectError      bool
		expectedErrorMsg string
	}

	testCases := []testCase{
		{
			name:        "not blocked - no policy block configured",
			policy:      policyName,
			labels:      selector,
			expectError: false,
		},
		{
			name:            "not blocked - policy block expired",
			blockUntil:      map[string]time.Time{policyName: oneHourAgo},
			blockStatusCode: map[string]int{policyName: 429},
			policy:          policyName,
			labels:          selector,
			expectError:     false,
		},
		{
			name:             "blocked - policy block active",
			blockUntil:       map[string]time.Time{policyName: inOneHour},
			blockStatusCode:  map[string]int{policyName: 429},
			policy:           policyName,
			labels:           selector,
			expectError:      true,
			expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", inOneHour.Format(time.RFC3339), 429),
		},
		{
			// The block targets a policy other than the one the stream maps to.
			name:            "not blocked - different policy",
			blockUntil:      map[string]time.Time{"blocked-policy": inOneHour},
			blockStatusCode: map[string]int{"blocked-policy": 429},
			policy:          policyName,
			labels:          selector,
			expectError:     false,
		},
		{
			name:             "blocked - custom status code",
			blockUntil:       map[string]time.Time{policyName: inOneHour},
			blockStatusCode:  map[string]int{policyName: 456},
			policy:           policyName,
			labels:           selector,
			expectError:      true,
			expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", inOneHour.Format(time.RFC3339), 456),
		},
	}

	for _, tt := range testCases {
		t.Run(tt.name, func(t *testing.T) {
			limits := &validation.Limits{}
			flagext.DefaultValues(limits)

			// Map the case's policy to the selector of the stream that will be pushed.
			limits.PolicyStreamMapping = validation.PolicyStreamMapping{
				tt.policy: []*validation.PriorityStream{
					{Selector: tt.labels, Priority: 1},
				},
			}

			// Only override the per-policy block settings when the case defines them,
			// so the defaults stay in place otherwise.
			if tt.blockUntil != nil {
				deadlines := make(map[string]dskit_flagext.Time)
				for name, until := range tt.blockUntil {
					deadlines[name] = dskit_flagext.Time(until)
				}
				limits.BlockIngestionPolicyUntil = deadlines
			}
			if tt.blockStatusCode != nil {
				limits.BlockIngestionPolicyStatusCode = tt.blockStatusCode
			}

			distributors, _ := prepare(t, 1, 5, limits, nil)
			request := makeWriteRequestWithLabels(1, 1024, []string{tt.labels}, false, false, false)
			response, err := distributors[0].Push(ctx, request)

			if !tt.expectError {
				require.NoError(t, err)
				require.Equal(t, success, response)
				return
			}

			require.Error(t, err)
			require.Contains(t, err.Error(), tt.expectedErrorMsg)
		})
	}
}

func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) {
t.Helper()

Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
BlockIngestionPolicyStatusCode(userID string, policy string) int
EnforcedLabels(userID string) []string

IngestionPartitionsTenantShardSize(userID string) int
Expand Down
36 changes: 32 additions & 4 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,12 @@ type Limits struct {
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until"`
BlockIngestionPolicyStatusCode map[string]int `yaml:"block_ingestion_policy_status_code" json:"block_ingestion_policy_status_code"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a per-policy status code? I think we can just reuse the per-tenant BlockIngestionStatusCode.
I wouldn't add it unless we need it in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really. got rid of it, ty

EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

Expand Down Expand Up @@ -1120,6 +1122,32 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

// BlockIngestionPolicyUntil returns the time until which ingestion is blocked
// for the given policy of the given tenant. It returns the zero time, meaning
// no blocking, when the tenant has no limits or no entry for the policy.
func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time {
	if tenantLimits := o.getOverridesForUser(userID); tenantLimits != nil {
		// Looking up a key in a nil map is safe and reports found == false,
		// so an unset map needs no separate check.
		if until, found := tenantLimits.BlockIngestionPolicyUntil[policy]; found {
			return time.Time(until)
		}
	}

	// Zero time means no blocking.
	return time.Time{}
}

// BlockIngestionPolicyStatusCode returns the status code configured for the
// given policy of the given tenant. It falls back to
// defaultBlockedIngestionStatusCode when the tenant has no limits or no status
// code is configured for the policy.
func (o *Overrides) BlockIngestionPolicyStatusCode(userID string, policy string) int {
	if tenantLimits := o.getOverridesForUser(userID); tenantLimits != nil {
		if code, found := tenantLimits.BlockIngestionPolicyStatusCode[policy]; found {
			return code
		}
	}

	return defaultBlockedIngestionStatusCode
}

// EnforcedLabels returns the EnforcedLabels setting from the limits resolved
// for the given tenant.
func (o *Overrides) EnforcedLabels(userID string) []string {
	tenantLimits := o.getOverridesForUser(userID)
	return tenantLimits.EnforcedLabels
}
Expand Down
44 changes: 28 additions & 16 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

dskit_flagext "github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/compactor/deletionmode"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -226,9 +228,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -247,9 +251,11 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -271,10 +277,12 @@ retention_stream:
},

// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -296,9 +304,11 @@ reject_old_samples: true
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
{
Expand All @@ -321,9 +331,11 @@ query_timeout: 5m
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
PolicyStreamMapping: PolicyStreamMapping{},
BlockIngestionPolicyStatusCode: map[string]int{},
BlockIngestionPolicyUntil: map[string]dskit_flagext.Time{},
},
},
} {
Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
BlockedIngestionPolicy = "blocked_ingestion_policy"
BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
)
Expand Down
Loading