Skip to content

Commit

Permalink
pool: add a connection even on connection error
Browse files Browse the repository at this point in the history
From a user's perspective, it is useful to add all target instances
to the pool, even some that are not currently unavailable. This way
the user don’t have to keep track of the list of actually added
instances.

The patch make it possible.

Closes #372
  • Loading branch information
oleg-jukovec committed Jan 30, 2024
1 parent 6ba01ff commit f02579a
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 103 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
connection objects (#136). This function now does not attempt to reconnect
and tries to establish a connection only once. Function might be canceled
via context. Context accepted as first argument.
`pool.Connect` and `pool.Add` now accept context as first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
`pool.Connect` and `pool.Add` now accept context as the first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
Expand Down Expand Up @@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Renamed `StrangerResponse` to `MockResponse` (#237)
- `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type
`pool.Instance` to determinate connection options (#356)
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
the pool even it is unable to connect to it (#372)

### Deprecated

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ The subpackage has been deleted. You could use `pool` instead.
the second argument instead of a list of addresses. Each instance is
associated with a unique string name, `Dialer` and connection options which
allows instances to be independently configured.
* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
the pool even it is unable to connect to it. The pool will try to connect to
the instance later.
* `pool.Add` now accepts context as the first argument, which user may cancel
in process.
* `pool.Add` now accepts `pool.Instance` as the second argument instead of
Expand Down
109 changes: 42 additions & 67 deletions pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
)

var (
ErrEmptyInstances = errors.New("instances (second argument) should not be empty")
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
ErrNoConnection = errors.New("no active connections")
ErrTooManyArgs = errors.New("too many arguments")
ErrIncorrectResponse = errors.New("incorrect response format")
ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`")
Expand Down Expand Up @@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
// opts. Instances must have unique names.
func ConnectWithOpts(ctx context.Context, instances []Instance,
opts Opts) (*ConnectionPool, error) {
if len(instances) == 0 {
return nil, ErrEmptyInstances
}
unique := make(map[string]bool)
for _, instance := range instances {
if _, ok := unique[instance.Name]; ok {
Expand All @@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
connPool := &ConnectionPool{
ends: make(map[string]*endpoint),
opts: opts,
state: unknownState,
state: connectedState,
done: make(chan struct{}),
rwPool: rwPool,
roPool: roPool,
anyPool: anyPool,
}

somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
if !somebodyAlive {
canceled := connPool.fillPools(ctx, instances)
if canceled {
connPool.state.set(closedState)
if ctxCanceled {
return nil, ErrContextCanceled
}
return nil, ErrNoConnection
return nil, ErrContextCanceled
}

connPool.state.set(connectedState)

for _, s := range connPool.ends {
for _, endpoint := range connPool.ends {
endpointCtx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
go connPool.controller(endpointCtx, s)
endpoint.cancel = cancel
go connPool.controller(endpointCtx, endpoint)
}

return connPool, nil
Expand Down Expand Up @@ -252,8 +242,12 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
return conn.ConfiguredTimeout(), nil
}

// Add adds a new instance into the pool. This function adds the instance
// only after successful connection.
// Add adds a new instance into the pool. The pool will try to connect to the
// instance later if it is unable to establish a connection.
//
// The function may return an error and don't add the instance into the pool
// if the context has been cancelled or on concurrent Close()/CloseGraceful()
// call.
func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)

Expand All @@ -268,19 +262,34 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
return ErrExists
}

endpointCtx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
endpointCtx, endpointCancel := context.WithCancel(context.Background())
connectCtx, connectCancel := context.WithCancel(ctx)
e.cancel = func() {
connectCancel()
endpointCancel()
}

p.ends[instance.Name] = e
p.endsMutex.Unlock()

if err := p.tryConnect(ctx, e); err != nil {
p.endsMutex.Lock()
delete(p.ends, instance.Name)
p.endsMutex.Unlock()
e.cancel()
close(e.closed)
return err
if err := p.tryConnect(connectCtx, e); err != nil {
var canceled bool
select {
case <-connectCtx.Done():
canceled = true
case <-endpointCtx.Done():
canceled = true
default:
canceled = false
}
if canceled {
p.endsMutex.Lock()
delete(p.ends, instance.Name)
p.endsMutex.Unlock()
e.cancel()
close(e.closed)
return err
}
}

go p.controller(endpointCtx, e)
Expand Down Expand Up @@ -1145,64 +1154,30 @@ func (p *ConnectionPool) deactivateConnections() {
}
}

func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
name string, end *endpoint) bool {
role, err := p.getConnectionRole(conn)
if err != nil {
conn.Close()
log.Printf("tarantool: storing connection to %s failed: %s\n", name, err)
return false
}

if !p.handlerDiscovered(name, conn, role) {
conn.Close()
return false
}
if p.addConnection(name, conn, role) != nil {
conn.Close()
p.handlerDeactivated(name, conn, role)
return false
}

end.conn = conn
end.role = role
return true
}

func (p *ConnectionPool) fillPools(ctx context.Context,
instances []Instance) (bool, bool) {
somebodyAlive := false
ctxCanceled := false

func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
// It is called before controller() goroutines, so we don't expect
// concurrency issues here.
for _, instance := range instances {
end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
p.ends[instance.Name] = end
connOpts := instance.Opts
connOpts.Notify = end.notify
conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts)
if err != nil {

if err := p.tryConnect(ctx, end); err != nil {
log.Printf("tarantool: connect to %s failed: %s\n",
instance.Name, err)
select {
case <-ctx.Done():
ctxCanceled = true

p.ends[instance.Name] = nil
log.Printf("tarantool: operation was canceled")

p.deactivateConnections()

return false, ctxCanceled
return true
default:
}
} else if p.processConnection(conn, instance.Name, end) {
somebodyAlive = true
}
}

return somebodyAlive, ctxCanceled
return false
}

func (p *ConnectionPool) updateConnection(e *endpoint) {
Expand Down
Loading

0 comments on commit f02579a

Please sign in to comment.