Skip to content

Commit

Permalink
Node/Solana: Reobservation by transaction ID (#4223)
Browse files Browse the repository at this point in the history
* Node/Solana: Reobservation by transaction ID

* Code review rework
  • Loading branch information
bruce-riley authored Jan 16, 2025
1 parent 28223dc commit 8cf8650
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 67 deletions.
178 changes: 111 additions & 67 deletions node/pkg/watchers/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"encoding/base64"
"encoding/hex"
"encoding/json"

"github.com/certusone/wormhole/node/pkg/common"
Expand Down Expand Up @@ -125,6 +126,16 @@ type (
}
)

const (
// NOTE: We have a test to make sure these constants don't change in solana-go.

// SolanaAccountLen is the expected length of an account identifier, which is a public key. Using the number here because that's what the admin client will populate.
SolanaAccountLen = 32

// SolanaSignatureLen is the expected length of a signature. As of v1.12.0, solana-go does not have a const for this.
SolanaSignatureLen = 64
)

var (
emptyAddressBytes = vaa.Address{}.Bytes()
emptyGapBytes = []byte{0, 0, 0}
Expand Down Expand Up @@ -370,13 +381,39 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
if m.ChainId != uint32(s.chainID) {
panic("unexpected chain id")
}

acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request", zap.String("account", acc.String()))

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
if len(m.TxHash) == SolanaAccountLen { // Request by account ID
acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request with account id", zap.String("account", acc.String()))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
} else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID
signature := solana.SignatureFromBytes(m.TxHash)
logger.Info("received observation request with transaction id", zap.Stringer("signature", signature))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
version := uint64(0)
result, err := s.rpcClient.GetTransaction(
rCtx,
signature,
&rpc.GetTransactionOpts{
MaxSupportedTransactionVersion: &version,
Encoding: solana.EncodingBase64,
},
)
cancel()
if err != nil {
logger.Error("failed to get transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err))
} else {
tx, err := result.Transaction.GetTransaction()
if err != nil {
logger.Error("failed to extract transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err))
} else {
s.processTransaction(ctx, logger, tx, result.Meta, result.Slot, true)
}
}
} else {
logger.Error("ignoring an observation request of unexpected length", zap.Int("len", len(m.TxHash)), zap.String("bytes", hex.EncodeToString(m.TxHash)))
}
case <-timer.C:
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
Expand Down Expand Up @@ -580,87 +617,94 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
continue
}

err = s.populateLookupTableAccounts(ctx, tx)
if err != nil {
logger.Error("failed to fetch lookup table accounts",
zap.Uint64("slot", slot),
zap.Int("txNum", txNum),
zap.Error(err),
)
continue
}
s.processTransaction(ctx, logger, tx, txRpc.Meta, slot, false)
}

signature := tx.Signatures[0]
var programIndex uint16
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
}
}
if programIndex == 0 {
continue
if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

// processTransaction processes a transaction and publishes any Wormhole events.
func (s *SolanaWatcher) processTransaction(ctx context.Context, logger *zap.Logger, tx *solana.Transaction, meta *rpc.TransactionMeta, slot uint64, isReobservation bool) {
signature := tx.Signatures[0]
err := s.populateLookupTableAccounts(ctx, tx)
if err != nil {
logger.Error("failed to fetch lookup table accounts for transaction",
zap.Uint64("slot", slot),
zap.Stringer("signature", signature),
zap.Error(err),
)
return
}

var programIndex uint16
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
}
}
if programIndex == 0 {
return
}

if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
for outerIdx, inner := range meta.InnerInstructions {
for innerIdx, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, innerIdx, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
logger.Debug("found an inner Wormhole instruction",
zap.Int("outerIdx", outerIdx),
zap.Int("innerIdx", innerIdx),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

for _, inner := range txRpc.Meta.InnerInstructions {
for i, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}
}
}

if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) {
Expand Down
15 changes: 15 additions & 0 deletions node/pkg/watchers/solana/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package solana

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/gagliardetto/solana-go"
)

func TestVerifyConstants(t *testing.T) {
// If either of these ever change, message publication and reobservation will break.
assert.Equal(t, SolanaAccountLen, solana.PublicKeyLength)
assert.Equal(t, SolanaSignatureLen, len(solana.Signature{}))
}

0 comments on commit 8cf8650

Please sign in to comment.