Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature block finalize #1324

Merged
merged 88 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
88 commits
Select commit Hold shift + click to select a range
efaac8b
add consensus finalizer interface
bysomeone Jul 4, 2023
d89fad9
update finalize interface
bysomeone Aug 22, 2023
cec2fa8
add snowman consensus engine
bysomeone Aug 30, 2023
9bbdfa0
merge upstream master
bysomeone Sep 22, 2023
ce53b1f
add avlanchego package
bysomeone Oct 11, 2023
decac75
update snowman engine
bysomeone Nov 8, 2023
fe776d2
add snow config construct
bysomeone Nov 14, 2023
38ab01d
add blank blockchain vm
bysomeone Nov 14, 2023
7fcc2a0
add snowman finalizer
bysomeone Nov 15, 2023
e2e00a6
add chain33 vm implementing snowman.ChainVM interface
bysomeone Nov 17, 2023
fda0955
add validator set wrapper
bysomeone Dec 5, 2023
29a1a26
add consensus finalizer initialize, message subcrible
bysomeone Dec 11, 2023
f2033cd
refactor snowman finalizer
bysomeone Dec 11, 2023
c3ba69a
add perfer and accept block event
bysomeone Dec 12, 2023
b8cf468
handle block finalize events in blockchain moudle
bysomeone Dec 25, 2023
e67688c
integrate snowman validator with libp2p peers
bysomeone Dec 25, 2023
3047e3f
update snowman config
bysomeone Jan 4, 2024
a7e6c55
decrease peer info refresh frequency
bysomeone Jan 4, 2024
4d1cb38
add snowman message sender and handler
bysomeone Jan 5, 2024
3c78f38
integrate snowman communication protocol in dht
bysomeone Jan 8, 2024
0bf08a4
add snowball parameters config
bysomeone Jan 8, 2024
5a2bb5f
adapt to grpc version update
bysomeone Jan 9, 2024
0eab4ca
update dependency btcd/btcec to v2
bysomeone Jan 10, 2024
9b2fbb4
merge upstream master
bysomeone Jan 10, 2024
7906244
mod:update vulnerable dep
bysomeone Jan 10, 2024
2988643
disable exec local tx log
bysomeone Jan 11, 2024
552b3af
use zero len []byte instead of nil []byte
bysomeone Jan 12, 2024
c93fcc1
set initial finalized block
bysomeone Jan 12, 2024
f0ef48c
rename finalize to snowman
bysomeone Jan 12, 2024
4386e3f
check if nil result when get block by hash
bysomeone Jan 12, 2024
73858c0
update log
bysomeone Jan 12, 2024
da65e6c
add console log for snowman engine
bysomeone Jan 12, 2024
61648a1
calc block hash after block execute
bysomeone Jan 15, 2024
93c6e2a
update snowman build block logic
bysomeone Jan 15, 2024
fe2dcf8
fix peer ID converting
bysomeone Jan 22, 2024
0216ff3
add finalized block info to peer
bysomeone Jan 22, 2024
f513cfa
fix block finalizer initial
bysomeone Jan 25, 2024
e3c8070
update net peer command with finalized info return
bysomeone Jan 25, 2024
618348b
update p2p peer info refresh protocol
bysomeone Jan 25, 2024
8958017
handle snowman message synchronously
bysomeone Jan 26, 2024
e876f52
update log time format with ms display
bysomeone Jan 26, 2024
591abaa
fix accept block disorder
bysomeone Jan 26, 2024
f746f8e
fix set finalize block info to db
bysomeone Jan 26, 2024
943fadb
update blockchain peer list
bysomeone Feb 1, 2024
9fad7ec
replace atomic with rwlock
bysomeone Feb 2, 2024
93fc21b
wait finalize start block when boot
bysomeone Feb 21, 2024
799dc46
update snowman start routine
bysomeone Feb 22, 2024
1414f04
add block finalize fork
bysomeone Feb 23, 2024
bb12161
handle add block event synchronously
bysomeone Mar 1, 2024
ca7e053
filter out peers with height less than finalized
bysomeone Mar 1, 2024
1aee9ac
fix handle get block from peer event
bysomeone Mar 12, 2024
29e4dff
update package name to snow
bysomeone Mar 13, 2024
3084c9b
add more log details
bysomeone Mar 13, 2024
7c4a353
fix config MinLtBlockSize unit conversion
bysomeone Mar 13, 2024
33267a2
update log
bysomeone Mar 15, 2024
85327f3
handle add block error return
bysomeone Apr 18, 2024
46b12ac
add snowman engine reset function
bysomeone Apr 19, 2024
7c751c9
* make sure connected block height higher than finalized height
bysomeone Apr 19, 2024
a2692e8
add finalize config for blockchain
bysomeone Apr 24, 2024
93e31f6
use libp2p peer id as snow engine node id
bysomeone Apr 25, 2024
25b1b0c
[[FEAT]] import avalanche snowman engine
bysomeone Apr 25, 2024
bf2d521
Merge remote-tracking branch 'upstream/master' into feat-block-finalize
bysomeone May 7, 2024
945997c
add block finalizer unit test
bysomeone May 8, 2024
feabc26
add unit test
bysomeone May 8, 2024
d66f10d
rpc:add get finalized block interface
bysomeone May 9, 2024
288306c
fix test
bysomeone May 10, 2024
068a214
fmt code
bysomeone May 10, 2024
c40ab88
fix test with json rpc 2.0
bysomeone May 10, 2024
6984d82
update avalanche package for removing cgo dependency
bysomeone May 11, 2024
6319ef3
add more test
bysomeone May 11, 2024
55d7dc4
test:fix datarace
bysomeone May 12, 2024
48ac907
handle accept finalized block inorder
bysomeone May 16, 2024
d67a164
add parent id for snow block
bysomeone May 16, 2024
1128d90
increase refresh peer info interval
bysomeone May 16, 2024
293e0a0
add finalizer reset method
bysomeone May 18, 2024
14c0279
add finalizer lazy start when node syncing state
bysomeone May 19, 2024
c79b8d9
add cache for block fetching from snowman validator
bysomeone May 19, 2024
cd672cb
test:fix datarace
bysomeone May 24, 2024
35f6c82
fix dht repeated closing of stream
bysomeone May 24, 2024
54980d7
mod:update avalanche dependency
bysomeone May 25, 2024
2c1f280
test:fix 64-bit unaligned in arch 386
bysomeone May 25, 2024
c9b4079
test:regenerate chain33 client mock file
bysomeone May 26, 2024
e9fa15c
snowman:reject block in side chain fetched from peers
bysomeone May 27, 2024
b7fb632
snowman:update doc
bysomeone May 28, 2024
8105570
mod:specify btcd package version 0.22.3
bysomeone May 28, 2024
55607da
fix test
bysomeone May 28, 2024
5c2eeea
mod:update avalanche package
bysomeone May 29, 2024
cf859e9
test:fix net address conflict, use random listen port(#1284)
bysomeone May 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,3 @@ jobs:
run: go test ./... -covermode=atomic
env:
GOARCH: 386

259 changes: 259 additions & 0 deletions blockchain/blockfinalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package blockchain

import (
"encoding/hex"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"sync"
"sync/atomic"
"time"
)

var (
snowChoiceKey = []byte("blockchain-snowchoice")
)

type finalizer struct {
chain *BlockChain
choice types.SnowChoice
lock sync.RWMutex
healthNotify chan struct{}
resetRunning atomic.Bool
}

func (f *finalizer) Init(chain *BlockChain) {

f.healthNotify = make(chan struct{}, 1)
f.chain = chain
raw, err := chain.blockStore.db.Get(snowChoiceKey)

if err == nil {
err = types.Decode(raw, &f.choice)
if err != nil {
chainlog.Error("newFinalizer", "decode err", err)
panic(err)
}
chainlog.Info("newFinalizer", "height", f.choice.Height, "hash", hex.EncodeToString(f.choice.Hash))
go f.healthCheck()
} else if chain.client.GetConfig().GetModuleConfig().Consensus.Finalizer != "" {
f.choice.Height = chain.cfg.BlockFinalizeEnableHeight
chainlog.Info("newFinalizer", "enableHeight", f.choice.Height, "gapHeight", chain.cfg.BlockFinalizeGapHeight)
go f.waitFinalizeStartBlock(f.choice.Height)
}
go f.lazyStart(chain.cfg.BlockFinalizeGapHeight, MaxRollBlockNum)
}

// 基于最大区块回滚深度, 快速收敛, 主要针对同步节点, 减少历史数据共识流程
func (f *finalizer) lazyStart(gapHeight, maxRollbackNum int64) {

ticker := time.NewTicker(time.Minute * 2)
minPeerCount := 10
defer ticker.Stop()
for {
select {

case <-f.chain.quit:
return
case <-ticker.C:
finalized, _ := f.getLastFinalized()
peerNum := f.chain.GetPeerCount()
height := f.chain.GetBlockHeight() - gapHeight
// 连接节点过少||未达到使能高度, 等待连接及区块同步
if finalized >= height || peerNum < minPeerCount {
chainlog.Debug("lazyStart wait", "peerNum", peerNum,
"finalized", finalized, "height", height)
continue
}

maxPeerHeight := f.chain.GetPeerMaxBlkHeight()
// 已经最终化高度在回滚范围内, 无需加速
if finalized+maxRollbackNum > maxPeerHeight {
chainlog.Debug("lazyStart return", "peerNum", peerNum, "finalized", finalized,
"maxHeight", maxPeerHeight)
return
}

peers := f.chain.getActivePeersByHeight(height + maxRollbackNum)
chainlog.Debug("lazyStart peer", "peerNum", peerNum, "peersLen", len(peers),
"finalized", finalized, "height", height)
// 超过半数节点高度超过该高度, 说明当前链回滚最低高度大于height, 可设为已最终化高度
if len(peers) < peerNum/2 {
continue
}

detail, err := f.chain.GetBlock(height)
if err != nil {
chainlog.Error("lazyStart err", "height", height, "get block err", err)
continue
}
_ = f.reset(height, detail.GetBlock().Hash(f.chain.client.GetConfig()))
}
}

}

func (f *finalizer) healthCheck() {

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
healthy := false
for {
select {

case <-f.chain.quit:
return
case <-f.healthNotify:
healthy = true
case <-ticker.C:
maxPeerHeight := f.chain.GetPeerMaxBlkHeight()
// 节点高度落后较多情况不处理, 等待同步
height := f.chain.GetBlockHeight()
if height < maxPeerHeight-128 || healthy {
chainlog.Debug("healthCheck not sync", "healthy", healthy, "height", height, "maxHeight", maxPeerHeight)
healthy = false
continue
}
finalized, hash := f.getLastFinalized()
chainlog.Debug("healthCheck timeout", "lastFinalize", finalized,
"hash", hex.EncodeToString(hash), "chainHeight", height)
if finalized >= height {
continue
}
// 重新设置高度, 哈希值
detail, err := f.chain.GetBlock(finalized)
if err != nil {
chainlog.Error("finalizer tiemout", "height", finalized, "get block err", err)
continue
}
_ = f.reset(finalized, detail.GetBlock().Hash(f.chain.client.GetConfig()))
}
}
}

const defaultFinalizeGapHeight = 128

func (f *finalizer) waitFinalizeStartBlock(beginHeight int64) {

waitHeight := f.chain.cfg.BlockFinalizeGapHeight
for f.chain.blockStore.Height() < beginHeight+waitHeight {
time.Sleep(time.Second * 5)
}

detail, err := f.chain.GetBlock(beginHeight)
if err != nil {
chainlog.Error("waitFinalizeStartBlock", "height", beginHeight, "waitHeight", waitHeight, "get block err", err)
panic(err)
}
go f.healthCheck()
_ = f.setFinalizedBlock(detail.GetBlock().Height, detail.GetBlock().Hash(f.chain.client.GetConfig()), false)

}

func (f *finalizer) snowmanPreferBlock(msg *queue.Message) {
//req := (msg.Data).(*types.ReqBytes)
return

}

func (f *finalizer) snowmanAcceptBlock(msg *queue.Message) {

req := (msg.Data).(*types.SnowChoice)
chainlog.Debug("snowmanAcceptBlock", "height", req.Height, "hash", hex.EncodeToString(req.Hash))

// 已经最终化区块不在当前最佳链中, 即可能在侧链上, 最终化记录不更新
if !f.chain.bestChain.HaveBlock(req.GetHash(), req.GetHeight()) {
chainHeight := f.chain.bestChain.Height()
chainlog.Debug("snowmanAcceptBlock not in bestChain", "height", req.Height,
"hash", hex.EncodeToString(req.GetHash()), "chainHeight", chainHeight)
if f.resetRunning.CompareAndSwap(false, true) {
go f.resetEngine(chainHeight, req, time.Second*10)
}
return
}

err := f.setFinalizedBlock(req.GetHeight(), req.GetHash(), true)
if err == nil {
f.healthNotify <- struct{}{}
}
}

const consensusTopic = "consensus"

func (f *finalizer) resetEngine(chainHeight int64, sc *types.SnowChoice, duration time.Duration) {

defer f.resetRunning.Store(false)
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {

select {

case <-f.chain.quit:
return

case <-ticker.C:

currHeight := f.chain.bestChain.Height()
if f.chain.bestChain.HaveBlock(sc.GetHash(), sc.GetHeight()) {
chainlog.Debug("resetEngine accept", "chainHeight", chainHeight,
"currHeight", currHeight, "sc.height", sc.GetHeight(), "sc.hash", hex.EncodeToString(sc.GetHash()))
return
}
// 最终化区块不在主链上且主链高度正常增长, 重置最终化引擎, 尝试对该高度重新共识
if currHeight > chainHeight && currHeight > sc.GetHeight()+12 {
chainlog.Debug("resetEngine reject", "chainHeight", chainHeight,
"currHeight", currHeight, "sc.height", sc.GetHeight(), "sc.hash", hex.EncodeToString(sc.GetHash()))
_ = f.chain.client.Send(queue.NewMessage(types.EventSnowmanResetEngine, consensusTopic, types.EventForFinalizer, nil), true)
return
}
}
}
}

func (f *finalizer) reset(height int64, hash []byte) error {

chainlog.Debug("finalizer reset", "height", height, "hash", hex.EncodeToString(hash))
err := f.setFinalizedBlock(height, hash, false)
if err != nil {
chainlog.Error("finalizer reset", "setFinalizedBlock err", err)
return err
}
err = f.chain.client.Send(queue.NewMessage(types.EventSnowmanResetEngine, consensusTopic, types.EventForFinalizer, nil), true)
if err != nil {
chainlog.Error("finalizer reset", "send msg err", err)
}
return err
}

func (f *finalizer) setFinalizedBlock(height int64, hash []byte, mustInorder bool) error {

chainlog.Debug("setFinalizedBlock", "height", height, "hash", hex.EncodeToString(hash))
f.lock.Lock()
defer f.lock.Unlock()
if mustInorder && height <= f.choice.Height {
chainlog.Debug("setFinalizedBlock disorder", "height", height, "currHeight", f.choice.Height)
return types.ErrInvalidParam
}
f.choice.Height = height
f.choice.Hash = hash
err := f.chain.blockStore.db.Set(snowChoiceKey, types.Encode(&f.choice))
if err != nil {
chainlog.Error("setFinalizedBlock", "height", height, "hash", hex.EncodeToString(hash), "err", err)
return err
}
return nil
}

func (f *finalizer) getLastFinalized() (int64, []byte) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.choice.Height, f.choice.Hash
}

