From a1d058cdcbbc556b7ac0b52236cdef21229cfed3 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Thu, 30 Mar 2023 02:31:30 -0700 Subject: [PATCH] Support multi sequencer (#823) fixes https://github.com/rollkit/rollkit/issues/819 and https://github.com/rollkit/rollkit/issues/816 --------- Co-authored-by: Ganesha Upadhyaya Co-authored-by: Manav Aggarwal Co-authored-by: nashqueue <99758629+nashqueue@users.noreply.github.com> --- block/manager.go | 57 ++++++++++++++++--- node/full_client_test.go | 120 ++++++++++++++++++++++++++------------- state/executor.go | 2 +- 3 files changed, 128 insertions(+), 51 deletions(-) diff --git a/block/manager.go b/block/manager.go index d1c39fd218f..f4301d90e6e 100644 --- a/block/manager.go +++ b/block/manager.go @@ -1,7 +1,9 @@ package block import ( + "bytes" "context" + "encoding/hex" "fmt" "sync" "sync/atomic" @@ -68,6 +70,8 @@ type Manager struct { // retrieveCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve data retrieveCond *sync.Cond + lastStateMtx *sync.Mutex + logger log.Logger // For usage by Lazy Aggregator mode @@ -151,6 +155,7 @@ func NewManager( blockInCh: make(chan newBlockEvent, 100), FraudProofInCh: make(chan *abci.FraudProof, 100), retrieveMtx: new(sync.Mutex), + lastStateMtx: new(sync.Mutex), syncCache: make(map[uint64]*types.Block), logger: logger, txsAvailable: txsAvailableCh, @@ -340,10 +345,18 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error { return fmt.Errorf("failed to save block responses: %w", err) } + // SaveValidators commits the DB tx + err = m.store.SaveValidators(uint64(b.SignedHeader.Header.Height()), m.lastState.Validators) + if err != nil { + return err + } + if daHeight > newState.DAHeight { newState.DAHeight = daHeight } + m.lastStateMtx.Lock() m.lastState = newState + m.lastStateMtx.Unlock() err = m.store.UpdateState(m.lastState) if err != nil { m.logger.Error("failed to save updated state", "error", err) @@ -449,6 +462,20 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) { }, nil } +func (m *Manager) IsProposer() (bool, error) { + // if proposer is not set, assume self proposer + if m.lastState.Validators.Proposer == nil { + return true, nil + } + + signerPubBytes, err := m.proposerKey.GetPublic().Raw() + if err != nil { + return false, err + } + + return bytes.Equal(m.lastState.Validators.Proposer.PubKey.Bytes(), signerPubBytes), nil +} + func (m *Manager) publishBlock(ctx context.Context) error { var lastCommit *types.Commit var lastHeaderHash types.Hash @@ -456,6 +483,16 @@ func (m *Manager) publishBlock(ctx context.Context) error { height := m.store.Height() newHeight := height + 1 + m.lastStateMtx.Lock() + isProposer, err := m.IsProposer() + m.lastStateMtx.Unlock() + if err != nil { + return fmt.Errorf("error while checking for proposer: %w", err) + } + if !isProposer { + return nil + } + // this is a special case, when first block is produced - there is no previous commit if newHeight == uint64(m.genesis.InitialHeight) { lastCommit = &types.Commit{} @@ -527,6 +564,9 @@ func (m *Manager) publishBlock(ctx context.Context) error { return err } + // Only update the stored height after successfully submitting to DA layer and committing to the DB + m.store.SetHeight(uint64(block.SignedHeader.Header.Height())) + // Commit the new state and block which writes to disk on the proxy app _, _, err = m.executor.Commit(ctx, newState, block, responses) if err != nil { @@ -539,6 +579,12 @@ func (m *Manager) publishBlock(ctx context.Context) error { return err } + // SaveValidators commits the DB tx + err = m.store.SaveValidators(uint64(block.SignedHeader.Header.Height()), m.lastState.Validators) + if err != nil { + return err + } + newState.DAHeight = atomic.LoadUint64(&m.daHeight) // After this call m.lastState is the NEW state returned from ApplyBlock m.lastState = newState @@ -549,18 +595,11 @@ func (m *Manager) publishBlock(ctx context.Context) error { return err } - // SaveValidators commits the DB tx - err = m.store.SaveValidators(uint64(block.SignedHeader.Header.Height()), m.lastState.Validators) - if err != nil { - return err - } - - // Only update the stored height after successfully submitting to DA layer and committing to the DB - m.store.SetHeight(uint64(block.SignedHeader.Header.Height())) - // Publish header to channel so that header exchange service can broadcast m.HeaderCh <- &block.SignedHeader + m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress), "height", block.SignedHeader.Height()) + return nil } diff --git a/node/full_client_test.go b/node/full_client_test.go index 9739747140f..e9ee84644cf 100644 --- a/node/full_client_test.go +++ b/node/full_client_test.go @@ -31,7 +31,9 @@ import ( "github.com/rollkit/rollkit/config" "github.com/rollkit/rollkit/conv" abciconv "github.com/rollkit/rollkit/conv/abci" + mockda "github.com/rollkit/rollkit/da/mock" "github.com/rollkit/rollkit/mocks" + "github.com/rollkit/rollkit/store" "github.com/rollkit/rollkit/types" ) @@ -648,60 +650,73 @@ func TestBlockchainInfo(t *testing.T) { } func TestValidatorSetHandling(t *testing.T) { - // handle multiple sequencers - t.Skip() - assert := assert.New(t) require := require.New(t) - app := &mocks.Application{} - app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) - app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) - app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) - app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) - app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{}) - app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{}) - key, _, _ := crypto.GenerateEd25519Key(crand.Reader) + waitCh := make(chan interface{}) + + vKeys := make([]tmcrypto.PrivKey, 2) + apps := make([]*mocks.Application, 2) + nodes := make([]*FullNode, 2) - vKeys := make([]tmcrypto.PrivKey, 4) genesisValidators := make([]tmtypes.GenesisValidator, len(vKeys)) for i := 0; i < len(vKeys); i++ { vKeys[i] = ed25519.GenPrivKey() genesisValidators[i] = tmtypes.GenesisValidator{Address: vKeys[i].PubKey().Address(), PubKey: vKeys[i].PubKey(), Power: int64(i + 100), Name: fmt.Sprintf("gen #%d", i)} + apps[i] = createApp(vKeys[0], waitCh, require) } - nodeKey := &p2p.NodeKey{ - PrivKey: vKeys[0], - } - signingKey, _ := conv.GetNodeKey(nodeKey) - - pbValKey, err := encoding.PubKeyToProto(vKeys[0].PubKey()) - require.NoError(err) - - waitCh := make(chan interface{}) - - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Times(5) - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 0}}}).Once() - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Once() - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 100}}}).Once() - app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Run(func(args mock.Arguments) { - waitCh <- nil - }) + dalc := &mockda.DataAvailabilityLayerClient{} + ds, err := store.NewDefaultInMemoryKVStore() + require.Nil(err) + err = dalc.Init([8]byte{}, nil, ds, log.TestingLogger()) + require.Nil(err) + err = dalc.Start() + require.Nil(err) + + for i := 0; i < len(nodes); i++ { + nodeKey := &p2p.NodeKey{ + PrivKey: vKeys[i], + } + signingKey, err := conv.GetNodeKey(nodeKey) + require.NoError(err) + nodes[i], err = newFullNode( + context.Background(), + config.NodeConfig{ + DALayer: "mock", + Aggregator: true, + BlockManagerConfig: config.BlockManagerConfig{ + BlockTime: 1 * time.Second, + DABlockTime: 100 * time.Millisecond, + }, + }, + signingKey, + signingKey, + abcicli.NewLocalClient(nil, apps[i]), + &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, + log.TestingLogger(), + ) + require.NoError(err) + require.NotNil(nodes[i]) - node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger()) - require.NoError(err) - require.NotNil(node) + // use same, common DALC, so nodes can share data + nodes[i].dalc = dalc + nodes[i].blockManager.SetDALC(dalc) + } - rpc := NewFullClient(node) + rpc := NewFullClient(nodes[0]) require.NotNil(rpc) - err = node.Start() - require.NoError(err) + for i := 0; i < len(nodes); i++ { + err := nodes[i].Start() + require.NoError(err) + } + <-waitCh <-waitCh // test first blocks - for h := int64(1); h <= 6; h++ { + for h := int64(1); h <= 3; h++ { vals, err := rpc.Validators(context.Background(), &h, nil, nil) assert.NoError(err) assert.NotNil(vals) @@ -710,8 +725,8 @@ func TestValidatorSetHandling(t *testing.T) { assert.EqualValues(vals.BlockHeight, h) } - // 6th EndBlock removes first validator from the list - for h := int64(7); h <= 8; h++ { + // 3rd EndBlock removes the first validator from the list + for h := int64(4); h <= 5; h++ { vals, err := rpc.Validators(context.Background(), &h, nil, nil) assert.NoError(err) assert.NotNil(vals) @@ -720,8 +735,9 @@ func TestValidatorSetHandling(t *testing.T) { assert.EqualValues(vals.BlockHeight, h) } - // 8th EndBlock adds validator back - for h := int64(9); h <= 12; h++ { + // 5th EndBlock adds validator back + for h := int64(6); h <= 9; h++ { + <-waitCh <-waitCh vals, err := rpc.Validators(context.Background(), &h, nil, nil) assert.NoError(err) @@ -737,7 +753,29 @@ func TestValidatorSetHandling(t *testing.T) { assert.NotNil(vals) assert.EqualValues(len(genesisValidators), vals.Total) assert.Len(vals.Validators, len(genesisValidators)) - assert.GreaterOrEqual(vals.BlockHeight, int64(12)) + assert.GreaterOrEqual(vals.BlockHeight, int64(9)) +} + +func createApp(keyToRemove tmcrypto.PrivKey, waitCh chan interface{}, require *require.Assertions) *mocks.Application { + app := &mocks.Application{} + app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) + app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{}) + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) + app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{}) + app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{}) + + pbValKey, err := encoding.PubKeyToProto(keyToRemove.PubKey()) + require.NoError(err) + + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Times(2) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 0}}}).Once() + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Once() + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 100}}}).Once() + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Run(func(args mock.Arguments) { + waitCh <- nil + }) + return app } // copy-pasted from store/store_test.go diff --git a/state/executor.go b/state/executor.go index 8bf13ce340e..46c2335daa7 100644 --- a/state/executor.go +++ b/state/executor.go @@ -231,7 +231,7 @@ func (e *BlockExecutor) updateState(state types.State, block *types.Block, abciR // for now, we don't care about part set headers }, NextValidators: nValSet, - Validators: state.NextValidators.Copy(), + Validators: nValSet, LastValidators: state.Validators.Copy(), LastHeightValidatorsChanged: lastHeightValSetChanged, ConsensusParams: state.ConsensusParams,