diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go
index 35244b9b1bd60..77db3f8f94e49 100644
--- a/br/pkg/restore/split.go
+++ b/br/pkg/restore/split.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/logutil"
+	"github.com/pingcap/tidb/br/pkg/redact"
 	"github.com/pingcap/tidb/br/pkg/rtree"
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/tikv/pd/pkg/codec"
@@ -294,6 +295,33 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi
 	}
 }
 
+func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
+	// current pd can't guarantee the consistency of returned regions
+	if len(regions) == 0 {
+		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
+			redact.Key(startKey), redact.Key(endKey))
+	}
+
+	if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
+		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
+			redact.Key(startKey), redact.Key(regions[0].Region.StartKey))
+	} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
+		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < endKey, endKey: %s, regionEndKey: %s",
+			redact.Key(endKey), redact.Key(regions[len(regions)-1].Region.EndKey))
+	}
+
+	cur := regions[0]
+	for _, r := range regions[1:] {
+		if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
+			return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
+				redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey))
+		}
+		cur = r
+	}
+
+	return nil
+}
+
 // PaginateScanRegion scan regions with a limit pagination and
 // return all regions at once.
 // It reduces max gRPC message size.
@@ -305,50 +333,61 @@ func PaginateScanRegion(
 			hex.EncodeToString(startKey), hex.EncodeToString(endKey))
 	}
 
-	regions := []*RegionInfo{}
-	scanStartKey := startKey
-	for {
-		batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		regions = append(regions, batch...)
-		if len(batch) < limit {
-			// No more region
-			break
+	var regions []*RegionInfo
+	err := utils.WithRetry(ctx, func() error {
+		regions = []*RegionInfo{}
+		scanStartKey := startKey
+		for {
+			batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			regions = append(regions, batch...)
+			if len(batch) < limit {
+				// No more region
+				break
+			}
+			scanStartKey = batch[len(batch)-1].Region.GetEndKey()
+			if len(scanStartKey) == 0 ||
+				(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
+				// All key space have scanned
+				break
+			}
 		}
-		scanStartKey = batch[len(batch)-1].Region.GetEndKey()
-		if len(scanStartKey) == 0 ||
-			(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
-			// All key space have scanned
-			break
+		if err := checkRegionConsistency(startKey, endKey, regions); err != nil {
+			log.Warn("failed to scan region, retrying", logutil.ShortError(err))
+			return err
 		}
-	}
+		return nil
+	}, newScanRegionBackoffer())
 
-	// current pd can't guarantee the consistency of returned regions
-	if len(regions) == 0 {
-		return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
-			hex.EncodeToString(startKey), hex.EncodeToString(endKey))
-	}
+	return regions, err
+}
 
-	if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
-		return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
-			hex.EncodeToString(startKey), hex.EncodeToString(regions[0].Region.StartKey))
-	} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
-		return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < startKey, startKey: %s, regionStartKey: %s",
-			hex.EncodeToString(endKey), hex.EncodeToString(regions[len(regions)-1].Region.EndKey))
+type scanRegionBackoffer struct {
+	attempt int
+}
+
+func newScanRegionBackoffer() utils.Backoffer {
+	return &scanRegionBackoffer{
+		attempt: 3,
 	}
+}
 
-	cur := regions[0]
-	for _, r := range regions[1:] {
-		if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
-			return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
-				hex.EncodeToString(cur.Region.EndKey), hex.EncodeToString(r.Region.StartKey))
-		}
-		cur = r
+// NextBackoff returns a duration to wait before retrying again
+func (b *scanRegionBackoffer) NextBackoff(err error) time.Duration {
+	if berrors.ErrPDBatchScanRegion.Equal(err) {
+		// 500ms * 3 could be enough for splitting the remaining regions in the hole.
+		b.attempt--
+		return 500 * time.Millisecond
 	}
+	b.attempt = 0
+	return 0
+}
 
-	return regions, nil
+// Attempt returns the remaining number of attempts
+func (b *scanRegionBackoffer) Attempt() int {
+	return b.attempt
+}
 
 // getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
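The retry wiring above hands newScanRegionBackoffer() to utils.WithRetry, so an inconsistent scan result (ErrPDBatchScanRegion) is retried a few times with a 500ms pause. The following self-contained sketch illustrates the retry semantics assumed here; withRetry, Backoffer and toyBackoffer are illustrative stand-ins, not the real br/pkg/utils implementation.

// Sketch only: this loop imitates the retry semantics assumed for
// utils.WithRetry; the real implementation lives in br/pkg/utils.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Backoffer mirrors the two methods the patch implements on scanRegionBackoffer.
type Backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

type toyBackoffer struct {
	attempt int
}

func (b *toyBackoffer) NextBackoff(err error) time.Duration {
	b.attempt-- // each failed try consumes one attempt
	return 10 * time.Millisecond
}

func (b *toyBackoffer) Attempt() int {
	return b.attempt
}

// withRetry keeps calling fn until it succeeds, the context is done, or the
// backoffer runs out of attempts.
func withRetry(ctx context.Context, fn func() error, bo Backoffer) error {
	var err error
	for bo.Attempt() > 0 {
		if err = fn(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(err)):
		}
	}
	return err
}

func main() {
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			// e.g. PD returned a region list with a hole in it
			return errors.New("scan regions not consistent")
		}
		return nil
	}, &toyBackoffer{attempt: 3})
	fmt.Println(calls, err) // 3 <nil>: the third scan sees a consistent range
}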
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index d07fe6b30ca23..8fe1b6b3ae24c 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -26,6 +26,7 @@ import (
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/spf13/pflag"
 	"go.uber.org/zap"
 )
@@ -43,6 +44,7 @@ const (
 	maxRetries = 7
 	// max number of retries when meets error
 	maxErrorRetries = 3
+	ec2MetaAddress  = "169.254.169.254"
 
 	// the maximum number of byte to read for seek.
 	maxSkipOffsetByRead = 1 << 16 // 64KB
@@ -736,6 +738,29 @@ type retryerWithLog struct {
 	client.DefaultRetryer
 }
 
+func isDeadlineExceedError(err error) bool {
+	// TODO find a better way.
+	// Known challenges:
+	//
+	// If we want to unwrap the r.Error:
+	// 1. the err should be an awserr.Error (let it be awsErr)
+	// 2. awsErr.OrigErr() should be an *url.Error (let it be urlErr).
+	// 3. urlErr.Err should be a http.httpError (which is private).
+	//
+	// If we want to retrieve the error from the request context:
+	// The error of the context in the HTTPRequest (i.e. r.HTTPRequest.Context().Err()) is nil.
+	return strings.Contains(err.Error(), "context deadline exceeded")
+}
+
+func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
+	if isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress {
+		// fail fast for the unreachable link-local address in EC2 containers.
+		log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error))
+		return false
+	}
+	return rl.DefaultRetryer.ShouldRetry(r)
+}
+
 func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
 	backoffTime := rl.DefaultRetryer.RetryRules(r)
 	if backoffTime > 0 {
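The ShouldRetry override above short-circuits only one very specific case: a context-deadline error while talking to the EC2 metadata endpoint. A standalone sketch of that decision follows, with the aws-sdk-go request flattened into plain values; shouldRetry and defaultDecision are illustrative names, not part of the SDK.

// Sketch only: mirrors the logic of retryerWithLog.ShouldRetry without
// constructing a real *request.Request.
package main

import (
	"errors"
	"fmt"
	"strings"
)

const ec2MetaAddress = "169.254.169.254"

func isDeadlineExceedError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "context deadline exceeded")
}

// shouldRetry gives up immediately when the EC2 metadata endpoint times out
// (the address can be unreachable inside containers) and otherwise defers to
// the default retry decision.
func shouldRetry(host string, reqErr error, defaultDecision bool) bool {
	if isDeadlineExceedError(reqErr) && host == ec2MetaAddress {
		return false
	}
	return defaultDecision
}

func main() {
	timeout := errors.New("RequestError: send request failed: context deadline exceeded")
	fmt.Println(shouldRetry(ec2MetaAddress, timeout, true))     // false: fail fast
	fmt.Println(shouldRetry("s3.amazonaws.com", timeout, true)) // true: normal backoff applies
}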
diff --git a/ddl/label/rule.go b/ddl/label/rule.go
index fe2d5035686fd..b1529f598f5e6 100644
--- a/ddl/label/rule.go
+++ b/ddl/label/rule.go
@@ -93,18 +93,20 @@ func (r *Rule) Reset(id int64, dbName, tableName string, partName ...string) *Ru
 	}
 
 	var hasDBKey, hasTableKey, hasPartitionKey bool
-	for _, label := range r.Labels {
-		if label.Key == dbKey {
-			label.Value = dbName
+	for i := range r.Labels {
+		switch r.Labels[i].Key {
+		case dbKey:
+			r.Labels[i].Value = dbName
 			hasDBKey = true
-		}
-		if label.Key == tableKey {
-			label.Value = tableName
+		case tableKey:
+			r.Labels[i].Value = tableName
 			hasTableKey = true
-		}
-		if isPartition && label.Key == partitionKey {
-			label.Value = partName[0]
-			hasPartitionKey = true
+		case partitionKey:
+			if isPartition {
+				r.Labels[i].Value = partName[0]
+				hasPartitionKey = true
+			}
+		default:
 		}
 	}
 
diff --git a/ddl/label/rule_test.go b/ddl/label/rule_test.go
index 21b291425fe95..ea498d4a9e210 100644
--- a/ddl/label/rule_test.go
+++ b/ddl/label/rule_test.go
@@ -32,7 +32,7 @@ func (t *testRuleSuite) TestApplyAttributesSpec(c *C) {
 	c.Assert(rule.Labels[1].Key, Equals, "attr2")
 }
 
-func (t *testRuleSuite) TestResetID(c *C) {
+func (t *testRuleSuite) TestReset(c *C) {
 	rule := NewRule()
 	rule.Reset(1, "db1", "t1")
 	c.Assert(rule.ID, Equals, "schema/db1/t1")
@@ -46,4 +46,14 @@ func (t *testRuleSuite) TestResetID(c *C) {
 
 	r1 := rule.Clone()
 	c.Assert(rule, DeepEquals, r1)
+
+	r2 := rule.Reset(2, "db2", "t2", "p2")
+	c.Assert(r2.ID, Equals, "schema/db2/t2/p2")
+	c.Assert(r2.Labels, HasLen, 3)
+	c.Assert(rule.Labels[0].Value, Equals, "db2")
+	c.Assert(rule.Labels[1].Value, Equals, "t2")
+	c.Assert(rule.Labels[2].Value, Equals, "p2")
+	r = r2.Rule.(map[string]string)
+	c.Assert(r["start_key"], Equals, "7480000000000000ff025f720000000000fa")
+	c.Assert(r["end_key"], Equals, "7480000000000000ff035f720000000000fa")
 }
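The rule.go change above switches from ranging over r.Labels by value to indexing into the slice. Assuming Labels is a slice of structs rather than pointers (which is what the original lost-update symptom suggests), this minimal sketch shows why the index form is needed; Label here is a stand-in for the ddl/label type.

// Sketch only: demonstrates that range-by-value mutates a copy while indexing
// mutates the slice element in place, as the patched Reset now does.
package main

import "fmt"

type Label struct {
	Key   string
	Value string
}

func main() {
	labels := []Label{{Key: "db", Value: "old"}}

	// Range-by-value yields a copy of each element, so this write is lost.
	for _, l := range labels {
		l.Value = "new"
	}
	fmt.Println(labels[0].Value) // old

	// Indexing writes through to the slice element.
	for i := range labels {
		labels[i].Value = "new"
	}
	fmt.Println(labels[0].Value) // new
}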
diff --git a/ddl/table.go b/ddl/table.go
index 4766b583affab..36ea2a6174b00 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -518,6 +518,45 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
 		return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
 	}
 
+	tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L)
+	ids := []string{tableRuleID}
+	if tblInfo.GetPartitionInfo() != nil {
+		for _, def := range tblInfo.GetPartitionInfo().Definitions {
+			ids = append(ids, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L))
+		}
+	}
+
+	oldRules, err := infosync.GetLabelRules(context.TODO(), ids)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return 0, errors.Wrapf(err, "failed to get the label rules from PD")
+	}
+
+	var newRules []*label.Rule
+	for _, r := range oldRules {
+		if r.ID == tableRuleID {
+			newRules = append(newRules, r.Clone().Reset(newTableID, job.SchemaName, tblInfo.Name.L))
+		}
+	}
+
+	if tblInfo.GetPartitionInfo() != nil {
+		for _, r := range oldRules {
+			for _, def := range tblInfo.GetPartitionInfo().Definitions {
+				if r.ID == fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L) {
+					newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L))
+				}
+			}
+		}
+	}
+
+	// update the key ranges of the rules that keep the same rule IDs.
+	patch := label.NewRulePatch(newRules, nil)
+	err = infosync.UpdateLabelRules(context.TODO(), patch)
+	if err != nil {
+		job.State = model.JobStateCancelled
+		return 0, errors.Wrapf(err, "failed to notify PD the label rules")
+	}
+
 	// Clear the tiflash replica available status.
 	if tblInfo.TiFlashReplica != nil {
 		tblInfo.TiFlashReplica.AvailablePartitionIDs = nil
diff --git a/domain/infosync/info.go b/domain/infosync/info.go
index e6bd04cffd0b7..e885bbeb175e4 100644
--- a/domain/infosync/info.go
+++ b/domain/infosync/info.go
@@ -892,3 +892,38 @@ func GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) {
 	}
 	return rules, err
 }
+
+// GetLabelRules gets the label rules according to the given IDs from PD.
+func GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) {
+	if len(ruleIDs) == 0 {
+		return nil, nil
+	}
+
+	is, err := getGlobalInfoSyncer()
+	if err != nil {
+		return nil, err
+	}
+
+	if is.etcdCli == nil {
+		return nil, nil
+	}
+
+	addrs := is.etcdCli.Endpoints()
+
+	if len(addrs) == 0 {
+		return nil, errors.Errorf("pd unavailable")
+	}
+
+	ids, err := json.Marshal(ruleIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	rules := []*label.Rule{}
+	res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))
+
+	if err == nil && res != nil {
+		err = json.Unmarshal(res, &rules)
+	}
+	return rules, err
+}
diff --git a/executor/slow_query.go b/executor/slow_query.go
index 8633f7a5e00f0..46abdb9c201d0 100755
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -433,7 +433,11 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
 		if err != nil {
 			t := slowLogTask{}
 			t.resultCh = make(chan parsedSlowLog, 1)
-			e.taskList <- t
+			select {
+			case <-ctx.Done():
+				return
+			case e.taskList <- t:
+			}
 			e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{nil, err})
 		}
 		if len(logs) == 0 || len(logs[0]) == 0 {
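The slow_query.go change above replaces a bare send on e.taskList with a select that also watches ctx.Done(), so the producer can exit once the query is cancelled instead of blocking forever. A small self-contained sketch of the pattern follows; task, taskList and send are illustrative names, not the executor's real types.

// Sketch only: shows how selecting on ctx.Done() turns a potentially
//永-blocking channel send into one that gives up when the context ends.
package main

import (
	"context"
	"fmt"
	"time"
)

type task struct{ id int }

// send either delivers t or bails out when the context is done.
func send(ctx context.Context, taskList chan<- task, t task) bool {
	select {
	case <-ctx.Done():
		return false // consumer is gone or the query was cancelled; don't leak the goroutine
	case taskList <- t:
		return true
	}
}

func main() {
	taskList := make(chan task) // unbuffered: a send blocks until someone receives
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// No receiver ever reads taskList, so without the ctx.Done() branch this
	// send would block forever.
	ok := send(ctx, taskList, task{id: 1})
	fmt.Println(ok) // false: the context expired first
}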