Skip to content

Commit

Permalink
optimizations and improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
cristure committed Jan 31, 2024
1 parent 0bbcca7 commit 15b0037
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 52 deletions.
5 changes: 3 additions & 2 deletions interactors/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type AddressNonceHandlerV3 interface {
ApplyNonceAndGasPrice(ctx context.Context, tx ...*transaction.FrontendTransaction) error
SendTransaction(ctx context.Context, tx *transaction.FrontendTransaction) (string, error)
IsInterfaceNil() bool
Close()
}

// TransactionNonceHandlerV1 defines the component able to manage transaction nonces
Expand All @@ -67,8 +68,8 @@ type TransactionNonceHandlerV2 interface {

// TransactionNonceHandlerV3 defines the component able to apply nonce for a given frontend transaction.
type TransactionNonceHandlerV3 interface {
ApplyNonceAndGasPrice(ctx context.Context, address core.AddressHandler, tx ...*transaction.FrontendTransaction) error
ApplyNonceAndGasPrice(ctx context.Context, tx ...*transaction.FrontendTransaction) error
SendTransactions(ctx context.Context, txs ...*transaction.FrontendTransaction) ([]string, error)
Close() error
Close()
IsInterfaceNil() bool
}
23 changes: 13 additions & 10 deletions interactors/nonceHandlerV3/addressNonceHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,26 @@ type addressNonceHandler struct {
proxy interactors.Proxy
gasPrice uint64
transactionWorker *workers.TransactionWorker
parentContext context.Context
cancelFunc func()
}

// NewAddressNonceHandlerV3 returns a new instance of a addressNonceHandler
func NewAddressNonceHandlerV3(parentContext context.Context, proxy interactors.Proxy, address sdkCore.AddressHandler, pollingInterval time.Duration) (interactors.AddressNonceHandlerV3, error) {
func NewAddressNonceHandlerV3(proxy interactors.Proxy, address sdkCore.AddressHandler, pollingInterval time.Duration) (*addressNonceHandler, error) {
if check.IfNil(proxy) {
return nil, interactors.ErrNilProxy
}
if check.IfNil(address) {
return nil, interactors.ErrNilAddress
}

ctx, cancelFunc := context.WithCancel(context.Background())

anh := &addressNonceHandler{
mut: sync.Mutex{},
address: address,
proxy: proxy,
transactionWorker: workers.NewTransactionWorker(parentContext, proxy, pollingInterval),
parentContext: parentContext,
transactionWorker: workers.NewTransactionWorker(ctx, proxy, pollingInterval),
cancelFunc: cancelFunc,
}

return anh, nil
Expand All @@ -59,7 +61,7 @@ func (anh *addressNonceHandler) ApplyNonceAndGasPrice(ctx context.Context, txs .
return err
}

anh.fetchGasPriceIfRequired(ctx, nonce)
anh.fetchGasPriceIfRequired(ctx)
tx.GasPrice = core.MaxUint64(anh.gasPrice, tx.GasPrice)
}

Expand All @@ -77,10 +79,6 @@ func (anh *addressNonceHandler) SendTransaction(ctx context.Context, tx *transac

case <-ctx.Done():
return "", ctx.Err()

case <-anh.parentContext.Done():
return "", anh.parentContext.Err()

}
}

Expand All @@ -89,7 +87,12 @@ func (anh *addressNonceHandler) IsInterfaceNil() bool {
return anh == nil
}

func (anh *addressNonceHandler) fetchGasPriceIfRequired(ctx context.Context, nonce uint64) {
// Close will cancel all related processes..
func (anh *addressNonceHandler) Close() {
anh.cancelFunc()
}

func (anh *addressNonceHandler) fetchGasPriceIfRequired(ctx context.Context) {
if anh.gasPrice == 0 {
networkConfig, err := anh.proxy.GetNetworkConfig(ctx)

Expand Down
22 changes: 9 additions & 13 deletions interactors/nonceHandlerV3/nonceTransactionsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ type nonceTransactionsHandlerV3 struct {
proxy interactors.Proxy
mutHandlers sync.RWMutex
handlers map[string]interactors.AddressNonceHandlerV3
context context.Context
cancelFunc func()
pollingInterval time.Duration
}

Expand All @@ -53,12 +51,9 @@ func NewNonceTransactionHandlerV3(args ArgsNonceTransactionsHandlerV3) (*nonceTr
return nil, fmt.Errorf("%w for pollingInterval in NewNonceTransactionHandlerV2", interactors.ErrInvalidValue)
}

ctx, cancelFunc := context.WithCancel(context.Background())
nth := &nonceTransactionsHandlerV3{
proxy: args.Proxy,
handlers: make(map[string]interactors.AddressNonceHandlerV3),
context: ctx,
cancelFunc: cancelFunc,
pollingInterval: args.PollingInterval,
}

Expand Down Expand Up @@ -122,7 +117,7 @@ func (nth *nonceTransactionsHandlerV3) createAddressNonceHandler(address core.Ad
return anh, nil
}

anh, err := NewAddressNonceHandlerV3(nth.context, nth.proxy, address, nth.pollingInterval)
anh, err := NewAddressNonceHandlerV3(nth.proxy, address, nth.pollingInterval)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,7 +146,6 @@ func (nth *nonceTransactionsHandlerV3) SendTransactions(ctx context.Context, txs
g, ctx := errgroup.WithContext(ctx)
sentHashes := make([]string, len(txs))
for i, tx := range txs {

if tx == nil {
return nil, interactors.ErrNilTransaction
}
Expand Down Expand Up @@ -184,17 +178,19 @@ func (nth *nonceTransactionsHandlerV3) SendTransactions(ctx context.Context, txs
}

if err := g.Wait(); err != nil {
return nil, err
return sentHashes, err
}

return sentHashes, nil
}

// Close finishes the workers resend go routine
func (nth *nonceTransactionsHandlerV3) Close() error {
nth.cancelFunc()

return nil
// Close will cancel all related processes.
func (nth *nonceTransactionsHandlerV3) Close() {
nth.mutHandlers.RLock()
defer nth.mutHandlers.RUnlock()
for _, handler := range nth.handlers {
handler.Close()
}
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
106 changes: 88 additions & 18 deletions interactors/nonceHandlerV3/nonceTransactionsHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@ import (
)

var testAddressAsBech32String = "erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht"
var testAddress, _ = data.NewAddressFromBech32String(testAddressAsBech32String)

func TestSendTransactionsOneByOne(t *testing.T) {
t.Parallel()

var sendTransactionCalled bool
var getAccountCalled bool

// Since the endpoint to send workers for the nonce-management-service has the same definition as the one
// in the gateway, we can create a proxy instance that points towards the nonce-management-service instead.
// The nonce-management-service will then, in turn send the workers to the gateway.
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(
&sendTransactionCalled, &getAccountCalled))
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled))
require.NoError(t, err, "failed to create transaction handler")

var txs []*transaction.FrontendTransaction
Expand Down Expand Up @@ -59,7 +56,6 @@ func TestSendTransactionsOneByOne(t *testing.T) {
h, err := transactionHandler.SendTransactions(context.Background(), tt)
require.NoError(t, err, "failed to send transaction")
require.Equal(t, []string{strconv.FormatUint(tt.Nonce, 10)}, h)
require.True(t, sendTransactionCalled, "send transaction was not called")
}(tt)
}
wg.Wait()
Expand All @@ -68,14 +64,12 @@ func TestSendTransactionsOneByOne(t *testing.T) {
func TestSendTransactionsBulk(t *testing.T) {
t.Parallel()

var sendTransactionCalled bool
var getAccountCalled bool

// Since the endpoint to send workers for the nonce-management-service has the same definition as the one
// in the gateway, we can create a proxy instance that points towards the nonce-management-service instead.
// The nonce-management-service will then, in turn send the workers to the gateway.
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(
&sendTransactionCalled, &getAccountCalled))
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled))
require.NoError(t, err, "failed to create transaction handler")

var txs []*transaction.FrontendTransaction
Expand All @@ -100,25 +94,23 @@ func TestSendTransactionsBulk(t *testing.T) {

txHashes, err := transactionHandler.SendTransactions(context.Background(), txs...)
require.NoError(t, err, "failed to send transactions as bulk")
require.Equal(t, mockedNonces(1000), txHashes)
require.True(t, sendTransactionCalled, "send transaction was not called")
require.Equal(t, mockedHashes(1000), txHashes)
}

func TestSendTransactionsClose(t *testing.T) {
func TestSendTransactionsCloseInstant(t *testing.T) {
t.Parallel()

var sendTransactionCalled bool
var getAccountCalled bool

// Since the endpoint to send workers for the nonce-management-service has the same definition as the one
// in the gateway, we can create a proxy instance that points towards the nonce-management-service instead.
// The nonce-management-service will then, in turn send the workers to the gateway.
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(
&sendTransactionCalled, &getAccountCalled))
transactionHandler, err := NewNonceTransactionHandlerV3(createMockArgsNonceTransactionsHandlerV3(&getAccountCalled))
require.NoError(t, err, "failed to create transaction handler")

var txs []*transaction.FrontendTransaction

// Create 1k transactions.
for i := 0; i < 1000; i++ {
tx := &transaction.FrontendTransaction{
Sender: testAddressAsBech32String,
Expand All @@ -133,28 +125,106 @@ func TestSendTransactionsClose(t *testing.T) {
txs = append(txs, tx)
}

// Apply nonce to them in a bulk.
err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...)
require.NoError(t, err, "failed to apply nonce")

// We only do this once, we check if the bool has been modified.
require.True(t, getAccountCalled, "get account was not called")

var wg sync.WaitGroup
wg.Add(1)
go func() {
hashes, err := transactionHandler.SendTransactions(context.Background(), txs...)
require.Empty(t, hashes, "no transaction should be processed")

// Since the close is almost instant, no transaction should be processed. Therefore the hashes should be blank.
require.Equal(t, make([]string, 1000), hashes, "no transaction should be processed")
require.Equal(t, "context canceled while sending transaction for address erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht", err.Error())
wg.Done()
}()

// Close the processes related to the transaction handler.
transactionHandler.Close()

wg.Wait()
require.NoError(t, err, "failed to send transactions as bulk")
}

func TestSendTransactionsCloseDelay(t *testing.T) {
t.Parallel()

var getAccountCalled bool

// Create another proxyStub that adds some delay when sending transactions.
mockArgs := ArgsNonceTransactionsHandlerV3{
Proxy: &testsCommon.ProxyStub{
SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) {
// Presume this operation is taking roughly 100 ms. Meaning 10 operations / second.
time.Sleep(100 * time.Millisecond)
return strconv.FormatUint(tx.Nonce, 10), nil
},
GetAccountCalled: func(address core.AddressHandler) (*data.Account, error) {
getAccountCalled = true
return &data.Account{}, nil
},
},
PollingInterval: time.Second * 5,
}

// Since the endpoint to send workers for the nonce-management-service has the same definition as the one
// in the gateway, we can create a proxy instance that points towards the nonce-management-service instead.
// The nonce-management-service will then, in turn send the workers to the gateway.
transactionHandler, err := NewNonceTransactionHandlerV3(mockArgs)
require.NoError(t, err, "failed to create transaction handler")

var txs []*transaction.FrontendTransaction

// Create 1k transactions.
for i := 0; i < 1000; i++ {
tx := &transaction.FrontendTransaction{
Sender: testAddressAsBech32String,
Receiver: testAddressAsBech32String,
GasLimit: 50000,
ChainID: "T",
Value: "5000000000000000000",
Nonce: uint64(i),
GasPrice: 1000000000,
Version: 2,
}
txs = append(txs, tx)
}

// Apply nonce to them in a bulk.
err = transactionHandler.ApplyNonceAndGasPrice(context.Background(), txs...)
require.NoError(t, err, "failed to apply nonce")

// We only do this once, we check if the bool has been modified.
require.True(t, getAccountCalled, "get account was not called")

var wg sync.WaitGroup
wg.Add(1)
go func() {
hashes, err := transactionHandler.SendTransactions(context.Background(), txs...)

// Since the close is not instant. There should be some hashes that have been processed.
require.NotEmpty(t, hashes, "no transaction should be processed")
require.Equal(t, "context canceled while sending transaction for address erd1zptg3eu7uw0qvzhnu009lwxupcn6ntjxptj5gaxt8curhxjqr9tsqpsnht", err.Error())
wg.Done()
}()

// Close the processes related to the transaction handler with a delay.
time.AfterFunc(2*time.Second, func() {
transactionHandler.Close()
})

wg.Wait()
require.NoError(t, err, "failed to send transactions as bulk")
}

func createMockArgsNonceTransactionsHandlerV3(sendTransactionWasCalled, getAccountCalled *bool) ArgsNonceTransactionsHandlerV3 {
func createMockArgsNonceTransactionsHandlerV3(getAccountCalled *bool) ArgsNonceTransactionsHandlerV3 {
return ArgsNonceTransactionsHandlerV3{
Proxy: &testsCommon.ProxyStub{
SendTransactionCalled: func(tx *transaction.FrontendTransaction) (string, error) {
*sendTransactionWasCalled = true
return strconv.FormatUint(tx.Nonce, 10), nil
},
GetAccountCalled: func(address core.AddressHandler) (*data.Account, error) {
Expand All @@ -166,7 +236,7 @@ func createMockArgsNonceTransactionsHandlerV3(sendTransactionWasCalled, getAccou
}
}

func mockedNonces(index int) []string {
func mockedHashes(index int) []string {
mock := make([]string, index)
for i := 0; i < index; i++ {
mock[i] = strconv.Itoa(i)
Expand Down
Loading

0 comments on commit 15b0037

Please sign in to comment.