Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(zetaclient): propagate context across codebase & refactor zetacore client #2428

Merged
merged 38 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
3e09525
Implement `retry` package
swift1337 Jul 2, 2024
d59b701
Refactor zetacore query methods
swift1337 Jul 2, 2024
71efb8b
Refactor zetacore broadcast.go
swift1337 Jul 2, 2024
95a44e0
Refactor zetacore client construction
swift1337 Jul 2, 2024
fb865c5
Refactor zetacore client [WIP]
swift1337 Jul 3, 2024
0d3699f
Fix debug cli command
swift1337 Jul 3, 2024
d4e7db3
Add ctx to evm observer
swift1337 Jul 4, 2024
9b46ff7
Add ctx to evm observer & signer
swift1337 Jul 4, 2024
92869d0
Add `bg` package
swift1337 Jul 4, 2024
d3e1112
Add ctx to btc observer
swift1337 Jul 4, 2024
49b7e90
Add ctx to supply checker
swift1337 Jul 4, 2024
2cfd9cf
Add ctx to orchestrator & signers
swift1337 Jul 4, 2024
544f3ea
Fix lint errors
swift1337 Jul 4, 2024
73d6a79
Improve zetacore client configuration. Fix zetacore tests [WIP]
swift1337 Jul 4, 2024
8e23aff
Fix zetacore client test cases
swift1337 Jul 5, 2024
da8bf83
Fix other test cases
swift1337 Jul 5, 2024
67c50da
Merge branch 'develop' into feat/zetaclient-ctx
swift1337 Jul 5, 2024
b59444e
Resolve merge conflicts
swift1337 Jul 5, 2024
1d74153
Update changelog
swift1337 Jul 5, 2024
358d971
Address PR comments [1]
swift1337 Jul 8, 2024
380d93c
Address PR comments [2]
swift1337 Jul 8, 2024
55626c9
Remove logger pointer from bg package
swift1337 Jul 10, 2024
c94e1c8
Minor fix
swift1337 Jul 10, 2024
f94d0a1
Minor code improvement
swift1337 Jul 10, 2024
cace765
Converge config.New and config.NewConfig
swift1337 Jul 10, 2024
0c29b29
Improve NewSigner logging
swift1337 Jul 10, 2024
50c541d
Add zctx.Copy()
swift1337 Jul 11, 2024
b920238
Refactor Orchestrator shutdown logic
swift1337 Jul 11, 2024
23ef837
Fix retrier logic for monitors
swift1337 Jul 11, 2024
c94d844
Minor fix
swift1337 Jul 11, 2024
454a56b
Fix gosec
swift1337 Jul 11, 2024
2fae181
Update e2e readme
swift1337 Jul 11, 2024
b935800
Fix BTC outbound typo
swift1337 Jul 11, 2024
bcb19ae
Merge branch 'develop' into feat/zetaclient-ctx
swift1337 Jul 11, 2024
d36876d
Fix typo
swift1337 Jul 11, 2024
4bdf50f
Address PR comments
swift1337 Jul 12, 2024
5ff1728
Add test cases for `bg`
swift1337 Jul 12, 2024
7e96f9d
Handle ctx errors in `retry`
swift1337 Jul 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions cmd/zetaclientd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer"
"github.com/zeta-chain/zetacore/zetaclient/config"
clientcontext "github.com/zeta-chain/zetacore/zetaclient/context"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/keys"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)
Expand Down Expand Up @@ -57,7 +57,8 @@ func debugCmd(_ *cobra.Command, args []string) error {
return err
}

appContext := clientcontext.New(cfg, zerolog.Nop())
appContext := zctx.New(cfg, zerolog.Nop())
ctx := zctx.WithAppContext(context.Background(), appContext)

