From b0d49533b1624815ec9195ca3d0501ffd45619bc Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 22 Nov 2023 15:47:41 +0800
Subject: [PATCH] lightning: add timeout for "write" RPC (#48355) (#48396)

close pingcap/tidb#46321, close pingcap/tidb#48352
---
 br/pkg/lightning/backend/local/local.go | 18 ++++++++++++++++--
 br/pkg/lightning/common/retry.go        |  8 +++++++-
 br/pkg/lightning/common/retry_test.go   |  1 +
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 45787e6e01b0b..5f53106b4384b 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -17,6 +17,7 @@ package local
 import (
 	"bytes"
 	"context"
+	goerrors "errors"
 	"fmt"
 	"io"
 	"math"
@@ -907,17 +908,30 @@ type rangeStats struct {
 // we don't need to do cleanup for the pairs written to tikv if encounters an error,
 // tikv will takes the responsibility to do so.
 func (local *local) WriteToTiKV(
-	ctx context.Context,
+	pCtx context.Context,
 	engine *Engine,
 	region *split.RegionInfo,
 	start, end []byte,
 	regionSplitSize int64,
 	regionSplitKeys int64,
-) ([]*sst.SSTMeta, Range, rangeStats, error) {
+) (s []*sst.SSTMeta, r Range, r2 rangeStats, errRet error) {
 	failpoint.Inject("WriteToTiKVNotEnoughDiskSpace", func(_ failpoint.Value) {
 		failpoint.Return(nil, Range{}, rangeStats{},
 			errors.Errorf("The available disk of TiKV (%s) only left %d, and capacity is %d", "", 0, 0))
 	})
+	ctx, cancel := context.WithTimeout(pCtx, 15*time.Minute)
+	defer cancel()
+	defer func() {
+		deadline, ok := ctx.Deadline()
+		if !ok {
+			// should not happen
+			return
+		}
+		if goerrors.Is(errRet, context.DeadlineExceeded) && time.Now().After(deadline) {
+			errRet = common.ErrWriteTooSlow
+		}
+	}()
+
 	if local.checkTiKVAvaliable {
 		for _, peer := range region.Region.GetPeers() {
 			var e error
diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go
index c3bb979a9bd32..f6e96c9e8dff0 100644
--- a/br/pkg/lightning/common/retry.go
+++ b/br/pkg/lightning/common/retry.go
@@ -88,13 +88,19 @@
 	drivererr.ErrUnknown.ID(): {},
 }
 
+// ErrWriteTooSlow is used to get rid of the gRPC blocking issue.
+// there are some strange blocking issues of gRPC like
+// https://github.com/pingcap/tidb/issues/48352
+// https://github.com/pingcap/tidb/issues/46321 and I don't know why 😭
+var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")
+
 func isSingleRetryableError(err error) bool {
 	err = errors.Cause(err)
 
 	switch err {
 	case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows:
 		return false
-	case mysql.ErrInvalidConn, driver.ErrBadConn:
+	case mysql.ErrInvalidConn, driver.ErrBadConn, ErrWriteTooSlow:
 		return true
 	}
 
diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go
index cbfc87085f11a..226771eae980c 100644
--- a/br/pkg/lightning/common/retry_test.go
+++ b/br/pkg/lightning/common/retry_test.go
@@ -34,6 +34,7 @@ import (
 func TestIsRetryableError(t *testing.T) {
 	require.False(t, IsRetryableError(context.Canceled))
 	require.False(t, IsRetryableError(context.DeadlineExceeded))
+	require.True(t, IsRetryableError(ErrWriteTooSlow))
 	require.False(t, IsRetryableError(io.EOF))
 	require.False(t, IsRetryableError(&net.AddrError{}))
 	require.False(t, IsRetryableError(&net.DNSError{}))
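
Note: the following is not part of the patch. It is a minimal standalone sketch of the pattern the diff applies: wrap a possibly-stuck gRPC "write" call in context.WithTimeout and, in a deferred closure, translate a locally expired deadline into a dedicated sentinel error that the retry logic treats as retryable. The names writeToStore and doGRPCWrite and the 50ms timeout in main are hypothetical placeholders, not identifiers from the TiDB codebase.

// Standalone sketch of the timeout pattern used above, with hypothetical names
// (writeToStore, doGRPCWrite); it is not code from the TiDB repository.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errWriteTooSlow plays the role of common.ErrWriteTooSlow: a sentinel that the
// retry layer can recognize as retryable, unlike a bare context.DeadlineExceeded.
var errWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")

// writeToStore bounds the RPC with its own deadline and rewrites the error in a
// deferred closure, mirroring the shape of the WriteToTiKV change in the diff.
func writeToStore(pCtx context.Context, timeout time.Duration) (errRet error) {
	ctx, cancel := context.WithTimeout(pCtx, timeout)
	defer cancel()
	defer func() {
		deadline, ok := ctx.Deadline()
		if !ok {
			return // WithTimeout always sets a deadline, so this should not happen
		}
		// Rewrite the error only if this call's own deadline has really passed,
		// so a DeadlineExceeded inherited from the parent context is left as is.
		if errors.Is(errRet, context.DeadlineExceeded) && time.Now().After(deadline) {
			errRet = errWriteTooSlow
		}
	}()
	return doGRPCWrite(ctx)
}

// doGRPCWrite stands in for the real "write" RPC; it blocks until the context
// is done, simulating a stream that never makes progress.
func doGRPCWrite(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	err := writeToStore(context.Background(), 50*time.Millisecond)
	// Prints true: the caller sees the retryable sentinel instead of a timeout.
	fmt.Println(errors.Is(err, errWriteTooSlow))
}

The deadline comparison matters because errors.Is(errRet, context.DeadlineExceeded) alone cannot tell whether this call's own write deadline expired or a caller-supplied one did; checking time.Now().After(deadline) against this context's deadline keeps the rewrite scoped to the slow-write case.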