Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

binlog replication: support GTID #521

Merged
merged 37 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6f8c040
use Location to store position and gtid
WangXiangUSTC Mar 3, 2020
dd80b84
update
WangXiangUSTC Mar 4, 2020
80619bf
update
WangXiangUSTC Mar 5, 2020
f9ccc8c
add TODO
WangXiangUSTC Mar 5, 2020
36b1f8b
use gtid.Set
WangXiangUSTC Mar 5, 2020
4b5ede3
update test
WangXiangUSTC Mar 5, 2020
0b92af9
minor fix
WangXiangUSTC Mar 6, 2020
ce6494d
enable gtid in test
WangXiangUSTC Mar 6, 2020
93b1609
merge master
WangXiangUSTC Mar 6, 2020
b705867
fix nil point
WangXiangUSTC Mar 7, 2020
6b244aa
update unit test
WangXiangUSTC Mar 7, 2020
556a424
fix increment test
WangXiangUSTC Mar 7, 2020
1ac97e0
minor fix on pointer
WangXiangUSTC Mar 7, 2020
b5a50a3
remove useless code
WangXiangUSTC Mar 7, 2020
bd70d96
minor update
WangXiangUSTC Mar 7, 2020
79d86e6
fix shard meta data restore
WangXiangUSTC Mar 7, 2020
6d3ab39
address comment
WangXiangUSTC Mar 7, 2020
e2e705e
address comment
WangXiangUSTC Mar 7, 2020
ffeb7eb
add clone
WangXiangUSTC Mar 7, 2020
8132f8f
address comemnt and fix test
WangXiangUSTC Mar 8, 2020
118c8f1
minor fix and add test for compare location
WangXiangUSTC Mar 8, 2020
109fdd5
fix test
WangXiangUSTC Mar 8, 2020
2b807cd
fix test
WangXiangUSTC Mar 8, 2020
c5bfffa
add help function
WangXiangUSTC Mar 8, 2020
d8dea9c
remove useless code
WangXiangUSTC Mar 8, 2020
a59f087
address comment
WangXiangUSTC Mar 8, 2020
072222a
some test use gtid && minor fix
WangXiangUSTC Mar 8, 2020
916ffa1
ignore fake rotate event
WangXiangUSTC Mar 9, 2020
9648397
update current location before redirect streamer
WangXiangUSTC Mar 9, 2020
9f48852
address comment
WangXiangUSTC Mar 9, 2020
d5584c7
add some clone
WangXiangUSTC Mar 9, 2020
c63832e
Merge branch 'master' into xiang/support_GTID
WangXiangUSTC Mar 9, 2020
53bef75
add location clone
WangXiangUSTC Mar 9, 2020
5605ab8
address comment
WangXiangUSTC Mar 9, 2020
35248aa
address comemnt
WangXiangUSTC Mar 9, 2020
e0b6cb5
address comment
WangXiangUSTC Mar 9, 2020
ca671b5
add wait time
WangXiangUSTC Mar 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high],
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high],"invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high],""
ErrCheckpointInvalidTaskMode,[code=24001:class=checkpoint:scope=internal:level=medium],"invalid task mode: %s"
ErrCheckpointSaveInvalidPos,[code=24002:class=checkpoint:scope=internal:level=high],"save point %v is older than current pos %v"
ErrCheckpointSaveInvalidPos,[code=24002:class=checkpoint:scope=internal:level=high],"save point %s is older than current location %s"
ErrCheckpointInvalidTableFile,[code=24003:class=checkpoint:scope=internal:level=medium],"invalid db table sql file - %s"
ErrCheckpointDBNotExistInFile,[code=24004:class=checkpoint:scope=internal:level=medium],"db (%s) not exist in data files, but in checkpoint"
ErrCheckpointTableNotExistInFile,[code=24005:class=checkpoint:scope=internal:level=medium],"table (%s) not exist in db (%s) data files, but in checkpoint"
Expand Down
1 change: 1 addition & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
type Meta struct {
BinLogName string `toml:"binlog-name" yaml:"binlog-name"`
BinLogPos uint32 `toml:"binlog-pos" yaml:"binlog-pos"`
BinLogGTID string `toml:"binlog-gtid" yaml:"binlog-gtid"`
}

