-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for blocking a policy to be ingested #16203
Changes from 16 commits
9f9622d
e1df362
be5ceb6
4218d81
bfcc228
0eaa021
84a6931
6d0204e
49bd76a
70bd1e1
1b7fdb4
119d358
585b096
aaeadbf
d41480f
fd275ad
80d72f7
6ca9ec5
0e908b9
cfbe78b
23e8d07
2a1a87d
626fd51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,21 +104,13 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la | |
// Makes time string on the error message formatted consistently. | ||
formatedEntryTime := entry.Timestamp.Format(timeFormat) | ||
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat) | ||
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.GreaterThanMaxSampleAge, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime) | ||
} | ||
|
||
if ts > vCtx.creationGracePeriod { | ||
formatedEntryTime := entry.Timestamp.Format(timeFormat) | ||
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.TooFarInFuture, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime) | ||
} | ||
|
||
|
@@ -127,39 +119,23 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la | |
// an orthogonal concept (we need not use ValidateLabels in this context) | ||
// but the upstream cortex_validation pkg uses it, so we keep this | ||
// for parity. | ||
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.LineTooLong, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) | ||
} | ||
|
||
if structuredMetadataCount > 0 { | ||
if !vCtx.allowStructuredMetadata { | ||
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.DisallowedStructuredMetadata, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) | ||
} | ||
|
||
if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { | ||
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooLarge, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize) | ||
} | ||
|
||
if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { | ||
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours, policy).Add(entrySize) | ||
if v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize) | ||
} | ||
v.reportDiscardedData(ctx, validation.StructuredMetadataTooMany, vCtx, labels, retentionHours, policy, int(entrySize), true) | ||
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, vCtx.maxStructuredMetadataCount) | ||
} | ||
} | ||
|
@@ -208,13 +184,60 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea | |
return nil | ||
} | ||
|
||
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int, withUsageTracker bool) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done, wdyt? I also found we had a |
||
validation.DiscardedSamples.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Inc() | ||
validation.DiscardedBytes.WithLabelValues(reason, vCtx.userID, retentionHours, policy).Add(float64(entrySize)) | ||
if withUsageTracker && v.usageTracker != nil { | ||
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, reason, labels, float64(entrySize)) | ||
} | ||
} | ||
|
||
// ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code. | ||
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int) { | ||
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time, policy string) (bool, int, string, error) { | ||
if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block { | ||
return block, code, reason, err | ||
} | ||
|
||
if block, until, code := v.ShouldBlockPolicy(ctx, policy, now); block { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
err := fmt.Errorf(validation.BlockedIngestionPolicyErrorMsg, ctx.userID, until.Format(time.RFC3339), code) | ||
return true, code, validation.BlockedIngestionPolicy, err | ||
} | ||
|
||
return false, 0, "", nil | ||
} | ||
|
||
func (v Validator) shouldBlockGlobalPolicy(ctx validationContext, now time.Time) (bool, int, string, error) { | ||
if ctx.blockIngestionUntil.IsZero() { | ||
return false, 0, "", nil | ||
} | ||
|
||
if now.Before(ctx.blockIngestionUntil) { | ||
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) | ||
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err | ||
} | ||
|
||
return false, 0, "", nil | ||
} | ||
|
||
// ShouldBlockPolicy checks if ingestion should be blocked for the given policy. | ||
// It returns true if ingestion should be blocked, along with the block until time and status code. | ||
func (v *Validator) ShouldBlockPolicy(ctx validationContext, policy string, now time.Time) (bool, time.Time, int) { | ||
// No policy provided, don't block | ||
if policy == "" { | ||
return false, time.Time{}, 0 | ||
} | ||
|
||
return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode | ||
// Check if this policy is blocked in tenant configs | ||
blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy) | ||
if blockUntil.IsZero() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Even though |
||
return false, time.Time{}, 0 | ||
} | ||
|
||
if now.Before(blockUntil) { | ||
return true, blockUntil, ctx.blockIngestionStatusCode | ||
} | ||
|
||
return false, time.Time{}, 0 | ||
} | ||
|
||
func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours, policy string) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not clear on what this comment means 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previous and following errors do add the error to the
validationErrors
slice. We don't want to add it when the statusCode is success otherwise we'll return an error when t he only error that happened was this one.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah damn I read "do not and error" which made no sense to me 😂 "add" instead of "and" makes 100% sense 🤦