diff --git a/CHANGELOG.md b/CHANGELOG.md index 333c09554..f69e97128 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,9 +13,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support queue 1.2.0 (#177) - ConnectionHandler interface for handling changes of connections in ConnectionPool (#178) +- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176) +- ConnectorAdapter type to use ConnectionPool as Connector interface (#176) +- An example how to use queue and connection_pool subpackages together (#176) ### Changed +- Bump queue package version to 1.2.1 (#176) + ### Fixed - Mode type description in the connection_pool subpackage (#208) diff --git a/Makefile b/Makefile index efe0d668c..59961774b 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ clean: .PHONY: deps deps: clean - ( cd ./queue; tarantoolctl rocks install queue 1.2.0 ) + ( cd ./queue; tarantoolctl rocks install queue 1.2.1 ) .PHONY: datetime-timezones datetime-timezones: diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 79667ae7c..6597e2dd0 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -99,6 +99,8 @@ type ConnectionPool struct { poolsMutex sync.RWMutex } +var _ Pooler = (*ConnectionPool)(nil) + type connState struct { addr string notify chan tarantool.ConnEvent @@ -385,6 +387,16 @@ func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mod return conn.Eval(expr, args) } +// Execute passes sql expression to Tarantool for execution. +func (connPool *ConnectionPool) Execute(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Execute(expr, args) +} + // GetTyped performs select (with limit = 1 and offset = 0) // to box space and fills typed result. func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) { @@ -495,6 +507,16 @@ func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result return conn.EvalTyped(expr, args, result) } +// ExecuteTyped passes sql expression to Tarantool for execution. +func (connPool *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{}, userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return tarantool.SQLInfo{}, nil, err + } + + return conn.ExecuteTyped(expr, args, result) +} + // SelectAsync sends select request to Tarantool and returns Future. func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future { conn, err := connPool.getConnByMode(ANY, userMode) @@ -607,6 +629,17 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod return conn.EvalAsync(expr, args) } +// ExecuteAsync sends sql expression to Tarantool for execution and returns +// Future. +func (connPool *ConnectionPool) ExecuteAsync(expr string, args interface{}, userMode Mode) *tarantool.Future { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return newErrorFuture(err) + } + + return conn.ExecuteAsync(expr, args) +} + // Do sends the request and returns a future. // For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared) // the argument of type Mode is unused. diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index a41495d8d..091d5216c 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -642,6 +642,67 @@ func TestEval(t *testing.T) { require.Falsef(t, val, "expected `false` with mode `RW`") } +type Member struct { + id uint + val string +} + +func (m *Member) DecodeMsgpack(d *decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 2 { + return fmt.Errorf("array len doesn't match: %d", l) + } + if m.id, err = d.DecodeUint(); err != nil { + return err + } + if m.val, err = d.DecodeString(); err != nil { + return err + } + return nil +} + +func TestExecute(t *testing.T) { + test_helpers.SkipIfSQLUnsupported(t) + + roles := []bool{false, true, false, false, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + request := "SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0 == 1;" + // Execute + resp, err := connPool.Execute(request, []interface{}{}, connection_pool.ANY) + require.Nilf(t, err, "failed to Execute") + require.NotNilf(t, resp, "response is nil after Execute") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Execute") + require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response") + + // ExecuteTyped + mem := []Member{} + info, _, err := connPool.ExecuteTyped(request, []interface{}{}, &mem, connection_pool.ANY) + require.Nilf(t, err, "failed to ExecuteTyped") + require.Equalf(t, info.AffectedCount, uint64(0), "unexpected info.AffectedCount") + require.Equalf(t, len(mem), 1, "wrong count of results") + + // ExecuteAsync + fut := connPool.ExecuteAsync(request, []interface{}{}, connection_pool.ANY) + resp, err = fut.Get() + require.Nilf(t, err, "failed to ExecuteAsync") + require.NotNilf(t, resp, "response is nil after ExecuteAsync") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after ExecuteAsync") + require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response") +} + func TestRoundRobinStrategy(t *testing.T) { roles := []bool{false, true, false, false, true} @@ -1995,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) { func runTestMain(m *testing.M) int { initScript := "config.lua" waitStart := 100 * time.Millisecond - var connectRetry uint = 3 + connectRetry := 3 retryTimeout := 500 * time.Millisecond workDirs := []string{ "work_dir1", "work_dir2", diff --git a/connection_pool/connector.go b/connection_pool/connector.go new file mode 100644 index 000000000..e52109d92 --- /dev/null +++ b/connection_pool/connector.go @@ -0,0 +1,305 @@ +package connection_pool + +import ( + "errors" + "fmt" + "time" + + "github.com/tarantool/go-tarantool" +) + +// ConnectorAdapter allows to use Pooler as Connector. +type ConnectorAdapter struct { + pool Pooler + mode Mode +} + +var _ tarantool.Connector = (*ConnectorAdapter)(nil) + +// NewConnectorAdapter creates a new ConnectorAdapter object for a pool +// and with a mode. All requests to the pool will be executed in the +// specified mode. +func NewConnectorAdapter(pool Pooler, mode Mode) *ConnectorAdapter { + return &ConnectorAdapter{pool: pool, mode: mode} +} + +// ConnectedNow reports if connections is established at the moment. +func (c *ConnectorAdapter) ConnectedNow() bool { + ret, err := c.pool.ConnectedNow(c.mode) + if err != nil { + return false + } + return ret +} + +// ClosedNow reports if the connector is closed by user or all connections +// in the specified mode closed. +func (c *ConnectorAdapter) Close() error { + errs := c.pool.Close() + if len(errs) == 0 { + return nil + } + + err := errors.New("failed to close connection pool") + for _, e := range errs { + err = fmt.Errorf("%s: %w", err.Error(), e) + } + return err +} + +// Ping sends empty request to Tarantool to check connection. +func (c *ConnectorAdapter) Ping() (*tarantool.Response, error) { + return c.pool.Ping(c.mode) +} + +// ConfiguredTimeout returns a timeout from connections config. +func (c *ConnectorAdapter) ConfiguredTimeout() time.Duration { + ret, err := c.pool.ConfiguredTimeout(c.mode) + if err != nil { + return 0 * time.Second + } + return ret +} + +// Select performs select to box space. +func (c *ConnectorAdapter) Select(space, index interface{}, + offset, limit, iterator uint32, + key interface{}) (*tarantool.Response, error) { + return c.pool.Select(space, index, offset, limit, iterator, key, c.mode) +} + +// Insert performs insertion to box space. +func (c *ConnectorAdapter) Insert(space interface{}, + tuple interface{}) (*tarantool.Response, error) { + return c.pool.Insert(space, tuple, c.mode) +} + +// Replace performs "insert or replace" action to box space. +func (c *ConnectorAdapter) Replace(space interface{}, + tuple interface{}) (*tarantool.Response, error) { + return c.pool.Replace(space, tuple, c.mode) +} + +// Delete performs deletion of a tuple by key. +func (c *ConnectorAdapter) Delete(space, index interface{}, + key interface{}) (*tarantool.Response, error) { + return c.pool.Delete(space, index, key, c.mode) +} + +// Update performs update of a tuple by key. +func (c *ConnectorAdapter) Update(space, index interface{}, + key, ops interface{}) (*tarantool.Response, error) { + return c.pool.Update(space, index, key, ops, c.mode) +} + +// Upsert performs "update or insert" action of a tuple by key. +func (c *ConnectorAdapter) Upsert(space interface{}, + tuple, ops interface{}) (*tarantool.Response, error) { + return c.pool.Upsert(space, tuple, ops, c.mode) +} + +// Call calls registered Tarantool function. +// It uses request code for Tarantool >= 1.7 if go-tarantool +// was build with go_tarantool_call_17 tag. +// Otherwise, uses request code for Tarantool 1.6. +func (c *ConnectorAdapter) Call(functionName string, + args interface{}) (*tarantool.Response, error) { + return c.pool.Call(functionName, args, c.mode) +} + +// Call16 calls registered Tarantool function. +// It uses request code for Tarantool 1.6, so result is converted to array of arrays +// Deprecated since Tarantool 1.7.2. +func (c *ConnectorAdapter) Call16(functionName string, + args interface{}) (*tarantool.Response, error) { + return c.pool.Call16(functionName, args, c.mode) +} + +// Call17 calls registered Tarantool function. +// It uses request code for Tarantool >= 1.7, so result is not converted +// (though, keep in mind, result is always array) +func (c *ConnectorAdapter) Call17(functionName string, + args interface{}) (*tarantool.Response, error) { + return c.pool.Call17(functionName, args, c.mode) +} + +// Eval passes Lua expression for evaluation. +func (c *ConnectorAdapter) Eval(expr string, + args interface{}) (*tarantool.Response, error) { + return c.pool.Eval(expr, args, c.mode) +} + +// Execute passes sql expression to Tarantool for execution. +func (c *ConnectorAdapter) Execute(expr string, + args interface{}) (*tarantool.Response, error) { + return c.pool.Execute(expr, args, c.mode) +} + +// GetTyped performs select (with limit = 1 and offset = 0) +// to box space and fills typed result. +func (c *ConnectorAdapter) GetTyped(space, index interface{}, + key interface{}, result interface{}) error { + return c.pool.GetTyped(space, index, key, result, c.mode) +} + +// SelectTyped performs select to box space and fills typed result. +func (c *ConnectorAdapter) SelectTyped(space, index interface{}, + offset, limit, iterator uint32, + key interface{}, result interface{}) error { + return c.pool.SelectTyped(space, index, offset, limit, iterator, key, result, c.mode) +} + +// InsertTyped performs insertion to box space. +func (c *ConnectorAdapter) InsertTyped(space interface{}, + tuple interface{}, result interface{}) error { + return c.pool.InsertTyped(space, tuple, result, c.mode) +} + +// ReplaceTyped performs "insert or replace" action to box space. +func (c *ConnectorAdapter) ReplaceTyped(space interface{}, + tuple interface{}, result interface{}) error { + return c.pool.ReplaceTyped(space, tuple, result, c.mode) +} + +// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple. +func (c *ConnectorAdapter) DeleteTyped(space, index interface{}, + key interface{}, result interface{}) error { + return c.pool.DeleteTyped(space, index, key, result, c.mode) +} + +// UpdateTyped performs update of a tuple by key and fills result with updated tuple. +func (c *ConnectorAdapter) UpdateTyped(space, index interface{}, + key, ops interface{}, result interface{}) error { + return c.pool.UpdateTyped(space, index, key, ops, result, c.mode) +} + +// CallTyped calls registered function. +// It uses request code for Tarantool >= 1.7 if go-tarantool +// was build with go_tarantool_call_17 tag. +// Otherwise, uses request code for Tarantool 1.6. +func (c *ConnectorAdapter) CallTyped(functionName string, + args interface{}, result interface{}) error { + return c.pool.CallTyped(functionName, args, result, c.mode) +} + +// Call16Typed calls registered function. +// It uses request code for Tarantool 1.6, so result is converted to array of arrays +// Deprecated since Tarantool 1.7.2. +func (c *ConnectorAdapter) Call16Typed(functionName string, + args interface{}, result interface{}) error { + return c.pool.Call16Typed(functionName, args, result, c.mode) +} + +// Call17Typed calls registered function. +// It uses request code for Tarantool >= 1.7, so result is not converted +// (though, keep in mind, result is always array) +func (c *ConnectorAdapter) Call17Typed(functionName string, + args interface{}, result interface{}) error { + return c.pool.Call17Typed(functionName, args, result, c.mode) +} + +// EvalTyped passes Lua expression for evaluation. +func (c *ConnectorAdapter) EvalTyped(expr string, args interface{}, + result interface{}) error { + return c.pool.EvalTyped(expr, args, result, c.mode) +} + +// ExecuteTyped passes sql expression to Tarantool for execution. +func (c *ConnectorAdapter) ExecuteTyped(expr string, args interface{}, + result interface{}) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) { + return c.pool.ExecuteTyped(expr, args, result, c.mode) +} + +// SelectAsync sends select request to Tarantool and returns Future. +func (c *ConnectorAdapter) SelectAsync(space, index interface{}, + offset, limit, iterator uint32, key interface{}) *tarantool.Future { + return c.pool.SelectAsync(space, index, offset, limit, iterator, key, c.mode) +} + +// InsertAsync sends insert action to Tarantool and returns Future. +func (c *ConnectorAdapter) InsertAsync(space interface{}, + tuple interface{}) *tarantool.Future { + return c.pool.InsertAsync(space, tuple, c.mode) +} + +// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future. +func (c *ConnectorAdapter) ReplaceAsync(space interface{}, + tuple interface{}) *tarantool.Future { + return c.pool.ReplaceAsync(space, tuple, c.mode) +} + +// DeleteAsync sends deletion action to Tarantool and returns Future. +func (c *ConnectorAdapter) DeleteAsync(space, index interface{}, + key interface{}) *tarantool.Future { + return c.pool.DeleteAsync(space, index, key, c.mode) +} + +// Update sends deletion of a tuple by key and returns Future. +func (c *ConnectorAdapter) UpdateAsync(space, index interface{}, + key, ops interface{}) *tarantool.Future { + return c.pool.UpdateAsync(space, index, key, ops, c.mode) +} + +// UpsertAsync sends "update or insert" action to Tarantool and returns Future. +func (c *ConnectorAdapter) UpsertAsync(space interface{}, tuple interface{}, + ops interface{}) *tarantool.Future { + return c.pool.UpsertAsync(space, tuple, ops, c.mode) +} + +// CallAsync sends a call to registered Tarantool function and returns Future. +// It uses request code for Tarantool >= 1.7 if go-tarantool +// was build with go_tarantool_call_17 tag. +// Otherwise, uses request code for Tarantool 1.6. +func (c *ConnectorAdapter) CallAsync(functionName string, + args interface{}) *tarantool.Future { + return c.pool.CallAsync(functionName, args, c.mode) +} + +// Call16Async sends a call to registered Tarantool function and returns Future. +// It uses request code for Tarantool 1.6, so future's result is always array of arrays. +// Deprecated since Tarantool 1.7.2. +func (c *ConnectorAdapter) Call16Async(functionName string, + args interface{}) *tarantool.Future { + return c.pool.Call16Async(functionName, args, c.mode) +} + +// Call17Async sends a call to registered Tarantool function and returns Future. +// It uses request code for Tarantool >= 1.7, so future's result will not be converted +// (though, keep in mind, result is always array) +func (c *ConnectorAdapter) Call17Async(functionName string, + args interface{}) *tarantool.Future { + return c.pool.Call17Async(functionName, args, c.mode) +} + +// EvalAsync sends a Lua expression for evaluation and returns Future. +func (c *ConnectorAdapter) EvalAsync(expr string, + args interface{}) *tarantool.Future { + return c.pool.EvalAsync(expr, args, c.mode) +} + +// ExecuteAsync sends a sql expression for execution and returns Future. +func (c *ConnectorAdapter) ExecuteAsync(expr string, + args interface{}) *tarantool.Future { + return c.pool.ExecuteAsync(expr, args, c.mode) +} + +// NewPrepared passes a sql statement to Tarantool for preparation +// synchronously. +func (c *ConnectorAdapter) NewPrepared(expr string) (*tarantool.Prepared, error) { + return c.pool.NewPrepared(expr, c.mode) +} + +// NewStream creates new Stream object for connection. +// +// Since v. 2.10.0, Tarantool supports streams and interactive transactions over +// them. To use interactive transactions, memtx_use_mvcc_engine box option +// should be set to true. +// Since 1.7.0 +func (c *ConnectorAdapter) NewStream() (*tarantool.Stream, error) { + return c.pool.NewStream(c.mode) +} + +// Do performs a request asynchronously on the connection. +func (c *ConnectorAdapter) Do(req tarantool.Request) *tarantool.Future { + return c.pool.Do(req, c.mode) +} diff --git a/connection_pool/connector_test.go b/connection_pool/connector_test.go new file mode 100644 index 000000000..fa7cf06ba --- /dev/null +++ b/connection_pool/connector_test.go @@ -0,0 +1,1168 @@ +package connection_pool_test + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool" + . "github.com/tarantool/go-tarantool/connection_pool" +) + +var testMode Mode = RW + +type connectedNowMock struct { + Pooler + called int + mode Mode + retErr bool +} + +// Tests for different logic. + +func (m *connectedNowMock) ConnectedNow(mode Mode) (bool, error) { + m.called++ + m.mode = mode + + if m.retErr { + return true, errors.New("mock error") + } + return true, nil +} + +func TestConnectorConnectedNow(t *testing.T) { + m := &connectedNowMock{retErr: false} + c := NewConnectorAdapter(m, testMode) + + require.Truef(t, c.ConnectedNow(), "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +func TestConnectorConnectedNowWithError(t *testing.T) { + m := &connectedNowMock{retErr: true} + c := NewConnectorAdapter(m, testMode) + + require.Falsef(t, c.ConnectedNow(), "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type closeMock struct { + Pooler + called int + retErr bool +} + +func (m *closeMock) Close() []error { + m.called++ + if m.retErr { + return []error{errors.New("err1"), errors.New("err2")} + } + return nil +} + +func TestConnectorClose(t *testing.T) { + m := &closeMock{retErr: false} + c := NewConnectorAdapter(m, testMode) + + require.Nilf(t, c.Close(), "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") +} + +func TestConnectorCloseWithError(t *testing.T) { + m := &closeMock{retErr: true} + c := NewConnectorAdapter(m, testMode) + + err := c.Close() + require.NotNilf(t, err, "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equal(t, "failed to close connection pool: err1: err2", err.Error()) +} + +type configuredTimeoutMock struct { + Pooler + called int + timeout time.Duration + mode Mode + retErr bool +} + +func (m *configuredTimeoutMock) ConfiguredTimeout(mode Mode) (time.Duration, error) { + m.called++ + m.mode = mode + m.timeout = 5 * time.Second + if m.retErr { + return m.timeout, fmt.Errorf("err") + } + return m.timeout, nil +} + +func TestConnectorConfiguredTimeout(t *testing.T) { + m := &configuredTimeoutMock{retErr: false} + c := NewConnectorAdapter(m, testMode) + + require.Equalf(t, c.ConfiguredTimeout(), m.timeout, "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +func TestConnectorConfiguredTimeoutWithError(t *testing.T) { + m := &configuredTimeoutMock{retErr: true} + c := NewConnectorAdapter(m, testMode) + + ret := c.ConfiguredTimeout() + + require.NotEqualf(t, ret, m.timeout, "unexpected result") + require.Equalf(t, ret, time.Duration(0), "unexpected result") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +// Tests for that ConnectorAdapter is just a proxy for requests. + +type baseRequestMock struct { + Pooler + called int + functionName string + offset, limit, iterator uint32 + space, index interface{} + args, tuple, key, ops interface{} + result interface{} + mode Mode +} + +var reqResp *tarantool.Response = &tarantool.Response{} +var reqErr error = errors.New("response error") +var reqFuture *tarantool.Future = &tarantool.Future{} + +var reqFunctionName string = "any_name" +var reqOffset uint32 = 1 +var reqLimit uint32 = 2 +var reqIterator uint32 = 3 +var reqSpace interface{} = []interface{}{1} +var reqIndex interface{} = []interface{}{2} +var reqArgs interface{} = []interface{}{3} +var reqTuple interface{} = []interface{}{4} +var reqKey interface{} = []interface{}{5} +var reqOps interface{} = []interface{}{6} + +var reqResult interface{} = []interface{}{7} +var reqSqlInfo = tarantool.SQLInfo{AffectedCount: 3} +var reqMeta = []tarantool.ColumnMetaData{{FieldIsNullable: false}} + +type getTypedMock struct { + baseRequestMock +} + +func (m *getTypedMock) GetTyped(space, index, key interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.index = index + m.key = key + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorGetTyped(t *testing.T) { + m := &getTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.GetTyped(reqSpace, reqIndex, reqKey, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type selectMock struct { + baseRequestMock +} + +func (m *selectMock) Select(space, index interface{}, + offset, limit, iterator uint32, key interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.index = index + m.offset = offset + m.limit = limit + m.iterator = iterator + m.key = key + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorSelect(t *testing.T) { + m := &selectMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Select(reqSpace, reqIndex, reqOffset, reqLimit, reqIterator, reqKey) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed") + require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed") + require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type selectTypedMock struct { + baseRequestMock +} + +func (m *selectTypedMock) SelectTyped(space, index interface{}, + offset, limit, iterator uint32, key interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.index = index + m.offset = offset + m.limit = limit + m.iterator = iterator + m.key = key + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorSelectTyped(t *testing.T) { + m := &selectTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.SelectTyped(reqSpace, reqIndex, reqOffset, reqLimit, + reqIterator, reqKey, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed") + require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed") + require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type selectAsyncMock struct { + baseRequestMock +} + +func (m *selectAsyncMock) SelectAsync(space, index interface{}, + offset, limit, iterator uint32, key interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.index = index + m.offset = offset + m.limit = limit + m.iterator = iterator + m.key = key + m.mode = mode[0] + return reqFuture +} + +func TestConnectorSelectAsync(t *testing.T) { + m := &selectAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.SelectAsync(reqSpace, reqIndex, reqOffset, reqLimit, + reqIterator, reqKey) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqOffset, m.offset, "unexpected offset was passed") + require.Equalf(t, reqLimit, m.limit, "unexpected limit was passed") + require.Equalf(t, reqIterator, m.iterator, "unexpected iterator was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type insertMock struct { + baseRequestMock +} + +func (m *insertMock) Insert(space, tuple interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.tuple = tuple + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorInsert(t *testing.T) { + m := &insertMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Insert(reqSpace, reqTuple) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type insertTypedMock struct { + baseRequestMock +} + +func (m *insertTypedMock) InsertTyped(space, tuple interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.tuple = tuple + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorInsertTyped(t *testing.T) { + m := &insertTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.InsertTyped(reqSpace, reqTuple, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type insertAsyncMock struct { + baseRequestMock +} + +func (m *insertAsyncMock) InsertAsync(space, tuple interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.tuple = tuple + m.mode = mode[0] + return reqFuture +} + +func TestConnectorInsertAsync(t *testing.T) { + m := &insertAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.InsertAsync(reqSpace, reqTuple) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type replaceMock struct { + baseRequestMock +} + +func (m *replaceMock) Replace(space, tuple interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.tuple = tuple + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorReplace(t *testing.T) { + m := &replaceMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Replace(reqSpace, reqTuple) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type replaceTypedMock struct { + baseRequestMock +} + +func (m *replaceTypedMock) ReplaceTyped(space, tuple interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.tuple = tuple + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorReplaceTyped(t *testing.T) { + m := &replaceTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.ReplaceTyped(reqSpace, reqTuple, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type replaceAsyncMock struct { + baseRequestMock +} + +func (m *replaceAsyncMock) ReplaceAsync(space, tuple interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.tuple = tuple + m.mode = mode[0] + return reqFuture +} + +func TestConnectorReplaceAsync(t *testing.T) { + m := &replaceAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.ReplaceAsync(reqSpace, reqTuple) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type deleteMock struct { + baseRequestMock +} + +func (m *deleteMock) Delete(space, index, key interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.index = index + m.key = key + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorDelete(t *testing.T) { + m := &deleteMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Delete(reqSpace, reqIndex, reqKey) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type deleteTypedMock struct { + baseRequestMock +} + +func (m *deleteTypedMock) DeleteTyped(space, index, key interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.index = index + m.key = key + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorDeleteTyped(t *testing.T) { + m := &deleteTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.DeleteTyped(reqSpace, reqIndex, reqKey, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type deleteAsyncMock struct { + baseRequestMock +} + +func (m *deleteAsyncMock) DeleteAsync(space, index, key interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.index = index + m.key = key + m.mode = mode[0] + return reqFuture +} + +func TestConnectorDeleteAsync(t *testing.T) { + m := &deleteAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.DeleteAsync(reqSpace, reqIndex, reqKey) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type updateMock struct { + baseRequestMock +} + +func (m *updateMock) Update(space, index, key, ops interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.index = index + m.key = key + m.ops = ops + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorUpdate(t *testing.T) { + m := &updateMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Update(reqSpace, reqIndex, reqKey, reqOps) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqOps, m.ops, "unexpected ops was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type updateTypedMock struct { + baseRequestMock +} + +func (m *updateTypedMock) UpdateTyped(space, index, key, ops interface{}, + result interface{}, mode ...Mode) error { + m.called++ + m.space = space + m.index = index + m.key = key + m.ops = ops + m.result = result + m.mode = mode[0] + return reqErr +} + +func TestConnectorUpdateTyped(t *testing.T) { + m := &updateTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.UpdateTyped(reqSpace, reqIndex, reqKey, reqOps, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqOps, m.ops, "unexpected ops was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type updateAsyncMock struct { + baseRequestMock +} + +func (m *updateAsyncMock) UpdateAsync(space, index, key, ops interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.index = index + m.key = key + m.ops = ops + m.mode = mode[0] + return reqFuture +} + +func TestConnectorUpdateAsync(t *testing.T) { + m := &updateAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.UpdateAsync(reqSpace, reqIndex, reqKey, reqOps) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqIndex, m.index, "unexpected index was passed") + require.Equalf(t, reqKey, m.key, "unexpected key was passed") + require.Equalf(t, reqOps, m.ops, "unexpected ops was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type upsertMock struct { + baseRequestMock +} + +func (m *upsertMock) Upsert(space, tuple, ops interface{}, + mode ...Mode) (*tarantool.Response, error) { + m.called++ + m.space = space + m.tuple = tuple + m.ops = ops + m.mode = mode[0] + return reqResp, reqErr +} + +func TestConnectorUpsert(t *testing.T) { + m := &upsertMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Upsert(reqSpace, reqTuple, reqOps) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, reqOps, m.ops, "unexpected ops was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type upsertAsyncMock struct { + baseRequestMock +} + +func (m *upsertAsyncMock) UpsertAsync(space, tuple, ops interface{}, + mode ...Mode) *tarantool.Future { + m.called++ + m.space = space + m.tuple = tuple + m.ops = ops + m.mode = mode[0] + return reqFuture +} + +func TestConnectorUpsertAsync(t *testing.T) { + m := &upsertAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.UpsertAsync(reqSpace, reqTuple, reqOps) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqSpace, m.space, "unexpected space was passed") + require.Equalf(t, reqTuple, m.tuple, "unexpected tuple was passed") + require.Equalf(t, reqOps, m.ops, "unexpected ops was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type baseCallMock struct { + baseRequestMock +} + +func (m *baseCallMock) call(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + m.called++ + m.functionName = functionName + m.args = args + m.mode = mode + return reqResp, reqErr +} + +func (m *baseCallMock) callTyped(functionName string, args interface{}, + result interface{}, mode Mode) error { + m.called++ + m.functionName = functionName + m.args = args + m.result = result + m.mode = mode + return reqErr +} + +func (m *baseCallMock) callAsync(functionName string, args interface{}, + mode Mode) *tarantool.Future { + m.called++ + m.functionName = functionName + m.args = args + m.mode = mode + return reqFuture +} + +type callMock struct { + baseCallMock +} + +func (m *callMock) Call(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + return m.call(functionName, args, mode) +} + +func TestConnectorCall(t *testing.T) { + m := &callMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Call(reqFunctionName, reqArgs) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type callTypedMock struct { + baseCallMock +} + +func (m *callTypedMock) CallTyped(functionName string, args interface{}, + result interface{}, mode Mode) error { + return m.callTyped(functionName, args, result, mode) +} + +func TestConnectorCallTyped(t *testing.T) { + m := &callTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.CallTyped(reqFunctionName, reqArgs, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type callAsyncMock struct { + baseCallMock +} + +func (m *callAsyncMock) CallAsync(functionName string, args interface{}, + mode Mode) *tarantool.Future { + return m.callAsync(functionName, args, mode) +} + +func TestConnectorCallAsync(t *testing.T) { + m := &callAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.CallAsync(reqFunctionName, reqArgs) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call16Mock struct { + baseCallMock +} + +func (m *call16Mock) Call16(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + return m.call(functionName, args, mode) +} + +func TestConnectorCall16(t *testing.T) { + m := &call16Mock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Call16(reqFunctionName, reqArgs) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call16TypedMock struct { + baseCallMock +} + +func (m *call16TypedMock) Call16Typed(functionName string, args interface{}, + result interface{}, mode Mode) error { + return m.callTyped(functionName, args, result, mode) +} + +func TestConnectorCall16Typed(t *testing.T) { + m := &call16TypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.Call16Typed(reqFunctionName, reqArgs, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call16AsyncMock struct { + baseCallMock +} + +func (m *call16AsyncMock) Call16Async(functionName string, args interface{}, + mode Mode) *tarantool.Future { + return m.callAsync(functionName, args, mode) +} + +func TestConnectorCall16Async(t *testing.T) { + m := &call16AsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.Call16Async(reqFunctionName, reqArgs) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call17Mock struct { + baseCallMock +} + +func (m *call17Mock) Call17(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + return m.call(functionName, args, mode) +} + +func TestConnectorCall17(t *testing.T) { + m := &call17Mock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Call17(reqFunctionName, reqArgs) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call17TypedMock struct { + baseCallMock +} + +func (m *call17TypedMock) Call17Typed(functionName string, args interface{}, + result interface{}, mode Mode) error { + return m.callTyped(functionName, args, result, mode) +} + +func TestConnectorCall17Typed(t *testing.T) { + m := &call17TypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.Call17Typed(reqFunctionName, reqArgs, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type call17AsyncMock struct { + baseCallMock +} + +func (m *call17AsyncMock) Call17Async(functionName string, args interface{}, + mode Mode) *tarantool.Future { + return m.callAsync(functionName, args, mode) +} + +func TestConnectorCall17Async(t *testing.T) { + m := &call17AsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.Call17Async(reqFunctionName, reqArgs) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected functionName was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type evalMock struct { + baseCallMock +} + +func (m *evalMock) Eval(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + return m.call(functionName, args, mode) +} + +func TestConnectorEval(t *testing.T) { + m := &evalMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Eval(reqFunctionName, reqArgs) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type evalTypedMock struct { + baseCallMock +} + +func (m *evalTypedMock) EvalTyped(functionName string, args interface{}, + result interface{}, mode Mode) error { + return m.callTyped(functionName, args, result, mode) +} + +func TestConnectorEvalTyped(t *testing.T) { + m := &evalTypedMock{} + c := NewConnectorAdapter(m, testMode) + + err := c.EvalTyped(reqFunctionName, reqArgs, reqResult) + + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type evalAsyncMock struct { + baseCallMock +} + +func (m *evalAsyncMock) EvalAsync(functionName string, args interface{}, + mode Mode) *tarantool.Future { + return m.callAsync(functionName, args, mode) +} + +func TestConnectorEvalAsync(t *testing.T) { + m := &evalAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.EvalAsync(reqFunctionName, reqArgs) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type executeMock struct { + baseCallMock +} + +func (m *executeMock) Execute(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) { + return m.call(functionName, args, mode) +} + +func TestConnectorExecute(t *testing.T) { + m := &executeMock{} + c := NewConnectorAdapter(m, testMode) + + resp, err := c.Execute(reqFunctionName, reqArgs) + + require.Equalf(t, reqResp, resp, "unexpected response") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type executeTypedMock struct { + baseCallMock +} + +func (m *executeTypedMock) ExecuteTyped(functionName string, args, result interface{}, + mode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) { + m.callTyped(functionName, args, result, mode) + return reqSqlInfo, reqMeta, reqErr +} + +func TestConnectorExecuteTyped(t *testing.T) { + m := &executeTypedMock{} + c := NewConnectorAdapter(m, testMode) + + info, meta, err := c.ExecuteTyped(reqFunctionName, reqArgs, reqResult) + + require.Equalf(t, reqSqlInfo, info, "unexpected info") + require.Equalf(t, reqMeta, meta, "unexpected meta") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, reqResult, m.result, "unexpected result was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +type executeAsyncMock struct { + baseCallMock +} + +func (m *executeAsyncMock) ExecuteAsync(functionName string, args interface{}, + mode Mode) *tarantool.Future { + return m.callAsync(functionName, args, mode) +} + +func TestConnectorExecuteAsync(t *testing.T) { + m := &executeAsyncMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.ExecuteAsync(reqFunctionName, reqArgs) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.functionName, + "unexpected expr was passed") + require.Equalf(t, reqArgs, m.args, "unexpected args was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +var reqPrepared *tarantool.Prepared = &tarantool.Prepared{} + +type newPreparedMock struct { + Pooler + called int + expr string + mode Mode +} + +func (m *newPreparedMock) NewPrepared(expr string, + mode Mode) (*tarantool.Prepared, error) { + m.called++ + m.expr = expr + m.mode = mode + return reqPrepared, reqErr +} + +func TestConnectorNewPrepared(t *testing.T) { + m := &newPreparedMock{} + c := NewConnectorAdapter(m, testMode) + + p, err := c.NewPrepared(reqFunctionName) + + require.Equalf(t, reqPrepared, p, "unexpected prepared") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqFunctionName, m.expr, + "unexpected expr was passed") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +var reqStream *tarantool.Stream = &tarantool.Stream{} + +type newStreamMock struct { + Pooler + called int + mode Mode +} + +func (m *newStreamMock) NewStream(mode Mode) (*tarantool.Stream, error) { + m.called++ + m.mode = mode + return reqStream, reqErr +} + +func TestConnectorNewStream(t *testing.T) { + m := &newStreamMock{} + c := NewConnectorAdapter(m, testMode) + + s, err := c.NewStream() + + require.Equalf(t, reqStream, s, "unexpected stream") + require.Equalf(t, reqErr, err, "unexpected error") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} + +var reqRequest tarantool.Request = tarantool.NewPingRequest() + +type doMock struct { + Pooler + called int + req tarantool.Request + mode Mode +} + +func (m *doMock) Do(req tarantool.Request, mode Mode) *tarantool.Future { + m.called++ + m.req = req + m.mode = mode + return reqFuture +} + +func TestConnectorDo(t *testing.T) { + m := &doMock{} + c := NewConnectorAdapter(m, testMode) + + fut := c.Do(reqRequest) + + require.Equalf(t, reqFuture, fut, "unexpected future") + require.Equalf(t, 1, m.called, "should be called only once") + require.Equalf(t, reqRequest, m.req, "unexpected request") + require.Equalf(t, testMode, m.mode, "unexpected proxy mode") +} diff --git a/connection_pool/const.go b/connection_pool/const.go index d77a55044..26b028f5a 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -7,6 +7,7 @@ Default mode for each request table: ---------- -------------- | call | no default | | eval | no default | + | execute | no default | | ping | no default | | insert | RW | | delete | RW | diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go index 9328c616a..9a486924a 100644 --- a/connection_pool/example_test.go +++ b/connection_pool/example_test.go @@ -834,3 +834,24 @@ func ExampleBeginRequest_TxnIsolation() { } fmt.Printf("Select after Rollback: response is %#v\n", resp.Data) } + +func ExampleConnectorAdapter() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + adapter := connection_pool.NewConnectorAdapter(pool, connection_pool.RW) + var connector tarantool.Connector = adapter + + // Ping an RW instance to check connection. + resp, err := connector.Ping() + fmt.Println("Ping Code", resp.Code) + fmt.Println("Ping Data", resp.Data) + fmt.Println("Ping Error", err) + // Output: + // Ping Code 0 + // Ping Data [] + // Ping Error +} diff --git a/connection_pool/msgpack_helper_test.go b/connection_pool/msgpack_helper_test.go new file mode 100644 index 000000000..d60c7d84d --- /dev/null +++ b/connection_pool/msgpack_helper_test.go @@ -0,0 +1,10 @@ +//go:build !go_tarantool_msgpack_v5 +// +build !go_tarantool_msgpack_v5 + +package connection_pool_test + +import ( + "gopkg.in/vmihailenco/msgpack.v2" +) + +type decoder = msgpack.Decoder diff --git a/connection_pool/msgpack_v5_helper_test.go b/connection_pool/msgpack_v5_helper_test.go new file mode 100644 index 000000000..7c449bec5 --- /dev/null +++ b/connection_pool/msgpack_v5_helper_test.go @@ -0,0 +1,10 @@ +//go:build go_tarantool_msgpack_v5 +// +build go_tarantool_msgpack_v5 + +package connection_pool_test + +import ( + "github.com/vmihailenco/msgpack/v5" +) + +type decoder = msgpack.Decoder diff --git a/connection_pool/pooler.go b/connection_pool/pooler.go new file mode 100644 index 000000000..a9dbe09f9 --- /dev/null +++ b/connection_pool/pooler.go @@ -0,0 +1,89 @@ +package connection_pool + +import ( + "time" + + "github.com/tarantool/go-tarantool" +) + +// Pooler is the interface that must be implemented by a connection pool. +type Pooler interface { + ConnectedNow(mode Mode) (bool, error) + Close() []error + Ping(mode Mode) (*tarantool.Response, error) + ConfiguredTimeout(mode Mode) (time.Duration, error) + + Select(space, index interface{}, offset, limit, iterator uint32, + key interface{}, mode ...Mode) (*tarantool.Response, error) + Insert(space interface{}, tuple interface{}, + mode ...Mode) (*tarantool.Response, error) + Replace(space interface{}, tuple interface{}, + mode ...Mode) (*tarantool.Response, error) + Delete(space, index interface{}, key interface{}, + mode ...Mode) (*tarantool.Response, error) + Update(space, index interface{}, key, ops interface{}, + mode ...Mode) (*tarantool.Response, error) + Upsert(space interface{}, tuple, ops interface{}, + mode ...Mode) (*tarantool.Response, error) + Call(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) + Call16(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) + Call17(functionName string, args interface{}, + mode Mode) (*tarantool.Response, error) + Eval(expr string, args interface{}, + mode Mode) (*tarantool.Response, error) + Execute(expr string, args interface{}, + mode Mode) (*tarantool.Response, error) + + GetTyped(space, index interface{}, key interface{}, result interface{}, + mode ...Mode) error + SelectTyped(space, index interface{}, offset, limit, iterator uint32, + key interface{}, result interface{}, mode ...Mode) error + InsertTyped(space interface{}, tuple interface{}, result interface{}, + mode ...Mode) error + ReplaceTyped(space interface{}, tuple interface{}, result interface{}, + mode ...Mode) error + DeleteTyped(space, index interface{}, key interface{}, result interface{}, + mode ...Mode) error + UpdateTyped(space, index interface{}, key, ops interface{}, + result interface{}, mode ...Mode) error + CallTyped(functionName string, args interface{}, result interface{}, + mode Mode) error + Call16Typed(functionName string, args interface{}, result interface{}, + mode Mode) error + Call17Typed(functionName string, args interface{}, result interface{}, + mode Mode) error + EvalTyped(expr string, args interface{}, result interface{}, + mode Mode) error + ExecuteTyped(expr string, args interface{}, result interface{}, + mode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) + + SelectAsync(space, index interface{}, offset, limit, iterator uint32, + key interface{}, mode ...Mode) *tarantool.Future + InsertAsync(space interface{}, tuple interface{}, + mode ...Mode) *tarantool.Future + ReplaceAsync(space interface{}, tuple interface{}, + mode ...Mode) *tarantool.Future + DeleteAsync(space, index interface{}, key interface{}, + mode ...Mode) *tarantool.Future + UpdateAsync(space, index interface{}, key, ops interface{}, + mode ...Mode) *tarantool.Future + UpsertAsync(space interface{}, tuple interface{}, ops interface{}, + mode ...Mode) *tarantool.Future + CallAsync(functionName string, args interface{}, + mode Mode) *tarantool.Future + Call16Async(functionName string, args interface{}, + mode Mode) *tarantool.Future + Call17Async(functionName string, args interface{}, + mode Mode) *tarantool.Future + EvalAsync(expr string, args interface{}, + mode Mode) *tarantool.Future + ExecuteAsync(expr string, args interface{}, + mode Mode) *tarantool.Future + + NewPrepared(expr string, mode Mode) (*tarantool.Prepared, error) + NewStream(mode Mode) (*tarantool.Stream, error) + + Do(req tarantool.Request, mode Mode) (fut *tarantool.Future) +} diff --git a/multi/multi_test.go b/multi/multi_test.go index 643ea186d..2d43bb179 100644 --- a/multi/multi_test.go +++ b/multi/multi_test.go @@ -556,7 +556,7 @@ func TestStream_Rollback(t *testing.T) { func runTestMain(m *testing.M) int { initScript := "config.lua" waitStart := 100 * time.Millisecond - var connectRetry uint = 3 + connectRetry := 3 retryTimeout := 500 * time.Millisecond // Tarantool supports streams and interactive transactions since version 2.10.0 diff --git a/queue/example_connection_pool_test.go b/queue/example_connection_pool_test.go new file mode 100644 index 000000000..59f5020ea --- /dev/null +++ b/queue/example_connection_pool_test.go @@ -0,0 +1,206 @@ +package queue_test + +import ( + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" + "github.com/tarantool/go-tarantool/queue" + "github.com/tarantool/go-tarantool/test_helpers" +) + +// QueueConnectionHandler handles new connections in a ConnectionPool. +type QueueConnectionHandler struct { + name string + cfg queue.Cfg + + uuid uuid.UUID + registered bool + err error + mutex sync.Mutex + masterUpdated chan struct{} +} + +// QueueConnectionHandler implements the ConnectionHandler interface. +var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{} + +// NewQueueConnectionHandler creates a QueueConnectionHandler object. +func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler { + return &QueueConnectionHandler{ + name: name, + cfg: cfg, + masterUpdated: make(chan struct{}, 10), + } +} + +// Discovered configures a queue for an instance and identifies a shared queue +// session on master instances. +// +// NOTE: the Queue supports only a master-replica cluster configuration. It +// does not support a master-master configuration. +func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection, + role connection_pool.Role) error { + h.mutex.Lock() + defer h.mutex.Unlock() + + if h.err != nil { + return h.err + } + + master := role == connection_pool.MasterRole + if master { + defer func() { + h.masterUpdated <- struct{}{} + }() + } + + // Set up a queue module configuration for an instance. + q := queue.New(conn, h.name) + opts := queue.CfgOpts{InReplicaset: true, Ttr: 60 * time.Second} + + if h.err = q.Cfg(opts); h.err != nil { + return fmt.Errorf("unable to configure queue: %w", h.err) + } + + // The queue only works with a master instance. + if !master { + return nil + } + + if h.err = q.Create(h.cfg); h.err != nil { + return h.err + } + + if !h.registered { + // We register a shared session at the first time. + if h.uuid, h.err = q.Identify(nil); h.err != nil { + return h.err + } + h.registered = true + } else { + // We re-identify as the shared session. + if _, h.err = q.Identify(&h.uuid); h.err != nil { + return h.err + } + } + + fmt.Printf("Master %s is ready to work!\n", conn.Addr()) + + return nil +} + +// Deactivated doesn't do anything useful for the example. +func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection, + role connection_pool.Role) error { + return nil +} + +// Closes closes a QueueConnectionHandler object. +func (h *QueueConnectionHandler) Close() { + close(h.masterUpdated) +} + +// Example demonstrates how to use the queue package with the connection_pool +// package. First of all, you need to create a ConnectionHandler implementation +// for the a ConnectionPool object to process new connections from +// RW-instances. +// +// You need to register a shared session UUID at a first master connection. +// It needs to be used to re-identify as the shared session on new +// RW-instances. See QueueConnectionHandler.Discovered() implementation. +// +// After that, you need to create a ConnectorAdapter object with RW mode for +// the ConnectionPool to send requests into RW-instances. This adapter can +// be used to create a ready-to-work queue object. +func Example_connectionPool() { + // Create a ConnectionHandler object. + cfg := queue.Cfg{ + Temporary: false, + IfNotExists: true, + Kind: queue.FIFO, + Opts: queue.Opts{ + Ttl: 10 * time.Second, + }, + } + h := NewQueueConnectionHandler("test_queue", cfg) + defer h.Close() + + // Create a ConnectionPool object. + servers := []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", + } + connOpts := tarantool.Opts{ + Timeout: 1 * time.Second, + User: "test", + Pass: "test", + } + poolOpts := connection_pool.OptsPool{ + CheckTimeout: 1 * time.Second, + ConnectionHandler: h, + } + connPool, err := connection_pool.ConnectWithOpts(servers, connOpts, poolOpts) + if err != nil { + fmt.Printf("Unable to connect to the pool: %s", err) + return + } + defer connPool.Close() + + // Wait for a master instance identification in the queue. + <-h.masterUpdated + if h.err != nil { + fmt.Printf("Unable to identify in the pool: %s", h.err) + return + } + + // Create a Queue object from the ConnectionPool object via + // a ConnectorAdapter. + rw := connection_pool.NewConnectorAdapter(connPool, connection_pool.RW) + q := queue.New(rw, "test_queue") + fmt.Println("A Queue object is ready to work.") + + testData := "test_data" + fmt.Println("Send data:", testData) + if _, err = q.Put(testData); err != nil { + fmt.Printf("Unable to put data into the queue: %s", err) + return + } + + // Switch a master instance in the pool. + roles := []bool{true, false} + err = test_helpers.SetClusterRO(servers, connOpts, roles) + if err != nil { + fmt.Printf("Unable to set cluster roles: %s", err) + return + } + + // Wait for a new master instance re-identification. + <-h.masterUpdated + if h.err != nil { + fmt.Printf("Unable to re-identify in the pool: %s", h.err) + return + } + + // Take a data from the new master instance. + task, err := q.TakeTimeout(1 * time.Second) + if err != nil { + fmt.Println("Unable to got task:", err) + } else if task == nil { + fmt.Println("task == nil") + } else if task.Data() == nil { + fmt.Println("task.Data() == nil") + } else { + task.Ack() + fmt.Println("Got data:", task.Data()) + } + + // Output: + // Master 127.0.0.1:3014 is ready to work! + // A Queue object is ready to work. + // Send data: test_data + // Master 127.0.0.1:3015 is ready to work! + // Got data: test_data +} diff --git a/queue/queue_test.go b/queue/queue_test.go index 7b19c0dd3..85276e4a8 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -14,6 +14,13 @@ import ( ) var server = "127.0.0.1:3013" +var serversPool = []string{ + "127.0.0.1:3014", + "127.0.0.1:3015", +} + +var instances []test_helpers.TarantoolInstance + var opts = Opts{ Timeout: 2500 * time.Millisecond, User: "test", @@ -890,7 +897,7 @@ func TestTask_Touch(t *testing.T) { // https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls func runTestMain(m *testing.M) int { inst, err := test_helpers.StartTarantool(test_helpers.StartOpts{ - InitScript: "config.lua", + InitScript: "testdata/config.lua", Listen: server, WorkDir: "work_dir", User: opts.User, @@ -899,12 +906,40 @@ func runTestMain(m *testing.M) int { ConnectRetry: 3, RetryTimeout: 500 * time.Millisecond, }) - defer test_helpers.StopTarantoolWithCleanup(inst) if err != nil { log.Fatalf("Failed to prepare test tarantool: %s", err) } + defer test_helpers.StopTarantoolWithCleanup(inst) + + workDirs := []string{"work_dir1", "work_dir2"} + poolOpts := test_helpers.StartOpts{ + InitScript: "testdata/pool.lua", + User: opts.User, + Pass: opts.Pass, + WaitStart: 3 * time.Second, // replication_timeout * 3 + ConnectRetry: -1, + } + instances, err = test_helpers.StartTarantoolInstances(serversPool, workDirs, poolOpts) + + if err != nil { + log.Fatalf("Failed to prepare test tarantool pool: %s", err) + } + + defer test_helpers.StopTarantoolInstances(instances) + + roles := []bool{false, true} + connOpts := Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", + } + err = test_helpers.SetClusterRO(serversPool, connOpts, roles) + + if err != nil { + log.Fatalf("Failed to set roles in tarantool pool: %s", err) + } return m.Run() } diff --git a/queue/config.lua b/queue/testdata/config.lua similarity index 81% rename from queue/config.lua rename to queue/testdata/config.lua index df28496a3..eccb19a68 100644 --- a/queue/config.lua +++ b/queue/testdata/config.lua @@ -7,7 +7,7 @@ box.cfg{ work_dir = os.getenv("TEST_TNT_WORK_DIR"), } - box.once("init", function() +box.once("init", function() box.schema.user.create('test', {password = 'test'}) box.schema.func.create('queue.tube.test_queue:touch') box.schema.func.create('queue.tube.test_queue:ack') @@ -23,21 +23,15 @@ box.cfg{ box.schema.func.create('queue.identify') box.schema.func.create('queue.state') box.schema.func.create('queue.statistics') - box.schema.user.grant('test', 'create', 'space') - box.schema.user.grant('test', 'write', 'space', '_schema') - box.schema.user.grant('test', 'write', 'space', '_space') - box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') - box.schema.user.grant('test', 'write', 'space', '_index') + box.schema.user.grant('test', 'create,read,write,drop', 'space') box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') box.schema.user.grant('test', 'execute', 'universe') box.schema.user.grant('test', 'read,write', 'space', '_queue') box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') box.schema.user.grant('test', 'read,write', 'space', '_space') box.schema.user.grant('test', 'read,write', 'space', '_index') - box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers') box.schema.user.grant('test', 'read,write', 'space', '_priv') - box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2') - box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions') if box.space._trigger ~= nil then box.schema.user.grant('test', 'read', 'space', '_trigger') end @@ -56,3 +50,5 @@ end) box.cfg{ listen = os.getenv("TEST_TNT_LISTEN"), } + +require('console').start() diff --git a/queue/testdata/pool.lua b/queue/testdata/pool.lua new file mode 100644 index 000000000..7c63aa787 --- /dev/null +++ b/queue/testdata/pool.lua @@ -0,0 +1,56 @@ +local queue = require('queue') +rawset(_G, 'queue', queue) + +local listen = os.getenv("TEST_TNT_LISTEN") +box.cfg{ + work_dir = os.getenv("TEST_TNT_WORK_DIR"), + listen = listen, + replication = { + "test:test@127.0.0.1:3014", + "test:test@127.0.0.1:3015", + }, + read_only = listen == "127.0.0.1:3015" +} + +box.once("schema", function() + box.schema.user.create('test', {password = 'test'}) + box.schema.user.grant('test', 'replication') + + box.schema.func.create('queue.tube.test_queue:touch') + box.schema.func.create('queue.tube.test_queue:ack') + box.schema.func.create('queue.tube.test_queue:put') + box.schema.func.create('queue.tube.test_queue:drop') + box.schema.func.create('queue.tube.test_queue:peek') + box.schema.func.create('queue.tube.test_queue:kick') + box.schema.func.create('queue.tube.test_queue:take') + box.schema.func.create('queue.tube.test_queue:delete') + box.schema.func.create('queue.tube.test_queue:release') + box.schema.func.create('queue.tube.test_queue:release_all') + box.schema.func.create('queue.tube.test_queue:bury') + box.schema.func.create('queue.identify') + box.schema.func.create('queue.state') + box.schema.func.create('queue.statistics') + box.schema.user.grant('test', 'create,read,write,drop', 'space') + box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') + box.schema.user.grant('test', 'execute', 'universe') + box.schema.user.grant('test', 'read,write', 'space', '_queue') + box.schema.user.grant('test', 'read,write', 'space', '_schema') + box.schema.user.grant('test', 'read,write', 'space', '_space_sequence') + box.schema.user.grant('test', 'read,write', 'space', '_space') + box.schema.user.grant('test', 'read,write', 'space', '_index') + box.schema.user.grant('test', 'read,write', 'space', '_priv') + if box.space._trigger ~= nil then + box.schema.user.grant('test', 'read', 'space', '_trigger') + end + if box.space._fk_constraint ~= nil then + box.schema.user.grant('test', 'read', 'space', '_fk_constraint') + end + if box.space._ck_constraint ~= nil then + box.schema.user.grant('test', 'read', 'space', '_ck_constraint') + end + if box.space._func_index ~= nil then + box.schema.user.grant('test', 'read', 'space', '_func_index') + end +end) + +require('console').start() diff --git a/test_helpers/main.go b/test_helpers/main.go index cdc3c343d..77e22c535 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -67,8 +67,9 @@ type StartOpts struct { // WaitStart is a time to wait before starting to ping tarantool. WaitStart time.Duration - // ConnectRetry is a count of attempts to ping tarantool. - ConnectRetry uint + // ConnectRetry is a count of retry attempts to ping tarantool. If the + // value < 0 then there will be no ping tarantool at all. + ConnectRetry int // RetryTimeout is a time between tarantool ping retries. RetryTimeout time.Duration @@ -240,7 +241,7 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) { Ssl: startOpts.ClientSsl, } - var i uint + var i int var server string if startOpts.ClientServer != "" { server = startOpts.ClientServer