Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dumpling: add recent speed to dump status #38456

Merged
merged 9 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Dumper struct {
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
totalTables int64
charsetAndDefaultCollationMap map[string]string

statusRecorder *StatusRecorder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry now I want to change the name to SpeedRecorder or something

}

// NewDumper returns a new Dumper
Expand All @@ -78,6 +80,7 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
conf: conf,
cancelCtx: cancelFn,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
statusRecorder: NewStatusRecorder(),
}

var err error
Expand Down
46 changes: 46 additions & 0 deletions dumpling/export/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package export

import (
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -48,6 +49,7 @@ type DumpStatus struct {
FinishedRows float64
EstimateTotalRows float64
TotalTables int64
CurrentSpeedBPS int64
}

// GetStatus returns the status of dumping by reading metrics.
Expand All @@ -58,6 +60,7 @@ func (d *Dumper) GetStatus() *DumpStatus {
ret.FinishedBytes = ReadGauge(d.metrics.finishedSizeGauge)
ret.FinishedRows = ReadGauge(d.metrics.finishedRowsGauge)
ret.EstimateTotalRows = ReadCounter(d.metrics.estimateTotalRowsCounter)
ret.CurrentSpeedBPS = d.statusRecorder.GetSpeed(int64(ret.FinishedBytes))
return ret
}

Expand All @@ -72,3 +75,46 @@ func calculateTableCount(m DatabaseTables) int {
}
return cnt
}

// StatusRecorder record the finished bytes and calculate its speed.
type StatusRecorder struct {
mu sync.Mutex
lastFinished int64
lastUpdateTime time.Time
speedBPS int64
}

// NewStatusRecorder new a StatusRecorder.
func NewStatusRecorder() *StatusRecorder {
return &StatusRecorder{
lastUpdateTime: time.Now(),
}
}

// GetSpeed calculate status speed.
func (s *StatusRecorder) GetSpeed(finished int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()

if finished <= s.lastFinished {
// for finished bytes does not get forwarded, use old speed to avoid
// display zero. We may find better strategy in future.
return s.speedBPS
}

now := time.Now()
elapsed := int64(now.Sub(s.lastUpdateTime).Seconds())
if elapsed == 0 {
elapsed = 1
}
currentSpeed := (finished - s.lastFinished) / elapsed
if currentSpeed == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use float variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the unit of measurement should be Byte/s, so no need to use float.

currentSpeed = 1
}

s.lastFinished = finished
s.lastUpdateTime = now
s.speedBPS = currentSpeed

return currentSpeed
}
2 changes: 1 addition & 1 deletion dumpling/export/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestGetParameters(t *testing.T) {
conf := defaultConfigForTest(t)
d := &Dumper{conf: conf}
d := &Dumper{conf: conf, statusRecorder: NewStatusRecorder()}
d.metrics = newMetrics(conf.PromFactory, nil)

mid := d.GetStatus()
Expand Down