From 1b5d378b003c93b688e181eb58082db756574ebf Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Wed, 9 Nov 2016 17:08:40 -0500 Subject: [PATCH] [FAB-798] Abstract out the solo deliver handler As the next step of consolidating the common logic of the atomicbroadcast api between components, this changeset pulls out the logic which is not solo specific and moves it into the common/deliver package. SBFT was already dependent on this functionality, so this is a natural move. This continues, but does not satisfy FAB-798. Change-Id: I02c5ef5b03f9e1a17fd188e3df7b6fb42aa126b5 Signed-off-by: Jason Yellick --- orderer/{solo => common/deliver}/deliver.go | 18 ++++++-- .../{solo => common/deliver}/deliver_test.go | 42 ++++++++++++------- orderer/sbft/backend/backendab.go | 8 ++-- orderer/solo/solo.go | 7 ++-- 4 files changed, 50 insertions(+), 25 deletions(-) rename orderer/{solo => common/deliver}/deliver.go (91%) rename orderer/{solo => common/deliver}/deliver_test.go (90%) diff --git a/orderer/solo/deliver.go b/orderer/common/deliver/deliver.go similarity index 91% rename from orderer/solo/deliver.go rename to orderer/common/deliver/deliver.go index a588f8cce96..7eb87511f5f 100644 --- a/orderer/solo/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -14,27 +14,39 @@ See the License for the specific language governing permissions and limitations under the License. */ -package solo +package deliver import ( "github.com/hyperledger/fabric/orderer/rawledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "github.com/op/go-logging" ) +var logger = logging.MustGetLogger("orderer/common/deliver") + +func init() { + logging.SetLevel(logging.DEBUG, "") +} + +type Handler interface { + Handle(srv ab.AtomicBroadcast_DeliverServer) error +} + type DeliverServer struct { rl rawledger.Reader maxWindow int } -func NewDeliverServer(rl rawledger.Reader, maxWindow int) *DeliverServer { +func NewHandlerImpl(rl rawledger.Reader, maxWindow int) Handler { return &DeliverServer{ rl: rl, maxWindow: maxWindow, } } -func (ds *DeliverServer) HandleDeliver(srv ab.AtomicBroadcast_DeliverServer) error { +func (ds *DeliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Starting new Deliver loop") d := newDeliverer(ds, srv) return d.recv() diff --git a/orderer/solo/deliver_test.go b/orderer/common/deliver/deliver_test.go similarity index 90% rename from orderer/solo/deliver_test.go rename to orderer/common/deliver/deliver_test.go index d9d6e1cf94f..b4d888e9f90 100644 --- a/orderer/solo/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -14,20 +14,32 @@ See the License for the specific language governing permissions and limitations under the License. */ -package solo +package deliver import ( "fmt" "testing" "time" - "google.golang.org/grpc" - + "github.com/hyperledger/fabric/orderer/common/bootstrap/static" "github.com/hyperledger/fabric/orderer/rawledger/ramledger" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" + + "google.golang.org/grpc" ) +var genesisBlock *cb.Block + +func init() { + bootstrapper := static.New() + var err error + genesisBlock, err = bootstrapper.GenesisBlock() + if err != nil { + panic("Error intializing static bootstrap genesis block") + } +} + // MagicLargestWindow is used as the default max window size for initializing the deliver service const MagicLargestWindow int = 1000 @@ -66,9 +78,9 @@ func TestOldestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_OLDEST}}} @@ -98,9 +110,9 @@ func TestNewestSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_NEWEST}}} @@ -126,9 +138,9 @@ func TestSpecificSeek(t *testing.T) { } m := newMockD() - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} @@ -155,9 +167,9 @@ func TestBadSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow), Start: ab.SeekInfo_SPECIFIED, SpecifiedNumber: uint64(ledgerSize - 1)}}} @@ -188,9 +200,9 @@ func TestBadWindow(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: uint64(MagicLargestWindow) * 2, Start: ab.SeekInfo_OLDEST}}} @@ -214,9 +226,9 @@ func TestAck(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewDeliverServer(rl, MagicLargestWindow) + ds := NewHandlerImpl(rl, MagicLargestWindow) - go ds.HandleDeliver(m) + go ds.Handle(m) m.recvChan <- &ab.DeliverUpdate{Type: &ab.DeliverUpdate_Seek{Seek: &ab.SeekInfo{WindowSize: windowSize, Start: ab.SeekInfo_OLDEST}}} diff --git a/orderer/sbft/backend/backendab.go b/orderer/sbft/backend/backendab.go index 1c27384bb8c..7c918453fa3 100644 --- a/orderer/sbft/backend/backendab.go +++ b/orderer/sbft/backend/backendab.go @@ -18,20 +18,20 @@ package backend import ( "github.com/golang/protobuf/proto" - "github.com/hyperledger/fabric/orderer/solo" + "github.com/hyperledger/fabric/orderer/common/deliver" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" ) type BackendAB struct { backend *Backend - deliverserver *solo.DeliverServer + deliverserver deliver.Handler } func NewBackendAB(backend *Backend) *BackendAB { bab := &BackendAB{ backend: backend, - deliverserver: solo.NewDeliverServer(backend.ledger, 1000), + deliverserver: deliver.NewHandlerImpl(backend.ledger, 1000), } return bab } @@ -64,5 +64,5 @@ func (b *BackendAB) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { // Deliver sends a stream of blocks to a client after ordering func (b *BackendAB) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { - return b.deliverserver.HandleDeliver(srv) + return b.deliverserver.Handle(srv) } diff --git a/orderer/solo/solo.go b/orderer/solo/solo.go index 62d6afea22b..aafbd639bec 100644 --- a/orderer/solo/solo.go +++ b/orderer/solo/solo.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric/orderer/common/broadcast" "github.com/hyperledger/fabric/orderer/common/broadcastfilter" "github.com/hyperledger/fabric/orderer/common/configtx" + "github.com/hyperledger/fabric/orderer/common/deliver" "github.com/hyperledger/fabric/orderer/rawledger" ab "github.com/hyperledger/fabric/protos/orderer" @@ -38,14 +39,14 @@ func init() { type server struct { bh broadcast.Handler bs *broadcastServer - ds *DeliverServer + ds deliver.Handler } // New creates a ab.AtomicBroadcastServer based on the solo orderer implementation func New(queueSize, batchSize, maxWindowSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server, filters *broadcastfilter.RuleSet, configManager configtx.Manager) ab.AtomicBroadcastServer { logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl) bs := newBroadcastServer(batchSize, batchTimeout, rl, filters, configManager) - ds := NewDeliverServer(rl, maxWindowSize) + ds := deliver.NewHandlerImpl(rl, maxWindowSize) bh := broadcast.NewHandlerImpl(queueSize, bs, filters, configManager) s := &server{ @@ -65,5 +66,5 @@ func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error { // Deliver sends a stream of blocks to a client after ordering func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Starting new Deliver loop") - return s.ds.HandleDeliver(srv) + return s.ds.Handle(srv) }