From bec8c484bf4211dbd7fe0a1a25f46a34e5dcfc18 Mon Sep 17 00:00:00 2001 From: doraemon Date: Thu, 1 Feb 2024 19:18:45 +0800 Subject: [PATCH 1/7] Refactor StartBackup to Support Custom Backup Handlers --- replication/backup.go | 50 ++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index d75ba365f..f152764b9 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -13,7 +13,35 @@ import ( // StartBackup: Like mysqlbinlog remote raw backup // Backup remote binlog from position (filename, offset) and write in backupDir -func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error { +func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error { + return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }) +} + +// StartBackup initiates a backup process for binlog events starting from a specified position. +// It continuously fetches binlog events and writes them to files managed by a provided handler function. +// The backup process can be controlled with a timeout duration, after which the backup will stop if no new events are received. +// +// Parameters: +// - p Position: The starting position in the binlog from which to begin the backup. +// - timeout time.Duration: The maximum duration to wait for new binlog events before stopping the backup process. +// If set to 0, a default very long timeout (30 days) is used instead. +// - handler func(filename string) (io.WriteCloser, error): A function provided by the caller to handle file creation and writing. +// This function is expected to return an io.WriteCloser for the specified filename, which will be used to write binlog events. +// +// The function first checks if a timeout is specified, setting a default if not. It then enables raw mode parsing for binlog events +// to ensure that events are not parsed but passed as raw data for backup. It starts syncing binlog events from the specified position +// and enters a loop to continuously fetch and write events. +// +// For each event, it checks the event type. If it's a ROTATE_EVENT, it updates the filename to the next log file as indicated by the event. +// If it's a FORMAT_DESCRIPTION_EVENT, it signifies the start of a new binlog file, and the function closes the current file (if open) and opens +// a new one using the handler function. It also writes the binlog file header to the new file. +// +// The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout), +// write errors, or short writes. +func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, + handler func(filename string) (io.WriteCloser, error)) error { if timeout == 0 { // a very long timeout here timeout = 30 * 3600 * 24 * time.Second @@ -22,10 +50,6 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du // Force use raw mode b.parser.SetRawMode(true) - if err := os.MkdirAll(backupDir, 0755); err != nil { - return errors.Trace(err) - } - s, err := b.StartSync(p) if err != nil { return errors.Trace(err) @@ -34,10 +58,10 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du var filename string var offset uint32 - var f *os.File + var w io.WriteCloser defer func() { - if f != nil { - f.Close() + if w != nil { + w.Close() } }() @@ -67,26 +91,26 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new - if f != nil { - f.Close() + if w != nil { + _ = w.Close() } if len(filename) == 0 { return errors.Errorf("empty binlog filename for FormateDescriptionEvent") } - f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + w, err = handler(filename) if err != nil { return errors.Trace(err) } // write binlog header fe'bin' - if _, err = f.Write(BinLogFileHeader); err != nil { + if _, err = w.Write(BinLogFileHeader); err != nil { return errors.Trace(err) } } - if n, err := f.Write(e.RawData); err != nil { + if n, err := w.Write(e.RawData); err != nil { return errors.Trace(err) } else if n != len(e.RawData) { return errors.Trace(io.ErrShortWrite) From 9df67b16fc1e7dd12ee26e9643a3b9fbc9dbd6f6 Mon Sep 17 00:00:00 2001 From: doraemon Date: Thu, 1 Feb 2024 22:17:49 +0800 Subject: [PATCH 2/7] fix: TestStartBackupEndInGivenTime test failure --- cmd/go-mysqlbinlog/main.go | 2 +- replication/backup.go | 4 ++++ replication/backup_test.go | 2 +- replication/replication_test.go | 2 +- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/go-mysqlbinlog/main.go b/cmd/go-mysqlbinlog/main.go index eb97ace9c..af60eef6b 100644 --- a/cmd/go-mysqlbinlog/main.go +++ b/cmd/go-mysqlbinlog/main.go @@ -54,7 +54,7 @@ func main() { pos := mysql.Position{Name: *file, Pos: uint32(*pos)} if len(*backupPath) > 0 { // Backup will always use RawMode. - err := b.StartBackup(*backupPath, pos, 0) + err := b.StartBackupToFile(*backupPath, pos, 0) if err != nil { fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err)) return diff --git a/replication/backup.go b/replication/backup.go index f152764b9..50c77dbb4 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -15,6 +15,10 @@ import ( // Backup remote binlog from position (filename, offset) and write in backupDir func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error { return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) { + err := os.MkdirAll(backupDir, 0755) + if err != nil { + return nil, errors.Trace(err) + } return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }) } diff --git a/replication/backup_test.go b/replication/backup_test.go index b4e1e98b6..cc088f49f 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -28,7 +28,7 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { done := make(chan bool) go func() { - err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) + err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) require.NoError(t.T(), err) done <- true }() diff --git a/replication/replication_test.go b/replication/replication_test.go index 8b70f9e44..868a5c2da 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -426,7 +426,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() { os.RemoveAll(binlogDir) - err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second) + err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second) require.NoError(t.T(), err) p := NewBinlogParser() From 898e7deb055528a0f5ba52e57fa643fe02699006 Mon Sep 17 00:00:00 2001 From: doraemon Date: Thu, 1 Feb 2024 22:24:03 +0800 Subject: [PATCH 3/7] Refactor StartBackupToFile to create backup directory before initiating backup --- replication/backup.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 50c77dbb4..83675e579 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -14,11 +14,11 @@ import ( // StartBackup: Like mysqlbinlog remote raw backup // Backup remote binlog from position (filename, offset) and write in backupDir func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error { + err := os.MkdirAll(backupDir, 0755) + if err != nil { + return errors.Trace(err) + } return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) { - err := os.MkdirAll(backupDir, 0755) - if err != nil { - return nil, errors.Trace(err) - } return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }) } From 474d010070cb8d3eeaff578fe4f0ba1fb23be8eb Mon Sep 17 00:00:00 2001 From: doraemon Date: Fri, 2 Feb 2024 17:00:00 +0800 Subject: [PATCH 4/7] fix: Add error handling for w.Close() --- replication/backup.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 83675e579..3155497f3 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -45,7 +45,7 @@ func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout t // The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout), // write errors, or short writes. func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, - handler func(filename string) (io.WriteCloser, error)) error { + handler func(filename string) (io.WriteCloser, error)) (retErr error) { if timeout == 0 { // a very long timeout here timeout = 30 * 3600 * 24 * time.Second @@ -64,8 +64,12 @@ func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, var w io.WriteCloser defer func() { + var closeErr error if w != nil { - w.Close() + closeErr = w.Close() + } + if retErr == nil { + retErr = closeErr } }() @@ -96,7 +100,10 @@ func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new if w != nil { - _ = w.Close() + if err = w.Close(); err != nil { + w = nil + return errors.Trace(err) + } } if len(filename) == 0 { From 9359b98c60fcad91e63d4b37c96745fc22096446 Mon Sep 17 00:00:00 2001 From: doraemon Date: Fri, 2 Feb 2024 17:05:42 +0800 Subject: [PATCH 5/7] =?UTF-8?q?style=EF=BC=9ARename=20filename=20to=20binl?= =?UTF-8?q?ogFilename=20for=20clarity=20in=20StartBackup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- replication/backup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication/backup.go b/replication/backup.go index 3155497f3..5ef7ebe67 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -45,7 +45,7 @@ func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout t // The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout), // write errors, or short writes. func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, - handler func(filename string) (io.WriteCloser, error)) (retErr error) { + handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) { if timeout == 0 { // a very long timeout here timeout = 30 * 3600 * 24 * time.Second From cb75639ce1094d2f2e46a20568c2f436507f93b3 Mon Sep 17 00:00:00 2001 From: doraemon Date: Fri, 2 Feb 2024 17:19:15 +0800 Subject: [PATCH 6/7] docs: Simplify the comments of the StartBackup method --- replication/backup.go | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index 5ef7ebe67..98aba92b2 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -23,27 +23,14 @@ func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout t }) } -// StartBackup initiates a backup process for binlog events starting from a specified position. -// It continuously fetches binlog events and writes them to files managed by a provided handler function. -// The backup process can be controlled with a timeout duration, after which the backup will stop if no new events are received. +// StartBackup starts the backup process for the binary log using the specified position and handler. +// The process will continue until the timeout is reached or an error occurs. // // Parameters: -// - p Position: The starting position in the binlog from which to begin the backup. -// - timeout time.Duration: The maximum duration to wait for new binlog events before stopping the backup process. +// - p: The starting position in the binlog from which to begin the backup. +// - timeout: The maximum duration to wait for new binlog events before stopping the backup process. // If set to 0, a default very long timeout (30 days) is used instead. -// - handler func(filename string) (io.WriteCloser, error): A function provided by the caller to handle file creation and writing. -// This function is expected to return an io.WriteCloser for the specified filename, which will be used to write binlog events. -// -// The function first checks if a timeout is specified, setting a default if not. It then enables raw mode parsing for binlog events -// to ensure that events are not parsed but passed as raw data for backup. It starts syncing binlog events from the specified position -// and enters a loop to continuously fetch and write events. -// -// For each event, it checks the event type. If it's a ROTATE_EVENT, it updates the filename to the next log file as indicated by the event. -// If it's a FORMAT_DESCRIPTION_EVENT, it signifies the start of a new binlog file, and the function closes the current file (if open) and opens -// a new one using the handler function. It also writes the binlog file header to the new file. -// -// The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout), -// write errors, or short writes. +// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to. func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) { if timeout == 0 { From 785243b3b7c526352189533f08538694bbc43e3a Mon Sep 17 00:00:00 2001 From: doraemon Date: Sun, 4 Feb 2024 11:21:07 +0800 Subject: [PATCH 7/7] refactor: Rename the StartBackup method to StartBackupWithHandler so that the StartBackup method retains its original behavior --- cmd/go-mysqlbinlog/main.go | 2 +- replication/backup.go | 8 ++++---- replication/backup_test.go | 2 +- replication/replication_test.go | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/go-mysqlbinlog/main.go b/cmd/go-mysqlbinlog/main.go index af60eef6b..eb97ace9c 100644 --- a/cmd/go-mysqlbinlog/main.go +++ b/cmd/go-mysqlbinlog/main.go @@ -54,7 +54,7 @@ func main() { pos := mysql.Position{Name: *file, Pos: uint32(*pos)} if len(*backupPath) > 0 { // Backup will always use RawMode. - err := b.StartBackupToFile(*backupPath, pos, 0) + err := b.StartBackup(*backupPath, pos, 0) if err != nil { fmt.Printf("Start backup error: %v\n", errors.ErrorStack(err)) return diff --git a/replication/backup.go b/replication/backup.go index 98aba92b2..86265ae70 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -13,17 +13,17 @@ import ( // StartBackup: Like mysqlbinlog remote raw backup // Backup remote binlog from position (filename, offset) and write in backupDir -func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error { +func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error { err := os.MkdirAll(backupDir, 0755) if err != nil { return errors.Trace(err) } - return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) { + return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) { return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) }) } -// StartBackup starts the backup process for the binary log using the specified position and handler. +// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler. // The process will continue until the timeout is reached or an error occurs. // // Parameters: @@ -31,7 +31,7 @@ func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout t // - timeout: The maximum duration to wait for new binlog events before stopping the backup process. // If set to 0, a default very long timeout (30 days) is used instead. // - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to. -func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, +func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) { if timeout == 0 { // a very long timeout here diff --git a/replication/backup_test.go b/replication/backup_test.go index cc088f49f..b4e1e98b6 100644 --- a/replication/backup_test.go +++ b/replication/backup_test.go @@ -28,7 +28,7 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() { done := make(chan bool) go func() { - err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) + err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, timeout) require.NoError(t.T(), err) done <- true }() diff --git a/replication/replication_test.go b/replication/replication_test.go index 868a5c2da..8b70f9e44 100644 --- a/replication/replication_test.go +++ b/replication/replication_test.go @@ -426,7 +426,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec() { os.RemoveAll(binlogDir) - err := t.b.StartBackupToFile(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second) + err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second) require.NoError(t.T(), err) p := NewBinlogParser()