Skip to content

Commit

Permalink
Create base error type for ingester per-instance errors and remove logging for them (#5585)
Browse files Browse the repository at this point in the history

This allows us to decorate them with extra information for gRPC responses
and our logging middleware (to prevent them from being logged, which is
expensive).

Related #5581
Related weaveworks/common#299
  • Loading branch information
56quarters authored Aug 3, 2023
1 parent 3a622c5 commit fa415b0
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [CHANGE] Querier: Renamed `-querier.prefer-streaming-chunks` to `-querier.prefer-streaming-chunks-from-ingesters` to enable streaming chunks from ingesters to queriers. #5182
* [CHANGE] Querier: `-query-frontend.cache-unaligned-requests` has been moved from a global flag to a per-tenant override. #5312
* [CHANGE] Ingester: removed `cortex_ingester_shipper_dir_syncs_total` and `cortex_ingester_shipper_dir_sync_failures_total` metrics. The former metric was not much useful, and the latter was never incremented. #5396
* [CHANGE] Ingester: Do not log errors related to hitting per-instance limits to reduce resource usage when ingesters are under pressure. #5585
* [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/go-test/deep v1.1.0 // indirect
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
Expand All @@ -100,6 +99,7 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/DmitriyVTitov/size v1.5.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
Expand Down
16 changes: 13 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxTenantsReached) {
return nil, err
}
return nil, wrapWithUser(err, userID)
}

Expand Down Expand Up @@ -805,7 +810,12 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
}

return nil, err
// Check for a particular per-instance limit and return that error directly
// since it contains extra information for gRPC and our logging middleware.
if errors.Is(err, errMaxInMemorySeriesReached) {
return nil, err
}
return nil, wrapWithUser(err, userID)
}

// At this point all samples have been added to the appender, so we can track the time it took.
Expand Down Expand Up @@ -1038,7 +1048,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
}

// Otherwise, return a 500.
return wrapWithUser(err, userID)
return err
}

numNativeHistogramBuckets := -1
Expand Down Expand Up @@ -1079,7 +1089,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
continue
}

return wrapWithUser(err, userID)
return err
}
numNativeHistograms := len(ts.Histograms)
if numNativeHistograms > 0 {
Expand Down
34 changes: 22 additions & 12 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/ingester/activeseries"
"github.com/grafana/mimir/pkg/ingester/client"
Expand All @@ -63,7 +65,6 @@ import (
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/usagestats"
"github.com/grafana/mimir/pkg/util"
util_log "github.com/grafana/mimir/pkg/util/log"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/push"
util_test "github.com/grafana/mimir/pkg/util/test"
Expand Down Expand Up @@ -5415,10 +5416,9 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {

func TestIngester_PushInstanceLimits(t *testing.T) {
tests := map[string]struct {
limits InstanceLimits
reqs map[string][]*mimirpb.WriteRequest
expectedErr error
expectedErrType interface{}
limits InstanceLimits
reqs map[string][]*mimirpb.WriteRequest
expectedErr error
}{
"should succeed creating one user and series": {
limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1},
Expand Down Expand Up @@ -5461,7 +5461,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
},
},

expectedErr: wrapWithUser(errMaxInMemorySeriesReached, "test"),
expectedErr: errMaxInMemorySeriesReached,
},

"should fail creating two users": {
Expand All @@ -5488,7 +5488,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
),
},
},
expectedErr: wrapWithUser(errMaxTenantsReached, "user2"),
expectedErr: errMaxTenantsReached,
},

