Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skipchain leader only #25

Merged
merged 7 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions blockchain/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Block interface {
encoding.Packable

GetHash() []byte

GetPlayers() mino.Players
}

// VerifiableBlock is an extension of a block so that its integrity can be
Expand All @@ -29,10 +31,14 @@ type BlockFactory interface {
FromVerifiable(src proto.Message) (Block, error)
}

// Validator is the interface to implement to validate the generic payload
// stored in the block.
type Validator interface {
// PayloadProcessor is the interface to implement to validate the generic
// payload stored in the block.
type PayloadProcessor interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes me think: we should also add the definition of "payload" to our terminologies.

// Validate should return nil if the data pass the validation.
Validate(data proto.Message) error

// Commit should process the data and perform any operation required when
// new data is stored on the chain.
Commit(data proto.Message) error
}

Expand All @@ -56,7 +62,7 @@ type Blockchain interface {

// Listen starts to listen for messages and returns the actor that the
// client can use to propose new blocks.
Listen(validator Validator) (Actor, error)
Listen(validator PayloadProcessor) (Actor, error)

// GetBlock returns the latest block.
GetBlock() (Block, error)
Expand Down
6 changes: 6 additions & 0 deletions blockchain/skipchain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.dedis.ch/fabric/consensus"
"go.dedis.ch/fabric/crypto"
"go.dedis.ch/fabric/encoding"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)

Expand Down Expand Up @@ -107,6 +108,11 @@ func (b SkipBlock) GetPreviousHash() []byte {
return b.BackLink.Bytes()
}

// GetPlayers implements blockchain.Block. It returns the list of players.
func (b SkipBlock) GetPlayers() mino.Players {
return b.Conodes
}

// GetVerifier implements consensus.Proposal. It returns the verifier for the
// block.
// TODO: it might have sense to remove this function.
Expand Down
8 changes: 8 additions & 0 deletions blockchain/skipchain/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ type fakeAddress struct {
err error
}

func (a fakeAddress) Equal(other mino.Address) bool {
return bytes.Equal(other.(fakeAddress).id, a.id)
}

func (a fakeAddress) MarshalText() ([]byte, error) {
return []byte(a.id), a.err
}
Expand Down Expand Up @@ -575,6 +579,10 @@ type fakeMino struct {
err error
}

func (m fakeMino) GetAddress() mino.Address {
return fakeAddress{id: []byte{0xaa}}
}

