Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge branch 'upstream-master' into slack-sync-2019-01-30
Browse files Browse the repository at this point in the history
  • Loading branch information
rafael committed Jan 31, 2019
2 parents a74cdc1 + 5867ebe commit 4a1cdfc
Show file tree
Hide file tree
Showing 53 changed files with 4,277 additions and 615 deletions.
16 changes: 14 additions & 2 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,14 +853,26 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
max := int((((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0xff))
// Length is encoded in 1 or 2 bytes.
if max > 255 {
// This code path exists due to https://bugs.mysql.com/bug.php?id=37426.
// CHAR types need to allocate 3 bytes per char. So, the length for CHAR(255)
// cannot be represented in 1 byte. This also means that this rule does not
// apply to BINARY data.
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
data[pos+2:pos+2+l]), l + 2, nil
}
l := int(data[pos])
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
data[pos+1:pos+1+l]), l + 1, nil
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// Fixed length binaries have to be padded with zeroes
// up to the length of the field. Otherwise, equality checks
// fail against saved data. See https://github.com/vitessio/vitess/issues/3984.
ret := make([]byte, max)
copy(ret, mdata)
return sqltypes.MakeTrusted(querypb.Type_BINARY, ret), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil

case TypeGeometry:
l := 0
Expand Down
34 changes: 20 additions & 14 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func NewBinlogPlayerTables(dbClient DBClient, tablet *topodatapb.Tablet, tables
// If an error is encountered, it updates the vreplication state to "Error".
// If a stop position was specifed, and reached, the state is updated to "Stopped".
func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
if err := setVReplicationState(blp.dbClient, blp.uid, BlpRunning, ""); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpRunning, ""); err != nil {
log.Errorf("Error writing Running state: %v", err)
}

Expand All @@ -180,7 +180,7 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
Time: time.Now(),
Message: msg,
})
if err := setVReplicationState(blp.dbClient, blp.uid, BlpError, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpError, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return err
Expand All @@ -191,7 +191,7 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
// applyEvents returns a recordable status message on termination or an error otherwise.
func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
// Read starting values for vreplication.
pos, stopPos, maxTPS, maxReplicationLag, err := readVRSettings(blp.dbClient, blp.uid)
pos, stopPos, maxTPS, maxReplicationLag, err := ReadVRSettings(blp.dbClient, blp.uid)
if err != nil {
log.Error(err)
return err
Expand Down Expand Up @@ -244,14 +244,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
case blp.position.Equal(blp.stopPosition):
msg := fmt.Sprintf("not starting BinlogPlayer, we're already at the desired position %v", blp.stopPosition)
log.Info(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return nil
case blp.position.AtLeast(blp.stopPosition):
msg := fmt.Sprintf("starting point %v greater than stopping point %v", blp.position, blp.stopPosition)
log.Error(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
// Don't return an error. Otherwise, it will keep retrying.
Expand Down Expand Up @@ -351,7 +351,7 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
if blp.position.AtLeast(blp.stopPosition) {
msg := "Reached stopping position, done playing logs"
log.Info(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return nil
Expand Down Expand Up @@ -447,7 +447,7 @@ func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransactio
}

now := time.Now().Unix()
updateRecovery := updateVReplicationPos(blp.uid, position, now, tx.EventToken.Timestamp)
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp)

qr, err := blp.exec(updateRecovery)
if err != nil {
Expand Down Expand Up @@ -503,18 +503,18 @@ func CreateVReplicationTable() []string {
) ENGINE=InnoDB`}
}

// setVReplicationState updates the state in the _vt.vreplication table.
func setVReplicationState(dbClient DBClient, uid uint32, state, message string) error {
// SetVReplicationState updates the state in the _vt.vreplication table.
func SetVReplicationState(dbClient DBClient, uid uint32, state, message string) error {
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(message), uid)
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
return nil
}

// readVRSettings retrieves the throttler settings for
// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
func readVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
func ReadVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
Expand Down Expand Up @@ -554,9 +554,9 @@ func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSourc
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped)
}

// updateVReplicationPos returns a statement to update a value in the
// GenerateUpdatePos returns a statement to update a value in the
// _vt.vreplication table.
func updateVReplicationPos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
if txTimestamp != 0 {
return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v where id=%v",
Expand Down Expand Up @@ -601,11 +601,17 @@ func encodeString(in string) string {
}

// ReadVReplicationPos returns a statement to query the gtid for a
// given shard from the _vt.vreplication table.
// given stream from the _vt.vreplication table.
func ReadVReplicationPos(index uint32) string {
return fmt.Sprintf("select pos from _vt.vreplication where id=%v", index)
}

// ReadVReplicationStatus returns a statement to query the status fields for a
// given stream from the _vt.vreplication table.
func ReadVReplicationStatus(index uint32) string {
return fmt.Sprintf("select pos, state, message from _vt.vreplication where id=%v", index)
}

// StatsHistoryRecord is used to store a Message with timestamp
type StatsHistoryRecord struct {
Time time.Time
Expand Down
14 changes: 11 additions & 3 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestUpdateVReplicationPos(t *testing.T) {
"set pos='MariaDB/0-1-8283', time_updated=88822 " +
"where id=78522"

got := updateVReplicationPos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -367,7 +367,7 @@ func TestUpdateVReplicationTimestamp(t *testing.T) {
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828 " +
"where id=78522"

got := updateVReplicationPos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -377,6 +377,14 @@ func TestReadVReplicationPos(t *testing.T) {
want := "select pos from _vt.vreplication where id=482821"
got := ReadVReplicationPos(482821)
if got != want {
t.Errorf("ReadVReplicationThrottlerSettings(482821) = %#v, want %#v", got, want)
t.Errorf("ReadVReplicationPos(482821) = %#v, want %#v", got, want)
}
}

func TestReadVReplicationStatus(t *testing.T) {
want := "select pos, state, message from _vt.vreplication where id=482821"
got := ReadVReplicationStatus(482821)
if got != want {
t.Errorf("ReadVReplicationStatus(482821) = %#v, want %#v", got, want)
}
}
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,16 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str
return tmutils.FilterTables(fmd.Schema, tables, excludeTables, includeViews)
}

// GetColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetColumns(dbName, table string) ([]string, error) {
return []string{}, nil
}

// GetPrimaryKeyColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(dbName, table string) ([]string, error) {
return []string{}, nil
}

// PreflightSchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
if fmd.PreflightSchemaChangeResult == nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type MysqlDaemon interface {

// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(dbName, table string) ([]string, error)
GetPrimaryKeyColumns(dbName, table string) ([]string, error)
PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
Loading

0 comments on commit 4a1cdfc

Please sign in to comment.