From b6d31ffcba1ad4ea6d25736a5ff625301f9dccfc Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 13 Mar 2019 20:18:09 +0800 Subject: [PATCH 1/7] plugin: add audit plugin extension point --- cmd/pluginpkg/pluginpkg.go | 15 ++- docs/design/2018-12-10-plugin-framework.md | 2 +- executor/adapter.go | 26 +++- executor/set.go | 11 ++ planner/core/planbuilder.go | 10 ++ planner/optimize.go | 2 + plugin/README.md | 3 + plugin/audit.go | 87 ++++++++++++ plugin/conn_ip_example/conn_ip_example.go | 4 +- .../conn_ip_example/conn_ip_example_test.go | 21 ++- plugin/conn_ip_example/manifest.toml | 2 +- plugin/plugin.go | 84 +++++++----- plugin/spi.go | 28 ++-- plugin/spi_test.go | 5 +- server/conn.go | 57 ++++++-- server/server.go | 124 ++++++++++++++---- server/tidb_test.go | 2 +- session/session.go | 16 ++- sessionctx/stmtctx/stmtctx.go | 7 + sessionctx/variable/session.go | 23 ++++ util/sys/linux/sys_linux.go | 40 ++++++ util/sys/linux/sys_other.go | 24 ++++ util/sys/linux/sys_test.go | 29 ++++ 23 files changed, 519 insertions(+), 103 deletions(-) create mode 100644 plugin/README.md create mode 100644 plugin/audit.go create mode 100644 util/sys/linux/sys_linux.go create mode 100644 util/sys/linux/sys_other.go create mode 100644 util/sys/linux/sys_test.go diff --git a/cmd/pluginpkg/pluginpkg.go b/cmd/pluginpkg/pluginpkg.go index e1b1db5a3dba3..335a8f5b93a49 100644 --- a/cmd/pluginpkg/pluginpkg.go +++ b/cmd/pluginpkg/pluginpkg.go @@ -62,9 +62,18 @@ func PluginManifest() *plugin.Manifest { }, {{end}} }, - Validate: {{.validate}}, - OnInit: {{.onInit}}, - OnShutdown: {{.onShutdown}}, + {{if .validate }} + Validate: {{.validate}}, + {{end}} + {{if .onInit }} + OnInit: {{.onInit}}, + {{end}} + {{if .onShutdown }} + OnShutdown: {{.onShutdown}}, + {{end}} + {{if .onFlush }} + OnFlush: {{.onFlush}}, + {{end}} }, {{range .export}} {{.extPoint}}: {{.impl}}, diff --git a/docs/design/2018-12-10-plugin-framework.md b/docs/design/2018-12-10-plugin-framework.md index 59b11d7ed8d2d..8d2de6671a9b5 100644 --- a/docs/design/2018-12-10-plugin-framework.md +++ b/docs/design/2018-12-10-plugin-framework.md @@ -140,7 +140,7 @@ validate = "Validate" onInit = "OnInit" onShutdown = "OnShutdown" export = [ - {extPoint="NotifyEvent", impl="NotifyEvent"} + {extPoint="OnGeneralEvent", impl="OnGeneralEvent"} ] ``` diff --git a/executor/adapter.go b/executor/adapter.go index 32f1c1431c398..21eed8410b834 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/chunk" @@ -129,6 +130,7 @@ func (a *recordSet) NewRecordBatch() *chunk.RecordBatch { func (a *recordSet) Close() error { err := a.executor.Close() a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil) + a.stmt.logAudit() return errors.Trace(err) } @@ -295,6 +297,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co } } a.LogSlowQuery(txnTS, err == nil) + a.logAudit() }() err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk())) @@ -362,8 +365,30 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) { // QueryReplacer replaces new line and tab for grep result including query string. var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ") +func (a *ExecStmt) logAudit() { + sessVars := a.Ctx.GetSessionVars() + if sessVars.InRestrictedSQL { + return + } + err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + audit := plugin.DeclareAuditManifest(p.Manifest) + if audit.OnGeneralEvent != nil { + cmd, exists := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))] + if !exists { + cmd = "" + } + audit.OnGeneralEvent(context.Background(), sessVars, plugin.Log, cmd) + } + return nil + }) + if err != nil { + log.Error("log audit log failure", zap.Error(err)) + } +} + // LogSlowQuery is used to print the slow query in the log files. func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) { + sessVars := a.Ctx.GetSessionVars() level := log.GetLevel() if level > zapcore.WarnLevel { return @@ -378,7 +403,6 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) { if maxQueryLen := atomic.LoadUint64(&cfg.Log.QueryLogMaxLen); uint64(len(sql)) > maxQueryLen { sql = fmt.Sprintf("%.*q(len:%d)", maxQueryLen, sql, len(a.Text)) } - sessVars := a.Ctx.GetSessionVars() sql = QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo() var tableIDs, indexIDs string diff --git a/executor/set.go b/executor/set.go index a5b2a7f6a7d80..43635cc1cacee 100644 --- a/executor/set.go +++ b/executor/set.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -138,6 +139,16 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e if err != nil { return errors.Trace(err) } + err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + auditPlugin := plugin.DeclareAuditManifest(p.Manifest) + if auditPlugin.OnGlobalVariableEvent != nil { + auditPlugin.OnGlobalVariableEvent(context.Background(), e.ctx.GetSessionVars(), name, svalue) + } + return nil + }) + if err != nil { + return err + } } else { // Set session scope system variable. if sysVar.Scope&variable.ScopeSession == 0 { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index ee1f5e640f29f..8aa066d299649 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/parser_driver" @@ -159,6 +160,15 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo { return b.visitInfo } +// GetDBTableInfo gets the accessed dbs and tables info. +func (b *PlanBuilder) GetDBTableInfo() (tables map[stmtctx.TableEntry]struct{}) { + tables = make(map[stmtctx.TableEntry]struct{}) + for _, v := range b.visitInfo { + tables[stmtctx.TableEntry{DB: v.db, Table: v.table}] = struct{}{} + } + return +} + // GetOptFlag gets the optFlag of the PlanBuilder. func (b *PlanBuilder) GetOptFlag() uint64 { return b.optFlag diff --git a/planner/optimize.go b/planner/optimize.go index 0f2cfe3a5ba67..c8962916a110e 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -40,6 +40,8 @@ func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) ( return nil, err } + ctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo() + // Check privilege. Maybe it's better to move this to the Preprocess, but // we need the table information to check privilege, which is collected // into the visitInfo in the logical plan builder. diff --git a/plugin/README.md b/plugin/README.md new file mode 100644 index 0000000000000..7735c7cd15efa --- /dev/null +++ b/plugin/README.md @@ -0,0 +1,3 @@ +# The Plugin Framework + +https://github.com/pingcap/tidb/blob/master/docs/design/2018-12-10-plugin-framework.md diff --git a/plugin/audit.go b/plugin/audit.go new file mode 100644 index 0000000000000..8ad556495ac62 --- /dev/null +++ b/plugin/audit.go @@ -0,0 +1,87 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "context" + + "github.com/pingcap/parser/auth" + "github.com/pingcap/tidb/sessionctx/variable" +) + +// GeneralEvent presents TiDB generate event. +type GeneralEvent byte + +const ( + // Log presents log event. + Log GeneralEvent = iota + // Error presents error event. + Error + // Result presents result event. + Result + // Status presents status event. + Status +) + +// ConnectionEvent presents TiDB connection event. +type ConnectionEvent byte + +const ( + // Connected presents new connection establish event(finish auth). + Connected ConnectionEvent = iota + // Disconnect presents disconnect event. + Disconnect + // ChangeUser presents change user. + ChangeUser + // PreAuth presents event before start auth. + PreAuth +) + +func (c ConnectionEvent) String() string { + switch c { + case Connected: + return "Connected" + case Disconnect: + return "Disconnect" + case ChangeUser: + return "ChangeUser" + case PreAuth: + return "PreAuth" + } + return "" +} + +// ParseEvent presents events happen around parser. +type ParseEvent byte + +const ( + // PreParse presents event before parse. + PreParse ParseEvent = 1 + iota + // PostParse presents event after parse. + PostParse +) + +// AuditManifest presents a sub-manifest that every audit plugin must provide. +type AuditManifest struct { + Manifest + // OnConnectionEvent will be called when TiDB receive or disconnect from client. + // return error will ignore and close current connection. + OnConnectionEvent func(ctx context.Context, identity *auth.UserIdentity, event ConnectionEvent, info *variable.ConnectionInfo) error + // OnGeneralEvent will be called during TiDB execution. + OnGeneralEvent func(ctx context.Context, sctx *variable.SessionVars, event GeneralEvent, cmd string) + // OnGlobalVariableEvent will be called when Change GlobalVariable. + OnGlobalVariableEvent func(ctx context.Context, sctx *variable.SessionVars, varName, varValue string) + // OnParseEvent will be called around parse logic. + OnParseEvent func(ctx context.Context, sctx *variable.SessionVars, event ParseEvent) error +} diff --git a/plugin/conn_ip_example/conn_ip_example.go b/plugin/conn_ip_example/conn_ip_example.go index 30cafec357fed..ebdc33ca5f114 100644 --- a/plugin/conn_ip_example/conn_ip_example.go +++ b/plugin/conn_ip_example/conn_ip_example.go @@ -40,8 +40,8 @@ func OnShutdown(ctx context.Context, manifest *plugin.Manifest) error { return nil } -// NotifyEvent implements TiDB Audit plugin's NotifyEvent SPI. -func NotifyEvent(ctx context.Context) error { +// OnGeneralEvent implements TiDB Audit plugin's OnGeneralEvent SPI. +func OnGeneralEvent(ctx context.Context, sctx *variable.SessionVars, event plugin.GeneralEvent, cmd byte, stmt string) error { fmt.Println("conn_ip_example notifiy called") fmt.Println("variable test: ", variable.GetSysVar("conn_ip_example_test_variable").Value) fmt.Printf("new connection by %s\n", ctx.Value("ip")) diff --git a/plugin/conn_ip_example/conn_ip_example_test.go b/plugin/conn_ip_example/conn_ip_example_test.go index 91d73ee1ce293..70422d8ce5ffb 100644 --- a/plugin/conn_ip_example/conn_ip_example_test.go +++ b/plugin/conn_ip_example/conn_ip_example_test.go @@ -30,17 +30,24 @@ func LoadRunShutdownPluginExample() { PluginVarNames: &pluginVarNames, } - err := plugin.Init(ctx, cfg) + err := plugin.Load(ctx, cfg) if err != nil { panic(err) } - ps := plugin.GetByKind(plugin.Audit) - for _, auditPlugin := range ps { - if auditPlugin.State != plugin.Ready { - continue - } - plugin.DeclareAuditManifest(auditPlugin.Manifest).NotifyEvent(context.Background(), nil) + // load and start TiDB domain. + + err = plugin.Init(ctx, cfg) + if err != nil { + panic(err) + } + + err = plugin.ForeachPlugin(plugin.Audit, func(auditPlugin *plugin.Plugin) error { + plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY") + return nil + }) + if err != nil { + panic(err) } plugin.Shutdown(context.Background()) diff --git a/plugin/conn_ip_example/manifest.toml b/plugin/conn_ip_example/manifest.toml index 8f1a2c74ba7f8..b57badaf689f0 100644 --- a/plugin/conn_ip_example/manifest.toml +++ b/plugin/conn_ip_example/manifest.toml @@ -11,5 +11,5 @@ validate = "Validate" onInit = "OnInit" onShutdown = "OnShutdown" export = [ - {extPoint="NotifyEvent", impl="NotifyEvent"} + {extPoint="OnGeneralEvent", impl="OnGeneralEvent"} ] diff --git a/plugin/plugin.go b/plugin/plugin.go index dfc09cce4fc7a..a6771e655c35a 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -149,9 +149,9 @@ func (p *Plugin) validate(ctx context.Context, tiPlugins *plugins, mode validate return nil } -// Init initializes the plugin and load plugin by config param. -// This method isn't thread-safe and must be called before any other plugin operation. -func Init(ctx context.Context, cfg Config) (err error) { +// Load load plugin by config param. +// This method need be called before domain init to inject global variable info during bootstrap. +func Load(ctx context.Context, cfg Config) (err error) { tiPlugins := &plugins{ plugins: make(map[Kind][]Plugin), versions: make(map[string]uint16), @@ -175,6 +175,7 @@ func Init(ctx context.Context, cfg Config) (err error) { _, dup := tiPlugins.versions[pName] if dup { if cfg.SkipWhenFail { + log.Warnf("duplicate load %s and ignored", pName) continue } err = errDuplicatePlugin.GenWithStackByArgs(pluginID) @@ -185,6 +186,7 @@ func Init(ctx context.Context, cfg Config) (err error) { plugin, err = loadOne(cfg.PluginDir, ID(pluginID)) if err != nil { if cfg.SkipWhenFail { + log.Warnf("load plugin %v failure and ignored, err: %v", pluginID, err) continue } return @@ -197,15 +199,7 @@ func Init(ctx context.Context, cfg Config) (err error) { for i := range tiPlugins.plugins[kind] { if err = tiPlugins.plugins[kind][i].validate(ctx, tiPlugins, initMode); err != nil { if cfg.SkipWhenFail { - tiPlugins.plugins[kind][i].State = Disable - err = nil - continue - } - return - } - p := tiPlugins.plugins[kind][i] - if err = p.OnInit(ctx, p.Manifest); err != nil { - if cfg.SkipWhenFail { + log.Warnf("validate plugin %s fail and disable plugin, err: %v", tiPlugins.plugins[kind][i].Name, err) tiPlugins.plugins[kind][i].State = Disable err = nil continue @@ -220,7 +214,6 @@ func Init(ctx context.Context, cfg Config) (err error) { } } } - tiPlugins.plugins[kind][i].State = Ready } } pluginGlobal = copyOnWriteContext{tiPlugins: unsafe.Pointer(tiPlugins)} @@ -228,30 +221,42 @@ func Init(ctx context.Context, cfg Config) (err error) { return } -// InitWatchLoops starts etcd watch loops for plugin that need watch. -func InitWatchLoops(etcdClient *clientv3.Client) { - if etcdClient == nil { - return - } +// Init initializes the loaded plugin by config param. +// This method must be called after `Load` but before any other plugin method call, so it call got TiDB domain info. +func Init(ctx context.Context, cfg Config) (err error) { tiPlugins := pluginGlobal.plugins() + if tiPlugins == nil { + return nil + } for kind := range tiPlugins.plugins { for i := range tiPlugins.plugins[kind] { - if tiPlugins.plugins[kind][i].OnFlush == nil { - continue + p := tiPlugins.plugins[kind][i] + if err = p.OnInit(ctx, p.Manifest); err != nil { + if cfg.SkipWhenFail { + log.Warnf("call Plugin %s OnInit failure, err: %v", p.Name, err) + tiPlugins.plugins[kind][i].State = Disable + err = nil + continue + } + return } - const pluginWatchPrefix = "/tidb/plugins/" - ctx, cancel := context.WithCancel(context.Background()) - watcher := &flushWatcher{ - ctx: ctx, - cancel: cancel, - path: pluginWatchPrefix + tiPlugins.plugins[kind][i].Name, - etcd: etcdClient, - manifest: tiPlugins.plugins[kind][i].Manifest, + if p.OnFlush != nil && cfg.EtcdClient != nil { + const pluginWatchPrefix = "/tidb/plugins/" + ctx, cancel := context.WithCancel(context.Background()) + watcher := &flushWatcher{ + ctx: ctx, + cancel: cancel, + path: pluginWatchPrefix + tiPlugins.plugins[kind][i].Name, + etcd: cfg.EtcdClient, + manifest: tiPlugins.plugins[kind][i].Manifest, + } + tiPlugins.plugins[kind][i].flushWatcher = watcher + go util.WithRecovery(watcher.watchLoop, nil) } - tiPlugins.plugins[kind][i].flushWatcher = watcher - go util.WithRecovery(watcher.watchLoop, nil) + tiPlugins.plugins[kind][i].State = Ready } } + return } type flushWatcher struct { @@ -326,6 +331,7 @@ func Shutdown(ctx context.Context) { p.flushWatcher.cancel() } if err := p.OnShutdown(ctx, p.Manifest); err != nil { + log.Errorf("call OnShutdown for %s failure, err: %v", p.Name, err) } } } @@ -349,13 +355,23 @@ func Get(kind Kind, name string) *Plugin { return nil } -// GetByKind finds and returns plugin by kind parameters. -func GetByKind(kind Kind) []Plugin { +// ForeachPlugin loops all ready plugins. +func ForeachPlugin(kind Kind, fn func(plugin *Plugin) error) error { plugins := pluginGlobal.plugins() if plugins == nil { return nil } - return plugins.plugins[kind] + for i := range plugins.plugins[kind] { + p := &plugins.plugins[kind][i] + if p.State != Ready { + continue + } + err := fn(p) + if err != nil { + return err + } + } + return nil } // GetAll finds and returns all plugins. @@ -370,7 +386,7 @@ func GetAll() map[Kind][]Plugin { // NotifyFlush notify plugins to do flush logic. func NotifyFlush(dom *domain.Domain, pluginName string) error { p := getByName(pluginName) - if p == nil || p.Manifest.flushWatcher == nil { + if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready { return errors.Errorf("plugin %s doesn't exists or unsupported flush", pluginName) } _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "") diff --git a/plugin/spi.go b/plugin/spi.go index 3a77a57562c9f..8be7f6253f52e 100644 --- a/plugin/spi.go +++ b/plugin/spi.go @@ -18,7 +18,6 @@ import ( "reflect" "unsafe" - "github.com/pingcap/parser/auth" "github.com/pingcap/tidb/sessionctx/variable" ) @@ -40,11 +39,21 @@ type Manifest struct { License string BuildTime string SysVars map[string]*variable.SysVar - Validate func(ctx context.Context, manifest *Manifest) error - OnInit func(ctx context.Context, manifest *Manifest) error - OnShutdown func(ctx context.Context, manifest *Manifest) error - OnFlush func(ctx context.Context, manifest *Manifest) error - flushWatcher *flushWatcher + // Validate defines the validate logic for plugin. + // returns error will stop load plugin process and TiDB startup. + Validate func(ctx context.Context, manifest *Manifest) error + // OnInit defines the plugin init logic. + // it will be called after domain init. + // return error will stop load plugin process and TiDB startup. + OnInit func(ctx context.Context, manifest *Manifest) error + // OnShutDown defines the plugin cleanup logic. + // return error will write log and continue shutdown. + OnShutdown func(ctx context.Context, manifest *Manifest) error + // OnFlush defines flush logic after executed `flush tidb plugins`. + // it will be called after OnInit. + // return error will write log and continue watch following flush. + OnFlush func(ctx context.Context, manifest *Manifest) error + flushWatcher *flushWatcher } // ExportManifest exports a manifest to TiDB as a known format. @@ -54,13 +63,6 @@ func ExportManifest(m interface{}) *Manifest { return (*Manifest)(unsafe.Pointer(v.Pointer())) } -// AuditManifest presents a sub-manifest that every audit plugin must provide. -type AuditManifest struct { - Manifest - NotifyEvent func(ctx context.Context, sctx *variable.SessionVars) error - OnConnectionEvent func(ctx context.Context, u *auth.UserIdentity) error -} - // AuthenticationManifest presents a sub-manifest that every audit plugin must provide. type AuthenticationManifest struct { Manifest diff --git a/plugin/spi_test.go b/plugin/spi_test.go index 98e676acfcc3a..efdd8b53802cb 100644 --- a/plugin/spi_test.go +++ b/plugin/spi_test.go @@ -36,15 +36,14 @@ func TestExportManifest(t *testing.T) { return nil }, }, - NotifyEvent: func(ctx context.Context, sctx *variable.SessionVars) error { + OnGeneralEvent: func(ctx context.Context, sctx *variable.SessionVars, event plugin.GeneralEvent, cmd string) { callRecorder.NotifyEventCalled = true - return nil }, } exported := plugin.ExportManifest(manifest) exported.OnInit(context.Background(), exported) audit := plugin.DeclareAuditManifest(exported) - audit.NotifyEvent(context.Background(), nil) + audit.OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY") if !callRecorder.NotifyEventCalled || !callRecorder.OnInitCalled { t.Fatalf("export test failure") } diff --git a/server/conn.go b/server/conn.go index cac8f93d2c0d7..21d33be19f0a3 100644 --- a/server/conn.go +++ b/server/conn.go @@ -49,7 +49,7 @@ import ( "sync/atomic" "time" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/mysql" @@ -57,6 +57,7 @@ import ( "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/arena" @@ -103,6 +104,9 @@ type clientConn struct { ctx QueryCtx // an interface to execute sql statements. attrs map[string]string // attributes parsed from client handshake response, not used for now. status int32 // dispatching/reading/shutdown/waitshutdown + peerHost string // peer host + peerPort string // peer port + lastCode uint16 // last error code // mu is used for cancelling the execution of current transaction. mu struct { @@ -497,18 +501,13 @@ func (cc *clientConn) openSessionAndDoAuth(authData []byte) error { if err != nil { return errors.Trace(err) } - host := variable.DefHostname hasPassword := "YES" if len(authData) == 0 { hasPassword = "NO" } - if !cc.server.isUnixSocket() { - addr := cc.bufReadConn.RemoteAddr().String() - // Do Auth. - host, _, err = net.SplitHostPort(addr) - if err != nil { - return errors.Trace(errAccessDenied.GenWithStackByArgs(cc.user, addr, hasPassword)) - } + host, err := cc.PeerHost(hasPassword) + if err != nil { + return err } if !cc.ctx.Auth(&auth.UserIdentity{Username: cc.user, Hostname: host}, authData, cc.salt) { return errors.Trace(errAccessDenied.GenWithStackByArgs(cc.user, host, hasPassword)) @@ -523,6 +522,27 @@ func (cc *clientConn) openSessionAndDoAuth(authData []byte) error { return nil } +func (cc *clientConn) PeerHost(hasPassword string) (host string, err error) { + if len(cc.peerHost) > 0 { + return cc.peerHost, nil + } + host = variable.DefHostname + if cc.server.isUnixSocket() { + cc.peerHost = host + return + } + addr := cc.bufReadConn.RemoteAddr().String() + var port string + host, port, err = net.SplitHostPort(addr) + if err != nil { + err = errAccessDenied.GenWithStackByArgs(cc.user, addr, hasPassword) + return + } + cc.peerHost = host + cc.peerPort = port + return +} + // Run reads client query and writes query result to client in for loop, if there is a panic during query handling, // it will be recovered and log the panic error. // This function returns and the connection is closed if there is an IO error or there is a panic. @@ -837,6 +857,7 @@ func (cc *clientConn) writeError(e error) error { m = mysql.NewErrf(mysql.ErrUnknown, "%s", e.Error()) } + cc.lastCode = m.Code data := cc.alloc.AllocWithLen(4, 16+len(m.Message)) data = append(data, mysql.ErrHeader) data = append(data, byte(m.Code), byte(m.Code>>8)) @@ -1292,9 +1313,27 @@ func (cc *clientConn) handleChangeUser(ctx context.Context, data []byte) error { if err != nil { logutil.Logger(ctx).Debug("close old context error", zap.Error(err)) } + err = cc.openSessionAndDoAuth(pass) if err != nil { return errors.Trace(err) } + + cc.ctx.GetSessionVars().ConnectionInfo.User = cc.ctx.GetSessionVars().User.Username + connInfo := cc.ctx.GetSessionVars().ConnectionInfo + err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + authPlugin := plugin.DeclareAuditManifest(p.Manifest) + if authPlugin.OnConnectionEvent != nil { + err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: connInfo.Host}, plugin.ChangeUser, connInfo) + if err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err != nil { + return err + } + return cc.writeOK() } diff --git a/server/server.go b/server/server.go index 5c11bc668a26d..a3cb557c693c6 100644 --- a/server/server.go +++ b/server/server.go @@ -38,11 +38,13 @@ import ( "math/rand" "net" "net/http" - // For pprof - _ "net/http/pprof" + "os" + "os/user" "sync" "sync/atomic" "time" + // For pprof + _ "net/http/pprof" "github.com/blacktear23/go-proxyprotocol" "github.com/pingcap/errors" @@ -55,14 +57,32 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sys/linux" log "github.com/sirupsen/logrus" "go.uber.org/zap" ) var ( baseConnID uint32 + serverPID int + osUser string + osVersion string ) +func init() { + serverPID = os.Getpid() + currentUser, err := user.Current() + if err != nil { + osUser = "" + } else { + osUser = currentUser.Name + } + osVersion, err = linux.OSVersion() + if err != nil { + osVersion = "" + } +} + var ( errUnknownFieldType = terror.ClassServer.New(codeUnknownFieldType, "unknown field type") errInvalidPayloadLen = terror.ClassServer.New(codeInvalidPayloadLen, "invalid payload length") @@ -311,26 +331,31 @@ func (s *Server) Run() error { break } - for _, p := range plugin.GetByKind(plugin.Audit) { + clientConn := s.newConn(conn) + + err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { - host, err := getPeerHost(conn) + host, err := clientConn.PeerHost("") if err != nil { - log.Error(err) - terror.Log(conn.Close()) - continue + log.Info(err) + terror.Log(clientConn.Close()) + return errors.Trace(err) } - - err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: host}) + err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: host}, plugin.PreAuth, nil) if err != nil { log.Info(err) - terror.Log(conn.Close()) - continue + terror.Log(clientConn.Close()) + return errors.Trace(err) } } + return nil + }) + if err != nil { + return err } - go s.onConn(conn) + go s.onConn(clientConn) } err := s.listener.Close() terror.Log(errors.Trace(err)) @@ -342,15 +367,6 @@ func (s *Server) Run() error { } } -func getPeerHost(conn net.Conn) (string, error) { - addr := conn.RemoteAddr().String() - host, _, err := net.SplitHostPort(addr) - if err != nil { - return "", errors.Trace(err) - } - return host, nil -} - func (s *Server) shouldStopListener() bool { select { case <-s.stopListenerCh: @@ -384,18 +400,19 @@ func (s *Server) Close() { } // onConn runs in its own goroutine, handles queries from this connection. -func (s *Server) onConn(c net.Conn) { - conn := s.newConn(c) +func (s *Server) onConn(conn *clientConn) { ctx := logutil.WithConnID(context.Background(), conn.connectionID) if err := conn.handshake(ctx); err != nil { // Some keep alive services will send request to TiDB and disconnect immediately. // So we only record metrics. metrics.HandShakeErrorCounter.Inc() - err = c.Close() + err = conn.Close() terror.Log(errors.Trace(err)) return } - logutil.Logger(ctx).Info("new connection", zap.String("remoteAddr", c.RemoteAddr().String())) + + logutil.Logger(ctx).Info("new connection", zap.String("remoteAddr", conn.bufReadConn.RemoteAddr().String())) + defer func() { logutil.Logger(ctx).Info("close connection") }() @@ -405,7 +422,64 @@ func (s *Server) onConn(c net.Conn) { s.rwlock.Unlock() metrics.ConnGauge.Set(float64(connections)) + connInfo := s.buildConnectInfo(conn) + err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + authPlugin := plugin.DeclareAuditManifest(p.Manifest) + if authPlugin.OnConnectionEvent != nil { + return authPlugin.OnConnectionEvent(context.Background(), conn.ctx.GetSessionVars().User, plugin.Connected, connInfo) + } + return nil + }) + if err != nil { + return + } + + conn.ctx.GetSessionVars().ConnectionInfo = connInfo + connectedTime := time.Now() conn.Run(ctx) + + connInfo.Duration = float64(time.Since(connectedTime)) / float64(time.Millisecond) + err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { + authPlugin := plugin.DeclareAuditManifest(p.Manifest) + if authPlugin.OnConnectionEvent != nil { + err := authPlugin.OnConnectionEvent(context.Background(), conn.ctx.GetSessionVars().User, plugin.Disconnect, connInfo) + if err != nil { + log.Warnf("call Plugin %s OnConnectionEvent(Disconnect) failure, err: %v", authPlugin.Name, err) + } + } + return nil + }) + if err != nil { + return + } +} + +func (s *Server) buildConnectInfo(conn *clientConn) *variable.ConnectionInfo { + connType := "Socket" + if conn.server.isUnixSocket() { + connType = "UnixSocket" + } else if conn.tlsConn != nil { + connType = "SSL/TLS" + } + connInfo := &variable.ConnectionInfo{ + ConnectionID: conn.connectionID, + ConnectionType: connType, + Host: conn.peerHost, + ClientIP: conn.peerHost, + ClientPort: conn.peerPort, + ServerID: 1, + ServerPort: int(conn.server.cfg.Port), + Duration: 0, + User: conn.user, + ServerOSLoginUser: osUser, + OSVersion: osVersion, + ClientVersion: "", + ServerVersion: mysql.TiDBReleaseVersion, + SSLVersion: "v1.2.0", // for current go version + PID: serverPID, + DB: conn.dbname, + } + return connInfo } // ShowProcessList implements the SessionManager interface. diff --git a/server/tidb_test.go b/server/tidb_test.go index b608df2424cda..7f7441ae93ba4 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -72,7 +72,7 @@ func (ts *TidbTestSuite) SetUpSuite(c *C) { // Run this test here because parallel would affect the result of it. runTestStmtCount(c) - defaultLoadDataBatchCnt = 3 + defaultLoadDataBatchCnt = 1 } func (ts *TidbTestSuite) TearDownSuite(c *C) { diff --git a/session/session.go b/session/session.go index 9f1f6b5e4b00f..23dda68237301 100644 --- a/session/session.go +++ b/session/session.go @@ -1304,7 +1304,7 @@ func loadSystemTZ(se *session) (string, error) { func BootstrapSession(store kv.Storage) (*domain.Domain, error) { cfg := config.GetGlobalConfig() if len(cfg.Plugin.Load) > 0 { - err := plugin.Init(context.Background(), plugin.Config{ + err := plugin.Load(context.Background(), plugin.Config{ Plugins: strings.Split(cfg.Plugin.Load, ","), PluginDir: cfg.Plugin.Dir, GlobalSysVar: &variable.SysVars, @@ -1344,6 +1344,13 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } + if len(cfg.Plugin.Load) > 0 { + err := plugin.Init(context.Background(), plugin.Config{EtcdClient: dom.GetEtcdClient()}) + if err != nil { + return nil, errors.Trace(err) + } + } + se1, err := createSession(store) if err != nil { return nil, errors.Trace(err) @@ -1361,9 +1368,12 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, errors.Trace(err) } - if len(cfg.Plugin.Load) > 0 { - plugin.InitWatchLoops(dom.GetEtcdClient()) + // get global system variable tidb_log_bin from mysql.GLOBAL_VARIABLES + tidbLogBin, err := se1.GetGlobalSysVar(variable.TiDBLogBin) + if err != nil { + return nil, errors.Trace(err) } + variable.SysVars[variable.TiDBLogBin].Value = tidbLogBin if raw, ok := store.(domain.EtcdBackend); ok { err = raw.StartGCWorker() diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index e999a2921b6fe..3921ae57b66a2 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -122,6 +122,7 @@ type StatementContext struct { normalized string digest string } + Tables map[TableEntry]struct{} } // SQLDigest gets normalized and digest for provided sql. @@ -133,6 +134,12 @@ func (sc *StatementContext) SQLDigest() (normalized, sqlDigest string) { return sc.digestMemo.normalized, sc.digestMemo.digest } +// TableEntry presents table in db. +type TableEntry struct { + DB string + Table string +} + // AddAffectedRows adds affected rows. func (sc *StatementContext) AddAffectedRows(rows uint64) { sc.mu.Lock() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index bbffa19c577d6..3e75e45d430dc 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -340,6 +340,29 @@ type SessionVars struct { // SlowQueryFile indicates which slow query log file for SLOW_QUERY table to parse. SlowQueryFile string + + // ConnectionInfo indicate current connection information. + ConnectionInfo *ConnectionInfo +} + +// ConnectionInfo present connection used by audit. +type ConnectionInfo struct { + ConnectionID uint32 + ConnectionType string + Host string + ClientIP string + ClientPort string + ServerID int + ServerPort int + Duration float64 + User string + ServerOSLoginUser string + OSVersion string + ClientVersion string + ServerVersion string + SSLVersion string + PID int + DB string } // NewSessionVars creates a session vars object. diff --git a/util/sys/linux/sys_linux.go b/util/sys/linux/sys_linux.go new file mode 100644 index 0000000000000..c53bf77b31681 --- /dev/null +++ b/util/sys/linux/sys_linux.go @@ -0,0 +1,40 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +// +build linux + +package linux + +import "syscall" + +// OSVersion returns version info of operation system. +// e.g. Linux 4.15.0-45-generic.x86_64 +func OSVersion() (osVersion string, err error) { + var un syscall.Utsname + err = syscall.Uname(&un) + if err != nil { + return + } + charsToString := func(ca []int8) string { + s := make([]byte, len(ca)) + var lens int + for ; lens < len(ca); lens++ { + if ca[lens] == 0 { + break + } + s[lens] = uint8(ca[lens]) + } + return string(s[0:lens]) + } + osVersion = charsToString(un.Sysname[:]) + " " + charsToString(un.Release[:]) + "." + charsToString(un.Machine[:]) + return +} diff --git a/util/sys/linux/sys_other.go b/util/sys/linux/sys_other.go new file mode 100644 index 0000000000000..98b4eae9749f9 --- /dev/null +++ b/util/sys/linux/sys_other.go @@ -0,0 +1,24 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +// +build !linux + +package linux + +import "runtime" + +// OSVersion returns version info of operation system. +// for non-linux system will only return os and arch info. +func OSVersion() (osVersion string, err error) { + osVersion = runtime.GOOS + "." + runtime.GOARCH + return +} diff --git a/util/sys/linux/sys_test.go b/util/sys/linux/sys_test.go new file mode 100644 index 0000000000000..b9ccc2a823bec --- /dev/null +++ b/util/sys/linux/sys_test.go @@ -0,0 +1,29 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package linux_test + +import ( + "testing" + + "github.com/pingcap/tidb/util/sys/linux" +) + +func TestGetOSVersion(t *testing.T) { + osRelease, err := linux.OSVersion() + if err != nil { + t.Fatal(t) + } + if len(osRelease) == 0 { + t.Fatalf("counld not get os version") + } +} From 55b3d5ecf26ad83fd0fc0ac7d9f66f88010e7538 Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 13 Mar 2019 20:59:33 +0800 Subject: [PATCH 2/7] better flush failure message --- plugin/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/plugin.go b/plugin/plugin.go index a6771e655c35a..a479a6423e5fc 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -387,7 +387,7 @@ func GetAll() map[Kind][]Plugin { func NotifyFlush(dom *domain.Domain, pluginName string) error { p := getByName(pluginName) if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready { - return errors.Errorf("plugin %s doesn't exists or unsupported flush", pluginName) + return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD %v", pluginName) } _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "") if err != nil { From 4474973b267df2c794df9b0f8ef7e63bbfa76218 Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 14 Mar 2019 17:14:45 +0800 Subject: [PATCH 3/7] address comment --- plugin/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/plugin.go b/plugin/plugin.go index a479a6423e5fc..89cc74f156e48 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -387,7 +387,7 @@ func GetAll() map[Kind][]Plugin { func NotifyFlush(dom *domain.Domain, pluginName string) error { p := getByName(pluginName) if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready { - return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD %v", pluginName) + return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD", pluginName) } _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "") if err != nil { From 037e61957b327f91aede960cd856c2862e0ba6da Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 14 Mar 2019 22:12:07 +0800 Subject: [PATCH 4/7] address comment --- executor/adapter.go | 5 +---- server/tidb_test.go | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 21eed8410b834..6a8443ddbd81d 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -373,10 +373,7 @@ func (a *ExecStmt) logAudit() { err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { audit := plugin.DeclareAuditManifest(p.Manifest) if audit.OnGeneralEvent != nil { - cmd, exists := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))] - if !exists { - cmd = "" - } + cmd := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))] audit.OnGeneralEvent(context.Background(), sessVars, plugin.Log, cmd) } return nil diff --git a/server/tidb_test.go b/server/tidb_test.go index 7f7441ae93ba4..b608df2424cda 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -72,7 +72,7 @@ func (ts *TidbTestSuite) SetUpSuite(c *C) { // Run this test here because parallel would affect the result of it. runTestStmtCount(c) - defaultLoadDataBatchCnt = 1 + defaultLoadDataBatchCnt = 3 } func (ts *TidbTestSuite) TearDownSuite(c *C) { From bc9767622ba049a11d049cad23518d1533ccc648 Mon Sep 17 00:00:00 2001 From: lysu Date: Fri, 15 Mar 2019 17:52:45 +0800 Subject: [PATCH 5/7] address comments --- planner/core/planbuilder.go | 19 +++++++++++++++---- sessionctx/stmtctx/stmtctx.go | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 8aa066d299649..1a327bd3bd880 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -161,12 +161,23 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo { } // GetDBTableInfo gets the accessed dbs and tables info. -func (b *PlanBuilder) GetDBTableInfo() (tables map[stmtctx.TableEntry]struct{}) { - tables = make(map[stmtctx.TableEntry]struct{}) +func (b *PlanBuilder) GetDBTableInfo() []stmtctx.TableEntry { + var tables []stmtctx.TableEntry + existsFunc := func(tbls []stmtctx.TableEntry, tbl *stmtctx.TableEntry) bool { + for _, t := range tbls { + if t == *tbl { + return true + } + } + return false + } for _, v := range b.visitInfo { - tables[stmtctx.TableEntry{DB: v.db, Table: v.table}] = struct{}{} + tbl := &stmtctx.TableEntry{DB: v.db, Table: v.table} + if !existsFunc(tables, tbl) { + tables = append(tables, *tbl) + } } - return + return tables } // GetOptFlag gets the optFlag of the PlanBuilder. diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 3921ae57b66a2..de6cd9fb1857e 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -122,7 +122,7 @@ type StatementContext struct { normalized string digest string } - Tables map[TableEntry]struct{} + Tables []TableEntry } // SQLDigest gets normalized and digest for provided sql. From 703df7bd98e40d60005502eef7b16307d368720d Mon Sep 17 00:00:00 2001 From: lysu Date: Thu, 21 Mar 2019 21:39:40 +0800 Subject: [PATCH 6/7] fix case and lazy connInfo alloc --- server/conn.go | 3 +-- server/server.go | 26 +++++++++++++------------- sessionctx/variable/session.go | 3 --- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/server/conn.go b/server/conn.go index 21d33be19f0a3..dba6ef82a0ffd 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1319,11 +1319,10 @@ func (cc *clientConn) handleChangeUser(ctx context.Context, data []byte) error { return errors.Trace(err) } - cc.ctx.GetSessionVars().ConnectionInfo.User = cc.ctx.GetSessionVars().User.Username - connInfo := cc.ctx.GetSessionVars().ConnectionInfo err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { + connInfo := cc.connectInfo() err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: connInfo.Host}, plugin.ChangeUser, connInfo) if err != nil { return errors.Trace(err) diff --git a/server/server.go b/server/server.go index a3cb557c693c6..a114a43c58d7c 100644 --- a/server/server.go +++ b/server/server.go @@ -422,10 +422,10 @@ func (s *Server) onConn(conn *clientConn) { s.rwlock.Unlock() metrics.ConnGauge.Set(float64(connections)) - connInfo := s.buildConnectInfo(conn) err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { + connInfo := conn.connectInfo() return authPlugin.OnConnectionEvent(context.Background(), conn.ctx.GetSessionVars().User, plugin.Connected, connInfo) } return nil @@ -434,14 +434,14 @@ func (s *Server) onConn(conn *clientConn) { return } - conn.ctx.GetSessionVars().ConnectionInfo = connInfo connectedTime := time.Now() conn.Run(ctx) - connInfo.Duration = float64(time.Since(connectedTime)) / float64(time.Millisecond) err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { + connInfo := conn.connectInfo() + connInfo.Duration = float64(time.Since(connectedTime)) / float64(time.Millisecond) err := authPlugin.OnConnectionEvent(context.Background(), conn.ctx.GetSessionVars().User, plugin.Disconnect, connInfo) if err != nil { log.Warnf("call Plugin %s OnConnectionEvent(Disconnect) failure, err: %v", authPlugin.Name, err) @@ -454,30 +454,30 @@ func (s *Server) onConn(conn *clientConn) { } } -func (s *Server) buildConnectInfo(conn *clientConn) *variable.ConnectionInfo { +func (cc *clientConn) connectInfo() *variable.ConnectionInfo { connType := "Socket" - if conn.server.isUnixSocket() { + if cc.server.isUnixSocket() { connType = "UnixSocket" - } else if conn.tlsConn != nil { + } else if cc.tlsConn != nil { connType = "SSL/TLS" } connInfo := &variable.ConnectionInfo{ - ConnectionID: conn.connectionID, + ConnectionID: cc.connectionID, ConnectionType: connType, - Host: conn.peerHost, - ClientIP: conn.peerHost, - ClientPort: conn.peerPort, + Host: cc.peerHost, + ClientIP: cc.peerHost, + ClientPort: cc.peerPort, ServerID: 1, - ServerPort: int(conn.server.cfg.Port), + ServerPort: int(cc.server.cfg.Port), Duration: 0, - User: conn.user, + User: cc.user, ServerOSLoginUser: osUser, OSVersion: osVersion, ClientVersion: "", ServerVersion: mysql.TiDBReleaseVersion, SSLVersion: "v1.2.0", // for current go version PID: serverPID, - DB: conn.dbname, + DB: cc.dbname, } return connInfo } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 3e75e45d430dc..36f0cec8d196f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -340,9 +340,6 @@ type SessionVars struct { // SlowQueryFile indicates which slow query log file for SLOW_QUERY table to parse. SlowQueryFile string - - // ConnectionInfo indicate current connection information. - ConnectionInfo *ConnectionInfo } // ConnectionInfo present connection used by audit. From 67eb27063b553ac0a7f1faa7bb7a61b332a2d43e Mon Sep 17 00:00:00 2001 From: lysu Date: Mon, 25 Mar 2019 12:41:02 +0800 Subject: [PATCH 7/7] fix compile error after rebase --- plugin/plugin.go | 13 ++++++++----- session/session.go | 7 ------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/plugin/plugin.go b/plugin/plugin.go index 89cc74f156e48..ee5a7d68c6de6 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -175,7 +175,7 @@ func Load(ctx context.Context, cfg Config) (err error) { _, dup := tiPlugins.versions[pName] if dup { if cfg.SkipWhenFail { - log.Warnf("duplicate load %s and ignored", pName) + logutil.Logger(ctx).Warn("duplicate load %s and ignored", zap.String("pluginName", pName)) continue } err = errDuplicatePlugin.GenWithStackByArgs(pluginID) @@ -186,7 +186,7 @@ func Load(ctx context.Context, cfg Config) (err error) { plugin, err = loadOne(cfg.PluginDir, ID(pluginID)) if err != nil { if cfg.SkipWhenFail { - log.Warnf("load plugin %v failure and ignored, err: %v", pluginID, err) + logutil.Logger(ctx).Warn("load plugin failure and ignored", zap.String("pluginID", pluginID), zap.Error(err)) continue } return @@ -199,7 +199,8 @@ func Load(ctx context.Context, cfg Config) (err error) { for i := range tiPlugins.plugins[kind] { if err = tiPlugins.plugins[kind][i].validate(ctx, tiPlugins, initMode); err != nil { if cfg.SkipWhenFail { - log.Warnf("validate plugin %s fail and disable plugin, err: %v", tiPlugins.plugins[kind][i].Name, err) + logutil.Logger(ctx).Warn("validate plugin fail and disable plugin", + zap.String("plugin", tiPlugins.plugins[kind][i].Name), zap.Error(err)) tiPlugins.plugins[kind][i].State = Disable err = nil continue @@ -233,7 +234,8 @@ func Init(ctx context.Context, cfg Config) (err error) { p := tiPlugins.plugins[kind][i] if err = p.OnInit(ctx, p.Manifest); err != nil { if cfg.SkipWhenFail { - log.Warnf("call Plugin %s OnInit failure, err: %v", p.Name, err) + logutil.Logger(ctx).Warn("call Plugin OnInit failure, err: %v", + zap.String("plugin", p.Name), zap.Error(err)) tiPlugins.plugins[kind][i].State = Disable err = nil continue @@ -331,7 +333,8 @@ func Shutdown(ctx context.Context) { p.flushWatcher.cancel() } if err := p.OnShutdown(ctx, p.Manifest); err != nil { - log.Errorf("call OnShutdown for %s failure, err: %v", p.Name, err) + logutil.Logger(ctx).Error("call OnShutdown for failure", + zap.String("plugin", p.Name), zap.Error(err)) } } } diff --git a/session/session.go b/session/session.go index 23dda68237301..489990bb6f2d9 100644 --- a/session/session.go +++ b/session/session.go @@ -1368,13 +1368,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, errors.Trace(err) } - // get global system variable tidb_log_bin from mysql.GLOBAL_VARIABLES - tidbLogBin, err := se1.GetGlobalSysVar(variable.TiDBLogBin) - if err != nil { - return nil, errors.Trace(err) - } - variable.SysVars[variable.TiDBLogBin].Value = tidbLogBin - if raw, ok := store.(domain.EtcdBackend); ok { err = raw.StartGCWorker() if err != nil {