Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add semi-sync monitor to unblock primaries blocked on semi-sync ACKs #17763

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
771dc7e
feat: add the first iteration of semi-sync watcher
GuptaManan100 Feb 12, 2025
4dcaf54
feat: add semisync recovery sql and other missing pieces in the code
GuptaManan100 Feb 13, 2025
8a47cc0
test: add a test to check semi-sync block issue
GuptaManan100 Feb 14, 2025
bf9b845
feat: address review comments
GuptaManan100 Feb 14, 2025
955d0c0
feat: wire up the monitor to the state manager and the tablet server
GuptaManan100 Feb 14, 2025
3124339
feat: fix some bugs
GuptaManan100 Feb 14, 2025
77f964f
Merge remote-tracking branch 'upstream/main' into semi-sync-watcher
GuptaManan100 Feb 17, 2025
2b2708f
feat: add the capability to the monitor to keep track of outstanding …
GuptaManan100 Feb 17, 2025
0be0743
feat: add the capability to run ERS to VTOrc when it detects semi-syn…
GuptaManan100 Feb 17, 2025
d33b460
feat: added test to verify VTOrc runs an ERS when semi-sync is blocke…
GuptaManan100 Feb 17, 2025
6092978
feat: make the monitor interval configurable
GuptaManan100 Feb 17, 2025
7286419
feat: add a gauge for the monitor
GuptaManan100 Feb 17, 2025
4725dd9
feat: move the semi-sync monitor from the tabletserver to tablet mana…
GuptaManan100 Feb 17, 2025
a8d2cbd
test: poll the new gauge in the test
GuptaManan100 Feb 17, 2025
da4a8f8
feat: fix bug in isClosed function where we locked twice :facepalm:
GuptaManan100 Feb 17, 2025
6cf02ab
test: add comprehensive testing for the semi-sync monitor
GuptaManan100 Feb 17, 2025
942eaf0
test: fix tests by doing nil checks and adding semi-sync monitor to f…
GuptaManan100 Feb 18, 2025
3a3db2a
test: add demote primary test that makes sure we block when semi-sync…
GuptaManan100 Feb 18, 2025
eaa9246
summary: add summary changes to make users aware of the change
GuptaManan100 Feb 18, 2025
a571e48
test: fix more tests by updating expectations
GuptaManan100 Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions go/vt/sidecardb/schema/misc/semisync_recover.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS semisync_recover
(
ts BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`ts`)
) ENGINE = InnoDB CHARSET = utf8mb4
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
// set MySQL to super_read_only mode. If we are already super_read_only because of a
// previous demotion, or because we are not primary anyway, this should be
// idempotent.

