Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve is_retry_request flag for prewrite when retrying with region errors #513

Merged
merged 4 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,15 +590,15 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
ctx := context.Background()
k := []byte("k")

// Lock the key with an async-commit lock.
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

txn, err := s.store.Begin()
s.Nil(err)
txn.SetPessimistic(true)
err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
s.Nil(err)

// Lock the key with a async-commit lock.
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

txn.Set(k, k)
err = txn.Commit(context.Background())
s.Nil(err)
Expand Down
66 changes: 66 additions & 0 deletions integration_tests/prewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@
package tikv_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

Expand Down Expand Up @@ -84,3 +89,64 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
assert.Equal(req.MinCommitTs, committer.GetMinCommitTS())

}

// TestIsRetryRequestFlagWithRegionError tests that the is_retry_request flag is true for all retrying prewrite requests.
func TestIsRetryRequestFlagWithRegionError(t *testing.T) {
require := require.New(t)

client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.Nil(err)
_, peerID, regionID := testutils.BootstrapWithSingleStore(cluster)
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
require.Nil(err)
defer store.Close()

failpoint.Enable("tikvclient/mockRetrySendReqToRegion", "1*return(true)->return(false)")
defer failpoint.Disable("tikvclient/mockRetrySendReqToRegion")
failpoint.Enable("tikvclient/invalidCacheAndRetry", "1*off->pause")

// the history of this field
isRetryRequest := make([]bool, 0, 0)
var mu sync.Mutex
hook := func(req *tikvrpc.Request) {
if req.Type != tikvrpc.CmdPrewrite {
return
}
if req != nil {
mu.Lock()
isRetryRequest = append(isRetryRequest, req.Context.IsRetryRequest)
mu.Unlock()
}
}
failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
ctx := context.WithValue(context.TODO(), "sendReqToRegionHook", hook)

tx, err := store.Begin()
require.Nil(err)
txn := transaction.TxnProbe{KVTxn: tx}
err = txn.Set([]byte("a"), []byte("v"))
require.Nil(err)
err = txn.Set([]byte("z"), []byte("v"))
require.Nil(err)
committer, err := txn.NewCommitter(1)
require.Nil(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
committer.PrewriteAllMutations(ctx)
wg.Done()
}()
time.Sleep(time.Second * 3)
cluster.Split(regionID, cluster.AllocID(), []byte("h"), []uint64{peerID}, peerID)
failpoint.Disable("tikvclient/invalidCacheAndRetry")
wg.Wait()

// The event history should be:
// 1. The first prewrite succeeds in TiKV, but due to some reason, client-go doesn't get the response. We inject a retry to simulate it.
// 2. The second prewrite request returns a region error, which is caused by the region split. And it retries.
// 3. The third and fourth prewrite requests (for 'a' and 'z' respectively, so there are 2 of them) succeed.
//
// The last three requests are retry requests, we assert the is_retry_request flags are all true.
require.Equal([]bool{false, true, true, true}, isRetryRequest)
}
8 changes: 8 additions & 0 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,14 @@ func (s *RegionRequestSender) SendReqCtx(

logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
s.storeAddr = rpcCtx.Addr

if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil {
if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil {
h := hook.(func(*tikvrpc.Request))
h(req)
}
}

var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
}()
for {
attempts++
if attempts > 1 || action.retry {
req.IsRetryRequest = true
}
if time.Since(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
Expand Down