diff --git a/core/deliverservice/client.go b/core/deliverservice/client.go index 2d6f65577ba..910da792c30 100644 --- a/core/deliverservice/client.go +++ b/core/deliverservice/client.go @@ -47,14 +47,14 @@ type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient type broadcastClient struct { stopFlag int32 - sync.RWMutex + sync.Mutex stopChan chan struct{} createClient clientFactory shouldRetry retryPolicy onConnect broadcastSetup prod comm.ConnectionProducer blocksprovider.BlocksDeliverer - conn *grpc.ClientConn + conn *connection } // NewBroadcastClient returns a broadcastClient with the given params @@ -65,6 +65,9 @@ func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, o // Recv receives a message from the ordering service func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) { o, err := bc.try(func() (interface{}, error) { + if bc.shouldStop() { + return nil, errors.New("closing") + } return bc.BlocksDeliverer.Recv() }) if err != nil { @@ -76,6 +79,9 @@ func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) { // Send sends a message to the ordering service func (bc *broadcastClient) Send(msg *common.Envelope) error { _, err := bc.try(func() (interface{}, error) { + if bc.shouldStop() { + return nil, errors.New("closing") + } return nil, bc.BlocksDeliverer.Send(msg) }) return err @@ -106,7 +112,7 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, } func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) { - if bc.BlocksDeliverer == nil { + if bc.conn == nil { err := bc.connect() if err != nil { return nil, err @@ -114,9 +120,7 @@ func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interfa } resp, err := action() if err != nil { - bc.conn.Close() - bc.BlocksDeliverer = nil - bc.conn = nil + bc.disconnect() return nil, err } return resp, nil @@ -141,16 +145,45 @@ func (bc *broadcastClient) connect() error { conn.Close() return err } - err = bc.onConnect(bc) + err = bc.afterConnect(conn, abc) if err == nil { - bc.Lock() - bc.conn = conn + return nil + } + // If we reached here, lets make sure connection is closed + // and nullified before we return + bc.disconnect() + return err +} + +func (bc *broadcastClient) afterConnect(conn *grpc.ClientConn, abc orderer.AtomicBroadcast_DeliverClient) error { + bc.Lock() + bc.conn = &connection{ClientConn: conn} + bc.BlocksDeliverer = abc + if bc.shouldStop() { bc.Unlock() - bc.BlocksDeliverer = abc + return errors.New("closing") + } + bc.Unlock() + // If the client is closed at this point- before onConnect, + // any use of this object by onConnect would return an error. + err := bc.onConnect(bc) + // If the client is closed right after onConnect, but before + // the following lock- this method would return an error because + // the client has been closed. + bc.Lock() + defer bc.Unlock() + if bc.shouldStop() { + return errors.New("closing") + } + // If the client is closed right after this method exits, + // it's because this method returned nil and not an error. + // So- connect() would return nil also, and the flow of the goroutine + // is returned to doAction(), where action() is invoked - and is configured + // to check whether the client has closed or not. + if err == nil { return nil } logger.Error("Failed setting up broadcast:", err) - conn.Close() return err } @@ -159,12 +192,39 @@ func (bc *broadcastClient) shouldStop() bool { } func (bc *broadcastClient) Close() { + bc.Lock() + defer bc.Unlock() + if bc.shouldStop() { + return + } atomic.StoreInt32(&bc.stopFlag, int32(1)) bc.stopChan <- struct{}{} - bc.RLock() - defer bc.RUnlock() if bc.conn == nil { return } bc.conn.Close() } + +func (bc *broadcastClient) disconnect() { + bc.Lock() + defer bc.Unlock() + if bc.conn == nil { + return + } + bc.conn.Close() + bc.conn = nil + bc.BlocksDeliverer = nil +} + +type connection struct { + *grpc.ClientConn + sync.Once +} + +func (c *connection) Close() error { + var err error + c.Once.Do(func() { + err = c.ClientConn.Close() + }) + return err +} diff --git a/core/deliverservice/client_test.go b/core/deliverservice/client_test.go index af932161b98..7778c7b1103 100644 --- a/core/deliverservice/client_test.go +++ b/core/deliverservice/client_test.go @@ -17,16 +17,20 @@ limitations under the License. package deliverclient import ( + "crypto/sha256" "errors" + "math" "sync" "sync/atomic" "testing" "time" + "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/deliverservice/blocksprovider" "github.com/hyperledger/fabric/core/deliverservice/mocks" "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/orderer" + "github.com/hyperledger/fabric/protos/utils" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "google.golang.org/grpc" @@ -153,7 +157,6 @@ func (cp *connProducer) UpdateEndpoints(endpoints []string) { } func TestOrderingServiceConnFailure(t *testing.T) { - t.Parallel() testOrderingServiceConnFailure(t, blockDelivererConsumerWithRecv) testOrderingServiceConnFailure(t, blockDelivererConsumerWithSend) assert.Equal(t, 0, connNumber) @@ -406,13 +409,11 @@ func testLimitedConnAttempts(t *testing.T, bdc blocksDelivererConsumer) { } func TestLimitedTotalConnTimeRcv(t *testing.T) { - t.Parallel() testLimitedTotalConnTime(t, blockDelivererConsumerWithRecv) assert.Equal(t, 0, connNumber) } func TestLimitedTotalConnTimeSnd(t *testing.T) { - t.Parallel() testLimitedTotalConnTime(t, blockDelivererConsumerWithSend) assert.Equal(t, 0, connNumber) } @@ -473,11 +474,10 @@ func testGreenPath(t *testing.T, bdc blocksDelivererConsumer) { } func TestCloseWhileRecv(t *testing.T) { - t.Parallel() // Scenario: Recv is being called and after a while, // the connection is closed. // The Recv should return immediately in such a case - fakeOrderer := mocks.NewOrderer(5611) + fakeOrderer := mocks.NewOrderer(5611, t) time.Sleep(time.Second) defer fakeOrderer.Shutdown() cp := &connProducer{ordererEndpoint: "localhost:5611"} @@ -496,6 +496,7 @@ func TestCloseWhileRecv(t *testing.T) { time.AfterFunc(time.Second, func() { atomic.StoreInt32(&flag, int32(1)) bc.Close() + bc.Close() // Try to close a second time }) resp, err := bc.Recv() // Ensure we returned because bc.Close() was called and not because some other reason @@ -506,7 +507,6 @@ func TestCloseWhileRecv(t *testing.T) { } func TestCloseWhileSleep(t *testing.T) { - t.Parallel() testCloseWhileSleep(t, blockDelivererConsumerWithRecv) testCloseWhileSleep(t, blockDelivererConsumerWithSend) assert.Equal(t, 0, connNumber) @@ -543,9 +543,61 @@ func testCloseWhileSleep(t *testing.T, bdc blocksDelivererConsumer) { go func() { wg.Wait() bc.Close() + bc.Close() // Try to close a second time }() err := bdc(bc) assert.Error(t, err) assert.Equal(t, 1, cp.connAttempts) assert.Equal(t, 0, setupInvoked) } + +type signerMock struct { +} + +func (s *signerMock) NewSignatureHeader() (*common.SignatureHeader, error) { + return &common.SignatureHeader{}, nil +} + +func (s *signerMock) Sign(message []byte) ([]byte, error) { + hasher := sha256.New() + hasher.Write(message) + return hasher.Sum(nil), nil +} + +func TestProductionUsage(t *testing.T) { + // This test configures the client in a similar fashion as will be + // in production, and tests against a live gRPC server. + os := mocks.NewOrderer(5612, t) + os.SetNextExpectedSeek(5) + defer os.Shutdown() + connFact := func(endpoint string) (*grpc.ClientConn, error) { + return grpc.Dial(endpoint, grpc.WithInsecure(), grpc.WithBlock()) + } + prod := comm.NewConnectionProducer(connFact, []string{"localhost:5612"}) + clFact := func(cc *grpc.ClientConn) orderer.AtomicBroadcastClient { + return orderer.NewAtomicBroadcastClient(cc) + } + onConnect := func(bd blocksprovider.BlocksDeliverer) error { + env, err := utils.CreateSignedEnvelope(common.HeaderType_CONFIG_UPDATE, + "TEST", + &signerMock{}, newTestSeekInfo(), 0, 0) + assert.NoError(t, err) + return bd.Send(env) + } + retryPol := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) { + return time.Second * 3, attemptNum < 2 + } + cl := NewBroadcastClient(prod, clFact, onConnect, retryPol) + go os.SendBlock(5) + resp, err := cl.Recv() + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, uint64(5), resp.GetBlock().Header.Number) +} + +func newTestSeekInfo() *orderer.SeekInfo { + return &orderer.SeekInfo{Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: 5}}}, + Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}, + Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY, + } +} diff --git a/core/deliverservice/mocks/orderer.go b/core/deliverservice/mocks/orderer.go index 77fbc8905f2..d05ea2d258a 100644 --- a/core/deliverservice/mocks/orderer.go +++ b/core/deliverservice/mocks/orderer.go @@ -19,29 +19,45 @@ package mocks import ( "fmt" "net" + "sync/atomic" + "testing" + "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/orderer" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) type Orderer struct { net.Listener *grpc.Server + nextExpectedSeek uint64 + t *testing.T + blockChannel chan uint64 + stopChan chan struct{} } -func NewOrderer(port int) *Orderer { +func NewOrderer(port int, t *testing.T) *Orderer { srv := grpc.NewServer() lsnr, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) if err != nil { panic(err) } go srv.Serve(lsnr) - o := &Orderer{Server: srv, Listener: lsnr} + o := &Orderer{Server: srv, + Listener: lsnr, + t: t, + nextExpectedSeek: uint64(1), + blockChannel: make(chan uint64, 1), + stopChan: make(chan struct{}, 1), + } orderer.RegisterAtomicBroadcastServer(srv, o) return o } func (o *Orderer) Shutdown() { + o.stopChan <- struct{}{} o.Server.Stop() o.Listener.Close() } @@ -50,6 +66,44 @@ func (*Orderer) Broadcast(orderer.AtomicBroadcast_BroadcastServer) error { panic("Should not have ben called") } -func (*Orderer) Deliver(orderer.AtomicBroadcast_DeliverServer) error { - return nil +func (o *Orderer) SetNextExpectedSeek(seq uint64) { + atomic.StoreUint64(&o.nextExpectedSeek, uint64(seq)) +} + +func (o *Orderer) SendBlock(seq uint64) { + o.blockChannel <- seq +} + +func (o *Orderer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error { + envlp, err := stream.Recv() + if err != nil { + fmt.Println(err) + return nil + } + payload := &common.Payload{} + proto.Unmarshal(envlp.Payload, payload) + seekInfo := &orderer.SeekInfo{} + proto.Unmarshal(payload.Data, seekInfo) + assert.True(o.t, seekInfo.Behavior == orderer.SeekInfo_BLOCK_UNTIL_READY) + assert.Equal(o.t, atomic.LoadUint64(&o.nextExpectedSeek), seekInfo.Start.GetSpecified().Number) + + for { + select { + case <-o.stopChan: + return nil + case seq := <-o.blockChannel: + o.sendBlock(stream, seq) + } + } +} + +func (o *Orderer) sendBlock(stream orderer.AtomicBroadcast_DeliverServer, seq uint64) { + block := &common.Block{ + Header: &common.BlockHeader{ + Number: seq, + }, + } + stream.Send(&orderer.DeliverResponse{ + Type: &orderer.DeliverResponse_Block{Block: block}, + }) }