From c783152a29e5c7eadbc8b9ef6278098a71753aed Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 23 Aug 2022 17:59:17 +0300 Subject: [PATCH] api: add Execute methods to ConnectionPool The patch adds missed Execute, ExecuteTyped and ExecuteAsync methods to the ConnectionPool type. Part of #176 --- CHANGELOG.md | 1 + connection_pool/connection_pool.go | 31 ++++++++++++ connection_pool/connection_pool_test.go | 61 +++++++++++++++++++++++ connection_pool/const.go | 1 + connection_pool/msgpack_helper_test.go | 10 ++++ connection_pool/msgpack_v5_helper_test.go | 10 ++++ 6 files changed, 114 insertions(+) create mode 100644 connection_pool/msgpack_helper_test.go create mode 100644 connection_pool/msgpack_v5_helper_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 333c09554..002bd758a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support queue 1.2.0 (#177) - ConnectionHandler interface for handling changes of connections in ConnectionPool (#178) +- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176) ### Changed diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go index 79667ae7c..d9d41edef 100644 --- a/connection_pool/connection_pool.go +++ b/connection_pool/connection_pool.go @@ -385,6 +385,16 @@ func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mod return conn.Eval(expr, args) } +// Execute passes sql expression to Tarantool for execution. +func (connPool *ConnectionPool) Execute(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Execute(expr, args) +} + // GetTyped performs select (with limit = 1 and offset = 0) // to box space and fills typed result. func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) { @@ -495,6 +505,16 @@ func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result return conn.EvalTyped(expr, args, result) } +// ExecuteTyped passes sql expression to Tarantool for execution. +func (connPool *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{}, userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return tarantool.SQLInfo{}, nil, err + } + + return conn.ExecuteTyped(expr, args, result) +} + // SelectAsync sends select request to Tarantool and returns Future. func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future { conn, err := connPool.getConnByMode(ANY, userMode) @@ -607,6 +627,17 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod return conn.EvalAsync(expr, args) } +// ExecuteAsync sends sql expression to Tarantool for execution and returns +// Future. +func (connPool *ConnectionPool) ExecuteAsync(expr string, args interface{}, userMode Mode) *tarantool.Future { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return newErrorFuture(err) + } + + return conn.ExecuteAsync(expr, args) +} + // Do sends the request and returns a future. // For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared) // the argument of type Mode is unused. diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go index a41495d8d..b3aee615d 100644 --- a/connection_pool/connection_pool_test.go +++ b/connection_pool/connection_pool_test.go @@ -642,6 +642,67 @@ func TestEval(t *testing.T) { require.Falsef(t, val, "expected `false` with mode `RW`") } +type Member struct { + id uint + val string +} + +func (m *Member) DecodeMsgpack(d *decoder) error { + var err error + var l int + if l, err = d.DecodeArrayLen(); err != nil { + return err + } + if l != 2 { + return fmt.Errorf("array len doesn't match: %d", l) + } + if m.id, err = d.DecodeUint(); err != nil { + return err + } + if m.val, err = d.DecodeString(); err != nil { + return err + } + return nil +} + +func TestExecute(t *testing.T) { + test_helpers.SkipIfSQLUnsupported(t) + + roles := []bool{false, true, false, false, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + request := "SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0 == 1;" + // Execute + resp, err := connPool.Execute(request, []interface{}{}, connection_pool.ANY) + require.Nilf(t, err, "failed to Execute") + require.NotNilf(t, resp, "response is nil after Execute") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Execute") + require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response") + + // ExecuteTyped + mem := []Member{} + info, _, err := connPool.ExecuteTyped(request, []interface{}{}, &mem, connection_pool.ANY) + require.Nilf(t, err, "failed to ExecuteTyped") + require.Equalf(t, info.AffectedCount, uint64(0), "unexpected info.AffectedCount") + require.Equalf(t, len(mem), 1, "wrong count of results") + + // ExecuteAsync + fut := connPool.ExecuteAsync(request, []interface{}{}, connection_pool.ANY) + resp, err = fut.Get() + require.Nilf(t, err, "failed to ExecuteAsync") + require.NotNilf(t, resp, "response is nil after ExecuteAsync") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after ExecuteAsync") + require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response") +} + func TestRoundRobinStrategy(t *testing.T) { roles := []bool{false, true, false, false, true} diff --git a/connection_pool/const.go b/connection_pool/const.go index d77a55044..26b028f5a 100644 --- a/connection_pool/const.go +++ b/connection_pool/const.go @@ -7,6 +7,7 @@ Default mode for each request table: ---------- -------------- | call | no default | | eval | no default | + | execute | no default | | ping | no default | | insert | RW | | delete | RW | diff --git a/connection_pool/msgpack_helper_test.go b/connection_pool/msgpack_helper_test.go new file mode 100644 index 000000000..d60c7d84d --- /dev/null +++ b/connection_pool/msgpack_helper_test.go @@ -0,0 +1,10 @@ +//go:build !go_tarantool_msgpack_v5 +// +build !go_tarantool_msgpack_v5 + +package connection_pool_test + +import ( + "gopkg.in/vmihailenco/msgpack.v2" +) + +type decoder = msgpack.Decoder diff --git a/connection_pool/msgpack_v5_helper_test.go b/connection_pool/msgpack_v5_helper_test.go new file mode 100644 index 000000000..7c449bec5 --- /dev/null +++ b/connection_pool/msgpack_v5_helper_test.go @@ -0,0 +1,10 @@ +//go:build go_tarantool_msgpack_v5 +// +build go_tarantool_msgpack_v5 + +package connection_pool_test + +import ( + "github.com/vmihailenco/msgpack/v5" +) + +type decoder = msgpack.Decoder