diff --git a/executor/admin_plugins.go b/executor/admin_plugins.go new file mode 100644 index 0000000000000..440c1c0852306 --- /dev/null +++ b/executor/admin_plugins.go @@ -0,0 +1,52 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/plugin" + "github.com/pingcap/tidb/util/chunk" +) + +// AdminPluginsExec indicates AdminPlugins executor. +type AdminPluginsExec struct { + baseExecutor + Action core.AdminPluginsAction + Plugins []string +} + +// Next implements the Executor Next interface. +func (e *AdminPluginsExec) Next(ctx context.Context, _ *chunk.Chunk) error { + switch e.Action { + case core.Enable: + return e.changeDisableFlagAndFlush(false) + case core.Disable: + return e.changeDisableFlagAndFlush(true) + } + return nil +} + +func (e *AdminPluginsExec) changeDisableFlagAndFlush(disabled bool) error { + dom := domain.GetDomain(e.ctx) + for _, pluginName := range e.Plugins { + err := plugin.ChangeDisableFlagAndFlush(dom, pluginName, disabled) + if err != nil { + return err + } + } + return nil +} diff --git a/executor/builder.go b/executor/builder.go index 2bec6ffa8b769..94aa7fd37fdf4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -108,6 +108,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor { return b.buildChecksumTable(v) case *plannercore.ReloadExprPushdownBlacklist: return b.buildReloadExprPushdownBlacklist(v) + case *plannercore.AdminPlugins: + return b.buildAdminPlugins(v) case *plannercore.DDL: return b.buildDDL(v) case *plannercore.Deallocate: @@ -467,6 +469,10 @@ func (b *executorBuilder) buildReloadExprPushdownBlacklist(v *plannercore.Reload return &ReloadExprPushdownBlacklistExec{baseExecutor{ctx: b.ctx}} } +func (b *executorBuilder) buildAdminPlugins(v *plannercore.AdminPlugins) Executor { + return &AdminPluginsExec{baseExecutor: baseExecutor{ctx: b.ctx}, Action: v.Action, Plugins: v.Plugins} +} + func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor { base := newBaseExecutor(b.ctx, nil, v.ExplainID()) base.initCap = chunk.ZeroCapacity diff --git a/executor/show.go b/executor/show.go index 33720f277426e..acc93e7fa3e80 100644 --- a/executor/show.go +++ b/executor/show.go @@ -1030,7 +1030,7 @@ func (e *ShowExec) fetchShowPlugins() error { tiPlugins := plugin.GetAll() for _, ps := range tiPlugins { for _, p := range ps { - e.appendRow([]interface{}{p.Name, p.State.String(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))}) + e.appendRow([]interface{}{p.Name, p.StateValue(), p.Kind.String(), p.Path, p.License, strconv.Itoa(int(p.Version))}) } } return nil diff --git a/go.mod b/go.mod index 64917a182d71d..91aa99b647cee 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 - github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65 + github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330 diff --git a/go.sum b/go.sum index 675d987a4cd1b..7757f3e24a50f 100644 --- a/go.sum +++ b/go.sum @@ -164,8 +164,8 @@ github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXC github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= -github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65 h1:yVYWPPQIq3csxtHvzx2fVO4HrQQOhxYDdYDQ4euSCIc= -github.com/pingcap/parser v0.0.0-20190701123046-5768e68c1e65/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= +github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb h1:jfhJo/D1bWMF+zVaVdmixWG5EnxbnFt99GS2pdxuToo= +github.com/pingcap/parser v0.0.0-20190708123555-29973f7a22eb/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf h1:vmlN6DpZI5LtHd8r9YRAsyCeTU2pxRq+WlWn5CZ+ax4= github.com/pingcap/pd v0.0.0-20190617100349-293d4b5189bf/go.mod h1:3DlDlFT7EF64A1bmb/tulZb6wbPSagm5G4p1AlhaEDs= github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU= diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index a98ed76d66d30..55a63915f9e3b 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -133,6 +133,23 @@ type ReloadExprPushdownBlacklist struct { baseSchemaProducer } +// AdminPluginsAction indicate action will be taken on plugins. +type AdminPluginsAction int + +const ( + // Enable indicates enable plugins. + Enable AdminPluginsAction = iota + 1 + // Disable indicates disable plugins. + Disable +) + +// AdminPlugins administrates tidb plugins. +type AdminPlugins struct { + baseSchemaProducer + Action AdminPluginsAction + Plugins []string +} + // Change represents a change plan. type Change struct { baseSchemaProducer diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 1f2b2afff5942..5a4688b8fe0e6 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -673,6 +673,10 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) { ret = p case ast.AdminReloadExprPushdownBlacklist: return &ReloadExprPushdownBlacklist{}, nil + case ast.AdminPluginEnable: + return &AdminPlugins{Action: Enable, Plugins: as.Plugins}, nil + case ast.AdminPluginDisable: + return &AdminPlugins{Action: Disable, Plugins: as.Plugins}, nil default: return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } diff --git a/plugin/audit.go b/plugin/audit.go index 8ad556495ac62..603b7e0f8982f 100644 --- a/plugin/audit.go +++ b/plugin/audit.go @@ -16,7 +16,6 @@ package plugin import ( "context" - "github.com/pingcap/parser/auth" "github.com/pingcap/tidb/sessionctx/variable" ) @@ -77,7 +76,7 @@ type AuditManifest struct { Manifest // OnConnectionEvent will be called when TiDB receive or disconnect from client. // return error will ignore and close current connection. - OnConnectionEvent func(ctx context.Context, identity *auth.UserIdentity, event ConnectionEvent, info *variable.ConnectionInfo) error + OnConnectionEvent func(ctx context.Context, event ConnectionEvent, info *variable.ConnectionInfo) error // OnGeneralEvent will be called during TiDB execution. OnGeneralEvent func(ctx context.Context, sctx *variable.SessionVars, event GeneralEvent, cmd string) // OnGlobalVariableEvent will be called when Change GlobalVariable. diff --git a/plugin/plugin.go b/plugin/plugin.go index f0093dfef60bb..06c9fe520159e 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -95,9 +95,28 @@ type Config struct { // Plugin presents a TiDB plugin. type Plugin struct { *Manifest - library *gplugin.Plugin - State State - Path string + library *gplugin.Plugin + State State + Path string + Disabled uint32 +} + +// StateValue returns readable state string. +func (p *Plugin) StateValue() string { + flag := "enable" + if atomic.LoadUint32(&p.Disabled) == 1 { + flag = "disable" + } + return p.State.String() + "-" + flag +} + +// DisableFlag changes the disable flag of plugin. +func (p *Plugin) DisableFlag(disable bool) { + if disable { + atomic.StoreUint32(&p.Disabled, 1) + } else { + atomic.StoreUint32(&p.Disabled, 0) + } } func (p *Plugin) validate(ctx context.Context, tiPlugins *plugins) error { @@ -225,6 +244,7 @@ func Init(ctx context.Context, cfg Config) (err error) { path: pluginWatchPrefix + tiPlugins.plugins[kind][i].Name, etcd: cfg.EtcdClient, manifest: tiPlugins.plugins[kind][i].Manifest, + plugin: &tiPlugins.plugins[kind][i], } tiPlugins.plugins[kind][i].flushWatcher = watcher go util.WithRecovery(watcher.watchLoop, nil) @@ -241,6 +261,7 @@ type flushWatcher struct { path string etcd *clientv3.Client manifest *Manifest + plugin *Plugin } func (w *flushWatcher) watchLoop() { @@ -250,7 +271,16 @@ func (w *flushWatcher) watchLoop() { case <-w.ctx.Done(): return case <-watchChan: - err := w.manifest.OnFlush(w.ctx, w.manifest) + disabled, err := w.getPluginDisabledFlag() + if err != nil { + logutil.BgLogger().Error("get plugin disabled flag failure", zap.String("plugin", w.manifest.Name), zap.Error(err)) + } + if disabled { + atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 1) + } else { + atomic.StoreUint32(&w.manifest.flushWatcher.plugin.Disabled, 0) + } + err = w.manifest.OnFlush(w.ctx, w.manifest) if err != nil { logutil.BgLogger().Error("notify plugin flush event failed", zap.String("plugin", w.manifest.Name), zap.Error(err)) } @@ -258,6 +288,20 @@ func (w *flushWatcher) watchLoop() { } } +func (w *flushWatcher) getPluginDisabledFlag() (bool, error) { + if w == nil || w.etcd == nil { + return true, errors.New("etcd is need to get plugin enable status") + } + resp, err := w.etcd.Get(context.Background(), w.manifest.flushWatcher.path) + if err != nil { + return true, errors.Trace(err) + } + if len(resp.Kvs) == 0 { + return false, nil + } + return string(resp.Kvs[0].Value) == "1", nil +} + type loadFn func(plugin *Plugin, dir string, pluginID ID) (manifest func() *Manifest, err error) var testHook *struct { @@ -366,6 +410,9 @@ func ForeachPlugin(kind Kind, fn func(plugin *Plugin) error) error { if p.State != Ready { continue } + if atomic.LoadUint32(&p.Disabled) == 1 { + continue + } err := fn(p) if err != nil { return err @@ -382,7 +429,7 @@ func IsEnable(kind Kind) bool { } for i := range plugins.plugins[kind] { p := &plugins.plugins[kind][i] - if p.State == Ready { + if p.State == Ready && atomic.LoadUint32(&p.Disabled) != 1 { return true } } @@ -404,7 +451,25 @@ func NotifyFlush(dom *domain.Domain, pluginName string) error { if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready { return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD", pluginName) } - _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, "") + _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, strconv.Itoa(int(p.Disabled))) + if err != nil { + return err + } + return nil +} + +// ChangeDisableFlagAndFlush changes plugin disable flag and notify other nodes to do same change. +func ChangeDisableFlagAndFlush(dom *domain.Domain, pluginName string, disable bool) error { + p := getByName(pluginName) + if p == nil || p.Manifest.flushWatcher == nil || p.State != Ready { + return errors.Errorf("plugin %s doesn't exists or unsupported flush or doesn't start with PD", pluginName) + } + disableInt := uint32(0) + if disable { + disableInt = 1 + } + atomic.StoreUint32(&p.Disabled, disableInt) + _, err := dom.GetEtcdClient().KV.Put(context.Background(), p.Manifest.flushWatcher.path, strconv.Itoa(int(disableInt))) if err != nil { return err } diff --git a/server/conn.go b/server/conn.go index d22f6f5c91e10..8fb03b7cd1811 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1457,7 +1457,7 @@ func (cc *clientConn) handleChangeUser(ctx context.Context, data []byte) error { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { connInfo := cc.ctx.GetSessionVars().ConnectionInfo - err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: connInfo.Host}, plugin.ChangeUser, connInfo) + err = authPlugin.OnConnectionEvent(context.Background(), plugin.ChangeUser, connInfo) if err != nil { return err } diff --git a/server/server.go b/server/server.go index 12a3e303cc841..6d1945fb6b22a 100644 --- a/server/server.go +++ b/server/server.go @@ -49,7 +49,6 @@ import ( "github.com/blacktear23/go-proxyprotocol" "github.com/pingcap/errors" - "github.com/pingcap/parser/auth" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" @@ -342,7 +341,7 @@ func (s *Server) Run() error { terror.Log(clientConn.Close()) return errors.Trace(err) } - err = authPlugin.OnConnectionEvent(context.Background(), &auth.UserIdentity{Hostname: host}, plugin.PreAuth, nil) + err = authPlugin.OnConnectionEvent(context.Background(), plugin.PreAuth, &variable.ConnectionInfo{Host: host}) if err != nil { logutil.BgLogger().Info("do connection event failed", zap.Error(err)) terror.Log(clientConn.Close()) @@ -429,7 +428,7 @@ func (s *Server) onConn(conn *clientConn) { authPlugin := plugin.DeclareAuditManifest(p.Manifest) if authPlugin.OnConnectionEvent != nil { sessionVars := conn.ctx.GetSessionVars() - return authPlugin.OnConnectionEvent(context.Background(), sessionVars.User, plugin.Connected, sessionVars.ConnectionInfo) + return authPlugin.OnConnectionEvent(context.Background(), plugin.Connected, sessionVars.ConnectionInfo) } return nil }) @@ -445,7 +444,7 @@ func (s *Server) onConn(conn *clientConn) { if authPlugin.OnConnectionEvent != nil { sessionVars := conn.ctx.GetSessionVars() sessionVars.ConnectionInfo.Duration = float64(time.Since(connectedTime)) / float64(time.Millisecond) - err := authPlugin.OnConnectionEvent(context.Background(), sessionVars.User, plugin.Disconnect, sessionVars.ConnectionInfo) + err := authPlugin.OnConnectionEvent(context.Background(), plugin.Disconnect, sessionVars.ConnectionInfo) if err != nil { logutil.BgLogger().Warn("do connection event failed", zap.String("plugin", authPlugin.Name), zap.Error(err)) }