Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for blocking a policy to be ingested #16203

Merged
merged 23 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3601,6 +3601,11 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion for a policy until the configured date. The time should be in
# RFC3339 format. Policies are the ones defined in the policy_stream_mapping
# configuration.
[block_ingestion_policy_until: <map of string to Time>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
Expand Down
34 changes: 17 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
d.validator.reportDiscardedData(validation.MissingEnforcedLabels, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
continue
}

if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {
d.writeFailuresManager.Log(tenantID, err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// Do not add the error to validationErrors, so the push still returns success.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear on what this comment means 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous and following checks do add the error to the validationErrors slice. We don't want to add it when the statusCode is success; otherwise we'll return an error when the only error that happened was this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah damn I read "do not and error" which made no sense to me 😂 "add" instead of "and" makes 100% sense 🤦

continue
}

validationErrors.Add(err)
continue
}

Expand Down Expand Up @@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

Expand Down
125 changes: 121 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand Down Expand Up @@ -51,6 +50,8 @@ import (
loki_net "github.com/grafana/loki/v3/pkg/util/net"
"github.com/grafana/loki/v3/pkg/util/test"
"github.com/grafana/loki/v3/pkg/validation"

"github.com/grafana/loki/pkg/push"
)

const (
Expand Down Expand Up @@ -441,6 +442,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")

assert.False(t, missing)
assert.Empty(t, missingLabels)

Expand All @@ -462,25 +464,42 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
flagext.DefaultValues(limits)

// makeWriteRequest only contains a `{foo="bar"}` label.
req := makeWriteRequest(100, 100)
req := makeWriteRequest(100, 100) // 100 lines of 100 bytes each
limits.EnforcedLabels = []string{"app", "env"}
distributors, _ := prepare(t, 1, 3, limits, nil)

// reset metrics in case they were set from a previous test.
validation.DiscardedBytes.Reset()
validation.DiscardedSamples.Reset()

// enforced labels configured, but all labels are missing.
_, err := distributors[0].Push(ctx, req)
require.Error(t, err)
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
require.EqualError(t, err, expectedErr.Error())

// Verify metrics for discarded samples due to missing enforced labels
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) // 100 lines * 100 bytes
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) // 100 lines

// enforced labels, but all labels are present.
req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should not have increased since this push was successful
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))

// no enforced labels, so no errors.
limits.EnforcedLabels = []string{}
distributors, _ = prepare(t, 1, 3, limits, nil)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should remain unchanged
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
}

func TestDistributorPushConcurrently(t *testing.T) {
Expand Down Expand Up @@ -1672,7 +1691,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
now := time.Now()
defaultErrCode := 260

for _, tc := range []struct {
name string
blockUntil map[string]time.Time
policy string
labels string
expectError bool
expectedErrorMsg string
yes bool
}{
{
name: "not blocked - no policy block configured",
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "not blocked - policy block expired",
blockUntil: map[string]time.Time{
"test-policy": now.Add(-1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - policy block active",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
yes: true,
},
{
name: "not blocked - different policy",
blockUntil: map[string]time.Time{
"blocked-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - custom status code",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
},
} {
t.Run(tc.name, func(t *testing.T) {
if !tc.yes {
return
}
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Configure policy mapping
limits.PolicyStreamMapping = validation.PolicyStreamMapping{
tc.policy: []*validation.PriorityStream{
{
Selector: tc.labels,
Priority: 1,
},
},
}

// Configure policy blocks
if tc.blockUntil != nil {
limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time)
for policy, until := range tc.blockUntil {
limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until)
}
}

distributors, _ := prepare(t, 1, 3, limits, nil)
request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

Expand Down
Loading