-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for blocking a policy to be ingested #16203
Merged
Merged
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
9f9622d
Add the policy blocking.
DylanGuedes e1df362
address suggestions
DylanGuedes be5ceb6
update test accordingly
DylanGuedes 4218d81
Merge branch 'main' of github.com:grafana/loki into add-block-based-o…
DylanGuedes bfcc228
undo merge changes
DylanGuedes 0eaa021
undo changes
DylanGuedes 84a6931
check for blocked ingestion at same place
DylanGuedes 6d0204e
update docs
DylanGuedes 49bd76a
Fix test
DylanGuedes 70bd1e1
Report things at a single place
DylanGuedes 1b7fdb4
do not check for nil
DylanGuedes 119d358
Update docs
DylanGuedes 585b096
handle the zero case isoalted
DylanGuedes aaeadbf
Add conditional for the usageTracker reporting
DylanGuedes d41480f
undo change
DylanGuedes fd275ad
do it isolated
DylanGuedes 80d72f7
refactor how we report metrics
DylanGuedes 6ca9ec5
make method private
DylanGuedes 0e908b9
lint
DylanGuedes cfbe78b
Add test covering metrics reporting
DylanGuedes 23e8d07
Remove dead test
DylanGuedes 2a1a87d
lint
DylanGuedes 626fd51
clear test state
DylanGuedes File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,13 +104,13 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la | |
// Makes time string on the error message formatted consistently. | ||
formatedEntryTime := entry.Timestamp.Format(timeFormat) | ||
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat) | ||
v.reportDiscardedData(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime) | ||
} | ||
|
||
if ts > vCtx.creationGracePeriod { | ||
formatedEntryTime := entry.Timestamp.Format(timeFormat) | ||
v.reportDiscardedData(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime) | ||
} | ||
|
||
|
@@ -119,23 +119,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la | |
// an orthogonal concept (we need not use ValidateLabels in this context) | ||
// but the upstream cortex_validation pkg uses it, so we keep this | ||
// for parity. | ||
v.reportDiscardedData(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) | ||
} | ||
|
||
if structuredMetadataCount > 0 { | ||
if !vCtx.allowStructuredMetadata { | ||
v.reportDiscardedData(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) | ||
} | ||
|
||
if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize) | ||
} | ||
|
||
if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize)) | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount) | ||
} | ||
} | ||
|
@@ -184,19 +184,18 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea | |
return nil | ||
} | ||
|
||
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int) { | ||
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int, withUsageTracker bool) { | ||
validation.DiscardedSamples.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entrySize)) | ||
if v.usageTracker != nil { | ||
if withUsageTracker && v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, reason, labels, float64(entrySize)) | ||
} | ||
} | ||
|
||
// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. | ||
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) { | ||
if now.Before(ctx.blockIngestionUntil) { | ||
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) | ||
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err | ||
if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block { | ||
return block, code, reason, err | ||
} | ||
|
||
if block, until, code := v.ShouldBlockPolicy(ctx, policy, now); block { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
@@ -207,6 +206,19 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, po | |
return false, 0, "", nil | ||
} | ||
|
||
func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) { | ||
if ctx.blockIngestionUntil.IsZero() { | ||
return false, 0, "", nil | ||
} | ||
|
||
if now.Before(ctx.blockIngestionUntil) { | ||
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) | ||
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err | ||
} | ||
|
||
return false, 0, "", nil | ||
} | ||
|
||
// ShouldBlockPolicy checks if ingestion should be blocked for the given policy. | ||
// It returns true if ingestion should be blocked, along with the block until time and status code. | ||
func (v *Validator) ShouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Have
reportDiscardedData
andreportDiscardedDataWithUsageTracker
instead of this bool to make it more explicitThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, wdyt? I also found we had a
updateMetrics
very simila toreportDiscardedData
so I got rid of it