Add signaling support for connection pool waiting (globalsign#115)
* Add signaling support for connection pool waiting

The current behaviour when the poolLimit is reached and a new connection
is required is to poll every 100ms to see if there is now headroom to
make a new connection. This adds tremendous latency on the
limit-hit path.

This commit changes the checkout behaviour to wait on a condition
variable for connections to become available, and the checkin behaviour
to signal this variable. This should allow waiters to use connections
immediately after they become available.
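
To make the mechanism concrete, here is a condensed, self-contained sketch of the
wait/broadcast pattern with simplified, hypothetical names (pool, acquire, release);
the actual implementation is the acquireSocketInternal/RecycleSocket code in the
server.go diff below.

    package connpool

    import (
        "sync"
        "time"
    )

    type pool struct {
        mu     sync.Mutex
        waiter *sync.Cond // signalled whenever a connection is checked back in
        inUse  int
        limit  int
    }

    func newPool(limit int) *pool {
        p := &pool{limit: limit}
        p.waiter = sync.NewCond(&p.mu)
        return p
    }

    // acquire blocks until there is headroom under limit, or poolTimeout elapses
    // (poolTimeout == 0 means wait forever). Returns false on timeout.
    func (p *pool) acquire(poolTimeout time.Duration) bool {
        p.mu.Lock()
        defer p.mu.Unlock()

        timeoutHit := false
        waitDone := make(chan struct{})
        if poolTimeout > 0 {
            // sync.Cond has no WaitWithTimeout, so a helper goroutine flips
            // timeoutHit under the mutex and broadcasts so the loop re-checks it.
            go func() {
                select {
                case <-waitDone:
                case <-time.After(poolTimeout):
                    p.mu.Lock()
                    timeoutHit = true
                    p.waiter.Broadcast()
                    p.mu.Unlock()
                }
            }()
        }
        for p.inUse >= p.limit && !timeoutHit {
            p.waiter.Wait() // releases the mutex while waiting, re-locks on wake
        }
        close(waitDone)
        if timeoutHit {
            return false
        }
        p.inUse++
        return true
    }

    // release checks a connection back in and wakes every waiter, so each can
    // re-check the limit it was called with (mirrors RecycleSocket's Broadcast).
    func (p *pool) release() {
        p.mu.Lock()
        p.inUse--
        p.waiter.Broadcast()
        p.mu.Unlock()
    }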

A new parameter is also added to DialInfo, PoolTimeout, which is the
maximum time that clients will wait for connection headroom to become
available. By default this is unlimited.
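
For illustration, a hypothetical caller could set the new knob at dial time or per
session; the address, pool limit, and timeout values below are placeholders, while
DialInfo.PoolTimeout and Session.SetPoolTimeout are the additions from this commit.

    package main

    import (
        "log"
        "time"

        mgo "github.com/globalsign/mgo"
    )

    func main() {
        info := &mgo.DialInfo{
            Addrs:       []string{"localhost:27017"}, // placeholder address
            PoolLimit:   64,
            PoolTimeout: 2 * time.Second, // fail fast instead of waiting forever
        }
        session, err := mgo.DialWithInfo(info)
        if err != nil {
            log.Fatal(err)
        }
        defer session.Close()

        // The same timeout can also be set on an existing session.
        session.SetPoolTimeout(2 * time.Second)
    }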

* Add stats for connection pool timings

This exposes four new counters; a short example of reading them follows the list:

* The number of times a socket was successfully obtained from the
  connection pool
* The number of times the connection pool needed to be waited on
* The total time that has been spent waiting for a connection to become
  available
* The number of times socket acquisition failed due to a pool timeout
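
A minimal read sketch (a fragment, not a full program) using mgo's existing
SetStats/ResetStats/GetStats helpers; the field names match the Stats struct in
the stats.go diff below, and the usual imports ("fmt", mgo "github.com/globalsign/mgo")
are assumed.

    mgo.SetStats(true) // enable stats collection
    mgo.ResetStats()   // zero the counters before the workload of interest

    // ... run queries against a session here ...

    st := mgo.GetStats()
    fmt.Printf("acquired=%d waited=%d timeouts=%d total-wait=%s\n",
        st.TimesSocketAcquired, st.TimesWaitedForPool,
        st.PoolTimeouts, st.TotalPoolWaitTime)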

* Gitignore .vscode directory

I'm using vscode and accidentally committed the .vscode directory;
.gitignore this footgun.
Konstantinos Tsanaktsidis authored and domodwyer committed Apr 3, 2018
1 parent 00e7550 commit f48d937
Showing 6 changed files with 195 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,2 +1,2 @@
_harness

.vscode
22 changes: 13 additions & 9 deletions cluster.go
@@ -618,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
}

// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
warnedLimit := false
for {
cluster.RLock()
for {
@@ -662,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
continue
}

s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
if err == errPoolLimit {
if !warnedLimit {
warnedLimit = true
log("WARNING: Per-server connection limit reached.")
}
time.Sleep(100 * time.Millisecond)
continue
s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
}
if err != nil {
cluster.removeServer(server)
42 changes: 42 additions & 0 deletions cluster_test.go
@@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) {
}
defer session.Close()

// So we can measure the stats for the blocking operation
mgo.ResetStats()

// Put one socket in use.
c.Assert(session.Ping(), IsNil)

@@ -1603,6 +1606,11 @@ func (s *S) TestPoolLimitSimple(c *C) {
session.Refresh()
delay := <-done
c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
stats := mgo.GetStats()
c.Assert(stats.TimesSocketAcquired, Equals, 2)
c.Assert(stats.TimesWaitedForPool, Equals, 1)
c.Assert(stats.PoolTimeouts, Equals, 0)
c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true)
}
}

@@ -1649,6 +1657,40 @@ func (s *S) TestPoolLimitMany(c *C) {
c.Assert(delay < 6e9, Equals, true)
}

func (s *S) TestPoolLimitTimeout(c *C) {
if *fast {
c.Skip("-fast")
}

session, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session.Close()
session.SetPoolTimeout(1 * time.Second)
session.SetPoolLimit(1)

mgo.ResetStats()

// Put one socket in use.
c.Assert(session.Ping(), IsNil)

// Now block trying to get another one due to the pool limit.
copy := session.Copy()
defer copy.Close()
started := time.Now()
err = copy.Ping()
delay := time.Since(started)

c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err))
stats := mgo.GetStats()
c.Assert(stats.PoolTimeouts, Equals, 1)
c.Assert(stats.TimesSocketAcquired, Equals, 1)
c.Assert(stats.TimesWaitedForPool, Equals, 1)
c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true)
c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true)
}

func (s *S) TestSetModeEventualIterBug(c *C) {
session1, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
85 changes: 79 additions & 6 deletions server.go
@@ -57,6 +57,7 @@ type mongoServer struct {
abended bool
minPoolSize int
maxIdleTimeMS int
poolWaiter *sync.Cond
}

type dialer struct {
@@ -78,18 +79,19 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
sync: syncChan,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
server.poolWaiter = sync.NewCond(server)
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
@@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m
}

var errPoolLimit = errors.New("per-server connection limit reached")
var errPoolTimeout = errors.New("could not acquire connection within pool timeout")
var errServerClosed = errors.New("server was closed")

// AcquireSocket returns a socket for communicating with the server.
@@ -109,18 +112,80 @@ var errServerClosed = errors.New("server was closed")
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond)
}

// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
// should elapse before a socket is available, it will return errPoolTimeout.
func (server *mongoServer) AcquireSocketWithBlocking(
poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout)
}

func (server *mongoServer) acquireSocketInternal(
poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration,
) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
abended = server.abended
if server.closed {
server.Unlock()
return nil, abended, errServerClosed
}
n := len(server.unusedSockets)
if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
server.Unlock()
return nil, false, errPoolLimit
if poolLimit > 0 {
if shouldBlock {
// Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout
// with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout,
// and fail if it is blown.
// Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition
// variable per waiter, which would involve loop traversal in the RecycleSocket
// method.
// We also can't use the approach of turning a condition variable into a channel outlined in
// https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
waitDone := make(chan struct{})
timeoutHit := false
if poolTimeout > 0 {
go func() {
select {
case <-waitDone:
case <-time.After(poolTimeout):
// timeoutHit is part of the wait condition, so needs to be changed under mutex.
server.Lock()
defer server.Unlock()
timeoutHit = true
server.poolWaiter.Broadcast()
}
}()
}
timeSpentWaiting := time.Duration(0)
for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
// We only count time spent in Wait(), and not time evaluating the entire loop,
// so that in the happy non-blocking path where the condition above evaluates true
// first time, we record a nice round zero wait time.
waitStart := time.Now()
// unlocks server mutex, waits, and locks again. Thus, the condition
// above is evaluated only when the lock is held.
server.poolWaiter.Wait()
timeSpentWaiting += time.Since(waitStart)
}
close(waitDone)
if timeoutHit {
server.Unlock()
stats.noticePoolTimeout(timeSpentWaiting)
return nil, false, errPoolTimeout
}
// Record that we fetched a connection off of the socket list and how long we spent waiting
stats.noticeSocketAcquisition(timeSpentWaiting)
} else {
if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
server.Unlock()
return nil, false, errPoolLimit
}
}
}
n := len(server.unusedSockets)
if n > 0 {
socket = server.unusedSockets[n-1]
server.unusedSockets[n-1] = nil // Help GC.
@@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
// If anybody is waiting for a connection, they should try now.
// Note that this _has_ to be broadcast, not signal; the signature of AcquireSocket
// and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
// rather than that being an intrinsic property of the connection pool (I assume to ensure
// that there is always a connection available for replset topology discovery). Thus, once
// a connection is returned to the pool, _every_ waiter needs to check if the connection count
// is underneath their particular value for poolLimit.
server.poolWaiter.Broadcast()
server.Unlock()
}

25 changes: 24 additions & 1 deletion session.go
@@ -92,6 +92,7 @@ type Session struct {
syncTimeout time.Duration
sockTimeout time.Duration
poolLimit int
poolTimeout time.Duration
consistency Mode
creds []Credential
dialCred *Credential
@@ -486,6 +487,11 @@ type DialInfo struct {
// See Session.SetPoolLimit for details.
PoolLimit int

// PoolTimeout defines the maximum time to wait for a connection to become
// available if the pool limit is reached. Defaults to zero, which means forever.
// See Session.SetPoolTimeout for details.
PoolTimeout time.Duration

// The identifier of the client application which ran the operation.
AppName string

@@ -596,6 +602,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
cluster.minPoolSize = info.MinPoolSize
cluster.maxIdleTimeMS = info.MaxIdleTimeMS

if info.PoolTimeout > 0 {
session.poolTimeout = info.PoolTimeout
}

cluster.Release()

// People get confused when we return a session that is not actually
@@ -711,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) {
syncTimeout: session.syncTimeout,
sockTimeout: session.sockTimeout,
poolLimit: session.poolLimit,
poolTimeout: session.poolTimeout,
consistency: session.consistency,
creds: creds,
dialCred: session.dialCred,
@@ -2051,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) {
s.m.Unlock()
}

// SetPoolTimeout sets the maximum time connection attempts will wait to reuse
// an existing connection from the pool if the PoolLimit has been reached. If
// the value is exceeded, the attempt to use a session will fail with an error.
// The default value is zero, which means to wait forever with no timeout.
func (s *Session) SetPoolTimeout(timeout time.Duration) {
s.m.Lock()
s.poolTimeout = timeout
s.m.Unlock()
}

// SetBypassValidation sets whether the server should bypass the registered
// validation expressions executed when documents are inserted or modified,
// in the interest of preserving invariants in the collection being modified.
@@ -4908,7 +4929,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
}

// Still not good. We need a new socket.
sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
sock, err := s.cluster().AcquireSocketWithPoolTimeout(
s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout,
)
if err != nil {
return nil, err
}
45 changes: 36 additions & 9 deletions stats.go
@@ -28,6 +28,7 @@ package mgo

import (
"sync"
"time"
)

var stats *Stats
@@ -77,15 +78,19 @@ func ResetStats() {
//
// TODO outdated fields ?
type Stats struct {
Clusters int
MasterConns int
SlaveConns int
SentOps int
ReceivedOps int
ReceivedDocs int
SocketsAlive int
SocketsInUse int
SocketRefs int
Clusters int
MasterConns int
SlaveConns int
SentOps int
ReceivedOps int
ReceivedDocs int
SocketsAlive int
SocketsInUse int
SocketRefs int
TimesSocketAcquired int
TimesWaitedForPool int
TotalPoolWaitTime time.Duration
PoolTimeouts int
}

func (stats *Stats) cluster(delta int) {
@@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) {
statsMutex.Unlock()
}
}

func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) {
if stats != nil {
statsMutex.Lock()
stats.TimesSocketAcquired++
stats.TotalPoolWaitTime += waitTime
if waitTime > 0 {
stats.TimesWaitedForPool++
}
statsMutex.Unlock()
}
}

func (stats *Stats) noticePoolTimeout(waitTime time.Duration) {
if stats != nil {
statsMutex.Lock()
stats.TimesWaitedForPool++
stats.PoolTimeouts++
stats.TotalPoolWaitTime += waitTime
statsMutex.Unlock()
}
}
