Skip to content

Commit

Permalink
merge upstream
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB committed Oct 17, 2023
2 parents 484596f + 91a8023 commit 2e78eb4
Show file tree
Hide file tree
Showing 75 changed files with 2,445 additions and 797 deletions.
30 changes: 15 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,15 @@ endif
# Usage:
# make bench-daily TO=/path/to/file.json
bench-daily:
go test -tags intest github.com/pingcap/tidb/session -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/executor -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/executor/test/splittest -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/tablecodec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/expression -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/util/rowcodec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/util/codec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/distsql -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/util/benchdaily -run TestBenchDaily -bench Ignore \
go test -tags intest github.com/pingcap/tidb/pkg/session -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/executor -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/executor/test/splittest -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/tablecodec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/expression -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/util/rowcodec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/util/codec -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/distsql -run TestBenchDaily -bench Ignore --outfile bench_daily.json
go test -tags intest github.com/pingcap/tidb/pkg/util/benchdaily -run TestBenchDaily -bench Ignore \
-date `git log -n1 --date=unix --pretty=format:%cd` \
-commit `git log -n1 --pretty=format:%h` \
-outfile $(TO)
Expand Down Expand Up @@ -384,12 +384,12 @@ mock_lightning: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher,CleanUpRoutine > disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/util/sqlexec RestrictedSQLExecutor > util/sqlexec/mock/restricted_sql_executor_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > pkg/disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/dispatcher Dispatcher,CleanUpRoutine,TaskManager > pkg/disttask/framework/mock/dispatcher_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/pkg/disttask/framework/scheduler/execute SubtaskExecutor > pkg/disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/importinto MiniTaskExecutor > pkg/disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/disttask/framework/planner LogicalPlan,PipelineSpec > pkg/disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -197,12 +198,13 @@ func (e *Engine) loadIngestData(
hex.EncodeToString(start))
}

now := time.Now()
startTs := time.Now()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()
cnt := 0
size := 0
totalSize := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start
Expand All @@ -215,6 +217,7 @@ func (e *Engine) loadIngestData(
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}

for iter.Next() {
Expand All @@ -241,13 +244,16 @@ func (e *Engine) loadIngestData(
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}

metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort").Observe(float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort").Observe(time.Since(startTs).Seconds())
logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Duration("cost time", time.Since(startTs)),
zap.Int("iterated count", cnt))
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Expand Down
21 changes: 13 additions & 8 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
Expand Down Expand Up @@ -385,7 +386,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}

ts := time.Now()
var savedBytes uint64
savedBytes := w.batchSize

startTs := time.Now()
var startTsForWrite time.Time

defer func() {
w.currentSeq++
Expand All @@ -407,6 +411,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
zap.Duration("time", time.Since(ts)),
zap.Uint64("bytes", savedBytes),
zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
}()

sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
Expand All @@ -420,6 +428,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
return false
})

startTsForWrite = time.Now()
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())

w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc)
if err != nil {
return err
Expand Down Expand Up @@ -488,9 +500,6 @@ func (w *Writer) createStorageWriter(ctx context.Context) (

// EngineWriter implements backend.EngineWriter interface.
type EngineWriter struct {
// Only 1 writer is used for some kv group(data or some index), no matter
// how many routines are encoding data, so need to sync write to it.
sync.Mutex
w *Writer
}

Expand All @@ -501,8 +510,6 @@ func NewEngineWriter(w *Writer) *EngineWriter {

// AppendRows implements backend.EngineWriter interface.
func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error {
e.Lock()
defer e.Unlock()
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
Expand All @@ -524,7 +531,5 @@ func (e *EngineWriter) IsSynced() bool {

// Close implements backend.EngineWriter interface.
func (e *EngineWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error) {
e.Lock()
defer e.Unlock()
return nil, e.w.Close(ctx)
}
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//pkg/distsql",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -1322,6 +1323,7 @@ func (local *Backend) startWorker(
jobInCh, jobOutCh chan *regionJob,
jobWg *sync.WaitGroup,
) error {
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Set(0)
for {
select {
case <-ctx.Done():
Expand All @@ -1333,7 +1335,9 @@ func (local *Backend) startWorker(
return nil
}

metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Inc()
err := local.executeJob(ctx, job)
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Dec()
switch job.stage {
case regionScanned, wrote, ingested:
jobOutCh <- job
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var retryableServerError = []string{
"internalerror",
"not read from or written to within the timeout period",
"<code>requesttimeout</code>",
"<code>invalidpart</code>",
}

// RetryableFunc presents a retryable operation.
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")""
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
Expand Down
1 change: 1 addition & 0 deletions build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ nogo(
"//build/linter/asciicheck",
"//build/linter/bodyclose",
"//build/linter/bootstrap",
"//build/linter/constructor",
"//build/linter/deferrecover",
"//build/linter/durationcheck",
"//build/linter/etcdconfig",
Expand Down
24 changes: 24 additions & 0 deletions build/linter/constructor/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "constructor",
srcs = ["analyzer.go"],
importpath = "github.com/pingcap/tidb/build/linter/constructor",
visibility = ["//visibility:public"],
deps = [
"@com_github_fatih_structtag//:structtag",
"@org_golang_x_tools//go/analysis",
"@org_golang_x_tools//go/ast/inspector",
],
)

go_test(
name = "constructor_test",
timeout = "short",
srcs = ["analyzer_test.go"],
flaky = True,
deps = [
":constructor",
"@org_golang_x_tools//go/analysis/analysistest",
],
)
Loading

0 comments on commit 2e78eb4

Please sign in to comment.