Skip to content

Commit

Permalink
[FAB-5862] Implement ack-based send in gossip
Browse files Browse the repository at this point in the history
This commit implements acknowledgement-based sending in gossip.
A new method was introduced to the comm API: SendWithAck, that receives:
- msg *proto.SignedGossipMessage: a message to send
- timeout time.Duration: a maximum timeout to wait for acks
- minAck int: a minimum amount of acknowledgements to collect
- peers ...*RemotePeer: the peers to send the message to.

The method returns []SendResult, which reports success/failure
for the various peers (but not necessarily all of them, since the method
may return as soon as minAck peers have acknowledged the message).

Change-Id: Ie2a478228180c5dd2cf773ae10160f1a8484f546
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Sep 5, 2017
1 parent 099524a commit 7c404eb
Show file tree
Hide file tree
Showing 15 changed files with 595 additions and 114 deletions.
73 changes: 73 additions & 0 deletions gossip/comm/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
)

// sendFunc sends msg to the given remote peer. It has no return value, so
// any send failure is not reported through this callback.
type sendFunc func(peer *RemotePeer, msg *proto.SignedGossipMessage)
// waitFunc waits for the acknowledgement of the given remote peer. A nil
// return value is counted as a successful acknowledgement by
// ackSendOperation.send; any non-nil error is recorded as a failure.
type waitFunc func(*RemotePeer) error

// ackSendOperation bundles the two callbacks needed to send a message to a
// set of peers and collect their acknowledgements.
type ackSendOperation struct {
// snd sends the message to a single peer.
snd sendFunc
// waitForAck is invoked right after snd for the same peer; the error it
// returns becomes the error of that peer's SendResult.
waitForAck waitFunc
}

// newAckSendOperation builds an ackSendOperation that uses snd to deliver a
// message to a peer and waitForAck to obtain that peer's acknowledgement.
func newAckSendOperation(snd sendFunc, waitForAck waitFunc) *ackSendOperation {
	op := new(ackSendOperation)
	op.snd = snd
	op.waitForAck = waitForAck
	return op
}

// send dispatches msg to every peer concurrently and collects the per-peer
// outcomes until either minAckNum peers have acknowledged successfully, or
// every peer has reported an outcome (success or failure).
//
// The returned slice contains only the results gathered up to that point, so
// it may be shorter than peers when minAckNum is reached early. When peers is
// empty there is nothing to wait for and an empty slice is returned
// immediately.
func (aso *ackSendOperation) send(msg *proto.SignedGossipMessage, minAckNum int, peers ...*RemotePeer) []SendResult {
	successAcks := 0
	results := make([]SendResult, 0, len(peers))

	// The channel is buffered to len(peers) so that the sending goroutines
	// never block, even after this method has returned early and stopped
	// reading from it.
	acks := make(chan SendResult, len(peers))
	// Send to all peers the message
	for _, p := range peers {
		go func(p *RemotePeer) {
			// Send the message to 'p'
			aso.snd(p, msg)
			// Wait for an ack from 'p', or get an error if timed out
			err := aso.waitForAck(p)
			acks <- SendResult{
				RemotePeer: *p,
				error:      err,
			}
		}(p)
	}
	// Bounding the loop by len(peers) guarantees that we never receive from
	// the channel when no goroutine is left to write to it; in particular,
	// a call with zero peers returns right away instead of blocking forever.
	for len(results) < len(peers) {
		ack := <-acks
		results = append(results, ack)
		if ack.error == nil {
			successAcks++
		}
		if successAcks == minAckNum {
			break
		}
	}
	return results
}

// interceptAcks wraps nextHandler with a filter for acknowledgement messages.
// A message for which IsAck() is true is not forwarded; instead its Ack
// payload is published on pubSub under the topic derived from the message
// nonce and remotePeerID. Every other message is passed to nextHandler
// untouched.
func interceptAcks(nextHandler handler, remotePeerID common.PKIidType, pubSub *util.PubSub) func(*proto.SignedGossipMessage) {
	return func(m *proto.SignedGossipMessage) {
		if !m.IsAck() {
			nextHandler(m)
			return
		}
		pubSub.Publish(topicForAck(m.Nonce, remotePeerID), m.GetAck())
	}
}
153 changes: 153 additions & 0 deletions gossip/comm/ack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"errors"
"testing"
"time"

"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/assert"
)

