Skip to content

Commit

Permalink
all: integrate the freezer with fast sync
Browse files Browse the repository at this point in the history
* all: freezer style syncing

core, eth, les, light: clean up freezer relative APIs

core, eth, les, trie, ethdb, light: clean a bit

core, eth, les, light: add unit tests

core, light: rewrite setHead function

core, eth: fix downloader unit tests

core: add receipt chain insertion test

core: use constant instead of hardcoding table name

core: fix rollback

core: fix setHead

core/rawdb: remove canonical block first and then iterate side chain

core/rawdb, ethdb: add hasAncient interface

eth/downloader: calculate ancient limit via cht first

core, eth, ethdb: lots of fixes

* eth/downloader: print ancient disable log only for fast sync
  • Loading branch information
rjl493456442 authored and karalabe committed Apr 25, 2019
1 parent 3b5f7e3 commit ebca031
Show file tree
Hide file tree
Showing 26 changed files with 1,071 additions and 314 deletions.
386 changes: 300 additions & 86 deletions core/blockchain.go

Large diffs are not rendered by default.

205 changes: 189 additions & 16 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package core

import (
"io/ioutil"
"math/big"
"math/rand"
"os"
"sync"
"testing"
"time"
Expand All @@ -32,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -638,34 +639,63 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
// Freezer style fast import the chain.
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()

if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()

if ftd, atd := fast.GetTdByHash(hash), archive.GetTdByHash(hash); ftd.Cmp(atd) != 0 {
t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", num, hash, ftd, atd)
t.Errorf("block #%d [%x]: td mismatch: fastdb %v, archivedb %v", num, hash, ftd, atd)
}
if antd, artd := ancient.GetTdByHash(hash), archive.GetTdByHash(hash); antd.Cmp(artd) != 0 {
t.Errorf("block #%d [%x]: td mismatch: ancientdb %v, archivedb %v", num, hash, antd, artd)
}
if fheader, aheader := fast.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); fheader.Hash() != aheader.Hash() {
t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader)
t.Errorf("block #%d [%x]: header mismatch: fastdb %v, archivedb %v", num, hash, fheader, aheader)
}
if anheader, arheader := ancient.GetHeaderByHash(hash), archive.GetHeaderByHash(hash); anheader.Hash() != arheader.Hash() {
t.Errorf("block #%d [%x]: header mismatch: ancientdb %v, archivedb %v", num, hash, anheader, arheader)
}
if fblock, ablock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash); fblock.Hash() != ablock.Hash() {
t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock)
} else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) {
t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions())
} else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) {
t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles())
if fblock, arblock, anblock := fast.GetBlockByHash(hash), archive.GetBlockByHash(hash), ancient.GetBlockByHash(hash); fblock.Hash() != arblock.Hash() || anblock.Hash() != arblock.Hash() {
t.Errorf("block #%d [%x]: block mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock, anblock, arblock)
} else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(arblock.Transactions()) || types.DeriveSha(anblock.Transactions()) != types.DeriveSha(arblock.Transactions()) {
t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions())
} else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) || types.CalcUncleHash(anblock.Uncles()) != types.CalcUncleHash(arblock.Uncles()) {
t.Errorf("block #%d [%x]: uncles mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Uncles(), anblock, arblock.Uncles())
}
if freceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), archive.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts)
if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}
}
// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash)
t.Errorf("block #%d: canonical hash mismatch: fastdb %v, archivedb %v", i, fhash, ahash)
}
if anhash, arhash := rawdb.ReadCanonicalHash(ancientDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); anhash != arhash {
t.Errorf("block #%d: canonical hash mismatch: ancientdb %v, archivedb %v", i, anhash, arhash)
}
}
}
Expand Down Expand Up @@ -729,13 +759,40 @@ func TestLightVsFastVsFullChainHeads(t *testing.T) {
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
assert(t, "fast", fast, height, height, 0)
fast.Rollback(remove)
assert(t, "fast", fast, height/2, height/2, 0)

// Import the chain as a ancient-first node and ensure all pointers are updated
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()

if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
assert(t, "ancient", ancient, height, height, 0)
ancient.Rollback(remove)
assert(t, "ancient", ancient, height/2, height/2, 0)
if frozen, err := ancientDb.Ancients(); err != nil || frozen != height/2+1 {
t.Fatalf("failed to truncate ancient store, want %v, have %v", height/2+1, frozen)
}

// Import the chain as a light node and ensure all pointers are updated
lightDb := rawdb.NewMemoryDatabase()
gspec.MustCommit(lightDb)
Expand Down Expand Up @@ -917,7 +974,7 @@ func TestLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
db = memorydb.New()
db = rawdb.NewMemoryDatabase()

// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
Expand Down Expand Up @@ -1039,7 +1096,7 @@ func TestSideLogRebirth(t *testing.T) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
db = memorydb.New()
db = rawdb.NewMemoryDatabase()

// this code generates a log
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
Expand Down Expand Up @@ -1563,6 +1620,122 @@ func TestLargeReorgTrieGC(t *testing.T) {
}
}

