Skip to content

Commit

Permalink
br: add retry for raw kv client put (#58963) (#59081)
Browse files Browse the repository at this point in the history
close #58845
  • Loading branch information
ti-chi-bot authored Feb 27, 2025
1 parent da5eed3 commit 468c62b
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 3 deletions.
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3176,7 +3176,7 @@ func (rc *Client) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -3802,3 +3802,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
// Attempt returns the current value of the backoffer's Attempts field.
// NOTE(review): presumably this is the number of attempts still allowed;
// confirm against NextBackoff, whose body is not shown here.
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}

// PutRawKvWithRetry writes one key/value pair through client.Put, retrying
// failed attempts under the backoff policy created by
// utils.NewRawClientBackoffStrategy.
//
// It returns nil as soon as a Put succeeds. When utils.WithRetry gives up, the
// returned error still starts with "failed to put raw kv after retry" but now
// also carries the underlying error, so the root cause is visible to callers
// and in logs instead of being silently discarded.
func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
	err := utils.WithRetry(ctx, func() error {
		return client.Put(ctx, key, value, originTs)
	}, utils.NewRawClientBackoffStrategy())
	if err != nil {
		// Annotate rather than replace: keep the cause attached to the message.
		return errors.Annotate(err, "failed to put raw kv after retry")
	}
	return nil
}
67 changes: 67 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -1928,3 +1929,69 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

// mockRawKVClient is a test double for rawkv.Client. It embeds the real
// interface and overrides BatchPut so a test can make a chosen number of
// leading calls fail.
type mockRawKVClient struct {
	rawkv.Client

	// putCount is the number of BatchPut calls received so far.
	// errThreshold is how many of the first BatchPut calls return an error.
	putCount, errThreshold int
}

// BatchPut counts the call and ignores its arguments. The first errThreshold
// calls return an "rpcClient is idle" error; every later call succeeds.
func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
	m.putCount++
	if m.putCount > m.errThreshold {
		return nil
	}
	return errors.New("rpcClient is idle")
}

// TestPutRawKvWithRetry runs restore.PutRawKvWithRetry against a mock raw KV
// client whose first errThreshold BatchPut calls fail. Each case checks the
// returned error and the total number of BatchPut calls.
func TestPutRawKvWithRetry(t *testing.T) {
	type testCase struct {
		name         string
		errThreshold int
		// cancelAfter, when positive, bounds the call with a context timeout.
		// None of the cases below set it.
		cancelAfter time.Duration
		wantErr     string
		wantPuts    int
	}

	cases := []testCase{
		{name: "success on first try", errThreshold: 0, wantPuts: 1},
		{name: "success on after failure", errThreshold: 2, wantPuts: 3},
		{
			name:         "fails all retries",
			errThreshold: 5,
			wantErr:      "failed to put raw kv after retry",
			wantPuts:     5,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			mock := &mockRawKVClient{errThreshold: tc.errThreshold}
			// The batch client is built with a size argument of 1.
			batchClient := restore.NewRawKVBatchClient(mock, 1)

			ctx := context.Background()
			if tc.cancelAfter > 0 {
				timeoutCtx, cancel := context.WithTimeout(ctx, tc.cancelAfter)
				defer cancel()
				ctx = timeoutCtx
			}

			err := restore.PutRawKvWithRetry(ctx, batchClient, []byte("key"), []byte("value"), 1)

			if tc.wantErr == "" {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, tc.wantErr)
			}
			require.Equal(t, tc.wantPuts, mock.putCount)
		})
	}
}
Empty file.
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ const (
ChecksumMaxWaitInterval = 30 * time.Second

gRPC_Cancel = "the client connection is closing"

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// At least, there are two possible cancel() call,
Expand Down Expand Up @@ -300,3 +304,35 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

// RawClientBackoffStrategy holds the retry state used when writing through the
// raw KV client: a countdown of attempts and a delay that grows up to a cap.
type RawClientBackoffStrategy struct {
	// Attempts is decremented by every NextBackoff call and reported by Attempt.
	Attempts int
	// BaseBackoff is the delay the next NextBackoff call hands out; it is
	// doubled after each call and never raised above MaxBackoff.
	BaseBackoff, MaxBackoff time.Duration
}

// NewRawClientBackoffStrategy returns a Backoffer seeded from the package
// defaults: rawClientMaxAttempts attempts, an initial delay of
// rawClientDelayTime, and a delay cap of rawClientMaxDelayTime.
func NewRawClientBackoffStrategy() Backoffer {
	strategy := new(RawClientBackoffStrategy)
	strategy.Attempts = rawClientMaxAttempts
	strategy.BaseBackoff = rawClientDelayTime
	strategy.MaxBackoff = rawClientMaxDelayTime
	return strategy
}

// NextBackoff consumes one attempt and reports how long to wait before the
// next try. The error argument is ignored.
//
// When the attempt counter reaches exactly zero it returns 0 and leaves the
// stored delay untouched. Otherwise it returns the current delay and doubles
// the stored one for the following call, capped at MaxBackoff.
func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
	b.Attempts--
	if b.Attempts == 0 {
		return 0
	}

	wait := b.BaseBackoff
	next := wait * 2
	if next > b.MaxBackoff {
		next = b.MaxBackoff
	}
	b.BaseBackoff = next
	return wait
}

