Skip to content

Commit

Permalink
lightning(dm): add current speed in status
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Sep 26, 2022
1 parent d32cbe8 commit dcde5b9
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 282 deletions.
17 changes: 12 additions & 5 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type LightningLoader struct {
closed atomic.Bool
metaBinlog atomic.String
metaBinlogGTID atomic.String

statusRecorder *statusRecorder
}

// NewLightning creates a new Loader importing data with lightning.
Expand All @@ -89,6 +91,7 @@ func NewLightning(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName st
lightningGlobalConfig: lightningCfg,
core: lightning.New(lightningCfg),
logger: logger.WithFields(zap.String("task", cfg.Name), zap.String("unit", "lightning-load")),
statusRecorder: newStatusRecorder(),
}
return loader
}
Expand Down Expand Up @@ -461,17 +464,21 @@ func (l *LightningLoader) Update(ctx context.Context, cfg *config.SubTaskConfig)
func (l *LightningLoader) status() *pb.LoadStatus {
finished, total := l.core.Status()
progress := percent(finished, total, l.finish.Load())
currentSpeed := l.statusRecorder.getSpeed(finished)

l.logger.Info("progress status of lightning",
zap.Int64("finished_bytes", finished),
zap.Int64("total_bytes", total),
zap.String("progress", progress),
zap.Int64("current speed (bytes / seconds)", currentSpeed),
)
s := &pb.LoadStatus{
FinishedBytes: finished,
TotalBytes: total,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
FinishedBytes: finished,
TotalBytes: total,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
CurrentSpeedBytesPerSecond: currentSpeed,
}
return s
}
Expand Down
16 changes: 9 additions & 7 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ type Loader struct {
dbTableDataFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastFinishedSize map[string]map[string]*atomic.Int64
dbTableDataLastUpdatedTime atomic.Time
statusRecorder *statusRecorder

metaBinlog atomic.String
metaBinlogGTID atomic.String
Expand All @@ -467,13 +468,14 @@ type Loader struct {
// NewLoader creates a new Loader.
func NewLoader(cfg *config.SubTaskConfig, cli *clientv3.Client, workerName string) *Loader {
loader := &Loader{
cfg: cfg,
cli: cli,
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "load")),
workerName: workerName,
cfg: cfg,
cli: cli,
db2Tables: make(map[string]Tables2DataFiles),
tableInfos: make(map[string]*tableInfo),
workerWg: new(sync.WaitGroup),
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "load")),
workerName: workerName,
statusRecorder: newStatusRecorder(),
}
loader.fileJobQueueClosed.Store(true) // not open yet
return loader
Expand Down
54 changes: 49 additions & 5 deletions dm/loader/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package loader

import (
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -22,17 +23,60 @@ import (
"github.com/pingcap/tiflow/dm/pkg/binlog"
)

type statusRecorder struct {
mu sync.Mutex
lastFinished int64
lastUpdateTime time.Time
speedBPS int64
}

func newStatusRecorder() *statusRecorder {
return &statusRecorder{
lastUpdateTime: time.Now(),
}
}

func (s *statusRecorder) getSpeed(finished int64) int64 {
s.mu.Lock()
defer s.mu.Unlock()

if finished == s.lastFinished {
// for finished bytes does not get forwarded, use old speed to avoid
// display zero. We may find better strategy in future.
return s.speedBPS
}

now := time.Now()
elapsed := int64(now.Sub(s.lastUpdateTime).Seconds())
if elapsed == 0 {
elapsed = 1
}
currentSpeed := (finished - s.lastFinished) / elapsed
if currentSpeed == 0 {
currentSpeed = 1
}

s.lastFinished = finished
s.lastUpdateTime = now
s.speedBPS = currentSpeed

return currentSpeed
}

// Status implements Unit.Status.
func (l *Loader) Status(_ *binlog.SourceStatus) interface{} {
finishedSize := l.finishedDataSize.Load()
totalSize := l.totalDataSize.Load()
progress := percent(finishedSize, totalSize, l.finish.Load())
currentSpeed := l.statusRecorder.getSpeed(finishedSize)

s := &pb.LoadStatus{
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
FinishedBytes: finishedSize,
TotalBytes: totalSize,
Progress: progress,
MetaBinlog: l.metaBinlog.Load(),
MetaBinlogGTID: l.metaBinlogGTID.Load(),
CurrentSpeedBytesPerSecond: currentSpeed,
}
go l.printStatus()
return s
Expand Down
Loading

0 comments on commit dcde5b9

Please sign in to comment.