"should fail pushing samples in two requests due to rate limit": {
Expand Down Expand Up @@ -5557,9 +5557,12 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
} else {
// Last push may expect error.
if testData.expectedErr != nil {
assert.Equal(t, testData.expectedErr, err)
} else if testData.expectedErrType != nil {
assert.True(t, errors.As(err, testData.expectedErrType), "expected error type %T, got %v", testData.expectedErrType, err)
assert.ErrorIs(t, err, testData.expectedErr)
var optional middleware.OptionalLogging
assert.ErrorAs(t, err, &optional)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
assert.Equal(t, codes.Unavailable, s.Code())
} else {
assert.NoError(t, err)
}
Expand Down Expand Up @@ -5687,10 +5690,17 @@ func TestIngester_inflightPushRequests(t *testing.T) {

time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
var optional middleware.OptionalLogging

_, err := i.Push(ctx, req)
require.Equal(t, errMaxInflightRequestsReached, err)
require.ErrorAs(t, err, &util_log.DoNotLogError{})
require.ErrorIs(t, err, errMaxInflightRequestsReached)
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")

s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())

return nil
})

Expand Down
43 changes: 36 additions & 7 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package ingester

import (
"context"
"flag"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"

"github.com/grafana/mimir/pkg/util/globalerror"
util_log "github.com/grafana/mimir/pkg/util/log"
)

const (
Expand All @@ -22,14 +24,41 @@ const (
maxInflightPushRequestsFlag = "ingester.instance-limits.max-inflight-push-requests"
)

// We don't include values in the messages for per-instance limits to avoid leaking Mimir cluster configuration to users.
var (
// We don't include values in the message to avoid leaking Mimir cluster configuration to users.
errMaxIngestionRateReached = errors.New(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = errors.New(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = errors.New(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = util_log.DoNotLogError{Err: errors.New(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))}
errMaxIngestionRateReached = newInstanceLimitError(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
errMaxTenantsReached = newInstanceLimitError(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
errMaxInMemorySeriesReached = newInstanceLimitError(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
)

// instanceLimitErr is the base error type for errors caused by hitting
// ingester per-instance limits. It carries a pre-built gRPC status so the
// same message is used both for Error() and for the gRPC response.
type instanceLimitErr struct {
	msg    string         // human-readable limit message; also used as the gRPC status message
	status *status.Status // pre-built status (codes.Unavailable) returned via GRPCStatus()
}

func newInstanceLimitError(msg string) error {
return &instanceLimitErr{
// Errors from hitting per-instance limits are always "unavailable" for gRPC
status: status.New(codes.Unavailable, msg),
msg: msg,
}
}

// ShouldLog implements the logging middleware's OptionalLogging interface.
// Per-instance limit errors are never logged: hitting a limit already
// increments a metric, and the error text carries no extra diagnostic value.
func (e *instanceLimitErr) ShouldLog(_ context.Context, _ time.Duration) bool {
	return false
}

// GRPCStatus returns the embedded gRPC status (built with codes.Unavailable
// in newInstanceLimitError) so that status.FromError converts this error to
// that status instead of a generic one.
func (e *instanceLimitErr) GRPCStatus() *status.Status {
	return e.status
}

// Error implements the error interface, returning the per-instance limit message.
func (e *instanceLimitErr) Error() string {
	return e.msg
}

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
Expand Down
34 changes: 34 additions & 0 deletions pkg/ingester/instance_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@
package ingester

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/yaml.v3"
)

Expand All @@ -34,3 +40,31 @@ max_tenants: 50000
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
}

func TestInstanceLimitErr(t *testing.T) {
t.Run("bare error implements ShouldLog()", func(t *testing.T) {
var optional middleware.OptionalLogging
require.ErrorAs(t, errMaxInflightRequestsReached, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})

t.Run("wrapped error implements ShouldLog()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxTenantsReached)
var optional middleware.OptionalLogging
require.ErrorAs(t, err, &optional)
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
})

t.Run("bare error implements GRPCStatus()", func(t *testing.T) {
s, ok := status.FromError(errMaxInMemorySeriesReached)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})

t.Run("wrapped error implements GRPCStatus()", func(t *testing.T) {
err := fmt.Errorf("%w: oh no", errMaxIngestionRateReached)
s, ok := status.FromError(err)
require.True(t, ok, "expected to be able to convert to gRPC status")
require.Equal(t, codes.Unavailable, s.Code())
})
}

0 comments on commit fa415b0

Please sign in to comment.