diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 460757e45c..b714c6ac47 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -231,7 +231,8 @@ var ( // Prometheus remote write URL promRemoteURL *string - chainGovernorEnabled *bool + chainGovernorEnabled *bool + governorFlowCancelEnabled *bool ccqEnabled *bool ccqAllowedRequesters *string @@ -435,6 +436,7 @@ func init() { promRemoteURL = NodeCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)") chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor") + governorFlowCancelEnabled = NodeCmd.Flags().Bool("governorFlowCancelEnabled", false, "Enable flow cancel on the governor") ccqEnabled = NodeCmd.Flags().Bool("ccqEnabled", false, "Enable cross chain query support") ccqAllowedRequesters = NodeCmd.Flags().String("ccqAllowedRequesters", "", "Comma separated list of signers allowed to submit cross chain queries") @@ -541,6 +543,11 @@ func runNode(cmd *cobra.Command, args []string) { os.Exit(1) } + if !(*chainGovernorEnabled) && *governorFlowCancelEnabled { + fmt.Println("Flow cancel can only be enabled when the governor is enabled") + os.Exit(1) + } + logger := zap.New(zapcore.NewCore( consoleEncoder{zapcore.NewConsoleEncoder( zap.NewDevelopmentEncoderConfig())}, @@ -1575,7 +1582,7 @@ func runNode(cmd *cobra.Command, args []string) { node.GuardianOptionDatabase(db), node.GuardianOptionWatchers(watcherConfigs, ibcWatcherConfig), node.GuardianOptionAccountant(*accountantWS, *accountantContract, *accountantCheckEnabled, accountantWormchainConn, *accountantNttContract, accountantNttWormchainConn), - node.GuardianOptionGovernor(*chainGovernorEnabled), + node.GuardianOptionGovernor(*chainGovernorEnabled, *governorFlowCancelEnabled), node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn), node.GuardianOptionQueryHandler(*ccqEnabled, *ccqAllowedRequesters), node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap), diff --git a/node/pkg/adminrpc/adminserver_test.go b/node/pkg/adminrpc/adminserver_test.go index 08bcf64745..f1167f71c1 100644 --- a/node/pkg/adminrpc/adminserver_test.go +++ b/node/pkg/adminrpc/adminserver_test.go @@ -322,7 +322,7 @@ func Test_adminCommands(t *testing.T) { } func newNodePrivilegedServiceForGovernorTests() *nodePrivilegedService { - gov := governor.NewChainGovernor(zap.NewNop(), &db.MockGovernorDB{}, wh_common.GoTest) + gov := governor.NewChainGovernor(zap.NewNop(), &db.MockGovernorDB{}, wh_common.GoTest, false) return &nodePrivilegedService{ db: nil, diff --git a/node/pkg/governor/devnet_config.go b/node/pkg/governor/devnet_config.go index 55a6e2f68b..828073b067 100644 --- a/node/pkg/governor/devnet_config.go +++ b/node/pkg/governor/devnet_config.go @@ -17,8 +17,11 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE {chain: 2, addr: "000000000000000000000000DDb64fE46a91D46ee29420539FC25FD07c5FEa3E", symbol: "WETH", coinGeckoId: "weth", decimals: 8, price: 1174}, } - flowCancelTokens := []tokenConfigEntry{ - {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + flowCancelTokens := []tokenConfigEntry{} + if gov.flowCancelEnabled { + flowCancelTokens = []tokenConfigEntry{ + {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + } } chains := []chainConfigEntry{ diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 13830fd006..fbea430def 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -28,9 +28,12 @@ package governor import ( "context" "encoding/hex" + "errors" "fmt" "math" "math/big" + "sort" + "strconv" "sync" "time" @@ -179,12 +182,15 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool { } type ChainGovernor struct { - db db.GovernorDB // protected by `mutex` - logger *zap.Logger - mutex sync.Mutex - tokens map[tokenKey]*tokenEntry // protected by `mutex` - tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex` - chains map[vaa.ChainID]*chainEntry // protected by `mutex` + db db.GovernorDB // protected by `mutex` + logger *zap.Logger + mutex sync.Mutex + tokens map[tokenKey]*tokenEntry // protected by `mutex` + tokensByCoinGeckoId map[string][]*tokenEntry // protected by `mutex` + chains map[vaa.ChainID]*chainEntry // protected by `mutex` + // We maintain a sorted slice of governed chainIds so we can iterate over maps in a deterministic way + // This slice should be sorted in ascending order by (Wormhole) Chain ID. + chainIds []vaa.ChainID msgsSeen map[string]bool // protected by `mutex` // Key is hash, payload is consts transferComplete and transferEnqueued. msgsToPublish []*common.MessagePublication // protected by `mutex` dayLengthInMinutes int @@ -194,12 +200,14 @@ type ChainGovernor struct { nextConfigPublishTime time.Time statusPublishCounter int64 configPublishCounter int64 + flowCancelEnabled bool } func NewChainGovernor( logger *zap.Logger, db db.GovernorDB, env common.Environment, + flowCancelEnabled bool, ) *ChainGovernor { return &ChainGovernor{ db: db, @@ -209,6 +217,7 @@ func NewChainGovernor( chains: make(map[vaa.ChainID]*chainEntry), msgsSeen: make(map[string]bool), env: env, + flowCancelEnabled: flowCancelEnabled, } } @@ -232,19 +241,28 @@ func (gov *ChainGovernor) Run(ctx context.Context) error { return nil } +func (gov *ChainGovernor) IsFlowCancelEnabled() bool { + return gov.flowCancelEnabled +} + func (gov *ChainGovernor) initConfig() error { gov.mutex.Lock() defer gov.mutex.Unlock() gov.dayLengthInMinutes = 24 * 60 - configTokens := tokenList() - flowCancelTokens := FlowCancelTokenList() configChains := chainList() + configTokens := tokenList() + flowCancelTokens := []tokenConfigEntry{} if gov.env == common.UnsafeDevNet { configTokens, flowCancelTokens, configChains = gov.initDevnetConfig() } else if gov.env == common.TestNet { configTokens, flowCancelTokens, configChains = gov.initTestnetConfig() + } else { + // mainnet, unit tests, or accountant-mock + if gov.flowCancelEnabled { + flowCancelTokens = FlowCancelTokenList() + } } for _, ct := range configTokens { @@ -306,23 +324,27 @@ func (gov *ChainGovernor) initConfig() error { } } - for _, flowCancelConfigEntry := range flowCancelTokens { - addr, err := vaa.StringToAddress(flowCancelConfigEntry.addr) - if err != nil { - return err - } - key := tokenKey{chain: vaa.ChainID(flowCancelConfigEntry.chain), addr: addr} + // If flow cancelling is enabled, enable the `flowCancels` field for the Governed assets that + // correspond to the entries in the Flow Cancel Tokens List + if gov.flowCancelEnabled { + for _, flowCancelConfigEntry := range flowCancelTokens { + addr, err := vaa.StringToAddress(flowCancelConfigEntry.addr) + if err != nil { + return err + } + key := tokenKey{chain: vaa.ChainID(flowCancelConfigEntry.chain), addr: addr} - // Only add flow cancelling for tokens that are already configured for rate-limiting. - if _, ok := gov.tokens[key]; ok { - gov.tokens[key].flowCancels = true - } else { - gov.logger.Debug("token present in flow cancel list but absent from main token list:", - zap.Stringer("chain", key.chain), - zap.Stringer("addr", key.addr), - zap.String("symbol", flowCancelConfigEntry.symbol), - zap.String("coinGeckoId", flowCancelConfigEntry.coinGeckoId), - ) + // Only add flow cancelling for tokens that are already configured for rate-limiting. + if _, ok := gov.tokens[key]; ok { + gov.tokens[key].flowCancels = true + } else { + gov.logger.Debug("token present in flow cancel list but absent from main token list:", + zap.Stringer("chain", key.chain), + zap.Stringer("addr", key.addr), + zap.String("symbol", flowCancelConfigEntry.symbol), + zap.String("coinGeckoId", flowCancelConfigEntry.coinGeckoId), + ) + } } } @@ -375,6 +397,22 @@ func (gov *ChainGovernor) initConfig() error { return fmt.Errorf("no chains are configured") } + // Populate a sorted list of chain IDs so that we can iterate over maps in a determinstic way. + // https://go.dev/blog/maps, "Iteration order" section + governedChainIds := make([]vaa.ChainID, len(gov.chains)) + i := 0 + for id := range gov.chains { + // updating the slice in place here to satisfy prealloc lint. In theory this should be more performant + governedChainIds[i] = id + i++ + } + // Custom sorting for the vaa.ChainID type + sort.Slice(governedChainIds, func(i, j int) bool { + return governedChainIds[i] < governedChainIds[j] + }) + + gov.chainIds = governedChainIds + return nil } @@ -651,10 +689,23 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked( return true, ce, token, payload, nil } +// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing +// queued transfers. func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } +// CheckPendingForTime checks whether a pending message is ready to be released, and if so, modifies the chain entry's `pending` and `transfers` slices by +// moving a `dbTransfer` element from `pending` to `transfers`. Returns a slice of Messages that will be published. +// A transfer is ready to be released when one of the following conditions holds: +// - The 'release time' duration has passed since `now` (i.e. the transfer has been queued for 24 hours, regardless of +// the Governor's current capacity) +// - Within the release time duration, other transfers have been processed and have freed up outbound Governor capacity. +// This happens either because other transfers get released after 24 hours or because incoming transfers of +// flow-cancelling assets have freed up outbound capacity. +// +// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a +// signal to RESTART THE PROCESSOR. Therefore, errors returned by this function effectively act as panics. func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -669,15 +720,26 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP gov.msgsToPublish = nil } - for _, ce := range gov.chains { + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, chainId := range gov.chainIds { + ce, ok := gov.chains[chainId] + if !ok { + gov.logger.Error("chainId not found in gov.chains", zap.Stringer("chainId", chainId)) + + } // Keep going as long as we find something that will fit. for { foundOne := false prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) if err != nil { gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) + gov.logger.Error("refusing to release transfers for this chain until the sum can be correctly calculated", + zap.Stringer("chainId", chainId), + zap.Uint64("prevTotalValue", prevTotalValue), + zap.Error(err)) gov.msgsToPublish = msgsToPublish - return nil, err + // Skip further processing for this chain entry + break } // Keep going until we find something that fits or hit the end. @@ -734,7 +796,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", pe.dbData.Msg.MessageIDString())) + zap.String("msgID", pe.dbData.Msg.MessageIDString()), + zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels))) } payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload) @@ -746,7 +809,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ) delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below. } else { - // If we get here, publish it and remove it from the pending list. + // If we get here, publish it and move it from the pending list to the + // transfers list. Also add a flow-cancel transfer to the destination chain + // if the transfer is sending a flow-canceling asset. msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if countsTowardsTransfers { @@ -762,26 +827,47 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP Hash: pe.hash, } - if err := gov.db.StoreTransfer(&dbTransfer); err != nil { - gov.msgsToPublish = msgsToPublish + transfer, err := newTransferFromDbTransfer(&dbTransfer) + if err != nil { + // Should never occur unless dbTransfer.Value overflows MaxInt64 + gov.logger.Error("could not convert dbTransfer to transfer", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // This causes the processor to die. We don't want to process transfers that + // have USD value in excess of MaxInt64 under any circumstances. + // This check should occur before the call to the database so + // that we don't store a problematic transfer. return nil, err } - transfer, err := newTransferFromDbTransfer(&dbTransfer) - if err != nil { + if err := gov.db.StoreTransfer(&dbTransfer); err != nil { + // This causes the processor to die. We can't tolerate DB connection + // errors. return nil, err } + ce.transfers = append(ce.transfers, transfer) + gov.msgsSeen[pe.hash] = transferComplete + // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress} + key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} tokenEntry := gov.tokens[key] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { + if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return nil, err + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // Process the next pending transfer + continue } } else { gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", @@ -792,7 +878,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP } } } - gov.msgsSeen[pe.hash] = transferComplete } else { delete(gov.msgsSeen, pe.hash) } @@ -836,46 +921,46 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } -// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `emitter`. In effect, it represents a +// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token // that have the `emitter` as their destination chain. // The resulting `sum` return value therefore represents the net flow across a chain when taking flow-cancelling tokens // into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. -// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes +// As a side-effect, this function modifies the parameter `chainEntry`, updating its `transfers` field so that it only includes // filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). +// Returns an error if the sum cannot be calculated. The transfers field will still be updated in this case. When +// an error condition occurs, this function returns the chain's `dailyLimit` as the sum. This should result in the +// chain appearing at maximum capacity from the perspective of the Governor, and therefore cause new transfers to be +// queued until space opens up. // SECURITY Invariant: The `sum` return value should never be less than 0 -// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain -func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { - // Sum the value of all outgoing transfers - var sumOutgoing int64 - sumOutgoing, emitter.transfers, err = gov.TrimAndSumValue(emitter.transfers, startTime) +func (gov *ChainGovernor) TrimAndSumValueForChain(chainEntry *chainEntry, startTime time.Time) (sum uint64, err error) { + if chainEntry == nil { + // We don't expect this to happen but this prevents a nil pointer deference + return 0, errors.New("TrimAndSumValeForChain parameter chainEntry must not be nil") + } + // Sum the value of all transfers for this chain. This sum can be negative if flow-cancelling is enabled + // and the incoming value of flow-cancelling assets exceeds the summed value of all outgoing assets. + var sumValue int64 + sumValue, chainEntry.transfers, err = gov.TrimAndSumValue(chainEntry.transfers, startTime) if err != nil { - return 0, err + // Return the daily limit as the sum so that any further transfers will be queued. + return chainEntry.dailyLimit, err } - // Return early if the sum is not positive as it cannot exceed the daily limit. - // In this case, return 0 even if the sum is negative. - if sumOutgoing <= 0 { + // Return 0 even if the sum is negative. + if sumValue <= 0 { return 0, nil } - sum = uint64(sumOutgoing) - if sum > emitter.dailyLimit { - return 0, fmt.Errorf( - "invariant violation: calculated sum %d exceeds Governor limit %d", - sum, - emitter.dailyLimit, - ) - } - - return sum, nil + return uint64(sumValue), nil } // TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that // are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value, // and returns the sum and the filtered transfers. +// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`. // The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the // Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read // from the actual on-chain transaction. @@ -948,7 +1033,8 @@ func CheckedAddUint64(x uint64, y uint64) (uint64, error) { return sum, nil } -// CheckedAddInt64 adds two uint64 values with overflow checks +// CheckedAddInt64 adds two uint64 values with overflow checks. Returns an error if the calculation would +// overflow or underflow. In this case, the returned value is 0. func CheckedAddInt64(x int64, y int64) (int64, error) { if x == 0 { return y, nil diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index a71fb194fa..db928ce868 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -243,19 +243,23 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { } ce.transfers = append(ce.transfers, transfer) - // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, - // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but - // that function does not capture flow-cancelling when the node is restarted. + // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and + // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. tokenEntry := gov.tokens[tk] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + gov.logger.Error("could not add flow canceling transfer to destination chain", + zap.String("msgID", xfer.MsgID), + zap.String("hash", xfer.Hash), zap.Error(err), + ) return err } } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + gov.logger.Error("tried to cancel flow but chain entry for target chain does not exist", zap.String("msgID", xfer.MsgID), zap.Stringer("token chain", xfer.OriginChain), zap.Stringer("token address", xfer.OriginAddress), diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index 9929acb98b..142a90dd8e 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -100,15 +100,16 @@ func (gov *ChainGovernor) Status() (resp string) { defer gov.mutex.Unlock() startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) + for _, ce := range gov.chains { - valueTrans, err := sumValue(ce.transfers, startTime) + netValue, _, _, err := sumValue(ce.transfers, startTime) if err != nil { // We don't want to actually return an error or otherwise stop // execution in this case. Instead of propagating the error here, print the contents of the // error message. return fmt.Sprintf("chain: %v, dailyLimit: OVERFLOW. error: %s", ce.emitterChainId, err) } - s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, valueTrans, len(ce.pending)) + s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, netValue, len(ce.pending)) resp += s1 + "\n" gov.logger.Info(s1) if len(ce.pending) != 0 { @@ -244,62 +245,111 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time, } return "", fmt.Errorf("vaa not found in the pending list") + } -// sumValue sums the value of all `transfers`. See also `TrimAndSumValue`. -func sumValue(transfers []transfer, startTime time.Time) (uint64, error) { +// sumValue sums the value of all `transfers`, returning separate fields for: +// - the net sum of all outgoing small tranasfers minus flow cancel sum +// - the sum of all outgoing small tranasfers +// - the sum of all incoming flow-cancelling transfers +// NOTE these sums exclude "big transfers" as they are always queued for 24h and are never added to the chain entry's 'transfers' field. +// Returns an error if the sum of all transfers would overflow the bounds of Int64. In this case, the function +// returns a value of 0. +func sumValue(transfers []transfer, startTime time.Time) (netNotional int64, smallTxOutgoingNotional uint64, flowCancelNotional uint64, err error) { if len(transfers) == 0 { - return 0, nil + return 0, 0, 0, nil } - var sum int64 + // Sum of all outgoing small tranasfers minus incoming flow cancel transfers. Big transfers are excluded + netNotional = int64(0) + smallTxOutgoingNotional = uint64(0) + flowCancelNotional = uint64(0) for _, t := range transfers { if t.dbTransfer.Timestamp.Before(startTime) { continue } - checkedSum, err := CheckedAddInt64(sum, t.value) + netNotional, err = CheckedAddInt64(netNotional, t.value) if err != nil { // We have to stop and return an error here (rather than saturate, for example). The // transfers are not sorted by value so we can't make any guarantee on the final value // if we hit the upper or lower bound. We don't expect this to happen in any case. - return 0, err + return 0, 0, 0, err + } + if t.value < 0 { + // If a transfer is negative then it is an incoming, flow-cancelling transfer. + // We can use the dbTransfer.Value for calculating the sum because it is the unsigned version + // of t.Value + flowCancelNotional += t.dbTransfer.Value + } else { + smallTxOutgoingNotional += t.dbTransfer.Value } - sum = checkedSum - } - - // Do not return negative values. Instead, saturate to zero. - if sum <= 0 { - return 0, nil } - return uint64(sum), nil + return netNotional, smallTxOutgoingNotional, flowCancelNotional, nil } -// REST query to get the current available notional value per chain. +// REST query to get the current available notional value per chain. This is defined as the sum of all transfers +// subtracted from the chains's dailyLimit. +// The available notional limit by chain represents the remaining capacity of a chain. As a result, it should not be +// a negative number: we don't want to represent that there is "negative value" available. func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry) { gov.mutex.Lock() defer gov.mutex.Unlock() startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes)) - for _, ce := range gov.chains { - value, err := sumValue(ce.transfers, startTime) + + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, chainId := range gov.chainIds { + ce := gov.chains[chainId] + netUsage, _, incoming, err := sumValue(ce.transfers, startTime) if err != nil { - // Don't return an error here, just return 0 - return make([]*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry, 0) + // Report 0 available notional if we can't calculate the current usage + gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry", + zap.String("chainID", chainId.String()), + zap.Error(err)) + resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{ + ChainId: uint32(ce.emitterChainId), + RemainingAvailableNotional: 0, + NotionalLimit: ce.dailyLimit, + BigTransactionSize: ce.bigTransactionSize, + }) + continue } - if value >= ce.dailyLimit { - value = 0 - } else { - value = ce.dailyLimit - value + + remaining := gov.availableNotionalValue(chainId, netUsage) + + if !gov.flowCancelEnabled { + // When flow cancel is disabled, we expect that both the netUsage and remaining notional should be + // within the range of [0, dailyLimit]. Flow cancel allows flexibility here. netUsage may be + // negative if there is a lot of incoming flow; conversely, it may exceed dailyLimit if incoming + // flow added space, allowed additional transfers through, and then expired after 24h. + // Note that if flow cancel is enabled and then later disabled, netUsage can exceed dailyLimit + // for 24h as old transfers will be loaded from the database into the Governor, but the flow + // cancel transfers will not. The value should return to the normal range after 24h has elapsed + // since the old transfers were sent. + if netUsage < 0 || incoming != 0 { + gov.logger.Warn("GetAvailableNotionalByChain: net value for chain is negative even though flow cancel is disabled", + zap.String("chainID", chainId.String()), + zap.Uint64("dailyLimit", ce.dailyLimit), + zap.Int64("netUsage", netUsage), + zap.Error(err)) + } else if uint64(netUsage) > ce.dailyLimit { + gov.logger.Warn("GetAvailableNotionalByChain: net value for chain exceeds daily limit even though flow cancel is disabled", + zap.String("chainID", chainId.String()), + zap.Uint64("dailyLimit", ce.dailyLimit), + zap.Error(err)) + } + } resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{ ChainId: uint32(ce.emitterChainId), - RemainingAvailableNotional: value, + RemainingAvailableNotional: remaining, NotionalLimit: ce.dailyLimit, BigTransactionSize: ce.bigTransactionSize, }) + } sort.SliceStable(resp, func(i, j int) bool { @@ -365,6 +415,25 @@ func (gov *ChainGovernor) IsVAAEnqueued(msgId *publicrpcv1.MessageID) (bool, err return false, nil } +// availableNotionalValue calculates the available notional USD value for a chain entry based on the net value +// of the chain. +func (gov *ChainGovernor) availableNotionalValue(id vaa.ChainID, netUsage int64) uint64 { + remaining := uint64(0) + ce := gov.chains[id] + + // Handle negative case here so we can safely cast to uint64 below + if netUsage < 0 { + // The full capacity is available for the chain. + remaining = ce.dailyLimit + } else if uint64(netUsage) > ce.dailyLimit { + remaining = 0 + } else { + remaining = ce.dailyLimit - uint64(netUsage) + } + + return remaining +} + // REST query to get the list of tokens being monitored by the governor. func (gov *ChainGovernor) GetTokenList() []*publicrpcv1.GovernorGetTokenListResponse_Entry { gov.mutex.Lock() @@ -441,21 +510,21 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- [] if exists { enabled = "1" - value, err := sumValue(ce.transfers, startTime) + netUsage, _, _, err := sumValue(ce.transfers, startTime) + + remaining := uint64(0) if err != nil { // Error can occur if the sum overflows. Return 0 in this case rather than returning an // error. - value = 0 - } - if value >= ce.dailyLimit { - value = 0 + gov.logger.Error("CollectMetrics: failed to compute sum of transfers for chain entry", zap.String("chain", chain.String()), zap.Error(err)) + remaining = 0 } else { - value = ce.dailyLimit - value + remaining = gov.availableNotionalValue(chain, netUsage) } pending := len(ce.pending) totalNotional = fmt.Sprint(ce.dailyLimit) - available = float64(value) + available = float64(remaining) numPending = float64(pending) totalPending += pending } @@ -493,7 +562,9 @@ var governorMessagePrefixStatus = []byte("governor_status_000000000000000000|") func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0) - for _, ce := range gov.chains { + // Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly + for _, cid := range gov.chainIds { + ce := gov.chains[cid] chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{ ChainId: uint32(ce.emitterChainId), NotionalLimit: ce.dailyLimit, @@ -513,11 +584,12 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b gov.configPublishCounter += 1 payload := &gossipv1.ChainGovernorConfig{ - NodeName: hb.NodeName, - Counter: gov.configPublishCounter, - Timestamp: hb.Timestamp, - Chains: chains, - Tokens: tokens, + NodeName: hb.NodeName, + Counter: gov.configPublishCounter, + Timestamp: hb.Timestamp, + Chains: chains, + Tokens: tokens, + FlowCancelEnabled: gov.flowCancelEnabled, } b, err := proto.Marshal(payload) @@ -551,16 +623,15 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0) numEnqueued := 0 - for _, ce := range gov.chains { - value, err := sumValue(ce.transfers, startTime) + for chainId, ce := range gov.chains { + // The capacity for the chain to emit further messages, denoted as USD value. + remaining := uint64(0) + netUsage, smallTxNotional, flowCancelNotional, err := sumValue(ce.transfers, startTime) - if err != nil || value >= ce.dailyLimit { - // In case of error, set value to 0 rather than returning an error to the caller. An error - // here means sumValue has encountered an overflow and this should never happen. Even if it did - // we don't want to stop execution here. - value = 0 + if err != nil { + gov.logger.Error("publishStatus: failed to compute sum of transfers for chain entry", zap.String("chain", chainId.String()), zap.Error(err)) } else { - value = ce.dailyLimit - value + remaining = gov.availableNotionalValue(chainId, netUsage) } enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0) @@ -589,9 +660,12 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b } chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{ - ChainId: uint32(ce.emitterChainId), - RemainingAvailableNotional: value, - Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, + ChainId: uint32(ce.emitterChainId), + RemainingAvailableNotional: remaining, + Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter}, + SmallTxNetNotionalValue: netUsage, + SmallTxOutgoingNotionalValue: smallTxNotional, + FlowCancelNotionalValue: flowCancelNotional, }) } diff --git a/node/pkg/governor/governor_monitoring_test.go b/node/pkg/governor/governor_monitoring_test.go index aac6449556..5683e17429 100644 --- a/node/pkg/governor/governor_monitoring_test.go +++ b/node/pkg/governor/governor_monitoring_test.go @@ -11,7 +11,7 @@ import ( func TestIsVAAEnqueuedNilMessageID(t *testing.T) { logger, _ := zap.NewProduction() - gov := NewChainGovernor(logger, nil, common.GoTest) + gov := NewChainGovernor(logger, nil, common.GoTest, true) enqueued, err := gov.IsVAAEnqueued(nil) require.EqualError(t, err, "no message ID specified") assert.Equal(t, false, enqueued) diff --git a/node/pkg/governor/governor_prices.go b/node/pkg/governor/governor_prices.go index 510e7a5f00..f3ebe8fded 100644 --- a/node/pkg/governor/governor_prices.go +++ b/node/pkg/governor/governor_prices.go @@ -309,7 +309,7 @@ func CheckQuery(logger *zap.Logger) error { logger.Info("Instantiating governor.") ctx := context.Background() var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.MainNet) + gov := NewChainGovernor(logger, &db, common.MainNet, true) if err := gov.initConfig(); err != nil { return err diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 964ec53345..388af53d37 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -185,6 +185,21 @@ func TestTrimEmptyTransfers(t *testing.T) { assert.Equal(t, 0, len(updatedTransfers)) } +// Make sure that the code doesn't panic if called with a nil chainEntry +func TestTrimAndSumValueForChainReturnsErrorForNilChainEntry(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 1, 2022 at 12:00pm (CST)") + require.NoError(t, err) + + sum, err := gov.TrimAndSumValueForChain(nil, now) + require.Error(t, err) + assert.Equal(t, uint64(0), sum) +} + func TestSumAllFromToday(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) @@ -283,6 +298,51 @@ func TestSumWithFlowCancelling(t *testing.T) { assert.Equal(t, difference, usage) } +func TestFlowCancelFeatureFlag(t *testing.T) { + + ctx := context.Background() + var db db.MockGovernorDB + gov := NewChainGovernor(zap.NewNop(), &db, common.GoTest, true) + + // Trigger the evaluation of the flow cancelling config + err := gov.Run(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + // Test private bool + assert.True(t, gov.flowCancelEnabled) + // Test public getter + assert.True(t, gov.IsFlowCancelEnabled()) + numFlowCancelling := 0 + for _, tokenEntry := range gov.tokens { + if tokenEntry.flowCancels == true { + numFlowCancelling++ + } + } + assert.NotZero(t, numFlowCancelling) + + // Disable flow cancelling + gov = NewChainGovernor(zap.NewNop(), &db, common.GoTest, false) + + // Trigger the evaluation of the flow cancelling config + err = gov.Run(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + // Test private bool + assert.False(t, gov.flowCancelEnabled) + // Test public getter + assert.False(t, gov.IsFlowCancelEnabled()) + numFlowCancelling = 0 + for _, tokenEntry := range gov.tokens { + if tokenEntry.flowCancels == true { + numFlowCancelling++ + } + } + assert.Zero(t, numFlowCancelling) + +} + // Flow cancelling transfers are subtracted from the overall sum of all transfers from a given // emitter chain. Since we are working with uint64 values, ensure that there is no underflow. // When the sum of all flow cancelling transfers is greater than emitted transfers for a chain, @@ -360,10 +420,12 @@ func TestFlowCancelCannotUnderflow(t *testing.T) { assert.Zero(t, usage) } -// Simulate a case where the total sum of transfers for a chain in a 24 hour period exceeds -// the configured Governor limit. This should never happen, so we make sure that an error -// is returned if the system is in this state -func TestInvariantGovernorLimit(t *testing.T) { +// We never expect this to occur when flow-cancelling is disabled. If flow-cancelling is enabled, there +// are some cases where the outgoing value exceeds the daily limit. Example: a large, incoming transfer +// of a flow-cancelling asset increases the Governor capacity beyond the daily limit. After 24h, that +// transfer is trimmed. This reduces the daily limit back to normal, but by this time more outgoing +// transfers have been emitted, causing the sum to exceed the daily limit. +func TestChainEntrySumExceedsDailyLimit(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) require.NoError(t, err) @@ -406,10 +468,69 @@ func TestInvariantGovernorLimit(t *testing.T) { assert.Equal(t, expectedNumTransfers, len(transfers)) assert.NotZero(t, sum) - // Make sure we trigger the Invariant usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) - require.ErrorContains(t, err, "invariant violation: calculated sum") - assert.Zero(t, usage) + require.NoError(t, err) + assert.Equal(t, emitterTransferValue*uint64(expectedNumTransfers), usage) +} + +func TestTrimAndSumValueOverflowErrors(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + var transfers_from_emitter []transfer + transferTime, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + emitterChainId := vaa.ChainIDSolana + + transfer, err := newTransferFromDbTransfer(&db.Transfer{Value: math.MaxInt64, Timestamp: transferTime}) + require.NoError(t, err) + transfer2, err := newTransferFromDbTransfer(&db.Transfer{Value: 1, Timestamp: transferTime}) + require.NoError(t, err) + transfers_from_emitter = append(transfers_from_emitter, transfer, transfer2) + + // Populate chainEntry and ChainGovernor + emitter := &chainEntry{ + transfers: transfers_from_emitter, + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + sum, _, err := gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Zero(t, sum) + usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Equal(t, uint64(10000), usage) + + // overwrite emitter (discard transfer added above) + emitter = &chainEntry{ + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + // Now test underflow + transfer3 := &db.Transfer{Value: math.MaxInt64, Timestamp: transferTime, TargetChain: vaa.ChainIDSolana} + + ce := gov.chains[emitter.emitterChainId] + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + + sum, _, err = gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Zero(t, sum) + usage, err = gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Equal(t, uint64(10000), usage) } func TestTrimOneOfTwoTransfers(t *testing.T) { @@ -545,21 +666,21 @@ func newChainGovernorForTestWithLogger(ctx context.Context, logger *zap.Logger) } var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) err := gov.Run(ctx) if err != nil { - return gov, nil + return gov, err } emitterAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") if err != nil { - return gov, nil + return gov, err } tokenAddr, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") if err != nil { - return gov, nil + return gov, err } gov.initConfigForTest( @@ -1371,7 +1492,6 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { func TestPendingTransferBeingReleased(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) - require.NoError(t, err) assert.NotNil(t, gov) @@ -1516,7 +1636,9 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, 4, len(gov.msgsSeen)) // If we check pending before noon, nothing should happen. - now, _ = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") + now, err = time.Parse("Jan 2, 2006 at 3:04pm (MST)", "Jun 2, 2022 at 9:00am (CST)") + require.NoError(t, err) + assert.NotNil(t, now) toBePublished, err := gov.CheckPendingForTime(now) require.NoError(t, err) assert.Equal(t, 0, len(toBePublished)) @@ -1544,6 +1666,292 @@ func TestPendingTransferBeingReleased(t *testing.T) { assert.Equal(t, 3, len(gov.msgsSeen)) } +func TestPopulateChainIds(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + // Sanity check + assert.NotZero(t, len(gov.chainIds)) + + // Ensure that the chainIds slice match the entries in the chains map + assert.Equal(t, len(gov.chains), len(gov.chainIds)) + lowest := 0 + for _, chainId := range gov.chainIds { + chainEntry, ok := gov.chains[chainId] + assert.NotNil(t, chainEntry) + assert.True(t, ok) + assert.Equal(t, chainEntry.emitterChainId, chainId) + // Check that the chainIds are in ascending order. The point of this slice is that it provides + // deterministic ordering over chainIds. + assert.Greater(t, int(chainId), lowest) + lowest = int(chainId) + } +} + +// Test that, when a small transfer (under the 'big tx limit') of a flow-cancelling asset is queued and +// later released, it causes a reduction in the Governor usage for the destination chain. +func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { + + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + + require.NoError(t, err) + assert.NotNil(t, gov) + + // Set-up time + gov.setDayLengthInMinutes(24 * 60) + transferTime := time.Unix(int64(1654543099), 0) + + // Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works + // when the Origin chain of the asset does not match the emitter chain + // NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified + var flowCancelTokenOriginAddress vaa.Address + flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61") + require.NoError(t, err) + + require.NoError(t, err) + + // Data for Ethereum + tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec + tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum) + require.NoError(t, err) + recipientEthereum := "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8" //nolint:gosec + + // Data for Sui + tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec + tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui) + require.NoError(t, err) + recipientSui := "0x84a5f374d29fc77e370014dce4fd6a55b58ad608de8074b0be5571701724da31" + + // Data for Solana. Only used to represent the flow cancel asset. + // "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb" + tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec + + // Add chain entries to `gov` + dailyLimit := uint64(10000) + err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, dailyLimit, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, dailyLimit, 0) + require.NoError(t, err) + + // Add flow cancel asset and non-flow cancelable asset to the token entry for `gov` + err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true) + require.NoError(t, err) + assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}]) + + // First message: consume most of the dailyLimit for the emitter chain + msg1 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), + Nonce: uint32(1), + Sequence: uint64(1), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, // The origin asset for the token being transferred + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 10000, + ), + } + + // Second message: This transfer gets queued because the limit is exhausted + msg2 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+2), 0), + Nonce: uint32(2), + Sequence: uint64(2), + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + ConsistencyLevel: uint8(32), + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDSui, + recipientSui, + 500, + ), + } + + // Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This + // reduces the Governor usage for that chain. + msg3 := common.MessagePublication{ + TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + Timestamp: time.Unix(int64(transferTime.Unix()+3), 0), + Nonce: uint32(3), + Sequence: uint64(3), + EmitterChain: vaa.ChainIDSui, + EmitterAddress: tokenBridgeAddrSui, + ConsistencyLevel: uint8(0), // Sui has a consistency level of 0 (instant) + Payload: buildMockTransferPayloadBytes(1, + vaa.ChainIDSolana, + flowCancelTokenOriginAddress.String(), + vaa.ChainIDEthereum, + recipientEthereum, + 1000, + ), + } + + // Stage 0: No transfers sent + chainEntryEthereum, exists := gov.chains[vaa.ChainIDEthereum] + assert.True(t, exists) + assert.NotNil(t, chainEntryEthereum) + chainEntrySui, exists := gov.chains[vaa.ChainIDSui] + assert.True(t, exists) + assert.NotNil(t, chainEntrySui) + sumEth, ethTransfers, err := gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, len(ethTransfers)) + assert.Zero(t, len(chainEntryEthereum.pending)) + assert.Zero(t, sumEth) + require.NoError(t, err) + sumSui, suiTransfers, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(1654543099), 0)) + assert.Zero(t, len(suiTransfers)) + assert.Zero(t, sumSui) + require.NoError(t, err) + + // Perform a FIRST transfer (Ethereum --> Sui) + result, err := gov.ProcessMsgForTime(&msg1, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + numTrans, netValueTrans, numPending, valuePending := gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, numTrans) // One for the positive and one for the negative + assert.Equal(t, int64(0), netValueTrans) // Zero, because the asset flow cancels + assert.Equal(t, 0, numPending) + assert.Equal(t, uint64(0), valuePending) + assert.Equal(t, 1, len(gov.msgsSeen)) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound + assert.Equal(t, int(1), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Equal to total dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) + require.NoError(t, err) + + // Outbound check: + // - ensure that the sum of the transfers is equal to the value of the inverse transfer + // - ensure the actual governor usage is Zero (any negative value is converted to zero by TrimAndSumValueForChain) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, 1, len(suiTransfers)) // A single NEGATIVE transfer + assert.Equal(t, int64(-10000), sumSui) // Ensure the inverse (negative) transfer is in the Sui chain Entry + require.NoError(t, err) + suiGovernorUsage, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative. + require.NoError(t, err) + + // Perform a SECOND transfer (Ethereum --> Sui again) + // When a transfer is queued, ProcessMsgForTime should return false. + result, err = gov.ProcessMsgForTime(&msg2, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.False(t, result) + require.NoError(t, err) + + // Stage 2: Transfer sent from Ethereum to Sui gets queued + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, len(gov.msgsSeen)) // Two messages observed + assert.Equal(t, 2, numTrans) // Two transfers (same as previous step) + assert.Equal(t, int64(0), netValueTrans) // The two transfers and their inverses cancel each other out. + assert.Equal(t, 1, numPending) // Second transfer is queued because the limit is exhausted + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor. + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(1), len(chainEntryEthereum.transfers)) // One from previous step + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // One for inbound refund and another for outbound + assert.Equal(t, int(1), len(chainEntrySui.transfers)) // One inverse transfer. Inverse from pending not added yet + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(10000), sumEth) // Same as before: full dailyLimit + assert.Equal(t, int(1), len(ethTransfers)) // Same as before + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(1), len(suiTransfers)) // just the inverse from before + assert.Equal(t, int64(-10000), sumSui) // Unchanged. + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, suiGovernorUsage) // Actual governor usage must not be negative. + require.NoError(t, err) + + // Stage 3: Message that reduces Governor usage for Ethereum (Sui --> Ethereum) + result, err = gov.ProcessMsgForTime(&msg3, time.Now()) + assert.True(t, result) + require.NoError(t, err) + + // Stage 3: Governor usage reduced on Ethereum due to incoming from Sui + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 4, numTrans) // Two transfers and their inverses + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 1, numPending) // Not released yet + assert.Equal(t, uint64(500), valuePending) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(2), len(chainEntryEthereum.transfers)) + assert.Equal(t, int(1), len(chainEntryEthereum.pending)) // We have not yet released the pending transfer + assert.Equal(t, int(2), len(chainEntrySui.transfers)) + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9000), sumEth) // We freed up room because of Sui incoming + assert.Equal(t, int(2), len(ethTransfers)) // Two transfers cancel each other out + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(2), len(suiTransfers)) + assert.Equal(t, int64(-9000), sumSui) // We consumed some outbound capacity + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero because it's still negative + require.NoError(t, err) + + // Stage 4: Release the pending transfer. We deliberately do not advance the time here because we are relying + // on the pending transfer being released as a result of flow-cancelling and not because 24 hours have passed. + // NOTE that even though the function says "Checked..." it modifies `gov` as a side-effect when a pending + // transfer is ready to be released + toBePublished, err := gov.CheckPendingForTime(time.Unix(int64(transferTime.Unix()-1000), 0)) + require.NoError(t, err) + assert.Equal(t, 1, len(toBePublished)) + + // Stage 4: Pending transfer released. This increases the Ethereum Governor usage again and reduces Sui. + numTrans, netValueTrans, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 3, len(gov.msgsSeen)) + assert.Equal(t, 6, numTrans) // Two new transfers created from previous pending transfer + assert.Equal(t, int64(0), netValueTrans) // Still zero because everything flow cancels + assert.Equal(t, 0, numPending) // Pending transfer has been released + assert.Equal(t, uint64(0), valuePending) + + // Verify the stats that are non flow-cancelling. + // In practice this is the sum of the absolute value of all the transfers, including the inverses. + // 2 * (10000 + 1000 + 500) = 23000 + _, absValueTrans, _, _ := gov.getStatsForAllChains() + assert.Equal(t, uint64(23000), absValueTrans) + + // Check the state of the governor + chainEntryEthereum = gov.chains[vaa.ChainIDEthereum] + chainEntrySui = gov.chains[vaa.ChainIDSui] + assert.Equal(t, int(3), len(chainEntryEthereum.transfers)) // Two outbound, one inverse from Sui + assert.Equal(t, int(0), len(chainEntryEthereum.pending)) // Released + assert.Equal(t, int(3), len(chainEntrySui.transfers)) // One outbound, two inverses from Ethereum + sumEth, ethTransfers, err = gov.TrimAndSumValue(chainEntryEthereum.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(9500), sumEth) + assert.Equal(t, int(3), len(ethTransfers)) + require.NoError(t, err) + sumSui, suiTransfers, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int(3), len(suiTransfers)) // New inverse transfer added after pending transfer was released + assert.Equal(t, int64(-9500), sumSui) // Flow-cancelling inverse transfer added to Sui after released + require.NoError(t, err) + suiGovernorUsage, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(0), suiGovernorUsage) // Still zero + require.NoError(t, err) +} + func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) @@ -1775,7 +2183,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { func TestMainnetConfigIsValid(t *testing.T) { logger := zap.NewNop() var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) gov.env = common.TestNet err := gov.initConfig() @@ -1785,7 +2193,7 @@ func TestMainnetConfigIsValid(t *testing.T) { func TestTestnetConfigIsValid(t *testing.T) { logger := zap.NewNop() var db db.MockGovernorDB - gov := NewChainGovernor(logger, &db, common.GoTest) + gov := NewChainGovernor(logger, &db, common.GoTest, true) gov.env = common.TestNet err := gov.initConfig() @@ -2061,9 +2469,9 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T // But the big transaction should not affect the daily notional. ce, exists := gov.chains[vaa.ChainIDEthereum] require.Equal(t, true, exists) - valueTrans, err = sumValue(ce.transfers, now) + _, outgoing, _, err := sumValue(ce.transfers, now) require.NoError(t, err) - assert.Equal(t, uint64(0), valueTrans) + assert.Equal(t, uint64(0), outgoing) } func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { diff --git a/node/pkg/governor/testnet_config.go b/node/pkg/governor/testnet_config.go index c2650bcc17..6c7788f096 100644 --- a/node/pkg/governor/testnet_config.go +++ b/node/pkg/governor/testnet_config.go @@ -14,8 +14,12 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 } - flowCancelTokens := []tokenConfigEntry{ - {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + flowCancelTokens := []tokenConfigEntry{} + + if gov.flowCancelEnabled { + flowCancelTokens = []tokenConfigEntry{ + {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usdc", decimals: 6, price: 1}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 1 + } } chains := []chainConfigEntry{ diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 8c37e3dbf4..ecedf76573 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -188,7 +188,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui GuardianOptionDatabase(db), GuardianOptionWatchers(watcherConfigs, nil), GuardianOptionNoAccountant(), // disable accountant - GuardianOptionGovernor(true), + GuardianOptionGovernor(true, false), GuardianOptionGatewayRelayer("", nil), // disable gateway relayer GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, cfg.p2pPort, "", 0, "", "", func() string { return "" }), GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail), diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 8ccd9e3073..7bd0d88be1 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -195,14 +195,18 @@ func GuardianOptionAccountant( // GuardianOptionGovernor enables or disables the governor. // Dependencies: db -func GuardianOptionGovernor(governorEnabled bool) *GuardianOption { +func GuardianOptionGovernor(governorEnabled bool, flowCancelEnabled bool) *GuardianOption { return &GuardianOption{ name: "governor", dependencies: []string{"db"}, f: func(ctx context.Context, logger *zap.Logger, g *G) error { if governorEnabled { - logger.Info("chain governor is enabled") - g.gov = governor.NewChainGovernor(logger, g.db, g.env) + if flowCancelEnabled { + logger.Info("chain governor is enabled with flow cancel enabled") + } else { + logger.Info("chain governor is enabled without flow cancel") + } + g.gov = governor.NewChainGovernor(logger, g.db, g.env, flowCancelEnabled) } else { logger.Info("chain governor is disabled") } diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index b4ffded1d7..ed353938b3 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -459,7 +459,11 @@ func Run(params *RunParams) func(ctx context.Context) error { features := make([]string, 0) if params.gov != nil { - features = append(features, "governor") + if params.gov.IsFlowCancelEnabled() { + features = append(features, "governor:fc") + } else { + features = append(features, "governor") + } } if params.acct != nil { features = append(features, params.acct.FeatureString()) diff --git a/node/pkg/proto/gossip/v1/gossip.pb.go b/node/pkg/proto/gossip/v1/gossip.pb.go index 96ab3498eb..0fb9ea3d49 100644 --- a/node/pkg/proto/gossip/v1/gossip.pb.go +++ b/node/pkg/proto/gossip/v1/gossip.pb.go @@ -720,11 +720,12 @@ type ChainGovernorConfig struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` - Counter int64 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` - Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Chains []*ChainGovernorConfig_Chain `protobuf:"bytes,4,rep,name=chains,proto3" json:"chains,omitempty"` - Tokens []*ChainGovernorConfig_Token `protobuf:"bytes,5,rep,name=tokens,proto3" json:"tokens,omitempty"` + NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` + Counter int64 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` + Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Chains []*ChainGovernorConfig_Chain `protobuf:"bytes,4,rep,name=chains,proto3" json:"chains,omitempty"` + Tokens []*ChainGovernorConfig_Token `protobuf:"bytes,5,rep,name=tokens,proto3" json:"tokens,omitempty"` + FlowCancelEnabled bool `protobuf:"varint,6,opt,name=flow_cancel_enabled,json=flowCancelEnabled,proto3" json:"flow_cancel_enabled,omitempty"` } func (x *ChainGovernorConfig) Reset() { @@ -794,6 +795,13 @@ func (x *ChainGovernorConfig) GetTokens() []*ChainGovernorConfig_Token { return nil } +func (x *ChainGovernorConfig) GetFlowCancelEnabled() bool { + if x != nil { + return x.FlowCancelEnabled + } + return false +} + // This message is published every minute. type SignedChainGovernorStatus struct { state protoimpl.MessageState @@ -1404,9 +1412,12 @@ type ChainGovernorStatus_Chain struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ChainId uint32 `protobuf:"varint,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` - RemainingAvailableNotional uint64 `protobuf:"varint,2,opt,name=remaining_available_notional,json=remainingAvailableNotional,proto3" json:"remaining_available_notional,omitempty"` - Emitters []*ChainGovernorStatus_Emitter `protobuf:"bytes,3,rep,name=emitters,proto3" json:"emitters,omitempty"` + ChainId uint32 `protobuf:"varint,1,opt,name=chain_id,json=chainId,proto3" json:"chain_id,omitempty"` + RemainingAvailableNotional uint64 `protobuf:"varint,2,opt,name=remaining_available_notional,json=remainingAvailableNotional,proto3" json:"remaining_available_notional,omitempty"` + Emitters []*ChainGovernorStatus_Emitter `protobuf:"bytes,3,rep,name=emitters,proto3" json:"emitters,omitempty"` + SmallTxNetNotionalValue int64 `protobuf:"varint,4,opt,name=small_tx_net_notional_value,json=smallTxNetNotionalValue,proto3" json:"small_tx_net_notional_value,omitempty"` + SmallTxOutgoingNotionalValue uint64 `protobuf:"varint,5,opt,name=small_tx_outgoing_notional_value,json=smallTxOutgoingNotionalValue,proto3" json:"small_tx_outgoing_notional_value,omitempty"` + FlowCancelNotionalValue uint64 `protobuf:"varint,6,opt,name=flow_cancel_notional_value,json=flowCancelNotionalValue,proto3" json:"flow_cancel_notional_value,omitempty"` } func (x *ChainGovernorStatus_Chain) Reset() { @@ -1462,6 +1473,27 @@ func (x *ChainGovernorStatus_Chain) GetEmitters() []*ChainGovernorStatus_Emitter return nil } +func (x *ChainGovernorStatus_Chain) GetSmallTxNetNotionalValue() int64 { + if x != nil { + return x.SmallTxNetNotionalValue + } + return 0 +} + +func (x *ChainGovernorStatus_Chain) GetSmallTxOutgoingNotionalValue() uint64 { + if x != nil { + return x.SmallTxOutgoingNotionalValue + } + return 0 +} + +func (x *ChainGovernorStatus_Chain) GetFlowCancelNotionalValue() uint64 { + if x != nil { + return x.FlowCancelNotionalValue + } + return 0 +} + var File_gossip_v1_gossip_proto protoreflect.FileDescriptor var file_gossip_v1_gossip_proto_rawDesc = []byte{ @@ -1586,8 +1618,8 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0xd1, - 0x03, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, + 0x52, 0x0c, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x81, + 0x04, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x02, @@ -1601,7 +1633,10 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x65, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, - 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x1a, 0x7b, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, + 0x06, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x66, 0x6c, 0x6f, 0x77, 0x5f, + 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, + 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x1a, 0x7b, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x02, 0x20, @@ -1623,7 +1658,7 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x75, 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x67, 0x75, - 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0x98, 0x05, 0x0a, 0x13, 0x43, + 0x61, 0x72, 0x64, 0x69, 0x61, 0x6e, 0x41, 0x64, 0x64, 0x72, 0x22, 0xdb, 0x06, 0x0a, 0x13, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, @@ -1654,7 +1689,7 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x32, 0x2a, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x41, 0x41, 0x52, 0x0c, 0x65, 0x6e, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x1a, 0xa8, 0x01, 0x0a, 0x05, 0x43, + 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x56, 0x61, 0x61, 0x73, 0x1a, 0xeb, 0x02, 0x0a, 0x05, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x40, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x61, 0x69, 0x6e, 0x69, 0x6e, 0x67, 0x5f, 0x61, 0x76, 0x61, @@ -1665,23 +1700,35 @@ var file_gossip_v1_gossip_proto_rawDesc = []byte{ 0x03, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x47, 0x6f, 0x76, 0x65, 0x72, 0x6e, 0x6f, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x6d, 0x69, 0x74, 0x74, 0x65, 0x72, 0x52, 0x08, 0x65, 0x6d, 0x69, - 0x74, 0x74, 0x65, 0x72, 0x73, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x5a, - 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x71, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, - 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x65, 0x72, 0x74, 0x75, 0x73, 0x6f, - 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, 0x65, 0x2f, 0x6e, 0x6f, 0x64, 0x65, - 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x73, 0x73, 0x69, - 0x70, 0x2f, 0x76, 0x31, 0x3b, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x76, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x74, 0x65, 0x72, 0x73, 0x12, 0x3c, 0x0a, 0x1b, 0x73, 0x6d, 0x61, 0x6c, 0x6c, 0x5f, 0x74, + 0x78, 0x5f, 0x6e, 0x65, 0x74, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x5f, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x17, 0x73, 0x6d, 0x61, 0x6c, + 0x6c, 0x54, 0x78, 0x4e, 0x65, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x46, 0x0a, 0x20, 0x73, 0x6d, 0x61, 0x6c, 0x6c, 0x5f, 0x74, 0x78, 0x5f, + 0x6f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, 0x6e, 0x61, + 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x1c, 0x73, + 0x6d, 0x61, 0x6c, 0x6c, 0x54, 0x78, 0x4f, 0x75, 0x74, 0x67, 0x6f, 0x69, 0x6e, 0x67, 0x4e, 0x6f, + 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x3b, 0x0a, 0x1a, 0x66, + 0x6c, 0x6f, 0x77, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x5f, 0x6e, 0x6f, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x17, 0x66, 0x6c, 0x6f, 0x77, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x4e, 0x6f, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x69, 0x67, 0x6e, + 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, + 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x22, 0x5a, 0x0a, 0x13, 0x53, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1c, 0x0a, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x09, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x42, 0x41, 0x5a, + 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x65, 0x72, 0x74, + 0x75, 0x73, 0x6f, 0x6e, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6d, 0x68, 0x6f, 0x6c, 0x65, 0x2f, 0x6e, + 0x6f, 0x64, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, + 0x73, 0x73, 0x69, 0x70, 0x2f, 0x76, 0x31, 0x3b, 0x67, 0x6f, 0x73, 0x73, 0x69, 0x70, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/node/pkg/publicrpc/publicrpcserver_test.go b/node/pkg/publicrpc/publicrpcserver_test.go index e9b614d262..a6b9dad416 100644 --- a/node/pkg/publicrpc/publicrpcserver_test.go +++ b/node/pkg/publicrpc/publicrpcserver_test.go @@ -69,7 +69,7 @@ func TestGetSignedVAABadAddress(t *testing.T) { func TestGovernorIsVAAEnqueuedNoMessage(t *testing.T) { ctx := context.Background() logger, _ := zap.NewProduction() - gov := governor.NewChainGovernor(logger, nil, common.GoTest) + gov := governor.NewChainGovernor(logger, nil, common.GoTest, false) server := &PublicrpcServer{logger: logger, gov: gov} // A message without the messageId set should not panic but return an error instead. diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 914d707f01..05564fedfb 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -156,6 +156,7 @@ message ChainGovernorConfig { int64 timestamp = 3; repeated Chain chains = 4; repeated Token tokens = 5; + bool flow_cancel_enabled = 6; } // This message is published every minute. @@ -188,6 +189,9 @@ message ChainGovernorStatus { uint32 chain_id = 1; uint64 remaining_available_notional = 2; repeated Emitter emitters = 3; + int64 small_tx_net_notional_value = 4; + uint64 small_tx_outgoing_notional_value = 5; + uint64 flow_cancel_notional_value = 6; } string node_name = 1;