Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dumpling: add recent speed to dump status #38456

Merged
merged 9 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Dumper struct {
selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *BaseConn, meta TableMeta) (pkFields []string, pkVals [][]string, err error)
totalTables int64
charsetAndDefaultCollationMap map[string]string

speedRecorder *SpeedRecorder
}

// NewDumper returns a new Dumper
Expand All @@ -78,6 +80,7 @@ func NewDumper(ctx context.Context, conf *Config) (*Dumper, error) {
conf: conf,
cancelCtx: cancelFn,
selectTiDBTableRegionFunc: selectTiDBTableRegion,
speedRecorder: NewSpeedRecorder(),
}

var err error
Expand Down
46 changes: 46 additions & 0 deletions dumpling/export/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package export

import (
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -48,6 +49,7 @@ type DumpStatus struct {
FinishedRows float64
EstimateTotalRows float64
TotalTables int64
CurrentSpeedBPS int64
}

// GetStatus returns the status of dumping by reading metrics.
Expand All @@ -58,6 +60,7 @@ func (d *Dumper) GetStatus() *DumpStatus {
ret.FinishedBytes = ReadGauge(d.metrics.finishedSizeGauge)
ret.FinishedRows = ReadGauge(d.metrics.finishedRowsGauge)
ret.EstimateTotalRows = ReadCounter(d.metrics.estimateTotalRowsCounter)
ret.CurrentSpeedBPS = d.speedRecorder.GetSpeed(int64(ret.FinishedBytes))
return ret
}

Expand All @@ -72,3 +75,46 @@ func calculateTableCount(m DatabaseTables) int {
}
return cnt
}

// SpeedRecorder record the finished bytes and calculate its speed.
type SpeedRecorder struct {
mu sync.Mutex
lastFinished int64
lastUpdateTime time.Time
speedBPS int64
}

// NewSpeedRecorder new a SpeedRecorder.
func NewSpeedRecorder() *SpeedRecorder {
return &SpeedRecorder{
lastUpdateTime: time.Now(),
}
}

// GetSpeed calculate status speed.
func (s *SpeedRecorder) GetSpeed(finished int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()

if finished <= s.lastFinished {
// for finished bytes does not get forwarded, use old speed to avoid
// display zero. We may find better strategy in future.
return s.speedBPS
}

now := time.Now()
elapsed := int64(now.Sub(s.lastUpdateTime).Seconds())
if elapsed == 0 {
elapsed = 1
}
currentSpeed := (finished - s.lastFinished) / elapsed
if currentSpeed == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use float variable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the unit of measurement should be Byte/s, so no need to use float.

currentSpeed = 1
}

s.lastFinished = finished
s.lastUpdateTime = now
s.speedBPS = currentSpeed

return currentSpeed
}
22 changes: 21 additions & 1 deletion dumpling/export/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ package export

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestGetParameters(t *testing.T) {
conf := defaultConfigForTest(t)
d := &Dumper{conf: conf}
d := &Dumper{conf: conf, speedRecorder: NewSpeedRecorder()}
d.metrics = newMetrics(conf.PromFactory, nil)

mid := d.GetStatus()
Expand All @@ -30,3 +31,22 @@ func TestGetParameters(t *testing.T) {
require.EqualValues(t, float64(30), mid.FinishedRows)
require.EqualValues(t, float64(40), mid.EstimateTotalRows)
}

func TestSpeedRecorder(t *testing.T) {
testCases := []struct {
spentTime int64
finished int64
expected int64
}{
{spentTime: 1, finished: 100, expected: 100},
{spentTime: 2, finished: 200, expected: 50},
// already finished, will return last speed
{spentTime: 3, finished: 200, expected: 50},
}
speedRecorder := NewSpeedRecorder()
for _, tc := range testCases {
time.Sleep(time.Duration(tc.spentTime) * time.Second)
recentSpeed := speedRecorder.GetSpeed(tc.finished)
require.Equal(t, tc.expected, recentSpeed)
}
}