diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index e231e503c71..501bc01b04b 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -613,6 +613,17 @@ func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTime encodeString(mysql.EncodePosition(pos)), timeUpdated, uid) } +// GenerateUpdateTime returns a statement to update time_updated and transaction_timestamp in the +// _vt.vreplication table. +func GenerateUpdateTime(uid uint32, timeUpdated int64, txTimestamp int64) (string, error) { + if timeUpdated == 0 || txTimestamp == 0 { + return "", fmt.Errorf("invalid timeUpdated or txTimestamp supplied") + } + return fmt.Sprintf( + "update _vt.vreplication set time_updated=%v, transaction_timestamp=%v where id=%v", + timeUpdated, txTimestamp, uid), nil +} + // StartVReplication returns a statement to start the replication. func StartVReplication(uid uint32) string { return fmt.Sprintf( diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index e93ae5bdc42..4ac5e3d8082 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -458,8 +458,16 @@ func expectDBClientQueries(t *testing.T, queries []string) { continue } var got string + heartbeatRe := regexp.MustCompile(`update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`) + retry: select { case got = <-globalDBQueries: + // We rule out heartbeat time update queries because otherwise our query list + // is indeterminable and varies with each test execution. + if heartbeatRe.MatchString(got) { + goto retry + } + var match bool if query[0] == '/' { result, err := regexp.MatchString(query[1:], got) @@ -493,6 +501,7 @@ func expectDBClientQueries(t *testing.T, queries []string) { func expectNontxQueries(t *testing.T, queries []string) { t.Helper() failed := false + heartbeatRe := regexp.MustCompile(`update _vt.vreplication set time_updated=\d+, transaction_timestamp=\d+ where id=\d+`) for i, query := range queries { if failed { t.Errorf("no query received, expecting %s", query) @@ -503,7 +512,7 @@ func expectNontxQueries(t *testing.T, queries []string) { select { case got = <-globalDBQueries: - if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") { + if got == "begin" || got == "commit" || got == "rollback" || strings.Contains(got, "update _vt.vreplication set pos") || heartbeatRe.MatchString(got) { goto retry } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ba442fea3c4..847276ec380 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -243,6 +243,21 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { return posReached, nil } +func (vp *vplayer) updateTime(ts int64) (err error) { + update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, time.Now().Unix(), ts) + if err != nil { + return err + } + if _, err := vp.vr.dbClient.Execute(update); err != nil { + return fmt.Errorf("error %v updating time", err) + } + vp.unsavedEvent = nil + vp.timeLastSaved = time.Now() + return nil +} + +// applyEvents is the main thread that applies the events. It has the following use + // applyEvents is the main thread that applies the events. It has the following use // cases to take into account: // * Normal transaction that has row mutations. In this case, the transaction @@ -580,7 +595,12 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m stats.Send(fmt.Sprintf("%v", event.Journal)) return io.EOF case binlogdatapb.VEventType_HEARTBEAT: - // No-op: heartbeat timings are calculated in outer loop. + if !vp.vr.dbClient.InTransaction { + err := vp.updateTime(event.Timestamp) + if err != nil { + return err + } + } } return nil } diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index f7b64a2738b..72e8ab716af 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -254,6 +254,8 @@ type ReplicationStatusResult struct { SourceLocation ReplicationLocation // TargetLocation represents the keyspace and shards that we are vreplicating into. TargetLocation ReplicationLocation + // MaxVReplicationLag represents the maximum vreplication lag seen across all shards. + MaxVReplicationLag int64 // Statuses is a map of / : ShardReplicationStatus (for the given shard). ShardStatuses map[string]*ShardReplicationStatus @@ -300,6 +302,8 @@ type ReplicationStatus struct { MaxReplicationLag int64 // DbName represents the db_name column from the _vt.vreplication table. DBName string + // TransactionTimestamp represents the transaction_timestamp column from the _vt.vreplication table. + TransactionTimestamp int64 // TimeUpdated represents the time_updated column from the _vt.vreplication table. TimeUpdated int64 // Message represents the message column from the _vt.vreplication table. @@ -311,7 +315,7 @@ type ReplicationStatus struct { func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) { var err error - var id, maxReplicationLag, timeUpdated int64 + var id, maxReplicationLag, timeUpdated, transactionTimestamp int64 var state, dbName, pos, stopPos, message string var bls binlogdatapb.BinlogSource id, err = evalengine.ToInt64(row[0]) @@ -333,19 +337,24 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty if err != nil { return nil, "", err } - message = row[8].ToString() + transactionTimestamp, err = evalengine.ToInt64(row[8]) + if err != nil { + return nil, "", err + } + message = row[9].ToString() status := &ReplicationStatus{ - Shard: master.Shard, - Tablet: master.AliasString(), - ID: id, - Bls: bls, - Pos: pos, - StopPos: stopPos, - State: state, - DBName: dbName, - MaxReplicationLag: maxReplicationLag, - TimeUpdated: timeUpdated, - Message: message, + Shard: master.Shard, + Tablet: master.AliasString(), + ID: id, + Bls: bls, + Pos: pos, + StopPos: stopPos, + State: state, + DBName: dbName, + MaxReplicationLag: maxReplicationLag, + TransactionTimestamp: transactionTimestamp, + TimeUpdated: timeUpdated, + Message: message, } status.CopyState, err = wr.getCopyState(ctx, master, id) if err != nil { @@ -361,7 +370,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( rsr.ShardStatuses = make(map[string]*ShardReplicationStatus) rsr.Workflow = workflow var results map[*topo.TabletInfo]*querypb.QueryResult - query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication" + query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication" results, err := wr.runVexec(ctx, workflow, keyspace, query, false) if err != nil { return nil, err @@ -384,8 +393,13 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( } sourceKeyspace = sk sourceShards.Insert(status.Bls.Shard) - rsrStatus = append(rsrStatus, status) + + transactionTimestamp := time.Unix(status.TransactionTimestamp, 0) + replicationLag := time.Since(transactionTimestamp) + if replicationLag.Seconds() > float64(rsr.MaxVReplicationLag) { + rsr.MaxVReplicationLag = int64(replicationLag.Seconds()) + } } si, err := wr.ts.GetShard(ctx, keyspace, master.Shard) if err != nil { diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index cb38d1570a2..415158041c6 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -19,12 +19,14 @@ package wrangler import ( "context" "fmt" + "regexp" "sort" "strings" "testing" "time" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/logutil" @@ -205,6 +207,7 @@ func TestWorkflowListStreams(t *testing.T) { "80-" ] }, + "MaxVReplicationLag": 0, "ShardStatuses": { "-80/zone1-0000000200": { "MasterReplicationStatuses": [ @@ -228,6 +231,7 @@ func TestWorkflowListStreams(t *testing.T) { "State": "Copying", "MaxReplicationLag": 0, "DBName": "vt_target", + "TransactionTimestamp": 0, "TimeUpdated": 1234, "Message": "", "CopyState": [ @@ -263,6 +267,7 @@ func TestWorkflowListStreams(t *testing.T) { "State": "Copying", "MaxReplicationLag": 0, "DBName": "vt_target", + "TransactionTimestamp": 0, "TimeUpdated": 1234, "Message": "", "CopyState": [ @@ -280,7 +285,11 @@ func TestWorkflowListStreams(t *testing.T) { } ` - require.Equal(t, want, logger.String()) + got := logger.String() + // MaxVReplicationLag needs to be reset. This can't be determinable in this kind of a test because time.Now() is constantly shifting. + re := regexp.MustCompile(`"MaxVReplicationLag": \d+`) + got = re.ReplaceAllLiteralString(got, `"MaxVReplicationLag": 0`) + require.Equal(t, want, got) results, err := wr.execWorkflowAction(ctx, workflow, keyspace, "stop", false) require.Nil(t, err) diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 497af49f336..16116f85412 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -144,11 +144,11 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2}) result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|message", - "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|varchar"), - fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|", bls), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"), + fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|0|", bls), ) - env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) + env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) env.tmc.setVRResults( master.tablet, "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'",