diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index 56b8622a8..61254fba0 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -84,7 +84,7 @@ func (s *BinlogStreamer) closeWithError(err error) { } } -func newBinlogStreamer() *BinlogStreamer { +func NewBinlogStreamer() *BinlogStreamer { s := new(BinlogStreamer) s.ch = make(chan *BinlogEvent, 10240) @@ -92,3 +92,24 @@ func newBinlogStreamer() *BinlogStreamer { return s } + +// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually. +// can be used in replication handlers +func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error { + select { + case s.ch <- ev: + return nil + case err := <-s.ech: + return err + } +} + +// AddErrorToStreamer adds an error to the streamer. +func (s *BinlogStreamer) AddErrorToStreamer(err error) bool { + select { + case s.ech <- err: + return true + default: + return false + } +} diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index e68e48222..9c88aa8b8 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error { func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - s := newBinlogStreamer() + s := NewBinlogStreamer() b.wg.Add(1) go b.onStream(s) diff --git a/server/command.go b/server/command.go index c635f56e4..78a0ea03d 100644 --- a/server/command.go +++ b/server/command.go @@ -5,6 +5,7 @@ import ( "fmt" . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/siddontang/go/hack" ) @@ -30,6 +31,13 @@ type Handler interface { HandleOtherCommand(cmd byte, data []byte) error } +type ReplicationHandler interface { + // handle Replication command + HandleRegisterSlave(data []byte) error + HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) + HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) +} + func (c *Conn) HandleCommand() error { if c.Conn == nil { return fmt.Errorf("connection closed") @@ -131,6 +139,40 @@ func (c *Conn) dispatch(data []byte) interface{} { } return eofResponse{} + case COM_REGISTER_SLAVE: + if h, ok := c.h.(ReplicationHandler); ok { + return h.HandleRegisterSlave(data) + } else { + return c.h.HandleOtherCommand(cmd, data) + } + case COM_BINLOG_DUMP: + if h, ok := c.h.(ReplicationHandler); ok { + pos, err := parseBinlogDump(data) + if err != nil { + return err + } + if s, err := h.HandleBinlogDump(pos); err != nil { + return err + } else { + return s + } + } else { + return c.h.HandleOtherCommand(cmd, data) + } + case COM_BINLOG_DUMP_GTID: + if h, ok := c.h.(ReplicationHandler); ok { + gtidSet, err := parseBinlogDumpGTID(data) + if err != nil { + return err + } + if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil { + return err + } else { + return s + } + } else { + return c.h.HandleOtherCommand(cmd, data) + } default: return c.h.HandleOtherCommand(cmd, data) } @@ -139,6 +181,10 @@ func (c *Conn) dispatch(data []byte) interface{} { type EmptyHandler struct { } +type EmptyReplicationHandler struct { + EmptyHandler +} + func (h EmptyHandler) UseDB(dbName string) error { return nil } @@ -160,6 +206,18 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error { return nil } +func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error { + return fmt.Errorf("not supported now") +} + +func (h EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) { + return nil, fmt.Errorf("not supported now") +} + +func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) { + return nil, fmt.Errorf("not supported now") +} + func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error { return NewError( ER_UNKNOWN_ERROR, diff --git a/server/command_test.go b/server/command_test.go index 34b034e28..5fe0476e8 100644 --- a/server/command_test.go +++ b/server/command_test.go @@ -2,3 +2,4 @@ package server // Ensure EmptyHandler implements Handler interface or cause compile time error var _ Handler = EmptyHandler{} +var _ ReplicationHandler = EmptyReplicationHandler{} diff --git a/server/replication.go b/server/replication.go new file mode 100644 index 000000000..82d657c1b --- /dev/null +++ b/server/replication.go @@ -0,0 +1,30 @@ +package server + +import ( + "encoding/binary" + + "github.com/go-mysql-org/go-mysql/mysql" +) + +func parseBinlogDump(data []byte) (mysql.Position, error) { + if len(data) < 10 { + return mysql.Position{}, mysql.ErrMalformPacket + } + var p mysql.Position + p.Pos = binary.LittleEndian.Uint32(data[0:4]) + p.Name = string(data[10:]) + + return p, nil +} + +func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) { + if len(data) < 15 { + return nil, mysql.ErrMalformPacket + } + lenPosName := binary.LittleEndian.Uint32(data[11:15]) + if len(data) < 22+int(lenPosName) { + return nil, mysql.ErrMalformPacket + } + + return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:]) +} diff --git a/server/resp.go b/server/resp.go index f7f395d8b..acc033c26 100644 --- a/server/resp.go +++ b/server/resp.go @@ -1,9 +1,11 @@ package server import ( + "context" "fmt" . "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" ) func (c *Conn) writeOK(r *Result) error { @@ -197,6 +199,23 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error { return c.WritePacket(data) } +// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html +func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error { + for { + ev, err := s.GetEvent(context.Background()) + if err != nil { + return err + } + data := make([]byte, 4, 4+len(ev.RawData)) + data = append(data, OK_HEADER) + + data = append(data, ev.RawData...) + if err := c.WritePacket(data); err != nil { + return err + } + } +} + type noResponse struct{} type eofResponse struct{} @@ -220,6 +239,8 @@ func (c *Conn) WriteValue(value interface{}) error { return c.writeFieldList(v, nil) case []FieldValue: return c.writeFieldValues(v) + case *replication.BinlogStreamer: + return c.writeBinlogEvents(v) case *Stmt: return c.writePrepare(v) default: