Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update VReplication Timestamp w/ Heartbeat #6635

Merged
11 changes: 11 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,17 @@ func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTime
encodeString(mysql.EncodePosition(pos)), timeUpdated, uid)
}

// GenerateUpdateTime returns a statement to update time_updated and transaction_timestamp in the
// _vt.vreplication table.
func GenerateUpdateTime(uid uint32, timeUpdated int64, txTimestamp int64) (string, error) {
if timeUpdated == 0 || txTimestamp == 0 {
return "", fmt.Errorf("invalid timeUpdated or txTimestamp supplied")
}
return fmt.Sprintf(
"update _vt.vreplication set time_updated=%v, transaction_timestamp=%v where id=%v",
timeUpdated, txTimestamp, uid), nil
}

// StartVReplication returns a statement to start the replication.
func StartVReplication(uid uint32) string {
return fmt.Sprintf(
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,16 @@ func expectDBClientQueries(t *testing.T, queries []string) {
continue
}
var got string
heartbeatRe := regexp.MustCompile(`update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly make the query case insensitive? Granted, _vt.vreplication is case sensitive, but all other elements, keywords & columns, are not.

Suggested change
heartbeatRe := regexp.MustCompile(`update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`)
heartbeatRe := regexp.MustCompile(`(?i)update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we are the ones writing the heartbeat queries and they are always written as lower case. If there actually was a query written where that wasn't the case, it would be a sure sign that it was not a heartbeat update query. Therefore, I think it being case sensitive is a better catch method.

retry:
select {
case got = <-globalDBQueries:
// We rule out heartbeat time update queries because otherwise our query list
// is indeterminable and varies with each test execution.
if heartbeatRe.MatchString(got) {
goto retry
}

var match bool
if query[0] == '/' {
result, err := regexp.MatchString(query[1:], got)
Expand Down Expand Up @@ -493,6 +501,7 @@ func expectDBClientQueries(t *testing.T, queries []string) {
func expectNontxQueries(t *testing.T, queries []string) {
t.Helper()
failed := false
heartbeatRe := regexp.MustCompile(`update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`)
for i, query := range queries {
if failed {
t.Errorf("no query received, expecting %s", query)
Expand All @@ -503,7 +512,7 @@ func expectNontxQueries(t *testing.T, queries []string) {
select {
case got = <-globalDBQueries:

if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") {
if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps the query gets lower cased beforehand? Otherwise, BEGIN, COMMIT, ROLLBACK should also be accepted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were already here so I assume whoever built this had some wisdom with writing this catch. I believe that we always write them as lowercase (in vplayer), so this is safe. It's also just testing logic so if it breaks at some point, whoever broke it (by changing the casing that vplayer writes queries with) could adjust it then to fit their needs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any risk at all for some query to contain the text update _vt.vreplication set pos? Does it make sense to convert strings.Contains() to strings.HasPrefix()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I'm not sure why the original author of this test helper used Contains. It might be in an effort to strip potential whitespace? Or potentially to strip leading / which somehow end up in some of the test queries at the start?

goto retry
}

Expand Down
22 changes: 21 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,21 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
return posReached, nil
}

func (vp *vplayer) updateTime(ts int64) (err error) {
update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, time.Now().Unix(), ts)
if err != nil {
return err
}
if _, err := vp.vr.dbClient.Execute(update); err != nil {
return fmt.Errorf("error %v updating time", err)
}
vp.unsavedEvent = nil
vp.timeLastSaved = time.Now()
return nil
}

// applyEvents is the main thread that applies the events. It has the following use

// applyEvents is the main thread that applies the events. It has the following use
// cases to take into account:
// * Normal transaction that has row mutations. In this case, the transaction
Expand Down Expand Up @@ -580,7 +595,12 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
stats.Send(fmt.Sprintf("%v", event.Journal))
return io.EOF
case binlogdatapb.VEventType_HEARTBEAT:
// No-op: heartbeat timings are calculated in outer loop.
if !vp.vr.dbClient.InTransaction {
err := vp.updateTime(event.Timestamp)
if err != nil {
return err
}
}
}
return nil
}
44 changes: 29 additions & 15 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ type ReplicationStatusResult struct {
SourceLocation ReplicationLocation
// TargetLocation represents the keyspace and shards that we are vreplicating into.
TargetLocation ReplicationLocation
// MaxVReplicationLag represents the maximum vreplication lag seen across all shards.
MaxVReplicationLag int64

// Statuses is a map of <shard>/<master tablet alias> : ShardReplicationStatus (for the given shard).
ShardStatuses map[string]*ShardReplicationStatus
Expand Down Expand Up @@ -300,6 +302,8 @@ type ReplicationStatus struct {
MaxReplicationLag int64
// DbName represents the db_name column from the _vt.vreplication table.
DBName string
// TransactionTimestamp represents the transaction_timestamp column from the _vt.vreplication table.
TransactionTimestamp int64
// TimeUpdated represents the time_updated column from the _vt.vreplication table.
TimeUpdated int64
// Message represents the message column from the _vt.vreplication table.
Expand All @@ -311,7 +315,7 @@ type ReplicationStatus struct {

func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) {
var err error
var id, maxReplicationLag, timeUpdated int64
var id, maxReplicationLag, timeUpdated, transactionTimestamp int64
var state, dbName, pos, stopPos, message string
var bls binlogdatapb.BinlogSource
id, err = evalengine.ToInt64(row[0])
Expand All @@ -333,19 +337,24 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
if err != nil {
return nil, "", err
}
message = row[8].ToString()
transactionTimestamp, err = evalengine.ToInt64(row[8])
if err != nil {
return nil, "", err
}
message = row[9].ToString()
status := &ReplicationStatus{
Shard: master.Shard,
Tablet: master.AliasString(),
ID: id,
Bls: bls,
Pos: pos,
StopPos: stopPos,
State: state,
DBName: dbName,
MaxReplicationLag: maxReplicationLag,
TimeUpdated: timeUpdated,
Message: message,
Shard: master.Shard,
Tablet: master.AliasString(),
ID: id,
Bls: bls,
Pos: pos,
StopPos: stopPos,
State: state,
DBName: dbName,
MaxReplicationLag: maxReplicationLag,
TransactionTimestamp: transactionTimestamp,
TimeUpdated: timeUpdated,
Message: message,
}
status.CopyState, err = wr.getCopyState(ctx, master, id)
if err != nil {
Expand All @@ -361,7 +370,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
rsr.ShardStatuses = make(map[string]*ShardReplicationStatus)
rsr.Workflow = workflow
var results map[*topo.TabletInfo]*querypb.QueryResult
query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication"
query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication"
results, err := wr.runVexec(ctx, workflow, keyspace, query, false)
if err != nil {
return nil, err
Expand All @@ -384,8 +393,13 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
}
sourceKeyspace = sk
sourceShards.Insert(status.Bls.Shard)

rsrStatus = append(rsrStatus, status)

transactionTimestamp := time.Unix(status.TransactionTimestamp, 0)
replicationLag := time.Since(transactionTimestamp)
if replicationLag.Seconds() > float64(rsr.MaxVReplicationLag) {
rsr.MaxVReplicationLag = int64(replicationLag.Seconds())
}
}
si, err := wr.ts.GetShard(ctx, keyspace, master.Shard)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package wrangler
import (
"context"
"fmt"
"regexp"
"sort"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -205,6 +207,7 @@ func TestWorkflowListStreams(t *testing.T) {
"80-"
]
},
"MaxVReplicationLag": 0,
"ShardStatuses": {
"-80/zone1-0000000200": {
"MasterReplicationStatuses": [
Expand All @@ -228,6 +231,7 @@ func TestWorkflowListStreams(t *testing.T) {
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"Message": "",
"CopyState": [
Expand Down Expand Up @@ -263,6 +267,7 @@ func TestWorkflowListStreams(t *testing.T) {
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"Message": "",
"CopyState": [
Expand All @@ -280,7 +285,11 @@ func TestWorkflowListStreams(t *testing.T) {
}

`
require.Equal(t, want, logger.String())
got := logger.String()
// MaxVReplicationLag needs to be reset. This can't be determinable in this kind of a test because time.Now() is constantly shifting.
re := regexp.MustCompile(`"MaxVReplicationLag": \d+`)
got = re.ReplaceAllLiteralString(got, `"MaxVReplicationLag": 0`)
require.Equal(t, want, got)

results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false)
require.Nil(t, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2})

result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|message",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|varchar"),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|", bls),
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|0|", bls),
)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(
master.tablet,
"select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'",
Expand Down