Skip to content

Commit

Permalink
plugin: add audit plugin extension point (#9136)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored Mar 25, 2019
1 parent d39053d commit 0313cbb
Show file tree
Hide file tree
Showing 22 changed files with 521 additions and 105 deletions.
15 changes: 12 additions & 3 deletions cmd/pluginpkg/pluginpkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,18 @@ func PluginManifest() *plugin.Manifest {
},
{{end}}
},
Validate: {{.validate}},
OnInit: {{.onInit}},
OnShutdown: {{.onShutdown}},
{{if .validate }}
Validate: {{.validate}},
{{end}}
{{if .onInit }}
OnInit: {{.onInit}},
{{end}}
{{if .onShutdown }}
OnShutdown: {{.onShutdown}},
{{end}}
{{if .onFlush }}
OnFlush: {{.onFlush}},
{{end}}
},
{{range .export}}
{{.extPoint}}: {{.impl}},
Expand Down
2 changes: 1 addition & 1 deletion docs/design/2018-12-10-plugin-framework.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ validate = "Validate"
onInit = "OnInit"
onShutdown = "OnShutdown"
export = [
{extPoint="NotifyEvent", impl="NotifyEvent"}
{extPoint="OnGeneralEvent", impl="OnGeneralEvent"}
]
```

Expand Down
23 changes: 22 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -129,6 +130,7 @@ func (a *recordSet) NewRecordBatch() *chunk.RecordBatch {
func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.LogSlowQuery(a.txnStartTS, a.lastErr == nil)
a.stmt.logAudit()
return errors.Trace(err)
}

Expand Down Expand Up @@ -295,6 +297,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
}
}
a.LogSlowQuery(txnTS, err == nil)
a.logAudit()
}()

err = e.Next(ctx, chunk.NewRecordBatch(e.newFirstChunk()))
Expand Down Expand Up @@ -362,8 +365,27 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
// QueryReplacer replaces new line and tab for grep result including query string.
var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")

func (a *ExecStmt) logAudit() {
sessVars := a.Ctx.GetSessionVars()
if sessVars.InRestrictedSQL {
return
}
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
if audit.OnGeneralEvent != nil {
cmd := mysql.Command2Str[byte(atomic.LoadUint32(&a.Ctx.GetSessionVars().CommandValue))]
audit.OnGeneralEvent(context.Background(), sessVars, plugin.Log, cmd)
}
return nil
})
if err != nil {
log.Error("log audit log failure", zap.Error(err))
}
}

// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
sessVars := a.Ctx.GetSessionVars()
level := log.GetLevel()
if level > zapcore.WarnLevel {
return
Expand All @@ -378,7 +400,6 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
if maxQueryLen := atomic.LoadUint64(&cfg.Log.QueryLogMaxLen); uint64(len(sql)) > maxQueryLen {
sql = fmt.Sprintf("%.*q(len:%d)", maxQueryLen, sql, len(a.Text))
}
sessVars := a.Ctx.GetSessionVars()
sql = QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo()

var tableIDs, indexIDs string
Expand Down
11 changes: 11 additions & 0 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -138,6 +139,16 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
if err != nil {
return errors.Trace(err)
}
err = plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
auditPlugin := plugin.DeclareAuditManifest(p.Manifest)
if auditPlugin.OnGlobalVariableEvent != nil {
auditPlugin.OnGlobalVariableEvent(context.Background(), e.ctx.GetSessionVars(), name, svalue)
}
return nil
})
if err != nil {
return err
}
} else {
// Set session scope system variable.
if sysVar.Scope&variable.ScopeSession == 0 {
Expand Down
21 changes: 21 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -159,6 +160,26 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo {
return b.visitInfo
}

// GetDBTableInfo gets the accessed dbs and tables info.
func (b *PlanBuilder) GetDBTableInfo() []stmtctx.TableEntry {
var tables []stmtctx.TableEntry
existsFunc := func(tbls []stmtctx.TableEntry, tbl *stmtctx.TableEntry) bool {
for _, t := range tbls {
if t == *tbl {
return true
}
}
return false
}
for _, v := range b.visitInfo {
tbl := &stmtctx.TableEntry{DB: v.db, Table: v.table}
if !existsFunc(tables, tbl) {
tables = append(tables, *tbl)
}
}
return tables
}

// GetOptFlag gets the optFlag of the PlanBuilder.
func (b *PlanBuilder) GetOptFlag() uint64 {
return b.optFlag
Expand Down
2 changes: 2 additions & 0 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (
return nil, err
}

ctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo()

// Check privilege. Maybe it's better to move this to the Preprocess, but
// we need the table information to check privilege, which is collected
// into the visitInfo in the logical plan builder.
Expand Down
3 changes: 3 additions & 0 deletions plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# The Plugin Framework

https://github.com/pingcap/tidb/blob/master/docs/design/2018-12-10-plugin-framework.md
87 changes: 87 additions & 0 deletions plugin/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plugin

import (
"context"

"github.com/pingcap/parser/auth"
"github.com/pingcap/tidb/sessionctx/variable"
)

// GeneralEvent presents TiDB generate event.
type GeneralEvent byte

const (
// Log presents log event.
Log GeneralEvent = iota
// Error presents error event.
Error
// Result presents result event.
Result
// Status presents status event.
Status
)

// ConnectionEvent presents TiDB connection event.
type ConnectionEvent byte

const (
// Connected presents new connection establish event(finish auth).
Connected ConnectionEvent = iota
// Disconnect presents disconnect event.
Disconnect
// ChangeUser presents change user.
ChangeUser
// PreAuth presents event before start auth.
PreAuth
)

func (c ConnectionEvent) String() string {
switch c {
case Connected:
return "Connected"
case Disconnect:
return "Disconnect"
case ChangeUser:
return "ChangeUser"
case PreAuth:
return "PreAuth"
}
return ""
}

// ParseEvent presents events happen around parser.
type ParseEvent byte

const (
// PreParse presents event before parse.
PreParse ParseEvent = 1 + iota
// PostParse presents event after parse.
PostParse
)

// AuditManifest presents a sub-manifest that every audit plugin must provide.
type AuditManifest struct {
Manifest
// OnConnectionEvent will be called when TiDB receive or disconnect from client.
// return error will ignore and close current connection.
OnConnectionEvent func(ctx context.Context, identity *auth.UserIdentity, event ConnectionEvent, info *variable.ConnectionInfo) error
// OnGeneralEvent will be called during TiDB execution.
OnGeneralEvent func(ctx context.Context, sctx *variable.SessionVars, event GeneralEvent, cmd string)
// OnGlobalVariableEvent will be called when Change GlobalVariable.
OnGlobalVariableEvent func(ctx context.Context, sctx *variable.SessionVars, varName, varValue string)
// OnParseEvent will be called around parse logic.
OnParseEvent func(ctx context.Context, sctx *variable.SessionVars, event ParseEvent) error
}
4 changes: 2 additions & 2 deletions plugin/conn_ip_example/conn_ip_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func OnShutdown(ctx context.Context, manifest *plugin.Manifest) error {
return nil
}

// NotifyEvent implements TiDB Audit plugin's NotifyEvent SPI.
func NotifyEvent(ctx context.Context) error {
// OnGeneralEvent implements TiDB Audit plugin's OnGeneralEvent SPI.
func OnGeneralEvent(ctx context.Context, sctx *variable.SessionVars, event plugin.GeneralEvent, cmd byte, stmt string) error {
fmt.Println("conn_ip_example notifiy called")
fmt.Println("variable test: ", variable.GetSysVar("conn_ip_example_test_variable").Value)
fmt.Printf("new connection by %s\n", ctx.Value("ip"))
Expand Down
21 changes: 14 additions & 7 deletions plugin/conn_ip_example/conn_ip_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,24 @@ func LoadRunShutdownPluginExample() {
PluginVarNames: &pluginVarNames,
}

err := plugin.Init(ctx, cfg)
err := plugin.Load(ctx, cfg)
if err != nil {
panic(err)
}

ps := plugin.GetByKind(plugin.Audit)
for _, auditPlugin := range ps {
if auditPlugin.State != plugin.Ready {
continue
}
plugin.DeclareAuditManifest(auditPlugin.Manifest).NotifyEvent(context.Background(), nil)
// load and start TiDB domain.

err = plugin.Init(ctx, cfg)
if err != nil {
panic(err)
}

err = plugin.ForeachPlugin(plugin.Audit, func(auditPlugin *plugin.Plugin) error {
plugin.DeclareAuditManifest(auditPlugin.Manifest).OnGeneralEvent(context.Background(), nil, plugin.Log, "QUERY")
return nil
})
if err != nil {
panic(err)
}

plugin.Shutdown(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion plugin/conn_ip_example/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ validate = "Validate"
onInit = "OnInit"
onShutdown = "OnShutdown"
export = [
{extPoint="NotifyEvent", impl="NotifyEvent"}
{extPoint="OnGeneralEvent", impl="OnGeneralEvent"}
]
Loading

0 comments on commit 0313cbb

Please sign in to comment.