Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

binlog replication: support GTID #521

Merged
merged 37 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
6f8c040
use Location to store position and gtid
WangXiangUSTC Mar 3, 2020
dd80b84
update
WangXiangUSTC Mar 4, 2020
80619bf
update
WangXiangUSTC Mar 5, 2020
f9ccc8c
add TODO
WangXiangUSTC Mar 5, 2020
36b1f8b
use gtid.Set
WangXiangUSTC Mar 5, 2020
4b5ede3
update test
WangXiangUSTC Mar 5, 2020
0b92af9
minor fix
WangXiangUSTC Mar 6, 2020
ce6494d
enable gtid in test
WangXiangUSTC Mar 6, 2020
93b1609
merge master
WangXiangUSTC Mar 6, 2020
b705867
fix nil point
WangXiangUSTC Mar 7, 2020
6b244aa
update unit test
WangXiangUSTC Mar 7, 2020
556a424
fix increment test
WangXiangUSTC Mar 7, 2020
1ac97e0
minor fix on pointer
WangXiangUSTC Mar 7, 2020
b5a50a3
remove useless code
WangXiangUSTC Mar 7, 2020
bd70d96
minor update
WangXiangUSTC Mar 7, 2020
79d86e6
fix shard meta data restore
WangXiangUSTC Mar 7, 2020
6d3ab39
address comment
WangXiangUSTC Mar 7, 2020
e2e705e
address comment
WangXiangUSTC Mar 7, 2020
ffeb7eb
add clone
WangXiangUSTC Mar 7, 2020
8132f8f
address comemnt and fix test
WangXiangUSTC Mar 8, 2020
118c8f1
minor fix and add test for compare location
WangXiangUSTC Mar 8, 2020
109fdd5
fix test
WangXiangUSTC Mar 8, 2020
2b807cd
fix test
WangXiangUSTC Mar 8, 2020
c5bfffa
add help function
WangXiangUSTC Mar 8, 2020
d8dea9c
remove useless code
WangXiangUSTC Mar 8, 2020
a59f087
address comment
WangXiangUSTC Mar 8, 2020
072222a
some test use gtid && minor fix
WangXiangUSTC Mar 8, 2020
916ffa1
ignore fake rotate event
WangXiangUSTC Mar 9, 2020
9648397
update current location before redirect streamer
WangXiangUSTC Mar 9, 2020
9f48852
address comment
WangXiangUSTC Mar 9, 2020
d5584c7
add some clone
WangXiangUSTC Mar 9, 2020
c63832e
Merge branch 'master' into xiang/support_GTID
WangXiangUSTC Mar 9, 2020
53bef75
add location clone
WangXiangUSTC Mar 9, 2020
5605ab8
address comment
WangXiangUSTC Mar 9, 2020
35248aa
address comemnt
WangXiangUSTC Mar 9, 2020
e0b6cb5
address comment
WangXiangUSTC Mar 9, 2020
ca671b5
add wait time
WangXiangUSTC Mar 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
type Meta struct {
BinLogName string `toml:"binlog-name" yaml:"binlog-name"`
BinLogPos uint32 `toml:"binlog-pos" yaml:"binlog-pos"`
BinLogGTID string `toml:"binlog-gtid" yaml:"binlog-gtid"`
}

// Verify does verification on configs
Expand Down
282 changes: 141 additions & 141 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ message LoadStatus {
message ShardingGroup {
string target = 1;
repeated string DDLs = 2;
string firstPos = 3;
string firstLocation = 3;
repeated string synced = 4;
repeated string unsynced = 5;
}
Expand Down
5 changes: 3 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
// Notes: used for relay, so don't need to use GTID
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
Expand Down Expand Up @@ -853,6 +854,6 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
return nil, errors.Annotate(err, "get min position from checkpoint")
}

