Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

checkpoint: flush global checkpoint when first time flush checkpoint #758

Merged
merged 4 commits into from
Jun 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
}

cp.globalPoint = newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID)
cp.globalPointSaveTime = time.Time{}
cp.points = make(map[string]map[string]*binlogPoint)

return nil
Expand Down Expand Up @@ -452,7 +453,7 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl
sqls := make([]string, 0, 100)
args := make([][]interface{}, 0, 100)

if cp.globalPoint.outOfDate() {
if cp.globalPoint.outOfDate() || cp.globalPointSaveTime.IsZero() {
locationG := cp.GlobalPoint()
sqlG, argG := cp.genUpdateSQL(globalCpSchema, globalCpTable, locationG, nil, true)
sqls = append(sqls, sqlG)
Expand Down
39 changes: 27 additions & 12 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,10 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
s.cfg.Dir = oldDir
}()

// try load from mydumper's output
pos1 := mysql.Position{
Name: "mysql-bin.000003",
Pos: 1943,
}
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir

s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
Expand Down Expand Up @@ -252,6 +240,33 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)

// try load from mydumper's output
dir, err := ioutil.TempDir("", "test_global_checkpoint")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)

filename := filepath.Join(dir, "metadata")
err = ioutil.WriteFile(filename, []byte(
fmt.Sprintf("SHOW MASTER STATUS:\n\tLog: %s\n\tPos: %d\n\tGTID:\n\nSHOW SLAVE STATUS:\n\tHost: %s\n\tLog: %s\n\tPos: %d\n\tGTID:\n\n", pos1.Name, pos1.Pos, "slave_host", pos1.Name, pos1.Pos+1000)),
0644)
c.Assert(err, IsNil)
s.cfg.Mode = config.ModeAll
s.cfg.Dir = dir
cp.LoadMeta()

// should flush because globalPointSaveTime is zero
s.mock.ExpectBegin()
s.mock.ExpectExec("(202)?"+flushCheckPointSQL).WithArgs(cpid, "", "", pos1.Name, pos1.Pos, "", []byte("null"), true).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)

}

func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
Expand Down
11 changes: 11 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,17 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
c.Assert(key, Equals, "b")

// will detect casuality and add a flush job
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncer.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background())

mock.ExpectBegin()
mock.ExpectExec(".*INSERT INTO .* VALUES.* ON DUPLICATE KEY UPDATE.*").WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
key, err = syncer.resolveCasuality([]string{"a", "b"})
c.Assert(err, IsNil)
c.Assert(key, Equals, "a")
Expand Down