refactor: send RequestNext in batches for chainsync pipelining (#843)
We now send pipelined RequestNext messages in batches instead of an
initial batch followed by single messages. This potentially improves
performance and reduces pressure on mini-protocol buffers.
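
A minimal sketch of the batching idea described above (illustrative only, not the library code; pipelinedClient, onBlockProcessed, and sendRequestNext are placeholder names): once the counter of outstanding pipelined requests reaches zero, a full batch of RequestNext messages is sent, and the counter records how many of the resulting responses can be consumed before the next batch is needed.

package main

import "fmt"

// pipelinedClient is an illustrative stand-in for the ChainSync client state;
// pipelineLimit mirrors Config.PipelineLimit and pipelinedCount mirrors the
// new syncPipelinedRequestNext field from the diff below.
type pipelinedClient struct {
    pipelineLimit  int
    pipelinedCount int
}

// onBlockProcessed models one pass through the sync loop after a block has
// been handled: consume an outstanding pipelined response if one remains,
// otherwise send a fresh batch of RequestNext messages.
func (c *pipelinedClient) onBlockProcessed(sendRequestNext func() error) error {
    if c.pipelinedCount > 0 {
        c.pipelinedCount--
        return nil
    }
    batch := c.pipelineLimit
    if batch < 1 {
        batch = 1
    }
    for i := 0; i < batch; i++ {
        if err := sendRequestNext(); err != nil {
            return err
        }
    }
    // Of the batch's responses, batch-1 can be consumed without sending
    // anything further; the last one will trigger the next batch.
    c.pipelinedCount = batch - 1
    return nil
}

func main() {
    c := &pipelinedClient{pipelineLimit: 3}
    sent := 0
    send := func() error { sent++; return nil }
    for block := 0; block < 7; block++ {
        if err := c.onBlockProcessed(send); err != nil {
            panic(err)
        }
    }
    fmt.Printf("processed 7 blocks, sent %d RequestNext messages\n", sent)
}
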
agaffney authored Jan 27, 2025
1 parent 4810693 commit e62503b
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions protocol/chainsync/client.go
@@ -28,12 +28,13 @@ import (
 // Client implements the ChainSync client
 type Client struct {
     *protocol.Protocol
-    config                *Config
-    callbackContext       CallbackContext
-    busyMutex             sync.Mutex
-    readyForNextBlockChan chan bool
-    onceStart             sync.Once
-    onceStop              sync.Once
+    config                   *Config
+    callbackContext          CallbackContext
+    busyMutex                sync.Mutex
+    readyForNextBlockChan    chan bool
+    onceStart                sync.Once
+    onceStop                 sync.Once
+    syncPipelinedRequestNext int

     // waitingForCurrentTipChan will process all the requests for the current tip until the channel
     // is empty.
@@ -404,8 +405,8 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
     }

     intersectResultChan, cancel := c.wantIntersectFound()
-    msg := NewMsgFindIntersect(intersectPoints)
-    if err := c.SendMessage(msg); err != nil {
+    msgFindIntersect := NewMsgFindIntersect(intersectPoints)
+    if err := c.SendMessage(msgFindIntersect); err != nil {
         cancel()
         return err
     }
@@ -418,14 +419,14 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
         }
     }

-    // Pipeline the initial block requests to speed things up a bit
-    // Using a value higher than 10 seems to cause problems with NtN
-    for i := 0; i <= c.config.PipelineLimit; i++ {
-        msg := NewMsgRequestNext()
-        if err := c.SendMessage(msg); err != nil {
-            return err
-        }
+    // Send initial RequestNext
+    msgRequestNext := NewMsgRequestNext()
+    if err := c.SendMessage(msgRequestNext); err != nil {
+        return err
     }
+    // Reset pipelined message counter
+    c.syncPipelinedRequestNext = 0
     // Start sync loop
     go c.syncLoop()
     return nil
 }
@@ -441,15 +442,23 @@ func (c *Client) syncLoop() {
             return
         }
         c.busyMutex.Lock()
-        // Request the next block
-        // In practice we already have multiple block requests pipelined
-        // and this just adds another one to the pile
-        msg := NewMsgRequestNext()
-        if err := c.SendMessage(msg); err != nil {
-            c.SendError(err)
+        // Wait for next block if we have pipelined messages
+        if c.syncPipelinedRequestNext > 0 {
+            c.syncPipelinedRequestNext--
             c.busyMutex.Unlock()
-            return
+            continue
         }
+        // Request the next block(s)
+        msgCount := max(c.config.PipelineLimit, 1)
+        for i := 0; i < msgCount; i++ {
+            msg := NewMsgRequestNext()
+            if err := c.SendMessage(msg); err != nil {
+                c.SendError(err)
+                c.busyMutex.Unlock()
+                return
+            }
+        }
+        c.syncPipelinedRequestNext = msgCount - 1
         c.busyMutex.Unlock()
     }
 }
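
As a rough illustration of the resulting cadence (an assumption based on the diff above, not text from the commit): with PipelineLimit = 3, the loop sends a batch of three RequestNext messages once every three processed blocks, while PipelineLimit = 0 falls back to one message per block thanks to the max(c.config.PipelineLimit, 1) guard. The standalone sketch below (Go 1.21+ for the max builtin) reproduces that counter arithmetic:

package main

import "fmt"

func main() {
    for _, limit := range []int{0, 3} {
        counter := 0
        var batchBlocks []int
        for block := 1; block <= 6; block++ {
            if counter > 0 {
                // A pipelined RequestNext is already outstanding for this block
                counter--
                continue
            }
            // Same guard as the new syncLoop: always send at least one message
            batch := max(limit, 1)
            batchBlocks = append(batchBlocks, block)
            counter = batch - 1
        }
        fmt.Printf("PipelineLimit=%d: batches sent while handling blocks %v\n",
            limit, batchBlocks)
    }
}
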
