From cdb367dd6bef2e311dadfb8bdf4e0c1e849b0c2a Mon Sep 17 00:00:00 2001
From: Maksim Tiushev
Date: Fri, 22 Nov 2024 13:32:48 +0000
Subject: [PATCH] `tt replicaset`: add subcommand downgrade

Closes #968

@TarantoolBot document
Title: `tt replicaset downgrade` downgrades database schema.

The `tt replicaset downgrade` command automates the downgrade of the
database schema for each replicaset in a Tarantool cluster. The process
is performed sequentially on the master instance and its replicas to
ensure data consistency. Below are the steps involved:

For each replicaset:

- **On the master instance**:
  1. Run the following commands in sequence to downgrade the schema and
     take a snapshot:

     ```lua
     box.schema.downgrade(<..version..>)
     box.snapshot()
     ```

- **On each replica**:
  1. Wait for the replica to apply all transactions produced by the
     `box.schema.downgrade` command executed on the master. This is done
     by monitoring the vector clock (vclock) to ensure synchronization.
  2. Once the replica has caught up, run the following command to take
     a snapshot:

     ```lua
     box.snapshot()
     ```

> **Error Handling**: If any error occurs during the downgrade process,
> the operation halts and an error report is generated.

---

- Timeout for synchronization

  Replicas wait for synchronization for a maximum of `Timeout` seconds.
  The default timeout is 5 seconds, but it can be adjusted manually with
  the `--timeout` option.

  **Example:**

  ```bash
  $ tt replicaset downgrade [<APP_NAME> | <URI>] -v 3.0.0 --timeout 10
  ```

- Selecting replicasets for downgrade

  You can specify which replicaset(s) to downgrade by using the
  `--replicaset` or `-r` option to target specific replicaset names.

  **Example:**

  ```bash
  $ tt replicaset downgrade [<APP_NAME> | <URI>] -v 3.0.0 -r <RS_NAME> ...
  ```

  This provides flexibility in downgrading only the desired parts of the
  cluster without affecting the entire system.
---
 CHANGELOG.md                                  |   4 +
 cli/cmd/replicaset.go                         |  82 ++++++
 cli/replicaset/cmd/downgrade.go               | 117 ++++++++
 cli/replicaset/cmd/lua/downgrade.lua          |  41 +++
 .../replicaset/test_replicaset_downgrade.py   | 256 ++++++++++++++++++
 5 files changed, 500 insertions(+)
 create mode 100644 cli/replicaset/cmd/downgrade.go
 create mode 100644 cli/replicaset/cmd/lua/downgrade.lua
 create mode 100644 test/integration/replicaset/test_replicaset_downgrade.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e9efdc404..86596aabb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Added
 
+- `tt replicaset downgrade`: command to downgrade the schema on a Tarantool cluster.
+  * `-v (--version)`: (Required) specify the schema version to downgrade to.
+  * `-r (--replicaset)`: specify the replicaset name(s) to downgrade.
+  * `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds) (default 5).
 - `tt replicaset upgrade`: command to upgrade the schema on a Tarantool cluster.
   * `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
   * `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds) (default 5).
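Note on the synchronization step described above: on each replica the command effectively polls the replica's vclock until the component for the master's instance id reaches the LSN the master reported after `box.schema.downgrade`. A minimal Lua sketch of that wait loop, for illustration only (`wait_lsn` here is a hypothetical helper; the patch implements this logic in Go, reusing the existing `waitLSN` helper from `tt replicaset upgrade`):

```lua
-- Sketch: block until this replica has applied the master's changes.
-- master_iid and target_lsn come from box.info on the master after the
-- downgrade; timeout corresponds to the --timeout option (seconds).
local fiber = require('fiber')

local function wait_lsn(master_iid, target_lsn, timeout)
    local deadline = fiber.clock() + timeout
    while (box.info.vclock[master_iid] or 0) < target_lsn do
        if fiber.clock() > deadline then
            return false, ('timeout waiting for LSN %d'):format(target_lsn)
        end
        fiber.sleep(0.1)
    end
    return true
end
```

Polling with a short sleep keeps the logic simple; the `--timeout` option bounds the total wait, after which the downgrade is aborted with an error.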
diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go
index a69318b77..526b1f779 100644
--- a/cli/cmd/replicaset.go
+++ b/cli/cmd/replicaset.go
@@ -1,7 +1,10 @@
 package cmd
 
 import (
+	"errors"
 	"fmt"
+	"os"
+	"regexp"
 	"strings"
 
 	"github.com/spf13/cobra"
@@ -48,6 +51,7 @@ var (
 	chosenReplicasetAliases []string
 	lsnTimeout              int
+	downgradeVersion        string
 
 	replicasetUriHelp = "  The URI can be specified in the following formats:\n" +
 		"  * [tcp://][username:password@][host:port]\n" +
@@ -84,6 +88,50 @@ func newUpgradeCmd() *cobra.Command {
 	return cmd
 }
 
+// newDowngradeCmd creates a "replicaset downgrade" command.
+func newDowngradeCmd() *cobra.Command {
+	cmd := &cobra.Command{
+		Use: "downgrade (<APP_NAME> | <URI>) [flags]\n\n" +
+			replicasetUriHelp,
+		DisableFlagsInUseLine: true,
+		Short: "Downgrade tarantool cluster",
+		Long: "Downgrade tarantool cluster.\n\n" +
+			libconnect.EnvCredentialsHelp + "\n\n",
+		Run: func(cmd *cobra.Command, args []string) {
+			versionPattern := regexp.MustCompile(`^\d+\.\d+\.\d+$`)
+			if downgradeVersion == "" {
+				err := errors.New("need to specify the version to downgrade: " +
+					"use the --version (-v) option")
+				util.HandleCmdErr(cmd, err)
+				os.Exit(1)
+			} else if !versionPattern.MatchString(downgradeVersion) {
+				err := errors.New("--version (-v) must be in the format " +
+					"'x.x.x', where x is a number")
+				util.HandleCmdErr(cmd, err)
+				os.Exit(1)
+			}
+
+			cmdCtx.CommandName = cmd.Name()
+			err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo,
+				internalReplicasetDowngradeModule, args)
+			util.HandleCmdErr(cmd, err)
+		},
+		Args: cobra.ExactArgs(1),
+	}
+
+	cmd.Flags().StringArrayVarP(&chosenReplicasetAliases, "replicaset", "r",
+		[]string{}, "specify the replicaset name(s) to downgrade")
+
+	cmd.Flags().IntVarP(&lsnTimeout, "timeout", "t", 5,
+		"timeout for waiting the LSN synchronization (in seconds)")
+
+	cmd.Flags().StringVarP(&downgradeVersion, "version", "v", "",
+		"version to downgrade the schema to")
+
+	addOrchestratorFlags(cmd)
+	addTarantoolConnectFlags(cmd)
+	return cmd
+}
+
 // newStatusCmd creates a "replicaset status" command.
 func newStatusCmd() *cobra.Command {
 	cmd := &cobra.Command{
@@ -374,6 +422,7 @@ func NewReplicasetCmd() *cobra.Command {
 	}
 
 	cmd.AddCommand(newUpgradeCmd())
+	cmd.AddCommand(newDowngradeCmd())
 	cmd.AddCommand(newStatusCmd())
 	cmd.AddCommand(newPromoteCmd())
 	cmd.AddCommand(newDemoteCmd())
@@ -555,6 +604,39 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
 	}, connOpts)
 }
 
+// internalReplicasetDowngradeModule is a "downgrade" command for the replicaset module.
+func internalReplicasetDowngradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
+	var ctx replicasetCtx
+	if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil {
+		return err
+	}
+	if ctx.IsInstanceConnect {
+		defer ctx.Conn.Close()
+	}
+
+	connectCtx := connect.ConnectCtx{
+		Username:    replicasetUser,
+		Password:    replicasetPassword,
+		SslKeyFile:  replicasetSslKeyFile,
+		SslCertFile: replicasetSslCertFile,
+		SslCaFile:   replicasetSslCaFile,
+		SslCiphers:  replicasetSslCiphers,
+	}
+	var connOpts connector.ConnectOpts
+	connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args)
+
+	return replicasetcmd.Downgrade(replicasetcmd.DiscoveryCtx{
+		IsApplication: ctx.IsApplication,
+		RunningCtx:    ctx.RunningCtx,
+		Conn:          ctx.Conn,
+		Orchestrator:  ctx.Orchestrator,
+	}, replicasetcmd.DowngradeOpts{
+		ChosenReplicasetAliases: chosenReplicasetAliases,
+		LsnTimeout:              lsnTimeout,
+		DowngradeVersion:        downgradeVersion,
+	}, connOpts)
+}
+
 // internalReplicasetPromoteModule is a "promote" command for the replicaset module.
 func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
 	var ctx replicasetCtx
diff --git a/cli/replicaset/cmd/downgrade.go b/cli/replicaset/cmd/downgrade.go
new file mode 100644
index 000000000..3c3a6c19c
--- /dev/null
+++ b/cli/replicaset/cmd/downgrade.go
@@ -0,0 +1,117 @@
+package replicasetcmd
+
+import (
+	_ "embed"
+	"fmt"
+
+	"github.com/mitchellh/mapstructure"
+	"github.com/tarantool/tt/cli/connector"
+	"github.com/tarantool/tt/cli/replicaset"
+	"github.com/tarantool/tt/cli/running"
+)
+
+// DowngradeOpts contains options used for the downgrade process.
+type DowngradeOpts struct {
+	// List of replicaset names specified by the user for the downgrade.
+	ChosenReplicasetAliases []string
+	// Timeout period (in seconds) for waiting on LSN synchronization.
+	LsnTimeout int
+	// Schema version to downgrade to.
+	DowngradeVersion string
+}
+
+//go:embed lua/downgrade.lua
+var downgradeMasterLua string
+
+func Downgrade(discoveryCtx DiscoveryCtx, opts DowngradeOpts,
+	connOpts connector.ConnectOpts) error {
+	replicasets, err := getReplicasets(discoveryCtx)
+	if err != nil {
+		return err
+	}
+
+	replicasets = fillAliases(replicasets)
+	replicasetsToDowngrade, err := filterReplicasetsByAliases(replicasets,
+		opts.ChosenReplicasetAliases)
+	if err != nil {
+		return err
+	}
+
+	return internalDowngrade(replicasetsToDowngrade, opts.LsnTimeout,
+		opts.DowngradeVersion, connOpts)
+}
+
+func internalDowngrade(replicasets []replicaset.Replicaset, lsnTimeout int, version string,
+	connOpts connector.ConnectOpts) error {
+	for _, replicaset := range replicasets {
+		err := downgradeReplicaset(replicaset, lsnTimeout, version, connOpts)
+		if err != nil {
+			fmt.Printf("• %s: error\n", replicaset.Alias)
+			return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
+		}
+		fmt.Printf("• %s: ok\n", replicaset.Alias)
+	}
+	return nil
+}
+
+func downgradeMaster(master *instanceMeta, version string) (syncInfo, error) {
+	var downgradeInfo syncInfo
+	fullMasterName := running.GetAppInstanceName(master.run)
+	res, err := master.conn.Eval(downgradeMasterLua,
+		[]interface{}{version}, connector.RequestOpts{})
+	if err != nil {
+		return downgradeInfo, fmt.Errorf(
+			"failed to execute downgrade script on master instance - %s: %w",
+			fullMasterName, err)
+	}
+
+	if err := mapstructure.Decode(res[0], &downgradeInfo); err != nil {
+		return downgradeInfo, fmt.Errorf(
+			"failed to decode response from master instance - %s: %w",
+			fullMasterName, err)
+	}
+
+	if downgradeInfo.Err != nil {
+		return downgradeInfo, fmt.Errorf(
+			"master instance downgrade failed - %s: %s",
+			fullMasterName, *downgradeInfo.Err)
+	}
+	return downgradeInfo, nil
+}
+
+func downgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int, version string,
+	connOpts connector.ConnectOpts) error {
+	master, replicas, err := collectRWROInfo(replicaset, connOpts)
+	if err != nil {
+		return err
+	}
+
+	defer closeConnectors(master, replicas)
+
+	// Downgrade the master instance, collect LSN and IID from it.
+	downgradeInfo, err := downgradeMaster(master, version)
+	if err != nil {
+		return err
+	}
+
+	// Downgrade replica instances.
+	masterLSN := downgradeInfo.LSN
+	masterIID := downgradeInfo.IID
+
+	for _, replica := range replicas {
+		fullReplicaName := running.GetAppInstanceName(replica.run)
+		err := waitLSN(replica.conn, masterIID, masterLSN, lsnTimeout)
+		if err != nil {
+			return fmt.Errorf("can't ensure that downgrade operations performed on "+
+				"%s are replicated to %s to perform snapshotting on it: error "+
+				"waiting LSN %d in vclock component %d: %w",
+				running.GetAppInstanceName(master.run), fullReplicaName,
+				masterLSN, masterIID, err)
+		}
+		err = snapshot(&replica)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/cli/replicaset/cmd/lua/downgrade.lua b/cli/replicaset/cmd/lua/downgrade.lua
new file mode 100644
index 000000000..795c4366c
--- /dev/null
+++ b/cli/replicaset/cmd/lua/downgrade.lua
@@ -0,0 +1,41 @@
+local version = ...
+local allowed_versions = box.schema.downgrade_versions()
+
+local function is_version_allowed()
+    for _, allowed_version in ipairs(allowed_versions) do
+        if allowed_version == version then
+            return true
+        end
+    end
+    return false
+end
+
+local err
+local ok = false
+
+if not is_version_allowed() then
+    err = "Version '" .. version .. "' is not allowed."
+    -- Build a readable "[v1, v2, ...]" list of the allowed versions.
+    local result = "["
+    for i, value in ipairs(allowed_versions) do
+        result = result .. tostring(value)
+        if i < #allowed_versions then
+            result = result .. ", "
+        end
+    end
+    result = result .. "]"
+    err = err .. "\nAllowed versions: " .. result
+end
+
+if err == nil then
+    ok, err = pcall(box.schema.downgrade, tostring(version))
+    if ok then
+        ok, err = pcall(box.snapshot)
+    end
+end
+
+return {
+    lsn = box.info.lsn,
+    iid = box.info.id,
+    err = (not ok) and tostring(err) or nil,
+}
diff --git a/test/integration/replicaset/test_replicaset_downgrade.py b/test/integration/replicaset/test_replicaset_downgrade.py
new file mode 100644
index 000000000..292525091
--- /dev/null
+++ b/test/integration/replicaset/test_replicaset_downgrade.py
@@ -0,0 +1,256 @@
+import os
+import shutil
+import subprocess
+import tempfile
+import time
+
+import pytest
+from replicaset_helpers import stop_application
+from vshard_cluster import VshardCluster
+
+from utils import (control_socket, get_tarantool_version,
+                   run_command_and_get_output, run_path, wait_file)
+
+tarantool_major_version, _ = get_tarantool_version()
+
+cmd_master = '''box.space._schema:run_triggers(false)
+box.space._schema:delete('replicaset_name')
+box.space._schema:run_triggers(true)
+
+box.space._cluster:run_triggers(false)
+box.atomic(function()
+    for _, tuple in box.space._cluster:pairs() do
+        pcall(box.space._cluster.update, box.space._cluster, {tuple.id}, {{'#', 'name', 1}})
+    end
+end)
+box.space._cluster:run_triggers(true)
+'''
+
+
+def run_command_on_instance(tt_cmd, tmpdir, full_inst_name, cmd):
+    con_cmd = [tt_cmd, "connect", full_inst_name, "-f", "-"]
+    instance_process = subprocess.Popen(
+        con_cmd,
+        cwd=tmpdir,
+        stderr=subprocess.STDOUT,
+        stdout=subprocess.PIPE,
+        stdin=subprocess.PIPE,
+        text=True,
+    )
+    instance_process.stdin.writelines([cmd])
+    instance_process.stdin.close()
+    output = instance_process.stdout.read()
+    return output
+
+
+@pytest.mark.skipif(
+    tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3"
+)
+def test_downgrade_multi_master(tt_cmd, tmpdir_with_cfg):
+    tmpdir = tmpdir_with_cfg
+    app_name = "test_ccluster_app"
+    app_path = os.path.join(tmpdir, app_name)
+    shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)
+    try:
+        # Start a cluster.
+        start_cmd = [tt_cmd, "start", app_name]
+        rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir)
+        assert rc == 0
+
+        for i in range(1, 6):
+            file = wait_file(
+                os.path.join(tmpdir, app_name), f"ready-instance-00{i}", []
+            )
+            assert file != ""
+
+        downgrade_cmd = [tt_cmd, "replicaset", "downgrade", app_name, "-v=3.0.0"]
+
+        rc, out = run_command_and_get_output(downgrade_cmd, cwd=tmpdir)
+        assert rc == 1
+        assert "replicaset-002: error" in out and "are both masters" in out
+
+    finally:
+        stop_application(tt_cmd, app_name, tmpdir, [])
+
+
+def test_downgrade_t2_app_dummy_replicaset(tt_cmd):
+    app_name = "single-t2-app"
+    test_app_path_src = os.path.join(os.path.dirname(__file__), app_name)
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        test_app_path = os.path.join(tmpdir, app_name)
+        shutil.copytree(test_app_path_src, test_app_path)
+        memtx_dir = os.path.join(test_app_path, "var", "lib", app_name)
+        os.makedirs(memtx_dir, exist_ok=True)
+
+        try:
+            start_cmd = [tt_cmd, "start", app_name]
+            rc, out = run_command_and_get_output(start_cmd, cwd=test_app_path)
+            assert rc == 0
+
+            file = wait_file(test_app_path, "ready", [])
+            assert file != ""
+
+            downgrade_cmd = [
+                tt_cmd, "replicaset", "downgrade", app_name, "--custom", "-v=2.8.2"
+            ]
+            rc, out = run_command_and_get_output(downgrade_cmd, cwd=test_app_path)
+            assert rc == 0
+            # Out is `• : ok` because the instance has no name.
+            assert "ok" in out
+        finally:
+            stop_application(tt_cmd, app_name, test_app_path, [])
+
+
+@pytest.mark.skipif(tarantool_major_version < 3,
+                    reason="skip test with cluster config for Tarantool < 3")
+def test_cluster_replicasets(tt_cmd, tmp_path):
+    app_name = "vshard_app"
+    replicasets = {
+        "router-001": ["router-001-a"],
+        "storage-001": ["storage-001-a", "storage-001-b"],
+        "storage-002": ["storage-002-a", "storage-002-b"],
+    }
+    app = VshardCluster(tt_cmd, tmp_path, app_name)
+    try:
+        app.build()
+        app.start()
+        # This is necessary to downgrade a Tarantool 3.x cluster to 2.x.
+        for replicas in replicasets.values():
+            for replica in replicas:
+                out = run_command_on_instance(
+                    tt_cmd,
+                    tmp_path,
+                    f"{app_name}:{replica}",
+                    "box.cfg{force_recovery=true} return box.cfg.force_recovery"
+                )
+                assert "true" in out
+
+        for replicas in replicasets.values():
+            run_command_on_instance(
+                tt_cmd,
+                tmp_path,
+                f"{app_name}:{replicas[0]}",
+                cmd_master
+            )
+
+        downgrade_cmd = [tt_cmd, "replicaset", "downgrade", app_name, "-t=15", "-v=2.11.1"]
+        rc, out = run_command_and_get_output(downgrade_cmd, cwd=tmp_path)
+
+        assert rc == 0
+
+        downgrade_out = out.strip().split("\n")
+        assert len(downgrade_out) == len(replicasets)
+
+        for line in downgrade_out:
+            assert "ok" in line
+
+        # Can't create data (old schema).
+        out = run_command_on_instance(
+            tt_cmd,
+            tmp_path,
+            f"{app_name}:storage-001-a",
+            "box.schema.space.create('example_space')"
+        )
+        assert "error: Your schema version is 2.11.1" in out
+    finally:
+        app.stop()
+
+
+@pytest.mark.skipif(tarantool_major_version < 3,
+                    reason="skip test with cluster config for Tarantool < 3")
+def test_downgrade_invalid_version(tt_cmd, tmp_path):
+    app_name = "vshard_app"
+    app = VshardCluster(tt_cmd, tmp_path, app_name)
+    try:
+        app.build()
+        app.start()
+
+        downgrade_cmd = [tt_cmd, "replicaset", "downgrade", app_name, "-t=15", "-v=1.1.1"]
+        rc, out = run_command_and_get_output(downgrade_cmd, cwd=tmp_path)
+
+        assert rc == 1
+        assert "Version '1.1.1' is not allowed." in out
+
+        downgrade_cmd = [tt_cmd, "replicaset", "downgrade", app_name, "-t=15", "-v=3.0"]
+        rc, out = run_command_and_get_output(downgrade_cmd, cwd=tmp_path)
+
+        assert rc == 1
+        assert "⨯ --version (-v) must be in the format 'x.x.x', where x is a number" in out
+    finally:
+        app.stop()
+
+
+def start_application(cmd, workdir, app_name, instances):
+    instance_process = subprocess.Popen(
+        cmd,
+        cwd=workdir,
+        stderr=subprocess.STDOUT,
+        stdout=subprocess.PIPE,
+        text=True
+    )
+    start_output = instance_process.stdout.read()
+    for inst in instances:
+        assert f"Starting an instance [{app_name}:{inst}]" in start_output
+
+
+@pytest.mark.skipif(tarantool_major_version < 3,
+                    reason="skip cluster instances test for Tarantool < 3")
+def test_downgrade_remote_replicasets(tt_cmd, tmpdir_with_cfg):
+    tmpdir = tmpdir_with_cfg
+    app_name = "small_cluster_app"
+    app_path = os.path.join(tmpdir, app_name)
+    shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)
+
+    run_dir = os.path.join(tmpdir, app_name, run_path)
+    instances = ['storage-master', 'storage-replica']
+
+    try:
+        # Start the instances.
+        start_cmd = [tt_cmd, "start", app_name]
+        start_application(start_cmd, tmpdir, app_name, instances)
+
+        # Check the status.
+        for inst in instances:
+            file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
+            assert file != ""
+            file = wait_file(os.path.join(run_dir, inst), control_socket, [])
+            assert file != ""
+        # The app is ready, but it is necessary to wait for some time
+        # to be able to establish a connection.
+        time.sleep(3)
+
+        # This is necessary to downgrade a Tarantool 3.x cluster to 2.x.
+        for inst in instances:
+            out = run_command_on_instance(
+                tt_cmd,
+                tmpdir,
+                f"{app_name}:{inst}",
+                "box.cfg{force_recovery=true} return box.cfg.force_recovery"
+            )
+            assert "true" in out
+
+        run_command_on_instance(
+            tt_cmd,
+            tmpdir,
+            f"{app_name}:{instances[0]}",
+            cmd_master
+        )
+
+        uri = "tcp://client:secret@127.0.0.1:3301"
+        downgrade_cmd = [tt_cmd, "replicaset", "downgrade", uri, "-t=15", "-v=2.11.1"]
+        rc, out = run_command_and_get_output(downgrade_cmd, cwd=tmpdir)
+        assert rc == 0
+        assert "ok" in out
+
+        # Can't create data (old schema).
+        out = run_command_on_instance(
+            tt_cmd,
+            tmpdir,
+            f"{app_name}:{instances[0]}",
+            "box.schema.space.create('example_space')"
+        )
+        assert "error: Your schema version is 2.11.1" in out
+
+    finally:
+        stop_cmd = [tt_cmd, "stop", app_name, "-y"]
+        stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=tmpdir)
+        assert stop_rc == 0
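As a quick sanity check after running `tt replicaset downgrade` (not part of this patch), the persisted schema version can be read from any instance through the standard `_schema` system space; a minimal sketch:

```lua
-- Read the schema version tuple from the _schema system space; after
-- downgrading to 2.11.1 it looks like ['version', 2, 11, 1] (very old
-- schemas may omit the patch field, hence the `or 0`).
local t = box.space._schema:get{'version'}
print('schema version: ' .. table.concat({t[2], t[3], t[4] or 0}, '.'))
```

This mirrors what the integration tests verify indirectly through the "Your schema version is 2.11.1" error raised on DDL attempts.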