From abc18e435274ff9a5401c76b1ffd43b2ef3e4aea Mon Sep 17 00:00:00 2001 From: David Chen Date: Mon, 11 Jun 2018 16:41:06 +0800 Subject: [PATCH] *: refine log and enable retry with sql execution (#43) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Goal: Improve stability Improve log readability What changes are: Refine the log, so we can know the processing stage of each table. Enable retry for sql execution and grpc api calling. Remove useless code. --- lightning/common/util.go | 131 ++++++++++++++- lightning/kv/kv-deliver.go | 285 ++++++++------------------------ lightning/kv/sql2kv.go | 2 +- lightning/lightning.go | 2 +- lightning/mydump/loader.go | 6 +- lightning/mydump/reader.go | 12 +- lightning/mydump/reader_test.go | 3 +- lightning/mydump/region.go | 3 +- lightning/restore/restore.go | 170 ++++++++----------- lightning/restore/tidb.go | 58 ++++--- lightning/sql/parser_test.go | 3 +- 11 files changed, 317 insertions(+), 358 deletions(-) diff --git a/lightning/common/util.go b/lightning/common/util.go index 6378457b9c180..b5ef21edd6730 100644 --- a/lightning/common/util.go +++ b/lightning/common/util.go @@ -1,16 +1,27 @@ package common import ( + "context" "fmt" + "net" "os" "strings" + "time" "database/sql" + "database/sql/driver" "path/filepath" - _ "github.com/go-sql-driver/mysql" + "github.com/go-sql-driver/mysql" "github.com/juju/errors" - "github.com/ngaut/log" + tmysql "github.com/pingcap/tidb/mysql" + log "github.com/sirupsen/logrus" +) + +const ( + retryTimeout = 3 * time.Second + + defaultMaxRetry = 3 ) func Percent(a int, b int) string { @@ -30,13 +41,13 @@ func ConnectDB(host string, port int, user string, psw string) (*sql.DB, error) func GetFileSize(file string) (int64, error) { fd, err := os.Open(file) if err != nil { - return -1, err + return -1, errors.Trace(err) } defer fd.Close() fstat, err := fd.Stat() if err != nil { - return -1, err + return -1, errors.Trace(err) } return fstat.Size(), nil @@ -59,7 +70,7 @@ func IsDirExists(name string) bool { func EnsureDir(dir string) error { if !FileExists(dir) { if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return err + return errors.Trace(err) } } return nil @@ -90,3 +101,113 @@ func ListFiles(dir string) map[string]string { return files } + +func QueryRowWithRetry(ctx context.Context, db *sql.DB, query string, dest ...interface{}) (err error) { + maxRetry := defaultMaxRetry + for i := 0; i < maxRetry; i++ { + if i > 0 { + log.Warnf("query %s retry %d: %v", query, i, dest) + time.Sleep(retryTimeout) + } + + err = db.QueryRowContext(ctx, query).Scan(dest...) + if err != nil { + if !isRetryableError(err) { + return errors.Trace(err) + } + log.Warnf("query %s [error] %v", query, err) + continue + } + + return nil + } + + return errors.Errorf("query sql [%s] failed", query) +} + +// ExecWithRetry executes sqls with optional retry. 
+func ExecWithRetry(ctx context.Context, db *sql.DB, sqls []string) error { + maxRetry := defaultMaxRetry + + if len(sqls) == 0 { + return nil + } + + var err error + for i := 0; i < maxRetry; i++ { + if i > 0 { + log.Warnf("sql stmt_exec retry %d: %v", i, sqls) + time.Sleep(retryTimeout) + } + + if err = executeSQLImp(ctx, db, sqls); err != nil { + if isRetryableError(err) { + continue + } + log.Errorf("[exec][sql] %s [error] %v", sqls, err) + return errors.Trace(err) + } + + return nil + } + + return errors.Errorf("exec sqls [%v] failed, err:%s", sqls, err.Error()) +} + +func executeSQLImp(ctx context.Context, db *sql.DB, sqls []string) error { + txn, err := db.BeginTx(ctx, nil) + if err != nil { + log.Errorf("exec sqls [%v] begin failed %v", sqls, errors.ErrorStack(err)) + return errors.Trace(err) + } + + for i := range sqls { + log.Debugf("[exec][sql] %s", sqls[i]) + + _, err = txn.ExecContext(ctx, sqls[i]) + if err != nil { + log.Warnf("[exec][sql] %s [error]%v", sqls[i], err) + rerr := txn.Rollback() + if rerr != nil { + log.Errorf("[exec][sql] %s [error] %v", sqls[i], rerr) + } + // we should return the exec err, instead of the rollback rerr. + return errors.Trace(err) + } + } + err = txn.Commit() + if err != nil { + log.Errorf("exec sqls [%v] commit failed %v", sqls, errors.ErrorStack(err)) + return errors.Trace(err) + } + return nil +} + +func isRetryableError(err error) bool { + err = errors.Cause(err) + if err == driver.ErrBadConn { + return true + } + + if nerr, ok := err.(net.Error); ok { + return nerr.Timeout() + } + + mysqlErr, ok := err.(*mysql.MySQLError) + if ok { + switch mysqlErr.Number { + // ErrLockDeadlock can retry to commit while meet deadlock + case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable: + return true + default: + return false + } + } + + return true +} + +// UniqueTable returns an unique table name. +func UniqueTable(schema string, table string) string { + return fmt.Sprintf("`%s`.`%s`", schema, table) +} diff --git a/lightning/kv/kv-deliver.go b/lightning/kv/kv-deliver.go index 1096d7dfdf05d..977f838c9f6de 100644 --- a/lightning/kv/kv-deliver.go +++ b/lightning/kv/kv-deliver.go @@ -1,13 +1,12 @@ package kv import ( - "fmt" "math" "sync" - "sync/atomic" "time" "github.com/juju/errors" + "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/importpb" kvec "github.com/pingcap/tidb/util/kvencoder" log "github.com/sirupsen/logrus" @@ -23,9 +22,10 @@ var ( ) const ( - _G uint64 = 1 << 30 - flushSizeLimit uint64 = 1 * _G - maxRetryTimes int = 3 + _G uint64 = 1 << 30 + flushSizeLimit uint64 = 1 * _G + maxRetryTimes int = 3 // tikv-importer has done retry internally. so we don't retry many times. + retryBackoffTime = time.Second * 3 ) var ( @@ -64,162 +64,6 @@ type deliverTask struct { // TODO .. callback ? 
} -type PipeKvDeliver struct { - wg sync.WaitGroup - mux sync.Mutex - - ctx context.Context - shutdown context.CancelFunc - - uuid uuid.UUID - deliver *KVDeliverClient - tasks chan *deliverTask - - sumPuts uint32 - sumKVSize uint64 -} - -func NewPipeKvDeliver(uuid uuid.UUID, importServerAddr string, pdAddr string) (*PipeKvDeliver, error) { - ctx, shutdown := context.WithCancel(context.Background()) - - deliver, err := NewKVDeliverClient(context.Background(), uuid, importServerAddr, pdAddr) - if err != nil { - return nil, errors.Trace(err) - } - - p := &PipeKvDeliver{ - ctx: ctx, - shutdown: shutdown, - uuid: uuid, - deliver: deliver, - tasks: make(chan *deliverTask, 128), - } - p.start() - return p, nil -} - -func (p *PipeKvDeliver) Close() error { - p.shutdown() - p.wg.Wait() - return p.deliver.Close() -} - -func (p *PipeKvDeliver) CloseAndWait() error { - p.shutdown() - p.wg.Wait() - - for len(p.tasks) > 0 { - select { - case task := <-p.tasks: - p.handle(task) - default: - } - } - - return p.deliver.Close() -} - -func (p *PipeKvDeliver) Put(kvs []kvec.KvPair) error { - atomic.AddUint32(&p.sumPuts, 1) - p.tasks <- &deliverTask{ - op: opPut, - kvs: kvs, - retry: 0, - } - return nil -} - -func (p *PipeKvDeliver) Flush() error { - p.tasks <- &deliverTask{op: opFlush} - return nil -} - -func (p *PipeKvDeliver) Cleanup() error { - p.tasks <- &deliverTask{op: opCleanup} - return nil -} - -func (p *PipeKvDeliver) start() { - p.wg.Add(1) - go func() { - defer p.wg.Done() - p.run(p.ctx) - }() -} - -func (p *PipeKvDeliver) run(ctx context.Context) { - var task *deliverTask - var err error - - for { - select { - case <-ctx.Done(): - return - case task = <-p.tasks: - if err = p.handle(task); err != nil { - log.Warnf("[%s] Deliver task failed (retry = %d) : %s", p.uuid, err.Error()) - if task.retry > maxRetryTimes { - break // TODO ... - } - // ps : p.tasks might full ~ - task.retry++ - p.tasks <- task - } - } - } -} - -func (p *PipeKvDeliver) handle(task *deliverTask) error { - if task == nil { - return nil - } - - var err error - switch task.op { - case opPut: - var dataSize int - for _, pair := range task.kvs { - dataSize += len(pair.Key) + len(pair.Val) - } - - err = p.deliver.Put(task.kvs) - if err != nil { - log.Errorf("kv deliver manager put failed : %s", err.Error()) - } else { - p.sumKVSize += uint64(dataSize) - // TODO ... determine to call flush ~ - } - - /*if p.sumKVSize >= flushSizeLimit { - if err := p.doFlush(); err != nil { - log.Errorf("kv deliver manager auto flush failed (put size = %d) : %s", - p.sumKVSize, err.Error()) - } else { - p.sumKVSize = 0 - } - }*/ - - case opFlush: - err = p.doFlush() - case opCleanup: - err = p.deliver.Cleanup() // TODO .. 
error - default: - } - - return errors.Trace(err) -} - -func (p *PipeKvDeliver) doFlush() error { - log.Infof("kv deliver manager do flush !") - - err := p.deliver.Flush() - if err != nil { - log.Errorf("kv deliver manager flush failed : %s", err.Error()) - } - - return errors.Trace(err) -} - /////////////////////// KV Deliver Transaction /////////////////////// const ( @@ -230,19 +74,21 @@ const ( ) type deliverTxn struct { - mux sync.RWMutex - uuid uuid.UUID - stat int - kvSize int64 - kvPairs int64 + uniqueTable string + mux sync.RWMutex + uuid uuid.UUID + stat int + kvSize int64 + kvPairs int64 } -func newDeliverTxn(uuid uuid.UUID) *deliverTxn { +func newDeliverTxn(uuid uuid.UUID, uniqueTable string) *deliverTxn { return &deliverTxn{ - uuid: uuid, - stat: txnPutting, - kvSize: 0, - kvPairs: 0, + uniqueTable: uniqueTable, + uuid: uuid, + stat: txnPutting, + kvSize: 0, + kvPairs: 0, } } @@ -287,7 +133,7 @@ type KVDeliverKeeper struct { txnIDCounter int // TODO : need to update to another algorithm txnBoard map[uuid.UUID]*txnInfo - txns map[string][]*deliverTxn // map[tag]{*txn, *txn, *txn ...} + txns map[string][]*deliverTxn // map[uniqueTable]{*txn, *txn, *txn ...} flushWg sync.WaitGroup txnFlushQueue chan *deliverTxn @@ -322,10 +168,6 @@ func NewKVDeliverKeeper(importServerAddr, pdAddr string) *KVDeliverKeeper { return keeper } -func buildTag(db string, table string) string { - return fmt.Sprintf("%s.%s", db, table) -} - func (k *KVDeliverKeeper) Close() error { k.mux.Lock() defer k.mux.Unlock() @@ -352,9 +194,9 @@ func (k *KVDeliverKeeper) newTxn(db string, table string) *deliverTxn { k.txnIDCounter++ uuid := uuid.Must(uuid.NewV4()) - tag := buildTag(db, table) - txn := newDeliverTxn(uuid) - log.Infof("[deliver-keeper] new txn (UUID = %s) for [%s]", txn.uuid, tag) + uniqueTable := common.UniqueTable(db, table) + txn := newDeliverTxn(uuid, uniqueTable) + log.Infof("[deliver-keeper] [%s] new txn (UUID = %s) ", uniqueTable, txn.uuid) return txn } @@ -363,8 +205,8 @@ func (k *KVDeliverKeeper) applyTxn(db string, table string) *deliverTxn { var txn *deliverTxn // try to choose a valid deliver txn to join - tag := buildTag(db, table) - tagTxns, ok := k.txns[tag] + uniqueTable := common.UniqueTable(db, table) + tagTxns, ok := k.txns[uniqueTable] if ok { for _, tx := range tagTxns { if k.validate(tx) { @@ -380,8 +222,8 @@ func (k *KVDeliverKeeper) applyTxn(db string, table string) *deliverTxn { tagTxns = make([]*deliverTxn, 0, 4) tagTxns = append(tagTxns, txn) - k.txns[tag] = tagTxns - log.Infof("[deliver-keeper] holds [%s] txn count = %d", tag, len(tagTxns)) + k.txns[uniqueTable] = tagTxns + log.Infof("[deliver-keeper] [%s] holds txn count = %d", uniqueTable, len(tagTxns)) k.txnBoard[txn.uuid] = &txnInfo{ txn: txn, @@ -389,7 +231,7 @@ func (k *KVDeliverKeeper) applyTxn(db string, table string) *deliverTxn { table: table, clients: 0, } - log.Infof("[deliver-keeper] holds txn total = %d", len(k.txnBoard)) + log.Infof("[deliver-keeper] [%s] holds txn total = %d", uniqueTable, len(k.txnBoard)) } return txn @@ -430,9 +272,9 @@ func (k *KVDeliverKeeper) AcquireClient(db string, table string) *KVDeliverClien // pop client/connection from pool size := len(k.clientsPool) if size == 0 { - cli, err := NewKVDeliverClient(k.ctx, txn.uuid, k.importServerAddr, k.pdAddr) + cli, err := NewKVDeliverClient(k.ctx, txn.uuid, k.importServerAddr, k.pdAddr, common.UniqueTable(db, table)) if err != nil { - log.Infof("[deliver-keeper] failed to create deliver client (UUID = %s) : %s ", txn.uuid, err.Error()) + 
log.Errorf("[deliver-keeper] [%s] failed to create deliver client (UUID = %s) : %s ", common.UniqueTable(db, table), txn.uuid, err.Error()) return nil } @@ -451,7 +293,7 @@ func (k *KVDeliverKeeper) AcquireClient(db string, table string) *KVDeliverClien } func (k *KVDeliverKeeper) Compact(start, end []byte) error { - cli, err := NewKVDeliverClient(k.ctx, uuid.Nil, k.importServerAddr, k.pdAddr) + cli, err := NewKVDeliverClient(k.ctx, uuid.Nil, k.importServerAddr, k.pdAddr, "") if err != nil { return errors.Trace(err) } @@ -497,8 +339,6 @@ func (k *KVDeliverKeeper) closeTxnClients(txn *deliverTxn) { } func (k *KVDeliverKeeper) flushTxn(txn *deliverTxn) { - log.Infof("[deliver-keeper] gonna to flush txn (UUID = %s)", txn.uuid) - // release relating client/connection first k.closeTxnClients(txn) @@ -510,9 +350,9 @@ func (k *KVDeliverKeeper) flushTxn(txn *deliverTxn) { func (k *KVDeliverKeeper) handleTxnFlush(ctx context.Context) { doFlush := func(txn *deliverTxn) { - cli, err := NewKVDeliverClient(ctx, txn.uuid, k.importServerAddr, k.pdAddr) + cli, err := NewKVDeliverClient(ctx, txn.uuid, k.importServerAddr, k.pdAddr, txn.uniqueTable) if err != nil { - log.Infof("[deliver-keeper] failed to create deliver client (UUID = %s) : %s ", txn.uuid, err.Error()) + log.Errorf("[deliver-keeper] [%s] failed to create deliver client (UUID = %s) : %s ", txn.uniqueTable, txn.uuid, err.Error()) return } defer func() { @@ -521,9 +361,12 @@ func (k *KVDeliverKeeper) handleTxnFlush(ctx context.Context) { }() if err := cli.Flush(); err != nil { - log.Infof("[deliver-keeper] txn (UUID = %s) flush failed : %s ", txn.uuid, err.Error()) - } else { - cli.Cleanup() + log.Errorf("[deliver-keeper] [%s] txn (UUID = %s) flush failed : %s ", txn.uniqueTable, txn.uuid, err.Error()) + return + } + err = cli.Cleanup() + if err != nil { + log.Warnf("[deliver-keeper] [%s] txn (UUID = %s) cleanup failed: %s", txn.uniqueTable, txn.uuid, err.Error()) } } @@ -533,13 +376,12 @@ func (k *KVDeliverKeeper) handleTxnFlush(ctx context.Context) { return case txn := <-k.txnFlushQueue: now := time.Now() - log.Infof("[deliver-keeper] start flushing txn (UUID = %s) ... ", txn.uuid) + log.Infof("[deliver-keeper] [%s] start flushing txn (UUID = %s) ... ", txn.uniqueTable, txn.uuid) doFlush(txn) k.flushWg.Done() - log.Infof("[deliver-keeper] finished flushing txn (UUID = %s)", txn.uuid) - log.Infof("[deliver-keeper] cost time = %.1f sec", time.Since(now).Seconds()) + log.Infof("[deliver-keeper] [%s] finished flushing txn (UUID = %s), takes %v", txn.uniqueTable, txn.uuid, time.Since(now)) } } } @@ -570,10 +412,10 @@ func newImportClient(importServerAddr string) (*grpc.ClientConn, importpb.Import return conn, importpb.NewImportKVClient(conn), nil } -func NewKVDeliverClient(ctx context.Context, uuid uuid.UUID, importServerAddr string, pdAddr string) (*KVDeliverClient, error) { +func NewKVDeliverClient(ctx context.Context, uuid uuid.UUID, importServerAddr string, pdAddr string, uniqueTable string) (*KVDeliverClient, error) { conn, rpcCli, err := newImportClient(importServerAddr) // goruntine safe ??? 
if err != nil { - return nil, err + return nil, errors.Trace(err) } cli := &KVDeliverClient{ @@ -583,7 +425,7 @@ func NewKVDeliverClient(ctx context.Context, uuid uuid.UUID, importServerAddr st pdAddr: pdAddr, conn: conn, cli: rpcCli, - txn: newDeliverTxn(uuid), + txn: newDeliverTxn(uuid, uniqueTable), } return cli, nil @@ -591,7 +433,7 @@ func NewKVDeliverClient(ctx context.Context, uuid uuid.UUID, importServerAddr st func (c *KVDeliverClient) Close() error { defer c.conn.Close() - return c.closeWriteStream() + return errors.Trace(c.closeWriteStream()) } func (c *KVDeliverClient) bind(txn *deliverTxn) { @@ -610,7 +452,7 @@ func (c *KVDeliverClient) bind(txn *deliverTxn) { func (c *KVDeliverClient) exitTxn() { log.Debugf("Release kv client from txn (UUID = %s)", c.txn.uuid) c.closeWriteStream() - c.txn = newDeliverTxn(invalidUUID) + c.txn = newDeliverTxn(invalidUUID, "") return } @@ -710,10 +552,18 @@ func (c *KVDeliverClient) Put(kvs []kvec.KvPair) error { }, } - if err := wstream.Send(write); err != nil { - log.Errorf("[kv-deliver] write stream failed to send : %s", err.Error()) + var sendErr error + for i := 0; i < maxRetryTimes; i++ { + sendErr = wstream.Send(write) + if sendErr == nil { + break + } + log.Errorf("[kv-deliver] [%s] write stream failed to send: %s", c.txn.uniqueTable, sendErr.Error()) + time.Sleep(retryBackoffTime) + } + if sendErr != nil { c.closeWriteStream() - return errors.Trace(err) + return errors.Trace(sendErr) } kvSize := 0 @@ -747,7 +597,7 @@ func (c *KVDeliverClient) Flush() error { } func (c *KVDeliverClient) Compact(start, end []byte) error { - return c.callCompact(start, end) + return errors.Trace(c.callCompact(start, end)) } // Do compaction for specific table. `start` and `end`` key can be got in the following way: @@ -765,21 +615,28 @@ func (c *KVDeliverClient) callCompact(start, end []byte) error { func (c *KVDeliverClient) callClose() error { timer := time.Now() - log.Infof("[%s] close", c.txn.uuid) + log.Infof("[%s] [%s] close", c.txn.uniqueTable, c.txn.uuid) req := &importpb.CloseRequest{Uuid: c.txn.uuid.Bytes()} _, err := c.cli.Close(c.ctx, req) - log.Infof("[%s] close takes %v", c.txn.uuid, time.Since(timer)) + log.Infof("[%s] [%s] close takes %v", c.txn.uniqueTable, c.txn.uuid, time.Since(timer)) return errors.Trace(err) } func (c *KVDeliverClient) callImport() error { // TODO ... no matter what, to enusure available to import, call close first ! 
- timer := time.Now() - log.Infof("[%s] import", c.txn.uuid) - req := &importpb.ImportRequest{Uuid: c.txn.uuid.Bytes(), PdAddr: c.pdAddr} - _, err := c.cli.Import(c.ctx, req) - log.Infof("[%s] import takes %v", c.txn.uuid, time.Since(timer)) + for i := 0; i < maxRetryTimes; i++ { + timer := time.Now() + log.Infof("[%s] [%s] import", c.txn.uniqueTable, c.txn.uuid) + req := &importpb.ImportRequest{Uuid: c.txn.uuid.Bytes(), PdAddr: c.pdAddr} + _, err := c.cli.Import(c.ctx, req) + log.Infof("[%s] [%s] import takes %v", c.txn.uniqueTable, c.txn.uuid, time.Since(timer)) + if err == nil { + return nil + } + log.Warnf("[%s] [%s] import failed and retry %d time, err %v", c.txn.uniqueTable, c.txn.uuid, i+1, err) + time.Sleep(retryBackoffTime) + } - return errors.Trace(err) + return errors.Errorf("[%s] [%s] import reach max retry %d and still failed", c.txn.uniqueTable, c.txn.uuid, maxRetryTimes) } diff --git a/lightning/kv/sql2kv.go b/lightning/kv/sql2kv.go index bd79bf69a67c3..ab0d3ded0e9b0 100644 --- a/lightning/kv/sql2kv.go +++ b/lightning/kv/sql2kv.go @@ -107,7 +107,7 @@ func (kvcodec *TableKVEncoder) ResetRowID(rowID int64) { } func (kvcodec *TableKVEncoder) Close() error { - return kvcodec.encoder.Close() + return errors.Trace(kvcodec.encoder.Close()) } func (kvcodec *TableKVEncoder) NextRowID() int64 { diff --git a/lightning/lightning.go b/lightning/lightning.go index 27aaa2e06acbc..f54984b34ea8a 100644 --- a/lightning/lightning.go +++ b/lightning/lightning.go @@ -94,7 +94,7 @@ func (l *Lightning) run() { } func (l *Lightning) doCompact() error { - cli, err := kv.NewKVDeliverClient(context.Background(), uuid.Nil, l.cfg.TikvImporter.Addr, l.cfg.TiDB.PdAddr) + cli, err := kv.NewKVDeliverClient(context.Background(), uuid.Nil, l.cfg.TikvImporter.Addr, l.cfg.TiDB.PdAddr, "") if err != nil { return errors.Trace(err) } diff --git a/lightning/mydump/loader.go b/lightning/mydump/loader.go index 710d90613fa5d..bdf653c1da4db 100644 --- a/lightning/mydump/loader.go +++ b/lightning/mydump/loader.go @@ -24,8 +24,8 @@ type MDDatabaseMeta struct { Tables map[string]*MDTableMeta } -func (meta *MDDatabaseMeta) String() string { - v, err := json.Marshal(meta) +func (m *MDDatabaseMeta) String() string { + v, err := json.Marshal(m) if err != nil { log.Error("json marshal MDDatabaseMeta error %s", errors.ErrorStack(err)) } @@ -222,8 +222,6 @@ func (l *MDLoader) setupTablesData(files map[string]string) error { tableMeta.DataFiles = append(tableMeta.DataFiles, fpath) } - log.Infof("datafiles %+v", l.dbs) - // sort all tables' data files by file-name for _, dbMeta := range l.dbs { for _, tblMeta := range dbMeta.Tables { diff --git a/lightning/mydump/reader.go b/lightning/mydump/reader.go index 8a889fc8577b7..7b32c08e3949e 100644 --- a/lightning/mydump/reader.go +++ b/lightning/mydump/reader.go @@ -33,7 +33,7 @@ func ExportStatement(sqlFile string) ([]byte, error) { buffer := make([]byte, 0, f.Size()+1) for { line, err := br.ReadString('\n') - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } @@ -118,7 +118,7 @@ func (r *MDDataReader) skipAnnotation(offset int64) int64 { br := bufio.NewReader(r.fd) for skipSize := 0; ; { line, err := br.ReadString('\n') - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } @@ -164,7 +164,7 @@ func getInsertStatmentHeader(file string) []byte { br := bufio.NewReaderSize(f, int(defReadBlockSize)) for { line, err := br.ReadString('\n') - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } @@ -298,7 +298,7 @@ func NewRegionReader(file string, offset int64, 
size int64) (*RegionReader, erro fileReader, err := NewMDDataReader(file, offset) if err != nil { - return nil, err + return nil, errors.Trace(err) } return &RegionReader{ @@ -322,9 +322,9 @@ func (r *RegionReader) Read(maxBlockSize int64) ([][]byte, error) { datas, err := r.fileReader.Read(readSize) r.pos = r.fileReader.Tell() - return datas, err + return datas, errors.Trace(err) } func (r *RegionReader) Close() error { - return r.fileReader.Close() + return errors.Trace(r.fileReader.Close()) } diff --git a/lightning/mydump/reader_test.go b/lightning/mydump/reader_test.go index 033a1ed45cc9c..695e113266d5a 100644 --- a/lightning/mydump/reader_test.go +++ b/lightning/mydump/reader_test.go @@ -9,6 +9,7 @@ import ( "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/mydump" + "github.com/pkg/errors" ) const ( @@ -89,7 +90,7 @@ func mydump2mysql(c *C, dbMeta *MDDatabaseMeta, minBlockSize int64) { for { statements, err := reader.Read(minBlockSize) - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } for _, stmt := range statements { diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go index 48ed21e2e56aa..52fefae4d4a3d 100644 --- a/lightning/mydump/region.go +++ b/lightning/mydump/region.go @@ -7,6 +7,7 @@ import ( "sort" "sync" + "github.com/juju/errors" log "github.com/sirupsen/logrus" ) @@ -137,7 +138,7 @@ func splitFuzzyRegion(db string, table string, file string, minRegionSize int64) regions = append(regions, region) } - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } offset = pos diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 655ec78b44b5b..a3ae4be8ab73d 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -4,7 +4,6 @@ import ( "database/sql" "fmt" "io" - "strings" "sync" "time" @@ -83,7 +82,7 @@ func (rc *RestoreControlloer) Run(ctx context.Context) { for _, process := range opts { err := process(ctx) - if err == errCtxAborted { + if errors.Cause(err) == errCtxAborted { break } if err != nil { @@ -100,8 +99,6 @@ func (rc *RestoreControlloer) Run(ctx context.Context) { } func (rc *RestoreControlloer) restoreSchema(ctx context.Context) error { - log.Infof("restore schema %s from file %s", rc.dbMeta.Name, rc.dbMeta.SchemaFile) - tidbMgr, err := NewTiDBManager(rc.cfg.TiDB) if err != nil { return errors.Trace(err) @@ -111,16 +108,19 @@ func (rc *RestoreControlloer) restoreSchema(ctx context.Context) error { database := rc.dbMeta.Name if !rc.cfg.Mydumper.NoSchema { + timer := time.Now() + log.Infof("restore table schema for `%s`", rc.dbMeta.Name) tablesSchema := make(map[string]string) for tbl, tblMeta := range rc.dbMeta.Tables { tablesSchema[tbl] = tblMeta.GetSchema() } - err = tidbMgr.InitSchema(database, tablesSchema) + err = tidbMgr.InitSchema(ctx, database, tablesSchema) if err != nil { return errors.Errorf("db schema failed to init : %v", err) } + log.Infof("restore table schema for `%s` takes %v", rc.dbMeta.Name, time.Since(timer)) } - dbInfo, err := tidbMgr.LoadSchemaInfo(database) + dbInfo, err := tidbMgr.LoadSchemaInfo(ctx, database) if err != nil { return errors.Trace(err) } @@ -129,6 +129,7 @@ func (rc *RestoreControlloer) restoreSchema(ctx context.Context) error { } func (rc *RestoreControlloer) restoreTables(ctx context.Context) error { + timer := time.Now() // tables' restoring mission tablesRestoring := make([]*TableRestore, 0, len(rc.dbMeta.Tables)) defer func() { @@ -187,7 +188,7 
@@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error { go func(w *RestoreWorker, t *regionRestoreTask) { defer workers.Recycle(w) defer wg.Done() - table := fmt.Sprintf("%s.%s", t.region.DB, t.region.Table) + table := common.UniqueTable(t.region.DB, t.region.Table) if _, ok := skipTables[table]; ok { log.Infof("something wrong with table %s before, so skip region %s", table, t.region.Name()) return @@ -201,6 +202,7 @@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error { }(worker, task) } wg.Wait() // TODO ... ctx abroted + log.Infof("restore table data takes %v", time.Since(timer)) return nil } @@ -212,7 +214,7 @@ func (rc *RestoreControlloer) compact(ctx context.Context) error { return nil } - cli, err := kv.NewKVDeliverClient(ctx, uuid.Nil, rc.cfg.TikvImporter.Addr, rc.cfg.TiDB.PdAddr) + cli, err := kv.NewKVDeliverClient(ctx, uuid.Nil, rc.cfg.TikvImporter.Addr, rc.cfg.TiDB.PdAddr, "") if err != nil { return errors.Trace(err) } @@ -232,16 +234,16 @@ func (rc *RestoreControlloer) checksum(ctx context.Context) error { } tables := rc.getTables() - remoteChecksums, err := DoChecksum(rc.cfg.TiDB, tables) + remoteChecksums, err := DoChecksum(ctx, rc.cfg.TiDB, tables) if err != nil { return errors.Trace(err) } for _, remoteChecksum := range remoteChecksums { - table := fmt.Sprintf("%s.%s", remoteChecksum.Schema, remoteChecksum.Table) + table := common.UniqueTable(remoteChecksum.Schema, remoteChecksum.Table) localChecksum, ok := rc.localChecksums[table] if !ok { - log.Warnf("[%s] no local checksum", table) + log.Warnf("[%s] no local checksum, remote checksum is %v", table, remoteChecksum) continue } @@ -265,11 +267,11 @@ func (rc *RestoreControlloer) analyze(ctx context.Context) error { } tables := rc.getTables() - analyzeTable(rc.cfg.TiDB, tables) - - return nil + err := analyzeTable(ctx, rc.cfg.TiDB, tables) + return errors.Trace(err) } +// getTables returns a table list, which table format is `db`.`table`. func (rc *RestoreControlloer) getTables() []string { tables := make([]string, 0, len(rc.dbMeta.Tables)) dbInfo := rc.dbInfo @@ -280,28 +282,30 @@ func (rc *RestoreControlloer) getTables() []string { log.Warnf("table info not found : %s", tbl) continue } - tables = append(tables, fmt.Sprintf("%s.%s", dbInfo.Name, tbl)) + tables = append(tables, common.UniqueTable(dbInfo.Name, tbl)) } return tables } -func analyzeTable(dsn config.DBStore, tables []string) error { +func analyzeTable(ctx context.Context, dsn config.DBStore, tables []string) error { db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw) if err != nil { + log.Errorf("connect db failed %v, the next operation is: ANALYZE TABLE. 
You should do it one by one manually", err) return errors.Trace(err) } defer db.Close() // speed up executing analyze table temporarily - setSessionVarInt(db, "tidb_build_stats_concurrency", 16) - setSessionVarInt(db, "tidb_distsql_scan_concurrency", dsn.DistSQLScanConcurrency) + setSessionVarInt(ctx, db, "tidb_build_stats_concurrency", 16) + setSessionVarInt(ctx, db, "tidb_distsql_scan_concurrency", dsn.DistSQLScanConcurrency) for _, table := range tables { timer := time.Now() log.Infof("[%s] analyze", table) - _, err := db.Exec(fmt.Sprintf("ANALYZE TABLE %s", table)) + query := fmt.Sprintf("ANALYZE TABLE %s", table) + err := common.ExecWithRetry(ctx, db, []string{query}) if err != nil { - log.Errorf("analyze table %s error %s", table, errors.ErrorStack(err)) + log.Errorf("%s error %s", query, errors.ErrorStack(err)) continue } log.Infof("[%s] analyze takes %v", table, time.Since(timer)) @@ -312,30 +316,9 @@ func analyzeTable(dsn config.DBStore, tables []string) error { //////////////////////////////////////////////////////////////// -// TODO ... find another way to caculate -func adjustUUID(uuid string, length int) string { - size := len(uuid) - if size > length { - uuid = uuid[size-length:] - } else if size < length { - uuid = uuid + strings.Repeat("+", length-size) - } - return uuid -} - -func makeKVDeliver( - ctx context.Context, - cfg *config.Config, - dbInfo *TidbDBInfo, - tableInfo *TidbTableInfo) (kv.KVDeliver, error) { - - uuid := uuid.Must(uuid.NewV4()) - return kv.NewKVDeliverClient(ctx, uuid, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr) -} - -func setSessionVarInt(db *sql.DB, name string, value int) { +func setSessionVarInt(ctx context.Context, db *sql.DB, name string, value int) { stmt := fmt.Sprintf("set session %s = %d", name, value) - if _, err := db.Exec(stmt); err != nil { + if err := common.ExecWithRetry(ctx, db, []string{stmt}); err != nil { log.Warnf("failed to set variable @%s to %d: %s", name, value, err.Error()) } } @@ -375,7 +358,7 @@ const ( statFailed string = "failed" ) -type restoreCallback func(regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error +type restoreCallback func(ctx context.Context, regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error type regionRestoreTask struct { status string @@ -407,7 +390,8 @@ func newRegionRestoreTask( func (t *regionRestoreTask) Run(ctx context.Context) error { timer := time.Now() region := t.region - log.Infof("[%s] restore region [%s]", region.Table, region.Name()) + table := common.UniqueTable(region.DB, region.Table) + log.Infof("[%s] restore region [%s]", table, region.Name()) t.status = statRunning maxRowID, rows, checksum, err := t.run(ctx) @@ -415,8 +399,8 @@ func (t *regionRestoreTask) Run(ctx context.Context) error { return errors.Trace(err) } - log.Infof("[%s] restore region [%s] takes %v", region.Table, region.Name(), time.Since(timer)) - err = t.callback(region.ID, maxRowID, rows, checksum) + log.Infof("[%s] restore region [%s] takes %v", table, region.Name(), time.Since(timer)) + err = t.callback(ctx, region.ID, maxRowID, rows, checksum) if err != nil { return errors.Trace(err) } @@ -432,7 +416,8 @@ func (t *regionRestoreTask) run(ctx context.Context) (int64, uint64, *verify.KVC kvDeliver := t.delivers.AcquireClient(t.executor.dbInfo.Name, t.executor.tableInfo.Name) defer t.delivers.RecycleClient(kvDeliver) - return t.executor.Run(ctx, t.region, kvEncoder, kvDeliver) + nextRowID, affectedRows, checksum, err := t.executor.Run(ctx, t.region, kvEncoder, kvDeliver) + 
return nextRowID, affectedRows, checksum, errors.Trace(err) } //////////////////////////////////////////////////////////////// @@ -581,7 +566,7 @@ func NewTableRestore( timer := time.Now() tr.loadRegions() - log.Infof("[%s] load regions takes %v", tableInfo.Name, time.Since(timer)) + log.Infof("[%s] load regions takes %v", common.UniqueTable(tableMeta.DB, tableMeta.Name), time.Since(timer)) return tr } @@ -589,19 +574,18 @@ func NewTableRestore( func (tr *TableRestore) Close() { // TODO : flush table meta right now ~ tr.encoders.Clear() - log.Infof("[%s] closed", tr.tableMeta.Name) + log.Infof("[%s] closed", common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name)) } func (tr *TableRestore) loadRegions() { - log.Infof("[%s] load regions", tr.tableMeta.Name) + log.Infof("[%s] load regions", common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name)) founder := mydump.NewRegionFounder(tr.cfg.Mydumper.MinRegionSize) regions := founder.MakeTableRegions(tr.tableMeta) - table := tr.tableMeta.Name id2regions := make(map[int]*mydump.TableRegion) for _, region := range regions { - log.Infof("[%s] region - %s", table, region.Name()) + log.Infof("[%s] region - %s", common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name), region.Name()) id2regions[region.ID] = region } @@ -618,8 +602,8 @@ func (tr *TableRestore) loadRegions() { return } -func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error { - table := tr.tableInfo.Name +func (tr *TableRestore) onRegionFinished(ctx context.Context, id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error { + table := common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name) tr.mux.Lock() defer tr.mux.Unlock() @@ -633,7 +617,7 @@ func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, ch handled := len(tr.handledRegions) log.Infof("[%s] handled region count = %d (%s)", table, handled, common.Percent(handled, total)) if handled == len(tr.tasks) { - err := tr.onFinished() + err := tr.onFinished(ctx) if err != nil { return errors.Trace(err) } @@ -642,11 +626,7 @@ func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, ch return nil } -func (tr *TableRestore) makeKVDeliver() (kv.KVDeliver, error) { - return makeKVDeliver(tr.ctx, tr.cfg, tr.dbInfo, tr.tableInfo) -} - -func (tr *TableRestore) onFinished() error { +func (tr *TableRestore) onFinished(ctx context.Context) error { // flush all kvs into TiKV if err := tr.importKV(); err != nil { return errors.Trace(err) @@ -665,34 +645,35 @@ func (tr *TableRestore) onFinished() error { } checksum.Add(regStat.checksum) } - table := fmt.Sprintf("%s.%s", tr.tableMeta.DB, tr.tableMeta.Name) - log.Infof("[%s] local checksum %s", tr.tableMeta.Name, checksum) + table := common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name) + log.Infof("[%s] local checksum %s", table, checksum) tr.localChecksums[table] = checksum - if err := tr.restoreTableMeta(tableMaxRowID); err != nil { + if err := tr.restoreTableMeta(ctx, tableMaxRowID); err != nil { return errors.Trace(err) } - log.Infof("[%s] has imported %d rows", tr.tableMeta.Name, tableRows) + log.Infof("[%s] has imported %d rows", table, tableRows) return nil } -func (tr *TableRestore) restoreTableMeta(rowID int64) error { +func (tr *TableRestore) restoreTableMeta(ctx context.Context, rowID int64) error { dsn := tr.cfg.TiDB db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw) if err != nil { + // let it failed and record it to log. 
+ log.Errorf("connect db failed %v, the next operation is: ALTER TABLE `%s`.`%s` AUTO_INCREMENT=%d; you should do it manually", err, tr.tableMeta.DB, tr.tableMeta.Name, rowID) return errors.Trace(err) } defer db.Close() - return errors.Trace(AlterAutoIncrement(db, tr.tableMeta.DB, tr.tableMeta.Name, rowID)) + return errors.Trace(AlterAutoIncrement(ctx, db, tr.tableMeta.DB, tr.tableMeta.Name, rowID)) } func (tr *TableRestore) importKV() error { - table := tr.tableInfo.Name + table := common.UniqueTable(tr.tableMeta.DB, tr.tableMeta.Name) log.Infof("[%s] flush kv deliver ...", table) - // kvDeliver, _ := tr.makeKVDeliver() kvDeliver := tr.deliversMgr start := time.Now() @@ -719,30 +700,34 @@ type RemoteChecksum struct { TotalBytes uint64 } +func (c *RemoteChecksum) String() string { + return fmt.Sprintf("[%s] remote_checksum=%d, total_kvs=%d, total_bytes=%d", common.UniqueTable(c.Schema, c.Table), c.Checksum, c.TotalKVs, c.TotalBytes) +} + // DoChecksum do checksum for tables. // table should be in ., format. e.g. foo.bar -func DoChecksum(dsn config.DBStore, tables []string) ([]*RemoteChecksum, error) { +func DoChecksum(ctx context.Context, dsn config.DBStore, tables []string) ([]*RemoteChecksum, error) { db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw) if err != nil { return nil, errors.Trace(err) } defer db.Close() - ori, err := increaseGCLifeTime(db) + ori, err := increaseGCLifeTime(ctx, db) if err != nil { return nil, errors.Trace(err) } // set it back finally defer func() { - err = UpdateGCLifeTime(db, ori) + err = UpdateGCLifeTime(ctx, db, ori) if err != nil { log.Errorf("update tikv_gc_life_time error %s", errors.ErrorStack(err)) } }() // speed up executing checksum table temporarily - setSessionVarInt(db, "tidb_checksum_table_concurrency", 16) - setSessionVarInt(db, "tidb_distsql_scan_concurrency", dsn.DistSQLScanConcurrency) + setSessionVarInt(ctx, db, "tidb_checksum_table_concurrency", 16) + setSessionVarInt(ctx, db, "tidb_distsql_scan_concurrency", dsn.DistSQLScanConcurrency) // ADMIN CHECKSUM TABLE
<table>,<table>
example. // mysql> admin checksum table test.t; @@ -755,22 +740,25 @@ func DoChecksum(dsn config.DBStore, tables []string) ([]*RemoteChecksum, error) checksums := make([]*RemoteChecksum, 0, len(tables)) // do table checksum one by one instead of doing all at once to make tikv server comfortable for _, table := range tables { + timer := time.Now() cs := RemoteChecksum{} log.Infof("[%s] doing remote checksum", table) - err := db.QueryRow(fmt.Sprintf("ADMIN CHECKSUM TABLE %s", table)).Scan(&cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes) + query := fmt.Sprintf("ADMIN CHECKSUM TABLE %s", table) + common.QueryRowWithRetry(ctx, db, query, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes) if err != nil { return nil, errors.Trace(err) } checksums = append(checksums, &cs) + log.Infof("[%s] do checksum takes %v", table, time.Since(timer)) } return checksums, nil } -func increaseGCLifeTime(db *sql.DB) (oriGCLifeTime string, err error) { +func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, err error) { // checksum command usually takes a long time to execute, // so here need to increase the gcLifeTime for single transaction. - oriGCLifeTime, err = ObtainGCLifeTime(db) + oriGCLifeTime, err = ObtainGCLifeTime(ctx, db) if err != nil { return "", errors.Trace(err) } @@ -789,7 +777,7 @@ func increaseGCLifeTime(db *sql.DB) (oriGCLifeTime string, err error) { } if increaseGCLifeTime { - err = UpdateGCLifeTime(db, defaultGCLifeTime.String()) + err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String()) if err != nil { return "", errors.Trace(err) } @@ -798,22 +786,6 @@ func increaseGCLifeTime(db *sql.DB) (oriGCLifeTime string, err error) { return oriGCLifeTime, nil } -func (tr *TableRestore) excCheckTable() error { - log.Infof("Verify by execute `admin check table` : %s", tr.tableMeta.Name) - - dsn := tr.cfg.TiDB - db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw) - if err != nil { - return errors.Trace(err) - } - defer db.Close() - - // verify datas completion via command "admin check table" - _, err = db.Exec( - fmt.Sprintf("ADMIN CHECK TABLE %s.%s", tr.tableMeta.DB, tr.tableMeta.Name)) - return errors.Trace(err) -} - //////////////////////////////////////////////////////////////// type RegionRestoreExectuor struct { @@ -844,7 +816,7 @@ func (exc *RegionRestoreExectuor) Run( ctx context.Context, region *mydump.TableRegion, kvEncoder *kv.TableKVEncoder, - kvDeliver kv.KVDeliver) (int64, uint64, *verify.KVChecksum, error) { + kvDeliver kv.KVDeliver) (nextRowID int64, affectedRows uint64, checksum *verify.KVChecksum, err error) { /* Flows : @@ -859,13 +831,13 @@ func (exc *RegionRestoreExectuor) Run( } defer reader.Close() - table := exc.tableInfo.Name + table := common.UniqueTable(exc.tableMeta.DB, exc.tableMeta.Name) readMark := fmt.Sprintf("[%s]_read_file", table) encodeMark := fmt.Sprintf("[%s]_sql_2_kv", table) deliverMark := fmt.Sprintf("[%s]_deliver_write", table) rows := uint64(0) - checksum := verify.NewKVChecksum(0) + checksum = verify.NewKVChecksum(0) /* TODO : So far, since checksum can not recompute on the same key-value pair, @@ -883,7 +855,7 @@ func (exc *RegionRestoreExectuor) Run( start := time.Now() sqls, err := reader.Read(defReadBlockSize) - if err == io.EOF { + if errors.Cause(err) == io.EOF { break } metrics.MarkTiming(readMark, start) @@ -893,7 +865,7 @@ func (exc *RegionRestoreExectuor) Run( start = time.Now() kvs, affectedRows, err := kvEncoder.SQL2KV(stmt) metrics.MarkTiming(encodeMark, start) - + 
log.Debugf("len(kvs) %d, len(sql) %d", len(kvs), len(stmt)) if err != nil { log.Errorf("kv encode failed = %s\n", err.Error()) return kvEncoder.NextRowID(), rows, checksum, errors.Trace(err) diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index bf377144331a9..a71bc5c419552 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -8,11 +8,14 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/juju/errors" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb/model" + log "github.com/sirupsen/logrus" + "golang.org/x/net/context" ) type TiDBManager struct { @@ -57,26 +60,30 @@ func (timgr *TiDBManager) Close() { timgr.db.Close() } -func (timgr *TiDBManager) InitSchema(database string, tablesSchema map[string]string) error { - _, err := timgr.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", database)) +func (timgr *TiDBManager) InitSchema(ctx context.Context, database string, tablesSchema map[string]string) error { + createDatabase := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", database) + err := common.ExecWithRetry(ctx, timgr.db, []string{createDatabase}) if err != nil { return errors.Trace(err) } - _, err = timgr.db.Exec(fmt.Sprintf("USE %s", database)) + useDB := fmt.Sprintf("USE `%s`", database) + err = common.ExecWithRetry(ctx, timgr.db, []string{useDB}) if err != nil { return errors.Trace(err) } for _, sqlCreateTable := range tablesSchema { - if err = safeCreateTable(timgr.db, sqlCreateTable); err != nil { + timer := time.Now() + if err = safeCreateTable(ctx, timgr.db, sqlCreateTable); err != nil { return errors.Trace(err) } + log.Infof("%s takes %v", sqlCreateTable, time.Since(timer)) } return nil } -func toCreateTableIfNotExists(createTable string) string { +func createTableIfNotExistsStmt(createTable string) string { upCreateTable := strings.ToUpper(createTable) if strings.Index(upCreateTable, "CREATE TABLE IF NOT EXISTS") < 0 { substrs := strings.SplitN(upCreateTable, "CREATE TABLE", 2) @@ -90,9 +97,9 @@ func toCreateTableIfNotExists(createTable string) string { return createTable } -func safeCreateTable(db *sql.DB, createTable string) error { - createTable = toCreateTableIfNotExists(createTable) - _, err := db.Exec(createTable) +func safeCreateTable(ctx context.Context, db *sql.DB, createTable string) error { + createTable = createTableIfNotExistsStmt(createTable) + err := common.ExecWithRetry(ctx, db, []string{createTable}) return errors.Trace(err) } @@ -155,7 +162,7 @@ func (timgr *TiDBManager) getTables(schema string) ([]*model.TableInfo, error) { return tables, errors.Annotatef(err, "get tables for schema %s", schema) } -func (timgr *TiDBManager) LoadSchemaInfo(schema string) (*TidbDBInfo, error) { +func (timgr *TiDBManager) LoadSchemaInfo(ctx context.Context, schema string) (*TidbDBInfo, error) { tables, err := timgr.getTables(schema) if err != nil { return nil, errors.Trace(err) @@ -171,7 +178,7 @@ func (timgr *TiDBManager) LoadSchemaInfo(schema string) (*TidbDBInfo, error) { if tbl.State != model.StatePublic { return nil, errors.Errorf("table [%s.%s] state is not public", schema, tableName) } - createTableStmt, err := timgr.getCreateTableStmt(schema, tableName) + createTableStmt, err := timgr.getCreateTableStmt(ctx, schema, tableName) if err != nil { return nil, errors.Trace(err) } @@ -189,28 +196,29 @@ func (timgr *TiDBManager) LoadSchemaInfo(schema string) (*TidbDBInfo, error) { return dbInfo, nil } -func (timgr *TiDBManager) 
getCreateTableStmt(schema, table string) (string, error) { +func (timgr *TiDBManager) getCreateTableStmt(ctx context.Context, schema, table string) (string, error) { query := fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", schema, table) var tbl, createTable string - err := timgr.db.QueryRow(query).Scan(&tbl, &createTable) - return createTable, errors.Annotatef(err, "query %s", query) + err := common.QueryRowWithRetry(ctx, timgr.db, query, &tbl, &createTable) + return createTable, errors.Annotatef(err, "%s", query) } -func ObtainGCLifeTime(db *sql.DB) (gcLifeTime string, err error) { - r := db.QueryRow( - "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'") - err = r.Scan(&gcLifeTime) - return gcLifeTime, errors.Annotatef(err, "query tikv_gc_life_time") +func ObtainGCLifeTime(ctx context.Context, db *sql.DB) (gcLifeTime string, err error) { + query := "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'" + err = common.QueryRowWithRetry(ctx, db, query, &gcLifeTime) + return gcLifeTime, errors.Annotatef(err, "%s", query) } -func UpdateGCLifeTime(db *sql.DB, gcLifeTime string) error { - _, err := db.Exec(fmt.Sprintf( - "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", gcLifeTime)) - return errors.Annotatef(err, "update tikv_gc_life_time=%s", gcLifeTime) +func UpdateGCLifeTime(ctx context.Context, db *sql.DB, gcLifeTime string) error { + query := fmt.Sprintf( + "UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'", gcLifeTime) + err := common.ExecWithRetry(ctx, db, []string{query}) + return errors.Annotatef(err, "%s", query) } -func AlterAutoIncrement(db *sql.DB, schema string, table string, incr int64) error { +func AlterAutoIncrement(ctx context.Context, db *sql.DB, schema string, table string, incr int64) error { query := fmt.Sprintf("ALTER TABLE `%s`.`%s` AUTO_INCREMENT=%d", schema, table, incr) - _, err := db.Exec(query) - return errors.Annotatef(err, "alter table %s.%s auto_increment=%d", schema, table, incr) + log.Infof("[%s.%s] %s", schema, table, query) + err := common.ExecWithRetry(ctx, db, []string{query}) + return errors.Annotatef(err, "%s", query) } diff --git a/lightning/sql/parser_test.go b/lightning/sql/parser_test.go index 46d1be178a574..e036751362308 100644 --- a/lightning/sql/parser_test.go +++ b/lightning/sql/parser_test.go @@ -9,6 +9,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pkg/errors" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" @@ -168,7 +169,7 @@ func (s *testParserSuite) testParseRealFile(c *C) { for { statments, err := reader.Read(4 * 1024) - if err == io.EOF { + if errors.Cause(err) == io.EOF { break }
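
Editor's note: the sketch below is illustrative only and is not part of the patch. It shows how the retry helpers added in lightning/common/util.go by this change (ConnectDB, UniqueTable, ExecWithRetry, QueryRowWithRetry) fit together; the host, port and credentials are placeholder assumptions.

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb-lightning/lightning/common"
)

func analyzeWithRetry(ctx context.Context) error {
	// ConnectDB and the retry helpers used here are the ones introduced or
	// extended by this patch in lightning/common/util.go.
	// The connection parameters are placeholders, not values from the patch.
	db, err := common.ConnectDB("127.0.0.1", 4000, "root", "")
	if err != nil {
		return err
	}
	defer db.Close()

	// UniqueTable renders a quoted `db`.`table` name, e.g. `test`.`t`.
	table := common.UniqueTable("test", "t")

	// ExecWithRetry runs the statements inside one transaction and retries on
	// retryable errors (bad connection, net timeouts, deadlock, TiKV/PD timeouts).
	if err := common.ExecWithRetry(ctx, db, []string{
		fmt.Sprintf("ANALYZE TABLE %s", table),
	}); err != nil {
		return err
	}

	// QueryRowWithRetry scans a single row under the same retry policy.
	var rows uint64
	query := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
	if err := common.QueryRowWithRetry(ctx, db, query, &rows); err != nil {
		return err
	}
	fmt.Printf("%s has %d rows\n", table, rows)
	return nil
}

func main() {
	if err := analyzeWithRetry(context.Background()); err != nil {
		fmt.Println("error:", err)
	}
}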