From 3a75f6a26d8819c2a88629e54c7de0b7698ae05c Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Fri, 20 Oct 2023 16:21:31 +0800 Subject: [PATCH] Add a optional function to decode optional meta of table map event (#833) * lower memory usage by reducing event chan size in streamer When consuming events from stream is blocked, the chan may use too much memory * make streamer's event chan size configurable When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer. * update cfg name StreamerChanSize to EventCacheSize * EventCacheSize is renamed to EventCacheCount, to make the variable name more reflective of its actual meaning. * Add a optional function to decode optional meta of table map event --- replication/binlogsyncer.go | 3 +++ replication/parser.go | 9 ++++++++- replication/row_event.go | 12 ++++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index a5ca96359..0de77522a 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -121,6 +121,8 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error + TableMapOptionalMetaDecodeFunc func([]byte) error + DiscardGTIDSet bool EventCacheCount int @@ -189,6 +191,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc) + b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) diff --git a/replication/parser.go b/replication/parser.go index 00d37076b..4caf496c2 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -42,6 +42,8 @@ type BinlogParser struct { verifyChecksum bool rowsEventDecodeFunc func(*RowsEvent, []byte) error + + tableMapOptionalMetaDecodeFunc func([]byte) error } func NewBinlogParser() *BinlogParser { @@ -218,6 +220,10 @@ func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEven p.rowsEventDecodeFunc = rowsEventDecodeFunc } +func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error) { + p.tableMapOptionalMetaDecodeFunc = tableMapOptionalMetaDecondeFunc +} + func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) @@ -257,7 +263,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = &XIDEvent{} case TABLE_MAP_EVENT: te := &TableMapEvent{ - flavor: p.flavor, + flavor: p.flavor, + optionalMetaDecodeFunc: p.tableMapOptionalMetaDecodeFunc, } if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 { te.tableIDSize = 4 diff --git a/replication/row_event.go b/replication/row_event.go index d4eaf7105..d020cbc25 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -84,6 +84,8 @@ type TableMapEvent struct { // VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+) VisibilityBitmap []byte + + optionalMetaDecodeFunc func(data []byte) (err error) } func (e *TableMapEvent) Decode(data []byte) error { @@ -140,8 +142,14 @@ func (e *TableMapEvent) Decode(data []byte) error { pos += nullBitmapSize - if err = e.decodeOptionalMeta(data[pos:]); err != nil { - return err + if e.optionalMetaDecodeFunc != nil { + if err = e.optionalMetaDecodeFunc(data[pos:]); err != nil { + return err + } + } else { + if err = e.decodeOptionalMeta(data[pos:]); err != nil { + return err + } } return nil