Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to parse 'extradata’ in rows event #817

Merged
40 changes: 37 additions & 3 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,13 @@ type RowsEvent struct {
Flags uint16

// if version == 2
ExtraData []byte
// Use when DataLen value is greater than 2
NdbLength uint8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems it's complaining that the type of variables are not vertically aligned.

NdbFormat []byte
NdbData []byte
PartitionId uint16
SourceParitionId uint16


// lenenc_int
ColumnCount uint64
Expand Down Expand Up @@ -955,8 +961,12 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
if e.Version == 2 {
dataLen := binary.LittleEndian.Uint16(data[pos:])
pos += 2

e.ExtraData = data[pos : pos+int(dataLen-2)]
if dataLen > 2 {
err := e.decodeExtraData(pos,data)
if err != nil {
return 0, err
}
}
pos += int(dataLen - 2)
}

Expand Down Expand Up @@ -985,6 +995,30 @@ func (e *RowsEvent) DecodeHeader(data []byte) (int, error) {
return pos, nil
}

func (e *RowsEvent) decodeExtraData(pos int, data []byte) (err2 error) {
extraDataType := data[pos]
pos +=1
switch extraDataType {
case 0:
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
e.NdbLength = data[pos]
pos +=1
e.NdbFormat = data[pos:]
pos +=1
e.NdbData = data[pos:e.NdbLength-2]
case 1:
if e.eventType == UPDATE_ROWS_EVENTv2 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
pos +=2
e.SourceParitionId = binary.LittleEndian.Uint16(data[pos:])
pos +=2
}else {
e.PartitionId = binary.LittleEndian.Uint16(data[pos:])
pos +=2
}
}
return nil
}

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
// Rows_log_event::print_verbose()

Expand Down