Skip to content

Commit

Permalink
planner: encapsulate binding operations behind 2 interfaces (#49261)
Browse files Browse the repository at this point in the history
ref #48875
  • Loading branch information
qw4990 authored Dec 8, 2023
1 parent b74d64c commit 90e272a
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 87 deletions.
4 changes: 2 additions & 2 deletions pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ go_library(
"bind_cache.go",
"bind_record.go",
"capture.go",
"handle.go",
"global_handle.go",
"session_handle.go",
"util.go",
],
Expand Down Expand Up @@ -51,7 +51,7 @@ go_test(
srcs = [
"bind_cache_test.go",
"capture_test.go",
"handle_test.go",
"global_handle_test.go",
"main_test.go",
"optimize_test.go",
"session_handle_test.go",
Expand Down
4 changes: 2 additions & 2 deletions pkg/bindinfo/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func ParseCaptureTableFilter(tableFilter string) (f tablefilter.Filter, valid bo
return f, true
}

func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
func (h *globalBindingHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
filter = &captureFilter{
frequency: 1,
users: make(map[string]struct{}),
Expand Down Expand Up @@ -134,7 +134,7 @@ func (h *BindHandle) extractCaptureFilterFromStorage() (filter *captureFilter) {
}

// CaptureBaselines is used to automatically capture plan baselines.
func (h *BindHandle) CaptureBaselines() {
func (h *globalBindingHandle) CaptureBaselines() {
parser4Capture := parser.New()
captureFilter := h.extractCaptureFilterFromStorage()
emptyCaptureFilter := captureFilter.isEmpty()
Expand Down
154 changes: 117 additions & 37 deletions pkg/bindinfo/handle.go → pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,88 @@ import (
"golang.org/x/exp/maps"
)

// BindHandle is used to handle all global sql bind operations.
type BindHandle struct {
// GlobalBindingHandle is used to handle all global sql bind operations.
type GlobalBindingHandle interface {
// Methods for create, get, drop global sql bindings.

// GetGlobalBinding returns the BindRecord of the (normalizedSQL,db) if BindRecord exist.
GetGlobalBinding(sqlDigest, normalizedSQL, db string) *BindRecord

// GetGlobalBindingBySQLDigest returns the BindRecord of the sql digest.
GetGlobalBindingBySQLDigest(sqlDigest string) (*BindRecord, error)

// GetAllGlobalBindings returns all bind records in cache.
GetAllGlobalBindings() (bindRecords []*BindRecord)

// CreateGlobalBinding creates a BindRecord to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
CreateGlobalBinding(sctx sessionctx.Context, record *BindRecord) (err error)

// DropGlobalBinding drops a BindRecord to the storage and BindRecord int the cache.
DropGlobalBinding(originalSQL, db string, binding *Binding) (deletedRows uint64, err error)

// DropGlobalBindingByDigest drop BindRecord to the storage and BindRecord int the cache.
DropGlobalBindingByDigest(sqlDigest string) (deletedRows uint64, err error)

// SetGlobalBindingStatus set a BindRecord's status to the storage and bind cache.
SetGlobalBindingStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error)

// SetGlobalBindingStatusByDigest set a BindRecord's status to the storage and bind cache.
SetGlobalBindingStatusByDigest(newStatus, sqlDigest string) (ok bool, err error)

// AddInvalidGlobalBinding adds BindRecord which needs to be deleted into invalidBindRecordMap.
AddInvalidGlobalBinding(invalidBindRecord *BindRecord)

// DropInvalidGlobalBinding executes the drop BindRecord tasks.
DropInvalidGlobalBinding()

// Methods for load and clear global sql bindings.

// Reset is to reset the BindHandle and clean old info.
Reset()

// Update updates the global sql bind cache.
Update(fullLoad bool) (err error)

// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
GCGlobalBinding() (err error)

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
LockBindInfoSQL() string

// Methods for memory control.

// Size returns the size of bind info cache.
Size() int

// SetBindCacheCapacity reset the capacity for the bindCache.
SetBindCacheCapacity(capacity int64)

// GetMemUsage returns the memory usage for the bind cache.
GetMemUsage() (memUsage int64)

// GetMemCapacity returns the memory capacity for the bind cache.
GetMemCapacity() (memCapacity int64)

// Clear resets the bind handle. It is only used for test.
Clear()

// FlushGlobalBindings flushes the BindRecord in temp maps to storage and loads them into cache.
FlushGlobalBindings() error

// ReloadGlobalBindings clears existing binding cache and do a full load from mysql.bind_info.
ReloadGlobalBindings() error

// Methods for Auto Capture.

// CaptureBaselines is used to automatically capture plan baselines.
CaptureBaselines()

variable.Statistics
}

// globalBindingHandle is used to handle all global sql bind operations.
type globalBindingHandle struct {
sPool SessionPool

bindingCache atomic.Pointer[bindCache]
Expand Down Expand Up @@ -82,24 +162,24 @@ type bindRecordUpdate struct {
updateTime time.Time
}

// NewBindHandle creates a new BindHandle.
func NewBindHandle(sPool SessionPool) *BindHandle {
handle := &BindHandle{sPool: sPool}
// NewGlobalBindingHandle creates a new GlobalBindingHandle.
func NewGlobalBindingHandle(sPool SessionPool) GlobalBindingHandle {
handle := &globalBindingHandle{sPool: sPool}
handle.Reset()
return handle
}

func (h *BindHandle) getCache() *bindCache {
func (h *globalBindingHandle) getCache() *bindCache {
return h.bindingCache.Load()
}

func (h *BindHandle) setCache(c *bindCache) {
func (h *globalBindingHandle) setCache(c *bindCache) {
// TODO: update the global cache in-place instead of replacing it and remove this function.
h.bindingCache.Store(c)
}

// Reset is to reset the BindHandle and clean old info.
func (h *BindHandle) Reset() {
func (h *globalBindingHandle) Reset() {
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
Expand All @@ -110,16 +190,16 @@ func (h *BindHandle) Reset() {
variable.RegisterStatistics(h)
}

func (h *BindHandle) getLastUpdateTime() types.Time {
func (h *globalBindingHandle) getLastUpdateTime() types.Time {
return h.lastUpdateTime.Load().(types.Time)
}

func (h *BindHandle) setLastUpdateTime(t types.Time) {
func (h *globalBindingHandle) setLastUpdateTime(t types.Time) {
h.lastUpdateTime.Store(t)
}

// Update updates the global sql bind cache.
func (h *BindHandle) Update(fullLoad bool) (err error) {
func (h *globalBindingHandle) Update(fullLoad bool) (err error) {
lastUpdateTime := h.getLastUpdateTime()
var timeCondition string
if !fullLoad {
Expand Down Expand Up @@ -187,7 +267,7 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {

// CreateGlobalBinding creates a BindRecord to the storage and the cache.
// It replaces all the exists bindings for the same normalized SQL.
func (h *BindHandle) CreateGlobalBinding(sctx sessionctx.Context, record *BindRecord) (err error) {
func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, record *BindRecord) (err error) {
err = record.prepareHints(sctx)
if err != nil {
return err
Expand Down Expand Up @@ -244,7 +324,7 @@ func (h *BindHandle) CreateGlobalBinding(sctx sessionctx.Context, record *BindRe
}

// DropGlobalBinding drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropGlobalBinding(originalSQL, db string, binding *Binding) (deletedRows uint64, err error) {
func (h *globalBindingHandle) DropGlobalBinding(originalSQL, db string, binding *Binding) (deletedRows uint64, err error) {
err = h.callWithSCtx(false, func(sctx sessionctx.Context) error {
db = strings.ToLower(db)
defer func() {
Expand Down Expand Up @@ -283,7 +363,7 @@ func (h *BindHandle) DropGlobalBinding(originalSQL, db string, binding *Binding)
}

// DropGlobalBindingByDigest drop BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropGlobalBindingByDigest(sqlDigest string) (deletedRows uint64, err error) {
func (h *globalBindingHandle) DropGlobalBindingByDigest(sqlDigest string) (deletedRows uint64, err error) {
oldRecord, err := h.GetGlobalBindingBySQLDigest(sqlDigest)
if err != nil {
return 0, err
Expand All @@ -292,7 +372,7 @@ func (h *BindHandle) DropGlobalBindingByDigest(sqlDigest string) (deletedRows ui
}

// SetGlobalBindingStatus set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetGlobalBindingStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error) {
func (h *globalBindingHandle) SetGlobalBindingStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error) {
var (
updateTs types.Time
oldStatus0, oldStatus1 string
Expand Down Expand Up @@ -361,7 +441,7 @@ func (h *BindHandle) SetGlobalBindingStatus(originalSQL string, binding *Binding
}

// SetGlobalBindingStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetGlobalBindingStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
func (h *globalBindingHandle) SetGlobalBindingStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord, err := h.GetGlobalBindingBySQLDigest(sqlDigest)
if err != nil {
return false, err
Expand All @@ -370,7 +450,7 @@ func (h *BindHandle) SetGlobalBindingStatusByDigest(newStatus, sqlDigest string)
}

// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCGlobalBinding() (err error) {
func (h *globalBindingHandle) GCGlobalBinding() (err error) {
return h.callWithSCtx(true, func(sctx sessionctx.Context) error {
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(sctx); err != nil {
Expand All @@ -392,14 +472,14 @@ func (h *BindHandle) GCGlobalBinding() (err error) {
// generally available later.
// This lock would enforce the CREATE / DROP GLOBAL BINDING statements to be executed sequentially,
// even if they come from different tidb instances.
func (h *BindHandle) lockBindInfoTable(sctx sessionctx.Context) error {
func (h *globalBindingHandle) lockBindInfoTable(sctx sessionctx.Context) error {
// h.sctx already locked.
_, err := exec(sctx, h.LockBindInfoSQL())
return err
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (*BindHandle) LockBindInfoSQL() string {
func (*globalBindingHandle) LockBindInfoSQL() string {
sql, err := sqlescape.EscapeSQL("UPDATE mysql.bind_info SET source= %? WHERE original_sql= %?", Builtin, BuiltinPseudoSQL4BindLock)
if err != nil {
return ""
Expand Down Expand Up @@ -458,49 +538,49 @@ func (tmpMap *tmpBindRecordMap) Add(bindRecord *BindRecord) {
}

// DropInvalidGlobalBinding executes the drop BindRecord tasks.
func (h *BindHandle) DropInvalidGlobalBinding() {
func (h *globalBindingHandle) DropInvalidGlobalBinding() {
h.invalidBindRecordMap.flushToStore()
}

// AddInvalidGlobalBinding adds BindRecord which needs to be deleted into invalidBindRecordMap.
func (h *BindHandle) AddInvalidGlobalBinding(invalidBindRecord *BindRecord) {
func (h *globalBindingHandle) AddInvalidGlobalBinding(invalidBindRecord *BindRecord) {
h.invalidBindRecordMap.Add(invalidBindRecord)
}

// Size returns the size of bind info cache.
func (h *BindHandle) Size() int {
func (h *globalBindingHandle) Size() int {
size := len(h.getCache().GetAllBindings())
return size
}

// GetGlobalBinding returns the BindRecord of the (normalizedSQL,db) if BindRecord exist.
func (h *BindHandle) GetGlobalBinding(sqlDigest, normalizedSQL, db string) *BindRecord {
func (h *globalBindingHandle) GetGlobalBinding(sqlDigest, normalizedSQL, db string) *BindRecord {
return h.getCache().GetBinding(sqlDigest, normalizedSQL, db)
}

// GetGlobalBindingBySQLDigest returns the BindRecord of the sql digest.
func (h *BindHandle) GetGlobalBindingBySQLDigest(sqlDigest string) (*BindRecord, error) {
func (h *globalBindingHandle) GetGlobalBindingBySQLDigest(sqlDigest string) (*BindRecord, error) {
return h.getCache().GetBindingBySQLDigest(sqlDigest)
}

// GetAllGlobalBinding returns all bind records in cache.
func (h *BindHandle) GetAllGlobalBinding() (bindRecords []*BindRecord) {
// GetAllGlobalBindings returns all bind records in cache.
func (h *globalBindingHandle) GetAllGlobalBindings() (bindRecords []*BindRecord) {
return h.getCache().GetAllBindings()
}

// SetBindCacheCapacity reset the capacity for the bindCache.
// It will not affect already cached BindRecords.
func (h *BindHandle) SetBindCacheCapacity(capacity int64) {
func (h *globalBindingHandle) SetBindCacheCapacity(capacity int64) {
h.getCache().SetMemCapacity(capacity)
}

// GetMemUsage returns the memory usage for the bind cache.
func (h *BindHandle) GetMemUsage() (memUsage int64) {
func (h *globalBindingHandle) GetMemUsage() (memUsage int64) {
return h.getCache().GetMemUsage()
}

// GetMemCapacity returns the memory capacity for the bind cache.
func (h *BindHandle) GetMemCapacity() (memCapacity int64) {
func (h *globalBindingHandle) GetMemCapacity() (memCapacity int64) {
return h.getCache().GetMemCapacity()
}

Expand Down Expand Up @@ -535,7 +615,7 @@ func newBindRecord(sctx sessionctx.Context, row chunk.Row) (string, *BindRecord,

// setGlobalCacheBinding sets the BindRecord to the cache, if there already exists a BindRecord,
// it will be overridden.
func (h *BindHandle) setGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
func (h *globalBindingHandle) setGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
newCache, err0 := h.getCache().Copy()
if err0 != nil {
logutil.BgLogger().Warn("BindHandle.setGlobalCacheBindRecord", zap.String("category", "sql-bind"), zap.Error(err0))
Expand All @@ -550,7 +630,7 @@ func (h *BindHandle) setGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
}

// removeGlobalCacheBinding removes the BindRecord from the cache.
func (h *BindHandle) removeGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
func (h *globalBindingHandle) removeGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
newCache, err := h.getCache().Copy()
if err != nil {
logutil.BgLogger().Warn("", zap.String("category", "sql-bind"), zap.Error(err))
Expand Down Expand Up @@ -680,27 +760,27 @@ func (*paramMarkerChecker) Leave(in ast.Node) (ast.Node, bool) {
}

// Clear resets the bind handle. It is only used for test.
func (h *BindHandle) Clear() {
func (h *globalBindingHandle) Clear() {
h.setCache(newBindCache())
h.setLastUpdateTime(types.ZeroTimestamp)
h.invalidBindRecordMap.Store(make(map[string]*bindRecordUpdate))
}

// FlushGlobalBindings flushes the BindRecord in temp maps to storage and loads them into cache.
func (h *BindHandle) FlushGlobalBindings() error {
func (h *globalBindingHandle) FlushGlobalBindings() error {
h.DropInvalidGlobalBinding()
return h.Update(false)
}

// ReloadGlobalBindings clears existing binding cache and do a full load from mysql.bind_info.
// It is used to maintain consistency between cache and mysql.bind_info if the table is deleted or truncated.
func (h *BindHandle) ReloadGlobalBindings() error {
func (h *globalBindingHandle) ReloadGlobalBindings() error {
h.setCache(newBindCache())
h.setLastUpdateTime(types.ZeroTimestamp)
return h.Update(true)
}

func (h *BindHandle) callWithSCtx(wrapTxn bool, f func(sctx sessionctx.Context) error) (err error) {
func (h *globalBindingHandle) callWithSCtx(wrapTxn bool, f func(sctx sessionctx.Context) error) (err error) {
resource, err := h.sPool.Get()
if err != nil {
return err
Expand Down Expand Up @@ -735,12 +815,12 @@ var (
)

// GetScope gets the status variables scope.
func (*BindHandle) GetScope(_ string) variable.ScopeFlag {
func (*globalBindingHandle) GetScope(_ string) variable.ScopeFlag {
return variable.ScopeSession
}

// Stats returns the server statistics.
func (h *BindHandle) Stats(_ *variable.SessionVars) (map[string]interface{}, error) {
func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]interface{}, error) {
m := make(map[string]interface{})
m[lastPlanBindingUpdateTime] = h.getLastUpdateTime().String()
return m, nil
Expand Down
Loading

0 comments on commit 90e272a

Please sign in to comment.