Skip to content

Commit

Permalink
Merge branch 'master' into nonbuffer-cop-chan
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jul 13, 2022
2 parents 5970be6 + 5e8f09b commit 11068ad
Show file tree
Hide file tree
Showing 264 changed files with 22,839 additions and 12,734 deletions.
6 changes: 6 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

build --java_language_version=17
build --java_runtime_version=17
build --tool_java_language_version=17
build --tool_java_runtime_version=17
build --experimental_remote_cache_compression

run --color=yes
build:release --workspace_status_command=./build/print-workspace-status.sh --stamp
build:release --config=ci
Expand Down
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
/sessionctx/variable @pingcap/tidb-configuration-reviewer
/config/config.toml.example @pingcap/tidb-configuration-reviewer
/session/bootstrap.go @pingcap/tidb-configuration-reviewer
/telemetry/ @pingcap/telemetry-reviewer
1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ header:
- 'dumpling/'
- 'tidb-binlog/driver/example'
- 'tidb-binlog/proto/go-binlog/secondary_binlog.pb.go'
- '**/*.sql'
comment: on-failure
2 changes: 1 addition & 1 deletion .github/workflows/license-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ dumpling_integration_test: dumpling_bins failpoint-enable build_dumpling
dumpling_bins:
@which bin/tidb-server
@which bin/minio
@which bin/mc
@which bin/tidb-lightning
@which bin/sync_diff_inspector

Expand Down Expand Up @@ -440,7 +441,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel --output_user_root=/home/jenkins/.tidb/tmp build -k --config=ci //tidb-server/... //br/cmd/... //cmd/... --//build:with_nogo_flag=true
bazel --output_user_root=/home/jenkins/.tidb/tmp build -k --config=ci //tidb-server/... //br/cmd/... //cmd/... //util/... //dumpling/cmd/... //tidb-binlog/... --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server-check_/tidb-server-check ./bin
Expand Down
2 changes: 1 addition & 1 deletion br/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mysql --host 127.0.0.1 -P4000 -E -e "SELECT COUNT(*) FROM test.order_line" -uroo

## Compatibility test

See [COMPATBILE_TEST](./COMPATIBLE_TEST.md)
See [COMPATBILE_TEST](./COMPATIBILITY_TEST.md)

## Contributing

