diff --git a/canal/canal.go b/canal/canal.go index 6c07bb741..989c4e297 100644 --- a/canal/canal.go +++ b/canal/canal.go @@ -451,6 +451,19 @@ func (c *Canal) prepareSyncer() error { Logger: c.cfg.Logger, Dialer: c.cfg.Dialer, Localhost: c.cfg.Localhost, + RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error { + pos, err := event.DecodeHeader(data) + if err != nil { + return err + } + + key := fmt.Sprintf("%s.%s", string(event.Table.Schema), string(event.Table.Table)) + if !c.checkTableMatch(key) { + return nil + } + + return event.DecodeData(pos, data) + }, } if strings.Contains(c.cfg.Addr, "/") { diff --git a/replication/row_event.go b/replication/row_event.go index 87fe49dbf..d4eaf7105 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -1018,6 +1018,14 @@ func (e *RowsEvent) decodeExtraData(data []byte) (err2 error) { } func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { + if e.compressed { + data, err2 = DecompressMariadbData(data[pos:]) + if err2 != nil { + //nolint:nakedret + return + } + } + // Rows_log_event::print_verbose() var ( @@ -1073,13 +1081,6 @@ func (e *RowsEvent) Decode(data []byte) error { if err != nil { return err } - if e.compressed { - uncompressedData, err := DecompressMariadbData(data[pos:]) - if err != nil { - return err - } - return e.DecodeData(0, uncompressedData) - } return e.DecodeData(pos, data) }