Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consensus] Add check block in consensus #46

Merged
merged 26 commits into from
Sep 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions abcix/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (

type adaptedApp struct {
abciApp abci.Application
appHash []byte
events []abcix.Event
}

type AdaptedApp interface {
Expand Down Expand Up @@ -189,6 +191,7 @@ func (app *adaptedApp) DeliverBlock(req abcix.RequestDeliverBlock) (resp abcix.R
allEvents = append(allEvents, beginEvents...)
allEvents = append(allEvents, endEvents...)
resp.Events = allEvents
app.events = allEvents
return resp
}

Expand All @@ -198,12 +201,20 @@ func (app *adaptedApp) Commit() (resp abcix.ResponseCommit) {
// TODO: panic for debugging purposes. better error handling soon!
panic(err)
}
app.appHash = abciResp.Data
return
}

func (app *adaptedApp) CheckBlock(req abcix.RequestCheckBlock) abcix.ResponseCheckBlock {
// TODO: defer to consensus engine for now
panic("implement me")
respDeliverTx := make([]*abcix.ResponseDeliverTx, len(req.Txs))
for i := range respDeliverTx {
respDeliverTx[i] = &abcix.ResponseDeliverTx{Code: 0}
}
return abcix.ResponseCheckBlock{
AppHash: app.appHash,
DeliverTxs: respDeliverTx,
Events: app.events,
}
}

func (app *adaptedApp) ListSnapshots(req abcix.RequestListSnapshots) (resp abcix.ResponseListSnapshots) {
Expand Down
34 changes: 23 additions & 11 deletions abcix/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,7 @@ func (app *Application) CreateBlock(
appHash := make([]byte, 8)
binary.PutVarint(appHash, size)

events := []types.Event{
{
Type: "create_block",
Attributes: []types.EventAttribute{
{Key: []byte("height"), Value: []byte{byte(req.Height)}},
{Key: []byte("valid tx"), Value: []byte{byte(len(txs))}},
{Key: []byte("invalid tx"), Value: []byte{byte(len(invalidTxs))}},
},
},
}
return types.ResponseCreateBlock{Txs: txs, InvalidTxs: invalidTxs, Hash: appHash, Events: events}
return types.ResponseCreateBlock{Txs: txs, InvalidTxs: invalidTxs, Hash: appHash, Events: nil}
}

// Combination of ABCI.BeginBlock, []ABCI.DeliverTx, and ABCI.EndBlock
Expand All @@ -154,6 +144,28 @@ func (app *Application) DeliverBlock(req types.RequestDeliverBlock) types.Respon
return ret
}

func (app *Application) CheckBlock(req types.RequestCheckBlock) types.ResponseCheckBlock {
// Tx looks like "[key1]=[value1],[key2]=[value2],[from],[gasprice]"
// e.g. "a=41,c=42,alice,100"
ret := types.ResponseCheckBlock{}

lastState := app.state
for _, tx := range req.Txs {
newState, gasUsed, err := executeTx(lastState, tx, true)
if err != nil {
panic("consensus failure: invalid tx found in DeliverBlock: " + err.Error())
}
lastState = newState
txResp := types.ResponseDeliverTx{GasUsed: gasUsed}
ret.DeliverTxs = append(ret.DeliverTxs, &txResp)
}
if len(lastState.AppHash) != 0 {
ret.AppHash = lastState.AppHash
}

return ret
}

func (app *Application) Commit() types.ResponseCommit {
// Using a memdb - just return the big endian size of the db
appHash := make([]byte, 8)
Expand Down
1,014 changes: 814 additions & 200 deletions abcix/types/types.pb.go

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ func randState(nValidators int) (*State, []*validatorStub) {
return cs, vss
}

func randStateShouldCheckBlockFail(nValidators int) (*State, []*validatorStub) {
// Get State
state, privVals := randGenesisState(nValidators, false, 10)

vss := make([]*validatorStub, nValidators)

badxApp := &badxApp{}
cs := newState(state, privVals[0], badxApp)

for i := 0; i < nValidators; i++ {
vss[i] = newValidatorStub(privVals[i], int32(i))
}
// since cs1 starts at 1
incrementHeight(vss[1:]...)

return cs, vss
}

func randStateWithEvpool(nValidators int) (*State, []*validatorStub, *evidence.Pool) {
state, privVals := randGenesisState(nValidators, false, 10)

Expand Down Expand Up @@ -681,6 +699,24 @@ func ensureVote(voteCh <-chan tmpubsub.Message, height int64, round int32,
}
}

func ensurePrevoteWithNilBlock(voteCh <-chan tmpubsub.Message) {
select {
case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event")
case msg := <-voteCh:
voteEvent, ok := msg.Data().(types.EventDataVote)
if !ok {
panic(fmt.Sprintf("expected a EventDataVote, got %T. Wrong subscription channel?",
msg.Data()))
}

vote := voteEvent.Vote
if len(vote.BlockID.Hash) != 0 {
panic("Expect vote with nil block")
}
}
}

func ensurePrecommitTimeout(ch <-chan tmpubsub.Message) {
select {
case <-time.After(ensureTimeout):
Expand Down Expand Up @@ -904,3 +940,14 @@ func newPersistentKVStore() abcix.Application {
func newPersistentKVStoreWithPath(dbDir string) abcix.Application {
return adapter.AdaptToABCIx(kvstore.NewPersistentKVStoreApplication(dbDir))
}

//------------------------------------
type badxApp struct {
abcix.BaseApplication
}

func (app *badxApp) CheckBlock(req abcix.RequestCheckBlock) abcix.ResponseCheckBlock {
return abcix.ResponseCheckBlock{
Code: 1,
}
}
7 changes: 7 additions & 0 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@ func (app *CounterApplication) CheckTx(req abcix.RequestCheckTx) abcix.ResponseC
return abcix.ResponseCheckTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckBlock(req abcix.RequestCheckBlock) abcix.ResponseCheckBlock {
return abcix.ResponseCheckBlock{
//TODO: uncomment
//AppHash: req.Header.AppHash,
}
}

func txAsUint64(tx []byte) uint64 {
tx8 := make([]byte, 8)
copy(tx8[len(tx8)-len(tx):], tx)
Expand Down
5 changes: 5 additions & 0 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,11 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
return added, err
}

if err = cs.blockExec.CheckBlock(block); err != nil {
cs.Logger.Error("Error on CheckBlock", "err", err)
return false, err
}

cs.ProposalBlock = block
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
Expand Down
17 changes: 17 additions & 0 deletions consensus/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,23 @@ func TestStateOutputVoteStats(t *testing.T) {

}

func TestStateCheckBlockFail(t *testing.T) {
cs, _ := randStateShouldCheckBlockFail(1)
height, round := cs.Height, cs.Round

cs.eventBus.Stop()
eventBus := types.NewEventBusWithBufferCapacity(0)
eventBus.SetLogger(log.TestingLogger().With("module", "events"))
cs.SetEventBus(eventBus)
eventBus.Start()

voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote)
// start round and wait for propose and prevote
startTestRound(cs, height, round)

ensurePrevoteWithNilBlock(voteCh)
}

// subscribe subscribes test client to the given query and returns a channel with cap = 1.
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message {
sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q)
Expand Down
17 changes: 15 additions & 2 deletions proto/tendermint/abcix/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ message RequestDeliverBlock {
}

message RequestCheckBlock {
int64 height = 1;
bytes hash = 2;
tendermint.types.Header header = 3 [(gogoproto.nullable) = false];
LastCommitInfo last_commit_info = 4 [(gogoproto.nullable) = false];
repeated Evidence byzantine_validators = 5 [(gogoproto.nullable) = false];
repeated bytes txs = 6;
}

// lists available snapshots
Expand Down Expand Up @@ -250,8 +256,15 @@ message ResponseDeliverBlock {
}

message ResponseCheckBlock {
uint32 code = 1;
string codespace = 2;
uint32 code = 1;
bytes app_hash = 2;
repeated ResponseDeliverTx deliver_txs = 3;
repeated ValidatorUpdate validator_updates = 4
[(gogoproto.nullable) = false];
ConsensusParams consensus_param_updates = 5;
repeated Event events = 6
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"];
string codespace = 7;
}

message ResponseListSnapshots {
Expand Down
74 changes: 74 additions & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package state

import (
"bytes"
"errors"
"fmt"
"time"

"github.com/tendermint/tendermint/crypto/merkle"

"github.com/gogo/protobuf/proto"
dbm "github.com/tendermint/tm-db"

abcix "github.com/tendermint/tendermint/abcix/types"
Expand Down Expand Up @@ -268,6 +273,60 @@ func (blockExec *BlockExecutor) Commit(
return res.Data, res.RetainHeight, err
}

func (blockExec *BlockExecutor) CheckBlock(block *types.Block) error {
commitInfo, byzVals := getBlockValidatorInfo(
block.Time,
block.Height,
block.LastCommit,
block.Evidence.Evidence,
blockExec.db,
)
pbh := block.Header.ToProto()
txs := make([][]byte, 0, len(block.Txs))
for _, tx := range block.Txs {
txs = append(txs, tx)
}

resp, err := blockExec.proxyApp.CheckBlockSync(abcix.RequestCheckBlock{
Height: block.Height,
Hash: block.Hash(),
Header: *pbh,
LastCommitInfo: commitInfo,
ByzantineValidators: byzVals,
Txs: txs,
})

if err != nil {
return err
}
if resp.Code != 0 {
return fmt.Errorf("application error during CheckBlock, code: %d", resp.Code)
}
for _, tx := range resp.DeliverTxs {
if tx.Code != 0 {
return fmt.Errorf("invalid transaction, code: %d", tx.Code)
}
}
resultHash := CheckBlockResponseResultHash(resp)
if !bytes.Equal(resultHash, block.Header.LastResultsHash.Bytes()) {
blockExec.logger.Error(
"resultHash mismatch. ResultHash in ResponseCheckBlock: %X\n ResultHash in block header: %X",
resultHash, block.Header.LastResultsHash,
)
return errors.New("resultHash mismatch")
}

if !bytes.Equal(resp.AppHash, block.Header.AppHash.Bytes()) {
blockExec.logger.Error(
"appHash mismatch. AppHash in ResponseCheckBlock: %X\n AppHash in block header: %X",
resp.AppHash, block.Header.AppHash,
)
return errors.New("appHash mismatch")
}

return err
}

//---------------------------------------------------------
// Helper functions for executing blocks and updating state

Expand Down Expand Up @@ -523,3 +582,18 @@ func ExecCommitBlock(
// ResponseCommit has no error or log, just data
return res.Data, nil
}

func CheckBlockResponseResultHash(resp *abcix.ResponseCheckBlock) []byte {
cbeBytes, err := proto.Marshal(&abcix.ResponseCheckBlock{
Events: resp.Events,
})
if err != nil {
panic(err)
}

// Build a Merkle tree of proto-encoded DeliverTx results and get a hash.
results := types.NewResults(resp.DeliverTxs)

// Build a Merkle tree out of the above 3 binary slices.
return merkle.HashFromByteSlices([][]byte{cbeBytes, results.Hash()})
}
Loading