Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: WIP - Stress Test Part 2 #557

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/zetaclientd/generate_new_tss.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ func genNewTSSAtBlock(cfg *config.Config, bridge *mc.ZetaCoreBridge, tss *mc.TSS
}
log.Info().Msgf("setting TSS pubkey: %s", res.PubKey)
err = tss.InsertPubKey(res.PubKey)
tss.CurrentPubkey = res.PubKey

if err != nil {
log.Error().Msgf("SetPubKey fail")
return err
}
tss.CurrentPubkey = res.PubKey // this is only needed for version 0.13.0 leaderless keysign
tss.Signers = pubKeys
log.Info().Msgf("TSS address in hex: %s", tss.EVMAddress().Hex())

return nil
Expand Down
4 changes: 2 additions & 2 deletions contrib/localnet/orchestrator/smoketest/test_stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
const (
StatInterval = 5
DepositInterval = 1
WithdrawInterval = 1
WithdrawInterval = 500
)

var (
Expand Down Expand Up @@ -174,7 +174,7 @@ func (sm *SmokeTest) SendCCtx(msg []byte) {

// WithdrawCCtx withdraw USDT from ZEVM to EVM
func (sm *SmokeTest) WithdrawCCtx() {
ticker := time.NewTicker(time.Second * WithdrawInterval)
ticker := time.NewTicker(time.Millisecond * WithdrawInterval)
for {
select {
case <-ticker.C:
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/evmos/ethermint v0.21.0
github.com/pelletier/go-toml v1.9.5
github.com/pkg/errors v0.9.1
github.com/rakyll/statik v0.1.7
github.com/zeta-chain/protocol-contracts v0.0.0-20230421024932-34119ba32459
Expand All @@ -61,6 +60,7 @@ require (
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/linxGnu/grocksdb v1.7.15 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/rjeczalik/notify v0.9.1 // indirect
)
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
github.com/cheekybits/genny v1.0.0 // indirect
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
github.com/cockroachdb/apd/v2 v2.0.2 // indirect
github.com/coinbase/rosetta-sdk-go v0.7.9 // indirect
github.com/coinbase/rosetta-sdk-go v0.7.9
github.com/confio/ics23/go v0.9.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 // indirect
Expand Down Expand Up @@ -180,10 +180,10 @@ require (
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.22.0 // indirect
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-core v0.20.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.18.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.18.0
github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
Expand Down Expand Up @@ -298,5 +298,5 @@ replace (
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/tendermint/tm-db => github.com/BlockPILabs/cosmos-db v0.0.3
gitlab.com/thorchain/tss/go-tss => github.com/brewmaster012/go-tss v0.0.0-20230328191220-06e3b12d56a7
gitlab.com/thorchain/tss/go-tss => github.com/brewmaster012/go-tss v0.0.0-20230513044345-2316cd28fa3f
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/brewmaster012/go-tss v0.0.0-20230328191220-06e3b12d56a7 h1:oP3b/kCGmDEvM5mhj5Dm16joK9wx922k62B/79nrG5Q=
github.com/brewmaster012/go-tss v0.0.0-20230328191220-06e3b12d56a7/go.mod h1:RYOe4ihG8KkoQQhW6ljiyxyW6F3CoZA3ozyyEb5HVmI=
github.com/brewmaster012/go-tss v0.0.0-20230513044345-2316cd28fa3f h1:gFP4wtI/KhJhkcakkz0AkppJnTjtKP+HjJiN75+N+SE=
github.com/brewmaster012/go-tss v0.0.0-20230513044345-2316cd28fa3f/go.mod h1:RYOe4ihG8KkoQQhW6ljiyxyW6F3CoZA3ozyyEb5HVmI=
github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c=
github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y=
github.com/btcsuite/btcd/btcec/v2 v2.1.2/go.mod h1:ctjw4H1kknNJmRN4iP1R7bTQ+v3GJkZBd6mui8ZsAZE=
Expand Down
3 changes: 2 additions & 1 deletion zetaclient/tss_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type TSS struct {
Keys map[string]*TSSKey // PubkeyInBech32 => TSSKey
CurrentPubkey string
logger zerolog.Logger
Signers []string
}

var _ TSSSigner = (*TSS)(nil)
Expand All @@ -88,7 +89,7 @@ func (tss *TSS) Sign(digest []byte) ([65]byte, error) {
log.Debug().Msgf("hash of digest is %s", H)

tssPubkey := tss.CurrentPubkey
keysignReq := keysign.NewRequest(tssPubkey, []string{base64.StdEncoding.EncodeToString(H)}, 10, nil, "0.14.0")
keysignReq := keysign.NewRequest(tssPubkey, []string{base64.StdEncoding.EncodeToString(H)}, 10, tss.Signers, "0.14.0")
ksRes, err := tss.Server.KeySign(keysignReq)
if err != nil {
log.Warn().Msg("keysign fail")
Expand Down
85 changes: 70 additions & 15 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package zetaclient

import (
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"math"
"sort"
"strings"
Expand Down Expand Up @@ -33,7 +35,7 @@ type CoreObserver struct {
signerMap map[common.Chain]ChainSigner
clientMap map[common.Chain]ChainClient
metrics *metrics.Metrics
tss *TSS
Tss *TSS
logger ZetaCoreLog
cfg *config.Config
}
Expand All @@ -49,7 +51,7 @@ func NewCoreObserver(bridge *ZetaCoreBridge, signerMap map[common.Chain]ChainSig
ZetaChainWatcher: chainLogger.With().Str("module", "ZetaChainWatcher").Logger(),
}

co.tss = tss
co.Tss = tss
co.bridge = bridge
co.signerMap = signerMap

Expand Down Expand Up @@ -78,13 +80,46 @@ func (co *CoreObserver) MonitorCore() {
go co.startSendScheduler()
}

// returns map(protocolID -> count); map(connID -> count)
func countActiveStreams(n network.Network) (map[string]int, map[string]int, int) {
count := 0
conns := n.Conns()
protocolCount := make(map[string]int)
connCount := make(map[string]int)
for _, conn := range conns {
count += len(conn.GetStreams())
for _, stream := range conn.GetStreams() {
protocolCount[string(stream.Protocol())]++
}
connCount[string(conn.ID())] += len(conn.GetStreams())
}
return protocolCount, connCount, count
}

var joinPartyProtocolWithLeader protocol.ID = "/p2p/join-party-leader"

func releaseAllStreams(n network.Network) int {
conns := n.Conns()
cnt := 0
for _, conn := range conns {
for _, stream := range conn.GetStreams() {
if stream.Protocol() == joinPartyProtocolWithLeader {
stream.Reset()
cnt++
}
}
}
return cnt
}

// ZetaCore block is heart beat; each block we schedule some send according to
// retry schedule. ses
func (co *CoreObserver) startSendScheduler() {
outTxMan := NewOutTxProcessorManager(co.logger.ChainLogger)
go outTxMan.StartMonitorHealth()
observeTicker := time.NewTicker(1 * time.Second)
var lastBlockNum int64
zblockToProcessedNonce := make(map[int64]int64)
for range observeTicker.C {
bn, err := co.bridge.GetZetaBlockHeight()
if err != nil {
Expand All @@ -99,6 +134,7 @@ func (co *CoreObserver) startSendScheduler() {
if bn%10 == 0 {
co.logger.ZetaChainWatcher.Debug().Msgf("ZetaCore heart beat: %d", bn)
}

//tStart := time.Now()
sendList, err := co.bridge.GetAllPendingCctx()
if err != nil {
Expand All @@ -124,28 +160,44 @@ func (co *CoreObserver) startSendScheduler() {
co.logger.ZetaChainWatcher.Warn().Msgf("chain %s is not enabled; skip scheduling", c.String())
continue
}
if bn%10 == 0 {
if len(sendList) > 0 {
co.logger.ZetaChainWatcher.Info().Msgf("outstanding %d CCTX's on chain %s: range [%d,%d]", len(sendList), chain, sendList[0].GetCurrentOutTxParam().OutboundTxTssNonce, sendList[len(sendList)-1].GetCurrentOutTxParam().OutboundTxTssNonce)
} else {
continue
}
signer := co.signerMap[*c]
chainClient := co.clientMap[*c]
cnt := 0
maxCnt := 4
safeMode := true // by default, be cautious and only send 1 tx per block
if len(sendList) > 0 {
lastProcessedNonce := int64(sendList[0].GetCurrentOutTxParam().OutboundTxTssNonce) - 1
zblockToProcessedNonce[bn] = lastProcessedNonce
// if for 10 blocks there is no progress, then wind down the maxCnt (lookahead)
if nonce1, found := zblockToProcessedNonce[bn-10]; found {
if nonce1 < lastProcessedNonce && outTxMan.numActiveProcessor < 10 {
safeMode = false
}
}
co.logger.ZetaChainWatcher.Info().Msgf("20 blocks outbound tx processing rate: %.2f", float64(lastProcessedNonce-zblockToProcessedNonce[bn-20])/20.0)
co.logger.ZetaChainWatcher.Info().Msgf("100 blocks outbound tx processing rate: %.2f", float64(lastProcessedNonce-zblockToProcessedNonce[bn-100])/100.0)
co.logger.ZetaChainWatcher.Info().Msgf("since block 0 outbound tx processing rate: %.2f", float64(lastProcessedNonce)/(1.0*float64(bn)))
}
host := co.Tss.Server.P2pCommunication.GetHost()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go-tss is instrumented (zeta-chain/go-tss#6) to expose the Server and P2pCommunication fields, just to be able to GetHost() (libp2p host).

pCount, cCount, numStreams := countActiveStreams(host.Network())
co.logger.ZetaChainWatcher.Info().Msgf("numStreams: %d; protocol: %+v; conn: %+v", numStreams, pCount, cCount)
if outTxMan.numActiveProcessor == 0 {
co.logger.ZetaChainWatcher.Warn().Msgf("no active outbound tx processor; safeMode: %v", safeMode)
numStreamsReleased := releaseAllStreams(host.Network())
Copy link
Collaborator

@brewmaster012 brewmaster012 May 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinssgh this line works around the leaking stream by manually clearing them when progress is slow.

To see the leakage:

comment this line out (releaseAllStreams) , and grep logs of zetaclients with

docker logs -f zetaclient2 | grep -e 'blocked' -e 'StartTryProcess'  -e 'EndTryProcess' -e 'outstanding'   -e 'numStreams' -e 'released' -e 'rate'

You will see that when progress is slow and scheduler slows down to 1 keysign every 20 blocks, a lot of times there is no keysign ceremony going on, and the number of strams might still be in the 1000s. I think this is clearly indication that stream is leaking, as there is no reason to have many streams when there is no keysign ceremony.

Decomposing the stream by protocol-ID, you'll see that most leaked streams are protocol ID "/p2p/join-party-leader".

This line manually closes/resets all such streams when clearly there is no keysign going on.

By adding this workaround, the stress test is able to complete 35min without stopping at around 1tx/s.

co.logger.ZetaChainWatcher.Warn().Msgf("released %d streams", numStreamsReleased)
}

for _, send := range sendList {
ob, err := co.getTargetChainOb(send)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("getTargetChainOb fail %s", chain)
continue
}
// update metrics
//if idx == 0 {
// pTxs, err := ob.GetPromGauge(metrics.PendingTxs)
// if err != nil {
// co.logger.Warn().Msgf("cannot get prometheus counter [%s]", metrics.PendingTxs)
// } else {
// pTxs.Set(float64(len(sendList)))
// }
//}
// Monitor Core Logger for OutboundTxTssNonce
included, _, err := ob.IsSendOutTxProcessed(send.Index, int(send.GetCurrentOutTxParam().OutboundTxTssNonce), send.GetCurrentOutTxParam().CoinType, co.logger.ZetaChainWatcher)
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("IsSendOutTxProcessed fail %s", chain)
Expand All @@ -164,13 +216,16 @@ func (co *CoreObserver) startSendScheduler() {
continue
}
currentHeight := uint64(bn)
if nonce%20 == currentHeight%20 && !outTxMan.IsOutTxActive(outTxID) { // ZetaCore 5s
if nonce%10 == currentHeight%10 && !outTxMan.IsOutTxActive(outTxID) {
if safeMode && nonce != sendList[0].GetCurrentOutTxParam().OutboundTxTssNonce {
break
}
cnt++
outTxMan.StartTryProcess(outTxID)
co.logger.ZetaChainWatcher.Debug().Msgf("chain %s: Sign outtx %s with value %d\n", chain, send.Index, send.GetCurrentOutTxParam().Amount)
go signer.TryProcessOutTx(send, outTxMan, outTxID, chainClient, co.bridge)
}
if cnt == 4 { // 3/2s
if cnt == maxCnt {
break
}
}
Expand Down