Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for PARTIAL_UPDATE_ROWS_EVENT binlog event and PARTIAL_JSON mode #774

Merged
merged 2 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"strings"
"time"

"github.com/pingcap/errors"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/packet"
"github.com/go-mysql-org/go-mysql/utils"
"github.com/pingcap/errors"
)

type Conn struct {
Expand Down Expand Up @@ -198,6 +199,10 @@ func (c *Conn) GetServerVersion() string {
return c.serverVersion
}

func (c *Conn) CompareServerVersion(v string) (int, error) {
return CompareServerVersions(c.serverVersion, v)
}

func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
if len(args) == 0 {
return c.exec(command)
Expand Down
17 changes: 9 additions & 8 deletions client/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/x509"
"encoding/binary"
"encoding/pem"
"fmt"

"github.com/pingcap/errors"
"github.com/siddontang/go/hack"
Expand Down Expand Up @@ -60,9 +61,9 @@ func (c *Conn) handleOKPacket(data []byte) (*Result, error) {
// pos += 2
}

//new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now.
// new ok package will check CLIENT_SESSION_TRACK too, but I don't support it now.

//skip info
// skip info
return r, nil
}

Expand All @@ -75,7 +76,7 @@ func (c *Conn) handleErrorPacket(data []byte) error {
pos += 2

if c.capability&CLIENT_PROTOCOL_41 > 0 {
//skip '#'
// skip '#'
pos++
e.State = hack.String(data[pos : pos+5])
pos += 5
Expand All @@ -89,11 +90,11 @@ func (c *Conn) handleErrorPacket(data []byte) error {
func (c *Conn) handleAuthResult() error {
data, switchToPlugin, err := c.readAuthResult()
if err != nil {
return err
return fmt.Errorf("readAuthResult: %w", err)
}
// handle auth switch, only support 'sha256_password', and 'caching_sha2_password'
if switchToPlugin != "" {
//fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin)
// fmt.Printf("now switching auth plugin to '%s'\n", switchToPlugin)
if data == nil {
data = c.salt
} else {
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *Conn) handleAuthResult() error {
func (c *Conn) readAuthResult() ([]byte, string, error) {
data, err := c.ReadPacket()
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("ReadPacket: %w", err)
}

// see: https://insidemysql.com/preparing-your-community-connector-for-mysql-8-part-2-sha256/
Expand Down Expand Up @@ -351,7 +352,7 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
if c.isEOFPacket(data) {
if c.capability&CLIENT_PROTOCOL_41 > 0 {
result.Warnings = binary.LittleEndian.Uint16(data[1:])
//todo add strict_mode, warning will be treat as error
// todo add strict_mode, warning will be treat as error
result.Status = binary.LittleEndian.Uint16(data[3:])
c.status = result.Status
}
Expand Down Expand Up @@ -392,7 +393,7 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
if c.isEOFPacket(data) {
if c.capability&CLIENT_PROTOCOL_41 > 0 {
result.Warnings = binary.LittleEndian.Uint16(data[1:])
//todo add strict_mode, warning will be treat as error
// todo add strict_mode, warning will be treat as error
result.Status = binary.LittleEndian.Uint16(data[3:])
c.status = result.Status
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/BurntSushi/toml v0.3.1
github.com/DataDog/zstd v1.5.2
github.com/Masterminds/semver v1.5.0
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.3.0
github.com/jmoiron/sqlx v1.3.3
Expand All @@ -16,6 +17,5 @@ require (
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
github.com/stretchr/testify v1.8.0
golang.org/x/mod v0.3.0
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect
)
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
Expand Down Expand Up @@ -75,7 +77,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down
18 changes: 18 additions & 0 deletions mysql/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

"github.com/Masterminds/semver"
"github.com/pingcap/errors"
"github.com/siddontang/go/hack"
)
Expand Down Expand Up @@ -379,6 +380,23 @@ func ErrorEqual(err1, err2 error) bool {
return e1.Error() == e2.Error()
}

func CompareServerVersions(a, b string) (int, error) {
var (
aVer, bVer *semver.Version
err error
)

if aVer, err = semver.NewVersion(a); err != nil {
return 0, fmt.Errorf("cannot parse %q as semver: %w", a, err)
}

if bVer, err = semver.NewVersion(b); err != nil {
return 0, fmt.Errorf("cannot parse %q as semver: %w", b, err)
}

return aVer.Compare(bVer), nil
}

var encodeRef = map[byte]byte{
'\x00': '0',
'\'': '\'',
Expand Down
30 changes: 30 additions & 0 deletions mysql/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mysql

import (
"github.com/pingcap/check"
)

type utilTestSuite struct {
}

var _ = check.Suite(&utilTestSuite{})

func (s *utilTestSuite) TestCompareServerVersions(c *check.C) {
tests := []struct {
A string
B string
Expect int
}{
{A: "1.2.3", B: "1.2.3", Expect: 0},
{A: "5.6-999", B: "8.0", Expect: -1},
{A: "8.0.32-0ubuntu0.20.04.2", B: "8.0.28", Expect: 1},
}

for _, test := range tests {
comment := check.Commentf("%q vs. %q", test.A, test.B)

got, err := CompareServerVersions(test.A, test.B)
c.Assert(err, check.IsNil, comment)
c.Assert(got, check.Equals, test.Expect, comment)
}
}
102 changes: 96 additions & 6 deletions replication/json_binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"fmt"
"math"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"github.com/siddontang/go/hack"

. "github.com/go-mysql-org/go-mysql/mysql"
)

const (
Expand Down Expand Up @@ -44,6 +45,60 @@ const (
jsonbValueEntrySizeLarge = 1 + jsonbLargeOffsetSize
)

var (
ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF
)

type (
// JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents.
// https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h
JsonDiffOperation byte
)

const (
// The JSON value in the given path is replaced with a new value.
//
// It has the same effect as `JSON_REPLACE(col, path, value)`.
JsonDiffOperationReplace = JsonDiffOperation(iota)

// Add a new element at the given path.
//
// If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`.
//
// If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`.
JsonDiffOperationInsert

// The JSON value at the given path is removed from an array or object.
//
// It has the same effect as `JSON_REMOVE(col, path)`.
JsonDiffOperationRemove
)

type (
JsonDiff struct {
Op JsonDiffOperation
Path string
Value string
}
)

func (op JsonDiffOperation) String() string {
switch op {
case JsonDiffOperationReplace:
return "Replace"
case JsonDiffOperationInsert:
return "Insert"
case JsonDiffOperationRemove:
return "Remove"
default:
return fmt.Sprintf("Unknown(%d)", op)
}
}

func (jd *JsonDiff) String() string {
return fmt.Sprintf("json_diff(op:%s path:%s value:%s)", jd.Op, jd.Path, jd.Value)
}

func jsonbGetOffsetSize(isSmall bool) int {
if isSmall {
return jsonbSmallOffsetSize
Expand Down Expand Up @@ -71,11 +126,6 @@ func jsonbGetValueEntrySize(isSmall bool) int {
// decodeJsonBinary decodes the JSON binary encoding data and returns
// the common JSON encoding data.
func (e *RowsEvent) decodeJsonBinary(data []byte) ([]byte, error) {
// Sometimes, we can insert a NULL JSON even we set the JSON field as NOT NULL.
// If we meet this case, we can return an empty slice.
if len(data) == 0 {
return []byte{}, nil
}
d := jsonBinaryDecoder{
useDecimal: e.useDecimal,
ignoreDecodeErr: e.ignoreJSONDecodeErr,
Expand Down Expand Up @@ -491,3 +541,43 @@ func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {

return 0, 0
}

func (e *RowsEvent) decodeJsonPartialBinary(data []byte) (*JsonDiff, error) {
// see Json_diff_vector::read_binary() in mysql-server/sql/json_diff.cc
operationNumber := JsonDiffOperation(data[0])
switch operationNumber {
case JsonDiffOperationReplace:
case JsonDiffOperationInsert:
case JsonDiffOperationRemove:
default:
return nil, ErrCorruptedJSONDiff
}
data = data[1:]

pathLength, _, n := LengthEncodedInt(data)
data = data[n:]

path := data[:pathLength]
data = data[pathLength:]

diff := &JsonDiff{
Op: operationNumber,
Path: string(path),
// Value will be filled below
}

if operationNumber == JsonDiffOperationRemove {
return diff, nil
}

valueLength, _, n := LengthEncodedInt(data)
data = data[n:]

d, err := e.decodeJsonBinary(data[:valueLength])
if err != nil {
return nil, fmt.Errorf("cannot read json diff for field %q: %w", path, err)
}
diff.Value = string(d)

return diff, nil
}
11 changes: 9 additions & 2 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
UPDATE_ROWS_EVENTv1,
WRITE_ROWS_EVENTv2,
UPDATE_ROWS_EVENTv2,
DELETE_ROWS_EVENTv2:
DELETE_ROWS_EVENTv2,
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options
e = p.newRowsEvent(h)
case ROWS_QUERY_EVENT:
e = &RowsQueryEvent{}
Expand Down Expand Up @@ -381,14 +382,17 @@ func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {

func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e := &RowsEvent{}
if p.format.EventTypeHeaderLengths[h.EventType-1] == 6 {

postHeaderLen := p.format.EventTypeHeaderLengths[h.EventType-1]
if postHeaderLen == 6 {
e.tableIDSize = 4
} else {
e.tableIDSize = 6
}

e.needBitmap2 = false
e.tables = p.tables
e.eventType = h.EventType
e.parseTime = p.parseTime
e.timestampStringLocation = p.timestampStringLocation
e.useDecimal = p.useDecimal
Expand All @@ -415,6 +419,9 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e.needBitmap2 = true
case DELETE_ROWS_EVENTv2:
e.Version = 2
case PARTIAL_UPDATE_ROWS_EVENT:
e.Version = 2
e.needBitmap2 = true
}

return e
Expand Down
Loading