From e62503bfc21c7ba5c76d681edf6751e00674e94e Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 27 Jan 2025 16:40:47 -0600 Subject: [PATCH] refactor: send RequestNext in batches for chainsync pipelining (#843) We now send pipelined RequestNext messages in batches instead of an initial batch and subsequent single messages. This potentially improves performance and reduces pressure on mini-protocol buffers --- protocol/chainsync/client.go | 53 +++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/protocol/chainsync/client.go b/protocol/chainsync/client.go index 5755614a..3e474564 100644 --- a/protocol/chainsync/client.go +++ b/protocol/chainsync/client.go @@ -28,12 +28,13 @@ import ( // Client implements the ChainSync client type Client struct { *protocol.Protocol - config *Config - callbackContext CallbackContext - busyMutex sync.Mutex - readyForNextBlockChan chan bool - onceStart sync.Once - onceStop sync.Once + config *Config + callbackContext CallbackContext + busyMutex sync.Mutex + readyForNextBlockChan chan bool + onceStart sync.Once + onceStop sync.Once + syncPipelinedRequestNext int // waitingForCurrentTipChan will process all the requests for the current tip until the channel // is empty. @@ -404,8 +405,8 @@ func (c *Client) Sync(intersectPoints []common.Point) error { } intersectResultChan, cancel := c.wantIntersectFound() - msg := NewMsgFindIntersect(intersectPoints) - if err := c.SendMessage(msg); err != nil { + msgFindIntersect := NewMsgFindIntersect(intersectPoints) + if err := c.SendMessage(msgFindIntersect); err != nil { cancel() return err } @@ -418,14 +419,14 @@ func (c *Client) Sync(intersectPoints []common.Point) error { } } - // Pipeline the initial block requests to speed things up a bit - // Using a value higher than 10 seems to cause problems with NtN - for i := 0; i <= c.config.PipelineLimit; i++ { - msg := NewMsgRequestNext() - if err := c.SendMessage(msg); err != nil { - return err - } + // Send initial RequestNext + msgRequestNext := NewMsgRequestNext() + if err := c.SendMessage(msgRequestNext); err != nil { + return err } + // Reset pipelined message counter + c.syncPipelinedRequestNext = 0 + // Start sync loop go c.syncLoop() return nil } @@ -441,15 +442,23 @@ func (c *Client) syncLoop() { return } c.busyMutex.Lock() - // Request the next block - // In practice we already have multiple block requests pipelined - // and this just adds another one to the pile - msg := NewMsgRequestNext() - if err := c.SendMessage(msg); err != nil { - c.SendError(err) + // Wait for next block if we have pipelined messages + if c.syncPipelinedRequestNext > 0 { + c.syncPipelinedRequestNext-- c.busyMutex.Unlock() - return + continue + } + // Request the next block(s) + msgCount := max(c.config.PipelineLimit, 1) + for i := 0; i < msgCount; i++ { + msg := NewMsgRequestNext() + if err := c.SendMessage(msg); err != nil { + c.SendError(err) + c.busyMutex.Unlock() + return + } } + c.syncPipelinedRequestNext = msgCount - 1 c.busyMutex.Unlock() } }