Skip to content

Commit

Permalink
loader(dm): simplify lightning checkpoint and add clean meta (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Dec 22, 2021
1 parent d108cbb commit d84f15b
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 108 deletions.
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,8 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LoaderCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.SyncerCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock := conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down Expand Up @@ -1077,6 +1078,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock = conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand Down
130 changes: 91 additions & 39 deletions dm/loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ import (
"go.uber.org/zap"
)

const (
LightningCheckpointListName = "lightning_checkpoint_list"
)

// CheckPoint represents checkpoint status.
type CheckPoint interface {
// Load loads all checkpoints recorded before.
Expand Down Expand Up @@ -470,19 +466,58 @@ func (cp *RemoteCheckPoint) String() string {
return string(bytes)
}

type lightingLoadStatus int

const (
lightningStatusInit lightingLoadStatus = iota
lightningStatusRunning
lightningStatusFinished
)

func (s lightingLoadStatus) String() string {
switch s {
case lightningStatusInit:
return "init"
case lightningStatusRunning:
return "running"
case lightningStatusFinished:
return "finished"
default:
panic(fmt.Sprintf("unknown lightning load stauts '%d'", s))
}
}

func parseLightningLoadStatus(s string) lightingLoadStatus {
switch s {
case "running":
return lightningStatusRunning
case "finished":
return lightningStatusFinished
case "init":
return lightningStatusInit
default:
log.L().Warn("unknown lightning load status, will fallback to init", zap.String("status", s))
return lightningStatusInit
}
}

type LightningCheckpointList struct {
db *conn.BaseDB
schema string
tableName string
logger log.Logger
db *conn.BaseDB
schema string
tableName string
taskName string
sourceName string
logger log.Logger
}

func NewLightningCheckpointList(db *conn.BaseDB, metaSchema string) *LightningCheckpointList {
func NewLightningCheckpointList(db *conn.BaseDB, taskName, sourceName, metaSchema string) *LightningCheckpointList {
return &LightningCheckpointList{
db: db,
schema: dbutil.ColumnName(metaSchema),
tableName: dbutil.TableName(metaSchema, LightningCheckpointListName),
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")),
db: db,
schema: dbutil.ColumnName(metaSchema),
tableName: dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName)),
taskName: taskName,
sourceName: sourceName,
logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")),
}
}

Expand All @@ -491,35 +526,38 @@ func (cp *LightningCheckpointList) Prepare(ctx context.Context) error {
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection when prepare"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

createSchema := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", cp.schema)
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createSchema})
if err != nil {
return err
}
createTable := `CREATE TABLE IF NOT EXISTS %s (
worker_name varchar(255) NOT NULL,
task_name varchar(255) NOT NULL,
PRIMARY KEY (task_name, worker_name)
source_name varchar(255) NOT NULL,
status varchar(10) NOT NULL DEFAULT 'init' COMMENT 'init,running,finished',
PRIMARY KEY (task_name, source_name)
);
`
sql2 := fmt.Sprintf(createTable, cp.tableName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql2})
return terror.WithScope(err, terror.ScopeDownstream)
}

func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, workerName, taskName string) error {
func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context) error {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

sql := fmt.Sprintf("INSERT IGNORE INTO %s (`worker_name`, `task_name`) VALUES(?,?)", cp.tableName)
sql := fmt.Sprintf("INSERT IGNORE INTO %s (`task_name`, `source_name`) VALUES (?, ?)", cp.tableName)
cp.logger.Info("initial checkpoint record",
zap.String("sql", sql),
zap.String("worker-name", workerName),
zap.String("task-name", taskName))
args := []interface{}{workerName, taskName}
zap.String("task", cp.taskName),
zap.String("source", cp.sourceName))
args := []interface{}{cp.taskName, cp.sourceName}
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}, args)
if err != nil {
Expand All @@ -528,35 +566,49 @@ func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, worke
return nil
}

func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context, taskName string) error {
func (cp *LightningCheckpointList) UpdateStatus(ctx context.Context, status lightingLoadStatus) error {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

sql := fmt.Sprintf("UPDATE %s set status = ? WHERE `task_name` = ? AND `source_name` = ?", cp.tableName)
cp.logger.Info("update lightning loader status",
zap.String("task", cp.taskName), zap.String("source", cp.sourceName),
zap.Stringer("status", status))
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName)
rows, err := connection.QuerySQL(tctx, query, taskName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql},
[]interface{}{status.String(), cp.taskName, cp.sourceName})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
return terror.WithScope(terror.Annotate(err, "update lightning status"), terror.ScopeDownstream)
}
return nil
}

