Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication and vstreamer metrics #6519

Merged
merged 4 commits into from
Aug 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/stats/timings.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func NewTimings(name, help, label string, categories ...string) *Timings {
return t
}

// Reset discards every recorded histogram by installing a fresh, empty map.
// It is intended for use in tests.
//
// NOTE(review): only the histograms map is replaced here; if Timings keeps
// aggregate totals in other fields, confirm whether they should be reset too.
func (t *Timings) Reset() {
	// Replacing the map is a write, so the exclusive lock is required. A read
	// lock would allow this assignment to race with concurrent readers and
	// with other goroutines that hold the read lock at the same time.
	t.mu.Lock()
	defer t.mu.Unlock()
	t.histograms = make(map[string]*Histogram)
}

// Add will add a new value to the named histogram.
func (t *Timings) Add(name string, elapsed time.Duration) {
if t.labelCombined {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ type Stats struct {
History *history.History

State sync2.AtomicString

PhaseTimings *stats.Timings
QueryTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
}

// SetLastPosition sets the last replication position.
Expand Down Expand Up @@ -118,6 +124,12 @@ func NewStats() *Stats {
bps.Rates = stats.NewRates("", bps.Timings, 15*60/5, 5*time.Second)
bps.History = history.New(3)
bps.SecondsBehindMaster.Set(math.MaxInt64)
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")

return bps
}

Expand Down
10 changes: 10 additions & 0 deletions go/vt/servenv/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,11 @@ func (tw *TimingsWrapper) Counts() map[string]int64 {
return tw.timings.Counts()
}

// Reset clears the recorded histograms by delegating to the Reset method of
// the wrapped timings. It is intended for use in tests.
func (tw *TimingsWrapper) Reset() {
tw.timings.Reset()
}

//-----------------------------------------------------------------

// MultiTimingsWrapper provides a namespaced version of stats.MultiTimings.
Expand Down Expand Up @@ -668,6 +673,11 @@ func (tw *MultiTimingsWrapper) Counts() map[string]int64 {
return tw.timings.Counts()
}

// Reset clears the recorded histograms by delegating to the Reset method of
// the wrapped multi-timings. It is intended for use in tests.
func (tw *MultiTimingsWrapper) Reset() {
tw.timings.Reset()
}

//-----------------------------------------------------------------

// handleFunc stores the http Handler for an Exporter. This function can
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/golang/protobuf/proto"
"golang.org/x/net/context"

Expand Down Expand Up @@ -568,3 +570,23 @@ func customExpectData(t *testing.T, table string, values [][]string, exec func(c
}
}
}

// validateQueryCountStat adds up the QueryCounts entry for the given phase
// across every controller reported by globalStats.status() and requires the
// sum to equal want.
func validateQueryCountStat(t *testing.T, phase string, want int64) {
	var total int64
	for _, ctrl := range globalStats.status().Controllers {
		// Indexing a map with an absent key yields zero, so controllers
		// without an entry for this phase leave the total unchanged.
		total += ctrl.QueryCounts[phase]
	}
	require.Equal(t, want, total, "QueryCount stat is incorrect")
}

// validateCopyRowCountStat sums CopyRowCount over every controller reported by
// globalStats.status() and requires the sum to equal want.
func validateCopyRowCountStat(t *testing.T, want int64) {
	controllers := globalStats.status().Controllers
	total := int64(0)
	for i := range controllers {
		total += controllers[i].CopyRowCount
	}
	require.Equal(t, want, total, "CopyRowCount stat is incorrect")
}
144 changes: 143 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (st *vrStats) register() {
})

stats.NewCounterFunc(
"VReplicationTotalSecondsBehindMaster",
"VReplicationSecondsBehindMasterTotal",
"vreplication seconds behind master aggregated across all streams",
func() int64 {
st.mu.Lock()
Expand Down Expand Up @@ -118,6 +118,140 @@ func (st *vrStats) register() {
}
return result
}))

stats.NewGaugesFuncWithMultiLabels(
"VReplicationPhaseTimings",
"vreplication per phase timings per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts", "phase"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
for phase, t := range ct.blpStats.PhaseTimings.Histograms() {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+phase] = t.Total()
}
}
return result
})
stats.NewCounterFunc(
"VReplicationPhaseTimingsTotal",
"vreplication per phase timings aggregated across all phases and streams",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := int64(0)
for _, ct := range st.controllers {
for _, t := range ct.blpStats.PhaseTimings.Histograms() {
result += t.Total()
}
}
return result
})

