diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ba11e011..fd852cd34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Added - Support queue 1.2.0 (#177) +- ConnectionListener interface for knowing the role changes of connections in + ConnectionPool (#178) ### Changed diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index e0c862386..a45f8df2d 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -32,10 +32,24 @@ var ( ErrNoHealthyInstance = errors.New("can't find healthy instance in pool") ) +// ConnectionListener provides an observer semantics for components interested +// in knowing the role changes of connections in a ConnectionPool. +type ConnectionListener interface { + // Add is called before a connection with a role added into a list of + // active connections. A ConnectionPool object will close connection + // if an error returned. + Add(conn *tarantool.Connection, role Role) error + // Removed is called after a connection with a role removed from a list + // of active connections. + Removed(conn *tarantool.Connection, role Role) +} + /* Additional options (configurable via ConnectWithOpts): - CheckTimeout - time interval to check for connection timeout and try to switch connection. + +- ConnectionListener - a listener for connection updates. */ type OptsPool struct { // timeout for timer to reopen connections @@ -43,6 +57,8 @@ type OptsPool struct { // to relocate connection between subpools // if ro/rw role has been updated CheckTimeout time.Duration + // ConnectionListener provides an ability to observe connection updates. + ConnectionListener ConnectionListener } /* @@ -69,15 +85,21 @@ type ConnectionPool struct { connOpts tarantool.Opts opts OptsPool - notify chan tarantool.ConnEvent state state - control chan struct{} + done chan struct{} roPool *RoundRobinStrategy rwPool *RoundRobinStrategy anyPool *RoundRobinStrategy poolsMutex sync.RWMutex } +type connState struct { + addr string + notify chan tarantool.ConnEvent + conn *tarantool.Connection + role Role +} + // ConnectWithOpts creates pool for instances with addresses addrs // with options opts. func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (connPool *ConnectionPool, err error) { @@ -88,9 +110,6 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co return nil, ErrWrongCheckTimeout } - notify := make(chan tarantool.ConnEvent, 10*len(addrs)) // x10 to accept disconnected and closed event (with a margin) - connOpts.Notify = notify - size := len(addrs) rwPool := NewEmptyRoundRobin(size) roPool := NewEmptyRoundRobin(size) @@ -100,9 +119,8 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co addrs: make([]string, 0, len(addrs)), connOpts: connOpts, opts: opts, - notify: notify, state: unknownState, - control: make(chan struct{}), + done: make(chan struct{}), rwPool: rwPool, roPool: roPool, anyPool: anyPool, @@ -116,15 +134,21 @@ func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (co } } - somebodyAlive := connPool.fillPools() + states, somebodyAlive := connPool.fillPools() if !somebodyAlive { connPool.state.set(closedState) connPool.closeImpl() + for _, s := range states { + close(s.notify) + } return nil, ErrNoConnection } connPool.state.set(connectedState) - go connPool.checker() + + for _, s := range states { + go connPool.checker(s) + } return connPool, nil } @@ -172,8 +196,6 @@ func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, err } func (connPool *ConnectionPool) closeImpl() []error { - close(connPool.control) - errs := make([]error, 0, len(connPool.addrs)) for _, addr := range connPool.addrs { @@ -181,11 +203,18 @@ func (connPool *ConnectionPool) closeImpl() []error { if err := conn.Close(); err != nil { errs = append(errs, err) } + + role := UnknownRole + if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil { + role = MasterRole + } else if conn := connPool.roPool.DeleteConnByAddr(addr); conn != nil { + role = ReplicaRole + } + connPool.connectionListenerRemoved(conn, role) } - connPool.rwPool.DeleteConnByAddr(addr) - connPool.roPool.DeleteConnByAddr(addr) } + close(connPool.done) return errs } @@ -665,19 +694,17 @@ func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.C return connPool.anyPool.GetConnByAddr(addr), UnknownRole } -func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) { - _ = connPool.anyPool.DeleteConnByAddr(addr) - conn := connPool.rwPool.DeleteConnByAddr(addr) - if conn == nil { +func (connPool *ConnectionPool) deleteConnection(addr string) { + if conn := connPool.anyPool.DeleteConnByAddr(addr); conn != nil { + if conn := connPool.rwPool.DeleteConnByAddr(addr); conn != nil { + return + } connPool.roPool.DeleteConnByAddr(addr) } } -func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error { - role, err := connPool.getConnectionRole(conn) - if err != nil { - return err - } +func (connPool *ConnectionPool) addConnection(addr string, + conn *tarantool.Connection, role Role) { connPool.anyPool.AddConn(addr, conn) @@ -687,85 +714,212 @@ func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool case ReplicaRole: connPool.roPool.AddConn(addr, conn) } +} - return nil +func (connPool *ConnectionPool) connectionListenerAdd(addr string, + conn *tarantool.Connection, role Role) bool { + if connPool.opts.ConnectionListener != nil { + if err := connPool.opts.ConnectionListener.Add(conn, role); err != nil { + log.Printf("tarantool: storing connection to %s canceled: %s\n", addr, err) + return false + } + } + + return true } -func (connPool *ConnectionPool) refreshConnection(addr string) { - if conn, oldRole := connPool.getConnectionFromPool(addr); conn != nil { - if !conn.ClosedNow() { - curRole, _ := connPool.getConnectionRole(conn) - if oldRole != curRole { - connPool.deleteConnectionFromPool(addr) - err := connPool.setConnectionToPool(addr, conn) - if err != nil { - conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) - } - } +func (connPool *ConnectionPool) connectionListenerRemoved(conn *tarantool.Connection, role Role) { + if connPool.opts.ConnectionListener != nil { + connPool.opts.ConnectionListener.Removed(conn, role) + } +} + +func (connPool *ConnectionPool) fillPools() ([]connState, bool) { + states := make([]connState, len(connPool.addrs)) + somebodyAlive := false + + // It is called before checker() goroutines and before closeImpl() may be + // called so we don't expect concurrency issues here. + for i, addr := range connPool.addrs { + states[i] = connState{ + addr: addr, + notify: make(chan tarantool.ConnEvent, 10), + conn: nil, + role: UnknownRole, } - } else { - conn, _ := tarantool.Connect(addr, connPool.connOpts) - if conn != nil { - err := connPool.setConnectionToPool(addr, conn) + connOpts := connPool.connOpts + connOpts.Notify = states[i].notify + + conn, err := tarantool.Connect(addr, connOpts) + if err != nil { + log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error()) + } else if conn != nil { + role, err := connPool.getConnectionRole(conn) if err != nil { conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) + log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err) + continue + } + + if connPool.connectionListenerAdd(addr, conn, role) { + connPool.addConnection(addr, conn, role) + + states[i].conn = conn + states[i].role = role + + if conn.ConnectedNow() { + somebodyAlive = true + } + } else { + conn.Close() } } } + + return states, somebodyAlive } -func (connPool *ConnectionPool) checker() { - timer := time.NewTicker(connPool.opts.CheckTimeout) - defer timer.Stop() +func (pool *ConnectionPool) updateConnection(s connState) connState { + pool.poolsMutex.Lock() - for connPool.state.get() != closedState { - select { - case <-connPool.control: - return - case e := <-connPool.notify: - connPool.poolsMutex.Lock() - if connPool.state.get() == connectedState && e.Conn.ClosedNow() { - connPool.deleteConnectionFromPool(e.Conn.Addr()) + if pool.state.get() != connectedState { + pool.poolsMutex.Unlock() + return s + } + + if role, err := pool.getConnectionRole(s.conn); err == nil { + if s.role != role { + pool.deleteConnection(s.addr) + pool.poolsMutex.Unlock() + + pool.connectionListenerRemoved(s.conn, s.role) + added := pool.connectionListenerAdd(s.addr, s.conn, role) + if !added { + s.conn.Close() + s.conn = nil + s.role = UnknownRole + return s } - connPool.poolsMutex.Unlock() - case <-timer.C: - connPool.poolsMutex.Lock() - if connPool.state.get() == connectedState { - for _, addr := range connPool.addrs { - // Reopen connection - // Relocate connection between subpools - // if ro/rw was updated - connPool.refreshConnection(addr) - } + + pool.poolsMutex.Lock() + if pool.state.get() != connectedState { + pool.poolsMutex.Unlock() + s.conn.Close() + pool.connectionListenerRemoved(s.conn, role) + s.conn = nil + s.role = UnknownRole + return s } - connPool.poolsMutex.Unlock() + + pool.addConnection(s.addr, s.conn, role) + s.role = role } } + + pool.poolsMutex.Unlock() + return s } -func (connPool *ConnectionPool) fillPools() bool { - somebodyAlive := false +func (pool *ConnectionPool) tryConnect(s connState) connState { + pool.poolsMutex.Lock() + + if pool.state.get() != connectedState { + pool.poolsMutex.Unlock() + return s + } + + s.conn = nil + s.role = UnknownRole + + connOpts := pool.connOpts + connOpts.Notify = s.notify + conn, _ := tarantool.Connect(s.addr, connOpts) + if conn != nil { + role, err := pool.getConnectionRole(conn) + pool.poolsMutex.Unlock() - // It is called before checker() goroutine and before closeImpl() may be - // called so we don't expect concurrency issues here. - for _, addr := range connPool.addrs { - conn, err := tarantool.Connect(addr, connPool.connOpts) if err != nil { - log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error()) - } else if conn != nil { - err = connPool.setConnectionToPool(addr, conn) - if err != nil { - conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) - } else if conn.ConnectedNow() { - somebodyAlive = true - } + conn.Close() + log.Printf("tarantool: storing connection to %s failed: %s\n", s.addr, err) + return s + } + + added := pool.connectionListenerAdd(s.addr, conn, role) + if !added { + conn.Close() + return s } + + pool.poolsMutex.Lock() + if pool.state.get() != connectedState { + pool.poolsMutex.Unlock() + conn.Close() + pool.connectionListenerRemoved(conn, role) + return s + } + + pool.addConnection(s.addr, conn, role) + s.conn = conn + s.role = role } - return somebodyAlive + pool.poolsMutex.Unlock() + return s +} + +func (pool *ConnectionPool) reconnect(s connState) connState { + pool.poolsMutex.Lock() + + if pool.state.get() != connectedState { + pool.poolsMutex.Unlock() + return s + } + + pool.deleteConnection(s.addr) + pool.poolsMutex.Unlock() + + pool.connectionListenerRemoved(s.conn, s.role) + s.conn = nil + s.role = UnknownRole + + return pool.tryConnect(s) +} + +func (pool *ConnectionPool) checker(s connState) { + timer := time.NewTicker(pool.opts.CheckTimeout) + defer timer.Stop() + + for { + select { + case <-pool.done: + close(s.notify) + return + case <-s.notify: + if s.conn != nil && s.conn.ClosedNow() { + pool.poolsMutex.Lock() + if pool.state.get() == connectedState { + pool.deleteConnection(s.addr) + pool.poolsMutex.Unlock() + pool.connectionListenerRemoved(s.conn, s.role) + s.conn = nil + s.role = UnknownRole + } else { + pool.poolsMutex.Unlock() + } + } + case <-timer.C: + // Reopen connection + // Relocate connection between subpools + // if ro/rw was updated + if s.conn == nil { + s = pool.tryConnect(s) + } else if s.conn.ConnectedNow() { + s = pool.updateConnection(s) + } else { + s = pool.reconnect(s) + } + } + } } func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index 326604b51..59aab2150 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -6,6 +6,7 @@ import ( "os" "reflect" "strings" + "sync" "testing" "time" @@ -233,6 +234,223 @@ func TestClose(t *testing.T) { require.Nil(t, err) } +type testListener struct { + added, closed int + errs []error + mutex sync.Mutex +} + +func (tl *testListener) addErr(err error) { + tl.errs = append(tl.errs, err) +} + +func (tl *testListener) Add(conn *tarantool.Connection, role connection_pool.Role) error { + tl.mutex.Lock() + defer tl.mutex.Unlock() + + tl.added++ + + if conn == nil { + tl.addErr(fmt.Errorf("added conn == nil")) + return nil + } + + addr := conn.Addr() + if addr == servers[0] { + if role != connection_pool.MasterRole && tl.closed != 1 || + role != connection_pool.ReplicaRole && tl.closed == 1 { + tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr)) + } + } else if addr == servers[1] { + if role != connection_pool.ReplicaRole { + tl.addErr(fmt.Errorf("unexpected added role %d for addr %s", role, addr)) + } + } else { + tl.addErr(fmt.Errorf("unexpected added addr %s", addr)) + } + + return nil +} + +func (tl *testListener) Removed(conn *tarantool.Connection, role connection_pool.Role) { + tl.mutex.Lock() + defer tl.mutex.Unlock() + + tl.closed++ + + if conn == nil { + tl.addErr(fmt.Errorf("removed conn == nil")) + return + } + + addr := conn.Addr() + if tl.closed == 1 && addr == servers[0] { + // Update. + if role != connection_pool.MasterRole { + tl.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr)) + } + return + } + + if addr == servers[0] || addr == servers[1] { + // Close. + if role != connection_pool.ReplicaRole { + tl.addErr(fmt.Errorf("unexpected removed role %d for addr %s", role, addr)) + } + } else { + tl.addErr(fmt.Errorf("unexpected removed addr %s", addr)) + } +} + +func TestConnectionListener(t *testing.T) { + poolServers := []string{servers[0], servers[1]} + roles := []bool{false, true} + + err := test_helpers.SetClusterRO(poolServers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + tl := &testListener{} + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 100 * time.Microsecond, + ConnectionListener: tl, + } + connPool, err := connection_pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + _, err = connPool.Call17("box.cfg", []interface{}{map[string]bool{ + "read_only": true, + }}, connection_pool.RW) + require.Nilf(t, err, "failed to make ro") + for i := 0; i < 100; i++ { + // Wait for update. + if tl.closed > 0 && tl.added >= len(poolServers)+1 { + break + } + time.Sleep(poolOpts.CheckTimeout) + } + + connPool.Close() + + for i := 0; i < 100; i++ { + // Wait for close. + if tl.closed >= 3 { + break + } + time.Sleep(poolOpts.CheckTimeout) + } + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: poolServers, + ExpectedPoolStatus: false, + ExpectedStatuses: map[string]bool{ + poolServers[0]: false, + poolServers[1]: false, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + for _, err := range tl.errs { + t.Errorf("Unexpected error: %s", err) + } + require.Equalf(t, tl.added, len(poolServers)+1, "unexpected added count") + require.Equalf(t, tl.closed, len(poolServers)+1, "unexpected closed count") +} + +type testAddErrorListener struct { + connection_pool.ConnectionListener +} + +func (tl *testAddErrorListener) Add(conn *tarantool.Connection, role connection_pool.Role) error { + return fmt.Errorf("any error") +} + +func TestConnectionListenerAddError(t *testing.T) { + poolServers := []string{servers[0], servers[1]} + + tl := &testAddErrorListener{} + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 100 * time.Microsecond, + ConnectionListener: tl, + } + connPool, err := connection_pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + if err == nil { + defer connPool.Close() + } + require.NotNilf(t, err, "success to connect") +} + +type testUpdateErrorListener struct { + connection_pool.ConnectionListener + updated int + mutex sync.Mutex +} + +func (tl *testUpdateErrorListener) Add(conn *tarantool.Connection, + role connection_pool.Role) error { + tl.mutex.Lock() + defer tl.mutex.Unlock() + + if tl.updated != 0 { + // Don't add again after it was deleted. + return fmt.Errorf("any error") + } + return nil +} + +func (tl *testUpdateErrorListener) Removed(conn *tarantool.Connection, + role connection_pool.Role) { + tl.mutex.Lock() + defer tl.mutex.Unlock() + + tl.updated++ +} + +func TestConnectionListenerUpdateError(t *testing.T) { + poolServers := []string{servers[0], servers[1]} + roles := []bool{false, false} + + err := test_helpers.SetClusterRO(poolServers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + tl := &testUpdateErrorListener{} + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 100 * time.Microsecond, + ConnectionListener: tl, + } + connPool, err := connection_pool.ConnectWithOpts(poolServers, connOpts, poolOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + defer connPool.Close() + + connected, err := connPool.ConnectedNow(connection_pool.ANY) + require.Nilf(t, err, "failed to get ConnectedNow()") + require.Truef(t, connected, "should be connected") + + for i := 0; i < len(poolServers); i++ { + _, err = connPool.Call17("box.cfg", []interface{}{map[string]bool{ + "read_only": true, + }}, connection_pool.RW) + require.Nilf(t, err, "failed to make ro") + } + + for i := 0; i < 10; i++ { + connected, err = connPool.ConnectedNow(connection_pool.ANY) + if !connected || err != nil { + break + } + time.Sleep(poolOpts.CheckTimeout) + } + + require.Equalf(t, 2, tl.updated, "unexpected updated count") + require.Nilf(t, err, "failed to get ConnectedNow()") + require.Falsef(t, connected, "still connected") +} + func TestRequestOnClosed(t *testing.T) { server1 := servers[0] server2 := servers[1]