Skip to content

Commit

Permalink
Developer Tools: indexer block processor (#982)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiqizng authored May 9, 2022
1 parent 58e68aa commit ce34b6f
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 0 deletions.
90 changes: 90 additions & 0 deletions processor/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package blockprocessor

import (
"fmt"

"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"
)

type blockProcessor struct {
handler func(block *ledgercore.ValidatedBlock) error
nextRoundToProcess uint64
ledger *ledger.Ledger
}

// MakeProcessor creates a block processor
func MakeProcessor(ledger *ledger.Ledger, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
if ledger == nil {
return nil, fmt.Errorf("MakeProcessor(): local ledger not initialized")
}
if handler != nil && ledger.Latest() == 0 {
blk, err := ledger.Block(0)
if err != nil {
return nil, fmt.Errorf("MakeProcessor() err: %w", err)
}
vb := ledgercore.MakeValidatedBlock(blk, ledgercore.StateDelta{})
err = handler(&vb)
if err != nil {
return nil, fmt.Errorf("MakeProcessor() handler err: %w", err)
}
}
return &blockProcessor{ledger: ledger, nextRoundToProcess: uint64(ledger.Latest() + 1), handler: handler}, nil
}

// Process a raw algod block
func (processor *blockProcessor) Process(blockCert *rpcs.EncodedBlockCert) error {
if blockCert == nil {
return fmt.Errorf("Process(): cannot process a nil block")
}
if uint64(blockCert.Block.Round()) != processor.nextRoundToProcess {
return fmt.Errorf("Process() invalid round blockCert.Block.Round(): %d processor.nextRoundToProcess: %d", blockCert.Block.Round(), processor.nextRoundToProcess)
}

blkeval, err := processor.ledger.StartEvaluator(blockCert.Block.BlockHeader, len(blockCert.Block.Payset), 0)
if err != nil {
return fmt.Errorf("Process() block eval err: %w", err)
}

paysetgroups, err := blockCert.Block.DecodePaysetGroups()
if err != nil {
return fmt.Errorf("Process() decode payset groups err: %w", err)
}

for _, group := range paysetgroups {
err = blkeval.TransactionGroup(group)
if err != nil {
return fmt.Errorf("Process() apply transaction group err: %w", err)
}
}

// validated block
vb, err := blkeval.GenerateBlock()
if err != nil {
return fmt.Errorf("Process() validated block err: %w", err)
}
// execute handler before writing to local ledger
if processor.handler != nil {
err = processor.handler(vb)
if err != nil {
return fmt.Errorf("Process() handler err: %w", err)
}
}
// write to ledger
err = processor.ledger.AddValidatedBlock(*vb, blockCert.Certificate)
if err != nil {
return fmt.Errorf("Process() add validated block err: %w", err)
}
processor.nextRoundToProcess = uint64(processor.ledger.Latest()) + 1
return nil
}

func (processor *blockProcessor) SetHandler(handler func(block *ledgercore.ValidatedBlock) error) {
processor.handler = handler
}

func (processor *blockProcessor) NextRoundToProcess() uint64 {
return processor.nextRoundToProcess
}
129 changes: 129 additions & 0 deletions processor/blockprocessor/block_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package blockprocessor_test

import (
"fmt"
"log"
"testing"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/rpcs"
block_processor "github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/test"
"github.com/stretchr/testify/assert"
)

func TestProcess(t *testing.T) {
l := makeTestLedger(t, "local_ledger")
genesisBlock, err := l.Block(basics.Round(0))
assert.Nil(t, err)
// create processor
handler := func(vb *ledgercore.ValidatedBlock) error {
return nil
}
pr, _ := block_processor.MakeProcessor(l, handler)
prevHeader := genesisBlock.BlockHeader

// create a few rounds
for i := 1; i <= 3; i++ {
txn := test.MakePaymentTxn(0, uint64(i), 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{})
block, err := test.MakeBlockForTxns(prevHeader, &txn)
assert.Nil(t, err)
rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Nil(t, err)
// check round
assert.Equal(t, basics.Round(i), l.Latest())
assert.Equal(t, uint64(basics.Round(i+1)), pr.NextRoundToProcess())
// check added block
addedBlock, err := l.Block(l.Latest())
assert.Nil(t, err)
assert.NotNil(t, addedBlock)
assert.Equal(t, 1, len(addedBlock.Payset))
prevHeader = addedBlock.BlockHeader
}
}

func TestFailedProcess(t *testing.T) {
l := makeTestLedger(t, "local_ledger2")
// invalid processor
pr, err := block_processor.MakeProcessor(nil, nil)
assert.Contains(t, err.Error(), "MakeProcessor(): local ledger not initialized")
pr, err = block_processor.MakeProcessor(l, nil)
assert.Nil(t, err)
err = pr.Process(nil)
assert.Contains(t, err.Error(), "Process(): cannot process a nil block")

genesisBlock, err := l.Block(basics.Round(0))
assert.Nil(t, err)
// incorrect round
txn := test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, test.AccountA, test.AccountA)
block, err := test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn)
block.BlockHeader.Round = 10
assert.Nil(t, err)
rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Contains(t, err.Error(), "Process() invalid round blockCert.Block.Round()")

