diff --git a/node/hack/accountant/send_obs.go b/node/hack/accountant/send_obs.go index f64bdbf843..24b77d5de2 100644 --- a/node/hack/accountant/send_obs.go +++ b/node/hack/accountant/send_obs.go @@ -58,6 +58,7 @@ func main() { return } + // Don't increment the sequence number here. if !testSubmit(ctx, logger, guardianSigner, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, true, "Already commited should succeed") { return } @@ -82,6 +83,11 @@ func main() { return } + sequence += 10 + if !testBigBatch(ctx, logger, guardianSigner, wormchainConn, contract, "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16", timestamp, sequence, true, "Submit of big batch should succeed") { + return + } + logger.Info("Success! All tests passed!") } @@ -102,7 +108,7 @@ func testSubmit( Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") msg := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: timestamp, Nonce: uint32(0), Sequence: sequence, @@ -167,7 +173,7 @@ func testBatch( msgs := []*common.MessagePublication{} msg1 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: timestamp, Nonce: nonce, Sequence: sequence, @@ -181,7 +187,7 @@ func testBatch( nonce = nonce + 1 sequence = sequence + 1 msg2 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: time.Now(), Nonce: nonce, Sequence: sequence, @@ -246,7 +252,7 @@ func testBatchWithcommitted( logger.Info("submitting a single transfer that should work") msg1 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: timestamp, Nonce: nonce, Sequence: sequence, @@ -269,7 +275,7 @@ func testBatchWithcommitted( nonce = nonce + 1 sequence = sequence + 1 msg2 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: time.Now(), Nonce: nonce, Sequence: sequence, @@ -338,7 +344,7 @@ func testBatchWithDigestError( logger.Info("submitting a single transfer that should work") msg1 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: timestamp, Nonce: nonce, Sequence: sequence, @@ -361,7 +367,7 @@ func testBatchWithDigestError( nonce = nonce + 1 sequence = sequence + 1 msg2 := common.MessagePublication{ - TxHash: TxHash, + TxID: TxHash.Bytes(), Timestamp: time.Now(), Nonce: nonce, Sequence: sequence, @@ -440,3 +446,72 @@ func submit( return accountant.SubmitObservationsToContract(ctx, logger, guardianSigner, gsIndex, guardianIndex, wormchainConn, contract, accountant.SubmitObservationPrefix, msgs) } + +func testBigBatch( + ctx context.Context, + logger *zap.Logger, + guardianSigner guardiansigner.GuardianSigner, + wormchainConn *wormconn.ClientConn, + contract string, + emitterAddressStr string, + timestamp time.Time, + sequence uint64, + expectedResult bool, + tag string, +) bool { + EmitterAddress, _ := vaa.StringToAddress(emitterAddressStr) + TxHash := []byte("0123456789012345678901234567890123456789012345678901234567890123") // 64 bytes, the size of a Solana signature. 
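+	// This 64-byte value exercises the new variable-length TxID; the other tests in this file convert a 32-byte hash with TxHash.Bytes().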
+ Payload, _ := hex.DecodeString("010000000000000000000000000000000000000000000000000de0b6b3a76400000000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a0002000000000000000000000000c10820983f33456ce7beb3a046f5a83fa34f027d0c200000000000000000000000000000000000000000000000000000000000000000") + + msgs := []*common.MessagePublication{} + for idx := 0; idx < 10; idx++ { + msg := common.MessagePublication{ + TxID: TxHash, + Timestamp: timestamp, + Nonce: uint32(0), + Sequence: sequence, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: EmitterAddress, + ConsistencyLevel: uint8(15), + Payload: Payload, + } + + msgs = append(msgs, &msg) + sequence += 1 + } + + txResp, err := submit(ctx, logger, guardianSigner, wormchainConn, contract, msgs) + if err != nil { + logger.Error("failed to broadcast Observation request", zap.String("test", tag), zap.Error(err)) + return false + } + + responses, err := accountant.GetObservationResponses(txResp) + if err != nil { + logger.Error("failed to get responses", zap.Error(err)) + return false + } + + if len(responses) != len(msgs) { + logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) + return false + } + + msgId := msgs[0].MessageIDString() + status, exists := responses[msgId] + if !exists { + logger.Info("test failed: did not receive an observation response for message", zap.String("test", tag), zap.String("msgId", msgId)) + return false + } + + committed := status.Type == "committed" + + if committed != expectedResult { + logger.Info("test failed", zap.String("test", tag), zap.Uint64("seqNo", sequence), zap.Bool("committed", committed), + zap.String("response", wormchainConn.BroadcastTxResponseToString(txResp))) + return false + } + + logger.Info("test of big batch succeeded", zap.String("test", tag)) + return true +} diff --git a/node/hack/repair_terra/repair.go b/node/hack/repair_terra/repair.go index 224a596ad4..b9aa260bed 100644 --- a/node/hack/repair_terra/repair.go +++ b/node/hack/repair_terra/repair.go @@ -237,7 +237,7 @@ func EventsToMessagePublications(contract string, txHash string, events []gjson. continue } messagePublication := &common.MessagePublication{ - TxHash: txHashValue, + TxID: txHashValue.Bytes(), Timestamp: time.Unix(blockTimeInt, 0), Nonce: uint32(nonceInt), Sequence: sequenceInt, diff --git a/node/pkg/accountant/accountant_test.go b/node/pkg/accountant/accountant_test.go index aae4d50b71..265cf89747 100644 --- a/node/pkg/accountant/accountant_test.go +++ b/node/pkg/accountant/accountant_test.go @@ -137,13 +137,13 @@ func newAccountantForTest( return acct } -// Converts a string into a go-ethereum Hash object used as test input. -func hashFromString(str string) ethCommon.Hash { //nolint:unparam +// Converts a TxHash string into a byte array to be used as a TxID. +func hashToTxID(str string) []byte { if (len(str) > 2) && (str[0] == '0') && (str[1] == 'x') { str = str[2:] } - return ethCommon.HexToHash(str) + return ethCommon.HexToHash(str).Bytes() } // Note this method assumes 18 decimals for the amount. 
@@ -188,7 +188,7 @@ func TestVaaFromUninterestingEmitter(t *testing.T) { var payload = []byte{1, 97, 97, 97, 97, 97} msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -216,7 +216,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) { var payload = []byte{2, 97, 97, 97, 97, 97} msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -251,7 +251,7 @@ func TestInterestingTransferShouldNotBeBlockedWhenNotEnforcingAccountant(t *test ) msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -295,7 +295,7 @@ func TestInterestingTransferShouldBeBlockedWhenEnforcingAccountant(t *testing.T) ) msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -347,7 +347,7 @@ func TestForDeadlock(t *testing.T) { ) msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1683136244), @@ -374,7 +374,7 @@ func TestForDeadlock(t *testing.T) { assert.Equal(t, 1, len(acct.msgChan)) msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1683136244), diff --git a/node/pkg/accountant/audit.go b/node/pkg/accountant/audit.go index ea8c6c6f98..a179764230 100644 --- a/node/pkg/accountant/audit.go +++ b/node/pkg/accountant/audit.go @@ -108,7 +108,7 @@ func (mo *MissingObservation) makeAuditKey() string { // makeAuditKey creates an audit map key from a pending observation entry. func (pe *pendingEntry) makeAuditKey() string { - return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, strings.TrimPrefix(pe.msg.TxHash.String(), "0x")) + return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, strings.TrimPrefix(pe.msg.TxIDString(), "0x")) } // audit is the runnable that executes the audit each interval. 
diff --git a/node/pkg/accountant/ntt_test.go b/node/pkg/accountant/ntt_test.go index eeafab8278..8fd78db451 100644 --- a/node/pkg/accountant/ntt_test.go +++ b/node/pkg/accountant/ntt_test.go @@ -50,7 +50,7 @@ func TestNttParseMsgSuccess(t *testing.T) { } msg := &common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(42), Sequence: uint64(123456), @@ -77,7 +77,7 @@ func TestNttParseMsgWrongEmitterChain(t *testing.T) { } msg := &common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(42), Sequence: uint64(123456), @@ -106,7 +106,7 @@ func TestNttParseMsgWrongEmitterAddress(t *testing.T) { } msg := &common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(42), Sequence: uint64(123456), @@ -221,7 +221,7 @@ func TestNttParseArMsgSuccess(t *testing.T) { require.NoError(t, err) msg := &common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1708575745), 0), Nonce: uint32(0), Sequence: uint64(259), @@ -258,7 +258,7 @@ func TestNttParseArMsgUnknownArEmitter(t *testing.T) { require.NoError(t, err) msg := &common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1708575745), 0), Nonce: uint32(0), Sequence: uint64(259), diff --git a/node/pkg/accountant/submit_obs.go b/node/pkg/accountant/submit_obs.go index 513e14a210..622a5b904a 100644 --- a/node/pkg/accountant/submit_obs.go +++ b/node/pkg/accountant/submit_obs.go @@ -308,7 +308,7 @@ func SubmitObservationsToContract( obs := make([]Observation, len(msgs)) for idx, msg := range msgs { obs[idx] = Observation{ - TxHash: msg.TxHash.Bytes(), + TxHash: msg.TxID, Timestamp: uint32(msg.Timestamp.Unix()), Nonce: msg.Nonce, EmitterChain: uint16(msg.EmitterChain), @@ -321,7 +321,7 @@ func SubmitObservationsToContract( logger.Debug("in SubmitObservationsToContract, encoding observation", zap.String("contract", contract), zap.Int("idx", idx), - zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])), + zap.String("txHash", msg.TxIDString()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])), zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[idx].Timestamp), zap.Uint32("nonce", msg.Nonce), zap.Uint32("encNonce", obs[idx].Nonce), zap.Stringer("emitterChain", msg.EmitterChain), zap.Uint16("encEmitterChain", obs[idx].EmitterChain), diff --git a/node/pkg/accountant/watcher.go b/node/pkg/accountant/watcher.go index 52ccaf37ce..deace9deff 100644 --- a/node/pkg/accountant/watcher.go +++ b/node/pkg/accountant/watcher.go @@ -11,8 +11,6 @@ import ( "github.com/certusone/wormhole/node/pkg/common" 
"github.com/wormhole-foundation/wormhole/sdk/vaa" - ethCommon "github.com/ethereum/go-ethereum/common" - tmAbci "github.com/tendermint/tendermint/abci/types" tmHttp "github.com/tendermint/tendermint/rpc/client/http" tmCoreTypes "github.com/tendermint/tendermint/rpc/core/types" @@ -181,7 +179,7 @@ func (acct *Accountant) processPendingTransfer(xfer *WasmObservation, tag string ) msg := &common.MessagePublication{ - TxHash: ethCommon.BytesToHash(xfer.TxHash), + TxID: xfer.TxHash, Timestamp: time.Unix(int64(xfer.Timestamp), 0), Nonce: xfer.Nonce, Sequence: xfer.Sequence, diff --git a/node/pkg/adminrpc/adminserver.go b/node/pkg/adminrpc/adminserver.go index 2f345b9807..6f08776738 100644 --- a/node/pkg/adminrpc/adminserver.go +++ b/node/pkg/adminrpc/adminserver.go @@ -798,7 +798,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no vaaInjectionsTotal.Inc() s.injectC <- &common.MessagePublication{ - TxHash: ethcommon.Hash{}, + TxID: ethcommon.Hash{}.Bytes(), Timestamp: v.Timestamp, Nonce: v.Nonce, Sequence: v.Sequence, diff --git a/node/pkg/common/chainlock.go b/node/pkg/common/chainlock.go index 052d1bbd5b..54b0749189 100644 --- a/node/pkg/common/chainlock.go +++ b/node/pkg/common/chainlock.go @@ -7,19 +7,19 @@ import ( "encoding/json" "errors" "fmt" + "math" "time" + "github.com/ethereum/go-ethereum/common" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" - - "github.com/ethereum/go-ethereum/common" ) const HashLength = 32 const AddressLength = 32 type MessagePublication struct { - TxHash common.Hash // TODO: rename to identifier? on Solana, this isn't actually the tx hash + TxID []byte Timestamp time.Time Nonce uint32 @@ -35,6 +35,10 @@ type MessagePublication struct { Unreliable bool } +func (msg *MessagePublication) TxIDString() string { + return "0x" + hex.EncodeToString(msg.TxID) +} + func (msg *MessagePublication) MessageID() []byte { return []byte(msg.MessageIDString()) } @@ -48,7 +52,12 @@ const minMsgLength = 88 // Marshalled length with empty payload func (msg *MessagePublication) Marshal() ([]byte, error) { buf := new(bytes.Buffer) - buf.Write(msg.TxHash[:]) + if len(msg.TxID) > math.MaxUint8 { + return nil, errors.New("TxID too long") + } + vaa.MustWrite(buf, binary.BigEndian, uint8(len(msg.TxID))) + buf.Write(msg.TxID) + vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix())) vaa.MustWrite(buf, binary.BigEndian, msg.Nonce) vaa.MustWrite(buf, binary.BigEndian, msg.Sequence) @@ -61,12 +70,10 @@ func (msg *MessagePublication) Marshal() ([]byte, error) { return buf.Bytes(), nil } -const oldMinMsgLength = 83 // Old marshalled length with empty payload - -// UnmarshalOldMessagePublicationBeforeIsReobservation deserializes a MessagePublication from prior to the addition of IsReobservation. +// UnmarshalOldMessagePublicationWithTxHash deserializes a MessagePublication from when the TxHash was a fixed size ethCommon.Hash. // This function can be deleted once all guardians have been upgraded. That's why the code is just duplicated. 
-func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessagePublication, error) { - if len(data) < oldMinMsgLength { +func UnmarshalOldMessagePublicationWithTxHash(data []byte) (*MessagePublication, error) { + if len(data) < minMsgLength { return nil, errors.New("message is too short") } @@ -78,7 +85,7 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP if n, err := reader.Read(txHash[:]); err != nil || n != HashLength { return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err) } - msg.TxHash = txHash + msg.TxID = txHash.Bytes() unixSeconds := uint32(0) if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil { @@ -108,6 +115,10 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP } msg.EmitterAddress = emitterAddress + if err := binary.Read(reader, binary.BigEndian, &msg.IsReobservation); err != nil { + return nil, fmt.Errorf("failed to read isReobservation: %w", err) + } + payload := make([]byte, reader.Len()) n, err := reader.Read(payload) if err != nil || n == 0 { @@ -121,18 +132,22 @@ func UnmarshalOldMessagePublicationBeforeIsReobservation(data []byte) (*MessageP // UnmarshalMessagePublication deserializes a MessagePublication func UnmarshalMessagePublication(data []byte) (*MessagePublication, error) { if len(data) < minMsgLength { - return nil, fmt.Errorf("message is too short") + return nil, errors.New("message is too short") } msg := &MessagePublication{} reader := bytes.NewReader(data[:]) - txHash := common.Hash{} - if n, err := reader.Read(txHash[:]); err != nil || n != HashLength { - return nil, fmt.Errorf("failed to read TxHash [%d]: %w", n, err) + txIdLen := uint8(0) + if err := binary.Read(reader, binary.BigEndian, &txIdLen); err != nil { + return nil, fmt.Errorf("failed to read TxID len: %w", err) + } + + msg.TxID = make([]byte, txIdLen) + if n, err := reader.Read(msg.TxID[:]); err != nil || n != int(txIdLen) { + return nil, fmt.Errorf("failed to read TxID [%d]: %w", n, err) } - msg.TxHash = txHash unixSeconds := uint32(0) if err := binary.Read(reader, binary.BigEndian, &unixSeconds); err != nil { @@ -229,7 +244,7 @@ func (msg *MessagePublication) CreateDigest() string { // TODO refactor the codebase to use this function instead of manually logging the message with inconsistent fields func (msg *MessagePublication) ZapFields(fields ...zap.Field) []zap.Field { return append(fields, - zap.Stringer("tx", msg.TxHash), + zap.String("tx", msg.TxIDString()), zap.Time("timestamp", msg.Timestamp), zap.Uint32("nonce", msg.Nonce), zap.Uint8("consistency", msg.ConsistencyLevel), diff --git a/node/pkg/common/chainlock_test.go b/node/pkg/common/chainlock_test.go index 2ec27793f4..add2a03a03 100644 --- a/node/pkg/common/chainlock_test.go +++ b/node/pkg/common/chainlock_test.go @@ -51,7 +51,7 @@ func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) { payloadBytes1 := encodePayloadBytes(payload1) msg1 := &MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -74,6 +74,118 @@ func TestSerializeAndDeserializeOfMessagePublication(t *testing.T) { assert.Equal(t, payload1, payload2) } +func TestSerializeAndDeserializeOfMessagePublicationWithEmptyTxID(t *testing.T) { + originAddress, err := 
vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec + require.NoError(t, err) + + targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + payload1 := &vaa.TransferPayloadHdr{ + Type: 0x01, + Amount: big.NewInt(27000000000), + OriginAddress: originAddress, + OriginChain: vaa.ChainIDEthereum, + TargetAddress: targetAddress, + TargetChain: vaa.ChainIDPolygon, + } + + payloadBytes1 := encodePayloadBytes(payload1) + + msg1 := &MessagePublication{ + TxID: []byte{}, + Timestamp: time.Unix(int64(1654516425), 0), + Nonce: 123456, + Sequence: 789101112131415, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddress, + Payload: payloadBytes1, + ConsistencyLevel: 32, + } + + bytes, err := msg1.Marshal() + require.NoError(t, err) + + msg2, err := UnmarshalMessagePublication(bytes) + require.NoError(t, err) + assert.Equal(t, msg1, msg2) + + payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload) + require.NoError(t, err) + + assert.Equal(t, payload1, payload2) +} + +func TestSerializeAndDeserializeOfMessagePublicationWithArbitraryTxID(t *testing.T) { + originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec + require.NoError(t, err) + + targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + payload1 := &vaa.TransferPayloadHdr{ + Type: 0x01, + Amount: big.NewInt(27000000000), + OriginAddress: originAddress, + OriginChain: vaa.ChainIDEthereum, + TargetAddress: targetAddress, + TargetChain: vaa.ChainIDPolygon, + } + + payloadBytes1 := encodePayloadBytes(payload1) + + msg1 := &MessagePublication{ + TxID: []byte("This is some arbitrary string with just some random junk in it. This is to prove that the TxID does not have to be a ethCommon.Hash"), + Timestamp: time.Unix(int64(1654516425), 0), + Nonce: 123456, + Sequence: 789101112131415, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddress, + Payload: payloadBytes1, + ConsistencyLevel: 32, + } + + bytes, err := msg1.Marshal() + require.NoError(t, err) + + msg2, err := UnmarshalMessagePublication(bytes) + require.NoError(t, err) + assert.Equal(t, msg1, msg2) + + payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload) + require.NoError(t, err) + + assert.Equal(t, payload1, payload2) +} + +func TestTxIDStringTooLongShouldFail(t *testing.T) { + tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + // This is limited to 255. Make it 256 and the marshal should fail. 
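+	// Marshal length-prefixes the TxID with a single byte, so anything longer than 255 bytes is rejected.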
+ txID := []byte("0123456789012345678901234567890123456789012345678901234567890123012345678901234567890123456789012345678901234567890123456789012301234567890123456789012345678901234567890123456789012345678901230123456789012345678901234567890123456789012345678901234567890123") + + msg := &MessagePublication{ + TxID: txID, + Timestamp: time.Unix(int64(1654516425), 0), + Nonce: 123456, + Sequence: 789101112131415, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddress, + Payload: []byte("Hello, World!"), + ConsistencyLevel: 32, + } + + _, err = msg.Marshal() + assert.ErrorContains(t, err, "TxID too long") +} + func TestSerializeAndDeserializeOfMessagePublicationWithBigPayload(t *testing.T) { tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") require.NoError(t, err) @@ -86,7 +198,7 @@ func TestSerializeAndDeserializeOfMessagePublicationWithBigPayload(t *testing.T) } msg1 := &MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -127,7 +239,53 @@ func TestMarshalUnmarshalJSONOfMessagePublication(t *testing.T) { payloadBytes1 := encodePayloadBytes(payload1) msg1 := &MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), + Timestamp: time.Unix(int64(1654516425), 0), + Nonce: 123456, + Sequence: 789101112131415, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddress, + Payload: payloadBytes1, + ConsistencyLevel: 32, + } + + bytes, err := msg1.MarshalJSON() + require.NoError(t, err) + + var msg2 MessagePublication + err = msg2.UnmarshalJSON(bytes) + require.NoError(t, err) + assert.Equal(t, *msg1, msg2) + + payload2, err := vaa.DecodeTransferPayloadHdr(msg2.Payload) + require.NoError(t, err) + + assert.Equal(t, *payload1, *payload2) +} + +func TestMarshalUnmarshalJSONOfMessagePublicationWithArbitraryTxID(t *testing.T) { + originAddress, err := vaa.StringToAddress("0xDDb64fE46a91D46ee29420539FC25FD07c5FEa3E") //nolint:gosec + require.NoError(t, err) + + targetAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + payload1 := &vaa.TransferPayloadHdr{ + Type: 0x01, + Amount: big.NewInt(27000000000), + OriginAddress: originAddress, + OriginChain: vaa.ChainIDEthereum, + TargetAddress: targetAddress, + TargetChain: vaa.ChainIDPolygon, + } + + payloadBytes1 := encodePayloadBytes(payload1) + + msg1 := &MessagePublication{ + TxID: []byte("This is some arbitrary string with just some random junk in it. 
This is to prove that the TxID does not have to be a ethCommon.Hash"), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -220,3 +378,23 @@ func TestMessageID(t *testing.T) { }) } } + +func TestTxIDStringMatchesHashToString(t *testing.T) { + tokenBridgeAddress, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") + require.NoError(t, err) + + expectedHashID := "0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063" + + msg := &MessagePublication{ + TxID: eth_common.HexToHash(expectedHashID).Bytes(), + Timestamp: time.Unix(int64(1654516425), 0), + Nonce: 123456, + Sequence: 789101112131415, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddress, + Payload: []byte("Hello, World!"), + ConsistencyLevel: 32, + } + + assert.Equal(t, expectedHashID, msg.TxIDString()) +} diff --git a/node/pkg/db/accountant.go b/node/pkg/db/accountant.go index 2883ec4d59..3e2877fdaa 100644 --- a/node/pkg/db/accountant.go +++ b/node/pkg/db/accountant.go @@ -3,10 +3,13 @@ package db import ( "encoding/json" "fmt" + "time" "github.com/certusone/wormhole/node/pkg/common" "github.com/dgraph-io/badger/v3" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + ethCommon "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" ) @@ -31,10 +34,10 @@ func (d *MockAccountantDB) AcctGetData(logger *zap.Logger) ([]*common.MessagePub return nil, nil } -const acctOldPendingTransfer = "ACCT:PXFER:" +const acctOldPendingTransfer = "ACCT:PXFER2:" const acctOldPendingTransferLen = len(acctOldPendingTransfer) -const acctPendingTransfer = "ACCT:PXFER2:" +const acctPendingTransfer = "ACCT:PXFER3:" const acctPendingTransferLen = len(acctPendingTransfer) const acctMinMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0") @@ -59,6 +62,11 @@ func acctIsPendingTransfer(keyBytes []byte) bool { func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication, error) { pendingTransfers := []*common.MessagePublication{} var err error + + if err = d.convertOldTransfersToNewFormat(logger); err != nil { + return pendingTransfers, fmt.Errorf("failed to convert old pending transfers to the new format: %w", err) + } + { prefixBytes := []byte(acctPendingTransfer) err = d.db.View(func(txn *badger.Txn) error { @@ -84,7 +92,7 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication pendingTransfers = append(pendingTransfers, &pt) } else { - return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key)) + return fmt.Errorf("failed to load accountant pending transfer, unexpected key '%s'", string(key)) } } @@ -92,53 +100,6 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication }) } - // Any pending transfers in the old format are long since obsolete. Just delete them. 
- if err == nil { - oldPendingTransfers := []string{} - prefixBytes := []byte(acctOldPendingTransfer) - err = d.db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 10 - it := txn.NewIterator(opts) - defer it.Close() - for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() { - item := it.Item() - key := item.Key() - val, err := item.ValueCopy(nil) - if err != nil { - return err - } - - if acctIsOldPendingTransfer(key) { - pt, err := common.UnmarshalOldMessagePublicationBeforeIsReobservation(val) - if err != nil { - logger.Error("failed to unmarshal old pending transfer for key", zap.String("key", string(key[:])), zap.Error(err)) - continue - } - - oldPendingTransfers = append(oldPendingTransfers, pt.MessageIDString()) - } else { - return fmt.Errorf("unexpected accountant pending transfer key '%s'", string(key)) - } - } - - return nil - }) - - if err == nil && len(oldPendingTransfers) != 0 { - for _, pt := range oldPendingTransfers { - key := acctOldPendingTransferMsgID(pt) - logger.Info("deleting obsolete pending transfer", zap.String("msgId", pt), zap.String("key", string(key))) - if err := d.db.Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }); err != nil { - return pendingTransfers, fmt.Errorf("failed to delete old pending msg for key [%v]: %w", pt, err) - } - } - } - } - return pendingTransfers, err } @@ -170,3 +131,113 @@ func (d *Database) AcctDeletePendingTransfer(msgId string) error { return nil } + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// The code below here is used to read and convert old Pending transfers. Once the db has been migrated away from those, this can be deleted. +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// OldMessagePublication is used to unmarshal old JSON which has the TxHash rather than the TxID. +type OldMessagePublication struct { + TxHash ethCommon.Hash + Timestamp time.Time + + Nonce uint32 + Sequence uint64 + ConsistencyLevel uint8 + EmitterChain vaa.ChainID + EmitterAddress vaa.Address + Payload []byte + IsReobservation bool + Unreliable bool +} + +func (msg *OldMessagePublication) UnmarshalJSON(data []byte) error { + type Alias OldMessagePublication + aux := &struct { + Timestamp int64 + *Alias + }{ + Alias: (*Alias)(msg), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + msg.Timestamp = time.Unix(aux.Timestamp, 0) + return nil +} + +// convertOldToNew converts an OldMessagePublication to a MessagePublication. +func convertOldToNew(old *OldMessagePublication) *common.MessagePublication { + return &common.MessagePublication{ + TxID: old.TxHash.Bytes(), + Timestamp: old.Timestamp, + Nonce: old.Nonce, + Sequence: old.Sequence, + EmitterChain: old.EmitterChain, + EmitterAddress: old.EmitterAddress, + Payload: old.Payload, + ConsistencyLevel: old.ConsistencyLevel, + IsReobservation: old.IsReobservation, + Unreliable: old.Unreliable, + } +} + +// convertOldTransfersToNewFormat loads any pending transfers in the old format, writes them in the new format and deletes the old ones. 
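+// It is called from AcctGetData before the new-format entries are read, so the migration runs once and later loads find nothing to convert.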
+func (d *Database) convertOldTransfersToNewFormat(logger *zap.Logger) error { + pendingTransfers := []*common.MessagePublication{} + prefixBytes := []byte(acctOldPendingTransfer) + err := d.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + it := txn.NewIterator(opts) + defer it.Close() + for it.Seek(prefixBytes); it.ValidForPrefix(prefixBytes); it.Next() { + item := it.Item() + key := item.Key() + val, err := item.ValueCopy(nil) + if err != nil { + return err + } + + if acctIsOldPendingTransfer(key) { + var pt OldMessagePublication + err := json.Unmarshal(val, &pt) + if err != nil { + return fmt.Errorf("failed to unmarshal old pending transfer for key '%s': %w", string(key), err) + } + + pendingTransfers = append(pendingTransfers, convertOldToNew(&pt)) + } else { + return fmt.Errorf("failed to convert old accountant pending transfer, unexpected key '%s'", string(key)) + } + } + + return nil + }) + + if err != nil { + return err + } + + if len(pendingTransfers) != 0 { + for _, pt := range pendingTransfers { + logger.Info("converting old pending transfer to new format", zap.String("msgId", pt.MessageIDString())) + if err := d.AcctStorePendingTransfer(pt); err != nil { + return fmt.Errorf("failed to convert old pending transfer for key [%v]: %w", pt, err) + } + } + + for _, pt := range pendingTransfers { + key := acctOldPendingTransferMsgID(pt.MessageIDString()) + logger.Info("deleting old pending transfer", zap.String("msgId", pt.MessageIDString()), zap.String("key", string(key))) + if err := d.db.Update(func(txn *badger.Txn) error { + err := txn.Delete(key) + return err + }); err != nil { + return fmt.Errorf("failed to delete old pending transfer for key [%v]: %w", pt, err) + } + } + } + + return nil +} diff --git a/node/pkg/db/accountant_test.go b/node/pkg/db/accountant_test.go index c79728bc1b..9e5f387c06 100644 --- a/node/pkg/db/accountant_test.go +++ b/node/pkg/db/accountant_test.go @@ -1,9 +1,10 @@ package db import ( - "bytes" - "encoding/binary" + "encoding/json" + "fmt" "os" + "sort" "testing" "time" @@ -16,6 +17,9 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" ) func TestAcctPendingTransferMsgID(t *testing.T) { @@ -23,7 +27,7 @@ func TestAcctPendingTransferMsgID(t *testing.T) { require.NoError(t, err) msg1 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -33,22 +37,22 @@ func TestAcctPendingTransferMsgID(t *testing.T) { ConsistencyLevel: 16, } - assert.Equal(t, []byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString())) - assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctPendingTransferMsgID(msg1.MessageIDString())) + assert.Equal(t, []byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), acctOldPendingTransferMsgID(msg1.MessageIDString())) + assert.Equal(t, []byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), 
acctPendingTransferMsgID(msg1.MessageIDString())) } func TestAcctIsPendingTransfer(t *testing.T) { - assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:1/1/1"))) - assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) - assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER2:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) + assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:1"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:1/1/1"))) + assert.Equal(t, false, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) + assert.Equal(t, true, acctIsPendingTransfer([]byte("ACCT:PXFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) assert.Equal(t, false, acctIsPendingTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, acctIsPendingTransfer([]byte{0x01, 0x02, 0x03, 0x04})) assert.Equal(t, false, acctIsPendingTransfer([]byte{})) - assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, true, acctIsOldPendingTransfer([]byte("ACCT:PXFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, acctIsOldPendingTransfer([]byte("ACCT:PXFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) } func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { @@ -60,7 +64,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { require.NoError(t, err) msg1 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -71,7 +75,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { } msg2 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123457, Sequence: 789101112131416, @@ -99,7 +103,7 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { // Delete something that doesn't exist. 
msg3 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123457, Sequence: 789101112131417, @@ -146,7 +150,7 @@ func TestAcctGetData(t *testing.T) { require.NoError(t, err) msg1 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -157,7 +161,7 @@ func TestAcctGetData(t *testing.T) { } msg2 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123457, Sequence: 789101112131416, @@ -189,13 +193,13 @@ func TestAcctGetData(t *testing.T) { assert.Equal(t, *msg2, *pendings[1]) } -func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) { +func TestAcctLoadingWhereOldPendingGetsUpdated(t *testing.T) { dbPath := t.TempDir() db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) - logger := zap.NewNop() + logger, zapObserver := setupLogsCapture(t) tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") require.NoError(t, err) @@ -203,7 +207,7 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) { now := time.Unix(time.Now().Unix(), 0) // Write the first pending event in the old format. - pending1 := &common.MessagePublication{ + pending1 := &OldMessagePublication{ TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: now, Nonce: 123456, @@ -215,14 +219,14 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) { // IsReobservation will not be serialized. It should be set to false on reload. } - db.acctStoreOldPendingTransfer(t, pending1) + err = db.acctStoreOldPendingTransfer(pending1) require.Nil(t, err) now2 := now.Add(time.Second * 5) // Write the second one in the new format. pending2 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: now2, Nonce: 123456, Sequence: 789101112131418, @@ -236,35 +240,111 @@ func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) { err = db.AcctStorePendingTransfer(pending2) require.Nil(t, err) - // When we reload the data, the first one should get dropped, so we should get back only one. + // When we reload the data, the first one should get converted and returned here. pendings, err := db.AcctGetData(logger) require.NoError(t, err) - require.Equal(t, 1, len(pendings)) + require.Equal(t, 2, len(pendings)) + + // Verify that we converted and deleted the old one. 
+ loggedEntries := zapObserver.FilterMessage("converting old pending transfer to new format").All() + require.Equal(t, 1, len(loggedEntries)) + loggedEntries = zapObserver.FilterMessage("deleting old pending transfer").All() + require.Equal(t, 1, len(loggedEntries)) - assert.Equal(t, *pending2, *pendings[0]) + sort.SliceStable(pendings, func(i, j int) bool { + return pendings[i].Timestamp.Before(pendings[j].Timestamp) + }) + + assert.Equal(t, *convertOldToNew(pending1), *pendings[0]) + assert.Equal(t, *pending2, *pendings[1]) - // Make sure we can still reload things after deleting the old one. + // Make sure we can still reload things after updating the old one. + logger, zapObserver = setupLogsCapture(t) pendings2, err := db.AcctGetData(logger) require.Nil(t, err) - require.Equal(t, 1, len(pendings2)) + require.Equal(t, 2, len(pendings2)) - assert.Equal(t, pending2, pendings2[0]) -} + // Verify that we didn't do any conversions the second time. + loggedEntries = zapObserver.FilterMessage("converting old pending transfer to new format").All() + require.Equal(t, 0, len(loggedEntries)) + loggedEntries = zapObserver.FilterMessage("deleting old pending transfer").All() + require.Equal(t, 0, len(loggedEntries)) -func (d *Database) acctStoreOldPendingTransfer(t *testing.T, msg *common.MessagePublication) { - buf := new(bytes.Buffer) + assert.Equal(t, *convertOldToNew(pending1), *pendings[0]) + assert.Equal(t, *pending2, *pendings[1]) - b := marshalOldMessagePublication(msg) + sort.SliceStable(pendings, func(i, j int) bool { + return pendings[i].Timestamp.Before(pendings[j].Timestamp) + }) - vaa.MustWrite(buf, binary.BigEndian, b) + assert.Equal(t, *convertOldToNew(pending1), *pendings[0]) + assert.Equal(t, *pending2, *pendings[1]) +} + +// setupLogsCapture is a helper function for making a zap logger/observer combination for testing that certain logs have been made +func setupLogsCapture(t testing.TB) (*zap.Logger, *observer.ObservedLogs) { + t.Helper() + observedCore, observedLogs := observer.New(zap.InfoLevel) + consoleLogger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + parentLogger := zap.New(zapcore.NewTee(observedCore, consoleLogger.Core())) + return parentLogger, observedLogs +} + +func (d *Database) acctStoreOldPendingTransfer(msg *OldMessagePublication) error { + b, _ := json.Marshal(msg) err := d.db.Update(func(txn *badger.Txn) error { - if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), buf.Bytes()); err != nil { + if err := txn.Set(acctOldPendingTransferMsgID(msg.MessageIDString()), b); err != nil { return err } return nil }) + if err != nil { + return fmt.Errorf("failed to commit old accountant pending transfer for tx %s: %w", msg.MessageIDString(), err) + } + + return nil +} + +// The standard json Marshal / Unmarshal of time.Time gets confused between local and UTC time. 
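+// Marshalling the timestamp as Unix seconds (matching OldMessagePublication.UnmarshalJSON in accountant.go) keeps the round trip exact.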
+func (msg *OldMessagePublication) MarshalJSON() ([]byte, error) { + type Alias OldMessagePublication + return json.Marshal(&struct { + Timestamp int64 + *Alias + }{ + Timestamp: msg.Timestamp.Unix(), + Alias: (*Alias)(msg), + }) +} + +func (msg *OldMessagePublication) MessageIDString() string { + return fmt.Sprintf("%v/%v/%v", uint16(msg.EmitterChain), msg.EmitterAddress, msg.Sequence) +} + +func TestUnmarshalOldJSON(t *testing.T) { + jsn := ` + { + "TxID": "SGVsbG8=", + "Timestamp": 1654516425, + "TxHash": "0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063", + "Nonce": 123456, + "Sequence": 789101112131415, + "ConsistencyLevel": 32, + "EmitterChain": 2, + "EmitterAddress": "000000000000000000000000707f9118e33a9b8998bea41dd0d46f38bb963fc8", + "Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZJU04AAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAHB/kRjjOpuJmL6kHdDUbzi7lj/IAAU=", + "IsReobservation": false, + "Unreliable": false + } + ` + + var oldMsg OldMessagePublication + err := json.Unmarshal([]byte(jsn), &oldMsg) require.NoError(t, err) + + newMsg := convertOldToNew(&oldMsg) + assert.Equal(t, oldMsg.TxHash.String(), newMsg.TxIDString()) } diff --git a/node/pkg/db/governor.go b/node/pkg/db/governor.go index 48bcc9d0e5..08419c2f34 100644 --- a/node/pkg/db/governor.go +++ b/node/pkg/db/governor.go @@ -268,7 +268,7 @@ func UnmarshalPendingTransfer(data []byte, isOld bool) (*PendingTransfer, error) var msg *common.MessagePublication if isOld { - msg, err = common.UnmarshalOldMessagePublicationBeforeIsReobservation(buf) + msg, err = common.UnmarshalOldMessagePublicationWithTxHash(buf) } else { msg, err = common.UnmarshalMessagePublication(buf) } @@ -287,13 +287,13 @@ const transfer = "GOV:XFER3:" const transferLen = len(transfer) // Since we are changing the DB format of pending entries, we will use a new tag in the pending key field. -// The first time we run this new release, any existing entries with the "GOV:PENDING2" tag will get converted -// to the new format and given the "GOV:PENDING3" format. In a future release, the "GOV:PENDING2" code can be deleted. +// The first time we run this new release, any existing entries with the old tag will get converted +// to the new format and the new tag. In a future release, code for the old format can be deleted. 
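+// The accountant pending-transfer store follows the same pattern (ACCT:PXFER2 entries are converted to ACCT:PXFER3).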
-const oldPending = "GOV:PENDING2:" +const oldPending = "GOV:PENDING3:" const oldPendingLen = len(oldPending) -const pending = "GOV:PENDING3:" +const pending = "GOV:PENDING4:" const pendingLen = len(pending) const minMsgIdLen = len("1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0") diff --git a/node/pkg/db/governor_test.go b/node/pkg/db/governor_test.go index 0b0b2d17b0..eca60efdff 100644 --- a/node/pkg/db/governor_test.go +++ b/node/pkg/db/governor_test.go @@ -65,7 +65,7 @@ func TestPendingMsgID(t *testing.T) { require.NoError(t, err) msg1 := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -75,7 +75,7 @@ func TestPendingMsgID(t *testing.T) { ConsistencyLevel: 16, } - assert.Equal(t, []byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1)) + assert.Equal(t, []byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1)) } func TestTransferMsgID(t *testing.T) { @@ -120,18 +120,18 @@ func TestIsTransfer(t *testing.T) { } func TestIsPendingMsg(t *testing.T) { - assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/1/1"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) - assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) - assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/1/1"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/"))) + assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0"))) + assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04})) assert.Equal(t, false, IsPendingMsg([]byte{})) - assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) - assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, true, 
isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) + assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"))) } func TestGetChainGovernorData(t *testing.T) { @@ -228,7 +228,7 @@ func TestStorePendingMsg(t *testing.T) { assert.NoError(t, err2) msg := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -253,7 +253,7 @@ func TestDeletePendingMsg(t *testing.T) { assert.NoError(t, err2) msg := &common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -283,7 +283,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { require.NoError(t, err) msg := common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -307,7 +307,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { assert.Equal(t, pending1, pending2) - expectedPendingKey := "GOV:PENDING3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" + expectedPendingKey := "GOV:PENDING4:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415" assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg))) } @@ -361,7 +361,7 @@ func TestStoreAndReloadTransfers(t *testing.T) { pending1 := &PendingTransfer{ ReleaseTime: time.Unix(int64(1654516435+72*60*60), 0), Msg: common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516435), 0), Nonce: 123456, Sequence: 789101112131417, @@ -378,7 +378,7 @@ func TestStoreAndReloadTransfers(t *testing.T) { pending2 := &PendingTransfer{ ReleaseTime: time.Unix(int64(1654516440+72*60*60), 0), Msg: common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516440), 0), Nonce: 123456, Sequence: 789101112131418, @@ -524,7 +524,7 @@ func TestUnmarshalPendingTransferFailures(t *testing.T) { require.NoError(t, err) msg := common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: time.Unix(int64(1654516425), 0), Nonce: 123456, Sequence: 789101112131415, @@ -585,13 +585,14 @@ func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) { func 
marshalOldMessagePublication(msg *common.MessagePublication) []byte { buf := new(bytes.Buffer) - buf.Write(msg.TxHash[:]) + buf.Write(msg.TxID[:]) vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix())) vaa.MustWrite(buf, binary.BigEndian, msg.Nonce) vaa.MustWrite(buf, binary.BigEndian, msg.Sequence) vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel) vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain) buf.Write(msg.EmitterAddress[:]) + vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation) buf.Write(msg.Payload) return buf.Bytes() @@ -674,13 +675,12 @@ func TestLoadingOldPendingTransfers(t *testing.T) { err = db.StoreTransfer(newXfer2) require.NoError(t, err) - now := time.Unix(time.Now().Unix(), 0) - // Write the first pending event in the old format. + now := time.Unix(time.Now().Unix(), 0) pending1 := &PendingTransfer{ ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default., Msg: common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), Timestamp: now, Nonce: 123456, Sequence: 789101112131417, @@ -695,14 +695,13 @@ func TestLoadingOldPendingTransfers(t *testing.T) { db.storeOldPendingMsg(t, pending1) require.NoError(t, err) - now2 := now.Add(time.Second * 5) - // Write the second one in the new format. + now = now.Add(time.Second * 5) pending2 := &PendingTransfer{ - ReleaseTime: now2.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default. + ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default. Msg: common.MessagePublication{ - TxHash: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), - Timestamp: now2, + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), + Timestamp: now, Nonce: 123456, Sequence: 789101112131418, EmitterChain: vaa.ChainIDEthereum, @@ -716,12 +715,39 @@ func TestLoadingOldPendingTransfers(t *testing.T) { err = db.StorePendingMsg(pending2) require.NoError(t, err) - logger := zap.NewNop() + // Write the third pending event in the old format. + now = now.Add(time.Second * 5) + pending3 := &PendingTransfer{ + ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default., + Msg: common.MessagePublication{ + TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(), + Timestamp: now, + Nonce: 123456, + Sequence: 789101112131419, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: ethereumTokenBridgeAddr, + Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + ConsistencyLevel: 16, + // IsReobservation will not be serialized. It should be set to false on reload. + }, + } + + db.storeOldPendingMsg(t, pending3) + require.NoError(t, err) + + logger, zapObserver := setupLogsCapture(t) + xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now) require.NoError(t, err) require.Equal(t, 4, len(xfers)) - require.Equal(t, 2, len(pendings)) + require.Equal(t, 3, len(pendings)) + + // Verify that we converted the two old pending transfers and the two old completed transfers. 
+ loggedEntries := zapObserver.FilterMessage("updating format of database entry for pending vaa").All() + require.Equal(t, 2, len(loggedEntries)) + loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All() + require.Equal(t, 2, len(loggedEntries)) sort.SliceStable(xfers, func(i, j int) bool { return xfers[i].Timestamp.Before(xfers[j].Timestamp) @@ -739,14 +765,23 @@ func TestLoadingOldPendingTransfers(t *testing.T) { assert.Equal(t, pending1.Msg, pendings[0].Msg) assert.Equal(t, pending2.Msg, pendings[1].Msg) + assert.Equal(t, pending3.Msg, pendings[2].Msg) // Make sure we can reload the updated pendings. + logger, zapObserver = setupLogsCapture(t) + xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now) require.NoError(t, err) require.Equal(t, 4, len(xfers2)) - require.Equal(t, 2, len(pendings2)) + require.Equal(t, 3, len(pendings2)) + + // This time we shouldn't have updated anything. + loggedEntries = zapObserver.FilterMessage("updating format of database entry for pending vaa").All() + require.Equal(t, 0, len(loggedEntries)) + loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All() + require.Equal(t, 0, len(loggedEntries)) sort.SliceStable(xfers2, func(i, j int) bool { return xfers2[i].Timestamp.Before(xfers2[j].Timestamp) diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 8f6137b1c8..28313cfcfb 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -464,7 +464,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Info("ignoring duplicate vaa because it is enqueued", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) return false, nil } @@ -472,7 +472,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Info("allowing duplicate vaa to be published again, but not adding it to the notional value", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) return true, nil } @@ -484,7 +484,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Error("Error when attempting to trim and sum transfers", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Error(err), ) return false, err @@ -496,7 +496,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Error("failed to compute value of transfer", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Error(err), ) return false, err @@ -507,7 +507,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Error("total value has overflowed", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), ) @@ -527,7 +527,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Stringer("releaseTime", releaseTime), zap.Uint64("bigTransactionSize", emitterChainEntry.bigTransactionSize), 
zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) } else if newTotalValue > emitterChainEntry.dailyLimit { enqueueIt = true @@ -539,7 +539,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Stringer("releaseTime", releaseTime), zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) } @@ -550,7 +550,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Error("failed to store pending vaa", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Error(err), ) return false, err @@ -570,7 +570,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now zap.Uint64("newTotalValue", newTotalValue), zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) dbTransfer := db.Transfer{ @@ -591,7 +591,7 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now gov.logger.Error("failed to store transfer", zap.String("msgID", msg.MessageIDString()), zap.String("hash", hash), zap.Error(err), - zap.Stringer("txHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), ) return false, err } diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index db928ce868..809b400510 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -67,7 +67,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { if !exists { gov.logger.Error("reloaded pending transfer for unsupported chain, dropping it", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), @@ -81,7 +81,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { if msg.EmitterAddress != ce.emitterAddr { gov.logger.Error("reloaded pending transfer for unsupported emitter address, dropping it", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), @@ -96,7 +96,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { if err != nil { gov.logger.Error("failed to parse payload for reloaded pending transfer, dropping it", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), @@ -113,7 +113,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { if !exists { gov.logger.Error("reloaded pending transfer for unsupported token, dropping it", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), @@ -131,7 +131,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { if _, alreadyExists := gov.msgsSeen[hash]; 
alreadyExists { gov.logger.Error("not reloading pending transfer because it is a duplicate", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), @@ -146,7 +146,7 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { gov.logger.Info("reloaded pending transfer", zap.String("MsgID", msg.MessageIDString()), - zap.Stringer("TxHash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Stringer("Timestamp", msg.Timestamp), zap.Uint32("Nonce", msg.Nonce), zap.Uint64("Sequence", msg.Sequence), diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index 378bc9ba4e..cd03ada24c 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -381,7 +381,7 @@ func (gov *ChainGovernor) GetEnqueuedVAAs() []*publicrpcv1.GovernorGetEnqueuedVA Sequence: pe.dbData.Msg.Sequence, ReleaseTime: uint32(pe.dbData.ReleaseTime.Unix()), NotionalValue: value, - TxHash: pe.dbData.Msg.TxHash.String(), + TxHash: pe.dbData.Msg.TxIDString(), }) } } @@ -649,7 +649,7 @@ func (gov *ChainGovernor) publishStatus(ctx context.Context, hb *gossipv1.Heartb Sequence: pe.dbData.Msg.Sequence, ReleaseTime: uint32(pe.dbData.ReleaseTime.Unix()), NotionalValue: value, - TxHash: pe.dbData.Msg.TxHash.String(), + TxHash: pe.dbData.Msg.TxIDString(), }) } } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index d44168c5d7..a7fed811aa 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -697,13 +697,13 @@ func newChainGovernorForTestWithLogger(ctx context.Context, logger *zap.Logger) return gov, nil } -// Converts a string into a go-ethereum Hash object used as test input. -func hashFromString(str string) eth_common.Hash { +// Converts a TxHash string into a byte array to be used as a TxID. 
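The replacement helper is defined immediately below. For an EVM-style hash, the byte slice it returns is identical to the bytes of the old fixed-size `eth_common.Hash`, so the existing test vectors carry over unchanged; a quick standalone check using only the standard go-ethereum helpers, no project code:

```go
package main

import (
	"bytes"
	"fmt"

	eth_common "github.com/ethereum/go-ethereum/common"
)

func main() {
	str := "0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"
	oldStyle := eth_common.HexToHash(str)         // eth_common.Hash, a fixed [32]byte array
	newStyle := eth_common.HexToHash(str).Bytes() // []byte of length 32, usable as a TxID
	fmt.Println(len(newStyle), bytes.Equal(oldStyle[:], newStyle)) // prints: 32 true
}
```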
+func hashToTxID(str string) []byte { if (len(str) > 2) && (str[0] == '0') && (str[1] == 'x') { str = str[2:] } - return eth_common.HexToHash(str) + return eth_common.HexToHash(str).Bytes() } func TestVaaForUninterestingEmitterChain(t *testing.T) { @@ -717,7 +717,7 @@ func TestVaaForUninterestingEmitterChain(t *testing.T) { payload := []byte{1, 97, 97, 97, 97, 97} msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -749,7 +749,7 @@ func TestVaaForUninterestingEmitterAddress(t *testing.T) { payload := []byte{1, 97, 97, 97, 97, 97} msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -782,7 +782,7 @@ func TestVaaForUninterestingPayloadType(t *testing.T) { payload := []byte{2, 97, 97, 97, 97, 97} msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -886,7 +886,7 @@ func TestVaaForUninterestingToken(t *testing.T) { tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -971,7 +971,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) { // Transfer from Ethereum to Sui via the token bridge msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: transferTime, Nonce: uint32(1), Sequence: uint64(1), @@ -989,7 +989,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) { // Transfer from Sui to Ethereum via the token bridge msg2 := common.MessagePublication{ - TxHash: hashFromString("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), + TxID: hashToTxID("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), Timestamp: transferTime, Nonce: uint32(2), Sequence: uint64(2), @@ -1007,7 +1007,7 @@ func TestFlowCancelProcessMsgForTimeFullCancel(t *testing.T) { // msg and asset that are NOT flow cancelable msg3 := common.MessagePublication{ - TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + TxID: hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), Nonce: uint32(3), Sequence: uint64(3), @@ -1204,7 +1204,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) { // Transfer from Ethereum to Sui via the token bridge msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: 
hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: transferTime, Nonce: uint32(1), Sequence: uint64(1), @@ -1222,7 +1222,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) { // Transfer from Sui to Ethereum via the token bridge msg2 := common.MessagePublication{ - TxHash: hashFromString("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), + TxID: hashToTxID("0xabc123f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064"), Timestamp: transferTime, Nonce: uint32(2), Sequence: uint64(2), @@ -1240,7 +1240,7 @@ func TestFlowCancelProcessMsgForTimePartialCancel(t *testing.T) { // msg and asset that are NOT flow cancelable msg3 := common.MessagePublication{ - TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + TxID: hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), Nonce: uint32(3), Sequence: uint64(3), @@ -1391,7 +1391,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { // The first two transfers should be accepted. msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1402,7 +1402,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { } msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(2), @@ -1444,7 +1444,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { ) msg3 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(3), @@ -1467,7 +1467,7 @@ func TestTransfersUpToAndOverTheLimit(t *testing.T) { // But a small one should still go through. 
msg4 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(4), @@ -1517,7 +1517,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { ) msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1549,7 +1549,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { ) msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1581,7 +1581,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { ) msg3 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1613,7 +1613,7 @@ func TestPendingTransferBeingReleased(t *testing.T) { ) msg4 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1744,7 +1744,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { // First message: consume most of the dailyLimit for the emitter chain msg1 := common.MessagePublication{ - TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + TxID: hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), Timestamp: time.Unix(int64(transferTime.Unix()+1), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -1762,7 +1762,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { // Second message: This transfer gets queued because the limit is exhausted msg2 := common.MessagePublication{ - TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + TxID: hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), Timestamp: time.Unix(int64(transferTime.Unix()+2), 0), Nonce: uint32(2), Sequence: uint64(2), @@ -1781,7 +1781,7 @@ func TestPendingTransferFlowCancelsWhenReleased(t *testing.T) { // Third message: Incoming flow cancelling transfer to the emitter chain for the previous messages. This // reduces the Governor usage for that chain. msg3 := common.MessagePublication{ - TxHash: hashFromString("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), + TxID: hashToTxID("0x888888f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a8888"), Timestamp: time.Unix(int64(transferTime.Unix()+3), 0), Nonce: uint32(3), Sequence: uint64(3), @@ -1973,7 +1973,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // The first VAA should be accepted. 
msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2003,7 +2003,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // And so should the second. msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2033,7 +2033,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // But the third, big one should be queued up. msg3 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2063,7 +2063,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // A fourth, smaller, but still too big one, should get enqueued. msg4 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2093,7 +2093,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // A fifth, smaller, but still too big one, should also get enqueued. msg5 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2123,7 +2123,7 @@ func TestSmallerPendingTransfersAfterBigOneShouldGetReleased(t *testing.T) { // A sixth, big one should also get enqueued. msg6 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2223,7 +2223,7 @@ func TestNumDaysForReleaseTimerReset(t *testing.T) { // message that, when processed, should exceed the big transfer size msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: messageTimestamp, Nonce: uint32(1), Sequence: uint64(3), @@ -2284,7 +2284,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T // The first small transfer should be accepted. 
msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2314,7 +2314,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T // And so should the second. msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(2), @@ -2344,7 +2344,7 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T // But the third big one should get enqueued. msg3 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(3), @@ -2501,7 +2501,7 @@ func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) { // Submit a small transfer that will get enqueued due to the low daily limit. msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2606,7 +2606,7 @@ func TestTransferPayloadTooShort(t *testing.T) { payloadBytes1 = payloadBytes1[0 : len(payloadBytes1)-1] msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -2697,7 +2697,7 @@ func TestDontReloadDuplicates(t *testing.T) { pending1 := &db.PendingTransfer{ ReleaseTime: now.Add(time.Hour * 24), Msg: common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(200), @@ -2712,7 +2712,7 @@ func TestDontReloadDuplicates(t *testing.T) { pending2 := &db.PendingTransfer{ ReleaseTime: now.Add(time.Hour * 24), Msg: common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(201), @@ -2963,7 +2963,7 @@ func TestReobservationOfPublishedMsg(t *testing.T) { // The first transfer should be accepted. msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -3026,7 +3026,7 @@ func TestReobservationOfEnqueued(t *testing.T) { // A big transfer should get enqueued. 
msg := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -3088,7 +3088,7 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) { // The first transfer should be accepted. msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -3118,7 +3118,7 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) { // A second message with the same msgId but a different payload should also get published and apply to the notional value. msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -3291,7 +3291,7 @@ func TestPendingTransferWithBadPayloadGetsDroppedNotReleased(t *testing.T) { // Create two big transactions. msg1 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(1), Sequence: uint64(1), @@ -3308,7 +3308,7 @@ func TestPendingTransferWithBadPayloadGetsDroppedNotReleased(t *testing.T) { } msg2 := common.MessagePublication{ - TxHash: hashFromString("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), + TxID: hashToTxID("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063"), Timestamp: time.Unix(int64(1654543099), 0), Nonce: uint32(2), Sequence: uint64(2), diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index f7ad4c7171..1678544057 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -374,8 +374,9 @@ var someMsgEmitterChain vaa.ChainID = vaa.ChainIDSolana func someMessage() *common.MessagePublication { someMsgSequenceCounter++ + txID := [32]byte{byte(someMsgSequenceCounter % 8), byte(someMsgSequenceCounter / 8), 3} return &common.MessagePublication{ - TxHash: [32]byte{byte(someMsgSequenceCounter % 8), byte(someMsgSequenceCounter / 8), 3}, + TxID: txID[:], Timestamp: randomTime(), Nonce: math_rand.Uint32(), //nolint Sequence: someMsgSequenceCounter, @@ -439,8 +440,9 @@ func governedMsg(shouldBeDelayed bool) *common.MessagePublication { ) tokenBridgeSequenceCounter++ + txID := [32]byte{byte(tokenBridgeSequenceCounter % 8), byte(tokenBridgeSequenceCounter / 8), 3, 1, 10, 76} return &common.MessagePublication{ - TxHash: [32]byte{byte(tokenBridgeSequenceCounter % 8), byte(tokenBridgeSequenceCounter / 8), 3, 1, 10, 76}, + TxID: txID[:], Timestamp: randomTime(), Nonce: math_rand.Uint32(), //nolint Sequence: tokenBridgeSequenceCounter, @@ -458,7 +460,7 @@ func makeObsDb(tc []testCase) mock.ObservationDb { if t.unavailableInReobservation { continue } - db[t.msg.TxHash] = t.msg + db[eth_common.BytesToHash(t.msg.TxID)] = t.msg } return db } @@ -748,7 +750,7 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) { _, err := 
adminCs[adminRpcGuardianIndex].SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{ ObservationRequest: &gossipv1.ObservationRequest{ ChainId: uint32(testCase.msg.EmitterChain), - TxHash: testCase.msg.TxHash[:], + TxHash: testCase.msg.TxID, }, }) queryCancel() diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index a5b46e2d0a..319b1916a2 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -353,7 +353,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC level = zapcore.ErrorLevel } logger.Log(level, "SECURITY CRITICAL: Received observation from a chain that was not marked as originating from that chain", - zap.Stringer("tx", msg.TxHash), + zap.String("tx", msg.TxIDString()), zap.Stringer("emitter_address", msg.EmitterAddress), zap.Uint64("sequence", msg.Sequence), zap.Stringer("msgChainId", msg.EmitterChain), @@ -368,7 +368,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC level = zapcore.ErrorLevel } logger.Log(level, "SECURITY ERROR: Received observation with EmitterAddress == 0x00", - zap.Stringer("tx", msg.TxHash), + zap.String("tx", msg.TxIDString()), zap.Stringer("emitter_address", msg.EmitterAddress), zap.Uint64("sequence", msg.Sequence), zap.Stringer("msgChainId", msg.EmitterChain), @@ -380,7 +380,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC zap.Stringer("emitter_chain", msg.EmitterChain), zap.Stringer("emitter_address", msg.EmitterAddress), zap.Uint32("nonce", msg.Nonce), - zap.Stringer("txhash", msg.TxHash), + zap.String("txID", msg.TxIDString()), zap.Time("timestamp", msg.Timestamp)) } else { g.msgC.writeC <- msg diff --git a/node/pkg/processor/batch_obs_test.go b/node/pkg/processor/batch_obs_test.go index 381fd985ca..6391dcb804 100644 --- a/node/pkg/processor/batch_obs_test.go +++ b/node/pkg/processor/batch_obs_test.go @@ -8,7 +8,6 @@ import ( "github.com/certusone/wormhole/node/pkg/devnet" "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" - ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/stretchr/testify/assert" @@ -43,6 +42,7 @@ func TestMarshalSignedObservationBatch(t *testing.T) { NumObservations := uint64(p2p.MaxObservationBatchSize) observations := make([]*gossipv1.Observation, 0, NumObservations) + txHash := []byte("0123456789012345678901234567890123456789012345678901234567890123") // 64 bytes, the size of a Solana signature. 
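The 64-byte value above is the point of the type change: a `TxID` byte slice can carry a full Solana signature as well as a 32-byte EVM transaction hash, which the old fixed-size `TxHash` field could not. A small standalone illustration of the two shapes; the hash literal is the same test vector used throughout this diff:

```go
package main

import (
	"fmt"

	eth_common "github.com/ethereum/go-ethereum/common"
)

func main() {
	// EVM-style transaction hash: always 32 bytes.
	evmTxID := eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes()

	// Solana-style transaction identifier: a 64-byte signature.
	solanaTxID := []byte("0123456789012345678901234567890123456789012345678901234567890123")

	fmt.Println(len(evmTxID), len(solanaTxID)) // prints: 32 64
}
```

Because a `[]byte` is not a comparable type it cannot be used directly as a Go map key, which is why helpers such as `makeObsDb` in the hunks above convert back with `eth_common.BytesToHash(t.msg.TxID)`.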
for seqNo := uint64(1); seqNo <= NumObservations; seqNo++ { vaa := getUniqueVAA(seqNo) digest := vaa.SigningDigest() @@ -52,14 +52,14 @@ func TestMarshalSignedObservationBatch(t *testing.T) { observations = append(observations, &gossipv1.Observation{ Hash: digest.Bytes(), Signature: sig, - TxHash: ethcommon.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(), + TxHash: txHash, MessageId: vaa.MessageID(), }) } obsBuf, err := proto.Marshal(observations[0]) require.NoError(t, err) - assert.Equal(t, 205, len(obsBuf)) + assert.Equal(t, (173 + len(txHash)), len(obsBuf)) batch := gossipv1.SignedObservationBatch{ Addr: crypto.PubkeyToAddress(gk.PublicKey).Bytes(), diff --git a/node/pkg/processor/benchmark_test.go b/node/pkg/processor/benchmark_test.go index c85fe5031b..1cb94a49e7 100644 --- a/node/pkg/processor/benchmark_test.go +++ b/node/pkg/processor/benchmark_test.go @@ -194,7 +194,7 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db * func (pd *ProcessorData) createMessagePublication(b *testing.B, sequence uint64) *common.MessagePublication { b.Helper() return &common.MessagePublication{ - TxHash: ethCommon.HexToHash(fmt.Sprintf("%064x", sequence)), + TxID: ethCommon.HexToHash(fmt.Sprintf("%064x", sequence)).Bytes(), Timestamp: time.Now(), Nonce: 42, Sequence: sequence, @@ -235,7 +235,7 @@ func (pd *ProcessorData) createObservation(b *testing.B, guardianIdx int, k *com return &gossipv1.Observation{ Hash: digest.Bytes(), Signature: signature, - TxHash: k.TxHash.Bytes(), + TxHash: k.TxID, MessageId: pd.messageID(k.Sequence), } } diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go index 929f951760..ec67aae0fe 100644 --- a/node/pkg/processor/message.go +++ b/node/pkg/processor/message.go @@ -37,7 +37,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat p.logger.Warn("dropping observation since we haven't initialized our guardian set yet", zap.String("message_id", k.MessageIDString()), zap.Uint32("nonce", k.Nonce), - zap.Stringer("txhash", k.TxHash), + zap.String("txID", k.TxIDString()), zap.Time("timestamp", k.Timestamp), ) return @@ -80,8 +80,8 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat if p.logger.Core().Enabled(zapcore.DebugLevel) { p.logger.Debug("observed and signed confirmed message publication", zap.String("message_id", k.MessageIDString()), - zap.Stringer("txhash", k.TxHash), - zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())), + zap.String("txID", k.TxIDString()), + zap.String("txID_b58", base58.Encode(k.TxID)), zap.String("hash", hash), zap.Uint32("nonce", k.Nonce), zap.Time("timestamp", k.Timestamp), @@ -93,7 +93,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat } // Broadcast the signature. - ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature, shouldPublishImmediately) + ourObs, msg := p.broadcastSignature(v.MessageID(), k.TxID, digest, signature, shouldPublishImmediately) // Indicate that we observed this one. observationsReceivedTotal.Inc() @@ -114,7 +114,7 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat // Update our state. 
s.ourObservation = v - s.txHash = k.TxHash.Bytes() + s.txHash = k.TxID s.source = v.GetEmitterChain().String() s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs s.signatures[p.ourAddr] = signature diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index 5928bed401..e295e2e3ce 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -157,7 +157,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap. for _, obs := range observations { observation := &common.MessagePublication{ - TxHash: txHash, + TxID: txHash.Bytes(), Timestamp: time.Unix(b.TimeStamp, 0), Nonce: obs.nonce, Sequence: obs.sequence, diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index 9ea36ac4bb..90e1ee3212 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -305,7 +305,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u } observation := &common.MessagePublication{ - TxHash: txHash, + TxID: txHash.Bytes(), Timestamp: time.Unix(int64(ts.Uint()), 0), Nonce: uint32(nonce.Uint()), // uint32 Sequence: sequence.Uint(), @@ -319,7 +319,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u aptosMessagesConfirmed.Inc() logger.Info("message observed", - zap.Stringer("txHash", observation.TxHash), + zap.String("txHash", observation.TxIDString()), zap.Time("timestamp", observation.Timestamp), zap.Uint32("nonce", observation.Nonce), zap.Uint64("sequence", observation.Sequence), diff --git a/node/pkg/watchers/cosmwasm/watcher.go b/node/pkg/watchers/cosmwasm/watcher.go index 0cf856518b..0ec30b8b7b 100644 --- a/node/pkg/watchers/cosmwasm/watcher.go +++ b/node/pkg/watchers/cosmwasm/watcher.go @@ -511,7 +511,7 @@ func EventsToMessagePublications(contract string, txHash string, events []gjson. 
continue } messagePublication := &common.MessagePublication{ - TxHash: txHashValue, + TxID: txHashValue.Bytes(), Timestamp: time.Unix(blockTimeInt, 0), Nonce: uint32(nonceInt), Sequence: sequenceInt, diff --git a/node/pkg/watchers/evm/by_transaction.go b/node/pkg/watchers/evm/by_transaction.go index e1d8cdee8c..4a109864ee 100644 --- a/node/pkg/watchers/evm/by_transaction.go +++ b/node/pkg/watchers/evm/by_transaction.go @@ -75,7 +75,7 @@ func MessageEventsForTransaction( } message := &common.MessagePublication{ - TxHash: ev.Raw.TxHash, + TxID: ev.Raw.TxHash.Bytes(), Timestamp: time.Unix(int64(blockTime), 0), Nonce: ev.Nonce, Sequence: ev.Sequence, diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 70b8d379c2..32e53cb219 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -327,7 +327,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately { logger.Info("re-observed message publication transaction, publishing it immediately", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), zap.Uint64("current_block", blockNumberU), zap.Uint64("observed_block", blockNumber), ) @@ -339,7 +339,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if safeBlockNumberU == 0 { logger.Error("no safe block number available, ignoring observation request", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), ) continue } @@ -347,7 +347,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if blockNumber <= safeBlockNumberU { logger.Info("re-observed message publication transaction", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), zap.Uint64("current_safe_block", safeBlockNumberU), zap.Uint64("observed_block", blockNumber), ) @@ -355,7 +355,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } else { logger.Info("ignoring re-observed message publication transaction", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), zap.Uint64("current_safe_block", safeBlockNumberU), zap.Uint64("observed_block", blockNumber), ) @@ -367,7 +367,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if blockNumberU == 0 { logger.Error("no block number available, ignoring observation request", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), ) continue } @@ -384,7 +384,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if blockNumber <= blockNumberU { logger.Info("re-observed message publication transaction", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), zap.Uint64("current_block", blockNumberU), zap.Uint64("observed_block", blockNumber), ) @@ -392,7 +392,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } else { logger.Info("ignoring re-observed message publication transaction", zap.String("msgId", msg.MessageIDString()), - zap.Stringer("txHash", msg.TxHash), + zap.String("txHash", msg.TxIDString()), zap.Uint64("current_block", blockNumberU), zap.Uint64("observed_block", blockNumber), ) @@ -512,7 +512,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if pLock.height <= blockNumberU { msm := 
time.Now() timeout, cancel := context.WithTimeout(ctx, 5*time.Second) - tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash) + tx, err := w.ethConn.TransactionReceipt(timeout, eth_common.BytesToHash(pLock.message.TxID)) queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds()) cancel() @@ -527,7 +527,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") { logger.Warn("tx was orphaned", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -545,7 +545,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if tx.Status != 1 { logger.Error("transaction receipt with non-success status", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -563,7 +563,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { // An error from this "transient" case has persisted for more than MaxWaitConfirmations. logger.Info("observation timed out", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -575,7 +575,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } else { logger.Warn("transaction could not be fetched", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -592,7 +592,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { if tx.BlockHash != key.BlockHash { logger.Info("tx got dropped and mined in a different block; the message should have been reobserved", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -606,7 +606,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { logger.Info("observation confirmed", zap.String("msgId", pLock.message.MessageIDString()), - zap.Stringer("txHash", pLock.message.TxHash), + zap.String("txHash", pLock.message.TxIDString()), zap.Stringer("blockHash", key.BlockHash), zap.Uint64("target_blockNum", pLock.height), zap.Stringer("current_blockNum", ev.Number), @@ -832,7 +832,7 @@ func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) ( // postMessage creates a message object from a log event and adds it to the pending list for processing. 
func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublished, blockTime uint64) { message := &common.MessagePublication{ - TxHash: ev.Raw.TxHash, + TxID: ev.Raw.TxHash.Bytes(), Timestamp: time.Unix(int64(blockTime), 0), Nonce: ev.Nonce, Sequence: ev.Sequence, @@ -847,7 +847,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately { logger.Info("found new message publication transaction, publishing it immediately", zap.String("msgId", message.MessageIDString()), - zap.Stringer("txHash", message.TxHash), + zap.String("txHash", message.TxIDString()), zap.Uint64("blockNum", ev.Raw.BlockNumber), zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)), zap.Stringer("blockHash", ev.Raw.BlockHash), @@ -863,7 +863,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis logger.Info("found new message publication transaction", zap.String("msgId", message.MessageIDString()), - zap.Stringer("txHash", message.TxHash), + zap.String("txHash", message.TxIDString()), zap.Uint64("blockNum", ev.Raw.BlockNumber), zap.Uint64("latestFinalizedBlock", atomic.LoadUint64(&w.latestFinalizedBlockNumber)), zap.Stringer("blockHash", ev.Raw.BlockHash), @@ -873,7 +873,7 @@ func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublis ) key := pendingKey{ - TxHash: message.TxHash, + TxHash: eth_common.BytesToHash(message.TxID), BlockHash: ev.Raw.BlockHash, EmitterAddress: message.EmitterAddress, Sequence: message.Sequence, diff --git a/node/pkg/watchers/ibc/watcher.go b/node/pkg/watchers/ibc/watcher.go index 74ad36b8db..0292242751 100644 --- a/node/pkg/watchers/ibc/watcher.go +++ b/node/pkg/watchers/ibc/watcher.go @@ -537,7 +537,7 @@ func parseIbcReceivePublishEvent(logger *zap.Logger, desiredContract string, eve evt := new(ibcReceivePublishEvent) evt.Msg = new(common.MessagePublication) - evt.Msg.TxHash = txHash + evt.Msg.TxID = txHash.Bytes() evt.ChannelID, err = attributes.GetAsString("channel_id") if err != nil { @@ -595,7 +595,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs if err != nil { w.logger.Error("query for IBC channel ID failed", zap.String("IbcChannelID", evt.ChannelID), - zap.Stringer("TxHash", evt.Msg.TxHash), + zap.String("TxID", evt.Msg.TxIDString()), zap.Stringer("EmitterChain", evt.Msg.EmitterChain), zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), zap.Uint64("Sequence", evt.Msg.Sequence), @@ -613,7 +613,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs // Therefore we don't want to return an error here. Restarting won't help. 
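One spot in the EVM watcher above still converts back to a fixed-size hash: the `pendingKey` built at the end of `postMessage` uses `eth_common.BytesToHash(message.TxID)`. That is deliberate, since the key struct serves as a map key and Go map keys must be comparable, which a `[]byte` is not; the round-trip is safe on EVM chains because their transaction hashes are always 32 bytes. A reduced sketch of the pattern (field set trimmed; the real struct also carries the emitter address):

```go
package main

import (
	"fmt"

	eth_common "github.com/ethereum/go-ethereum/common"
)

// Reduced version of the watcher's pending-message key: every field is a
// comparable type, so the struct itself can be used as a map key.
type pendingKey struct {
	TxHash    eth_common.Hash
	BlockHash eth_common.Hash
	Sequence  uint64
}

func main() {
	txID := eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes()

	pending := map[pendingKey]string{}
	key := pendingKey{
		TxHash:   eth_common.BytesToHash(txID), // safe: EVM tx hashes are exactly 32 bytes
		Sequence: 1,
	}
	pending[key] = "queued message publication"

	fmt.Println(len(pending)) // prints: 1
}
```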
w.logger.Error(fmt.Sprintf("received %s message from unknown IBC channel, dropping observation", observationType), zap.String("IbcChannelID", evt.ChannelID), - zap.Stringer("TxHash", evt.Msg.TxHash), + zap.String("TxID", evt.Msg.TxIDString()), zap.Stringer("EmitterChain", evt.Msg.EmitterChain), zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), zap.Uint64("Sequence", evt.Msg.Sequence), @@ -631,7 +631,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs w.logger.Debug(fmt.Sprintf("received %s message from an unconfigured chain, dropping observation", observationType), zap.String("IbcChannelID", evt.ChannelID), zap.Stringer("ChainID", mappedChainID), - zap.Stringer("TxHash", evt.Msg.TxHash), + zap.String("TxID", evt.Msg.TxIDString()), zap.Stringer("EmitterChain", evt.Msg.EmitterChain), zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), zap.Uint64("Sequence", evt.Msg.Sequence), @@ -647,7 +647,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs zap.String("IbcChannelID", evt.ChannelID), zap.Uint16("MappedChainID", uint16(mappedChainID)), zap.Uint16("ExpectedChainID", uint16(ce.chainID)), - zap.Stringer("TxHash", evt.Msg.TxHash), + zap.String("TxID", evt.Msg.TxIDString()), zap.Stringer("EmitterChain", evt.Msg.EmitterChain), zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), zap.Uint64("Sequence", evt.Msg.Sequence), @@ -662,7 +662,7 @@ func (w *Watcher) processIbcReceivePublishEvent(evt *ibcReceivePublishEvent, obs w.logger.Info(fmt.Sprintf("%s message detected", observationType), zap.String("IbcChannelID", evt.ChannelID), zap.String("ChainName", ce.chainName), - zap.Stringer("TxHash", evt.Msg.TxHash), + zap.String("TxID", evt.Msg.TxIDString()), zap.Stringer("EmitterChain", evt.Msg.EmitterChain), zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), zap.Uint64("Sequence", evt.Msg.Sequence), diff --git a/node/pkg/watchers/ibc/watcher_test.go b/node/pkg/watchers/ibc/watcher_test.go index 016af80970..0ebef19ccb 100644 --- a/node/pkg/watchers/ibc/watcher_test.go +++ b/node/pkg/watchers/ibc/watcher_test.go @@ -55,7 +55,7 @@ func TestParseIbcReceivePublishEvent(t *testing.T) { expectedResult := ibcReceivePublishEvent{ ChannelID: "channel-0", Msg: &common.MessagePublication{ - TxHash: txHash, + TxID: txHash.Bytes(), EmitterAddress: expectedSender, EmitterChain: vaa.ChainIDTerra2, Nonce: 1, diff --git a/node/pkg/watchers/near/tx_processing.go b/node/pkg/watchers/near/tx_processing.go index afcf17c79e..5a8f272e69 100644 --- a/node/pkg/watchers/near/tx_processing.go +++ b/node/pkg/watchers/near/tx_processing.go @@ -235,7 +235,7 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job ts := outcomeBlockHeader.Timestamp observation := &common.MessagePublication{ - TxHash: txHashEthFormat, + TxID: txHashEthFormat.Bytes(), Timestamp: time.Unix(int64(ts), 0), Nonce: pubEvent.Nonce, Sequence: pubEvent.Seq, diff --git a/node/pkg/watchers/near/watcher_test.go b/node/pkg/watchers/near/watcher_test.go index 0389c96251..1fa47f62a5 100644 --- a/node/pkg/watchers/near/watcher_test.go +++ b/node/pkg/watchers/near/watcher_test.go @@ -14,7 +14,6 @@ import ( gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/certusone/wormhole/node/pkg/supervisor" mockserver "github.com/certusone/wormhole/node/pkg/watchers/near/nearapi/mock" - eth_common "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" @@ 
-231,7 +230,7 @@ func TestWatcherSimple(t *testing.T) { }, expectedMsgObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash(txHashBytes), + TxID: txHashBytes, EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -273,7 +272,7 @@ func TestWatcherSimple2(t *testing.T) { }, expectedMsgObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash(txHashBytes), + TxID: txHashBytes, EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -309,7 +308,7 @@ func TestWatcherReobservation(t *testing.T) { }, expectedMsgReObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash(txHashBytes), + TxID: txHashBytes, EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -359,7 +358,7 @@ func TestWatcherDelayedFinal(t *testing.T) { latestFinalBlocks: lfb, expectedMsgObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash(txHashBytes), + TxID: txHashBytes, EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -398,7 +397,7 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) { }, expectedMsgObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash(txHashBytes), + TxID: txHashBytes, EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -444,7 +443,7 @@ func TestWatcherSynthetic(t *testing.T) { }, expectedMsgReObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash([]byte("_____________________________TX1")), + TxID: []byte("_____________________________TX1"), EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -455,7 +454,7 @@ func TestWatcherSynthetic(t *testing.T) { Unreliable: false, }, { - TxHash: eth_common.BytesToHash([]byte("_____________________________TX2")), + TxID: []byte("_____________________________TX2"), EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -466,7 +465,7 @@ func TestWatcherSynthetic(t *testing.T) { Unreliable: false, }, { - TxHash: eth_common.BytesToHash([]byte("_____________________________TX3")), + TxID: []byte("_____________________________TX3"), EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -537,7 +536,7 @@ func TestWatcherUnfinalized(t *testing.T) { }, expectedMsgReObserved: []*common.MessagePublication{ { - TxHash: eth_common.BytesToHash([]byte("_____________________________TX1")), + TxID: []byte("_____________________________TX1"), EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, @@ -548,7 +547,7 @@ func TestWatcherUnfinalized(t *testing.T) { Unreliable: false, }, { - TxHash: eth_common.BytesToHash([]byte("_____________________________TX3")), + TxID: []byte("_____________________________TX3"), EmitterAddress: portalEmitterAddress(), ConsistencyLevel: 0, EmitterChain: vaa.ChainIDNear, diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index c2c11c74d5..b8ad325db2 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -969,7 +969,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a } observation := &common.MessagePublication{ - TxHash: txHash, + TxID: txHash.Bytes(), Timestamp: time.Unix(int64(proposal.SubmissionTime), 0), Nonce: proposal.Nonce, Sequence: proposal.Sequence, diff --git 
a/node/pkg/watchers/sui/watcher.go b/node/pkg/watchers/sui/watcher.go index fae0635d52..8327092ebc 100644 --- a/node/pkg/watchers/sui/watcher.go +++ b/node/pkg/watchers/sui/watcher.go @@ -262,7 +262,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio } observation := &common.MessagePublication{ - TxHash: txHashEthFormat, + TxID: txHashEthFormat.Bytes(), Timestamp: time.Unix(ts, 0), Nonce: uint32(*fields.Nonce), Sequence: seq, @@ -276,7 +276,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio suiMessagesConfirmed.Inc() logger.Info("message observed", - zap.Stringer("txHash", observation.TxHash), + zap.String("txHash", observation.TxIDString()), zap.Time("timestamp", observation.Timestamp), zap.Uint32("nonce", observation.Nonce), zap.Uint64("sequence", observation.Sequence),
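Across the watchers and the governor, log fields switch from `zap.Stringer("txHash", msg.TxHash)` to `zap.String(..., msg.TxIDString())`, because a raw byte slice does not satisfy `fmt.Stringer`. A plausible shape for that accessor, hex-encoding the identifier; this is an assumption about the method in `node/pkg/common`, not a copy of it:

```go
package common

import "encoding/hex"

// MessagePublication is reduced here to the one field this sketch needs.
type MessagePublication struct {
	TxID []byte
}

// TxIDString renders the transaction identifier for structured logging.
// Assumption: the real accessor may choose a different prefix or encoding.
func (msg *MessagePublication) TxIDString() string {
	return "0x" + hex.EncodeToString(msg.TxID)
}
```

Note that `node/pkg/processor/message.go` above still logs a separate base58 form (`base58.Encode(k.TxID)`), which matches how Solana transaction signatures are conventionally displayed.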