Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Timer Handling and Prevent Memory Leaks #297

Merged
merged 7 commits into from
Feb 7, 2025
14 changes: 9 additions & 5 deletions adnl/adnl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/xssnick/tonutils-go/adnl/address"
"github.com/xssnick/tonutils-go/tl"
"log"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/xssnick/tonutils-go/adnl/address"
"github.com/xssnick/tonutils-go/tl"
)

const (
Expand Down Expand Up @@ -438,7 +439,6 @@ func (a *ADNL) Query(ctx context.Context, req, result tl.Serializable) error {
}

res := make(chan tl.Serializable, 1)

reqID := hex.EncodeToString(q.ID)

a.mx.Lock()
Expand All @@ -452,10 +452,12 @@ reSplit:
a.mx.Lock()
delete(a.activeQueries, reqID)
a.mx.Unlock()

return fmt.Errorf("request failed: %w", err)
}

timer := time.NewTimer(250 * time.Millisecond)
defer timer.Stop()

for {
for i, packet := range packets {
if err = a.send(packet); err != nil {
Expand All @@ -467,6 +469,8 @@ reSplit:
}
}

timer.Reset(250 * time.Millisecond)

select {
case resp := <-res:
if err, ok := resp.(error); ok {
Expand All @@ -476,7 +480,7 @@ reSplit:
return nil
case <-ctx.Done():
return fmt.Errorf("deadline exceeded, addr %s %s, err: %w", a.addr, hex.EncodeToString(a.peerKey), ctx.Err())
case <-time.After(250 * time.Millisecond):
case <-timer.C:
}
}
}
Expand Down
28 changes: 22 additions & 6 deletions adnl/adnl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"crypto/ed25519"
"fmt"
"github.com/xssnick/tonutils-go/tl"
"testing"
"time"

"github.com/xssnick/tonutils-go/tl"
)

func init() {
Expand Down Expand Up @@ -150,9 +151,12 @@ func TestADNL_ClientServer(t *testing.T) {
t.Fatal(err)
}

timer := time.NewTimer(150 * time.Millisecond)
defer timer.Stop()

select {
case <-gotSrvDiscon:
case <-time.After(150 * time.Millisecond):
case <-timer.C:
t.Fatal("disconnect not triggered on server")
}
})*/
Expand All @@ -163,18 +167,24 @@ func TestADNL_ClientServer(t *testing.T) {
t.Fatal(err)
}

timer := time.NewTimer(150 * time.Millisecond)
defer timer.Stop()

select {
case <-gotSrvCustom:
case <-time.After(150 * time.Millisecond):
case <-timer.C:
t.Fatal("custom not received from client")
}

timer = time.NewTimer(150 * time.Millisecond)
defer timer.Stop()

select {
case m := <-gotCliCustom:
if len(m.(TestMsg).Data) != 1280 {
t.Fatal("invalid custom from server")
}
case <-time.After(150 * time.Millisecond):
case <-timer.C:
t.Fatal("custom not received from server")
}
})
Expand All @@ -190,18 +200,24 @@ func TestADNL_ClientServer(t *testing.T) {
t.Fatal(err)
}

timer := time.NewTimer(150 * time.Millisecond)
defer timer.Stop()

select {
case <-gotSrvCustom:
case <-time.After(150 * time.Millisecond):
case <-timer.C:
t.Fatal("custom not received from client")
}

timer = time.NewTimer(150 * time.Millisecond)
defer timer.Stop()

select {
case m := <-gotCliCustom2:
if len(m.(TestMsg).Data) != 1280 {
t.Fatal("invalid custom from server")
}
case <-time.After(150 * time.Millisecond):
case <-timer.C:
t.Fatal("custom not received from server")
}
})
Expand Down
11 changes: 7 additions & 4 deletions adnl/rldp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/xssnick/raptorq"
"github.com/xssnick/tonutils-go/adnl"
"github.com/xssnick/tonutils-go/tl"
"log"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/xssnick/raptorq"
"github.com/xssnick/tonutils-go/adnl"
"github.com/xssnick/tonutils-go/tl"
)

