Skip to content

Commit

Permalink
Merge branch 'master' into fix_cte_ilj_dead_lock
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Aug 23, 2021
2 parents 8c6827c + be4cdb2 commit 6a26501
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 48 deletions.
111 changes: 75 additions & 36 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/tikv/pd/pkg/codec"
Expand Down Expand Up @@ -294,6 +295,33 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi
}
}

// checkRegionConsistency verifies that regions returned by a PD batch scan
// contiguously cover the whole key range [startKey, endKey):
//   - the result must be non-empty,
//   - the first region must start at or before startKey,
//   - the last region must end at or after endKey (an empty region EndKey
//     means "+infinity"),
//   - each region's EndKey must equal the next region's StartKey (no holes).
//
// Any violation is reported as an ErrPDBatchScanRegion-annotated error so
// callers (see PaginateScanRegion) can retry the scan.
func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
	// current pd can't guarantee the consistency of returned regions
	if len(regions) == 0 {
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
			redact.Key(startKey), redact.Key(endKey))
	}

	if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
			redact.Key(startKey), redact.Key(regions[0].Region.StartKey))
	}

	last := regions[len(regions)-1]
	if len(last.Region.EndKey) != 0 && bytes.Compare(last.Region.EndKey, endKey) < 0 {
		// Fixed message: this branch compares the LAST region's endKey with
		// endKey (the previous text wrongly said "startKey"/"regionStartKey").
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < endKey, endKey: %s, regionEndKey: %s",
			redact.Key(endKey), redact.Key(last.Region.EndKey))
	}

	cur := regions[0]
	for _, r := range regions[1:] {
		if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
			return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
				redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey))
		}
		cur = r
	}

	return nil
}

// PaginateScanRegion scan regions with a limit pagination and
// return all regions at once.
// It reduces max gRPC message size.
Expand All @@ -305,50 +333,61 @@ func PaginateScanRegion(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}

regions := []*RegionInfo{}
scanStartKey := startKey
for {
batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
return nil, errors.Trace(err)
}
regions = append(regions, batch...)
if len(batch) < limit {
// No more region
break
var regions []*RegionInfo
err := utils.WithRetry(ctx, func() error {
regions = []*RegionInfo{}
scanStartKey := startKey
for {
batch, err := client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
return errors.Trace(err)
}
regions = append(regions, batch...)
if len(batch) < limit {
// No more region
break
}
scanStartKey = batch[len(batch)-1].Region.GetEndKey()
if len(scanStartKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
// All key space have scanned
break
}
}
scanStartKey = batch[len(batch)-1].Region.GetEndKey()
if len(scanStartKey) == 0 ||
(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
// All key space have scanned
break
if err := checkRegionConsistency(startKey, endKey, regions); err != nil {
log.Warn("failed to scan region, retrying", logutil.ShortError(err))
return err
}
}
return nil
}, newScanRegionBackoffer())

// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endkey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
return regions, err
}

if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(regions[0].Region.StartKey))
} else if len(regions[len(regions)-1].Region.EndKey) != 0 && bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < startKey, startKey: %s, regionStartKey: %s",
hex.EncodeToString(endKey), hex.EncodeToString(regions[len(regions)-1].Region.EndKey))
// scanRegionBackoffer implements utils.Backoffer for retrying a PD region
// scan whose result was inconsistent (ErrPDBatchScanRegion).
type scanRegionBackoffer struct {
	attempt int // remaining retry attempts; decremented by NextBackoff, reported by Attempt
}

// newScanRegionBackoffer builds a backoffer allowing up to 3 retries of an
// inconsistent region scan.
func newScanRegionBackoffer() utils.Backoffer {
	b := new(scanRegionBackoffer)
	b.attempt = 3
	return b
}

cur := regions[0]
for _, r := range regions[1:] {
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return nil, errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
hex.EncodeToString(cur.Region.EndKey), hex.EncodeToString(r.Region.StartKey))
}
cur = r
// NextBackoff returns the duration to wait before retrying again.
// Only an inconsistent-scan error (ErrPDBatchScanRegion) is retryable;
// any other error zeroes the remaining attempts so the retry loop stops.
func (b *scanRegionBackoffer) NextBackoff(err error) time.Duration {
	if !berrors.ErrPDBatchScanRegion.Equal(err) {
		// Not retryable: exhaust the attempts immediately.
		b.attempt = 0
		return 0
	}
	// 500ms * 3 could be enough for splitting remain regions in the hole.
	b.attempt--
	return 500 * time.Millisecond
}

return regions, nil
// Attempt returns the number of remaining retry attempts.
func (b *scanRegionBackoffer) Attempt() int {
	return b.attempt
}

// getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
Expand Down
25 changes: 25 additions & 0 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/spf13/pflag"
"go.uber.org/zap"
)
Expand All @@ -43,6 +44,7 @@ const (
maxRetries = 7
// max number of retries when meets error
maxErrorRetries = 3
ec2MetaAddress = "169.254.169.254"

// the maximum number of byte to read for seek.
maxSkipOffsetByRead = 1 << 16 // 64KB
Expand Down Expand Up @@ -736,6 +738,29 @@ type retryerWithLog struct {
client.DefaultRetryer
}

// isDeadlineExceedError reports whether err looks like a "context deadline
// exceeded" failure, judged by its error text.
//
// TODO find a better way.
// Known challenges:
//
// If we want to unwrap the r.Error:
//  1. the err should be an awserr.Error (let it be awsErr)
//  2. awsErr.OrigErr() should be an *url.Error (let it be urlErr).
//  3. urlErr.Err should be a http.httpError (which is private).
//
// If we want to retrieve the error from the request context:
// the error of the context in the HTTPRequest (i.e. r.HTTPRequest.Context().Err()) is nil.
func isDeadlineExceedError(err error) bool {
	const deadlineMsg = "context deadline exceeded"
	return strings.Contains(err.Error(), deadlineMsg)
}

// ShouldRetry decides whether a failed AWS request is worth retrying.
// A timed-out call to the EC2 metadata endpoint is failed immediately:
// inside containers the link-local address is typically unreachable, and
// retrying would only stall the whole operation.
func (rl retryerWithLog) ShouldRetry(r *request.Request) bool {
	metadataTimeout := isDeadlineExceedError(r.Error) && r.HTTPRequest.URL.Host == ec2MetaAddress
	if metadataTimeout {
		// fast fail for unreachable linklocal address in EC2 containers.
		log.Warn("failed to get EC2 metadata. skipping.", logutil.ShortError(r.Error))
		return false
	}
	return rl.DefaultRetryer.ShouldRetry(r)
}

func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
backoffTime := rl.DefaultRetryer.RetryRules(r)
if backoffTime > 0 {
Expand Down
22 changes: 12 additions & 10 deletions ddl/label/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,20 @@ func (r *Rule) Reset(id int64, dbName, tableName string, partName ...string) *Ru
}

var hasDBKey, hasTableKey, hasPartitionKey bool
for _, label := range r.Labels {
if label.Key == dbKey {
label.Value = dbName
for i := range r.Labels {
switch r.Labels[i].Key {
case dbKey:
r.Labels[i].Value = dbName
hasDBKey = true
}
if label.Key == tableKey {
label.Value = tableName
case tableKey:
r.Labels[i].Value = tableName
hasTableKey = true
}
if isPartition && label.Key == partitionKey {
label.Value = partName[0]
hasPartitionKey = true
case partitionKey:
if isPartition {
r.Labels[i].Value = partName[0]
hasPartitionKey = true
}
default:
}
}

Expand Down
12 changes: 11 additions & 1 deletion ddl/label/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (t *testRuleSuite) TestApplyAttributesSpec(c *C) {
c.Assert(rule.Labels[1].Key, Equals, "attr2")
}

func (t *testRuleSuite) TestResetID(c *C) {
func (t *testRuleSuite) TestReset(c *C) {
rule := NewRule()
rule.Reset(1, "db1", "t1")
c.Assert(rule.ID, Equals, "schema/db1/t1")
Expand All @@ -46,4 +46,14 @@ func (t *testRuleSuite) TestResetID(c *C) {

r1 := rule.Clone()
c.Assert(rule, DeepEquals, r1)

r2 := rule.Reset(2, "db2", "t2", "p2")
c.Assert(r2.ID, Equals, "schema/db2/t2/p2")
c.Assert(r2.Labels, HasLen, 3)
c.Assert(rule.Labels[0].Value, Equals, "db2")
c.Assert(rule.Labels[1].Value, Equals, "t2")
c.Assert(rule.Labels[2].Value, Equals, "p2")
r = r2.Rule.(map[string]string)
c.Assert(r["start_key"], Equals, "7480000000000000ff025f720000000000fa")
c.Assert(r["end_key"], Equals, "7480000000000000ff035f720000000000fa")
}
39 changes: 39 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,45 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
}

tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L)
ids := []string{tableRuleID}
if tblInfo.GetPartitionInfo() != nil {
for _, def := range tblInfo.GetPartitionInfo().Definitions {
ids = append(ids, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L))
}
}

oldRules, err := infosync.GetLabelRules(context.TODO(), ids)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to get PD the label rule")
}

var newRules []*label.Rule
for _, r := range oldRules {
if r.ID == tableRuleID {
newRules = append(newRules, r.Clone().Reset(newTableID, job.SchemaName, tblInfo.Name.L))
}
}

if tblInfo.GetPartitionInfo() != nil {
for _, r := range oldRules {
for _, def := range tblInfo.GetPartitionInfo().Definitions {
if r.ID == fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L, def.Name.L) {
newRules = append(newRules, r.Clone().Reset(def.ID, job.SchemaName, tblInfo.Name.L, def.Name.L))
}
}
}
}

// update the key range with same rule id.
patch := label.NewRulePatch(newRules, nil)
err = infosync.UpdateLabelRules(context.TODO(), patch)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the label rules")
}

// Clear the tiflash replica available status.
if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.AvailablePartitionIDs = nil
Expand Down
35 changes: 35 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,3 +892,38 @@ func GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) {
}
return rules, err
}

// GetLabelRules gets the label rules according to the given IDs from PD.
// It returns (nil, nil) when there is nothing to fetch or when no PD is
// attached (e.g. a mock/unistore deployment).
func GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) {
	if len(ruleIDs) == 0 {
		return nil, nil
	}

	is, err := getGlobalInfoSyncer()
	if err != nil {
		return nil, err
	}
	if is.etcdCli == nil {
		// No PD client available; treat as "no rules".
		return nil, nil
	}

	addrs := is.etcdCli.Endpoints()
	if len(addrs) == 0 {
		return nil, errors.Errorf("pd unavailable")
	}

	payload, err := json.Marshal(ruleIDs)
	if err != nil {
		return nil, err
	}

	rules := []*label.Rule{}
	res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(payload))
	if err == nil && res != nil {
		err = json.Unmarshal(res, &rules)
	}
	return rules, err
}
6 changes: 5 additions & 1 deletion executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
if err != nil {
t := slowLogTask{}
t.resultCh = make(chan parsedSlowLog, 1)
e.taskList <- t
select {
case <-ctx.Done():
return
case e.taskList <- t:
}
e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{nil, err})
}
if len(logs) == 0 || len(logs[0]) == 0 {
Expand Down

0 comments on commit 6a26501

Please sign in to comment.