Skip to content

Commit

Permalink
Track uptime & distribute rewards based on it (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst authored and Asa Oines committed Nov 21, 2019
1 parent c3e4005 commit 3ecd94a
Show file tree
Hide file tree
Showing 18 changed files with 370 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ end-to-end-defaults: &end-to-end-defaults
docker:
- image: celohq/node10-gcloud
environment:
CELO_MONOREPO_BRANCH_TO_TEST: victor/gateway-fee
CELO_MONOREPO_BRANCH_TO_TEST: master

jobs:
unit-tests:
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ var (
utils.IstanbulRequestTimeoutFlag,
utils.IstanbulBlockPeriodFlag,
utils.IstanbulProposerPolicyFlag,
utils.IstanbulLookbackWindowFlag,
utils.PingIPFromPacketFlag,
utils.UseInMemoryDiscoverTableFlag,
utils.VersionCheckFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.IstanbulRequestTimeoutFlag,
utils.IstanbulBlockPeriodFlag,
utils.IstanbulProposerPolicyFlag,
utils.IstanbulLookbackWindowFlag,
},
},
{
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,11 @@ var (
Usage: "Default minimum difference between two consecutive block's timestamps in seconds",
Value: uint64(eth.DefaultConfig.Istanbul.ProposerPolicy),
}
IstanbulLookbackWindowFlag = cli.Uint64Flag{
Name: "istanbul.lookbackwindow",
Usage: "A validator's signature must be absent for this many consecutive blocks to be considered down for the uptime score",
Value: eth.DefaultConfig.Istanbul.LookbackWindow,
}

// Proxy node settings
ProxyFlag = cli.BoolFlag{
Expand Down Expand Up @@ -1203,6 +1208,9 @@ func setIstanbul(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
if ctx.GlobalIsSet(IstanbulBlockPeriodFlag.Name) {
cfg.Istanbul.BlockPeriod = ctx.GlobalUint64(IstanbulBlockPeriodFlag.Name)
}
if ctx.GlobalIsSet(IstanbulLookbackWindowFlag.Name) {
cfg.Istanbul.LookbackWindow = ctx.GlobalUint64(IstanbulLookbackWindowFlag.Name)
}
if ctx.GlobalIsSet(IstanbulProposerPolicyFlag.Name) {
cfg.Istanbul.ProposerPolicy = istanbul.ProposerPolicy(ctx.GlobalUint64(IstanbulProposerPolicyFlag.Name))
}
Expand Down
6 changes: 6 additions & 0 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ func (sb *Backend) EpochSize() uint64 {
return sb.config.Epoch
}

// Returns the size of the lookback window for calculating uptime (in blocks)
func (sb *Backend) LookbackWindow() uint64 {
return sb.config.LookbackWindow
}

// Finalize runs any post-transaction state modifications (e.g. block rewards)
// and assembles the final block.
//
Expand All @@ -484,6 +489,7 @@ func (sb *Backend) Finalize(chain consensus.ChainReader, header *types.Header, s
state.RevertToSnapshot(snapshot)
}

sb.logger.Trace("Finalizing", "block", header.Number.Uint64(), "epochSize", sb.config.Epoch)
if istanbul.IsLastBlockOfEpoch(header.Number.Uint64(), sb.config.Epoch) {
snapshot = state.Snapshot()
err = sb.distributeEpochPaymentsAndRewards(header, state)
Expand Down
5 changes: 4 additions & 1 deletion consensus/istanbul/backend/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ func getGenesisAndKeys(n int, isFullChain bool) (*core.Genesis, []*ecdsa.Private
genesis.Config.FullHeaderChainAvailable = false
}
// force enable Istanbul engine
genesis.Config.Istanbul = &params.IstanbulConfig{}
genesis.Config.Istanbul = &params.IstanbulConfig{
Epoch: 10,
LookbackWindow: 2,
}
genesis.Config.Ethash = nil
genesis.Difficulty = defaultDifficulty
genesis.Nonce = emptyNonce.Uint64()
Expand Down
39 changes: 33 additions & 6 deletions consensus/istanbul/backend/pos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package backend

import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/contract_comm"
"github.com/ethereum/go-ethereum/contract_comm/currency"
Expand All @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/contract_comm/gold_token"
"github.com/ethereum/go-ethereum/contract_comm/validators"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -78,11 +79,37 @@ func (sb *Backend) distributeEpochPaymentsAndRewards(header *types.Header, state
}

func (sb *Backend) updateValidatorScores(header *types.Header, state *state.StateDB, valSet []istanbul.Validator) error {
for _, val := range valSet {
// TODO: Use actual uptime metric.
// 1.0 in fixidity
uptime := math.BigPow(10, 24)
sb.logger.Info("Updating validator score for address", "address", val.Address(), "uptime", uptime.String())
epoch := istanbul.GetEpochNumber(header.Number.Uint64(), sb.EpochSize())
logger := sb.logger.New("func", "Backend.updateValidatorScores", "blocknum", header.Number.Uint64(), "epoch", epoch, "epochsize", sb.EpochSize(), "window", sb.LookbackWindow())
sb.logger.Trace("Updating validator scores")

// The denominator is the (last block - first block + 1) of the val score tally window
denominator := istanbul.GetValScoreTallyLastBlockNumber(epoch, sb.EpochSize()) - istanbul.GetValScoreTallyFirstBlockNumber(epoch, sb.EpochSize(), sb.LookbackWindow()) + 1

// get all the uptimes for this epoch
// note(@gakonst): `db` _might_ be possible to be replaced with `sb.db`,
// but I believe it's a different database handle
bc := sb.chain.(*core.BlockChain)
db := bc.GetDatabase()
uptimes := rawdb.ReadAccumulatedEpochUptime(db, epoch)
if uptimes == nil {
logger.Error("no accumulated uptimes found, will not update validator scores")
return errors.New("no accumulated uptimes found, will not update validator scores")
}

for i, val := range valSet {
scoreTally := uptimes[i].ScoreTally
logger = logger.New("scoreTally", scoreTally, "denominator", denominator, "index", i, "address", val.Address())
numerator := big.NewInt(0).Mul(big.NewInt(int64(uptimes[i].ScoreTally)), params.Fixidity1)
uptime := big.NewInt(0).Div(numerator, big.NewInt(int64(denominator)))

if scoreTally > denominator {
logger.Error("ScoreTally exceeds max possible")
// 1.0 in fixidity
uptime = params.Fixidity1
}

logger.Trace("Updating validator score", "uptime", uptime)
err := validators.UpdateValidatorScore(header, state, val.Address(), uptime)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
BlockPeriod uint64 `toml:",omitempty"` // Default minimum difference between two consecutive block's timestamps in second
ProposerPolicy ProposerPolicy `toml:",omitempty"` // The policy for proposer selection
Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes
LookbackWindow uint64 `toml:",omitempty"` // The window of blocks in which a validator is forgived from voting
ValidatorEnodeDBPath string `toml:",omitempty"` // The location for the validator enodes DB

// Proxy Configs
Expand All @@ -51,6 +52,7 @@ var DefaultConfig = &Config{
BlockPeriod: 1,
ProposerPolicy: ShuffledRoundRobin,
Epoch: 30000,
LookbackWindow: 12,
ValidatorEnodeDBPath: "validatorenodes",
Proxy: false,
Proxied: false,
Expand Down
9 changes: 9 additions & 0 deletions consensus/istanbul/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)

type Uptime struct {
ScoreTally uint64
LastSignedBlock uint64
}

func (u *Uptime) String() string {
return fmt.Sprintf("Uptime { scoreTally: %v, lastBlock: %v}", u.ScoreTally, u.LastSignedBlock)
}

// Proposal supports retrieving height and serialized block to be used during Istanbul consensus.
type Proposal interface {
// Number retrieves the sequence number of this proposal.
Expand Down
31 changes: 28 additions & 3 deletions consensus/istanbul/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,19 @@ func IsLastBlockOfEpoch(number uint64, epochSize uint64) bool {
return GetNumberWithinEpoch(number, epochSize) == epochSize
}

func IsFirstBlockOfEpoch(number uint64, epochSize uint64) bool {
return GetNumberWithinEpoch(number, epochSize) == 1
}

// Retrieves the epoch number given the block number.
// There is a special case if the number == 0 (the genesis block). That block will be in the
// 1st epoch.
func GetEpochNumber(number uint64, epochSize uint64) uint64 {
if number == 0 {
return 0
epochNumber := number / epochSize
if IsLastBlockOfEpoch(number, epochSize) {
return epochNumber
} else {
return (number / epochSize) + 1
return epochNumber + 1
}
}

Expand All @@ -108,6 +113,26 @@ func GetEpochLastBlockNumber(epochNumber uint64, epochSize uint64) uint64 {
return firstBlockNum + (epochSize - 1)
}

func GetValScoreTallyFirstBlockNumber(epochNumber uint64, epochSize uint64, lookbackWindowSize uint64) uint64 {
// We need to wait for the completion of the first window with the start window's block being the
// 2nd block of the epoch, before we start tallying the validator score for epoch "epochNumber".
// We can't include the epoch's first block since it's aggregated parent seals
// is for the previous epoch's valset.

epochFirstBlock, _ := GetEpochFirstBlockNumber(epochNumber, epochSize)
return epochFirstBlock + 1 + (lookbackWindowSize - 1)
}

func GetValScoreTallyLastBlockNumber(epochNumber uint64, epochSize uint64) uint64 {
// We stop tallying for epoch "epochNumber" at the second to last block of that epoch.
// We can't include that epoch's last block as part of the tally because the epoch val score is calculated
// using a tally that is updated AFTER a block is finalized.
// Note that it's possible to count up to the last block of the epoch, but it's much harder to implement
// than couting up to the second to last one.

return GetEpochLastBlockNumber(epochNumber, epochSize) - 1
}

func ValidatorSetDiff(oldValSet []ValidatorData, newValSet []ValidatorData) ([]ValidatorData, *big.Int) {
valSetMap := make(map[common.Address]bool)
oldValSetIndices := make(map[common.Address]int)
Expand Down
105 changes: 102 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/big"
mrand "math/rand"
"sync"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -58,6 +60,7 @@ var (
const (
bodyCacheLimit = 256
blockCacheLimit = 256
uptimeCacheLimit = 16
receiptsCacheLimit = 32
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
Expand Down Expand Up @@ -122,6 +125,7 @@ type BlockChain struct {
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing
uptimeCache *lru.Cache // Cache for the most recent uptimes

quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
Expand Down Expand Up @@ -153,6 +157,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyRLPCache, _ := lru.New(bodyCacheLimit)
receiptsCache, _ := lru.New(receiptsCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
uptimeCache, _ := lru.New(uptimeCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)

Expand All @@ -168,6 +173,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
uptimeCache: uptimeCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
Expand Down Expand Up @@ -215,6 +221,11 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

// GetDatabase returns the block chain's database
func (bc *BlockChain) GetDatabase() ethdb.Database {
return bc.db
}

// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState() error {
Expand Down Expand Up @@ -937,19 +948,107 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e
return nil
}

// https://stackoverflow.com/questions/19105791/is-there-a-big-bitcount/32702348#32702348
func bitCount(n *big.Int) int {
count := 0
for _, v := range n.Bits() {
count += popcount(uint64(v))
}
return count
}

// Straight and simple C to Go translation from https://en.wikipedia.org/wiki/Hamming_weight
func popcount(x uint64) int {
const (
m1 = 0x5555555555555555 //binary: 0101...
m2 = 0x3333333333333333 //binary: 00110011..
m4 = 0x0f0f0f0f0f0f0f0f //binary: 4 zeros, 4 ones ...
h01 = 0x0101010101010101 //the sum of 256 to the power of 0,1,2,3...
)
x -= (x >> 1) & m1 //put count of each 2 bits into those 2 bits
x = (x & m2) + ((x >> 2) & m2) //put count of each 4 bits into those 4 bits
x = (x + (x >> 4)) & m4 //put count of each 8 bits into those 8 bits
return int((x * h01) >> 56) //returns left 8 bits of x + (x<<8) + (x<<16) + (x<<24) + ...
}

func updateUptime(uptime []istanbul.Uptime, blockNumber uint64, bitmap *big.Int, window uint64, epochNum uint64, epochSize uint64) []istanbul.Uptime {
var validatorsSizeUpperBound uint64
if len(uptime) == 0 {
// The number of validators is upper bounded by 3/2 of the number of 1s in the bitmap
// We multiply by 2 just to be extra cautious of off-by-one errors.
validatorsSizeUpperBound = uint64(math.Ceil(float64(bitCount(bitmap)) * 2))
uptime = make([]istanbul.Uptime, validatorsSizeUpperBound)
}

valScoreTallyFirstBlockNum := istanbul.GetValScoreTallyFirstBlockNumber(epochNum, epochSize, window)
valScoreTallyLastBlockNum := istanbul.GetValScoreTallyLastBlockNumber(epochNum, epochSize)

// signedBlockWindowLastBlockNum is just the previous block
signedBlockWindowLastBlockNum := blockNumber - 1
signedBlockWindowFirstBlockNum := signedBlockWindowLastBlockNum - (window - 1)

for i := 0; i < len(uptime); i++ {
if bitmap.Bit(i) == 1 {
// update their latest signed block
uptime[i].LastSignedBlock = blockNumber - 1
}

// If we are within the validator uptime tally window, then update the validator's score if its last signed block is within
// the lookback window
if valScoreTallyFirstBlockNum <= blockNumber && blockNumber <= valScoreTallyLastBlockNum {
lastSignedBlock := uptime[i].LastSignedBlock

// Note that the second condition in the if condition is not necessary. But it does
// make the logic easier to understand. (e.g. it's checking is lastSignedBlock is within
// the range [signedBlockWindowFirstBlockNum, signedBlockWindowLastBlockNum])
if signedBlockWindowFirstBlockNum <= lastSignedBlock && lastSignedBlock <= signedBlockWindowLastBlockNum {
uptime[i].ScoreTally++
}
}
}
return uptime
}

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()

// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()

// We are going to update the uptime tally.
if bc.engine.Protocol().Name == "istanbul" {
// The epoch's first block's aggregated parent signatures is for the previous epoch's valset.
// We can ignore updating the tally for that block.
if !istanbul.IsFirstBlockOfEpoch(block.NumberU64(), bc.chainConfig.Istanbul.Epoch) {
epochNum := istanbul.GetEpochNumber(block.NumberU64(), bc.chainConfig.Istanbul.Epoch)

// Get the uptime scores
uptime := rawdb.ReadAccumulatedEpochUptime(bc.db, epochNum)

// Get the bitmap from the previous block
extra, err := types.ExtractIstanbulExtra(block.Header())
if err != nil {
log.Error("Unable to extrace istanbul extra", "func", "WriteBlockWithState", "blocknum", block.NumberU64(), "epoch", epochNum)
return NonStatTy, errors.New("could not extract block header extra")
}
signedValidatorsBitmap := extra.ParentAggregatedSeal.Bitmap

// Update the uptime scores
uptime = updateUptime(uptime, block.NumberU64(), signedValidatorsBitmap, bc.chainConfig.Istanbul.LookbackWindow, epochNum, bc.chainConfig.Istanbul.Epoch)

// Write the new uptime scores
rawdb.WriteAccumulatedEpochUptime(bc.db, epochNum, uptime)
}
}

// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()

currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
Expand Down
Loading

0 comments on commit 3ecd94a

Please sign in to comment.