From 67ce8be39620849af90aa8b084138c9d69de6ee9 Mon Sep 17 00:00:00 2001
From: satoru
Date: Tue, 14 Jan 2020 18:06:47 +0800
Subject: [PATCH] drainer: Fix #724, Enable drainer to purge old incremental backup data on disk

---
 cmd/drainer/drainer.toml         |  1 +
 drainer/relay/relayer.go         |  2 +-
 drainer/sync/pb.go               | 36 +++++++++++++++++++---
 drainer/sync/syncer_test.go      | 13 ++++----
 drainer/sync/util.go             | 11 ++++---
 drainer/syncer.go                |  2 +-
 pkg/binlogfile/binlogger.go      | 57 ++++++++++++++++++++++----------
 pkg/binlogfile/binlogger_test.go | 55 ++++++++++++++++++++++--------
 8 files changed, 131 insertions(+), 46 deletions(-)

diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml
index fdd47c57b..f2eb3f5aa 100644
--- a/cmd/drainer/drainer.toml
+++ b/cmd/drainer/drainer.toml
@@ -126,6 +126,7 @@ port = 3306
 # kafka-version = "0.8.2.0"
 # kafka-max-messages = 1024
 #
+# retention-time = 7  # number of days to keep incremental backup files; a value <= 0 (the default) disables purging
 #
 # the topic name drainer will push msg, the default name is _obinlog
 # be careful don't use the same name if run multi drainer instances
diff --git a/drainer/relay/relayer.go b/drainer/relay/relayer.go
index 613eff57d..70091659f 100644
--- a/drainer/relay/relayer.go
+++ b/drainer/relay/relayer.go
@@ -79,7 +79,7 @@ func (r *relayer) WriteBinlog(schema string, table string, tiBinlog *tb.Binlog,
 func (r *relayer) GCBinlog(pos tb.Pos) {
 	// If the file suffix increases, it means previous files are useless.
 	if pos.Suffix > r.nextGCFileSuffix {
-		r.binlogger.GC(0, pos)
+		r.binlogger.GCByPos(pos)
 		r.nextGCFileSuffix = pos.Suffix
 	}
 }
diff --git a/drainer/sync/pb.go b/drainer/sync/pb.go
index d437c9099..781c930c1 100644
--- a/drainer/sync/pb.go
+++ b/drainer/sync/pb.go
@@ -14,7 +14,11 @@ package sync
 
 import (
+	"context"
+	"time"
+
 	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb-binlog/drainer/translator"
 	"github.com/pingcap/tidb-binlog/pkg/binlogfile"
 	pb "github.com/pingcap/tidb-binlog/proto/binlog"
 )
@@ -24,21 +28,45 @@ import (
 var _ Syncer = &pbSyncer{}
 
 type pbSyncer struct {
-	binlogger binlogfile.Binlogger
 	*baseSyncer
+
+	binlogger binlogfile.Binlogger
+	cancel    func()
 }
 
 // NewPBSyncer sync binlog to files
-func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
-	binlogger, err := binlogfile.OpenBinlogger(dir, binlogfile.SegmentSizeBytes)
+func NewPBSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
+	binlogger, err := binlogfile.OpenBinlogger(cfg.BinlogFileDir, binlogfile.SegmentSizeBytes)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
+	ctx, cancel := context.WithCancel(context.TODO())
+
 	s := &pbSyncer{
 		binlogger:  binlogger,
 		baseSyncer: newBaseSyncer(tableInfoGetter),
+		cancel:     cancel,
+	}
+
+	if cfg.BinlogFileRetentionTime > 0 {
+		// TODO: Add support for human readable format input of times like "7d", "12h"
+		retentionTime := time.Duration(cfg.BinlogFileRetentionTime) * 24 * time.Hour
+		ticker := time.NewTicker(time.Hour)
+		go func() {
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					log.Info("Binlog GC loop stopped")
+					return
+				case <-ticker.C:
+					log.Info("Trying to GC binlog files")
+					binlogger.GCByTime(retentionTime)
+				}
+			}
+		}()
 	}
 
 	return s, nil
 }
@@ -71,6 +99,8 @@ func (p *pbSyncer) saveBinlog(binlog *pb.Binlog) error {
 }
 
 func (p *pbSyncer) Close() error {
+	p.cancel()
+
 	err := p.binlogger.Close()
 	p.setErr(err)
 	close(p.success)
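Note on the retention loop above: the TODO asks for human-readable retention values, and time.ParseDuration alone cannot provide them because it has no day unit. Below is a minimal sketch of what that parsing could look like; the helper name parseRetention is hypothetical and not part of this patch.

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseRetention accepts "7" or "7d" (interpreted as days) as well as
// anything time.ParseDuration already understands, such as "12h" or "90m".
func parseRetention(s string) (time.Duration, error) {
	s = strings.TrimSpace(s)
	if days, err := strconv.Atoi(s); err == nil {
		// A bare integer keeps the current config semantics: days.
		return time.Duration(days) * 24 * time.Hour, nil
	}
	if strings.HasSuffix(s, "d") {
		days, err := strconv.Atoi(strings.TrimSuffix(s, "d"))
		if err != nil {
			return 0, fmt.Errorf("invalid day count %q: %v", s, err)
		}
		return time.Duration(days) * 24 * time.Hour, nil
	}
	// Fall back to the standard parser for "12h", "90m" and friends.
	return time.ParseDuration(s)
}

func main() {
	for _, in := range []string{"7", "7d", "12h"} {
		d, err := parseRetention(in)
		fmt.Println(in, d, err)
	}
}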
diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go
index e78b4c14d..14fd15b51 100644
--- a/drainer/sync/syncer_test.go
+++ b/drainer/sync/syncer_test.go
@@ -42,15 +42,16 @@ type syncerSuite struct {
 func (s *syncerSuite) SetUpTest(c *check.C) {
 	var infoGetter translator.TableInfoGetter
 	cfg := &DBConfig{
-		Host:         "localhost",
-		User:         "root",
-		Password:     "",
-		Port:         3306,
-		KafkaVersion: "0.8.2.0",
+		Host:          "localhost",
+		User:          "root",
+		Password:      "",
+		Port:          3306,
+		KafkaVersion:  "0.8.2.0",
+		BinlogFileDir: c.MkDir(),
 	}
 
 	// create pb syncer
-	pb, err := NewPBSyncer(c.MkDir(), infoGetter)
+	pb, err := NewPBSyncer(cfg, infoGetter)
 	c.Assert(err, check.IsNil)
 	s.syncers = append(s.syncers, pb)
 
diff --git a/drainer/sync/util.go b/drainer/sync/util.go
index cfbbc233f..f569c433c 100644
--- a/drainer/sync/util.go
+++ b/drainer/sync/util.go
@@ -24,11 +24,12 @@ type DBConfig struct {
 	User     string `toml:"user" json:"user"`
 	Password string `toml:"password" json:"password"`
 	// if EncryptedPassword is not empty, Password will be ignore.
-	EncryptedPassword string           `toml:"encrypted_password" json:"encrypted_password"`
-	SyncMode          int              `toml:"sync-mode" json:"sync-mode"`
-	Port              int              `toml:"port" json:"port"`
-	Checkpoint        CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
-	BinlogFileDir     string           `toml:"dir" json:"dir"`
+	EncryptedPassword       string           `toml:"encrypted_password" json:"encrypted_password"`
+	SyncMode                int              `toml:"sync-mode" json:"sync-mode"`
+	Port                    int              `toml:"port" json:"port"`
+	Checkpoint              CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
+	BinlogFileDir           string           `toml:"dir" json:"dir"`
+	BinlogFileRetentionTime int              `toml:"retention-time" json:"retention-time"`
 
 	ZKAddrs    string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
 	KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
diff --git a/drainer/syncer.go b/drainer/syncer.go
index 9c7fda309..cc261ba53 100644
--- a/drainer/syncer.go
+++ b/drainer/syncer.go
@@ -95,7 +95,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err
 			return nil, errors.Annotate(err, "fail to create kafka dsyncer")
 		}
 	case "file":
-		dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema)
+		dsyncer, err = dsync.NewPBSyncer(cfg.To, schema)
 		if err != nil {
 			return nil, errors.Annotate(err, "fail to create pb dsyncer")
 		}
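For clarity, this is how the new retention-time key travels from drainer.toml into the syncer: the TOML decoder fills BinlogFileRetentionTime through its toml:"retention-time" tag, and NewPBSyncer turns the day count into a time.Duration. A self-contained sketch, assuming the config is decoded with github.com/BurntSushi/toml (used here purely for illustration); fileSyncerConfig is a trimmed-down, hypothetical stand-in for DBConfig.

package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

// fileSyncerConfig keeps only the two DBConfig fields relevant to the
// file sink: the output directory and the retention in days.
type fileSyncerConfig struct {
	BinlogFileDir           string `toml:"dir"`
	BinlogFileRetentionTime int    `toml:"retention-time"`
}

func main() {
	const doc = `
dir = "/data/drainer/binlog"
retention-time = 7
`
	var cfg fileSyncerConfig
	if _, err := toml.Decode(doc, &cfg); err != nil {
		panic(err)
	}

	// Mirrors the gate in NewPBSyncer: a non-positive value disables GC.
	if cfg.BinlogFileRetentionTime > 0 {
		retention := time.Duration(cfg.BinlogFileRetentionTime) * 24 * time.Hour
		fmt.Printf("will purge files in %s older than %v\n", cfg.BinlogFileDir, retention)
	} else {
		fmt.Println("retention disabled, keeping all binlog files")
	}
}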
diff --git a/pkg/binlogfile/binlogger.go b/pkg/binlogfile/binlogger.go
index b6bfef34d..70399fcf3 100644
--- a/pkg/binlogfile/binlogger.go
+++ b/pkg/binlogfile/binlogger.go
@@ -63,8 +63,11 @@ type Binlogger interface {
 	// close the binlogger
 	Close() error
 
-	// GC recycles the old binlog file
-	GC(days time.Duration, pos binlog.Pos)
+	// GCByTime removes all files older than the specified retention time; the latest file is always kept
+	GCByTime(retentionTime time.Duration)
+
+	// GCByPos removes all files before the specified position; the latest file is always kept
+	GCByPos(pos binlog.Pos)
 }
 
 // binlogger is a logical representation of the log storage
@@ -348,8 +351,38 @@ func (b *binlogger) Walk(ctx context.Context, from binlog.Pos, sendBinlog func(e
 	return nil
 }
 
-// GC recycles the old binlog file
-func (b *binlogger) GC(days time.Duration, pos binlog.Pos) {
+// GCByPos removes all files before the specified position; the latest file is always kept
+func (b *binlogger) GCByPos(pos binlog.Pos) {
+	names, err := ReadBinlogNames(b.dir)
+	if err != nil {
+		log.Error("read binlog files failed", zap.Error(err))
+		return
+	}
+
+	if len(names) == 0 {
+		return
+	}
+
+	// skip the latest binlog file
+	for _, name := range names[:len(names)-1] {
+		curSuffix, _, err := ParseBinlogName(name)
+		if err != nil {
+			log.Error("parse binlog failed", zap.Error(err))
+			continue
+		}
+		if curSuffix < pos.Suffix {
+			fileName := path.Join(b.dir, name)
+			if err := os.Remove(fileName); err != nil {
+				log.Error("remove old binlog file err", zap.Error(err), zap.String("file name", fileName))
+				continue
+			}
+			log.Info("GC binlog file", zap.String("file name", fileName))
+		}
+	}
+}
+
+// GCByTime removes all files older than the specified retention time; the latest file is always kept
+func (b *binlogger) GCByTime(retentionTime time.Duration) {
 	names, err := ReadBinlogNames(b.dir)
 	if err != nil {
 		log.Error("read binlog files failed", zap.Error(err))
@@ -369,22 +402,12 @@
 			continue
 		}
 
-		curSuffix, _, err := ParseBinlogName(name)
-		if err != nil {
-			log.Error("parse binlog failed", zap.Error(err))
-		}
-
-		if curSuffix < pos.Suffix {
-			err := os.Remove(fileName)
-			if err != nil {
-				log.Error("remove old binlog file err")
+		if time.Since(fi.ModTime()) > retentionTime {
+			if err := os.Remove(fileName); err != nil {
+				log.Error("remove old binlog file err", zap.Error(err), zap.String("file name", fileName))
 				continue
 			}
 			log.Info("GC binlog file", zap.String("file name", fileName))
-		} else if time.Since(fi.ModTime()) > days {
-			log.Warn(
-				"binlog file is old enough to be garbage collected, but the position is behind the safe point",
-				zap.String("name", fileName), zap.Stringer("position", &pos))
 		}
 	}
 }
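The two collectors share one invariant: the newest segment is never deleted, since it may still be written to. To make the time-based walk easy to study outside the repo, here is a self-contained sketch of the same logic using only the standard library; unlike the real GCByTime it does not filter names through ReadBinlogNames, and the file names below are illustrative.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"
)

// gcByTime lists files in ascending name order, skips the newest one,
// and removes the rest once their mtime is older than the retention.
func gcByTime(dir string, retention time.Duration) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	names := make([]string, 0, len(entries))
	for _, e := range entries {
		if !e.IsDir() {
			names = append(names, e.Name())
		}
	}
	sort.Strings(names) // zero-padded suffixes sort in numeric order
	if len(names) == 0 {
		return nil
	}

	for _, name := range names[:len(names)-1] { // always keep the latest file
		p := filepath.Join(dir, name)
		fi, err := os.Stat(p)
		if err != nil {
			continue // mirror the real code: skip files we cannot stat
		}
		if time.Since(fi.ModTime()) > retention {
			if err := os.Remove(p); err != nil {
				continue
			}
			fmt.Println("removed", p)
		}
	}
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "binlog-gc-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Two fake segments; with a zero retention the older one is removed
	// and the latest survives.
	for _, name := range []string{"binlog-0000000000000000", "binlog-0000000000000001"} {
		if err := os.WriteFile(filepath.Join(dir, name), nil, 0o644); err != nil {
			panic(err)
		}
	}
	if err := gcByTime(dir, 0); err != nil {
		panic(err)
	}
}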
diff --git a/pkg/binlogfile/binlogger_test.go b/pkg/binlogfile/binlogger_test.go
index 5fc82b5de..8c98f2c72 100644
--- a/pkg/binlogfile/binlogger_test.go
+++ b/pkg/binlogfile/binlogger_test.go
@@ -202,7 +202,43 @@ func (s *testBinloggerSuite) TestCourruption(c *C) {
 	c.Assert(errors.Cause(err), Equals, io.ErrUnexpectedEOF)
 }
 
-func (s *testBinloggerSuite) TestGC(c *C) {
+func assertBinlogsCount(c *C, dir string, expected int) {
+	names, err := ReadBinlogNames(dir)
+	c.Assert(err, IsNil)
+	c.Assert(names, HasLen, expected)
+}
+
+func (s *testBinloggerSuite) TestGCByPos(c *C) {
+	dir := c.MkDir()
+	bl, err := OpenBinlogger(dir, SegmentSizeBytes)
+	c.Assert(err, IsNil)
+	// A binlog file with index 0 is created at this point
+	defer func() {
+		err := CloseBinlogger(bl)
+		c.Assert(err, IsNil)
+	}()
+
+	b, ok := bl.(*binlogger)
+	c.Assert(ok, IsTrue)
+	// Call rotate multiple times to create new binlog files
+	for i := 0; i < 4; i++ {
+		err = b.rotate()
+		c.Assert(err, IsNil)
+	}
+
+	// We should have 1 + 4 files by now
+	assertBinlogsCount(c, b.dir, 5)
+
+	b.GCByPos(binlog.Pos{Suffix: 2})
+
+	assertBinlogsCount(c, b.dir, 3)
+
+	b.GCByPos(binlog.Pos{Suffix: 10})
+
+	assertBinlogsCount(c, b.dir, 1)
+}
+
+func (s *testBinloggerSuite) TestGCByTime(c *C) {
 	dir := c.MkDir()
 	bl, err := OpenBinlogger(dir, SegmentSizeBytes)
 	c.Assert(err, IsNil)
@@ -218,22 +254,15 @@
 	// 2. rotate creates a new binlog file with index 1
 	c.Assert(err, IsNil)
 
-	// No binlog files should be collected,
-	// because both of the files has an index that's >= 0
-	time.Sleep(10 * time.Millisecond)
-	b.GC(time.Millisecond, binlog.Pos{Suffix: 0})
-
 	names, err := ReadBinlogNames(b.dir)
 	c.Assert(err, IsNil)
 	c.Assert(names, HasLen, 2)
 
-	for i, name := range names {
-		suffix, _, err := ParseBinlogName(name)
-		c.Assert(err, IsNil)
-		c.Assert(suffix, Equals, uint64(i))
-	}
-	// The one with index 0 should be garbage collected
-	b.GC(time.Millisecond, binlog.Pos{Suffix: 1})
+	// After the sleep below, both files are more than 1ms old; GCByTime
+	// should remove the first file and keep the latest one
+	time.Sleep(10 * time.Millisecond)
+	b.GCByTime(time.Millisecond)
+
 	names, err = ReadBinlogNames(b.dir)
 	c.Assert(err, IsNil)
 	c.Assert(names, HasLen, 1)
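One behavioural detail worth calling out for reviewers: the GC goroutine started in NewPBSyncer only exits because Close calls p.cancel, which closes ctx.Done() and unblocks the select. The shutdown shape can be reproduced in isolation; startGC below is a hypothetical helper, not code from this patch, and the interval is shortened so the demo terminates quickly (the patch ticks hourly).

package main

import (
	"context"
	"fmt"
	"time"
)

// startGC runs one gc pass per tick until the context is cancelled.
func startGC(ctx context.Context, interval time.Duration, gc func()) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("GC loop stopped")
				return
			case <-ticker.C:
				gc()
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	startGC(ctx, 10*time.Millisecond, func() { fmt.Println("GC pass") })

	time.Sleep(35 * time.Millisecond) // let a few ticks fire
	cancel()                          // what pbSyncer.Close does via p.cancel()
	time.Sleep(10 * time.Millisecond) // give the goroutine time to observe ctx.Done()
}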