Skip to content

Commit

Permalink
Fixing review findings
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Jun 11, 2024
1 parent 38d1ff9 commit c1bea5a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 37 deletions.
16 changes: 8 additions & 8 deletions pkg/ingester/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ type ingesterCircuitBreaker struct {
read *circuitBreaker
}

func newIngesterCircuitBreaker(pushCfg CircuitBreakerConfig, readCfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) *ingesterCircuitBreaker {
return &ingesterCircuitBreaker{
func newIngesterCircuitBreaker(pushCfg CircuitBreakerConfig, readCfg CircuitBreakerConfig, logger log.Logger, registerer prometheus.Registerer) ingesterCircuitBreaker {
return ingesterCircuitBreaker{
push: newCircuitBreaker(pushCfg, registerer, circuitBreakerPushRequestType, logger),
read: newCircuitBreaker(readCfg, registerer, circuitBreakerReadRequestType, logger),
}
Expand All @@ -273,17 +273,17 @@ func (cb *ingesterCircuitBreaker) activate() {
cb.read.activate()
}

// tryPushAcquirePermit tries to acquire a permit to use the push circuit breaker and returns whether a permit was acquired.
// If it was possible, tryPushAcquirePermit returns a function that should be called to release the acquired permit.
// tryAcquirePushPermit tries to acquire a permit to use the push circuit breaker and returns whether a permit was acquired.
// If it was possible, tryAcquirePushPermit returns a function that should be called to release the acquired permit.
// If it was not possible, the causing error is returned.
func (cb *ingesterCircuitBreaker) tryPushAcquirePermit() (func(time.Duration, error), error) {
func (cb *ingesterCircuitBreaker) tryAcquirePushPermit() (func(time.Duration, error), error) {
return cb.push.tryAcquirePermit()
}

// tryReadAcquirePermit tries to acquire a permit to use the read circuit breaker and returns whether a permit was acquired.
// If it was possible, tryReadAcquirePermit returns a function that should be called to release the acquired permit.
// tryAcquireReadPermit tries to acquire a permit to use the read circuit breaker and returns whether a permit was acquired.
// If it was possible, tryAcquireReadPermit returns a function that should be called to release the acquired permit.
// If it was not possible, the causing error is returned.
func (cb *ingesterCircuitBreaker) tryReadAcquirePermit() (func(time.Duration, error), error) {
func (cb *ingesterCircuitBreaker) tryAcquireReadPermit() (func(time.Duration, error), error) {
// If the read circuit breaker is not active, we don't try to acquire a permit.
if !cb.read.isActive() {
return func(time.Duration, error) {}, nil
Expand Down
48 changes: 24 additions & 24 deletions pkg/ingester/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,12 +973,12 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
"cortex_ingester_circuit_breaker_current_state",
}
testCases := map[string]struct {
circuitBreakerSetup func(breaker *ingesterCircuitBreaker)
circuitBreakerSetup func(breaker ingesterCircuitBreaker)
expectedCircuitBreakerError bool
expectedMetrics string
}{
"if push circuit breaker is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.push.active.Store(false)
},
expectedCircuitBreakerError: false,
Expand All @@ -1001,7 +1001,7 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
`,
},
"if push circuit breaker is closed, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.push.activate()
cb.push.cb.Close()
},
Expand All @@ -1025,7 +1025,7 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
`,
},
"if push circuit breaker is open, no finish function and a circuitBreakerErrorOpen are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.push.activate()
cb.push.cb.Open()
},
Expand All @@ -1049,7 +1049,7 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
`,
},
"if push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.push.activate()
cb.push.cb.HalfOpen()
},
Expand Down Expand Up @@ -1094,7 +1094,7 @@ func TestPRCircuitBreaker_TryPushAcquirePermit(t *testing.T) {
recordFailureCount: recordedFailureCount,
}
testCase.circuitBreakerSetup(cb)
finish, err := cb.tryPushAcquirePermit()
finish, err := cb.tryAcquirePushPermit()

if testCase.expectedCircuitBreakerError {
require.Nil(t, finish)
Expand Down Expand Up @@ -1128,12 +1128,12 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
"cortex_ingester_circuit_breaker_current_state",
}
testCases := map[string]struct {
circuitBreakerSetup func(breaker *ingesterCircuitBreaker)
circuitBreakerSetup func(breaker ingesterCircuitBreaker)
expectedCircuitBreakerError bool
expectedMetrics string
}{
"if read circuit breaker is not active and push circuit breaker is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.push.active.Store(false)
},
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is not active and push circuit breaker is closed, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.push.activate()
cb.push.cb.Close()
Expand Down Expand Up @@ -1200,7 +1200,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is not active and push circuit breaker is open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.push.activate()
cb.push.cb.Open()
Expand Down Expand Up @@ -1234,7 +1234,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is not active and push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.active.Store(false)
cb.push.activate()
cb.push.cb.HalfOpen()
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is closed and push circuit breaker is is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Close()
cb.push.active.Store(false)
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is closed and push circuit breaker is closed, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Close()
cb.push.activate()
Expand Down Expand Up @@ -1337,7 +1337,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is closed and push circuit breaker is open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Close()
cb.push.activate()
Expand Down Expand Up @@ -1372,7 +1372,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is closed and push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Close()
cb.push.activate()
Expand Down Expand Up @@ -1407,7 +1407,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is open and push circuit breaker is not active, no finish function and a circuitBreakerErrorOpen are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Open()
cb.push.active.Store(false)
Expand Down Expand Up @@ -1441,7 +1441,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is open and push circuit breaker is closed, no finish function and a circuitBreakerErrorOpen are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Open()
cb.push.activate()
Expand Down Expand Up @@ -1476,7 +1476,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is open and push circuit breaker is open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Open()
cb.push.activate()
Expand Down Expand Up @@ -1511,7 +1511,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is open and push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.Open()
cb.push.activate()
Expand Down Expand Up @@ -1546,7 +1546,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is half-open and push circuit breaker is not active, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.HalfOpen()
cb.push.active.Store(false)
Expand Down Expand Up @@ -1580,7 +1580,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is half-open and push circuit breaker is closed, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.HalfOpen()
cb.push.activate()
Expand Down Expand Up @@ -1615,7 +1615,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is half-open and push circuit breaker is open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.HalfOpen()
cb.push.activate()
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
`,
},
"if read circuit breaker is half-open and push circuit breaker is half-open, finish function and no error are returned": {
circuitBreakerSetup: func(cb *ingesterCircuitBreaker) {
circuitBreakerSetup: func(cb ingesterCircuitBreaker) {
cb.read.activate()
cb.read.cb.HalfOpen()
cb.push.activate()
Expand Down Expand Up @@ -1722,7 +1722,7 @@ func TestPRCircuitBreaker_TryReadAcquirePermit(t *testing.T) {
recordFailureCount: readRecordedFailureCount,
}
testCase.circuitBreakerSetup(cb)
finish, err := cb.tryReadAcquirePermit()
finish, err := cb.tryAcquireReadPermit()

if testCase.expectedCircuitBreakerError {
require.Nil(t, finish)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func TestTooBusyError(t *testing.T) {

func TestNewCircuitBreakerOpenError(t *testing.T) {
remainingDelay := 1 * time.Second
expectedMsg := fmt.Sprintf("circuit breaker open on aaa request type with remaining delay %s", remainingDelay.String())
err := newCircuitBreakerOpenError("aaa", remainingDelay)
expectedMsg := fmt.Sprintf("circuit breaker open on foo request type with remaining delay %s", remainingDelay.String())
err := newCircuitBreakerOpenError("foo", remainingDelay)
require.Error(t, err)
require.EqualError(t, err, expectedMsg)
checkIngesterError(t, err, mimirpb.CIRCUIT_BREAKER_OPEN, false)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ type Ingester struct {
ingestPartitionID int32
ingestPartitionLifecycler *ring.PartitionInstanceLifecycler

circuitBreaker *ingesterCircuitBreaker
circuitBreaker ingesterCircuitBreaker
}

func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
Expand Down Expand Up @@ -1031,7 +1031,7 @@ func (i *Ingester) startPushRequest(ctx context.Context, reqSize int64) (context
// If it is not possible, it is because the circuit breaker is open, and a circuitBreakerOpenError is returned.
// If it is possible, a permit has to be released by recording either a success or a failure with the circuit
// breaker. This is done by FinishPushRequest().
finish, err := i.circuitBreaker.tryPushAcquirePermit()
finish, err := i.circuitBreaker.tryAcquirePushPermit()
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -3790,7 +3790,7 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
// function is returned.
func (i *Ingester) startReadRequest() (func(error), error) {
start := time.Now()
finish, err := i.circuitBreaker.tryReadAcquirePermit()
finish, err := i.circuitBreaker.tryAcquireReadPermit()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c1bea5a

Please sign in to comment.