Skip to content

Commit

Permalink
Node/Solana: Add shim support
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jan 16, 2025
1 parent 629c25d commit f236c1a
Show file tree
Hide file tree
Showing 5 changed files with 1,436 additions and 24 deletions.
14 changes: 11 additions & 3 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var (

guardianKeyPath *string
guardianSignerUri *string
solanaContract *string

ethRPC *string
ethContract *string
Expand Down Expand Up @@ -153,7 +152,9 @@ var (
suiRPC *string
suiMoveEventType *string

solanaRPC *string
solanaRPC *string
solanaContract *string
solanaShimContract *string

pythnetContract *string
pythnetRPC *string
Expand Down Expand Up @@ -292,7 +293,8 @@ func init() {

guardianKeyPath = NodeCmd.Flags().String("guardianKey", "", "Path to guardian key")
guardianSignerUri = NodeCmd.Flags().String("guardianSignerUri", "", "Guardian signer URI")
solanaContract = NodeCmd.Flags().String("solanaContract", "", "Address of the Solana program (required)")
solanaContract = NodeCmd.Flags().String("solanaContract", "", "Address of the Solana program (required if solanaRpc is specified)")
solanaShimContract = NodeCmd.Flags().String("solanaShimContract", "", "Address of the Solana shim program")

ethRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "ethRPC", "Ethereum RPC URL", "ws://eth-devnet:8545", []string{"ws", "wss"})
ethContract = NodeCmd.Flags().String("ethContract", "", "Ethereum contract address")
Expand Down Expand Up @@ -847,6 +849,10 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Fatal("Both --solanaContract and --solanaRPC must be set or both unset")
}

if *solanaShimContract != "" && *solanaContract == "" {
logger.Fatal("--solanaShimContract may only be specified if --solanaContract is specified")
}

if !argsConsistent([]string{*pythnetContract, *pythnetRPC, *pythnetWS}) {
logger.Fatal("Either --pythnetContract, --pythnetRPC and --pythnetWS must all be set or all unset")
}
Expand Down Expand Up @@ -1589,6 +1595,7 @@ func runNode(cmd *cobra.Command, args []string) {
Rpc: *solanaRPC,
Websocket: "",
Contract: *solanaContract,
ShimContract: *solanaShimContract,
ReceiveObsReq: false,
Commitment: rpc.CommitmentConfirmed,
}
Expand All @@ -1602,6 +1609,7 @@ func runNode(cmd *cobra.Command, args []string) {
Rpc: *solanaRPC,
Websocket: "",
Contract: *solanaContract,
ShimContract: *solanaShimContract,
ReceiveObsReq: true,
Commitment: rpc.CommitmentFinalized,
}
Expand Down
124 changes: 104 additions & 20 deletions node/pkg/watchers/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ type (

ccqConfig query.PerChainConfig
ccqLogger *zap.Logger

shimContractStr string
shimContractAddr solana.PublicKey
shimEnabled bool
shimPostMessageDiscriminator []byte
shimMessageEventDiscriminator []byte
}

EventSubscriptionError struct {
Expand Down Expand Up @@ -221,6 +227,8 @@ func NewSolanaWatcher(
chainID vaa.ChainID,
queryReqC <-chan *query.PerChainQueryInternal,
queryResponseC chan<- *query.PerChainQueryResponseInternal,
shimContractStr string,
shimContractAddr solana.PublicKey,
) *SolanaWatcher {
msgObservedLogLevel := zapcore.InfoLevel
if chainID == vaa.ChainIDPythNet {
Expand All @@ -243,6 +251,8 @@ func NewSolanaWatcher(
queryReqC: queryReqC,
queryResponseC: queryResponseC,
ccqConfig: query.GetPerChainConfig(chainID),
shimContractStr: shimContractStr,
shimContractAddr: shimContractAddr,
}
}

Expand Down Expand Up @@ -334,10 +344,13 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
zap.String("wsUrl", wsUrl),
zap.String("contract", contractAddr),
zap.String("rawContract", s.rawContract),
zap.String("shimContractAddr", s.shimContractStr),
)

logger.Info("Solana watcher connecting to RPC node ", zap.String("url", s.rpcUrl))

s.shimSetup()

s.errC = make(chan error)
s.pumpData = make(chan []byte)

Expand Down Expand Up @@ -561,6 +574,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot

// If the logs don't contain the contract address, skip the transaction.
// ex: "Program 3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5 invoke [2]",
// Assumption: Transactions for the shim contract also contain the core contract address so this check is still valid.
var possiblyWormhole bool
for i := 0; i < len(txRpc.Meta.LogMessages) && !possiblyWormhole; i++ {
possiblyWormhole = strings.HasPrefix(txRpc.Meta.LogMessages[i], s.whLogPrefix)
Expand Down Expand Up @@ -592,10 +606,16 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot

signature := tx.Signatures[0]
var programIndex uint16
var shimProgramIndex uint16
var shimFound bool
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
}
if s.shimEnabled && key.Equals(s.shimContractAddr) {
shimProgramIndex = uint16(n)
shimFound = true
}
}
if programIndex == 0 {
continue
Expand All @@ -608,41 +628,42 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
zap.String("commitment", string(s.commitment)))
}

