diff --git a/CHANGELOG.md b/CHANGELOG.md index 0332f376f..a5a8743c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Changed +### Removed + +- multi subpackage (#240) + ### Fixed ## [1.12.0] - 2023-06-07 diff --git a/Makefile b/Makefile index cc689dae7..b1d808cda 100644 --- a/Makefile +++ b/Makefile @@ -75,12 +75,6 @@ test-decimal: go clean -testcache go test -tags "$(TAGS)" ./decimal/ -v -p 1 -.PHONY: test-multi -test-multi: - @echo "Running tests in multiconnection package" - go clean -testcache - go test -tags "$(TAGS)" ./multi/ -v -p 1 - .PHONY: test-queue test-queue: @echo "Running tests in queue package" diff --git a/README.md b/README.md index 9fb5611b9..8b7b2fbb4 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ faster than other packages according to public benchmarks. * [API reference](#api-reference) * [Walking\-through example](#walking-through-example) * [Migration to v2](#migration-to-v2) + * [multi package](#multi-package) * [msgpack.v5](#msgpackv5) * [Contributing](#contributing) * [Alternative connectors](#alternative-connectors) @@ -158,6 +159,10 @@ There are two parameters: The article describes migration from go-tarantool to go-tarantool/v2. +#### multi package + +The subpackage has been deleted. You could use `connection_pool` instead. + #### msgpack.v5 Most function names and argument types in `msgpack.v5` and `msgpack.v2` diff --git a/multi/call_16_test.go b/multi/call_16_test.go deleted file mode 100644 index 6c67dab3f..000000000 --- a/multi/call_16_test.go +++ /dev/null @@ -1,33 +0,0 @@ -//go:build !go_tarantool_call_17 -// +build !go_tarantool_call_17 - -package multi - -import ( - "testing" - - "github.com/tarantool/go-tarantool/v2" -) - -func TestCall(t *testing.T) { - var resp *tarantool.Response - var err error - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - // Call16 - resp, err = multiConn.Call("simple_concat", []interface{}{"t"}) - if err != nil { - t.Fatalf("Failed to use Call: %s", err.Error()) - } - if resp.Data[0].([]interface{})[0].(string) != "tt" { - t.Fatalf("result is not {{1}} : %v", resp.Data) - } -} diff --git a/multi/call_17_test.go b/multi/call_17_test.go deleted file mode 100644 index c59d7d97f..000000000 --- a/multi/call_17_test.go +++ /dev/null @@ -1,33 +0,0 @@ -//go:build go_tarantool_call_17 -// +build go_tarantool_call_17 - -package multi - -import ( - "testing" - - "github.com/tarantool/go-tarantool/v2" -) - -func TestCall(t *testing.T) { - var resp *tarantool.Response - var err error - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - // Call17 - resp, err = multiConn.Call("simple_concat", []interface{}{"t"}) - if err != nil { - t.Fatalf("Failed to use Call: %s", err.Error()) - } - if resp.Data[0].(string) != "tt" { - t.Fatalf("result is not {{1}} : %v", resp.Data) - } -} diff --git a/multi/config.lua b/multi/config.lua deleted file mode 100644 index 7364c0bd6..000000000 --- a/multi/config.lua +++ /dev/null @@ -1,49 +0,0 @@ -local nodes_load = require("config_load_nodes") - --- Do not set listen for now so connector won't be --- able to send requests until everything is configured. -box.cfg{ - work_dir = os.getenv("TEST_TNT_WORK_DIR"), - memtx_use_mvcc_engine = os.getenv("TEST_TNT_MEMTX_USE_MVCC_ENGINE") == 'true' or nil, -} - --- Function to call for getting address list, part of tarantool/multi API. -local get_cluster_nodes = nodes_load.get_cluster_nodes -rawset(_G, 'get_cluster_nodes', get_cluster_nodes) - -box.once("init", function() - local s = box.schema.space.create('test', { - id = 617, - if_not_exists = true, - }) - s:create_index('primary', {type = 'tree', parts = {1, 'string'}, if_not_exists = true}) - - box.schema.user.create('test', { password = 'test' }) - box.schema.user.grant('test', 'read,write,execute', 'universe') - - local sp = box.schema.space.create('SQL_TEST', { - id = 621, - if_not_exists = true, - format = { - {name = "NAME0", type = "unsigned"}, - {name = "NAME1", type = "string"}, - {name = "NAME2", type = "string"}, - } - }) - sp:create_index('primary', {type = 'tree', parts = {1, 'uint'}, if_not_exists = true}) - sp:insert{1, "test", "test"} - -- grants for sql tests - box.schema.user.grant('test', 'create,read,write,drop,alter', 'space') - box.schema.user.grant('test', 'create', 'sequence') -end) - -local function simple_concat(a) - return a .. a -end - -rawset(_G, 'simple_concat', simple_concat) - --- Set listen only when every other thing is configured. -box.cfg{ - listen = os.getenv("TEST_TNT_LISTEN"), -} diff --git a/multi/config_load_nodes.lua b/multi/config_load_nodes.lua deleted file mode 100644 index 4df87fc28..000000000 --- a/multi/config_load_nodes.lua +++ /dev/null @@ -1,7 +0,0 @@ -local function get_cluster_nodes() - return { 'localhost:3013', 'localhost:3014' } -end - -return { - get_cluster_nodes = get_cluster_nodes -} \ No newline at end of file diff --git a/multi/example_test.go b/multi/example_test.go deleted file mode 100644 index d22e3d1c8..000000000 --- a/multi/example_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package multi - -import ( - "fmt" - "time" - - "github.com/tarantool/go-tarantool/v2" -) - -func ExampleConnect() { - multiConn, err := Connect([]string{"127.0.0.1:3031", "127.0.0.1:3032"}, tarantool.Opts{ - Timeout: 5 * time.Second, - User: "test", - Pass: "test", - }) - if err != nil { - fmt.Printf("error in connect is %v", err) - } - fmt.Println(multiConn) -} - -func ExampleConnectWithOpts() { - multiConn, err := ConnectWithOpts([]string{"127.0.0.1:3301", "127.0.0.1:3302"}, tarantool.Opts{ - Timeout: 5 * time.Second, - User: "test", - Pass: "test", - }, OptsMulti{ - // Check for connection timeout every 1 second. - CheckTimeout: 1 * time.Second, - // Lua function name for getting address list. - NodesGetFunctionName: "get_cluster_nodes", - // Ask server for updated address list every 3 seconds. - ClusterDiscoveryTime: 3 * time.Second, - }) - if err != nil { - fmt.Printf("error in connect is %v", err) - } - fmt.Println(multiConn) -} diff --git a/multi/multi.go b/multi/multi.go deleted file mode 100644 index e3693630a..000000000 --- a/multi/multi.go +++ /dev/null @@ -1,534 +0,0 @@ -// Package with methods to work with a Tarantool cluster. -// -// Main features: -// -// - Check the active connection with a configurable time interval and switch -// to the next connection in the pool if there is a connection failure. -// -// - Get the address list from the server and reconfigure it for use in -// MultiConnection. -// -// Since: 1.5 -package multi - -import ( - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/tarantool/go-tarantool/v2" -) - -const ( - connConnected = iota - connClosed -) - -var ( - ErrEmptyAddrs = errors.New("addrs should not be empty") - ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") - ErrNoConnection = errors.New("no active connections") -) - -func indexOf(sstring string, data []string) int { - for i, v := range data { - if sstring == v { - return i - } - } - return -1 -} - -// ConnectionMulti is a handle with connections to a number of Tarantool instances. -// -// It is created and configured with Connect function, and could not be -// reconfigured later. -type ConnectionMulti struct { - addrs []string - connOpts tarantool.Opts - opts OptsMulti - - mutex sync.RWMutex - notify chan tarantool.ConnEvent - state uint32 - control chan struct{} - pool map[string]*tarantool.Connection - fallback *tarantool.Connection -} - -var _ = tarantool.Connector(&ConnectionMulti{}) // Check compatibility with connector interface. - -// OptsMulti is a way to configure Connection with multiconnect-specific options. -type OptsMulti struct { - // CheckTimeout is a time interval to check for connection timeout and try to - // switch connection. - CheckTimeout time.Duration - // Lua function name of the server called to retrieve the address list. - NodesGetFunctionName string - // Time interval to ask the server for an updated address list (works - // if NodesGetFunctionName is set). - ClusterDiscoveryTime time.Duration -} - -// Connect creates and configures new ConnectionMulti with multiconnection options. -func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsMulti) (connMulti *ConnectionMulti, err error) { - if len(addrs) == 0 { - return nil, ErrEmptyAddrs - } - if opts.CheckTimeout <= 0 { - return nil, ErrWrongCheckTimeout - } - if opts.ClusterDiscoveryTime <= 0 { - opts.ClusterDiscoveryTime = 60 * time.Second - } - - notify := make(chan tarantool.ConnEvent, 10*len(addrs)) // x10 to accept disconnected and closed event (with a margin). - connOpts.Notify = notify - connMulti = &ConnectionMulti{ - addrs: addrs, - connOpts: connOpts.Clone(), - opts: opts, - notify: notify, - control: make(chan struct{}), - pool: make(map[string]*tarantool.Connection), - } - somebodyAlive, _ := connMulti.warmUp() - if !somebodyAlive { - connMulti.Close() - return nil, ErrNoConnection - } - go connMulti.checker() - - return connMulti, nil -} - -// Connect creates and configures new ConnectionMulti. -func Connect(addrs []string, connOpts tarantool.Opts) (connMulti *ConnectionMulti, err error) { - opts := OptsMulti{ - CheckTimeout: 1 * time.Second, - } - return ConnectWithOpts(addrs, connOpts, opts) -} - -func (connMulti *ConnectionMulti) warmUp() (somebodyAlive bool, errs []error) { - errs = make([]error, len(connMulti.addrs)) - - for i, addr := range connMulti.addrs { - conn, err := tarantool.Connect(addr, connMulti.connOpts) - errs[i] = err - if conn != nil && err == nil { - if connMulti.fallback == nil { - connMulti.fallback = conn - } - connMulti.pool[addr] = conn - if conn.ConnectedNow() { - somebodyAlive = true - } - } - } - return -} - -func (connMulti *ConnectionMulti) getState() uint32 { - return atomic.LoadUint32(&connMulti.state) -} - -func (connMulti *ConnectionMulti) getConnectionFromPool(addr string) (*tarantool.Connection, bool) { - connMulti.mutex.RLock() - defer connMulti.mutex.RUnlock() - conn, ok := connMulti.pool[addr] - return conn, ok -} - -func (connMulti *ConnectionMulti) setConnectionToPool(addr string, conn *tarantool.Connection) { - connMulti.mutex.Lock() - defer connMulti.mutex.Unlock() - connMulti.pool[addr] = conn -} - -func (connMulti *ConnectionMulti) deleteConnectionFromPool(addr string) { - connMulti.mutex.Lock() - defer connMulti.mutex.Unlock() - delete(connMulti.pool, addr) -} - -func (connMulti *ConnectionMulti) checker() { - - refreshTimer := time.NewTicker(connMulti.opts.ClusterDiscoveryTime) - timer := time.NewTicker(connMulti.opts.CheckTimeout) - defer refreshTimer.Stop() - defer timer.Stop() - - for connMulti.getState() != connClosed { - - select { - case <-connMulti.control: - return - case e := <-connMulti.notify: - if connMulti.getState() == connClosed { - return - } - if e.Conn.ClosedNow() { - addr := e.Conn.Addr() - if _, ok := connMulti.getConnectionFromPool(addr); !ok { - continue - } - conn, _ := tarantool.Connect(addr, connMulti.connOpts) - if conn != nil { - connMulti.setConnectionToPool(addr, conn) - } else { - connMulti.deleteConnectionFromPool(addr) - } - } - case <-refreshTimer.C: - if connMulti.getState() == connClosed || connMulti.opts.NodesGetFunctionName == "" { - continue - } - var resp [][]string - err := connMulti.Call17Typed(connMulti.opts.NodesGetFunctionName, []interface{}{}, &resp) - if err != nil { - continue - } - if len(resp) > 0 && len(resp[0]) > 0 { - addrs := resp[0] - // Fill pool with new connections. - for _, v := range addrs { - if indexOf(v, connMulti.addrs) < 0 { - conn, _ := tarantool.Connect(v, connMulti.connOpts) - if conn != nil { - connMulti.setConnectionToPool(v, conn) - } - } - } - // Clear pool from obsolete connections. - for _, v := range connMulti.addrs { - if indexOf(v, addrs) < 0 { - con, ok := connMulti.getConnectionFromPool(v) - if con != nil && ok { - con.Close() - } - connMulti.deleteConnectionFromPool(v) - } - } - connMulti.mutex.Lock() - connMulti.addrs = addrs - connMulti.mutex.Unlock() - } - case <-timer.C: - for _, addr := range connMulti.addrs { - if connMulti.getState() == connClosed { - return - } - if conn, ok := connMulti.getConnectionFromPool(addr); ok { - if !conn.ClosedNow() { - continue - } - } - conn, _ := tarantool.Connect(addr, connMulti.connOpts) - if conn != nil { - connMulti.setConnectionToPool(addr, conn) - } - } - } - } -} - -func (connMulti *ConnectionMulti) getCurrentConnection() *tarantool.Connection { - connMulti.mutex.RLock() - defer connMulti.mutex.RUnlock() - - for _, addr := range connMulti.addrs { - conn := connMulti.pool[addr] - if conn != nil { - if conn.ConnectedNow() { - return conn - } - connMulti.fallback = conn - } - } - return connMulti.fallback -} - -// ConnectedNow reports if connection is established at the moment. -func (connMulti *ConnectionMulti) ConnectedNow() bool { - return connMulti.getState() == connConnected && connMulti.getCurrentConnection().ConnectedNow() -} - -// Close closes Connection. -// After this method called, there is no way to reopen this Connection. -func (connMulti *ConnectionMulti) Close() (err error) { - connMulti.mutex.Lock() - defer connMulti.mutex.Unlock() - - close(connMulti.control) - atomic.StoreUint32(&connMulti.state, connClosed) - - for _, conn := range connMulti.pool { - if err == nil { - err = conn.Close() - } else { - conn.Close() - } - } - if connMulti.fallback != nil { - connMulti.fallback.Close() - } - - return -} - -// Ping sends empty request to Tarantool to check connection. -func (connMulti *ConnectionMulti) Ping() (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Ping() -} - -// ConfiguredTimeout returns a timeout from connection config. -func (connMulti *ConnectionMulti) ConfiguredTimeout() time.Duration { - return connMulti.getCurrentConnection().ConfiguredTimeout() -} - -// Select performs select to box space. -func (connMulti *ConnectionMulti) Select(space, index interface{}, offset, limit, iterator uint32, key interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Select(space, index, offset, limit, iterator, key) -} - -// Insert performs insertion to box space. -// Tarantool will reject Insert when tuple with same primary key exists. -func (connMulti *ConnectionMulti) Insert(space interface{}, tuple interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Insert(space, tuple) -} - -// Replace performs "insert or replace" action to box space. -// If tuple with same primary key exists, it will be replaced. -func (connMulti *ConnectionMulti) Replace(space interface{}, tuple interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Replace(space, tuple) -} - -// Delete performs deletion of a tuple by key. -// Result will contain array with deleted tuple. -func (connMulti *ConnectionMulti) Delete(space, index interface{}, key interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Delete(space, index, key) -} - -// Update performs update of a tuple by key. -// Result will contain array with updated tuple. -func (connMulti *ConnectionMulti) Update(space, index interface{}, key, ops interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Update(space, index, key, ops) -} - -// Upsert performs "update or insert" action of a tuple by key. -// Result will not contain any tuple. -func (connMulti *ConnectionMulti) Upsert(space interface{}, tuple, ops interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Upsert(space, tuple, ops) -} - -// Call calls registered Tarantool function. -// It uses request code for Tarantool >= 1.7 if go-tarantool -// was build with go_tarantool_call_17 tag. -// Otherwise, uses request code for Tarantool 1.6. -func (connMulti *ConnectionMulti) Call(functionName string, args interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Call(functionName, args) -} - -// Call16 calls registered Tarantool function. -// It uses request code for Tarantool 1.6, so result is converted to array of -// arrays. -// Deprecated since Tarantool 1.7.2. -func (connMulti *ConnectionMulti) Call16(functionName string, args interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Call16(functionName, args) -} - -// Call17 calls registered Tarantool function. -// It uses request code for Tarantool >= 1.7, so result is not converted -// (though, keep in mind, result is always array). -func (connMulti *ConnectionMulti) Call17(functionName string, args interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Call17(functionName, args) -} - -// Eval passes Lua expression for evaluation. -func (connMulti *ConnectionMulti) Eval(expr string, args interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Eval(expr, args) -} - -// Execute passes sql expression to Tarantool for execution. -// -// Since 1.6.0 -func (connMulti *ConnectionMulti) Execute(expr string, args interface{}) (resp *tarantool.Response, err error) { - return connMulti.getCurrentConnection().Execute(expr, args) -} - -// GetTyped performs select (with limit = 1 and offset = 0) to box space and -// fills typed result. -func (connMulti *ConnectionMulti) GetTyped(space, index interface{}, key interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().GetTyped(space, index, key, result) -} - -// SelectTyped performs select to box space and fills typed result. -func (connMulti *ConnectionMulti) SelectTyped(space, index interface{}, offset, limit, iterator uint32, key interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().SelectTyped(space, index, offset, limit, iterator, key, result) -} - -// InsertTyped performs insertion to box space. -// Tarantool will reject Insert when tuple with same primary key exists. -func (connMulti *ConnectionMulti) InsertTyped(space interface{}, tuple interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().InsertTyped(space, tuple, result) -} - -// ReplaceTyped performs "insert or replace" action to box space. -// If tuple with same primary key exists, it will be replaced. -func (connMulti *ConnectionMulti) ReplaceTyped(space interface{}, tuple interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().ReplaceTyped(space, tuple, result) -} - -// DeleteTyped performs deletion of a tuple by key and fills result with -// deleted tuple. -func (connMulti *ConnectionMulti) DeleteTyped(space, index interface{}, key interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().DeleteTyped(space, index, key, result) -} - -// UpdateTyped performs update of a tuple by key and fills result with updated -// tuple. -func (connMulti *ConnectionMulti) UpdateTyped(space, index interface{}, key, ops interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().UpdateTyped(space, index, key, ops, result) -} - -// CallTyped calls registered function. -// It uses request code for Tarantool >= 1.7 if go-tarantool -// was build with go_tarantool_call_17 tag. -// Otherwise, uses request code for Tarantool 1.6. -func (connMulti *ConnectionMulti) CallTyped(functionName string, args interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().CallTyped(functionName, args, result) -} - -// Call16Typed calls registered function. -// It uses request code for Tarantool 1.6, so result is converted to array of -// arrays. -// Deprecated since Tarantool 1.7.2. -func (connMulti *ConnectionMulti) Call16Typed(functionName string, args interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().Call16Typed(functionName, args, result) -} - -// Call17Typed calls registered function. -// It uses request code for Tarantool >= 1.7, so result is not converted (though, -// keep in mind, result is always array) -func (connMulti *ConnectionMulti) Call17Typed(functionName string, args interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().Call17Typed(functionName, args, result) -} - -// EvalTyped passes Lua expression for evaluation. -func (connMulti *ConnectionMulti) EvalTyped(expr string, args interface{}, result interface{}) (err error) { - return connMulti.getCurrentConnection().EvalTyped(expr, args, result) -} - -// ExecuteTyped passes sql expression to Tarantool for execution. -func (connMulti *ConnectionMulti) ExecuteTyped(expr string, args interface{}, result interface{}) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) { - return connMulti.getCurrentConnection().ExecuteTyped(expr, args, result) -} - -// SelectAsync sends select request to Tarantool and returns Future. -func (connMulti *ConnectionMulti) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().SelectAsync(space, index, offset, limit, iterator, key) -} - -// InsertAsync sends insert action to Tarantool and returns Future. -// Tarantool will reject Insert when tuple with same primary key exists. -func (connMulti *ConnectionMulti) InsertAsync(space interface{}, tuple interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().InsertAsync(space, tuple) -} - -// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future. -// If tuple with same primary key exists, it will be replaced. -func (connMulti *ConnectionMulti) ReplaceAsync(space interface{}, tuple interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().ReplaceAsync(space, tuple) -} - -// DeleteAsync sends deletion action to Tarantool and returns Future. -// Future's result will contain array with deleted tuple. -func (connMulti *ConnectionMulti) DeleteAsync(space, index interface{}, key interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().DeleteAsync(space, index, key) -} - -// Update sends deletion of a tuple by key and returns Future. -// Future's result will contain array with updated tuple. -func (connMulti *ConnectionMulti) UpdateAsync(space, index interface{}, key, ops interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().UpdateAsync(space, index, key, ops) -} - -// UpsertAsync sends "update or insert" action to Tarantool and returns Future. -// Future's sesult will not contain any tuple. -func (connMulti *ConnectionMulti) UpsertAsync(space interface{}, tuple interface{}, ops interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().UpsertAsync(space, tuple, ops) -} - -// CallAsync sends a call to registered Tarantool function and returns Future. -// It uses request code for Tarantool >= 1.7 if go-tarantool -// was build with go_tarantool_call_17 tag. -// Otherwise, uses request code for Tarantool 1.6. -func (connMulti *ConnectionMulti) CallAsync(functionName string, args interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().CallAsync(functionName, args) -} - -// Call16Async sends a call to registered Tarantool function and returns Future. -// It uses request code for Tarantool 1.6, so future's result is always array -// of arrays. -// Deprecated since Tarantool 1.7.2. -func (connMulti *ConnectionMulti) Call16Async(functionName string, args interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().Call16Async(functionName, args) -} - -// Call17Async sends a call to registered Tarantool function and returns Future. -// It uses request code for Tarantool >= 1.7, so future's result will not be converted -// (though, keep in mind, result is always array). -func (connMulti *ConnectionMulti) Call17Async(functionName string, args interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().Call17Async(functionName, args) -} - -// EvalAsync passes Lua expression for evaluation. -func (connMulti *ConnectionMulti) EvalAsync(expr string, args interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().EvalAsync(expr, args) -} - -// ExecuteAsync passes sql expression to Tarantool for execution. -func (connMulti *ConnectionMulti) ExecuteAsync(expr string, args interface{}) *tarantool.Future { - return connMulti.getCurrentConnection().ExecuteAsync(expr, args) -} - -// NewPrepared passes a sql statement to Tarantool for preparation synchronously. -func (connMulti *ConnectionMulti) NewPrepared(expr string) (*tarantool.Prepared, error) { - return connMulti.getCurrentConnection().NewPrepared(expr) -} - -// NewStream creates new Stream object for connection. -// -// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. -// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true. -// Since 1.7.0 -func (connMulti *ConnectionMulti) NewStream() (*tarantool.Stream, error) { - return connMulti.getCurrentConnection().NewStream() -} - -// NewWatcher does not supported by the ConnectionMulti. The ConnectionMulti is -// deprecated: use ConnectionPool instead. -// -// Since 1.10.0 -func (connMulti *ConnectionMulti) NewWatcher(key string, - callback tarantool.WatchCallback) (tarantool.Watcher, error) { - return nil, errors.New("ConnectionMulti is deprecated " + - "use ConnectionPool") -} - -// Do sends the request and returns a future. -func (connMulti *ConnectionMulti) Do(req tarantool.Request) *tarantool.Future { - if connectedReq, ok := req.(tarantool.ConnectedRequest); ok { - _, belongs := connMulti.getConnectionFromPool(connectedReq.Conn().Addr()) - if !belongs { - fut := tarantool.NewFuture() - fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool")) - return fut - } - return connectedReq.Conn().Do(req) - } - return connMulti.getCurrentConnection().Do(req) -} diff --git a/multi/multi_test.go b/multi/multi_test.go deleted file mode 100644 index dbb783860..000000000 --- a/multi/multi_test.go +++ /dev/null @@ -1,647 +0,0 @@ -package multi - -import ( - "fmt" - "log" - "os" - "reflect" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/tarantool/go-tarantool/v2" - "github.com/tarantool/go-tarantool/v2/test_helpers" -) - -var server1 = "127.0.0.1:3013" -var server2 = "127.0.0.1:3014" -var spaceNo = uint32(617) -var spaceName = "test" -var indexNo = uint32(0) -var connOpts = tarantool.Opts{ - Timeout: 5 * time.Second, - User: "test", - Pass: "test", -} - -var connOptsMulti = OptsMulti{ - CheckTimeout: 1 * time.Second, - NodesGetFunctionName: "get_cluster_nodes", - ClusterDiscoveryTime: 3 * time.Second, -} - -var instances []test_helpers.TarantoolInstance - -func TestConnError_IncorrectParams(t *testing.T) { - multiConn, err := Connect([]string{}, tarantool.Opts{}) - if err == nil { - t.Fatalf("err is nil with incorrect params") - } - if multiConn != nil { - t.Fatalf("conn is not nill with incorrect params") - } - if err.Error() != "addrs should not be empty" { - t.Errorf("incorrect error: %s", err.Error()) - } - - multiConn, err = ConnectWithOpts([]string{server1}, tarantool.Opts{}, OptsMulti{}) - if err == nil { - t.Fatal("err is nil with incorrect params") - } - if multiConn != nil { - t.Fatal("conn is not nill with incorrect params") - } - if err.Error() != "wrong check timeout, must be greater than 0" { - t.Errorf("incorrect error: %s", err.Error()) - } -} - -func TestConnError_Connection(t *testing.T) { - multiConn, err := Connect([]string{"err1", "err2"}, connOpts) - if err == nil { - t.Errorf("err is nil with incorrect params") - return - } - if multiConn != nil { - t.Errorf("conn is not nil with incorrect params") - return - } -} - -func TestConnSuccessfully(t *testing.T) { - multiConn, err := Connect([]string{"err", server1}, connOpts) - if err != nil { - t.Errorf("Failed to connect: %s", err.Error()) - return - } - if multiConn == nil { - t.Errorf("conn is nil after Connect") - return - } - defer multiConn.Close() - - if !multiConn.ConnectedNow() { - t.Errorf("conn has incorrect status") - return - } - if multiConn.getCurrentConnection().Addr() != server1 { - t.Errorf("conn has incorrect addr") - return - } -} - -func TestReconnect(t *testing.T) { - sleep := 100 * time.Millisecond - sleepCnt := 50 - servers := []string{server1, server2} - multiConn, _ := Connect(servers, connOpts) - if multiConn == nil { - t.Errorf("conn is nil after Connect") - return - } - test_helpers.StopTarantoolWithCleanup(instances[0]) - - for i := 0; i < sleepCnt; i++ { - _, ok := multiConn.getConnectionFromPool(servers[0]) - if !ok { - break - } - time.Sleep(sleep) - } - - _, ok := multiConn.getConnectionFromPool(servers[0]) - if ok { - t.Fatalf("failed to close conn") - } - - if multiConn.getCurrentConnection().Addr() == servers[0] { - t.Errorf("conn has incorrect addr: %s after disconnect server1", multiConn.getCurrentConnection().Addr()) - } - - err := test_helpers.RestartTarantool(&instances[0]) - if err != nil { - t.Fatalf("failed to restart Tarantool: %s", err) - } - - for i := 0; i < sleepCnt; i++ { - _, ok := multiConn.getConnectionFromPool(servers[0]) - if ok { - break - } - time.Sleep(sleep) - } - - _, ok = multiConn.getConnectionFromPool(servers[0]) - if !ok { - t.Fatalf("incorrect conn status after reconnecting") - } -} - -func TestDisconnectAll(t *testing.T) { - sleep := 100 * time.Millisecond - sleepCnt := int((time.Second / sleep) * 2) // Checkout time * 2. - - servers := []string{server1, server2} - multiConn, _ := Connect(servers, connOpts) - if multiConn == nil { - t.Errorf("conn is nil after Connect") - return - } - - for _, inst := range instances { - test_helpers.StopTarantoolWithCleanup(inst) - } - - for i := 0; i < sleepCnt && multiConn.ConnectedNow(); i++ { - time.Sleep(sleep) - } - - if multiConn.ConnectedNow() { - t.Errorf("incorrect status after desconnect all") - } - - for _, inst := range instances { - err := test_helpers.RestartTarantool(&inst) - if err != nil { - t.Fatalf("failed to restart Tarantool: %s", err) - } - } - - for i := 0; i < sleepCnt && !multiConn.ConnectedNow(); i++ { - time.Sleep(sleep) - } - - if !multiConn.ConnectedNow() { - t.Errorf("incorrect multiConn status after reconnecting") - } -} - -func TestClose(t *testing.T) { - multiConn, _ := Connect([]string{server1, server2}, connOpts) - if multiConn == nil { - t.Errorf("conn is nil after Connect") - return - } - timer := time.NewTimer(300 * time.Millisecond) - <-timer.C - - conn, _ := multiConn.getConnectionFromPool(server1) - if !conn.ConnectedNow() { - t.Errorf("incorrect conn server1 status") - } - conn, _ = multiConn.getConnectionFromPool(server2) - if !conn.ConnectedNow() { - t.Errorf("incorrect conn server2 status") - } - - multiConn.Close() - timer = time.NewTimer(100 * time.Millisecond) - <-timer.C - - if multiConn.ConnectedNow() { - t.Errorf("incorrect multiConn status after close") - } - conn, _ = multiConn.getConnectionFromPool(server1) - if conn.ConnectedNow() { - t.Errorf("incorrect server1 conn status after close") - } - conn, _ = multiConn.getConnectionFromPool(server2) - if conn.ConnectedNow() { - t.Errorf("incorrect server2 conn status after close") - } -} - -func TestRefresh(t *testing.T) { - - multiConn, _ := ConnectWithOpts([]string{server1, server2}, connOpts, connOptsMulti) - if multiConn == nil { - t.Errorf("conn is nil after Connect") - return - } - - multiConn.mutex.RLock() - curAddr := multiConn.addrs[0] - multiConn.mutex.RUnlock() - - // Wait for refresh timer. - // Scenario 1 nodeload, 1 refresh, 1 nodeload. - time.Sleep(10 * time.Second) - - multiConn.mutex.RLock() - newAddr := multiConn.addrs[0] - multiConn.mutex.RUnlock() - - if curAddr == newAddr { - t.Errorf("Expect address refresh") - } - - if !multiConn.ConnectedNow() { - t.Errorf("Expect connection to exist") - } - - _, err := multiConn.Call17(multiConn.opts.NodesGetFunctionName, []interface{}{}) - if err != nil { - t.Error("Expect to get data after reconnect") - } -} - -func TestCall17(t *testing.T) { - var resp *tarantool.Response - var err error - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - // Call17 - resp, err = multiConn.Call17("simple_concat", []interface{}{"s"}) - if err != nil { - t.Fatalf("Failed to use Call: %s", err.Error()) - } - if resp.Data[0].(string) != "ss" { - t.Fatalf("result is not {{1}} : %v", resp.Data) - } -} - -func TestNewPrepared(t *testing.T) { - test_helpers.SkipIfSQLUnsupported(t) - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - stmt, err := multiConn.NewPrepared("SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0=:id AND NAME1=:name;") - require.Nilf(t, err, "fail to prepare statement: %v", err) - - executeReq := tarantool.NewExecutePreparedRequest(stmt) - unprepareReq := tarantool.NewUnprepareRequest(stmt) - - resp, err := multiConn.Do(executeReq.Args([]interface{}{1, "test"})).Get() - if err != nil { - t.Fatalf("failed to execute prepared: %v", err) - } - if resp == nil { - t.Fatalf("nil response") - } - if resp.Code != tarantool.OkCode { - t.Fatalf("failed to execute prepared: code %d", resp.Code) - } - if reflect.DeepEqual(resp.Data[0], []interface{}{1, "test"}) { - t.Error("Select with named arguments failed") - } - if resp.MetaData[0].FieldType != "unsigned" || - resp.MetaData[0].FieldName != "NAME0" || - resp.MetaData[1].FieldType != "string" || - resp.MetaData[1].FieldName != "NAME1" { - t.Error("Wrong metadata") - } - - // the second argument for unprepare request is unused - it already belongs to some connection - resp, err = multiConn.Do(unprepareReq).Get() - if err != nil { - t.Errorf("failed to unprepare prepared statement: %v", err) - } - if resp.Code != tarantool.OkCode { - t.Errorf("failed to unprepare prepared statement: code %d", resp.Code) - } - - _, err = multiConn.Do(unprepareReq).Get() - if err == nil { - t.Errorf("the statement must be already unprepared") - } - require.Contains(t, err.Error(), "Prepared statement with id") - - _, err = multiConn.Do(executeReq).Get() - if err == nil { - t.Errorf("the statement must be already unprepared") - } - require.Contains(t, err.Error(), "Prepared statement with id") -} - -func TestDoWithStrangerConn(t *testing.T) { - expectedErr := fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool") - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - req := test_helpers.NewStrangerRequest() - - _, err = multiConn.Do(req).Get() - if err == nil { - t.Fatalf("nil error caught") - } - if err.Error() != expectedErr.Error() { - t.Fatalf("Unexpected error caught") - } -} - -func TestStream_Commit(t *testing.T) { - var req tarantool.Request - var resp *tarantool.Response - var err error - - test_helpers.SkipIfStreamsUnsupported(t) - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - stream, _ := multiConn.NewStream() - - // Begin transaction - req = tarantool.NewBeginRequest() - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Begin: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) - } - - // Insert in stream - req = tarantool.NewInsertRequest(spaceName). - Tuple([]interface{}{"1001", "hello2", "world2"}) - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Insert: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) - } - defer test_helpers.DeleteRecordByKey(t, multiConn, spaceNo, indexNo, []interface{}{"1001"}) - - // Select not related to the transaction - // while transaction is not committed - // result of select is empty - selectReq := tarantool.NewSelectRequest(spaceNo). - Index(indexNo). - Limit(1). - Iterator(tarantool.IterEq). - Key([]interface{}{"1001"}) - resp, err = multiConn.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 0 { - t.Fatalf("Response Data len != 0") - } - - // Select in stream - resp, err = stream.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 1 { - t.Fatalf("Response Data len != 1") - } - if tpl, ok := resp.Data[0].([]interface{}); !ok { - t.Fatalf("Unexpected body of Select") - } else { - if id, ok := tpl[0].(string); !ok || id != "1001" { - t.Fatalf("Unexpected body of Select (0)") - } - if h, ok := tpl[1].(string); !ok || h != "hello2" { - t.Fatalf("Unexpected body of Select (1)") - } - if h, ok := tpl[2].(string); !ok || h != "world2" { - t.Fatalf("Unexpected body of Select (2)") - } - } - - // Commit transaction - req = tarantool.NewCommitRequest() - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Commit: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Fatalf("Failed to Commit: wrong code returned %d", resp.Code) - } - - // Select outside of transaction - resp, err = multiConn.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 1 { - t.Fatalf("Response Data len != 1") - } - if tpl, ok := resp.Data[0].([]interface{}); !ok { - t.Fatalf("Unexpected body of Select") - } else { - if id, ok := tpl[0].(string); !ok || id != "1001" { - t.Fatalf("Unexpected body of Select (0)") - } - if h, ok := tpl[1].(string); !ok || h != "hello2" { - t.Fatalf("Unexpected body of Select (1)") - } - if h, ok := tpl[2].(string); !ok || h != "world2" { - t.Fatalf("Unexpected body of Select (2)") - } - } -} - -func TestStream_Rollback(t *testing.T) { - var req tarantool.Request - var resp *tarantool.Response - var err error - - test_helpers.SkipIfStreamsUnsupported(t) - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - stream, _ := multiConn.NewStream() - - // Begin transaction - req = tarantool.NewBeginRequest() - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Begin: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Fatalf("Failed to Begin: wrong code returned %d", resp.Code) - } - - // Insert in stream - req = tarantool.NewInsertRequest(spaceName). - Tuple([]interface{}{"1001", "hello2", "world2"}) - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Insert: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Errorf("Failed to Insert: wrong code returned %d", resp.Code) - } - defer test_helpers.DeleteRecordByKey(t, multiConn, spaceNo, indexNo, []interface{}{"1001"}) - - // Select not related to the transaction - // while transaction is not committed - // result of select is empty - selectReq := tarantool.NewSelectRequest(spaceNo). - Index(indexNo). - Limit(1). - Iterator(tarantool.IterEq). - Key([]interface{}{"1001"}) - resp, err = multiConn.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 0 { - t.Fatalf("Response Data len != 0") - } - - // Select in stream - resp, err = stream.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 1 { - t.Fatalf("Response Data len != 1") - } - if tpl, ok := resp.Data[0].([]interface{}); !ok { - t.Fatalf("Unexpected body of Select") - } else { - if id, ok := tpl[0].(string); !ok || id != "1001" { - t.Fatalf("Unexpected body of Select (0)") - } - if h, ok := tpl[1].(string); !ok || h != "hello2" { - t.Fatalf("Unexpected body of Select (1)") - } - if h, ok := tpl[2].(string); !ok || h != "world2" { - t.Fatalf("Unexpected body of Select (2)") - } - } - - // Rollback transaction - req = tarantool.NewRollbackRequest() - resp, err = stream.Do(req).Get() - if err != nil { - t.Fatalf("Failed to Rollback: %s", err.Error()) - } - if resp.Code != tarantool.OkCode { - t.Fatalf("Failed to Rollback: wrong code returned %d", resp.Code) - } - - // Select outside of transaction - resp, err = multiConn.Do(selectReq).Get() - if err != nil { - t.Fatalf("Failed to Select: %s", err.Error()) - } - if resp == nil { - t.Fatalf("Response is nil after Select") - } - if len(resp.Data) != 0 { - t.Fatalf("Response Data len != 0") - } -} - -func TestConnectionMulti_NewWatcher(t *testing.T) { - test_helpers.SkipIfStreamsUnsupported(t) - - multiConn, err := Connect([]string{server1, server2}, connOpts) - if err != nil { - t.Fatalf("Failed to connect: %s", err.Error()) - } - if multiConn == nil { - t.Fatalf("conn is nil after Connect") - } - defer multiConn.Close() - - watcher, err := multiConn.NewWatcher("foo", func(event tarantool.WatchEvent) {}) - if watcher != nil { - t.Errorf("Unexpected watcher") - } - if err == nil { - t.Fatalf("Unexpected success") - } - if err.Error() != "ConnectionMulti is deprecated use ConnectionPool" { - t.Fatalf("Unexpected error: %s", err) - } -} - -// runTestMain is a body of TestMain function -// (see https://pkg.go.dev/testing#hdr-Main). -// Using defer + os.Exit is not works so TestMain body -// is a separate function, see -// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls -func runTestMain(m *testing.M) int { - initScript := "config.lua" - waitStart := 100 * time.Millisecond - connectRetry := 10 - retryTimeout := 500 * time.Millisecond - - // Tarantool supports streams and interactive transactions since version 2.10.0 - isStreamUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0) - if err != nil { - log.Fatalf("Could not check the Tarantool version") - } - - servers := []string{server1, server2} - instances, err = test_helpers.StartTarantoolInstances(servers, nil, test_helpers.StartOpts{ - InitScript: initScript, - User: connOpts.User, - Pass: connOpts.Pass, - WaitStart: waitStart, - ConnectRetry: connectRetry, - RetryTimeout: retryTimeout, - MemtxUseMvccEngine: !isStreamUnsupported, - }) - - if err != nil { - log.Fatalf("Failed to prepare test tarantool: %s", err) - return -1 - } - - defer test_helpers.StopTarantoolInstances(instances) - - return m.Run() -} - -func TestMain(m *testing.M) { - code := runTestMain(m) - os.Exit(code) -}