Skip to content

Commit

Permalink
meta/autoid: make autoid client ResetConn operation concurrency-safe (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jan 21, 2024
1 parent b5a2eb8 commit 7a0b4df
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 21 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2973,8 +2973,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=",
version = "v0.0.0-20230928035022-1bdcc25ed63c",
sum = "h1:ZWFeZNN+6poqqEQ3XU6M/Gw6oiNexbDD3yqIZ05GxlM=",
version = "v0.0.0-20240112060601-a0e3fbb1eeee",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c
github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230726063044-73d6d7f3756b/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c h1:tBKPWWqgWEBs04BV4UN7RhtUkZDs0oz+WyMbtRDVtL8=
github.com/pingcap/kvproto v0.0.0-20230928035022-1bdcc25ed63c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee h1:ZWFeZNN+6poqqEQ3XU6M/Gw6oiNexbDD3yqIZ05GxlM=
github.com/pingcap/kvproto v0.0.0-20240112060601-a0e3fbb1eeee/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
47 changes: 31 additions & 16 deletions meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -54,6 +55,8 @@ type ClientDiscover struct {
// See https://github.com/grpc/grpc-go/issues/5321
*grpc.ClientConn
}
// version is increased in every ResetConn() to make the operation safe.
version uint64
}

const (
Expand All @@ -68,27 +71,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
}

// GetClient gets the AutoIDAllocClient.
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) {
d.mu.RLock()
cli := d.mu.AutoIDAllocClient
if cli != nil {
d.mu.RUnlock()
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}
d.mu.RUnlock()

d.mu.Lock()
defer d.mu.Unlock()
if d.mu.AutoIDAllocClient != nil {
return d.mu.AutoIDAllocClient, nil
return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil
}

resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return nil, errors.New("autoid service leader not found")
return nil, 0, errors.New("autoid service leader not found")
}

addr := string(resp.Kvs[0].Value)
Expand All @@ -98,19 +101,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien
clusterSecurity := security.ClusterSecurity()
tlsConfig, err := clusterSecurity.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
logutil.BgLogger().Info("[autoid client] connect to leader", zap.String("addr", addr))
grpcConn, err := grpc.Dial(addr, opt)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
cli = autoid.NewAutoIDAllocClient(grpcConn)
d.mu.AutoIDAllocClient = cli
d.mu.ClientConn = grpcConn
return cli, nil
return cli, atomic.LoadUint64(&d.version), nil
}

// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
Expand All @@ -131,7 +134,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs
}

retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand All @@ -149,7 +152,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.ResetConn(err)
sp.resetConn(ver, err)
goto retry
}
return 0, 0, errors.Trace(err)
Expand All @@ -166,6 +169,14 @@ retry:

const backoffDuration = 200 * time.Millisecond

func (d *ClientDiscover) resetConn(version uint64, reason error) {
// Avoid repeated Reset operation
if !atomic.CompareAndSwapUint64(&d.version, version, version+1) {
return
}
d.ResetConn(reason)
}

// ResetConn reset the AutoIDAllocClient and underlying grpc connection.
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
func (d *ClientDiscover) ResetConn(reason error) {
Expand All @@ -181,10 +192,14 @@ func (d *ClientDiscover) ResetConn(reason error) {
d.mu.Unlock()
// Close grpc.ClientConn to release resource.
if grpcConn != nil {
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("[autoid client] close grpc connection error", zap.Error(err))
}
go func() {
// Doen't close the conn immediately, in case the other sessions are still using it.
time.Sleep(200 * time.Millisecond)
err := grpcConn.Close()
if err != nil {
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
}
}()
}
}

Expand Down Expand Up @@ -213,7 +228,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e

func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error {
retry:
cli, err := sp.GetClient(ctx)
cli, ver, err := sp.GetClient(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -228,7 +243,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.ResetConn(err)
sp.resetConn(ver, err)
goto retry
}
return errors.Trace(err)
Expand Down

0 comments on commit 7a0b4df

Please sign in to comment.