Skip to content

Commit

Permalink
lightning: support timeout for tikv/tidb retry (#40805)
Browse files Browse the repository at this point in the history
ref #33714
  • Loading branch information
lichunzhu authored Jan 29, 2023
1 parent 744de86 commit 5048568
Show file tree
Hide file tree
Showing 10 changed files with 32 additions and 12 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
retryTimeout = 3 * time.Second

defaultMaxRetry = 3

dbTimeout = 30 * time.Second
)

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
Expand Down Expand Up @@ -74,6 +76,8 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.ReadTimeout = dbTimeout
cfg.WriteTimeout = dbTimeout

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func TestLoadConfig(t *testing.T) {
err = taskCfg.Adjust(context.Background())
require.NoError(t, err)
equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN()
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?readTimeout=30s&writeTimeout=30s&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
require.Equal(t, expectedDSN, equivalentDSN)

result := taskCfg.String()
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package config

import (
"time"

"github.com/docker/go-units"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
Expand All @@ -34,3 +38,11 @@ const (

DefaultBatchSize ByteSize = 100 * units.GiB
)

var (
DefaultGrpcKeepaliveParams = grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
PermitWithoutStream: false,
})
)
1 change: 0 additions & 1 deletion br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -733,11 +732,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
config.DefaultGrpcKeepaliveParams,
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/pdutil",
"//br/pkg/version",
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -88,7 +89,7 @@ func withTiKVConnection(ctx context.Context, tls *common.TLS, tikvAddr string, a
// Connect to the ImportSST service on the given TiKV node.
// The connection is needed for executing `action` and will be tear down
// when this function exits.
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(), config.DefaultGrpcKeepaliveParams)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -172,7 +173,8 @@ var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",na

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption(),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//br/pkg/conn/util",
"//br/pkg/errors",
"//br/pkg/httputil",
"//br/pkg/lightning/config",
"//br/pkg/logutil",
"//br/pkg/redact",
"//br/pkg/utils",
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/store/pdtypes"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -201,7 +202,9 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(store.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -341,7 +344,8 @@ func sendSplitRegionRequest(ctx context.Context, c *pdClient, regionInfo *Region
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
conn, err := grpc.Dial(store.GetAddress(), opt,
config.DefaultGrpcKeepaliveParams)
if err != nil {
return false, nil, err
}
Expand Down

0 comments on commit 5048568

Please sign in to comment.