From 17fb94d8d959f986be6567bc59dc422bc11e8b21 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec <oleg.jukovec@tarantool.org>
Date: Tue, 23 Aug 2022 17:59:17 +0300
Subject: [PATCH 1/4] api: add Execute methods to ConnectionPool

The patch adds missed Execute, ExecuteTyped and ExecuteAsync methods
to the ConnectionPool type.

Part of #176
---
 CHANGELOG.md                              |  1 +
 connection_pool/connection_pool.go        | 31 ++++++++++++
 connection_pool/connection_pool_test.go   | 61 +++++++++++++++++++++++
 connection_pool/const.go                  |  1 +
 connection_pool/msgpack_helper_test.go    | 10 ++++
 connection_pool/msgpack_v5_helper_test.go | 10 ++++
 6 files changed, 114 insertions(+)
 create mode 100644 connection_pool/msgpack_helper_test.go
 create mode 100644 connection_pool/msgpack_v5_helper_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 333c09554..002bd758a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - Support queue 1.2.0 (#177)
 - ConnectionHandler interface for handling changes of connections in
   ConnectionPool (#178)
+- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
 
 ### Changed
 
diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go
index 79667ae7c..d9d41edef 100644
--- a/connection_pool/connection_pool.go
+++ b/connection_pool/connection_pool.go
@@ -385,6 +385,16 @@ func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mod
 	return conn.Eval(expr, args)
 }
 
+// Execute passes sql expression to Tarantool for execution.
+func (connPool *ConnectionPool) Execute(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
+	conn, err := connPool.getNextConnection(userMode)
+	if err != nil {
+		return nil, err
+	}
+
+	return conn.Execute(expr, args)
+}
+
 // GetTyped performs select (with limit = 1 and offset = 0)
 // to box space and fills typed result.
 func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) {
@@ -495,6 +505,16 @@ func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result
 	return conn.EvalTyped(expr, args, result)
 }
 
+// ExecuteTyped passes sql expression to Tarantool for execution.
+func (connPool *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{}, userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
+	conn, err := connPool.getNextConnection(userMode)
+	if err != nil {
+		return tarantool.SQLInfo{}, nil, err
+	}
+
+	return conn.ExecuteTyped(expr, args, result)
+}
+
 // SelectAsync sends select request to Tarantool and returns Future.
 func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future {
 	conn, err := connPool.getConnByMode(ANY, userMode)
@@ -607,6 +627,17 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
 	return conn.EvalAsync(expr, args)
 }
 
+// ExecuteAsync sends sql expression to Tarantool for execution and returns
+// Future.
+func (connPool *ConnectionPool) ExecuteAsync(expr string, args interface{}, userMode Mode) *tarantool.Future {
+	conn, err := connPool.getNextConnection(userMode)
+	if err != nil {
+		return newErrorFuture(err)
+	}
+
+	return conn.ExecuteAsync(expr, args)
+}
+
 // Do sends the request and returns a future.
 // For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared)
 // the argument of type Mode is unused.
diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index a41495d8d..b3aee615d 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -642,6 +642,67 @@ func TestEval(t *testing.T) {
 	require.Falsef(t, val, "expected `false` with mode `RW`")
 }
 
