diff --git a/CHANGELOG.md b/CHANGELOG.md index d0f696cc0..3c9def351 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,7 +65,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Change `OverrideSchema(*Schema)` to `SetSchema(Schema)` (#7) - Change values, stored by pointers in the `Schema`, `Space`, `Index` structs, to be stored by their values (#7) -- Make `Dialer` mandatory for creation a single connection / connection pool (#321) +- Make `Dialer` mandatory for creation a single connection (#321) - Remove `Connection.RemoteAddr()`, `Connection.LocalAddr()`. Add `Addr()` function instead (#321) - Remove `Connection.ClientProtocolInfo`, `Connection.ServerProtocolInfo`. @@ -93,6 +93,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. `Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` and `Pooler` return response data instead of an actual responses (#237) - Renamed `StrangerResponse` to `MockResponse` (#237) +- `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type + `pool.Instance` to determinate connection options (#356) ### Deprecated diff --git a/README.md b/README.md index 5472e50a2..f37b3be64 100644 --- a/README.md +++ b/README.md @@ -192,16 +192,19 @@ The subpackage has been deleted. You could use `pool` instead. * The `connection_pool` subpackage has been renamed to `pool`. * The type `PoolOpts` has been renamed to `Opts`. -* `pool.Connect` now accepts context as first argument, which user may cancel - in process. If it is canceled in progress, an error will be returned. - All created connections will be closed. -* `pool.Add` now accepts context as first argument, which user may cancel in - process. -* Now you need to pass `map[string]Dialer` to the `pool.Connect` as the second - argument, instead of a list of addresses. Each dialer is associated with a - unique string ID, which allows them to be distinguished. -* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been changed - to `map[string]ConnectionInfo`. +* `pool.Connect` and `pool.ConnectWithOpts` now accept context as the first + argument, which user may cancel in process. If it is canceled in progress, + an error will be returned and all created connections will be closed. +* `pool.Connect` and `pool.ConnectWithOpts` now accept `[]pool.Instance` as + the second argument instead of a list of addresses. Each instance is + associated with a unique string name, `Dialer` and connection options which + allows instances to be independently configured. +* `pool.Add` now accepts context as the first argument, which user may cancel + in process. +* `pool.Add` now accepts `pool.Instance` as the second argument instead of + an address, it allows to configure a new instance more flexible. +* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been + changed to `map[string]ConnectionInfo`. * Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`, `Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Pooler` return response data instead of an actual responses. diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 861221290..7dd6e0c46 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -13,6 +13,7 @@ package pool import ( "context" "errors" + "fmt" "log" "sync" "time" @@ -23,7 +24,7 @@ import ( ) var ( - ErrEmptyDialers = errors.New("dialers (second argument) should not be empty") + ErrEmptyInstances = errors.New("instances (second argument) should not be empty") ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") ErrNoConnection = errors.New("no active connections") ErrTooManyArgs = errors.New("too many arguments") @@ -50,7 +51,7 @@ type ConnectionHandler interface { // The client code may cancel adding a connection to the pool. The client // need to return an error from the Discovered call for that. In this case // the pool will close connection and will try to reopen it later. - Discovered(id string, conn *tarantool.Connection, role Role) error + Discovered(name string, conn *tarantool.Connection, role Role) error // Deactivated is called when a connection with a role has become // unavaileble to send requests. It happens if the connection is closed or // the connection role is switched. @@ -61,7 +62,17 @@ type ConnectionHandler interface { // Deactivated will not be called if a previous Discovered() call returns // an error. Because in this case, the connection does not become available // for sending requests. - Deactivated(id string, conn *tarantool.Connection, role Role) error + Deactivated(name string, conn *tarantool.Connection, role Role) error +} + +// Instance describes a single instance configuration in the pool. +type Instance struct { + // Name is an instance name. The name must be unique. + Name string + // Dialer will be used to create a connection to the instance. + Dialer tarantool.Dialer + // Opts configures a connection to the instance. + Opts tarantool.Opts } // Opts provides additional options (configurable via ConnectWithOpts). @@ -97,8 +108,7 @@ type ConnectionPool struct { ends map[string]*endpoint endsMutex sync.RWMutex - connOpts tarantool.Opts - opts Opts + opts Opts state state done chan struct{} @@ -112,8 +122,9 @@ type ConnectionPool struct { var _ Pooler = (*ConnectionPool)(nil) type endpoint struct { - id string + name string dialer tarantool.Dialer + opts tarantool.Opts notify chan tarantool.ConnEvent conn *tarantool.Connection role Role @@ -125,10 +136,11 @@ type endpoint struct { closeErr error } -func newEndpoint(id string, dialer tarantool.Dialer) *endpoint { +func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *endpoint { return &endpoint{ - id: id, + name: name, dialer: dialer, + opts: opts, notify: make(chan tarantool.ConnEvent, 100), conn: nil, role: UnknownRole, @@ -139,34 +151,41 @@ func newEndpoint(id string, dialer tarantool.Dialer) *endpoint { } } -// ConnectWithOpts creates pool for instances with specified dialers and options opts. -// Each dialer corresponds to a certain id by which they will be distinguished. -func ConnectWithOpts(ctx context.Context, dialers map[string]tarantool.Dialer, - connOpts tarantool.Opts, opts Opts) (*ConnectionPool, error) { - if len(dialers) == 0 { - return nil, ErrEmptyDialers +// ConnectWithOpts creates pool for instances with specified instances and +// opts. Instances must have unique names. +func ConnectWithOpts(ctx context.Context, instances []Instance, + opts Opts) (*ConnectionPool, error) { + if len(instances) == 0 { + return nil, ErrEmptyInstances } + unique := make(map[string]bool) + for _, instance := range instances { + if _, ok := unique[instance.Name]; ok { + return nil, fmt.Errorf("duplicate instance name: %q", instance.Name) + } + unique[instance.Name] = true + } + if opts.CheckTimeout <= 0 { return nil, ErrWrongCheckTimeout } - size := len(dialers) + size := len(instances) rwPool := newRoundRobinStrategy(size) roPool := newRoundRobinStrategy(size) anyPool := newRoundRobinStrategy(size) connPool := &ConnectionPool{ - ends: make(map[string]*endpoint), - connOpts: connOpts, - opts: opts, - state: unknownState, - done: make(chan struct{}), - rwPool: rwPool, - roPool: roPool, - anyPool: anyPool, + ends: make(map[string]*endpoint), + opts: opts, + state: unknownState, + done: make(chan struct{}), + rwPool: rwPool, + roPool: roPool, + anyPool: anyPool, } - somebodyAlive, ctxCanceled := connPool.fillPools(ctx, dialers) + somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances) if !somebodyAlive { connPool.state.set(closedState) if ctxCanceled { @@ -186,18 +205,17 @@ func ConnectWithOpts(ctx context.Context, dialers map[string]tarantool.Dialer, return connPool, nil } -// Connect creates pool for instances with specified dialers. -// Each dialer corresponds to a certain id by which they will be distinguished. +// Connect creates pool for instances with specified instances. Instances must +// have unique names. // // It is useless to set up tarantool.Opts.Reconnect value for a connection. // The connection pool has its own reconnection logic. See // Opts.CheckTimeout description. -func Connect(ctx context.Context, dialers map[string]tarantool.Dialer, - connOpts tarantool.Opts) (*ConnectionPool, error) { +func Connect(ctx context.Context, instances []Instance) (*ConnectionPool, error) { opts := Opts{ CheckTimeout: 1 * time.Second, } - return ConnectWithOpts(ctx, dialers, connOpts, opts) + return ConnectWithOpts(ctx, instances, opts) } // ConnectedNow gets connected status of pool. @@ -234,10 +252,10 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) { return conn.ConfiguredTimeout(), nil } -// Add adds a new endpoint with the id into the pool. This function -// adds the endpoint only after successful connection. -func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Dialer) error { - e := newEndpoint(id, dialer) +// Add adds a new instance into the pool. This function adds the instance +// only after successful connection. +func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error { + e := newEndpoint(instance.Name, instance.Dialer, instance.Opts) p.endsMutex.Lock() // Ensure that Close()/CloseGraceful() not in progress/done. @@ -245,7 +263,7 @@ func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Di p.endsMutex.Unlock() return ErrClosed } - if _, ok := p.ends[id]; ok { + if _, ok := p.ends[instance.Name]; ok { p.endsMutex.Unlock() return ErrExists } @@ -253,12 +271,12 @@ func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Di endpointCtx, cancel := context.WithCancel(context.Background()) e.cancel = cancel - p.ends[id] = e + p.ends[instance.Name] = e p.endsMutex.Unlock() if err := p.tryConnect(ctx, e); err != nil { p.endsMutex.Lock() - delete(p.ends, id) + delete(p.ends, instance.Name) p.endsMutex.Unlock() e.cancel() close(e.closed) @@ -269,11 +287,11 @@ func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Di return nil } -// Remove removes an endpoint with the id from the pool. The call +// Remove removes an endpoint with the name from the pool. The call // closes an active connection gracefully. -func (p *ConnectionPool) Remove(id string) error { +func (p *ConnectionPool) Remove(name string) error { p.endsMutex.Lock() - endpoint, ok := p.ends[id] + endpoint, ok := p.ends[name] if !ok { p.endsMutex.Unlock() return errors.New("endpoint not exist") @@ -289,7 +307,7 @@ func (p *ConnectionPool) Remove(id string) error { close(endpoint.shutdown) } - delete(p.ends, id) + delete(p.ends, name) p.endsMutex.Unlock() <-endpoint.closed @@ -357,12 +375,12 @@ func (p *ConnectionPool) GetInfo() map[string]ConnectionInfo { return info } - for id := range p.ends { - conn, role := p.getConnectionFromPool(id) + for name := range p.ends { + conn, role := p.getConnectionFromPool(name) if conn != nil { - info[id] = ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role} + info[name] = ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role} } else { - info[id] = ConnectionInfo{ConnectedNow: false, ConnRole: UnknownRole} + info[name] = ConnectionInfo{ConnectedNow: false, ConnRole: UnknownRole} } } @@ -1011,22 +1029,22 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er return UnknownRole, nil } -func (p *ConnectionPool) getConnectionFromPool(id string) (*tarantool.Connection, Role) { - if conn := p.rwPool.GetConnById(id); conn != nil { +func (p *ConnectionPool) getConnectionFromPool(name string) (*tarantool.Connection, Role) { + if conn := p.rwPool.GetConnection(name); conn != nil { return conn, MasterRole } - if conn := p.roPool.GetConnById(id); conn != nil { + if conn := p.roPool.GetConnection(name); conn != nil { return conn, ReplicaRole } - return p.anyPool.GetConnById(id), UnknownRole + return p.anyPool.GetConnection(name), UnknownRole } -func (p *ConnectionPool) deleteConnection(id string) { - if conn := p.anyPool.DeleteConnById(id); conn != nil { - if conn := p.rwPool.DeleteConnById(id); conn == nil { - p.roPool.DeleteConnById(id) +func (p *ConnectionPool) deleteConnection(name string) { + if conn := p.anyPool.DeleteConnection(name); conn != nil { + if conn := p.rwPool.DeleteConnection(name); conn == nil { + p.roPool.DeleteConnection(name) } // The internal connection deinitialization. p.watcherContainer.mutex.RLock() @@ -1039,7 +1057,7 @@ func (p *ConnectionPool) deleteConnection(id string) { } } -func (p *ConnectionPool) addConnection(id string, +func (p *ConnectionPool) addConnection(name string, conn *tarantool.Connection, role Role) error { // The internal connection initialization. p.watcherContainer.mutex.RLock() @@ -1069,78 +1087,80 @@ func (p *ConnectionPool) addConnection(id string, for _, watcher := range watched { watcher.unwatch(conn) } - log.Printf("tarantool: failed initialize watchers for %s: %s", id, err) + log.Printf("tarantool: failed initialize watchers for %s: %s", name, err) return err } } - p.anyPool.AddConn(id, conn) + p.anyPool.AddConnection(name, conn) switch role { case MasterRole: - p.rwPool.AddConn(id, conn) + p.rwPool.AddConnection(name, conn) case ReplicaRole: - p.roPool.AddConn(id, conn) + p.roPool.AddConnection(name, conn) } return nil } -func (p *ConnectionPool) handlerDiscovered(id string, conn *tarantool.Connection, +func (p *ConnectionPool) handlerDiscovered(name string, conn *tarantool.Connection, role Role) bool { var err error if p.opts.ConnectionHandler != nil { - err = p.opts.ConnectionHandler.Discovered(id, conn, role) + err = p.opts.ConnectionHandler.Discovered(name, conn, role) } if err != nil { - log.Printf("tarantool: storing connection to %s canceled: %s\n", id, err) + log.Printf("tarantool: storing connection to %s canceled: %s\n", name, err) return false } return true } -func (p *ConnectionPool) handlerDeactivated(id string, conn *tarantool.Connection, +func (p *ConnectionPool) handlerDeactivated(name string, conn *tarantool.Connection, role Role) { var err error if p.opts.ConnectionHandler != nil { - err = p.opts.ConnectionHandler.Deactivated(id, conn, role) + err = p.opts.ConnectionHandler.Deactivated(name, conn, role) } if err != nil { - log.Printf("tarantool: deactivating connection to %s by user failed: %s\n", id, err) + log.Printf("tarantool: deactivating connection to %s by user failed: %s\n", + name, err) } } -func (p *ConnectionPool) deactivateConnection(id string, conn *tarantool.Connection, role Role) { - p.deleteConnection(id) +func (p *ConnectionPool) deactivateConnection(name string, + conn *tarantool.Connection, role Role) { + p.deleteConnection(name) conn.Close() - p.handlerDeactivated(id, conn, role) + p.handlerDeactivated(name, conn, role) } func (p *ConnectionPool) deactivateConnections() { - for id, endpoint := range p.ends { + for name, endpoint := range p.ends { if endpoint != nil && endpoint.conn != nil { - p.deactivateConnection(id, endpoint.conn, endpoint.role) + p.deactivateConnection(name, endpoint.conn, endpoint.role) } } } func (p *ConnectionPool) processConnection(conn *tarantool.Connection, - id string, end *endpoint) bool { + name string, end *endpoint) bool { role, err := p.getConnectionRole(conn) if err != nil { conn.Close() - log.Printf("tarantool: storing connection to %s failed: %s\n", id, err) + log.Printf("tarantool: storing connection to %s failed: %s\n", name, err) return false } - if !p.handlerDiscovered(id, conn, role) { + if !p.handlerDiscovered(name, conn, role) { conn.Close() return false } - if p.addConnection(id, conn, role) != nil { + if p.addConnection(name, conn, role) != nil { conn.Close() - p.handlerDeactivated(id, conn, role) + p.handlerDeactivated(name, conn, role) return false } @@ -1149,27 +1169,27 @@ func (p *ConnectionPool) processConnection(conn *tarantool.Connection, return true } -func (p *ConnectionPool) fillPools( - ctx context.Context, - dialers map[string]tarantool.Dialer) (bool, bool) { +func (p *ConnectionPool) fillPools(ctx context.Context, + instances []Instance) (bool, bool) { somebodyAlive := false ctxCanceled := false // It is called before controller() goroutines, so we don't expect // concurrency issues here. - for id, dialer := range dialers { - end := newEndpoint(id, dialer) - p.ends[id] = end - connOpts := p.connOpts + for _, instance := range instances { + end := newEndpoint(instance.Name, instance.Dialer, instance.Opts) + p.ends[instance.Name] = end + connOpts := instance.Opts connOpts.Notify = end.notify - conn, err := tarantool.Connect(ctx, dialer, connOpts) + conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts) if err != nil { - log.Printf("tarantool: connect to %s failed: %s\n", id, err.Error()) + log.Printf("tarantool: connect to %s failed: %s\n", + instance.Name, err) select { case <-ctx.Done(): ctxCanceled = true - p.ends[id] = nil + p.ends[instance.Name] = nil log.Printf("tarantool: operation was canceled") p.deactivateConnections() @@ -1177,7 +1197,7 @@ func (p *ConnectionPool) fillPools( return false, ctxCanceled default: } - } else if p.processConnection(conn, id, end) { + } else if p.processConnection(conn, instance.Name, end) { somebodyAlive = true } } @@ -1195,11 +1215,11 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { if role, err := p.getConnectionRole(e.conn); err == nil { if e.role != role { - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() - p.handlerDeactivated(e.id, e.conn, e.role) - opened := p.handlerDiscovered(e.id, e.conn, role) + p.handlerDeactivated(e.name, e.conn, e.role) + opened := p.handlerDiscovered(e.name, e.conn, role) if !opened { e.conn.Close() e.conn = nil @@ -1212,17 +1232,17 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { p.poolsMutex.Unlock() e.conn.Close() - p.handlerDeactivated(e.id, e.conn, role) + p.handlerDeactivated(e.name, e.conn, role) e.conn = nil e.role = UnknownRole return } - if p.addConnection(e.id, e.conn, role) != nil { + if p.addConnection(e.name, e.conn, role) != nil { p.poolsMutex.Unlock() e.conn.Close() - p.handlerDeactivated(e.id, e.conn, role) + p.handlerDeactivated(e.name, e.conn, role) e.conn = nil e.role = UnknownRole return @@ -1232,11 +1252,11 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { p.poolsMutex.Unlock() return } else { - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() e.conn.Close() - p.handlerDeactivated(e.id, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, e.role) e.conn = nil e.role = UnknownRole return @@ -1254,7 +1274,7 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { e.conn = nil e.role = UnknownRole - connOpts := p.connOpts + connOpts := e.opts connOpts.Notify = e.notify conn, err := tarantool.Connect(ctx, e.dialer, connOpts) if err == nil { @@ -1264,11 +1284,11 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { if err != nil { conn.Close() log.Printf("tarantool: storing connection to %s failed: %s\n", - e.id, err) + e.name, err) return err } - opened := p.handlerDiscovered(e.id, conn, role) + opened := p.handlerDiscovered(e.name, conn, role) if !opened { conn.Close() return errors.New("storing connection canceled") @@ -1278,14 +1298,14 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { if p.state.get() != connectedState { p.poolsMutex.Unlock() conn.Close() - p.handlerDeactivated(e.id, conn, role) + p.handlerDeactivated(e.name, conn, role) return ErrClosed } - if err = p.addConnection(e.id, conn, role); err != nil { + if err = p.addConnection(e.name, conn, role); err != nil { p.poolsMutex.Unlock() conn.Close() - p.handlerDeactivated(e.id, conn, role) + p.handlerDeactivated(e.name, conn, role) return err } e.conn = conn @@ -1304,10 +1324,10 @@ func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) { return } - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() - p.handlerDeactivated(e.id, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, e.role) e.conn = nil e.role = UnknownRole @@ -1340,12 +1360,12 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { case <-e.close: if e.conn != nil { p.poolsMutex.Lock() - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() if !shutdown { e.closeErr = e.conn.Close() - p.handlerDeactivated(e.id, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, e.role) close(e.closed) } else { // Force close the connection. @@ -1362,7 +1382,7 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { shutdown = true if e.conn != nil { p.poolsMutex.Lock() - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() // We need to catch s.close in the current goroutine, so @@ -1384,9 +1404,9 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) { if e.conn != nil && e.conn.ClosedNow() { p.poolsMutex.Lock() if p.state.get() == connectedState { - p.deleteConnection(e.id) + p.deleteConnection(e.name) p.poolsMutex.Unlock() - p.handlerDeactivated(e.id, e.conn, e.role) + p.handlerDeactivated(e.name, e.conn, e.role) e.conn = nil e.role = UnknownRole } else { diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index acdc756d3..ea19225c8 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -30,8 +30,6 @@ var indexNo = uint32(0) var ports = []string{"3013", "3014", "3015", "3016", "3017"} var host = "127.0.0.1" -type DialersMap = map[string]tarantool.Dialer - var servers = []string{ strings.Join([]string{host, ports[0]}, ":"), strings.Join([]string{host, ports[1]}, ":"), @@ -40,9 +38,9 @@ var servers = []string{ strings.Join([]string{host, ports[4]}, ":"), } -func makeDialer(serv string) tarantool.Dialer { +func makeDialer(server string) tarantool.Dialer { return tarantool.NetDialer{ - Address: serv, + Address: server, User: user, Password: pass, } @@ -50,23 +48,35 @@ func makeDialer(serv string) tarantool.Dialer { func makeDialers(servers []string) []tarantool.Dialer { dialers := make([]tarantool.Dialer, 0, len(servers)) - for _, serv := range servers { - dialers = append(dialers, makeDialer(serv)) + for _, server := range servers { + dialers = append(dialers, makeDialer(server)) } return dialers } -func makeDialersMap(servers []string) DialersMap { - dialersMap := DialersMap{} - for _, serv := range servers { - dialersMap[serv] = makeDialer(serv) +var dialers = makeDialers(servers) + +func makeInstance(server string, opts tarantool.Opts) pool.Instance { + return pool.Instance{ + Name: server, + Dialer: tarantool.NetDialer{ + Address: server, + User: user, + Password: pass, + }, + Opts: opts, } - return dialersMap } -var dialers = makeDialers(servers) -var dialersMap = makeDialersMap(servers) +func makeInstances(servers []string, opts tarantool.Opts) []pool.Instance { + var instances []pool.Instance + for _, server := range servers { + instances = append(instances, makeInstance(server, opts)) + } + return instances +} +var instances = makeInstances(servers, connOpts) var connOpts = tarantool.Opts{ Timeout: 5 * time.Second, } @@ -74,32 +84,40 @@ var connOpts = tarantool.Opts{ var defaultCountRetry = 5 var defaultTimeoutRetry = 500 * time.Millisecond -var instances []test_helpers.TarantoolInstance +var helpInstances []test_helpers.TarantoolInstance -func TestConnError_IncorrectParams(t *testing.T) { +func TestConnect_error_empty_instances(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, DialersMap{}, tarantool.Opts{}) + connPool, err := pool.Connect(ctx, []pool.Instance{}) cancel() require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.NotNilf(t, err, "err is nil with incorrect params") - require.Equal(t, "dialers (second argument) should not be empty", err.Error()) - - ctx, cancel = test_helpers.GetPoolConnectContext() - connPool, err = pool.Connect(ctx, DialersMap{ - "err1": tarantool.NetDialer{Address: "err1"}, - "err2": tarantool.NetDialer{Address: "err2"}, - }, connOpts) + require.ErrorIs(t, err, pool.ErrEmptyInstances) +} + +func TestConnect_error_unavailable(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts)) cancel() require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.NotNilf(t, err, "err is nil with incorrect params") - require.Equal(t, "no active connections", err.Error()) + require.ErrorIs(t, err, pool.ErrNoConnection) +} + +func TestConnect_error_duplicate(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts)) + cancel() + + require.Nilf(t, connPool, "conn is not nil with incorrect param") + require.EqualError(t, err, "duplicate instance name: \"foo\"") +} - ctx, cancel = test_helpers.GetPoolConnectContext() - connPool, err = pool.ConnectWithOpts(ctx, dialersMap, tarantool.Opts{}, pool.Opts{}) +func TestConnectWithOpts_error_no_timeout(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + connPool, err := pool.ConnectWithOpts(ctx, makeInstances([]string{"any"}, connOpts), + pool.Opts{}) cancel() require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.NotNilf(t, err, "err is nil with incorrect params") - require.Equal(t, "wrong check timeout, must be greater than 0", err.Error()) + require.ErrorIs(t, err, pool.ErrWrongCheckTimeout) } func TestConnSuccessfully(t *testing.T) { @@ -107,7 +125,7 @@ func TestConnSuccessfully(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{healthyServ, "err"}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{healthyServ, "err"}, connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -140,7 +158,7 @@ func TestConnErrorAfterCtxCancel(t *testing.T) { var err error cancel() - connPool, err = pool.Connect(ctx, dialersMap, connLongReconnectOpts) + connPool, err = pool.Connect(ctx, makeInstances(servers, connLongReconnectOpts)) if connPool != nil || err == nil { t.Fatalf("ConnectionPool was created after cancel") @@ -180,17 +198,21 @@ func TestContextCancelInProgress(t *testing.T) { defer cancel() cnt := new(int) - poolDialers := DialersMap{} - for _, serv := range servers { - poolDialers[serv] = &mockClosingDialer{ - addr: serv, - cnt: cnt, - ctx: ctx, - ctxCancel: cancel, - } + var instances []pool.Instance + for _, server := range servers { + instances = append(instances, pool.Instance{ + Name: server, + Dialer: &mockClosingDialer{ + addr: server, + cnt: cnt, + ctx: ctx, + ctxCancel: cancel, + }, + Opts: connOpts, + }) } - connPool, err := pool.Connect(ctx, poolDialers, tarantool.Opts{}) + connPool, err := pool.Connect(ctx, instances) require.NotNilf(t, err, "expected err after ctx cancel") assert.Truef(t, strings.Contains(err.Error(), "operation was canceled"), fmt.Sprintf("unexpected error, expected to contain %s, got %v", @@ -201,18 +223,22 @@ func TestContextCancelInProgress(t *testing.T) { func TestConnSuccessfullyDuplicates(t *testing.T) { server := servers[0] - poolDialers := DialersMap{} + var instances []pool.Instance for i := 0; i < 4; i++ { - poolDialers[fmt.Sprintf("c%d", i)] = tarantool.NetDialer{ - Address: server, - User: user, - Password: pass, - } + instances = append(instances, pool.Instance{ + Name: fmt.Sprintf("c%d", i), + Dialer: tarantool.NetDialer{ + Address: server, + User: user, + Password: pass, + }, + Opts: connOpts, + }) } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, poolDialers, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -240,13 +266,13 @@ func TestReconnect(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - test_helpers.StopTarantoolWithCleanup(instances[0]) + test_helpers.StopTarantoolWithCleanup(helpInstances[0]) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, @@ -262,7 +288,7 @@ func TestReconnect(t *testing.T) { defaultCountRetry, defaultTimeoutRetry) require.Nil(t, err) - err = test_helpers.RestartTarantool(&instances[0]) + err = test_helpers.RestartTarantool(&helpInstances[0]) require.Nilf(t, err, "failed to restart tarantool") args = test_helpers.CheckStatusesArgs{ @@ -289,14 +315,14 @@ func TestDisconnect_withReconnect(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server}), opts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server}, opts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() // Test. - test_helpers.StopTarantoolWithCleanup(instances[serverId]) + test_helpers.StopTarantoolWithCleanup(helpInstances[serverId]) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, Mode: pool.ANY, @@ -311,7 +337,7 @@ func TestDisconnect_withReconnect(t *testing.T) { require.Nil(t, err) // Restart the server after success. - err = test_helpers.RestartTarantool(&instances[serverId]) + err = test_helpers.RestartTarantool(&helpInstances[serverId]) require.Nilf(t, err, "failed to restart tarantool") args = test_helpers.CheckStatusesArgs{ @@ -335,14 +361,14 @@ func TestDisconnectAll(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server1, server2}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server1, server2}, connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - test_helpers.StopTarantoolWithCleanup(instances[0]) - test_helpers.StopTarantoolWithCleanup(instances[1]) + test_helpers.StopTarantoolWithCleanup(helpInstances[0]) + test_helpers.StopTarantoolWithCleanup(helpInstances[1]) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, @@ -359,10 +385,10 @@ func TestDisconnectAll(t *testing.T) { defaultCountRetry, defaultTimeoutRetry) require.Nil(t, err) - err = test_helpers.RestartTarantool(&instances[0]) + err = test_helpers.RestartTarantool(&helpInstances[0]) require.Nilf(t, err, "failed to restart tarantool") - err = test_helpers.RestartTarantool(&instances[1]) + err = test_helpers.RestartTarantool(&helpInstances[1]) require.Nilf(t, err, "failed to restart tarantool") args = test_helpers.CheckStatusesArgs{ @@ -384,7 +410,7 @@ func TestDisconnectAll(t *testing.T) { func TestAdd(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:1]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -392,7 +418,7 @@ func TestAdd(t *testing.T) { for _, server := range servers[1:] { ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, server, dialersMap[server]) + err = connPool.Add(ctx, makeInstance(server, connOpts)) cancel() require.Nil(t, err) } @@ -419,7 +445,7 @@ func TestAdd(t *testing.T) { func TestAdd_exist(t *testing.T) { server := servers[0] ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server}, connOpts)) cancel() require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -427,7 +453,7 @@ func TestAdd_exist(t *testing.T) { defer connPool.Close() ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, server, makeDialer(server)) + err = connPool.Add(ctx, makeInstance(server, connOpts)) cancel() require.Equal(t, pool.ErrExists, err) @@ -450,7 +476,7 @@ func TestAdd_unreachable(t *testing.T) { server := servers[0] ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server}, connOpts)) cancel() require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -459,8 +485,12 @@ func TestAdd_unreachable(t *testing.T) { unhealthyServ := "127.0.0.2:6667" ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, unhealthyServ, tarantool.NetDialer{ - Address: unhealthyServ, + err = connPool.Add(ctx, pool.Instance{ + Name: unhealthyServ, + Dialer: tarantool.NetDialer{ + Address: unhealthyServ, + }, + Opts: connOpts, }) cancel() // The OS-dependent error so we just check for existence. @@ -484,14 +514,14 @@ func TestAdd_unreachable(t *testing.T) { func TestAdd_afterClose(t *testing.T) { server := servers[0] ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server}, connOpts)) cancel() require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") connPool.Close() ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, server, dialersMap[server]) + err = connPool.Add(ctx, makeInstance(server, connOpts)) cancel() assert.Equal(t, err, pool.ErrClosed) } @@ -501,7 +531,7 @@ func TestAdd_Close_concurrent(t *testing.T) { serv1 := servers[1] ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{serv0}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{serv0}, connOpts)) cancel() require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -512,7 +542,7 @@ func TestAdd_Close_concurrent(t *testing.T) { defer wg.Done() ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, serv1, makeDialer(serv1)) + err = connPool.Add(ctx, makeInstance(serv1, connOpts)) cancel() if err != nil { assert.Equal(t, pool.ErrClosed, err) @@ -529,7 +559,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { serv1 := servers[1] ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{serv0}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{serv0}, connOpts)) cancel() require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -540,7 +570,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { defer wg.Done() ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, serv1, makeDialer(serv1)) + err = connPool.Add(ctx, makeInstance(serv1, connOpts)) cancel() if err != nil { assert.Equal(t, pool.ErrClosed, err) @@ -555,7 +585,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { func TestRemove(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -584,7 +614,7 @@ func TestRemove(t *testing.T) { func TestRemove_double(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:2]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:2], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -613,7 +643,7 @@ func TestRemove_double(t *testing.T) { func TestRemove_unknown(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:2]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:2], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -641,7 +671,7 @@ func TestRemove_unknown(t *testing.T) { func TestRemove_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:2]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:2], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -690,7 +720,7 @@ func TestRemove_concurrent(t *testing.T) { func TestRemove_Close_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:2]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:2], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -711,7 +741,7 @@ func TestRemove_Close_concurrent(t *testing.T) { func TestRemove_CloseGraceful_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap(servers[:2]), connOpts) + connPool, err := pool.Connect(ctx, makeInstances(servers[:2], connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -735,7 +765,7 @@ func TestClose(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server1, server2}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server1, server2}, connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -777,7 +807,7 @@ func TestCloseGraceful(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server1, server2}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server1, server2}, connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -842,8 +872,7 @@ func (h *testHandler) addErr(err error) { h.errs = append(h.errs, err) } -func (h *testHandler) Discovered(id string, conn *tarantool.Connection, - +func (h *testHandler) Discovered(name string, conn *tarantool.Connection, role pool.Role) error { discovered := atomic.AddUint32(&h.discovered, 1) @@ -854,29 +883,28 @@ func (h *testHandler) Discovered(id string, conn *tarantool.Connection, // discovered < 3 - initial open of connections // discovered >= 3 - update a connection after a role update - if id == servers[0] { + if name == servers[0] { if discovered < 3 && role != pool.MasterRole { - h.addErr(fmt.Errorf("unexpected init role %d for id %s", role, id)) + h.addErr(fmt.Errorf("unexpected init role %d for name %s", role, name)) } if discovered >= 3 && role != pool.ReplicaRole { - h.addErr(fmt.Errorf("unexpected updated role %d for id %s", role, id)) + h.addErr(fmt.Errorf("unexpected updated role %d for name %s", role, name)) } - } else if id == servers[1] { + } else if name == servers[1] { if discovered >= 3 { - h.addErr(fmt.Errorf("unexpected discovery for id %s", id)) + h.addErr(fmt.Errorf("unexpected discovery for name %s", name)) } if role != pool.ReplicaRole { - h.addErr(fmt.Errorf("unexpected role %d for id %s", role, id)) + h.addErr(fmt.Errorf("unexpected role %d for name %s", role, name)) } } else { - h.addErr(fmt.Errorf("unexpected discovered id %s", id)) + h.addErr(fmt.Errorf("unexpected discovered name %s", name)) } return nil } -func (h *testHandler) Deactivated(id string, conn *tarantool.Connection, - +func (h *testHandler) Deactivated(name string, conn *tarantool.Connection, role pool.Role) error { deactivated := atomic.AddUint32(&h.deactivated, 1) @@ -885,21 +913,21 @@ func (h *testHandler) Deactivated(id string, conn *tarantool.Connection, return nil } - if deactivated == 1 && id == servers[0] { + if deactivated == 1 && name == servers[0] { // A first close is a role update. if role != pool.MasterRole { - h.addErr(fmt.Errorf("unexpected removed role %d for id %s", role, id)) + h.addErr(fmt.Errorf("unexpected removed role %d for name %s", role, name)) } return nil } - if id == servers[0] || id == servers[1] { + if name == servers[0] || name == servers[1] { // Close. if role != pool.ReplicaRole { - h.addErr(fmt.Errorf("unexpected removed role %d for id %s", role, id)) + h.addErr(fmt.Errorf("unexpected removed role %d for name %s", role, name)) } } else { - h.addErr(fmt.Errorf("unexpected removed id %s", id)) + h.addErr(fmt.Errorf("unexpected removed name %s", name)) } return nil @@ -907,7 +935,7 @@ func (h *testHandler) Deactivated(id string, conn *tarantool.Connection, func TestConnectionHandlerOpenUpdateClose(t *testing.T) { poolServers := []string{servers[0], servers[1]} - poolDialers := makeDialersMap(poolServers) + poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, true} err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) @@ -920,7 +948,7 @@ func TestConnectionHandlerOpenUpdateClose(t *testing.T) { } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.ConnectWithOpts(ctx, poolDialers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -974,15 +1002,13 @@ type testAddErrorHandler struct { discovered, deactivated int } -func (h *testAddErrorHandler) Discovered(id string, conn *tarantool.Connection, - +func (h *testAddErrorHandler) Discovered(name string, conn *tarantool.Connection, role pool.Role) error { h.discovered++ return fmt.Errorf("any error") } -func (h *testAddErrorHandler) Deactivated(id string, conn *tarantool.Connection, - +func (h *testAddErrorHandler) Deactivated(name string, conn *tarantool.Connection, role pool.Role) error { h.deactivated++ return nil @@ -998,7 +1024,7 @@ func TestConnectionHandlerOpenError(t *testing.T) { } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.ConnectWithOpts(ctx, makeDialersMap(poolServers), connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(ctx, makeInstances(poolServers, connOpts), poolOpts) if err == nil { defer connPool.Close() } @@ -1011,8 +1037,7 @@ type testUpdateErrorHandler struct { discovered, deactivated uint32 } -func (h *testUpdateErrorHandler) Discovered(id string, conn *tarantool.Connection, - +func (h *testUpdateErrorHandler) Discovered(name string, conn *tarantool.Connection, role pool.Role) error { atomic.AddUint32(&h.discovered, 1) @@ -1023,8 +1048,7 @@ func (h *testUpdateErrorHandler) Discovered(id string, conn *tarantool.Connectio return nil } -func (h *testUpdateErrorHandler) Deactivated(id string, conn *tarantool.Connection, - +func (h *testUpdateErrorHandler) Deactivated(name string, conn *tarantool.Connection, role pool.Role) error { atomic.AddUint32(&h.deactivated, 1) return nil @@ -1032,7 +1056,7 @@ func (h *testUpdateErrorHandler) Deactivated(id string, conn *tarantool.Connecti func TestConnectionHandlerUpdateError(t *testing.T) { poolServers := []string{servers[0], servers[1]} - poolDialers := makeDialersMap(poolServers) + poolInstances := makeInstances(poolServers, connOpts) roles := []bool{false, false} err := test_helpers.SetClusterRO(makeDialers(poolServers), connOpts, roles) @@ -1045,7 +1069,7 @@ func TestConnectionHandlerUpdateError(t *testing.T) { } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.ConnectWithOpts(ctx, poolDialers, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -1092,14 +1116,14 @@ func TestRequestOnClosed(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeDialersMap([]string{server1, server2}), connOpts) + connPool, err := pool.Connect(ctx, makeInstances([]string{server1, server2}, connOpts)) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - test_helpers.StopTarantoolWithCleanup(instances[0]) - test_helpers.StopTarantoolWithCleanup(instances[1]) + test_helpers.StopTarantoolWithCleanup(helpInstances[0]) + test_helpers.StopTarantoolWithCleanup(helpInstances[1]) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, @@ -1118,10 +1142,10 @@ func TestRequestOnClosed(t *testing.T) { _, err = connPool.Ping(pool.ANY) require.NotNilf(t, err, "err is nil after Ping") - err = test_helpers.RestartTarantool(&instances[0]) + err = test_helpers.RestartTarantool(&helpInstances[0]) require.Nilf(t, err, "failed to restart tarantool") - err = test_helpers.RestartTarantool(&instances[1]) + err = test_helpers.RestartTarantool(&helpInstances[1]) require.Nilf(t, err, "failed to restart tarantool") } @@ -1133,7 +1157,7 @@ func TestCall(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1192,7 +1216,7 @@ func TestCall16(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1251,7 +1275,7 @@ func TestCall17(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1310,7 +1334,7 @@ func TestEval(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1390,7 +1414,7 @@ func TestExecute(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1448,7 +1472,7 @@ func TestRoundRobinStrategy(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1527,7 +1551,7 @@ func TestRoundRobinStrategy_NoReplica(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1600,7 +1624,7 @@ func TestRoundRobinStrategy_NoMaster(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1685,7 +1709,7 @@ func TestUpdateInstancesRoles(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1831,7 +1855,7 @@ func TestInsert(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1930,7 +1954,7 @@ func TestDelete(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -1994,7 +2018,7 @@ func TestUpsert(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2066,7 +2090,7 @@ func TestUpdate(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2155,7 +2179,7 @@ func TestReplace(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2240,7 +2264,7 @@ func TestSelect(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2362,7 +2386,7 @@ func TestPing(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2401,7 +2425,7 @@ func TestDo(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2437,7 +2461,7 @@ func TestDo_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2470,7 +2494,7 @@ func TestNewPrepared(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2542,7 +2566,7 @@ func TestDoWithStrangerConn(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") @@ -2572,7 +2596,7 @@ func TestStream_Commit(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2663,7 +2687,7 @@ func TestStream_Rollback(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2753,7 +2777,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2834,7 +2858,7 @@ func TestConnectionPool_NewWatcher_no_watchers(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nill after Connect") defer connPool.Close() @@ -2864,7 +2888,7 @@ func TestConnectionPool_NewWatcher_modes(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -2944,7 +2968,7 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) { } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - pool, err := pool.ConnectWithOpts(ctx, dialersMap, connOpts, poolOpts) + pool, err := pool.ConnectWithOpts(ctx, instances, poolOpts) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") @@ -3026,7 +3050,7 @@ func TestWatcher_Unregister(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - pool, err := pool.Connect(ctx, dialersMap, connOpts) + pool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, pool, "conn is nil after Connect") defer pool.Close() @@ -3083,7 +3107,7 @@ func TestConnectionPool_NewWatcher_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -3121,7 +3145,7 @@ func TestWatcher_Unregister_concurrent(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() @@ -3177,14 +3201,14 @@ func runTestMain(m *testing.M) int { }) } - instances, err = test_helpers.StartTarantoolInstances(instsOpts) + helpInstances, err = test_helpers.StartTarantoolInstances(instsOpts) if err != nil { log.Fatalf("Failed to prepare test tarantool: %s", err) return -1 } - defer test_helpers.StopTarantoolInstances(instances) + defer test_helpers.StopTarantoolInstances(helpInstances) return m.Run() } diff --git a/pool/example_test.go b/pool/example_test.go index 7eb480177..a4d3d4ba1 100644 --- a/pool/example_test.go +++ b/pool/example_test.go @@ -29,7 +29,7 @@ func examplePool(roles []bool, } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, dialersMap, connOpts) + connPool, err := pool.Connect(ctx, instances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") } @@ -39,16 +39,21 @@ func examplePool(roles []bool, func exampleFeaturesPool(roles []bool, connOpts tarantool.Opts, requiredProtocol tarantool.ProtocolInfo) (*pool.ConnectionPool, error) { - poolDialersMap := map[string]tarantool.Dialer{} + poolInstances := []pool.Instance{} poolDialers := []tarantool.Dialer{} - for _, serv := range servers { - poolDialersMap[serv] = tarantool.NetDialer{ - Address: serv, + for _, server := range servers { + dialer := tarantool.NetDialer{ + Address: server, User: user, Password: pass, RequiredProtocolInfo: requiredProtocol, } - poolDialers = append(poolDialers, poolDialersMap[serv]) + poolInstances = append(poolInstances, pool.Instance{ + Name: server, + Dialer: dialer, + Opts: connOpts, + }) + poolDialers = append(poolDialers, dialer) } err := test_helpers.SetClusterRO(poolDialers, connOpts, roles) if err != nil { @@ -56,7 +61,7 @@ func exampleFeaturesPool(roles []bool, connOpts tarantool.Opts, } ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, poolDialersMap, connOpts) + connPool, err := pool.Connect(ctx, poolInstances) if err != nil || connPool == nil { return nil, fmt.Errorf("ConnectionPool is not established") } diff --git a/pool/round_robin.go b/pool/round_robin.go index 6e0b158f4..82cf26f39 100644 --- a/pool/round_robin.go +++ b/pool/round_robin.go @@ -24,7 +24,7 @@ func newRoundRobinStrategy(size int) *roundRobinStrategy { } } -func (r *roundRobinStrategy) GetConnById(id string) *tarantool.Connection { +func (r *roundRobinStrategy) GetConnection(id string) *tarantool.Connection { r.mutex.RLock() defer r.mutex.RUnlock() @@ -36,7 +36,7 @@ func (r *roundRobinStrategy) GetConnById(id string) *tarantool.Connection { return r.conns[index] } -func (r *roundRobinStrategy) DeleteConnById(id string) *tarantool.Connection { +func (r *roundRobinStrategy) DeleteConnection(id string) *tarantool.Connection { r.mutex.Lock() defer r.mutex.Unlock() @@ -93,7 +93,7 @@ func (r *roundRobinStrategy) GetConnections() map[string]*tarantool.Connection { return conns } -func (r *roundRobinStrategy) AddConn(id string, conn *tarantool.Connection) { +func (r *roundRobinStrategy) AddConnection(id string, conn *tarantool.Connection) { r.mutex.Lock() defer r.mutex.Unlock() diff --git a/pool/round_robin_test.go b/pool/round_robin_test.go index 6f133a799..6f028f2de 100644 --- a/pool/round_robin_test.go +++ b/pool/round_robin_test.go @@ -18,11 +18,11 @@ func TestRoundRobinAddDelete(t *testing.T) { conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} for i, addr := range addrs { - rr.AddConn(addr, conns[i]) + rr.AddConnection(addr, conns[i]) } for i, addr := range addrs { - if conn := rr.DeleteConnById(addr); conn != conns[i] { + if conn := rr.DeleteConnection(addr); conn != conns[i] { t.Errorf("Unexpected connection on address %s", addr) } } @@ -37,16 +37,16 @@ func TestRoundRobinAddDuplicateDelete(t *testing.T) { conn1 := &tarantool.Connection{} conn2 := &tarantool.Connection{} - rr.AddConn(validAddr1, conn1) - rr.AddConn(validAddr1, conn2) + rr.AddConnection(validAddr1, conn1) + rr.AddConnection(validAddr1, conn2) - if rr.DeleteConnById(validAddr1) != conn2 { + if rr.DeleteConnection(validAddr1) != conn2 { t.Errorf("Unexpected deleted connection") } if !rr.IsEmpty() { t.Errorf("RoundRobin does not empty") } - if rr.DeleteConnById(validAddr1) != nil { + if rr.DeleteConnection(validAddr1) != nil { t.Errorf("Unexpected value after second deletion") } } @@ -58,7 +58,7 @@ func TestRoundRobinGetNextConnection(t *testing.T) { conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} for i, addr := range addrs { - rr.AddConn(addr, conns[i]) + rr.AddConnection(addr, conns[i]) } expectedConns := []*tarantool.Connection{conns[0], conns[1], conns[0], conns[1]} @@ -76,7 +76,7 @@ func TestRoundRobinStrategy_GetConnections(t *testing.T) { conns := []*tarantool.Connection{&tarantool.Connection{}, &tarantool.Connection{}} for i, addr := range addrs { - rr.AddConn(addr, conns[i]) + rr.AddConnection(addr, conns[i]) } rr.GetConnections()[validAddr2] = conns[0] // GetConnections() returns a copy. diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go index b6e89d3f8..355a491ed 100644 --- a/queue/example_connection_pool_test.go +++ b/queue/example_connection_pool_test.go @@ -44,7 +44,7 @@ func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandl // // NOTE: the Queue supports only a master-replica cluster configuration. It // does not support a master-master configuration. -func (h *QueueConnectionHandler) Discovered(id string, conn *tarantool.Connection, +func (h *QueueConnectionHandler) Discovered(name string, conn *tarantool.Connection, role pool.Role) error { h.mutex.Lock() defer h.mutex.Unlock() @@ -106,14 +106,14 @@ func (h *QueueConnectionHandler) Discovered(id string, conn *tarantool.Connectio return h.err } - fmt.Printf("Master %s is ready to work!\n", id) + fmt.Printf("Master %s is ready to work!\n", name) atomic.AddInt32(&h.masterCnt, 1) return nil } // Deactivated doesn't do anything useful for the example. -func (h *QueueConnectionHandler) Deactivated(id string, conn *tarantool.Connection, +func (h *QueueConnectionHandler) Deactivated(name string, conn *tarantool.Connection, role pool.Role) error { if role == pool.MasterRole { atomic.AddInt32(&h.masterCnt, -1) @@ -154,28 +154,33 @@ func Example_connectionPool() { // Create a ConnectionPool object. poolServers := []string{"127.0.0.1:3014", "127.0.0.1:3015"} poolDialers := []tarantool.Dialer{} - poolDialersMap := map[string]tarantool.Dialer{} + poolInstances := []pool.Instance{} - for _, serv := range poolServers { + connOpts := tarantool.Opts{ + Timeout: 5 * time.Second, + } + for _, server := range poolServers { dialer := tarantool.NetDialer{ - Address: serv, + Address: server, User: "test", Password: "test", } poolDialers = append(poolDialers, dialer) - poolDialersMap[serv] = dialer + poolInstances = append(poolInstances, pool.Instance{ + Name: server, + Dialer: dialer, + Opts: connOpts, + }) } - connOpts := tarantool.Opts{ - Timeout: 5 * time.Second, - } + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + poolOpts := pool.Opts{ CheckTimeout: 5 * time.Second, ConnectionHandler: h, } - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel() - connPool, err := pool.ConnectWithOpts(ctx, poolDialersMap, connOpts, poolOpts) + connPool, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) if err != nil { fmt.Printf("Unable to connect to the pool: %s", err) return