Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema tracking: initial schema insert #6435

Merged
merged 6 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type DB struct {
// connections tracks all open connections.
// The key for the map is the value of mysql.Conn.ConnectionID.
connections map[uint32]*mysql.Conn

// queryPatternUserCallback stores optional callbacks when a query with a pattern is called
queryPatternUserCallback map[*regexp.Regexp]func(string)
}

// QueryHandler is the interface used by the DB to simulate executed queries
Expand Down Expand Up @@ -157,13 +160,14 @@ func New(t *testing.T) *DB {

// Create our DB.
db := &DB{
t: t,
socketFile: socketFile,
name: "fakesqldb",
data: make(map[string]*ExpectedResult),
rejectedData: make(map[string]error),
queryCalled: make(map[string]int),
connections: make(map[uint32]*mysql.Conn),
t: t,
socketFile: socketFile,
name: "fakesqldb",
data: make(map[string]*ExpectedResult),
rejectedData: make(map[string]error),
queryCalled: make(map[string]int),
connections: make(map[uint32]*mysql.Conn),
queryPatternUserCallback: make(map[*regexp.Regexp]func(string)),
}

db.Handler = db
Expand Down Expand Up @@ -344,7 +348,6 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)

// Check if we should close the connection and provoke errno 2013.
if db.shouldClose {
c.Close()
Expand Down Expand Up @@ -384,6 +387,10 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
// Check query patterns from AddQueryPattern().
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
if ok {
userCallback(query)
}
return callback(pat.result)
}
}
Expand Down Expand Up @@ -500,6 +507,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu
db.patternData = append(db.patternData, exprResult{expr, &result})
}

// AddQueryPatternWithCallback is similar to AddQueryPattern: in addition it calls the provided callback function
// The callback can be used to set user counters/variables for testing specific usecases
func (db *DB) AddQueryPatternWithCallback(queryPattern string, expectedResult *sqltypes.Result, callback func(string)) {
db.AddQueryPattern(queryPattern, expectedResult)
db.queryPatternUserCallback[db.patternData[len(db.patternData)-1].expr] = callback
}

// DeleteQuery deletes query from the fake DB.
func (db *DB) DeleteQuery(query string) {
db.mu.Lock()
Expand Down
12 changes: 10 additions & 2 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ type VtctlClientProcess struct {

// InitShardMaster executes vtctlclient command to make one of tablet as master
func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) {
return vtctlclient.ExecuteCommand(
output, err := vtctlclient.ExecuteCommandWithOutput(
"InitShardMaster",
"-force",
fmt.Sprintf("%s/%s", Keyspace, Shard),
fmt.Sprintf("%s-%d", Cell, TabletUID))
if err != nil {
log.Errorf("error in InitShardMaster output %s, err %s", output, err.Error())
}
return err
}

// ApplySchema applies SQL schema to the keyspace
Expand Down Expand Up @@ -73,7 +77,11 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error
pArgs...,
)
log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " "))
return tmpProcess.Run()
output, err := tmpProcess.Output()
if err != nil {
log.Errorf("Error executing %s: output %s, err %v", strings.Join(tmpProcess.Args, " "), output, err)
}
return err
}

// ExecuteCommandWithOutput executes any vtctlclient command and returns output
Expand Down
88 changes: 0 additions & 88 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,94 +320,6 @@ func TestSchemaVersioning(t *testing.T) {
log.Info("=== END OF TEST")
}

func TestSchemaVersioningLongDDL(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
tsv := framework.Server
tsv.EnableHistorian(false)
tsv.SetTracking(false)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsv.EnableHistorian(true)
tsv.SetTracking(true)

target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
TabletType: tabletpb.TabletType_MASTER,
Cell: "",
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
}
longDDL := "create table vitess_version ("
for i := 0; i < 100; i++ {
col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10))
if i != 99 {
col += ", "
}
longDDL += col
}
longDDL += ")"

var cases = []test{
{
query: longDDL,
output: append(append([]string{
`gtid`, //gtid+other => vstream current pos
`other`,
`gtid`, //gtid+ddl => actual query
fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
),
},
}
eventCh := make(chan []*binlogdatapb.VEvent)
var startPos string
send := func(events []*binlogdatapb.VEvent) error {
var evs []*binlogdatapb.VEvent
for _, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
if startPos == "" {
startPos = event.Gtid
}
}
if event.Type == binlogdatapb.VEventType_HEARTBEAT {
continue
}
log.Infof("Received event %v", event)
evs = append(evs, event)
}
select {
case eventCh <- evs:
case <-ctx.Done():
return nil
}
return nil
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
}()
runCases(ctx, t, cases, eventCh)

cancel()

client := framework.NewClient()
client.Execute("drop table vitess_version", nil)
client.Execute("drop table _vt.schema_version", nil)
}

