Skip to content

Commit

Permalink
sync event
Browse files Browse the repository at this point in the history
  • Loading branch information
octu0 committed Oct 21, 2024
1 parent 80a86ab commit a57b0cc
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 26 deletions.
136 changes: 112 additions & 24 deletions bully.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ const (
DefaultElectionTimeout = 30 * time.Second
DefaultElectionInterval = 100 * time.Millisecond
DefaultUpdateNodeTimeout = 3 * time.Second
DefaultJoinNodeTimeout = 3 * time.Second
DefaultLeaveNodeTimeout = 3 * time.Second
)

var (
ErrBullyInitialize = errors.New("bully initialize")
ErrBullyAliveTimeout = errors.New("bully alive timeout")
ErrJoinTimeout = errors.New("join timeout")
ErrLeaveTimeout = errors.New("leave timeout")
errNodeNotFound = errors.New("node not found")
)

Expand Down Expand Up @@ -79,6 +82,7 @@ type bullyOpt struct {
electionTimeout time.Duration
electionInterval time.Duration
updateNodeTimeout time.Duration
joinNodeTimeout time.Duration
leaveNodeTimeout time.Duration
nodeNumberGenFunc NodeNumberGeneratorFunc
onErrorFunc OnErrorFunc
Expand Down Expand Up @@ -108,6 +112,12 @@ func WithUpdateNodeTimeout(d time.Duration) BullyOptFunc {
}
}

func WithJoinNodeTimeout(d time.Duration) BullyOptFunc {
return func(o *bullyOpt) {
o.joinNodeTimeout = d
}
}

func WithLeaveNodeTimeout(d time.Duration) BullyOptFunc {
return func(o *bullyOpt) {
o.leaveNodeTimeout = d
Expand All @@ -131,6 +141,7 @@ func newBullyOpt(opts []BullyOptFunc) *bullyOpt {
electionTimeout: DefaultElectionTimeout,
electionInterval: DefaultElectionInterval,
updateNodeTimeout: DefaultUpdateNodeTimeout,
joinNodeTimeout: DefaultJoinNodeTimeout,
leaveNodeTimeout: DefaultLeaveNodeTimeout,
observeFunc: DefaultObserverFunc,
nodeNumberGenFunc: DefaultNodeNumberGeneratorFunc,
Expand All @@ -143,15 +154,16 @@ func newBullyOpt(opts []BullyOptFunc) *bullyOpt {
}

type Bully struct {
opt *bullyOpt
wg *sync.WaitGroup
onceJoin *sync.Once
onceLeave *sync.Once
waitJoin chan struct{}
waitLeave chan struct{}
cancel context.CancelFunc
node Node
list *memberlist.Memberlist
opt *bullyOpt
mu *sync.RWMutex
wg *sync.WaitGroup
onceStartup *sync.Once
waitStartup chan struct{}
joinWait map[string]chan struct{}
leaveWait map[string]chan struct{}
cancel context.CancelFunc
node Node
list *memberlist.Memberlist
}

func (b *Bully) IsVoter() bool {
Expand Down Expand Up @@ -194,17 +206,63 @@ func (b *Bully) Members() []Node {
}

func (b *Bully) Join(addr string) error {
if addr == b.list.LocalNode().Address() {
return nil // skip self join (memberlist no event)
}

wait := make(chan struct{})
b.mu.Lock()
b.joinWait[addr] = wait
b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.joinWait, addr)
b.mu.Unlock()
}()

ret := make(chan error)
go func() {
select {
case <-time.After(b.opt.joinNodeTimeout * 2):
ret <- errors.Wrapf(ErrJoinTimeout, "memberlist joined but not call callback")
case <-wait:
ret <- nil // ok
}
}()

if _, err := b.list.Join([]string{addr}); err != nil {
return errors.WithStack(err)
}
return nil

return <-ret
}

func (b *Bully) Leave() error {
addr := b.list.LocalNode().Address()
wait := make(chan struct{})
b.mu.Lock()
b.leaveWait[addr] = wait
b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.leaveWait, addr)
b.mu.Unlock()
}()

ret := make(chan error)
go func() {
select {
case <-time.After(b.opt.leaveNodeTimeout * 2):
ret <- errors.Wrapf(ErrLeaveTimeout, "memberlist leaved but not call callback")
case <-wait:
ret <- nil // ok
}
}()

if err := b.list.Leave(b.opt.leaveNodeTimeout); err != nil {
return errors.WithStack(err)
}
return nil
return <-ret
}

func (b *Bully) TransferLeader(ctx context.Context) error {
Expand Down Expand Up @@ -391,21 +449,48 @@ func (b *Bully) readNodeEventLoop(ctx context.Context, ch chan *hookNodeEventMsg
continue
}
if msg.node.Name == b.node.ID() {
b.onceJoin.Do(func() {
b.onceStartup.Do(func() {
select {
case b.waitJoin <- struct{}{}:
case b.waitStartup <- struct{}{}:
// ok
default:
// pass
}
})
}

b.mu.RLock()
if ch, ok := b.joinWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: join wait exits but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)

case LeaveEvent:
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
if msg.node.Name != b.node.ID() { // leave other node = run election
if err := startElection(ctx, b); err != nil {
b.opt.onErrorFunc(errors.Wrapf(err, "election failure"))
continue
}
}

b.mu.RLock()
if ch, ok := b.leaveWait[msg.node.Address()]; ok {
select {
case ch <- struct{}{}:
// ok
default:
log.Printf("warn: leave wait exists but has no reader")
}
}
b.mu.RUnlock()

b.opt.observeFunc(b, msg.evt)
}
}
Expand All @@ -418,20 +503,23 @@ func (b *Bully) waitSelfJoin(ctx context.Context) error {
return errors.WithStack(ErrBullyInitialize)
case <-time.After(b.opt.electionTimeout):
return errors.WithStack(ErrBullyAliveTimeout)
case <-b.waitJoin:
case <-b.waitStartup:
return nil // ok
}
}

