Skip to content

Commit

Permalink
Preserve is_retry_request flag for prewrite when retrying with region…
Browse files Browse the repository at this point in the history
… errors (#513)

* add a test to verify the incorrect setting of is_retry_request

Signed-off-by: ekexium <ekexium@gmail.com>

* preserve the retry flag for prewrite

Signed-off-by: ekexium <ekexium@gmail.com>

* fix test

Signed-off-by: ekexium <ekexium@gmail.com>

* add more explanations

Signed-off-by: ekexium <ekexium@gmail.com>
  • Loading branch information
ekexium authored and sticnarf committed May 31, 2022
1 parent de7ca28 commit 04830ff
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 3 deletions.
6 changes: 3 additions & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,15 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
ctx := context.Background()
k := []byte("k")

// Lock the key with an async-commit lock.
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

txn, err := s.store.Begin()
s.Nil(err)
txn.SetPessimistic(true)
err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
s.Nil(err)

// Lock the key with a async-commit lock.
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

txn.Set(k, k)
err = txn.Commit(context.Background())
s.Nil(err)
Expand Down
66 changes: 66 additions & 0 deletions integration_tests/prewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
package tikv_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

Expand Down Expand Up @@ -84,3 +89,64 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
assert.Equal(req.MinCommitTs, committer.GetMinCommitTS())

}

// TestIsRetryRequestFlagWithRegionError tests that the is_retry_request flag is true for all retrying prewrite requests.
func TestIsRetryRequestFlagWithRegionError(t *testing.T) {
require := require.New(t)

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.Nil(err)
_, peerID, regionID := testutils.BootstrapWithSingleStore(cluster)
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
require.Nil(err)
defer store.Close()

failpoint.Enable("tikvclient/mockRetrySendReqToRegion", "1*return(true)->return(false)")
defer failpoint.Disable("tikvclient/mockRetrySendReqToRegion")
failpoint.Enable("tikvclient/invalidCacheAndRetry", "1*off->pause")

// the history of this field
isRetryRequest := make([]bool, 0, 0)
var mu sync.Mutex
hook := func(req *tikvrpc.Request) {
if req.Type != tikvrpc.CmdPrewrite {
return
}
if req != nil {
mu.Lock()
isRetryRequest = append(isRetryRequest, req.Context.IsRetryRequest)
mu.Unlock()
}
}
failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
ctx := context.WithValue(context.TODO(), "sendReqToRegionHook", hook)

tx, err := store.Begin()
require.Nil(err)
txn := transaction.TxnProbe{KVTxn: tx}
err = txn.Set([]byte("a"), []byte("v"))
require.Nil(err)
err = txn.Set([]byte("z"), []byte("v"))
require.Nil(err)
committer, err := txn.NewCommitter(1)
require.Nil(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
committer.PrewriteAllMutations(ctx)
wg.Done()
}()
time.Sleep(time.Second * 3)
cluster.Split(regionID, cluster.AllocID(), []byte("h"), []uint64{peerID}, peerID)
failpoint.Disable("tikvclient/invalidCacheAndRetry")
wg.Wait()

// The event history should be:
// 1. The first prewrite succeeds in TiKV, but due to some reason, client-go doesn't get the response. We inject a retry to simulate it.
// 2. The second prewrite request returns a region error, which is caused by the region split. And it retries.
// 3. The third and fourth prewrite requests (for 'a' and 'z' respectively, so there are 2 of them) succeed.
//
// The last three requests are retry requests, we assert the is_retry_request flags are all true.
require.Equal([]bool{false, true, true, true}, isRetryRequest)
}
8 changes: 8 additions & 0 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,14 @@ func (s *RegionRequestSender) SendReqCtx(

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr

if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil {
if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil {
h := hook.(func(*tikvrpc.Request))
h(req)
}
}

var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
}()
for {
attempts++
if attempts > 1 || action.retry {
req.IsRetryRequest = true
}
if time.Since(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
Expand Down

0 comments on commit 04830ff

Please sign in to comment.