From 2d8ffecc9f25c23b562dd76b61b879d534b36660 Mon Sep 17 00:00:00 2001 From: Fizic Date: Tue, 20 Dec 2022 16:53:11 +0300 Subject: [PATCH 01/14] feat: add new handler and errors --- mysql/errcode.go | 1 + mysql/errname.go | 1 + server/command.go | 25 +++++++++++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/mysql/errcode.go b/mysql/errcode.go index 1be5c44ab..a1abd51ed 100644 --- a/mysql/errcode.go +++ b/mysql/errcode.go @@ -867,4 +867,5 @@ const ( ER_MUST_CHANGE_PASSWORD_LOGIN = 1862 ER_ROW_IN_WRONG_PARTITION = 1863 ER_ERROR_LAST = 1863 + ER_INVALID_HANDLER = 1864 ) diff --git a/mysql/errname.go b/mysql/errname.go index 592f4bb5f..5d07ce18a 100644 --- a/mysql/errname.go +++ b/mysql/errname.go @@ -865,4 +865,5 @@ var MySQLErrName = map[uint16]string{ ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL: "cannot silently convert NULL values, as required in this SQL_MODE", ER_MUST_CHANGE_PASSWORD_LOGIN: "Your password has expired. To log in you must change it using a client that supports expired passwords.", ER_ROW_IN_WRONG_PARTITION: "Found a row in wrong partition %s", + ER_INVALID_HANDLER: "Invalid handler reference, use %s", } diff --git a/server/command.go b/server/command.go index c635f56e4..e1e6eff91 100644 --- a/server/command.go +++ b/server/command.go @@ -3,6 +3,7 @@ package server import ( "bytes" "fmt" + "github.com/go-mysql-org/go-mysql/replication" . "github.com/go-mysql-org/go-mysql/mysql" "github.com/siddontang/go/hack" @@ -30,6 +31,11 @@ type Handler interface { HandleOtherCommand(cmd byte, data []byte) error } +type ReplicationHandler interface { + // handle Replication command + HandleBinlogDump(position uint32, name string) (*replication.Event, error) +} + func (c *Conn) HandleCommand() error { if c.Conn == nil { return fmt.Errorf("connection closed") @@ -74,6 +80,21 @@ func (c *Conn) dispatch(data []byte) interface{} { } case COM_PING: return nil + case COM_BINLOG_DUMP: + position := uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16 | uint32(data[3])<<24 + // get server id + _ = uint32(data[4]) | uint32(data[5])<<8 | uint32(data[6])<<16 | uint32(data[7])<<24 + name := string(data[8:]) + if h, ok := c.h.(ReplicationHandler); ok { + if r, err := h.HandleBinlogDump(position, name); err != nil { + return err + } else { + return r + } + } else { + // TODO add normal error + return NewDefaultError(ER_UNKNOWN_ERROR, "not support binlog dump") + } case COM_INIT_DB: if err := c.h.UseDB(hack.String(data)); err != nil { return err @@ -146,6 +167,10 @@ func (h EmptyHandler) HandleQuery(query string) (*Result, error) { return nil, fmt.Errorf("not supported now") } +func (h EmptyHandler) HandleBinlogDump(position uint32, name string) (*replication.Event, error) { + return nil, fmt.Errorf("not supported now") +} + func (h EmptyHandler) HandleFieldList(table string, fieldWildcard string) ([]*Field, error) { return nil, fmt.Errorf("not supported now") } From e96001b52d7de1a2895f47f0f63c7365cb75cbf6 Mon Sep 17 00:00:00 2001 From: Fizic Date: Wed, 21 Dec 2022 17:40:21 +0300 Subject: [PATCH 02/14] fix: error codes, feat: method in BinlogStreamer, work with replication protocol in server package --- mysql/errcode.go | 1 - mysql/errname.go | 1 - replication/binlogstreamer.go | 11 +++++++- server/command.go | 51 ++++++++++++++++++++--------------- server/resp.go | 26 ++++++++++++++++++ 5 files changed, 66 insertions(+), 24 deletions(-) diff --git a/mysql/errcode.go b/mysql/errcode.go index a1abd51ed..1be5c44ab 100644 --- a/mysql/errcode.go +++ b/mysql/errcode.go @@ -867,5 +867,4 @@ const ( ER_MUST_CHANGE_PASSWORD_LOGIN = 1862 ER_ROW_IN_WRONG_PARTITION = 1863 ER_ERROR_LAST = 1863 - ER_INVALID_HANDLER = 1864 ) diff --git a/mysql/errname.go b/mysql/errname.go index 5d07ce18a..592f4bb5f 100644 --- a/mysql/errname.go +++ b/mysql/errname.go @@ -865,5 +865,4 @@ var MySQLErrName = map[uint16]string{ ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL: "cannot silently convert NULL values, as required in this SQL_MODE", ER_MUST_CHANGE_PASSWORD_LOGIN: "Your password has expired. To log in you must change it using a client that supports expired passwords.", ER_ROW_IN_WRONG_PARTITION: "Found a row in wrong partition %s", - ER_INVALID_HANDLER: "Invalid handler reference, use %s", } diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 56b8622a8..d6de19403 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -84,7 +84,7 @@ func (s *BinlogStreamer) closeWithError(err error) { } } -func newBinlogStreamer() *BinlogStreamer { +func NewBinlogStreamer() *BinlogStreamer { s := new(BinlogStreamer) s.ch = make(chan *BinlogEvent, 10240) @@ -92,3 +92,12 @@ func newBinlogStreamer() *BinlogStreamer { return s } + +func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { + select { + case s.ch <- ev: + return nil + case err := <-s.ech: + return err + } +} diff --git a/server/command.go b/server/command.go index e1e6eff91..136a57655 100644 --- a/server/command.go +++ b/server/command.go @@ -32,8 +32,9 @@ type Handler interface { } type ReplicationHandler interface { - // handle Replication command - HandleBinlogDump(position uint32, name string) (*replication.Event, error) + //handle Replication command + HandleBinlogDump(position uint32, name string, s *replication.BinlogStreamer) + HandleBinlogDumpGTID(gtidSet string, s *replication.BinlogStreamer) } func (c *Conn) HandleCommand() error { @@ -80,21 +81,6 @@ func (c *Conn) dispatch(data []byte) interface{} { } case COM_PING: return nil - case COM_BINLOG_DUMP: - position := uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16 | uint32(data[3])<<24 - // get server id - _ = uint32(data[4]) | uint32(data[5])<<8 | uint32(data[6])<<16 | uint32(data[7])<<24 - name := string(data[8:]) - if h, ok := c.h.(ReplicationHandler); ok { - if r, err := h.HandleBinlogDump(position, name); err != nil { - return err - } else { - return r - } - } else { - // TODO add normal error - return NewDefaultError(ER_UNKNOWN_ERROR, "not support binlog dump") - } case COM_INIT_DB: if err := c.h.UseDB(hack.String(data)); err != nil { return err @@ -152,6 +138,23 @@ func (c *Conn) dispatch(data []byte) interface{} { } return eofResponse{} + case COM_REGISTER_SLAVE: + return nil + case COM_BINLOG_DUMP: + position := uint32(data[1]) | uint32(data[2])<<8 | uint32(data[3])<<16 | uint32(data[4])<<24 + // get server id + _ = uint32(data[4]) | uint32(data[5])<<8 | uint32(data[6])<<16 | uint32(data[7])<<24 + name := string(data[9:]) + if h, ok := c.h.(ReplicationHandler); ok { + s := replication.NewBinlogStreamer() + go h.HandleBinlogDump(position, name, s) + return s + } else { + return c.h.HandleOtherCommand(cmd, data) + } + case COM_BINLOG_DUMP_GTID: + // TODO support GTID + return nil default: return c.h.HandleOtherCommand(cmd, data) } @@ -160,6 +163,10 @@ func (c *Conn) dispatch(data []byte) interface{} { type EmptyHandler struct { } +type EmptyReplicationHandler struct { + EmptyHandler +} + func (h EmptyHandler) UseDB(dbName string) error { return nil } @@ -167,10 +174,6 @@ func (h EmptyHandler) HandleQuery(query string) (*Result, error) { return nil, fmt.Errorf("not supported now") } -func (h EmptyHandler) HandleBinlogDump(position uint32, name string) (*replication.Event, error) { - return nil, fmt.Errorf("not supported now") -} - func (h EmptyHandler) HandleFieldList(table string, fieldWildcard string) ([]*Field, error) { return nil, fmt.Errorf("not supported now") } @@ -185,6 +188,12 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error { return nil } +func (h EmptyReplicationHandler) HandleBinlogDump(position uint32, name string, r *replication.BinlogStreamer) { +} + +func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet string, r *replication.BinlogStreamer) { +} + func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error { return NewError( ER_UNKNOWN_ERROR, diff --git a/server/resp.go b/server/resp.go index f7f395d8b..93f2f5e33 100644 --- a/server/resp.go +++ b/server/resp.go @@ -1,9 +1,12 @@ package server import ( + "context" "fmt" + "time" . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" ) func (c *Conn) writeOK(r *Result) error { @@ -197,6 +200,27 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error { return c.WritePacket(data) } +func (c *Conn) writeBinlogEvent(s *replication.BinlogStreamer) error { + for { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ev, err := s.GetEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + break + } + data := make([]byte, 4, 32) + data = append(data, OK_HEADER) + + data = append(data, ev.RawData...) + if err := c.WritePacket(data); err != nil { + return err + } + } + + return nil +} + type noResponse struct{} type eofResponse struct{} @@ -220,6 +244,8 @@ func (c *Conn) WriteValue(value interface{}) error { return c.writeFieldList(v, nil) case []FieldValue: return c.writeFieldValues(v) + case *replication.BinlogStreamer: + return c.writeBinlogEvent(v) case *Stmt: return c.writePrepare(v) default: From cae8dcf33fcaea8486bc893e2782edf4a5c21db2 Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 15:25:51 +0300 Subject: [PATCH 03/14] fix: Bug with use private method feat: all methods for replication protocol --- mysql/errcode.go | 1 + mysql/errname.go | 1 + replication/binlogsyncer.go | 2 +- server/command.go | 35 ++++++++++++++++++++++++----------- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/mysql/errcode.go b/mysql/errcode.go index 1be5c44ab..b5f5533d1 100644 --- a/mysql/errcode.go +++ b/mysql/errcode.go @@ -867,4 +867,5 @@ const ( ER_MUST_CHANGE_PASSWORD_LOGIN = 1862 ER_ROW_IN_WRONG_PARTITION = 1863 ER_ERROR_LAST = 1863 + ER_HANDLER_DOES_NOT_SUPPORT = 1864 ) diff --git a/mysql/errname.go b/mysql/errname.go index 592f4bb5f..998a3bf8b 100644 --- a/mysql/errname.go +++ b/mysql/errname.go @@ -865,4 +865,5 @@ var MySQLErrName = map[uint16]string{ ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL: "cannot silently convert NULL values, as required in this SQL_MODE", ER_MUST_CHANGE_PASSWORD_LOGIN: "Your password has expired. To log in you must change it using a client that supports expired passwords.", ER_ROW_IN_WRONG_PARTITION: "Found a row in wrong partition %s", + ER_HANDLER_DOES_NOT_SUPPORT: "The handler does not support %s, use %s instead.", } diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 7421e102b..113ffb419 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := newBinlogStreamer() + s := NewBinlogStreamer() b.wg.Add(1) go b.onStream(s) diff --git a/server/command.go b/server/command.go index 136a57655..ec5529b5a 100644 --- a/server/command.go +++ b/server/command.go @@ -33,8 +33,9 @@ type Handler interface { type ReplicationHandler interface { //handle Replication command - HandleBinlogDump(position uint32, name string, s *replication.BinlogStreamer) - HandleBinlogDumpGTID(gtidSet string, s *replication.BinlogStreamer) + HandleRegisterSlave(data []byte) error + HandleBinlogDump(data []byte, s *replication.BinlogStreamer) + HandleBinlogDumpGTID(data []byte, s *replication.BinlogStreamer) } func (c *Conn) HandleCommand() error { @@ -139,22 +140,30 @@ func (c *Conn) dispatch(data []byte) interface{} { return eofResponse{} case COM_REGISTER_SLAVE: - return nil + if h, ok := c.h.(ReplicationHandler); ok { + if err := h.HandleRegisterSlave(data); err != nil { + return err + } + return nil + } else { + return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") + } case COM_BINLOG_DUMP: - position := uint32(data[1]) | uint32(data[2])<<8 | uint32(data[3])<<16 | uint32(data[4])<<24 - // get server id - _ = uint32(data[4]) | uint32(data[5])<<8 | uint32(data[6])<<16 | uint32(data[7])<<24 - name := string(data[9:]) if h, ok := c.h.(ReplicationHandler); ok { s := replication.NewBinlogStreamer() - go h.HandleBinlogDump(position, name, s) + go h.HandleBinlogDump(data, s) return s } else { - return c.h.HandleOtherCommand(cmd, data) + return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") } case COM_BINLOG_DUMP_GTID: - // TODO support GTID - return nil + if h, ok := c.h.(ReplicationHandler); ok { + s := replication.NewBinlogStreamer() + go h.HandleBinlogDumpGTID(data, s) + return s + } else { + return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") + } default: return c.h.HandleOtherCommand(cmd, data) } @@ -188,6 +197,10 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error { return nil } +func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { + return nil +} + func (h EmptyReplicationHandler) HandleBinlogDump(position uint32, name string, r *replication.BinlogStreamer) { } From c6e73da9e3b1a857dd5108089282bc7a77b89329 Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 15:44:47 +0300 Subject: [PATCH 04/14] fix: errors --- server/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/command.go b/server/command.go index ec5529b5a..5397c162a 100644 --- a/server/command.go +++ b/server/command.go @@ -198,7 +198,7 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error { } func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { - return nil + return fmt.Errorf("not supported now") } func (h EmptyReplicationHandler) HandleBinlogDump(position uint32, name string, r *replication.BinlogStreamer) { From 132c325f5b75112a9841606492c769a39c579e23 Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 15:46:43 +0300 Subject: [PATCH 05/14] fix: name in write value command --- server/resp.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/resp.go b/server/resp.go index 93f2f5e33..5a2e87462 100644 --- a/server/resp.go +++ b/server/resp.go @@ -200,7 +200,7 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error { return c.WritePacket(data) } -func (c *Conn) writeBinlogEvent(s *replication.BinlogStreamer) error { +func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error { for { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ev, err := s.GetEvent(ctx) @@ -245,7 +245,7 @@ func (c *Conn) WriteValue(value interface{}) error { case []FieldValue: return c.writeFieldValues(v) case *replication.BinlogStreamer: - return c.writeBinlogEvent(v) + return c.writeBinlogEvents(v) case *Stmt: return c.writePrepare(v) default: From 5fdb922ac977c2355746b622c189162ed703396b Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 16:03:02 +0300 Subject: [PATCH 06/14] fix: delete error from mysql errors, change errors in dispatch --- mysql/errcode.go | 1 - mysql/errname.go | 1 - server/command.go | 6 +++--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/mysql/errcode.go b/mysql/errcode.go index b5f5533d1..1be5c44ab 100644 --- a/mysql/errcode.go +++ b/mysql/errcode.go @@ -867,5 +867,4 @@ const ( ER_MUST_CHANGE_PASSWORD_LOGIN = 1862 ER_ROW_IN_WRONG_PARTITION = 1863 ER_ERROR_LAST = 1863 - ER_HANDLER_DOES_NOT_SUPPORT = 1864 ) diff --git a/mysql/errname.go b/mysql/errname.go index 998a3bf8b..592f4bb5f 100644 --- a/mysql/errname.go +++ b/mysql/errname.go @@ -865,5 +865,4 @@ var MySQLErrName = map[uint16]string{ ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL: "cannot silently convert NULL values, as required in this SQL_MODE", ER_MUST_CHANGE_PASSWORD_LOGIN: "Your password has expired. To log in you must change it using a client that supports expired passwords.", ER_ROW_IN_WRONG_PARTITION: "Found a row in wrong partition %s", - ER_HANDLER_DOES_NOT_SUPPORT: "The handler does not support %s, use %s instead.", } diff --git a/server/command.go b/server/command.go index 5397c162a..7f4ff4c38 100644 --- a/server/command.go +++ b/server/command.go @@ -146,7 +146,7 @@ func (c *Conn) dispatch(data []byte) interface{} { } return nil } else { - return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") + return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") } case COM_BINLOG_DUMP: if h, ok := c.h.(ReplicationHandler); ok { @@ -154,7 +154,7 @@ func (c *Conn) dispatch(data []byte) interface{} { go h.HandleBinlogDump(data, s) return s } else { - return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") + return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") } case COM_BINLOG_DUMP_GTID: if h, ok := c.h.(ReplicationHandler); ok { @@ -162,7 +162,7 @@ func (c *Conn) dispatch(data []byte) interface{} { go h.HandleBinlogDumpGTID(data, s) return s } else { - return NewDefaultError(ER_HANDLER_DOES_NOT_SUPPORT, "replication protocol", "ReplicationHandler") + return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") } default: return c.h.HandleOtherCommand(cmd, data) From d8ec4a6dd4521dacb2c590c677fd1aace62d35af Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 16:10:23 +0300 Subject: [PATCH 07/14] fix: EmptyReplicationHandler methods --- server/command.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/command.go b/server/command.go index 7f4ff4c38..cafb5dc5f 100644 --- a/server/command.go +++ b/server/command.go @@ -201,10 +201,10 @@ func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { return fmt.Errorf("not supported now") } -func (h EmptyReplicationHandler) HandleBinlogDump(position uint32, name string, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDump(data []byte, r *replication.BinlogStreamer) { } -func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet string, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDumpGTID(data []byte, r *replication.BinlogStreamer) { } func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error { From 4e4b1b594907c3edfb31a2de8007ee19826baac0 Mon Sep 17 00:00:00 2001 From: Fizic Date: Thu, 22 Dec 2022 16:31:27 +0300 Subject: [PATCH 08/14] fix: import --- server/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/command.go b/server/command.go index cafb5dc5f..0a431b0f1 100644 --- a/server/command.go +++ b/server/command.go @@ -3,9 +3,9 @@ package server import ( "bytes" "fmt" - "github.com/go-mysql-org/go-mysql/replication" . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/siddontang/go/hack" ) From 125d938d9172f10ef4bd611c6598bf8896973306 Mon Sep 17 00:00:00 2001 From: Fizic Date: Tue, 27 Dec 2022 18:17:03 +0300 Subject: [PATCH 09/14] fix: Replication handlers methods --- replication/binlogstreamer.go | 6 ++++++ server/command.go | 20 ++++++++++++++------ server/replication.go | 20 ++++++++++++++++++++ server/resp.go | 4 +--- 4 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 server/replication.go diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index d6de19403..198e07a71 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -101,3 +101,9 @@ func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { return err } } + +func (s *BinlogStreamer) AddErrorToStreamer(err error) { + select { + case s.ech <- err: + } +} diff --git a/server/command.go b/server/command.go index 0a431b0f1..969dc6a3b 100644 --- a/server/command.go +++ b/server/command.go @@ -34,8 +34,8 @@ type Handler interface { type ReplicationHandler interface { //handle Replication command HandleRegisterSlave(data []byte) error - HandleBinlogDump(data []byte, s *replication.BinlogStreamer) - HandleBinlogDumpGTID(data []byte, s *replication.BinlogStreamer) + HandleBinlogDump(pos *Position, s *replication.BinlogStreamer) + HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, s *replication.BinlogStreamer) } func (c *Conn) HandleCommand() error { @@ -150,16 +150,24 @@ func (c *Conn) dispatch(data []byte) interface{} { } case COM_BINLOG_DUMP: if h, ok := c.h.(ReplicationHandler); ok { + pos, _ := parseBinlogDump(data) s := replication.NewBinlogStreamer() - go h.HandleBinlogDump(data, s) + go h.HandleBinlogDump(pos, s) + return s } else { return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") } case COM_BINLOG_DUMP_GTID: if h, ok := c.h.(ReplicationHandler); ok { + gtidSet, err := parseBinlogDumpGTID(data) + if err != nil { + return err + } + s := replication.NewBinlogStreamer() - go h.HandleBinlogDumpGTID(data, s) + go h.HandleBinlogDumpGTID(gtidSet, s) + return s } else { return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") @@ -201,10 +209,10 @@ func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { return fmt.Errorf("not supported now") } -func (h EmptyReplicationHandler) HandleBinlogDump(data []byte, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position, r *replication.BinlogStreamer) { } -func (h EmptyReplicationHandler) HandleBinlogDumpGTID(data []byte, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, r *replication.BinlogStreamer) { } func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error { diff --git a/server/replication.go b/server/replication.go new file mode 100644 index 000000000..b895ae72e --- /dev/null +++ b/server/replication.go @@ -0,0 +1,20 @@ +package server + +import ( + "encoding/binary" + "github.com/go-mysql-org/go-mysql/mysql" +) + +func parseBinlogDump(data []byte) (*mysql.Position, error) { + var p mysql.Position + p.Pos = binary.LittleEndian.Uint32(data[0:4]) + p.Name = string(data[10:]) + + return &p, nil +} + +func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) { + lenPosName := binary.LittleEndian.Uint32(data[11:15]) + + return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:]) +} diff --git a/server/resp.go b/server/resp.go index 5a2e87462..92ca68f5e 100644 --- a/server/resp.go +++ b/server/resp.go @@ -207,7 +207,7 @@ func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error { cancel() if err == context.DeadlineExceeded { - break + continue } data := make([]byte, 4, 32) data = append(data, OK_HEADER) @@ -217,8 +217,6 @@ func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error { return err } } - - return nil } type noResponse struct{} From d98f42c4041c851527a48c3655afcbf0b53a2206 Mon Sep 17 00:00:00 2001 From: Fizic Date: Wed, 28 Dec 2022 09:33:04 +0300 Subject: [PATCH 10/14] fix: golangci errors --- replication/binlogstreamer.go | 4 +--- server/replication.go | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 198e07a71..15aae7d33 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -103,7 +103,5 @@ func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { } func (s *BinlogStreamer) AddErrorToStreamer(err error) { - select { - case s.ech <- err: - } + s.ech <- err } diff --git a/server/replication.go b/server/replication.go index b895ae72e..b33d0dd88 100644 --- a/server/replication.go +++ b/server/replication.go @@ -2,6 +2,7 @@ package server import ( "encoding/binary" + "github.com/go-mysql-org/go-mysql/mysql" ) From 46a38cfb3a08ea8dc2e6de663670922c8d8c378e Mon Sep 17 00:00:00 2001 From: Fizic Date: Tue, 10 Jan 2023 15:22:07 +0500 Subject: [PATCH 11/14] fix: improved points in implementation, removed dangerous places that could lead to errors feat: comments explaining options, ReplicationHandler in tests --- replication/binlogstreamer.go | 19 ++++++++++++---- server/command.go | 42 +++++++++++++++++------------------ server/command_test.go | 1 + server/resp.go | 14 +++++------- 4 files changed, 42 insertions(+), 34 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 15aae7d33..cbe275ec9 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -9,8 +9,9 @@ import ( ) var ( - ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") - ErrSyncClosed = errors.New("Sync was closed") + ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") + ErrSyncClosed = errors.New("Sync was closed") + ErrStreamerIsFull = errors.New("streamer is full") ) // BinlogStreamer gets the streaming event. @@ -93,15 +94,25 @@ func NewBinlogStreamer() *BinlogStreamer { return s } +// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually. +// can be used in replication handlers func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { select { case s.ch <- ev: return nil case err := <-s.ech: return err + default: + return ErrStreamerIsFull } } -func (s *BinlogStreamer) AddErrorToStreamer(err error) { - s.ech <- err +// AddErrorToStreamer adds an error to the streamer. +func (s *BinlogStreamer) AddErrorToStreamer(err error) bool { + select { + case s.ech <- err: + return true + default: + return false + } } diff --git a/server/command.go b/server/command.go index 969dc6a3b..41b57087a 100644 --- a/server/command.go +++ b/server/command.go @@ -32,10 +32,10 @@ type Handler interface { } type ReplicationHandler interface { - //handle Replication command + // handle Replication command HandleRegisterSlave(data []byte) error - HandleBinlogDump(pos *Position, s *replication.BinlogStreamer) - HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, s *replication.BinlogStreamer) + HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error) + HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) } func (c *Conn) HandleCommand() error { @@ -141,22 +141,20 @@ func (c *Conn) dispatch(data []byte) interface{} { return eofResponse{} case COM_REGISTER_SLAVE: if h, ok := c.h.(ReplicationHandler); ok { - if err := h.HandleRegisterSlave(data); err != nil { - return err - } - return nil + return h.HandleRegisterSlave(data) } else { - return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") + return c.h.HandleOtherCommand(cmd, data) } case COM_BINLOG_DUMP: if h, ok := c.h.(ReplicationHandler); ok { pos, _ := parseBinlogDump(data) - s := replication.NewBinlogStreamer() - go h.HandleBinlogDump(pos, s) - - return s + if s, err := h.HandleBinlogDump(pos); err != nil { + return s + } else { + return err + } } else { - return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") + return c.h.HandleOtherCommand(cmd, data) } case COM_BINLOG_DUMP_GTID: if h, ok := c.h.(ReplicationHandler); ok { @@ -164,13 +162,13 @@ func (c *Conn) dispatch(data []byte) interface{} { if err != nil { return err } - - s := replication.NewBinlogStreamer() - go h.HandleBinlogDumpGTID(gtidSet, s) - - return s + if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil { + return s + } else { + return err + } } else { - return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead") + return c.h.HandleOtherCommand(cmd, data) } default: return c.h.HandleOtherCommand(cmd, data) @@ -209,10 +207,12 @@ func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { return fmt.Errorf("not supported now") } -func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error) { + return nil, fmt.Errorf("not supported now") } -func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, r *replication.BinlogStreamer) { +func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) { + return nil, fmt.Errorf("not supported now") } func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error { diff --git a/server/command_test.go b/server/command_test.go index 34b034e28..5fe0476e8 100644 --- a/server/command_test.go +++ b/server/command_test.go @@ -2,3 +2,4 @@ package server // Ensure EmptyHandler implements Handler interface or cause compile time error var _ Handler = EmptyHandler{} +var _ ReplicationHandler = EmptyReplicationHandler{} diff --git a/server/resp.go b/server/resp.go index 92ca68f5e..e13e6270b 100644 --- a/server/resp.go +++ b/server/resp.go @@ -3,8 +3,6 @@ package server import ( "context" "fmt" - "time" - . "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" ) @@ -200,16 +198,14 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error { return c.WritePacket(data) } +// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error { for { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - ev, err := s.GetEvent(ctx) - cancel() - - if err == context.DeadlineExceeded { - continue + ev, err := s.GetEvent(context.Background()) + if err != nil { + return err } - data := make([]byte, 4, 32) + data := make([]byte, 4, 4+len(ev.RawData)) data = append(data, OK_HEADER) data = append(data, ev.RawData...) From 09f1f26b5c6e45b19630c330c7dbc28c470d0925 Mon Sep 17 00:00:00 2001 From: Fizic Date: Tue, 10 Jan 2023 15:10:49 +0300 Subject: [PATCH 12/14] fix: returned values in dispatch --- server/command.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/command.go b/server/command.go index 41b57087a..8a9043df1 100644 --- a/server/command.go +++ b/server/command.go @@ -149,9 +149,9 @@ func (c *Conn) dispatch(data []byte) interface{} { if h, ok := c.h.(ReplicationHandler); ok { pos, _ := parseBinlogDump(data) if s, err := h.HandleBinlogDump(pos); err != nil { - return s - } else { return err + } else { + return s } } else { return c.h.HandleOtherCommand(cmd, data) @@ -163,9 +163,9 @@ func (c *Conn) dispatch(data []byte) interface{} { return err } if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil { - return s - } else { return err + } else { + return s } } else { return c.h.HandleOtherCommand(cmd, data) From 960c01aab6aa18d6e8a1af34569871fe789490ab Mon Sep 17 00:00:00 2001 From: Fizic Date: Tue, 10 Jan 2023 15:21:26 +0300 Subject: [PATCH 13/14] fix: external default in case, imports --- replication/binlogstreamer.go | 7 ++----- server/resp.go | 1 + 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index cbe275ec9..61254fba0 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -9,9 +9,8 @@ import ( ) var ( - ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") - ErrSyncClosed = errors.New("Sync was closed") - ErrStreamerIsFull = errors.New("streamer is full") + ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") + ErrSyncClosed = errors.New("Sync was closed") ) // BinlogStreamer gets the streaming event. @@ -102,8 +101,6 @@ func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { return nil case err := <-s.ech: return err - default: - return ErrStreamerIsFull } } diff --git a/server/resp.go b/server/resp.go index e13e6270b..acc033c26 100644 --- a/server/resp.go +++ b/server/resp.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + . "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" ) From 5f660c9c5759dfb06275f00c6731689469c30f96 Mon Sep 17 00:00:00 2001 From: Fizic Date: Wed, 11 Jan 2023 18:32:06 +0300 Subject: [PATCH 14/14] fix: parsing response --- server/command.go | 9 ++++++--- server/replication.go | 13 +++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/server/command.go b/server/command.go index 8a9043df1..78a0ea03d 100644 --- a/server/command.go +++ b/server/command.go @@ -34,7 +34,7 @@ type Handler interface { type ReplicationHandler interface { // handle Replication command HandleRegisterSlave(data []byte) error - HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error) + HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) } @@ -147,7 +147,10 @@ func (c *Conn) dispatch(data []byte) interface{} { } case COM_BINLOG_DUMP: if h, ok := c.h.(ReplicationHandler); ok { - pos, _ := parseBinlogDump(data) + pos, err := parseBinlogDump(data) + if err != nil { + return err + } if s, err := h.HandleBinlogDump(pos); err != nil { return err } else { @@ -207,7 +210,7 @@ func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { return fmt.Errorf("not supported now") } -func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error) { +func (h EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) { return nil, fmt.Errorf("not supported now") } diff --git a/server/replication.go b/server/replication.go index b33d0dd88..82d657c1b 100644 --- a/server/replication.go +++ b/server/replication.go @@ -6,16 +6,25 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" ) -func parseBinlogDump(data []byte) (*mysql.Position, error) { +func parseBinlogDump(data []byte) (mysql.Position, error) { + if len(data) < 10 { + return mysql.Position{}, mysql.ErrMalformPacket + } var p mysql.Position p.Pos = binary.LittleEndian.Uint32(data[0:4]) p.Name = string(data[10:]) - return &p, nil + return p, nil } func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) { + if len(data) < 15 { + return nil, mysql.ErrMalformPacket + } lenPosName := binary.LittleEndian.Uint32(data[11:15]) + if len(data) < 22+int(lenPosName) { + return nil, mysql.ErrMalformPacket + } return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:]) }