diff --git a/canal/sync.go b/canal/sync.go index a4ae8b583..7df181226 100644 --- a/canal/sync.go +++ b/canal/sync.go @@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { } var action string switch e.Header.EventType { - case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: + case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: action = InsertAction - case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: + case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: action = DeleteAction - case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: + case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: action = UpdateAction default: return errors.Errorf("%s not supported now", e.Header.EventType) diff --git a/mysql/util.go b/mysql/util.go index 6d8ec4471..5abe540bc 100644 --- a/mysql/util.go +++ b/mysql/util.go @@ -1,6 +1,8 @@ package mysql import ( + "bytes" + "compress/zlib" "crypto/rand" "crypto/rsa" "crypto/sha1" @@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte, return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil) } +func DecompressMariadbData(data []byte) ([]byte, error) { + // algorithm always 0=zlib + // algorithm := (data[pos] & 0x07) >> 4 + headerSize := int(data[0] & 0x07) + uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize]) + uncompressedData := make([]byte, uncompressedDataSize) + r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:])) + if err != nil { + return nil, err + } + defer r.Close() + _, err = io.ReadFull(r, uncompressedData) + if err != nil { + return nil, err + } + return uncompressedData, nil +} + // AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice func AppendLengthEncodedInteger(b []byte, n uint64) []byte { switch { diff --git a/replication/const.go b/replication/const.go index fc31763ba..2ea6fb185 100644 --- a/replication/const.go +++ b/replication/const.go @@ -101,6 +101,11 @@ const ( MARIADB_BINLOG_CHECKPOINT_EVENT MARIADB_GTID_EVENT MARIADB_GTID_LIST_EVENT + MARIADB_START_ENCRYPTION_EVENT + MARIADB_QUERY_COMPRESSED_EVENT + MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1 + MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1 + MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1 ) func (e EventType) String() string { @@ -197,6 +202,16 @@ func (e EventType) String() string { return "TransactionPayloadEvent" case HEARTBEAT_LOG_EVENT_V2: return "HeartbeatLogEventV2" + case MARIADB_START_ENCRYPTION_EVENT: + return "MariadbStartEncryptionEvent" + case MARIADB_QUERY_COMPRESSED_EVENT: + return "MariadbQueryCompressedEvent" + case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbWriteRowsCompressedEventV1" + case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbUpdateRowsCompressedEventV1" + case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + return "MariadbDeleteRowsCompressedEventV1" default: return "UnknownEvent" diff --git a/replication/event.go b/replication/event.go index 5a1f5c654..264b230ca 100644 --- a/replication/event.go +++ b/replication/event.go @@ -297,6 +297,9 @@ type QueryEvent struct { Schema []byte Query []byte + // for mariadb QUERY_COMPRESSED_EVENT + compressed bool + // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use GSet GTIDSet } @@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error { //skip 0x00 pos++ - e.Query = data[pos:] + if e.compressed { + decompressedQuery, err := DecompressMariadbData(data[pos:]) + if err != nil { + return err + } + e.Query = decompressedQuery + } else { + e.Query = data[pos:] + } return nil } diff --git a/replication/parser.go b/replication/parser.go index 37b9c5a6c..00d37076b 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( switch h.EventType { case QUERY_EVENT: e = &QueryEvent{} + case MARIADB_QUERY_COMPRESSED_EVENT: + e = &QueryEvent{ + compressed: true, + } case XID_EVENT: e = &XIDEvent{} case TABLE_MAP_EVENT: @@ -270,7 +274,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( WRITE_ROWS_EVENTv2, UPDATE_ROWS_EVENTv2, DELETE_ROWS_EVENTv2, + MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, + MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, + MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options + e = p.newRowsEvent(h) case ROWS_QUERY_EVENT: e = &RowsQueryEvent{} @@ -412,6 +420,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent { case UPDATE_ROWS_EVENTv1: e.Version = 1 e.needBitmap2 = true + case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + e.Version = 1 + e.compressed = true + e.needBitmap2 = true case WRITE_ROWS_EVENTv2: e.Version = 2 case UPDATE_ROWS_EVENTv2: diff --git a/replication/row_event.go b/replication/row_event.go index 6e68648eb..a7fad168b 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -830,6 +830,9 @@ type RowsEvent struct { tables map[uint64]*TableMapEvent needBitmap2 bool + // for mariadb *_COMPRESSED_EVENT_V1 + compressed bool + eventType EventType Table *TableMapEvent @@ -970,9 +973,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { var rowImageType EnumRowImageType switch e.eventType { - case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2: + case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: rowImageType = EnumRowImageTypeWriteAI - case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2: + case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: rowImageType = EnumRowImageTypeDeleteBI default: rowImageType = EnumRowImageTypeUpdateBI @@ -1002,6 +1005,13 @@ func (e *RowsEvent) Decode(data []byte) error { if err != nil { return err } + if e.compressed { + uncompressedData, err := DecompressMariadbData(data[pos:]) + if err != nil { + return err + } + return e.DecodeData(0, uncompressedData) + } return e.DecodeData(pos, data) }