Skip to content

Commit

Permalink
Add Raft-based consensus (#79)
Browse files Browse the repository at this point in the history
This was implemented by Joel Burget (@joelburget) and Brian Schroeder (@bts).
  • Loading branch information
joelburget authored and jpmsam committed Mar 23, 2017
1 parent 8c47c29 commit b628632
Show file tree
Hide file tree
Showing 286 changed files with 43,985 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ build/_vendor/pkg
*~
.project
.settings
.idea

# used by the Makefile
/build/_workspace/
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Key enhancements:
* __QuorumChain__ - a new consensus model based on majority voting
* __Constellation__ - a peer-to-peer encrypted message exchange
* __Peer Security__ - node/peer permissioning using smart contracts
* __Raft-based Consensus__ - a consensus model for faster blocktimes, transaction finality, and on-demand block creation

## Architecture

Expand Down Expand Up @@ -125,10 +126,11 @@ Further documentation can be found in the [docs](docs/) folder and on the [wiki]

## See also

* Quorum - https://github.com/jpmorganchase/quorum (this repository)
* Constellation - https://github.com/jpmorganchase/constellation
* quorum-examples - https://github.com/jpmorganchase/quorum-examples
* Quorum Wiki - https://github.com/jpmorganchase/quorum/wiki
* [Quorum](https://github.com/jpmorganchase/quorum): this repository
* [Constellation](https://github.com/jpmorganchase/constellation): peer-to-peer encrypted message exchange for transaction privacy
* [Raft Consensus Documentation](raft/doc.md)
* [quorum-examples](https://github.com/jpmorganchase/quorum-examples): example quorum clusters
* [Quorum Wiki](https://github.com/jpmorganchase/quorum/wiki)

## Third Party Tools/Libraries

Expand Down
10 changes: 9 additions & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ participating.
utils.VoteMaxBlockTimeFlag,
utils.SingleBlockMakerFlag,
utils.EnableNodePermissionFlag,
utils.RaftModeFlag,
utils.RaftBlockTime,
}
app.Flags = append(app.Flags, debug.Flags...)

Expand Down Expand Up @@ -276,7 +278,13 @@ func startNode(ctx *cli.Context, stack *node.Node) {
unlockAccount(ctx, accman, trimmed, i, passwords)
}
}
// Start auxiliary services

if ctx.GlobalBool(utils.RaftModeFlag.Name) {
return
}

// Start auxiliary services for Quorum Chain

var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
utils.Fatalf("ethereum service not running: %v", err)
Expand Down
54 changes: 52 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/raft"
"github.com/ethereum/go-ethereum/rpc"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv2"
"gopkg.in/urfave/cli.v1"
"log"
)

func init() {
Expand Down Expand Up @@ -373,6 +375,16 @@ var (
Name: "permissioned",
Usage: "If enabled, the node will allow only a defined list of nodes to connect",
}
// Raft flags
RaftModeFlag = cli.BoolFlag{
Name: "raft",
Usage: "If enabled, uses Raft instead of Quorum Chain for consensus",
}
RaftBlockTime = cli.IntFlag{
Name: "raftblocktime",
Usage: "Amount of time between raft block creations in milliseconds",
Value: 50,
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down Expand Up @@ -655,9 +667,11 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) {
glog.V(logger.Info).Infoln("You're one of the lucky few that will try out the JIT VM (random). If you get a consensus failure please be so kind to report this incident with the block hash that failed. You can switch to the regular VM by setting --jitvm=false")
}

chainConfig := MakeChainConfig(ctx, stack)

ethConf := &eth.Config{
Etherbase: MakeEtherbase(stack.AccountManager(), ctx),
ChainConfig: MakeChainConfig(ctx, stack),
ChainConfig: chainConfig,
SingleBlockMaker: ctx.GlobalBool(SingleBlockMakerFlag.Name),
DatabaseCache: ctx.GlobalInt(CacheFlag.Name),
DatabaseHandles: MakeDatabaseHandles(),
Expand All @@ -670,6 +684,7 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) {
SolcPath: ctx.GlobalString(SolcPathFlag.Name),
VoteMinBlockTime: uint(ctx.GlobalInt(VoteMinBlockTimeFlag.Name)),
VoteMaxBlockTime: uint(ctx.GlobalInt(VoteMaxBlockTimeFlag.Name)),
RaftMode: ctx.GlobalBool(RaftModeFlag.Name),
}

// Override any default configs in dev mode or the test net
Expand All @@ -696,11 +711,46 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) {
state.MaxTrieCacheGen = uint16(gen)
}

// We need a pointer to the ethereum service so we can access it from the raft
// service
var ethereum *eth.Ethereum

if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return eth.New(ctx, ethConf)
var err error
ethereum, err = eth.New(ctx, ethConf)
return ethereum, err
}); err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}

