Merge branch 'master' into fix-rg-meta
hawkingrei authored Feb 3, 2023
2 parents 5750ff4 + 3c4976b commit 95b2ed5
Showing 68 changed files with 11,312 additions and 10,422 deletions.
8 changes: 6 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -1656,6 +1656,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
// the table when table is created.
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
needSplit = true
})
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
@@ -2063,7 +2066,8 @@ func nextKey(key []byte) []byte {

// in tikv <= 4.x, tikv will truncate the row key, so we should fetch the next valid row key
// See: https://github.com/tikv/tikv/blob/f7f22f70e1585d7ca38a59ea30e774949160c3e8/components/raftstore/src/coprocessor/split_observer.rs#L36-L41
if tablecodec.IsRecordKey(key) {
// we only do this for IntHandle, which is checked by length
if tablecodec.IsRecordKey(key) && len(key) == tablecodec.RecordRowKeyLen {
tableID, handle, _ := tablecodec.DecodeRecordKey(key)
nextHandle := handle.Next()
// int handle overflow, use the next table prefix as nextKey
@@ -2073,7 +2077,7 @@ func nextKey(key []byte) []byte {
return tablecodec.EncodeRowKeyWithHandle(tableID, nextHandle)
}

// if key is an index, directly append a 0x00 to the key.
// for index key and CommonHandle, directly append a 0x00 to the key.
res := make([]byte, 0, len(key)+1)
res = append(res, key...)
res = append(res, 0)
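
The hunk above narrows the record-key fast path to integer handles (identified purely by key length) and falls back to appending a 0x00 byte for everything else. A minimal sketch of the resulting control flow, assuming the tablecodec helpers already imported by local.go; the function name and the elided overflow branch are illustrative only:

func nextKeySketch(key []byte) []byte {
	// Only IntHandle record keys have the fixed RecordRowKeyLen length,
	// so only for them is it safe to decode and advance the handle.
	if tablecodec.IsRecordKey(key) && len(key) == tablecodec.RecordRowKeyLen {
		tableID, handle, _ := tablecodec.DecodeRecordKey(key)
		// (the overflow branch that jumps to the next table prefix is elided here)
		return tablecodec.EncodeRowKeyWithHandle(tableID, handle.Next())
	}
	// Index keys and CommonHandle record keys: appending a single 0x00
	// yields the smallest key strictly greater than the input, which is
	// never larger than the "real" next key.
	res := make([]byte, 0, len(key)+1)
	res = append(res, key...)
	return append(res, 0)
}
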
15 changes: 13 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
@@ -115,10 +115,21 @@ func TestNextKey(t *testing.T) {
require.NoError(t, err)
nextHdl, err := tidbkv.NewCommonHandle(nextKeyBytes)
require.NoError(t, err)
expectNextKey := []byte(tablecodec.EncodeRowKeyWithHandle(1, nextHdl))
require.Equal(t, expectNextKey, nextKey(key))
nextValidKey := []byte(tablecodec.EncodeRowKeyWithHandle(1, nextHdl))
// nextKey may return a key that can't be decoded, but it must not be larger than the valid next key.
require.True(t, bytes.Compare(nextKey(key), nextValidKey) <= 0, "datums: %v", datums)
}

// a special case: when len(string datum) % 8 == 7, calling nextKey twice should not panic.
keyBytes, err := codec.EncodeKey(stmtCtx, nil, types.NewStringDatum("1234567"))
require.NoError(t, err)
h, err := tidbkv.NewCommonHandle(keyBytes)
require.NoError(t, err)
key = tablecodec.EncodeRowKeyWithHandle(1, h)
nextOnce := nextKey(key)
// should not panic
_ = nextKey(nextOnce)

// dIAAAAAAAAD/PV9pgAAAAAD/AAABA4AAAAD/AAAAAQOAAAD/AAAAAAEAAAD8
// an index key with: table: 61, index: 1, int64: 1, int64: 1
a := []byte{116, 128, 0, 0, 0, 0, 0, 0, 255, 61, 95, 105, 128, 0, 0, 0, 0, 255, 0, 0, 1, 3, 128, 0, 0, 0, 255, 0, 0, 0, 1, 3, 128, 0, 0, 255, 0, 0, 0, 0, 1, 0, 0, 0, 252}
10 changes: 9 additions & 1 deletion br/pkg/lightning/backend/local/localhelper.go
@@ -27,6 +27,7 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
@@ -379,7 +380,14 @@ func fetchTableRegionSizeStats(ctx context.Context, db *sql.DB, tableID int64) (
return stats, errors.Trace(err)
}

func (local *local) BatchSplitRegions(ctx context.Context, region *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error) {
func (local *local) BatchSplitRegions(
ctx context.Context,
region *split.RegionInfo,
keys [][]byte,
) (*split.RegionInfo, []*split.RegionInfo, error) {
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
failpoint.Return(nil, nil, errors.New("retryable error"))
})
region, newRegions, err := local.splitCli.BatchSplitRegionsWithOrigin(ctx, region, keys)
if err != nil {
return nil, nil, errors.Annotatef(err, "batch split regions failed")
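
The failpoint.Inject hooks added in local.go and localhelper.go share the name failToSplit within the same package, so a single activation drives both the forced-split path and the injected split error. A hedged sketch of how a unit test might switch the failpoint on (failpoint.Enable and failpoint.Disable are the real github.com/pingcap/failpoint APIs; the test itself is hypothetical):

package local

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestSplitRetryOnInjectedError(t *testing.T) {
	fp := "github.com/pingcap/tidb/br/pkg/lightning/backend/local/failToSplit"
	// `2*return("")` fires the failpoint only on its first two evaluations,
	// so the retry loop in ImportEngine is expected to succeed afterwards.
	require.NoError(t, failpoint.Enable(fp, `2*return("")`))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()
	// ... drive ImportEngine / BatchSplitRegions here and assert that the
	// split eventually succeeds despite the injected "retryable error".
}
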
47 changes: 16 additions & 31 deletions br/pkg/restore/data.go
@@ -4,7 +4,6 @@ package restore
import (
"context"
"io"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -80,29 +79,27 @@ func NewStoreMeta(storeId uint64) StoreMeta {

// for test
type Recovery struct {
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
totalFlashbackRegions uint64
allStores []*metapb.Store
StoreMetas []StoreMeta
RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
MaxAllocID uint64
mgr *conn.Mgr
progress glue.Progress
concurrency uint32
}

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
totalStores := len(allStores)
var StoreMetas = make([]StoreMeta, totalStores)
var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
return Recovery{
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency,
totalFlashbackRegions: 0}
allStores: allStores,
StoreMetas: StoreMetas,
RecoveryPlan: regionRecovers,
MaxAllocID: 0,
mgr: mgr,
progress: progress,
concurrency: concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
@@ -305,12 +302,8 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {

// prepare the regions for flashing back the data; the purpose is to stop region service and put regions into the flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
var totalRegions atomic.Uint64
totalRegions.Store(0)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

@@ -321,23 +314,16 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolve
log.Error("region flashback prepare get error")
return errors.Trace(err)
}

recovery.totalFlashbackRegions = totalRegions.Load()
recovery.progress.Inc()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))

return nil
}

// flashback the region data to version resolveTS
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {
var completedRegions atomic.Uint64

// only know the total progress of tikv, progress is total state of the whole restore flow.
ratio := int(recovery.totalFlashbackRegions) / len(recovery.allStores)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
completedRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

@@ -352,13 +338,12 @@ func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint
return errors.Trace(err)
}

recovery.progress.IncBy(int64(completedRegions.Load()) / int64(ratio))

log.Info("region flashback complete",
zap.Uint64("resolveTS", resolveTS),
zap.Uint64("commitTS", commitTS),
zap.Int("regions", runner.CompletedRegions()))

recovery.progress.Inc()
return nil
}

6 changes: 3 additions & 3 deletions br/pkg/task/restore_data.go
@@ -139,9 +139,9 @@ func RunResolveKvData(c context.Context, g glue.Glue, cmdName string, cfg *Resto
}

log.Debug("total tikv", zap.Int("total", numBackupStore), zap.String("progress file", cfg.ProgressFile))
// progress = read meta + send recovery + iterate tikv + flashback.
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*4), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*4), cfg.ProgressFile)
// progress = read meta + send recovery + iterate tikv + (1 * prepareflashback + 1 * flashback)
progress := g.StartProgress(ctx, cmdName, int64(numBackupStore*3+2), !cfg.LogProgress)
go progressFileWriterRoutine(ctx, progress, int64(numBackupStore*3+2), cfg.ProgressFile)

// restore tikv data from a snapshot volume
var totalRegions int
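
As a worked example of the new accounting: with numBackupStore = 3, the progress bar totals 3*3 + 2 = 11 units — one unit per store for reading meta, one per store for sending the recovery plan, one per store for the TiKV iteration, plus a single unit each for the prepare-flashback and flashback phases, matching the two unconditional progress.Inc() calls left in data.go.
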
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a-schema.sql
@@ -1 +1 @@
create table a (c int);
create table a (c VARCHAR(20) PRIMARY KEY);
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.1.sql
@@ -1 +1 @@
insert into a values (1);
insert into a values ('0000001');
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.2.sql
@@ -1 +1 @@
insert into a values (2);
insert into a values ('0000002');
2 changes: 1 addition & 1 deletion br/tests/lightning_local_backend/data/cpeng.a.3.sql
@@ -1 +1 @@
insert into a values (3),(4);
insert into a values ('0000003'),('0000004');
5 changes: 3 additions & 2 deletions br/tests/lightning_local_backend/run.sh
@@ -36,10 +36,11 @@ grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning


# First, verify that injecting a not-leader error is fine.
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/lightning/backend/local/failToSplit=2*return("")'
rm -f "$TEST_DIR/lightning-local.log"
run_sql 'DROP DATABASE IF EXISTS cpeng;'
run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml"
run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" -L debug
grep -Eq "split regions.*retryable error" "$TEST_DIR/lightning-local.log"

# Check that everything is correctly imported
run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
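
For context on the failpoint expression used here: in the GO_FAILPOINTS grammar, `2*return("")` means the failpoint fires (returning an empty string value) for its first two evaluations and then becomes a no-op, so the import is expected to recover on a later retry; the added `-L debug` flag and the grep check that the injected "retryable error" actually appears in the split-region log line.
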
5 changes: 4 additions & 1 deletion config/config.go
@@ -295,6 +295,8 @@ type Config struct {
TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`
// todo: remove this after AutoScaler is stable.
UseAutoScaler bool `toml:"use-autoscaler" json:"use-autoscaler"`

// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
@@ -1012,6 +1014,7 @@ var defaultConf = Config{
TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
IsTiFlashComputeFixedPool: false,
AutoScalerClusterID: "",
UseAutoScaler: true,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
@@ -1348,7 +1351,7 @@ func (c *Config) Valid() error {
}

// Check tiflash_compute topo fetch is valid.
if c.DisaggregatedTiFlash {
if c.DisaggregatedTiFlash && c.UseAutoScaler {
if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
4 changes: 4 additions & 0 deletions config/config.toml.example
@@ -129,6 +129,10 @@ autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081
# Only meaningful when disaggregated-tiflash is true.
autoscaler-cluster-id = ""

# use-autoscaler indicates whether to use AutoScaler or PD for tiflash_compute nodes; only meaningful when disaggregated-tiflash is true.
# This will be removed after AutoScaler is stable.
use-autoscaler = true

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
12 changes: 6 additions & 6 deletions ddl/attributes_sql_test.go
@@ -269,7 +269,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -327,7 +327,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -380,7 +380,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -444,7 +444,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -504,7 +504,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
@@ -530,7 +530,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
16 changes: 13 additions & 3 deletions ddl/ddl_api.go
@@ -3034,7 +3034,7 @@ func SetDirectPlacementOpt(placementSettings *model.PlacementSettings, placement
}

// SetDirectResourceGroupUnit tries to set the ResourceGroupSettings.
func SetDirectResourceGroupUnit(resourceGroupSettings *model.ResourceGroupSettings, typ ast.ResourceUnitType, stringVal string, uintVal uint64) error {
func SetDirectResourceGroupUnit(resourceGroupSettings *model.ResourceGroupSettings, typ ast.ResourceUnitType, stringVal string, uintVal uint64, boolValue bool) error {
switch typ {
case ast.ResourceRURate:
resourceGroupSettings.RURate = uintVal
@@ -3044,6 +3044,16 @@ func SetDirectResourceGroupUnit(resourceGroupSettin
resourceGroupSettings.IOReadBandwidth = stringVal
case ast.ResourceUnitIOWriteBandwidth:
resourceGroupSettings.IOWriteBandwidth = stringVal
case ast.ResourceBurstableOpiton:
// Notes about BurstLimit(b):
// - If b == 0, the limiter has unlimited burst capacity; this is the default used by the resource controller (burst at the configured rate within an unlimited capacity).
// - If b < 0, the limiter has unlimited capacity and the fill rate (r) is ignored; it can be seen as r == Inf (burst at an infinite rate within an unlimited capacity).
// - If b > 0, the limiter has limited capacity (currently not used).
limit := int64(0)
if boolValue {
limit = -1
}
resourceGroupSettings.BurstLimit = limit
default:
return errors.Trace(errors.New("unknown resource unit type"))
}
@@ -7610,7 +7620,7 @@ func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResour
groupName := stmt.ResourceGroupName
groupInfo.Name = groupName
for _, opt := range stmt.ResourceGroupOptionList {
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue)
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue, opt.BoolValue)
if err != nil {
return err
}
@@ -7697,7 +7707,7 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
func buildResourceGroup(oldGroup *model.ResourceGroupInfo, options []*ast.ResourceGroupOption) (*model.ResourceGroupInfo, error) {
groupInfo := &model.ResourceGroupInfo{Name: oldGroup.Name, ID: oldGroup.ID, ResourceGroupSettings: &model.ResourceGroupSettings{}}
for _, opt := range options {
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue)
err := SetDirectResourceGroupUnit(groupInfo.ResourceGroupSettings, opt.Tp, opt.StrValue, opt.UintValue, opt.BoolValue)
if err != nil {
return nil, err
}
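
The BURSTABLE option is folded into the group's BurstLimit field using the semantics spelled out in the comment above. A minimal, hedged sketch of just that mapping (the helper name is illustrative; the values mirror the diff):

// burstLimitFromOption maps the BURSTABLE flag to the BurstLimit value
// stored in model.ResourceGroupSettings.
func burstLimitFromOption(burstable bool) int64 {
	if burstable {
		return -1 // unlimited capacity, fill rate ignored (r == Inf)
	}
	return 0 // unlimited capacity, burst within the configured RU rate
}
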
1 change: 1 addition & 0 deletions ddl/indexmergetest/BUILD.bazel
@@ -8,6 +8,7 @@ go_test(
"merge_test.go",
],
flaky = True,
race = "on",
shard_count = 4,
deps = [
"//config",
3 changes: 3 additions & 0 deletions ddl/indexmergetest/merge_test.go
@@ -478,6 +478,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
callback := &callback.TestDDLCallback{Do: dom}

runPessimisticTxn := false
afterPessDML := make(chan struct{}, 1)
callback.OnJobRunBeforeExported = func(job *model.Job) {
if t.Failed() {
return
@@ -500,6 +501,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
assert.NoError(t, err)
_, err = tk2.Exec("update t set a = 3 where id = 1;")
assert.NoError(t, err)
afterPessDML <- struct{}{}
}
}
dom.DDL().SetHook(callback)
@@ -515,6 +517,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
case <-afterCommit:
require.Fail(t, "should be blocked by the pessimistic txn")
}
<-afterPessDML
tk2.MustExec("rollback;")
<-afterCommit
dom.DDL().SetHook(originHook)
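
The new afterPessDML channel is a plain happens-before handshake: the DDL hook signals once its pessimistic DML has been issued, and the main test goroutine waits for that signal before rolling back, so the rollback can no longer race ahead of the update. A self-contained sketch of the same pattern (names and the printed message are illustrative):

package main

import "fmt"

func main() {
	afterDML := make(chan struct{}, 1) // buffered: the hook never blocks on send

	go func() {
		// ... issue the conflicting pessimistic DML here ...
		afterDML <- struct{}{} // signal that the DML has happened
	}()

	<-afterDML // wait for the DML before continuing
	fmt.Println("safe to roll back the blocking transaction now")
}
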