chainID, err := strconv.ParseInt(args[1], 10, 64)
if err != nil {
Expand All @@ -74,15 +75,16 @@ func debugCmd(_ *cobra.Command, args []string) error {
"",
debugArgs.zetaChainID,
false,
nil)
zerolog.Nop(),
)
if err != nil {
return err
}
chainParams, err := client.GetChainParams()
chainParams, err := client.GetChainParams(ctx)
if err != nil {
return err
}
tssEthAddress, err := client.GetEthTssAddress()
tssEthAddress, err := client.GetEVMTSSAddress(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -148,19 +150,19 @@ func debugCmd(_ *cobra.Command, args []string) error {

switch coinType {
case coin.CoinType_Zeta:
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenZeta(tx, receipt, false)
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenZeta(ctx, tx, receipt, false)
if err != nil {
return err
}

case coin.CoinType_ERC20:
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenERC20(tx, receipt, false)
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenERC20(ctx, tx, receipt, false)
if err != nil {
return err
}

case coin.CoinType_Gas:
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenGas(tx, receipt, false)
ballotIdentifier, err = evmObserver.CheckAndVoteInboundTokenGas(ctx, tx, receipt, false)
if err != nil {
return err
}
Expand All @@ -186,15 +188,15 @@ func debugCmd(_ *cobra.Command, args []string) error {
return err
}
btcObserver.WithBtcClient(btcClient)
ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(inboundHash, false)
ballotIdentifier, err = btcObserver.CheckReceiptForBtcTxHash(ctx, inboundHash, false)
if err != nil {
return err
}
}
fmt.Println("BallotIdentifier : ", ballotIdentifier)

// query ballot
ballot, err := client.GetBallot(ballotIdentifier)
ballot, err := client.GetBallot(ctx, ballotIdentifier)
if err != nil {
return err
}
Expand Down
45 changes: 32 additions & 13 deletions cmd/zetaclientd/keygen_tss.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -16,35 +17,42 @@ import (

"github.com/zeta-chain/zetacore/pkg/chains"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/context"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
mc "github.com/zeta-chain/zetacore/zetaclient/tss"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

func GenerateTss(
appContext *context.AppContext,
ctx context.Context,
logger zerolog.Logger,
client *zetacore.Client,
peers p2p.AddrList,
priKey secp256k1.PrivKey,
ts *metrics.TelemetryServer,
tssHistoricalList []observertypes.TSS,
tssPassword string,
hotkeyPassword string) (*mc.TSS, error) {
hotkeyPassword string,
) (*mc.TSS, error) {
app, err := zctx.FromContext(ctx)
if err != nil {
return nil, err
}

keygenLogger := logger.With().Str("module", "keygen").Logger()

// Bitcoin chain ID is currently used for using the correct signature format
// TODO: remove this once we have a better way to determine the signature format
// https://github.com/zeta-chain/node/issues/1397
bitcoinChainID := chains.BitcoinRegtest.ChainId
btcChain, _, btcEnabled := appContext.GetBTCChainAndConfig()
btcChain, _, btcEnabled := app.GetBTCChainAndConfig()
if btcEnabled {
bitcoinChainID = btcChain.ChainId
}

tss, err := mc.NewTSS(
appContext,
ctx,
app,
peers,
priKey,
preParams,
Expand Down Expand Up @@ -74,7 +82,7 @@ func GenerateTss(
// This loop will try keygen at the keygen block and then wait for keygen to be successfully reported by all nodes before breaking out of the loop.
// If keygen is unsuccessful, it will reset the triedKeygenAtBlock flag and try again at a new keygen block.

keyGen := appContext.GetKeygen()
keyGen := app.GetKeygen()
if keyGen.Status == observertypes.KeygenStatus_KeyGenSuccess {
return tss, nil
}
Expand All @@ -86,7 +94,7 @@ func GenerateTss(
// Try generating TSS at keygen block , only when status is pending keygen and generation has not been tried at the block
if keyGen.Status == observertypes.KeygenStatus_PendingKeygen {
// Return error if RPC is not working
currentBlock, err := client.GetBlockHeight()
currentBlock, err := client.GetBlockHeight(ctx)
if err != nil {
keygenLogger.Error().Err(err).Msg("GetBlockHeight RPC error")
continue
Expand All @@ -101,16 +109,21 @@ func GenerateTss(
if currentBlock > lastBlock {
lastBlock = currentBlock
keygenLogger.Info().
Msgf("Waiting For Keygen Block to arrive or new keygen block to be set. Keygen Block : %d Current Block : %d ChainID %s ", keyGen.BlockNumber, currentBlock, appContext.Config().ChainID)
Msgf("Waiting For Keygen Block to arrive or new keygen block to be set. Keygen Block : %d Current Block : %d ChainID %s ", keyGen.BlockNumber, currentBlock, app.Config().ChainID)
}
continue
}
// Try keygen only once at a particular block, irrespective of whether it is successful or failure
triedKeygenAtBlock = true
err = keygenTss(keyGen, tss, keygenLogger)
err = keygenTss(ctx, keyGen, tss, keygenLogger)
if err != nil {
keygenLogger.Error().Err(err).Msg("keygenTss error")
tssFailedVoteHash, err := client.SetTSS("", keyGen.BlockNumber, chains.ReceiveStatus_failed)
tssFailedVoteHash, err := client.PostVoteTSS(
ctx,
"",
keyGen.BlockNumber,
chains.ReceiveStatus_failed,
)
if err != nil {
keygenLogger.Error().Err(err).Msg("Failed to broadcast Failed TSS Vote to zetacore")
return nil, err
Expand All @@ -128,7 +141,8 @@ func GenerateTss(
}

// If TSS is successful , broadcast the vote to zetacore and set Pubkey
tssSuccessVoteHash, err := client.SetTSS(
tssSuccessVoteHash, err := client.PostVoteTSS(
ctx,
newTss.CurrentPubkey,
keyGen.BlockNumber,
chains.ReceiveStatus_success,
Expand All @@ -155,7 +169,7 @@ func GenerateTss(
return nil, errors.New("unexpected state for TSS generation")
}

func keygenTss(keyGen observertypes.Keygen, tss *mc.TSS, keygenLogger zerolog.Logger) error {
func keygenTss(ctx context.Context, keyGen observertypes.Keygen, tss *mc.TSS, keygenLogger zerolog.Logger) error {
keygenLogger.Info().Msgf("Keygen at blocknum %d , TSS signers %s ", keyGen.BlockNumber, keyGen.GranteePubkeys)
var req keygen.Request
req = keygen.NewRequest(keyGen.GranteePubkeys, keyGen.BlockNumber, "0.14.0")
Expand All @@ -168,7 +182,12 @@ func keygenTss(keyGen observertypes.Keygen, tss *mc.TSS, keygenLogger zerolog.Lo
return err
}
index := fmt.Sprintf("keygen-%s-%d", digest, keyGen.BlockNumber)
zetaHash, err := tss.ZetacoreClient.PostBlameData(&res.Blame, tss.ZetacoreClient.Chain().ChainId, index)
zetaHash, err := tss.ZetacoreClient.PostVoteBlameData(
ctx,
&res.Blame,
tss.ZetacoreClient.Chain().ChainId,
index,
)
if err != nil {
keygenLogger.Error().Err(err).Msg("error sending blame data to core")
return err
Expand Down
54 changes: 30 additions & 24 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -26,7 +27,7 @@ import (
observerTypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
"github.com/zeta-chain/zetacore/zetaclient/config"
"github.com/zeta-chain/zetacore/zetaclient/context"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/orchestrator"
)
Expand All @@ -44,8 +45,7 @@ func init() {
}

func start(_ *cobra.Command, _ []string) error {
err := setHomeDir()
if err != nil {
if err := setHomeDir(); err != nil {
return err
}

Expand All @@ -62,24 +62,25 @@ func start(_ *cobra.Command, _ []string) error {
if err != nil {
return err
}

logger, err := base.InitLogger(cfg)
if err != nil {
log.Error().Err(err).Msg("InitLogger failed")
return err
return errors.Wrap(err, "initLogger failed")
}

//Wait until zetacore has started
// Wait until zetacore has started
if len(cfg.Peer) != 0 {
err := validatePeer(cfg.Peer)
if err != nil {
log.Error().Err(err).Msg("invalid peer")
return err
if err := validatePeer(cfg.Peer); err != nil {
return errors.Wrap(err, "unable to validate peer")
}
}

masterLogger := logger.Std
startLogger := masterLogger.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)

// Wait until zetacore is up
waitForZetaCore(cfg, startLogger)
startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer)
Expand All @@ -95,15 +96,14 @@ func start(_ *cobra.Command, _ []string) error {

// CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to.
// Zetacore accumulates votes , and provides a centralized source of truth for all clients
zetacoreClient, err := CreateZetacoreClient(cfg, telemetryServer, hotkeyPass)
zetacoreClient, err := CreateZetacoreClient(cfg, hotkeyPass, masterLogger)
if err != nil {
startLogger.Error().Err(err).Msg("CreateZetacoreClient error")
return err
}

// Wait until zetacore is ready to create blocks
err = zetacoreClient.WaitForZetacoreToCreateBlocks()
if err != nil {
if err = zetacoreClient.WaitForZetacoreToCreateBlocks(ctx); err != nil {
startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error")
return err
}
Expand All @@ -117,7 +117,7 @@ func start(_ *cobra.Command, _ []string) error {
}

// cross-check chainid
res, err := zetacoreClient.GetNodeInfo()
res, err := zetacoreClient.GetNodeInfo(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetNodeInfo error")
return err
Expand All @@ -144,15 +144,14 @@ func start(_ *cobra.Command, _ []string) error {
startLogger.Debug().Msgf("CreateAuthzSigner is ready")

// Initialize core parameters from zetacore
appContext := context.New(cfg, masterLogger)
err = zetacoreClient.UpdateZetacoreContext(appContext, true, startLogger)
err = zetacoreClient.UpdateZetacoreContext(ctx, appContext, true, startLogger)
if err != nil {
startLogger.Error().Err(err).Msg("Error getting core parameters")
return err
}
startLogger.Info().Msgf("Config is updated from zetacore %s", maskCfg(cfg))

go zetacoreClient.ZetacoreContextUpdater(appContext)
go zetacoreClient.UpdateZetacoreContextWorker(ctx, appContext)

// Generate TSS address . The Tss address is generated through Keygen ceremony. The TSS key is used to sign all outbound transactions .
// The hotkeyPk is private key for the Hotkey. The Hotkey is used to sign all inbound transactions
Expand Down Expand Up @@ -195,14 +194,14 @@ func start(_ *cobra.Command, _ []string) error {
metrics.LastStartTime.SetToCurrentTime()

var tssHistoricalList []observerTypes.TSS
tssHistoricalList, err = zetacoreClient.GetTssHistory()
tssHistoricalList, err = zetacoreClient.GetTSSHistory(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetTssHistory error")
}

telemetryServer.SetIPAddress(cfg.PublicIP)
tss, err := GenerateTss(
appContext,
ctx,
masterLogger,
zetacoreClient,
peers,
Expand Down Expand Up @@ -237,7 +236,7 @@ func start(_ *cobra.Command, _ []string) error {
// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetCurrentTss()
currentTss, err := zetacoreClient.GetCurrentTSS(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetCurrentTSS error")
return err
Expand All @@ -254,7 +253,7 @@ func start(_ *cobra.Command, _ []string) error {
startLogger.Error().Msgf("No chains enabled in updated config %s ", cfg.String())
}

observerList, err := zetacoreClient.GetObserverList()
observerList, err := zetacoreClient.GetObserverList(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetObserverList error")
return err
Expand Down Expand Up @@ -294,13 +293,20 @@ func start(_ *cobra.Command, _ []string) error {
} else {
startLogger.Debug().Msgf("Node %s is an active observer starting external chain observers", zetacoreClient.GetKeys().GetOperatorAddress().String())
for _, observer := range observerMap {
observer.Start()
observer.Start(ctx)
}
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it . This is the high level object used for CCTX interactions
orchestrator := orchestrator.NewOrchestrator(zetacoreClient, signerMap, observerMap, masterLogger, telemetryServer)
err = orchestrator.MonitorCore(appContext)
orchestrator := orchestrator.NewOrchestrator(
ctx,
zetacoreClient,
signerMap,
observerMap,
masterLogger,
telemetryServer,
)
err = orchestrator.MonitorCore(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("Orchestrator failed to start")
return err
Expand Down
Loading
Loading