type ADNL interface {
Expand Down Expand Up @@ -404,12 +405,14 @@ func (r *RLDP) recoverySender() {
timedOut := make([]*activeTransfer, 0, 128)
timedOutReq := make([]*activeRequest, 0, 128)
closerCtx := r.adnl.GetCloserCtx()
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-closerCtx.Done():
return
case <-time.After(1 * time.Millisecond):
case <-ticker.C:
packets = packets[:0]
transfersToProcess = transfersToProcess[:0]
timedOut = timedOut[:0]
Expand Down
9 changes: 5 additions & 4 deletions liteclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,23 +336,24 @@ func (n *connection) listen(connResult chan<- error) {

func (n *connection) startPings(every time.Duration) {
// TODO: do without goroutines
ticker := time.NewTicker(every)
defer ticker.Stop()

for {
select {
case <-n.pool.globalCtx.Done():
return
case <-time.After(every):
case <-ticker.C:
}

num, err := rand.Int(rand.Reader, new(big.Int).SetInt64(0xFFFFFFFFFFFFFFF))
if err != nil {
continue
}

err = n.ping(num.Int64())
if err != nil {
if err := n.ping(num.Int64()); err != nil {
// force close in case of error
_ = n.tcp.Close()

break
}
}
Expand Down
25 changes: 20 additions & 5 deletions ton/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,52 +177,60 @@ func (c *APIClient) GetTransaction(ctx context.Context, block *BlockIDExt, addr
}

func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *address.Address, lastProcessedLT uint64, channel chan<- *tlb.Transaction) {
defer func() {
close(channel)
}()
defer close(channel)

wait := 0 * time.Second
timer := time.NewTimer(wait)
defer timer.Stop()

for {
select {
case <-workerCtx.Done():
return
case <-time.After(wait):
case <-timer.C:
}
wait = 3 * time.Second

ctx, cancel := context.WithTimeout(workerCtx, 10*time.Second)
master, err := c.CurrentMasterchainInfo(ctx)
cancel()
if err != nil {
timer.Reset(wait)
continue
}

ctx, cancel = context.WithTimeout(workerCtx, 10*time.Second)
acc, err := c.GetAccount(ctx, master, addr)
cancel()
if err != nil {
timer.Reset(wait)
continue
}
if !acc.IsActive || acc.LastTxLT == 0 {
// no transactions
timer.Reset(wait)
continue
}

if lastProcessedLT == acc.LastTxLT {
// already processed all
timer.Reset(wait)
continue
}

var transactions []*tlb.Transaction
lastHash, lastLT := acc.LastTxHash, acc.LastTxLT

waitList := 0 * time.Second
listTimer := time.NewTimer(waitList)

list:
for {
select {
case <-workerCtx.Done():
listTimer.Stop()
return
case <-time.After(waitList):
case <-listTimer.C:
}

if lastLT == 0 {
Expand All @@ -236,12 +244,14 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add
if err != nil {
if lsErr, ok := err.(LSError); ok && lsErr.Code == -400 {
// lt not in db error
listTimer.Stop()
return
} else if errors.Is(err, ErrNoTransactionsWereFound) && (len(transactions) > 0) {
// process already found transactions
break
}
waitList = 3 * time.Second
listTimer.Reset(waitList)
continue
}

Expand All @@ -264,8 +274,11 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add
lastLT, lastHash = res[len(res)-1].PrevTxLT, res[len(res)-1].PrevTxHash
transactions = append(transactions, res...)
waitList = 0 * time.Second
listTimer.Reset(waitList)
}

listTimer.Stop()

if len(transactions) > 0 {
lastProcessedLT = transactions[0].LT // mark last transaction as known to not trigger twice

Expand All @@ -280,6 +293,8 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add

wait = 0 * time.Second
}

timer.Reset(wait)
}
}

Expand Down