// Verify does verification on configs
Expand Down
282 changes: 141 additions & 141 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ message LoadStatus {
message ShardingGroup {
string target = 1;
repeated string DDLs = 2;
string firstPos = 3;
string firstLocation = 3;
repeated string synced = 4;
repeated string unsynced = 5;
}
Expand Down
5 changes: 3 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
// TODO: get min gtidSet
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
Expand Down Expand Up @@ -853,6 +854,6 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
return nil, errors.Annotate(err, "get min position from checkpoint")
}

pos := checkpoint.GlobalPoint()
return &pos, nil
location := checkpoint.GlobalPoint()
return &location.Position, nil
}
1 change: 1 addition & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ func (w *Worker) copyConfigFromWorker(cfg *config.SubTaskConfig) {
cfg.ServerID = w.cfg.ServerID
cfg.RelayDir = w.cfg.RelayDir
cfg.EnableGTID = w.cfg.EnableGTID
cfg.SyncerConfig.EnableGTID = w.cfg.EnableGTID
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
cfg.UseRelay = w.cfg.EnableRelay

// we can remove this from SubTaskConfig later, because syncer will always read from relay
Expand Down
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ func (l *Loader) checkpointID() string {

func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
pos, err := utils.ParseMetaData(metafile)
pos, _, err := utils.ParseMetaData(metafile)
if err != nil {
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
Expand Down
90 changes: 90 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package binlog

import (
"fmt"
"strconv"
"strings"

gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)
Expand All @@ -33,6 +37,11 @@ const (
posUUIDSuffixSeparator = "|"
)

var (
// MinPosition is the min binlog position
MinPosition = gmysql.Position{Pos: 4}
)

// PositionFromStr constructs a mysql.Position from a string representation like `mysql-bin.000001:2345`
func PositionFromStr(s string) (gmysql.Position, error) {
parsed := strings.Split(s, ":")
Expand Down Expand Up @@ -160,3 +169,84 @@ func ComparePosition(pos1, pos2 gmysql.Position) int {

return adjustedPos1.Compare(adjustedPos2)
}

// Location is used for save binlog's position and gtid
type Location struct {
Position gmysql.Position

GTIDSet gtid.Set
}

// NewLocation returns a new Location
func NewLocation(flavor string) Location {
return Location{
Position: MinPosition,
GTIDSet: gtid.MinGTIDSet(flavor),
}
}

func (l Location) String() string {
return fmt.Sprintf("position: %v, gtid-set: %s", l.Position, l.GTIDSetStr())
}

// GTIDSetStr returns gtid set's string
func (l Location) GTIDSetStr() string {
gsetStr := ""
if l.GTIDSet != nil {
gsetStr = l.GTIDSet.String()
}

return gsetStr
}

// Clone clones a same Location
func (l Location) Clone() Location {
return l.CloneWithFlavor("")
}

// CloneWithFlavor clones the location, and if the GTIDSet is nil, will create a GTIDSet with specified flavor.
func (l Location) CloneWithFlavor(flavor string) Location {
var newGTIDSet gtid.Set
if len(flavor) != 0 {
newGTIDSet = gtid.MinGTIDSet(flavor)
}

if l.GTIDSet != nil {
newGTIDSet = l.GTIDSet.Clone()
}
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved

return Location{
Position: gmysql.Position{
Name: l.Position.Name,
Pos: l.Position.Pos,
},
GTIDSet: newGTIDSet,
}
}

// CompareLocation returns:
// 1 if point1 is bigger than point2
// 0 if point1 is equal to point2
// -1 if point1 is less than point2
func CompareLocation(location1, location2 Location) int {
if location1.GTIDSet != nil && len(location1.GTIDSet.String()) != 0 &&
location2.GTIDSet != nil && len(location2.GTIDSet.String()) != 0 {
contain1 := location1.GTIDSet.Contain(location2.GTIDSet)
contain2 := location2.GTIDSet.Contain(location1.GTIDSet)
if contain1 && contain2 {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
// gtidSet1 contains gtidSet2 and gtidSet2 contains gtidSet1 means gtidSet1 equals to gtidSet2,
// then need to compare by position.
} else {
if contain1 {
return 1
} else if contain2 {
return -1
}

// can't compare location by gtid, and will compare by position
log.L().Warn("gtidSet can't be compared", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

return ComparePosition(location1.Position, location2.Position)
}
195 changes: 195 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

. "github.com/pingcap/check"
gmysql "github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/gtid"
)

var _ = Suite(&testPositionSuite{})
Expand Down Expand Up @@ -347,3 +349,196 @@ func (t *testPositionSuite) TestComparePosition(c *C) {
c.Assert(cmp, Equals, cs.cmp)
}
}

func (t *testPositionSuite) TestCompareCompareLocation(c *C) {
testCases := []struct {
flavor string
pos1 gmysql.Position
gset1 string
pos2 gmysql.Position
gset2 string
cmp int
}{
{
// pos1 = pos2
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"",
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"",
0,
}, {
// pos1 = pos2
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"",
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"",
0,
}, {
// pos1 < pos2
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"",
gmysql.Position{
Name: "binlog.00002",
Pos: 122,
},
"",
-1,
}, {
// pos1 > pos2
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00003",
Pos: 123,
},
"",
gmysql.Position{
Name: "binlog.00002",
Pos: 122,
},
"",
1,
}, {
// gset1 = gset2, pos1 < pos2
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-4",
gmysql.Position{
Name: "binlog.00002",
Pos: 122,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-4",
-1,
}, {
// compare by gtid set, gset2 contains gset1
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-2,53ea0ed1-9bf8-11e6-8bea-64006a897c74:1-2",
gmysql.Position{
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
Name: "binlog.00002",
Pos: 124,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-4,53ea0ed1-9bf8-11e6-8bea-64006a897c74:1-3",
-1,
}, {
// compare by gtid set, gset1 contains gset2
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-2,53ea0ed1-9bf8-11e6-8bea-64006a897c74:1-3",
gmysql.Position{
Name: "binlog.00002",
Pos: 124,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-2",
1,
}, {
// can't compare by gtid set, will compare by position, pos1 < pos2
gmysql.MySQLFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-2,53ea0ed1-9bf8-11e6-8bea-64006a897c74:2-4",
gmysql.Position{
Name: "binlog.00002",
Pos: 124,
},
"53ea0ed1-9bf8-11e6-8bea-64006a897c73:1-2,53ea0ed1-9bf8-11e6-8bea-64006a897c74:1-3",
-1,
}, {
// gset1 = gset2, pos1 < pos2
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"1-1-1,2-2-2",
gmysql.Position{
Name: "binlog.00002",
Pos: 122,
},
"1-1-1,2-2-2",
-1,
}, {
// compare by gtid set, gset2 contains gset1
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"1-1-1,2-2-2",
gmysql.Position{
Name: "binlog.00002",
Pos: 124,
},
"1-1-1,2-2-2,3-3-3",
-1,
}, {
// compare by gtid set, gset1 contains gset2
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"1-1-1,2-2-3",
gmysql.Position{
Name: "binlog.00002",
Pos: 124,
},
"1-1-1,2-2-2",
1,
}, {
// can't compare by gtid set, will compare by position, pos1 < pos2
gmysql.MariaDBFlavor,
gmysql.Position{
Name: "binlog.00001",
Pos: 123,
},
"1-1-1,2-2-2",
gmysql.Position{
Name: "binlog.00002",
Pos: 124,
},
"2-2-2,3-3-3",
-1,
},
}

for _, cs := range testCases {
c.Log(cs)
gset1, err := gtid.ParserGTID(cs.flavor, cs.gset1)
c.Assert(err, IsNil)
gset2, err := gtid.ParserGTID(cs.flavor, cs.gset2)
c.Assert(err, IsNil)

cmp := CompareLocation(Location{cs.pos1, gset1}, Location{cs.pos2, gset2})
c.Assert(cmp, Equals, cs.cmp)
}

}
Loading