Skip to content

Commit

Permalink
Add support for MariaDB compressed binlog events
Browse files Browse the repository at this point in the history
  • Loading branch information
monder committed May 4, 2023
1 parent f854680 commit e187ab6
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 5 deletions.
6 changes: 3 additions & 3 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
Expand Down
11 changes: 11 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ const (
MARIADB_BINLOG_CHECKPOINT_EVENT
MARIADB_GTID_EVENT
MARIADB_GTID_LIST_EVENT
MARIADB_START_ENCRYPTION_EVENT
MARIADB_QUERY_COMPRESSED_EVENT
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
)

func (e EventType) String() string {
Expand Down Expand Up @@ -197,6 +202,12 @@ func (e EventType) String() string {
return "TransactionPayloadEvent"
case HEARTBEAT_LOG_EVENT_V2:
return "HeartbeatLogEventV2"
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbWriteRowsCompressedEventV1"
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbUpdateRowsCompressedEventV1"
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbDeleteRowsCompressedEventV1"

default:
return "UnknownEvent"
Expand Down
14 changes: 14 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
WRITE_ROWS_EVENTv2,
UPDATE_ROWS_EVENTv2,
DELETE_ROWS_EVENTv2,
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1,
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options

e = p.newRowsEvent(h)
case ROWS_QUERY_EVENT:
e = &RowsQueryEvent{}
Expand Down Expand Up @@ -412,6 +416,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
case UPDATE_ROWS_EVENTv1:
e.Version = 1
e.needBitmap2 = true
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
e.needBitmap2 = true
case WRITE_ROWS_EVENTv2:
e.Version = 2
case UPDATE_ROWS_EVENTv2:
Expand Down
38 changes: 36 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package replication

import (
"bytes"
"compress/zlib"
"encoding/binary"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -830,6 +832,9 @@ type RowsEvent struct {
tables map[uint64]*TableMapEvent
needBitmap2 bool

// for mariadb *_COMPRESSED_EVENT_V1
compressed bool

eventType EventType

Table *TableMapEvent
Expand Down Expand Up @@ -970,9 +975,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {

var rowImageType EnumRowImageType
switch e.eventType {
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2:
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeWriteAI
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2:
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeDeleteBI
default:
rowImageType = EnumRowImageTypeUpdateBI
Expand Down Expand Up @@ -1002,6 +1007,13 @@ func (e *RowsEvent) Decode(data []byte) error {
if err != nil {
return err
}
if e.compressed {
uncompressedData, err := e.decompressData(pos, data)
if err != nil {
return err
}
return e.DecodeData(0, uncompressedData)
}
return e.DecodeData(pos, data)
}

Expand Down Expand Up @@ -1103,6 +1115,28 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
return v.Time
}

func (e *RowsEvent) decompressData(pos int, data []byte) ([]byte, error) {
// algorithm always 0=zlib
// algorithm := (data[pos] & 0x07) >> 4
headerSize := int(data[pos] & 0x07)
pos++

uncompressedDataSize := BFixedLengthInt(data[pos : pos+headerSize])

pos += headerSize
uncompressedData := make([]byte, uncompressedDataSize)
r, err := zlib.NewReader(bytes.NewReader(data[pos:]))
if err != nil {
return nil, err
}
defer r.Close()
_, err = io.ReadFull(r, uncompressedData)
if err != nil {
return nil, err
}
return uncompressedData, nil
}

// see mysql sql/log_event.cc log_event_print_value
func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) {
var length = 0
Expand Down

0 comments on commit e187ab6

Please sign in to comment.