Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for blocking a policy to be ingested #16203

Merged
merged 23 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3601,6 +3601,11 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion for policy until the configured date. The time should be in
# RFC3339 format. The policy is based on the policy_stream_mapping
# configuration.
[block_ingestion_policy_until: <map of string to Time>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
Expand Down
34 changes: 17 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
d.validator.reportDiscardedData(ctx, validation.MissingEnforcedLabels, validationContext, lbs, retentionHours, policy, discardedBytes)
continue
}

if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {
d.writeFailuresManager.Log(tenantID, err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(ctx, reason, validationContext, lbs, retentionHours, policy, discardedBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this 👏 Maybe worth a follow-up PR making it consistent across all the functions, among the other error handlers.


// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// do not add error to validationErrors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear on what this comment means 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous and following errors do add the error to the validationErrors slice. We don't want to add it when the statusCode is success; otherwise we'll return an error when the only error that happened was this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah damn I read "do not and error" which made no sense to me 😂 "add" instead of "and" makes 100% sense 🤦

continue
}

validationErrors.Add(err)
continue
}

Expand Down Expand Up @@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

Expand Down
105 changes: 102 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand All @@ -38,6 +37,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -441,6 +441,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")

assert.False(t, missing)
assert.Empty(t, missingLabels)

Expand Down Expand Up @@ -1672,7 +1673,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
now := time.Now()
defaultErrCode := 260

for _, tc := range []struct {
name string
blockUntil map[string]time.Time
policy string
labels string
expectError bool
expectedErrorMsg string
yes bool
}{
{
name: "not blocked - no policy block configured",
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "not blocked - policy block expired",
blockUntil: map[string]time.Time{
"test-policy": now.Add(-1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - policy block active",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
yes: true,
},
{
name: "not blocked - different policy",
blockUntil: map[string]time.Time{
"blocked-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - custom status code",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
},
} {
t.Run(tc.name, func(t *testing.T) {
if !tc.yes {
return
}
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Configure policy mapping
limits.PolicyStreamMapping = validation.PolicyStreamMapping{
tc.policy: []*validation.PriorityStream{
{
Selector: tc.labels,
Priority: 1,
},
},
}

// Configure policy blocks
if tc.blockUntil != nil {
limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time)
for policy, until := range tc.blockUntil {
limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until)
}
}

distributors, _ := prepare(t, 1, 3, limits, nil)
request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

Expand Down
77 changes: 44 additions & 33 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,13 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > vCtx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}

Expand All @@ -127,39 +119,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if structuredMetadataCount > 0 {
if !vCtx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize)
}

if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize)
}
v.reportDiscardedData(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize))
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount)
}
}
Expand Down Expand Up @@ -208,13 +184,48 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
return nil
}

// reportDiscardedData records discarded data for the given reason. It
// increments the discarded-samples counter by one and adds entrySize to the
// discarded-bytes counter, both labelled with the reason, tenant, retention
// hours and policy. When a usage tracker is configured, the same byte count is
// also reported to it together with the stream labels.
//
// NOTE(review): the samples counter is bumped by exactly one per call, even
// when entrySize is the total size of several entries — confirm that callers
// passing a whole stream's size intend this.
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int) {
	tenant := vCtx.userID
	discarded := float64(entrySize)

	validation.DiscardedSamples.WithLabelValues(reason, tenant, retentionHours, policy).Inc()
	validation.DiscardedBytes.WithLabelValues(reason, tenant, retentionHours, policy).Add(discarded)

	// The usage tracker is optional; nothing more to do without one.
	if v.usageTracker == nil {
		return
	}
	v.usageTracker.DiscardedBytesAdd(ctx, tenant, reason, labels, discarded)
}

// ShouldBlockIngestion returns whether ingestion should be blocked, the status code to respond with,
// the discard reason, and an error describing the block.
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) {
if ctx.blockIngestionUntil.IsZero() {
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) {
if now.Before(ctx.blockIngestionUntil) {
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode)
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err
}

if block, until, code := v.ShouldBlockPolicy(ctx, policy, now); block {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShouldBlockPolicy should be private shouldBlockPolicy

err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, ctx.userID, until.Format(time.RFC3339), code)
return true, code, validation.BlockedIngestionPolicy, err
}

return false, 0, "", nil
}

// ShouldBlockPolicy checks if ingestion should be blocked for the given policy.
// It returns true if ingestion should be blocked, along with the block until time and status code.
func (v *Validator) ShouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) {
// No policy provided, don't block
if policy == "" {
return false, time.Time{}, 0
}

// Check if this policy is blocked in tenant configs
blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy)
if blockUntil.IsZero() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Even though now.Before(blockUntil) would be false when blockUntil.IsZero(), I like the explicit handling for the zero case. May we do the same for the ctx.blockIngestionUntil?

return false, time.Time{}, 0
}

return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
if now.Before(blockUntil) {
return true, blockUntil, ctx.blockIngestionStatusCode
}

return false, time.Time{}, 0
}

func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours, policy string) {
Expand Down
23 changes: 18 additions & 5 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,12 @@ type Limits struct {
OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until" category:"experimental" doc:"description=Block ingestion for policy until the configured date. The time should be in RFC3339 format. The policy is based on the policy_stream_mapping configuration."`
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until" category:"experimental"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Example:\n policy_enforced_labels: \n policy1: \n - label1 \n - label2 \n policy2: \n - label3 \n - label4"`
PolicyStreamMapping PolicyStreamMapping `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors with a priority. Experimental. Example:\n policy_stream_mapping: \n finance: \n - selector: '{namespace=\"prod\", container=\"billing\"}' \n priority: 2 \n ops: \n - selector: '{namespace=\"prod\", container=\"ops\"}' \n priority: 1 \n staging: \n - selector: '{namespace=\"staging\"}' \n priority: 1"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

Expand Down Expand Up @@ -1122,6 +1123,18 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

// BlockIngestionPolicyUntil returns the time until which ingestion is blocked
// for the given policy of the given tenant. It returns the zero time, meaning
// no blocking, when the tenant has no limits, no per-policy blocks, or no
// entry for the policy.
func (o *Overrides) BlockIngestionPolicyUntil(userID string, policy string) time.Time {
	tenantLimits := o.getOverridesForUser(userID)
	if tenantLimits == nil {
		return time.Time{}
	}

	// Indexing a nil map is safe and reports found == false, so an
	// unconfigured map needs no separate check.
	until, found := tenantLimits.BlockIngestionPolicyUntil[policy]
	if !found {
		return time.Time{}
	}
	return time.Time(until)
}

// EnforcedLabels returns the EnforcedLabels field of the limits resolved for
// the given tenant.
func (o *Overrides) EnforcedLabels(userID string) []string {
	tenantLimits := o.getOverridesForUser(userID)
	return tenantLimits.EnforcedLabels
}
Expand Down
Loading