Skip to content

Commit

Permalink
Add a optional function to decode optional meta of table map event (#833
Browse files Browse the repository at this point in the history
)

* lower memory usage by reducing event chan size in streamer

When consuming events from stream is blocked, the chan may use too much memory

* make streamer's event chan size configurable

When the consumption speed of the event chan in streamer cannot catch up with its production speed, leading to an accumulation of events, the current fixed channel size of 10240 might occupy significant memory, potentially triggering an Out Of Memory (OOM) condition. Making the size of the event chan configurable would allow for controlling the memory usage of the streamer.

* update cfg name StreamerChanSize to EventCacheSize

* EventCacheSize is renamed to EventCacheCount, to make the variable name more reflective of its actual meaning.

* Add a optional function to decode optional meta of table map event
  • Loading branch information
zing22845 authored Oct 20, 2023
1 parent e598b6e commit 3a75f6a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
3 changes: 3 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type BinlogSyncerConfig struct {

RowsEventDecodeFunc func(*RowsEvent, []byte) error

TableMapOptionalMetaDecodeFunc func([]byte) error

DiscardGTIDSet bool

EventCacheCount int
Expand Down Expand Up @@ -189,6 +191,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
b.parser.SetUseDecimal(b.cfg.UseDecimal)
b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)
b.parser.SetRowsEventDecodeFunc(b.cfg.RowsEventDecodeFunc)
b.parser.SetTableMapOptionalMetaDecodeFunc(b.cfg.TableMapOptionalMetaDecodeFunc)
b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background())

Expand Down
9 changes: 8 additions & 1 deletion replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type BinlogParser struct {
verifyChecksum bool

rowsEventDecodeFunc func(*RowsEvent, []byte) error

tableMapOptionalMetaDecodeFunc func([]byte) error
}

func NewBinlogParser() *BinlogParser {
Expand Down Expand Up @@ -218,6 +220,10 @@ func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEven
p.rowsEventDecodeFunc = rowsEventDecodeFunc
}

func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error) {
p.tableMapOptionalMetaDecodeFunc = tableMapOptionalMetaDecondeFunc
}

func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
Expand Down Expand Up @@ -257,7 +263,8 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
e = &XIDEvent{}
case TABLE_MAP_EVENT:
te := &TableMapEvent{
flavor: p.flavor,
flavor: p.flavor,
optionalMetaDecodeFunc: p.tableMapOptionalMetaDecodeFunc,
}
if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
te.tableIDSize = 4
Expand Down
12 changes: 10 additions & 2 deletions replication/row_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type TableMapEvent struct {

// VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+)
VisibilityBitmap []byte

optionalMetaDecodeFunc func(data []byte) (err error)
}

func (e *TableMapEvent) Decode(data []byte) error {
Expand Down Expand Up @@ -140,8 +142,14 @@ func (e *TableMapEvent) Decode(data []byte) error {

pos += nullBitmapSize

if err = e.decodeOptionalMeta(data[pos:]); err != nil {
return err
if e.optionalMetaDecodeFunc != nil {
if err = e.optionalMetaDecodeFunc(data[pos:]); err != nil {
return err
}
} else {
if err = e.decodeOptionalMeta(data[pos:]); err != nil {
return err
}
}

return nil
Expand Down

0 comments on commit 3a75f6a

Please sign in to comment.