diff --git a/CHANGELOG.md b/CHANGELOG.md index 2761e52e1..52306fc00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - `tt replicaset upgrade`: command to upgrade the schema on a Tarantool cluster. * `-r (--replicaset)`: specify the replicaset name(s) to upgrade. * `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds) (default 5). + * supports upgrading the database schema on a remote cluster by upgrading + each replica set individually using `tt replicaset upgrade <URI>`. - New flag `--timestamp` of `tt cat` and `tt play` commands is added to specify operations ending with the given timestamp. This value can be specified as a number or using [RFC3339/RFC3339Nano](https://go.dev/src/time/format.go) time format. diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go index d5c620cfb..a69318b77 100644 --- a/cli/cmd/replicaset.go +++ b/cli/cmd/replicaset.go @@ -58,7 +58,8 @@ var ( // newUpgradeCmd creates a "replicaset upgrade" command. 
func newUpgradeCmd() *cobra.Command { cmd := &cobra.Command{ - Use: "upgrade (<APP_NAME>) [flags]", + Use: "upgrade (<APP_NAME> | <URI>) [flags]\n\n" + + replicasetUriHelp, DisableFlagsInUseLine: true, Short: "Upgrade tarantool cluster", Long: "Upgrade tarantool cluster.\n\n" + @@ -79,6 +80,7 @@ func newUpgradeCmd() *cobra.Command { "timeout for waiting the LSN synchronization (in seconds)") addOrchestratorFlags(cmd) + addTarantoolConnectFlags(cmd) return cmd } @@ -530,6 +532,18 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e if ctx.IsInstanceConnect { defer ctx.Conn.Close() } + + connectCtx := connect.ConnectCtx{ + Username: replicasetUser, + Password: replicasetPassword, + SslKeyFile: replicasetSslKeyFile, + SslCertFile: replicasetSslCertFile, + SslCaFile: replicasetSslCaFile, + SslCiphers: replicasetSslCiphers, + } + var connOpts connector.ConnectOpts + connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args) + return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{ IsApplication: ctx.IsApplication, RunningCtx: ctx.RunningCtx, @@ -538,7 +552,7 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e }, replicasetcmd.UpgradeOpts{ ChosenReplicasetAliases: chosenReplicasetAliases, LsnTimeout: lsnTimeout, - }) + }, connOpts) } // internalReplicasetPromoteModule is a "promote" command for the replicaset module. diff --git a/cli/replicaset/cmd/upgrade.go b/cli/replicaset/cmd/upgrade.go index 285f0513a..47b59cf03 100644 --- a/cli/replicaset/cmd/upgrade.go +++ b/cli/replicaset/cmd/upgrade.go @@ -62,7 +62,7 @@ func filterReplicasetsByAliases(replicasets replicaset.Replicasets, } // Upgrade upgrades tarantool schema. 
-func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error { +func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts, connOpts connector.ConnectOpts) error { replicasets, err := getReplicasets(discoveryCtx) if err != nil { return err @@ -75,12 +75,13 @@ func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error { return err } - return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout) + return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout, connOpts) } -func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int) error { +func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int, + connOpts connector.ConnectOpts) error { for _, replicaset := range replicasets { - err := upgradeReplicaset(replicaset, lsnTimeout) + err := upgradeReplicaset(replicaset, lsnTimeout, connOpts) if err != nil { fmt.Printf("• %s: error\n", replicaset.Alias) return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err) @@ -99,7 +100,8 @@ func closeConnectors(master *instanceMeta, replicas []instanceMeta) { } } -func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) { +func getInstanceConnector(instance replicaset.Instance, + connOpts connector.ConnectOpts) (connector.Connector, error) { run := instance.InstanceCtx fullInstanceName := running.GetAppInstanceName(run) if fullInstanceName == "" { @@ -116,33 +118,55 @@ func getInstanceConnector(instance replicaset.Instance) (connector.Connector, er }) if err != nil { - return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+ - ": %w", fullInstanceName, err) + fErr := err + conn, err = connector.Connect(connector.ConnectOpts{ + Network: connOpts.Network, + Address: instance.URI, + Username: connOpts.Username, + Password: connOpts.Password, + Ssl: connOpts.Ssl, + }) + if err != nil { + return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+ + "and uri: %w %w", fullInstanceName, err, fErr) + } } return conn, nil } -func 
collectRWROInfo(replset replicaset.Replicaset) (*instanceMeta, []instanceMeta, +func collectRWROInfo(replset replicaset.Replicaset, + connOpts connector.ConnectOpts) (*instanceMeta, []instanceMeta, error) { var master *instanceMeta = nil var replicas []instanceMeta for _, instance := range replset.Instances { run := instance.InstanceCtx fullInstanceName := running.GetAppInstanceName(run) - conn, err := getInstanceConnector(instance) + conn, err := getInstanceConnector(instance, connOpts) if err != nil { return nil, nil, err } + isRW := false + if instance.Mode == replicaset.ModeUnknown { - closeConnectors(master, replicas) - return nil, nil, fmt.Errorf( - "can't determine RO/RW mode on instance: %s", fullInstanceName) + // TODO (gh-1034): Temporary workaround for mode assignment. + // Ideally, the discovery code should be updated to handle + // this case properly. + res, err := conn.Eval( + "return (type(box.cfg) == 'function') or box.info.ro", + []any{}, connector.RequestOpts{}) + if err != nil || len(res) == 0 { + return nil, nil, fmt.Errorf( + "can't determine RO/RW mode on instance: %s", + fullInstanceName) + } + isRW = !res[0].(bool) + } else { + isRW = instance.Mode.String() == "rw" } - isRW := instance.Mode.String() == "rw" - if isRW && master != nil { closeConnectors(master, replicas) return nil, nil, fmt.Errorf("%s and %s are both masters", @@ -231,8 +255,9 @@ func snapshot(instance *instanceMeta) error { return nil } -func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int) error { - master, replicas, err := collectRWROInfo(replicaset) +func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int, + connOpts connector.ConnectOpts) error { + master, replicas, err := collectRWROInfo(replicaset, connOpts) if err != nil { return err } diff --git a/test/integration/replicaset/small_cluster_app/config.yaml b/test/integration/replicaset/small_cluster_app/config.yaml new file mode 100644 index 000000000..31a9edc1c --- /dev/null +++ 
b/test/integration/replicaset/small_cluster_app/config.yaml @@ -0,0 +1,25 @@ +credentials: + users: + client: + password: 'secret' + roles: [super] + guest: + roles: [super] + +groups: + group-001: + replicasets: + replicaset-001: + instances: + storage-master: + iproto: + listen: + - uri: '127.0.0.1:3301' + database: + mode: rw + storage-replica: + iproto: + listen: + - uri: '127.0.0.1:3302' + database: + mode: ro diff --git a/test/integration/replicaset/small_cluster_app/instances.yml b/test/integration/replicaset/small_cluster_app/instances.yml new file mode 100644 index 000000000..6eae339ec --- /dev/null +++ b/test/integration/replicaset/small_cluster_app/instances.yml @@ -0,0 +1,4 @@ +storage-master: + +storage-replica: + diff --git a/test/integration/replicaset/test_replicaset_upgrade.py b/test/integration/replicaset/test_replicaset_upgrade.py index 3bc303e11..23e2d8b36 100644 --- a/test/integration/replicaset/test_replicaset_upgrade.py +++ b/test/integration/replicaset/test_replicaset_upgrade.py @@ -2,12 +2,14 @@ import shutil import subprocess import tempfile +import time import pytest from replicaset_helpers import stop_application from vshard_cluster import VshardCluster -from utils import get_tarantool_version, run_command_and_get_output, wait_file +from utils import (control_socket, get_tarantool_version, + run_command_and_get_output, run_path, wait_file) tarantool_major_version, _ = get_tarantool_version() @@ -192,3 +194,53 @@ def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path): finally: app.stop() + + +def start_application(cmd, workdir, app_name, instances): + instance_process = subprocess.Popen( + cmd, + cwd=workdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True + ) + start_output = instance_process.stdout.read() + for inst in instances: + assert f"Starting an instance [{app_name}:{inst}]" in start_output + + +@pytest.mark.skipif(tarantool_major_version < 3, + reason="skip cluster instances test for Tarantool < 3") 
+def test_upgrade_remote_replicasets(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + app_name = "small_cluster_app" + app_path = os.path.join(tmpdir, app_name) + shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path) + + run_dir = os.path.join(tmpdir, app_name, run_path) + instances = ['storage-master', 'storage-replica'] + + try: + # Start the instances. + start_cmd = [tt_cmd, "start", app_name] + start_application(start_cmd, tmpdir, app_name, instances) + + # Check status. + for inst in instances: + file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', []) + assert file != "" + file = wait_file(os.path.join(run_dir, inst), control_socket, []) + assert file != "" + # App is ready, but it is necessary to wait for some time + # to be able to establish a connection. + time.sleep(3) + uri = "tcp://client:secret@127.0.0.1:3301" + upgrade_cmd = [tt_cmd, "replicaset", "upgrade", uri, "-t=15"] + rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmpdir) + assert rc == 0 + assert "ok" in out + + finally: + stop_cmd = [tt_cmd, "stop", app_name, "-y"] + stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=tmpdir) + assert stop_rc == 0