Skip to content

Commit

Permalink
Merge pull request #22 from libp2p/feat/ctrlbypass
Browse files Browse the repository at this point in the history
less head-of-line blocking for control messages
  • Loading branch information
Stebalien authored Mar 18, 2020
2 parents 97856b4 + dd0e186 commit d913bbd
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 38 deletions.
5 changes: 5 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Config struct {
// MaxMessageSize is the maximum size of a message that we'll send on a
// stream. This ensures that a single stream doesn't hog a connection.
MaxMessageSize uint32

// SendQueueSize is the maximum number of messages we'll keep in the local
// send queue before applying back pressure to writers.
SendQueueSize uint32
}

// DefaultConfig is used to return a default configuration
Expand All @@ -61,6 +65,7 @@ func DefaultConfig() *Config {
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed.
WriteCoalesceDelay: 100 * time.Microsecond,
SendQueueSize: 64,
}
}

Expand Down
97 changes: 67 additions & 30 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/libp2p/go-buffer-pool"
pool "github.com/libp2p/go-buffer-pool"
)

// Session is used to wrap a reliable ordered connection and to
Expand Down Expand Up @@ -67,6 +67,9 @@ type Session struct {
// sendCh is used to send messages
sendCh chan []byte

// sendCtrlCh is used to send control messages (skipping the normal send queue)
sendCtrlCh chan []byte

// recvDoneCh is closed when recv() exits to avoid a race
// between stream registration and stream shutdown
recvDoneCh chan struct{}
Expand Down Expand Up @@ -109,7 +112,8 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
inflight: make(map[uint32]struct{}),
synCh: make(chan struct{}, config.AcceptBacklog),
acceptCh: make(chan *Stream, config.AcceptBacklog),
sendCh: make(chan []byte, 64),
sendCh: make(chan []byte, config.SendQueueSize),
sendCtrlCh: make(chan []byte, 16),
recvDoneCh: make(chan struct{}),
sendDoneCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
Expand Down Expand Up @@ -270,7 +274,7 @@ func (s *Session) exitErr(err error) {
// GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn.
func (s *Session) GoAway() error {
return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true)
}

// goAway is used to send a goAway message
Expand All @@ -294,7 +298,7 @@ func (s *Session) Ping() (time.Duration, error) {

// Send the ping request
hdr := encode(typePing, flagSYN, 0, id)
if err := s.sendMsg(hdr, nil, nil); err != nil {
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
return 0, err
}

Expand Down Expand Up @@ -374,7 +378,7 @@ func (s *Session) extendKeepalive() {
}

// send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, control bool) error {
select {
case <-s.shutdownCh:
return s.shutdownErr
Expand All @@ -386,11 +390,17 @@ func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) err
copy(buf[:headerSize], hdr[:])
copy(buf[headerSize:], body)

var sendCh chan []byte
if control {
sendCh = s.sendCtrlCh
} else {
sendCh = s.sendCh
}
select {
case <-s.shutdownCh:
pool.Put(buf)
return s.shutdownErr
case s.sendCh <- buf:
case sendCh <- buf:
return nil
case <-deadline:
pool.Put(buf)
Expand Down Expand Up @@ -446,38 +456,65 @@ func (s *Session) sendLoop() error {
default:
}

var buf []byte
// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}

// Flushes at least once every 100 microseconds unless we're
// constantly writing.
var buf []byte
select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
default:
}

select {
case buf = <-s.sendCh:
goto SEND
case buf = <-s.sendCtrlCh:
goto SEND
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
}

if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}

// Preferentially use control channel.
select {
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
default:
select {
case buf = <-s.sendCh:
case buf = <-s.sendCtrlCh:
case <-s.shutdownCh:
return nil
case <-writeTimeoutCh:
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}

select {
case buf = <-s.sendCh:
case <-s.shutdownCh:
return nil
}

if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}
}
}

if writeTimeout != nil {
writeTimeout.Reset(s.config.WriteCoalesceDelay)
}

SEND:

