Skip to content

Commit

Permalink
network/consensus: use new dbft StopTxFlow callback
Browse files Browse the repository at this point in the history
It makes sense in general (further narrowing down the time window when
transactions are processed by consensus thread) and it improves block times a
little too, especially in the 7+2 scenario.

Related to #2744.
  • Loading branch information
roman-khimov committed Oct 18, 2022
1 parent 7307974 commit 73ce898
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 6 deletions.
1 change: 1 addition & 0 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain
Chain: chain,
ProtocolConfiguration: chain.GetConfig(),
RequestTx: serv.RequestTx,
StopTxFlow: serv.StopTxFlow,
Wallet: &config,
TimePerBlock: tpb,
})
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/holiman/uint256 v1.2.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mr-tron/base58 v1.2.0
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647
github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220927123257-24c107e3a262
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA
github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk=
github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ=
github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y=
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647 h1:handGBjqVzRx7HD6152zsP8ZRxw083zCMbN0IlUaPQk=
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98=
github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7 h1:RxVI9RFiHmpUvbuYIM5siLMiOvVt8P651BdmTstGi3Q=
github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98=
github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
Expand Down
1 change: 1 addition & 0 deletions internal/testcli/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
Chain: chain,
ProtocolConfiguration: chain.GetConfig(),
RequestTx: netSrv.RequestTx,
StopTxFlow: netSrv.StopTxFlow,
Wallet: &cfg.ApplicationConfiguration.UnlockWallet,
TimePerBlock: serverConfig.TimePerBlock,
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ type Config struct {
// RequestTx is a callback to which will be called
// when a node lacks transactions present in the block.
RequestTx func(h ...util.Uint256)
// StopTxFlow is a callback that is called after the consensus
// process stops accepting incoming transactions.
StopTxFlow func()
// TimePerBlock is minimal time that should pass before the next block is accepted.
TimePerBlock time.Duration
// Wallet is a local-node wallet configuration.
Expand Down Expand Up @@ -173,6 +176,7 @@ func NewService(cfg Config) (Service, error) {
dbft.WithSecondsPerBlock(cfg.TimePerBlock),
dbft.WithGetKeyPair(srv.getKeyPair),
dbft.WithRequestTx(cfg.RequestTx),
dbft.WithStopTxFlow(cfg.StopTxFlow),
dbft.WithGetTx(srv.getTx),
dbft.WithGetVerified(srv.getVerifiedTx),
dbft.WithBroadcast(srv.broadcast),
Expand Down
1 change: 1 addition & 0 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
Chain: bc,
ProtocolConfiguration: bc.GetConfig(),
RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {},
TimePerBlock: time.Duration(bc.GetConfig().SecondsPerBlock) * time.Second,
Wallet: &config.Wallet{
Path: "./testdata/wallet1.json",
Expand Down
11 changes: 8 additions & 3 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type (
services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error
txCallback func(*transaction.Transaction)
txCbHeight atomic.Uint32
txCbEnabled atomic.Bool

txInLock sync.Mutex
txInMap map[util.Uint256]struct{}
Expand Down Expand Up @@ -1045,7 +1045,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.serviceLock.RLock()
txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil && s.chain.BlockHeight() <= s.txCbHeight.Load() {
if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx)
}
if s.verifyAndPoolTX(tx) == nil {
Expand Down Expand Up @@ -1345,7 +1345,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
return
}

s.txCbHeight.Store(s.chain.BlockHeight())
s.txCbEnabled.Store(true)

for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
start := i * payload.MaxHashesCount
Expand All @@ -1363,6 +1363,11 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
}
}

// StopTxFlow makes the server not call previously specified consensus transaction callback.
func (s *Server) StopTxFlow() {
s.txCbEnabled.Store(false)
}

// iteratePeersWithSendMsg sends the given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false).
Expand Down
1 change: 1 addition & 0 deletions pkg/network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func TestTransaction(t *testing.T) {
cons := new(fakeConsensus)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s)
s.RequestTx(util.Uint256{1})

t.Run("good", func(t *testing.T) {
tx := newDummyTx()
Expand Down

0 comments on commit 73ce898

Please sign in to comment.