From baea89c1b414a55b424fdd0494afaf0f2dd0ff28 Mon Sep 17 00:00:00 2001 From: YACOVM Date: Sun, 27 Nov 2016 11:54:15 +0200 Subject: [PATCH] FAB-1045 Gossip pull refactoring: uint64->string Currently the gossip pull mechanism can only handle items uint64 as keys (for digests). I refactored this to be string, because a string can hold both ints, hashes and can also be used as a key in a map. This is needed for disseminating entities such as certificates of peers. Change-Id: I06be757275dccd4e78f055bb703aeadd52787c33 Signed-off-by: Yacov Manevich --- gossip/gossip/algo/pull.go | 34 +++---- gossip/gossip/algo/pull_test.go | 150 ++++++++++++++------------- gossip/gossip/gossip_impl.go | 62 ++++++------ gossip/proto/message.pb.go | 173 +++++++++++++++++++------------- gossip/proto/message.proto | 20 +++- 5 files changed, 245 insertions(+), 194 deletions(-) diff --git a/gossip/gossip/algo/pull.go b/gossip/gossip/algo/pull.go index 8061d7b8a1a..1d372f21a70 100644 --- a/gossip/gossip/algo/pull.go +++ b/gossip/gossip/algo/pull.go @@ -26,7 +26,7 @@ import ( ) /* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items - identified by uint64 numbers. + identified by string numbers. The protocol is as follows: 1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers. 2) Each remote peer responds with a digest of its messages and returns that NONCE. @@ -82,14 +82,14 @@ type PullAdapter interface { // SendDigest sends a digest to a remote PullEngine. // The context parameter specifies the remote engine to send to. - SendDigest(digest []uint64, nonce uint64, context interface{}) + SendDigest(digest []string, nonce uint64, context interface{}) // SendReq sends an array of items to a certain remote PullEngine identified // by a string - SendReq(dest string, items []uint64, nonce uint64) + SendReq(dest string, items []string, nonce uint64) // SendRes sends an array of items to a remote PullEngine identified by a context. - SendRes(items []uint64, context interface{}, nonce uint64) + SendRes(items []string, context interface{}, nonce uint64) } // PullEngine is the component that actually invokes the pull algorithm @@ -98,7 +98,7 @@ type PullEngine struct { PullAdapter stopFlag int32 state *util.Set - item2owners map[uint64][]string + item2owners map[string][]string peers2nonces map[string]uint64 nonces2peers map[uint64]string acceptingDigests int32 @@ -115,7 +115,7 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine PullAdapter: participant, stopFlag: int32(0), state: util.NewSet(), - item2owners: make(map[uint64][]string), + item2owners: make(map[string][]string), peers2nonces: make(map[string]uint64), nonces2peers: make(map[uint64]string), acceptingDigests: int32(0), @@ -190,12 +190,12 @@ func (engine *PullEngine) processIncomingDigests() { engine.lock.Lock() defer engine.lock.Unlock() - requestMapping := make(map[string][]uint64) + requestMapping := make(map[string][]string) for n, sources := range engine.item2owners { // select a random source source := sources[rand.Intn(len(sources))] if _, exists := requestMapping[source]; !exists { - requestMapping[source] = make([]uint64, 0) + requestMapping[source] = make([]string, 0) } // append the number to that source requestMapping[source] = append(requestMapping[source], n) @@ -218,13 +218,13 @@ func (engine *PullEngine) endPull() { atomic.StoreInt32(&(engine.acceptingResponses), int32(0)) engine.outgoingNONCES.Clear() - engine.item2owners = make(map[uint64][]string) + engine.item2owners = make(map[string][]string) engine.peers2nonces = make(map[string]uint64) engine.nonces2peers = make(map[uint64]string) } // OnDigest notifies the engine that a digest has arrived -func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) { +func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) { if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) { return } @@ -246,14 +246,14 @@ func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interf } // Add adds items to the state -func (engine *PullEngine) Add(seqs ...uint64) { +func (engine *PullEngine) Add(seqs ...string) { for _, seq := range seqs { engine.state.Add(seq) } } // Remove removes items from the state -func (engine *PullEngine) Remove(seqs ...uint64) { +func (engine *PullEngine) Remove(seqs ...string) { for _, seq := range seqs { engine.state.Remove(seq) } @@ -267,21 +267,21 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) { }) a := engine.state.ToArray() - digest := make([]uint64, len(a)) + digest := make([]string, len(a)) for i, item := range a { - digest[i] = item.(uint64) + digest[i] = item.(string) } engine.SendDigest(digest, nonce, context) } // OnReq notifies the engine a request has arrived -func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) { +func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) { if !engine.incomingNONCES.Exists(nonce) { return } engine.lock.Lock() - var items2Send []uint64 + var items2Send []string for _, item := range items { if engine.state.Exists(item) { items2Send = append(items2Send, item) @@ -294,7 +294,7 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{ } // OnRes notifies the engine a response has arrived -func (engine *PullEngine) OnRes(items []uint64, nonce uint64) { +func (engine *PullEngine) OnRes(items []string, nonce uint64) { if !engine.outgoingNONCES.Exists(nonce) || !engine.isAcceptingResponses() { return } diff --git a/gossip/gossip/algo/pull_test.go b/gossip/gossip/algo/pull_test.go index 509b3aab6a9..ed8499d0995 100644 --- a/gossip/gossip/algo/pull_test.go +++ b/gossip/gossip/algo/pull_test.go @@ -54,18 +54,18 @@ type helloMsg struct { type digestMsg struct { nonce uint64 - digest []uint64 + digest []string source string } type reqMsg struct { - items []uint64 + items []string nonce uint64 source string } type resMsg struct { - items []uint64 + items []string nonce uint64 } @@ -153,40 +153,43 @@ func (p *pullTestInstance) Hello(dest string, nonce uint64) { p.peers[dest].msgQueue <- &helloMsg{nonce: nonce, source: p.name} } -func (p *pullTestInstance) SendDigest(digest []uint64, nonce uint64, context interface{}) { +func (p *pullTestInstance) SendDigest(digest []string, nonce uint64, context interface{}) { p.peers[context.(string)].msgQueue <- &digestMsg{source: p.name, nonce: nonce, digest: digest} } -func (p *pullTestInstance) SendReq(dest string, items []uint64, nonce uint64) { +func (p *pullTestInstance) SendReq(dest string, items []string, nonce uint64) { p.peers[dest].msgQueue <- &reqMsg{nonce: nonce, source: p.name, items: items} } -func (p *pullTestInstance) SendRes(items []uint64, context interface{}, nonce uint64) { +func (p *pullTestInstance) SendRes(items []string, context interface{}, nonce uint64) { p.peers[context.(string)].msgQueue <- &resMsg{items: items, nonce: nonce} } func TestPullEngine_Add(t *testing.T) { + t.Parallel() peers := make(map[string]*pullTestInstance) inst1 := newPushPullTestInstance("p1", peers) defer inst1.Stop() - inst1.Add(uint64(0)) - inst1.Add(uint64(0)) - assert.True(t, inst1.PullEngine.state.Exists(uint64(0))) + inst1.Add("0") + inst1.Add("0") + assert.True(t, inst1.PullEngine.state.Exists("0")) } func TestPullEngine_Remove(t *testing.T) { + t.Parallel() peers := make(map[string]*pullTestInstance) inst1 := newPushPullTestInstance("p1", peers) defer inst1.Stop() - inst1.Add(uint64(0)) - assert.True(t, inst1.PullEngine.state.Exists(uint64(0))) - inst1.Remove(uint64(0)) - assert.False(t, inst1.PullEngine.state.Exists(uint64(0))) - inst1.Remove(uint64(0)) // remove twice - assert.False(t, inst1.PullEngine.state.Exists(uint64(0))) + inst1.Add("0") + assert.True(t, inst1.PullEngine.state.Exists("0")) + inst1.Remove("0") + assert.False(t, inst1.PullEngine.state.Exists("0")) + inst1.Remove("0") // remove twice + assert.False(t, inst1.PullEngine.state.Exists("0")) } func TestPullEngine_Stop(t *testing.T) { + t.Parallel() peers := make(map[string]*pullTestInstance) inst1 := newPushPullTestInstance("p1", peers) inst2 := newPushPullTestInstance("p2", peers) @@ -194,7 +197,7 @@ func TestPullEngine_Stop(t *testing.T) { inst2.setNextPeerSelection([]string{"p1"}) go func() { for i := 0; i < 100; i++ { - inst1.Add(uint64(i)) + inst1.Add(string(i)) time.Sleep(time.Duration(10) * time.Millisecond) } }() @@ -208,6 +211,7 @@ func TestPullEngine_Stop(t *testing.T) { } func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) { + t.Parallel() // Scenario: spawn 10 nodes, each 50 ms after the other // and have them transfer data between themselves. // Expected outcome: obviously, everything should succeed. @@ -217,7 +221,7 @@ func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) { for i := 0; i < instanceCount; i++ { inst := newPushPullTestInstance(fmt.Sprintf("p%d", i+1), peers) - inst.Add(uint64(i + 1)) + inst.Add(string(i + 1)) time.Sleep(time.Duration(50) * time.Millisecond) } for i := 0; i < instanceCount; i++ { @@ -233,6 +237,7 @@ func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) { } func TestPullEngineSelectiveUpdates(t *testing.T) { + t.Parallel() // Scenario: inst1 has {1, 3} and inst2 has {0,1,2,3}. // inst1 initiates to inst2 // Expected outcome: inst1 asks for 0,2 and inst2 sends 0,2 only @@ -242,38 +247,38 @@ func TestPullEngineSelectiveUpdates(t *testing.T) { defer inst1.stop() defer inst2.stop() - inst1.Add(uint64(1), uint64(3)) - inst2.Add(uint64(0), uint64(1), uint64(2), uint64(3)) + inst1.Add("1", "3") + inst2.Add("0", "1", "2", "3") // Ensure inst2 sent a proper digest to inst1 inst1.hook(func(m interface{}) { if dig, isDig := m.(*digestMsg); isDig { - assert.True(t, util.IndexInSlice(dig.digest, uint64(0), numericCompare) != -1) - assert.True(t, util.IndexInSlice(dig.digest, uint64(1), numericCompare) != -1) - assert.True(t, util.IndexInSlice(dig.digest, uint64(2), numericCompare) != -1) - assert.True(t, util.IndexInSlice(dig.digest, uint64(3), numericCompare) != -1) + assert.True(t, util.IndexInSlice(dig.digest, "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(dig.digest, "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(dig.digest, "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(dig.digest, "3", Strcmp) != -1) } }) // Ensure inst1 requested only needed updates from inst2 inst2.hook(func(m interface{}) { if req, isReq := m.(*reqMsg); isReq { - assert.True(t, util.IndexInSlice(req.items, uint64(1), numericCompare) == -1) - assert.True(t, util.IndexInSlice(req.items, uint64(3), numericCompare) == -1) + assert.True(t, util.IndexInSlice(req.items, "1", Strcmp) == -1) + assert.True(t, util.IndexInSlice(req.items, "3", Strcmp) == -1) - assert.True(t, util.IndexInSlice(req.items, uint64(0), numericCompare) != -1) - assert.True(t, util.IndexInSlice(req.items, uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(req.items, "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(req.items, "2", Strcmp) != -1) } }) // Ensure inst1 received only needed updates from inst2 inst1.hook(func(m interface{}) { if res, isRes := m.(*resMsg); isRes { - assert.True(t, util.IndexInSlice(res.items, uint64(1), numericCompare) == -1) - assert.True(t, util.IndexInSlice(res.items, uint64(3), numericCompare) == -1) + assert.True(t, util.IndexInSlice(res.items, "1", Strcmp) == -1) + assert.True(t, util.IndexInSlice(res.items, "3", Strcmp) == -1) - assert.True(t, util.IndexInSlice(res.items, uint64(0), numericCompare) != -1) - assert.True(t, util.IndexInSlice(res.items, uint64(2), numericCompare) != -1) + assert.True(t, util.IndexInSlice(res.items, "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(res.items, "2", Strcmp) != -1) } }) @@ -284,6 +289,7 @@ func TestPullEngineSelectiveUpdates(t *testing.T) { } func TestByzantineResponder(t *testing.T) { + t.Parallel() // Scenario: inst1 sends hello to inst2 but inst3 is byzantine so it attempts to send a digest and a response to inst1. // expected outcome is for inst1 not to process updates from inst3. peers := make(map[string]*pullTestInstance) @@ -296,12 +302,12 @@ func TestByzantineResponder(t *testing.T) { receivedDigestFromInst3 := int32(0) - inst2.Add(uint64(1), uint64(2), uint64(3)) - inst3.Add(uint64(5), uint64(6), uint64(7)) + inst2.Add("1", "2", "3") + inst3.Add("1", "6", "7") inst2.hook(func(m interface{}) { if _, isHello := m.(*helloMsg); isHello { - inst3.SendDigest([]uint64{uint64(5), uint64(6), uint64(7)}, 0, "p1") + inst3.SendDigest([]string{"5", "6", "7"}, 0, "p1") } }) @@ -310,14 +316,14 @@ func TestByzantineResponder(t *testing.T) { if dig.source == "p3" { atomic.StoreInt32(&receivedDigestFromInst3, int32(1)) time.AfterFunc(time.Duration(150)*time.Millisecond, func() { - inst3.SendRes([]uint64{uint64(5), uint64(6), uint64(7)}, "p1", 0) + inst3.SendRes([]string{"5", "6", "7"}, "p1", 0) }) } } if res, isRes := m.(*resMsg); isRes { // the response is from p3 - if util.IndexInSlice(res.items, uint64(6), numericCompare) != -1 { + if util.IndexInSlice(res.items, "6", Strcmp) != -1 { // inst1 is currently accepting responses assert.Equal(t, int32(1), atomic.LoadInt32(&(inst1.acceptingResponses)), "inst1 is not accepting digests") } @@ -330,17 +336,18 @@ func TestByzantineResponder(t *testing.T) { assert.Equal(t, int32(1), atomic.LoadInt32(&receivedDigestFromInst3), "inst1 hasn't received a digest from inst3") - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "3", Strcmp) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(5), numericCompare) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(6), numericCompare) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(7), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "5", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "6", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "7", Strcmp) == -1) } func TestMultipleInitiators(t *testing.T) { + t.Parallel() // Scenario: inst1, inst2 and inst3 both start protocol with inst4 at the same time. // Expected outcome: inst4 successfully transfers state to all of them peers := make(map[string]*pullTestInstance) @@ -353,7 +360,7 @@ func TestMultipleInitiators(t *testing.T) { defer inst3.stop() defer inst4.stop() - inst4.Add(uint64(1), uint64(2), uint64(3), uint64(4)) + inst4.Add("1", "2", "3", "4") inst1.setNextPeerSelection([]string{"p4"}) inst2.setNextPeerSelection([]string{"p4"}) inst3.setNextPeerSelection([]string{"p4"}) @@ -361,15 +368,16 @@ func TestMultipleInitiators(t *testing.T) { time.Sleep(time.Duration(2000) * time.Millisecond) for _, inst := range []*pullTestInstance{inst1, inst2, inst3} { - assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(1), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(2), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(3), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(4), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), "3", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst.state.ToArray(), "4", Strcmp) != -1) } } func TestLatePeers(t *testing.T) { + t.Parallel() // Scenario: inst1 initiates to inst2 (items: {1,2,3,4}) and inst3 (items: {5,6,7,8}), // but inst2 is too slow to respond, and all items // should be received from inst3. @@ -380,8 +388,8 @@ func TestLatePeers(t *testing.T) { defer inst1.stop() defer inst2.stop() defer inst3.stop() - inst2.Add(uint64(1), uint64(2), uint64(3), uint64(4)) - inst3.Add(uint64(5), uint64(6), uint64(7), uint64(8)) + inst2.Add("1", "2", "3", "4") + inst3.Add("5", "6", "7", "8") inst2.hook(func(m interface{}) { time.Sleep(time.Duration(600) * time.Millisecond) }) @@ -389,19 +397,20 @@ func TestLatePeers(t *testing.T) { time.Sleep(time.Duration(2000) * time.Millisecond) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(4), numericCompare) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "1", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "2", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "3", Strcmp) == -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "4", Strcmp) == -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(5), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(6), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(7), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(8), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "5", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "6", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "7", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "8", Strcmp) != -1) } func TestBiDiUpdates(t *testing.T) { + t.Parallel() // Scenario: inst1 has {1, 3} and inst2 has {0,2} and both initiate to the other at the same time. // Expected outcome: both have {0,1,2,3} in the end peers := make(map[string]*pullTestInstance) @@ -410,27 +419,28 @@ func TestBiDiUpdates(t *testing.T) { defer inst1.stop() defer inst2.stop() - inst1.Add(uint64(1), uint64(3)) - inst2.Add(uint64(0), uint64(2)) + inst1.Add("1", "3") + inst2.Add("0", "2") inst1.setNextPeerSelection([]string{"p2"}) inst2.setNextPeerSelection([]string{"p1"}) time.Sleep(time.Duration(2000) * time.Millisecond) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(0), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(3), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst1.state.ToArray(), "3", Strcmp) != -1) - assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(0), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(1), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(2), numericCompare) != -1) - assert.True(t, util.IndexInSlice(inst2.state.ToArray(), uint64(3), numericCompare) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "0", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "1", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "2", Strcmp) != -1) + assert.True(t, util.IndexInSlice(inst2.state.ToArray(), "3", Strcmp) != -1) } func TestSpread(t *testing.T) { + t.Parallel() // Scenario: inst1 initiates to inst2, inst3 inst4 and each have items 0-100. inst5 also has the same items but isn't selected // Expected outcome: each responder (inst2, inst3 and inst4) is chosen at least once (the probability for not choosing each of them is slim) // inst5 isn't selected at all @@ -470,7 +480,7 @@ func TestSpread(t *testing.T) { inst5.hook(addToCounters("p5")) for i := 0; i < 100; i++ { - item := uint64(i) + item := fmt.Sprintf("%d", i) inst2.Add(item) inst3.Add(item) inst4.Add(item) @@ -492,8 +502,8 @@ func TestSpread(t *testing.T) { } -func numericCompare(a interface{}, b interface{}) bool { - return a.(uint64) == b.(uint64) +func Strcmp(a interface{}, b interface{}) bool { + return a.(string) == b.(string) } func keySet(selfPeer string, m map[string]*pullTestInstance) []string { diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 0f3e0515954..98d33b361ad 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -18,6 +18,7 @@ package gossip import ( "bytes" + "fmt" "sync" "sync/atomic" "time" @@ -82,7 +83,7 @@ func NewGossipService(conf *Config, c comm.Comm, crypto discovery.CryptoService) g.msgStore = newMessageStore(proto.NewGossipMessageComparator(g.conf.MaxMessageCountToStore), func(m interface{}) { if dataMsg, isDataMsg := m.(*proto.DataMessage); isDataMsg { - g.pushPull.Remove(dataMsg.Payload.SeqNum) + g.pushPull.Remove(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) } }) @@ -177,7 +178,9 @@ func (g *gossipServiceImpl) Hello(dest string, nonce uint64) { Nonce: 0, Content: &proto.GossipMessage_Hello{ Hello: &proto.GossipHello{ - Nonce: nonce, + Nonce: nonce, + Metadata: nil, + MsgType: proto.MsgType_BlockMessage, }, }, } @@ -187,29 +190,31 @@ func (g *gossipServiceImpl) Hello(dest string, nonce uint64) { } -func (g *gossipServiceImpl) SendDigest(digest []uint64, nonce uint64, context interface{}) { +func (g *gossipServiceImpl) SendDigest(digest []string, nonce uint64, context interface{}) { digMsg := &proto.GossipMessage{ Tag: proto.GossipMessage_EMPTY, Nonce: 0, Content: &proto.GossipMessage_DataDig{ DataDig: &proto.DataDigest{ - Nonce: nonce, - SeqMap: digest, + MsgType: proto.MsgType_BlockMessage, + Nonce: nonce, + Digests: digest, }, }, } - g.logger.Debug("Sending digest", digMsg.GetDataDig().SeqMap) + g.logger.Debug("Sending digest", digMsg.GetDataDig().Digests) context.(comm.ReceivedMessage).Respond(digMsg) } -func (g *gossipServiceImpl) SendReq(dest string, items []uint64, nonce uint64) { +func (g *gossipServiceImpl) SendReq(dest string, items []string, nonce uint64) { req := &proto.GossipMessage{ Tag: proto.GossipMessage_EMPTY, Nonce: 0, Content: &proto.GossipMessage_DataReq{ DataReq: &proto.DataRequest{ - Nonce: nonce, - SeqMap: items, + MsgType: proto.MsgType_BlockMessage, + Nonce: nonce, + Digests: items, }, }, } @@ -217,15 +222,15 @@ func (g *gossipServiceImpl) SendReq(dest string, items []uint64, nonce uint64) { g.comm.Send(req, g.peersWithEndpoints(dest)...) } -func (g *gossipServiceImpl) SendRes(requestedItems []uint64, context interface{}, nonce uint64) { - itemMap := make(map[uint64]*proto.DataMessage) +func (g *gossipServiceImpl) SendRes(requestedItems []string, context interface{}, nonce uint64) { + itemMap := make(map[string]*proto.GossipMessage) for _, msg := range g.msgStore.get() { if dataMsg := msg.(*proto.GossipMessage).GetDataMsg(); dataMsg != nil { - itemMap[dataMsg.Payload.SeqNum] = dataMsg + itemMap[fmt.Sprintf("%d", dataMsg.Payload.SeqNum)] = msg.(*proto.GossipMessage) } } - dataMsgs := []*proto.DataMessage{} + dataMsgs := []*proto.GossipMessage{} for _, item := range requestedItems { if dataMsg, exists := itemMap[item]; exists { @@ -238,8 +243,9 @@ func (g *gossipServiceImpl) SendRes(requestedItems []uint64, context interface{} Nonce: 0, Content: &proto.GossipMessage_DataUpdate{ DataUpdate: &proto.DataUpdate{ - Nonce: nonce, - Data: dataMsgs, + MsgType: proto.MsgType_BlockMessage, + Nonce: nonce, + Data: dataMsgs, }, }, } @@ -274,7 +280,7 @@ func (g *gossipServiceImpl) handleMessage(msg comm.ReceivedMessage) { if dataMsg := msg.GetGossipMessage().GetDataMsg(); dataMsg != nil { g.DeMultiplex(msg.GetGossipMessage()) - g.pushPull.Add(dataMsg.Payload.SeqNum) + g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) } return @@ -296,32 +302,28 @@ func (g *gossipServiceImpl) forwardDiscoveryMsg(msg comm.ReceivedMessage) { func (g *gossipServiceImpl) handlePushPullMsg(msg comm.ReceivedMessage) { g.logger.Debug(msg) if helloMsg := msg.GetGossipMessage().GetHello(); helloMsg != nil { + if helloMsg.MsgType != proto.MsgType_BlockMessage { + return + } g.pushPull.OnHello(helloMsg.Nonce, msg) } if digest := msg.GetGossipMessage().GetDataDig(); digest != nil { - g.pushPull.OnDigest(digest.SeqMap, digest.Nonce, msg) + g.pushPull.OnDigest(digest.Digests, digest.Nonce, msg) } if req := msg.GetGossipMessage().GetDataReq(); req != nil { - g.pushPull.OnReq(req.SeqMap, req.Nonce, msg) + g.pushPull.OnReq(req.Digests, req.Nonce, msg) } if res := msg.GetGossipMessage().GetDataUpdate(); res != nil { - items := make([]uint64, len(res.Data)) + items := make([]string, len(res.Data)) for i, data := range res.Data { - dataMsg := &proto.GossipMessage{ - Tag: proto.GossipMessage_EMPTY, - Content: &proto.GossipMessage_DataMsg{ - DataMsg: data, - }, - Nonce: msg.GetGossipMessage().Nonce, - } - added := g.msgStore.add(dataMsg) + added := g.msgStore.add(data) // if we can't add the message to the msgStore, // no point in disseminating it to others... if !added { continue } - g.DeMultiplex(dataMsg) - items[i] = data.Payload.SeqNum + g.DeMultiplex(data) + items[i] = fmt.Sprintf("%d", data.GetDataMsg().Payload.SeqNum) } g.pushPull.OnRes(items, res.Nonce) } @@ -363,7 +365,7 @@ func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) { g.logger.Info(msg) if dataMsg := msg.GetDataMsg(); dataMsg != nil { g.msgStore.add(msg) - g.pushPull.Add(dataMsg.Payload.SeqNum) + g.pushPull.Add(fmt.Sprintf("%d", dataMsg.Payload.SeqNum)) } g.emitter.Add(msg) } diff --git a/gossip/proto/message.pb.go b/gossip/proto/message.pb.go index f0ce43b0e53..d5513b2760f 100644 --- a/gossip/proto/message.pb.go +++ b/gossip/proto/message.pb.go @@ -50,6 +50,27 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package +type MsgType int32 + +const ( + MsgType_UNDEFINED MsgType = 0 + MsgType_BlockMessage MsgType = 1 +) + +var MsgType_name = map[int32]string{ + 0: "UNDEFINED", + 1: "BlockMessage", +} +var MsgType_value = map[string]int32{ + "UNDEFINED": 0, + "BlockMessage": 1, +} + +func (x MsgType) String() string { + return proto1.EnumName(MsgType_name, int32(x)) +} +func (MsgType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + type GossipMessage_Tag int32 const ( @@ -632,8 +653,9 @@ func (*ConnEstablish) Descriptor() ([]byte, []int) { return fileDescriptor0, []i // DataRequest is a message used for a peer to request // certain data blocks from a remote peer type DataRequest struct { - Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` - SeqMap []uint64 `protobuf:"varint,2,rep,packed,name=seqMap" json:"seqMap,omitempty"` + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + Digests []string `protobuf:"bytes,2,rep,name=digests" json:"digests,omitempty"` + MsgType MsgType `protobuf:"varint,3,opt,name=msgType,enum=proto.MsgType" json:"msgType,omitempty"` } func (m *DataRequest) Reset() { *m = DataRequest{} } @@ -644,7 +666,9 @@ func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int // GossipHello is the message that is used for the peer to initiate // a pull round with another peer type GossipHello struct { - Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + Metadata []byte `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` + MsgType MsgType `protobuf:"varint,3,opt,name=msgType,enum=proto.MsgType" json:"msgType,omitempty"` } func (m *GossipHello) Reset() { *m = GossipHello{} } @@ -655,8 +679,9 @@ func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int // DataUpdate is the the final message in the pull phase // sent from the receiver to the initiator type DataUpdate struct { - Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` - Data []*DataMessage `protobuf:"bytes,2,rep,name=data" json:"data,omitempty"` + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + Data []*GossipMessage `protobuf:"bytes,2,rep,name=data" json:"data,omitempty"` + MsgType MsgType `protobuf:"varint,3,opt,name=msgType,enum=proto.MsgType" json:"msgType,omitempty"` } func (m *DataUpdate) Reset() { *m = DataUpdate{} } @@ -664,7 +689,7 @@ func (m *DataUpdate) String() string { return proto1.CompactTextStrin func (*DataUpdate) ProtoMessage() {} func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } -func (m *DataUpdate) GetData() []*DataMessage { +func (m *DataUpdate) GetData() []*GossipMessage { if m != nil { return m.Data } @@ -674,8 +699,9 @@ func (m *DataUpdate) GetData() []*DataMessage { // DataDigest is the message sent from the receiver peer // to the initator peer and contains the data items it has type DataDigest struct { - Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` - SeqMap []uint64 `protobuf:"varint,2,rep,packed,name=seqMap" json:"seqMap,omitempty"` + Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"` + Digests []string `protobuf:"bytes,2,rep,name=digests" json:"digests,omitempty"` + MsgType MsgType `protobuf:"varint,3,opt,name=msgType,enum=proto.MsgType" json:"msgType,omitempty"` } func (m *DataDigest) Reset() { *m = DataDigest{} } @@ -864,6 +890,7 @@ func init() { proto1.RegisterType((*Empty)(nil), "proto.Empty") proto1.RegisterType((*RemoteStateRequest)(nil), "proto.RemoteStateRequest") proto1.RegisterType((*RemoteStateResponse)(nil), "proto.RemoteStateResponse") + proto1.RegisterEnum("proto.MsgType", MsgType_name, MsgType_value) proto1.RegisterEnum("proto.GossipMessage_Tag", GossipMessage_Tag_name, GossipMessage_Tag_value) } @@ -1012,68 +1039,70 @@ var _Gossip_serviceDesc = grpc.ServiceDesc{ func init() { proto1.RegisterFile("message.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 994 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x94, 0x56, 0x5b, 0x4f, 0xe3, 0x46, - 0x14, 0x8e, 0xc9, 0x0d, 0x9f, 0x38, 0x10, 0xce, 0xd2, 0xca, 0x45, 0xad, 0x84, 0xdc, 0x55, 0x9b, - 0x46, 0x25, 0xec, 0x86, 0x87, 0x95, 0x5a, 0x55, 0x2d, 0x10, 0x4a, 0xa8, 0x4a, 0x40, 0x03, 0xdb, - 0x6a, 0xfb, 0x82, 0x86, 0x64, 0x70, 0xac, 0x8d, 0xc7, 0x26, 0x33, 0xb4, 0xe2, 0x2f, 0xec, 0x43, - 0x7f, 0xf3, 0x6a, 0x2e, 0x4e, 0x6c, 0xf0, 0x3e, 0xec, 0x53, 0x7c, 0xce, 0x7c, 0xdf, 0xb9, 0xcc, - 0xb9, 0x4c, 0xa0, 0x1d, 0x33, 0x21, 0x68, 0xc8, 0xfa, 0xe9, 0x22, 0x91, 0x09, 0xd6, 0xf5, 0x4f, - 0xf0, 0x7f, 0x13, 0xda, 0xa7, 0x89, 0x10, 0x51, 0x7a, 0x6e, 0x8e, 0x71, 0x1b, 0xea, 0x3c, 0xe1, - 0x13, 0xe6, 0x3b, 0xbb, 0x4e, 0xb7, 0x46, 0x8c, 0x80, 0x3e, 0x34, 0x27, 0x33, 0xca, 0x39, 0x9b, - 0xfb, 0x6b, 0xbb, 0x4e, 0xd7, 0x23, 0x99, 0x88, 0x3d, 0xa8, 0x4a, 0x1a, 0xfa, 0xd5, 0x5d, 0xa7, - 0xbb, 0x31, 0xf0, 0x8d, 0xf5, 0x7e, 0xc1, 0x64, 0xff, 0x9a, 0x86, 0x44, 0x81, 0xf0, 0x35, 0xac, - 0xd3, 0x79, 0xf4, 0x2f, 0x3b, 0x17, 0xa1, 0x5f, 0xdb, 0x75, 0xba, 0xad, 0xc1, 0x0b, 0x4b, 0x38, - 0xd4, 0x6a, 0x83, 0x1f, 0x55, 0xc8, 0x12, 0x86, 0x03, 0x68, 0xc4, 0x2c, 0x26, 0xec, 0xde, 0xaf, - 0x6b, 0x42, 0xe6, 0xe1, 0x9c, 0xc5, 0xb7, 0x6c, 0x21, 0x66, 0x51, 0x4a, 0xd8, 0xfd, 0x03, 0x13, - 0x72, 0x54, 0x21, 0x16, 0x89, 0x07, 0x96, 0x23, 0xfc, 0x86, 0xe6, 0x7c, 0x55, 0xc2, 0x11, 0x69, - 0xc2, 0x05, 0x5b, 0x92, 0x04, 0xf6, 0xa1, 0x39, 0xa5, 0x92, 0xaa, 0xd0, 0x9a, 0x9a, 0x85, 0x96, - 0x35, 0x54, 0xda, 0x65, 0x64, 0x19, 0x08, 0x7b, 0x50, 0x9f, 0xb1, 0xf9, 0x3c, 0xf1, 0xff, 0x2e, - 0xa0, 0x4d, 0xe6, 0x23, 0x75, 0x32, 0xaa, 0x10, 0x03, 0xc1, 0x3d, 0x63, 0x7b, 0x18, 0x85, 0xbe, - 0xab, 0xd1, 0x5b, 0x39, 0xdb, 0xc3, 0x28, 0x34, 0xe1, 0x67, 0x98, 0x2c, 0x14, 0x95, 0x34, 0x3c, - 0x0b, 0x65, 0x95, 0x6e, 0x06, 0xc2, 0x03, 0x00, 0xf5, 0xf9, 0x36, 0x9d, 0x52, 0xc9, 0xfc, 0xd6, - 0x33, 0x0f, 0xe6, 0x60, 0x54, 0x21, 0x39, 0x18, 0xbe, 0x36, 0x15, 0x3d, 0x8e, 0xa7, 0xbe, 0xa7, - 0x19, 0x5f, 0x58, 0xc6, 0xb1, 0x29, 0xec, 0x71, 0x12, 0xc7, 0x94, 0x4f, 0x95, 0x1f, 0x8b, 0xc3, - 0x97, 0x50, 0x67, 0x71, 0x2a, 0x1f, 0xfd, 0xb6, 0x26, 0x78, 0x96, 0x70, 0xa2, 0x74, 0x2a, 0x59, - 0x7d, 0x88, 0x3d, 0xa8, 0x4d, 0x12, 0xce, 0xfd, 0x0d, 0x0d, 0xda, 0xce, 0xac, 0x26, 0x9c, 0x9f, - 0x08, 0x49, 0x6f, 0xe7, 0x91, 0x98, 0x8d, 0x2a, 0x44, 0x63, 0xf0, 0x15, 0xb8, 0x42, 0x52, 0xc9, - 0xce, 0xf8, 0x5d, 0xe2, 0x6f, 0x6a, 0x42, 0xc7, 0x12, 0xae, 0x32, 0xfd, 0xa8, 0x42, 0x56, 0x20, - 0xfc, 0x15, 0x3c, 0x2d, 0xd8, 0x6b, 0xf0, 0x3b, 0x85, 0x0a, 0x13, 0x16, 0x27, 0x92, 0x5d, 0xe5, - 0x00, 0xa3, 0x0a, 0x29, 0x10, 0xf0, 0x08, 0xda, 0x56, 0x36, 0x2d, 0xe0, 0x6f, 0x69, 0x0b, 0x3b, - 0x65, 0x16, 0x96, 0x4d, 0x52, 0xa4, 0x04, 0x63, 0xa8, 0x5e, 0xd3, 0x10, 0xdb, 0xe0, 0xbe, 0x1d, - 0x0f, 0x4f, 0x7e, 0x3f, 0x1b, 0x9f, 0x0c, 0x3b, 0x15, 0x74, 0xa1, 0x7e, 0x72, 0x7e, 0x79, 0xfd, - 0xae, 0xe3, 0xa0, 0x07, 0xeb, 0x17, 0xe4, 0xf4, 0xe6, 0x62, 0xfc, 0xe7, 0xbb, 0xce, 0x9a, 0xc2, - 0x1d, 0x8f, 0x0e, 0xc7, 0x46, 0xac, 0x62, 0x07, 0x3c, 0x2d, 0x1e, 0x8e, 0x87, 0x37, 0x17, 0xe4, - 0xb4, 0x53, 0x3b, 0x72, 0xa1, 0x39, 0x49, 0xb8, 0x64, 0x5c, 0x06, 0x1f, 0x1c, 0x70, 0x97, 0xa9, - 0xe3, 0x0e, 0xac, 0xc7, 0x4c, 0x52, 0x55, 0x36, 0x3d, 0x8f, 0x1e, 0x59, 0xca, 0xb8, 0x07, 0xae, - 0x8c, 0x62, 0x26, 0x24, 0x8d, 0x53, 0x3d, 0x94, 0xad, 0xc1, 0xa6, 0x4d, 0xe2, 0x92, 0xb1, 0xc5, - 0x75, 0x14, 0x33, 0xb2, 0x42, 0xa8, 0xb9, 0x4e, 0xdf, 0x47, 0x67, 0x43, 0x3d, 0xa9, 0x1e, 0x31, - 0x02, 0x7e, 0x0d, 0xae, 0x88, 0x42, 0x4e, 0xe5, 0xc3, 0x82, 0xe9, 0x91, 0xf4, 0xc8, 0x4a, 0x11, - 0xf4, 0x60, 0xa3, 0xd8, 0x0d, 0x6a, 0x0f, 0xa4, 0xf4, 0x71, 0x9e, 0xd0, 0xa9, 0x8d, 0x27, 0x13, - 0x83, 0x37, 0xd0, 0x2e, 0xd4, 0x18, 0x3b, 0x50, 0x15, 0x51, 0x68, 0x61, 0xea, 0x73, 0x15, 0xc2, - 0x5a, 0x2e, 0x84, 0xe0, 0x67, 0x68, 0xe5, 0xfa, 0xfa, 0x13, 0xfb, 0xe7, 0x4b, 0x68, 0x08, 0x76, - 0x7f, 0x4e, 0x55, 0xa6, 0xd5, 0x6e, 0x8d, 0x58, 0x29, 0xf8, 0x16, 0x5a, 0xb9, 0x89, 0x2b, 0x27, - 0x07, 0x7f, 0x00, 0xac, 0xc6, 0xe0, 0x13, 0x0e, 0xbe, 0x83, 0x9a, 0xbe, 0x65, 0x65, 0xbe, 0x74, - 0xf6, 0x89, 0x3e, 0x0f, 0x7e, 0x32, 0xb6, 0xcc, 0xd0, 0x7e, 0x66, 0xb0, 0x6f, 0x4c, 0xa6, 0xd9, - 0xa6, 0xed, 0x16, 0xef, 0xb2, 0x35, 0xd8, 0xc8, 0xca, 0x67, 0xb4, 0xab, 0xbb, 0x3d, 0x83, 0xa6, - 0xd5, 0x59, 0xdb, 0xe3, 0x87, 0xd8, 0xba, 0xb4, 0x12, 0x22, 0xd4, 0x66, 0x54, 0xcc, 0xf4, 0xd5, - 0xba, 0x44, 0x7f, 0x2b, 0x9d, 0xce, 0xc9, 0x54, 0xdc, 0xc4, 0xff, 0xc1, 0x01, 0x2f, 0xbf, 0x6c, - 0x71, 0x0f, 0x20, 0x5e, 0xee, 0x45, 0x1b, 0x48, 0xbb, 0xb0, 0x30, 0x49, 0x0e, 0xf0, 0xb9, 0x5d, - 0x57, 0xe8, 0xaf, 0xea, 0xd3, 0xfe, 0x3a, 0x84, 0xf5, 0x8c, 0x84, 0xdf, 0x00, 0x44, 0x7c, 0x72, - 0xc3, 0x1f, 0x94, 0x2b, 0x9b, 0x9c, 0x1b, 0xf1, 0xc9, 0x58, 0x2b, 0x72, 0x79, 0xaf, 0xe5, 0xf3, - 0x0e, 0x66, 0xb0, 0xf5, 0xec, 0x29, 0xc0, 0x5f, 0x60, 0x53, 0xb0, 0xf9, 0x9d, 0x1a, 0xa1, 0x45, - 0x4c, 0x65, 0x94, 0x70, 0x9b, 0x58, 0xd9, 0x73, 0x43, 0x9e, 0x62, 0x55, 0x55, 0xdf, 0xf3, 0xe4, - 0x3f, 0xae, 0xcb, 0xe7, 0x11, 0x23, 0x04, 0x33, 0xc0, 0xe7, 0x0f, 0x08, 0xfe, 0x00, 0x75, 0xfd, - 0x56, 0xf9, 0x8e, 0x6e, 0x9c, 0x52, 0x07, 0x06, 0x81, 0xdf, 0x43, 0x6d, 0xca, 0xe8, 0xd4, 0xb6, - 0x58, 0x29, 0x52, 0x03, 0x82, 0xbf, 0xa0, 0x61, 0x3c, 0xa9, 0xf9, 0x67, 0x7c, 0x9a, 0x26, 0x11, - 0x97, 0x3a, 0x03, 0x97, 0x2c, 0xe5, 0xc2, 0x6e, 0x58, 0x7b, 0xb2, 0x1b, 0x4a, 0x87, 0x3d, 0x68, - 0x42, 0x5d, 0xef, 0xea, 0xa0, 0x0f, 0xf8, 0x7c, 0x53, 0xaa, 0xd9, 0x36, 0x97, 0x2a, 0x74, 0x32, - 0x35, 0x92, 0x89, 0xc1, 0x21, 0xbc, 0x28, 0xd9, 0x8b, 0xd8, 0x83, 0x75, 0xdb, 0xa1, 0xc2, 0xa6, - 0xff, 0xb4, 0x83, 0x97, 0xe7, 0x83, 0x14, 0x1a, 0x66, 0x50, 0xf1, 0x37, 0xf0, 0xcc, 0xd7, 0x95, - 0x5c, 0x30, 0x1a, 0xe3, 0x76, 0xd9, 0x7f, 0x86, 0x9d, 0x52, 0x6d, 0x50, 0xe9, 0x3a, 0xaf, 0x1c, - 0x7c, 0x09, 0xb5, 0xcb, 0x88, 0x87, 0x58, 0x78, 0x80, 0x76, 0x0a, 0x52, 0x50, 0x39, 0xfa, 0xf1, - 0x9f, 0x5e, 0x18, 0xc9, 0xd9, 0xc3, 0x6d, 0x7f, 0x92, 0xc4, 0xfb, 0xb3, 0xc7, 0x94, 0x2d, 0xe6, - 0x6c, 0x1a, 0xb2, 0xc5, 0xfe, 0x1d, 0xbd, 0x5d, 0x44, 0x93, 0xfd, 0x50, 0x9b, 0xde, 0xd7, 0xac, - 0xdb, 0x86, 0xfe, 0x39, 0xf8, 0x18, 0x00, 0x00, 0xff, 0xff, 0x54, 0x9e, 0xcc, 0xd3, 0x27, 0x09, - 0x00, 0x00, + // 1036 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x55, 0xdf, 0x4f, 0xe3, 0x46, + 0x10, 0x8e, 0xf3, 0x13, 0x4f, 0x1c, 0x08, 0x73, 0xd7, 0xca, 0x45, 0xad, 0x84, 0xac, 0x93, 0x9a, + 0x46, 0x25, 0xdc, 0xc1, 0xc3, 0x3d, 0x55, 0x2d, 0x10, 0x4a, 0x90, 0x4a, 0x40, 0x0b, 0xd7, 0xea, + 0xfa, 0x82, 0x96, 0x64, 0x71, 0x2c, 0xe2, 0xb5, 0xc9, 0x2e, 0x57, 0xf1, 0x2f, 0xf4, 0xa1, 0x7f, + 0x73, 0xb5, 0x3f, 0xec, 0xd8, 0x10, 0xaa, 0xde, 0xc3, 0x3d, 0xd9, 0x33, 0xfb, 0x7d, 0x3b, 0x33, + 0xbb, 0xf3, 0xcd, 0x42, 0x27, 0x66, 0x42, 0xd0, 0x90, 0x0d, 0xd2, 0x45, 0x22, 0x13, 0x6c, 0xe8, + 0x4f, 0xf0, 0x4f, 0x0b, 0x3a, 0x27, 0x89, 0x10, 0x51, 0x7a, 0x66, 0x96, 0xf1, 0x35, 0x34, 0x78, + 0xc2, 0x27, 0xcc, 0x77, 0xb6, 0x9d, 0x5e, 0x9d, 0x18, 0x03, 0x7d, 0x68, 0x4d, 0x66, 0x94, 0x73, + 0x36, 0xf7, 0xab, 0xdb, 0x4e, 0xcf, 0x23, 0x99, 0x89, 0x7d, 0xa8, 0x49, 0x1a, 0xfa, 0xb5, 0x6d, + 0xa7, 0xb7, 0xbe, 0xe7, 0x9b, 0xdd, 0x07, 0xa5, 0x2d, 0x07, 0x57, 0x34, 0x24, 0x0a, 0x84, 0xef, + 0x60, 0x8d, 0xce, 0xa3, 0x4f, 0xec, 0x4c, 0x84, 0x7e, 0x7d, 0xdb, 0xe9, 0xb5, 0xf7, 0x5e, 0x59, + 0xc2, 0x81, 0x76, 0x1b, 0xfc, 0xa8, 0x42, 0x72, 0x18, 0xee, 0x41, 0x33, 0x66, 0x31, 0x61, 0xf7, + 0x7e, 0x43, 0x13, 0xb2, 0x08, 0x67, 0x2c, 0xbe, 0x61, 0x0b, 0x31, 0x8b, 0x52, 0xc2, 0xee, 0x1f, + 0x98, 0x90, 0xa3, 0x0a, 0xb1, 0x48, 0xdc, 0xb7, 0x1c, 0xe1, 0x37, 0x35, 0xe7, 0x9b, 0x15, 0x1c, + 0x91, 0x26, 0x5c, 0xb0, 0x9c, 0x24, 0x70, 0x00, 0xad, 0x29, 0x95, 0x54, 0xa5, 0xd6, 0xd2, 0x2c, + 0xb4, 0xac, 0xa1, 0xf2, 0xe6, 0x99, 0x65, 0x20, 0xec, 0x43, 0x63, 0xc6, 0xe6, 0xf3, 0xc4, 0xff, + 0xa3, 0x84, 0x36, 0x95, 0x8f, 0xd4, 0xca, 0xa8, 0x42, 0x0c, 0x04, 0x77, 0xcc, 0xde, 0xc3, 0x28, + 0xf4, 0x5d, 0x8d, 0xde, 0x2c, 0xec, 0x3d, 0x8c, 0x42, 0x93, 0x7e, 0x86, 0xc9, 0x52, 0x51, 0x45, + 0xc3, 0xb3, 0x54, 0x96, 0xe5, 0x66, 0x20, 0xdc, 0x07, 0x50, 0xbf, 0x1f, 0xd2, 0x29, 0x95, 0xcc, + 0x6f, 0x3f, 0x8b, 0x60, 0x16, 0x46, 0x15, 0x52, 0x80, 0xe1, 0x3b, 0x73, 0xa3, 0x47, 0xf1, 0xd4, + 0xf7, 0x34, 0xe3, 0x2b, 0xcb, 0x38, 0x32, 0x17, 0x7b, 0x94, 0xc4, 0x31, 0xe5, 0x53, 0x15, 0xc7, + 0xe2, 0xf0, 0x0d, 0x34, 0x58, 0x9c, 0xca, 0x47, 0xbf, 0xa3, 0x09, 0x9e, 0x25, 0x1c, 0x2b, 0x9f, + 0x2a, 0x56, 0x2f, 0x62, 0x1f, 0xea, 0x93, 0x84, 0x73, 0x7f, 0x5d, 0x83, 0x5e, 0x67, 0xbb, 0x26, + 0x9c, 0x1f, 0x0b, 0x49, 0x6f, 0xe6, 0x91, 0x98, 0x8d, 0x2a, 0x44, 0x63, 0xf0, 0x2d, 0xb8, 0x42, + 0x52, 0xc9, 0x4e, 0xf9, 0x6d, 0xe2, 0x6f, 0x68, 0x42, 0xd7, 0x12, 0x2e, 0x33, 0xff, 0xa8, 0x42, + 0x96, 0x20, 0xfc, 0x19, 0x3c, 0x6d, 0xd8, 0x63, 0xf0, 0xbb, 0xa5, 0x1b, 0x26, 0x2c, 0x4e, 0x24, + 0xbb, 0x2c, 0x00, 0x46, 0x15, 0x52, 0x22, 0xe0, 0x21, 0x74, 0xac, 0x6d, 0x5a, 0xc0, 0xdf, 0xd4, + 0x3b, 0x6c, 0xad, 0xda, 0x21, 0x6f, 0x92, 0x32, 0x25, 0x18, 0x43, 0xed, 0x8a, 0x86, 0xd8, 0x01, + 0xf7, 0xc3, 0x78, 0x78, 0xfc, 0xeb, 0xe9, 0xf8, 0x78, 0xd8, 0xad, 0xa0, 0x0b, 0x8d, 0xe3, 0xb3, + 0x8b, 0xab, 0x8f, 0x5d, 0x07, 0x3d, 0x58, 0x3b, 0x27, 0x27, 0xd7, 0xe7, 0xe3, 0xdf, 0x3e, 0x76, + 0xab, 0x0a, 0x77, 0x34, 0x3a, 0x18, 0x1b, 0xb3, 0x86, 0x5d, 0xf0, 0xb4, 0x79, 0x30, 0x1e, 0x5e, + 0x9f, 0x93, 0x93, 0x6e, 0xfd, 0xd0, 0x85, 0xd6, 0x24, 0xe1, 0x92, 0x71, 0x19, 0xfc, 0xed, 0x80, + 0x9b, 0x97, 0x8e, 0x5b, 0xb0, 0x16, 0x33, 0x49, 0xd5, 0xb5, 0x69, 0x3d, 0x7a, 0x24, 0xb7, 0x71, + 0x07, 0x5c, 0x19, 0xc5, 0x4c, 0x48, 0x1a, 0xa7, 0x5a, 0x94, 0xed, 0xbd, 0x0d, 0x5b, 0xc4, 0x05, + 0x63, 0x8b, 0xab, 0x28, 0x66, 0x64, 0x89, 0x50, 0xba, 0x4e, 0xef, 0xa2, 0xd3, 0xa1, 0x56, 0xaa, + 0x47, 0x8c, 0x81, 0xdf, 0x82, 0x2b, 0xa2, 0x90, 0x53, 0xf9, 0xb0, 0x60, 0x5a, 0x92, 0x1e, 0x59, + 0x3a, 0x82, 0x3e, 0xac, 0x97, 0xbb, 0x41, 0xcd, 0x81, 0x94, 0x3e, 0xce, 0x13, 0x3a, 0xb5, 0xf9, + 0x64, 0x66, 0xf0, 0x1e, 0x3a, 0xa5, 0x3b, 0xc6, 0x2e, 0xd4, 0x44, 0x14, 0x5a, 0x98, 0xfa, 0x5d, + 0xa6, 0x50, 0x2d, 0xa4, 0x10, 0x84, 0xd0, 0x2e, 0xf4, 0xf5, 0xcb, 0xf3, 0x67, 0xaa, 0x75, 0x22, + 0xfc, 0xea, 0x76, 0xad, 0xe7, 0x92, 0xcc, 0xc4, 0x1e, 0xb4, 0x62, 0x11, 0x5e, 0x3d, 0xa6, 0xcc, + 0xce, 0xa0, 0xf5, 0x4c, 0xed, 0xc6, 0x4b, 0xb2, 0xe5, 0x20, 0x82, 0x76, 0x41, 0x9d, 0x2f, 0x04, + 0x2a, 0x9e, 0x78, 0xf5, 0xc9, 0x89, 0xff, 0xff, 0x50, 0x9f, 0x00, 0x96, 0xc2, 0x7b, 0x21, 0x52, + 0x0f, 0xea, 0x36, 0x4a, 0xad, 0xa0, 0x93, 0xd2, 0xe4, 0x24, 0xf5, 0xcf, 0x8c, 0x7b, 0x6b, 0xe2, + 0x9a, 0x91, 0xf2, 0x05, 0x8f, 0xf2, 0xbd, 0xb9, 0xb3, 0xec, 0xcd, 0xe8, 0x95, 0xbb, 0xa2, 0x9d, + 0x13, 0x2f, 0x8c, 0x77, 0xd9, 0x25, 0xa7, 0xd0, 0xb2, 0x3e, 0xfc, 0x1a, 0x9a, 0x82, 0xdd, 0x8f, + 0x1f, 0x62, 0x9b, 0x9e, 0xb5, 0x10, 0xa1, 0x3e, 0xa3, 0x62, 0xa6, 0x4f, 0xdf, 0x25, 0xfa, 0x5f, + 0xf9, 0xf4, 0x59, 0x99, 0xde, 0xd5, 0xff, 0x4a, 0x29, 0x5e, 0xf1, 0xd9, 0xc0, 0x1d, 0x80, 0x38, + 0x9f, 0xf0, 0x36, 0x91, 0x4e, 0x69, 0xf4, 0x93, 0x02, 0xe0, 0x73, 0xf5, 0x53, 0x52, 0x4a, 0xed, + 0xa9, 0x52, 0x0e, 0x60, 0x2d, 0x23, 0xe1, 0x77, 0x00, 0x11, 0x9f, 0x5c, 0xf3, 0x07, 0x15, 0xca, + 0x16, 0xe7, 0x46, 0x7c, 0x32, 0xd6, 0x8e, 0x42, 0xdd, 0xd5, 0x62, 0xdd, 0xc1, 0x0c, 0x36, 0x9f, + 0x3d, 0x6a, 0xf8, 0x13, 0x6c, 0x08, 0x36, 0xbf, 0x55, 0xc3, 0x60, 0x11, 0x53, 0x19, 0x25, 0xdc, + 0x16, 0xb6, 0xea, 0xe1, 0x24, 0x4f, 0xb1, 0xaa, 0x03, 0xee, 0x78, 0xf2, 0x17, 0xd7, 0x37, 0xed, + 0x11, 0x63, 0x04, 0x33, 0xc0, 0xe7, 0x4f, 0x21, 0xfe, 0x00, 0x0d, 0xfd, 0xea, 0xfa, 0x8e, 0x6e, + 0xc8, 0x95, 0x01, 0x0c, 0x02, 0xbf, 0x87, 0xfa, 0x94, 0xd1, 0xa9, 0x6d, 0xdd, 0x95, 0x48, 0x0d, + 0x08, 0x7e, 0x87, 0xa6, 0x89, 0xa4, 0x74, 0xc5, 0xf8, 0x34, 0x4d, 0x22, 0x2e, 0x75, 0x05, 0x2e, + 0xc9, 0xed, 0xff, 0xd4, 0xdc, 0xca, 0xb1, 0x15, 0xb4, 0xa0, 0xa1, 0x5f, 0x9d, 0x60, 0x00, 0xf8, + 0x7c, 0xe6, 0xab, 0x16, 0x37, 0x87, 0x2a, 0x74, 0x31, 0x75, 0x92, 0x99, 0xc1, 0x01, 0xbc, 0x5a, + 0x31, 0xe1, 0xb1, 0x0f, 0x6b, 0xb6, 0x43, 0x85, 0x2d, 0xff, 0x69, 0x07, 0xe7, 0xeb, 0xfd, 0x3e, + 0xb4, 0xac, 0x1e, 0x9e, 0x3e, 0x00, 0x5d, 0xf0, 0x0e, 0xe7, 0xc9, 0xe4, 0xce, 0x9e, 0x41, 0xd7, + 0xd9, 0x4b, 0xa1, 0x69, 0x04, 0x8d, 0xbf, 0x80, 0x67, 0xfe, 0x2e, 0xe5, 0x82, 0xd1, 0x18, 0x57, + 0xea, 0x7d, 0x6b, 0xa5, 0x37, 0xa8, 0xf4, 0x9c, 0xb7, 0x0e, 0xbe, 0x81, 0xfa, 0x45, 0xc4, 0x43, + 0x2c, 0x3d, 0xbb, 0x5b, 0x25, 0x2b, 0xa8, 0x1c, 0xfe, 0xf8, 0x67, 0x3f, 0x8c, 0xe4, 0xec, 0xe1, + 0x66, 0x30, 0x49, 0xe2, 0xdd, 0xd9, 0x63, 0xca, 0x16, 0x73, 0x36, 0x0d, 0xd9, 0x62, 0xf7, 0x96, + 0xde, 0x2c, 0xa2, 0xc9, 0x6e, 0xa8, 0xb7, 0xde, 0xd5, 0xac, 0x9b, 0xa6, 0xfe, 0xec, 0xff, 0x1b, + 0x00, 0x00, 0xff, 0xff, 0xe3, 0x43, 0x0c, 0x66, 0x1d, 0x0a, 0x00, 0x00, } diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto index 64dc4815c54..5fc85ff97f2 100644 --- a/gossip/proto/message.proto +++ b/gossip/proto/message.proto @@ -103,31 +103,41 @@ message ConnEstablish { // Messages related to pull mechanism +enum MsgType { + UNDEFINED = 0; + BlockMessage = 1; +} + // DataRequest is a message used for a peer to request // certain data blocks from a remote peer message DataRequest { - uint64 nonce = 1; - repeated uint64 seqMap = 2; // Maybe change this to bitmap later on + uint64 nonce = 1; + repeated string digests = 2; + MsgType msgType = 3; } // GossipHello is the message that is used for the peer to initiate // a pull round with another peer message GossipHello { - uint64 nonce = 1; + uint64 nonce = 1; + bytes metadata = 2; + MsgType msgType = 3; } // DataUpdate is the the final message in the pull phase // sent from the receiver to the initiator message DataUpdate { uint64 nonce = 1; - repeated DataMessage data = 2; + repeated GossipMessage data = 2; + MsgType msgType = 3; } // DataDigest is the message sent from the receiver peer // to the initator peer and contains the data items it has message DataDigest { uint64 nonce = 1; - repeated uint64 seqMap = 2; // Maybe change this to bitmap later on + repeated string digests = 2; // Maybe change this to bitmap later on + MsgType msgType = 3; }