-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for blocking a policy to be ingested #16203
Conversation
pkg/distributor/distributor.go
Outdated
@@ -556,6 +556,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log | |||
continue | |||
} | |||
|
|||
if ok, until := d.ShouldBlockPolicy(lbs, tenantID); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly to how we enforce labels, I think this one should also replace the d.validator.ShouldBlockIngestion
down below which enforces the per-tenant block.
loki/pkg/distributor/distributor.go
Lines 652 to 653 in 9f9622d
if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block { | |
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion) |
pkg/distributor/distributor.go
Outdated
@@ -556,6 +556,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log | |||
continue | |||
} | |||
|
|||
if ok, until := d.ShouldBlockPolicy(lbs, tenantID); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we should pass the already resolved policy here to avoid resolving it twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/validation/limits.go
Outdated
BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"` | ||
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` | ||
BlockIngestionPolicyUntil map[string]dskit_flagext.Time `yaml:"block_ingestion_policy_until" json:"block_ingestion_policy_until"` | ||
BlockIngestionPolicyStatusCode map[string]int `yaml:"block_ingestion_policy_status_code" json:"block_ingestion_policy_status_code"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a per policy status code? I think we can just reuse the per-tenant BlockIngestionStatusCode.
I wouldn't add it unless we need it in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not really. got rid of it, ty
💻 Deploy preview deleted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Left some minor comments
// If the status code is 200, return success. | ||
// Note that we still log the error and increment the metrics. | ||
if statusCode == http.StatusOK { | ||
// do not add error to validationErrors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not clear on what this comment means 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
previous and following errors do add the error to the validationErrors
slice. We don't want to add it when the statusCode is success otherwise we'll return an error when t he only error that happened was this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah damn I read "do not and error" which made no sense to me 😂 "add" instead of "and" makes 100% sense 🤦
pkg/distributor/distributor.go
Outdated
if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block { | ||
d.writeFailuresManager.Log(tenantID, err) | ||
discardedBytes := util.EntriesTotalSize(stream.Entries) | ||
d.validator.reportDiscardedData(ctx, reason, validationContext, lbs, retentionHours, policy, discardedBytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this 👏 Maybe worth a followup PR making it consistent across all the function among other error handles.
|
||
// Check if this policy is blocked in tenant configs | ||
blockUntil := v.Limits.BlockIngestionPolicyUntil(ctx.userID, policy) | ||
if blockUntil.IsZero() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Even though now.Before(blockUntil)
would be false when blockUntil.IsZero()
, I like the explicit handling for the zero case. May we do the same for the ctx.blockIngestionUntil
?
pkg/distributor/validator.go
Outdated
@@ -184,19 +184,18 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea | |||
return nil | |||
} | |||
|
|||
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int) { | |||
func (v Validator) reportDiscardedData(ctx context.Context, reason string, vCtx validationContext, labels labels.Labels, retentionHours string, policy string, entrySize int, withUsageTracker bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Have reportDiscardedData
and reportDiscardedDataWithUsageTracker
instead of this bool to make it more explicit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, wdyt? I also found we had a updateMetrics
very simila to reportDiscardedData
so I got rid of it
pkg/distributor/validator.go
Outdated
err := fmt.Errorf(validation.BlockedIngestionErrorMsg, ctx.userID, ctx.blockIngestionUntil.Format(time.RFC3339), ctx.blockIngestionStatusCode) | ||
return true, ctx.blockIngestionStatusCode, validation.BlockedIngestion, err | ||
if block, code, reason, err := v.shouldBlockGlobalPolicy(ctx, now); block { | ||
return block, code, reason, err | ||
} | ||
|
||
if block, until, code := v.ShouldBlockPolicy(ctx, policy, now); block { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShouldBlockPolicy
should be private shouldBlockPolicy
**What this PR does / why we need it**: Add support for blocking a policy from being ingested. The policy is defined by stream mapping. (cherry picked from commit 69089ef)
What this PR does / why we need it:
Add support for blocking a policy from being ingested.
The policy is defined by stream mapping.
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
CONTRIBUTING.md
guide (required)feat
PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.docs/sources/setup/upgrade/_index.md
deprecated-config.yaml
anddeleted-config.yaml
files respectively in thetools/deprecated-config-checker
directory. Example PR