Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add semi-sync monitor to unblock primaries blocked on semi-sync ACKs #17763

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
771dc7e
feat: add the first iteration of semi-sync watcher
GuptaManan100 Feb 12, 2025
4dcaf54
feat: add semisync recovery sql and other missing pieces in the code
GuptaManan100 Feb 13, 2025
8a47cc0
test: add a test to check semi-sync block issue
GuptaManan100 Feb 14, 2025
bf9b845
feat: address review comments
GuptaManan100 Feb 14, 2025
955d0c0
feat: wire up the monitor to the state manager and the tablet server
GuptaManan100 Feb 14, 2025
3124339
feat: fix some bugs
GuptaManan100 Feb 14, 2025
77f964f
Merge remote-tracking branch 'upstream/main' into semi-sync-watcher
GuptaManan100 Feb 17, 2025
2b2708f
feat: add the capability to the monitor to keep track of outstanding …
GuptaManan100 Feb 17, 2025
0be0743
feat: add the capability to run ERS to VTOrc when it detects semi-syn…
GuptaManan100 Feb 17, 2025
d33b460
feat: added test to verify VTOrc runs an ERS when semi-sync is blocke…
GuptaManan100 Feb 17, 2025
6092978
feat: make the monitor interval configurable
GuptaManan100 Feb 17, 2025
7286419
feat: add a gauge for the monitor
GuptaManan100 Feb 17, 2025
4725dd9
feat: move the semi-sync monitor from the tabletserver to tablet mana…
GuptaManan100 Feb 17, 2025
a8d2cbd
test: poll the new gauge in the test
GuptaManan100 Feb 17, 2025
da4a8f8
feat: fix bug in isClosed function where we locked twice :facepalm:
GuptaManan100 Feb 17, 2025
6cf02ab
test: add comprehensive testing for the semi-sync monitor
GuptaManan100 Feb 17, 2025
942eaf0
test: fix tests by doing nil checks and adding semi-sync monitor to f…
GuptaManan100 Feb 18, 2025
3a3db2a
test: add demote primary test that makes sure we block when semi-sync…
GuptaManan100 Feb 18, 2025
eaa9246
summary: add summary changes to make users aware of the change
GuptaManan100 Feb 18, 2025
a571e48
test: fix more tests by updating expectations
GuptaManan100 Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Semi-sync monitor in vttablet](#semi-sync-monitor)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[VTTablet ACL enforcement and reloading](#reloading-vttablet-acl)**
Expand Down Expand Up @@ -162,6 +163,14 @@ Users can continue to specify exact keyranges. The new feature is backward compa

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.

### <a id="semi-sync-monitor"/>Semi-sync monitor in vttablet</a>

A new component has been added to the vttablet binary to monitor the semi-sync status of primary vttablets. We've observed cases where a brief network disruption can cause the primary to get stuck indefinitely waiting for semi-sync ACKs. In rare scenarios, this can block reparent operations and render the primary unresponsive. More information can be found in the issues https://github.com/vitessio/vitess/issues/17709 and https://github.com/vitessio/vitess/issues/17749.

To address this, the new component continuously monitors the semi-sync status. If the primary becomes stuck on semi-sync ACKs, it generates writes to unblock it. If this fails, VTOrc is notified of the issue and initiates an emergency reparent operation.

The monitoring interval can be adjusted using the `--semi-sync-monitor-interval` flag, which defaults to 10 seconds.

## <a id="minor-changes"/>Minor Changes</a>

#### <a id="flags-vttablet"/>VTTablet Flags</a>
Expand Down
7 changes: 7 additions & 0 deletions go/cmd/vtctldclient/command/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -158,6 +160,10 @@ func NewFakeTablet(t *testing.T, ts *topo.Server, cell string, uid uint32, table
}
}

var (
exporter = servenv.NewExporter("TestVtctldClientCommand", "")
)