stats.NewGaugesFuncWithMultiLabels(
"VReplicationPhaseTimingsCounts",
"vreplication per phase count of timings per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts", "phase"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
for phase, t := range ct.blpStats.PhaseTimings.Counts() {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+phase] = t
}
}
return result
})

stats.NewGaugesFuncWithMultiLabels(
"VReplicationQueryCount",
"vreplication query counts per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts", "phase"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
for label, count := range ct.blpStats.QueryCount.Counts() {
if label == "" {
continue
}
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)+"."+label] = count
}
}
return result
})

stats.NewCounterFunc(
"VReplicationQueryCountTotal",
"vreplication query counts aggregated across all streams",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := int64(0)
for _, ct := range st.controllers {
for _, count := range ct.blpStats.QueryCount.Counts() {
result += count
}
}
return result
})

stats.NewGaugesFuncWithMultiLabels(
"VReplicationCopyRowCount",
"vreplication rows copied in copy phase per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)] = ct.blpStats.CopyRowCount.Get()
}
return result
})

stats.NewCounterFunc(
"VReplicationCopyRowCountTotal",
"vreplication rows copied in copy phase aggregated across all streams",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := int64(0)
for _, ct := range st.controllers {
result += ct.blpStats.CopyRowCount.Get()
}
return result
})

stats.NewGaugesFuncWithMultiLabels(
"VReplicationCopyLoopCount",
"Number of times the copy phase looped per stream",
[]string{"source_keyspace", "source_shard", "workflow", "counts"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64, len(st.controllers))
for _, ct := range st.controllers {
result[ct.source.Keyspace+"."+ct.source.Shard+"."+ct.workflow+"."+fmt.Sprintf("%v", ct.id)] = ct.blpStats.CopyLoopCount.Get()
}
return result
})

