diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index d602c48e058ec..dea698b032289 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -3941,6 +3941,25 @@ otlp_config:
 # CLI flag: -limits.block-ingestion-status-code
 [block_ingestion_status_code: | default = 260]
+# Block ingestion until the given time for the given policy. Pushes will be
+# assigned to a policy based on the stream matcher configuration. Experimental.
+[block_policy_ingestion_until: ]
+
+# HTTP status code to return when ingestion is blocked for the given policy.
+# Experimental.
+[block_policy_ingestion_status_code: ]
+
+# Map of policies to stream selectors. Push streams that match a policy
+# selector will be considered as belonging to that policy. If that policy is
+# blocked, the push will be rejected with the status code specified in
+# block_policy_ingestion_status_code. Experimental.
+[policy_stream_mapping: ]
+
+# Map of policies to enforced labels. Push streams that match a policy
+# selector will be considered as belonging to that policy and as such, the
+# labels related to the policy will be enforced. Experimental.
+[policy_enforced_labels: ]
+
 # List of labels that must be present in the stream. If any of the labels are
 # missing, the stream will be discarded. This flag configures it globally for
 # all tenants. Experimental.
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 6ede42aab1c20..cde4cf07cb750 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -453,8 +453,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 // We use the heuristic of 1 sample per TS to size the array.
 // We also work out the hash value at the same time.
streams := make([]KeyedStream, 0, len(req.Streams)) - validatedLineSize := 0 - validatedLineCount := 0 + validatedLineCountTotal := 0 + validatedLineSizeTotal := 0 + validatedLineSizePerRetention := make(map[string]int) // map of retention period to validated line size + validatedLineCountPerRetention := make(map[string]int) // map of retention period to validated line count var validationErrors util.GroupedErrors @@ -512,33 +514,35 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // Truncate first so subsequent steps have consistent line lengths d.truncateLines(validationContext, &stream) + tenantRetention := d.validator.Limits.RetentionPeriod(tenantID) + + initialEntriesSize := util.EntriesTotalSize(stream.Entries) + var lbs labels.Labels lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream) if err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) - validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries))) - discardedBytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes)) + validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetention.String()).Add(float64(len(stream.Entries))) + validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetention.String()).Add(float64(initialEntriesSize)) continue } - if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing { - err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID) + if errorLbValue, err := d.missingEnforcedLabels(lbs, tenantID); err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) - validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries))) - discardedBytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes)) + validation.DiscardedSamples.WithLabelValues(errorLbValue, tenantID, tenantRetention.String()).Add(float64(len(stream.Entries))) + validation.DiscardedBytes.WithLabelValues(errorLbValue, tenantID, tenantRetention.String()).Add(float64(initialEntriesSize)) continue } n := 0 pushSize := 0 prevTs := stream.Entries[0].Timestamp + streamRetention := d.retentionPeriodForStream(lbs, tenantID) for _, entry := range stream.Entries { - if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil { + if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, streamRetention); err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) continue @@ -593,16 +597,36 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } n++ - validatedLineSize += util.EntryTotalSize(&entry) - validatedLineCount++ - pushSize += len(entry.Line) + + entrySize := util.EntryTotalSize(&entry) + validatedLineSizePerRetention[streamRetention.String()] += entrySize + validatedLineCountPerRetention[streamRetention.String()]++ + validatedLineSizeTotal += entrySize + validatedLineCountTotal++ + pushSize += entrySize } + stream.Entries = stream.Entries[:n] if len(stream.Entries) == 0 { // Empty stream after validating all the entries continue } + if policy := d.policyForStream(lbs, tenantID); policy != "" { + 
streamSize := util.EntriesTotalSize(stream.Entries) + if yes, until, retStatusCode := d.validator.ShouldBlockIngestionForPolicy(validationContext, policy, now); yes { + d.trackDiscardedDataFromPolicy(ctx, policy, tenantID, validation.BlockedPolicyIngestion, streamRetention, streamSize, lbs) + + err := fmt.Errorf(validation.BlockedPolicyIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode, policy) + d.writeFailuresManager.Log(tenantID, err) + + validationErrors.Add(err) + validation.DiscardedSamples.WithLabelValues(validation.BlockedPolicyIngestion, tenantID, streamRetention.String()).Add(float64(len(stream.Entries))) + validation.DiscardedBytes.WithLabelValues(validation.BlockedPolicyIngestion, tenantID, streamRetention.String()).Add(float64(streamSize)) + continue + } + } + maybeShardStreams(stream, lbs, pushSize) } }() @@ -617,8 +641,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return &logproto.PushResponse{}, validationErr } - if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block { - d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion) + if block, until, retStatusCode := d.validator.ShouldBlockIngestionForTenant(validationContext, now); block { + d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCountPerRetention, validatedLineSizePerRetention, validation.BlockedIngestion) err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode) d.writeFailuresManager.Log(tenantID, err) @@ -632,10 +656,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error()) } - if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) { - d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited) + if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSizeTotal) { + d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCountPerRetention, validatedLineSizePerRetention, validation.RateLimited) - err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize) + err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCountTotal, validatedLineSizeTotal) d.writeFailuresManager.Log(tenantID, err) // Return a 429 to indicate to the client they are being rate limited return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error()) @@ -743,14 +767,39 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } } -// missingEnforcedLabels returns true if the stream is missing any of the required labels. +func (d *Distributor) retentionPeriodForStream(lbs labels.Labels, tenantID string) time.Duration { + retentions := d.validator.Limits.StreamRetention(tenantID) + + for _, retention := range retentions { + if retention.Matches(lbs) { + return time.Duration(retention.Period) + } + } + + // No retention for this specific stream, use the default retention period. 
+ return d.validator.Limits.RetentionPeriod(tenantID) +} + +func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (string, error) { + if err := d.missingTenantEnforcedLabels(lbs, tenantID); err != nil { + return validation.MissingEnforcedLabels, err + } + + if err := d.missingPolicyEnforcedLabels(lbs, tenantID); err != nil { + return validation.MissingPolicyEnforcedLabels, err + } + + return "", nil +} + +// missingTenantEnforcedLabels returns true if the stream is missing any of the required labels for the tenant. // // It also returns the first label that is missing if any (for the case of multiple labels missing). -func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (bool, []string) { +func (d *Distributor) missingTenantEnforcedLabels(lbs labels.Labels, tenantID string) error { requiredLbs := d.validator.Limits.EnforcedLabels(tenantID) if len(requiredLbs) == 0 { // no enforced labels configured. - return false, []string{} + return nil } missingLbs := []string{} @@ -761,7 +810,70 @@ func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) } } - return len(missingLbs) > 0, missingLbs + if len(missingLbs) > 0 { + return fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(missingLbs, ","), tenantID) + } + + return nil +} + +func (d *Distributor) policyForStream(lbs labels.Labels, tenantID string) string { + policyStreamMapping := d.validator.Limits.PolicyStreamMapping(tenantID) + if len(policyStreamMapping) == 0 { + // not configured. + return "" + } + + for policy, streamSelector := range policyStreamMapping { + matchers, err := syntax.ParseMatchers(streamSelector, true) + if err != nil { + level.Error(d.logger).Log("msg", "failed to parse stream selector", "error", err, "stream_selector", streamSelector, "tenant_id", tenantID) + continue + } + + if validation.LabelMatchesMatchers(lbs, matchers) { + return policy + } + } + + return "" +} + +func (d *Distributor) missingPolicyEnforcedLabels(lbs labels.Labels, tenantID string) error { + policyEnforcedLabels := d.validator.Limits.PolicyEnforcedLabels(tenantID) + if len(policyEnforcedLabels) == 0 { + // not configured. + return nil + } + + policy := d.policyForStream(lbs, tenantID) + if policy == "" { + // no policy found for stream. 
+ return nil + } + + missingLbs := []string{} + + for _, lb := range policyEnforcedLabels[policy] { + if !lbs.Has(lb) { + missingLbs = append(missingLbs, lb) + } + } + + if len(missingLbs) > 0 { + return fmt.Errorf(validation.MissingPolicyEnforcedLabelsErrorMsg, strings.Join(missingLbs, ","), policy, tenantID) + } + + return nil +} + +func (d *Distributor) trackDiscardedDataFromPolicy(ctx context.Context, policy string, tenantID string, reason string, retention time.Duration, discardedStreamBytes int, lbs labels.Labels) { + validation.DiscardedSamplesByPolicy.WithLabelValues(reason, policy, tenantID, retention.String()).Add(float64(1)) + validation.DiscardedBytesByPolicy.WithLabelValues(reason, policy, tenantID, retention.String()).Add(float64(discardedStreamBytes)) + + if d.usageTracker != nil { + d.usageTracker.DiscardedBytesAddByPolicy(ctx, tenantID, policy, reason, retention, lbs, float64(discardedStreamBytes)) + } } func (d *Distributor) trackDiscardedData( @@ -769,12 +881,14 @@ func (d *Distributor) trackDiscardedData( req *logproto.PushRequest, validationContext validationContext, tenantID string, - validatedLineCount int, - validatedLineSize int, + validatedLineCount map[string]int, + validatedLineSize map[string]int, reason string, ) { - validation.DiscardedSamples.WithLabelValues(reason, tenantID).Add(float64(validatedLineCount)) - validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(validatedLineSize)) + for retention, count := range validatedLineCount { + validation.DiscardedSamples.WithLabelValues(reason, tenantID, retention).Add(float64(count)) + validation.DiscardedBytes.WithLabelValues(reason, tenantID, retention).Add(float64(validatedLineSize[retention])) + } if d.usageTracker != nil { for _, stream := range req.Streams { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 91d3fcdf1367b..361497efd3d69 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -17,8 +17,6 @@ import ( otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" - "github.com/grafana/loki/pkg/push" - "github.com/c2h5oh/datasize" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" @@ -38,6 +36,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -436,21 +435,21 @@ func Test_MissingEnforcedLabels(t *testing.T) { // request with all required labels. lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod"}) - missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test") - assert.False(t, missing) - assert.Empty(t, missingLabels) + errLabel, err := distributors[0].missingEnforcedLabels(lbs, "test") + assert.NoError(t, err) + assert.Empty(t, errLabel) // request missing the `app` label. lbs = labels.FromMap(map[string]string{"env": "prod"}) - missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test") - assert.True(t, missing) - assert.EqualValues(t, []string{"app"}, missingLabels) + errLabel, err = distributors[0].missingEnforcedLabels(lbs, "test") + assert.Error(t, err) + assert.Equal(t, errLabel, validation.MissingEnforcedLabels) // request missing all required labels. 
lbs = labels.FromMap(map[string]string{"pod": "distributor-abc"}) - missing, missingLabels = distributors[0].missingEnforcedLabels(lbs, "test") - assert.True(t, missing) - assert.EqualValues(t, []string{"app", "env"}, missingLabels) + errLabel, err = distributors[0].missingEnforcedLabels(lbs, "test") + assert.Error(t, err) + assert.Equal(t, errLabel, validation.MissingEnforcedLabels) } func Test_PushWithEnforcedLabels(t *testing.T) { @@ -1623,6 +1622,106 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { } } +func TestDistributor_PushIngestionBlockedPolicy(t *testing.T) { + oneHourAgo := time.Now().Add(-1 * time.Hour) + afterOneHour := time.Now().Add(1 * time.Hour) + + for _, tc := range []struct { + name string + blockUntilPolicy map[string]flagext.Time + blockStatusCodeScope map[string]int + policyStreamMapping map[string]string + expectError bool + expectedValidationError error + expectedStatusCode int + requestLbs []string + }{ + { + name: "not configured", + expectedStatusCode: http.StatusOK, + requestLbs: []string{`{team="squad-1", env="dev"}`, `{team="squad-1", env="prod"}`, `{team="squad-2", env="dev"}`, `{team="squad-2", env="prod"}`}, + }, + { + name: "not blocked", + blockUntilPolicy: map[string]flagext.Time{ + "policy1": flagext.Time(oneHourAgo), + "policy2": flagext.Time(oneHourAgo), + }, + blockStatusCodeScope: map[string]int{ + "policy1": http.StatusOK, + "policy2": http.StatusOK, + }, + policyStreamMapping: map[string]string{ + "policy1": `{team="squad-1", env="prod"}`, + "policy2": `{team="squad-1", env="dev"}`, + "policy3": `{team="squad-2", env="prod"}`, + }, + expectError: false, + expectedStatusCode: http.StatusOK, + requestLbs: []string{`{team="squad-1", env="prod"}`, `{team="squad-1", env="dev"}`, `{team="squad-2", env="prod"}`, `{team="squad-2", env="dev"}`}, + }, + { + name: "blocked", + blockUntilPolicy: map[string]flagext.Time{ + "policy1": flagext.Time(afterOneHour), + "policy2": flagext.Time(oneHourAgo), + }, + blockStatusCodeScope: map[string]int{ + "policy1": 456, + "policy2": 456, + }, + policyStreamMapping: map[string]string{ + "policy1": `{team="squad-1", env="prod"}`, + "policy2": `{team="squad-1", env="dev"}`, + "policy3": `{team="squad-2", env="prod"}`, + }, + expectError: true, + expectedValidationError: httpgrpc.Errorf(400, validation.BlockedPolicyIngestionErrorMsg, "test", afterOneHour.Format(time.RFC3339), 456, "policy1"), + expectedStatusCode: 456, + requestLbs: []string{`{team="squad-1", env="prod"}`, `{team="squad-1", env="dev"}`, `{team="squad-2", env="prod"}`, `{team="squad-2", env="dev"}`}, + }, + { + name: "blocked with status code 200", + blockUntilPolicy: map[string]flagext.Time{ + "scope1": flagext.Time(afterOneHour), + "scope2": flagext.Time(oneHourAgo), + }, + blockStatusCodeScope: map[string]int{ + "scope1": http.StatusOK, + "scope2": http.StatusOK, + }, + policyStreamMapping: map[string]string{ + "policy1": `{team="squad-1", env="prod"}`, + "policy2": `{team="squad-1", env="dev"}`, + "policy3": `{team="squad-2", env="prod"}`, + }, + expectError: false, + expectedStatusCode: http.StatusOK, + requestLbs: []string{`{team="squad-1", env="prod"}`, `{team="squad-1", env="dev"}`, `{team="squad-2", env="prod"}`, `{team="squad-2", env="dev"}`}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.BlockPolicyIngestionUntil = tc.blockUntilPolicy + limits.BlockPolicyIngestionStatusCode = tc.blockStatusCodeScope + limits.PolicyStreamMapping = tc.policyStreamMapping 
+ + distributors, _ := prepare(t, 1, 3, limits, nil) + request := makeWriteRequestWithLabels(10, 10, tc.requestLbs, false, false, false) + response, err := distributors[0].Push(ctx, request) + + if tc.expectError { + require.Error(t, err) + require.EqualError(t, err, tc.expectedValidationError.Error()) + } else { + require.NoError(t, err) + require.Equal(t, success, response) + } + }) + } +} + func TestDistributor_PushIngestionBlocked(t *testing.T) { for _, tc := range []struct { name string diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 62098dac6d96f..931c028e4becc 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -3,6 +3,8 @@ package distributor import ( "time" + "github.com/grafana/dskit/flagext" + "github.com/grafana/loki/v3/pkg/compactor/retention" "github.com/grafana/loki/v3/pkg/distributor/shardstreams" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -38,6 +40,10 @@ type Limits interface { BlockIngestionUntil(userID string) time.Time BlockIngestionStatusCode(userID string) int + BlockPolicyIngestionUntil(userID string) map[string]flagext.Time + BlockPolicyIngestionStatusCode(userID string) map[string]int + PolicyStreamMapping(userID string) map[string]string // map of policy name to stream matching regex. + PolicyEnforcedLabels(userID string) map[string][]string // map of policy name to enforced labels. EnforcedLabels(userID string) []string IngestionPartitionsTenantShardSize(userID string) int diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 5aea652225a56..042a85ac509c5 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/grafana/dskit/flagext" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -53,40 +54,47 @@ type validationContext struct { maxStructuredMetadataSize int maxStructuredMetadataCount int - blockIngestionUntil time.Time - blockIngestionStatusCode int - enforcedLabels []string + blockIngestionUntil time.Time + blockIngestionStatusCode int + blockPolicyIngestionUntil map[string]flagext.Time + blockPolicyIngestionStatusCode map[string]int + policyStreamMapping map[string]string + policyEnforcedLabels map[string][]string + enforcedLabels []string userID string } func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext { return validationContext{ - userID: userID, - rejectOldSample: v.RejectOldSamples(userID), - rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(), - creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(), - maxLineSize: v.MaxLineSize(userID), - maxLineSizeTruncate: v.MaxLineSizeTruncate(userID), - maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID), - maxLabelNameLength: v.MaxLabelNameLength(userID), - maxLabelValueLength: v.MaxLabelValueLength(userID), - incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID), - discoverServiceName: v.DiscoverServiceName(userID), - discoverLogLevels: v.DiscoverLogLevels(userID), - logLevelFields: v.LogLevelFields(userID), - discoverGenericFields: v.DiscoverGenericFields(userID), - allowStructuredMetadata: v.AllowStructuredMetadata(userID), - maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), - maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), - blockIngestionUntil: v.BlockIngestionUntil(userID), - blockIngestionStatusCode: v.BlockIngestionStatusCode(userID), - enforcedLabels: 
v.EnforcedLabels(userID), + userID: userID, + rejectOldSample: v.RejectOldSamples(userID), + rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(), + creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(), + maxLineSize: v.MaxLineSize(userID), + maxLineSizeTruncate: v.MaxLineSizeTruncate(userID), + maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID), + maxLabelNameLength: v.MaxLabelNameLength(userID), + maxLabelValueLength: v.MaxLabelValueLength(userID), + incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID), + discoverServiceName: v.DiscoverServiceName(userID), + discoverLogLevels: v.DiscoverLogLevels(userID), + logLevelFields: v.LogLevelFields(userID), + allowStructuredMetadata: v.AllowStructuredMetadata(userID), + maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), + maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), + blockIngestionUntil: v.BlockIngestionUntil(userID), + blockIngestionStatusCode: v.BlockIngestionStatusCode(userID), + blockPolicyIngestionUntil: v.BlockPolicyIngestionUntil(userID), + blockPolicyIngestionStatusCode: v.BlockPolicyIngestionStatusCode(userID), + policyStreamMapping: v.PolicyStreamMapping(userID), + policyEnforcedLabels: v.PolicyEnforcedLabels(userID), + enforcedLabels: v.EnforcedLabels(userID), } } // ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly. -func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error { +func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry, retention time.Duration) error { ts := entry.Timestamp.UnixNano() validation.LineLengthHist.Observe(float64(len(entry.Line))) structuredMetadataCount := len(entry.StructuredMetadata) @@ -97,8 +105,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // Makes time string on the error message formatted consistently. 
formatedEntryTime := entry.Timestamp.Format(timeFormat) formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize) } @@ -107,8 +115,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la if ts > vCtx.creationGracePeriod { formatedEntryTime := entry.Timestamp.Format(timeFormat) - validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize) } @@ -120,8 +128,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la // an orthogonal concept (we need not use ValidateLabels in this context) // but the upstream cortex_validation pkg uses it, so we keep this // for parity. - validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize) } @@ -130,8 +138,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la if structuredMetadataCount > 0 { if !vCtx.allowStructuredMetadata { - validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize) } @@ -139,8 +147,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(entrySize) + 
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize) } @@ -148,8 +156,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la } if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { - validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Inc() - validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(entrySize) + validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retention.String()).Inc() + validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retention.String()).Add(entrySize) if v.usageTracker != nil { v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize) } @@ -160,10 +168,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la return nil } +func (v Validator) RetentionForStream(userID string, ls labels.Labels) time.Duration { + retentions := v.Limits.StreamRetention(userID) + for _, retention := range retentions { + if retention.Matches(ls) { + return time.Duration(retention.Period) + } + } + + return v.Limits.RetentionPeriod(userID) +} + // Validate labels returns an error if the labels are invalid func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error { + streamRetention := v.RetentionForStream(ctx.userID, ls) + if len(ls) == 0 { - validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc() + validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, streamRetention.String()).Inc() return fmt.Errorf(validation.MissingLabelsErrorMsg) } @@ -180,20 +201,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea } if numLabelNames > ctx.maxLabelNamesPerSeries { - updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream) + updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream, streamRetention) return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) } lastLabelName := "" for _, l := range ls { if len(l.Name) > ctx.maxLabelNameLength { - updateMetrics(validation.LabelNameTooLong, ctx.userID, stream) + updateMetrics(validation.LabelNameTooLong, ctx.userID, stream, streamRetention) return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name) } else if len(l.Value) > ctx.maxLabelValueLength { - updateMetrics(validation.LabelValueTooLong, ctx.userID, stream) + updateMetrics(validation.LabelValueTooLong, ctx.userID, stream, streamRetention) return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value) } else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 { - updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream) + updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream, streamRetention) return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name) } lastLabelName = l.Name @@ -201,8 +222,7 @@ func (v Validator) ValidateLabels(ctx validationContext, 
ls labels.Labels, strea return nil } -// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. -func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) { +func (v Validator) ShouldBlockIngestionForTenant(ctx validationContext, now time.Time) (bool, time.Time, int) { if ctx.blockIngestionUntil.IsZero() { return false, time.Time{}, 0 } @@ -210,8 +230,39 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (b return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode } -func updateMetrics(reason, userID string, stream logproto.Stream) { - validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries))) +func (v Validator) ShouldBlockIngestionForPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { + if policy == "" { + return false, time.Time{}, 0 + } + + if ctx.blockPolicyIngestionUntil == nil { + return false, time.Time{}, 0 + } + + until, ok := ctx.blockPolicyIngestionUntil[policy] + if !ok { + return false, time.Time{}, 0 + } + parsedUntil := time.Time(until) + + if parsedUntil.IsZero() { + return false, time.Time{}, 0 + } + + return now.Before(parsedUntil), parsedUntil, ctx.blockPolicyIngestionStatusCode[policy] +} + +func (v Validator) ShouldBlockPolicyIngestion(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { + ts := time.Time(ctx.blockPolicyIngestionUntil[policy]) + if ts.IsZero() { + return false, time.Time{}, 0 + } + + return now.Before(ts), ts, ctx.blockPolicyIngestionStatusCode[policy] +} + +func updateMetrics(reason, userID string, stream logproto.Stream, retention time.Duration) { + validation.DiscardedSamples.WithLabelValues(reason, userID, retention.String()).Add(float64(len(stream.Entries))) bytes := util.EntriesTotalSize(stream.Entries) - validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes)) + validation.DiscardedBytes.WithLabelValues(reason, userID, retention.String()).Add(float64(bytes)) } diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 9e51099dfad38..0961ae7391e5f 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -130,8 +130,9 @@ func TestValidator_ValidateEntry(t *testing.T) { assert.NoError(t, err) v, err := NewValidator(o, nil) assert.NoError(t, err) + retention := o.RetentionPeriod(tt.userID) - err = v.ValidateEntry(ctx, v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry) + err = v.ValidateEntry(ctx, v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry, retention) assert.Equal(t, tt.expected, err) }) } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index c6afcacfbdfde..0cc431bc9b7a5 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -307,7 +307,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre return nil, fmt.Errorf("failed to create stream: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs, i.evaluateRetention(labels)) // 
record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). if record != nil { @@ -341,15 +341,27 @@ func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logp ) } - validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) + retention := i.evaluateRetention(labels) + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID, retention.String()).Add(float64(len(pushReqStream.Entries))) bytes := util.EntriesTotalSize(pushReqStream.Entries) - validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) + validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID, retention.String()).Add(float64(bytes)) if i.customStreamsTracker != nil { i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) } return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID) } +func (i *instance) evaluateRetention(labels labels.Labels) time.Duration { + streamRetention := i.limiter.limits.StreamRetention(i.instanceID) + for _, retention := range streamRetention { + if retention.Matches(labels) { + return time.Duration(retention.Period) + } + } + // no matches, return default retention + return i.limiter.limits.RetentionPeriod(i.instanceID) +} + func (i *instance) onStreamCreated(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Inc() memoryStreamsLabelsBytes.Add(float64(len(s.labels.String()))) @@ -375,7 +387,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err) } - s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs) + s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs, i.evaluateRetention(ls)) i.onStreamCreated(s) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 369d0ab2d7469..43e935d70e925 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -310,12 +310,15 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) { {Labels: "{app=\"test\",job=\"varlogs2\"}", Entries: entries(5, currentTime.Add(12*time.Nanosecond))}, } + retention, err := time.ParseDuration("720h") + require.NoError(t, err) + for _, testStream := range testStreams { stream, err := instance.getOrCreateStream(context.Background(), testStream, recordPool.GetRecord()) require.NoError(t, err) chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream)) require.NoError(t, err) - chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk() + chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retention).NewChunk() for _, entry := range testStream.Entries { dup, err := chunk.Append(&entry) require.False(t, dup) @@ -573,9 +576,12 @@ func Benchmark_instance_addNewTailer(b *testing.B) { chunkfmt, headfmt, err := inst.chunkFormatAt(model.Now()) require.NoError(b, err) + retention, err 
:= time.ParseDuration("720h") + require.NoError(b, err) + b.Run("addTailersToNewStream", func(b *testing.B) { for n := 0; n < b.N; n++ { - inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil)) + inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retention)) } }) } @@ -1564,3 +1570,6 @@ func (m *mockUsageTracker) DiscardedBytesAdd(_ context.Context, _ string, _ stri // ReceivedBytesAdd implements push.UsageTracker. func (*mockUsageTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, _ labels.Labels, _ float64) { } + +func (m *mockUsageTracker) DiscardedBytesAddByPolicy(_ context.Context, _ string, _ string, _ string, _ time.Duration, _ labels.Labels, _ float64) { +} diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index c4e64149f1658..fab09ece063e8 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -31,6 +31,8 @@ type Limits interface { MaxLocalStreamsPerUser(userID string) int MaxGlobalStreamsPerUser(userID string) int PerStreamRateLimit(userID string) validation.RateLimit + StreamRetention(userID string) []validation.StreamRetention + RetentionPeriod(userID string) time.Duration ShardStreams(userID string) shardstreams.Config IngestionPartitionsTenantShardSize(userID string) int } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index b36cbb290db7e..33f4b7f584799 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -82,6 +82,8 @@ type stream struct { chunkHeadBlockFormat chunkenc.HeadBlockFmt configs *runtime.TenantConfigs + + retention time.Duration } type chunkDesc struct { @@ -112,6 +114,7 @@ func newStream( metrics *ingesterMetrics, writeFailures *writefailures.Manager, configs *runtime.TenantConfigs, + retention time.Duration, ) *stream { hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName) return &stream{ @@ -132,7 +135,8 @@ func newStream( chunkFormat: chunkFormat, chunkHeadBlockFormat: headBlockFmt, - configs: configs, + configs: configs, + retention: retention, } } @@ -477,15 +481,15 @@ func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrde if s.unorderedWrites { name = validation.TooFarBehind } - validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples)) - validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes)) + validation.DiscardedSamples.WithLabelValues(name, s.tenant, s.retention.String()).Add(float64(outOfOrderSamples)) + validation.DiscardedBytes.WithLabelValues(name, s.tenant, s.retention.String()).Add(float64(outOfOrderBytes)) if usageTracker != nil { usageTracker.DiscardedBytesAdd(ctx, s.tenant, name, s.labels, float64(outOfOrderBytes)) } } if rateLimitedSamples > 0 { - validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples)) - validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes)) + validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant, s.retention.String()).Add(float64(rateLimitedSamples)) + validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant, s.retention.String()).Add(float64(rateLimitedBytes)) if usageTracker != nil { usageTracker.DiscardedBytesAdd(ctx, s.tenant, validation.StreamRateLimit, 
s.labels, float64(rateLimitedBytes)) } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 8bf7bfaf4ce98..705f95b67ecf3 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -57,6 +57,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retention := limiter.limits.RetentionPeriod("fake") for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { @@ -79,6 +80,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { NilMetrics, nil, nil, + retention, ) _, err := s.Push(context.Background(), []logproto.Entry{ @@ -115,6 +117,7 @@ func TestPushDeduplication(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retention := limiter.limits.RetentionPeriod("fake") chunkfmt, headfmt := defaultChunkFormat(t) @@ -133,6 +136,7 @@ func TestPushDeduplication(t *testing.T) { NilMetrics, nil, nil, + retention, ) written, err := s.Push(context.Background(), []logproto.Entry{ @@ -178,6 +182,8 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { require.NoError(t, err) metrics := newIngesterMetrics(registry, "loki") + retention := limiter.limits.RetentionPeriod("fake") + s := newStream( chunkfmt, headfmt, @@ -193,6 +199,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) { metrics, manager, runtimeCfg, + retention, ) _, err = s.Push(context.Background(), []logproto.Entry{ @@ -224,6 +231,8 @@ func TestPushRejectOldCounter(t *testing.T) { chunkfmt, headfmt := defaultChunkFormat(t) + retention := limiter.limits.RetentionPeriod("fake") + s := newStream( chunkfmt, headfmt, @@ -239,6 +248,7 @@ func TestPushRejectOldCounter(t *testing.T) { NilMetrics, nil, nil, + retention, ) // counter should be 2 now since the first line will be deduped @@ -331,7 +341,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) chunkfmt, headfmt := defaultChunkFormat(t) - + retention := limiter.limits.RetentionPeriod("fake") s := newStream( chunkfmt, headfmt, @@ -347,6 +357,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) { NilMetrics, nil, nil, + retention, ) s.highestTs = time.Now() @@ -370,7 +381,7 @@ func TestUnorderedPush(t *testing.T) { limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) chunkfmt, headfmt := defaultChunkFormat(t) - + retention := limiter.limits.RetentionPeriod("fake") s := newStream( chunkfmt, headfmt, @@ -386,6 +397,7 @@ func TestUnorderedPush(t *testing.T) { NilMetrics, nil, nil, + retention, ) for _, x := range []struct { @@ -473,7 +485,7 @@ func TestPushRateLimit(t *testing.T) { limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) chunkfmt, headfmt := defaultChunkFormat(t) - + retention := limiter.limits.RetentionPeriod("fake") s := newStream( chunkfmt, headfmt, @@ -489,6 +501,7 @@ func TestPushRateLimit(t *testing.T) { NilMetrics, nil, nil, + retention, ) entries := []logproto.Entry{ @@ -511,6 
+524,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { limits, err := validation.NewOverrides(l, nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) + retention := limiter.limits.RetentionPeriod("fake") cfg := defaultConfig() chunkfmt, headfmt := defaultChunkFormat(t) @@ -530,6 +544,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) { NilMetrics, nil, nil, + retention, ) entries := []logproto.Entry{ @@ -550,7 +565,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) - + retention := limiter.limits.RetentionPeriod("fake") cfg := defaultConfig() cfg.MaxChunkAge = time.Minute chunkfmt, headfmt := defaultChunkFormat(t) @@ -570,6 +585,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) { NilMetrics, nil, nil, + retention, ) base := time.Now() @@ -619,8 +635,8 @@ func Benchmark_PushStream(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) chunkfmt, headfmt := defaultChunkFormat(b) - - s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter.rateLimitStrategy, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil) + retention := limiter.limits.RetentionPeriod("fake") + s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter.rateLimitStrategy, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil, retention) expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true) require.NoError(b, err) t, err := newTailer("foo", expr, &fakeTailServer{}, 10) diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go index 273c489d34d4a..827be11eb0a0e 100644 --- a/pkg/ingester/streams_map_test.go +++ b/pkg/ingester/streams_map_test.go @@ -15,7 +15,7 @@ func TestStreamsMap(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits}) chunkfmt, headfmt := defaultChunkFormat(t) - + retention := limiter.limits.RetentionPeriod("fake") ss := []*stream{ newStream( chunkfmt, @@ -32,6 +32,7 @@ func TestStreamsMap(t *testing.T) { NilMetrics, nil, nil, + retention, ), newStream( chunkfmt, @@ -48,6 +49,7 @@ func TestStreamsMap(t *testing.T) { NilMetrics, nil, nil, + retention, ), } var s *stream diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 54618eb3480cc..3f13079219a0f 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -492,3 +492,7 @@ func (t *MockCustomTracker) DiscardedBytesAdd(_ context.Context, _, _ string, la func (t *MockCustomTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, labels labels.Labels, value float64) { t.receivedBytes[labels.String()] += value } + +// DiscardedBytesAddByPolicy implements CustomTracker. 
+func (t *MockCustomTracker) DiscardedBytesAddByPolicy(_ context.Context, _ string, _ string, _ string, _ time.Duration, _ labels.Labels, _ float64) {
+}
diff --git a/pkg/loghttp/push/usage_tracker.go b/pkg/loghttp/push/usage_tracker.go
index 8a75a1c7a563f..455cbd0751aec 100644
--- a/pkg/loghttp/push/usage_tracker.go
+++ b/pkg/loghttp/push/usage_tracker.go
@@ -14,4 +14,7 @@ type UsageTracker interface {
 // DiscardedBytesAdd records discarded bytes by tenant and labels.
 DiscardedBytesAdd(ctx context.Context, tenant, reason string, labels labels.Labels, value float64)
+
+ // DiscardedBytesAddByPolicy records discarded bytes by tenant, policy, retention period and labels.
+ DiscardedBytesAddByPolicy(ctx context.Context, tenant, policy, reason string, retentionPeriod time.Duration, labels labels.Labels, value float64)
 }
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 4ecc566e36389..cffb74126a3da 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -228,9 +228,13 @@ type Limits struct {
 OTLPConfig push.OTLPConfig `yaml:"otlp_config" json:"otlp_config" doc:"description=OTLP log ingestion configurations"`
 GlobalOTLPConfig push.GlobalOTLPConfig `yaml:"-" json:"-"`
- BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
- BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
- EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
+ BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
+ BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
+ BlockPolicyIngestionUntil map[string]dskit_flagext.Time `yaml:"block_policy_ingestion_until" json:"block_policy_ingestion_until" category:"experimental" doc:"description=Block ingestion until the given time for the given policy. Pushes will be assigned to a policy based on the stream matcher configuration. Experimental."`
+ BlockPolicyIngestionStatusCode map[string]int `yaml:"block_policy_ingestion_status_code" json:"block_policy_ingestion_status_code" category:"experimental" doc:"description=HTTP status code to return when ingestion is blocked for the given policy. Experimental."`
+ PolicyStreamMapping map[string]string `yaml:"policy_stream_mapping" json:"policy_stream_mapping" category:"experimental" doc:"description=Map of policies to stream selectors. Push streams that match a policy selector will be considered as belonging to that policy. If that policy is blocked, the push will be rejected with the status code specified in block_policy_ingestion_status_code. Experimental."`
+ PolicyEnforcedLabels map[string][]string `yaml:"policy_enforced_labels" json:"policy_enforced_labels" category:"experimental" doc:"description=Map of policies to enforced labels. Push streams that match a policy selector will be considered as belonging to that policy and as such, the labels related to the policy will be enforced. Experimental."`
+ EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`
 IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
@@ -259,6 +263,19 @@ type StreamRetention struct {
 Matchers []*labels.Matcher `yaml:"-" json:"-"` // populated during validation.
} +func (r *StreamRetention) Matches(lbs labels.Labels) bool { + return LabelMatchesMatchers(lbs, r.Matchers) +} + +func LabelMatchesMatchers(lbs labels.Labels, matchers []*labels.Matcher) bool { + for _, matcher := range matchers { + if !matcher.Matches(lbs.Get(matcher.Name)) { + return false + } + } + return true +} + // LimitError are errors that do not comply with the limits specified. type LimitError string @@ -447,7 +464,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.") f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.") f.Var((*dskit_flagext.StringSlice)(&l.EnforcedLabels), "validation.enforced-labels", "List of labels that must be present in the stream. If any of the labels are missing, the stream will be discarded. This flag configures it globally for all tenants. Experimental.") - f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.") _ = l.PatternIngesterTokenizableJSONFieldsDefault.Set("log,message,msg,msg_,_msg,content") @@ -611,6 +627,26 @@ func (o *Overrides) IngestionBurstSizeBytes(userID string) int { return int(o.getOverridesForUser(userID).IngestionBurstSizeMB * bytesInMB) } +func (o *Overrides) BlockPolicyIngestionUntil(userID string) map[string]dskit_flagext.Time { + return o.getOverridesForUser(userID).BlockPolicyIngestionUntil +} + +func (o *Overrides) BlockPolicyIngestionStatusCode(userID string) map[string]int { + return o.getOverridesForUser(userID).BlockPolicyIngestionStatusCode +} + +func (o *Overrides) PolicyStreamMapping(userID string) map[string]string { + return o.getOverridesForUser(userID).PolicyStreamMapping +} + +func (o *Overrides) EnforcedLabels(userID string) []string { + return o.getOverridesForUser(userID).EnforcedLabels +} + +func (o *Overrides) PolicyEnforcedLabels(userID string) map[string][]string { + return o.getOverridesForUser(userID).PolicyEnforcedLabels +} + // MaxLabelNameLength returns maximum length a label name can be. 
func (o *Overrides) MaxLabelNameLength(userID string) int { return o.getOverridesForUser(userID).MaxLabelNameLength @@ -1113,10 +1149,6 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } -func (o *Overrides) EnforcedLabels(userID string) []string { - return o.getOverridesForUser(userID).EnforcedLabels -} - func (o *Overrides) ShardAggregations(userID string) []string { return o.getOverridesForUser(userID).ShardAggregations } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 845ec805c5b37..864f5014c7c8e 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/grafana/dskit/flagext" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -71,6 +72,15 @@ ruler_max_rules_per_rule_group: 210 ruler_max_rule_groups_per_tenant: 220 ruler_remote_write_sigv4_config: region: us-east-1 +policy_stream_mapping: + policy1: "{team=\"squad-1\", env=\"prod\"}" + policy2: "{team=\"squad-2\", env=\"dev\"}" + policy3: "{team=\"squad-1\", env=\"prod\"}" +enforced_labels: + - team +block_policy_ingestion_status_code: + policy1: 400 + policy2: 400 per_tenant_override_config: "" per_tenant_override_period: 230s query_timeout: 5m @@ -117,6 +127,16 @@ volume_max_series: 10001 "ruler_remote_write_sigv4_config": { "region": "us-east-1" }, + "policy_stream_mapping": { + "policy1": "{team=\"squad-1\", env=\"prod\"}", + "policy2": "{team=\"squad-2\", env=\"dev\"}", + "policy3": "{team=\"squad-1\", env=\"prod\"}" + }, + "enforced_labels": ["team"], + "block_policy_ingestion_status_code": { + "policy1": 400, + "policy2": 400 + }, "per_tenant_override_config": "", "per_tenant_override_period": "230s", "query_timeout": "5m", @@ -226,8 +246,12 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, + OTLPConfig: defaultOTLPConfig, + BlockPolicyIngestionUntil: map[string]flagext.Time{}, + BlockPolicyIngestionStatusCode: map[string]int{}, + PolicyStreamMapping: map[string]string{}, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, }, }, { @@ -246,8 +270,12 @@ ruler_remote_write_headers: Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, + OTLPConfig: defaultOTLPConfig, + BlockPolicyIngestionUntil: map[string]flagext.Time{}, + BlockPolicyIngestionStatusCode: map[string]int{}, + PolicyStreamMapping: map[string]string{}, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, }, }, { @@ -269,9 +297,13 @@ retention_stream: }, // Rest from new defaults - RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, + RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, + OTLPConfig: defaultOTLPConfig, + BlockPolicyIngestionUntil: map[string]flagext.Time{}, + BlockPolicyIngestionStatusCode: map[string]int{}, + PolicyStreamMapping: map[string]string{}, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, }, }, { @@ -293,8 +325,12 @@ reject_old_samples: true Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, + OTLPConfig: defaultOTLPConfig, + BlockPolicyIngestionUntil: map[string]flagext.Time{}, + BlockPolicyIngestionStatusCode: map[string]int{}, + 
PolicyStreamMapping: map[string]string{}, + EnforcedLabels: []string{}, + PolicyEnforcedLabels: map[string][]string{}, }, }, { @@ -317,8 +353,12 @@ query_timeout: 5m Selector: `{a="b"}`, }, }, - OTLPConfig: defaultOTLPConfig, - EnforcedLabels: []string{}, + OTLPConfig: defaultOTLPConfig, + BlockPolicyIngestionUntil: map[string]flagext.Time{}, + BlockPolicyIngestionStatusCode: map[string]int{}, + PolicyStreamMapping: map[string]string{}, + PolicyEnforcedLabels: map[string][]string{}, + EnforcedLabels: []string{}, }, }, } { diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index ff681ac8d0936..72ab8a0b47c15 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -71,8 +71,12 @@ const ( StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it." BlockedIngestion = "blocked_ingestion" BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'" + BlockedPolicyIngestion = "blocked_policy_ingestion" + BlockedPolicyIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d' for policy '%s'" MissingEnforcedLabels = "missing_enforced_labels" MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s" + MissingPolicyEnforcedLabels = "missing_policy_enforced_labels" + MissingPolicyEnforcedLabelsErrorMsg = "missing required labels %s for policy %s for user %s" ) type ErrStreamRateLimit struct { @@ -115,7 +119,16 @@ var DiscardedBytes = promauto.NewCounterVec( Name: "discarded_bytes_total", Help: "The total number of bytes that were discarded.", }, - []string{ReasonLabel, "tenant"}, + []string{ReasonLabel, "tenant", "retention_hours"}, +) + +var DiscardedBytesByPolicy = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "discarded_bytes_by_policy_total", + Help: "The total number of bytes that were discarded by policy.", + }, + []string{ReasonLabel, "policy", "tenant", "retention_hours"}, ) // DiscardedSamples is a metric of the number of discarded samples, by reason. @@ -125,7 +138,16 @@ var DiscardedSamples = promauto.NewCounterVec( Name: "discarded_samples_total", Help: "The total number of samples that were discarded.", }, - []string{ReasonLabel, "tenant"}, + []string{ReasonLabel, "tenant", "retention_hours"}, +) + +var DiscardedSamplesByPolicy = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: constants.Loki, + Name: "discarded_samples_by_policy_total", + Help: "The total number of samples that were discarded by policy.", + }, + []string{ReasonLabel, "policy", "tenant", "retention_hours"}, ) var LineLengthHist = promauto.NewHistogram(prometheus.HistogramOpts{
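
Taken together, the new limits let an operator carve a tenant's push traffic into named policies and control each one independently. The snippet below is illustrative only and is not part of this diff: it assumes the new fields sit alongside the existing per-tenant limits (for example in `limits_config` or a per-tenant runtime override), and the policy names, selectors, label names, timestamp, and status code are hypothetical.

```yaml
limits_config:
  # Assign incoming streams to policies via stream selectors (hypothetical policies).
  policy_stream_mapping:
    finance: '{team="squad-1", env="prod"}'
    dev-noise: '{team="squad-1", env="dev"}'
  # Streams matching the "finance" policy must also carry these labels.
  policy_enforced_labels:
    finance:
      - cost_center
  # Block streams matching "dev-noise" until the given RFC3339 time,
  # using the status code configured for that policy.
  block_policy_ingestion_until:
    dev-noise: "2025-02-01T00:00:00Z"
  block_policy_ingestion_status_code:
    dev-noise: 429
```

With such a configuration, a stream matching the `finance` selector would be discarded when `cost_center` is absent (reason `missing_policy_enforced_labels`), while streams matching `dev-noise` would be discarded until the configured time and counted under the new `loki_discarded_samples_by_policy_total` and `loki_discarded_bytes_by_policy_total` series.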