Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to parse 'extradata’ in rows event #817

Merged
5 changes: 5 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,8 @@ const (
LAST_INSERT_ID
INSERT_ID
)

const (
ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
)
39 changes: 36 additions & 3 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,12 @@ type RowsEvent struct {
Flags uint16

// if version == 2
ExtraData []byte
// Use when DataLen value is greater than 2
NdbFormat byte
NdbData []byte

PartitionId uint16
SourcePartitionId uint16

// lenenc_int
ColumnCount uint64
Expand Down Expand Up @@ -955,8 +960,12 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
if e.Version == 2 {
dataLen := binary.LittleEndian.Uint16(data[pos:])
pos += 2

e.ExtraData = data[pos : pos+int(dataLen-2)]
if dataLen > 2 {
err := e.decodeExtraData(data[pos:])
if err != nil {
return 0, err
}
}
pos += int(dataLen - 2)
}

Expand Down Expand Up @@ -985,6 +994,29 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
return pos, nil
}

func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) {
pos := 0
extraDataType := data[pos]
pos += 1
switch extraDataType {
case ENUM_EXTRA_ROW_INFO_TYPECODE_NDB:
var ndbLength int = int(data[pos])
pos += 1
e.NdbFormat = data[pos]
pos += 1
e.NdbData = data[pos : pos+ndbLength-2]
case ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION:
if e.eventType == UPDATE_ROWS_EVENTv1 || e.eventType == UPDATE_ROWS_EVENTv2 || e.eventType == PARTIAL_UPDATE_ROWS_EVENT {
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
pos += 2
e.SourcePartitionId = binary.LittleEndian.Uint16(data[pos:])
} else {
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
}
}
return nil
}

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
// Rows_log_event::print_verbose()

Expand Down Expand Up @@ -1742,6 +1774,7 @@ func (e *RowsEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "TableID: %d\n", e.TableID)
fmt.Fprintf(w, "Flags: %d\n", e.Flags)
fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
fmt.Fprintf(w, "NDB data: %s\n", e.NdbData)

fmt.Fprintf(w, "Values:\n")
for _, rows := range e.Rows {
Expand Down
120 changes: 120 additions & 0 deletions replication/row_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,126 @@ func TestTableMapOptMetaVisibility(t *testing.T) {
}
}

func TestRowsDataExtraData(t *testing.T) {
// Only after mysql 8.0.16 version can be parsed from extradata to 'partition info' and 'ndb info'
testcases := []struct {
data []byte
tableData []byte
eventType EventType
expectPartitionId uint16
expectSourcePartitionId uint16
expectNdbFormat byte
expectNdbData []byte
}{
/*
mysql-cluster 8.0.32

+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| p | int | NO | PRI | NULL | |
| c | int | YES | UNI | NULL | |
+-------+------+------+-----+---------+-------+

CREATE TABLE t (
p INT PRIMARY KEY,
c INT,
UNIQUE KEY u (c)
) ENGINE NDB;

INSERT INTO t VALUES (1,1), (2,2), (3,3), (4,4), (5,5);
*/
{
data: []byte("s\x00\x00\x00\x00\x00\x01\x00\x0f\x00\x00\f\x00\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00\x02\xff\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00\x00\x04\x00\x00\x00\x04\x00\x00\x00\x00\x03\x00\x00\x00\x03\x00\x00\x00\x00\x05\x00\x00\x00\x05\x00\x00\x00"),
tableData: []byte("s\x00\x00\x00\x00\x00\x01\x00\abdteste\x00\x01t\x00\x02\x03\x03\x00\x02\x01\x01\x00"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte("\x01\x00\x00\x04\x80\x00\x04\x00\x00\x00"),
},
/*
mysql 8.0.16

+-------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------+------+-----+---------+-------+
| id | int | YES | | NULL | |
+-------+------+------+-----+---------+-------+

CREATE TABLE test (id INTEGER)
PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (1),
PARTITION p1 VALUES LESS THAN (2),
PARTITION p2 VALUES LESS THAN (3),
PARTITION p3 VALUES LESS THAN (4),
PARTITION p4 VALUES LESS THAN (5)
);

INSERT INTO test (id) VALUES(3);
UPDATE test set id = 1 WHERE id = 3;
*/
{
data: []byte("p\x03\x00\x00\x00\x00\x01\x00\x05\x00\x01\x03\x00\x01\xff\x00\x03\x00\x00\x00"),
tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x3,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
{
data: []byte("p\x03\x00\x00\x00\x00\x01\x00\a\x00\x01\x01\x00\x03\x00\x01\xff\xff\x00\x03\x00\x00\x00\x00\x01\x00\x00\x00"),
tableData: []byte("p\x03\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01\x01\x01\x00"),
eventType: UPDATE_ROWS_EVENTv2,
expectPartitionId: 0x1,
expectSourcePartitionId: 0x3,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
// mysql 5.7 and mariadb 14(15) does not surpot extra data
{
data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xfe\x03\x00\x00\x00"),
tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"),
eventType: WRITE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
{
data: []byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xff\xfe\x03\x00\x00\x00\xfe\x01\x00\x00\x00"),
tableData: []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x04test\x00\x01\x03\x00\x01"),
eventType: UPDATE_ROWS_EVENTv2,
expectPartitionId: 0x0,
expectSourcePartitionId: 0x0,
expectNdbFormat: 0x0,
expectNdbData: []byte(nil),
},
}

for _, tc := range testcases {
tableMapEvent := new(TableMapEvent)
tableMapEvent.tableIDSize = 6
err := tableMapEvent.Decode(tc.tableData)
require.NoError(t, err)

rowsEvent := new(RowsEvent)
rowsEvent.tableIDSize = 6
rowsEvent.tables = make(map[uint64]*TableMapEvent)
rowsEvent.tables[tableMapEvent.TableID] = tableMapEvent
rowsEvent.Version = 2
rowsEvent.eventType = tc.eventType

err = rowsEvent.Decode(tc.data)
require.NoError(t, err)
require.Equal(t, tc.expectPartitionId, rowsEvent.PartitionId)
require.Equal(t, tc.expectSourcePartitionId, rowsEvent.SourcePartitionId)
require.Equal(t, tc.expectNdbFormat, rowsEvent.NdbFormat)
require.Equal(t, tc.expectNdbData, rowsEvent.NdbData)
}
}

func TestTableMapHelperMaps(t *testing.T) {
/*
CREATE TABLE `_types` (
Expand Down