if ctx.GlobalBool(RaftModeFlag.Name) {
blockTimeMillis := ctx.GlobalInt(RaftBlockTime.Name)
datadir := ctx.GlobalString(DataDirFlag.Name)

if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
strId := discover.PubkeyID(stack.PublicKey()).String()
blockTimeNanos := time.Duration(blockTimeMillis) * time.Millisecond
peers := stack.StaticNodes()

peerIds := make([]string, len(peers))
var myId int
for peerIdx, peer := range peers {
peerId := peer.ID.String()
peerIds[peerIdx] = peerId
if peerId == strId {
myId = peerIdx + 1
}
}

if myId == 0 {
log.Panicf("failed to find local enode ID (%v) amongst peer IDs: %v", strId, peerIds)
}

return raft.New(ctx, chainConfig, myId, blockTimeNanos, ethereum, peers, datadir)
}); err != nil {
Fatalf("Failed to register the Raft service: %v", err)
}
}
}

// RegisterShhService configures whisper and adds it to the given node.
Expand Down
25 changes: 23 additions & 2 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@ import (
"gopkg.in/fatih/set.v0"
)

func forceParseRfc3339(str string) time.Time {
time, err := time.Parse(time.RFC3339, str)
if err != nil {
panic("unexpected failure to parse rfc3339 timestamp: " + str)
}
return time
}

var (
ExpDiffPeriod = big.NewInt(100000)
big10 = big.NewInt(10)
bigMinus99 = big.NewInt(-99)
nanosecond2017Timestamp = forceParseRfc3339("2017-01-01T00:00:00+00:00").UnixNano()
)

