diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c9def351..1d9aa2e1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. connection objects (#136). This function now does not attempt to reconnect and tries to establish a connection only once. Function might be canceled via context. Context accepted as first argument. - `pool.Connect` and `pool.Add` now accept context as first argument, which - user may cancel in process. If `pool.Connect` is canceled in progress, an + `pool.Connect` now accepts context as first argument, which user may cancel + in process. If `pool.Connect` is canceled in progress, an error will be returned. All created connections will be closed. - `iproto.Feature` type now used instead of `ProtocolFeature` (#337) - `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature` @@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Renamed `StrangerResponse` to `MockResponse` (#237) - `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type `pool.Instance` to determinate connection options (#356) +- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to + the pool even if it is unable to connect to it (#372) ### Deprecated diff --git a/README.md b/README.md index f37b3be64..23c81b26c 100644 --- a/README.md +++ b/README.md @@ -199,9 +199,10 @@ The subpackage has been deleted. You could use `pool` instead. the second argument instead of a list of addresses. Each instance is associated with a unique string name, `Dialer` and connection options which allows instances to be independently configured. -* `pool.Add` now accepts context as the first argument, which user may cancel - in process. -* `pool.Add` now accepts `pool.Instance` as the second argument instead of +* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into + the pool even if it is unable to connect to it. 
The pool will try to connect to + the instance later. +* `pool.Add` now accepts `pool.Instance` as the first argument instead of an address, it allows to configure a new instance more flexible. * `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been changed to `map[string]ConnectionInfo`. diff --git a/pool/connection_pool.go b/pool/connection_pool.go index 7dd6e0c46..3e64bbfff 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -24,9 +24,7 @@ import ( ) var ( - ErrEmptyInstances = errors.New("instances (second argument) should not be empty") ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") - ErrNoConnection = errors.New("no active connections") ErrTooManyArgs = errors.New("too many arguments") ErrIncorrectResponse = errors.New("incorrect response format") ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`") @@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end // opts. Instances must have unique names. func ConnectWithOpts(ctx context.Context, instances []Instance, opts Opts) (*ConnectionPool, error) { - if len(instances) == 0 { - return nil, ErrEmptyInstances - } unique := make(map[string]bool) for _, instance := range instances { if _, ok := unique[instance.Name]; ok { @@ -185,13 +180,10 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, anyPool: anyPool, } - somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances) - if !somebodyAlive { + canceled := connPool.fillPools(ctx, instances) + if canceled { connPool.state.set(closedState) - if ctxCanceled { - return nil, ErrContextCanceled - } - return nil, ErrNoConnection + return nil, ErrContextCanceled } connPool.state.set(connectedState) @@ -252,9 +244,9 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) { return conn.ConfiguredTimeout(), nil } -// Add adds a new instance into the pool. 
This function adds the instance -// only after successful connection. -func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error { +// Add adds a new instance into the pool. The pool will try to connect to the +// instance later if it is unable to establish a connection. +func (p *ConnectionPool) Add(instance Instance) error { e := newEndpoint(instance.Name, instance.Dialer, instance.Opts) p.endsMutex.Lock() @@ -274,14 +266,9 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error { p.ends[instance.Name] = e p.endsMutex.Unlock() - if err := p.tryConnect(ctx, e); err != nil { - p.endsMutex.Lock() - delete(p.ends, instance.Name) - p.endsMutex.Unlock() - e.cancel() - close(e.closed) - return err - } + // The result does not matter, but we should try to connect to the instance + // as fast as possible. + _ = p.tryConnect(endpointCtx, e) go p.controller(endpointCtx, e) return nil @@ -1169,11 +1156,7 @@ func (p *ConnectionPool) processConnection(conn *tarantool.Connection, return true } -func (p *ConnectionPool) fillPools(ctx context.Context, - instances []Instance) (bool, bool) { - somebodyAlive := false - ctxCanceled := false - +func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool { // It is called before controller() goroutines, so we don't expect // concurrency issues here. 
for _, instance := range instances { @@ -1187,22 +1170,20 @@ func (p *ConnectionPool) fillPools(ctx context.Context, instance.Name, err) select { case <-ctx.Done(): - ctxCanceled = true - p.ends[instance.Name] = nil log.Printf("tarantool: operation was canceled") p.deactivateConnections() - return false, ctxCanceled + return true default: } - } else if p.processConnection(conn, instance.Name, end) { - somebodyAlive = true + } else { + p.processConnection(conn, instance.Name, end) } } - return somebodyAlive, ctxCanceled + return false } func (p *ConnectionPool) updateConnection(e *endpoint) { diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index ea19225c8..f6f34fdf6 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond var helpInstances []test_helpers.TarantoolInstance -func TestConnect_error_empty_instances(t *testing.T) { - ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, []pool.Instance{}) - cancel() - require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.ErrorIs(t, err, pool.ErrEmptyInstances) -} - -func TestConnect_error_unavailable(t *testing.T) { - ctx, cancel := test_helpers.GetPoolConnectContext() - connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts)) - cancel() - require.Nilf(t, connPool, "conn is not nil with incorrect param") - require.ErrorIs(t, err, pool.ErrNoConnection) -} - func TestConnect_error_duplicate(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts)) @@ -145,6 +129,48 @@ func TestConnSuccessfully(t *testing.T) { require.Nil(t, err) } +func TestConnect_empty(t *testing.T) { + cases := []struct { + Name string + Instances []pool.Instance + }{ + {"nil", nil}, + {"empty", []pool.Instance{}}, + } + + for _, tc := range cases { + t.Run(tc.Name, 
func(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + connPool, err := pool.Connect(ctx, tc.Instances) + if connPool != nil { + defer connPool.Close() + } + require.NoError(t, err, "failed to create a pool") + require.NotNilf(t, connPool, "pool is nil after Connect") + require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected") + }) + } +} + +func TestConnect_unavailable(t *testing.T) { + servers := []string{"err1", "err2"} + ctx, cancel := test_helpers.GetPoolConnectContext() + connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts)) + cancel() + + if connPool != nil { + defer connPool.Close() + } + + require.NoError(t, err, "failed to create a pool") + require.NotNilf(t, connPool, "pool is nil after Connect") + require.Equal(t, map[string]pool.ConnectionInfo{ + servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + }, connPool.GetInfo()) +} + func TestConnErrorAfterCtxCancel(t *testing.T) { var connLongReconnectOpts = tarantool.Opts{ Timeout: 5 * time.Second, @@ -410,16 +436,14 @@ func TestDisconnectAll(t *testing.T) { func TestAdd(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() - connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts)) + connPool, err := pool.Connect(ctx, []pool.Instance{}) require.Nilf(t, err, "failed to connect") require.NotNilf(t, connPool, "conn is nil after Connect") defer connPool.Close() - for _, server := range servers[1:] { - ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, makeInstance(server, connOpts)) - cancel() + for _, server := range servers { + err = connPool.Add(makeInstance(server, connOpts)) require.Nil(t, err) } @@ -452,9 +476,7 @@ func TestAdd_exist(t *testing.T) { defer connPool.Close() - ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, 
makeInstance(server, connOpts)) - cancel() + err = connPool.Add(makeInstance(server, connOpts)) require.Equal(t, pool.ErrExists, err) args := test_helpers.CheckStatusesArgs{ @@ -484,17 +506,14 @@ func TestAdd_unreachable(t *testing.T) { defer connPool.Close() unhealthyServ := "127.0.0.2:6667" - ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, pool.Instance{ + err = connPool.Add(pool.Instance{ Name: unhealthyServ, Dialer: tarantool.NetDialer{ Address: unhealthyServ, }, Opts: connOpts, }) - cancel() - // The OS-dependent error so we just check for existence. - require.NotNil(t, err) + require.NoError(t, err) args := test_helpers.CheckStatusesArgs{ ConnPool: connPool, @@ -502,7 +521,8 @@ func TestAdd_unreachable(t *testing.T) { Servers: servers, ExpectedPoolStatus: true, ExpectedStatuses: map[string]bool{ - server: true, + server: true, + unhealthyServ: false, }, } @@ -520,9 +540,7 @@ func TestAdd_afterClose(t *testing.T) { require.NotNilf(t, connPool, "conn is nil after Connect") connPool.Close() - ctx, cancel = test_helpers.GetConnectContext() - err = connPool.Add(ctx, makeInstance(server, connOpts)) - cancel() + err = connPool.Add(makeInstance(server, connOpts)) assert.Equal(t, err, pool.ErrClosed) } @@ -541,9 +559,7 @@ func TestAdd_Close_concurrent(t *testing.T) { go func() { defer wg.Done() - ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, makeInstance(serv1, connOpts)) - cancel() + err = connPool.Add(makeInstance(serv1, connOpts)) if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -569,9 +585,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) { go func() { defer wg.Done() - ctx, cancel := test_helpers.GetConnectContext() - err = connPool.Add(ctx, makeInstance(serv1, connOpts)) - cancel() + err = connPool.Add(makeInstance(serv1, connOpts)) if err != nil { assert.Equal(t, pool.ErrClosed, err) } @@ -1028,7 +1042,12 @@ func TestConnectionHandlerOpenError(t *testing.T) { if err == nil { defer 
connPool.Close() } - require.NotNilf(t, err, "success to connect") + require.NoError(t, err, "failed to connect") + require.NotNil(t, connPool, "pool expected") + require.Equal(t, map[string]pool.ConnectionInfo{ + servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole}, + }, connPool.GetInfo()) require.Equalf(t, 2, h.discovered, "unexpected discovered count") require.Equalf(t, 0, h.deactivated, "unexpected deactivated count") }