Skip to content

Commit

Permalink
pool: add a connection even on connection error
Browse files Browse the repository at this point in the history
From a user's perspective, it is useful to add all target instances
to the pool, even those that are currently unavailable. This way
the user doesn't have to keep track of the list of actually added
instances.

The patch makes this possible.

Closes #372
  • Loading branch information
oleg-jukovec committed Jan 26, 2024
1 parent b8d9914 commit 50dc5d0
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 78 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
connection objects (#136). This function now does not attempt to reconnect
and tries to establish a connection only once. Function might be canceled
via context. Context accepted as first argument.
`pool.Connect` and `pool.Add` now accept context as first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
`pool.Connect` now accepts context as the first argument, which the user
may cancel in progress. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
Expand Down Expand Up @@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Renamed `StrangerResponse` to `MockResponse` (#237)
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` use a new type
`pool.Instance` to determinate connection options (#356)
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
the pool even if it is unable to connect to them (#372)

### Deprecated

Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ The subpackage has been deleted. You could use `pool` instead.
the second argument instead of a list of addresses. Each instance is
associated with a unique string name, `Dialer` and connection options which
allows instances to be independently configured.
* `pool.Add` now accepts context as the first argument, which user may cancel
in process.
* `pool.Add` now accepts `pool.Instance` as the second argument instead of
* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
the pool even if it is unable to connect to them. The pool will try to
connect to the instances later.
* `pool.Add` now accepts `pool.Instance` as the first argument instead of
an address, it allows to configure a new instance more flexible.
* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been
changed to `map[string]ConnectionInfo`.
Expand Down
47 changes: 14 additions & 33 deletions pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
)

var (
ErrEmptyInstances = errors.New("instances (second argument) should not be empty")
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
ErrNoConnection = errors.New("no active connections")
ErrTooManyArgs = errors.New("too many arguments")
ErrIncorrectResponse = errors.New("incorrect response format")
ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`")
Expand Down Expand Up @@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
// opts. Instances must have unique names.
func ConnectWithOpts(ctx context.Context, instances []Instance,
opts Opts) (*ConnectionPool, error) {
if len(instances) == 0 {
return nil, ErrEmptyInstances
}
unique := make(map[string]bool)
for _, instance := range instances {
if _, ok := unique[instance.Name]; ok {
Expand Down Expand Up @@ -185,13 +180,10 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
anyPool: anyPool,
}

somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
if !somebodyAlive {
canceled := connPool.fillPools(ctx, instances)
if canceled {
connPool.state.set(closedState)
if ctxCanceled {
return nil, ErrContextCanceled
}
return nil, ErrNoConnection
return nil, ErrContextCanceled
}

connPool.state.set(connectedState)
Expand Down Expand Up @@ -252,9 +244,9 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
return conn.ConfiguredTimeout(), nil
}

// Add adds a new instance into the pool. This function adds the instance
// only after successful connection.
func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
// Add adds a new instance into the pool. The pool will try to connect to the
// instance later if it is unable to establish a connection.
func (p *ConnectionPool) Add(instance Instance) error {
e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)

p.endsMutex.Lock()
Expand All @@ -274,14 +266,9 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
p.ends[instance.Name] = e
p.endsMutex.Unlock()

if err := p.tryConnect(ctx, e); err != nil {
p.endsMutex.Lock()
delete(p.ends, instance.Name)
p.endsMutex.Unlock()
e.cancel()
close(e.closed)
return err
}
// The result does not matter, but we should try to connect to the instance
// as fast as possible.
_ = p.tryConnect(endpointCtx, e)

go p.controller(endpointCtx, e)
return nil
Expand Down Expand Up @@ -1169,11 +1156,7 @@ func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
return true
}

func (p *ConnectionPool) fillPools(ctx context.Context,
instances []Instance) (bool, bool) {
somebodyAlive := false
ctxCanceled := false

func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
// It is called before controller() goroutines, so we don't expect
// concurrency issues here.
for _, instance := range instances {
Expand All @@ -1187,22 +1170,20 @@ func (p *ConnectionPool) fillPools(ctx context.Context,
instance.Name, err)
select {
case <-ctx.Done():
ctxCanceled = true

p.ends[instance.Name] = nil
log.Printf("tarantool: operation was canceled")

p.deactivateConnections()

return false, ctxCanceled
return true
default:
}
} else if p.processConnection(conn, instance.Name, end) {
somebodyAlive = true
} else {
p.processConnection(conn, instance.Name, end)
}
}

return somebodyAlive, ctxCanceled
return false
}

func (p *ConnectionPool) updateConnection(e *endpoint) {
Expand Down
99 changes: 59 additions & 40 deletions pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond

var helpInstances []test_helpers.TarantoolInstance

func TestConnect_error_empty_instances(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
connPool, err := pool.Connect(ctx, []pool.Instance{})
cancel()
require.Nilf(t, connPool, "conn is not nil with incorrect param")
require.ErrorIs(t, err, pool.ErrEmptyInstances)
}

func TestConnect_error_unavailable(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
cancel()
require.Nilf(t, connPool, "conn is not nil with incorrect param")
require.ErrorIs(t, err, pool.ErrNoConnection)
}

func TestConnect_error_duplicate(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts))
Expand Down Expand Up @@ -145,6 +129,48 @@ func TestConnSuccessfully(t *testing.T) {
require.Nil(t, err)
}

func TestConnect_empty(t *testing.T) {
	// Connecting with no instances must succeed and produce a usable,
	// empty pool instead of returning an error.
	type testCase struct {
		name      string
		instances []pool.Instance
	}
	testCases := []testCase{
		{name: "nil", instances: nil},
		{name: "empty", instances: []pool.Instance{}},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := test_helpers.GetPoolConnectContext()
			defer cancel()

			connPool, err := pool.Connect(ctx, tc.instances)
			if connPool != nil {
				defer connPool.Close()
			}

			require.NoError(t, err, "failed to create a pool")
			require.NotNilf(t, connPool, "pool is nil after Connect")
			require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected")
		})
	}
}

