Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tt replicaset: add subcommand downgrade #1031

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- `tt replicaset downgrade`: command to downgrade the schema on a Tarantool
cluster.
* `-v (--version)`: (Required) specify schema version to downgrade to.
* `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
* `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds)
(default 5).
- `tt replicaset upgrade`: command to upgrade the schema on a Tarantool
cluster.
* `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
Expand Down
83 changes: 83 additions & 0 deletions cli/cmd/replicaset.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package cmd

import (
"errors"
"fmt"
"os"
"regexp"
"strings"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -48,6 +51,7 @@ var (

chosenReplicasetAliases []string
lsnTimeout int
downgradeVersion string

replicasetUriHelp = " The URI can be specified in the following formats:\n" +
" * [tcp://][username:password@][host:port]\n" +
Expand Down Expand Up @@ -84,6 +88,51 @@ func newUpgradeCmd() *cobra.Command {
return cmd
}

// newDowngradeCmd creates a "replicaset downgrade" command.
func newDowngradeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "downgrade (<APP_NAME> | <URI>) [flags]\n\n" +
replicasetUriHelp,
DisableFlagsInUseLine: true,
Short: "Downgrade tarantool cluster",
Long: "Downgrade tarantool cluster.\n\n" +
libconnect.EnvCredentialsHelp + "\n\n",
Run: func(cmd *cobra.Command, args []string) {
var versionPattern = regexp.MustCompile(`^\d+\.\d+\.\d+$`)
if downgradeVersion == "" {
err := errors.New("need to specify the version to downgrade " +
"use --version (-v) option")
util.HandleCmdErr(cmd, err)
os.Exit(1)
} else if !versionPattern.MatchString(downgradeVersion) {
err := errors.New("--version (-v) must be in the format " +
"'x.x.x', where x is a number")
util.HandleCmdErr(cmd, err)
os.Exit(1)
}

cmdCtx.CommandName = cmd.Name()
err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo,
internalReplicasetDowngradeModule, args)
util.HandleCmdErr(cmd, err)
},
Args: cobra.ExactArgs(1),
}

cmd.Flags().StringArrayVarP(&chosenReplicasetAliases, "replicaset", "r",
[]string{}, "specify the replicaset name(s) to downgrade")

cmd.Flags().IntVarP(&lsnTimeout, "timeout", "t", 5,
"timeout for waiting the LSN synchronization (in seconds)")

cmd.Flags().StringVarP(&downgradeVersion, "version", "v", "",
"version to downgrade the schema to")

addOrchestratorFlags(cmd)
addTarantoolConnectFlags(cmd)
return cmd
}

// newStatusCmd creates a "replicaset status" command.
func newStatusCmd() *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -374,6 +423,7 @@ func NewReplicasetCmd() *cobra.Command {
}

cmd.AddCommand(newUpgradeCmd())
cmd.AddCommand(newDowngradeCmd())
cmd.AddCommand(newStatusCmd())
cmd.AddCommand(newPromoteCmd())
cmd.AddCommand(newDemoteCmd())
Expand Down Expand Up @@ -555,6 +605,39 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
}, connOpts)
}

// internalReplicasetDowngradeModule is a "upgrade" command for the replicaset module.
func internalReplicasetDowngradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
var ctx replicasetCtx
if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil {
return err
}
if ctx.IsInstanceConnect {
defer ctx.Conn.Close()
}

connectCtx := connect.ConnectCtx{
Username: replicasetUser,
Password: replicasetPassword,
SslKeyFile: replicasetSslKeyFile,
SslCertFile: replicasetSslCertFile,
SslCaFile: replicasetSslCaFile,
SslCiphers: replicasetSslCiphers,
}
var connOpts connector.ConnectOpts
connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args)

return replicasetcmd.Downgrade(replicasetcmd.DiscoveryCtx{
IsApplication: ctx.IsApplication,
RunningCtx: ctx.RunningCtx,
Conn: ctx.Conn,
Orchestrator: ctx.Orchestrator,
}, replicasetcmd.DowngradeOpts{
ChosenReplicasetAliases: chosenReplicasetAliases,
Timeout: lsnTimeout,
DowngradeVersion: downgradeVersion,
}, connOpts)
}

// internalReplicasetPromoteModule is a "promote" command for the replicaset module.
func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
var ctx replicasetCtx
Expand Down
132 changes: 132 additions & 0 deletions cli/replicaset/cmd/downgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package replicasetcmd

import (
_ "embed"
"fmt"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/tarantool/tt/cli/connector"
"github.com/tarantool/tt/cli/replicaset"
"github.com/tarantool/tt/cli/running"
)

// DowngradeOpts contains options used for the downgrade process.
type DowngradeOpts struct {
// ChosenReplicasetAliases is a list of replicaset names specified by
// the user for the downgrade.
ChosenReplicasetAliases []string
// Timeout period (in seconds) for waiting on LSN synchronization.
Timeout int
// DowngradeVersion is a version to downgrade the schema to.
DowngradeVersion string
}

//go:embed lua/downgrade.lua
var downgradeMasterLua string

func filterComments(script string) string {
var filteredLines []string
lines := strings.Split(script, "\n")
for _, line := range lines {
trimmedLine := strings.TrimSpace(line)
if !strings.HasPrefix(trimmedLine, "--") {
filteredLines = append(filteredLines, line)
}
}
return strings.Join(filteredLines, "\n")
}

// Downgrade downgrades tarantool schema.
func Downgrade(discoveryCtx DiscoveryCtx, opts DowngradeOpts,
connOpts connector.ConnectOpts) error {
replicasets, err := getReplicasets(discoveryCtx)
if err != nil {
return err
}

replicasets = fillAliases(replicasets)
replicasetsToDowngrade, err := filterReplicasetsByAliases(replicasets,
opts.ChosenReplicasetAliases)
if err != nil {
return err
}

return internalDowngrade(replicasetsToDowngrade, opts.Timeout,
opts.DowngradeVersion, connOpts)
}

func internalDowngrade(replicasets []replicaset.Replicaset, lsnTimeout int, version string,
connOpts connector.ConnectOpts) error {
for _, replicaset := range replicasets {
err := downgradeReplicaset(replicaset, lsnTimeout, version, connOpts)
if err != nil {
fmt.Printf("• %s: error\n", replicaset.Alias)
return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
}
fmt.Printf("• %s: ok\n", replicaset.Alias)
}
return nil
}

func downgradeMaster(master *instanceMeta, version string) (syncInfo, error) {
var downgradeInfo syncInfo
fullMasterName := running.GetAppInstanceName(master.run)
res, err := master.conn.Eval(filterComments(downgradeMasterLua),
[]interface{}{version}, connector.RequestOpts{})
if err != nil {
return downgradeInfo, fmt.Errorf(
"failed to execute downgrade script on master instance - %s: %w",
fullMasterName, err)
}

if err := mapstructure.Decode(res[0], &downgradeInfo); err != nil {
return downgradeInfo, fmt.Errorf(
"failed to decode response from master instance - %s: %w",
fullMasterName, err)
}

if downgradeInfo.Err != nil {
return downgradeInfo, fmt.Errorf(
"master instance downgrade failed - %s: %s",
fullMasterName, *downgradeInfo.Err)
}
return downgradeInfo, nil
}

func downgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int, version string,
connOpts connector.ConnectOpts) error {
master, replicas, err := collectRWROInfo(replicaset, connOpts)
if err != nil {
return err
}

defer closeConnectors(master, replicas)

// Downgrade master instance, collect LSN and IID from master instance.
downgradeInfo, err := downgradeMaster(master, version)
if err != nil {
return err
}

// Downgrade replica instances.
masterLSN := downgradeInfo.LSN
masterIID := downgradeInfo.IID

for _, replica := range replicas {
fullReplicaName := running.GetAppInstanceName(replica.run)
err := waitLSN(replica.conn, masterIID, masterLSN, lsnTimeout)
if err != nil {
return fmt.Errorf("can't ensure that downgrade operations performed on "+
"%s are replicated to %s to perform snapshotting on it: error "+
"waiting LSN %d in vclock component %d: %w",
running.GetAppInstanceName(master.run), fullReplicaName,
masterLSN, masterIID, err)
}
err = snapshot(&replica)
if err != nil {
return err
}
}
return nil
}
41 changes: 41 additions & 0 deletions cli/replicaset/cmd/lua/downgrade.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
local version = ...
local allowed_versions = box.schema.downgrade_versions()

local function is_version_allowed(version, allowed_versions)
for _, allowed_version in ipairs(allowed_versions) do
if allowed_version == version then
return true
end
end
return false
end

local function format_allowed_versions(versions)
return "[" .. table.concat(versions, ", ") .. "]"
end

local function downgrade_schema(version)
if not is_version_allowed(version, allowed_versions) then
local err = ("Version '%s' is not allowed.\nAllowed versions: %s"):format(
version, format_allowed_versions(allowed_versions)
)
return {
lsn = box.info.lsn,
iid = box.info.id,
err = err,
}
end

local ok, err = pcall(function()
box.schema.downgrade(version)
box.snapshot()
end)

return {
lsn = box.info.lsn,
iid = box.info.id,
err = not ok and tostring(err) or nil,
}
end

return downgrade_schema(version)
Loading