Skip to content

Commit

Permalink
store: add OpenWithOptions to avoid caller modifying global config (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Feb 2, 2021
1 parent 7fbe3e5 commit f05f4a8
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 15 deletions.
28 changes: 28 additions & 0 deletions store/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package store

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/config"
)

type testTiKVDriverSuite struct {
OneByOneSuite
store *tikvStore
}

var _ = Suite(&testTiKVDriverSuite{})

func (s *testTiKVDriverSuite) TestSetDefaultAndOptions(c *C) {
globalConfig := config.GetGlobalConfig()
originSec := globalConfig.Security

d := TiKVDriver{}
security := config.Security{ClusterSSLCA: "test"}
d.setDefaultAndOptions(WithSecurity(security))

c.Assert(d.security, DeepEquals, security)
c.Assert(d.tikvConfig, DeepEquals, globalConfig.TiKVClient)
c.Assert(d.txnLocalLatches, DeepEquals, globalConfig.TxnLocalLatches)
c.Assert(d.pdConfig, DeepEquals, globalConfig.PDClient)
c.Assert(config.GetGlobalConfig().Security, DeepEquals, originSec)
}
79 changes: 64 additions & 15 deletions store/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,83 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// DriverOption is a function that changes some config of Driver
type DriverOption func(*TiKVDriver)

// WithSecurity changes the config.Security used by tikv driver.
func WithSecurity(s config.Security) DriverOption {
return func(c *TiKVDriver) {
c.security = s
}
}

// WithTiKVClientConfig changes the config.TiKVClient used by tikv driver.
func WithTiKVClientConfig(client config.TiKVClient) DriverOption {
return func(c *TiKVDriver) {
c.tikvConfig = client
}
}

// WithTxnLocalLatches changes the config.TxnLocalLatches used by tikv driver.
func WithTxnLocalLatches(t config.TxnLocalLatches) DriverOption {
return func(c *TiKVDriver) {
c.txnLocalLatches = t
}
}

// WithPDClientConfig changes the config.PDClient used by tikv driver.
func WithPDClientConfig(client config.PDClient) DriverOption {
return func(c *TiKVDriver) {
c.pdConfig = client
}
}

// TiKVDriver implements engine TiKV.
type TiKVDriver struct {
pdConfig config.PDClient
security config.Security
tikvConfig config.TiKVClient
txnLocalLatches config.TxnLocalLatches
}

// Open opens or creates an TiKV storage with given path.
// Open opens or creates an TiKV storage with given path using global config.
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
func (d TiKVDriver) Open(path string) (kv.Storage, error) {
return d.OpenWithOptions(path)
}

func (d *TiKVDriver) setDefaultAndOptions(options ...DriverOption) {
tidbCfg := config.GetGlobalConfig()
d.pdConfig = tidbCfg.PDClient
d.security = tidbCfg.Security
d.tikvConfig = tidbCfg.TiKVClient
d.txnLocalLatches = tidbCfg.TxnLocalLatches
for _, f := range options {
f(d)
}
}

// OpenWithOptions is used by other program that use tidb as a library, to avoid modifying GlobalConfig
// unspecified options will be set to global config
func (d TiKVDriver) OpenWithOptions(path string, options ...DriverOption) (kv.Storage, error) {
mc.Lock()
defer mc.Unlock()
security := config.GetGlobalConfig().Security
pdConfig := config.GetGlobalConfig().PDClient
tikvConfig := config.GetGlobalConfig().TiKVClient
txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches
d.setDefaultAndOptions(options...)
etcdAddrs, disableGC, err := config.ParsePath(path)
if err != nil {
return nil, errors.Trace(err)
}

pdCli, err := pd.NewClient(etcdAddrs, pd.SecurityOption{
CAPath: security.ClusterSSLCA,
CertPath: security.ClusterSSLCert,
KeyPath: security.ClusterSSLKey,
CAPath: d.security.ClusterSSLCA,
CertPath: d.security.ClusterSSLCert,
KeyPath: d.security.ClusterSSLKey,
}, pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(tikvConfig.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(tikvConfig.GrpcKeepAliveTimeout) * time.Second,
Time: time.Duration(d.tikvConfig.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(d.tikvConfig.GrpcKeepAliveTimeout) * time.Second,
}),
), pd.WithCustomTimeoutOption(time.Duration(pdConfig.PDServerTimeout)*time.Second))
), pd.WithCustomTimeoutOption(time.Duration(d.pdConfig.PDServerTimeout)*time.Second))
pdCli = execdetails.InterceptedPDClient{Client: pdCli}

if err != nil {
Expand All @@ -87,7 +136,7 @@ func (d TiKVDriver) Open(path string) (kv.Storage, error) {
return store, nil
}

tlsConfig, err := security.ToTLSConfig()
tlsConfig, err := d.security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -98,12 +147,12 @@ func (d TiKVDriver) Open(path string) (kv.Storage, error) {
}

coprCacheConfig := &config.GetGlobalConfig().TiKVClient.CoprCache
s, err := tikv.NewKVStore(uuid, &tikv.CodecPDClient{Client: pdCli}, spkv, tikv.NewRPCClient(security), !disableGC, coprCacheConfig)
s, err := tikv.NewKVStore(uuid, &tikv.CodecPDClient{Client: pdCli}, spkv, tikv.NewRPCClient(d.security), !disableGC, coprCacheConfig)
if err != nil {
return nil, errors.Trace(err)
}
if txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(txnLocalLatches.Capacity)
if d.txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(d.txnLocalLatches.Capacity)
}

store := &tikvStore{
Expand Down

0 comments on commit f05f4a8

Please sign in to comment.