
drainer: Fix #724, Enable drainer to purge old incremental backup data on disk #885

Merged: 6 commits, Feb 7, 2020
Changes from 4 commits
3 changes: 2 additions & 1 deletion cmd/drainer/drainer.toml
@@ -129,6 +129,8 @@ port = 3306
#[syncer.to]
# directory to save binlog files; defaults to data-dir (where the checkpoint file is saved) if not configured.
# dir = "data.drainer"
#
# number of days to keep binlog files on disk; GC runs hourly and is
# disabled when this is unset or not greater than 0
# retention-time = 7


# when db-type is kafka, you can uncomment this to configure the downstream kafka; it will be the global kafka config default
@@ -139,7 +141,6 @@ port = 3306
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024
#
#
# the topic name drainer will push msg, the default name is <cluster-id>_obinlog
# be careful don't use the same name if run multi drainer instances
# topic-name = ""
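
For reference, a [syncer.to] section with the new option enabled might look like this (a sketch assembled from the options above; per this PR, retention-time is a whole number of days):

[syncer.to]
dir = "data.drainer"
# purge binlog files more than 7 days old; the GC check runs hourly
retention-time = 7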
2 changes: 1 addition & 1 deletion drainer/relay/relayer.go
@@ -79,7 +79,7 @@ func (r *relayer) WriteBinlog(schema string, table string, tiBinlog *tb.Binlog,
func (r *relayer) GCBinlog(pos tb.Pos) {
// If the file suffix increases, it means the previous files are no longer needed.
if pos.Suffix > r.nextGCFileSuffix {
r.binlogger.GC(0, pos)
r.binlogger.GCByPos(pos)
r.nextGCFileSuffix = pos.Suffix
}
}
35 changes: 32 additions & 3 deletions drainer/sync/pb.go
@@ -14,7 +14,11 @@
package sync

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
@@ -24,21 +28,44 @@ import (
var _ Syncer = &pbSyncer{}

type pbSyncer struct {
binlogger binlogfile.Binlogger

*baseSyncer

binlogger binlogfile.Binlogger
cancel func()
}

// NewPBSyncer creates a syncer that writes binlogs to files
func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
Review comment: exported func NewPBSyncer returns unexported type *sync.pbSyncer, which can be annoying to use
binlogger, err := binlogfile.OpenBinlogger(dir, binlogfile.SegmentSizeBytes)
if err != nil {
return nil, errors.Trace(err)
}

ctx, cancel := context.WithCancel(context.TODO())

s := &pbSyncer{
binlogger: binlogger,
baseSyncer: newBaseSyncer(tableInfoGetter),
cancel: cancel,
}

if retentionDays > 0 {
// TODO: Add support for human readable format input of times like "7d", "12h"
retentionTime := time.Duration(retentionDays) * 24 * time.Hour
ticker := time.NewTicker(time.Hour)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("Binlog GC loop stopped")
return
case <-ticker.C:
log.Info("Trying to GC binlog files")
binlogger.GCByTime(retentionTime)
}
}
}()
}

return s, nil
@@ -71,6 +98,8 @@ func (p *pbSyncer) saveBinlog(binlog *pb.Binlog) error {
}

func (p *pbSyncer) Close() error {
p.cancel()

err := p.binlogger.Close()
p.setErr(err)
close(p.success)
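The TODO in NewPBSyncer mentions accepting human-readable retention values such as "7d" or "12h". A minimal sketch of how that could be parsed, using a hypothetical parseRetention helper that is not part of this PR (time.ParseDuration alone rejects a "d" unit, so days need special handling):

import (
	"strconv"
	"strings"
	"time"
)

// parseRetention converts a retention string like "7d" or "12h" into a
// time.Duration. Hypothetical helper illustrating the TODO above.
func parseRetention(s string) (time.Duration, error) {
	if strings.HasSuffix(s, "d") {
		days, err := strconv.Atoi(strings.TrimSuffix(s, "d"))
		if err != nil {
			return 0, err
		}
		return time.Duration(days) * 24 * time.Hour, nil
	}
	// "12h", "30m" and other standard units are handled by the standard library.
	return time.ParseDuration(s)
}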
13 changes: 7 additions & 6 deletions drainer/sync/syncer_test.go
@@ -42,15 +42,16 @@ type syncerSuite struct {
func (s *syncerSuite) SetUpTest(c *check.C) {
var infoGetter translator.TableInfoGetter
cfg := &DBConfig{
Host: "localhost",
User: "root",
Password: "",
Port: 3306,
KafkaVersion: "0.8.2.0",
Host: "localhost",
User: "root",
Password: "",
Port: 3306,
KafkaVersion: "0.8.2.0",
BinlogFileDir: c.MkDir(),
}

// create pb syncer
pb, err := NewPBSyncer(c.MkDir(), infoGetter)
pb, err := NewPBSyncer(cfg.BinlogFileDir, cfg.BinlogFileRetentionTime, infoGetter)
c.Assert(err, check.IsNil)

s.syncers = append(s.syncers, pb)
11 changes: 6 additions & 5 deletions drainer/sync/util.go
@@ -24,11 +24,12 @@ type DBConfig struct {
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignored.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
SyncMode int `toml:"sync-mode" json:"sync-mode"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
SyncMode int `toml:"sync-mode" json:"sync-mode"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
2 changes: 1 addition & 1 deletion drainer/syncer.go
@@ -102,7 +102,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac
return nil, errors.Annotate(err, "fail to create kafka dsyncer")
}
case "file":
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema)
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, cfg.To.BinlogFileRetentionTime, schema)
if err != nil {
return nil, errors.Annotate(err, "fail to create pb dsyncer")
}
56 changes: 39 additions & 17 deletions pkg/binlogfile/binlogger.go
@@ -63,8 +63,11 @@ type Binlogger interface {
// close the binlogger
Close() error

// GC recycles the old binlog file
GC(days time.Duration, pos binlog.Pos)
// GCByTime deletes all files older than the specified duration; the latest file is always kept
GCByTime(retentionTime time.Duration)

// GCByPos deletes all files before the specified position; the latest file is always kept
GCByPos(pos binlog.Pos)
}

// binlogger is a logical representation of the log storage
@@ -348,8 +351,37 @@ func (b *binlogger) Walk(ctx context.Context, from binlog.Pos, sendBinlog func(e
return nil
}

// GC recycles the old binlog file
func (b *binlogger) GC(days time.Duration, pos binlog.Pos) {
// GCByPos deletes all files before the specified position; the latest file is always kept
func (b *binlogger) GCByPos(pos binlog.Pos) {
names, err := ReadBinlogNames(b.dir)
if err != nil {
log.Error("read binlog files failed", zap.Error(err))
return
}

if len(names) == 0 {
return
}

// skip the latest binlog file
for _, name := range names[:len(names)-1] {
curSuffix, _, err := ParseBinlogName(name)
if err != nil {
log.Error("parse binlog failed", zap.Error(err))
// skip unparsable names; falling through with a zero suffix would wrongly delete the file
continue
}
if curSuffix < pos.Suffix {
fileName := path.Join(b.dir, name)
if err := os.Remove(fileName); err != nil {
log.Error("remove old binlog file err", zap.Error(err), zap.String("file name", fileName))
continue
}
log.Info("GC binlog file", zap.String("file name", fileName))
}
}
}

// GCByTime deletes all files older than the specified duration; the latest file is always kept
func (b *binlogger) GCByTime(retentionTime time.Duration) {
names, err := ReadBinlogNames(b.dir)
if err != nil {
log.Error("read binlog files failed", zap.Error(err))
@@ -369,22 +401,12 @@ func (b *binlogger) GC(days time.Duration, pos binlog.Pos) {
continue
}

curSuffix, _, err := ParseBinlogName(name)
if err != nil {
log.Error("parse binlog failed", zap.Error(err))
}

if curSuffix < pos.Suffix {
err := os.Remove(fileName)
if err != nil {
log.Error("remove old binlog file err")
if time.Since(fi.ModTime()) > retentionTime {
if err := os.Remove(fileName); err != nil {
log.Error("remove old binlog file err", zap.Error(err), zap.String("file name", fileName))
continue
}
log.Info("GC binlog file", zap.String("file name", fileName))
} else if time.Since(fi.ModTime()) > days {
log.Warn(
"binlog file is old enough to be garbage collected, but the position is behind the safe point",
zap.String("name", fileName), zap.Stringer("position", &pos))
}
}
}
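Taken together, this splits the old dual-purpose GC(days, pos) into two single-purpose entry points. A minimal caller sketch within the binlogfile package (gcExample is a hypothetical illustration; Binlogger values come from OpenBinlogger as elsewhere in this diff):

func gcExample(bl Binlogger, current binlog.Pos) {
	// Position-driven GC, as the relayer uses it: files with a suffix
	// below the current read position are no longer needed.
	bl.GCByPos(current)

	// Time-driven GC, as the pb syncer's hourly ticker uses it: files
	// whose modification time falls outside the retention window are removed.
	bl.GCByTime(7 * 24 * time.Hour)
}

GCByPos is driven by how far reading has progressed, while GCByTime is driven purely by wall-clock retention; both always keep the newest file.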
55 changes: 42 additions & 13 deletions pkg/binlogfile/binlogger_test.go
@@ -202,7 +202,43 @@ func (s *testBinloggerSuite) TestCourruption(c *C) {
c.Assert(errors.Cause(err), Equals, io.ErrUnexpectedEOF)
}

func (s *testBinloggerSuite) TestGC(c *C) {
func assertBinlogsCount(c *C, dir string, expected int) {
names, err := ReadBinlogNames(dir)
c.Assert(err, IsNil)
c.Assert(names, HasLen, expected)
}

func (s *testBinloggerSuite) TestGCByPos(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, SegmentSizeBytes)
c.Assert(err, IsNil)
// A binlog file with index 0 is created at this point
defer func() {
err := CloseBinlogger(bl)
c.Assert(err, IsNil)
}()

b, ok := bl.(*binlogger)
c.Assert(ok, IsTrue)
// Call rotate multiple times to create new binlog files
for i := 0; i < 4; i++ {
err = b.rotate()
c.Assert(err, IsNil)
}

// We should have 1 + 4 files by now
assertBinlogsCount(c, b.dir, 5)

b.GCByPos(binlog.Pos{Suffix: 2})

assertBinlogsCount(c, b.dir, 3)

b.GCByPos(binlog.Pos{Suffix: 10})

assertBinlogsCount(c, b.dir, 1)
}

func (s *testBinloggerSuite) TestGCByTime(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, SegmentSizeBytes)
c.Assert(err, IsNil)
@@ -218,22 +254,15 @@ func (s *testBinloggerSuite) TestGC(c *C) {
// 2. rotate creates a new binlog file with index 1
c.Assert(err, IsNil)

// No binlog files should be collected,
// because both of the files has an index that's >= 0
time.Sleep(10 * time.Millisecond)
b.GC(time.Millisecond, binlog.Pos{Suffix: 0})

names, err := ReadBinlogNames(b.dir)
c.Assert(err, IsNil)
c.Assert(names, HasLen, 2)
for i, name := range names {
suffix, _, err := ParseBinlogName(name)
c.Assert(err, IsNil)
c.Assert(suffix, Equals, uint64(i))
}

// The one with index 0 should be garbage collected
b.GC(time.Millisecond, binlog.Pos{Suffix: 1})
// Should collect the first file because it's more than 1ms old after
// the following sleep
time.Sleep(10 * time.Millisecond)
b.GCByTime(time.Millisecond)

names, err = ReadBinlogNames(b.dir)
c.Assert(err, IsNil)
c.Assert(names, HasLen, 1)