Skip to content

Commit

Permalink
lib: common cmd parts to connect package
Browse files Browse the repository at this point in the history
Moves the common URL parsing logic, connect to etcd and tarantool
from the `cmd` package to the `connect` package.

Part of #TNTP-1081
  • Loading branch information
dmyger committed Jan 20, 2025
1 parent 0d17f18 commit 119ae51
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 336 deletions.
64 changes: 10 additions & 54 deletions cli/cluster/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"errors"
"fmt"
"os"

clientv3 "go.etcd.io/etcd/client/v3"

Expand Down Expand Up @@ -142,59 +141,17 @@ type connectOpts struct {
Password string
}

// connectTarantool establishes a connection to Tarantool.
func connectTarantool(uriOpts UriOpts, connOpts connectOpts) (tarantool.Connector, error) {
addr, connectorOpts := MakeConnectOptsFromUriOpts(uriOpts)
if connectorOpts.User == "" && connectorOpts.Pass == "" {
connectorOpts.User = connOpts.Username
connectorOpts.Pass = connOpts.Password
if connectorOpts.User == "" {
connectorOpts.User = os.Getenv(connect.TarantoolUsernameEnv)
}
if connectorOpts.Pass == "" {
connectorOpts.Pass = os.Getenv(connect.TarantoolPasswordEnv)
}
}

conn, err := tarantool.Connect(addr, connectorOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to tarantool: %w", err)
}
return conn, nil
}

// connectEtcd establishes a connection to etcd.
func connectEtcd(uriOpts UriOpts, connOpts connectOpts) (*clientv3.Client, error) {
etcdOpts := MakeEtcdOptsFromUriOpts(uriOpts)
if etcdOpts.Username == "" && etcdOpts.Password == "" {
etcdOpts.Username = connOpts.Username
etcdOpts.Password = connOpts.Password
if etcdOpts.Username == "" {
etcdOpts.Username = os.Getenv(connect.EtcdUsernameEnv)
}
if etcdOpts.Password == "" {
etcdOpts.Password = os.Getenv(connect.EtcdPasswordEnv)
}
}

etcdcli, err := libcluster.ConnectEtcd(etcdOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %w", err)
}
return etcdcli, nil
}

