diff --git a/client/conn.go b/client/conn.go index 7716bf387..9d2fb78d7 100644 --- a/client/conn.go +++ b/client/conn.go @@ -9,10 +9,11 @@ import ( "strings" "time" + "github.com/pingcap/errors" + . "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/packet" "github.com/go-mysql-org/go-mysql/utils" - "github.com/pingcap/errors" ) type Conn struct { @@ -198,6 +199,10 @@ func (c *Conn) GetServerVersion() string { return c.serverVersion } +func (c *Conn) CompareServerVersion(v string) (int, error) { + return CompareServerVersions(c.serverVersion, v) +} + func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) { if len(args) == 0 { return c.exec(command) diff --git a/client/resp.go b/client/resp.go index ab809cc67..dfdfea3b7 100644 --- a/client/resp.go +++ b/client/resp.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "encoding/binary" "encoding/pem" + "fmt" "github.com/pingcap/errors" "github.com/siddontang/go/hack" @@ -60,9 +61,9 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) { // pos += 2 } - //new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now. + // new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now. 
- //skip info + // skip info return r, nil } @@ -75,7 +76,7 @@ func (c *Conn) handleErrorPacket(data []byte) error { pos += 2 if c.capability&CLIENT_PROTOCOL_41 > 0 { - //skip '#' + // skip '#' pos++ e.State = hack.String(data[pos : pos+5]) pos += 5 @@ -89,11 +90,11 @@ func (c *Conn) handleErrorPacket(data []byte) error { func (c *Conn) handleAuthResult() error { data, switchToPlugin, err := c.readAuthResult() if err != nil { - return err + return fmt.Errorf("readAuthResult: %w", err) } // handle auth switch, only support 'sha256_password', and 'caching_sha2_password' if switchToPlugin != "" { - //fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin) + // fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin) if data == nil { data = c.salt } else { @@ -168,7 +169,7 @@ func (c *Conn) handleAuthResult() error { func (c *Conn) readAuthResult() ([]byte, string, error) { data, err := c.ReadPacket() if err != nil { - return nil, "", err + return nil, "", fmt.Errorf("ReadPacket: %w", err) } // see: https://insidemysql.com/preparing-your-community-connector-for-mysql-8-part-2-sha256/ @@ -351,7 +352,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) { if c.isEOFPacket(data) { if c.capability&CLIENT_PROTOCOL_41 > 0 { result.Warnings = binary.LittleEndian.Uint16(data[1:]) - //todo add strict_mode, warning will be treat as error + // todo add strict_mode, warning will be treat as error result.Status = binary.LittleEndian.Uint16(data[3:]) c.status = result.Status } @@ -392,7 +393,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) { if c.isEOFPacket(data) { if c.capability&CLIENT_PROTOCOL_41 > 0 { result.Warnings = binary.LittleEndian.Uint16(data[1:]) - //todo add strict_mode, warning will be treat as error + // todo add strict_mode, warning will be treat as error result.Status = binary.LittleEndian.Uint16(data[3:]) c.status = result.Status } diff --git a/go.mod b/go.mod index b79f997e2..a8ac10eba 100644 --- 
a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/BurntSushi/toml v0.3.1 github.com/DataDog/zstd v1.5.2 + github.com/Masterminds/semver v1.5.0 github.com/go-sql-driver/mysql v1.6.0 github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.3 @@ -16,6 +17,5 @@ require ( github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 github.com/stretchr/testify v1.8.0 - golang.org/x/mod v0.3.0 golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect ) diff --git a/go.sum b/go.sum index ea24a843c..3ddb545c8 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= +github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= @@ -75,7 +77,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod 
h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/mysql/util.go b/mysql/util.go index e8d436fa2..6d8ec4471 100644 --- a/mysql/util.go +++ b/mysql/util.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/Masterminds/semver" "github.com/pingcap/errors" "github.com/siddontang/go/hack" ) @@ -379,6 +380,23 @@ func ErrorEqual(err1, err2 error) bool { return e1.Error() == e2.Error() } +func CompareServerVersions(a, b string) (int, error) { + var ( + aVer, bVer *semver.Version + err error + ) + + if aVer, err = semver.NewVersion(a); err != nil { + return 0, fmt.Errorf("cannot parse %q as semver: %w", a, err) + } + + if bVer, err = semver.NewVersion(b); err != nil { + return 0, fmt.Errorf("cannot parse %q as semver: %w", b, err) + } + + return aVer.Compare(bVer), nil +} + var encodeRef = map[byte]byte{ '\x00': '0', '\'': '\'', diff --git a/mysql/util_test.go b/mysql/util_test.go new file mode 100644 index 000000000..29a5b02e4 --- /dev/null +++ b/mysql/util_test.go @@ -0,0 +1,30 @@ +package mysql + +import ( + "github.com/pingcap/check" +) + +type utilTestSuite struct { +} + +var _ = check.Suite(&utilTestSuite{}) + +func (s *utilTestSuite) TestCompareServerVersions(c *check.C) { + tests := []struct { + A string + B string + Expect int + }{ + {A: "1.2.3", B: "1.2.3", Expect: 0}, + {A: "5.6-999", B: "8.0", Expect: -1}, + {A: "8.0.32-0ubuntu0.20.04.2", B: "8.0.28", Expect: 1}, + } + + for _, test := range tests { + comment := check.Commentf("%q vs. 
%q", test.A, test.B) + + got, err := CompareServerVersions(test.A, test.B) + c.Assert(err, check.IsNil, comment) + c.Assert(got, check.Equals, test.Expect, comment) + } +} diff --git a/replication/json_binary.go b/replication/json_binary.go index f1e0abd33..7a8217181 100644 --- a/replication/json_binary.go +++ b/replication/json_binary.go @@ -5,9 +5,10 @@ import ( "fmt" "math" - . "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" "github.com/siddontang/go/hack" + + . "github.com/go-mysql-org/go-mysql/mysql" ) const ( @@ -44,6 +45,60 @@ const ( jsonbValueEntrySizeLarge = 1 + jsonbLargeOffsetSize ) +var ( + ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF +) + +type ( + // JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents. + // https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h + JsonDiffOperation byte +) + +const ( + // The JSON value in the given path is replaced with a new value. + // + // It has the same effect as `JSON_REPLACE(col, path, value)`. + JsonDiffOperationReplace = JsonDiffOperation(iota) + + // Add a new element at the given path. + // + // If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`. + // + // If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`. + JsonDiffOperationInsert + + // The JSON value at the given path is removed from an array or object. + // + // It has the same effect as `JSON_REMOVE(col, path)`. 
+ JsonDiffOperationRemove +) + +type ( + JsonDiff struct { + Op JsonDiffOperation + Path string + Value string + } +) + +func (op JsonDiffOperation) String() string { + switch op { + case JsonDiffOperationReplace: + return "Replace" + case JsonDiffOperationInsert: + return "Insert" + case JsonDiffOperationRemove: + return "Remove" + default: + return fmt.Sprintf("Unknown(%d)", op) + } +} + +func (jd *JsonDiff) String() string { + return fmt.Sprintf("json_diff(op:%s path:%s value:%s)", jd.Op, jd.Path, jd.Value) +} + func jsonbGetOffsetSize(isSmall bool) int { if isSmall { return jsonbSmallOffsetSize @@ -71,11 +126,6 @@ func jsonbGetValueEntrySize(isSmall bool) int { // decodeJsonBinary decodes the JSON binary encoding data and returns // the common JSON encoding data. func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) { - // Sometimes, we can insert a NULL JSON even we set the JSON field as NOT NULL. - // If we meet this case, we can return an empty slice. - if len(data) == 0 { - return []byte{}, nil - } d := jsonBinaryDecoder{ useDecimal: e.useDecimal, ignoreDecodeErr: e.ignoreJSONDecodeErr, @@ -491,3 +541,43 @@ func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) { return 0, 0 } + +func (e *RowsEvent) decodeJsonPartialBinary(data []byte) (*JsonDiff, error) { + // see Json_diff_vector::read_binary() in mysql-server/sql/json_diff.cc + operationNumber := JsonDiffOperation(data[0]) + switch operationNumber { + case JsonDiffOperationReplace: + case JsonDiffOperationInsert: + case JsonDiffOperationRemove: + default: + return nil, ErrCorruptedJSONDiff + } + data = data[1:] + + pathLength, _, n := LengthEncodedInt(data) + data = data[n:] + + path := data[:pathLength] + data = data[pathLength:] + + diff := &JsonDiff{ + Op: operationNumber, + Path: string(path), + // Value will be filled below + } + + if operationNumber == JsonDiffOperationRemove { + return diff, nil + } + + valueLength, _, n := LengthEncodedInt(data) + data = data[n:] + + 
d, err := e.decodeJsonBinary(data[:valueLength]) + if err != nil { + return nil, fmt.Errorf("cannot read json diff for field %q: %w", path, err) + } + diff.Value = string(d) + + return diff, nil +} diff --git a/replication/parser.go b/replication/parser.go index cb34707e6..37b9c5a6c 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -269,7 +269,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( UPDATE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, UPDATE_ROWS_EVENTv2, - DELETE_ROWS_EVENTv2: + DELETE_ROWS_EVENTv2, + PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options e = p.newRowsEvent(h) case ROWS_QUERY_EVENT: e = &RowsQueryEvent{} @@ -381,7 +382,9 @@ func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error { func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { e := &RowsEvent{} - if p.format.EventTypeHeaderLengths[h.EventType-1] == 6 { + + postHeaderLen := p.format.EventTypeHeaderLengths[h.EventType-1] + if postHeaderLen == 6 { e.tableIDSize = 4 } else { e.tableIDSize = 6 @@ -389,6 +392,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { e.needBitmap2 = false e.tables = p.tables + e.eventType = h.EventType e.parseTime = p.parseTime e.timestampStringLocation = p.timestampStringLocation e.useDecimal = p.useDecimal @@ -415,6 +419,9 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { e.needBitmap2 = true case DELETE_ROWS_EVENTv2: e.Version = 2 + case PARTIAL_UPDATE_ROWS_EVENT: + e.Version = 2 + e.needBitmap2 = true } return e diff --git a/replication/parser_test.go b/replication/parser_test.go index a9d3100c4..48137d700 100644 --- a/replication/parser_test.go +++ b/replication/parser_test.go @@ -14,7 +14,7 @@ func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { ServerVersion: []uint8{0x35, 0x2e, 0x36, 0x2e, 0x32, 0x30, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, CreateTimestamp: 0x0, EventHeaderLength: 0x13, - EventTypeHeaderLengths: []uint8{0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0}, + EventTypeHeaderLengths: []uint8{0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x12, 0x34, 0x0, 0xa, 0x28, 0x0}, ChecksumAlgorithm: 0x1, } @@ -50,7 +50,7 @@ func (t *testSyncerSuite) TestParseEvent(c *C) { ServerVersion: []uint8{0x35, 0x2e, 0x36, 0x2e, 0x32, 0x30, 0x2d, 0x6c, 0x6f, 0x67, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, CreateTimestamp: 0x0, EventHeaderLength: 0x13, - EventTypeHeaderLengths: []uint8{0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0}, + EventTypeHeaderLengths: []uint8{0x38, 0xd, 0x0, 0x8, 0x0, 0x12, 0x0, 0x4, 0x4, 0x4, 0x4, 0x12, 0x0, 0x0, 0x5c, 0x0, 0x4, 0x1a, 0x8, 0x0, 0x0, 0x0, 0x8, 0x8, 0x8, 0x2, 0x0, 0x0, 0x0, 0xa, 0xa, 0xa, 0x19, 0x19, 0x0, 0x12, 0x34, 0x0, 0xa, 0x28, 0x0}, ChecksumAlgorithm: 0x0, } testCases := []struct { @@ -108,3 +108,34 @@ func (t *testSyncerSuite) TestRowsEventDecodeFunc(c *C) { c.Assert(e.Header.EventSize, Equals, tc.eventSize) } } + +func (t *testSyncerSuite) TestRowsEventDecodeImageWithEmptyJSON(c *C) { + data := []byte("\x01\a\x00\xf6+\x0f\x00\xeb\xafP\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x99\xac\xfa\xbeÙ\xaf\xab1\x184\x11\x00\x00") + + bitmap := []byte{255} + + table := 
TableMapEvent{ + ColumnType: []byte{3, 3, 245, 245, 245, 18, 18, 3}, + ColumnMeta: []uint16{0, 0, 4, 4, 4, 0, 0, 0}, + } + + e := RowsEvent{ + eventType: PARTIAL_UPDATE_ROWS_EVENT, + Table: &table, + ColumnCount: uint64(len(table.ColumnType)), + } + n, err := e.decodeImage(data, bitmap, EnumRowImageTypeUpdateAI) + c.Assert(err, IsNil) + c.Assert(n, Equals, len(data)) + + c.Assert(len(e.Rows), Equals, 1) + c.Assert(len(e.Rows[0]), Equals, len(table.ColumnType)) + + row := e.Rows[0] + c.Assert(row[0], Equals, int32(994294)) + c.Assert(row[1], Equals, int32(38842347)) + c.Assert(row[2], DeepEquals, []byte{}) // empty json + c.Assert(row[3], DeepEquals, []byte{}) // empty json + c.Assert(row[4], DeepEquals, []byte{}) // empty json + c.Assert(row[7], Equals, int32(4404)) +} diff --git a/replication/replication_test.go b/replication/replication_test.go index ae530cb3c..0cb7512be 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -12,7 +12,6 @@ import ( "github.com/google/uuid" . 
"github.com/pingcap/check" - "golang.org/x/mod/semver" "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" @@ -59,7 +58,9 @@ func (t *testSyncerSuite) TearDownTest(c *C) { func (t *testSyncerSuite) testExecute(c *C, query string) { _, err := t.c.Execute(query) - c.Assert(err, IsNil) + if err != nil { + c.Assert(fmt.Errorf("query %q execution failed: %w", query, err), IsNil) + } } func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { @@ -126,11 +127,14 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { "2012-05-07", "2012-05-07 14:01:01", "2012-05-07 14:01:01", "14:01:01", -45363.64, "abc", "12345", "a,b")`) - id := 100 - if t.flavor == mysql.MySQLFlavor { t.testExecute(c, "SET SESSION binlog_row_image = 'MINIMAL'") + if eq, err := t.c.CompareServerVersion("8.0.0"); (err == nil) && (eq >= 0) { + t.testExecute(c, "SET SESSION binlog_row_value_options = 'PARTIAL_JSON'") + } + + const id = 100 t.testExecute(c, fmt.Sprintf(`INSERT INTO test_replication (id, str, f, i, bb, de) VALUES (%d, "4", -3.14, 100, "abc", -45635.64)`, id)) t.testExecute(c, fmt.Sprintf(`UPDATE test_replication SET f = -12.14, de = 555.34 WHERE id = %d`, id)) t.testExecute(c, fmt.Sprintf(`DELETE FROM test_replication WHERE id = %d`, id)) @@ -156,7 +160,7 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { str = `CREATE TABLE test_json_v2 ( id INT, - c JSON, + c JSON, PRIMARY KEY (id) ) ENGINE=InnoDB` @@ -209,6 +213,21 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { t.testExecute(c, query) } + // "Partial Updates of JSON Values" from https://dev.mysql.com/doc/refman/8.0/en/json.html + jsonOrig := `'{"a":"aaaaaaaaaaaaa", "c":"ccccccccccccccc", "ab":["abababababababa", "babababababab"]}'` + tbls = []string{ + `ALTER TABLE test_json_v2 ADD COLUMN d JSON DEFAULT NULL, ADD COLUMN e JSON DEFAULT NULL`, + `INSERT INTO test_json_v2 VALUES (101, ` + jsonOrig + `, ` + jsonOrig + `, ` + jsonOrig + `)`, + `UPDATE test_json_v2 SET 
c = JSON_SET(c, '$.ab', '["ab_updatedccc"]') WHERE id = 101`, + `UPDATE test_json_v2 SET d = JSON_SET(d, '$.ab', '["ab_updatedddd"]') WHERE id = 101`, + `UPDATE test_json_v2 SET e = JSON_SET(e, '$.ab', '["ab_updatedeee"]') WHERE id = 101`, + `UPDATE test_json_v2 SET d = JSON_SET(d, '$.ab', '["ab_ddd"]'), e = json_set(e, '$.ab', '["ab_eee"]') WHERE id = 101`, + // ToDo(atercattus): add more tests with JSON_REPLACE() and JSON_REMOVE() + } + for _, query := range tbls { + t.testExecute(c, query) + } + // If MySQL supports JSON, it must supports GEOMETRY. t.testExecute(c, "DROP TABLE IF EXISTS test_geo") @@ -311,10 +330,12 @@ func (t *testSyncerSuite) testPositionSync(c *C) { c.Assert(r.Values, Not(HasLen), 0) // Slave_UUID is empty for mysql 8.0.28+ (8.0.32 still broken) - if semver.Compare(t.c.GetServerVersion(), "8.0.28") < 0 { + if eq, err := t.c.CompareServerVersion("8.0.28"); (err == nil) && (eq < 0) { // check we have set Slave_UUID slaveUUID, _ := r.GetString(0, 4) c.Assert(slaveUUID, HasLen, 36) + } else if err != nil { + c.Errorf("Cannot compare with server version: %v", err) } // Test re-sync. 
diff --git a/replication/row_event.go b/replication/row_event.go index fa3c05268..6e68648eb 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -33,7 +33,7 @@ type TableMapEvent struct { ColumnType []byte ColumnMeta []uint16 - //len = (ColumnCount + 7) / 8 + // len = (ColumnCount + 7) / 8 NullBitmap []byte /* @@ -97,7 +97,7 @@ func (e *TableMapEvent) Decode(data []byte) error { e.Schema = data[pos : pos+int(schemaLength)] pos += int(schemaLength) - //skip 0x00 + // skip 0x00 pos++ tableLength := data[pos] @@ -106,7 +106,7 @@ func (e *TableMapEvent) Decode(data []byte) error { e.Table = data[pos : pos+int(tableLength)] pos += int(tableLength) - //skip 0x00 + // skip 0x00 pos++ var n int @@ -196,13 +196,13 @@ func (e *TableMapEvent) decodeMeta(data []byte) error { for i, t := range e.ColumnType { switch t { case MYSQL_TYPE_STRING: - var x = uint16(data[pos]) << 8 //real type - x += uint16(data[pos+1]) //pack or field length + var x = uint16(data[pos]) << 8 // real type + x += uint16(data[pos+1]) // pack or field length e.ColumnMeta[i] = x pos += 2 case MYSQL_TYPE_NEWDECIMAL: - var x = uint16(data[pos]) << 8 //precision - x += uint16(data[pos+1]) //decimals + var x = uint16(data[pos]) << 8 // precision + x += uint16(data[pos+1]) // decimals e.ColumnMeta[i] = x pos += 2 case MYSQL_TYPE_VAR_STRING, @@ -807,27 +807,41 @@ func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool { return rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET } +// JsonColumnCount returns the number of JSON columns in this table +func (e *TableMapEvent) JsonColumnCount() uint64 { + count := uint64(0) + for _, t := range e.ColumnType { + if t == MYSQL_TYPE_JSON { + count++ + } + } + + return count +} + // RowsEventStmtEndFlag is set in the end of the statement. 
const RowsEventStmtEndFlag = 0x01 type RowsEvent struct { - //0, 1, 2 + // 0, 1, 2 Version int tableIDSize int tables map[uint64]*TableMapEvent needBitmap2 bool + eventType EventType + Table *TableMapEvent TableID uint64 Flags uint16 - //if version == 2 + // if version == 2 ExtraData []byte - //lenenc_int + // lenenc_int ColumnCount uint64 /* @@ -839,14 +853,14 @@ type RowsEvent struct { ColumnBitmap1, ColumnBitmap2 and SkippedColumns are not set on the full row image. */ - //len = (ColumnCount + 7) / 8 + // len = (ColumnCount + 7) / 8 ColumnBitmap1 []byte - //if UPDATE_ROWS_EVENTv1 or v2 - //len = (ColumnCount + 7) / 8 + // if UPDATE_ROWS_EVENTv1 or v2, or PARTIAL_UPDATE_ROWS_EVENT + // len = (ColumnCount + 7) / 8 ColumnBitmap2 []byte - //rows: invalid: int64, float64, bool, []byte, string + // rows: all return types from RowsEvent.decodeValue() Rows [][]interface{} SkippedColumns [][]int @@ -856,6 +870,41 @@ type RowsEvent struct { ignoreJSONDecodeErr bool } +// EnumRowImageType is allowed types for every row in mysql binlog. 
+// See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39 +// enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI }; +type EnumRowImageType byte + +const ( + EnumRowImageTypeWriteAI = EnumRowImageType(iota) + EnumRowImageTypeUpdateBI + EnumRowImageTypeUpdateAI + EnumRowImageTypeDeleteBI +) + +func (t EnumRowImageType) String() string { + switch t { + case EnumRowImageTypeWriteAI: + return "WriteAI" + case EnumRowImageTypeUpdateBI: + return "UpdateBI" + case EnumRowImageTypeUpdateAI: + return "UpdateAI" + case EnumRowImageTypeDeleteBI: + return "DeleteBI" + default: + return fmt.Sprintf("(%d)", t) + } +} + +// Bits for binlog_row_value_options sysvar +type EnumBinlogRowValueOptions byte + +const ( + // Store JSON updates in partial form + EnumBinlogRowValueOptionsPartialJsonUpdates = EnumBinlogRowValueOptions(iota + 1) +) + func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { pos := 0 e.TableID = FixedLengthInt(data[0:e.tableIDSize]) @@ -898,6 +947,8 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) { } func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { + // Rows_log_event::print_verbose() + var ( n int err error @@ -909,22 +960,34 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { } }() - // Pre-allocate memory for rows. 
- rowsLen := e.ColumnCount + // Pre-allocate memory for rows: before image + (optional) after image + rowsLen := 1 if e.needBitmap2 { - rowsLen += e.ColumnCount + rowsLen++ } e.SkippedColumns = make([][]int, 0, rowsLen) e.Rows = make([][]interface{}, 0, rowsLen) + var rowImageType EnumRowImageType + switch e.eventType { + case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2: + rowImageType = EnumRowImageTypeWriteAI + case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2: + rowImageType = EnumRowImageTypeDeleteBI + default: + rowImageType = EnumRowImageTypeUpdateBI + } + for pos < len(data) { - if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap1); err != nil { + // Parse the first image + if n, err = e.decodeImage(data[pos:], e.ColumnBitmap1, rowImageType); err != nil { return errors.Trace(err) } pos += n + // Parse the second image (for UPDATE only) if e.needBitmap2 { - if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap2); err != nil { + if n, err = e.decodeImage(data[pos:], e.ColumnBitmap2, EnumRowImageTypeUpdateAI); err != nil { return errors.Trace(err) } pos += n @@ -946,12 +1009,34 @@ func isBitSet(bitmap []byte, i int) bool { return bitmap[i>>3]&(1<<(uint(i)&7)) > 0 } -func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) (int, error) { - row := make([]interface{}, e.ColumnCount) - skips := make([]int, 0) +func isBitSetIncr(bitmap []byte, i *int) bool { + v := isBitSet(bitmap, *i) + *i++ + return v +} + +func (e *RowsEvent) decodeImage(data []byte, bitmap []byte, rowImageType EnumRowImageType) (int, error) { + // Rows_log_event::print_verbose_one_row() pos := 0 + var isPartialJsonUpdate bool + + var partialBitmap []byte + if e.eventType == PARTIAL_UPDATE_ROWS_EVENT && rowImageType == EnumRowImageTypeUpdateAI { + binlogRowValueOptions, _, n := LengthEncodedInt(data[pos:]) // binlog_row_value_options + pos += n + isPartialJsonUpdate = 
EnumBinlogRowValueOptions(binlogRowValueOptions)&EnumBinlogRowValueOptionsPartialJsonUpdates != 0 + if isPartialJsonUpdate { + byteCount := bitmapByteSize(int(e.Table.JsonColumnCount())) + partialBitmap = data[pos : pos+byteCount] + pos += byteCount + } + } + + row := make([]interface{}, e.ColumnCount) + skips := make([]int, 0) + // refer: https://github.com/alibaba/canal/blob/c3e38e50e269adafdd38a48c63a1740cde304c67/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L63 count := 0 for i := 0; i < int(e.ColumnCount); i++ { @@ -959,30 +1044,38 @@ func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) count++ } } - count = (count + 7) / 8 + count = bitmapByteSize(count) nullBitmap := data[pos : pos+count] pos += count - nullbitIndex := 0 + partialBitmapIndex := 0 + nullBitmapIndex := 0 - var n int - var err error for i := 0; i < int(e.ColumnCount); i++ { + /* + Note: need to read partial bit before reading cols_bitmap, since + the partial_bits bitmap has a bit for every JSON column + regardless of whether it is included in the bitmap or not. 
+ */ + isPartial := isPartialJsonUpdate && + (rowImageType == EnumRowImageTypeUpdateAI) && + (e.Table.ColumnType[i] == MYSQL_TYPE_JSON) && + isBitSetIncr(partialBitmap, &partialBitmapIndex) + if !isBitSet(bitmap, i) { skips = append(skips, i) continue } - isNull := (uint32(nullBitmap[nullbitIndex/8]) >> uint32(nullbitIndex%8)) & 0x01 - nullbitIndex++ - - if isNull > 0 { + if isBitSetIncr(nullBitmap, &nullBitmapIndex) { row[i] = nil continue } - row[i], n, err = e.decodeValue(data[pos:], table.ColumnType[i], table.ColumnMeta[i]) + var n int + var err error + row[i], n, err = e.decodeValue(data[pos:], e.Table.ColumnType[i], e.Table.ColumnMeta[i], isPartial) if err != nil { return 0, err @@ -1011,7 +1104,7 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} { } // see mysql sql/log_event.cc log_event_print_value -func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) { +func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) { var length = 0 if tp == MYSQL_TYPE_STRING { @@ -1063,7 +1156,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ nbits := ((meta >> 8) * 8) + (meta & 0xFF) n = int(nbits+7) / 8 - //use int64 for bit + // use int64 for bit v, err = decodeBit(data, int(nbits), n) case MYSQL_TYPE_TIMESTAMP: n = 4 @@ -1161,10 +1254,37 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404 length = int(FixedLengthInt(data[0:meta])) n = length + int(meta) - var d []byte - d, err = e.decodeJsonBinary(data[meta:n]) - if err == nil { - v = hack.String(d) + + /* + See https://github.com/mysql/mysql-server/blob/7b6fb0753b428537410f5b1b8dc60e5ccabc9f70/sql-common/json_binary.cc#L1077 + + Each document should 
start with a one-byte type specifier, so an + empty document is invalid according to the format specification. + Empty documents may appear due to inserts using the IGNORE keyword + or with non-strict SQL mode, which will insert an empty string if + the value NULL is inserted into a NOT NULL column. We choose to + interpret empty values as the JSON null literal. + + In our implementation (go-mysql) for backward compatibility we prefer return empty slice. + */ + if length == 0 { + v = []byte{} + } else { + if isPartial { + var diff *JsonDiff + diff, err = e.decodeJsonPartialBinary(data[meta:n]) + if err == nil { + v = diff + } else { + fmt.Printf("decodeJsonPartialBinary(%q) fail: %s\n", data[meta:n], err) + } + } else { + var d []byte + d, err = e.decodeJsonBinary(data[meta:n]) + if err == nil { + v = hack.String(d) + } + } } case MYSQL_TYPE_GEOMETRY: // MySQL saves Geometry as Blob in binlog @@ -1220,7 +1340,7 @@ func decodeDecimalDecompressValue(compIndx int, data []byte, mask uint8) (size i var zeros = [digitsPerInteger]byte{48, 48, 48, 48, 48, 48, 48, 48, 48} func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (interface{}, int, error) { - //see python mysql replication and https://github.com/jeremycole/mysql_binlog + // see python mysql replication and https://github.com/jeremycole/mysql_binlog integral := precision - decimals uncompIntegral := integral / digitsPerInteger uncompFractional := decimals / digitsPerInteger @@ -1233,7 +1353,7 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i buf := make([]byte, binSize) copy(buf, data[:binSize]) - //must copy the data for later change + // must copy the data for later change data = buf // Support negative @@ -1248,7 +1368,7 @@ func decodeDecimal(data []byte, precision int, decimals int, useDecimal bool) (i res.WriteString("-") } - //clear sign + // clear sign data[0] ^= 0x80 zeroLeading := true @@ -1373,7 +1493,7 @@ func littleDecodeBit(data []byte, nbits 
int, length int) (value int64, err error } func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Location) (interface{}, int, error) { - //get timestamp binary length + // get timestamp binary length n := int(4 + (dec+1)/2) sec := int64(binary.BigEndian.Uint32(data[0:4])) usec := int64(0) @@ -1400,7 +1520,7 @@ func decodeTimestamp2(data []byte, dec uint16, timestampStringLocation *time.Loc const DATETIMEF_INT_OFS int64 = 0x8000000000 func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { - //get datetime binary length + // get datetime binary length n := int(5 + (dec+1)/2) intPart := int64(BFixedLengthInt(data[0:5])) - DATETIMEF_INT_OFS @@ -1420,7 +1540,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { } tmp := intPart<<24 + frac - //handle sign??? + // handle sign??? if tmp < 0 { tmp = -tmp } @@ -1463,7 +1583,7 @@ const TIMEF_OFS int64 = 0x800000000000 const TIMEF_INT_OFS int64 = 0x800000 func decodeTime2(data []byte, dec uint16) (string, int, error) { - //time binary length + // time binary length n := int(3 + (dec+1)/2) tmp := int64(0) @@ -1581,9 +1701,12 @@ func (e *RowsEvent) Dump(w io.Writer) { for _, rows := range e.Rows { fmt.Fprintf(w, "--\n") for j, d := range rows { - if _, ok := d.([]byte); ok { - fmt.Fprintf(w, "%d:%q\n", j, d) - } else { + switch dt := d.(type) { + case []byte: + fmt.Fprintf(w, "%d:%q\n", j, dt) + case *JsonDiff: + fmt.Fprintf(w, "%d:%s\n", j, dt) + default: fmt.Fprintf(w, "%d:%#v\n", j, d) } } @@ -1596,7 +1719,7 @@ type RowsQueryEvent struct { } func (e *RowsQueryEvent) Decode(data []byte) error { - //ignore length byte 1 + // ignore length byte 1 e.Query = data[1:] return nil } diff --git a/replication/row_event_test.go b/replication/row_event_test.go index 9c0febe52..13114b464 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1315,7 +1315,7 @@ func (_ *testDecodeSuite) BenchmarkUseDecimal(c *C) { c.ResetTimer() for i := 0; i < 
c.N; i++ { for _, d := range decimalData { - _, _, _ = e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta) + _, _, _ = e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta, false) } } } @@ -1325,7 +1325,7 @@ func (_ *testDecodeSuite) BenchmarkNotUseDecimal(c *C) { c.ResetTimer() for i := 0; i < c.N; i++ { for _, d := range decimalData { - _, _, _ = e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta) + _, _, _ = e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta, false) } } } @@ -1334,14 +1334,14 @@ func (_ *testDecodeSuite) TestDecimal(c *C) { e := &RowsEvent{useDecimal: true} e2 := &RowsEvent{useDecimal: false} for _, d := range decimalData { - v, _, err := e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta) + v, _, err := e.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta, false) c.Assert(err, IsNil) // no trailing zero dec, err := decimal.NewFromString(d.num) c.Assert(err, IsNil) c.Assert(dec.Equal(v.(decimal.Decimal)), IsTrue) - v, _, err = e2.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta) + v, _, err = e2.decodeValue(d.dumpData, mysql.MYSQL_TYPE_NEWDECIMAL, d.meta, false) c.Assert(err, IsNil) c.Assert(v.(string), Equals, d.num) } @@ -1367,7 +1367,7 @@ func (_ *testDecodeSuite) BenchmarkInt(c *C) { c.ResetTimer() for i := 0; i < c.N; i++ { for _, d := range intData { - _, _, _ = e.decodeValue(d, mysql.MYSQL_TYPE_LONG, 0) + _, _, _ = e.decodeValue(d, mysql.MYSQL_TYPE_LONG, 0, false) } } }