Skip to content

Commit

Permalink
fix: replace time.After with time.NewTimer and time.NewTicker for imp…
Browse files Browse the repository at this point in the history
…roved timer handling to avoid memory leaks
  • Loading branch information
iw4p committed Feb 6, 2025
1 parent 0b72bbf commit 8dda34c
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions ton/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/xssnick/tonutils-go/address"
"github.com/xssnick/tonutils-go/tl"
"github.com/xssnick/tonutils-go/tlb"
"github.com/xssnick/tonutils-go/tvm/cell"
"strings"
"time"
)

func init() {
Expand Down Expand Up @@ -176,52 +177,60 @@ func (c *APIClient) GetTransaction(ctx context.Context, block *BlockIDExt, addr
}

func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *address.Address, lastProcessedLT uint64, channel chan<- *tlb.Transaction) {
defer func() {
close(channel)
}()
defer close(channel)

wait := 0 * time.Second
timer := time.NewTimer(wait)
defer timer.Stop()

for {
select {
case <-workerCtx.Done():
return
case <-time.After(wait):
case <-timer.C:
}
wait = 3 * time.Second

ctx, cancel := context.WithTimeout(workerCtx, 10*time.Second)
master, err := c.CurrentMasterchainInfo(ctx)
cancel()
if err != nil {
timer.Reset(wait)
continue
}

ctx, cancel = context.WithTimeout(workerCtx, 10*time.Second)
acc, err := c.GetAccount(ctx, master, addr)
cancel()
if err != nil {
timer.Reset(wait)
continue
}
if !acc.IsActive || acc.LastTxLT == 0 {
// no transactions
timer.Reset(wait)
continue
}

if lastProcessedLT == acc.LastTxLT {
// already processed all
timer.Reset(wait)
continue
}

var transactions []*tlb.Transaction
lastHash, lastLT := acc.LastTxHash, acc.LastTxLT

waitList := 0 * time.Second
listTimer := time.NewTimer(waitList)

list:
for {
select {
case <-workerCtx.Done():
listTimer.Stop()
return
case <-time.After(waitList):
case <-listTimer.C:
}

ctx, cancel = context.WithTimeout(workerCtx, 10*time.Second)
Expand All @@ -230,9 +239,11 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add
if err != nil {
if lsErr, ok := err.(LSError); ok && lsErr.Code == -400 {
// lt not in db error
listTimer.Stop()
return
}
waitList = 3 * time.Second
listTimer.Reset(waitList)
continue
}

Expand All @@ -255,8 +266,11 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add
lastLT, lastHash = res[len(res)-1].PrevTxLT, res[len(res)-1].PrevTxHash
transactions = append(transactions, res...)
waitList = 0 * time.Second
listTimer.Reset(waitList)
}

listTimer.Stop()

if len(transactions) > 0 {
lastProcessedLT = transactions[0].LT // mark last transaction as known to not trigger twice

Expand All @@ -271,6 +285,8 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add

wait = 0 * time.Second
}

timer.Reset(wait)
}
}

Expand Down

0 comments on commit 8dda34c

Please sign in to comment.