From 956c134e6c86cc7753637c9b6c9b599d46490301 Mon Sep 17 00:00:00 2001 From: Maksim Tiushev Date: Fri, 4 Oct 2024 09:45:41 +0000 Subject: [PATCH] tt replicaset: add subcommand `upgrade` Part of tarantool#924 @TarantoolBot document Title: `tt replicaset upgrade` upgrades database schema. The `tt replicaset upgrade` command allows for a automate upgrade of each replicaset in a Tarantool cluster. The process is performed sequentially on the master instance and its replicas to ensure data consistency. Below are the steps involved: For Each Replicaset: - **On the Master Instance**: 1. Run the following commands in sequence to upgrade the schema and take a snapshot: ```lua box.schema.upgrade() box.snapshot() ``` - **On Each Replica**: 1. Wait for the replica to apply all transactions produced by the `box.schema.upgrade()` command executed on the master. This is done by monitoring the vector clocks (vclock) to ensure synchronization. 2. Once the repica has caught up, run the following command to take a snapshot: ```lua box.snapshot() ``` > **Error Handling**: If any errors occur during the upgrade process, the operation will halt, and an error report will be generated. --- - Timeout for Synchronization Replicas will wait for synchronization for a maximum of `Timeout` seconds. The default timeout is set to 5 seconds, but this can be adjusted manually using the `--timeout` option. **Example:** ```bash $ tt replicaset upgrade [] --timeout 10 ``` - Selecting Replicasets for Upgrade You can specify which replicaset(s) to upgrade by using the `--replicaset` or `-r` option to target specific replicaset names. **Example:** ```bash $ tt replicaset upgrade [ | ] --replicaset -r ... ``` This provides flexibility in upgrading only the desired parts of the cluster without affecting the entire system. --- cli/cmd/replicaset.go | 49 ++- cli/replicaset/cmd/lua/upgrade.lua | 11 + cli/replicaset/cmd/status.go | 46 +-- cli/replicaset/cmd/upgrade.go | 279 ++++++++++++++++++ .../single-t2-app/00000000000000000004.snap | Bin 0 -> 5082 bytes .../replicaset/single-t2-app/init.lua | 12 + .../replicaset/single-t2-app/tt.yaml | 9 + .../replicaset/test_replicaset_upgrade.py | 222 ++++++++++++++ 8 files changed, 605 insertions(+), 23 deletions(-) create mode 100644 cli/replicaset/cmd/lua/upgrade.lua create mode 100644 cli/replicaset/cmd/upgrade.go create mode 100644 test/integration/replicaset/single-t2-app/00000000000000000004.snap create mode 100644 test/integration/replicaset/single-t2-app/init.lua create mode 100644 test/integration/replicaset/single-t2-app/tt.yaml create mode 100644 test/integration/replicaset/test_replicaset_upgrade.py diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go index 5068def08..49b0c5764 100644 --- a/cli/cmd/replicaset.go +++ b/cli/cmd/replicaset.go @@ -51,6 +51,35 @@ var ( " To specify relative path without `unix://` use `./`." ) +// newUpgradeCmd creates a "replicaset upgrade" command. +func newUpgradeCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "upgrade ( | ) [flags]\n\n" + + replicasetUriHelp, + DisableFlagsInUseLine: true, + Short: "Upgrade tarantool cluster", + Long: "Upgrade tarantool cluster.\n\n" + + libconnect.EnvCredentialsHelp + "\n\n", + Run: func(cmd *cobra.Command, args []string) { + cmdCtx.CommandName = cmd.Name() + err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo, + internalReplicasetUpgradeModule, args) + util.HandleCmdErr(cmd, err) + }, + Args: cobra.ExactArgs(1), + } + + cmd.Flags().StringArrayVarP(&replicasetcmd.ChosenReplicasetAliases, "replicaset", "r", + []string{}, "specify the replicaset name(s) to upgrade") + + cmd.Flags().IntVarP(&replicasetcmd.LsnTimeout, "timeout", "t", 5, + "timeout for waiting the LSN synchronization (in seconds)") + + addOrchestratorFlags(cmd) + addTarantoolConnectFlags(cmd) + return cmd +} + // newStatusCmd creates a "replicaset status" command. func newStatusCmd() *cobra.Command { cmd := &cobra.Command{ @@ -340,6 +369,7 @@ func NewReplicasetCmd() *cobra.Command { Aliases: []string{"rs"}, } + cmd.AddCommand(newUpgradeCmd()) cmd.AddCommand(newStatusCmd()) cmd.AddCommand(newPromoteCmd()) cmd.AddCommand(newDemoteCmd()) @@ -489,6 +519,23 @@ func replicasetFillCtx(cmdCtx *cmdcontext.CmdCtx, ctx *replicasetCtx, args []str return nil } +// internalReplicasetUpgradeModule is a "upgrade" command for the replicaset module. +func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { + var ctx replicasetCtx + if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil { + return err + } + if ctx.IsInstanceConnect { + defer ctx.Conn.Close() + } + return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{ + IsApplication: ctx.IsApplication, + RunningCtx: ctx.RunningCtx, + Conn: ctx.Conn, + Orchestrator: ctx.Orchestrator, + }) +} + // internalReplicasetPromoteModule is a "promote" command for the replicaset module. func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error { var ctx replicasetCtx @@ -560,7 +607,7 @@ func internalReplicasetStatusModule(cmdCtx *cmdcontext.CmdCtx, args []string) er if ctx.IsInstanceConnect { defer ctx.Conn.Close() } - return replicasetcmd.Status(replicasetcmd.StatusCtx{ + return replicasetcmd.Status(replicasetcmd.DiscoveryCtx{ IsApplication: ctx.IsApplication, RunningCtx: ctx.RunningCtx, Conn: ctx.Conn, diff --git a/cli/replicaset/cmd/lua/upgrade.lua b/cli/replicaset/cmd/lua/upgrade.lua new file mode 100644 index 000000000..d755e4c1d --- /dev/null +++ b/cli/replicaset/cmd/lua/upgrade.lua @@ -0,0 +1,11 @@ +local ok, err +ok, err = pcall(box.schema.upgrade) +if ok then + ok, err = pcall(box.snapshot) +end + +return { + lsn = box.info.lsn, + iid = box.info.id, + err = (not ok) and tostring(err) or nil, +} \ No newline at end of file diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go index 99a8c41c1..687843fc6 100644 --- a/cli/replicaset/cmd/status.go +++ b/cli/replicaset/cmd/status.go @@ -10,41 +10,43 @@ import ( "github.com/tarantool/tt/cli/running" ) -// StatusCtx contains information about replicaset status command execution -// context. -type StatusCtx struct { +// DiscoveryCtx contains information about replicaset discovery. +type DiscoveryCtx struct { // IsApplication true if an application passed. IsApplication bool // RunningCtx is an application running context. RunningCtx running.RunningCtx // Conn is an active connection to a passed instance. Conn connector.Connector - // Orchestrator is a forced orchestator choice. + // Orchestrator is a forced orchestrator choice. Orchestrator replicaset.Orchestrator } -// Status shows a replicaset status. -func Status(statusCtx StatusCtx) error { - orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator, - statusCtx.Conn, statusCtx.RunningCtx) +// GetReplicasets discovers and returns the list of replicasets. +func GetReplicasets(ctx DiscoveryCtx) (replicaset.Replicasets, error) { + orchestratorType, err := getOrchestratorType(ctx.Orchestrator, ctx.Conn, ctx.RunningCtx) if err != nil { - return err + return replicaset.Replicasets{}, err } var orchestrator replicasetOrchestrator - if statusCtx.IsApplication { - if orchestrator, err = makeApplicationOrchestrator( - orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil { - return err - } + if ctx.IsApplication { + orchestrator, err = makeApplicationOrchestrator(orchestratorType, + ctx.RunningCtx, nil, nil) } else { - if orchestrator, err = makeInstanceOrchestrator( - orchestratorType, statusCtx.Conn); err != nil { - return err - } + orchestrator, err = makeInstanceOrchestrator(orchestratorType, ctx.Conn) + } + + if err != nil { + return replicaset.Replicasets{}, err } - replicasets, err := orchestrator.Discovery(replicaset.SkipCache) + return orchestrator.Discovery(replicaset.SkipCache) +} + +// Status shows a replicaset status. +func Status(discoveryCtx DiscoveryCtx) error { + replicasets, err := GetReplicasets(discoveryCtx) if err != nil { return err } @@ -61,7 +63,7 @@ func statusReplicasets(replicasets replicaset.Replicasets) error { fmt.Println("Orchestrator: ", replicasets.Orchestrator) fmt.Println("Replicasets state:", replicasets.State) - replicasets = fillAliases(replicasets) + replicasets = FillAliases(replicasets) replicasets = sortAliases(replicasets) if len(replicasets.Replicasets) > 0 { @@ -73,9 +75,9 @@ func statusReplicasets(replicasets replicaset.Replicasets) error { return nil } -// fillAliases fills missed aliases with UUID. The case: Tarantool 1.10 without +// FillAliases fills missed aliases with UUID. The case: Tarantool 1.10 without // an orchestrator. -func fillAliases(replicasets replicaset.Replicasets) replicaset.Replicasets { +func FillAliases(replicasets replicaset.Replicasets) replicaset.Replicasets { for i := range replicasets.Replicasets { replicaset := &replicasets.Replicasets[i] if replicaset.Alias == "" { diff --git a/cli/replicaset/cmd/upgrade.go b/cli/replicaset/cmd/upgrade.go new file mode 100644 index 000000000..a3a96b37a --- /dev/null +++ b/cli/replicaset/cmd/upgrade.go @@ -0,0 +1,279 @@ +package replicasetcmd + +import ( + _ "embed" + "errors" + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/tarantool/tt/cli/connector" + "github.com/tarantool/tt/cli/replicaset" + "github.com/tarantool/tt/cli/running" +) + +var ( + ChosenReplicasetAliases []string + LsnTimeout int +) + +type InstanceMeta struct { + run running.InstanceCtx + conn connector.Connector +} + +//go:embed lua/upgrade.lua +var upgradeMasterLua string + +type SyncInfo struct { + LSN uint64 `mapstructure:"lsn"` + IID uint32 `mapstructure:"iid"` + Err *string `mapstructure:"err"` +} + +// "FilterReplicasetsByAliases" filters the given replicaset list by chosen aliases +// and returns the chosen replicasets. If a non-existent alias is found, it returns an error. +func FilterReplicasetsByAliases(replicasets replicaset.Replicasets) ([]replicaset.Replicaset, + error) { + // If no aliases are provided, return all replicasets. + if len(ChosenReplicasetAliases) == 0 { + return replicasets.Replicasets, nil + } + + // Create a map for fast lookup of replicasets by alias + replicasetMap := make(map[string]replicaset.Replicaset) + for _, rs := range replicasets.Replicasets { + replicasetMap[rs.Alias] = rs + } + + var chosenReplicasets []replicaset.Replicaset + for _, alias := range ChosenReplicasetAliases { + rs, exists := replicasetMap[alias] + if !exists { + return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias) + } + chosenReplicasets = append(chosenReplicasets, rs) + } + + return chosenReplicasets, nil +} + +func Upgrade(discoveryCtx DiscoveryCtx) error { + replicasets, err := GetReplicasets(discoveryCtx) + if err != nil { + // This may be a single-instance application without Tarantool-3 config + // or instances.yml file. + if len(discoveryCtx.RunningCtx.Instances) == 1 { + // Create a dummy replicaset + var replicasetList []replicaset.Replicaset + var dummyReplicaset replicaset.Replicaset + var instance replicaset.Instance + + instance.InstanceCtx = discoveryCtx.RunningCtx.Instances[0] + instance.Alias = running.GetAppInstanceName(instance.InstanceCtx) + instance.InstanceCtxFound = true + + dummyReplicaset.Alias = instance.Alias + dummyReplicaset.Instances = append(dummyReplicaset.Instances, instance) + replicasetList = append(replicasetList, dummyReplicaset) + + return internalUpgrade(replicasetList) + } + return err + } + + replicasets = FillAliases(replicasets) + replicasetsToUpgrade, err := FilterReplicasetsByAliases(replicasets) + if err != nil { + return err + } + + return internalUpgrade(replicasetsToUpgrade) +} + +func internalUpgrade(replicasets []replicaset.Replicaset) error { + for _, replicaset := range replicasets { + err := upgradeReplicaset(replicaset) + if err != nil { + fmt.Printf("• %s: error\n", replicaset.Alias) + return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err) + } + fmt.Printf("• %s: ok\n", replicaset.Alias) + } + return nil +} + +func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + if fullInstanceName == "" { + fullInstanceName = instance.Alias + } + if fullInstanceName == "" { + fullInstanceName = "unknown" + } + + // Try to connect via unix socket + conn, err := connector.Connect(connector.ConnectOpts{ + Network: "unix", + Address: run.ConsoleSocket, + }) + + if err != nil { + // try to connect via TCP [experimental] + conn, err = connector.Connect(connector.ConnectOpts{ + Network: "tcp", + Address: instance.URI, + Username: "client", // should be opt + Password: "secret", // should be opt + }) + if err != nil { + return nil, fmt.Errorf("instance %s failed to connect via both TCP "+ + "and UNIX socket [%s]: %w", fullInstanceName, instance.URI, err) + } + } + return conn, nil +} + +func СollectRWROInfo(replicaset replicaset.Replicaset, master **InstanceMeta, + replicas *[]InstanceMeta) error { + for _, instance := range replicaset.Instances { + run := instance.InstanceCtx + fullInstanceName := running.GetAppInstanceName(run) + conn, err := getInstanceConnector(instance) + + if err != nil { + return err + } + + var isRW bool + if instance.Mode.String() != "unknown" { + isRW = instance.Mode.String() == "rw" + } else { + res, err := conn.Eval( + "return (type(box.cfg) == 'function') or box.info.ro", + []any{}, connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("[%s]: %w", fullInstanceName, err) + } + isRW = !res[0].(bool) + } + + if isRW && *master != nil { + return fmt.Errorf("%s and %s are both masters", + running.GetAppInstanceName((*master).run), fullInstanceName) + } else if isRW { + *master = &InstanceMeta{run, conn} + } else { + *replicas = append(*replicas, InstanceMeta{run, conn}) + } + } + return nil +} + +func WaitLSN(conn connector.Connector, masterIID uint32, masterLSN uint64) error { + var lastError error + query := fmt.Sprintf("return box.info.vclock[%d]", masterIID) + + deadline := time.Now().Add(time.Duration(LsnTimeout) * time.Second) + for { + res, err := conn.Eval(query, []any{}, connector.RequestOpts{}) + if err != nil { + lastError = fmt.Errorf("failed to evaluate LSN query: %w", err) + } else if len(res) == 0 { + lastError = errors.New("empty result from LSN query") + } else { + var lsn uint64 + if err := mapstructure.Decode(res[0], &lsn); err != nil { + lastError = fmt.Errorf("failed to decode LSN: %w", err) + } else if lsn >= masterLSN { + return nil + } else { + lastError = fmt.Errorf("current LSN %d is behind required "+ + "master LSN %d", lsn, masterLSN) + } + } + + if time.Now().After(deadline) { + break + } + + time.Sleep(1 * time.Second) + } + + return lastError +} + +func upgradeMaster(master *InstanceMeta) (SyncInfo, error) { + var upgradeInfo SyncInfo + fullMasterName := running.GetAppInstanceName(master.run) + res, err := master.conn.Eval(upgradeMasterLua, []any{}, connector.RequestOpts{}) + if err != nil { + return upgradeInfo, fmt.Errorf( + "failed to execute upgrade script on master instance - %s: %w", + fullMasterName, err) + } + + if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil { + return upgradeInfo, fmt.Errorf( + "failed to decode response from master instance - %s: %w", + fullMasterName, err) + } + + if upgradeInfo.Err != nil { + return upgradeInfo, fmt.Errorf( + "master instance upgrade failed - %s: %w", + fullMasterName, err) + } + return upgradeInfo, nil +} + +func Snapshot(instance *InstanceMeta) error { + res, err := instance.conn.Eval("return box.snapshot()", []any{}, + connector.RequestOpts{}) + if err != nil { + return fmt.Errorf("failed to execute snapshot on replica: %w", err) + } + if len(res) == 0 { + return fmt.Errorf("snapshot command on %s returned an empty result, "+ + "'ok' - expected", running.GetAppInstanceName(instance.run)) + } + return nil +} + +func upgradeReplicaset(replicaset replicaset.Replicaset) error { + var master *InstanceMeta = nil + replicas := []InstanceMeta{} + + err := СollectRWROInfo(replicaset, &master, &replicas) + if err != nil { + return err + } + + // upgrade master instance, collect LSN and IID from master instance + upgradeInfo, err := upgradeMaster(master) + if err != nil { + return err + } + + // upgrade replica instances + masterLSN := upgradeInfo.LSN + masterIID := upgradeInfo.IID + + for _, replica := range replicas { + fullReplicaName := running.GetAppInstanceName(replica.run) + err := WaitLSN(replica.conn, masterIID, masterLSN) + if err != nil { + return fmt.Errorf("can't ensure that upgrade operations performed on %s "+ + "are replicated to %s to perform snapshotting on it: error "+ + "waiting LSN %d in vclock component %d: %w", + running.GetAppInstanceName(master.run), fullReplicaName, + masterLSN, masterIID, err) + } + err = Snapshot(&replica) + if err != nil { + return err + } + } + return nil +} diff --git a/test/integration/replicaset/single-t2-app/00000000000000000004.snap b/test/integration/replicaset/single-t2-app/00000000000000000004.snap new file mode 100644 index 0000000000000000000000000000000000000000..80e614a6e2f73713e6e9f3da800cb98d1f253c32 GIT binary patch literal 5082 zcmV<06D90ZPC-x#FfK7O3RY!ub7^mGIv_GGF)=PQEif%-IAdjEVrDX9VPpzPZgX^D zZewLSAT?q&V`efrF)d>@WnnEeI51-^VKFsfEjeK_Gc;s5W;bCnHVRflY;R+0Iv{&7 zIv_NC3JTS_3%bn{Tma4}cR=-}0000004TLD{Qy{R<#K1Q>w+g9fm$3JLLhrzDb@BpXxY=E|9BiYZJ_B=KGP|7{le;b!NmnIAv= z#`7tWYXW=%B?6huC%=_?-%9hmmAA4%ThB-&C^8!1B{n9u_?FbAZVJOoql0t-Maewm!x@a827 zf*7m;p(`@rz!ifUxZ)Lo7`QDEbh{}4bh{<6fRCbp_Xs55{Q*&KxXuu`;resH4Xtjt zLYaMEK0mduCs1~}->iH&@$m*c@xA~j-uD==c?#H@fS$bv;K8;g1Yq0F0N93YOqxVR z39u6t6#$c{C>;U3UlM@#N_4zWvZQw;N_sz%q<14o&J%zm=UK$adES)>0N;=z;^rat zS~Vd=#%o3M;CR6`$dFsVJKL~wgq*pfyBvaBvLHFK*wn4x{5$?xm`KPrhDSnQSR)epK3XV53&lGmQo&K+bo_m)!yRs>V`DKXbi(v>m|!UoB;X`P z5kwXX6J=Bxl_TVn*`DpytnKC8ZZX@ly)RB!)ojQ1o}L1U+1}F%NSu%NM<6wxAAsDa z+cK}=^VlP+c<7PUtlp7FPwl{?r*+)X)1J~{N1D!2N1Dn( zN4mx_$M*$?9JQ}E;z+H?^vex6`fuZn{@HM&|Nhr#<9{{S_&<#`{$J5hI!7Jj`Vi|@+{ENWj>T(SFC3GDuh3M;ywqKfXNprZTk zds8_1Fe#jT6g?;J=j{)s5OcLECUn|Q)|CYdH%rvmxKSd^qVMWq z7C|t3Hc7n69tk(uACW}%*&Bfb_C*|lJrPD=kNpruT;GO>>)8x({qYa_!b~t>VZuxm zq{2*dO#w{UHc?Pvo7?=7f;LY;L7P!uw5iS838o1ZWt!{L&9pLC73wynPq*o-x9K+d z)aU+vcJe;^jCL|}Co*r83}n#Qr3P^ShCA&a)UsL_7(a39h9s87)0M26;`MKUY|2~&s`P17~ z{`7Y2Q&4AzqJ%pgRS?StA<*&H7ySO}u77b4clbPcxZ@}k-f4Y9Q0sJy7>e7(eLd)N zSD3%gJD(kYEP_6Jc^rlp)J`7Fza;J?dA0oR>BoKNvj)%GFS-Tdoqx)E*G=TMEApn~ zzB()Y*fWWJ5$<#Zbr$G=Diz$>*{okwmvdE6Slk)JvOhm&RB+4sK>1e$dG?9VzF2c^ z)8P%5;SF`a-|ux$cjq(LF^T`qI~0F^WoF~l342h>I(bq?1$VZ`$-igkBNln}sQWwR zAt-8`N3Na8jj_srSeD0tC!AG(^E(yfvP}v=@0s%-YMpM-XD*!TtLkq1=B94g%#l0y@3F$*z{H^biBH`Wd});E)Y2qbl5absX^<%S zz>XxTff+%|Nc7e2 zkKuh)C$-DWMSUs5XX0wxTF-EN+H01tpA?k|E`Q$@!v!WlBM&1%;z8v=f4& zW;Y+Go~I_Zr&DcGYGpr8oH%i$u%0J|eaC9vX;u#+i&-)=Bd3nh)axu1G|U>soy+EM zta-l*v$mxumX&QVJD=j!SzBf{bjHL;K#nDP^EzqEm9T*>A>gMQmp_DVdBSR`5<$DA z7ea3tYDsnM)2r`sO6qK^%xp+2b@7tJQHdM>1OWw!musNkvG@1qCkTr?1R{cy#H5t7 zN#+o(3YDQMMYNwOG@)-u=&KFYf;a;viAgC(^~Q}Gyt$a2j#+1tn3QrcKJF}~W1mWq z0>y1YSj!~##SEw^Cd*$+*_gAR{3A|dD~dQVWU?{cENj9NsHFspg#;`kSR@gULO259 z=z}8<4q`@3Ik+)KOEfPu4rpD5HW<=ih)YlgK}u9XTY$6xWuXGXGQ$8jyYh1Z z?#BWjD?SzYB$fV_Pmr8Y;A8?P67)pS13^y}nT+Si;;7tM9wQzqO{O#{vUZ1P=ZoQ} z-r&?J?-WZ4Bb+8a*!*UTsW9+c)>-)^F$NiNBVV42N0m*zY{K^Z(d` zn`iH_2Z69(hIUzFxa@yG+wR7o&s!o7ufES!g&@yrDO{R|TUSKW&9%aRPaW6B{i0f; z&Shxk?|l3Bn}?t|f;=k5?S)d>lj(J}i8&yWqTMU{0q%XPT!%GR)JVGmI%FYobssG>TMClxLQ%Het90 zIOSehFk>8OAP?e&{ zRo;TE%H3O(JJ=c?hGUtbkARVP5~C2fYb=dR2qwobY|(1KFi(P-c#meZ`S`g2I(4MN zGR4Fr@w*g|3^0*{wugMno*e^;W7Ho`SZcYYR-$0i{fdYTme=g3pJv2SmC*)}$=fs6 z(_K+IzTAORXdEx9|%vJ%>`yBzz(ADdxPWsVRjxV1PH_KJ9mQDy)9Ea zST++mfmM@eCTgOsrnnh|j6AYrm{=wp-ViYTM4k*LAQFEH(sKylAJ!K{ctPnE>gcm1 zo2f;%DGa$$Gyx|mSG5Wk^h8U|f;7WJOt9Aa+0S}&1N2jJJBQEKnnW{py}F+-^~CA{ zVY3u7QXq+cvJFra+ttc`RpZ4Wqnp#B&58Nj$0s}(a|kX(hF$$uuRzvO|E$^gbfP$W zGa&HzO}v!^z?atVNJxj6t))KB_^h($E{OY&MOF~ z86<9Ms}edoFqX_<++i9{mMiN<5DsE$l34^vI4DL|jG|Qphci^b^Rp}>&d7;Nl(bvY zs5)r63^m_387F^bD;bZ7w7hPRRbuXyvNpZ3R+`V=;Elw7>`B~BDVDsx4n&ZXgLh%L zVl%|IImMwExS!V7S!wSbb#)H?AS%PiL9aZ9HZ9p5Ln9g;9IdD5xMTcG8c9ALw z0~PQ(D6r*LpchLVHP+&l8(h$+iFw7~1fLX|ocdi=%Cb0n8~VjMwSz%3Aw^ur0Zz0V z_#m6wVuj{Jt|HQG-J)~P=mrxZex_JqeiXe6e#E?7rnQst$h6#fOBt}x zvt60LT6LS$FBKgIXVqc{xRO@>wG6_Z9X2wFywyN)P60l0ibh9a#CgOI@i>)0B8EH^ z`iRn#j}89c?S%3L(dvVR2CjIAj#RoT*}cL-jeDx;jj+{CS%AEtM>6*(P8i+f&*J{3dr07asz}4K?1ga8%GYLiul=GGbP8Xp*NA4*a+Jy< zZ7(|B<6>!^GSC@cYp@NcwgH6-&=0xwt^56C=-f$v|9BzP}{ z%X_@?)eyJKMPBYevy_0|-bs=TtwUve1TcEr4<#k&vcGY{G{k8kuHjJRYUjivqMs;+ zjGz|L6mDoOBSbHV3nJ`WZP|>_zZA#{sp#-JokS-A_{R>eqDeYJE))*ziTL{an;U8T zy};DYv3g=AO=xhE`8o3!T7amSF%9W}ED*x{)s5Yl+)GKmkQRinx7xBAxjjjMmjZzi z^_Gq;MloL!Hd#avBK*?DYDo4?0bP^`K-#gjw;0R+TNCHNj--7#fsGJLd_A?4D|mq(p5I}`#?3qU3DGaIp78Q#wL(h;~%<2xG z&f-M5K}s<&5fKseH}4(Q>tFB2U3j;R?G>W_ZQHMGT=?XYDo`QldYCX{A}8fLVTnFO zVguWxN<&1`^}7u=JOM(zUJ=T7MFYxr9EW1t(qZ5EUVbnf+s2X-hd}UWLZzzBJS&9e zSni!ZMm*J3_#4Vg=A1d==IROECgK^@~{W33Ms4P>e!R>r1mexUkoEe wAXeBbVMKJ7-b$p;P7`$xj#wu8KLQT$V21UcgN~IN7wjX8lEfjRE7cIK?Q%GgX8-^I literal 0 HcmV?d00001 diff --git a/test/integration/replicaset/single-t2-app/init.lua b/test/integration/replicaset/single-t2-app/init.lua new file mode 100644 index 000000000..8d5d71d86 --- /dev/null +++ b/test/integration/replicaset/single-t2-app/init.lua @@ -0,0 +1,12 @@ +local fiber = require('fiber') +local fio = require('fio') + +box.cfg({}) + +fh = fio.open('ready', {'O_WRONLY', 'O_CREAT'}, tonumber('644',8)) +fh:close() + +while true do + fiber.sleep(5) +end + diff --git a/test/integration/replicaset/single-t2-app/tt.yaml b/test/integration/replicaset/single-t2-app/tt.yaml new file mode 100644 index 000000000..ee312214a --- /dev/null +++ b/test/integration/replicaset/single-t2-app/tt.yaml @@ -0,0 +1,9 @@ +env: + instances_enabled: . + +default: + app: + dir: . + file: init.lua + memtx_dir: ./ + wal_dir: ./ diff --git a/test/integration/replicaset/test_replicaset_upgrade.py b/test/integration/replicaset/test_replicaset_upgrade.py new file mode 100644 index 000000000..b914e4de8 --- /dev/null +++ b/test/integration/replicaset/test_replicaset_upgrade.py @@ -0,0 +1,222 @@ +import os +import re +import shutil +import subprocess +import tempfile + +import pytest +from replicaset_helpers import stop_application +from vshard_cluster import VshardCluster + +from utils import get_tarantool_version, run_command_and_get_output, wait_file + +tarantool_major_version, _ = get_tarantool_version() + + +def run_command_on_instance(tt_cmd, tmpdir, full_inst_name, cmd): + con_cmd = [tt_cmd, "connect", full_inst_name, "-f", "-"] + instance_process = subprocess.Popen( + con_cmd, + cwd=tmpdir, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + text=True, + ) + instance_process.stdin.writelines([cmd]) + instance_process.stdin.close() + output = instance_process.stdout.read() + return output + + +@pytest.mark.skipif( + tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3" +) +def test_upgrade_cluster(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + app_name = "test_ccluster_app" + app_path = os.path.join(tmpdir, app_name) + shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path) + + replicasets = [ + "replicaset-001", + "replicaset-002", + ] + + try: + # Start a cluster. + start_cmd = [tt_cmd, "start", app_name] + rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir) + assert rc == 0 + + for i in range(1, 6): + file = wait_file( + os.path.join(tmpdir, app_name), f"ready-instance-00{i}", [] + ) + assert file != "" + + _ = run_command_on_instance( + tt_cmd, tmpdir, "test_ccluster_app:instance-004", "box.cfg{read_only=true}" + ) + + status_cmd = [tt_cmd, "replicaset", "upgrade", app_name] + + rc, out = run_command_and_get_output(status_cmd, cwd=tmpdir) + assert rc == 0 + + upgrade_out = out.strip().split("\n") + assert len(upgrade_out) == len(replicasets) + + for i in range(len(replicasets)): + match = re.search(r"•\s*(.*?):\s*(.*)", upgrade_out[i]) + assert match.group(1) in replicasets + assert match.group(2) == "ok" + + finally: + stop_application(tt_cmd, app_name, tmpdir, []) + + +@pytest.mark.skipif( + tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3" +) +def test_upgrade_multi_master(tt_cmd, tmpdir_with_cfg): + tmpdir = tmpdir_with_cfg + app_name = "test_ccluster_app" + app_path = os.path.join(tmpdir, app_name) + shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path) + try: + # Start a cluster. + start_cmd = [tt_cmd, "start", app_name] + rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir) + assert rc == 0 + + for i in range(1, 6): + file = wait_file( + os.path.join(tmpdir, app_name), f"ready-instance-00{i}", [] + ) + assert file != "" + + status_cmd = [tt_cmd, "replicaset", "upgrade", app_name] + + rc, out = run_command_and_get_output(status_cmd, cwd=tmpdir) + assert rc == 1 + assert "replicaset-002: error" in out and "are both masters" in out + + finally: + stop_application(tt_cmd, app_name, tmpdir, []) + + +@pytest.mark.skipif( + tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3" +) +def test_upgrade_t2_app_dummy_replicaset(tt_cmd): + app_name = "single-t2-app" + test_app_path_src = os.path.join(os.path.dirname(__file__), app_name) + + # snapshot from tarantool 2.11.4 app + snapfile = os.path.join(test_app_path_src, "00000000000000000004.snap") + + with tempfile.TemporaryDirectory() as tmpdir: + test_app_path = os.path.join(tmpdir, app_name) + shutil.copytree(test_app_path_src, test_app_path) + memtx_dir = os.path.join(test_app_path, "var", "lib", app_name) + os.makedirs(memtx_dir, exist_ok=True) + shutil.copy(snapfile, memtx_dir) + + try: + start_cmd = [tt_cmd, "start", app_name] + rc, out = run_command_and_get_output(start_cmd, cwd=test_app_path) + assert rc == 0 + + file = wait_file(test_app_path, "ready", []) + assert file != "" + + out = run_command_on_instance( + tt_cmd, + test_app_path, + app_name, + "return box.space.example_space:select{2}", + ) + assert "[2, 'Second record']" in out + + upgrade_cmd = [tt_cmd, "replicaset", "upgrade", app_name] + rc, out = run_command_and_get_output(upgrade_cmd, cwd=test_app_path) + assert rc == 0 + assert out == "• single-t2-app: ok\n" + finally: + stop_application(tt_cmd, app_name, test_app_path, []) + + +@pytest.mark.skipif(tarantool_major_version < 3, + reason="skip test with cluster config for Tarantool < 3") +def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path): + app_name = "vshard_app" + replicasets = { + "router-001": ["router-001-a"], + "storage-001": ["storage-001-a", "storage-001-b"], + "storage-002": ["storage-002-a", "storage-001-a"], + } + app = VshardCluster(tt_cmd, tmp_path, app_name) + try: + app.build() + app.start() + cmd_master = '''box.space._schema:run_triggers(false) +box.space._schema:delete('replicaset_name') +box.space._schema:run_triggers(true) + +box.space._cluster:run_triggers(false) +box.atomic(function() + for _, tuple in box.space._cluster:pairs() do + pcall(box.space._cluster.update, box.space._cluster, {tuple.id}, {{'#', 'name', 1}}) + end +end) +box.space._cluster:run_triggers(true) +box.schema.downgrade('2.11.1') +box.snapshot() + ''' + # downgrade cluster + for replicaset_name, replicaset in replicasets.items(): + _ = run_command_on_instance( + tt_cmd, + tmp_path, + f"{app_name}:{replicaset[0]}", + cmd_master + ) + if len(replicaset) == 2: + _ = run_command_on_instance( + tt_cmd, + tmp_path, + f"{app_name}:{replicaset[1]}", + "box.snapshot()" + ) + + out = run_command_on_instance( + tt_cmd, + tmp_path, + f"{app_name}:storage-001-a", + "box.schema.space.create('example_space')" + ) + assert "error: Your schema version is 2.11.1" in out + + upgrade_cmd = [tt_cmd, "replicaset", "upgrade", app_name] + + rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmp_path) + + assert rc == 1 + pattern = r"error waiting LSN (\d+) in vclock component (\d+):" + match = re.search(pattern, out) + assert match + + # For some reason LSN is not updated to master LSN even when timeout is increased. + # + # assert rc == 0 + # upgrade_out = out.strip().split("\n") + # assert len(upgrade_out) == len(replicasets) + + # for i in range(len(replicasets)): + # match = re.search(r"•\s*(.*?):\s*(.*)", upgrade_out[i]) + # assert match.group(1) in replicasets + # assert match.group(2) == "ok" + + finally: + app.stop()