From f866148e7d6718a8053f47d2e17944c46d718ca0 Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Mon, 9 Oct 2023 10:53:53 +0800 Subject: [PATCH 1/5] lower memory usage by reducing event chan size in streamer When consuming events from stream is blocked, the chan may use too much memory --- replication/binlogstreamer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 61254fba0..c3617bf5d 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -87,7 +87,7 @@ func (s *BinlogStreamer) closeWithError(err error) { func NewBinlogStreamer() *BinlogStreamer { s := new(BinlogStreamer) - s.ch = make(chan *BinlogEvent, 10240) + s.ch = make(chan *BinlogEvent, 16) s.ech = make(chan error, 4) return s From 998731174616d61ab0fcd99be6bc97ae78c969a7 Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Mon, 9 Oct 2023 11:18:06 +0800 Subject: [PATCH 2/5] make streamer's event chan size configurable When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer. --- replication/binlogstreamer.go | 10 +++++++++- replication/binlogsyncer.go | 7 ++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index c3617bf5d..72bc7ddd0 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -85,9 +85,17 @@ func (s *BinlogStreamer) closeWithError(err error) { } func NewBinlogStreamer() *BinlogStreamer { + return NewBinlogStreamerWithChanSize(10240) +} + +func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer { s := new(BinlogStreamer) - s.ch = make(chan *BinlogEvent, 16) + if chanSize <= 0 { + chanSize = 10240 + } + + s.ch = make(chan *BinlogEvent, chanSize) s.ech = make(chan error, 4) return s diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 9c70740fc..45277559b 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -122,6 +122,8 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error DiscardGTIDSet bool + + StreamerChanSize int } // BinlogSyncer syncs binlog event from server. @@ -166,6 +168,9 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { dialer := &net.Dialer{} cfg.Dialer = dialer.DialContext } + if cfg.StreamerChanSize == 0 { + cfg.StreamerChanSize = 10240 + } // Clear the Password to avoid outputing it in log. pass := cfg.Password @@ -393,7 +398,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := NewBinlogStreamer() + s := NewBinlogStreamerWithChanSize(b.cfg.StreamerChanSize) b.wg.Add(1) go b.onStream(s) From e85bf72d10265421d25d3eebc53910f11bf48898 Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Wed, 11 Oct 2023 18:04:41 +0800 Subject: [PATCH 3/5] update cfg name StreamerChanSize to EventCacheSize --- replication/binlogsyncer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 45277559b..c2e4a6f96 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -123,7 +123,7 @@ type BinlogSyncerConfig struct { DiscardGTIDSet bool - StreamerChanSize int + EventCacheSize int } // BinlogSyncer syncs binlog event from server. @@ -168,8 +168,8 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { dialer := &net.Dialer{} cfg.Dialer = dialer.DialContext } - if cfg.StreamerChanSize == 0 { - cfg.StreamerChanSize = 10240 + if cfg.EventCacheSize == 0 { + cfg.EventCacheSize = 10240 } // Clear the Password to avoid outputing it in log. @@ -398,7 +398,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := NewBinlogStreamerWithChanSize(b.cfg.StreamerChanSize) + s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheSize) b.wg.Add(1) go b.onStream(s) From ef58cab6e8b555679302fe77d1eb9b6b7b5b4c6c Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Thu, 12 Oct 2023 10:41:16 +0800 Subject: [PATCH 4/5] EventCacheSize is renamed to EventCacheCount, to make the variable name more reflective of its actual meaning. --- replication/binlogsyncer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index c2e4a6f96..a5ca96359 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -123,7 +123,7 @@ type BinlogSyncerConfig struct { DiscardGTIDSet bool - EventCacheSize int + EventCacheCount int } // BinlogSyncer syncs binlog event from server. @@ -168,8 +168,8 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { dialer := &net.Dialer{} cfg.Dialer = dialer.DialContext } - if cfg.EventCacheSize == 0 { - cfg.EventCacheSize = 10240 + if cfg.EventCacheCount == 0 { + cfg.EventCacheCount = 10240 } // Clear the Password to avoid outputing it in log. @@ -398,7 +398,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheSize) + s := NewBinlogStreamerWithChanSize(b.cfg.EventCacheCount) b.wg.Add(1) go b.onStream(s) From 165d121aeb1215c3d31c6c8171526ac28cb60d6a Mon Sep 17 00:00:00 2001 From: zing22845 <60884519+zing22845@users.noreply.github.com> Date: Thu, 19 Oct 2023 13:57:05 +0800 Subject: [PATCH 5/5] Add a optional function to decode optional meta of table map event --- replication/binlogsyncer.go | 3 +++ replication/parser.go | 9 ++++++++- replication/row_event.go | 12 ++++++++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index a5ca96359..0de77522a 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -121,6 +121,8 @@ type BinlogSyncerConfig struct { RowsEventDecodeFunc func(*RowsEvent, []byte) error + TableMapOptionalMetaDecodeFunc func([]byte) error + DiscardGTIDSet bool EventCacheCount int @@ -189,6 +191,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer { b.parser.SetUseDecimal(b.cfg.UseDecimal) b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum) b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc) + b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc) b.running = false b.ctx, b.cancel = context.WithCancel(context.Background()) diff --git a/replication/parser.go b/replication/parser.go index 00d37076b..4caf496c2 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -42,6 +42,8 @@ type BinlogParser struct { verifyChecksum bool rowsEventDecodeFunc func(*RowsEvent, []byte) error + + tableMapOptionalMetaDecodeFunc func([]byte) error } func NewBinlogParser() *BinlogParser { @@ -218,6 +220,10 @@ func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEven p.rowsEventDecodeFunc = rowsEventDecodeFunc } +func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error) { + p.tableMapOptionalMetaDecodeFunc = tableMapOptionalMetaDecondeFunc +} + func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { h := new(EventHeader) err := h.Decode(data) @@ -257,7 +263,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) ( e = &XIDEvent{} case TABLE_MAP_EVENT: te := &TableMapEvent{ - flavor: p.flavor, + flavor: p.flavor, + optionalMetaDecodeFunc: p.tableMapOptionalMetaDecodeFunc, } if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 { te.tableIDSize = 4 diff --git a/replication/row_event.go b/replication/row_event.go index d4eaf7105..d020cbc25 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -84,6 +84,8 @@ type TableMapEvent struct { // VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+) VisibilityBitmap []byte + + optionalMetaDecodeFunc func(data []byte) (err error) } func (e *TableMapEvent) Decode(data []byte) error { @@ -140,8 +142,14 @@ func (e *TableMapEvent) Decode(data []byte) error { pos += nullBitmapSize - if err = e.decodeOptionalMeta(data[pos:]); err != nil { - return err + if e.optionalMetaDecodeFunc != nil { + if err = e.optionalMetaDecodeFunc(data[pos:]); err != nil { + return err + } + } else { + if err = e.decodeOptionalMeta(data[pos:]); err != nil { + return err + } } return nil