diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 524a898c5a7e0..964faf91b3b1b 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -641,19 +641,12 @@ "name": "apple_support~1.5.0~apple_cc_configure_extension~local_config_apple_cc_toolchains" } } - }, - "recordedRepoMappingEntries": [ - [ - "apple_support~1.5.0", - "bazel_tools", - "bazel_tools" - ] - ] + } } }, "@@bazel_tools//tools/cpp:cc_configure.bzl%cc_configure_extension": { "general": { - "bzlTransitiveDigest": "mcsWHq3xORJexV5/4eCvNOLxFOQKV6eli3fkr+tEaqE=", + "bzlTransitiveDigest": "O9sf6ilKWU9Veed02jG9o2HM/xgV/UAyciuFBuxrFRY=", "accumulatedFileDigests": {}, "envVariables": {}, "generatedRepoSpecs": { @@ -671,14 +664,7 @@ "name": "bazel_tools~cc_configure_extension~local_config_cc_toolchains" } } - }, - "recordedRepoMappingEntries": [ - [ - "bazel_tools", - "bazel_tools", - "bazel_tools" - ] - ] + } } }, "@@bazel_tools//tools/osx:xcode_configure.bzl%xcode_configure_extension": { @@ -696,8 +682,7 @@ "remote_xcode": "" } } - }, - "recordedRepoMappingEntries": [] + } } }, "@@bazel_tools//tools/sh:sh_configure.bzl%sh_configure_extension": { @@ -713,13 +698,12 @@ "name": "bazel_tools~sh_configure_extension~local_config_sh" } } - }, - "recordedRepoMappingEntries": [] + } } }, "@@rules_java~7.1.0//java:extensions.bzl%toolchains": { "general": { - "bzlTransitiveDigest": "D02GmifxnV/IhYgspsJMDZ/aE8HxAjXgek5gi6FSto4=", + "bzlTransitiveDigest": "iUIRqCK7tkhvcDJCAfPPqSd06IHG0a8HQD0xeQyVAqw=", "accumulatedFileDigests": {}, "envVariables": {}, "generatedRepoSpecs": { @@ -1254,19 +1238,7 @@ "build_file": "\nconfig_setting(\n name = \"prefix_version_setting\",\n values = {\"java_runtime_version\": \"remotejdk_21\"},\n visibility = [\"//visibility:private\"],\n)\nconfig_setting(\n name = \"version_setting\",\n values = {\"java_runtime_version\": \"21\"},\n visibility = [\"//visibility:private\"],\n)\nalias(\n name = \"version_or_prefix_version_setting\",\n actual = select({\n \":version_setting\": \":version_setting\",\n \"//conditions:default\": \":prefix_version_setting\",\n }),\n visibility = [\"//visibility:private\"],\n)\ntoolchain(\n name = \"toolchain\",\n target_compatible_with = [\"@platforms//os:windows\", \"@platforms//cpu:x86_64\"],\n target_settings = [\":version_or_prefix_version_setting\"],\n toolchain_type = \"@bazel_tools//tools/jdk:runtime_toolchain_type\",\n toolchain = \"@remotejdk21_win//:jdk\",\n)\ntoolchain(\n name = \"bootstrap_runtime_toolchain\",\n # These constraints are not required for correctness, but prevent fetches of remote JDK for\n # different architectures. 
As every Java compilation toolchain depends on a bootstrap runtime in\n # the same configuration, this constraint will not result in toolchain resolution failures.\n exec_compatible_with = [\"@platforms//os:windows\", \"@platforms//cpu:x86_64\"],\n target_settings = [\":version_or_prefix_version_setting\"],\n toolchain_type = \"@bazel_tools//tools/jdk:bootstrap_runtime_toolchain_type\",\n toolchain = \"@remotejdk21_win//:jdk\",\n)\n" } } - }, - "recordedRepoMappingEntries": [ - [ - "rules_java~7.1.0", - "bazel_tools", - "bazel_tools" - ], - [ - "rules_java~7.1.0", - "remote_java_tools", - "rules_java~7.1.0~toolchains~remote_java_tools" - ] - ] + } } } } diff --git a/br/pkg/backup/prepare_snap/BUILD.bazel b/br/pkg/backup/prepare_snap/BUILD.bazel index ce61679db0c53..2fffcacd5e088 100644 --- a/br/pkg/backup/prepare_snap/BUILD.bazel +++ b/br/pkg/backup/prepare_snap/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "@com_github_docker_go_units//:go-units", "@com_github_google_btree//:btree", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/metapb", @@ -35,7 +36,7 @@ go_test( timeout = "short", srcs = ["prepare_test.go"], flaky = True, - shard_count = 7, + shard_count = 10, deps = [ ":prepare_snap", "//br/pkg/utils", diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go index e0998adc392e3..672a052ae555a 100644 --- a/br/pkg/backup/prepare_snap/env.go +++ b/br/pkg/backup/prepare_snap/env.go @@ -17,6 +17,7 @@ package preparesnap import ( "context" "slices" + "sync" "time" "github.com/docker/go-units" @@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { return withoutTiFlash, err } +func AdaptForGRPCInTest(p PrepareClient) PrepareClient { + return &gRPCGoAdapter{ + inner: p, + } +} + +// GrpcGoAdapter makes the `Send` call synchronous. +// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe. +// But concurrency call to `send` and `recv` is safe. +// This type is exported for testing. +type gRPCGoAdapter struct { + inner PrepareClient + sendMu sync.Mutex + recvMu sync.Mutex +} + +func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + return s.inner.Send(req) +} + +func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { + s.recvMu.Lock() + defer s.recvMu.Unlock() + return s.inner.Recv() +} + func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { var cli brpb.Backup_PrepareSnapshotBackupClient err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { @@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie if err != nil { return nil, err } - return cli, nil + return &gRPCGoAdapter{inner: cli}, nil } func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { @@ -151,9 +180,8 @@ type RetryAndSplitRequestEnv struct { } func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { - // Retry for about 2 minutes. 
- rs := utils.InitialRetryState(12, 10*time.Second, 10*time.Second) - bo := utils.Backoffer(&rs) + rs := utils.ConstantBackoff(10 * time.Second) + bo := utils.Backoffer(rs) if r.GetBackoffer != nil { bo = r.GetBackoffer() } diff --git a/br/pkg/backup/prepare_snap/prepare.go b/br/pkg/backup/prepare_snap/prepare.go index 46f1916873831..ddfb9d206e9a9 100644 --- a/br/pkg/backup/prepare_snap/prepare.go +++ b/br/pkg/backup/prepare_snap/prepare.go @@ -22,6 +22,7 @@ import ( "github.com/google/btree" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" brpb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -91,6 +92,9 @@ type Preparer struct { RetryBackoff time.Duration RetryLimit int LeaseDuration time.Duration + + /* Observers. Initialize them before starting.*/ + AfterConnectionsEstablished func() } func New(env Env) *Preparer { @@ -155,10 +159,13 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error { zap.Int("retry_limit", p.RetryLimit), zap.Duration("lease_duration", p.LeaseDuration)) p.retryTime = 0 - if err := p.prepareConnections(ctx); err != nil { + if err := p.PrepareConnections(ctx); err != nil { log.Error("failed to prepare connections", logutil.ShortError(err)) return errors.Annotate(err, "failed to prepare connections") } + if p.AfterConnectionsEstablished != nil { + p.AfterConnectionsEstablished() + } if err := p.AdvanceState(ctx); err != nil { log.Error("failed to check the progress of our work", logutil.ShortError(err)) return errors.Annotate(err, "failed to begin step") @@ -186,19 +193,36 @@ func (p *Preparer) Finalize(ctx context.Context) error { return nil }) } - if err := eg.Wait(); err != nil { - logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err)) - return err - } - logutil.CL(ctx).Info("all connections to store have shuted down.") + errCh := make(chan error, 1) + go func() { + if err := eg.Wait(); err != nil { + logutil.CL(ctx).Warn("failed to finalize some prepare streams.", logutil.ShortError(err)) + errCh <- err + return + } + logutil.CL(ctx).Info("all connections to store have shuted down.") + errCh <- nil + }() for { select { - case event := <-p.eventChan: + case event, ok := <-p.eventChan: + if !ok { + return nil + } if err := p.onEvent(ctx, event); err != nil { return err } - default: - return nil + case err, ok := <-errCh: + if !ok { + panic("unreachable.") + } + if err != nil { + return err + } + // All streams are finialized, they shouldn't send more events to event chan. 
+ close(p.eventChan) + case <-ctx.Done(): + return ctx.Err() } } } @@ -385,23 +409,35 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) erro } func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) { - s, ok := p.clients[storeID] + _, ok := p.clients[storeID] if !ok { + log.Warn("stream of store found a store not established connection", zap.Uint64("store", storeID)) cli, err := p.env.ConnectToStore(ctx, storeID) if err != nil { return nil, errors.Annotatef(err, "failed to dial store %d", storeID) } - s = new(prepareStream) - s.storeID = storeID - s.output = p.eventChan - s.leaseDuration = p.LeaseDuration - err = s.InitConn(ctx, cli) - if err != nil { - return nil, err + if err := p.createAndCacheStream(ctx, cli, storeID); err != nil { + return nil, errors.Annotatef(err, "failed to create and cache stream for store %d", storeID) } - p.clients[storeID] = s } - return s, nil + return p.clients[storeID], nil +} + +func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error { + if _, ok := p.clients[storeID]; ok { + return nil + } + + s := new(prepareStream) + s.storeID = storeID + s.output = p.eventChan + s.leaseDuration = p.LeaseDuration + err := s.InitConn(ctx, cli) + if err != nil { + return err + } + p.clients[storeID] = s + return nil } func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) { @@ -414,17 +450,34 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) { p.inflightReqs[region.GetMeta().Id] = *region.GetMeta() } -func (p *Preparer) prepareConnections(ctx context.Context) error { +// PrepareConnections prepares the connections for each store. +// This will pause the admin commands for each store. +func (p *Preparer) PrepareConnections(ctx context.Context) error { + failpoint.Inject("PrepareConnectionsErr", func() { + failpoint.Return(errors.New("mock PrepareConnectionsErr")) + }) log.Info("Preparing connections to stores.") stores, err := p.env.GetAllLiveStores(ctx) if err != nil { return errors.Annotate(err, "failed to get all live stores") } + + log.Info("Start to initialize the connections.", zap.Int("stores", len(stores))) + clients := map[uint64]PrepareClient{} for _, store := range stores { - _, err := p.streamOf(ctx, store.Id) + cli, err := p.env.ConnectToStore(ctx, store.Id) if err != nil { - return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id) + return errors.Annotatef(err, "failed to dial the store %d", store.Id) } + clients[store.Id] = cli } + + for id, cli := range clients { + log.Info("Start to pause the admin commands.", zap.Uint64("store", id)) + if err := p.createAndCacheStream(ctx, cli, id); err != nil { + return errors.Annotatef(err, "failed to create and cache stream for store %d", id) + } + } + return nil } diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go index 502af2249d9d9..33ddd482bb8a1 100644 --- a/br/pkg/backup/prepare_snap/prepare_test.go +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -21,6 +21,7 @@ import ( "io" "slices" "sync" + "sync/atomic" "testing" "time" @@ -46,7 +47,12 @@ type mockStore struct { successRegions []metapb.Region onWaitApply func(*metapb.Region) error - now func() time.Time + + waitApplyDelay func() + delaiedWaitApplies sync.WaitGroup + + injectConnErr <-chan error + now func() time.Time } func (s *mockStore) Send(req *brpb.PrepareSnapshotBackupRequest) error { @@ -66,7 +72,16 @@ func (s *mockStore) Send(req 
*brpb.PrepareSnapshotBackupRequest) error { } } } - s.sendResp(resp) + if s.waitApplyDelay != nil { + s.delaiedWaitApplies.Add(1) + go func() { + defer s.delaiedWaitApplies.Done() + s.waitApplyDelay() + s.sendResp(resp) + }() + } else { + s.sendResp(resp) + } if resp.Error == nil { s.successRegions = append(s.successRegions, *region) } @@ -99,17 +114,27 @@ func (s *mockStore) sendResp(resp brpb.PrepareSnapshotBackupResponse) { } func (s *mockStore) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { - out, ok := <-s.output - if !ok { - return nil, io.EOF + for { + select { + case out, ok := <-s.output: + if !ok { + return nil, io.EOF + } + return &out, nil + case err, ok := <-s.injectConnErr: + if ok { + return nil, err + } + s.injectConnErr = nil + } } - return &out, nil } type mockStores struct { mu sync.Mutex stores map[uint64]*mockStore onCreateStore func(*mockStore) + connectDelay func(uint64) <-chan struct{} onConnectToStore func(uint64) error pdc *tikv.RegionCache @@ -117,8 +142,16 @@ type mockStores struct { func newTestEnv(pdc pd.Client) *mockStores { r := tikv.NewRegionCache(pdc) + stores, err := pdc.GetAllStores(context.Background()) + if err != nil { + panic(err) + } + ss := map[uint64]*mockStore{} + for _, store := range stores { + ss[store.Id] = nil + } ms := &mockStores{ - stores: map[uint64]*mockStore{}, + stores: ss, pdc: r, onCreateStore: func(ms *mockStore) {}, } @@ -138,7 +171,14 @@ func (m *mockStores) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, err func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { m.mu.Lock() - defer m.mu.Unlock() + defer func() { + m.mu.Unlock() + if m.connectDelay != nil { + if ch := m.connectDelay(storeID); ch != nil { + <-ch + } + } + }() if m.onConnectToStore != nil { err := m.onConnectToStore(storeID) @@ -147,10 +187,10 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar } } - _, ok := m.stores[storeID] - if !ok { + s, ok := m.stores[storeID] + if !ok || s == nil { m.stores[storeID] = &mockStore{ - output: make(chan brpb.PrepareSnapshotBackupResponse, 16), + output: make(chan brpb.PrepareSnapshotBackupResponse, 20480), successRegions: []metapb.Region{}, onWaitApply: func(r *metapb.Region) error { return nil @@ -161,7 +201,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar } m.onCreateStore(m.stores[storeID]) } - return m.stores[storeID], nil + return AdaptForGRPCInTest(m.stores[storeID]), nil } func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { @@ -456,3 +496,102 @@ func TestSplitEnv(t *testing.T) { require.Equal(t, cc.PrepareClient.(*counterClient).send, 1) require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions) } + +func TestConnectionDelay(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + ms := newTestEnv(pdc) + called := 0 + delayConn := make(chan struct{}) + blocked := make(chan struct{}, 64) + ms.connectDelay = func(i uint64) <-chan struct{} { + called += 1 + if called == 2 { + blocked <- struct{}{} + return delayConn + } + return nil + } + ctx := context.Background() + prep := New(ms) + connectionPrepareResult := make(chan error) + go func() { + connectionPrepareResult <- prep.PrepareConnections(ctx) + }() + <-blocked + ms.mu.Lock() + nonNilStore := 0 + for id, store := range ms.stores { + // We must not create and lease (i.e. 
reject admin command from any tikv) here. + if store != nil { + req.True(store.leaseUntil.Before(time.Now()), "%d->%s", id, store.leaseUntil) + nonNilStore += 1 + } + } + req.GreaterOrEqual(nonNilStore, 2) + ms.mu.Unlock() + delayConn <- struct{}{} + req.NoError(<-connectionPrepareResult) +} + +func TestHooks(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + pauseWaitApply := make(chan struct{}) + ms := newTestEnv(pdc) + ms.onCreateStore = func(ms *mockStore) { + ms.onWaitApply = func(r *metapb.Region) error { + <-pauseWaitApply + return nil + } + } + adv := New(ms) + connectionsEstablished := new(atomic.Bool) + adv.AfterConnectionsEstablished = func() { + connectionsEstablished.Store(true) + } + errCh := make(chan error, 1) + go func() { + errCh <- adv.DriveLoopAndWaitPrepare(context.Background()) + }() + req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond) + close(pauseWaitApply) + req.NoError(<-errCh) + ms.AssertSafeForBackup(t) + req.NoError(adv.Finalize(context.Background())) + ms.AssertIsNormalMode(t) +} + +func TestManyMessagesWhenFinalizing(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(10240)...) + ms := newTestEnv(pdc) + blockCh := make(chan struct{}) + injectErr := make(chan error) + ms.onCreateStore = func(ms *mockStore) { + ms.waitApplyDelay = func() { + <-blockCh + } + ms.injectConnErr = injectErr + } + prep := New(ms) + ctx := context.Background() + req.NoError(prep.PrepareConnections(ctx)) + errC := async(func() error { return prep.DriveLoopAndWaitPrepare(ctx) }) + injectErr <- errors.NewNoStackError("whoa!") + req.Error(<-errC) + close(blockCh) + for _, s := range ms.stores { + s.delaiedWaitApplies.Wait() + } + // Closing the stream should be error. + req.Error(prep.Finalize(ctx)) +} + +func async[T any](f func() T) <-chan T { + ch := make(chan T) + go func() { + ch <- f() + }() + return ch +} diff --git a/br/pkg/backup/prepare_snap/stream.go b/br/pkg/backup/prepare_snap/stream.go index 9e253fc4a4d37..f963899b1d826 100644 --- a/br/pkg/backup/prepare_snap/stream.go +++ b/br/pkg/backup/prepare_snap/stream.go @@ -70,9 +70,13 @@ func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error { p.cli = cli p.clientLoopHandle, ctx = errgroup.WithContext(ctx) ctx, p.stopBgTasks = context.WithCancel(ctx) + log.Info("initializing", zap.Uint64("store", p.storeID)) return p.GoLeaseLoop(ctx, p.leaseDuration) } +// Finalize cuts down this connection and remove the lease. +// This will block until all messages has been flushed to `output` channel. +// After this return, no more messages should be appended to the `output` channel. 
func (p *prepareStream) Finalize(ctx context.Context) error { log.Info("shutting down", zap.Uint64("store", p.storeID)) return p.stopClientLoop(ctx) @@ -150,7 +154,8 @@ func (p *prepareStream) clientLoop(ctx context.Context, dur time.Duration) error return nil case res := <-p.serverStream: if err := p.onResponse(ctx, res); err != nil { - p.sendErr(errors.Annotate(err, "failed to recv from the stream")) + err = errors.Annotate(err, "failed to recv from the stream") + p.sendErr(err) return err } case <-ticker.C: @@ -185,6 +190,10 @@ func (p *prepareStream) sendErr(err error) { } func (p *prepareStream) convertToEvent(resp *brpb.PrepareSnapshotBackupResponse) (event, bool) { + if resp == nil { + log.Warn("Received nil message, that shouldn't happen in a normal cluster.", zap.Uint64("store", p.storeID)) + return event{}, false + } switch resp.Ty { case brpb.PrepareSnapshotBackupEventType_WaitApplyDone: return event{ diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index 52c99c845b57b..c7b8bbeb4ea23 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//br/pkg/task", "//br/pkg/utils", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//tikv", diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 726d006e17da7..176547d5057b4 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -5,15 +5,13 @@ package operator import ( "context" "crypto/tls" - "fmt" - "math/rand" - "os" "runtime/debug" "strings" "sync" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap" berrors "github.com/pingcap/tidb/br/pkg/errors" @@ -135,10 +133,23 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { } defer cx.Close() + initChan := make(chan struct{}) cx.run(func() error { return pauseGCKeeper(cx) }) - cx.run(func() error { return pauseSchedulerKeeper(cx) }) - cx.run(func() error { return pauseAdminAndWaitApply(cx) }) + cx.run(func() error { + log.Info("Pause scheduler waiting all connections established.") + select { + case <-initChan: + case <-cx.Done(): + return cx.Err() + } + log.Info("Pause scheduler noticed connections established.") + return pauseSchedulerKeeper(cx) + }) + cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) }) go func() { + failpoint.Inject("SkipReadyHint", func() { + failpoint.Return() + }) cx.rdGrp.Wait() if cfg.OnAllReady != nil { cfg.OnAllReady() @@ -154,7 +165,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { return eg.Wait() } -func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { +func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error { env := preparesnap.CliEnv{ Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()), Mgr: cx.kvMgr, @@ -164,6 +175,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { begin := time.Now() prep := preparesnap.New(retryEnv) prep.LeaseDuration = cx.cfg.TTL + prep.AfterConnectionsEstablished = func() { + log.Info("All connections are stablished.") + close(afterConnectionsEstablished) + } defer cx.cleanUpWith(func(ctx context.Context) { if err := prep.Finalize(ctx); err != nil { @@ -182,14 +197,6 @@ func 
pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { return nil } -func getCallerName() string { - name, err := os.Hostname() - if err != nil { - name = fmt.Sprintf("UNKNOWN-%d", rand.Int63()) - } - return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid()) -} - func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) { // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index 3276a0f2af101..e1e15b837cb5f 100644 --- a/br/pkg/task/restore_data.go +++ b/br/pkg/task/restore_data.go @@ -13,7 +13,6 @@ import ( "github.com/pingcap/tidb/br/pkg/config" "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/conn/util" - berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" @@ -58,7 +57,7 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto return errors.Trace(err) } - resolveTS, numBackupStore, err := ReadBackupMetaData(ctx, externStorage) + resolveTS, numStores, err := ReadBackupMetaData(ctx, externStorage) if err != nil { return errors.Trace(err) } @@ -122,26 +121,23 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto if err != nil { return errors.Trace(err) } - numOnlineStore := len(allStores) - // in this version, it suppose to have the same number of tikvs between backup cluster and restore cluster - if numOnlineStore != numBackupStore { - log.Warn("the restore meta contains the number of tikvs inconsist with the resore cluster, retry ...", zap.Int("current stores", len(allStores)), zap.Int("backup stores", numBackupStore)) - return errors.Annotatef(berrors.ErrRestoreTotalKVMismatch, - "number of tikvs mismatch") - } return nil }, utils.NewPDReqBackofferExt(), ) + restoreNumStores := len(allStores) + if restoreNumStores != numStores { + log.Warn("the number of stores in the cluster has changed", zap.Int("origin", numStores), zap.Int("current", restoreNumStores)) + } if err != nil { return errors.Trace(err) } - log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile)) + log.Debug("total tikv", zap.Int("total", restoreNumStores), zap.String("progress file", cfg.ProgressFile)) // progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback) - progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress) - go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile) + progress := g.StartProgress(ctx, cmdName, int64(restoreNumStores*3+2), !cfg.LogProgress) + go progressFileWriterRoutine(ctx, progress, int64(restoreNumStores*3+2), cfg.ProgressFile) // restore tikv data from a snapshot volume var totalRegions int diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 5c17ab603086b..e2d782f18d9ed 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -90,7 +90,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 36, + shard_count = 37, deps = [ "//br/pkg/errors", "//br/pkg/metautil", diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 6b7aa7a127863..a1a3cdfa436b0 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "io" + "math" "strings" "time" @@ -62,6 +63,20 @@ func isGRPCCancel(err error) 
bool { return false } +// ConstantBackoff is a backoffer that retry forever until success. +type ConstantBackoff time.Duration + +// NextBackoff returns a duration to wait before retrying again +func (c ConstantBackoff) NextBackoff(err error) time.Duration { + return time.Duration(c) +} + +// Attempt returns the remain attempt times +func (c ConstantBackoff) Attempt() int { + // A large enough value. Also still safe for arithmetic operations (won't easily overflow). + return math.MaxInt16 +} + // RetryState is the mutable state needed for retrying. // It likes the `utils.Backoffer`, but more fundamental: // this only control the backoff time and knows nothing about what error happens. diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 316896bde3f0d..dca90aa77d9cd 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -4,7 +4,9 @@ package utils_test import ( "context" + "fmt" "io" + "math" "testing" "time" @@ -197,3 +199,43 @@ func TestNewBackupSSTBackofferWithCancel(t *testing.T) { context.Canceled, }, multierr.Errors(err)) } + +func TestConstantBackoff(t *testing.T) { + backedOff := func(t *testing.T) { + backoffer := utils.ConstantBackoff(10 * time.Millisecond) + ctx, cancel := context.WithCancel(context.Background()) + i := 0 + ch := make(chan error) + + go func() { + _, err := utils.WithRetryV2(ctx, backoffer, func(ctx context.Context) (struct{}, error) { + i += 1 + return struct{}{}, fmt.Errorf("%d times, no meaning", i) + }) + ch <- err + }() + time.Sleep(100 * time.Millisecond) + cancel() + require.Error(t, <-ch) + // Make sure we have backed off. + require.Less(t, i, 20) + } + + infRetry := func(t *testing.T) { + backoffer := utils.ConstantBackoff(0) + ctx := context.Background() + i := math.MaxInt16 + + _, err := utils.WithRetryV2(ctx, backoffer, func(ctx context.Context) (struct{}, error) { + i -= 1 + if i == 0 { + return struct{}{}, nil + } + return struct{}{}, fmt.Errorf("try %d more times", i) + }) + require.NoError(t, err) + } + + t.Run("backedOff", backedOff) + t.Run("infRetry", infRetry) +} diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 7c2e2dadf470d..8be761553dad6 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -239,6 +239,7 @@ func WithRetryV2[T any]( allErrors = multierr.Append(allErrors, err) select { case <-ctx.Done(): + // allErrors must not be `nil` here, so ignore the context error. return *new(T), allErrors case <-time.After(backoffer.NextBackoff(err)): } diff --git a/tests/realtikvtest/brietest/operator_test.go b/tests/realtikvtest/brietest/operator_test.go index 863ae10f12ade..37eae760a96d8 100644 --- a/tests/realtikvtest/brietest/operator_test.go +++ b/tests/realtikvtest/brietest/operator_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/br/pkg/task" @@ -224,3 +225,33 @@ func TestOperator(t *testing.T) { verifySchedulerNotStopped(req, cfg) verifyGCNotStopped(req, cfg) } + +func TestFailure(t *testing.T) { + req := require.New(t) + req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr", "return()")) + // Make goleak happy. 
+ req.NoError(failpoint.Enable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint", "return()")) + defer func() { + req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr")) + req.NoError(failpoint.Disable("github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint")) + }() + + cfg := operator.PauseGcConfig{ + Config: task.Config{ + PD: []string{"127.0.0.1:2379"}, + }, + TTL: 5 * time.Minute, + SafePoint: oracle.GoTimeToTS(time.Now()), + } + + verifyGCNotStopped(req, cfg) + verifySchedulerNotStopped(req, cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg) + require.Error(t, err) + + verifyGCNotStopped(req, cfg) + verifySchedulerNotStopped(req, cfg) +}