From 8a40a51e80564beae2f8ba7ca43316a3b431e785 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Mon, 26 Sep 2016 18:14:27 +0300 Subject: [PATCH] Pull algorithm module for Gossip component The gossip protocol disseminates messages via push and pull. This commit contains the module that runs the pull protocol which is a separate, timer-based protocol. The patch also contains some gofmt changes Change-Id: I39797631756d76ccd90951af6ac812c1e956fa6b Signed-off-by: Yacov Manevich --- gossip/discovery/discovery_impl.go | 10 +- gossip/discovery/discovery_test.go | 4 +- gossip/gossip/algo/pull.go | 288 ++++++++++++++++++ gossip/gossip/algo/pull_test.go | 472 +++++++++++++++++++++++++++++ gossip/util/misc.go | 66 +++- 5 files changed, 826 insertions(+), 14 deletions(-) create mode 100644 gossip/gossip/algo/pull.go create mode 100644 gossip/gossip/algo/pull_test.go diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 03bef2d2437..e5c3f9bfd42 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -113,7 +113,6 @@ func NewDiscoveryService(bootstrapPeers []*NetworkMember, self NetworkMember, co go d.periodicalReconnectToDead() go d.handlePresumedDeadPeers() - return d } @@ -132,10 +131,10 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) { k = n } - for _, i := range util.GetRandomIndices(k, n - 1) { + for _, i := range util.GetRandomIndices(k, n-1) { pulledPeer := d.cachedMembership.Alive[i].Membership netMember := &NetworkMember{ - Id: pulledPeer.Id, + Id: pulledPeer.Id, Endpoint: pulledPeer.Endpoint, Metadata: pulledPeer.Metadata, } @@ -143,7 +142,6 @@ func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) { } } - func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() { d.stopSignal.Add(1) defer d.stopSignal.Done() @@ -231,7 +229,7 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known []string) *proto.Me defer d.lock.RUnlock() alivePeers := make([]*proto.AliveMessage, 0) - deadPeers := make([]*proto.AliveMessage, 0) + deadPeers := make([]*proto.AliveMessage, 0) for _, am := range d.cachedMembership.Alive { isKnown := false @@ -261,7 +259,7 @@ func (d *gossipDiscoveryImpl) createMembershipResponse(known []string) *proto.Me } } - return &proto.MembershipResponse { + return &proto.MembershipResponse{ Alive: append(alivePeers, aliveMsg), Dead: deadPeers, } diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index 42273e5ed76..8a5c2abb854 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -92,7 +92,7 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { _, alreadyExists := comm.streams[peer.Id] if !alreadyExists { - newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(500) * time.Millisecond)) + newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure(), grpc.WithTimeout(time.Duration(500)*time.Millisecond)) if err != nil { //fmt.Printf("Error dialing: to %v: %v\n",peer.Endpoint, err) return false @@ -133,7 +133,7 @@ func (comm *dummyCommModule) CloseConn(id string) { comm.streams[id].CloseSend() comm.conns[id].Close() - delete(comm.streams,id) + delete(comm.streams, id) delete(comm.conns, id) } diff --git a/gossip/gossip/algo/pull.go b/gossip/gossip/algo/pull.go new file mode 100644 index 00000000000..d7b23c35eec --- /dev/null +++ b/gossip/gossip/algo/pull.go @@ -0,0 +1,288 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package algo + +import ( + "math/rand" + "sync" + "sync/atomic" + "time" + + "github.com/hyperledger/fabric/gossip/util" +) + +/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items + identified by uint64 numbers. + The protocol is as follows: + 1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers. + 2) Each remote peer responds with a digest of its messages and returns that NONCE. + 3) The initiator checks the validity of the NONCEs received, aggregates the digests, + and crafts a request containing specific item ids it wants to receive from each remote peer and then + sends each request to its corresponding peer. + 4) Each peer sends back the response containing the items requested, if it still holds them and the NONCE. + + Other peer Initiator + O <-------- Hello ------------------------- O + /|\ --------- Digest <[3,5,8, 10...], NONCE> --------> /|\ + | <-------- Request <[3,8], NONCE> ----------------- | + / \ --------- Response <[item3, item8], NONCE>-------> / \ + +*/ + +const ( + DEF_DIGEST_WAIT_TIME = time.Duration(4) * time.Second + DEF_REQUEST_WAIT_TIME = time.Duration(4) * time.Second + DEF_RESPONSE_WAIT_TIME = time.Duration(7) * time.Second +) + +func init() { + rand.Seed(42) +} + +var defaultDigestWaitTime = DEF_DIGEST_WAIT_TIME +var defaultRequestWaitTime = DEF_REQUEST_WAIT_TIME +var defaultResponseWaitTime = DEF_RESPONSE_WAIT_TIME + +// PullAdapter is needed by the PullEngine in order to +// send messages to the remote PullEngine instances. +// The PullEngine expects to be invoked with +// OnHello, OnDigest, OnReq, OnRes when the respective message arrives +// from a remote PullEngine +type PullAdapter interface { + // SelectPeers returns a slice of peers which the engine will initiate the protocol with + SelectPeers() []string + + // Hello sends a hello message to initiate the protocol + // and returns an NONCE that is expected to be returned + // in the digest message. + Hello(dest string, nonce uint64) + + // SendDigest sends a digest to a remote PullEngine. + // The context parameter specifies the remote engine to send to. + SendDigest(digest []uint64, nonce uint64, context interface{}) + + // SendReq sends an array of items to a certain remote PullEngine identified + // by a string + SendReq(dest string, items []uint64, nonce uint64) + + // SendRes sends an array of items to a remote PullEngine identified by a context. + SendRes(items []uint64, context interface{}, nonce uint64) +} + +type PullEngine struct { + PullAdapter + stopFlag int32 + state *util.Set + item2owners map[uint64][]string + peers2nonces map[string]uint64 + nonces2peers map[uint64]string + acceptingDigests int32 + acceptingResponses int32 + lock sync.Mutex + nonces *util.Set +} + +func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine { + engine := &PullEngine{ + PullAdapter: participant, + stopFlag: int32(0), + state: util.NewSet(), + item2owners: make(map[uint64][]string), + peers2nonces: make(map[string]uint64), + nonces2peers: make(map[uint64]string), + acceptingDigests: int32(0), + acceptingResponses: int32(0), + nonces: util.NewSet(), + } + + go func() { + for !engine.toDie() { + time.Sleep(sleepTime) + engine.initiatePull() + + } + }() + + return engine +} + +func (engine *PullEngine) toDie() bool { + return (atomic.LoadInt32(&(engine.stopFlag)) == int32(1)) +} + +func (engine *PullEngine) acceptResponses() { + atomic.StoreInt32(&(engine.acceptingResponses), int32(1)) +} + +func (engine *PullEngine) isAcceptingResponses() bool { + return atomic.LoadInt32(&(engine.acceptingResponses)) == int32(1) +} + +func (engine *PullEngine) acceptDigests() { + atomic.StoreInt32(&(engine.acceptingDigests), int32(1)) +} + +func (engine *PullEngine) isAcceptingDigests() bool { + return atomic.LoadInt32(&(engine.acceptingDigests)) == int32(1) +} + +func (engine *PullEngine) ignoreDigests() { + atomic.StoreInt32(&(engine.acceptingDigests), int32(0)) +} + +func (engine *PullEngine) Stop() { + atomic.StoreInt32(&(engine.stopFlag), int32(1)) +} + +func (engine *PullEngine) initiatePull() { + engine.lock.Lock() + defer engine.lock.Unlock() + + engine.acceptDigests() + for _, peer := range engine.SelectPeers() { + nonce := engine.newNONCE() + engine.nonces.Add(nonce) + engine.nonces2peers[nonce] = peer + engine.peers2nonces[peer] = nonce + engine.Hello(peer, nonce) + } + + time.AfterFunc(defaultDigestWaitTime, func() { + engine.processIncomingDigests() + }) +} + +func (engine *PullEngine) processIncomingDigests() { + engine.ignoreDigests() + + engine.lock.Lock() + defer engine.lock.Unlock() + + requestMapping := make(map[string][]uint64) + for n, sources := range engine.item2owners { + // select a random source + source := sources[rand.Intn(len(sources))] + if _, exists := requestMapping[source]; !exists { + requestMapping[source] = make([]uint64, 0) + } + // append the number to that source + requestMapping[source] = append(requestMapping[source], n) + } + + engine.acceptResponses() + + for dest, seqsToReq := range requestMapping { + engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest]) + } + + time.AfterFunc(defaultResponseWaitTime, engine.endPull) + +} + +func (engine *PullEngine) endPull() { + engine.lock.Lock() + defer engine.lock.Unlock() + + atomic.StoreInt32(&(engine.acceptingResponses), int32(0)) + engine.nonces.Clear() + + engine.item2owners = make(map[uint64][]string) + engine.peers2nonces = make(map[string]uint64) + engine.nonces2peers = make(map[uint64]string) +} + +func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) { + if !engine.isAcceptingDigests() || !engine.nonces.Exists(nonce) { + return + } + + engine.lock.Lock() + defer engine.lock.Unlock() + + for _, n := range digest { + if engine.state.Exists(n) { + continue + } + + if _, exists := engine.item2owners[n]; !exists { + engine.item2owners[n] = make([]string, 0) + } + + engine.item2owners[n] = append(engine.item2owners[n], engine.nonces2peers[nonce]) + } +} + +func (engine *PullEngine) Add(seqs ...uint64) { + for _, seq := range seqs { + engine.state.Add(seq) + } +} + +func (engine *PullEngine) Remove(seqs ...uint64) { + for _, seq := range seqs { + engine.state.Remove(seq) + } +} + +func (engine *PullEngine) OnHello(nonce uint64, context interface{}) { + engine.nonces.Add(nonce) + time.AfterFunc(defaultRequestWaitTime, func() { + engine.nonces.Remove(nonce) + }) + + a := engine.state.ToArray() + digest := make([]uint64, len(a)) + for i, item := range a { + digest[i] = item.(uint64) + } + engine.SendDigest(digest, nonce, context) +} + +func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) { + if !engine.nonces.Exists(nonce) { + return + } + engine.lock.Lock() + defer engine.lock.Unlock() + + items2Send := make([]uint64, 0) + for _, item := range items { + if engine.state.Exists(item) { + items2Send = append(items2Send, item) + } + } + + engine.SendRes(items2Send, context, nonce) +} + +func (engine *PullEngine) OnRes(items []uint64, nonce uint64) { + if !engine.nonces.Exists(nonce) || !engine.isAcceptingResponses() { + return + } + + engine.Add(items...) +} + +func (engine *PullEngine) newNONCE() uint64 { + n := uint64(0) + for { + n = uint64(rand.Int63()) + if !engine.nonces.Exists(n) { + return n + } + } +} diff --git a/gossip/gossip/algo/pull_test.go b/gossip/gossip/algo/pull_test.go new file mode 100644 index 00000000000..cd76b908975 --- /dev/null +++ b/gossip/gossip/algo/pull_test.go @@ -0,0 +1,472 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package algo + +import ( + "sync" + "testing" + "time" + + "github.com/hyperledger/fabric/gossip/util" + "github.com/stretchr/testify/assert" + "sync/atomic" +) + +func init() { + defaultRequestWaitTime = time.Duration(50) * time.Millisecond + defaultDigestWaitTime = time.Duration(20) * time.Millisecond + defaultResponseWaitTime = time.Duration(50) * time.Millisecond + +} + +type messageHook func(interface{}) + +type pullTestInstance struct { + msgHooks []messageHook + peers map[string]*pullTestInstance + name string + nextPeerSelection []string + msgQueue chan interface{} + lock sync.Mutex + stopChan chan struct{} + *PullEngine +} + +type helloMsg struct { + nonce uint64 + source string +} + +type digestMsg struct { + nonce uint64 + digest []uint64 + source string +} + +type reqMsg struct { + items []uint64 + nonce uint64 + source string +} + +type resMsg struct { + items []uint64 + nonce uint64 +} + +func newPushPullTestInstance(name string, peers map[string]*pullTestInstance) *pullTestInstance { + inst := &pullTestInstance{ + msgHooks: make([]messageHook, 0), + peers: peers, + msgQueue: make(chan interface{}, 100), + nextPeerSelection: make([]string, 0), + stopChan: make(chan struct{}, 1), + name: name, + } + + inst.PullEngine = NewPullEngine(inst, time.Duration(500)*time.Millisecond) + + peers[name] = inst + go func() { + for { + select { + case <-inst.stopChan: + return + break + case m := <-inst.msgQueue: + inst.handleMessage(m) + break + } + } + }() + + return inst +} + +// Used to test the messages one peer sends to another. +// Assert statements should be passed via the messageHook f +func (p *pullTestInstance) hook(f messageHook) { + p.lock.Lock() + defer p.lock.Unlock() + p.msgHooks = append(p.msgHooks, f) +} + +func (p *pullTestInstance) handleMessage(m interface{}) { + p.lock.Lock() + for _, f := range p.msgHooks { + f(m) + } + p.lock.Unlock() + + if helloMsg, isHello := m.(*helloMsg); isHello { + p.OnHello(helloMsg.nonce, helloMsg.source) + return + } + + if digestMsg, isDigest := m.(*digestMsg); isDigest { + p.OnDigest(digestMsg.digest, digestMsg.nonce, digestMsg.source) + return + } + + if reqMsg, isReq := m.(*reqMsg); isReq { + p.OnReq(reqMsg.items, reqMsg.nonce, reqMsg.source) + return + } + + if resMsg, isRes := m.(*resMsg); isRes { + p.OnRes(resMsg.items, resMsg.nonce) + } +} + +func (p *pullTestInstance) stop() { + p.stopChan <- struct{}{} + p.Stop() +} + +func (p *pullTestInstance) setNextPeerSelection(selection []string) { + p.lock.Lock() + defer p.lock.Unlock() + p.nextPeerSelection = selection +} + +func (p *pullTestInstance) SelectPeers() []string { + p.lock.Lock() + defer p.lock.Unlock() + return p.nextPeerSelection +} + +func (p *pullTestInstance) Hello(dest string, nonce uint64) { + p.peers[dest].msgQueue <- &helloMsg{nonce: nonce, source: p.name} +} + +func (p *pullTestInstance) SendDigest(digest []uint64, nonce uint64, context interface{}) { + p.peers[context.(string)].msgQueue <- &digestMsg{source: p.name, nonce: nonce, digest: digest} +} + +func (p *pullTestInstance) SendReq(dest string, items []uint64, nonce uint64) { + p.peers[dest].msgQueue <- &reqMsg{nonce: nonce, source: p.name, items: items} +} + +func (p *pullTestInstance) SendRes(items []uint64, context interface{}, nonce uint64) { + p.peers[context.(string)].msgQueue <- &resMsg{items: items, nonce: nonce} +} + +func TestPullEngine_Add(t *testing.T) { + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + defer inst1.Stop() + inst1.Add(uint64(0)) + inst1.Add(uint64(0)) + assert.True(t, inst1.PullEngine.state.Exists(uint64(0))) +} + +func TestPullEngine_Remove(t *testing.T) { + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + defer inst1.Stop() + inst1.Add(uint64(0)) + assert.True(t, inst1.PullEngine.state.Exists(uint64(0))) + inst1.Remove(uint64(0)) + assert.False(t, inst1.PullEngine.state.Exists(uint64(0))) + inst1.Remove(uint64(0)) // remove twice + assert.False(t, inst1.PullEngine.state.Exists(uint64(0))) +} + +func TestPullEngine_Stop(t *testing.T) { + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + defer inst2.stop() + inst2.setNextPeerSelection([]string{"p1"}) + go func() { + for i := 0; i < 100; i++ { + inst1.Add(uint64(i)) + time.Sleep(time.Duration(10) * time.Millisecond) + } + }() + + time.Sleep(time.Duration(800) * time.Millisecond) + len1 := len(inst2.state.ToArray()) + inst1.stop() + time.Sleep(time.Duration(800) * time.Millisecond) + len2 := len(inst2.state.ToArray()) + assert.Equal(t, len1, len2, "PullEngine was still active after Stop() was invoked!") +} + +func TestPullEngineSelectiveUpdates(t *testing.T) { + // Scenario: inst1 has {1, 3} and inst2 has {0,1,2,3}. + // inst1 initiates to inst2 + // Expected outcome: inst1 asks for 0,2 and inst2 sends 0,2 only + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + defer inst1.stop() + defer inst2.stop() + + inst1.Add(uint64(1), uint64(3)) + inst2.Add(uint64(0), uint64(1), uint64(2), uint64(3)) + + // Ensure inst2 sent a proper digest to inst1 + inst1.hook(func(m interface{}) { + if dig, isDig := m.(*digestMsg); isDig { + assert.True(t, util.IndexInSlice(dig.digest, uint64(0), numericCompare) != -1) + assert.True(t, util.IndexInSlice(dig.digest, uint64(1), numericCompare) != -1) + assert.True(t, util.IndexInSlice(dig.digest, uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(dig.digest, uint64(3), numericCompare) != -1) + } + }) + + // Ensure inst1 requested only needed updates from inst2 + inst2.hook(func(m interface{}) { + if req, isReq := m.(*reqMsg); isReq { + assert.True(t, util.IndexInSlice(req.items, uint64(1), numericCompare) == -1) + assert.True(t, util.IndexInSlice(req.items, uint64(3), numericCompare) == -1) + + assert.True(t, util.IndexInSlice(req.items, uint64(0), numericCompare) != -1) + assert.True(t, util.IndexInSlice(req.items, uint64(2), numericCompare) != -1) + } + }) + + // Ensure inst1 received only needed updates from inst2 + inst1.hook(func(m interface{}) { + if res, isRes := m.(*resMsg); isRes { + assert.True(t, util.IndexInSlice(res.items, uint64(1), numericCompare) == -1) + assert.True(t, util.IndexInSlice(res.items, uint64(3), numericCompare) == -1) + + assert.True(t, util.IndexInSlice(res.items, uint64(0), numericCompare) != -1) + assert.True(t, util.IndexInSlice(res.items, uint64(2), numericCompare) != -1) + } + }) + + inst1.setNextPeerSelection([]string{"p2"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + assert.Equal(t, len(inst2.state.ToArray()), len(inst1.state.ToArray())) +} + +func TestByzantineResponder(t *testing.T) { + // Scenario: inst1 sends hello to inst2 but inst3 is byzantine so it attempts to send a digest and a response to inst1. + // expected outcome is for inst1 not to process updates from inst3. + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + inst3 := newPushPullTestInstance("p3", peers) + defer inst1.stop() + defer inst2.stop() + defer inst3.stop() + + receivedDigestFromInst3 := int32(0) + + inst2.Add(uint64(1), uint64(2), uint64(3)) + inst3.Add(uint64(5), uint64(6), uint64(7)) + + inst2.hook(func(m interface{}) { + if _, isHello := m.(*helloMsg); isHello { + inst3.SendDigest([]uint64{uint64(5), uint64(6), uint64(7)}, 0, "p1") + } + }) + + inst1.hook(func(m interface{}) { + if dig, isDig := m.(*digestMsg); isDig { + if dig.source == "p3" { + atomic.StoreInt32(&receivedDigestFromInst3, int32(1)) + time.AfterFunc(time.Duration(25)*time.Millisecond, func() { + inst3.SendRes([]uint64{uint64(5), uint64(6), uint64(7)}, "p1", 0) + }) + } + } + + if res, isRes := m.(*resMsg); isRes { + // the response is from p3 + if util.IndexInSlice(res.items, uint64(6), numericCompare) != -1 { + // inst1 is currently accepting responses + assert.Equal(t, int32(1), atomic.LoadInt32(&(inst1.acceptingResponses))) + } + } + }) + + inst1.setNextPeerSelection([]string{"p2"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + + assert.Equal(t, int32(1), atomic.LoadInt32(&receivedDigestFromInst3), "inst1 hasn't received a digest from inst3") + + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) != -1) + + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(5), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(6), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(7), numericCompare) == -1) + +} + +func TestMultipleInitiators(t *testing.T) { + // Scenario: inst1, inst2 and inst3 both start protocol with inst4 at the same time. + // Expected outcome: inst4 successfully transfers state to all of them + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + inst3 := newPushPullTestInstance("p3", peers) + inst4 := newPushPullTestInstance("p4", peers) + defer inst1.stop() + defer inst2.stop() + defer inst3.stop() + defer inst4.stop() + + inst4.Add(uint64(1), uint64(2), uint64(3), uint64(4)) + inst1.setNextPeerSelection([]string{"p4"}) + inst2.setNextPeerSelection([]string{"p4"}) + inst3.setNextPeerSelection([]string{"p4"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + + for _, inst := range []*pullTestInstance{inst1, inst2, inst3} { + assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(1), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(3), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(4), numericCompare) != -1) + } + +} + +func TestLatePeers(t *testing.T) { + // Scenario: inst1 initiates to inst2 (items: {1,2,3,4}) and inst3 (items: {5,6,7,8}), + // but inst2 is too slow to respond, and all items + // should be received from inst3. + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + inst3 := newPushPullTestInstance("p3", peers) + defer inst1.stop() + defer inst2.stop() + defer inst3.stop() + inst2.Add(uint64(1), uint64(2), uint64(3), uint64(4)) + inst3.Add(uint64(5), uint64(6), uint64(7), uint64(8)) + inst2.hook(func(m interface{}) { + time.Sleep(time.Duration(60) * time.Millisecond) + }) + inst1.setNextPeerSelection([]string{"p2", "p3"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(4), numericCompare) == -1) + + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(5), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(6), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(7), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(8), numericCompare) != -1) + +} + +func TestBiDiUpdates(t *testing.T) { + // Scenario: inst1 has {1, 3} and inst2 has {0,2} and both initiate to the other at the same time. + // Expected outcome: both have {0,1,2,3} in the end + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + defer inst1.stop() + defer inst2.stop() + + inst1.Add(uint64(1), uint64(3)) + inst2.Add(uint64(0), uint64(2)) + + inst1.setNextPeerSelection([]string{"p2"}) + inst2.setNextPeerSelection([]string{"p1"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(0), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) != -1) + + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(0), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(1), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(3), numericCompare) != -1) + +} + +func TestSpread(t *testing.T) { + // Scenario: inst1 initiates to inst2, inst3 inst4 and each have items 0-100. inst5 also has the same items but isn't selected + // Expected outcome: each responder (inst2, inst3 and inst4) is chosen at least once (the probability for not choosing each of them is slim) + // inst5 isn't selected at all + peers := make(map[string]*pullTestInstance) + inst1 := newPushPullTestInstance("p1", peers) + inst2 := newPushPullTestInstance("p2", peers) + inst3 := newPushPullTestInstance("p3", peers) + inst4 := newPushPullTestInstance("p4", peers) + inst5 := newPushPullTestInstance("p5", peers) + defer inst1.stop() + defer inst2.stop() + defer inst3.stop() + defer inst4.stop() + defer inst5.stop() + + chooseCounters := make(map[string]int) + chooseCounters["p2"] = 0 + chooseCounters["p3"] = 0 + chooseCounters["p4"] = 0 + chooseCounters["p5"] = 0 + + lock := &sync.Mutex{} + + addToCounters := func(dest string) func(m interface{}) { + return func(m interface{}) { + if _, isReq := m.(*reqMsg); isReq { + lock.Lock() + chooseCounters[dest]++ + lock.Unlock() + } + } + } + + inst2.hook(addToCounters("p2")) + inst3.hook(addToCounters("p3")) + inst4.hook(addToCounters("p4")) + inst5.hook(addToCounters("p5")) + + for i := 0; i < 100; i++ { + item := uint64(i) + inst2.Add(item) + inst3.Add(item) + inst4.Add(item) + } + + inst1.setNextPeerSelection([]string{"p2", "p3", "p4"}) + + time.Sleep(time.Duration(800) * time.Millisecond) + + lock.Lock() + for p_i, counter := range chooseCounters { + if p_i == "p5" { + assert.Equal(t, 0, counter) + } else { + assert.True(t, counter > 0, "%s was not selected!", p_i) + } + } + lock.Unlock() + +} + +func numericCompare(a interface{}, b interface{}) bool { + return a.(uint64) == b.(uint64) +} diff --git a/gossip/util/misc.go b/gossip/util/misc.go index 7041021d294..c187e1e9a18 100644 --- a/gossip/util/misc.go +++ b/gossip/util/misc.go @@ -19,6 +19,7 @@ package util import ( "math/rand" "reflect" + "sync" ) type Equals func(a interface{}, b interface{}) bool @@ -27,11 +28,6 @@ func init() { rand.Seed(42) } - -func numbericEqual(a interface{}, b interface{}) bool { - return a.(int) == b.(int) -} - func IndexInSlice(array interface{}, o interface{}, equals Equals) int { arr := reflect.ValueOf(array) for i := 0; i < arr.Len(); i++ { @@ -42,6 +38,10 @@ func IndexInSlice(array interface{}, o interface{}, equals Equals) int { return -1 } +func numbericEqual(a interface{}, b interface{}) bool { + return a.(int) == b.(int) +} + func GetRandomIndices(indiceCount, highestIndex int) []int { if highestIndex+1 < indiceCount { return nil @@ -63,4 +63,58 @@ func GetRandomIndices(indiceCount, highestIndex int) []int { indices = append(indices, n) } return indices -} \ No newline at end of file +} + +func Abs(a, b uint64) uint64 { + if a > b { + return a - b + } else { + return b - a + } +} + +type Set struct { + items map[interface{}]struct{} + lock *sync.RWMutex +} + +func NewSet() *Set { + return &Set{lock: &sync.RWMutex{}, items: make(map[interface{}]struct{})} +} + +func (s *Set) Add(item interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.items[item] = struct{}{} +} + +func (s *Set) Exists(item interface{}) bool { + s.lock.RLock() + defer s.lock.RUnlock() + _, exists := s.items[item] + return exists +} + +func (s *Set) ToArray() []interface{} { + s.lock.RLock() + defer s.lock.RUnlock() + a := make([]interface{}, len(s.items)) + i := 0 + for item := range s.items { + a[i] = item + i++ + } + return a +} + +func (s *Set) Clear() { + s.lock.Lock() + defer s.lock.Unlock() + s.items = make(map[interface{}]struct{}) +} + +func (s *Set) Remove(item interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.items, item) +}