From 3dfd2448a803e64d1d2c2f350edd435f809d4c0f Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Mon, 23 Mar 2020 16:48:50 +0100 Subject: [PATCH 1/7] Mino: additionnal information in Handler.Process --- blockchain/skipchain/block_test.go | 4 +++ blockchain/skipchain/conode.go | 15 ++++++++++ blockchain/skipchain/handler.go | 4 +-- blockchain/skipchain/handler_test.go | 15 +++++++--- blockchain/skipchain/mod.go | 2 ++ blockchain/skipchain/mod_test.go | 12 ++++---- blockchain/skipchain/validator.go | 9 +++++- blockchain/skipchain/validator_test.go | 13 ++++---- consensus/cosipbft/mod.go | 13 ++++---- consensus/cosipbft/mod_test.go | 41 ++++++++++++++------------ consensus/mod.go | 2 +- consensus/qsc/broadcast.go | 6 ++-- consensus/qsc/broadcast_test.go | 6 ++-- consensus/qsc/mod_test.go | 5 +++- cosi/flatcosi/handler.go | 8 ++--- cosi/flatcosi/handler_test.go | 9 ++++-- cosi/mod.go | 2 +- mino/minoch/address.go | 6 ++++ mino/minoch/mod.go | 1 + mino/minoch/rpc.go | 8 ++++- mino/minogrpc/mod.go | 5 ++++ mino/minogrpc/overlay.go | 7 ++++- mino/minogrpc/overlay_test.go | 14 +++++++-- mino/minogrpc/server_test.go | 8 ++--- mino/mod.go | 13 ++++++-- mino/mod_test.go | 2 +- 26 files changed, 157 insertions(+), 73 deletions(-) diff --git a/blockchain/skipchain/block_test.go b/blockchain/skipchain/block_test.go index f69ba9a1b..d337793a1 100644 --- a/blockchain/skipchain/block_test.go +++ b/blockchain/skipchain/block_test.go @@ -408,6 +408,10 @@ type fakeAddress struct { err error } +func (a fakeAddress) Equal(other mino.Address) bool { + return bytes.Equal(other.(fakeAddress).id, a.id) +} + func (a fakeAddress) MarshalText() ([]byte, error) { return []byte(a.id), a.err } diff --git a/blockchain/skipchain/conode.go b/blockchain/skipchain/conode.go index be60fdc36..0ecdbaccf 100644 --- a/blockchain/skipchain/conode.go +++ b/blockchain/skipchain/conode.go @@ -128,6 +128,21 @@ func newConodes(ca cosi.CollectiveAuthority) Conodes { return conodes } +// GetLeader returns the leader of the conodes which is the conode at index 0. +func (cc Conodes) GetLeader() mino.Address { + if len(cc) == 0 { + return nil + } + + return cc[0].GetAddress() +} + +// HasLeader returns true if the address belongs to the current leader. +func (cc Conodes) HasLeader(addr mino.Address) bool { + leader := cc.GetLeader() + return leader != nil && leader.Equal(addr) +} + // Take implements mino.Players. It returns a subset of the conodes. func (cc Conodes) Take(filters ...mino.FilterUpdater) mino.Players { f := mino.ApplyFilters(filters) diff --git a/blockchain/skipchain/handler.go b/blockchain/skipchain/handler.go index 8e0f4d36a..8bdb23d70 100644 --- a/blockchain/skipchain/handler.go +++ b/blockchain/skipchain/handler.go @@ -24,8 +24,8 @@ func newHandler(sc *Skipchain) handler { // Process implements mino.Handler. It handles genesis block propagation // messages only and return an error for any other type. -func (h handler) Process(req proto.Message) (proto.Message, error) { - switch in := req.(type) { +func (h handler) Process(req mino.Request) (proto.Message, error) { + switch in := req.Message.(type) { case *PropagateGenesis: factory := h.GetBlockFactory().(blockFactory) diff --git a/blockchain/skipchain/handler_test.go b/blockchain/skipchain/handler_test.go index 77fc19a36..90a39851f 100644 --- a/blockchain/skipchain/handler_test.go +++ b/blockchain/skipchain/handler_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/protobuf/ptypes/empty" "github.com/stretchr/testify/require" + "go.dedis.ch/fabric/mino" "golang.org/x/xerrors" ) @@ -21,19 +22,25 @@ func TestHandler_Process(t *testing.T) { packed, err := block.Pack() require.NoError(t, err) - resp, err := h.Process(&PropagateGenesis{Genesis: packed.(*BlockProto)}) + req := mino.Request{ + Message: &PropagateGenesis{Genesis: packed.(*BlockProto)}, + } + resp, err := h.Process(req) require.NoError(t, err) require.Nil(t, resp) - _, err = h.Process(&empty.Empty{}) + req.Message = &empty.Empty{} + _, err = h.Process(req) require.EqualError(t, err, "unknown message type '*empty.Empty'") - _, err = h.Process(&PropagateGenesis{}) + req.Message = &PropagateGenesis{} + _, err = h.Process(req) require.Error(t, err) require.Contains(t, err.Error(), "couldn't decode the block: ") h.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")} - _, err = h.Process(&PropagateGenesis{Genesis: packed.(*BlockProto)}) + req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)} + _, err = h.Process(req) require.EqualError(t, err, "couldn't write the block: oops") return true diff --git a/blockchain/skipchain/mod.go b/blockchain/skipchain/mod.go index 9d9959b56..175507f7f 100644 --- a/blockchain/skipchain/mod.go +++ b/blockchain/skipchain/mod.go @@ -218,6 +218,8 @@ func (a skipchainActor) Store(data proto.Message, players mino.Players) error { return xerrors.Errorf("couldn't read the latest block: %v", err) } + // TODO: skip if not leader.. + block, err := factory.fromPrevious(previous, data) if err != nil { return xerrors.Errorf("couldn't create next block: %v", err) diff --git a/blockchain/skipchain/mod_test.go b/blockchain/skipchain/mod_test.go index e007998d9..764180623 100644 --- a/blockchain/skipchain/mod_test.go +++ b/blockchain/skipchain/mod_test.go @@ -35,8 +35,8 @@ func TestSkipchain_Basic(t *testing.T) { n := 5 manager := minoch.NewManager() - c1, s1, a1 := makeSkipchain(t, "A", manager) - c2, _, a2 := makeSkipchain(t, "B", manager) + c1, _, a1 := makeSkipchain(t, "A", manager) + c2, s2, _ := makeSkipchain(t, "B", manager) conodes := Conodes{c1, c2} err := a1.InitChain(&empty.Empty{}, conodes) @@ -44,23 +44,23 @@ func TestSkipchain_Basic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - blocks := s1.Watch(ctx) + blocks := s2.Watch(ctx) for i := 0; i < n; i++ { - err = a2.Store(&empty.Empty{}, conodes) + err = a1.Store(&empty.Empty{}, conodes) require.NoError(t, err) event := <-blocks require.NotNil(t, event) require.IsType(t, SkipBlock{}, event) - chain, err := s1.GetVerifiableBlock() + chain, err := s2.GetVerifiableBlock() require.NoError(t, err) packed, err := chain.Pack() require.NoError(t, err) - block, err := s1.GetBlockFactory().FromVerifiable(packed) + block, err := s2.GetBlockFactory().FromVerifiable(packed) require.NoError(t, err) require.NotNil(t, block) require.Equal(t, uint64(i+1), block.(SkipBlock).Index) diff --git a/blockchain/skipchain/validator.go b/blockchain/skipchain/validator.go index a833de87d..514011eb4 100644 --- a/blockchain/skipchain/validator.go +++ b/blockchain/skipchain/validator.go @@ -9,6 +9,7 @@ import ( "go.dedis.ch/fabric/blockchain" "go.dedis.ch/fabric/consensus" "go.dedis.ch/fabric/encoding" + "go.dedis.ch/fabric/mino" "golang.org/x/xerrors" ) @@ -42,7 +43,9 @@ func newBlockValidator( // Validate implements consensus.Validator. It decodes the message into a block // and validates its integrity. It returns the block if it is correct, otherwise // the error. -func (v *blockValidator) Validate(pb proto.Message) (consensus.Proposal, error) { +func (v *blockValidator) Validate(addr mino.Address, + pb proto.Message) (consensus.Proposal, error) { + factory := v.GetBlockFactory().(blockFactory) block, err := factory.decodeBlock(pb) @@ -60,6 +63,10 @@ func (v *blockValidator) Validate(pb proto.Message) (consensus.Proposal, error) genesis.hash, block.GenesisID) } + if !genesis.Conodes.HasLeader(addr) { + return nil, xerrors.Errorf("mismatch leader %v != %v", addr, genesis.Conodes.GetLeader()) + } + err = v.validator.Validate(block.Payload) if err != nil { return nil, xerrors.Errorf("couldn't validate the payload: %v", err) diff --git a/blockchain/skipchain/validator_test.go b/blockchain/skipchain/validator_test.go index f73d7925d..aff89b2e2 100644 --- a/blockchain/skipchain/validator_test.go +++ b/blockchain/skipchain/validator_test.go @@ -29,27 +29,27 @@ func TestBlockValidator_Validate(t *testing.T) { consensus: fakeConsensus{}, }, } - prop, err := v.Validate(packed) + prop, err := v.Validate(fakeAddress{}, packed) require.NoError(t, err) require.NotNil(t, prop) require.Equal(t, block.GetHash(), prop.GetHash()) require.Equal(t, block.BackLink.Bytes(), prop.GetPreviousHash()) - _, err = v.Validate(nil) + _, err = v.Validate(fakeAddress{}, nil) require.EqualError(t, err, "couldn't decode block: invalid message type ''") v.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")} - _, err = v.Validate(packed) + _, err = v.Validate(fakeAddress{}, packed) require.EqualError(t, err, "couldn't read genesis block: oops") v.Skipchain.db = &fakeDatabase{genesisID: Digest{}} - _, err = v.Validate(packed) + _, err = v.Validate(fakeAddress{}, packed) require.EqualError(t, err, fmt.Sprintf("mismatch genesis hash '%v' != '%v'", Digest{}, block.GenesisID)) v.Skipchain.db = &fakeDatabase{genesisID: block.GenesisID} v.validator = fakeValidator{err: xerrors.New("oops")} - _, err = v.Validate(packed) + _, err = v.Validate(fakeAddress{}, packed) require.EqualError(t, err, "couldn't validate the payload: oops") return true @@ -115,7 +115,8 @@ type fakeDatabase struct { } func (db *fakeDatabase) Read(index int64) (SkipBlock, error) { - return SkipBlock{hash: db.genesisID}, db.err + conodes := Conodes{{addr: fakeAddress{}}} + return SkipBlock{hash: db.genesisID, Conodes: conodes}, db.err } func (db *fakeDatabase) Write(SkipBlock) error { diff --git a/consensus/cosipbft/mod.go b/consensus/cosipbft/mod.go index 36c421b3e..f20e2ce2c 100644 --- a/consensus/cosipbft/mod.go +++ b/consensus/cosipbft/mod.go @@ -174,7 +174,7 @@ type handler struct { validator consensus.Validator } -func (h handler) Hash(in proto.Message) (Digest, error) { +func (h handler) Hash(addr mino.Address, in proto.Message) (Digest, error) { switch msg := in.(type) { case *PrepareRequest: var da ptypes.DynamicAny @@ -185,10 +185,7 @@ func (h handler) Hash(in proto.Message) (Digest, error) { // The proposal first needs to be validated by the caller of the module // to insure the generic data is valid. - // - // TODO: this should lock during the event propagation to insure atomic - // operations. - proposal, err := h.validator.Validate(da.Message) + proposal, err := h.validator.Validate(addr, da.Message) if err != nil { return nil, xerrors.Errorf("couldn't validate the proposal: %v", err) } @@ -250,10 +247,10 @@ type rpcHandler struct { validator consensus.Validator } -func (h rpcHandler) Process(req proto.Message) (proto.Message, error) { - msg, ok := req.(*PropagateRequest) +func (h rpcHandler) Process(req mino.Request) (proto.Message, error) { + msg, ok := req.Message.(*PropagateRequest) if !ok { - return nil, xerrors.Errorf("message type not supported: %T", req) + return nil, xerrors.Errorf("message type not supported: %T", req.Message) } commit, err := h.factory.DecodeSignature(msg.GetCommit()) diff --git a/consensus/cosipbft/mod_test.go b/consensus/cosipbft/mod_test.go index 218b3dd59..500848ed6 100644 --- a/consensus/cosipbft/mod_test.go +++ b/consensus/cosipbft/mod_test.go @@ -259,42 +259,42 @@ func TestHandler_HashPrepare(t *testing.T) { }, } - _, err := h.Hash(&empty.Empty{}) + _, err := h.Hash(nil, &empty.Empty{}) require.EqualError(t, err, "message type not supported: *empty.Empty") empty, err := ptypes.MarshalAny(&empty.Empty{}) require.NoError(t, err) - buffer, err := h.Hash(&PrepareRequest{Proposal: empty}) + buffer, err := h.Hash(nil, &PrepareRequest{Proposal: empty}) require.NoError(t, err) require.NotEmpty(t, buffer) - _, err = h.Hash(&PrepareRequest{Proposal: nil}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: nil}) require.Error(t, err) require.True(t, xerrors.Is(err, encoding.NewAnyDecodingError((*ptypes.DynamicAny)(nil), nil))) h.validator = fakeValidator{err: xerrors.New("oops")} - _, err = h.Hash(&PrepareRequest{Proposal: empty}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: empty}) require.EqualError(t, err, "couldn't validate the proposal: oops") h.validator = fakeValidator{} h.Consensus = &Consensus{storage: fakeStorage{}} - _, err = h.Hash(&PrepareRequest{Proposal: empty}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: empty}) require.EqualError(t, err, "couldn't read last: oops") h.Consensus.storage = newInMemoryStorage() h.Consensus.storage.Store(&ForwardLinkProto{To: []byte{0xaa}}) - _, err = h.Hash(&PrepareRequest{Proposal: empty}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: empty}) require.EqualError(t, err, "mismatch with previous link: aa != bb") h.Consensus.storage = newInMemoryStorage() h.Consensus.queue = &queue{locked: true} - _, err = h.Hash(&PrepareRequest{Proposal: empty}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: empty}) require.EqualError(t, err, "couldn't add to queue: queue is locked") h.Consensus.queue = &queue{} h.factory = &defaultChainFactory{hashFactory: badHashFactory{}} - _, err = h.Hash(&PrepareRequest{Proposal: empty}) + _, err = h.Hash(nil, &PrepareRequest{Proposal: empty}) require.EqualError(t, err, "couldn't compute hash: couldn't write 'from': oops") } @@ -330,21 +330,21 @@ func TestHandler_HashCommit(t *testing.T) { err := h.Consensus.queue.New(fakeProposal{}) require.NoError(t, err) - buffer, err := h.Hash(&CommitRequest{To: []byte{0xaa}}) + buffer, err := h.Hash(nil, &CommitRequest{To: []byte{0xaa}}) require.NoError(t, err) require.Equal(t, []byte{0xde, 0xad, 0xbe, 0xef}, buffer) h.Consensus.factory = fakeFactory{err: xerrors.New("oops")} - _, err = h.Hash(&CommitRequest{}) + _, err = h.Hash(nil, &CommitRequest{}) require.EqualError(t, err, "couldn't decode prepare signature: oops") h.Consensus.factory = fakeFactory{} queue.locked = false - _, err = h.Hash(&CommitRequest{To: []byte("unknown")}) + _, err = h.Hash(nil, &CommitRequest{To: []byte("unknown")}) require.EqualError(t, err, "couldn't update signature: couldn't find proposal '756e6b6e6f776e'") h.Consensus.factory = fakeFactory{errSignature: xerrors.New("oops")} - _, err = h.Hash(&CommitRequest{To: []byte{0xaa}}) + _, err = h.Hash(nil, &CommitRequest{To: []byte{0xaa}}) require.EqualError(t, err, "couldn't marshal the signature: oops") } @@ -367,31 +367,32 @@ func TestRPCHandler_Process(t *testing.T) { }, } - resp, err := h.Process(&empty.Empty{}) + resp, err := h.Process(mino.Request{Message: &empty.Empty{}}) require.EqualError(t, err, "message type not supported: *empty.Empty") require.Nil(t, resp) - resp, err = h.Process(&PropagateRequest{}) + req := mino.Request{Message: &PropagateRequest{}} + resp, err = h.Process(req) require.NoError(t, err) require.Nil(t, resp) h.Consensus.factory = fakeFactory{err: xerrors.New("oops")} - _, err = h.Process(&PropagateRequest{}) + _, err = h.Process(req) require.EqualError(t, err, "couldn't decode commit signature: oops") h.Consensus.factory = fakeFactory{} h.Consensus.queue = fakeQueue{err: xerrors.New("oops")} - _, err = h.Process(&PropagateRequest{}) + _, err = h.Process(req) require.EqualError(t, err, "couldn't finalize: oops") h.Consensus.queue = fakeQueue{} h.Consensus.storage = fakeStorage{} - _, err = h.Process(&PropagateRequest{}) + _, err = h.Process(req) require.EqualError(t, err, "couldn't write forward link: oops") h.Consensus.storage = newInMemoryStorage() h.validator = fakeValidator{err: xerrors.New("oops")} - _, err = h.Process(&PropagateRequest{}) + _, err = h.Process(req) require.EqualError(t, err, "couldn't commit: oops") } @@ -420,7 +421,9 @@ type fakeValidator struct { err error } -func (v fakeValidator) Validate(msg proto.Message) (consensus.Proposal, error) { +func (v fakeValidator) Validate(addr mino.Address, + msg proto.Message) (consensus.Proposal, error) { + p := fakeProposal{} return p, v.err } diff --git a/consensus/mod.go b/consensus/mod.go index c239841b2..048f5b7d8 100644 --- a/consensus/mod.go +++ b/consensus/mod.go @@ -28,7 +28,7 @@ type Validator interface { // Validate should return the proposal decoded from the message or // an error if it is invalid. It should also return the previous // proposal. - Validate(message proto.Message) (curr Proposal, err error) + Validate(addr mino.Address, message proto.Message) (curr Proposal, err error) // Commit should commit the proposal with the given identifier. The // implementation makes sure that the commit is atomic with the validation diff --git a/consensus/qsc/broadcast.go b/consensus/qsc/broadcast.go index 57420ad28..35092358c 100644 --- a/consensus/qsc/broadcast.go +++ b/consensus/qsc/broadcast.go @@ -67,8 +67,8 @@ type hTLCR struct { // Process implements mino.Handler. It handles two cases: (1) A message set sent // from a player that must be processed. (2) A message set request that returns // the list of messages missing to the distant player. -func (h hTLCR) Process(in proto.Message) (proto.Message, error) { - switch msg := in.(type) { +func (h hTLCR) Process(req mino.Request) (proto.Message, error) { + switch msg := req.Message.(type) { case *MessageSet: h.ch <- msg return nil, nil @@ -85,7 +85,7 @@ func (h hTLCR) Process(in proto.Message) (proto.Message, error) { return h.store.previous, nil default: - return nil, xerrors.Errorf("invalid message type '%T'", in) + return nil, xerrors.Errorf("invalid message type '%T'", req.Message) } } diff --git a/consensus/qsc/broadcast_test.go b/consensus/qsc/broadcast_test.go index 69b6ae521..c34dfda94 100644 --- a/consensus/qsc/broadcast_test.go +++ b/consensus/qsc/broadcast_test.go @@ -51,16 +51,16 @@ func TestHandlerTLCR_Process(t *testing.T) { }, } - resp, err := h.Process(&MessageSet{}) + resp, err := h.Process(mino.Request{Message: &MessageSet{}}) require.NoError(t, err) require.Nil(t, resp) require.NotNil(t, <-ch) - resp, err = h.Process(&RequestMessageSet{TimeStep: 0}) + resp, err = h.Process(mino.Request{Message: &RequestMessageSet{TimeStep: 0}}) require.NoError(t, err) require.Nil(t, resp) - _, err = h.Process(&empty.Empty{}) + _, err = h.Process(mino.Request{Message: &empty.Empty{}}) require.EqualError(t, err, "invalid message type '*empty.Empty'") } diff --git a/consensus/qsc/mod_test.go b/consensus/qsc/mod_test.go index a83d0e946..a00146466 100644 --- a/consensus/qsc/mod_test.go +++ b/consensus/qsc/mod_test.go @@ -13,6 +13,7 @@ import ( "go.dedis.ch/fabric/consensus" "go.dedis.ch/fabric/encoding" internal "go.dedis.ch/fabric/internal/testing" + "go.dedis.ch/fabric/mino" "go.dedis.ch/fabric/mino/minoch" "golang.org/x/xerrors" ) @@ -174,7 +175,9 @@ type fakeValidator struct { wg sync.WaitGroup } -func (v *fakeValidator) Validate(pb proto.Message) (consensus.Proposal, error) { +func (v *fakeValidator) Validate(addr mino.Address, + pb proto.Message) (consensus.Proposal, error) { + return nil, nil } diff --git a/cosi/flatcosi/handler.go b/cosi/flatcosi/handler.go index 02cc6d3b7..282fbf995 100644 --- a/cosi/flatcosi/handler.go +++ b/cosi/flatcosi/handler.go @@ -23,18 +23,18 @@ func newHandler(s crypto.Signer, h cosi.Hashable) handler { } } -func (h handler) Process(msg proto.Message) (proto.Message, error) { +func (h handler) Process(req mino.Request) (proto.Message, error) { var resp proto.Message - switch req := msg.(type) { + switch msg := req.Message.(type) { case *SignatureRequest: var da ptypes.DynamicAny - err := protoenc.UnmarshalAny(req.Message, &da) + err := protoenc.UnmarshalAny(msg.Message, &da) if err != nil { return nil, encoding.NewAnyDecodingError(&da, err) } - buf, err := h.hasher.Hash(da.Message) + buf, err := h.hasher.Hash(req.Address, da.Message) if err != nil { return nil, xerrors.Errorf("couldn't hash message: %v", err) } diff --git a/cosi/flatcosi/handler_test.go b/cosi/flatcosi/handler_test.go index f68b49337..051a9113f 100644 --- a/cosi/flatcosi/handler_test.go +++ b/cosi/flatcosi/handler_test.go @@ -11,6 +11,7 @@ import ( "go.dedis.ch/fabric/cosi" "go.dedis.ch/fabric/crypto/bls" "go.dedis.ch/fabric/encoding" + "go.dedis.ch/fabric/mino" "golang.org/x/xerrors" ) @@ -19,7 +20,7 @@ type fakeHasher struct { err error } -func (h fakeHasher) Hash(proto.Message) ([]byte, error) { +func (h fakeHasher) Hash(mino.Address, proto.Message) ([]byte, error) { return []byte{0xab}, h.err } @@ -27,12 +28,14 @@ func TestHandler_Process(t *testing.T) { defer func() { protoenc = encoding.NewProtoEncoder() }() h := newHandler(bls.NewSigner(), fakeHasher{}) - req := &SignatureRequest{Message: makeMessage(t)} + req := mino.Request{ + Message: &SignatureRequest{Message: makeMessage(t)}, + } _, err := h.Process(req) require.NoError(t, err) - resp, err := h.Process(&empty.Empty{}) + resp, err := h.Process(mino.Request{Message: &empty.Empty{}}) require.EqualError(t, err, "invalid message type: *empty.Empty") require.Nil(t, resp) diff --git a/cosi/mod.go b/cosi/mod.go index 252f39a5c..2c3c47908 100644 --- a/cosi/mod.go +++ b/cosi/mod.go @@ -19,7 +19,7 @@ type CollectiveAuthority interface { // Hashable is the interface to implement to validate an incoming message for a // collective signing. It will return the hash that will be signed. type Hashable interface { - Hash(in proto.Message) ([]byte, error) + Hash(addr mino.Address, in proto.Message) ([]byte, error) } // Message is the type of input that can be provided to a collective signing diff --git a/mino/minoch/address.go b/mino/minoch/address.go index 4e8be40a8..542bb2fd9 100644 --- a/mino/minoch/address.go +++ b/mino/minoch/address.go @@ -7,6 +7,12 @@ type address struct { id string } +// Equal implements mino.Address. It returns true if both addresses are equal. +func (a address) Equal(other mino.Address) bool { + addr, ok := other.(address) + return ok && addr.id == a.id +} + // MarshalText returns the string representation of an address. func (a address) MarshalText() ([]byte, error) { return []byte(a.id), nil diff --git a/mino/minoch/mod.go b/mino/minoch/mod.go index 67f1d3675..e0b81694e 100644 --- a/mino/minoch/mod.go +++ b/mino/minoch/mod.go @@ -63,6 +63,7 @@ func (m *Minoch) MakeNamespace(path string) (mino.Mino, error) { func (m *Minoch) MakeRPC(name string, h mino.Handler) (mino.RPC, error) { rpc := RPC{ manager: m.manager, + addr: m.GetAddress(), path: fmt.Sprintf("%s/%s", m.path, name), h: h, } diff --git a/mino/minoch/rpc.go b/mino/minoch/rpc.go index 44f129680..b9009644f 100644 --- a/mino/minoch/rpc.go +++ b/mino/minoch/rpc.go @@ -22,6 +22,7 @@ type Envelope struct { // RPC is an implementation of the mino.RPC interface. type RPC struct { manager *Manager + addr mino.Address path string h mino.Handler } @@ -44,7 +45,12 @@ func (c RPC) Call(ctx context.Context, req proto.Message, defer wg.Done() if m != nil { - resp, err := m.rpcs[c.path].h.Process(cloneReq) + req := mino.Request{ + Address: c.addr, + Message: cloneReq, + } + + resp, err := m.rpcs[c.path].h.Process(req) if err != nil { errs <- err } diff --git a/mino/minogrpc/mod.go b/mino/minogrpc/mod.go index 5fadbd251..c7b087968 100644 --- a/mino/minogrpc/mod.go +++ b/mino/minogrpc/mod.go @@ -27,6 +27,11 @@ type address struct { id string } +func (a address) Equal(other mino.Address) bool { + addr, ok := other.(address) + return ok && addr.id == a.id +} + // MarshalText implements mino.Address func (a address) MarshalText() ([]byte, error) { return []byte(a.id), nil diff --git a/mino/minogrpc/overlay.go b/mino/minogrpc/overlay.go index 3ac1876bd..4289bc064 100644 --- a/mino/minogrpc/overlay.go +++ b/mino/minogrpc/overlay.go @@ -50,7 +50,12 @@ func (o overlayService) Call(ctx context.Context, msg *OverlayMsg) (*OverlayMsg, return nil, encoding.NewAnyDecodingError(msg.Message, err) } - result, err := handler.Process(dynamicAny.Message) + req := mino.Request{ + Address: o.addr, + Message: dynamicAny.Message, + } + + result, err := handler.Process(req) if err != nil { return nil, xerrors.Errorf("failed to call the Process function from "+ "the handler using the provided message: %v", err) diff --git a/mino/minogrpc/overlay_test.go b/mino/minogrpc/overlay_test.go index bbfd77564..4c7a43a70 100644 --- a/mino/minogrpc/overlay_test.go +++ b/mino/minogrpc/overlay_test.go @@ -14,6 +14,7 @@ import ( "go.dedis.ch/fabric" "go.dedis.ch/fabric/encoding" "go.dedis.ch/fabric/mino" + "golang.org/x/xerrors" "google.golang.org/grpc/metadata" ) @@ -162,7 +163,7 @@ type testFailHandler struct { mino.UnsupportedHandler } -func (t testFailHandler) Process(req proto.Message) (proto.Message, error) { +func (t testFailHandler) Process(req mino.Request) (proto.Message, error) { return nil, errors.New("oops") } @@ -177,6 +178,15 @@ type testFailHandler2 struct { t *testing.T } -func (t testFailHandler2) Process(req proto.Message) (proto.Message, error) { +func (t testFailHandler2) Process(req mino.Request) (proto.Message, error) { return nil, nil } + +func (t testFailHandler2) Stream(out mino.Sender, in mino.Receiver) error { + any, err := ptypes.MarshalAny(&empty.Empty{}) + require.NoError(t.t, err) + + _, _, err = in.Recv(context.Background()) + require.True(t.t, xerrors.Is(err, encoding.NewAnyDecodingError(any, nil))) + return nil +} diff --git a/mino/minogrpc/server_test.go b/mino/minogrpc/server_test.go index 45baca26e..af2dc5aaf 100644 --- a/mino/minogrpc/server_test.go +++ b/mino/minogrpc/server_test.go @@ -856,8 +856,8 @@ type testSameHandler struct { timeout time.Duration } -func (t testSameHandler) Process(req proto.Message) (proto.Message, error) { - return req, nil +func (t testSameHandler) Process(req mino.Request) (proto.Message, error) { + return req.Message, nil } func (t testSameHandler) Combine(req []proto.Message) ([]proto.Message, error) { @@ -891,8 +891,8 @@ func (t testSameHandler) Stream(out mino.Sender, in mino.Receiver) error { type testModifyHandler struct { } -func (t testModifyHandler) Process(req proto.Message) (proto.Message, error) { - msg, ok := req.(*empty.Empty) +func (t testModifyHandler) Process(req mino.Request) (proto.Message, error) { + msg, ok := req.Message.(*empty.Empty) if !ok { return nil, xerrors.Errorf("failed to parse request") } diff --git a/mino/mod.go b/mino/mod.go index 5b921880a..74cf7d599 100644 --- a/mino/mod.go +++ b/mino/mod.go @@ -15,6 +15,7 @@ import ( type Address interface { encoding.TextMarshaler + Equal(other Address) bool String() string } @@ -43,6 +44,14 @@ type Sender interface { Send(msg proto.Message, addrs ...Address) <-chan error } +// Request is a wrapper around the context of a message received from a player +// and that needs to be processed by the node. It provides some useful +// information about the network layer. +type Request struct { + Address Address + Message proto.Message +} + // Receiver is an interface to provide primitives to receive messages from // recipients. type Receiver interface { @@ -65,7 +74,7 @@ type RPC interface { type Handler interface { // Process handles a single request by producing the response according to // the request message. - Process(req proto.Message) (resp proto.Message, err error) + Process(req Request) (resp proto.Message, err error) // Combine gives a chance to reduce the network load by combining multiple // messages for a collect call on the intermediate nodes. @@ -81,7 +90,7 @@ type Handler interface { type UnsupportedHandler struct{} // Process is the default implementation for a handler. It will return an error. -func (h UnsupportedHandler) Process(req proto.Message) (proto.Message, error) { +func (h UnsupportedHandler) Process(req Request) (proto.Message, error) { return nil, errors.New("rpc is not supported") } diff --git a/mino/mod_test.go b/mino/mod_test.go index 847a99f84..ebad4f20f 100644 --- a/mino/mod_test.go +++ b/mino/mod_test.go @@ -10,7 +10,7 @@ import ( func TestUnsupportedHandler_Process(t *testing.T) { h := UnsupportedHandler{} - resp, err := h.Process(nil) + resp, err := h.Process(Request{}) require.Error(t, err) require.Nil(t, resp) } From b3574d85068669479f0cc309ee70962794e5d1ab Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Mon, 23 Mar 2020 17:07:48 +0100 Subject: [PATCH 2/7] Skipchain: propose only when the leader --- blockchain/skipchain/block_test.go | 4 ++++ blockchain/skipchain/conode_test.go | 13 +++++++++++++ blockchain/skipchain/mod.go | 19 ++++++++++++++----- blockchain/skipchain/mod_test.go | 14 ++++++++++---- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/blockchain/skipchain/block_test.go b/blockchain/skipchain/block_test.go index d337793a1..4fb1f5782 100644 --- a/blockchain/skipchain/block_test.go +++ b/blockchain/skipchain/block_test.go @@ -579,6 +579,10 @@ type fakeMino struct { err error } +func (m fakeMino) GetAddress() mino.Address { + return fakeAddress{} +} + func (m fakeMino) GetAddressFactory() mino.AddressFactory { return fakeAddressFactory{} } diff --git a/blockchain/skipchain/conode_test.go b/blockchain/skipchain/conode_test.go index f9d230173..ad106b0f1 100644 --- a/blockchain/skipchain/conode_test.go +++ b/blockchain/skipchain/conode_test.go @@ -125,6 +125,19 @@ func TestPublicKeyIterator_GetNext(t *testing.T) { require.Nil(t, iter.GetNext()) } +func TestConodes_GetLeader(t *testing.T) { + require.Nil(t, Conodes{}.GetLeader()) + require.NotNil(t, Conodes{{addr: fakeAddress{}}}.GetLeader()) +} + +func TestConodes_HasLeader(t *testing.T) { + addr := fakeAddress{id: []byte{0xaa}} + conodes := Conodes{{addr: addr}} + + require.True(t, conodes.HasLeader(addr)) + require.False(t, conodes.HasLeader(fakeAddress{})) +} + func TestConodes_Take(t *testing.T) { conodes := Conodes{randomConode(), randomConode(), randomConode()} diff --git a/blockchain/skipchain/mod.go b/blockchain/skipchain/mod.go index 175507f7f..eacc9b325 100644 --- a/blockchain/skipchain/mod.go +++ b/blockchain/skipchain/mod.go @@ -213,21 +213,30 @@ func (a skipchainActor) InitChain(data proto.Message, players mino.Players) erro func (a skipchainActor) Store(data proto.Message, players mino.Players) error { factory := a.GetBlockFactory().(blockFactory) + ca, ok := players.(cosi.CollectiveAuthority) + if !ok { + return xerrors.Errorf("players must implement cosi.CollectiveAuthority") + } + previous, err := a.db.ReadLast() if err != nil { return xerrors.Errorf("couldn't read the latest block: %v", err) } - // TODO: skip if not leader.. - block, err := factory.fromPrevious(previous, data) if err != nil { return xerrors.Errorf("couldn't create next block: %v", err) } - err = a.consensus.Propose(block, players) - if err != nil { - return xerrors.Errorf("couldn't propose the block: %v", err) + block.Conodes = newConodes(ca) + + if block.Conodes.HasLeader(a.mino.GetAddress()) { + err = a.consensus.Propose(block, players) + if err != nil { + return xerrors.Errorf("couldn't propose the block: %v", err) + } + } else { + // TODO: send proposal to the leader.. } return nil diff --git a/blockchain/skipchain/mod_test.go b/blockchain/skipchain/mod_test.go index 764180623..e155bdd62 100644 --- a/blockchain/skipchain/mod_test.go +++ b/blockchain/skipchain/mod_test.go @@ -186,21 +186,27 @@ func TestActor_InitChain(t *testing.T) { func TestActor_Store(t *testing.T) { actor := skipchainActor{ Skipchain: &Skipchain{ - db: &fakeDatabase{}, + mino: fakeMino{}, + db: &fakeDatabase{}, }, consensus: fakeConsensusActor{}, } - err := actor.Store(&empty.Empty{}, Conodes{}) + conodes := Conodes{{addr: fakeAddress{}}} + + err := actor.Store(&empty.Empty{}, conodes) require.NoError(t, err) + err = actor.Store(&empty.Empty{}, fakePlayers{}) + require.EqualError(t, err, "players must implement cosi.CollectiveAuthority") + actor.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")} - err = actor.Store(&empty.Empty{}, Conodes{}) + err = actor.Store(&empty.Empty{}, conodes) require.EqualError(t, err, "couldn't read the latest block: oops") actor.Skipchain.db = &fakeDatabase{} actor.consensus = fakeConsensusActor{err: xerrors.New("oops")} - err = actor.Store(&empty.Empty{}, Conodes{}) + err = actor.Store(&empty.Empty{}, conodes) require.EqualError(t, err, "couldn't propose the block: oops") } From c1550c714edf1d436aedc3c164875b08d6b15ee0 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Tue, 24 Mar 2020 11:30:21 +0100 Subject: [PATCH 3/7] View change module to insure roster integrity --- blockchain/mod.go | 14 +++-- blockchain/skipchain/block.go | 6 ++ blockchain/skipchain/conode.go | 25 +++++--- blockchain/skipchain/conode_test.go | 13 ---- blockchain/skipchain/mod.go | 42 +++++++++---- blockchain/skipchain/mod_test.go | 15 ++++- blockchain/skipchain/validator.go | 9 +-- blockchain/skipchain/validator_test.go | 11 ++-- blockchain/skipchain/viewchange/constant.go | 67 +++++++++++++++++++++ blockchain/skipchain/viewchange/mod.go | 13 ++++ mino/mod.go | 2 + 11 files changed, 167 insertions(+), 50 deletions(-) create mode 100644 blockchain/skipchain/viewchange/constant.go create mode 100644 blockchain/skipchain/viewchange/mod.go diff --git a/blockchain/mod.go b/blockchain/mod.go index ab9a793e2..122806f3c 100644 --- a/blockchain/mod.go +++ b/blockchain/mod.go @@ -14,6 +14,8 @@ type Block interface { encoding.Packable GetHash() []byte + + GetPlayers() mino.Players } // VerifiableBlock is an extension of a block so that its integrity can be @@ -29,10 +31,14 @@ type BlockFactory interface { FromVerifiable(src proto.Message) (Block, error) } -// Validator is the interface to implement to validate the generic payload -// stored in the block. -type Validator interface { +// PayloadProcessor is the interface to implement to validate the generic +// payload stored in the block. +type PayloadProcessor interface { + // Validate should return nil if the data pass the validation. Validate(data proto.Message) error + + // Commit should process the data and perform any operation required when + // new data is stored on the chain. Commit(data proto.Message) error } @@ -56,7 +62,7 @@ type Blockchain interface { // Listen starts to listen for messages and returns the actor that the // client can use to propose new blocks. - Listen(validator Validator) (Actor, error) + Listen(validator PayloadProcessor) (Actor, error) // GetBlock returns the latest block. GetBlock() (Block, error) diff --git a/blockchain/skipchain/block.go b/blockchain/skipchain/block.go index 73f93da47..0f4f03332 100644 --- a/blockchain/skipchain/block.go +++ b/blockchain/skipchain/block.go @@ -13,6 +13,7 @@ import ( "go.dedis.ch/fabric/consensus" "go.dedis.ch/fabric/crypto" "go.dedis.ch/fabric/encoding" + "go.dedis.ch/fabric/mino" "golang.org/x/xerrors" ) @@ -107,6 +108,11 @@ func (b SkipBlock) GetPreviousHash() []byte { return b.BackLink.Bytes() } +// GetPlayers implements blockchain.Block. It returns the list of players. +func (b SkipBlock) GetPlayers() mino.Players { + return b.Conodes +} + // GetVerifier implements consensus.Proposal. It returns the verifier for the // block. // TODO: it might have sense to remove this function. diff --git a/blockchain/skipchain/conode.go b/blockchain/skipchain/conode.go index 0ecdbaccf..b98200758 100644 --- a/blockchain/skipchain/conode.go +++ b/blockchain/skipchain/conode.go @@ -128,19 +128,24 @@ func newConodes(ca cosi.CollectiveAuthority) Conodes { return conodes } -// GetLeader returns the leader of the conodes which is the conode at index 0. -func (cc Conodes) GetLeader() mino.Address { - if len(cc) == 0 { - return nil +// Rotate takes the new leader and moves it to the beginning of the array while +// moving the old one to the end. +func (cc Conodes) Rotate(addr mino.Address) Conodes { + index := 0 + for i, conode := range cc { + if conode.GetAddress().Equal(addr) { + index = i + } } - return cc[0].GetAddress() -} + if index == 0 { + return cc + } -// HasLeader returns true if the address belongs to the current leader. -func (cc Conodes) HasLeader(addr mino.Address) bool { - leader := cc.GetLeader() - return leader != nil && leader.Equal(addr) + newConodes := append(Conodes{cc[index]}, cc[1:index]...) + newConodes = append(newConodes, cc[index+1:]...) + newConodes = append(newConodes, cc[0]) + return newConodes } // Take implements mino.Players. It returns a subset of the conodes. diff --git a/blockchain/skipchain/conode_test.go b/blockchain/skipchain/conode_test.go index ad106b0f1..f9d230173 100644 --- a/blockchain/skipchain/conode_test.go +++ b/blockchain/skipchain/conode_test.go @@ -125,19 +125,6 @@ func TestPublicKeyIterator_GetNext(t *testing.T) { require.Nil(t, iter.GetNext()) } -func TestConodes_GetLeader(t *testing.T) { - require.Nil(t, Conodes{}.GetLeader()) - require.NotNil(t, Conodes{{addr: fakeAddress{}}}.GetLeader()) -} - -func TestConodes_HasLeader(t *testing.T) { - addr := fakeAddress{id: []byte{0xaa}} - conodes := Conodes{{addr: addr}} - - require.True(t, conodes.HasLeader(addr)) - require.False(t, conodes.HasLeader(fakeAddress{})) -} - func TestConodes_Take(t *testing.T) { conodes := Conodes{randomConode(), randomConode(), randomConode()} diff --git a/blockchain/skipchain/mod.go b/blockchain/skipchain/mod.go index eacc9b325..9b0a4392b 100644 --- a/blockchain/skipchain/mod.go +++ b/blockchain/skipchain/mod.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "go.dedis.ch/fabric" "go.dedis.ch/fabric/blockchain" + "go.dedis.ch/fabric/blockchain/skipchain/viewchange" "go.dedis.ch/fabric/consensus" "go.dedis.ch/fabric/consensus/cosipbft" "go.dedis.ch/fabric/cosi" @@ -33,11 +34,12 @@ const ( // // - implements blockchain.Blockchain type Skipchain struct { - mino mino.Mino - cosi cosi.CollectiveSigning - db Database - consensus consensus.Consensus - watcher blockchain.Observable + mino mino.Mino + cosi cosi.CollectiveSigning + db Database + consensus consensus.Consensus + watcher blockchain.Observable + viewchange viewchange.ViewChange } // NewSkipchain returns a new instance of Skipchain. @@ -56,7 +58,9 @@ func NewSkipchain(m mino.Mino, cosi cosi.CollectiveSigning) *Skipchain { // Listen implements blockchain.Blockchain. It registers the RPC and starts the // consensus module. -func (s *Skipchain) Listen(v blockchain.Validator) (blockchain.Actor, error) { +func (s *Skipchain) Listen(v blockchain.PayloadProcessor) (blockchain.Actor, error) { + s.viewchange = viewchange.NewConstant(s.mino.GetAddress(), s) + actor := skipchainActor{ Skipchain: s, hashFactory: sha256Factory{}, @@ -230,13 +234,27 @@ func (a skipchainActor) Store(data proto.Message, players mino.Players) error { block.Conodes = newConodes(ca) - if block.Conodes.HasLeader(a.mino.GetAddress()) { - err = a.consensus.Propose(block, players) - if err != nil { - return xerrors.Errorf("couldn't propose the block: %v", err) - } + // Wait for the view change module green signal to go through the proposal. + // If the leader has failed and this node has to take over, we use the + // inherant property of CoSiPBFT to prove that 2f participants want the view + // change. + err = a.viewchange.Wait(block) + if err == nil { + // If the node is not the current leader and a rotation is necessary, it + // will be done. + block.Conodes = block.Conodes.Rotate(a.mino.GetAddress()) } else { - // TODO: send proposal to the leader.. + fabric.Logger.Debug().Msgf("%v refusing view change: %v", + a.mino.GetAddress(), err) + // Not authorized to propose a block as the leader is moving + // forward so we drop the proposal. The upper layer is responsible to + // try again until the leader includes the data. + return nil + } + + err = a.consensus.Propose(block, players) + if err != nil { + return xerrors.Errorf("couldn't propose the block: %v", err) } return nil diff --git a/blockchain/skipchain/mod_test.go b/blockchain/skipchain/mod_test.go index e155bdd62..d4473646d 100644 --- a/blockchain/skipchain/mod_test.go +++ b/blockchain/skipchain/mod_test.go @@ -186,8 +186,9 @@ func TestActor_InitChain(t *testing.T) { func TestActor_Store(t *testing.T) { actor := skipchainActor{ Skipchain: &Skipchain{ - mino: fakeMino{}, - db: &fakeDatabase{}, + viewchange: fakeViewChange{}, + mino: fakeMino{}, + db: &fakeDatabase{}, }, consensus: fakeConsensusActor{}, } @@ -299,3 +300,13 @@ func (rand fakeRandGenerator) Read(buffer []byte) (int, error) { } return len(buffer), rand.err } + +type fakeViewChange struct{} + +func (vc fakeViewChange) Wait(blockchain.Block) error { + return nil +} + +func (vc fakeViewChange) Verify(blockchain.Block) error { + return nil +} diff --git a/blockchain/skipchain/validator.go b/blockchain/skipchain/validator.go index 514011eb4..43e91cbda 100644 --- a/blockchain/skipchain/validator.go +++ b/blockchain/skipchain/validator.go @@ -20,14 +20,14 @@ import ( type blockValidator struct { *Skipchain - validator blockchain.Validator + validator blockchain.PayloadProcessor queue *blockQueue watcher blockchain.Observable } func newBlockValidator( s *Skipchain, - v blockchain.Validator, + v blockchain.PayloadProcessor, w blockchain.Observable, ) *blockValidator { return &blockValidator{ @@ -63,8 +63,9 @@ func (v *blockValidator) Validate(addr mino.Address, genesis.hash, block.GenesisID) } - if !genesis.Conodes.HasLeader(addr) { - return nil, xerrors.Errorf("mismatch leader %v != %v", addr, genesis.Conodes.GetLeader()) + err = v.viewchange.Verify(block) + if err != nil { + return nil, xerrors.Errorf("viewchange refused the block: %v", err) } err = v.validator.Validate(block.Payload) diff --git a/blockchain/skipchain/validator_test.go b/blockchain/skipchain/validator_test.go index aff89b2e2..dd899826f 100644 --- a/blockchain/skipchain/validator_test.go +++ b/blockchain/skipchain/validator_test.go @@ -23,10 +23,11 @@ func TestBlockValidator_Validate(t *testing.T) { validator: fakeValidator{}, watcher: &fakeWatcher{}, Skipchain: &Skipchain{ - db: &fakeDatabase{genesisID: block.GenesisID}, - cosi: fakeCosi{}, - mino: fakeMino{}, - consensus: fakeConsensus{}, + viewchange: fakeViewChange{}, + db: &fakeDatabase{genesisID: block.GenesisID}, + cosi: fakeCosi{}, + mino: fakeMino{}, + consensus: fakeConsensus{}, }, } prop, err := v.Validate(fakeAddress{}, packed) @@ -95,7 +96,7 @@ func TestBlockValidator_Commit(t *testing.T) { } type fakeValidator struct { - blockchain.Validator + blockchain.PayloadProcessor err error } diff --git a/blockchain/skipchain/viewchange/constant.go b/blockchain/skipchain/viewchange/constant.go new file mode 100644 index 000000000..8b98391ee --- /dev/null +++ b/blockchain/skipchain/viewchange/constant.go @@ -0,0 +1,67 @@ +package viewchange + +import ( + "go.dedis.ch/fabric/blockchain" + "go.dedis.ch/fabric/mino" + "golang.org/x/xerrors" +) + +// ConstantViewChange is a naive implementation of the view change that will +// simply keep the same leader all the time. +// +// - implements viewchange.ViewChange +type ConstantViewChange struct { + addr mino.Address + bc blockchain.Blockchain +} + +// NewConstant returns a new instance of a view change. +func NewConstant(addr mino.Address, bc blockchain.Blockchain) ConstantViewChange { + return ConstantViewChange{ + addr: addr, + bc: bc, + } +} + +// Wait implements viewchange.ViewChange. It returns an error if the address +// does not match the leader of the previous block. +func (vc ConstantViewChange) Wait(block blockchain.Block) error { + latest, err := vc.bc.GetBlock() + if err != nil { + return xerrors.Errorf("couldn't read latest block: %v", err) + } + + if latest.GetPlayers().Len() == 0 { + return xerrors.New("players is empty") + } + + leader := latest.GetPlayers().AddressIterator().GetNext() + + if !leader.Equal(vc.addr) { + return xerrors.Errorf("mismatching leader: %v != %v", leader, vc.addr) + } + + return nil +} + +func getLeader(block blockchain.Block) mino.Address { + return block.GetPlayers().AddressIterator().GetNext() +} + +// Verify implements viewchange.ViewChange. It returns an error if the first +// player of the block does not match the address. +func (vc ConstantViewChange) Verify(block blockchain.Block) error { + latest, err := vc.bc.GetBlock() + if err != nil { + return xerrors.Errorf("couldn't read latest block: %v", err) + } + + newLeader := getLeader(block) + oldLeader := getLeader(latest) + + if !newLeader.Equal(oldLeader) { + return xerrors.Errorf("mismatching leader: %v != %v", newLeader, oldLeader) + } + + return nil +} diff --git a/blockchain/skipchain/viewchange/mod.go b/blockchain/skipchain/viewchange/mod.go new file mode 100644 index 000000000..1a7a5ac2d --- /dev/null +++ b/blockchain/skipchain/viewchange/mod.go @@ -0,0 +1,13 @@ +package viewchange + +import ( + "go.dedis.ch/fabric/blockchain" +) + +// ViewChange provides primitives to verify if a participant is allowed to +// propose a block as the leader. It is also responsible for verifying the +// integrity of the players of the chain. +type ViewChange interface { + Wait(block blockchain.Block) error + Verify(block blockchain.Block) error +} diff --git a/mino/mod.go b/mino/mod.go index 74cf7d599..7dcd1820e 100644 --- a/mino/mod.go +++ b/mino/mod.go @@ -48,7 +48,9 @@ type Sender interface { // and that needs to be processed by the node. It provides some useful // information about the network layer. type Request struct { + // Address is the address of the sender of the request. Address Address + // Message is the message of the request. Message proto.Message } From 239249b776b7d7ebc5811c40b2d4a81b0f543e6b Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Tue, 24 Mar 2020 15:12:57 +0100 Subject: [PATCH 4/7] View change improvement and unit tests --- blockchain/skipchain/block_test.go | 2 +- blockchain/skipchain/conode.go | 20 ---- blockchain/skipchain/mod.go | 20 +++- blockchain/skipchain/mod_test.go | 45 ++++++-- blockchain/skipchain/validator_test.go | 5 + blockchain/skipchain/viewchange/mod.go | 13 --- .../{skipchain => }/viewchange/constant.go | 16 +-- blockchain/viewchange/constant_test.go | 106 ++++++++++++++++++ blockchain/viewchange/mod.go | 23 ++++ mino/option.go | 19 ++++ mino/option_test.go | 20 ++++ 11 files changed, 234 insertions(+), 55 deletions(-) delete mode 100644 blockchain/skipchain/viewchange/mod.go rename blockchain/{skipchain => }/viewchange/constant.go (73%) create mode 100644 blockchain/viewchange/constant_test.go create mode 100644 blockchain/viewchange/mod.go diff --git a/blockchain/skipchain/block_test.go b/blockchain/skipchain/block_test.go index 4fb1f5782..918a9a19e 100644 --- a/blockchain/skipchain/block_test.go +++ b/blockchain/skipchain/block_test.go @@ -580,7 +580,7 @@ type fakeMino struct { } func (m fakeMino) GetAddress() mino.Address { - return fakeAddress{} + return fakeAddress{id: []byte{0xaa}} } func (m fakeMino) GetAddressFactory() mino.AddressFactory { diff --git a/blockchain/skipchain/conode.go b/blockchain/skipchain/conode.go index b98200758..be60fdc36 100644 --- a/blockchain/skipchain/conode.go +++ b/blockchain/skipchain/conode.go @@ -128,26 +128,6 @@ func newConodes(ca cosi.CollectiveAuthority) Conodes { return conodes } -// Rotate takes the new leader and moves it to the beginning of the array while -// moving the old one to the end. -func (cc Conodes) Rotate(addr mino.Address) Conodes { - index := 0 - for i, conode := range cc { - if conode.GetAddress().Equal(addr) { - index = i - } - } - - if index == 0 { - return cc - } - - newConodes := append(Conodes{cc[index]}, cc[1:index]...) - newConodes = append(newConodes, cc[index+1:]...) - newConodes = append(newConodes, cc[0]) - return newConodes -} - // Take implements mino.Players. It returns a subset of the conodes. func (cc Conodes) Take(filters ...mino.FilterUpdater) mino.Players { f := mino.ApplyFilters(filters) diff --git a/blockchain/skipchain/mod.go b/blockchain/skipchain/mod.go index 9b0a4392b..d1956a7f1 100644 --- a/blockchain/skipchain/mod.go +++ b/blockchain/skipchain/mod.go @@ -7,12 +7,14 @@ package skipchain import ( "context" + "fmt" "time" "github.com/golang/protobuf/proto" + "github.com/rs/zerolog" "go.dedis.ch/fabric" "go.dedis.ch/fabric/blockchain" - "go.dedis.ch/fabric/blockchain/skipchain/viewchange" + "go.dedis.ch/fabric/blockchain/viewchange" "go.dedis.ch/fabric/consensus" "go.dedis.ch/fabric/consensus/cosipbft" "go.dedis.ch/fabric/cosi" @@ -33,7 +35,9 @@ const ( // between the blocks. // // - implements blockchain.Blockchain +// - implements fmt.Stringer type Skipchain struct { + logger zerolog.Logger mino mino.Mino cosi cosi.CollectiveSigning db Database @@ -48,6 +52,7 @@ func NewSkipchain(m mino.Mino, cosi cosi.CollectiveSigning) *Skipchain { db := NewInMemoryDatabase() return &Skipchain{ + logger: fabric.Logger, mino: m, cosi: cosi, db: db, @@ -142,6 +147,12 @@ func (s *Skipchain) Watch(ctx context.Context) <-chan blockchain.Block { return ch } +// String implements fmt.Stringer. It returns a simple representation of the +// skipchain instance to easily identify it. +func (s *Skipchain) String() string { + return fmt.Sprintf("skipchain@%v", s.mino.GetAddress()) +} + // skipchainActor provides the primitives of a blockchain actor. // // - implements blockchain.Actor @@ -238,14 +249,13 @@ func (a skipchainActor) Store(data proto.Message, players mino.Players) error { // If the leader has failed and this node has to take over, we use the // inherant property of CoSiPBFT to prove that 2f participants want the view // change. - err = a.viewchange.Wait(block) + rotation, err := a.viewchange.Wait(block) if err == nil { // If the node is not the current leader and a rotation is necessary, it // will be done. - block.Conodes = block.Conodes.Rotate(a.mino.GetAddress()) + block.Conodes = rotation.(Conodes) } else { - fabric.Logger.Debug().Msgf("%v refusing view change: %v", - a.mino.GetAddress(), err) + a.logger.Debug().Msgf("%v refusing view change: %v", a, err) // Not authorized to propose a block as the leader is moving // forward so we drop the proposal. The upper layer is responsible to // try again until the leader includes the data. diff --git a/blockchain/skipchain/mod_test.go b/blockchain/skipchain/mod_test.go index d4473646d..68f8c0824 100644 --- a/blockchain/skipchain/mod_test.go +++ b/blockchain/skipchain/mod_test.go @@ -1,11 +1,13 @@ package skipchain import ( + "bytes" "context" "testing" proto "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "go.dedis.ch/fabric/blockchain" "go.dedis.ch/fabric/consensus" @@ -184,19 +186,30 @@ func TestActor_InitChain(t *testing.T) { } func TestActor_Store(t *testing.T) { + buffer := new(bytes.Buffer) + cons := &fakeConsensusActor{} actor := skipchainActor{ Skipchain: &Skipchain{ + logger: zerolog.New(buffer), viewchange: fakeViewChange{}, mino: fakeMino{}, db: &fakeDatabase{}, }, - consensus: fakeConsensusActor{}, + consensus: cons, } - conodes := Conodes{{addr: fakeAddress{}}} + conodes := Conodes{ + {addr: fakeAddress{id: []byte{0xbb}}}, + {addr: fakeAddress{id: []byte{0xaa}}}, + {addr: fakeAddress{id: []byte{0xcc}}}, + } err := actor.Store(&empty.Empty{}, conodes) require.NoError(t, err) + // Make sure the conodes rotate if the view change allows it. + require.NotNil(t, cons.prop) + prop := cons.prop.(SkipBlock) + require.Equal(t, prop.Conodes[0].GetAddress(), conodes[1].GetAddress()) err = actor.Store(&empty.Empty{}, fakePlayers{}) require.EqualError(t, err, "players must implement cosi.CollectiveAuthority") @@ -206,7 +219,14 @@ func TestActor_Store(t *testing.T) { require.EqualError(t, err, "couldn't read the latest block: oops") actor.Skipchain.db = &fakeDatabase{} - actor.consensus = fakeConsensusActor{err: xerrors.New("oops")} + actor.Skipchain.viewchange = fakeViewChange{err: xerrors.New("oops")} + err = actor.Store(&empty.Empty{}, conodes) + // A view change is ignored. + require.NoError(t, err) + require.Contains(t, buffer.String(), "skipchain@aa refusing view change: oops") + + actor.Skipchain.viewchange = fakeViewChange{} + actor.consensus = &fakeConsensusActor{err: xerrors.New("oops")} err = actor.Store(&empty.Empty{}, conodes) require.EqualError(t, err, "couldn't propose the block: oops") } @@ -282,10 +302,12 @@ type fakePlayers struct { type fakeConsensusActor struct { consensus.Actor - err error + err error + prop consensus.Proposal } -func (a fakeConsensusActor) Propose(consensus.Proposal, mino.Players) error { +func (a *fakeConsensusActor) Propose(prop consensus.Proposal, pp mino.Players) error { + a.prop = prop return a.err } @@ -301,12 +323,17 @@ func (rand fakeRandGenerator) Read(buffer []byte) (int, error) { return len(buffer), rand.err } -type fakeViewChange struct{} +type fakeViewChange struct { + err error +} -func (vc fakeViewChange) Wait(blockchain.Block) error { - return nil +func (vc fakeViewChange) Wait(block blockchain.Block) (mino.Players, error) { + // Simulate a rotating view change. + players := block.GetPlayers(). + Take(mino.RangeFilter(0, block.GetPlayers().Len()), mino.RotateFilter(1)) + return players, vc.err } func (vc fakeViewChange) Verify(blockchain.Block) error { - return nil + return vc.err } diff --git a/blockchain/skipchain/validator_test.go b/blockchain/skipchain/validator_test.go index dd899826f..abe3ba925 100644 --- a/blockchain/skipchain/validator_test.go +++ b/blockchain/skipchain/validator_test.go @@ -49,6 +49,11 @@ func TestBlockValidator_Validate(t *testing.T) { fmt.Sprintf("mismatch genesis hash '%v' != '%v'", Digest{}, block.GenesisID)) v.Skipchain.db = &fakeDatabase{genesisID: block.GenesisID} + v.Skipchain.viewchange = fakeViewChange{err: xerrors.New("oops")} + _, err = v.Validate(fakeAddress{}, packed) + require.EqualError(t, err, "viewchange refused the block: oops") + + v.Skipchain.viewchange = fakeViewChange{} v.validator = fakeValidator{err: xerrors.New("oops")} _, err = v.Validate(fakeAddress{}, packed) require.EqualError(t, err, "couldn't validate the payload: oops") diff --git a/blockchain/skipchain/viewchange/mod.go b/blockchain/skipchain/viewchange/mod.go deleted file mode 100644 index 1a7a5ac2d..000000000 --- a/blockchain/skipchain/viewchange/mod.go +++ /dev/null @@ -1,13 +0,0 @@ -package viewchange - -import ( - "go.dedis.ch/fabric/blockchain" -) - -// ViewChange provides primitives to verify if a participant is allowed to -// propose a block as the leader. It is also responsible for verifying the -// integrity of the players of the chain. -type ViewChange interface { - Wait(block blockchain.Block) error - Verify(block blockchain.Block) error -} diff --git a/blockchain/skipchain/viewchange/constant.go b/blockchain/viewchange/constant.go similarity index 73% rename from blockchain/skipchain/viewchange/constant.go rename to blockchain/viewchange/constant.go index 8b98391ee..8b4e10740 100644 --- a/blockchain/skipchain/viewchange/constant.go +++ b/blockchain/viewchange/constant.go @@ -7,7 +7,8 @@ import ( ) // ConstantViewChange is a naive implementation of the view change that will -// simply keep the same leader all the time. +// simply keep the same leader all the time and only allow a leader to propose a +// block. // // - implements viewchange.ViewChange type ConstantViewChange struct { @@ -24,24 +25,25 @@ func NewConstant(addr mino.Address, bc blockchain.Blockchain) ConstantViewChange } // Wait implements viewchange.ViewChange. It returns an error if the address -// does not match the leader of the previous block. -func (vc ConstantViewChange) Wait(block blockchain.Block) error { +// does not match the leader of the previous block. The implementation of the +// returned players is preserved. +func (vc ConstantViewChange) Wait(block blockchain.Block) (mino.Players, error) { latest, err := vc.bc.GetBlock() if err != nil { - return xerrors.Errorf("couldn't read latest block: %v", err) + return nil, xerrors.Errorf("couldn't read latest block: %v", err) } if latest.GetPlayers().Len() == 0 { - return xerrors.New("players is empty") + return nil, xerrors.New("players is empty") } leader := latest.GetPlayers().AddressIterator().GetNext() if !leader.Equal(vc.addr) { - return xerrors.Errorf("mismatching leader: %v != %v", leader, vc.addr) + return nil, xerrors.Errorf("mismatching leader: %v != %v", leader, vc.addr) } - return nil + return block.GetPlayers(), nil } func getLeader(block blockchain.Block) mino.Address { diff --git a/blockchain/viewchange/constant_test.go b/blockchain/viewchange/constant_test.go new file mode 100644 index 000000000..9bb349f72 --- /dev/null +++ b/blockchain/viewchange/constant_test.go @@ -0,0 +1,106 @@ +package viewchange + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/fabric/blockchain" + "go.dedis.ch/fabric/mino" + "golang.org/x/xerrors" +) + +func TestConstantViewChange_Wait(t *testing.T) { + vc := NewConstant(fakeAddress{id: "unknown"}, fakeBlockchain{len: 1, id: "unknown"}) + + players, err := vc.Wait(fakeBlock{}) + require.NoError(t, err) + require.NotNil(t, players) + + vc.bc = fakeBlockchain{err: xerrors.New("oops")} + _, err = vc.Wait(fakeBlock{}) + require.EqualError(t, err, "couldn't read latest block: oops") + + vc.bc = fakeBlockchain{len: 0} + _, err = vc.Wait(fakeBlock{}) + require.EqualError(t, err, "players is empty") + + vc.bc = fakeBlockchain{len: 1, id: "unknown"} + vc.addr = fakeAddress{id: "deadbeef"} + _, err = vc.Wait(fakeBlock{}) + require.EqualError(t, err, "mismatching leader: unknown != deadbeef") +} + +func TestConstantViewChange_Verify(t *testing.T) { + vc := NewConstant(fakeAddress{}, fakeBlockchain{}) + + err := vc.Verify(fakeBlock{}) + require.NoError(t, err) + + vc.bc = fakeBlockchain{err: xerrors.New("oops")} + err = vc.Verify(fakeBlock{}) + require.EqualError(t, err, "couldn't read latest block: oops") + + vc.bc = fakeBlockchain{id: "B"} + err = vc.Verify(fakeBlock{id: "A"}) + require.EqualError(t, err, "mismatching leader: A != B") +} + +//------------------------------------------------------------------------------ +// Utility functions + +type fakeAddress struct { + mino.Address + id string +} + +func (a fakeAddress) Equal(other mino.Address) bool { + return a.id == other.(fakeAddress).id +} + +func (a fakeAddress) String() string { + return a.id +} + +type fakeIterator struct { + mino.AddressIterator + id string +} + +func (i fakeIterator) GetNext() mino.Address { + return fakeAddress{id: i.id} +} + +type fakePlayers struct { + mino.Players + len int + id string +} + +func (p fakePlayers) Len() int { + return p.len +} + +func (p fakePlayers) AddressIterator() mino.AddressIterator { + return fakeIterator{id: p.id} +} + +type fakeBlock struct { + blockchain.Block + len int + id string +} + +func (b fakeBlock) GetPlayers() mino.Players { + return fakePlayers{len: b.len, id: b.id} +} + +type fakeBlockchain struct { + blockchain.Blockchain + err error + len int + id string +} + +func (bc fakeBlockchain) GetBlock() (blockchain.Block, error) { + return fakeBlock{len: bc.len, id: bc.id}, bc.err +} diff --git a/blockchain/viewchange/mod.go b/blockchain/viewchange/mod.go new file mode 100644 index 000000000..9e8d91bfd --- /dev/null +++ b/blockchain/viewchange/mod.go @@ -0,0 +1,23 @@ +package viewchange + +import ( + "go.dedis.ch/fabric/blockchain" + "go.dedis.ch/fabric/mino" +) + +// ViewChange provides primitives to verify if a participant is allowed to +// propose a block as the leader. It is also responsible for verifying the +// integrity of the players of the chain. +type ViewChange interface { + // Wait returns a non-nil error when the node is allowed to make the + // proposal. It will also return the authorized list of players that must be + // used so that the Verify function returns nil. + // + // Note: the implementation of the returned mino.Players interface must be + // preserved. + Wait(block blockchain.Block) (mino.Players, error) + + // Verify makes sure that the players for the given are authorized and in + // the right order if necessary. + Verify(block blockchain.Block) error +} diff --git a/mino/option.go b/mino/option.go index 1502427b3..077c2af22 100644 --- a/mino/option.go +++ b/mino/option.go @@ -30,6 +30,25 @@ func ApplyFilters(filters []FilterUpdater) *Filter { // FilterUpdater is a function to update the filters. type FilterUpdater func(*Filter) +// RotateFilter is a filter to rotate the indices. When n is above zero, it will +// rotate by n steps on the left and when n is below, it will do the same on the +// right. The behaviour is unknown if not used as the last filter as next +// updaters could change the order. +func RotateFilter(n int) FilterUpdater { + return func(filter *Filter) { + if len(filter.Indices) == 0 { + return + } + + n = n % len(filter.Indices) + if n < 0 { + n += len(filter.Indices) + } + + filter.Indices = append(filter.Indices[n:], filter.Indices[:n]...) + } +} + // IndexFilter is a filter to include a given index. func IndexFilter(index int) FilterUpdater { return func(filters *Filter) { diff --git a/mino/option_test.go b/mino/option_test.go index 59f9225d9..9b7d573a6 100644 --- a/mino/option_test.go +++ b/mino/option_test.go @@ -11,6 +11,26 @@ func TestFilter_ParseFilters(t *testing.T) { require.Equal(t, []int{1}, filters.Indices) } +func TestFilter_RotateFilter(t *testing.T) { + filters := &Filter{Indices: []int{1, 2, 3, 4, 5}} + + RotateFilter(-2)(filters) + require.Equal(t, filters.Indices, []int{4, 5, 1, 2, 3}) + + RotateFilter(3)(filters) + require.Equal(t, filters.Indices, []int{2, 3, 4, 5, 1}) + + RotateFilter(10)(filters) + require.Equal(t, filters.Indices, []int{2, 3, 4, 5, 1}) + + RotateFilter(-7)(filters) + require.Equal(t, filters.Indices, []int{5, 1, 2, 3, 4}) + + filters = &Filter{} + RotateFilter(3)(filters) + require.Equal(t, filters.Indices, []int(nil)) +} + func TestFilter_IndexFilter(t *testing.T) { filters := &Filter{Indices: []int{}} From 8ecb9a965dc19e49fc5883eaa0135a87611fcd99 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Tue, 24 Mar 2020 15:58:22 +0100 Subject: [PATCH 5/7] Add missing address tests --- mino/minoch/address_test.go | 15 +++++++++++++++ mino/minogrpc/mod_test.go | 11 +++++++++++ 2 files changed, 26 insertions(+) diff --git a/mino/minoch/address_test.go b/mino/minoch/address_test.go index f702601ab..bacab9aed 100644 --- a/mino/minoch/address_test.go +++ b/mino/minoch/address_test.go @@ -6,8 +6,16 @@ import ( "testing/quick" "github.com/stretchr/testify/require" + "go.dedis.ch/fabric/mino" ) +func TestAddress_Equal(t *testing.T) { + addr := address{id: "A"} + require.True(t, addr.Equal(addr)) + require.False(t, addr.Equal(address{})) + require.False(t, addr.Equal(fakeAddress{})) +} + func TestAddress_MarshalText(t *testing.T) { f := func(id string) bool { addr := address{id: id} @@ -43,3 +51,10 @@ func TestAddressFactory_FromText(t *testing.T) { err := quick.Check(f, nil) require.NoError(t, err) } + +//------------------------------------------------------------------------------ +// Utility functions + +type fakeAddress struct { + mino.Address +} diff --git a/mino/minogrpc/mod_test.go b/mino/minogrpc/mod_test.go index 0ae372e1c..f488fe03f 100644 --- a/mino/minogrpc/mod_test.go +++ b/mino/minogrpc/mod_test.go @@ -114,6 +114,13 @@ func Test_MakeRPC(t *testing.T) { } +func TestAddress_Equal(t *testing.T) { + addr := address{id: "A"} + require.True(t, addr.Equal(addr)) + require.False(t, addr.Equal(address{})) + require.False(t, addr.Equal(fakeAddress{})) +} + func TestAddress_MarshalText(t *testing.T) { f := func(id string) bool { addr := address{id: id} @@ -224,3 +231,7 @@ func (it *fakeAddressIterator) GetNext() mino.Address { it.cursor++ return p } + +type fakeAddress struct { + mino.Address +} From 77ee16f4d70333f8cf8aaa365212409bbfaad0fd Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Wed, 25 Mar 2020 08:23:50 +0100 Subject: [PATCH 6/7] Address nkcr's comments --- blockchain/viewchange/mod.go | 6 +++--- docs/introduction.md | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/blockchain/viewchange/mod.go b/blockchain/viewchange/mod.go index 9e8d91bfd..27dfbf0db 100644 --- a/blockchain/viewchange/mod.go +++ b/blockchain/viewchange/mod.go @@ -9,9 +9,9 @@ import ( // propose a block as the leader. It is also responsible for verifying the // integrity of the players of the chain. type ViewChange interface { - // Wait returns a non-nil error when the node is allowed to make the - // proposal. It will also return the authorized list of players that must be - // used so that the Verify function returns nil. + // Wait returns a nil error when the player is allowed to propose the block. + // It will also return the authorized list of players that must be used so + // that the Verify function returns nil. // // Note: the implementation of the returned mino.Players interface must be // preserved. diff --git a/docs/introduction.md b/docs/introduction.md index d6cd45d64..a4dbff78e 100644 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -30,6 +30,10 @@ An intro... - **node** - A node is... +- **payload** - A payload is the data that a block will store. The blockchain + implementation does not know the data structure thus requires a + *PayloadProcessor* that will validate during the consensus. + - **proof** - A proof is... - **protobuf** - Protobuf is... From af99baabefbb951b7f2009beff296e3a0642f920 Mon Sep 17 00:00:00 2001 From: Gaylor Bosson Date: Wed, 25 Mar 2020 08:45:12 +0100 Subject: [PATCH 7/7] Update option comment --- mino/option.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mino/option.go b/mino/option.go index 077c2af22..a5f800033 100644 --- a/mino/option.go +++ b/mino/option.go @@ -10,7 +10,7 @@ type Filter struct { // list if updated based on the filter that we apply. For example, [0,3] // tells that this filter keeps 2 elements from the underlying data // structure we filter that are stored at indexes 0, 3. This list is always - // sorted. + // sorted and can be shifted in a circular way. Indices []int }