Skip to content

Commit

Permalink
Merge pull request #786 from monder/mariadb-compressed
Browse files Browse the repository at this point in the history
Add support for MariaDB compressed binlog events
  • Loading branch information
lance6716 authored May 6, 2023
2 parents 850a82b + 1bfe8cd commit df895b5
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 6 deletions.
6 changes: 3 additions & 3 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
Expand Down
20 changes: 20 additions & 0 deletions mysql/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mysql

import (
"bytes"
"compress/zlib"
"crypto/rand"
"crypto/rsa"
"crypto/sha1"
Expand Down Expand Up @@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte,
return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil)
}

func DecompressMariadbData(data []byte) ([]byte, error) {
// algorithm always 0=zlib
// algorithm := (data[pos] & 0x07) >> 4
headerSize := int(data[0] & 0x07)
uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize])
uncompressedData := make([]byte, uncompressedDataSize)
r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:]))
if err != nil {
return nil, err
}
defer r.Close()
_, err = io.ReadFull(r, uncompressedData)
if err != nil {
return nil, err
}
return uncompressedData, nil
}

// AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice
func AppendLengthEncodedInteger(b []byte, n uint64) []byte {
switch {
Expand Down
15 changes: 15 additions & 0 deletions replication/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ const (
MARIADB_BINLOG_CHECKPOINT_EVENT
MARIADB_GTID_EVENT
MARIADB_GTID_LIST_EVENT
MARIADB_START_ENCRYPTION_EVENT
MARIADB_QUERY_COMPRESSED_EVENT
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
)

func (e EventType) String() string {
Expand Down Expand Up @@ -197,6 +202,16 @@ func (e EventType) String() string {
return "TransactionPayloadEvent"
case HEARTBEAT_LOG_EVENT_V2:
return "HeartbeatLogEventV2"
case MARIADB_START_ENCRYPTION_EVENT:
return "MariadbStartEncryptionEvent"
case MARIADB_QUERY_COMPRESSED_EVENT:
return "MariadbQueryCompressedEvent"
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbWriteRowsCompressedEventV1"
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbUpdateRowsCompressedEventV1"
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
return "MariadbDeleteRowsCompressedEventV1"

default:
return "UnknownEvent"
Expand Down
13 changes: 12 additions & 1 deletion replication/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ type QueryEvent struct {
Schema []byte
Query []byte

// for mariadb QUERY_COMPRESSED_EVENT
compressed bool

// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
}
Expand Down Expand Up @@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error {
//skip 0x00
pos++

e.Query = data[pos:]
if e.compressed {
decompressedQuery, err := DecompressMariadbData(data[pos:])
if err != nil {
return err
}
e.Query = decompressedQuery
} else {
e.Query = data[pos:]
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
switch h.EventType {
case QUERY_EVENT:
e = &QueryEvent{}
case MARIADB_QUERY_COMPRESSED_EVENT:
e = &QueryEvent{
compressed: true,
}
case XID_EVENT:
e = &XIDEvent{}
case TABLE_MAP_EVENT:
Expand All @@ -270,7 +274,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
WRITE_ROWS_EVENTv2,
UPDATE_ROWS_EVENTv2,
DELETE_ROWS_EVENTv2,
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1,
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1,
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options

e = p.newRowsEvent(h)
case ROWS_QUERY_EVENT:
e = &RowsQueryEvent{}
Expand Down Expand Up @@ -412,6 +420,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
case UPDATE_ROWS_EVENTv1:
e.Version = 1
e.needBitmap2 = true
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
e.Version = 1
e.compressed = true
e.needBitmap2 = true
case WRITE_ROWS_EVENTv2:
e.Version = 2
case UPDATE_ROWS_EVENTv2:
Expand Down
14 changes: 12 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,9 @@ type RowsEvent struct {
tables map[uint64]*TableMapEvent
needBitmap2 bool

// for mariadb *_COMPRESSED_EVENT_V1
compressed bool

eventType EventType

Table *TableMapEvent
Expand Down Expand Up @@ -970,9 +973,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {

var rowImageType EnumRowImageType
switch e.eventType {
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2:
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeWriteAI
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2:
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
rowImageType = EnumRowImageTypeDeleteBI
default:
rowImageType = EnumRowImageTypeUpdateBI
Expand Down Expand Up @@ -1002,6 +1005,13 @@ func (e *RowsEvent) Decode(data []byte) error {
if err != nil {
return err
}
if e.compressed {
uncompressedData, err := DecompressMariadbData(data[pos:])
if err != nil {
return err
}
return e.DecodeData(0, uncompressedData)
}
return e.DecodeData(pos, data)
}

Expand Down

0 comments on commit df895b5

Please sign in to comment.