diff --git a/br/pkg/lightning/backend/local/region_job.go b/br/pkg/lightning/backend/local/region_job.go index 544df9a6ca310..a021937bdcad9 100644 --- a/br/pkg/lightning/backend/local/region_job.go +++ b/br/pkg/lightning/backend/local/region_job.go @@ -17,6 +17,7 @@ package local import ( "container/heap" "context" + goerrors "errors" "strings" "sync" "time" @@ -159,7 +160,7 @@ func (j *regionJob) convertStageTo(stage jobStageTp) { // we don't need to do cleanup for the pairs written to tikv if encounters an error, // tikv will take the responsibility to do so. // TODO: let client-go provide a high-level write interface. -func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { +func (local *Backend) writeToTiKV(pCtx context.Context, j *regionJob) (errRet error) { if j.stage != regionScanned { return nil } @@ -175,6 +176,19 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error { failpoint.Return(err) }) + ctx, cancel := context.WithTimeout(pCtx, 15*time.Minute) + defer cancel() + defer func() { + deadline, ok := ctx.Deadline() + if !ok { + // should not happen + return + } + if goerrors.Is(errRet, context.DeadlineExceeded) && time.Now().After(deadline) { + errRet = common.ErrWriteTooSlow + } + }() + apiVersion := local.tikvCodec.GetAPIVersion() clientFactory := local.importClientFactory kvBatchSize := local.KVWriteBatchSize diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index 3445402fc028a..f5270cdbb2fd4 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -88,13 +88,19 @@ var retryableErrorIDs = map[errors.ErrorID]struct{}{ drivererr.ErrUnknown.ID(): {}, } +// ErrWriteTooSlow is used to get rid of the gRPC blocking issue. 
+// Some gRPC calls have been observed to block indefinitely, for example +// https://github.com/pingcap/tidb/issues/48352 and +// https://github.com/pingcap/tidb/issues/46321; the root cause is still unknown. +var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever") + func isSingleRetryableError(err error) bool { err = errors.Cause(err) switch err { case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: return false - case mysql.ErrInvalidConn, driver.ErrBadConn: + case mysql.ErrInvalidConn, driver.ErrBadConn, ErrWriteTooSlow: return true } diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go index adec9ae2b6964..6e12c241381f7 100644 --- a/br/pkg/lightning/common/retry_test.go +++ b/br/pkg/lightning/common/retry_test.go @@ -35,6 +35,7 @@ import ( func TestIsRetryableError(t *testing.T) { require.False(t, IsRetryableError(context.Canceled)) require.False(t, IsRetryableError(context.DeadlineExceeded)) + require.True(t, IsRetryableError(ErrWriteTooSlow)) require.False(t, IsRetryableError(io.EOF)) require.False(t, IsRetryableError(&net.AddrError{})) require.False(t, IsRetryableError(&net.DNSError{}))