Skip to content

Commit

Permalink
api: add ConnectionListener to the connection_pool
Browse files Browse the repository at this point in the history
ConnectionListener provides an observer semantics for components interested
in knowing the role changes of connections in a ConnectionPool.

Closes #178
  • Loading branch information
oleg-jukovec committed Aug 24, 2022
1 parent adb2bfd commit 2a91428
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Added

- Support queue 1.2.0 (#177)
- RoleListener interface for knowing the role changes of connections in
ConnectionPool (#178)

### Changed

Expand Down
89 changes: 76 additions & 13 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,35 @@ var (
ErrNoHealthyInstance = errors.New("can't find healthy instance in pool")
)

// ConnectionListener provides an observer semantics for components interested
// in knowing the role changes of connections in a ConnectionPool.
type ConnectionListener interface {
// Added is called before a connection with a role added into a list of
// active connections.
Added(conn *tarantool.Connection, role Role)
// Removed is called after a connection with a role removed from a list
// of active connections.
Removed(conn *tarantool.Connection, role Role)
// Updated is called if a connection with a role updated in a list
// of active connections.
Updated(conn *tarantool.Connection, oldRole, newRole Role)
}

/*
Additional options (configurable via ConnectWithOpts):
- CheckTimeout - time interval to check for connection timeout and try to switch connection.
- ConnectionListener - a listener for connection updates.
*/
type OptsPool struct {
// timeout for timer to reopen connections
// that have been closed by some events and
// to relocate connection between subpools
// if ro/rw role has been updated
CheckTimeout time.Duration
// ConnectionListener provides an ability to observe connection updates.
ConnectionListener ConnectionListener
}

/*
Expand Down Expand Up @@ -174,9 +192,22 @@ func (connPool *ConnectionPool) closeImpl() []error {
if err := conn.Close(); err != nil {
errs = append(errs, err)
}
if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil {
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, MasterRole)
}
continue
}
if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil {
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, ReplicaRole)
}
continue
}
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, UnknownRole)
}
}
connPool.rwPool.DeleteConnByAddr(addr)
connPool.roPool.DeleteConnByAddr(addr)
}

return errs
Expand Down Expand Up @@ -659,10 +690,22 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C
}

func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) {
_ = connPool.anyPool.DeleteConnByAddr(addr)
conn := connPool.rwPool.DeleteConnByAddr(addr)
if conn == nil {
connPool.roPool.DeleteConnByAddr(addr)
if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil {
if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil {
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, MasterRole)
}
return
}
if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil {
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, ReplicaRole)
}
return
}
if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Removed(conn, UnknownRole)
}
}
}

Expand All @@ -672,6 +715,10 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
return err
}

if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Added(conn, role)
}

connPool.anyPool.AddConn(addr, conn)

switch role {
Expand All @@ -684,16 +731,32 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool
return nil
}

func (connPool *ConnectionPool) updateConnectionInPool(addr string, conn *tarantool.Connection, oldRole Role, newRole Role) {
switch oldRole {
case MasterRole:
connPool.rwPool.DeleteConnByAddr(addr)
case ReplicaRole:
connPool.roPool.DeleteConnByAddr(addr)
}

if connPool.opts.ConnectionListener != nil {
connPool.opts.ConnectionListener.Updated(conn, oldRole, newRole)
}

switch newRole {
case MasterRole:
connPool.rwPool.AddConn(addr, conn)
case ReplicaRole:
connPool.roPool.AddConn(addr, conn)
}
}

func (connPool *ConnectionPool) refreshConnection(addr string) {
if conn, oldRole := connPool.getConnectionFromPool(addr); conn != nil {
if !conn.ClosedNow() {
curRole, _ := connPool.getConnectionRole(conn)
if oldRole != curRole {
connPool.deleteConnectionFromPool(addr)
err := connPool.setConnectionToPool(addr, conn)
if err != nil {
conn.Close()
log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error())
if curRole, err := connPool.getConnectionRole(conn); err == nil {
if oldRole != curRole {
connPool.updateConnectionInPool(addr, conn, oldRole, curRole)
}
}
}
Expand Down
122 changes: 122 additions & 0 deletions connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,128 @@ func TestClose(t *testing.T) {
require.Nil(t, err)
}

type testListener struct {
added, closed, updated int
errs []error
}

func (tl *testListener) addErr(err error) {
tl.errs = append(tl.errs, err)
}

func (tl *testListener) Added(conn *tarantool.Connection, role connection_pool.Role) {
tl.added++

if conn == nil {
tl.addErr(fmt.Errorf("added conn == nil"))
return
}

addr := conn.Addr()
if addr == servers[0] {
if role != connection_pool.MasterRole {
tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr))
}
} else if addr == servers[1] {
if role != connection_pool.ReplicaRole {
tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr))
}
} else {
tl.addErr(fmt.Errorf("unexpected added addr %s", addr))
}
}

func (tl *testListener) Removed(conn *tarantool.Connection, role connection_pool.Role) {
tl.closed++

if conn == nil {
tl.addErr(fmt.Errorf("removed conn == nil"))
return
}

addr := conn.Addr()
if addr == servers[0] || addr == servers[1] {
if role != connection_pool.ReplicaRole {
tl.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr))
}
} else {
tl.addErr(fmt.Errorf("unexpected removed addr %s", addr))
}
}

func (tl *testListener) Updated(conn *tarantool.Connection, oldRole, newRole connection_pool.Role) {
tl.updated++

if oldRole != connection_pool.MasterRole {
tl.addErr(fmt.Errorf("unexpected oldRole = %d", oldRole))
}

if newRole != connection_pool.ReplicaRole {
tl.addErr(fmt.Errorf("unexpected newRole = %d", newRole))
}

if conn == nil {
tl.addErr(fmt.Errorf("updated conn == nil"))
return
}

addr := conn.Addr()
if addr != servers[0] {
tl.addErr(fmt.Errorf("unexpected updated addr: %s", addr))
}
}

func TestConnectionListener(t *testing.T) {
poolServers := []string{servers[0], servers[1]}
roles := []bool{false, true}

err := test_helpers.SetClusterRO(poolServers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

tl := &testListener{}
poolOpts := connection_pool.OptsPool{
CheckTimeout: 100 * time.Microsecond,
ConnectionListener: tl,
}
connPool, err := connection_pool.ConnectWithOpts(poolServers, connOpts, poolOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

_, err = connPool.Call17("box.cfg", []interface{}{map[string]bool{
"read_only": true,
}}, connection_pool.RW)
require.Nilf(t, err, "failed to make ro")
for i := 0; i < 10; i++ {
if tl.updated == 1 {
break
}
time.Sleep(poolOpts.CheckTimeout)
}

connPool.Close()

args := test_helpers.CheckStatusesArgs{
ConnPool: connPool,
Mode: connection_pool.ANY,
Servers: poolServers,
ExpectedPoolStatus: false,
ExpectedStatuses: map[string]bool{
poolServers[0]: false,
poolServers[1]: false,
},
}

err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry)
require.Nil(t, err)

for _, err := range tl.errs {
t.Errorf("Unexpected error: %s", err)
}
require.Equalf(t, tl.added, len(poolServers), "unexpected added count")
require.Equalf(t, tl.closed, len(poolServers), "unexpected closed count")
require.Equalf(t, tl.updated, 1, "unexpected updated count")
}

func TestRequestOnClosed(t *testing.T) {
server1 := servers[0]
server2 := servers[1]
Expand Down

0 comments on commit 2a91428

Please sign in to comment.