Skip to content

Commit

Permalink
Merge pull request #6268 from planetscale/rn-se-notifier-lock
Browse files Browse the repository at this point in the history
Schema Engine Notifier: fix race between schema change broadcasts and…
  • Loading branch information
systay authored Jun 4, 2020
2 parents 3af7cd8 + 352322c commit ce919d6
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Engine struct {
reloadTime time.Duration
//the position at which the schema was last loaded. it is only used in conjunction with ReloadAt
reloadAtPos mysql.Position
notifierMu sync.Mutex
notifiers map[string]notifier

// The following fields have their own synchronization
Expand All @@ -67,6 +68,18 @@ type Engine struct {
ticks *timer.Timer
}

// Lock acquires the SE mutex with optional logging (useful for debugging deadlocks)
func (se *Engine) Lock(msg string) {
log.V(2).Infof("SE: acquiring Lock %s", msg)
se.mu.Lock()
}

// Unlock releases the SE mutex with optional logging (useful for debugging deadlocks)
func (se *Engine) Unlock(msg string) {
log.V(2).Infof("SE: releasing Lock %s", msg)
se.mu.Unlock()
}

// NewEngine creates a new Engine.
func NewEngine(env tabletenv.Env) *Engine {
reloadTime := time.Duration(env.Config().SchemaReloadIntervalSeconds * 1e9)
Expand Down Expand Up @@ -106,8 +119,8 @@ func (se *Engine) InitDBConfig(cp dbconfigs.Connector) {
// Open initializes the Engine. Calling Open on an already
// open engine is a no-op.
func (se *Engine) Open() error {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("Open")
defer se.Unlock("Open")
if se.isOpen {
return nil
}
Expand All @@ -134,8 +147,8 @@ func (se *Engine) Open() error {

// IsOpen checks if engine is open
func (se *Engine) IsOpen() bool {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("IsOpen")
defer se.Unlock("IsOpen")
return se.isOpen
}

Expand All @@ -147,8 +160,8 @@ func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) {
// Close shuts down Engine and is idempotent.
// It can be re-opened after Close.
func (se *Engine) Close() {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("Close")
defer se.Unlock("Close")
if !se.isOpen {
return
}
Expand All @@ -164,8 +177,8 @@ func (se *Engine) Close() {
// they don't get accidentally reused after losing mastership.
func (se *Engine) MakeNonMaster() {
// This function is tested through endtoend test.
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("MakeNonMaster")
defer se.Unlock("MakeNonMaster")
for _, t := range se.tables {
if t.SequenceInfo != nil {
t.SequenceInfo.Lock()
Expand All @@ -187,8 +200,8 @@ func (se *Engine) Reload(ctx context.Context) error {
// It maintains the position at which the schema was reloaded and if the same position is provided
// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("ReloadAt")
defer se.Unlock("ReloadAt")
if !se.isOpen {
log.Warning("Schema reload called for an engine that is not yet open")
return nil
Expand Down Expand Up @@ -324,12 +337,13 @@ func (se *Engine) populatePrimaryKeys(ctx context.Context, conn *connpool.DBConn
// function must not change the map or its contents. The only exception
// is the sequence table where the values can be changed using the lock.
func (se *Engine) RegisterNotifier(name string, f notifier) {
se.mu.Lock()
defer se.mu.Unlock()
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()

se.notifiers[name] = f
var created []string
for tableName := range se.tables {
Expand All @@ -340,17 +354,24 @@ func (se *Engine) RegisterNotifier(name string, f notifier) {

// UnregisterNotifier unregisters the notifier function.
func (se *Engine) UnregisterNotifier(name string) {
se.mu.Lock()
defer se.mu.Unlock()
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()

delete(se.notifiers, name)
}

// broadcast must be called while holding a lock on se.mu.
func (se *Engine) broadcast(created, altered, dropped []string) {
if !se.isOpen {
return
}

se.notifierMu.Lock()
defer se.notifierMu.Unlock()
s := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
s[k] = v
Expand All @@ -362,16 +383,16 @@ func (se *Engine) broadcast(created, altered, dropped []string) {

// GetTable returns the info for a table.
func (se *Engine) GetTable(tableName sqlparser.TableIdent) *Table {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("GetTable")
defer se.Unlock("GetTable")
return se.tables[tableName.String()]
}

// GetSchema returns the current The Tables are a shared
// data structure and must be treated as read-only.
func (se *Engine) GetSchema() map[string]*Table {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("GetSchema")
defer se.Unlock("GetSchema")
tables := make(map[string]*Table, len(se.tables))
for k, v := range se.tables {
tables[k] = v
Expand Down Expand Up @@ -422,7 +443,7 @@ func NewEngineForTests() *Engine {

// SetTableForTests puts a Table in the map directly.
func (se *Engine) SetTableForTests(table *Table) {
se.mu.Lock()
defer se.mu.Unlock()
se.Lock("SetTableForTests")
defer se.Unlock("SetTableForTests")
se.tables[table.Name.String()] = table
}

0 comments on commit ce919d6

Please sign in to comment.