Skip to content

Commit

Permalink
Merge branch 'master' into atercattus/allow-to-run-tests-in-local-docker
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jan 16, 2023
2 parents 9e7980a + 3dc80ac commit bb0e767
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 2 deletions.
23 changes: 22 additions & 1 deletion replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,32 @@ func (s *BinlogStreamer) closeWithError(err error) {
}
}

func newBinlogStreamer() *BinlogStreamer {
func NewBinlogStreamer() *BinlogStreamer {
s := new(BinlogStreamer)

s.ch = make(chan *BinlogEvent, 10240)
s.ech = make(chan error, 4)

return s
}

// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually.
// can be used in replication handlers
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
select {
case s.ch <- ev:
return nil
case err := <-s.ech:
return err
}
}

// AddErrorToStreamer adds an error to the streamer.
func (s *BinlogStreamer) AddErrorToStreamer(err error) bool {
select {
case s.ech <- err:
return true
default:
return false
}
}
2 changes: 1 addition & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error {
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
b.running = true

s := newBinlogStreamer()
s := NewBinlogStreamer()

b.wg.Add(1)
go b.onStream(s)
Expand Down
58 changes: 58 additions & 0 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/siddontang/go/hack"
)

Expand All @@ -30,6 +31,13 @@ type Handler interface {
HandleOtherCommand(cmd byte, data []byte) error
}

type ReplicationHandler interface {
// handle Replication command
HandleRegisterSlave(data []byte) error
HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error)
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error)
}

func (c *Conn) HandleCommand() error {
if c.Conn == nil {
return fmt.Errorf("connection closed")
Expand Down Expand Up @@ -131,6 +139,40 @@ func (c *Conn) dispatch(data []byte) interface{} {
}

return eofResponse{}
case COM_REGISTER_SLAVE:
if h, ok := c.h.(ReplicationHandler); ok {
return h.HandleRegisterSlave(data)
} else {
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP:
if h, ok := c.h.(ReplicationHandler); ok {
pos, err := parseBinlogDump(data)
if err != nil {
return err
}
if s, err := h.HandleBinlogDump(pos); err != nil {
return err
} else {
return s
}
} else {
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP_GTID:
if h, ok := c.h.(ReplicationHandler); ok {
gtidSet, err := parseBinlogDumpGTID(data)
if err != nil {
return err
}
if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil {
return err
} else {
return s
}
} else {
return c.h.HandleOtherCommand(cmd, data)
}
default:
return c.h.HandleOtherCommand(cmd, data)
}
Expand All @@ -139,6 +181,10 @@ func (c *Conn) dispatch(data []byte) interface{} {
type EmptyHandler struct {
}

type EmptyReplicationHandler struct {
EmptyHandler
}

func (h EmptyHandler) UseDB(dbName string) error {
return nil
}
Expand All @@ -160,6 +206,18 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error {
return nil
}

func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
return fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}

func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
return NewError(
ER_UNKNOWN_ERROR,
Expand Down
1 change: 1 addition & 0 deletions server/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package server

// Ensure EmptyHandler implements Handler interface or cause compile time error
var _ Handler = EmptyHandler{}
var _ ReplicationHandler = EmptyReplicationHandler{}
30 changes: 30 additions & 0 deletions server/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import (
"encoding/binary"

"github.com/go-mysql-org/go-mysql/mysql"
)

func parseBinlogDump(data []byte) (mysql.Position, error) {
if len(data) < 10 {
return mysql.Position{}, mysql.ErrMalformPacket
}
var p mysql.Position
p.Pos = binary.LittleEndian.Uint32(data[0:4])
p.Name = string(data[10:])

return p, nil
}

func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) {
if len(data) < 15 {
return nil, mysql.ErrMalformPacket
}
lenPosName := binary.LittleEndian.Uint32(data[11:15])
if len(data) < 22+int(lenPosName) {
return nil, mysql.ErrMalformPacket
}

return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:])
}
21 changes: 21 additions & 0 deletions server/resp.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package server

import (
"context"
"fmt"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)

func (c *Conn) writeOK(r *Result) error {
Expand Down Expand Up @@ -197,6 +199,23 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error {
return c.WritePacket(data)
}

// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html
func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error {
for {
ev, err := s.GetEvent(context.Background())
if err != nil {
return err
}
data := make([]byte, 4, 4+len(ev.RawData))
data = append(data, OK_HEADER)

data = append(data, ev.RawData...)
if err := c.WritePacket(data); err != nil {
return err
}
}
}

type noResponse struct{}
type eofResponse struct{}

Expand All @@ -220,6 +239,8 @@ func (c *Conn) WriteValue(value interface{}) error {
return c.writeFieldList(v, nil)
case []FieldValue:
return c.writeFieldValues(v)
case *replication.BinlogStreamer:
return c.writeBinlogEvents(v)
case *Stmt:
return c.writePrepare(v)
default:
Expand Down

0 comments on commit bb0e767

Please sign in to comment.