// BlockValidator is responsible for validating block headers, uncles and
Expand Down Expand Up @@ -276,8 +285,20 @@ func ValidateHeader(chaindb ethdb.Database, bc *BlockChain, config *ChainConfig,
return BlockTSTooBigErr
}
} else {
if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 {
return BlockFutureErr
// We disable future checking if we're in --raft mode. This is crucial
// because block validation in the raft setting needs to be deterministic.
// There is no forking of the chain, and we need each node to only perform
// validation as a pure function of block contents with respect to the
// previous database state.
//
// NOTE: whereas we are currently checking whether the timestamp field has
// nanosecond semantics to detect --raft mode, we could also use a special
// "raft" sentinel in the Extra field, or pass a boolean for raftMode from
// all call sites of this function.
if raftMode := time.Now().UnixNano() > nanosecond2017Timestamp; !raftMode {
if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 {
return BlockFutureErr
}
}
}
if header.Time.Cmp(parent.Time) != 1 {
Expand Down
45 changes: 45 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,51 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err
return
}

// Only writes the block without inserting it as the head of the chain
func (self *BlockChain) WriteDetachedBlock(block *types.Block) (err error) {
self.wg.Add(1)
defer self.wg.Done()

// Calculate the total difficulty of the block
ptd := self.GetTdByHash(block.ParentHash())
if ptd == nil {
return ParentError(block.ParentHash())
}

externTd := new(big.Int).Add(block.Difficulty(), ptd)

self.mu.Lock()
defer self.mu.Unlock()

// Write the block itself to the database
if err := self.hc.WriteTd(block.Hash(), block.Number().Uint64(), externTd); err != nil {
glog.Fatalf("failed to write block total difficulty: %v", err)
}
if err := WriteBlock(self.chainDb, block); err != nil {
glog.Fatalf("failed to write block contents: %v", err)
}

self.futureBlocks.Remove(block.Hash())

return
}

// Sets a "detached block" to be the new head of the chain.
//
// See WriteDetachedBlock.
func (self *BlockChain) SetNewHeadBlock(block *types.Block) {
self.wg.Add(1)
defer self.wg.Done()

self.chainmu.Lock()
defer self.chainmu.Unlock()

self.mu.Lock()
defer self.mu.Unlock()

self.insert(block)
}

// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
Expand Down
20 changes: 14 additions & 6 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ var (
type Config struct {
ChainConfig *core.ChainConfig // chain configuration

NetworkId int // Network ID to use for selecting peers to connect to
Genesis string // Genesis JSON to seed the chain database with
SingleBlockMaker bool // Assume this node is the only node on the network allowed to create blocks
EnableNodePermission bool //Used for enabling / disabling node permissioning
NetworkId int // Network ID to use for selecting peers to connect to
Genesis string // Genesis JSON to seed the chain database with
SingleBlockMaker bool // Assume this node is the only node on the network allowed to create blocks
EnableNodePermission bool //Used for enabling / disabling node permissioning

SkipBcVersionCheck bool // e.g. blockchain export
DatabaseCache int
Expand All @@ -92,6 +92,8 @@ type Config struct {

VoteMinBlockTime uint
VoteMaxBlockTime uint

RaftMode bool
}

// Ethereum implements the Ethereum full node service.
Expand Down Expand Up @@ -204,7 +206,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
ForceJit: config.ForceJit,
}

eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.pow, eth.EventMux(), true)
// We can't swap fake pow into eth.pow: that field has to be a *ethash.Ethash.
// So we just set a variable down here, minimizing changes to upstream geth.
fakePow := core.FakePow{}

performQuorumChecks := !config.RaftMode

eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, fakePow, eth.EventMux(), performQuorumChecks)
if err != nil {
if err == core.ErrNoGenesis {
return nil, fmt.Errorf(`No chain found. Please initialise a new chain using the "init" subcommand.`)
Expand All @@ -214,7 +222,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
newPool := core.NewTxPool(eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit)
eth.txPool = newPool

if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SingleBlockMaker, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SingleBlockMaker, config.NetworkId, eth.eventMux, eth.txPool, fakePow, eth.blockchain, chainDb, config.RaftMode); err != nil {
return nil, err
}

Expand Down
30 changes: 25 additions & 5 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ type ProtocolManager struct {
wg sync.WaitGroup

badBlockReportingEnabled bool

raftMode bool
}

// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database, raftMode bool) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
Expand All @@ -108,6 +110,7 @@ func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId in
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
raftMode: raftMode,
}
if singleMiner {
manager.synced = uint32(1)
Expand Down Expand Up @@ -203,9 +206,17 @@ func (pm *ProtocolManager) Start() {
// broadcast transactions
pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()

if !pm.raftMode {
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
} else {
// We set this immediately in raft mode to make sure the miner never drops
// incoming txes. Raft mode doesn't use the fetcher or downloader, and so
// this would never be set otherwise.
atomic.StoreUint32(&pm.synced, 1)
}

// start sync handlers
go pm.syncer()
Expand All @@ -216,7 +227,9 @@ func (pm *ProtocolManager) Stop() {
glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")

pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
if !pm.raftMode {
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
}

// Quit the sync loop.
// After this send has completed, no new peers will be accepted.
Expand Down Expand Up @@ -312,6 +325,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
defer msg.Discard()

if pm.raftMode {
if msg.Code != TxMsg {
glog.V(logger.Debug).Infof("raft: ignoring non-TxMsg with code %v", msg.Code)
return nil
}
}

// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
config = &core.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
blockchain, _ = core.NewBlockChain(db, config, pow, evmux, false)
)
pm, err := NewProtocolManager(config, true, NetworkId, evmux, new(testTxPool), pow, blockchain, db)
pm, err := NewProtocolManager(config, true, NetworkId, evmux, new(testTxPool), pow, blockchain, db, false)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eth/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newTestProtocolManager(blocks int, generator func(int, *core.BlockGen), new
panic(err)
}

pm, err := NewProtocolManager(chainConfig, true, NetworkId, evmux, &testTxPool{added: newtx}, pow, blockchain, db)
pm, err := NewProtocolManager(chainConfig, true, NetworkId, evmux, &testTxPool{added: newtx}, pow, blockchain, db, false)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit b628632

Please sign in to comment.