From 30231d82f19bfb0de446b9fa2779ddc682a1198e Mon Sep 17 00:00:00 2001 From: egonspace Date: Thu, 8 Jul 2021 18:23:17 +0900 Subject: [PATCH] feat: revise metric for measuring performance --- blockchain/v0/reactor.go | 2 +- blockchain/v0/reactor_test.go | 2 +- blockchain/v1/reactor.go | 2 +- blockchain/v1/reactor_test.go | 2 +- blockchain/v2/processor_context.go | 2 +- blockchain/v2/reactor.go | 2 +- blockchain/v2/reactor_test.go | 4 +- consensus/metrics.go | 103 ++++++++++-------------- consensus/replay.go | 2 +- consensus/replay_test.go | 2 +- consensus/state.go | 123 ++++++++++++++++------------- state/execution.go | 38 ++++++++- state/execution_test.go | 8 +- state/helpers_test.go | 2 +- test/maverick/consensus/replay.go | 2 +- test/maverick/consensus/state.go | 2 +- types/utils.go | 17 +++- 17 files changed, 175 insertions(+), 140 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 00e9238f1..d4d7eb727 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -398,7 +398,7 @@ FOR_LOOP: // TODO: same thing for app - but we would need a way to // get the hash without persisting the state var err error - state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) + state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first, nil) if err != nil { // TODO This is bad, are we zombie? panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 64417dae9..ce5c2da93 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -118,7 +118,7 @@ func newBlockchainReactor( thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil) if err != nil { panic(fmt.Errorf("error apply block: %w", err)) } diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index a086543ba..785a06146 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -484,7 +484,7 @@ func (bcR *BlockchainReactor) processBlock() error { bcR.store.SaveBlock(first, firstParts, second.LastCommit) - bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) + bcR.state, _, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first, nil) if err != nil { panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } diff --git a/blockchain/v1/reactor_test.go b/blockchain/v1/reactor_test.go index f57f8150f..d26c0295c 100644 --- a/blockchain/v1/reactor_test.go +++ b/blockchain/v1/reactor_test.go @@ -137,7 +137,7 @@ func newBlockchainReactor( thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil) if err != nil { panic(fmt.Errorf("error apply block: %w", err)) } diff --git a/blockchain/v2/processor_context.go b/blockchain/v2/processor_context.go index dbfad04e4..156074a63 100644 --- a/blockchain/v2/processor_context.go +++ b/blockchain/v2/processor_context.go @@ -30,7 +30,7 @@ func newProcessorContext(st blockStore, ex blockApplier, s state.State) *pContex } func (pc *pContext) applyBlock(blockID types.BlockID, block *types.Block) error { - newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block) + newState, _, err := pc.applier.ApplyBlock(pc.state, blockID, block, nil) pc.state = newState return err } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index ad8dcfeaa..d306455d9 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -53,7 +53,7 @@ type blockVerifier interface { } type blockApplier interface { - ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error) + ApplyBlock(state state.State, blockID types.BlockID, block *types.Block, times *state.CommitStepTimes) (state.State, int64, error) } // XXX: unify naming in this package around tmState diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index 203f3fd8c..3cf94c1b0 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -82,7 +82,7 @@ type mockBlockApplier struct { // XXX: Add whitelist/blacklist? func (mba *mockBlockApplier) ApplyBlock( - state sm.State, blockID types.BlockID, block *types.Block, + state sm.State, blockID types.BlockID, block *types.Block, times *sm.CommitStepTimes, ) (sm.State, int64, error) { state.LastBlockHeight++ return state, 0, nil @@ -544,7 +544,7 @@ func newReactorStore( thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} - state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) + state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock, nil) if err != nil { panic(fmt.Errorf("error apply block: %w", err)) } diff --git a/consensus/metrics.go b/consensus/metrics.go index 95e295967..0cf78326e 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -66,6 +66,10 @@ type Metrics struct { // Number of blockparts transmitted by peer. BlockParts metrics.Counter + // //////////////////////////////////// + // Metrics for measuring performance + // //////////////////////////////////// + // Number of blocks that are we couldn't receive MissingProposal metrics.Gauge @@ -73,16 +77,13 @@ type Metrics struct { RoundFailures metrics.Histogram // Execution time profiling of each step - ProposalCreating metrics.Histogram - ProposalWaiting metrics.Histogram - ProposalVerifying metrics.Histogram - ProposalBlockReceiving metrics.Histogram - PrevoteBlockVerifying metrics.Histogram - PrevoteReceiving metrics.Histogram - PrecommitBlockVerifying metrics.Histogram - PrecommitReceiving metrics.Histogram - CommitBlockVerifying metrics.Histogram - CommitBlockApplying metrics.Histogram + DurationProposal metrics.Histogram + DurationPrevote metrics.Histogram + DurationPrecommit metrics.Histogram + DurationCommitExecuting metrics.Histogram + DurationCommitCommitting metrics.Histogram + DurationCommitRechecking metrics.Histogram + DurationWaitingForNewRound metrics.Histogram } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -234,74 +235,53 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Number of rounds failed on consensus", Buckets: stdprometheus.LinearBuckets(0, 1, 5), }, labels).With(labelsAndValues...), - ProposalCreating: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationProposal: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_proposal_creating", - Help: "Duration of creating proposal and block", + Name: "duration_proposal", + Help: "Duration of proposal step", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - ProposalWaiting: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationPrevote: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_proposal_waiting", - Help: "Duration between enterNewRound and receiving proposal", + Name: "duration_prevote", + Help: "Duration of prevote step", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - ProposalVerifying: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "duration_proposal_verifying", - Help: "Duration of ValidBlock", - Buckets: stdprometheus.LinearBuckets(50, 50, 10), - }, labels).With(labelsAndValues...), - ProposalBlockReceiving: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationPrecommit: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_proposal_block_receiving", - Help: "Duration of receiving all proposal block parts", + Name: "duration_precommit", + Help: "Duration of precommit step", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - PrevoteBlockVerifying: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationCommitExecuting: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_prevote_block_verifying", - Help: "Duration of ValidBlock in prevote", - Buckets: stdprometheus.LinearBuckets(50, 50, 10), - }, labels).With(labelsAndValues...), - PrevoteReceiving: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "duration_prevote_receiving", - Help: "Duration of receiving 2/3+ prevotes", + Name: "duration_commit_executing", + Help: "Duration of executing block txs", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - PrecommitBlockVerifying: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationCommitCommitting: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_precommit_block_verifying", - Help: "Duration of ValidBlock in precommit", - Buckets: stdprometheus.LinearBuckets(50, 50, 10), - }, labels).With(labelsAndValues...), - PrecommitReceiving: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "duration_precommit_receiving", - Help: "Duration of receiving 2/3+ precommits", + Name: "duration_commit_committing", + Help: "Duration of committing updated state", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - CommitBlockVerifying: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationCommitRechecking: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_commit_block_verifying", - Help: "Duration of ValidBlock in commit", - Buckets: stdprometheus.LinearBuckets(50, 50, 10), + Name: "duration_commit_rechecking", + Help: "Duration of rechecking mempool txs", + Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), - CommitBlockApplying: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + DurationWaitingForNewRound: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, - Name: "duration_commit_block_applying", - Help: "Duration of applying block", + Name: "duration_waiting_for_new_round", + Help: "Duration of waiting for next new round", Buckets: stdprometheus.LinearBuckets(100, 100, 10), }, labels).With(labelsAndValues...), } @@ -340,15 +320,12 @@ func NopMetrics() *Metrics { MissingProposal: discard.NewGauge(), RoundFailures: discard.NewHistogram(), - ProposalCreating: discard.NewHistogram(), - ProposalWaiting: discard.NewHistogram(), - ProposalVerifying: discard.NewHistogram(), - ProposalBlockReceiving: discard.NewHistogram(), - PrevoteBlockVerifying: discard.NewHistogram(), - PrevoteReceiving: discard.NewHistogram(), - PrecommitBlockVerifying: discard.NewHistogram(), - PrecommitReceiving: discard.NewHistogram(), - CommitBlockVerifying: discard.NewHistogram(), - CommitBlockApplying: discard.NewHistogram(), + DurationProposal: discard.NewHistogram(), + DurationPrevote: discard.NewHistogram(), + DurationPrecommit: discard.NewHistogram(), + DurationCommitExecuting: discard.NewHistogram(), + DurationCommitCommitting: discard.NewHistogram(), + DurationCommitRechecking: discard.NewHistogram(), + DurationWaitingForNewRound: discard.NewHistogram(), } } diff --git a/consensus/replay.go b/consensus/replay.go index 4e6271abf..5db237d20 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -502,7 +502,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec.SetEventBus(h.eventBus) var err error - state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) + state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block, nil) if err != nil { return sm.State{}, err } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 91dca4959..d99bf4504 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -808,7 +808,7 @@ func applyBlock(stateStore sm.Store, st sm.State, blk *types.Block, proxyApp pro blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) blkID := types.BlockID{Hash: blk.Hash(), PartSetHeader: blk.MakePartSet(testPartSize).Header()} - newState, _, err := blockExec.ApplyBlock(st, blkID, blk) + newState, _, err := blockExec.ApplyBlock(st, blkID, blk, nil) if err != nil { panic(err) } diff --git a/consensus/state.go b/consensus/state.go index c67341889..000afabfb 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -59,6 +59,53 @@ func (ti *timeoutInfo) String() string { return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } +type StepTimes struct { + Proposal types.StepDuration + Prevote types.StepDuration + Precommit types.StepDuration + sm.CommitStepTimes + WaitingForNewRound types.StepDuration +} + +func (st *StepTimes) StartNewRound() time.Time { + now := tmtime.Now() + if st.Current == &st.WaitingForNewRound { + st.Current.End = now + } + st.Current = &st.Proposal + st.Current.Start = now + return now +} + +func (st *StepTimes) ToPrevoteStep() time.Time { + return st.ToNextStep(&st.Proposal, &st.Prevote) +} + +func (st *StepTimes) ToPrecommitStep() time.Time { + return st.ToNextStep(&st.Prevote, &st.Precommit) +} + +func (st *StepTimes) ToCommitExecuting() time.Time { + return st.ToNextStep(&st.Precommit, &st.CommitExecuting) +} + +func (st *StepTimes) EndRound() time.Time { + now := tmtime.Now() + if st.Current == &st.CommitRechecking { + st.Current.End = now + st.Current = &st.WaitingForNewRound + } + return now +} + +func (st *StepTimes) StartWaiting() time.Time { + now := tmtime.Now() + if st.Current == &st.WaitingForNewRound { + st.Current.Start = now + } + return now +} + // interface to the mempool type txNotifier interface { TxsAvailable() <-chan struct{} @@ -98,20 +145,6 @@ func (sd *StepDuration) SetEnd() time.Time { return sd.end } -type StepTimes struct { - ProposalCreatedByMyself bool - ProposalCreating StepDuration - ProposalWaiting StepDuration - ProposalVerifying StepDuration - ProposalBlockReceiving StepDuration - PrevoteBlockVerifying StepDuration - PrevoteReceiving StepDuration - PrecommitBlockVerifying StepDuration - PrecommitReceiving StepDuration - CommitBlockVerifying StepDuration - CommitBlockApplying StepDuration -} - // State handles execution of the consensus algorithm. // It processes votes and proposals, and upon reaching agreement, // commits blocks to the chain and executes them against the application. @@ -183,7 +216,7 @@ type State struct { metrics *Metrics // times of each step - stepTimes StepTimes + stepTimes *StepTimes } // StateOption sets an optional parameter on the State. @@ -214,6 +247,7 @@ func NewState( evpool: evpool, evsw: tmevents.NewEventSwitch(), metrics: NopMetrics(), + stepTimes: &StepTimes{}, } // set function defaults (may be overwritten before calling Start) @@ -1016,7 +1050,7 @@ func (cs *State) enterNewRound(height int64, round int32) { return } - now := cs.stepTimes.ProposalWaiting.SetStart() + now := cs.stepTimes.StartNewRound() if cs.StartTime.After(now) { logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now) } @@ -1133,11 +1167,9 @@ func (cs *State) enterPropose(height int64, round int32) { if cs.isProposer(address) { logger.Debug("propose step; our turn to propose", "proposer", address) cs.decideProposal(height, round) - cs.stepTimes.ProposalCreatedByMyself = true } else { logger.Debug("propose step; not our turn to propose", "proposer", cs.Proposer.Address, "privValidator", cs.privValidator) - cs.stepTimes.ProposalCreatedByMyself = false } if !cs.Voters.HasAddress(address) { @@ -1161,10 +1193,8 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { block, blockParts = cs.ValidBlock, cs.ValidBlockParts } else { // Create a new proposal block from state/txs from the mempool. - cs.stepTimes.ProposalCreating.SetStart() block, blockParts = cs.createProposalBlock(round) if block == nil { // on error - cs.stepTimes.ProposalCreating.SetEnd() return } cs.Logger.Info("Create Block", "Height", height, "Round", round, @@ -1182,7 +1212,6 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { proposal := types.NewProposal(height, round, cs.ValidRound, propBlockID) p := proposal.ToProto() if err := cs.privValidator.SignProposal(cs.state.ChainID, p); err == nil { - cs.stepTimes.ProposalCreating.SetEnd() proposal.Signature = p.Signature // send proposal and block parts on internal msg queue @@ -1196,7 +1225,6 @@ func (cs *State) defaultDecideProposal(height int64, round int32) { cs.Logger.Info("signed proposal", "height", height, "round", round, "proposal", proposal) cs.Logger.Debug("signed proposal block", "block", block) } else if !cs.replayMode { - cs.stepTimes.ProposalCreating.SetEnd() cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err) } } @@ -1288,6 +1316,7 @@ func (cs *State) enterPrevote(height int64, round int32) { logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) // Sign and broadcast vote as necessary + cs.stepTimes.ToPrevoteStep() cs.doPrevote(height, round) // Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait @@ -1307,8 +1336,6 @@ func (cs *State) defaultDoPrevote(height int64, round int32) { // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { // if it already ends or not starts it will be ignored - cs.stepTimes.ProposalWaiting.SetEnd() - cs.stepTimes.ProposalBlockReceiving.SetEnd() logger.Debug("prevote step: ProposalBlock is nil") cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{}) // increase missing proposal by one @@ -1316,24 +1343,20 @@ func (cs *State) defaultDoPrevote(height int64, round int32) { return } - cs.stepTimes.PrevoteBlockVerifying.SetStart() // Validate proposal block err := cs.blockExec.ValidateBlock(cs.state, round, cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. logger.Error("prevote step: ProposalBlock is invalid", "err", err) - cs.stepTimes.PrevoteBlockVerifying.SetEnd() cs.signAddVote(tmproto.PrevoteType, nil, types.PartSetHeader{}) return } - cs.stepTimes.PrevoteBlockVerifying.SetEnd() // Prevote cs.ProposalBlock // NOTE: the proposal signature is validated when it is received, // and the proposal block parts are validated as they are received (against the merkle hash in the proposal) logger.Debug("prevote step: ProposalBlock is valid") cs.signAddVote(tmproto.PrevoteType, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) - cs.stepTimes.PrevoteReceiving.SetStart() } // Enter: any +2/3 prevotes at next round. @@ -1392,6 +1415,8 @@ func (cs *State) enterPrecommit(height int64, round int32) { cs.newStep() }() + cs.stepTimes.ToPrecommitStep() + // check for a polka blockID, ok := cs.Votes.Prevotes(round).TwoThirdsMajority() @@ -1449,7 +1474,6 @@ func (cs *State) enterPrecommit(height int64, round int32) { } cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) - cs.stepTimes.PrecommitReceiving.SetStart() return } @@ -1458,13 +1482,11 @@ func (cs *State) enterPrecommit(height int64, round int32) { logger.Debug("precommit step; +2/3 prevoted proposal block; locking", "hash", blockID.Hash) // Validate the block. - cs.stepTimes.PrecommitBlockVerifying.SetStart() if err := cs.blockExec.ValidateBlock(cs.state, round, cs.ProposalBlock); err != nil { cs.Logger.Error(fmt.Sprintf("%v; block=%v", err, cs.ProposalBlock)) panic(fmt.Sprintf("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) } - cs.stepTimes.PrecommitBlockVerifying.SetEnd() cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock cs.LockedBlockParts = cs.ProposalBlockParts @@ -1474,7 +1496,6 @@ func (cs *State) enterPrecommit(height int64, round int32) { } cs.signAddVote(tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) - cs.stepTimes.PrecommitReceiving.SetStart() return } @@ -1637,7 +1658,6 @@ func (cs *State) finalizeCommit(height int64) { blockID, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() block, blockParts := cs.ProposalBlock, cs.ProposalBlockParts - cs.stepTimes.CommitBlockVerifying.SetStart() if !ok { panic("cannot finalize commit; commit does not have 2/3 majority") } @@ -1650,7 +1670,6 @@ func (cs *State) finalizeCommit(height int64) { if err := cs.blockExec.ValidateBlock(cs.state, cs.CommitRound, block); err != nil { panic(fmt.Sprintf("+2/3 committed an invalid block: %v", err)) } - cs.stepTimes.CommitBlockVerifying.SetEnd() logger.Info( "finalizing commit of block", @@ -1704,12 +1723,12 @@ func (cs *State) finalizeCommit(height int64) { // Execute and commit the block, update and save the state, and update the mempool. // NOTE The block.AppHash wont reflect these txs until the next block. - cs.stepTimes.CommitBlockApplying.SetStart() var ( err error retainHeight int64 ) + cs.stepTimes.ToCommitExecuting() stateCopy, retainHeight, err = cs.blockExec.ApplyBlock( stateCopy, types.BlockID{ @@ -1717,12 +1736,12 @@ func (cs *State) finalizeCommit(height int64) { PartSetHeader: blockParts.Header(), }, block, + &cs.stepTimes.CommitStepTimes, ) if err != nil { logger.Error("failed to apply block", "err", err) return } - cs.stepTimes.CommitBlockApplying.SetEnd() fail.Fail() // XXX @@ -1736,6 +1755,8 @@ func (cs *State) finalizeCommit(height int64) { } } + cs.stepTimes.EndRound() + // must be called before we update state cs.recordMetrics(height, block) @@ -1756,6 +1777,8 @@ func (cs *State) finalizeCommit(height int64) { // * cs.Height has been increment to height+1 // * cs.Step is now cstypes.RoundStepNewHeight // * cs.StartTime is set to when we will start round0. + + cs.stepTimes.StartWaiting() } func (cs *State) pruneBlocks(retainHeight int64) (uint64, error) { @@ -1889,18 +1912,13 @@ func (cs *State) recordMetrics(height int64, block *types.Block) { cs.metrics.CommittedHeight.Set(float64(block.Height)) cs.metrics.RoundFailures.Observe(float64(cs.Round)) - if cs.stepTimes.ProposalCreatedByMyself { - cs.metrics.ProposalCreating.Observe(cs.stepTimes.ProposalCreating.GetDuration()) - } - cs.metrics.ProposalWaiting.Observe(cs.stepTimes.ProposalWaiting.GetDuration()) - cs.metrics.ProposalVerifying.Observe(cs.stepTimes.ProposalVerifying.GetDuration()) - cs.metrics.ProposalBlockReceiving.Observe(cs.stepTimes.ProposalBlockReceiving.GetDuration()) - cs.metrics.PrevoteBlockVerifying.Observe(cs.stepTimes.PrevoteBlockVerifying.GetDuration()) - cs.metrics.PrevoteReceiving.Observe(cs.stepTimes.PrevoteReceiving.GetDuration()) - cs.metrics.PrecommitBlockVerifying.Observe(cs.stepTimes.PrecommitBlockVerifying.GetDuration()) - cs.metrics.PrecommitReceiving.Observe(cs.stepTimes.PrecommitReceiving.GetDuration()) - cs.metrics.CommitBlockVerifying.Observe(cs.stepTimes.CommitBlockVerifying.GetDuration()) - cs.metrics.CommitBlockApplying.Observe(cs.stepTimes.CommitBlockApplying.GetDuration()) + cs.metrics.DurationProposal.Observe(cs.stepTimes.Proposal.GetDuration()) + cs.metrics.DurationPrevote.Observe(cs.stepTimes.Prevote.GetDuration()) + cs.metrics.DurationPrecommit.Observe(cs.stepTimes.Precommit.GetDuration()) + cs.metrics.DurationCommitExecuting.Observe(cs.stepTimes.CommitExecuting.GetDuration()) + cs.metrics.DurationCommitCommitting.Observe(cs.stepTimes.CommitCommitting.GetDuration()) + cs.metrics.DurationCommitRechecking.Observe(cs.stepTimes.CommitRechecking.GetDuration()) + cs.metrics.DurationWaitingForNewRound.Observe(cs.stepTimes.WaitingForNewRound.GetDuration()) } //----------------------------------------------------------------------------- @@ -1922,9 +1940,7 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { (proposal.POLRound >= 0 && proposal.POLRound >= proposal.Round) { return ErrInvalidProposalPOLRound } - cs.stepTimes.ProposalWaiting.SetEnd() - cs.stepTimes.ProposalVerifying.SetStart() // If consensus does not enterNewRound yet, cs.Proposer may be nil or prior proposer, so don't use cs.Proposer proposer := cs.Validators.SelectProposer(cs.state.LastProofHash, proposal.Height, proposal.Round) @@ -1933,13 +1949,11 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { if !proposer.PubKey.VerifySignature( types.ProposalSignBytes(cs.state.ChainID, p), proposal.Signature, ) { - cs.stepTimes.ProposalVerifying.SetEnd() cs.Logger.Error(fmt.Sprintf("proposal signature verification failed: proposer=%X, bytes=%X, signature=%X", cs.Proposer.Address, types.ProposalSignBytes(cs.state.ChainID, p), proposal.Signature)) return ErrInvalidProposalSignature } - cs.stepTimes.ProposalVerifying.SetEnd() proposal.Signature = p.Signature cs.Proposal = proposal @@ -1951,7 +1965,6 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { } cs.Logger.Info("received proposal", "proposal", proposal) - cs.stepTimes.ProposalBlockReceiving.SetStart() return nil } @@ -2008,7 +2021,6 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add } cs.ProposalBlock = block - cs.stepTimes.ProposalBlockReceiving.SetEnd() // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) @@ -2171,7 +2183,6 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error // If +2/3 prevotes for a block or nil for *any* round: if blockID, ok := prevotes.TwoThirdsMajority(); ok { - cs.stepTimes.PrevoteReceiving.SetEnd() // There was a polka! // If we're locked but this is a recent polka, unlock. @@ -2252,7 +2263,6 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error blockID, ok := precommits.TwoThirdsMajority() if ok { - cs.stepTimes.PrecommitReceiving.SetEnd() // Executed as TwoThirdsMajority could be from a higher round cs.enterNewRound(height, vote.Round) cs.enterPrecommit(height, vote.Round) @@ -2266,7 +2276,6 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.ID) (added bool, err error cs.enterPrecommitWait(height, vote.Round) } } else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() { - cs.stepTimes.PrecommitReceiving.SetEnd() cs.enterNewRound(height, vote.Round) cs.enterPrecommitWait(height, vote.Round) } diff --git a/state/execution.go b/state/execution.go index dd740f2e9..55c6c7792 100644 --- a/state/execution.go +++ b/state/execution.go @@ -6,6 +6,7 @@ import ( "time" "github.com/line/ostracon/crypto" + time2 "github.com/line/ostracon/types/time" abci "github.com/line/ostracon/abci/types" cryptoenc "github.com/line/ostracon/crypto/encoding" @@ -45,6 +46,30 @@ type BlockExecutor struct { metrics *Metrics } +type CommitStepTimes struct { + CommitExecuting types.StepDuration + CommitCommitting types.StepDuration + CommitRechecking types.StepDuration + Current *types.StepDuration +} + +func (st *CommitStepTimes) ToNextStep(from, next *types.StepDuration) time.Time { + now := time2.Now() + if st.Current == from { + from.End, next.Start = now, now + st.Current = next + } + return now +} + +func (st *CommitStepTimes) ToCommitCommitting() time.Time { + return st.ToNextStep(&st.CommitExecuting, &st.CommitCommitting) +} + +func (st *CommitStepTimes) ToCommitRechecking() time.Time { + return st.ToNextStep(&st.CommitCommitting, &st.CommitRechecking) +} + type BlockExecutorOption func(executor *BlockExecutor) func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { @@ -130,7 +155,7 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, round int32, block *t // from outside this package to process and commit an entire block. // It takes a blockID to avoid recomputing the parts hash. func (blockExec *BlockExecutor) ApplyBlock( - state State, blockID types.BlockID, block *types.Block, + state State, blockID types.BlockID, block *types.Block, stepTimes *CommitStepTimes, ) (State, int64, error) { // When doing ApplyBlock, we don't need to check whether the block.Round is same to current round, @@ -183,9 +208,13 @@ func (blockExec *BlockExecutor) ApplyBlock( return state, 0, fmt.Errorf("commit failed for application: %v", err) } + if stepTimes != nil { + stepTimes.ToCommitCommitting() + } + // Lock mempool, commit app state, update mempoool. commitStartTime := time.Now().UnixNano() - appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs) + appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs, stepTimes) commitEndTime := time.Now().UnixNano() commitTimeMs := float64(commitEndTime-commitStartTime) / 1000000 @@ -225,6 +254,7 @@ func (blockExec *BlockExecutor) Commit( state State, block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx, + stepTimes *CommitStepTimes, ) ([]byte, int64, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() @@ -258,6 +288,10 @@ func (blockExec *BlockExecutor) Commit( "app_hash", fmt.Sprintf("%X", res.Data), ) + if stepTimes != nil { + stepTimes.ToCommitRechecking() + } + // Update mempool. updateMempoolStartTime := time.Now().UnixNano() err = blockExec.mempool.Update( diff --git a/state/execution_test.go b/state/execution_test.go index 6591e34d3..b43f84ccd 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -51,7 +51,7 @@ func TestApplyBlock(t *testing.T) { block := makeBlockWithPrivVal(state, privVals[state.Validators.Validators[0].Address.String()], 1) blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: block.MakePartSet(testPartSize).Header()} - state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block) + state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block, nil) require.Nil(t, err) assert.EqualValues(t, retainHeight, 1) @@ -273,7 +273,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { proof, _ := privVal.GenerateVRFProof(message) block.Proof = bytes.HexBytes(proof) - state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block) + state, retainHeight, err := blockExec.ApplyBlock(state, blockID, block, nil) require.Nil(t, err) assert.EqualValues(t, retainHeight, 1) @@ -455,7 +455,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { {PubKey: pk, Power: 10}, } - state, _, err = blockExec.ApplyBlock(state, blockID, block) + state, _, err = blockExec.ApplyBlock(state, blockID, block, nil) require.Nil(t, err) // test new validator was added to NextValidators if assert.Equal(t, state.Validators.Size()+1, state.NextValidators.Size()) { @@ -511,7 +511,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { {PubKey: vp, Power: 0}, } - assert.NotPanics(t, func() { state, _, err = blockExec.ApplyBlock(state, blockID, block) }) + assert.NotPanics(t, func() { state, _, err = blockExec.ApplyBlock(state, blockID, block, nil) }) assert.NotNil(t, err) assert.NotEmpty(t, state.NextValidators.Validators) } diff --git a/state/helpers_test.go b/state/helpers_test.go index 7878f1069..0f26bf1ca 100644 --- a/state/helpers_test.go +++ b/state/helpers_test.go @@ -65,7 +65,7 @@ func makeAndApplyGoodBlock(state sm.State, privVal types.PrivValidator, height i } blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: types.PartSetHeader{Total: 3, Hash: tmrand.Bytes(32)}} - state, _, err := blockExec.ApplyBlock(state, blockID, block) + state, _, err := blockExec.ApplyBlock(state, blockID, block, nil) if err != nil { return state, types.BlockID{}, err } diff --git a/test/maverick/consensus/replay.go b/test/maverick/consensus/replay.go index 6b2b03bbb..d82c05497 100644 --- a/test/maverick/consensus/replay.go +++ b/test/maverick/consensus/replay.go @@ -501,7 +501,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec.SetEventBus(h.eventBus) var err error - state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) + state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block, nil) if err != nil { return sm.State{}, err } diff --git a/test/maverick/consensus/state.go b/test/maverick/consensus/state.go index c31d2eb41..dea1d6141 100644 --- a/test/maverick/consensus/state.go +++ b/test/maverick/consensus/state.go @@ -1516,7 +1516,7 @@ func (cs *State) finalizeCommit(height int64) { stateCopy, retainHeight, err = cs.blockExec.ApplyBlock( stateCopy, types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}, - block) + block, nil) if err != nil { cs.Logger.Error("Error on ApplyBlock", "err", err) return diff --git a/types/utils.go b/types/utils.go index cec47e202..2646464dd 100644 --- a/types/utils.go +++ b/types/utils.go @@ -1,6 +1,21 @@ package types -import "reflect" +import ( + "reflect" + "time" +) + +type StepDuration struct { + Start time.Time + End time.Time +} + +func (sd *StepDuration) GetDuration() float64 { + if sd.End.After(sd.Start) { + return float64(sd.End.Sub(sd.Start).Microseconds()) / 1000 + } + return 0 +} // Go lacks a simple and safe way to see if something is a typed nil. // See: