Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
octu0 committed Oct 21, 2024
1 parent a57b0cc commit ce62e34
Show file tree
Hide file tree
Showing 7 changed files with 721 additions and 188 deletions.
274 changes: 139 additions & 135 deletions bully.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

const (
DefaultElectionTimeout = 30 * time.Second
DefaultElectionInterval = 100 * time.Millisecond
DefaultUpdateNodeTimeout = 3 * time.Second
DefaultJoinNodeTimeout = 3 * time.Second
DefaultLeaveNodeTimeout = 3 * time.Second
DefaultElectionTimeout = 30 * time.Second
DefaultElectionInterval = 1 * time.Millisecond
DefaultUpdateNodeTimeout = 5 * time.Second
DefaultJoinNodeTimeout = 10 * time.Second
DefaultLeaveNodeTimeout = 10 * time.Second
DefaultTransferLeaderTimeout = 10 * time.Second
)

var (
Expand All @@ -42,7 +43,6 @@ type NodeEvent uint8
const (
JoinEvent NodeEvent = iota + 1
LeaveEvent
UpdateEvent
)

func (evt NodeEvent) String() string {
Expand All @@ -51,8 +51,6 @@ func (evt NodeEvent) String() string {
return "join"
case LeaveEvent:
return "leave"
case UpdateEvent:
return "update"
}
return "unknown event"
}
Expand All @@ -78,14 +76,15 @@ func DefaultOnErrorFunc(err error) {
type BullyOptFunc func(*bullyOpt)

type bullyOpt struct {
observeFunc ObserveFunc
electionTimeout time.Duration
electionInterval time.Duration
updateNodeTimeout time.Duration
joinNodeTimeout time.Duration
leaveNodeTimeout time.Duration
nodeNumberGenFunc NodeNumberGeneratorFunc
onErrorFunc OnErrorFunc
observeFunc ObserveFunc
electionTimeout time.Duration
electionInterval time.Duration
updateNodeTimeout time.Duration
joinNodeTimeout time.Duration
leaveNodeTimeout time.Duration
transferLeaderTimeout time.Duration
nodeNumberGenFunc NodeNumberGeneratorFunc
onErrorFunc OnErrorFunc
}

func WithObserveFunc(f ObserveFunc) BullyOptFunc {
Expand Down Expand Up @@ -124,6 +123,12 @@ func WithLeaveNodeTimeout(d time.Duration) BullyOptFunc {
}
}

func WithTransferLeaderTimeout(d time.Duration) BullyOptFunc {
return func(o *bullyOpt) {
o.transferLeaderTimeout = d
}
}

func WithNodeNumberGeneratorFunc(f NodeNumberGeneratorFunc) BullyOptFunc {
return func(o *bullyOpt) {
o.nodeNumberGenFunc = f
Expand All @@ -138,14 +143,15 @@ func WithOnErrorFunc(f OnErrorFunc) BullyOptFunc {

func newBullyOpt(opts []BullyOptFunc) *bullyOpt {
opt := &bullyOpt{
electionTimeout: DefaultElectionTimeout,
electionInterval: DefaultElectionInterval,
updateNodeTimeout: DefaultUpdateNodeTimeout,
joinNodeTimeout: DefaultJoinNodeTimeout,
leaveNodeTimeout: DefaultLeaveNodeTimeout,
observeFunc: DefaultObserverFunc,
nodeNumberGenFunc: DefaultNodeNumberGeneratorFunc,
onErrorFunc: DefaultOnErrorFunc,
electionTimeout: DefaultElectionTimeout,
electionInterval: DefaultElectionInterval,
updateNodeTimeout: DefaultUpdateNodeTimeout,
joinNodeTimeout: DefaultJoinNodeTimeout,
leaveNodeTimeout: DefaultLeaveNodeTimeout,
transferLeaderTimeout: DefaultTransferLeaderTimeout,
observeFunc: DefaultObserverFunc,
nodeNumberGenFunc: DefaultNodeNumberGeneratorFunc,
onErrorFunc: DefaultOnErrorFunc,
}
for _, f := range opts {
f(opt)
Expand All @@ -154,16 +160,17 @@ func newBullyOpt(opts []BullyOptFunc) *bullyOpt {
}

type Bully struct {
opt *bullyOpt
mu *sync.RWMutex
wg *sync.WaitGroup
onceStartup *sync.Once
waitStartup chan struct{}
joinWait map[string]chan struct{}
leaveWait map[string]chan struct{}
cancel context.CancelFunc
node Node
list *memberlist.Memberlist
opt *bullyOpt
mu *sync.RWMutex
wg *sync.WaitGroup
onceStartup *sync.Once
waitStartup chan struct{}
joinWait map[string]chan struct{}
leaveWait map[string]chan struct{}
electionQueue chan struct{}
cancel context.CancelFunc
node Node
list *memberlist.Memberlist
}

func (b *Bully) IsVoter() bool {
Expand Down Expand Up @@ -223,7 +230,7 @@ func (b *Bully) Join(addr string) error {
ret := make(chan error)
go func() {
select {
case <-time.After(b.opt.joinNodeTimeout * 2):
case <-time.After(b.opt.joinNodeTimeout):
ret <- errors.Wrapf(ErrJoinTimeout, "memberlist joined but not call callback")
case <-wait:
ret <- nil // ok
Expand Down Expand Up @@ -252,7 +259,7 @@ func (b *Bully) Leave() error {
ret := make(chan error)
go func() {
select {
case <-time.After(b.opt.leaveNodeTimeout * 2):
case <-time.After(b.opt.leaveNodeTimeout):
ret <- errors.Wrapf(ErrLeaveTimeout, "memberlist leaved but not call callback")
case <-wait:
ret <- nil // ok
Expand All @@ -265,18 +272,6 @@ func (b *Bully) Leave() error {
return <-ret
}

func (b *Bully) TransferLeader(ctx context.Context) error {
if b.node.IsLeader() != true {
return nil
}

if err := startTransferLeader(ctx, b); err != nil {
return errors.WithStack(err)
}

return nil
}

func (b *Bully) Shutdown() error {
if err := b.list.Shutdown(); err != nil {
return errors.WithStack(err)
Expand Down Expand Up @@ -441,62 +436,87 @@ func (b *Bully) readNodeEventLoop(ctx context.Context, ch chan *hookNodeEventMsg
select {
case <-ctx.Done():
return

case <-b.electionQueue:
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}

case msg := <-ch:
switch msg.evt {
case JoinEvent:
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}
if msg.node.Name == b.node.ID() {
b.onceStartup.Do(func() {
select {
case b.waitStartup <- struct{}{}:
// ok
default:
// pass
}
})
}

b.mu.RLock()
if ch, ok := b.joinWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: join wait exits but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)
b.handleJoinEvent(ctx, msg)

case LeaveEvent:
if msg.node.Name != b.node.ID() { // leave other node = run election
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}
}

b.mu.RLock()
if ch, ok := b.leaveWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: leave wait exists but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)
b.handleLeaveEvent(ctx, msg)
}
}
}
}

func (b *Bully) handleJoinEvent(ctx context.Context, msg *hookNodeEventMsg) {
nodeMeta, err := fromJSON(bytes.NewReader(msg.node.Meta))
if err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "invalid metadata"))
return
}
node := nodeMetaToNode(nodeMeta)

if node.IsVoterNode() {
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
return
}
}

if msg.node.Name == b.node.ID() {
b.onceStartup.Do(func() {
select {
case b.waitStartup <- struct{}{}:
// ok
default:
// pass
}
})
}

b.mu.RLock()
if ch, ok := b.joinWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: join wait exits but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)
}

func (b *Bully) handleLeaveEvent(ctx context.Context, msg *hookNodeEventMsg) {
if msg.node.Name != b.node.ID() { // leave other node = run election
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
return
}
}

b.mu.RLock()
if ch, ok := b.leaveWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: leave wait exists but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)
}

func (b *Bully) waitSelfJoin(ctx context.Context) error {
select {
case <-ctx.Done():
Expand All @@ -510,20 +530,23 @@ func (b *Bully) waitSelfJoin(ctx context.Context) error {

func newBully(cancel context.CancelFunc, node Node, list *memberlist.Memberlist, opt *bullyOpt) *Bully {
return &Bully{
opt: opt,
mu: new(sync.RWMutex),
wg: new(sync.WaitGroup),
onceStartup: new(sync.Once),
waitStartup: make(chan struct{}),
joinWait: make(map[string]chan struct{}),
leaveWait: make(map[string]chan struct{}),
cancel: cancel,
node: node,
list: list,
opt: opt,
mu: new(sync.RWMutex),
wg: new(sync.WaitGroup),
onceStartup: new(sync.Once),
waitStartup: make(chan struct{}),
joinWait: make(map[string]chan struct{}),
leaveWait: make(map[string]chan struct{}),
electionQueue: make(chan struct{}),
cancel: cancel,
node: node,
list: list,
}
}

func CreateVoter(parent context.Context, conf *memberlist.Config, funcs ...BullyOptFunc) (*Bully, error) {
type createNodeFunc func(nodeNumber int64) Node

func createBully(parent context.Context, conf *memberlist.Config, funcs []BullyOptFunc, createNode createNodeFunc) (*Bully, error) {
opt := newBullyOpt(funcs)

ctx, cancel := context.WithCancel(parent)
Expand All @@ -535,8 +558,7 @@ func CreateVoter(parent context.Context, conf *memberlist.Config, funcs ...Bully
resolvLater = true
}

nodeNum := opt.nodeNumberGenFunc(conf.Name)
node := newVoterNode(conf.Name, conf.AdvertiseAddr, conf.AdvertisePort, StateInitial.String(), nodeNum)
node := createNode(opt.nodeNumberGenFunc(conf.Name))
conf.Delegate = newHookMessage(msgCh, node)
conf.Events = newHooNodeEvent(evtCh)

Expand All @@ -546,7 +568,7 @@ func CreateVoter(parent context.Context, conf *memberlist.Config, funcs ...Bully
return nil, errors.WithStack(err)
}
if resolvLater {
node.port = int(list.LocalNode().Port)
node.setPort(int(list.LocalNode().Port))
}

b := newBully(cancel, node, list, opt)
Expand All @@ -561,34 +583,16 @@ func CreateVoter(parent context.Context, conf *memberlist.Config, funcs ...Bully
return b, nil
}

func CreateNonVoter(parent context.Context, conf *memberlist.Config, funcs ...BullyOptFunc) (*Bully, error) {
opt := newBullyOpt(funcs)

ctx, cancel := context.WithCancel(parent)
msgCh := make(chan []byte, 1)

resolvLater := false
if conf.BindPort == 0 || conf.AdvertisePort == 0 {
resolvLater = true
}

node := newNonvoterNode(conf.Name, conf.AdvertiseAddr, conf.AdvertisePort)
conf.Delegate = newHookMessage(msgCh, node)

list, err := memberlist.Create(conf)
if err != nil {
cancel()
return nil, errors.WithStack(err)
}
if resolvLater {
node.port = int(list.LocalNode().Port)
}

b := newBully(cancel, node, list, opt)
b.wg.Add(1)
go b.readMessageLoop(ctx, msgCh)
func CreateVoter(parent context.Context, conf *memberlist.Config, funcs ...BullyOptFunc) (*Bully, error) {
return createBully(parent, conf, funcs, func(nodeNumber int64) Node {
return newVoterNode(conf.Name, conf.AdvertiseAddr, conf.AdvertisePort, StateInitial.String(), nodeNumber)
})
}

return b, nil
func CreateNonVoter(parent context.Context, conf *memberlist.Config, funcs ...BullyOptFunc) (*Bully, error) {
return createBully(parent, conf, funcs, func(nodeNumber int64) Node {
return newNonvoterNode(conf.Name, conf.AdvertiseAddr, conf.AdvertisePort)
})
}

var (
Expand Down
Loading

0 comments on commit ce62e34

Please sign in to comment.