Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: queue works with connection_pool #212

Merged
merged 4 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support queue 1.2.0 (#177)
- ConnectionHandler interface for handling changes of connections in
ConnectionPool (#178)
- Execute, ExecuteTyped and ExecuteAsync methods to ConnectionPool (#176)
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
- ConnectorAdapter type to use ConnectionPool as Connector interface (#176)
- An example how to use queue and connection_pool subpackages together (#176)

### Changed

- Bump queue package version to 1.2.1 (#176)

### Fixed

- Mode type description in the connection_pool subpackage (#208)
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ clean:

.PHONY: deps
deps: clean
( cd ./queue; tarantoolctl rocks install queue 1.2.0 )
( cd ./queue; tarantoolctl rocks install queue 1.2.1 )

.PHONY: datetime-timezones
datetime-timezones:
Expand Down
33 changes: 33 additions & 0 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ type ConnectionPool struct {
poolsMutex sync.RWMutex
}

DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
var _ Pooler = (*ConnectionPool)(nil)
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved

type connState struct {
addr string
notify chan tarantool.ConnEvent
Expand Down Expand Up @@ -385,6 +387,16 @@ func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mod
return conn.Eval(expr, args)
}

// Execute passes sql expression to Tarantool for execution.
func (connPool *ConnectionPool) Execute(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return nil, err
}

return conn.Execute(expr, args)
}

// GetTyped performs select (with limit = 1 and offset = 0)
// to box space and fills typed result.
func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) {
Expand Down Expand Up @@ -495,6 +507,16 @@ func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result
return conn.EvalTyped(expr, args, result)
}

// ExecuteTyped passes sql expression to Tarantool for execution.
func (connPool *ConnectionPool) ExecuteTyped(expr string, args interface{}, result interface{}, userMode Mode) (tarantool.SQLInfo, []tarantool.ColumnMetaData, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return tarantool.SQLInfo{}, nil, err
}

return conn.ExecuteTyped(expr, args, result)
}

// SelectAsync sends select request to Tarantool and returns Future.
func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future {
conn, err := connPool.getConnByMode(ANY, userMode)
Expand Down Expand Up @@ -607,6 +629,17 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
return conn.EvalAsync(expr, args)
}

// ExecuteAsync sends sql expression to Tarantool for execution and returns
// Future.
func (connPool *ConnectionPool) ExecuteAsync(expr string, args interface{}, userMode Mode) *tarantool.Future {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return newErrorFuture(err)
}

return conn.ExecuteAsync(expr, args)
}

// Do sends the request and returns a future.
// For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared)
// the argument of type Mode is unused.
Expand Down
63 changes: 62 additions & 1 deletion connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,67 @@ func TestEval(t *testing.T) {
require.Falsef(t, val, "expected `false` with mode `RW`")
}

type Member struct {
id uint
val string
}

func (m *Member) DecodeMsgpack(d *decoder) error {
var err error
var l int
if l, err = d.DecodeArrayLen(); err != nil {
return err
}
if l != 2 {
return fmt.Errorf("array len doesn't match: %d", l)
}
if m.id, err = d.DecodeUint(); err != nil {
return err
}
if m.val, err = d.DecodeString(); err != nil {
return err
}
return nil
}

func TestExecute(t *testing.T) {
test_helpers.SkipIfSQLUnsupported(t)

roles := []bool{false, true, false, false, true}

err := test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

request := "SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0 == 1;"
// Execute
resp, err := connPool.Execute(request, []interface{}{}, connection_pool.ANY)
require.Nilf(t, err, "failed to Execute")
require.NotNilf(t, resp, "response is nil after Execute")
require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Execute")
require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response")

// ExecuteTyped
mem := []Member{}
info, _, err := connPool.ExecuteTyped(request, []interface{}{}, &mem, connection_pool.ANY)
require.Nilf(t, err, "failed to ExecuteTyped")
require.Equalf(t, info.AffectedCount, uint64(0), "unexpected info.AffectedCount")
require.Equalf(t, len(mem), 1, "wrong count of results")

// ExecuteAsync
fut := connPool.ExecuteAsync(request, []interface{}{}, connection_pool.ANY)
resp, err = fut.Get()
require.Nilf(t, err, "failed to ExecuteAsync")
require.NotNilf(t, resp, "response is nil after ExecuteAsync")
require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after ExecuteAsync")
require.Equalf(t, len(resp.Data[0].([]interface{})), 2, "unexpected response")
}

func TestRoundRobinStrategy(t *testing.T) {
roles := []bool{false, true, false, false, true}

Expand Down Expand Up @@ -1995,7 +2056,7 @@ func TestStream_TxnIsolationLevel(t *testing.T) {
func runTestMain(m *testing.M) int {
initScript := "config.lua"
waitStart := 100 * time.Millisecond
var connectRetry uint = 3
connectRetry := 3
retryTimeout := 500 * time.Millisecond
workDirs := []string{
"work_dir1", "work_dir2",
Expand Down
Loading