diff --git a/processor/blockprocessor/block_processor.go b/processor/blockprocessor/block_processor.go new file mode 100644 index 000000000..6dd5d9a67 --- /dev/null +++ b/processor/blockprocessor/block_processor.go @@ -0,0 +1,90 @@ +package blockprocessor + +import ( + "fmt" + + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor" +) + +type blockProcessor struct { + handler func(block *ledgercore.ValidatedBlock) error + nextRoundToProcess uint64 + ledger *ledger.Ledger +} + +// MakeProcessor creates a block processor +func MakeProcessor(ledger *ledger.Ledger, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { + if ledger == nil { + return nil, fmt.Errorf("MakeProcessor(): local ledger not initialized") + } + if handler != nil && ledger.Latest() == 0 { + blk, err := ledger.Block(0) + if err != nil { + return nil, fmt.Errorf("MakeProcessor() err: %w", err) + } + vb := ledgercore.MakeValidatedBlock(blk, ledgercore.StateDelta{}) + err = handler(&vb) + if err != nil { + return nil, fmt.Errorf("MakeProcessor() handler err: %w", err) + } + } + return &blockProcessor{ledger: ledger, nextRoundToProcess: uint64(ledger.Latest() + 1), handler: handler}, nil +} + +// Process a raw algod block +func (processor *blockProcessor) Process(blockCert *rpcs.EncodedBlockCert) error { + if blockCert == nil { + return fmt.Errorf("Process(): cannot process a nil block") + } + if uint64(blockCert.Block.Round()) != processor.nextRoundToProcess { + return fmt.Errorf("Process() invalid round blockCert.Block.Round(): %d processor.nextRoundToProcess: %d", blockCert.Block.Round(), processor.nextRoundToProcess) + } + + blkeval, err := processor.ledger.StartEvaluator(blockCert.Block.BlockHeader, len(blockCert.Block.Payset), 0) + if err != nil { + return fmt.Errorf("Process() block eval err: %w", err) + } + + paysetgroups, err := blockCert.Block.DecodePaysetGroups() + if err != nil { + return fmt.Errorf("Process() decode payset groups err: %w", err) + } + + for _, group := range paysetgroups { + err = blkeval.TransactionGroup(group) + if err != nil { + return fmt.Errorf("Process() apply transaction group err: %w", err) + } + } + + // validated block + vb, err := blkeval.GenerateBlock() + if err != nil { + return fmt.Errorf("Process() validated block err: %w", err) + } + // execute handler before writing to local ledger + if processor.handler != nil { + err = processor.handler(vb) + if err != nil { + return fmt.Errorf("Process() handler err: %w", err) + } + } + // write to ledger + err = processor.ledger.AddValidatedBlock(*vb, blockCert.Certificate) + if err != nil { + return fmt.Errorf("Process() add validated block err: %w", err) + } + processor.nextRoundToProcess = uint64(processor.ledger.Latest()) + 1 + return nil +} + +func (processor *blockProcessor) SetHandler(handler func(block *ledgercore.ValidatedBlock) error) { + processor.handler = handler +} + +func (processor *blockProcessor) NextRoundToProcess() uint64 { + return processor.nextRoundToProcess +} diff --git a/processor/blockprocessor/block_processor_test.go b/processor/blockprocessor/block_processor_test.go new file mode 100644 index 000000000..a80a82999 --- /dev/null +++ b/processor/blockprocessor/block_processor_test.go @@ -0,0 +1,129 @@ +package blockprocessor_test + +import ( + "fmt" + "log" + "testing" + + "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/rpcs" + block_processor "github.com/algorand/indexer/processor/blockprocessor" + "github.com/algorand/indexer/util" + "github.com/algorand/indexer/util/test" + "github.com/stretchr/testify/assert" +) + +func TestProcess(t *testing.T) { + l := makeTestLedger(t, "local_ledger") + genesisBlock, err := l.Block(basics.Round(0)) + assert.Nil(t, err) + // create processor + handler := func(vb *ledgercore.ValidatedBlock) error { + return nil + } + pr, _ := block_processor.MakeProcessor(l, handler) + prevHeader := genesisBlock.BlockHeader + + // create a few rounds + for i := 1; i <= 3; i++ { + txn := test.MakePaymentTxn(0, uint64(i), 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) + block, err := test.MakeBlockForTxns(prevHeader, &txn) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + // check round + assert.Equal(t, basics.Round(i), l.Latest()) + assert.Equal(t, uint64(basics.Round(i+1)), pr.NextRoundToProcess()) + // check added block + addedBlock, err := l.Block(l.Latest()) + assert.Nil(t, err) + assert.NotNil(t, addedBlock) + assert.Equal(t, 1, len(addedBlock.Payset)) + prevHeader = addedBlock.BlockHeader + } +} + +func TestFailedProcess(t *testing.T) { + l := makeTestLedger(t, "local_ledger2") + // invalid processor + pr, err := block_processor.MakeProcessor(nil, nil) + assert.Contains(t, err.Error(), "MakeProcessor(): local ledger not initialized") + pr, err = block_processor.MakeProcessor(l, nil) + assert.Nil(t, err) + err = pr.Process(nil) + assert.Contains(t, err.Error(), "Process(): cannot process a nil block") + + genesisBlock, err := l.Block(basics.Round(0)) + assert.Nil(t, err) + // incorrect round + txn := test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, test.AccountA, test.AccountA) + block, err := test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) + block.BlockHeader.Round = 10 + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Contains(t, err.Error(), "Process() invalid round blockCert.Block.Round()") + + // non-zero balance after close remainder to sender address + txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, test.AccountA, test.AccountA) + block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) + assert.Nil(t, err) + rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Contains(t, err.Error(), "Process() apply transaction group") + + // stxn GenesisID not empty + txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) + block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) + assert.Nil(t, err) + block.Payset[0].Txn.GenesisID = "genesisID" + rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Contains(t, err.Error(), "Process() decode payset groups err") + + // eval error: concensus protocol not supported + txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) + block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) + block.BlockHeader.CurrentProtocol = "testing" + assert.Nil(t, err) + rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Contains(t, err.Error(), "Process() block eval err") + + // handler error + handler := func(vb *ledgercore.ValidatedBlock) error { + return fmt.Errorf("handler error") + } + _, err = block_processor.MakeProcessor(l, handler) + assert.Contains(t, err.Error(), "MakeProcessor() handler err") + pr, _ = block_processor.MakeProcessor(l, nil) + txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) + block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) + assert.Nil(t, err) + pr.SetHandler(handler) + rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Contains(t, err.Error(), "Process() handler err") +} + +func makeTestLedger(t *testing.T, prefix string) *ledger.Ledger { + // initialize local ledger + genesis := test.MakeGenesis() + genesisBlock := test.MakeGenesisBlock() + initState, err := util.CreateInitState(&genesis, &genesisBlock) + if err != nil { + log.Panicf("test init err: %v", err) + } + logger := logging.NewLogger() + l, err := ledger.OpenLedger(logger, prefix, true, initState, config.GetDefaultLocal()) + if err != nil { + log.Panicf("test init err: %v", err) + } + return l +} diff --git a/processor/processor.go b/processor/processor.go new file mode 100644 index 000000000..43de31441 --- /dev/null +++ b/processor/processor.go @@ -0,0 +1,13 @@ +package processor + +import ( + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/rpcs" +) + +// Processor is the block processor interface +type Processor interface { + Process(cert *rpcs.EncodedBlockCert) error + SetHandler(handler func(block *ledgercore.ValidatedBlock) error) + NextRoundToProcess() uint64 +} diff --git a/util/test/account_testutil.go b/util/test/account_testutil.go index 1142eb3c0..dd1ae560d 100644 --- a/util/test/account_testutil.go +++ b/util/test/account_testutil.go @@ -164,6 +164,7 @@ func MakePaymentTxn(fee, amt, closeAmt, sendRewards, receiveRewards, Fee: basics.MicroAlgos{Raw: fee}, GenesisHash: GenesisHash, RekeyTo: rekeyTo, + LastValid: 10, }, PaymentTxnFields: transactions.PaymentTxnFields{ Receiver: receiver, diff --git a/util/util.go b/util/util.go index 52a366ea7..4abeac190 100644 --- a/util/util.go +++ b/util/util.go @@ -7,6 +7,9 @@ import ( "unicode" "unicode/utf8" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-codec/codec" ) @@ -53,6 +56,24 @@ func JSONOneLine(obj interface{}) string { return string(b) } +// CreateInitState makes an initState +func CreateInitState(genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block) (ledgercore.InitState, error) { + accounts := make(map[basics.Address]basics.AccountData) + for _, alloc := range genesis.Allocation { + address, err := basics.UnmarshalChecksumAddress(alloc.Address) + if err != nil { + return ledgercore.InitState{}, fmt.Errorf("openLedger() decode address err: %w", err) + } + accounts[address] = alloc.State + } + initState := ledgercore.InitState{ + Block: *genesisBlock, + Accounts: accounts, + GenesisHash: genesisBlock.GenesisHash(), + } + return initState, nil +} + func init() { oneLineJSONCodecHandle = new(codec.JsonHandle) oneLineJSONCodecHandle.ErrorIfNoField = true