Merge branch 'master' into fix_bug

hehechen authored Nov 23, 2022
2 parents ecb7c1c + cf49466 commit 7328b3d
Showing 114 changed files with 16,205 additions and 10,420 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -35,3 +35,4 @@ bazel-out
bazel-testlogs
bazel-tidb
.ijwb/
/oom_record/
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:ho5XUD8DVCnkpEj8oiTR57FXDTXnH6znyLe0gyrtzKk=",
version = "v0.0.0-20221103025916-e7e21f0e9cd9",
sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
version = "v0.0.0-20221114102356-3debb6820e46",
)
go_repository(
name = "com_github_pingcap_log",
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
@@ -94,14 +94,14 @@ type Client struct {
}

// NewBackupClient returns a new backup client.
func NewBackupClient(ctx context.Context, mgr ClientMgr) (*Client, error) {
func NewBackupClient(ctx context.Context, mgr ClientMgr) *Client {
log.Info("new backup client")
pdClient := mgr.GetPDClient()
clusterID := pdClient.GetClusterID(ctx)
return &Client{
clusterID: clusterID,
mgr: mgr,
}, nil
}
}

// GetTS gets a new timestamp from PD.
3 changes: 1 addition & 2 deletions br/pkg/backup/client_test.go
@@ -57,8 +57,7 @@ func createBackupSuite(t *testing.T) *testBackup {
mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}}
mockMgr.SetPDClient(s.mockPDClient)
mockMgr.SetHTTP([]string{"test"}, nil)
s.backupClient, err = backup.NewBackupClient(s.ctx, mockMgr)
require.NoError(t, err)
s.backupClient = backup.NewBackupClient(s.ctx, mockMgr)

s.cluster, err = mock.NewCluster()
require.NoError(t, err)
6 changes: 6 additions & 0 deletions br/pkg/errors/errors_test.go
@@ -22,3 +22,9 @@ func TestIsContextCanceled(t *testing.T) {
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
}

func TestEqual(t *testing.T) {
err := errors.Annotate(berrors.ErrPDBatchScanRegion, "test error equal")
r := berrors.ErrPDBatchScanRegion.Equal(err)
require.True(t, r)
}
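
The new TestEqual leans on the pingcap/errors behaviour that Annotate keeps the original error as its cause, which is what *Error.Equal inspects. A minimal sketch of how BR code can use this to branch on the error class rather than the message (the call site and message below are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/pingcap/errors"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
)

func main() {
	// Wrap the predefined BR error with extra context at the call site.
	err := errors.Annotate(berrors.ErrPDBatchScanRegion, "batch scan regions failed")

	// Equal still matches the underlying error class through the annotation,
	// so callers can decide what to do without parsing error messages.
	if berrors.ErrPDBatchScanRegion.Equal(err) {
		fmt.Println("error is (or wraps) ErrPDBatchScanRegion")
	}
}
```
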
14 changes: 10 additions & 4 deletions br/pkg/gluetidb/glue.go
@@ -303,7 +303,8 @@ func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) strin

// mockSession is used for test.
type mockSession struct {
se session.Session
se session.Session
globalVars map[string]string
}

// GetSessionCtx implements glue.Glue
@@ -368,12 +369,16 @@ func (s *mockSession) Close() {

// GetGlobalVariables implements glue.Session.
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
return "true", nil
if ret, ok := s.globalVars[name]; ok {
return ret, nil
}
return "True", nil
}

// MockGlue only used for test
type MockGlue struct {
se session.Session
se session.Session
GlobalVars map[string]string
}

func (m *MockGlue) SetSession(se session.Session) {
@@ -388,7 +393,8 @@ func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) {
// CreateSession implements glue.Glue.
func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) {
glueSession := &mockSession{
se: m.se,
se: m.se,
globalVars: m.GlobalVars,
}
return glueSession, nil
}
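
A minimal sketch of how a test might exercise the new GlobalVars field; the variable name and value are illustrative, and nil is passed for the store because the mock session never touches it:

```go
package gluetidb_test

import (
	"testing"

	"github.com/pingcap/tidb/br/pkg/gluetidb"
	"github.com/stretchr/testify/require"
)

// Hypothetical test: values planted in GlobalVars are served by
// GetGlobalVariable, anything else falls back to the default "True".
func TestMockGlueGlobalVars(t *testing.T) {
	g := &gluetidb.MockGlue{
		GlobalVars: map[string]string{"max_allowed_packet": "67108864"},
	}

	// CreateSession copies GlobalVars into the mock session.
	se, err := g.CreateSession(nil)
	require.NoError(t, err)

	v, err := se.GetGlobalVariable("max_allowed_packet")
	require.NoError(t, err)
	require.Equal(t, "67108864", v)

	v, err = se.GetGlobalVariable("some_other_variable")
	require.NoError(t, err)
	require.Equal(t, "True", v)
}
```
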
13 changes: 11 additions & 2 deletions br/pkg/lightning/lightning.go
@@ -435,6 +435,16 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
}
})

failpoint.Inject("PrintStatus", func() {
defer func() {
finished, total := l.Status()
o.logger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total),
zap.Bool("equal", finished == total))
}()
})

if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
return common.ErrInvalidTLSConfig.Wrap(err)
}
@@ -504,8 +514,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

var procedure *restore.Controller

param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
@@ -516,6 +524,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
CheckpointName: o.checkpointName,
}

var procedure *restore.Controller
procedure, err = restore.NewRestoreController(ctx, taskCfg, param)
if err != nil {
o.logger.Error("restore failed", log.ShortError(err))
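
For context, failpoints such as the new PrintStatus hook are compiled in with failpoint-ctl and toggled at run time, either programmatically or through the GO_FAILPOINTS environment variable. A hedged sketch of enabling it from a test follows; the failpoint path is an assumption derived from the package import path and may need adjusting:

```go
package lightning_test

import "github.com/pingcap/failpoint"

// Assumed failpoint path: package import path plus the failpoint name.
const printStatusFP = "github.com/pingcap/tidb/br/pkg/lightning/PrintStatus"

// withPrintStatus runs fn with the PrintStatus failpoint enabled, so the
// deferred log in (*Lightning).run reports finished/total bytes on exit.
func withPrintStatus(fn func()) error {
	if err := failpoint.Enable(printStatusFP, "return(true)"); err != nil {
		return err
	}
	defer func() {
		_ = failpoint.Disable(printStatusFP)
	}()
	fn()
	return nil
}
```
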
23 changes: 21 additions & 2 deletions br/pkg/lightning/restore/restore.go
@@ -232,7 +232,15 @@ type Controller struct {
precheckItemBuilder *PrecheckItemBuilder
}

// LightningStatus provides the finished bytes and total bytes of the current task.
// It should keep the value after restart from checkpoint.
// When it is tidb backend, FinishedFileSize can be counted after chunk data is
// restored to tidb. When it is local backend it's counted after whole engine is
// imported.
// TotalFileSize may be an estimated value, so when the task is finished, it may
// not equal to FinishedFileSize.
type LightningStatus struct {
backend string
FinishedFileSize atomic.Int64
TotalFileSize atomic.Int64
}
@@ -353,6 +361,7 @@ func NewRestoreControllerWithPauser(
default:
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
}
p.Status.backend = cfg.TikvImporter.Backend

var metaBuilder metaMgrBuilder
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal
@@ -2427,8 +2436,13 @@ func (cr *chunkRestore) deliverLoop(
// comes from chunk.Chunk.Offset. so it shouldn't happen that currOffset - startOffset < 0.
// but we met it one time, but cannot reproduce it now, we add this check to make code more robust
// TODO: reproduce and find the root cause and fix it completely
if currOffset >= startOffset {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(currOffset - startOffset))

delta := currOffset - startOffset
if delta >= 0 {
m.BytesCounter.WithLabelValues(metric.BytesStateRestored).Add(float64(delta))
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(delta)
}
} else {
deliverLogger.Warn("offset go back", zap.Int64("curr", currOffset),
zap.Int64("start", startOffset))
@@ -2441,6 +2455,11 @@ }
}
failpoint.Inject("SlowDownWriteRows", func() {
deliverLogger.Warn("Slowed down write rows")
finished := rc.status.FinishedFileSize.Load()
total := rc.status.TotalFileSize.Load()
deliverLogger.Warn("PrintStatus Failpoint",
zap.Int64("finished", finished),
zap.Int64("total", total))
})
failpoint.Inject("FailAfterWriteRows", nil)
// TODO: for local backend, we may save checkpoint more frequently, e.g. after written
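
The LightningStatus comment above spells out the accounting rule: with the TiDB backend, FinishedFileSize advances as each chunk is delivered, while with the local backend it only advances after a whole engine is imported, and TotalFileSize may be an estimate. A rough sketch of consuming that progress through Lightning.Status(), the same accessor the PrintStatus failpoint logs; the polling loop itself is illustrative:

```go
package progress

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/br/pkg/lightning"
)

// reportProgress prints finished/total bytes once per second until done is
// closed. Because TotalFileSize can be an estimate, the ratio may never reach
// exactly 100% even when the task finishes.
func reportProgress(l *lightning.Lightning, done <-chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			finished, total := l.Status()
			if total > 0 {
				fmt.Printf("imported %d/%d bytes (%.1f%%)\n",
					finished, total, float64(finished)/float64(total)*100)
			}
		}
	}
}
```
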
10 changes: 9 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
@@ -327,7 +327,7 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
dataWorker := rc.closedEngineLimit.Apply()
defer rc.closedEngineLimit.Recycle(dataWorker)
err = tr.importEngine(ctx, dataClosedEngine, rc, eid, ecp)
if rc.status != nil {
if rc.status != nil && rc.status.backend == config.BackendLocal {
for _, chunk := range ecp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
@@ -406,6 +406,11 @@ func (tr *TableRestore) restoreEngine(
if err != nil {
return closedEngine, errors.Trace(err)
}
if rc.status != nil && rc.status.backend == config.BackendTiDB {
for _, chunk := range cp.Chunks {
rc.status.FinishedFileSize.Add(chunk.Chunk.EndOffset - chunk.Key.Offset)
}
}
return closedEngine, nil
}

@@ -475,6 +480,9 @@ func (tr *TableRestore) restoreEngine(

// Restore table data
for chunkIndex, chunk := range cp.Chunks {
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
}
if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
continue
}
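
The checkpoint-resume changes above feed FinishedFileSize in two ways: for an engine whose restore already completed, each chunk counts in full (EndOffset - Key.Offset); for chunks being resumed, only the prefix the checkpoint recorded as written counts (Chunk.Offset - Key.Offset). A small illustrative helper restating that arithmetic; the type and function are not part of the change:

```go
package progress

// chunkProgress mirrors the offsets used above: KeyOffset is the chunk start
// in the data file, Offset is how far restore has progressed according to the
// checkpoint, EndOffset is the chunk end.
type chunkProgress struct {
	KeyOffset int64
	Offset    int64
	EndOffset int64
}

// finishedSize returns the bytes to add to LightningStatus.FinishedFileSize
// when resuming: full chunks once the engine is imported, otherwise only the
// restored prefix of each chunk.
func finishedSize(chunks []chunkProgress, engineImported bool) int64 {
	var total int64
	for _, c := range chunks {
		if engineImported {
			total += c.EndOffset - c.KeyOffset
		} else {
			total += c.Offset - c.KeyOffset
		}
	}
	return total
}
```
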
2 changes: 2 additions & 0 deletions br/pkg/restore/BUILD.bazel
@@ -43,6 +43,7 @@ go_library(
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//config",
"//ddl",
"//ddl/util",
@@ -57,6 +58,7 @@ go_library(
"//tablecodec",
"//util",
"//util/codec",
"//util/collate",
"//util/hack",
"//util/mathutil",
"//util/table-filter",