stats.NewCounterFunc(
"VReplicationCopyLoopCountTotal",
"Number of times the copy phase looped aggregated across streams",
func() int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := int64(0)
for _, ct := range st.controllers {
result += ct.blpStats.CopyLoopCount.Get()
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down Expand Up @@ -159,6 +293,10 @@ func (st *vrStats) status() *EngineStatus {
State: ct.blpStats.State.Get(),
SourceTablet: ct.sourceTablet.Get(),
Messages: ct.blpStats.MessageHistory(),
QueryCounts: ct.blpStats.QueryCount.Counts(),
PhaseTimings: ct.blpStats.PhaseTimings.Counts(),
CopyRowCount: ct.blpStats.CopyRowCount.Get(),
CopyLoopCount: ct.blpStats.CopyLoopCount.Get(),
}
i++
}
Expand All @@ -185,6 +323,10 @@ type ControllerStatus struct {
State string
SourceTablet string
Messages []string
QueryCounts map[string]int64
PhaseTimings map[string]int64
CopyRowCount int64
CopyLoopCount int64
}

var vreplicationTemplate = `
Expand Down
44 changes: 44 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -116,3 +117,46 @@ func TestStatusHtml(t *testing.T) {
t.Errorf("output: %v, want %v", buf, wantOut)
}
}

// TestVReplicationStats builds a vrStats holding a single controller backed by
// a fresh binlogplayer.Stats, records phase timings, query counts and copy
// counters on it, and checks that status() reports them for that controller.
func TestVReplicationStats(t *testing.T) {
blpStats := binlogplayer.NewStats()

// One controller (id 1) for keyspace "ks", shard "0", sharing blpStats.
testStats := &vrStats{}
testStats.isOpen = true
testStats.controllers = map[int]*controller{
1: {
id: 1,
source: binlogdata.BinlogSource{
Keyspace: "ks",
Shard: "0",
},
blpStats: blpStats,
done: make(chan struct{}),
},
}
testStats.controllers[1].sourceTablet.Set("src1")

sleepTime := 1 * time.Millisecond
// record sleeps for sleepTime and records it under the given phase. The
// arguments of the deferred call are evaluated at the defer statement, so
// time.Now() is taken before the sleep.
record := func(phase string) {
defer blpStats.PhaseTimings.Record(phase, time.Now())
time.Sleep(sleepTime)
}
want := int64(1.2 * float64(sleepTime)) // 1.2x the sleep time, i.e. a 20% allowance for recording overhead

// NOTE(review): status() fills PhaseTimings from PhaseTimings.Counts(), so
// the values compared below look like per-phase counts rather than elapsed
// durations; require.Greater(want, x) would then hold trivially. Confirm
// whether a duration or a count is meant to be asserted here.
record("fastforward")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["fastforward"])
record("catchup")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["catchup"])
record("copy")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["copy"])

// Query counts are reported per label exactly as added.
blpStats.QueryCount.Add("replicate", 11)
blpStats.QueryCount.Add("fastforward", 23)
require.Equal(t, int64(11), testStats.status().Controllers[0].QueryCounts["replicate"])
require.Equal(t, int64(23), testStats.status().Controllers[0].QueryCounts["fastforward"])

// Copy loop and row counters are reported as plain totals.
blpStats.CopyLoopCount.Add(100)
blpStats.CopyRowCount.Add(200)
require.Equal(t, int64(100), testStats.status().Controllers[0].CopyLoopCount)
require.Equal(t, int64(200), testStats.status().Controllers[0].CopyRowCount)
}
24 changes: 20 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSetting
func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.Result) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Record how long the catchup phase took. Arguments of a deferred call are
// evaluated when the defer statement executes, so time.Now() captures the
// phase start. Calling time.Now() inside a deferred closure would instead read
// the clock at return and record an elapsed time of roughly zero.
defer vc.vr.stats.PhaseTimings.Record("catchup", time.Now())

settings, err := binlogplayer.ReadVRSettings(vc.vr.dbClient, vc.vr.id)
if err != nil {
Expand All @@ -153,7 +156,7 @@ func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.R
// Start vreplication.
errch := make(chan error, 1)
go func() {
errch <- newVPlayer(vc.vr, settings, copyState, mysql.Position{}).play(ctx)
errch <- newVPlayer(vc.vr, settings, copyState, mysql.Position{}, "catchup").play(ctx)
}()

// Wait for catchup.
Expand Down Expand Up @@ -188,6 +191,10 @@ func (vc *vcopier) catchup(ctx context.Context, copyState map[string]*sqltypes.R
// committed with the lastpk. This allows for consistent resumability.
func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState map[string]*sqltypes.Result) error {
defer vc.vr.dbClient.Rollback()
// Count this copy loop and record how long it took. Deferred calls run in
// last-in-first-out order, so the timing is recorded first and the loop
// counter is bumped second, matching the original ordering. The start time is
// captured here because arguments of a deferred call are evaluated at the
// defer statement; reading time.Now() inside a deferred closure would record
// an elapsed time of roughly zero.
defer vc.vr.stats.CopyLoopCount.Add(1)
defer vc.vr.stats.PhaseTimings.Record("copy", time.Now())

log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName])

Expand Down Expand Up @@ -249,9 +256,15 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if err := vc.vr.dbClient.Begin(); err != nil {
return err
}

// Run each bulk-insert statement through the db client while updating the
// copy-phase query metrics.
_, err = vc.tablePlan.applyBulkInsert(rows, func(sql string) (*sqltypes.Result, error) {
	start := time.Now()
	qr, err := vc.vr.dbClient.ExecuteWithRetry(ctx, sql)
	// Timing and query count are recorded for every attempt, successful or not.
	vc.vr.stats.QueryTimings.Record("copy", start)
	vc.vr.stats.QueryCount.Add("copy", 1)
	if err != nil {
		// The result is not meaningful on failure and may be nil, so it must
		// not be dereferenced for the row count.
		return qr, err
	}
	vc.vr.stats.CopyRowCount.Add(int64(qr.RowsAffected))
	return qr, nil
})
if err != nil {
return err
Expand Down Expand Up @@ -304,6 +317,9 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
}

func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltypes.Result, gtid string) error {
// Record how long the fastforward phase took. Arguments of a deferred call are
// evaluated when the defer statement executes, so time.Now() captures the
// phase start. Calling time.Now() inside a deferred closure would instead read
// the clock at return and record an elapsed time of roughly zero.
defer vc.vr.stats.PhaseTimings.Record("fastforward", time.Now())
pos, err := mysql.DecodePosition(gtid)
if err != nil {
return err
Expand All @@ -317,5 +333,5 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp
_, err := vc.vr.dbClient.Execute(update)
return err
}
return newVPlayer(vc.vr, settings, copyState, pos).play(ctx)
return newVPlayer(vc.vr, settings, copyState, pos, "fastforward").play(ctx)
}
Loading