pos := checkpoint.GlobalPoint()
return &pos, nil
location := checkpoint.GlobalPoint()
return &location.Position, nil
}
1 change: 1 addition & 0 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ func (w *Worker) copyConfigFromWorker(cfg *config.SubTaskConfig) {
cfg.ServerID = w.cfg.ServerID
cfg.RelayDir = w.cfg.RelayDir
cfg.EnableGTID = w.cfg.EnableGTID
cfg.SyncerConfig.EnableGTID = w.cfg.EnableGTID
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
cfg.UseRelay = w.cfg.EnableRelay

// we can remove this from SubTaskConfig later, because syncer will always read from relay
Expand Down
2 changes: 1 addition & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ func (l *Loader) checkpointID() string {

func (l *Loader) getMydumpMetadata() error {
metafile := filepath.Join(l.cfg.LoaderConfig.Dir, "metadata")
pos, err := utils.ParseMetaData(metafile)
pos, _, err := utils.ParseMetaData(metafile)
if err != nil {
l.logCtx.L().Error("fail to parse dump metadata", log.ShortError(err))
return err
Expand Down
37 changes: 37 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package binlog

import (
"fmt"
"strconv"
"strings"

gmysql "github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)
Expand Down Expand Up @@ -160,3 +162,38 @@ func ComparePosition(pos1, pos2 gmysql.Position) int {

return adjustedPos1.Compare(adjustedPos2)
}

// Location is used for save binlog's position and gtid
type Location struct {
Position gmysql.Position

GTIDSet gtid.Set
}

func (p Location) String() string {
return fmt.Sprintf("Position: %v, GTIDSet: %s", p.Position, p.GTIDSet)
}

// CompareLocation returns:
// 1 if point1 is bigger than point2
// 0 if point1 is equal to point2
// -1 if point1 is less than point2
func CompareLocation(location1, location2 Location) int {
if location1.GTIDSet != nil && len(location1.GTIDSet.String()) != 0 &&
location2.GTIDSet != nil && len(location2.GTIDSet.String()) != 0 {
contain1 := location1.GTIDSet.Contain(location2.GTIDSet)
contain2 := location2.GTIDSet.Contain(location1.GTIDSet)
if contain1 && contain2 {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
// gtidSet1 contains gtidSet2 and gtidSet2 contains gtidSet1 means gtidSet1 equals to gtidSet2,
// then need to compare by position.
} else {
if contain1 {
return 1
} else if contain2 {
return -1
}
}
}

return ComparePosition(location1.Position, location2.Position)
}
87 changes: 51 additions & 36 deletions pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func ParserGTID(flavor, gtidStr string) (Set, error) {

switch flavor {
case mysql.MariaDBFlavor:
m = &mariadbGTIDSet{}
m = &MariadbGTIDSet{}
case mysql.MySQLFlavor:
m = &mySQLGTIDSet{}
m = &MySQLGTIDSet{}
default:
return nil, terror.ErrNotSupportedFlavor.Generate(flavor)
}
Expand All @@ -72,12 +72,12 @@ func ParserGTID(flavor, gtidStr string) (Set, error) {

// MySQLGTIDSet wraps mysql.MysqlGTIDSet to implement gtidSet interface
// extend some functions to retrieve and compute an intersection with other MySQL GTID Set
type mySQLGTIDSet struct {
type MySQLGTIDSet struct {
set *mysql.MysqlGTIDSet
}

// replace g by other
func (g *mySQLGTIDSet) Set(other mysql.GTIDSet) error {
// Set implements Set.Set, replace g by other
func (g *MySQLGTIDSet) Set(other mysql.GTIDSet) error {
if other == nil {
return nil
}
Expand All @@ -91,12 +91,13 @@ func (g *mySQLGTIDSet) Set(other mysql.GTIDSet) error {
return nil
}

func (g *mySQLGTIDSet) Replace(other Set, masters []interface{}) error {
// Replace implements Set.Replace
func (g *MySQLGTIDSet) Replace(other Set, masters []interface{}) error {
if other == nil {
return nil
}

otherGS, ok := other.(*mySQLGTIDSet)
otherGS, ok := other.(*MySQLGTIDSet)
if !ok {
return terror.ErrNotMySQLGTID.Generate(other)
}
Expand Down Expand Up @@ -124,29 +125,32 @@ func (g *mySQLGTIDSet) Replace(other Set, masters []interface{}) error {
return nil
}

func (g *mySQLGTIDSet) delete(uuid string) {
func (g *MySQLGTIDSet) delete(uuid string) {
delete(g.set.Sets, uuid)
}

func (g *mySQLGTIDSet) get(uuid string) (*mysql.UUIDSet, bool) {
func (g *MySQLGTIDSet) get(uuid string) (*mysql.UUIDSet, bool) {
uuidSet, ok := g.set.Sets[uuid]
return uuidSet, ok
}

func (g *mySQLGTIDSet) Clone() Set {
return &mySQLGTIDSet{
// Clone implements Set.Clone
func (g *MySQLGTIDSet) Clone() Set {
return &MySQLGTIDSet{
set: g.set.Clone().(*mysql.MysqlGTIDSet),
}
}

func (g *mySQLGTIDSet) Origin() mysql.GTIDSet {
// Origin implements Set.Origin
func (g *MySQLGTIDSet) Origin() mysql.GTIDSet {
return g.set.Clone().(*mysql.MysqlGTIDSet)
}

func (g *mySQLGTIDSet) Equal(other Set) bool {
// Equal implements Set.Equal
func (g *MySQLGTIDSet) Equal(other Set) bool {
otherIsNil := other == nil
if !otherIsNil {
otherGS, ok := other.(*mySQLGTIDSet)
otherGS, ok := other.(*MySQLGTIDSet)
if !ok {
return false
}
Expand All @@ -162,10 +166,11 @@ func (g *mySQLGTIDSet) Equal(other Set) bool {
return g.set.Equal(other.Origin())
}

func (g *mySQLGTIDSet) Contain(other Set) bool {
// Contain implements Set.Contain
func (g *MySQLGTIDSet) Contain(other Set) bool {
otherIsNil := other == nil
if !otherIsNil {
otherGs, ok := other.(*mySQLGTIDSet)
otherGs, ok := other.(*MySQLGTIDSet)
if !ok {
return false
}
Expand All @@ -179,14 +184,15 @@ func (g *mySQLGTIDSet) Contain(other Set) bool {
return g.set.Contain(other.Origin())
}

func (g *mySQLGTIDSet) Truncate(end Set) error {
// Truncate implements Set.Truncate
func (g *MySQLGTIDSet) Truncate(end Set) error {
if end == nil {
return nil // do nothing
}
if !g.Contain(end) {
return terror.ErrGTIDTruncateInvalid.Generate(g, end)
}
endGs := end.(*mySQLGTIDSet) // already verify the type is `*mySQLGTIDSet` in `Contain`.
endGs := end.(*MySQLGTIDSet) // already verify the type is `*MySQLGTIDSet` in `Contain`.
if endGs == nil {
return nil // do nothing
}
Expand All @@ -209,20 +215,23 @@ func (g *mySQLGTIDSet) Truncate(end Set) error {
return nil
}

func (g *mySQLGTIDSet) String() string {
func (g *MySQLGTIDSet) String() string {
if g.set == nil {
return ""
}
return g.set.String()
}

/************************ mariadb gtid set ***************************/
type mariadbGTIDSet struct {

// MariadbGTIDSet wraps mysql.MariadbGTIDSet to implement gtidSet interface
// extend some functions to retrieve and compute an intersection with other Mariadb GTID Set
type MariadbGTIDSet struct {
set *mysql.MariadbGTIDSet
}

// replace g by other
func (m *mariadbGTIDSet) Set(other mysql.GTIDSet) error {
// Set implements Set.Set, replace g by other
func (m *MariadbGTIDSet) Set(other mysql.GTIDSet) error {
if other == nil {
return nil
}
Expand All @@ -236,12 +245,13 @@ func (m *mariadbGTIDSet) Set(other mysql.GTIDSet) error {
return nil
}

func (m *mariadbGTIDSet) Replace(other Set, masters []interface{}) error {
// Replace implements Set.Replace
func (m *MariadbGTIDSet) Replace(other Set, masters []interface{}) error {
if other == nil {
return nil
}

otherGS, ok := other.(*mariadbGTIDSet)
otherGS, ok := other.(*MariadbGTIDSet)
if !ok {
return terror.ErrNotMariaDBGTID.Generate(other)
}
Expand Down Expand Up @@ -269,29 +279,32 @@ func (m *mariadbGTIDSet) Replace(other Set, masters []interface{}) error {
return nil
}

func (m *mariadbGTIDSet) delete(domainID uint32) {
func (m *MariadbGTIDSet) delete(domainID uint32) {
delete(m.set.Sets, domainID)
}

func (m *mariadbGTIDSet) get(domainID uint32) (*mysql.MariadbGTID, bool) {
func (m *MariadbGTIDSet) get(domainID uint32) (*mysql.MariadbGTID, bool) {
gtid, ok := m.set.Sets[domainID]
return gtid, ok
}

func (m *mariadbGTIDSet) Clone() Set {
return &mariadbGTIDSet{
// Clone implements Set.Clone
func (m *MariadbGTIDSet) Clone() Set {
return &MariadbGTIDSet{
set: m.set.Clone().(*mysql.MariadbGTIDSet),
}
}

func (m *mariadbGTIDSet) Origin() mysql.GTIDSet {
// Origin implements Set.Origin
func (m *MariadbGTIDSet) Origin() mysql.GTIDSet {
return m.set.Clone().(*mysql.MariadbGTIDSet)
}

func (m *mariadbGTIDSet) Equal(other Set) bool {
// Equal implements Set.Equal
func (m *MariadbGTIDSet) Equal(other Set) bool {
otherIsNil := other == nil
if !otherIsNil {
otherGS, ok := other.(*mariadbGTIDSet)
otherGS, ok := other.(*MariadbGTIDSet)
if !ok {
return false
}
Expand All @@ -307,10 +320,11 @@ func (m *mariadbGTIDSet) Equal(other Set) bool {
return m.set.Equal(other.Origin())
}

func (m *mariadbGTIDSet) Contain(other Set) bool {
// Contain implements Set.Contain
func (m *MariadbGTIDSet) Contain(other Set) bool {
otherIsNil := other == nil
if !otherIsNil {
otherGS, ok := other.(*mariadbGTIDSet)
otherGS, ok := other.(*MariadbGTIDSet)
if !ok {
return false
}
Expand All @@ -324,14 +338,15 @@ func (m *mariadbGTIDSet) Contain(other Set) bool {
return m.set.Contain(other.Origin())
}

func (m *mariadbGTIDSet) Truncate(end Set) error {
// Truncate implements Set.Truncate
func (m *MariadbGTIDSet) Truncate(end Set) error {
if end == nil {
return nil // do nothing
}
if !m.Contain(end) {
return terror.ErrGTIDTruncateInvalid.Generate(m, end)
}
endGs := end.(*mariadbGTIDSet) // already verify the type is `*mariadbGTIDSet` in `Contain`.
endGs := end.(*MariadbGTIDSet) // already verify the type is `*MariadbGTIDSet` in `Contain`.
if endGs == nil {
return nil // do nothing
}
Expand All @@ -350,7 +365,7 @@ func (m *mariadbGTIDSet) Truncate(end Set) error {
return nil
}

func (m *mariadbGTIDSet) String() string {
func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
}
Expand Down
Loading