Skip to content

Commit

Permalink
fix: Fix blocked ingestion returned error when 260 (#16387)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Modify how we handle blocked ingestion errors:
* If it is 200, don't expose any error
* Otherwise, expose the error as the configured statusCode, but only if no other validation error occur. I'm calling this a secondTierErr
Also modifying the enforced labels error message to log present labels.
  • Loading branch information
DylanGuedes authored Feb 20, 2025
1 parent 073c94c commit 3d6163a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
14 changes: 10 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

var ingestionBlockedError error

func() {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
Expand Down Expand Up @@ -547,7 +549,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID, policy); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID, stream.Labels)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
Expand All @@ -560,14 +562,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))

// If the status code is 200, return success.
// If the status code is 200, return no error.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// do not add error to validationErrors.
continue
}

validationErrors.Add(err)
// return an error but do not add it to validationErrors
// otherwise client will get a 400 and will log it.
ingestionBlockedError = httpgrpc.Errorf(statusCode, "%s", err.Error())
continue
}

Expand Down Expand Up @@ -647,6 +650,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
var validationErr error
if validationErrors.Err() != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "%s", validationErrors.Error())
} else if ingestionBlockedError != nil {
// Any validation error takes precedence over the status code and error message for blocked ingestion.
validationErr = ingestionBlockedError
}

// Return early if none of the streams contained entries
Expand Down
9 changes: 8 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
// enforced labels configured, but all labels are missing.
_, err := distributors[0].Push(ctx, req)
require.Error(t, err)
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test", "{foo=\"bar\"}")
require.EqualError(t, err, expectedErr.Error())

// Verify metrics for discarded samples due to missing enforced labels
Expand Down Expand Up @@ -1677,6 +1677,13 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
expectError: false,
expectedStatusCode: http.StatusOK,
},
{
name: "blocked with status code 260",
blockUntil: time.Now().Add(1 * time.Hour),
blockStatusCode: 260,
expectError: true,
expectedStatusCode: 260,
},
} {
t.Run(tc.name, func(t *testing.T) {
limits := &validation.Limits{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (
BlockedIngestionPolicy = "blocked_ingestion_policy"
BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s for stream %s"
)

type ErrStreamRateLimit struct {
Expand Down

0 comments on commit 3d6163a

Please sign in to comment.