// TestConnect_unavailable checks that Connect succeeds even when every
// instance is unreachable: the instances are still added to the pool
// (disconnected, with an unknown role) so they can be connected later.
func TestConnect_unavailable(t *testing.T) {
	servers := []string{"err1", "err2"}
	ctx, cancel := test_helpers.GetPoolConnectContext()
	// Reuse the servers slice instead of re-spelling the address literal.
	connPool, err := pool.Connect(ctx, makeInstances(servers, connOpts))
	cancel()

	if connPool != nil {
		defer connPool.Close()
	}

	require.NoError(t, err, "failed to create a pool")
	require.NotNilf(t, connPool, "pool is nil after Connect")
	// Both instances must be present but not connected.
	require.Equal(t, map[string]pool.ConnectionInfo{
		servers[0]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
		servers[1]: {ConnectedNow: false, ConnRole: pool.UnknownRole},
	}, connPool.GetInfo())
}

func TestConnErrorAfterCtxCancel(t *testing.T) {
var connLongReconnectOpts = tarantool.Opts{
Timeout: 5 * time.Second,
Expand Down Expand Up @@ -410,16 +436,14 @@ func TestDisconnectAll(t *testing.T) {
func TestAdd(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts))
connPool, err := pool.Connect(ctx, []pool.Instance{})
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

for _, server := range servers[1:] {
ctx, cancel := test_helpers.GetConnectContext()
err = connPool.Add(ctx, makeInstance(server, connOpts))
cancel()
for _, server := range servers {
err = connPool.Add(makeInstance(server, connOpts))
require.Nil(t, err)
}

Expand Down Expand Up @@ -452,9 +476,7 @@ func TestAdd_exist(t *testing.T) {

defer connPool.Close()

ctx, cancel = test_helpers.GetConnectContext()
err = connPool.Add(ctx, makeInstance(server, connOpts))
cancel()
err = connPool.Add(makeInstance(server, connOpts))
require.Equal(t, pool.ErrExists, err)

args := test_helpers.CheckStatusesArgs{
Expand Down Expand Up @@ -484,25 +506,23 @@ func TestAdd_unreachable(t *testing.T) {
defer connPool.Close()

unhealthyServ := "127.0.0.2:6667"
ctx, cancel = test_helpers.GetConnectContext()
err = connPool.Add(ctx, pool.Instance{
err = connPool.Add(pool.Instance{
Name: unhealthyServ,
Dialer: tarantool.NetDialer{
Address: unhealthyServ,
},
Opts: connOpts,
})
cancel()
// The OS-dependent error so we just check for existence.
require.NotNil(t, err)
require.NoError(t, err)

args := test_helpers.CheckStatusesArgs{
ConnPool: connPool,
Mode: pool.ANY,
Servers: servers,
ExpectedPoolStatus: true,
ExpectedStatuses: map[string]bool{
server: true,
server: true,
unhealthyServ: false,
},
}

Expand All @@ -520,9 +540,7 @@ func TestAdd_afterClose(t *testing.T) {
require.NotNilf(t, connPool, "conn is nil after Connect")

connPool.Close()
ctx, cancel = test_helpers.GetConnectContext()
err = connPool.Add(ctx, makeInstance(server, connOpts))
cancel()
err = connPool.Add(makeInstance(server, connOpts))
assert.Equal(t, err, pool.ErrClosed)
}

Expand All @@ -541,9 +559,7 @@ func TestAdd_Close_concurrent(t *testing.T) {
go func() {
defer wg.Done()

ctx, cancel := test_helpers.GetConnectContext()
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
cancel()
err = connPool.Add(makeInstance(serv1, connOpts))
if err != nil {
assert.Equal(t, pool.ErrClosed, err)
}
Expand All @@ -569,9 +585,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) {
go func() {
defer wg.Done()

ctx, cancel := test_helpers.GetConnectContext()
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
cancel()
err = connPool.Add(makeInstance(serv1, connOpts))
if err != nil {
assert.Equal(t, pool.ErrClosed, err)
}
Expand Down Expand Up @@ -1028,7 +1042,12 @@ func TestConnectionHandlerOpenError(t *testing.T) {
if err == nil {
defer connPool.Close()
}
require.NotNilf(t, err, "success to connect")
require.NoError(t, err, "failed to connect")
require.NotNil(t, connPool, "pool expected")
require.Equal(t, map[string]pool.ConnectionInfo{
servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
}, connPool.GetInfo())
require.Equalf(t, 2, h.discovered, "unexpected discovered count")
require.Equalf(t, 0, h.deactivated, "unexpected deactivated count")
}
Expand Down

0 comments on commit 50dc5d0

Please sign in to comment.