feat: Map streams to scopes and add per-scope ingestion block. #15561

Closed · wants to merge 37 commits

Commits (37, all by DylanGuedes)
3196552  Add per-scope ingestion block. (Dec 30, 2024)
4ee1420  Emit error message (Jan 3, 2025)
cda0b3a  Block at entry/stream iteration level. (Jan 5, 2025)
aa1ac74  fix lnit (Jan 5, 2025)
059df30  Update docs (Jan 5, 2025)
f5522f8  lint (Jan 5, 2025)
825d7af  Use empty map instead of nil (Jan 5, 2025)
0462fb0  lint import (Jan 5, 2025)
f5559b8  Add support for enforced labels. (Jan 6, 2025)
67ded00  Fix tests (Jan 6, 2025)
e174ed3  Merge branch 'main' of github.com:grafana/loki into add-per-scope-limits (Jan 14, 2025)
9b17ed7  Remove merge marks (Jan 14, 2025)
d03772d  Remove more conflict marks (Jan 14, 2025)
d080a8f  Reuse lbs. (Jan 15, 2025)
16f3937  Test multiple scopes (Jan 15, 2025)
37a1249  Improve flag docs (Jan 15, 2025)
4796661  Update docs (Jan 15, 2025)
65c6834  Rename `scope` to `policy`. (Jan 16, 2025)
885aa4f  Better docs (Jan 16, 2025)
503088a  Add new configs to limits test. (Jan 16, 2025)
eed8acd  fix lint (Jan 16, 2025)
9b26693  Update tests (Jan 16, 2025)
bd04998  Make per-retention period metric (Jan 19, 2025)
74ddaea  Fix compilation errors (Jan 19, 2025)
2a9f4dc  fix lint (Jan 19, 2025)
98bf0b7  Fix lint (Jan 20, 2025)
8e1922e  Use both variables (Jan 20, 2025)
d9e0007  Use new retention on those metrics invocation (Jan 20, 2025)
7060090  Emit metrics everywhere (Jan 20, 2025)
31e0439  Use retention when blocked policy (Jan 20, 2025)
b35d95e  Pass the retention limits to the ingester (Jan 20, 2025)
0cfb7cf  Use retention on ingester metrics (Jan 20, 2025)
f1ae37f  Fix test (Jan 20, 2025)
2a597c4  Fix test (Jan 20, 2025)
efd7207  last one? (Jan 20, 2025)
bf47258  Improve docs (Jan 20, 2025)
0b15b81  refactor to reuse code (Jan 20, 2025)
19 changes: 19 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3941,6 +3941,25 @@ otlp_config:
# CLI flag: -limits.block-ingestion-status-code
[block_ingestion_status_code: <int> | default = 260]

# Block ingestion until the given time for the given policy. Pushes will be
# assigned to a policy based on the stream matcher configuration. Experimental.
[block_policy_ingestion_until: <map of string to Time>]
[Comment] JStickler (Contributor), Jan 29, 2025:
Are there defaults for any of these settings, even if the default is that nothing is set? Also, these settings could definitely use an example in the description, since users might not know what these mappings should look like.

[Reply] DylanGuedes (Author):
Sorry, I should have closed this PR earlier. I'm dropping it to work on smaller PRs (I already merged one of them). Regarding your suggestion: absolutely, I'll tackle that in my next PRs.


# HTTP status code to return when ingestion is blocked for the given policy.
# Experimental.
[block_policy_ingestion_status_code: <map of string to int>]

# Map of policies to stream selectors. Push streams that match a policy
# selector will be considered as belonging to that policy. If that policy is
# blocked, the push will be rejected with the status code specified in
# block_policy_ingestion_status_code. Experimental.
[policy_stream_mapping: <map of string to string>]

# Map of policies to enforced labels. Push streams that match a policy
# selector will be considered as belonging to that policy and, as such, the
# labels related to the policy will be enforced. Experimental.
[policy_enforced_labels: <map of string to list of strings>]

# List of labels that must be present in the stream. If any of the labels are
# missing, the stream will be discarded. This flag configures it globally for
# all tenants. Experimental.
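
As JStickler notes in the thread above, these mappings are easier to grasp with an example. Here is a minimal sketch of how the four new settings might be combined, assuming they sit with the other per-tenant limits under `limits_config` and that the `Time` value accepts RFC3339; the policy name, selector, labels, timestamp, and status code are all hypothetical:

```yaml
limits_config:
  # Streams matching this selector are assigned to the "finance" policy.
  policy_stream_mapping:
    finance: '{namespace="finance"}'
  # Streams belonging to the "finance" policy must carry these labels,
  # or the push is rejected.
  policy_enforced_labels:
    finance:
      - cost_center
      - team
  # Reject pushes for the "finance" policy until the given time...
  block_policy_ingestion_until:
    finance: "2025-02-01T00:00:00Z"
  # ...returning this status code instead of the default.
  block_policy_ingestion_status_code:
    finance: 429
```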
168 changes: 141 additions & 27 deletions pkg/distributor/distributor.go
@@ -453,8 +453,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]KeyedStream, 0, len(req.Streams))
validatedLineSize := 0
validatedLineCount := 0
validatedLineCountTotal := 0
validatedLineSizeTotal := 0
validatedLineSizePerRetention := make(map[string]int) // map of retention period to validated line size
validatedLineCountPerRetention := make(map[string]int) // map of retention period to validated line count

var validationErrors util.GroupedErrors

@@ -512,33 +514,35 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)

tenantRetention := d.validator.Limits.RetentionPeriod(tenantID)

initialEntriesSize := util.EntriesTotalSize(stream.Entries)

var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes))
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetention.String()).Add(float64(len(stream.Entries)))
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetention.String()).Add(float64(initialEntriesSize))
continue
}

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
if errorLbValue, err := d.missingEnforcedLabels(lbs, tenantID); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes))
validation.DiscardedSamples.WithLabelValues(errorLbValue, tenantID, tenantRetention.String()).Add(float64(len(stream.Entries)))
validation.DiscardedBytes.WithLabelValues(errorLbValue, tenantID, tenantRetention.String()).Add(float64(initialEntriesSize))
continue
}

n := 0
pushSize := 0
prevTs := stream.Entries[0].Timestamp
streamRetention := d.retentionPeriodForStream(lbs, tenantID)

for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, streamRetention); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
@@ -593,16 +597,36 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

n++
validatedLineSize += util.EntryTotalSize(&entry)
validatedLineCount++
pushSize += len(entry.Line)

entrySize := util.EntryTotalSize(&entry)
validatedLineSizePerRetention[streamRetention.String()] += entrySize
validatedLineCountPerRetention[streamRetention.String()]++
validatedLineSizeTotal += entrySize
validatedLineCountTotal++
pushSize += entrySize
}

stream.Entries = stream.Entries[:n]
if len(stream.Entries) == 0 {
// Empty stream after validating all the entries
continue
}

if policy := d.policyForStream(lbs, tenantID); policy != "" {
streamSize := util.EntriesTotalSize(stream.Entries)
if yes, until, retStatusCode := d.validator.ShouldBlockIngestionForPolicy(validationContext, policy, now); yes {
d.trackDiscardedDataFromPolicy(ctx, policy, tenantID, validation.BlockedPolicyIngestion, streamRetention, streamSize, lbs)

err := fmt.Errorf(validation.BlockedPolicyIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode, policy)
d.writeFailuresManager.Log(tenantID, err)

validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.BlockedPolicyIngestion, tenantID, streamRetention.String()).Add(float64(len(stream.Entries)))
validation.DiscardedBytes.WithLabelValues(validation.BlockedPolicyIngestion, tenantID, streamRetention.String()).Add(float64(streamSize))
continue
}
}

maybeShardStreams(stream, lbs, pushSize)
}
}()
@@ -617,8 +641,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)
if block, until, retStatusCode := d.validator.ShouldBlockIngestionForTenant(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCountPerRetention, validatedLineSizePerRetention, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
@@ -632,10 +656,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSizeTotal) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCountPerRetention, validatedLineSizePerRetention, validation.RateLimited)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCountTotal, validatedLineSizeTotal)
d.writeFailuresManager.Log(tenantID, err)
// Return a 429 to indicate to the client they are being rate limited
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
@@ -743,14 +767,39 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

// missingEnforcedLabels returns true if the stream is missing any of the required labels.
func (d *Distributor) retentionPeriodForStream(lbs labels.Labels, tenantID string) time.Duration {
retentions := d.validator.Limits.StreamRetention(tenantID)

for _, retention := range retentions {
if retention.Matches(lbs) {
return time.Duration(retention.Period)
}
}

// No retention for this specific stream, use the default retention period.
return d.validator.Limits.RetentionPeriod(tenantID)
}

func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (string, error) {
if err := d.missingTenantEnforcedLabels(lbs, tenantID); err != nil {
return validation.MissingEnforcedLabels, err
}

if err := d.missingPolicyEnforcedLabels(lbs, tenantID); err != nil {
return validation.MissingPolicyEnforcedLabels, err
}

return "", nil
}

// missingTenantEnforcedLabels returns an error if the stream is missing any of the required labels for the tenant.
//
// The returned error lists every missing label (for the case of multiple labels missing).
func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (bool, []string) {
func (d *Distributor) missingTenantEnforcedLabels(lbs labels.Labels, tenantID string) error {
requiredLbs := d.validator.Limits.EnforcedLabels(tenantID)
if len(requiredLbs) == 0 {
// no enforced labels configured.
return false, []string{}
return nil
}

missingLbs := []string{}
@@ -761,20 +810,85 @@ func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string)
}
}

return len(missingLbs) > 0, missingLbs
if len(missingLbs) > 0 {
return fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(missingLbs, ","), tenantID)
}

return nil
}

func (d *Distributor) policyForStream(lbs labels.Labels, tenantID string) string {
policyStreamMapping := d.validator.Limits.PolicyStreamMapping(tenantID)
if len(policyStreamMapping) == 0 {
// not configured.
return ""
}

for policy, streamSelector := range policyStreamMapping {
matchers, err := syntax.ParseMatchers(streamSelector, true)
if err != nil {
level.Error(d.logger).Log("msg", "failed to parse stream selector", "error", err, "stream_selector", streamSelector, "tenant_id", tenantID)
continue
}

if validation.LabelMatchesMatchers(lbs, matchers) {
return policy
}
}

return ""
}

func (d *Distributor) missingPolicyEnforcedLabels(lbs labels.Labels, tenantID string) error {
policyEnforcedLabels := d.validator.Limits.PolicyEnforcedLabels(tenantID)
if len(policyEnforcedLabels) == 0 {
// not configured.
return nil
}

policy := d.policyForStream(lbs, tenantID)
if policy == "" {
// no policy found for stream.
return nil
}

missingLbs := []string{}

for _, lb := range policyEnforcedLabels[policy] {
if !lbs.Has(lb) {
missingLbs = append(missingLbs, lb)
}
}

if len(missingLbs) > 0 {
return fmt.Errorf(validation.MissingPolicyEnforcedLabelsErrorMsg, strings.Join(missingLbs, ","), policy, tenantID)
}

return nil
}

func (d *Distributor) trackDiscardedDataFromPolicy(ctx context.Context, policy string, tenantID string, reason string, retention time.Duration, discardedStreamBytes int, lbs labels.Labels) {
validation.DiscardedSamplesByPolicy.WithLabelValues(reason, policy, tenantID, retention.String()).Add(float64(1))
validation.DiscardedBytesByPolicy.WithLabelValues(reason, policy, tenantID, retention.String()).Add(float64(discardedStreamBytes))

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAddByPolicy(ctx, tenantID, policy, reason, retention, lbs, float64(discardedStreamBytes))
}
}

func (d *Distributor) trackDiscardedData(
ctx context.Context,
req *logproto.PushRequest,
validationContext validationContext,
tenantID string,
validatedLineCount int,
validatedLineSize int,
validatedLineCount map[string]int,
validatedLineSize map[string]int,
reason string,
) {
validation.DiscardedSamples.WithLabelValues(reason, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(validatedLineSize))
for retention, count := range validatedLineCount {
validation.DiscardedSamples.WithLabelValues(reason, tenantID, retention).Add(float64(count))
validation.DiscardedBytes.WithLabelValues(reason, tenantID, retention).Add(float64(validatedLineSize[retention]))
}

if d.usageTracker != nil {
for _, stream := range req.Streams {
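
The diff is truncated here. The policy decision above hinges on `policyForStream`: the first policy whose stream selector matches the stream's labels wins. Below is a minimal self-contained sketch of that matching behavior; it assumes the `github.com/grafana/loki/v3` module path and replaces `validation.LabelMatchesMatchers` with a plain loop, and the mapping and labels are hypothetical:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/prometheus/prometheus/model/labels"
)

// policyForStream returns the first policy whose stream selector matches the
// stream's labels, mirroring the distributor logic in this PR.
func policyForStream(mapping map[string]string, lbs labels.Labels) string {
	for policy, selector := range mapping {
		matchers, err := syntax.ParseMatchers(selector, true)
		if err != nil {
			continue // the distributor logs invalid selectors and skips them
		}
		matched := true
		for _, m := range matchers {
			// Get returns "" for absent labels, so extra labels never block a match.
			if !m.Matches(lbs.Get(m.Name)) {
				matched = false
				break
			}
		}
		if matched {
			return policy
		}
	}
	return "" // no policy configured or none matched
}

func main() {
	mapping := map[string]string{"finance": `{namespace="finance", env="prod"}`}
	stream := labels.FromStrings("namespace", "finance", "env", "prod", "app", "ledger")
	fmt.Println(policyForStream(mapping, stream)) // prints: finance
}
```

Note that Go map iteration order is randomized, so if a stream matches the selectors of several policies, the assigned policy is nondeterministic from one push to the next.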