// non-zero balance after close remainder to sender address
txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, test.AccountA, test.AccountA)
block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn)
assert.Nil(t, err)
rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Contains(t, err.Error(), "Process() apply transaction group")

// stxn GenesisID not empty
txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{})
block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn)
assert.Nil(t, err)
block.Payset[0].Txn.GenesisID = "genesisID"
rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Contains(t, err.Error(), "Process() decode payset groups err")

// eval error: concensus protocol not supported
txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{})
block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn)
block.BlockHeader.CurrentProtocol = "testing"
assert.Nil(t, err)
rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Contains(t, err.Error(), "Process() block eval err")

// handler error
handler := func(vb *ledgercore.ValidatedBlock) error {
return fmt.Errorf("handler error")
}
_, err = block_processor.MakeProcessor(l, handler)
assert.Contains(t, err.Error(), "MakeProcessor() handler err")
pr, _ = block_processor.MakeProcessor(l, nil)
txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{})
block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn)
assert.Nil(t, err)
pr.SetHandler(handler)
rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}}
err = pr.Process(&rawBlock)
assert.Contains(t, err.Error(), "Process() handler err")
}

func makeTestLedger(t *testing.T, prefix string) *ledger.Ledger {
// initialize local ledger
genesis := test.MakeGenesis()
genesisBlock := test.MakeGenesisBlock()
initState, err := util.CreateInitState(&genesis, &genesisBlock)
if err != nil {
log.Panicf("test init err: %v", err)
}
logger := logging.NewLogger()
l, err := ledger.OpenLedger(logger, prefix, true, initState, config.GetDefaultLocal())
if err != nil {
log.Panicf("test init err: %v", err)
}
return l
}
13 changes: 13 additions & 0 deletions processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package processor

import (
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"
)

// Processor is the block processor interface
type Processor interface {
Process(cert *rpcs.EncodedBlockCert) error
SetHandler(handler func(block *ledgercore.ValidatedBlock) error)
NextRoundToProcess() uint64
}
1 change: 1 addition & 0 deletions util/test/account_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func MakePaymentTxn(fee, amt, closeAmt, sendRewards, receiveRewards,
Fee: basics.MicroAlgos{Raw: fee},
GenesisHash: GenesisHash,
RekeyTo: rekeyTo,
LastValid: 10,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: receiver,
Expand Down
21 changes: 21 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"unicode"
"unicode/utf8"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-codec/codec"
)

Expand Down Expand Up @@ -53,6 +56,24 @@ func JSONOneLine(obj interface{}) string {
return string(b)
}

// CreateInitState makes an initState
func CreateInitState(genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block) (ledgercore.InitState, error) {
accounts := make(map[basics.Address]basics.AccountData)
for _, alloc := range genesis.Allocation {
address, err := basics.UnmarshalChecksumAddress(alloc.Address)
if err != nil {
return ledgercore.InitState{}, fmt.Errorf("openLedger() decode address err: %w", err)
}
accounts[address] = alloc.State
}
initState := ledgercore.InitState{
Block: *genesisBlock,
Accounts: accounts,
GenesisHash: genesisBlock.GenesisHash(),
}
return initState, nil
}

func init() {
oneLineJSONCodecHandle = new(codec.JsonHandle)
oneLineJSONCodecHandle.ErrorIfNoField = true
Expand Down

0 comments on commit ce34b6f

Please sign in to comment.