diff --git a/.gitignore b/.gitignore index e53e461dc6..c12cdf83fc 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ build/_vendor/pkg *~ .project .settings +.idea # used by the Makefile /build/_workspace/ diff --git a/README.md b/README.md index 043c4986c4..7a2eaad59c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Key enhancements: * __QuorumChain__ - a new consensus model based on majority voting * __Constellation__ - a peer-to-peer encrypted message exchange * __Peer Security__ - node/peer permissioning using smart contracts +* __Raft-based Consensus__ - a consensus model for faster blocktimes, transaction finality, and on-demand block creation ## Architecture @@ -125,10 +126,11 @@ Further documentation can be found in the [docs](docs/) folder and on the [wiki] ## See also -* Quorum - https://github.com/jpmorganchase/quorum (this repository) -* Constellation - https://github.com/jpmorganchase/constellation -* quorum-examples - https://github.com/jpmorganchase/quorum-examples -* Quorum Wiki - https://github.com/jpmorganchase/quorum/wiki +* [Quorum](https://github.com/jpmorganchase/quorum): this repository +* [Constellation](https://github.com/jpmorganchase/constellation): peer-to-peer encrypted message exchange for transaction privacy +* [Raft Consensus Documentation](raft/doc.md) +* [quorum-examples](https://github.com/jpmorganchase/quorum-examples): example quorum clusters +* [Quorum Wiki](https://github.com/jpmorganchase/quorum/wiki) ## Third Party Tools/Libraries diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3c2dc63554..e0f256a3f5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -157,6 +157,8 @@ participating. utils.VoteMaxBlockTimeFlag, utils.SingleBlockMakerFlag, utils.EnableNodePermissionFlag, + utils.RaftModeFlag, + utils.RaftBlockTime, } app.Flags = append(app.Flags, debug.Flags...) 
@@ -276,7 +278,13 @@ func startNode(ctx *cli.Context, stack *node.Node) { unlockAccount(ctx, accman, trimmed, i, passwords) } } - // Start auxiliary services + + if ctx.GlobalBool(utils.RaftModeFlag.Name) { + return + } + + // Start auxiliary services for Quorum Chain + var ethereum *eth.Ethereum if err := stack.Service(ðereum); err != nil { utils.Fatalf("ethereum service not running: %v", err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 53a4d68e6a..fb8d17c6b3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -46,9 +46,11 @@ import ( "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/pow" + "github.com/ethereum/go-ethereum/raft" "github.com/ethereum/go-ethereum/rpc" whisper "github.com/ethereum/go-ethereum/whisper/whisperv2" "gopkg.in/urfave/cli.v1" + "log" ) func init() { @@ -373,6 +375,16 @@ var ( Name: "permissioned", Usage: "If enabled, the node will allow only a defined list of nodes to connect", } + // Raft flags + RaftModeFlag = cli.BoolFlag{ + Name: "raft", + Usage: "If enabled, uses Raft instead of Quorum Chain for consensus", + } + RaftBlockTime = cli.IntFlag{ + Name: "raftblocktime", + Usage: "Amount of time between raft block creations in milliseconds", + Value: 50, + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -655,9 +667,11 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { glog.V(logger.Info).Infoln("You're one of the lucky few that will try out the JIT VM (random). If you get a consensus failure please be so kind to report this incident with the block hash that failed. 
You can switch to the regular VM by setting --jitvm=false") } + chainConfig := MakeChainConfig(ctx, stack) + ethConf := ð.Config{ Etherbase: MakeEtherbase(stack.AccountManager(), ctx), - ChainConfig: MakeChainConfig(ctx, stack), + ChainConfig: chainConfig, SingleBlockMaker: ctx.GlobalBool(SingleBlockMakerFlag.Name), DatabaseCache: ctx.GlobalInt(CacheFlag.Name), DatabaseHandles: MakeDatabaseHandles(), @@ -670,6 +684,7 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { SolcPath: ctx.GlobalString(SolcPathFlag.Name), VoteMinBlockTime: uint(ctx.GlobalInt(VoteMinBlockTimeFlag.Name)), VoteMaxBlockTime: uint(ctx.GlobalInt(VoteMaxBlockTimeFlag.Name)), + RaftMode: ctx.GlobalBool(RaftModeFlag.Name), } // Override any default configs in dev mode or the test net @@ -696,11 +711,46 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { state.MaxTrieCacheGen = uint16(gen) } + // We need a pointer to the ethereum service so we can access it from the raft + // service + var ethereum *eth.Ethereum + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - return eth.New(ctx, ethConf) + var err error + ethereum, err = eth.New(ctx, ethConf) + return ethereum, err }); err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } + + if ctx.GlobalBool(RaftModeFlag.Name) { + blockTimeMillis := ctx.GlobalInt(RaftBlockTime.Name) + datadir := ctx.GlobalString(DataDirFlag.Name) + + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + strId := discover.PubkeyID(stack.PublicKey()).String() + blockTimeNanos := time.Duration(blockTimeMillis) * time.Millisecond + peers := stack.StaticNodes() + + peerIds := make([]string, len(peers)) + var myId int + for peerIdx, peer := range peers { + peerId := peer.ID.String() + peerIds[peerIdx] = peerId + if peerId == strId { + myId = peerIdx + 1 + } + } + + if myId == 0 { + log.Panicf("failed to find local enode ID (%v) amongst peer IDs: %v", 
strId, peerIds) + } + + return raft.New(ctx, chainConfig, myId, blockTimeNanos, ethereum, peers, datadir) + }); err != nil { + Fatalf("Failed to register the Raft service: %v", err) + } + } } // RegisterShhService configures whisper and adds it to the given node. diff --git a/core/block_validator.go b/core/block_validator.go index c0bfd19ee7..56b18b4a3a 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -31,10 +31,19 @@ import ( "gopkg.in/fatih/set.v0" ) +func forceParseRfc3339(str string) time.Time { + time, err := time.Parse(time.RFC3339, str) + if err != nil { + panic("unexpected failure to parse rfc3339 timestamp: " + str) + } + return time +} + var ( ExpDiffPeriod = big.NewInt(100000) big10 = big.NewInt(10) bigMinus99 = big.NewInt(-99) + nanosecond2017Timestamp = forceParseRfc3339("2017-01-01T00:00:00+00:00").UnixNano() ) // BlockValidator is responsible for validating block headers, uncles and @@ -276,8 +285,20 @@ func ValidateHeader(chaindb ethdb.Database, bc *BlockChain, config *ChainConfig, return BlockTSTooBigErr } } else { - if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 { - return BlockFutureErr + // We disable future checking if we're in --raft mode. This is crucial + // because block validation in the raft setting needs to be deterministic. + // There is no forking of the chain, and we need each node to only perform + // validation as a pure function of block contents with respect to the + // previous database state. + // + // NOTE: whereas we are currently checking whether the timestamp field has + // nanosecond semantics to detect --raft mode, we could also use a special + // "raft" sentinel in the Extra field, or pass a boolean for raftMode from + // all call sites of this function. 
+ if raftMode := time.Now().UnixNano() > nanosecond2017Timestamp; !raftMode { + if header.Time.Cmp(big.NewInt(time.Now().Unix())) == 1 { + return BlockFutureErr + } } } if header.Time.Cmp(parent.Time) != 1 { diff --git a/core/blockchain.go b/core/blockchain.go index 63807df9b9..bc236f48c8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -839,6 +839,51 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err return } +// Only writes the block without inserting it as the head of the chain +func (self *BlockChain) WriteDetachedBlock(block *types.Block) (err error) { + self.wg.Add(1) + defer self.wg.Done() + + // Calculate the total difficulty of the block + ptd := self.GetTdByHash(block.ParentHash()) + if ptd == nil { + return ParentError(block.ParentHash()) + } + + externTd := new(big.Int).Add(block.Difficulty(), ptd) + + self.mu.Lock() + defer self.mu.Unlock() + + // Write the block itself to the database + if err := self.hc.WriteTd(block.Hash(), block.Number().Uint64(), externTd); err != nil { + glog.Fatalf("failed to write block total difficulty: %v", err) + } + if err := WriteBlock(self.chainDb, block); err != nil { + glog.Fatalf("failed to write block contents: %v", err) + } + + self.futureBlocks.Remove(block.Hash()) + + return +} + +// Sets a "detached block" to be the new head of the chain. +// +// See WriteDetachedBlock. +func (self *BlockChain) SetNewHeadBlock(block *types.Block) { + self.wg.Add(1) + defer self.wg.Done() + + self.chainmu.Lock() + defer self.chainmu.Unlock() + + self.mu.Lock() + defer self.mu.Unlock() + + self.insert(block) +} + // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned // it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go). 
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { diff --git a/eth/backend.go b/eth/backend.go index 35e3a2a5cd..0a2d431d0d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -64,10 +64,10 @@ var ( type Config struct { ChainConfig *core.ChainConfig // chain configuration - NetworkId int // Network ID to use for selecting peers to connect to - Genesis string // Genesis JSON to seed the chain database with - SingleBlockMaker bool // Assume this node is the only node on the network allowed to create blocks - EnableNodePermission bool //Used for enabling / disabling node permissioning + NetworkId int // Network ID to use for selecting peers to connect to + Genesis string // Genesis JSON to seed the chain database with + SingleBlockMaker bool // Assume this node is the only node on the network allowed to create blocks + EnableNodePermission bool //Used for enabling / disabling node permissioning SkipBcVersionCheck bool // e.g. blockchain export DatabaseCache int @@ -92,6 +92,8 @@ type Config struct { VoteMinBlockTime uint VoteMaxBlockTime uint + + RaftMode bool } // Ethereum implements the Ethereum full node service. @@ -204,7 +206,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ForceJit: config.ForceJit, } - eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.pow, eth.EventMux(), true) + // We can't swap fake pow into eth.pow: that field has to be a *ethash.Ethash. + // So we just set a variable down here, minimizing changes to upstream geth. + fakePow := core.FakePow{} + + performQuorumChecks := !config.RaftMode + + eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, fakePow, eth.EventMux(), performQuorumChecks) if err != nil { if err == core.ErrNoGenesis { return nil, fmt.Errorf(`No chain found. 
Please initialise a new chain using the "init" subcommand.`) @@ -214,7 +222,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { newPool := core.NewTxPool(eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) eth.txPool = newPool - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SingleBlockMaker, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil { + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SingleBlockMaker, config.NetworkId, eth.eventMux, eth.txPool, fakePow, eth.blockchain, chainDb, config.RaftMode); err != nil { return nil, err } diff --git a/eth/handler.go b/eth/handler.go index ffaa3f49c1..020224a2cf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -90,11 +90,13 @@ type ProtocolManager struct { wg sync.WaitGroup badBlockReportingEnabled bool + + raftMode bool } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. 
-func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database, raftMode bool) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkId: networkId, @@ -108,6 +110,7 @@ func NewProtocolManager(config *core.ChainConfig, singleMiner bool, networkId in noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), + raftMode: raftMode, } if singleMiner { manager.synced = uint32(1) @@ -203,9 +206,17 @@ func (pm *ProtocolManager) Start() { // broadcast transactions pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) go pm.txBroadcastLoop() - // broadcast mined blocks - pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) - go pm.minedBroadcastLoop() + + if !pm.raftMode { + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() + } else { + // We set this immediately in raft mode to make sure the miner never drops + // incoming txes. Raft mode doesn't use the fetcher or downloader, and so + // this would never be set otherwise. + atomic.StoreUint32(&pm.synced, 1) + } // start sync handlers go pm.syncer() @@ -216,7 +227,9 @@ func (pm *ProtocolManager) Stop() { glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...") pm.txSub.Unsubscribe() // quits txBroadcastLoop - pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + if !pm.raftMode { + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + } // Quit the sync loop. // After this send has completed, no new peers will be accepted. 
@@ -312,6 +325,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } defer msg.Discard() + if pm.raftMode { + if msg.Code != TxMsg { + glog.V(logger.Debug).Infof("raft: ignoring non-TxMsg with code %v", msg.Code) + return nil + } + } + // Handle the message depending on its contents switch { case msg.Code == StatusMsg: diff --git a/eth/handler_test.go b/eth/handler_test.go index f264310b8b..bd5470620b 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -469,7 +469,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = &core.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} blockchain, _ = core.NewBlockChain(db, config, pow, evmux, false) ) - pm, err := NewProtocolManager(config, true, NetworkId, evmux, new(testTxPool), pow, blockchain, db) + pm, err := NewProtocolManager(config, true, NetworkId, evmux, new(testTxPool), pow, blockchain, db, false) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } diff --git a/eth/helper_test.go b/eth/helper_test.go index 0f60f9b548..1f552b8dfb 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -62,7 +62,7 @@ func newTestProtocolManager(blocks int, generator func(int, *core.BlockGen), new panic(err) } - pm, err := NewProtocolManager(chainConfig, true, NetworkId, evmux, &testTxPool{added: newtx}, pow, blockchain, db) + pm, err := NewProtocolManager(chainConfig, true, NetworkId, evmux, &testTxPool{added: newtx}, pow, blockchain, db, false) if err != nil { return nil, err } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index bc1fcef94d..3bc94e03e3 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "log" "math/big" "net/http" "strings" @@ -1193,8 +1194,12 @@ func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction, si from, _ := signedTx.From() addr := crypto.CreateAddress(from, signedTx.Nonce()) 
glog.V(logger.Info).Infof("Tx(%s) created: %s\n", signedTx.Hash().Hex(), addr.Hex()) + // XXX(joel) use logCheckpoint + log.Printf("RAFT-CHECKPOINT TX-CREATED (%v, %v)\n", signedTx.Hash().Hex(), addr.Hex()) } else { glog.V(logger.Info).Infof("Tx(%s) to: %s\n", signedTx.Hash().Hex(), tx.To().Hex()) + // XXX(joel) use logCheckpoint + log.Printf("RAFT-CHECKPOINT TX-CREATED (%v, %v)\n", signedTx.Hash().Hex(), tx.To().Hex()) } return signedTx.Hash(), nil diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 88247ad981..4cd9f907a0 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -24,6 +24,7 @@ var Modules = map[string]string{ "debug": Debug_JS, "ens": ENS_JS, "eth": Eth_JS, + "raft": Raft_JS, "miner": Miner_JS, "net": Net_JS, "personal": Personal_JS, @@ -512,6 +513,22 @@ web3._extend({ }); ` +const Raft_JS = ` +web3._extend({ + property: 'raft', + methods: + [ + ], + properties: + [ + new web3._extend.Property({ + name: 'role', + getter: 'raft_role' + }) + ] +}) +` + const Miner_JS = ` web3._extend({ property: 'miner', diff --git a/node/config.go b/node/config.go index 92c75933de..16318bed7e 100644 --- a/node/config.go +++ b/node/config.go @@ -349,12 +349,11 @@ func (c *Config) TrusterNodes() []*discover.Node { // parsePersistentNodes parses a list of discovery node URLs loaded from a .json // file from within the data directory. 
-func (c *Config) parsePersistentNodes(file string) []*discover.Node { +func (c *Config) parsePersistentNodes(path string) []*discover.Node { // Short circuit if no node config is present if c.DataDir == "" { return nil } - path := filepath.Join(c.DataDir, file) if _, err := os.Stat(path); err != nil { return nil } diff --git a/node/node.go b/node/node.go index 3a7903252c..e9411e30f6 100644 --- a/node/node.go +++ b/node/node.go @@ -17,6 +17,7 @@ package node import ( + "crypto/ecdsa" "errors" "net" "os" @@ -33,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rpc" "github.com/syndtr/goleveldb/leveldb/storage" ) @@ -678,3 +680,11 @@ func (n *Node) apis() []rpc.API { }, } } + +func (n *Node) PublicKey() *ecdsa.PublicKey { + return &n.config.NodeKey().PublicKey +} + +func (n *Node) StaticNodes() []*discover.Node { + return n.config.StaticNodes() +} diff --git a/raft/api.go b/raft/api.go new file mode 100644 index 0000000000..87e65da13d --- /dev/null +++ b/raft/api.go @@ -0,0 +1,18 @@ +package raft + +type PublicRaftAPI struct { + raftService *RaftService +} + +func NewPublicRaftAPI(raftService *RaftService) *PublicRaftAPI { + return &PublicRaftAPI{raftService} +} + +func (s *PublicRaftAPI) Role() string { + role := s.raftService.raftProtocolManager.role + if (role == minterRole) { + return "minter" + } else { + return "verifier" + } +} diff --git a/raft/backend.go b/raft/backend.go new file mode 100644 index 0000000000..243351771b --- /dev/null +++ b/raft/backend.go @@ -0,0 +1,102 @@ +package raft + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + 
"github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +type RaftService struct { + blockchain *core.BlockChain + chainDb ethdb.Database // Block chain database + txMu sync.Mutex + txPool *core.TxPool + accountManager *accounts.Manager + + raftProtocolManager *ProtocolManager + startPeers []*discover.Node + + // we need an event mux to instantiate the blockchain + eventMux *event.TypeMux + minter *minter +} + +type RaftNodeInfo struct { + ClusterSize int `json:"clusterSize"` + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block + Role string `json:"role"` +} + +func New(ctx *node.ServiceContext, chainConfig *core.ChainConfig, id int, blockTime time.Duration, e *eth.Ethereum, startPeers []*discover.Node, datadir string) (*RaftService, error) { + service := &RaftService{ + eventMux: ctx.EventMux, + chainDb: e.ChainDb(), + blockchain: e.BlockChain(), + txPool: e.TxPool(), + accountManager: e.AccountManager(), + startPeers: startPeers, + } + + service.minter = newMinter(chainConfig, service, blockTime) + + var err error + if service.raftProtocolManager, err = NewProtocolManager(id, service.blockchain, service.eventMux, startPeers, datadir, service.minter); err != nil { + return nil, err + } + + return service, nil +} + +// Backend interface methods +func (service *RaftService) AccountManager() *accounts.Manager { return service.accountManager } +func (service *RaftService) BlockChain() *core.BlockChain { return service.blockchain } +func (service *RaftService) ChainDb() ethdb.Database { return service.chainDb } +func (service *RaftService) DappDb() ethdb.Database { return nil } +func (service *RaftService) EventMux() *event.TypeMux { return service.eventMux } 
+func (service *RaftService) TxPool() *core.TxPool { return service.txPool } + +// node.Service interface methods +func (service *RaftService) Protocols() []p2p.Protocol { return []p2p.Protocol{} } +func (service *RaftService) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "raft", + Version: "1.0", + Service: NewPublicRaftAPI(service), + Public: true, + }, + } +} + +// Start implements node.Service, starting the background data propagation thread +// of the protocol. +func (service *RaftService) Start(*p2p.Server) error { + service.raftProtocolManager.Start() + return nil +} + +// Stop implements node.Service, stopping the background data propagation thread +// of the protocol. +func (service *RaftService) Stop() error { + service.blockchain.Stop() + service.raftProtocolManager.Stop() + service.eventMux.Stop() + + service.chainDb.Close() + + glog.V(logger.Info).Infoln("Raft stopped") + return nil +} diff --git a/raft/constants.go b/raft/constants.go new file mode 100644 index 0000000000..eb865383f0 --- /dev/null +++ b/raft/constants.go @@ -0,0 +1,39 @@ +package raft + +import ( + etcdRaft "github.com/coreos/etcd/raft" +) + +const ( + protocolName = "raft" + protocolVersion uint64 = 0x01 + + raftMsg = 0x00 + + minterRole = etcdRaft.LEADER + verifierRole = etcdRaft.NOT_LEADER + + // Raft's ticker interval + tickerMS = 100 + + // We use a bounded channel of constant size buffering incoming messages + msgChanSize = 1000 + + // Snapshot after this many raft messages + // + // TODO: measure and get this as low as possible without affecting performance + // + snapshotPeriod = 250 + + peerUrlKeyPrefix = "peerUrl-" + + // checkpoints + txCreated = "TX-CREATED" + txAccepted = "TX-ACCEPTED" + becameMinter = "BECAME-MINTER" + becameVerifier = "BECAME-VERIFIER" +) + +var ( + appliedDbKey = []byte("applied") +) diff --git a/raft/doc.md b/raft/doc.md new file mode 100644 index 0000000000..323ef117b1 --- /dev/null +++ b/raft/doc.md @@ -0,0 +1,170 @@ +# Raft-based 
consensus for Ethereum/Quorum + +## Introduction + +This directory holds an implementation of a [Raft](https://raft.github.io)-based consensus mechanism (using [etcd](https://github.com/coreos/etcd)'s [Raft implementation](https://github.com/coreos/etcd/tree/master/raft)) as an alternative to Ethereum's default proof-of-work. This is useful for closed-membership/consortium settings where byzantine fault tolerance is not a requirement, and there is a desire for faster blocktimes (on the order of milliseconds instead of seconds) and transaction finality (the absence of forking.) + +When the `geth` binary is passed the `--raft` flag, the node will operate in "raft mode." + +Currently Raft-based consensus requires that all nodes in the cluster are configured to list the others up-front as [static peers](https://github.com/ethereum/go-ethereum/wiki/Connecting-to-the-network#static-nodes). We will be adding support for dynamic membership changes in the near future. + +## Some implementation basics + +Note: Though we use the etcd implementation of the Raft protocol, we speak of "Raft" more broadly to refer to the Raft protocol, and its use to achieve consensus for Quorum/Ethereum. + +Both Raft and Ethereum have their own notion of a "node": + +In Raft, a node in normal operation is either a "leader" or a "follower." There is a single leader for the entire cluster, which all log entries must flow through. There's also the concept of a "candidate", but only during leader election. We won't go into more detail about Raft here, because by design these details are opaque to applications built on it. + +In vanilla Ethereum, there is no such thing as a "leader" or "follower." It's possible for any node in the network to mine a new block -- which is akin to being the leader for that round. 
+ +In Raft-based consensus, we impose a one-to-one correspondence between Raft and Ethereum nodes: each Ethereum node is also a Raft node, and by convention, the leader of the Raft cluster is the only Ethereum node that should mine (or "mint") new blocks. A minter is responsible for bundling transactions into a block just like an Ethereum miner, but does not present a proof of work. + +The main reasons we co-locate the leader and minter are (1) convenience, in that Raft ensures there is only one leader at a time, and (2) to avoid a network hop from a node minting blocks to the leader, through which all Raft writes must flow. Our implementation watches Raft leadership changes -- if a node becomes a leader it will start minting, and if a node loses its leadership, it will stop minting. + +An observant reader might note that during raft leadership transitions, there could be a small period of time where more than one node might assume that it has minting duties; we explain how correctness is preserved in more detail later in this document. + +We use the existing Ethereum p2p transport layer to communicate transactions between nodes, but we communicate blocks only through the Raft transport layer. They are created by the minter and flow from there to the rest of the cluster, always in the same order, via Raft. + +Ethereum | Raft +-------- | ---- +minter | leader +verifier | follower + +When the minter creates a block, unlike in vanilla Ethereum where the block is written to the database and immediately considered the new head of the chain, we write the block as "detached" from the chain. We only set the new head of the chain once the block has flowed through Raft. All nodes will extend the chain together in lock-step as they "apply" their Raft log. + +From the point of view of Ethereum, Raft is integrated via an implementation of the `Service` interface in node/service.go: "an individual protocol that can be registered into a node".
Other examples of services are `Ethereum`, `ReleaseService`, and `Whisper`. + +## The lifecycle of a transaction + +Let's follow the lifecycle of a typical transaction: + +#### on any node (whether minter or verifier): + +1. The transaction is submitted via an RPC call to geth. +2. Using the existing (p2p) transaction propagation mechanism in Ethereum, the transaction is announced to all peers and, because our cluster is currently configured to use "static nodes," every transaction is sent to all peers in the cluster. + +#### on the minter: + +3. It reaches the minter, where it's included in the next block (see `mintNewBlock`) via the transaction pool. +4. Block creation triggers a `NewMinedBlockEvent`, which the Raft protocol manager receives via its subscription `minedBlockSub`. The `minedBroadcastLoop` (in raft/handler.go) puts this new block to the `ProtocolManager.proposeC` channel. +5. `serveInternal` is waiting at the other end of the channel. Its job is to RLP-encode blocks and propose them to Raft. Once it flows through Raft, this block will likely become the new head of the blockchain (on all nodes.) + +#### on every node: + +6. _At this point, Raft comes to consensus and appends the log entry containing our block to the Raft log. (The way this happens at the Raft layer is that the leader sends an `AppendEntries` to all followers, and they acknowledge receipt of the message. Once the leader has received a quorum of such acknowledgements, it notifies each node that this new entry has been committed permanently to the log)._ + +7. Having crossed the network through Raft, the block reaches the `eventLoop` (which processes new Raft log entries.) It has arrived from the leader through `pm.transport`, an instance of [`rafthttp.Transport`](https://godoc.org/github.com/coreos/etcd/rafthttp#Transport). + +8. The block is now handled by `applyNewChainHead`. This method checks whether the block extends the chain (i.e. 
its parent is the current head of the chain; see below). If it does not extend the chain, it is simply ignored as a no-op. If it does extend the chain, the block is validated and then written as the new head of the chain by `SetNewHeadBlock` (in blockchain.go). + +9. A `ChainHeadEvent` is posted to notify listeners that a new block has been accepted. This is relevant to us because: +* It removes the relevant transaction from the transaction pool. +* It removes the relevant transaction from `speculativeChain`'s `proposedTxes` (see below). +* It triggers `requestMinting` (in minter.go), telling the node to schedule the minting of a new block if any more transactions are pending. + +The transaction is now available on all nodes in the cluster with complete finality. Because raft guarantees a single ordering of entries stored in its log, and because everything that is committed is guaranteed to remain so, there is no forking of the blockchain built upon Raft. + +## Chain extension, races, and correctness + +Raft is responsible for reaching consensus on which blocks should be accepted into the chain. In the simplest possible scenario, every subsequent block that passes through Raft becomes the new head of the chain. + +However, there are rare scenarios in which we can encounter a new block that has passed through Raft that we can not crown as the new head of the chain. In these cases, when applying the raft log in-order, if we come across a block whose parent is not currently the head of the chain, we simply skip the log entry as a no-op. + +The most common case where this can occur is during leadership changes. The leader can be thought of as a recommendation or proxy for who should mint -- and it is generally true that there is only a single minter -- but we do not rely on the maximum of one concurrent minter for correctness. During such a transition it's possible that two nodes are both minting for a short period of time.
In this scenario there will be a race: the first block that successfully extends the chain will win, and the loser of the race will be ignored. + +Consider the following example where this might occur, where Raft entries attempting to extend the chain are denoted like: + +`[ 0xbeda Parent: 0xacaa ]` + +Where `0xbeda` is the ID of the new block, and `0xacaa` is the ID of its parent. Here, the initial minter (node 1) is partitioned, and node 2 takes over as the minter. + +``` + time block submissions + node 1 node 2 + | [ 0xbeda Parent: 0xacaa ] + | + | -- 1 is partitioned; 2 takes over as leader/minter -- + | + | [ 0x2c52 Parent: 0xbeda ] [ 0xf0ec Parent: 0xbeda ] + | [ 0x839c Parent: 0xf0ec ] + | + | -- 1 rejoins -- + | + v [ 0x8b37 Parent: 0x839c ] +``` + +Once the partition heals, at the Raft layer node 1 will resubmit `0x2c52`, and the resulting serialized log might look as follows: + +``` +[ 0xbeda Parent: 0xacaa - Extends! ] (due to node 1) +[ 0xf0ec Parent: 0xbeda - Extends! ] (due to node 2; let's call this the "winner") +[ 0x839c Parent: 0xf0ec - Extends! ] (due to node 2) +[ 0x2c52 Parent: 0xbeda - NO-OP. ] (due to node 1; let's call this the "loser") +[ 0x8b37 Parent: 0x839c - Extends! ] (due to node 2) +``` + +Due to being serialized after the "winner," the "loser" entry will not extend the chain, because its parent (`0xbeda`) is no longer at the head of the chain when we apply the entry. The "winner" extended the same parent (`0xbeda`) earlier (and then `0x839c` extended it further.) + +Note that each block is accepted by Raft and serialized in the log, and that this "Extends"/"No-op" designation occurs at a higher level in our implementation. From Raft's point of view, each log entry is valid, but at the Quorum-Raft level, we choose which entries will be "used," and will actually extend the chain. This chain extension logic is deterministic: the same exact behavior will occur on every single node in the cluster, keeping the blockchain in sync.
+ +Also note how our approach differs from the "longest valid chain" (LVC) mechanism from vanilla Ethereum. LVC is used to resolve forks in a network that is eventually consistent. Because we use Raft, the state of the blockchain is strongly consistent. There can not be forks in the Raft setting. Once a block has been added as the new head of the chain, it is done so for the entire cluster, and it is permanent. + +## Minting frequency + +As a default, we mint blocks no more frequently than every 50ms. When new transactions come in we will mint a new block immediately (so latency is low), but we will only mint a block if it's been at least 50ms since the last block (so we don't flood raft with blocks). This rate limiting achieves a balance between transaction throughput and latency. + +This default of 50ms is configurable via the `--raftblocktime` flag to geth. + +## Speculative minting + +One of the ways our approach differs from vanilla Ethereum is that we introduce a new concept of "speculative minting." This is not strictly required for the core functionality of Raft-based Ethereum consensus, but rather it is an optimization that affords lower latency between blocks (or: faster transaction "finality.") + +It takes some time for a block to flow through Raft (consensus) to become the head of the chain. If we synchronously waited for a block to become the new head of the chain before creating the new block, any transactions that we receive would take more time to make it into the chain. + +In speculative minting we allow the creation of a new block (and its proposal to raft) before its parent has made it all the way through Raft and into the blockchain. + +Since this can happen repeatedly, these blocks (which each have a reference to their parent block) can form a sort of chain. We call this a "speculative chain." 
+ +While a speculative chain is forming, we keep track of the subset of transactions in the pool that we have already put into blocks (in the speculative chain) but that have not yet made it into the blockchain (at which point a `core.ChainHeadEvent` occurs). These are called "proposed transactions" (see speculative_chain.go). + +Because of the "races" described above, it is possible that a block somewhere in the middle of a speculative chain ends up not making it into the chain. In this scenario an `InvalidRaftOrdering` event will occur, and we clean up the state of the speculative chain accordingly. + +There is currently no limit to the length of these speculative chains, but we plan to add support for this in the future. As a consequence, a minter can currently create arbitrarily many blocks back-to-back in a scenario where Raft stops making progress. + +### State in a speculative chain + +* `head`: The last-created speculative block. This can be `nil` if the last-created block is already included in the blockchain. +* `proposedTxes`: The set of transactions which have been proposed to Raft in some block, but not yet included in the blockchain. +* `unappliedBlocks`: A queue of blocks which have been proposed to Raft but not yet committed to the blockchain. + - When minting a new block, we enqueue it at the end of this queue. + - `accept` is called to remove the oldest speculative block when it's accepted into the blockchain. + - When an `InvalidRaftOrdering` occurs, we unwind the queue by popping the most recent blocks from the "new end" of the queue until we find the invalid block. We must repeatedly remove these "newer" speculative blocks because they are all dependent on a block that we know has not been included in the blockchain. +* `expectedInvalidBlockHashes`: The set of blocks which build on an invalid block, but haven't passed through Raft yet. We remove these as we get them back.
When these non-extending blocks come back through Raft we remove them from the speculative chain. We use this set as a "guard" against trying to trim the speculative chain when we shouldn't. + +## The Raft transport layer + +We communicate blocks over the HTTP transport layer built in to etcd Raft. It's also (at least theoretically) possible to use the p2p protocol built in to Ethereum as a transport for Raft. In our testing we found the default etcd HTTP transport to be more reliable than the p2p transport (at least as implemented in geth) under high load. + +## FAQ + +### Could you have a single- or two-node cluster? More generally, could you have an even number of nodes? + +A cluster can tolerate failures that leave a quorum (majority) available. So a cluster of two nodes can't tolerate any failures, three nodes can tolerate one, and five nodes can tolerate two. Typically Raft clusters have an odd number of nodes, since an even number provides no additional failure tolerance over the next-lower odd number (for example, four nodes can still only tolerate one failure). + +### What happens if you don't assume minter and leader are the same node? + +There's no hard reason they couldn't be different. We just co-locate the minter and leader as an optimization. + +* It saves one network call communicating the block to the leader. +* It provides a simple way to choose a minter. If we didn't use the Raft leader we'd have to build in "minter election" at a higher level. + +Additionally there could even be multiple minters running at the same time, but this would produce contention for which blocks actually extend the chain, reducing the productivity of the cluster (see "races" above). + +### I thought there were no forks in a Raft-based blockchain. What's the deal with "speculative minting"? + +"Speculative chains" are not forks in the blockchain. They represent a series ("chain") of blocks that have been sent through Raft, after which each of the blocks may or may not actually end up being included in *the blockchain*. + +### Can transactions be reversed?
Since raft log entries can be disregarded as "no-ops", does this imply transaction reversal? + +No. When a Raft log entry containing a new block is disregarded as a "no-op", its transactions will remain in the transaction pool, and so they will be included in a future block in the chain. \ No newline at end of file diff --git a/raft/events.go b/raft/events.go new file mode 100644 index 0000000000..b2a9020d49 --- /dev/null +++ b/raft/events.go @@ -0,0 +1,13 @@ +package raft + +import ( + "github.com/ethereum/go-ethereum/core/types" +) + +type InvalidRaftOrdering struct { + // Current head of the chain + headBlock *types.Block + + // New block that should point to the head, but doesn't + invalidBlock *types.Block +} diff --git a/raft/handler.go b/raft/handler.go new file mode 100644 index 0000000000..80ac879b2d --- /dev/null +++ b/raft/handler.go @@ -0,0 +1,661 @@ +// Overview of the channels used in this module: +// +// Node. +// * quitSync: *Every* channel operation can be unblocked by closing this +// channel. +// +// ProtocolManager. 
+// * proposeC, for proposals flowing from ethereum to raft +// * confChangeC, currently unused; in the future for adding new, non-initial, raft peers +// * roleC, coming from raft notifies us when our role changes +package raft + +import ( + "fmt" + "math/big" + "net/http" + "net/url" + "os" + "strconv" + "sync" + "time" + + "golang.org/x/net/context" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/wal" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rlp" + + "github.com/coreos/etcd/etcdserver/stats" + raftTypes "github.com/coreos/etcd/pkg/types" + etcdRaft "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" + "github.com/syndtr/goleveldb/leveldb" +) + +type ProtocolManager struct { + // peers note -- each node tracks the peers acknowledged by raft + // + // only the leader proposes `ConfChangeAddNode` for each peer in the first set + // but not in the second. this is done: + // * when a node becomes leader + // * when the leader learns of new peers + + // This node's raft id + id int + + // set of currently active peers known to the raft cluster. 
this includes self + raftPeers []etcdRaft.Peer + peerUrls []string + p2pNodes []*discover.Node + + blockchain *core.BlockChain + + // to protect the raft peers and addresses + mu sync.RWMutex + + eventMux *event.TypeMux + minedBlockSub event.Subscription + + downloader *downloader.Downloader + peerGetter func() (string, *big.Int) + + rawNode etcdRaft.Node + raftStorage *etcdRaft.MemoryStorage + + transport *rafthttp.Transport + httpstopc chan struct{} + httpdonec chan struct{} + + // The number of entries applied to the raft log + appliedIndex uint64 + + // The index of the latest snapshot. + snapshotIndex uint64 + + // Snapshotting + snapshotter *snap.Snapshotter + snapdir string + confState raftpb.ConfState + + // write-ahead log + waldir string + wal *wal.WAL + + // Persistence outside of the blockchain and raft log to keep track of our + // last-applied raft index and raft peer URLs. + quorumRaftDb *leveldb.DB + + proposeC chan *types.Block + confChangeC chan raftpb.ConfChange + quitSync chan struct{} + + // Note: we don't actually use this field. We just set it at the same time as + // starting or stopping the miner in notifyRoleChange. We might want to remove + // it, but it might also be useful to check. 
+ role int + + minter *minter +} + +// +// Public interface +// + +func NewProtocolManager(id int, blockchain *core.BlockChain, mux *event.TypeMux, peers []*discover.Node, datadir string, minter *minter) (*ProtocolManager, error) { + waldir := fmt.Sprintf("%s/raft-wal", datadir) + snapdir := fmt.Sprintf("%s/raft-snap", datadir) + quorumRaftDbLoc := fmt.Sprintf("%s/quorum-raft-state", datadir) + + peerUrls := makePeerUrls(peers) + manager := &ProtocolManager{ + raftPeers: makeRaftPeers(peerUrls), + peerUrls: peerUrls, + p2pNodes: peers, + blockchain: blockchain, + eventMux: mux, + proposeC: make(chan *types.Block), + confChangeC: make(chan raftpb.ConfChange), + httpstopc: make(chan struct{}), + httpdonec: make(chan struct{}), + waldir: waldir, + snapdir: snapdir, + snapshotter: snap.New(snapdir), + id: id, + quitSync: make(chan struct{}), + raftStorage: etcdRaft.NewMemoryStorage(), + minter: minter, + } + + if db, err := openQuorumRaftDb(quorumRaftDbLoc); err != nil { + return nil, err + } else { + manager.quorumRaftDb = db + } + + return manager, nil +} + +func (pm *ProtocolManager) Start() { + glog.V(logger.Info).Infoln("starting raft protocol handler") + + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop(pm.proposeC) + pm.startRaftNode() +} + +func (pm *ProtocolManager) Stop() { + glog.V(logger.Info).Infoln("stopping raft protocol handler...") + + pm.minedBlockSub.Unsubscribe() + + pm.transport.Stop() + close(pm.httpstopc) + <-pm.httpdonec + close(pm.quitSync) + if pm.rawNode != nil { + pm.rawNode.Stop() + } + + pm.quorumRaftDb.Close() + + pm.minter.stop() + + glog.V(logger.Info).Infoln("raft protocol handler stopped") +} + +func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo { + pm.mu.RLock() // as we read pm.role + defer pm.mu.RUnlock() + + var roleDescription string + if pm.role == minterRole { + roleDescription = "minter" + } else { + roleDescription = "verifier" + } + + return &RaftNodeInfo{ + ClusterSize: 
len(pm.raftPeers), + Genesis: pm.blockchain.Genesis().Hash(), + Head: pm.blockchain.CurrentBlock().Hash(), + Role: roleDescription, + } +} + +// +// MsgWriter interface (necessary for p2p.Send) +// + +func (pm *ProtocolManager) WriteMsg(msg p2p.Msg) error { + // read *into* buffer + var buffer = make([]byte, msg.Size) + msg.Payload.Read(buffer) + + return pm.rawNode.Propose(context.TODO(), buffer) +} + +// +// Raft interface +// + +func (pm *ProtocolManager) Process(ctx context.Context, m raftpb.Message) error { + return pm.rawNode.Step(ctx, m) +} + +func (pm *ProtocolManager) IsIDRemoved(id uint64) bool { + // TODO: implement this in the future once we support dynamic cluster membership + + glog.V(logger.Info).Infof("reporting that raft ID %d is not removed", id) + + return false +} + +func (pm *ProtocolManager) ReportUnreachable(id uint64) { + glog.V(logger.Warn).Infof("peer %d is currently unreachable", id) + pm.rawNode.ReportUnreachable(id) +} + +func (pm *ProtocolManager) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) { + glog.V(logger.Info).Infof("status of last-sent snapshot: %v", status) + pm.rawNode.ReportSnapshot(id, status) +} + +// +// Private methods +// + +func (pm *ProtocolManager) startRaftNode() { + if !fileutil.Exist(pm.snapdir) { + if err := os.Mkdir(pm.snapdir, 0750); err != nil { + glog.Fatalf("cannot create dir for snapshot (%v)", err) + } + } + + walExisted := wal.Exist(pm.waldir) + + pm.wal = pm.replayWAL() + + // NOTE: cockroach sets this to false for now until they've "worked out the + // bugs" + enablePreVote := true + + lastAppliedIndex := pm.loadAppliedIndex() + + c := &etcdRaft.Config{ + Applied: lastAppliedIndex, + ID: uint64(pm.id), + ElectionTick: 10, // NOTE: cockroach sets this to 15 + HeartbeatTick: 1, // NOTE: cockroach sets this to 5 + Storage: pm.raftStorage, + + // NOTE, from cockroach: + // "PreVote and CheckQuorum are two ways of achieving the same thing. 
+ // PreVote is more compatible with quiesced ranges, so we want to switch + // to it once we've worked out the bugs." + PreVote: enablePreVote, + CheckQuorum: !enablePreVote, + + // MaxSizePerMsg controls how many Raft log entries the leader will send to + // followers in a single MsgApp. + MaxSizePerMsg: 4096, // NOTE: in cockroachdb this is 16*1024 + + // MaxInflightMsgs controls how many in-flight messages Raft will send to + // a follower without hearing a response. The total number of Raft log + // entries is a combination of this setting and MaxSizePerMsg. + // + // NOTE: Cockroach's settings (MaxSizePerMsg of 4k and MaxInflightMsgs + // of 4) provide for up to 64 KB of raft log to be sent without + // acknowledgement. With an average entry size of 1 KB that translates + // to ~64 commands that might be executed in the handling of a single + // etcdraft.Ready operation. + MaxInflightMsgs: 256, // NOTE: in cockroachdb this is 4 + } + + glog.V(logger.Info).Infof("local raft ID is %v", c.ID) + + ss := &stats.ServerStats{} + ss.Initialize() + + pm.transport = &rafthttp.Transport{ + ID: raftTypes.ID(pm.id), + ClusterID: 0x1000, + Raft: pm, + ServerStats: ss, + LeaderStats: stats.NewLeaderStats(strconv.Itoa(pm.id)), + ErrorC: make(chan error), + } + + pm.transport.Start() + + if walExisted { + pm.reconnectToPreviousPeers() + + pm.rawNode = etcdRaft.RestartNode(c) + } else { + if numPeers := len(pm.raftPeers); numPeers == 0 { + panic("exiting due to empty raft peers list") + } else { + glog.V(logger.Info).Infof("starting raft with %v total peers.", numPeers) + } + + pm.rawNode = etcdRaft.StartNode(c, pm.raftPeers) + } + + go pm.serveRaft() + go pm.serveInternal(pm.proposeC, pm.confChangeC) + go pm.eventLoop() + go pm.handleRoleChange(pm.rawNode.RoleChan().Out()) +} + +func (pm *ProtocolManager) serveRaft() { + urlString := fmt.Sprintf("http://0.0.0.0:%d", nodeHttpPort(pm.p2pNodes[pm.id-1])) + url, err := url.Parse(urlString) + if err != nil { + glog.Fatalf("Failed 
parsing URL (%v)", err) + } + + listener, err := newStoppableListener(url.Host, pm.httpstopc) + if err != nil { + glog.Fatalf("Failed to listen rafthttp (%v)", err) + } + err = (&http.Server{Handler: pm.transport.Handler()}).Serve(listener) + select { + case <-pm.httpstopc: + default: + glog.Fatalf("Failed to serve rafthttp (%v)", err) + } + close(pm.httpdonec) +} + +func (pm *ProtocolManager) handleRoleChange(roleC <-chan interface{}) { + for { + select { + case role := <-roleC: + intRole, ok := role.(int) + + if !ok { + panic("Couldn't cast role to int") + } + + if intRole == minterRole { + logCheckpoint(becameMinter, "") + pm.minter.start() + } else { // verifier + logCheckpoint(becameVerifier, "") + pm.minter.stop() + } + + pm.mu.Lock() + pm.role = intRole + pm.mu.Unlock() + + case <-pm.quitSync: + return + } + } +} + +func (pm *ProtocolManager) minedBroadcastLoop(proposeC chan<- *types.Block) { + for obj := range pm.minedBlockSub.Chan() { + switch ev := obj.Data.(type) { + case core.NewMinedBlockEvent: + select { + case proposeC <- ev.Block: + case <-pm.quitSync: + return + } + } + } +} + +// serve two channels (proposeC, confChangeC) to handle changes originating +// internally +func (pm *ProtocolManager) serveInternal(proposeC <-chan *types.Block, confChangeC <-chan raftpb.ConfChange) { + // + // TODO: does it matter that this will restart from 0 whenever we restart a cluster? 
+ // + var confChangeCount uint64 + + for { + select { + case block, ok := <-proposeC: + if !ok { + glog.V(logger.Info).Infoln("error: read from proposeC failed") + return + } + + size, r, err := rlp.EncodeToReader(block) + if err != nil { + panic(fmt.Sprintf("error: failed to send RLP-encoded block: %s", err.Error())) + } + var buffer = make([]byte, uint32(size)) + r.Read(buffer) + + // blocks until accepted by the raft state machine + pm.rawNode.Propose(context.TODO(), buffer) + case cc, ok := <-confChangeC: + if !ok { + glog.V(logger.Info).Infoln("error: read from confChangeC failed") + return + } + + confChangeCount++ + cc.ID = confChangeCount + pm.rawNode.ProposeConfChange(context.TODO(), cc) + case <-pm.quitSync: + return + } + } +} + +func (pm *ProtocolManager) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) { + if len(ents) == 0 { + return + } + + first := ents[0].Index + lastApplied := pm.appliedIndex + + if first > lastApplied+1 { + glog.Fatalf("first index of committed entry[%d] should <= appliedIndex[%d] + 1", first, lastApplied) + } + + firstToApply := lastApplied - first + 1 + + if firstToApply < uint64(len(ents)) { + nents = ents[firstToApply:] + } + return +} + +func (pm *ProtocolManager) addPeer(nodeId uint64, peerUrl string) { + pm.transport.AddPeer(raftTypes.ID(nodeId), []string{peerUrl}) +} + +func (pm *ProtocolManager) removePeer(nodeId uint64) { + pm.transport.RemovePeer(raftTypes.ID(nodeId)) +} + +func (pm *ProtocolManager) reconnectToPreviousPeers() { + _, confState, _ := pm.raftStorage.InitialState() + + for _, nodeId := range confState.Nodes { + peerUrl := pm.loadPeerUrl(nodeId) + + if nodeId != uint64(pm.id) { + pm.addPeer(nodeId, peerUrl) + } + } +} + +func (pm *ProtocolManager) eventLoop() { + ticker := time.NewTicker(tickerMS * time.Millisecond) + defer ticker.Stop() + defer pm.wal.Close() + + for { + select { + case <-ticker.C: + pm.rawNode.Tick() + + // when the node is first ready it gives us entries to commit and 
messages + // to immediately publish + case rd := <-pm.rawNode.Ready(): + pm.wal.Save(rd.HardState, rd.Entries) + + if snap := rd.Snapshot; !etcdRaft.IsEmptySnap(snap) { + pm.saveSnapshot(snap) + pm.applySnapshot(snap) + } + + // 1: Write HardState, Entries, and Snapshot to persistent storage if they + // are not empty. + pm.raftStorage.Append(rd.Entries) + + // 2: Send all Messages to the nodes named in the To field. + pm.transport.Send(rd.Messages) + + // 3: Apply Snapshot (if any) and CommittedEntries to the state machine. + for _, entry := range pm.entriesToApply(rd.CommittedEntries) { + switch entry.Type { + case raftpb.EntryNormal: + if len(entry.Data) == 0 { + break + } + var block types.Block + err := rlp.DecodeBytes(entry.Data, &block) + if err != nil { + glog.V(logger.Error).Infoln("error decoding block: ", err) + } + pm.applyNewChainHead(&block) + + case raftpb.EntryConfChange: + var cc raftpb.ConfChange + cc.Unmarshal(entry.Data) + + // We lock access to this, in case we want to read the list of + // cluster members concurrently via RPC (e.g. 
from NodeInfo()): + pm.mu.Lock() + pm.confState = *pm.rawNode.ApplyConfChange(cc) + pm.mu.Unlock() + + switch cc.Type { + case raftpb.ConfChangeAddNode: + glog.V(logger.Info).Infof("adding peer %v due to ConfChangeAddNode", cc.NodeID) + + nodeId := cc.NodeID + peerUrl := string(cc.Context) + + if nodeId != uint64(pm.id) { + pm.addPeer(nodeId, peerUrl) + } + + pm.writePeerUrl(nodeId, peerUrl) + + case raftpb.ConfChangeRemoveNode: + glog.V(logger.Info).Infof("removing peer %v due to ConfChangeRemoveNode", cc.NodeID) + + if cc.NodeID == uint64(pm.id) { + glog.V(logger.Warn).Infoln("removing self from the cluster due to ConfChangeRemoveNode") + + pm.advanceAppliedIndex(entry.Index) + + // TODO: we might want to completely exit(0) geth here + return + } + + pm.removePeer(cc.NodeID) + + case raftpb.ConfChangeUpdateNode: + glog.Fatalln("not yet handled: ConfChangeUpdateNode") + } + + // We force a snapshot here to persist our updated confState, so we + // know our fellow cluster members when we come back online. + // + // It is critical here to snapshot *before* writing our applied + // index in LevelDB, otherwise a crash while/before snapshotting + // (after advancing our applied index) would result in the loss of a + // cluster member upon restart: we would re-mount with an old + // ConfState. + pm.triggerSnapshotWithNextIndex(entry.Index) + } + + pm.advanceAppliedIndex(entry.Index) + } + + // 4: Call Node.Advance() to signal readiness for the next batch of + // updates. 
+ pm.maybeTriggerSnapshot() + pm.rawNode.Advance() + + case <-pm.quitSync: + return + } + } +} + +func makeRaftPeers(urls []string) []etcdRaft.Peer { + peers := make([]etcdRaft.Peer, len(urls)) + for i, url := range urls { + peerId := i + 1 + + peers[i] = etcdRaft.Peer{ + ID: uint64(peerId), + Context: []byte(url), + } + } + return peers +} + +func nodeHttpPort(node *discover.Node) uint16 { + // + // TODO: we should probably read this from the commandline, but it's a little trickier because we wouldn't be + // accepting a single port like with --port or --rpcport; we'd have to ask for a base HTTP port (e.g. 50400) + // with the convention/understanding that the port used by each node would be base + raft ID, which quorum is + // otherwise not aware of. + // + return 20000 + node.TCP +} + +func makePeerUrls(nodes []*discover.Node) []string { + urls := make([]string, len(nodes)) + for i, node := range nodes { + ip := node.IP.String() + port := nodeHttpPort(node) + urls[i] = fmt.Sprintf("http://%s:%d", ip, port) + } + + return urls +} + +func sleep(duration time.Duration) { + <-time.NewTimer(duration).C +} + +func logCheckpoint(checkpointName string, iface interface{}) { + glog.V(logger.Info).Infof("RAFT-CHECKPOINT %s %v\n", checkpointName, iface) +} + +func blockExtendsChain(block *types.Block, chain *core.BlockChain) bool { + return block.ParentHash() == chain.CurrentBlock().Hash() +} + +func (pm *ProtocolManager) applyNewChainHead(block *types.Block) { + if !blockExtendsChain(block, pm.blockchain) { + headBlock := pm.blockchain.CurrentBlock() + + glog.V(logger.Warn).Infof("Non-extending block: %x (parent is %x; current head is %x)\n", block.Hash(), block.ParentHash(), headBlock.Hash()) + + pm.eventMux.Post(InvalidRaftOrdering{headBlock: headBlock, invalidBlock: block}) + } else { + if existingBlock := pm.blockchain.GetBlockByHash(block.Hash()); nil == existingBlock { + if err := pm.blockchain.Validator().ValidateBlock(block); err != nil { + 
panic(fmt.Sprintf("failed to validate block %x (%v)", block.Hash(), err)) + } + } + + for _, tx := range block.Transactions() { + logCheckpoint(txAccepted, tx.Hash().Hex()) + } + + if pm.blockchain.HasBlock(block.Hash()) { + // This node mined the block, so it was already in the + // DB. We simply extend the chain: + pm.blockchain.SetNewHeadBlock(block) + } else { + // + // This will broadcast a CHE *almost always*. It does its + // broadcasting at the end in a goroutine, but only conditionally if + // the chain head is in a certain state. For now, we will broadcast + // a CHE ourselves below to guarantee correctness. + // + _, err := pm.blockchain.InsertChain([]*types.Block{block}) + + if err != nil { + panic(fmt.Sprintf("failed to extend chain: %s", err.Error())) + } + } + + pm.eventMux.Post(core.ChainHeadEvent{Block: block}) + glog.V(logger.Info).Infof("Successfully extended chain: %x\n", block.Hash()) + } +} + +// Sets new appliedIndex in-memory, *and* writes this appliedIndex to LevelDB. +func (pm *ProtocolManager) advanceAppliedIndex(index uint64) { + pm.appliedIndex = index + + pm.writeAppliedIndex(index) +} diff --git a/raft/listener.go b/raft/listener.go new file mode 100644 index 0000000000..4836b04d2e --- /dev/null +++ b/raft/listener.go @@ -0,0 +1,59 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package raft + +import ( + "errors" + "net" + "time" +) + +// stoppableListener sets TCP keep-alive timeouts on accepted +// connections and waits on stopc message +type stoppableListener struct { + *net.TCPListener + stopc <-chan struct{} +} + +func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + return &stoppableListener{ln.(*net.TCPListener), stopc}, nil +} + +func (ln stoppableListener) Accept() (c net.Conn, err error) { + connc := make(chan *net.TCPConn, 1) + errc := make(chan error, 1) + go func() { + tc, err := ln.AcceptTCP() + if err != nil { + errc <- err + return + } + connc <- tc + }() + select { + case <-ln.stopc: + return nil, errors.New("server stopped") + case err := <-errc: + return nil, err + case tc := <-connc: + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(3 * time.Minute) + return tc, nil + } +} diff --git a/raft/minter.go b/raft/minter.go new file mode 100644 index 0000000000..9c2daca157 --- /dev/null +++ b/raft/minter.go @@ -0,0 +1,426 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
+ +package raft + +import ( + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/eapache/channels" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +// Current state information for building the next block +type work struct { + config *core.ChainConfig + publicState *state.StateDB + privateState *state.StateDB + Block *types.Block + header *types.Header +} + +type minter struct { + config *core.ChainConfig + mu sync.Mutex + mux *event.TypeMux + eth core.Backend + chain *core.BlockChain + chainDb ethdb.Database + coinbase common.Address + minting int32 // Atomic status counter + shouldMine *channels.RingChannel + blockTime time.Duration + speculativeChain *speculativeChain +} + + +func newMinter(config *core.ChainConfig, eth core.Backend, blockTime time.Duration) *minter { + minter := &minter{ + config: config, + eth: eth, + mux: eth.EventMux(), + chainDb: eth.ChainDb(), + chain: eth.BlockChain(), + shouldMine: channels.NewRingChannel(1), + blockTime: blockTime, + speculativeChain: newSpeculativeChain(), + } + events := minter.mux.Subscribe( + core.ChainHeadEvent{}, + core.TxPreEvent{}, + InvalidRaftOrdering{}, + ) + + minter.speculativeChain.clear(minter.chain.CurrentBlock()) + + go minter.eventLoop(events) + go minter.mintingLoop() + + return minter +} + +func (minter *minter) start() { + atomic.StoreInt32(&minter.minting, 1) + minter.requestMinting() +} + +func (minter *minter) stop() { + minter.mu.Lock() + defer minter.mu.Unlock() + + minter.speculativeChain.clear(minter.chain.CurrentBlock()) + atomic.StoreInt32(&minter.minting, 0) +} + +// Notify the minting loop that minting should occur, if it's 
not already been +// requested. Due to the use of a RingChannel, this function is idempotent if +// called multiple times before the minting occurs. +func (minter *minter) requestMinting() { + minter.shouldMine.In() <- struct{}{} +} + +type AddressTxes map[common.Address]types.Transactions + +func (minter *minter) updateSpeculativeChainPerNewHead(newHeadBlock *types.Block) { + minter.mu.Lock() + defer minter.mu.Unlock() + + minter.speculativeChain.accept(newHeadBlock) +} + +func (minter *minter) updateSpeculativeChainPerInvalidOrdering(headBlock *types.Block, invalidBlock *types.Block) { + invalidHash := invalidBlock.Hash() + + glog.V(logger.Warn).Infof("Handling InvalidRaftOrdering for invalid block %x; current head is %x\n", invalidHash, headBlock.Hash()) + + minter.mu.Lock() + defer minter.mu.Unlock() + + // 1. if the block is not in our db, exit. someone else mined this. + if !minter.chain.HasBlock(invalidHash) { + glog.V(logger.Warn).Infof("Someone else mined invalid block %x; ignoring\n", invalidHash) + + return + } + + minter.speculativeChain.unwindFrom(invalidHash, headBlock) +} + +func (minter *minter) eventLoop(events event.Subscription) { + for event := range events.Chan() { + switch ev := event.Data.(type) { + case core.ChainHeadEvent: + newHeadBlock := ev.Block + + if atomic.LoadInt32(&minter.minting) == 1 { + minter.updateSpeculativeChainPerNewHead(newHeadBlock) + + // + // TODO(bts): not sure if this is the place, but we're going to + // want to put an upper limit on our speculative mining chain + // length. 
+ // + + minter.requestMinting() + } else { + minter.mu.Lock() + minter.speculativeChain.setHead(newHeadBlock) + minter.mu.Unlock() + } + + case core.TxPreEvent: + if atomic.LoadInt32(&minter.minting) == 1 { + minter.requestMinting() + } + + case InvalidRaftOrdering: + headBlock := ev.headBlock + invalidBlock := ev.invalidBlock + + minter.updateSpeculativeChainPerInvalidOrdering(headBlock, invalidBlock) + } + } +} + +// Returns a wrapper around no-arg func `f` which can be called without limit +// and returns immediately: this will call the underlying func `f` at most once +// every `rate`. If this function is called more than once before the underlying +// `f` is invoked (per this rate limiting), `f` will only be called *once*. +// +// TODO(joel): this has a small bug in that you can't call it *immediately* when +// first allocated. +func throttle(rate time.Duration, f func()) func() { + request := channels.NewRingChannel(1) + + // every tick, block waiting for another request. then serve it immediately + go func() { + ticker := time.NewTicker(rate) + defer ticker.Stop() + + for range ticker.C { + <-request.Out() + go f() + } + }() + + return func() { + request.In() <- struct{}{} + } +} + +// This function spins continuously, blocking until a block should be created +// (via requestMinting()). This is throttled by `minter.blockTime`: +// +// 1. A block is guaranteed to be minted within `blockTime` of being +// requested. +// 2. We never mint a block more frequently than `blockTime`. +func (minter *minter) mintingLoop() { + throttledMintNewBlock := throttle(minter.blockTime, func() { + if atomic.LoadInt32(&minter.minting) == 1 { + minter.mintNewBlock() + } + }) + + for range minter.shouldMine.Out() { + throttledMintNewBlock() + } +} + +func generateNanoTimestamp(parent *types.Block) (tstamp int64) { + parentTime := parent.Time().Int64() + tstamp = time.Now().UnixNano() + + if parentTime >= tstamp { + // Each successive block needs to be after its predecessor. 
+ tstamp = parentTime + 1 + } + + return +} + +// Assumes mu is held. +func (minter *minter) createWork() *work { + parent := minter.speculativeChain.head + parentNumber := parent.Number() + tstamp := generateNanoTimestamp(parent) + + header := &types.Header{ + ParentHash: parent.Hash(), + Number: parentNumber.Add(parentNumber, common.Big1), + Difficulty: core.CalcDifficulty(minter.config, uint64(tstamp), parent.Time().Uint64(), parent.Number(), parent.Difficulty()), + GasLimit: core.CalcGasLimit(parent), + GasUsed: new(big.Int), + Coinbase: minter.coinbase, + Time: big.NewInt(tstamp), + } + + publicState, privateState, err := minter.chain.StateAt(parent.Root()) + if err != nil { + panic(fmt.Sprint("failed to get parent state: ", err)) + } + + return &work{ + config: minter.config, + publicState: publicState, + privateState: privateState, + header: header, + } +} + +func (minter *minter) getTransactions() *types.TransactionsByPriceAndNonce { + allAddrTxes := minter.eth.TxPool().Pending() + addrTxes := minter.speculativeChain.withoutProposedTxes(allAddrTxes) + return types.NewTransactionsByPriceAndNonce(addrTxes) +} + +// Sends-off events asynchronously. +func (minter *minter) firePendingBlockEvents(logs vm.Logs) { + // Copy logs before we mutate them, adding a block hash. + copiedLogs := make(vm.Logs, len(logs)) + for i, l := range logs { + copiedLogs[i] = new(vm.Log) + *copiedLogs[i] = *l + } + + go func() { + minter.mux.Post(core.PendingLogsEvent{Logs: copiedLogs}) + minter.mux.Post(core.PendingStateEvent{}) + }() +} + +func (minter *minter) fireMintedBlockEvents(block *types.Block, logs vm.Logs) { + minter.mux.Post(core.NewMinedBlockEvent{Block: block}) + minter.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + + // NOTE: we're currently not doing this because the block is not in the + // chain yet, and it seems like that's a prerequisite for this? 
+ // + // TODO: do we need to do this in handleLogCommands in the case where we + // minted the block? + // + // minter.mux.Post(work.publicState.Logs()) +} + +func (minter *minter) mintNewBlock() { + minter.mu.Lock() + defer minter.mu.Unlock() + + work := minter.createWork() + transactions := minter.getTransactions() + + committedTxes, publicReceipts, privateReceipts, logs := work.commitTransactions(transactions, minter.chain) + txCount := len(committedTxes) + + if txCount == 0 { + glog.V(logger.Info).Infoln("Not minting a new block since there are no pending transactions") + return + } + + minter.firePendingBlockEvents(logs) + + header := work.header + + // commit state root after all state transitions. + core.AccumulateRewards(work.publicState, header, nil) + header.Root = work.publicState.IntermediateRoot() + + // NOTE: < QuorumChain creates a signature here and puts it in header.Extra. > + + allReceipts := append(publicReceipts, privateReceipts...) + header.Bloom = types.CreateBloom(allReceipts) + + // update block hash since it is now available, but was not when the + // receipt/log of individual transactions were created: + headerHash := header.Hash() + for _, l := range logs { + l.BlockHash = headerHash + } + + block := types.NewBlock(header, committedTxes, nil, publicReceipts) + + glog.V(logger.Info).Infof("Generated next block #%v with [%d txns]", block.Number(), txCount) + + if _, err := work.publicState.Commit(); err != nil { + panic(fmt.Sprint("error committing public state: ", err)) + } + privateStateRoot, privStateErr := work.privateState.Commit() + if privStateErr != nil { + panic(fmt.Sprint("error committing private state: ", privStateErr)) + } + + if err := core.WritePrivateStateRoot(minter.chainDb, block.Root(), privateStateRoot); err != nil { + panic(fmt.Sprint("error writing private state root: ", err)) + } + if err := minter.chain.WriteDetachedBlock(block); err != nil { + panic(fmt.Sprint("error writing block to chain: ", err)) + } + if err := 
core.WriteTransactions(minter.chainDb, block); err != nil { + panic(fmt.Sprint("error writing txes: ", err)) + } + if err := core.WriteReceipts(minter.chainDb, allReceipts); err != nil { + panic(fmt.Sprint("error writing receipts: ", err)) + } + if err := core.WriteMipmapBloom(minter.chainDb, block.NumberU64(), allReceipts); err != nil { + panic(fmt.Sprint("error writing mipmap bloom: ", err)) + } + if err := core.WritePrivateBlockBloom(minter.chainDb, block.NumberU64(), privateReceipts); err != nil { + panic(fmt.Sprint("error writing private block bloom: ", err)) + } + if err := core.WriteBlockReceipts(minter.chainDb, block.Hash(), block.Number().Uint64(), allReceipts); err != nil { + panic(fmt.Sprint("error writing block receipts: ", err)) + } + + minter.speculativeChain.extend(block) + + minter.fireMintedBlockEvents(block, logs) + + elapsed := time.Since(time.Unix(0, header.Time.Int64())) + glog.V(logger.Info).Infof("🔨 Mined block (#%v / %x) in %v", block.Number(), block.Hash().Bytes()[:4], elapsed) +} + +func (env *work) commitTransactions(txes *types.TransactionsByPriceAndNonce, bc *core.BlockChain) (types.Transactions, types.Receipts, types.Receipts, vm.Logs) { + var logs vm.Logs + var committedTxes types.Transactions + var publicReceipts types.Receipts + var privateReceipts types.Receipts + + gp := new(core.GasPool).AddGas(env.header.GasLimit) + txCount := 0 + + for { + tx := txes.Peek() + if tx == nil { + break + } + + env.publicState.StartRecord(tx.Hash(), common.Hash{}, 0) + + publicReceipt, privateReceipt, err := env.commitTransaction(tx, bc, gp) + switch { + case err != nil: + if glog.V(logger.Detail) { + glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + } + txes.Pop() // skip rest of txes from this account + default: + txCount++ + committedTxes = append(committedTxes, tx) + + logs = append(logs, publicReceipt.Logs...) 
+ publicReceipts = append(publicReceipts, publicReceipt) + + if privateReceipt != nil { + logs = append(logs, privateReceipt.Logs...) + privateReceipts = append(privateReceipts, privateReceipt) + } + + txes.Shift() + } + } + + return committedTxes, publicReceipts, privateReceipts, logs +} + +func (env *work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, gp *core.GasPool) (*types.Receipt, *types.Receipt, error) { + publicSnapshot := env.publicState.Snapshot() + privateSnapshot := env.privateState.Snapshot() + + // + // TODO(bts): look into that core.ApplyTransaction is not currently + // returning any logs? + // + publicReceipt, privateReceipt, _, err := core.ApplyTransaction(env.config, bc, gp, env.publicState, env.privateState, env.header, tx, env.header.GasUsed, env.config.VmConfig) + if err != nil { + env.publicState.RevertToSnapshot(publicSnapshot) + env.privateState.RevertToSnapshot(privateSnapshot) + + return nil, nil, err + } + + return publicReceipt, privateReceipt, nil +} diff --git a/raft/persistence.go b/raft/persistence.go new file mode 100644 index 0000000000..bcbad77591 --- /dev/null +++ b/raft/persistence.go @@ -0,0 +1,75 @@ +package raft + +import ( + "encoding/binary" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +var ( + noFsync = &opt.WriteOptions{ + NoWriteMerge: false, + Sync: false, + } + + mustFsync = &opt.WriteOptions{ + NoWriteMerge: false, + Sync: true, + } +) + +func openQuorumRaftDb(path string) (db *leveldb.DB, err error) { + // Open the db and recover any potential corruptions + db, err = leveldb.OpenFile(path, &opt.Options{ + OpenFilesCacheCapacity: -1, // -1 means 0?? 
+ BlockCacheCapacity: -1, + }) + if _, corrupted := err.(*errors.ErrCorrupted); corrupted { + db, err = leveldb.RecoverFile(path, nil) + } + return +} + +func (pm *ProtocolManager) loadAppliedIndex() uint64 { + dat, err := pm.quorumRaftDb.Get(appliedDbKey, nil) + var lastAppliedIndex uint64 + if err == errors.ErrNotFound { + lastAppliedIndex = 0 + } else if err != nil { + glog.Fatalln(err) + } else { + lastAppliedIndex = binary.LittleEndian.Uint64(dat) + } + + glog.V(logger.Info).Infof("Persistent applied index load: %d", lastAppliedIndex) + pm.appliedIndex = lastAppliedIndex + return lastAppliedIndex +} + +func (pm *ProtocolManager) writeAppliedIndex(index uint64) { + glog.V(logger.Info).Infof("Persistent applied index write: %d", index) + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, index) + pm.quorumRaftDb.Put(appliedDbKey, buf, noFsync) +} + +func (pm *ProtocolManager) loadPeerUrl(nodeId uint64) string { + peerUrlKey := []byte(peerUrlKeyPrefix + string(nodeId)) + value, err := pm.quorumRaftDb.Get(peerUrlKey, nil) + if err != nil { + glog.Fatalf("failed to read peer url for peer %d from leveldb: %v", nodeId, err) + } + return string(value) +} + +func (pm *ProtocolManager) writePeerUrl(nodeId uint64, url string) { + key := []byte(peerUrlKeyPrefix + string(nodeId)) + value := []byte(url) + + pm.quorumRaftDb.Put(key, value, mustFsync) +} diff --git a/raft/snapshot.go b/raft/snapshot.go new file mode 100644 index 0000000000..8e45702681 --- /dev/null +++ b/raft/snapshot.go @@ -0,0 +1,82 @@ +package raft + +import ( + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/coreos/etcd/snap" +) + +func (pm *ProtocolManager) saveSnapshot(snap raftpb.Snapshot) error { + if err := pm.snapshotter.SaveSnap(snap); err != nil { + return err + } + + walSnap := walpb.Snapshot { + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + + 
if err := pm.wal.SaveSnapshot(walSnap); err != nil { + return err + } + + return pm.wal.ReleaseLockTo(snap.Metadata.Index) +} + +func (pm *ProtocolManager) maybeTriggerSnapshot() { + if pm.appliedIndex - pm.snapshotIndex < snapshotPeriod { + return + } + + pm.triggerSnapshot() +} + +func (pm *ProtocolManager) triggerSnapshot() { + glog.V(logger.Info).Infof("start snapshot [applied index: %d | last snapshot index: %d]", pm.appliedIndex, pm.snapshotIndex) + snapData := pm.blockchain.CurrentBlock().Hash().Bytes() + snap, err := pm.raftStorage.CreateSnapshot(pm.appliedIndex, &pm.confState, snapData) + if err != nil { + panic(err) + } + if err := pm.saveSnapshot(snap); err != nil { + panic(err) + } + // Discard all log entries prior to appliedIndex. + if err := pm.raftStorage.Compact(pm.appliedIndex); err != nil { + panic(err) + } + glog.V(logger.Info).Infof("compacted log at index %d", pm.appliedIndex) + pm.snapshotIndex = pm.appliedIndex +} + +// For persisting cluster membership changes correctly, we need to trigger a +// snapshot before advancing our persisted appliedIndex in LevelDB. +// +// See handling of EntryConfChange entries in raft/handler.go for details. 
+func (pm *ProtocolManager) triggerSnapshotWithNextIndex(index uint64) { + pm.appliedIndex = index + pm.triggerSnapshot() +} + +func (pm *ProtocolManager) loadSnapshot() *raftpb.Snapshot { + snapshot, err := pm.snapshotter.Load() + if err != nil && err != snap.ErrNoSnapshot { + glog.Fatalf("error loading snapshot: %v", err) + } + + return snapshot +} + +func (pm *ProtocolManager) applySnapshot(snap raftpb.Snapshot) { + if err := pm.raftStorage.ApplySnapshot(snap); err != nil { + glog.Fatalln("failed to apply snapshot: ", err) + } + + snapMeta := snap.Metadata + + pm.confState = snapMeta.ConfState + pm.snapshotIndex = snapMeta.Index + pm.advanceAppliedIndex(snapMeta.Index) +} \ No newline at end of file diff --git a/raft/speculative_chain.go b/raft/speculative_chain.go new file mode 100644 index 0000000000..0aa9351ca6 --- /dev/null +++ b/raft/speculative_chain.go @@ -0,0 +1,175 @@ +package raft + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + + "gopkg.in/fatih/set.v0" + lane "gopkg.in/oleiade/lane.v1" +) + +// The speculative chain represents blocks that we have minted which haven't been accepted into the chain yet, building +// on each other in a chain. It has three basic operations: +// * add new block to end +// * accept / remove oldest block +// * unwind / remove invalid blocks to the end +// +// Additionally: +// * clear state when we stop minting +// * set the parent when we're not minting (so it's always current) +type speculativeChain struct { + head *types.Block + unappliedBlocks *lane.Deque + expectedInvalidBlockHashes *set.Set // This is thread-safe. This set is referred to as our "guard" below. + proposedTxes *set.Set // This is thread-safe. 
+} + +func newSpeculativeChain() *speculativeChain { + return &speculativeChain { + head: nil, + unappliedBlocks: lane.NewDeque(), + expectedInvalidBlockHashes: set.New(), + proposedTxes: set.New(), + } +} + +func (chain *speculativeChain) clear(block *types.Block) { + chain.head = block + chain.unappliedBlocks = lane.NewDeque() + chain.expectedInvalidBlockHashes.Clear() + chain.proposedTxes.Clear() +} + +// Append a new speculative block +func (chain *speculativeChain) extend(block *types.Block) { + chain.head = block + chain.recordProposedTransactions(block.Transactions()) + chain.unappliedBlocks.Append(block) +} + +// Set the parent of the speculative chain +// +// Note: This is only called when not minter +func (chain *speculativeChain) setHead(block *types.Block) { + chain.head = block +} + +// Accept this block, removing it from the head of the speculative chain +func (chain *speculativeChain) accept(acceptedBlock *types.Block) { + earliestProposedI := chain.unappliedBlocks.Shift() + var earliestProposed *types.Block + if nil != earliestProposedI { + earliestProposed = earliestProposedI.(*types.Block) + } + + if expectedBlock := earliestProposed == nil || earliestProposed.Hash() == acceptedBlock.Hash(); expectedBlock { + // Remove the txes in this accepted block from our blacklist. + chain.removeProposedTxes(acceptedBlock) + } else { + glog.V(logger.Warn).Infof("Another node minted %x; Clearing speculative state\n", acceptedBlock.Hash()) + + chain.clear(acceptedBlock) + } +} + +// Remove all blocks in the chain from the specified one until the end +func (chain *speculativeChain) unwindFrom(invalidHash common.Hash, headBlock *types.Block) { + + // check our "guard" to see if this is a (descendant) block we're + // expected to be ruled invalid. 
if we find it, remove from the guard + if chain.expectedInvalidBlockHashes.Has(invalidHash) { + glog.V(logger.Warn).Infof("Removing expected-invalid block %x from guard.\n", invalidHash) + + chain.expectedInvalidBlockHashes.Remove(invalidHash) + + return + } + + // pop from the RHS repeatedly, updating minter.parent each time. if not + // our block, add to guard. in all cases, call removeProposedTxes + for { + currBlockI := chain.unappliedBlocks.Pop() + + if nil == currBlockI { + glog.V(logger.Warn).Infof("(Popped all blocks from queue.)\n") + + break + } + + currBlock := currBlockI.(*types.Block) + + glog.V(logger.Info).Infof("Popped block %x from queue RHS.\n", currBlock.Hash()) + + // Maintain invariant: the parent always points the last speculative block or the head of the blockchain + // if there are not speculative blocks. + if speculativeParentI := chain.unappliedBlocks.Last(); nil != speculativeParentI { + chain.head = speculativeParentI.(*types.Block) + } else { + chain.head = headBlock + } + + chain.removeProposedTxes(currBlock) + + if currBlock.Hash() != invalidHash { + glog.V(logger.Warn).Infof("Haven't yet found block %x; adding descendent %x to guard.\n", invalidHash, currBlock.Hash()) + + chain.expectedInvalidBlockHashes.Add(currBlock.Hash()) + } else { + break + } + } +} + +// We keep track of txes we've put in all newly-mined blocks since the last +// ChainHeadEvent, and filter them out so that we don't try to create blocks +// with the same transactions. This is necessary because the TX pool will keep +// supplying us these transactions until they are in the chain (after having +// flown through raft). +func (chain *speculativeChain) recordProposedTransactions(txes types.Transactions) { + txHashIs := make([]interface{}, len(txes)) + for i, tx := range txes { + txHashIs[i] = tx.Hash() + } + chain.proposedTxes.Add(txHashIs...) +} + +// Removes txes in block from our "blacklist" of "proposed tx" hashes. 
When we +// create a new block and use txes from the tx pool, we ignore those that we +// have already used ("proposed"), but that haven't yet officially made it into +// the chain yet. +// +// It's important to remove hashes from this blacklist (once we know we don't +// need them in there anymore) so that it doesn't grow endlessly. +func (chain *speculativeChain) removeProposedTxes(block *types.Block) { + minedTxes := block.Transactions() + minedTxInterfaces := make([]interface{}, len(minedTxes)) + for i, tx := range minedTxes { + minedTxInterfaces[i] = tx.Hash() + } + + // NOTE: we are using a thread-safe Set here, so it's fine if we access this + // here and in mintNewBlock concurrently. using a finer-grained set-specific + // lock here is preferable, because mintNewBlock holds its locks for a + // nontrivial amount of time. + chain.proposedTxes.Remove(minedTxInterfaces...) +} + +func (chain *speculativeChain) withoutProposedTxes(addrTxes AddressTxes) AddressTxes { + newMap := make(AddressTxes) + + for addr, txes := range addrTxes { + filteredTxes := make(types.Transactions, 0) + for _, tx := range txes { + if !chain.proposedTxes.Has(tx.Hash()) { + filteredTxes = append(filteredTxes, tx) + } + } + if len(filteredTxes) > 0 { + newMap[addr] = filteredTxes + } + } + + return newMap +} \ No newline at end of file diff --git a/raft/wal.go b/raft/wal.go new file mode 100644 index 0000000000..16e3186960 --- /dev/null +++ b/raft/wal.go @@ -0,0 +1,60 @@ +package raft + +import ( + "os" + + "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/coreos/etcd/raft/raftpb" + "github.com/ethereum/go-ethereum/logger" +) + +func (pm *ProtocolManager) openWAL(maybeSnapshot *raftpb.Snapshot) *wal.WAL { + if !wal.Exist(pm.waldir) { + if err := os.Mkdir(pm.waldir, 0750); err != nil { + glog.Fatalf("cannot create waldir (%v)", err) + } + + wal, err := wal.Create(pm.waldir, nil) + if err != nil { + 
glog.Fatalf("failed to create waldir (%v)", err) + } + wal.Close() + } + + walsnap := walpb.Snapshot{} + if maybeSnapshot != nil { + walsnap.Index = maybeSnapshot.Metadata.Index + walsnap.Term = maybeSnapshot.Metadata.Term + } + + glog.V(logger.Info).Infof("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index) + + wal, err := wal.Open(pm.waldir, walsnap) + if err != nil { + glog.Fatalf("error loading WAL (%v)", err) + } + + return wal +} + +func (pm *ProtocolManager) replayWAL() *wal.WAL { + glog.V(logger.Info).Infoln("replaying WAL") + maybeSnapshot := pm.loadSnapshot() + wal := pm.openWAL(maybeSnapshot) + + _, hardState, entries, err := wal.ReadAll() + if err != nil { + glog.Fatalf("failed to read WAL (%v)", err) + } + + if maybeSnapshot != nil { + pm.applySnapshot(*maybeSnapshot) + } + + pm.raftStorage.SetHardState(hardState) + pm.raftStorage.Append(entries) + + return wal +} diff --git a/rpc/server.go b/rpc/server.go index 324be4f79b..465de72976 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -32,7 +32,7 @@ const ( notificationBufferSize = 10000 // max buffered notifications before codec is closed MetadataApi = "rpc" - DefaultIPCApis = "admin,debug,eth,net,personal,quorum,shh,txpool,web3" + DefaultIPCApis = "admin,debug,eth,net,personal,quorum,raft,shh,txpool,web3" DefaultHTTPApis = "eth,net,web3" ) diff --git a/vendor.conf b/vendor.conf index 58837ee383..6570bb63bf 100644 --- a/vendor.conf +++ b/vendor.conf @@ -4,6 +4,7 @@ github.com/ethereum/go-ethereum # import github.com/cespare/cp 165db2f github.com/davecgh/go-spew v1.0.0-3-g6d21280 +github.com/eapache/channels 47238d5 github.com/ethereum/ethash v23.1-249-g214d4c0 github.com/fatih/color v1.1.0-4-gbf82308 github.com/gizak/termui d29684e @@ -36,5 +37,24 @@ gopkg.in/natefinch/npipe.v2 c1b8fa8 gopkg.in/sourcemap.v1 v1.0.3 gopkg.in/urfave/cli.v1 v1.18.1 +# raft dependencies +# TODO: overlay our modifications over latest release, and use the versions for their deps listed in their glide config 
+github.com/coreos/etcd v3.1.0 # With aa899a37086c4b2a839bf06d927d72d64de manually added +github.com/coreos/pkg/capnslog 3ac0863 +github.com/coreos/go-systemd/journal 48702e0da86bd25e76cfef347e2adeb434a0d0a6 +github.com/eapache/queue 44cc805 +github.com/golang/protobuf/proto 4bd1920723d7b7c925de087aa32e2187708897f7 +github.com/prometheus/client_golang/prometheus c5b7fccd204277076155f10851dad72b76a49317 +github.com/beorn7/perks/quantile 4c0e845 +github.com/prometheus/client_model/go fa8ad6f +github.com/prometheus/common/expfmt 195bde7883f7c39ea62b0d92ab7359b5327065cb +github.com/matttproud/golang_protobuf_extensions/pbutil c12348c +github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg 195bde7883f7c39ea62b0d92ab7359b5327065cb +github.com/prometheus/common/model 195bde7883f7c39ea62b0d92ab7359b5327065cb +github.com/prometheus/procfs fcdb11ccb4389efb1b210b7ffb623ab71c5fdd60 +gopkg.in/oleiade/lane.v1 28f7c3f +github.com/coreos/go-semver 568e959cd89871e61434c1143528d9162da89ef2 +github.com/xiang90/probing 07dd2e8dfe18522e9c447ba95f2fe95262f63bb2 + # exclude -golang.org/x/net/context diff --git a/vendor/github.com/beorn7/perks/quantile/exampledata.txt b/vendor/github.com/beorn7/perks/quantile/exampledata.txt new file mode 100644 index 0000000000..1602287d7c --- /dev/null +++ b/vendor/github.com/beorn7/perks/quantile/exampledata.txt @@ -0,0 +1,2388 @@ +8 +5 +26 +12 +5 +235 +13 +6 +28 +30 +3 +3 +3 +3 +5 +2 +33 +7 +2 +4 +7 +12 +14 +5 +8 +3 +10 +4 +5 +3 +6 +6 +209 +20 +3 +10 +14 +3 +4 +6 +8 +5 +11 +7 +3 +2 +3 +3 +212 +5 +222 +4 +10 +10 +5 +6 +3 +8 +3 +10 +254 +220 +2 +3 +5 +24 +5 +4 +222 +7 +3 +3 +223 +8 +15 +12 +14 +14 +3 +2 +2 +3 +13 +3 +11 +4 +4 +6 +5 +7 +13 +5 +3 +5 +2 +5 +3 +5 +2 +7 +15 +17 +14 +3 +6 +6 +3 +17 +5 +4 +7 +6 +4 +4 +8 +6 +8 +3 +9 +3 +6 +3 +4 +5 +3 +3 +660 +4 +6 +10 +3 +6 +3 +2 +5 +13 +2 +4 +4 +10 +4 +8 +4 +3 +7 +9 +9 +3 +10 +37 +3 +13 +4 +12 +3 +6 +10 +8 +5 +21 +2 +3 +8 +3 +2 +3 +3 +4 +12 +2 +4 +8 +8 +4 +3 +2 +20 +1 +6 +32 +2 +11 +6 +18 +3 +8 +11 
+3 +212 +3 +4 +2 +6 +7 +12 +11 +3 +2 +16 +10 +6 +4 +6 +3 +2 +7 +3 +2 +2 +2 +2 +5 +6 +4 +3 +10 +3 +4 +6 +5 +3 +4 +4 +5 +6 +4 +3 +4 +4 +5 +7 +5 +5 +3 +2 +7 +2 +4 +12 +4 +5 +6 +2 +4 +4 +8 +4 +15 +13 +7 +16 +5 +3 +23 +5 +5 +7 +3 +2 +9 +8 +7 +5 +8 +11 +4 +10 +76 +4 +47 +4 +3 +2 +7 +4 +2 +3 +37 +10 +4 +2 +20 +5 +4 +4 +10 +10 +4 +3 +7 +23 +240 +7 +13 +5 +5 +3 +3 +2 +5 +4 +2 +8 +7 +19 +2 +23 +8 +7 +2 +5 +3 +8 +3 +8 +13 +5 +5 +5 +2 +3 +23 +4 +9 +8 +4 +3 +3 +5 +220 +2 +3 +4 +6 +14 +3 +53 +6 +2 +5 +18 +6 +3 +219 +6 +5 +2 +5 +3 +6 +5 +15 +4 +3 +17 +3 +2 +4 +7 +2 +3 +3 +4 +4 +3 +2 +664 +6 +3 +23 +5 +5 +16 +5 +8 +2 +4 +2 +24 +12 +3 +2 +3 +5 +8 +3 +5 +4 +3 +14 +3 +5 +8 +2 +3 +7 +9 +4 +2 +3 +6 +8 +4 +3 +4 +6 +5 +3 +3 +6 +3 +19 +4 +4 +6 +3 +6 +3 +5 +22 +5 +4 +4 +3 +8 +11 +4 +9 +7 +6 +13 +4 +4 +4 +6 +17 +9 +3 +3 +3 +4 +3 +221 +5 +11 +3 +4 +2 +12 +6 +3 +5 +7 +5 +7 +4 +9 +7 +14 +37 +19 +217 +16 +3 +5 +2 +2 +7 +19 +7 +6 +7 +4 +24 +5 +11 +4 +7 +7 +9 +13 +3 +4 +3 +6 +28 +4 +4 +5 +5 +2 +5 +6 +4 +4 +6 +10 +5 +4 +3 +2 +3 +3 +6 +5 +5 +4 +3 +2 +3 +7 +4 +6 +18 +16 +8 +16 +4 +5 +8 +6 +9 +13 +1545 +6 +215 +6 +5 +6 +3 +45 +31 +5 +2 +2 +4 +3 +3 +2 +5 +4 +3 +5 +7 +7 +4 +5 +8 +5 +4 +749 +2 +31 +9 +11 +2 +11 +5 +4 +4 +7 +9 +11 +4 +5 +4 +7 +3 +4 +6 +2 +15 +3 +4 +3 +4 +3 +5 +2 +13 +5 +5 +3 +3 +23 +4 +4 +5 +7 +4 +13 +2 +4 +3 +4 +2 +6 +2 +7 +3 +5 +5 +3 +29 +5 +4 +4 +3 +10 +2 +3 +79 +16 +6 +6 +7 +7 +3 +5 +5 +7 +4 +3 +7 +9 +5 +6 +5 +9 +6 +3 +6 +4 +17 +2 +10 +9 +3 +6 +2 +3 +21 +22 +5 +11 +4 +2 +17 +2 +224 +2 +14 +3 +4 +4 +2 +4 +4 +4 +4 +5 +3 +4 +4 +10 +2 +6 +3 +3 +5 +7 +2 +7 +5 +6 +3 +218 +2 +2 +5 +2 +6 +3 +5 +222 +14 +6 +33 +3 +2 +5 +3 +3 +3 +9 +5 +3 +3 +2 +7 +4 +3 +4 +3 +5 +6 +5 +26 +4 +13 +9 +7 +3 +221 +3 +3 +4 +4 +4 +4 +2 +18 +5 +3 +7 +9 +6 +8 +3 +10 +3 +11 +9 +5 +4 +17 +5 +5 +6 +6 +3 +2 +4 +12 +17 +6 +7 +218 +4 +2 +4 +10 +3 +5 +15 +3 +9 +4 +3 +3 +6 +29 +3 +3 +4 +5 +5 +3 +8 +5 +6 +6 +7 +5 +3 +5 +3 +29 +2 +31 +5 +15 +24 +16 +5 +207 +4 +3 +3 +2 +15 +4 +4 +13 +5 +5 +4 +6 +10 +2 +7 +8 +4 +6 +20 +5 +3 +4 +3 
+12 +12 +5 +17 +7 +3 +3 +3 +6 +10 +3 +5 +25 +80 +4 +9 +3 +2 +11 +3 +3 +2 +3 +8 +7 +5 +5 +19 +5 +3 +3 +12 +11 +2 +6 +5 +5 +5 +3 +3 +3 +4 +209 +14 +3 +2 +5 +19 +4 +4 +3 +4 +14 +5 +6 +4 +13 +9 +7 +4 +7 +10 +2 +9 +5 +7 +2 +8 +4 +6 +5 +5 +222 +8 +7 +12 +5 +216 +3 +4 +4 +6 +3 +14 +8 +7 +13 +4 +3 +3 +3 +3 +17 +5 +4 +3 +33 +6 +6 +33 +7 +5 +3 +8 +7 +5 +2 +9 +4 +2 +233 +24 +7 +4 +8 +10 +3 +4 +15 +2 +16 +3 +3 +13 +12 +7 +5 +4 +207 +4 +2 +4 +27 +15 +2 +5 +2 +25 +6 +5 +5 +6 +13 +6 +18 +6 +4 +12 +225 +10 +7 +5 +2 +2 +11 +4 +14 +21 +8 +10 +3 +5 +4 +232 +2 +5 +5 +3 +7 +17 +11 +6 +6 +23 +4 +6 +3 +5 +4 +2 +17 +3 +6 +5 +8 +3 +2 +2 +14 +9 +4 +4 +2 +5 +5 +3 +7 +6 +12 +6 +10 +3 +6 +2 +2 +19 +5 +4 +4 +9 +2 +4 +13 +3 +5 +6 +3 +6 +5 +4 +9 +6 +3 +5 +7 +3 +6 +6 +4 +3 +10 +6 +3 +221 +3 +5 +3 +6 +4 +8 +5 +3 +6 +4 +4 +2 +54 +5 +6 +11 +3 +3 +4 +4 +4 +3 +7 +3 +11 +11 +7 +10 +6 +13 +223 +213 +15 +231 +7 +3 +7 +228 +2 +3 +4 +4 +5 +6 +7 +4 +13 +3 +4 +5 +3 +6 +4 +6 +7 +2 +4 +3 +4 +3 +3 +6 +3 +7 +3 +5 +18 +5 +6 +8 +10 +3 +3 +3 +2 +4 +2 +4 +4 +5 +6 +6 +4 +10 +13 +3 +12 +5 +12 +16 +8 +4 +19 +11 +2 +4 +5 +6 +8 +5 +6 +4 +18 +10 +4 +2 +216 +6 +6 +6 +2 +4 +12 +8 +3 +11 +5 +6 +14 +5 +3 +13 +4 +5 +4 +5 +3 +28 +6 +3 +7 +219 +3 +9 +7 +3 +10 +6 +3 +4 +19 +5 +7 +11 +6 +15 +19 +4 +13 +11 +3 +7 +5 +10 +2 +8 +11 +2 +6 +4 +6 +24 +6 +3 +3 +3 +3 +6 +18 +4 +11 +4 +2 +5 +10 +8 +3 +9 +5 +3 +4 +5 +6 +2 +5 +7 +4 +4 +14 +6 +4 +4 +5 +5 +7 +2 +4 +3 +7 +3 +3 +6 +4 +5 +4 +4 +4 +3 +3 +3 +3 +8 +14 +2 +3 +5 +3 +2 +4 +5 +3 +7 +3 +3 +18 +3 +4 +4 +5 +7 +3 +3 +3 +13 +5 +4 +8 +211 +5 +5 +3 +5 +2 +5 +4 +2 +655 +6 +3 +5 +11 +2 +5 +3 +12 +9 +15 +11 +5 +12 +217 +2 +6 +17 +3 +3 +207 +5 +5 +4 +5 +9 +3 +2 +8 +5 +4 +3 +2 +5 +12 +4 +14 +5 +4 +2 +13 +5 +8 +4 +225 +4 +3 +4 +5 +4 +3 +3 +6 +23 +9 +2 +6 +7 +233 +4 +4 +6 +18 +3 +4 +6 +3 +4 +4 +2 +3 +7 +4 +13 +227 +4 +3 +5 +4 +2 +12 +9 +17 +3 +7 +14 +6 +4 +5 +21 +4 +8 +9 +2 +9 +25 +16 +3 +6 +4 +7 +8 +5 +2 +3 +5 +4 +3 +3 +5 +3 +3 +3 +2 +3 +19 +2 +4 +3 +4 +2 +3 +4 +4 +2 +4 +3 +3 +3 +2 +6 +3 +17 +5 +6 +4 
+3 +13 +5 +3 +3 +3 +4 +9 +4 +2 +14 +12 +4 +5 +24 +4 +3 +37 +12 +11 +21 +3 +4 +3 +13 +4 +2 +3 +15 +4 +11 +4 +4 +3 +8 +3 +4 +4 +12 +8 +5 +3 +3 +4 +2 +220 +3 +5 +223 +3 +3 +3 +10 +3 +15 +4 +241 +9 +7 +3 +6 +6 +23 +4 +13 +7 +3 +4 +7 +4 +9 +3 +3 +4 +10 +5 +5 +1 +5 +24 +2 +4 +5 +5 +6 +14 +3 +8 +2 +3 +5 +13 +13 +3 +5 +2 +3 +15 +3 +4 +2 +10 +4 +4 +4 +5 +5 +3 +5 +3 +4 +7 +4 +27 +3 +6 +4 +15 +3 +5 +6 +6 +5 +4 +8 +3 +9 +2 +6 +3 +4 +3 +7 +4 +18 +3 +11 +3 +3 +8 +9 +7 +24 +3 +219 +7 +10 +4 +5 +9 +12 +2 +5 +4 +4 +4 +3 +3 +19 +5 +8 +16 +8 +6 +22 +3 +23 +3 +242 +9 +4 +3 +3 +5 +7 +3 +3 +5 +8 +3 +7 +5 +14 +8 +10 +3 +4 +3 +7 +4 +6 +7 +4 +10 +4 +3 +11 +3 +7 +10 +3 +13 +6 +8 +12 +10 +5 +7 +9 +3 +4 +7 +7 +10 +8 +30 +9 +19 +4 +3 +19 +15 +4 +13 +3 +215 +223 +4 +7 +4 +8 +17 +16 +3 +7 +6 +5 +5 +4 +12 +3 +7 +4 +4 +13 +4 +5 +2 +5 +6 +5 +6 +6 +7 +10 +18 +23 +9 +3 +3 +6 +5 +2 +4 +2 +7 +3 +3 +2 +5 +5 +14 +10 +224 +6 +3 +4 +3 +7 +5 +9 +3 +6 +4 +2 +5 +11 +4 +3 +3 +2 +8 +4 +7 +4 +10 +7 +3 +3 +18 +18 +17 +3 +3 +3 +4 +5 +3 +3 +4 +12 +7 +3 +11 +13 +5 +4 +7 +13 +5 +4 +11 +3 +12 +3 +6 +4 +4 +21 +4 +6 +9 +5 +3 +10 +8 +4 +6 +4 +4 +6 +5 +4 +8 +6 +4 +6 +4 +4 +5 +9 +6 +3 +4 +2 +9 +3 +18 +2 +4 +3 +13 +3 +6 +6 +8 +7 +9 +3 +2 +16 +3 +4 +6 +3 +2 +33 +22 +14 +4 +9 +12 +4 +5 +6 +3 +23 +9 +4 +3 +5 +5 +3 +4 +5 +3 +5 +3 +10 +4 +5 +5 +8 +4 +4 +6 +8 +5 +4 +3 +4 +6 +3 +3 +3 +5 +9 +12 +6 +5 +9 +3 +5 +3 +2 +2 +2 +18 +3 +2 +21 +2 +5 +4 +6 +4 +5 +10 +3 +9 +3 +2 +10 +7 +3 +6 +6 +4 +4 +8 +12 +7 +3 +7 +3 +3 +9 +3 +4 +5 +4 +4 +5 +5 +10 +15 +4 +4 +14 +6 +227 +3 +14 +5 +216 +22 +5 +4 +2 +2 +6 +3 +4 +2 +9 +9 +4 +3 +28 +13 +11 +4 +5 +3 +3 +2 +3 +3 +5 +3 +4 +3 +5 +23 +26 +3 +4 +5 +6 +4 +6 +3 +5 +5 +3 +4 +3 +2 +2 +2 +7 +14 +3 +6 +7 +17 +2 +2 +15 +14 +16 +4 +6 +7 +13 +6 +4 +5 +6 +16 +3 +3 +28 +3 +6 +15 +3 +9 +2 +4 +6 +3 +3 +22 +4 +12 +6 +7 +2 +5 +4 +10 +3 +16 +6 +9 +2 +5 +12 +7 +5 +5 +5 +5 +2 +11 +9 +17 +4 +3 +11 +7 +3 +5 +15 +4 +3 +4 +211 +8 +7 +5 +4 +7 +6 +7 +6 +3 +6 +5 +6 +5 +3 +4 +4 +26 +4 +6 +10 +4 +4 +3 +2 +3 +3 +4 +5 +9 +3 +9 +4 
+4 +5 +5 +8 +2 +4 +2 +3 +8 +4 +11 +19 +5 +8 +6 +3 +5 +6 +12 +3 +2 +4 +16 +12 +3 +4 +4 +8 +6 +5 +6 +6 +219 +8 +222 +6 +16 +3 +13 +19 +5 +4 +3 +11 +6 +10 +4 +7 +7 +12 +5 +3 +3 +5 +6 +10 +3 +8 +2 +5 +4 +7 +2 +4 +4 +2 +12 +9 +6 +4 +2 +40 +2 +4 +10 +4 +223 +4 +2 +20 +6 +7 +24 +5 +4 +5 +2 +20 +16 +6 +5 +13 +2 +3 +3 +19 +3 +2 +4 +5 +6 +7 +11 +12 +5 +6 +7 +7 +3 +5 +3 +5 +3 +14 +3 +4 +4 +2 +11 +1 +7 +3 +9 +6 +11 +12 +5 +8 +6 +221 +4 +2 +12 +4 +3 +15 +4 +5 +226 +7 +218 +7 +5 +4 +5 +18 +4 +5 +9 +4 +4 +2 +9 +18 +18 +9 +5 +6 +6 +3 +3 +7 +3 +5 +4 +4 +4 +12 +3 +6 +31 +5 +4 +7 +3 +6 +5 +6 +5 +11 +2 +2 +11 +11 +6 +7 +5 +8 +7 +10 +5 +23 +7 +4 +3 +5 +34 +2 +5 +23 +7 +3 +6 +8 +4 +4 +4 +2 +5 +3 +8 +5 +4 +8 +25 +2 +3 +17 +8 +3 +4 +8 +7 +3 +15 +6 +5 +7 +21 +9 +5 +6 +6 +5 +3 +2 +3 +10 +3 +6 +3 +14 +7 +4 +4 +8 +7 +8 +2 +6 +12 +4 +213 +6 +5 +21 +8 +2 +5 +23 +3 +11 +2 +3 +6 +25 +2 +3 +6 +7 +6 +6 +4 +4 +6 +3 +17 +9 +7 +6 +4 +3 +10 +7 +2 +3 +3 +3 +11 +8 +3 +7 +6 +4 +14 +36 +3 +4 +3 +3 +22 +13 +21 +4 +2 +7 +4 +4 +17 +15 +3 +7 +11 +2 +4 +7 +6 +209 +6 +3 +2 +2 +24 +4 +9 +4 +3 +3 +3 +29 +2 +2 +4 +3 +3 +5 +4 +6 +3 +3 +2 +4 diff --git a/vendor/github.com/beorn7/perks/quantile/stream.go b/vendor/github.com/beorn7/perks/quantile/stream.go new file mode 100644 index 0000000000..f4cabd6695 --- /dev/null +++ b/vendor/github.com/beorn7/perks/quantile/stream.go @@ -0,0 +1,292 @@ +// Package quantile computes approximate quantiles over an unbounded data +// stream within low memory and CPU bounds. +// +// A small amount of accuracy is traded to achieve the above properties. +// +// Multiple streams can be merged before calling Query to generate a single set +// of results. This is meaningful when the streams represent the same type of +// data. See Merge and Samples. 
+// +// For more detailed information about the algorithm used, see: +// +// Effective Computation of Biased Quantiles over Data Streams +// +// http://www.cs.rutgers.edu/~muthu/bquant.pdf +package quantile + +import ( + "math" + "sort" +) + +// Sample holds an observed value and meta information for compression. JSON +// tags have been added for convenience. +type Sample struct { + Value float64 `json:",string"` + Width float64 `json:",string"` + Delta float64 `json:",string"` +} + +// Samples represents a slice of samples. It implements sort.Interface. +type Samples []Sample + +func (a Samples) Len() int { return len(a) } +func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value } +func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +type invariant func(s *stream, r float64) float64 + +// NewLowBiased returns an initialized Stream for low-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the lower ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within (1±Epsilon)*Quantile. +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. +func NewLowBiased(epsilon float64) *Stream { + ƒ := func(s *stream, r float64) float64 { + return 2 * epsilon * r + } + return newStream(ƒ) +} + +// NewHighBiased returns an initialized Stream for high-biased quantiles +// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but +// error guarantees can still be given even for the higher ranks of the data +// distribution. +// +// The provided epsilon is a relative error, i.e. the true quantile of a value +// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error +// properties. 
+func NewHighBiased(epsilon float64) *Stream { + ƒ := func(s *stream, r float64) float64 { + return 2 * epsilon * (s.n - r) + } + return newStream(ƒ) +} + +// NewTargeted returns an initialized Stream concerned with a particular set of +// quantile values that are supplied a priori. Knowing these a priori reduces +// space and computation time. The targets map maps the desired quantiles to +// their absolute errors, i.e. the true quantile of a value returned by a query +// is guaranteed to be within (Quantile±Epsilon). +// +// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. +func NewTargeted(targets map[float64]float64) *Stream { + ƒ := func(s *stream, r float64) float64 { + var m = math.MaxFloat64 + var f float64 + for quantile, epsilon := range targets { + if quantile*s.n <= r { + f = (2 * epsilon * r) / quantile + } else { + f = (2 * epsilon * (s.n - r)) / (1 - quantile) + } + if f < m { + m = f + } + } + return m + } + return newStream(ƒ) +} + +// Stream computes quantiles for a stream of float64s. It is not thread-safe by +// design. Take care when using across multiple goroutines. +type Stream struct { + *stream + b Samples + sorted bool +} + +func newStream(ƒ invariant) *Stream { + x := &stream{ƒ: ƒ} + return &Stream{x, make(Samples, 0, 500), true} +} + +// Insert inserts v into the stream. +func (s *Stream) Insert(v float64) { + s.insert(Sample{Value: v, Width: 1}) +} + +func (s *Stream) insert(sample Sample) { + s.b = append(s.b, sample) + s.sorted = false + if len(s.b) == cap(s.b) { + s.flush() + } +} + +// Query returns the computed qth percentiles value. If s was created with +// NewTargeted, and q is not in the set of quantiles provided a priori, Query +// will return an unspecified result. +func (s *Stream) Query(q float64) float64 { + if !s.flushed() { + // Fast path when there hasn't been enough data for a flush; + // this also yields better accuracy for small sets of data. 
+ l := len(s.b) + if l == 0 { + return 0 + } + i := int(math.Ceil(float64(l) * q)) + if i > 0 { + i -= 1 + } + s.maybeSort() + return s.b[i].Value + } + s.flush() + return s.stream.query(q) +} + +// Merge merges samples into the underlying streams samples. This is handy when +// merging multiple streams from separate threads, database shards, etc. +// +// ATTENTION: This method is broken and does not yield correct results. The +// underlying algorithm is not capable of merging streams correctly. +func (s *Stream) Merge(samples Samples) { + sort.Sort(samples) + s.stream.merge(samples) +} + +// Reset reinitializes and clears the list reusing the samples buffer memory. +func (s *Stream) Reset() { + s.stream.reset() + s.b = s.b[:0] +} + +// Samples returns stream samples held by s. +func (s *Stream) Samples() Samples { + if !s.flushed() { + return s.b + } + s.flush() + return s.stream.samples() +} + +// Count returns the total number of samples observed in the stream +// since initialization. +func (s *Stream) Count() int { + return len(s.b) + s.stream.count() +} + +func (s *Stream) flush() { + s.maybeSort() + s.stream.merge(s.b) + s.b = s.b[:0] +} + +func (s *Stream) maybeSort() { + if !s.sorted { + s.sorted = true + sort.Sort(s.b) + } +} + +func (s *Stream) flushed() bool { + return len(s.stream.l) > 0 +} + +type stream struct { + n float64 + l []Sample + ƒ invariant +} + +func (s *stream) reset() { + s.l = s.l[:0] + s.n = 0 +} + +func (s *stream) insert(v float64) { + s.merge(Samples{{v, 1, 0}}) +} + +func (s *stream) merge(samples Samples) { + // TODO(beorn7): This tries to merge not only individual samples, but + // whole summaries. The paper doesn't mention merging summaries at + // all. Unittests show that the merging is inaccurate. Find out how to + // do merges properly. + var r float64 + i := 0 + for _, sample := range samples { + for ; i < len(s.l); i++ { + c := s.l[i] + if c.Value > sample.Value { + // Insert at position i. 
+ s.l = append(s.l, Sample{}) + copy(s.l[i+1:], s.l[i:]) + s.l[i] = Sample{ + sample.Value, + sample.Width, + math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1), + // TODO(beorn7): How to calculate delta correctly? + } + i++ + goto inserted + } + r += c.Width + } + s.l = append(s.l, Sample{sample.Value, sample.Width, 0}) + i++ + inserted: + s.n += sample.Width + r += sample.Width + } + s.compress() +} + +func (s *stream) count() int { + return int(s.n) +} + +func (s *stream) query(q float64) float64 { + t := math.Ceil(q * s.n) + t += math.Ceil(s.ƒ(s, t) / 2) + p := s.l[0] + var r float64 + for _, c := range s.l[1:] { + r += p.Width + if r+c.Width+c.Delta > t { + return p.Value + } + p = c + } + return p.Value +} + +func (s *stream) compress() { + if len(s.l) < 2 { + return + } + x := s.l[len(s.l)-1] + xi := len(s.l) - 1 + r := s.n - 1 - x.Width + + for i := len(s.l) - 2; i >= 0; i-- { + c := s.l[i] + if c.Width+x.Width+x.Delta <= s.ƒ(s, r) { + x.Width += c.Width + s.l[xi] = x + // Remove element at i. 
+ copy(s.l[i:], s.l[i+1:]) + s.l = s.l[:len(s.l)-1] + xi -= 1 + } else { + x = c + xi = i + } + r -= c.Width + } +} + +func (s *stream) samples() Samples { + samples := make(Samples, len(s.l)) + copy(samples, s.l) + return samples +} diff --git a/vendor/github.com/coreos/etcd/.dockerignore b/vendor/github.com/coreos/etcd/.dockerignore new file mode 100644 index 0000000000..6b8710a711 --- /dev/null +++ b/vendor/github.com/coreos/etcd/.dockerignore @@ -0,0 +1 @@ +.git diff --git a/vendor/github.com/coreos/etcd/.gitignore b/vendor/github.com/coreos/etcd/.gitignore new file mode 100644 index 0000000000..604fd4d27d --- /dev/null +++ b/vendor/github.com/coreos/etcd/.gitignore @@ -0,0 +1,14 @@ +/coverage +/gopath +/gopath.proto +/go-bindata +/machine* +/bin +.vagrant +*.etcd +/etcd +*.swp +/hack/insta-discovery/.env +*.test +tools/functional-tester/docker/bin +hack/tls-setup/certs diff --git a/vendor/github.com/coreos/etcd/.godir b/vendor/github.com/coreos/etcd/.godir new file mode 100644 index 0000000000..00ff6aa802 --- /dev/null +++ b/vendor/github.com/coreos/etcd/.godir @@ -0,0 +1 @@ +github.com/coreos/etcd diff --git a/vendor/github.com/coreos/etcd/.header b/vendor/github.com/coreos/etcd/.header new file mode 100644 index 0000000000..0446af6d87 --- /dev/null +++ b/vendor/github.com/coreos/etcd/.header @@ -0,0 +1,13 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
diff --git a/vendor/github.com/coreos/etcd/.travis.yml b/vendor/github.com/coreos/etcd/.travis.yml new file mode 100644 index 0000000000..40a65e0e52 --- /dev/null +++ b/vendor/github.com/coreos/etcd/.travis.yml @@ -0,0 +1,61 @@ +dist: trusty +language: go +go_import_path: github.com/coreos/etcd +sudo: false + +go: + - 1.7.4 + - tip + +env: + matrix: + - TARGET=amd64 + - TARGET=arm64 + - TARGET=arm + - TARGET=386 + - TARGET=ppc64le + +matrix: + fast_finish: true + allow_failures: + - go: tip + exclude: + - go: tip + env: TARGET=arm + - go: tip + env: TARGET=arm64 + - go: tip + env: TARGET=386 + - go: tip + env: TARGET=ppc64le + +addons: + apt: + packages: + - libpcap-dev + - libaspell-dev + - libhunspell-dev + +before_install: + - go get -v github.com/chzchzchz/goword + - go get -v honnef.co/go/simple/cmd/gosimple + - go get -v honnef.co/go/unused/cmd/unused + +# disable godep restore override +install: + - pushd cmd/etcd && go get -t -v ./... && popd + +script: + - > + case "${TARGET}" in + amd64) + GOARCH=amd64 ./test + ;; + 386) + GOARCH=386 PASSES="build unit" ./test + ;; + *) + # test building out of gopath + GO_BUILD_FLAGS="-a -v" GOPATH="" GOARCH="${TARGET}" ./build + ;; + esac diff --git a/vendor/github.com/coreos/etcd/CONTRIBUTING.md b/vendor/github.com/coreos/etcd/CONTRIBUTING.md new file mode 100644 index 0000000000..736f3f2d69 --- /dev/null +++ b/vendor/github.com/coreos/etcd/CONTRIBUTING.md @@ -0,0 +1,70 @@ +# How to contribute + +etcd is Apache 2.0 licensed and accepts contributions via GitHub pull requests. This document outlines some of the conventions on commit message formatting, contact points for developers and other resources to make getting your contribution into etcd easier. 
+ +# Email and chat + +- Email: [etcd-dev](https://groups.google.com/forum/?hl=en#!forum/etcd-dev) +- IRC: #[coreos](irc://irc.freenode.org:6667/#coreos) IRC channel on freenode.org + +## Getting started + +- Fork the repository on GitHub +- Read the README.md for build instructions + +## Reporting bugs and creating issues + +Reporting bugs is one of the best ways to contribute. However, a good bug report +has some very specific qualities, so please read over our short document on +[reporting bugs](https://github.com/coreos/etcd/blob/master/Documentation/reporting_bugs.md) +before you submit your bug report. This document might contain links to known +issues, another good reason to take a look there, before reporting your bug. + +## Contribution flow + +This is a rough outline of what a contributor's workflow looks like: + +- Create a topic branch from where you want to base your work. This is usually master. +- Make commits of logical units. +- Make sure your commit messages are in the proper format (see below). +- Push your changes to a topic branch in your fork of the repository. +- Submit a pull request to coreos/etcd. +- Your PR must receive a LGTM from two maintainers found in the MAINTAINERS file. + +Thanks for your contributions! + +### Code style + +The coding style suggested by the Golang community is used in etcd. See the [style doc](https://github.com/golang/go/wiki/CodeReviewComments) for details. + +Please follow this style to make etcd easy to review, maintain and develop. + +### Format of the commit message + +We follow a rough convention for commit messages that is designed to answer two +questions: what changed and why. The subject line should feature the what and +the body of the commit should describe the why. + +``` +scripts: add the test-cluster command + +this uses tmux to setup a test cluster that you can easily kill and +start for debugging. + +Fixes #38 +``` + +The format can be described more formally as follows: + +``` +<subsystem>: <what changed> +<BLANK LINE> +<why this change was made> +<BLANK LINE> +<footer>