+type Member struct {
+	id  uint
+	val string
+}
+
+func (m *Member) DecodeMsgpack(d *decoder) error {
+	var err error
+	var l int
+	if l, err = d.DecodeArrayLen(); err != nil {
+		return err
+	}
+	if l != 2 {
+		return fmt.Errorf("array len doesn't match: %d", l)
+	}
+	if m.id, err = d.DecodeUint(); err != nil {
+		return err
+	}
+	if m.val, err = d.DecodeString(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func TestExecute(t *testing.T) {
+	test_helpers.SkipIfSQLUnsupported(t)
+
+	roles := []bool{false, true, false, false, true}
+
+	err := test_helpers.SetClusterRO(servers, connOpts, roles)
+	require.Nilf(t, err, "fail to set roles for cluster")
+
+	connPool, err := connection_pool.Connect(servers, connOpts)
+	require.Nilf(t, err, "failed to connect")
+	require.NotNilf(t, connPool, "conn is nil after Connect")
+
+	defer connPool.Close()
+
+	request := "SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0 == 1;"
+	// Execute
+	resp, err := connPool.Execute(request, []interface{}{}, connection_pool.ANY)
+	require.Nilf(t, err, "failed to Execute")
+	require.NotNilf(t, resp, "response is nil after Execute")
+	require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Execute")
+	require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response")
+
+	// ExecuteTyped
+	mem := []Member{}
+	info, _, err := connPool.ExecuteTyped(request, []interface{}{}, &mem, connection_pool.ANY)
+	require.Nilf(t, err, "failed to ExecuteTyped")
+	require.Equalf(t, info.AffectedCount, uint64(0), "unexpected info.AffectedCount")
+	require.Equalf(t, len(mem), 1, "wrong count of results")
+
+	// ExecuteAsync
+	fut := connPool.ExecuteAsync(request, []interface{}{}, connection_pool.ANY)
+	resp, err = fut.Get()
+	require.Nilf(t, err, "failed to ExecuteAsync")
+	require.NotNilf(t, resp, "response is nil after ExecuteAsync")
+	require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after ExecuteAsync")
+	require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response")
+}
+
 func TestRoundRobinStrategy(t *testing.T) {
 	roles := []bool{false, true, false, false, true}
 
diff --git a/connection_pool/const.go b/connection_pool/const.go
index d77a55044..26b028f5a 100644
--- a/connection_pool/const.go
+++ b/connection_pool/const.go
@@ -7,6 +7,7 @@ Default mode for each request table:
 	---------- --------------
 	| call    | no default  |
 	| eval    | no default  |
+	| execute | no default  |
 	| ping    | no default  |
 	| insert  | RW          |
 	| delete  | RW          |
diff --git a/connection_pool/msgpack_helper_test.go b/connection_pool/msgpack_helper_test.go
new file mode 100644
index 000000000..d60c7d84d
--- /dev/null
+++ b/connection_pool/msgpack_helper_test.go
@@ -0,0 +1,10 @@
+//go:build !go_tarantool_msgpack_v5
+// +build !go_tarantool_msgpack_v5
+
+package connection_pool_test
+
+import (
+	"gopkg.in/vmihailenco/msgpack.v2"
+)
+
+type decoder = msgpack.Decoder
diff --git a/connection_pool/msgpack_v5_helper_test.go b/connection_pool/msgpack_v5_helper_test.go
new file mode 100644
index 000000000..7c449bec5
--- /dev/null
+++ b/connection_pool/msgpack_v5_helper_test.go
@@ -0,0 +1,10 @@
+//go:build go_tarantool_msgpack_v5
+// +build go_tarantool_msgpack_v5
+
+package connection_pool_test
+
+import (
+	"github.com/vmihailenco/msgpack/v5"
+)
+
+type decoder = msgpack.Decoder

From bf84c17a608c2b388b9d936af4b67f49863f3fb5 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec <oleg.jukovec@tarantool.org>
Date: Tue, 23 Aug 2022 19:09:16 +0300
Subject: [PATCH 2/4] api: add ConnectorAdapter to connection_pool

ConnectorAdapter allows to use ConnectionPool as Connector. All
requests to a pool will be executed in a specified mode.

Part of #176
---
 CHANGELOG.md                       |    1 +
 connection_pool/connection_pool.go |    2 +
 connection_pool/connector.go       |  305 ++++++++
 connection_pool/connector_test.go  | 1168 ++++++++++++++++++++++++++++
 connection_pool/example_test.go    |   21 +
 connection_pool/pooler.go          |   89 +++
 6 files changed, 1586 insertions(+)
 create mode 100644 connection_pool/connector.go
 create mode 100644 connection_pool/connector_test.go
 create mode 100644 connection_pool/pooler.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 002bd758a..9e0303af8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 - ConnectionHandler interface for handling changes of connections in
   ConnectionPool (#178)
 - Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
+- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
 
 ### Changed
 
diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go
index d9d41edef..6597e2dd0 100644
--- a/connection_pool/connection_pool.go
+++ b/connection_pool/connection_pool.go
@@ -99,6 +99,8 @@ type ConnectionPool struct {
 	poolsMutex sync.RWMutex
 }
 
+var _ Pooler = (*ConnectionPool)(nil)
+
 type connState struct {
 	addr   string
 	notify chan tarantool.ConnEvent
diff --git a/connection_pool/connector.go b/connection_pool/connector.go
new file mode 100644
index 000000000..e52109d92
--- /dev/null
+++ b/connection_pool/connector.go
@@ -0,0 +1,305 @@
+package connection_pool
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/tarantool/go-tarantool"
+)
+
+// ConnectorAdapter allows to use Pooler as Connector.
+type ConnectorAdapter struct {
+	pool Pooler
+	mode Mode
+}
+
+var _ tarantool.Connector = (*ConnectorAdapter)(nil)
+
+// NewConnectorAdapter creates a new ConnectorAdapter object for a pool
+// and with a mode. All requests to the pool will be executed in the
+// specified mode.
+func NewConnectorAdapter(pool Pooler, mode Mode) *ConnectorAdapter {
+	return &ConnectorAdapter{pool: pool, mode: mode}
+}
+
+// ConnectedNow reports if connections is established at the moment.
+func (c *ConnectorAdapter) ConnectedNow() bool {
+	ret, err := c.pool.ConnectedNow(c.mode)
+	if err != nil {
+		return false
+	}
+	return ret
+}
+
+// ClosedNow reports if the connector is closed by user or all connections
+// in the specified mode closed.
+func (c *ConnectorAdapter) Close() error {
+	errs := c.pool.Close()
+	if len(errs) == 0 {
+		return nil
+	}
+
+	err := errors.New("failed to close connection pool")
+	for _, e := range errs {
+		err = fmt.Errorf("%s: %w", err.Error(), e)
+	}
+	return err
+}
+
+// Ping sends empty request to Tarantool to check connection.
+func (c *ConnectorAdapter) Ping() (*tarantool.Response, error) {
+	return c.pool.Ping(c.mode)
+}
+
+// ConfiguredTimeout returns a timeout from connections config.
+func (c *ConnectorAdapter) ConfiguredTimeout() time.Duration {
+	ret, err := c.pool.ConfiguredTimeout(c.mode)
+	if err != nil {
+		return 0 * time.Second
+	}
+	return ret
+}
+
+// Select performs select to box space.
+func (c *ConnectorAdapter) Select(space, index interface{},
+	offset, limit, iterator uint32,
+	key interface{}) (*tarantool.Response, error) {
+	return c.pool.Select(space, index, offset, limit, iterator, key, c.mode)
+}
+
+// Insert performs insertion to box space.
+func (c *ConnectorAdapter) Insert(space interface{},
+	tuple interface{}) (*tarantool.Response, error) {
+	return c.pool.Insert(space, tuple, c.mode)
+}
+
+// Replace performs "insert or replace" action to box space.
+func (c *ConnectorAdapter) Replace(space interface{},
+	tuple interface{}) (*tarantool.Response, error) {
+	return c.pool.Replace(space, tuple, c.mode)
+}
+
+// Delete performs deletion of a tuple by key.
+func (c *ConnectorAdapter) Delete(space, index interface{},
+	key interface{}) (*tarantool.Response, error) {
+	return c.pool.Delete(space, index, key, c.mode)
+}
+
+// Update performs update of a tuple by key.
+func (c *ConnectorAdapter) Update(space, index interface{},
+	key, ops interface{}) (*tarantool.Response, error) {
+	return c.pool.Update(space, index, key, ops, c.mode)
+}
+
+// Upsert performs "update or insert" action of a tuple by key.
+func (c *ConnectorAdapter) Upsert(space interface{},
+	tuple, ops interface{}) (*tarantool.Response, error) {
+	return c.pool.Upsert(space, tuple, ops, c.mode)
+}
+
+// Call calls registered Tarantool function.
+// It uses request code for Tarantool >= 1.7 if go-tarantool
+// was build with go_tarantool_call_17 tag.
+// Otherwise, uses request code for Tarantool 1.6.
+func (c *ConnectorAdapter) Call(functionName string,
+	args interface{}) (*tarantool.Response, error) {
+	return c.pool.Call(functionName, args, c.mode)
+}
+
+// Call16 calls registered Tarantool function.
+// It uses request code for Tarantool 1.6, so result is converted to array of arrays
+// Deprecated since Tarantool 1.7.2.
+func (c *ConnectorAdapter) Call16(functionName string,
+	args interface{}) (*tarantool.Response, error) {
+	return c.pool.Call16(functionName, args, c.mode)
+}
+
+// Call17 calls registered Tarantool function.
+// It uses request code for Tarantool >= 1.7, so result is not converted
+// (though, keep in mind, result is always array)
+func (c *ConnectorAdapter) Call17(functionName string,
+	args interface{}) (*tarantool.Response, error) {
+	return c.pool.Call17(functionName, args, c.mode)
+}
+
+// Eval passes Lua expression for evaluation.
+func (c *ConnectorAdapter) Eval(expr string,
+	args interface{}) (*tarantool.Response, error) {
+	return c.pool.Eval(expr, args, c.mode)
+}
+
+// Execute passes sql expression to Tarantool for execution.
+func (c *ConnectorAdapter) Execute(expr string,
+	args interface{}) (*tarantool.Response, error) {
+	return c.pool.Execute(expr, args, c.mode)
+}
+
+// GetTyped performs select (with limit = 1 and offset = 0)
+// to box space and fills typed result.
+func (c *ConnectorAdapter) GetTyped(space, index interface{},
+	key interface{}, result interface{}) error {
+	return c.pool.GetTyped(space, index, key, result, c.mode)
+}
+
+// SelectTyped performs select to box space and fills typed result.
+func (c *ConnectorAdapter) SelectTyped(space, index interface{},
+	offset, limit, iterator uint32,
+	key interface{}, result interface{}) error {
+	return c.pool.SelectTyped(space, index, offset, limit, iterator, key, result, c.mode)
+}
+
+// InsertTyped performs insertion to box space.
+func (c *ConnectorAdapter) InsertTyped(space interface{},
+	tuple interface{}, result interface{}) error {
+	return c.pool.InsertTyped(space, tuple, result, c.mode)
+}
+
+// ReplaceTyped performs "insert or replace" action to box space.
+func (c *ConnectorAdapter) ReplaceTyped(space interface{},
+	tuple interface{}, result interface{}) error {
+	return c.pool.ReplaceTyped(space, tuple, result, c.mode)
+}
+
+// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple.
+func (c *ConnectorAdapter) DeleteTyped(space, index interface{},
+	key interface{}, result interface{}) error {
+	return c.pool.DeleteTyped(space, index, key, result, c.mode)
+}
+
+// UpdateTyped performs update of a tuple by key and fills result with updated tuple.
+func (c *ConnectorAdapter) UpdateTyped(space, index interface{},
+	key, ops interface{}, result interface{}) error {
+	return c.pool.UpdateTyped(space, index, key, ops, result, c.mode)
+}
+
+// CallTyped calls registered function.
+// It uses request code for Tarantool >= 1.7 if go-tarantool
+// was build with go_tarantool_call_17 tag.
+// Otherwise, uses request code for Tarantool 1.6.
+func (c *ConnectorAdapter) CallTyped(functionName string,
+	args interface{}, result interface{}) error {
+	return c.pool.CallTyped(functionName, args, result, c.mode)
+}
+
+// Call16Typed calls registered function.
+// It uses request code for Tarantool 1.6, so result is converted to array of arrays
+// Deprecated since Tarantool 1.7.2.
+func (c *ConnectorAdapter) Call16Typed(functionName string,
+	args interface{}, result interface{}) error {
+	return c.pool.Call16Typed(functionName, args, result, c.mode)
+}
+
+// Call17Typed calls registered function.
+// It uses request code for Tarantool >= 1.7, so result is not converted
+// (though, keep in mind, result is always array)
+func (c *ConnectorAdapter) Call17Typed(functionName string,
+	args interface{}, result interface{}) error {
+	return c.pool.Call17Typed(functionName, args, result, c.mode)
+}
+
+// EvalTyped passes Lua expression for evaluation.
+func (c *ConnectorAdapter) EvalTyped(expr string, args interface{},
+	result interface{}) error {
+	return c.pool.EvalTyped(expr, args, result, c.mode)
+}
+
+// ExecuteTyped passes sql expression to Tarantool for execution.
+func (c *ConnectorAdapter) ExecuteTyped(expr string, args interface{},
+	result interface{}) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
+	return c.pool.ExecuteTyped(expr, args, result, c.mode)
+}
+
+// SelectAsync sends select request to Tarantool and returns Future.
+func (c *ConnectorAdapter) SelectAsync(space, index interface{},
+	offset, limit, iterator uint32, key interface{}) *tarantool.Future {
+	return c.pool.SelectAsync(space, index, offset, limit, iterator, key, c.mode)
+}
+
+// InsertAsync sends insert action to Tarantool and returns Future.
+func (c *ConnectorAdapter) InsertAsync(space interface{},
+	tuple interface{}) *tarantool.Future {
+	return c.pool.InsertAsync(space, tuple, c.mode)
+}
+
+// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future.
+func (c *ConnectorAdapter) ReplaceAsync(space interface{},
+	tuple interface{}) *tarantool.Future {
+	return c.pool.ReplaceAsync(space, tuple, c.mode)
+}
+
+// DeleteAsync sends deletion action to Tarantool and returns Future.
+func (c *ConnectorAdapter) DeleteAsync(space, index interface{},
+	key interface{}) *tarantool.Future {
+	return c.pool.DeleteAsync(space, index, key, c.mode)
+}
+
+// Update sends deletion of a tuple by key and returns Future.
+func (c *ConnectorAdapter) UpdateAsync(space, index interface{},
+	key, ops interface{}) *tarantool.Future {
+	return c.pool.UpdateAsync(space, index, key, ops, c.mode)
+}
+
+// UpsertAsync sends "update or insert" action to Tarantool and returns Future.
+func (c *ConnectorAdapter) UpsertAsync(space interface{}, tuple interface{},
+	ops interface{}) *tarantool.Future {
+	return c.pool.UpsertAsync(space, tuple, ops, c.mode)
+}
+
+// CallAsync sends a call to registered Tarantool function and returns Future.
+// It uses request code for Tarantool >= 1.7 if go-tarantool
+// was build with go_tarantool_call_17 tag.
+// Otherwise, uses request code for Tarantool 1.6.
+func (c *ConnectorAdapter) CallAsync(functionName string,
+	args interface{}) *tarantool.Future {
+	return c.pool.CallAsync(functionName, args, c.mode)
+}
+
+// Call16Async sends a call to registered Tarantool function and returns Future.
+// It uses request code for Tarantool 1.6, so future's result is always array of arrays.
+// Deprecated since Tarantool 1.7.2.
+func (c *ConnectorAdapter) Call16Async(functionName string,
+	args interface{}) *tarantool.Future {
+	return c.pool.Call16Async(functionName, args, c.mode)
+}
+
+// Call17Async sends a call to registered Tarantool function and returns Future.
+// It uses request code for Tarantool >= 1.7, so future's result will not be converted
+// (though, keep in mind, result is always array)
+func (c *ConnectorAdapter) Call17Async(functionName string,
+	args interface{}) *tarantool.Future {
+	return c.pool.Call17Async(functionName, args, c.mode)
+}
+
+// EvalAsync sends a Lua expression for evaluation and returns Future.
+func (c *ConnectorAdapter) EvalAsync(expr string,
+	args interface{}) *tarantool.Future {
+	return c.pool.EvalAsync(expr, args, c.mode)
+}
+
+// ExecuteAsync sends a sql expression for execution and returns Future.
+func (c *ConnectorAdapter) ExecuteAsync(expr string,
+	args interface{}) *tarantool.Future {
+	return c.pool.ExecuteAsync(expr, args, c.mode)
+}
+
+// NewPrepared passes a sql statement to Tarantool for preparation
+// synchronously.
+func (c *ConnectorAdapter) NewPrepared(expr string) (*tarantool.Prepared, error) {
+	return c.pool.NewPrepared(expr, c.mode)
+}
+
+// NewStream creates new Stream object for connection.
+//
+// Since v. 2.10.0, Tarantool supports streams and interactive transactions over
+// them. To use interactive transactions, memtx_use_mvcc_engine box option
+// should be set to true.
+// Since 1.7.0
+func (c *ConnectorAdapter) NewStream() (*tarantool.Stream, error) {
+	return c.pool.NewStream(c.mode)
+}
+
+// Do performs a request asynchronously on the connection.
+func (c *ConnectorAdapter) Do(req tarantool.Request) *tarantool.Future {
+	return c.pool.Do(req, c.mode)
+}
diff --git a/connection_pool/connector_test.go b/connection_pool/connector_test.go
new file mode 100644
index 000000000..fa7cf06ba
--- /dev/null
+++ b/connection_pool/connector_test.go
@@ -0,0 +1,1168 @@
+package connection_pool_test
+
+import (
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"github.com/tarantool/go-tarantool"
+	. "github.com/tarantool/go-tarantool/connection_pool"
+)
+
+var testMode Mode = RW
+
+type connectedNowMock struct {
+	Pooler
+	called int
+	mode   Mode
+	retErr bool
+}
+
+// Tests for different logic.
+
+func (m *connectedNowMock) ConnectedNow(mode Mode) (bool, error) {
+	m.called++
+	m.mode = mode
+
+	if m.retErr {
+		return true, errors.New("mock error")
+	}
+	return true, nil
+}
+
+func TestConnectorConnectedNow(t *testing.T) {
+	m := &connectedNowMock{retErr: false}
+	c := NewConnectorAdapter(m, testMode)
+
+	require.Truef(t, c.ConnectedNow(), "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+func TestConnectorConnectedNowWithError(t *testing.T) {
+	m := &connectedNowMock{retErr: true}
+	c := NewConnectorAdapter(m, testMode)
+
+	require.Falsef(t, c.ConnectedNow(), "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type closeMock struct {
+	Pooler
+	called int
+	retErr bool
+}
+
+func (m *closeMock) Close() []error {
+	m.called++
+	if m.retErr {
+		return []error{errors.New("err1"), errors.New("err2")}
+	}
+	return nil
+}
+
+func TestConnectorClose(t *testing.T) {
+	m := &closeMock{retErr: false}
+	c := NewConnectorAdapter(m, testMode)
+
+	require.Nilf(t, c.Close(), "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+}
+
+func TestConnectorCloseWithError(t *testing.T) {
+	m := &closeMock{retErr: true}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.Close()
+	require.NotNilf(t, err, "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equal(t, "failed to close connection pool: err1: err2", err.Error())
+}
+
+type configuredTimeoutMock struct {
+	Pooler
+	called  int
+	timeout time.Duration
+	mode    Mode
+	retErr  bool
+}
+
+func (m *configuredTimeoutMock) ConfiguredTimeout(mode Mode) (time.Duration, error) {
+	m.called++
+	m.mode = mode
+	m.timeout = 5 * time.Second
+	if m.retErr {
+		return m.timeout, fmt.Errorf("err")
+	}
+	return m.timeout, nil
+}
+
+func TestConnectorConfiguredTimeout(t *testing.T) {
+	m := &configuredTimeoutMock{retErr: false}
+	c := NewConnectorAdapter(m, testMode)
+
+	require.Equalf(t, c.ConfiguredTimeout(), m.timeout, "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+func TestConnectorConfiguredTimeoutWithError(t *testing.T) {
+	m := &configuredTimeoutMock{retErr: true}
+	c := NewConnectorAdapter(m, testMode)
+
+	ret := c.ConfiguredTimeout()
+
+	require.NotEqualf(t, ret, m.timeout, "unexpected result")
+	require.Equalf(t, ret, time.Duration(0), "unexpected result")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+// Tests for that ConnectorAdapter is just a proxy for requests.
+
+type baseRequestMock struct {
+	Pooler
+	called                  int
+	functionName            string
+	offset, limit, iterator uint32
+	space, index            interface{}
+	args, tuple, key, ops   interface{}
+	result                  interface{}
+	mode                    Mode
+}
+
+var reqResp *tarantool.Response = &tarantool.Response{}
+var reqErr error = errors.New("response error")
+var reqFuture *tarantool.Future = &tarantool.Future{}
+
+var reqFunctionName string = "any_name"
+var reqOffset uint32 = 1
+var reqLimit uint32 = 2
+var reqIterator uint32 = 3
+var reqSpace interface{} = []interface{}{1}
+var reqIndex interface{} = []interface{}{2}
+var reqArgs interface{} = []interface{}{3}
+var reqTuple interface{} = []interface{}{4}
+var reqKey interface{} = []interface{}{5}
+var reqOps interface{} = []interface{}{6}
+
+var reqResult interface{} = []interface{}{7}
+var reqSqlInfo = tarantool.SQLInfo{AffectedCount: 3}
+var reqMeta = []tarantool.ColumnMetaData{{FieldIsNullable: false}}
+
+type getTypedMock struct {
+	baseRequestMock
+}
+
+func (m *getTypedMock) GetTyped(space, index, key interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorGetTyped(t *testing.T) {
+	m := &getTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.GetTyped(reqSpace, reqIndex, reqKey, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type selectMock struct {
+	baseRequestMock
+}
+
+func (m *selectMock) Select(space, index interface{},
+	offset, limit, iterator uint32, key interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.index = index
+	m.offset = offset
+	m.limit = limit
+	m.iterator = iterator
+	m.key = key
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorSelect(t *testing.T) {
+	m := &selectMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Select(reqSpace, reqIndex, reqOffset, reqLimit, reqIterator, reqKey)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed")
+	require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed")
+	require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type selectTypedMock struct {
+	baseRequestMock
+}
+
+func (m *selectTypedMock) SelectTyped(space, index interface{},
+	offset, limit, iterator uint32, key interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.index = index
+	m.offset = offset
+	m.limit = limit
+	m.iterator = iterator
+	m.key = key
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorSelectTyped(t *testing.T) {
+	m := &selectTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.SelectTyped(reqSpace, reqIndex, reqOffset, reqLimit,
+		reqIterator, reqKey, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed")
+	require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed")
+	require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type selectAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *selectAsyncMock) SelectAsync(space, index interface{},
+	offset, limit, iterator uint32, key interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.index = index
+	m.offset = offset
+	m.limit = limit
+	m.iterator = iterator
+	m.key = key
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorSelectAsync(t *testing.T) {
+	m := &selectAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.SelectAsync(reqSpace, reqIndex, reqOffset, reqLimit,
+		reqIterator, reqKey)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed")
+	require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed")
+	require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type insertMock struct {
+	baseRequestMock
+}
+
+func (m *insertMock) Insert(space, tuple interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorInsert(t *testing.T) {
+	m := &insertMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Insert(reqSpace, reqTuple)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type insertTypedMock struct {
+	baseRequestMock
+}
+
+func (m *insertTypedMock) InsertTyped(space, tuple interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorInsertTyped(t *testing.T) {
+	m := &insertTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.InsertTyped(reqSpace, reqTuple, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type insertAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *insertAsyncMock) InsertAsync(space, tuple interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorInsertAsync(t *testing.T) {
+	m := &insertAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.InsertAsync(reqSpace, reqTuple)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type replaceMock struct {
+	baseRequestMock
+}
+
+func (m *replaceMock) Replace(space, tuple interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorReplace(t *testing.T) {
+	m := &replaceMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Replace(reqSpace, reqTuple)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type replaceTypedMock struct {
+	baseRequestMock
+}
+
+func (m *replaceTypedMock) ReplaceTyped(space, tuple interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorReplaceTyped(t *testing.T) {
+	m := &replaceTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.ReplaceTyped(reqSpace, reqTuple, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type replaceAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *replaceAsyncMock) ReplaceAsync(space, tuple interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorReplaceAsync(t *testing.T) {
+	m := &replaceAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.ReplaceAsync(reqSpace, reqTuple)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type deleteMock struct {
+	baseRequestMock
+}
+
+func (m *deleteMock) Delete(space, index, key interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorDelete(t *testing.T) {
+	m := &deleteMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Delete(reqSpace, reqIndex, reqKey)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type deleteTypedMock struct {
+	baseRequestMock
+}
+
+func (m *deleteTypedMock) DeleteTyped(space, index, key interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorDeleteTyped(t *testing.T) {
+	m := &deleteTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.DeleteTyped(reqSpace, reqIndex, reqKey, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type deleteAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *deleteAsyncMock) DeleteAsync(space, index, key interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorDeleteAsync(t *testing.T) {
+	m := &deleteAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.DeleteAsync(reqSpace, reqIndex, reqKey)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type updateMock struct {
+	baseRequestMock
+}
+
+func (m *updateMock) Update(space, index, key, ops interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.ops = ops
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorUpdate(t *testing.T) {
+	m := &updateMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Update(reqSpace, reqIndex, reqKey, reqOps)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqOps, m.ops, "unexpected ops was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type updateTypedMock struct {
+	baseRequestMock
+}
+
+func (m *updateTypedMock) UpdateTyped(space, index, key, ops interface{},
+	result interface{}, mode ...Mode) error {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.ops = ops
+	m.result = result
+	m.mode = mode[0]
+	return reqErr
+}
+
+func TestConnectorUpdateTyped(t *testing.T) {
+	m := &updateTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.UpdateTyped(reqSpace, reqIndex, reqKey, reqOps, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqOps, m.ops, "unexpected ops was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type updateAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *updateAsyncMock) UpdateAsync(space, index, key, ops interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.index = index
+	m.key = key
+	m.ops = ops
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorUpdateAsync(t *testing.T) {
+	m := &updateAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.UpdateAsync(reqSpace, reqIndex, reqKey, reqOps)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqIndex, m.index, "unexpected index was passed")
+	require.Equalf(t, reqKey, m.key, "unexpected key was passed")
+	require.Equalf(t, reqOps, m.ops, "unexpected ops was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type upsertMock struct {
+	baseRequestMock
+}
+
+func (m *upsertMock) Upsert(space, tuple, ops interface{},
+	mode ...Mode) (*tarantool.Response, error) {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.ops = ops
+	m.mode = mode[0]
+	return reqResp, reqErr
+}
+
+func TestConnectorUpsert(t *testing.T) {
+	m := &upsertMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Upsert(reqSpace, reqTuple, reqOps)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, reqOps, m.ops, "unexpected ops was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type upsertAsyncMock struct {
+	baseRequestMock
+}
+
+func (m *upsertAsyncMock) UpsertAsync(space, tuple, ops interface{},
+	mode ...Mode) *tarantool.Future {
+	m.called++
+	m.space = space
+	m.tuple = tuple
+	m.ops = ops
+	m.mode = mode[0]
+	return reqFuture
+}
+
+func TestConnectorUpsertAsync(t *testing.T) {
+	m := &upsertAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.UpsertAsync(reqSpace, reqTuple, reqOps)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqSpace, m.space, "unexpected space was passed")
+	require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed")
+	require.Equalf(t, reqOps, m.ops, "unexpected ops was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type baseCallMock struct {
+	baseRequestMock
+}
+
+func (m *baseCallMock) call(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	m.called++
+	m.functionName = functionName
+	m.args = args
+	m.mode = mode
+	return reqResp, reqErr
+}
+
+func (m *baseCallMock) callTyped(functionName string, args interface{},
+	result interface{}, mode Mode) error {
+	m.called++
+	m.functionName = functionName
+	m.args = args
+	m.result = result
+	m.mode = mode
+	return reqErr
+}
+
+func (m *baseCallMock) callAsync(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	m.called++
+	m.functionName = functionName
+	m.args = args
+	m.mode = mode
+	return reqFuture
+}
+
+type callMock struct {
+	baseCallMock
+}
+
+func (m *callMock) Call(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	return m.call(functionName, args, mode)
+}
+
+func TestConnectorCall(t *testing.T) {
+	m := &callMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Call(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type callTypedMock struct {
+	baseCallMock
+}
+
+func (m *callTypedMock) CallTyped(functionName string, args interface{},
+	result interface{}, mode Mode) error {
+	return m.callTyped(functionName, args, result, mode)
+}
+
+func TestConnectorCallTyped(t *testing.T) {
+	m := &callTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.CallTyped(reqFunctionName, reqArgs, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type callAsyncMock struct {
+	baseCallMock
+}
+
+func (m *callAsyncMock) CallAsync(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	return m.callAsync(functionName, args, mode)
+}
+
+func TestConnectorCallAsync(t *testing.T) {
+	m := &callAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.CallAsync(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call16Mock struct {
+	baseCallMock
+}
+
+func (m *call16Mock) Call16(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	return m.call(functionName, args, mode)
+}
+
+func TestConnectorCall16(t *testing.T) {
+	m := &call16Mock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Call16(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call16TypedMock struct {
+	baseCallMock
+}
+
+func (m *call16TypedMock) Call16Typed(functionName string, args interface{},
+	result interface{}, mode Mode) error {
+	return m.callTyped(functionName, args, result, mode)
+}
+
+func TestConnectorCall16Typed(t *testing.T) {
+	m := &call16TypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.Call16Typed(reqFunctionName, reqArgs, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call16AsyncMock struct {
+	baseCallMock
+}
+
+func (m *call16AsyncMock) Call16Async(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	return m.callAsync(functionName, args, mode)
+}
+
+func TestConnectorCall16Async(t *testing.T) {
+	m := &call16AsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.Call16Async(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call17Mock struct {
+	baseCallMock
+}
+
+func (m *call17Mock) Call17(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	return m.call(functionName, args, mode)
+}
+
+func TestConnectorCall17(t *testing.T) {
+	m := &call17Mock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Call17(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call17TypedMock struct {
+	baseCallMock
+}
+
+func (m *call17TypedMock) Call17Typed(functionName string, args interface{},
+	result interface{}, mode Mode) error {
+	return m.callTyped(functionName, args, result, mode)
+}
+
+func TestConnectorCall17Typed(t *testing.T) {
+	m := &call17TypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.Call17Typed(reqFunctionName, reqArgs, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type call17AsyncMock struct {
+	baseCallMock
+}
+
+func (m *call17AsyncMock) Call17Async(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	return m.callAsync(functionName, args, mode)
+}
+
+func TestConnectorCall17Async(t *testing.T) {
+	m := &call17AsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.Call17Async(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected functionName was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type evalMock struct {
+	baseCallMock
+}
+
+func (m *evalMock) Eval(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	return m.call(functionName, args, mode)
+}
+
+func TestConnectorEval(t *testing.T) {
+	m := &evalMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Eval(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type evalTypedMock struct {
+	baseCallMock
+}
+
+func (m *evalTypedMock) EvalTyped(functionName string, args interface{},
+	result interface{}, mode Mode) error {
+	return m.callTyped(functionName, args, result, mode)
+}
+
+func TestConnectorEvalTyped(t *testing.T) {
+	m := &evalTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	err := c.EvalTyped(reqFunctionName, reqArgs, reqResult)
+
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type evalAsyncMock struct {
+	baseCallMock
+}
+
+func (m *evalAsyncMock) EvalAsync(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	return m.callAsync(functionName, args, mode)
+}
+
+func TestConnectorEvalAsync(t *testing.T) {
+	m := &evalAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.EvalAsync(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type executeMock struct {
+	baseCallMock
+}
+
+func (m *executeMock) Execute(functionName string, args interface{},
+	mode Mode) (*tarantool.Response, error) {
+	return m.call(functionName, args, mode)
+}
+
+func TestConnectorExecute(t *testing.T) {
+	m := &executeMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	resp, err := c.Execute(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqResp, resp, "unexpected response")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type executeTypedMock struct {
+	baseCallMock
+}
+
+func (m *executeTypedMock) ExecuteTyped(functionName string, args, result interface{},
+	mode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
+	m.callTyped(functionName, args, result, mode)
+	return reqSqlInfo, reqMeta, reqErr
+}
+
+func TestConnectorExecuteTyped(t *testing.T) {
+	m := &executeTypedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	info, meta, err := c.ExecuteTyped(reqFunctionName, reqArgs, reqResult)
+
+	require.Equalf(t, reqSqlInfo, info, "unexpected info")
+	require.Equalf(t, reqMeta, meta, "unexpected meta")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, reqResult, m.result, "unexpected result was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+type executeAsyncMock struct {
+	baseCallMock
+}
+
+func (m *executeAsyncMock) ExecuteAsync(functionName string, args interface{},
+	mode Mode) *tarantool.Future {
+	return m.callAsync(functionName, args, mode)
+}
+
+func TestConnectorExecuteAsync(t *testing.T) {
+	m := &executeAsyncMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.ExecuteAsync(reqFunctionName, reqArgs)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.functionName,
+		"unexpected expr was passed")
+	require.Equalf(t, reqArgs, m.args, "unexpected args was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+var reqPrepared *tarantool.Prepared = &tarantool.Prepared{}
+
+type newPreparedMock struct {
+	Pooler
+	called int
+	expr   string
+	mode   Mode
+}
+
+func (m *newPreparedMock) NewPrepared(expr string,
+	mode Mode) (*tarantool.Prepared, error) {
+	m.called++
+	m.expr = expr
+	m.mode = mode
+	return reqPrepared, reqErr
+}
+
+func TestConnectorNewPrepared(t *testing.T) {
+	m := &newPreparedMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	p, err := c.NewPrepared(reqFunctionName)
+
+	require.Equalf(t, reqPrepared, p, "unexpected prepared")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqFunctionName, m.expr,
+		"unexpected expr was passed")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+var reqStream *tarantool.Stream = &tarantool.Stream{}
+
+type newStreamMock struct {
+	Pooler
+	called int
+	mode   Mode
+}
+
+func (m *newStreamMock) NewStream(mode Mode) (*tarantool.Stream, error) {
+	m.called++
+	m.mode = mode
+	return reqStream, reqErr
+}
+
+func TestConnectorNewStream(t *testing.T) {
+	m := &newStreamMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	s, err := c.NewStream()
+
+	require.Equalf(t, reqStream, s, "unexpected stream")
+	require.Equalf(t, reqErr, err, "unexpected error")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
+
+var reqRequest tarantool.Request = tarantool.NewPingRequest()
+
+type doMock struct {
+	Pooler
+	called int
+	req    tarantool.Request
+	mode   Mode
+}
+
+func (m *doMock) Do(req tarantool.Request, mode Mode) *tarantool.Future {
+	m.called++
+	m.req = req
+	m.mode = mode
+	return reqFuture
+}
+
+func TestConnectorDo(t *testing.T) {
+	m := &doMock{}
+	c := NewConnectorAdapter(m, testMode)
+
+	fut := c.Do(reqRequest)
+
+	require.Equalf(t, reqFuture, fut, "unexpected future")
+	require.Equalf(t, 1, m.called, "should be called only once")
+	require.Equalf(t, reqRequest, m.req, "unexpected request")
+	require.Equalf(t, testMode, m.mode, "unexpected proxy mode")
+}
diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go
index 9328c616a..9a486924a 100644
--- a/connection_pool/example_test.go
+++ b/connection_pool/example_test.go
@@ -834,3 +834,24 @@ func ExampleBeginRequest_TxnIsolation() {
 	}
 	fmt.Printf("Select after Rollback: response is %#v\n", resp.Data)
 }
+
+func ExampleConnectorAdapter() {
+	pool, err := examplePool(testRoles)
+	if err != nil {
+		fmt.Println(err)
+	}
+	defer pool.Close()
+
+	adapter := connection_pool.NewConnectorAdapter(pool, connection_pool.RW)
+	var connector tarantool.Connector = adapter
+
+	// Ping an RW instance to check connection.
+	resp, err := connector.Ping()
+	fmt.Println("Ping Code", resp.Code)
+	fmt.Println("Ping Data", resp.Data)
+	fmt.Println("Ping Error", err)
+	// Output:
+	// Ping Code 0
+	// Ping Data []
+	// Ping Error <nil>
+}
diff --git a/connection_pool/pooler.go b/connection_pool/pooler.go
new file mode 100644
index 000000000..a9dbe09f9
--- /dev/null
+++ b/connection_pool/pooler.go
@@ -0,0 +1,89 @@
+package connection_pool
+
+import (
+	"time"
+
+	"github.com/tarantool/go-tarantool"
+)
+
+// Pooler is the interface that must be implemented by a connection pool.
+type Pooler interface {
+	ConnectedNow(mode Mode) (bool, error)
+	Close() []error
+	Ping(mode Mode) (*tarantool.Response, error)
+	ConfiguredTimeout(mode Mode) (time.Duration, error)
+
+	Select(space, index interface{}, offset, limit, iterator uint32,
+		key interface{}, mode ...Mode) (*tarantool.Response, error)
+	Insert(space interface{}, tuple interface{},
+		mode ...Mode) (*tarantool.Response, error)
+	Replace(space interface{}, tuple interface{},
+		mode ...Mode) (*tarantool.Response, error)
+	Delete(space, index interface{}, key interface{},
+		mode ...Mode) (*tarantool.Response, error)
+	Update(space, index interface{}, key, ops interface{},
+		mode ...Mode) (*tarantool.Response, error)
+	Upsert(space interface{}, tuple, ops interface{},
+		mode ...Mode) (*tarantool.Response, error)
+	Call(functionName string, args interface{},
+		mode Mode) (*tarantool.Response, error)
+	Call16(functionName string, args interface{},
+		mode Mode) (*tarantool.Response, error)
+	Call17(functionName string, args interface{},
+		mode Mode) (*tarantool.Response, error)
+	Eval(expr string, args interface{},
+		mode Mode) (*tarantool.Response, error)
+	Execute(expr string, args interface{},
+		mode Mode) (*tarantool.Response, error)
+
+	GetTyped(space, index interface{}, key interface{}, result interface{},
+		mode ...Mode) error
+	SelectTyped(space, index interface{}, offset, limit, iterator uint32,
+		key interface{}, result interface{}, mode ...Mode) error
+	InsertTyped(space interface{}, tuple interface{}, result interface{},
+		mode ...Mode) error
+	ReplaceTyped(space interface{}, tuple interface{}, result interface{},
+		mode ...Mode) error
+	DeleteTyped(space, index interface{}, key interface{}, result interface{},
+		mode ...Mode) error
+	UpdateTyped(space, index interface{}, key, ops interface{},
+		result interface{}, mode ...Mode) error
+	CallTyped(functionName string, args interface{}, result interface{},
+		mode Mode) error
+	Call16Typed(functionName string, args interface{}, result interface{},
+		mode Mode) error
+	Call17Typed(functionName string, args interface{}, result interface{},
+		mode Mode) error
+	EvalTyped(expr string, args interface{}, result interface{},
+		mode Mode) error
+	ExecuteTyped(expr string, args interface{}, result interface{},
+		mode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error)
+
+	SelectAsync(space, index interface{}, offset, limit, iterator uint32,
+		key interface{}, mode ...Mode) *tarantool.Future
+	InsertAsync(space interface{}, tuple interface{},
+		mode ...Mode) *tarantool.Future
+	ReplaceAsync(space interface{}, tuple interface{},
+		mode ...Mode) *tarantool.Future
+	DeleteAsync(space, index interface{}, key interface{},
+		mode ...Mode) *tarantool.Future
+	UpdateAsync(space, index interface{}, key, ops interface{},
+		mode ...Mode) *tarantool.Future
+	UpsertAsync(space interface{}, tuple interface{}, ops interface{},
+		mode ...Mode) *tarantool.Future
+	CallAsync(functionName string, args interface{},
+		mode Mode) *tarantool.Future
+	Call16Async(functionName string, args interface{},
+		mode Mode) *tarantool.Future
+	Call17Async(functionName string, args interface{},
+		mode Mode) *tarantool.Future
+	EvalAsync(expr string, args interface{},
+		mode Mode) *tarantool.Future
+	ExecuteAsync(expr string, args interface{},
+		mode Mode) *tarantool.Future
+
+	NewPrepared(expr string, mode Mode) (*tarantool.Prepared, error)
+	NewStream(mode Mode) (*tarantool.Stream, error)
+
+	Do(req tarantool.Request, mode Mode) (fut *tarantool.Future)
+}

From 12bc5e2bc6ed63d8fd422cb22e7fe13105eefcc4 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec <oleg.jukovec@tarantool.org>
Date: Mon, 12 Sep 2022 16:03:08 +0300
Subject: [PATCH 3/4] queue: bump package version to 1.2.1

It fixes queue.cfg({in_replicaset = true}) for Tarantool 1.10 [1].

1. https://github.com/tarantool/queue/issues/185

Part of #176
---
 CHANGELOG.md | 2 ++
 Makefile     | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9e0303af8..fe991b84b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 
 ### Changed
 
+- Bump queue package version to 1.2.1 (#176)
+
 ### Fixed
 
 - Mode type description in the connection_pool subpackage (#208)
diff --git a/Makefile b/Makefile
index efe0d668c..59961774b 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,7 @@ clean:
 
 .PHONY: deps
 deps: clean
-	( cd ./queue; tarantoolctl rocks install queue 1.2.0 )
+	( cd ./queue; tarantoolctl rocks install queue 1.2.1 )
 
 .PHONY: datetime-timezones
 datetime-timezones:

From 842866f01003a7cfe6be109a73cee8cc47b8053f Mon Sep 17 00:00:00 2001
From: Oleg Jukovec <oleg.jukovec@tarantool.org>
Date: Thu, 25 Aug 2022 13:49:13 +0300
Subject: [PATCH 4/4] queue: add an example with connection_pool

The example demonstrates how to use the queue package with
the connection_pool package.

Closes #176
---
 CHANGELOG.md                            |   1 +
 connection_pool/connection_pool_test.go |   2 +-
 multi/multi_test.go                     |   2 +-
 queue/example_connection_pool_test.go   | 206 ++++++++++++++++++++++++
 queue/queue_test.go                     |  39 ++++-
 queue/{ => testdata}/config.lua         |  14 +-
 queue/testdata/pool.lua                 |  56 +++++++
 test_helpers/main.go                    |   7 +-
 8 files changed, 311 insertions(+), 16 deletions(-)
 create mode 100644 queue/example_connection_pool_test.go
 rename queue/{ => testdata}/config.lua (81%)
 create mode 100644 queue/testdata/pool.lua

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe991b84b..f69e97128 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
   ConnectionPool (#178)
 - Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
 - ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
+- An example how to use queue and connection_pool subpackages together (#176)
 
 ### Changed
 
diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go
index b3aee615d..091d5216c 100644
--- a/connection_pool/connection_pool_test.go
+++ b/connection_pool/connection_pool_test.go
@@ -2056,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) {
 func runTestMain(m *testing.M) int {
 	initScript := "config.lua"
 	waitStart := 100 * time.Millisecond
-	var connectRetry uint = 3
+	connectRetry := 3
 	retryTimeout := 500 * time.Millisecond
 	workDirs := []string{
 		"work_dir1", "work_dir2",
diff --git a/multi/multi_test.go b/multi/multi_test.go
index 643ea186d..2d43bb179 100644
--- a/multi/multi_test.go
+++ b/multi/multi_test.go
@@ -556,7 +556,7 @@ func TestStream_Rollback(t *testing.T) {
 func runTestMain(m *testing.M) int {
 	initScript := "config.lua"
 	waitStart := 100 * time.Millisecond
-	var connectRetry uint = 3
+	connectRetry := 3
 	retryTimeout := 500 * time.Millisecond
 
 	// Tarantool supports streams and interactive transactions since version 2.10.0
diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go
new file mode 100644
index 000000000..59f5020ea
--- /dev/null
+++ b/queue/example_connection_pool_test.go
@@ -0,0 +1,206 @@
+package queue_test
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/tarantool/go-tarantool"
+	"github.com/tarantool/go-tarantool/connection_pool"
+	"github.com/tarantool/go-tarantool/queue"
+	"github.com/tarantool/go-tarantool/test_helpers"
+)
+
+// QueueConnectionHandler handles new connections in a ConnectionPool.
+type QueueConnectionHandler struct {
+	name string
+	cfg  queue.Cfg
+
+	uuid          uuid.UUID
+	registered    bool
+	err           error
+	mutex         sync.Mutex
+	masterUpdated chan struct{}
+}
+
+// QueueConnectionHandler implements the ConnectionHandler interface.
+var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
+
+// NewQueueConnectionHandler creates a QueueConnectionHandler object.
+func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
+	return &QueueConnectionHandler{
+		name:          name,
+		cfg:           cfg,
+		masterUpdated: make(chan struct{}, 10),
+	}
+}
+
+// Discovered configures a queue for an instance and identifies a shared queue
+// session on master instances.
+//
+// NOTE: the Queue supports only a master-replica cluster configuration. It
+// does not support a master-master configuration.
+func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
+	role connection_pool.Role) error {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	if h.err != nil {
+		return h.err
+	}
+
+	master := role == connection_pool.MasterRole
+	if master {
+		defer func() {
+			h.masterUpdated <- struct{}{}
+		}()
+	}
+
+	// Set up a queue module configuration for an instance.
+	q := queue.New(conn, h.name)
+	opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second}
+
+	if h.err = q.Cfg(opts); h.err != nil {
+		return fmt.Errorf("unable to configure queue: %w", h.err)
+	}
+
+	// The queue only works with a master instance.
+	if !master {
+		return nil
+	}
+
+	if h.err = q.Create(h.cfg); h.err != nil {
+		return h.err
+	}
+
+	if !h.registered {
+		// We register a shared session at the first time.
+		if h.uuid, h.err = q.Identify(nil); h.err != nil {
+			return h.err
+		}
+		h.registered = true
+	} else {
+		// We re-identify as the shared session.
+		if _, h.err = q.Identify(&h.uuid); h.err != nil {
+			return h.err
+		}
+	}
+
+	fmt.Printf("Master %s is ready to work!\n", conn.Addr())
+
+	return nil
+}
+
+// Deactivated doesn't do anything useful for the example.
+func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
+	role connection_pool.Role) error {
+	return nil
+}
+
+// Closes closes a QueueConnectionHandler object.
+func (h *QueueConnectionHandler) Close() {
+	close(h.masterUpdated)
+}
+
+// Example demonstrates how to use the queue package with the connection_pool
+// package. First of all, you need to create a ConnectionHandler implementation
+// for the a ConnectionPool object to process new connections from
+// RW-instances.
+//
+// You need to register a shared session UUID at a first master connection.
+// It needs to be used to re-identify as the shared session on new
+// RW-instances. See QueueConnectionHandler.Discovered() implementation.
+//
+// After that, you need to create a ConnectorAdapter object with RW mode for
+// the ConnectionPool to send requests into RW-instances. This adapter can
+// be used to create a ready-to-work queue object.
+func Example_connectionPool() {
+	// Create a ConnectionHandler object.
+	cfg := queue.Cfg{
+		Temporary:   false,
+		IfNotExists: true,
+		Kind:        queue.FIFO,
+		Opts: queue.Opts{
+			Ttl: 10 * time.Second,
+		},
+	}
+	h := NewQueueConnectionHandler("test_queue", cfg)
+	defer h.Close()
+
+	// Create a ConnectionPool object.
+	servers := []string{
+		"127.0.0.1:3014",
+		"127.0.0.1:3015",
+	}
+	connOpts := tarantool.Opts{
+		Timeout: 1 * time.Second,
+		User:    "test",
+		Pass:    "test",
+	}
+	poolOpts := connection_pool.OptsPool{
+		CheckTimeout:      1 * time.Second,
+		ConnectionHandler: h,
+	}
+	connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts)
+	if err != nil {
+		fmt.Printf("Unable to connect to the pool: %s", err)
+		return
+	}
+	defer connPool.Close()
+
+	// Wait for a master instance identification in the queue.
+	<-h.masterUpdated
+	if h.err != nil {
+		fmt.Printf("Unable to identify in the pool: %s", h.err)
+		return
+	}
+
+	// Create a Queue object from the ConnectionPool object via
+	// a ConnectorAdapter.
+	rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW)
+	q := queue.New(rw, "test_queue")
+	fmt.Println("A Queue object is ready to work.")
+
+	testData := "test_data"
+	fmt.Println("Send data:", testData)
+	if _, err = q.Put(testData); err != nil {
+		fmt.Printf("Unable to put data into the queue: %s", err)
+		return
+	}
+
+	// Switch a master instance in the pool.
+	roles := []bool{true, false}
+	err = test_helpers.SetClusterRO(servers, connOpts, roles)
+	if err != nil {
+		fmt.Printf("Unable to set cluster roles: %s", err)
+		return
+	}
+
+	// Wait for a new master instance re-identification.
+	<-h.masterUpdated
+	if h.err != nil {
+		fmt.Printf("Unable to re-identify in the pool: %s", h.err)
+		return
+	}
+
+	// Take a data from the new master instance.
+	task, err := q.TakeTimeout(1 * time.Second)
+	if err != nil {
+		fmt.Println("Unable to got task:", err)
+	} else if task == nil {
+		fmt.Println("task == nil")
+	} else if task.Data() == nil {
+		fmt.Println("task.Data() == nil")
+	} else {
+		task.Ack()
+		fmt.Println("Got data:", task.Data())
+	}
+
+	// Output:
+	// Master 127.0.0.1:3014 is ready to work!
+	// A Queue object is ready to work.
+	// Send data: test_data
+	// Master 127.0.0.1:3015 is ready to work!
+	// Got data: test_data
+}
diff --git a/queue/queue_test.go b/queue/queue_test.go
index 7b19c0dd3..85276e4a8 100644
--- a/queue/queue_test.go
+++ b/queue/queue_test.go
@@ -14,6 +14,13 @@ import (
 )
 
 var server = "127.0.0.1:3013"
+var serversPool = []string{
+	"127.0.0.1:3014",
+	"127.0.0.1:3015",
+}
+
+var instances []test_helpers.TarantoolInstance
+
 var opts = Opts{
 	Timeout: 2500 * time.Millisecond,
 	User:    "test",
@@ -890,7 +897,7 @@ func TestTask_Touch(t *testing.T) {
 // https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
 func runTestMain(m *testing.M) int {
 	inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{
-		InitScript:   "config.lua",
+		InitScript:   "testdata/config.lua",
 		Listen:       server,
 		WorkDir:      "work_dir",
 		User:         opts.User,
@@ -899,12 +906,40 @@ func runTestMain(m *testing.M) int {
 		ConnectRetry: 3,
 		RetryTimeout: 500 * time.Millisecond,
 	})
-	defer test_helpers.StopTarantoolWithCleanup(inst)
 
 	if err != nil {
 		log.Fatalf("Failed to prepare test tarantool: %s", err)
 	}
 
+	defer test_helpers.StopTarantoolWithCleanup(inst)
+
+	workDirs := []string{"work_dir1", "work_dir2"}
+	poolOpts := test_helpers.StartOpts{
+		InitScript:   "testdata/pool.lua",
+		User:         opts.User,
+		Pass:         opts.Pass,
+		WaitStart:    3 * time.Second, // replication_timeout * 3
+		ConnectRetry: -1,
+	}
+	instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts)
+
+	if err != nil {
+		log.Fatalf("Failed to prepare test tarantool pool: %s", err)
+	}
+
+	defer test_helpers.StopTarantoolInstances(instances)
+
+	roles := []bool{false, true}
+	connOpts := Opts{
+		Timeout: 500 * time.Millisecond,
+		User:    "test",
+		Pass:    "test",
+	}
+	err = test_helpers.SetClusterRO(serversPool, connOpts, roles)
+
+	if err != nil {
+		log.Fatalf("Failed to set roles in tarantool pool: %s", err)
+	}
 	return m.Run()
 }
 
diff --git a/queue/config.lua b/queue/testdata/config.lua
similarity index 81%
rename from queue/config.lua
rename to queue/testdata/config.lua
index df28496a3..eccb19a68 100644
--- a/queue/config.lua
+++ b/queue/testdata/config.lua
@@ -7,7 +7,7 @@ box.cfg{
     work_dir = os.getenv("TEST_TNT_WORK_DIR"),
 }
 
-    box.once("init", function()
+box.once("init", function()
     box.schema.user.create('test', {password = 'test'})
     box.schema.func.create('queue.tube.test_queue:touch')
     box.schema.func.create('queue.tube.test_queue:ack')
@@ -23,21 +23,15 @@ box.cfg{
     box.schema.func.create('queue.identify')
     box.schema.func.create('queue.state')
     box.schema.func.create('queue.statistics')
-    box.schema.user.grant('test', 'create', 'space')
-    box.schema.user.grant('test', 'write', 'space', '_schema')
-    box.schema.user.grant('test', 'write', 'space', '_space')
-    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
-    box.schema.user.grant('test', 'write', 'space', '_index')
+    box.schema.user.grant('test', 'create,read,write,drop', 'space')
     box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
     box.schema.user.grant('test', 'execute', 'universe')
     box.schema.user.grant('test', 'read,write', 'space', '_queue')
     box.schema.user.grant('test', 'read,write', 'space', '_schema')
+    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
     box.schema.user.grant('test', 'read,write', 'space', '_space')
     box.schema.user.grant('test', 'read,write', 'space', '_index')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
     box.schema.user.grant('test', 'read,write', 'space', '_priv')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
-    box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
     if box.space._trigger ~= nil then
         box.schema.user.grant('test', 'read', 'space', '_trigger')
     end
@@ -56,3 +50,5 @@ end)
 box.cfg{
     listen = os.getenv("TEST_TNT_LISTEN"),
 }
+
+require('console').start()
diff --git a/queue/testdata/pool.lua b/queue/testdata/pool.lua
new file mode 100644
index 000000000..7c63aa787
--- /dev/null
+++ b/queue/testdata/pool.lua
@@ -0,0 +1,56 @@
+local queue = require('queue')
+rawset(_G, 'queue', queue)
+
+local listen = os.getenv("TEST_TNT_LISTEN")
+box.cfg{
+    work_dir = os.getenv("TEST_TNT_WORK_DIR"),
+    listen = listen,
+    replication = {
+        "test:test@127.0.0.1:3014",
+        "test:test@127.0.0.1:3015",
+    },
+    read_only = listen == "127.0.0.1:3015"
+}
+
+box.once("schema", function()
+    box.schema.user.create('test', {password = 'test'})
+    box.schema.user.grant('test', 'replication')
+
+    box.schema.func.create('queue.tube.test_queue:touch')
+    box.schema.func.create('queue.tube.test_queue:ack')
+    box.schema.func.create('queue.tube.test_queue:put')
+    box.schema.func.create('queue.tube.test_queue:drop')
+    box.schema.func.create('queue.tube.test_queue:peek')
+    box.schema.func.create('queue.tube.test_queue:kick')
+    box.schema.func.create('queue.tube.test_queue:take')
+    box.schema.func.create('queue.tube.test_queue:delete')
+    box.schema.func.create('queue.tube.test_queue:release')
+    box.schema.func.create('queue.tube.test_queue:release_all')
+    box.schema.func.create('queue.tube.test_queue:bury')
+    box.schema.func.create('queue.identify')
+    box.schema.func.create('queue.state')
+    box.schema.func.create('queue.statistics')
+    box.schema.user.grant('test', 'create,read,write,drop', 'space')
+    box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids')
+    box.schema.user.grant('test', 'execute', 'universe')
+    box.schema.user.grant('test', 'read,write', 'space', '_queue')
+    box.schema.user.grant('test', 'read,write', 'space', '_schema')
+    box.schema.user.grant('test', 'read,write', 'space', '_space_sequence')
+    box.schema.user.grant('test', 'read,write', 'space', '_space')
+    box.schema.user.grant('test', 'read,write', 'space', '_index')
+    box.schema.user.grant('test', 'read,write', 'space', '_priv')
+    if box.space._trigger ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_trigger')
+    end
+    if box.space._fk_constraint ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_fk_constraint')
+    end
+    if box.space._ck_constraint ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_ck_constraint')
+    end
+    if box.space._func_index ~= nil then
+        box.schema.user.grant('test', 'read', 'space', '_func_index')
+    end
+end)
+
+require('console').start()
diff --git a/test_helpers/main.go b/test_helpers/main.go
index cdc3c343d..77e22c535 100644
--- a/test_helpers/main.go
+++ b/test_helpers/main.go
@@ -67,8 +67,9 @@ type StartOpts struct {
 	// WaitStart is a time to wait before starting to ping tarantool.
 	WaitStart time.Duration
 
-	// ConnectRetry is a count of attempts to ping tarantool.
-	ConnectRetry uint
+	// ConnectRetry is a count of retry attempts to ping tarantool. If the
+	// value < 0 then there will be no ping tarantool at all.
+	ConnectRetry int
 
 	// RetryTimeout is a time between tarantool ping retries.
 	RetryTimeout time.Duration
@@ -240,7 +241,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) {
 		Ssl:        startOpts.ClientSsl,
 	}
 
-	var i uint
+	var i int
 	var server string
 	if startOpts.ClientServer != "" {
 		server = startOpts.ClientServer