func (f *finalizer) snowmanLastChoice(msg *queue.Message) {

height, hash := f.getLastFinalized()
//chainlog.Debug("snowmanLastChoice", "height", height, "hash", hex.EncodeToString(hash))
msg.Reply(f.chain.client.NewMessage(msg.Topic,
types.EventSnowmanLastChoice, &types.SnowChoice{Height: height, Hash: hash}))
}
97 changes: 97 additions & 0 deletions blockchain/blockfinalize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package blockchain

import (
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)

func newTestChain(t *testing.T) (*BlockChain, string) {

chain := InitEnv()
chain.client.GetConfig().GetModuleConfig().Consensus.Finalizer = "snowman"
dir, err := os.MkdirTemp("", "finalize")
require.Nil(t, err)
blockStoreDB := dbm.NewDB("blockchain", "leveldb", dir, 64)
chain.blockStore = NewBlockStore(chain, blockStoreDB, nil)
node := newPreGenBlockNode()
node.parent = nil
chain.bestChain = newChainView(node)
return chain, dir
}

func TestFinalizer(t *testing.T) {

chain, dir := newTestChain(t)
defer os.RemoveAll(dir)
defer chain.Close()
f := &finalizer{}
f.Init(chain)

hash := []byte("testhash")
choice := &types.SnowChoice{Height: 1, Hash: hash}
msg := queue.NewMessage(0, "test", 0, choice)

f.snowmanPreferBlock(msg)
f.snowmanAcceptBlock(msg)
height, hash1 := f.getLastFinalized()
require.Equal(t, 0, int(height))
require.Equal(t, 0, len(hash1))
node := &blockNode{
parent: chain.bestChain.Tip(),
height: 1,
hash: hash,
}
chain.bestChain.SetTip(node)
f.snowmanAcceptBlock(msg)
height, hash1 = f.getLastFinalized()
require.Equal(t, 1, int(height))
require.Equal(t, hash, hash1)

f.snowmanLastChoice(msg)
msg1, err := chain.client.Wait(msg)
require.Nil(t, err)
require.Equal(t, msg1.Data, choice)

}