// StartActionLoop will start the action loop for a fake tablet,
// using ft.FakeMysqlDaemon as the backing mysqld.
func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
Expand Down Expand Up @@ -203,6 +209,7 @@ func (ft *FakeTablet) StartActionLoop(t *testing.T, ts *topo.Server) {
DBConfigs: &dbconfigs.DBConfigs{},
QueryServiceControl: tabletservermock.NewController(),
VREngine: vreplication.NewTestEngine(ts, ft.Tablet.Alias.Cell, ft.FakeMysqlDaemon, binlogplayer.NewFakeDBClient, binlogplayer.NewFakeDBClient, topoproto.TabletDbName(ft.Tablet), nil),
SemiSyncMonitor: semisyncmonitor.CreateTestSemiSyncMonitor(ft.FakeMysqlDaemon.DB(), exporter),
Env: vtenv.NewTestEnv(),
}
if err := ft.TM.Start(ft.Tablet, nil); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()),
VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()),
VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()),
}
if err := tm.Start(tablet, config); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ Flags:
--schema_change_signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
--schema_dir string Schema base directory. Should contain one directory per keyspace, with a vschema.json file if necessary.
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration Interval between semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ Flags:
--schema-change-reload-timeout duration query server schema change reload timeout, this is how long to wait for the signaled schema reload operation to complete before giving up (default 30s)
--schema-version-max-age-seconds int max age of schema version records to kept in memory by the vreplication historian
--security_policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--semi-sync-monitor-interval duration Interval between semi-sync monitor checks if the primary is blocked on semi-sync ACKs (default 10s)
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
Expand Down
94 changes: 94 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package newfeaturetest
import (
"context"
"fmt"
"os/exec"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
)

Expand Down Expand Up @@ -234,3 +237,94 @@ func TestBufferingWithMultipleDisruptions(t *testing.T) {
// Wait for all the writes to have succeeded.
wg.Wait()
}

