Skip to content

Commit

Permalink
feat: Add support for blocking a policy to be ingested (#16203)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Add support for blocking a policy from being ingested.
The policy is defined by stream mapping.
  • Loading branch information
DylanGuedes authored Feb 17, 2025
1 parent 677faad commit 69089ef
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 94 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3641,6 +3641,11 @@ otlp_config:
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Block ingestion for policy until the configured date. The time should be in
# RFC3339 format. The policy is based on the policy_stream_mapping
# configuration.
[block_ingestion_policy_until: <map of string to Time>]

# Block ingestion until the configured date. The time should be in RFC3339
# format.
# CLI flag: -limits.block-ingestion-until
Expand Down
34 changes: 17 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
d.validator.reportDiscardedData(validation.MissingEnforcedLabels, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
continue
}

if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {
d.writeFailuresManager.Log(tenantID, err)
discardedBytes := util.EntriesTotalSize(stream.Entries)
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if statusCode == http.StatusOK {
// do not add error to validationErrors.
continue
}

validationErrors.Add(err)
continue
}

Expand Down Expand Up @@ -639,21 +654,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.BlockedIngestion)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)

// If the status code is 200, return success.
// Note that we still log the error and increment the metrics.
if retStatusCode == http.StatusOK {
return &logproto.PushResponse{}, nil
}

return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

Expand Down
125 changes: 121 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import (

otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"

"github.com/grafana/loki/pkg/push"

"github.com/c2h5oh/datasize"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
dskit_flagext "github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
Expand Down Expand Up @@ -51,6 +50,8 @@ import (
loki_net "github.com/grafana/loki/v3/pkg/util/net"
"github.com/grafana/loki/v3/pkg/util/test"
"github.com/grafana/loki/v3/pkg/validation"

"github.com/grafana/loki/pkg/push"
)

const (
Expand Down Expand Up @@ -441,6 +442,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
// request with all required labels.
lbs := labels.FromMap(map[string]string{"app": "foo", "env": "prod", "cluster": "cluster1", "namespace": "ns1"})
missing, missingLabels := distributors[0].missingEnforcedLabels(lbs, "test", "policy1")

assert.False(t, missing)
assert.Empty(t, missingLabels)

Expand All @@ -462,25 +464,42 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
flagext.DefaultValues(limits)

// makeWriteRequest only contains a `{foo="bar"}` label.
req := makeWriteRequest(100, 100)
req := makeWriteRequest(100, 100) // 100 lines of 100 bytes each
limits.EnforcedLabels = []string{"app", "env"}
distributors, _ := prepare(t, 1, 3, limits, nil)

// reset metrics in case they were set from a previous test.
validation.DiscardedBytes.Reset()
validation.DiscardedSamples.Reset()

// enforced labels configured, but all labels are missing.
_, err := distributors[0].Push(ctx, req)
require.Error(t, err)
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
require.EqualError(t, err, expectedErr.Error())

// Verify metrics for discarded samples due to missing enforced labels
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes)) // 100 lines * 100 bytes
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples)) // 100 lines

// enforced labels, but all labels are present.
req = makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should not have increased since this push was successful
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))

// no enforced labels, so no errors.
limits.EnforcedLabels = []string{}
distributors, _ = prepare(t, 1, 3, limits, nil)
_, err = distributors[0].Push(ctx, req)
require.NoError(t, err)

// Metrics should remain unchanged
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
}

func TestDistributorPushConcurrently(t *testing.T) {
Expand Down Expand Up @@ -1672,7 +1691,105 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
if tc.expectError {
expectedErr := fmt.Sprintf(validation.BlockedIngestionErrorMsg, "test", tc.blockUntil.Format(time.RFC3339), tc.blockStatusCode)
require.ErrorContains(t, err, expectedErr)
require.Nil(t, response)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
}
})
}
}

func TestDistributor_PushIngestionBlockedByPolicy(t *testing.T) {
now := time.Now()
defaultErrCode := 260

for _, tc := range []struct {
name string
blockUntil map[string]time.Time
policy string
labels string
expectError bool
expectedErrorMsg string
yes bool
}{
{
name: "not blocked - no policy block configured",
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "not blocked - policy block expired",
blockUntil: map[string]time.Time{
"test-policy": now.Add(-1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - policy block active",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
yes: true,
},
{
name: "not blocked - different policy",
blockUntil: map[string]time.Time{
"blocked-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: false,
},
{
name: "blocked - custom status code",
blockUntil: map[string]time.Time{
"test-policy": now.Add(1 * time.Hour),
},
policy: "test-policy",
labels: `{foo="bar"}`,
expectError: true,
expectedErrorMsg: fmt.Sprintf(validation.BlockedIngestionPolicyErrorMsg, "test", now.Add(1*time.Hour).Format(time.RFC3339), defaultErrCode),
},
} {
t.Run(tc.name, func(t *testing.T) {
if !tc.yes {
return
}
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Configure policy mapping
limits.PolicyStreamMapping = validation.PolicyStreamMapping{
tc.policy: []*validation.PriorityStream{
{
Selector: tc.labels,
Priority: 1,
},
},
}

// Configure policy blocks
if tc.blockUntil != nil {
limits.BlockIngestionPolicyUntil = make(map[string]dskit_flagext.Time)
for policy, until := range tc.blockUntil {
limits.BlockIngestionPolicyUntil[policy] = dskit_flagext.Time(until)
}
}

distributors, _ := prepare(t, 1, 3, limits, nil)
request := makeWriteRequestWithLabels(1, 1024, []string{tc.labels}, false, false, false)
response, err := distributors[0].Push(ctx, request)

if tc.expectError {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErrorMsg)
} else {
require.NoError(t, err)
require.Equal(t, success, response)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
BlockIngestionPolicyUntil(userID string, policy string) time.Time
EnforcedLabels(userID string) []string
PolicyEnforcedLabels(userID string, policy string) []string

Expand Down
Loading

0 comments on commit 69089ef

Please sign in to comment.