Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
yibin87 authored Jan 24, 2022
2 parents 091a5e4 + 18fc286 commit 55f0309
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 93 deletions.
50 changes: 41 additions & 9 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"path/filepath"
"sort"
"strings"

"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -30,12 +32,30 @@ import (

type MDDatabaseMeta struct {
Name string
SchemaFile string
SchemaFile FileInfo
Tables []*MDTableMeta
Views []*MDTableMeta
charSet string
}

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error) {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.L().Warn("failed to extract table schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
schema = nil
}
schemaStr := strings.TrimSpace(string(schema))
// set default if schema sql is empty
if len(schemaStr) == 0 {
schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
}

return schemaStr, nil
}

type MDTableMeta struct {
DB string
Name string
Expand Down Expand Up @@ -219,7 +239,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
// setup database schema
if len(s.dbSchemas) != 0 {
for _, fileInfo := range s.dbSchemas {
if _, dbExists := s.insertDB(fileInfo.TableName.Schema, fileInfo.FileMeta.Path); dbExists && s.loader.router == nil {
if _, dbExists := s.insertDB(fileInfo); dbExists && s.loader.router == nil {
return errors.Errorf("invalid database schema file, duplicated item - %s", fileInfo.FileMeta.Path)
}
}
Expand Down Expand Up @@ -406,23 +426,29 @@ func (s *mdLoaderSetup) route() error {
return nil
}

func (s *mdLoaderSetup) insertDB(dbName string, path string) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[dbName]
func (s *mdLoaderSetup) insertDB(f FileInfo) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[f.TableName.Schema]
if ok {
return s.loader.dbs[dbIndex], true
}
s.dbIndexMap[dbName] = len(s.loader.dbs)
s.dbIndexMap[f.TableName.Schema] = len(s.loader.dbs)
ptr := &MDDatabaseMeta{
Name: dbName,
SchemaFile: path,
Name: f.TableName.Schema,
SchemaFile: f,
charSet: s.loader.charSet,
}
s.loader.dbs = append(s.loader.dbs, ptr)
return ptr, false
}

func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
tableIndex, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
return dbMeta.Tables[tableIndex], dbExists, true
Expand All @@ -442,7 +468,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool
}

func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
_, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
meta := &MDTableMeta{
Expand Down
33 changes: 21 additions & 12 deletions br/pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (s *testMydumpLoaderSuite) TestTableInfoNotFound(c *C) {
loader, err := md.NewMyDumpLoader(ctx, s.cfg)
c.Assert(err, IsNil)
for _, dbMeta := range loader.GetDatabases() {
dbSQL, err := dbMeta.GetSchema(ctx, store)
c.Assert(err, IsNil)
c.Assert(dbSQL, Equals, "CREATE DATABASE IF NOT EXISTS `db`")
for _, tblMeta := range dbMeta.Tables {
sql, err := tblMeta.GetSchema(ctx, store)
c.Assert(sql, Equals, "")
Expand Down Expand Up @@ -272,8 +275,14 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) {
mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "",
Name: "db",
SchemaFile: md.FileInfo{
TableName: filter.Table{
Schema: "db",
Name: "",
},
FileMeta: md.SourceFileMeta{Type: md.SourceTypeSchemaSchema},
},
Tables: []*md.MDTableMeta{{
DB: "db",
Name: "tbl",
Expand Down Expand Up @@ -302,7 +311,7 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) {
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "db-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: ""}, FileMeta: md.SourceFileMeta{Path: "db-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "db",
Expand Down Expand Up @@ -396,7 +405,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "a1",
SchemaFile: "a1-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a1-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "a1",
Expand Down Expand Up @@ -427,11 +436,11 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "d0",
SchemaFile: "d0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d0", Name: ""}, FileMeta: md.SourceFileMeta{Path: "d0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
},
{
Name: "b",
SchemaFile: "a0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "b", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "b",
Expand All @@ -449,7 +458,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "c",
SchemaFile: "c0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: ""}, FileMeta: md.SourceFileMeta{Path: "c0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "c",
Expand All @@ -463,7 +472,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "v",
SchemaFile: "e0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: ""}, FileMeta: md.SourceFileMeta{Path: "e0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "v",
Expand Down Expand Up @@ -552,7 +561,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "d1",
SchemaFile: filepath.FromSlash("d1/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d1",
Expand Down Expand Up @@ -605,7 +614,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
},
{
Name: "d2",
SchemaFile: filepath.FromSlash("d2/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d2", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d2",
Expand Down Expand Up @@ -659,7 +668,7 @@ func (s *testMydumpLoaderSuite) TestInputWithSpecialChars(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: `db"`,
SchemaFile: filepath.FromSlash("db%22-schema-create.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: `db"`, Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("db%22-schema-create.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: `db"`,
Expand All @@ -681,7 +690,7 @@ func (s *testMydumpLoaderSuite) TestInputWithSpecialChars(c *C) {
},
{
Name: "test",
SchemaFile: filepath.FromSlash("test-schema-create.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: `test`, Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("test-schema-create.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "test",
Expand Down
69 changes: 26 additions & 43 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,7 @@ type schemaJob struct {
dbName string
tblName string // empty for create db jobs
stmtType schemaStmtType
stmts []*schemaStmt
}

type schemaStmt struct {
sql string
stmts []string
}

type restoreSchemaWorker struct {
Expand All @@ -518,6 +514,15 @@ type restoreSchemaWorker struct {
store storage.ExternalStorage
}

func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.glue.GetParser(), sqlStr, job.dbName, job.tblName)
if err != nil {
return err
}
job.stmts = stmts
return worker.appendJob(job)
}

func (worker *restoreSchemaWorker) makeJobs(
dbMetas []*mydump.MDDatabaseMeta,
getTables func(context.Context, string) ([]*model.TableInfo, error),
Expand All @@ -529,15 +534,15 @@ func (worker *restoreSchemaWorker) makeJobs(
var err error
// 1. restore databases, execute statements concurrency
for _, dbMeta := range dbMetas {
restoreSchemaJob := &schemaJob{
sql, err := dbMeta.GetSchema(worker.ctx, worker.store)
if err != nil {
return err
}
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: "",
stmtType: schemaCreateDatabase,
stmts: make([]*schemaStmt, 0, 1),
}
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: createDatabaseIfNotExistStmt(dbMeta.Name),
})
err = worker.appendJob(restoreSchemaJob)
if err != nil {
return err
}
Expand All @@ -563,30 +568,19 @@ func (worker *restoreSchemaWorker) makeJobs(
return errors.Errorf("table `%s`.`%s` schema not found", dbMeta.Name, tblMeta.Name)
}
sql, err := tblMeta.GetSchema(worker.ctx, worker.store)
if err != nil {
return err
}
if sql != "" {
stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, tblMeta.Name)
if err != nil {
return err
}
restoreSchemaJob := &schemaJob{
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: tblMeta.Name,
stmtType: schemaCreateTable,
stmts: make([]*schemaStmt, 0, len(stmts)),
}
for _, sql := range stmts {
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: sql,
})
}
err = worker.appendJob(restoreSchemaJob)
})
if err != nil {
return err
}
}
if err != nil {
return err
}
}
}
err = worker.wait()
Expand All @@ -598,22 +592,11 @@ func (worker *restoreSchemaWorker) makeJobs(
for _, viewMeta := range dbMeta.Views {
sql, err := viewMeta.GetSchema(worker.ctx, worker.store)
if sql != "" {
stmts, err := createTableIfNotExistsStmt(worker.glue.GetParser(), sql, dbMeta.Name, viewMeta.Name)
if err != nil {
return err
}
restoreSchemaJob := &schemaJob{
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
tblName: viewMeta.Name,
stmtType: schemaCreateView,
stmts: make([]*schemaStmt, 0, len(stmts)),
}
for _, sql := range stmts {
restoreSchemaJob.stmts = append(restoreSchemaJob.stmts, &schemaStmt{
sql: sql,
})
}
err = worker.appendJob(restoreSchemaJob)
})
if err != nil {
return err
}
Expand Down Expand Up @@ -674,8 +657,8 @@ loop:
DB: session,
}
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt.sql))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt.sql)
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt)
task.End(zap.ErrorLevel, err)
if err != nil {
err = errors.Annotatef(err, "%s %s failed", job.stmtType.String(), common.UniqueTable(job.dbName, job.tblName))
Expand Down Expand Up @@ -735,7 +718,7 @@ func (worker *restoreSchemaWorker) appendJob(job *schemaJob) error {
case <-worker.ctx.Done():
// cancel the job
worker.wg.Done()
return worker.ctx.Err()
return errors.Trace(worker.ctx.Err())
case worker.jobCh <- job:
return nil
}
Expand Down
14 changes: 5 additions & 9 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ loopCreate:
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createTableIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
sqlCreateStmts, err = createIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}
Expand All @@ -182,14 +182,7 @@ loopCreate:
return errors.Trace(err)
}

func createDatabaseIfNotExistStmt(dbName string) string {
var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, dbName)
return createDatabase.String()
}

func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
stmts, _, err := p.ParseSQL(createTable)
if err != nil {
return []string{}, err
Expand All @@ -201,6 +194,9 @@ func createTableIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName s
retStmts := make([]string, 0, len(stmts))
for _, stmt := range stmts {
switch node := stmt.(type) {
case *ast.CreateDatabaseStmt:
node.Name = dbName
node.IfNotExists = true
case *ast.CreateTableStmt:
node.Table.Schema = model.NewCIStr(dbName)
node.Table.Name = model.NewCIStr(tblName)
Expand Down
Loading

0 comments on commit 55f0309

Please sign in to comment.