// TestInterceptAcks checks both branches of the handler returned by
// interceptAcks: an ack message must be published on the pub/sub topic and
// not reach the wrapped handler, while a non-ack message must reach the
// wrapped handler and not be published.
func TestInterceptAcks(t *testing.T) {
pubsub := util.NewPubSub()
pkiID := common.PKIidType("pkiID")
// Buffered with capacity 1 so the wrapped handler can deliver the single
// forwarded message without a concurrent reader.
msgs := make(chan *proto.SignedGossipMessage, 1)
handlerFunc := func(message *proto.SignedGossipMessage) {
msgs <- message
}
wrappedHandler := interceptAcks(handlerFunc, pkiID, pubsub)
ack := &proto.SignedGossipMessage{
GossipMessage: &proto.GossipMessage{
Nonce: 1,
Content: &proto.GossipMessage_Ack{
Ack: &proto.Acknowledgement{},
},
},
}
// Subscribe before invoking the handler so the publication is not missed
sub := pubsub.Subscribe(topicForAck(1, pkiID), time.Second)
wrappedHandler(ack)
// Ensure ack was consumed and not passed onwards to the wrapped handler
assert.Len(t, msgs, 0)
_, err := sub.Listen()
// Ensure ack was published
assert.NoError(t, err)

// Test that non-ack messages are just forwarded
notAck := &proto.SignedGossipMessage{
GossipMessage: &proto.GossipMessage{
Nonce: 2,
Content: &proto.GossipMessage_DataMsg{
DataMsg: &proto.DataMessage{},
},
},
}
sub = pubsub.Subscribe(topicForAck(2, pkiID), time.Second)
wrappedHandler(notAck)
// Ensure message was passed to the wrapped handler
assert.Len(t, msgs, 1)
// NOTE(review): Listen presumably fails once the one second passed to
// Subscribe elapses without a publication - confirm against util.PubSub
_, err = sub.Listen()
// Ensure ack was not published
assert.Error(t, err)
}

// TestAck exercises SendWithAck from comm1 towards three other comm
// instances. comm2 and comm3 accept data messages and acknowledge them
// (positively or with an error) through helper goroutines started before
// each send; comm4 never acknowledges anything in this test. The scenarios
// run sequentially and depend on the state left behind by the previous
// ones, so their order matters.
func TestAck(t *testing.T) {
t.Parallel()

// comm1 is the sender; it is stopped explicitly near the end of the test
comm1, _ := newCommInstance(14000, naiveSec)
comm2, _ := newCommInstance(14001, naiveSec)
defer comm2.Stop()
comm3, _ := newCommInstance(14002, naiveSec)
defer comm3.Stop()
comm4, _ := newCommInstance(14003, naiveSec)
defer comm4.Stop()

// acceptData selects only data messages
acceptData := func(o interface{}) bool {
return o.(proto.ReceivedMessage).GetGossipMessage().IsDataMsg()
}

// ack reads one message from c and acknowledges it without an error
ack := func(c <-chan proto.ReceivedMessage) {
msg := <-c
msg.Ack(nil)
}

// nack reads one message from c and acknowledges it with an error
nack := func(c <-chan proto.ReceivedMessage) {
msg := <-c
msg.Ack(errors.New("Failed processing message because reasons"))
}

// Have instances 2 and 3 subscribe to data messages, and ack them
inc2 := comm2.Accept(acceptData)
inc3 := comm3.Accept(acceptData)

// Collect 2 out of 2 acks - should succeed
go ack(inc2)
go ack(inc3)
res := comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(14001), remotePeer(14002))
assert.Len(t, res, 2)
assert.Empty(t, res[0].Error())
assert.Empty(t, res[1].Error())

// Collect 2 out of 3 acks - should succeed
t1 := time.Now()
go ack(inc2)
go ack(inc3)
res = comm1.SendWithAck(createGossipMsg(), time.Second*10, 2, remotePeer(14001), remotePeer(14002), remotePeer(14003))
elapsed := time.Since(t1)
assert.Len(t, res, 2)
assert.Empty(t, res[0].Error())
assert.Empty(t, res[1].Error())
// Collection of 2 out of 3 acks should have taken much less than the timeout (10 seconds)
assert.True(t, elapsed < time.Second*5)

// Collect 2 out of 3 acks - should fail, because peer3 has now sent an error along with the ack
go ack(inc2)
go nack(inc3)
res = comm1.SendWithAck(createGossipMsg(), time.Second*10, 2, remotePeer(14001), remotePeer(14002), remotePeer(14003))
assert.Len(t, res, 3)
assert.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "Failed processing message because reasons")
assert.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "timed out")

