Skip to content

Commit

Permalink
network: separate tx handling from msg handling
Browse files Browse the repository at this point in the history
This allows to naturally scale transaction processing if we have some peer
that is sending a lot of them while others are mostly silent. It also can help
somewhat in the event we have 50 peers that all send transactions. 4+1
scenario benefits a lot from it, while 7+2 slows down a little. Delayed
scenarios don't care.

Surprisingly, this also makes disconnects (#2744) much more rare, 4-node
scenario almost never sees it now. Most probably this is the case where peers
affect each other a lot, single-threaded transaction receiver can be slow
enough to trigger some timeout in getdata handler of its peer (because it
tries to push a number of replies).
  • Loading branch information
roman-khimov committed Oct 21, 2022
1 parent e003b67 commit 12b9076
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
64 changes: 53 additions & 11 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/big"
mrand "math/rand"
"net"
"runtime"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -111,6 +112,7 @@ type (
txCbEnabled atomic.Bool

txInLock sync.Mutex
txin chan *transaction.Transaction
txInMap map[util.Uint256]struct{}

lock sync.RWMutex
Expand Down Expand Up @@ -183,6 +185,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error),
Expand Down Expand Up @@ -256,6 +259,10 @@ func (s *Server) Start(errChan chan error) {
s.tryStartServices()
s.initStaleMemPools()

var txThreads = optimalNumOfThreads()
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run()
Expand Down Expand Up @@ -1042,19 +1049,39 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
}
s.txInMap[tx.Hash()] = struct{}{}
s.txInLock.Unlock()
s.serviceLock.RLock()
txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx)
s.txin <- tx
return nil
}

func (s *Server) txHandlerLoop() {
txloop:
for {
select {
case tx := <-s.txin:
s.serviceLock.RLock()
txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx)
}
if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil)
}
s.txInLock.Lock()
delete(s.txInMap, tx.Hash())
s.txInLock.Unlock()
case <-s.quit:
break txloop
}
}
if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil)
drainloop:
for {
select {
case <-s.txin:
default:
break drainloop
}
}
s.txInLock.Lock()
delete(s.txInMap, tx.Hash())
s.txInLock.Unlock()
return nil
}

// handleP2PNotaryRequestCmd process the received P2PNotaryRequest payload.
Expand Down Expand Up @@ -1589,3 +1616,18 @@ func (s *Server) Port() (uint16, error) {
}
return port, nil
}

// optimalNumOfThreads returns the optimal number of processing threads to create
// for transaction processing.
func optimalNumOfThreads() int {
// Doing more won't help, mempool is still a contention point.
const maxThreads = 16
var threads = runtime.GOMAXPROCS(0)
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
}
if threads > maxThreads {
threads = maxThreads
}
return threads
}
18 changes: 16 additions & 2 deletions pkg/network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,27 @@ func TestTransaction(t *testing.T) {
s.register <- p

s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx)
require.Eventually(t, func() bool {
for _, t := range s.services["fake"].(*fakeConsensus).txs {
if t == tx {
return true
}
}
return false
}, 2*time.Second, time.Millisecond*500)
})
t.Run("bad", func(t *testing.T) {
tx := newDummyTx()
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything.
require.Eventually(t, func() bool {
for _, t := range s.services["fake"].(*fakeConsensus).txs {
if t == tx {
return true
}
}
return false
}, 2*time.Second, time.Millisecond*500)
})
}

Expand Down

0 comments on commit 12b9076

Please sign in to comment.