Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Engine Notifier: fix race between schema change broadcasts and… #6268

Merged
merged 1 commit into from
Jun 4, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Engine struct {
reloadTime time.Duration
//the position at which the schema was last loaded. it is only used in conjunction with ReloadAt
reloadAtPos mysql.Position
notifierMu sync.Mutex
notifiers map[string]notifier

// The following fields have their own synchronization
Expand All @@ -67,6 +68,18 @@ type Engine struct {
ticks *timer.Timer
}

// Lock acquires the SE mutex with optional logging (useful for debugging deadlocks)
func (se *Engine) Lock(msg string) {
log.V(2).Infof("SE: acquiring Lock %s", msg)
se.mu.Lock()
}

// Unlock releases the SE mutex with optional logging (useful for debugging deadlocks)
func (se *Engine) Unlock(msg string) {
log.V(2).Infof("SE: releasing Lock %s", msg)
se.mu.Unlock()
}

// NewEngine creates a new Engine.
func NewEngine(env tabletenv.Env) *Engine {
reloadTime := time.Duration(env.Config().SchemaReloadIntervalSeconds * 1e9)
Expand Down Expand Up @@ -106,8 +119,8 @@ func (se *Engine) InitDBConfig(cp dbconfigs.Connector) {
// Open initializes the Engine. Calling Open on an already
// open engine is a no-op.
func (se *Engine) Open() error {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("Open")
defer se.Unlock("Open")
if se.isOpen {
return nil
}
Expand All @@ -134,8 +147,8 @@ func (se *Engine) Open() error {

// IsOpen checks if engine is open
func (se *Engine) IsOpen() bool {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("IsOpen")
defer se.Unlock("IsOpen")
return se.isOpen
}

Expand All @@ -147,8 +160,8 @@ func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
// Close shuts down Engine and is idempotent.
// It can be re-opened after Close.
func (se *Engine) Close() {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("Close")
defer se.Unlock("Close")
if !se.isOpen {
return
}
Expand All @@ -164,8 +177,8 @@ func (se *Engine) Close() {
// they don't get accidentally reused after losing mastership.
func (se *Engine) MakeNonMaster() {
// This function is tested through endtoend test.
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("MakeNonMaster")
defer se.Unlock("MakeNonMaster")
for _, t := range se.tables {
if t.SequenceInfo != nil {
t.SequenceInfo.Lock()
Expand All @@ -187,8 +200,8 @@ func (se *Engine) Reload(ctx context.Context) error {
// It maintains the position at which the schema was reloaded and if the same position is provided
// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("ReloadAt")
defer se.Unlock("ReloadAt")
if !se.isOpen {
log.Warning("Schema reload called for an engine that is not yet open")
return nil
Expand Down Expand Up @@ -324,12 +337,13 @@ func (se *Engine) populatePrimaryKeys(ctx context.Context, conn *connpool.DBConn
// function must not change the map or its contents. The only exception
// is the sequence table where the values can be changed using the lock.
func (se *Engine) RegisterNotifier(name string, f notifier) {
se.mu.Lock()
defer se.mu.Unlock()
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()

se.notifiers[name] = f
var created []string
for tableName := range se.tables {
Expand All @@ -340,17 +354,24 @@ func (se *Engine) RegisterNotifier(name string, f notifier) {

// UnregisterNotifier unregisters the notifier function.
func (se *Engine) UnregisterNotifier(name string) {
se.mu.Lock()
defer se.mu.Unlock()
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()

delete(se.notifiers, name)
}

// broadcast must be called while holding a lock on se.mu.
func (se *Engine) broadcast(created, altered, dropped []string) {
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()
s := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
s[k] = v
Expand All @@ -362,16 +383,16 @@ func (se *Engine) broadcast(created, altered, dropped []string) {

// GetTable returns the info for a table.
func (se *Engine) GetTable(tableName sqlparser.TableIdent) *Table {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("GetTable")
defer se.Unlock("GetTable")
return se.tables[tableName.String()]
}

// GetSchema returns the current The Tables are a shared
// data structure and must be treated as read-only.
func (se *Engine) GetSchema() map[string]*Table {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("GetSchema")
defer se.Unlock("GetSchema")
tables := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
tables[k] = v
Expand Down Expand Up @@ -422,7 +443,7 @@ func NewEngineForTests() *Engine {

// SetTableForTests puts a Table in the map directly.
func (se *Engine) SetTableForTests(table *Table) {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("SetTableForTests")
defer se.Unlock("SetTableForTests")
se.tables[table.Name.String()] = table
}