diff --git a/adnl/adnl.go b/adnl/adnl.go index b2c0d76..61a2587 100644 --- a/adnl/adnl.go +++ b/adnl/adnl.go @@ -9,8 +9,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/xssnick/tonutils-go/adnl/address" - "github.com/xssnick/tonutils-go/tl" "log" "reflect" "strings" @@ -18,6 +16,9 @@ import ( "sync/atomic" "time" "unsafe" + + "github.com/xssnick/tonutils-go/adnl/address" + "github.com/xssnick/tonutils-go/tl" ) const ( @@ -438,7 +439,6 @@ func (a *ADNL) Query(ctx context.Context, req, result tl.Serializable) error { } res := make(chan tl.Serializable, 1) - reqID := hex.EncodeToString(q.ID) a.mx.Lock() @@ -452,10 +452,12 @@ reSplit: a.mx.Lock() delete(a.activeQueries, reqID) a.mx.Unlock() - return fmt.Errorf("request failed: %w", err) } + timer := time.NewTimer(250 * time.Millisecond) + defer timer.Stop() + for { for i, packet := range packets { if err = a.send(packet); err != nil { @@ -467,6 +469,8 @@ reSplit: } } + timer.Reset(250 * time.Millisecond) + select { case resp := <-res: if err, ok := resp.(error); ok { @@ -476,7 +480,7 @@ reSplit: return nil case <-ctx.Done(): return fmt.Errorf("deadline exceeded, addr %s %s, err: %w", a.addr, hex.EncodeToString(a.peerKey), ctx.Err()) - case <-time.After(250 * time.Millisecond): + case <-timer.C: } } } diff --git a/adnl/adnl_test.go b/adnl/adnl_test.go index 008a5b8..2e38fdc 100644 --- a/adnl/adnl_test.go +++ b/adnl/adnl_test.go @@ -4,9 +4,10 @@ import ( "context" "crypto/ed25519" "fmt" - "github.com/xssnick/tonutils-go/tl" "testing" "time" + + "github.com/xssnick/tonutils-go/tl" ) func init() { @@ -150,9 +151,12 @@ func TestADNL_ClientServer(t *testing.T) { t.Fatal(err) } + timer := time.NewTimer(150 * time.Millisecond) + defer timer.Stop() + select { case <-gotSrvDiscon: - case <-time.After(150 * time.Millisecond): + case <-timer.C: t.Fatal("disconnect not triggered on server") } })*/ @@ -163,18 +167,24 @@ func TestADNL_ClientServer(t *testing.T) { t.Fatal(err) } + timer := time.NewTimer(150 * time.Millisecond) + defer timer.Stop() + select { case <-gotSrvCustom: - case <-time.After(150 * time.Millisecond): + case <-timer.C: t.Fatal("custom not received from client") } + timer = time.NewTimer(150 * time.Millisecond) + defer timer.Stop() + select { case m := <-gotCliCustom: if len(m.(TestMsg).Data) != 1280 { t.Fatal("invalid custom from server") } - case <-time.After(150 * time.Millisecond): + case <-timer.C: t.Fatal("custom not received from server") } }) @@ -190,18 +200,24 @@ func TestADNL_ClientServer(t *testing.T) { t.Fatal(err) } + timer := time.NewTimer(150 * time.Millisecond) + defer timer.Stop() + select { case <-gotSrvCustom: - case <-time.After(150 * time.Millisecond): + case <-timer.C: t.Fatal("custom not received from client") } + timer = time.NewTimer(150 * time.Millisecond) + defer timer.Stop() + select { case m := <-gotCliCustom2: if len(m.(TestMsg).Data) != 1280 { t.Fatal("invalid custom from server") } - case <-time.After(150 * time.Millisecond): + case <-timer.C: t.Fatal("custom not received from server") } }) diff --git a/adnl/rldp/client.go b/adnl/rldp/client.go index 3c4e900..b00acb6 100644 --- a/adnl/rldp/client.go +++ b/adnl/rldp/client.go @@ -6,14 +6,15 @@ import ( "crypto/rand" "errors" "fmt" - "github.com/xssnick/raptorq" - "github.com/xssnick/tonutils-go/adnl" - "github.com/xssnick/tonutils-go/tl" "log" "reflect" "sync" "sync/atomic" "time" + + "github.com/xssnick/raptorq" + "github.com/xssnick/tonutils-go/adnl" + "github.com/xssnick/tonutils-go/tl" ) type ADNL interface { @@ -404,12 +405,14 @@ func (r *RLDP) recoverySender() { timedOut := make([]*activeTransfer, 0, 128) timedOutReq := make([]*activeRequest, 0, 128) closerCtx := r.adnl.GetCloserCtx() + ticker := time.NewTicker(1 * time.Millisecond) + defer ticker.Stop() for { select { case <-closerCtx.Done(): return - case <-time.After(1 * time.Millisecond): + case <-ticker.C: packets = packets[:0] transfersToProcess = transfersToProcess[:0] timedOut = timedOut[:0] diff --git a/liteclient/connection.go b/liteclient/connection.go index 73a06cb..3dd723e 100644 --- a/liteclient/connection.go +++ b/liteclient/connection.go @@ -336,11 +336,14 @@ func (n *connection) listen(connResult chan<- error) { func (n *connection) startPings(every time.Duration) { // TODO: do without goroutines + ticker := time.NewTicker(every) + defer ticker.Stop() + for { select { case <-n.pool.globalCtx.Done(): return - case <-time.After(every): + case <-ticker.C: } num, err := rand.Int(rand.Reader, new(big.Int).SetInt64(0xFFFFFFFFFFFFFFF)) @@ -348,11 +351,9 @@ func (n *connection) startPings(every time.Duration) { continue } - err = n.ping(num.Int64()) - if err != nil { + if err := n.ping(num.Int64()); err != nil { // force close in case of error _ = n.tcp.Close() - break } } diff --git a/ton/transactions.go b/ton/transactions.go index 0814c02..b3f6539 100644 --- a/ton/transactions.go +++ b/ton/transactions.go @@ -177,16 +177,17 @@ func (c *APIClient) GetTransaction(ctx context.Context, block *BlockIDExt, addr } func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *address.Address, lastProcessedLT uint64, channel chan<- *tlb.Transaction) { - defer func() { - close(channel) - }() + defer close(channel) wait := 0 * time.Second + timer := time.NewTimer(wait) + defer timer.Stop() + for { select { case <-workerCtx.Done(): return - case <-time.After(wait): + case <-timer.C: } wait = 3 * time.Second @@ -194,6 +195,7 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add master, err := c.CurrentMasterchainInfo(ctx) cancel() if err != nil { + timer.Reset(wait) continue } @@ -201,15 +203,18 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add acc, err := c.GetAccount(ctx, master, addr) cancel() if err != nil { + timer.Reset(wait) continue } if !acc.IsActive || acc.LastTxLT == 0 { // no transactions + timer.Reset(wait) continue } if lastProcessedLT == acc.LastTxLT { // already processed all + timer.Reset(wait) continue } @@ -217,12 +222,15 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add lastHash, lastLT := acc.LastTxHash, acc.LastTxLT waitList := 0 * time.Second + listTimer := time.NewTimer(waitList) + list: for { select { case <-workerCtx.Done(): + listTimer.Stop() return - case <-time.After(waitList): + case <-listTimer.C: } if lastLT == 0 { @@ -236,12 +244,14 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add if err != nil { if lsErr, ok := err.(LSError); ok && lsErr.Code == -400 { // lt not in db error + listTimer.Stop() return } else if errors.Is(err, ErrNoTransactionsWereFound) && (len(transactions) > 0) { // process already found transactions break } waitList = 3 * time.Second + listTimer.Reset(waitList) continue } @@ -264,8 +274,11 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add lastLT, lastHash = res[len(res)-1].PrevTxLT, res[len(res)-1].PrevTxHash transactions = append(transactions, res...) waitList = 0 * time.Second + listTimer.Reset(waitList) } + listTimer.Stop() + if len(transactions) > 0 { lastProcessedLT = transactions[0].LT // mark last transaction as known to not trigger twice @@ -280,6 +293,8 @@ func (c *APIClient) SubscribeOnTransactions(workerCtx context.Context, addr *add wait = 0 * time.Second } + + timer.Reset(wait) } }