Skip to content

Commit

Permalink
*: refine log and enable retry with sql execution (pingcap#43)
Browse files Browse the repository at this point in the history
Goal:

Improve stability
Improve log readability

What changes are:

Refine the log, so we can know the processing stage of each table.
Enable retry for sql execution and grpc api calling.
Remove useless code.
  • Loading branch information
holys authored Jun 11, 2018
1 parent ace0456 commit abc18e4
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 358 deletions.
131 changes: 126 additions & 5 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package common

import (
"context"
"fmt"
"net"
"os"
"strings"
"time"

"database/sql"
"database/sql/driver"
"path/filepath"

_ "github.com/go-sql-driver/mysql"
"github.com/go-sql-driver/mysql"
"github.com/juju/errors"
"github.com/ngaut/log"
tmysql "github.com/pingcap/tidb/mysql"
log "github.com/sirupsen/logrus"
)

const (
retryTimeout = 3 * time.Second

defaultMaxRetry = 3
)

func Percent(a int, b int) string {
Expand All @@ -30,13 +41,13 @@ func ConnectDB(host string, port int, user string, psw string) (*sql.DB, error)
func GetFileSize(file string) (int64, error) {
fd, err := os.Open(file)
if err != nil {
return -1, err
return -1, errors.Trace(err)
}
defer fd.Close()

fstat, err := fd.Stat()
if err != nil {
return -1, err
return -1, errors.Trace(err)
}

return fstat.Size(), nil
Expand All @@ -59,7 +70,7 @@ func IsDirExists(name string) bool {
func EnsureDir(dir string) error {
if !FileExists(dir) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
return errors.Trace(err)
}
}
return nil
Expand Down Expand Up @@ -90,3 +101,113 @@ func ListFiles(dir string) map[string]string {

return files
}

func QueryRowWithRetry(ctx context.Context, db *sql.DB, query string, dest ...interface{}) (err error) {
maxRetry := defaultMaxRetry
for i := 0; i < maxRetry; i++ {
if i > 0 {
log.Warnf("query %s retry %d: %v", query, i, dest)
time.Sleep(retryTimeout)
}

err = db.QueryRowContext(ctx, query).Scan(dest...)
if err != nil {
if !isRetryableError(err) {
return errors.Trace(err)
}
log.Warnf("query %s [error] %v", query, err)
continue
}

return nil
}

return errors.Errorf("query sql [%s] failed", query)
}

// ExecWithRetry executes sqls with optional retry.
func ExecWithRetry(ctx context.Context, db *sql.DB, sqls []string) error {
maxRetry := defaultMaxRetry

if len(sqls) == 0 {
return nil
}

var err error
for i := 0; i < maxRetry; i++ {
if i > 0 {
log.Warnf("sql stmt_exec retry %d: %v", i, sqls)
time.Sleep(retryTimeout)
}

if err = executeSQLImp(ctx, db, sqls); err != nil {
if isRetryableError(err) {
continue
}
log.Errorf("[exec][sql] %s [error] %v", sqls, err)
return errors.Trace(err)
}

return nil
}

return errors.Errorf("exec sqls [%v] failed, err:%s", sqls, err.Error())
}

func executeSQLImp(ctx context.Context, db *sql.DB, sqls []string) error {
txn, err := db.BeginTx(ctx, nil)
if err != nil {
log.Errorf("exec sqls [%v] begin failed %v", sqls, errors.ErrorStack(err))
return errors.Trace(err)
}

for i := range sqls {
log.Debugf("[exec][sql] %s", sqls[i])

_, err = txn.ExecContext(ctx, sqls[i])
if err != nil {
log.Warnf("[exec][sql] %s [error]%v", sqls[i], err)
rerr := txn.Rollback()
if rerr != nil {
log.Errorf("[exec][sql] %s [error] %v", sqls[i], rerr)
}
// we should return the exec err, instead of the rollback rerr.
return errors.Trace(err)
}
}
err = txn.Commit()
if err != nil {
log.Errorf("exec sqls [%v] commit failed %v", sqls, errors.ErrorStack(err))
return errors.Trace(err)
}
return nil
}

func isRetryableError(err error) bool {
err = errors.Cause(err)
if err == driver.ErrBadConn {
return true
}

if nerr, ok := err.(net.Error); ok {
return nerr.Timeout()
}

mysqlErr, ok := err.(*mysql.MySQLError)
if ok {
switch mysqlErr.Number {
// ErrLockDeadlock can retry to commit while meet deadlock
case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable:
return true
default:
return false
}
}

return true
}

// UniqueTable returns an unique table name.
func UniqueTable(schema string, table string) string {
return fmt.Sprintf("`%s`.`%s`", schema, table)
}
Loading

0 comments on commit abc18e4

Please sign in to comment.