From 46d5d0ef5d60b37a888136d89c89349720c75eb9 Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Tue, 2 Apr 2019 17:21:30 +0800 Subject: [PATCH] session: use the uniform log format for session (#9517) (#9990) --- session/bench_test.go | 18 ++++--- session/bootstrap.go | 34 +++++++----- session/session.go | 114 ++++++++++++++++++++++++++++------------ session/session_test.go | 4 +- session/tidb.go | 23 +++++--- session/txn.go | 26 ++++----- 6 files changed, 142 insertions(+), 77 deletions(-) diff --git a/session/bench_test.go b/session/bench_test.go index b68a1bdf76823..6216197ae7cec 100644 --- a/session/bench_test.go +++ b/session/bench_test.go @@ -16,14 +16,18 @@ package session import ( "fmt" "math/rand" + "strconv" "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/net/context" ) @@ -33,16 +37,16 @@ var bigCount = 10000 func prepareBenchSession() (Session, *domain.Domain, kv.Storage) { store, err := mockstore.NewMockTikvStore() if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal(err.Error()) } domain, err := BootstrapSession(store) if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal(err.Error()) } - log.SetLevel(log.ErrorLevel) + log.SetLevel(zapcore.ErrorLevel) se, err := CreateSession4Test(store) if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal(err.Error()) } mustExecute(se, "use test") return se, domain, store @@ -88,10 +92,10 @@ func readResult(ctx context.Context, rs sqlexec.RecordSet, count int) { for count > 0 { err := rs.Next(ctx, chk) if err != nil { - log.Fatal(err) + logutil.Logger(ctx).Fatal("read result failed", zap.Error(err)) } if chk.NumRows() == 0 { - log.Fatal(count) + logutil.Logger(ctx).Fatal(strconv.Itoa(count)) } count -= chk.NumRows() } diff --git a/session/bootstrap.go b/session/bootstrap.go index 696c06eb2888e..9f1d51f6e99da 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -33,8 +33,9 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/timeutil" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -215,7 +216,8 @@ const ( func bootstrap(s Session) { b, err := checkBootstrapped(s) if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("check bootstrap error", + zap.Error(err)) } if b { upgrade(s) @@ -268,7 +270,8 @@ func checkBootstrapped(s Session) (bool, error) { // Check if system db exists. _, err := s.Execute(context.Background(), fmt.Sprintf("USE %s;", mysql.SystemDB)) if err != nil && infoschema.ErrDatabaseNotExists.NotEqual(err) { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("check bootstrap error", + zap.Error(err)) } // Check bootstrapped variable value in TiDB table. sVal, _, err := getTiDBVar(s, bootstrappedVar) @@ -424,14 +427,17 @@ func upgrade(s Session) { // Check if TiDB is already upgraded. v, err1 := getBootstrapVersion(s) if err1 != nil { - log.Fatal(err1) + logutil.Logger(context.Background()).Fatal("upgrade error", + zap.Error(err1)) } if v >= currentBootstrapVersion { // It is already bootstrapped/upgraded by a higher version TiDB server. return } - log.Errorf("[Upgrade] upgrade from %d to %d error", ver, currentBootstrapVersion) - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("[Upgrade] upgrade error", + zap.Int64("from", ver), + zap.Int("to", currentBootstrapVersion), + zap.Error(err)) } } @@ -503,7 +509,7 @@ func doReentrantDDL(s Session, sql string, ignorableErrs ...error) { } } if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("doReentrantDDL error", zap.Error(err)) } } @@ -521,7 +527,7 @@ func upgradeToVer11(s Session) { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { return } - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("upgradeToVer11 error", zap.Error(err)) } mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET References_priv='Y'") } @@ -582,7 +588,7 @@ func upgradeToVer13(s Session) { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue } - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("upgradeToVer13 error", zap.Error(err)) } } mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_tmp_table_priv='Y',Lock_tables_priv='Y',Create_view_priv='Y',Show_view_priv='Y',Create_routine_priv='Y',Alter_routine_priv='Y',Event_priv='Y'") @@ -607,7 +613,7 @@ func upgradeToVer14(s Session) { if terror.ErrorEqual(err, infoschema.ErrColumnExists) { continue } - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("upgradeToVer14 error", zap.Error(err)) } } } @@ -616,7 +622,7 @@ func upgradeToVer15(s Session) { var err error _, err = s.Execute(context.Background(), CreateGCDeleteRangeTable) if err != nil { - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("upgradeToVer15 error", zap.Error(err)) } } @@ -760,12 +766,12 @@ func doDMLWorks(s Session) { // Check if TiDB is already bootstrapped. b, err1 := checkBootstrapped(s) if err1 != nil { - log.Fatal(err1) + logutil.Logger(context.Background()).Fatal("doDMLWorks error", zap.Error(err1)) } if b { return } - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("doDMLWorks error", zap.Error(err)) } } @@ -773,7 +779,7 @@ func mustExecute(s Session, sql string) { _, err := s.Execute(context.Background(), sql) if err != nil { debug.PrintStack() - log.Fatal(err) + logutil.Logger(context.Background()).Fatal("mustExecute error", zap.Error(err)) } } diff --git a/session/session.go b/session/session.go index 78865c54eb51f..c8ecd91abc569 100644 --- a/session/session.go +++ b/session/session.go @@ -57,10 +57,11 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/kvcache" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-binlog" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -234,13 +235,13 @@ func (s *session) StoreQueryFeedback(feedback interface{}) { if s.statsCollector != nil { do, err := GetDomain(s.store) if err != nil { - log.Debug("domain not found: ", err) + logutil.Logger(context.Background()).Debug("domain not found", zap.Error(err)) metrics.StoreQueryFeedbackCounter.WithLabelValues(metrics.LblError).Inc() return } err = s.statsCollector.StoreQueryFeedback(feedback, do.StatsHandle()) if err != nil { - log.Debug("store query feedback error: ", err) + logutil.Logger(context.Background()).Debug("store query feedback", zap.Error(err)) metrics.StoreQueryFeedbackCounter.WithLabelValues(metrics.LblError).Inc() return } @@ -343,7 +344,10 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { // BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement, // Finally t1 will have more data than t2, with no errors return to user! if s.isRetryableError(err) && !s.sessionVars.BatchInsert && !isBatched && commitRetryLimit > 0 { - log.Warnf("[%s] con:%d retryable error: %v, txn: %#v", s.getSQLLabel(), s.sessionVars.ConnectionID, err, &s.txn) + logutil.Logger(ctx).Warn("sql", + zap.String("label", s.getSQLLabel()), + zap.Error(err), + zap.String("txn", s.txn.GoString())) // Transactions will retry 2 ~ commitRetryLimit times. // We make larger transactions retry less times to prevent cluster resource outage. txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit) @@ -369,7 +373,9 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { } if err != nil { - log.Warnf("con:%d finished txn:%#v, %v", s.sessionVars.ConnectionID, &s.txn, err) + logutil.Logger(ctx).Warn("commit failed", + zap.String("finished txn", s.txn.GoString()), + zap.Error(err)) return errors.Trace(err) } mapper := s.GetSessionVars().TxnCtx.TableDeltaMap @@ -526,10 +532,16 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { if retryCnt == 0 { // We do not have to log the query every time. // We print the queries at the first try only. - log.Warnf("con:%d schema_ver:%d retry_cnt:%d query_num:%d sql:%s%s", connID, schemaVersion, retryCnt, - i, sqlForLog(st.OriginText()), sessVars.GetExecuteArgumentsInfo()) + logutil.Logger(ctx).Warn("retrying", + zap.Int64("schemaVersion", schemaVersion), + zap.Uint("retryCnt", retryCnt), + zap.Int("queryNum", i), + zap.String("sql", sqlForLog(st.OriginText())+sessVars.GetExecuteArgumentsInfo())) } else { - log.Warnf("con:%d schema_ver:%d retry_cnt:%d query_num:%d", connID, schemaVersion, retryCnt, i) + logutil.Logger(ctx).Warn("retrying", + zap.Int64("schemaVersion", schemaVersion), + zap.Uint("retryCnt", retryCnt), + zap.Int("queryNum", i)) } _, err = st.Exec(ctx) if err != nil { @@ -541,8 +553,9 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { return errors.Trace(err) } } - log.Warnf("con:%d retrying_txn_start_ts:%d original_txn_start_ts:(%d)", - connID, s.GetSessionVars().TxnCtx.StartTS, orgStartTS) + logutil.Logger(ctx).Warn("transaction association", + zap.Uint64("retrying txnStartTS", s.GetSessionVars().TxnCtx.StartTS), + zap.Uint64("original txnStartTS", orgStartTS)) if hook := ctx.Value("preCommitHook"); hook != nil { // For testing purpose. hook.(func())() @@ -554,17 +567,25 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { } } if !s.isRetryableError(err) { - log.Warnf("[%s] con:%d session:%v, err:%v in retry", label, connID, s, err) + logutil.Logger(ctx).Warn("sql", + zap.String("label", label), + zap.Stringer("session", s), + zap.Error(err)) metrics.SessionRetryErrorCounter.WithLabelValues(label, metrics.LblUnretryable) return errors.Trace(err) } retryCnt++ if retryCnt >= maxCnt { - log.Warnf("[%s] con:%d Retry reached max count %d", label, connID, retryCnt) + logutil.Logger(ctx).Warn("sql", + zap.String("label", label), + zap.Uint("retry reached max count", retryCnt)) metrics.SessionRetryErrorCounter.WithLabelValues(label, metrics.LblReachMax) return errors.Trace(err) } - log.Warnf("[%s] con:%d retryable error: %v, txn: %#v", label, connID, err, &s.txn) + logutil.Logger(ctx).Warn("sql", + zap.String("label", label), + zap.Error(err), + zap.String("txn", s.txn.GoString())) kv.BackOff(retryCnt) s.txn.changeToInvalid() s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false) @@ -796,8 +817,10 @@ func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode recordSet, err := runStmt(ctx, s, stmt) if err != nil { if !kv.ErrKeyExists.Equal(err) { - log.Warnf("con:%d schema_ver:%d session error:\n%v\n%s", - connID, s.sessionVars.TxnCtx.SchemaVersion, errors.ErrorStack(err), s) + logutil.Logger(ctx).Warn("run statement error", + zap.Int64("schemaVersion", s.sessionVars.TxnCtx.SchemaVersion), + zap.Error(err), + zap.String("session", s.String())) } return nil, errors.Trace(err) } @@ -832,7 +855,9 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec stmtNodes, warns, err := s.ParseSQL(ctx, sql, charsetInfo, collation) if err != nil { s.rollbackOnError(ctx) - log.Warnf("con:%d parse error:\n%v\n%s", connID, err, sql) + logutil.Logger(ctx).Warn("parse sql error", + zap.Error(err), + zap.String("sql", sql)) return nil, errors.Trace(err) } label := s.getSQLLabel() @@ -851,7 +876,9 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec stmt, err := compiler.Compile(ctx, stmtNode) if err != nil { s.rollbackOnError(ctx) - log.Warnf("con:%d compile error:\n%v\n%s", connID, err, sql) + logutil.Logger(ctx).Warn("compile sql error", + zap.Error(err), + zap.String("sql", sql)) return nil, errors.Trace(err) } metrics.SessionExecuteCompileDuration.WithLabelValues(label).Observe(time.Since(startTS).Seconds()) @@ -993,7 +1020,8 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { // If Txn() is called later, wait for the future to get a valid txn. txnCap := s.getMembufCap() if err := s.txn.changePendingToValid(txnCap); err != nil { - log.Errorf("active transaction fail, err = %+v", err) + logutil.Logger(context.Background()).Error("active transaction fail", + zap.Error(err)) s.txn.cleanup() s.sessionVars.TxnCtx.StartTS = 0 return &s.txn, errors.Trace(err) @@ -1015,7 +1043,9 @@ func (s *session) NewTxn() error { return errors.Trace(err) } vars := s.GetSessionVars() - log.Infof("con:%d schema_ver:%d NewTxn() inside a transaction auto commit: %d", vars.ConnectionID, vars.TxnCtx.SchemaVersion, txnID) + logutil.Logger(ctx).Info("NewTxn() inside a transaction auto commit", + zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Uint64("txnStartTS", txnID)) } txn, err := s.store.Begin() @@ -1061,7 +1091,8 @@ func (s *session) Close() { } ctx := context.TODO() if err := s.RollbackTxn(ctx); err != nil { - log.Error("session Close error:", errors.ErrorStack(err)) + logutil.Logger(context.Background()).Error("session Close failed", + zap.Error(err)) } } @@ -1078,7 +1109,8 @@ func (s *session) Auth(user *auth.UserIdentity, authentication []byte, salt []by s.sessionVars.User = user return true } else if user.Hostname == variable.DefHostname { - log.Errorf("User connection verification failed %s", user) + logutil.Logger(context.Background()).Error("user connection verification failed", + zap.Stringer("user", user)) return false } @@ -1093,7 +1125,8 @@ func (s *session) Auth(user *auth.UserIdentity, authentication []byte, salt []by } } - log.Errorf("User connection verification failed %s", user) + logutil.Logger(context.Background()).Error("user connection verification failed", + zap.Stringer("user", user)) return false } @@ -1159,7 +1192,7 @@ func loadSystemTZ(se *session) (string, error) { // the record of mysql.tidb under where condition: variable_name = "system_tz" should shall only be one. defer func() { if err := rss[0].Close(); err != nil { - log.Error(errors.ErrorStack(err)) + logutil.Logger(context.Background()).Error("close result set error", zap.Error(err)) } }() chk := rss[0].NewChunk() @@ -1244,14 +1277,14 @@ func GetDomain(store kv.Storage) (*domain.Domain, error) { // runInBootstrapSession create a special session for boostrap to run. // If no bootstrap and storage is remote, we must use a little lease time to // bootstrap quickly, after bootstrapped, we will reset the lease time. -// TODO: Using a bootstap tool for doing this may be better later. +// TODO: Using a bootstrap tool for doing this may be better later. func runInBootstrapSession(store kv.Storage, bootstrap func(Session)) { saveLease := schemaLease schemaLease = chooseMinLease(schemaLease, 100*time.Millisecond) s, err := createSession(store) if err != nil { // Bootstrap fail will cause program exit. - log.Fatal(errors.ErrorStack(err)) + logutil.Logger(context.Background()).Fatal("createSession error", zap.Error(err)) } schemaLease = saveLease @@ -1333,7 +1366,8 @@ func getStoreBootstrapVersion(store kv.Storage) int64 { }) if err != nil { - log.Fatalf("check bootstrapped err %v", err) + logutil.Logger(context.Background()).Fatal("check bootstrapped failed", + zap.Error(err)) } if ver > notBootstrapped { @@ -1355,7 +1389,8 @@ func finishBootstrap(store kv.Storage) { return errors.Trace(err) }) if err != nil { - log.Fatalf("finish bootstrap err %v", err) + logutil.Logger(context.Background()).Fatal("finish bootstrap failed", + zap.Error(err)) } } @@ -1425,7 +1460,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { rows, fields, err = s.ExecRestrictedSQL(s, loadCommonGlobalVarsSQL) if err != nil { vars.CommonGlobalLoaded = false - log.Errorf("Failed to load common global variables.") + logutil.Logger(context.Background()).Error("failed to load common global variables.") return errors.Trace(err) } gvc.Update(rows, fields) @@ -1516,9 +1551,18 @@ func logStmt(node ast.StmtNode, vars *variable.SessionVars) { user := vars.User schemaVersion := vars.TxnCtx.SchemaVersion if ss, ok := node.(ast.SensitiveStmtNode); ok { - log.Infof("[CRUCIAL OPERATION] con:%d schema_ver:%d %s (by %s).", vars.ConnectionID, schemaVersion, ss.SecureText(), user) + logutil.Logger(context.Background()).Info("[CRUCIAL OPERATION]", + zap.Uint64("con", vars.ConnectionID), + zap.Int64("schemaVersion", schemaVersion), + zap.String("secure text", ss.SecureText()), + zap.Stringer("user", user)) } else { - log.Infof("[CRUCIAL OPERATION] con:%d schema_ver:%d %s (by %s).", vars.ConnectionID, schemaVersion, stmt.Text(), user) + logutil.Logger(context.Background()).Info("[CRUCIAL OPERATION]", + zap.Uint64("con", vars.ConnectionID), + zap.Int64("schemaVersion", schemaVersion), + zap.String("cur_db", vars.CurrentDB), + zap.String("sql", stmt.Text()), + zap.Stringer("user", user)) } default: logQuery(node.Text(), vars) @@ -1528,8 +1572,12 @@ func logStmt(node ast.StmtNode, vars *variable.SessionVars) { func logQuery(query string, vars *variable.SessionVars) { if atomic.LoadUint32(&variable.ProcessGeneralLog) != 0 && !vars.InRestrictedSQL { query = executor.QueryReplacer.Replace(query) - log.Infof("[GENERAL_LOG] con:%d user:%s schema_ver:%d txn_start_ts:%d current_db:%s, sql:%s%s", - vars.ConnectionID, vars.User, vars.TxnCtx.SchemaVersion, vars.TxnCtx.StartTS, vars.CurrentDB, query, - vars.GetExecuteArgumentsInfo()) + logutil.Logger(context.Background()).Info("[GENERAL_LOG]", + zap.Uint64("con", vars.ConnectionID), + zap.Stringer("user", vars.User), + zap.Int64("schemaVersion", vars.TxnCtx.SchemaVersion), + zap.Uint64("txnStartTS", vars.TxnCtx.StartTS), + zap.String("current_db", vars.CurrentDB), + zap.String("sql", query+vars.GetExecuteArgumentsInfo())) } } diff --git a/session/session_test.go b/session/session_test.go index 9babc894e808b..c4ec53d7912ca 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2340,10 +2340,10 @@ func (s *testSessionSuite) TestTxnGoString(c *C) { tk.MustExec("begin") txn, err = tk.Se.Txn(false) c.Assert(err, IsNil) - c.Assert(fmt.Sprintf("%#v", txn), Equals, fmt.Sprintf("Txn{state=valid, startTS=%d}", txn.StartTS())) + c.Assert(fmt.Sprintf("%#v", txn), Equals, fmt.Sprintf("Txn{state=valid, txnStartTS=%d}", txn.StartTS())) tk.MustExec("insert into gostr values (1)") - c.Assert(fmt.Sprintf("%#v", txn), Equals, fmt.Sprintf("Txn{state=valid, startTS=%d}", txn.StartTS())) + c.Assert(fmt.Sprintf("%#v", txn), Equals, fmt.Sprintf("Txn{state=valid, txnStartTS=%d}", txn.StartTS())) tk.MustExec("rollback") c.Assert(fmt.Sprintf("%#v", txn), Equals, "Txn{state=invalid}") diff --git a/session/tidb.go b/session/tidb.go index a35e59ed1abfe..8f14ce5004ac7 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -36,8 +36,9 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -60,7 +61,10 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { ddlLease = schemaLease statisticLease = statsLease err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) { - log.Infof("store %v new domain, ddl lease %v, stats lease %d", store.UUID(), ddlLease, statisticLease) + logutil.Logger(context.Background()).Info("new domain", + zap.String("store", store.UUID()), + zap.Stringer("ddl lease", ddlLease), + zap.Stringer("stats lease", statisticLease)) factory := createSessionFunc(store) sysFactory := createSessionWithDomainFunc(store) d = domain.NewDomain(store, ddlLease, statisticLease, factory) @@ -68,7 +72,8 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { if err1 != nil { // If we don't clean it, there are some dirty data when retrying the function of Init. d.Close() - log.Errorf("[ddl] init domain failed %v", errors.ErrorStack(errors.Trace(err1))) + logutil.Logger(context.Background()).Error("[ddl] init domain failed", + zap.Error(err1)) } return true, errors.Trace(err1) }) @@ -121,7 +126,7 @@ func SetStatsLease(lease time.Duration) { // Parse parses a query string to raw ast.StmtNode. func Parse(ctx sessionctx.Context, src string) ([]ast.StmtNode, error) { - log.Debug("compiling", src) + logutil.Logger(context.Background()).Debug("compiling", zap.String("source", src)) charset, collation := ctx.GetSessionVars().GetCharsetInfo() p := parser.New() p.SetSQLMode(ctx.GetSessionVars().SQLMode) @@ -130,7 +135,9 @@ func Parse(ctx sessionctx.Context, src string) ([]ast.StmtNode, error) { ctx.GetSessionVars().StmtCtx.AppendWarning(warn) } if err != nil { - log.Warnf("compiling %s, error: %v", src, err) + logutil.Logger(context.Background()).Warn("compiling", + zap.String("source", src), + zap.Error(err)) return nil, errors.Trace(err) } return stmts, nil @@ -146,7 +153,7 @@ func Compile(ctx context.Context, sctx sessionctx.Context, stmtNode ast.StmtNode func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessVars *variable.SessionVars, meetsErr error) error { if meetsErr != nil { if !sessVars.InTxn() { - log.Info("RollbackTxn for ddl/autocommit error.") + logutil.Logger(context.Background()).Info("rollbackTxn for ddl/autocommit error.") terror.Log(se.RollbackTxn(ctx)) } return meetsErr @@ -199,7 +206,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) } } } else { - log.Error(err1) + logutil.Logger(context.Background()).Error("get txn error", zap.Error(err1)) } } @@ -295,7 +302,7 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) { var s kv.Storage err = util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) { - log.Infof("new store") + logutil.Logger(context.Background()).Info("new store") s, err = d.Open(path) return kv.IsRetryableError(err), err }) diff --git a/session/txn.go b/session/txn.go index 23e9dc8e94989..24df05c0b794f 100644 --- a/session/txn.go +++ b/session/txn.go @@ -15,7 +15,6 @@ package session import ( "fmt" - "runtime/debug" "strings" "sync/atomic" @@ -27,8 +26,9 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" - binlog "github.com/pingcap/tipb/go-binlog" - log "github.com/sirupsen/logrus" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-binlog" + "go.uber.org/zap" "golang.org/x/net/context" ) @@ -89,12 +89,15 @@ func (st *TxnState) GoString() string { s.WriteString("state=pending") } else if st.Valid() { s.WriteString("state=valid") - fmt.Fprintf(&s, ", startTS=%d", st.Transaction.StartTS()) + fmt.Fprintf(&s, ", txnStartTS=%d", st.Transaction.StartTS()) if len(st.dirtyTableOP) > 0 { - fmt.Fprintf(&s, ", len(dirtyTable)=%d", len(st.dirtyTableOP)) + fmt.Fprintf(&s, ", len(dirtyTable)=%d, %#v", len(st.dirtyTableOP), st.dirtyTableOP) } if len(st.mutations) > 0 { - fmt.Fprintf(&s, ", len(mutations)=%d", len(st.mutations)) + fmt.Fprintf(&s, ", len(mutations)=%d, %#v", len(st.mutations), st.mutations) + } + if st.buf != nil && st.buf.Len() != 0 { + fmt.Fprintf(&s, ", buf.length: %d, buf.size: %d", st.buf.Len(), st.buf.Size()) } } else { s.WriteString("state=invalid") @@ -160,17 +163,14 @@ func mockAutoIDRetry() bool { func (st *TxnState) Commit(ctx context.Context) error { defer st.reset() if len(st.mutations) != 0 || len(st.dirtyTableOP) != 0 || st.buf.Len() != 0 { - log.Errorf("The code should never run here, TxnState=%#v, mutations=%#v, dirtyTableOP=%#v, buf=%#v something must be wrong: %s", - st, - st.mutations, - st.dirtyTableOP, - st.buf, - debug.Stack()) + logutil.Logger(context.Background()).Error("the code should never run here", + zap.String("TxnState", st.GoString()), + zap.Stack("something must be wrong")) return errors.New("invalid transaction") } if st.doNotCommit != nil { if err1 := st.Transaction.Rollback(); err1 != nil { - log.Error(err1) + logutil.Logger(context.Background()).Error("rollback error", zap.Error(err1)) } return errors.Trace(st.doNotCommit) }