Skip to content

Commit

Permalink
Callback Logs Cleanup Pt. 3 (ICQCallbacks) (#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
sampocs authored Jan 14, 2023
1 parent 3c3c54b commit 9a8b757
Show file tree
Hide file tree
Showing 23 changed files with 518 additions and 467 deletions.
4 changes: 2 additions & 2 deletions app/apptesting/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (s *AppTestHelper) Setup() {
s.TestAccs = CreateRandomAccounts(3)
s.IbcEnabled = false
s.IcaAddresses = make(map[string]string)

}

// Mints coins directly to a module account
Expand Down Expand Up @@ -138,6 +137,7 @@ func (s *AppTestHelper) CreateTransferChannel(hostChainID string) {
s.App = s.StrideChain.App.(*app.StrideApp)
s.HostApp = s.HostChain.GetSimApp()
s.Ctx = s.StrideChain.GetContext()

// Finally confirm the channel was setup properly
s.Require().Equal(ibctesting.FirstClientID, s.TransferPath.EndpointA.ClientID, "stride clientID")
s.Require().Equal(ibctesting.FirstConnectionID, s.TransferPath.EndpointA.ConnectionID, "stride connectionID")
Expand Down Expand Up @@ -166,7 +166,6 @@ func (s *AppTestHelper) CreateICAChannel(owner string) string {
icaPath = CopyConnectionAndClientToPath(icaPath, s.TransferPath)

// Register the ICA and complete the handshake

s.RegisterInterchainAccount(icaPath.EndpointA, owner)

err := icaPath.EndpointB.ChanOpenTry()
Expand All @@ -179,6 +178,7 @@ func (s *AppTestHelper) CreateICAChannel(owner string) string {
s.Require().NoError(err, "ChanOpenConfirm error")

s.Ctx = s.StrideChain.GetContext()

// Confirm the ICA channel was created properly
portID := icaPath.EndpointA.ChannelConfig.PortID
channelID := icaPath.EndpointA.ChannelID
Expand Down
2 changes: 1 addition & 1 deletion app/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (app *StrideApp) setupUpgradeHandlers() {
// v5 upgrade handler
app.UpgradeKeeper.SetUpgradeHandler(
v5.UpgradeName,
v5.CreateUpgradeHandler(app.mm, app.configurator, app.InterchainqueryKeeper),
v5.CreateUpgradeHandler(app.mm, app.configurator, app.InterchainqueryKeeper, app.StakeibcKeeper),
)

upgradeInfo, err := app.UpgradeKeeper.ReadUpgradeInfoFromDisk()
Expand Down
8 changes: 8 additions & 0 deletions app/upgrades/v5/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"

interchainquerykeeper "github.com/Stride-Labs/stride/v4/x/interchainquery/keeper"
stakeibckeeper "github.com/Stride-Labs/stride/v4/x/stakeibc/keeper"
stakeibctypes "github.com/Stride-Labs/stride/v4/x/stakeibc/types"
)

// Note: ensure these values are properly set before running upgrade
Expand All @@ -19,6 +21,7 @@ func CreateUpgradeHandler(
mm *module.Manager,
configurator module.Configurator,
interchainqueryKeeper interchainquerykeeper.Keeper,
stakeibcKeeper stakeibckeeper.Keeper,
) upgradetypes.UpgradeHandler {
return func(ctx sdk.Context, _ upgradetypes.Plan, vm module.VersionMap) (module.VersionMap, error) {
// Remove authz from store as it causes an issue with state sync
Expand All @@ -29,6 +32,11 @@ func CreateUpgradeHandler(
staleQueryId := "60b8e09dc7a65938cd6e6e5728b8aa0ca3726ffbe5511946a4f08ced316174ab"
interchainqueryKeeper.DeleteQuery(ctx, staleQueryId)

// Add the SafetyMaxSlashPercent param to the stakeibc param store
stakeibcParams := stakeibcKeeper.GetParams(ctx)
stakeibcParams.SafetyMaxSlashPercent = stakeibctypes.DefaultSafetyMaxSlashPercent
stakeibcKeeper.SetParams(ctx, stakeibcParams)

return mm.RunMigrations(ctx, configurator, vm)
}
}
5 changes: 3 additions & 2 deletions dockernet/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ TX_LOGS=$DOCKERNET_HOME/logs/tx.log
KEYS_LOGS=$DOCKERNET_HOME/logs/keys.log

# List of hosts enabled
# `start-docker` defaults to just GAIA if HOST_CHAINS is empty
# `start-docker-all` always runs all hosts
HOST_CHAINS=()

# If no host zones are specified above:
# `start-docker` defaults to just GAIA if HOST_CHAINS is empty
# `start-docker-all` always runs all hosts
if [[ "${ALL_HOST_CHAINS:-false}" == "true" ]]; then
HOST_CHAINS=(GAIA JUNO OSMO STARS)
elif [[ "${#HOST_CHAINS[@]}" == "0" ]]; then
Expand Down
2 changes: 1 addition & 1 deletion dockernet/src/init_chain.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ set_host_genesis() {
# This makes it easier to test updating weights after a host zone validator is slashed
sed -i -E 's|"signed_blocks_window": "100"|"signed_blocks_window": "10"|g' $genesis_config
sed -i -E 's|"downtime_jail_duration": "600s"|"downtime_jail_duration": "10s"|g' $genesis_config
sed -i -E 's|"slash_fraction_downtime": "0.010000000000000000"|"slash_fraction_downtime": "0.100000000000000000"|g' $genesis_config
sed -i -E 's|"slash_fraction_downtime": "0.010000000000000000"|"slash_fraction_downtime": "0.050000000000000000"|g' $genesis_config
}

MAIN_ID=1 # Node responsible for genesis and persistent_peers
Expand Down
1 change: 1 addition & 0 deletions proto/stride/stakeibc/params.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ message Params {
uint64 safety_max_redemption_rate_threshold = 15;
uint64 ibc_transfer_timeout_nanos = 16;
uint64 safety_num_validators = 17;
uint64 safety_max_slash_percent = 18;
}
2 changes: 0 additions & 2 deletions x/icacallbacks/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ func (k Keeper) CallRegisteredICACallback(ctx sdk.Context, modulePacket channelt
} else {
k.Logger(ctx).Error(fmt.Sprintf("Callback %v has no associated callback", callbackData))
}
// QUESTION: Do we want to catch the case where the callback ID has not been registered?
// Maybe just as an info log if it's expected that some acks do not have an associated callback?

// remove the callback data
k.RemoveCallbackData(ctx, callbackDataKey)
Expand Down
189 changes: 84 additions & 105 deletions x/interchainquery/keeper/msg_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package keeper

import (
"context"
"fmt"
"net/url"
"sort"
"strings"
Expand All @@ -14,6 +13,7 @@ import (
tmclienttypes "github.com/cosmos/ibc-go/v5/modules/light-clients/07-tendermint/types"
"github.com/spf13/cast"

"github.com/Stride-Labs/stride/v4/utils"
"github.com/Stride-Labs/stride/v4/x/interchainquery/types"
)

Expand All @@ -30,174 +30,153 @@ func NewMsgServerImpl(keeper Keeper) types.MsgServer {
var _ types.MsgServer = msgServer{}

// check if the query requires proving; if it does, verify it!
func (k Keeper) VerifyKeyProof(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, q types.Query) error {
pathParts := strings.Split(q.QueryType, "/")
func (k Keeper) VerifyKeyProof(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query) error {
pathParts := strings.Split(query.QueryType, "/")

// the query does NOT have an associated proof, so no need to verify it.
if pathParts[len(pathParts)-1] != "key" {
return nil
} else {
// the query is a "key" proof query -- verify the results are valid by checking the proof!
if msg.ProofOps == nil {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, unable to validate proof. No proof submitted", q.Id)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
connection, _ := k.IBCKeeper.ConnectionKeeper.GetConnection(ctx, q.ConnectionId)
}

msgHeight, err := cast.ToUint64E(msg.Height)
if err != nil {
return err
}
height := clienttypes.NewHeight(clienttypes.ParseChainID(q.ChainId), msgHeight+1)
consensusState, found := k.IBCKeeper.ClientKeeper.GetClientConsensusState(ctx, connection.ClientId, height)
if !found {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, consensus state not found for client %s and height %d", q.Id, connection.ClientId, height)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
// If the query is a "key" proof query, verify the results are valid by checking the poof
if msg.ProofOps == nil {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Unable to validate proof. No proof submitted")
}

clientState, found := k.IBCKeeper.ClientKeeper.GetClientState(ctx, connection.ClientId)
if !found {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, unable to fetch client state for client %s and height %d", q.Id, connection.ClientId, height)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
path := commitmenttypes.NewMerklePath([]string{pathParts[1], url.PathEscape(string(q.Request))}...)
// Get the client consensus state at the height 1 block above the message height
msgHeight, err := cast.ToUint64E(msg.Height)
if err != nil {
return err
}
height := clienttypes.NewHeight(clienttypes.ParseChainID(query.ChainId), msgHeight+1)

merkleProof, err := commitmenttypes.ConvertProofs(msg.ProofOps)
if err != nil {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, error converting proofs", q.Id)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
// Get the client state and consensus state from the connection Id
connection, found := k.IBCKeeper.ConnectionKeeper.GetConnection(ctx, query.ConnectionId)
if !found {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "ConnectionId %s does not exist", query.ConnectionId)
}
consensusState, found := k.IBCKeeper.ClientKeeper.GetClientConsensusState(ctx, connection.ClientId, height)
if !found {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Consensus state not found for client %s and height %d", connection.ClientId, height)
}
clientState, found := k.IBCKeeper.ClientKeeper.GetClientState(ctx, connection.ClientId)
if !found {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Unable to fetch client state for client %s", connection.ClientId)
}
tmClientState, ok := clientState.(*tmclienttypes.ClientState)
if !ok {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Client state is not tendermint")
}

tmclientstate, ok := clientState.(*tmclienttypes.ClientState)
if !ok {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, error unmarshaling client state %v", q.Id, clientState)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
// Get the merkle path and merkle proof
path := commitmenttypes.NewMerklePath([]string{pathParts[1], url.PathEscape(string(query.Request))}...)
merkleProof, err := commitmenttypes.ConvertProofs(msg.ProofOps)
if err != nil {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Error converting proofs: %s", err.Error())
}

// If we got a non-nil response, verify inclusion proof
if len(msg.Result) != 0 {
if err := merkleProof.VerifyMembership(tmClientState.ProofSpecs, consensusState.GetRoot(), path, msg.Result); err != nil {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Unable to verify membership proof: %s", err.Error())
}
k.Logger(ctx).Info(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId, "Inclusion proof validated - QueryId %s", query.Id))

if len(msg.Result) != 0 {
// if we got a non-nil response, verify inclusion proof.
if err := merkleProof.VerifyMembership(tmclientstate.ProofSpecs, consensusState.GetRoot(), path, msg.Result); err != nil {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, unable to verify membership proof: %s", q.Id, err)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
k.Logger(ctx).Info(fmt.Sprintf("Proof validated! module: %s, queryId %s", types.ModuleName, q.Id))

} else {
// if we got a nil response, verify non inclusion proof.
if err := merkleProof.VerifyNonMembership(tmclientstate.ProofSpecs, consensusState.GetRoot(), path); err != nil {
errMsg := fmt.Sprintf("[ICQ Resp] for query %s, unable to verify non-membership proof: %s", q.Id, err)
k.Logger(ctx).Error(errMsg)
return sdkerrors.Wrapf(types.ErrInvalidICQProof, errMsg)
}
k.Logger(ctx).Info(fmt.Sprintf("Non-inclusion Proof validated, stopping here! module: %s, queryId %s", types.ModuleName, q.Id))
} else {
// if we got a nil query response, verify non inclusion proof.
if err := merkleProof.VerifyNonMembership(tmClientState.ProofSpecs, consensusState.GetRoot(), path); err != nil {
return sdkerrors.Wrapf(types.ErrInvalidICQProof, "Unable to verify non-membership proof: %s", err.Error())
}
k.Logger(ctx).Info(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId, "Non-inclusion proof validated - QueryId %s", query.Id))
}

return nil
}

// call the query's associated callback function
func (k Keeper) InvokeCallback(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, q types.Query) error {
// get all the stored queries and sort them for determinism
func (k Keeper) InvokeCallback(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query) error {
// get all the callback handlers and sort them for determinism
// (each module has their own callback handler)
moduleNames := []string{}
for moduleName := range k.callbacks {
moduleNames = append(moduleNames, moduleName)
}
sort.Strings(moduleNames)

// Loop through each module until the callbackId is found in one of the module handlers
for _, moduleName := range moduleNames {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] executing callback for queryId (%s), module (%s)", q.Id, moduleName))
moduleCallbackHandler := k.callbacks[moduleName]

if moduleCallbackHandler.HasICQCallback(q.CallbackId) {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] callback (%s) found for module (%s)", q.CallbackId, moduleName))
// call the correct callback function
err := moduleCallbackHandler.CallICQCallback(ctx, q.CallbackId, msg.Result, q)
if err != nil {
k.Logger(ctx).Error(fmt.Sprintf("[ICQ Resp] error in ICQ callback, error: %s, msg: %s, result: %v, type: %s, params: %v", err.Error(), msg.QueryId, msg.Result, q.QueryType, q.Request))
return err
}
} else {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] callback not found for module (%s)", moduleName))
// Once the callback is found, invoke the function
if moduleCallbackHandler.HasICQCallback(query.CallbackId) {
return moduleCallbackHandler.CallICQCallback(ctx, query.CallbackId, msg.Result, query)
}
}
return nil
}

// verify the query has not exceeded its ttl
func (k Keeper) HasQueryExceededTtl(ctx sdk.Context, msg *types.MsgSubmitQueryResponse, query types.Query) (bool, error) {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] query %s with ttl: %d, resp time: %d.", msg.QueryId, query.Ttl, ctx.BlockHeader().Time.UnixNano()))
currBlockTime, err := cast.ToUint64E(ctx.BlockTime().UnixNano())
if err != nil {
return false, err
}

if query.Ttl < currBlockTime {
errMsg := fmt.Sprintf("[ICQ Resp] aborting query callback due to ttl expiry! ttl is %d, time now %d for query of type %s with id %s, on chain %s",
query.Ttl, ctx.BlockTime().UnixNano(), query.QueryType, query.ChainId, msg.QueryId)
fmt.Println(errMsg)
k.Logger(ctx).Error(errMsg)
return true, nil
}
return false, nil
// If no callback was found, return an error
return types.ErrICQCallbackNotFound
}

// Handle ICQ query responses by validating the proof, and calling the query's corresponding callback
func (k msgServer) SubmitQueryResponse(goCtx context.Context, msg *types.MsgSubmitQueryResponse) (*types.MsgSubmitQueryResponseResponse, error) {
ctx := sdk.UnwrapSDKContext(goCtx)

// check if the response has an associated query stored on stride
q, found := k.GetQuery(ctx, msg.QueryId)
query, found := k.GetQuery(ctx, msg.QueryId)
if !found {
k.Logger(ctx).Info("[ICQ Resp] ignoring non-existent query response (note: duplicate responses are nonexistent)")
k.Logger(ctx).Info("ICQ RESPONSE | Ignoring non-existent query response (note: duplicate responses are nonexistent)")
return &types.MsgSubmitQueryResponseResponse{}, nil // technically this is an error, but will cause the entire tx to fail if we have one 'bad' message, so we can just no-op here.
}

defer ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
sdk.EventTypeMessage,
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
sdk.NewAttribute(types.AttributeKeyQueryId, q.Id),
sdk.NewAttribute(types.AttributeKeyQueryId, query.Id),
),
sdk.NewEvent(
"query_response",
sdk.NewAttribute(sdk.AttributeKeyModule, types.AttributeValueCategory),
sdk.NewAttribute(types.AttributeKeyQueryId, q.Id),
sdk.NewAttribute(types.AttributeKeyChainId, q.ChainId),
sdk.NewAttribute(types.AttributeKeyQueryId, query.Id),
sdk.NewAttribute(types.AttributeKeyChainId, query.ChainId),
),
})

// 1. verify the response's proof, if one exists
err := k.VerifyKeyProof(ctx, msg, q)
// Verify the response's proof, if one exists
err := k.VerifyKeyProof(ctx, msg, query)
if err != nil {
k.Logger(ctx).Error(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId,
"QUERY PROOF VERIFICATION FAILED - QueryId: %s, Error: %s", query.Id, err.Error()))
return nil, err
}
// 2. immediately delete the query so it cannot process again
k.DeleteQuery(ctx, q.Id)

// 3. verify the query's ttl is unexpired
ttlExceeded, err := k.HasQueryExceededTtl(ctx, msg, q)
// Immediately delete the query so it cannot process again
k.DeleteQuery(ctx, query.Id)

// Verify the query hasn't expired (if the block time is greater than the TTL timestamp, the query is expired)
currBlockTime, err := cast.ToUint64E(ctx.BlockTime().UnixNano())
if err != nil {
return nil, err
}
if ttlExceeded {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] %s's ttl exceeded: %d < %d.", msg.QueryId, q.Ttl, ctx.BlockHeader().Time.UnixNano()))
if query.Ttl < currBlockTime {
k.Logger(ctx).Error(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId,
"QUERY TIMEOUT - QueryId: %s, TTL: %d, BlockTime: %d", query.Id, query.Ttl, ctx.BlockHeader().Time.UnixNano()))
return &types.MsgSubmitQueryResponseResponse{}, nil
}

// 4. if the query is contentless, end
// If the query is contentless, end
if len(msg.Result) == 0 {
k.Logger(ctx).Info(fmt.Sprintf("[ICQ Resp] query %s is contentless, removing from store.", msg.QueryId))
k.Logger(ctx).Info(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId,
"Query response is contentless - QueryId: %s", query.Id))
return &types.MsgSubmitQueryResponseResponse{}, nil
}

// 5. call the query's associated callback function
err = k.InvokeCallback(ctx, msg, q)
// Call the query's associated callback function
err = k.InvokeCallback(ctx, msg, query)
if err != nil {
k.Logger(ctx).Error(utils.LogICQCallbackWithHostZone(query.ChainId, query.CallbackId,
"Error invoking ICQ callback, error: %s, QueryId: %s, QueryType: %s, ConnectionId: %s, QueryRequest: %v, QueryReponse: %v",
err.Error(), msg.QueryId, query.QueryType, query.ConnectionId, query.Request, msg.Result))
return nil, err
}

Expand Down
Loading

0 comments on commit 9a8b757

Please sign in to comment.