// TestSemiSyncBlockDueToDisruption tests that Vitess can recover from a situation
// where a primary is stuck waiting for semi-sync ACKs due to a network issue,
// even if no new writes from the user arrives.
func TestSemiSyncBlockDueToDisruption(t *testing.T) {
t.Skip("Test not meant to be run on CI")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only do this if the CI env var is set. Otherwise you have to edit the test to run it locally as well. We do this today in a number of other places.

clusterInstance := utils.SetupReparentCluster(t, policy.DurabilitySemiSync)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// stop heartbeats on all the replicas
for idx, tablet := range tablets {
if idx == 0 {
continue
}
utils.RunSQLs(context.Background(), t, []string{
"stop slave;",
"change master to MASTER_HEARTBEAT_PERIOD = 0;",
"start slave;",
Comment on lines +257 to +259
Copy link
Contributor

@mattlord mattlord Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

w/o the heartbeats... could we also set MASTER_CONNECT_RETRY to 120 seconds and lower the net_read_timeout (only on the replicas) to 5 seconds so that after no replicated events for 5 seconds the connection is closed on the replica side, and we don't reconnect, and then we can possibly create the scenario w/o needing root privs and external commands by then doing a write on the primary? I'm not sure, but wanted to ask.

}, tablet)
}

// Take a backup of the pf.conf file
runCommandWithSudo(t, "cp", "/etc/pf.conf", "/etc/pf.conf.backup")
defer func() {
// Restore the file from backup
runCommandWithSudo(t, "mv", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
}()
// Disrupt the network between the primary and the replicas
runCommandWithSudo(t, "sh", "-c", fmt.Sprintf("echo 'block in proto tcp from any to any port %d' | sudo tee -a /etc/pf.conf > /dev/null", tablets[0].MySQLPort))

// This following command is only required if pfctl is not already enabled
//runCommandWithSudo(t, "pfctl", "-e")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")
rules := runCommandWithSudo(t, "pfctl", "-s", "rules")
log.Errorf("Rules enforced - %v", rules)

// Start a write that will be blocked by the primary waiting for semi-sync ACKs
ch := make(chan any)
go func() {
defer func() {
close(ch)
}()
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
}()

// Starting VTOrc later now, because we don't want it to fix the heartbeat interval
// on the replica's before the disruption has been introduced.
err := clusterInstance.StartVTOrc(clusterInstance.Keyspaces[0].Name)
require.NoError(t, err)
go func() {
for {
select {
case <-ch:
return
case <-time.After(1 * time.Second):
str, isPresent := tablets[0].VttabletProcess.GetVars()["SemiSyncMonitorWritesBlocked"]
if isPresent {
log.Errorf("SemiSyncMonitorWritesBlocked - %v", str)
}
}
}
}()
// If the network disruption is too long lived, then we will end up running ERS from VTOrc.
networkDisruptionDuration := 43 * time.Second
time.Sleep(networkDisruptionDuration)

// Restore the network
runCommandWithSudo(t, "cp", "/etc/pf.conf.backup", "/etc/pf.conf")
runCommandWithSudo(t, "pfctl", "-f", "/etc/pf.conf")

// We expect the problem to be resolved in less than 30 seconds.
select {
case <-time.After(30 * time.Second):
t.Errorf("Timed out waiting for semi-sync to be unblocked")
case <-ch:
log.Errorf("Woohoo, write finished!")
}
}

// runCommandWithSudo runs the provided command with sudo privileges
// when the command is run, it prompts the user for the password, and it must be
// entered for the program to resume.
func runCommandWithSudo(t *testing.T, args ...string) string {
cmd := exec.Command("sudo", args...)
out, err := cmd.CombinedOutput()
assert.NoError(t, err, string(out))
return string(out)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var ddls1, ddls2 []string

func init() {
sidecarDBTables = []string{"copy_state", "dt_participant", "dt_state", "heartbeat", "post_copy_action",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version",
"redo_state", "redo_statement", "reparent_journal", "resharding_journal", "schema_migrations", "schema_version", "semisync_recover",
"tables", "udfs", "vdiff", "vdiff_log", "vdiff_table", "views", "vreplication", "vreplication_log"}
numSidecarDBTables = len(sidecarDBTables)
ddls1 = []string{
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
assert.False(t, primaryInstance.SemiSyncReplicaStatus)
assert.False(t, primaryInstance.SemiSyncMonitorBlocked)
assert.EqualValues(t, 2, primaryInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, primaryInstance.SemiSyncPrimaryWaitForReplicaCount)
assert.EqualValues(t, 1000000000000000000, primaryInstance.SemiSyncPrimaryTimeout)
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
assert.False(t, replicaInstance.SemiSyncMonitorBlocked)
assert.True(t, replicaInstance.SemiSyncReplicaStatus)
assert.EqualValues(t, 0, replicaInstance.SemiSyncPrimaryClients)
assert.EqualValues(t, 1, replicaInstance.SemiSyncPrimaryWaitForReplicaCount)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
return result
}

// DB returns the fakesqldb.DB object.
func (fmd *FakeMysqlDaemon) DB() *fakesqldb.DB {
return fmd.db
}

// Start is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
Expand Down
30 changes: 21 additions & 9 deletions go/vt/proto/replicationdata/replicationdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions go/vt/proto/replicationdata/replicationdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions go/vt/sidecardb/schema/misc/semisync_recover.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2025 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

CREATE TABLE IF NOT EXISTS semisync_recover
(
ts BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (`ts`)
) ENGINE = InnoDB CHARSET = utf8mb4
1 change: 1 addition & 0 deletions go/vt/vtorc/db/generate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ CREATE TABLE database_instance (
semi_sync_primary_status TINYint NOT NULL DEFAULT 0,
semi_sync_replica_status TINYint NOT NULL DEFAULT 0,
semi_sync_primary_clients int NOT NULL DEFAULT 0,
semi_sync_monitor_blocked tinyint NOT NULL DEFAULT 0,
is_disk_stalled TINYint NOT NULL DEFAULT 0,
PRIMARY KEY (alias)
)`,
Expand Down
Loading
Loading