roachtest: add failover/liveness #93039

Merged 1 commit on Dec 8, 2022
256 changes: 256 additions & 0 deletions pkg/cmd/roachtest/tests/failover.go
@@ -14,16 +14,20 @@ import (
"context"
gosql "database/sql"
"fmt"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

@@ -46,6 +50,24 @@ func registerFailover(r registry.Registry) {
})
}

for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
&failureModeBlackholeSend{},
&failureModeCrash{},
} {
failureMode := failureMode // pin loop variable
r.Add(registry.TestSpec{
Name: fmt.Sprintf("failover/liveness/%s", failureMode),
Owner: registry.OwnerKV,
Timeout: 20 * time.Minute,
Cluster: r.MakeClusterSpec(5, spec.CPU(4)),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runFailoverLiveness(ctx, t, c, failureMode)
},
})
}
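
Note: the failureMode interface that these registrations and the new test rely on is defined earlier in failover.go and appears unchanged by this PR, so it is not visible in the diff. Judging only from the call sites added here (Fail, Recover, Cleanup, ExpectDeath, and formatting via %s in the test name), its shape is roughly the sketch below; the method names and signatures are inferred from usage, and the real interface may have additional methods.

// Sketch of failureMode as implied by its call sites in this diff; not the
// actual definition.
type failureMode interface {
	fmt.Stringer

	// Fail injects the failure on the given node; Recover undoes it.
	Fail(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)
	Recover(ctx context.Context, t test.Test, c cluster.Cluster, nodeID int)

	// Cleanup removes any lingering failure artifacts (e.g. iptables rules)
	// when the test exits.
	Cleanup(ctx context.Context, t test.Test, c cluster.Cluster)

	// ExpectDeath reports whether the failure is expected to kill the
	// cockroach process, so the monitor can be told to expect it.
	ExpectDeath() bool
}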

for _, failureMode := range []failureMode{
&failureModeBlackhole{},
&failureModeBlackholeRecv{},
@@ -95,6 +117,7 @@ func runFailoverNonSystem(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
) {
require.Equal(t, 7, c.Spec().NodeCount)
require.False(t, c.IsLocal(), "test can't use local cluster") // messes with iptables

rng, _ := randutil.NewTestRand()

@@ -216,6 +239,215 @@ func runFailoverNonSystem(
m.Wait()
}

// runFailoverLiveness benchmarks the maximum duration of *user* range
// unavailability following a liveness-only leaseholder failure. When the
// liveness range becomes unavailable, other nodes are unable to heartbeat and
// extend their leases, and their leases may thus expire as well, making them
// unavailable.
//
// - Only the liveness range is located on the failed node, as leaseholder.
//
// - SQL clients do not connect to the failed node.
//
// - The workload consists of individual point reads and writes.
//
// Since the range unavailability is probabilistic, depending e.g. on the time
// since the last heartbeat and other variables, we run 9 failures and record
// the number of expired leases on n1-n3 as well as the pMax latency to find the
// upper bound on unavailability. We do not assert anything, but instead export
// metrics for graphing.
//
// The cluster layout is as follows:
//
// n1-n3: All ranges, including liveness.
// n4: Liveness range leaseholder.
// n5: Workload runner.
//
// The test runs a kv50 workload with batch size 1, using 256 concurrent workers
// directed at n1-n3 with a rate of 2048 reqs/s. n4 fails and recovers, with 30
// seconds between each operation, for 9 cycles.
//
// TODO(erikgrinaker): The metrics resolution of 10 seconds isn't really good
// enough to accurately measure the number of invalid leases, but it's what we
// have currently. Prometheus scraping more often isn't enough, because CRDB
// itself only samples every 10 seconds.
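//
// Each of the 9 failure cycles in the loop below roughly proceeds as follows:
//
// - Wait for the 30-second ticker, move any ranges/leases that have escaped
// their constraints back into place, and concurrently sleep a random
// fraction of the lease renewal interval to vary the gap between the last
// lease extension and the failure.
//
// - Fail n4, wait another 30-second tick, then recover it and move the
// liveness lease back onto it.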
func runFailoverLiveness(
ctx context.Context, t test.Test, c cluster.Cluster, failureMode failureMode,
) {
require.Equal(t, 5, c.Spec().NodeCount)
require.False(t, c.IsLocal(), "test can't use local cluster") // messes with iptables

rng, _ := randutil.NewTestRand()

// Create cluster.
opts := option.DefaultStartOpts()
settings := install.MakeClusterSettings()
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), opts, settings, c.Range(1, 4))

if f, ok := failureMode.(*failureModeCrash); ok {
f.startOpts = opts
f.startSettings = settings
}

conn := c.Conn(ctx, t.L(), 1)
defer conn.Close()

// Set up the Prometheus instance and client. We don't collect metrics from n4
// (the failing node) because it's occasionally offline, and StatsCollector
// doesn't like it when the time series are missing data points.
promCfg := (&prometheus.Config{}).
WithCluster(c.Range(1, 3).InstallNodes()).
WithPrometheusNode(5)

require.NoError(t, c.StartGrafana(ctx, t.L(), promCfg))
defer func() {
if err := c.StopGrafana(ctx, t.L(), t.ArtifactsDir()); err != nil {
t.L().ErrorfCtx(ctx, "Error(s) shutting down prom/grafana %s", err)
}
}()

promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
require.NoError(t, err)
statsCollector := clusterstats.NewStatsCollector(ctx, promClient)

// Configure cluster. This test controls the ranges manually.
t.Status("configuring cluster")
_, err = conn.ExecContext(ctx, `SET CLUSTER SETTING kv.range_split.by_load_enabled = 'false'`)
require.NoError(t, err)

// Constrain all existing zone configs to n1-n3.
rows, err := conn.QueryContext(ctx, `SELECT target FROM [SHOW ALL ZONE CONFIGURATIONS]`)
require.NoError(t, err)
for rows.Next() {
var target string
require.NoError(t, rows.Scan(&target))
_, err = conn.ExecContext(ctx, fmt.Sprintf(
`ALTER %s CONFIGURE ZONE USING num_replicas = 3, constraints = '[-node4]'`,
target))
require.NoError(t, err)
}
require.NoError(t, rows.Err())

// Constrain the liveness range to n1-n4, with leaseholder preference on n4.
_, err = conn.ExecContext(ctx, `ALTER RANGE liveness CONFIGURE ZONE USING `+
`num_replicas = 4, constraints = '[]', lease_preferences = '[[+node4]]'`)
require.NoError(t, err)

// Wait for upreplication.
require.NoError(t, WaitFor3XReplication(ctx, t, conn))

// Create the kv database, constrained to n1-n3. Despite the zone config, the
// ranges will initially be distributed across all cluster nodes.
t.Status("creating workload database")
_, err = conn.ExecContext(ctx, `CREATE DATABASE kv`)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, `ALTER DATABASE kv CONFIGURE ZONE USING `+
`num_replicas = 3, constraints = '[-node4]'`)
require.NoError(t, err)
c.Run(ctx, c.Node(5), `./cockroach workload init kv --splits 1000 {pgurl:1}`)

// The replicate queue takes forever to move the other ranges off of n4, so we
// do it ourselves. Pre-creating the database/range and moving it to the
// correct nodes first is not sufficient, since the workload will spread the
// ranges across all nodes regardless.
relocateRanges(t, ctx, conn, `range_id != 2`, []int{4}, []int{1, 2, 3})

// We also make sure the lease is located on n4.
relocateLeases(t, ctx, conn, `range_id = 2`, 4)

// Start workload on n5, using n1-n3 as gateways. Run it for 10 minutes, since
// we take ~1 minute to fail and recover the node, and we do 9 cycles.
t.Status("running workload")
m := c.NewMonitor(ctx, c.Range(1, 4))
m.Go(func(ctx context.Context) error {
c.Run(ctx, c.Node(5), `./cockroach workload run kv --read-percent 50 `+
`--duration 600s --concurrency 256 --max-rate 2048 --timeout 30s --tolerate-errors `+
`--histograms=`+t.PerfArtifactsDir()+`/stats.json `+
`{pgurl:1-3}`)
return nil
})
startTime := timeutil.Now()

// Start a worker to fail and recover n4.
defer failureMode.Cleanup(ctx, t, c)

m.Go(func(ctx context.Context) error {
var raftCfg base.RaftConfig
raftCfg.SetDefaults()

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for i := 0; i < 9; i++ {
select {
case <-ticker.C:
case <-ctx.Done():
return ctx.Err()
}

randTimer := time.After(randutil.RandDuration(rng, raftCfg.RangeLeaseRenewalDuration()))

// Ranges and leases may occasionally escape their constraints. Move them
// to where they should be.
relocateRanges(t, ctx, conn, `range_id != 2`, []int{4}, []int{1, 2, 3})
relocateLeases(t, ctx, conn, `range_id = 2`, 4)

// Randomly sleep up to the lease renewal interval, to vary the time
// between the last lease renewal and the failure. We start the timer
// before the range relocation above to run them concurrently.
select {
case <-randTimer:
case <-ctx.Done():
}

t.Status(fmt.Sprintf("failing n%d (%s)", 4, failureMode))
if failureMode.ExpectDeath() {
m.ExpectDeath()
}
failureMode.Fail(ctx, t, c, 4)

select {
case <-ticker.C:
case <-ctx.Done():
return ctx.Err()
}

t.Status(fmt.Sprintf("recovering n%d (%s)", 4, failureMode))
failureMode.Recover(ctx, t, c, 4)
relocateLeases(t, ctx, conn, `range_id = 2`, 4)
}
return nil
})
m.Wait()

// Export roachperf metrics from Prometheus.
require.NoError(t, statsCollector.Exporter().Export(ctx, c, t, startTime, timeutil.Now(),
[]clusterstats.AggQuery{
{
Stat: clusterstats.ClusterStat{
LabelName: "node",
Query: "replicas_leaders_invalid_lease",
},
Query: "sum(replicas_leaders_invalid_lease)",
Tag: "Invalid Leases",
},
},
func(stats map[string]clusterstats.StatSummary) (string, float64) {
summary, ok := stats["replicas_leaders_invalid_lease"]
require.True(t, ok, "stat summary for replicas_leaders_invalid_lease not found")
var max float64
for _, v := range summary.Value {
if v > max {
max = v
}
}
t.Status(fmt.Sprintf("Max invalid leases: %d", int64(max)))
return "Max invalid leases", max
},
))
}

// runFailoverSystemNonLiveness benchmarks the maximum duration of range
// unavailability following a leaseholder failure with only system ranges,
// excluding the liveness range which is tested separately in
@@ -519,3 +751,27 @@ func relocateRanges(
}
}
}

// relocateLeases relocates all leases matching the given predicate to the
// given node, retrying until they have all moved. Transient errors from a
// recently restarted node (KeyNotPresentError) are tolerated and retried.
func relocateLeases(t test.Test, ctx context.Context, conn *gosql.DB, predicate string, to int) {
require.NotEmpty(t, predicate)
var count int
where := fmt.Sprintf("%s AND lease_holder != %d", predicate, to)
for {
require.NoError(t, conn.QueryRowContext(ctx,
`SELECT count(*) FROM crdb_internal.ranges WHERE `+where).Scan(&count))
if count == 0 {
break
}
t.Status(fmt.Sprintf("moving %d leases to n%d (%s)", count, to, predicate))
_, err := conn.ExecContext(ctx, `ALTER RANGE RELOCATE LEASE TO $1::int FOR `+
`SELECT range_id FROM crdb_internal.ranges WHERE `+where,
to)
// When a node recovers, it may not have gossiped its store key yet.
if err != nil && !strings.Contains(err.Error(), "KeyNotPresentError") {
require.NoError(t, err)
}
time.Sleep(time.Second)
}
}
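
For context, the relocateRanges helper used above predates this PR; only its closing braces are visible under the @@ -519,3 +751,27 @@ hunk. Based on its call sites (a predicate plus from/to node lists) and the structure of relocateLeases, a plausible reconstruction is sketched below; the exact query, status message, and error handling are assumptions rather than the actual implementation.

// Hypothetical sketch of relocateRanges: move replicas of all ranges matching
// the predicate off the "from" nodes and onto the "to" nodes, retrying until
// none remain.
func relocateRanges(
	t test.Test, ctx context.Context, conn *gosql.DB, predicate string, from, to []int,
) {
	require.NotEmpty(t, predicate)
	for _, source := range from {
		where := fmt.Sprintf("%s AND %d = ANY(replicas)", predicate, source)
		for {
			var count int
			require.NoError(t, conn.QueryRowContext(ctx,
				`SELECT count(*) FROM crdb_internal.ranges WHERE `+where).Scan(&count))
			if count == 0 {
				break
			}
			t.Status(fmt.Sprintf("moving %d ranges off of n%d (%s)", count, source, predicate))
			// Best-effort: relocation errors (e.g. while a node is restarting) are
			// simply retried on the next iteration of the loop.
			for _, target := range to {
				_, _ = conn.ExecContext(ctx, `ALTER RANGE RELOCATE FROM $1::int TO $2::int FOR `+
					`SELECT range_id FROM crdb_internal.ranges WHERE `+where,
					source, target)
			}
			time.Sleep(time.Second)
		}
	}
}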