Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

just for check #2

Open
wants to merge 20 commits into
base: xiang/master
Choose a base branch
from
  •  
  •  
  •  
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ type TiKVClient struct {

// Binlog is the config for binlog.
type Binlog struct {
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
//Strategy string `toml:"binlog-strategy" json:"binlog-strategy"`
Strategy string `toml:"binlog-socket" json:"binlog-socket"`
WriteTimeout string `toml:"write-timeout" json:"write-timeout"`
// If IgnoreError is true, when writting binlog meets error, TiDB would
// ignore the error.
Expand Down
4 changes: 2 additions & 2 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ capacity = 10240000

[binlog]

# Socket file to write binlog.
binlog-socket = ""
# the strategy that choose pump to write binlog.
binlog-strategy = "hash"

# WriteTimeout specifies how long it will wait for writing binlog to pump.
write-timeout = "15s"
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestT(t *testing.T) {

func (s *testConfigSuite) TestConfig(c *C) {
conf := new(Config)
conf.Binlog.BinlogSocket = "/tmp/socket"
conf.Binlog.Strategy = "hash"
conf.Binlog.IgnoreError = true
conf.TiKVClient.CommitTimeout = "10s"

Expand All @@ -52,7 +52,7 @@ commit-timeout="41s"`)
c.Assert(conf.Load(configFile), IsNil)

// Test that the original value will not be clear by load the config file that does not contain the option.
c.Assert(conf.Binlog.BinlogSocket, Equals, "/tmp/socket")
c.Assert(conf.Binlog.Strategy, Equals, "hash")

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(f.Close(), IsNil)
Expand Down
7 changes: 6 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpClient(),
binlogCli: binloginfo.GetPumpsClient(),
}
log.Infof("binloginfo.GetPumpsClient() is nil %v", binloginfo.GetPumpsClient() == nil)
log.Infof("ddlCtx.binlogCli is nil %v", ddlCtx.binlogCli == nil)
if binloginfo.GetPumpsClient() == nil {
ddlCtx.binlogCli = nil
}
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
Expand Down
4 changes: 2 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func runTestLoadData(c *C, server *Server) {
defer func() {
err = fp.Close()
c.Assert(err, IsNil)
err = os.Remove(path)
c.Assert(err, IsNil)
//err = os.Remove(path)
//c.Assert(err, IsNil)
}()
_, err = fp.WriteString("\n" +
"xxx row1_col1 - row1_col2 1abc\n" +
Expand Down
17 changes: 15 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
pClient "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
)

// Session context
Expand Down Expand Up @@ -280,6 +281,7 @@ func (s *session) doCommit(ctx context.Context) error {
if s.txn.IsReadOnly() {
return nil
}
log.Infof("s.sessionVars.BinlogClient is nil %v", s.sessionVars.BinlogClient == nil)
if s.sessionVars.BinlogClient != nil {
prewriteValue := binloginfo.GetPrewriteValue(s, false)
if prewriteValue != nil {
Expand All @@ -292,8 +294,9 @@ func (s *session) doCommit(ctx context.Context) error {
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient.(binlog.PumpClient),
Client: s.sessionVars.BinlogClient.(*pClient.PumpsClient),
}
log.Info("set option")
s.txn.SetOption(kv.BinlogInfo, info)
}
}
Expand Down Expand Up @@ -1168,7 +1171,17 @@ func createSession(store kv.Storage) (*session, error) {
domain.BindDomain(s, dom)
// session implements variable.GlobalVarAccessor. Bind it to ctx.
s.sessionVars.GlobalVarsAccessor = s
s.sessionVars.BinlogClient = binloginfo.GetPumpClient()
client := binloginfo.GetPumpsClient()
if client == nil {
log.Infof("set s.sessionVars.BinlogClient to nil")
s.sessionVars.BinlogClient = nil
log.Infof("s.sessionVars.BinlogClient is nil %v", s.sessionVars.BinlogClient == nil)
} else {
s.sessionVars.BinlogClient = client
}
//s.sessionVars.BinlogClient = binloginfo.GetPumpsClient()
log.Infof("binloginfo.GetPumpsClient() is nil %v", binloginfo.GetPumpsClient()==nil)
log.Infof("s.sessionVars.BinlogClient is nil %v", s.sessionVars.BinlogClient == nil)
s.txn.init()
return s, nil
}
Expand Down
29 changes: 0 additions & 29 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type domainMap struct {
Expand Down Expand Up @@ -271,33 +269,6 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
return s, errors.Trace(err)
}

// DialPumpClientWithRetry tries to dial to binlogSocket,
// if any error happens, it will try to re-dial,
// or return this error when timeout.
func DialPumpClientWithRetry(binlogSocket string, maxRetries int, dialerOpt grpc.DialOption) (*grpc.ClientConn, error) {
var clientCon *grpc.ClientConn
err := util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("setup binlog client")
var err error
tlsConfig, err := config.GetGlobalConfig().Security.ToTLSConfig()
if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}

if tlsConfig != nil {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), dialerOpt)
} else {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithInsecure(), dialerOpt)
}

if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}
return true, errors.Trace(err)
})
return clientCon, errors.Trace(err)
}

var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"}

func trimSQL(sql string) string {
Expand Down
78 changes: 37 additions & 41 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"github.com/pingcap/tidb/terror"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
//"golang.org/x/net/context"
pClient "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"google.golang.org/grpc"
)

Expand All @@ -39,28 +40,36 @@ var binlogWriteTimeout = 15 * time.Second

// pumpClient is the gRPC client to write binlog, it is opened on server start and never close,
// shared by all sessions.
var pumpClient binlog.PumpClient
var pumpClientLock sync.RWMutex
//var pumpClient binlog.PumpClient
//var pumpClientLock sync.RWMutex
var pumpsClient *pClient.PumpsClient
var pumpsClientLock sync.RWMutex

// BinlogInfo contains binlog data and binlog client.
type BinlogInfo struct {
Data *binlog.Binlog
Client binlog.PumpClient
Data *binlog.Binlog
//Client binlog.PumpClient
Client *pClient.PumpsClient
}

// GetPumpClient gets the pump client instance.
func GetPumpClient() binlog.PumpClient {
pumpClientLock.RLock()
client := pumpClient
pumpClientLock.RUnlock()
// GetPumpsClient gets the pump client instance.
func GetPumpsClient() *pClient.PumpsClient {
pumpsClientLock.RLock()
client := pumpsClient
log.Infof("GetPumpsClient client is nil %v", client == nil)
pumpsClientLock.RUnlock()
if client == nil {
return nil
}
return client
}

// SetPumpClient sets the pump client instance.
func SetPumpClient(client binlog.PumpClient) {
pumpClientLock.Lock()
pumpClient = client
pumpClientLock.Unlock()
// SetPumpsClient sets the PumpsClient instance.
func SetPumpsClient(client *pClient.PumpsClient) {
pumpsClientLock.Lock()
log.Info("SetPumpsClient")
pumpsClient = client
pumpsClientLock.Unlock()
}

// SetGRPCTimeout sets grpc timeout for writing binlog.
Expand Down Expand Up @@ -111,47 +120,33 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
return nil
}

commitData, err := info.Data.Marshal()
if err != nil {
return errors.Trace(err)
}
req := &binlog.WriteBinlogReq{ClusterID: clusterID, Payload: commitData}

// Retry many times because we may raise CRITICAL error here.
for i := 0; i < 20; i++ {
var resp *binlog.WriteBinlogResp
ctx, cancel := context.WithTimeout(context.Background(), binlogWriteTimeout)
resp, err = info.Client.WriteBinlog(ctx, req)
cancel()
if err == nil && resp.Errmsg != "" {
err = errors.New(resp.Errmsg)
}
if err == nil {
return nil
}
if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical and not retryable, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
}
log.Errorf("write binlog error %v", err)
time.Sleep(time.Second)
if info.Client == nil {
log.Error("pump client is nil")
return errors.New("pump client is nil")
}

log.Debugf("begin write binlog, start ts: %d, type: %s", info.Data.StartTs, info.Data.Tp)
err := info.Client.WriteBinlog(info.Data)
log.Debugf("end write binlog, start ts: %d, type: %s", info.Data.StartTs, info.Data.Tp)
if err != nil {
log.Errorf("write binlog fail %v", errors.ErrorStack(err))
if atomic.LoadUint32(&ignoreError) == 1 {
log.Errorf("critical error, write binlog fail but error ignored: %s", errors.ErrorStack(err))
metrics.CriticalErrorCounter.Add(1)
// If error happens once, we'll stop writing binlog.
atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
return nil
}

return terror.ErrCritical.GenByArgs(err)
}

return terror.ErrCritical.GenByArgs(err)
return nil
}

// SetDDLBinlog sets DDL binlog in the kv.Transaction.
func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery string) {
log.Infof("SetDDLBinlog client is nil %v", client)
if client == nil {
return
}
Expand All @@ -162,8 +157,9 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery
DdlJobId: jobID,
DdlQuery: []byte(ddlQuery),
},
Client: client.(binlog.PumpClient),
Client: client.(*pClient.PumpsClient),
}
log.Info("txn.SetOption(kv.BinlogInfo, info)")
txn.SetOption(kv.BinlogInfo, info)
}

Expand Down
1 change: 1 addition & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ func (c *twoPhaseCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int
}

func (c *twoPhaseCommitter) shouldWriteBinlog() bool {
log.Infof("shouldWriteBinlog %v", c.txn.us.GetOption(kv.BinlogInfo) != nil)
return c.txn.us.GetOption(kv.BinlogInfo) != nil
}

Expand Down
1 change: 1 addition & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ func (t *tableCommon) Type() table.Type {
}

func shouldWriteBinlog(ctx sessionctx.Context) bool {
log.Infof("ctx.GetSessionVars().BinlogClient is nil %v", ctx.GetSessionVars().BinlogClient == nil)
if ctx.GetSessionVars().BinlogClient == nil {
return false
}
Expand Down
Loading