Skip to content

Commit

Permalink
tt replicaset upgrade: support remote replicaset
Browse files Browse the repository at this point in the history
Part of tarantool#968

@TarantoolBot document
Title: `tt replicaset upgrade` supports upgrade database schema
on remote replica sets

`tt replicaset upgrade` now allows upgrade database schema on remote
replica sets within a Tarantool cluster. The feature enables to perform
upgrades by targeting specific instances of each replica set.

**Usage Example**:

Cluster configuration:
```
• router-001
    • router-001-a localhost:3305 rw
• storage-001
    • storage-001-a localhost:3301 rw
    • storage-001-b localhost:3302 read
• storage-002
    • storage-002-a localhost:3303 rw
    • storage-002-b localhost:3304 read
```

To update the schema across this cluster:

1. Update each replica set individually.
2. Select one instance from each replica set to run the command.

**Commands**:
```bash
$ tt replicaset upgrade tcp://client:secret@127.0.0.1:3301
  • storage-001: ok
$ tt replicaset upgrade tcp://client:secret@127.0.0.1:3304
  • storage-002: ok
$ tt replicaset upgrade tcp://client:secret@127.0.0.1:3305
  • router-001: ok
```
  • Loading branch information
mandesero committed Nov 27, 2024
1 parent 7d8a6cf commit cc2a233
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `tt replicaset upgrade`: command to upgrade the schema on a Tarantool cluster.
* `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
* `-t (--timeout)`: timeout for waiting the LSN synchronization (in seconds) (default 5).
* supports upgrading the database schema on remote cluster by upgrading
each replica set individually using `tt replicaset upgrade <URI>`.
- New flag `--timestamp` of `tt cat` and `tt play` commands is added to specify
operations ending with the given timestamp. This value can be specified
as a number or using [RFC3339/RFC3339Nano](https://go.dev/src/time/format.go) time format.
Expand Down
18 changes: 16 additions & 2 deletions cli/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ var (
// newUpgradeCmd creates a "replicaset upgrade" command.
func newUpgradeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade (<APP_NAME>) [flags]",
Use: "upgrade (<APP_NAME> | <URI>) [flags]\n\n" +
replicasetUriHelp,
DisableFlagsInUseLine: true,
Short: "Upgrade tarantool cluster",
Long: "Upgrade tarantool cluster.\n\n" +
Expand All @@ -79,6 +80,7 @@ func newUpgradeCmd() *cobra.Command {
"timeout for waiting the LSN synchronization (in seconds)")

addOrchestratorFlags(cmd)
addTarantoolConnectFlags(cmd)
return cmd
}

Expand Down Expand Up @@ -530,6 +532,18 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
if ctx.IsInstanceConnect {
defer ctx.Conn.Close()
}

connectCtx := connect.ConnectCtx{
Username: replicasetUser,
Password: replicasetPassword,
SslKeyFile: replicasetSslKeyFile,
SslCertFile: replicasetSslCertFile,
SslCaFile: replicasetSslCaFile,
SslCiphers: replicasetSslCiphers,
}
var connOpts connector.ConnectOpts
connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args)

return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{
IsApplication: ctx.IsApplication,
RunningCtx: ctx.RunningCtx,
Expand All @@ -538,7 +552,7 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
}, replicasetcmd.UpgradeOpts{
ChosenReplicasetAliases: chosenReplicasetAliases,
LsnTimeout: lsnTimeout,
})
}, connOpts)
}

// internalReplicasetPromoteModule is a "promote" command for the replicaset module.
Expand Down
57 changes: 41 additions & 16 deletions cli/replicaset/cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func filterReplicasetsByAliases(replicasets replicaset.Replicasets,
}

// Upgrade upgrades tarantool schema.
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts, connOpts connector.ConnectOpts) error {
replicasets, err := getReplicasets(discoveryCtx)
if err != nil {
return err
Expand All @@ -75,12 +75,13 @@ func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
return err
}

return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout)
return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout, connOpts)
}

func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int) error {
func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
for _, replicaset := range replicasets {
err := upgradeReplicaset(replicaset, lsnTimeout)
err := upgradeReplicaset(replicaset, lsnTimeout, connOpts)
if err != nil {
fmt.Printf("• %s: error\n", replicaset.Alias)
return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
Expand All @@ -99,7 +100,8 @@ func closeConnectors(master *instanceMeta, replicas []instanceMeta) {
}
}

func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) {
func getInstanceConnector(instance replicaset.Instance,
connOpts connector.ConnectOpts) (connector.Connector, error) {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
if fullInstanceName == "" {
Expand All @@ -116,33 +118,55 @@ func getInstanceConnector(instance replicaset.Instance) (connector.Connector, er
})

if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
": %w", fullInstanceName, err)
fErr := err
conn, err = connector.Connect(connector.ConnectOpts{
Network: connOpts.Network,
Address: instance.URI,
Username: connOpts.Username,
Password: connOpts.Password,
Ssl: connOpts.Ssl,
})
if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
"and uri: %w %w", fullInstanceName, err, fErr)
}
}
return conn, nil
}

func collectRWROInfo(replset replicaset.Replicaset) (*instanceMeta, []instanceMeta,
func collectRWROInfo(replset replicaset.Replicaset,
connOpts connector.ConnectOpts) (*instanceMeta, []instanceMeta,
error) {
var master *instanceMeta = nil
var replicas []instanceMeta
for _, instance := range replset.Instances {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
conn, err := getInstanceConnector(instance)
conn, err := getInstanceConnector(instance, connOpts)

if err != nil {
return nil, nil, err
}

isRW := false

if instance.Mode == replicaset.ModeUnknown {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s", fullInstanceName)
// TODO (gh-1034): Temporary workaround for mode assignment.
// Ideally, the discovery code should be updated to handle
// this case properly.
res, err := conn.Eval(
"return (type(box.cfg) == 'function') or box.info.ro",
[]any{}, connector.RequestOpts{})
if err != nil || len(res) == 0 {
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s",
fullInstanceName)
}
isRW = !res[0].(bool)
} else {
isRW = instance.Mode.String() == "rw"
}

isRW := instance.Mode.String() == "rw"

if isRW && master != nil {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf("%s and %s are both masters",
Expand Down Expand Up @@ -231,8 +255,9 @@ func snapshot(instance *instanceMeta) error {
return nil
}

func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int) error {
master, replicas, err := collectRWROInfo(replicaset)
func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
master, replicas, err := collectRWROInfo(replicaset, connOpts)
if err != nil {
return err
}
Expand Down
28 changes: 28 additions & 0 deletions test/integration/replicaset/small_cluster_app/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
credentials:
users:
client:
password: 'secret'
roles: [super]
guest:
roles: [super]

app:
file: 'init.lua'

groups:
group-001:
replicasets:
replicaset-001:
instances:
storage-master:
iproto:
listen:
- uri: '127.0.0.1:3301'
database:
mode: rw
storage-replica:
iproto:
listen:
- uri: '127.0.0.1:3302'
database:
mode: ro
4 changes: 4 additions & 0 deletions test/integration/replicaset/small_cluster_app/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
local fio = require('fio')

fh = fio.open('ready-' .. box.info.name, {'O_WRONLY', 'O_CREAT'}, tonumber('644',8))
fh:close()
4 changes: 4 additions & 0 deletions test/integration/replicaset/small_cluster_app/instances.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
storage-master:

storage-replica:

25 changes: 24 additions & 1 deletion test/integration/replicaset/test_replicaset_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import tempfile

import pytest
from replicaset_helpers import stop_application
from replicaset_helpers import start_application, stop_application
from vshard_cluster import VshardCluster

from utils import get_tarantool_version, run_command_and_get_output, wait_file
Expand Down Expand Up @@ -192,3 +192,26 @@ def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path):

finally:
app.stop()


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_upgrade_remote_replicasets(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
app_name = "small_cluster_app"
app_path = os.path.join(tmpdir, app_name)
shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)
instances = ['storage-master', 'storage-replica']

try:
start_application(tt_cmd, tmpdir, app_name, instances)
uri = "tcp://client:secret@127.0.0.1:3301"
upgrade_cmd = [tt_cmd, "replicaset", "upgrade", uri, "-t=15"]
rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmpdir)
assert rc == 0
assert "ok" in out

finally:
stop_cmd = [tt_cmd, "stop", app_name, "-y"]
stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=tmpdir)
assert stop_rc == 0

0 comments on commit cc2a233

Please sign in to comment.