// TODO(@GuptaManan100): Reject PR if not done. Check we have no writes blocked on semi-sync.
if _, err := tm.MysqlDaemon.SetSuperReadOnly(ctx, true); err != nil {
if sqlErr, ok := err.(*sqlerror.SQLError); ok && sqlErr.Number() == sqlerror.ERUnknownSystemVariable {
log.Warningf("server does not know about super_read_only, continuing anyway...")
Expand Down
282 changes: 282 additions & 0 deletions go/vt/vttablet/tabletserver/semisyncwatcher/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package semisyncwatcher

import (
"context"
"errors"
"math"
"sync"
"time"

"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

const (
semiSyncWaitSessionsRead = "SHOW STATUS LIKE 'Rpl_semi_sync_%_wait_sessions'"
semiSyncRecoverWrite = "INSERT INTO semisync_recover (ts) VALUES (NOW())"
semiSyncRecoverClear = "DELETE FROM semisync_recover"
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
)

// Watcher is a watcher that checks if the primary tablet
// is blocked on a semi-sync ack from the replica.
// If the semi-sync ACK is lost in the network,
// it is possible that the primary is indefinitely stuck,
// blocking PRS. The watcher looks for this situation and manufactures a write
// periodically to unblock the primary.
type Watcher struct {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// env is used to get the connection parameters.
env tabletenv.Env
// ticks is the ticker on which we'll check
// if the primary is blocked on semi-sync ACKs or not.
ticks *timer.Timer
// clearTicks is the ticker to clear the data in
// the semisync_recover table.
clearTicks *timer.Timer

// mu protects the fields below.
mu sync.Mutex
appPool *dbconnpool.ConnectionPool
isOpen bool
// isWriting stores if the watcher is currently writing to the DB.
// We don't want two different threads initiating writes, so we use this
// for synchronization.
isWriting bool
// isBlocked stores if the primary is blocked on semi-sync ack.
isBlocked bool
// waiters stores the list of waiters that are waiting for the primary to be unblocked.
waiters []chan any
}

// NewWatcher creates a new Watcher.
func NewWatcher(env tabletenv.Env) *Watcher {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
// TODO (@GuptaManan100): Parameterize the watch interval.
watchInterval := 30 * time.Second
return &Watcher{
env: env,
ticks: timer.NewTimer(watchInterval),
// We clear the data every day. We can make it configurable in the future,
// but this seams fine for now.
clearTicks: timer.NewTimer(24 * time.Hour),
appPool: dbconnpool.NewConnectionPool("SemiSyncWatcherAppPool", env.Exporter(), 20, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
waiters: make([]chan any, 0),
}
}

// Open starts the watcher.
func (w *Watcher) Open() {
w.mu.Lock()
defer w.mu.Unlock()
if w.isOpen {
// If we are already open, then there is nothing to do
return
}
// Set the watcher to be open.
w.isOpen = true
log.Info("SemiSync Watcher: opening")

// This function could be running from within a unit test scope, in which case we use
// mock pools that are already open. This is why we test for the pool being open.
if !w.appPool.IsOpen() {
w.appPool.Open(w.env.Config().DB.AppWithDB())
}
w.clearTicks.Start(w.clearAllData)
w.ticks.Start(w.checkAndFixSemiSyncBlocked)
}

// Close stops the watcher.
func (w *Watcher) Close() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.isOpen {
// If we are already closed, then there is nothing to do
return
}
w.isOpen = false
log.Info("SemiSync Watcher: closing")
w.clearTicks.Stop()
w.ticks.Stop()
w.appPool.Close()
}

// checkAndFixSemiSyncBlocked checks if the primary is blocked on semi-sync ack
// and manufactures a write to unblock the primary. This function is safe to
// be called multiple times in parallel.
func (w *Watcher) checkAndFixSemiSyncBlocked() {
// Check if semi-sync is blocked or not
isBlocked, err := w.isSemiSyncBlocked(context.Background())
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// If we are unable to determine whether the primary is blocked or not,
// then we can just abort the function and try again later.
log.Errorf("SemiSync Watcher: failed to check if primary is blocked on semi-sync: %v", err)
return
}
// Set the isBlocked state.
w.setIsBlocked(isBlocked)
if isBlocked {
// If we are blocked, then we want to start the writes.
// That function is re-entrant. If we are already writing, then it will just return.
w.startWrites()
}
}

// isSemiSyncBlocked checks if the primary is blocked on semi-sync.
func (w *Watcher) isSemiSyncBlocked(ctx context.Context) (bool, error) {
// Get a connection from the pool
conn, err := w.appPool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Recycle()

// Execute the query to check if the primary is blocked on semi-sync.
res, err := conn.Conn.ExecuteFetch(semiSyncWaitSessionsRead, 1, false)
if err != nil {
return false, err
}
// If we have no rows, then the primary doesn't have semi-sync enabled.
// It then follows, that the primary isn't blocked :)
if len(res.Rows) == 0 {
return false, nil
}

// Read the status value and check if it is non-zero.
if len(res.Rows) != 1 || len(res.Rows[0]) != 2 {
return false, errors.New("unexpected number of rows received")
}
value, err := res.Rows[0][1].ToInt()
return value != 0, err
}

// waitUntilSemiSyncUnblocked waits until the primary is not blocked
// on semi-sync.
func (w *Watcher) waitUntilSemiSyncUnblocked() {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// run one iteration of checking if semi-sync is blocked or not.
w.checkAndFixSemiSyncBlocked()
if !w.stillBlocked() {
// If we find that the primary isn't blocked, we're good,
// we don't need to wait for anything.
return
}
// The primary is blocked. We need to wait for it to be unblocked.
ch := w.addWaiter()
<-ch
}

// stillBlocked returns true if the watcher should continue writing to the DB
// because the watcher is still open, and the primary is still blocked.
func (w *Watcher) stillBlocked() bool {
w.mu.Lock()
defer w.mu.Unlock()
return w.isOpen && w.isBlocked
}

// checkAndSetIsWriting checks if the watcher is already writing to the DB.
// If it is not, then it sets the isWriting field and signals the caller.
func (w *Watcher) checkAndSetIsWriting() bool {
w.mu.Lock()
defer w.mu.Unlock()
if w.isWriting {
return false
}
w.isWriting = true
return true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this affects the query hot path, right? If it does, then it might be worth e.g. using 1 byte for the status and using bits in there for isWriting, isBlocked, isOpen etc. so that we can use atomics for reading them, CAS for optional changes, etc. If nothing else, it's probably worth moving these to atomic.Bool so that e.g. checkAndSetIsWriting can be one atomic call:

    return w.isWriting(false, true)

It makes the code simpler, clearer, and less contentious / efficient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think performance is too much of a concern, but the usage of having multiple bool fields behind a mutex vs atomic.Bool I think becomes a matter of preference. I for one, like to have the former because that means that only one boolean value transitions at a point in time, but with atomic bool values it can change even when you've just read that value.


// clearIsWriting clears the isWriting field.
func (w *Watcher) clearIsWriting() {
w.mu.Lock()
defer w.mu.Unlock()
w.isWriting = false
}

// startWrites starts writing to the DB.
// It is re-entrant and will return if we are already writing.
func (w *Watcher) startWrites() {
// If we are already writing, then we can just return.
if !w.checkAndSetIsWriting() {
return
}
// We defer the clear of the isWriting field.
defer w.clearIsWriting()

// We start writing to the DB with a backoff.
backoff := 1 * time.Second
maxBackoff := 1 * time.Minute
// Check if we need to continue writing or not.
for w.stillBlocked() {
go w.write()
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
<-time.After(backoff)
backoff = time.Duration(math.Min(float64(backoff*2), float64(maxBackoff)))
}
}

// write writes a heartbeat to unblock semi-sync being stuck.
func (w *Watcher) write() {
// Get a connection from the pool
conn, err := w.appPool.Get(context.Background())
if err != nil {
return
}
defer conn.Recycle()
_, _ = conn.Conn.ExecuteFetch(semiSyncRecoverWrite, 0, false)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}

// setIsBlocked sets the isBlocked field.
func (w *Watcher) setIsBlocked(val bool) {
w.mu.Lock()
defer w.mu.Unlock()
w.isBlocked = val
if !val {
// If we are unblocked, then we need to signal all the waiters.
for _, ch := range w.waiters {
close(ch)
}
// We also empty the list of current waiters.
w.waiters = nil
}
}

// clearAllData clears all the data in the table so that it never
// consumes too much space on the MySQL instance.
func (w *Watcher) clearAllData() {
// Get a connection from the pool
conn, err := w.appPool.Get(context.Background())
if err != nil {
log.Errorf("SemiSync Watcher: failed to clear semisync_recovery table: %v", err)
return
}
defer conn.Recycle()
_, err = conn.Conn.ExecuteFetch(semiSyncRecoverClear, 0, false)
if err != nil {
log.Errorf("SemiSync Watcher: failed to clear semisync_recovery table: %v", err)
}
}

// addWaiter adds a waiter to the list of waiters
// that will be unblocked when the primary is no longer blocked.
func (w *Watcher) addWaiter() chan any {
w.mu.Lock()
defer w.mu.Unlock()
ch := make(chan any, 1)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
w.waiters = append(w.waiters, ch)
return ch
}
Loading