From d56f62c004ffcd9863ec54890692d0c4fd3f9f00 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 30 Jan 2023 17:51:46 +0800 Subject: [PATCH 1/6] prevent from restore point to cluster running log backup Signed-off-by: Leavrth --- br/pkg/restore/client.go | 9 +++++---- br/pkg/task/stream.go | 31 ++++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 18a3dc61879e4..a7c3dd2631708 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2088,15 +2088,16 @@ func (rc *Client) RestoreKVFiles( return errors.Trace(err) }) + if err = eg.Wait(); err != nil { + summary.CollectFailureUnit("file", err) + log.Error("restore files failed", zap.Error(err)) + } + log.Info("total skip files due to table id not matched", zap.Int("count", skipFile)) if skipFile > 0 { log.Debug("table id in full backup storage", zap.Any("tables", rules)) } - if err = eg.Wait(); err != nil { - summary.CollectFailureUnit("file", err) - log.Error("restore files failed", zap.Error(err)) - } return errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 2ffa7bc7dd9af..9a746da6a5cf8 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -868,8 +868,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } -func checkConfigForStatus(cfg *StreamConfig) error { - if len(cfg.PD) == 0 { +func checkConfigForStatus(pd []string) error { + if len(pd) == 0 { return errors.Annotatef(berrors.ErrInvalidArgument, "the command needs access to PD, please specify `-u` or `--pd`") } @@ -919,7 +919,7 @@ func RunStreamStatus( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkConfigForStatus(cfg); err != nil { + if err := checkConfigForStatus(cfg.PD); err != nil { return err } ctl, err := makeStatusController(ctx, cfg, g) @@ -1034,6 +1034,27 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } +// checkTaskExists checks whether there is a log backup task running. +// If so, return an error. +func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { + if err := checkConfigForStatus(cfg.PD); err != nil { + return err + } + etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config) + if err != nil { + return err + } + cli := streamhelper.NewMetaDataClient(etcdCLI) + tasks, err := cli.GetAllTasks(ctx) + if err != nil { + return err + } + if len(tasks) > 0 { + return errors.Errorf("There is a log backup task running: %s, please stop the task before restore", tasks[0].Info.Name) + } + return nil +} + // RunStreamRestore restores stream log. func RunStreamRestore( c context.Context, @@ -1050,6 +1071,10 @@ func RunStreamRestore( ctx = opentracing.ContextWithSpan(ctx, span1) } + if err := checkTaskExists(ctx, cfg); err != nil { + return errors.Trace(err) + } + logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) From 2035e4b38912c806ef4bf68f16b7d1f55c6e5ba9 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 30 Jan 2023 18:40:49 +0800 Subject: [PATCH 2/6] close the etcd client in time Signed-off-by: Leavrth --- br/pkg/task/stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 9a746da6a5cf8..0acefb06bf53e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1045,6 +1045,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } cli := streamhelper.NewMetaDataClient(etcdCLI) + defer cli.Close() tasks, err := cli.GetAllTasks(ctx) if err != nil { return err From c6402dc9eb28a7640f7379f66779d54969d04450 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 3 Feb 2023 16:26:42 +0800 Subject: [PATCH 3/6] resolve call cycle Signed-off-by: Leavrth --- br/pkg/backup/push.go | 3 +-- br/pkg/task/restore.go | 7 +++++++ br/pkg/task/stream.go | 6 +----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 2ffffe690ffe5..ed929a12895a4 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -218,10 +218,9 @@ func (push *pushDown) pushBackup( if len(errMsg) <= 0 { errMsg = errPb.Msg } - return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s", + return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s", store.GetId(), redact.String(store.GetAddress()), - req.StorageBackend.String(), errMsg, ) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 8a5cd0425e221..293468e1c5c0a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -474,10 +474,17 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { + if err := checkTaskExists(c, cfg); err != nil { + return errors.Trace(err) + } + if IsStreamRestore(cmdName) { return RunStreamRestore(c, g, cmdName, cfg) } + return runRestore(c, g, cmdName, cfg) +} +func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { cfg.Adjust() defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 0acefb06bf53e..ff26ea13eb656 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1072,10 +1072,6 @@ func RunStreamRestore( ctx = opentracing.ContextWithSpan(ctx, span1) } - if err := checkTaskExists(ctx, cfg); err != nil { - return errors.Trace(err) - } - logInfo, err := getLogRange(ctx, &cfg.Config) if err != nil { return errors.Trace(err) @@ -1121,7 +1117,7 @@ func RunStreamRestore( logStorage := cfg.Config.Storage cfg.Config.Storage = cfg.FullBackupStorage // TiFlash replica is restored to down-stream on 'pitr' currently. - if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil { + if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil { return errors.Trace(err) } cfg.Config.Storage = logStorage From 4392e9a5fa4de426f2fcc551d3a6e13b1349b3cc Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Fri, 3 Feb 2023 16:28:22 +0800 Subject: [PATCH 4/6] Update br/pkg/task/stream.go Co-authored-by: Zak Zhao <57036248+joccau@users.noreply.github.com> --- br/pkg/task/stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index ff26ea13eb656..83c85356691f1 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1051,7 +1051,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } if len(tasks) > 0 { - return errors.Errorf("There is a log backup task running: %s, please stop the task before restore", tasks[0].Info.Name) + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster.", tasks[0].Info.Name) } return nil } From 12a5731b88e61f93384ec1c259f69a123f87a0c4 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 7 Feb 2023 16:31:16 +0800 Subject: [PATCH 5/6] add more info in error Signed-off-by: Leavrth --- br/pkg/task/restore.go | 2 +- br/pkg/task/stream.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 25056510ffc20..2a5c58a9febf3 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -491,7 +491,7 @@ func IsStreamRestore(cmdName string) bool { // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { if err := checkTaskExists(c, cfg); err != nil { - return errors.Trace(err) + return errors.Annotate(err, "failed to check task exits") } config.UpdateGlobal(func(conf *config.Config) { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 83c85356691f1..942b1faefdc7c 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1045,7 +1045,11 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } cli := streamhelper.NewMetaDataClient(etcdCLI) - defer cli.Close() + defer func() { + if err := cli.Close(); err != nil { + log.Error("failed to close the etcd client", zap.Error(err)) + } + }() tasks, err := cli.GetAllTasks(ctx) if err != nil { return err From 0b3c1cb23b970835cdaf70c49165f1de64687922 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 7 Feb 2023 18:24:43 +0800 Subject: [PATCH 6/6] add integration test Signed-off-by: Leavrth --- br/pkg/task/stream.go | 2 +- br/tests/br_restore_log_task_enable/run.sh | 56 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 br/tests/br_restore_log_task_enable/run.sh diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 942b1faefdc7c..40f851048da37 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1055,7 +1055,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { return err } if len(tasks) > 0 { - return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster.", tasks[0].Info.Name) + return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } return nil } diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh new file mode 100644 index 0000000000000..923f8fe7c2b33 --- /dev/null +++ b/br/tests/br_restore_log_task_enable/run.sh @@ -0,0 +1,56 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux +DB="$TEST_NAME" +TABLE="usertable" + +# start log task +run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR + +run_sql "CREATE DATABASE $DB;" +run_sql "CREATE TABLE $DB.$TABLE (id int);" +run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);" + +# backup full +run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB;" + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# pause log task +run_br log pause --task-name 1234 --pd $PD_ADDR + +# restore full (should be failed) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 + +# restore point (should be failed) +run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 + +# stop log task +run_br log stop --task-name 1234 --pd $PD_ADDR + +# restore full (should be success) +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR + +# clean db +run_sql "DROP DATABASE $DB"