Skip to content

Commit

Permalink
Avoid I/O while holding the pool's queue lock (#556)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Cathcart <stephen.cathcart@neotechnology.com>
  • Loading branch information
robsdedude and StephenCathcart authored Dec 14, 2023
1 parent ac6a50b commit 042c154
Showing 1 changed file with 13 additions and 34 deletions.
47 changes: 13 additions & 34 deletions neo4j/internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,35 +166,18 @@ func (p *Pool) getPenaltiesForServers(ctx context.Context, serverNames []string)
return penalties
}

func (p *Pool) tryAnyIdle(ctx context.Context, serverNames []string, idlenessTimeout time.Duration, auth *idb.ReAuthToken, logger log.BoltLogger) (idb.Connection, error) {
func (p *Pool) anyHasCapacity(serverNames []string) bool {
p.serversMut.Lock()
var unlock = new(sync.Once)
defer unlock.Do(p.serversMut.Unlock)
serverLoop:
defer p.serversMut.Unlock()
for _, serverName := range serverNames {
for {
srv := p.servers[serverName]
if srv != nil {
conn := srv.getIdle()
if conn == nil {
continue serverLoop
}
unlock.Do(p.serversMut.Unlock)
healthy, err := srv.healthCheck(ctx, conn, idlenessTimeout, auth, logger)
if healthy {
return conn, nil
}
p.unreg(ctx, serverName, conn, itime.Now())
if err != nil {
p.log.Debugf(log.Pool, p.logId, "Health check failed for %s: %s", serverName, err)
return nil, err
}
p.serversMut.Lock()
*unlock = sync.Once{}
srv := p.servers[serverName]
if srv != nil {
if srv.numIdle() > 0 || srv.size() < p.config.MaxConnectionPoolSize {
return true
}
}
}
return nil, nil
return false
}

func (p *Pool) Borrow(
Expand Down Expand Up @@ -251,17 +234,13 @@ func (p *Pool) Borrow(

// Wait for a matching connection to be returned from another thread.
p.queueMut.Lock()
// Ok, now that we own the queue we can add the item there but between getting the lock
// and above check for an existing connection another thread might have returned a connection
// so check again to avoid potentially starving this thread.
conn, err = p.tryAnyIdle(ctx, serverNames, idlenessTimeout, auth, boltLogger)
if err != nil {
p.queueMut.Unlock()
return nil, err
}
if conn != nil {
// By owning the queue lock, we are guaranteed that every call to Return from now on, until we release the
// lock, will notify us (or another waiter). To avoid starving this thread, we have to check once more whether
// any call to Return between checking for capacity above and acquiring the lock happened. In that case, we
// are no longer guaranteed to be notified, so we have to start over.
if p.anyHasCapacity(serverNames) {
p.queueMut.Unlock()
return conn, nil
continue
}
// Add a waiting request to the queue and unlock the queue to let other threads that return
// their connections access the queue.
Expand Down

0 comments on commit 042c154

Please sign in to comment.