Skip to content

Commit

Permalink
binlog replication: support GTID (pingcap#521)
Browse files Browse the repository at this point in the history
* add new struct Location to save binlog position and gtid set
* use Location in syncer unit
  • Loading branch information
WangXiangUSTC authored and lichunzhu committed Apr 6, 2020
1 parent 5077d0b commit 7537066
Show file tree
Hide file tree
Showing 44 changed files with 1,281 additions and 797 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high],
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high],"invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high],""
ErrCheckpointInvalidTaskMode,[code=24001:class=checkpoint:scope=internal:level=medium],"invalid task mode: %s"
ErrCheckpointSaveInvalidPos,[code=24002:class=checkpoint:scope=internal:level=high],"save point %v is older than current pos %v"
ErrCheckpointSaveInvalidPos,[code=24002:class=checkpoint:scope=internal:level=high],"save point %s is older than current location %s"
ErrCheckpointInvalidTableFile,[code=24003:class=checkpoint:scope=internal:level=medium],"invalid db table sql file - %s"
ErrCheckpointDBNotExistInFile,[code=24004:class=checkpoint:scope=internal:level=medium],"db (%s) not exist in data files, but in checkpoint"
ErrCheckpointTableNotExistInFile,[code=24005:class=checkpoint:scope=internal:level=medium],"table (%s) not exist in db (%s) data files, but in checkpoint"
Expand Down
1 change: 1 addition & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
type Meta struct {
BinLogName string `toml:"binlog-name" yaml:"binlog-name"`
BinLogPos uint32 `toml:"binlog-pos" yaml:"binlog-pos"`
BinLogGTID string `toml:"binlog-gtid" yaml:"binlog-gtid"`
}

// Verify does verification on configs
Expand Down
8 changes: 4 additions & 4 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour
defer wg.Done()
c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName, keepAliveTTL), check.IsNil)
}(ctx1, name)
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := scheduler2.GetWorkerBySource(sources[i])
return w != nil && w.BaseInfo().Name == name
}), check.IsTrue)
Expand Down Expand Up @@ -827,7 +827,7 @@ func (t *testMaster) TestServer(c *check.C) {
cancel()
s.Close()

c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.closed.Get()
}), check.IsTrue)
}
Expand Down Expand Up @@ -861,7 +861,7 @@ func (t *testMaster) TestJoinMember(c *check.C) {
defer s1.Close()

// wait the first one become the leader
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s1.election.IsLeader()
}), check.IsTrue)

Expand Down Expand Up @@ -971,7 +971,7 @@ func (t *testMaster) TestOperateSource(c *check.C) {
defer wg.Done()
c.Assert(ha.KeepAlive(ctx, s1.etcdClient, workerName, keepAliveTTL), check.IsNil)
}(ctx1, workerName)
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := s1.scheduler.GetWorkerBySource(sourceID)
return w != nil && w.BaseInfo().Name == workerName
}), check.IsTrue)
Expand Down
282 changes: 141 additions & 141 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ message LoadStatus {
message ShardingGroup {
string target = 1;
repeated string DDLs = 2;
string firstPos = 3;
string firstLocation = 3;
repeated string synced = 4;
repeated string unsynced = 5;
}
Expand Down
5 changes: 3 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
// TODO: get min gtidSet
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
Expand Down Expand Up @@ -853,6 +854,6 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
return nil, errors.Annotate(err, "get min position from checkpoint")
}

pos := checkpoint.GlobalPoint()
return &pos, nil
location := checkpoint.GlobalPoint()
return &location.Position, nil
}
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ func (l *Loader) checkpointID() string {

func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
pos, err := utils.ParseMetaData(metafile)
pos, _, err := utils.ParseMetaData(metafile)
if err != nil {
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
Expand Down
88 changes: 88 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package binlog

import (
"fmt"
"strconv"
"strings"

gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)
Expand All @@ -33,6 +37,11 @@ const (
posUUIDSuffixSeparator = "|"
)

var (
// MinPosition is the min binlog position
MinPosition = gmysql.Position{Pos: 4}
)

// PositionFromStr constructs a mysql.Position from a string representation like `mysql-bin.000001:2345`
func PositionFromStr(s string) (gmysql.Position, error) {
parsed := strings.Split(s, ":")
Expand Down Expand Up @@ -160,3 +169,82 @@ func ComparePosition(pos1, pos2 gmysql.Position) int {

return adjustedPos1.Compare(adjustedPos2)
}

// Location is used for save binlog's position and gtid
type Location struct {
Position gmysql.Position

GTIDSet gtid.Set
}

// NewLocation returns a new Location
func NewLocation(flavor string) Location {
return Location{
Position: MinPosition,
GTIDSet: gtid.MinGTIDSet(flavor),
}
}

func (l Location) String() string {
return fmt.Sprintf("position: %v, gtid-set: %s", l.Position, l.GTIDSetStr())
}

// GTIDSetStr returns gtid set's string
func (l Location) GTIDSetStr() string {
gsetStr := ""
if l.GTIDSet != nil {
gsetStr = l.GTIDSet.String()
}

return gsetStr
}

// Clone clones a same Location
func (l Location) Clone() Location {
return l.CloneWithFlavor("")
}

// CloneWithFlavor clones the location, and if the GTIDSet is nil, will create a GTIDSet with specified flavor.
func (l Location) CloneWithFlavor(flavor string) Location {
var newGTIDSet gtid.Set
if l.GTIDSet != nil {
newGTIDSet = l.GTIDSet.Clone()
} else if len(flavor) != 0 {
newGTIDSet = gtid.MinGTIDSet(flavor)
}

return Location{
Position: gmysql.Position{
Name: l.Position.Name,
Pos: l.Position.Pos,
},
GTIDSet: newGTIDSet,
}
}

// CompareLocation returns:
// 1 if point1 is bigger than point2
// 0 if point1 is equal to point2
// -1 if point1 is less than point2
func CompareLocation(location1, location2 Location) int {
if location1.GTIDSet != nil && len(location1.GTIDSet.String()) != 0 &&
location2.GTIDSet != nil && len(location2.GTIDSet.String()) != 0 {
contain1 := location1.GTIDSet.Contain(location2.GTIDSet)
contain2 := location2.GTIDSet.Contain(location1.GTIDSet)
if contain1 && contain2 {
// gtidSet1 contains gtidSet2 and gtidSet2 contains gtidSet1 means gtidSet1 equals to gtidSet2,
// then need to compare by position.
} else {
if contain1 {
return 1
} else if contain2 {
return -1
}

// can't compare location by gtid, and will compare by position
log.L().Warn("gtidSet can't be compared", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

return ComparePosition(location1.Position, location2.Position)
}
Loading

0 comments on commit 7537066

Please sign in to comment.