diff --git a/config/config.go b/config/config.go index 19b1dfbb027c9..427831b3b5d72 100644 --- a/config/config.go +++ b/config/config.go @@ -30,6 +30,7 @@ type Config struct { Lease string `toml:"lease" json:"lease"` RunDDL bool `toml:"run-ddl" json:"run-ddl"` SplitTable bool `toml:"split-table" json:"split-table"` + TokenLimit int `toml:"token-limit" json:"token-limit"` Log Log `toml:"log" json:"log"` Security Security `toml:"security" json:"security"` @@ -97,12 +98,13 @@ type PlanCache struct { } var defaultConf = Config{ - Host: "0.0.0.0", - Port: 4000, - Store: "mocktikv", - Path: "/tmp/tidb", - RunDDL: true, - Lease: "10s", + Host: "0.0.0.0", + Port: 4000, + Store: "mocktikv", + Path: "/tmp/tidb", + RunDDL: true, + Lease: "10s", + TokenLimit: 1000, Log: Log{ Level: "info", Format: "text", @@ -154,6 +156,9 @@ func GetGlobalConfig() *Config { // Load loads config options from a toml file. func (c *Config) Load(confFile string) error { _, err := toml.DecodeFile(confFile, c) + if c.TokenLimit <= 0 { + c.TokenLimit = 1000 + } return errors.Trace(err) } diff --git a/config/config.toml.example b/config/config.toml.example index 6e8246be8b838..1d8aa3d1c9681 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -27,6 +27,9 @@ lease = "10s" # When create table, split a separated region for it. # split-table = false +# The limit of concurrent executed sessions. +# token-limit = 1000 + [log] # Log level: info, debug, warn, error, fatal. level = "info" diff --git a/context/context.go b/context/context.go index 1b7c24999df83..d742d67e161b6 100644 --- a/context/context.go +++ b/context/context.go @@ -76,6 +76,8 @@ func (t basicCtxType) String() string { return "query_string" case Initing: return "initing" + case LastExecuteDDL: + return "last_execute_ddl" } return "unknown" } @@ -86,4 +88,6 @@ const ( QueryString basicCtxType = 1 // Initing is the key for indicating if the server is running bootstrap or upgrad job. Initing basicCtxType = 2 + // LastExecuteDDL is the key for whether the session execute a ddl command last time. + LastExecuteDDL basicCtxType = 3 ) diff --git a/executor/adapter.go b/executor/adapter.go index 0317b1899e012..18f2b188c9418 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -289,10 +289,12 @@ func (a *ExecStmt) logSlowQuery(succ bool) { sql = sql[:cfg.Log.QueryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql)) } connID := a.ctx.GetSessionVars().ConnectionID + currentDB := a.ctx.GetSessionVars().CurrentDB logEntry := log.NewEntry(logutil.SlowQueryLogger) logEntry.Data = log.Fields{ "connectionId": connID, "costTime": costTime, + "database": currentDB, "sql": sql, } if costTime < time.Duration(cfg.Log.SlowThreshold)*time.Millisecond { diff --git a/new_session_test.go b/new_session_test.go index a176396f1db0c..36765e54ddf04 100644 --- a/new_session_test.go +++ b/new_session_test.go @@ -1031,6 +1031,15 @@ func (s *testSessionSuite) TestMultiStmts(c *C) { tk.MustQuery("select * from t1;").Check(testkit.Rows("1")) } +func (s *testSessionSuite) TestLastExecuteDDLFlag(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(id int)") + c.Assert(tk.Se.Value(context.LastExecuteDDL), NotNil) + tk.MustExec("insert into t1 values (1)") + c.Assert(tk.Se.Value(context.LastExecuteDDL), IsNil) +} + func (s *testSessionSuite) TestDecimal(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/server/conn.go b/server/conn.go index 62810c519d1a3..8ce4cba6477db 100644 --- a/server/conn.go +++ b/server/conn.go @@ -48,6 +48,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/juju/errors" + "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/mysql" @@ -458,6 +459,11 @@ func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) { case mysql.ComQuit: label = "Quit" case mysql.ComQuery: + if cc.ctx.Value(context.LastExecuteDDL) != nil { + // Don't take DDL execute time into account. + // It's already recorded by other metrics in ddl package. + return + } label = "Query" case mysql.ComPing: label = "Ping" diff --git a/server/region_handler_test.go b/server/region_handler_test.go index e9a092e979794..2f0d5837a7ad4 100644 --- a/server/region_handler_test.go +++ b/server/region_handler_test.go @@ -195,10 +195,9 @@ func (ts *TidbRegionHandlerTestSuite) startServer(c *C) { c.Assert(err, IsNil) tidbdrv := NewTiDBDriver(store) - cfg := &config.Config{ - Port: 4001, - Store: "tikv", - } + cfg := config.NewConfig() + cfg.Port = 4001 + cfg.Store = "tikv" cfg.Status.StatusPort = 10090 cfg.Status.ReportStatus = true diff --git a/server/server.go b/server/server.go index df0f24197cdd4..1ac59f8c9a031 100644 --- a/server/server.go +++ b/server/server.go @@ -132,14 +132,12 @@ func (s *Server) skipAuth() bool { return s.cfg.Socket != "" } -const tokenLimit = 1000 - // NewServer creates a new Server. func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { s := &Server{ cfg: cfg, driver: driver, - concurrentLimiter: NewTokenLimiter(tokenLimit), + concurrentLimiter: NewTokenLimiter(cfg.TokenLimit), rwlock: &sync.RWMutex{}, clients: make(map[uint32]*clientConn), stopListenerCh: make(chan struct{}, 1), diff --git a/server/tidb_test.go b/server/tidb_test.go index ebee1749999a8..69f4a60d31926 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -48,16 +48,11 @@ func (ts *TidbTestSuite) SetUpSuite(c *C) { _, err = tidb.BootstrapSession(store) c.Assert(err, IsNil) ts.tidbdrv = NewTiDBDriver(store) - cfg := &config.Config{ - Port: 4001, - Status: config.Status{ - ReportStatus: true, - StatusPort: 10090, - }, - Performance: config.Performance{ - TCPKeepAlive: true, - }, - } + cfg := config.NewConfig() + cfg.Port = 4001 + cfg.Status.ReportStatus = true + cfg.Status.StatusPort = 10090 + cfg.Performance.TCPKeepAlive = true server, err := NewServer(cfg, ts.tidbdrv) c.Assert(err, IsNil) @@ -145,12 +140,9 @@ func (ts *TidbTestSuite) TestMultiStatements(c *C) { } func (ts *TidbTestSuite) TestSocket(c *C) { - cfg := &config.Config{ - Socket: "/tmp/tidbtest.sock", - Status: config.Status{ - StatusPort: 10091, - }, - } + cfg := config.NewConfig() + cfg.Socket = "/tmp/tidbtest.sock" + cfg.Status.StatusPort = 10091 server, err := NewServer(cfg, ts.tidbdrv) c.Assert(err, IsNil) @@ -280,13 +272,10 @@ func (ts *TidbTestSuite) TestTLS(c *C) { config.TLSConfig = "skip-verify" config.Addr = "localhost:4002" } - cfg := &config.Config{ - Port: 4002, - Status: config.Status{ - ReportStatus: true, - StatusPort: 10091, - }, - } + cfg := config.NewConfig() + cfg.Port = 4002 + cfg.Status.ReportStatus = true + cfg.Status.StatusPort = 10091 server, err := NewServer(cfg, ts.tidbdrv) c.Assert(err, IsNil) go server.Run() @@ -301,16 +290,13 @@ func (ts *TidbTestSuite) TestTLS(c *C) { config.TLSConfig = "skip-verify" config.Addr = "localhost:4003" } - cfg = &config.Config{ - Port: 4003, - Status: config.Status{ - ReportStatus: true, - StatusPort: 10091, - }, - Security: config.Security{ - SSLCert: "/tmp/server-cert.pem", - SSLKey: "/tmp/server-key.pem", - }, + cfg = config.NewConfig() + cfg.Port = 4003 + cfg.Status.ReportStatus = true + cfg.Status.StatusPort = 10091 + cfg.Security = config.Security{ + SSLCert: "/tmp/server-cert.pem", + SSLKey: "/tmp/server-key.pem", } server, err = NewServer(cfg, ts.tidbdrv) c.Assert(err, IsNil) @@ -334,17 +320,14 @@ func (ts *TidbTestSuite) TestTLS(c *C) { config.TLSConfig = "client-certificate" config.Addr = "localhost:4004" } - cfg = &config.Config{ - Port: 4004, - Status: config.Status{ - ReportStatus: true, - StatusPort: 10091, - }, - Security: config.Security{ - SSLCA: "/tmp/ca-cert.pem", - SSLCert: "/tmp/server-cert.pem", - SSLKey: "/tmp/server-key.pem", - }, + cfg = config.NewConfig() + cfg.Port = 4004 + cfg.Status.ReportStatus = true + cfg.Status.StatusPort = 10091 + cfg.Security = config.Security{ + SSLCA: "/tmp/ca-cert.pem", + SSLCert: "/tmp/server-cert.pem", + SSLKey: "/tmp/server-key.pem", } server, err = NewServer(cfg, ts.tidbdrv) c.Assert(err, IsNil) diff --git a/session.go b/session.go index 40783e9b5d508..50edd87104bf7 100644 --- a/session.go +++ b/session.go @@ -655,6 +655,11 @@ func (s *session) SetProcessInfo(sql string) { func (s *session) executeStatement(connID uint64, stmtNode ast.StmtNode, stmt ast.Statement, recordSets []ast.RecordSet) ([]ast.RecordSet, error) { s.SetValue(context.QueryString, stmt.OriginText()) + if _, ok := stmtNode.(ast.DDLNode); ok { + s.SetValue(context.LastExecuteDDL, true) + } else { + s.ClearValue(context.LastExecuteDDL) + } startTS := time.Now() recordSet, err := runStmt(s, stmt) diff --git a/tidb-server/main.go b/tidb-server/main.go index 8eb1f99c361f0..cc5873401b30d 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -67,6 +67,7 @@ const ( nmMetricsAddr = "metrics-addr" nmMetricsInterval = "metrics-interval" nmDdlLease = "lease" + nmTokenLimit = "token-limit" ) var ( @@ -82,6 +83,7 @@ var ( binlogSocket = flag.String(nmBinlogSocket, "", "socket file to write binlog") runDDL = flagBoolean(nmRunDDL, true, "run ddl worker on this tidb-server") ddlLease = flag.String(nmDdlLease, "10s", "schema lease duration, very dangerous to change only if you know what you do") + tokenLimit = flag.Int(nmTokenLimit, 1000, "the limit of concurrent executed sessions") // Log logLevel = flag.String(nmLogLevel, "info", "log level: info, debug, warn, error, fatal") @@ -273,6 +275,9 @@ func overrideConfig() { if actualFlags[nmDdlLease] { cfg.Lease = *ddlLease } + if actualFlags[nmTokenLimit] { + cfg.TokenLimit = *tokenLimit + } // Log if actualFlags[nmLogLevel] { @@ -347,8 +352,9 @@ func createServer() { terror.MustNil(err) if cfg.XProtocol.XServer { xcfg := &xserver.Config{ - Addr: fmt.Sprintf("%s:%d", cfg.XProtocol.XHost, cfg.XProtocol.XPort), - Socket: cfg.XProtocol.XSocket, + Addr: fmt.Sprintf("%s:%d", cfg.XProtocol.XHost, cfg.XProtocol.XPort), + Socket: cfg.XProtocol.XSocket, + TokenLimit: cfg.TokenLimit, } xsvr, err = xserver.NewServer(xcfg) terror.MustNil(err) diff --git a/x-server/config.go b/x-server/config.go index d6e560f970117..64e39d9527954 100644 --- a/x-server/config.go +++ b/x-server/config.go @@ -15,7 +15,8 @@ package xserver // Config contains configuration options. type Config struct { - Addr string `json:"addr" toml:"addr"` - Socket string `json:"socket" toml:"socket"` - SkipAuth bool `json:"skip_auth" toml:"skip_auth"` + Addr string `json:"addr" toml:"addr"` + Socket string `json:"socket" toml:"socket"` + SkipAuth bool `json:"skip-auth" toml:"skip-auth"` + TokenLimit int `json:"token-limit" toml:"token-limit"` } diff --git a/x-server/server.go b/x-server/server.go index 59d67ce7ab300..eb8b746a6e9a3 100644 --- a/x-server/server.go +++ b/x-server/server.go @@ -29,8 +29,6 @@ import ( "github.com/pingcap/tidb/util/arena" ) -const tokenLimit = 1000 - var ( baseConnID uint32 ) @@ -49,7 +47,7 @@ type Server struct { func NewServer(cfg *Config) (s *Server, err error) { s = &Server{ cfg: cfg, - concurrentLimiter: server.NewTokenLimiter(tokenLimit), + concurrentLimiter: server.NewTokenLimiter(cfg.TokenLimit), rwlock: &sync.RWMutex{}, stopListenerCh: make(chan struct{}, 1), }