Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a optional function to decode optional meta of table map event #833

Merged
merged 7 commits into from
Oct 20, 2023
3 changes: 3 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type BinlogSyncerConfig struct {

RowsEventDecodeFunc func(*RowsEvent, []byte) error

TableMapOptionalMetaDecodeFunc func([]byte) error

DiscardGTIDSet bool

EventCacheCount int
Expand Down Expand Up @@ -189,6 +191,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc)
b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background())

Expand Down
9 changes: 8 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type BinlogParser struct {
verifyChecksum bool

rowsEventDecodeFunc func(*RowsEvent, []byte) error

tableMapOptionalMetaDecodeFunc func([]byte) error
}

func NewBinlogParser() *BinlogParser {
Expand Down Expand Up @@ -218,6 +220,10 @@ func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEven
p.rowsEventDecodeFunc = rowsEventDecodeFunc
}

func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error) {
p.tableMapOptionalMetaDecodeFunc = tableMapOptionalMetaDecondeFunc
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -257,7 +263,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
e = &XIDEvent{}
case TABLE_MAP_EVENT:
te := &TableMapEvent{
flavor: p.flavor,
flavor: p.flavor,
optionalMetaDecodeFunc: p.tableMapOptionalMetaDecodeFunc,
}
if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
te.tableIDSize = 4
Expand Down
12 changes: 10 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type TableMapEvent struct {

// VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+)
VisibilityBitmap []byte

optionalMetaDecodeFunc func(data []byte) (err error)
}

func (e *TableMapEvent) Decode(data []byte) error {
Expand Down Expand Up @@ -140,8 +142,14 @@ func (e *TableMapEvent) Decode(data []byte) error {

pos += nullBitmapSize

if err = e.decodeOptionalMeta(data[pos:]); err != nil {
return err
if e.optionalMetaDecodeFunc != nil {
if err = e.optionalMetaDecodeFunc(data[pos:]); err != nil {
return err
}
} else {
if err = e.decodeOptionalMeta(data[pos:]); err != nil {
return err
}
}

return nil
Expand Down