From ca399f3d4499892f767bcd1a83c7bcccc9b65e24 Mon Sep 17 00:00:00 2001 From: kampadais Date: Thu, 3 Oct 2024 16:00:34 +0300 Subject: [PATCH] feat: redesign controller-replica communication Signed-off-by: kampadais --- pkg/dataconn/client.go | 163 +++++++---------------------------------- pkg/dataconn/server.go | 38 +++++++--- 2 files changed, 54 insertions(+), 147 deletions(-) diff --git a/pkg/dataconn/client.go b/pkg/dataconn/client.go index 280c448d5..54fc9b836 100644 --- a/pkg/dataconn/client.go +++ b/pkg/dataconn/client.go @@ -2,20 +2,14 @@ package dataconn import ( "errors" + "github.com/longhorn/longhorn-engine/pkg/types" + "github.com/sirupsen/logrus" "io" "net" - "time" - - "github.com/sirupsen/logrus" - - journal "github.com/longhorn/sparse-tools/stats" - - "github.com/longhorn/longhorn-engine/pkg/types" ) -var ( - //ErrRWTimeout r/w operation timeout - ErrRWTimeout = errors.New("r/w timeout") +const ( + queueLength = 4196 ) // Client replica client @@ -25,7 +19,8 @@ type Client struct { send chan *Message responses chan *Message seq uint32 - messages map[uint32]*Message + messages [queueLength]*Message + SeqChan chan uint32 wires []*Wire peerAddr string sharedTimeouts types.SharedTimeouts @@ -37,6 +32,7 @@ func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client { for _, conn := range conns { wires = append(wires, NewWire(conn)) } + c := &Client{ wires: wires, peerAddr: conns[0].RemoteAddr().String(), @@ -44,10 +40,13 @@ func NewClient(conns []net.Conn, sharedTimeouts types.SharedTimeouts) *Client { requests: make(chan *Message, 1024), send: make(chan *Message, 1024), responses: make(chan *Message, 1024), - messages: map[uint32]*Message{}, + messages: [queueLength]*Message{}, + SeqChan: make(chan uint32, queueLength), sharedTimeouts: sharedTimeouts, } - go c.loop() + for i := uint32(0); i < queueLength; i++ { + c.SeqChan <- i + } c.write() c.read() return c @@ -99,7 +98,7 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) ( msg.Data = buf } - c.requests <- &msg + c.handleRequest(&msg) <-msg.Complete // Only copy the message if a read is requested @@ -112,6 +111,9 @@ func (c *Client) operation(op uint32, buf []byte, length uint32, offset int64) ( if msg.Type == TypeEOF { return int(msg.Size), io.EOF } + + c.SeqChan <- msg.Seq + return int(msg.Size), nil } @@ -123,145 +125,34 @@ func (c *Client) Close() { c.end <- struct{}{} } -func (c *Client) loop() { - defer close(c.send) - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - - var clientError error - var ioInflight int - var timeOfLastActivity time.Time - - decremented := false - c.sharedTimeouts.Increment() - // Ensure we always decrement the sharedTimeouts counter regardless of how we leave this loop. - defer func() { - if !decremented { - c.sharedTimeouts.Decrement() - } - }() - - // handleClientError cleans up all in flight messages - // also stores the error so that future requests/responses get errored immediately. - handleClientError := func(err error) { - clientError = err - for _, msg := range c.messages { - c.replyError(msg, err) - } - - ioInflight = 0 - timeOfLastActivity = time.Time{} - } - - for { - select { - case <-c.end: - return - case <-ticker.C: - if timeOfLastActivity.IsZero() || ioInflight == 0 { - continue - } - - exceededTimeout := c.sharedTimeouts.CheckAndDecrement(time.Since(timeOfLastActivity)) - if exceededTimeout > 0 { - decremented = true - logrus.Errorf("R/W Timeout. No response received in %v", exceededTimeout) - handleClientError(ErrRWTimeout) - journal.PrintLimited(1000) - } - case req := <-c.requests: - if clientError != nil { - c.replyError(req, clientError) - continue - } - - if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap { - if ioInflight == 0 { - // If nothing is in-flight, we should get a fresh timeout. - timeOfLastActivity = time.Now() - } - ioInflight++ - } - - c.handleRequest(req) - case resp := <-c.responses: - if resp.transportErr != nil { - handleClientError(resp.transportErr) - continue - } - - req, pending := c.messages[resp.Seq] - if !pending { - logrus.Warnf("Received response message id %v seq %v type %v for non pending request", resp.ID, resp.Seq, resp.Type) - continue - } - - if req.Type == TypeRead || req.Type == TypeWrite || req.Type == TypeUnmap { - ioInflight-- - timeOfLastActivity = time.Now() - } - - if clientError != nil { - c.replyError(req, clientError) - continue - } - - c.handleResponse(resp) - } - } -} - func (c *Client) nextSeq() uint32 { c.seq++ return c.seq } func (c *Client) replyError(req *Message, err error) { - if opErr := journal.RemovePendingOp(req.ID, false); opErr != nil { - logrus.WithError(opErr).WithFields(logrus.Fields{ - "seq": req.Seq, - "id": req.ID, - }).Warn("Error removing pending operation") - } - delete(c.messages, req.Seq) req.Type = TypeError req.Data = []byte(err.Error()) req.Complete <- struct{}{} } func (c *Client) handleRequest(req *Message) { - switch req.Type { - case TypeRead: - req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpRead, int(req.Size)) - case TypeWrite: - req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpWrite, int(req.Size)) - case TypeUnmap: - req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpUnmap, int(req.Size)) - case TypePing: - req.ID = journal.InsertPendingOp(time.Now(), c.TargetID(), journal.OpPing, 0) - } - req.MagicVersion = MagicVersion - req.Seq = c.nextSeq() + + req.Seq = <-c.SeqChan + c.messages[req.Seq] = req c.send <- req } func (c *Client) handleResponse(resp *Message) { - if req, ok := c.messages[resp.Seq]; ok { - err := journal.RemovePendingOp(req.ID, true) - if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "seq": resp.Seq, - "id": req.ID, - }).Warn("Error removing pending operation") - } - delete(c.messages, resp.Seq) - req.Type = resp.Type - req.Size = resp.Size - req.Data = resp.Data - req.Complete <- struct{}{} - } + req := c.messages[resp.Seq] + + req.Type = resp.Type + req.Size = resp.Size + req.Data = resp.Data + req.Complete <- struct{}{} + } func (c *Client) write() { @@ -290,7 +181,7 @@ func (c *Client) read() { } break } - c.responses <- msg + c.handleResponse(msg) } }(wire) } diff --git a/pkg/dataconn/server.go b/pkg/dataconn/server.go index 88c2f43aa..701913209 100644 --- a/pkg/dataconn/server.go +++ b/pkg/dataconn/server.go @@ -9,20 +9,45 @@ import ( "github.com/longhorn/longhorn-engine/pkg/types" ) +const ( + threadCount = 256 +) + type Server struct { wire *Wire + requests chan *Message responses chan *Message done chan struct{} data types.DataProcessor } func NewServer(conn net.Conn, data types.DataProcessor) *Server { - return &Server{ + //init theads + server := &Server{ wire: NewWire(conn), + requests: make(chan *Message, 1024), responses: make(chan *Message, 1024), done: make(chan struct{}, 5), data: data, } + for i := 0; i < threadCount; i++ { + go func(s *Server) { + for { + msg := <-s.requests + switch msg.Type { + case TypeRead: + s.handleRead(msg) + case TypeWrite: + s.handleWrite(msg) + case TypeUnmap: + s.handleUnmap(msg) + case TypePing: + s.handlePing(msg) + } + } + }(server) + } + return server } func (s *Server) Handle() error { @@ -43,16 +68,7 @@ func (s *Server) readFromWire(ret chan<- error) { ret <- err return } - switch msg.Type { - case TypeRead: - go s.handleRead(msg) - case TypeWrite: - go s.handleWrite(msg) - case TypeUnmap: - go s.handleUnmap(msg) - case TypePing: - go s.handlePing(msg) - } + s.requests <- msg ret <- nil }