if err := extendWriteDeadline(); err != nil {
pool.Put(buf)
return err
Expand Down Expand Up @@ -582,7 +619,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
// Check if this is a window update
if hdr.MsgType() == typeWindowUpdate {
if err := stream.incrSendWindow(hdr, flags); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
Expand All @@ -592,7 +629,7 @@ func (s *Session) handleStreamMessage(hdr header) error {

// Read the new data
if err := stream.readData(hdr, flags, s.reader); err != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return err
Expand All @@ -610,7 +647,7 @@ func (s *Session) handlePing(hdr header) error {
if flags&flagSYN == flagSYN {
go func() {
hdr := encode(typePing, flagACK, 0, pingID)
if err := s.sendMsg(hdr, nil, nil); err != nil {
if err := s.sendMsg(hdr, nil, nil, true); err != nil {
s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
}
}()
Expand Down Expand Up @@ -656,7 +693,7 @@ func (s *Session) incomingStream(id uint32) error {
// Reject immediately if we are doing a go away
if atomic.LoadInt32(&s.localGoAway) == 1 {
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil)
return s.sendMsg(hdr, nil, nil, true)
}

// Allocate a new stream
Expand All @@ -668,7 +705,7 @@ func (s *Session) incomingStream(id uint32) error {
// Check if stream already exists
if _, ok := s.streams[id]; ok {
s.logger.Printf("[ERR] yamux: duplicate stream declared")
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, true); sendErr != nil {
s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
}
return ErrDuplicateStream
Expand All @@ -686,7 +723,7 @@ func (s *Session) incomingStream(id uint32) error {
s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
delete(s.streams, id)
hdr := encode(typeWindowUpdate, flagRST, id, 0)
return s.sendMsg(hdr, nil, nil)
return s.sendMsg(hdr, nil, nil, false)
}
}

Expand Down
8 changes: 7 additions & 1 deletion session_norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ import (
)

func TestSession_PingOfDeath(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
conf := testConfNoKeepAlive()
// This test is slow and can easily time out on writes on CI.
//
// In the future, we might want to prioritize ping-replies over even
// other control messages, but that seems like overkill for now.
conf.ConnectionWriteTimeout = 1 * time.Second
client, server := testClientServerConfig(conf)
defer client.Close()
defer server.Close()

Expand Down
2 changes: 1 addition & 1 deletion session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ func TestSession_sendMsg_Timeout(t *testing.T) {

hdr := encode(typePing, flagACK, 0, 0)
for {
err := client.sendMsg(hdr, nil, nil)
err := client.sendMsg(hdr, nil, nil, true)
if err == nil {
continue
} else if err == ErrConnectionWriteTimeout {
Expand Down
15 changes: 9 additions & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"

"github.com/libp2p/go-buffer-pool"
pool "github.com/libp2p/go-buffer-pool"
)

type streamState int
Expand All @@ -20,6 +20,7 @@ const (
streamRemoteClose
streamClosed
streamReset
streamWriteTimeout
)

// Stream is used to represent a logical stream
Expand All @@ -41,6 +42,7 @@ type Stream struct {

recvNotifyCh chan struct{}
sendNotifyCh chan struct{}
sendCh chan []byte

readDeadline, writeDeadline pipeDeadline
}
Expand Down Expand Up @@ -164,12 +166,13 @@ START:
// Determine the flags if any
flags = s.sendFlags()

// Send up to min(message, window
// Send up to min(message, window)
max = min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b)))

// Send the header
hdr = encode(typeData, flags, s.id, max)
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait()); err != nil {
if err = s.session.sendMsg(hdr, b[:max], s.writeDeadline.wait(), false); err != nil {
// Indicate queued message.
return 0, err
}

Expand Down Expand Up @@ -228,7 +231,7 @@ func (s *Stream) sendWindowUpdate() error {

// Send the header
hdr := encode(typeWindowUpdate, flags, s.id, delta)
if err := s.session.sendMsg(hdr, nil, nil); err != nil {
if err := s.session.sendMsg(hdr, nil, nil, true); err != nil {
return err
}
return nil
Expand All @@ -239,13 +242,13 @@ func (s *Stream) sendClose() error {
flags := s.sendFlags()
flags |= flagFIN
hdr := encode(typeWindowUpdate, flags, s.id, 0)
return s.session.sendMsg(hdr, nil, nil)
return s.session.sendMsg(hdr, nil, nil, false)
}

// sendReset is used to send a RST
func (s *Stream) sendReset() error {
hdr := encode(typeWindowUpdate, flagRST, s.id, 0)
return s.session.sendMsg(hdr, nil, nil)
return s.session.sendMsg(hdr, nil, nil, false)
}

// Reset resets the stream (forcibly closes the stream)
Expand Down

0 comments on commit d913bbd

Please sign in to comment.