func (m fakeMino) GetAddressFactory() mino.AddressFactory {
return fakeAddressFactory{}
}
Expand Down
4 changes: 2 additions & 2 deletions blockchain/skipchain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func newHandler(sc *Skipchain) handler {

// Process implements mino.Handler. It handles genesis block propagation
// messages only and return an error for any other type.
func (h handler) Process(req proto.Message) (proto.Message, error) {
switch in := req.(type) {
func (h handler) Process(req mino.Request) (proto.Message, error) {
switch in := req.Message.(type) {
case *PropagateGenesis:
factory := h.GetBlockFactory().(blockFactory)

Expand Down
15 changes: 11 additions & 4 deletions blockchain/skipchain/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/mino"
"golang.org/x/xerrors"
)

Expand All @@ -21,19 +22,25 @@ func TestHandler_Process(t *testing.T) {
packed, err := block.Pack()
require.NoError(t, err)

resp, err := h.Process(&PropagateGenesis{Genesis: packed.(*BlockProto)})
req := mino.Request{
Message: &PropagateGenesis{Genesis: packed.(*BlockProto)},
}
resp, err := h.Process(req)
require.NoError(t, err)
require.Nil(t, resp)

_, err = h.Process(&empty.Empty{})
req.Message = &empty.Empty{}
_, err = h.Process(req)
require.EqualError(t, err, "unknown message type '*empty.Empty'")

_, err = h.Process(&PropagateGenesis{})
req.Message = &PropagateGenesis{}
_, err = h.Process(req)
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't decode the block: ")

h.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")}
_, err = h.Process(&PropagateGenesis{Genesis: packed.(*BlockProto)})
req.Message = &PropagateGenesis{Genesis: packed.(*BlockProto)}
_, err = h.Process(req)
require.EqualError(t, err, "couldn't write the block: oops")

return true
Expand Down
51 changes: 45 additions & 6 deletions blockchain/skipchain/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ package skipchain

import (
"context"
"fmt"
"time"

"github.com/golang/protobuf/proto"
"github.com/rs/zerolog"
"go.dedis.ch/fabric"
"go.dedis.ch/fabric/blockchain"
"go.dedis.ch/fabric/blockchain/viewchange"
"go.dedis.ch/fabric/consensus"
"go.dedis.ch/fabric/consensus/cosipbft"
"go.dedis.ch/fabric/cosi"
Expand All @@ -32,12 +35,15 @@ const (
// between the blocks.
//
// - implements blockchain.Blockchain
// - implements fmt.Stringer
type Skipchain struct {
mino mino.Mino
cosi cosi.CollectiveSigning
db Database
consensus consensus.Consensus
watcher blockchain.Observable
logger zerolog.Logger
mino mino.Mino
cosi cosi.CollectiveSigning
db Database
consensus consensus.Consensus
watcher blockchain.Observable
viewchange viewchange.ViewChange
}

// NewSkipchain returns a new instance of Skipchain.
Expand All @@ -46,6 +52,7 @@ func NewSkipchain(m mino.Mino, cosi cosi.CollectiveSigning) *Skipchain {
db := NewInMemoryDatabase()

return &Skipchain{
logger: fabric.Logger,
mino: m,
cosi: cosi,
db: db,
Expand All @@ -56,7 +63,9 @@ func NewSkipchain(m mino.Mino, cosi cosi.CollectiveSigning) *Skipchain {

// Listen implements blockchain.Blockchain. It registers the RPC and starts the
// consensus module.
func (s *Skipchain) Listen(v blockchain.Validator) (blockchain.Actor, error) {
func (s *Skipchain) Listen(v blockchain.PayloadProcessor) (blockchain.Actor, error) {
s.viewchange = viewchange.NewConstant(s.mino.GetAddress(), s)

actor := skipchainActor{
Skipchain: s,
hashFactory: sha256Factory{},
Expand Down Expand Up @@ -138,6 +147,12 @@ func (s *Skipchain) Watch(ctx context.Context) <-chan blockchain.Block {
return ch
}

// String implements fmt.Stringer. It returns a simple representation of the
// skipchain instance to easily identify it.
func (s *Skipchain) String() string {
return fmt.Sprintf("skipchain@%v", s.mino.GetAddress())
}

// skipchainActor provides the primitives of a blockchain actor.
//
// - implements blockchain.Actor
Expand Down Expand Up @@ -213,6 +228,11 @@ func (a skipchainActor) InitChain(data proto.Message, players mino.Players) erro
func (a skipchainActor) Store(data proto.Message, players mino.Players) error {
factory := a.GetBlockFactory().(blockFactory)

ca, ok := players.(cosi.CollectiveAuthority)
if !ok {
return xerrors.Errorf("players must implement cosi.CollectiveAuthority")
}

previous, err := a.db.ReadLast()
if err != nil {
return xerrors.Errorf("couldn't read the latest block: %v", err)
Expand All @@ -223,6 +243,25 @@ func (a skipchainActor) Store(data proto.Message, players mino.Players) error {
return xerrors.Errorf("couldn't create next block: %v", err)
}

block.Conodes = newConodes(ca)

// Wait for the view change module green signal to go through the proposal.
// If the leader has failed and this node has to take over, we use the
// inherant property of CoSiPBFT to prove that 2f participants want the view
// change.
rotation, err := a.viewchange.Wait(block)
if err == nil {
// If the node is not the current leader and a rotation is necessary, it
// will be done.
block.Conodes = rotation.(Conodes)
} else {
a.logger.Debug().Msgf("%v refusing view change: %v", a, err)
// Not authorized to propose a block as the leader is moving
// forward so we drop the proposal. The upper layer is responsible to
// try again until the leader includes the data.
return nil
}

err = a.consensus.Propose(block, players)
if err != nil {
return xerrors.Errorf("couldn't propose the block: %v", err)
Expand Down
72 changes: 58 additions & 14 deletions blockchain/skipchain/mod_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package skipchain

import (
"bytes"
"context"
"testing"

proto "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"go.dedis.ch/fabric/blockchain"
"go.dedis.ch/fabric/consensus"
Expand Down Expand Up @@ -35,32 +37,32 @@ func TestSkipchain_Basic(t *testing.T) {
n := 5
manager := minoch.NewManager()

c1, s1, a1 := makeSkipchain(t, "A", manager)
c2, _, a2 := makeSkipchain(t, "B", manager)
c1, _, a1 := makeSkipchain(t, "A", manager)
c2, s2, _ := makeSkipchain(t, "B", manager)
conodes := Conodes{c1, c2}

err := a1.InitChain(&empty.Empty{}, conodes)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
blocks := s1.Watch(ctx)
blocks := s2.Watch(ctx)

for i := 0; i < n; i++ {
err = a2.Store(&empty.Empty{}, conodes)
err = a1.Store(&empty.Empty{}, conodes)
require.NoError(t, err)

event := <-blocks
require.NotNil(t, event)
require.IsType(t, SkipBlock{}, event)

chain, err := s1.GetVerifiableBlock()
chain, err := s2.GetVerifiableBlock()
require.NoError(t, err)

packed, err := chain.Pack()
require.NoError(t, err)

block, err := s1.GetBlockFactory().FromVerifiable(packed)
block, err := s2.GetBlockFactory().FromVerifiable(packed)
require.NoError(t, err)
require.NotNil(t, block)
require.Equal(t, uint64(i+1), block.(SkipBlock).Index)
Expand Down Expand Up @@ -184,23 +186,48 @@ func TestActor_InitChain(t *testing.T) {
}

func TestActor_Store(t *testing.T) {
buffer := new(bytes.Buffer)
cons := &fakeConsensusActor{}
actor := skipchainActor{
Skipchain: &Skipchain{
db: &fakeDatabase{},
logger: zerolog.New(buffer),
viewchange: fakeViewChange{},
mino: fakeMino{},
db: &fakeDatabase{},
},
consensus: fakeConsensusActor{},
consensus: cons,
}

conodes := Conodes{
{addr: fakeAddress{id: []byte{0xbb}}},
{addr: fakeAddress{id: []byte{0xaa}}},
{addr: fakeAddress{id: []byte{0xcc}}},
}

err := actor.Store(&empty.Empty{}, Conodes{})
err := actor.Store(&empty.Empty{}, conodes)
require.NoError(t, err)
// Make sure the conodes rotate if the view change allows it.
require.NotNil(t, cons.prop)
prop := cons.prop.(SkipBlock)
require.Equal(t, prop.Conodes[0].GetAddress(), conodes[1].GetAddress())

err = actor.Store(&empty.Empty{}, fakePlayers{})
require.EqualError(t, err, "players must implement cosi.CollectiveAuthority")

actor.Skipchain.db = &fakeDatabase{err: xerrors.New("oops")}
err = actor.Store(&empty.Empty{}, Conodes{})
err = actor.Store(&empty.Empty{}, conodes)
require.EqualError(t, err, "couldn't read the latest block: oops")

actor.Skipchain.db = &fakeDatabase{}
actor.consensus = fakeConsensusActor{err: xerrors.New("oops")}
err = actor.Store(&empty.Empty{}, Conodes{})
actor.Skipchain.viewchange = fakeViewChange{err: xerrors.New("oops")}
err = actor.Store(&empty.Empty{}, conodes)
// A view change is ignored.
require.NoError(t, err)
require.Contains(t, buffer.String(), "skipchain@aa refusing view change: oops")

actor.Skipchain.viewchange = fakeViewChange{}
actor.consensus = &fakeConsensusActor{err: xerrors.New("oops")}
err = actor.Store(&empty.Empty{}, conodes)
require.EqualError(t, err, "couldn't propose the block: oops")
}

Expand Down Expand Up @@ -275,10 +302,12 @@ type fakePlayers struct {

type fakeConsensusActor struct {
consensus.Actor
err error
err error
prop consensus.Proposal
}

func (a fakeConsensusActor) Propose(consensus.Proposal, mino.Players) error {
func (a *fakeConsensusActor) Propose(prop consensus.Proposal, pp mino.Players) error {
a.prop = prop
return a.err
}

Expand All @@ -293,3 +322,18 @@ func (rand fakeRandGenerator) Read(buffer []byte) (int, error) {
}
return len(buffer), rand.err
}

type fakeViewChange struct {
err error
}

func (vc fakeViewChange) Wait(block blockchain.Block) (mino.Players, error) {
// Simulate a rotating view change.
players := block.GetPlayers().
Take(mino.RangeFilter(0, block.GetPlayers().Len()), mino.RotateFilter(1))
return players, vc.err
}

func (vc fakeViewChange) Verify(blockchain.Block) error {
return vc.err
}
Loading