Expand Down
1 change: 1 addition & 0 deletions br/cmd/br/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//br/pkg/redact",
"//br/pkg/restore",
"//br/pkg/rtree",
"//br/pkg/streamhelper/config",
"//br/pkg/summary",
"//br/pkg/task",
"//br/pkg/trace",
Expand Down
24 changes: 24 additions & 0 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"github.com/pingcap/errors"
advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/trace"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -49,6 +50,7 @@ func NewStreamCommand() *cobra.Command {
newStreamStatusCommand(),
newStreamTruncateCommand(),
newStreamCheckCommand(),
newStreamAdvancerCommand(),
)
command.SetHelpFunc(func(command *cobra.Command, strings []string) {
task.HiddenFlagsForStream(command.Root().PersistentFlags())
Expand Down Expand Up @@ -157,6 +159,21 @@ func newStreamCheckCommand() *cobra.Command {
return command
}

func newStreamAdvancerCommand() *cobra.Command {
command := &cobra.Command{
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. (only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCtl)
},
Hidden: true,
}
task.DefineStreamCommonFlags(command.Flags())
advancercfg.DefineFlagsForCheckpointAdvancerConfig(command.Flags())
return command
}

func streamCommand(command *cobra.Command, cmdName string) error {
var cfg task.StreamConfig
var err error
Expand Down Expand Up @@ -192,6 +209,13 @@ func streamCommand(command *cobra.Command, cmdName string) error {
if err = cfg.ParseStreamPauseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamCtl:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
if err = cfg.AdvancerCfg.GetFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
default:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
Expand Down
19 changes: 12 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,11 @@ func (bc *Client) BackupRanges(
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
) error {
log.Info("Backup Ranges Started", rtree.ZapRanges(ranges))
init := time.Now()

defer func() {
log.Info("Backup Ranges", zap.Duration("take", time.Since(init)))
log.Info("Backup Ranges Completed", zap.Duration("take", time.Since(init)))
}()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -580,13 +582,15 @@ func (bc *Client) BackupRange(
start := time.Now()
defer func() {
elapsed := time.Since(start)
logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed))
logutil.CL(ctx).Info("backup range completed",
logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
zap.Duration("take", elapsed))
key := "range start:" + hex.EncodeToString(req.StartKey) + " end:" + hex.EncodeToString(req.EndKey)
if err != nil {
summary.CollectFailureUnit(key, err)
}
}()
logutil.CL(ctx).Info("backup started",
logutil.CL(ctx).Info("backup range started",
logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
zap.Uint64("rateLimit", req.RateLimit),
zap.Uint32("concurrency", req.Concurrency))
Expand All @@ -597,12 +601,13 @@ func (bc *Client) BackupRange(
return errors.Trace(err)
}

logutil.CL(ctx).Info("backup push down started")
push := newPushDown(bc.mgr, len(allStores))
results, err := push.pushBackup(ctx, req, allStores, progressCallBack)
if err != nil {
return errors.Trace(err)
}
logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len()))
logutil.CL(ctx).Info("backup push down completed", zap.Int("small-range-count", results.Len()))

// Find and backup remaining ranges.
// TODO: test fine grained backup.
Expand All @@ -619,9 +624,9 @@ func (bc *Client) BackupRange(
logutil.Key("endKey", req.EndKey),
zap.String("cf", req.Cf))
} else {
logutil.CL(ctx).Info("time range backed up",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
logutil.CL(ctx).Info("transactional range backup completed",
zap.Reflect("StartTS", req.StartVersion),
zap.Reflect("EndTS", req.EndVersion))
}

var ascendErr error
Expand Down
43 changes: 0 additions & 43 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,49 +250,6 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}
}

func TestSendCreds(t *testing.T) {
s, clean := createBackupSuite(t)
defer clean()

accessKey := "ab"
secretAccessKey := "cd"
backendOpt := storage.BackendOptions{
S3: storage.S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err := storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
}
_, err = storage.New(s.ctx, backend, opts)
require.NoError(t, err)
access_key := backend.GetS3().AccessKey
require.Equal(t, "ab", access_key)
secret_access_key := backend.GetS3().SecretAccessKey
require.Equal(t, "cd", secret_access_key)

backendOpt = storage.BackendOptions{
S3: storage.S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err = storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
}
_, err = storage.New(s.ctx, backend, opts)
require.NoError(t, err)
access_key = backend.GetS3().AccessKey
require.Equal(t, "", access_key)
secret_access_key = backend.GetS3().SecretAccessKey
require.Equal(t, "", secret_access_key)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
s, clean := createBackupSuite(t)
defer clean()
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func (ss *Schemas) BackupSchemas(
)

if !skipChecksum {
logger.Info("table checksum start")
logger.Info("Calculate table checksum start")
start := time.Now()
err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
}
logger.Info("table checksum finished",
logger.Info("Calculate table checksum completed",
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
Expand Down Expand Up @@ -143,7 +143,7 @@ func (ss *Schemas) BackupSchemas(
if err := errg.Wait(); err != nil {
return errors.Trace(err)
}
log.Info("backup checksum", zap.Duration("take", time.Since(startAll)))
log.Info("Backup calculated table checksum into metas", zap.Duration("take", time.Since(startAll)))
summary.CollectDuration("backup checksum", time.Since(startAll))
return metaWriter.FinishWriteMetas(ctx, op)
}
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
Expand Down
Loading

0 comments on commit 11068ad

Please sign in to comment.