func TestResetEngine(t *testing.T) {

chain, dir := newTestChain(t)
defer os.RemoveAll(dir)
defer chain.Close()
f := &finalizer{}
f.Init(chain)

hash := []byte("testhash")
choice := &types.SnowChoice{Height: 1, Hash: hash}
node := &blockNode{
parent: chain.bestChain.Tip(),
height: 14,
hash: hash,
}
chain.bestChain.SetTip(node)
chain.client.Sub(consensusTopic)
go f.resetEngine(1, choice, time.Second)
recvMsg := func() {
select {
case msg := <-chain.client.Recv():
require.Equal(t, "consensus", msg.Topic)
require.Equal(t, int64(types.EventSnowmanResetEngine), msg.ID)
require.Equal(t, int64(types.EventForFinalizer), msg.Ty)
case <-time.After(time.Second * 5):
t.Error("resetEngine timeout")
}
}
recvMsg()
f.reset(choice.Height, choice.Hash)
recvMsg()
height, hash1 := f.getLastFinalized()
require.Equal(t, choice.Height, height)
require.Equal(t, hash, hash1)

}
6 changes: 4 additions & 2 deletions blockchain/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package blockchain

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -865,7 +866,7 @@ func (bs *BlockStore) GetTdByBlockHash(hash []byte) (*big.Int, error) {
blocktd, err := bs.db.Get(calcHashToTdKey(hash))
if blocktd == nil || err != nil {
if err != dbm.ErrNotFoundInDb {
storeLog.Error("GetTdByBlockHash ", "error", err)
storeLog.Error("GetTdByBlockHash ", "hash", hex.EncodeToString(hash), "err", err)
}
return nil, types.ErrHashNotExist
}
Expand Down Expand Up @@ -944,7 +945,8 @@ func (bs *BlockStore) dbMaybeStoreBlock(blockdetail *types.BlockDetail, sync boo
} else {
parenttd, err := bs.GetTdByBlockHash(parentHash)
if err != nil {
chainlog.Error("dbMaybeStoreBlock GetTdByBlockHash", "height", height, "parentHash", common.ToHex(parentHash))
chainlog.Error("dbMaybeStoreBlock GetTdByBlockHash",
"height", height, "parentHash", common.ToHex(parentHash), "err", err)
return err
}
blocktd = new(big.Int).Add(difficulty, parenttd)
Expand Down
Loading
Loading