func TestBlockchainRecovery(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
genesis = gspec.MustCommit(gendb)
)
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)

// Import the chain as a ancient-first node and ensure all pointers are updated
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)

headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
ancient.Stop()

// Destroy head fast block manually
midBlock := blocks[len(blocks)/2]
rawdb.WriteHeadFastBlockHash(ancientDb, midBlock.Hash())

// Reopen broken blockchain again
ancient, _ = NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()
if num := ancient.CurrentBlock().NumberU64(); num != 0 {
t.Errorf("head block mismatch: have #%v, want #%v", num, 0)
}
if num := ancient.CurrentFastBlock().NumberU64(); num != midBlock.NumberU64() {
t.Errorf("head fast-block mismatch: have #%v, want #%v", num, midBlock.NumberU64())
}
if num := ancient.CurrentHeader().Number.Uint64(); num != midBlock.NumberU64() {
t.Errorf("head header mismatch: have #%v, want #%v", num, midBlock.NumberU64())
}
}

func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
genesis = gspec.MustCommit(gendb)
)
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, ethash.NewFaker(), gendb, int(height), nil)

// Import the chain as a ancient-first node and ensure all pointers are updated
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "")
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()

headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
// Abort ancient receipt chain insertion deliberately
ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
if number == blocks[len(blocks)/2].NumberU64() {
return true
}
return false
}
previousFastBlock := ancient.CurrentFastBlock()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
}
if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data")
}
ancient.terminateInsert = nil
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
}

// Benchmarks large blocks with value transfers to non-existing accounts
func benchmarkLargeNumberOfValueToNonexisting(b *testing.B, numTxs, numBlocks int, recipientFn func(uint64) common.Address, dataFn func(uint64) []byte) {
var (
Expand Down
64 changes: 40 additions & 24 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,47 +453,63 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
hc.currentHeaderHash = head.Hash()
}

// DeleteCallback is a callback function that is called by SetHead before
// each header is deleted.
type DeleteCallback func(ethdb.Writer, common.Hash, uint64)
type (
// UpdateHeadBlocksCallback is a callback function that is called by SetHead
// before head header is updated.
UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header)

// DeleteBlockContentCallback is a callback function that is called by SetHead
// before each header is deleted.
DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64)
)

// SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set.
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
height := uint64(0)

if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64()
}
batch := hc.chainDb.NewBatch()
func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) {
var (
parentHash common.Hash
batch = hc.chainDb.NewBatch()
)
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash()
num := hdr.Number.Uint64()
hash, num := hdr.Hash(), hdr.Number.Uint64()

// Rewind block chain to new head.
parent := hc.GetHeader(hdr.ParentHash, num-1)
if parent == nil {
parent = hc.genesisHeader
}
parentHash = hdr.ParentHash
// Notably, since geth has the possibility for setting the head to a low
// height which is even lower than ancient head.
// In order to ensure that the head is always no higher than the data in
// the database(ancient store or active store), we need to update head
// first then remove the relative data from the database.
//
// Update head first(head fast block, head full block) before deleting the data.
if updateFn != nil {
updateFn(hc.chainDb, parent)
}
// Update head header then.
rawdb.WriteHeadHeaderHash(hc.chainDb, parentHash)

// Remove the relative data from the database.
if delFn != nil {
delFn(batch, hash, num)
}
// Rewind header chain to new head.
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
rawdb.DeleteCanonicalHash(batch, num)

hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
rawdb.DeleteCanonicalHash(batch, i)
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
}
batch.Write()

// Clear out any stale content from the caches
hc.headerCache.Purge()
hc.tdCache.Purge()
hc.numberCache.Purge()

if hc.CurrentHeader() == nil {
hc.currentHeader.Store(hc.genesisHeader)
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()

rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash)
}

// SetGenesis sets a new genesis block header for the chain
Expand Down
Loading

0 comments on commit ebca031

Please sign in to comment.