Skip to content

Commit

Permalink
Update distributed_gobreaker.go
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiyukiMineo authored Dec 28, 2024
1 parent 9e4111b commit 6cbc544
Showing 1 changed file with 63 additions and 46 deletions.
109 changes: 63 additions & 46 deletions v2/distributed_gobreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"time"
)

var (
// ErrNoSharedStore is returned when there is no shared store.
ErrNoSharedStore = errors.New("no shared store")
// ErrNoSharedState is returned when there is no shared state.
ErrNoSharedState = errors.New("no shared state")
)

// SharedState represents the shared state of DistributedCircuitBreaker.
type SharedState struct {
State State `json:"state"`
Expand All @@ -15,36 +22,45 @@ type SharedState struct {
Expiry time.Time `json:"expiry"`
}

// SharedDataStore stores the shared state of DistributedCircuitBreaker.
type SharedDataStore interface {
GetData(ctx context.Context, name string) ([]byte, error)
SetData(ctx context.Context, name string, data []byte) error
}

// DistributedCircuitBreaker extends CircuitBreaker with distributed state storage
// DistributedCircuitBreaker extends CircuitBreaker with SharedDataStore.
type DistributedCircuitBreaker[T any] struct {
*CircuitBreaker[T]
store SharedDataStore
}

// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker configured with the given StorageSettings
func NewDistributedCircuitBreaker[T any](store SharedDataStore, settings Settings) *DistributedCircuitBreaker[T] {
cb := NewCircuitBreaker[T](settings)
return &DistributedCircuitBreaker[T]{
CircuitBreaker: cb,
// NewDistributedCircuitBreaker returns a new DistributedCircuitBreaker.
func NewDistributedCircuitBreaker[T any](ctx context.Context, store SharedDataStore, settings Settings) (*DistributedCircuitBreaker[T], error) {
if store == nil {
return nil, ErrNoSharedStore
}

dcb := &DistributedCircuitBreaker[T]{
CircuitBreaker: NewCircuitBreaker[T](settings),
store: store,
}
err := dcb.setSharedState(ctx, cb.state)
return dcb, err
}

func (rcb *DistributedCircuitBreaker[T]) getStorageKey() string {
return "cb:" + rcb.name
func (dcb *DistributedCircuitBreaker[T]) sharedStateKey() string {
return "gobreaker:" + dcb.name
}

func (rcb *DistributedCircuitBreaker[T]) getStoredState(ctx context.Context) (SharedState, error) {
func (dcb *DistributedCircuitBreaker[T]) getSharedState(ctx context.Context) (SharedState, error) {
var state SharedState
data, err := rcb.store.GetData(ctx, rcb.getStorageKey())
if dcb.store == nil {
return state, ErrNoSharedStore
}

data, err := dcb.store.GetData(ctx, dcb.sharedStateKey())
if len(data) == 0 {
// Key doesn't exist, return default state
return SharedState{State: StateClosed}, nil
return state, ErrNoSharedState
} else if err != nil {
return state, err
}
Expand All @@ -53,68 +69,69 @@ func (rcb *DistributedCircuitBreaker[T]) getStoredState(ctx context.Context) (Sh
return state, err
}

func (rcb *DistributedCircuitBreaker[T]) setStoredState(ctx context.Context, state SharedState) error {
func (dcb *DistributedCircuitBreaker[T]) setSharedState(ctx context.Context, state SharedState) error {
if dcb.store == nil {
return ErrNoSharedStore
}

data, err := json.Marshal(state)
if err != nil {
return err
}

return rcb.store.SetData(ctx, rcb.getStorageKey(), data)
return dcb.store.SetData(ctx, dcb.sharedStateKey(), data)
}

func (dcb *DistributedCircuitBreaker[T]) State(ctx context.Context) State {
if dcb.store == nil {
return dcb.CircuitBreaker.State()
}

state, err := dcb.getStoredState(ctx)
// State returns the State of DistributedCircuitBreaker.
func (dcb *DistributedCircuitBreaker[T]) State(ctx context.Context) (State, error) {
state, err := dcb.getSharedState(ctx)
if err != nil {
// Fallback to in-memory state if Storage fails
return dcb.CircuitBreaker.State()
return state.State, err
}

now := time.Now()
currentState, _ := dcb.currentState(state, now)

// Update the state in Storage if it has changed
// update the state if it has changed
if currentState != state.State {
state.State = currentState
if err := dcb.setStoredState(ctx, state); err != nil {
// Log the error, but continue with the current state
fmt.Printf("Failed to update state in storage: %v\n", err)
if err := dcb.setSharedState(ctx, state); err != nil {
return state.State, err
}
}

return state.State
return state.State, nil
}

// Execute runs the given request if the DistributedCircuitBreaker accepts it
func (dcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) {
if dcb.store == nil {
return dcb.CircuitBreaker.Execute(req)
}
// Execute runs the given request if the DistributedCircuitBreaker accepts it.
func (dcb *DistributedCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, err error) {
generation, err := dcb.beforeRequest(ctx)
if err != nil {
var zero T
return zero, err
var defaultValue T
return defaultValue, err
}

defer func() {
e := recover()
if e != nil {
dcb.afterRequest(ctx, generation, false)
ae := dcb.afterRequest(ctx, generation, false)
if err == nil {
err = ae
}
panic(e)
}
}()

result, err := req()
dcb.afterRequest(ctx, generation, dcb.isSuccessful(err))

ae := dcb.afterRequest(ctx, generation, dcb.isSuccessful(err))
if err == nil {
err = ae
}
return result, err
}

func (dcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) {
state, err := dcb.getStoredState(ctx)
state, err := dcb.getSharedState(ctx)
if err != nil {
return 0, err
}
Expand All @@ -123,7 +140,7 @@ func (dcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uin

if currentState != state.State {
dcb.setState(&state, currentState, now)
err = dcb.setStoredState(ctx, state)
err = dcb.setSharedState(ctx, state)
if err != nil {
return 0, err
}
Expand All @@ -136,32 +153,32 @@ func (dcb *DistributedCircuitBreaker[T]) beforeRequest(ctx context.Context) (uin
}

state.Counts.onRequest()
err = dcb.setStoredState(ctx, state)
err = dcb.setSharedState(ctx, state)
if err != nil {
return 0, err
}

return generation, nil
}

func (dcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) {
state, err := dcb.getStoredState(ctx)
func (dcb *DistributedCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) error {
state, err := dcb.getSharedState(ctx)
if err != nil {
return
return err
}

now := time.Now()
currentState, generation := dcb.currentState(state, now)
if generation != before {
return
return nil
}

if success {
dcb.onSuccess(&state, currentState, now)
} else {
dcb.onFailure(&state, currentState, now)
}

dcb.setStoredState(ctx, state)
return dcb.setSharedState(ctx, state)
}

func (dcb *DistributedCircuitBreaker[T]) onSuccess(state *SharedState, currentState State, now time.Time) {
Expand Down

0 comments on commit 6cbc544

Please sign in to comment.