// Attempt returns the current value of Attempts, the counter that NextBackoff
// decrements on every call.
func (b *RawClientBackoffStrategy) Attempt() int {
return b.Attempts
}
2 changes: 1 addition & 1 deletion br/tests/br_file_corruption/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ for filename in $(find $TEST_DIR/$DB -name "*.sst_bak"); do
mv "$filename" "${filename%_bak}"
done

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/full-restore-validate-checksum=return(true)"
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/full-restore-validate-checksum=return(true)"
restore_fail=0
run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true || restore_fail=1
export GO_FAILPOINTS=""
Expand Down
51 changes: 51 additions & 0 deletions br/tests/br_test_utils.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/sh
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

# wait_log_checkpoint_advance [task_name]
#
# Blocks until the checkpoint reported by `br log status` for the given log
# backup task is greater than a timestamp taken at the start of this function.
#
# Arguments:
#   $1 - task name; defaults to $TASK_NAME when omitted.
# Reads:
#   PD_ADDR, TEST_NAME, and the run_br helper (defined outside this file).
# Exits the shell with status 1 when:
#   - the checkpoint cannot be read as a positive number (including when the
#     task reports last_errors, which makes the jq filter emit nothing), or
#   - the checkpoint has still not passed the reference timestamp after more
#     than 50 polls spaced 10 seconds apart.
wait_log_checkpoint_advance() {
local task_name=${1:-$TASK_NAME}
echo "wait for log checkpoint to advance for task: $task_name"
sleep 10
# Reference timestamp: wall-clock milliseconds shifted left by 18 bits.
# NOTE(review): presumably this mirrors the TSO layout used by checkpoint
# values so the two can be compared directly - confirm.
local current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)")
echo "current ts: $current_ts"
# i counts polls in which the checkpoint was readable but not yet advanced.
i=0
while true; do
# extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
# BR_LOG_TO_TERM is unset only inside the command substitution's subshell;
# stderr of run_br is redirected to br.log.
log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name $task_name --json 2>br.log)
echo "log backup status: $log_backup_status"
# Only the first output line is parsed; the filter yields .[0].checkpoint
# when .[0].last_errors is empty and nothing otherwise.
local checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
echo "checkpoint ts: $checkpoint_ts"

# check whether the checkpoint ts is a number
# An empty or non-numeric value makes the test fail; its error message is
# discarded by the 2>/dev/null redirect, and control falls to the else branch.
if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
if [ $checkpoint_ts -gt $current_ts ]; then
echo "the checkpoint has advanced"
break
fi
echo "the checkpoint hasn't advanced"
i=$((i+1))
if [ "$i" -gt 50 ]; then
echo 'the checkpoint lag is too large'
exit 1
fi
sleep 10
else
echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
exit 1
fi
done
}
2 changes: 1 addition & 1 deletion br/tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mkdir $COV_DIR
# Putting multiple light tests together and heavy tests in a separate group.
declare -A groups
groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_check_dup_table"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint br_pitr_long_running_schema_loading"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
Expand Down

0 comments on commit 468c62b

Please sign in to comment.