Commit 70d5032

removed upgrade changes and resolved conflict

Alkaagr81 committed May 18, 2022
2 parents 0dea10f + 2d44ac9 commit 70d5032
Showing 43 changed files with 673 additions and 253 deletions.
4 changes: 2 additions & 2 deletions br/cmd/br/stream.go
@@ -150,7 +150,7 @@ func newStreamCheckCommand() *cobra.Command {
Short: "get the metadata of log dir.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
- return streamCommand(cmd, task.StreamCheck)
+ return streamCommand(cmd, task.StreamMetadata)
},
}
return command
@@ -171,7 +171,7 @@ func streamCommand(command *cobra.Command, cmdName string) error {
}

switch cmdName {
- case task.StreamCheck:
+ case task.StreamMetadata:
{
// do nothing.
}
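The rename above touches both the subcommand definition and the dispatch switch. A minimal, self-contained sketch of that dispatch pattern, with stand-in constants rather than the real br task package:

```go
package main

import "fmt"

// Stand-ins for the br command-name constants; the commit renames
// StreamCheck to StreamMetadata in the real task package.
const (
	StreamMetadata = "log metadata"
	StreamTruncate = "log truncate"
)

// streamCommand sketches the dispatch: every subcommand funnels into one
// entry point and branches on its registered name.
func streamCommand(cmdName string) error {
	switch cmdName {
	case StreamMetadata:
		// read-only metadata query: nothing extra to validate
	case StreamTruncate:
		// a destructive command would validate its flags here
	default:
		return fmt.Errorf("unknown stream command %q", cmdName)
	}
	fmt.Println("running", cmdName)
	return nil
}

func main() {
	_ = streamCommand(StreamMetadata)
}
```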
12 changes: 9 additions & 3 deletions br/pkg/restore/client.go
@@ -1452,14 +1452,16 @@ func (rc *Client) GetRebasedTables() map[UniqueTableName]bool {
func (rc *Client) PreCheckTableTiFlashReplica(
ctx context.Context,
tables []*metautil.Table,
+ skipTiflash bool,
) error {
tiFlashStores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
tiFlashStoreCount := len(tiFlashStores)
for _, table := range tables {
- if table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > uint64(tiFlashStoreCount) {
+ if skipTiflash ||
+ (table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > uint64(tiFlashStoreCount)) {
// We cannot satisfy the TiFlash replica count in the restore cluster, so we set
// TiFlashReplica to unavailable in the tableInfo, so that TiDB does not plan queries against TiFlash replicas it cannot reach.
// see details at https://github.com/pingcap/br/issues/931
@@ -1681,7 +1683,11 @@ func (rc *Client) RestoreKVFiles(
summary.CollectInt("File", 1)
log.Info("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
}()
- return rc.fileImporter.ImportKVFiles(ectx, file, rule, rc.restoreTS)
+ startTS := rc.startTS
+ if file.Cf == stream.DefaultCF {
+ startTS = rc.shiftStartTS
+ }
+ return rc.fileImporter.ImportKVFiles(ectx, file, rule, startTS, rc.restoreTS)
})
}
}
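The new branch replays default-CF files from the shifted start timestamp, which in BR is typically earlier than start-ts so that default-CF writes of transactions that began before it are still covered. A runnable sketch of just that selection; the CF names mirror br/pkg/stream, and pickStartTS is a hypothetical helper (the real code does this inline):

```go
package main

import "fmt"

// Column-family names as used by the log backup stream (mirroring br/pkg/stream).
const (
	DefaultCF = "default"
	WriteCF   = "write"
)

// pickStartTS shows the choice RestoreKVFiles now makes per file:
// default-CF files use the shifted start TS, write-CF files the plain one.
func pickStartTS(cf string, startTS, shiftStartTS uint64) uint64 {
	if cf == DefaultCF {
		return shiftStartTS
	}
	return startTS
}

func main() {
	fmt.Println(pickStartTS(DefaultCF, 100, 80)) // 80
	fmt.Println(pickStartTS(WriteCF, 100, 80))   // 100
}
```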
@@ -1948,7 +1954,7 @@ func (rc *Client) UpdateSchemaVersion(ctx context.Context) error {
func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var e error
- schemaVersion, e = t.GenSchemaVersion()
+ schemaVersion, e = t.GenSchemaVersions(128)
return e
},
); err != nil {
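Switching from GenSchemaVersion to GenSchemaVersions(128) reserves a block of schema versions in one meta transaction instead of paying a round-trip per bump. A hypothetical allocator illustrating the block-reservation idea; this is not the real meta API, just the pattern:

```go
package main

import "fmt"

// versionAllocator is a toy stand-in: reserve grabs a contiguous block of
// versions (one "transaction"), alloc hands them out until the block runs dry.
type versionAllocator struct{ next, end uint64 }

func (a *versionAllocator) reserve(n uint64) {
	a.next, a.end = a.end+1, a.end+n
}

func (a *versionAllocator) alloc() (uint64, bool) {
	if a.next == 0 || a.next > a.end {
		return 0, false // block exhausted; caller must reserve again
	}
	v := a.next
	a.next++
	return v, true
}

func main() {
	var a versionAllocator
	a.reserve(128) // one round-trip reserves versions 1..128
	for i := 0; i < 3; i++ {
		v, _ := a.alloc()
		fmt.Println(v) // 1, 2, 3
	}
}
```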
7 changes: 6 additions & 1 deletion br/pkg/restore/client_test.go
@@ -223,7 +223,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
}
}
ctx := context.Background()
- require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables))
+ require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, false))

for i := 0; i < len(tables); i++ {
if i == 0 || i > 2 {
@@ -234,4 +234,9 @@
require.Equal(t, i, obtainCount)
}
}

+ require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables, true))
+ for i := 0; i < len(tables); i++ {
+ require.Nil(t, tables[i].Info.TiFlashReplica)
+ }
}
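The new assertions pin down the skipTiflash semantics: with the flag set, every table's TiFlash replica info is cleared regardless of store count. A small sketch of the per-table rule the pre-check applies; shouldDropReplica is a hypothetical helper mirroring the condition in client.go:

```go
package main

import "fmt"

// shouldDropReplica: drop the TiFlash replica setting when the caller asks
// to skip TiFlash outright, or when the table wants more replicas than the
// restore cluster has TiFlash stores.
func shouldDropReplica(skipTiflash, hasReplica bool, replicaCount, tiflashStores uint64) bool {
	return skipTiflash || (hasReplica && replicaCount > tiflashStores)
}

func main() {
	fmt.Println(shouldDropReplica(true, true, 1, 3))  // true: flag wins
	fmt.Println(shouldDropReplica(false, true, 5, 3)) // true: not enough stores
	fmt.Println(shouldDropReplica(false, true, 2, 3)) // false: satisfiable
}
```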
27 changes: 16 additions & 11 deletions br/pkg/restore/import.go
@@ -335,11 +335,12 @@ func (importer *FileImporter) ImportKVFileForRegion(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
- restoreTs uint64,
+ startTS uint64,
+ restoreTS uint64,
info *RegionInfo,
) RPCResult {
// Try to download file.
- result := importer.downloadAndApplyKVFile(ctx, file, rule, info, restoreTs)
+ result := importer.downloadAndApplyKVFile(ctx, file, rule, info, startTS, restoreTS)
if !result.OK() {
errDownload := result.Err
for _, e := range multierr.Errors(errDownload) {
@@ -380,11 +381,13 @@ func (importer *FileImporter) ClearFiles(ctx context.Context, pdClient pd.Client
return nil
}

+ // ImportKVFiles restores the kv events.
func (importer *FileImporter) ImportKVFiles(
ctx context.Context,
file *backuppb.DataFileInfo,
rule *RewriteRules,
- restoreTs uint64,
+ startTS uint64,
+ restoreTS uint64,
) error {
startTime := time.Now()
log.Debug("import kv files", zap.String("file", file.Path))
@@ -401,7 +404,7 @@ func (importer *FileImporter) ImportKVFiles(
rs := utils.InitialRetryState(32, 100*time.Millisecond, 8*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *RegionInfo) RPCResult {
- return importer.ImportKVFileForRegion(ctx, file, rule, restoreTs, r)
+ return importer.ImportKVFileForRegion(ctx, file, rule, startTS, restoreTS, r)
})

log.Debug("download and apply file done",
@@ -801,7 +804,8 @@ func (importer *FileImporter) downloadAndApplyKVFile(
file *backuppb.DataFileInfo,
rules *RewriteRules,
regionInfo *RegionInfo,
- restoreTs uint64,
+ startTS uint64,
+ restoreTS uint64,
) RPCResult {
leader := regionInfo.Leader
if leader == nil {
@@ -823,12 +827,13 @@
Name: file.Path,
Cf: file.Cf,
// TODO fill the length
- Length: 0,
- IsDelete: file.Type == backuppb.FileType_Delete,
- RestoreTs: restoreTs,
- StartKey: regionInfo.Region.GetStartKey(),
- EndKey: regionInfo.Region.GetEndKey(),
- Sha256: file.GetSha256(),
+ Length: 0,
+ IsDelete: file.Type == backuppb.FileType_Delete,
+ StartSnapshotTs: startTS,
+ RestoreTs: restoreTS,
+ StartKey: regionInfo.Region.GetStartKey(),
+ EndKey: regionInfo.Region.GetEndKey(),
+ Sha256: file.GetSha256(),
}

reqCtx := &kvrpcpb.Context{
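Both timestamps now ride in the per-region apply request, presumably giving TiKV a [StartSnapshotTs, RestoreTs] window for the replayed events. A stand-in struct and builder sketching that threading; the field names follow the diff, but the types are simplified, not the real kvproto messages:

```go
package main

import "fmt"

// applyRequest models only the fields visible in this diff.
type applyRequest struct {
	Name            string
	Cf              string
	IsDelete        bool
	StartSnapshotTs uint64 // new in this commit: lower bound of the replay window
	RestoreTs       uint64 // upper bound of the replay window
	StartKey        []byte
	EndKey          []byte
}

// buildApplyRequest shows how the importer threads the per-CF startTS and
// the global restoreTS into one request per region.
func buildApplyRequest(file, cf string, startTS, restoreTS uint64, startKey, endKey []byte) applyRequest {
	return applyRequest{
		Name:            file,
		Cf:              cf,
		StartSnapshotTs: startTS,
		RestoreTs:       restoreTS,
		StartKey:        startKey,
		EndKey:          endKey,
	}
}

func main() {
	req := buildApplyRequest("v1/t1/log.log", "default", 80, 120, []byte("a"), []byte("z"))
	fmt.Printf("%+v\n", req)
}
```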
2 changes: 0 additions & 2 deletions br/pkg/restore/stream_metas.go
@@ -171,8 +171,6 @@ func swapAndOverrideFile(ctx context.Context, s storage.ExternalStorage, path st
const (
// TruncateSafePointFileName is the file into which the ts (up to which the log has been truncated) is saved.
TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
- // GlobalCheckpointFileName is the filename that the ts(the global checkpoint) is saved into.
- GlobalCheckpointFileName = "v1_stream_global_checkpoint.txt"
)

// GetTSFromFile gets the current truncate safepoint.
39 changes: 35 additions & 4 deletions br/pkg/storage/s3.go
@@ -14,6 +14,8 @@ import (
"strings"
"time"

+ alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
+ aliproviders "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
@@ -52,6 +54,8 @@

// TODO make this configurable, 5 mb is a good minimum size but on low latency/high bandwidth network you can go a lot bigger
hardcodedS3ChunkSize = 5 * 1024 * 1024
+ // domainAliyun is used to detect the cloud provider from the endpoint.
+ domainAliyun = "aliyuncs.com"
)

var permissionCheckFn = map[Permission]func(*s3.S3, *backuppb.S3) error{
@@ -241,7 +245,34 @@ func NewS3Storage( // revive:disable-line:flag-parameter
})
}

- func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storage, error) {
+ // autoNewCred resolves credentials automatically when no AK/SK pair is given.
+ func autoNewCred(qs *backuppb.S3) (cred *credentials.Credentials, err error) {
+ if qs.AccessKey != "" && qs.SecretAccessKey != "" {
+ return credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, ""), nil
+ }
+ endpoint := qs.Endpoint
+ // If the endpoint is empty, return no error and fall back to the default (AWS) flow.
+ if endpoint == "" {
+ return nil, nil
+ }
+ // If the endpoint contains "aliyuncs", fetch an STS token.
+ if strings.Contains(endpoint, domainAliyun) {
+ return createOssRamCred()
+ }
+ // Otherwise, return no error and fall back to the default (AWS) flow.
+ return nil, nil
+ }
+
+ func createOssRamCred() (*credentials.Credentials, error) {
+ cred, err := aliproviders.NewInstanceMetadataProvider().Retrieve()
+ if err != nil {
+ return nil, errors.Annotate(err, "Alibaba RAM Provider Retrieve")
+ }
+ ncred := cred.(*alicred.StsTokenCredential)
+ return credentials.NewStaticCredentials(ncred.AccessKeyId, ncred.AccessKeySecret, ncred.AccessKeyStsToken), nil
+ }
+
+ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error) {
qs := *backend
awsConfig := aws.NewConfig().
WithS3ForcePathStyle(qs.ForcePathStyle).
@@ -253,9 +284,9 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (*S3Storag
if opts.HTTPClient != nil {
awsConfig.WithHTTPClient(opts.HTTPClient)
}
- var cred *credentials.Credentials
- if qs.AccessKey != "" && qs.SecretAccessKey != "" {
- cred = credentials.NewStaticCredentials(qs.AccessKey, qs.SecretAccessKey, "")
+ cred, err := autoNewCred(&qs)
+ if err != nil {
+ return nil, errors.Trace(err)
}
if cred != nil {
awsConfig.WithCredentials(cred)
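autoNewCred's decision order is: explicit AK/SK wins, an aliyuncs.com endpoint triggers the Alibaba RAM/STS instance-metadata path, and anything else (including an empty endpoint) falls through to the AWS default provider chain. A stdlib-only sketch of just that ordering; credSource and its return labels are hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

const domainAliyun = "aliyuncs.com"

// credSource mirrors autoNewCred's branching without touching any SDK.
func credSource(accessKey, secretKey, endpoint string) string {
	switch {
	case accessKey != "" && secretKey != "":
		return "static AK/SK"
	case endpoint == "":
		return "AWS default chain"
	case strings.Contains(endpoint, domainAliyun):
		return "Aliyun RAM/STS via instance metadata"
	default:
		return "AWS default chain"
	}
}

func main() {
	fmt.Println(credSource("AK", "SK", ""))                         // static AK/SK
	fmt.Println(credSource("", "", "oss-cn-hangzhou.aliyuncs.com")) // Aliyun RAM/STS via instance metadata
	fmt.Println(credSource("", "", "s3.us-west-2.amazonaws.com"))   // AWS default chain
}
```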
17 changes: 10 additions & 7 deletions br/pkg/task/restore.go
@@ -151,8 +151,9 @@
FullBackupStorage string `json:"full-backup-storage" toml:"full-backup-storage"`

// [startTs, RestoreTS] is used to `restore log` from StartTS to RestoreTS.
- StartTS uint64 `json:"start-ts" toml:"start-ts"`
- RestoreTS uint64 `json:"restore-ts" toml:"restore-ts"`
+ StartTS uint64 `json:"start-ts" toml:"start-ts"`
+ RestoreTS uint64 `json:"restore-ts" toml:"restore-ts"`
+ skipTiflash bool `json:"-" toml:"-"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
@@ -488,7 +489,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

- err = client.PreCheckTableTiFlashReplica(ctx, tables)
+ err = client.PreCheckTableTiFlashReplica(ctx, tables, cfg.skipTiflash)
if err != nil {
return errors.Trace(err)
}
@@ -559,7 +560,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
summary.CollectInt("restore ranges", rangeSize)
log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize))

- restoreSchedulers, err := restorePreWork(ctx, client, mgr)
+ restoreSchedulers, err := restorePreWork(ctx, client, mgr, true)
if err != nil {
return errors.Trace(err)
}
@@ -688,13 +689,15 @@ func filterRestoreFiles(

// restorePreWork executes some prepare work before restore.
// TODO make this function returns a restore post work.
- func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (pdutil.UndoFunc, error) {
+ func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr, switchToImport bool) (pdutil.UndoFunc, error) {
if client.IsOnline() {
return pdutil.Nop, nil
}

- // Switch TiKV cluster to import mode (adjust rocksdb configuration).
- client.SwitchToImportMode(ctx)
+ if switchToImport {
+ // Switch TiKV cluster to import mode (adjust rocksdb configuration).
+ client.SwitchToImportMode(ctx)
+ }

return mgr.RemoveSchedulers(ctx)
}
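restorePreWork now gates the import-mode switch behind a flag while still removing the PD schedulers, so a caller can opt out of the RocksDB import tuning. A rough sketch of the resulting control flow, using plain values in place of the real client and manager types:

```go
package main

import "fmt"

// preWorkSteps mirrors the gate added to restorePreWork: online restores
// skip everything; offline restores always remove the PD schedulers, but
// only switch TiKV into import mode when the caller asks for it.
func preWorkSteps(online, switchToImport bool) []string {
	if online {
		return nil
	}
	var steps []string
	if switchToImport {
		steps = append(steps, "switch TiKV to import mode")
	}
	steps = append(steps, "remove PD schedulers")
	return steps
}

func main() {
	fmt.Println(preWorkSteps(false, true))  // full restore: both steps
	fmt.Println(preWorkSteps(false, false)) // opt-out caller: schedulers only
	fmt.Println(preWorkSteps(true, true))   // online restore: no-op
}
```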
2 changes: 1 addition & 1 deletion br/pkg/task/restore_raw.go
@@ -152,7 +152,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
return errors.Trace(err)
}

- restoreSchedulers, err := restorePreWork(ctx, client, mgr)
+ restoreSchedulers, err := restorePreWork(ctx, client, mgr, true)
if err != nil {
return errors.Trace(err)
}