// doOnStorage determines a storage based on the opts.
func doOnStorage(connOpts connectOpts, opts UriOpts,
tarantoolFunc func(tarantool.Connector) error, etcdFunc func(*clientv3.Client) error) error {
etcdcli, errEtcd := connectEtcd(opts, connOpts)
if errEtcd == nil {
return etcdFunc(etcdcli)
func doOnStorage(opts connect.UriOpts,
tarantoolFunc connect.TarantoolFunc, etcdFunc connect.EtcdFunc) error {
done, errEtcd := connect.RunOnEtcd(opts, etcdFunc)
if done {
return errEtcd
}

conn, errTarantool := connectTarantool(opts, connOpts)
if errTarantool == nil {
return tarantoolFunc(conn)
done, errTarantool := connect.RunOnTarantool(opts, tarantoolFunc)
if done {
return errTarantool
}

return fmt.Errorf("failed to establish a connection to tarantool or etcd: %w, %w",
Expand All @@ -205,8 +162,7 @@ func doOnStorage(connOpts connectOpts, opts UriOpts,
func createPublisherAndCollector(
publishers libcluster.DataPublisherFactory,
collectors libcluster.CollectorFactory,
connOpts connectOpts,
opts UriOpts) (libcluster.DataPublisher, libcluster.Collector, func(), error) {
opts connect.UriOpts) (libcluster.DataPublisher, libcluster.Collector, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Key, opts.Timeout

var (
Expand Down Expand Up @@ -254,7 +210,7 @@ func createPublisherAndCollector(
return nil
}

if err := doOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
if err := doOnStorage(opts, tarantoolFunc, etcdFunc); err != nil {
return nil, nil, nil, err
}

Expand Down
7 changes: 4 additions & 3 deletions cli/cluster/cmd/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/apex/log"
"github.com/google/uuid"
libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -63,7 +64,7 @@ type SwitchStatusCtx struct {
TaskID string
}

func makeEtcdOpts(uriOpts UriOpts) libcluster.EtcdOpts {
func makeEtcdOpts(uriOpts connect.UriOpts) libcluster.EtcdOpts {
opts := libcluster.EtcdOpts{
Endpoints: []string{uriOpts.Endpoint},
Username: uriOpts.Username,
Expand All @@ -81,7 +82,7 @@ func makeEtcdOpts(uriOpts UriOpts) libcluster.EtcdOpts {

// Switch master instance.
func Switch(uri *url.URL, switchCtx SwitchCtx) error {
uriOpts, err := ParseUriOpts(uri)
uriOpts, err := connect.ParseUriOpts(uri, "", "")
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func Switch(uri *url.URL, switchCtx SwitchCtx) error {

// SwitchStatus shows master switching status.
func SwitchStatus(uri *url.URL, switchCtx SwitchStatusCtx) error {
uriOpts, err := ParseUriOpts(uri)
uriOpts, err := connect.ParseUriOpts(uri, "", "")
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
Expand Down
10 changes: 4 additions & 6 deletions cli/cluster/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/url"

libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
)

// PublishCtx contains information abould cluster publish command execution
Expand Down Expand Up @@ -33,7 +34,8 @@ type PublishCtx struct {

// PublishUri publishes a configuration to URI.
func PublishUri(publishCtx PublishCtx, uri *url.URL) error {
uriOpts, err := ParseUriOpts(uri)
uriOpts, err := connect.ParseUriOpts(uri,
publishCtx.Username, publishCtx.Password)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
Expand All @@ -43,14 +45,10 @@ func PublishUri(publishCtx PublishCtx, uri *url.URL) error {
return err
}

connOpts := connectOpts{
Username: publishCtx.Username,
Password: publishCtx.Password,
}
publisher, collector, cancel, err := createPublisherAndCollector(
publishCtx.Publishers,
publishCtx.Collectors,
connOpts, uriOpts)
uriOpts)
if err != nil {
return err
}
Expand Down
42 changes: 16 additions & 26 deletions cli/cluster/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tarantool/go-tarantool"
"github.com/tarantool/tt/cli/replicaset"
libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func pickPatchKey(keys []string, force bool, pathMsg string) (int, error) {
func createDataCollectorAndKeyPublisher(
collectors libcluster.DataCollectorFactory,
publishers libcluster.DataPublisherFactory,
opts UriOpts, connOpts connectOpts) (
opts connect.UriOpts) (
libcluster.DataCollector, replicaset.DataPublisher, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Key, opts.Timeout
var (
Expand Down Expand Up @@ -145,7 +146,7 @@ func createDataCollectorAndKeyPublisher(
return nil
}

if err := doOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
if err := doOnStorage(opts, tarantoolFunc, etcdFunc); err != nil {
return nil, nil, nil, err
}

Expand All @@ -154,17 +155,14 @@ func createDataCollectorAndKeyPublisher(

// Promote promotes an instance by patching the cluster config.
func Promote(uri *url.URL, ctx PromoteCtx) error {
opts, err := ParseUriOpts(uri)
opts, err := connect.ParseUriOpts(uri,
ctx.Username, ctx.Password)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
connOpts := connectOpts{
Username: ctx.Username,
Password: ctx.Password,
}

collector, publisher, closeFunc, err := createDataCollectorAndKeyPublisher(
ctx.Collectors, ctx.Publishers, opts, connOpts)
ctx.Collectors, ctx.Publishers, opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -201,17 +199,14 @@ type DemoteCtx struct {

// Demote demotes an instance by patching the cluster config.
func Demote(uri *url.URL, ctx DemoteCtx) error {
opts, err := ParseUriOpts(uri)
opts, err := connect.ParseUriOpts(uri,
ctx.Username, ctx.Password)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
connOpts := connectOpts{
Username: ctx.Username,
Password: ctx.Password,
}

collector, publisher, closeFunc, err := createDataCollectorAndKeyPublisher(
ctx.Collectors, ctx.Publishers, opts, connOpts)
ctx.Collectors, ctx.Publishers, opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -248,17 +243,14 @@ type ExpelCtx struct {

// Expel expels an instance by patching the cluster config.
func Expel(uri *url.URL, ctx ExpelCtx) error {
opts, err := ParseUriOpts(uri)
opts, err := connect.ParseUriOpts(uri,
ctx.Username, ctx.Password)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
connOpts := connectOpts{
Username: ctx.Username,
Password: ctx.Password,
}

collector, publisher, closeFunc, err := createDataCollectorAndKeyPublisher(
ctx.Collectors, ctx.Publishers, opts, connOpts)
ctx.Collectors, ctx.Publishers, opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -302,17 +294,15 @@ type RolesChangeCtx struct {

// ChangeRole adds/removes a role by patching the cluster config.
func ChangeRole(uri *url.URL, ctx RolesChangeCtx, action replicaset.RolesChangerAction) error {
opts, err := ParseUriOpts(uri)
opts, err := connect.ParseUriOpts(uri,
ctx.Username, ctx.Password)

if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}
connOpts := connectOpts{
Username: ctx.Username,
Password: ctx.Password,
}

collector, publisher, closeFunc, err := createDataCollectorAndKeyPublisher(
ctx.Collectors, ctx.Publishers, opts, connOpts)
ctx.Collectors, ctx.Publishers, opts)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions cli/cluster/cmd/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/tarantool/tt/cli/cluster"
libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
)

// ShowCtx contains information about cluster show command execution context.
Expand All @@ -23,19 +24,16 @@ type ShowCtx struct {

// ShowUri shows a configuration from URI.
func ShowUri(showCtx ShowCtx, uri *url.URL) error {
uriOpts, err := ParseUriOpts(uri)
uriOpts, err := connect.ParseUriOpts(uri,
showCtx.Username, showCtx.Password)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}

connOpts := connectOpts{
Username: showCtx.Username,
Password: showCtx.Password,
}
_, collector, cancel, err := createPublisherAndCollector(
nil,
showCtx.Collectors,
connOpts, uriOpts)
uriOpts)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ You could also specify etcd/tarantool username and password with environment var
The priority of credentials:
environment variables < command flags < URL credentials.
`, float64(clustercmd.DefaultUriTimeout)/float64(time.Second),
`, float64(libconnect.DefaultUriTimeout)/float64(time.Second),
libconnect.EtcdUsernameEnv, libconnect.EtcdPasswordEnv,
libconnect.TarantoolUsernameEnv, libconnect.TarantoolPasswordEnv)
failoverUriHelp = fmt.Sprintf(
Expand Down Expand Up @@ -129,7 +129,7 @@ You could also specify etcd/tarantool username and password with environment var
The priority of credentials:
environment variables < command flags < URL credentials.
`, float64(clustercmd.DefaultUriTimeout)/float64(time.Second),
`, float64(libconnect.DefaultUriTimeout)/float64(time.Second),
libconnect.EtcdUsernameEnv, libconnect.EtcdPasswordEnv,
libconnect.TarantoolUsernameEnv, libconnect.TarantoolPasswordEnv)
)
Expand Down
64 changes: 64 additions & 0 deletions lib/connect/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package connect

import (
"fmt"
"os"

libcluster "github.com/tarantool/tt/lib/cluster"
clientv3 "go.etcd.io/etcd/client/v3"
)

// EtcdFunc is a function that can be called on an `etcd` connection.
type EtcdFunc func(*clientv3.Client) error

// makeEtcdOptsFromUriOpts create etcd connect options from URI options.
func makeEtcdOptsFromUriOpts(src UriOpts) libcluster.EtcdOpts {
var endpoints []string
if src.Endpoint != "" {
endpoints = []string{src.Endpoint}
}

return libcluster.EtcdOpts{
Endpoints: endpoints,
Username: src.Username,
Password: src.Password,
KeyFile: src.KeyFile,
CertFile: src.CertFile,
CaPath: src.CaPath,
CaFile: src.CaFile,
SkipHostVerify: src.SkipHostVerify || src.SkipPeerVerify,
Timeout: src.Timeout,
}
}

// connectEtcd establishes a connection to etcd.
func connectEtcd(uriOpts UriOpts) (*clientv3.Client, error) {
etcdOpts := makeEtcdOptsFromUriOpts(uriOpts)
if etcdOpts.Username == "" && etcdOpts.Password == "" {
if etcdOpts.Username == "" {
etcdOpts.Username = os.Getenv(EtcdUsernameEnv)
}
if etcdOpts.Password == "" {
etcdOpts.Password = os.Getenv(EtcdPasswordEnv)
}
}

c, err := libcluster.ConnectEtcd(etcdOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %w", err)
}
return c, nil
}

// RunOnEtcd runs the provided function with etcd connection.
// Returns true if the function was executed.
func RunOnEtcd(opts UriOpts, f EtcdFunc) (bool, error) {
if f != nil {
c, err := connectEtcd(opts)
if err != nil {
return false, fmt.Errorf("failed to establish a connection to etcd: %w", err)
}
return true, f(c)
}
return false, nil
}
Loading

0 comments on commit 119ae51

Please sign in to comment.