diff --git a/mysql/gtid.go b/mysql/gtid.go index f35c4d731..ae519ef25 100644 --- a/mysql/gtid.go +++ b/mysql/gtid.go @@ -1,6 +1,8 @@ package mysql -import "github.com/pingcap/errors" +import ( + "github.com/pingcap/errors" +) type GTIDSet interface { String() string diff --git a/mysql/mysql_gtid.go b/mysql/mysql_gtid.go index b410bdbc6..47eae6294 100644 --- a/mysql/mysql_gtid.go +++ b/mysql/mysql_gtid.go @@ -111,6 +111,48 @@ func (s IntervalSlice) Normalize() IntervalSlice { return n } +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} + +func max(a, b int64) int64 { + if a > b { + return a + } + return b +} + +func (s *IntervalSlice) InsertInterval(interval Interval) { + var ( + count int + i int + ) + + *s = append(*s, interval) + total := len(*s) + for i = total - 1; i > 0; i-- { + if (*s)[i].Stop < (*s)[i-1].Start { + (*s)[i], (*s)[i-1] = (*s)[i-1], (*s)[i] + } else if (*s)[i].Start > (*s)[i-1].Stop { + break + } else { + (*s)[i-1].Start = min((*s)[i-1].Start, (*s)[i].Start) + (*s)[i-1].Stop = max((*s)[i-1].Stop, (*s)[i].Stop) + count++ + } + } + if count > 0 { + i++ + if i+count < total { + copy((*s)[i:], (*s)[i+count:]) + } + *s = (*s)[:total-count] + } +} + // Contain returns true if sub in s func (s IntervalSlice) Contain(sub IntervalSlice) bool { j := 0 @@ -343,10 +385,9 @@ func (s *UUIDSet) Decode(data []byte) error { func (s *UUIDSet) Clone() *UUIDSet { clone := new(UUIDSet) - - copy(clone.SID[:], s.SID[:]) - clone.Intervals = s.Intervals.Normalize() - + clone.SID = s.SID + clone.Intervals = make([]Interval, len(s.Intervals)) + copy(clone.Intervals, s.Intervals) return clone } @@ -439,6 +480,16 @@ func (s *MysqlGTIDSet) Update(GTIDStr string) error { return nil } +func (s *MysqlGTIDSet) AddGTID(uuid uuid.UUID, gno int64) { + sid := uuid.String() + o, ok := s.Sets[sid] + if ok { + o.Intervals.InsertInterval(Interval{gno, gno + 1}) + } else { + s.Sets[sid] = &UUIDSet{uuid, IntervalSlice{Interval{gno, gno + 1}}} + } +} + func (s *MysqlGTIDSet) Add(addend MysqlGTIDSet) error { for _, uuidSet := range addend.Sets { s.AddSet(uuidSet) diff --git a/mysql/mysql_test.go b/mysql/mysql_test.go index da314078b..7857ddc4e 100644 --- a/mysql/mysql_test.go +++ b/mysql/mysql_test.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/google/uuid" "github.com/pingcap/check" ) @@ -77,6 +78,42 @@ func (t *mysqlTestSuite) TestMysqlGTIDIntervalSlice(c *check.C) { c.Assert(n2.Contain(n1), check.Equals, true) } +func (t *mysqlTestSuite) TestMysqlGTIDInsertInterval(c *check.C) { + i := IntervalSlice{Interval{100, 200}} + i.InsertInterval(Interval{300, 400}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{100, 200}, Interval{300, 400}}) + + i.InsertInterval(Interval{50, 70}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{100, 200}, Interval{300, 400}}) + + i.InsertInterval(Interval{101, 201}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{100, 201}, Interval{300, 400}}) + + i.InsertInterval(Interval{99, 202}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 202}, Interval{300, 400}}) + + i.InsertInterval(Interval{102, 302}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 400}}) + + i.InsertInterval(Interval{500, 600}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 70}, Interval{99, 400}, Interval{500, 600}}) + + i.InsertInterval(Interval{50, 100}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}}) + + i.InsertInterval(Interval{900, 1000}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}, Interval{900, 1000}}) + + i.InsertInterval(Interval{1010, 1020}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{50, 400}, Interval{500, 600}, Interval{900, 1000}, Interval{1010, 1020}}) + + i.InsertInterval(Interval{49, 1000}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{49, 1000}, Interval{1010, 1020}}) + + i.InsertInterval(Interval{1, 1012}) + c.Assert(i, check.DeepEquals, IntervalSlice{Interval{1, 1020}}) +} + func (t *mysqlTestSuite) TestMysqlGTIDCodec(c *check.C) { us, err := ParseUUIDSet("de278ad0-2106-11e4-9f8e-6edd0ca20947:1-2") c.Assert(err, check.IsNil) @@ -129,6 +166,35 @@ func (t *mysqlTestSuite) TestMysqlUpdate(c *check.C) { c.Assert(g2.Equal(g1), check.IsTrue) } +func (t *mysqlTestSuite) TestMysqlAddGTID(c *check.C) { + g, err := ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:21-57") + c.Assert(err, check.IsNil) + + g1 := g.(*MysqlGTIDSet) + + u, err := uuid.Parse("3E11FA47-71CA-11E1-9E33-C80AA9429562") + c.Assert(err, check.IsNil) + + g1.AddGTID(u, 58) + c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-58") + + g1.AddGTID(u, 60) + c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-58:60") + + g1.AddGTID(u, 59) + c.Assert(strings.ToUpper(g1.String()), check.Equals, "3E11FA47-71CA-11E1-9E33-C80AA9429562:21-60") + + u2, err := uuid.Parse("519CE70F-A893-11E9-A95A-B32DC65A7026") + c.Assert(err, check.IsNil) + g1.AddGTID(u2, 58) + g2, err := ParseMysqlGTIDSet(` + 3E11FA47-71CA-11E1-9E33-C80AA9429562:21-60, + 519CE70F-A893-11E9-A95A-B32DC65A7026:58 +`) + c.Assert(err, check.IsNil) + c.Assert(g2.Equal(g1), check.IsTrue) +} + func (t *mysqlTestSuite) TestMysqlGTIDContain(c *check.C) { g1, err := ParseMysqlGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:23") c.Assert(err, check.IsNil) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 082bdde98..0c434c21b 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -799,12 +799,19 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return b.currGset.Clone() } - advanceCurrentGtidSet := func(gtid string) error { + advanceCurrentGtidSet := func(uuid uuid.UUID, gno int64, domainID uint32, serverID uint32, sequenceNumber uint64) (err error) { if b.currGset == nil { b.currGset = b.prevGset.Clone() } prev := b.currGset.Clone() - err := b.currGset.Update(gtid) + switch gset := b.currGset.(type) { + case *MysqlGTIDSet: + gset.AddGTID(uuid, gno) + case *MariadbGTIDSet: + err = gset.AddSet(&MariadbGTID{DomainID: domainID, ServerID: serverID, SequenceNumber: sequenceNumber}) + default: + err = errors.Errorf("unsupported GTIDSet type %T", b.currGset) + } if err == nil { // right after reconnect we will see same gtid as we saw before, thus currGset will not get changed if !b.currGset.Equal(prev) { @@ -824,7 +831,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { break } u, _ := uuid.FromBytes(event.SID) - err := advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), event.GNO)) + err := advanceCurrentGtidSet(u, event.GNO, 0, 0, 0) if err != nil { return errors.Trace(err) } @@ -833,7 +840,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { break } GTID := event.GTID - err := advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)) + err := advanceCurrentGtidSet(uuid.Nil, 0, GTID.DomainID, GTID.ServerID, GTID.SequenceNumber) if err != nil { return errors.Trace(err) }