func (cp *LightningCheckpointList) taskStatus(ctx context.Context) (lightingLoadStatus, error) {
connection, err := cp.db.GetBaseConn(ctx)
if err != nil {
return lightningStatusInit, terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
}
defer conn.CloseBaseConnWithoutErr(cp.db, connection)

query := fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = ? AND `source_name` = ?", cp.tableName)
tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
rows, err := connection.QuerySQL(tctx, query, cp.taskName, cp.sourceName)
if err != nil {
return lightningStatusInit, err
}
defer rows.Close()
var workerName string
for rows.Next() {
err = rows.Scan(&workerName)
if err != nil {
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName)
sql := fmt.Sprintf("DROP DATABASE IF NOT EXISTS %s", cpdb)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
if rows.Next() {
var status string
if err = rows.Scan(&status); err != nil {
return lightningStatusInit, terror.WithScope(err, terror.ScopeDownstream)
}
return parseLightningLoadStatus(status), nil
}
query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName)
_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{taskName})
return terror.WithScope(err, terror.ScopeDownstream)
// status row doesn't exist, return default value
return lightningStatusInit, nil
}

// Close implements CheckPoint.Close.
Expand Down
79 changes: 78 additions & 1 deletion dm/loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package loader

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
Expand All @@ -27,7 +29,10 @@ import (
"github.com/pingcap/tiflow/dm/pkg/cputil"
)

var _ = Suite(&testCheckPointSuite{})
var (
_ = Suite(&testCheckPointSuite{})
_ = Suite(&lightningCpListSuite{})
)

var (
schemaCreateSQL = ""
Expand Down Expand Up @@ -259,3 +264,75 @@ func (t *testCheckPointSuite) TestDeepCopy(c *C) {
cp.restoringFiles.pos["db"]["table"]["file3"] = []int64{0, 100}
c.Assert(ret, DeepEquals, map[string][]int64{"file": {10, 100}, "file2": {0, 100}})
}

type lightningCpListSuite struct {
mock sqlmock.Sqlmock
cpList *LightningCheckpointList
}

func (s *lightningCpListSuite) SetUpTest(c *C) {
s.mock = conn.InitMockDB(c)

baseDB, err := conn.DefaultDBProvider.Apply(&config.DBConfig{})
c.Assert(err, IsNil)

metaSchema := "dm_meta"
cpList := NewLightningCheckpointList(baseDB, "test_lightning", "source1", metaSchema)

s.cpList = cpList
}

func (s *lightningCpListSuite) TearDownTest(c *C) {
c.Assert(s.mock.ExpectationsWereMet(), IsNil)
}

func (s *lightningCpListSuite) TestLightningCheckpointListPrepare(c *C) {
ctx := context.Background()
s.mock.ExpectBegin()
s.mock.ExpectExec(fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s.*", s.cpList.schema)).WillReturnResult(sqlmock.NewResult(1, 1))
s.mock.ExpectCommit()
s.mock.ExpectBegin()
s.mock.ExpectExec(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.*", s.cpList.tableName)).WillReturnResult(sqlmock.NewResult(1, 1))
s.mock.ExpectCommit()
err := s.cpList.Prepare(ctx)
c.Assert(err, IsNil)
}

func (s *lightningCpListSuite) TestLightningCheckpointListStatusInit(c *C) {
// no rows in target table, will return default status
s.mock.ExpectQuery(fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)).
WithArgs(s.cpList.taskName, s.cpList.sourceName).
WillReturnRows(sqlmock.NewRows([]string{"status"}).RowError(0, sql.ErrNoRows))
status, err := s.cpList.taskStatus(context.Background())
c.Assert(err, IsNil)
c.Assert(status, Equals, lightningStatusInit)
}

func (s *lightningCpListSuite) TestLightningCheckpointListStatusRunning(c *C) {
s.mock.ExpectQuery(fmt.Sprintf("SELECT status FROM %s WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)).
WithArgs(s.cpList.taskName, s.cpList.sourceName).
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("running"))
status, err := s.cpList.taskStatus(context.Background())
c.Assert(err, IsNil)
c.Assert(status, Equals, lightningStatusRunning)
}

func (s *lightningCpListSuite) TestLightningCheckpointListRegister(c *C) {
s.mock.ExpectBegin()
s.mock.ExpectExec(fmt.Sprintf("INSERT IGNORE INTO %s \\(`task_name`, `source_name`\\) VALUES \\(\\?, \\?\\)", s.cpList.tableName)).
WithArgs(s.cpList.taskName, s.cpList.sourceName).
WillReturnResult(sqlmock.NewResult(2, 1))
s.mock.ExpectCommit()
err := s.cpList.RegisterCheckPoint(context.Background())
c.Assert(err, IsNil)
}

func (s *lightningCpListSuite) TestLightningCheckpointListUpdateStatus(c *C) {
s.mock.ExpectBegin()
s.mock.ExpectExec(fmt.Sprintf("UPDATE %s set status = \\? WHERE `task_name` = \\? AND `source_name` = \\?", s.cpList.tableName)).
WithArgs("running", s.cpList.taskName, s.cpList.sourceName).
WillReturnResult(sqlmock.NewResult(3, 1))
s.mock.ExpectCommit()
err := s.cpList.UpdateStatus(context.Background(), lightningStatusRunning)
c.Assert(err, IsNil)
}
Loading

0 comments on commit d84f15b

Please sign in to comment.