alreadyProcessed := ShimAlreadyProcessed{}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
if shimFound && inst.ProgramIDIndex == shimProgramIndex {
found, err := s.shimProcessTopLevelInstruction(logger, programIndex, shimProgramIndex, tx, txRpc.Meta.InnerInstructions, i, alreadyProcessed, isReobservation)
if err != nil {
logger.Error("malformed wormhole shim instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level wormhole shim instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}
}

for _, inner := range txRpc.Meta.InnerInstructions {
for i, inst := range inner.Instructions {
} else {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner Wormhole instruction",
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
Expand All @@ -651,6 +672,54 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
}
}
}

for outerIdx, inner := range txRpc.Meta.InnerInstructions {
for innerIdx, inst := range inner.Instructions {
if !alreadyProcessed.exists(outerIdx, innerIdx) {
if shimFound && inst.ProgramIDIndex == shimProgramIndex {
found, err := s.shimProcessInnerInstruction(logger, programIndex, shimProgramIndex, tx, inner.Instructions, outerIdx, innerIdx, alreadyProcessed, isReobservation)
if err != nil {
logger.Error("malformed inner wormhole shim instruction",
zap.Error(err),
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner wormhole shim instruction",
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
} else {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, innerIdx, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner Wormhole instruction",
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}
}
}
}
}

if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
Expand Down Expand Up @@ -933,6 +1002,21 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
Unreliable: !reliable,
}

if !reliable && len(observation.Payload) == 0 {
logger.Debug("ignoring an observation because it is marked unreliable and has a zero length payload, probably from the shim",
zap.Stringer("account", acc),
zap.Time("timestamp", observation.Timestamp),
zap.Uint32("nonce", observation.Nonce),
zap.Uint64("sequence", observation.Sequence),
zap.Stringer("emitter_chain", observation.EmitterChain),
zap.Stringer("emitter_address", observation.EmitterAddress),
zap.Bool("isReobservation", isReobservation),
zap.Binary("payload", observation.Payload),
zap.Uint8("consistency_level", observation.ConsistencyLevel),
)
return
}

solanaMessagesConfirmed.WithLabelValues(s.networkName).Inc()

if logger.Level().Enabled(s.msgObservedLogLevel) {
Expand Down
12 changes: 11 additions & 1 deletion node/pkg/watchers/solana/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/gagliardetto/solana-go"
solana_types "github.com/gagliardetto/solana-go"
solana_rpc "github.com/gagliardetto/solana-go/rpc"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
Expand All @@ -19,6 +20,7 @@ type WatcherConfig struct {
Rpc string // RPC URL
Websocket string // Websocket URL
Contract string // hex representation of the contract address
ShimContract string // Address of the shim contract (empty string if disabled)
Commitment solana_rpc.CommitmentType
}

Expand Down Expand Up @@ -51,11 +53,19 @@ func (wc *WatcherConfig) Create(
return nil, nil, err
}

var shimContractAddr solana.PublicKey
if wc.ShimContract != "" {
shimContractAddr, err = solana_types.PublicKeyFromBase58(wc.Contract)
if err != nil {
return nil, nil, err
}
}

if !wc.ReceiveObsReq {
obsvReqC = nil
}

watcher := NewSolanaWatcher(wc.Rpc, &wc.Websocket, solAddress, wc.Contract, msgC, obsvReqC, wc.Commitment, wc.ChainID, queryReqC, queryResponseC)
watcher := NewSolanaWatcher(wc.Rpc, &wc.Websocket, solAddress, wc.Contract, msgC, obsvReqC, wc.Commitment, wc.ChainID, queryReqC, queryResponseC, wc.ShimContract, shimContractAddr)

return watcher, watcher.Run, nil
}
Loading

0 comments on commit f236c1a

Please sign in to comment.