From 1cfc58e0baf87c96ecf151c096d8c7b1ce7f3b7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Mon, 29 Apr 2024 14:08:58 +0800 Subject: [PATCH] This is an automated cherry-pick of #52607 Signed-off-by: ti-chi-bot --- br/pkg/backup/prepare_snap/BUILD.bazel | 1 + br/pkg/backup/prepare_snap/prepare.go | 10 +++++++ br/pkg/task/operator/BUILD.bazel | 1 + br/pkg/task/operator/cmd.go | 29 +++++++++++------- tests/realtikvtest/brietest/operator_test.go | 31 ++++++++++++++++++++ 5 files changed, 61 insertions(+), 11 deletions(-) diff --git a/br/pkg/backup/prepare_snap/BUILD.bazel b/br/pkg/backup/prepare_snap/BUILD.bazel index ce61679db0c53..6672390ab42f6 100644 --- a/br/pkg/backup/prepare_snap/BUILD.bazel +++ b/br/pkg/backup/prepare_snap/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_google_btree//:btree", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/metapb", diff --git a/br/pkg/backup/prepare_snap/prepare.go b/br/pkg/backup/prepare_snap/prepare.go index 46f1916873831..a5b91e7784af0 100644 --- a/br/pkg/backup/prepare_snap/prepare.go +++ b/br/pkg/backup/prepare_snap/prepare.go @@ -22,6 +22,7 @@ import ( "github.com/google/btree" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" brpb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -414,7 +415,16 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) { p.inflightReqs[region.GetMeta().Id] = *region.GetMeta() } +<<<<<<< HEAD func (p *Preparer) prepareConnections(ctx context.Context) error { +======= +// PrepareConnections prepares the connections for each store. +// This will pause the admin commands for each store. +func (p *Preparer) PrepareConnections(ctx context.Context) error { + failpoint.Inject("PrepareConnectionsErr", func() { + failpoint.Return(errors.New("mock PrepareConnectionsErr")) + }) +>>>>>>> 2969b9e5767 (br/operator: fix adapt env for snapshot backup stuck when encountered error (#52607)) log.Info("Preparing connections to stores.") stores, err := p.env.GetAllLiveStores(ctx) if err != nil { diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index 52c99c845b57b..c7b8bbeb4ea23 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//br/pkg/task", "//br/pkg/utils", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//tikv", diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 726d006e17da7..5b7b7b530a58f 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -5,15 +5,13 @@ package operator import ( "context" "crypto/tls" - "fmt" - "math/rand" - "os" "runtime/debug" "strings" "sync" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -136,9 +134,26 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { defer cx.Close() cx.run(func() error { return pauseGCKeeper(cx) }) +<<<<<<< HEAD cx.run(func() error { return pauseSchedulerKeeper(cx) }) cx.run(func() error { return pauseAdminAndWaitApply(cx) }) +======= + cx.run(func() error { + log.Info("Pause scheduler waiting all connections established.") + select { + case <-initChan: + case <-cx.Done(): + return cx.Err() + } + log.Info("Pause scheduler noticed connections established.") + return pauseSchedulerKeeper(cx) + }) + cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) }) +>>>>>>> 2969b9e5767 (br/operator: fix adapt env for snapshot backup stuck when encountered error (#52607)) go func() { + failpoint.Inject("SkipReadyHint", func() { + failpoint.Return() + }) cx.rdGrp.Wait() if cfg.OnAllReady != nil { cfg.OnAllReady() @@ -182,14 +197,6 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { return nil } -func getCallerName() string { - name, err := os.Hostname() - if err != nil { - name = fmt.Sprintf("UNKNOWN-%d", rand.Int63()) - } - return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid()) -} - func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) { // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 863ae10f12ade..37eae760a96d8 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/br/pkg/task" @@ -224,3 +225,33 @@ func TestOperator(t *testing.T) { verifySchedulerNotStopped(req, cfg) verifyGCNotStopped(req, cfg) } + +func TestFailure(t *testing.T) { + req := require.New(t) + req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr", "return()")) + // Make goleak happy. + req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint", "return()")) + defer func() { + req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr")) + req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint")) + }() + + cfg := operator.PauseGcConfig{ + Config: task.Config{ + PD: []string{"127.0.0.1:2379"}, + }, + TTL: 5 * time.Minute, + SafePoint: oracle.GoTimeToTS(time.Now()), + } + + verifyGCNotStopped(req, cfg) + verifySchedulerNotStopped(req, cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg) + require.Error(t, err) + + verifyGCNotStopped(req, cfg) + verifySchedulerNotStopped(req, cfg) +}