Skip to content

Commit

Permalink
bugfix: prevent recreate connection after Close()
Browse files Browse the repository at this point in the history
The ConnectionPool.checker() goroutine may still be running for some
time after a ConnectionPool.Close() call. This can re-open a
connection while the pool is concurrently being closed, leaving that
connection open after the pool itself is closed.

The patch adds an RWMutex to protect the code paths that work with
anyPool, roPool and rwPool. Regular requests do not need this
protection: in the worst case a request is sent over an already
closed connection, which can happen for other reasons anyway and
cannot be fully avoided, so it is expected behavior.

Closes #208
  • Loading branch information
oleg-jukovec committed Aug 24, 2022
1 parent 0c6a81a commit adb2bfd
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Segmentation faults in ConnectionPool requests after disconnect (#208)
- Addresses in ConnectionPool may be changed from an external code (#208)
- ConnectionPool recreates connections too often (#208)
- A connection remains open after ConnectionPool.Close() (#208)

## [1.8.0] - 2022-08-17

Expand Down
85 changes: 51 additions & 34 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"errors"
"fmt"
"log"
"sync/atomic"
"sync"
"time"

"github.com/tarantool/go-tarantool"
Expand Down Expand Up @@ -69,12 +69,13 @@ type ConnectionPool struct {
connOpts tarantool.Opts
opts OptsPool

notify chan tarantool.ConnEvent
state State
control chan struct{}
roPool *RoundRobinStrategy
rwPool *RoundRobinStrategy
anyPool *RoundRobinStrategy
notify chan tarantool.ConnEvent
state state
control chan struct{}
roPool *RoundRobinStrategy
rwPool *RoundRobinStrategy
anyPool *RoundRobinStrategy
poolsMutex sync.RWMutex
}

// ConnectWithOpts creates pool for instances with addresses addrs
Expand All @@ -100,6 +101,7 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co
connOpts: connOpts,
opts: opts,
notify: notify,
state: unknownState,
control: make(chan struct{}),
rwPool: rwPool,
roPool: roPool,
Expand All @@ -109,10 +111,12 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co

somebodyAlive := connPool.fillPools()
if !somebodyAlive {
connPool.Close()
connPool.state.set(closedState)
connPool.closeImpl()
return nil, ErrNoConnection
}

connPool.state.set(connectedState)
go connPool.checker()

return connPool, nil
Expand All @@ -128,7 +132,10 @@ func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool,

// ConnectedNow gets connected status of pool.
func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) {
if connPool.getState() != connConnected {
connPool.poolsMutex.RLock()
defer connPool.poolsMutex.RUnlock()

if connPool.state.get() != connectedState {
return false, nil
}
switch mode {
Expand Down Expand Up @@ -157,10 +164,8 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err
return conn.ConfiguredTimeout(), nil
}

// Close closes connections in pool.
func (connPool *ConnectionPool) Close() []error {
func (connPool *ConnectionPool) closeImpl() []error {
close(connPool.control)
connPool.state = connClosed

errs := make([]error, 0, len(connPool.addrs))

Expand All @@ -177,6 +182,17 @@ func (connPool *ConnectionPool) Close() []error {
return errs
}

// Close closes connections in pool. Only the first call on a
// connected pool performs the shutdown; any later call returns nil.
func (connPool *ConnectionPool) Close() []error {
	// Flip the state exactly once; a goroutine that loses the
	// compare-and-swap race has nothing left to do.
	if !connPool.state.cas(connectedState, closedState) {
		return nil
	}

	// Hold the write lock so the checker() goroutine cannot touch
	// the sub-pools while the connections are being shut down.
	connPool.poolsMutex.Lock()
	defer connPool.poolsMutex.Unlock()

	return connPool.closeImpl()
}

// GetAddrs gets addresses of connections in pool.
func (connPool *ConnectionPool) GetAddrs() []string {
cpy := make([]string, len(connPool.addrs))
Expand All @@ -188,6 +204,13 @@ func (connPool *ConnectionPool) GetAddrs() []string {
func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
info := make(map[string]*ConnectionInfo)

connPool.poolsMutex.RLock()
defer connPool.poolsMutex.RUnlock()

if connPool.state.get() != connectedState {
return info
}

for _, addr := range connPool.addrs {
conn, role := connPool.getConnectionFromPool(addr)
if conn != nil {
Expand Down Expand Up @@ -638,11 +661,9 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) {
_ = connPool.anyPool.DeleteConnByAddr(addr)
conn := connPool.rwPool.DeleteConnByAddr(addr)
if conn != nil {
return
if conn == nil {
connPool.roPool.DeleteConnByAddr(addr)
}

connPool.roPool.DeleteConnByAddr(addr)
}

func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error {
Expand Down Expand Up @@ -689,39 +710,39 @@ func (connPool *ConnectionPool) refreshConnection(addr string) {
}

func (connPool *ConnectionPool) checker() {

timer := time.NewTicker(connPool.opts.CheckTimeout)
defer timer.Stop()

for connPool.getState() != connClosed {
for connPool.state.get() != closedState {
select {
case <-connPool.control:
return
case e := <-connPool.notify:
if connPool.getState() == connClosed {
return
}
if e.Conn.ClosedNow() {
connPool.poolsMutex.Lock()
if connPool.state.get() == connectedState && e.Conn.ClosedNow() {
connPool.deleteConnectionFromPool(e.Conn.Addr())
}
connPool.poolsMutex.Unlock()
case <-timer.C:
for _, addr := range connPool.addrs {
if connPool.getState() == connClosed {
return
connPool.poolsMutex.Lock()
if connPool.state.get() == connectedState {
for _, addr := range connPool.addrs {
// Reopen connection
// Relocate connection between subpools
// if ro/rw was updated
connPool.refreshConnection(addr)
}

// Reopen connection
// Relocate connection between subpools
// if ro/rw was updated
connPool.refreshConnection(addr)
}
connPool.poolsMutex.Unlock()
}
}
}

func (connPool *ConnectionPool) fillPools() bool {
somebodyAlive := false

// It is called before checker() goroutine and before closeImpl() may be
// called so we don't expect concurrency issues here.
for _, addr := range connPool.addrs {
conn, err := tarantool.Connect(addr, connPool.connOpts)
if err != nil {
Expand All @@ -740,10 +761,6 @@ func (connPool *ConnectionPool) fillPools() bool {
return somebodyAlive
}

func (connPool *ConnectionPool) getState() uint32 {
return atomic.LoadUint32((*uint32)(&connPool.state))
}

func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) {

switch mode {
Expand Down
8 changes: 0 additions & 8 deletions connection_pool/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,3 @@ const (
MasterRole // The instance is read-write mode.
ReplicaRole // The instance is in read-only mode.
)

type State uint32

// pool state
const (
connConnected = iota
connClosed
)
26 changes: 26 additions & 0 deletions connection_pool/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package connection_pool

import (
"sync/atomic"
)

// state describes the life-cycle stage of a connection pool. All
// accesses go through the atomic helpers below, so a value may be
// read and updated from several goroutines at once.
type state uint32

// Possible pool states. The zero value is unknownState, i.e. the
// pool has not finished its initialization yet.
const (
	unknownState state = iota
	connectedState
	closedState
)

// set unconditionally stores ns as the current state.
func (s *state) set(ns state) {
	atomic.StoreUint32((*uint32)(s), uint32(ns))
}

// cas atomically replaces the state old with ns and reports whether
// the swap actually happened.
func (s *state) cas(old, ns state) bool {
	return atomic.CompareAndSwapUint32((*uint32)(s), uint32(old), uint32(ns))
}

// get atomically reads the current state.
func (s *state) get() state {
	return state(atomic.LoadUint32((*uint32)(s)))
}

0 comments on commit adb2bfd

Please sign in to comment.