diff --git a/interactors/nonceHandlerV1/addressNonceHandler.go b/interactors/nonceHandlerV1/addressNonceHandler.go index f087412e..5f62ab00 100644 --- a/interactors/nonceHandlerV1/addressNonceHandler.go +++ b/interactors/nonceHandlerV1/addressNonceHandler.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/data/transaction" + sdkCore "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/interactors" ) diff --git a/interactors/nonceHandlerV1/nonceTransactionsHandler.go b/interactors/nonceHandlerV1/nonceTransactionsHandler.go index 1d397193..3ccfc4e8 100644 --- a/interactors/nonceHandlerV1/nonceTransactionsHandler.go +++ b/interactors/nonceHandlerV1/nonceTransactionsHandler.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/data" "github.com/multiversx/mx-sdk-go/interactors" @@ -111,14 +112,12 @@ func (nth *nonceTransactionsHandlerV1) SendTransaction(ctx context.Context, tx * } func (nth *nonceTransactionsHandlerV1) resendTransactionsLoop(ctx context.Context, intervalToResend time.Duration) { - timer := time.NewTimer(intervalToResend) - defer timer.Stop() + ticker := time.NewTicker(intervalToResend) + defer ticker.Stop() for { - timer.Reset(intervalToResend) - select { - case <-timer.C: + case <-ticker.C: nth.resendTransactions(ctx) case <-ctx.Done(): log.Debug("finishing nonceTransactionsHandlerV1.resendTransactionsLoop...") diff --git a/interactors/nonceHandlerV2/addressNonceHandler.go b/interactors/nonceHandlerV2/addressNonceHandler.go index 5b69ee37..2ea8a15c 100644 --- a/interactors/nonceHandlerV2/addressNonceHandler.go +++ b/interactors/nonceHandlerV2/addressNonceHandler.go @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" + sdkCore "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/interactors" ) @@ -161,7 +162,7 @@ func (anh *addressNonceHandler) ReSendTransactionsIfRequired(ctx context.Context return err } - log.Debug("resent transactions", "address", addressAsBech32String, "total txs", len(resendableTxs), "received hashes", len(hashes)) + log.Info("resent transactions", "address", addressAsBech32String, "total txs", len(resendableTxs), "received hashes", len(hashes)) return nil } diff --git a/interactors/nonceHandlerV2/nonceTransactionsHandler.go b/interactors/nonceHandlerV2/nonceTransactionsHandler.go index 297fc78d..1f488430 100644 --- a/interactors/nonceHandlerV2/nonceTransactionsHandler.go +++ b/interactors/nonceHandlerV2/nonceTransactionsHandler.go @@ -9,6 +9,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/transaction" logger "github.com/multiversx/mx-chain-logger-go" + "github.com/multiversx/mx-sdk-go/core" "github.com/multiversx/mx-sdk-go/data" "github.com/multiversx/mx-sdk-go/interactors" @@ -149,14 +150,12 @@ func (nth *nonceTransactionsHandlerV2) SendTransaction(ctx context.Context, tx * } func (nth *nonceTransactionsHandlerV2) resendTransactionsLoop(ctx context.Context) { - timer := time.NewTimer(nth.intervalToResend) - defer timer.Stop() + ticker := time.NewTicker(nth.intervalToResend) + defer ticker.Stop() for { - timer.Reset(nth.intervalToResend) - select { - case <-timer.C: + case <-ticker.C: nth.resendTransactions(ctx) case <-ctx.Done(): log.Debug("finishing nonceTransactionsHandlerV2.resendTransactionsLoop...") diff --git a/interactors/nonceHandlerV3/addressNonceHandler.go b/interactors/nonceHandlerV3/addressNonceHandler.go new file mode 100644 index 00000000..d725dbb2 --- /dev/null +++ b/interactors/nonceHandlerV3/addressNonceHandler.go @@ -0,0 +1,230 @@ +package nonceHandlerV3 + +import ( + "context" + "fmt" + "sync" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/transaction" + + sdkCore "github.com/multiversx/mx-sdk-go/core" + "github.com/multiversx/mx-sdk-go/interactors" +) + +type TransactionResponse struct { + TxHash string + Error error +} + +// addressNonceHandler is the handler used for one address. It is able to handle the current +// nonce as max(current_stored_nonce, account_nonce). After each call of the getNonce function +// the current_stored_nonce is incremented. This will prevent "nonce too low in transaction" +// errors on the node interceptor. To prevent the "nonce too high in transaction" error, +// a retrial mechanism is implemented. This struct is able to store all sent transactions, +// having a function that sweeps the map in order to resend a transaction or remove them +// because they were executed. This struct is concurrent safe. +type addressNonceHandler struct { + mut sync.RWMutex + address sdkCore.AddressHandler + proxy interactors.Proxy + computedNonceWasSet bool + computedNonce uint64 + lowestNonce uint64 + gasPrice uint64 + nonceUntilGasIncreased uint64 + transactions *TransactionQueueHandler + + responsesChannels map[uint64]chan TransactionResponse +} + +// NewAddressNonceHandler returns a new instance of a addressNonceHandler +func NewAddressNonceHandler(proxy interactors.Proxy, address sdkCore.AddressHandler) (interactors.AddressNonceHandler, error) { + if check.IfNil(proxy) { + return nil, interactors.ErrNilProxy + } + if check.IfNil(address) { + return nil, interactors.ErrNilAddress + } + return &addressNonceHandler{ + address: address, + proxy: proxy, + transactions: NewTransactionQueueHandler(), + + responsesChannels: make(map[uint64]chan TransactionResponse), + }, nil +} + +// ApplyNonceAndGasPrice will apply the computed nonce to the given FrontendTransaction +func (anh *addressNonceHandler) ApplyNonceAndGasPrice(ctx context.Context, tx *transaction.FrontendTransaction) error { + oldTx := anh.getOlderTxWithSameNonce(tx) + if oldTx != nil { + err := anh.handleTxWithSameNonce(oldTx, tx) + if err != nil { + return err + } + } + + nonce, err := anh.getNonceUpdatingCurrent(ctx) + tx.Nonce = nonce + if err != nil { + return err + } + + anh.fetchGasPriceIfRequired(ctx, nonce) + tx.GasPrice = core.MaxUint64(anh.gasPrice, tx.GasPrice) + return nil +} + +func (anh *addressNonceHandler) handleTxWithSameNonce(oldTx *transaction.FrontendTransaction, tx *transaction.FrontendTransaction) error { + if oldTx.GasPrice < tx.GasPrice { + return nil + } + + if oldTx.GasPrice == tx.GasPrice && oldTx.GasPrice < anh.gasPrice { + return nil + } + + return interactors.ErrTxWithSameNonceAndGasPriceAlreadySent +} + +func (anh *addressNonceHandler) fetchGasPriceIfRequired(ctx context.Context, nonce uint64) { + if nonce == anh.nonceUntilGasIncreased+1 || anh.gasPrice == 0 { + networkConfig, err := anh.proxy.GetNetworkConfig(ctx) + + anh.mut.Lock() + defer anh.mut.Unlock() + if err != nil { + log.Error("%w: while fetching network config", err) + anh.gasPrice = 0 + return + } + anh.gasPrice = networkConfig.MinGasPrice + } +} + +func (anh *addressNonceHandler) getNonceUpdatingCurrent(ctx context.Context) (uint64, error) { + account, err := anh.proxy.GetAccount(ctx, anh.address) + if err != nil { + return 0, err + } + + if anh.lowestNonce > account.Nonce { + return account.Nonce, interactors.ErrGapNonce + } + + anh.mut.Lock() + defer anh.mut.Unlock() + + if !anh.computedNonceWasSet { + anh.computedNonce = account.Nonce + anh.computedNonceWasSet = true + + return anh.computedNonce, nil + } + + anh.computedNonce++ + + return core.MaxUint64(anh.computedNonce, account.Nonce), nil +} + +// ReSendTransactionsIfRequired will resend the cached transactions that still have a nonce greater that the one fetched from the blockchain +func (anh *addressNonceHandler) ReSendTransactionsIfRequired(ctx context.Context) error { + account, err := anh.proxy.GetAccount(ctx, anh.address) + if err != nil { + return err + } + + anh.mut.Lock() + + //resendableTxs := make([]*transaction.FrontendTransaction, 0, anh.transactions.tq.Len()) + //minNonce := anh.computedNonce + + addressAsBech32String, err := anh.address.AddressAsBech32String() + if err != nil { + return fmt.Errorf("failed to retrieve address as bech32: %w", err) + } + + for anh.transactions.Len() > 0 { + t := anh.transactions.NextTransaction() + + if t.Nonce > account.Nonce { + hash, err := anh.proxy.SendTransaction(ctx, t) + if err != nil { + log.Error("failed to send transaction", "error", err.Error()) + resp := TransactionResponse{TxHash: "", Error: err} + anh.responsesChannels[t.Nonce] <- resp + return nil + } + resp := TransactionResponse{TxHash: hash, Error: nil} + anh.responsesChannels[t.Nonce] <- resp + log.Info(fmt.Sprintf("successfully resent transaction with nonce %d for address %q", t.Nonce, addressAsBech32String), "hash", hash) + } + } + + //anh.transactions.NextTransaction() + //for txNonce, tx := range anh.transactions { + // if txNonce <= account.Nonce { + // delete(anh.transactions, txNonce) + // continue + // } + // minNonce = core.MinUint64(txNonce, minNonce) + // resendableTxs = append(resendableTxs, tx) + //} + //anh.lowestNonce = minNonce + anh.mut.Unlock() + + //if len(resendableTxs) == 0 { + // return nil + //} + // + //hashes, err := anh.proxy.SendTransactions(ctx, resendableTxs) + //if err != nil { + // return err + //} + // + //addressAsBech32String, err := anh.address.AddressAsBech32String() + //if err != nil { + // return err + //} + // + //log.Info("resent transactions", "address", addressAsBech32String, "total txs", len(resendableTxs), "received hashes", len(hashes)) + + return nil +} + +// SendTransaction will save and propagate a transaction to the network +func (anh *addressNonceHandler) SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error) { + anh.mut.Lock() + anh.transactions.AddTransaction(tx) + anh.responsesChannels[tx.Nonce] = make(chan TransactionResponse) + anh.mut.Unlock() + + hash := <-anh.responsesChannels[tx.Nonce] + close(anh.responsesChannels[tx.Nonce]) + + return hash.TxHash, nil +} + +// DropTransactions will delete the cached transactions and will try to replace the current transactions from the pool using more gas price +func (anh *addressNonceHandler) DropTransactions() { + //anh.mut.Lock() + //anh.transactions = make(map[uint64]*transaction.FrontendTransaction) + //anh.computedNonceWasSet = false + //anh.gasPrice++ + //anh.nonceUntilGasIncreased = anh.computedNonce + //anh.mut.Unlock() +} + +func (anh *addressNonceHandler) getOlderTxWithSameNonce(tx *transaction.FrontendTransaction) *transaction.FrontendTransaction { + anh.mut.RLock() + defer anh.mut.RUnlock() + + return anh.transactions.SearchForNonce(tx.Nonce) +} + +// IsInterfaceNil returns true if there is no value under the interface +func (anh *addressNonceHandler) IsInterfaceNil() bool { + return anh == nil +} diff --git a/interactors/nonceHandlerV3/nonceTransactionsHandler.go b/interactors/nonceHandlerV3/nonceTransactionsHandler.go new file mode 100644 index 00000000..291b7059 --- /dev/null +++ b/interactors/nonceHandlerV3/nonceTransactionsHandler.go @@ -0,0 +1,215 @@ +package nonceHandlerV3 + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-core-go/data/transaction" + logger "github.com/multiversx/mx-chain-logger-go" + + "github.com/multiversx/mx-sdk-go/core" + "github.com/multiversx/mx-sdk-go/data" + "github.com/multiversx/mx-sdk-go/interactors" +) + +const minimumIntervalToResend = time.Second + +var log = logger.GetOrCreate("mx-sdk-go/interactors/nonceHandlerV3") + +// ArgsNonceTransactionsHandlerV3 is the argument DTO for a nonce transactions handler component +type ArgsNonceTransactionsHandlerV3 struct { + Proxy interactors.Proxy + IntervalToResend time.Duration +} + +// nonceTransactionsHandlerV3 is the handler used for an unlimited number of addresses. +// It basically contains a map of addressNonceHandler, creating new entries on the first +// access of a provided address. This struct delegates all the operations on the right +// instance of addressNonceHandler. It also starts a go routine that will periodically +// try to resend "stuck transactions" and to clean the inner state. The recommended resend +// interval is 1 minute. The Close method should be called whenever the current instance of +// nonceTransactionsHandlerV3 should be terminated and collected by the GC. +// This struct is concurrent safe. +type nonceTransactionsHandlerV3 struct { + proxy interactors.Proxy + mutHandlers sync.RWMutex + handlers map[string]interactors.AddressNonceHandler + cancelFunc func() + intervalToResend time.Duration +} + +// NewNonceTransactionHandlerV3 will create a new instance of the nonceTransactionsHandlerV3. It requires a Proxy implementation +// and an interval at which the transactions sent are rechecked and eventually, resent. +func NewNonceTransactionHandlerV3(args ArgsNonceTransactionsHandlerV3) (*nonceTransactionsHandlerV3, error) { + if check.IfNil(args.Proxy) { + return nil, interactors.ErrNilProxy + } + if args.IntervalToResend < minimumIntervalToResend { + return nil, fmt.Errorf("%w for intervalToResend in NewNonceTransactionHandlerV2", interactors.ErrInvalidValue) + } + + nth := &nonceTransactionsHandlerV3{ + proxy: args.Proxy, + handlers: make(map[string]interactors.AddressNonceHandler), + intervalToResend: args.IntervalToResend, + } + + ctx, cancelFunc := context.WithCancel(context.Background()) + nth.cancelFunc = cancelFunc + go nth.resendTransactionsLoop(ctx) + + return nth, nil +} + +// ApplyNonceAndGasPrice will apply the nonce to the given frontend transaction +func (nth *nonceTransactionsHandlerV3) ApplyNonceAndGasPrice(ctx context.Context, address core.AddressHandler, tx *transaction.FrontendTransaction) error { + if check.IfNil(address) { + return interactors.ErrNilAddress + } + if tx == nil { + return interactors.ErrNilTransaction + } + + anh, err := nth.getOrCreateAddressNonceHandler(address) + if err != nil { + return err + } + + return anh.ApplyNonceAndGasPrice(ctx, tx) +} + +func (nth *nonceTransactionsHandlerV3) getOrCreateAddressNonceHandler(address core.AddressHandler) (interactors.AddressNonceHandler, error) { + anh := nth.getAddressNonceHandler(address) + if !check.IfNil(anh) { + return anh, nil + } + + return nth.createAddressNonceHandler(address) +} + +func (nth *nonceTransactionsHandlerV3) getAddressNonceHandler(address core.AddressHandler) interactors.AddressNonceHandler { + nth.mutHandlers.RLock() + defer nth.mutHandlers.RUnlock() + + anh, found := nth.handlers[string(address.AddressBytes())] + if found { + return anh + } + return nil +} + +func (nth *nonceTransactionsHandlerV3) createAddressNonceHandler(address core.AddressHandler) (interactors.AddressNonceHandler, error) { + nth.mutHandlers.Lock() + defer nth.mutHandlers.Unlock() + + addressAsString := string(address.AddressBytes()) + anh, found := nth.handlers[addressAsString] + if found { + return anh, nil + } + + anh, err := NewAddressNonceHandler(nth.proxy, address) + if err != nil { + return nil, err + } + nth.handlers[addressAsString] = anh + + return anh, nil +} + +// SendTransaction will store and send the provided transaction +func (nth *nonceTransactionsHandlerV3) SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error) { + if tx == nil { + return "", interactors.ErrNilTransaction + } + + // Work with a full copy of the provided transaction so the provided one can change without affecting this component. + // Abnormal and unpredictable behaviors due to the resending mechanism are prevented this way + txCopy := *tx + + addrAsBech32 := txCopy.Sender + address, err := data.NewAddressFromBech32String(addrAsBech32) + if err != nil { + return "", fmt.Errorf("%w while creating address handler for string %s", err, addrAsBech32) + } + + anh, err := nth.getOrCreateAddressNonceHandler(address) + if err != nil { + return "", err + } + + sentHash, err := anh.SendTransaction(ctx, &txCopy) + if err != nil { + return "", fmt.Errorf("%w while sending transaction for address %s", err, addrAsBech32) + } + + return sentHash, nil +} + +func (nth *nonceTransactionsHandlerV3) resendTransactionsLoop(ctx context.Context) { + ticker := time.NewTicker(nth.intervalToResend) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + nth.resendTransactions(ctx) + case <-ctx.Done(): + log.Debug("finishing nonceTransactionsHandlerV3.resendTransactionsLoop...") + return + } + } +} + +func (nth *nonceTransactionsHandlerV3) resendTransactions(ctx context.Context) { + nth.mutHandlers.Lock() + defer nth.mutHandlers.Unlock() + + for _, anh := range nth.handlers { + select { + case <-ctx.Done(): + log.Debug("finishing nonceTransactionsHandlerV3.resendTransactions...") + return + default: + } + + // TODO: this is a debug timeout. + resendCtx, cancel := context.WithTimeout(ctx, time.Minute) + err := anh.ReSendTransactionsIfRequired(resendCtx) + fmt.Println("i got cancelled once") + log.LogIfError(err) + cancel() + } +} + +// DropTransactions will clean the addressNonceHandler cached transactions. A little gas increase will be applied to the next transactions +// in order to also replace the transactions from the txPool. +// This should be only used in a fallback plan, when some transactions are completely lost (or due to a bug, not even sent in first time) +func (nth *nonceTransactionsHandlerV3) DropTransactions(address core.AddressHandler) error { + if check.IfNil(address) { + return interactors.ErrNilAddress + } + + anh, err := nth.getOrCreateAddressNonceHandler(address) + if err != nil { + return err + } + anh.DropTransactions() + + return nil +} + +// Close finishes the transactions resend go routine +func (nth *nonceTransactionsHandlerV3) Close() error { + nth.cancelFunc() + + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (nth *nonceTransactionsHandlerV3) IsInterfaceNil() bool { + return nth == nil +} diff --git a/interactors/nonceHandlerV3/transactionQueue.go b/interactors/nonceHandlerV3/transactionQueue.go new file mode 100644 index 00000000..d95976a9 --- /dev/null +++ b/interactors/nonceHandlerV3/transactionQueue.go @@ -0,0 +1,82 @@ +package nonceHandlerV3 + +import ( + "container/heap" + + "github.com/multiversx/mx-chain-core-go/data/transaction" +) + +type TransactionQueue []*transaction.FrontendTransaction + +// Push required by the heap.Interface +func (tq *TransactionQueue) Push(newTx interface{}) { + tx := newTx.(*transaction.FrontendTransaction) + *tq = append(*tq, tx) +} + +// Pop required by the heap.Interface +func (tq *TransactionQueue) Pop() interface{} { + currentQueue := *tq + n := len(currentQueue) + tx := currentQueue[n-1] + *tq = currentQueue[0 : n-1] + return tx +} + +// Len required by sort.Interface +func (tq *TransactionQueue) Len() int { + return len(*tq) +} + +// Swap required by the sort.Interface +func (tq TransactionQueue) Swap(a, b int) { + tq[a], tq[b] = tq[b], tq[a] +} + +// Less required by the sort.Interface +// we flip the comparer (to greater than) because we need +// the comparer to sort by highest prio, not lowest +func (tq TransactionQueue) Less(a, b int) bool { + return tq[a].Nonce < tq[b].Nonce +} + +type TransactionQueueHandler struct { + tq TransactionQueue +} + +func NewTransactionQueueHandler() *TransactionQueueHandler { + th := &TransactionQueueHandler{tq: make(TransactionQueue, 0)} + heap.Init(&th.tq) + return th +} + +func (th *TransactionQueueHandler) AddTransaction(transaction *transaction.FrontendTransaction) { + heap.Push(&th.tq, transaction) +} + +func (th *TransactionQueueHandler) NextTransaction() *transaction.FrontendTransaction { + if th.tq.Len() == 0 { + return nil + } + + nextTransaction := heap.Pop(&th.tq) + return nextTransaction.(*transaction.FrontendTransaction) +} + +func (th *TransactionQueueHandler) Len() int { + return th.tq.Len() +} + +func (th *TransactionQueueHandler) SearchForNonce(nonce uint64) *transaction.FrontendTransaction { + if th.tq.Len() == 0 { + return nil + } + + for i, _ := range th.tq { + if th.tq[i].Nonce == nonce { + return th.tq[i] + } + } + + return nil +}