// Collect 2 out of 2 acks - should fail because comm2 and comm3 now don't acknowledge messages
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(14001), remotePeer(14002))
assert.Len(t, res, 2)
assert.Contains(t, res[0].Error(), "timed out")
assert.Contains(t, res[1].Error(), "timed out")
// Drain the messages that nobody read in the previous scenario (no
// ack/nack goroutine was started for it) to prepare for the next salvo
<-inc2
<-inc3

// Collect 2 out of 3 acks - should fail
go ack(inc2)
go nack(inc3)
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 2, remotePeer(14001), remotePeer(14002), remotePeer(14003))
assert.Len(t, res, 3)
assert.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "") // This is the "successful ack"
assert.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "Failed processing message because reasons")
assert.Contains(t, []string{res[0].Error(), res[1].Error(), res[2].Error()}, "timed out")
// The aggregated JSON summary must count each distinct failure and the single success
assert.Contains(t, res.String(), "\"Failed processing message because reasons\":1")
assert.Contains(t, res.String(), "\"timed out\":1")
assert.Contains(t, res.String(), "\"successes\":1")
assert.Equal(t, 2, res.NackCount())
assert.Equal(t, 1, res.AckCount())

// Send a message to no one
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 1)
assert.Len(t, res, 0)

// Send a message while stopping
comm1.Stop()
res = comm1.SendWithAck(createGossipMsg(), time.Second*3, 1, remotePeer(14001), remotePeer(14002), remotePeer(14003))
assert.Len(t, res, 3)
assert.Contains(t, res[0].Error(), "comm is stopping")
assert.Contains(t, res[1].Error(), "comm is stopping")
assert.Contains(t, res[2].Error(), "comm is stopping")
}
62 changes: 62 additions & 0 deletions gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package comm

import (
"encoding/json"
"fmt"
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
Expand All @@ -24,6 +26,9 @@ type Comm interface {
// Send sends a message to remote peers
Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)

// SendWithAck sends a message to remote peers, waiting for acknowledgement from minAck of them, or until a certain timeout expires
SendWithAck(msg *proto.SignedGossipMessage, timeout time.Duration, minAck int, peers ...*RemotePeer) AggregatedSendResult

// Probe probes a remote node and returns nil if its responsive,
// and an error if it's not.
Probe(peer *RemotePeer) error
Expand Down Expand Up @@ -52,6 +57,63 @@ type RemotePeer struct {
PKIID common.PKIidType
}

// SendResult defines a result of a send to a remote peer.
// It embeds the error of the send (nil when the send was acknowledged
// successfully) together with the RemotePeer the result refers to.
type SendResult struct {
error
RemotePeer
}

// Error returns the message of the underlying error, or the empty
// string when the SendResult carries no error.
func (sr SendResult) Error() string {
	if sr.error == nil {
		return ""
	}
	return sr.error.Error()
}

// AggregatedSendResult represents a slice of SendResults and offers
// helpers to count and summarize the successes and failures in it.
type AggregatedSendResult []SendResult

// AckCount returns how many of the results carry no error, i.e. the
// number of successful acknowledgements.
func (ar AggregatedSendResult) AckCount() int {
	acked := 0
	for i := range ar {
		if ar[i].error != nil {
			continue
		}
		acked++
	}
	return acked
}

// NackCount returns how many of the results carry an error, i.e. the
// number of unsuccessful acknowledgements.
func (ar AggregatedSendResult) NackCount() int {
	nacked := 0
	for _, res := range ar {
		if res.error != nil {
			nacked++
		}
	}
	return nacked
}

// String renders the AggregatedSendResult as a JSON object.
// The object has a "successes" key holding the number of results without
// an error (omitted when there are none), and a "failures" key mapping each
// distinct error message to its number of occurrences (omitted when every
// result succeeded).
func (ar AggregatedSendResult) String() string {
	failures := make(map[string]int)
	successes := 0
	for _, res := range ar {
		if res.error == nil {
			successes++
			continue
		}
		failures[res.Error()]++
	}

	summary := make(map[string]interface{})
	if successes > 0 {
		summary["successes"] = successes
	}
	if successes < len(ar) {
		summary["failures"] = failures
	}
	encoded, _ := json.Marshal(summary)
	return string(encoded)
}

// String converts a RemotePeer to a string
func (p *RemotePeer) String() string {
return fmt.Sprintf("%s, PKIid:%v", p.Endpoint, p.PKIID)
Expand Down
Loading

0 comments on commit 7c404eb

Please sign in to comment.