func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) {
client := framework.NewClient()

Expand Down
64 changes: 63 additions & 1 deletion go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/vt/sqlparser"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -97,6 +99,7 @@ func (tr *Tracker) Open() {
tr.cancel = cancel
tr.wg.Add(1)
log.Info("Schema tracker enabled.")

go tr.process(ctx)
}

Expand Down Expand Up @@ -130,6 +133,10 @@ func (tr *Tracker) Enable(enabled bool) {
func (tr *Tracker) process(ctx context.Context) {
defer tr.env.LogError()
defer tr.wg.Done()
if err := tr.possiblyInsertInitialSchema(ctx); err != nil {
log.Errorf("possiblyInsertInitialSchema eror: %v", err)
return
}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -163,14 +170,69 @@ func (tr *Tracker) process(ctx context.Context) {
}
}

func (tr *Tracker) currentPosition(ctx context.Context) (mysql.Position, error) {
conn, err := tr.engine.cp.Connect(ctx)
if err != nil {
return mysql.Position{}, err
}
defer conn.Close()
return conn.MasterPosition()
}

func (tr *Tracker) isSchemaVersionTableEmpty(ctx context.Context) (bool, error) {
conn, err := tr.engine.GetConnection(ctx)
if err != nil {
return false, err
}
defer conn.Recycle()
result, err := withDDL.Exec(ctx, "select id from _vt.schema_version limit 1", conn.Exec)
if err != nil {
return false, err
}
if len(result.Rows) == 0 {
return true, nil
}
return false, nil
}

// possiblyInsertInitialSchema stores the latest schema when a tracker starts and the schema_version table is empty
// this enables the right schema to be available between the time the tracker starts first and the first DDL is applied
func (tr *Tracker) possiblyInsertInitialSchema(ctx context.Context) error {
var err error
needsWarming, err := tr.isSchemaVersionTableEmpty(ctx)
if err != nil {
return err
}
if !needsWarming { // _vt.schema_version is not empty, nothing to do here
return nil
}
if err = tr.engine.Reload(ctx); err != nil {
return err
}

timestamp := time.Now().UnixNano() / 1e9
ddl := ""
pos, err := tr.currentPosition(ctx)
if err != nil {
return err
}
gtid := mysql.EncodePosition(pos)
log.Infof("Saving initial schema for gtid %s", gtid)

return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp)
}

func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error {
log.Infof("Processing schemaUpdated event for gtid %s, ddl %s", gtid, ddl)
if gtid == "" || ddl == "" {
return fmt.Errorf("got invalid gtid or ddl in schemaUpdated")
}
ctx := context.Background()

// Engine will have reloaded the schema because vstream will reload it on a DDL
return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp)
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
Expand Down
63 changes: 58 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,35 @@ package schema
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func TestTracker(t *testing.T) {
initialSchemaInserted := false
se, db, cancel := getTestSchemaEngine(t)
defer cancel()

gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"
ddl1 := "create table tracker_test (id int)"
query := "CREATE TABLE IF NOT EXISTS _vt.schema_version.*"
db.AddQueryPattern(query, &sqltypes.Result{})

db.AddQueryPattern("insert into _vt.schema_version.*", &sqltypes.Result{})

db.AddQueryPattern("insert into _vt.schema_version.*1-10.*", &sqltypes.Result{})
db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) {
initialSchemaInserted = true
})
// simulates empty schema_version table, so initial schema should be inserted
db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{Rows: [][]sqltypes.Value{}})
// called to get current position
db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"",
"varchar"),
"7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3",
))
vs := &fakeVstreamer{
done: make(chan struct{}),
events: [][]*binlogdatapb.VEvent{{
Expand Down Expand Up @@ -74,7 +85,49 @@ func TestTracker(t *testing.T) {
tracker.Close()
// Two of those events should have caused an error.
final := env.Stats().ErrorCounters.Counts()["INTERNAL"]
assert.Equal(t, initial+2, final)
require.Equal(t, initial+2, final)
require.True(t, initialSchemaInserted)
}

func TestTrackerShouldNotInsertInitialSchema(t *testing.T) {
initialSchemaInserted := false
se, db, cancel := getTestSchemaEngine(t)
gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"

defer cancel()
// simulates existing rows in schema_version, so initial schema should not be inserted
db.AddQuery("select id from _vt.schema_version limit 1", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id",
"int"),
"1",
))
// called to get current position
db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"",
"varchar"),
"7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3",
))
db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) {
initialSchemaInserted = true
})
vs := &fakeVstreamer{
done: make(chan struct{}),
events: [][]*binlogdatapb.VEvent{{
{
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid1,
},
}},
}
config := se.env.Config()
config.TrackSchemaVersions = true
env := tabletenv.NewEnv(config, "TrackerTest")
tracker := NewTracker(env, vs, se)
tracker.Open()
<-vs.done
cancel()
tracker.Close()
require.False(t, initialSchemaInserted)
}

var _ VStreamer = (*fakeVstreamer)(nil)
Expand Down