diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 6719228654aa5..95100edb9cbad 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -3176,7 +3176,7 @@ func (rc *Client) restoreMetaKvEntries( failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) { failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv")) }) - if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil { + if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil { return 0, 0, errors.Trace(err) } // for failpoint, we need to flush the cache in rawKVClient every time @@ -3802,3 +3802,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration { func (b *waitTiFlashBackoffer) Attempt() int { return b.Attempts } + +func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error { + err := utils.WithRetry(ctx, func() error { + return client.Put(ctx, key, value, originTs) + }, utils.NewRawClientBackoffStrategy()) + if err != nil { + return errors.Errorf("failed to put raw kv after retry") + } + return nil +} diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 331344e04c65c..7574a579b40e8 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/rawkv" pd "github.com/tikv/pd/client" "google.golang.org/grpc/keepalive" ) @@ -1928,3 +1929,69 @@ func TestCheckNewCollationEnable(t *testing.T) { require.Equal(t, ca.newCollationEnableInCluster == "True", enabled) } } + +type mockRawKVClient struct { + rawkv.Client + putCount int + errThreshold int +} + +func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error { + m.putCount += 1 + if m.errThreshold >= m.putCount { + return errors.New("rpcClient is idle") + } + return nil +} + +func TestPutRawKvWithRetry(t *testing.T) { + tests := []struct { + name string + errThreshold int + cancelAfter time.Duration + wantErr string + wantPuts int + }{ + { + name: "success on first try", + errThreshold: 0, + wantPuts: 1, + }, + { + name: "success on after failure", + errThreshold: 2, + wantPuts: 3, + }, + { + name: "fails all retries", + errThreshold: 5, + wantErr: "failed to put raw kv after retry", + wantPuts: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockRawClient := &mockRawKVClient{ + errThreshold: tt.errThreshold, + } + client := restore.NewRawKVBatchClient(mockRawClient, 1) + + ctx := context.Background() + if tt.cancelAfter > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter) + defer cancel() + } + + err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1) + + if tt.wantErr != "" { + require.ErrorContains(t, err, tt.wantErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tt.wantPuts, mockRawClient.putCount) + }) + } +} diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index f1b6a837bcdb9..d5bedec24cf4b 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -52,6 +52,10 @@ const ( ChecksumMaxWaitInterval = 30 * time.Second gRPC_Cancel = "the client connection is closing" + + rawClientMaxAttempts = 5 + rawClientDelayTime = 500 * time.Millisecond + rawClientMaxDelayTime = 5 * time.Second ) // At least, there are two possible cancel() call, @@ -300,3 +304,35 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { func (bo *pdReqBackoffer) Attempt() int { return bo.attempt } + +type RawClientBackoffStrategy struct { + Attempts int + BaseBackoff time.Duration + MaxBackoff time.Duration +} + +func NewRawClientBackoffStrategy() Backoffer { + return &RawClientBackoffStrategy{ + Attempts: rawClientMaxAttempts, + BaseBackoff: rawClientDelayTime, + MaxBackoff: rawClientMaxDelayTime, + } +} + +// NextBackoff returns a duration to wait before retrying again +func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration { + bo := b.BaseBackoff + b.Attempts-- + if b.Attempts == 0 { + return 0 + } + b.BaseBackoff *= 2 + if b.BaseBackoff > b.MaxBackoff { + b.BaseBackoff = b.MaxBackoff + } + return bo +} + +func (b *RawClientBackoffStrategy) Attempt() int { + return b.Attempts +} diff --git a/br/tests/br_file_corruption/run.sh b/br/tests/br_file_corruption/run.sh index 60907ac2e7a4c..b74354a1a9c6e 100644 --- a/br/tests/br_file_corruption/run.sh +++ b/br/tests/br_file_corruption/run.sh @@ -68,7 +68,7 @@ for filename in $(find $TEST_DIR/$DB -name "*.sst_bak"); do mv "$filename" "${filename%_bak}" done -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/full-restore-validate-checksum=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/full-restore-validate-checksum=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$DB" --checksum=true || restore_fail=1 export GO_FAILPOINTS="" diff --git a/br/tests/br_test_utils.sh b/br/tests/br_test_utils.sh new file mode 100644 index 0000000000000..4b63324a186d2 --- /dev/null +++ b/br/tests/br_test_utils.sh @@ -0,0 +1,51 @@ +#!/bin/sh +# +# Copyright 2024 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +wait_log_checkpoint_advance() { + local task_name=${1:-$TASK_NAME} + echo "wait for log checkpoint to advance for task: $task_name" + sleep 10 + local current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") + echo "current ts: $current_ts" + i=0 + while true; do + # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty + log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name $task_name --json 2>br.log) + echo "log backup status: $log_backup_status" + local checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end') + echo "checkpoint ts: $checkpoint_ts" + + # check whether the checkpoint ts is a number + if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then + if [ $checkpoint_ts -gt $current_ts ]; then + echo "the checkpoint has advanced" + break + fi + echo "the checkpoint hasn't advanced" + i=$((i+1)) + if [ "$i" -gt 50 ]; then + echo 'the checkpoint lag is too large' + exit 1 + fi + sleep 10 + else + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 + fi + done +} diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index a7ad4befaf541..c5bc094091d95 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -20,7 +20,7 @@ mkdir $COV_DIR # Putting multiple light tests together and heavy tests in a separate group. declare -A groups groups=( - ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" + ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable br_check_dup_table" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint br_pitr_long_running_schema_loading" ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'