From b3e2cc3d6f1d194a4cd62bfc3c5b0e1383689c76 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Sun, 23 Jun 2019 23:22:08 +0800 Subject: [PATCH 1/4] pump/: Refine gc of pump 1, Add a API to trigger gc 2, Check L0 files by *GetProperty* instead of Stats Stats can't get the right L0 files num, see: https://github.com/syndtr/goleveldb/pull/283/files 3, call vlog.gcTS step by step, help free space quickly --- pump/server.go | 48 +++++++++++++++++++++++++++-------------- pump/storage/storage.go | 27 +++++++++++++++++------ pump/storage/vlog.go | 1 + 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/pump/server.go b/pump/server.go index e09bdfbe8..fef52ae93 100644 --- a/pump/server.go +++ b/pump/server.go @@ -76,6 +76,7 @@ type Server struct { cancel context.CancelFunc wg sync.WaitGroup gcDuration time.Duration + triggerGC chan time.Time metrics *util.MetricClient // save the last time we write binlog to Storage // if long time not write, we can write a fake binlog @@ -172,6 +173,7 @@ func NewServer(cfg *Config) (*Server, error) { gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, pdCli: pdCli, cfg: cfg, + triggerGC: make(chan time.Time), }, nil } @@ -389,6 +391,7 @@ func (s *Server) Start() error { router.HandleFunc("/state/{nodeID}/{action}", s.ApplyAction).Methods("PUT") router.HandleFunc("/drainers", s.AllDrainers).Methods("GET") router.HandleFunc("/debug/binlog/{ts}", s.BinlogByTS).Methods("GET") + router.HandleFunc("/debug/gc/trigger", s.TriggerGC).Methods("GET") http.Handle("/", router) prometheus.DefaultGatherer = registry http.Handle("/metrics", promhttp.Handler()) @@ -500,26 +503,29 @@ func (s *Server) gcBinlogFile() { case <-s.ctx.Done(): log.Info("gcBinlogFile exit") return + case <-s.triggerGC: + log.Info("trigger gc now") case <-time.After(gcInterval): - if s.gcDuration == 0 { - continue - } + } - safeTSO, err := s.getSafeGCTSOForDrainers(s.ctx) - if err != nil { - log.Warn("get save gc tso for drainers failed", zap.Error(err)) - continue - } - log.Info("get safe ts for drainers success", zap.Int64("ts", safeTSO)) + if s.gcDuration == 0 { + continue + } - millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000 - gcTS := int64(oracle.EncodeTSO(millisecond)) - if safeTSO < gcTS { - gcTS = safeTSO - } - log.Info("send gc request to storage", zap.Int64("ts", gcTS)) - s.storage.GCTS(gcTS) + safeTSO, err := s.getSafeGCTSOForDrainers(s.ctx) + if err != nil { + log.Warn("get save gc tso for drainers failed", zap.Error(err)) + continue + } + log.Info("get safe ts for drainers success", zap.Int64("ts", safeTSO)) + + millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000 + gcTS := int64(oracle.EncodeTSO(millisecond)) + if safeTSO < gcTS { + gcTS = safeTSO } + log.Info("send gc request to storage", zap.Int64("ts", gcTS)) + s.storage.GCTS(gcTS) } } @@ -577,6 +583,16 @@ func (s *Server) Status(w http.ResponseWriter, r *http.Request) { s.PumpStatus().Status(w, r) } +// TriggerGC trigger pump to gc now +func (s *Server) TriggerGC(w http.ResponseWriter, r *http.Request) { + select { + case s.triggerGC <- time.Now(): + fmt.Fprintln(w, "trigger gc success") + case <-time.After(time.Second): + fmt.Fprintln(w, "gc is working") + } +} + // BinlogByTS exposes api get get binlog by ts func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) { tsStr := mux.Vars(r)["ts"] diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 8e5d0c293..f9c7c3eaa 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -20,6 +20,7 @@ import ( "os" "path" "reflect" + "strconv" "sync" "sync/atomic" "time" @@ -597,7 +598,6 @@ func (a *Append) GCTS(ts int64) { // so we forward a little bit to make sure we can get the according P binlog a.doGCTS(ts - int64(oracle.EncodeTSO(maxTxnTimeoutSecond*1000))) }() - } func (a *Append) doGCTS(ts int64) { @@ -610,15 +610,22 @@ func (a *Append) doGCTS(ts int64) { } for { - var stats leveldb.DBStats - err := a.metadata.Stats(&stats) + nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") if err != nil { - log.Error("Stats failed", zap.Error(err)) + log.Error("GetProperty failed", zap.Error(err)) time.Sleep(5 * time.Second) continue } - if len(stats.LevelTablesCounts) > 0 && stats.LevelTablesCounts[0] >= l0Trigger { - log.Info("wait some time to gc cause too many L0 file", zap.Int("files", stats.LevelTablesCounts[0])) + + l0Num, err := strconv.Atoi(nStr) + if err != nil { + log.Error("parse int failed", zap.String("str", nStr), zap.Error(err)) + time.Sleep(5 * time.Second) + continue + } + + if l0Num >= l0Trigger { + log.Info("wait some time to gc cause too many L0 file", zap.Int("files", l0Num)) time.Sleep(5 * time.Second) continue } @@ -630,8 +637,12 @@ func (a *Append) doGCTS(ts int64) { iter := a.metadata.NewIterator(irange, nil) deleteBatch := 0 + var lastKey []byte + for iter.Next() && deleteBatch < 100 { batch.Delete(iter.Key()) + lastKey = iter.Key() + if batch.Len() == 1024 { err := a.metadata.Write(batch, nil) if err != nil { @@ -654,6 +665,10 @@ func (a *Append) doGCTS(ts int64) { } break } + + if len(lastKey) > 0 { + a.vlog.gcTS(decodeTSKey(lastKey)) + } } a.vlog.gcTS(ts) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 840d568fd..78fbde57c 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -478,6 +478,7 @@ func (vlog *valueLog) gcTS(gcTS int64) { if err != nil { log.Error("remove file failed", zap.String("path", logFile.path), zap.Error(err)) } + log.Info("remove file", zap.String("path", logFile.path)) logFile.lock.Unlock() } } From 933b225a0b3a6b838d1c5d3007619274c9183cd1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Mon, 24 Jun 2019 15:56:24 +0800 Subject: [PATCH 2/4] Address comment --- pump/server.go | 4 ++-- pump/storage/storage.go | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pump/server.go b/pump/server.go index fef52ae93..93e9d193f 100644 --- a/pump/server.go +++ b/pump/server.go @@ -391,7 +391,7 @@ func (s *Server) Start() error { router.HandleFunc("/state/{nodeID}/{action}", s.ApplyAction).Methods("PUT") router.HandleFunc("/drainers", s.AllDrainers).Methods("GET") router.HandleFunc("/debug/binlog/{ts}", s.BinlogByTS).Methods("GET") - router.HandleFunc("/debug/gc/trigger", s.TriggerGC).Methods("GET") + router.HandleFunc("/debug/gc/trigger", s.TriggerGC).Methods("POST") http.Handle("/", router) prometheus.DefaultGatherer = registry http.Handle("/metrics", promhttp.Handler()) @@ -588,7 +588,7 @@ func (s *Server) TriggerGC(w http.ResponseWriter, r *http.Request) { select { case s.triggerGC <- time.Now(): fmt.Fprintln(w, "trigger gc success") - case <-time.After(time.Second): + default: fmt.Fprintln(w, "gc is working") } } diff --git a/pump/storage/storage.go b/pump/storage/storage.go index f9c7c3eaa..cb537ae2b 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -612,16 +612,14 @@ func (a *Append) doGCTS(ts int64) { for { nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") if err != nil { - log.Error("GetProperty failed", zap.Error(err)) - time.Sleep(5 * time.Second) - continue + log.Error("get `leveldb.num-files-at-level0` property failed", zap.Error(err)) + return } l0Num, err := strconv.Atoi(nStr) if err != nil { - log.Error("parse int failed", zap.String("str", nStr), zap.Error(err)) - time.Sleep(5 * time.Second) - continue + log.Error("parse `leveldb.num-files-at-level0` result to int failed", zap.String("str", nStr), zap.Error(err)) + return } if l0Num >= l0Trigger { From 5e91e874a6db935d100e92c87c540e58663a0a39 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 26 Jun 2019 15:34:41 +0800 Subject: [PATCH 3/4] pump/: add log --- pump/storage/storage.go | 7 ++++++- pump/storage/vlog.go | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index cb537ae2b..9ec6382ce 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -609,6 +609,8 @@ func (a *Append) doGCTS(ts int64) { l0Trigger = a.options.KVConfig.CompactionL0Trigger } + deleteNum := 0 + for { nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0") if err != nil { @@ -639,6 +641,7 @@ func (a *Append) doGCTS(ts int64) { for iter.Next() && deleteBatch < 100 { batch.Delete(iter.Key()) + deleteNum++ lastKey = iter.Key() if batch.Len() == 1024 { @@ -667,10 +670,12 @@ func (a *Append) doGCTS(ts int64) { if len(lastKey) > 0 { a.vlog.gcTS(decodeTSKey(lastKey)) } + + log.Info("has delete", zap.Int("delete num", deleteNum)) } a.vlog.gcTS(ts) - log.Info("finish gc", zap.Int64("ts", ts)) + log.Info("finish gc", zap.Int64("ts", ts), zap.Int("delete num", deleteNum)) } // MaxCommitTS implement Storage.MaxCommitTS diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 78fbde57c..b5d485a70 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -450,6 +450,8 @@ func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record * // delete data <= gcTS func (vlog *valueLog) gcTS(gcTS int64) { + log.Info("gc vlog", zap.Int64("ts", gcTS)) + vlog.filesLock.Lock() var toDeleteFiles []*logFile From 521babd42430a58179fd41c384dba2301ecaf214 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 26 Jun 2019 15:49:16 +0800 Subject: [PATCH 4/4] go.mod: update goleveldb dependency only one new commit https://github.com/syndtr/goleveldb/commit/02440ea7a28525b3079783ad9c27083994a42366 Fix the stat about levels of the Stats API --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ff68bda7e..d5bb4eb03 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/soheilhy/cmux v0.1.2 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect github.com/spf13/pflag v1.0.3 // indirect - github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 + github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/unrolled/render v0.0.0-20180807193321-4206df6ff701 github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12 diff --git a/go.sum b/go.sum index 4c9f9e8b1..b3e4f3e55 100644 --- a/go.sum +++ b/go.sum @@ -182,8 +182,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= -github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q= +github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/twinj/uuid v0.0.0-20150629100731-70cac2bcd273 h1:YqFyfcgqxQqjpRr0SEG0Z555J/3kPqDL/xmRyeAaX/0=