Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StartBackup to Support Custom Backup Handlers #849

Merged
merged 7 commits into from
Feb 4, 2024
2 changes: 1 addition & 1 deletion cmd/go-mysqlbinlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
pos := mysql.Position{Name: *file, Pos: uint32(*pos)}
if len(*backupPath) > 0 {
// Backup will always use RawMode.
err := b.StartBackup(*backupPath, pos, 0)
err := b.StartBackupToFile(*backupPath, pos, 0)
if err != nil {
fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err))
return
Expand Down
54 changes: 41 additions & 13 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,39 @@ import (

// StartBackup: Like mysqlbinlog remote raw backup
// Backup remote binlog from position (filename, offset) and write in backupDir
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error {
err := os.MkdirAll(backupDir, 0755)
if err != nil {
return errors.Trace(err)
}
return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
}

// StartBackup initiates a backup process for binlog events starting from a specified position.
// It continuously fetches binlog events and writes them to files managed by a provided handler function.
// The backup process can be controlled with a timeout duration, after which the backup will stop if no new events are received.
//
// Parameters:
// - p Position: The starting position in the binlog from which to begin the backup.
// - timeout time.Duration: The maximum duration to wait for new binlog events before stopping the backup process.
// If set to 0, a default very long timeout (30 days) is used instead.
// - handler func(filename string) (io.WriteCloser, error): A function provided by the caller to handle file creation and writing.
// This function is expected to return an io.WriteCloser for the specified filename, which will be used to write binlog events.
//
// The function first checks if a timeout is specified, setting a default if not. It then enables raw mode parsing for binlog events
doraemonkeys marked this conversation as resolved.
Show resolved Hide resolved
// to ensure that events are not parsed but passed as raw data for backup. It starts syncing binlog events from the specified position
// and enters a loop to continuously fetch and write events.
//
// For each event, it checks the event type. If it's a ROTATE_EVENT, it updates the filename to the next log file as indicated by the event.
// If it's a FORMAT_DESCRIPTION_EVENT, it signifies the start of a new binlog file, and the function closes the current file (if open) and opens
// a new one using the handler function. It also writes the binlog file header to the new file.
//
// The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout),
// write errors, or short writes.
func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration,
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
handler func(filename string) (io.WriteCloser, error)) error {
doraemonkeys marked this conversation as resolved.
Show resolved Hide resolved
if timeout == 0 {
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
Expand All @@ -22,10 +54,6 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
// Force use raw mode
b.parser.SetRawMode(true)

if err := os.MkdirAll(backupDir, 0755); err != nil {
return errors.Trace(err)
}

s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
Expand All @@ -34,10 +62,10 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
var filename string
var offset uint32

var f *os.File
var w io.WriteCloser
defer func() {
if f != nil {
f.Close()
if w != nil {
w.Close()
}
}()

Expand Down Expand Up @@ -67,26 +95,26 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if f != nil {
f.Close()
if w != nil {
_ = w.Close()
doraemonkeys marked this conversation as resolved.
Show resolved Hide resolved
}

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}

f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}

// write binlog header fe'bin'
if _, err = f.Write(BinLogFileHeader); err != nil {
if _, err = w.Write(BinLogFileHeader); err != nil {
return errors.Trace(err)
}
}

if n, err := f.Write(e.RawData); err != nil {
if n, err := w.Write(e.RawData); err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
Expand Down
2 changes: 1 addition & 1 deletion replication/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
done := make(chan bool)

go func() {
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout)
require.NoError(t.T(), err)
done <- true
}()
Expand Down
2 changes: 1 addition & 1 deletion replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() {

os.RemoveAll(binlogDir)

err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second)
err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second)
require.NoError(t.T(), err)

p := NewBinlogParser()
Expand Down
Loading