diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 945395dd6be9..c8e8b7b91c4f 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -138,6 +138,12 @@ var ( errRecentlySigned = errors.New("recently signed") ) +// ActiveSealingOp keeps the context of the active sealing operation +type ActiveSealingOp struct { + number uint64 + cancel context.CancelFunc +} + // SignerFn is a signer callback function to request a header to be signed by a // backing account. type SignerFn func(accounts.Account, string, []byte) ([]byte, error) @@ -246,8 +252,9 @@ type Bor struct { stateReceiverABI abi.ABI HeimdallClient IHeimdallClient - stateDataFeed event.Feed - scope event.SubscriptionScope + stateDataFeed event.Feed + scope event.SubscriptionScope + activeSealingOp *ActiveSealingOp // The fields below are for testing only fakeDiff bool // Skip difficulty verifications } @@ -725,6 +732,9 @@ func (c *Bor) Authorize(signer common.Address, signFn SignerFn) { // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { + if c.activeSealingOp != nil { + return &SealingInFlightError{c.activeSealingOp.number} + } header := block.Header() // Sealing the genesis block is not supported @@ -762,9 +772,6 @@ func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan wiggle := time.Duration(2*c.config.Period) * time.Second * time.Duration(successionNumber) delay += wiggle - log.Info("Out-of-turn signing requested", "wiggle", common.PrettyDuration(wiggle)) - log.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "signer", signer.Hex(), "proposer", snap.ValidatorSet.GetProposer().Address.Hex()) - // Sign all the things! sighash, err := signFn(accounts.Account{Address: signer}, accounts.MimetypeBor, BorRLP(header)) if err != nil { @@ -774,23 +781,64 @@ func (c *Bor) Seal(chain consensus.ChainReader, block *types.Block, results chan // Wait until sealing is terminated or delay timeout. log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay)) + shouldSeal := make(chan bool) + go c.WaitForSealingOp(number, shouldSeal, delay, stop) go func() { - select { - case <-stop: + defer func() { + close(shouldSeal) + c.activeSealingOp = nil + }() + switch <-shouldSeal { + case false: return - case <-time.After(delay): + case true: + if wiggle > 0 { + log.Info( + "Sealing out-of-turn", + "number", number, + "wiggle", common.PrettyDuration(wiggle), + "in-turn-signer", snap.ValidatorSet.GetProposer().Address.Hex(), + ) + } + log.Info( + "Sealing successful", + "number", number, + "delay", delay, + "headerDifficulty", header.Difficulty, + ) } select { case results <- block.WithSeal(header): default: - log.Warn("Sealing result is not read by miner", "sealhash", SealHash(header)) + log.Warn("Sealing result was not read by miner", "sealhash", SealHash(header)) } }() - return nil } +// WaitForSealingOp blocks until delay elapses or stop signal is received +func (c *Bor) WaitForSealingOp(number uint64, shouldSeal chan bool, delay time.Duration, stop <-chan struct{}) { + ctx, cancel := context.WithCancel(context.Background()) + c.activeSealingOp = &ActiveSealingOp{number, cancel} + select { + case <-stop: + shouldSeal <- false + case <-ctx.Done(): + shouldSeal <- false + case <-time.After(delay): + shouldSeal <- true + } +} + +// CancelActiveSealingOp cancels in-flight sealing process +func (c *Bor) CancelActiveSealingOp() { + if c.activeSealingOp != nil { + log.Debug("Discarding active sealing operation", "number", c.activeSealingOp.number) + c.activeSealingOp.cancel() + } +} + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/consensus/bor/errors.go b/consensus/bor/errors.go index f9f99e2b6490..8b09bdfe9f94 100644 --- a/consensus/bor/errors.go +++ b/consensus/bor/errors.go @@ -68,3 +68,14 @@ func (e *MaxCheckpointLengthExceededError) Error() string { MaxCheckpointLength, ) } + +type SealingInFlightError struct { + Number uint64 +} + +func (e *SealingInFlightError) Error() string { + return fmt.Sprintf( + "Requested concurrent block sealing. Sealing for block %d is already in progress", + e.Number, + ) +} diff --git a/consensus/consensus.go b/consensus/consensus.go index e86583f394d0..0620cff0596f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -116,6 +116,13 @@ type Engine interface { Close() error } +// Bor is a consensus engine developed by folks at Matic Network +type Bor interface { + Engine + IsValidatorAction(chain ChainReader, from common.Address, tx *types.Transaction) bool + CancelActiveSealingOp() +} + // PoW is a consensus engine based on proof-of-work. type PoW interface { Engine diff --git a/core/blockchain.go b/core/blockchain.go index e90ffbb30eac..3ce4356929e2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1718,6 +1718,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } + bc.engine.(consensus.Bor).CancelActiveSealingOp() return it.index, events, coalescedLogs, err } diff --git a/core/tx_pool.go b/core/tx_pool.go index 02011586c344..1462c2324e66 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -117,12 +117,6 @@ const ( TxStatusIncluded ) -// bor acts as a way to be able to type cast consensus.Engine; -// since importing "github.com/maticnetwork/bor/consensus/bor" results in a cyclic dependency -type bor interface { - IsValidatorAction(chain consensus.ChainReader, from common.Address, tx *types.Transaction) bool -} - // blockChain provides the state of blockchain and current gas limit to do // some pre checks in tx pool and event subscribers. type blockChain interface { @@ -537,7 +531,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && - !pool.chain.Engine().(bor).IsValidatorAction(pool.chain.(consensus.ChainReader), from, tx) && + !pool.chain.Engine().(consensus.Bor).IsValidatorAction(pool.chain.(consensus.ChainReader), from, tx) && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced }