From f090cdb15eecce616e93460ebabb59541eaeb668 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 12 Feb 2020 18:02:04 -0800 Subject: [PATCH 1/2] feat: debounce wants manually This: * Makes it easy to send immediately if we wait too long and/or if we have enough to send. * Is significantly more efficient than the debounce library as it doesn't spin off a bunch of "after" timers. fixes #245 This commit was moved from ipfs/go-bitswap@777c0d9ab790560b0813dd786e09d0d5b7299393 --- bitswap/internal/messagequeue/messagequeue.go | 61 ++++++++++++++----- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 15f8100d2..4610a95b2 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -6,8 +6,6 @@ import ( "sync" "time" - debounce "github.com/bep/debounce" - bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" bsnet "github.com/ipfs/go-bitswap/network" @@ -34,6 +32,11 @@ const ( maxPriority = math.MaxInt32 // sendMessageDebounce is the debounce duration when calling sendMessage() sendMessageDebounce = time.Millisecond + // when we reach sendMessaageCuttoff wants/cancels, we'll send the message immediately. + sendMessageCuttoff = 100 + // when we debounce for more than sendMessageMaxDelay, we'll send the + // message immediately. + sendMessageMaxDelay = 100 * time.Millisecond ) // MessageNetwork is any network that can connect peers and generate a message @@ -54,9 +57,8 @@ type MessageQueue struct { maxMessageSize int sendErrorBackoff time.Duration - signalWorkReady func() - outgoingWork chan struct{} - done chan struct{} + outgoingWork chan time.Time + done chan struct{} // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, bcstWants: newRecallWantList(), peerWants: newRecallWantList(), cancels: cid.NewSet(), - outgoingWork: make(chan struct{}, 1), + outgoingWork: make(chan time.Time, 1), done: make(chan struct{}), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, } - // Apply debounce to the work ready signal (which triggers sending a message) - debounced := debounce.New(sendMessageDebounce) - mq.signalWorkReady = func() { debounced(mq.onWorkReady) } - return mq } @@ -285,11 +283,42 @@ func (mq *MessageQueue) onShutdown() { func (mq *MessageQueue) runQueue() { defer mq.onShutdown() + // Create a timer for debouncing scheduled work. + scheduleWork := time.NewTimer(0) + if !scheduleWork.Stop() { + <-scheduleWork.C + } + + var workScheduled time.Time for { select { case <-mq.rebroadcastTimer.C: mq.rebroadcastWantlist() - case <-mq.outgoingWork: + case when := <-mq.outgoingWork: + // If we have work scheduled, cancel the timer. If we + // don't, record when the work was scheduled. + // We send the time on the channel so we accurately + // track delay. + if workScheduled.IsZero() { + workScheduled = when + } else if !scheduleWork.Stop() { + <-scheduleWork.C + } + + // If we have too many updates and/or we've waited too + // long, send immediately. + if mq.pendingWorkCount() > sendMessageCuttoff || + time.Since(workScheduled) >= sendMessageMaxDelay { + mq.sendIfReady() + workScheduled = time.Time{} + } else { + // Otherwise, extend the timer. + scheduleWork.Reset(sendMessageDebounce) + } + case <-scheduleWork.C: + // We have work scheduled and haven't seen any updates + // in sendMessageDebounce. Send immediately. + workScheduled = time.Time{} mq.sendIfReady() case <-mq.done: if mq.sender != nil { @@ -335,9 +364,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool { return true } -func (mq *MessageQueue) onWorkReady() { +func (mq *MessageQueue) signalWorkReady() { select { - case mq.outgoingWork <- struct{}{}: + case mq.outgoingWork <- time.Now(): default: } } @@ -443,10 +472,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) { // } func (mq *MessageQueue) hasPendingWork() bool { + return mq.pendingWorkCount() > 0 +} + +func (mq *MessageQueue) pendingWorkCount() int { mq.wllock.Lock() defer mq.wllock.Unlock() - return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0 + return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len() } func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { From 4017e538142e87edd9a9108a7e10e2b0466e56e7 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 13 Feb 2020 12:01:52 -0500 Subject: [PATCH 2/2] refactor: adjust message queue debounce limits This commit was moved from ipfs/go-bitswap@7ccab36f6a6e3038d94ef11b60d645b1de442feb --- bitswap/internal/messagequeue/messagequeue.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/bitswap/internal/messagequeue/messagequeue.go b/bitswap/internal/messagequeue/messagequeue.go index 4610a95b2..e60d52c3d 100644 --- a/bitswap/internal/messagequeue/messagequeue.go +++ b/bitswap/internal/messagequeue/messagequeue.go @@ -32,11 +32,11 @@ const ( maxPriority = math.MaxInt32 // sendMessageDebounce is the debounce duration when calling sendMessage() sendMessageDebounce = time.Millisecond - // when we reach sendMessaageCuttoff wants/cancels, we'll send the message immediately. - sendMessageCuttoff = 100 + // when we reach sendMessageCutoff wants/cancels, we'll send the message immediately. + sendMessageCutoff = 256 // when we debounce for more than sendMessageMaxDelay, we'll send the // message immediately. - sendMessageMaxDelay = 100 * time.Millisecond + sendMessageMaxDelay = 20 * time.Millisecond ) // MessageNetwork is any network that can connect peers and generate a message @@ -286,6 +286,8 @@ func (mq *MessageQueue) runQueue() { // Create a timer for debouncing scheduled work. scheduleWork := time.NewTimer(0) if !scheduleWork.Stop() { + // Need to drain the timer if Stop() returns false + // See: https://golang.org/pkg/time/#Timer.Stop <-scheduleWork.C } @@ -302,12 +304,13 @@ func (mq *MessageQueue) runQueue() { if workScheduled.IsZero() { workScheduled = when } else if !scheduleWork.Stop() { + // Need to drain the timer if Stop() returns false <-scheduleWork.C } // If we have too many updates and/or we've waited too // long, send immediately. - if mq.pendingWorkCount() > sendMessageCuttoff || + if mq.pendingWorkCount() > sendMessageCutoff || time.Since(workScheduled) >= sendMessageMaxDelay { mq.sendIfReady() workScheduled = time.Time{}