Skip to content

Commit

Permalink
store/tikv: move kv.TxnInfo to store/tikv (#24216)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Apr 26, 2021
1 parent c1518ac commit 7e15333
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 23 deletions.
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2972,7 +2972,7 @@ func (s *testSuite) TestTiDBLastTxnInfo(c *C) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key)")
tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Check(testkit.Rows("0 0"))
tk.MustQuery("select @@tidb_last_txn_info").Check(testkit.Rows(""))

tk.MustExec("insert into t values (1)")
rows1 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
Expand Down
11 changes: 0 additions & 11 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,3 @@ func BackOff(attempts uint) int {
time.Sleep(sleep)
return int(sleep)
}

// TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing)
type TxnInfo struct {
TxnScope string `json:"txn_scope"`
StartTS uint64 `json:"start_ts"`
CommitTS uint64 `json:"commit_ts"`
TxnCommitMode string `json:"txn_commit_mode"`
AsyncCommitFallback bool `json:"async_commit_fallback"`
OnePCFallback bool `json:"one_pc_fallback"`
ErrMsg string `json:"error,omitempty"`
}
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (s *session) doCommit(ctx context.Context) error {
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(tikvstore.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs))
s.txn.SetOption(tikvstore.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(tikvstore.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info })
s.txn.SetOption(tikvstore.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info })
if s.GetSessionVars().EnableAmendPessimisticTxn {
s.txn.SetOption(tikvstore.SchemaAmender, NewSchemaAmenderForTikvTxn(s))
}
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ type SessionVars struct {
EnableAmendPessimisticTxn bool

// LastTxnInfo keeps track the info of last committed transaction.
LastTxnInfo kv.TxnInfo
LastTxnInfo string

// LastQueryInfo keeps track the info of last query.
LastQueryInfo QueryInfo
Expand Down
6 changes: 1 addition & 5 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
case TiDBCurrentTS:
return fmt.Sprintf("%d", s.TxnCtx.StartTS), true, nil
case TiDBLastTxnInfo:
info, err := json.Marshal(s.LastTxnInfo)
if err != nil {
return "", true, err
}
return string(info), true, nil
return s.LastTxnInfo, true, nil
case TiDBLastQueryInfo:
info, err := json.Marshal(s.LastQueryInfo)
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"runtime/trace"
Expand Down Expand Up @@ -70,7 +71,7 @@ type KVTxn struct {
// SchemaAmender is used amend pessimistic txn commit mutations for schema change
schemaAmender SchemaAmender
// commitCallback is called after current transaction gets committed
commitCallback func(info tidbkv.TxnInfo, err error)
commitCallback func(info string, err error)

binlog BinlogExecutor
}
Expand Down Expand Up @@ -199,7 +200,7 @@ func (txn *KVTxn) SetOption(opt int, val interface{}) {
case kv.SchemaAmender:
txn.schemaAmender = val.(SchemaAmender)
case kv.CommitHook:
txn.commitCallback = val.(func(info tidbkv.TxnInfo, err error))
txn.commitCallback = val.(func(info string, err error))
}
}

Expand Down Expand Up @@ -370,6 +371,17 @@ func (txn *KVTxn) collectLockedKeys() [][]byte {
return keys
}

// TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing)
type TxnInfo struct {
TxnScope string `json:"txn_scope"`
StartTS uint64 `json:"start_ts"`
CommitTS uint64 `json:"commit_ts"`
TxnCommitMode string `json:"txn_commit_mode"`
AsyncCommitFallback bool `json:"async_commit_fallback"`
OnePCFallback bool `json:"one_pc_fallback"`
ErrMsg string `json:"error,omitempty"`
}

func (txn *KVTxn) onCommitted(err error) {
if txn.commitCallback != nil {
isAsyncCommit := txn.committer.isAsyncCommit()
Expand All @@ -382,7 +394,7 @@ func (txn *KVTxn) onCommitted(err error) {
commitMode = "async_commit"
}

info := tidbkv.TxnInfo{
info := TxnInfo{
TxnScope: txn.GetUnionStore().GetOption(kv.TxnScope).(string),
StartTS: txn.startTS,
CommitTS: txn.commitTS,
Expand All @@ -393,7 +405,9 @@ func (txn *KVTxn) onCommitted(err error) {
if err != nil {
info.ErrMsg = err.Error()
}
txn.commitCallback(info, err)
infoStr, err2 := json.Marshal(info)
_ = err2
txn.commitCallback(string(infoStr), err)
}
}

Expand Down

0 comments on commit 7e15333

Please sign in to comment.