Skip to content

Commit

Permalink
pool: track usage of incoming streams (#10710)
Browse files Browse the repository at this point in the history
Track usage of incoming streams on a connection. Connections without
reference counts get marked as unused and reaped in a periodic job.

This fixes a bug where `alloc exec` and `alloc fs` sessions get terminated
unexpectedly. Previously, when a client heartbeats switches between
servers, the pool connection reaper eventually identifies the connection
as unused and closes it even if it has an active exec/fs sessions.

Fixes #10579
  • Loading branch information
Mahmood Ali committed Jun 9, 2021
1 parent 9db0909 commit d80d0d6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 26 deletions.
15 changes: 7 additions & 8 deletions client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
inmem "github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)

// rpcEndpoints holds the RPC endpoints
Expand Down Expand Up @@ -277,38 +276,38 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) {
// connection.
func (c *Client) rpcConnListener() {
// Make a channel for new connections.
conns := make(chan *yamux.Session, 4)
conns := make(chan *pool.Conn, 4)
c.connPool.SetConnListener(conns)

for {
select {
case <-c.shutdownCh:
return
case session, ok := <-conns:
case conn, ok := <-conns:
if !ok {
continue
}

go c.listenConn(session)
go c.listenConn(conn)
}
}
}

// listenConn is used to listen for connections being made from the server on
// pre-existing connection. This should be called in a goroutine.
func (c *Client) listenConn(s *yamux.Session) {
func (c *Client) listenConn(conn *pool.Conn) {
for {
conn, err := s.Accept()
stream, err := conn.AcceptStream()
if err != nil {
if s.IsClosed() {
if conn.IsClosed() {
return
}

c.rpcLogger.Error("failed to accept RPC conn", "error", err)
continue
}

go c.handleConn(conn)
go c.handleConn(stream)
metrics.IncrCounter([]string{"client", "rpc", "accept_conn"}, 1)
}
}
Expand Down
65 changes: 49 additions & 16 deletions helper/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,21 @@ type Conn struct {
clientLock sync.Mutex
}

// markForUse does all the bookkeeping required to ready a connection for use.
// markForUse does all the bookkeeping required to ready a connection for use,
// and ensure that active connections don't get reaped.
func (c *Conn) markForUse() {
c.lastUsed = time.Now()
atomic.AddInt32(&c.refCount, 1)
}

// releaseUse is the complement of `markForUse`, to free up the reference count
func (c *Conn) releaseUse() {
refCount := atomic.AddInt32(&c.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&c.shouldClose) == 1 {
c.Close()
}
}

func (c *Conn) Close() error {
return c.session.Close()
}
Expand Down Expand Up @@ -122,6 +131,40 @@ func (c *Conn) returnClient(client *StreamClient) {
}
}

func (c *Conn) IsClosed() bool {
return c.session.IsClosed()
}

func (c *Conn) AcceptStream() (net.Conn, error) {
s, err := c.session.AcceptStream()
if err != nil {
return nil, err
}

c.markForUse()
return &incomingStream{
Stream: s,
parent: c,
}, nil
}

// incomingStream wraps yamux.Stream but frees the underlying yamux.Session
// when closed
type incomingStream struct {
*yamux.Stream

parent *Conn
}

func (s *incomingStream) Close() error {
err := s.Stream.Close()

// always release parent even if error
s.parent.releaseUse()

return err
}

// ConnPool is used to maintain a connection pool to other
// Nomad servers. This is used to reduce the latency of
// RPC requests between servers. It is only used to pool
Expand Down Expand Up @@ -157,7 +200,7 @@ type ConnPool struct {

// connListener is used to notify a potential listener of a new connection
// being made.
connListener chan<- *yamux.Session
connListener chan<- *Conn
}

// NewPool is used to make a new connection pool
Expand Down Expand Up @@ -220,7 +263,7 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) {

// SetConnListener is used to listen to new connections being made. The
// channel will be closed when the conn pool is closed or a new listener is set.
func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) {
func (p *ConnPool) SetConnListener(l chan<- *Conn) {
p.Lock()
defer p.Unlock()

Expand Down Expand Up @@ -276,7 +319,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If there is a connection listener, notify them of the new connection.
if p.connListener != nil {
select {
case p.connListener <- c.session:
case p.connListener <- c:
default:
}
}
Expand Down Expand Up @@ -386,14 +429,6 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}

// releaseConn is invoked when we are done with a conn to reduce the ref count
func (p *ConnPool) releaseConn(conn *Conn) {
refCount := atomic.AddInt32(&conn.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&conn.shouldClose) == 1 {
conn.Close()
}
}

// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
retries := 0
Expand All @@ -408,7 +443,7 @@ START:
client, err := conn.getRPCClient()
if err != nil {
p.clearConn(conn)
p.releaseConn(conn)
conn.releaseUse()

// Try to redial, possible that the TCP session closed due to timeout
if retries == 0 {
Expand Down Expand Up @@ -448,6 +483,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
defer conn.releaseUse()

// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
Expand All @@ -461,8 +497,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
p.clearConn(conn)
}

p.releaseConn(conn)

// If the error is an RPC Coded error
// return the coded error without wrapping
if structs.IsErrRPCCoded(err) {
Expand All @@ -475,7 +509,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,

// Done with the connection
conn.returnClient(sc)
p.releaseConn(conn)
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions helper/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -47,7 +46,7 @@ func TestConnPool_ConnListener(t *testing.T) {
pool := newTestPool(t)

// Setup a listener
c := make(chan *yamux.Session, 1)
c := make(chan *Conn, 1)
pool.SetConnListener(c)

// Make an RPC
Expand Down

0 comments on commit d80d0d6

Please sign in to comment.