From f3531be7fb073196bcdab138b1cc2c6d02f90b6c Mon Sep 17 00:00:00 2001 From: Charlie Chen <34498985+ws4charlie@users.noreply.github.com> Date: Wed, 14 Aug 2024 17:28:24 -0500 Subject: [PATCH] fix: adjust evm outbound tracker reporter to avoid submitting invalid hashes (#2628) * refactor and fix evm outbound tracker reporter to avoid invalid hashes; print log when outbound tracker is full of invalid hashes * add changelog entry * used predefined log fields * remove repeated fields information from log message; Devops team would configure Datadog to show the fields * remove redundant fields in log message; unified logs * remove pending transaction map from observer; the outbound tracker reporter will no longer report pending hash * use bg.Work() to launch outbound tracker reporter goroutines * bring the checking EnsureNoTrackers() back * add more rationale to EVM outbound tracker submission * sync observer and signers without wait on startup * try fixing tss migration E2E failure by increase timeout --- changelog.md | 1 + e2e/utils/zetacore.go | 2 +- pkg/constant/constant.go | 6 + zetaclient/chains/base/observer.go | 11 +- zetaclient/chains/base/signer.go | 6 +- zetaclient/chains/evm/constant.go | 7 +- zetaclient/chains/evm/observer/observer.go | 19 -- zetaclient/chains/evm/observer/outbound.go | 211 +++++++++--------- zetaclient/chains/evm/rpc/rpc.go | 52 +++++ zetaclient/chains/evm/rpc/rpc_live_test.go | 45 ++++ zetaclient/chains/evm/signer/outbound_data.go | 22 -- .../chains/evm/signer/outbound_data_test.go | 5 +- .../evm/signer/outbound_tracker_reporter.go | 85 +++++++ zetaclient/chains/evm/signer/signer.go | 152 ++----------- zetaclient/chains/evm/signer/signer_test.go | 41 +--- zetaclient/chains/solana/observer/outbound.go | 23 +- .../signer/outbound_tracker_reporter.go | 39 ++-- zetaclient/logs/fields.go | 18 ++ zetaclient/orchestrator/orchestrator.go | 11 +- 19 files changed, 394 insertions(+), 362 deletions(-) create mode 100644 zetaclient/chains/evm/rpc/rpc.go create mode 100644 zetaclient/chains/evm/rpc/rpc_live_test.go create mode 100644 zetaclient/chains/evm/signer/outbound_tracker_reporter.go create mode 100644 zetaclient/logs/fields.go diff --git a/changelog.md b/changelog.md index f70579b6ca..74969ff3d2 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,7 @@ * [2533](https://github.com/zeta-chain/node/pull/2533) - parse memo from both OP_RETURN and inscription * [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient * [2634](https://github.com/zeta-chain/node/pull/2634) - add support for EIP-1559 gas fees +* [2628](https://github.com/zeta-chain/node/pull/2628) - avoid submitting invalid hashes to outbound tracker ## v19.0.1 diff --git a/e2e/utils/zetacore.go b/e2e/utils/zetacore.go index a10dc8d68b..9122860d75 100644 --- a/e2e/utils/zetacore.go +++ b/e2e/utils/zetacore.go @@ -20,7 +20,7 @@ const ( AdminPolicyName = "admin" OperationalPolicyName = "operational" - DefaultCctxTimeout = 4 * time.Minute + DefaultCctxTimeout = 6 * time.Minute ) // WaitCctxMinedByInboundHash waits until cctx is mined; returns the cctxIndex (the last one) diff --git a/pkg/constant/constant.go b/pkg/constant/constant.go index 6aba0f4dea..b296be7654 100644 --- a/pkg/constant/constant.go +++ b/pkg/constant/constant.go @@ -1,6 +1,12 @@ package constant +import "time" + const ( + // ZetaBlockTime is the block time of the ZetaChain network + // It's a rough estimate that can be used in non-critical path to estimate the time of a block + ZetaBlockTime = 6000 * time.Millisecond + // DonationMessage is the message for donation transactions // Transaction sent to the TSS or ERC20 Custody address containing this message are considered as a donation DonationMessage = "I am rich!" diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 428946f0bf..23d2f13a76 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -17,6 +17,7 @@ import ( observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/db" + "github.com/zeta-chain/zetacore/zetaclient/logs" "github.com/zeta-chain/zetacore/zetaclient/metrics" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetacore" @@ -295,13 +296,13 @@ func (ob *Observer) Logger() *ObserverLogger { // WithLogger attaches a new logger to the observer. func (ob *Observer) WithLogger(logger Logger) *Observer { - chainLogger := logger.Std.With().Int64("chain", ob.chain.ChainId).Logger() + chainLogger := logger.Std.With().Int64(logs.FieldChain, ob.chain.ChainId).Logger() ob.logger = ObserverLogger{ Chain: chainLogger, - Inbound: chainLogger.With().Str("module", "inbound").Logger(), - Outbound: chainLogger.With().Str("module", "outbound").Logger(), - GasPrice: chainLogger.With().Str("module", "gasprice").Logger(), - Headers: chainLogger.With().Str("module", "headers").Logger(), + Inbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameInbound).Logger(), + Outbound: chainLogger.With().Str(logs.FieldModule, logs.ModNameOutbound).Logger(), + GasPrice: chainLogger.With().Str(logs.FieldModule, logs.ModNameGasPrice).Logger(), + Headers: chainLogger.With().Str(logs.FieldModule, logs.ModNameHeaders).Logger(), Compliance: logger.Compliance, } return ob diff --git a/zetaclient/chains/base/signer.go b/zetaclient/chains/base/signer.go index 6618c338de..781288b513 100644 --- a/zetaclient/chains/base/signer.go +++ b/zetaclient/chains/base/signer.go @@ -5,6 +5,7 @@ import ( "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" + "github.com/zeta-chain/zetacore/zetaclient/logs" "github.com/zeta-chain/zetacore/zetaclient/metrics" ) @@ -38,7 +39,10 @@ func NewSigner(chain chains.Chain, tss interfaces.TSSSigner, ts *metrics.Telemet tss: tss, ts: ts, logger: Logger{ - Std: logger.Std.With().Int64("chain", chain.ChainId).Str("module", "signer").Logger(), + Std: logger.Std.With(). + Int64(logs.FieldChain, chain.ChainId). + Str(logs.FieldModule, "signer"). + Logger(), Compliance: logger.Compliance, }, outboundBeingReported: make(map[string]bool), diff --git a/zetaclient/chains/evm/constant.go b/zetaclient/chains/evm/constant.go index b754d57f30..beaeb6143f 100644 --- a/zetaclient/chains/evm/constant.go +++ b/zetaclient/chains/evm/constant.go @@ -3,12 +3,13 @@ package evm import "time" const ( - // ZetaBlockTime is the block time of the Zeta network - ZetaBlockTime = 6500 * time.Millisecond - // OutboundInclusionTimeout is the timeout for waiting for an outbound to be included in a block OutboundInclusionTimeout = 20 * time.Minute + // ReorgProtectBlockCount is confirmations count to protect against reorg + // Short 1~2 block reorgs could happen often on Ethereum due to network congestion or block production race conditions + ReorgProtectBlockCount = 2 + // OutboundTrackerReportTimeout is the timeout for waiting for an outbound tracker report OutboundTrackerReportTimeout = 10 * time.Minute diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 4959b13b9a..b5e7be3f6e 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -43,9 +43,6 @@ type Observer struct { // evmJSONRPC is the EVM JSON RPC client for the observed chain evmJSONRPC interfaces.EVMJSONRPCClient - // outboundPendingTransactions is the map to index pending transactions by hash - outboundPendingTransactions map[string]*ethtypes.Transaction - // outboundConfirmedReceipts is the map to index confirmed receipts by hash outboundConfirmedReceipts map[string]*ethtypes.Receipt @@ -92,7 +89,6 @@ func NewObserver( Observer: *baseObserver, evmClient: evmClient, evmJSONRPC: ethrpc.NewEthRPC(evmCfg.Endpoint), - outboundPendingTransactions: make(map[string]*ethtypes.Transaction), outboundConfirmedReceipts: make(map[string]*ethtypes.Receipt), outboundConfirmedTransactions: make(map[string]*ethtypes.Transaction), priorityFeeConfig: priorityFeeConfig{}, @@ -230,25 +226,10 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { } } -// SetPendingTx sets the pending transaction in memory -func (ob *Observer) SetPendingTx(nonce uint64, transaction *ethtypes.Transaction) { - ob.Mu().Lock() - defer ob.Mu().Unlock() - ob.outboundPendingTransactions[ob.OutboundID(nonce)] = transaction -} - -// GetPendingTx gets the pending transaction from memory -func (ob *Observer) GetPendingTx(nonce uint64) *ethtypes.Transaction { - ob.Mu().Lock() - defer ob.Mu().Unlock() - return ob.outboundPendingTransactions[ob.OutboundID(nonce)] -} - // SetTxNReceipt sets the receipt and transaction in memory func (ob *Observer) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) { ob.Mu().Lock() defer ob.Mu().Unlock() - delete(ob.outboundPendingTransactions, ob.OutboundID(nonce)) // remove pending transaction, if any ob.outboundConfirmedReceipts[ob.OutboundID(nonce)] = receipt ob.outboundConfirmedTransactions[ob.OutboundID(nonce)] = transaction } diff --git a/zetaclient/chains/evm/observer/outbound.go b/zetaclient/chains/evm/observer/outbound.go index 680fb4e3af..598b9b6a44 100644 --- a/zetaclient/chains/evm/observer/outbound.go +++ b/zetaclient/chains/evm/observer/outbound.go @@ -13,25 +13,33 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" "github.com/rs/zerolog" - "github.com/rs/zerolog/log" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/coin" + crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper" crosschaintypes "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/chains/evm" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/compliance" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/logs" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) // WatchOutbound watches evm chain for outgoing txs status // TODO(revamp): move ticker function to ticker file -// TODO(revamp): move inner logic to a separate function func (ob *Observer) WatchOutbound(ctx context.Context) error { + // get app context + app, err := zctx.FromContext(ctx) + if err != nil { + return err + } + + // create outbound ticker + chainID := ob.Chain().ChainId ticker, err := clienttypes.NewDynamicTicker( fmt.Sprintf("EVM_WatchOutbound_%d", ob.Chain().ChainId), ob.GetChainParams().OutboundTicker, @@ -41,11 +49,6 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error { return err } - app, err := zctx.FromContext(ctx) - if err != nil { - return err - } - ob.Logger().Outbound.Info().Msgf("WatchOutbound started for chain %d", ob.Chain().ChainId) sampledLogger := ob.Logger().Outbound.Sample(&zerolog.BasicSampler{N: 10}) defer ticker.Stop() @@ -57,38 +60,16 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error { Msgf("WatchOutbound: outbound observation is disabled for chain %d", ob.Chain().ChainId) continue } - trackers, err := ob.ZetacoreClient(). - GetAllOutboundTrackerByChain(ctx, ob.Chain().ChainId, interfaces.Ascending) + + // process outbound trackers + err := ob.ProcessOutboundTrackers(ctx) if err != nil { - continue - } - for _, tracker := range trackers { - nonceInt := tracker.Nonce - if ob.IsTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx - continue - } - txCount := 0 - var outboundReceipt *ethtypes.Receipt - var outbound *ethtypes.Transaction - for _, txHash := range tracker.HashList { - if receipt, tx, ok := ob.checkConfirmedTx(ctx, txHash.TxHash, nonceInt); ok { - txCount++ - outboundReceipt = receipt - outbound = tx - ob.Logger().Outbound.Info(). - Msgf("WatchOutbound: confirmed outbound %s for chain %d nonce %d", txHash.TxHash, ob.Chain().ChainId, nonceInt) - if txCount > 1 { - ob.Logger().Outbound.Error().Msgf( - "WatchOutbound: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.Chain().ChainId, nonceInt, outboundReceipt, outbound) - } - } - } - if txCount == 1 { // should be only one txHash confirmed for each nonce. - ob.SetTxNReceipt(nonceInt, outboundReceipt, outbound) - } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) - ob.Logger().Outbound.Error().Msgf("WatchOutbound: confirmed multiple (%d) outbound for chain %d nonce %d", txCount, ob.Chain().ChainId, nonceInt) - } + ob.Logger(). + Outbound.Error(). + Err(err). + Msgf("WatchOutbound: error ProcessOutboundTrackers for chain %d", chainID) } + ticker.UpdateInterval(ob.GetChainParams().OutboundTicker, ob.Logger().Outbound) case <-ob.StopChannel(): ob.Logger().Outbound.Info().Msg("WatchOutbound: stopped") @@ -97,6 +78,61 @@ func (ob *Observer) WatchOutbound(ctx context.Context) error { } } +// ProcessOutboundTrackers processes outbound trackers +func (ob *Observer) ProcessOutboundTrackers(ctx context.Context) error { + chainID := ob.Chain().ChainId + trackers, err := ob.ZetacoreClient().GetAllOutboundTrackerByChain(ctx, ob.Chain().ChainId, interfaces.Ascending) + if err != nil { + return errors.Wrap(err, "GetAllOutboundTrackerByChain error") + } + + // prepare logger fields + logger := ob.Logger().Outbound.With(). + Str(logs.FieldMethod, "ProcessOutboundTrackers"). + Int64(logs.FieldChain, chainID). + Logger() + + // process outbound trackers + for _, tracker := range trackers { + // go to next tracker if this one already has a confirmed tx + nonce := tracker.Nonce + if ob.IsTxConfirmed(nonce) { + continue + } + + // check each txHash and save tx and receipt if it's legit and confirmed + txCount := 0 + var outboundReceipt *ethtypes.Receipt + var outbound *ethtypes.Transaction + for _, txHash := range tracker.HashList { + if receipt, tx, ok := ob.checkConfirmedTx(ctx, txHash.TxHash, nonce); ok { + txCount++ + outboundReceipt = receipt + outbound = tx + logger.Info().Msgf("confirmed outbound %s for chain %d nonce %d", txHash.TxHash, chainID, nonce) + if txCount > 1 { + logger.Error(). + Msgf("checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v tx %v", txCount, chainID, nonce, receipt, tx) + } + } + } + + // should be only one txHash confirmed for each nonce. + if txCount == 1 { + ob.SetTxNReceipt(nonce, outboundReceipt, outbound) + } else if txCount > 1 { + // should not happen. We can't tell which txHash is true. It might happen (e.g. bug, glitchy/hacked endpoint) + ob.Logger().Outbound.Error().Msgf("WatchOutbound: confirmed multiple (%d) outbound for chain %d nonce %d", txCount, chainID, nonce) + } else { + if len(tracker.HashList) == crosschainkeeper.MaxOutboundTrackerHashes { + ob.Logger().Outbound.Error().Msgf("WatchOutbound: outbound tracker is full of hashes for chain %d nonce %d", chainID, nonce) + } + } + } + + return nil +} + // PostVoteOutbound posts vote to zetacore for the confirmed outbound func (ob *Observer) PostVoteOutbound( ctx context.Context, @@ -377,23 +413,27 @@ func (ob *Observer) checkConfirmedTx( ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() + // prepare logger + logger := ob.Logger().Outbound.With(). + Str(logs.FieldMethod, "checkConfirmedTx"). + Int64(logs.FieldChain, ob.Chain().ChainId). + Uint64(logs.FieldNonce, nonce). + Str(logs.FieldTx, txHash). + Logger() + // query transaction transaction, isPending, err := ob.evmClient.TransactionByHash(ctx, ethcommon.HexToHash(txHash)) if err != nil { - log.Error(). - Err(err). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", txHash). - Int64("chainID", ob.Chain().ChainId). - Msg("error getting transaction for outbound") + logger.Error().Err(err).Msg("TransactionByHash error") return nil, nil, false } if transaction == nil { // should not happen - log.Error(). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", txHash). - Uint64("nonce", nonce). - Msg("transaction is nil for txHash") + logger.Error().Msg("transaction is nil") + return nil, nil, false + } + if isPending { + // should not happen when we are here. The outbound tracker reporter won't report a pending tx. + logger.Error().Msg("transaction is pending") return nil, nil, false } @@ -401,12 +441,7 @@ func (ob *Observer) checkConfirmedTx( signer := ethtypes.NewLondonSigner(big.NewInt(ob.Chain().ChainId)) from, err := signer.Sender(transaction) if err != nil { - log.Error(). - Err(err). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", transaction.Hash().Hex()). - Int64("chainID", ob.Chain().ChainId). - Msg("local recovery of sender address failed for outbound") + logger.Error().Err(err).Msg("local recovery of sender address failed") return nil, nil, false } if from != ob.TSS().EVMAddress() { // must be TSS address @@ -416,13 +451,8 @@ func (ob *Observer) checkConfirmedTx( // TODO : improve this logic to verify that the correct TSS address is the from address. // https://github.com/zeta-chain/node/issues/2487 - log.Info(). - Str("function", "confirmTxByHash"). - Str("sender", from.Hex()). - Str("outboundTxHash", transaction.Hash().Hex()). - Int64("chainID", ob.Chain().ChainId). - Str("currentTSSAddress", ob.TSS().EVMAddress().Hex()). - Msg("sender is not current TSS address") + logger.Warn(). + Msgf("tx sender %s is not matching current TSS address %s", from.String(), ob.TSS().EVMAddress().String()) addressList := ob.TSS().EVMAddressList() isOldTssAddress := false for _, addr := range addressList { @@ -431,70 +461,35 @@ func (ob *Observer) checkConfirmedTx( } } if !isOldTssAddress { - log.Error(). - Str("function", "confirmTxByHash"). - Str("sender", from.Hex()). - Str("outboundTxHash", transaction.Hash().Hex()). - Int64("chainID", ob.Chain().ChainId). - Str("currentTSSAddress", ob.TSS().EVMAddress().Hex()). - Msg("sender is not current or old TSS address") + logger.Error().Msgf("tx sender %s is not matching any of the TSS addresses", from.String()) return nil, nil, false } } - if transaction.Nonce() != nonce { // must match cctx nonce - log.Error(). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", txHash). - Uint64("wantedNonce", nonce). - Uint64("gotTxNonce", transaction.Nonce()). - Msg("outbound nonce mismatch") - return nil, nil, false - } - - // save pending transaction - if isPending { - ob.SetPendingTx(nonce, transaction) + if transaction.Nonce() != nonce { // must match tracker nonce + logger.Error().Msgf("tx nonce %d is not matching tracker nonce", nonce) return nil, nil, false } // query receipt receipt, err := ob.evmClient.TransactionReceipt(ctx, ethcommon.HexToHash(txHash)) if err != nil { - log.Error(). - Err(err). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", txHash). - Uint64("nonce", nonce). - Msg("transactionReceipt error") + logger.Error().Err(err).Msg("TransactionReceipt error") return nil, nil, false } if receipt == nil { // should not happen - log.Error(). - Str("function", "confirmTxByHash"). - Str("outboundTxHash", txHash). - Uint64("nonce", nonce). - Msg("receipt is nil") + logger.Error().Msg("receipt is nil") return nil, nil, false } ob.LastBlock() // check confirmations lastHeight, err := ob.evmClient.BlockNumber(ctx) if err != nil { - log.Error(). - Str("function", "confirmTxByHash"). - Err(err). - Int64("chainID", ob.GetChainParams().ChainId). - Msg("error getting block number for chain") + logger.Error().Err(err).Msg("BlockNumber error") return nil, nil, false } if !ob.HasEnoughConfirmations(receipt, lastHeight) { - log.Debug(). - Str("function", "confirmTxByHash"). - Str("txHash", txHash). - Uint64("nonce", nonce). - Uint64("receiptBlock", receipt.BlockNumber.Uint64()). - Uint64("currentBlock", lastHeight). - Msg("txHash included but not confirmed") + logger.Debug(). + Msgf("tx included but not confirmed, receipt block %d current block %d", receipt.BlockNumber.Uint64(), lastHeight) return nil, nil, false } @@ -502,13 +497,7 @@ func (ob *Observer) checkConfirmedTx( // Note: a guard for false BlockNumber in receipt. The blob-carrying tx won't come here err = ob.CheckTxInclusion(transaction, receipt) if err != nil { - log.Error(). - Err(err). - Str("function", "confirmTxByHash"). - Str("errorContext", "checkTxInclusion"). - Str("txHash", txHash). - Uint64("nonce", nonce). - Msg("checkTxInclusion error") + logger.Error().Err(err).Msg("CheckTxInclusion error") return nil, nil, false } diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go new file mode 100644 index 0000000000..6fcc3d007c --- /dev/null +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -0,0 +1,52 @@ +package rpc + +import ( + "context" + + ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" +) + +// IsTxConfirmed checks if the transaction is confirmed with given confirmations +func IsTxConfirmed( + ctx context.Context, + client interfaces.EVMRPCClient, + txHash string, + confirmations uint64, +) (bool, error) { + // query the tx + _, isPending, err := client.TransactionByHash(ctx, ethcommon.HexToHash(txHash)) + if err != nil { + return false, errors.Wrapf(err, "error getting transaction for tx %s", txHash) + } + if isPending { + return false, nil + } + + // query receipt + receipt, err := client.TransactionReceipt(ctx, ethcommon.HexToHash(txHash)) + if err != nil { + return false, errors.Wrapf(err, "error getting transaction receipt for tx %s", txHash) + } + + // should not happen + if receipt == nil { + return false, errors.Errorf("receipt is nil for tx %s", txHash) + } + + // query last block height + lastHeight, err := client.BlockNumber(ctx) + if err != nil { + return false, errors.Wrap(err, "error getting block number") + } + + // check confirmations + if lastHeight < receipt.BlockNumber.Uint64() { + return false, nil + } + blocks := lastHeight - receipt.BlockNumber.Uint64() + 1 + + return blocks >= confirmations, nil +} diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go new file mode 100644 index 0000000000..0c420c830e --- /dev/null +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -0,0 +1,45 @@ +package rpc_test + +import ( + "context" + "math" + + "github.com/ethereum/go-ethereum/ethclient" + "github.com/stretchr/testify/require" + "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" + + "testing" +) + +const ( + URLEthMainnet = "https://rpc.ankr.com/eth" + URLEthSepolia = "https://rpc.ankr.com/eth_sepolia" + URLBscMainnet = "https://rpc.ankr.com/bsc" + URLPolygonMainnet = "https://rpc.ankr.com/polygon" +) + +// Test_EVMRPCLive is a phony test to run each live test individually +func Test_EVMRPCLive(t *testing.T) { + // LiveTest_IsTxConfirmed(t) +} + +func LiveTest_IsTxConfirmed(t *testing.T) { + client, err := ethclient.Dial(URLEthMainnet) + require.NoError(t, err) + + // check if the transaction is confirmed + ctx := context.Background() + txHash := "0xd2eba7ac3da1b62800165414ea4bcaf69a3b0fb9b13a0fc32f4be11bfef79146" + + t.Run("should confirm tx", func(t *testing.T) { + confirmed, err := rpc.IsTxConfirmed(ctx, client, txHash, 12) + require.NoError(t, err) + require.True(t, confirmed) + }) + + t.Run("should not confirm tx if confirmations is not enough", func(t *testing.T) { + confirmed, err := rpc.IsTxConfirmed(ctx, client, txHash, math.MaxUint64) + require.NoError(t, err) + require.False(t, confirmed) + }) +} diff --git a/zetaclient/chains/evm/signer/outbound_data.go b/zetaclient/chains/evm/signer/outbound_data.go index 6a4cec2154..a430adb3ba 100644 --- a/zetaclient/chains/evm/signer/outbound_data.go +++ b/zetaclient/chains/evm/signer/outbound_data.go @@ -13,7 +13,6 @@ import ( "github.com/zeta-chain/zetacore/pkg/coin" "github.com/zeta-chain/zetacore/x/crosschain/types" - "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer" zctx "github.com/zeta-chain/zetacore/zetaclient/context" ) @@ -47,7 +46,6 @@ type OutboundData struct { func NewOutboundData( ctx context.Context, cctx *types.CrossChainTx, - observer *observer.Observer, height uint64, logger zerolog.Logger, ) (*OutboundData, bool, error) { @@ -65,8 +63,6 @@ func NewOutboundData( } outboundParams := cctx.GetCurrentOutboundParam() - nonce := outboundParams.TssNonce - if err := validateParams(outboundParams); err != nil { return nil, false, errors.Wrap(err, "invalid outboundParams") } @@ -97,24 +93,6 @@ func NewOutboundData( return nil, false, errors.Wrap(err, "unable to get cctx index") } - // In case there is a pending tx, make sure this keySign is a tx replacement - if tx := observer.GetPendingTx(nonce); tx != nil { - evt := logger.Info(). - Str("cctx.pending_tx_hash", tx.Hash().Hex()). - Uint64("cctx.pending_tx_nonce", nonce) - - // new gas price is less or equal to pending tx gas - if gas.Price.Cmp(tx.GasPrice()) <= 0 { - evt.Msg("Please wait for pending outbound to be included in the block") - return nil, true, nil - } - - evt. - Uint64("cctx.gas_price", gas.Price.Uint64()). - Uint64("cctx.priority_fee", gas.PriorityFee.Uint64()). - Msg("Replacing pending outbound transaction with higher gas fees") - } - // Base64 decode message var message []byte if cctx.InboundParams.CoinType != coin.CoinType_Cmd { diff --git a/zetaclient/chains/evm/signer/outbound_data_test.go b/zetaclient/chains/evm/signer/outbound_data_test.go index 03c1329ef1..2f53bab028 100644 --- a/zetaclient/chains/evm/signer/outbound_data_test.go +++ b/zetaclient/chains/evm/signer/outbound_data_test.go @@ -13,15 +13,12 @@ import ( ) func TestNewOutboundData(t *testing.T) { - mockObserver, err := getNewEvmChainObserver(t, nil) - require.NoError(t, err) - logger := zerolog.New(zerolog.NewTestWriter(t)) ctx := makeCtx(t) newOutbound := func(cctx *types.CrossChainTx) (*OutboundData, bool, error) { - return NewOutboundData(ctx, cctx, mockObserver, 123, logger) + return NewOutboundData(ctx, cctx, 123, logger) } t.Run("success", func(t *testing.T) { diff --git a/zetaclient/chains/evm/signer/outbound_tracker_reporter.go b/zetaclient/chains/evm/signer/outbound_tracker_reporter.go new file mode 100644 index 0000000000..7a9b6bcfa2 --- /dev/null +++ b/zetaclient/chains/evm/signer/outbound_tracker_reporter.go @@ -0,0 +1,85 @@ +// Package signer implements the ChainSigner interface for EVM chains +package signer + +import ( + "context" + "time" + + "github.com/rs/zerolog" + + "github.com/zeta-chain/zetacore/pkg/bg" + "github.com/zeta-chain/zetacore/zetaclient/chains/evm" + "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" + "github.com/zeta-chain/zetacore/zetaclient/logs" +) + +// reportToOutboundTracker reports outboundHash to tracker only when tx receipt is available +func (signer *Signer) reportToOutboundTracker( + ctx context.Context, + zetacoreClient interfaces.ZetacoreClient, + chainID int64, + nonce uint64, + outboundHash string, + logger zerolog.Logger, +) { + // prepare logger + logger = logger.With(). + Str(logs.FieldMethod, "reportToOutboundTracker"). + Int64(logs.FieldChain, chainID). + Uint64(logs.FieldNonce, nonce). + Str(logs.FieldTx, outboundHash). + Logger() + + // set being reported flag to avoid duplicate reporting + alreadySet := signer.SetBeingReportedFlag(outboundHash) + if alreadySet { + logger.Info().Msg("outbound is being reported to tracker") + return + } + + // launch a goroutine to monitor tx confirmation status + bg.Work(ctx, func(ctx context.Context) error { + defer func() { + signer.ClearBeingReportedFlag(outboundHash) + }() + + // try monitoring tx inclusion status for 20 minutes + tStart := time.Now() + for { + // take a rest between each check + time.Sleep(10 * time.Second) + + // give up (forget about the tx) after 20 minutes of monitoring, there are 2 reasons: + // 1. the gas stability pool should have kicked in and replaced the tx by then. + // 2. even if there is a chance that the tx is included later, most likely it's going to be a false tx hash (either replaced or dropped). + // 3. we prefer missed tx hash over potentially invalid txhash. + if time.Since(tStart) > evm.OutboundInclusionTimeout { + logger.Info().Msgf("timeout waiting outbound inclusion") + return nil + } + + // check tx confirmation status + confirmed, err := rpc.IsTxConfirmed(ctx, signer.client, outboundHash, evm.ReorgProtectBlockCount) + if err != nil { + logger.Err(err).Msg("unable to check confirmation status of outbound") + continue + } + if !confirmed { + continue + } + + // report outbound hash to tracker + zetaHash, err := zetacoreClient.AddOutboundTracker(ctx, chainID, nonce, outboundHash, nil, "", -1) + if err != nil { + logger.Err(err).Msg("error adding outbound to tracker") + } else if zetaHash != "" { + logger.Info().Msgf("added outbound to tracker; zeta txhash %s", zetaHash) + } else { + // exit goroutine until the tracker contains the hash (reported by either this or other signers) + logger.Info().Msg("outbound now exists in tracker") + return nil + } + } + }, bg.WithName("TrackerReporterEVM"), bg.WithLogger(logger)) +} diff --git a/zetaclient/chains/evm/signer/signer.go b/zetaclient/chains/evm/signer/signer.go index d30b887ea8..0a89b5a657 100644 --- a/zetaclient/chains/evm/signer/signer.go +++ b/zetaclient/chains/evm/signer/signer.go @@ -25,14 +25,13 @@ import ( "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/coin" "github.com/zeta-chain/zetacore/pkg/constant" - crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/chains/base" "github.com/zeta-chain/zetacore/zetaclient/chains/evm" - "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" "github.com/zeta-chain/zetacore/zetaclient/compliance" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/logs" "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/outboundprocessor" "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" @@ -396,48 +395,42 @@ func (signer *Signer) TryProcessOutbound( cctx *types.CrossChainTx, outboundProc *outboundprocessor.Processor, outboundID string, - chainObserver interfaces.ChainObserver, + _ interfaces.ChainObserver, zetacoreClient interfaces.ZetacoreClient, height uint64, ) { + // end outbound process on panic + defer func() { + outboundProc.EndTryProcess(outboundID) + if r := recover(); r != nil { + signer.Logger().Std.Error().Msgf("TryProcessOutbound: %s, caught panic error: %v", cctx.Index, r) + } + }() + + // prepare logger and a few local variables var ( params = cctx.GetCurrentOutboundParam() myID = zetacoreClient.GetKeys().GetOperatorAddress() logger = signer.Logger().Std.With(). - Str("method", "TryProcessOutbound"). - Int64("chain", signer.Chain().ChainId). - Uint64("nonce", params.TssNonce). - Str("cctx.index", cctx.Index). + Str(logs.FieldMethod, "TryProcessOutbound"). + Int64(logs.FieldChain, signer.Chain().ChainId). + Uint64(logs.FieldNonce, params.TssNonce). + Str(logs.FieldCctx, cctx.Index). Str("cctx.receiver", params.Receiver). Str("cctx.amount", params.Amount.String()). Logger() ) - logger.Info().Msgf("TryProcessOutbound") + // retrieve app context app, err := zctx.FromContext(ctx) if err != nil { - signer.Logger().Std.Error().Err(err).Msg("error getting app context") - return - } - - // end outbound process on panic - defer func() { - if r := recover(); r != nil { - logger.Error().Interface("panic", r).Msg("panic in TryProcessOutbound") - } - - outboundProc.EndTryProcess(outboundID) - }() - - evmObserver, ok := chainObserver.(*observer.Observer) - if !ok { - logger.Error().Msg("chain observer is not an EVM observer") + logger.Error().Err(err).Msg("error getting app context") return } // Setup Transaction input - txData, skipTx, err := NewOutboundData(ctx, cctx, evmObserver, height, logger) + txData, skipTx, err := NewOutboundData(ctx, cctx, height, logger) if err != nil { logger.Err(err).Msg("error setting up transaction input fields") return @@ -790,115 +783,6 @@ func (signer *Signer) SignMigrateTssFundsCmd(ctx context.Context, txData *Outbou return tx, nil } -// reportToOutboundTracker reports outboundHash to tracker only when tx receipt is available -// TODO(revamp): move outbound tracker function to a outbound tracker file -func (signer *Signer) reportToOutboundTracker( - ctx context.Context, - zetacoreClient interfaces.ZetacoreClient, - chainID int64, - nonce uint64, - outboundHash string, - logger zerolog.Logger, -) { - // set being reported flag to avoid duplicate reporting - alreadySet := signer.Signer.SetBeingReportedFlag(outboundHash) - if alreadySet { - logger.Info(). - Msgf("reportToOutboundTracker: outboundHash %s for chain %d nonce %d is being reported", outboundHash, chainID, nonce) - return - } - - // report to outbound tracker with goroutine - go func() { - defer func() { - signer.Signer.ClearBeingReportedFlag(outboundHash) - }() - - // try monitoring tx inclusion status for 10 minutes - var err error - report := false - isPending := false - blockNumber := uint64(0) - tStart := time.Now() - for { - // give up after 10 minutes of monitoring - time.Sleep(10 * time.Second) - - if time.Since(tStart) > evm.OutboundInclusionTimeout { - // if tx is still pending after timeout, report to outboundTracker anyway as we cannot monitor forever - if isPending { - report = true // probably will be included later - } - logger.Info(). - Msgf("reportToOutboundTracker: timeout waiting tx inclusion for chain %d nonce %d outboundHash %s report %v", chainID, nonce, outboundHash, report) - break - } - // try getting the tx - _, isPending, err = signer.client.TransactionByHash(ctx, ethcommon.HexToHash(outboundHash)) - if err != nil { - logger.Info(). - Err(err). - Msgf("reportToOutboundTracker: error getting tx for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - continue - } - // if tx is include in a block, try getting receipt - if !isPending { - report = true // included - receipt, err := signer.client.TransactionReceipt(ctx, ethcommon.HexToHash(outboundHash)) - if err != nil { - logger.Info(). - Err(err). - Msgf("reportToOutboundTracker: error getting receipt for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - } - if receipt != nil { - blockNumber = receipt.BlockNumber.Uint64() - } - break - } - // keep monitoring pending tx - logger.Info(). - Msgf("reportToOutboundTracker: tx has not been included yet for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - } - - // try adding to outbound tracker for 10 minutes - if report { - tStart := time.Now() - for { - // give up after 10 minutes of retrying - if time.Since(tStart) > evm.OutboundTrackerReportTimeout { - logger.Info(). - Msgf("reportToOutboundTracker: timeout adding outbound tracker for chain %d nonce %d outboundHash %s, please add manually", chainID, nonce, outboundHash) - break - } - // stop if the cctx is already finalized - cctx, err := zetacoreClient.GetCctxByNonce(ctx, chainID, nonce) - if err != nil { - logger.Err(err). - Msgf("reportToOutboundTracker: error getting cctx for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - } else if !crosschainkeeper.IsPending(cctx) { - logger.Info().Msgf("reportToOutboundTracker: cctx already finalized for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - break - } - // report to outbound tracker - zetaHash, err := zetacoreClient.AddOutboundTracker(ctx, chainID, nonce, outboundHash, nil, "", -1) - if err != nil { - logger.Err(err). - Msgf("reportToOutboundTracker: error adding to outbound tracker for chain %d nonce %d outboundHash %s", chainID, nonce, outboundHash) - } else if zetaHash != "" { - logger.Info().Msgf("reportToOutboundTracker: added outboundHash to core successful %s, chain %d nonce %d outboundHash %s block %d", - zetaHash, chainID, nonce, outboundHash, blockNumber) - } else { - // stop if the tracker contains the outboundHash - logger.Info().Msgf("reportToOutboundTracker: outbound tracker contains outboundHash %s for chain %d nonce %d", outboundHash, chainID, nonce) - break - } - // retry otherwise - time.Sleep(evm.ZetaBlockTime * 3) - } - } - }() -} - // getEVMRPC is a helper function to set up the client and signer, also initializes a mock client for unit tests func getEVMRPC(ctx context.Context, endpoint string) (interfaces.EVMRPCClient, ethtypes.Signer, error) { if endpoint == mocks.EVMRPCEnabled { diff --git a/zetaclient/chains/evm/signer/signer_test.go b/zetaclient/chains/evm/signer/signer_test.go index 9233dfcbe2..d96570a278 100644 --- a/zetaclient/chains/evm/signer/signer_test.go +++ b/zetaclient/chains/evm/signer/signer_test.go @@ -196,9 +196,7 @@ func TestSigner_SignOutbound(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, makeLogger(t)) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -238,7 +236,7 @@ func TestSigner_SignOutbound(t *testing.T) { cctx.OutboundParams[0].GasPriorityFee = big.NewInt(priorityFee).String() // Given outbound data - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, makeLogger(t)) + txData, skip, err := NewOutboundData(ctx, cctx, 123, makeLogger(t)) require.False(t, skip) require.NoError(t, err) @@ -271,9 +269,7 @@ func TestSigner_SignRevertTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -311,9 +307,7 @@ func TestSigner_SignCancelTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -351,9 +345,7 @@ func TestSigner_SignWithdrawTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -389,9 +381,7 @@ func TestSigner_SignCommandTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, nil) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -436,9 +426,7 @@ func TestSigner_SignERC20WithdrawTx(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) @@ -476,10 +464,7 @@ func TestSigner_BroadcastOutbound(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, nil) - require.NoError(t, err) - - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.NoError(t, err) require.False(t, skip) @@ -532,11 +517,7 @@ func TestSigner_SignWhitelistERC20Cmd(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.NoError(t, err) require.False(t, skip) @@ -579,9 +560,7 @@ func TestSigner_SignMigrateTssFundsCmd(t *testing.T) { // Setup txData struct cctx := getCCTX(t) - mockObserver, err := getNewEvmChainObserver(t, tss) - require.NoError(t, err) - txData, skip, err := NewOutboundData(ctx, cctx, mockObserver, 123, zerolog.Logger{}) + txData, skip, err := NewOutboundData(ctx, cctx, 123, zerolog.Logger{}) require.False(t, skip) require.NoError(t, err) diff --git a/zetaclient/chains/solana/observer/outbound.go b/zetaclient/chains/solana/observer/outbound.go index 6eca0b437e..396e49df1c 100644 --- a/zetaclient/chains/solana/observer/outbound.go +++ b/zetaclient/chains/solana/observer/outbound.go @@ -17,6 +17,7 @@ import ( crosschaintypes "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/logs" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetacore" ) @@ -254,15 +255,15 @@ func (ob *Observer) CheckFinalizedTx( // prepare logger fields chainID := ob.Chain().ChainId logger := ob.Logger().Outbound.With(). - Str("method", "checkFinalizedTx"). - Int64("chain", chainID). - Uint64("nonce", nonce). - Str("tx", txHash).Logger() + Str(logs.FieldMethod, "CheckFinalizedTx"). + Int64(logs.FieldChain, chainID). + Uint64(logs.FieldNonce, nonce). + Str(logs.FieldTx, txHash).Logger() // convert txHash to signature sig, err := solana.SignatureFromBase58(txHash) if err != nil { - logger.Error().Err(err).Msgf("SignatureFromBase58 err for chain %d nonce %d", chainID, nonce) + logger.Error().Err(err).Msg("SignatureFromBase58 error") return nil, false } @@ -271,20 +272,20 @@ func (ob *Observer) CheckFinalizedTx( Commitment: rpc.CommitmentFinalized, }) if err != nil { - logger.Error().Err(err).Msgf("GetTransaction err for chain %d nonce %d", chainID, nonce) + logger.Error().Err(err).Msg("GetTransaction error") return nil, false } // the tx must be successful in order to effectively increment the nonce if txResult.Meta.Err != nil { - logger.Error().Any("Err", txResult.Meta.Err).Msgf("tx is not successful for chain %d nonce %d", chainID, nonce) + logger.Error().Any("Err", txResult.Meta.Err).Msg("tx is not successful") return nil, false } // parse gateway instruction from tx result inst, err := ParseGatewayInstruction(txResult, ob.gatewayID, coinType) if err != nil { - logger.Error().Err(err).Msgf("ParseGatewayInstruction err for chain %d nonce %d", chainID, nonce) + logger.Error().Err(err).Msg("ParseGatewayInstruction error") return nil, false } txNonce := inst.GatewayNonce() @@ -292,19 +293,19 @@ func (ob *Observer) CheckFinalizedTx( // recover ECDSA signer from instruction signerECDSA, err := inst.Signer() if err != nil { - logger.Error().Err(err).Msgf("cannot get instruction signer for chain %d nonce %d", chainID, nonce) + logger.Error().Err(err).Msg("cannot get instruction signer") return nil, false } // check tx authorization if signerECDSA != ob.TSS().EVMAddress() { - logger.Error().Msgf("tx signer %s is not matching TSS, chain %d nonce %d", signerECDSA, chainID, nonce) + logger.Error().Msgf("tx signer %s is not matching current TSS address %s", signerECDSA, ob.TSS().EVMAddress()) return nil, false } // check tx nonce if txNonce != nonce { - logger.Error().Msgf("tx nonce %d is not matching cctx, chain %d nonce %d", txNonce, chainID, nonce) + logger.Error().Msgf("tx nonce %d is not matching tracker nonce", txNonce) return nil, false } diff --git a/zetaclient/chains/solana/signer/outbound_tracker_reporter.go b/zetaclient/chains/solana/signer/outbound_tracker_reporter.go index 6462060beb..0a5fb6432e 100644 --- a/zetaclient/chains/solana/signer/outbound_tracker_reporter.go +++ b/zetaclient/chains/solana/signer/outbound_tracker_reporter.go @@ -8,7 +8,9 @@ import ( "github.com/gagliardetto/solana-go/rpc" "github.com/rs/zerolog" + "github.com/zeta-chain/zetacore/pkg/bg" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" + "github.com/zeta-chain/zetacore/zetaclient/logs" ) const ( @@ -27,16 +29,23 @@ func (signer *Signer) reportToOutboundTracker( txSig solana.Signature, logger zerolog.Logger, ) { + // prepare logger + logger = logger.With(). + Str(logs.FieldMethod, "reportToOutboundTracker"). + Int64(logs.FieldChain, chainID). + Uint64(logs.FieldNonce, nonce). + Str(logs.FieldTx, txSig.String()). + Logger() + // set being reported flag to avoid duplicate reporting alreadySet := signer.Signer.SetBeingReportedFlag(txSig.String()) if alreadySet { - logger.Info(). - Msgf("reportToOutboundTracker: outbound %s for chain %d nonce %d is being reported", txSig, chainID, nonce) + logger.Info().Msg("outbound is being reported to tracker") return } // launch a goroutine to monitor tx confirmation status - go func() { + bg.Work(ctx, func(ctx context.Context) error { defer func() { signer.Signer.ClearBeingReportedFlag(txSig.String()) }() @@ -48,9 +57,8 @@ func (signer *Signer) reportToOutboundTracker( // give up if we know the tx is too old and already expired if time.Since(start) > SolanaTransactionTimeout { - logger.Info(). - Msgf("reportToOutboundTracker: outbound %s expired for chain %d nonce %d", txSig, chainID, nonce) - return + logger.Info().Msg("outbound is expired") + return nil } // query tx using optimistic commitment level "confirmed" @@ -68,24 +76,21 @@ func (signer *Signer) reportToOutboundTracker( // unlike Ethereum, Solana doesn't have protocol-level nonce; the nonce is enforced by the gateway program. // a failed outbound (e.g. signature err, balance err) will never be able to increment the gateway program nonce. // a good/valid candidate of outbound tracker hash must come with a successful tx. - logger.Warn(). - Any("Err", tx.Meta.Err). - Msgf("reportToOutboundTracker: outbound %s failed for chain %d nonce %d", txSig, chainID, nonce) - return + logger.Warn().Any("Err", tx.Meta.Err).Msg("outbound is failed") + return nil } // report outbound hash to zetacore zetaHash, err := zetacoreClient.AddOutboundTracker(ctx, chainID, nonce, txSig.String(), nil, "", -1) if err != nil { - logger.Err(err). - Msgf("reportToOutboundTracker: error adding outbound %s for chain %d nonce %d", txSig, chainID, nonce) + logger.Err(err).Msg("error adding outbound to tracker") } else if zetaHash != "" { - logger.Info().Msgf("reportToOutboundTracker: added outbound %s for chain %d nonce %d; zeta txhash %s", txSig, chainID, nonce, zetaHash) + logger.Info().Msgf("added outbound to tracker; zeta txhash %s", zetaHash) } else { - // exit goroutine if the tracker already contains the hash (reported by other signer) - logger.Info().Msgf("reportToOutboundTracker: outbound %s already in tracker for chain %d nonce %d", txSig, chainID, nonce) - return + // exit goroutine until the tracker contains the hash (reported by either this or other signers) + logger.Info().Msg("outbound now exists in tracker") + return nil } } - }() + }, bg.WithName("TrackerReporterSolana"), bg.WithLogger(logger)) } diff --git a/zetaclient/logs/fields.go b/zetaclient/logs/fields.go new file mode 100644 index 0000000000..497690ffa4 --- /dev/null +++ b/zetaclient/logs/fields.go @@ -0,0 +1,18 @@ +package logs + +// A group of predefined field keys and module names for zetaclient logs +const ( + // field keys + FieldModule = "module" + FieldMethod = "method" + FieldChain = "chain" + FieldNonce = "nonce" + FieldTx = "tx" + FieldCctx = "cctx" + + // module names + ModNameInbound = "inbound" + ModNameOutbound = "outbound" + ModNameGasPrice = "gasprice" + ModNameHeaders = "headers" +) diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index c6cdea4f6f..b32c28d5b5 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -15,12 +15,12 @@ import ( "github.com/samber/lo" "github.com/zeta-chain/zetacore/pkg/bg" + "github.com/zeta-chain/zetacore/pkg/constant" zetamath "github.com/zeta-chain/zetacore/pkg/math" "github.com/zeta-chain/zetacore/x/crosschain/types" observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/chains/base" btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer" - "github.com/zeta-chain/zetacore/zetaclient/chains/evm" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" solanaobserver "github.com/zeta-chain/zetacore/zetaclient/chains/solana/observer" zctx "github.com/zeta-chain/zetacore/zetaclient/context" @@ -663,8 +663,13 @@ func (oc *Orchestrator) ScheduleCctxSolana( // runObserverSignerSync runs a blocking ticker that observes chain changes from zetacore // and optionally (de)provisions respective observers and signers. func (oc *Orchestrator) runObserverSignerSync(ctx context.Context) error { - // check every other zeta block - const cadence = 2 * evm.ZetaBlockTime + // sync observers and signers right away to speed up zetaclient startup + if err := oc.syncObserverSigner(ctx); err != nil { + oc.logger.Error().Err(err).Msg("runObserverSignerSync: syncObserverSigner failed for initial sync") + } + + // sync observer and signer every 10 blocks (approx. 1 minute) + const cadence = 10 * constant.ZetaBlockTime ticker := time.NewTicker(cadence) defer ticker.Stop()