func newBully(cancel context.CancelFunc, node Node, list *memberlist.Memberlist, opt *bullyOpt) *Bully {
return &Bully{
opt: opt,
wg: new(sync.WaitGroup),
onceJoin: new(sync.Once),
waitJoin: make(chan struct{}),
cancel: cancel,
node: node,
list: list,
opt: opt,
mu: new(sync.RWMutex),
wg: new(sync.WaitGroup),
onceStartup: new(sync.Once),
waitStartup: make(chan struct{}),
joinWait: make(map[string]chan struct{}),
leaveWait: make(map[string]chan struct{}),
cancel: cancel,
node: node,
list: list,
}
}

Expand Down
94 changes: 94 additions & 0 deletions bully_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,100 @@ func TestCreateVoter(t *testing.T) {
}
tt.Logf("shutdown")
})

t.Run("two_node", func(tt *testing.T) {
conf1 := memberlist.DefaultLocalConfig()
conf1.Name = "test1"
conf1.BindAddr = "127.0.0.1"
conf1.BindPort = 0
conf1.AdvertiseAddr = "127.0.0.1"
conf1.AdvertisePort = 0

conf2 := memberlist.DefaultLocalConfig()
conf2.Name = "test2"
conf2.BindAddr = "127.0.0.1"
conf2.BindPort = 0
conf2.AdvertiseAddr = "127.0.0.1"
conf2.AdvertisePort = 0

ctx, cancel := context.WithCancel(context.Background())
b1, err := CreateVoter(ctx, conf1,
WithElectionTimeout(1*time.Second),
WithObserveFunc(func(b *Bully, evt NodeEvent) {
tt.Logf("[1] evt=%s", evt)
}),
WithOnErrorFunc(func(err error) {
tt.Fatalf("[1] on error=%+v", err)
cancel()
}),
)
if err != nil {
tt.Fatalf("CreateVoter: %+v", err)
}

b2, err := CreateVoter(ctx, conf2,
WithElectionTimeout(1*time.Second),
WithObserveFunc(func(b *Bully, evt NodeEvent) {
tt.Logf("[2] evt=%s", evt)
}),
WithOnErrorFunc(func(err error) {
tt.Fatalf("[2] on error=%+v", err)
cancel()
}),
)
if err != nil {
tt.Fatalf("CreateVoter: %+v", err)
}

tt.Logf("[1] addr=%v", b1.Address())
tt.Logf("[2] addr=%v", b2.Address())

if b1.IsVoter() != true {
tt.Errorf("must voter node")
}
if b2.IsVoter() != true {
tt.Errorf("must voter node")
}

if b1.IsLeader() != true {
tt.Errorf("not join = is leader")
}
if b2.IsLeader() != true {
tt.Errorf("not join = is leader")
}

if err := b1.Join(b1.Address()); err != nil {
tt.Fatalf("join self: %+v", err)
}
tt.Logf("b1 join b1: %s", b1.Address())

if err := b2.Join(b1.Address()); err != nil {
tt.Fatalf("join b1: %+v", err)
}
tt.Logf("b2 join b1: %s", b1.Address())

if b1.IsLeader() != true {
tt.Errorf("leader = b1: %+v", b1.node)
}
if b2.IsLeader() {
tt.Errorf("leader = b1: %+v", b2.node)
}

if err := b1.Leave(); err != nil {
tt.Fatalf("Leave: %+v", err)
}
tt.Logf("b1 leave: %+v", b1.node)

if b2.IsLeader() != true {
tt.Errorf("leader = b2: %+v", b2.node)
}
tt.Logf("b2 is leader = %v", b2.IsLeader())

if err := b2.Leave(); err != nil {
tt.Fatalf("Leave: %+v", err)
}
tt.Logf("b2 leave: %+v", b2.node)
})
}

/*
Expand Down
4 changes: 2 additions & 2 deletions election.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func leaderElection(ctx context.Context, b *Bully, voterNodes []internalVoterNod
selfNodeNumber := b.getNodeNumber()
targetNodes := make([]internalVoterNode, 0, len(voterNodes))
for _, n := range voterNodes {
if n.getNodeNumber() < selfNodeNumber {
if selfNodeNumber < n.getNodeNumber() {
targetNodes = append(targetNodes, n)
}
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func syncLeader(ctx context.Context, b *Bully, answerNodes []internalVoterNode)
if len(leaderNodes) == 0 {
msg := bytes.NewBuffer(nil)
for _, n := range answerNodes {
msg.WriteString(fmt.Sprintf("id:%s election:%d answer:%d\n", n.ID(), n.getNumElection(), n.getNumAnswer()))
fmt.Fprintf(msg, "id:%s election:%d answer:%d\n", n.ID(), n.getNumElection(), n.getNumAnswer())
}
return nil, errors.Wrapf(ErrNoLeader, msg.String())
}
Expand Down

0 comments on commit a57b0cc

Please sign in to comment.