diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index e32606207082e..cc88fd6a89483 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -91,6 +91,7 @@ const (
 	gRPCKeepAliveTime    = 10 * time.Minute
 	gRPCKeepAliveTimeout = 5 * time.Minute
 	gRPCBackOffMaxDelay  = 10 * time.Minute
+	writeStallSleepTime  = 10 * time.Second
 
 	// The max ranges count in a batch to split and scatter.
 	maxBatchSplitRanges = 4096
@@ -381,6 +382,12 @@ type local struct {
 
 	encBuilder       backend.EncodingBuilder
 	targetInfoGetter backend.TargetInfoGetter
+
+	// When TiKV is in normal mode, ingesting too many SSTs will cause a TiKV write stall.
+	// To avoid this, we should check for write stall before ingesting SSTs. Note that we
+	// must check both the leader node and followers on the client side, because followers
+	// will not check write stall as long as the ingest command is accepted by the leader.
+	shouldCheckWriteStall bool
 }
 
 func openDuplicateDB(storeDir string) (*pebble.DB, error) {
@@ -503,6 +510,7 @@ func NewLocalBackend(
 		logger:           log.FromContext(ctx),
 		encBuilder:       NewEncodingBuilder(ctx),
 		targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
+		shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
 		local.metrics = m
@@ -1146,6 +1154,25 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 		return resp, errors.Trace(err)
 	}
 
+	if local.shouldCheckWriteStall {
+		for {
+			maybeWriteStall, err := local.checkWriteStall(ctx, region)
+			if err != nil {
+				return nil, err
+			}
+			if !maybeWriteStall {
+				break
+			}
+			log.FromContext(ctx).Warn("ingest maybe cause write stall, sleep and retry",
+				zap.Duration("duration", writeStallSleepTime))
+			select {
+			case <-time.After(writeStallSleepTime):
+			case <-ctx.Done():
+				return nil, errors.Trace(ctx.Err())
+			}
+		}
+	}
+
 	req := &sst.MultiIngestRequest{
 		Context: reqCtx,
 		Ssts:    metas,
@@ -1154,6 +1181,23 @@ func (local *local) Ingest(ctx context.Context, metas []*sst.SSTMeta, region *sp
 	return resp, errors.Trace(err)
 }
 
+func (local *local) checkWriteStall(ctx context.Context, region *split.RegionInfo) (bool, error) {
+	for _, peer := range region.Region.GetPeers() {
+		cli, err := local.getImportClient(ctx, peer.StoreId)
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		resp, err := cli.MultiIngest(ctx, &sst.MultiIngestRequest{})
+		if err != nil {
+			return false, errors.Trace(err)
+		}
+		if resp.Error != nil && resp.Error.ServerIsBusy != nil {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 func splitRangeBySizeProps(fullRange Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []Range {
 	ranges := make([]Range, 0, sizeProps.totalSize/uint64(sizeLimit))
 	curSize := uint64(0)
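
For context, and not as part of the patch above: the sketch below mirrors the shape of the retry loop the patch adds to (*local).Ingest. Before sending the real ingest request, every peer's store is probed, and the caller backs off while any store reports being busy. The busyProber interface, waitForNoWriteStall helper, and fakeProber are hypothetical stand-ins for the ImportSSTClient plumbing in the real code; only the overall loop structure and the 10-second sleep come from the diff.

// Illustrative sketch only, assuming a hypothetical busyProber abstraction.
package main

import (
	"context"
	"fmt"
	"time"
)

const writeStallSleepTime = 10 * time.Second

// busyProber reports whether the store with the given ID would currently
// reject an ingest because it is busy (i.e. at risk of a write stall).
type busyProber interface {
	IsBusy(ctx context.Context, storeID uint64) (bool, error)
}

// waitForNoWriteStall blocks until none of the given stores report being busy,
// sleeping between rounds, or until the context is cancelled. This is the same
// shape as the loop the patch places in front of the MultiIngest request.
func waitForNoWriteStall(ctx context.Context, p busyProber, storeIDs []uint64, sleep time.Duration) error {
	for {
		busy := false
		for _, id := range storeIDs {
			b, err := p.IsBusy(ctx, id)
			if err != nil {
				return err
			}
			if b {
				busy = true
				break
			}
		}
		if !busy {
			return nil
		}
		fmt.Printf("store busy, sleeping %s before retrying ingest\n", sleep)
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// fakeProber pretends the store is busy for the first `remaining` probes.
type fakeProber struct{ remaining int }

func (f *fakeProber) IsBusy(ctx context.Context, storeID uint64) (bool, error) {
	if f.remaining > 0 {
		f.remaining--
		return true, nil
	}
	return false, nil
}

func main() {
	ctx := context.Background()
	// Short sleep here just to keep the demo quick; the patch uses writeStallSleepTime.
	if err := waitForNoWriteStall(ctx, &fakeProber{remaining: 2}, []uint64{1, 2, 3}, 100*time.Millisecond); err != nil {
		fmt.Println("ingest aborted:", err)
		return
	}
	fmt.Println("all stores idle, safe to ingest")
}

The reason the real checkWriteStall probes each peer with an empty MultiIngestRequest, rather than asking only the leader, is spelled out in the added struct comment: followers do not perform their own write-stall check once the leader has accepted the ingest command, so the client has to query every store directly.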